From c9fbe62069f52cf919ed2ac5b8f285b8dd701c5a Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 20 Aug 2025 15:15:08 +0200 Subject: [PATCH] fix merge issues --- core/commands/routing.go | 60 ++++++++++++++-------------------------- core/coreapi/pin.go | 3 -- core/coreapi/unixfs.go | 5 ---- core/node/core.go | 12 ++++---- core/node/provider.go | 23 ++++----------- 5 files changed, 32 insertions(+), 71 deletions(-) diff --git a/core/commands/routing.go b/core/commands/routing.go index 3db7756e3..44e221f88 100644 --- a/core/commands/routing.go +++ b/core/commands/routing.go @@ -11,10 +11,11 @@ import ( "github.com/ipfs/kubo/config" cmdenv "github.com/ipfs/kubo/core/commands/cmdenv" + "github.com/ipfs/kubo/core/node" + mh "github.com/multiformats/go-multihash" dag "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/boxo/ipns" - "github.com/ipfs/boxo/provider" cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" ipld "github.com/ipfs/go-ipld-format" @@ -208,15 +209,15 @@ var provideRefRoutingCmd = &cmds.Command{ go func() { defer cancel() if rec { - provideErr = provideKeysRec(ctx, nd.Routing, nd.DAG, cids) + provideErr := provideCidsRec(ctx, nd.Provider, nd.DAG, cids) + if provideErr != nil { + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.QueryError, + Extra: provideErr.Error(), + }) + } } else { - provideErr = provideKeys(ctx, nd.Routing, cids) - } - if provideErr != nil { - routing.PublishQueryEvent(ctx, &routing.QueryEvent{ - Type: routing.QueryError, - Extra: provideErr.Error(), - }) + provideCids(nd.Provider, cids) } }() @@ -264,11 +265,6 @@ Trigger reprovider to announce our data to network. return ErrNotOnline } - provideSys, ok := nd.Provider.(provider.System) - if !ok { - return errors.New("manual reprovide not supported with sweeping provider") - } - // respect global config cfg, err := nd.Repo.Config() if err != nil { @@ -280,6 +276,10 @@ Trigger reprovider to announce our data to network. if cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval) == 0 { return errors.New("invalid configuration: Reprovider.Interval is set to '0'") } + provideSys, ok := nd.Provider.(*node.BurstProvider) + if !ok { + return fmt.Errorf("manual reprovide not supported with sweeping provider, %T", nd.Provider) + } err = provideSys.Reprovide(req.Context) if err != nil { @@ -290,41 +290,23 @@ Trigger reprovider to announce our data to network. }, } -func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error { - for _, c := range cids { - // TODO: only provide cids according to Provide.Strategy - err := r.Provide(ctx, c, true) - if err != nil { - return err - } +func provideCids(prov node.DHTProvider, cids []cid.Cid) { + mhs := make([]mh.Multihash, len(cids)) + for i, c := range cids { + mhs[i] = c.Hash() } - return nil + prov.StartProviding(true, mhs...) } -func provideKeysRec(ctx context.Context, r routing.Routing, dserv ipld.DAGService, cids []cid.Cid) error { - provided := cid.NewSet() +func provideCidsRec(ctx context.Context, prov node.DHTProvider, dserv ipld.DAGService, cids []cid.Cid) error { for _, c := range cids { kset := cid.NewSet() - err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) if err != nil { return err } - - for _, k := range kset.Keys() { - if provided.Has(k) { - continue - } - - // TODO: only provide cids according to Provide.Strategy - err = r.Provide(ctx, k, true) - if err != nil { - return err - } - provided.Add(k) - } + provideCids(prov, kset.Keys()) } - return nil } diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 4499c1706..9bb44bac5 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -44,9 +44,6 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp return fmt.Errorf("pin: %s", err) } - // TODO: only provide cids according to Provide.Strategy - api.provider.StartProviding(false, dagNode.Cid().Hash()) - return api.pinning.Flush(ctx) } diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 17eb31e96..1b6f7f896 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -218,11 +218,6 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options return path.ImmutablePath{}, err } - if !settings.OnlyHash { - // TODO: only provide cids according to Provide.Strategy - api.provider.StartProviding(false, nd.Cid().Hash()) - } - return path.FromCid(nd.Cid()), nil } diff --git a/core/node/core.go b/core/node/core.go index ac488f961..ffc9c4c39 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -230,12 +230,12 @@ func Files(strategy string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo return nil, err } - // MFS (Mutable File System) provider integration: - // Only pass the provider to MFS when the strategy includes "mfs". - // MFS will call Provide() on every DAGService.Add() operation, - // which is sufficient for the "mfs" strategy - it ensures all - // MFS content gets announced as it's added or modified. - // For non-mfs strategies, we set provider to nil to avoid unnecessary providing. + // MFS (Mutable File System) provider integration: Only pass the provider + // to MFS when the strategy includes "mfs". MFS will call StartProviding() + // on every DAGService.Add() operation, which is sufficient for the "mfs" + // strategy - it ensures all MFS content gets announced as it's added or + // modified. For non-mfs strategies, we set provider to nil to avoid + // unnecessary providing. strategyFlag := config.ParseReproviderStrategy(strategy) if strategyFlag&config.ReproviderStrategyMFS == 0 { prov = nil diff --git a/core/node/provider.go b/core/node/provider.go index 951ac8113..6b94eb966 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -42,7 +42,6 @@ const reprovideStrategyKey = "/reprovideStrategy" type NoopProvider struct{} func (r *NoopProvider) StartProviding(bool, ...mh.Multihash) {} -func (r *NoopProvider) StopProviding(...mh.Multihash) {} func (r *NoopProvider) ProvideOnce(...mh.Multihash) {} func (r *NoopProvider) Clear() int { return 0 } @@ -63,23 +62,12 @@ type DHTProvider interface { // This operation is asynchronous, it returns as soon as the `keys` are added // to the provide queue, and provides happens asynchronously. StartProviding(force bool, keys ...mh.Multihash) - - // StopProviding stops reproviding the given keys to the DHT swarm. The node - // stops being referred as a provider when the provider records in the DHT - // swarm expire. - // - // Remove the `keys` from the schedule and return immediately. Valid records - // can remain in the DHT swarm up to the provider record TTL after calling - // `StopProviding`. - StopProviding(keys ...mh.Multihash) - // ProvideOnce sends provider records for the specified keys to the DHT swarm // only once. It does not automatically reprovide those keys afterward. // // Add the supplied multihashes to the provide queue, and return immediately. // The provide operation happens asynchronously. ProvideOnce(keys ...mh.Multihash) - // Clear clears the all the keys from the provide queue and returns // the number of keys that were cleared. Clear() int @@ -106,21 +94,20 @@ func (r *BurstProvider) StartProviding(force bool, keys ...mh.Multihash) { go r.ProvideOnce(keys...) } -// StopProviding is a no op, since reprovider isn't tracking the keys to be -// reprovided over time. -func (r *BurstProvider) StopProviding(keys ...mh.Multihash) { -} - // ProvideOnce sends out provider records for the supplied keys, but doesn't // mark the keys for reproviding. func (r *BurstProvider) ProvideOnce(keys ...mh.Multihash) { if many, ok := r.System.(routinghelpers.ProvideManyRouter); ok { - _ = many.ProvideMany(context.Background(), keys) + err := many.ProvideMany(context.Background(), keys) + if err != nil { + logger.Warnf("error providing many: %v", err) + } return } for _, k := range keys { if err := r.Provide(context.Background(), cid.NewCidV1(cid.Raw, k), true); err != nil { + logger.Warnf("error providing %s: %v", k, err) break } }