mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-01 22:37:51 +08:00
This PR adds several new functionalities to make easier the usage of ResourceManager: - Now resource manager logs when resources are exceeded are on ERROR instead of warning. - The resources exceeded error now shows what kind of limit was reached and the scope. - When there was no limit exceeded, we print a message for the user saying that limits are not exceeded anymore. - Added `swarm limit all` command to show all set limits with the same format as `swarm stats all` - Added `min-used-limit-perc` option to `swarm stats all` to only show stats that are above a specific percentage - Simplify a lot default values. - **Enable ResourceManager by default.** Output example: ``` 2022-11-09T10:51:40.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:51:50.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 483095 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:51:50.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:52:00.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 455294 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:00.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:52:10.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 471384 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 
2022-11-09T10:52:10.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:52:20.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 8 times with error "peer:12D3KooWKqcaBtcmZKLKCCoDPBuA6AXGJMNrLQUPPMsA5Q6D1eG6: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:20.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 192 times with error "peer:12D3KooWPjetWPGQUih9LZTGHdyAM9fKaXtUxDyBhA93E3JAWCXj: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:20.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 469746 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:20.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:52:30.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 484137 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:30.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 29 times with error "peer:12D3KooWPjetWPGQUih9LZTGHdyAM9fKaXtUxDyBhA93E3JAWCXj: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:30.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. 
Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:52:40.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 468843 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:40.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:52:50.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 366638 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:52:50.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:53:00.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 405526 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:53:00.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 107 times with error "peer:12D3KooWQZQCwevTDGhkE9iGYk5sBzWRDUSX68oyrcfM9tXyrs2Q: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:53:00.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:53:10.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 336923 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:53:10.566+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. 
Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:53:20.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:55 Resource limits were exceeded 71 times with error "transient: cannot reserve inbound stream: resource limit exceeded". 2022-11-09T10:53:20.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:59 Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr 2022-11-09T10:53:30.565+0100 ERROR resourcemanager libp2p/rcmgr_logging.go:64 Resrouce limits are no longer being exceeded. ``` ## Validation tests - Accelerated DHT client runs with no errors when ResourceManager is active. No problems were observed. - Running an attack with 200 connections and 1M streams using yamux protocol. Node was usable during the attack. With ResourceManager deactivated, the node was killed by the OS because of the amount of memory consumed. - Actions done when the attack was active: - Add files - Force a reprovide - Use the gateway to resolve an IPNS address. It closes #9001 It closes #9351 It closes #9322
212 lines
6.0 KiB
Go
212 lines
6.0 KiB
Go
package libp2p
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/benbjohnson/clock"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// loggingResourceManager wraps a network.ResourceManager and counts
// "resource limit exceeded" errors observed through it, emitting an
// aggregated ERROR-level report on a fixed interval (see start).
type loggingResourceManager struct {
	clock    clock.Clock        // time source; injectable so tests can fake the ticker
	logger   *zap.SugaredLogger // destination for the periodic reports
	delegate network.ResourceManager // the real resource manager being wrapped
	logInterval time.Duration   // reporting period; 0 selects the 10s default in start

	mut               sync.Mutex     // guards limitExceededErrs
	limitExceededErrs map[string]int // error message -> occurrences since the last report
}
|
|
|
|
// loggingScope wraps a network.ResourceScope so that resource-limit errors
// returned by reservation/attachment operations (ReserveMemory, SetPeer,
// SetProtocol, SetService) are funneled into the owning manager's counter.
type loggingScope struct {
	logger    *zap.SugaredLogger
	delegate  network.ResourceScope
	countErrs func(error) // set to loggingResourceManager.countErrs by the View*/Open* wrappers
}
|
|
|
|
// Compile-time checks that loggingResourceManager satisfies both the core
// ResourceManager interface and the introspection interface used by the
// swarm stats/limit commands.
var _ network.ResourceManager = (*loggingResourceManager)(nil)
var _ rcmgr.ResourceManagerState = (*loggingResourceManager)(nil)
|
|
|
func (n *loggingResourceManager) start(ctx context.Context) {
|
|
logInterval := n.logInterval
|
|
if logInterval == 0 {
|
|
logInterval = 10 * time.Second
|
|
}
|
|
ticker := n.clock.Ticker(logInterval)
|
|
go func() {
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
n.mut.Lock()
|
|
errs := n.limitExceededErrs
|
|
n.limitExceededErrs = make(map[string]int)
|
|
|
|
for e, count := range errs {
|
|
n.logger.Errorf("Resource limits were exceeded %d times with error %q.", count, e)
|
|
}
|
|
|
|
if len(errs) != 0 {
|
|
n.logger.Errorf("Consider inspecting logs and raising the resource manager limits. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#swarmresourcemgr")
|
|
}
|
|
|
|
n.mut.Unlock()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (n *loggingResourceManager) countErrs(err error) {
|
|
if errors.Is(err, network.ErrResourceLimitExceeded) {
|
|
n.mut.Lock()
|
|
if n.limitExceededErrs == nil {
|
|
n.limitExceededErrs = make(map[string]int)
|
|
}
|
|
|
|
// we need to unwrap the error to get the limit scope and the kind of reached limit
|
|
eout := errors.Unwrap(err)
|
|
if eout != nil {
|
|
n.limitExceededErrs[eout.Error()]++
|
|
}
|
|
|
|
n.mut.Unlock()
|
|
}
|
|
}
|
|
|
|
func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error {
|
|
return n.delegate.ViewSystem(f)
|
|
}
|
|
func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error {
|
|
return n.delegate.ViewTransient(func(s network.ResourceScope) error {
|
|
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
|
})
|
|
}
|
|
func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error {
|
|
return n.delegate.ViewService(svc, func(s network.ServiceScope) error {
|
|
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
|
})
|
|
}
|
|
func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error {
|
|
return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error {
|
|
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
|
})
|
|
}
|
|
func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error {
|
|
return n.delegate.ViewPeer(p, func(s network.PeerScope) error {
|
|
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
|
})
|
|
}
|
|
func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool, remote ma.Multiaddr) (network.ConnManagementScope, error) {
|
|
connMgmtScope, err := n.delegate.OpenConnection(dir, usefd, remote)
|
|
n.countErrs(err)
|
|
return connMgmtScope, err
|
|
}
|
|
func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
|
|
connMgmtScope, err := n.delegate.OpenStream(p, dir)
|
|
n.countErrs(err)
|
|
return connMgmtScope, err
|
|
}
|
|
func (n *loggingResourceManager) Close() error {
|
|
return n.delegate.Close()
|
|
}
|
|
|
|
func (n *loggingResourceManager) ListServices() []string {
|
|
rapi, ok := n.delegate.(rcmgr.ResourceManagerState)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return rapi.ListServices()
|
|
}
|
|
func (n *loggingResourceManager) ListProtocols() []protocol.ID {
|
|
rapi, ok := n.delegate.(rcmgr.ResourceManagerState)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return rapi.ListProtocols()
|
|
}
|
|
func (n *loggingResourceManager) ListPeers() []peer.ID {
|
|
rapi, ok := n.delegate.(rcmgr.ResourceManagerState)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return rapi.ListPeers()
|
|
}
|
|
|
|
func (n *loggingResourceManager) Stat() rcmgr.ResourceManagerStat {
|
|
rapi, ok := n.delegate.(rcmgr.ResourceManagerState)
|
|
if !ok {
|
|
return rcmgr.ResourceManagerStat{}
|
|
}
|
|
|
|
return rapi.Stat()
|
|
}
|
|
|
|
func (s *loggingScope) ReserveMemory(size int, prio uint8) error {
|
|
err := s.delegate.ReserveMemory(size, prio)
|
|
s.countErrs(err)
|
|
return err
|
|
}
|
|
// ReleaseMemory returns previously reserved memory to the wrapped scope.
func (s *loggingScope) ReleaseMemory(size int) {
	s.delegate.ReleaseMemory(size)
}
|
|
func (s *loggingScope) Stat() network.ScopeStat {
|
|
return s.delegate.Stat()
|
|
}
|
|
func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) {
|
|
return s.delegate.BeginSpan()
|
|
}
|
|
func (s *loggingScope) Done() {
|
|
s.delegate.(network.ResourceScopeSpan).Done()
|
|
}
|
|
func (s *loggingScope) Name() string {
|
|
return s.delegate.(network.ServiceScope).Name()
|
|
}
|
|
func (s *loggingScope) Protocol() protocol.ID {
|
|
return s.delegate.(network.ProtocolScope).Protocol()
|
|
}
|
|
func (s *loggingScope) Peer() peer.ID {
|
|
return s.delegate.(network.PeerScope).Peer()
|
|
}
|
|
func (s *loggingScope) PeerScope() network.PeerScope {
|
|
return s.delegate.(network.PeerScope)
|
|
}
|
|
func (s *loggingScope) SetPeer(p peer.ID) error {
|
|
err := s.delegate.(network.ConnManagementScope).SetPeer(p)
|
|
s.countErrs(err)
|
|
return err
|
|
}
|
|
func (s *loggingScope) ProtocolScope() network.ProtocolScope {
|
|
return s.delegate.(network.ProtocolScope)
|
|
}
|
|
func (s *loggingScope) SetProtocol(proto protocol.ID) error {
|
|
err := s.delegate.(network.StreamManagementScope).SetProtocol(proto)
|
|
s.countErrs(err)
|
|
return err
|
|
}
|
|
func (s *loggingScope) ServiceScope() network.ServiceScope {
|
|
return s.delegate.(network.ServiceScope)
|
|
}
|
|
func (s *loggingScope) SetService(srv string) error {
|
|
err := s.delegate.(network.StreamManagementScope).SetService(srv)
|
|
s.countErrs(err)
|
|
return err
|
|
}
|
|
func (s *loggingScope) Limit() rcmgr.Limit {
|
|
return s.delegate.(rcmgr.ResourceScopeLimiter).Limit()
|
|
}
|
|
func (s *loggingScope) SetLimit(limit rcmgr.Limit) {
|
|
s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit)
|
|
}
|