mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-06 08:47:52 +08:00
feat: log when resource manager limits are exceeded (#8980)
This periodically logs how many times Resource Manager limits were
exceeded. If they aren't exceeded, then nothing is logged. The log
messages are at WARN level so that they are shown by default.
The motivation is so that users know when they have exceeded resource
manager limits. To find what is exceeding the limits, they'll need to
turn on debug logging and inspect the errors being logged. This could
collect the specific limits being reached, but that's more complicated
to implement and could result in much longer log messages.
(cherry picked from commit 5615715c55)
This commit is contained in:
parent
4449909b2d
commit
cb72776dec
@ -409,7 +409,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel
|
||||
return errors.New("expected a JSON file")
|
||||
}
|
||||
if err := json.NewDecoder(file).Decode(&newLimit); err != nil {
|
||||
return errors.New("failed to decode JSON as ResourceMgrScopeConfig")
|
||||
return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err)
|
||||
}
|
||||
return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit)
|
||||
}
|
||||
|
||||
@ -7,9 +7,11 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
config "github.com/ipfs/go-ipfs/config"
|
||||
"github.com/ipfs/go-ipfs/core/node/helpers"
|
||||
"github.com/ipfs/go-ipfs/repo"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -24,8 +26,8 @@ const NetLimitTraceFilename = "rcmgr.json.gz"
|
||||
|
||||
var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled")
|
||||
|
||||
func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
|
||||
return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
|
||||
func ResourceManager(cfg config.SwarmConfig) interface{} {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
|
||||
var manager network.ResourceManager
|
||||
var opts Libp2pOpts
|
||||
|
||||
@ -72,6 +74,13 @@ func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (netw
|
||||
if err != nil {
|
||||
return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err)
|
||||
}
|
||||
lrm := &loggingResourceManager{
|
||||
clock: clock.New(),
|
||||
logger: &logging.Logger("resourcemanager").SugaredLogger,
|
||||
delegate: manager,
|
||||
}
|
||||
lrm.start(helpers.LifecycleCtx(mctx, lc))
|
||||
manager = lrm
|
||||
} else {
|
||||
log.Debug("libp2p resource manager is disabled")
|
||||
manager = network.NullResourceManager
|
||||
|
||||
160
core/node/libp2p/rcmgr_logging.go
Normal file
160
core/node/libp2p/rcmgr_logging.go
Normal file
@ -0,0 +1,160 @@
|
||||
package libp2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// loggingResourceManager wraps a network.ResourceManager and periodically
// logs a summary of how many resource-limit-exceeded errors were observed.
// If no limits were exceeded during an interval, nothing is logged.
type loggingResourceManager struct {
	clock       clock.Clock             // time source, injectable for tests (mock clock)
	logger      *zap.SugaredLogger      // destination for the periodic summary log line
	delegate    network.ResourceManager // the real resource manager being wrapped
	logInterval time.Duration           // interval between summaries; start() falls back to a default when zero

	mut               sync.Mutex // guards limitExceededErrs
	limitExceededErrs uint64     // limit-exceeded errors counted since the last summary log
}
|
||||
|
||||
// loggingScope wraps a network.ResourceScope so that errors returned by
// scope operations are funneled back to the owning loggingResourceManager
// via countErrs.
type loggingScope struct {
	logger    *zap.SugaredLogger    // shared with the owning loggingResourceManager
	delegate  network.ResourceScope // the real scope being wrapped
	countErrs func(error)           // callback that tallies limit-exceeded errors
}

// Compile-time check that loggingResourceManager satisfies network.ResourceManager.
var _ network.ResourceManager = (*loggingResourceManager)(nil)
|
||||
|
||||
// start launches a background goroutine that, once per logInterval
// (defaulting to 10 seconds when unset), logs a warning with the number of
// resource-limit-exceeded errors counted since the previous tick and resets
// the counter. Nothing is logged for intervals with zero errors. The
// goroutine stops when ctx is cancelled.
func (n *loggingResourceManager) start(ctx context.Context) {
	logInterval := n.logInterval
	if logInterval == 0 {
		logInterval = 10 * time.Second
	}
	ticker := n.clock.Ticker(logInterval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// Snapshot and reset the counter under the lock, then log
				// outside the critical section.
				n.mut.Lock()
				errs := n.limitExceededErrs
				n.limitExceededErrs = 0
				n.mut.Unlock()
				if errs != 0 {
					n.logger.Warnf("Resource limits were exceeded %d times, consider inspecting logs and raising the resource manager limits.", errs)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}
|
||||
|
||||
func (n *loggingResourceManager) countErrs(err error) {
|
||||
if errors.Is(err, network.ErrResourceLimitExceeded) {
|
||||
n.mut.Lock()
|
||||
n.limitExceededErrs++
|
||||
n.mut.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// ViewSystem delegates directly to the wrapped manager. Note the system
// scope passed to f is not wrapped in a loggingScope here.
func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error {
	return n.delegate.ViewSystem(f)
}
|
||||
func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error {
|
||||
return n.delegate.ViewTransient(func(s network.ResourceScope) error {
|
||||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
||||
})
|
||||
}
|
||||
func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error {
|
||||
return n.delegate.ViewService(svc, func(s network.ServiceScope) error {
|
||||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
||||
})
|
||||
}
|
||||
func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error {
|
||||
return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error {
|
||||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
||||
})
|
||||
}
|
||||
func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error {
|
||||
return n.delegate.ViewPeer(p, func(s network.PeerScope) error {
|
||||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
|
||||
})
|
||||
}
|
||||
func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) {
|
||||
connMgmtScope, err := n.delegate.OpenConnection(dir, usefd)
|
||||
n.countErrs(err)
|
||||
return connMgmtScope, err
|
||||
}
|
||||
func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
|
||||
connMgmtScope, err := n.delegate.OpenStream(p, dir)
|
||||
n.countErrs(err)
|
||||
return connMgmtScope, err
|
||||
}
|
||||
// Close delegates directly to the wrapped manager. It does not stop the
// logging goroutine; that is tied to the context passed to start().
func (n *loggingResourceManager) Close() error {
	return n.delegate.Close()
}
|
||||
|
||||
// ReserveMemory delegates to the wrapped scope and records any
// limit-exceeded error via countErrs.
func (s *loggingScope) ReserveMemory(size int, prio uint8) error {
	err := s.delegate.ReserveMemory(size, prio)
	s.countErrs(err)
	return err
}

// ReleaseMemory delegates directly to the wrapped scope.
func (s *loggingScope) ReleaseMemory(size int) {
	s.delegate.ReleaseMemory(size)
}

// Stat delegates directly to the wrapped scope.
func (s *loggingScope) Stat() network.ScopeStat {
	return s.delegate.Stat()
}

// BeginSpan delegates directly to the wrapped scope.
func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) {
	return s.delegate.BeginSpan()
}

// Done delegates to the wrapped scope; the type assertion panics if the
// delegate is not a network.ResourceScopeSpan.
func (s *loggingScope) Done() {
	s.delegate.(network.ResourceScopeSpan).Done()
}
|
||||
// Name delegates to the wrapped scope; the type assertion panics if the
// delegate is not a network.ServiceScope.
func (s *loggingScope) Name() string {
	return s.delegate.(network.ServiceScope).Name()
}

// Protocol delegates to the wrapped scope; the type assertion panics if the
// delegate is not a network.ProtocolScope.
func (s *loggingScope) Protocol() protocol.ID {
	return s.delegate.(network.ProtocolScope).Protocol()
}

// Peer delegates to the wrapped scope; the type assertion panics if the
// delegate is not a network.PeerScope.
func (s *loggingScope) Peer() peer.ID {
	return s.delegate.(network.PeerScope).Peer()
}

// PeerScope returns the wrapped scope as a network.PeerScope; the type
// assertion panics if the delegate is not one.
func (s *loggingScope) PeerScope() network.PeerScope {
	return s.delegate.(network.PeerScope)
}

// SetPeer delegates to the wrapped scope and records any limit-exceeded
// error via countErrs; the type assertion panics if the delegate is not a
// network.ConnManagementScope.
func (s *loggingScope) SetPeer(p peer.ID) error {
	err := s.delegate.(network.ConnManagementScope).SetPeer(p)
	s.countErrs(err)
	return err
}

// ProtocolScope returns the wrapped scope as a network.ProtocolScope; the
// type assertion panics if the delegate is not one.
func (s *loggingScope) ProtocolScope() network.ProtocolScope {
	return s.delegate.(network.ProtocolScope)
}
|
||||
// SetProtocol delegates to the wrapped scope and records any limit-exceeded
// error via countErrs; the type assertion panics if the delegate is not a
// network.StreamManagementScope.
func (s *loggingScope) SetProtocol(proto protocol.ID) error {
	err := s.delegate.(network.StreamManagementScope).SetProtocol(proto)
	s.countErrs(err)
	return err
}

// ServiceScope returns the wrapped scope as a network.ServiceScope; the
// type assertion panics if the delegate is not one.
func (s *loggingScope) ServiceScope() network.ServiceScope {
	return s.delegate.(network.ServiceScope)
}

// SetService delegates to the wrapped scope and records any limit-exceeded
// error via countErrs; the type assertion panics if the delegate is not a
// network.StreamManagementScope.
func (s *loggingScope) SetService(srv string) error {
	err := s.delegate.(network.StreamManagementScope).SetService(srv)
	s.countErrs(err)
	return err
}

// Limit delegates to the wrapped scope; the type assertion panics if the
// delegate does not implement rcmgr.ResourceScopeLimiter.
func (s *loggingScope) Limit() rcmgr.Limit {
	return s.delegate.(rcmgr.ResourceScopeLimiter).Limit()
}

// SetLimit delegates to the wrapped scope; the type assertion panics if the
// delegate does not implement rcmgr.ResourceScopeLimiter.
func (s *loggingScope) SetLimit(limit rcmgr.Limit) {
	s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit)
}
|
||||
58
core/node/libp2p/rcmgr_logging_test.go
Normal file
58
core/node/libp2p/rcmgr_logging_test.go
Normal file
@ -0,0 +1,58 @@
|
||||
package libp2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest/observer"
|
||||
)
|
||||
|
||||
func TestLoggingResourceManager(t *testing.T) {
|
||||
clock := clock.NewMock()
|
||||
limiter := rcmgr.NewDefaultLimiter()
|
||||
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1, 1, 1)
|
||||
rm, err := rcmgr.NewResourceManager(limiter)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
oCore, oLogs := observer.New(zap.WarnLevel)
|
||||
oLogger := zap.New(oCore)
|
||||
lrm := &loggingResourceManager{
|
||||
clock: clock,
|
||||
logger: oLogger.Sugar(),
|
||||
delegate: rm,
|
||||
logInterval: 1 * time.Second,
|
||||
}
|
||||
|
||||
// 2 of these should result in resource limit exceeded errors and subsequent log messages
|
||||
for i := 0; i < 3; i++ {
|
||||
_, _ = lrm.OpenConnection(network.DirInbound, false)
|
||||
}
|
||||
|
||||
// run the logger which will write an entry for those errors
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
lrm.start(ctx)
|
||||
clock.Add(3 * time.Second)
|
||||
|
||||
timer := time.NewTimer(1 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
t.Fatalf("expected logs never arrived")
|
||||
default:
|
||||
if oLogs.Len() == 0 {
|
||||
continue
|
||||
}
|
||||
require.Equal(t, "Resource limits were exceeded 2 times, consider inspecting logs and raising the resource manager limits.", oLogs.All()[0].Message)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
7
go.mod
7
go.mod
@ -125,13 +125,17 @@ require (
|
||||
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/benbjohnson/clock v1.3.0
|
||||
github.com/ipfs/go-log/v2 v2.5.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
|
||||
github.com/Kubuxu/go-os-helper v0.0.1 // indirect
|
||||
github.com/Stebalien/go-bitfield v0.0.1 // indirect
|
||||
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
|
||||
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
|
||||
github.com/benbjohnson/clock v1.3.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/btcsuite/btcd v0.22.0-beta // indirect
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
|
||||
@ -172,7 +176,6 @@ require (
|
||||
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
|
||||
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
|
||||
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
|
||||
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
||||
github.com/ipfs/go-peertaskqueue v0.7.1 // indirect
|
||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||
github.com/klauspost/compress v1.15.1 // indirect
|
||||
|
||||
Loading…
Reference in New Issue
Block a user