package libp2p import ( "context" "errors" "fmt" "os" "path/filepath" "strings" config "github.com/ipfs/go-ipfs/config" "github.com/ipfs/go-ipfs/repo" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" rcmgr "github.com/libp2p/go-libp2p-resource-manager" "go.uber.org/fx" ) const NetLimitDefaultFilename = "limit.json" const NetLimitTraceFilename = "rcmgr.json.gz" var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled") func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) { return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { var limiter *rcmgr.BasicLimiter var manager network.ResourceManager var opts Libp2pOpts // Config Swarm.ResourceMgr.Enabled decides if we run a real manager enabled := cfg.ResourceMgr.Enabled.WithDefault(false) /// ENV overrides Config (if present) switch os.Getenv("LIBP2P_RCMGR") { case "0", "false": enabled = false case "1", "true": enabled = true } if enabled { log.Debug("libp2p resource manager is enabled") repoPath, err := config.PathRoot() if err != nil { return nil, opts, fmt.Errorf("error opening IPFS_PATH: %w", err) } // Create limiter: // - parse $IPFS_PATH/limits.json if exists // - use defaultLimits from rcmgr_defaults.go defaultLimits := adjustedDefaultLimits(cfg) limitFilePath := filepath.Join(repoPath, NetLimitDefaultFilename) limitFile, err := os.Open(limitFilePath) switch { case err == nil: defer limitFile.Close() limiter, err = rcmgr.NewLimiterFromJSON(limitFile, defaultLimits) if err != nil { return nil, opts, fmt.Errorf("error parsing libp2p limit file: %w", err) } case errors.Is(err, os.ErrNotExist): limiter = rcmgr.NewStaticLimiter(defaultLimits) default: return nil, opts, err } libp2p.SetDefaultServiceLimits(limiter) ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics())} if os.Getenv("LIBP2P_DEBUG_RCMGR") != "" { traceFilePath := filepath.Join(repoPath, NetLimitTraceFilename) ropts = append(ropts, rcmgr.WithTrace(traceFilePath)) } manager, err = rcmgr.NewResourceManager(limiter, ropts...) if err != nil { return nil, opts, fmt.Errorf("error creating libp2p resource manager: %w", err) } } else { log.Debug("libp2p resource manager is disabled") manager = network.NullResourceManager } opts.Opts = append(opts.Opts, libp2p.ResourceManager(manager)) lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { return manager.Close() }}) return manager, opts, nil } } type NetStatOut struct { System *network.ScopeStat `json:",omitempty"` Transient *network.ScopeStat `json:",omitempty"` Services map[string]network.ScopeStat `json:",omitempty"` Protocols map[string]network.ScopeStat `json:",omitempty"` Peers map[string]network.ScopeStat `json:",omitempty"` } func NetStat(mgr network.ResourceManager, scope string) (NetStatOut, error) { var err error var result NetStatOut switch { case scope == "all": rapi, ok := mgr.(rcmgr.ResourceManagerState) if !ok { // NullResourceManager return result, NoResourceMgrError } stat := rapi.Stat() result.System = &stat.System result.Transient = &stat.Transient if len(stat.Services) > 0 { result.Services = stat.Services } if len(stat.Protocols) > 0 { result.Protocols = make(map[string]network.ScopeStat, len(stat.Protocols)) for proto, stat := range stat.Protocols { result.Protocols[string(proto)] = stat } } if len(stat.Peers) > 0 { result.Peers = make(map[string]network.ScopeStat, len(stat.Peers)) for p, stat := range stat.Peers { result.Peers[p.Pretty()] = stat } } return result, nil case scope == config.ResourceMgrSystemScope: err = mgr.ViewSystem(func(s network.ResourceScope) error { stat := s.Stat() result.System = &stat return nil }) return result, err case scope == config.ResourceMgrTransientScope: err = mgr.ViewTransient(func(s network.ResourceScope) error { stat := s.Stat() result.Transient = &stat return nil }) return result, err case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix) err = mgr.ViewService(svc, func(s network.ServiceScope) error { stat := s.Stat() result.Services = map[string]network.ScopeStat{ svc: stat, } return nil }) return result, err case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix) err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { stat := s.Stat() result.Protocols = map[string]network.ScopeStat{ proto: stat, } return nil }) return result, err case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix) pid, err := peer.Decode(p) if err != nil { return result, fmt.Errorf("invalid peer ID: %q: %w", p, err) } err = mgr.ViewPeer(pid, func(s network.PeerScope) error { stat := s.Stat() result.Peers = map[string]network.ScopeStat{ p: stat, } return nil }) return result, err default: return result, fmt.Errorf("invalid scope %q", scope) } } func NetLimit(mgr network.ResourceManager, scope string) (config.ResourceMgrScopeConfig, error) { var result config.ResourceMgrScopeConfig getLimit := func(s network.ResourceScope) error { limiter, ok := s.(rcmgr.ResourceScopeLimiter) if !ok { // NullResourceManager return NoResourceMgrError } limit := limiter.Limit() switch l := limit.(type) { case *rcmgr.StaticLimit: result.Dynamic = false result.Memory = l.Memory result.Streams = l.BaseLimit.Streams result.StreamsInbound = l.BaseLimit.StreamsInbound result.StreamsOutbound = l.BaseLimit.StreamsOutbound result.Conns = l.BaseLimit.Conns result.ConnsInbound = l.BaseLimit.ConnsInbound result.ConnsOutbound = l.BaseLimit.ConnsOutbound result.FD = l.BaseLimit.FD case *rcmgr.DynamicLimit: result.Dynamic = true result.MemoryFraction = l.MemoryLimit.MemoryFraction result.MinMemory = l.MemoryLimit.MinMemory result.MaxMemory = l.MemoryLimit.MaxMemory result.Streams = l.BaseLimit.Streams result.StreamsInbound = l.BaseLimit.StreamsInbound result.StreamsOutbound = l.BaseLimit.StreamsOutbound result.Conns = l.BaseLimit.Conns result.ConnsInbound = l.BaseLimit.ConnsInbound result.ConnsOutbound = l.BaseLimit.ConnsOutbound result.FD = l.BaseLimit.FD default: return fmt.Errorf("unknown limit type %T", limit) } return nil } switch { case scope == config.ResourceMgrSystemScope: err := mgr.ViewSystem(func(s network.ResourceScope) error { return getLimit(s) }) return result, err case scope == config.ResourceMgrTransientScope: err := mgr.ViewTransient(func(s network.ResourceScope) error { return getLimit(s) }) return result, err case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix) err := mgr.ViewService(svc, func(s network.ServiceScope) error { return getLimit(s) }) return result, err case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix) err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return getLimit(s) }) return result, err case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix) pid, err := peer.Decode(p) if err != nil { return result, fmt.Errorf("invalid peer ID: %q: %w", p, err) } err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return getLimit(s) }) return result, err default: return result, fmt.Errorf("invalid scope %q", scope) } } func NetSetLimit(mgr network.ResourceManager, scope string, limit config.ResourceMgrScopeConfig) error { setLimit := func(s network.ResourceScope) error { limiter, ok := s.(rcmgr.ResourceScopeLimiter) if !ok { // NullResourceManager return NoResourceMgrError } var newLimit rcmgr.Limit if limit.Dynamic { newLimit = &rcmgr.DynamicLimit{ MemoryLimit: rcmgr.MemoryLimit{ MemoryFraction: limit.MemoryFraction, MinMemory: limit.MinMemory, MaxMemory: limit.MaxMemory, }, BaseLimit: rcmgr.BaseLimit{ Streams: limit.Streams, StreamsInbound: limit.StreamsInbound, StreamsOutbound: limit.StreamsOutbound, Conns: limit.Conns, ConnsInbound: limit.ConnsInbound, ConnsOutbound: limit.ConnsOutbound, FD: limit.FD, }, } } else { newLimit = &rcmgr.StaticLimit{ Memory: limit.Memory, BaseLimit: rcmgr.BaseLimit{ Streams: limit.Streams, StreamsInbound: limit.StreamsInbound, StreamsOutbound: limit.StreamsOutbound, Conns: limit.Conns, ConnsInbound: limit.ConnsInbound, ConnsOutbound: limit.ConnsOutbound, FD: limit.FD, }, } } limiter.SetLimit(newLimit) return nil } switch { case scope == config.ResourceMgrSystemScope: err := mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s) }) return err case scope == config.ResourceMgrTransientScope: err := mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s) }) return err case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix): svc := scope[4:] err := mgr.ViewService(svc, func(s network.ServiceScope) error { return setLimit(s) }) return err case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix): proto := scope[6:] err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return setLimit(s) }) return err case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix): p := scope[5:] pid, err := peer.Decode(p) if err != nil { return fmt.Errorf("invalid peer ID: %q: %w", p, err) } err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s) }) return err default: return fmt.Errorf("invalid scope %q", scope) } }