mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-25 20:37:53 +08:00
* feat: persist limit changes to config This changes the "ipfs swarm limit" command so that when limit changes are applied via the command line, they are persisted to the repo config, so that they remain in effect when the daemon restarts. Any existing limit.json can be dropped into the IPFS config easily using something like: cat ~/.ipfs/config | jq ".Swarm.ResourceMgr.Limits = $(cat limit.json)" | sponge ~/.ipfs/config This also upgrades to Resource Manager v0.3.0, which exports the config schema so that we don't have to maintain our own copy of it. Co-authored-by: Marcin Rataj <lidel@lidel.org>
401 lines
11 KiB
Go
401 lines
11 KiB
Go
package libp2p
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
config "github.com/ipfs/go-ipfs/config"
|
|
"github.com/ipfs/go-ipfs/repo"
|
|
|
|
"github.com/libp2p/go-libp2p"
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/libp2p/go-libp2p-core/protocol"
|
|
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
|
|
|
|
"go.uber.org/fx"
|
|
)
|
|
|
|
const NetLimitDefaultFilename = "limit.json"
|
|
const NetLimitTraceFilename = "rcmgr.json.gz"
|
|
|
|
var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled")
|
|
|
|
func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
|
|
return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
|
|
var manager network.ResourceManager
|
|
var opts Libp2pOpts
|
|
|
|
// Config Swarm.ResourceMgr.Enabled decides if we run a real manager
|
|
enabled := cfg.ResourceMgr.Enabled.WithDefault(false)
|
|
|
|
/// ENV overrides Config (if present)
|
|
switch os.Getenv("LIBP2P_RCMGR") {
|
|
case "0", "false":
|
|
enabled = false
|
|
case "1", "true":
|
|
enabled = true
|
|
}
|
|
|
|
if enabled {
|
|
log.Debug("libp2p resource manager is enabled")
|
|
|
|
repoPath, err := config.PathRoot()
|
|
if err != nil {
|
|
return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err)
|
|
}
|
|
|
|
defaultLimits := adjustedDefaultLimits(cfg)
|
|
|
|
var limits rcmgr.BasicLimiterConfig
|
|
if cfg.ResourceMgr.Limits != nil {
|
|
limits = *cfg.ResourceMgr.Limits
|
|
}
|
|
|
|
limiter, err := rcmgr.NewLimiter(limits, defaultLimits)
|
|
if err != nil {
|
|
return nil, opts, err
|
|
}
|
|
|
|
libp2p.SetDefaultServiceLimits(limiter)
|
|
|
|
ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics())}
|
|
|
|
if os.Getenv("LIBP2P_DEBUG_RCMGR") != "" {
|
|
traceFilePath := filepath.Join(repoPath, NetLimitTraceFilename)
|
|
ropts = append(ropts, rcmgr.WithTrace(traceFilePath))
|
|
}
|
|
|
|
manager, err = rcmgr.NewResourceManager(limiter, ropts...)
|
|
if err != nil {
|
|
return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err)
|
|
}
|
|
} else {
|
|
log.Debug("libp2p resource manager is disabled")
|
|
manager = network.NullResourceManager
|
|
}
|
|
|
|
opts.Opts = append(opts.Opts, libp2p.ResourceManager(manager))
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error {
|
|
return manager.Close()
|
|
}})
|
|
|
|
return manager, opts, nil
|
|
}
|
|
}
|
|
|
|
type NetStatOut struct {
|
|
System *network.ScopeStat `json:",omitempty"`
|
|
Transient *network.ScopeStat `json:",omitempty"`
|
|
Services map[string]network.ScopeStat `json:",omitempty"`
|
|
Protocols map[string]network.ScopeStat `json:",omitempty"`
|
|
Peers map[string]network.ScopeStat `json:",omitempty"`
|
|
}
|
|
|
|
func NetStat(mgr network.ResourceManager, scope string) (NetStatOut, error) {
|
|
var err error
|
|
var result NetStatOut
|
|
switch {
|
|
case scope == "all":
|
|
rapi, ok := mgr.(rcmgr.ResourceManagerState)
|
|
if !ok { // NullResourceManager
|
|
return result, NoResourceMgrError
|
|
}
|
|
|
|
stat := rapi.Stat()
|
|
result.System = &stat.System
|
|
result.Transient = &stat.Transient
|
|
if len(stat.Services) > 0 {
|
|
result.Services = stat.Services
|
|
}
|
|
if len(stat.Protocols) > 0 {
|
|
result.Protocols = make(map[string]network.ScopeStat, len(stat.Protocols))
|
|
for proto, stat := range stat.Protocols {
|
|
result.Protocols[string(proto)] = stat
|
|
}
|
|
}
|
|
if len(stat.Peers) > 0 {
|
|
result.Peers = make(map[string]network.ScopeStat, len(stat.Peers))
|
|
for p, stat := range stat.Peers {
|
|
result.Peers[p.Pretty()] = stat
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
|
|
case scope == config.ResourceMgrSystemScope:
|
|
err = mgr.ViewSystem(func(s network.ResourceScope) error {
|
|
stat := s.Stat()
|
|
result.System = &stat
|
|
return nil
|
|
})
|
|
return result, err
|
|
|
|
case scope == config.ResourceMgrTransientScope:
|
|
err = mgr.ViewTransient(func(s network.ResourceScope) error {
|
|
stat := s.Stat()
|
|
result.Transient = &stat
|
|
return nil
|
|
})
|
|
return result, err
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
|
|
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
|
|
err = mgr.ViewService(svc, func(s network.ServiceScope) error {
|
|
stat := s.Stat()
|
|
result.Services = map[string]network.ScopeStat{
|
|
svc: stat,
|
|
}
|
|
return nil
|
|
})
|
|
return result, err
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
|
|
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
|
|
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
|
|
stat := s.Stat()
|
|
result.Protocols = map[string]network.ScopeStat{
|
|
proto: stat,
|
|
}
|
|
return nil
|
|
})
|
|
return result, err
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
|
|
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
|
|
pid, err := peer.Decode(p)
|
|
if err != nil {
|
|
return result, fmt.Errorf("invalid peer ID: %q: %w", p, err)
|
|
}
|
|
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
|
|
stat := s.Stat()
|
|
result.Peers = map[string]network.ScopeStat{
|
|
p: stat,
|
|
}
|
|
return nil
|
|
})
|
|
return result, err
|
|
|
|
default:
|
|
return result, fmt.Errorf("invalid scope %q", scope)
|
|
}
|
|
}
|
|
|
|
func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig, error) {
|
|
var result rcmgr.BasicLimitConfig
|
|
getLimit := func(s network.ResourceScope) error {
|
|
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
|
|
if !ok { // NullResourceManager
|
|
return NoResourceMgrError
|
|
}
|
|
limit := limiter.Limit()
|
|
switch l := limit.(type) {
|
|
case *rcmgr.StaticLimit:
|
|
result.Dynamic = false
|
|
result.Memory = l.Memory
|
|
result.Streams = l.BaseLimit.Streams
|
|
result.StreamsInbound = l.BaseLimit.StreamsInbound
|
|
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
|
|
result.Conns = l.BaseLimit.Conns
|
|
result.ConnsInbound = l.BaseLimit.ConnsInbound
|
|
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
|
|
result.FD = l.BaseLimit.FD
|
|
|
|
case *rcmgr.DynamicLimit:
|
|
result.Dynamic = true
|
|
result.MemoryFraction = l.MemoryLimit.MemoryFraction
|
|
result.MinMemory = l.MemoryLimit.MinMemory
|
|
result.MaxMemory = l.MemoryLimit.MaxMemory
|
|
result.Streams = l.BaseLimit.Streams
|
|
result.StreamsInbound = l.BaseLimit.StreamsInbound
|
|
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
|
|
result.Conns = l.BaseLimit.Conns
|
|
result.ConnsInbound = l.BaseLimit.ConnsInbound
|
|
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
|
|
result.FD = l.BaseLimit.FD
|
|
|
|
default:
|
|
return fmt.Errorf("unknown limit type %T", limit)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
switch {
|
|
case scope == config.ResourceMgrSystemScope:
|
|
err := mgr.ViewSystem(func(s network.ResourceScope) error {
|
|
return getLimit(s)
|
|
})
|
|
return result, err
|
|
|
|
case scope == config.ResourceMgrTransientScope:
|
|
err := mgr.ViewTransient(func(s network.ResourceScope) error {
|
|
return getLimit(s)
|
|
})
|
|
return result, err
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
|
|
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
|
|
err := mgr.ViewService(svc, func(s network.ServiceScope) error {
|
|
return getLimit(s)
|
|
})
|
|
return result, err
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
|
|
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
|
|
err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
|
|
return getLimit(s)
|
|
})
|
|
return result, err
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
|
|
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
|
|
pid, err := peer.Decode(p)
|
|
if err != nil {
|
|
return result, fmt.Errorf("invalid peer ID: %q: %w", p, err)
|
|
}
|
|
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
|
|
return getLimit(s)
|
|
})
|
|
return result, err
|
|
|
|
default:
|
|
return result, fmt.Errorf("invalid scope %q", scope)
|
|
}
|
|
}
|
|
|
|
// NetSetLimit sets new ResourceManager limits for the given scope. The limits take effect immediately, and are also persisted to the repo config.
|
|
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BasicLimitConfig) error {
|
|
setLimit := func(s network.ResourceScope) error {
|
|
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
|
|
if !ok { // NullResourceManager
|
|
return NoResourceMgrError
|
|
}
|
|
|
|
var newLimit rcmgr.Limit
|
|
if limit.Dynamic {
|
|
newLimit = &rcmgr.DynamicLimit{
|
|
MemoryLimit: rcmgr.MemoryLimit{
|
|
MemoryFraction: limit.MemoryFraction,
|
|
MinMemory: limit.MinMemory,
|
|
MaxMemory: limit.MaxMemory,
|
|
},
|
|
BaseLimit: rcmgr.BaseLimit{
|
|
Streams: limit.Streams,
|
|
StreamsInbound: limit.StreamsInbound,
|
|
StreamsOutbound: limit.StreamsOutbound,
|
|
Conns: limit.Conns,
|
|
ConnsInbound: limit.ConnsInbound,
|
|
ConnsOutbound: limit.ConnsOutbound,
|
|
FD: limit.FD,
|
|
},
|
|
}
|
|
} else {
|
|
newLimit = &rcmgr.StaticLimit{
|
|
Memory: limit.Memory,
|
|
BaseLimit: rcmgr.BaseLimit{
|
|
Streams: limit.Streams,
|
|
StreamsInbound: limit.StreamsInbound,
|
|
StreamsOutbound: limit.StreamsOutbound,
|
|
Conns: limit.Conns,
|
|
ConnsInbound: limit.ConnsInbound,
|
|
ConnsOutbound: limit.ConnsOutbound,
|
|
FD: limit.FD,
|
|
},
|
|
}
|
|
}
|
|
|
|
limiter.SetLimit(newLimit)
|
|
return nil
|
|
}
|
|
|
|
cfg, err := repo.Config()
|
|
if err != nil {
|
|
return fmt.Errorf("reading config to set limit: %w", err)
|
|
}
|
|
|
|
if cfg.Swarm.ResourceMgr.Limits == nil {
|
|
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
|
|
}
|
|
configLimits := cfg.Swarm.ResourceMgr.Limits
|
|
|
|
var setConfigFunc func()
|
|
switch {
|
|
case scope == config.ResourceMgrSystemScope:
|
|
err = mgr.ViewSystem(func(s network.ResourceScope) error {
|
|
return setLimit(s)
|
|
})
|
|
setConfigFunc = func() { configLimits.System = &limit }
|
|
|
|
case scope == config.ResourceMgrTransientScope:
|
|
err = mgr.ViewTransient(func(s network.ResourceScope) error {
|
|
return setLimit(s)
|
|
})
|
|
setConfigFunc = func() { configLimits.Transient = &limit }
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
|
|
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
|
|
err = mgr.ViewService(svc, func(s network.ServiceScope) error {
|
|
return setLimit(s)
|
|
})
|
|
setConfigFunc = func() {
|
|
if configLimits.Service == nil {
|
|
configLimits.Service = map[string]rcmgr.BasicLimitConfig{}
|
|
}
|
|
configLimits.Service[svc] = limit
|
|
}
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
|
|
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
|
|
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
|
|
return setLimit(s)
|
|
})
|
|
setConfigFunc = func() {
|
|
if configLimits.Protocol == nil {
|
|
configLimits.Protocol = map[string]rcmgr.BasicLimitConfig{}
|
|
}
|
|
configLimits.Protocol[proto] = limit
|
|
}
|
|
|
|
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
|
|
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
|
|
var pid peer.ID
|
|
pid, err = peer.Decode(p)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid peer ID: %q: %w", p, err)
|
|
}
|
|
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
|
|
return setLimit(s)
|
|
})
|
|
setConfigFunc = func() {
|
|
if configLimits.Peer == nil {
|
|
configLimits.Peer = map[string]rcmgr.BasicLimitConfig{}
|
|
}
|
|
configLimits.Peer[p] = limit
|
|
}
|
|
|
|
default:
|
|
return fmt.Errorf("invalid scope %q", scope)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("setting new limits on resource manager: %w", err)
|
|
}
|
|
|
|
if cfg.Swarm.ResourceMgr.Limits == nil {
|
|
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
|
|
}
|
|
setConfigFunc()
|
|
|
|
if err := repo.SetConfig(cfg); err != nil {
|
|
return fmt.Errorf("writing new limits to repo config: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|