From 92e8a7bcd56a38ae722b556fc5b5e0345e1906ac Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 14:31:40 -0800 Subject: [PATCH] updated datastore for proper query handling Queries now can be cancelled and the resources collected --- Godeps/Godeps.json | 2 +- .../jbenet/go-datastore/Godeps/Godeps.json | 4 + .../jbenet/go-datastore/basic_ds.go | 18 +- .../jbenet/go-datastore/datastore.go | 2 +- .../jbenet/go-datastore/elastigo/datastore.go | 2 +- .../github.com/jbenet/go-datastore/fs/fs.go | 13 +- .../jbenet/go-datastore/fs/fs_test.go | 5 +- .../go-datastore/keytransform/keytransform.go | 24 +- .../keytransform/keytransform_test.go | 17 +- .../jbenet/go-datastore/leveldb/datastore.go | 86 ++++--- .../jbenet/go-datastore/leveldb/ds_test.go | 41 +++- .../jbenet/go-datastore/lru/datastore.go | 2 +- .../go-datastore/namespace/namespace.go | 41 +++- .../go-datastore/namespace/namespace_test.go | 17 +- .../jbenet/go-datastore/panic/panic.go | 2 +- .../jbenet/go-datastore/query/filter_test.go | 14 +- .../jbenet/go-datastore/query/order_test.go | 6 +- .../jbenet/go-datastore/query/query.go | 221 +++++++++++++----- .../jbenet/go-datastore/query/query_impl.go | 110 ++++++--- .../jbenet/go-datastore/query/query_test.go | 109 +++++++++ .../jbenet/go-datastore/sync/sync.go | 2 +- blocks/blockstore/blockstore.go | 79 ++++++- blocks/blockstore/blockstore_test.go | 165 ++++++++++++- blocks/blockstore/write_cache.go | 10 +- blocks/blockstore/write_cache_test.go | 2 +- core/commands/refs.go | 2 +- util/datastore2/delayed.go | 2 +- 27 files changed, 791 insertions(+), 207 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 1091857f8..b078a8d6e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -110,7 +110,7 @@ }, { "ImportPath": "github.com/jbenet/go-datastore", - "Rev": "8a8988d1a4e174274bd4a9dd55c4837f46fdf323" + "Rev": "35738aceb35505bd3c77c2a618fb1947ca3f72da" }, { "ImportPath": "github.com/jbenet/go-fuse-version", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json index 91aa65d50..6dc670b20 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json @@ -18,6 +18,10 @@ "ImportPath": "github.com/hashicorp/golang-lru", "Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370" }, + { + "ImportPath": "github.com/jbenet/goprocess", + "Rev": "b4b4178efcf2404ce9db72438c9c49db2fb399d8" + }, { "ImportPath": "github.com/mattbaird/elastigo/api", "Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d" diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go index 9005e8433..9e4cd9e1f 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go @@ -3,7 +3,7 @@ package datastore import ( "log" - query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // Here are some basic datastore implementations. @@ -50,13 +50,13 @@ func (d *MapDatastore) Delete(key Key) (err error) { } // Query implements Datastore.Query -func (d *MapDatastore) Query(q query.Query) (*query.Results, error) { - re := make([]query.Entry, 0, len(d.values)) +func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) { + re := make([]dsq.Entry, 0, len(d.values)) for k, v := range d.values { - re = append(re, query.Entry{Key: k.String(), Value: v}) + re = append(re, dsq.Entry{Key: k.String(), Value: v}) } - r := query.ResultsWithEntries(q, re) - r = q.ApplyTo(r) + r := dsq.ResultsWithEntries(q, re) + r = dsq.NaiveQueryApply(q, r) return r, nil } @@ -91,8 +91,8 @@ func (d *NullDatastore) Delete(key Key) (err error) { } // Query implements Datastore.Query -func (d *NullDatastore) Query(q query.Query) (*query.Results, error) { - return query.ResultsWithEntries(q, nil), nil +func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) { + return dsq.ResultsWithEntries(q, nil), nil } // LogDatastore logs all accesses through the datastore. @@ -147,7 +147,7 @@ func (d *LogDatastore) Delete(key Key) (err error) { } // Query implements Datastore.Query -func (d *LogDatastore) Query(q query.Query) (*query.Results, error) { +func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) { log.Printf("%s: Query\n", d.Name) return d.child.Query(q) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go index d6cc0b97b..30e55ddb6 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go @@ -66,7 +66,7 @@ type Datastore interface { // result.Wait() // result.AllEntries() // - Query(q query.Query) (*query.Results, error) + Query(q query.Query) (query.Results, error) } // ThreadSafeDatastore is an interface that all threadsafe datastore should diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go index e05d2228a..f53da4d4d 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go @@ -113,7 +113,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) { return nil } -func (d *Datastore) Query(query.Query) (*query.Results, error) { +func (d *Datastore) Query(query.Query) (query.Results, error) { return nil, errors.New("Not yet implemented!") } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go index 07613040b..7fb869113 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go @@ -83,9 +83,9 @@ func (d *Datastore) Delete(key ds.Key) (err error) { } // Query implements Datastore.Query -func (d *Datastore) Query(q query.Query) (*query.Results, error) { +func (d *Datastore) Query(q query.Query) (query.Results, error) { - entries := make(chan query.Entry) + results := make(chan query.Result) walkFn := func(path string, info os.FileInfo, err error) error { // remove ds path prefix @@ -98,17 +98,18 @@ func (d *Datastore) Query(q query.Query) (*query.Results, error) { path = path[:len(path)-len(ObjectKeySuffix)] } key := ds.NewKey(path) - entries <- query.Entry{Key: key.String(), Value: query.NotFetched} + entry := query.Entry{Key: key.String(), Value: query.NotFetched} + results <- query.Result{Entry: entry} } return nil } go func() { filepath.Walk(d.path, walkFn) - close(entries) + close(results) }() - r := query.ResultsWithEntriesChan(q, entries) - r = q.ApplyTo(r) + r := query.ResultsWithChan(q, results) + r = query.NaiveQueryApply(q, r) return r, nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go index beef672aa..d0ec9f2db 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go @@ -67,7 +67,10 @@ func (ks *DSSuite) TestBasic(c *C) { "/foo/bar/bazb", "/foo/bar/baz/barb", } - all := r.AllEntries() + all, err := r.Rest() + if err != nil { + c.Fatal(err) + } c.Check(len(all), Equals, len(expect)) for _, k := range expect { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go index f23a0de43..be9734658 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go @@ -52,24 +52,24 @@ func (d *ktds) Delete(key ds.Key) (err error) { } // Query implements Query, inverting keys on the way back out. -func (d *ktds) Query(q dsq.Query) (*dsq.Results, error) { - - q2 := q - q2.Prefix = d.ConvertKey(ds.NewKey(q2.Prefix)).String() - r, err := d.child.Query(q2) +func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { + qr, err := d.child.Query(q) if err != nil { return nil, err } - ch := make(chan dsq.Entry) + ch := make(chan dsq.Result) go func() { - for e := range r.Entries() { - e.Key = d.InvertKey(ds.NewKey(e.Key)).String() - ch <- e + defer close(ch) + defer qr.Close() + + for r := range qr.Next() { + if r.Error == nil { + r.Entry.Key = d.InvertKey(ds.NewKey(r.Entry.Key)).String() + } + ch <- r } - close(ch) }() - r2 := dsq.ResultsWithEntriesChan(q, ch) - return r2, nil + return dsq.DerivedResults(qr, ch), nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go index 868c7b1d4..701f94582 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go @@ -62,13 +62,18 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true) } - listAr, errA := mpds.Query(dsq.Query{}) - listBr, errB := ktds.Query(dsq.Query{}) - c.Check(errA, Equals, nil) - c.Check(errB, Equals, nil) + run := func(d ds.Datastore, q dsq.Query) []ds.Key { + r, err := d.Query(q) + c.Check(err, Equals, nil) - listA := ds.EntryKeys(listAr.AllEntries()) - listB := ds.EntryKeys(listBr.AllEntries()) + e, err := r.Rest() + c.Check(err, Equals, nil) + + return ds.EntryKeys(e) + } + + listA := run(mpds, dsq.Query{}) + listB := run(ktds, dsq.Query{}) c.Check(len(listA), Equals, len(listB)) // sort them cause yeah. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go index 1d1f91bf5..f10fddfd3 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go @@ -3,12 +3,12 @@ package leveldb import ( "io" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) type Datastore interface { @@ -72,54 +72,80 @@ func (d *datastore) Delete(key ds.Key) (err error) { return err } -func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { + + // we can use multiple iterators concurrently. see: + // https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator + // advance the iterator only if the reader reads + // + // run query in own sub-process tied to Results.Process(), so that + // it waits for us to finish AND so that clients can signal to us + // that resources should be reclaimed. + qrb := dsq.NewResultBuilder(q) + qrb.Process.Go(func(worker goprocess.Process) { + d.runQuery(worker, qrb) + }) + + // go wait on the worker (without signaling close) + go qrb.Process.CloseAfterChildren() + + // Now, apply remaining things (filters, order) + qr := qrb.Results() + for _, f := range q.Filters { + qr = dsq.NaiveFilter(qr, f) + } + for _, o := range q.Orders { + qr = dsq.NaiveOrder(qr, o) + } + return qr, nil +} + +func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { + var rnge *util.Range - if q.Prefix != "" { - rnge = util.BytesPrefix([]byte(q.Prefix)) + if qrb.Query.Prefix != "" { + rnge = util.BytesPrefix([]byte(qrb.Query.Prefix)) } i := d.DB.NewIterator(rnge, nil) + defer i.Release() - // offset - if q.Offset > 0 { - for j := 0; j < q.Offset; j++ { + // advance iterator for offset + if qrb.Query.Offset > 0 { + for j := 0; j < qrb.Query.Offset; j++ { i.Next() } } - var es []dsq.Entry - for i.Next() { - - // limit - if q.Limit > 0 && len(es) >= q.Limit { + // iterate, and handle limit, too + for sent := 0; i.Next(); sent++ { + // end early if we hit the limit + if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit { break } k := ds.NewKey(string(i.Key())).String() e := dsq.Entry{Key: k} - if !q.KeysOnly { + if !qrb.Query.KeysOnly { buf := make([]byte, len(i.Value())) copy(buf, i.Value()) e.Value = buf } - es = append(es, e) - } - i.Release() - if err := i.Error(); err != nil { - return nil, err + select { + case qrb.Output <- dsq.Result{Entry: e}: // we sent it out + case <-worker.Closing(): // client told us to end early. + break + } } - // Now, apply remaining pieces. - q2 := q - q2.Offset = 0 // already applied - q2.Limit = 0 // already applied - // TODO: make this async with: - // qr := dsq.ResultsWithEntriesChan(q, ch) - qr := dsq.ResultsWithEntries(q, es) - qr = q2.ApplyTo(qr) - qr.Query = q // set it back - return qr, nil + if err := i.Error(); err != nil { + select { + case qrb.Output <- dsq.Result{Error: err}: // client read our error + case <-worker.Closing(): // client told us to end. + return + } + } } // LevelDB needs to be closed. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go index 828c04139..9a556e285 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go @@ -20,21 +20,28 @@ var testcases = map[string]string{ "/f": "f", } -func TestQuery(t *testing.T) { +// returns datastore, and a function to call on exit. +// (this garbage collects). So: +// +// d, close := newDS(t) +// defer close() +func newDS(t *testing.T) (Datastore, func()) { path, err := ioutil.TempDir("/tmp", "testing_leveldb_") if err != nil { t.Fatal(err) } - defer func() { - os.RemoveAll(path) - }() d, err := NewDatastore(path, nil) if err != nil { t.Fatal(err) } - defer d.Close() + return d, func() { + os.RemoveAll(path) + d.Close() + } +} +func addTestCases(t *testing.T, d Datastore, testcases map[string]string) { for k, v := range testcases { dsk := ds.NewKey(k) if err := d.Put(dsk, []byte(v)); err != nil { @@ -54,6 +61,13 @@ func TestQuery(t *testing.T) { } } +} + +func TestQuery(t *testing.T) { + d, close := newDS(t) + defer close() + addTestCases(t, d, testcases) + rs, err := d.Query(dsq.Query{Prefix: "/a/"}) if err != nil { t.Fatal(err) @@ -65,7 +79,7 @@ func TestQuery(t *testing.T) { "/a/b/d", "/a/c", "/a/d", - }, rs.AllEntries()) + }, rs) // test offset and limit @@ -77,11 +91,22 @@ func TestQuery(t *testing.T) { expectMatches(t, []string{ "/a/b/d", "/a/c", - }, rs.AllEntries()) + }, rs) } -func expectMatches(t *testing.T, expect []string, actual []dsq.Entry) { +func TestQueryRespectsProcess(t *testing.T) { + d, close := newDS(t) + defer close() + addTestCases(t, d, testcases) +} + +func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { + actual, err := actualR.Rest() + if err != nil { + t.Error(err) + } + if len(actual) != len(expect) { t.Error("not enough", expect, actual) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go index 72fa225e1..074bc726f 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go @@ -51,6 +51,6 @@ func (d *Datastore) Delete(key ds.Key) (err error) { } // KeyList returns a list of keys in the datastore -func (d *Datastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { return nil, errors.New("KeyList not implemented.") } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go index 4ad5f3073..9b9cf01bb 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go @@ -6,6 +6,7 @@ import ( ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ktds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // PrefixTransform constructs a KeyTransform with a pair of functions that @@ -40,5 +41,43 @@ func Wrap(child ds.Datastore, prefix ds.Key) ktds.Datastore { panic("child (ds.Datastore) is nil") } - return ktds.Wrap(child, PrefixTransform(prefix)) + d := ktds.Wrap(child, PrefixTransform(prefix)) + return &datastore{Datastore: d, raw: child, prefix: prefix} +} + +type datastore struct { + prefix ds.Key + raw ds.Datastore + ktds.Datastore +} + +// Query implements Query, inverting keys on the way back out. +func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { + qr, err := d.raw.Query(q) + if err != nil { + return nil, err + } + + ch := make(chan dsq.Result) + go func() { + defer close(ch) + defer qr.Close() + + for r := range qr.Next() { + if r.Error != nil { + ch <- r + continue + } + + k := ds.NewKey(r.Entry.Key) + if !d.prefix.IsAncestorOf(k) { + continue + } + + r.Entry.Key = d.Datastore.InvertKey(k).String() + ch <- r + } + }() + + return dsq.DerivedResults(qr, ch), nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go index 6ef2edbfc..eb9908927 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go @@ -48,13 +48,18 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true) } - listAr, errA := mpds.Query(dsq.Query{}) - listBr, errB := nsds.Query(dsq.Query{}) - c.Check(errA, Equals, nil) - c.Check(errB, Equals, nil) + run := func(d ds.Datastore, q dsq.Query) []ds.Key { + r, err := d.Query(q) + c.Check(err, Equals, nil) - listA := ds.EntryKeys(listAr.AllEntries()) - listB := ds.EntryKeys(listBr.AllEntries()) + e, err := r.Rest() + c.Check(err, Equals, nil) + + return ds.EntryKeys(e) + } + + listA := run(mpds, dsq.Query{}) + listB := run(nsds, dsq.Query{}) c.Check(len(listA), Equals, len(listB)) // sort them cause yeah. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go index 70556355f..be10e5976 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go @@ -58,7 +58,7 @@ func (d *datastore) Delete(key ds.Key) error { return nil } -func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { r, err := d.child.Query(q) if err != nil { fmt.Fprintf(os.Stdout, "panic datastore: %s", err) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go index 5543ad786..b4c7d8ac4 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go @@ -5,15 +5,6 @@ import ( "testing" ) -var sampleKeys = []string{ - "/ab/c", - "/ab/cd", - "/a", - "/abce", - "/abcf", - "/ab", -} - type filterTestCase struct { filter Filter keys []string @@ -28,7 +19,10 @@ func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) { res := ResultsWithEntries(Query{}, e) res = NaiveFilter(res, f) - actualE := res.AllEntries() + actualE, err := res.Rest() + if err != nil { + t.Fatal(err) + } actual := make([]string, len(actualE)) for i, e := range actualE { actual[i] = e.Key diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go index 8d6c0b81b..648304172 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go @@ -19,7 +19,11 @@ func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) { res := ResultsWithEntries(Query{}, e) res = NaiveOrder(res, f) - actualE := res.AllEntries() + actualE, err := res.Rest() + if err != nil { + t.Fatal(err) + } + actual := make([]string, len(actualE)) for i, e := range actualE { actual[i] = e.Key diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go index 89787661e..434c5592a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go @@ -1,5 +1,9 @@ package query +import ( + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + /* Query represents storage for any key-value pair. @@ -64,7 +68,7 @@ type Query struct { // of an Entry has been fetched or not. This is needed because // datastore implementations get to decide whether Query returns values // or only keys. nil is not a good signal, as real values may be nil. -var NotFetched = struct{}{} +const NotFetched int = iota // Entry is a query result entry. type Entry struct { @@ -72,74 +76,175 @@ type Entry struct { Value interface{} } -// Results is a set of Query results -type Results struct { - Query Query // the query these Results correspond to +// Result is a special entry that includes an error, so that the client +// may be warned about internal errors. +type Result struct { + Entry - done chan struct{} - res chan Entry - all []Entry + Error error } -// ResultsWithEntriesChan returns a Results object from a -// channel of ResultEntries. It's merely an encapsulation -// that provides for AllEntries() functionality. -func ResultsWithEntriesChan(q Query, res <-chan Entry) *Results { - r := &Results{ - Query: q, - done: make(chan struct{}), - res: make(chan Entry), - all: []Entry{}, - } +// Results is a set of Query results. This is the interface for clients. +// Example: +// +// qr, _ := myds.Query(q) +// for r := range qr.Next() { +// if r.Error != nil { +// // handle. +// break +// } +// +// fmt.Println(r.Entry.Key, r.Entry.Value) +// } +// +// or, wait on all results at once: +// +// qr, _ := myds.Query(q) +// es, _ := qr.Rest() +// for _, e := range es { +// fmt.Println(e.Key, e.Value) +// } +// +type Results interface { + Query() Query // the query these Results correspond to + Next() <-chan Result // returns a channel to wait for the next result + Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once. + Close() error // client may call Close to signal early exit - // go consume all the results and add them to the results. - go func() { - for e := range res { - r.all = append(r.all, e) - r.res <- e - } - close(r.res) - close(r.done) - }() - return r + // Process returns a goprocess.Process associated with these results. + // most users will not need this function (Close is all they want), + // but it's here in case you want to connect the results to other + // goprocess-friendly things. + Process() goprocess.Process } -// ResultsWithEntries returns a Results object from a -// channel of ResultEntries. It's merely an encapsulation -// that provides for AllEntries() functionality. -func ResultsWithEntries(q Query, res []Entry) *Results { - r := &Results{ - Query: q, - done: make(chan struct{}), - res: make(chan Entry), - all: res, - } - - // go add all the results - go func() { - for _, e := range res { - r.res <- e - } - close(r.res) - close(r.done) - }() - return r +// results implements Results +type results struct { + query Query + proc goprocess.Process + res <-chan Result } -// Entries() returns results through a channel. -// Results may arrive at any time. -// The channel may or may not be buffered. -// The channel may or may not rate limit the query processing. -func (r *Results) Entries() <-chan Entry { +func (r *results) Next() <-chan Result { return r.res } -// AllEntries returns all the entries in Results. -// It blocks until all the results have come in. -func (r *Results) AllEntries() []Entry { +func (r *results) Rest() ([]Entry, error) { + var es []Entry for e := range r.res { - _ = e + if e.Error != nil { + return es, e.Error + } + es = append(es, e.Entry) + } + <-r.proc.Closed() // wait till the processing finishes. + return es, nil +} + +func (r *results) Process() goprocess.Process { + return r.proc +} + +func (r *results) Close() error { + return r.proc.Close() +} + +func (r *results) Query() Query { + return r.query +} + +// ResultBuilder is what implementors use to construct results +// Implementors of datastores and their clients must respect the +// Process of the Request: +// +// * clients must call r.Process().Close() on an early exit, so +// implementations can reclaim resources. +// * if the Entries are read to completion (channel closed), Process +// should be closed automatically. +// * datastores must respect <-Process.Closing(), which intermediates +// an early close signal from the client. +// +type ResultBuilder struct { + Query Query + Process goprocess.Process + Output chan Result +} + +// Results returns a Results to to this builder. +func (rb *ResultBuilder) Results() Results { + return &results{ + query: rb.Query, + proc: rb.Process, + res: rb.Output, + } +} + +func NewResultBuilder(q Query) *ResultBuilder { + b := &ResultBuilder{ + Query: q, + Output: make(chan Result), + } + b.Process = goprocess.WithTeardown(func() error { + close(b.Output) + return nil + }) + return b +} + +// ResultsWithChan returns a Results object from a channel +// of Result entries. Respects its own Close() +func ResultsWithChan(q Query, res <-chan Result) Results { + b := NewResultBuilder(q) + + // go consume all the entries and add them to the results. + b.Process.Go(func(worker goprocess.Process) { + for { + select { + case <-worker.Closing(): // client told us to close early + return + case e, more := <-res: + if !more { + return + } + + select { + case b.Output <- e: + case <-worker.Closing(): // client told us to close early + return + } + } + } + return + }) + + go b.Process.CloseAfterChildren() + return b.Results() +} + +// ResultsWithEntries returns a Results object from a list of entries +func ResultsWithEntries(q Query, res []Entry) Results { + b := NewResultBuilder(q) + + // go consume all the entries and add them to the results. + b.Process.Go(func(worker goprocess.Process) { + for _, e := range res { + select { + case b.Output <- Result{Entry: e}: + case <-worker.Closing(): // client told us to close early + return + } + } + return + }) + + go b.Process.CloseAfterChildren() + return b.Results() +} + +func ResultsReplaceQuery(r Results, q Query) Results { + return &results{ + query: q, + proc: r.Process(), + res: r.Next(), } - <-r.done - return r.all } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go index ee1a4cdba..9e584e7b2 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go @@ -1,63 +1,105 @@ package query -// NaiveFilter applies a filter to the results -func NaiveFilter(qr *Results, filter Filter) *Results { - ch := make(chan Entry) +func DerivedResults(qr Results, ch <-chan Result) Results { + return &results{ + query: qr.Query(), + proc: qr.Process(), + res: ch, + } +} + +// NaiveFilter applies a filter to the results. +func NaiveFilter(qr Results, filter Filter) Results { + ch := make(chan Result) go func() { defer close(ch) + defer qr.Close() - for e := range qr.Entries() { - if filter.Filter(e) { + for e := range qr.Next() { + if e.Error != nil || filter.Filter(e.Entry) { ch <- e } } }() - return ResultsWithEntriesChan(qr.Query, ch) + + return DerivedResults(qr, ch) } // NaiveLimit truncates the results to a given int limit -func NaiveLimit(qr *Results, limit int) *Results { - ch := make(chan Entry) +func NaiveLimit(qr Results, limit int) Results { + ch := make(chan Result) go func() { defer close(ch) + defer qr.Close() - for l := 0; l < limit; l++ { - e, more := <-qr.Entries() - if !more { - return + l := 0 + for e := range qr.Next() { + if e.Error != nil { + ch <- e + continue + } + ch <- e + l++ + if limit > 0 && l >= limit { + break + } + } + }() + + return DerivedResults(qr, ch) +} + +// NaiveOffset skips a given number of results +func NaiveOffset(qr Results, offset int) Results { + ch := make(chan Result) + go func() { + defer close(ch) + defer qr.Close() + + sent := 0 + for e := range qr.Next() { + if e.Error != nil { + ch <- e + } + + if sent < offset { + sent++ + continue } ch <- e } }() - return ResultsWithEntriesChan(qr.Query, ch) -} -// NaiveOffset skips a given number of results -func NaiveOffset(qr *Results, offset int) *Results { - ch := make(chan Entry) - go func() { - defer close(ch) - - for l := 0; l < offset; l++ { - <-qr.Entries() // discard - } - - for e := range qr.Entries() { - ch <- e - } - }() - return ResultsWithEntriesChan(qr.Query, ch) + return DerivedResults(qr, ch) } // NaiveOrder reorders results according to given Order. // WARNING: this is the only non-stream friendly operation! -func NaiveOrder(qr *Results, o Order) *Results { - e := qr.AllEntries() - o.Sort(e) - return ResultsWithEntries(qr.Query, e) +func NaiveOrder(qr Results, o Order) Results { + ch := make(chan Result) + var entries []Entry + go func() { + defer close(ch) + defer qr.Close() + + for e := range qr.Next() { + if e.Error != nil { + ch <- e + } + + entries = append(entries, e.Entry) + } + + o.Sort(entries) + for _, e := range entries { + ch <- Result{Entry: e} + } + }() + + return DerivedResults(qr, ch) } -func (q Query) ApplyTo(qr *Results) *Results { +func NaiveQueryApply(q Query, qr Results) Results { if q.Prefix != "" { qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix}) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go new file mode 100644 index 000000000..e00edf095 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go @@ -0,0 +1,109 @@ +package query + +import ( + "strings" + "testing" +) + +var sampleKeys = []string{ + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", +} + +type testCase struct { + keys []string + expect []string +} + +func testResults(t *testing.T, res Results, expect []string) { + actualE, err := res.Rest() + if err != nil { + t.Fatal(err) + } + + actual := make([]string, len(actualE)) + for i, e := range actualE { + actual[i] = e.Key + } + + if len(actual) != len(expect) { + t.Error("expect != actual.", expect, actual) + } + + if strings.Join(actual, "") != strings.Join(expect, "") { + t.Error("expect != actual.", expect, actual) + } +} + +func TestLimit(t *testing.T) { + testKeyLimit := func(t *testing.T, limit int, keys []string, expect []string) { + e := make([]Entry, len(keys)) + for i, k := range keys { + e[i] = Entry{Key: k} + } + + res := ResultsWithEntries(Query{}, e) + res = NaiveLimit(res, limit) + testResults(t, res, expect) + } + + testKeyLimit(t, 0, sampleKeys, []string{ // none + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", + }) + + testKeyLimit(t, 10, sampleKeys, []string{ // large + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", + }) + + testKeyLimit(t, 2, sampleKeys, []string{ + "/ab/c", + "/ab/cd", + }) +} + +func TestOffset(t *testing.T) { + + testOffset := func(t *testing.T, offset int, keys []string, expect []string) { + e := make([]Entry, len(keys)) + for i, k := range keys { + e[i] = Entry{Key: k} + } + + res := ResultsWithEntries(Query{}, e) + res = NaiveOffset(res, offset) + testResults(t, res, expect) + } + + testOffset(t, 0, sampleKeys, []string{ // none + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", + }) + + testOffset(t, 10, sampleKeys, []string{ // large + }) + + testOffset(t, 2, sampleKeys, []string{ + "/a", + "/abce", + "/abcf", + "/ab", + }) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go index 789d4b36d..ec2bfb504 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go @@ -58,7 +58,7 @@ func (d *MutexDatastore) Delete(key ds.Key) (err error) { } // KeyList implements Datastore.KeyList -func (d *MutexDatastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { d.RLock() defer d.RUnlock() return d.child.Query(q) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 6132d155e..63d5df54b 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -5,6 +5,7 @@ package blockstore import ( "errors" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace" dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" @@ -12,8 +13,11 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) +var log = eventlog.Logger("blockstore") + // BlockPrefix namespaces blockstore datastores var BlockPrefix = ds.NewKey("blocks") @@ -27,7 +31,9 @@ type Blockstore interface { Has(u.Key) (bool, error) Get(u.Key) (*blocks.Block, error) Put(*blocks.Block) error - AllKeys(offset int, limit int) ([]u.Key, error) + + AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) + AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) } func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore { @@ -80,10 +86,29 @@ func (s *blockstore) DeleteBlock(k u.Key) error { // AllKeys runs a query for keys from the blockstore. // this is very simplistic, in the future, take dsq.Query as a param? // if offset and limit are 0, they are ignored. -func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) { - var keys []u.Key +// +// AllKeys respects context +func (bs *blockstore) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) { + + ch, err := bs.AllKeysChan(ctx, offset, limit) + if err != nil { + return nil, err + } + + var keys []u.Key + for k := range ch { + keys = append(keys, k) + } + return keys, nil +} + +// AllKeys runs a query for keys from the blockstore. +// this is very simplistic, in the future, take dsq.Query as a param? +// if offset and limit are 0, they are ignored. +// +// AllKeys respects context +func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) { - // TODO make async inside ds/leveldb.Query // KeysOnly, because that would be _a lot_ of data. q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit} res, err := bs.datastore.Query(q) @@ -91,10 +116,46 @@ func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) { return nil, err } - for e := range res.Entries() { - // need to convert to u.Key using u.KeyFromDsKey. - k := u.KeyFromDsKey(ds.NewKey(e.Key)) - keys = append(keys, k) + // this function is here to compartmentalize + get := func() (k u.Key, ok bool) { + select { + case <-ctx.Done(): + return k, false + case e, more := <-res.Next(): + if !more { + return k, false + } + if e.Error != nil { + log.Debug("blockstore.AllKeysChan got err:", e.Error) + return k, false + } + + // need to convert to u.Key using u.KeyFromDsKey. + k = u.KeyFromDsKey(ds.NewKey(e.Key)) + return k, true + } } - return keys, nil + + output := make(chan u.Key) + go func() { + defer func() { + res.Process().Close() // ensure exit (signals early exit, too) + close(output) + }() + + for { + k, ok := get() + if !ok { + return + } + + select { + case <-ctx.Done(): + return + case output <- k: + } + } + }() + + return output, nil } diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index a80ce8337..de74d6d83 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -5,8 +5,11 @@ import ( "fmt" "testing" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) @@ -42,9 +45,11 @@ func TestPutThenGetBlock(t *testing.T) { } } -func TestAllKeys(t *testing.T) { - bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) - N := 100 +func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []u.Key) { + if d == nil { + d = ds.NewMapDatastore() + } + bs := NewBlockstore(ds_sync.MutexWrap(d)) keys := make([]u.Key, N) for i := 0; i < N; i++ { @@ -55,8 +60,14 @@ func TestAllKeys(t *testing.T) { } keys[i] = block.Key() } + return bs, keys +} - keys2, err := bs.AllKeys(0, 0) +func TestAllKeysSimple(t *testing.T) { + bs, keys := newBlockStoreWithKeys(t, nil, 100) + + ctx := context.Background() + keys2, err := bs.AllKeys(ctx, 0, 0) if err != nil { t.Fatal(err) } @@ -65,8 +76,14 @@ func TestAllKeys(t *testing.T) { // } expectMatches(t, keys, keys2) +} - keys3, err := bs.AllKeys(N/3, N/3) +func TestAllKeysOffsetAndLimit(t *testing.T) { + N := 30 + bs, _ := newBlockStoreWithKeys(t, nil, N) + + ctx := context.Background() + keys3, err := bs.AllKeys(ctx, N/3, N/3) if err != nil { t.Fatal(err) } @@ -76,6 +93,114 @@ func TestAllKeys(t *testing.T) { if len(keys3) != N/3 { t.Errorf("keys3 should be: %d != %d", N/3, len(keys3)) } +} + +func TestAllKeysRespectsContext(t *testing.T) { + N := 100 + + d := &queryTestDS{ds: ds.NewMapDatastore()} + bs, _ := newBlockStoreWithKeys(t, d, N) + + started := make(chan struct{}, 1) + done := make(chan struct{}, 1) + errors := make(chan error, 100) + + getKeys := func(ctx context.Context) { + started <- struct{}{} + _, err := bs.AllKeys(ctx, 0, 0) // once without cancelling + if err != nil { + errors <- err + } + done <- struct{}{} + errors <- nil // a nil one to signal break + } + + // Once without context, to make sure it all works + { + var results dsq.Results + resultChan := make(chan dsq.Result) + d.SetFunc(func(q dsq.Query) (dsq.Results, error) { + results = dsq.ResultsWithChan(q, resultChan) + return results, nil + }) + + go getKeys(context.Background()) + + // make sure it's waiting. + <-started + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closing(): + t.Fatal("should not be closing") + case <-results.Process().Closed(): + t.Fatal("should not be closed") + default: + } + + e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()} + resultChan <- dsq.Result{Entry: e} // let it go. + close(resultChan) + <-done // should be done now. + <-results.Process().Closed() // should be closed now + + // print any errors + for err := range errors { + if err == nil { + break + } + t.Error(err) + } + } + + // Once with + { + var results dsq.Results + resultChan := make(chan dsq.Result) + d.SetFunc(func(q dsq.Query) (dsq.Results, error) { + results = dsq.ResultsWithChan(q, resultChan) + return results, nil + }) + + ctx, cancel := context.WithCancel(context.Background()) + go getKeys(ctx) + + // make sure it's waiting. + <-started + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closing(): + t.Fatal("should not be closing") + case <-results.Process().Closed(): + t.Fatal("should not be closed") + default: + } + + cancel() // let it go. + + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closed(): + t.Fatal("should not be closed") // should not be closed yet. + case <-results.Process().Closing(): + // should be closing now! + t.Log("closing correctly at this point.") + } + + close(resultChan) + <-done // should be done now. + <-results.Process().Closed() // should be closed now + + // print any errors + for err := range errors { + if err == nil { + break + } + t.Error(err) + } + } } @@ -111,3 +236,33 @@ func expectMatches(t *testing.T, expect, actual []u.Key) { } } } + +type queryTestDS struct { + cb func(q dsq.Query) (dsq.Results, error) + ds ds.Datastore +} + +func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f } + +func (c *queryTestDS) Put(key ds.Key, value interface{}) (err error) { + return c.ds.Put(key, value) +} + +func (c *queryTestDS) Get(key ds.Key) (value interface{}, err error) { + return c.ds.Get(key) +} + +func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) { + return c.ds.Has(key) +} + +func (c *queryTestDS) Delete(key ds.Key) (err error) { + return c.ds.Delete(key) +} + +func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) { + if c.cb != nil { + return c.cb(q) + } + return c.ds.Query(q) +} diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go index da9a0a01d..377ce629d 100644 --- a/blocks/blockstore/write_cache.go +++ b/blocks/blockstore/write_cache.go @@ -1,7 +1,9 @@ package blockstore import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru" + "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) @@ -44,6 +46,10 @@ func (w *writecache) Put(b *blocks.Block) error { return w.blockstore.Put(b) } -func (w *writecache) AllKeys(offset int, limit int) ([]u.Key, error) { - return w.blockstore.AllKeys(offset, limit) +func (w *writecache) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) { + return w.blockstore.AllKeys(ctx, offset, limit) +} + +func (w *writecache) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) { + return w.blockstore.AllKeysChan(ctx, offset, limit) } diff --git a/blocks/blockstore/write_cache_test.go b/blocks/blockstore/write_cache_test.go index 1e072e95e..b20188e29 100644 --- a/blocks/blockstore/write_cache_test.go +++ b/blocks/blockstore/write_cache_test.go @@ -84,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) { return c.ds.Delete(key) } -func (c *callbackDatastore) Query(q dsq.Query) (*dsq.Results, error) { +func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) { c.f() return c.ds.Query(q) } diff --git a/core/commands/refs.go b/core/commands/refs.go index 4390e108c..0e26be2c2 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -128,7 +128,7 @@ Displays the hashes of all local objects. } // todo: make async - allKeys, err := n.Blockstore.AllKeys(0, 0) + allKeys, err := n.Blockstore.AllKeys(context.TODO(), 0, 0) if err != nil { return nil, err } diff --git a/util/datastore2/delayed.go b/util/datastore2/delayed.go index ccb3cc29c..ca2541913 100644 --- a/util/datastore2/delayed.go +++ b/util/datastore2/delayed.go @@ -36,7 +36,7 @@ func (dds *delayed) Delete(key ds.Key) (err error) { return dds.ds.Delete(key) } -func (dds *delayed) Query(q dsq.Query) (*dsq.Results, error) { +func (dds *delayed) Query(q dsq.Query) (dsq.Results, error) { dds.delay.Wait() return dds.ds.Query(q) }