mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-23 19:37:46 +08:00
Merge pull request #2384 from ipfs/feat/dag-refactor
refactor merkledag fetching methods
This commit is contained in:
commit
dede20eab4
@ -33,7 +33,7 @@ type PinOutput struct {
|
||||
|
||||
var addPinCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Pins objects to local storage.",
|
||||
Tagline: "Pins objects to local storage.",
|
||||
ShortDescription: "Stores an IPFS object(s) from a given path locally to disk.",
|
||||
},
|
||||
|
||||
|
||||
@ -233,7 +233,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
|
||||
}
|
||||
|
||||
var count int
|
||||
for i, ng := range rw.DAG.GetDAG(rw.Ctx, n) {
|
||||
for i, ng := range dag.GetDAG(rw.Ctx, rw.DAG, n) {
|
||||
lk := key.Key(n.Links[i].Hash)
|
||||
if rw.skip(lk) {
|
||||
continue
|
||||
|
||||
@ -3,6 +3,7 @@ package merkledag
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
@ -24,8 +25,7 @@ type DAGService interface {
|
||||
|
||||
// GetDAG returns, in order, all the single leve child
|
||||
// nodes of the passed in node.
|
||||
GetDAG(context.Context, *Node) []NodeGetter
|
||||
GetNodes(context.Context, []key.Key) []NodeGetter
|
||||
GetMany(context.Context, []key.Key) <-chan *NodeOption
|
||||
|
||||
Batch() *Batch
|
||||
}
|
||||
@ -146,21 +146,61 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
|
||||
return out
|
||||
}
|
||||
|
||||
type NodeOption struct {
|
||||
Node *Node
|
||||
Err error
|
||||
}
|
||||
|
||||
func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
|
||||
out := make(chan *NodeOption, len(keys))
|
||||
blocks := ds.Blocks.GetBlocks(ctx, keys)
|
||||
var count int
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
for {
|
||||
select {
|
||||
case b, ok := <-blocks:
|
||||
if !ok {
|
||||
if count != len(keys) {
|
||||
out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
|
||||
}
|
||||
return
|
||||
}
|
||||
nd, err := Decoded(b.Data)
|
||||
if err != nil {
|
||||
out <- &NodeOption{Err: err}
|
||||
return
|
||||
}
|
||||
|
||||
// buffered, no need to select
|
||||
out <- &NodeOption{Node: nd}
|
||||
count++
|
||||
|
||||
case <-ctx.Done():
|
||||
out <- &NodeOption{Err: ctx.Err()}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return out
|
||||
}
|
||||
|
||||
// GetDAG will fill out all of the links of the given Node.
|
||||
// It returns a channel of nodes, which the caller can receive
|
||||
// all the child nodes of 'root' on, in proper order.
|
||||
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
|
||||
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
|
||||
var keys []key.Key
|
||||
for _, lnk := range root.Links {
|
||||
keys = append(keys, key.Key(lnk.Hash))
|
||||
}
|
||||
|
||||
return ds.GetNodes(ctx, keys)
|
||||
return GetNodes(ctx, ds, keys)
|
||||
}
|
||||
|
||||
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
|
||||
// to the key with the same index as the passed in keys
|
||||
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
|
||||
func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
|
||||
|
||||
// Early out if no work to do
|
||||
if len(keys) == 0 {
|
||||
@ -178,22 +218,29 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
|
||||
nodechan := ds.GetMany(ctx, dedupedKeys)
|
||||
|
||||
for count := 0; count < len(keys); {
|
||||
select {
|
||||
case blk, ok := <-blkchan:
|
||||
case opt, ok := <-nodechan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
nd, err := Decoded(blk.Data)
|
||||
if err != nil {
|
||||
// NB: can happen with improperly formatted input data
|
||||
log.Debug("Got back bad block!")
|
||||
if opt.Err != nil {
|
||||
log.Error("error fetching: ", opt.Err)
|
||||
return
|
||||
}
|
||||
is := FindLinks(keys, blk.Key(), 0)
|
||||
|
||||
nd := opt.Node
|
||||
|
||||
k, err := nd.Key()
|
||||
if err != nil {
|
||||
log.Error("Failed to get node key: ", err)
|
||||
continue
|
||||
}
|
||||
|
||||
is := FindLinks(keys, k, 0)
|
||||
for _, i := range is {
|
||||
count++
|
||||
sendChans[i] <- nd
|
||||
@ -318,24 +365,30 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
|
||||
|
||||
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
|
||||
toprocess := make(chan []key.Key, 8)
|
||||
nodes := make(chan *Node, 8)
|
||||
errs := make(chan error, 1)
|
||||
nodes := make(chan *NodeOption, 8)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
defer close(toprocess)
|
||||
|
||||
go fetchNodes(ctx, ds, toprocess, nodes, errs)
|
||||
go fetchNodes(ctx, ds, toprocess, nodes)
|
||||
|
||||
nodes <- root
|
||||
nodes <- &NodeOption{Node: root}
|
||||
live := 1
|
||||
|
||||
for {
|
||||
select {
|
||||
case nd, ok := <-nodes:
|
||||
case opt, ok := <-nodes:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if opt.Err != nil {
|
||||
return opt.Err
|
||||
}
|
||||
|
||||
nd := opt.Node
|
||||
|
||||
// a node has been fetched
|
||||
live--
|
||||
|
||||
@ -360,38 +413,35 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
case err := <-errs:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
|
||||
defer close(out)
|
||||
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
// wait for all 'get' calls to complete so we don't accidentally send
|
||||
// on a closed channel
|
||||
wg.Wait()
|
||||
close(out)
|
||||
}()
|
||||
|
||||
get := func(g NodeGetter) {
|
||||
nd, err := g.Get(ctx)
|
||||
if err != nil {
|
||||
get := func(ks []key.Key) {
|
||||
defer wg.Done()
|
||||
nodes := ds.GetMany(ctx, ks)
|
||||
for opt := range nodes {
|
||||
select {
|
||||
case errs <- err:
|
||||
case out <- opt:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case out <- nd:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for ks := range in {
|
||||
ng := ds.GetNodes(ctx, ks)
|
||||
for _, g := range ng {
|
||||
go get(g)
|
||||
}
|
||||
wg.Add(1)
|
||||
go get(ks)
|
||||
}
|
||||
}
|
||||
|
||||
@ -238,9 +238,9 @@ test_expect_success "some are no longer there" '
|
||||
'
|
||||
|
||||
test_expect_success "recursive pin fails without objects" '
|
||||
ipfs pin rm "$HASH_DIR1" &&
|
||||
test_must_fail ipfs pin add -r "$HASH_DIR1" --timeout=500ms 2>err_expected8 &&
|
||||
grep "context deadline exceeded" err_expected8 ||
|
||||
ipfs pin rm -r=false "$HASH_DIR1" &&
|
||||
test_must_fail ipfs pin add -r "$HASH_DIR1" 2>err_expected8 &&
|
||||
grep "pin: failed to fetch all nodes" err_expected8 ||
|
||||
test_fsh cat err_expected8
|
||||
'
|
||||
|
||||
@ -275,9 +275,9 @@ test_expect_success "test add nopin dir" '
|
||||
FICTIONAL_HASH="QmXV4f9v8a56MxWKBhP3ETsz4EaafudU1cKfPaaJnenc48"
|
||||
test_launch_ipfs_daemon
|
||||
test_expect_success "test unpinning a hash that's not pinned" "
|
||||
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=5s
|
||||
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=5s
|
||||
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=5s
|
||||
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=2s
|
||||
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=2s
|
||||
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=2s
|
||||
"
|
||||
test_kill_ipfs_daemon
|
||||
|
||||
|
||||
@ -39,7 +39,7 @@ func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
|
||||
for i, ng := range mdag.GetDAG(w.ctx, w.Dag, nd) {
|
||||
child, err := ng.Get(w.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -90,7 +90,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
|
||||
|
||||
func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
|
||||
fctx, cancel := context.WithCancel(ctx)
|
||||
promises := serv.GetDAG(fctx, n)
|
||||
promises := mdag.GetDAG(fctx, serv, n)
|
||||
return &DagReader{
|
||||
node: n,
|
||||
serv: serv,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user