extended tests

This commit is contained in:
guillaumemichel 2025-08-21 16:19:05 +02:00
parent c9fbe62069
commit dc2bb67d22
No known key found for this signature in database
GPG Key ID: 612745DB2E6D0E15
8 changed files with 317 additions and 250 deletions

View File

@ -44,6 +44,7 @@ type NoopProvider struct{}
func (r *NoopProvider) StartProviding(bool, ...mh.Multihash) {}
func (r *NoopProvider) ProvideOnce(...mh.Multihash) {}
func (r *NoopProvider) Clear() int { return 0 }
func (r *NoopProvider) RefreshSchedule() {}
// DHTProvider is an interface for providing keys to a DHT swarm. It holds a
// state of keys to be advertised, and is responsible for periodically
@ -71,6 +72,7 @@ type DHTProvider interface {
// Clear clears the all the keys from the provide queue and returns
// the number of keys that were cleared.
Clear() int
RefreshSchedule()
}
var (
@ -119,62 +121,65 @@ func (r *BurstProvider) Clear() int {
return r.System.Clear()
}
func (r *BurstProvider) RefreshSchedule() {}
// BurstProviderOpt creates a BurstProvider to be used as provider in the
// IpfsNode
func BurstProviderOpt(reprovideInterval time.Duration, strategy string, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
system := fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (provider.System, DHTProvider, error) {
// Initialize provider.System first, before pinner/blockstore/etc.
// The KeyChanFunc will be set later via SetKeyProvider() once we have
// created the pinner, blockstore and other dependencies.
opts := []provider.Option{
provider.Online(cr),
provider.ReproviderInterval(reprovideInterval),
provider.ProvideWorkerCount(provideWorkerCount),
}
if !acceleratedDHTClient && reprovideInterval > 0 {
// The estimation kinda suck if you are running with accelerated DHT client,
// given this message is just trying to push people to use the acceleratedDHTClient
// let's not report on through if it's in use
opts = append(opts,
provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool {
avgProvideSpeed := duration / time.Duration(keysProvided)
count := uint64(keysProvided)
system := fx.Provide(
fx.Annotate(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (*BurstProvider, error) {
// Initialize provider.System first, before pinner/blockstore/etc.
// The KeyChanFunc will be set later via SetKeyProvider() once we have
// created the pinner, blockstore and other dependencies.
opts := []provider.Option{
provider.Online(cr),
provider.ReproviderInterval(reprovideInterval),
provider.ProvideWorkerCount(provideWorkerCount),
}
if !acceleratedDHTClient && reprovideInterval > 0 {
// The estimation kinda suck if you are running with accelerated DHT client,
// given this message is just trying to push people to use the acceleratedDHTClient
// let's not report on through if it's in use
opts = append(opts,
provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool {
avgProvideSpeed := duration / time.Duration(keysProvided)
count := uint64(keysProvided)
if !reprovide || !complete {
// We don't know how many CIDs we have to provide, try to fetch it from the blockstore.
// But don't try for too long as this might be very expensive if you have a huge datastore.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
if !reprovide || !complete {
// We don't know how many CIDs we have to provide, try to fetch it from the blockstore.
// But don't try for too long as this might be very expensive if you have a huge datastore.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
// FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup.
// Note: talk to datastore directly, as to not depend on Blockstore here.
qr, err := repo.Datastore().Query(ctx, query.Query{
Prefix: blockstore.BlockPrefix.String(),
KeysOnly: true,
})
if err != nil {
logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err)
return false
}
defer qr.Close()
count = 0
countLoop:
for {
select {
case _, ok := <-qr.Next():
if !ok {
break countLoop
}
count++
case <-ctx.Done():
// really big blockstore mode
// FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup.
// Note: talk to datastore directly, as to not depend on Blockstore here.
qr, err := repo.Datastore().Query(ctx, query.Query{
Prefix: blockstore.BlockPrefix.String(),
KeysOnly: true,
})
if err != nil {
logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err)
return false
}
defer qr.Close()
count = 0
countLoop:
for {
select {
case _, ok := <-qr.Next():
if !ok {
break countLoop
}
count++
case <-ctx.Done():
// really big blockstore mode
// how many blocks would be in a 10TiB blockstore with 128KiB blocks.
const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024)
// How long per block that lasts us.
expectedProvideSpeed := reprovideInterval / probableBigBlockstore
if avgProvideSpeed > expectedProvideSpeed {
logger.Errorf(`
// how many blocks would be in a 10TiB blockstore with 128KiB blocks.
const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024)
// How long per block that lasts us.
expectedProvideSpeed := reprovideInterval / probableBigBlockstore
if avgProvideSpeed > expectedProvideSpeed {
logger.Errorf(`
🔔🔔🔔 YOU MAY BE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔
Your system might be struggling to keep up with DHT reprovides!
@ -189,21 +194,21 @@ size of 10TiB, it would take %v to provide the complete set.
💡 Consider enabling the Accelerated DHT to enhance your system performance. See:
https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`,
keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval)
return false
keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval)
return false
}
}
}
}
}
// How long per block that lasts us.
expectedProvideSpeed := reprovideInterval
if count > 0 {
expectedProvideSpeed = reprovideInterval / time.Duration(count)
}
// How long per block that lasts us.
expectedProvideSpeed := reprovideInterval
if count > 0 {
expectedProvideSpeed = reprovideInterval / time.Duration(count)
}
if avgProvideSpeed > expectedProvideSpeed {
logger.Errorf(`
if avgProvideSpeed > expectedProvideSpeed {
logger.Errorf(`
🔔🔔🔔 YOU ARE FALLING BEHIND DHT REPROVIDES! 🔔🔔🔔
Your system is struggling to keep up with DHT reprovides!
@ -216,29 +221,31 @@ We observed that you recently provided %d keys at an average rate of %v per key.
💡 Consider enabling the Accelerated DHT to enhance your reprovide throughput. See:
https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient`,
keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval)
}
return false
}, sampledBatchSize))
}
keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval)
}
return false
}, sampledBatchSize))
}
sys, err := provider.New(repo.Datastore(), opts...)
if err != nil {
return nil, nil, err
}
sys, err := provider.New(repo.Datastore(), opts...)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
handleStrategyChange(strategy, sys, repo.Datastore())
prov := &BurstProvider{sys}
handleStrategyChange(strategy, prov, repo.Datastore())
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
prov := &BurstProvider{sys}
return prov, prov, nil
})
return prov, nil
},
fx.As(new(provider.System)),
fx.As(new(DHTProvider)),
),
)
setKeyProvider := fx.Invoke(func(lc fx.Lifecycle, system provider.System, keyProvider provider.KeyChanFunc) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
@ -259,26 +266,28 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtcli
}
func SweepingProvider(cfg *config.Config) fx.Option {
keyStore := fx.Provide(func(repo repo.Repo) (*rds.KeyStore, error) {
return rds.NewKeyStore(repo.Datastore(),
type providerInput struct {
fx.In
DHT routing.Routing `name:"dhtc"`
Repo repo.Repo
}
fmt.Println("SweepingProvider")
sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *rds.KeyStore, error) {
keyStore, err := rds.NewKeyStore(in.Repo.Datastore(),
rds.WithPrefixBits(10),
rds.WithDatastorePrefix("/reprovider/mhs"),
rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))),
rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize))),
)
})
if err != nil {
return nil, nil, err
}
type input struct {
fx.In
DHT routing.Routing `name:"dhtc"`
KeyStore *rds.KeyStore
}
sweepingReprovider := fx.Provide(func(in input) (DHTProvider, error) {
switch dht := in.DHT.(type) {
case *dual.DHT:
if dht != nil {
prov, err := ddhtprovider.New(dht,
ddhtprovider.WithKeyStore(in.KeyStore),
ddhtprovider.WithKeyStore(keyStore),
ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)),
ddhtprovider.WithMaxReprovideDelay(time.Hour),
@ -289,16 +298,18 @@ func SweepingProvider(cfg *config.Config) fx.Option {
ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Reprovider.Sweep.MaxProvideConnsPerWorker.WithDefault(config.DefaultReproviderSweepMaxProvideConnsPerWorker))),
)
if err != nil {
return nil, err
return nil, nil, err
}
// Add keys from the KeyStore to the schedule
prov.RefreshSchedule()
return prov, nil
// prov.RefreshSchedule()
return prov, keyStore, nil
// _ = prov
// return &NoopProvider{}, nil
}
case *fullrt.FullRT:
if dht != nil {
prov, err := dhtprovider.New(
dhtprovider.WithKeyStore(in.KeyStore),
dhtprovider.WithKeyStore(keyStore),
dhtprovider.WithRouter(dht),
dhtprovider.WithMessageSender(dht.MessageSender()),
@ -320,42 +331,47 @@ func SweepingProvider(cfg *config.Config) fx.Option {
dhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Reprovider.Sweep.MaxProvideConnsPerWorker.WithDefault(config.DefaultReproviderSweepMaxProvideConnsPerWorker))),
)
if err != nil {
return nil, err
return nil, nil, err
}
// Add keys from the KeyStore to the schedule
prov.RefreshSchedule()
return prov, nil
// prov.RefreshSchedule()
return prov, keyStore, nil
}
}
return &NoopProvider{}, nil
return &NoopProvider{}, keyStore, nil
})
initKeyStore := fx.Invoke(func(lc fx.Lifecycle, keyStore *rds.KeyStore, keyProvider provider.KeyChanFunc) {
type keystoreInput struct {
fx.In
Provider DHTProvider
KeyStore *rds.KeyStore
KeyProvider provider.KeyChanFunc
}
initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
// TODO: Set GCFunc to keystore
ch, err := keyProvider(ctx)
ch, err := in.KeyProvider(ctx)
if err != nil {
return err
}
return keyStore.ResetCids(ctx, ch)
err = in.KeyStore.ResetCids(ctx, ch)
if err != nil {
return err
}
in.Provider.RefreshSchedule()
return nil
},
})
})
closeKeyStore := fx.Invoke(func(lc fx.Lifecycle, keyStore *rds.KeyStore) {
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return keyStore.Close()
return in.KeyStore.Close()
},
})
})
return fx.Options(
keyStore,
// keyStore,
sweepingReprovider,
initKeyStore,
closeKeyStore,
)
}
@ -516,7 +532,7 @@ func persistStrategy(ctx context.Context, strategy string, ds datastore.Datastor
// Strategy change detection: when the reproviding strategy changes,
// we clear the provide queue to avoid unexpected behavior from mixing
// strategies. This ensures a clean transition between different providing modes.
func handleStrategyChange(strategy string, provider provider.System, ds datastore.Datastore) {
func handleStrategyChange(strategy string, provider DHTProvider, ds datastore.Datastore) {
ctx := context.Background()
previous, changed, err := detectStrategyChange(ctx, strategy, ds)

View File

@ -113,7 +113,7 @@ require (
github.com/libp2p/go-doh-resolver v0.5.0 // indirect
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba // indirect
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.14.2 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect

View File

@ -428,8 +428,8 @@ github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl9
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba h1:Q+VJpniUaZUhGBAh2MIY3NWZB6deQ0OmCjpPw3iJjOk=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 h1:LsAanglHrVdoOFQRz2G8VoIMqPHACxhvNGCJoix0G08=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=

2
go.mod
View File

@ -53,7 +53,7 @@ require (
github.com/libp2p/go-doh-resolver v0.5.0
github.com/libp2p/go-libp2p v0.43.0
github.com/libp2p/go-libp2p-http v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33
github.com/libp2p/go-libp2p-kbucket v0.8.0
github.com/libp2p/go-libp2p-pubsub v0.14.2
github.com/libp2p/go-libp2p-pubsub-router v0.6.0

4
go.sum
View File

@ -514,8 +514,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk
github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA=
github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc=
github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba h1:Q+VJpniUaZUhGBAh2MIY3NWZB6deQ0OmCjpPw3iJjOk=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 h1:LsAanglHrVdoOFQRz2G8VoIMqPHACxhvNGCJoix0G08=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=

View File

@ -12,17 +12,26 @@ import (
"github.com/stretchr/testify/require"
)
func TestProvider(t *testing.T) {
t.Parallel()
const (
timeStep = 20 * time.Millisecond
timeout = time.Second
)
type cfgApplier func(*harness.Node)
func runProviderSuite(t *testing.T, reprovide bool, apply cfgApplier) {
t.Helper()
initNodes := func(t *testing.T, n int, fn func(n *harness.Node)) harness.Nodes {
nodes := harness.NewT(t).NewNodes(n).Init()
nodes.ForEachPar(apply)
nodes.ForEachPar(fn)
return nodes.StartDaemons().Connect()
}
initNodesWithoutStart := func(t *testing.T, n int, fn func(n *harness.Node)) harness.Nodes {
nodes := harness.NewT(t).NewNodes(n).Init()
nodes.ForEachPar(apply)
nodes.ForEachPar(fn)
return nodes
}
@ -35,9 +44,15 @@ func TestProvider(t *testing.T) {
}
expectProviders := func(t *testing.T, cid, expectedProvider string, nodes ...*harness.Node) {
outerLoop:
for _, node := range nodes {
res := node.IPFS("routing", "findprovs", "-n=1", cid)
require.Equal(t, expectedProvider, res.Stdout.Trimmed())
for i := time.Duration(0); i*timeStep < timeout; i++ {
res := node.IPFS("routing", "findprovs", "-n=1", cid)
if res.Stdout.Trimmed() == expectedProvider {
continue outerLoop
}
}
require.FailNowf(t, "found no providers", "expected a provider for %s", cid)
}
}
@ -272,182 +287,185 @@ func TestProvider(t *testing.T) {
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})
t.Run("Reprovides with 'all' strategy when strategy is '' (empty)", func(t *testing.T) {
t.Parallel()
if reprovide {
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "")
t.Run("Reprovides with 'all' strategy when strategy is '' (empty)", func(t *testing.T) {
t.Parallel()
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "")
})
cid := nodes[0].IPFSAddStr(time.Now().String())
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
expectNoProviders(t, cid, nodes[1:]...)
nodes[0].IPFS("routing", "reprovide")
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})
cid := nodes[0].IPFSAddStr(time.Now().String())
t.Run("Reprovides with 'all' strategy", func(t *testing.T) {
t.Parallel()
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
expectNoProviders(t, cid, nodes[1:]...)
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "all")
})
nodes[0].IPFS("routing", "reprovide")
cid := nodes[0].IPFSAddStr(time.Now().String())
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
expectNoProviders(t, cid, nodes[1:]...)
t.Run("Reprovides with 'all' strategy", func(t *testing.T) {
t.Parallel()
nodes[0].IPFS("routing", "reprovide")
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "all")
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})
cid := nodes[0].IPFSAddStr(time.Now().String())
t.Run("Reprovides with 'flat' strategy", func(t *testing.T) {
t.Parallel()
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
expectNoProviders(t, cid, nodes[1:]...)
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "flat")
})
nodes[0].IPFS("routing", "reprovide")
cid := nodes[0].IPFSAddStr(time.Now().String())
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
expectNoProviders(t, cid, nodes[1:]...)
t.Run("Reprovides with 'flat' strategy", func(t *testing.T) {
t.Parallel()
nodes[0].IPFS("routing", "reprovide")
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "flat")
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})
cid := nodes[0].IPFSAddStr(time.Now().String())
t.Run("Reprovides with 'pinned' strategy", func(t *testing.T) {
t.Parallel()
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
expectNoProviders(t, cid, nodes[1:]...)
foo := random.Bytes(1000)
bar := random.Bytes(1000)
nodes[0].IPFS("routing", "reprovide")
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "pinned")
})
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})
// Add a pin while offline so it cannot be provided
cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w")
t.Run("Reprovides with 'pinned' strategy", func(t *testing.T) {
t.Parallel()
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
foo := random.Bytes(1000)
bar := random.Bytes(1000)
// Add content without pinning while daemon line
cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--pin=false")
cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false")
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "pinned")
// Nothing should have been provided. The pin was offline, and
// the others should not be provided per the strategy.
expectNoProviders(t, cidFoo, nodes[1:]...)
expectNoProviders(t, cidBar, nodes[1:]...)
expectNoProviders(t, cidBarDir, nodes[1:]...)
nodes[0].IPFS("routing", "reprovide")
// cidFoo is not pinned so should not be provided.
expectNoProviders(t, cidFoo, nodes[1:]...)
// cidBar gets provided by being a child from cidBarDir even though we added with pin=false.
expectProviders(t, cidBar, nodes[0].PeerID().String(), nodes[1:]...)
expectProviders(t, cidBarDir, nodes[0].PeerID().String(), nodes[1:]...)
})
// Add a pin while offline so it cannot be provided
cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w")
t.Run("Reprovides with 'roots' strategy", func(t *testing.T) {
t.Parallel()
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
foo := random.Bytes(1000)
bar := random.Bytes(1000)
// Add content without pinning while daemon line
cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--pin=false")
cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false")
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "roots")
})
n0pid := nodes[0].PeerID().String()
// Nothing should have been provided. The pin was offline, and
// the others should not be provided per the strategy.
expectNoProviders(t, cidFoo, nodes[1:]...)
expectNoProviders(t, cidBar, nodes[1:]...)
expectNoProviders(t, cidBarDir, nodes[1:]...)
// Add a pin. Only root should get pinned but not provided
// because node not started
cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w")
nodes[0].IPFS("routing", "reprovide")
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// cidFoo is not pinned so should not be provided.
expectNoProviders(t, cidFoo, nodes[1:]...)
// cidBar gets provided by being a child from cidBarDir even though we added with pin=false.
expectProviders(t, cidBar, nodes[0].PeerID().String(), nodes[1:]...)
expectProviders(t, cidBarDir, nodes[0].PeerID().String(), nodes[1:]...)
})
cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo))
cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false")
t.Run("Reprovides with 'roots' strategy", func(t *testing.T) {
t.Parallel()
// cidFoo will get provided per the strategy but cidBar will not.
expectProviders(t, cidFoo, n0pid, nodes[1:]...)
expectNoProviders(t, cidBar, nodes[1:]...)
foo := random.Bytes(1000)
bar := random.Bytes(1000)
nodes[0].IPFS("routing", "reprovide")
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "roots")
expectProviders(t, cidFoo, n0pid, nodes[1:]...)
expectNoProviders(t, cidBar, nodes[1:]...)
expectProviders(t, cidBarDir, n0pid, nodes[1:]...)
})
n0pid := nodes[0].PeerID().String()
// Add a pin. Only root should get pinned but not provided
// because node not started
cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w")
t.Run("Reprovides with 'mfs' strategy", func(t *testing.T) {
t.Parallel()
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
bar := random.Bytes(1000)
cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo))
cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false")
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "mfs")
})
n0pid := nodes[0].PeerID().String()
// cidFoo will get provided per the strategy but cidBar will not.
expectProviders(t, cidFoo, n0pid, nodes[1:]...)
expectNoProviders(t, cidBar, nodes[1:]...)
// add something and lets put it in MFS
cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false", "-Q")
nodes[0].IPFS("files", "cp", "/ipfs/"+cidBar, "/myfile")
nodes[0].IPFS("routing", "reprovide")
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
expectProviders(t, cidFoo, n0pid, nodes[1:]...)
expectNoProviders(t, cidBar, nodes[1:]...)
expectProviders(t, cidBarDir, n0pid, nodes[1:]...)
})
// cidBar is in MFS but not provided
expectNoProviders(t, cidBar, nodes[1:]...)
t.Run("Reprovides with 'mfs' strategy", func(t *testing.T) {
t.Parallel()
nodes[0].IPFS("routing", "reprovide")
bar := random.Bytes(1000)
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "mfs")
// And now is provided
expectProviders(t, cidBar, n0pid, nodes[1:]...)
})
n0pid := nodes[0].PeerID().String()
// add something and lets put it in MFS
cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false", "-Q")
nodes[0].IPFS("files", "cp", "/ipfs/"+cidBar, "/myfile")
t.Run("Reprovides with 'pinned+mfs' strategy", func(t *testing.T) {
t.Parallel()
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "pinned+mfs")
})
n0pid := nodes[0].PeerID().String()
// cidBar is in MFS but not provided
expectNoProviders(t, cidBar, nodes[1:]...)
// Add a pinned CID (should be provided)
cidPinned := nodes[0].IPFSAddStr("pinned content", "--pin=true")
// Add a CID to MFS (should be provided)
cidMFS := nodes[0].IPFSAddStr("mfs content")
nodes[0].IPFS("files", "cp", "/ipfs/"+cidMFS, "/myfile")
// Add a CID that is neither pinned nor in MFS (should not be provided)
cidNeither := nodes[0].IPFSAddStr("neither content", "--pin=false")
nodes[0].IPFS("routing", "reprovide")
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// And now is provided
expectProviders(t, cidBar, n0pid, nodes[1:]...)
})
// Trigger reprovide
nodes[0].IPFS("routing", "reprovide")
t.Run("Reprovides with 'pinned+mfs' strategy", func(t *testing.T) {
t.Parallel()
nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Strategy", "pinned+mfs")
// Check that pinned CID is provided
expectProviders(t, cidPinned, n0pid, nodes[1:]...)
// Check that MFS CID is provided
expectProviders(t, cidMFS, n0pid, nodes[1:]...)
// Check that neither CID is not provided
expectNoProviders(t, cidNeither, nodes[1:]...)
})
n0pid := nodes[0].PeerID().String()
// Add a pinned CID (should be provided)
cidPinned := nodes[0].IPFSAddStr("pinned content", "--pin=true")
// Add a CID to MFS (should be provided)
cidMFS := nodes[0].IPFSAddStr("mfs content")
nodes[0].IPFS("files", "cp", "/ipfs/"+cidMFS, "/myfile")
// Add a CID that is neither pinned nor in MFS (should not be provided)
cidNeither := nodes[0].IPFSAddStr("neither content", "--pin=false")
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// Trigger reprovide
nodes[0].IPFS("routing", "reprovide")
// Check that pinned CID is provided
expectProviders(t, cidPinned, n0pid, nodes[1:]...)
// Check that MFS CID is provided
expectProviders(t, cidMFS, n0pid, nodes[1:]...)
// Check that neither CID is not provided
expectNoProviders(t, cidNeither, nodes[1:]...)
})
}
t.Run("provide clear command removes items from provide queue", func(t *testing.T) {
t.Parallel()
@ -542,5 +560,38 @@ func TestProvider(t *testing.T) {
// Should be a non-negative integer (0 or positive)
assert.GreaterOrEqual(t, result, 0)
})
}
func TestProvider(t *testing.T) {
t.Parallel()
variants := []struct {
name string
reprovide bool
apply cfgApplier
}{
{
name: "BurstProvider",
reprovide: true,
apply: func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Sweep.Enabled", false)
},
},
{
name: "SweepingProvider",
reprovide: false,
apply: func(n *harness.Node) {
n.SetIPFSConfig("Reprovider.Sweep.Enabled", true)
n.SetIPFSConfig("Reprovider.Sweep.MaxWorkers", 6) // TODO: necessary otherswise Provide pinned+mfs fails. not sure why
},
},
}
for _, v := range variants {
v := v // capture
t.Run(v.name, func(t *testing.T) {
// t.Parallel()
runProviderSuite(t, v.reprovide, v.apply)
})
}
}

View File

@ -181,7 +181,7 @@ require (
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p v0.43.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba // indirect
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-record v0.3.1 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect

View File

@ -462,8 +462,8 @@ github.com/libp2p/go-libp2p v0.43.0 h1:b2bg2cRNmY4HpLK8VHYQXLX2d3iND95OjodLFymvq
github.com/libp2p/go-libp2p v0.43.0/go.mod h1:IiSqAXDyP2sWH+J2gs43pNmB/y4FOi2XQPbsb+8qvzc=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba h1:Q+VJpniUaZUhGBAh2MIY3NWZB6deQ0OmCjpPw3iJjOk=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250820075604-3ffe76faf0ba/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33 h1:LsAanglHrVdoOFQRz2G8VoIMqPHACxhvNGCJoix0G08=
github.com/libp2p/go-libp2p-kad-dht v0.34.1-0.20250821124929-f0f63c94dc33/go.mod h1:cQeuJNRsh+FYF6+oFWgg7HQQ7YZ10pYnY9reAjyXEGk=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=