From 62ad785d5f809285f4bc6d879f522c93f63aa50d Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 1 Jan 2026 19:33:11 -0600 Subject: [PATCH] resolve hanging shutdown + pubsub proxy issue --- .../global/global_consensus_engine.go | 5 +++ node/p2p/blossomsub.go | 44 ++++++++++++++++--- node/rpc/pubsub_proxy.go | 42 +++++++++++++++--- 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 67f111e..1f774c1 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -1182,6 +1182,11 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error { e.pubsub.Unsubscribe(GLOBAL_ALERT_BITMASK, false) e.pubsub.UnregisterValidator(GLOBAL_ALERT_BITMASK) + // Close pubsub to cancel all subscription goroutines + if err := e.pubsub.Close(); err != nil { + e.logger.Warn("error closing pubsub", zap.Error(err)) + } + select { case <-e.Done(): // Clean shutdown diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 3dc1525..d152544 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -70,6 +70,7 @@ type appScore struct { type BlossomSub struct { ps *blossomsub.PubSub ctx context.Context + cancel context.CancelFunc logger *zap.Logger peerID peer.ID derivedPeerID peer.ID @@ -77,6 +78,7 @@ type BlossomSub struct { // Track which bit slices belong to which original bitmasks, used to reference // count bitmasks for closed subscriptions subscriptionTracker map[string][][]byte + subscriptions []*blossomsub.Subscription subscriptionMutex sync.RWMutex h host.Host signKey crypto.PrivKey @@ -129,7 +131,7 @@ func NewBlossomSubWithHost( privKey crypto.PrivKey, bootstrapHosts []host.Host, ) *BlossomSub { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) if coreId == 0 { logger = logger.With(zap.String("process", "master")) } else { @@ -141,6 +143,7 @@ func NewBlossomSubWithHost( bs := &BlossomSub{ ctx: ctx, + cancel: cancel, logger: logger, bitmaskMap: make(map[string]*blossomsub.Bitmask), subscriptionTracker: make(map[string][][]byte), @@ -500,8 +503,10 @@ func NewBlossomSub( opts = append(opts, libp2p.ResourceManager(rm)) } + ctx, cancel := context.WithCancel(ctx) bs := &BlossomSub{ ctx: ctx, + cancel: cancel, logger: logger, bitmaskMap: make(map[string]*blossomsub.Bitmask), subscriptionTracker: make(map[string][][]byte), @@ -966,6 +971,11 @@ func (b *BlossomSub) Subscribe( zap.String("bitmask", hex.EncodeToString(bitmask)), ) + // Track subscriptions for cleanup on Close + b.subscriptionMutex.Lock() + b.subscriptions = append(b.subscriptions, subs...) + b.subscriptionMutex.Unlock() + for _, sub := range subs { copiedBitmask := make([]byte, len(bitmask)) copy(copiedBitmask[:], bitmask[:]) @@ -973,7 +983,9 @@ func (b *BlossomSub) Subscribe( go func() { for { - b.subscribeHandler(sub, copiedBitmask, exact, handler) + if !b.subscribeHandler(sub, copiedBitmask, exact, handler) { + return + } } }() } @@ -986,12 +998,14 @@ func (b *BlossomSub) Subscribe( return nil } +// subscribeHandler processes a single message from the subscription. +// Returns true if the loop should continue, false if it should exit. func (b *BlossomSub) subscribeHandler( sub *blossomsub.Subscription, copiedBitmask []byte, exact bool, handler func(message *pb.Message) error, -) { +) bool { defer func() { if r := recover(); r != nil { b.logger.Error( @@ -1004,16 +1018,23 @@ func (b *BlossomSub) subscribeHandler( m, err := sub.Next(b.ctx) if err != nil { - b.logger.Error( - "got error when fetching the next message", + // Context cancelled or subscription closed - exit the loop + b.logger.Debug( + "subscription exiting", zap.Error(err), ) + return false + } + if m == nil { + // Subscription closed + return false } if bytes.Equal(m.Bitmask, copiedBitmask) || !exact { if err = handler(m.Message); err != nil { b.logger.Debug("message handler returned error", zap.Error(err)) } } + return true } func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) { @@ -1920,6 +1941,19 @@ func getNetworkNamespace(network uint8) string { // Close implements p2p.PubSub. func (b *BlossomSub) Close() error { + // Cancel context to signal all subscription goroutines to exit + if b.cancel != nil { + b.cancel() + } + + // Cancel all subscriptions to unblock any pending Next() calls + b.subscriptionMutex.Lock() + for _, sub := range b.subscriptions { + sub.Cancel() + } + b.subscriptions = nil + b.subscriptionMutex.Unlock() + return nil } diff --git a/node/rpc/pubsub_proxy.go b/node/rpc/pubsub_proxy.go index 7cf3fbf..a05f216 100644 --- a/node/rpc/pubsub_proxy.go +++ b/node/rpc/pubsub_proxy.go @@ -26,9 +26,10 @@ type PubSubProxyServer struct { logger *zap.Logger // Track subscriptions and validators - subscriptions map[string]context.CancelFunc - validators map[string]validatorInfo - mu sync.RWMutex + subscriptions map[string]context.CancelFunc + validators map[string]validatorInfo + registeredBitmasks map[string]bool // bitmask key -> registered + mu sync.RWMutex } type validatorInfo struct { @@ -43,10 +44,11 @@ func NewPubSubProxyServer( logger *zap.Logger, ) *PubSubProxyServer { return &PubSubProxyServer{ - pubsub: pubsub, - logger: logger, - subscriptions: make(map[string]context.CancelFunc), - validators: make(map[string]validatorInfo), + pubsub: pubsub, + logger: logger, + subscriptions: make(map[string]context.CancelFunc), + validators: make(map[string]validatorInfo), + registeredBitmasks: make(map[string]bool), } } @@ -174,6 +176,21 @@ func (s *PubSubProxyServer) ValidatorStream( switch m := msg.Message.(type) { case *protobufs.ValidationStreamMessage_Register: reg := m.Register + bitmaskKey := string(reg.Bitmask) + + // Check if validator already registered for this bitmask - the + // validator is always the same, so just noop for repeats + s.mu.RLock() + alreadyRegistered := s.registeredBitmasks[bitmaskKey] + s.mu.RUnlock() + + if alreadyRegistered { + s.logger.Debug("validator already registered for bitmask, skipping", + zap.String("validator_id", reg.ValidatorId), + zap.Binary("bitmask", reg.Bitmask)) + continue + } + s.logger.Debug("registering validator", zap.String("validator_id", reg.ValidatorId), zap.Binary("bitmask", reg.Bitmask)) @@ -241,10 +258,16 @@ func (s *PubSubProxyServer) ValidatorStream( s.logger.Error("failed to register validator", zap.Error(err)) delete(validatorCallbacks, reg.ValidatorId) close(reqChan) + } else { + // Mark bitmask as having a registered validator + s.mu.Lock() + s.registeredBitmasks[bitmaskKey] = true + s.mu.Unlock() } case *protobufs.ValidationStreamMessage_Unregister: unreg := m.Unregister + bitmaskKey := string(unreg.Bitmask) s.logger.Debug("unregistering validator", zap.String("validator_id", unreg.ValidatorId)) @@ -252,6 +275,11 @@ func (s *PubSubProxyServer) ValidatorStream( s.logger.Error("failed to unregister validator", zap.Error(err)) } + // Clear the bitmask registration + s.mu.Lock() + delete(s.registeredBitmasks, bitmaskKey) + s.mu.Unlock() + if ch, exists := validatorCallbacks[unreg.ValidatorId]; exists { close(ch) delete(validatorCallbacks, unreg.ValidatorId)