From dc3b9ed1407b05ee536db6399fce9f04b1963d7d Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 8 Jul 2015 08:48:18 -0700 Subject: [PATCH] address concerns from PR License: MIT Signed-off-by: Jeromy --- merkledag/merkledag.go | 159 ++++++++++++++++++------------------ merkledag/merkledag_test.go | 35 ++++---- 2 files changed, 97 insertions(+), 97 deletions(-) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 5158c42aa..a6c6633f0 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -122,84 +122,7 @@ func (n *dagService) Remove(nd *Node) error { // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, root *Node, serv DAGService) error { - toprocess := make(chan []key.Key, 8) - nodes := make(chan *Node, 8) - errs := make(chan error, 1) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(toprocess) - - go fetchNodes(ctx, serv, toprocess, nodes, errs) - - nodes <- root - live := 1 - - for { - select { - case nd, ok := <-nodes: - if !ok { - return nil - } - - var keys []key.Key - for _, lnk := range nd.Links { - keys = append(keys, key.Key(lnk.Hash)) - } - keys = dedupeKeys(keys) - - // keep track of open request, when zero, we're done - live += len(keys) - 1 - - if live == 0 { - return nil - } - - if len(keys) > 0 { - select { - case toprocess <- keys: - case <-ctx.Done(): - return ctx.Err() - } - } - case err := <-errs: - return err - case <-ctx.Done(): - return ctx.Err() - } - } -} - -func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) { - defer close(out) - for { - select { - case ks, ok := <-in: - if !ok { - return - } - - ng := ds.GetNodes(ctx, ks) - for _, g := range ng { - go func(g NodeGetter) { - nd, err := g.Get(ctx) - if err != nil { - select { - case errs <- err: - case <-ctx.Done(): - } - return - } - - select { - case out <- nd: - case <-ctx.Done(): - return - } - }(g) - } - } - } + return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet()) } // FindLinks searches this nodes links for the given key, @@ -383,3 +306,83 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K } return nil } + +func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error { + toprocess := make(chan []key.Key, 8) + nodes := make(chan *Node, 8) + errs := make(chan error, 1) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(toprocess) + + go fetchNodes(ctx, ds, toprocess, nodes, errs) + + nodes <- root + live := 1 + + for { + select { + case nd, ok := <-nodes: + if !ok { + return nil + } + // a node has been fetched + live-- + + var keys []key.Key + for _, lnk := range nd.Links { + k := key.Key(lnk.Hash) + if !set.Has(k) { + set.Add(k) + live++ + keys = append(keys, k) + } + } + + if live == 0 { + return nil + } + + if len(keys) > 0 { + select { + case toprocess <- keys: + case <-ctx.Done(): + return ctx.Err() + } + } + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) { + defer close(out) + + get := func(g NodeGetter) { + nd, err := g.Get(ctx) + if err != nil { + select { + case errs <- err: + case <-ctx.Done(): + } + return + } + + select { + case out <- nd: + case <-ctx.Done(): + return + } + } + + for ks := range in { + ng := ds.GetNodes(ctx, ks) + for _, g := range ng { + go get(g) + } + } +} diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 3e316b083..674df6d53 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -299,26 +299,9 @@ func TestCantGet(t *testing.T) { } func TestFetchGraph(t *testing.T) { - bsi := bstest.Mocks(t, 1)[0] - ds := NewDAGService(bsi) - - read := io.LimitReader(u.NewTimeSeededRand(), 1024*32) - spl := &chunk.SizeSplitter{512} - - root, err := imp.BuildDagFromReader(read, ds, spl, nil) - if err != nil { - t.Fatal(err) - } - - err = FetchGraph(context.TODO(), root, ds) - if err != nil { - t.Fatal(err) - } -} - -func TestFetchGraphOther(t *testing.T) { var dservs []DAGService - for _, bsi := range bstest.Mocks(t, 2) { + bsis := bstest.Mocks(t, 2) + for _, bsi := range bsis { dservs = append(dservs, NewDAGService(bsi)) } @@ -334,6 +317,20 @@ func TestFetchGraphOther(t *testing.T) { if err != nil { t.Fatal(err) } + + // create an offline dagstore and ensure all blocks were fetched + bs, err := bserv.New(bsis[1].Blockstore, offline.Exchange(bsis[1].Blockstore)) + if err != nil { + t.Fatal(err) + } + + offline_ds := NewDAGService(bs) + ks := key.NewKeySet() + + err = EnumerateChildren(context.Background(), offline_ds, root, ks) + if err != nil { + t.Fatal(err) + } } func TestEnumerateChildren(t *testing.T) {