From 9e676f8304fec44f8cbe1d22487f7c0bfff0aec6 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Tue, 24 Feb 2026 00:31:49 -0600 Subject: [PATCH] fix: blossomsub pubsub interface does not properly track subscription status --- node/p2p/blossomsub.go | 54 ++++++++---- node/p2p/blossomsub_test.go | 170 ++++++++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 19 deletions(-) create mode 100644 node/p2p/blossomsub_test.go diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 8149116..cab28d9 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -85,8 +85,11 @@ type BlossomSub struct { // Track which bit slices belong to which original bitmasks, used to reference // count bitmasks for closed subscriptions subscriptionTracker map[string][][]byte - subscriptions []*blossomsub.Subscription - subscriptionMutex sync.RWMutex + // Track subscriptions per bitmask key so Unsubscribe can cancel them + // before closing the bitmask (blossomsub refuses to close a bitmask + // with open subscriptions). 
+ subscriptionsByBitmask map[string][]*blossomsub.Subscription + subscriptionMutex sync.RWMutex h host.Host signKey crypto.PrivKey peerScore map[string]*appScore @@ -156,11 +159,12 @@ func NewBlossomSubWithHost( cancel: cancel, logger: logger, bitmaskMap: make(map[string]*blossomsub.Bitmask), - subscriptionTracker: make(map[string][][]byte), - signKey: privKey, - peerScore: make(map[string]*appScore), - p2pConfig: *p2pConfig, - coreId: coreId, + subscriptionTracker: make(map[string][][]byte), + subscriptionsByBitmask: make(map[string][]*blossomsub.Subscription), + signKey: privKey, + peerScore: make(map[string]*appScore), + p2pConfig: *p2pConfig, + coreId: coreId, } idService := internal.IDServiceFromHost(host) @@ -539,13 +543,14 @@ func NewBlossomSub( cancel: cancel, logger: logger, bitmaskMap: make(map[string]*blossomsub.Bitmask), - subscriptionTracker: make(map[string][][]byte), - signKey: privKey, - peerScore: make(map[string]*appScore), - p2pConfig: *p2pConfig, - derivedPeerID: derivedPeerId, - coreId: coreId, - configDir: configDir, + subscriptionTracker: make(map[string][][]byte), + subscriptionsByBitmask: make(map[string][]*blossomsub.Subscription), + signKey: privKey, + peerScore: make(map[string]*appScore), + p2pConfig: *p2pConfig, + derivedPeerID: derivedPeerId, + coreId: coreId, + configDir: configDir, } h, err := libp2p.New(opts...) @@ -1069,9 +1074,9 @@ func (b *BlossomSub) Subscribe( zap.String("bitmask", hex.EncodeToString(bitmask)), ) - // Track subscriptions for cleanup on Close + // Track subscriptions per bitmask for cleanup b.subscriptionMutex.Lock() - b.subscriptions = append(b.subscriptions, subs...) 
+ b.subscriptionsByBitmask[string(bitmask)] = subs b.subscriptionMutex.Unlock() for _, sub := range subs { @@ -1154,6 +1159,15 @@ func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) { zap.String("bitmask", hex.EncodeToString(bitmask)), ) + // Cancel the subscription objects so the bitmask can be closed and the + // subscription goroutines exit. + if subs, ok := b.subscriptionsByBitmask[bitmaskKey]; ok { + for _, sub := range subs { + sub.Cancel() + } + delete(b.subscriptionsByBitmask, bitmaskKey) + } + // Check each bit slice to see if it's still needed by other subscriptions for _, bitSlice := range bitSlices { bitSliceKey := string(bitSlice) @@ -2117,10 +2131,12 @@ func (b *BlossomSub) Close() error { // Cancel all subscriptions to unblock any pending Next() calls b.subscriptionMutex.Lock() - for _, sub := range b.subscriptions { - sub.Cancel() + for _, subs := range b.subscriptionsByBitmask { + for _, sub := range subs { + sub.Cancel() + } } - b.subscriptions = nil + b.subscriptionsByBitmask = nil b.subscriptionMutex.Unlock() return nil diff --git a/node/p2p/blossomsub_test.go b/node/p2p/blossomsub_test.go new file mode 100644 index 0000000..2a458bf --- /dev/null +++ b/node/p2p/blossomsub_test.go @@ -0,0 +1,170 @@ +package p2p + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/network" + "go.uber.org/zap" + blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub" + "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" +) + +// newTestBlossomSub creates a minimal BlossomSub wrapper suitable for testing +// Subscribe/Unsubscribe without the full DHT/discovery/bootstrap setup. 
+func newTestBlossomSub(t *testing.T) *BlossomSub {
+	t.Helper()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	h, err := libp2p.New(
+		libp2p.ResourceManager(&network.NullResourceManager{}),
+	)
+	if err != nil {
+		t.Fatal(err) // NOTE(review): cancel is not called on this path
+	}
+
+	ps, err := blossomsub.NewBlossomSub(ctx, h)
+	if err != nil {
+		h.Close() // release the host and context created above before failing
+		cancel()
+		t.Fatal(err)
+	}
+
+	bs := &BlossomSub{ // only these fields are set; all others stay zero-valued
+		ctx:                    ctx,
+		cancel:                 cancel,
+		logger:                 zap.NewNop(),
+		ps:                     ps,
+		h:                      h,
+		bitmaskMap:             make(map[string]*blossomsub.Bitmask),
+		subscriptionTracker:    make(map[string][][]byte),
+		subscriptionsByBitmask: make(map[string][]*blossomsub.Subscription),
+	}
+	bs.p2pConfig.SubscriptionQueueSize = 128 // NOTE(review): presumably read by Subscribe — confirm
+
+	t.Cleanup(func() { // runs when the test completes
+		cancel()
+		h.Close()
+	})
+
+	return bs
+}
+
+func noopHandler(*pb.Message) error { return nil } // accepts and ignores every message
+
+// TestUnsubscribeAllowsResubscribe is the critical regression test: it checks
+// that a bitmask can be subscribed to again after Unsubscribe. Before the fix,
+// bm.Close() silently failed because subscriptions were still open, and the
+// subsequent ps.Join() returned an error because the bitmask was still
+// registered.
+func TestUnsubscribeAllowsResubscribe(t *testing.T) {
+	bs := newTestBlossomSub(t)
+	bitmask := []byte{0x01}
+
+	// Initial subscription to the bitmask.
+	if err := bs.Subscribe(bitmask, noopHandler); err != nil {
+		t.Fatalf("first Subscribe failed: %v", err)
+	}
+
+	// Unsubscribe must cancel the subscriptions before Close so Close succeeds.
+	bs.Unsubscribe(bitmask, false)
+
+	// Re-subscribe: this fails without the fix because the bitmask is still
+	// registered in the pubsub (Close was a silent no-op).
+	if err := bs.Subscribe(bitmask, noopHandler); err != nil {
+		t.Fatalf("re-Subscribe after Unsubscribe failed: %v", err)
+	}
+
+	// Clean up the second subscription.
+	bs.Unsubscribe(bitmask, false)
+}
+
+// TestUnsubscribeTracksPerBitmask verifies that subscribing to multiple
+// bitmasks tracks them independently and unsubscribing one doesn't affect
+// the other.
+func TestUnsubscribeTracksPerBitmask(t *testing.T) {
+	bs := newTestBlossomSub(t)
+	bitmaskA := []byte{0x01}
+	bitmaskB := []byte{0x02}
+
+	// Subscribe to two distinct bitmasks.
+	if err := bs.Subscribe(bitmaskA, noopHandler); err != nil {
+		t.Fatalf("Subscribe A failed: %v", err)
+	}
+	if err := bs.Subscribe(bitmaskB, noopHandler); err != nil {
+		t.Fatalf("Subscribe B failed: %v", err)
+	}
+
+	// Both keys must be present. t.Error does not stop the test, so RUnlock runs.
+	bs.subscriptionMutex.RLock()
+	if _, ok := bs.subscriptionsByBitmask[string(bitmaskA)]; !ok {
+		t.Error("bitmask A not tracked in subscriptionsByBitmask")
+	}
+	if _, ok := bs.subscriptionsByBitmask[string(bitmaskB)]; !ok {
+		t.Error("bitmask B not tracked in subscriptionsByBitmask")
+	}
+	bs.subscriptionMutex.RUnlock()
+
+	// Unsubscribe A only; B is left subscribed.
+	bs.Unsubscribe(bitmaskA, false)
+
+	bs.subscriptionMutex.RLock() // A's entry must be gone, B's must remain
+	if _, ok := bs.subscriptionsByBitmask[string(bitmaskA)]; ok {
+		t.Error("bitmask A still tracked after Unsubscribe")
+	}
+	if _, ok := bs.subscriptionsByBitmask[string(bitmaskB)]; !ok {
+		t.Error("bitmask B should still be tracked")
+	}
+	bs.subscriptionMutex.RUnlock()
+
+	// A should be re-subscribable (Close succeeded)
+	if err := bs.Subscribe(bitmaskA, noopHandler); err != nil {
+		t.Fatalf("re-Subscribe A after Unsubscribe failed: %v", err)
+	}
+
+	// Unsubscribe both; no entry may be left in the tracking map afterwards.
+	bs.Unsubscribe(bitmaskA, false)
+	bs.Unsubscribe(bitmaskB, false)
+
+	bs.subscriptionMutex.RLock()
+	if len(bs.subscriptionsByBitmask) != 0 {
+		t.Errorf("subscriptionsByBitmask should be empty, got %d entries",
+			len(bs.subscriptionsByBitmask))
+	}
+	bs.subscriptionMutex.RUnlock()
+}
+
+// TestUnsubscribeHandlerExits verifies that after Unsubscribe, the handler
+// goroutine actually stops. sub.Cancel() unblocks the sub.Next() call in the
+// goroutine, causing it to return false and exit.
+func TestUnsubscribeHandlerExits(t *testing.T) {
+	bs := newTestBlossomSub(t)
+	bitmask := []byte{0x01}
+
+	var calls atomic.Int32 // incremented once per handler invocation
+	handler := func(*pb.Message) error {
+		calls.Add(1)
+		return nil
+	}
+
+	if err := bs.Subscribe(bitmask, handler); err != nil {
+		t.Fatalf("Subscribe failed: %v", err)
+	}
+
+	bs.Unsubscribe(bitmask, false)
+
+	// Give the goroutine time to observe the cancellation, then read the count.
+	time.Sleep(100 * time.Millisecond)
+	snapshot := calls.Load()
+
+	// NOTE(review): nothing publishes here, so both reads are presumably 0.
+	time.Sleep(100 * time.Millisecond) // wait again; the count must not move
+	if got := calls.Load(); got != snapshot {
+		t.Errorf("handler still running after Unsubscribe: calls went from %d to %d",
+			snapshot, got)
+	}
+}