feat: fast provide

This commit is contained in:
guillaumemichel 2025-10-31 16:57:22 +01:00
parent e05357ed19
commit 4dcd6ac681
No known key found for this signature in database
GPG Key ID: 612745DB2E6D0E15
4 changed files with 59 additions and 3 deletions

View File

@ -66,6 +66,7 @@ const (
modeOptionName = "mode"
mtimeOptionName = "mtime"
mtimeNsecsOptionName = "mtime-nsecs"
fastProvideOptionName = "fast-provide"
)
const adderOutChanSize = 8
@ -213,6 +214,7 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import
cmds.UintOption(modeOptionName, "Custom POSIX file mode to store in created UnixFS entries. WARNING: experimental, forces dag-pb for root block, disables raw-leaves"),
cmds.Int64Option(mtimeOptionName, "Custom POSIX modification time to store in created UnixFS entries (seconds before or after the Unix Epoch). WARNING: experimental, forces dag-pb for root block, disables raw-leaves"),
cmds.UintOption(mtimeNsecsOptionName, "Custom POSIX modification time (optional time fraction in nanoseconds)"),
cmds.BoolOption(fastProvideOptionName, "Apply fast-provide function to the root CID after add completes").WithDefault(true), // TODO: default could be Provide.DHT.SweepEnabled
},
PreRun: func(req *cmds.Request, env cmds.Environment) error {
quiet, _ := req.Options[quietOptionName].(bool)
@ -283,6 +285,7 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import
mode, _ := req.Options[modeOptionName].(uint)
mtime, _ := req.Options[mtimeOptionName].(int64)
mtimeNsecs, _ := req.Options[mtimeNsecsOptionName].(uint)
fastProvide, _ := req.Options[fastProvideOptionName].(bool)
if chunker == "" {
chunker = cfg.Import.UnixFSChunker.WithDefault(config.DefaultUnixFSChunker)
@ -421,11 +424,12 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import
}
var added int
var fileAddedToMFS bool
var lastRootCid path.ImmutablePath // Track the root CID for fast-provide
addit := toadd.Entries()
for addit.Next() {
_, dir := addit.Node().(files.Directory)
errCh := make(chan error, 1)
events := make(chan interface{}, adderOutChanSize)
events := make(chan any, adderOutChanSize)
opts[len(opts)-1] = options.Unixfs.Events(events)
go func() {
@ -437,6 +441,9 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import
return
}
// Store the root CID for potential fast-provide operation
lastRootCid = pathAdded
// creating MFS pointers when optional --to-files is set
if toFilesSet {
if addit.Name() == "" {
@ -560,12 +567,21 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import
return fmt.Errorf("expected a file argument")
}
// Apply fast-provide if the flag is enabled
if fastProvide && (lastRootCid != path.ImmutablePath{}) {
// Blocks until root CID is provided to the DHT.
// TODO: consider logging that fast-provide is in progress for user
if err := provideRoot(req.Context, ipfsNode.DHTClient, lastRootCid.RootCid()); err != nil {
log.Warnf("fast-provide failed for root CID %s: %s", lastRootCid.String(), err)
}
}
return nil
},
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
sizeChan := make(chan int64, 1)
outChan := make(chan interface{})
outChan := make(chan any)
req := res.Request()
// Could be slow.

View File

@ -1,6 +1,7 @@
package commands
import (
"context"
"errors"
"fmt"
"io"
@ -11,6 +12,7 @@ import (
humanize "github.com/dustin/go-humanize"
boxoprovider "github.com/ipfs/boxo/provider"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
@ -18,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/provider/buffered"
"github.com/libp2p/go-libp2p-kad-dht/provider/dual"
"github.com/libp2p/go-libp2p-kad-dht/provider/stats"
routing "github.com/libp2p/go-libp2p/core/routing"
"github.com/probe-lab/go-libdht/kad/key"
"golang.org/x/exp/constraints"
)
@ -571,3 +574,15 @@ func humanSI(val float64, decimals int) string {
func humanFull(val float64, decimals int) string {
return humanize.CommafWithDigits(val, decimals)
}
// provideRoot performs a provide operation on the supplied DHT client for the
// given CID.
//
// - If the accelerated DHT client is used, a DHT lookup isn't needed, we
// directly allocate provider records to closest peers.
// - If Provide.DHT.SweepEnabled=true or OptimisticProvide=true, we make an
// optimistic provide call.
// - Else we make a standard provide call (much slower).
func provideRoot(ctx context.Context, router routing.Routing, c cid.Cid) error {
return router.Provide(ctx, c, true)
}

View File

@ -211,6 +211,10 @@ var provideRefRoutingCmd = &cmds.Command{
ctx, events := routing.RegisterForQueryEvents(ctx)
var provideErr error
// TODO: not sure if necessary to call StartProviding for `ipfs routing
// provide <cid>`, since either cid is already being provided, or it will
// be garbage collected and not reprovided anyway. So we may simply stick
// with a single (optimistic) provide, and skip StartProviding call.
go func() {
defer cancel()
if rec {
@ -226,6 +230,14 @@ var provideRefRoutingCmd = &cmds.Command{
}
}()
if dhtClient := nd.DHTClient; dhtClient != nil {
// If node has a DHT client, provide immediately the supplied cids before
// returning.
for _, c := range cids {
provideRoot(req.Context, dhtClient, c)
}
}
for e := range events {
if err := res.Emit(e); err != nil {
return err
@ -300,6 +312,7 @@ func provideCids(prov node.DHTProvider, cids []cid.Cid) error {
for i, c := range cids {
mhs[i] = c.Hash()
}
// providing happens asynchronously
return prov.StartProviding(true, mhs...)
}

View File

@ -55,12 +55,24 @@ func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHo
return out, err
}
// Optimistic provide is enabled either via dedicated expierimental flag, or when DHT Provide Sweep is enabled.
// When DHT Provide Sweep is enabled, all provide operations go through the
// `SweepingProvider`, hence the provides don't use the optimistic provide
// logic. Provides use `SweepingProvider.StartProviding()` and not
// `IpfsDHT.Provide()`, which is where the optimistic provide logic is
// implemented. However, `IpfsDHT.Provide()` is used to quickly provide roots
// when user manually adds content with the `--fast-provide` flag enabled. In
// this case we want to use optimistic provide logic to quickly announce the
// content to the network. This should be the only use case of
// `IpfsDHT.Provide()` when DHT Provide Sweep is enabled.
optimisticProvide := cfg.Experimental.OptimisticProvide || cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled)
routingOptArgs := RoutingOptionArgs{
Ctx: ctx,
Datastore: params.Repo.Datastore(),
Validator: params.Validator,
BootstrapPeers: bootstrappers,
OptimisticProvide: cfg.Experimental.OptimisticProvide,
OptimisticProvide: optimisticProvide,
OptimisticProvideJobsPoolSize: cfg.Experimental.OptimisticProvideJobsPoolSize,
LoopbackAddressesOnLanDHT: cfg.Routing.LoopbackAddressesOnLanDHT.WithDefault(config.DefaultLoopbackAddressesOnLanDHT),
}