From 4b1cda5455701c48dfde20ee5b85e36f56540645 Mon Sep 17 00:00:00 2001
From: Cassandra Heart
Date: Mon, 23 Feb 2026 22:47:13 -0600
Subject: [PATCH] fix: remove pubsub stop from app consensus engine as it
 shouldn't manage pubsub lifecycle, integrate shutdown context to PerformSync
 to prevent stuck syncs from halting respawn

---
 hypergraph/sync_client_driven.go           |  3 ++-
 node/consensus/app/app_consensus_engine.go | 20 +++++-----------
 node/datarpc/data_worker_ipc_server.go     | 28 ++++++++++++++++++++--
 3 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/hypergraph/sync_client_driven.go b/hypergraph/sync_client_driven.go
index 232be42..3b035a1 100644
--- a/hypergraph/sync_client_driven.go
+++ b/hypergraph/sync_client_driven.go
@@ -68,7 +68,8 @@ func isGlobalProverShardBytes(shardKeyBytes []byte) bool {
 func (hg *HypergraphCRDT) PerformSync(
 	stream protobufs.HypergraphComparisonService_PerformSyncServer,
 ) error {
-	ctx := stream.Context()
+	ctx, shutdownCancel := hg.contextWithShutdown(stream.Context())
+	defer shutdownCancel()
 	logger := hg.logger.With(zap.String("method", "PerformSync"))

 	sessionStart := time.Now()
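Note on the hunk above: the patch calls hg.contextWithShutdown but does not
include its body. The sketch below is a rough illustration of what such a
helper can look like, not the repo's actual implementation; the shutdown
channel parameter and the function name are assumptions made for the example.
The point is that the sync session's context is cancelled either by the
caller's deferred cancel or by node shutdown, so a sync stuck on a dead peer
unwinds instead of blocking respawn.

package hypergraph // illustrative placement only

import "context"

// contextWithShutdownSketch is a hypothetical stand-in for the
// hg.contextWithShutdown helper used in the hunk above. It derives a
// context from parent that is additionally cancelled when the shutdown
// channel closes.
func contextWithShutdownSketch(
	parent context.Context,
	shutdown <-chan struct{}, // assumed close-on-shutdown signal
) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-shutdown:
			cancel() // node is stopping: abort the sync session
		case <-ctx.Done():
			// parent ended or the deferred cancel ran; the goroutine
			// exits so it cannot leak past the session.
		}
	}()
	return ctx, cancel
}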
diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go
index 15cebd8..18490a4 100644
--- a/node/consensus/app/app_consensus_engine.go
+++ b/node/consensus/app/app_consensus_engine.go
@@ -980,24 +980,16 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error {
 	e.pubsub.Unsubscribe(e.getDispatchMessageBitmask(), false)
 	e.pubsub.UnregisterValidator(e.getDispatchMessageBitmask())

-	// Wait briefly for component workers to finish. If they don't exit in
-	// time, close pubsub to cancel subscription goroutines that may be
-	// keeping handlers alive and preventing clean shutdown.
+	// Wait for component workers to finish. The IPC server owns the pubsub
+	// lifecycle, so we don't close it here (doing so would break respawns).
 	e.logger.Info("app engine shutdown: waiting for workers")
 	select {
 	case <-e.Done():
 		e.logger.Info("app engine shutdown: all workers stopped")
-	case <-time.After(5 * time.Second):
-		e.logger.Warn("app engine shutdown: workers still running after 5s, closing pubsub")
-		e.pubsub.Close()
-		select {
-		case <-e.Done():
-			e.logger.Info("app engine shutdown: all workers stopped after pubsub close")
-		case <-time.After(25 * time.Second):
-			e.logger.Error("app engine shutdown: timed out after 30s")
-			if !force {
-				errChan <- errors.New("timeout waiting for app engine shutdown")
-			}
+	case <-time.After(30 * time.Second):
+		e.logger.Error("app engine shutdown: timed out after 30s")
+		if !force {
+			errChan <- errors.New("timeout waiting for app engine shutdown")
 		}
 	}

diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go
index c320366..cdb77f9 100644
--- a/node/datarpc/data_worker_ipc_server.go
+++ b/node/datarpc/data_worker_ipc_server.go
@@ -146,7 +146,19 @@ func (r *DataWorkerIPCServer) Stop() error {
 	r.pubsub.Close()
 	if r.server != nil {
-		r.server.GracefulStop()
+		stopped := make(chan struct{})
+		srv := r.server
+		go func() {
+			srv.GracefulStop()
+			close(stopped)
+		}()
+		select {
+		case <-stopped:
+		case <-time.After(5 * time.Second):
+			r.logger.Warn("server graceful stop timed out during shutdown, forcing")
+			srv.Stop()
+			<-stopped
+		}
 	}
 	if r.peerInfoCancel != nil {
 		r.peerInfoCancel()
 	}
@@ -180,7 +192,19 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
 	}
 	if r.server != nil {
 		r.logger.Info("stopping server for respawn")
-		r.server.GracefulStop()
+		stopped := make(chan struct{})
+		srv := r.server
+		go func() {
+			srv.GracefulStop()
+			close(stopped)
+		}()
+		select {
+		case <-stopped:
+		case <-time.After(5 * time.Second):
+			r.logger.Warn("server graceful stop timed out, forcing stop")
+			srv.Stop()
+			<-stopped
+		}
 		r.server = nil
 	}
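Note on the two data_worker_ipc_server.go hunks: the bounded graceful-stop
sequence is duplicated in Stop and RespawnServer, and in each case r.server
is captured into srv first so the goroutine is safe against the later
r.server = nil. A standalone sketch of the pattern as one reusable helper
follows; the name stopWithTimeout is hypothetical, not part of this patch.
It relies on the same grpc-go behavior the patch depends on: calling Stop
makes an in-flight GracefulStop return, so the final receive from stopped
cannot block.

package datarpc // illustrative placement only

import (
	"time"

	"google.golang.org/grpc"
)

// stopWithTimeout lets GracefulStop drain in-flight RPCs for at most
// timeout, then falls back to a hard Stop so shutdown and respawn can
// never hang on a stuck stream. It reports whether the stop completed
// gracefully.
func stopWithTimeout(srv *grpc.Server, timeout time.Duration) bool {
	stopped := make(chan struct{})
	go func() {
		srv.GracefulStop()
		close(stopped)
	}()
	select {
	case <-stopped:
		return true
	case <-time.After(timeout):
		srv.Stop() // force-closes listeners and connections
		<-stopped  // GracefulStop returns once Stop has taken effect
		return false
	}
}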