fix: blossomsub pubsub interface does not properly track subscription status

This commit is contained in:
Cassandra Heart 2026-02-24 00:31:49 -06:00
parent 4b1cda5455
commit 9e676f8304
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
2 changed files with 205 additions and 19 deletions

View File

@ -85,8 +85,11 @@ type BlossomSub struct {
// Track which bit slices belong to which original bitmasks, used to reference
// count bitmasks for closed subscriptions
subscriptionTracker map[string][][]byte
subscriptions []*blossomsub.Subscription
subscriptionMutex sync.RWMutex
// Track subscriptions per bitmask key so Unsubscribe can cancel them
// before closing the bitmask (blossomsub refuses to close a bitmask
// with open subscriptions).
subscriptionsByBitmask map[string][]*blossomsub.Subscription
subscriptionMutex sync.RWMutex
h host.Host
signKey crypto.PrivKey
peerScore map[string]*appScore
@ -156,11 +159,12 @@ func NewBlossomSubWithHost(
cancel: cancel,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
coreId: coreId,
subscriptionTracker: make(map[string][][]byte),
subscriptionsByBitmask: make(map[string][]*blossomsub.Subscription),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
coreId: coreId,
}
idService := internal.IDServiceFromHost(host)
@ -539,13 +543,14 @@ func NewBlossomSub(
cancel: cancel,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
derivedPeerID: derivedPeerId,
coreId: coreId,
configDir: configDir,
subscriptionTracker: make(map[string][][]byte),
subscriptionsByBitmask: make(map[string][]*blossomsub.Subscription),
signKey: privKey,
peerScore: make(map[string]*appScore),
p2pConfig: *p2pConfig,
derivedPeerID: derivedPeerId,
coreId: coreId,
configDir: configDir,
}
h, err := libp2p.New(opts...)
@ -1069,9 +1074,9 @@ func (b *BlossomSub) Subscribe(
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Track subscriptions for cleanup on Close
// Track subscriptions per bitmask for cleanup
b.subscriptionMutex.Lock()
b.subscriptions = append(b.subscriptions, subs...)
b.subscriptionsByBitmask[string(bitmask)] = subs
b.subscriptionMutex.Unlock()
for _, sub := range subs {
@ -1154,6 +1159,15 @@ func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
zap.String("bitmask", hex.EncodeToString(bitmask)),
)
// Cancel the subscription objects so the bitmask can be closed and the
// subscription goroutines exit.
if subs, ok := b.subscriptionsByBitmask[bitmaskKey]; ok {
for _, sub := range subs {
sub.Cancel()
}
delete(b.subscriptionsByBitmask, bitmaskKey)
}
// Check each bit slice to see if it's still needed by other subscriptions
for _, bitSlice := range bitSlices {
bitSliceKey := string(bitSlice)
@ -2117,10 +2131,12 @@ func (b *BlossomSub) Close() error {
// Cancel all subscriptions to unblock any pending Next() calls
b.subscriptionMutex.Lock()
for _, sub := range b.subscriptions {
sub.Cancel()
for _, subs := range b.subscriptionsByBitmask {
for _, sub := range subs {
sub.Cancel()
}
}
b.subscriptions = nil
b.subscriptionsByBitmask = nil
b.subscriptionMutex.Unlock()
return nil

170
node/p2p/blossomsub_test.go Normal file
View File

@ -0,0 +1,170 @@
package p2p
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"go.uber.org/zap"
blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
)
// newTestBlossomSub creates a minimal BlossomSub wrapper suitable for testing
// Subscribe/Unsubscribe without the full DHT/discovery/bootstrap setup.
func newTestBlossomSub(t *testing.T) *BlossomSub {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
h, err := libp2p.New(
libp2p.ResourceManager(&network.NullResourceManager{}),
)
if err != nil {
t.Fatal(err)
}
ps, err := blossomsub.NewBlossomSub(ctx, h)
if err != nil {
h.Close()
cancel()
t.Fatal(err)
}
bs := &BlossomSub{
ctx: ctx,
cancel: cancel,
logger: zap.NewNop(),
ps: ps,
h: h,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
subscriptionTracker: make(map[string][][]byte),
subscriptionsByBitmask: make(map[string][]*blossomsub.Subscription),
}
bs.p2pConfig.SubscriptionQueueSize = 128
t.Cleanup(func() {
cancel()
h.Close()
})
return bs
}
func noopHandler(*pb.Message) error { return nil }
// TestUnsubscribeAllowsResubscribe is the critical regression test. It verifies
// that after Unsubscribe, the same bitmask can be subscribed to again. Before
// the fix, bm.Close() silently failed because subscriptions were still open,
// and the subsequent ps.Join() returned an error because the bitmask was still
// registered.
func TestUnsubscribeAllowsResubscribe(t *testing.T) {
bs := newTestBlossomSub(t)
bitmask := []byte{0x01}
// First subscribe
if err := bs.Subscribe(bitmask, noopHandler); err != nil {
t.Fatalf("first Subscribe failed: %v", err)
}
// Unsubscribe must cancel subs before Close so Close succeeds
bs.Unsubscribe(bitmask, false)
// Re-subscribe this fails without the fix because the bitmask is still
// registered in the pubsub (Close was a silent no-op).
if err := bs.Subscribe(bitmask, noopHandler); err != nil {
t.Fatalf("re-Subscribe after Unsubscribe failed: %v", err)
}
// Clean up
bs.Unsubscribe(bitmask, false)
}
// TestUnsubscribeTracksPerBitmask verifies that subscribing to multiple
// bitmasks tracks them independently and unsubscribing one doesn't affect
// the other.
func TestUnsubscribeTracksPerBitmask(t *testing.T) {
bs := newTestBlossomSub(t)
bitmaskA := []byte{0x01}
bitmaskB := []byte{0x02}
// Subscribe to both
if err := bs.Subscribe(bitmaskA, noopHandler); err != nil {
t.Fatalf("Subscribe A failed: %v", err)
}
if err := bs.Subscribe(bitmaskB, noopHandler); err != nil {
t.Fatalf("Subscribe B failed: %v", err)
}
// Both should be tracked
bs.subscriptionMutex.RLock()
if _, ok := bs.subscriptionsByBitmask[string(bitmaskA)]; !ok {
t.Error("bitmask A not tracked in subscriptionsByBitmask")
}
if _, ok := bs.subscriptionsByBitmask[string(bitmaskB)]; !ok {
t.Error("bitmask B not tracked in subscriptionsByBitmask")
}
bs.subscriptionMutex.RUnlock()
// Unsubscribe A only
bs.Unsubscribe(bitmaskA, false)
bs.subscriptionMutex.RLock()
if _, ok := bs.subscriptionsByBitmask[string(bitmaskA)]; ok {
t.Error("bitmask A still tracked after Unsubscribe")
}
if _, ok := bs.subscriptionsByBitmask[string(bitmaskB)]; !ok {
t.Error("bitmask B should still be tracked")
}
bs.subscriptionMutex.RUnlock()
// A should be re-subscribable (Close succeeded)
if err := bs.Subscribe(bitmaskA, noopHandler); err != nil {
t.Fatalf("re-Subscribe A after Unsubscribe failed: %v", err)
}
// Unsubscribe both
bs.Unsubscribe(bitmaskA, false)
bs.Unsubscribe(bitmaskB, false)
bs.subscriptionMutex.RLock()
if len(bs.subscriptionsByBitmask) != 0 {
t.Errorf("subscriptionsByBitmask should be empty, got %d entries",
len(bs.subscriptionsByBitmask))
}
bs.subscriptionMutex.RUnlock()
}
// TestUnsubscribeHandlerExits verifies that after Unsubscribe, the handler
// goroutine actually stops. sub.Cancel() unblocks the sub.Next() call in the
// goroutine, causing it to return false and exit.
func TestUnsubscribeHandlerExits(t *testing.T) {
bs := newTestBlossomSub(t)
bitmask := []byte{0x01}
var calls atomic.Int32
handler := func(*pb.Message) error {
calls.Add(1)
return nil
}
if err := bs.Subscribe(bitmask, handler); err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
bs.Unsubscribe(bitmask, false)
// Give the goroutine time to observe the cancellation and exit.
time.Sleep(100 * time.Millisecond)
snapshot := calls.Load()
// Wait again and verify no further increments.
time.Sleep(100 * time.Millisecond)
if got := calls.Load(); got != snapshot {
t.Errorf("handler still running after Unsubscribe: calls went from %d to %d",
snapshot, got)
}
}