fix: remove pubsub stop from app consensus engine as it shouldn't manage pubsub lifecycle; integrate shutdown context into PerformSync to prevent stuck syncs from halting respawn

This commit is contained in:
Cassandra Heart 2026-02-23 22:47:13 -06:00
parent 4a4543fae9
commit 4b1cda5455
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
3 changed files with 34 additions and 17 deletions

View File

@ -68,7 +68,8 @@ func isGlobalProverShardBytes(shardKeyBytes []byte) bool {
func (hg *HypergraphCRDT) PerformSync(
stream protobufs.HypergraphComparisonService_PerformSyncServer,
) error {
ctx := stream.Context()
ctx, shutdownCancel := hg.contextWithShutdown(stream.Context())
defer shutdownCancel()
logger := hg.logger.With(zap.String("method", "PerformSync"))
sessionStart := time.Now()

View File

@ -980,24 +980,16 @@ func (e *AppConsensusEngine) Stop(force bool) <-chan error {
e.pubsub.Unsubscribe(e.getDispatchMessageBitmask(), false)
e.pubsub.UnregisterValidator(e.getDispatchMessageBitmask())
// Wait briefly for component workers to finish. If they don't exit in
// time, close pubsub to cancel subscription goroutines that may be
// keeping handlers alive and preventing clean shutdown.
// Wait for component workers to finish. The IPC server owns the pubsub
// lifecycle, so we don't close it here (doing so would break respawns).
e.logger.Info("app engine shutdown: waiting for workers")
select {
case <-e.Done():
e.logger.Info("app engine shutdown: all workers stopped")
case <-time.After(5 * time.Second):
e.logger.Warn("app engine shutdown: workers still running after 5s, closing pubsub")
e.pubsub.Close()
select {
case <-e.Done():
e.logger.Info("app engine shutdown: all workers stopped after pubsub close")
case <-time.After(25 * time.Second):
e.logger.Error("app engine shutdown: timed out after 30s")
if !force {
errChan <- errors.New("timeout waiting for app engine shutdown")
}
case <-time.After(30 * time.Second):
e.logger.Error("app engine shutdown: timed out after 30s")
if !force {
errChan <- errors.New("timeout waiting for app engine shutdown")
}
}

View File

@ -146,7 +146,19 @@ func (r *DataWorkerIPCServer) Stop() error {
r.pubsub.Close()
if r.server != nil {
r.server.GracefulStop()
stopped := make(chan struct{})
srv := r.server
go func() {
srv.GracefulStop()
close(stopped)
}()
select {
case <-stopped:
case <-time.After(5 * time.Second):
r.logger.Warn("server graceful stop timed out during shutdown, forcing")
srv.Stop()
<-stopped
}
}
if r.peerInfoCancel != nil {
r.peerInfoCancel()
@ -180,7 +192,19 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
}
if r.server != nil {
r.logger.Info("stopping server for respawn")
r.server.GracefulStop()
stopped := make(chan struct{})
srv := r.server
go func() {
srv.GracefulStop()
close(stopped)
}()
select {
case <-stopped:
case <-time.After(5 * time.Second):
r.logger.Warn("server graceful stop timed out, forcing stop")
srv.Stop()
<-stopped
}
r.server = nil
}