From 4e0d71409a31ad4be45a548709cb943e336b3ed6 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Mon, 23 Feb 2026 04:56:23 -0600 Subject: [PATCH] fix: slight reordering, also added named workers to trace hanging shutdowns --- node/consensus/app/app_consensus_engine.go | 71 ++++++++++++++-------- node/datarpc/data_worker_ipc_server.go | 14 +++-- 2 files changed, 55 insertions(+), 30 deletions(-) diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 800a346..15cebd8 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -740,11 +740,17 @@ func NewAppConsensusEngine( appSyncHooks, ) - // Add sync provider - componentBuilder.AddWorker(engine.syncProvider.Start) + // namedWorker wraps a worker function with shutdown logging so we can + // identify which worker(s) hang during shutdown. + namedWorker := func(name string, fn func(lifecycle.SignalerContext, lifecycle.ReadyFunc)) lifecycle.ComponentWorker { + return func(ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc) { + defer engine.logger.Debug("worker stopped", zap.String("worker", name)) + fn(ctx, ready) + } + } // Add consensus - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("consensus", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { @@ -802,16 +808,16 @@ func NewAppConsensusEngine( <-ctx.Done() <-lifecycle.AllDone(engine.voteAggregator, engine.timeoutAggregator) - }) + })) // Start app shard proposal queue processor - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("proposalQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processAppShardProposalQueue(ctx) - }) + })) err = engine.subscribeToConsensusMessages() if err != nil { @@ -854,82 +860,82 @@ func NewAppConsensusEngine( } // Add sync provider - componentBuilder.AddWorker(engine.syncProvider.Start) + componentBuilder.AddWorker(namedWorker("syncProvider", engine.syncProvider.Start)) // Start message queue processors - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("consensusMsgQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processConsensusMessageQueue(ctx) - }) + })) - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("proverMsgQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processProverMessageQueue(ctx) - }) + })) - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("frameMsgQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processFrameMessageQueue(ctx) - }) + })) - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("globalFrameMsgQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processGlobalFrameMessageQueue(ctx) - }) + })) - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("alertMsgQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processAlertMessageQueue(ctx) - }) + })) - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("peerInfoMsgQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processPeerInfoMessageQueue(ctx) - }) + })) - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("dispatchMsgQueue", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.processDispatchMessageQueue(ctx) - }) + })) // Start event distributor event loop - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("eventDistributor", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.eventDistributorLoop(ctx) - }) + })) // Start metrics update goroutine - componentBuilder.AddWorker(func( + componentBuilder.AddWorker(namedWorker("metricsLoop", func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) { ready() engine.updateMetricsLoop(ctx) - }) + })) engine.ComponentManager = componentBuilder.Build() if hgWithShutdown, ok := engine.hyperSync.(interface { @@ -977,13 +983,18 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error { // Wait briefly for component workers to finish. If they don't exit in // time, close pubsub to cancel subscription goroutines that may be // keeping handlers alive and preventing clean shutdown. + e.logger.Info("app engine shutdown: waiting for workers") select { case <-e.Done(): + e.logger.Info("app engine shutdown: all workers stopped") case <-time.After(5 * time.Second): + e.logger.Warn("app engine shutdown: workers still running after 5s, closing pubsub") e.pubsub.Close() select { case <-e.Done(): + e.logger.Info("app engine shutdown: all workers stopped after pubsub close") case <-time.After(25 * time.Second): + e.logger.Error("app engine shutdown: timed out after 30s") if !force { errChan <- errors.New("timeout waiting for app engine shutdown") } @@ -1140,6 +1151,16 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoo ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Cancel sync when the engine shuts down so this goroutine doesn't + // outlive the engine and hold resources (locks, connections). + go func() { + select { + case <-e.ShutdownSignal(): + cancel() + case <-ctx.Done(): + } + }() + shardKey := tries.ShardKey{ L1: [3]byte{0x00, 0x00, 0x00}, L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS, diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go index c019097..c320366 100644 --- a/node/datarpc/data_worker_ipc_server.go +++ b/node/datarpc/data_worker_ipc_server.go @@ -167,11 +167,10 @@ func (r *DataWorkerIPCServer) Respawn( } func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error { - if r.server != nil { - r.logger.Info("stopping server for respawn") - r.server.GracefulStop() - r.server = nil - } + // Cancel the engine context BEFORE stopping the gRPC server. GracefulStop + // waits for all in-flight RPCs (e.g. HyperStream, PerformSync) to + // complete, but those handlers won't stop until the engine context is + // cancelled. Reversing the order avoids a deadlock. if r.appConsensusEngine != nil { if r.cancel != nil { r.cancel() @@ -179,6 +178,11 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error { <-r.appConsensusEngine.Stop(false) r.appConsensusEngine = nil } + if r.server != nil { + r.logger.Info("stopping server for respawn") + r.server.GracefulStop() + r.server = nil + } // Establish an auth provider r.authProvider = p2p.NewPeerAuthenticator(