This commit is contained in:
Guillaume Michel 2026-01-13 15:31:47 -08:00 committed by GitHub
commit e53423471a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -16,6 +16,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
log "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
@ -53,6 +54,8 @@ const (
keystoreDatastorePath = "keystore"
)
var errFullRTNotReady = errors.New("fullrt: router not ready")
// Interval between reprovide queue monitoring checks for slow reprovide alerts.
// Used when Provide.DHT.SweepEnabled=true
const reprovideAlertPollInterval = 15 * time.Minute
@ -325,13 +328,34 @@ type dhtImpl interface {
type fullrtRouter struct {
*fullrt.FullRT
ready bool
logger *log.ZapEventLogger
}
func newFullRTRouter(fr *fullrt.FullRT, loggerName string) *fullrtRouter {
return &fullrtRouter{
FullRT: fr,
ready: true,
logger: log.Logger(loggerName),
}
}
// GetClosestPeers overrides fullrt.FullRT's GetClosestPeers and returns an
// error if the fullrt's initial network crawl isn't complete yet.
func (fr *fullrtRouter) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
if !fr.Ready() {
return nil, errors.New("fullrt: initial network crawl still running")
if fr.ready {
if !fr.Ready() {
fr.ready = false
fr.logger.Info("fullrt: waiting for network crawl to complete before providing")
return nil, errFullRTNotReady
}
} else {
if fr.Ready() {
fr.ready = true
fr.logger.Info("fullrt: network crawl complete")
} else {
return nil, errFullRTNotReady
}
}
return fr.FullRT.GetClosestPeers(ctx, key)
}
@ -382,6 +406,9 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
// provides happen as fast as possible via a dedicated worker that continuously
// processes the queue regardless of this timing.
bufferedIdleWriteTime = time.Minute
// loggerName is the name of the go-log logger used by the provider.
loggerName = dhtprovider.DefaultLoggerName
)
bufferedProviderOpts := []buffered.Option{
@ -411,6 +438,8 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
ddhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
ddhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),
ddhtprovider.WithLoggerName(loggerName),
)
if err != nil {
return nil, nil, err
@ -419,7 +448,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
}
case *fullrt.FullRT:
if inDht != nil {
impl = &fullrtRouter{inDht}
impl = newFullRTRouter(inDht, loggerName)
}
}
if impl == nil {
@ -454,6 +483,8 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
dhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
dhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
dhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),
dhtprovider.WithLoggerName(loggerName),
}
prov, err := dhtprovider.New(opts...)