package node

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/ipfs/boxo/blockstore"
    "github.com/ipfs/boxo/fetcher"
    "github.com/ipfs/boxo/mfs"
    pin "github.com/ipfs/boxo/pinning/pinner"
    "github.com/ipfs/boxo/pinning/pinner/dspinner"
    "github.com/ipfs/boxo/provider"
    "github.com/ipfs/go-cid"
    "github.com/ipfs/go-datastore"
    "github.com/ipfs/go-datastore/namespace"
    "github.com/ipfs/go-datastore/query"
    log "github.com/ipfs/go-log/v2"
    "github.com/ipfs/kubo/config"
    "github.com/ipfs/kubo/repo"
    irouting "github.com/ipfs/kubo/routing"
    dht "github.com/libp2p/go-libp2p-kad-dht"
    "github.com/libp2p/go-libp2p-kad-dht/amino"
    "github.com/libp2p/go-libp2p-kad-dht/dual"
    "github.com/libp2p/go-libp2p-kad-dht/fullrt"
    dht_pb "github.com/libp2p/go-libp2p-kad-dht/pb"
    dhtprovider "github.com/libp2p/go-libp2p-kad-dht/provider"
    "github.com/libp2p/go-libp2p-kad-dht/provider/buffered"
    ddhtprovider "github.com/libp2p/go-libp2p-kad-dht/provider/dual"
    "github.com/libp2p/go-libp2p-kad-dht/provider/keystore"
    routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
    "github.com/libp2p/go-libp2p/core/host"
    peer "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/core/routing"
    ma "github.com/multiformats/go-multiaddr"
    mh "github.com/multiformats/go-multihash"
    "go.uber.org/fx"
)

const (
    // The size of the batch used for calculating the average announcement
    // time per CID inside boxo/provider.ThroughputReport
    // and in the 'ipfs stats provide' report.
    // Used when Provide.DHT.SweepEnabled=false
    sampledBatchSize = 1000

    // Datastore key used to store the previous reprovide strategy.
    reprovideStrategyKey = "/reprovideStrategy"

    // Datastore namespace prefix for provider data.
    providerDatastorePrefix = "provider"
    // Datastore path for the provider keystore.
    keystoreDatastorePath = "keystore"
)

var errAcceleratedDHTNotReady = errors.New("AcceleratedDHTClient: routing table not ready")

// Interval between reprovide queue monitoring checks for slow reprovide alerts.
// Used when Provide.DHT.SweepEnabled=true
const reprovideAlertPollInterval = 15 * time.Minute

// Number of consecutive polling intervals with sustained queue growth before
// triggering a slow reprovide alert (3 intervals = 45 minutes).
// Used when Provide.DHT.SweepEnabled=true
const consecutiveAlertsThreshold = 3

// DHTProvider is an interface for providing keys to a DHT swarm. It holds a
// state of keys to be advertised, and is responsible for periodically
// publishing provider records for these keys to the DHT swarm before the
// records expire.
type DHTProvider interface {
    // StartProviding ensures keys are periodically advertised to the DHT swarm.
    //
    // If the `keys` aren't currently being reprovided, they are added to the
    // queue to be provided to the DHT swarm as soon as possible, and scheduled
    // to be reprovided periodically. If `force` is set to true, all keys are
    // provided to the DHT swarm, regardless of whether they were already being
    // reprovided in the past. `keys` keep being reprovided until `StopProviding`
    // is called.
    //
    // This operation is asynchronous: it returns as soon as the `keys` are added
    // to the provide queue, and the actual provides happen asynchronously.
    //
    // Returns an error if the keys couldn't be added to the provide queue. This
    // can happen if the provider is closed or if the node is currently Offline
    // (either never bootstrapped, or disconnected for more than `OfflineDelay`).
    // The schedule and provide queue depend on the network size, hence recent
    // network connectivity is essential.
    StartProviding(force bool, keys ...mh.Multihash) error
    // ProvideOnce sends provider records for the specified keys to the DHT swarm
    // only once. It does not automatically reprovide those keys afterward.
    //
    // Add the supplied multihashes to the provide queue, and return immediately.
    // The provide operation happens asynchronously.
    //
    // Returns an error if the keys couldn't be added to the provide queue. This
    // can happen if the provider is closed or if the node is currently Offline
    // (either never bootstrapped, or disconnected for more than `OfflineDelay`).
    // The schedule and provide queue depend on the network size, hence recent
    // network connectivity is essential.
    ProvideOnce(keys ...mh.Multihash) error
    // Clear clears all the keys from the provide queue and returns the number
    // of keys that were cleared.
    //
    // The keys are not deleted from the keystore, so they will continue to be
    // reprovided as scheduled.
    Clear() int
    // RefreshSchedule scans the Keystore for any keys that are not currently
    // scheduled for reproviding. If such keys are found, it schedules their
    // associated keyspace region to be reprovided.
    //
    // This function doesn't remove prefixes that have no keys from the schedule.
    // This is done automatically during the reprovide operation if a region has no
    // keys.
    //
    // Returns an error if the provider is closed or if the node is currently
    // Offline (either never bootstrapped, or disconnected for more than
    // `OfflineDelay`). The schedule depends on the network size, hence recent
    // network connectivity is essential.
    RefreshSchedule() error
    Close() error
}
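
// Illustrative sketch of how a caller might use a DHTProvider. `prov` is a
// hypothetical DHTProvider value and the error handling is an assumption for
// the example, not something this package prescribes:
//
//      h, _ := mh.Sum([]byte("example block"), mh.SHA2_256, -1)
//      if err := prov.StartProviding(false, h); err != nil {
//          // provider closed or node offline: the key was not queued
//      }
//      _ = prov.ProvideOnce(h) // one-shot announcement, no periodic reprovide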

var (
    _ DHTProvider = &ddhtprovider.SweepingProvider{}
    _ DHTProvider = &dhtprovider.SweepingProvider{}
    _ DHTProvider = &NoopProvider{}
    _ DHTProvider = &LegacyProvider{}
)

// NoopProvider is a no-operation provider implementation that does nothing.
// It is used when providing is disabled or when no DHT is available.
// All methods return successfully without performing any actual operations.
type NoopProvider struct{}

func (r *NoopProvider) StartProviding(bool, ...mh.Multihash) error { return nil }
func (r *NoopProvider) ProvideOnce(...mh.Multihash) error { return nil }
func (r *NoopProvider) Clear() int { return 0 }
func (r *NoopProvider) RefreshSchedule() error { return nil }
func (r *NoopProvider) Close() error { return nil }

// LegacyProvider is a wrapper around the boxo/provider.System that implements
// the DHTProvider interface. This provider manages reprovides using a burst
// strategy where it sequentially reprovides all keys at once during each
// reprovide interval, rather than spreading the load over time.
//
// This is the legacy provider implementation that can cause resource spikes
// during reprovide operations. For more efficient providing, consider using
// the SweepingProvider which spreads the load over the reprovide interval.
type LegacyProvider struct {
    provider.System
}

func (r *LegacyProvider) StartProviding(force bool, keys ...mh.Multihash) error {
    return r.ProvideOnce(keys...)
}

func (r *LegacyProvider) ProvideOnce(keys ...mh.Multihash) error {
    if many, ok := r.System.(routinghelpers.ProvideManyRouter); ok {
        return many.ProvideMany(context.Background(), keys)
    }

    for _, k := range keys {
        if err := r.Provide(context.Background(), cid.NewCidV1(cid.Raw, k), true); err != nil {
            return err
        }
    }
    return nil
}

func (r *LegacyProvider) Clear() int {
    return r.System.Clear()
}

func (r *LegacyProvider) RefreshSchedule() error { return nil }

// LegacyProviderOpt creates a LegacyProvider to be used as the provider in the
// IpfsNode.
func LegacyProviderOpt(reprovideInterval time.Duration, strategy string, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
    system := fx.Provide(
        fx.Annotate(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (*LegacyProvider, error) {
            // Initialize provider.System first, before pinner/blockstore/etc.
            // The KeyChanFunc will be set later via SetKeyProvider() once we have
            // created the pinner, blockstore and other dependencies.
            opts := []provider.Option{
                provider.Online(cr),
                provider.ReproviderInterval(reprovideInterval),
                provider.ProvideWorkerCount(provideWorkerCount),
            }
            if !acceleratedDHTClient && reprovideInterval > 0 {
                // The estimate is rather poor when the accelerated DHT client is
                // running, and since this message mainly exists to nudge people
                // toward the accelerated DHT client, don't report throughput
                // when it is already in use.
                opts = append(opts,
                    provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool {
                        avgProvideSpeed := duration / time.Duration(keysProvided)
                        count := uint64(keysProvided)

                        if !reprovide || !complete {
                            // We don't know how many CIDs we have to provide, so try to fetch the count from the blockstore.
                            // But don't try for too long, as this might be very expensive if you have a huge datastore.
                            ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
                            defer cancel()

                            // FIXME: I want a running counter of blocks so the size of the blockstore can be an O(1) lookup.
                            // Note: talk to the datastore directly, so as to not depend on the Blockstore here.
                            qr, err := repo.Datastore().Query(ctx, query.Query{
                                Prefix: blockstore.BlockPrefix.String(),
                                KeysOnly: true,
                            })
                            if err != nil {
                                logger.Errorf("fetching AllKeysChan in provider ThroughputReport: %v", err)
                                return false
                            }
                            defer qr.Close()
                            count = 0
                        countLoop:
                            for {
                                select {
                                case _, ok := <-qr.Next():
                                    if !ok {
                                        break countLoop
                                    }
                                    count++
                                case <-ctx.Done():
                                    // really big blockstore mode

                                    // how many blocks would be in a 10TiB blockstore with 128KiB blocks.
                                    const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024)
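                                    // For scale: 10 TiB / 128 KiB = 83,886,080 blocks, so the per-key budget
                                    // computed below is reprovideInterval / 83,886,080 (with a 22h interval,
                                    // roughly 0.94ms per key). The 22h figure is only the usual default, used
                                    // here for illustration; this code does not assume it.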
                                    // How much time per block that gives us.
                                    expectedProvideSpeed := reprovideInterval / probableBigBlockstore
                                    if avgProvideSpeed > expectedProvideSpeed {
                                        logger.Errorf(`
🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔

Your node may be falling behind on DHT reprovides, which could affect content availability.

Observed: %d keys at %v per key
Estimated: Assuming 10TiB blockstore, would take %v to complete
⏰ Must finish within %v (Provide.DHT.Interval)

Solutions (try in order):
1. Enable Provide.DHT.SweepEnabled=true (recommended)
2. Increase Provide.DHT.MaxWorkers if needed
3. Enable Routing.AcceleratedDHTClient=true (last resort, resource intensive)

Learn more: https://github.com/ipfs/kubo/blob/master/docs/config.md#provide`,
                                            keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval)
                                        return false
                                    }
                                }
                            }
                        }

                        // How much time per key that gives us.
                        expectedProvideSpeed := reprovideInterval
                        if count > 0 {
                            expectedProvideSpeed = reprovideInterval / time.Duration(count)
                        }

                        if avgProvideSpeed > expectedProvideSpeed {
                            logger.Errorf(`
🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔

Your node is falling behind on DHT reprovides, which will affect content availability.

Observed: %d keys at %v per key
Confirmed: ~%d total CIDs requiring %v to complete
⏰ Must finish within %v (Provide.DHT.Interval)

Solutions (try in order):
1. Enable Provide.DHT.SweepEnabled=true (recommended)
2. Increase Provide.DHT.MaxWorkers if needed
3. Enable Routing.AcceleratedDHTClient=true (last resort, resource intensive)

Learn more: https://github.com/ipfs/kubo/blob/master/docs/config.md#provide`,
                                keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval)
                        }
                        return false
                    }, sampledBatchSize))
            }

            sys, err := provider.New(repo.Datastore(), opts...)
            if err != nil {
                return nil, err
            }
            lc.Append(fx.Hook{
                OnStop: func(ctx context.Context) error {
                    return sys.Close()
                },
            })

            prov := &LegacyProvider{sys}
            handleStrategyChange(strategy, prov, repo.Datastore())

            return prov, nil
        },
            fx.As(new(provider.System)),
            fx.As(new(DHTProvider)),
        ),
    )
    setKeyProvider := fx.Invoke(func(lc fx.Lifecycle, system provider.System, keyProvider provider.KeyChanFunc) {
        lc.Append(fx.Hook{
            OnStart: func(ctx context.Context) error {
                // SetKeyProvider breaks the circular dependency between provider, blockstore, and pinner.
                // We cannot create the blockstore without the provider (it needs to provide blocks),
                // and we cannot determine the reproviding strategy without the pinner/blockstore.
                // This deferred initialization allows us to create provider.System first,
                // then set the actual key provider function after all dependencies are ready.
                system.SetKeyProvider(keyProvider)
                return nil
            },
        })
    })
    return fx.Options(
        system,
        setKeyProvider,
    )
}
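
// dhtImpl is the subset of DHT client functionality the sweeping provider
// needs: content routing, closest-peer lookups, access to the libp2p host,
// and the DHT protobuf message sender. Both *dht.IpfsDHT and the fullrtRouter
// wrapper below satisfy it (see the assertions further down).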
type dhtImpl interface {
    routing.Routing
    GetClosestPeers(context.Context, string) ([]peer.ID, error)
    Host() host.Host
    MessageSender() dht_pb.MessageSender
}
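
// fullrtRouter wraps fullrt.FullRT so that provides are held back until the
// accelerated DHT client has finished its initial network crawl. The `ready`
// latch exists so the waiting/ready messages are logged once per state
// transition rather than on every call.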
type fullrtRouter struct {
    *fullrt.FullRT
    ready bool
    logger *log.ZapEventLogger
}

func newFullRTRouter(fr *fullrt.FullRT, loggerName string) *fullrtRouter {
    return &fullrtRouter{
        FullRT: fr,
        ready: true,
        logger: log.Logger(loggerName),
    }
}

// GetClosestPeers overrides fullrt.FullRT's GetClosestPeers and returns an
// error if the fullrt's initial network crawl isn't complete yet.
func (fr *fullrtRouter) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
    if fr.ready {
        if !fr.Ready() {
            fr.ready = false
            fr.logger.Info("AcceleratedDHTClient: waiting for routing table initialization (5-10 min, depends on DHT size and network) to complete before providing")
            return nil, errAcceleratedDHTNotReady
        }
    } else {
        if fr.Ready() {
            fr.ready = true
            fr.logger.Info("AcceleratedDHTClient: routing table ready, providing can begin")
        } else {
            return nil, errAcceleratedDHTNotReady
        }
    }
    return fr.FullRT.GetClosestPeers(ctx, key)
}

var (
    _ dhtImpl = &dht.IpfsDHT{}
    _ dhtImpl = &fullrtRouter{}
)

type addrsFilter interface {
    FilteredAddrs() []ma.Multiaddr
}
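
// SweepingProviderOpt wires up the sweeping DHT provider: keys are tracked in
// a resettable keystore stored under the "provider" namespace of the repo
// datastore, the provider is wrapped in a buffered, datastore-backed queue so
// that enqueueing keys returns quickly, and reprovides are spread across
// Provide.DHT.Interval instead of happening in a single burst.
//
// It is enabled by Provide.DHT.SweepEnabled (checked in OnlineProviders) and
// tuned by the Provide.DHT.* fields read below (Interval, KeystoreBatchSize,
// ResumeEnabled, OfflineDelay, MaxWorkers, DedicatedPeriodicWorkers,
// DedicatedBurstWorkers, MaxProvideConnsPerWorker) plus Provide.Strategy.
// A hypothetical config excerpt, with placeholder values rather than the
// actual defaults, might look like:
//
//      "Provide": {
//          "DHT": {
//              "SweepEnabled": true,
//              "Interval": "22h",
//              "MaxWorkers": 4
//          }
//      }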
func SweepingProviderOpt(cfg *config.Config) fx.Option {
    reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)
    type providerInput struct {
        fx.In
        DHT routing.Routing `name:"dhtc"`
        Repo repo.Repo
    }
    sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *keystore.ResettableKeystore, error) {
        ds := namespace.Wrap(in.Repo.Datastore(), datastore.NewKey(providerDatastorePrefix))
        ks, err := keystore.NewResettableKeystore(ds,
            keystore.WithPrefixBits(16),
            keystore.WithDatastorePath(keystoreDatastorePath),
            keystore.WithBatchSize(int(cfg.Provide.DHT.KeystoreBatchSize.WithDefault(config.DefaultProvideDHTKeystoreBatchSize))),
        )
        if err != nil {
            return nil, nil, err
        }
        // Constants for buffered provider configuration.
        // These values match the upstream defaults from go-libp2p-kad-dht and have been battle-tested.
        const (
            // bufferedDsName is the datastore namespace used by the buffered provider.
            // The dsqueue persists operations here to handle large data additions without
            // being memory-bound, allowing operations on hardware with limited RAM and
            // enabling core operations to return instantly while processing happens async.
            bufferedDsName = "bprov"

            // bufferedBatchSize controls how many operations are dequeued and processed
            // together from the datastore queue. The worker processes up to this many
            // operations at once, grouping them by type for efficiency.
            bufferedBatchSize = 1 << 10 // 1024 items

            // bufferedIdleWriteTime is an implementation detail of go-dsqueue that controls
            // how long the datastore buffer waits for new multihashes to arrive before
            // flushing in-memory items to the datastore. This does NOT affect providing speed -
            // provides happen as fast as possible via a dedicated worker that continuously
            // processes the queue regardless of this timing.
            bufferedIdleWriteTime = time.Minute

            // loggerName is the name of the go-log logger used by the provider.
            loggerName = dhtprovider.DefaultLoggerName
        )

        bufferedProviderOpts := []buffered.Option{
            buffered.WithBatchSize(bufferedBatchSize),
            buffered.WithDsName(bufferedDsName),
            buffered.WithIdleWriteTime(bufferedIdleWriteTime),
        }
        var impl dhtImpl
        switch inDht := in.DHT.(type) {
        case *dht.IpfsDHT:
            if inDht != nil {
                impl = inDht
            }
        case *dual.DHT:
            if inDht != nil {
                prov, err := ddhtprovider.New(inDht,
                    ddhtprovider.WithKeystore(ks),
                    ddhtprovider.WithDatastore(ds),
                    ddhtprovider.WithResumeCycle(cfg.Provide.DHT.ResumeEnabled.WithDefault(config.DefaultProvideDHTResumeEnabled)),

                    ddhtprovider.WithReprovideInterval(reprovideInterval),
                    ddhtprovider.WithMaxReprovideDelay(time.Hour),
                    ddhtprovider.WithOfflineDelay(cfg.Provide.DHT.OfflineDelay.WithDefault(config.DefaultProvideDHTOfflineDelay)),
                    ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),

                    ddhtprovider.WithMaxWorkers(int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))),
                    ddhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
                    ddhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
                    ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),

                    ddhtprovider.WithLoggerName(loggerName),
                )
                if err != nil {
                    return nil, nil, err
                }
                return buffered.New(prov, ds, bufferedProviderOpts...), ks, nil
            }
        case *fullrt.FullRT:
            if inDht != nil {
                impl = newFullRTRouter(inDht, loggerName)
            }
        }
        if impl == nil {
            return &NoopProvider{}, nil, nil
        }

        var selfAddrsFunc func() []ma.Multiaddr
        if implFilter, ok := impl.(addrsFilter); ok {
            selfAddrsFunc = implFilter.FilteredAddrs
        } else {
            selfAddrsFunc = func() []ma.Multiaddr { return impl.Host().Addrs() }
        }
        opts := []dhtprovider.Option{
            dhtprovider.WithKeystore(ks),
            dhtprovider.WithDatastore(ds),
            dhtprovider.WithResumeCycle(cfg.Provide.DHT.ResumeEnabled.WithDefault(config.DefaultProvideDHTResumeEnabled)),
            dhtprovider.WithHost(impl.Host()),
            dhtprovider.WithRouter(impl),
            dhtprovider.WithMessageSender(impl.MessageSender()),
            dhtprovider.WithSelfAddrs(selfAddrsFunc),
            dhtprovider.WithAddLocalRecord(func(h mh.Multihash) error {
                return impl.Provide(context.Background(), cid.NewCidV1(cid.Raw, h), false)
            }),

            dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
            dhtprovider.WithReprovideInterval(reprovideInterval),
            dhtprovider.WithMaxReprovideDelay(time.Hour),
            dhtprovider.WithOfflineDelay(cfg.Provide.DHT.OfflineDelay.WithDefault(config.DefaultProvideDHTOfflineDelay)),
            dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),

            dhtprovider.WithMaxWorkers(int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))),
            dhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
            dhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
            dhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),

            dhtprovider.WithLoggerName(loggerName),
        }

        prov, err := dhtprovider.New(opts...)
        if err != nil {
            return nil, nil, err
        }
        return buffered.New(prov, ds, bufferedProviderOpts...), ks, nil
    })

    type keystoreInput struct {
        fx.In
        Provider DHTProvider
        Keystore *keystore.ResettableKeystore
        KeyProvider provider.KeyChanFunc
    }
    initKeystore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
        // Skip keystore initialization for NoopProvider
        if _, ok := in.Provider.(*NoopProvider); ok {
            return
        }

        var (
            cancel context.CancelFunc
            done = make(chan struct{})
        )

        syncKeystore := func(ctx context.Context) error {
            kcf, err := in.KeyProvider(ctx)
            if err != nil {
                return err
            }
            if err := in.Keystore.ResetCids(ctx, kcf); err != nil {
                return err
            }
            if err := in.Provider.RefreshSchedule(); err != nil {
                logger.Infow("refreshing provider schedule", "err", err)
            }
            return nil
        }

        lc.Append(fx.Hook{
            OnStart: func(ctx context.Context) error {
                // Set the KeyProvider as a garbage collection function for the
                // keystore. Periodically purge the Keystore of all its keys and
                // replace them with the keys that need to be reprovided, coming from
                // the KeyChanFunc. So far, this is the least bad way to remove CIDs
                // that shouldn't be reprovided from the provider's state.
                go func() {
                    // Sync the keystore once at startup. This operation is async since
                    // we need to walk the DAG of objects matching the provide strategy,
                    // which can take a while.
                    strategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
                    logger.Infow("provider keystore sync started", "strategy", strategy)
                    if err := syncKeystore(ctx); err != nil {
                        if ctx.Err() == nil {
                            logger.Errorw("provider keystore sync failed", "err", err, "strategy", strategy)
                        } else {
                            logger.Debugw("provider keystore sync interrupted by shutdown", "err", err, "strategy", strategy)
                        }
                        return
                    }
                    logger.Infow("provider keystore sync completed", "strategy", strategy)
                }()

                gcCtx, c := context.WithCancel(context.Background())
                cancel = c

                go func() { // garbage collection loop for cids to reprovide
                    defer close(done)
                    ticker := time.NewTicker(reprovideInterval)
                    defer ticker.Stop()

                    for {
                        select {
                        case <-gcCtx.Done():
                            return
                        case <-ticker.C:
                            if err := syncKeystore(gcCtx); err != nil {
                                logger.Errorw("provider keystore sync", "err", err)
                            }
                        }
                    }
                }()
                return nil
            },
            OnStop: func(ctx context.Context) error {
                if cancel != nil {
                    cancel()
                }
                select {
                case <-done:
                case <-ctx.Done():
                    return ctx.Err()
                }
                // Keystore will be closed by ensureProviderClosesBeforeKeystore hook
                // to guarantee provider closes before keystore.
                return nil
            },
        })
    })

    // ensureProviderClosesBeforeKeystore manages the shutdown order between
    // provider and keystore to prevent race conditions.
    //
    // The provider's worker goroutines may call keystore methods during their
    // operation. If the keystore closes while these operations are in-flight, we get
    // "keystore is closed" errors. By closing the provider first, we ensure all
    // worker goroutines exit and complete any pending keystore operations before
    // the keystore itself closes.
    type providerKeystoreShutdownInput struct {
        fx.In
        Provider DHTProvider
        Keystore *keystore.ResettableKeystore
    }
    ensureProviderClosesBeforeKeystore := fx.Invoke(func(lc fx.Lifecycle, in providerKeystoreShutdownInput) {
        // Skip for NoopProvider
        if _, ok := in.Provider.(*NoopProvider); ok {
            return
        }

        lc.Append(fx.Hook{
            OnStop: func(ctx context.Context) error {
                // Close provider first - waits for all worker goroutines to exit.
                // This ensures no code can access keystore after this returns.
                if err := in.Provider.Close(); err != nil {
                    logger.Errorw("error closing provider during shutdown", "error", err)
                }

                // Close keystore - safe now, provider is fully shut down
                return in.Keystore.Close()
            },
        })
    })

    // extractSweepingProvider extracts a SweepingProvider from the given provider interface.
    // It handles unwrapping buffered and dual providers, always selecting WAN for dual DHT.
    // Returns nil if the provider is not a sweeping provider type.
    var extractSweepingProvider func(prov any) *dhtprovider.SweepingProvider
    extractSweepingProvider = func(prov any) *dhtprovider.SweepingProvider {
        switch p := prov.(type) {
        case *dhtprovider.SweepingProvider:
            return p
        case *ddhtprovider.SweepingProvider:
            return p.WAN
        case *buffered.SweepingProvider:
            // Recursively extract from the inner provider
            return extractSweepingProvider(p.Provider)
        default:
            return nil
        }
    }
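
    // For example, the provider built by sweepingReprovider above is a
    // *buffered.SweepingProvider wrapping either a plain or a dual sweeping
    // provider; both unwrap here to a single *dhtprovider.SweepingProvider
    // (the WAN side in the dual case), so its Stats() can be polled below.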

    type alertInput struct {
        fx.In
        Provider DHTProvider
    }
    reprovideAlert := fx.Invoke(func(lc fx.Lifecycle, in alertInput) {
        prov := extractSweepingProvider(in.Provider)
        if prov == nil {
            return
        }

        var (
            cancel context.CancelFunc
            done = make(chan struct{})
        )

        lc.Append(fx.Hook{
            OnStart: func(ctx context.Context) error {
                gcCtx, c := context.WithCancel(context.Background())
                cancel = c
                go func() {
                    defer close(done)

                    ticker := time.NewTicker(reprovideAlertPollInterval)
                    defer ticker.Stop()

                    var (
                        queueSize, prevQueueSize int64
                        queuedWorkers, prevQueuedWorkers bool
                        count int
                    )

                    for {
                        select {
                        case <-gcCtx.Done():
                            return
                        case <-ticker.C:
                        }

                        stats := prov.Stats()
                        queuedWorkers = stats.Workers.QueuedPeriodic > 0
                        queueSize = int64(stats.Queues.PendingRegionReprovides)

                        // Alert if reprovide queue keeps growing and all periodic workers are busy.
                        // Requires consecutiveAlertsThreshold intervals of sustained growth.
                        if prevQueuedWorkers && queuedWorkers && queueSize > prevQueueSize {
                            count++
                            if count >= consecutiveAlertsThreshold {
                                logger.Errorf(`
🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔

Your node is falling behind on DHT reprovides, which will affect content availability.

Keyspace regions enqueued for reprovide:
%s ago:\t%d
Now:\t%d

All periodic workers are busy!
Active workers:\t%d / %d (max)
Active workers types:\t%d periodic, %d burst
Dedicated workers:\t%d periodic, %d burst

Solutions (try in order):
1. Increase Provide.DHT.MaxWorkers (current %d)
2. Increase Provide.DHT.DedicatedPeriodicWorkers (current %d)
3. Set Provide.DHT.SweepEnabled=false and Routing.AcceleratedDHTClient=true (last resort, not recommended)

See how the reprovide queue is processed in real-time with 'watch ipfs provide stat --all --compact'

See docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#providedhtmaxworkers`,
                                    reprovideAlertPollInterval.Truncate(time.Minute).String(), prevQueueSize, queueSize,
                                    stats.Workers.Active, stats.Workers.Max,
                                    stats.Workers.ActivePeriodic, stats.Workers.ActiveBurst,
                                    stats.Workers.DedicatedPeriodic, stats.Workers.DedicatedBurst,
                                    stats.Workers.Max, stats.Workers.DedicatedPeriodic)
                            }
                        } else if !queuedWorkers {
                            count = 0
                        }

                        prevQueueSize, prevQueuedWorkers = queueSize, queuedWorkers
                    }
                }()
                return nil
            },
            OnStop: func(ctx context.Context) error {
                // Cancel the alert loop
                if cancel != nil {
                    cancel()
                }
                select {
                case <-done:
                case <-ctx.Done():
                    return ctx.Err()
                }
                return nil
            },
        })
    })

    return fx.Options(
        sweepingReprovider,
        initKeystore,
        ensureProviderClosesBeforeKeystore,
        reprovideAlert,
    )
}

// ONLINE/OFFLINE

// hasDHTRouting checks if the routing configuration includes a DHT component.
// Returns false for HTTP-only custom routing configurations (e.g., Routing.Type="custom"
// with only HTTP routers). This is used to determine whether SweepingProviderOpt
// can be used, since it requires a DHT client.
func hasDHTRouting(cfg *config.Config) bool {
    routingType := cfg.Routing.Type.WithDefault(config.DefaultRoutingType)
    switch routingType {
    case "auto", "autoclient", "dht", "dhtclient", "dhtserver":
        return true
    case "custom":
        // Check if any router in custom config is DHT-based
        for _, router := range cfg.Routing.Routers {
            if routerIncludesDHT(router, cfg) {
                return true
            }
        }
        return false
    default: // "none", "delegated"
        return false
    }
}

// routerIncludesDHT recursively checks if a router configuration includes DHT.
// Handles parallel and sequential composite routers by checking their children.
func routerIncludesDHT(rp config.RouterParser, cfg *config.Config) bool {
    switch rp.Type {
    case config.RouterTypeDHT:
        return true
    case config.RouterTypeParallel, config.RouterTypeSequential:
        if children, ok := rp.Parameters.(*config.ComposableRouterParams); ok {
            for _, child := range children.Routers {
                if childRouter, exists := cfg.Routing.Routers[child.RouterName]; exists {
                    if routerIncludesDHT(childRouter, cfg) {
                        return true
                    }
                }
            }
        }
    }
    return false
}

// OnlineProviders groups units managing provide routing records online
func OnlineProviders(provide bool, cfg *config.Config) fx.Option {
    if !provide {
        return OfflineProviders()
    }

    providerStrategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)

    strategyFlag := config.ParseProvideStrategy(providerStrategy)
    if strategyFlag == 0 {
        return fx.Error(fmt.Errorf("provider: unknown strategy %q", providerStrategy))
    }

    opts := []fx.Option{
        fx.Provide(setReproviderKeyProvider(providerStrategy)),
    }

    sweepEnabled := cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled)
    dhtAvailable := hasDHTRouting(cfg)

    // Use SweepingProvider only when both sweep is enabled AND DHT is available.
    // For HTTP-only routing (e.g., Routing.Type="custom" with only HTTP routers),
    // fall back to LegacyProvider which works with ProvideManyRouter.
    // See https://github.com/ipfs/kubo/issues/11089
    if sweepEnabled && dhtAvailable {
        opts = append(opts, SweepingProviderOpt(cfg))
    } else {
        reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)
        acceleratedDHTClient := cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient)
        provideWorkerCount := int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))

        opts = append(opts, LegacyProviderOpt(reprovideInterval, providerStrategy, acceleratedDHTClient, provideWorkerCount))
    }

    return fx.Options(opts...)
}
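
// Provider selection summary for the options above: with providing disabled
// the node gets the NoopProvider (via OfflineProviders); with
// Provide.DHT.SweepEnabled and a DHT somewhere in the routing config it gets
// the sweeping provider; in every other case (sweep disabled, or HTTP-only
// custom routing) it falls back to the LegacyProvider. SweepingProviderOpt
// itself also falls back to the NoopProvider if no usable DHT instance is
// injected.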

// OfflineProviders groups units managing provide routing records offline
func OfflineProviders() fx.Option {
    return fx.Provide(func() DHTProvider {
        return &NoopProvider{}
    })
}

func mfsProvider(mfsRoot *mfs.Root, fetcher fetcher.Factory) provider.KeyChanFunc {
    return func(ctx context.Context) (<-chan cid.Cid, error) {
        err := mfsRoot.FlushMemFree(ctx)
        if err != nil {
            return nil, fmt.Errorf("provider: error flushing MFS, cannot provide MFS: %w", err)
        }
        rootNode, err := mfsRoot.GetDirectory().GetNode()
        if err != nil {
            return nil, fmt.Errorf("provider: error loading MFS root, cannot provide MFS: %w", err)
        }

        kcf := provider.NewDAGProvider(rootNode.Cid(), fetcher)
        return kcf(ctx)
    }
}

type provStrategyIn struct {
    fx.In
    Pinner pin.Pinner
    Blockstore blockstore.Blockstore
    OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
    OfflineUnixFSFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
    MFSRoot *mfs.Root
    Repo repo.Repo
}

type provStrategyOut struct {
    fx.Out
    ProvidingStrategy config.ProvideStrategy
    ProvidingKeyChanFunc provider.KeyChanFunc
}

// createKeyProvider creates the appropriate KeyChanFunc based on strategy.
// Each strategy has different behavior:
// - "roots": Only root CIDs of pinned content
// - "pinned": All pinned content (roots + children)
// - "mfs": Only MFS content
// - "all": all blocks
func createKeyProvider(strategyFlag config.ProvideStrategy, in provStrategyIn) provider.KeyChanFunc {
    switch strategyFlag {
    case config.ProvideStrategyRoots:
        return provider.NewBufferedProvider(dspinner.NewPinnedProvider(true, in.Pinner, in.OfflineIPLDFetcher))
    case config.ProvideStrategyPinned:
        return provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher))
    case config.ProvideStrategyPinned | config.ProvideStrategyMFS:
        return provider.NewPrioritizedProvider(
            provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher)),
            mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher),
        )
    case config.ProvideStrategyMFS:
        return mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher)
    default: // "all", "", "flat" (compat)
        return in.Blockstore.AllKeysChan
    }
}
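
// Note that the pinned and MFS strategy flags can combine, handled by the
// ProvideStrategyPinned|ProvideStrategyMFS case above. Unknown strategy
// strings never reach this function: OnlineProviders rejects them earlier,
// turning a zero result from ParseProvideStrategy into an fx.Error.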

// detectStrategyChange checks if the reproviding strategy has changed from what's persisted.
// Returns: (previousStrategy, hasChanged, error)
func detectStrategyChange(ctx context.Context, strategy string, ds datastore.Datastore) (string, bool, error) {
    strategyKey := datastore.NewKey(reprovideStrategyKey)

    prev, err := ds.Get(ctx, strategyKey)
    if err != nil {
        if errors.Is(err, datastore.ErrNotFound) {
            return "", strategy != "", nil
        }
        return "", false, err
    }

    previousStrategy := string(prev)
    return previousStrategy, previousStrategy != strategy, nil
}

// persistStrategy saves the current reproviding strategy to the datastore.
// Empty string strategies are deleted rather than stored.
func persistStrategy(ctx context.Context, strategy string, ds datastore.Datastore) error {
    strategyKey := datastore.NewKey(reprovideStrategyKey)

    if strategy == "" {
        return ds.Delete(ctx, strategyKey)
    }
    return ds.Put(ctx, strategyKey, []byte(strategy))
}

// handleStrategyChange manages strategy change detection and queue clearing.
// Strategy change detection: when the reproviding strategy changes,
// we clear the provide queue to avoid unexpected behavior from mixing
// strategies. This ensures a clean transition between different providing modes.
func handleStrategyChange(strategy string, provider DHTProvider, ds datastore.Datastore) {
    ctx := context.Background()

    previous, changed, err := detectStrategyChange(ctx, strategy, ds)
    if err != nil {
        logger.Error("cannot read previous reprovide strategy", "err", err)
        return
    }

    if !changed {
        return
    }

    logger.Infow("Provide.Strategy changed, clearing provide queue", "previous", previous, "current", strategy)
    provider.Clear()

    if err := persistStrategy(ctx, strategy, ds); err != nil {
        logger.Error("cannot update reprovide strategy", "err", err)
    }
}

func setReproviderKeyProvider(strategy string) func(in provStrategyIn) provStrategyOut {
    strategyFlag := config.ParseProvideStrategy(strategy)

    return func(in provStrategyIn) provStrategyOut {
        // Create the appropriate key provider based on strategy
        kcf := createKeyProvider(strategyFlag, in)
        return provStrategyOut{
            ProvidingStrategy: strategyFlag,
            ProvidingKeyChanFunc: kcf,
        }
    }
}