From f9ca67ef04296d0081fa03b767261f5cdadbd2e5 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 9 Jan 2015 16:37:20 -0800 Subject: [PATCH 01/13] updated datastore (Query) --- Godeps/Godeps.json | 2 +- .../jbenet/go-datastore/.travis.yml | 11 ++ .../github.com/jbenet/go-datastore/Makefile | 3 + .../jbenet/go-datastore/basic_ds.go | 34 ++-- .../jbenet/go-datastore/datastore.go | 17 +- .../jbenet/go-datastore/elastigo/datastore.go | 3 +- .../github.com/jbenet/go-datastore/fs/fs.go | 25 ++- .../jbenet/go-datastore/fs/fs_test.go | 30 +++- .../src/github.com/jbenet/go-datastore/key.go | 11 ++ .../go-datastore/keytransform/keytransform.go | 28 +++- .../keytransform/keytransform_test.go | 14 +- .../jbenet/go-datastore/leveldb/datastore.go | 58 ++++++- .../jbenet/go-datastore/leveldb/ds_test.go | 99 ++++++++++++ .../jbenet/go-datastore/lru/datastore.go | 4 +- .../go-datastore/namespace/namespace_test.go | 11 +- .../jbenet/go-datastore/panic/panic.go | 9 +- .../jbenet/go-datastore/query/filter.go | 86 +++++++++++ .../jbenet/go-datastore/query/filter_test.go | 75 +++++++++ .../jbenet/go-datastore/query/order.go | 66 ++++++++ .../jbenet/go-datastore/query/order_test.go | 55 +++++++ .../jbenet/go-datastore/query/query.go | 145 ++++++++++++++++++ .../jbenet/go-datastore/query/query_impl.go | 85 ++++++++++ .../jbenet/go-datastore/sync/sync.go | 5 +- blocks/blockstore/write_cache_test.go | 5 +- util/datastore2/delayed.go | 22 +-- 25 files changed, 837 insertions(+), 66 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/.travis.yml create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 9ff92b7fc..8ca77f4d0 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -110,7 +110,7 @@ }, { "ImportPath": "github.com/jbenet/go-datastore", - "Rev": "6a1c83bda2a71a9bdc936749fdb507df958ed949" + "Rev": "8a8988d1a4e174274bd4a9dd55c4837f46fdf323" }, { "ImportPath": "github.com/jbenet/go-fuse-version", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-datastore/.travis.yml new file mode 100644 index 000000000..9b1d623af --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/.travis.yml @@ -0,0 +1,11 @@ +language: go + +go: + - 1.3 + - release + - tip + +script: + - make test + +env: TEST_NO_FUSE=1 TEST_VERBOSE=1 diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Makefile b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Makefile index 1a377522b..c03d51b20 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Makefile +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Makefile @@ -1,6 +1,9 @@ build: go build +test: + go test ./... + # saves/vendors third-party dependencies to Godeps/_workspace # -r flag rewrites import paths to use the vendored path # ./... performs operation on all packages in tree diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go index 782b5de46..9005e8433 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go @@ -1,6 +1,10 @@ package datastore -import "log" +import ( + "log" + + query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" +) // Here are some basic datastore implementations. @@ -45,13 +49,15 @@ func (d *MapDatastore) Delete(key Key) (err error) { return nil } -// KeyList implements Datastore.KeyList -func (d *MapDatastore) KeyList() ([]Key, error) { - var keys []Key - for k := range d.values { - keys = append(keys, k) +// Query implements Datastore.Query +func (d *MapDatastore) Query(q query.Query) (*query.Results, error) { + re := make([]query.Entry, 0, len(d.values)) + for k, v := range d.values { + re = append(re, query.Entry{Key: k.String(), Value: v}) } - return keys, nil + r := query.ResultsWithEntries(q, re) + r = q.ApplyTo(r) + return r, nil } // NullDatastore stores nothing, but conforms to the API. @@ -84,9 +90,9 @@ func (d *NullDatastore) Delete(key Key) (err error) { return nil } -// KeyList implements Datastore.KeyList -func (d *NullDatastore) KeyList() ([]Key, error) { - return nil, nil +// Query implements Datastore.Query +func (d *NullDatastore) Query(q query.Query) (*query.Results, error) { + return query.ResultsWithEntries(q, nil), nil } // LogDatastore logs all accesses through the datastore. @@ -140,8 +146,8 @@ func (d *LogDatastore) Delete(key Key) (err error) { return d.child.Delete(key) } -// KeyList implements Datastore.KeyList -func (d *LogDatastore) KeyList() ([]Key, error) { - log.Printf("%s: Get KeyList\n", d.Name) - return d.child.KeyList() +// Query implements Datastore.Query +func (d *LogDatastore) Query(q query.Query) (*query.Results, error) { + log.Printf("%s: Query\n", d.Name) + return d.child.Query(q) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go index f3260cd7d..d6cc0b97b 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go @@ -2,6 +2,8 @@ package datastore import ( "errors" + + query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) /* @@ -52,8 +54,19 @@ type Datastore interface { // Delete removes the value for given `key`. Delete(key Key) (err error) - // KeyList returns a list of keys in the datastore - KeyList() ([]Key, error) + // Query searches the datastore and returns a query result. This function + // may return before the query actually runs. To wait for the query: + // + // result, _ := ds.Query(q) + // + // // use the channel interface; result may come in at different times + // for entry := range result.Entries() { ... } + // + // // or wait for the query to be completely done + // result.Wait() + // result.AllEntries() + // + Query(q query.Query) (*query.Results, error) } // ThreadSafeDatastore is an interface that all threadsafe datastore should diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go index 32a7bdda3..e05d2228a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go @@ -8,6 +8,7 @@ import ( "github.com/codahale/blake2" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" "github.com/mattbaird/elastigo/api" "github.com/mattbaird/elastigo/core" ) @@ -112,7 +113,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) { return nil } -func (d *Datastore) KeyList() ([]ds.Key, error) { +func (d *Datastore) Query(query.Query) (*query.Results, error) { return nil, errors.New("Not yet implemented!") } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go index 2fa8fcd36..07613040b 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go @@ -8,8 +8,11 @@ import ( "strings" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) +var ObjectKeySuffix = ".dsobject" + // Datastore uses a standard Go map for internal storage. type Datastore struct { path string @@ -26,7 +29,7 @@ func NewDatastore(path string) (ds.Datastore, error) { // KeyFilename returns the filename associated with `key` func (d *Datastore) KeyFilename(key ds.Key) string { - return filepath.Join(d.path, key.String(), ".dsobject") + return filepath.Join(d.path, key.String(), ObjectKeySuffix) } // Put stores the given value. @@ -79,10 +82,10 @@ func (d *Datastore) Delete(key ds.Key) (err error) { return os.Remove(fn) } -// KeyList returns a list of all keys in the datastore -func (d *Datastore) KeyList() ([]ds.Key, error) { +// Query implements Datastore.Query +func (d *Datastore) Query(q query.Query) (*query.Results, error) { - keys := []ds.Key{} + entries := make(chan query.Entry) walkFn := func(path string, info os.FileInfo, err error) error { // remove ds path prefix @@ -91,14 +94,22 @@ func (d *Datastore) KeyList() ([]ds.Key, error) { } if !info.IsDir() { + if strings.HasSuffix(path, ObjectKeySuffix) { + path = path[:len(path)-len(ObjectKeySuffix)] + } key := ds.NewKey(path) - keys = append(keys, key) + entries <- query.Entry{Key: key.String(), Value: query.NotFetched} } return nil } - filepath.Walk(d.path, walkFn) - return keys, nil + go func() { + filepath.Walk(d.path, walkFn) + close(entries) + }() + r := query.ResultsWithEntriesChan(q, entries) + r = q.ApplyTo(r) + return r, nil } // isDir returns whether given path is a directory diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go index 8e5d94ac4..beef672aa 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go @@ -4,9 +4,11 @@ import ( "bytes" "testing" + . "launchpad.net/gocheck" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs" - . "launchpad.net/gocheck" + query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // Hook up gocheck into the "go test" runner. @@ -54,6 +56,32 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(err, Equals, nil) c.Check(bytes.Equal(v.([]byte), []byte(k.String())), Equals, true) } + + r, err := ks.ds.Query(query.Query{Prefix: "/foo/bar/"}) + if err != nil { + c.Check(err, Equals, nil) + } + + expect := []string{ + "/foo/bar/baz", + "/foo/bar/bazb", + "/foo/bar/baz/barb", + } + all := r.AllEntries() + c.Check(len(all), Equals, len(expect)) + + for _, k := range expect { + found := false + for _, e := range all { + if e.Key == k { + found = true + } + } + + if !found { + c.Error("did not find expected key: ", k) + } + } } func strsToKeys(strs []string) []ds.Key { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/key.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/key.go index cfe37b625..fd06a1ee3 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/key.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/key.go @@ -5,6 +5,8 @@ import ( "strings" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go-uuid/uuid" + + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) /* @@ -239,3 +241,12 @@ type KeySlice []Key func (p KeySlice) Len() int { return len(p) } func (p KeySlice) Less(i, j int) bool { return p[i].Less(p[j]) } func (p KeySlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +// EntryKeys +func EntryKeys(e []dsq.Entry) []Key { + ks := make([]Key, len(e)) + for i, e := range e { + ks[i] = NewKey(e.Key) + } + return ks +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go index 6566905d9..f23a0de43 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go @@ -1,6 +1,9 @@ package keytransform -import ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" +import ( + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" +) type Pair struct { Convert KeyMapping @@ -48,16 +51,25 @@ func (d *ktds) Delete(key ds.Key) (err error) { return d.child.Delete(d.ConvertKey(key)) } -// KeyList returns a list of all keys in the datastore, transforming keys out. -func (d *ktds) KeyList() ([]ds.Key, error) { +// Query implements Query, inverting keys on the way back out. +func (d *ktds) Query(q dsq.Query) (*dsq.Results, error) { - keys, err := d.child.KeyList() + q2 := q + q2.Prefix = d.ConvertKey(ds.NewKey(q2.Prefix)).String() + r, err := d.child.Query(q2) if err != nil { return nil, err } - for i, k := range keys { - keys[i] = d.InvertKey(k) - } - return keys, nil + ch := make(chan dsq.Entry) + go func() { + for e := range r.Entries() { + e.Key = d.InvertKey(ds.NewKey(e.Key)).String() + ch <- e + } + close(ch) + }() + + r2 := dsq.ResultsWithEntriesChan(q, ch) + return r2, nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go index ec1bfac30..868c7b1d4 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go @@ -5,9 +5,11 @@ import ( "sort" "testing" + . "launchpad.net/gocheck" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" kt "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform" - . "launchpad.net/gocheck" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // Hook up gocheck into the "go test" runner. @@ -60,10 +62,13 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true) } - listA, errA := mpds.KeyList() - listB, errB := ktds.KeyList() + listAr, errA := mpds.Query(dsq.Query{}) + listBr, errB := ktds.Query(dsq.Query{}) c.Check(errA, Equals, nil) c.Check(errB, Equals, nil) + + listA := ds.EntryKeys(listAr.AllEntries()) + listB := ds.EntryKeys(listBr.AllEntries()) c.Check(len(listA), Equals, len(listB)) // sort them cause yeah. @@ -75,6 +80,9 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(pair.Invert(kA), Equals, kB) c.Check(kA, Equals, pair.Convert(kB)) } + + c.Log("listA: ", listA) + c.Log("listB: ", listB) } func strsToKeys(strs []string) []ds.Key { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go index f540e5c92..1d1f91bf5 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go @@ -3,9 +3,12 @@ package leveldb import ( "io" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util" + + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) type Datastore interface { @@ -69,13 +72,54 @@ func (d *datastore) Delete(key ds.Key) (err error) { return err } -func (d *datastore) KeyList() ([]ds.Key, error) { - i := d.DB.NewIterator(nil, nil) - var keys []ds.Key - for i.Next() { - keys = append(keys, ds.NewKey(string(i.Key()))) +func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) { + var rnge *util.Range + if q.Prefix != "" { + rnge = util.BytesPrefix([]byte(q.Prefix)) } - return keys, nil + i := d.DB.NewIterator(rnge, nil) + + // offset + if q.Offset > 0 { + for j := 0; j < q.Offset; j++ { + i.Next() + } + } + + var es []dsq.Entry + for i.Next() { + + // limit + if q.Limit > 0 && len(es) >= q.Limit { + break + } + + k := ds.NewKey(string(i.Key())).String() + e := dsq.Entry{Key: k} + + if !q.KeysOnly { + buf := make([]byte, len(i.Value())) + copy(buf, i.Value()) + e.Value = buf + } + + es = append(es, e) + } + i.Release() + if err := i.Error(); err != nil { + return nil, err + } + + // Now, apply remaining pieces. + q2 := q + q2.Offset = 0 // already applied + q2.Limit = 0 // already applied + // TODO: make this async with: + // qr := dsq.ResultsWithEntriesChan(q, ch) + qr := dsq.ResultsWithEntries(q, es) + qr = q2.ApplyTo(qr) + qr.Query = q // set it back + return qr, nil } // LevelDB needs to be closed. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go new file mode 100644 index 000000000..828c04139 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go @@ -0,0 +1,99 @@ +package leveldb + +import ( + "io/ioutil" + "os" + "testing" + + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" +) + +var testcases = map[string]string{ + "/a": "a", + "/a/b": "ab", + "/a/b/c": "abc", + "/a/b/d": "a/b/d", + "/a/c": "ac", + "/a/d": "ad", + "/e": "e", + "/f": "f", +} + +func TestQuery(t *testing.T) { + path, err := ioutil.TempDir("/tmp", "testing_leveldb_") + if err != nil { + t.Fatal(err) + } + defer func() { + os.RemoveAll(path) + }() + + d, err := NewDatastore(path, nil) + if err != nil { + t.Fatal(err) + } + defer d.Close() + + for k, v := range testcases { + dsk := ds.NewKey(k) + if err := d.Put(dsk, []byte(v)); err != nil { + t.Fatal(err) + } + } + + for k, v := range testcases { + dsk := ds.NewKey(k) + v2, err := d.Get(dsk) + if err != nil { + t.Fatal(err) + } + v2b := v2.([]byte) + if string(v2b) != v { + t.Errorf("%s values differ: %s != %s", k, v, v2) + } + } + + rs, err := d.Query(dsq.Query{Prefix: "/a/"}) + if err != nil { + t.Fatal(err) + } + + expectMatches(t, []string{ + "/a/b", + "/a/b/c", + "/a/b/d", + "/a/c", + "/a/d", + }, rs.AllEntries()) + + // test offset and limit + + rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 2, Limit: 2}) + if err != nil { + t.Fatal(err) + } + + expectMatches(t, []string{ + "/a/b/d", + "/a/c", + }, rs.AllEntries()) + +} + +func expectMatches(t *testing.T, expect []string, actual []dsq.Entry) { + if len(actual) != len(expect) { + t.Error("not enough", expect, actual) + } + for _, k := range expect { + found := false + for _, e := range actual { + if e.Key == k { + found = true + } + } + if !found { + t.Error(k, "not found") + } + } +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go index dec21e4b9..72fa225e1 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go @@ -4,7 +4,9 @@ import ( "errors" lru "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // Datastore uses golang-lru for internal storage. @@ -49,6 +51,6 @@ func (d *Datastore) Delete(key ds.Key) (err error) { } // KeyList returns a list of keys in the datastore -func (d *Datastore) KeyList() ([]ds.Key, error) { +func (d *Datastore) Query(q dsq.Query) (*dsq.Results, error) { return nil, errors.New("KeyList not implemented.") } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go index df8a4f419..6ef2edbfc 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go @@ -5,9 +5,11 @@ import ( "sort" "testing" + . "launchpad.net/gocheck" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace" - . "launchpad.net/gocheck" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // Hook up gocheck into the "go test" runner. @@ -46,10 +48,13 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true) } - listA, errA := mpds.KeyList() - listB, errB := nsds.KeyList() + listAr, errA := mpds.Query(dsq.Query{}) + listBr, errB := nsds.Query(dsq.Query{}) c.Check(errA, Equals, nil) c.Check(errB, Equals, nil) + + listA := ds.EntryKeys(listAr.AllEntries()) + listB := ds.EntryKeys(listBr.AllEntries()) c.Check(len(listA), Equals, len(listB)) // sort them cause yeah. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go index a1c5850fe..70556355f 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go @@ -5,6 +5,7 @@ import ( "os" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) type datastore struct { @@ -57,11 +58,11 @@ func (d *datastore) Delete(key ds.Key) error { return nil } -func (d *datastore) KeyList() ([]ds.Key, error) { - kl, err := d.child.KeyList() +func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) { + r, err := d.child.Query(q) if err != nil { fmt.Fprintf(os.Stdout, "panic datastore: %s", err) - panic("panic datastore: KeyList failed") + panic("panic datastore: Query failed") } - return kl, nil + return r, nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter.go new file mode 100644 index 000000000..d8b48ea32 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter.go @@ -0,0 +1,86 @@ +package query + +import ( + "fmt" + "reflect" + "strings" +) + +// Filter is an object that tests ResultEntries +type Filter interface { + // Filter returns whether an entry passes the filter + Filter(e Entry) bool +} + +// Op is a comparison operator +type Op string + +var ( + Equal = Op("==") + NotEqual = Op("!=") + GreaterThan = Op(">") + GreaterThanOrEqual = Op(">=") + LessThan = Op("<") + LessThanOrEqual = Op("<=") +) + +// FilterValueCompare is used to signal to datastores they +// should apply internal comparisons. unfortunately, there +// is no way to apply comparisons* to interface{} types in +// Go, so if the datastore doesnt have a special way to +// handle these comparisons, you must provided the +// TypedFilter to actually do filtering. +// +// [*] other than == and !=, which use reflect.DeepEqual. +type FilterValueCompare struct { + Op Op + Value interface{} + TypedFilter Filter +} + +func (f FilterValueCompare) Filter(e Entry) bool { + if f.TypedFilter != nil { + return f.TypedFilter.Filter(e) + } + + switch f.Op { + case Equal: + return reflect.DeepEqual(f.Value, e.Value) + case NotEqual: + return !reflect.DeepEqual(f.Value, e.Value) + default: + panic(fmt.Errorf("cannot apply op '%s' to interface{}.", f.Op)) + } +} + +type FilterKeyCompare struct { + Op Op + Key string +} + +func (f FilterKeyCompare) Filter(e Entry) bool { + switch f.Op { + case Equal: + return e.Key == f.Key + case NotEqual: + return e.Key != f.Key + case GreaterThan: + return e.Key > f.Key + case GreaterThanOrEqual: + return e.Key >= f.Key + case LessThan: + return e.Key < f.Key + case LessThanOrEqual: + return e.Key <= f.Key + default: + panic(fmt.Errorf("unknown op '%s'", f.Op)) + } +} + +type FilterKeyPrefix struct { + Prefix string +} + +func (f FilterKeyPrefix) Filter(e Entry) bool { + return strings.HasPrefix(e.Key, f.Prefix) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go new file mode 100644 index 000000000..5543ad786 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go @@ -0,0 +1,75 @@ +package query + +import ( + "strings" + "testing" +) + +var sampleKeys = []string{ + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", +} + +type filterTestCase struct { + filter Filter + keys []string + expect []string +} + +func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) { + e := make([]Entry, len(keys)) + for i, k := range keys { + e[i] = Entry{Key: k} + } + + res := ResultsWithEntries(Query{}, e) + res = NaiveFilter(res, f) + actualE := res.AllEntries() + actual := make([]string, len(actualE)) + for i, e := range actualE { + actual[i] = e.Key + } + + if len(actual) != len(expect) { + t.Error("expect != actual.", expect, actual) + } + + if strings.Join(actual, "") != strings.Join(expect, "") { + t.Error("expect != actual.", expect, actual) + } +} + +func TestFilterKeyCompare(t *testing.T) { + + testKeyFilter(t, FilterKeyCompare{Equal, "/ab"}, sampleKeys, []string{"/ab"}) + testKeyFilter(t, FilterKeyCompare{GreaterThan, "/ab"}, sampleKeys, []string{ + "/ab/c", + "/ab/cd", + "/abce", + "/abcf", + }) + testKeyFilter(t, FilterKeyCompare{LessThanOrEqual, "/ab"}, sampleKeys, []string{ + "/a", + "/ab", + }) +} + +func TestFilterKeyPrefix(t *testing.T) { + + testKeyFilter(t, FilterKeyPrefix{"/a"}, sampleKeys, []string{ + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", + }) + testKeyFilter(t, FilterKeyPrefix{"/ab/"}, sampleKeys, []string{ + "/ab/c", + "/ab/cd", + }) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order.go new file mode 100644 index 000000000..8fa987ba4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order.go @@ -0,0 +1,66 @@ +package query + +import ( + "sort" +) + +// Order is an object used to order objects +type Order interface { + + // Sort sorts the Entry slice according to + // the Order criteria. + Sort([]Entry) +} + +// OrderByValue is used to signal to datastores they +// should apply internal orderings. unfortunately, there +// is no way to apply order comparisons to interface{} types +// in Go, so if the datastore doesnt have a special way to +// handle these comparisons, you must provide an Order +// implementation that casts to the correct type. +type OrderByValue struct { + TypedOrder Order +} + +func (o OrderByValue) Sort(res []Entry) { + if o.TypedOrder == nil { + panic("cannot order interface{} by value. see query docs.") + } + o.TypedOrder.Sort(res) +} + +// OrderByValueDescending is used to signal to datastores they +// should apply internal orderings. unfortunately, there +// is no way to apply order comparisons to interface{} types +// in Go, so if the datastore doesnt have a special way to +// handle these comparisons, you are SOL. +type OrderByValueDescending struct { + TypedOrder Order +} + +func (o OrderByValueDescending) Sort(res []Entry) { + if o.TypedOrder == nil { + panic("cannot order interface{} by value. see query docs.") + } + o.TypedOrder.Sort(res) +} + +// OrderByKey +type OrderByKey struct{} + +func (o OrderByKey) Sort(res []Entry) { + sort.Stable(reByKey(res)) +} + +// OrderByKeyDescending +type OrderByKeyDescending struct{} + +func (o OrderByKeyDescending) Sort(res []Entry) { + sort.Stable(sort.Reverse(reByKey(res))) +} + +type reByKey []Entry + +func (s reByKey) Len() int { return len(s) } +func (s reByKey) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s reByKey) Less(i, j int) bool { return s[i].Key < s[j].Key } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go new file mode 100644 index 000000000..8d6c0b81b --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go @@ -0,0 +1,55 @@ +package query + +import ( + "strings" + "testing" +) + +type orderTestCase struct { + order Order + keys []string + expect []string +} + +func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) { + e := make([]Entry, len(keys)) + for i, k := range keys { + e[i] = Entry{Key: k} + } + + res := ResultsWithEntries(Query{}, e) + res = NaiveOrder(res, f) + actualE := res.AllEntries() + actual := make([]string, len(actualE)) + for i, e := range actualE { + actual[i] = e.Key + } + + if len(actual) != len(expect) { + t.Error("expect != actual.", expect, actual) + } + + if strings.Join(actual, "") != strings.Join(expect, "") { + t.Error("expect != actual.", expect, actual) + } +} + +func TestOrderByKey(t *testing.T) { + + testKeyOrder(t, OrderByKey{}, sampleKeys, []string{ + "/a", + "/ab", + "/ab/c", + "/ab/cd", + "/abce", + "/abcf", + }) + testKeyOrder(t, OrderByKeyDescending{}, sampleKeys, []string{ + "/abcf", + "/abce", + "/ab/cd", + "/ab/c", + "/ab", + "/a", + }) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go new file mode 100644 index 000000000..89787661e --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go @@ -0,0 +1,145 @@ +package query + +/* +Query represents storage for any key-value pair. + +tl;dr: + + queries are supported across datastores. + Cheap on top of relational dbs, and expensive otherwise. + Pick the right tool for the job! + +In addition to the key-value store get and set semantics, datastore +provides an interface to retrieve multiple records at a time through +the use of queries. The datastore Query model gleans a common set of +operations performed when querying. To avoid pasting here years of +database research, let’s summarize the operations datastore supports. + +Query Operations: + + * namespace - scope the query, usually by object type + * filters - select a subset of values by applying constraints + * orders - sort the results by applying sort conditions + * limit - impose a numeric limit on the number of results + * offset - skip a number of results (for efficient pagination) + +datastore combines these operations into a simple Query class that allows +applications to define their constraints in a simple, generic, way without +introducing datastore specific calls, languages, etc. + +Of course, different datastores provide relational query support across a +wide spectrum, from full support in traditional databases to none at all in +most key-value stores. Datastore aims to provide a common, simple interface +for the sake of application evolution over time and keeping large code bases +free of tool-specific code. It would be ridiculous to claim to support high- +performance queries on architectures that obviously do not. Instead, datastore +provides the interface, ideally translating queries to their native form +(e.g. into SQL for MySQL). + +However, on the wrong datastore, queries can potentially incur the high cost +of performing the aforemantioned query operations on the data set directly in +Go. It is the client’s responsibility to select the right tool for the job: +pick a data storage solution that fits the application’s needs now, and wrap +it with a datastore implementation. As the needs change, swap out datastore +implementations to support your new use cases. Some applications, particularly +in early development stages, can afford to incurr the cost of queries on non- +relational databases (e.g. using a FSDatastore and not worry about a database +at all). When it comes time to switch the tool for performance, updating the +application code can be as simple as swapping the datastore in one place, not +all over the application code base. This gain in engineering time, both at +initial development and during later iterations, can significantly offset the +cost of the layer of abstraction. + +*/ +type Query struct { + Prefix string // namespaces the query to results whose keys have Prefix + Filters []Filter // filter results. apply sequentially + Orders []Order // order results. apply sequentially + Limit int // maximum number of results + Offset int // skip given number of results + KeysOnly bool // return only keys. +} + +// NotFetched is a special type that signals whether or not the value +// of an Entry has been fetched or not. This is needed because +// datastore implementations get to decide whether Query returns values +// or only keys. nil is not a good signal, as real values may be nil. +var NotFetched = struct{}{} + +// Entry is a query result entry. +type Entry struct { + Key string // cant be ds.Key because circular imports ...!!! + Value interface{} +} + +// Results is a set of Query results +type Results struct { + Query Query // the query these Results correspond to + + done chan struct{} + res chan Entry + all []Entry +} + +// ResultsWithEntriesChan returns a Results object from a +// channel of ResultEntries. It's merely an encapsulation +// that provides for AllEntries() functionality. +func ResultsWithEntriesChan(q Query, res <-chan Entry) *Results { + r := &Results{ + Query: q, + done: make(chan struct{}), + res: make(chan Entry), + all: []Entry{}, + } + + // go consume all the results and add them to the results. + go func() { + for e := range res { + r.all = append(r.all, e) + r.res <- e + } + close(r.res) + close(r.done) + }() + return r +} + +// ResultsWithEntries returns a Results object from a +// channel of ResultEntries. It's merely an encapsulation +// that provides for AllEntries() functionality. +func ResultsWithEntries(q Query, res []Entry) *Results { + r := &Results{ + Query: q, + done: make(chan struct{}), + res: make(chan Entry), + all: res, + } + + // go add all the results + go func() { + for _, e := range res { + r.res <- e + } + close(r.res) + close(r.done) + }() + return r +} + +// Entries() returns results through a channel. +// Results may arrive at any time. +// The channel may or may not be buffered. +// The channel may or may not rate limit the query processing. +func (r *Results) Entries() <-chan Entry { + return r.res +} + +// AllEntries returns all the entries in Results. +// It blocks until all the results have come in. +func (r *Results) AllEntries() []Entry { + for e := range r.res { + _ = e + } + <-r.done + return r.all +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go new file mode 100644 index 000000000..ee1a4cdba --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go @@ -0,0 +1,85 @@ +package query + +// NaiveFilter applies a filter to the results +func NaiveFilter(qr *Results, filter Filter) *Results { + ch := make(chan Entry) + go func() { + defer close(ch) + + for e := range qr.Entries() { + if filter.Filter(e) { + ch <- e + } + } + }() + return ResultsWithEntriesChan(qr.Query, ch) +} + +// NaiveLimit truncates the results to a given int limit +func NaiveLimit(qr *Results, limit int) *Results { + ch := make(chan Entry) + go func() { + defer close(ch) + + for l := 0; l < limit; l++ { + e, more := <-qr.Entries() + if !more { + return + } + ch <- e + } + }() + return ResultsWithEntriesChan(qr.Query, ch) +} + +// NaiveOffset skips a given number of results +func NaiveOffset(qr *Results, offset int) *Results { + ch := make(chan Entry) + go func() { + defer close(ch) + + for l := 0; l < offset; l++ { + <-qr.Entries() // discard + } + + for e := range qr.Entries() { + ch <- e + } + }() + return ResultsWithEntriesChan(qr.Query, ch) +} + +// NaiveOrder reorders results according to given Order. +// WARNING: this is the only non-stream friendly operation! +func NaiveOrder(qr *Results, o Order) *Results { + e := qr.AllEntries() + o.Sort(e) + return ResultsWithEntries(qr.Query, e) +} + +func (q Query) ApplyTo(qr *Results) *Results { + if q.Prefix != "" { + qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix}) + } + for _, f := range q.Filters { + qr = NaiveFilter(qr, f) + } + for _, o := range q.Orders { + qr = NaiveOrder(qr, o) + } + if q.Offset != 0 { + qr = NaiveOffset(qr, q.Offset) + } + if q.Limit != 0 { + qr = NaiveLimit(qr, q.Offset) + } + return qr +} + +func ResultEntriesFrom(keys []string, vals []interface{}) []Entry { + re := make([]Entry, len(keys)) + for i, k := range keys { + re[i] = Entry{Key: k, Value: vals[i]} + } + return re +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go index 1881adea1..789d4b36d 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go @@ -4,6 +4,7 @@ import ( "sync" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // MutexDatastore contains a child datastire and a mutex. @@ -57,8 +58,8 @@ func (d *MutexDatastore) Delete(key ds.Key) (err error) { } // KeyList implements Datastore.KeyList -func (d *MutexDatastore) KeyList() ([]ds.Key, error) { +func (d *MutexDatastore) Query(q dsq.Query) (*dsq.Results, error) { d.RLock() defer d.RUnlock() - return d.child.KeyList() + return d.child.Query(q) } diff --git a/blocks/blockstore/write_cache_test.go b/blocks/blockstore/write_cache_test.go index c2175a1fc..1e072e95e 100644 --- a/blocks/blockstore/write_cache_test.go +++ b/blocks/blockstore/write_cache_test.go @@ -4,6 +4,7 @@ import ( "testing" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" "github.com/jbenet/go-ipfs/blocks" ) @@ -83,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) { return c.ds.Delete(key) } -func (c *callbackDatastore) KeyList() ([]ds.Key, error) { +func (c *callbackDatastore) Query(q dsq.Query) (*dsq.Results, error) { c.f() - return c.ds.KeyList() + return c.ds.Query(q) } diff --git a/util/datastore2/delayed.go b/util/datastore2/delayed.go index b8670b252..ccb3cc29c 100644 --- a/util/datastore2/delayed.go +++ b/util/datastore2/delayed.go @@ -1,42 +1,44 @@ package datastore2 import ( - datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" + delay "github.com/jbenet/go-ipfs/util/delay" ) -func WithDelay(ds datastore.Datastore, delay delay.D) datastore.Datastore { +func WithDelay(ds ds.Datastore, delay delay.D) ds.Datastore { return &delayed{ds: ds, delay: delay} } type delayed struct { - ds datastore.Datastore + ds ds.Datastore delay delay.D } -func (dds *delayed) Put(key datastore.Key, value interface{}) (err error) { +func (dds *delayed) Put(key ds.Key, value interface{}) (err error) { dds.delay.Wait() return dds.ds.Put(key, value) } -func (dds *delayed) Get(key datastore.Key) (value interface{}, err error) { +func (dds *delayed) Get(key ds.Key) (value interface{}, err error) { dds.delay.Wait() return dds.ds.Get(key) } -func (dds *delayed) Has(key datastore.Key) (exists bool, err error) { +func (dds *delayed) Has(key ds.Key) (exists bool, err error) { dds.delay.Wait() return dds.ds.Has(key) } -func (dds *delayed) Delete(key datastore.Key) (err error) { +func (dds *delayed) Delete(key ds.Key) (err error) { dds.delay.Wait() return dds.ds.Delete(key) } -func (dds *delayed) KeyList() ([]datastore.Key, error) { +func (dds *delayed) Query(q dsq.Query) (*dsq.Results, error) { dds.delay.Wait() - return dds.ds.KeyList() + return dds.ds.Query(q) } -var _ datastore.Datastore = &delayed{} +var _ ds.Datastore = &delayed{} From da976a5f2159151859e6de4b70e96a50023b6496 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 9 Jan 2015 17:39:56 -0800 Subject: [PATCH 02/13] blocks: AllKeys + tests --- blocks/blockstore/blockstore.go | 35 ++++++++++++++++- blocks/blockstore/blockstore_test.go | 59 +++++++++++++++++++++++++++- blocks/blockstore/write_cache.go | 4 ++ util/key.go | 9 ++++- 4 files changed, 103 insertions(+), 4 deletions(-) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index e203ffc50..6132d155e 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -6,12 +6,17 @@ import ( "errors" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) +// BlockPrefix namespaces blockstore datastores +var BlockPrefix = ds.NewKey("blocks") + var ValueTypeMismatch = errors.New("The retrieved value is not a Block") var ErrNotFound = errors.New("blockstore: block not found") @@ -22,16 +27,20 @@ type Blockstore interface { Has(u.Key) (bool, error) Get(u.Key) (*blocks.Block, error) Put(*blocks.Block) error + AllKeys(offset int, limit int) ([]u.Key, error) } func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore { + dd := dsns.Wrap(d, BlockPrefix) return &blockstore{ - datastore: d, + datastore: dd, } } type blockstore struct { - datastore ds.ThreadSafeDatastore + datastore ds.Datastore + // cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it. + // we do check it on `NewBlockstore` though. } func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) { @@ -67,3 +76,25 @@ func (bs *blockstore) Has(k u.Key) (bool, error) { func (s *blockstore) DeleteBlock(k u.Key) error { return s.datastore.Delete(k.DsKey()) } + +// AllKeys runs a query for keys from the blockstore. +// this is very simplistic, in the future, take dsq.Query as a param? +// if offset and limit are 0, they are ignored. +func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) { + var keys []u.Key + + // TODO make async inside ds/leveldb.Query + // KeysOnly, because that would be _a lot_ of data. + q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit} + res, err := bs.datastore.Query(q) + if err != nil { + return nil, err + } + + for e := range res.Entries() { + // need to convert to u.Key using u.KeyFromDsKey. + k := u.KeyFromDsKey(ds.NewKey(e.Key)) + keys = append(keys, k) + } + return keys, nil +} diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index 00edf61ab..a80ce8337 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -2,6 +2,7 @@ package blockstore import ( "bytes" + "fmt" "testing" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -41,11 +42,49 @@ func TestPutThenGetBlock(t *testing.T) { } } +func TestAllKeys(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + N := 100 + + keys := make([]u.Key, N) + for i := 0; i < N; i++ { + block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + keys[i] = block.Key() + } + + keys2, err := bs.AllKeys(0, 0) + if err != nil { + t.Fatal(err) + } + // for _, k2 := range keys2 { + // t.Log("found ", k2.Pretty()) + // } + + expectMatches(t, keys, keys2) + + keys3, err := bs.AllKeys(N/3, N/3) + if err != nil { + t.Fatal(err) + } + for _, k3 := range keys3 { + t.Log("found ", k3.Pretty()) + } + if len(keys3) != N/3 { + t.Errorf("keys3 should be: %d != %d", N/3, len(keys3)) + } + +} + func TestValueTypeMismatch(t *testing.T) { block := blocks.NewBlock([]byte("some data")) datastore := ds.NewMapDatastore() - datastore.Put(block.Key().DsKey(), "data that isn't a block!") + k := BlockPrefix.Child(block.Key().DsKey()) + datastore.Put(k, "data that isn't a block!") blockstore := NewBlockstore(ds_sync.MutexWrap(datastore)) @@ -54,3 +93,21 @@ func TestValueTypeMismatch(t *testing.T) { t.Fatal(err) } } + +func expectMatches(t *testing.T, expect, actual []u.Key) { + + if len(expect) != len(actual) { + t.Errorf("expect and actual differ: %d != %d", len(expect), len(actual)) + } + for _, ek := range expect { + found := false + for _, ak := range actual { + if ek == ak { + found = true + } + } + if !found { + t.Error("expected key not found: ", ek) + } + } +} diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go index b46d05846..da9a0a01d 100644 --- a/blocks/blockstore/write_cache.go +++ b/blocks/blockstore/write_cache.go @@ -43,3 +43,7 @@ func (w *writecache) Put(b *blocks.Block) error { w.cache.Add(b.Key(), struct{}{}) return w.blockstore.Put(b) } + +func (w *writecache) AllKeys(offset int, limit int) ([]u.Key, error) { + return w.blockstore.AllKeys(offset, limit) +} diff --git a/util/key.go b/util/key.go index eca1255b5..445181b75 100644 --- a/util/key.go +++ b/util/key.go @@ -71,7 +71,7 @@ func (k *Key) Loggable() map[string]interface{} { // KeyFromDsKey returns a Datastore key func KeyFromDsKey(dsk ds.Key) Key { - return Key(dsk.BaseNamespace()) + return Key(dsk.String()[1:]) } // B58KeyConverter -- for KeyTransform datastores @@ -131,3 +131,10 @@ func XOR(a, b []byte) []byte { } return c } + +// KeySlice is used for sorting Keys +type KeySlice []Key + +func (es KeySlice) Len() int { return len(es) } +func (es KeySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es KeySlice) Less(i, j int) bool { return es[i] < es[j] } From 708e47fcbc8ef3bfb6ddd11bcae4130ba5b72125 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 9 Jan 2015 23:27:50 -0800 Subject: [PATCH 03/13] ipfs blocks local command ipfs blocks local returns _all_ local refs. For now this is one long op. future commits will make it async. --- core/commands/refs.go | 47 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/core/commands/refs.go b/core/commands/refs.go index 6aed45242..4390e108c 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -31,7 +31,7 @@ func KeyListTextMarshaler(res cmds.Response) (io.Reader, error) { var RefsCmd = &cmds.Command{ Helptext: cmds.HelpText{ - Tagline: "Lists link hashes from an object", + Tagline: "Lists links (references) from an object", ShortDescription: ` Retrieves the object named by and displays the link hashes it contains, with the following format: @@ -41,7 +41,9 @@ hashes it contains, with the following format: Note: list all refs recursively with -r. `, }, - + Subcommands: map[string]*cmds.Command{ + "local": RefsLocalCmd, + }, Arguments: []cmds.Argument{ cmds.StringArg("ipfs-path", true, true, "Path to the object(s) to list refs from"), }, @@ -102,6 +104,47 @@ Note: list all refs recursively with -r. if _, err := rw.WriteRefs(o); err != nil { log.Error(err) eptr.SetError(err) + return + } + } + }() + + return eptr, nil + }, +} + +var RefsLocalCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Lists all local references", + ShortDescription: ` +Displays the hashes of all local objects. +`, + }, + + Run: func(req cmds.Request) (interface{}, error) { + n, err := req.Context().GetNode() + if err != nil { + return nil, err + } + + // todo: make async + allKeys, err := n.Blockstore.AllKeys(0, 0) + if err != nil { + return nil, err + } + + piper, pipew := io.Pipe() + eptr := &ErrPassThroughReader{R: piper} + + go func() { + defer pipew.Close() + + for _, k := range allKeys { + s := k.Pretty() + "\n" + if _, err := pipew.Write([]byte(s)); err != nil { + log.Error(err) + eptr.SetError(err) + return } } }() From c0cc9511187a50665675d33335a1717b8918b594 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 12:29:07 -0800 Subject: [PATCH 04/13] updated goprocess --- Godeps/Godeps.json | 2 +- .../github.com/jbenet/goprocess/goprocess.go | 20 ++++- .../jbenet/goprocess/goprocess_test.go | 81 +++++++++++++++++-- .../github.com/jbenet/goprocess/impl-mutex.go | 53 +++++++++++- .../jbenet/goprocess/ratelimit/ratelimit.go | 27 ++----- 5 files changed, 152 insertions(+), 31 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 8ca77f4d0..1091857f8 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -156,7 +156,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "162148a58668ca38b0b8f0459ccc6ca88e32f1f4" + "Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8" }, { "ImportPath": "github.com/kr/binarydist", diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go index 804eefc52..afe848c61 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go @@ -113,7 +113,7 @@ type Process interface { // } // // It is useful to construct simple asynchronous workers, children of p. - Go(f ProcessFunc) + Go(f ProcessFunc) Process // Close ends the process. Close blocks until the process has completely // shut down, and any teardown has run _exactly once_. The returned error @@ -121,6 +121,10 @@ type Process interface { // If the process has already been closed, Close returns immediately. Close() error + // CloseAfterChildren calls Close _after_ its children have Closed + // normally (i.e. it _does not_ attempt to close them). + CloseAfterChildren() error + // Closing is a signal to wait upon. The returned channel is closed // _after_ Close has been called at least once, but teardown may or may // not be done yet. The primary use case of Closing is for children who @@ -167,7 +171,19 @@ var nilProcessFunc = func(Process) {} // // This is because having the process you func Go(f ProcessFunc) Process { - return GoChild(Background(), f) + // return GoChild(Background(), f) + + // we use two processes, one for communication, and + // one for ensuring we wait on the function (unclosable from the outside). + p := newProcess(nil) + waitFor := newProcess(nil) + p.WaitFor(waitFor) // prevent p from closing + go func() { + f(p) + waitFor.Close() // allow p to close. + p.Close() // ensure p closes. + }() + return p } // GoChild is like Go, but it registers the returned Process as a child of parent, diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go index 1bc4fa117..277c01b72 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go @@ -289,7 +289,6 @@ func TestAddChild(t *testing.T) { func TestGoChildrenClose(t *testing.T) { var a, b, c, d, e Process - var ready = make(chan struct{}) var bWait = make(chan struct{}) var cWait = make(chan struct{}) @@ -335,10 +334,85 @@ func TestGoChildrenClose(t *testing.T) { go a.Close() testNone(t, Q) + bWait <- struct{}{} // relase b go b.Close() testNone(t, Q) + cWait <- struct{}{} // relase c + <-c.Closed() + <-b.Closed() + testStrs(t, Q, "b", "c") + testStrs(t, Q, "b", "c") + + eWait <- struct{}{} // release e + <-e.Closed() + testStrs(t, Q, "e") + + dWait <- struct{}{} // releasse d + <-d.Closed() + <-a.Closed() + testStrs(t, Q, "a", "d") + testStrs(t, Q, "a", "d") +} + +func TestCloseAfterChildren(t *testing.T) { + + var a, b, c, d, e Process + + var ready = make(chan struct{}) + + a = WithParent(Background()) + a.Go(func(p Process) { + b = p + b.Go(func(p Process) { + c = p + ready <- struct{}{} + <-p.Closing() // wait till we're told to close (parents mustnt) + }) + ready <- struct{}{} + }) + a.Go(func(p Process) { + d = p + d.Go(func(p Process) { + e = p + ready <- struct{}{} + <-p.Closing() // wait till we're told to close (parents mustnt) + }) + ready <- struct{}{} + }) + + <-ready + <-ready + <-ready + <-ready + + Q := make(chan string, 5) + + go onClosedStr(Q, "a", a) + go onClosedStr(Q, "b", b) + go onClosedStr(Q, "c", c) + go onClosedStr(Q, "d", d) + go onClosedStr(Q, "e", e) + + aDone := make(chan struct{}) + bDone := make(chan struct{}) + + testNone(t, Q) + go func() { + a.CloseAfterChildren() + aDone <- struct{}{} + }() + testNone(t, Q) + + go func() { + b.CloseAfterChildren() + bDone <- struct{}{} + }() + testNone(t, Q) + c.Close() + <-bDone + <-b.Closed() testStrs(t, Q, "b", "c") testStrs(t, Q, "b", "c") @@ -346,6 +420,7 @@ func TestGoChildrenClose(t *testing.T) { testStrs(t, Q, "e") d.Close() + <-aDone <-a.Closed() testStrs(t, Q, "a", "d") testStrs(t, Q, "a", "d") @@ -354,11 +429,7 @@ func TestGoChildrenClose(t *testing.T) { func TestBackground(t *testing.T) { // test it hangs indefinitely: b := Background() - go b.Close() - go func() { - b.Close() - }() select { case <-b.Closing(): diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index 627b39b88..ed68b9a03 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -9,6 +9,7 @@ type process struct { children []Process // process to close with us waitfors []Process // process to only wait for teardown TeardownFunc // called to run the teardown logic. + waiting chan struct{} // closed when CloseAfterChildrenClosed is called. closing chan struct{} // closed once close starts. closed chan struct{} // closed once close is done. closeErr error // error to return to clients of Close() @@ -73,13 +74,18 @@ func (p *process) AddChild(child Process) { p.Unlock() } -func (p *process) Go(f ProcessFunc) { +func (p *process) Go(f ProcessFunc) Process { child := newProcess(nil) p.AddChild(child) + + waitFor := newProcess(nil) + child.WaitFor(waitFor) // prevent child from closing go func() { f(child) - child.Close() // close to tear down. + waitFor.Close() // allow child to close. + child.Close() // close to tear down. }() + return child } // Close is the external close function. @@ -125,3 +131,46 @@ func (p *process) doClose() { p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) close(p.closed) // signal that we're shut down (Closed) } + +// We will only wait on the children we have now. +// We will not wait on children added subsequently. +// this may change in the future. +func (p *process) CloseAfterChildren() error { + p.Lock() + select { + case <-p.Closed(): + p.Unlock() + return p.Close() // get error. safe, after p.Closed() + case <-p.waiting: // already called it. + p.Unlock() + <-p.Closed() + return p.Close() // get error. safe, after p.Closed() + default: + } + p.Unlock() + + // here only from one goroutine. + + nextToWaitFor := func() Process { + p.Lock() + defer p.Unlock() + for _, e := range p.waitfors { + select { + case <-e.Closed(): + default: + return e + } + } + return nil + } + + // wait for all processes we're waiting for are closed. + // the semantics here are simple: we will _only_ close + // if there are no processes currently waiting for. + for next := nextToWaitFor(); next != nil; next = nextToWaitFor() { + <-next.Closed() + } + + // YAY! we're done. close + return p.Close() +} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit/ratelimit.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit/ratelimit.go index e9ce43287..138441506 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit/ratelimit.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit/ratelimit.go @@ -45,29 +45,14 @@ func NewRateLimiter(parent process.Process, limit int) *RateLimiter { func (rl *RateLimiter) LimitedGo(f process.ProcessFunc) { <-rl.limiter - rl.Go(func(child process.Process) { + p := rl.Go(f) - // call the function as rl.Go would. - f(child) - - // this close is here because the child may have spawned - // children of its own, and our rate limiter should capture that. - // we have two options: - // * this approach (which is what process.Go itself does), or - // * spawn another goroutine that waits on <-child.Closed() - // - // go func() { - // <-child.Closed() - // rl.limiter <- struct{}{} - // }() - // - // This approach saves a goroutine. It is fine to call child.Close() - // multiple times. - child.Close() - - // after it's done. + // this <-closed() is here because the child may have spawned + // children of its own, and our rate limiter should capture that. + go func() { + <-p.Closed() rl.limiter <- struct{}{} - }) + }() } // LimitChan returns a rate-limiting channel. it is the usual, simple, From 92e8a7bcd56a38ae722b556fc5b5e0345e1906ac Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 14:31:40 -0800 Subject: [PATCH 05/13] updated datastore for proper query handling Queries now can be cancelled and the resources collected --- Godeps/Godeps.json | 2 +- .../jbenet/go-datastore/Godeps/Godeps.json | 4 + .../jbenet/go-datastore/basic_ds.go | 18 +- .../jbenet/go-datastore/datastore.go | 2 +- .../jbenet/go-datastore/elastigo/datastore.go | 2 +- .../github.com/jbenet/go-datastore/fs/fs.go | 13 +- .../jbenet/go-datastore/fs/fs_test.go | 5 +- .../go-datastore/keytransform/keytransform.go | 24 +- .../keytransform/keytransform_test.go | 17 +- .../jbenet/go-datastore/leveldb/datastore.go | 86 ++++--- .../jbenet/go-datastore/leveldb/ds_test.go | 41 +++- .../jbenet/go-datastore/lru/datastore.go | 2 +- .../go-datastore/namespace/namespace.go | 41 +++- .../go-datastore/namespace/namespace_test.go | 17 +- .../jbenet/go-datastore/panic/panic.go | 2 +- .../jbenet/go-datastore/query/filter_test.go | 14 +- .../jbenet/go-datastore/query/order_test.go | 6 +- .../jbenet/go-datastore/query/query.go | 221 +++++++++++++----- .../jbenet/go-datastore/query/query_impl.go | 110 ++++++--- .../jbenet/go-datastore/query/query_test.go | 109 +++++++++ .../jbenet/go-datastore/sync/sync.go | 2 +- blocks/blockstore/blockstore.go | 79 ++++++- blocks/blockstore/blockstore_test.go | 165 ++++++++++++- blocks/blockstore/write_cache.go | 10 +- blocks/blockstore/write_cache_test.go | 2 +- core/commands/refs.go | 2 +- util/datastore2/delayed.go | 2 +- 27 files changed, 791 insertions(+), 207 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 1091857f8..b078a8d6e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -110,7 +110,7 @@ }, { "ImportPath": "github.com/jbenet/go-datastore", - "Rev": "8a8988d1a4e174274bd4a9dd55c4837f46fdf323" + "Rev": "35738aceb35505bd3c77c2a618fb1947ca3f72da" }, { "ImportPath": "github.com/jbenet/go-fuse-version", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json index 91aa65d50..6dc670b20 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json @@ -18,6 +18,10 @@ "ImportPath": "github.com/hashicorp/golang-lru", "Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370" }, + { + "ImportPath": "github.com/jbenet/goprocess", + "Rev": "b4b4178efcf2404ce9db72438c9c49db2fb399d8" + }, { "ImportPath": "github.com/mattbaird/elastigo/api", "Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d" diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go index 9005e8433..9e4cd9e1f 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go @@ -3,7 +3,7 @@ package datastore import ( "log" - query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // Here are some basic datastore implementations. @@ -50,13 +50,13 @@ func (d *MapDatastore) Delete(key Key) (err error) { } // Query implements Datastore.Query -func (d *MapDatastore) Query(q query.Query) (*query.Results, error) { - re := make([]query.Entry, 0, len(d.values)) +func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) { + re := make([]dsq.Entry, 0, len(d.values)) for k, v := range d.values { - re = append(re, query.Entry{Key: k.String(), Value: v}) + re = append(re, dsq.Entry{Key: k.String(), Value: v}) } - r := query.ResultsWithEntries(q, re) - r = q.ApplyTo(r) + r := dsq.ResultsWithEntries(q, re) + r = dsq.NaiveQueryApply(q, r) return r, nil } @@ -91,8 +91,8 @@ func (d *NullDatastore) Delete(key Key) (err error) { } // Query implements Datastore.Query -func (d *NullDatastore) Query(q query.Query) (*query.Results, error) { - return query.ResultsWithEntries(q, nil), nil +func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) { + return dsq.ResultsWithEntries(q, nil), nil } // LogDatastore logs all accesses through the datastore. @@ -147,7 +147,7 @@ func (d *LogDatastore) Delete(key Key) (err error) { } // Query implements Datastore.Query -func (d *LogDatastore) Query(q query.Query) (*query.Results, error) { +func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) { log.Printf("%s: Query\n", d.Name) return d.child.Query(q) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go index d6cc0b97b..30e55ddb6 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go @@ -66,7 +66,7 @@ type Datastore interface { // result.Wait() // result.AllEntries() // - Query(q query.Query) (*query.Results, error) + Query(q query.Query) (query.Results, error) } // ThreadSafeDatastore is an interface that all threadsafe datastore should diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go index e05d2228a..f53da4d4d 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/elastigo/datastore.go @@ -113,7 +113,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) { return nil } -func (d *Datastore) Query(query.Query) (*query.Results, error) { +func (d *Datastore) Query(query.Query) (query.Results, error) { return nil, errors.New("Not yet implemented!") } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go index 07613040b..7fb869113 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go @@ -83,9 +83,9 @@ func (d *Datastore) Delete(key ds.Key) (err error) { } // Query implements Datastore.Query -func (d *Datastore) Query(q query.Query) (*query.Results, error) { +func (d *Datastore) Query(q query.Query) (query.Results, error) { - entries := make(chan query.Entry) + results := make(chan query.Result) walkFn := func(path string, info os.FileInfo, err error) error { // remove ds path prefix @@ -98,17 +98,18 @@ func (d *Datastore) Query(q query.Query) (*query.Results, error) { path = path[:len(path)-len(ObjectKeySuffix)] } key := ds.NewKey(path) - entries <- query.Entry{Key: key.String(), Value: query.NotFetched} + entry := query.Entry{Key: key.String(), Value: query.NotFetched} + results <- query.Result{Entry: entry} } return nil } go func() { filepath.Walk(d.path, walkFn) - close(entries) + close(results) }() - r := query.ResultsWithEntriesChan(q, entries) - r = q.ApplyTo(r) + r := query.ResultsWithChan(q, results) + r = query.NaiveQueryApply(q, r) return r, nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go index beef672aa..d0ec9f2db 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs_test.go @@ -67,7 +67,10 @@ func (ks *DSSuite) TestBasic(c *C) { "/foo/bar/bazb", "/foo/bar/baz/barb", } - all := r.AllEntries() + all, err := r.Rest() + if err != nil { + c.Fatal(err) + } c.Check(len(all), Equals, len(expect)) for _, k := range expect { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go index f23a0de43..be9734658 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go @@ -52,24 +52,24 @@ func (d *ktds) Delete(key ds.Key) (err error) { } // Query implements Query, inverting keys on the way back out. -func (d *ktds) Query(q dsq.Query) (*dsq.Results, error) { - - q2 := q - q2.Prefix = d.ConvertKey(ds.NewKey(q2.Prefix)).String() - r, err := d.child.Query(q2) +func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { + qr, err := d.child.Query(q) if err != nil { return nil, err } - ch := make(chan dsq.Entry) + ch := make(chan dsq.Result) go func() { - for e := range r.Entries() { - e.Key = d.InvertKey(ds.NewKey(e.Key)).String() - ch <- e + defer close(ch) + defer qr.Close() + + for r := range qr.Next() { + if r.Error == nil { + r.Entry.Key = d.InvertKey(ds.NewKey(r.Entry.Key)).String() + } + ch <- r } - close(ch) }() - r2 := dsq.ResultsWithEntriesChan(q, ch) - return r2, nil + return dsq.DerivedResults(qr, ch), nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go index 868c7b1d4..701f94582 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform_test.go @@ -62,13 +62,18 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true) } - listAr, errA := mpds.Query(dsq.Query{}) - listBr, errB := ktds.Query(dsq.Query{}) - c.Check(errA, Equals, nil) - c.Check(errB, Equals, nil) + run := func(d ds.Datastore, q dsq.Query) []ds.Key { + r, err := d.Query(q) + c.Check(err, Equals, nil) - listA := ds.EntryKeys(listAr.AllEntries()) - listB := ds.EntryKeys(listBr.AllEntries()) + e, err := r.Rest() + c.Check(err, Equals, nil) + + return ds.EntryKeys(e) + } + + listA := run(mpds, dsq.Query{}) + listB := run(ktds, dsq.Query{}) c.Check(len(listA), Equals, len(listB)) // sort them cause yeah. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go index 1d1f91bf5..f10fddfd3 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go @@ -3,12 +3,12 @@ package leveldb import ( "io" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) type Datastore interface { @@ -72,54 +72,80 @@ func (d *datastore) Delete(key ds.Key) (err error) { return err } -func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { + + // we can use multiple iterators concurrently. see: + // https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator + // advance the iterator only if the reader reads + // + // run query in own sub-process tied to Results.Process(), so that + // it waits for us to finish AND so that clients can signal to us + // that resources should be reclaimed. + qrb := dsq.NewResultBuilder(q) + qrb.Process.Go(func(worker goprocess.Process) { + d.runQuery(worker, qrb) + }) + + // go wait on the worker (without signaling close) + go qrb.Process.CloseAfterChildren() + + // Now, apply remaining things (filters, order) + qr := qrb.Results() + for _, f := range q.Filters { + qr = dsq.NaiveFilter(qr, f) + } + for _, o := range q.Orders { + qr = dsq.NaiveOrder(qr, o) + } + return qr, nil +} + +func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { + var rnge *util.Range - if q.Prefix != "" { - rnge = util.BytesPrefix([]byte(q.Prefix)) + if qrb.Query.Prefix != "" { + rnge = util.BytesPrefix([]byte(qrb.Query.Prefix)) } i := d.DB.NewIterator(rnge, nil) + defer i.Release() - // offset - if q.Offset > 0 { - for j := 0; j < q.Offset; j++ { + // advance iterator for offset + if qrb.Query.Offset > 0 { + for j := 0; j < qrb.Query.Offset; j++ { i.Next() } } - var es []dsq.Entry - for i.Next() { - - // limit - if q.Limit > 0 && len(es) >= q.Limit { + // iterate, and handle limit, too + for sent := 0; i.Next(); sent++ { + // end early if we hit the limit + if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit { break } k := ds.NewKey(string(i.Key())).String() e := dsq.Entry{Key: k} - if !q.KeysOnly { + if !qrb.Query.KeysOnly { buf := make([]byte, len(i.Value())) copy(buf, i.Value()) e.Value = buf } - es = append(es, e) - } - i.Release() - if err := i.Error(); err != nil { - return nil, err + select { + case qrb.Output <- dsq.Result{Entry: e}: // we sent it out + case <-worker.Closing(): // client told us to end early. + break + } } - // Now, apply remaining pieces. - q2 := q - q2.Offset = 0 // already applied - q2.Limit = 0 // already applied - // TODO: make this async with: - // qr := dsq.ResultsWithEntriesChan(q, ch) - qr := dsq.ResultsWithEntries(q, es) - qr = q2.ApplyTo(qr) - qr.Query = q // set it back - return qr, nil + if err := i.Error(); err != nil { + select { + case qrb.Output <- dsq.Result{Error: err}: // client read our error + case <-worker.Closing(): // client told us to end. + return + } + } } // LevelDB needs to be closed. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go index 828c04139..9a556e285 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go @@ -20,21 +20,28 @@ var testcases = map[string]string{ "/f": "f", } -func TestQuery(t *testing.T) { +// returns datastore, and a function to call on exit. +// (this garbage collects). So: +// +// d, close := newDS(t) +// defer close() +func newDS(t *testing.T) (Datastore, func()) { path, err := ioutil.TempDir("/tmp", "testing_leveldb_") if err != nil { t.Fatal(err) } - defer func() { - os.RemoveAll(path) - }() d, err := NewDatastore(path, nil) if err != nil { t.Fatal(err) } - defer d.Close() + return d, func() { + os.RemoveAll(path) + d.Close() + } +} +func addTestCases(t *testing.T, d Datastore, testcases map[string]string) { for k, v := range testcases { dsk := ds.NewKey(k) if err := d.Put(dsk, []byte(v)); err != nil { @@ -54,6 +61,13 @@ func TestQuery(t *testing.T) { } } +} + +func TestQuery(t *testing.T) { + d, close := newDS(t) + defer close() + addTestCases(t, d, testcases) + rs, err := d.Query(dsq.Query{Prefix: "/a/"}) if err != nil { t.Fatal(err) @@ -65,7 +79,7 @@ func TestQuery(t *testing.T) { "/a/b/d", "/a/c", "/a/d", - }, rs.AllEntries()) + }, rs) // test offset and limit @@ -77,11 +91,22 @@ func TestQuery(t *testing.T) { expectMatches(t, []string{ "/a/b/d", "/a/c", - }, rs.AllEntries()) + }, rs) } -func expectMatches(t *testing.T, expect []string, actual []dsq.Entry) { +func TestQueryRespectsProcess(t *testing.T) { + d, close := newDS(t) + defer close() + addTestCases(t, d, testcases) +} + +func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { + actual, err := actualR.Rest() + if err != nil { + t.Error(err) + } + if len(actual) != len(expect) { t.Error("not enough", expect, actual) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go index 72fa225e1..074bc726f 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go @@ -51,6 +51,6 @@ func (d *Datastore) Delete(key ds.Key) (err error) { } // KeyList returns a list of keys in the datastore -func (d *Datastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { return nil, errors.New("KeyList not implemented.") } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go index 4ad5f3073..9b9cf01bb 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go @@ -6,6 +6,7 @@ import ( ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ktds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) // PrefixTransform constructs a KeyTransform with a pair of functions that @@ -40,5 +41,43 @@ func Wrap(child ds.Datastore, prefix ds.Key) ktds.Datastore { panic("child (ds.Datastore) is nil") } - return ktds.Wrap(child, PrefixTransform(prefix)) + d := ktds.Wrap(child, PrefixTransform(prefix)) + return &datastore{Datastore: d, raw: child, prefix: prefix} +} + +type datastore struct { + prefix ds.Key + raw ds.Datastore + ktds.Datastore +} + +// Query implements Query, inverting keys on the way back out. +func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { + qr, err := d.raw.Query(q) + if err != nil { + return nil, err + } + + ch := make(chan dsq.Result) + go func() { + defer close(ch) + defer qr.Close() + + for r := range qr.Next() { + if r.Error != nil { + ch <- r + continue + } + + k := ds.NewKey(r.Entry.Key) + if !d.prefix.IsAncestorOf(k) { + continue + } + + r.Entry.Key = d.Datastore.InvertKey(k).String() + ch <- r + } + }() + + return dsq.DerivedResults(qr, ch), nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go index 6ef2edbfc..eb9908927 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace_test.go @@ -48,13 +48,18 @@ func (ks *DSSuite) TestBasic(c *C) { c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true) } - listAr, errA := mpds.Query(dsq.Query{}) - listBr, errB := nsds.Query(dsq.Query{}) - c.Check(errA, Equals, nil) - c.Check(errB, Equals, nil) + run := func(d ds.Datastore, q dsq.Query) []ds.Key { + r, err := d.Query(q) + c.Check(err, Equals, nil) - listA := ds.EntryKeys(listAr.AllEntries()) - listB := ds.EntryKeys(listBr.AllEntries()) + e, err := r.Rest() + c.Check(err, Equals, nil) + + return ds.EntryKeys(e) + } + + listA := run(mpds, dsq.Query{}) + listB := run(nsds, dsq.Query{}) c.Check(len(listA), Equals, len(listB)) // sort them cause yeah. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go index 70556355f..be10e5976 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go @@ -58,7 +58,7 @@ func (d *datastore) Delete(key ds.Key) error { return nil } -func (d *datastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { r, err := d.child.Query(q) if err != nil { fmt.Fprintf(os.Stdout, "panic datastore: %s", err) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go index 5543ad786..b4c7d8ac4 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/filter_test.go @@ -5,15 +5,6 @@ import ( "testing" ) -var sampleKeys = []string{ - "/ab/c", - "/ab/cd", - "/a", - "/abce", - "/abcf", - "/ab", -} - type filterTestCase struct { filter Filter keys []string @@ -28,7 +19,10 @@ func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) { res := ResultsWithEntries(Query{}, e) res = NaiveFilter(res, f) - actualE := res.AllEntries() + actualE, err := res.Rest() + if err != nil { + t.Fatal(err) + } actual := make([]string, len(actualE)) for i, e := range actualE { actual[i] = e.Key diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go index 8d6c0b81b..648304172 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/order_test.go @@ -19,7 +19,11 @@ func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) { res := ResultsWithEntries(Query{}, e) res = NaiveOrder(res, f) - actualE := res.AllEntries() + actualE, err := res.Rest() + if err != nil { + t.Fatal(err) + } + actual := make([]string, len(actualE)) for i, e := range actualE { actual[i] = e.Key diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go index 89787661e..434c5592a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query.go @@ -1,5 +1,9 @@ package query +import ( + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + /* Query represents storage for any key-value pair. @@ -64,7 +68,7 @@ type Query struct { // of an Entry has been fetched or not. This is needed because // datastore implementations get to decide whether Query returns values // or only keys. nil is not a good signal, as real values may be nil. -var NotFetched = struct{}{} +const NotFetched int = iota // Entry is a query result entry. type Entry struct { @@ -72,74 +76,175 @@ type Entry struct { Value interface{} } -// Results is a set of Query results -type Results struct { - Query Query // the query these Results correspond to +// Result is a special entry that includes an error, so that the client +// may be warned about internal errors. +type Result struct { + Entry - done chan struct{} - res chan Entry - all []Entry + Error error } -// ResultsWithEntriesChan returns a Results object from a -// channel of ResultEntries. It's merely an encapsulation -// that provides for AllEntries() functionality. -func ResultsWithEntriesChan(q Query, res <-chan Entry) *Results { - r := &Results{ - Query: q, - done: make(chan struct{}), - res: make(chan Entry), - all: []Entry{}, - } +// Results is a set of Query results. This is the interface for clients. +// Example: +// +// qr, _ := myds.Query(q) +// for r := range qr.Next() { +// if r.Error != nil { +// // handle. +// break +// } +// +// fmt.Println(r.Entry.Key, r.Entry.Value) +// } +// +// or, wait on all results at once: +// +// qr, _ := myds.Query(q) +// es, _ := qr.Rest() +// for _, e := range es { +// fmt.Println(e.Key, e.Value) +// } +// +type Results interface { + Query() Query // the query these Results correspond to + Next() <-chan Result // returns a channel to wait for the next result + Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once. + Close() error // client may call Close to signal early exit - // go consume all the results and add them to the results. - go func() { - for e := range res { - r.all = append(r.all, e) - r.res <- e - } - close(r.res) - close(r.done) - }() - return r + // Process returns a goprocess.Process associated with these results. + // most users will not need this function (Close is all they want), + // but it's here in case you want to connect the results to other + // goprocess-friendly things. + Process() goprocess.Process } -// ResultsWithEntries returns a Results object from a -// channel of ResultEntries. It's merely an encapsulation -// that provides for AllEntries() functionality. -func ResultsWithEntries(q Query, res []Entry) *Results { - r := &Results{ - Query: q, - done: make(chan struct{}), - res: make(chan Entry), - all: res, - } - - // go add all the results - go func() { - for _, e := range res { - r.res <- e - } - close(r.res) - close(r.done) - }() - return r +// results implements Results +type results struct { + query Query + proc goprocess.Process + res <-chan Result } -// Entries() returns results through a channel. -// Results may arrive at any time. -// The channel may or may not be buffered. -// The channel may or may not rate limit the query processing. -func (r *Results) Entries() <-chan Entry { +func (r *results) Next() <-chan Result { return r.res } -// AllEntries returns all the entries in Results. -// It blocks until all the results have come in. -func (r *Results) AllEntries() []Entry { +func (r *results) Rest() ([]Entry, error) { + var es []Entry for e := range r.res { - _ = e + if e.Error != nil { + return es, e.Error + } + es = append(es, e.Entry) + } + <-r.proc.Closed() // wait till the processing finishes. + return es, nil +} + +func (r *results) Process() goprocess.Process { + return r.proc +} + +func (r *results) Close() error { + return r.proc.Close() +} + +func (r *results) Query() Query { + return r.query +} + +// ResultBuilder is what implementors use to construct results +// Implementors of datastores and their clients must respect the +// Process of the Request: +// +// * clients must call r.Process().Close() on an early exit, so +// implementations can reclaim resources. +// * if the Entries are read to completion (channel closed), Process +// should be closed automatically. +// * datastores must respect <-Process.Closing(), which intermediates +// an early close signal from the client. +// +type ResultBuilder struct { + Query Query + Process goprocess.Process + Output chan Result +} + +// Results returns a Results to to this builder. +func (rb *ResultBuilder) Results() Results { + return &results{ + query: rb.Query, + proc: rb.Process, + res: rb.Output, + } +} + +func NewResultBuilder(q Query) *ResultBuilder { + b := &ResultBuilder{ + Query: q, + Output: make(chan Result), + } + b.Process = goprocess.WithTeardown(func() error { + close(b.Output) + return nil + }) + return b +} + +// ResultsWithChan returns a Results object from a channel +// of Result entries. Respects its own Close() +func ResultsWithChan(q Query, res <-chan Result) Results { + b := NewResultBuilder(q) + + // go consume all the entries and add them to the results. + b.Process.Go(func(worker goprocess.Process) { + for { + select { + case <-worker.Closing(): // client told us to close early + return + case e, more := <-res: + if !more { + return + } + + select { + case b.Output <- e: + case <-worker.Closing(): // client told us to close early + return + } + } + } + return + }) + + go b.Process.CloseAfterChildren() + return b.Results() +} + +// ResultsWithEntries returns a Results object from a list of entries +func ResultsWithEntries(q Query, res []Entry) Results { + b := NewResultBuilder(q) + + // go consume all the entries and add them to the results. + b.Process.Go(func(worker goprocess.Process) { + for _, e := range res { + select { + case b.Output <- Result{Entry: e}: + case <-worker.Closing(): // client told us to close early + return + } + } + return + }) + + go b.Process.CloseAfterChildren() + return b.Results() +} + +func ResultsReplaceQuery(r Results, q Query) Results { + return &results{ + query: q, + proc: r.Process(), + res: r.Next(), } - <-r.done - return r.all } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go index ee1a4cdba..9e584e7b2 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_impl.go @@ -1,63 +1,105 @@ package query -// NaiveFilter applies a filter to the results -func NaiveFilter(qr *Results, filter Filter) *Results { - ch := make(chan Entry) +func DerivedResults(qr Results, ch <-chan Result) Results { + return &results{ + query: qr.Query(), + proc: qr.Process(), + res: ch, + } +} + +// NaiveFilter applies a filter to the results. +func NaiveFilter(qr Results, filter Filter) Results { + ch := make(chan Result) go func() { defer close(ch) + defer qr.Close() - for e := range qr.Entries() { - if filter.Filter(e) { + for e := range qr.Next() { + if e.Error != nil || filter.Filter(e.Entry) { ch <- e } } }() - return ResultsWithEntriesChan(qr.Query, ch) + + return DerivedResults(qr, ch) } // NaiveLimit truncates the results to a given int limit -func NaiveLimit(qr *Results, limit int) *Results { - ch := make(chan Entry) +func NaiveLimit(qr Results, limit int) Results { + ch := make(chan Result) go func() { defer close(ch) + defer qr.Close() - for l := 0; l < limit; l++ { - e, more := <-qr.Entries() - if !more { - return + l := 0 + for e := range qr.Next() { + if e.Error != nil { + ch <- e + continue + } + ch <- e + l++ + if limit > 0 && l >= limit { + break + } + } + }() + + return DerivedResults(qr, ch) +} + +// NaiveOffset skips a given number of results +func NaiveOffset(qr Results, offset int) Results { + ch := make(chan Result) + go func() { + defer close(ch) + defer qr.Close() + + sent := 0 + for e := range qr.Next() { + if e.Error != nil { + ch <- e + } + + if sent < offset { + sent++ + continue } ch <- e } }() - return ResultsWithEntriesChan(qr.Query, ch) -} -// NaiveOffset skips a given number of results -func NaiveOffset(qr *Results, offset int) *Results { - ch := make(chan Entry) - go func() { - defer close(ch) - - for l := 0; l < offset; l++ { - <-qr.Entries() // discard - } - - for e := range qr.Entries() { - ch <- e - } - }() - return ResultsWithEntriesChan(qr.Query, ch) + return DerivedResults(qr, ch) } // NaiveOrder reorders results according to given Order. // WARNING: this is the only non-stream friendly operation! -func NaiveOrder(qr *Results, o Order) *Results { - e := qr.AllEntries() - o.Sort(e) - return ResultsWithEntries(qr.Query, e) +func NaiveOrder(qr Results, o Order) Results { + ch := make(chan Result) + var entries []Entry + go func() { + defer close(ch) + defer qr.Close() + + for e := range qr.Next() { + if e.Error != nil { + ch <- e + } + + entries = append(entries, e.Entry) + } + + o.Sort(entries) + for _, e := range entries { + ch <- Result{Entry: e} + } + }() + + return DerivedResults(qr, ch) } -func (q Query) ApplyTo(qr *Results) *Results { +func NaiveQueryApply(q Query, qr Results) Results { if q.Prefix != "" { qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix}) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go new file mode 100644 index 000000000..e00edf095 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/query/query_test.go @@ -0,0 +1,109 @@ +package query + +import ( + "strings" + "testing" +) + +var sampleKeys = []string{ + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", +} + +type testCase struct { + keys []string + expect []string +} + +func testResults(t *testing.T, res Results, expect []string) { + actualE, err := res.Rest() + if err != nil { + t.Fatal(err) + } + + actual := make([]string, len(actualE)) + for i, e := range actualE { + actual[i] = e.Key + } + + if len(actual) != len(expect) { + t.Error("expect != actual.", expect, actual) + } + + if strings.Join(actual, "") != strings.Join(expect, "") { + t.Error("expect != actual.", expect, actual) + } +} + +func TestLimit(t *testing.T) { + testKeyLimit := func(t *testing.T, limit int, keys []string, expect []string) { + e := make([]Entry, len(keys)) + for i, k := range keys { + e[i] = Entry{Key: k} + } + + res := ResultsWithEntries(Query{}, e) + res = NaiveLimit(res, limit) + testResults(t, res, expect) + } + + testKeyLimit(t, 0, sampleKeys, []string{ // none + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", + }) + + testKeyLimit(t, 10, sampleKeys, []string{ // large + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", + }) + + testKeyLimit(t, 2, sampleKeys, []string{ + "/ab/c", + "/ab/cd", + }) +} + +func TestOffset(t *testing.T) { + + testOffset := func(t *testing.T, offset int, keys []string, expect []string) { + e := make([]Entry, len(keys)) + for i, k := range keys { + e[i] = Entry{Key: k} + } + + res := ResultsWithEntries(Query{}, e) + res = NaiveOffset(res, offset) + testResults(t, res, expect) + } + + testOffset(t, 0, sampleKeys, []string{ // none + "/ab/c", + "/ab/cd", + "/a", + "/abce", + "/abcf", + "/ab", + }) + + testOffset(t, 10, sampleKeys, []string{ // large + }) + + testOffset(t, 2, sampleKeys, []string{ + "/a", + "/abce", + "/abcf", + "/ab", + }) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go index 789d4b36d..ec2bfb504 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go @@ -58,7 +58,7 @@ func (d *MutexDatastore) Delete(key ds.Key) (err error) { } // KeyList implements Datastore.KeyList -func (d *MutexDatastore) Query(q dsq.Query) (*dsq.Results, error) { +func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { d.RLock() defer d.RUnlock() return d.child.Query(q) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 6132d155e..63d5df54b 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -5,6 +5,7 @@ package blockstore import ( "errors" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace" dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" @@ -12,8 +13,11 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) +var log = eventlog.Logger("blockstore") + // BlockPrefix namespaces blockstore datastores var BlockPrefix = ds.NewKey("blocks") @@ -27,7 +31,9 @@ type Blockstore interface { Has(u.Key) (bool, error) Get(u.Key) (*blocks.Block, error) Put(*blocks.Block) error - AllKeys(offset int, limit int) ([]u.Key, error) + + AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) + AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) } func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore { @@ -80,10 +86,29 @@ func (s *blockstore) DeleteBlock(k u.Key) error { // AllKeys runs a query for keys from the blockstore. // this is very simplistic, in the future, take dsq.Query as a param? // if offset and limit are 0, they are ignored. -func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) { - var keys []u.Key +// +// AllKeys respects context +func (bs *blockstore) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) { + + ch, err := bs.AllKeysChan(ctx, offset, limit) + if err != nil { + return nil, err + } + + var keys []u.Key + for k := range ch { + keys = append(keys, k) + } + return keys, nil +} + +// AllKeys runs a query for keys from the blockstore. +// this is very simplistic, in the future, take dsq.Query as a param? +// if offset and limit are 0, they are ignored. +// +// AllKeys respects context +func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) { - // TODO make async inside ds/leveldb.Query // KeysOnly, because that would be _a lot_ of data. q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit} res, err := bs.datastore.Query(q) @@ -91,10 +116,46 @@ func (bs *blockstore) AllKeys(offset int, limit int) ([]u.Key, error) { return nil, err } - for e := range res.Entries() { - // need to convert to u.Key using u.KeyFromDsKey. - k := u.KeyFromDsKey(ds.NewKey(e.Key)) - keys = append(keys, k) + // this function is here to compartmentalize + get := func() (k u.Key, ok bool) { + select { + case <-ctx.Done(): + return k, false + case e, more := <-res.Next(): + if !more { + return k, false + } + if e.Error != nil { + log.Debug("blockstore.AllKeysChan got err:", e.Error) + return k, false + } + + // need to convert to u.Key using u.KeyFromDsKey. + k = u.KeyFromDsKey(ds.NewKey(e.Key)) + return k, true + } } - return keys, nil + + output := make(chan u.Key) + go func() { + defer func() { + res.Process().Close() // ensure exit (signals early exit, too) + close(output) + }() + + for { + k, ok := get() + if !ok { + return + } + + select { + case <-ctx.Done(): + return + case output <- k: + } + } + }() + + return output, nil } diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index a80ce8337..de74d6d83 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -5,8 +5,11 @@ import ( "fmt" "testing" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) @@ -42,9 +45,11 @@ func TestPutThenGetBlock(t *testing.T) { } } -func TestAllKeys(t *testing.T) { - bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) - N := 100 +func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []u.Key) { + if d == nil { + d = ds.NewMapDatastore() + } + bs := NewBlockstore(ds_sync.MutexWrap(d)) keys := make([]u.Key, N) for i := 0; i < N; i++ { @@ -55,8 +60,14 @@ func TestAllKeys(t *testing.T) { } keys[i] = block.Key() } + return bs, keys +} - keys2, err := bs.AllKeys(0, 0) +func TestAllKeysSimple(t *testing.T) { + bs, keys := newBlockStoreWithKeys(t, nil, 100) + + ctx := context.Background() + keys2, err := bs.AllKeys(ctx, 0, 0) if err != nil { t.Fatal(err) } @@ -65,8 +76,14 @@ func TestAllKeys(t *testing.T) { // } expectMatches(t, keys, keys2) +} - keys3, err := bs.AllKeys(N/3, N/3) +func TestAllKeysOffsetAndLimit(t *testing.T) { + N := 30 + bs, _ := newBlockStoreWithKeys(t, nil, N) + + ctx := context.Background() + keys3, err := bs.AllKeys(ctx, N/3, N/3) if err != nil { t.Fatal(err) } @@ -76,6 +93,114 @@ func TestAllKeys(t *testing.T) { if len(keys3) != N/3 { t.Errorf("keys3 should be: %d != %d", N/3, len(keys3)) } +} + +func TestAllKeysRespectsContext(t *testing.T) { + N := 100 + + d := &queryTestDS{ds: ds.NewMapDatastore()} + bs, _ := newBlockStoreWithKeys(t, d, N) + + started := make(chan struct{}, 1) + done := make(chan struct{}, 1) + errors := make(chan error, 100) + + getKeys := func(ctx context.Context) { + started <- struct{}{} + _, err := bs.AllKeys(ctx, 0, 0) // once without cancelling + if err != nil { + errors <- err + } + done <- struct{}{} + errors <- nil // a nil one to signal break + } + + // Once without context, to make sure it all works + { + var results dsq.Results + resultChan := make(chan dsq.Result) + d.SetFunc(func(q dsq.Query) (dsq.Results, error) { + results = dsq.ResultsWithChan(q, resultChan) + return results, nil + }) + + go getKeys(context.Background()) + + // make sure it's waiting. + <-started + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closing(): + t.Fatal("should not be closing") + case <-results.Process().Closed(): + t.Fatal("should not be closed") + default: + } + + e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()} + resultChan <- dsq.Result{Entry: e} // let it go. + close(resultChan) + <-done // should be done now. + <-results.Process().Closed() // should be closed now + + // print any errors + for err := range errors { + if err == nil { + break + } + t.Error(err) + } + } + + // Once with + { + var results dsq.Results + resultChan := make(chan dsq.Result) + d.SetFunc(func(q dsq.Query) (dsq.Results, error) { + results = dsq.ResultsWithChan(q, resultChan) + return results, nil + }) + + ctx, cancel := context.WithCancel(context.Background()) + go getKeys(ctx) + + // make sure it's waiting. + <-started + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closing(): + t.Fatal("should not be closing") + case <-results.Process().Closed(): + t.Fatal("should not be closed") + default: + } + + cancel() // let it go. + + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closed(): + t.Fatal("should not be closed") // should not be closed yet. + case <-results.Process().Closing(): + // should be closing now! + t.Log("closing correctly at this point.") + } + + close(resultChan) + <-done // should be done now. + <-results.Process().Closed() // should be closed now + + // print any errors + for err := range errors { + if err == nil { + break + } + t.Error(err) + } + } } @@ -111,3 +236,33 @@ func expectMatches(t *testing.T, expect, actual []u.Key) { } } } + +type queryTestDS struct { + cb func(q dsq.Query) (dsq.Results, error) + ds ds.Datastore +} + +func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f } + +func (c *queryTestDS) Put(key ds.Key, value interface{}) (err error) { + return c.ds.Put(key, value) +} + +func (c *queryTestDS) Get(key ds.Key) (value interface{}, err error) { + return c.ds.Get(key) +} + +func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) { + return c.ds.Has(key) +} + +func (c *queryTestDS) Delete(key ds.Key) (err error) { + return c.ds.Delete(key) +} + +func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) { + if c.cb != nil { + return c.cb(q) + } + return c.ds.Query(q) +} diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go index da9a0a01d..377ce629d 100644 --- a/blocks/blockstore/write_cache.go +++ b/blocks/blockstore/write_cache.go @@ -1,7 +1,9 @@ package blockstore import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru" + "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) @@ -44,6 +46,10 @@ func (w *writecache) Put(b *blocks.Block) error { return w.blockstore.Put(b) } -func (w *writecache) AllKeys(offset int, limit int) ([]u.Key, error) { - return w.blockstore.AllKeys(offset, limit) +func (w *writecache) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) { + return w.blockstore.AllKeys(ctx, offset, limit) +} + +func (w *writecache) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) { + return w.blockstore.AllKeysChan(ctx, offset, limit) } diff --git a/blocks/blockstore/write_cache_test.go b/blocks/blockstore/write_cache_test.go index 1e072e95e..b20188e29 100644 --- a/blocks/blockstore/write_cache_test.go +++ b/blocks/blockstore/write_cache_test.go @@ -84,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) { return c.ds.Delete(key) } -func (c *callbackDatastore) Query(q dsq.Query) (*dsq.Results, error) { +func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) { c.f() return c.ds.Query(q) } diff --git a/core/commands/refs.go b/core/commands/refs.go index 4390e108c..0e26be2c2 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -128,7 +128,7 @@ Displays the hashes of all local objects. } // todo: make async - allKeys, err := n.Blockstore.AllKeys(0, 0) + allKeys, err := n.Blockstore.AllKeys(context.TODO(), 0, 0) if err != nil { return nil, err } diff --git a/util/datastore2/delayed.go b/util/datastore2/delayed.go index ccb3cc29c..ca2541913 100644 --- a/util/datastore2/delayed.go +++ b/util/datastore2/delayed.go @@ -36,7 +36,7 @@ func (dds *delayed) Delete(key ds.Key) (err error) { return dds.ds.Delete(key) } -func (dds *delayed) Query(q dsq.Query) (*dsq.Results, error) { +func (dds *delayed) Query(q dsq.Query) (dsq.Results, error) { dds.delay.Wait() return dds.ds.Query(q) } From 0e2a554c8da6788cd331044a19c2ba9508baf37e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 21:08:52 -0800 Subject: [PATCH 06/13] cmds: fix error header output using Header().Set after WriteHeader() has no effect. cc @mappum --- commands/http/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/http/handler.go b/commands/http/handler.go index 329e8fa29..a409a67c3 100644 --- a/commands/http/handler.go +++ b/commands/http/handler.go @@ -95,8 +95,8 @@ func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { out, err := res.Reader() if err != nil { - w.WriteHeader(http.StatusInternalServerError) w.Header().Set(contentTypeHeader, "text/plain") + w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } From ddd7540186598ddafa12951d4dcfc3b2aa4f2774 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 20:37:05 -0800 Subject: [PATCH 07/13] cmds: flush output on standard readers cc @mappum can we do this for the copyChunks case? --- commands/http/handler.go | 45 ++++++++++++++++++++++++++++++++++++---- commands/request.go | 9 +++++++- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/commands/http/handler.go b/commands/http/handler.go index a409a67c3..df952ff94 100644 --- a/commands/http/handler.go +++ b/commands/http/handler.go @@ -6,6 +6,8 @@ import ( "io" "net/http" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + cmds "github.com/jbenet/go-ipfs/commands" u "github.com/jbenet/go-ipfs/util" ) @@ -26,6 +28,7 @@ const ( contentTypeHeader = "Content-Type" contentLengthHeader = "Content-Length" transferEncodingHeader = "Transfer-Encoding" + applicationJson = "application/json" ) var mimeTypes = map[string]string{ @@ -44,6 +47,11 @@ func NewHandler(ctx cmds.Context, root *cmds.Command, origin string) *Handler { } func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // create a context.Context to pass into the commands. + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + i.ctx.Context = ctx + log.Debug("Incoming API request: ", r.URL) if len(i.origin) > 0 { @@ -106,19 +114,30 @@ func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { _, isChan := res.Output().(chan interface{}) streamChans, _, _ := req.Option("stream-channels").Bool() if isChan && streamChans { - err = copyChunks(w, out) + err = copyChunks(applicationJson, w, out) if err != nil { log.Error(err) } return } - io.Copy(w, out) + flushCopy(w, out) +} + +// flushCopy Copies from an io.Reader to a http.ResponseWriter. +// Flushes chunks over HTTP stream as they are read (if supported by transport). +func flushCopy(w http.ResponseWriter, out io.Reader) error { + if _, ok := w.(http.Flusher); !ok { + return copyChunks("", w, out) + } + + io.Copy(&flushResponse{w}, out) + return nil } // Copies from an io.Reader to a http.ResponseWriter. // Flushes chunks over HTTP stream as they are read (if supported by transport). -func copyChunks(w http.ResponseWriter, out io.Reader) error { +func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error { hijacker, ok := w.(http.Hijacker) if !ok { return errors.New("Could not create hijacker") @@ -130,7 +149,9 @@ func copyChunks(w http.ResponseWriter, out io.Reader) error { defer conn.Close() writer.WriteString("HTTP/1.1 200 OK\r\n") - writer.WriteString(contentTypeHeader + ": application/json\r\n") + if contentType != "" { + writer.WriteString(contentTypeHeader + ": " + contentType + "\r\n") + } writer.WriteString(transferEncodingHeader + ": chunked\r\n") writer.WriteString(channelHeader + ": 1\r\n\r\n") @@ -165,3 +186,19 @@ func copyChunks(w http.ResponseWriter, out io.Reader) error { return nil } + +type flushResponse struct { + W http.ResponseWriter +} + +func (fr *flushResponse) Write(buf []byte) (int, error) { + n, err := fr.W.Write(buf) + if err != nil { + return n, err + } + + if flusher, ok := fr.W.(http.Flusher); ok { + flusher.Flush() + } + return n, err +} diff --git a/commands/request.go b/commands/request.go index ab58f0cf4..a44317c81 100644 --- a/commands/request.go +++ b/commands/request.go @@ -6,6 +6,8 @@ import ( "reflect" "strconv" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/jbenet/go-ipfs/config" "github.com/jbenet/go-ipfs/core" u "github.com/jbenet/go-ipfs/util" @@ -14,6 +16,10 @@ import ( type optMap map[string]interface{} type Context struct { + // this Context is temporary. Will be replaced soon, as we get + // rid of this variable entirely. + Context context.Context + Online bool ConfigRoot string @@ -267,7 +273,8 @@ func NewRequest(path []string, opts optMap, args []string, file File, cmd *Comma optDefs = make(map[string]Option) } - req := &request{path, opts, args, file, cmd, Context{}, optDefs} + ctx := Context{Context: context.TODO()} + req := &request{path, opts, args, file, cmd, ctx, optDefs} err := req.ConvertOptions() if err != nil { return nil, err From 322d6d0b05409d78738f4e3056dd4bb6019d2347 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 21:12:45 -0800 Subject: [PATCH 08/13] cmds: use flushCopy instrad of copychunks @mappum would this work? --- commands/http/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/commands/http/handler.go b/commands/http/handler.go index df952ff94..9291165c1 100644 --- a/commands/http/handler.go +++ b/commands/http/handler.go @@ -114,6 +114,9 @@ func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { _, isChan := res.Output().(chan interface{}) streamChans, _, _ := req.Option("stream-channels").Bool() if isChan && streamChans { + // w.WriteString(transferEncodingHeader + ": chunked\r\n") + // w.Header().Set(channelHeader, "1") + // w.WriteHeader(200) err = copyChunks(applicationJson, w, out) if err != nil { log.Error(err) From 374a75b61203f956ca008851fba7205ddf113813 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 20:44:51 -0800 Subject: [PATCH 09/13] refs: tie the contexts together --- blocks/blockstore/blockstore.go | 1 + core/commands/refs.go | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 63d5df54b..484e15154 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -132,6 +132,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (< // need to convert to u.Key using u.KeyFromDsKey. k = u.KeyFromDsKey(ds.NewKey(e.Key)) + log.Debug("blockstore: query got key", k) return k, true } } diff --git a/core/commands/refs.go b/core/commands/refs.go index 0e26be2c2..d9999cdde 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -54,6 +54,7 @@ Note: list all refs recursively with -r. cmds.BoolOption("recursive", "r", "Recursively list links of child nodes"), }, Run: func(req cmds.Request) (interface{}, error) { + ctx := req.Context().Context n, err := req.Context().GetNode() if err != nil { return nil, err @@ -93,7 +94,7 @@ Note: list all refs recursively with -r. rw := RefWriter{ W: pipew, DAG: n.DAG, - Ctx: n.Context(), + Ctx: ctx, Unique: unique, PrintEdge: edges, PrintFmt: format, @@ -122,13 +123,14 @@ Displays the hashes of all local objects. }, Run: func(req cmds.Request) (interface{}, error) { + ctx := req.Context().Context n, err := req.Context().GetNode() if err != nil { return nil, err } // todo: make async - allKeys, err := n.Blockstore.AllKeys(context.TODO(), 0, 0) + allKeys, err := n.Blockstore.AllKeysChan(ctx, 0, 0) if err != nil { return nil, err } @@ -139,7 +141,7 @@ Displays the hashes of all local objects. go func() { defer pipew.Close() - for _, k := range allKeys { + for k := range allKeys { s := k.Pretty() + "\n" if _, err := pipew.Write([]byte(s)); err != nil { log.Error(err) From f7941e9841a960dcc08a352d8565ffd3edd74740 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 21:08:34 -0800 Subject: [PATCH 10/13] ping: use context --- core/commands/ping.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/core/commands/ping.go b/core/commands/ping.go index 523d4357d..096852cb8 100644 --- a/core/commands/ping.go +++ b/core/commands/ping.go @@ -72,6 +72,7 @@ Send pings to a peer using the routing system to discover its address }, }, Run: func(req cmds.Request) (interface{}, error) { + ctx := req.Context().Context n, err := req.Context().GetNode() if err != nil { return nil, err @@ -103,14 +104,14 @@ Send pings to a peer using the routing system to discover its address outChan := make(chan interface{}) - go pingPeer(n, peerID, numPings, outChan) + go pingPeer(ctx, n, peerID, numPings, outChan) return outChan, nil }, Type: PingResult{}, } -func pingPeer(n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interface{}) { +func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interface{}) { defer close(outChan) if len(n.Peerstore.Addresses(pid)) == 0 { @@ -119,8 +120,7 @@ func pingPeer(n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interfac Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()), } - // TODO: get master context passed in - ctx, _ := context.WithTimeout(context.TODO(), kPingTimeout) + ctx, _ := context.WithTimeout(ctx, kPingTimeout) p, err := n.Routing.FindPeer(ctx, pid) if err != nil { outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)} @@ -131,9 +131,17 @@ func pingPeer(n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interfac outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())} + var done bool var total time.Duration - for i := 0; i < numPings; i++ { - ctx, _ := context.WithTimeout(context.TODO(), kPingTimeout) + for i := 0; i < numPings && !done; i++ { + select { + case <-ctx.Done(): + done = true + continue + default: + } + + ctx, _ := context.WithTimeout(ctx, kPingTimeout) took, err := n.Routing.Ping(ctx, pid) if err != nil { log.Errorf("Ping error: %s", err) From 7867e01c909f914a3db5e7a8ab02a1b87c8be2ce Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 10 Jan 2015 21:35:26 -0800 Subject: [PATCH 11/13] ipfs ping wording --- core/commands/ping.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/commands/ping.go b/core/commands/ping.go index 096852cb8..a4166ffdd 100644 --- a/core/commands/ping.go +++ b/core/commands/ping.go @@ -31,8 +31,9 @@ var PingCmd = &cmds.Command{ Send pings to a peer using the routing system to discover its address `, ShortDescription: ` - ipfs ping is a tool to find a node (in the routing system), - send pings, wait for pongs, and print out round-trip latency information. +ipfs ping is a tool to test sending data to other nodes. It finds nodes +via the routing system, send pings, wait for pongs, and print out round- +trip latency information. `, }, Arguments: []cmds.Argument{ From 0d059a3881aff3a95c276d235659d8ace11ae8e1 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 11 Jan 2015 19:33:37 -0800 Subject: [PATCH 12/13] change block datastore prefix to "/b" the prefix should be as short as possible, as this is a per-block overhead. --- blocks/blockstore/blockstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 484e15154..ed24326c9 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -19,7 +19,7 @@ import ( var log = eventlog.Logger("blockstore") // BlockPrefix namespaces blockstore datastores -var BlockPrefix = ds.NewKey("blocks") +var BlockPrefix = ds.NewKey("b") var ValueTypeMismatch = errors.New("The retrieved value is not a Block") From 6a8414bcfb16411aae5dad46d467f0b66cdcaf5d Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 11 Jan 2015 21:19:42 -0800 Subject: [PATCH 13/13] blockstore Allkeys: ignore non multihash keys --- blocks/blockstore/blockstore.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index ed24326c9..e4a9f7745 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -133,6 +133,13 @@ func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (< // need to convert to u.Key using u.KeyFromDsKey. k = u.KeyFromDsKey(ds.NewKey(e.Key)) log.Debug("blockstore: query got key", k) + + // key must be a multihash. else ignore it. + _, err := mh.Cast([]byte(k)) + if err != nil { + return "", true + } + return k, true } } @@ -149,6 +156,9 @@ func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (< if !ok { return } + if k == "" { + continue + } select { case <-ctx.Done():