diff --git a/client/rpc/api_test.go b/client/rpc/api_test.go index c0da3d7b0..e51aebfba 100644 --- a/client/rpc/api_test.go +++ b/client/rpc/api_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/ipfs/boxo/path" + "github.com/ipfs/kubo/config" iface "github.com/ipfs/kubo/core/coreiface" "github.com/ipfs/kubo/core/coreiface/tests" "github.com/ipfs/kubo/test/cli/harness" @@ -45,6 +46,9 @@ func (np NodeProvider) MakeAPISwarm(t *testing.T, ctx context.Context, fullIdent c := n.ReadConfig() c.Experimental.FilestoreEnabled = true + // only provide things we pin. Allows to test + // provide operations. + c.Reprovider.Strategy = config.NewOptionalString("roots") n.WriteConfig(c) n.StartDaemon("--enable-pubsub-experiment", "--offline="+strconv.FormatBool(!online)) diff --git a/config/reprovider.go b/config/reprovider.go index 3e8a5b476..c02b64e85 100644 --- a/config/reprovider.go +++ b/config/reprovider.go @@ -1,15 +1,47 @@ package config -import "time" +import ( + "strings" + "time" +) const ( DefaultReproviderInterval = time.Hour * 22 // https://github.com/ipfs/kubo/pull/9326 DefaultReproviderStrategy = "all" ) +type ReproviderStrategy int + +const ( + ReproviderStrategyAll ReproviderStrategy = 1 << iota // 1 (0b00001) + ReproviderStrategyFlat // 2 (0b00010) + ReproviderStrategyPinned // 4 (0b00100) + ReproviderStrategyRoots // 8 (0b01000) + ReproviderStrategyMFS // 16 (0b10000) +) + // Reprovider configuration describes how CID from local datastore are periodically re-announced to routing systems. // For provide behavior of ad-hoc or newly created CIDs and their first-time announcement, see Provider.* type Reprovider struct { Interval *OptionalDuration `json:",omitempty"` // Time period to reprovide locally stored objects to the network Strategy *OptionalString `json:",omitempty"` // Which keys to announce } + +func ParseReproviderStrategy(s string) ReproviderStrategy { + var strategy ReproviderStrategy + for _, part := range strings.Split(s, "+") { + switch part { + case "all", "": // special case, does not mix with others + return ReproviderStrategyAll + case "flat": + strategy |= ReproviderStrategyFlat + case "pinned": + strategy |= ReproviderStrategyPinned + case "roots": + strategy |= ReproviderStrategyRoots + case "mfs": + strategy |= ReproviderStrategyMFS + } + } + return strategy +} diff --git a/core/core.go b/core/core.go index c693600f7..10571ba3e 100644 --- a/core/core.go +++ b/core/core.go @@ -107,6 +107,8 @@ type IpfsNode struct { Bitswap *bitswap.Bitswap `optional:"true"` // The Bitswap instance Namesys namesys.NameSystem // the name system, resolves paths to hashes Provider provider.System // the value provider system + ProvidingStrategy config.ReproviderStrategy `optional:"true"` + ProvidingKeyChanFunc provider.KeyChanFunc `optional:"true"` IpnsRepub *ipnsrp.Republisher `optional:"true"` ResourceManager network.ResourceManager `optional:"true"` diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 6e099e5fd..66763e884 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -26,6 +26,7 @@ import ( provider "github.com/ipfs/boxo/provider" offlineroute "github.com/ipfs/boxo/routing/offline" ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/kubo/config" coreiface "github.com/ipfs/kubo/core/coreiface" "github.com/ipfs/kubo/core/coreiface/options" @@ -44,6 +45,8 @@ import ( "github.com/ipfs/kubo/repo" ) +var log = logging.Logger("coreapi") + type CoreAPI struct { nctx context.Context @@ -70,7 +73,8 @@ type 
CoreAPI struct { ipldPathResolver pathresolver.Resolver unixFSPathResolver pathresolver.Resolver - provider provider.System + provider provider.System + providingStrategy config.ReproviderStrategy pubSub *pubsub.PubSub @@ -185,7 +189,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e ipldPathResolver: n.IPLDPathResolver, unixFSPathResolver: n.UnixFSPathResolver, - provider: n.Provider, + provider: n.Provider, + providingStrategy: n.ProvidingStrategy, pubSub: n.PubSub, @@ -235,8 +240,6 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e return nil, fmt.Errorf("error constructing namesys: %w", err) } - subAPI.provider = provider.NewNoopProvider() - subAPI.peerstore = nil subAPI.peerHost = nil subAPI.recordValidator = nil diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 878b4c28d..9bb44bac5 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -44,10 +44,6 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp return fmt.Errorf("pin: %s", err) } - if err := api.provider.Provide(ctx, dagNode.Cid(), true); err != nil { - return err - } - return api.pinning.Flush(ctx) } diff --git a/core/coreapi/test/api_test.go b/core/coreapi/test/api_test.go index dfd8cf685..7867e1f1c 100644 --- a/core/coreapi/test/api_test.go +++ b/core/coreapi/test/api_test.go @@ -70,6 +70,9 @@ func (NodeProvider) MakeAPISwarm(t *testing.T, ctx context.Context, fullIdentity c.Identity = ident c.Experimental.FilestoreEnabled = true c.AutoTLS.Enabled = config.False // disable so no /ws listener is added + // For provider tests, avoid that content gets + // auto-provided without calling "provide" (unless pinned). + c.Reprovider.Strategy = config.NewOptionalString("roots") ds := syncds.MutexWrap(datastore.NewMapDatastore()) r := &repo.Mock{ diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 6c78a869a..b0206e7b9 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -16,6 +16,7 @@ import ( uio "github.com/ipfs/boxo/ipld/unixfs/io" "github.com/ipfs/boxo/mfs" "github.com/ipfs/boxo/path" + provider "github.com/ipfs/boxo/provider" cid "github.com/ipfs/go-cid" cidutil "github.com/ipfs/go-cidutil" ds "github.com/ipfs/go-datastore" @@ -102,7 +103,22 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options bserv := blockservice.New(addblockstore, exch, blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)), ) // hash security 001 - dserv := merkledag.NewDAGService(bserv) + + var dserv ipld.DAGService = merkledag.NewDAGService(bserv) + + // wrap the DAGService in a providingDAG service which provides every block written. + // note about strategies: + // - "all"/"flat" gets handled directly at the blockstore so no need to provide + // - "roots" gets handled in the pinner + // - "mfs" gets handled in mfs + // We need to provide the "pinned" cases only. Added blocks are not + // going to be provided by the blockstore (wrong strategy for that), + // nor by the pinner (the pinner doesn't traverse the pinned DAG itself, it only + // handles roots). This wrapping ensures all blocks of pinned content get provided. 
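+ // The strategy value is a bitmask (see config.ParseReproviderStrategy), so
+ // the bitwise AND below also matches combined strategies such as "pinned+mfs".
+ // OnlyHash adds are excluded: nothing is persisted, so there is nothing to announce.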
+ if settings.Pin && !settings.OnlyHash && + (api.providingStrategy&config.ReproviderStrategyPinned) != 0 { + dserv = &providingDagService{dserv, api.provider} + } // add a sync call to the DagService // this ensures that data written to the DagService is persisted to the underlying datastore @@ -126,6 +142,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options } } + // Note: the dag service gets wrapped multiple times: + // 1. providingDagService (if pinned strategy) - provides blocks as they're added + // 2. syncDagService - ensures data persistence + // 3. batchingDagService (in coreunix.Adder) - batches operations for efficiency + fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, syncDserv) if err != nil { return path.ImmutablePath{}, err @@ -183,7 +204,8 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options if err != nil { return path.ImmutablePath{}, err } - mr, err := mfs.NewRoot(ctx, md, emptyDirNode, nil) + // MFS root for OnlyHash mode: provider is nil since we're not storing/providing anything + mr, err := mfs.NewRoot(ctx, md, emptyDirNode, nil, nil) if err != nil { return path.ImmutablePath{}, err } @@ -196,12 +218,6 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options return path.ImmutablePath{}, err } - if !settings.OnlyHash { - if err := api.provider.Provide(ctx, nd.Cid(), true); err != nil { - return path.ImmutablePath{}, err - } - } - return path.FromCid(nd.Cid()), nil } @@ -367,3 +383,40 @@ type syncDagService struct { func (s *syncDagService) Sync() error { return s.syncFn() } + +type providingDagService struct { + ipld.DAGService + provider provider.System +} + +func (pds *providingDagService) Add(ctx context.Context, n ipld.Node) error { + if err := pds.DAGService.Add(ctx, n); err != nil { + return err + } + // Provider errors are logged but not propagated. + // We don't want DAG operations to fail due to providing issues. + // The user's data is still stored successfully even if the + // announcement to the routing system fails temporarily. + if err := pds.provider.Provide(ctx, n.Cid(), true); err != nil { + log.Error(err) + } + return nil +} + +func (pds *providingDagService) AddMany(ctx context.Context, nds []ipld.Node) error { + if err := pds.DAGService.AddMany(ctx, nds); err != nil { + return err + } + // Same error handling philosophy as Add(): log but don't fail. + // Note: Provide calls are intentionally blocking here - the Provider + // implementation should handle concurrency/queuing internally. + for _, n := range nds { + if err := pds.provider.Provide(ctx, n.Cid(), true); err != nil { + log.Error(err) + break + } + } + return nil +} + +var _ ipld.DAGService = (*providingDagService)(nil) diff --git a/core/coreiface/tests/routing.go b/core/coreiface/tests/routing.go index 753d49550..147cb9b74 100644 --- a/core/coreiface/tests/routing.go +++ b/core/coreiface/tests/routing.go @@ -171,6 +171,13 @@ func (tp *TestSuite) TestRoutingFindProviders(t *testing.T) { t.Fatal(err) } + // Pin so that it is provided, given that providing strategy is + // "roots" and addTestObject does not pin. 
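+ // (MakeAPISwarm configures Reprovider.Strategy="roots" for these test nodes,
+ // so unpinned test objects are never announced.)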
+ err = apis[0].Pin().Add(ctx, p) + if err != nil { + t.Fatal(err) + } + time.Sleep(3 * time.Second) out, err := apis[2].Routing().FindProviders(ctx, p, options.Routing.NumProviders(1)) diff --git a/core/coreunix/add.go b/core/coreunix/add.go index c693773f9..55a9d5bec 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -103,7 +103,7 @@ func (adder *Adder) mfsRoot() (*mfs.Root, error) { } // Note, this adds it to DAGService already. - mr, err := mfs.NewEmptyRoot(adder.ctx, adder.dagService, nil, mfs.MkdirOpts{ + mr, err := mfs.NewEmptyRoot(adder.ctx, adder.dagService, nil, nil, mfs.MkdirOpts{ CidBuilder: adder.CidBuilder, MaxLinks: adder.MaxDirectoryLinks, MaxHAMTFanout: adder.MaxHAMTFanout, @@ -416,7 +416,7 @@ func (adder *Adder) addFileNode(ctx context.Context, path string, file files.Nod case files.Directory: return adder.addDir(ctx, path, f, toplevel) case *files.Symlink: - return adder.addSymlink(path, f) + return adder.addSymlink(ctx, path, f) case files.File: return adder.addFile(path, f) default: @@ -424,7 +424,7 @@ func (adder *Adder) addFileNode(ctx context.Context, path string, file files.Nod } } -func (adder *Adder) addSymlink(path string, l *files.Symlink) error { +func (adder *Adder) addSymlink(ctx context.Context, path string, l *files.Symlink) error { sdata, err := unixfs.SymlinkData(l.Target) if err != nil { return err @@ -482,7 +482,7 @@ func (adder *Adder) addDir(ctx context.Context, path string, dir files.Directory // if we need to store mode or modification time then create a new root which includes that data if toplevel && (adder.FileMode != 0 || !adder.FileMtime.IsZero()) { - mr, err := mfs.NewEmptyRoot(ctx, adder.dagService, nil, + mr, err := mfs.NewEmptyRoot(ctx, adder.dagService, nil, nil, mfs.MkdirOpts{ CidBuilder: adder.CidBuilder, MaxLinks: adder.MaxDirectoryLinks, diff --git a/core/node/bitswap.go b/core/node/bitswap.go index 250dc89f6..e73145292 100644 --- a/core/node/bitswap.go +++ b/core/node/bitswap.go @@ -14,8 +14,6 @@ import ( "github.com/ipfs/boxo/bitswap/network/httpnet" blockstore "github.com/ipfs/boxo/blockstore" exchange "github.com/ipfs/boxo/exchange" - "github.com/ipfs/boxo/exchange/providing" - provider "github.com/ipfs/boxo/provider" rpqm "github.com/ipfs/boxo/routing/providerquerymanager" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" @@ -222,32 +220,6 @@ func OnlineExchange(isBitswapActive bool) interface{} { } } -type providingExchangeIn struct { - fx.In - - BaseExch exchange.Interface - Provider provider.System -} - -// ProvidingExchange creates a providing.Exchange with the existing exchange -// and the provider.System. -// We cannot do this in OnlineExchange because it causes cycles so this is for -// a decorator. 
-func ProvidingExchange(provide bool) interface{} { - return func(in providingExchangeIn, lc fx.Lifecycle) exchange.Interface { - exch := in.BaseExch - if provide { - exch = providing.New(in.BaseExch, in.Provider) - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return exch.Close() - }, - }) - } - return exch - } -} - type noopExchange struct { closer io.Closer } diff --git a/core/node/core.go b/core/node/core.go index cb3439939..38e68f285 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -2,6 +2,7 @@ package node import ( "context" + "errors" "fmt" "github.com/ipfs/boxo/blockservice" @@ -17,6 +18,7 @@ import ( pathresolver "github.com/ipfs/boxo/path/resolver" pin "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/boxo/pinning/pinner/dspinner" + provider "github.com/ipfs/boxo/provider" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" format "github.com/ipfs/go-ipld-format" @@ -47,25 +49,50 @@ func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blocks } // Pinning creates new pinner which tells GC which blocks should be kept -func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) { - rootDS := repo.Datastore() +func Pinning(strategy string) func(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo, prov provider.System) (pin.Pinner, error) { + // Parse strategy at function creation time (not inside the returned function) + // This happens before the provider is created, which is why we pass the strategy + // string and parse it here, rather than using fx-provided ProvidingStrategy. + strategyFlag := config.ParseReproviderStrategy(strategy) - syncFn := func(ctx context.Context) error { - if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil { - return err + return func(bstore blockstore.Blockstore, + ds format.DAGService, + repo repo.Repo, + prov provider.System) (pin.Pinner, error) { + rootDS := repo.Datastore() + + syncFn := func(ctx context.Context) error { + if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil { + return err + } + return rootDS.Sync(ctx, filestore.FilestorePrefix) } - return rootDS.Sync(ctx, filestore.FilestorePrefix) + syncDs := &syncDagService{ds, syncFn} + + ctx := context.TODO() + + var opts []dspinner.Option + roots := (strategyFlag & config.ReproviderStrategyRoots) != 0 + pinned := (strategyFlag & config.ReproviderStrategyPinned) != 0 + + // Important: Only one of WithPinnedProvider or WithRootsProvider should be active. + // Having both would cause duplicate root advertisements since "pinned" includes all + // pinned content (roots + children), while "roots" is just the root CIDs. + // We prioritize "pinned" if both are somehow set (though this shouldn't happen + // with proper strategy parsing). + if pinned { + opts = append(opts, dspinner.WithPinnedProvider(prov)) + } else if roots { + opts = append(opts, dspinner.WithRootsProvider(prov)) + } + + pinning, err := dspinner.New(ctx, rootDS, syncDs, opts...) 
+ if err != nil { + return nil, err + } + + return pinning, nil } - syncDs := &syncDagService{ds, syncFn} - - ctx := context.TODO() - - pinning, err := dspinner.New(ctx, rootDS, syncDs) - if err != nil { - return nil, err - } - - return pinning, nil } var ( @@ -152,63 +179,76 @@ func Dag(bs blockservice.BlockService) format.DAGService { } // Files loads persisted MFS root -func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore) (*mfs.Root, error) { - dsk := datastore.NewKey("/local/filesroot") - pf := func(ctx context.Context, c cid.Cid) error { - rootDS := repo.Datastore() - if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil { - return err - } - if err := rootDS.Sync(ctx, filestore.FilestorePrefix); err != nil { - return err +func Files(strategy string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov provider.System) (*mfs.Root, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov provider.System) (*mfs.Root, error) { + dsk := datastore.NewKey("/local/filesroot") + pf := func(ctx context.Context, c cid.Cid) error { + rootDS := repo.Datastore() + if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil { + return err + } + if err := rootDS.Sync(ctx, filestore.FilestorePrefix); err != nil { + return err + } + + if err := rootDS.Put(ctx, dsk, c.Bytes()); err != nil { + return err + } + return rootDS.Sync(ctx, dsk) } - if err := rootDS.Put(ctx, dsk, c.Bytes()); err != nil { - return err - } - return rootDS.Sync(ctx, dsk) - } + var nd *merkledag.ProtoNode + ctx := helpers.LifecycleCtx(mctx, lc) + val, err := repo.Datastore().Get(ctx, dsk) - var nd *merkledag.ProtoNode - ctx := helpers.LifecycleCtx(mctx, lc) - val, err := repo.Datastore().Get(ctx, dsk) + switch { + case errors.Is(err, datastore.ErrNotFound): + nd = unixfs.EmptyDirNode() + err := dag.Add(ctx, nd) + if err != nil { + return nil, fmt.Errorf("failure writing filesroot to dagstore: %s", err) + } + case err == nil: + c, err := cid.Cast(val) + if err != nil { + return nil, err + } - switch { - case err == datastore.ErrNotFound || val == nil: - nd = unixfs.EmptyDirNode() - err := dag.Add(ctx, nd) - if err != nil { - return nil, fmt.Errorf("failure writing filesroot to dagstore: %s", err) - } - case err == nil: - c, err := cid.Cast(val) - if err != nil { + offineDag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + rnd, err := offineDag.Get(ctx, c) + if err != nil { + return nil, fmt.Errorf("error loading filesroot from dagservice: %s", err) + } + + pbnd, ok := rnd.(*merkledag.ProtoNode) + if !ok { + return nil, merkledag.ErrNotProtobuf + } + + nd = pbnd + default: return nil, err } - offineDag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - rnd, err := offineDag.Get(ctx, c) - if err != nil { - return nil, fmt.Errorf("error loading filesroot from dagservice: %s", err) + // MFS (Mutable File System) provider integration: + // Only pass the provider to MFS when the strategy includes "mfs". + // MFS will call Provide() on every DAGService.Add() operation, + // which is sufficient for the "mfs" strategy - it ensures all + // MFS content gets announced as it's added or modified. + // For non-mfs strategies, we set provider to nil to avoid unnecessary providing. 
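+ // The bitmask check below keeps the provider for any strategy that includes
+ // "mfs" (e.g. "pinned+mfs"), not only the plain "mfs" strategy.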
+ strategyFlag := config.ParseReproviderStrategy(strategy) + if strategyFlag&config.ReproviderStrategyMFS == 0 { + prov = nil } - pbnd, ok := rnd.(*merkledag.ProtoNode) - if !ok { - return nil, merkledag.ErrNotProtobuf - } + root, err := mfs.NewRoot(ctx, dag, nd, pf, prov) - nd = pbnd - default: - return nil, err + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return root.Close() + }, + }) + + return root, err } - - root, err := mfs.NewRoot(ctx, dag, nd, pf) - - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return root.Close() - }, - }) - - return root, err } diff --git a/core/node/groups.go b/core/node/groups.go index 5b16b5527..9904574a8 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -250,7 +250,12 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option { return fx.Options( fx.Provide(RepoConfig), fx.Provide(Datastore), - fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead, cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough))), + fx.Provide(BaseBlockstoreCtor( + cacheOpts, + cfg.Datastore.HashOnRead, + cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough), + cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy), + )), finalBstore, ) } @@ -350,8 +355,6 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part fx.Provide(BitswapOptions(cfg)), fx.Provide(Bitswap(isBitswapServerEnabled, isBitswapLibp2pEnabled, isHTTPRetrievalEnabled)), fx.Provide(OnlineExchange(isBitswapLibp2pEnabled)), - // Replace our Exchange with a Providing exchange! - fx.Decorate(ProvidingExchange(isProviderEnabled && isBitswapServerEnabled)), fx.Provide(DNSResolver), fx.Provide(Namesys(ipnsCacheSize, cfg.Ipns.MaxCacheTTL.WithDefault(config.DefaultIpnsMaxCacheTTL))), fx.Provide(Peering), @@ -391,8 +394,6 @@ var Core = fx.Options( fx.Provide(Dag), fx.Provide(FetcherConfig), fx.Provide(PathResolverConfig), - fx.Provide(Pinning), - fx.Provide(Files), ) func Networked(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.PartialLimitConfig) fx.Option { @@ -442,6 +443,8 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { uio.HAMTShardingSize = int(shardSingThresholdInt) uio.DefaultShardWidth = int(shardMaxFanout) + providerStrategy := cfg.Reprovider.Strategy.WithDefault(config.DefaultReproviderStrategy) + return fx.Options( bcfgOpts, @@ -450,6 +453,8 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { IPNS, Networked(bcfg, cfg, userResourceOverrides), fx.Provide(BlockService(cfg)), + fx.Provide(Pinning(providerStrategy)), + fx.Provide(Files(providerStrategy)), Core, ) } diff --git a/core/node/provider.go b/core/node/provider.go index d1237da97..5858c11dc 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -10,9 +10,12 @@ import ( "github.com/ipfs/boxo/fetcher" "github.com/ipfs/boxo/mfs" pin "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/pinning/pinner/dspinner" provider "github.com/ipfs/boxo/provider" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/repo" irouting "github.com/ipfs/kubo/routing" "go.uber.org/fx" @@ -26,12 +29,14 @@ const sampledBatchSize = 1000 // Datastore key used to store previous reprovide strategy. 
const reprovideStrategyKey = "/reprovideStrategy" -func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int, reprovideStrategy string) fx.Option { - return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, keyProvider provider.KeyChanFunc, repo repo.Repo, bs blockstore.Blockstore) (provider.System, error) { +func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option { + return fx.Provide(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (provider.System, error) { + // Initialize provider.System first, before pinner/blockstore/etc. + // The KeyChanFunc will be set later via SetKeyProvider() once we have + // created the pinner, blockstore and other dependencies. opts := []provider.Option{ provider.Online(cr), provider.ReproviderInterval(reprovideInterval), - provider.KeyProvider(keyProvider), provider.ProvideWorkerCount(provideWorkerCount), } if !acceleratedDHTClient && reprovideInterval > 0 { @@ -50,16 +55,20 @@ func ProviderSys(reprovideInterval time.Duration, acceleratedDHTClient bool, pro defer cancel() // FIXME: I want a running counter of blocks so size of blockstore can be an O(1) lookup. - ch, err := bs.AllKeysChan(ctx) + // Note: talk to datastore directly, as to not depend on Blockstore here. + qr, err := repo.Datastore().Query(ctx, query.Query{ + Prefix: blockstore.BlockPrefix.String(), + KeysOnly: true}) if err != nil { logger.Errorf("fetching AllKeysChain in provider ThroughputReport: %v", err) return false } + defer qr.Close() count = 0 countLoop: for { select { - case _, ok := <-ch: + case _, ok := <-qr.Next(): if !ok { break countLoop } @@ -120,34 +129,10 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtcli }, sampledBatchSize)) } - var strategyChanged bool - ctx := context.Background() - ds := repo.Datastore() - strategyKey := datastore.NewKey(reprovideStrategyKey) - - prev, err := ds.Get(ctx, strategyKey) - if err != nil && !errors.Is(err, datastore.ErrNotFound) { - logger.Error("cannot read previous reprovide strategy", "err", err) - } else if string(prev) != reprovideStrategy { - strategyChanged = true - } - - sys, err := provider.New(ds, opts...) + sys, err := provider.New(repo.Datastore(), opts...) 
if err != nil { return nil, err } - if strategyChanged { - logger.Infow("Reprovider.Strategy changed, clearing provide queue", "previous", string(prev), "current", reprovideStrategy) - sys.Clear() - if reprovideStrategy == "" { - err = ds.Delete(ctx, strategyKey) - } else { - err = ds.Put(ctx, strategyKey, []byte(reprovideStrategy)) - } - if err != nil { - logger.Error("cannot update reprovide strategy", "err", err) - } - } lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { @@ -162,22 +147,19 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtcli // ONLINE/OFFLINE // OnlineProviders groups units managing provider routing records online -func OnlineProviders(provide bool, reprovideStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option { +func OnlineProviders(provide bool, providerStrategy string, reprovideInterval time.Duration, acceleratedDHTClient bool, provideWorkerCount int) fx.Option { if !provide { return OfflineProviders() } - var keyProvider fx.Option - switch reprovideStrategy { - case "all", "", "roots", "pinned", "mfs", "pinned+mfs", "flat": - keyProvider = fx.Provide(newProvidingStrategy(reprovideStrategy)) - default: - return fx.Error(fmt.Errorf("unknown reprovider strategy %q", reprovideStrategy)) + strategyFlag := config.ParseReproviderStrategy(providerStrategy) + if strategyFlag == 0 { + return fx.Error(fmt.Errorf("unknown reprovider strategy %q", providerStrategy)) } return fx.Options( - keyProvider, - ProviderSys(reprovideInterval, acceleratedDHTClient, provideWorkerCount, reprovideStrategy), + fx.Provide(setReproviderKeyProvider(providerStrategy)), + ProviderSys(reprovideInterval, acceleratedDHTClient, provideWorkerCount), ) } @@ -215,38 +197,138 @@ func mfsRootProvider(mfsRoot *mfs.Root) provider.KeyChanFunc { } } -func newProvidingStrategy(strategy string) interface{} { - type input struct { - fx.In - Pinner pin.Pinner - Blockstore blockstore.Blockstore - OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"` - OfflineUnixFSFetcher fetcher.Factory `name:"offlineUnixfsFetcher"` - MFSRoot *mfs.Root +type provStrategyIn struct { + fx.In + Pinner pin.Pinner + Blockstore blockstore.Blockstore + OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"` + OfflineUnixFSFetcher fetcher.Factory `name:"offlineUnixfsFetcher"` + MFSRoot *mfs.Root + Provider provider.System + Repo repo.Repo +} + +type provStrategyOut struct { + fx.Out + ProvidingStrategy config.ReproviderStrategy + ProvidingKeyChanFunc provider.KeyChanFunc +} + +// createKeyProvider creates the appropriate KeyChanFunc based on strategy. 
+// Each strategy has different behavior: +// - "roots": Only root CIDs of pinned content +// - "pinned": All pinned content (roots + children) +// - "mfs": Only MFS content +// - "flat": All blocks, no prioritization +// - "all": Prioritized: pins first, then MFS roots, then all blocks +func createKeyProvider(strategyFlag config.ReproviderStrategy, in provStrategyIn) provider.KeyChanFunc { + switch strategyFlag { + case config.ReproviderStrategyRoots: + return provider.NewBufferedProvider(dspinner.NewPinnedProvider(true, in.Pinner, in.OfflineIPLDFetcher)) + case config.ReproviderStrategyPinned: + return provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher)) + case config.ReproviderStrategyPinned | config.ReproviderStrategyMFS: + return provider.NewPrioritizedProvider( + provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher)), + mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher), + ) + case config.ReproviderStrategyMFS: + return mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher) + case config.ReproviderStrategyFlat: + return in.Blockstore.AllKeysChan + default: // "all", "" + return createAllStrategyProvider(in) } - return func(in input) provider.KeyChanFunc { - switch strategy { - case "roots": - return provider.NewBufferedProvider(provider.NewPinnedProvider(true, in.Pinner, in.OfflineIPLDFetcher)) - case "pinned": - return provider.NewBufferedProvider(provider.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher)) - case "pinned+mfs": - return provider.NewPrioritizedProvider( - provider.NewBufferedProvider(provider.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher)), - mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher), - ) - case "mfs": - return mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher) - case "flat": - return provider.NewBlockstoreProvider(in.Blockstore) - default: // "all", "" - return provider.NewPrioritizedProvider( - provider.NewPrioritizedProvider( - provider.NewBufferedProvider(provider.NewPinnedProvider(true, in.Pinner, in.OfflineIPLDFetcher)), - mfsRootProvider(in.MFSRoot), - ), - provider.NewBlockstoreProvider(in.Blockstore), - ) +} + +// createAllStrategyProvider creates the complex "all" strategy provider. +// This implements a three-tier priority system: +// 1. Root blocks of direct and recursive pins (highest priority) +// 2. MFS root (medium priority) +// 3. All other blocks in blockstore (lowest priority) +func createAllStrategyProvider(in provStrategyIn) provider.KeyChanFunc { + return provider.NewPrioritizedProvider( + provider.NewPrioritizedProvider( + provider.NewBufferedProvider(dspinner.NewPinnedProvider(true, in.Pinner, in.OfflineIPLDFetcher)), + mfsRootProvider(in.MFSRoot), + ), + in.Blockstore.AllKeysChan, + ) +} + +// detectStrategyChange checks if the reproviding strategy has changed from what's persisted. +// Returns: (previousStrategy, hasChanged, error) +func detectStrategyChange(ctx context.Context, strategy string, ds datastore.Datastore) (string, bool, error) { + strategyKey := datastore.NewKey(reprovideStrategyKey) + + prev, err := ds.Get(ctx, strategyKey) + if err != nil { + if errors.Is(err, datastore.ErrNotFound) { + return "", strategy != "", nil + } + return "", false, err + } + + previousStrategy := string(prev) + return previousStrategy, previousStrategy != strategy, nil +} + +// persistStrategy saves the current reproviding strategy to the datastore. +// Empty string strategies are deleted rather than stored. 
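+// The persisted value is what detectStrategyChange compares against on the
+// next daemon start.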
+func persistStrategy(ctx context.Context, strategy string, ds datastore.Datastore) error { + strategyKey := datastore.NewKey(reprovideStrategyKey) + + if strategy == "" { + return ds.Delete(ctx, strategyKey) + } + return ds.Put(ctx, strategyKey, []byte(strategy)) +} + +// handleStrategyChange manages strategy change detection and queue clearing. +// Strategy change detection: when the reproviding strategy changes, +// we clear the provide queue to avoid unexpected behavior from mixing +// strategies. This ensures a clean transition between different providing modes. +func handleStrategyChange(strategy string, provider provider.System, ds datastore.Datastore) { + ctx := context.Background() + + previous, changed, err := detectStrategyChange(ctx, strategy, ds) + if err != nil { + logger.Error("cannot read previous reprovide strategy", "err", err) + return + } + + if !changed { + return + } + + logger.Infow("Reprovider.Strategy changed, clearing provide queue", "previous", previous, "current", strategy) + provider.Clear() + + if err := persistStrategy(ctx, strategy, ds); err != nil { + logger.Error("cannot update reprovide strategy", "err", err) + } +} + +func setReproviderKeyProvider(strategy string) func(in provStrategyIn) provStrategyOut { + strategyFlag := config.ParseReproviderStrategy(strategy) + + return func(in provStrategyIn) provStrategyOut { + // Create the appropriate key provider based on strategy + kcf := createKeyProvider(strategyFlag, in) + + // SetKeyProvider breaks the circular dependency between provider, blockstore, and pinner. + // We cannot create the blockstore without the provider (it needs to provide blocks), + // and we cannot determine the reproviding strategy without the pinner/blockstore. + // This deferred initialization allows us to create provider.System first, + // then set the actual key provider function after all dependencies are ready. 
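+ // ProviderSys intentionally constructs the provider.System without a
+ // provider.KeyProvider option, so until this call the system has no key
+ // source for reprovides.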
+ in.Provider.SetKeyProvider(kcf) + + // Handle strategy changes (detection, queue clearing, persistence) + handleStrategyChange(strategy, in.Provider, in.Repo.Datastore()) + + return provStrategyOut{ + ProvidingStrategy: strategyFlag, + ProvidingKeyChanFunc: kcf, } } } diff --git a/core/node/storage.go b/core/node/storage.go index fd8dfb82e..5115d9403 100644 --- a/core/node/storage.go +++ b/core/node/storage.go @@ -2,6 +2,7 @@ package node import ( blockstore "github.com/ipfs/boxo/blockstore" + provider "github.com/ipfs/boxo/provider" "github.com/ipfs/go-datastore" config "github.com/ipfs/kubo/config" "go.uber.org/fx" @@ -27,11 +28,31 @@ func Datastore(repo repo.Repo) datastore.Datastore { type BaseBlocks blockstore.Blockstore // BaseBlockstoreCtor creates cached blockstore backed by the provided datastore -func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool, writeThrough bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) { - return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) { +func BaseBlockstoreCtor( + cacheOpts blockstore.CacheOpts, + hashOnRead bool, + writeThrough bool, + providingStrategy string, + +) func(mctx helpers.MetricsCtx, repo repo.Repo, prov provider.System, lc fx.Lifecycle) (bs BaseBlocks, err error) { + return func(mctx helpers.MetricsCtx, repo repo.Repo, prov provider.System, lc fx.Lifecycle) (bs BaseBlocks, err error) { + opts := []blockstore.Option{blockstore.WriteThrough(writeThrough)} + + // Blockstore providing integration: + // When strategy includes "all" or "flat", the blockstore directly provides blocks as they're Put. + // Important: Provide calls from blockstore are intentionally BLOCKING. + // The Provider implementation (not the blockstore) should handle concurrency/queuing. + // This avoids spawning unbounded goroutines for concurrent block additions. + strategyFlag := config.ParseReproviderStrategy(providingStrategy) + shouldProvide := config.ReproviderStrategyAll | config.ReproviderStrategyFlat + if strategyFlag&shouldProvide != 0 { + opts = append(opts, blockstore.Provider(prov)) + } + // hash security - bs = blockstore.NewBlockstore(repo.Datastore(), - blockstore.WriteThrough(writeThrough), + bs = blockstore.NewBlockstore( + repo.Datastore(), + opts..., ) bs = &verifbs.VerifBS{Blockstore: bs} bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts) diff --git a/docs/changelogs/v0.37.md b/docs/changelogs/v0.37.md index c35f21556..f64834881 100644 --- a/docs/changelogs/v0.37.md +++ b/docs/changelogs/v0.37.md @@ -2,7 +2,7 @@ -This release was brought to you by the [Interplanetary Shipyard](https://ipshipyard.com/) team. +This release was brought to you by the [Shipyard](https://ipshipyard.com/) team. 
- [v0.37.0](#v0370) @@ -12,6 +12,7 @@ This release was brought to you by the [Interplanetary Shipyard](https://ipship - [🔦 Highlights](#-highlights) - [Clear provide queue when reprovide strategy changes](#clear-provide-queue-when-reprovide-strategy-changes) - [Named pins in `ipfs add` command](#-named-pins-in-ipfs-add-command) + - [⚙️ `Reprovider.Strategy` is now consistently respected](#-reprovider-strategy-is-now-consistently-respected) - [Removed unnecessary dependencies](#removed-unnecessary-dependencies) - [Deprecated `ipfs stats reprovide`](#deprecated-ipfs-stats-reprovide) - [📦️ Important dependency updates](#-important-dependency-updates) @@ -45,6 +46,20 @@ $ ipfs pin ls --names bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi recursive testname ``` +#### ⚙️ `Reprovider.Strategy` is now consistently respected + +Prior to this version, files added, blocks received etc. were "provided" to the network (announced on the DHT) regardless of the ["reproviding strategy" setting](https://github.com/ipfs/kubo/blob/master/docs/config.md#reproviderstrategy). For example: + +- Strategy set to "pinned" + `ipfs add --pin=false` → file was provided regardless +- Strategy set to "roots" + `ipfs pin add` → all blocks (not only the root) were provided + +Only the periodic "reproviding" action (runs every 22h by default) respected the strategy. + +This was inefficient as content that should not be provided was getting provided once. Now all operations respect `Reprovider.Strategy`. If set to "roots", no blocks other than pin roots will be provided regardless of what is fetched, added etc. + +> [!NOTE] +> **Behavior change:** The `--offline` flag no longer affects providing behavior. Both `ipfs add` and `ipfs --offline add` now provide blocks according to the reproviding strategy when run against an online daemon (previously `--offline add` did not provide). Since `ipfs add` has been nearly as fast as offline mode [since v0.35](https://github.com/ipfs/kubo/blob/master/docs/changelogs/v0.35.md#fast-ipfs-add-in-online-mode), `--offline` is rarely needed. To run truly offline operations, use `ipfs --offline daemon`. + #### Removed unnecessary dependencies Kubo has been cleaned up by removing unnecessary dependencies and packages: diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index 55ef762a4..5fbd71ded 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -7,7 +7,7 @@ go 1.24 replace github.com/ipfs/kubo => ./../../.. 
require ( - github.com/ipfs/boxo v0.33.1 + github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08 github.com/ipfs/kubo v0.0.0-00010101000000-000000000000 github.com/libp2p/go-libp2p v0.42.1 github.com/multiformats/go-multiaddr v0.16.0 diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index 47158688c..d970aa54d 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -291,8 +291,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.25.0 h1:OqNqsGZPX8zh3eFMO8Lf8EHRRnSGBMqcd github.com/ipfs-shipyard/nopfs/ipfs v0.25.0/go.mod h1:BxhUdtBgOXg1B+gAPEplkg/GpyTZY+kCMSfsJvvydqU= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.33.1 h1:89m+ksw+cYi0ecTNTJ71IRS5ZrLiovmO6XWHIOGhAEg= -github.com/ipfs/boxo v0.33.1/go.mod h1:KwlJTzv5fb1GLlA9KyMqHQmvP+4mrFuiE3PnjdrPJHs= +github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08 h1:PtntQQtYOh7YTCRnrU1idTuOwxEi0ZmYM4u7ZfSAExY= +github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08/go.mod h1:KwlJTzv5fb1GLlA9KyMqHQmvP+4mrFuiE3PnjdrPJHs= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index de475dd89..9c36c9a26 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -108,7 +108,10 @@ func loadRoot(ctx context.Context, ipfs iface.CoreAPI, key iface.Key) (*mfs.Root return nil, nil, dag.ErrNotProtobuf } - root, err := mfs.NewRoot(ctx, ipfs.Dag(), pbnode, ipnsPubFunc(ipfs, key)) + // We have no access to provider.System from the CoreAPI. The Routing + // part offers Provide through the router so it may be slow/risky + // to give that here to MFS. Therefore we leave as nil. 
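+ // With a nil provider, MFS itself does not announce blocks written through
+ // this FUSE mount; depending on the configured strategy they may still be
+ // provided at the blockstore level ("all"/"flat") or by the periodic reprovide.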
+ root, err := mfs.NewRoot(ctx, ipfs.Dag(), pbnode, ipnsPubFunc(ipfs, key), nil) if err != nil { return nil, nil, err } diff --git a/go.mod b/go.mod index c7a89e9ef..b1969ea8d 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/hashicorp/go-version v1.7.0 github.com/ipfs-shipyard/nopfs v0.0.14 github.com/ipfs-shipyard/nopfs/ipfs v0.25.0 - github.com/ipfs/boxo v0.33.1 + github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08 github.com/ipfs/go-block-format v0.2.2 github.com/ipfs/go-cid v0.5.0 github.com/ipfs/go-cidutil v0.1.0 diff --git a/go.sum b/go.sum index c8ab14151..a2052ae5b 100644 --- a/go.sum +++ b/go.sum @@ -358,8 +358,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.25.0 h1:OqNqsGZPX8zh3eFMO8Lf8EHRRnSGBMqcd github.com/ipfs-shipyard/nopfs/ipfs v0.25.0/go.mod h1:BxhUdtBgOXg1B+gAPEplkg/GpyTZY+kCMSfsJvvydqU= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.33.1 h1:89m+ksw+cYi0ecTNTJ71IRS5ZrLiovmO6XWHIOGhAEg= -github.com/ipfs/boxo v0.33.1/go.mod h1:KwlJTzv5fb1GLlA9KyMqHQmvP+4mrFuiE3PnjdrPJHs= +github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08 h1:PtntQQtYOh7YTCRnrU1idTuOwxEi0ZmYM4u7ZfSAExY= +github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08/go.mod h1:KwlJTzv5fb1GLlA9KyMqHQmvP+4mrFuiE3PnjdrPJHs= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= diff --git a/test/cli/harness/ipfs.go b/test/cli/harness/ipfs.go index 0842d3627..2f7a8f18e 100644 --- a/test/cli/harness/ipfs.go +++ b/test/cli/harness/ipfs.go @@ -101,6 +101,34 @@ func (n *Node) IPFSAdd(content io.Reader, args ...string) string { return out } +func (n *Node) IPFSBlockPut(content io.Reader, args ...string) string { + log.Debugf("node %d block put with args: %v", n.ID, args) + fullArgs := []string{"block", "put"} + fullArgs = append(fullArgs, args...) + res := n.Runner.MustRun(RunRequest{ + Path: n.IPFSBin, + Args: fullArgs, + CmdOpts: []CmdOpt{RunWithStdin(content)}, + }) + out := strings.TrimSpace(res.Stdout.String()) + log.Debugf("block put result: %q", out) + return out +} + +func (n *Node) IPFSDAGPut(content io.Reader, args ...string) string { + log.Debugf("node %d dag put with args: %v", n.ID, args) + fullArgs := []string{"dag", "put"} + fullArgs = append(fullArgs, args...) 
+ res := n.Runner.MustRun(RunRequest{ + Path: n.IPFSBin, + Args: fullArgs, + CmdOpts: []CmdOpt{RunWithStdin(content)}, + }) + out := strings.TrimSpace(res.Stdout.String()) + log.Debugf("dag put result: %q", out) + return out +} + func (n *Node) IPFSDagImport(content io.Reader, cid string, args ...string) error { log.Debugf("node %d dag import with args: %v", n.ID, args) fullArgs := []string{"dag", "import", "--pin-roots=false"} diff --git a/test/cli/provider_test.go b/test/cli/provider_test.go index 4fd74ef8b..a3cb4c086 100644 --- a/test/cli/provider_test.go +++ b/test/cli/provider_test.go @@ -21,6 +21,12 @@ func TestProvider(t *testing.T) { return nodes.StartDaemons().Connect() } + initNodesWithoutStart := func(t *testing.T, n int, fn func(n *harness.Node)) harness.Nodes { + nodes := harness.NewT(t).NewNodes(n).Init() + nodes.ForEachPar(fn) + return nodes + } + expectNoProviders := func(t *testing.T, cid string, nodes ...*harness.Node) { for _, node := range nodes { res := node.IPFS("routing", "findprovs", "-n=1", cid) @@ -44,9 +50,47 @@ func TestProvider(t *testing.T) { defer nodes.StopDaemons() cid := nodes[0].IPFSAddStr(time.Now().String()) - // Reprovide as initialProviderDelay still ongoing - res := nodes[0].IPFS("routing", "reprovide") - require.NoError(t, res.Err) + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + + t.Run("Provider.Enabled=true announces new CIDs created by ipfs add --pin=false with default strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Provider.Enabled", true) + // Default strategy is "all" which should provide even unpinned content + }) + defer nodes.StopDaemons() + + cid := nodes[0].IPFSAddStr(time.Now().String(), "--pin=false") + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + + t.Run("Provider.Enabled=true announces new CIDs created by ipfs block put --pin=false with default strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Provider.Enabled", true) + // Default strategy is "all" which should provide unpinned content from block put + }) + defer nodes.StopDaemons() + + data := testutils.RandomBytes(256) + cid := nodes[0].IPFSBlockPut(bytes.NewReader(data), "--pin=false") + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + + t.Run("Provider.Enabled=true announces new CIDs created by ipfs dag put --pin=false with default strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Provider.Enabled", true) + // Default strategy is "all" which should provide unpinned content from dag put + }) + defer nodes.StopDaemons() + + dagData := `{"hello": "world", "timestamp": "` + time.Now().String() + `"}` + cid := nodes[0].IPFSDAGPut(bytes.NewReader([]byte(dagData)), "--pin=false") expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) }) @@ -100,7 +144,7 @@ func TestProvider(t *testing.T) { }) defer nodes.StopDaemons() - cid := nodes[0].IPFSAddStr(time.Now().String(), "--offline") + cid := nodes[0].IPFSAddStr(time.Now().String()) expectNoProviders(t, cid, nodes[1:]...) @@ -120,7 +164,7 @@ func TestProvider(t *testing.T) { }) defer nodes.StopDaemons() - cid := nodes[0].IPFSAddStr(time.Now().String(), "--offline") + cid := nodes[0].IPFSAddStr(time.Now().String()) expectNoProviders(t, cid, nodes[1:]...) @@ -131,7 +175,7 @@ func TestProvider(t *testing.T) { expectNoProviders(t, cid, nodes[1:]...) 
}) - t.Run("Reprovides with 'all' strategy", func(t *testing.T) { + t.Run("Provide with 'all' strategy", func(t *testing.T) { t.Parallel() nodes := initNodes(t, 2, func(n *harness.Node) { @@ -139,8 +183,124 @@ func TestProvider(t *testing.T) { }) defer nodes.StopDaemons() - cid := nodes[0].IPFSAddStr(time.Now().String(), "--local") + cid := nodes[0].IPFSAddStr("all strategy") + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + t.Run("Provide with 'flat' strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "flat") + }) + defer nodes.StopDaemons() + + cid := nodes[0].IPFSAddStr("flat strategy") + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + + t.Run("Provide with 'pinned' strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "pinned") + }) + defer nodes.StopDaemons() + + // Add a non-pinned CID (should not be provided) + cid := nodes[0].IPFSAddStr("pinned strategy", "--pin=false") + expectNoProviders(t, cid, nodes[1:]...) + + // Pin the CID (should now be provided) + nodes[0].IPFS("pin", "add", cid) + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + + t.Run("Provide with 'pinned+mfs' strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "pinned+mfs") + }) + defer nodes.StopDaemons() + + // Add a pinned CID (should be provided) + cidPinned := nodes[0].IPFSAddStr("pinned content") + cidUnpinned := nodes[0].IPFSAddStr("unpinned content", "--pin=false") + cidMFS := nodes[0].IPFSAddStr("mfs content", "--pin=false") + nodes[0].IPFS("files", "cp", "/ipfs/"+cidMFS, "/myfile") + + n0pid := nodes[0].PeerID().String() + expectProviders(t, cidPinned, n0pid, nodes[1:]...) + expectNoProviders(t, cidUnpinned, nodes[1:]...) + expectProviders(t, cidMFS, n0pid, nodes[1:]...) + }) + + t.Run("Provide with 'roots' strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "roots") + }) + defer nodes.StopDaemons() + + // Add a root CID (should be provided) + cidRoot := nodes[0].IPFSAddStr("roots strategy", "-w", "-Q") + // the same without wrapping should give us a child node. + cidChild := nodes[0].IPFSAddStr("root strategy", "--pin=false") + + expectProviders(t, cidRoot, nodes[0].PeerID().String(), nodes[1:]...) + expectNoProviders(t, cidChild, nodes[1:]...) + }) + + t.Run("Provide with 'mfs' strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodes(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "mfs") + }) + defer nodes.StopDaemons() + + // Add a file to MFS (should be provided) + data := testutils.RandomBytes(1000) + cid := nodes[0].IPFSAdd(bytes.NewReader(data), "-Q") + + // not yet in MFS + expectNoProviders(t, cid, nodes[1:]...) + + nodes[0].IPFS("files", "cp", "/ipfs/"+cid, "/myfile") + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + + t.Run("Reprovides with 'all' strategy when strategy is '' (empty)", func(t *testing.T) { + t.Parallel() + + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "") + }) + + cid := nodes[0].IPFSAddStr(time.Now().String()) + + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + expectNoProviders(t, cid, nodes[1:]...) 
+ + nodes[0].IPFS("routing", "reprovide") + + expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...) + }) + + t.Run("Reprovides with 'all' strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "all") + }) + + cid := nodes[0].IPFSAddStr(time.Now().String()) + + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() expectNoProviders(t, cid, nodes[1:]...) nodes[0].IPFS("routing", "reprovide") @@ -151,13 +311,14 @@ func TestProvider(t *testing.T) { t.Run("Reprovides with 'flat' strategy", func(t *testing.T) { t.Parallel() - nodes := initNodes(t, 2, func(n *harness.Node) { + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { n.SetIPFSConfig("Reprovider.Strategy", "flat") }) + + cid := nodes[0].IPFSAddStr(time.Now().String()) + + nodes = nodes.StartDaemons().Connect() defer nodes.StopDaemons() - - cid := nodes[0].IPFSAddStr(time.Now().String(), "--local") - expectNoProviders(t, cid, nodes[1:]...) nodes[0].IPFS("routing", "reprovide") @@ -171,22 +332,31 @@ func TestProvider(t *testing.T) { foo := testutils.RandomBytes(1000) bar := testutils.RandomBytes(1000) - nodes := initNodes(t, 2, func(n *harness.Node) { + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { n.SetIPFSConfig("Reprovider.Strategy", "pinned") }) + + // Add a pin while offline so it cannot be provided + cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w") + + nodes = nodes.StartDaemons().Connect() defer nodes.StopDaemons() - cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--offline", "--pin=false") - cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--offline", "--pin=false") - cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "--offline", "-w") + // Add content without pinning while daemon line + cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--pin=false") + cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false") + // Nothing should have been provided. The pin was offline, and + // the others should not be provided per the strategy. expectNoProviders(t, cidFoo, nodes[1:]...) expectNoProviders(t, cidBar, nodes[1:]...) expectNoProviders(t, cidBarDir, nodes[1:]...) nodes[0].IPFS("routing", "reprovide") + // cidFoo is not pinned so should not be provided. expectNoProviders(t, cidFoo, nodes[1:]...) + // cidBar gets provided by being a child from cidBarDir even though we added with pin=false. expectProviders(t, cidBar, nodes[0].PeerID().String(), nodes[1:]...) expectProviders(t, cidBarDir, nodes[0].PeerID().String(), nodes[1:]...) }) @@ -196,28 +366,87 @@ func TestProvider(t *testing.T) { foo := testutils.RandomBytes(1000) bar := testutils.RandomBytes(1000) - baz := testutils.RandomBytes(1000) - nodes := initNodes(t, 2, func(n *harness.Node) { + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { n.SetIPFSConfig("Reprovider.Strategy", "roots") }) + n0pid := nodes[0].PeerID().String() + + // Add a pin. 
Only root should get pinned but not provided + // because node not started + cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "-w") + + nodes = nodes.StartDaemons().Connect() defer nodes.StopDaemons() - cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--offline", "--pin=false") - cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--offline", "--pin=false") - cidBaz := nodes[0].IPFSAdd(bytes.NewReader(baz), "--offline") - cidBarDir := nodes[0].IPFSAdd(bytes.NewReader(bar), "-Q", "--offline", "-w") + cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo)) + cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false") - expectNoProviders(t, cidFoo, nodes[1:]...) + // cidFoo will get provided per the strategy but cidBar will not. + expectProviders(t, cidFoo, n0pid, nodes[1:]...) expectNoProviders(t, cidBar, nodes[1:]...) - expectNoProviders(t, cidBarDir, nodes[1:]...) nodes[0].IPFS("routing", "reprovide") - expectNoProviders(t, cidFoo, nodes[1:]...) + expectProviders(t, cidFoo, n0pid, nodes[1:]...) expectNoProviders(t, cidBar, nodes[1:]...) - expectProviders(t, cidBaz, nodes[0].PeerID().String(), nodes[1:]...) - expectProviders(t, cidBarDir, nodes[0].PeerID().String(), nodes[1:]...) + expectProviders(t, cidBarDir, n0pid, nodes[1:]...) + }) + + t.Run("Reprovides with 'mfs' strategy", func(t *testing.T) { + t.Parallel() + + bar := testutils.RandomBytes(1000) + + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "mfs") + }) + n0pid := nodes[0].PeerID().String() + + // add something and lets put it in MFS + cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false", "-Q") + nodes[0].IPFS("files", "cp", "/ipfs/"+cidBar, "/myfile") + + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + + // cidBar is in MFS but not provided + expectNoProviders(t, cidBar, nodes[1:]...) + + nodes[0].IPFS("routing", "reprovide") + + // And now is provided + expectProviders(t, cidBar, n0pid, nodes[1:]...) + }) + + t.Run("Reprovides with 'pinned+mfs' strategy", func(t *testing.T) { + t.Parallel() + + nodes := initNodesWithoutStart(t, 2, func(n *harness.Node) { + n.SetIPFSConfig("Reprovider.Strategy", "pinned+mfs") + }) + n0pid := nodes[0].PeerID().String() + + // Add a pinned CID (should be provided) + cidPinned := nodes[0].IPFSAddStr("pinned content", "--pin=true") + // Add a CID to MFS (should be provided) + cidMFS := nodes[0].IPFSAddStr("mfs content") + nodes[0].IPFS("files", "cp", "/ipfs/"+cidMFS, "/myfile") + // Add a CID that is neither pinned nor in MFS (should not be provided) + cidNeither := nodes[0].IPFSAddStr("neither content", "--pin=false") + + nodes = nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + + // Trigger reprovide + nodes[0].IPFS("routing", "reprovide") + + // Check that pinned CID is provided + expectProviders(t, cidPinned, n0pid, nodes[1:]...) + // Check that MFS CID is provided + expectProviders(t, cidMFS, n0pid, nodes[1:]...) + // Check that neither CID is not provided + expectNoProviders(t, cidNeither, nodes[1:]...) 
}) t.Run("provide clear command removes items from provide queue", func(t *testing.T) { diff --git a/test/dependencies/go.mod b/test/dependencies/go.mod index 6fdd5cc38..0ae5db95f 100644 --- a/test/dependencies/go.mod +++ b/test/dependencies/go.mod @@ -31,6 +31,7 @@ require ( github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e // indirect github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24 // indirect github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0 // indirect + github.com/Jorropo/jsync v1.0.1 // indirect github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/OpenPeeDeeP/depguard/v2 v2.2.0 // indirect github.com/alecthomas/go-check-sumtype v0.1.4 // indirect @@ -130,7 +131,7 @@ require ( github.com/huin/goupnp v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/bbloom v0.0.4 // indirect - github.com/ipfs/boxo v0.33.1 // indirect + github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-block-format v0.2.2 // indirect github.com/ipfs/go-cid v0.5.0 // indirect diff --git a/test/dependencies/go.sum b/test/dependencies/go.sum index 7fe8af796..18cf795e1 100644 --- a/test/dependencies/go.sum +++ b/test/dependencies/go.sum @@ -33,6 +33,8 @@ github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24 h1:sHglBQTwgx+rW github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24/go.mod h1:4UJr5HIiMZrwgkSPdsjy2uOQExX/WEILpIrO9UPGuXs= github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0 h1:/fTUt5vmbkAcMBt4YQiuC23cV0kEsN1MVMNqeOW43cU= github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0/go.mod h1:ONJg5sxcbsdQQ4pOW8TGdTidT2TMAUy/2Xhr8mrYaao= +github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU= +github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ= github.com/Kubuxu/gocovmerge v0.0.0-20161216165753-7ecaa51963cd h1:HNhzThEtZW714v8Eda8sWWRcu9WSzJC+oCyjRjvZgRA= github.com/Kubuxu/gocovmerge v0.0.0-20161216165753-7ecaa51963cd/go.mod h1:bqoB8kInrTeEtYAwaIXoSRqdwnjQmFhsfusnzyui6yY= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= @@ -319,8 +321,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.33.1 h1:89m+ksw+cYi0ecTNTJ71IRS5ZrLiovmO6XWHIOGhAEg= -github.com/ipfs/boxo v0.33.1/go.mod h1:KwlJTzv5fb1GLlA9KyMqHQmvP+4mrFuiE3PnjdrPJHs= +github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08 h1:PtntQQtYOh7YTCRnrU1idTuOwxEi0ZmYM4u7ZfSAExY= +github.com/ipfs/boxo v0.33.2-0.20250804224807-e5da058ebb08/go.mod h1:KwlJTzv5fb1GLlA9KyMqHQmvP+4mrFuiE3PnjdrPJHs= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.2.2 h1:uecCTgRwDIXyZPgYspaLXoMiMmxQpSx2aq34eNc4YvQ=