package node

import (
	"context"
	"errors"
	"fmt"

	"github.com/ipfs/boxo/blockservice"
	blockstore "github.com/ipfs/boxo/blockstore"
	exchange "github.com/ipfs/boxo/exchange"
	offline "github.com/ipfs/boxo/exchange/offline"
	"github.com/ipfs/boxo/fetcher"
	bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
	"github.com/ipfs/boxo/filestore"
	"github.com/ipfs/boxo/ipld/merkledag"
	"github.com/ipfs/boxo/ipld/unixfs"
	"github.com/ipfs/boxo/mfs"
	pathresolver "github.com/ipfs/boxo/path/resolver"
	pin "github.com/ipfs/boxo/pinning/pinner"
	"github.com/ipfs/boxo/pinning/pinner/dspinner"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	format "github.com/ipfs/go-ipld-format"
	"github.com/ipfs/go-unixfsnode"
	dagpb "github.com/ipld/go-codec-dagpb"
	"go.uber.org/fx"

	"github.com/ipfs/kubo/config"
	"github.com/ipfs/kubo/core/node/helpers"
	"github.com/ipfs/kubo/repo"
)

// FilesRootDatastoreKey is the datastore key for the MFS files root CID.
var FilesRootDatastoreKey = datastore.NewKey("/local/filesroot")

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
	return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
		svc := blockservice.New(bs, rem,
			blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
		)

		// Close the blockservice when the fx lifecycle shuts down.
		lc.Append(fx.Hook{
			OnStop: func(ctx context.Context) error {
				return svc.Close()
			},
		})

		return svc
	}
}

// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(strategy string) func(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo, prov DHTProvider) (pin.Pinner, error) {
	// Resolve the strategy flags once, when the constructor is built. This
	// happens before the provider is created, which is why the strategy
	// arrives here as a string and is parsed locally instead of using the
	// fx-provided ProvidingStrategy.
	flags := config.ParseProvideStrategy(strategy)

	return func(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo, prov DHTProvider,
	) (pin.Pinner, error) {
		dstore := repo.Datastore()

		// Flush block and filestore data before pin state is persisted so the
		// pinner never records pins for unpersisted blocks.
		syncFunc := func(ctx context.Context) error {
			if err := dstore.Sync(ctx, blockstore.BlockPrefix); err != nil {
				return err
			}
			return dstore.Sync(ctx, filestore.FilestorePrefix)
		}
		syncDag := &syncDagService{ds, syncFunc}

		ctx := context.TODO()

		var opts []dspinner.Option
		// At most one of WithPinnedProvider / WithRootsProvider may be active.
		// "pinned" covers all pinned content (roots plus children) while
		// "roots" is just the root CIDs, so enabling both would advertise the
		// roots twice. "pinned" takes precedence if both flags are somehow set
		// (which proper strategy parsing should never produce).
		switch {
		case flags&config.ProvideStrategyPinned != 0:
			opts = append(opts, dspinner.WithPinnedProvider(prov))
		case flags&config.ProvideStrategyRoots != 0:
			opts = append(opts, dspinner.WithRootsProvider(prov))
		}

		pnr, err := dspinner.New(ctx, dstore, syncDag, opts...)
		if err != nil {
			return nil, err
		}
		return pnr, nil
	}
}

var (
	_ merkledag.SessionMaker = new(syncDagService)
	_ format.DAGService      = new(syncDagService)
)

// syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore
type syncDagService struct {
	format.DAGService
	syncFn func(context.Context) error
}

// Sync flushes pending data to the underlying datastore via the configured sync function.
func (s *syncDagService) Sync(ctx context.Context) error {
	return s.syncFn(ctx)
}

// Session returns a session-scoped NodeGetter backed by the wrapped DAGService.
func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
	return merkledag.NewSession(ctx, s.DAGService)
}

// FetchersOut allows injection of fetchers.
type FetchersOut struct { fx.Out IPLDFetcher fetcher.Factory `name:"ipldFetcher"` UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"` OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"` OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"` } // FetchersIn allows using fetchers for other dependencies. type FetchersIn struct { fx.In IPLDFetcher fetcher.Factory `name:"ipldFetcher"` UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"` OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"` OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"` } // FetcherConfig returns a fetcher config that can build new fetcher instances func FetcherConfig(bs blockservice.BlockService) FetchersOut { ipldFetcher := bsfetcher.NewFetcherConfig(bs) ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser) unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify) // Construct offline versions which we can safely use in contexts where // path resolution should not fetch new blocks via exchange. 
offlineBs := blockservice.New(bs.Blockstore(), offline.Exchange(bs.Blockstore())) offlineIpldFetcher := bsfetcher.NewFetcherConfig(offlineBs) offlineIpldFetcher.SkipNotFound = true // carries onto the UnixFSFetcher below offlineIpldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser) offlineUnixFSFetcher := offlineIpldFetcher.WithReifier(unixfsnode.Reify) return FetchersOut{ IPLDFetcher: ipldFetcher, UnixfsFetcher: unixFSFetcher, OfflineIPLDFetcher: offlineIpldFetcher, OfflineUnixfsFetcher: offlineUnixFSFetcher, } } // PathResolversOut allows injection of path resolvers type PathResolversOut struct { fx.Out IPLDPathResolver pathresolver.Resolver `name:"ipldPathResolver"` UnixFSPathResolver pathresolver.Resolver `name:"unixFSPathResolver"` OfflineIPLDPathResolver pathresolver.Resolver `name:"offlineIpldPathResolver"` OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"` } // PathResolverConfig creates path resolvers with the given fetchers. 
func PathResolverConfig(fetchers FetchersIn) PathResolversOut { return PathResolversOut{ IPLDPathResolver: pathresolver.NewBasicResolver(fetchers.IPLDFetcher), UnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.UnixfsFetcher), OfflineIPLDPathResolver: pathresolver.NewBasicResolver(fetchers.OfflineIPLDFetcher), OfflineUnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.OfflineUnixfsFetcher), } } // Dag creates new DAGService func Dag(bs blockservice.BlockService) format.DAGService { return merkledag.NewDAGService(bs) } // Files loads persisted MFS root func Files(strategy string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) { pf := func(ctx context.Context, c cid.Cid) error { rootDS := repo.Datastore() if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil { return err } if err := rootDS.Sync(ctx, filestore.FilestorePrefix); err != nil { return err } if err := rootDS.Put(ctx, FilesRootDatastoreKey, c.Bytes()); err != nil { return err } return rootDS.Sync(ctx, FilesRootDatastoreKey) } var nd *merkledag.ProtoNode ctx := helpers.LifecycleCtx(mctx, lc) val, err := repo.Datastore().Get(ctx, FilesRootDatastoreKey) switch { case errors.Is(err, datastore.ErrNotFound): nd = unixfs.EmptyDirNode() err := dag.Add(ctx, nd) if err != nil { return nil, fmt.Errorf("failure writing filesroot to dagstore: %s", err) } case err == nil: c, err := cid.Cast(val) if err != nil { return nil, err } offlineDag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) rnd, err := offlineDag.Get(ctx, c) if err != nil { return nil, fmt.Errorf("error loading filesroot from dagservice: %s", err) } pbnd, ok := rnd.(*merkledag.ProtoNode) if !ok { return nil, merkledag.ErrNotProtobuf } nd = pbnd default: 
return nil, err } // MFS (Mutable File System) provider integration: Only pass the provider // to MFS when the strategy includes "mfs". MFS will call StartProviding() // on every DAGService.Add() operation, which is sufficient for the "mfs" // strategy - it ensures all MFS content gets announced as it's added or // modified. For non-mfs strategies, we set provider to nil to avoid // unnecessary providing. strategyFlag := config.ParseProvideStrategy(strategy) if strategyFlag&config.ProvideStrategyMFS == 0 { prov = nil } // Get configured settings from Import config cfg, err := repo.Config() if err != nil { return nil, fmt.Errorf("failed to get config: %w", err) } chunkerGen := cfg.Import.UnixFSSplitterFunc() maxDirLinks := int(cfg.Import.UnixFSDirectoryMaxLinks.WithDefault(config.DefaultUnixFSDirectoryMaxLinks)) maxHAMTFanout := int(cfg.Import.UnixFSHAMTDirectoryMaxFanout.WithDefault(config.DefaultUnixFSHAMTDirectoryMaxFanout)) hamtShardingSize := int(cfg.Import.UnixFSHAMTDirectorySizeThreshold.WithDefault(config.DefaultUnixFSHAMTDirectorySizeThreshold)) sizeEstimationMode := cfg.Import.HAMTSizeEstimationMode() root, err := mfs.NewRoot(ctx, dag, nd, pf, prov, mfs.WithChunker(chunkerGen), mfs.WithMaxLinks(maxDirLinks), mfs.WithMaxHAMTFanout(maxHAMTFanout), mfs.WithHAMTShardingSize(hamtShardingSize), mfs.WithSizeEstimationMode(sizeEstimationMode), ) if err != nil { return nil, fmt.Errorf("failed to initialize MFS root from %s stored at %s: %w. "+ "If corrupted, use 'ipfs files chroot' to reset (see --help)", nd.Cid(), FilesRootDatastoreKey, err) } lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return root.Close() }, }) return root, err } }