Increase subscription buffer size (#400)

This commit is contained in:
petricadaipegsp 2024-12-03 11:26:19 +01:00 committed by GitHub
parent 378d104691
commit 63394edc9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 9 deletions

View File

@ -13,6 +13,9 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)
// DefaultSubscriptionQueueSize is the default size of the subscription queue.
const DefaultSubscriptionQueueSize = 16384
// ErrBitmaskClosed is returned if a Bitmask is utilized after it has been closed
var ErrBitmaskClosed = errors.New("this Bitmask is closed, try opening a new one")
@ -167,7 +170,7 @@ func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) {
}
if sub.ch == nil {
sub.ch = make(chan *Message, 32)
sub.ch = make(chan *Message, DefaultSubscriptionQueueSize)
}
out := make(chan *Subscription, 1)

View File

@ -53,4 +53,5 @@ type P2PConfig struct {
PingAttempts int `yaml:"pingAttempts"`
ValidateQueueSize int `yaml:"validateQueueSize"`
ValidateWorkers int `yaml:"validateWorkers"`
SubscriptionQueueSize int `yaml:"subscriptionQueueSize"`
}

View File

@ -82,10 +82,10 @@ type BlossomSub struct {
signKey crypto.PrivKey
peerScore map[string]*appScore
peerScoreMx sync.Mutex
network uint8
bootstrap internal.PeerConnector
discovery internal.PeerConnector
reachability atomic.Pointer[network.Reachability]
p2pConfig config.P2PConfig
}
var _ PubSub = (*BlossomSub)(nil)
@ -160,7 +160,7 @@ func NewBlossomSubStreamer(
bitmaskMap: make(map[string]*blossomsub.Bitmask),
signKey: privKey,
peerScore: make(map[string]*appScore),
network: p2pConfig.Network,
p2pConfig: *p2pConfig,
}
h, err := libp2p.New(opts...)
@ -314,7 +314,7 @@ func NewBlossomSub(
bitmaskMap: make(map[string]*blossomsub.Bitmask),
signKey: privKey,
peerScore: make(map[string]*appScore),
network: p2pConfig.Network,
p2pConfig: *p2pConfig,
}
h, err := libp2p.New(opts...)
@ -493,7 +493,7 @@ func NewBlossomSub(
))
params := toBlossomSubParams(p2pConfig)
rt := blossomsub.NewBlossomSubRouter(h, params, bs.network)
rt := blossomsub.NewBlossomSubRouter(h, params, bs.p2pConfig.Network)
blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...)
if err != nil {
@ -657,7 +657,7 @@ func (b *BlossomSub) Subscribe(
b.logger.Info("subscribe to bitmask", zap.Binary("bitmask", bitmask))
subs := []*blossomsub.Subscription{}
for _, bit := range bm {
sub, err := bit.Subscribe()
sub, err := bit.Subscribe(blossomsub.WithBufferSize(b.p2pConfig.SubscriptionQueueSize))
if err != nil {
b.logger.Error("subscription failed", zap.Error(err))
return errors.Wrap(err, "subscribe")
@ -701,7 +701,9 @@ func (b *BlossomSub) Subscribe(
}
func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
networkBitmask := append([]byte{b.network}, bitmask...)
// TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the
// network is not pre-pended to the bitmask.
networkBitmask := append([]byte{b.p2pConfig.Network}, bitmask...)
bm, ok := b.bitmaskMap[string(networkBitmask)]
if !ok {
return
@ -740,7 +742,9 @@ func (b *BlossomSub) GetPeerID() []byte {
}
func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
networkBitmask := append([]byte{b.network}, bitmask...)
// TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the
// network is not pre-pended to the bitmask.
networkBitmask := append([]byte{b.p2pConfig.Network}, bitmask...)
peers := b.ps.ListPeers(networkBitmask)
if len(peers) == 0 {
return nil, errors.Wrap(
@ -875,6 +879,8 @@ func (b *BlossomSub) AddPeerScore(peerId []byte, scoreDelta int64) {
func (b *BlossomSub) GetBitmaskPeers() map[string][]string {
peers := map[string][]string{}
// TODO: Fix this, it is broken - the bitmask parameter is not sliced, and the
// network is not pre-pended to the bitmask.
for _, k := range b.bitmaskMap {
peers[fmt.Sprintf("%+x", k.Bitmask()[1:])] = []string{}
@ -931,7 +937,7 @@ func (b *BlossomSub) GetMultiaddrOfPeer(peerId []byte) string {
}
func (b *BlossomSub) GetNetwork() uint {
return uint(b.network)
return uint(b.p2pConfig.Network)
}
func (b *BlossomSub) StartDirectChannelListener(
@ -1165,6 +1171,9 @@ func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig {
if p2pConfig.ValidateWorkers == 0 {
p2pConfig.ValidateWorkers = qruntime.WorkerCount(0, false)
}
if p2pConfig.SubscriptionQueueSize == 0 {
p2pConfig.SubscriptionQueueSize = blossomsub.DefaultSubscriptionQueueSize
}
return p2pConfig
}