mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
refactor: error cmd on error and wait=true
change ExecuteFastProvide() to return error, enabling proper error propagation when --fast-provide-wait=true. in sync mode, provide failures now error the command as expected. in async mode (default), always returns nil with errors logged in background goroutine. also remove duplicate ExecuteFastProvide() from provide.go (75 lines), keeping single implementation in cmdenv/env.go for reuse across add and dag import commands. call sites simplified: - add.go: check and propagate error from ExecuteFastProvide - dag/import.go: return error from ForEach callback, remove confusing conditional error handling semantics: - precondition skips (DHT unavailable, etc): return nil (not failure) - async mode (wait=false): return nil, log errors in goroutine - sync mode (wait=true): return wrapped error on provide failure
This commit is contained in:
parent
deaa7416d6
commit
21dea1e45b
@ -602,7 +602,9 @@ https://github.com/ipfs/kubo/blob/master/docs/config.md#import
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdenv.ExecuteFastProvide(req.Context, ipfsNode, cfg, lastRootCid.RootCid(), fastProvideWait, dopin, dopin, toFilesSet)
|
||||
if err := cmdenv.ExecuteFastProvide(req.Context, ipfsNode, cfg, lastRootCid.RootCid(), fastProvideWait, dopin, dopin, toFilesSet); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !fastProvideRoot {
|
||||
if fastProvideWait {
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true)
|
||||
|
||||
@ -121,6 +121,11 @@ func provideCIDSync(ctx context.Context, router routing.Routing, c cid.Cid) erro
|
||||
// - isPinnedRoot: whether this is a pinned root CID
|
||||
// - isMFS: whether content is in MFS
|
||||
//
|
||||
// Return value:
|
||||
// - Returns nil if operation succeeded or was skipped (preconditions not met)
|
||||
// - Returns error only in sync mode (wait=true) when provide operation fails
|
||||
// - In async mode (wait=false), always returns nil (errors logged in goroutine)
|
||||
//
|
||||
// The function handles all precondition checks (Provide.Enabled, DHT availability,
|
||||
// strategy matching) and logs appropriately. In async mode, it launches a goroutine
|
||||
// with a detached context and timeout.
|
||||
@ -133,20 +138,20 @@ func ExecuteFastProvide(
|
||||
isPinned bool,
|
||||
isPinnedRoot bool,
|
||||
isMFS bool,
|
||||
) {
|
||||
) error {
|
||||
log.Debugw("fast-provide-root: enabled", "wait", wait)
|
||||
|
||||
// Check preconditions for providing
|
||||
switch {
|
||||
case !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled):
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false")
|
||||
return
|
||||
return nil
|
||||
case cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0:
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0")
|
||||
return
|
||||
return nil
|
||||
case !ipfsNode.HasActiveDHTClient():
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "DHT not available")
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if strategy allows providing this content
|
||||
@ -156,30 +161,32 @@ func ExecuteFastProvide(
|
||||
|
||||
if !shouldProvide {
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// Execute provide operation
|
||||
if wait {
|
||||
// Synchronous mode: block until provide completes
|
||||
// Synchronous mode: block until provide completes, return error on failure
|
||||
log.Debugw("fast-provide-root: providing synchronously", "cid", rootCid)
|
||||
if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil {
|
||||
log.Warnw("fast-provide-root: sync provide failed", "cid", rootCid, "error", err)
|
||||
} else {
|
||||
log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid)
|
||||
return fmt.Errorf("fast-provide: %w", err)
|
||||
}
|
||||
} else {
|
||||
// Asynchronous mode (default): fire-and-forget, don't block
|
||||
log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid)
|
||||
go func() {
|
||||
// Use detached context with timeout to prevent hanging on network issues
|
||||
ctx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout)
|
||||
defer cancel()
|
||||
if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil {
|
||||
log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err)
|
||||
} else {
|
||||
log.Debugw("fast-provide-root: async provide completed", "cid", rootCid)
|
||||
}
|
||||
}()
|
||||
log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Asynchronous mode (default): fire-and-forget, don't block, always return nil
|
||||
log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid)
|
||||
go func() {
|
||||
// Use detached context with timeout to prevent hanging on network issues
|
||||
ctx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout)
|
||||
defer cancel()
|
||||
if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil {
|
||||
log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err)
|
||||
} else {
|
||||
log.Debugw("fast-provide-root: async provide completed", "cid", rootCid)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -203,14 +203,10 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
|
||||
// Fast-provide roots for faster discovery
|
||||
if fastProvideRoot {
|
||||
err = roots.ForEach(func(c cid.Cid) error {
|
||||
cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false)
|
||||
return nil
|
||||
return cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false)
|
||||
})
|
||||
if err != nil {
|
||||
if fastProvideWait {
|
||||
return err
|
||||
}
|
||||
log.Warnw("fast-provide-root: ForEach error", "error", err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if fastProvideWait {
|
||||
|
||||
@ -14,8 +14,6 @@ import (
|
||||
boxoprovider "github.com/ipfs/boxo/provider"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
cmds "github.com/ipfs/go-ipfs-cmds"
|
||||
"github.com/ipfs/kubo/config"
|
||||
"github.com/ipfs/kubo/core"
|
||||
"github.com/ipfs/kubo/core/commands/cmdenv"
|
||||
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
|
||||
"github.com/libp2p/go-libp2p-kad-dht/provider"
|
||||
@ -596,80 +594,3 @@ func humanFull(val float64, decimals int) string {
|
||||
func provideCIDSync(ctx context.Context, router routing.Routing, c cid.Cid) error {
|
||||
return router.Provide(ctx, c, true)
|
||||
}
|
||||
|
||||
// ExecuteFastProvide immediately provides a root CID to the DHT, bypassing the regular
|
||||
// provide queue for faster content discovery. This function is reusable across commands
|
||||
// that add or import content, such as ipfs add and (in the future) ipfs dag import.
|
||||
//
|
||||
// Parameters:
|
||||
// - ctx: context for synchronous provides
|
||||
// - ipfsNode: the IPFS node instance
|
||||
// - cfg: node configuration
|
||||
// - rootCid: the CID to provide
|
||||
// - wait: whether to block until provide completes (sync mode)
|
||||
// - isPinned: whether content is pinned
|
||||
// - isPinnedRoot: whether this is a pinned root CID
|
||||
// - isMFS: whether content is in MFS
|
||||
//
|
||||
// The function handles all precondition checks (Provide.Enabled, DHT availability,
|
||||
// strategy matching) and logs appropriately. In async mode, it launches a goroutine
|
||||
// with a detached context and timeout.
|
||||
func ExecuteFastProvide(
|
||||
ctx context.Context,
|
||||
ipfsNode *core.IpfsNode,
|
||||
cfg *config.Config,
|
||||
rootCid cid.Cid,
|
||||
wait bool,
|
||||
isPinned bool,
|
||||
isPinnedRoot bool,
|
||||
isMFS bool,
|
||||
) {
|
||||
log.Debugw("fast-provide-root: enabled", "wait", wait)
|
||||
|
||||
// Check preconditions for providing
|
||||
switch {
|
||||
case !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled):
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false")
|
||||
return
|
||||
case cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0:
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0")
|
||||
return
|
||||
case !ipfsNode.HasActiveDHTClient():
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "DHT not available")
|
||||
return
|
||||
}
|
||||
|
||||
// Check if strategy allows providing this content
|
||||
strategyStr := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
|
||||
strategy := config.ParseProvideStrategy(strategyStr)
|
||||
shouldProvide := config.ShouldProvideForStrategy(strategy, isPinned, isPinnedRoot, isMFS)
|
||||
|
||||
if !shouldProvide {
|
||||
log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS)
|
||||
return
|
||||
}
|
||||
|
||||
// Execute provide operation
|
||||
if wait {
|
||||
// Synchronous mode: block until provide completes
|
||||
log.Debugw("fast-provide-root: providing synchronously", "cid", rootCid)
|
||||
if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil {
|
||||
log.Warnw("fast-provide-root: sync provide failed", "cid", rootCid, "error", err)
|
||||
} else {
|
||||
log.Debugw("fast-provide-root: sync provide completed", "cid", rootCid)
|
||||
}
|
||||
} else {
|
||||
// Asynchronous mode (default): fire-and-forget, don't block
|
||||
log.Debugw("fast-provide-root: providing asynchronously", "cid", rootCid)
|
||||
go func() {
|
||||
// Use detached context with timeout to prevent hanging on network issues
|
||||
ctx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout)
|
||||
defer cancel()
|
||||
if err := provideCIDSync(ctx, ipfsNode.DHTClient, rootCid); err != nil {
|
||||
log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err)
|
||||
} else {
|
||||
log.Debugw("fast-provide-root: async provide completed", "cid", rootCid)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
@ -529,17 +529,27 @@ func TestAddFastProvide(t *testing.T) {
|
||||
}, "")
|
||||
defer node.StopDaemon()
|
||||
|
||||
cidStr := node.IPFSAddStr(shortString)
|
||||
require.Equal(t, shortStringCidV0, cidStr)
|
||||
// Use Runner.Run with stdin to allow for expected errors
|
||||
res := node.Runner.Run(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"add", "-q"},
|
||||
CmdOpts: []harness.CmdOpt{
|
||||
harness.RunWithStdin(strings.NewReader(shortString)),
|
||||
},
|
||||
})
|
||||
|
||||
// In sync mode (wait=true), provide errors propagate and fail the command.
|
||||
// Test environment uses 'test' profile with no bootstrappers, and CI has
|
||||
// insufficient peers for proper DHT puts, so we expect this to fail with
|
||||
// "failed to find any peer in table" error from the DHT.
|
||||
require.Equal(t, 1, res.ExitCode())
|
||||
require.Contains(t, res.Stderr.String(), "Error: fast-provide: failed to find any peer in table")
|
||||
|
||||
daemonLog := node.Daemon.Stderr.String()
|
||||
// Should see sync mode started
|
||||
require.Contains(t, daemonLog, "fast-provide-root: enabled")
|
||||
require.Contains(t, daemonLog, "fast-provide-root: providing synchronously")
|
||||
// In test environment with no DHT peers, this will fail, but the provide attempt was made
|
||||
require.True(t,
|
||||
strings.Contains(daemonLog, "sync provide completed") || strings.Contains(daemonLog, "sync provide failed"),
|
||||
"sync provide should complete or fail")
|
||||
require.Contains(t, daemonLog, "sync provide failed") // Verify the failure was logged
|
||||
})
|
||||
|
||||
t.Run("fast-provide-wait ignored when root disabled", func(t *testing.T) {
|
||||
|
||||
@ -4,7 +4,6 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -190,22 +189,30 @@ func TestDagImportFastProvide(t *testing.T) {
|
||||
}, "")
|
||||
defer node.StopDaemon()
|
||||
|
||||
// Import CAR file
|
||||
// Import CAR file - use Run instead of IPFSDagImport to handle expected error
|
||||
r, err := os.Open(fixtureFile)
|
||||
require.NoError(t, err)
|
||||
defer r.Close()
|
||||
err = node.IPFSDagImport(r, fixtureCid)
|
||||
require.NoError(t, err)
|
||||
res := node.Runner.Run(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"dag", "import", "--pin-roots=false"},
|
||||
CmdOpts: []harness.CmdOpt{
|
||||
harness.RunWithStdin(r),
|
||||
},
|
||||
})
|
||||
// In sync mode (wait=true), provide errors propagate and fail the command.
|
||||
// Test environment uses 'test' profile with no bootstrappers, and CI has
|
||||
// insufficient peers for proper DHT puts, so we expect this to fail with
|
||||
// "failed to find any peer in table" error from the DHT.
|
||||
require.Equal(t, 1, res.ExitCode())
|
||||
require.Contains(t, res.Stderr.String(), "Error: fast-provide: failed to find any peer in table")
|
||||
|
||||
daemonLog := node.Daemon.Stderr.String()
|
||||
// Should see sync mode started
|
||||
require.Contains(t, daemonLog, "fast-provide-root: enabled")
|
||||
require.Contains(t, daemonLog, "fast-provide-root: providing synchronously")
|
||||
require.Contains(t, daemonLog, fixtureCid) // Should log the specific CID being provided
|
||||
// In test environment with no DHT peers, this will fail, but the provide attempt was made
|
||||
require.True(t,
|
||||
strings.Contains(daemonLog, "sync provide completed") || strings.Contains(daemonLog, "sync provide failed"),
|
||||
"sync provide should complete or fail")
|
||||
require.Contains(t, daemonLog, fixtureCid) // Should log the specific CID being provided
|
||||
require.Contains(t, daemonLog, "sync provide failed") // Verify the failure was logged
|
||||
})
|
||||
|
||||
t.Run("fast-provide-wait ignored when root disabled", func(t *testing.T) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user