From cef7fb65cc10f6acd05a0866aa412f678565a941 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Tue, 3 Dec 2024 00:07:58 +0100 Subject: [PATCH] Avoid pooling large buffers (#399) * Revert buffer reuse * Use pool only for small messages --- go-libp2p-blossomsub/comm.go | 85 ++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 47 deletions(-) diff --git a/go-libp2p-blossomsub/comm.go b/go-libp2p-blossomsub/comm.go index e70cd7d..28d0f2d 100644 --- a/go-libp2p-blossomsub/comm.go +++ b/go-libp2p-blossomsub/comm.go @@ -56,10 +56,29 @@ func (p *PubSub) handleNewStream(s network.Stream) { p.inboundStreamsMx.Unlock() r := msgio.NewVarintReaderSize(s, p.hardMaxMessageSize) - for { - msgbytes, err := r.ReadMsg() + read := func() (*RPC, error) { + n, err := r.NextMsgLen() + if err != nil { + return nil, err + } + if n == 0 { + _, err := r.Read(nil) + return nil, err + } + buf := poolGet(n, p.softMaxMessageSize) + defer poolPut(buf, p.softMaxMessageSize) + if _, err := r.Read(buf); err != nil { + return nil, err + } + rpc := new(pb.RPC) + if err := rpc.Unmarshal(buf); err != nil { + return nil, err + } + return &RPC{RPC: rpc, from: peer}, nil + } + for { + rpc, err := read() if err != nil { - r.ReleaseMsg(msgbytes) if err != io.EOF { s.Reset() log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) @@ -76,28 +95,10 @@ func (p *PubSub) handleNewStream(s network.Stream) { p.inboundStreamsMx.Unlock() return } - if len(msgbytes) == 0 { - r.ReleaseMsg(msgbytes) + if rpc == nil { continue } - rpc := &RPC{ - RPC: new(pb.RPC), - } - err = rpc.Unmarshal(msgbytes) - r.ReleaseMsg(msgbytes) - if err != nil { - s.Reset() - log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err) - p.inboundStreamsMx.Lock() - if p.inboundStreams[peer] == s { - delete(p.inboundStreams, peer) - } - p.inboundStreamsMx.Unlock() - return - } - - rpc.from = peer select { case p.incoming <- rpc: case <-p.ctx.Done(): @@ -169,11 +170,10 @@ func (p *PubSub) handlePeerDead(s network.Stream) { } func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, q *rpcQueue) { - getBuffer, returnLastBuffer := makeBufferSource() - defer returnLastBuffer() writeRPC := func(rpc *RPC) error { size := uint64(rpc.Size()) - buf := getBuffer(varint.UvarintSize(size) + int(size)) + buf := poolGet(varint.UvarintSize(size)+int(size), p.softMaxMessageSize) + defer poolPut(buf, p.softMaxMessageSize) n := binary.PutUvarint(buf, size) _, err := rpc.MarshalTo(buf[n:]) if err != nil { @@ -236,27 +236,18 @@ func copyRPC(rpc *RPC) *RPC { return res } -// makeBufferSource returns a function that can be used to allocate buffers of -// a given size, and a function that can be used to return the last buffer -// allocated. -// The returned function will attempt to reuse the last buffer allocated if -// the requested size is less than or equal to the capacity of the last buffer. -// If the requested size is greater than the capacity of the last buffer, the -// last buffer is returned to the pool and a new buffer is allocated. -// If the requested size is less than or equal to half the capacity of the last -// buffer, the last buffer is returned to the pool and a new buffer is allocated. -func makeBufferSource() (func(int) []byte, func()) { - b := pool.Get(0) - mk := func(n int) []byte { - if c := cap(b); c/2 < n && n <= c { - return b[:n] - } - pool.Put(b) - b = pool.Get(n) - return b +// poolGet returns a buffer of length n from the pool if n < limit, otherwise it allocates a new buffer. +func poolGet(n int, limit int) []byte { + if n >= limit { + return make([]byte, n) } - rt := func() { - pool.Put(b) - } - return mk, rt + return pool.Get(n) +} + +// poolPut returns a buffer to the pool if its length is less than limit. +func poolPut(buf []byte, limit int) { + if len(buf) >= limit { + return + } + pool.Put(buf) }