diff --git a/go-libp2p-blossomsub/blossomsub.go b/go-libp2p-blossomsub/blossomsub.go
index 7bd3749..5888bfc 100644
--- a/go-libp2p-blossomsub/blossomsub.go
+++ b/go-libp2p-blossomsub/blossomsub.go
@@ -1402,18 +1402,18 @@ func (bs *BlossomSubRouter) sendRPC(p peer.ID, out *RPC, fast bool) {
 	}
 
 	// If we're below the max message size, go ahead and send
-	if out.Size() < bs.p.maxMessageSize {
+	if out.Size() < bs.p.softMaxMessageSize {
 		bs.doSendRPC(out, p, mch, fast)
 		return
 	}
 
 	outCopy := copyRPC(out)
 	// Potentially split the RPC into multiple RPCs that are below the max message size
-	outRPCs := appendOrMergeRPC(nil, bs.p.maxMessageSize, outCopy)
+	outRPCs := appendOrMergeRPC(nil, bs.p.softMaxMessageSize, outCopy)
 	for _, rpc := range outRPCs {
-		if rpc.Size() > bs.p.maxMessageSize {
+		if rpc.Size() > bs.p.hardMaxMessageSize {
 			// This should only happen if a single message/control is above the maxMessageSize.
-			bs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), bs.p.maxMessageSize, rpc.Size()-bs.p.maxMessageSize))
+			bs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), bs.p.hardMaxMessageSize, rpc.Size()-bs.p.hardMaxMessageSize))
 			continue
 		}
 		bs.doSendRPC(rpc, p, mch, fast)
diff --git a/go-libp2p-blossomsub/blossomsub_test.go b/go-libp2p-blossomsub/blossomsub_test.go
index 0df484d..37a3c2d 100644
--- a/go-libp2p-blossomsub/blossomsub_test.go
+++ b/go-libp2p-blossomsub/blossomsub_test.go
@@ -2314,7 +2314,7 @@ func (sq *sybilSquatter) handleStream(s network.Stream) {
 
 	// send a subscription for test in the output stream to become candidate for GRAFT
 	// and then just read and ignore the incoming RPCs
-	r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize)
+	r := msgio.NewVarintReaderSize(s, DefaultHardMaxMessageSize)
 	w := msgio.NewVarintWriter(os)
 	truth := true
 	bitmask := []byte{0x00, 0x00, 0x81, 0x00}
@@ -2534,7 +2534,7 @@ func TestBlossomSubRPCFragmentation(t *testing.T) {
 	// (nMessages * msgSize) / ps.maxMessageSize total RPCs containing the messages we sent IWANTs for.
 	// The actual number will probably be larger, since there's some overhead for the RPC itself, and
 	// we probably aren't packing each RPC to it's maximum size
-	minExpectedRPCS := (nMessages * msgSize) / ps.maxMessageSize
+	minExpectedRPCS := (nMessages * msgSize) / ps.softMaxMessageSize
 	if iwe.rpcsWithMessages < minExpectedRPCS {
 		t.Fatalf("expected to receive at least %d RPCs containing messages, got %d", minExpectedRPCS, iwe.rpcsWithMessages)
 	}
@@ -2563,7 +2563,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
 	gossipMsgIdsReceived := make(map[string]struct{})
 
 	// send a subscription for test in the output stream to become candidate for gossip
-	r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize)
+	r := msgio.NewVarintReaderSize(s, DefaultHardMaxMessageSize)
 	w := msgio.NewVarintWriter(os)
 	truth := true
 	bitmasks := [][]byte{{0x00, 0x00, 0x80, 0x00}, {0x00, 0x00, 0x01, 0x00}}
diff --git a/go-libp2p-blossomsub/comm.go b/go-libp2p-blossomsub/comm.go
index b09e05f..61e5b37 100644
--- a/go-libp2p-blossomsub/comm.go
+++ b/go-libp2p-blossomsub/comm.go
@@ -55,7 +55,7 @@ func (p *PubSub) handleNewStream(s network.Stream) {
 	p.inboundStreams[peer] = s
 	p.inboundStreamsMx.Unlock()
 
-	r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
+	r := msgio.NewVarintReaderSize(s, p.hardMaxMessageSize)
 	for {
 		msgbytes, err := r.ReadMsg()
 		if err != nil {
diff --git a/go-libp2p-blossomsub/pubsub.go b/go-libp2p-blossomsub/pubsub.go
index 7dae14a..efe9b14 100644
--- a/go-libp2p-blossomsub/pubsub.go
+++ b/go-libp2p-blossomsub/pubsub.go
@@ -26,8 +26,11 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 )
 
-// DefaultMaximumMessageSize is 20 MB.
-const DefaultMaxMessageSize = 10 << 21
+// DefaultSoftMaxMessageSize is 10 MiB.
+const DefaultSoftMaxMessageSize = 10 << 20
+
+// DefaultHardMaxMessageSize is 20 MiB.
+const DefaultHardMaxMessageSize = 10 << 21
 
 var (
 	// TimeCacheDuration specifies how long a message ID will be remembered as seen.
@@ -68,9 +71,12 @@ type PubSub struct {
 
 	peerFilter PeerFilter
 
-	// maxMessageSize is the maximum message size; it applies globally to all
-	// bitmasks.
-	maxMessageSize int
+	// softMaxMessageSize is the maximum size of a single message fragment that
+	// we will attempt to send.
+	softMaxMessageSize int
+	// hardMaxMessageSize is the maximum size of a single message fragment that
+	// we will accept.
+	hardMaxMessageSize int
 
 	// size of the outbound message channel that we maintain for each peer
 	peerOutboundQueueSize int
@@ -265,7 +271,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		val:                   newValidation(),
 		peerFilter:            DefaultPeerFilter,
 		disc:                  &discover{},
-		maxMessageSize:        DefaultMaxMessageSize,
+		softMaxMessageSize:    DefaultSoftMaxMessageSize,
+		hardMaxMessageSize:    DefaultHardMaxMessageSize,
 		peerOutboundQueueSize: 32,
 		signID:                h.ID(),
 		signKey:               nil,
@@ -520,9 +527,10 @@ func WithRawTracer(tracer RawTracer) Option {
 // another type of locator, such that messages can be fetched on-demand, rather
 // than being pushed proactively. Under this design, you'd use the pubsub layer
 // as a signalling system, rather than a data delivery system.
-func WithMaxMessageSize(maxMessageSize int) Option {
+func WithMaxMessageSize(softMaxMessageSize, hardMaxMessageSize int) Option {
 	return func(ps *PubSub) error {
-		ps.maxMessageSize = maxMessageSize
+		ps.softMaxMessageSize = softMaxMessageSize
+		ps.hardMaxMessageSize = hardMaxMessageSize
 		return nil
 	}
 }
diff --git a/go-libp2p-blossomsub/trace_test.go b/go-libp2p-blossomsub/trace_test.go
index 706f0e8..22b7378 100644
--- a/go-libp2p-blossomsub/trace_test.go
+++ b/go-libp2p-blossomsub/trace_test.go
@@ -251,7 +251,7 @@ func TestPBTracer(t *testing.T) {
 	}
 	defer f.Close()
 
-	r := msgio.NewVarintReaderSize(f, DefaultMaxMessageSize)
+	r := msgio.NewVarintReaderSize(f, DefaultHardMaxMessageSize)
 	for {
 		evt.Reset()