feat(client/rpc): add provide stat and dag import support

adds RPC client support for:
- ipfs provide stat (with --lan flag for dual DHT)
- ipfs dag import (with --fast-provide-root/--fast-provide-wait)

client/rpc changes:
- dag.go: add Import() method (~70 lines)
- dag_test.go: 4 test cases for Import (new file)
- routing.go: add ProvideStats() method (~25 lines)
- routing_test.go: 3 test cases for ProvideStats (new file)

to enable RPC client, refactored commands to use CoreAPI:
- add ProvideStats() to RoutingAPI interface and implementation
- add Import() to APIDagService interface and implementation
- commands delegate to CoreAPI (provide.go, dag/import.go)
This commit is contained in:
Marcin Rataj 2025-11-20 05:25:55 +01:00
parent 7c5db10169
commit 45d17e72b6
13 changed files with 622 additions and 215 deletions

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"github.com/ipfs/boxo/path"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
@ -156,6 +157,31 @@ func (api *RoutingAPI) Provide(ctx context.Context, p path.Path, opts ...options
Exec(ctx, nil)
}
func (api *RoutingAPI) ProvideStats(ctx context.Context, opts ...options.RoutingProvideStatOption) (*iface.ProvideStatsResponse, error) {
options, err := options.RoutingProvideStatOptions(opts...)
if err != nil {
return nil, err
}
resp, err := api.core().Request("provide/stat").
Option("lan", options.UseLAN).
Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
defer resp.Close()
var out iface.ProvideStatsResponse
if err := json.NewDecoder(resp.Output).Decode(&out); err != nil {
return nil, err
}
return &out, nil
}
func (api *RoutingAPI) core() *HttpApi {
return (*HttpApi)(api)
}

163
client/rpc/routing_test.go Normal file
View File

@ -0,0 +1,163 @@
package rpc
import (
"context"
"encoding/json"
"testing"
boxoprovider "github.com/ipfs/boxo/provider"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/libp2p/go-libp2p-kad-dht/provider/stats"
"github.com/stretchr/testify/require"
)
// Compile-time check: ensure our response type is compatible with kubo's provideStats
// This verifies that JSON marshaling/unmarshaling will work correctly
var _ = func() {
// Create instance of command's provideStats structure
cmdStats := struct {
Sweep *stats.Stats `json:"Sweep,omitempty"`
Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"`
FullRT bool `json:"FullRT,omitempty"`
}{}
// Marshal and unmarshal to verify compatibility
data, _ := json.Marshal(cmdStats)
var ifaceStats iface.ProvideStatsResponse
_ = json.Unmarshal(data, &ifaceStats)
}
// testProvideStats mirrors the subset of fields we verify in tests.
// Intentionally independent from coreiface types to detect breaking changes.
type testProvideStats struct {
Sweep *struct {
Connectivity struct {
Status string `json:"status"`
} `json:"connectivity"`
Queues struct {
PendingKeyProvides int `json:"pending_key_provides"`
} `json:"queues"`
Schedule struct {
Keys int `json:"keys"`
} `json:"schedule"`
} `json:"Sweep,omitempty"`
Legacy *struct {
TotalReprovides uint64 `json:"TotalReprovides"`
} `json:"Legacy,omitempty"`
}
func TestProvideStats_WithSweepProvider(t *testing.T) {
t.Parallel()
ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init()
// Explicitly enable Sweep provider (default in v0.39)
node.SetIPFSConfig("Provide.DHT.SweepEnabled", true)
node.SetIPFSConfig("Provide.Enabled", true)
node.StartDaemon()
defer node.StopDaemon()
apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)
api, err := NewApi(apiMaddr)
require.NoError(t, err)
// Get provide stats
result, err := api.Routing().ProvideStats(ctx)
require.NoError(t, err)
require.NotNil(t, result)
// Verify Sweep stats are present, Legacy is not
require.NotNil(t, result.Sweep, "Sweep provider should return Sweep stats")
require.Nil(t, result.Legacy, "Sweep provider should not return Legacy stats")
// Marshal to JSON and unmarshal to test struct to verify structure
data, err := json.Marshal(result)
require.NoError(t, err)
var testStats testProvideStats
err = json.Unmarshal(data, &testStats)
require.NoError(t, err)
// Verify key fields exist and have reasonable values
require.NotNil(t, testStats.Sweep)
require.NotEmpty(t, testStats.Sweep.Connectivity.Status, "connectivity status should be present")
require.GreaterOrEqual(t, testStats.Sweep.Queues.PendingKeyProvides, 0, "queue size should be non-negative")
require.GreaterOrEqual(t, testStats.Sweep.Schedule.Keys, 0, "scheduled keys should be non-negative")
}
func TestProvideStats_WithLegacyProvider(t *testing.T) {
t.Parallel()
ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init()
// Explicitly disable Sweep to use Legacy provider
node.SetIPFSConfig("Provide.DHT.SweepEnabled", false)
node.SetIPFSConfig("Provide.Enabled", true)
node.StartDaemon()
defer node.StopDaemon()
apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)
api, err := NewApi(apiMaddr)
require.NoError(t, err)
// Get provide stats
result, err := api.Routing().ProvideStats(ctx)
require.NoError(t, err)
require.NotNil(t, result)
// Verify Legacy stats are present, Sweep is not
require.Nil(t, result.Sweep, "Legacy provider should not return Sweep stats")
require.NotNil(t, result.Legacy, "Legacy provider should return Legacy stats")
// Marshal to JSON and unmarshal to test struct to verify structure
data, err := json.Marshal(result)
require.NoError(t, err)
var testStats testProvideStats
err = json.Unmarshal(data, &testStats)
require.NoError(t, err)
// Verify Legacy field exists
require.NotNil(t, testStats.Legacy)
require.GreaterOrEqual(t, testStats.Legacy.TotalReprovides, uint64(0), "total reprovides should be non-negative")
}
func TestProvideStats_LANFlagErrorWithLegacy(t *testing.T) {
t.Parallel()
ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init()
// Use Legacy provider - LAN flag should error
node.SetIPFSConfig("Provide.DHT.SweepEnabled", false)
node.SetIPFSConfig("Provide.Enabled", true)
node.StartDaemon()
defer node.StopDaemon()
apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)
api, err := NewApi(apiMaddr)
require.NoError(t, err)
// Try to get LAN stats with Legacy provider
// This should return an error
_, err = api.Routing().ProvideStats(ctx, options.Routing.UseLAN(true))
require.Error(t, err, "LAN flag should error with Legacy provider")
require.Contains(t, err.Error(), "LAN stats only available for Sweep provider with Dual DHT",
"error should indicate LAN stats unavailable")
}

View File

@ -2,26 +2,14 @@ package dagcmd
import (
"errors"
"fmt"
"io"
"github.com/ipfs/boxo/files"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
ipld "github.com/ipfs/go-ipld-format"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/coreiface/options"
gocarv2 "github.com/ipld/go-car/v2"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"
"github.com/ipfs/kubo/core/coreiface/options"
)
var log = logging.Logger("core/commands")
func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
node, err := cmdenv.GetNode(env)
if err != nil {
@ -38,62 +26,31 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}
blockDecoder := ipldlegacy.NewDecoder()
// on import ensure we do not reach out to the network for any reason
// if a pin based on what is imported + what is in the blockstore
// isn't possible: tough luck
// Ensure offline mode - import should not reach out to network
api, err = api.WithOptions(options.Api.Offline(true))
if err != nil {
return err
}
// Parse options
doPinRoots, _ := req.Options[pinRootsOptionName].(bool)
doStats, _ := req.Options[statsOptionName].(bool)
fastProvideRoot, fastProvideRootSet := req.Options[fastProvideRootOptionName].(bool)
fastProvideWait, fastProvideWaitSet := req.Options[fastProvideWaitOptionName].(bool)
// Resolve fast-provide options from config if not explicitly set
fastProvideRoot = config.ResolveBoolFromConfig(fastProvideRoot, fastProvideRootSet, cfg.Import.FastProvideRoot, config.DefaultFastProvideRoot)
fastProvideWait = config.ResolveBoolFromConfig(fastProvideWait, fastProvideWaitSet, cfg.Import.FastProvideWait, config.DefaultFastProvideWait)
// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
// size of the streamed-in cars nothing will disappear on us before we had
// a chance to roots that may show up at the very end
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
//
if doPinRoots {
unlocker := node.Blockstore.PinLock(req.Context)
defer unlocker.Unlock(req.Context)
}
// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag(),
// Default: 128. Means 128 file descriptors needed in flatfs
ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))),
// Default 100MiB. When setting block size to 1MiB, we can add
// ~100 nodes maximum. With default 256KiB block-size, we will
// hit the max nodes limit at 32MiB.p
ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))),
)
roots := cid.NewSet()
var blockCount, blockBytesCount uint64
// remember last valid block and provide a meaningful error message
// when a truncated/mangled CAR is being imported
importError := func(previous blocks.Block, current blocks.Block, err error) error {
if current != nil {
return fmt.Errorf("import failed at block %q: %w", current.Cid(), err)
}
if previous != nil {
return fmt.Errorf("import failed after block %q: %w", previous.Cid(), err)
}
return fmt.Errorf("import failed: %w", err)
// Build CoreAPI options
dagOpts := []options.DagImportOption{
options.Dag.PinRoots(doPinRoots),
options.Dag.Stats(doStats),
options.Dag.FastProvideRoot(fastProvideRoot),
options.Dag.FastProvideWait(fastProvideWait),
}
// Process each file
it := req.Files.Entries()
for it.Next() {
file := files.FileFromEntry(it)
@ -101,118 +58,44 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return errors.New("expected a file handle")
}
// import blocks
err = func() error {
// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential errors writing on closed fifos
// this won't/can't help with not running out of handles
defer file.Close()
// Call CoreAPI to import the file
resultChan, err := api.Dag().Import(req.Context, file, dagOpts...)
if err != nil {
return err
}
var previous blocks.Block
car, err := gocarv2.NewBlockReader(file)
if err != nil {
return err
// Stream results back to user
for result := range resultChan {
// Check for errors from CoreAPI
if result.Err != nil {
return result.Err
}
for _, c := range car.Roots {
roots.Add(c)
}
for {
block, err := car.Next()
if err != nil && err != io.EOF {
return importError(previous, block, err)
} else if block == nil {
break
}
if err := cmdutils.CheckBlockSize(req, uint64(len(block.RawData()))); err != nil {
return importError(previous, block, err)
}
// the double-decode is suboptimal, but we need it for batching
nd, err := blockDecoder.DecodeNode(req.Context, block)
// Emit root results
if result.Root != nil {
err := res.Emit(&CarImportOutput{
Root: &RootMeta{
Cid: result.Root.Cid,
PinErrorMsg: result.Root.PinErrorMsg,
},
})
if err != nil {
return importError(previous, block, err)
return err
}
if err := batch.Add(req.Context, nd); err != nil {
return importError(previous, block, err)
}
blockCount++
blockBytesCount += uint64(len(block.RawData()))
previous = block
}
return nil
}()
if err != nil {
return err
}
}
if err := batch.Commit(); err != nil {
return err
}
// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too.
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well-formed.
// opportunistic pinning: try whatever sticks
if doPinRoots {
err = roots.ForEach(func(c cid.Cid) error {
ret := RootMeta{Cid: c}
// This will trigger a full read of the DAG in the pinner, to make sure we have all blocks.
// Ideally we would do colloring of the pinning state while importing the blocks
// and ensure the gray bucket is empty at the end (or use the network to download missing blocks).
if block, err := node.Blockstore.Get(req.Context, c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := blockDecoder.DecodeNode(req.Context, block); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Pin(req.Context, nd, true, ""); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Flush(req.Context); err != nil {
ret.PinErrorMsg = err.Error()
}
return res.Emit(&CarImportOutput{Root: &ret})
})
if err != nil {
return err
}
}
stats, _ := req.Options[statsOptionName].(bool)
if stats {
err = res.Emit(&CarImportOutput{
Stats: &CarImportStats{
BlockCount: blockCount,
BlockBytesCount: blockBytesCount,
},
})
if err != nil {
return err
}
}
// Fast-provide roots for faster discovery
if fastProvideRoot {
err = roots.ForEach(func(c cid.Cid) error {
return cmdenv.ExecuteFastProvide(req.Context, node, cfg, c, fastProvideWait, doPinRoots, doPinRoots, false)
})
if err != nil {
return err
}
} else {
if fastProvideWait {
log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true)
} else {
log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config")
// Emit stats results
if result.Stats != nil {
err := res.Emit(&CarImportOutput{
Stats: &CarImportStats{
BlockCount: result.Stats.BlockCount,
BlockBytesCount: result.Stats.BlockBytesCount,
},
})
if err != nil {
return err
}
}
}
}

View File

@ -15,10 +15,8 @@ import (
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"github.com/libp2p/go-libp2p-kad-dht/provider"
"github.com/libp2p/go-libp2p-kad-dht/provider/buffered"
"github.com/libp2p/go-libp2p-kad-dht/provider/dual"
"github.com/libp2p/go-libp2p-kad-dht/provider/stats"
routing "github.com/libp2p/go-libp2p/core/routing"
"github.com/probe-lab/go-libdht/kad/key"
@ -136,26 +134,6 @@ type provideStats struct {
FullRT bool // only used for legacy stats
}
// extractSweepingProvider extracts a SweepingProvider from the given provider interface.
// It handles unwrapping buffered and dual providers, selecting LAN or WAN as specified.
// Returns nil if the provider is not a sweeping provider type.
func extractSweepingProvider(prov any, useLAN bool) *provider.SweepingProvider {
switch p := prov.(type) {
case *provider.SweepingProvider:
return p
case *dual.SweepingProvider:
if useLAN {
return p.LAN
}
return p.WAN
case *buffered.SweepingProvider:
// Recursively extract from the inner provider
return extractSweepingProvider(p.Provider, useLAN)
default:
return nil
}
}
var provideStatCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
@ -234,41 +212,36 @@ NOTES:
cmds.BoolOption(provideStatQueuesOptionName, "Display provide and reprovide queue sizes"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}
if !nd.IsOnline {
return ErrNotOnline
}
lanStats, _ := req.Options[provideLanOptionName].(bool)
// Handle legacy provider
if legacySys, ok := nd.Provider.(boxoprovider.System); ok {
if lanStats {
return errors.New("LAN stats only available for Sweep provider with Dual DHT")
}
stats, err := legacySys.Stat()
if err != nil {
return err
}
_, fullRT := nd.DHTClient.(*fullrt.FullRT)
return res.Emit(provideStats{Legacy: &stats, FullRT: fullRT})
// Get stats from CoreAPI
opts := []options.RoutingProvideStatOption{}
if lanStats {
opts = append(opts, options.Routing.UseLAN(true))
}
// Extract sweeping provider (handles buffered and dual unwrapping)
sweepingProvider := extractSweepingProvider(nd.Provider, lanStats)
if sweepingProvider == nil {
if lanStats {
return errors.New("LAN stats only available for Sweep provider with Dual DHT")
}
return fmt.Errorf("stats not available with current routing system %T", nd.Provider)
result, err := api.Routing().ProvideStats(req.Context, opts...)
if err != nil {
return err
}
s := sweepingProvider.Stats()
return res.Emit(provideStats{Sweep: &s})
// Set FullRT field for display (command-layer presentation concern)
nd, err := cmdenv.GetNode(env)
if err != nil {
return err
}
_, fullRT := nd.DHTClient.(*fullrt.FullRT)
return res.Emit(provideStats{
Sweep: result.Sweep,
Legacy: result.Legacy,
FullRT: fullRT,
})
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, s provideStats) error {

View File

@ -2,11 +2,20 @@ package coreapi
import (
"context"
"fmt"
"io"
"github.com/ipfs/boxo/files"
dag "github.com/ipfs/boxo/ipld/merkledag"
pin "github.com/ipfs/boxo/pinning/pinner"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipfs/kubo/config"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
gocarv2 "github.com/ipld/go-car/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -68,6 +77,250 @@ func (api *dagAPI) Session(ctx context.Context) ipld.NodeGetter {
return dag.NewSession(ctx, api.DAGService)
}
func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) {
// Parse options
settings, err := options.DagImportOptions(opts...)
if err != nil {
return nil, err
}
// Get config for batch settings
cfg, err := api.core.repo.Config()
if err != nil {
return nil, err
}
// Create block decoder for IPLD nodes
blockDecoder := ipldlegacy.NewDecoder()
// Create batch for efficient block addition
// Uses config values for batch size tuning
batch := ipld.NewBatch(ctx, api.DAGService,
ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))),
ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))),
)
// Create output channel
out := make(chan iface.DagImportResult)
// Process import in background
go func() {
defer close(out)
defer file.Close()
// Acquire pinlock if pinning roots (also serves as GC lock)
if settings.PinRoots {
unlocker := api.core.blockstore.PinLock(ctx)
defer unlocker.Unlock(ctx)
}
// Track roots from CAR headers and stats
roots := cid.NewSet()
var blockCount, blockBytesCount uint64
// Parse CAR file
car, err := gocarv2.NewBlockReader(file)
if err != nil {
out <- iface.DagImportResult{Err: fmt.Errorf("failed to create CAR reader: %w", err)}
return
}
// Collect roots from CAR header
for _, c := range car.Roots {
roots.Add(c)
}
// Process all blocks from CAR file
var previous blocks.Block
for {
block, err := car.Next()
if err != nil {
if err != io.EOF {
if previous != nil {
out <- iface.DagImportResult{Err: fmt.Errorf("error reading block after %s: %w", previous.Cid(), err)}
} else {
out <- iface.DagImportResult{Err: fmt.Errorf("error reading CAR blocks: %w", err)}
}
}
break
}
if block == nil {
break
}
// Decode block into IPLD node
nd, err := blockDecoder.DecodeNode(ctx, block)
if err != nil {
out <- iface.DagImportResult{Err: fmt.Errorf("failed to decode block %s: %w", block.Cid(), err)}
return
}
// Add node to batch
if err := batch.Add(ctx, nd); err != nil {
out <- iface.DagImportResult{Err: fmt.Errorf("failed to add block %s to batch: %w", nd.Cid(), err)}
return
}
blockCount++
blockBytesCount += uint64(len(block.RawData()))
previous = block
// Check context cancellation
select {
case <-ctx.Done():
out <- iface.DagImportResult{Err: ctx.Err()}
return
default:
}
}
// Commit batch to blockstore
if err := batch.Commit(); err != nil {
out <- iface.DagImportResult{Err: fmt.Errorf("failed to commit batch: %w", err)}
return
}
// Emit all roots (with pin status if requested)
err = roots.ForEach(func(c cid.Cid) error {
result := iface.DagImportResult{
Root: &iface.DagImportRoot{Cid: c},
}
// Attempt to pin if requested
if settings.PinRoots {
// Verify block exists in blockstore
block, err := api.core.blockstore.Get(ctx, c)
if err != nil {
result.Root.PinErrorMsg = fmt.Sprintf("blockstore get: %v", err)
} else {
// Decode node for pinning
nd, err := blockDecoder.DecodeNode(ctx, block)
if err != nil {
result.Root.PinErrorMsg = fmt.Sprintf("decode node: %v", err)
} else {
// Pin recursively
err = api.core.pinning.Pin(ctx, nd, true, "")
if err != nil {
result.Root.PinErrorMsg = fmt.Sprintf("pin: %v", err)
} else {
// Flush pins to storage
err = api.core.pinning.Flush(ctx)
if err != nil {
result.Root.PinErrorMsg = fmt.Sprintf("flush: %v", err)
}
}
}
}
}
// Send root result
select {
case out <- result:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
if err != nil {
out <- iface.DagImportResult{Err: fmt.Errorf("error emitting roots: %w", err)}
return
}
// Emit stats if requested
if settings.Stats {
select {
case out <- iface.DagImportResult{
Stats: &iface.DagImportStats{
BlockCount: blockCount,
BlockBytesCount: blockBytesCount,
},
}:
case <-ctx.Done():
return
}
}
// Execute fast-provide (will check if enabled)
if err := api.executeFastProvide(ctx, cfg, roots, settings.FastProvideRoot, settings.FastProvideWait, settings.PinRoots, settings.PinRoots, false); err != nil {
select {
case out <- iface.DagImportResult{Err: err}:
case <-ctx.Done():
}
}
}()
return out, nil
}
// executeFastProvide announces roots to the DHT for faster discovery
func (api *dagAPI) executeFastProvide(ctx context.Context, cfg *config.Config, roots *cid.Set, enabled bool, wait bool, isPinned bool, isPinnedRoot bool, isMFS bool) error {
// Check if fast-provide is enabled
if !enabled {
if wait {
log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config", "wait-flag-ignored", true)
} else {
log.Debugw("fast-provide-root: skipped", "reason", "disabled by flag or config")
}
return nil
}
log.Debugw("fast-provide-root: enabled", "wait", wait)
// Check preconditions for providing
if !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) {
log.Debugw("fast-provide-root: skipped", "reason", "Provide.Enabled is false")
return nil
}
if cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0 {
log.Debugw("fast-provide-root: skipped", "reason", "Provide.DHT.Interval is 0")
return nil
}
if !api.core.nd.HasActiveDHTClient() {
log.Debugw("fast-provide-root: skipped", "reason", "DHT not available")
return nil
}
// Check provide strategy
strategyStr := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
strategy := config.ParseProvideStrategy(strategyStr)
shouldProvide := config.ShouldProvideForStrategy(strategy, isPinned, isPinnedRoot, isMFS)
if !shouldProvide {
log.Debugw("fast-provide-root: skipped", "reason", "strategy does not match content", "strategy", strategyStr, "pinned", isPinned, "pinnedRoot", isPinnedRoot, "mfs", isMFS)
return nil
}
// Provide each root
return roots.ForEach(func(c cid.Cid) error {
if wait {
// Synchronous mode: block until provide completes
log.Debugw("fast-provide-root: providing synchronously", "cid", c)
if err := api.core.nd.DHTClient.Provide(ctx, c, true); err != nil {
log.Warnw("fast-provide-root: sync provide failed", "cid", c, "error", err)
return fmt.Errorf("fast-provide: %w", err)
}
log.Debugw("fast-provide-root: sync provide completed", "cid", c)
} else {
// Asynchronous mode: fire-and-forget in goroutine
log.Debugw("fast-provide-root: providing asynchronously", "cid", c)
go func(rootCid cid.Cid) {
// Use detached context with timeout to prevent hanging
asyncCtx, cancel := context.WithTimeout(context.Background(), config.DefaultFastProvideTimeout)
defer cancel()
if err := api.core.nd.DHTClient.Provide(asyncCtx, rootCid, true); err != nil {
log.Warnw("fast-provide-root: async provide failed", "cid", rootCid, "error", err)
} else {
log.Debugw("fast-provide-root: async provide completed", "cid", rootCid)
}
}(c)
}
return nil
})
}
var (
_ ipld.DAGService = (*dagAPI)(nil)
_ dag.SessionMaker = (*dagAPI)(nil)

View File

@ -11,12 +11,16 @@ import (
offline "github.com/ipfs/boxo/exchange/offline"
dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/path"
boxoprovider "github.com/ipfs/boxo/provider"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
coreiface "github.com/ipfs/kubo/core/coreiface"
caopts "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/core/node"
"github.com/ipfs/kubo/tracing"
"github.com/libp2p/go-libp2p-kad-dht/provider"
"github.com/libp2p/go-libp2p-kad-dht/provider/buffered"
"github.com/libp2p/go-libp2p-kad-dht/provider/dual"
peer "github.com/libp2p/go-libp2p/core/peer"
mh "github.com/multiformats/go-multihash"
"go.opentelemetry.io/otel/attribute"
@ -222,6 +226,63 @@ func provideKeysRec(ctx context.Context, prov node.DHTProvider, bs blockstore.Bl
}
}
// extractSweepingProvider extracts a SweepingProvider from the given provider interface.
// It handles unwrapping buffered and dual providers, selecting LAN or WAN as specified.
// Returns nil if the provider is not a sweeping provider type.
func extractSweepingProvider(prov any, useLAN bool) *provider.SweepingProvider {
switch p := prov.(type) {
case *provider.SweepingProvider:
return p
case *dual.SweepingProvider:
if useLAN {
return p.LAN
}
return p.WAN
case *buffered.SweepingProvider:
// Recursively extract from the inner provider
return extractSweepingProvider(p.Provider, useLAN)
default:
return nil
}
}
func (api *RoutingAPI) ProvideStats(ctx context.Context, opts ...caopts.RoutingProvideStatOption) (*coreiface.ProvideStatsResponse, error) {
options, err := caopts.RoutingProvideStatOptions(opts...)
if err != nil {
return nil, err
}
if !api.nd.IsOnline {
return nil, coreiface.ErrOffline
}
// Handle legacy provider
if legacySys, ok := api.provider.(boxoprovider.System); ok {
if options.UseLAN {
return nil, errors.New("LAN stats only available for Sweep provider with Dual DHT")
}
stats, err := legacySys.Stat()
if err != nil {
return nil, err
}
// Note: FullRT field is not set here as we don't have access to nd.DHTClient
// This field is primarily for display purposes in the command
return &coreiface.ProvideStatsResponse{Legacy: &stats, FullRT: false}, nil
}
// Extract sweeping provider (handles buffered and dual unwrapping)
sweepingProvider := extractSweepingProvider(api.provider, options.UseLAN)
if sweepingProvider == nil {
if options.UseLAN {
return nil, errors.New("LAN stats only available for Sweep provider with Dual DHT")
}
return nil, fmt.Errorf("stats not available with current routing system %T", api.provider)
}
s := sweepingProvider.Stats()
return &coreiface.ProvideStatsResponse{Sweep: &s}, nil
}
func (api *RoutingAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}

View File

@ -14,6 +14,7 @@ import (
type DagImportResult struct {
Root *DagImportRoot
Stats *DagImportStats
Err error
}
// DagImportRoot represents a root CID from a CAR file header

View File

@ -96,3 +96,34 @@ func (routingOpts) AllowOffline(allow bool) RoutingPutOption {
return nil
}
}
type RoutingProvideStatSettings struct {
UseLAN bool
}
type RoutingProvideStatOption func(*RoutingProvideStatSettings) error
func RoutingProvideStatOptions(opts ...RoutingProvideStatOption) (*RoutingProvideStatSettings, error) {
options := &RoutingProvideStatSettings{
UseLAN: false,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
// UseLAN is an option for [Routing.ProvideStats] which specifies whether to
// return stats for LAN DHT only (only valid for Sweep provider with Dual DHT).
// Default value is false (WAN DHT stats).
func (routingOpts) UseLAN(useLAN bool) RoutingProvideStatOption {
return func(settings *RoutingProvideStatSettings) error {
settings.UseLAN = useLAN
return nil
}
}

View File

@ -4,7 +4,9 @@ import (
"context"
"github.com/ipfs/boxo/path"
boxoprovider "github.com/ipfs/boxo/provider"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/libp2p/go-libp2p-kad-dht/provider/stats"
"github.com/libp2p/go-libp2p/core/peer"
)
@ -26,4 +28,15 @@ type RoutingAPI interface {
// Provide announces to the network that you are providing given values
Provide(context.Context, path.Path, ...options.RoutingProvideOption) error
// ProvideStats returns statistics about the provide system.
// Returns stats for either sweep provider (new default) or legacy provider.
ProvideStats(context.Context, ...options.RoutingProvideStatOption) (*ProvideStatsResponse, error)
}
// ProvideStatsResponse contains statistics about the provide system
type ProvideStatsResponse struct {
Sweep *stats.Stats `json:"Sweep,omitempty"`
Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"`
FullRT bool `json:"FullRT,omitempty"`
}

View File

@ -120,7 +120,7 @@ func TestDagImportFastProvide(t *testing.T) {
node.StartDaemonWithReq(harness.RunRequest{
CmdOpts: []harness.CmdOpt{
harness.RunWithEnv(map[string]string{
"GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug",
"GOLOG_LOG_LEVEL": "error,coreapi=debug",
}),
},
}, "")
@ -146,7 +146,7 @@ func TestDagImportFastProvide(t *testing.T) {
node.StartDaemonWithReq(harness.RunRequest{
CmdOpts: []harness.CmdOpt{
harness.RunWithEnv(map[string]string{
"GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug",
"GOLOG_LOG_LEVEL": "error,coreapi=debug",
}),
},
}, "")
@ -183,7 +183,7 @@ func TestDagImportFastProvide(t *testing.T) {
node.StartDaemonWithReq(harness.RunRequest{
CmdOpts: []harness.CmdOpt{
harness.RunWithEnv(map[string]string{
"GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug",
"GOLOG_LOG_LEVEL": "error,coreapi=debug",
}),
},
}, "")
@ -226,7 +226,7 @@ func TestDagImportFastProvide(t *testing.T) {
node.StartDaemonWithReq(harness.RunRequest{
CmdOpts: []harness.CmdOpt{
harness.RunWithEnv(map[string]string{
"GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug",
"GOLOG_LOG_LEVEL": "error,coreapi=debug",
}),
},
}, "")
@ -254,7 +254,7 @@ func TestDagImportFastProvide(t *testing.T) {
node.StartDaemonWithReq(harness.RunRequest{
CmdOpts: []harness.CmdOpt{
harness.RunWithEnv(map[string]string{
"GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug",
"GOLOG_LOG_LEVEL": "error,coreapi=debug",
}),
},
}, "")
@ -284,7 +284,7 @@ func TestDagImportFastProvide(t *testing.T) {
node.StartDaemonWithReq(harness.RunRequest{
CmdOpts: []harness.CmdOpt{
harness.RunWithEnv(map[string]string{
"GOLOG_LOG_LEVEL": "error,core/commands=debug,core/commands/cmdenv=debug",
"GOLOG_LOG_LEVEL": "error,coreapi=debug",
}),
},
}, "")

View File

@ -180,7 +180,7 @@ func TestProvideStatBasic(t *testing.T) {
res := node.RunIPFS("provide", "stat")
assert.Error(t, res.Err)
assert.Contains(t, res.Stderr.String(), "this command must be run in online mode")
assert.Contains(t, res.Stderr.String(), "this action must be run in online mode")
})
}

View File

@ -249,6 +249,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/polyfloyd/go-errorlint v1.7.1 // indirect
github.com/probe-lab/go-libdht v0.4.0 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect

View File

@ -650,6 +650,8 @@ github.com/polyfloyd/go-errorlint v1.7.1 h1:RyLVXIbosq1gBdk/pChWA8zWYLsq9UEw7a1L
github.com/polyfloyd/go-errorlint v1.7.1/go.mod h1:aXjNb1x2TNhoLsk26iv1yl7a+zTnXPhwEMtEXukiLR8=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/probe-lab/go-libdht v0.4.0 h1:LAqHuko/owRW6+0cs5wmJXbHzg09EUMJEh5DI37yXqo=
github.com/probe-lab/go-libdht v0.4.0/go.mod h1:hamw22kI6YkPQFGy5P6BrWWDrgE9ety5Si8iWAyuDvc=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=