fix: slight reordering, also added named workers to trace hanging shutdowns

This commit is contained in:
Cassandra Heart 2026-02-23 04:56:23 -06:00
parent 2238333eda
commit 4e0d71409a
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
2 changed files with 55 additions and 30 deletions

View File

@@ -740,11 +740,17 @@ func NewAppConsensusEngine(
appSyncHooks,
)
// Add sync provider
componentBuilder.AddWorker(engine.syncProvider.Start)
// namedWorker wraps a worker function with shutdown logging so we can
// identify which worker(s) hang during shutdown.
namedWorker := func(name string, fn func(lifecycle.SignalerContext, lifecycle.ReadyFunc)) lifecycle.ComponentWorker {
return func(ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc) {
defer engine.logger.Debug("worker stopped", zap.String("worker", name))
fn(ctx, ready)
}
}
// Add consensus
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("consensus", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
@@ -802,16 +808,16 @@ func NewAppConsensusEngine(
<-ctx.Done()
<-lifecycle.AllDone(engine.voteAggregator, engine.timeoutAggregator)
})
}))
// Start app shard proposal queue processor
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("proposalQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processAppShardProposalQueue(ctx)
})
}))
err = engine.subscribeToConsensusMessages()
if err != nil {
@@ -854,82 +860,82 @@ func NewAppConsensusEngine(
}
// Add sync provider
componentBuilder.AddWorker(engine.syncProvider.Start)
componentBuilder.AddWorker(namedWorker("syncProvider", engine.syncProvider.Start))
// Start message queue processors
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("consensusMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processConsensusMessageQueue(ctx)
})
}))
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("proverMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processProverMessageQueue(ctx)
})
}))
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("frameMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processFrameMessageQueue(ctx)
})
}))
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("globalFrameMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processGlobalFrameMessageQueue(ctx)
})
}))
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("alertMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processAlertMessageQueue(ctx)
})
}))
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("peerInfoMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processPeerInfoMessageQueue(ctx)
})
}))
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("dispatchMsgQueue", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.processDispatchMessageQueue(ctx)
})
}))
// Start event distributor event loop
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("eventDistributor", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.eventDistributorLoop(ctx)
})
}))
// Start metrics update goroutine
componentBuilder.AddWorker(func(
componentBuilder.AddWorker(namedWorker("metricsLoop", func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
ready()
engine.updateMetricsLoop(ctx)
})
}))
engine.ComponentManager = componentBuilder.Build()
if hgWithShutdown, ok := engine.hyperSync.(interface {
@@ -977,13 +983,18 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error {
// Wait briefly for component workers to finish. If they don't exit in
// time, close pubsub to cancel subscription goroutines that may be
// keeping handlers alive and preventing clean shutdown.
e.logger.Info("app engine shutdown: waiting for workers")
select {
case <-e.Done():
e.logger.Info("app engine shutdown: all workers stopped")
case <-time.After(5 * time.Second):
e.logger.Warn("app engine shutdown: workers still running after 5s, closing pubsub")
e.pubsub.Close()
select {
case <-e.Done():
e.logger.Info("app engine shutdown: all workers stopped after pubsub close")
case <-time.After(25 * time.Second):
e.logger.Error("app engine shutdown: timed out after 30s")
if !force {
errChan <- errors.New("timeout waiting for app engine shutdown")
}
@@ -1140,6 +1151,16 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoo
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cancel sync when the engine shuts down so this goroutine doesn't
// outlive the engine and hold resources (locks, connections).
go func() {
select {
case <-e.ShutdownSignal():
cancel()
case <-ctx.Done():
}
}()
shardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,

View File

@@ -167,11 +167,10 @@ func (r *DataWorkerIPCServer) Respawn(
}
func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
if r.server != nil {
r.logger.Info("stopping server for respawn")
r.server.GracefulStop()
r.server = nil
}
// Cancel the engine context BEFORE stopping the gRPC server. GracefulStop
// waits for all in-flight RPCs (e.g. HyperStream, PerformSync) to
// complete, but those handlers won't stop until the engine context is
// cancelled. Reversing the order avoids a deadlock.
if r.appConsensusEngine != nil {
if r.cancel != nil {
r.cancel()
@@ -179,6 +178,11 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
<-r.appConsensusEngine.Stop(false)
r.appConsensusEngine = nil
}
if r.server != nil {
r.logger.Info("stopping server for respawn")
r.server.GracefulStop()
r.server = nil
}
// Establish an auth provider
r.authProvider = p2p.NewPeerAuthenticator(