update go-datastore to latest

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy 2015-09-15 17:15:29 -07:00
parent 973c5fa0a4
commit 35a5ca0ef5
43 changed files with 1361 additions and 65 deletions

6
Godeps/Godeps.json generated
View File

@ -111,6 +111,10 @@
"ImportPath": "github.com/inconshreveable/go-update",
"Rev": "68f5725818189545231c1fd8694793d45f2fc529"
},
{
"ImportPath": "github.com/ipfs/go-log",
"Rev": "ee5cb9834b33bcf29689183e0323e328c8b8de29"
},
{
"ImportPath": "github.com/jackpal/go-nat-pmp",
"Rev": "a45aa3d54aef73b504e15eb71bea0e5565b5e6e1"
@ -129,7 +133,7 @@
},
{
"ImportPath": "github.com/jbenet/go-datastore",
"Rev": "7d6acaf7c0164c335f2ca4100f8fe30a7e2943dd"
"Rev": "c835c30f206c1e97172e428f052e225adab9abde"
},
{
"ImportPath": "github.com/jbenet/go-detect-race",

View File

@ -0,0 +1 @@
QmXJkcEXB6C9h6Ytb6rrUTFU56Ro62zxgrbxTT3dgjQGA8

View File

@ -0,0 +1,38 @@
package log
import (
"errors"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
type key int
const metadataKey key = 0
// ContextWithLoggable returns a derived context which contains the provided
// Loggable. Any Events logged with the derived context will include the
// provided Loggable.
func ContextWithLoggable(ctx context.Context, l Loggable) context.Context {
	md, err := MetadataFromContext(ctx)
	if err != nil {
		// No metadata in ctx yet: store the Loggable's data as-is.
		return context.WithValue(ctx, metadataKey, Metadata(l.Loggable()))
	}
	// Metadata already present: deep-merge the new data into it.
	return context.WithValue(ctx, metadataKey, DeepMerge(md, l.Loggable()))
}
// MetadataFromContext extracts the Metadata stored in ctx, or returns an
// error when ctx carries none (or carries a value of the wrong type).
func MetadataFromContext(ctx context.Context) (Metadata, error) {
	if v := ctx.Value(metadataKey); v != nil {
		if md, ok := v.(Metadata); ok {
			return md, nil
		}
	}
	return nil, errors.New("context contains no metadata")
}

View File

@ -0,0 +1,44 @@
package log
import (
"testing"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// TestContextContainsMetadata verifies that metadata attached via
// ContextWithLoggable can be read back with MetadataFromContext.
func TestContextContainsMetadata(t *testing.T) {
	t.Parallel()

	md := Metadata{"foo": "bar"}
	ctx := ContextWithLoggable(context.Background(), md)

	extracted, err := MetadataFromContext(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if _, ok := extracted["foo"]; !ok {
		t.Fail()
	}
}

// TestContextWithPreexistingMetadata verifies that attaching metadata twice
// merges rather than replaces: both keys must survive.
func TestContextWithPreexistingMetadata(t *testing.T) {
	t.Parallel()

	ctx := ContextWithLoggable(context.Background(), Metadata{"hello": "world"})
	ctx = ContextWithLoggable(ctx, Metadata{"goodbye": "earth"})

	extracted, err := MetadataFromContext(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if _, ok := extracted["hello"]; !ok {
		t.Fatal("original key not present")
	}
	if _, ok := extracted["goodbye"]; !ok {
		t.Fatal("new key not present")
	}
}

42
Godeps/_workspace/src/github.com/ipfs/go-log/entry.go generated vendored Normal file
View File

@ -0,0 +1,42 @@
package log
import (
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)
type entry struct {
loggables []Loggable
system string
event string
}
// Log logs the event unconditionally (regardless of log level)
// TODO add support for leveled-logs once we decide which levels we want
// for our structured logs
func (e *entry) Log() {
	e.log()
}

// log is a private method invoked by the public Log, Info, Error methods
func (e *entry) log() {
	// Merge metadata from every attached loggable into a single blob.
	md := Metadata{}
	for _, l := range e.loggables {
		md = DeepMerge(md, l.Loggable())
	}

	// Reserved keys are written after the merge so they always win
	// over user-provided values.
	// TODO md["level"] = level
	md["event"] = e.event
	md["system"] = e.system
	md["time"] = FormatRFC3339(time.Now())

	// TODO roll our own event logger
	logrus.WithFields(map[string]interface{}(md)).Info(e.event)
}
func FormatRFC3339(t time.Time) string {
return t.UTC().Format(time.RFC3339Nano)
}

View File

@ -0,0 +1,16 @@
package log
import "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
// ExampleEventLogger shows the two equivalent ways to complete an event
// started with EventBegin: calling Done, or calling Close.
// NOTE(review): there is no "Output:" comment, so `go test` compiles this
// example but does not execute it.
func ExampleEventLogger() {
	{
		log := EventLogger(nil)
		e := log.EventBegin(context.Background(), "dial")
		e.Done()
	}
	{
		log := EventLogger(nil)
		e := log.EventBegin(context.Background(), "dial")
		_ = e.Close() // implements io.Closer for convenience
	}
}

149
Godeps/_workspace/src/github.com/ipfs/go-log/log.go generated vendored Normal file
View File

@ -0,0 +1,149 @@
package log
import (
"fmt"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// StandardLogger provides API compatibility with standard printf loggers
// eg. go-logging
type StandardLogger interface {
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Info(args ...interface{})
Infof(format string, args ...interface{})
Panic(args ...interface{})
Panicf(format string, args ...interface{})
Warning(args ...interface{})
Warningf(format string, args ...interface{})
}
// EventLogger extends the StandardLogger interface to allow for log items
// containing structured metadata
type EventLogger interface {
StandardLogger
// Event merges structured data from the provided inputs into a single
// machine-readable log event.
//
// If the context contains metadata, a copy of this is used as the base
// metadata accumulator.
//
// If one or more loggable objects are provided, these are deep-merged into base blob.
//
// Next, the event name is added to the blob under the key "event". If
// the key "event" already exists, it will be over-written.
//
// Finally the timestamp and package name are added to the accumulator and
// the metadata is logged.
Event(ctx context.Context, event string, m ...Loggable)
EventBegin(ctx context.Context, event string, m ...Loggable) *EventInProgress
}
// Logger retrieves an event logger by name
func Logger(system string) EventLogger {
	// TODO if we would like to adjust log levels at run-time. Store this event
	// logger in a map (just like the util.Logger impl)
	if system == "" {
		log.Warnf("Missing name parameter")
		system = "undefined"
	}

	logger, ok := loggers[system]
	if !ok {
		logger = log.WithField("module", system)
		loggers[system] = logger
	}
	return &eventLogger{system: system, StandardLogger: logger}
}
// eventLogger implements the EventLogger and wraps a go-logging Logger
type eventLogger struct {
StandardLogger
system string
// TODO add log-level
}
// EventBegin logs an "<event>Begin" entry immediately and returns a handle
// whose Done/Close records the matching completion event, including the
// elapsed duration and any loggables appended in the meantime.
func (el *eventLogger) EventBegin(ctx context.Context, event string, metadata ...Loggable) *EventInProgress {
	start := time.Now()
	el.Event(ctx, fmt.Sprintf("%sBegin", event), metadata...)

	eip := &EventInProgress{}
	eip.doneFunc = func(additional []Loggable) {
		metadata = append(metadata, additional...) // anything added during the operation
		metadata = append(metadata, LoggableMap(map[string]interface{}{ // finally, duration of event
			// time.Since is the idiomatic spelling of time.Now().Sub(start)
			"duration": time.Since(start),
		}))
		el.Event(ctx, event, metadata...)
	}
	return eip
}
// Event writes a single structured entry for the named event, merging any
// metadata carried by ctx with the provided loggables.
func (el *eventLogger) Event(ctx context.Context, event string, metadata ...Loggable) {
	// Nobody is listening; skip all the work.
	if !WriterGroup.Active() {
		return
	}

	// Context metadata (if present) forms the base of the entry.
	base, err := MetadataFromContext(ctx)
	if err != nil {
		base = Metadata{}
	}

	loggables := make([]Loggable, 0, len(metadata)+1)
	loggables = append(loggables, base)
	loggables = append(loggables, metadata...)

	e := entry{
		loggables: loggables,
		system:    el.system,
		event:     event,
	}
	e.Log() // TODO replace this when leveled-logs have been implemented
}
// EventInProgress represents an event started with EventBegin that has not
// yet been completed with Done/Close.
type EventInProgress struct {
	loggables []Loggable        // extra data accumulated via Append/SetError
	doneFunc  func([]Loggable)  // set by EventBegin; emits the final event
}

// Append adds loggables to be included in the call to Done
func (eip *EventInProgress) Append(l Loggable) {
	eip.loggables = append(eip.loggables, l)
}

// SetError includes the provided error
func (eip *EventInProgress) SetError(err error) {
	eip.loggables = append(eip.loggables, LoggableMap{
		"error": err.Error(),
	})
}

// Done creates a new Event entry that includes the duration and appended
// loggables.
func (eip *EventInProgress) Done() {
	eip.doneFunc(eip.loggables) // create final event with extra data
}

// Close is an alias for done
// (implements io.Closer so events can be finished via defer/Close).
func (eip *EventInProgress) Close() error {
	eip.Done()
	return nil
}

View File

@ -0,0 +1,34 @@
package log
// Loggable describes objects that can be marshalled into Metadata for logging
type Loggable interface {
	Loggable() map[string]interface{}
}

// LoggableMap is a plain map that satisfies Loggable by returning itself.
type LoggableMap map[string]interface{}

// Loggable returns the map itself as loggable metadata.
func (l LoggableMap) Loggable() map[string]interface{} {
	return l
}

// LoggableF converts a func into a Loggable
type LoggableF func() map[string]interface{}

// Loggable invokes the wrapped function and returns its result.
func (l LoggableF) Loggable() map[string]interface{} {
	return l()
}
// Deferred returns a Loggable whose value for key is computed lazily by
// calling f at log time rather than at construction time.
func Deferred(key string, f func() string) Loggable {
	return LoggableF(func() map[string]interface{} {
		return map[string]interface{}{key: f()}
	})
}
// Pair nests the given Loggable under key.
// NOTE(review): the Loggable value itself is stored (not the result of its
// Loggable() method), so consumers receive it un-flattened.
func Pair(key string, l Loggable) Loggable {
	return LoggableMap{
		key: l,
	}
}

View File

@ -0,0 +1,82 @@
package log
import (
"encoding/json"
"errors"
"reflect"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/satori/go.uuid"
)
// Metadata is a convenience type for generic maps
type Metadata map[string]interface{}
// Uuid returns a Metadata with the string key and UUID value
// NOTE(review): a fresh value is generated on every call; presumably a
// random v4 UUID per the satori/go.uuid API — confirm against that package.
func Uuid(key string) Metadata {
	return Metadata{
		key: uuid.NewV4().String(),
	}
}
// DeepMerge merges the second Metadata parameter into the first.
// Nested Metadata are merged recursively. Primitives are over-written.
func DeepMerge(b, a Metadata) Metadata {
	out := Metadata{}
	for k, v := range b {
		out[k] = v
	}

	for k, v := range a {
		incoming, err := Metadatify(v)
		if err != nil {
			// incoming value is not map-like: plain overwrite
			out[k] = v
			continue
		}
		// incoming value is map-like. What about the destination?
		existing, found := out[k]
		if !found {
			// nothing to merge with: just write it
			out[k] = v
			continue
		}
		existingMD, err := Metadatify(existing)
		if err != nil {
			// destination value is not map-like: overwrite it
			out[k] = v
			continue
		}
		// both sides are map-like: merge them recursively
		out[k] = DeepMerge(existingMD, incoming)
	}
	return out
}
// Loggable implements the Loggable interface
func (m Metadata) Loggable() map[string]interface{} {
// NB: method defined on value to avoid de-referencing nil Metadata
return m
}
// JsonString returns the Metadata serialized as a JSON object string.
func (m Metadata) JsonString() (string, error) {
	// NB: method defined on value
	raw, err := json.Marshal(m)
	return string(raw), err
}
// Metadatify converts maps into Metadata
func Metadatify(i interface{}) (Metadata, error) {
	v := reflect.ValueOf(i)
	if v.Kind() != reflect.Map {
		return nil, errors.New("is not a map")
	}
	out := Metadata{}
	for _, k := range v.MapKeys() {
		out[k.String()] = v.MapIndex(k).Interface()
	}
	return out, nil
}

View File

@ -0,0 +1,50 @@
package log
import "testing"
// TestOverwrite checks DeepMerge semantics: a deeply nested key in the
// second argument overwrites the first, while sibling keys survive.
func TestOverwrite(t *testing.T) {
	t.Parallel()

	// dig walks nested Metadata by key path.
	dig := func(m Metadata, keys ...string) interface{} {
		var cur interface{} = m
		for _, k := range keys {
			cur = cur.(Metadata)[k]
		}
		return cur
	}

	under := Metadata{
		"a": Metadata{
			"b": Metadata{
				"c": Metadata{
					"d":     "the original value",
					"other": "SURVIVE",
				},
			},
		},
	}
	over := Metadata{
		"a": Metadata{
			"b": Metadata{
				"c": Metadata{
					"d": "a new value",
				},
			},
		},
	}

	merged := DeepMerge(under, over)
	if got := dig(merged, "a", "b", "c", "d").(string); got != "a new value" {
		t.Fatal(got)
	}
	if got := dig(merged, "a", "b", "c", "other").(string); got != "SURVIVE" {
		t.Fatal(got)
	}
}

// TestMarshalJSON smoke-tests JsonString serialization.
func TestMarshalJSON(t *testing.T) {
	t.Parallel()
	s, _ := Metadata{"a": "b"}.JsonString()
	t.Log(s)
}

// TestMetadataIsLoggable is a compile-time check that Metadata satisfies
// the Loggable interface.
func TestMetadataIsLoggable(t *testing.T) {
	t.Parallel()
	var _ Loggable = Metadata{}
}

97
Godeps/_workspace/src/github.com/ipfs/go-log/oldlog.go generated vendored Normal file
View File

@ -0,0 +1,97 @@
package log
import (
"errors"
"os"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)
func init() {
SetupLogging()
}
var log = logrus.New()
// LogFormats is a map of formats used for our logger, keyed by name.
// TODO: write custom TextFormatter (don't print module=name explicitly) and
// fork logrus to add shortfile
var LogFormats = map[string]*logrus.TextFormatter{
"nocolor": {DisableColors: true, FullTimestamp: true, TimestampFormat: "2006-01-02 15:04:05.000000", DisableSorting: true},
"color": {DisableColors: false, FullTimestamp: true, TimestampFormat: "15:04:05:000", DisableSorting: true},
}
var defaultLogFormat = "color"
// Logging environment variables
const (
envLogging = "IPFS_LOGGING"
envLoggingFmt = "IPFS_LOGGING_FMT"
)
// ErrNoSuchLogger is returned when the util pkg is asked for a nonexistent logger.
var ErrNoSuchLogger = errors.New("Error: No such logger")
// loggers is the set of loggers in the system
var loggers = map[string]*logrus.Entry{}
// SetupLogging will initialize the logger backend and set the flags.
func SetupLogging() {
	format, ok := LogFormats[os.Getenv(envLoggingFmt)]
	if !ok {
		format = LogFormats[defaultLogFormat]
	}

	log.Out = os.Stderr
	log.Formatter = format

	// Default to ERROR; honor IPFS_LOGGING when it names a valid level.
	lvl := logrus.ErrorLevel
	if logenv := os.Getenv(envLogging); logenv != "" {
		parsed, err := logrus.ParseLevel(logenv)
		if err != nil {
			log.Debugf("logrus.ParseLevel() Error: %q", err)
			// keep the ERROR default on an unrecognized level name
		} else {
			lvl = parsed
		}
	}
	SetAllLoggers(lvl)
}
// SetDebugLogging calls SetAllLoggers with logrus.DebugLevel
func SetDebugLogging() {
SetAllLoggers(logrus.DebugLevel)
}
// SetAllLoggers changes the logrus.Level of all loggers to lvl
func SetAllLoggers(lvl logrus.Level) {
	log.Level = lvl
	for _, entry := range loggers {
		entry.Level = lvl
	}
}
// SetLogLevel changes the log level of a specific subsystem
// name=="*" changes all subsystems
func SetLogLevel(name, level string) error {
	lvl, err := logrus.ParseLevel(level)
	if err != nil {
		return err
	}

	// wildcard, change all
	if name == "*" {
		SetAllLoggers(lvl)
		return nil
	}

	// Check if we have a logger by that name
	logger, ok := loggers[name]
	if !ok {
		return ErrNoSuchLogger
	}
	logger.Level = lvl
	return nil
}

61
Godeps/_workspace/src/github.com/ipfs/go-log/option.go generated vendored Normal file
View File

@ -0,0 +1,61 @@
package log
import (
"io"
"os"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)
// init sets up sane defaults
func init() {
Configure(TextFormatter)
Configure(Output(os.Stderr))
// has the effect of disabling logging since we log event entries at Info
// level by convention
Configure(LevelError)
}
// Global writer group for logs to output to
var WriterGroup = new(MirrorWriter)
type Option func()
// Configure applies the provided options sequentially from left to right
func Configure(options ...Option) {
	for i := range options {
		options[i]()
	}
}
// LdJSONFormatter Option formats the event log as line-delimited JSON
var LdJSONFormatter = func() {
logrus.SetFormatter(&PoliteJSONFormatter{})
}
// TextFormatter Option formats the event log as human-readable plain-text
var TextFormatter = func() {
logrus.SetFormatter(&logrus.TextFormatter{})
}
// Output returns an Option that redirects all logrus output to w.
func Output(w io.Writer) Option {
	return func() {
		logrus.SetOutput(w)
		// TODO return previous Output option
	}
}
// LevelDebug Option sets the log level to debug
var LevelDebug = func() {
logrus.SetLevel(logrus.DebugLevel)
}
// LevelDebug Option sets the log level to error
var LevelError = func() {
logrus.SetLevel(logrus.ErrorLevel)
}
// LevelDebug Option sets the log level to info
var LevelInfo = func() {
logrus.SetLevel(logrus.InfoLevel)
}

View File

@ -0,0 +1,5 @@
{
"name": "go-log",
"version": "1.0.0",
"language": "go"
}

View File

@ -0,0 +1,20 @@
package log
import (
"encoding/json"
"fmt"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)
// PoliteJSONFormatter marshals entries into JSON encoded slices (without
// overwriting user-provided keys). How polite of it!
type PoliteJSONFormatter struct{}
// Format renders the entry's fields as one JSON object followed by a
// newline, leaving user-provided keys exactly as given.
func (f *PoliteJSONFormatter) Format(entry *logrus.Entry) ([]byte, error) {
	out, err := json.Marshal(entry.Data)
	if err != nil {
		return nil, fmt.Errorf("Failed to marshal fields to JSON, %v", err)
	}
	out = append(out, '\n')
	return out, nil
}

53
Godeps/_workspace/src/github.com/ipfs/go-log/writer.go generated vendored Normal file
View File

@ -0,0 +1,53 @@
package log
import (
"io"
"sync"
)
// MirrorWriter fans a single Write out to a mutable set of writers.
type MirrorWriter struct {
	writers []io.Writer // destinations; writers that fail are pruned by Write
	lk      sync.Mutex  // guards writers
}
// Write mirrors b to every registered writer, dropping any writer whose
// Write returns an error. It always reports (len(b), nil) to the caller,
// regardless of individual writer failures.
func (mw *MirrorWriter) Write(b []byte) (int, error) {
	mw.lk.Lock()
	// write to all writers, and nil out the broken ones.
	for i, w := range mw.writers {
		_, err := w.Write(b)
		if err != nil {
			mw.writers[i] = nil
		}
	}
	// consolidate the slice: fill each nil hole with the last surviving
	// writer, then truncate. Note this does not preserve relative order.
	for i := 0; i < len(mw.writers); i++ {
		if mw.writers[i] != nil {
			continue
		}
		// seek backwards from the end for a survivor to move into slot i
		j := len(mw.writers) - 1
		for ; j > i; j-- {
			if mw.writers[j] != nil {
				mw.writers[i], mw.writers[j] = mw.writers[j], nil // swap
				break
			}
		}
		// everything at index j and beyond is now nil; drop it
		mw.writers = mw.writers[:j]
	}
	mw.lk.Unlock()
	return len(b), nil
}
// AddWriter registers w to receive all subsequent writes.
func (mw *MirrorWriter) AddWriter(w io.Writer) {
	mw.lk.Lock()
	defer mw.lk.Unlock()
	mw.writers = append(mw.writers, w)
}

// Active reports whether at least one writer is registered.
func (mw *MirrorWriter) Active() bool {
	mw.lk.Lock()
	defer mw.lk.Unlock()
	return len(mw.writers) > 0
}

View File

@ -1,10 +1,15 @@
{
"ImportPath": "github.com/jbenet/go-datastore",
"GoVersion": "go1.4.2",
"GoVersion": "go1.5",
"Packages": [
"./..."
],
"Deps": [
{
"ImportPath": "github.com/Sirupsen/logrus",
"Comment": "v0.8.3-37-g418b41d",
"Rev": "418b41d23a1bf978c06faea5313ba194650ac088"
},
{
"ImportPath": "github.com/codahale/blake2",
"Rev": "3fa823583afba430e8fc7cdbcc670dbf90bfacc4"
@ -21,10 +26,19 @@
"ImportPath": "github.com/dustin/randbo",
"Rev": "7f1b564ca7242d22bcc6e2128beb90d9fa38b9f0"
},
{
"ImportPath": "github.com/fzzy/radix/redis",
"Comment": "v0.5.1",
"Rev": "27a863cdffdb0998d13e1e11992b18489aeeaa25"
},
{
"ImportPath": "github.com/hashicorp/golang-lru",
"Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370"
},
{
"ImportPath": "github.com/ipfs/go-log",
"Rev": "ee5cb9834b33bcf29689183e0323e328c8b8de29"
},
{
"ImportPath": "github.com/jbenet/go-os-rename",
"Rev": "2d93ae970ba96c41f717036a5bf5494faf1f38c0"
@ -53,6 +67,10 @@
"ImportPath": "github.com/syndtr/gosnappy/snappy",
"Rev": "ce8acff4829e0c2458a67ead32390ac0a381c862"
},
{
"ImportPath": "golang.org/x/net/context",
"Rev": "dfcbca9c45aeabb8971affa4f76b2d40f6f72328"
},
{
"ImportPath": "gopkg.in/check.v1",
"Rev": "91ae5f88a67b14891cfd43895b01164f6c120420"

View File

@ -1,6 +1,7 @@
package datastore
import (
"io"
"log"
dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
@ -67,6 +68,10 @@ func (d *MapDatastore) Batch() (Batch, error) {
return NewBasicBatch(d), nil
}
func (d *MapDatastore) Close() error {
return nil
}
// NullDatastore stores nothing, but conforms to the API.
// Useful to test with.
type NullDatastore struct {
@ -106,6 +111,10 @@ func (d *NullDatastore) Batch() (Batch, error) {
return NewBasicBatch(d), nil
}
func (d *NullDatastore) Close() error {
return nil
}
// LogDatastore logs all accesses through the datastore.
type LogDatastore struct {
Name string
@ -165,9 +174,16 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
func (d *LogDatastore) Batch() (Batch, error) {
log.Printf("%s: Batch\n", d.Name)
bds, ok := d.child.(BatchingDatastore)
if !ok {
return nil, ErrBatchUnsupported
if bds, ok := d.child.(Batching); ok {
return bds.Batch()
}
return bds.Batch()
return nil, ErrBatchUnsupported
}
func (d *LogDatastore) Close() error {
log.Printf("%s: Close\n", d.Name)
if cds, ok := d.child.(io.Closer); ok {
return cds.Close()
}
return nil
}

View File

@ -1,6 +1,7 @@
package coalesce
import (
"io"
"sync"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@ -124,3 +125,16 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// query not coalesced yet.
return d.child.Query(q)
}
func (d *datastore) Close() error {
d.reqmu.Lock()
defer d.reqmu.Unlock()
for _, s := range d.req {
<-s.done
}
if c, ok := d.child.(io.Closer); ok {
return c.Close()
}
return nil
}

View File

@ -69,7 +69,7 @@ type Datastore interface {
Query(q query.Query) (query.Results, error)
}
type BatchingDatastore interface {
type Batching interface {
Datastore
Batch() (Batch, error)

View File

@ -10,12 +10,16 @@ import (
"os"
"path"
"strings"
"time"
logging "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-log"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-os-rename"
)
var log = logging.Logger("flatfs")
const (
extension = ".data"
maxPrefixLen = 16
@ -93,12 +97,32 @@ func (fs *Datastore) makePrefixDirNoSync(dir string) error {
return nil
}
var putMaxRetries = 3
// Put stores value (which must be a []byte) under key, retrying a bounded
// number of times with increasing back-off when the OS reports file-handle
// exhaustion ("too many open files"). Any other error aborts immediately.
func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
	val, ok := value.([]byte)
	if !ok {
		return datastore.ErrInvalidType
	}

	var err error
	for i := 0; i < putMaxRetries; i++ {
		err = fs.doPut(key, val)
		if err == nil {
			return nil
		}
		if !strings.Contains(err.Error(), "too many open files") {
			return err
		}
		// BUG FIX: log.Error does not interpolate format verbs; Errorf is
		// required for %d to be rendered into the message.
		log.Errorf("too many open files, retrying in %dms", 100*i)
		time.Sleep(time.Millisecond * 100 * time.Duration(i))
	}
	return err
}
func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
dir, path := fs.encode(key)
if err := fs.makePrefixDir(dir); err != nil {
return err
@ -323,6 +347,10 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
return res, nil
}
func (fs *Datastore) Close() error {
return nil
}
type flatfsBatch struct {
puts map[datastore.Key]interface{}
deletes map[datastore.Key]struct{}

View File

@ -149,3 +149,11 @@ func isFile(path string) bool {
return !finfo.IsDir()
}
func (d *Datastore) Close() error {
return nil
}
func (d *Datastore) Batch() (ds.Batch, error) {
return ds.NewBasicBatch(d), nil
}

View File

@ -16,14 +16,12 @@ type KeyTransform interface {
type Datastore interface {
ds.Shim
KeyTransform
Batch() (ds.Batch, error)
}
// Wrap wraps a given datastore with a KeyTransform function.
// The resulting wrapped datastore will use the transform on all Datastore
// operations.
func Wrap(child ds.Datastore, t KeyTransform) Datastore {
func Wrap(child ds.Datastore, t KeyTransform) *ktds {
if t == nil {
panic("t (KeyTransform) is nil")
}

View File

@ -1,6 +1,8 @@
package keytransform
import (
"io"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
@ -74,8 +76,15 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return dsq.DerivedResults(qr, ch), nil
}
func (d *ktds) Close() error {
if c, ok := d.child.(io.Closer); ok {
return c.Close()
}
return nil
}
func (d *ktds) Batch() (ds.Batch, error) {
bds, ok := d.child.(ds.BatchingDatastore)
bds, ok := d.child.(ds.Batching)
if !ok {
return nil, ds.ErrBatchUnsupported
}

View File

@ -1,8 +1,6 @@
package leveldb
import (
"io"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
@ -11,18 +9,13 @@ import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util"
)
type Datastore interface {
ds.ThreadSafeDatastore
io.Closer
}
type datastore struct {
DB *leveldb.DB
}
type Options opt.Options
func NewDatastore(path string, opts *Options) (Datastore, error) {
func NewDatastore(path string, opts *Options) (*datastore, error) {
var nopts opt.Options
if opts != nil {
nopts = opt.Options(*opts)
@ -148,6 +141,11 @@ func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
}
}
func (d *datastore) Batch() (ds.Batch, error) {
// TODO: implement batch on leveldb
return nil, ds.ErrBatchUnsupported
}
// LevelDB needs to be closed.
func (d *datastore) Close() (err error) {
return d.DB.Close()

View File

@ -25,7 +25,7 @@ var testcases = map[string]string{
//
// d, close := newDS(t)
// defer close()
func newDS(t *testing.T) (Datastore, func()) {
func newDS(t *testing.T) (*datastore, func()) {
path, err := ioutil.TempDir("/tmp", "testing_leveldb_")
if err != nil {
t.Fatal(err)
@ -41,7 +41,7 @@ func newDS(t *testing.T) (Datastore, func()) {
}
}
func addTestCases(t *testing.T, d Datastore, testcases map[string]string) {
func addTestCases(t *testing.T, d *datastore, testcases map[string]string) {
for k, v := range testcases {
dsk := ds.NewKey(k)
if err := d.Put(dsk, []byte(v)); err != nil {

View File

@ -54,3 +54,11 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return nil, errors.New("KeyList not implemented.")
}
func (d *Datastore) Close() error {
return nil
}
func (d *Datastore) Batch() (ds.Batch, error) {
return nil, ds.ErrBatchUnsupported
}

View File

@ -3,6 +3,7 @@
package measure
import (
"io"
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/codahale/metrics"
@ -18,17 +19,12 @@ const (
maxSize = int64(1 << 32)
)
type DatastoreCloser interface {
datastore.Datastore
Close() error
}
// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
//
// If prefix is not unique, New will panic. Call Close to release the
// prefix.
func New(prefix string, ds datastore.Datastore) DatastoreCloser {
func New(prefix string, ds datastore.Datastore) *measure {
m := &measure{
backend: ds,
@ -84,7 +80,6 @@ type measure struct {
}
var _ datastore.Datastore = (*measure)(nil)
var _ DatastoreCloser = (*measure)(nil)
func recordLatency(h *metrics.Histogram, start time.Time) {
elapsed := time.Now().Sub(start) / time.Microsecond
@ -159,7 +154,7 @@ type measuredBatch struct {
}
func (m *measure) Batch() (datastore.Batch, error) {
bds, ok := m.backend.(datastore.BatchingDatastore)
bds, ok := m.backend.(datastore.Batching)
if !ok {
return nil, datastore.ErrBatchUnsupported
}
@ -245,5 +240,9 @@ func (m *measure) Close() error {
m.queryNum.Remove()
m.queryErr.Remove()
m.queryLatency.Remove()
if c, ok := m.backend.(io.Closer); ok {
return c.Close()
}
return nil
}

View File

@ -4,6 +4,7 @@ package mount
import (
"errors"
"io"
"strings"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@ -115,6 +116,18 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
return r, nil
}
// Close closes every mounted datastore that implements io.Closer.
// Unlike an early-return on the first failure (which would leave the
// remaining mounts open), all mounts are attempted and the first error
// encountered is returned.
func (d *Datastore) Close() error {
	var firstErr error
	for _, m := range d.mounts {
		c, ok := m.Datastore.(io.Closer)
		if !ok {
			continue
		}
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
type mountBatch struct {
mounts map[string]datastore.Batch
@ -132,7 +145,7 @@ func (mt *mountBatch) lookupBatch(key datastore.Key) (datastore.Batch, datastore
child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()]
if !ok {
bds, ok := child.(datastore.BatchingDatastore)
bds, ok := child.(datastore.Batching)
if !ok {
return nil, datastore.NewKey(""), datastore.ErrBatchUnsupported
}

View File

@ -36,7 +36,7 @@ func PrefixTransform(prefix ds.Key) ktds.KeyTransform {
}
// Wrap wraps a given datastore with a key-prefix.
func Wrap(child ds.Datastore, prefix ds.Key) ktds.Datastore {
func Wrap(child ds.Datastore, prefix ds.Key) *datastore {
if child == nil {
panic("child (ds.Datastore) is nil")
}
@ -81,3 +81,11 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return dsq.DerivedResults(qr, ch), nil
}
func (d *datastore) Batch() (ds.Batch, error) {
if bds, ok := d.Datastore.(ds.Batching); ok {
return bds.Batch()
}
return nil, ds.ErrBatchUnsupported
}

View File

@ -2,6 +2,7 @@ package sync
import (
"fmt"
"io"
"os"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@ -67,6 +68,26 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return r, nil
}
// Close closes the wrapped datastore if it implements io.Closer.
// NOTE(review): this wrapper appears to follow a panic-on-error contract
// (see the "panic datastore" messages); a failed child Close is written to
// stdout and then panics instead of returning the error — confirm this is
// intentional before relying on the error return.
func (d *datastore) Close() error {
	if c, ok := d.child.(io.Closer); ok {
		err := c.Close()
		if err != nil {
			fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
			panic("panic datastore: Close failed")
		}
	}
	return nil
}
// Batch returns the child's batch wrapped in a panicBatch. When the child
// does not implement ds.Batching, ds.ErrBatchUnsupported is returned
// rather than panicking on a failed type assertion, matching the sibling
// datastore implementations in this package.
func (d *datastore) Batch() (ds.Batch, error) {
	bds, ok := d.child.(ds.Batching)
	if !ok {
		return nil, ds.ErrBatchUnsupported
	}
	b, err := bds.Batch()
	if err != nil {
		return nil, err
	}
	return &panicBatch{b}, nil
}
type panicBatch struct {
t ds.Batch
}

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/fzzy/radix/redis"
datastore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
query "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
@ -17,14 +16,14 @@ var _ datastore.ThreadSafeDatastore = &Datastore{}
var ErrInvalidType = errors.New("redis datastore: invalid type error. this datastore only supports []byte values")
func NewExpiringDatastore(client *redis.Client, ttl time.Duration) (datastore.ThreadSafeDatastore, error) {
func NewExpiringDatastore(client *redis.Client, ttl time.Duration) (*Datastore, error) {
return &Datastore{
client: client,
ttl: ttl,
}, nil
}
func NewDatastore(client *redis.Client) (datastore.ThreadSafeDatastore, error) {
func NewDatastore(client *redis.Client) (*Datastore, error) {
return &Datastore{
client: client,
}, nil
@ -83,3 +82,11 @@ func (ds *Datastore) Query(q query.Query) (query.Results, error) {
}
func (ds *Datastore) IsThreadSafe() {}
func (ds *Datastore) Batch() (datastore.Batch, error) {
return nil, datastore.ErrBatchUnsupported
}
func (ds *Datastore) Close() error {
return ds.client.Close()
}

View File

@ -8,6 +8,7 @@ import (
"github.com/fzzy/radix/redis"
datastore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dstest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/test"
)

View File

@ -1,6 +1,7 @@
package sync
import (
"io"
"sync"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
@ -17,7 +18,7 @@ type MutexDatastore struct {
// MutexWrap constructs a datastore with a coarse lock around
// the entire datastore, for every single operation
func MutexWrap(d ds.Datastore) ds.ThreadSafeDatastore {
func MutexWrap(d ds.Datastore) *MutexDatastore {
return &MutexDatastore{child: d}
}
@ -67,7 +68,7 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
func (d *MutexDatastore) Batch() (ds.Batch, error) {
d.RLock()
defer d.RUnlock()
bds, ok := d.child.(ds.BatchingDatastore)
bds, ok := d.child.(ds.Batching)
if !ok {
return nil, ds.ErrBatchUnsupported
}
@ -81,6 +82,15 @@ func (d *MutexDatastore) Batch() (ds.Batch, error) {
}, nil
}
// Close closes the child datastore (when it implements io.Closer) while
// holding the write lock, so no operation races with shutdown.
func (d *MutexDatastore) Close() error {
	d.RWMutex.Lock()
	defer d.RWMutex.Unlock()
	c, ok := d.child.(io.Closer)
	if !ok {
		return nil
	}
	return c.Close()
}
type syncBatch struct {
lk sync.Mutex
batch ds.Batch

View File

@ -0,0 +1,198 @@
// Package mount provides a Datastore that has other Datastores
// mounted at various key prefixes and is threadsafe
package syncmount
import (
"errors"
"io"
"strings"
"sync"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
var (
ErrNoMount = errors.New("no datastore mounted for this key")
)
type Mount struct {
Prefix ds.Key
Datastore ds.Datastore
}
// New constructs a Datastore over the given mounts. The slice is copied so
// later mutation by the caller cannot affect the datastore.
func New(mounts []Mount) *Datastore {
	// make a copy so we're sure it doesn't mutate
	m := make([]Mount, len(mounts))
	copy(m, mounts) // builtin copy replaces the element-by-element loop
	return &Datastore{mounts: m}
}
type Datastore struct {
mounts []Mount
lk sync.Mutex
}
var _ ds.Datastore = (*Datastore)(nil)
// lookup finds the mount responsible for key. It returns the mounted
// datastore, the mount's prefix, and the key rewritten relative to that
// prefix. When no mount matches it returns (nil, "/", key) unchanged.
func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
	d.lk.Lock()
	defer d.lk.Unlock()
	for _, m := range d.mounts {
		// a mount matches when its prefix equals key or is an ancestor of it
		if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) {
			s := strings.TrimPrefix(key.String(), m.Prefix.String())
			k := ds.NewKey(s)
			return m.Datastore, m.Prefix, k
		}
	}
	return nil, ds.NewKey("/"), key
}
func (d *Datastore) Put(key ds.Key, value interface{}) error {
cds, _, k := d.lookup(key)
if cds == nil {
return ErrNoMount
}
return cds.Put(k, value)
}
func (d *Datastore) Get(key ds.Key) (value interface{}, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
return nil, ds.ErrNotFound
}
return cds.Get(k)
}
// Has reports whether key exists in the datastore mounted above it.
// An unmounted key is simply reported as absent, with no error.
func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
	child, _, rest := d.lookup(key)
	if child != nil {
		return child.Has(rest)
	}
	return false, nil
}
// Delete removes key from the datastore mounted above it. An unmounted
// key reports ds.ErrNotFound, mirroring Get.
func (d *Datastore) Delete(key ds.Key) error {
	child, _, rest := d.lookup(key)
	if child != nil {
		return child.Delete(rest)
	}
	return ds.ErrNotFound
}
// Query lists the keys under q.Prefix by delegating to the single
// datastore mounted at that prefix. Only a plain prefixed listing is
// supported: filters, orders, limits and offsets are all rejected, as
// are prefixes that do not resolve to a mount point.
func (d *Datastore) Query(q query.Query) (query.Results, error) {
	unsupported := len(q.Filters) > 0 ||
		len(q.Orders) > 0 ||
		q.Limit > 0 ||
		q.Offset > 0
	if unsupported {
		// TODO this is overly simplistic, but the only caller is
		// `ipfs refs local` for now, and this gets us moving.
		return nil, errors.New("mount only supports listing all prefixed keys in random order")
	}

	cds, mountPt, childKey := d.lookup(ds.NewKey(q.Prefix))
	if cds == nil {
		return nil, errors.New("mount only supports listing a mount point")
	}

	// TODO support listing cross mount points too

	// Delegate to the mounted datastore, rewriting the prefix on the
	// way in and re-attaching the mount point to keys on the way out.
	childQ := q
	childQ.Prefix = childKey.String()
	wrapped := keytransform.Wrap(cds, &keytransform.Pair{
		Convert: func(ds.Key) ds.Key {
			// Queries only read; keys never travel in this direction.
			panic("this should never be called")
		},
		Invert: func(k ds.Key) ds.Key {
			return mountPt.Child(k)
		},
	})

	results, err := wrapped.Query(childQ)
	if err != nil {
		return nil, err
	}
	return query.ResultsReplaceQuery(results, q), nil
}
// IsThreadSafe is an intentionally empty marker method — presumably
// satisfying the ds.ThreadSafeDatastore marker interface; confirm
// against the datastore package.
func (d *Datastore) IsThreadSafe() {}
// Close closes every mounted child datastore that implements
// io.Closer. It stops at the first error, leaving any remaining
// children open.
//
// The loop variable is named m rather than d: the original shadowed
// the receiver d inside the loop, which is legal but error-prone.
//
// NOTE(review): d.mounts is read here without taking d.lk; it is never
// mutated after New's defensive copy, so this appears safe — confirm.
func (d *Datastore) Close() error {
	for _, m := range d.mounts {
		c, ok := m.Datastore.(io.Closer)
		if !ok {
			continue
		}
		if err := c.Close(); err != nil {
			return err
		}
	}
	return nil
}
// mountBatch implements ds.Batch over a mount Datastore by keeping one
// child batch per mount point, created on demand by lookupBatch.
type mountBatch struct {
	mounts map[string]ds.Batch // mount-prefix string -> child batch
	lk sync.Mutex              // guards mounts
	d *Datastore
}
// Batch returns a ds.Batch that lazily opens one child batch per mount
// point as keys are written to it.
func (d *Datastore) Batch() (ds.Batch, error) {
	b := &mountBatch{
		mounts: make(map[string]ds.Batch),
		d:      d,
	}
	return b, nil
}
// lookupBatch resolves key to its mount and returns the child batch
// for that mount (opening and caching it on first use) together with
// the prefix-stripped key. Mounts whose datastore does not implement
// ds.Batching yield ds.ErrBatchUnsupported.
func (mt *mountBatch) lookupBatch(key ds.Key) (ds.Batch, ds.Key, error) {
	mt.lk.Lock()
	defer mt.lk.Unlock()

	child, loc, rest := mt.d.lookup(key)
	name := loc.String()
	if t, ok := mt.mounts[name]; ok {
		return t, rest, nil
	}

	bds, ok := child.(ds.Batching)
	if !ok {
		return nil, ds.NewKey(""), ds.ErrBatchUnsupported
	}
	t, err := bds.Batch()
	if err != nil {
		return nil, ds.NewKey(""), err
	}
	mt.mounts[name] = t
	return t, rest, nil
}
// Put queues a write of val under key in the batch for key's mount.
func (mt *mountBatch) Put(key ds.Key, val interface{}) error {
	b, rest, err := mt.lookupBatch(key)
	if err != nil {
		return err
	}
	return b.Put(rest, val)
}
// Delete queues a deletion of key in the batch for key's mount.
func (mt *mountBatch) Delete(key ds.Key) error {
	b, rest, err := mt.lookupBatch(key)
	if err != nil {
		return err
	}
	return b.Delete(rest)
}
// Commit commits every child batch opened so far, stopping at the
// first error (later batches are left uncommitted).
//
// Fix: the original iterated mt.mounts without holding mt.lk, racing
// concurrent lookupBatch map writes; the lock is now taken, matching
// lookupBatch.
func (mt *mountBatch) Commit() error {
	mt.lk.Lock()
	defer mt.lk.Unlock()
	for _, t := range mt.mounts {
		if err := t.Commit(); err != nil {
			return err
		}
	}
	return nil
}

View File

@ -0,0 +1,241 @@
package syncmount_test
import (
"testing"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query"
)
// TestPutBadNothing: Put on a mount table with no mounts at all must
// fail with ErrNoMount.
func TestPutBadNothing(t *testing.T) {
	mds := mount.New(nil)
	if err := mds.Put(datastore.NewKey("quux"), []byte("foobar")); err != mount.ErrNoMount {
		t.Fatalf("Put got wrong error: %v != %v", err, mount.ErrNoMount)
	}
}
// TestPutBadNoMount: Put under a key covered by no mount (only an
// unrelated mount exists) must fail with ErrNoMount.
func TestPutBadNoMount(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/redherring"), Datastore: mapds},
	})
	if err := mds.Put(datastore.NewKey("/quux/thud"), []byte("foobar")); err != mount.ErrNoMount {
		t.Fatalf("expected ErrNoMount, got: %v\n", err)
	}
}
// TestPut: a Put through the mount table lands in the child datastore
// under the prefix-stripped key.
func TestPut(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})

	err := mds.Put(datastore.NewKey("/quux/thud"), []byte("foobar"))
	if err != nil {
		t.Fatalf("Put error: %v", err)
	}

	// Read back directly from the child under the stripped key.
	val, err := mapds.Get(datastore.NewKey("/thud"))
	if err != nil {
		t.Fatalf("Get error: %v", err)
	}
	buf, ok := val.([]byte)
	if !ok {
		t.Fatalf("Get value is not []byte: %T %v", val, val)
	}
	if string(buf) != "foobar" {
		t.Errorf("wrong value: %q != %q", string(buf), "foobar")
	}
}
// TestGetBadNothing: Get on an empty mount table reports ErrNotFound.
func TestGetBadNothing(t *testing.T) {
	mds := mount.New([]mount.Mount{})
	_, err := mds.Get(datastore.NewKey("/quux/thud"))
	if err != datastore.ErrNotFound {
		t.Fatalf("expected ErrNotFound, got: %v\n", err)
	}
}
// TestGetBadNoMount: Get under a key covered by no mount reports
// ErrNotFound (not ErrNoMount, unlike Put).
func TestGetBadNoMount(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/redherring"), Datastore: mapds},
	})
	_, err := mds.Get(datastore.NewKey("/quux/thud"))
	if err != datastore.ErrNotFound {
		t.Fatalf("expected ErrNotFound, got: %v\n", err)
	}
}
// TestGetNotFound: a mounted but empty child reports ErrNotFound.
func TestGetNotFound(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})
	_, err := mds.Get(datastore.NewKey("/quux/thud"))
	if err != datastore.ErrNotFound {
		t.Fatalf("expected ErrNotFound, got: %v\n", err)
	}
}
// TestGet: a value stored directly in the child datastore is visible
// through the mount under the prefixed key.
//
// Fix: the original had its failure messages swapped — the Put failure
// reported "Get error" and the Get failure reported "Put error".
func TestGet(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	m := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})

	if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil {
		t.Fatalf("Put error: %v", err)
	}

	val, err := m.Get(datastore.NewKey("/quux/thud"))
	if err != nil {
		t.Fatalf("Get error: %v", err)
	}
	buf, ok := val.([]byte)
	if !ok {
		t.Fatalf("Get value is not []byte: %T %v", val, val)
	}
	if g, e := string(buf), "foobar"; g != e {
		t.Errorf("wrong value: %q != %q", g, e)
	}
}
// TestHasBadNothing: Has on an empty mount table is false, no error.
func TestHasBadNothing(t *testing.T) {
	mds := mount.New([]mount.Mount{})
	found, err := mds.Has(datastore.NewKey("/quux/thud"))
	if err != nil {
		t.Fatalf("Has error: %v", err)
	}
	if found {
		t.Fatalf("wrong value: %v != %v", found, false)
	}
}
// TestHasBadNoMount: Has under a key with no covering mount is false,
// no error.
func TestHasBadNoMount(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/redherring"), Datastore: mapds},
	})
	found, err := mds.Has(datastore.NewKey("/quux/thud"))
	if err != nil {
		t.Fatalf("Has error: %v", err)
	}
	if found {
		t.Fatalf("wrong value: %v != %v", found, false)
	}
}
// TestHasNotFound: a mounted but empty child reports false, no error.
func TestHasNotFound(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})
	found, err := mds.Has(datastore.NewKey("/quux/thud"))
	if err != nil {
		t.Fatalf("Has error: %v", err)
	}
	if found {
		t.Fatalf("wrong value: %v != %v", found, false)
	}
}
// TestHas: a value written to the child is visible to Has through the
// mount under the prefixed key.
func TestHas(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})
	if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil {
		t.Fatalf("Put error: %v", err)
	}

	found, err := mds.Has(datastore.NewKey("/quux/thud"))
	if err != nil {
		t.Fatalf("Has error: %v", err)
	}
	if !found {
		t.Fatalf("wrong value: %v != %v", found, true)
	}
}
// TestDeleteNotFound: deleting a missing key reports ErrNotFound.
func TestDeleteNotFound(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})
	if err := mds.Delete(datastore.NewKey("/quux/thud")); err != datastore.ErrNotFound {
		t.Fatalf("expected ErrNotFound, got: %v\n", err)
	}
}
// TestDelete: deleting through the mount removes the value from the
// child datastore under the prefix-stripped key.
func TestDelete(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})
	if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil {
		t.Fatalf("Put error: %v", err)
	}

	if err := mds.Delete(datastore.NewKey("/quux/thud")); err != nil {
		t.Fatalf("Delete error: %v", err)
	}

	// make sure it disappeared
	found, err := mapds.Has(datastore.NewKey("/thud"))
	if err != nil {
		t.Fatalf("Has error: %v", err)
	}
	if found {
		t.Fatalf("wrong value: %v != %v", found, false)
	}
}
// TestQuerySimple: querying a mount-point prefix returns exactly the
// keys stored under it, with the mount prefix re-attached.
func TestQuerySimple(t *testing.T) {
	mapds := datastore.NewMapDatastore()
	mds := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
	})

	const myKey = "/quux/thud"
	if err := mds.Put(datastore.NewKey(myKey), []byte("foobar")); err != nil {
		t.Fatalf("Put error: %v", err)
	}

	res, err := mds.Query(query.Query{Prefix: "/quux"})
	if err != nil {
		t.Fatalf("Query fail: %v\n", err)
	}
	entries, err := res.Rest()
	if err != nil {
		t.Fatalf("Query Results.Rest fail: %v\n", err)
	}

	want := datastore.NewKey(myKey).String()
	seen := false
	for _, e := range entries {
		if e.Key == want {
			seen = true
		} else {
			t.Errorf("saw unexpected key: %q", e.Key)
		}
	}
	if !seen {
		t.Errorf("did not see wanted key %q in %+v", myKey, entries)
	}
}

View File

@ -9,7 +9,7 @@ import (
dstore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
)
func RunBatchTest(t *testing.T, ds dstore.BatchingDatastore) {
func RunBatchTest(t *testing.T, ds dstore.Batching) {
batch, err := ds.Batch()
if err != nil {
t.Fatal(err)
@ -58,7 +58,7 @@ func RunBatchTest(t *testing.T, ds dstore.BatchingDatastore) {
}
}
func RunBatchDeleteTest(t *testing.T, ds dstore.BatchingDatastore) {
func RunBatchDeleteTest(t *testing.T, ds dstore.Batching) {
r := rand.New()
var keys []dstore.Key
for i := 0; i < 20; i++ {

View File

@ -13,7 +13,7 @@ type tiered []ds.Datastore
// New returns a tiered datastore. Puts and Deletes will write-through to
// all datastores, Has and Get will try each datastore sequentially, and
// Query will always try the last one (most complete) first.
func New(dses ...ds.Datastore) ds.Datastore {
func New(dses ...ds.Datastore) tiered {
return tiered(dses)
}

View File

@ -49,19 +49,19 @@ func TestTiered(t *testing.T) {
td := New(d1, d2, d3, d4)
td.Put(ds.NewKey("foo"), "bar")
testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar")
testHas(t, td.(tiered), ds.NewKey("foo"), "bar") // all children
testHas(t, td, ds.NewKey("foo"), "bar") // all children
// remove it from, say, caches.
d1.Delete(ds.NewKey("foo"))
d2.Delete(ds.NewKey("foo"))
testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar")
testHas(t, td.(tiered)[2:], ds.NewKey("foo"), "bar")
testNotHas(t, td.(tiered)[:2], ds.NewKey("foo"))
testHas(t, td[2:], ds.NewKey("foo"), "bar")
testNotHas(t, td[:2], ds.NewKey("foo"))
// write it again.
td.Put(ds.NewKey("foo"), "bar2")
testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar2")
testHas(t, td.(tiered), ds.NewKey("foo"), "bar2")
testHas(t, td, ds.NewKey("foo"), "bar2")
}
func TestQueryCallsLast(t *testing.T) {

View File

@ -1,6 +1,7 @@
package timecache
import (
"io"
"sync"
"time"
@ -24,13 +25,13 @@ type datastore struct {
ttls map[ds.Key]time.Time
}
func WithTTL(ttl time.Duration) ds.Datastore {
func WithTTL(ttl time.Duration) *datastore {
return WithCache(ds.NewMapDatastore(), ttl)
}
// WithCache wraps a given datastore as a timecache.
// Get + Has requests are considered expired after a TTL.
func WithCache(d ds.Datastore, ttl time.Duration) ds.Datastore {
func WithCache(d ds.Datastore, ttl time.Duration) *datastore {
return &datastore{cache: d, ttl: ttl, ttls: make(map[ds.Key]time.Time)}
}
@ -94,3 +95,10 @@ func (d *datastore) Delete(key ds.Key) (err error) {
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.cache.Query(q)
}
// Close forwards Close to the underlying cache datastore when it
// implements io.Closer; otherwise it is a no-op.
func (d *datastore) Close() error {
	c, ok := d.cache.(io.Closer)
	if !ok {
		return nil
	}
	return c.Close()
}

View File

@ -43,7 +43,7 @@ func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
}
type blockstore struct {
datastore ds.BatchingDatastore
datastore ds.Batching
// cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it.
// we do check it on `NewBlockstore` though.
}

View File

@ -94,10 +94,6 @@ type FSRepo struct {
lockfile io.Closer
config *config.Config
ds ds.ThreadSafeDatastore
// tracked separately for use in Close; do not use directly.
leveldbDS levelds.Datastore
metricsBlocks measure.DatastoreCloser
metricsLevelDB measure.DatastoreCloser
}
var _ repo.Repo = (*FSRepo)(nil)
@ -352,7 +348,7 @@ func (r *FSRepo) openDatastore() error {
leveldbPath := path.Join(r.path, leveldbDirectory)
var err error
// save leveldb reference so it can be neatly closed afterward
r.leveldbDS, err = levelds.NewDatastore(leveldbPath, &levelds.Options{
leveldbDS, err := levelds.NewDatastore(leveldbPath, &levelds.Options{
Compression: ldbopts.NoCompression,
})
if err != nil {
@ -382,16 +378,16 @@ func (r *FSRepo) openDatastore() error {
id = fmt.Sprintf("uninitialized_%p", r)
}
prefix := "fsrepo." + id + ".datastore."
r.metricsBlocks = measure.New(prefix+"blocks", blocksDS)
r.metricsLevelDB = measure.New(prefix+"leveldb", r.leveldbDS)
metricsBlocks := measure.New(prefix+"blocks", blocksDS)
metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS)
mountDS := mount.New([]mount.Mount{
{
Prefix: ds.NewKey("/blocks"),
Datastore: r.metricsBlocks,
Datastore: metricsBlocks,
},
{
Prefix: ds.NewKey("/"),
Datastore: r.metricsLevelDB,
Datastore: metricsLevelDB,
},
})
// Make sure it's ok to claim the virtual datastore from mount as
@ -400,7 +396,7 @@ func (r *FSRepo) openDatastore() error {
// variants. This is the same dilemma as the `[].byte` attempt at
// introducing const types to Go.
var _ ds.ThreadSafeDatastore = blocksDS
var _ ds.ThreadSafeDatastore = r.leveldbDS
var _ ds.ThreadSafeDatastore = leveldbDS
r.ds = ds2.ClaimThreadSafe{mountDS}
return nil
}
@ -420,13 +416,7 @@ func (r *FSRepo) Close() error {
return errors.New("repo is closed")
}
if err := r.metricsBlocks.Close(); err != nil {
return err
}
if err := r.metricsLevelDB.Close(); err != nil {
return err
}
if err := r.leveldbDS.Close(); err != nil {
if err := r.ds.(io.Closer).Close(); err != nil {
return err
}

View File

@ -26,7 +26,7 @@ func (w *datastoreCloserWrapper) Close() error {
}
func (w *datastoreCloserWrapper) Batch() (datastore.Batch, error) {
bds, ok := w.ThreadSafeDatastore.(datastore.BatchingDatastore)
bds, ok := w.ThreadSafeDatastore.(datastore.Batching)
if !ok {
return nil, datastore.ErrBatchUnsupported
}

View File

@ -1,15 +1,22 @@
package datastore2
import (
"io"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
)
// ClaimThreadSafe claims that a Datastore is threadsafe, even when
// it's type does not guarantee this. Use carefully.
type ClaimThreadSafe struct {
datastore.BatchingDatastore
datastore.Batching
}
// Compile-time check that ClaimThreadSafe satisfies
// datastore.ThreadSafeDatastore.
var _ datastore.ThreadSafeDatastore = ClaimThreadSafe{}

// IsThreadSafe is the intentionally empty marker method backing the
// thread-safety claim; the wrapped datastore may not actually be safe.
func (ClaimThreadSafe) IsThreadSafe() {}
// TEMP UNTIL dev0.4.0 merges and solves this ugly interface stuff
// Close closes the embedded Batching datastore through io.Closer.
// NOTE(review): the type assertion is unconditional and will panic if
// the wrapped datastore is not an io.Closer — confirm all users wrap
// closable datastores.
func (c ClaimThreadSafe) Close() error {
	closer := c.Batching.(io.Closer)
	return closer.Close()
}