mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 18:37:45 +08:00
feat: improve dag/import (#9721)
- don't bypass the CoreApi - don't use a goroutine and return channel for `importWorker`, when what's happening is really just a synchronous call - only `PinLock()` when we are going to pin - use `cid.Set` instead of an explicit map - fail the request early if any pinning fail, no need to try to pin more if the request failed already
This commit is contained in:
parent
668d0b2fa4
commit
1457b4fd4a
@ -168,13 +168,6 @@ var DagResolveCmd = &cmds.Command{
|
||||
Type: ResolveOutput{},
|
||||
}
|
||||
|
||||
type importResult struct {
|
||||
blockCount uint64
|
||||
blockBytesCount uint64
|
||||
roots map[cid.Cid]struct{}
|
||||
err error
|
||||
}
|
||||
|
||||
// DagImportCmd is a command for importing a car to ipfs
|
||||
var DagImportCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
|
||||
@ -2,24 +2,22 @@ package dagcmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
cmds "github.com/ipfs/go-ipfs-cmds"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
ipldlegacy "github.com/ipfs/go-ipld-legacy"
|
||||
"github.com/ipfs/go-libipfs/files"
|
||||
iface "github.com/ipfs/interface-go-ipfs-core"
|
||||
"github.com/ipfs/interface-go-ipfs-core/options"
|
||||
"github.com/ipfs/interface-go-ipfs-core/path"
|
||||
gocarv2 "github.com/ipld/go-car/v2"
|
||||
|
||||
"github.com/ipfs/kubo/core/commands/cmdenv"
|
||||
"github.com/ipfs/kubo/core/commands/cmdutils"
|
||||
|
||||
cmds "github.com/ipfs/go-ipfs-cmds"
|
||||
gocarv2 "github.com/ipld/go-car/v2"
|
||||
)
|
||||
|
||||
func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
|
||||
node, err := cmdenv.GetNode(env)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -38,127 +36,42 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
|
||||
return err
|
||||
}
|
||||
|
||||
doPinRoots, _ := req.Options[pinRootsOptionName].(bool)
|
||||
|
||||
// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
|
||||
// size of the streamed-in cars nothing will disappear on us before we had
|
||||
// a chance to roots that may show up at the very end
|
||||
// This is especially important for use cases like dagger:
|
||||
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
|
||||
//
|
||||
unlocker := node.Blockstore.PinLock(req.Context)
|
||||
defer unlocker.Unlock(req.Context)
|
||||
|
||||
doPinRoots, _ := req.Options[pinRootsOptionName].(bool)
|
||||
|
||||
retCh := make(chan importResult, 1)
|
||||
go importWorker(req, res, api, retCh)
|
||||
|
||||
done := <-retCh
|
||||
if done.err != nil {
|
||||
return done.err
|
||||
}
|
||||
|
||||
// It is not guaranteed that a root in a header is actually present in the same ( or any )
|
||||
// .car file. This is the case in version 1, and ideally in further versions too
|
||||
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
|
||||
// We will attempt a pin *only* at the end in case all car files were well formed
|
||||
//
|
||||
// The boolean value indicates whether we have encountered the root within the car file's
|
||||
roots := done.roots
|
||||
|
||||
// opportunistic pinning: try whatever sticks
|
||||
if doPinRoots {
|
||||
|
||||
var failedPins int
|
||||
for c := range roots {
|
||||
|
||||
// We need to re-retrieve a block, convert it to ipld, and feed it
|
||||
// to the Pinning interface, sigh...
|
||||
//
|
||||
// If we didn't have the problem of inability to take multiple pinlocks,
|
||||
// we could use the api directly like so (though internally it does the same):
|
||||
//
|
||||
// // not ideal, but the pinning api takes only paths :(
|
||||
// rp := path.NewResolvedPath(
|
||||
// ipfspath.FromCid(c),
|
||||
// c,
|
||||
// c,
|
||||
// "",
|
||||
// )
|
||||
//
|
||||
// if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {
|
||||
|
||||
ret := RootMeta{Cid: c}
|
||||
|
||||
if block, err := node.Blockstore.Get(req.Context, c); err != nil {
|
||||
ret.PinErrorMsg = err.Error()
|
||||
} else if nd, err := ipldlegacy.DecodeNode(req.Context, block); err != nil {
|
||||
ret.PinErrorMsg = err.Error()
|
||||
} else if err := node.Pinning.Pin(req.Context, nd, true); err != nil {
|
||||
ret.PinErrorMsg = err.Error()
|
||||
} else if err := node.Pinning.Flush(req.Context); err != nil {
|
||||
ret.PinErrorMsg = err.Error()
|
||||
}
|
||||
|
||||
if ret.PinErrorMsg != "" {
|
||||
failedPins++
|
||||
}
|
||||
|
||||
if err := res.Emit(&CarImportOutput{Root: &ret}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if failedPins > 0 {
|
||||
return fmt.Errorf(
|
||||
"unable to pin all roots: %d out of %d failed",
|
||||
failedPins,
|
||||
len(roots),
|
||||
)
|
||||
}
|
||||
unlocker := node.Blockstore.PinLock(req.Context)
|
||||
defer unlocker.Unlock(req.Context)
|
||||
}
|
||||
|
||||
stats, _ := req.Options[statsOptionName].(bool)
|
||||
if stats {
|
||||
err = res.Emit(&CarImportOutput{
|
||||
Stats: &CarImportStats{
|
||||
BlockCount: done.blockCount,
|
||||
BlockBytesCount: done.blockBytesCount,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) {
|
||||
|
||||
// this is *not* a transaction
|
||||
// it is simply a way to relieve pressure on the blockstore
|
||||
// similar to pinner.Pin/pinner.Flush
|
||||
batch := ipld.NewBatch(req.Context, api.Dag())
|
||||
|
||||
roots := make(map[cid.Cid]struct{})
|
||||
roots := cid.NewSet()
|
||||
var blockCount, blockBytesCount uint64
|
||||
|
||||
it := req.Files.Entries()
|
||||
for it.Next() {
|
||||
|
||||
file := files.FileFromEntry(it)
|
||||
if file == nil {
|
||||
ret <- importResult{err: errors.New("expected a file handle")}
|
||||
return
|
||||
return errors.New("expected a file handle")
|
||||
}
|
||||
|
||||
// wrap a defer-closer-scope
|
||||
//
|
||||
// every single file in it() is already open before we start
|
||||
// just close here sooner rather than later for neatness
|
||||
// and to surface potential errors writing on closed fifos
|
||||
// this won't/can't help with not running out of handles
|
||||
err := func() error {
|
||||
// import blocks
|
||||
err = func() error {
|
||||
// wrap a defer-closer-scope
|
||||
//
|
||||
// every single file in it() is already open before we start
|
||||
// just close here sooner rather than later for neatness
|
||||
// and to surface potential errors writing on closed fifos
|
||||
// this won't/can't help with not running out of handles
|
||||
defer file.Close()
|
||||
|
||||
car, err := gocarv2.NewBlockReader(file)
|
||||
@ -167,7 +80,7 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
|
||||
}
|
||||
|
||||
for _, c := range car.Roots {
|
||||
roots[c] = struct{}{}
|
||||
roots.Add(c)
|
||||
}
|
||||
|
||||
for {
|
||||
@ -193,28 +106,51 @@ func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI,
|
||||
blockCount++
|
||||
blockBytesCount += uint64(len(block.RawData()))
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
ret <- importResult{err: err}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := it.Err(); err != nil {
|
||||
ret <- importResult{err: err}
|
||||
return
|
||||
}
|
||||
|
||||
if err := batch.Commit(); err != nil {
|
||||
ret <- importResult{err: err}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
ret <- importResult{
|
||||
blockCount: blockCount,
|
||||
blockBytesCount: blockBytesCount,
|
||||
roots: roots}
|
||||
// It is not guaranteed that a root in a header is actually present in the same ( or any )
|
||||
// .car file. This is the case in version 1, and ideally in further versions too.
|
||||
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
|
||||
// We will attempt a pin *only* at the end in case all car files were well-formed.
|
||||
|
||||
// opportunistic pinning: try whatever sticks
|
||||
if doPinRoots {
|
||||
err = roots.ForEach(func(c cid.Cid) error {
|
||||
ret := RootMeta{Cid: c}
|
||||
|
||||
// This will trigger a full read of the DAG in the pinner, to make sure we have all blocks.
|
||||
// Ideally we would have a lighter merkledag.Walk() instead of the underlying merkledag.FetchDag,
|
||||
// then pinner.PinWithMode().
|
||||
err = api.Pin().Add(req.Context, path.IpldPath(c), options.Pin.Recursive(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return res.Emit(&CarImportOutput{Root: &ret})
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
stats, _ := req.Options[statsOptionName].(bool)
|
||||
if stats {
|
||||
err = res.Emit(&CarImportOutput{
|
||||
Stats: &CarImportStats{
|
||||
BlockCount: blockCount,
|
||||
BlockBytesCount: blockBytesCount,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user