feat: Reduce RM code footprint

Co-Authored-By: Antonio Navarro Perez <antnavper@gmail.com>
Jorropo 2023-03-01 15:29:45 +01:00
parent d1541e1d30
commit 7986196414
19 changed files with 703 additions and 1015 deletions


@@ -309,7 +309,7 @@ jobs:
- run:
name: Cloning
command: |
git clone https://github.com/ipfs/go-ipfs-http-client.git
git clone https://github.com/ipfs/go-ipfs-http-client.git -b bump-for-rcmgr-last-push
git -C go-ipfs-http-client log -1
- restore_cache:
keys:


@@ -143,6 +143,7 @@ jobs:
with:
repository: ipfs/go-ipfs-http-client
path: go-ipfs-http-client
ref: bump-for-rcmgr-last-push
- uses: protocol/cache-go-action@v1
with:
name: ${{ github.job }}


@@ -1,7 +1,5 @@
package config
import rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
type SwarmConfig struct {
// AddrFilters specifies a set of libp2p addresses that we should never
// dial or receive connections from.
@@ -141,8 +139,8 @@ type ConnMgr struct {
// <https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager#readme>
type ResourceMgr struct {
// Enables the Network Resource Manager feature; defaults to on.
Enabled Flag `json:",omitempty"`
Limits *rcmgr.PartialLimitConfig `json:",omitempty"`
Enabled Flag `json:",omitempty"`
Limits swarmLimits `json:",omitempty"`
MaxMemory *OptionalString `json:",omitempty"`
MaxFileDescriptors *OptionalInteger `json:",omitempty"`


@@ -1,8 +1,10 @@
package config
import (
"bytes"
"encoding/json"
"fmt"
"io"
"strings"
"time"
)
@@ -412,3 +414,27 @@ func (p OptionalString) String() string {
var _ json.Unmarshaler = (*OptionalInteger)(nil)
var _ json.Marshaler = (*OptionalInteger)(nil)
type swarmLimits struct{}
var _ json.Unmarshaler = swarmLimits{}
func (swarmLimits) UnmarshalJSON(b []byte) error {
d := json.NewDecoder(bytes.NewReader(b))
for {
switch tok, err := d.Token(); err {
case io.EOF:
return nil
case nil:
switch tok {
case json.Delim('{'), json.Delim('}'):
// accept empty objects
continue
}
//nolint
return fmt.Errorf("The Swarm.ResourceMgr.Limits configuration has been removed in Kubo 0.19 and should be empty or not present. To set custom libp2p limits, read https://github.com/ipfs/kubo/blob/master/docs/libp2p-resource-management.md#user-supplied-override-limits")
default:
return err
}
}
}
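For illustration, a hypothetical test-style sketch (same package, standard testing import assumed) of the sentinel's behavior: an empty object decodes cleanly, while any populated Limits object surfaces the migration error above.

func TestSwarmLimitsSentinel(t *testing.T) {
	var s swarmLimits
	if err := s.UnmarshalJSON([]byte(`{}`)); err != nil {
		t.Fatal("empty object should be accepted:", err) // matches the Delim cases above
	}
	if err := s.UnmarshalJSON([]byte(`{"System":{"Conns":128}}`)); err == nil {
		t.Fatal("populated Limits must be rejected with the Kubo 0.19 migration error")
	}
}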


@@ -248,13 +248,12 @@ func TestCommands(t *testing.T) {
"/swarm/filters",
"/swarm/filters/add",
"/swarm/filters/rm",
"/swarm/limit",
"/swarm/peers",
"/swarm/peering",
"/swarm/peering/add",
"/swarm/peering/ls",
"/swarm/peering/rm",
"/swarm/stats",
"/swarm/resources",
"/tar",
"/tar/add",
"/tar/cat",


@@ -1,7 +1,6 @@
package commands
import (
"bytes"
"context"
"encoding/json"
"errors"
@@ -9,10 +8,11 @@ import (
"io"
"path"
"sort"
"strconv"
"sync"
"text/tabwriter"
"time"
"github.com/ipfs/go-libipfs/files"
"github.com/ipfs/kubo/commands"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/commands/cmdenv"
@@ -57,8 +57,8 @@ ipfs peers in the internet.
"filters": swarmFiltersCmd,
"peers": swarmPeersCmd,
"peering": swarmPeeringCmd,
"stats": swarmStatsCmd, // libp2p Network Resource Manager
"limit": swarmLimitCmd, // libp2p Network Resource Manager
"resources": swarmResourcesCmd, // libp2p Network Resource Manager
},
}
@@ -323,30 +323,15 @@ var swarmPeersCmd = &cmds.Command{
Type: connInfos{},
}
var swarmStatsCmd = &cmds.Command{
var swarmResourcesCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Report resource usage for a scope.",
LongDescription: `Report resource usage for a scope.
The scope can be one of the following:
- system -- reports the system aggregate resource usage.
- transient -- reports the transient resource usage.
- svc:<service> -- reports the resource usage of a specific service.
- proto:<proto> -- reports the resource usage of a specific protocol.
- peer:<peer> -- reports the resource usage of a specific peer.
- all -- reports the resource usage for all currently active scopes.
The output of this command is JSON.
To see all resources that are close to hitting their respective limit, one can do something like:
ipfs swarm stats --min-used-limit-perc=90 all
Tagline: "Get a summary of all resources accounted for by the libp2p Resource Manager.",
LongDescription: `
Get a summary of all resources accounted for by the libp2p Resource Manager.
This includes the limits and the usage against those limits.
This can output a human-readable table or JSON encoding.
`},
Arguments: []cmds.Argument{
cmds.StringArg("scope", true, false, "scope of the stat report"),
},
Options: []cmds.Option{
cmds.IntOption(swarmUsedResourcesPercentageName, "Only display resources that are using above the specified percentage of their respective limit"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
node, err := cmdenv.GetNode(env)
if err != nil {
@@ -357,128 +342,68 @@ To see all resources that are close to hitting their respective limit, one can do
return libp2p.ErrNoResourceMgr
}
if len(req.Arguments) != 1 {
return fmt.Errorf("must specify exactly one scope")
}
percentage, _ := req.Options[swarmUsedResourcesPercentageName].(int)
scope := req.Arguments[0]
if percentage != 0 && scope != "all" {
return fmt.Errorf("%q can only be used when scope is %q", swarmUsedResourcesPercentageName, "all")
}
result, err := libp2p.NetStat(node.ResourceManager, scope, percentage)
cfg, err := node.Repo.Config()
if err != nil {
return err
}
b := new(bytes.Buffer)
enc := json.NewEncoder(b)
err = enc.Encode(result)
if err != nil {
return err
}
return cmds.EmitOnce(res, b)
},
Encoders: cmds.EncoderMap{
cmds.Text: HumanJSONEncoder,
},
}
var swarmLimitCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Get or set resource limits for a scope.",
LongDescription: `Get or set resource limits for a scope.
The scope can be one of the following:
- all -- all limits actually being applied.
- system -- limits for the system aggregate resource usage.
- transient -- limits for the transient resource usage.
- svc:<service> -- limits for the resource usage of a specific service.
- proto:<proto> -- limits for the resource usage of a specific protocol.
- peer:<peer> -- limits for the resource usage of a specific peer.
The output of this command is JSON.
It is possible to use this command to inspect and tweak limits at runtime:
$ ipfs swarm limit system > limit.json
$ vi limit.json
$ ipfs swarm limit system limit.json
Changes made via command line are persisted in the Swarm.ResourceMgr.Limits field of the $IPFS_PATH/config file.
`},
Arguments: []cmds.Argument{
cmds.StringArg("scope", true, false, "scope of the limit"),
cmds.FileArg("limit.json", false, false, "limits to be set").EnableStdin(),
},
Options: []cmds.Option{
cmds.BoolOption(swarmResetLimitsOptionName, "reset limit to default"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
node, err := cmdenv.GetNode(env)
userResourceOverrides, err := node.Repo.UserResourceOverrides()
if err != nil {
return err
}
if node.ResourceManager == nil {
// FIXME: we shouldn't recompute limits, either save them or load them from libp2p (https://github.com/libp2p/go-libp2p/issues/2166)
limitConfig, _, err := libp2p.LimitConfig(cfg.Swarm, userResourceOverrides)
if err != nil {
return err
}
rapi, ok := node.ResourceManager.(rcmgr.ResourceManagerState)
if !ok { // NullResourceManager
return libp2p.ErrNoResourceMgr
}
scope := req.Arguments[0]
// set scope limit to new values (when limit.json is passed as a second arg)
if req.Files != nil {
var newLimit rcmgr.ResourceLimits
it := req.Files.Entries()
if it.Next() {
file := files.FileFromEntry(it)
if file == nil {
return errors.New("expected a JSON file")
}
r := io.LimitReader(file, 32*1024*1024) // 32MiB
if err := json.NewDecoder(r).Decode(&newLimit); err != nil {
return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err)
}
return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit)
}
if err := it.Err(); err != nil {
return fmt.Errorf("error opening limit JSON file: %w", err)
}
}
var result interface{}
switch _, reset := req.Options[swarmResetLimitsOptionName]; {
case reset:
result, err = libp2p.NetResetLimit(node.ResourceManager, node.Repo, scope)
case scope == "all":
result, err = libp2p.NetLimitAll(node.ResourceManager)
default:
// get scope limit
result, err = libp2p.NetLimit(node.ResourceManager, scope)
}
if err != nil {
return err
}
if base, ok := result.(rcmgr.BaseLimit); ok {
result = base.ToResourceLimits()
}
b := new(bytes.Buffer)
enc := json.NewEncoder(b)
err = enc.Encode(result)
if err != nil {
return err
}
return cmds.EmitOnce(res, b)
return cmds.EmitOnce(res, libp2p.MergeLimitsAndStatsIntoLimitsConfigAndUsage(limitConfig, rapi.Stat()))
},
Encoders: cmds.EncoderMap{
cmds.Text: HumanJSONEncoder,
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, limitsAndUsage libp2p.LimitsConfigAndUsage) error {
return json.NewEncoder(w).Encode(limitsAndUsage)
}),
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, limitsAndUsage libp2p.LimitsConfigAndUsage) error {
tw := tabwriter.NewWriter(w, 20, 8, 0, '\t', 0)
defer tw.Flush()
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t\n", "Scope", "Limit Name", "Limit Value", "Limit Usage Amount", "Limit Usage Percent")
for _, ri := range libp2p.LimitConfigsToInfo(limitsAndUsage) {
var limit, percentage string
switch ri.LimitValue {
case rcmgr.Unlimited64:
limit = "unlimited"
percentage = "n/a"
case rcmgr.BlockAllLimit64:
limit = "blockAll"
percentage = "n/a"
default:
limit = strconv.FormatInt(int64(ri.LimitValue), 10)
if ri.CurrentUsage == 0 {
percentage = "0%"
} else {
percentage = strconv.FormatFloat(float64(ri.CurrentUsage)/float64(ri.LimitValue)*100, 'f', 1, 64) + "%"
}
}
fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%s\t\n",
ri.ScopeName,
ri.LimitName,
limit,
ri.CurrentUsage,
percentage,
)
}
return nil
}),
},
Type: libp2p.LimitsConfigAndUsage{},
}
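For reference, typical invocations of the replacement command (output shapes follow the encoders above; --enc is kubo's standard encoding option):

$ ipfs swarm resources              # human-readable table via tabwriter
$ ipfs swarm resources --enc=json   # LimitsConfigAndUsage as JSON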
type streamInfo struct {


@@ -6,21 +6,19 @@ import (
"fmt"
"time"
"github.com/dustin/go-humanize"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
util "github.com/ipfs/go-ipfs-util"
"github.com/ipfs/go-log"
uio "github.com/ipfs/go-unixfs/io"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/p2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/p2p"
offline "github.com/ipfs/go-ipfs-exchange-offline"
uio "github.com/ipfs/go-unixfs/io"
"github.com/dustin/go-humanize"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"go.uber.org/fx"
)
@@ -37,7 +35,7 @@ var BaseLibP2P = fx.Options(
fx.Invoke(libp2p.PNetChecker),
)
func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
var connmgr fx.Option
// set connmgr based on Swarm.ConnMgr.Type
@@ -150,7 +148,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(libp2p.UserAgent()),
// Services (resource management)
fx.Provide(libp2p.ResourceManager(cfg.Swarm)),
fx.Provide(libp2p.ResourceManager(cfg.Swarm, userResourceOverrides)),
fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)),
fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.AppendAnnounce, cfg.Addresses.NoAnnounce)),
fx.Provide(libp2p.SmuxTransport(cfg.Swarm.Transports)),
@@ -249,7 +247,7 @@ var IPNS = fx.Options(
)
// Online groups online-only units
func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
// Namesys params
@@ -303,7 +301,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(p2p.New),
LibP2P(bcfg, cfg),
LibP2P(bcfg, cfg, userResourceOverrides),
OnlineProviders(
cfg.Experimental.StrategicProviding,
cfg.Experimental.AcceleratedDHTClient,
@@ -340,9 +338,9 @@ var Core = fx.Options(
fx.Provide(Files),
)
func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option {
func Networked(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option {
if bcfg.Online {
return Online(bcfg, cfg)
return Online(bcfg, cfg, userResourceOverrides)
}
return Offline(cfg)
}
@@ -358,6 +356,11 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
return bcfgOpts // error
}
userResourceOverrides, err := bcfg.Repo.UserResourceOverrides()
if err != nil {
return fx.Error(err)
}
// Auto-sharding settings
shardSizeString := cfg.Internal.UnixFSShardingSizeThreshold.WithDefault("256kiB")
shardSizeInt, err := humanize.ParseBytes(shardSizeString)
@@ -381,7 +384,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
Storage(bcfg, cfg),
Identity(cfg),
IPNS,
Networked(bcfg, cfg),
Networked(bcfg, cfg, userResourceOverrides),
Core,
)


@@ -2,10 +2,10 @@ package libp2p
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"github.com/benbjohnson/clock"
logging "github.com/ipfs/go-log/v2"
@@ -17,21 +17,17 @@ import (
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"golang.org/x/exp/constraints"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)
// FIXME(@Jorropo): for go-libp2p v0.26.0 use .MustConcrete and .MustBaseLimit instead of .Build(rcmgr.BaseLimit{}).
const NetLimitDefaultFilename = "limit.json"
const NetLimitTraceFilename = "rcmgr.json.gz"
var ErrNoResourceMgr = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled")
func ResourceManager(cfg config.SwarmConfig) interface{} {
func ResourceManager(cfg config.SwarmConfig, userResourceOverrides rcmgr.PartialLimitConfig) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
var manager network.ResourceManager
var opts Libp2pOpts
@@ -54,32 +50,25 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err)
}
var limitConfig rcmgr.ConcreteLimitConfig
defaultComputedLimitConfig, err := createDefaultLimitConfig(cfg)
limitConfig, msg, err := LimitConfig(cfg, userResourceOverrides)
if err != nil {
return nil, opts, fmt.Errorf("creating final Resource Manager config: %w", err)
}
if !isPartialConfigEmpty(userResourceOverrides) {
fmt.Print(`
libp2p-resource-limit-overrides.json has been loaded, "default" fields will be
filled in with autocomputed defaults.
`)
}
// We want to see this message on startup; that's why we use fmt instead of log.
fmt.Print(msg)
if err := ensureConnMgrMakeSenseVsResourceMgr(limitConfig, cfg); err != nil {
return nil, opts, err
}
// The logic for defaults and overriding with specified SwarmConfig.ResourceMgr.Limits
// is documented in docs/config.md.
// Any changes here should be reflected there.
if cfg.ResourceMgr.Limits != nil {
userSuppliedOverrideLimitConfig := *cfg.ResourceMgr.Limits
// This effectively overrides the computed default LimitConfig with any non-zero values from cfg.ResourceMgr.Limits.
// Because of how Apply works, any 0 value for a user-supplied override
// will be overridden with a computed default value.
// There currently isn't a way for a user to supply a 0-value override.
limitConfig = userSuppliedOverrideLimitConfig.Build(defaultComputedLimitConfig)
} else {
limitConfig = defaultComputedLimitConfig
}
if err := ensureConnMgrMakeSenseVsResourceMgr(limitConfig, cfg.ConnMgr); err != nil {
return nil, opts, err
}
limiter := rcmgr.NewFixedLimiter(limitConfig)
str, err := rcmgrObs.NewStatsTraceReporter()
if err != nil {
return nil, opts, err
@@ -106,6 +95,8 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
ropts = append(ropts, rcmgr.WithTrace(traceFilePath))
}
limiter := rcmgr.NewFixedLimiter(limitConfig)
manager, err = rcmgr.NewResourceManager(limiter, ropts...)
if err != nil {
return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err)
@@ -133,540 +124,363 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
}
}
type notOmitEmptyResourceLimit struct {
Streams rcmgr.LimitVal
StreamsInbound rcmgr.LimitVal
StreamsOutbound rcmgr.LimitVal
Conns rcmgr.LimitVal
ConnsInbound rcmgr.LimitVal
ConnsOutbound rcmgr.LimitVal
FD rcmgr.LimitVal
Memory rcmgr.LimitVal64
}
func resourceLimitsToNotOmitEmpty(r rcmgr.ResourceLimits) notOmitEmptyResourceLimit {
return notOmitEmptyResourceLimit{
Streams: r.Streams,
StreamsInbound: r.StreamsInbound,
StreamsOutbound: r.StreamsOutbound,
Conns: r.Conns,
ConnsInbound: r.ConnsInbound,
ConnsOutbound: r.ConnsOutbound,
FD: r.FD,
Memory: r.Memory,
}
}
type NetStatOut struct {
System *notOmitEmptyResourceLimit `json:",omitempty"`
Transient *notOmitEmptyResourceLimit `json:",omitempty"`
Services map[string]notOmitEmptyResourceLimit `json:",omitempty"`
Protocols map[string]notOmitEmptyResourceLimit `json:",omitempty"`
Peers map[string]notOmitEmptyResourceLimit `json:",omitempty"`
}
func NetStat(mgr network.ResourceManager, scope string, percentage int) (NetStatOut, error) {
var err error
var result NetStatOut
switch {
case scope == "all":
rapi, ok := mgr.(rcmgr.ResourceManagerState)
if !ok { // NullResourceManager
return result, ErrNoResourceMgr
}
limits, err := NetLimitAll(mgr)
if err != nil {
return result, err
}
stat := rapi.Stat()
if s := scopeToLimit(stat.System); compareLimits(s, *limits.System, percentage) {
result.System = &s
}
if s := scopeToLimit(stat.Transient); compareLimits(s, *limits.Transient, percentage) {
result.Transient = &s
}
if len(stat.Services) > 0 {
result.Services = make(map[string]notOmitEmptyResourceLimit, len(stat.Services))
for srv, s := range stat.Services {
ls := limits.Services[srv]
if stat := scopeToLimit(s); compareLimits(stat, ls, percentage) {
result.Services[srv] = stat
}
}
}
if len(stat.Protocols) > 0 {
result.Protocols = make(map[string]notOmitEmptyResourceLimit, len(stat.Protocols))
for proto, s := range stat.Protocols {
ls := limits.Protocols[string(proto)]
if stat := scopeToLimit(s); compareLimits(stat, ls, percentage) {
result.Protocols[string(proto)] = stat
}
}
}
if len(stat.Peers) > 0 {
result.Peers = make(map[string]notOmitEmptyResourceLimit, len(stat.Peers))
for p, s := range stat.Peers {
ls := limits.Peers[p.Pretty()]
if stat := scopeToLimit(s); compareLimits(stat, ls, percentage) {
result.Peers[p.Pretty()] = stat
}
}
}
return result, nil
case scope == config.ResourceMgrSystemScope:
err = mgr.ViewSystem(func(s network.ResourceScope) error {
stat := scopeToLimit(s.Stat())
result.System = &stat
return nil
})
return result, err
case scope == config.ResourceMgrTransientScope:
err = mgr.ViewTransient(func(s network.ResourceScope) error {
stat := scopeToLimit(s.Stat())
result.Transient = &stat
return nil
})
return result, err
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err = mgr.ViewService(svc, func(s network.ServiceScope) error {
result.Services = map[string]notOmitEmptyResourceLimit{
svc: scopeToLimit(s.Stat()),
}
return nil
})
return result, err
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
result.Protocols = map[string]notOmitEmptyResourceLimit{
proto: scopeToLimit(s.Stat()),
}
return nil
})
return result, err
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
pid, err := peer.Decode(p)
if err != nil {
return result, fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
result.Peers = map[string]notOmitEmptyResourceLimit{
p: scopeToLimit(s.Stat()),
}
return nil
})
return result, err
default:
return result, fmt.Errorf("invalid scope %q", scope)
}
}
var scopes = []string{
config.ResourceMgrSystemScope,
config.ResourceMgrTransientScope,
config.ResourceMgrServiceScopePrefix,
config.ResourceMgrProtocolScopePrefix,
config.ResourceMgrPeerScopePrefix,
}
func scopeToLimit(s network.ScopeStat) notOmitEmptyResourceLimit {
return notOmitEmptyResourceLimit{
Streams: rcmgr.LimitVal(s.NumStreamsInbound + s.NumStreamsOutbound),
StreamsInbound: rcmgr.LimitVal(s.NumStreamsInbound),
StreamsOutbound: rcmgr.LimitVal(s.NumStreamsOutbound),
Conns: rcmgr.LimitVal(s.NumConnsInbound + s.NumConnsOutbound),
ConnsInbound: rcmgr.LimitVal(s.NumConnsInbound),
ConnsOutbound: rcmgr.LimitVal(s.NumConnsOutbound),
FD: rcmgr.LimitVal(s.NumFD),
Memory: rcmgr.LimitVal64(s.Memory),
}
}
// compareLimits compares stat and limit.
// If any of the stats value are equals or above the specified percentage,
// it returns true.
func compareLimits(stat, limit notOmitEmptyResourceLimit, percentage int) bool {
if abovePercentage(int(stat.Memory), int(limit.Memory), percentage) {
return true
}
if abovePercentage(stat.ConnsInbound, limit.ConnsInbound, percentage) {
return true
}
if abovePercentage(stat.ConnsOutbound, limit.ConnsOutbound, percentage) {
return true
}
if abovePercentage(stat.Conns, limit.Conns, percentage) {
return true
}
if abovePercentage(stat.FD, limit.FD, percentage) {
return true
}
if abovePercentage(stat.StreamsInbound, limit.StreamsInbound, percentage) {
return true
}
if abovePercentage(stat.StreamsOutbound, limit.StreamsOutbound, percentage) {
return true
}
if abovePercentage(stat.Streams, limit.Streams, percentage) {
return true
}
return false
}
func abovePercentage[T constraints.Integer | constraints.Float](v1, v2 T, percentage int) bool {
if percentage == 0 {
return true
}
if v2 == 0 {
func isPartialConfigEmpty(cfg rcmgr.PartialLimitConfig) bool {
var emptyResourceConfig rcmgr.ResourceLimits
if cfg.System != emptyResourceConfig ||
cfg.Transient != emptyResourceConfig ||
cfg.AllowlistedSystem != emptyResourceConfig ||
cfg.AllowlistedTransient != emptyResourceConfig ||
cfg.ServiceDefault != emptyResourceConfig ||
cfg.ServicePeerDefault != emptyResourceConfig ||
cfg.ProtocolDefault != emptyResourceConfig ||
cfg.ProtocolPeerDefault != emptyResourceConfig ||
cfg.PeerDefault != emptyResourceConfig ||
cfg.Conn != emptyResourceConfig ||
cfg.Stream != emptyResourceConfig {
return false
}
return int((float64(v1)/float64(v2))*100) >= percentage
}
func NetLimitAll(mgr network.ResourceManager) (*NetStatOut, error) {
var result = &NetStatOut{}
lister, ok := mgr.(rcmgr.ResourceManagerState)
if !ok { // NullResourceManager
return result, ErrNoResourceMgr
}
for _, s := range scopes {
switch s {
case config.ResourceMgrSystemScope:
s, err := NetLimit(mgr, config.ResourceMgrSystemScope)
if err != nil {
return nil, err
}
result.System = &s
case config.ResourceMgrTransientScope:
s, err := NetLimit(mgr, config.ResourceMgrTransientScope)
if err != nil {
return nil, err
}
result.Transient = &s
case config.ResourceMgrServiceScopePrefix:
result.Services = make(map[string]notOmitEmptyResourceLimit)
for _, serv := range lister.ListServices() {
s, err := NetLimit(mgr, config.ResourceMgrServiceScopePrefix+serv)
if err != nil {
return nil, err
}
result.Services[serv] = s
}
case config.ResourceMgrProtocolScopePrefix:
result.Protocols = make(map[string]notOmitEmptyResourceLimit)
for _, prot := range lister.ListProtocols() {
ps := string(prot)
s, err := NetLimit(mgr, config.ResourceMgrProtocolScopePrefix+ps)
if err != nil {
return nil, err
}
result.Protocols[ps] = s
}
case config.ResourceMgrPeerScopePrefix:
result.Peers = make(map[string]notOmitEmptyResourceLimit)
for _, peer := range lister.ListPeers() {
ps := peer.Pretty()
s, err := NetLimit(mgr, config.ResourceMgrPeerScopePrefix+ps)
if err != nil {
return nil, err
}
result.Peers[ps] = s
}
for _, v := range cfg.Service {
if v != emptyResourceConfig {
return false
}
}
return result, nil
for _, v := range cfg.ServicePeer {
if v != emptyResourceConfig {
return false
}
}
for _, v := range cfg.Protocol {
if v != emptyResourceConfig {
return false
}
}
for _, v := range cfg.ProtocolPeer {
if v != emptyResourceConfig {
return false
}
}
for _, v := range cfg.Peer {
if v != emptyResourceConfig {
return false
}
}
return true
}
func NetLimit(mgr network.ResourceManager, scope string) (notOmitEmptyResourceLimit, error) {
var result rcmgr.ResourceLimits
getLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return ErrNoResourceMgr
// LimitConfig returns the union of the Computed Default Limits and the User Supplied Override Limits.
func LimitConfig(cfg config.SwarmConfig, userResourceOverrides rcmgr.PartialLimitConfig) (limitConfig rcmgr.ConcreteLimitConfig, logMessageForStartup string, err error) {
limitConfig, msg, err := createDefaultLimitConfig(cfg)
if err != nil {
return rcmgr.ConcreteLimitConfig{}, msg, err
}
// The logic for defaults and overriding with specified userResourceOverrides
// is documented in docs/libp2p-resource-management.md.
// Any changes here should be reflected there.
// This effectively overrides the computed default LimitConfig with any non-"useDefault" values from the userResourceOverrides file.
// Because of how Build works, any rcmgr.Default value in userResourceOverrides
// will be overridden with a computed default value.
limitConfig = userResourceOverrides.Build(limitConfig)
return limitConfig, msg, nil
}
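A minimal sketch (values assumed) of the Build semantics described above: explicitly set fields survive, while fields left at the zero "default" value inherit the computed defaults.

overrides := rcmgr.PartialLimitConfig{
	System: rcmgr.ResourceLimits{
		ConnsInbound: 64, // explicit override: kept as-is by Build
		// Memory, FD, Streams, ... stay rcmgr.DefaultLimit/rcmgr.DefaultLimit64,
		// so Build fills them in from the computed defaults.
	},
}
concrete := overrides.Build(defaultComputedLimitConfig) // defaultComputedLimitConfig: an assumed rcmgr.ConcreteLimitConfig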
type ResourceLimitsAndUsage struct {
// This is duplicated from rcmgr.ResourceLimits but adds *Usage fields.
Memory rcmgr.LimitVal64
MemoryUsage int64
FD rcmgr.LimitVal
FDUsage int
Conns rcmgr.LimitVal
ConnsUsage int
ConnsInbound rcmgr.LimitVal
ConnsInboundUsage int
ConnsOutbound rcmgr.LimitVal
ConnsOutboundUsage int
Streams rcmgr.LimitVal
StreamsUsage int
StreamsInbound rcmgr.LimitVal
StreamsInboundUsage int
StreamsOutbound rcmgr.LimitVal
StreamsOutboundUsage int
}
func (u ResourceLimitsAndUsage) ToResourceLimits() rcmgr.ResourceLimits {
return rcmgr.ResourceLimits{
Memory: u.Memory,
FD: u.FD,
Conns: u.Conns,
ConnsInbound: u.ConnsInbound,
ConnsOutbound: u.ConnsOutbound,
Streams: u.Streams,
StreamsInbound: u.StreamsInbound,
StreamsOutbound: u.StreamsOutbound,
}
}
type LimitsConfigAndUsage struct {
// This is duplicated from rcmgr.ResourceManagerStat but using ResourceLimitsAndUsage
// instead of network.ScopeStat.
System ResourceLimitsAndUsage `json:",omitempty"`
Transient ResourceLimitsAndUsage `json:",omitempty"`
Services map[string]ResourceLimitsAndUsage `json:",omitempty"`
Protocols map[protocol.ID]ResourceLimitsAndUsage `json:",omitempty"`
Peers map[peer.ID]ResourceLimitsAndUsage `json:",omitempty"`
}
func (u LimitsConfigAndUsage) MarshalJSON() ([]byte, error) {
// we want to marshal the encoded peer id
encodedPeerMap := make(map[string]ResourceLimitsAndUsage, len(u.Peers))
for p, v := range u.Peers {
encodedPeerMap[p.String()] = v
}
type Alias LimitsConfigAndUsage
return json.Marshal(&struct {
*Alias
Peers map[string]ResourceLimitsAndUsage `json:",omitempty"`
}{
Alias: (*Alias)(&u),
Peers: encodedPeerMap,
})
}
func (u LimitsConfigAndUsage) ToPartialLimitConfig() (result rcmgr.PartialLimitConfig) {
result.System = u.System.ToResourceLimits()
result.Transient = u.Transient.ToResourceLimits()
result.Service = make(map[string]rcmgr.ResourceLimits, len(u.Services))
for s, l := range u.Services {
result.Service[s] = l.ToResourceLimits()
}
result.Protocol = make(map[protocol.ID]rcmgr.ResourceLimits, len(u.Protocols))
for p, l := range u.Protocols {
result.Protocol[p] = l.ToResourceLimits()
}
result.Peer = make(map[peer.ID]rcmgr.ResourceLimits, len(u.Peers))
for p, l := range u.Peers {
result.Peer[p] = l.ToResourceLimits()
}
return
}
func MergeLimitsAndStatsIntoLimitsConfigAndUsage(l rcmgr.ConcreteLimitConfig, stats rcmgr.ResourceManagerStat) LimitsConfigAndUsage {
limits := l.ToPartialLimitConfig()
return LimitsConfigAndUsage{
System: mergeResourceLimitsAndScopeStatToResourceLimitsAndUsage(limits.System, stats.System),
Transient: mergeResourceLimitsAndScopeStatToResourceLimitsAndUsage(limits.Transient, stats.Transient),
Services: mergeLimitsAndStatsMapIntoLimitsConfigAndUsageMap(limits.Service, stats.Services),
Protocols: mergeLimitsAndStatsMapIntoLimitsConfigAndUsageMap(limits.Protocol, stats.Protocols),
Peers: mergeLimitsAndStatsMapIntoLimitsConfigAndUsageMap(limits.Peer, stats.Peers),
}
}
func mergeLimitsAndStatsMapIntoLimitsConfigAndUsageMap[K comparable](limits map[K]rcmgr.ResourceLimits, stats map[K]network.ScopeStat) map[K]ResourceLimitsAndUsage {
r := make(map[K]ResourceLimitsAndUsage, maxInt(len(limits), len(stats)))
for p, s := range stats {
var l rcmgr.ResourceLimits
if limits != nil {
if rl, ok := limits[p]; ok {
l = rl
}
}
r[p] = mergeResourceLimitsAndScopeStatToResourceLimitsAndUsage(l, s)
}
for p, s := range limits {
if _, ok := stats[p]; ok {
continue // we already processed this element in the loop above
}
switch limit := limiter.Limit(); l := limit.(type) {
case *rcmgr.BaseLimit:
result = l.ToResourceLimits()
case rcmgr.BaseLimit:
result = l.ToResourceLimits()
default:
return fmt.Errorf("unknown limit type %T", limit)
r[p] = mergeResourceLimitsAndScopeStatToResourceLimitsAndUsage(s, network.ScopeStat{})
}
return r
}
func maxInt(x, y int) int {
if x > y {
return x
}
return y
}
func mergeResourceLimitsAndScopeStatToResourceLimitsAndUsage(rl rcmgr.ResourceLimits, ss network.ScopeStat) ResourceLimitsAndUsage {
return ResourceLimitsAndUsage{
Memory: rl.Memory,
MemoryUsage: ss.Memory,
FD: rl.FD,
FDUsage: ss.NumFD,
Conns: rl.Conns,
ConnsUsage: ss.NumConnsOutbound + ss.NumConnsInbound,
ConnsOutbound: rl.ConnsOutbound,
ConnsOutboundUsage: ss.NumConnsOutbound,
ConnsInbound: rl.ConnsInbound,
ConnsInboundUsage: ss.NumConnsInbound,
Streams: rl.Streams,
StreamsUsage: ss.NumStreamsOutbound + ss.NumStreamsInbound,
StreamsOutbound: rl.StreamsOutbound,
StreamsOutboundUsage: ss.NumStreamsOutbound,
StreamsInbound: rl.StreamsInbound,
StreamsInboundUsage: ss.NumStreamsInbound,
}
}
type ResourceInfos []ResourceInfo
type ResourceInfo struct {
ScopeName string
LimitName string
LimitValue rcmgr.LimitVal64
CurrentUsage int64
}
// LimitConfigsToInfo gets limits and stats and generates a list of scopes and limits to be printed.
func LimitConfigsToInfo(stats LimitsConfigAndUsage) ResourceInfos {
result := ResourceInfos{}
result = append(result, resourceLimitsAndUsageToResourceInfo(config.ResourceMgrSystemScope, stats.System)...)
result = append(result, resourceLimitsAndUsageToResourceInfo(config.ResourceMgrTransientScope, stats.Transient)...)
for i, s := range stats.Services {
result = append(result, resourceLimitsAndUsageToResourceInfo(
config.ResourceMgrServiceScopePrefix+i,
s,
)...)
}
for i, p := range stats.Protocols {
result = append(result, resourceLimitsAndUsageToResourceInfo(
config.ResourceMgrProtocolScopePrefix+string(i),
p,
)...)
}
for i, p := range stats.Peers {
result = append(result, resourceLimitsAndUsageToResourceInfo(
config.ResourceMgrPeerScopePrefix+i.Pretty(),
p,
)...)
}
return result
}
const (
limitNameMemory = "Memory"
limitNameFD = "FD"
limitNameConns = "Conns"
limitNameConnsInbound = "ConnsInbound"
limitNameConnsOutbound = "ConnsOutbound"
limitNameStreams = "Streams"
limitNameStreamsInbound = "StreamsInbound"
limitNameStreamsOutbound = "StreamsOutbound"
)
var limits = []string{
limitNameMemory,
limitNameFD,
limitNameConns,
limitNameConnsInbound,
limitNameConnsOutbound,
limitNameStreams,
limitNameStreamsInbound,
limitNameStreamsOutbound,
}
func resourceLimitsAndUsageToResourceInfo(scopeName string, stats ResourceLimitsAndUsage) ResourceInfos {
result := ResourceInfos{}
for _, l := range limits {
ri := ResourceInfo{
ScopeName: scopeName,
}
switch l {
case limitNameMemory:
ri.LimitName = limitNameMemory
ri.LimitValue = stats.Memory
ri.CurrentUsage = stats.MemoryUsage
case limitNameFD:
ri.LimitName = limitNameFD
ri.LimitValue = rcmgr.LimitVal64(stats.FD)
ri.CurrentUsage = int64(stats.FDUsage)
case limitNameConns:
ri.LimitName = limitNameConns
ri.LimitValue = rcmgr.LimitVal64(stats.Conns)
ri.CurrentUsage = int64(stats.ConnsUsage)
case limitNameConnsInbound:
ri.LimitName = limitNameConnsInbound
ri.LimitValue = rcmgr.LimitVal64(stats.ConnsInbound)
ri.CurrentUsage = int64(stats.ConnsInboundUsage)
case limitNameConnsOutbound:
ri.LimitName = limitNameConnsOutbound
ri.LimitValue = rcmgr.LimitVal64(stats.ConnsOutbound)
ri.CurrentUsage = int64(stats.ConnsOutboundUsage)
case limitNameStreams:
ri.LimitName = limitNameStreams
ri.LimitValue = rcmgr.LimitVal64(stats.Streams)
ri.CurrentUsage = int64(stats.StreamsUsage)
case limitNameStreamsInbound:
ri.LimitName = limitNameStreamsInbound
ri.LimitValue = rcmgr.LimitVal64(stats.StreamsInbound)
ri.CurrentUsage = int64(stats.StreamsInboundUsage)
case limitNameStreamsOutbound:
ri.LimitName = limitNameStreamsOutbound
ri.LimitValue = rcmgr.LimitVal64(stats.StreamsOutbound)
ri.CurrentUsage = int64(stats.StreamsOutboundUsage)
}
if ri.LimitValue == rcmgr.Unlimited64 || ri.LimitValue == rcmgr.DefaultLimit64 {
// ignore unlimited and unset limits to remove noise from output.
continue
}
result = append(result, ri)
}
return result
}
func ensureConnMgrMakeSenseVsResourceMgr(concreteLimits rcmgr.ConcreteLimitConfig, cfg config.SwarmConfig) error {
if cfg.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) == "none" || len(cfg.ResourceMgr.Allowlist) != 0 {
// no connmgr OR
// If an allowlist is set, a user may be enacting some form of DoS defense.
// We don't want to modify the System.ConnsInbound in that case, for example,
// as it may make sense for it to be (and stay) as "blockAll"
// so that only connections within the allowlist of multiaddrs get established.
return nil
}
var err error
switch {
case scope == config.ResourceMgrSystemScope:
err = mgr.ViewSystem(func(s network.ResourceScope) error { return getLimit(s) })
case scope == config.ResourceMgrTransientScope:
err = mgr.ViewTransient(func(s network.ResourceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err = mgr.ViewService(svc, func(s network.ServiceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
var pid peer.ID
pid, err = peer.Decode(p)
if err != nil {
return notOmitEmptyResourceLimit{}, fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return getLimit(s) })
default:
err = fmt.Errorf("invalid scope %q", scope)
}
return resourceLimitsToNotOmitEmpty(result), err
}
rcm := concreteLimits.ToPartialLimitConfig()
// NetSetLimit sets new ResourceManager limits for the given scope. The limits take effect immediately, and are also persisted to the repo config.
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.ResourceLimits) error {
setLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return ErrNoResourceMgr
}
l := rcmgr.InfiniteLimits.ToPartialLimitConfig().System
limiter.SetLimit(limit.Build(l.Build(rcmgr.BaseLimit{})))
return nil
}
cfg, err := repo.Config()
if err != nil {
return fmt.Errorf("reading config to set limit: %w", err)
}
if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
}
configLimits := cfg.Swarm.ResourceMgr.Limits
var setConfigFunc func()
switch {
case scope == config.ResourceMgrSystemScope:
err = mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.System = limit }
case scope == config.ResourceMgrTransientScope:
err = mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.Transient = limit }
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err = mgr.ViewService(svc, func(s network.ServiceScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Service == nil {
configLimits.Service = map[string]rcmgr.ResourceLimits{}
}
configLimits.Service[svc] = limit
}
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Protocol == nil {
configLimits.Protocol = map[protocol.ID]rcmgr.ResourceLimits{}
}
configLimits.Protocol[protocol.ID(proto)] = limit
}
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
var pid peer.ID
pid, err = peer.Decode(p)
if err != nil {
return fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Peer == nil {
configLimits.Peer = map[peer.ID]rcmgr.ResourceLimits{}
}
configLimits.Peer[pid] = limit
}
default:
return fmt.Errorf("invalid scope %q", scope)
}
if err != nil {
return fmt.Errorf("setting new limits on resource manager: %w", err)
}
if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
}
setConfigFunc()
if err := repo.SetConfig(cfg); err != nil {
return fmt.Errorf("writing new limits to repo config: %w", err)
}
return nil
}
// NetResetLimit resets ResourceManager limits to defaults. The limits take effect immediately, and are also persisted to the repo config.
func NetResetLimit(mgr network.ResourceManager, repo repo.Repo, scope string) (rcmgr.BaseLimit, error) {
var result rcmgr.BaseLimit
setLimit := func(s network.ResourceScope, l rcmgr.Limit) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok {
return ErrNoResourceMgr
}
limiter.SetLimit(l)
return nil
}
cfg, err := repo.Config()
if err != nil {
return rcmgr.BaseLimit{}, fmt.Errorf("reading config to reset limit: %w", err)
}
defaultsOrig, err := createDefaultLimitConfig(cfg.Swarm)
if err != nil {
return rcmgr.BaseLimit{}, fmt.Errorf("creating default limit config: %w", err)
}
defaults := defaultsOrig.ToPartialLimitConfig()
// INVESTIGATE(@Jorropo): Why do we save scaled configs in the repo ?
if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
}
configLimits := cfg.Swarm.ResourceMgr.Limits
var setConfigFunc func() rcmgr.BaseLimit
switch {
case scope == config.ResourceMgrSystemScope:
err = mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s, defaults.System.Build(rcmgr.BaseLimit{})) })
setConfigFunc = func() rcmgr.BaseLimit {
configLimits.System = defaults.System
return defaults.System.Build(rcmgr.BaseLimit{})
}
case scope == config.ResourceMgrTransientScope:
err = mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s, defaults.Transient.Build(rcmgr.BaseLimit{})) })
setConfigFunc = func() rcmgr.BaseLimit {
configLimits.Transient = defaults.Transient
return defaults.Transient.Build(rcmgr.BaseLimit{})
}
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err = mgr.ViewService(svc, func(s network.ServiceScope) error {
return setLimit(s, defaults.ServiceDefault.Build(rcmgr.BaseLimit{}))
})
setConfigFunc = func() rcmgr.BaseLimit {
if configLimits.Service == nil {
configLimits.Service = map[string]rcmgr.ResourceLimits{}
}
configLimits.Service[svc] = defaults.ServiceDefault
return defaults.ServiceDefault.Build(rcmgr.BaseLimit{})
}
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
return setLimit(s, defaults.ProtocolDefault.Build(rcmgr.BaseLimit{}))
})
setConfigFunc = func() rcmgr.BaseLimit {
if configLimits.Protocol == nil {
configLimits.Protocol = map[protocol.ID]rcmgr.ResourceLimits{}
}
configLimits.Protocol[protocol.ID(proto)] = defaults.ProtocolDefault
return defaults.ProtocolDefault.Build(rcmgr.BaseLimit{})
}
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
var pid peer.ID
pid, err = peer.Decode(p)
if err != nil {
return result, fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s, defaults.PeerDefault.Build(rcmgr.BaseLimit{})) })
setConfigFunc = func() rcmgr.BaseLimit {
if configLimits.Peer == nil {
configLimits.Peer = map[peer.ID]rcmgr.ResourceLimits{}
}
configLimits.Peer[pid] = defaults.PeerDefault
return defaults.PeerDefault.Build(rcmgr.BaseLimit{})
}
default:
return result, fmt.Errorf("invalid scope %q", scope)
}
if err != nil {
return result, fmt.Errorf("resetting new limits on resource manager: %w", err)
}
result = setConfigFunc()
if err := repo.SetConfig(cfg); err != nil {
return result, fmt.Errorf("writing new limits to repo config: %w", err)
}
return result, nil
}
func ensureConnMgrMakeSenseVsResourceMgr(orig rcmgr.ConcreteLimitConfig, cmgr config.ConnMgr) error {
if cmgr.Type.WithDefault(config.DefaultConnMgrType) == "none" {
return nil // none connmgr, no checks to do
}
rcm := orig.ToPartialLimitConfig()
highWater := cmgr.HighWater.WithDefault(config.DefaultConnMgrHighWater)
if rcm.System.ConnsInbound <= rcm.System.Conns {
if int64(rcm.System.ConnsInbound) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.ConnsInbound (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.ConnsInbound, highWater)
}
} else if int64(rcm.System.Conns) <= highWater {
highWater := cfg.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater)
if rcm.System.Conns != rcmgr.Unlimited && int64(rcm.System.Conns) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.Conns (%d) must be bigger than ConnMgr.HighWater (%d)
Unable to initialize libp2p due to conflicting resource manager limit configuration.
resource manager System.Conns (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.Conns, highWater)
}
if rcm.System.StreamsInbound <= rcm.System.Streams {
if int64(rcm.System.StreamsInbound) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.StreamsInbound (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.StreamsInbound, highWater)
}
} else if int64(rcm.System.Streams) <= highWater {
if rcm.System.ConnsInbound != rcmgr.Unlimited && int64(rcm.System.ConnsInbound) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting limit configuration:
ResourceMgr.Limits.System.Streams (%d) must be bigger than ConnMgr.HighWater (%d)
Unable to initialize libp2p due to conflicting resource manager limit configuration.
resource manager System.ConnsInbound (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.ConnsInbound, highWater)
}
if rcm.System.Streams != rcmgr.Unlimited && int64(rcm.System.Streams) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting resource manager limit configuration.
resource manager System.Streams (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.Streams, highWater)
}
if rcm.System.StreamsInbound != rcmgr.Unlimited && int64(rcm.System.StreamsInbound) <= highWater {
// nolint
return fmt.Errorf(`
Unable to initialize libp2p due to conflicting resource manager limit configuration.
resource manager System.StreamsInbound (%d) must be bigger than ConnMgr.HighWater (%d)
`, rcm.System.StreamsInbound, highWater)
}
return nil
}
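To make the check concrete, a hypothetical override it rejects, assuming kubo's default ConnMgr.HighWater of 96:

badOverrides := rcmgr.PartialLimitConfig{
	System: rcmgr.ResourceLimits{Conns: 64}, // 64 <= HighWater (96)
}
// Startup would then fail with:
// "resource manager System.Conns (64) must be bigger than ConnMgr.HighWater (96)"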


@@ -4,70 +4,31 @@ import (
"fmt"
"github.com/dustin/go-humanize"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/libp2p/fd"
"github.com/libp2p/go-libp2p"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/pbnjay/memory"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/libp2p/fd"
)
// We do some magic when parsing config files (we use a map[string]interface{} to compare config files).
// When there is no concrete type, the JSON parser casts numbers to float64 by default,
// losing precision when the final number is written. So if we used math.MaxInt as our infinite number,
// after writing the config file we would have 9223372036854776000 instead of 9223372036854775807,
// making the parsing process fail. We set 1e9 (1000000000) as the "no limit" value; it also avoids overflow on 32-bit architectures.
const bigEnough = 1e9
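The precision loss described above is easy to reproduce in isolation; a standalone sketch:

package main

import (
	"encoding/json"
	"fmt"
	"math"
)

func main() {
	b, _ := json.Marshal(int64(math.MaxInt64)) // "9223372036854775807"
	var v interface{}
	_ = json.Unmarshal(b, &v) // no concrete type: number is decoded as float64
	out, _ := json.Marshal(v)
	fmt.Println(string(out)) // 9223372036854776000 — not the number we wrote
}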
var infiniteBaseLimit = rcmgr.BaseLimit{
Streams: bigEnough,
StreamsInbound: bigEnough,
StreamsOutbound: bigEnough,
Conns: bigEnough,
ConnsInbound: bigEnough,
ConnsOutbound: bigEnough,
FD: bigEnough,
Memory: bigEnough,
}
var noLimitIncrease = rcmgr.BaseLimitIncrease{
ConnsInbound: 0,
ConnsOutbound: 0,
Conns: 0,
StreamsInbound: 0,
StreamsOutbound: 0,
Streams: 0,
Memory: 0,
FDFraction: 0,
}
var infiniteResourceLimits = rcmgr.InfiniteLimits.ToPartialLimitConfig().System
// This file defines the implicit limit defaults used when Swarm.ResourceMgr is enabled.
// createDefaultLimitConfig creates LimitConfig to pass to libp2p's resource manager.
// The defaults follow the documentation in docs/libp2p-resource-management.md.
// Any changes in the logic here should be reflected there.
func createDefaultLimitConfig(cfg config.SwarmConfig) (rcmgr.ConcreteLimitConfig, error) {
func createDefaultLimitConfig(cfg config.SwarmConfig) (limitConfig rcmgr.ConcreteLimitConfig, logMessageForStartup string, err error) {
maxMemoryDefaultString := humanize.Bytes(uint64(memory.TotalMemory()) / 2)
maxMemoryString := cfg.ResourceMgr.MaxMemory.WithDefault(maxMemoryDefaultString)
maxMemory, err := humanize.ParseBytes(maxMemoryString)
if err != nil {
return rcmgr.ConcreteLimitConfig{}, err
return rcmgr.ConcreteLimitConfig{}, "", err
}
maxMemoryMB := maxMemory / (1024 * 1024)
maxFD := int(cfg.ResourceMgr.MaxFileDescriptors.WithDefault(int64(fd.GetNumFDs()) / 2))
// We want to see this message on startup, that's why we are using fmt instead of log.
fmt.Printf(`
Computing default go-libp2p Resource Manager limits based on:
- 'Swarm.ResourceMgr.MaxMemory': %q
- 'Swarm.ResourceMgr.MaxFileDescriptors': %d
Applying any user-supplied overrides on top.
Run 'ipfs swarm limit all' to see the resulting limits.
`, maxMemoryString, maxFD)
// At least as of 2023-01-25, it's possible to open a connection that
// doesn't ask for any memory usage with the libp2p Resource Manager/Accountant
// (see https://github.com/libp2p/go-libp2p/issues/2010#issuecomment-1404280736).
@@ -79,109 +40,86 @@ Run 'ipfs swarm limit all' to see the resulting limits.
// (see https://github.com/libp2p/go-libp2p/blob/master/p2p/host/resource-manager/limit_defaults.go#L357 ).
systemConnsInbound := int(1 * maxMemoryMB)
scalingLimitConfig := rcmgr.ScalingLimitConfig{
SystemBaseLimit: rcmgr.BaseLimit{
Memory: int64(maxMemory),
FD: maxFD,
partialLimits := rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory),
FD: rcmgr.LimitVal(maxFD),
// By default, we just limit connections on the inbound side.
Conns: bigEnough,
ConnsInbound: systemConnsInbound,
ConnsOutbound: bigEnough,
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound),
ConnsOutbound: rcmgr.Unlimited,
Streams: bigEnough,
StreamsInbound: bigEnough,
StreamsOutbound: bigEnough,
Streams: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
},
SystemLimitIncrease: noLimitIncrease,
// Transient connections won't cause any memory to accounted for by the resource manager.
// Transient connections won't cause any memory to be accounted for by the resource manager/accountant.
// Only established connections do.
// As a result, we can't rely on System.Memory to protect us from a bunch of transient connections being opened.
// We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope.
TransientBaseLimit: rcmgr.BaseLimit{
Memory: int64(maxMemory / 4),
FD: maxFD / 4,
Transient: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory / 4),
FD: rcmgr.LimitVal(maxFD / 4),
Conns: bigEnough,
ConnsInbound: systemConnsInbound / 4,
ConnsOutbound: bigEnough,
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4),
ConnsOutbound: rcmgr.Unlimited,
Streams: bigEnough,
StreamsInbound: bigEnough,
StreamsOutbound: bigEnough,
Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
},
TransientLimitIncrease: noLimitIncrease,
// Let's get out of the way of the allowlist functionality.
// If someone specified "Swarm.ResourceMgr.Allowlist" we should let it go through.
AllowlistedSystemBaseLimit: infiniteBaseLimit,
AllowlistedSystemLimitIncrease: noLimitIncrease,
AllowlistedSystem: infiniteResourceLimits,
AllowlistedTransientBaseLimit: infiniteBaseLimit,
AllowlistedTransientLimitIncrease: noLimitIncrease,
AllowlistedTransient: infiniteResourceLimits,
// Keep it simple by not having Service, ServicePeer, Protocol, ProtocolPeer, Conn, or Stream limits.
ServiceBaseLimit: infiniteBaseLimit,
ServiceLimitIncrease: noLimitIncrease,
ServiceDefault: infiniteResourceLimits,
ServicePeerBaseLimit: infiniteBaseLimit,
ServicePeerLimitIncrease: noLimitIncrease,
ServicePeerDefault: infiniteResourceLimits,
ProtocolBaseLimit: infiniteBaseLimit,
ProtocolLimitIncrease: noLimitIncrease,
ProtocolDefault: infiniteResourceLimits,
ProtocolPeerBaseLimit: infiniteBaseLimit,
ProtocolPeerLimitIncrease: noLimitIncrease,
ProtocolPeerDefault: infiniteResourceLimits,
ConnBaseLimit: infiniteBaseLimit,
ConnLimitIncrease: noLimitIncrease,
Conn: infiniteResourceLimits,
StreamBaseLimit: infiniteBaseLimit,
StreamLimitIncrease: noLimitIncrease,
Stream: infiniteResourceLimits,
// Limit the resources consumed by a peer.
// This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers.
// We specify this limit against unintentional DoS attacks (e.g., a peer has a bug and is sending too much traffic unintentionally).
// In that case we want to keep that peer's resource consumption contained.
// To keep this simple, we only constrain inbound connections and streams.
PeerBaseLimit: rcmgr.BaseLimit{
Memory: bigEnough,
FD: bigEnough,
Conns: bigEnough,
ConnsInbound: rcmgr.DefaultLimits.PeerBaseLimit.ConnsInbound,
ConnsOutbound: bigEnough,
Streams: bigEnough,
StreamsInbound: rcmgr.DefaultLimits.PeerBaseLimit.StreamsInbound,
StreamsOutbound: bigEnough,
},
// Most limits don't see an increase because they're already infinite/bigEnough.
// The values that should scale based on the amount of memory allocated to libp2p need to increase accordingly.
PeerLimitIncrease: rcmgr.BaseLimitIncrease{
Memory: 0,
FDFraction: 0,
Conns: 0,
ConnsInbound: rcmgr.DefaultLimits.PeerLimitIncrease.ConnsInbound,
ConnsOutbound: 0,
Streams: 0,
StreamsInbound: rcmgr.DefaultLimits.PeerLimitIncrease.StreamsInbound,
StreamsOutbound: 0,
PeerDefault: rcmgr.ResourceLimits{
Memory: rcmgr.Unlimited64,
FD: rcmgr.Unlimited,
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.Unlimited,
Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.DefaultLimit,
StreamsOutbound: rcmgr.Unlimited,
},
}
// Whatever limits libp2p has specifically tuned for its protocols/services we'll apply.
scalingLimitConfig := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&scalingLimitConfig)
orig := scalingLimitConfig.Scale(int64(maxMemory), maxFD)
defaultLimitConfig := orig.ToPartialLimitConfig()
// Anything set above in partialLimits that had a value of rcmgr.DefaultLimit will be overridden.
// Anything in scalingLimitConfig that wasn't defined in partialLimits above will be added (e.g., libp2p's default service limits).
partialLimits = partialLimits.Build(scalingLimitConfig.Scale(int64(maxMemory), maxFD)).ToPartialLimitConfig()
// Simple checks to override autoscaling, ensuring limits make sense versus the connmgr values.
// There are ways to break this, but this should catch most problems already.
// We might improve this in the future.
// See: https://github.com/ipfs/kubo/issues/9545
if cfg.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) != "none" {
maxInboundConns := int64(defaultLimitConfig.System.ConnsInbound)
if partialLimits.System.ConnsInbound != rcmgr.Unlimited && cfg.ConnMgr.Type.WithDefault(config.DefaultConnMgrType) != "none" {
maxInboundConns := int64(partialLimits.System.ConnsInbound)
if connmgrHighWaterTimesTwo := cfg.ConnMgr.HighWater.WithDefault(config.DefaultConnMgrHighWater) * 2; maxInboundConns < connmgrHighWaterTimesTwo {
maxInboundConns = connmgrHighWaterTimesTwo
}
@@ -191,9 +129,19 @@ Run 'ipfs swarm limit all' to see the resulting limits.
}
// Scale System.StreamsInbound as well, but use the existing ratio of StreamsInbound to ConnsInbound
defaultLimitConfig.System.StreamsInbound = rcmgr.LimitVal(maxInboundConns * int64(defaultLimitConfig.System.StreamsInbound) / int64(defaultLimitConfig.System.ConnsInbound))
defaultLimitConfig.System.ConnsInbound = rcmgr.LimitVal(maxInboundConns)
partialLimits.System.StreamsInbound = rcmgr.LimitVal(maxInboundConns * int64(partialLimits.System.StreamsInbound) / int64(partialLimits.System.ConnsInbound))
partialLimits.System.ConnsInbound = rcmgr.LimitVal(maxInboundConns)
}
return defaultLimitConfig.Build(orig), nil
msg := fmt.Sprintf(`
Computed default go-libp2p Resource Manager limits based on:
- 'Swarm.ResourceMgr.MaxMemory': %q
- 'Swarm.ResourceMgr.MaxFileDescriptors': %d
These can be inspected with 'ipfs swarm resources'.
`, maxMemoryString, maxFD)
// We already have a complete value, so we pass in an empty ConcreteLimitConfig.
return partialLimits.Build(rcmgr.ConcreteLimitConfig{}), msg, nil
}
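For orientation, the composition performed above in isolation (budget values assumed): libp2p's scaling defaults are scaled to the configured memory/FD budget, then our partial limits are layered on top.

scaling := rcmgr.DefaultLimits           // libp2p's base limits plus per-GB increases
libp2p.SetDefaultServiceLimits(&scaling) // add libp2p's tuned per-service limits
scaled := scaling.Scale(4<<30, 1024)     // e.g., 4 GiB of memory and 1024 file descriptors
final := partialLimits.Build(scaled)     // explicit fields win; "default" fields take the scaled values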


@@ -1,12 +0,0 @@
package libp2p
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestPercentage(t *testing.T) {
require.True(t, abovePercentage(10, 100, 10))
require.True(t, abovePercentage(100, 100, 99))
}


@@ -14,7 +14,8 @@ import (
// RepoConfig loads configuration from the repo
func RepoConfig(repo repo.Repo) (*config.Config, error) {
return repo.Config()
cfg, err := repo.Config()
return cfg, err
}
// Datastore provides the datastore

go.mod

@@ -106,7 +106,6 @@ require (
go.uber.org/fx v1.18.2
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.5.0
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2
golang.org/x/mod v0.7.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.5.0
@@ -228,6 +227,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/term v0.5.0 // indirect


@@ -16,6 +16,7 @@ import (
repo "github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/repo/common"
dir "github.com/ipfs/kubo/thirdparty/dir"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
ds "github.com/ipfs/go-datastore"
measure "github.com/ipfs/go-ds-measure"
@@ -102,11 +103,12 @@ type FSRepo struct {
configFilePath string
// lockfile is the file system lock to prevent others from opening
// the same fsrepo path concurrently
lockfile io.Closer
config *config.Config
ds repo.Datastore
keystore keystore.Keystore
filemgr *filestore.FileManager
lockfile io.Closer
config *config.Config
userResourceOverrides rcmgr.PartialLimitConfig
ds repo.Datastore
keystore keystore.Keystore
filemgr *filestore.FileManager
}
var _ repo.Repo = (*FSRepo)(nil)
@@ -180,6 +182,10 @@ func open(repoPath string, userConfigFilePath string) (repo.Repo, error) {
return nil, err
}
if err := r.openUserResourceOverrides(); err != nil {
return nil, err
}
if err := r.openDatastore(); err != nil {
return nil, err
}
@@ -437,6 +443,17 @@ func (r *FSRepo) openConfig() error {
return nil
}
// openUserResourceOverrides loads the user's resource limit overrides; if the file is not present, all overrides are left unset.
// It will error if the decoding fails.
func (r *FSRepo) openUserResourceOverrides() error {
// This filepath is documented in docs/libp2p-resource-management.md and must be kept in sync.
err := serialize.ReadConfigFile(filepath.Join(r.path, "libp2p-resource-limit-overrides.json"), &r.userResourceOverrides)
if err == serialize.ErrNotInitialized {
err = nil
}
return err
}
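For reference, a hedged sketch of producing such an overrides file out-of-band. WriteConfigFile is the same serializer helper the test harness uses further below; the function name, repo path, and the 256 limit are made-up values:

package example

import (
	"path/filepath"

	serialize "github.com/ipfs/kubo/config/serialize"
	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

func writeOverrides(repoPath string) error {
	// Only fields set here override the computed defaults; fields left at
	// their zero value (rcmgr.DefaultLimit) keep the defaults.
	overrides := rcmgr.PartialLimitConfig{
		System: rcmgr.ResourceLimits{ConnsInbound: 256}, // made-up value
	}
	return serialize.WriteConfigFile(
		filepath.Join(repoPath, "libp2p-resource-limit-overrides.json"), &overrides)
}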
func (r *FSRepo) openKeystore() error {
ksp := filepath.Join(r.path, "keystore")
ks, err := keystore.NewFSKeystore(ksp)
@ -554,6 +571,21 @@ func (r *FSRepo) Config() (*config.Config, error) {
return r.config, nil
}
func (r *FSRepo) UserResourceOverrides() (rcmgr.PartialLimitConfig, error) {
// It is not necessary to hold the package lock since the repo is in an
// opened state. The package lock is _not_ meant to ensure that the repo is
// thread-safe. The package lock is only meant to guard against removal and
// coordinate the lockfile. However, we provide thread-safety to keep
// things simple.
packageLock.Lock()
defer packageLock.Unlock()
if r.closed {
return rcmgr.PartialLimitConfig{}, errors.New("cannot access config, repo not open")
}
return r.userResourceOverrides, nil
}
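For orientation, a minimal sketch of how a consumer might layer these overrides over the limits computed earlier (hypothetical wiring and function name; kubo's actual plumbing lives in core/node/libp2p):

package example

import (
	"github.com/libp2p/go-libp2p/core/network"
	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

// buildRM is hypothetical; it only illustrates the layering order.
func buildRM(computed, user rcmgr.PartialLimitConfig) (network.ResourceManager, error) {
	defaults := computed.Build(rcmgr.ConcreteLimitConfig{}) // resolve computed defaults
	limits := user.Build(defaults)                          // user-set fields win
	return rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(limits))
}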
func (r *FSRepo) FileManager() *filestore.FileManager {
return r.filemgr
}

View File

@ -7,6 +7,7 @@ import (
filestore "github.com/ipfs/go-filestore"
keystore "github.com/ipfs/go-ipfs-keystore"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
config "github.com/ipfs/kubo/config"
ma "github.com/multiformats/go-multiaddr"
@ -26,6 +27,10 @@ func (m *Mock) Config() (*config.Config, error) {
return &m.C, nil // FIXME threadsafety
}
func (m *Mock) UserResourceOverrides() (rcmgr.PartialLimitConfig, error) {
return rcmgr.PartialLimitConfig{}, nil
}
func (m *Mock) SetConfig(updated *config.Config) error {
m.C = *updated // FIXME threadsafety
return nil

View File

@ -8,6 +8,7 @@ import (
filestore "github.com/ipfs/go-filestore"
keystore "github.com/ipfs/go-ipfs-keystore"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
ds "github.com/ipfs/go-datastore"
config "github.com/ipfs/kubo/config"
@ -24,6 +25,10 @@ type Repo interface {
// to the returned config are not automatically persisted.
Config() (*config.Config, error)
// UserResourceOverrides returns optional user resource overrides for the
// libp2p resource manager.
UserResourceOverrides() (rcmgr.PartialLimitConfig, error)
// BackupConfig creates a backup of the current configuration file using
// the given prefix for naming.
BackupConfig(prefix string) (string, error)

View File

@ -88,6 +88,7 @@ func TestAllSubcommandsAcceptHelp(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode()
for _, cmd := range node.IPFSCommands() {
cmd := cmd
t.Run(fmt.Sprintf("command %q accepts help", cmd), func(t *testing.T) {
t.Parallel()
splitCmd := strings.Split(cmd, " ")[1:]

View File

@ -20,6 +20,7 @@ import (
"github.com/ipfs/kubo/config"
serial "github.com/ipfs/kubo/config/serialize"
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
@ -96,6 +97,38 @@ func (n *Node) UpdateConfig(f func(cfg *config.Config)) {
n.WriteConfig(cfg)
}
func (n *Node) ReadUserResourceOverrides() *rcmgr.PartialLimitConfig {
var r rcmgr.PartialLimitConfig
err := serial.ReadConfigFile(filepath.Join(n.Dir, "libp2p-resource-limit-overrides.json"), &r)
switch err {
case nil, serial.ErrNotInitialized:
return &r
default:
panic(err)
}
}
func (n *Node) WriteUserSuppliedResourceOverrides(c *rcmgr.PartialLimitConfig) {
err := serial.WriteConfigFile(filepath.Join(n.Dir, "libp2p-resource-limit-overrides.json"), c)
if err != nil {
panic(err)
}
}
func (n *Node) UpdateUserSuppliedResourceManagerOverrides(f func(overrides *rcmgr.PartialLimitConfig)) {
overrides := n.ReadUserResourceOverrides()
f(overrides)
n.WriteUserSuppliedResourceOverrides(overrides)
}
func (n *Node) UpdateConfigAndUserSuppliedResourceManagerOverrides(f func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig)) {
overrides := n.ReadUserResourceOverrides()
cfg := n.ReadConfig()
f(cfg, overrides)
n.WriteConfig(cfg)
n.WriteUserSuppliedResourceOverrides(overrides)
}
func (n *Node) IPFS(args ...string) RunResult {
res := n.RunIPFS(args...)
n.Runner.AssertNoError(res)

View File

@ -7,6 +7,8 @@ import (
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -24,19 +26,14 @@ func TestRcmgr(t *testing.T) {
node.StartDaemon()
t.Run("swarm limit should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "system")
t.Run("swarm resources should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "resources")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
})
t.Run("swarm stats should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "stats", "all")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
})
})
t.Run("Node in offline mode", func(t *testing.T) {
t.Run("Node with resource manager disabled", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
@ -44,15 +41,10 @@ func TestRcmgr(t *testing.T) {
})
node.StartDaemon()
t.Run("swarm limit should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "system")
t.Run("swarm resources should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "resources")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
})
t.Run("swarm stats should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "stats", "all")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], "missing ResourceMgr")
assert.Contains(t, res.Stderr.String(), "missing ResourceMgr")
})
})
@ -63,12 +55,14 @@ func TestRcmgr(t *testing.T) {
})
node.StartDaemon()
res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
res := node.RunIPFS("swarm", "resources", "--enc=json")
require.Equal(t, 0, res.ExitCode())
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.GreaterOrEqual(t, limits.ConnsInbound, 2000)
assert.GreaterOrEqual(t, limits.StreamsInbound, 2000)
rl := limits.System.ToResourceLimits()
s := rl.Build(rcmgr.BaseLimit{})
assert.GreaterOrEqual(t, s.ConnsInbound, 2000)
assert.GreaterOrEqual(t, s.StreamsInbound, 2000)
})
t.Run("default configuration", func(t *testing.T) {
@ -80,176 +74,102 @@ func TestRcmgr(t *testing.T) {
node.StartDaemon()
t.Run("conns and streams are above 800 for default connmgr settings", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
res := node.RunIPFS("swarm", "resources", "--enc=json")
require.Equal(t, 0, res.ExitCode())
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.GreaterOrEqual(t, limits.ConnsInbound, 800)
assert.GreaterOrEqual(t, limits.StreamsInbound, 800)
if limits.System.ConnsInbound != rcmgr.Unlimited {
assert.GreaterOrEqual(t, limits.System.ConnsInbound, 800)
}
if limits.System.StreamsInbound != rcmgr.Unlimited {
assert.GreaterOrEqual(t, limits.System.StreamsInbound, 800)
}
})
t.Run("limits|stats should succeed", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "all")
t.Run("limits should succeed", func(t *testing.T) {
res := node.RunIPFS("swarm", "resources", "--enc=json")
assert.Equal(t, 0, res.ExitCode())
limits := map[string]rcmgr.ResourceLimits{}
limits := rcmgr.PartialLimitConfig{}
err := json.Unmarshal(res.Stdout.Bytes(), &limits)
require.NoError(t, err)
assert.Greater(t, limits["System"].Memory, int64(0))
assert.Greater(t, limits["System"].FD, 0)
assert.Greater(t, limits["System"].Conns, 0)
assert.Greater(t, limits["System"].ConnsInbound, 0)
assert.Greater(t, limits["System"].ConnsOutbound, 0)
assert.Greater(t, limits["System"].Streams, 0)
assert.Greater(t, limits["System"].StreamsInbound, 0)
assert.Greater(t, limits["System"].StreamsOutbound, 0)
assert.Greater(t, limits["Transient"].Memory, int64(0))
})
t.Run("resetting limits should produce the same default limits", func(t *testing.T) {
resetRes := node.RunIPFS("swarm", "limit", "system", "--reset", "--enc=json")
require.Equal(t, 0, resetRes.ExitCode())
limitRes := node.RunIPFS("swarm", "limit", "system", "--enc=json")
require.Equal(t, 0, limitRes.ExitCode())
assert.Equal(t, resetRes.Stdout.Bytes(), limitRes.Stdout.Bytes())
})
t.Run("swarm stats system with filter should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "stats", "system", "--min-used-limit-perc=99")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], `Error: "min-used-limit-perc" can only be used when scope is "all"`)
})
t.Run("swarm limit reset on map values should work", func(t *testing.T) {
resetRes := node.RunIPFS("swarm", "limit", "peer:12D3KooWL7i1T9VSPeF8AgQApbyM51GNKZsYPvNvL347aMDmvNzG", "--reset", "--enc=json")
require.Equal(t, 0, resetRes.ExitCode())
limitRes := node.RunIPFS("swarm", "limit", "peer:12D3KooWL7i1T9VSPeF8AgQApbyM51GNKZsYPvNvL347aMDmvNzG", "--enc=json")
require.Equal(t, 0, limitRes.ExitCode())
assert.Equal(t, resetRes.Stdout.Bytes(), limitRes.Stdout.Bytes())
})
t.Run("scope is required using reset flags", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "--reset")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.Lines()[0], `Error: argument "scope" is required`)
assert.NotEqual(t, limits.Transient.Memory, rcmgr.BlockAllLimit64)
assert.NotEqual(t, limits.System.Memory, rcmgr.BlockAllLimit64)
assert.NotEqual(t, limits.System.FD, rcmgr.BlockAllLimit)
assert.NotEqual(t, limits.System.Conns, rcmgr.BlockAllLimit)
assert.NotEqual(t, limits.System.ConnsInbound, rcmgr.BlockAllLimit)
assert.NotEqual(t, limits.System.ConnsOutbound, rcmgr.BlockAllLimit)
assert.NotEqual(t, limits.System.Streams, rcmgr.BlockAllLimit)
assert.NotEqual(t, limits.System.StreamsInbound, rcmgr.BlockAllLimit)
assert.NotEqual(t, limits.System.StreamsOutbound, rcmgr.BlockAllLimit)
})
t.Run("swarm stats works", func(t *testing.T) {
res := node.RunIPFS("swarm", "stats", "all", "--enc=json")
res := node.RunIPFS("swarm", "resources", "--enc=json")
require.Equal(t, 0, res.ExitCode())
stats := libp2p.NetStatOut{}
err := json.Unmarshal(res.Stdout.Bytes(), &stats)
require.NoError(t, err)
limits := unmarshalLimits(t, res.Stdout.Bytes())
// every scope has the same fields, so we only inspect system
assert.Equal(t, rcmgr.LimitVal64(0), stats.System.Memory)
assert.Equal(t, rcmgr.LimitVal(0), stats.System.FD)
assert.Equal(t, rcmgr.LimitVal(0), stats.System.Conns)
assert.Equal(t, rcmgr.LimitVal(0), stats.System.ConnsInbound)
assert.Equal(t, rcmgr.LimitVal(0), stats.System.ConnsOutbound)
assert.Equal(t, rcmgr.LimitVal(0), stats.System.Streams)
assert.Equal(t, rcmgr.LimitVal(0), stats.System.StreamsInbound)
assert.Equal(t, rcmgr.LimitVal(0), stats.System.StreamsOutbound)
assert.Equal(t, rcmgr.LimitVal64(0), stats.Transient.Memory)
assert.Zero(t, limits.System.MemoryUsage)
assert.Zero(t, limits.System.FDUsage)
assert.Zero(t, limits.System.ConnsInboundUsage)
assert.Zero(t, limits.System.ConnsOutboundUsage)
assert.Zero(t, limits.System.StreamsInboundUsage)
assert.Zero(t, limits.System.StreamsOutboundUsage)
assert.Zero(t, limits.Transient.MemoryUsage)
})
})
t.Run("set system conns limit while daemon is not running", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
res := node.RunIPFS("config", "--json", "Swarm.ResourceMgr.Limits.System.Conns", "99999")
require.Equal(t, 0, res.ExitCode())
t.Run("set an invalid limit which should result in a failure", func(t *testing.T) {
res := node.RunIPFS("config", "--json", "Swarm.ResourceMgr.Limits.System.Conns", "asdf")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.String(), "failed to unmarshal")
})
node.StartDaemon()
t.Run("new system conns limit is applied", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, limits.Conns, rcmgr.LimitVal(99999))
})
})
t.Run("set the system memory limit while the daemon is running", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "system", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 99998
})
assert.Equal(t, rcmgr.LimitVal64(99998), node.ReadConfig().Swarm.ResourceMgr.Limits.System.Memory)
res := node.RunIPFS("swarm", "limit", "system", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(99998), limits.Memory)
})
t.Run("smoke test transient scope", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "transient", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 88888
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Transient.Memory = 88888
})
node.StartDaemon()
res := node.RunIPFS("swarm", "limit", "transient", "--enc=json")
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(88888), limits.Memory)
assert.Equal(t, rcmgr.LimitVal64(88888), limits.Transient.Memory)
})
t.Run("smoke test service scope", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "svc:foo", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 77777
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Service = map[string]rcmgr.ResourceLimits{"foo": {Memory: 77777}}
})
node.StartDaemon()
res := node.RunIPFS("swarm", "limit", "svc:foo", "--enc=json")
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(77777), limits.Memory)
assert.Equal(t, rcmgr.LimitVal64(77777), limits.Services["foo"].Memory)
})
t.Run("smoke test protocol scope", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "proto:foo", func(limits *rcmgr.ResourceLimits) {
limits.Memory = 66666
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Protocol = map[protocol.ID]rcmgr.ResourceLimits{"foo": {Memory: 66666}}
})
node.StartDaemon()
res := node.RunIPFS("swarm", "limit", "proto:foo", "--enc=json")
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(66666), limits.Memory)
assert.Equal(t, rcmgr.LimitVal64(66666), limits.Protocols["foo"].Memory)
})
t.Run("smoke test peer scope", func(t *testing.T) {
validPeerID := "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"
node := harness.NewT(t).NewNode().Init().StartDaemon()
updateLimitsWithFile(t, node, "peer:"+validPeerID, func(limits *rcmgr.ResourceLimits) {
limits.Memory = 66666
validPeerID, err := peer.Decode("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
assert.NoError(t, err)
node := harness.NewT(t).NewNode().Init()
node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
overrides.Peer = map[peer.ID]rcmgr.ResourceLimits{validPeerID: {Memory: 55555}}
})
node.StartDaemon()
res := node.RunIPFS("swarm", "limit", "peer:"+validPeerID, "--enc=json")
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
assert.Equal(t, rcmgr.LimitVal64(66666), limits.Memory)
t.Parallel()
t.Run("getting limit for invalid peer ID fails", func(t *testing.T) {
res := node.RunIPFS("swarm", "limit", "peer:foo")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.String(), "invalid peer ID")
})
t.Run("setting limit for invalid peer ID fails", func(t *testing.T) {
filename := "invalid-peer-id.json"
node.WriteBytes(filename, []byte(`{"Memory":"99"}`))
res := node.RunIPFS("swarm", "limit", "peer:foo", filename)
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stderr.String(), "invalid peer ID")
})
assert.Equal(t, rcmgr.LimitVal64(55555), limits.Peers[validPeerID].Memory)
})
t.Run("", func(t *testing.T) {
@ -258,20 +178,20 @@ func TestRcmgr(t *testing.T) {
// peerID0, peerID1, peerID2 := node0.PeerID(), node1.PeerID(), node2.PeerID()
peerID1, peerID2 := node1.PeerID().String(), node2.PeerID().String()
node0.UpdateConfig(func(cfg *config.Config) {
node0.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Conns: rcmgr.BlockAllLimit,
ConnsInbound: rcmgr.BlockAllLimit,
ConnsOutbound: rcmgr.BlockAllLimit,
},
}
cfg.Swarm.ResourceMgr.Enabled = config.True
cfg.Swarm.ResourceMgr.Allowlist = []string{"/ip4/0.0.0.0/ipcidr/0/p2p/" + peerID2}
})
nodes.StartDaemons()
// change system limits on node 0
updateLimitsWithFile(t, node0, "system", func(limits *rcmgr.ResourceLimits) {
limits.Conns = rcmgr.BlockAllLimit
limits.ConnsInbound = rcmgr.BlockAllLimit
limits.ConnsOutbound = rcmgr.BlockAllLimit
})
t.Parallel()
t.Run("node 0 should fail to connect to node 1", func(t *testing.T) {
res := node0.Runner.Run(harness.RunRequest{
@ -306,9 +226,10 @@ func TestRcmgr(t *testing.T) {
t.Parallel()
t.Run("system conns", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.Conns = 128
node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{Conns: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
@ -318,9 +239,10 @@ func TestRcmgr(t *testing.T) {
})
t.Run("system conns inbound", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.ConnsInbound = 128
node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{ConnsInbound: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
@ -330,9 +252,10 @@ func TestRcmgr(t *testing.T) {
})
t.Run("system streams", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.Streams = 128
node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{Streams: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
@ -342,9 +265,10 @@ func TestRcmgr(t *testing.T) {
})
t.Run("system streams inbound", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.PartialLimitConfig{}
cfg.Swarm.ResourceMgr.Limits.System.StreamsInbound = 128
node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
*overrides = rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{StreamsInbound: 128},
}
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
})
@ -355,22 +279,8 @@ func TestRcmgr(t *testing.T) {
})
}
func updateLimitsWithFile(t *testing.T, node *harness.Node, limit string, f func(*rcmgr.ResourceLimits)) {
filename := limit + ".json"
res := node.RunIPFS("swarm", "limit", limit)
limits := unmarshalLimits(t, res.Stdout.Bytes())
f(limits)
limitsOut, err := json.Marshal(limits)
require.NoError(t, err)
node.WriteBytes(filename, limitsOut)
res = node.RunIPFS("swarm", "limit", limit, filename)
assert.Equal(t, 0, res.ExitCode())
}
func unmarshalLimits(t *testing.T, b []byte) *rcmgr.ResourceLimits {
limits := &rcmgr.ResourceLimits{}
func unmarshalLimits(t *testing.T, b []byte) *libp2p.LimitsConfigAndUsage {
limits := &libp2p.LimitsConfigAndUsage{}
err := json.Unmarshal(b, limits)
require.NoError(t, err)
return limits

View File

@ -81,12 +81,11 @@ test_expect_success "ipfs daemon output looks good" '
echo "Initializing daemon..." >expected_daemon &&
ipfs version --all >> expected_daemon &&
echo "" >>expected_daemon &&
echo "Computing default go-libp2p Resource Manager limits based on:" >>expected_daemon &&
echo "Computed default go-libp2p Resource Manager limits based on:" >>expected_daemon &&
echo " - '"'"'Swarm.ResourceMgr.MaxMemory'"'"': \"4GB\"" >>expected_daemon &&
echo " - '"'"'Swarm.ResourceMgr.MaxFileDescriptors'"'"': 1024" >>expected_daemon &&
echo "" >>expected_daemon &&
echo "Applying any user-supplied overrides on top." >>expected_daemon &&
echo "Run '"'"'ipfs swarm limit all'"'"' to see the resulting limits." >>expected_daemon &&
echo "Theses can be inspected with '"'"'ipfs swarm resources'"'"'." >>expected_daemon &&
echo "" >>expected_daemon &&
sed "s/^/Swarm listening on /" listen_addrs >>expected_daemon &&
sed "s/^/Swarm announcing /" local_addrs >>expected_daemon &&