From dc2bb67d225023e2674135ab5bfc17359ba8cacb Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Thu, 21 Aug 2025 16:19:05 +0200 Subject: [PATCH] extended tests --- core/node/provider.go | 236 ++++++++++--------- docs/examples/kubo-as-a-library/go.mod | 2 +- docs/examples/kubo-as-a-library/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- test/cli/provider_test.go | 313 ++++++++++++++----------- test/dependencies/go.mod | 2 +- test/dependencies/go.sum | 4 +- 8 files changed, 317 insertions(+), 250 deletions(-) diff --git a/core/node/provider.go b/core/node/provider.go index 6b94eb966..2c4942b6f 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -44,6 +44,7 @@ type NoopProvider struct{} func (r *NoopProvider) StartProviding(bool, ...mh.Multihash) {} func (r *NoopProvider) ProvideOnce(...mh.Multihash) {} func (r *NoopProvider) Clear() int { return 0 } +func (r *NoopProvider) RefreshSchedule() {} // DHTProvider is an interface for providing keys to a DHT swarm. It holds a // state of keys to be advertised, and is responsible for periodically @@ -71,6 +72,7 @@ type DHTProvider interface { // Clear clears the all the keys from the provide queue and returns // the number of keys that were cleared. Clear() int + RefreshSchedule() } var ( @@ -119,62 +121,65 @@ func (r *BurstProvider) Clear() int { return r.System.Clear() } +func (r *BurstProvider) RefreshSchedule() {} + // BurstProviderOpt creates a BurstProvider to be used as provider in the // IpfsNode func BurstProviderOpt(reprovideInterval time.Duration, strategy string, acceleratedDHTClient bool, provideWorkerCount int) fx.Option { - system := fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (provider.System, DHTProvider, error) { - // Initialize provider.System first, before pinner/blockstore/etc. - // The KeyChanFunc will be set later via SetKeyProvider() once we have - // created the pinner, blockstore and other dependencies. - opts := []provider.Option{ - provider.Online(cr), - provider.ReproviderInterval(reprovideInterval), - provider.ProvideWorkerCount(provideWorkerCount), - } - if !acceleratedDHTClient && reprovideInterval > 0 { - // The estimation kinda suck if you are running with accelerated DHT client, - // given this message is just trying to push people to use the acceleratedDHTClient - // let's not report on through if it's in use - opts = append(opts, - provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool { - avgProvideSpeed := duration / time.Duration(keysProvided) - count := uint64(keysProvided) + system := fx.Provide( + fx.Annotate(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (*BurstProvider, error) { + // Initialize provider.System first, before pinner/blockstore/etc. + // The KeyChanFunc will be set later via SetKeyProvider() once we have + // created the pinner, blockstore and other dependencies. + opts := []provider.Option{ + provider.Online(cr), + provider.ReproviderInterval(reprovideInterval), + provider.ProvideWorkerCount(provideWorkerCount), + } + if !acceleratedDHTClient && reprovideInterval > 0 { + // The estimation kinda suck if you are running with accelerated DHT client, + // given this message is just trying to push people to use the acceleratedDHTClient + // let's not report on through if it's in use + opts = append(opts, + provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool { + avgProvideSpeed := duration / time.Duration(keysProvided) + count := uint64(keysProvided) - if !reprovide || !complete { - // We don't know how many CIDs we have to provide, try to fetch it from the blockstore. - // But don't try for too long as this might be very expensive if you have a huge datastore. - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - defer cancel() + if !reprovide || !complete { + // We don't know how many CIDs we have to provide, try to fetch it from the blockstore. + // But don't try for too long as this might be very expensive if you have a huge datastore. + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() - // FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup. - // Note: talk to datastore directly, as to not depend on Blockstore here. - qr, err := repo.Datastore().Query(ctx, query.Query{ - Prefix: blockstore.BlockPrefix.String(), - KeysOnly: true, - }) - if err != nil { - logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err) - return false - } - defer qr.Close() - count = 0 - countLoop: - for { - select { - case _, ok := <-qr.Next(): - if !ok { - break countLoop - } - count++ - case <-ctx.Done(): - // really big blockstore mode + // FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup. + // Note: talk to datastore directly, as to not depend on Blockstore here. + qr, err := repo.Datastore().Query(ctx, query.Query{ + Prefix: blockstore.BlockPrefix.String(), + KeysOnly: true, + }) + if err != nil { + logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err) + return false + } + defer qr.Close() + count = 0 + countLoop: + for { + select { + case _, ok := <-qr.Next(): + if !ok { + break countLoop + } + count++ + case <-ctx.Done(): + // really big blockstore mode - // how many blocks would be in a 10TiB blockstore with 128KiB blocks. - const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024) - // How long per block that lasts us. - expectedProvideSpeed := reprovideInterval / probableBigBlockstore - if avgProvideSpeed > expectedProvideSpeed { - logger.Errorf(` + // how many blocks would be in a 10TiB blockstore with 128KiB blocks. + const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024) + // How long per block that lasts us. + expectedProvideSpeed := reprovideInterval / probableBigBlockstore + if avgProvideSpeed > expectedProvideSpeed { + logger.Errorf(` 🔔🔔🔔 YOU MAY BE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔 ⚠️ Your system might be struggling to keep up with DHT reprovides! @@ -189,21 +194,21 @@ size of 10TiB, it would take %v to provide the complete set. 💡 Consider enabling the Accelerated DHT to enhance your system performance. See: https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`, - keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval) - return false + keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval) + return false + } } } } - } - // How long per block that lasts us. - expectedProvideSpeed := reprovideInterval - if count > 0 { - expectedProvideSpeed = reprovideInterval / time.Duration(count) - } + // How long per block that lasts us. + expectedProvideSpeed := reprovideInterval + if count > 0 { + expectedProvideSpeed = reprovideInterval / time.Duration(count) + } - if avgProvideSpeed > expectedProvideSpeed { - logger.Errorf(` + if avgProvideSpeed > expectedProvideSpeed { + logger.Errorf(` 🔔🔔🔔 YOU ARE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔 ⚠️ Your system is struggling to keep up with DHT reprovides! @@ -216,29 +221,31 @@ We observed that you recently provided %d keys at an average rate of %v per key. 💡 Consider enabling the Accelerated DHT to enhance your reprovide throughput. See: https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`, - keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval) - } - return false - }, sampledBatchSize)) - } + keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval) + } + return false + }, sampledBatchSize)) + } - sys, err := provider.New(repo.Datastore(), opts...) - if err != nil { - return nil, nil, err - } + sys, err := provider.New(repo.Datastore(), opts...) + if err != nil { + return nil, err + } + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return sys.Close() + }, + }) - handleStrategyChange(strategy, sys, repo.Datastore()) + prov := &BurstProvider{sys} + handleStrategyChange(strategy, prov, repo.Datastore()) - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return sys.Close() - }, - }) - - prov := &BurstProvider{sys} - - return prov, prov, nil - }) + return prov, nil + }, + fx.As(new(provider.System)), + fx.As(new(DHTProvider)), + ), + ) setKeyProvider := fx.Invoke(func(lc fx.Lifecycle, system provider.System, keyProvider provider.KeyChanFunc) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { @@ -259,26 +266,28 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtcli } func SweepingProvider(cfg *config.Config) fx.Option { - keyStore := fx.Provide(func(repo repo.Repo) (*rds.KeyStore, error) { - return rds.NewKeyStore(repo.Datastore(), + type providerInput struct { + fx.In + DHT routing.Routing `name:"dhtc"` + Repo repo.Repo + } + fmt.Println("SweepingProvider") + sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *rds.KeyStore, error) { + keyStore, err := rds.NewKeyStore(in.Repo.Datastore(), rds.WithPrefixBits(10), rds.WithDatastorePrefix("/reprovider/mhs"), rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))), rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize))), ) - }) + if err != nil { + return nil, nil, err + } - type input struct { - fx.In - DHT routing.Routing `name:"dhtc"` - KeyStore *rds.KeyStore - } - sweepingReprovider := fx.Provide(func(in input) (DHTProvider, error) { switch dht := in.DHT.(type) { case *dual.DHT: if dht != nil { prov, err := ddhtprovider.New(dht, - ddhtprovider.WithKeyStore(in.KeyStore), + ddhtprovider.WithKeyStore(keyStore), ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)), ddhtprovider.WithMaxReprovideDelay(time.Hour), @@ -289,16 +298,18 @@ func SweepingProvider(cfg *config.Config) fx.Option { ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Reprovider.Sweep.MaxProvideConnsPerWorker.WithDefault(config.DefaultReproviderSweepMaxProvideConnsPerWorker))), ) if err != nil { - return nil, err + return nil, nil, err } // Add keys from the KeyStore to the schedule - prov.RefreshSchedule() - return prov, nil + // prov.RefreshSchedule() + return prov, keyStore, nil + // _ = prov + // return &NoopProvider{}, nil } case *fullrt.FullRT: if dht != nil { prov, err := dhtprovider.New( - dhtprovider.WithKeyStore(in.KeyStore), + dhtprovider.WithKeyStore(keyStore), dhtprovider.WithRouter(dht), dhtprovider.WithMessageSender(dht.MessageSender()), @@ -320,42 +331,47 @@ func SweepingProvider(cfg *config.Config) fx.Option { dhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Reprovider.Sweep.MaxProvideConnsPerWorker.WithDefault(config.DefaultReproviderSweepMaxProvideConnsPerWorker))), ) if err != nil { - return nil, err + return nil, nil, err } // Add keys from the KeyStore to the schedule - prov.RefreshSchedule() - return prov, nil + // prov.RefreshSchedule() + return prov, keyStore, nil } } - return &NoopProvider{}, nil + return &NoopProvider{}, keyStore, nil }) - initKeyStore := fx.Invoke(func(lc fx.Lifecycle, keyStore *rds.KeyStore, keyProvider provider.KeyChanFunc) { + type keystoreInput struct { + fx.In + Provider DHTProvider + KeyStore *rds.KeyStore + KeyProvider provider.KeyChanFunc + } + initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { // TODO: Set GCFunc to keystore - ch, err := keyProvider(ctx) + ch, err := in.KeyProvider(ctx) if err != nil { return err } - return keyStore.ResetCids(ctx, ch) + err = in.KeyStore.ResetCids(ctx, ch) + if err != nil { + return err + } + in.Provider.RefreshSchedule() + return nil }, - }) - }) - - closeKeyStore := fx.Invoke(func(lc fx.Lifecycle, keyStore *rds.KeyStore) { - lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { - return keyStore.Close() + return in.KeyStore.Close() }, }) }) return fx.Options( - keyStore, + // keyStore, sweepingReprovider, initKeyStore, - closeKeyStore, ) } @@ -516,7 +532,7 @@ func persistStrategy(ctx context.Context, strategy string, ds datastore.Datastor // Strategy change detection: when the reproviding strategy changes, // we clear the provide queue to avoid unexpected behavior from mixing // strategies. This ensures a clean transition between different providing modes. -func handleStrategyChange(strategy string, provider provider.System, ds datastore.Datastore) { +func handleStrategyChange(strategy string, provider DHTProvider, ds datastore.Datastore) { ctx := context.Background() previous, changed, err := detectStrategyChange(ctx, strategy, ds) diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index d636f20ec..46fd3fb5c 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -113,7 +113,7 @@ require ( github.com/libp2p/go-doh-resolver v0.5.0 // indirect github.com/libp2p/go-flow-metrics v0.3.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect - github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba // indirect + github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 // indirect github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect github.com/libp2p/go-libp2p-pubsub v0.14.2 // indirect github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index 60ed6cbd0..17ee5a1a3 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -428,8 +428,8 @@ github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl9 github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= -github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba h1:Q+VJpniUaZUhGBAh2MIY3NWZB6deQ0OmCjpPw3iJjOk= -github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk= +github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 h1:LsAanglHrVdoOFQRz2G8VoIMqPHACxhvNGCJoix0G08= +github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s= github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4= diff --git a/go.mod b/go.mod index f291d574b..d3af7b2ed 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/libp2p/go-doh-resolver v0.5.0 github.com/libp2p/go-libp2p v0.43.0 github.com/libp2p/go-libp2p-http v0.5.0 - github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba + github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 github.com/libp2p/go-libp2p-kbucket v0.8.0 github.com/libp2p/go-libp2p-pubsub v0.14.2 github.com/libp2p/go-libp2p-pubsub-router v0.6.0 diff --git a/go.sum b/go.sum index d6c326f57..7e3cdbf7f 100644 --- a/go.sum +++ b/go.sum @@ -514,8 +514,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA= github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc= github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg= -github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba h1:Q+VJpniUaZUhGBAh2MIY3NWZB6deQ0OmCjpPw3iJjOk= -github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk= +github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 h1:LsAanglHrVdoOFQRz2G8VoIMqPHACxhvNGCJoix0G08= +github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk= github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s= github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4= diff --git a/test/cli/provider_test.go b/test/cli/provider_test.go index 7820fb356..7c62558b8 100644 --- a/test/cli/provider_test.go +++ b/test/cli/provider_test.go @@ -12,17 +12,26 @@ import ( "github.com/stretchr/testify/require" ) -func TestProvider(t *testing.T) { - t.Parallel() +const ( + timeStep = 20 * time.Millisecond + timeout = time.Second +) + +type cfgApplier func(*harness.Node) + +func runProviderSuite(t *testing.T, reprovide bool, apply cfgApplier) { + t.Helper() initNodes := func(t *testing.T, n int, fn func(n *harness.Node)) harness.Nodes { nodes := harness.NewT(t).NewNodes(n).Init() + nodes.ForEachPar(apply) nodes.ForEachPar(fn) return nodes.StartDaemons().Connect() } initNodesWithoutStart := func(t *testing.T, n int, fn func(n *harness.Node)) harness.Nodes { nodes := harness.NewT(t).NewNodes(n).Init() + nodes.ForEachPar(apply) nodes.ForEachPar(fn) return nodes } @@ -35,9 +44,15 @@ func TestProvider(t *testing.T) { } expectProviders := func(t *testing.T, cid, expectedProvider string, nodes ...*harness.Node) { + outerLoop: for _, node := range nodes { - res := node.IPFS("routing", "findprovs", "-n=1", cid) - require.Equal(t, expectedProvider, res.Stdout.Trimmed()) + for i := time.Duration(0); i*timeStep < timeout; i++ { + res := node.IPFS("routing", "findprovs", "-n=1", cid) + if res.Stdout.Trimmed() == expectedProvider { + continue outerLoop + } + } + require.FailNowf(t, "found no providers", "expected a provider for %s", cid) } } @@ -272,182 +287,185 @@ func TestProvider(t *testing.T) { expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) }) - t.Run("Reprovides with 'all' strategy when strategy is '' (empty)", func(t *testing.T) { - t.Parallel() + if reprovide { - nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { - n.SetIPFSConfig("Reprovider.Strategy", "") + t.Run("Reprovides with 'all' strategy when strategy is '' (empty)", func(t *testing.T) { + t.Parallel() + + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "") + }) + + cid := nodes[0].IPFSAddStr(time.Now().String()) + + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + expectNoProviders(t, cid, nodes[1:]...) + + nodes[0].IPFS("routing", "reprovide") + + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) }) - cid := nodes[0].IPFSAddStr(time.Now().String()) + t.Run("Reprovides with 'all' strategy", func(t *testing.T) { + t.Parallel() - nodes = nodes.StartDaemons().Connect() - defer nodes.StopDaemons() - expectNoProviders(t, cid, nodes[1:]...) + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "all") + }) - nodes[0].IPFS("routing", "reprovide") + cid := nodes[0].IPFSAddStr(time.Now().String()) - expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) - }) + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + expectNoProviders(t, cid, nodes[1:]...) - t.Run("Reprovides with 'all' strategy", func(t *testing.T) { - t.Parallel() + nodes[0].IPFS("routing", "reprovide") - nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { - n.SetIPFSConfig("Reprovider.Strategy", "all") + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) }) - cid := nodes[0].IPFSAddStr(time.Now().String()) + t.Run("Reprovides with 'flat' strategy", func(t *testing.T) { + t.Parallel() - nodes = nodes.StartDaemons().Connect() - defer nodes.StopDaemons() - expectNoProviders(t, cid, nodes[1:]...) + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "flat") + }) - nodes[0].IPFS("routing", "reprovide") + cid := nodes[0].IPFSAddStr(time.Now().String()) - expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) - }) + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + expectNoProviders(t, cid, nodes[1:]...) - t.Run("Reprovides with 'flat' strategy", func(t *testing.T) { - t.Parallel() + nodes[0].IPFS("routing", "reprovide") - nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { - n.SetIPFSConfig("Reprovider.Strategy", "flat") + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) }) - cid := nodes[0].IPFSAddStr(time.Now().String()) + t.Run("Reprovides with 'pinned' strategy", func(t *testing.T) { + t.Parallel() - nodes = nodes.StartDaemons().Connect() - defer nodes.StopDaemons() - expectNoProviders(t, cid, nodes[1:]...) + foo := random.Bytes(1000) + bar := random.Bytes(1000) - nodes[0].IPFS("routing", "reprovide") + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "pinned") + }) - expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) - }) + // Add a pin while offline so it cannot be provided + cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w") - t.Run("Reprovides with 'pinned' strategy", func(t *testing.T) { - t.Parallel() + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() - foo := random.Bytes(1000) - bar := random.Bytes(1000) + // Add content without pinning while daemon line + cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--pin=false") + cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false") - nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { - n.SetIPFSConfig("Reprovider.Strategy", "pinned") + // Nothing should have been provided. The pin was offline, and + // the others should not be provided per the strategy. + expectNoProviders(t, cidFoo, nodes[1:]...) + expectNoProviders(t, cidBar, nodes[1:]...) + expectNoProviders(t, cidBarDir, nodes[1:]...) + + nodes[0].IPFS("routing", "reprovide") + + // cidFoo is not pinned so should not be provided. + expectNoProviders(t, cidFoo, nodes[1:]...) + // cidBar gets provided by being a child from cidBarDir even though we added with pin=false. + expectProviders(t, cidBar, nodes[0].PeerID().String(), nodes[1:]...) + expectProviders(t, cidBarDir, nodes[0].PeerID().String(), nodes[1:]...) }) - // Add a pin while offline so it cannot be provided - cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w") + t.Run("Reprovides with 'roots' strategy", func(t *testing.T) { + t.Parallel() - nodes = nodes.StartDaemons().Connect() - defer nodes.StopDaemons() + foo := random.Bytes(1000) + bar := random.Bytes(1000) - // Add content without pinning while daemon line - cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--pin=false") - cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false") + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "roots") + }) + n0pid := nodes[0].PeerID().String() - // Nothing should have been provided. The pin was offline, and - // the others should not be provided per the strategy. - expectNoProviders(t, cidFoo, nodes[1:]...) - expectNoProviders(t, cidBar, nodes[1:]...) - expectNoProviders(t, cidBarDir, nodes[1:]...) + // Add a pin. Only root should get pinned but not provided + // because node not started + cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w") - nodes[0].IPFS("routing", "reprovide") + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() - // cidFoo is not pinned so should not be provided. - expectNoProviders(t, cidFoo, nodes[1:]...) - // cidBar gets provided by being a child from cidBarDir even though we added with pin=false. - expectProviders(t, cidBar, nodes[0].PeerID().String(), nodes[1:]...) - expectProviders(t, cidBarDir, nodes[0].PeerID().String(), nodes[1:]...) - }) + cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo)) + cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false") - t.Run("Reprovides with 'roots' strategy", func(t *testing.T) { - t.Parallel() + // cidFoo will get provided per the strategy but cidBar will not. + expectProviders(t, cidFoo, n0pid, nodes[1:]...) + expectNoProviders(t, cidBar, nodes[1:]...) - foo := random.Bytes(1000) - bar := random.Bytes(1000) + nodes[0].IPFS("routing", "reprovide") - nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { - n.SetIPFSConfig("Reprovider.Strategy", "roots") + expectProviders(t, cidFoo, n0pid, nodes[1:]...) + expectNoProviders(t, cidBar, nodes[1:]...) + expectProviders(t, cidBarDir, n0pid, nodes[1:]...) }) - n0pid := nodes[0].PeerID().String() - // Add a pin. Only root should get pinned but not provided - // because node not started - cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w") + t.Run("Reprovides with 'mfs' strategy", func(t *testing.T) { + t.Parallel() - nodes = nodes.StartDaemons().Connect() - defer nodes.StopDaemons() + bar := random.Bytes(1000) - cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo)) - cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false") + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "mfs") + }) + n0pid := nodes[0].PeerID().String() - // cidFoo will get provided per the strategy but cidBar will not. - expectProviders(t, cidFoo, n0pid, nodes[1:]...) - expectNoProviders(t, cidBar, nodes[1:]...) + // add something and lets put it in MFS + cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false", "-Q") + nodes[0].IPFS("files", "cp", "/ipfs/"+cidBar, "/myfile") - nodes[0].IPFS("routing", "reprovide") + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() - expectProviders(t, cidFoo, n0pid, nodes[1:]...) - expectNoProviders(t, cidBar, nodes[1:]...) - expectProviders(t, cidBarDir, n0pid, nodes[1:]...) - }) + // cidBar is in MFS but not provided + expectNoProviders(t, cidBar, nodes[1:]...) - t.Run("Reprovides with 'mfs' strategy", func(t *testing.T) { - t.Parallel() + nodes[0].IPFS("routing", "reprovide") - bar := random.Bytes(1000) - - nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { - n.SetIPFSConfig("Reprovider.Strategy", "mfs") + // And now is provided + expectProviders(t, cidBar, n0pid, nodes[1:]...) }) - n0pid := nodes[0].PeerID().String() - // add something and lets put it in MFS - cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false", "-Q") - nodes[0].IPFS("files", "cp", "/ipfs/"+cidBar, "/myfile") + t.Run("Reprovides with 'pinned+mfs' strategy", func(t *testing.T) { + t.Parallel() - nodes = nodes.StartDaemons().Connect() - defer nodes.StopDaemons() + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "pinned+mfs") + }) + n0pid := nodes[0].PeerID().String() - // cidBar is in MFS but not provided - expectNoProviders(t, cidBar, nodes[1:]...) + // Add a pinned CID (should be provided) + cidPinned := nodes[0].IPFSAddStr("pinned content", "--pin=true") + // Add a CID to MFS (should be provided) + cidMFS := nodes[0].IPFSAddStr("mfs content") + nodes[0].IPFS("files", "cp", "/ipfs/"+cidMFS, "/myfile") + // Add a CID that is neither pinned nor in MFS (should not be provided) + cidNeither := nodes[0].IPFSAddStr("neither content", "--pin=false") - nodes[0].IPFS("routing", "reprovide") + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() - // And now is provided - expectProviders(t, cidBar, n0pid, nodes[1:]...) - }) + // Trigger reprovide + nodes[0].IPFS("routing", "reprovide") - t.Run("Reprovides with 'pinned+mfs' strategy", func(t *testing.T) { - t.Parallel() - - nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { - n.SetIPFSConfig("Reprovider.Strategy", "pinned+mfs") + // Check that pinned CID is provided + expectProviders(t, cidPinned, n0pid, nodes[1:]...) + // Check that MFS CID is provided + expectProviders(t, cidMFS, n0pid, nodes[1:]...) + // Check that neither CID is not provided + expectNoProviders(t, cidNeither, nodes[1:]...) }) - n0pid := nodes[0].PeerID().String() - - // Add a pinned CID (should be provided) - cidPinned := nodes[0].IPFSAddStr("pinned content", "--pin=true") - // Add a CID to MFS (should be provided) - cidMFS := nodes[0].IPFSAddStr("mfs content") - nodes[0].IPFS("files", "cp", "/ipfs/"+cidMFS, "/myfile") - // Add a CID that is neither pinned nor in MFS (should not be provided) - cidNeither := nodes[0].IPFSAddStr("neither content", "--pin=false") - - nodes = nodes.StartDaemons().Connect() - defer nodes.StopDaemons() - - // Trigger reprovide - nodes[0].IPFS("routing", "reprovide") - - // Check that pinned CID is provided - expectProviders(t, cidPinned, n0pid, nodes[1:]...) - // Check that MFS CID is provided - expectProviders(t, cidMFS, n0pid, nodes[1:]...) - // Check that neither CID is not provided - expectNoProviders(t, cidNeither, nodes[1:]...) - }) + } t.Run("provide clear command removes items from provide queue", func(t *testing.T) { t.Parallel() @@ -542,5 +560,38 @@ func TestProvider(t *testing.T) { // Should be a non-negative integer (0 or positive) assert.GreaterOrEqual(t, result, 0) }) - +} + +func TestProvider(t *testing.T) { + t.Parallel() + + variants := []struct { + name string + reprovide bool + apply cfgApplier + }{ + { + name: "BurstProvider", + reprovide: true, + apply: func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Sweep.Enabled", false) + }, + }, + { + name: "SweepingProvider", + reprovide: false, + apply: func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Sweep.Enabled", true) + n.SetIPFSConfig("Reprovider.Sweep.MaxWorkers", 6) // TODO: necessary otherswise Provide pinned+mfs fails. not sure why + }, + }, + } + + for _, v := range variants { + v := v // capture + t.Run(v.name, func(t *testing.T) { + // t.Parallel() + runProviderSuite(t, v.reprovide, v.apply) + }) + } } diff --git a/test/dependencies/go.mod b/test/dependencies/go.mod index 1eddf6d52..c7d467595 100644 --- a/test/dependencies/go.mod +++ b/test/dependencies/go.mod @@ -181,7 +181,7 @@ require ( github.com/libp2p/go-flow-metrics v0.3.0 // indirect github.com/libp2p/go-libp2p v0.43.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect - github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba // indirect + github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 // indirect github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect github.com/libp2p/go-libp2p-record v0.3.1 // indirect github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect diff --git a/test/dependencies/go.sum b/test/dependencies/go.sum index a8fea66ec..fe3e00e08 100644 --- a/test/dependencies/go.sum +++ b/test/dependencies/go.sum @@ -462,8 +462,8 @@ github.com/libp2p/go-libp2p v0.43.0 h1:b2bg2cRNmY4HpLK8VHYQXLX2d3iND95OjodLFymvq github.com/libp2p/go-libp2p v0.43.0/go.mod h1:IiSqAXDyP2sWH+J2gs43pNmB/y4FOi2XQPbsb+8qvzc= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= -github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba h1:Q+VJpniUaZUhGBAh2MIY3NWZB6deQ0OmCjpPw3iJjOk= -github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk= +github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 h1:LsAanglHrVdoOFQRz2G8VoIMqPHACxhvNGCJoix0G08= +github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk= github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s= github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4= github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=