diff --git a/config/import.go b/config/import.go index 27fcef410..d595199c8 100644 --- a/config/import.go +++ b/config/import.go @@ -16,6 +16,8 @@ const ( DefaultUnixFSRawLeaves = false DefaultUnixFSChunker = "size-262144" DefaultHashFunction = "sha2-256" + DefaultFastProvideRoot = true + DefaultFastProvideWait = false DefaultUnixFSHAMTDirectorySizeThreshold = 262144 // 256KiB - https://github.com/ipfs/boxo/blob/6c5a07602aed248acc86598f30ab61923a54a83e/ipld/unixfs/io/directory.go#L26 @@ -48,6 +50,8 @@ type Import struct { UnixFSHAMTDirectorySizeThreshold OptionalBytes BatchMaxNodes OptionalInteger BatchMaxSize OptionalInteger + FastProvideRoot Flag + FastProvideWait Flag } // ValidateImportConfig validates the Import configuration according to UnixFS spec requirements. diff --git a/config/provide.go b/config/provide.go index 666174902..c194a39b5 100644 --- a/config/provide.go +++ b/config/provide.go @@ -22,6 +22,11 @@ const ( DefaultProvideDHTMaxProvideConnsPerWorker = 20 DefaultProvideDHTKeystoreBatchSize = 1 << 14 // ~544 KiB per batch (1 multihash = 34 bytes) DefaultProvideDHTOfflineDelay = 2 * time.Hour + + // DefaultFastProvideTimeout is the maximum time allowed for fast-provide operations. + // Prevents hanging on network issues when providing root CID. + // 10 seconds is sufficient for DHT operations with sweep provider or accelerated client. + DefaultFastProvideTimeout = 10 * time.Second ) type ProvideStrategy int @@ -175,3 +180,25 @@ func ValidateProvideConfig(cfg *Provide) error { return nil } + +// ShouldProvideForStrategy determines if content should be provided based on the provide strategy +// and content characteristics (pinned status, root status, MFS status). 
+func ShouldProvideForStrategy(strategy ProvideStrategy, isPinned bool, isPinnedRoot bool, isMFS bool) bool { + if strategy == ProvideStrategyAll { + // 'all' strategy: always provide + return true + } + + // For combined strategies, check each component + if strategy&ProvideStrategyPinned != 0 && isPinned { + return true + } + if strategy&ProvideStrategyRoots != 0 && isPinnedRoot { + return true + } + if strategy&ProvideStrategyMFS != 0 && isMFS { + return true + } + + return false +} diff --git a/config/provide_test.go b/config/provide_test.go index 213271eb0..5c8f5fac1 100644 --- a/config/provide_test.go +++ b/config/provide_test.go @@ -105,3 +105,87 @@ func TestValidateProvideConfig_MaxWorkers(t *testing.T) { }) } } + +func TestShouldProvideForStrategy(t *testing.T) { + t.Run("all strategy always provides", func(t *testing.T) { + // ProvideStrategyAll should return true regardless of flags + testCases := []struct{ pinned, pinnedRoot, mfs bool }{ + {false, false, false}, + {true, true, true}, + {true, false, false}, + } + + for _, tc := range testCases { + assert.True(t, ShouldProvideForStrategy( + ProvideStrategyAll, tc.pinned, tc.pinnedRoot, tc.mfs)) + } + }) + + t.Run("single strategies match only their flag", func(t *testing.T) { + tests := []struct { + name string + strategy ProvideStrategy + pinned, pinnedRoot, mfs bool + want bool + }{ + {"pinned: matches when pinned=true", ProvideStrategyPinned, true, false, false, true}, + {"pinned: ignores other flags", ProvideStrategyPinned, false, true, true, false}, + + {"roots: matches when pinnedRoot=true", ProvideStrategyRoots, false, true, false, true}, + {"roots: ignores other flags", ProvideStrategyRoots, true, false, true, false}, + + {"mfs: matches when mfs=true", ProvideStrategyMFS, false, false, true, true}, + {"mfs: ignores other flags", ProvideStrategyMFS, true, true, false, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ShouldProvideForStrategy(tt.strategy, 
tt.pinned, tt.pinnedRoot, tt.mfs) + assert.Equal(t, tt.want, got) + }) + } + }) + + t.Run("combined strategies use OR logic (else-if bug fix)", func(t *testing.T) { + // CRITICAL: Tests the fix where bitflag combinations (pinned+mfs) didn't work + // because of else-if instead of separate if statements + tests := []struct { + name string + strategy ProvideStrategy + pinned, pinnedRoot, mfs bool + want bool + }{ + // pinned|mfs: provide if EITHER matches + {"pinned|mfs when pinned", ProvideStrategyPinned | ProvideStrategyMFS, true, false, false, true}, + {"pinned|mfs when mfs", ProvideStrategyPinned | ProvideStrategyMFS, false, false, true, true}, + {"pinned|mfs when both", ProvideStrategyPinned | ProvideStrategyMFS, true, false, true, true}, + {"pinned|mfs when neither", ProvideStrategyPinned | ProvideStrategyMFS, false, false, false, false}, + + // roots|mfs + {"roots|mfs when root", ProvideStrategyRoots | ProvideStrategyMFS, false, true, false, true}, + {"roots|mfs when mfs", ProvideStrategyRoots | ProvideStrategyMFS, false, false, true, true}, + {"roots|mfs when neither", ProvideStrategyRoots | ProvideStrategyMFS, false, false, false, false}, + + // pinned|roots + {"pinned|roots when pinned", ProvideStrategyPinned | ProvideStrategyRoots, true, false, false, true}, + {"pinned|roots when root", ProvideStrategyPinned | ProvideStrategyRoots, false, true, false, true}, + {"pinned|roots when neither", ProvideStrategyPinned | ProvideStrategyRoots, false, false, false, false}, + + // triple combination + {"all-three when any matches", ProvideStrategyPinned | ProvideStrategyRoots | ProvideStrategyMFS, false, false, true, true}, + {"all-three when none match", ProvideStrategyPinned | ProvideStrategyRoots | ProvideStrategyMFS, false, false, false, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ShouldProvideForStrategy(tt.strategy, tt.pinned, tt.pinnedRoot, tt.mfs) + assert.Equal(t, tt.want, got) + }) + } + }) + + t.Run("zero 
strategy never provides", func(t *testing.T) { + assert.False(t, ShouldProvideForStrategy(ProvideStrategy(0), false, false, false)) + assert.False(t, ShouldProvideForStrategy(ProvideStrategy(0), true, true, true)) + }) +} diff --git a/config/types.go b/config/types.go index ea2315bd8..47738f9f2 100644 --- a/config/types.go +++ b/config/types.go @@ -117,6 +117,16 @@ func (f Flag) String() string { } } +// ResolveBoolFromConfig returns the resolved boolean value based on: +// - If userSet is true, returns userValue (user explicitly set the flag) +// - Otherwise, uses configFlag.WithDefault(defaultValue) (respects config or falls back to default) +func ResolveBoolFromConfig(userValue bool, userSet bool, configFlag Flag, defaultValue bool) bool { + if userSet { + return userValue + } + return configFlag.WithDefault(defaultValue) +} + var ( _ json.Unmarshaler = (*Flag)(nil) _ json.Marshaler = (*Flag)(nil) diff --git a/core/commands/add.go b/core/commands/add.go index 75fb184b7..cb4bcb312 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -1,7 +1,6 @@ package commands import ( - "context" "errors" "fmt" "io" @@ -9,7 +8,6 @@ import ( gopath "path" "strconv" "strings" - "time" "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core/commands/cmdenv" @@ -74,11 +72,6 @@ const ( const ( adderOutChanSize = 8 - - // fastProvideTimeout is the maximum time allowed for async fast-provide operations. - // Prevents hanging on network issues when providing root CID in background. - // 10 seconds is sufficient for DHT operations with sweep provider or accelerated client. - fastProvideTimeout = 10 * time.Second ) var AddCmd = &cmds.Command{ @@ -89,21 +82,21 @@ Adds the content of to IPFS. Use -r to add directories (recursively). FAST PROVIDE OPTIMIZATION: -When you add content to IPFS, it gets queued for announcement on the DHT. -The background queue can take some time to process, meaning other peers -won't find your content immediately after 'ipfs add' completes. 
+When you add content to IPFS, the sweep provider queues it for efficient +DHT provides over time. While this is resource-efficient, other peers won't +find your content immediately after 'ipfs add' completes. -To make sharing faster, 'ipfs add' does an extra immediate announcement -of just the root CID to the DHT. This lets other peers start discovering -your content right away, while the regular background queue still handles -announcing all the blocks later. +To make sharing faster, 'ipfs add' does an immediate provide of the root CID +to the DHT in addition to the regular queue. This complements the sweep provider: +fast-provide handles the urgent case (root CIDs that users share and reference), +while the sweep provider efficiently provides all blocks according to +Provide.Strategy over time. -By default, this extra announcement runs in the background without slowing -down the command. If you need to be certain the root CID is discoverable -before the command returns (for example, sharing a link immediately), -use --fast-provide-wait to wait for the announcement to complete. -Use --fast-provide-root=false to skip this optimization and rely only on -the background queue (controlled by Provide.Strategy and Provide.DHT.Interval). +By default, this immediate provide runs in the background without blocking +the command. If you need certainty that the root CID is discoverable before +the command returns (e.g., sharing a link immediately), use --fast-provide-wait +to wait for the provide to complete. Use --fast-provide-root=false to skip +this optimization. This works best with the sweep provider and accelerated DHT client. Automatically skipped when DHT is not available. @@ -245,8 +238,8 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import cmds.UintOption(modeOptionName, "Custom POSIX file mode to store in created UnixFS entries. 
WARNING: experimental, forces dag-pb for root block, disables raw-leaves"), cmds.Int64Option(mtimeOptionName, "Custom POSIX modification time to store in created UnixFS entries (seconds before or after the Unix Epoch). WARNING: experimental, forces dag-pb for root block, disables raw-leaves"), cmds.UintOption(mtimeNsecsOptionName, "Custom POSIX modification time (optional time fraction in nanoseconds)"), - cmds.BoolOption(fastProvideRootOptionName, "Immediately provide root CID to DHT for fast content discovery. When disabled, root CID is queued for background providing instead.").WithDefault(true), - cmds.BoolOption(fastProvideWaitOptionName, "Wait for fast-provide-root to complete before returning. Ensures root CID is discoverable when command finishes.").WithDefault(false), + cmds.BoolOption(fastProvideRootOptionName, "Immediately provide root CID to DHT in addition to regular queue, for faster discovery. Default: Import.FastProvideRoot"), + cmds.BoolOption(fastProvideWaitOptionName, "Block until the immediate provide completes before returning. 
Default: Import.FastProvideWait"), }, PreRun: func(req *cmds.Request, env cmds.Environment) error { quiet, _ := req.Options[quietOptionName].(bool) @@ -317,8 +310,8 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import mode, _ := req.Options[modeOptionName].(uint) mtime, _ := req.Options[mtimeOptionName].(int64) mtimeNsecs, _ := req.Options[mtimeNsecsOptionName].(uint) - fastProvideRoot, _ := req.Options[fastProvideRootOptionName].(bool) - fastProvideWait, _ := req.Options[fastProvideWaitOptionName].(bool) + fastProvideRoot, fastProvideRootSet := req.Options[fastProvideRootOptionName].(bool) + fastProvideWait, fastProvideWaitSet := req.Options[fastProvideWaitOptionName].(bool) if chunker == "" { chunker = cfg.Import.UnixFSChunker.WithDefault(config.DefaultUnixFSChunker) @@ -355,6 +348,9 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import maxHAMTFanout = int(cfg.Import.UnixFSHAMTDirectoryMaxFanout.WithDefault(config.DefaultUnixFSHAMTDirectoryMaxFanout)) } + fastProvideRoot = config.ResolveBoolFromConfig(fastProvideRoot, fastProvideRootSet, cfg.Import.FastProvideRoot, config.DefaultFastProvideRoot) + fastProvideWait = config.ResolveBoolFromConfig(fastProvideWait, fastProvideWaitSet, cfg.Import.FastProvideWait, config.DefaultFastProvideWait) + // Storing optional mode or mtime (UnixFS 1.5) requires root block // to always be 'dag-pb' and not 'raw'. Below adjusts raw-leaves setting, if possible. 
if preserveMode || preserveMtime || mode != 0 || mtime != 0 { @@ -606,65 +602,15 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import if err != nil { return err } - - // Parse the provide strategy to check if we should provide based on pin/MFS status - strategyStr := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy) - strategy := config.ParseProvideStrategy(strategyStr) - - // Determine if we should provide based on strategy - shouldProvide := false - if strategy == config.ProvideStrategyAll { - // 'all' strategy: always provide - shouldProvide = true + if err := cmdenv.ExecuteFastProvide(req.Context, ipfsNode, cfg, lastRootCid.RootCid(), fastProvideWait, dopin, dopin, toFilesSet); err != nil { + return err + } + } else if !fastProvideRoot { + if fastProvideWait { + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true) } else { - // For combined strategies (pinned+mfs), check each component - if strategy&config.ProvideStrategyPinned != 0 && dopin { - shouldProvide = true - } else if strategy&config.ProvideStrategyRoots != 0 && dopin { - shouldProvide = true - } else if strategy&config.ProvideStrategyMFS != 0 && toFilesSet { - shouldProvide = true - } + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config") } - - switch { - case !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled): - log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false") - case cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0: - log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0") - case !shouldProvide: - log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", dopin, "to-files", toFilesSet) - case !ipfsNode.HasActiveDHTClient(): - log.Debugw("fast-provide-root: skipped", "reason", "DHT not available") - default: - rootCid := 
lastRootCid.RootCid() - - if fastProvideWait { - // Synchronous mode: block until provide completes - log.Debugw("fast-provide-root: providing synchronously", "cid", rootCid) - if err := provideCIDSync(req.Context, ipfsNode.DHTClient, rootCid); err != nil { - log.Warnw("fast-provide-root: sync provide failed", "cid", rootCid, "error", err) - } else { - log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid) - } - } else { - // Asynchronous mode (default): fire-and-forget, don't block - log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid) - go func() { - // Use detached context with timeout to prevent hanging on network issues - ctx, cancel := context.WithTimeout(context.Background(), fastProvideTimeout) - defer cancel() - if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { - log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err) - } else { - log.Debugw("fast-provide-root: async provide completed", "cid", rootCid) - } - }() - } - } - } else if fastProvideWait && !fastProvideRoot { - // Log that wait flag is ignored when provide-root is disabled - log.Debugw("fast-provide-root: wait flag ignored", "reason", "fast-provide-root disabled") } return nil diff --git a/core/commands/cmdenv/env.go b/core/commands/cmdenv/env.go index 06bccb0ef..b2a45351e 100644 --- a/core/commands/cmdenv/env.go +++ b/core/commands/cmdenv/env.go @@ -1,15 +1,19 @@ package cmdenv import ( + "context" "fmt" "strconv" "strings" - "github.com/ipfs/kubo/commands" - "github.com/ipfs/kubo/core" - + "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" logging "github.com/ipfs/go-log/v2" + routing "github.com/libp2p/go-libp2p/core/routing" + + "github.com/ipfs/kubo/commands" + "github.com/ipfs/kubo/config" + "github.com/ipfs/kubo/core" coreiface "github.com/ipfs/kubo/core/coreiface" options "github.com/ipfs/kubo/core/coreiface/options" ) @@ -86,3 +90,103 @@ func needEscape(s string) bool { } return false } + 
+// provideCIDSync performs a synchronous/blocking provide operation to announce +// the given CID to the DHT. +// +// - If the accelerated DHT client is used, a DHT lookup isn't needed, we +// directly allocate provider records to closest peers. +// - If Provide.DHT.SweepEnabled=true or OptimisticProvide=true, we make an +// optimistic provide call. +// - Else we make a standard provide call (much slower). +// +// IMPORTANT: The caller MUST verify DHT availability using HasActiveDHTClient() +// before calling this function. Calling with a nil or invalid router will cause +// a panic - this is the caller's responsibility to prevent. +func provideCIDSync(ctx context.Context, router routing.Routing, c cid.Cid) error { + return router.Provide(ctx, c, true) +} + +// ExecuteFastProvide immediately provides a root CID to the DHT, bypassing the regular +// provide queue for faster content discovery. This function is reusable across commands +// that add or import content, such as ipfs add and ipfs dag import. +// +// Parameters: +// - ctx: context for synchronous provides +// - ipfsNode: the IPFS node instance +// - cfg: node configuration +// - rootCid: the CID to provide +// - wait: whether to block until provide completes (sync mode) +// - isPinned: whether content is pinned +// - isPinnedRoot: whether this is a pinned root CID +// - isMFS: whether content is in MFS +// +// Return value: +// - Returns nil if operation succeeded or was skipped (preconditions not met) +// - Returns error only in sync mode (wait=true) when provide operation fails +// - In async mode (wait=false), always returns nil (errors logged in goroutine) +// +// The function handles all precondition checks (Provide.Enabled, Provide.DHT.Interval, DHT availability, +// strategy matching) and logs appropriately. In async mode, it launches a goroutine +// with a detached context and timeout. 
+func ExecuteFastProvide( + ctx context.Context, + ipfsNode *core.IpfsNode, + cfg *config.Config, + rootCid cid.Cid, + wait bool, + isPinned bool, + isPinnedRoot bool, + isMFS bool, +) error { + log.Debugw("fast-provide-root: enabled", "wait", wait) + + // Check preconditions for providing + switch { + case !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled): + log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false") + return nil + case cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0: + log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0") + return nil + case !ipfsNode.HasActiveDHTClient(): + log.Debugw("fast-provide-root: skipped", "reason", "DHT not available") + return nil + } + + // Check if strategy allows providing this content + strategyStr := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy) + strategy := config.ParseProvideStrategy(strategyStr) + shouldProvide := config.ShouldProvideForStrategy(strategy, isPinned, isPinnedRoot, isMFS) + + if !shouldProvide { + log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS) + return nil + } + + // Execute provide operation + if wait { + // Synchronous mode: block until provide completes, return error on failure + log.Debugw("fast-provide-root: providing synchronously", "cid", rootCid) + if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { + log.Warnw("fast-provide-root: sync provide failed", "cid", rootCid, "error", err) + return fmt.Errorf("fast-provide: %w", err) + } + log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid) + return nil + } + + // Asynchronous mode (default): fire-and-forget, don't block, always return nil + log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid) + go func() { + // Use detached context with timeout to prevent 
hanging on network issues + ctx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout) + defer cancel() + if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { + log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err) + } else { + log.Debugw("fast-provide-root: async provide completed", "cid", rootCid) + } + }() + return nil +} diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index ce5edb641..6827e46fa 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -16,10 +16,12 @@ import ( ) const ( - pinRootsOptionName = "pin-roots" - progressOptionName = "progress" - silentOptionName = "silent" - statsOptionName = "stats" + pinRootsOptionName = "pin-roots" + progressOptionName = "progress" + silentOptionName = "silent" + statsOptionName = "stats" + fastProvideRootOptionName = "fast-provide-root" + fastProvideWaitOptionName = "fast-provide-wait" ) // DagCmd provides a subset of commands for interacting with ipld dag objects @@ -189,6 +191,18 @@ Note: currently present in the blockstore does not represent a complete DAG, pinning of that individual root will fail. +FAST PROVIDE OPTIMIZATION: + +Root CIDs from CAR headers are immediately provided to the DHT in addition +to the regular provide queue, allowing other peers to discover your content +right away. This complements the sweep provider, which efficiently provides +all blocks according to Provide.Strategy over time. + +By default, the provide happens in the background without blocking the +command. Use --fast-provide-wait to wait for the provide to complete, or +--fast-provide-root=false to skip it. Works even with --pin-roots=false. +Automatically skipped when DHT is not available. 
+ Maximum supported CAR version: 2 Specification of CAR formats: https://ipld.io/specs/transport/car/ `, @@ -200,6 +214,8 @@ Specification of CAR formats: https://ipld.io/specs/transport/car/ cmds.BoolOption(pinRootsOptionName, "Pin optional roots listed in the .car headers after importing.").WithDefault(true), cmds.BoolOption(silentOptionName, "No output."), cmds.BoolOption(statsOptionName, "Output stats."), + cmds.BoolOption(fastProvideRootOptionName, "Immediately provide root CIDs to DHT in addition to regular queue, for faster discovery. Default: Import.FastProvideRoot"), + cmds.BoolOption(fastProvideWaitOptionName, "Block until the immediate provide completes before returning. Default: Import.FastProvideWait"), cmdutils.AllowBigBlockOption, }, Type: CarImportOutput{}, diff --git a/core/commands/dag/import.go b/core/commands/dag/import.go index e298a2d52..032b9e52a 100644 --- a/core/commands/dag/import.go +++ b/core/commands/dag/import.go @@ -11,6 +11,7 @@ import ( cmds "github.com/ipfs/go-ipfs-cmds" ipld "github.com/ipfs/go-ipld-format" ipldlegacy "github.com/ipfs/go-ipld-legacy" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/core/coreiface/options" gocarv2 "github.com/ipld/go-car/v2" @@ -19,6 +20,8 @@ import ( "github.com/ipfs/kubo/core/commands/cmdutils" ) +var log = logging.Logger("core/commands") + func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { node, err := cmdenv.GetNode(env) if err != nil { @@ -47,6 +50,12 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment doPinRoots, _ := req.Options[pinRootsOptionName].(bool) + fastProvideRoot, fastProvideRootSet := req.Options[fastProvideRootOptionName].(bool) + fastProvideWait, fastProvideWaitSet := req.Options[fastProvideWaitOptionName].(bool) + + fastProvideRoot = config.ResolveBoolFromConfig(fastProvideRoot, fastProvideRootSet, cfg.Import.FastProvideRoot, config.DefaultFastProvideRoot) + 
fastProvideWait = config.ResolveBoolFromConfig(fastProvideWait, fastProvideWaitSet, cfg.Import.FastProvideWait, config.DefaultFastProvideWait) + // grab a pinlock ( which doubles as a GC lock ) so that regardless of the // size of the streamed-in cars nothing will disappear on us before we had // a chance to roots that may show up at the very end @@ -191,5 +200,21 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment } } + // Fast-provide roots for faster discovery + if fastProvideRoot { + err = roots.ForEach(func(c cid.Cid) error { + return cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false) + }) + if err != nil { + return err + } + } else { + if fastProvideWait { + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true) + } else { + log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config") + } + } + return nil } diff --git a/docs/changelogs/v0.39.md b/docs/changelogs/v0.39.md index 9d28af3de..e34fc1d70 100644 --- a/docs/changelogs/v0.39.md +++ b/docs/changelogs/v0.39.md @@ -48,26 +48,31 @@ The Amino DHT Sweep provider system, introduced as experimental in v0.38, is now - Automatic resume after restarts with persistent state ([see below](#provider-resume-cycle-for-improved-reproviding-reliability)) - Proactive alerts when reproviding falls behind ([see below](#-sweep-provider-slow-reprovide-warnings)) - Better metrics for monitoring (`provider_provides_total`) ([see below](#-metric-rename-provider_provides_total)) +- Fast optimistic provide of new root CIDs ([see below](#-fast-root-cid-providing-for-immediate-content-discovery)) For background on the sweep provider design and motivations, see [`Provide.DHT.SweepEnabled`](https://github.com/ipfs/kubo/blob/master/docs/config.md#providedhtsweepenabled) and [ipshipyard.com#8](https://github.com/ipshipyard/ipshipyard.com/pull/8). 
#### ⚡ Fast root CID providing for immediate content discovery -When you add content to IPFS, it normally gets queued for announcement on the DHT. This background queue can take time to process, meaning other peers won't find your content immediately after `ipfs add` completes. +When you add content to IPFS, the sweep provider queues it for efficient DHT provides over time. While this is resource-efficient, other peers won't find your content immediately after `ipfs add` or `ipfs dag import` completes. -To make sharing faster, `ipfs add` now does an extra immediate announcement of just the root CID to the DHT (controlled by the new `--fast-provide-root` flag, enabled by default). This lets other peers start discovering your content right away, while the regular background queue still handles announcing all the blocks later. +To make sharing faster, `ipfs add` and `ipfs dag import` now do an immediate provide of root CIDs to the DHT in addition to the regular queue (controlled by the new `--fast-provide-root` flag, enabled by default). This complements the sweep provider system: fast-provide handles the urgent case (root CIDs that users share and reference), while the sweep provider efficiently provides all blocks according to `Provide.Strategy` over time. -By default, this extra announcement runs in the background without slowing down the command. For use cases requiring guaranteed discoverability before the command returns (for example, sharing a link immediately), use `--fast-provide-wait` to block until the announcement completes. +This closes the gap between command completion and content shareability: root CIDs typically become discoverable on the network in under a second (compared to 30+ seconds previously). The feature uses optimistic DHT operations, which are significantly faster with the sweep provider (now enabled by default). -**Usage examples:** +By default, this immediate provide runs in the background without blocking the command. 
For use cases requiring guaranteed discoverability before the command returns (e.g., sharing a link immediately), use `--fast-provide-wait` to block until the provide completes. + +**Simple examples:** ```bash -ipfs add file.txt # Root CID provided immediately in background, independent of queue (default) -ipfs add file.txt --fast-provide-wait # Blocks until root CID announcement completes (slower, guaranteed) -ipfs add file.txt --fast-provide-root=false # Skip immediate announcement, use background queue only +ipfs add file.txt # Root provided immediately, blocks queued for sweep provider +ipfs add file.txt --fast-provide-wait # Wait for root provide to complete +ipfs dag import file.car # Same for CAR imports ``` -This optimization works best with the sweep provider and accelerated DHT client, where provide operations are significantly faster than traditional DHT providing. The feature is automatically skipped when DHT is unavailable (e.g., `Routing.Type=none` or delegated-only configurations). +**Configuration:** Set defaults via `Import.FastProvideRoot` (default: `true`) and `Import.FastProvideWait` (default: `false`). See `ipfs add --help` and `ipfs dag import --help` for more details and examples. + +This optimization works best with the sweep provider and accelerated DHT client, where provide operations are significantly faster. Automatically skipped when DHT is unavailable (e.g., `Routing.Type=none` or delegated-only configurations). #### 📊 Detailed statistics for Sweep provider with `ipfs provide stat` diff --git a/docs/config.md b/docs/config.md index 65f902cfd..0352bd845 100644 --- a/docs/config.md +++ b/docs/config.md @@ -230,6 +230,8 @@ config file at runtime. 
- [`Import.UnixFSRawLeaves`](#importunixfsrawleaves) - [`Import.UnixFSChunker`](#importunixfschunker) - [`Import.HashFunction`](#importhashfunction) + - [`Import.FastProvideRoot`](#importfastprovideroot) + - [`Import.FastProvideWait`](#importfastprovidewait) - [`Import.BatchMaxNodes`](#importbatchmaxnodes) - [`Import.BatchMaxSize`](#importbatchmaxsize) - [`Import.UnixFSFileMaxLinks`](#importunixfsfilemaxlinks) @@ -3619,6 +3621,38 @@ Default: `sha2-256` Type: `optionalString` +### `Import.FastProvideRoot` + +Immediately provide root CIDs to the DHT in addition to the regular provide queue. + +This complements the sweep provider system: fast-provide handles the urgent case (root CIDs that users share and reference), while the sweep provider efficiently provides all blocks according to the `Provide.Strategy` over time. Together, they optimize for both immediate discoverability of newly imported content and efficient resource usage for complete DAG provides. + +When disabled, only the sweep provider's queue is used. + +This setting applies to both `ipfs add` and `ipfs dag import` commands and can be overridden per-command with the `--fast-provide-root` flag. + +Ignored when DHT is not available for routing (e.g., `Routing.Type=none` or delegated-only configurations). + +Default: `true` + +Type: `flag` + +### `Import.FastProvideWait` + +Wait for the immediate root CID provide to complete before returning. + +When enabled, the command blocks until the provide completes, so the root CID is discoverable by the time the command returns; if the provide fails, the command exits with an error. When disabled (default), the provide happens asynchronously in the background without blocking the command, and provide failures are only logged. + +Use this when you need certainty that content is discoverable before the command returns (e.g., sharing a link immediately after adding). + +This setting applies to both `ipfs add` and `ipfs dag import` commands and can be overridden per-command with the `--fast-provide-wait` flag. 
+ +Ignored when DHT is not available for routing (e.g., `Routing.Type=none` or delegated-only configurations). + +Default: `false` + +Type: `flag` + ### `Import.BatchMaxNodes` The maximum number of nodes in a write-batch. The total size of the batch is limited by `BatchMaxnodes` and `BatchMaxSize`. diff --git a/test/cli/add_test.go b/test/cli/add_test.go index e4138b624..cda0c977d 100644 --- a/test/cli/add_test.go +++ b/test/cli/add_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/dustin/go-humanize" "github.com/ipfs/kubo/config" @@ -15,6 +16,19 @@ import ( "github.com/stretchr/testify/require" ) +// waitForLogMessage polls a buffer for a log message, waiting up to timeout duration. +// Returns true if message found, false if timeout reached. +func waitForLogMessage(buffer *harness.Buffer, message string, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if strings.Contains(buffer.String(), message) { + return true + } + time.Sleep(100 * time.Millisecond) + } + return false +} + func TestAdd(t *testing.T) { t.Parallel() @@ -435,7 +449,182 @@ func TestAdd(t *testing.T) { require.Equal(t, 992, len(root.Links)) }) }) +} +func TestAddFastProvide(t *testing.T) { + t.Parallel() + + const ( + shortString = "hello world" + shortStringCidV0 = "Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD" // cidv0 - dag-pb - sha2-256 + ) + + t.Run("fast-provide-root disabled via config: verify skipped in logs", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.False + }) + + // Start daemon with debug logging + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + cidStr := 
node.IPFSAddStr(shortString) + require.Equal(t, shortStringCidV0, cidStr) + + // Verify fast-provide-root was disabled + daemonLog := node.Daemon.Stderr.String() + require.Contains(t, daemonLog, "fast-provide-root: skipped") + }) + + t.Run("fast-provide-root enabled with wait=false: verify async provide", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + // Use default config (FastProvideRoot=true, FastProvideWait=false) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + cidStr := node.IPFSAddStr(shortString) + require.Equal(t, shortStringCidV0, cidStr) + + daemonLog := node.Daemon.Stderr + // Should see async mode started + require.Contains(t, daemonLog.String(), "fast-provide-root: enabled") + require.Contains(t, daemonLog.String(), "fast-provide-root: providing asynchronously") + + // Wait for async completion or failure (slightly more than DefaultFastProvideTimeout) + // In test environment with no DHT peers, this will fail with "failed to find any peer in table" + // Poll for both outcomes in short alternating slices: waiting the full timeout for + // "completed" first would stall the expected failure path for the whole timeout. + deadline := time.Now().Add(config.DefaultFastProvideTimeout + time.Second) + completedOrFailed := false + for !completedOrFailed && time.Now().Before(deadline) { + completedOrFailed = waitForLogMessage(daemonLog, "async provide completed", 200*time.Millisecond) || + waitForLogMessage(daemonLog, "async provide failed", 200*time.Millisecond) + } + require.True(t, completedOrFailed, "async provide should complete or fail within timeout") + }) + + t.Run("fast-provide-root enabled with wait=true: verify sync provide", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideWait = config.True + }) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + // Use Runner.Run 
with stdin to allow for expected errors + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"add", "-q"}, + CmdOpts: []harness.CmdOpt{ + harness.RunWithStdin(strings.NewReader(shortString)), + }, + }) + + // In sync mode (wait=true), provide errors propagate and fail the command. + // Test environment uses 'test' profile with no bootstrappers, and CI has + // insufficient peers for proper DHT puts, so we expect this to fail with + // "failed to find any peer in table" error from the DHT. + require.Equal(t, 1, res.ExitCode()) + require.Contains(t, res.Stderr.String(), "Error: fast-provide: failed to find any peer in table") + + daemonLog := node.Daemon.Stderr.String() + // Should see sync mode started + require.Contains(t, daemonLog, "fast-provide-root: enabled") + require.Contains(t, daemonLog, "fast-provide-root: providing synchronously") + require.Contains(t, daemonLog, "sync provide failed") // Verify the failure was logged + }) + + t.Run("fast-provide-wait ignored when root disabled", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.False + cfg.Import.FastProvideWait = config.True + }) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + cidStr := node.IPFSAddStr(shortString) + require.Equal(t, shortStringCidV0, cidStr) + + daemonLog := node.Daemon.Stderr.String() + require.Contains(t, daemonLog, "fast-provide-root: skipped") + require.Contains(t, daemonLog, "wait-flag-ignored") + }) + + t.Run("CLI flag overrides config: flag=true overrides config=false", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.False + }) + 
+ node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + cidStr := node.IPFSAddStr(shortString, "--fast-provide-root=true") + require.Equal(t, shortStringCidV0, cidStr) + + daemonLog := node.Daemon.Stderr + // Flag should enable it despite config saying false + require.Contains(t, daemonLog.String(), "fast-provide-root: enabled") + require.Contains(t, daemonLog.String(), "fast-provide-root: providing asynchronously") + }) + + t.Run("CLI flag overrides config: flag=false overrides config=true", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.True + }) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + cidStr := node.IPFSAddStr(shortString, "--fast-provide-root=false") + require.Equal(t, shortStringCidV0, cidStr) + + daemonLog := node.Daemon.Stderr.String() + // Flag should disable it despite config saying true + require.Contains(t, daemonLog, "fast-provide-root: skipped") + }) } // createDirectoryForHAMT aims to create enough files with long names for the directory block to be close to the UnixFSHAMTDirectorySizeThreshold. 
diff --git a/test/cli/dag_test.go b/test/cli/dag_test.go index 1a3defc3c..f6758a710 100644 --- a/test/cli/dag_test.go +++ b/test/cli/dag_test.go @@ -5,10 +5,13 @@ import ( "io" "os" "testing" + "time" + "github.com/ipfs/kubo/config" "github.com/ipfs/kubo/test/cli/harness" "github.com/ipfs/kubo/test/cli/testutils" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -102,3 +105,200 @@ func TestDag(t *testing.T) { assert.Equal(t, content, stat.Stdout.Bytes()) }) } + +func TestDagImportFastProvide(t *testing.T) { + t.Parallel() + + t.Run("fast-provide-root disabled via config: verify skipped in logs", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.False + }) + + // Start daemon with debug logging + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + // Import CAR file + r, err := os.Open(fixtureFile) + require.NoError(t, err) + defer r.Close() + err = node.IPFSDagImport(r, fixtureCid) + require.NoError(t, err) + + // Verify fast-provide-root was disabled + daemonLog := node.Daemon.Stderr.String() + require.Contains(t, daemonLog, "fast-provide-root: skipped") + }) + + t.Run("fast-provide-root enabled with wait=false: verify async provide", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + // Use default config (FastProvideRoot=true, FastProvideWait=false) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + // Import CAR file + r, err := os.Open(fixtureFile) + require.NoError(t, err) + defer r.Close() + err = 
node.IPFSDagImport(r, fixtureCid) + require.NoError(t, err) + + daemonLog := node.Daemon.Stderr + // Should see async mode started + require.Contains(t, daemonLog.String(), "fast-provide-root: enabled") + require.Contains(t, daemonLog.String(), "fast-provide-root: providing asynchronously") + require.Contains(t, daemonLog.String(), fixtureCid) // Should log the specific CID being provided + + // Wait for async completion or failure (slightly more than DefaultFastProvideTimeout) + // In test environment with no DHT peers, this will fail with "failed to find any peer in table" + // Poll for both outcomes in short alternating slices: waiting the full timeout for + // "completed" first would stall the expected failure path for the whole timeout. + timeout := config.DefaultFastProvideTimeout + time.Second + deadline := time.Now().Add(timeout) + completedOrFailed := false + for !completedOrFailed && time.Now().Before(deadline) { + completedOrFailed = waitForLogMessage(daemonLog, "async provide completed", 200*time.Millisecond) || + waitForLogMessage(daemonLog, "async provide failed", 200*time.Millisecond) + } + require.True(t, completedOrFailed, "async provide should complete or fail within timeout") + }) + + t.Run("fast-provide-root enabled with wait=true: verify sync provide", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideWait = config.True + }) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + // Import CAR file - use Run instead of IPFSDagImport to handle expected error + r, err := os.Open(fixtureFile) + require.NoError(t, err) + defer r.Close() + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"dag", "import", "--pin-roots=false"}, + CmdOpts: []harness.CmdOpt{ + harness.RunWithStdin(r), + }, + }) + // In sync mode (wait=true), provide errors propagate and fail the command. 
+ // Test environment uses 'test' profile with no bootstrappers, and CI has + // insufficient peers for proper DHT puts, so we expect this to fail with + // "failed to find any peer in table" error from the DHT. + require.Equal(t, 1, res.ExitCode()) + require.Contains(t, res.Stderr.String(), "Error: fast-provide: failed to find any peer in table") + + daemonLog := node.Daemon.Stderr.String() + // Should see sync mode started + require.Contains(t, daemonLog, "fast-provide-root: enabled") + require.Contains(t, daemonLog, "fast-provide-root: providing synchronously") + require.Contains(t, daemonLog, fixtureCid) // Should log the specific CID being provided + require.Contains(t, daemonLog, "sync provide failed") // Verify the failure was logged + }) + + t.Run("fast-provide-wait ignored when root disabled", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.False + cfg.Import.FastProvideWait = config.True + }) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + // Import CAR file + r, err := os.Open(fixtureFile) + require.NoError(t, err) + defer r.Close() + err = node.IPFSDagImport(r, fixtureCid) + require.NoError(t, err) + + daemonLog := node.Daemon.Stderr.String() + require.Contains(t, daemonLog, "fast-provide-root: skipped") + // Note: dag import doesn't log wait-flag-ignored like add does + }) + + t.Run("CLI flag overrides config: flag=true overrides config=false", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.False + }) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + 
"GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + // Import CAR file with flag override + r, err := os.Open(fixtureFile) + require.NoError(t, err) + defer r.Close() + err = node.IPFSDagImport(r, fixtureCid, "--fast-provide-root=true") + require.NoError(t, err) + + daemonLog := node.Daemon.Stderr + // Flag should enable it despite config saying false + require.Contains(t, daemonLog.String(), "fast-provide-root: enabled") + require.Contains(t, daemonLog.String(), "fast-provide-root: providing asynchronously") + require.Contains(t, daemonLog.String(), fixtureCid) // Should log the specific CID being provided + }) + + t.Run("CLI flag overrides config: flag=false overrides config=true", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.UpdateConfig(func(cfg *config.Config) { + cfg.Import.FastProvideRoot = config.True + }) + + node.StartDaemonWithReq(harness.RunRequest{ + CmdOpts: []harness.CmdOpt{ + harness.RunWithEnv(map[string]string{ + "GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug", + }), + }, + }, "") + defer node.StopDaemon() + + // Import CAR file with flag override + r, err := os.Open(fixtureFile) + require.NoError(t, err) + defer r.Close() + err = node.IPFSDagImport(r, fixtureCid, "--fast-provide-root=false") + require.NoError(t, err) + + daemonLog := node.Daemon.Stderr.String() + // Flag should disable it despite config saying true + require.Contains(t, daemonLog, "fast-provide-root: skipped") + }) +}