Merge pull request #531 from jbenet/ds-query

ds-query + ipfs refs local
This commit is contained in:
Juan Batiz-Benet 2015-01-11 21:39:58 -08:00
commit cfbe44004b
40 changed files with 1810 additions and 123 deletions

4
Godeps/Godeps.json generated
View File

@ -110,7 +110,7 @@
},
{
"ImportPath": "github.com/jbenet/go-datastore",
"Rev": "6a1c83bda2a71a9bdc936749fdb507df958ed949"
"Rev": "35738aceb35505bd3c77c2a618fb1947ca3f72da"
},
{
"ImportPath": "github.com/jbenet/go-fuse-version",
@ -156,7 +156,7 @@
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "162148a58668ca38b0b8f0459ccc6ca88e32f1f4"
"Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8"
},
{
"ImportPath": "github.com/kr/binarydist",

View File

@ -0,0 +1,11 @@
# Travis CI configuration for go-datastore.
language: go

# Test against the oldest supported Go (1.3), the latest release, and tip.
go:
  - 1.3
  - release
  - tip

script:
  - make test

# Disable FUSE-dependent tests on CI; keep output verbose for debugging.
env: TEST_NO_FUSE=1 TEST_VERBOSE=1

View File

@ -18,6 +18,10 @@
"ImportPath": "github.com/hashicorp/golang-lru",
"Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370"
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "b4b4178efcf2404ce9db72438c9c49db2fb399d8"
},
{
"ImportPath": "github.com/mattbaird/elastigo/api",
"Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d"

View File

@ -1,6 +1,9 @@
build:
go build
test:
go test ./...
# saves/vendors third-party dependencies to Godeps/_workspace
# -r flag rewrites import paths to use the vendored path
# ./... performs operation on all packages in tree

View File

@ -1,6 +1,10 @@
package datastore
import "log"
import (
"log"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Here are some basic datastore implementations.
@ -45,13 +49,15 @@ func (d *MapDatastore) Delete(key Key) (err error) {
return nil
}
// KeyList implements Datastore.KeyList
func (d *MapDatastore) KeyList() ([]Key, error) {
var keys []Key
for k := range d.values {
keys = append(keys, k)
// Query implements Datastore.Query
func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
re := make([]dsq.Entry, 0, len(d.values))
for k, v := range d.values {
re = append(re, dsq.Entry{Key: k.String(), Value: v})
}
return keys, nil
r := dsq.ResultsWithEntries(q, re)
r = dsq.NaiveQueryApply(q, r)
return r, nil
}
// NullDatastore stores nothing, but conforms to the API.
@ -84,9 +90,9 @@ func (d *NullDatastore) Delete(key Key) (err error) {
return nil
}
// KeyList implements Datastore.KeyList
func (d *NullDatastore) KeyList() ([]Key, error) {
return nil, nil
// Query implements Datastore.Query
func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
return dsq.ResultsWithEntries(q, nil), nil
}
// LogDatastore logs all accesses through the datastore.
@ -140,8 +146,8 @@ func (d *LogDatastore) Delete(key Key) (err error) {
return d.child.Delete(key)
}
// KeyList implements Datastore.KeyList
func (d *LogDatastore) KeyList() ([]Key, error) {
log.Printf("%s: Get KeyList\n", d.Name)
return d.child.KeyList()
// Query implements Datastore.Query
func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
log.Printf("%s: Query\n", d.Name)
return d.child.Query(q)
}

View File

@ -2,6 +2,8 @@ package datastore
import (
"errors"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
/*
@ -52,8 +54,19 @@ type Datastore interface {
// Delete removes the value for given `key`.
Delete(key Key) (err error)
// KeyList returns a list of keys in the datastore
KeyList() ([]Key, error)
// Query searches the datastore and returns a query result. This function
// may return before the query actually runs. To wait for the query:
//
// result, _ := ds.Query(q)
//
// // use the channel interface; result may come in at different times
// for entry := range result.Entries() { ... }
//
// // or wait for the query to be completely done
// result.Wait()
// result.AllEntries()
//
Query(q query.Query) (query.Results, error)
}
// ThreadSafeDatastore is an interface that all threadsafe datastore should

View File

@ -8,6 +8,7 @@ import (
"github.com/codahale/blake2"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
"github.com/mattbaird/elastigo/api"
"github.com/mattbaird/elastigo/core"
)
@ -112,7 +113,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
return nil
}
func (d *Datastore) KeyList() ([]ds.Key, error) {
// Query implements Datastore.Query. Querying is not supported by the
// elastigo datastore yet, so this always returns an error.
func (d *Datastore) Query(query.Query) (query.Results, error) {
	// error strings are lowercase and unpunctuated per Go convention;
	// the previous message ("Not yet implemented!") violated both.
	return nil, errors.New("query not yet implemented")
}

View File

@ -8,8 +8,11 @@ import (
"strings"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
var ObjectKeySuffix = ".dsobject"
// Datastore uses a standard Go map for internal storage.
type Datastore struct {
path string
@ -26,7 +29,7 @@ func NewDatastore(path string) (ds.Datastore, error) {
// KeyFilename returns the filename associated with `key`
func (d *Datastore) KeyFilename(key ds.Key) string {
return filepath.Join(d.path, key.String(), ".dsobject")
return filepath.Join(d.path, key.String(), ObjectKeySuffix)
}
// Put stores the given value.
@ -79,10 +82,10 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
return os.Remove(fn)
}
// KeyList returns a list of all keys in the datastore
func (d *Datastore) KeyList() ([]ds.Key, error) {
// Query implements Datastore.Query
func (d *Datastore) Query(q query.Query) (query.Results, error) {
keys := []ds.Key{}
results := make(chan query.Result)
walkFn := func(path string, info os.FileInfo, err error) error {
// remove ds path prefix
@ -91,14 +94,23 @@ func (d *Datastore) KeyList() ([]ds.Key, error) {
}
if !info.IsDir() {
if strings.HasSuffix(path, ObjectKeySuffix) {
path = path[:len(path)-len(ObjectKeySuffix)]
}
key := ds.NewKey(path)
keys = append(keys, key)
entry := query.Entry{Key: key.String(), Value: query.NotFetched}
results <- query.Result{Entry: entry}
}
return nil
}
filepath.Walk(d.path, walkFn)
return keys, nil
go func() {
filepath.Walk(d.path, walkFn)
close(results)
}()
r := query.ResultsWithChan(q, results)
r = query.NaiveQueryApply(q, r)
return r, nil
}
// isDir returns whether given path is a directory

View File

@ -4,9 +4,11 @@ import (
"bytes"
"testing"
. "launchpad.net/gocheck"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs"
. "launchpad.net/gocheck"
query "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Hook up gocheck into the "go test" runner.
@ -54,6 +56,35 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(err, Equals, nil)
c.Check(bytes.Equal(v.([]byte), []byte(k.String())), Equals, true)
}
r, err := ks.ds.Query(query.Query{Prefix: "/foo/bar/"})
if err != nil {
c.Check(err, Equals, nil)
}
expect := []string{
"/foo/bar/baz",
"/foo/bar/bazb",
"/foo/bar/baz/barb",
}
all, err := r.Rest()
if err != nil {
c.Fatal(err)
}
c.Check(len(all), Equals, len(expect))
for _, k := range expect {
found := false
for _, e := range all {
if e.Key == k {
found = true
}
}
if !found {
c.Error("did not find expected key: ", k)
}
}
}
func strsToKeys(strs []string) []ds.Key {

View File

@ -5,6 +5,8 @@ import (
"strings"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go-uuid/uuid"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
/*
@ -239,3 +241,12 @@ type KeySlice []Key
func (p KeySlice) Len() int { return len(p) }
func (p KeySlice) Less(i, j int) bool { return p[i].Less(p[j]) }
func (p KeySlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// EntryKeys extracts the Key of each query entry, returning them as a
// slice of datastore Keys.
func EntryKeys(e []dsq.Entry) []Key {
	keys := make([]Key, len(e))
	for i, entry := range e {
		keys[i] = NewKey(entry.Key)
	}
	return keys
}

View File

@ -1,6 +1,9 @@
package keytransform
import ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
import (
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
type Pair struct {
Convert KeyMapping
@ -48,16 +51,25 @@ func (d *ktds) Delete(key ds.Key) (err error) {
return d.child.Delete(d.ConvertKey(key))
}
// KeyList returns a list of all keys in the datastore, transforming keys out.
func (d *ktds) KeyList() ([]ds.Key, error) {
keys, err := d.child.KeyList()
// Query implements Query, inverting keys on the way back out: results
// come from the child datastore (which sees converted keys), so each
// entry's key is mapped back into the caller's keyspace before being
// forwarded. Error results are forwarded untouched.
func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
	qr, err := d.child.Query(q)
	if err != nil {
		return nil, err
	}

	ch := make(chan dsq.Result)
	go func() {
		defer close(ch)
		defer qr.Close()

		for r := range qr.Next() {
			if r.Error == nil {
				// rewrite the key back into the outer keyspace.
				r.Entry.Key = d.InvertKey(ds.NewKey(r.Entry.Key)).String()
			}
			ch <- r
		}
	}()

	return dsq.DerivedResults(qr, ch), nil
}

View File

@ -5,9 +5,11 @@ import (
"sort"
"testing"
. "launchpad.net/gocheck"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
kt "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform"
. "launchpad.net/gocheck"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Hook up gocheck into the "go test" runner.
@ -60,10 +62,18 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true)
}
listA, errA := mpds.KeyList()
listB, errB := ktds.KeyList()
c.Check(errA, Equals, nil)
c.Check(errB, Equals, nil)
run := func(d ds.Datastore, q dsq.Query) []ds.Key {
r, err := d.Query(q)
c.Check(err, Equals, nil)
e, err := r.Rest()
c.Check(err, Equals, nil)
return ds.EntryKeys(e)
}
listA := run(mpds, dsq.Query{})
listB := run(ktds, dsq.Query{})
c.Check(len(listA), Equals, len(listB))
// sort them cause yeah.
@ -75,6 +85,9 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(pair.Invert(kA), Equals, kB)
c.Check(kA, Equals, pair.Convert(kB))
}
c.Log("listA: ", listA)
c.Log("listB: ", listB)
}
func strsToKeys(strs []string) []ds.Key {

View File

@ -4,8 +4,11 @@ import (
"io"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"
)
type Datastore interface {
@ -69,13 +72,80 @@ func (d *datastore) Delete(key ds.Key) (err error) {
return err
}
func (d *datastore) KeyList() ([]ds.Key, error) {
i := d.DB.NewIterator(nil, nil)
var keys []ds.Key
for i.Next() {
keys = append(keys, ds.NewKey(string(i.Key())))
// Query implements Datastore.Query. The iteration itself happens in a
// worker goroutine (runQuery) tied to the Results' goprocess, so that
// clients can cancel it early and resources are reclaimed.
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
	// we can use multiple iterators concurrently. see:
	// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
	// advance the iterator only if the reader reads
	//
	// run query in own sub-process tied to Results.Process(), so that
	// it waits for us to finish AND so that clients can signal to us
	// that resources should be reclaimed.
	qrb := dsq.NewResultBuilder(q)
	qrb.Process.Go(func(worker goprocess.Process) {
		d.runQuery(worker, qrb)
	})

	// go wait on the worker (without signaling close)
	go qrb.Process.CloseAfterChildren()

	// Now, apply remaining things (filters, order). Prefix, offset and
	// limit are already handled natively inside runQuery.
	qr := qrb.Results()
	for _, f := range q.Filters {
		qr = dsq.NaiveFilter(qr, f)
	}
	for _, o := range q.Orders {
		qr = dsq.NaiveOrder(qr, o)
	}
	return qr, nil
}
// runQuery streams matching entries from leveldb into qrb.Output,
// honoring the query's Prefix, Offset, Limit and KeysOnly options.
// It stops as soon as worker (the client's process) signals close.
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
	var rnge *util.Range
	if qrb.Query.Prefix != "" {
		rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
	}
	i := d.DB.NewIterator(rnge, nil)
	defer i.Release()

	// advance iterator for offset
	if qrb.Query.Offset > 0 {
		for j := 0; j < qrb.Query.Offset; j++ {
			i.Next()
		}
	}

	// iterate, and handle limit, too
iterate:
	for sent := 0; i.Next(); sent++ {
		// end early if we hit the limit
		if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
			break
		}

		k := ds.NewKey(string(i.Key())).String()
		e := dsq.Entry{Key: k}
		if !qrb.Query.KeysOnly {
			// copy the value: the iterator may reuse its buffer on Next().
			buf := make([]byte, len(i.Value()))
			copy(buf, i.Value())
			e.Value = buf
		}

		select {
		case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
		case <-worker.Closing(): // client told us to end early.
			// BUG FIX: a bare `break` here only exits the select,
			// so the loop kept iterating after the client closed.
			// The labeled break exits the for loop as intended.
			break iterate
		}
	}

	if err := i.Error(); err != nil {
		select {
		case qrb.Output <- dsq.Result{Error: err}: // client read our error
		case <-worker.Closing(): // client told us to end.
			return
		}
	}
}
// LevelDB needs to be closed.

View File

@ -0,0 +1,124 @@
package leveldb
import (
"io/ioutil"
"os"
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// testcases maps datastore keys to the values stored under them for the
// query tests below.
var testcases = map[string]string{
	"/a":     "a",
	"/a/b":   "ab",
	"/a/b/c": "abc",
	"/a/b/d": "a/b/d",
	"/a/c":   "ac",
	"/a/d":   "ad",
	"/e":     "e",
	"/f":     "f",
}
// newDS returns a fresh leveldb datastore rooted in a temporary
// directory, and a cleanup function to call on exit:
//
//   d, done := newDS(t)
//   defer done()
func newDS(t *testing.T) (Datastore, func()) {
	// "" selects the system temp dir; the previous hard-coded "/tmp"
	// does not exist on all platforms.
	path, err := ioutil.TempDir("", "testing_leveldb_")
	if err != nil {
		t.Fatal(err)
	}
	d, err := NewDatastore(path, nil)
	if err != nil {
		t.Fatal(err)
	}
	return d, func() {
		// close the store before deleting its files out from under it.
		d.Close()
		os.RemoveAll(path)
	}
}
// addTestCases writes every testcase pair into d, then reads each one
// back and verifies the stored value round-trips.
func addTestCases(t *testing.T, d Datastore, testcases map[string]string) {
	for key, val := range testcases {
		if err := d.Put(ds.NewKey(key), []byte(val)); err != nil {
			t.Fatal(err)
		}
	}

	for key, val := range testcases {
		got, err := d.Get(ds.NewKey(key))
		if err != nil {
			t.Fatal(err)
		}
		if string(got.([]byte)) != val {
			t.Errorf("%s values differ: %s != %s", key, val, got)
		}
	}
}
// TestQuery checks prefix queries against the leveldb datastore,
// including offset/limit pagination.
// NOTE(review): the offset/limit expectations assume leveldb returns
// keys in sorted byte order — confirm if expectations ever flake.
func TestQuery(t *testing.T) {
	d, close := newDS(t)
	defer close()
	addTestCases(t, d, testcases)

	rs, err := d.Query(dsq.Query{Prefix: "/a/"})
	if err != nil {
		t.Fatal(err)
	}

	expectMatches(t, []string{
		"/a/b",
		"/a/b/c",
		"/a/b/d",
		"/a/c",
		"/a/d",
	}, rs)

	// test offset and limit
	rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 2, Limit: 2})
	if err != nil {
		t.Fatal(err)
	}

	expectMatches(t, []string{
		"/a/b/d",
		"/a/c",
	}, rs)
}
// TestQueryRespectsProcess only seeds the datastore.
// NOTE(review): this looks like a stub — it never issues a Query nor
// closes a Results process, so it does not yet verify the behavior its
// name promises. TODO confirm and complete.
func TestQueryRespectsProcess(t *testing.T) {
	d, close := newDS(t)
	defer close()
	addTestCases(t, d, testcases)
}
// expectMatches asserts that actualR yields exactly len(expect) entries
// and that every expected key is present among them.
func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
	actual, err := actualR.Rest()
	if err != nil {
		t.Error(err)
	}

	if len(actual) != len(expect) {
		t.Error("not enough", expect, actual)
	}

	// build a set of the keys we actually got, then check membership.
	got := make(map[string]bool, len(actual))
	for _, e := range actual {
		got[e.Key] = true
	}
	for _, k := range expect {
		if !got[k] {
			t.Error(k, "not found")
		}
	}
}

View File

@ -4,7 +4,9 @@ import (
"errors"
lru "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Datastore uses golang-lru for internal storage.
@ -49,6 +51,6 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
}
// KeyList returns a list of keys in the datastore
func (d *Datastore) KeyList() ([]ds.Key, error) {
// Query implements Datastore.Query. The LRU cache cannot enumerate its
// entries, so querying is not supported.
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
	// the old message referenced the removed KeyList method.
	return nil, errors.New("query not implemented")
}

View File

@ -6,6 +6,7 @@ import (
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ktds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// PrefixTransform constructs a KeyTransform with a pair of functions that
@ -40,5 +41,43 @@ func Wrap(child ds.Datastore, prefix ds.Key) ktds.Datastore {
panic("child (ds.Datastore) is nil")
}
return ktds.Wrap(child, PrefixTransform(prefix))
d := ktds.Wrap(child, PrefixTransform(prefix))
return &datastore{Datastore: d, raw: child, prefix: prefix}
}
type datastore struct {
prefix ds.Key
raw ds.Datastore
ktds.Datastore
}
// Query implements Query, inverting keys on the way back out.
// It queries the raw (un-wrapped) child datastore, drops any entries
// that fall outside this namespace's prefix, and strips the prefix from
// the surviving keys. Error results are forwarded untouched.
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
	qr, err := d.raw.Query(q)
	if err != nil {
		return nil, err
	}

	ch := make(chan dsq.Result)
	go func() {
		defer close(ch)
		defer qr.Close()

		for r := range qr.Next() {
			if r.Error != nil {
				// forward errors as-is; no key to rewrite.
				ch <- r
				continue
			}

			k := ds.NewKey(r.Entry.Key)
			if !d.prefix.IsAncestorOf(k) {
				// not inside this namespace; skip it.
				continue
			}

			// map the key back into the caller's keyspace.
			r.Entry.Key = d.Datastore.InvertKey(k).String()
			ch <- r
		}
	}()

	return dsq.DerivedResults(qr, ch), nil
}

View File

@ -5,9 +5,11 @@ import (
"sort"
"testing"
. "launchpad.net/gocheck"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
. "launchpad.net/gocheck"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// Hook up gocheck into the "go test" runner.
@ -46,10 +48,18 @@ func (ks *DSSuite) TestBasic(c *C) {
c.Check(bytes.Equal(v2.([]byte), []byte(k.String())), Equals, true)
}
listA, errA := mpds.KeyList()
listB, errB := nsds.KeyList()
c.Check(errA, Equals, nil)
c.Check(errB, Equals, nil)
run := func(d ds.Datastore, q dsq.Query) []ds.Key {
r, err := d.Query(q)
c.Check(err, Equals, nil)
e, err := r.Rest()
c.Check(err, Equals, nil)
return ds.EntryKeys(e)
}
listA := run(mpds, dsq.Query{})
listB := run(nsds, dsq.Query{})
c.Check(len(listA), Equals, len(listB))
// sort them cause yeah.

View File

@ -5,6 +5,7 @@ import (
"os"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
type datastore struct {
@ -57,11 +58,11 @@ func (d *datastore) Delete(key ds.Key) error {
return nil
}
func (d *datastore) KeyList() ([]ds.Key, error) {
kl, err := d.child.KeyList()
// Query implements Datastore.Query, delegating to the child datastore.
// By design (this is the "panic datastore" test helper), any error from
// the child is logged to stdout and turned into a panic.
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
	r, err := d.child.Query(q)
	if err != nil {
		fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
		panic("panic datastore: Query failed")
	}
	return r, nil
}

View File

@ -0,0 +1,86 @@
package query
import (
"fmt"
"reflect"
"strings"
)
// Filter is an object that tests ResultEntries
type Filter interface {
	// Filter returns whether an entry passes the filter
	Filter(e Entry) bool
}

// Op is a comparison operator
type Op string

// The comparison operators understood by the filters in this package.
var (
	Equal              = Op("==")
	NotEqual           = Op("!=")
	GreaterThan        = Op(">")
	GreaterThanOrEqual = Op(">=")
	LessThan           = Op("<")
	LessThanOrEqual    = Op("<=")
)

// FilterValueCompare is used to signal to datastores they
// should apply internal comparisons. unfortunately, there
// is no way to apply comparisons* to interface{} types in
// Go, so if the datastore doesn't have a special way to
// handle these comparisons, you must provide the
// TypedFilter to actually do filtering.
//
// [*] other than == and !=, which use reflect.DeepEqual.
type FilterValueCompare struct {
	Op          Op
	Value       interface{}
	TypedFilter Filter
}

// Filter returns whether e's value passes the comparison. Without a
// TypedFilter, only Equal and NotEqual are supported (via
// reflect.DeepEqual); any other Op panics.
func (f FilterValueCompare) Filter(e Entry) bool {
	if f.TypedFilter != nil {
		return f.TypedFilter.Filter(e)
	}

	switch f.Op {
	case Equal:
		return reflect.DeepEqual(f.Value, e.Value)
	case NotEqual:
		return !reflect.DeepEqual(f.Value, e.Value)
	default:
		panic(fmt.Errorf("cannot apply op '%s' to interface{}.", f.Op))
	}
}

// FilterKeyCompare filters entries by comparing their key string
// (lexicographically) against Key with Op.
type FilterKeyCompare struct {
	Op  Op
	Key string
}

// Filter returns whether e.Key satisfies the comparison; an unknown Op
// panics.
func (f FilterKeyCompare) Filter(e Entry) bool {
	switch f.Op {
	case Equal:
		return e.Key == f.Key
	case NotEqual:
		return e.Key != f.Key
	case GreaterThan:
		return e.Key > f.Key
	case GreaterThanOrEqual:
		return e.Key >= f.Key
	case LessThan:
		return e.Key < f.Key
	case LessThanOrEqual:
		return e.Key <= f.Key
	default:
		panic(fmt.Errorf("unknown op '%s'", f.Op))
	}
}

// FilterKeyPrefix keeps only entries whose key begins with Prefix.
type FilterKeyPrefix struct {
	Prefix string
}

// Filter returns whether e.Key has the prefix.
func (f FilterKeyPrefix) Filter(e Entry) bool {
	return strings.HasPrefix(e.Key, f.Prefix)
}

View File

@ -0,0 +1,69 @@
package query
import (
"strings"
"testing"
)
// filterTestCase bundles a filter with input keys and the expected
// surviving keys.
// NOTE(review): appears unused in this file (tests call testKeyFilter
// directly) — confirm before removing.
type filterTestCase struct {
	filter Filter
	keys   []string
	expect []string
}
// testKeyFilter runs filter f over entries built from keys and checks
// that exactly the expected keys survive, in order.
func testKeyFilter(t *testing.T, f Filter, keys []string, expect []string) {
	entries := make([]Entry, len(keys))
	for i, key := range keys {
		entries[i] = Entry{Key: key}
	}

	filtered := NaiveFilter(ResultsWithEntries(Query{}, entries), f)
	got, err := filtered.Rest()
	if err != nil {
		t.Fatal(err)
	}

	actual := make([]string, len(got))
	for i, e := range got {
		actual[i] = e.Key
	}

	if len(actual) != len(expect) {
		t.Error("expect != actual.", expect, actual)
	}
	if strings.Join(actual, "") != strings.Join(expect, "") {
		t.Error("expect != actual.", expect, actual)
	}
}
// TestFilterKeyCompare exercises FilterKeyCompare over sampleKeys for
// the ==, > and <= operators.
func TestFilterKeyCompare(t *testing.T) {
	testKeyFilter(t, FilterKeyCompare{Equal, "/ab"}, sampleKeys, []string{"/ab"})
	testKeyFilter(t, FilterKeyCompare{GreaterThan, "/ab"}, sampleKeys, []string{
		"/ab/c",
		"/ab/cd",
		"/abce",
		"/abcf",
	})
	testKeyFilter(t, FilterKeyCompare{LessThanOrEqual, "/ab"}, sampleKeys, []string{
		"/a",
		"/ab",
	})
}

// TestFilterKeyPrefix exercises FilterKeyPrefix over sampleKeys.
// Expected keys keep the input (sampleKeys) order, since filtering
// preserves order.
func TestFilterKeyPrefix(t *testing.T) {
	testKeyFilter(t, FilterKeyPrefix{"/a"}, sampleKeys, []string{
		"/ab/c",
		"/ab/cd",
		"/a",
		"/abce",
		"/abcf",
		"/ab",
	})
	testKeyFilter(t, FilterKeyPrefix{"/ab/"}, sampleKeys, []string{
		"/ab/c",
		"/ab/cd",
	})
}

View File

@ -0,0 +1,66 @@
package query
import (
"sort"
)
// Order is an object used to order objects
type Order interface {
	// Sort sorts the Entry slice according to
	// the Order criteria.
	Sort([]Entry)
}

// OrderByValue is used to signal to datastores they
// should apply internal orderings. unfortunately, there
// is no way to apply order comparisons to interface{} types
// in Go, so if the datastore doesn't have a special way to
// handle these comparisons, you must provide an Order
// implementation that casts to the correct type.
type OrderByValue struct {
	TypedOrder Order
}

// Sort delegates to TypedOrder; panics if none was provided.
func (o OrderByValue) Sort(res []Entry) {
	if o.TypedOrder == nil {
		panic("cannot order interface{} by value. see query docs.")
	}
	o.TypedOrder.Sort(res)
}

// OrderByValueDescending is used to signal to datastores they
// should apply internal orderings. unfortunately, there
// is no way to apply order comparisons to interface{} types
// in Go, so if the datastore doesn't have a special way to
// handle these comparisons, you are SOL.
//
// NOTE(review): Sort delegates to TypedOrder exactly like OrderByValue
// and does not reverse anything itself — presumably the supplied
// TypedOrder is expected to sort descending. Confirm with callers.
type OrderByValueDescending struct {
	TypedOrder Order
}

// Sort delegates to TypedOrder; panics if none was provided.
func (o OrderByValueDescending) Sort(res []Entry) {
	if o.TypedOrder == nil {
		panic("cannot order interface{} by value. see query docs.")
	}
	o.TypedOrder.Sort(res)
}

// OrderByKey sorts entries by key, ascending (stable sort).
type OrderByKey struct{}

func (o OrderByKey) Sort(res []Entry) {
	sort.Stable(reByKey(res))
}

// OrderByKeyDescending sorts entries by key, descending (stable sort).
type OrderByKeyDescending struct{}

func (o OrderByKeyDescending) Sort(res []Entry) {
	sort.Stable(sort.Reverse(reByKey(res)))
}

// reByKey implements sort.Interface over []Entry, ordering by Key.
type reByKey []Entry

func (s reByKey) Len() int           { return len(s) }
func (s reByKey) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s reByKey) Less(i, j int) bool { return s[i].Key < s[j].Key }

View File

@ -0,0 +1,59 @@
package query
import (
"strings"
"testing"
)
// orderTestCase bundles an Order with input keys and the expected
// sorted output.
// NOTE(review): appears unused in this file (tests call testKeyOrder
// directly) — confirm before removing.
type orderTestCase struct {
	order  Order
	keys   []string
	expect []string
}
// testKeyOrder applies Order f to entries built from keys and checks the
// resulting key sequence matches expect exactly.
func testKeyOrder(t *testing.T, f Order, keys []string, expect []string) {
	entries := make([]Entry, len(keys))
	for i, key := range keys {
		entries[i] = Entry{Key: key}
	}

	ordered := NaiveOrder(ResultsWithEntries(Query{}, entries), f)
	got, err := ordered.Rest()
	if err != nil {
		t.Fatal(err)
	}

	actual := make([]string, len(got))
	for i, e := range got {
		actual[i] = e.Key
	}

	if len(actual) != len(expect) {
		t.Error("expect != actual.", expect, actual)
	}
	if strings.Join(actual, "") != strings.Join(expect, "") {
		t.Error("expect != actual.", expect, actual)
	}
}
// TestOrderByKey checks ascending and descending key ordering over the
// (deliberately unsorted) sampleKeys fixture.
func TestOrderByKey(t *testing.T) {
	testKeyOrder(t, OrderByKey{}, sampleKeys, []string{
		"/a",
		"/ab",
		"/ab/c",
		"/ab/cd",
		"/abce",
		"/abcf",
	})
	testKeyOrder(t, OrderByKeyDescending{}, sampleKeys, []string{
		"/abcf",
		"/abce",
		"/ab/cd",
		"/ab/c",
		"/ab",
		"/a",
	})
}

View File

@ -0,0 +1,250 @@
package query
import (
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
/*
Query represents storage for any key-value pair.

tl;dr:

  queries are supported across datastores.
  Cheap on top of relational dbs, and expensive otherwise.
  Pick the right tool for the job!

In addition to the key-value store get and set semantics, datastore
provides an interface to retrieve multiple records at a time through
the use of queries. The datastore Query model gleans a common set of
operations performed when querying. To avoid pasting here years of
database research, let's summarize the operations datastore supports.

Query Operations:

  * namespace - scope the query, usually by object type
  * filters - select a subset of values by applying constraints
  * orders - sort the results by applying sort conditions
  * limit - impose a numeric limit on the number of results
  * offset - skip a number of results (for efficient pagination)

datastore combines these operations into a simple Query class that allows
applications to define their constraints in a simple, generic, way without
introducing datastore specific calls, languages, etc.

Of course, different datastores provide relational query support across a
wide spectrum, from full support in traditional databases to none at all in
most key-value stores. Datastore aims to provide a common, simple interface
for the sake of application evolution over time and keeping large code bases
free of tool-specific code. It would be ridiculous to claim to support high-
performance queries on architectures that obviously do not. Instead, datastore
provides the interface, ideally translating queries to their native form
(e.g. into SQL for MySQL).

However, on the wrong datastore, queries can potentially incur the high cost
of performing the aforementioned query operations on the data set directly in
Go. It is the client's responsibility to select the right tool for the job:
pick a data storage solution that fits the application's needs now, and wrap
it with a datastore implementation. As the needs change, swap out datastore
implementations to support your new use cases. Some applications, particularly
in early development stages, can afford to incur the cost of queries on non-
relational databases (e.g. using a FSDatastore and not worry about a database
at all). When it comes time to switch the tool for performance, updating the
application code can be as simple as swapping the datastore in one place, not
all over the application code base. This gain in engineering time, both at
initial development and during later iterations, can significantly offset the
cost of the layer of abstraction.
*/
type Query struct {
	Prefix   string   // namespaces the query to results whose keys have Prefix
	Filters  []Filter // filter results. apply sequentially
	Orders   []Order  // order results. apply sequentially
	Limit    int      // maximum number of results
	Offset   int      // skip given number of results
	KeysOnly bool     // return only keys.
}
// NotFetched is a sentinel stored in Entry.Value to signal that the
// value has not been fetched: datastore implementations get to decide
// whether Query returns values or only keys. nil is not a good signal,
// as real values may legitimately be nil.
//
// (was `= iota` outside a const group, which is just a confusing way
// to write 0; the value is unchanged.)
const NotFetched int = 0
// Entry is a query result entry.
type Entry struct {
	Key   string      // can't be ds.Key because of circular imports ...!!!
	Value interface{} // the stored value, or NotFetched when keys-only.
}

// Result is a special entry that includes an error, so that the client
// may be warned about internal errors. When Error is non-nil the Entry
// may be incomplete.
type Result struct {
	Entry

	Error error
}

// Results is a set of Query results. This is the interface for clients.
// Example:
//
//   qr, _ := myds.Query(q)
//   for r := range qr.Next() {
//     if r.Error != nil {
//       // handle.
//       break
//     }
//
//     fmt.Println(r.Entry.Key, r.Entry.Value)
//   }
//
// or, wait on all results at once:
//
//   qr, _ := myds.Query(q)
//   es, _ := qr.Rest()
//   for _, e := range es {
//     fmt.Println(e.Key, e.Value)
//   }
//
type Results interface {
	Query() Query           // the query these Results correspond to
	Next() <-chan Result    // returns a channel to wait for the next result
	Rest() ([]Entry, error) // waits till processing finishes, returns all entries at once.
	Close() error           // client may call Close to signal early exit

	// Process returns a goprocess.Process associated with these results.
	// most users will not need this function (Close is all they want),
	// but it's here in case you want to connect the results to other
	// goprocess-friendly things.
	Process() goprocess.Process
}
// results implements Results
type results struct {
	query Query             // the originating query
	proc  goprocess.Process // drives/owns the producing worker
	res   <-chan Result     // stream of results from the producer
}

// Next returns the channel on which results are delivered.
func (r *results) Next() <-chan Result {
	return r.res
}

// Rest drains the result channel, returning all entries at once. If a
// Result carries an error, Rest stops and returns the entries collected
// so far along with that error. On success it waits for the underlying
// process to finish before returning.
func (r *results) Rest() ([]Entry, error) {
	var es []Entry
	for e := range r.res {
		if e.Error != nil {
			return es, e.Error
		}
		es = append(es, e.Entry)
	}
	<-r.proc.Closed() // wait till the processing finishes.
	return es, nil
}

// Process exposes the goprocess driving these results.
func (r *results) Process() goprocess.Process {
	return r.proc
}

// Close signals early exit so the producer can reclaim resources.
func (r *results) Close() error {
	return r.proc.Close()
}

// Query returns the query these results correspond to.
func (r *results) Query() Query {
	return r.query
}

// ResultBuilder is what implementors use to construct results
// Implementors of datastores and their clients must respect the
// Process of the Request:
//
//   * clients must call r.Process().Close() on an early exit, so
//     implementations can reclaim resources.
//   * if the Entries are read to completion (channel closed), Process
//     should be closed automatically.
//   * datastores must respect <-Process.Closing(), which intermediates
//     an early close signal from the client.
//
type ResultBuilder struct {
	Query   Query
	Process goprocess.Process
	Output  chan Result
}

// Results returns a Results corresponding to this builder.
func (rb *ResultBuilder) Results() Results {
	return &results{
		query: rb.Query,
		proc:  rb.Process,
		res:   rb.Output,
	}
}

// NewResultBuilder constructs a ResultBuilder for query q. The builder's
// Process closes Output on teardown, so producers must stop sending once
// <-Process.Closing() fires.
func NewResultBuilder(q Query) *ResultBuilder {
	b := &ResultBuilder{
		Query:  q,
		Output: make(chan Result),
	}
	b.Process = goprocess.WithTeardown(func() error {
		close(b.Output)
		return nil
	})
	return b
}
// ResultsWithChan returns a Results object from a channel
// of Result entries. Respects its own Close(): the forwarding worker
// stops as soon as the client signals close.
func ResultsWithChan(q Query, res <-chan Result) Results {
	b := NewResultBuilder(q)

	// go consume all the entries and add them to the results.
	b.Process.Go(func(worker goprocess.Process) {
		for {
			select {
			case <-worker.Closing(): // client told us to close early
				return
			case e, more := <-res:
				if !more { // input drained; we're done
					return
				}
				select {
				case b.Output <- e:
				case <-worker.Closing(): // client told us to close early
					return
				}
			}
		}
		// (removed an unreachable `return` that followed the infinite
		// loop — flagged by `go vet` as dead code.)
	})
	go b.Process.CloseAfterChildren()
	return b.Results()
}
// ResultsWithEntries returns a Results object from a list of entries.
// The entries are streamed out by a worker goroutine; it stops early if
// the client signals close.
func ResultsWithEntries(q Query, res []Entry) Results {
	b := NewResultBuilder(q)

	// go consume all the entries and add them to the results.
	b.Process.Go(func(worker goprocess.Process) {
		for _, e := range res {
			select {
			case b.Output <- Result{Entry: e}:
			case <-worker.Closing(): // client told us to close early
				return
			}
		}
		// (removed a redundant bare `return` at the end of the body.)
	})
	go b.Process.CloseAfterChildren()
	return b.Results()
}
// ResultsReplaceQuery returns a view of r whose Query() reports q
// instead; the process and result channel are shared with r.
func ResultsReplaceQuery(r Results, q Query) Results {
	return &results{
		query: q,
		proc:  r.Process(),
		res:   r.Next(),
	}
}

View File

@ -0,0 +1,127 @@
package query
// DerivedResults wraps a new result channel ch around the process and
// query of existing results qr, so transformed streams keep the same
// lifecycle (Close/Process) as their source.
func DerivedResults(qr Results, ch <-chan Result) Results {
	return &results{
		query: qr.Query(),
		proc:  qr.Process(),
		res:   ch,
	}
}
// NaiveFilter applies a filter to the results.
func NaiveFilter(qr Results, filter Filter) Results {
	out := make(chan Result)
	go func() {
		defer close(out)
		defer qr.Close()

		for res := range qr.Next() {
			// errors are always forwarded; entries only if they pass.
			if res.Error != nil || filter.Filter(res.Entry) {
				out <- res
			}
		}
	}()
	return DerivedResults(qr, out)
}
// NaiveLimit truncates the results to a given int limit. Error results
// are forwarded without counting against the limit; a limit <= 0 means
// no truncation.
func NaiveLimit(qr Results, limit int) Results {
	out := make(chan Result)
	go func() {
		defer close(out)
		defer qr.Close()

		count := 0
		for res := range qr.Next() {
			if res.Error != nil {
				out <- res
				continue
			}
			out <- res
			count++
			if limit > 0 && count >= limit {
				return
			}
		}
	}()
	return DerivedResults(qr, out)
}
// NaiveOffset skips the first `offset` successful entries of the
// stream. Error results are forwarded exactly once and do not consume
// a skip slot (mirroring NaiveLimit, where errors don't count either).
func NaiveOffset(qr Results, offset int) Results {
	ch := make(chan Result)
	go func() {
		defer close(ch)
		defer qr.Close()
		sent := 0
		for e := range qr.Next() {
			if e.Error != nil {
				// BUG FIX: previously an error arriving after the offset was
				// satisfied was sent twice (once here, once below), and an
				// error inside the offset window consumed a skip slot.
				// Forward the error once and move on.
				ch <- e
				continue
			}
			if sent < offset {
				sent++ // still inside the skip window
				continue
			}
			ch <- e
		}
	}()
	return DerivedResults(qr, ch)
}
// NaiveOrder buffers the entire result stream, sorts it with the given
// Order, and re-emits the sorted entries.
// WARNING: this is the only non-stream friendly operation! It holds all
// entries in memory before emitting anything.
func NaiveOrder(qr Results, o Order) Results {
	ch := make(chan Result)
	go func() {
		defer close(ch)
		defer qr.Close()
		var entries []Entry
		for e := range qr.Next() {
			if e.Error != nil {
				// BUG FIX: previously the (zero-valued) Entry of an error
				// result was also appended below and later re-emitted as a
				// bogus successful result. Forward the error once and skip
				// buffering it.
				ch <- e
				continue
			}
			entries = append(entries, e.Entry)
		}
		o.Sort(entries)
		for _, e := range entries {
			ch <- Result{Entry: e}
		}
	}()
	return DerivedResults(qr, ch)
}
// NaiveQueryApply applies a Query's operations to a raw result stream,
// in the canonical order: prefix filter, filters, orders, offset, then
// limit. Zero-valued Offset/Limit are ignored.
func NaiveQueryApply(q Query, qr Results) Results {
	if q.Prefix != "" {
		qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix})
	}
	for _, f := range q.Filters {
		qr = NaiveFilter(qr, f)
	}
	for _, o := range q.Orders {
		qr = NaiveOrder(qr, o)
	}
	if q.Offset != 0 {
		qr = NaiveOffset(qr, q.Offset)
	}
	if q.Limit != 0 {
		// BUG FIX: was NaiveLimit(qr, q.Offset), which truncated the
		// stream at the offset count instead of the requested limit.
		qr = NaiveLimit(qr, q.Limit)
	}
	return qr
}
// ResultEntriesFrom zips parallel key/value slices into a []Entry.
// keys and vals must be the same length; vals[i] belongs to keys[i].
func ResultEntriesFrom(keys []string, vals []interface{}) []Entry {
	entries := make([]Entry, len(keys))
	for i := range keys {
		entries[i] = Entry{Key: keys[i], Value: vals[i]}
	}
	return entries
}

View File

@ -0,0 +1,109 @@
package query
import (
"strings"
"testing"
)
// sampleKeys is the fixed, deliberately unsorted key fixture shared by
// the query-combinator tests below.
var sampleKeys = []string{
	"/ab/c",
	"/ab/cd",
	"/a",
	"/abce",
	"/abcf",
	"/ab",
}

// testCase pairs input keys with the keys a query is expected to yield.
// NOTE(review): appears unused in this file — confirm before removing.
type testCase struct {
	keys   []string
	expect []string
}
// testResults drains res and asserts that the resulting keys match
// expect, both in count and in order.
func testResults(t *testing.T, res Results, expect []string) {
	entries, err := res.Rest()
	if err != nil {
		t.Fatal(err)
	}
	actual := make([]string, len(entries))
	for i := range entries {
		actual[i] = entries[i].Key
	}
	if len(actual) != len(expect) {
		t.Error("expect != actual.", expect, actual)
	}
	if strings.Join(actual, "") != strings.Join(expect, "") {
		t.Error("expect != actual.", expect, actual)
	}
}
// TestLimit exercises NaiveLimit: a limit of 0 means "no limit", a
// limit larger than the stream is a no-op, and a smaller limit
// truncates the stream in order.
func TestLimit(t *testing.T) {
	testKeyLimit := func(t *testing.T, limit int, keys []string, expect []string) {
		e := make([]Entry, len(keys))
		for i, k := range keys {
			e[i] = Entry{Key: k}
		}
		res := ResultsWithEntries(Query{}, e)
		res = NaiveLimit(res, limit)
		testResults(t, res, expect)
	}
	testKeyLimit(t, 0, sampleKeys, []string{ // none
		"/ab/c",
		"/ab/cd",
		"/a",
		"/abce",
		"/abcf",
		"/ab",
	})
	testKeyLimit(t, 10, sampleKeys, []string{ // large
		"/ab/c",
		"/ab/cd",
		"/a",
		"/abce",
		"/abcf",
		"/ab",
	})
	testKeyLimit(t, 2, sampleKeys, []string{
		"/ab/c",
		"/ab/cd",
	})
}
// TestOffset exercises NaiveOffset: an offset of 0 is a no-op, an
// offset past the end yields nothing, and a mid-stream offset skips
// exactly that many leading entries.
func TestOffset(t *testing.T) {
	testOffset := func(t *testing.T, offset int, keys []string, expect []string) {
		e := make([]Entry, len(keys))
		for i, k := range keys {
			e[i] = Entry{Key: k}
		}
		res := ResultsWithEntries(Query{}, e)
		res = NaiveOffset(res, offset)
		testResults(t, res, expect)
	}
	testOffset(t, 0, sampleKeys, []string{ // none
		"/ab/c",
		"/ab/cd",
		"/a",
		"/abce",
		"/abcf",
		"/ab",
	})
	testOffset(t, 10, sampleKeys, []string{ // large
	})
	testOffset(t, 2, sampleKeys, []string{
		"/a",
		"/abce",
		"/abcf",
		"/ab",
	})
}

View File

@ -4,6 +4,7 @@ import (
"sync"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// MutexDatastore contains a child datastore and a mutex.
@ -57,8 +58,8 @@ func (d *MutexDatastore) Delete(key ds.Key) (err error) {
}
// KeyList implements Datastore.KeyList
func (d *MutexDatastore) KeyList() ([]ds.Key, error) {
func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
d.RLock()
defer d.RUnlock()
return d.child.KeyList()
return d.child.Query(q)
}

View File

@ -113,7 +113,7 @@ type Process interface {
// }
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc)
Go(f ProcessFunc) Process
// Close ends the process. Close blocks until the process has completely
// shut down, and any teardown has run _exactly once_. The returned error
@ -121,6 +121,10 @@ type Process interface {
// If the process has already been closed, Close returns immediately.
Close() error
// CloseAfterChildren calls Close _after_ its children have Closed
// normally (i.e. it _does not_ attempt to close them).
CloseAfterChildren() error
// Closing is a signal to wait upon. The returned channel is closed
// _after_ Close has been called at least once, but teardown may or may
// not be done yet. The primary use case of Closing is for children who
@ -167,7 +171,19 @@ var nilProcessFunc = func(Process) {}
//
// This is because having the process you
func Go(f ProcessFunc) Process {
return GoChild(Background(), f)
// return GoChild(Background(), f)
// we use two processes, one for communication, and
// one for ensuring we wait on the function (unclosable from the outside).
p := newProcess(nil)
waitFor := newProcess(nil)
p.WaitFor(waitFor) // prevent p from closing
go func() {
f(p)
waitFor.Close() // allow p to close.
p.Close() // ensure p closes.
}()
return p
}
// GoChild is like Go, but it registers the returned Process as a child of parent,

View File

@ -289,7 +289,6 @@ func TestAddChild(t *testing.T) {
func TestGoChildrenClose(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
var bWait = make(chan struct{})
var cWait = make(chan struct{})
@ -335,10 +334,85 @@ func TestGoChildrenClose(t *testing.T) {
go a.Close()
testNone(t, Q)
bWait <- struct{}{} // relase b
go b.Close()
testNone(t, Q)
cWait <- struct{}{} // relase c
<-c.Closed()
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
eWait <- struct{}{} // release e
<-e.Closed()
testStrs(t, Q, "e")
dWait <- struct{}{} // releasse d
<-d.Closed()
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
}
func TestCloseAfterChildren(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
a = WithParent(Background())
a.Go(func(p Process) {
b = p
b.Go(func(p Process) {
c = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
})
a.Go(func(p Process) {
d = p
d.Go(func(p Process) {
e = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
})
<-ready
<-ready
<-ready
<-ready
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
aDone := make(chan struct{})
bDone := make(chan struct{})
testNone(t, Q)
go func() {
a.CloseAfterChildren()
aDone <- struct{}{}
}()
testNone(t, Q)
go func() {
b.CloseAfterChildren()
bDone <- struct{}{}
}()
testNone(t, Q)
c.Close()
<-bDone
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
@ -346,6 +420,7 @@ func TestGoChildrenClose(t *testing.T) {
testStrs(t, Q, "e")
d.Close()
<-aDone
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
@ -354,11 +429,7 @@ func TestGoChildrenClose(t *testing.T) {
func TestBackground(t *testing.T) {
// test it hangs indefinitely:
b := Background()
go b.Close()
go func() {
b.Close()
}()
select {
case <-b.Closing():

View File

@ -9,6 +9,7 @@ type process struct {
children []Process // process to close with us
waitfors []Process // process to only wait for
teardown TeardownFunc // called to run the teardown logic.
waiting chan struct{} // closed when CloseAfterChildrenClosed is called.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeErr error // error to return to clients of Close()
@ -73,13 +74,18 @@ func (p *process) AddChild(child Process) {
p.Unlock()
}
func (p *process) Go(f ProcessFunc) {
func (p *process) Go(f ProcessFunc) Process {
child := newProcess(nil)
p.AddChild(child)
waitFor := newProcess(nil)
child.WaitFor(waitFor) // prevent child from closing
go func() {
f(child)
child.Close() // close to tear down.
waitFor.Close() // allow child to close.
child.Close() // close to tear down.
}()
return child
}
// Close is the external close function.
@ -125,3 +131,46 @@ func (p *process) doClose() {
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
}
// CloseAfterChildren blocks until every process p is currently waiting
// on has closed on its own (it does _not_ close them), then closes p
// and returns Close's error.
// We will only wait on the children we have now.
// We will not wait on children added subsequently.
// this may change in the future.
func (p *process) CloseAfterChildren() error {
	p.Lock()
	select {
	case <-p.Closed():
		p.Unlock()
		return p.Close() // get error. safe, after p.Closed()
	case <-p.waiting: // already called it.
		p.Unlock()
		<-p.Closed()
		return p.Close() // get error. safe, after p.Closed()
	default:
	}
	p.Unlock()
	// here only from one goroutine.
	// NOTE(review): nothing in this method closes p.waiting, so the
	// "already called it" branch above can only fire if p.waiting is
	// closed elsewhere — confirm, otherwise concurrent callers all fall
	// through to the wait loop below.
	// nextToWaitFor returns the first waitfor that is not yet closed,
	// or nil when all of them are done.
	nextToWaitFor := func() Process {
		p.Lock()
		defer p.Unlock()
		for _, e := range p.waitfors {
			select {
			case <-e.Closed():
			default:
				return e
			}
		}
		return nil
	}
	// wait for all processes we're waiting for are closed.
	// the semantics here are simple: we will _only_ close
	// if there are no processes currently waiting for.
	for next := nextToWaitFor(); next != nil; next = nextToWaitFor() {
		<-next.Closed()
	}
	// YAY! we're done. close
	return p.Close()
}

View File

@ -45,29 +45,14 @@ func NewRateLimiter(parent process.Process, limit int) *RateLimiter {
func (rl *RateLimiter) LimitedGo(f process.ProcessFunc) {
<-rl.limiter
rl.Go(func(child process.Process) {
p := rl.Go(f)
// call the function as rl.Go would.
f(child)
// this close is here because the child may have spawned
// children of its own, and our rate limiter should capture that.
// we have two options:
// * this approach (which is what process.Go itself does), or
// * spawn another goroutine that waits on <-child.Closed()
//
// go func() {
// <-child.Closed()
// rl.limiter <- struct{}{}
// }()
//
// This approach saves a goroutine. It is fine to call child.Close()
// multiple times.
child.Close()
// after it's done.
// this <-closed() is here because the child may have spawned
// children of its own, and our rate limiter should capture that.
go func() {
<-p.Closed()
rl.limiter <- struct{}{}
})
}()
}
// LimitChan returns a rate-limiting channel. it is the usual, simple,

View File

@ -5,13 +5,22 @@ package blockstore
import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsns "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = eventlog.Logger("blockstore")
// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("b")
var ValueTypeMismatch = errors.New("The retrieved value is not a Block")
var ErrNotFound = errors.New("blockstore: block not found")
@ -22,16 +31,22 @@ type Blockstore interface {
Has(u.Key) (bool, error)
Get(u.Key) (*blocks.Block, error)
Put(*blocks.Block) error
AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error)
AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error)
}
func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
dd := dsns.Wrap(d, BlockPrefix)
return &blockstore{
datastore: d,
datastore: dd,
}
}
type blockstore struct {
datastore ds.ThreadSafeDatastore
datastore ds.Datastore
// cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it.
// we do check it on `NewBlockstore` though.
}
func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) {
@ -67,3 +82,91 @@ func (bs *blockstore) Has(k u.Key) (bool, error) {
func (s *blockstore) DeleteBlock(k u.Key) error {
return s.datastore.Delete(k.DsKey())
}
// AllKeys runs a query for keys from the blockstore and collects them
// into a slice by draining AllKeysChan. Cancelling ctx stops collection
// early (the producer closes the channel).
// this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored.
//
// AllKeys respects context
func (bs *blockstore) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) {
	ch, err := bs.AllKeysChan(ctx, offset, limit)
	if err != nil {
		return nil, err
	}
	var keys []u.Key // nil when the store is empty, by design
	for k := range ch {
		keys = append(keys, k)
	}
	return keys, nil
}
// AllKeysChan runs a query for keys from the blockstore and streams
// them over the returned channel. The channel is closed when the
// query is drained, an error occurs, or ctx is cancelled.
// this is very simplistic, in the future, take dsq.Query as a param?
// if offset and limit are 0, they are ignored.
//
// AllKeysChan respects context
func (bs *blockstore) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) {
	// KeysOnly, because that would be _a lot_ of data.
	q := dsq.Query{KeysOnly: true, Offset: offset, Limit: limit}
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}
	// this function is here to compartmentalize
	// fetching the next valid key. ok == false means "stop iterating"
	// (ctx done, stream drained, or query error); an empty key with
	// ok == true means "skip this entry and keep going".
	get := func() (k u.Key, ok bool) {
		select {
		case <-ctx.Done():
			return k, false
		case e, more := <-res.Next():
			if !more {
				return k, false
			}
			if e.Error != nil {
				log.Debug("blockstore.AllKeysChan got err:", e.Error)
				return k, false
			}
			// need to convert to u.Key using u.KeyFromDsKey.
			k = u.KeyFromDsKey(ds.NewKey(e.Key))
			log.Debug("blockstore: query got key", k)
			// key must be a multihash. else ignore it.
			_, err := mh.Cast([]byte(k))
			if err != nil {
				return "", true
			}
			return k, true
		}
	}
	output := make(chan u.Key)
	go func() {
		defer func() {
			res.Process().Close() // ensure exit (signals early exit, too)
			close(output)
		}()
		for {
			k, ok := get()
			if !ok {
				return
			}
			if k == "" { // non-multihash key; skip it
				continue
			}
			// forward k, staying responsive to cancellation.
			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()
	return output, nil
}

View File

@ -2,10 +2,14 @@ package blockstore
import (
"bytes"
"fmt"
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
)
@ -41,11 +45,171 @@ func TestPutThenGetBlock(t *testing.T) {
}
}
// newBlockStoreWithKeys builds a Blockstore over d (a fresh
// MapDatastore when d is nil), puts N distinct blocks into it, and
// returns the store together with the keys of those blocks.
func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []u.Key) {
	if d == nil {
		d = ds.NewMapDatastore()
	}
	bs := NewBlockstore(ds_sync.MutexWrap(d))
	keys := make([]u.Key, N)
	for i := 0; i < N; i++ {
		// each block gets unique content, hence a unique key.
		block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
		err := bs.Put(block)
		if err != nil {
			t.Fatal(err)
		}
		keys[i] = block.Key()
	}
	return bs, keys
}
// TestAllKeysSimple checks that AllKeys with no offset/limit returns
// exactly the set of keys that were put into the store.
func TestAllKeysSimple(t *testing.T) {
	bs, keys := newBlockStoreWithKeys(t, nil, 100)
	ctx := context.Background()
	keys2, err := bs.AllKeys(ctx, 0, 0)
	if err != nil {
		t.Fatal(err)
	}
	// for _, k2 := range keys2 {
	// 	t.Log("found ", k2.Pretty())
	// }
	expectMatches(t, keys, keys2)
}
// TestAllKeysOffsetAndLimit checks that AllKeys honors offset and
// limit: with offset N/3 and limit N/3, exactly N/3 keys come back.
func TestAllKeysOffsetAndLimit(t *testing.T) {
	N := 30
	bs, _ := newBlockStoreWithKeys(t, nil, N)
	ctx := context.Background()
	keys3, err := bs.AllKeys(ctx, N/3, N/3)
	if err != nil {
		t.Fatal(err)
	}
	for _, k3 := range keys3 {
		t.Log("found ", k3.Pretty())
	}
	if len(keys3) != N/3 {
		t.Errorf("keys3 should be: %d != %d", N/3, len(keys3))
	}
}
// TestAllKeysRespectsContext verifies that AllKeys unblocks both when
// the underlying query stream ends normally and when the caller's
// context is cancelled mid-query. It intercepts the datastore's Query
// via queryTestDS so the test controls the result stream by hand.
func TestAllKeysRespectsContext(t *testing.T) {
	N := 100
	d := &queryTestDS{ds: ds.NewMapDatastore()}
	bs, _ := newBlockStoreWithKeys(t, d, N)
	started := make(chan struct{}, 1)
	done := make(chan struct{}, 1)
	errors := make(chan error, 100)
	// getKeys runs AllKeys to completion, reporting lifecycle via the
	// started/done channels and any failure via errors (nil = clean end).
	getKeys := func(ctx context.Context) {
		started <- struct{}{}
		_, err := bs.AllKeys(ctx, 0, 0) // once without cancelling
		if err != nil {
			errors <- err
		}
		done <- struct{}{}
		errors <- nil // a nil one to signal break
	}
	// Once without context, to make sure it all works
	{
		var results dsq.Results
		resultChan := make(chan dsq.Result)
		d.SetFunc(func(q dsq.Query) (dsq.Results, error) {
			results = dsq.ResultsWithChan(q, resultChan)
			return results, nil
		})
		go getKeys(context.Background())
		// make sure it's waiting.
		<-started
		select {
		case <-done:
			t.Fatal("sync is wrong")
		case <-results.Process().Closing():
			t.Fatal("should not be closing")
		case <-results.Process().Closed():
			t.Fatal("should not be closed")
		default:
		}
		// feed one entry, then end the stream normally.
		e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
		resultChan <- dsq.Result{Entry: e} // let it go.
		close(resultChan)
		<-done                       // should be done now.
		<-results.Process().Closed() // should be closed now
		// print any errors
		for err := range errors {
			if err == nil {
				break
			}
			t.Error(err)
		}
	}
	// Once with
	{
		var results dsq.Results
		resultChan := make(chan dsq.Result)
		d.SetFunc(func(q dsq.Query) (dsq.Results, error) {
			results = dsq.ResultsWithChan(q, resultChan)
			return results, nil
		})
		ctx, cancel := context.WithCancel(context.Background())
		go getKeys(ctx)
		// make sure it's waiting.
		<-started
		select {
		case <-done:
			t.Fatal("sync is wrong")
		case <-results.Process().Closing():
			t.Fatal("should not be closing")
		case <-results.Process().Closed():
			t.Fatal("should not be closed")
		default:
		}
		// cancel instead of feeding results: the query should begin
		// closing (but not be fully closed until the channel ends).
		cancel() // let it go.
		select {
		case <-done:
			t.Fatal("sync is wrong")
		case <-results.Process().Closed():
			t.Fatal("should not be closed") // should not be closed yet.
		case <-results.Process().Closing():
			// should be closing now!
			t.Log("closing correctly at this point.")
		}
		close(resultChan)
		<-done                       // should be done now.
		<-results.Process().Closed() // should be closed now
		// print any errors
		for err := range errors {
			if err == nil {
				break
			}
			t.Error(err)
		}
	}
}
func TestValueTypeMismatch(t *testing.T) {
block := blocks.NewBlock([]byte("some data"))
datastore := ds.NewMapDatastore()
datastore.Put(block.Key().DsKey(), "data that isn't a block!")
k := BlockPrefix.Child(block.Key().DsKey())
datastore.Put(k, "data that isn't a block!")
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
@ -54,3 +218,51 @@ func TestValueTypeMismatch(t *testing.T) {
t.Fatal(err)
}
}
// expectMatches asserts that expect and actual hold the same keys:
// equal length, and every expected key appears somewhere in actual
// (order-insensitive; duplicates are not distinguished).
func expectMatches(t *testing.T, expect, actual []u.Key) {
	if len(expect) != len(actual) {
		t.Errorf("expect and actual differ: %d != %d", len(expect), len(actual))
	}
	for _, want := range expect {
		found := false
		for _, got := range actual {
			if want == got {
				found = true
				break
			}
		}
		if !found {
			t.Error("expected key not found: ", want)
		}
	}
}
// queryTestDS wraps a datastore and lets tests intercept Query calls
// via SetFunc; every other operation delegates to the wrapped store.
type queryTestDS struct {
	cb func(q dsq.Query) (dsq.Results, error) // optional Query interceptor
	ds ds.Datastore                           // backing datastore
}

// SetFunc installs (or replaces) the Query interceptor.
func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f }

func (c *queryTestDS) Put(key ds.Key, value interface{}) (err error) {
	return c.ds.Put(key, value)
}

func (c *queryTestDS) Get(key ds.Key) (value interface{}, err error) {
	return c.ds.Get(key)
}

func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) {
	return c.ds.Has(key)
}

func (c *queryTestDS) Delete(key ds.Key) (err error) {
	return c.ds.Delete(key)
}

// Query invokes the interceptor when set, else delegates to the store.
func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) {
	if c.cb != nil {
		return c.cb(q)
	}
	return c.ds.Query(q)
}

View File

@ -1,7 +1,9 @@
package blockstore
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru"
"github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
)
@ -43,3 +45,11 @@ func (w *writecache) Put(b *blocks.Block) error {
w.cache.Add(b.Key(), struct{}{})
return w.blockstore.Put(b)
}
// AllKeys delegates to the underlying blockstore; the write cache holds
// no keys of its own.
func (w *writecache) AllKeys(ctx context.Context, offset int, limit int) ([]u.Key, error) {
	return w.blockstore.AllKeys(ctx, offset, limit)
}

// AllKeysChan delegates to the underlying blockstore.
func (w *writecache) AllKeysChan(ctx context.Context, offset int, limit int) (<-chan u.Key, error) {
	return w.blockstore.AllKeysChan(ctx, offset, limit)
}

View File

@ -4,6 +4,7 @@ import (
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
syncds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks"
)
@ -83,7 +84,7 @@ func (c *callbackDatastore) Delete(key ds.Key) (err error) {
return c.ds.Delete(key)
}
func (c *callbackDatastore) KeyList() ([]ds.Key, error) {
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
c.f()
return c.ds.KeyList()
return c.ds.Query(q)
}

View File

@ -6,6 +6,8 @@ import (
"io"
"net/http"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
cmds "github.com/jbenet/go-ipfs/commands"
u "github.com/jbenet/go-ipfs/util"
)
@ -26,6 +28,7 @@ const (
contentTypeHeader = "Content-Type"
contentLengthHeader = "Content-Length"
transferEncodingHeader = "Transfer-Encoding"
applicationJson = "application/json"
)
var mimeTypes = map[string]string{
@ -44,6 +47,11 @@ func NewHandler(ctx cmds.Context, root *cmds.Command, origin string) *Handler {
}
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// create a context.Context to pass into the commands.
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
i.ctx.Context = ctx
log.Debug("Incoming API request: ", r.URL)
if len(i.origin) > 0 {
@ -95,8 +103,8 @@ func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
out, err := res.Reader()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set(contentTypeHeader, "text/plain")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
@ -106,19 +114,33 @@ func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_, isChan := res.Output().(chan interface{})
streamChans, _, _ := req.Option("stream-channels").Bool()
if isChan && streamChans {
err = copyChunks(w, out)
// w.WriteString(transferEncodingHeader + ": chunked\r\n")
// w.Header().Set(channelHeader, "1")
// w.WriteHeader(200)
err = copyChunks(applicationJson, w, out)
if err != nil {
log.Error(err)
}
return
}
io.Copy(w, out)
flushCopy(w, out)
}
// flushCopy Copies from an io.Reader to a http.ResponseWriter.
// Flushes chunks over HTTP stream as they are read (if supported by transport).
func flushCopy(w http.ResponseWriter, out io.Reader) error {
if _, ok := w.(http.Flusher); !ok {
return copyChunks("", w, out)
}
io.Copy(&flushResponse{w}, out)
return nil
}
// Copies from an io.Reader to a http.ResponseWriter.
// Flushes chunks over HTTP stream as they are read (if supported by transport).
func copyChunks(w http.ResponseWriter, out io.Reader) error {
func copyChunks(contentType string, w http.ResponseWriter, out io.Reader) error {
hijacker, ok := w.(http.Hijacker)
if !ok {
return errors.New("Could not create hijacker")
@ -130,7 +152,9 @@ func copyChunks(w http.ResponseWriter, out io.Reader) error {
defer conn.Close()
writer.WriteString("HTTP/1.1 200 OK\r\n")
writer.WriteString(contentTypeHeader + ": application/json\r\n")
if contentType != "" {
writer.WriteString(contentTypeHeader + ": " + contentType + "\r\n")
}
writer.WriteString(transferEncodingHeader + ": chunked\r\n")
writer.WriteString(channelHeader + ": 1\r\n\r\n")
@ -165,3 +189,19 @@ func copyChunks(w http.ResponseWriter, out io.Reader) error {
return nil
}
type flushResponse struct {
W http.ResponseWriter
}
func (fr *flushResponse) Write(buf []byte) (int, error) {
n, err := fr.W.Write(buf)
if err != nil {
return n, err
}
if flusher, ok := fr.W.(http.Flusher); ok {
flusher.Flush()
}
return n, err
}

View File

@ -6,6 +6,8 @@ import (
"reflect"
"strconv"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/config"
"github.com/jbenet/go-ipfs/core"
u "github.com/jbenet/go-ipfs/util"
@ -14,6 +16,10 @@ import (
type optMap map[string]interface{}
type Context struct {
// this Context is temporary. Will be replaced soon, as we get
// rid of this variable entirely.
Context context.Context
Online bool
ConfigRoot string
@ -267,7 +273,8 @@ func NewRequest(path []string, opts optMap, args []string, file File, cmd *Comma
optDefs = make(map[string]Option)
}
req := &request{path, opts, args, file, cmd, Context{}, optDefs}
ctx := Context{Context: context.TODO()}
req := &request{path, opts, args, file, cmd, ctx, optDefs}
err := req.ConvertOptions()
if err != nil {
return nil, err

View File

@ -31,8 +31,9 @@ var PingCmd = &cmds.Command{
Send pings to a peer using the routing system to discover its address
`,
ShortDescription: `
ipfs ping is a tool to find a node (in the routing system),
send pings, wait for pongs, and print out round-trip latency information.
ipfs ping is a tool to test sending data to other nodes. It finds nodes
via the routing system, send pings, wait for pongs, and print out round-
trip latency information.
`,
},
Arguments: []cmds.Argument{
@ -72,6 +73,7 @@ Send pings to a peer using the routing system to discover its address
},
},
Run: func(req cmds.Request) (interface{}, error) {
ctx := req.Context().Context
n, err := req.Context().GetNode()
if err != nil {
return nil, err
@ -103,14 +105,14 @@ Send pings to a peer using the routing system to discover its address
outChan := make(chan interface{})
go pingPeer(n, peerID, numPings, outChan)
go pingPeer(ctx, n, peerID, numPings, outChan)
return outChan, nil
},
Type: PingResult{},
}
func pingPeer(n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interface{}) {
func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interface{}) {
defer close(outChan)
if len(n.Peerstore.Addresses(pid)) == 0 {
@ -119,8 +121,7 @@ func pingPeer(n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interfac
Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()),
}
// TODO: get master context passed in
ctx, _ := context.WithTimeout(context.TODO(), kPingTimeout)
ctx, _ := context.WithTimeout(ctx, kPingTimeout)
p, err := n.Routing.FindPeer(ctx, pid)
if err != nil {
outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)}
@ -131,9 +132,17 @@ func pingPeer(n *core.IpfsNode, pid peer.ID, numPings int, outChan chan interfac
outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())}
var done bool
var total time.Duration
for i := 0; i < numPings; i++ {
ctx, _ := context.WithTimeout(context.TODO(), kPingTimeout)
for i := 0; i < numPings && !done; i++ {
select {
case <-ctx.Done():
done = true
continue
default:
}
ctx, _ := context.WithTimeout(ctx, kPingTimeout)
took, err := n.Routing.Ping(ctx, pid)
if err != nil {
log.Errorf("Ping error: %s", err)

View File

@ -31,7 +31,7 @@ func KeyListTextMarshaler(res cmds.Response) (io.Reader, error) {
var RefsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Lists link hashes from an object",
Tagline: "Lists links (references) from an object",
ShortDescription: `
Retrieves the object named by <ipfs-path> and displays the link
hashes it contains, with the following format:
@ -41,7 +41,9 @@ hashes it contains, with the following format:
Note: list all refs recursively with -r.
`,
},
Subcommands: map[string]*cmds.Command{
"local": RefsLocalCmd,
},
Arguments: []cmds.Argument{
cmds.StringArg("ipfs-path", true, true, "Path to the object(s) to list refs from"),
},
@ -52,6 +54,7 @@ Note: list all refs recursively with -r.
cmds.BoolOption("recursive", "r", "Recursively list links of child nodes"),
},
Run: func(req cmds.Request) (interface{}, error) {
ctx := req.Context().Context
n, err := req.Context().GetNode()
if err != nil {
return nil, err
@ -91,7 +94,7 @@ Note: list all refs recursively with -r.
rw := RefWriter{
W: pipew,
DAG: n.DAG,
Ctx: n.Context(),
Ctx: ctx,
Unique: unique,
PrintEdge: edges,
PrintFmt: format,
@ -102,6 +105,48 @@ Note: list all refs recursively with -r.
if _, err := rw.WriteRefs(o); err != nil {
log.Error(err)
eptr.SetError(err)
return
}
}
}()
return eptr, nil
},
}
var RefsLocalCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Lists all local references",
ShortDescription: `
Displays the hashes of all local objects.
`,
},
Run: func(req cmds.Request) (interface{}, error) {
ctx := req.Context().Context
n, err := req.Context().GetNode()
if err != nil {
return nil, err
}
// todo: make async
allKeys, err := n.Blockstore.AllKeysChan(ctx, 0, 0)
if err != nil {
return nil, err
}
piper, pipew := io.Pipe()
eptr := &ErrPassThroughReader{R: piper}
go func() {
defer pipew.Close()
for k := range allKeys {
s := k.Pretty() + "\n"
if _, err := pipew.Write([]byte(s)); err != nil {
log.Error(err)
eptr.SetError(err)
return
}
}
}()

View File

@ -1,42 +1,44 @@
package datastore2
import (
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
delay "github.com/jbenet/go-ipfs/util/delay"
)
func WithDelay(ds datastore.Datastore, delay delay.D) datastore.Datastore {
func WithDelay(ds ds.Datastore, delay delay.D) ds.Datastore {
return &delayed{ds: ds, delay: delay}
}
type delayed struct {
ds datastore.Datastore
ds ds.Datastore
delay delay.D
}
func (dds *delayed) Put(key datastore.Key, value interface{}) (err error) {
func (dds *delayed) Put(key ds.Key, value interface{}) (err error) {
dds.delay.Wait()
return dds.ds.Put(key, value)
}
func (dds *delayed) Get(key datastore.Key) (value interface{}, err error) {
func (dds *delayed) Get(key ds.Key) (value interface{}, err error) {
dds.delay.Wait()
return dds.ds.Get(key)
}
func (dds *delayed) Has(key datastore.Key) (exists bool, err error) {
func (dds *delayed) Has(key ds.Key) (exists bool, err error) {
dds.delay.Wait()
return dds.ds.Has(key)
}
func (dds *delayed) Delete(key datastore.Key) (err error) {
func (dds *delayed) Delete(key ds.Key) (err error) {
dds.delay.Wait()
return dds.ds.Delete(key)
}
func (dds *delayed) KeyList() ([]datastore.Key, error) {
func (dds *delayed) Query(q dsq.Query) (dsq.Results, error) {
dds.delay.Wait()
return dds.ds.KeyList()
return dds.ds.Query(q)
}
var _ datastore.Datastore = &delayed{}
var _ ds.Datastore = &delayed{}

View File

@ -71,7 +71,7 @@ func (k *Key) Loggable() map[string]interface{} {
// KeyFromDsKey returns a Datastore key
func KeyFromDsKey(dsk ds.Key) Key {
return Key(dsk.BaseNamespace())
return Key(dsk.String()[1:])
}
// B58KeyConverter -- for KeyTransform datastores
@ -131,3 +131,10 @@ func XOR(a, b []byte) []byte {
}
return c
}
// KeySlice is used for sorting Keys; it implements sort.Interface
// with ascending lexicographic order.
type KeySlice []Key

func (es KeySlice) Len() int           { return len(es) }
func (es KeySlice) Swap(i, j int)      { es[i], es[j] = es[j], es[i] }
func (es KeySlice) Less(i, j int) bool { return es[i] < es[j] }