From 21dea1e45b9eb9b0dcf6362a843c09ff4841656a Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Fri, 14 Nov 2025 23:14:09 +0100 Subject: [PATCH] refactor: error cmd on error and wait=true change ExecuteFastProvide() to return error, enabling proper error propagation when --fast-provide-wait=true. in sync mode, provide failures now error the command as expected. in async mode (default), always returns nil with errors logged in background goroutine. also remove duplicate ExecuteFastProvide() from provide.go (75 lines), keeping single implementation in cmdenv/env.go for reuse across add and dag import commands. call sites simplified: - add.go: check and propagate error from ExecuteFastProvide - dag/import.go: return error from ForEach callback, remove confusing conditional error handling semantics: - precondition skips (DHT unavailable, etc): return nil (not failure) - async mode (wait=false): return nil, log errors in goroutine - sync mode (wait=true): return wrapped error on provide failure --- core/commands/add.go | 4 +- core/commands/cmdenv/env.go | 49 +++++++++++++---------- core/commands/dag/import.go | 8 +--- core/commands/provide.go | 79 ------------------------------------- test/cli/add_test.go | 22 ++++++++--- test/cli/dag_test.go | 25 +++++++----- 6 files changed, 65 insertions(+), 122 deletions(-) diff --git a/core/commands/add.go b/core/commands/add.go index c8e3a8de0..cb4bcb312 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -602,7 +602,9 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import if err != nil { return err } - cmdenv.ExecuteFastProvide(req.Context, ipfsNode, cfg, lastRootCid.RootCid(), fastProvideWait, dopin, dopin, toFilesSet) + if err := cmdenv.ExecuteFastProvide(req.Context, ipfsNode, cfg, lastRootCid.RootCid(), fastProvideWait, dopin, dopin, toFilesSet); err != nil { + return err + } } else if !fastProvideRoot { if fastProvideWait { log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true) diff --git a/core/commands/cmdenv/env.go b/core/commands/cmdenv/env.go index 099e23539..b2a45351e 100644 --- a/core/commands/cmdenv/env.go +++ b/core/commands/cmdenv/env.go @@ -121,6 +121,11 @@ func provideCIDSync(ctx context.Context, router routing.Routing, c cid.Cid) erro // - isPinnedRoot: whether this is a pinned root CID // - isMFS: whether content is in MFS // +// Return value: +// - Returns nil if operation succeeded or was skipped (preconditions not met) +// - Returns error only in sync mode (wait=true) when provide operation fails +// - In async mode (wait=false), always returns nil (errors logged in goroutine) +// // The function handles all precondition checks (Provide.Enabled, DHT availability, // strategy matching) and logs appropriately. In async mode, it launches a goroutine // with a detached context and timeout. @@ -133,20 +138,20 @@ func ExecuteFastProvide( isPinned bool, isPinnedRoot bool, isMFS bool, -) { +) error { log.Debugw("fast-provide-root: enabled", "wait", wait) // Check preconditions for providing switch { case !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled): log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false") - return + return nil case cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0: log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0") - return + return nil case !ipfsNode.HasActiveDHTClient(): log.Debugw("fast-provide-root: skipped", "reason", "DHT not available") - return + return nil } // Check if strategy allows providing this content @@ -156,30 +161,32 @@ func ExecuteFastProvide( if !shouldProvide { log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS) - return + return nil } // Execute provide operation if wait { - // Synchronous mode: block until provide completes + // Synchronous mode: block until provide completes, return error on failure log.Debugw("fast-provide-root: providing synchronously", "cid", rootCid) if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { log.Warnw("fast-provide-root: sync provide failed", "cid", rootCid, "error", err) - } else { - log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid) + return fmt.Errorf("fast-provide: %w", err) } - } else { - // Asynchronous mode (default): fire-and-forget, don't block - log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid) - go func() { - // Use detached context with timeout to prevent hanging on network issues - ctx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout) - defer cancel() - if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { - log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err) - } else { - log.Debugw("fast-provide-root: async provide completed", "cid", rootCid) - } - }() + log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid) + return nil } + + // Asynchronous mode (default): fire-and-forget, don't block, always return nil + log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid) + go func() { + // Use detached context with timeout to prevent hanging on network issues + ctx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout) + defer cancel() + if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { + log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err) + } else { + log.Debugw("fast-provide-root: async provide completed", "cid", rootCid) + } + }() + return nil } diff --git a/core/commands/dag/import.go b/core/commands/dag/import.go index c86ed4806..032b9e52a 100644 --- a/core/commands/dag/import.go +++ b/core/commands/dag/import.go @@ -203,14 +203,10 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment // Fast-provide roots for faster discovery if fastProvideRoot { err = roots.ForEach(func(c cid.Cid) error { - cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false) - return nil + return cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false) }) if err != nil { - if fastProvideWait { - return err - } - log.Warnw("fast-provide-root: ForEach error", "error", err) + return err } } else { if fastProvideWait { diff --git a/core/commands/provide.go b/core/commands/provide.go index 2a1a9ad51..c9d3954cf 100644 --- a/core/commands/provide.go +++ b/core/commands/provide.go @@ -14,8 +14,6 @@ import ( boxoprovider "github.com/ipfs/boxo/provider" cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" - "github.com/ipfs/kubo/config" - "github.com/ipfs/kubo/core" "github.com/ipfs/kubo/core/commands/cmdenv" "github.com/libp2p/go-libp2p-kad-dht/fullrt" "github.com/libp2p/go-libp2p-kad-dht/provider" @@ -596,80 +594,3 @@ func humanFull(val float64, decimals int) string { func provideCIDSync(ctx context.Context, router routing.Routing, c cid.Cid) error { return router.Provide(ctx, c, true) } - -// ExecuteFastProvide immediately provides a root CID to the DHT, bypassing the regular -// provide queue for faster content discovery. This function is reusable across commands -// that add or import content, such as ipfs add and (in the future) ipfs dag import. -// -// Parameters: -// - ctx: context for synchronous provides -// - ipfsNode: the IPFS node instance -// - cfg: node configuration -// - rootCid: the CID to provide -// - wait: whether to block until provide completes (sync mode) -// - isPinned: whether content is pinned -// - isPinnedRoot: whether this is a pinned root CID -// - isMFS: whether content is in MFS -// -// The function handles all precondition checks (Provide.Enabled, DHT availability, -// strategy matching) and logs appropriately. In async mode, it launches a goroutine -// with a detached context and timeout. -func ExecuteFastProvide( - ctx context.Context, - ipfsNode *core.IpfsNode, - cfg *config.Config, - rootCid cid.Cid, - wait bool, - isPinned bool, - isPinnedRoot bool, - isMFS bool, -) { - log.Debugw("fast-provide-root: enabled", "wait", wait) - - // Check preconditions for providing - switch { - case !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled): - log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false") - return - case cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0: - log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0") - return - case !ipfsNode.HasActiveDHTClient(): - log.Debugw("fast-provide-root: skipped", "reason", "DHT not available") - return - } - - // Check if strategy allows providing this content - strategyStr := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy) - strategy := config.ParseProvideStrategy(strategyStr) - shouldProvide := config.ShouldProvideForStrategy(strategy, isPinned, isPinnedRoot, isMFS) - - if !shouldProvide { - log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS) - return - } - - // Execute provide operation - if wait { - // Synchronous mode: block until provide completes - log.Debugw("fast-provide-root: providing synchronously", "cid", rootCid) - if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { - log.Warnw("fast-provide-root: sync provide failed", "cid", rootCid, "error", err) - } else { - log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid) - } - } else { - // Asynchronous mode (default): fire-and-forget, don't block - log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid) - go func() { - // Use detached context with timeout to prevent hanging on network issues - ctx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout) - defer cancel() - if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil { - log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err) - } else { - log.Debugw("fast-provide-root: async provide completed", "cid", rootCid) - } - }() - } -} diff --git a/test/cli/add_test.go b/test/cli/add_test.go index 0a4f06e90..cda0c977d 100644 --- a/test/cli/add_test.go +++ b/test/cli/add_test.go @@ -529,17 +529,27 @@ func TestAddFastProvide(t *testing.T) { }, "") defer node.StopDaemon() - cidStr := node.IPFSAddStr(shortString) - require.Equal(t, shortStringCidV0, cidStr) + // Use Runner.Run with stdin to allow for expected errors + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"add", "-q"}, + CmdOpts: []harness.CmdOpt{ + harness.RunWithStdin(strings.NewReader(shortString)), + }, + }) + + // In sync mode (wait=true), provide errors propagate and fail the command. + // Test environment uses 'test' profile with no bootstrappers, and CI has + // insufficient peers for proper DHT puts, so we expect this to fail with + // "failed to find any peer in table" error from the DHT. + require.Equal(t, 1, res.ExitCode()) + require.Contains(t, res.Stderr.String(), "Error: fast-provide: failed to find any peer in table") daemonLog := node.Daemon.Stderr.String() // Should see sync mode started require.Contains(t, daemonLog, "fast-provide-root: enabled") require.Contains(t, daemonLog, "fast-provide-root: providing synchronously") - // In test environment with no DHT peers, this will fail, but the provide attempt was made - require.True(t, - strings.Contains(daemonLog, "sync provide completed") || strings.Contains(daemonLog, "sync provide failed"), - "sync provide should complete or fail") + require.Contains(t, daemonLog, "sync provide failed") // Verify the failure was logged }) t.Run("fast-provide-wait ignored when root disabled", func(t *testing.T) { diff --git a/test/cli/dag_test.go b/test/cli/dag_test.go index 37069330d..f6758a710 100644 --- a/test/cli/dag_test.go +++ b/test/cli/dag_test.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "os" - "strings" "testing" "time" @@ -190,22 +189,30 @@ func TestDagImportFastProvide(t *testing.T) { }, "") defer node.StopDaemon() - // Import CAR file + // Import CAR file - use Run instead of IPFSDagImport to handle expected error r, err := os.Open(fixtureFile) require.NoError(t, err) defer r.Close() - err = node.IPFSDagImport(r, fixtureCid) - require.NoError(t, err) + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"dag", "import", "--pin-roots=false"}, + CmdOpts: []harness.CmdOpt{ + harness.RunWithStdin(r), + }, + }) + // In sync mode (wait=true), provide errors propagate and fail the command. + // Test environment uses 'test' profile with no bootstrappers, and CI has + // insufficient peers for proper DHT puts, so we expect this to fail with + // "failed to find any peer in table" error from the DHT. + require.Equal(t, 1, res.ExitCode()) + require.Contains(t, res.Stderr.String(), "Error: fast-provide: failed to find any peer in table") daemonLog := node.Daemon.Stderr.String() // Should see sync mode started require.Contains(t, daemonLog, "fast-provide-root: enabled") require.Contains(t, daemonLog, "fast-provide-root: providing synchronously") - require.Contains(t, daemonLog, fixtureCid) // Should log the specific CID being provided - // In test environment with no DHT peers, this will fail, but the provide attempt was made - require.True(t, - strings.Contains(daemonLog, "sync provide completed") || strings.Contains(daemonLog, "sync provide failed"), - "sync provide should complete or fail") + require.Contains(t, daemonLog, fixtureCid) // Should log the specific CID being provided + require.Contains(t, daemonLog, "sync provide failed") // Verify the failure was logged }) t.Run("fast-provide-wait ignored when root disabled", func(t *testing.T) {