resolve hanging shutdown + pubsub proxy issue

This commit is contained in:
Cassandra Heart 2026-01-01 19:33:11 -06:00
parent 16f6991395
commit 62ad785d5f
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
3 changed files with 79 additions and 12 deletions

View File

@ -1182,6 +1182,11 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error {
e.pubsub.Unsubscribe(GLOBAL_ALERT_BITMASK, false)
e.pubsub.UnregisterValidator(GLOBAL_ALERT_BITMASK)
// Close pubsub to cancel all subscription goroutines
if err := e.pubsub.Close(); err != nil {
e.logger.Warn("error closing pubsub", zap.Error(err))
}
select {
case <-e.Done():
// Clean shutdown

View File

@ -70,6 +70,7 @@ type appScore struct {
type BlossomSub struct {
ps *blossomsub.PubSub
ctx context.Context
cancel context.CancelFunc
logger *zap.Logger
peerID peer.ID
derivedPeerID peer.ID
@ -77,6 +78,7 @@ type BlossomSub struct {
// Track which bit slices belong to which original bitmasks, used to reference
// count bitmasks for closed subscriptions
subscriptionTracker map[string][][]byte
subscriptions []*blossomsub.Subscription
subscriptionMutex sync.RWMutex
h host.Host
signKey crypto.PrivKey
@ -129,7 +131,7 @@ func NewBlossomSubWithHost(
privKey crypto.PrivKey,
bootstrapHosts []host.Host,
) *BlossomSub {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
if coreId == 0 {
logger = logger.With(zap.String("process", "master"))
} else {
@ -141,6 +143,7 @@ func NewBlossomSubWithHost(
bs := &BlossomSub{
ctx: ctx,
cancel: cancel,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
@ -500,8 +503,10 @@ func NewBlossomSub(
opts = append(opts, libp2p.ResourceManager(rm))
}
ctx, cancel := context.WithCancel(ctx)
bs := &BlossomSub{
ctx: ctx,
cancel: cancel,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
@ -966,6 +971,11 @@ func (b *BlossomSub) Subscribe(
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Track subscriptions for cleanup on Close
b.subscriptionMutex.Lock()
b.subscriptions = append(b.subscriptions, subs...)
b.subscriptionMutex.Unlock()
for _, sub := range subs {
copiedBitmask := make([]byte, len(bitmask))
copy(copiedBitmask[:], bitmask[:])
@ -973,7 +983,9 @@ func (b *BlossomSub) Subscribe(
go func() {
for {
b.subscribeHandler(sub, copiedBitmask, exact, handler)
if !b.subscribeHandler(sub, copiedBitmask, exact, handler) {
return
}
}
}()
}
@ -986,12 +998,14 @@ func (b *BlossomSub) Subscribe(
return nil
}
// subscribeHandler processes a single message from the subscription.
// Returns true if the loop should continue, false if it should exit.
func (b *BlossomSub) subscribeHandler(
sub *blossomsub.Subscription,
copiedBitmask []byte,
exact bool,
handler func(message *pb.Message) error,
) {
) bool {
defer func() {
if r := recover(); r != nil {
b.logger.Error(
@ -1004,16 +1018,23 @@ func (b *BlossomSub) subscribeHandler(
m, err := sub.Next(b.ctx)
if err != nil {
b.logger.Error(
"got error when fetching the next message",
// Context cancelled or subscription closed - exit the loop
b.logger.Debug(
"subscription exiting",
zap.Error(err),
)
return false
}
if m == nil {
// Subscription closed
return false
}
if bytes.Equal(m.Bitmask, copiedBitmask) || !exact {
if err = handler(m.Message); err != nil {
b.logger.Debug("message handler returned error", zap.Error(err))
}
}
return true
}
func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
@ -1920,6 +1941,19 @@ func getNetworkNamespace(network uint8) string {
// Close implements p2p.PubSub.
//
// It cancels the BlossomSub context, which is the context the subscription
// goroutines pass to sub.Next, and then cancels every subscription that
// Subscribe recorded for cleanup. It always returns nil. A repeated call finds
// no tracked subscriptions left to cancel.
func (b *BlossomSub) Close() error {
	// Cancel context to signal all subscription goroutines to exit
	if b.cancel != nil {
		b.cancel()
	}

	// Detach the tracked subscriptions under the lock and release it before
	// calling into them. Subscribe takes the same mutex, so holding it across
	// Cancel would stall concurrent callers for as long as Cancel takes.
	// NOTE(review): Cancel is assumed to be able to block on the pubsub event
	// loop, as in upstream libp2p pubsub — confirm against blossomsub.
	b.subscriptionMutex.Lock()
	subs := b.subscriptions
	b.subscriptions = nil
	b.subscriptionMutex.Unlock()

	// Cancel all subscriptions to unblock any pending Next() calls
	for _, sub := range subs {
		sub.Cancel()
	}

	return nil
}

View File

@ -26,9 +26,10 @@ type PubSubProxyServer struct {
logger *zap.Logger
// Track subscriptions and validators
subscriptions map[string]context.CancelFunc
validators map[string]validatorInfo
mu sync.RWMutex
subscriptions map[string]context.CancelFunc
validators map[string]validatorInfo
registeredBitmasks map[string]bool // bitmask key -> registered
mu sync.RWMutex
}
type validatorInfo struct {
@ -43,10 +44,11 @@ func NewPubSubProxyServer(
logger *zap.Logger,
) *PubSubProxyServer {
return &PubSubProxyServer{
pubsub: pubsub,
logger: logger,
subscriptions: make(map[string]context.CancelFunc),
validators: make(map[string]validatorInfo),
pubsub: pubsub,
logger: logger,
subscriptions: make(map[string]context.CancelFunc),
validators: make(map[string]validatorInfo),
registeredBitmasks: make(map[string]bool),
}
}
@ -174,6 +176,21 @@ func (s *PubSubProxyServer) ValidatorStream(
switch m := msg.Message.(type) {
case *protobufs.ValidationStreamMessage_Register:
reg := m.Register
bitmaskKey := string(reg.Bitmask)
// Check if validator already registered for this bitmask - the
// validator is always the same, so just noop for repeats
s.mu.RLock()
alreadyRegistered := s.registeredBitmasks[bitmaskKey]
s.mu.RUnlock()
if alreadyRegistered {
s.logger.Debug("validator already registered for bitmask, skipping",
zap.String("validator_id", reg.ValidatorId),
zap.Binary("bitmask", reg.Bitmask))
continue
}
s.logger.Debug("registering validator",
zap.String("validator_id", reg.ValidatorId),
zap.Binary("bitmask", reg.Bitmask))
@ -241,10 +258,16 @@ func (s *PubSubProxyServer) ValidatorStream(
s.logger.Error("failed to register validator", zap.Error(err))
delete(validatorCallbacks, reg.ValidatorId)
close(reqChan)
} else {
// Mark bitmask as having a registered validator
s.mu.Lock()
s.registeredBitmasks[bitmaskKey] = true
s.mu.Unlock()
}
case *protobufs.ValidationStreamMessage_Unregister:
unreg := m.Unregister
bitmaskKey := string(unreg.Bitmask)
s.logger.Debug("unregistering validator",
zap.String("validator_id", unreg.ValidatorId))
@ -252,6 +275,11 @@ func (s *PubSubProxyServer) ValidatorStream(
s.logger.Error("failed to unregister validator", zap.Error(err))
}
// Clear the bitmask registration
s.mu.Lock()
delete(s.registeredBitmasks, bitmaskKey)
s.mu.Unlock()
if ch, exists := validatorCallbacks[unreg.ValidatorId]; exists {
close(ch)
delete(validatorCallbacks, unreg.ValidatorId)