feat: Provider.WorkerCount and stats reprovide (#10779)
Some checks are pending
CodeQL / codeql (push) Waiting to run
Docker Build / docker-build (push) Waiting to run
Gateway Conformance / gateway-conformance (push) Waiting to run
Gateway Conformance / gateway-conformance-libp2p-experiment (push) Waiting to run
Go Build / go-build (push) Waiting to run
Go Check / go-check (push) Waiting to run
Go Lint / go-lint (push) Waiting to run
Go Test / go-test (push) Waiting to run
Interop / interop-prep (push) Waiting to run
Interop / helia-interop (push) Blocked by required conditions
Interop / ipfs-webui (push) Blocked by required conditions
Sharness / sharness-test (push) Waiting to run
Spell Check / spellcheck (push) Waiting to run

* adjust ipfs stats provide
* update boxo dep
* bump boxo
* fixing tests
* docs/chore: mark stat reprovide as experimental
* docs: Provider.Strategy

explicitly document it is not used - without this legacy users will have
it in their config and be very confused

---------

Co-authored-by: Marcin Rataj <lidel@lidel.org>
This commit is contained in:
Guillaume Michel 2025-04-30 13:32:03 +00:00 committed by GitHub
parent 05565083df
commit a5997375db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 213 additions and 60 deletions

View File

@ -485,6 +485,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
// This should never happen, but better safe than sorry
log.Fatal("Private network does not work with Routing.Type=auto. Update your config to Routing.Type=dht (or none, and do manual peering)")
}
if cfg.Provider.Strategy.WithDefault("") != "" && cfg.Reprovider.Strategy.IsDefault() {
log.Fatal("Invalid config. Remove unused Provider.Strategy and set Reprovider.Strategy instead. Documentation: https://github.com/ipfs/kubo/blob/master/docs/config.md#reproviderstrategy")
}
printLibp2pPorts(node)

View File

@ -134,9 +134,9 @@ func TestCheckKey(t *testing.T) {
t.Fatal("Foo.Bar isn't a valid key in the config")
}
err = CheckKey("Provider.Strategy")
err = CheckKey("Reprovider.Strategy")
if err != nil {
t.Fatalf("%s: %s", err, "Provider.Strategy is a valid key in the config")
t.Fatalf("%s: %s", err, "Reprovider.Strategy is a valid key in the config")
}
err = CheckKey("Provider.Foo")

View File

@ -1,5 +1,12 @@
package config
const (
DefaultProviderWorkerCount = 64
)
// Provider configuration describes how NEW CIDs are announced the moment they are created.
// For periodical reprovide configuration, see Reprovider.*
type Provider struct {
Strategy string // Which keys to announce
Strategy *OptionalString `json:",omitempty"` // Unused, you are likely looking for Reprovider.Strategy instead
WorkerCount *OptionalInteger `json:",omitempty"` // Number of concurrent provides allowed, 0 means unlimited
}

View File

@ -7,6 +7,8 @@ const (
DefaultReproviderStrategy = "all"
)
// Reprovider configuration describes how CID from local datastore are periodically re-announced to routing systems.
// For provide behavior of ad-hoc or newly created CIDs and their first-time announcement, see Provider.*
type Reprovider struct {
Interval *OptionalDuration `json:",omitempty"` // Time period to reprovide locally stored objects to the network
Strategy *OptionalString `json:",omitempty"` // Which keys to announce

View File

@ -184,6 +184,7 @@ func TestCommands(t *testing.T) {
"/stats/bw",
"/stats/dht",
"/stats/provide",
"/stats/reprovide",
"/stats/repo",
"/swarm",
"/swarm/addrs",

View File

@ -27,11 +27,12 @@ for your IPFS node.`,
},
Subcommands: map[string]*cmds.Command{
"bw": statBwCmd,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
"bw": statBwCmd,
"repo": repoStatCmd,
"bitswap": bitswapStatCmd,
"dht": statDhtCmd,
"provide": statProvideCmd,
"reprovide": statReprovideCmd,
},
}

View File

@ -4,28 +4,20 @@ import (
"fmt"
"io"
"text/tabwriter"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/ipfs/boxo/provider"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"golang.org/x/exp/constraints"
)
type reprovideStats struct {
provider.ReproviderStats
fullRT bool
}
var statProvideCmd = &cmds.Command{
Status: cmds.Deprecated,
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's (re)provider system.",
Tagline: "Deprecated command, use 'ipfs stats reprovide' instead.",
ShortDescription: `
Returns statistics about the content the node is advertising.
This interface is not stable and may change from release to release.
'ipfs stats provide' is deprecated because provide and reprovide operations
are now distinct. This command may be replaced by provide only stats in the
future.
`,
},
Arguments: []cmds.Argument{},
@ -57,8 +49,8 @@ This interface is not stable and may change from release to release.
wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
defer wtr.Flush()
fmt.Fprintf(wtr, "TotalReprovides:\t%s\n", humanNumber(s.TotalReprovides))
fmt.Fprintf(wtr, "AvgReprovideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration))
fmt.Fprintf(wtr, "TotalProvides:\t%s\n", humanNumber(s.TotalReprovides))
fmt.Fprintf(wtr, "AvgProvideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration))
fmt.Fprintf(wtr, "LastReprovideDuration:\t%s\n", humanDuration(s.LastReprovideDuration))
if !s.LastRun.IsZero() {
fmt.Fprintf(wtr, "LastRun:\t%s\n", humanTime(s.LastRun))
@ -71,30 +63,3 @@ This interface is not stable and may change from release to release.
},
Type: reprovideStats{},
}
func humanDuration(val time.Duration) string {
return val.Truncate(time.Microsecond).String()
}
func humanTime(val time.Time) string {
return val.Format("2006-01-02 15:04:05")
}
func humanNumber[T constraints.Float | constraints.Integer](n T) string {
nf := float64(n)
str := humanSI(nf, 0)
fullStr := humanFull(nf, 0)
if str != fullStr {
return fmt.Sprintf("%s\t(%s)", str, fullStr)
}
return str
}
func humanSI(val float64, decimals int) string {
v, unit := humanize.ComputeSI(val)
return fmt.Sprintf("%s%s", humanFull(v, decimals), unit)
}
func humanFull(val float64, decimals int) string {
return humanize.CommafWithDigits(val, decimals)
}

View File

@ -0,0 +1,104 @@
package commands
import (
"fmt"
"io"
"text/tabwriter"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/ipfs/boxo/provider"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"golang.org/x/exp/constraints"
)
type reprovideStats struct {
provider.ReproviderStats
fullRT bool
}
var statReprovideCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Returns statistics about the node's reprovider system.",
ShortDescription: `
Returns statistics about the content the node is reproviding every
Reprovider.Interval according to Reprovider.Strategy:
https://github.com/ipfs/kubo/blob/master/docs/config.md#reprovider
This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}
if !nd.IsOnline {
return ErrNotOnline
}
stats, err := nd.Provider.Stat()
if err != nil {
return err
}
_, fullRT := nd.DHTClient.(*fullrt.FullRT)
if err := res.Emit(reprovideStats{stats, fullRT}); err != nil {
return err
}
return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s reprovideStats) error {
wtr := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
defer wtr.Flush()
fmt.Fprintf(wtr, "TotalReprovides:\t%s\n", humanNumber(s.TotalReprovides))
fmt.Fprintf(wtr, "AvgReprovideDuration:\t%s\n", humanDuration(s.AvgReprovideDuration))
fmt.Fprintf(wtr, "LastReprovideDuration:\t%s\n", humanDuration(s.LastReprovideDuration))
if !s.LastRun.IsZero() {
fmt.Fprintf(wtr, "LastReprovide:\t%s\n", humanTime(s.LastRun))
if s.fullRT {
fmt.Fprintf(wtr, "NextReprovide:\t%s\n", humanTime(s.LastRun.Add(s.ReprovideInterval)))
}
}
return nil
}),
},
Type: reprovideStats{},
}
func humanDuration(val time.Duration) string {
return val.Truncate(time.Microsecond).String()
}
func humanTime(val time.Time) string {
return val.Format("2006-01-02 15:04:05")
}
func humanNumber[T constraints.Float | constraints.Integer](n T) string {
nf := float64(n)
str := humanSI(nf, 0)
fullStr := humanFull(nf, 0)
if str != fullStr {
return fmt.Sprintf("%s\t(%s)", str, fullStr)
}
return str
}
func humanSI(val float64, decimals int) string {
v, unit := humanize.ComputeSI(val)
return fmt.Sprintf("%s%s", humanFull(v, decimals), unit)
}
func humanFull(val float64, decimals int) string {
return humanize.CommafWithDigits(val, decimals)
}

View File

@ -359,6 +359,7 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy),
cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval),
cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient),
int(cfg.Provider.WorkerCount.WithDefault(config.DefaultProviderWorkerCount)),
),
)
}

View File

@ -21,12 +21,13 @@ import (
// and in 'ipfs stats provide' report.
const sampledBatchSize = 1000
func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option {
func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, keyProvider provider.KeyChanFunc, repo repo.Repo, bs blockstore.Blockstore) (provider.System, error) {
opts := []provider.Option{
provider.Online(cr),
provider.ReproviderInterval(reprovideInterval),
provider.KeyProvider(keyProvider),
provider.ProvideWorkerCount(provideWorkerCount),
}
if !acceleratedDHTClient && reprovideInterval > 0 {
// The estimation kinda suck if you are running with accelerated DHT client,
@ -131,7 +132,7 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtcli
// ONLINE/OFFLINE
// OnlineProviders groups units managing provider routing records online
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool) fx.Option {
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
if useStrategicProviding {
return OfflineProviders()
}
@ -146,7 +147,7 @@ func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, repro
return fx.Options(
keyProvider,
ProviderSys(reprovideInterval, acceleratedDHTClient),
ProviderSys(reprovideInterval, acceleratedDHTClient, provideWorkerCount),
)
}
@ -169,7 +170,6 @@ func mfsProvider(mfsRoot *mfs.Root, fetcher fetcher.Factory) provider.KeyChanFun
kcf := provider.NewDAGProvider(rootNode.Cid(), fetcher)
return kcf(ctx)
}
}
func mfsRootProvider(mfsRoot *mfs.Root) provider.KeyChanFunc {

View File

@ -17,6 +17,8 @@ This release was brought to you by the [Shipyard](http://ipshipyard.com/) team.
- [New `ipfs add` Options](#new-ipfs-add-options)
- [Persistent `Import.*` Configuration](#persistent-import-configuration)
- [Updated Configuration Profiles](#updated-configuration-profiles)
- [Optimized, dedicated queue for providing fresh CIDs](#optimized-dedicated-queue-for-providing-fresh-cids)
- [Deprecated `ipfs stats provider`](#deprecated-ipfs-stats-provider)
- [📦️ Important dependency updates](#-important-dependency-updates)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)
@ -81,6 +83,36 @@ The release updated configuration [profiles](https://github.com/ipfs/kubo/blob/m
> [!TIP]
> Apply one of CIDv1 test [profiles](https://github.com/ipfs/kubo/blob/master/docs/config.md#profiles) with `ipfs config profile apply test-cid-v1[-wide]`.
#### Optimized, dedicated queue for providing fresh CIDs
From `kubo` [`v0.33.0`](https://github.com/ipfs/kubo/releases/tag/v0.33.0),
Bitswap stopped advertising newly added and received blocks to the DHT. Since
then `boxo/provider` is responsible for the provide and reprovide logic. Prior
to `v0.35.0`, provides and reprovides were handled together in batches, leading
to delays in initial advertisements (provides).
Provides and Reprovides now have separate queues, allowing for immediate
provide of new CIDs and optimised batching of reprovides.
This change introduces a new configuration option for limiting the number of
concurrent provide operations:
[`Provider.WorkerCount`](https://github.com/ipfs/kubo/blob/master/docs/config.md#providerworkercount).
> [!TIP]
> Users who need to provide large volumes of content immediately should consider removing the cap on concurrent provide operations and also set `Routing.AcceleratedDHTClient` to `true`.
##### Deprecated `ipfs stats provider`
Since the `ipfs stats provider` command was displaying statistics for both
provides and reprovides, this command isn't relevant anymore after separating
the two queues.
The successor command is `ipfs stats reprovide`, showing the same statistics,
but for reprovides only.
> [!NOTE]
> `ipfs stats provider` still works, but is marked as deprecated and will be removed in a future release. Be mindful that the command provides only statistics about reprovides (similar to `ipfs stats reprovide`) and not the new provide queue (this will be fixed as a part of wider refactor planned for a future release).
#### 📦️ Important dependency updates
- update `boxo` to [v0.30.0](https://github.com/ipfs/boxo/releases/tag/v0.30.0)

View File

@ -105,6 +105,9 @@ config file at runtime.
- [`Pinning.RemoteServices: Policies.MFS.Enabled`](#pinningremoteservices-policiesmfsenabled)
- [`Pinning.RemoteServices: Policies.MFS.PinName`](#pinningremoteservices-policiesmfspinname)
- [`Pinning.RemoteServices: Policies.MFS.RepinInterval`](#pinningremoteservices-policiesmfsrepininterval)
- [`Provider`](#provider)
- [`Provider.Strategy`](#providerstrategy)
- [`Provider.WorkerCount`](#providerworkercount)
- [`Pubsub`](#pubsub)
- [`Pubsub.Enabled`](#pubsubenabled)
- [`Pubsub.Router`](#pubsubrouter)
@ -207,6 +210,7 @@ config file at runtime.
- [`announce-on` profile](#announce-on-profile)
- [`legacy-cid-v0` profile](#legacy-cid-v0-profile)
- [`test-cid-v1` profile](#test-cid-v1-profile)
- [`test-cid-v1-wide` profile](#test-cid-v1-wide-profile)
- [Types](#types)
- [`flag`](#flag)
- [`priority`](#priority)
@ -1404,6 +1408,39 @@ Default: `"5m"`
Type: `duration`
## `Provider`
Configuration applied to the initial one-time announcement of fresh CIDs
created with `ipfs add`, `ipfs files`, `ipfs dag import`, `ipfs block|dag put`
commands.
For periodical DHT reprovide settings, see [`Reprovide.*`](#reprovider).
### `Provider.Strategy`
Legacy, not used at the moment, see [`Reprovider.Strategy`](#reproviderstrategy) instead.
### `Provider.WorkerCount`
Sets the maximum number of _concurrent_ DHT provide operations. DHT reprovides
operations do **not** count against that limit. A value of `0` allows an
unlimited number of provide workers.
If the [accelerated DHT client](#routingaccelerateddhtclient) is enabled, each
provide operation opens ~20 connections in parallel. With the standard DHT
client (accelerated disabled), each provide opens between 20 and 60
connections, with at most 10 active at once. Provides complete more quickly
when using the accelerated client. Be mindful of how many simultaneous
connections this setting can generate.
For nodes without strict connection limits that need to provide large volumes
of content immediately, we recommend enabling the `Routing.AcceleratedDHTClient` and
setting `Provider.WorkerCount` to `0` (unlimited).
Default: `64`
Type: `integer` (non-negative; `0` means unlimited number of workers)
## `Pubsub`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)

View File

@ -36,7 +36,7 @@ test_expect_success "docker image build succeeds" '
'
test_expect_success "write init scripts" '
echo "ipfs config Provider.Strategy Bar" > 001.sh &&
echo "ipfs config Mounts.IPFS Bar" > 001.sh &&
echo "ipfs config Pubsub.Router Qux" > 002.sh &&
chmod +x 002.sh
'
@ -65,7 +65,7 @@ test_expect_success "check that init scripts were run correctly and in the corre
test_expect_success "check that init script configs were applied" '
echo Bar > expected &&
docker exec "$DOC_ID" ipfs config Provider.Strategy > actual &&
docker exec "$DOC_ID" ipfs config Mounts.IPFS > actual &&
test_cmp actual expected &&
echo Qux > expected &&
docker exec "$DOC_ID" ipfs config Pubsub.Router > actual &&

View File

@ -11,12 +11,12 @@ test_description="Test user-provided config values"
test_init_ipfs
test_expect_success "bootstrap doesn't overwrite user-provided config keys (top-level)" '
ipfs config Provider.Strategy >previous &&
ipfs config Provider.Strategy foo &&
ipfs config Identity.PeerID >previous &&
ipfs config Identity.PeerID foo &&
ipfs bootstrap rm --all &&
echo "foo" >expected &&
ipfs config Provider.Strategy >actual &&
ipfs config Provider.Strategy $(cat previous) &&
ipfs config Identity.PeerID >actual &&
ipfs config Identity.PeerID $(cat previous) &&
test_cmp expected actual
'