Avoid pooling large buffers (#399)

* Revert buffer reuse

* Use pool only for small messages
This commit is contained in:
petricadaipegsp 2024-12-03 00:07:58 +01:00 committed by GitHub
parent d1e65c1c92
commit cef7fb65cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -56,10 +56,29 @@ func (p *PubSub) handleNewStream(s network.Stream) {
p.inboundStreamsMx.Unlock()
r := msgio.NewVarintReaderSize(s, p.hardMaxMessageSize)
for {
msgbytes, err := r.ReadMsg()
read := func() (*RPC, error) {
n, err := r.NextMsgLen()
if err != nil {
return nil, err
}
if n == 0 {
_, err := r.Read(nil)
return nil, err
}
buf := poolGet(n, p.softMaxMessageSize)
defer poolPut(buf, p.softMaxMessageSize)
if _, err := r.Read(buf); err != nil {
return nil, err
}
rpc := new(pb.RPC)
if err := rpc.Unmarshal(buf); err != nil {
return nil, err
}
return &RPC{RPC: rpc, from: peer}, nil
}
for {
rpc, err := read()
if err != nil {
r.ReleaseMsg(msgbytes)
if err != io.EOF {
s.Reset()
log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
@ -76,28 +95,10 @@ func (p *PubSub) handleNewStream(s network.Stream) {
p.inboundStreamsMx.Unlock()
return
}
if len(msgbytes) == 0 {
r.ReleaseMsg(msgbytes)
if rpc == nil {
continue
}
rpc := &RPC{
RPC: new(pb.RPC),
}
err = rpc.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
s.Reset()
log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err)
p.inboundStreamsMx.Lock()
if p.inboundStreams[peer] == s {
delete(p.inboundStreams, peer)
}
p.inboundStreamsMx.Unlock()
return
}
rpc.from = peer
select {
case p.incoming <- rpc:
case <-p.ctx.Done():
@ -169,11 +170,10 @@ func (p *PubSub) handlePeerDead(s network.Stream) {
}
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, q *rpcQueue) {
getBuffer, returnLastBuffer := makeBufferSource()
defer returnLastBuffer()
writeRPC := func(rpc *RPC) error {
size := uint64(rpc.Size())
buf := getBuffer(varint.UvarintSize(size) + int(size))
buf := poolGet(varint.UvarintSize(size)+int(size), p.softMaxMessageSize)
defer poolPut(buf, p.softMaxMessageSize)
n := binary.PutUvarint(buf, size)
_, err := rpc.MarshalTo(buf[n:])
if err != nil {
@ -236,27 +236,18 @@ func copyRPC(rpc *RPC) *RPC {
return res
}
// makeBufferSource returns a function that can be used to allocate buffers of
// a given size, and a function that can be used to return the last buffer
// allocated.
// The returned function will attempt to reuse the last buffer allocated if
// the requested size is less than or equal to the capacity of the last buffer.
// If the requested size is greater than the capacity of the last buffer, the
// last buffer is returned to the pool and a new buffer is allocated.
// If the requested size is less than or equal to half the capacity of the last
// buffer, the last buffer is returned to the pool and a new buffer is allocated.
func makeBufferSource() (func(int) []byte, func()) {
b := pool.Get(0)
mk := func(n int) []byte {
if c := cap(b); c/2 < n && n <= c {
return b[:n]
}
pool.Put(b)
b = pool.Get(n)
return b
// poolGet returns a buffer of length n from the pool if n < limit, otherwise it allocates a new buffer.
func poolGet(n int, limit int) []byte {
if n >= limit {
return make([]byte, n)
}
rt := func() {
pool.Put(b)
}
return mk, rt
return pool.Get(n)
}
// poolPut returns a buffer to the pool if its length is less than limit.
func poolPut(buf []byte, limit int) {
if len(buf) >= limit {
return
}
pool.Put(buf)
}