blossomsub: Separate soft and hard message limits (#387)

This commit is contained in:
petricadaipegsp 2024-11-25 01:40:38 +01:00 committed by GitHub
parent 7fd4d32521
commit ab9b90fed2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 25 additions and 17 deletions

View File

@ -1402,18 +1402,18 @@ func (bs *BlossomSubRouter) sendRPC(p peer.ID, out *RPC, fast bool) {
}
// If we're below the max message size, go ahead and send
if out.Size() < bs.p.maxMessageSize {
if out.Size() < bs.p.softMaxMessageSize {
bs.doSendRPC(out, p, mch, fast)
return
}
outCopy := copyRPC(out)
// Potentially split the RPC into multiple RPCs that are below the max message size
outRPCs := appendOrMergeRPC(nil, bs.p.maxMessageSize, outCopy)
outRPCs := appendOrMergeRPC(nil, bs.p.softMaxMessageSize, outCopy)
for _, rpc := range outRPCs {
if rpc.Size() > bs.p.maxMessageSize {
if rpc.Size() > bs.p.hardMaxMessageSize {
// This should only happen if a single message/control is above the maxMessageSize.
bs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), bs.p.maxMessageSize, rpc.Size()-bs.p.maxMessageSize))
bs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), bs.p.hardMaxMessageSize, rpc.Size()-bs.p.hardMaxMessageSize))
continue
}
bs.doSendRPC(rpc, p, mch, fast)

View File

@ -2314,7 +2314,7 @@ func (sq *sybilSquatter) handleStream(s network.Stream) {
// send a subscription for test in the output stream to become candidate for GRAFT
// and then just read and ignore the incoming RPCs
r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize)
r := msgio.NewVarintReaderSize(s, DefaultHardMaxMessageSize)
w := msgio.NewVarintWriter(os)
truth := true
bitmask := []byte{0x00, 0x00, 0x81, 0x00}
@ -2534,7 +2534,7 @@ func TestBlossomSubRPCFragmentation(t *testing.T) {
// (nMessages * msgSize) / ps.maxMessageSize total RPCs containing the messages we sent IWANTs for.
// The actual number will probably be larger, since there's some overhead for the RPC itself, and
// we probably aren't packing each RPC to it's maximum size
minExpectedRPCS := (nMessages * msgSize) / ps.maxMessageSize
minExpectedRPCS := (nMessages * msgSize) / ps.softMaxMessageSize
if iwe.rpcsWithMessages < minExpectedRPCS {
t.Fatalf("expected to receive at least %d RPCs containing messages, got %d", minExpectedRPCS, iwe.rpcsWithMessages)
}
@ -2563,7 +2563,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
gossipMsgIdsReceived := make(map[string]struct{})
// send a subscription for test in the output stream to become candidate for gossip
r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize)
r := msgio.NewVarintReaderSize(s, DefaultHardMaxMessageSize)
w := msgio.NewVarintWriter(os)
truth := true
bitmasks := [][]byte{{0x00, 0x00, 0x80, 0x00}, {0x00, 0x00, 0x01, 0x00}}

View File

@ -55,7 +55,7 @@ func (p *PubSub) handleNewStream(s network.Stream) {
p.inboundStreams[peer] = s
p.inboundStreamsMx.Unlock()
r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
r := msgio.NewVarintReaderSize(s, p.hardMaxMessageSize)
for {
msgbytes, err := r.ReadMsg()
if err != nil {

View File

@ -26,8 +26,11 @@ import (
logging "github.com/ipfs/go-log/v2"
)
// DefaultMaximumMessageSize is 20 MB.
const DefaultMaxMessageSize = 10 << 21
// DefaultSoftMaxMessageSize is 10 MiB.
const DefaultSoftMaxMessageSize = 10 << 20
// DefaultHardMaxMessageSize is 20 MB.
const DefaultHardMaxMessageSize = 10 << 21
var (
// TimeCacheDuration specifies how long a message ID will be remembered as seen.
@ -68,9 +71,12 @@ type PubSub struct {
peerFilter PeerFilter
// maxMessageSize is the maximum message size; it applies globally to all
// bitmasks.
maxMessageSize int
// softMaxMessageSize is the maximum size of a single message fragment that
// we will attempt to send.
softMaxMessageSize int
// hardMaxMessageSize is the maximum size of a single message fragment that
// we will accept.
hardMaxMessageSize int
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
@ -265,7 +271,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
val: newValidation(),
peerFilter: DefaultPeerFilter,
disc: &discover{},
maxMessageSize: DefaultMaxMessageSize,
softMaxMessageSize: DefaultSoftMaxMessageSize,
hardMaxMessageSize: DefaultHardMaxMessageSize,
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: nil,
@ -520,9 +527,10 @@ func WithRawTracer(tracer RawTracer) Option {
// another type of locator, such that messages can be fetched on-demand, rather
// than being pushed proactively. Under this design, you'd use the pubsub layer
// as a signalling system, rather than a data delivery system.
func WithMaxMessageSize(maxMessageSize int) Option {
func WithMaxMessageSize(softMaxMessageSize, hardMaxMessageSize int) Option {
return func(ps *PubSub) error {
ps.maxMessageSize = maxMessageSize
ps.softMaxMessageSize = softMaxMessageSize
ps.hardMaxMessageSize = hardMaxMessageSize
return nil
}
}

View File

@ -251,7 +251,7 @@ func TestPBTracer(t *testing.T) {
}
defer f.Close()
r := msgio.NewVarintReaderSize(f, DefaultMaxMessageSize)
r := msgio.NewVarintReaderSize(f, DefaultHardMaxMessageSize)
for {
evt.Reset()