From 0dbf15791952dbbc5cb9aeb48ded1f2d5547ce77 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Sat, 23 Aug 2025 00:02:18 +0200 Subject: [PATCH] restore dual dht provider --- core/node/provider.go | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/core/node/provider.go b/core/node/provider.go index 8ca9eb1bc..93115dac8 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -289,6 +289,16 @@ func SweepingProvider(cfg *config.Config) fx.Option { Repo repo.Repo } sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *rds.KeyStore, error) { + keyStore, err := rds.NewKeyStore(in.Repo.Datastore(), + rds.WithPrefixBits(10), + rds.WithDatastorePrefix("/reprovider/keystore"), + rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))), + rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))), + ) + if err != nil { + return &NoopProvider{}, nil, err + } + var impl dhtImpl switch inDht := in.DHT.(type) { case *dht.IpfsDHT: @@ -296,8 +306,23 @@ func SweepingProvider(cfg *config.Config) fx.Option { impl = inDht } case *dual.DHT: - if inDht != nil && inDht.WAN != nil { - impl = inDht.WAN + if inDht != nil { + prov, err := ddhtprovider.New(inDht, + ddhtprovider.WithKeyStore(keyStore), + + ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)), + ddhtprovider.WithMaxReprovideDelay(time.Hour), + + ddhtprovider.WithMaxWorkers(int(cfg.Reprovider.Sweep.MaxWorkers.WithDefault(config.DefaultReproviderSweepMaxWorkers))), + ddhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Reprovider.Sweep.DedicatedPeriodicWorkers.WithDefault(config.DefaultReproviderSweepDedicatedPeriodicWorkers))), + ddhtprovider.WithDedicatedBurstWorkers(int(cfg.Reprovider.Sweep.DedicatedBurstWorkers.WithDefault(config.DefaultReproviderSweepDedicatedBurstWorkers))), + ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Reprovider.Sweep.MaxProvideConnsPerWorker.WithDefault(config.DefaultReproviderSweepMaxProvideConnsPerWorker))), + ) + if err != nil { + return nil, nil, err + } + _ = prov + return prov, keyStore, nil } case *fullrt.FullRT: if inDht != nil { @@ -308,15 +333,6 @@ func SweepingProvider(cfg *config.Config) fx.Option { return &NoopProvider{}, nil, errors.New("no valid DHT available for providing") } - keyStore, err := rds.NewKeyStore(in.Repo.Datastore(), - rds.WithPrefixBits(10), - rds.WithDatastorePrefix("/reprovider/keystore"), - rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))), - rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))), - ) - if err != nil { - return &NoopProvider{}, nil, err - } var selfAddrsFunc func() []ma.Multiaddr if imlpFilter, ok := impl.(addrsFilter); ok { selfAddrsFunc = imlpFilter.FilteredAddrs