kubo/core/node/storage.go
Hector Sanjuan a673c2ec95
fix: Provide according to Reprovider.Strategy (#10886)
* Provide according to strategy

Updates boxo to a version with the changes from https://github.com/ipfs/boxo/pull/976, which decentralize the providing responsibilities (from a central providing.Exchange to blockstore, pinner, mfs).

The changes consist of initializing the Pinner, MFS and the blockstore with the provider.System, which is created first.

Since the provider.System is created first, the reproviding KeyChanFunc is set
later, once the Pinner, MFS and the blockstore exist and it can be created.

Some additional work applies to the Add() workflow. Normally, blocks would get provided at the Blockstore or the Pinner, but when adding blocks AND a "pinned" strategy is used, the blockstore does not provide, and the
pinner does not traverse the DAG (and thus doesn't provide either), so we need to provide directly from the Adder. This is resolved by wrapping the DAGService in a "providingDAGService" which provides every added block, when using the "pinned" strategy.

Running `ipfs --offline add` while the ONLINE daemon is running will now announce blocks per the chosen strategy, whereas before it did not announce them. This is documented in the changelog. A couple of releases ago, adding with `ipfs --offline add` was faster, but this is no longer the case, so we are not incurring any penalty by honoring the fact that the daemon is online and has a providing strategy that we follow.

Co-authored-by: gammazero <11790789+gammazero@users.noreply.github.com>
Co-authored-by: Marcin Rataj <lidel@lidel.org>
2025-08-08 10:56:44 +02:00

94 lines
3.1 KiB
Go

package node
import (
blockstore "github.com/ipfs/boxo/blockstore"
provider "github.com/ipfs/boxo/provider"
"github.com/ipfs/go-datastore"
config "github.com/ipfs/kubo/config"
"go.uber.org/fx"
"github.com/ipfs/boxo/filestore"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/thirdparty/verifbs"
)
// RepoConfig returns the configuration held by the given repo, passing
// through whatever value and error repo.Config reports.
func RepoConfig(repo repo.Repo) (*config.Config, error) {
	return repo.Config()
}
// Datastore exposes the repo's datastore as a datastore.Datastore.
func Datastore(repo repo.Repo) datastore.Datastore {
	ds := repo.Datastore()
	return ds
}
// BaseBlocks is the lower level blockstore without GC or Filestore layers.
//
// It is a separate named type over blockstore.Blockstore: BaseBlockstoreCtor
// produces it, and GcBlockstoreCtor / FilestoreBlockstoreCtor take it as
// input and add their layers on top.
// NOTE(review): presumably the distinct type exists so dependency injection
// can tell the base store apart from the final blockstore.Blockstore — confirm.
type BaseBlocks blockstore.Blockstore
// BaseBlockstoreCtor returns a constructor that builds the base blockstore on
// top of the repo's datastore.
//
// Layers are stacked in this order: the plain blockstore (optionally wired to
// the provider), the verifbs.VerifBS wrapper, the cache configured by
// cacheOpts, and finally the identity store.
//
// Parameters:
//   - cacheOpts: handed to blockstore.CachedBlockstore.
//   - hashOnRead: when true, HashOnRead(true) is called on the finished store.
//   - writeThrough: forwarded to the blockstore.WriteThrough option.
//   - providingStrategy: parsed with config.ParseReproviderStrategy to decide
//     whether the blockstore is given the provider.
//
// The returned constructor yields a nil blockstore and the error when
// blockstore.CachedBlockstore fails; no other step reports an error.
func BaseBlockstoreCtor(
	cacheOpts blockstore.CacheOpts,
	hashOnRead bool,
	writeThrough bool,
	providingStrategy string,
) func(mctx helpers.MetricsCtx, repo repo.Repo, prov provider.System, lc fx.Lifecycle) (bs BaseBlocks, err error) {
	return func(mctx helpers.MetricsCtx, repo repo.Repo, prov provider.System, lc fx.Lifecycle) (bs BaseBlocks, err error) {
		bsOpts := []blockstore.Option{blockstore.WriteThrough(writeThrough)}

		// Blockstore providing integration:
		// with a strategy that includes "all" or "flat", the blockstore itself
		// provides blocks as they are Put.
		// Important: those Provide calls are intentionally BLOCKING. Concurrency
		// and queuing are the Provider implementation's job, not the
		// blockstore's, which avoids spawning unbounded goroutines when many
		// blocks are added concurrently.
		parsed := config.ParseReproviderStrategy(providingStrategy)
		providingMask := config.ReproviderStrategyAll | config.ReproviderStrategyFlat
		if parsed&providingMask != 0 {
			bsOpts = append(bsOpts, blockstore.Provider(prov))
		}

		// hash security: the raw store is wrapped in verifbs.VerifBS before
		// any caching is layered on top.
		raw := blockstore.NewBlockstore(repo.Datastore(), bsOpts...)
		verified := &verifbs.VerifBS{Blockstore: raw}

		ctx := helpers.LifecycleCtx(mctx, lc)
		cached, err := blockstore.CachedBlockstore(ctx, verified, cacheOpts)
		if err != nil {
			return nil, err
		}

		bs = blockstore.NewIdStore(cached)

		// TODO: review: this is how it was done originally, is there a reason
		// we can't just pass hashOnRead directly?
		if hashOnRead {
			bs.HashOnRead(true)
		}
		return bs, nil
	}
}
// GcBlockstoreCtor wraps the base blockstore in a GC blockstore guarded by a
// newly created GC locker. No Filestore layer is added here; that variant is
// FilestoreBlockstoreCtor.
//
// It returns the locker, the GC blockstore, and the same GC blockstore again
// as a plain blockstore.Blockstore.
func GcBlockstoreCtor(bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore) {
	locker := blockstore.NewGCLocker()
	store := blockstore.NewGCBlockstore(bb, locker)
	return locker, store, store
}
// FilestoreBlockstoreCtor builds the Filestore-enabled blockstore stack on top
// of the base blockstore: a Filestore backed by bb and the repo's file
// manager, a GC blockstore over that Filestore, and a verifbs.VerifBSGC
// wrapper around the GC blockstore.
//
// It returns the GC locker, the wrapped GC blockstore, that same value as a
// plain blockstore.Blockstore, and the Filestore itself.
func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
	gclocker = blockstore.NewGCLocker()
	fstore = filestore.NewFilestore(bb, repo.FileManager())

	// hash security: the GC blockstore sitting on the Filestore is wrapped in
	// verifbs.VerifBSGC before being handed out.
	gcbs = &verifbs.VerifBSGC{GCBlockstore: blockstore.NewGCBlockstore(fstore, gclocker)}

	return gclocker, gcbs, gcbs, fstore
}