kubo/core/node/libp2p/rcmgr.go
Gus Eggert cb72776dec
feat: log when resource manager limits are exceeded (#8980)
This periodically logs how many times Resource Manager limits were
exceeded. If they aren't exceeded, then nothing is logged. The messages
are logged at ERROR level so that they are shown by default.

The motivation is so that users know when they have exceeded resource
manager limits. To find what is exceeding the limits, they'll need to
turn on debug logging and inspect the errors being logged. This could
collect the specific limits being reached, but that's more complicated
to implement and could result in much longer log messages.

(cherry picked from commit 5615715c55)
2022-06-08 14:24:43 -04:00

409 lines
12 KiB
Go

package libp2p
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/benbjohnson/clock"
config "github.com/ipfs/go-ipfs/config"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"go.uber.org/fx"
)
// NetLimitDefaultFilename is the file name "limit.json".
// NOTE(review): not referenced in the visible code — presumably the default limits file; confirm against callers.
const NetLimitDefaultFilename = "limit.json"
// NetLimitTraceFilename is the file name that ResourceManager joins onto the
// IPFS path root and passes to rcmgr.WithTrace when LIBP2P_DEBUG_RCMGR is set.
const NetLimitTraceFilename = "rcmgr.json.gz"
// NoResourceMgrError is returned by NetStat, NetLimit and NetSetLimit when the
// manager (or one of its scopes) does not implement the rcmgr interfaces they
// need, which the code below attributes to the NullResourceManager being in use.
var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled")
// ResourceManager returns an fx constructor that builds the libp2p resource
// manager for the node together with the libp2p option that installs it.
//
// Whether the manager is enabled comes from cfg.ResourceMgr.Enabled (default
// true). The LIBP2P_RCMGR environment variable overrides the config when set
// to "0"/"false" or "1"/"true". When disabled, network.NullResourceManager is
// used. In both cases the manager is closed when the fx lifecycle stops.
//
// When enabled, the manager is wrapped in a loggingResourceManager, and if
// LIBP2P_DEBUG_RCMGR is non-empty a trace file named NetLimitTraceFilename
// under the IPFS path root is passed to rcmgr.WithTrace.
func ResourceManager(cfg config.SwarmConfig) interface{} {
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
		var manager network.ResourceManager
		var opts Libp2pOpts

		enabled := cfg.ResourceMgr.Enabled.WithDefault(true)

		// ENV overrides Config (if present)
		switch os.Getenv("LIBP2P_RCMGR") {
		case "0", "false":
			enabled = false
		case "1", "true":
			enabled = true
		}

		if enabled {
			log.Debug("libp2p resource manager is enabled")

			defaultLimits := adjustedDefaultLimits(cfg)

			// User-provided limits (if any) are layered over the adjusted defaults.
			var limits rcmgr.BasicLimiterConfig
			if cfg.ResourceMgr.Limits != nil {
				limits = *cfg.ResourceMgr.Limits
			}

			limiter, err := rcmgr.NewLimiter(limits, defaultLimits)
			if err != nil {
				return nil, opts, fmt.Errorf("creating libp2p resource limiter: %w", err)
			}

			libp2p.SetDefaultServiceLimits(limiter)

			ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics())}

			if os.Getenv("LIBP2P_DEBUG_RCMGR") != "" {
				// The repo path is only needed for the trace file, so resolve it
				// here rather than failing construction when tracing is off.
				repoPath, err := config.PathRoot()
				if err != nil {
					return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err)
				}
				traceFilePath := filepath.Join(repoPath, NetLimitTraceFilename)
				ropts = append(ropts, rcmgr.WithTrace(traceFilePath))
			}

			manager, err = rcmgr.NewResourceManager(limiter, ropts...)
			if err != nil {
				return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err)
			}

			// Wrap the real manager so that exceeded limits get logged.
			lrm := &loggingResourceManager{
				clock:    clock.New(),
				logger:   &logging.Logger("resourcemanager").SugaredLogger,
				delegate: manager,
			}
			lrm.start(helpers.LifecycleCtx(mctx, lc))
			manager = lrm
		} else {
			log.Debug("libp2p resource manager is disabled")
			manager = network.NullResourceManager
		}

		opts.Opts = append(opts.Opts, libp2p.ResourceManager(manager))

		lc.Append(fx.Hook{
			OnStop: func(_ context.Context) error {
				return manager.Close()
			}})

		return manager, opts, nil
	}
}
// NetStatOut is the result of NetStat. Only the fields relevant to the
// requested scope are populated; empty fields are omitted from JSON output.
type NetStatOut struct {
// System holds the system scope stats.
System *network.ScopeStat `json:",omitempty"`
// Transient holds the transient scope stats.
Transient *network.ScopeStat `json:",omitempty"`
// Services maps a service name to that service scope's stats.
Services map[string]network.ScopeStat `json:",omitempty"`
// Protocols maps a protocol ID (as a string) to that protocol scope's stats.
Protocols map[string]network.ScopeStat `json:",omitempty"`
// Peers maps a peer ID (as a string) to that peer scope's stats.
Peers map[string]network.ScopeStat `json:",omitempty"`
}
// NetStat reports resource usage of mgr for the requested scope.
//
// scope is one of:
//   - "all": every scope known to the manager; requires mgr to implement
//     rcmgr.ResourceManagerState, otherwise NoResourceMgrError is returned;
//   - config.ResourceMgrSystemScope or config.ResourceMgrTransientScope;
//   - a service, protocol or peer scope, identified by the matching
//     config.ResourceMgr*ScopePrefix followed by the service name, protocol ID
//     or peer ID.
//
// Any other value yields an "invalid scope" error. An undecodable peer ID
// yields an "invalid peer ID" error.
func NetStat(mgr network.ResourceManager, scope string) (NetStatOut, error) {
	var out NetStatOut

	if scope == "all" {
		state, ok := mgr.(rcmgr.ResourceManagerState)
		if !ok { // NullResourceManager
			return out, NoResourceMgrError
		}

		snapshot := state.Stat()
		out.System = &snapshot.System
		out.Transient = &snapshot.Transient

		if len(snapshot.Services) > 0 {
			out.Services = snapshot.Services
		}

		// Protocol and peer maps are re-keyed by string for the output.
		if n := len(snapshot.Protocols); n > 0 {
			out.Protocols = make(map[string]network.ScopeStat, n)
			for id, st := range snapshot.Protocols {
				out.Protocols[string(id)] = st
			}
		}
		if n := len(snapshot.Peers); n > 0 {
			out.Peers = make(map[string]network.ScopeStat, n)
			for id, st := range snapshot.Peers {
				out.Peers[id.Pretty()] = st
			}
		}

		return out, nil
	}

	if scope == config.ResourceMgrSystemScope {
		err := mgr.ViewSystem(func(rs network.ResourceScope) error {
			st := rs.Stat()
			out.System = &st
			return nil
		})
		return out, err
	}

	if scope == config.ResourceMgrTransientScope {
		err := mgr.ViewTransient(func(rs network.ResourceScope) error {
			st := rs.Stat()
			out.Transient = &st
			return nil
		})
		return out, err
	}

	if strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix) {
		name := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
		err := mgr.ViewService(name, func(ss network.ServiceScope) error {
			out.Services = map[string]network.ScopeStat{name: ss.Stat()}
			return nil
		})
		return out, err
	}

	if strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix) {
		name := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
		err := mgr.ViewProtocol(protocol.ID(name), func(ps network.ProtocolScope) error {
			out.Protocols = map[string]network.ScopeStat{name: ps.Stat()}
			return nil
		})
		return out, err
	}

	if strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix) {
		raw := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
		id, err := peer.Decode(raw)
		if err != nil {
			return out, fmt.Errorf("invalid peer ID: %q: %w", raw, err)
		}
		err = mgr.ViewPeer(id, func(ps network.PeerScope) error {
			// Keyed by the string the caller supplied, not a re-encoded ID.
			out.Peers = map[string]network.ScopeStat{raw: ps.Stat()}
			return nil
		})
		return out, err
	}

	return out, fmt.Errorf("invalid scope %q", scope)
}
// NetLimit returns the limit currently configured on the given scope of mgr.
//
// scope is config.ResourceMgrSystemScope, config.ResourceMgrTransientScope, or
// a service, protocol or peer scope identified by the matching
// config.ResourceMgr*ScopePrefix followed by the service name, protocol ID or
// peer ID. Any other value yields an "invalid scope" error.
//
// The scope must implement rcmgr.ResourceScopeLimiter, otherwise the lookup
// fails with NoResourceMgrError. Only *rcmgr.StaticLimit and
// *rcmgr.DynamicLimit are understood; other limit types are an error.
func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig, error) {
	var out rcmgr.BasicLimitConfig

	// copyBase fills the stream/conn/FD counters shared by both limit kinds.
	copyBase := func(b rcmgr.BaseLimit) {
		out.Streams = b.Streams
		out.StreamsInbound = b.StreamsInbound
		out.StreamsOutbound = b.StreamsOutbound
		out.Conns = b.Conns
		out.ConnsInbound = b.ConnsInbound
		out.ConnsOutbound = b.ConnsOutbound
		out.FD = b.FD
	}

	// readLimit translates the scope's limit into out.
	readLimit := func(s network.ResourceScope) error {
		scoped, ok := s.(rcmgr.ResourceScopeLimiter)
		if !ok { // NullResourceManager
			return NoResourceMgrError
		}

		current := scoped.Limit()
		switch l := current.(type) {
		case *rcmgr.StaticLimit:
			out.Dynamic = false
			out.Memory = l.Memory
			copyBase(l.BaseLimit)
		case *rcmgr.DynamicLimit:
			out.Dynamic = true
			out.MemoryFraction = l.MemoryLimit.MemoryFraction
			out.MinMemory = l.MemoryLimit.MinMemory
			out.MaxMemory = l.MemoryLimit.MaxMemory
			copyBase(l.BaseLimit)
		default:
			return fmt.Errorf("unknown limit type %T", current)
		}
		return nil
	}

	if scope == config.ResourceMgrSystemScope {
		err := mgr.ViewSystem(readLimit)
		return out, err
	}

	if scope == config.ResourceMgrTransientScope {
		err := mgr.ViewTransient(readLimit)
		return out, err
	}

	if strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix) {
		name := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
		err := mgr.ViewService(name, func(ss network.ServiceScope) error {
			return readLimit(ss)
		})
		return out, err
	}

	if strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix) {
		name := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
		err := mgr.ViewProtocol(protocol.ID(name), func(ps network.ProtocolScope) error {
			return readLimit(ps)
		})
		return out, err
	}

	if strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix) {
		raw := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
		id, err := peer.Decode(raw)
		if err != nil {
			return out, fmt.Errorf("invalid peer ID: %q: %w", raw, err)
		}
		err = mgr.ViewPeer(id, func(ps network.PeerScope) error {
			return readLimit(ps)
		})
		return out, err
	}

	return out, fmt.Errorf("invalid scope %q", scope)
}
// NetSetLimit sets new ResourceManager limits for the given scope. The limits
// take effect immediately, and are also persisted to the repo config.
//
// scope is config.ResourceMgrSystemScope, config.ResourceMgrTransientScope, or
// a service, protocol or peer scope identified by the matching
// config.ResourceMgr*ScopePrefix followed by the service name, protocol ID or
// peer ID. Any other value yields an "invalid scope" error.
//
// limit.Dynamic selects whether an *rcmgr.DynamicLimit or *rcmgr.StaticLimit
// is installed on the scope. The scope must implement
// rcmgr.ResourceScopeLimiter, otherwise the update fails with
// NoResourceMgrError.
//
// The config is only modified and written back after the live limits were
// applied without error, so a rejected scope or failed update leaves cfg
// untouched.
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BasicLimitConfig) error {
	// setLimit installs the requested limit on a live scope.
	setLimit := func(s network.ResourceScope) error {
		limiter, ok := s.(rcmgr.ResourceScopeLimiter)
		if !ok { // NullResourceManager
			return NoResourceMgrError
		}

		// Stream/conn/FD counters are shared by both limit kinds.
		base := rcmgr.BaseLimit{
			Streams:         limit.Streams,
			StreamsInbound:  limit.StreamsInbound,
			StreamsOutbound: limit.StreamsOutbound,
			Conns:           limit.Conns,
			ConnsInbound:    limit.ConnsInbound,
			ConnsOutbound:   limit.ConnsOutbound,
			FD:              limit.FD,
		}

		var newLimit rcmgr.Limit
		if limit.Dynamic {
			newLimit = &rcmgr.DynamicLimit{
				MemoryLimit: rcmgr.MemoryLimit{
					MemoryFraction: limit.MemoryFraction,
					MinMemory:      limit.MinMemory,
					MaxMemory:      limit.MaxMemory,
				},
				BaseLimit: base,
			}
		} else {
			newLimit = &rcmgr.StaticLimit{
				Memory:    limit.Memory,
				BaseLimit: base,
			}
		}

		limiter.SetLimit(newLimit)
		return nil
	}

	cfg, err := repo.Config()
	if err != nil {
		return fmt.Errorf("reading config to set limit: %w", err)
	}

	// setConfigFunc records the limit in the config section for the scope. It
	// is chosen together with the live update but only run once that succeeded.
	var setConfigFunc func(configLimits *rcmgr.BasicLimiterConfig)
	switch {
	case scope == config.ResourceMgrSystemScope:
		err = mgr.ViewSystem(func(s network.ResourceScope) error {
			return setLimit(s)
		})
		setConfigFunc = func(configLimits *rcmgr.BasicLimiterConfig) {
			configLimits.System = &limit
		}

	case scope == config.ResourceMgrTransientScope:
		err = mgr.ViewTransient(func(s network.ResourceScope) error {
			return setLimit(s)
		})
		setConfigFunc = func(configLimits *rcmgr.BasicLimiterConfig) {
			configLimits.Transient = &limit
		}

	case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
		svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
		err = mgr.ViewService(svc, func(s network.ServiceScope) error {
			return setLimit(s)
		})
		setConfigFunc = func(configLimits *rcmgr.BasicLimiterConfig) {
			if configLimits.Service == nil {
				configLimits.Service = map[string]rcmgr.BasicLimitConfig{}
			}
			configLimits.Service[svc] = limit
		}

	case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
		proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
		err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
			return setLimit(s)
		})
		setConfigFunc = func(configLimits *rcmgr.BasicLimiterConfig) {
			if configLimits.Protocol == nil {
				configLimits.Protocol = map[string]rcmgr.BasicLimitConfig{}
			}
			configLimits.Protocol[proto] = limit
		}

	case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
		p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
		var pid peer.ID
		pid, err = peer.Decode(p)
		if err != nil {
			return fmt.Errorf("invalid peer ID: %q: %w", p, err)
		}
		err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
			return setLimit(s)
		})
		setConfigFunc = func(configLimits *rcmgr.BasicLimiterConfig) {
			if configLimits.Peer == nil {
				configLimits.Peer = map[string]rcmgr.BasicLimitConfig{}
			}
			configLimits.Peer[p] = limit
		}

	default:
		return fmt.Errorf("invalid scope %q", scope)
	}

	if err != nil {
		return fmt.Errorf("setting new limits on resource manager: %w", err)
	}

	// Create the limits section lazily, only now that there is something to store.
	if cfg.Swarm.ResourceMgr.Limits == nil {
		cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
	}
	setConfigFunc(cfg.Swarm.ResourceMgr.Limits)

	if err := repo.SetConfig(cfg); err != nil {
		return fmt.Errorf("writing new limits to repo config: %w", err)
	}

	return nil
}