From 4dcd6ac681fc59d11cc98ae8d5d36e7df80c0004 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Fri, 31 Oct 2025 16:57:22 +0100 Subject: [PATCH] feat: fast provide --- core/commands/add.go | 20 ++++++++++++++++++-- core/commands/provide.go | 15 +++++++++++++++ core/commands/routing.go | 13 +++++++++++++ core/node/libp2p/host.go | 14 +++++++++++++- 4 files changed, 59 insertions(+), 3 deletions(-) diff --git a/core/commands/add.go b/core/commands/add.go index f314bbf64..3275876bb 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -66,6 +66,7 @@ const ( modeOptionName = "mode" mtimeOptionName = "mtime" mtimeNsecsOptionName = "mtime-nsecs" + fastProvideOptionName = "fast-provide" ) const adderOutChanSize = 8 @@ -213,6 +214,7 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import cmds.UintOption(modeOptionName, "Custom POSIX file mode to store in created UnixFS entries. WARNING: experimental, forces dag-pb for root block, disables raw-leaves"), cmds.Int64Option(mtimeOptionName, "Custom POSIX modification time to store in created UnixFS entries (seconds before or after the Unix Epoch). 
WARNING: experimental, forces dag-pb for root block, disables raw-leaves"), cmds.UintOption(mtimeNsecsOptionName, "Custom POSIX modification time (optional time fraction in nanoseconds)"), + cmds.BoolOption(fastProvideOptionName, "Apply fast-provide function to the root CID after add completes").WithDefault(true), // TODO: default could be Provide.DHT.SweepEnabled }, PreRun: func(req *cmds.Request, env cmds.Environment) error { quiet, _ := req.Options[quietOptionName].(bool) @@ -283,6 +285,7 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import mode, _ := req.Options[modeOptionName].(uint) mtime, _ := req.Options[mtimeOptionName].(int64) mtimeNsecs, _ := req.Options[mtimeNsecsOptionName].(uint) + fastProvide, _ := req.Options[fastProvideOptionName].(bool) if chunker == "" { chunker = cfg.Import.UnixFSChunker.WithDefault(config.DefaultUnixFSChunker) @@ -421,11 +424,12 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import } var added int var fileAddedToMFS bool + var lastRootCid path.ImmutablePath // Track the root CID for fast-provide addit := toadd.Entries() for addit.Next() { _, dir := addit.Node().(files.Directory) errCh := make(chan error, 1) - events := make(chan interface{}, adderOutChanSize) + events := make(chan any, adderOutChanSize) opts[len(opts)-1] = options.Unixfs.Events(events) go func() { @@ -437,6 +441,9 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import return } + // Store the root CID for potential fast-provide operation + lastRootCid = pathAdded + // creating MFS pointers when optional --to-files is set if toFilesSet { if addit.Name() == "" { @@ -560,12 +567,21 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import return fmt.Errorf("expected a file argument") } + // Apply fast-provide if the flag is enabled + if fastProvide && (lastRootCid != path.ImmutablePath{}) { + // Blocks until root CID is provided to the DHT. 
+ // TODO: consider logging that fast-provide is in progress for user + if err := provideRoot(req.Context, ipfsNode.DHTClient, lastRootCid.RootCid()); err != nil { + log.Warnf("fast-provide failed for root CID %s: %s", lastRootCid.String(), err) + } + } + return nil }, PostRun: cmds.PostRunMap{ cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error { sizeChan := make(chan int64, 1) - outChan := make(chan interface{}) + outChan := make(chan any) req := res.Request() // Could be slow. diff --git a/core/commands/provide.go b/core/commands/provide.go index ba2be7d7b..a179a8304 100644 --- a/core/commands/provide.go +++ b/core/commands/provide.go @@ -1,6 +1,7 @@ package commands import ( + "context" "errors" "fmt" "io" @@ -11,6 +12,7 @@ import ( humanize "github.com/dustin/go-humanize" boxoprovider "github.com/ipfs/boxo/provider" + cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/kubo/core/commands/cmdenv" "github.com/libp2p/go-libp2p-kad-dht/fullrt" @@ -18,6 +20,7 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/provider/buffered" "github.com/libp2p/go-libp2p-kad-dht/provider/dual" "github.com/libp2p/go-libp2p-kad-dht/provider/stats" + routing "github.com/libp2p/go-libp2p/core/routing" "github.com/probe-lab/go-libdht/kad/key" "golang.org/x/exp/constraints" ) @@ -571,3 +574,15 @@ func humanSI(val float64, decimals int) string { func humanFull(val float64, decimals int) string { return humanize.CommafWithDigits(val, decimals) } + +// provideRoot performs a provide operation on the supplied DHT client for the +// given CID. +// +// - If the accelerated DHT client is used, a DHT lookup isn't needed, we +// directly allocate provider records to closest peers. +// - If Provide.DHT.SweepEnabled=true or OptimisticProvide=true, we make an +// optimistic provide call. +// - Else we make a standard provide call (much slower). 
+func provideRoot(ctx context.Context, router routing.Routing, c cid.Cid) error { + return router.Provide(ctx, c, true) +} diff --git a/core/commands/routing.go b/core/commands/routing.go index c772e2045..e8c6cdc18 100644 --- a/core/commands/routing.go +++ b/core/commands/routing.go @@ -211,6 +211,10 @@ var provideRefRoutingCmd = &cmds.Command{ ctx, events := routing.RegisterForQueryEvents(ctx) var provideErr error + // TODO: not sure if necessary to call StartProviding for `ipfs routing + // provide `, since either cid is already being provided, or it will + // be garbage collected and not reprovided anyway. So we may simply stick + // with a single (optimistic) provide, and skip StartProviding call. go func() { defer cancel() if rec { @@ -226,6 +230,14 @@ var provideRefRoutingCmd = &cmds.Command{ } }() + if dhtClient := nd.DHTClient; dhtClient != nil { + // If node has a DHT client, immediately provide the supplied cids before + // returning. + for _, c := range cids { + provideRoot(req.Context, dhtClient, c) + } + } + for e := range events { if err := res.Emit(e); err != nil { return err @@ -300,6 +312,7 @@ func provideCids(prov node.DHTProvider, cids []cid.Cid) error { for i, c := range cids { mhs[i] = c.Hash() } + // providing happens asynchronously return prov.StartProviding(true, mhs...) } diff --git a/core/node/libp2p/host.go b/core/node/libp2p/host.go index 9e71d3359..0cb85f454 100644 --- a/core/node/libp2p/host.go +++ b/core/node/libp2p/host.go @@ -55,12 +55,24 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo return out, err } + // Optimistic provide is enabled either via dedicated experimental flag, or when DHT Provide Sweep is enabled. + // When DHT Provide Sweep is enabled, all provide operations go through the + // `SweepingProvider`, hence the provides don't use the optimistic provide + // logic. 
Provides use `SweepingProvider.StartProviding()` and not + // `IpfsDHT.Provide()`, which is where the optimistic provide logic is + // implemented. However, `IpfsDHT.Provide()` is used to quickly provide roots + // when user manually adds content with the `--fast-provide` flag enabled. In + // this case we want to use optimistic provide logic to quickly announce the + // content to the network. This should be the only use case of + // `IpfsDHT.Provide()` when DHT Provide Sweep is enabled. + optimisticProvide := cfg.Experimental.OptimisticProvide || cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled) + routingOptArgs := RoutingOptionArgs{ Ctx: ctx, Datastore: params.Repo.Datastore(), Validator: params.Validator, BootstrapPeers: bootstrappers, - OptimisticProvide: cfg.Experimental.OptimisticProvide, + OptimisticProvide: optimisticProvide, OptimisticProvideJobsPoolSize: cfg.Experimental.OptimisticProvideJobsPoolSize, LoopbackAddressesOnLanDHT: cfg.Routing.LoopbackAddressesOnLanDHT.WithDefault(config.DefaultLoopbackAddressesOnLanDHT), }