feat(rpc): add dag import with fast-provide support

adds Import method to APIDagService interface and RPC client implementation

- new DagImportResult, DagImportRoot, DagImportStats types in coreiface
- DagImportOptions with uniform Set pattern for all params (PinRoots, Stats, FastProvideRoot, FastProvideWait)
- streaming channel API for handling multiple roots and stats
- tests covering basic import, stats, offline mode, and blocking wait
This commit is contained in:
Marcin Rataj 2025-11-19 19:12:59 +01:00
parent 91dd98c3de
commit 7c5db10169
4 changed files with 356 additions and 0 deletions

View File

@ -3,13 +3,16 @@ package rpc
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/path"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
multicodec "github.com/multiformats/go-multicodec"
)
@ -129,6 +132,72 @@ func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error {
return nil
}
func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) {
options, err := options.DagImportOptions(opts...)
if err != nil {
return nil, err
}
req := api.core().Request("dag/import")
if options.PinRootsSet {
req.Option("pin-roots", options.PinRoots)
}
if options.StatsSet {
req.Option("stats", options.Stats)
}
if options.FastProvideRootSet {
req.Option("fast-provide-root", options.FastProvideRoot)
}
if options.FastProvideWaitSet {
req.Option("fast-provide-wait", options.FastProvideWait)
}
req.Body(files.NewMultiFileReader(files.NewMapDirectory(map[string]files.Node{"": file}), false, false))
resp, err := req.Send(ctx)
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}
out := make(chan iface.DagImportResult)
go func() {
defer resp.Close()
defer close(out)
dec := json.NewDecoder(resp.Output)
for {
var event iface.DagImportResult
if err := dec.Decode(&event); err != nil {
if err != io.EOF {
select {
case out <- iface.DagImportResult{}:
case <-ctx.Done():
}
}
return
}
select {
case out <- event:
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (api *httpNodeAdder) core() *HttpApi {
return (*HttpApi)(api)
}

177
client/rpc/dag_test.go Normal file
View File

@ -0,0 +1,177 @@
package rpc
import (
"context"
"os"
"testing"
"github.com/ipfs/boxo/files"
"github.com/ipfs/go-cid"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/require"
)
func TestDagImport_Basic(t *testing.T) {
t.Parallel()
ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon()
defer node.StopDaemon()
apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)
api, err := NewApi(apiMaddr)
require.NoError(t, err)
// Open test fixture
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()
// Import CAR file
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile))
require.NoError(t, err)
// Collect results
var roots []cid.Cid
for result := range results {
if result.Root != nil {
roots = append(roots, result.Root.Cid)
require.Empty(t, result.Root.PinErrorMsg, "pin should succeed")
}
}
// Verify we got exactly one root
require.Len(t, roots, 1, "should have exactly one root")
// Verify the expected root CID
expectedRoot := "bafyreifrm6uf5o4dsaacuszf35zhibyojlqclabzrms7iak67pf62jygaq"
require.Equal(t, expectedRoot, roots[0].String())
}
func TestDagImport_WithStats(t *testing.T) {
t.Parallel()
ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon()
defer node.StopDaemon()
apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)
api, err := NewApi(apiMaddr)
require.NoError(t, err)
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()
// Import with stats enabled
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
options.Dag.Stats(true))
require.NoError(t, err)
var roots []cid.Cid
var gotStats bool
var blockCount uint64
for result := range results {
if result.Root != nil {
roots = append(roots, result.Root.Cid)
}
if result.Stats != nil {
gotStats = true
blockCount = result.Stats.BlockCount
}
}
require.Len(t, roots, 1, "should have one root")
require.True(t, gotStats, "should receive stats")
require.Equal(t, uint64(4), blockCount, "TestDagStat.car has 4 blocks")
}
func TestDagImport_OfflineWithFastProvide(t *testing.T) {
t.Parallel()
ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon("--offline=true")
defer node.StopDaemon()
apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)
api, err := NewApi(apiMaddr)
require.NoError(t, err)
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()
// Import with fast-provide enabled in offline mode
// Should succeed gracefully (fast-provide silently skipped)
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
options.Dag.FastProvideRoot(true),
options.Dag.FastProvideWait(true))
require.NoError(t, err)
var roots []cid.Cid
for result := range results {
if result.Root != nil {
roots = append(roots, result.Root.Cid)
}
}
require.Len(t, roots, 1, "import should succeed offline with fast-provide enabled")
}
func TestDagImport_OnlineWithFastProvideWait(t *testing.T) {
t.Parallel()
ctx := context.Background()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon()
defer node.StopDaemon()
apiMaddr, err := node.TryAPIAddr()
require.NoError(t, err)
api, err := NewApi(apiMaddr)
require.NoError(t, err)
carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car")
require.NoError(t, err)
defer carFile.Close()
// Import with fast-provide wait enabled in online mode
// This tests that FastProvideWait actually blocks (not fire-and-forget).
// In isolated test environment (no DHT peers), the provide operation may:
// 1. Succeed trivially (announced to randomly discovered peers), or
// 2. Return an error (timeout/no peers)
// Both outcomes prove blocking behavior works correctly.
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
options.Dag.FastProvideRoot(true),
options.Dag.FastProvideWait(true))
if err != nil {
// Blocking wait detected provide failure (no DHT peers in isolated test)
// This proves FastProvideWait actually blocked and error propagated
require.Contains(t, err.Error(), "fast-provide",
"error should be from fast-provide operation")
return // Test passed - blocking wait worked and returned error
}
// No error - provide succeeded, verify we got results
var roots []cid.Cid
for result := range results {
if result.Root != nil {
roots = append(roots, result.Root.Cid)
}
}
require.Len(t, roots, 1, "should receive one root when provide succeeds")
}

View File

@ -1,13 +1,43 @@
package iface
import (
"context"
"github.com/ipfs/boxo/files"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/kubo/core/coreiface/options"
)
// DagImportResult represents the result of importing roots or stats from CAR files.
// Each result has either Root or Stats set, never both.
type DagImportResult struct {
Root *DagImportRoot
Stats *DagImportStats
}
// DagImportRoot represents a root CID from a CAR file header
type DagImportRoot struct {
Cid cid.Cid
PinErrorMsg string
}
// DagImportStats contains statistics about the import operation
type DagImportStats struct {
BlockCount uint64
BlockBytesCount uint64
}
// APIDagService extends ipld.DAGService
type APIDagService interface {
ipld.DAGService
// Pinning returns special NodeAdder which recursively pins added nodes
Pinning() ipld.NodeAdder
// Import imports data from CAR files.
// Returns a channel that streams results for each root CID found in CAR headers,
// and optionally stats at the end if requested via options.
// Supports importing multiple CAR files, each with multiple roots.
Import(context.Context, files.File, ...options.DagImportOption) (<-chan DagImportResult, error)
}

View File

@ -0,0 +1,80 @@
package options
type DagImportSettings struct {
PinRoots bool
PinRootsSet bool
Stats bool
StatsSet bool
FastProvideRoot bool
FastProvideRootSet bool
FastProvideWait bool
FastProvideWaitSet bool
}
type DagImportOption func(*DagImportSettings) error
func DagImportOptions(opts ...DagImportOption) (*DagImportSettings, error) {
options := &DagImportSettings{
PinRoots: false,
PinRootsSet: false,
Stats: false,
StatsSet: false,
FastProvideRoot: false,
FastProvideRootSet: false,
FastProvideWait: false,
FastProvideWaitSet: false,
}
for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}
type dagOpts struct{}
var Dag dagOpts
// PinRoots sets whether to pin roots listed in CAR headers after importing.
// If not set, server uses command default (true).
func (dagOpts) PinRoots(pin bool) DagImportOption {
return func(settings *DagImportSettings) error {
settings.PinRoots = pin
settings.PinRootsSet = true
return nil
}
}
// Stats enables output of import statistics (block count and bytes).
// If not set, server uses command default (false).
func (dagOpts) Stats(enable bool) DagImportOption {
return func(settings *DagImportSettings) error {
settings.Stats = enable
settings.StatsSet = true
return nil
}
}
// FastProvideRoot sets whether to immediately provide root CIDs to DHT for faster discovery.
// If not set, server uses Import.FastProvideRoot config value (default: true).
func (dagOpts) FastProvideRoot(enable bool) DagImportOption {
return func(settings *DagImportSettings) error {
settings.FastProvideRoot = enable
settings.FastProvideRootSet = true
return nil
}
}
// FastProvideWait sets whether to block until fast provide completes.
// If not set, server uses Import.FastProvideWait config value (default: false).
func (dagOpts) FastProvideWait(enable bool) DagImportOption {
return func(settings *DagImportSettings) error {
settings.FastProvideWait = enable
settings.FastProvideWaitSet = true
return nil
}
}