* reprovide sweep draft
* update reprovider dep
* go mod tidy
* fix provider type
* change router type
* dual reprovider
* revert to provider.System
* back to start
* SweepingReprovider test
* fix nil pointer deref
* noop provider for nil dht
* disabled initial network estimation
* another iteration
* suppress missing self addrs err
* silence empty rt err on lan dht
* comments
* new attempt at integrating
* reverting changes in core/node/libp2p/routing.go
* removing SweepingProvider
* make reprovider optional
* add noop reprovider
* update KeyChanFunc type alias
* restore boxo KeyChanFunc
* fix missing KeyChanFunc
* test(sharness): PARALLEL=1 and timeout 30m
running sequentially to see where timeout occurs
* initialize MHStore
* revert workflow debug
* config
* config docs
* merged IpfsNode provider and reprovider
* move Provider interface from kad-dht to node
* moved Provider interface from kad-dht to kubo/core/node
* mod_tidy
* Add Clear to Provider interface
* use latest kad-dht commit
* make linter happy
* updated boxo provide interface
* boxo PR fix
* using latest kad-dht commit
* use latest boxo release
* fix fx
* fx cyclic deps
* fix merge issues
* extended tests
* don't provide LAN DHT
* docs
* restore dual dht provider
* don't start provider before it is online
* address linter
* dual/provider fix
* add delay in provider tests for dht bootstrap
* add OfflineDelay parameter to config
* remove increased number of workers in test
* improved keystore gc process
* fix: replace incorrect logger import in coreapi
replaced github.com/labstack/gommon/log with the standard
github.com/ipfs/go-log/v2 logger used throughout kubo.
removed unused labstack dependency from go.mod files.
* fix: remove duplicate WithDefault call in provider config
* fix: use correct option method for burst workers
* fix: improve error messages for experimental sweeping provider
updated error messages to clearly indicate when commands are unavailable
due to the experimental sweeping provider being enabled via Reprovider.Sweep.Enabled=true
(see the config example after this changelog)
* docs: remove obsolete KeyStoreGCInterval config
removed from config.md as option no longer exists (removed in b540fba1a)
updated keystore description to reflect gc happens at reprovide interval
* docs: add TODO placeholder changelog for experimental sweeping DHT provider
using v0.38-TODO.md name to avoid merge conflicts with master branch
and allow CI tests to run. will be renamed to v0.38.md once config
migration is added to the PR
* fix: provideKeysRec goroutine
* clear keystore on close
* fix: datastore prefix
* fix: improve error handling in provideKeysRec
- close errCh channel to distinguish between nil and pending errors
- check for pending errors when provided.New closes
- handle context cancellation during error send
- prevent race condition where errors could be silently lost
this ensures DAG walk errors are always propagated correctly
* address gammazero's review
* rename BurstProvider to LegacyProvider
* use latest provider/keystore
* boxo: make mfs StartProviding async
* bump boxo
* chore: update boxo to f2b4e12fb9a8ac138ccb82aae3b51ec51d9f631c
- updated boxo dependency to specified commit
- updated go.mod and go.sum files across all modules
* use latest kad-dht/boxo
* Buffered SweepingProvider wrapper
* use latest kad-dht commit
* allow no DHT router
* use latest kad-dht & boxo
---------
Co-authored-by: Marcin Rataj <lidel@lidel.org>
Co-authored-by: gammazero <11790789+gammazero@users.noreply.github.com>
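
Note: the sweeping DHT provider described in the changelog above is experimental and opt-in. Based on the Reprovider.Sweep.Enabled key mentioned there, it can presumably be enabled with:

    ipfs config --json Reprovider.Sweep.Enabled true

(the key path is taken from the entries above; any additional Sweep options are not shown here). The package node source below is where the resulting DHTProvider is wired into the pinner and the MFS root.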
package node

import (
	"context"
	"errors"
	"fmt"

	"github.com/ipfs/boxo/blockservice"
	blockstore "github.com/ipfs/boxo/blockstore"
	exchange "github.com/ipfs/boxo/exchange"
	offline "github.com/ipfs/boxo/exchange/offline"
	"github.com/ipfs/boxo/fetcher"
	bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
	"github.com/ipfs/boxo/filestore"
	"github.com/ipfs/boxo/ipld/merkledag"
	"github.com/ipfs/boxo/ipld/unixfs"
	"github.com/ipfs/boxo/mfs"
	pathresolver "github.com/ipfs/boxo/path/resolver"
	pin "github.com/ipfs/boxo/pinning/pinner"
	"github.com/ipfs/boxo/pinning/pinner/dspinner"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	format "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/go-unixfsnode"
	dagpb "github.com/ipld/go-codec-dagpb"
	"go.uber.org/fx"

	"github.com/ipfs/kubo/config"
	"github.com/ipfs/kubo/core/node/helpers"
	"github.com/ipfs/kubo/repo"
)

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
	return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
		bsvc := blockservice.New(bs, rem,
			blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
		)

		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return bsvc.Close()
			},
		})

		return bsvc
	}
}

// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(strategy string) func(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo, prov DHTProvider) (pin.Pinner, error) {
	// Parse strategy at function creation time (not inside the returned function)
	// This happens before the provider is created, which is why we pass the strategy
	// string and parse it here, rather than using fx-provided ProvidingStrategy.
	strategyFlag := config.ParseReproviderStrategy(strategy)

	return func(bstore blockstore.Blockstore,
		ds format.DAGService,
		repo repo.Repo,
		prov DHTProvider,
	) (pin.Pinner, error) {
		rootDS := repo.Datastore()

		syncFn := func(ctx context.Context) error {
			if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil {
				return err
			}
			return rootDS.Sync(ctx, filestore.FilestorePrefix)
		}
		syncDs := &syncDagService{ds, syncFn}

		ctx := context.TODO()

		var opts []dspinner.Option
		roots := (strategyFlag & config.ReproviderStrategyRoots) != 0
		pinned := (strategyFlag & config.ReproviderStrategyPinned) != 0

		// Important: Only one of WithPinnedProvider or WithRootsProvider should be active.
		// Having both would cause duplicate root advertisements since "pinned" includes all
		// pinned content (roots + children), while "roots" is just the root CIDs.
		// We prioritize "pinned" if both are somehow set (though this shouldn't happen
		// with proper strategy parsing).
		if pinned {
			opts = append(opts, dspinner.WithPinnedProvider(prov))
		} else if roots {
			opts = append(opts, dspinner.WithRootsProvider(prov))
		}

		pinning, err := dspinner.New(ctx, rootDS, syncDs, opts...)
		if err != nil {
			return nil, err
		}

		return pinning, nil
	}
}

var (
	_ merkledag.SessionMaker = new(syncDagService)
	_ format.DAGService      = new(syncDagService)
)

// syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore
type syncDagService struct {
	format.DAGService
	syncFn func(context.Context) error
}

func (s *syncDagService) Sync(ctx context.Context) error {
	return s.syncFn(ctx)
}

func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
	return merkledag.NewSession(ctx, s.DAGService)
}

// FetchersOut allows injection of fetchers.
type FetchersOut struct {
	fx.Out
	IPLDFetcher          fetcher.Factory `name:"ipldFetcher"`
	UnixfsFetcher        fetcher.Factory `name:"unixfsFetcher"`
	OfflineIPLDFetcher   fetcher.Factory `name:"offlineIpldFetcher"`
	OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
}

// FetchersIn allows using fetchers for other dependencies.
type FetchersIn struct {
	fx.In
	IPLDFetcher          fetcher.Factory `name:"ipldFetcher"`
	UnixfsFetcher        fetcher.Factory `name:"unixfsFetcher"`
	OfflineIPLDFetcher   fetcher.Factory `name:"offlineIpldFetcher"`
	OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
}

// FetcherConfig returns a fetcher config that can build new fetcher instances
func FetcherConfig(bs blockservice.BlockService) FetchersOut {
	ipldFetcher := bsfetcher.NewFetcherConfig(bs)
	ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
	unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify)

	// Construct offline versions which we can safely use in contexts where
	// path resolution should not fetch new blocks via exchange.
	offlineBs := blockservice.New(bs.Blockstore(), offline.Exchange(bs.Blockstore()))
	offlineIpldFetcher := bsfetcher.NewFetcherConfig(offlineBs)
	offlineIpldFetcher.SkipNotFound = true // carries onto the UnixFSFetcher below
	offlineIpldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
	offlineUnixFSFetcher := offlineIpldFetcher.WithReifier(unixfsnode.Reify)

	return FetchersOut{
		IPLDFetcher:          ipldFetcher,
		UnixfsFetcher:        unixFSFetcher,
		OfflineIPLDFetcher:   offlineIpldFetcher,
		OfflineUnixfsFetcher: offlineUnixFSFetcher,
	}
}

// PathResolversOut allows injection of path resolvers
type PathResolversOut struct {
	fx.Out
	IPLDPathResolver          pathresolver.Resolver `name:"ipldPathResolver"`
	UnixFSPathResolver        pathresolver.Resolver `name:"unixFSPathResolver"`
	OfflineIPLDPathResolver   pathresolver.Resolver `name:"offlineIpldPathResolver"`
	OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"`
}

// PathResolverConfig creates path resolvers with the given fetchers.
func PathResolverConfig(fetchers FetchersIn) PathResolversOut {
	return PathResolversOut{
		IPLDPathResolver:          pathresolver.NewBasicResolver(fetchers.IPLDFetcher),
		UnixFSPathResolver:        pathresolver.NewBasicResolver(fetchers.UnixfsFetcher),
		OfflineIPLDPathResolver:   pathresolver.NewBasicResolver(fetchers.OfflineIPLDFetcher),
		OfflineUnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.OfflineUnixfsFetcher),
	}
}

// Dag creates new DAGService
func Dag(bs blockservice.BlockService) format.DAGService {
	return merkledag.NewDAGService(bs)
}

// Files loads persisted MFS root
func Files(strategy string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) {
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) {
		dsk := datastore.NewKey("/local/filesroot")
		pf := func(ctx context.Context, c cid.Cid) error {
			rootDS := repo.Datastore()
			if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil {
				return err
			}
			if err := rootDS.Sync(ctx, filestore.FilestorePrefix); err != nil {
				return err
			}

			if err := rootDS.Put(ctx, dsk, c.Bytes()); err != nil {
				return err
			}
			return rootDS.Sync(ctx, dsk)
		}

		var nd *merkledag.ProtoNode
		ctx := helpers.LifecycleCtx(mctx, lc)
		val, err := repo.Datastore().Get(ctx, dsk)

		switch {
		case errors.Is(err, datastore.ErrNotFound):
			nd = unixfs.EmptyDirNode()
			err := dag.Add(ctx, nd)
			if err != nil {
				return nil, fmt.Errorf("failure writing filesroot to dagstore: %s", err)
			}
		case err == nil:
			c, err := cid.Cast(val)
			if err != nil {
				return nil, err
			}

			offlineDag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
			rnd, err := offlineDag.Get(ctx, c)
			if err != nil {
				return nil, fmt.Errorf("error loading filesroot from dagservice: %s", err)
			}

			pbnd, ok := rnd.(*merkledag.ProtoNode)
			if !ok {
				return nil, merkledag.ErrNotProtobuf
			}

			nd = pbnd
		default:
			return nil, err
		}

		// MFS (Mutable File System) provider integration: Only pass the provider
		// to MFS when the strategy includes "mfs". MFS will call StartProviding()
		// on every DAGService.Add() operation, which is sufficient for the "mfs"
		// strategy - it ensures all MFS content gets announced as it's added or
		// modified. For non-mfs strategies, we set provider to nil to avoid
		// unnecessary providing.
		strategyFlag := config.ParseReproviderStrategy(strategy)
		if strategyFlag&config.ReproviderStrategyMFS == 0 {
			prov = nil
		}

		root, err := mfs.NewRoot(ctx, dag, nd, pf, prov)

		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return root.Close()
			},
		})

		return root, err
	}
}
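
A side note on the strategy handling above: both Pinning and Files call config.ParseReproviderStrategy and test individual bits of the result to decide what gets advertised. The sketch below is illustrative only, not part of core.go; it assumes "pinned+mfs" is one of the accepted Reprovider.Strategy values.

package main

import (
	"fmt"

	"github.com/ipfs/kubo/config"
)

// Illustrative sketch: inspect the bit flags that Pinning and Files derive
// from a Reprovider.Strategy string. "pinned+mfs" is assumed to be a valid
// strategy name.
func main() {
	flags := config.ParseReproviderStrategy("pinned+mfs")

	// Pinning: "pinned" advertises all pinned blocks (roots + children),
	// "roots" advertises only the root CIDs; only one of the two is used.
	fmt.Println("pinned:", flags&config.ReproviderStrategyPinned != 0)
	fmt.Println("roots: ", flags&config.ReproviderStrategyRoots != 0)

	// Files: the MFS root only receives a non-nil provider when "mfs" is set.
	fmt.Println("mfs:   ", flags&config.ReproviderStrategyMFS != 0)
}

The same flags decide whether dspinner is constructed with WithPinnedProvider or WithRootsProvider, and whether mfs.NewRoot receives the DHTProvider or nil.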