diff --git a/client/rpc/dag.go b/client/rpc/dag.go index 63cac8f61..345502ebf 100644 --- a/client/rpc/dag.go +++ b/client/rpc/dag.go @@ -3,13 +3,16 @@ package rpc import ( "bytes" "context" + "encoding/json" "fmt" "io" + "github.com/ipfs/boxo/files" "github.com/ipfs/boxo/path" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" + iface "github.com/ipfs/kubo/core/coreiface" "github.com/ipfs/kubo/core/coreiface/options" multicodec "github.com/multiformats/go-multicodec" ) @@ -129,6 +132,72 @@ func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error { return nil } +func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...options.DagImportOption) (<-chan iface.DagImportResult, error) { + options, err := options.DagImportOptions(opts...) + if err != nil { + return nil, err + } + + req := api.core().Request("dag/import") + + if options.PinRootsSet { + req.Option("pin-roots", options.PinRoots) + } + + if options.StatsSet { + req.Option("stats", options.Stats) + } + + if options.FastProvideRootSet { + req.Option("fast-provide-root", options.FastProvideRoot) + } + + if options.FastProvideWaitSet { + req.Option("fast-provide-wait", options.FastProvideWait) + } + + req.Body(files.NewMultiFileReader(files.NewMapDirectory(map[string]files.Node{"": file}), false, false)) + + resp, err := req.Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + out := make(chan iface.DagImportResult) + + go func() { + defer resp.Close() + defer close(out) + + dec := json.NewDecoder(resp.Output) + + for { + var event iface.DagImportResult + + if err := dec.Decode(&event); err != nil { + if err != io.EOF { + select { + case out <- iface.DagImportResult{}: + case <-ctx.Done(): + } + } + return + } + + select { + case out <- event: + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + func (api *httpNodeAdder) core() *HttpApi { return (*HttpApi)(api) } diff --git a/client/rpc/dag_test.go b/client/rpc/dag_test.go new file mode 100644 index 000000000..4e1151e43 --- /dev/null +++ b/client/rpc/dag_test.go @@ -0,0 +1,177 @@ +package rpc + +import ( + "context" + "os" + "testing" + + "github.com/ipfs/boxo/files" + "github.com/ipfs/go-cid" + "github.com/ipfs/kubo/core/coreiface/options" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/stretchr/testify/require" +) + +func TestDagImport_Basic(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + // Open test fixture + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import CAR file + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile)) + require.NoError(t, err) + + // Collect results + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + require.Empty(t, result.Root.PinErrorMsg, "pin should succeed") + } + } + + // Verify we got exactly one root + require.Len(t, roots, 1, "should have exactly one root") + + // Verify the expected root CID + expectedRoot := "bafyreifrm6uf5o4dsaacuszf35zhibyojlqclabzrms7iak67pf62jygaq" + require.Equal(t, expectedRoot, roots[0].String()) +} + +func TestDagImport_WithStats(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with stats enabled + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.Stats(true)) + require.NoError(t, err) + + var roots []cid.Cid + var gotStats bool + var blockCount uint64 + + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + if result.Stats != nil { + gotStats = true + blockCount = result.Stats.BlockCount + } + } + + require.Len(t, roots, 1, "should have one root") + require.True(t, gotStats, "should receive stats") + require.Equal(t, uint64(4), blockCount, "TestDagStat.car has 4 blocks") +} + +func TestDagImport_OfflineWithFastProvide(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon("--offline=true") + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with fast-provide enabled in offline mode + // Should succeed gracefully (fast-provide silently skipped) + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.FastProvideRoot(true), + options.Dag.FastProvideWait(true)) + require.NoError(t, err) + + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + } + + require.Len(t, roots, 1, "import should succeed offline with fast-provide enabled") +} + +func TestDagImport_OnlineWithFastProvideWait(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := harness.NewT(t) + node := h.NewNode().Init().StartDaemon() + defer node.StopDaemon() + + apiMaddr, err := node.TryAPIAddr() + require.NoError(t, err) + + api, err := NewApi(apiMaddr) + require.NoError(t, err) + + carFile, err := os.Open("../../test/cli/fixtures/TestDagStat.car") + require.NoError(t, err) + defer carFile.Close() + + // Import with fast-provide wait enabled in online mode + // This tests that FastProvideWait actually blocks (not fire-and-forget). + // In isolated test environment (no DHT peers), the provide operation may: + // 1. Succeed trivially (announced to randomly discovered peers), or + // 2. Return an error (timeout/no peers) + // Both outcomes prove blocking behavior works correctly. + results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile), + options.Dag.FastProvideRoot(true), + options.Dag.FastProvideWait(true)) + + if err != nil { + // Blocking wait detected provide failure (no DHT peers in isolated test) + // This proves FastProvideWait actually blocked and error propagated + require.Contains(t, err.Error(), "fast-provide", + "error should be from fast-provide operation") + return // Test passed - blocking wait worked and returned error + } + + // No error - provide succeeded, verify we got results + var roots []cid.Cid + for result := range results { + if result.Root != nil { + roots = append(roots, result.Root.Cid) + } + } + + require.Len(t, roots, 1, "should receive one root when provide succeeds") +} diff --git a/core/coreiface/dag.go b/core/coreiface/dag.go index 3cc3aeb4d..3598e4528 100644 --- a/core/coreiface/dag.go +++ b/core/coreiface/dag.go @@ -1,13 +1,43 @@ package iface import ( + "context" + + "github.com/ipfs/boxo/files" + "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/kubo/core/coreiface/options" ) +// DagImportResult represents the result of importing roots or stats from CAR files. +// Each result has either Root or Stats set, never both. +type DagImportResult struct { + Root *DagImportRoot + Stats *DagImportStats +} + +// DagImportRoot represents a root CID from a CAR file header +type DagImportRoot struct { + Cid cid.Cid + PinErrorMsg string +} + +// DagImportStats contains statistics about the import operation +type DagImportStats struct { + BlockCount uint64 + BlockBytesCount uint64 +} + // APIDagService extends ipld.DAGService type APIDagService interface { ipld.DAGService // Pinning returns special NodeAdder which recursively pins added nodes Pinning() ipld.NodeAdder + + // Import imports data from CAR files. + // Returns a channel that streams results for each root CID found in CAR headers, + // and optionally stats at the end if requested via options. + // Supports importing multiple CAR files, each with multiple roots. + Import(context.Context, files.File, ...options.DagImportOption) (<-chan DagImportResult, error) } diff --git a/core/coreiface/options/dag.go b/core/coreiface/options/dag.go new file mode 100644 index 000000000..90d4e1251 --- /dev/null +++ b/core/coreiface/options/dag.go @@ -0,0 +1,80 @@ +package options + +type DagImportSettings struct { + PinRoots bool + PinRootsSet bool + Stats bool + StatsSet bool + FastProvideRoot bool + FastProvideRootSet bool + FastProvideWait bool + FastProvideWaitSet bool +} + +type DagImportOption func(*DagImportSettings) error + +func DagImportOptions(opts ...DagImportOption) (*DagImportSettings, error) { + options := &DagImportSettings{ + PinRoots: false, + PinRootsSet: false, + Stats: false, + StatsSet: false, + FastProvideRoot: false, + FastProvideRootSet: false, + FastProvideWait: false, + FastProvideWaitSet: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + + return options, nil +} + +type dagOpts struct{} + +var Dag dagOpts + +// PinRoots sets whether to pin roots listed in CAR headers after importing. +// If not set, server uses command default (true). +func (dagOpts) PinRoots(pin bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.PinRoots = pin + settings.PinRootsSet = true + return nil + } +} + +// Stats enables output of import statistics (block count and bytes). +// If not set, server uses command default (false). +func (dagOpts) Stats(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.Stats = enable + settings.StatsSet = true + return nil + } +} + +// FastProvideRoot sets whether to immediately provide root CIDs to DHT for faster discovery. +// If not set, server uses Import.FastProvideRoot config value (default: true). +func (dagOpts) FastProvideRoot(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.FastProvideRoot = enable + settings.FastProvideRootSet = true + return nil + } +} + +// FastProvideWait sets whether to block until fast provide completes. +// If not set, server uses Import.FastProvideWait config value (default: false). +func (dagOpts) FastProvideWait(enable bool) DagImportOption { + return func(settings *DagImportSettings) error { + settings.FastProvideWait = enable + settings.FastProvideWaitSet = true + return nil + } +}