diff --git a/go-libp2p-blossomsub/bitmask.go b/go-libp2p-blossomsub/bitmask.go index 0625ccf..2fa509b 100644 --- a/go-libp2p-blossomsub/bitmask.go +++ b/go-libp2p-blossomsub/bitmask.go @@ -13,6 +13,9 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) +// DefaultSubscriptionQueueSize is the default size of the subscription queue. +const DefaultSubscriptionQueueSize = 16384 + // ErrBitmaskClosed is returned if a Bitmask is utilized after it has been closed var ErrBitmaskClosed = errors.New("this Bitmask is closed, try opening a new one") @@ -167,7 +170,7 @@ func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) { } if sub.ch == nil { - sub.ch = make(chan *Message, 32) + sub.ch = make(chan *Message, DefaultSubscriptionQueueSize) } out := make(chan *Subscription, 1) diff --git a/node/config/p2p.go b/node/config/p2p.go index 2c98048..49d7306 100644 --- a/node/config/p2p.go +++ b/node/config/p2p.go @@ -53,4 +53,5 @@ type P2PConfig struct { PingAttempts int `yaml:"pingAttempts"` ValidateQueueSize int `yaml:"validateQueueSize"` ValidateWorkers int `yaml:"validateWorkers"` + SubscriptionQueueSize int `yaml:"subscriptionQueueSize"` } diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index a5ba56d..00e9056 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -82,10 +82,10 @@ type BlossomSub struct { signKey crypto.PrivKey peerScore map[string]*appScore peerScoreMx sync.Mutex - network uint8 bootstrap internal.PeerConnector discovery internal.PeerConnector reachability atomic.Pointer[network.Reachability] + p2pConfig config.P2PConfig } var _ PubSub = (*BlossomSub)(nil) @@ -160,7 +160,7 @@ func NewBlossomSubStreamer( bitmaskMap: make(map[string]*blossomsub.Bitmask), signKey: privKey, peerScore: make(map[string]*appScore), - network: p2pConfig.Network, + p2pConfig: *p2pConfig, } h, err := libp2p.New(opts...) @@ -314,7 +314,7 @@ func NewBlossomSub( bitmaskMap: make(map[string]*blossomsub.Bitmask), signKey: privKey, peerScore: make(map[string]*appScore), - network: p2pConfig.Network, + p2pConfig: *p2pConfig, } h, err := libp2p.New(opts...) @@ -493,7 +493,7 @@ func NewBlossomSub( )) params := toBlossomSubParams(p2pConfig) - rt := blossomsub.NewBlossomSubRouter(h, params, bs.network) + rt := blossomsub.NewBlossomSubRouter(h, params, bs.p2pConfig.Network) blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer()) pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...) if err != nil { @@ -657,7 +657,7 @@ func (b *BlossomSub) Subscribe( b.logger.Info("subscribe to bitmask", zap.Binary("bitmask", bitmask)) subs := []*blossomsub.Subscription{} for _, bit := range bm { - sub, err := bit.Subscribe() + sub, err := bit.Subscribe(blossomsub.WithBufferSize(b.p2pConfig.SubscriptionQueueSize)) if err != nil { b.logger.Error("subscription failed", zap.Error(err)) return errors.Wrap(err, "subscribe") @@ -701,7 +701,9 @@ func (b *BlossomSub) Subscribe( } func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) { - networkBitmask := append([]byte{b.network}, bitmask...) + // TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the + // network is not pre-pended to the bitmask. + networkBitmask := append([]byte{b.p2pConfig.Network}, bitmask...) bm, ok := b.bitmaskMap[string(networkBitmask)] if !ok { return @@ -740,7 +742,9 @@ func (b *BlossomSub) GetPeerID() []byte { } func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) { - networkBitmask := append([]byte{b.network}, bitmask...) + // TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the + // network is not pre-pended to the bitmask. + networkBitmask := append([]byte{b.p2pConfig.Network}, bitmask...) peers := b.ps.ListPeers(networkBitmask) if len(peers) == 0 { return nil, errors.Wrap( @@ -875,6 +879,8 @@ func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) { func (b *BlossomSub) GetBitmaskPeers() map[string][]string { peers := map[string][]string{} + // TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the + // network is not pre-pended to the bitmask. for _, k := range b.bitmaskMap { peers[fmt.Sprintf("%+x", k.Bitmask()[1:])] = []string{} @@ -931,7 +937,7 @@ func (b *BlossomSub) GetMultiaddrOfPeer(peerId []byte) string { } func (b *BlossomSub) GetNetwork() uint { - return uint(b.network) + return uint(b.p2pConfig.Network) } func (b *BlossomSub) StartDirectChannelListener( @@ -1165,6 +1171,9 @@ func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig { if p2pConfig.ValidateWorkers == 0 { p2pConfig.ValidateWorkers = qruntime.WorkerCount(0, false) } + if p2pConfig.SubscriptionQueueSize == 0 { + p2pConfig.SubscriptionQueueSize = blossomsub.DefaultSubscriptionQueueSize + } return p2pConfig }