diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index d517ba39e..04ea1e110 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -111,6 +111,10 @@ "ImportPath": "github.com/inconshreveable/go-update", "Rev": "68f5725818189545231c1fd8694793d45f2fc529" }, + { + "ImportPath": "github.com/ipfs/go-log", + "Rev": "ee5cb9834b33bcf29689183e0323e328c8b8de29" + }, { "ImportPath": "github.com/jackpal/go-nat-pmp", "Rev": "a45aa3d54aef73b504e15eb71bea0e5565b5e6e1" @@ -129,7 +133,7 @@ }, { "ImportPath": "github.com/jbenet/go-datastore", - "Rev": "7d6acaf7c0164c335f2ca4100f8fe30a7e2943dd" + "Rev": "c835c30f206c1e97172e428f052e225adab9abde" }, { "ImportPath": "github.com/jbenet/go-detect-race", diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/.gxlastpubver b/Godeps/_workspace/src/github.com/ipfs/go-log/.gxlastpubver new file mode 100644 index 000000000..8775b73b7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/.gxlastpubver @@ -0,0 +1 @@ +QmXJkcEXB6C9h6Ytb6rrUTFU56Ro62zxgrbxTT3dgjQGA8 \ No newline at end of file diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/context.go b/Godeps/_workspace/src/github.com/ipfs/go-log/context.go new file mode 100644 index 000000000..0468047e4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/context.go @@ -0,0 +1,38 @@ +package log + +import ( + "errors" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +type key int + +const metadataKey key = 0 + +// ContextWithLoggable returns a derived context which contains the provided +// Loggable. Any Events logged with the derived context will include the +// provided Loggable. +func ContextWithLoggable(ctx context.Context, l Loggable) context.Context { + existing, err := MetadataFromContext(ctx) + if err != nil { + // context does not contain meta. just set the new metadata + child := context.WithValue(ctx, metadataKey, Metadata(l.Loggable())) + return child + } + + merged := DeepMerge(existing, l.Loggable()) + child := context.WithValue(ctx, metadataKey, merged) + return child +} + +func MetadataFromContext(ctx context.Context) (Metadata, error) { + value := ctx.Value(metadataKey) + if value != nil { + metadata, ok := value.(Metadata) + if ok { + return metadata, nil + } + } + return nil, errors.New("context contains no metadata") +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/context_test.go b/Godeps/_workspace/src/github.com/ipfs/go-log/context_test.go new file mode 100644 index 000000000..699a2edc2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/context_test.go @@ -0,0 +1,44 @@ +package log + +import ( + "testing" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +func TestContextContainsMetadata(t *testing.T) { + t.Parallel() + + m := Metadata{"foo": "bar"} + ctx := ContextWithLoggable(context.Background(), m) + got, err := MetadataFromContext(ctx) + if err != nil { + t.Fatal(err) + } + + _, exists := got["foo"] + if !exists { + t.Fail() + } +} + +func TestContextWithPreexistingMetadata(t *testing.T) { + t.Parallel() + + ctx := ContextWithLoggable(context.Background(), Metadata{"hello": "world"}) + ctx = ContextWithLoggable(ctx, Metadata{"goodbye": "earth"}) + + got, err := MetadataFromContext(ctx) + if err != nil { + t.Fatal(err) + } + + _, exists := got["hello"] + if !exists { + t.Fatal("original key not present") + } + _, exists = got["goodbye"] + if !exists { + t.Fatal("new key not present") + } +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/entry.go b/Godeps/_workspace/src/github.com/ipfs/go-log/entry.go new file mode 100644 index 000000000..e5086c408 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/entry.go @@ -0,0 +1,42 @@ +package log + +import ( + "time" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus" +) + +type entry struct { + loggables []Loggable + system string + event string +} + +// Log logs the event unconditionally (regardless of log level) +// TODO add support for leveled-logs once we decide which levels we want +// for our structured logs +func (e *entry) Log() { + e.log() +} + +// log is a private method invoked by the public Log, Info, Error methods +func (e *entry) log() { + // accumulate metadata + accum := Metadata{} + for _, loggable := range e.loggables { + accum = DeepMerge(accum, loggable.Loggable()) + } + + // apply final attributes to reserved keys + // TODO accum["level"] = level + accum["event"] = e.event + accum["system"] = e.system + accum["time"] = FormatRFC3339(time.Now()) + + // TODO roll our own event logger + logrus.WithFields(map[string]interface{}(accum)).Info(e.event) +} + +func FormatRFC3339(t time.Time) string { + return t.UTC().Format(time.RFC3339Nano) +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/example_test.go b/Godeps/_workspace/src/github.com/ipfs/go-log/example_test.go new file mode 100644 index 000000000..f28e90b96 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/example_test.go @@ -0,0 +1,16 @@ +package log + +import "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + +func ExampleEventLogger() { + { + log := EventLogger(nil) + e := log.EventBegin(context.Background(), "dial") + e.Done() + } + { + log := EventLogger(nil) + e := log.EventBegin(context.Background(), "dial") + _ = e.Close() // implements io.Closer for convenience + } +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/log.go b/Godeps/_workspace/src/github.com/ipfs/go-log/log.go new file mode 100644 index 000000000..563f1b7a9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/log.go @@ -0,0 +1,149 @@ +package log + +import ( + "fmt" + "time" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +// StandardLogger provides API compatibility with standard printf loggers +// eg. go-logging +type StandardLogger interface { + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Fatal(args ...interface{}) + Fatalf(format string, args ...interface{}) + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Panic(args ...interface{}) + Panicf(format string, args ...interface{}) + Warning(args ...interface{}) + Warningf(format string, args ...interface{}) +} + +// EventLogger extends the StandardLogger interface to allow for log items +// containing structured metadata +type EventLogger interface { + StandardLogger + + // Event merges structured data from the provided inputs into a single + // machine-readable log event. + // + // If the context contains metadata, a copy of this is used as the base + // metadata accumulator. + // + // If one or more loggable objects are provided, these are deep-merged into base blob. + // + // Next, the event name is added to the blob under the key "event". If + // the key "event" already exists, it will be over-written. + // + // Finally the timestamp and package name are added to the accumulator and + // the metadata is logged. + Event(ctx context.Context, event string, m ...Loggable) + + EventBegin(ctx context.Context, event string, m ...Loggable) *EventInProgress +} + +// Logger retrieves an event logger by name +func Logger(system string) EventLogger { + + // TODO if we would like to adjust log levels at run-time. Store this event + // logger in a map (just like the util.Logger impl) + if len(system) == 0 { + log.Warnf("Missing name parameter") + system = "undefined" + } + if _, ok := loggers[system]; !ok { + loggers[system] = log.WithField("module", system) + } + logger := loggers[system] + + return &eventLogger{system: system, StandardLogger: logger} +} + +// eventLogger implements the EventLogger and wraps a go-logging Logger +type eventLogger struct { + StandardLogger + + system string + // TODO add log-level +} + +func (el *eventLogger) EventBegin(ctx context.Context, event string, metadata ...Loggable) *EventInProgress { + start := time.Now() + el.Event(ctx, fmt.Sprintf("%sBegin", event), metadata...) + + eip := &EventInProgress{} + eip.doneFunc = func(additional []Loggable) { + + metadata = append(metadata, additional...) // anything added during the operation + metadata = append(metadata, LoggableMap(map[string]interface{}{ // finally, duration of event + "duration": time.Now().Sub(start), + })) + + el.Event(ctx, event, metadata...) + } + return eip +} + +func (el *eventLogger) Event(ctx context.Context, event string, metadata ...Loggable) { + + // short circuit if theres nothing to write to + if !WriterGroup.Active() { + return + } + + // Collect loggables for later logging + var loggables []Loggable + + // get any existing metadata from the context + existing, err := MetadataFromContext(ctx) + if err != nil { + existing = Metadata{} + } + loggables = append(loggables, existing) + + for _, datum := range metadata { + loggables = append(loggables, datum) + } + + e := entry{ + loggables: loggables, + system: el.system, + event: event, + } + + e.Log() // TODO replace this when leveled-logs have been implemented +} + +type EventInProgress struct { + loggables []Loggable + doneFunc func([]Loggable) +} + +// Append adds loggables to be included in the call to Done +func (eip *EventInProgress) Append(l Loggable) { + eip.loggables = append(eip.loggables, l) +} + +// SetError includes the provided error +func (eip *EventInProgress) SetError(err error) { + eip.loggables = append(eip.loggables, LoggableMap{ + "error": err.Error(), + }) +} + +// Done creates a new Event entry that includes the duration and appended +// loggables. +func (eip *EventInProgress) Done() { + eip.doneFunc(eip.loggables) // create final event with extra data +} + +// Close is an alias for done +func (eip *EventInProgress) Close() error { + eip.Done() + return nil +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/loggable.go b/Godeps/_workspace/src/github.com/ipfs/go-log/loggable.go new file mode 100644 index 000000000..d770ebaf0 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/loggable.go @@ -0,0 +1,34 @@ +package log + +// Loggable describes objects that can be marshalled into Metadata for logging +type Loggable interface { + Loggable() map[string]interface{} +} + +type LoggableMap map[string]interface{} + +func (l LoggableMap) Loggable() map[string]interface{} { + return l +} + +// LoggableF converts a func into a Loggable +type LoggableF func() map[string]interface{} + +func (l LoggableF) Loggable() map[string]interface{} { + return l() +} + +func Deferred(key string, f func() string) Loggable { + function := func() map[string]interface{} { + return map[string]interface{}{ + key: f(), + } + } + return LoggableF(function) +} + +func Pair(key string, l Loggable) Loggable { + return LoggableMap{ + key: l, + } +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/metadata.go b/Godeps/_workspace/src/github.com/ipfs/go-log/metadata.go new file mode 100644 index 000000000..26164f7ef --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/metadata.go @@ -0,0 +1,82 @@ +package log + +import ( + "encoding/json" + "errors" + "reflect" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/satori/go.uuid" +) + +// Metadata is a convenience type for generic maps +type Metadata map[string]interface{} + +// Uuid returns a Metadata with the string key and UUID value +func Uuid(key string) Metadata { + return Metadata{ + key: uuid.NewV4().String(), + } +} + +// DeepMerge merges the second Metadata parameter into the first. +// Nested Metadata are merged recursively. Primitives are over-written. +func DeepMerge(b, a Metadata) Metadata { + out := Metadata{} + for k, v := range b { + out[k] = v + } + for k, v := range a { + + maybe, err := Metadatify(v) + if err != nil { + // if the new value is not meta. just overwrite the dest vaue + out[k] = v + continue + } + + // it is meta. What about dest? + outv, exists := out[k] + if !exists { + // the new value is meta, but there's no dest value. just write it + out[k] = v + continue + } + + outMetadataValue, err := Metadatify(outv) + if err != nil { + // the new value is meta and there's a dest value, but the dest + // value isn't meta. just overwrite + out[k] = v + continue + } + + // both are meta. merge them. + out[k] = DeepMerge(outMetadataValue, maybe) + } + return out +} + +// Loggable implements the Loggable interface +func (m Metadata) Loggable() map[string]interface{} { + // NB: method defined on value to avoid de-referencing nil Metadata + return m +} + +func (m Metadata) JsonString() (string, error) { + // NB: method defined on value + b, err := json.Marshal(m) + return string(b), err +} + +// Metadatify converts maps into Metadata +func Metadatify(i interface{}) (Metadata, error) { + value := reflect.ValueOf(i) + if value.Kind() == reflect.Map { + m := map[string]interface{}{} + for _, k := range value.MapKeys() { + m[k.String()] = value.MapIndex(k).Interface() + } + return Metadata(m), nil + } + return nil, errors.New("is not a map") +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/metadata_test.go b/Godeps/_workspace/src/github.com/ipfs/go-log/metadata_test.go new file mode 100644 index 000000000..c1814511c --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/metadata_test.go @@ -0,0 +1,50 @@ +package log + +import "testing" + +func TestOverwrite(t *testing.T) { + t.Parallel() + + under := Metadata{ + "a": Metadata{ + "b": Metadata{ + "c": Metadata{ + "d": "the original value", + "other": "SURVIVE", + }, + }, + }, + } + over := Metadata{ + "a": Metadata{ + "b": Metadata{ + "c": Metadata{ + "d": "a new value", + }, + }, + }, + } + + out := DeepMerge(under, over) + + dval := out["a"].(Metadata)["b"].(Metadata)["c"].(Metadata)["d"].(string) + if dval != "a new value" { + t.Fatal(dval) + } + surv := out["a"].(Metadata)["b"].(Metadata)["c"].(Metadata)["other"].(string) + if surv != "SURVIVE" { + t.Fatal(surv) + } +} + +func TestMarshalJSON(t *testing.T) { + t.Parallel() + bs, _ := Metadata{"a": "b"}.JsonString() + t.Log(bs) +} + +func TestMetadataIsLoggable(t *testing.T) { + t.Parallel() + func(l Loggable) { + }(Metadata{}) +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/oldlog.go b/Godeps/_workspace/src/github.com/ipfs/go-log/oldlog.go new file mode 100644 index 000000000..4a8c3eb86 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/oldlog.go @@ -0,0 +1,97 @@ +package log + +import ( + "errors" + "os" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus" +) + +func init() { + SetupLogging() +} + +var log = logrus.New() + +// LogFormats is a map of formats used for our logger, keyed by name. +// TODO: write custom TextFormatter (don't print module=name explicitly) and +// fork logrus to add shortfile +var LogFormats = map[string]*logrus.TextFormatter{ + "nocolor": {DisableColors: true, FullTimestamp: true, TimestampFormat: "2006-01-02 15:04:05.000000", DisableSorting: true}, + "color": {DisableColors: false, FullTimestamp: true, TimestampFormat: "15:04:05:000", DisableSorting: true}, +} +var defaultLogFormat = "color" + +// Logging environment variables +const ( + envLogging = "IPFS_LOGGING" + envLoggingFmt = "IPFS_LOGGING_FMT" +) + +// ErrNoSuchLogger is returned when the util pkg is asked for a non existant logger +var ErrNoSuchLogger = errors.New("Error: No such logger") + +// loggers is the set of loggers in the system +var loggers = map[string]*logrus.Entry{} + +// SetupLogging will initialize the logger backend and set the flags. +func SetupLogging() { + + format, ok := LogFormats[os.Getenv(envLoggingFmt)] + if !ok { + format = LogFormats[defaultLogFormat] + } + + log.Out = os.Stderr + log.Formatter = format + + lvl := logrus.ErrorLevel + + if logenv := os.Getenv(envLogging); logenv != "" { + var err error + lvl, err = logrus.ParseLevel(logenv) + if err != nil { + log.Debugf("logrus.ParseLevel() Error: %q", err) + lvl = logrus.ErrorLevel // reset to ERROR, could be undefined now(?) + } + } + + SetAllLoggers(lvl) +} + +// SetDebugLogging calls SetAllLoggers with logrus.DebugLevel +func SetDebugLogging() { + SetAllLoggers(logrus.DebugLevel) +} + +// SetAllLoggers changes the logrus.Level of all loggers to lvl +func SetAllLoggers(lvl logrus.Level) { + log.Level = lvl + for _, logger := range loggers { + logger.Level = lvl + } +} + +// SetLogLevel changes the log level of a specific subsystem +// name=="*" changes all subsystems +func SetLogLevel(name, level string) error { + lvl, err := logrus.ParseLevel(level) + if err != nil { + return err + } + + // wildcard, change all + if name == "*" { + SetAllLoggers(lvl) + return nil + } + + // Check if we have a logger by that name + if _, ok := loggers[name]; !ok { + return ErrNoSuchLogger + } + + loggers[name].Level = lvl + + return nil +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/option.go b/Godeps/_workspace/src/github.com/ipfs/go-log/option.go new file mode 100644 index 000000000..17750fd84 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/option.go @@ -0,0 +1,61 @@ +package log + +import ( + "io" + "os" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus" +) + +// init sets up sane defaults +func init() { + Configure(TextFormatter) + Configure(Output(os.Stderr)) + // has the effect of disabling logging since we log event entries at Info + // level by convention + Configure(LevelError) +} + +// Global writer group for logs to output to +var WriterGroup = new(MirrorWriter) + +type Option func() + +// Configure applies the provided options sequentially from left to right +func Configure(options ...Option) { + for _, f := range options { + f() + } +} + +// LdJSONFormatter Option formats the event log as line-delimited JSON +var LdJSONFormatter = func() { + logrus.SetFormatter(&PoliteJSONFormatter{}) +} + +// TextFormatter Option formats the event log as human-readable plain-text +var TextFormatter = func() { + logrus.SetFormatter(&logrus.TextFormatter{}) +} + +func Output(w io.Writer) Option { + return func() { + logrus.SetOutput(w) + // TODO return previous Output option + } +} + +// LevelDebug Option sets the log level to debug +var LevelDebug = func() { + logrus.SetLevel(logrus.DebugLevel) +} + +// LevelDebug Option sets the log level to error +var LevelError = func() { + logrus.SetLevel(logrus.ErrorLevel) +} + +// LevelDebug Option sets the log level to info +var LevelInfo = func() { + logrus.SetLevel(logrus.InfoLevel) +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/package.json b/Godeps/_workspace/src/github.com/ipfs/go-log/package.json new file mode 100644 index 000000000..8752f8d67 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/package.json @@ -0,0 +1,5 @@ +{ + "name": "go-log", + "version": "1.0.0", + "language": "go" +} \ No newline at end of file diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/polite_json_formatter.go b/Godeps/_workspace/src/github.com/ipfs/go-log/polite_json_formatter.go new file mode 100644 index 000000000..be6638cfb --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/polite_json_formatter.go @@ -0,0 +1,20 @@ +package log + +import ( + "encoding/json" + "fmt" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/Sirupsen/logrus" +) + +// PoliteJSONFormatter marshals entries into JSON encoded slices (without +// overwriting user-provided keys). How polite of it! +type PoliteJSONFormatter struct{} + +func (f *PoliteJSONFormatter) Format(entry *logrus.Entry) ([]byte, error) { + serialized, err := json.Marshal(entry.Data) + if err != nil { + return nil, fmt.Errorf("Failed to marshal fields to JSON, %v", err) + } + return append(serialized, '\n'), nil +} diff --git a/Godeps/_workspace/src/github.com/ipfs/go-log/writer.go b/Godeps/_workspace/src/github.com/ipfs/go-log/writer.go new file mode 100644 index 000000000..cfa48c9b3 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ipfs/go-log/writer.go @@ -0,0 +1,53 @@ +package log + +import ( + "io" + "sync" +) + +type MirrorWriter struct { + writers []io.Writer + lk sync.Mutex +} + +func (mw *MirrorWriter) Write(b []byte) (int, error) { + mw.lk.Lock() + // write to all writers, and nil out the broken ones. + for i, w := range mw.writers { + _, err := w.Write(b) + if err != nil { + mw.writers[i] = nil + } + } + + // consolidate the slice + for i := 0; i < len(mw.writers); i++ { + if mw.writers[i] != nil { + continue + } + + j := len(mw.writers) - 1 + for ; j > i; j-- { + if mw.writers[j] != nil { + mw.writers[i], mw.writers[j] = mw.writers[j], nil // swap + break + } + } + mw.writers = mw.writers[:j] + } + mw.lk.Unlock() + return len(b), nil +} + +func (mw *MirrorWriter) AddWriter(w io.Writer) { + mw.lk.Lock() + mw.writers = append(mw.writers, w) + mw.lk.Unlock() +} + +func (mw *MirrorWriter) Active() (active bool) { + mw.lk.Lock() + active = len(mw.writers) > 0 + mw.lk.Unlock() + return +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json index 966434229..f5b260883 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/Godeps/Godeps.json @@ -1,10 +1,15 @@ { "ImportPath": "github.com/jbenet/go-datastore", - "GoVersion": "go1.4.2", + "GoVersion": "go1.5", "Packages": [ "./..." ], "Deps": [ + { + "ImportPath": "github.com/Sirupsen/logrus", + "Comment": "v0.8.3-37-g418b41d", + "Rev": "418b41d23a1bf978c06faea5313ba194650ac088" + }, { "ImportPath": "github.com/codahale/blake2", "Rev": "3fa823583afba430e8fc7cdbcc670dbf90bfacc4" @@ -21,10 +26,19 @@ "ImportPath": "github.com/dustin/randbo", "Rev": "7f1b564ca7242d22bcc6e2128beb90d9fa38b9f0" }, + { + "ImportPath": "github.com/fzzy/radix/redis", + "Comment": "v0.5.1", + "Rev": "27a863cdffdb0998d13e1e11992b18489aeeaa25" + }, { "ImportPath": "github.com/hashicorp/golang-lru", "Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370" }, + { + "ImportPath": "github.com/ipfs/go-log", + "Rev": "ee5cb9834b33bcf29689183e0323e328c8b8de29" + }, { "ImportPath": "github.com/jbenet/go-os-rename", "Rev": "2d93ae970ba96c41f717036a5bf5494faf1f38c0" @@ -53,6 +67,10 @@ "ImportPath": "github.com/syndtr/gosnappy/snappy", "Rev": "ce8acff4829e0c2458a67ead32390ac0a381c862" }, + { + "ImportPath": "golang.org/x/net/context", + "Rev": "dfcbca9c45aeabb8971affa4f76b2d40f6f72328" + }, { "ImportPath": "gopkg.in/check.v1", "Rev": "91ae5f88a67b14891cfd43895b01164f6c120420" diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go index ce98b752a..034daa1b9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/basic_ds.go @@ -1,6 +1,7 @@ package datastore import ( + "io" "log" dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" @@ -67,6 +68,10 @@ func (d *MapDatastore) Batch() (Batch, error) { return NewBasicBatch(d), nil } +func (d *MapDatastore) Close() error { + return nil +} + // NullDatastore stores nothing, but conforms to the API. // Useful to test with. type NullDatastore struct { @@ -106,6 +111,10 @@ func (d *NullDatastore) Batch() (Batch, error) { return NewBasicBatch(d), nil } +func (d *NullDatastore) Close() error { + return nil +} + // LogDatastore logs all accesses through the datastore. type LogDatastore struct { Name string @@ -165,9 +174,16 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) { func (d *LogDatastore) Batch() (Batch, error) { log.Printf("%s: Batch\n", d.Name) - bds, ok := d.child.(BatchingDatastore) - if !ok { - return nil, ErrBatchUnsupported + if bds, ok := d.child.(Batching); ok { + return bds.Batch() } - return bds.Batch() + return nil, ErrBatchUnsupported +} + +func (d *LogDatastore) Close() error { + log.Printf("%s: Close\n", d.Name) + if cds, ok := d.child.(io.Closer); ok { + return cds.Close() + } + return nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/coalesce/coalesce.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/coalesce/coalesce.go index 90c354683..e85a4b491 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/coalesce/coalesce.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/coalesce/coalesce.go @@ -1,6 +1,7 @@ package coalesce import ( + "io" "sync" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -124,3 +125,16 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { // query not coalesced yet. return d.child.Query(q) } + +func (d *datastore) Close() error { + d.reqmu.Lock() + defer d.reqmu.Unlock() + + for _, s := range d.req { + <-s.done + } + if c, ok := d.child.(io.Closer); ok { + return c.Close() + } + return nil +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go index 91deebf29..63df85e6c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/datastore.go @@ -69,7 +69,7 @@ type Datastore interface { Query(q query.Query) (query.Results, error) } -type BatchingDatastore interface { +type Batching interface { Datastore Batch() (Batch, error) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs/flatfs.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs/flatfs.go index 0ec33caf3..3dcee639a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs/flatfs.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/flatfs/flatfs.go @@ -10,12 +10,16 @@ import ( "os" "path" "strings" + "time" + logging "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-log" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-os-rename" ) +var log = logging.Logger("flatfs") + const ( extension = ".data" maxPrefixLen = 16 @@ -93,12 +97,32 @@ func (fs *Datastore) makePrefixDirNoSync(dir string) error { return nil } +var putMaxRetries = 3 + func (fs *Datastore) Put(key datastore.Key, value interface{}) error { val, ok := value.([]byte) if !ok { return datastore.ErrInvalidType } + var err error + for i := 0; i < putMaxRetries; i++ { + err = fs.doPut(key, val) + if err == nil { + return nil + } + + if !strings.Contains(err.Error(), "too many open files") { + return err + } + + log.Error("too many open files, retrying in %dms", 100*i) + time.Sleep(time.Millisecond * 100 * time.Duration(i)) + } + return err +} + +func (fs *Datastore) doPut(key datastore.Key, val []byte) error { dir, path := fs.encode(key) if err := fs.makePrefixDir(dir); err != nil { return err @@ -323,6 +347,10 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E return res, nil } +func (fs *Datastore) Close() error { + return nil +} + type flatfsBatch struct { puts map[datastore.Key]interface{} deletes map[datastore.Key]struct{} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go index f56896936..5271257bb 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/fs/fs.go @@ -149,3 +149,11 @@ func isFile(path string) bool { return !finfo.IsDir() } + +func (d *Datastore) Close() error { + return nil +} + +func (d *Datastore) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(d), nil +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/interface.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/interface.go index 7e653a1d4..e1576adaa 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/interface.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/interface.go @@ -16,14 +16,12 @@ type KeyTransform interface { type Datastore interface { ds.Shim KeyTransform - - Batch() (ds.Batch, error) } // Wrap wraps a given datastore with a KeyTransform function. // The resulting wrapped datastore will use the transform on all Datastore // operations. -func Wrap(child ds.Datastore, t KeyTransform) Datastore { +func Wrap(child ds.Datastore, t KeyTransform) *ktds { if t == nil { panic("t (KeyTransform) is nil") } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go index e238386dd..e7eba924c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform/keytransform.go @@ -1,6 +1,8 @@ package keytransform import ( + "io" + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) @@ -74,8 +76,15 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { return dsq.DerivedResults(qr, ch), nil } +func (d *ktds) Close() error { + if c, ok := d.child.(io.Closer); ok { + return c.Close() + } + return nil +} + func (d *ktds) Batch() (ds.Batch, error) { - bds, ok := d.child.(ds.BatchingDatastore) + bds, ok := d.child.(ds.Batching) if !ok { return nil, ds.ErrBatchUnsupported } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go index 215459dd3..07d0a296d 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/datastore.go @@ -1,8 +1,6 @@ package leveldb import ( - "io" - ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" dsq "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" @@ -11,18 +9,13 @@ import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util" ) -type Datastore interface { - ds.ThreadSafeDatastore - io.Closer -} - type datastore struct { DB *leveldb.DB } type Options opt.Options -func NewDatastore(path string, opts *Options) (Datastore, error) { +func NewDatastore(path string, opts *Options) (*datastore, error) { var nopts opt.Options if opts != nil { nopts = opt.Options(*opts) @@ -148,6 +141,11 @@ func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { } } +func (d *datastore) Batch() (ds.Batch, error) { + // TODO: implement batch on leveldb + return nil, ds.ErrBatchUnsupported +} + // LevelDB needs to be closed. func (d *datastore) Close() (err error) { return d.DB.Close() diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go index e7e803e34..131fdeb3e 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb/ds_test.go @@ -25,7 +25,7 @@ var testcases = map[string]string{ // // d, close := newDS(t) // defer close() -func newDS(t *testing.T) (Datastore, func()) { +func newDS(t *testing.T) (*datastore, func()) { path, err := ioutil.TempDir("/tmp", "testing_leveldb_") if err != nil { t.Fatal(err) @@ -41,7 +41,7 @@ func newDS(t *testing.T) (Datastore, func()) { } } -func addTestCases(t *testing.T, d Datastore, testcases map[string]string) { +func addTestCases(t *testing.T, d *datastore, testcases map[string]string) { for k, v := range testcases { dsk := ds.NewKey(k) if err := d.Put(dsk, []byte(v)); err != nil { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go index 29d4e7d0b..16c521734 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go @@ -54,3 +54,11 @@ func (d *Datastore) Delete(key ds.Key) (err error) { func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { return nil, errors.New("KeyList not implemented.") } + +func (d *Datastore) Close() error { + return nil +} + +func (d *Datastore) Batch() (ds.Batch, error) { + return nil, ds.ErrBatchUnsupported +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure/measure.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure/measure.go index a71bb7c95..7cd568a33 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure/measure.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/measure/measure.go @@ -3,6 +3,7 @@ package measure import ( + "io" "time" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/codahale/metrics" @@ -18,17 +19,12 @@ const ( maxSize = int64(1 << 32) ) -type DatastoreCloser interface { - datastore.Datastore - Close() error -} - // New wraps the datastore, providing metrics on the operations. The // metrics are registered with names starting with prefix and a dot. // // If prefix is not unique, New will panic. Call Close to release the // prefix. -func New(prefix string, ds datastore.Datastore) DatastoreCloser { +func New(prefix string, ds datastore.Datastore) *measure { m := &measure{ backend: ds, @@ -84,7 +80,6 @@ type measure struct { } var _ datastore.Datastore = (*measure)(nil) -var _ DatastoreCloser = (*measure)(nil) func recordLatency(h *metrics.Histogram, start time.Time) { elapsed := time.Now().Sub(start) / time.Microsecond @@ -159,7 +154,7 @@ type measuredBatch struct { } func (m *measure) Batch() (datastore.Batch, error) { - bds, ok := m.backend.(datastore.BatchingDatastore) + bds, ok := m.backend.(datastore.Batching) if !ok { return nil, datastore.ErrBatchUnsupported } @@ -245,5 +240,9 @@ func (m *measure) Close() error { m.queryNum.Remove() m.queryErr.Remove() m.queryLatency.Remove() + + if c, ok := m.backend.(io.Closer); ok { + return c.Close() + } return nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount/mount.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount/mount.go index d8f4528b7..831ab9ccd 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount/mount.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount/mount.go @@ -4,6 +4,7 @@ package mount import ( "errors" + "io" "strings" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -115,6 +116,18 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { return r, nil } +func (d *Datastore) Close() error { + for _, d := range d.mounts { + if c, ok := d.Datastore.(io.Closer); ok { + err := c.Close() + if err != nil { + return err + } + } + } + return nil +} + type mountBatch struct { mounts map[string]datastore.Batch @@ -132,7 +145,7 @@ func (mt *mountBatch) lookupBatch(key datastore.Key) (datastore.Batch, datastore child, loc, rest := mt.d.lookup(key) t, ok := mt.mounts[loc.String()] if !ok { - bds, ok := child.(datastore.BatchingDatastore) + bds, ok := child.(datastore.Batching) if !ok { return nil, datastore.NewKey(""), datastore.ErrBatchUnsupported } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go index 820e815ef..ef8c96e1a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace/namespace.go @@ -36,7 +36,7 @@ func PrefixTransform(prefix ds.Key) ktds.KeyTransform { } // Wrap wraps a given datastore with a key-prefix. -func Wrap(child ds.Datastore, prefix ds.Key) ktds.Datastore { +func Wrap(child ds.Datastore, prefix ds.Key) *datastore { if child == nil { panic("child (ds.Datastore) is nil") } @@ -81,3 +81,11 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { return dsq.DerivedResults(qr, ch), nil } + +func (d *datastore) Batch() (ds.Batch, error) { + if bds, ok := d.Datastore.(ds.Batching); ok { + return bds.Batch() + } + + return nil, ds.ErrBatchUnsupported +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go index 3554644ee..1d1aa70c7 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/panic/panic.go @@ -2,6 +2,7 @@ package sync import ( "fmt" + "io" "os" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -67,6 +68,26 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { return r, nil } +func (d *datastore) Close() error { + if c, ok := d.child.(io.Closer); ok { + err := c.Close() + if err != nil { + fmt.Fprintf(os.Stdout, "panic datastore: %s", err) + panic("panic datastore: Close failed") + } + } + return nil +} + +func (d *datastore) Batch() (ds.Batch, error) { + b, err := d.child.(ds.Batching).Batch() + if err != nil { + return nil, err + } + + return &panicBatch{b}, nil +} + type panicBatch struct { t ds.Batch } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis.go index 3ca4f0a27..24799635a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis.go @@ -7,7 +7,6 @@ import ( "time" "github.com/fzzy/radix/redis" - datastore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" query "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" ) @@ -17,14 +16,14 @@ var _ datastore.ThreadSafeDatastore = &Datastore{} var ErrInvalidType = errors.New("redis datastore: invalid type error. this datastore only supports []byte values") -func NewExpiringDatastore(client *redis.Client, ttl time.Duration) (datastore.ThreadSafeDatastore, error) { +func NewExpiringDatastore(client *redis.Client, ttl time.Duration) (*Datastore, error) { return &Datastore{ client: client, ttl: ttl, }, nil } -func NewDatastore(client *redis.Client) (datastore.ThreadSafeDatastore, error) { +func NewDatastore(client *redis.Client) (*Datastore, error) { return &Datastore{ client: client, }, nil @@ -83,3 +82,11 @@ func (ds *Datastore) Query(q query.Query) (query.Results, error) { } func (ds *Datastore) IsThreadSafe() {} + +func (ds *Datastore) Batch() (datastore.Batch, error) { + return nil, datastore.ErrBatchUnsupported +} + +func (ds *Datastore) Close() error { + return ds.client.Close() +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis_test.go index 52e5f4a6e..05036b249 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/redis/redis_test.go @@ -8,6 +8,7 @@ import ( "github.com/fzzy/radix/redis" datastore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dstest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/test" ) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go index 01b9f5aa8..55820bf70 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync/sync.go @@ -1,6 +1,7 @@ package sync import ( + "io" "sync" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" @@ -17,7 +18,7 @@ type MutexDatastore struct { // MutexWrap constructs a datastore with a coarse lock around // the entire datastore, for every single operation -func MutexWrap(d ds.Datastore) ds.ThreadSafeDatastore { +func MutexWrap(d ds.Datastore) *MutexDatastore { return &MutexDatastore{child: d} } @@ -67,7 +68,7 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { func (d *MutexDatastore) Batch() (ds.Batch, error) { d.RLock() defer d.RUnlock() - bds, ok := d.child.(ds.BatchingDatastore) + bds, ok := d.child.(ds.Batching) if !ok { return nil, ds.ErrBatchUnsupported } @@ -81,6 +82,15 @@ func (d *MutexDatastore) Batch() (ds.Batch, error) { }, nil } +func (d *MutexDatastore) Close() error { + d.RWMutex.Lock() + defer d.RWMutex.Unlock() + if c, ok := d.child.(io.Closer); ok { + return c.Close() + } + return nil +} + type syncBatch struct { lk sync.Mutex batch ds.Batch diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/syncmount/mount.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/syncmount/mount.go new file mode 100644 index 000000000..adc882627 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/syncmount/mount.go @@ -0,0 +1,198 @@ +// Package mount provides a Datastore that has other Datastores +// mounted at various key prefixes and is threadsafe +package syncmount + +import ( + "errors" + "io" + "strings" + "sync" + + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/keytransform" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" +) + +var ( + ErrNoMount = errors.New("no datastore mounted for this key") +) + +type Mount struct { + Prefix ds.Key + Datastore ds.Datastore +} + +func New(mounts []Mount) *Datastore { + // make a copy so we're sure it doesn't mutate + m := make([]Mount, len(mounts)) + for i, v := range mounts { + m[i] = v + } + return &Datastore{mounts: m} +} + +type Datastore struct { + mounts []Mount + lk sync.Mutex +} + +var _ ds.Datastore = (*Datastore)(nil) + +func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) { + d.lk.Lock() + defer d.lk.Unlock() + for _, m := range d.mounts { + if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) { + s := strings.TrimPrefix(key.String(), m.Prefix.String()) + k := ds.NewKey(s) + return m.Datastore, m.Prefix, k + } + } + return nil, ds.NewKey("/"), key +} + +func (d *Datastore) Put(key ds.Key, value interface{}) error { + cds, _, k := d.lookup(key) + if cds == nil { + return ErrNoMount + } + return cds.Put(k, value) +} + +func (d *Datastore) Get(key ds.Key) (value interface{}, err error) { + cds, _, k := d.lookup(key) + if cds == nil { + return nil, ds.ErrNotFound + } + return cds.Get(k) +} + +func (d *Datastore) Has(key ds.Key) (exists bool, err error) { + cds, _, k := d.lookup(key) + if cds == nil { + return false, nil + } + return cds.Has(k) +} + +func (d *Datastore) Delete(key ds.Key) error { + cds, _, k := d.lookup(key) + if cds == nil { + return ds.ErrNotFound + } + return cds.Delete(k) +} + +func (d *Datastore) Query(q query.Query) (query.Results, error) { + if len(q.Filters) > 0 || + len(q.Orders) > 0 || + q.Limit > 0 || + q.Offset > 0 { + // TODO this is overly simplistic, but the only caller is + // `ipfs refs local` for now, and this gets us moving. + return nil, errors.New("mount only supports listing all prefixed keys in random order") + } + key := ds.NewKey(q.Prefix) + cds, mount, k := d.lookup(key) + if cds == nil { + return nil, errors.New("mount only supports listing a mount point") + } + // TODO support listing cross mount points too + + // delegate the query to the mounted datastore, while adjusting + // keys in and out + q2 := q + q2.Prefix = k.String() + wrapDS := keytransform.Wrap(cds, &keytransform.Pair{ + Convert: func(ds.Key) ds.Key { + panic("this should never be called") + }, + Invert: func(k ds.Key) ds.Key { + return mount.Child(k) + }, + }) + + r, err := wrapDS.Query(q2) + if err != nil { + return nil, err + } + r = query.ResultsReplaceQuery(r, q) + return r, nil +} + +func (d *Datastore) IsThreadSafe() {} + +func (d *Datastore) Close() error { + for _, d := range d.mounts { + if c, ok := d.Datastore.(io.Closer); ok { + err := c.Close() + if err != nil { + return err + } + } + } + return nil +} + +type mountBatch struct { + mounts map[string]ds.Batch + lk sync.Mutex + + d *Datastore +} + +func (d *Datastore) Batch() (ds.Batch, error) { + return &mountBatch{ + mounts: make(map[string]ds.Batch), + d: d, + }, nil +} + +func (mt *mountBatch) lookupBatch(key ds.Key) (ds.Batch, ds.Key, error) { + mt.lk.Lock() + defer mt.lk.Unlock() + + child, loc, rest := mt.d.lookup(key) + t, ok := mt.mounts[loc.String()] + if !ok { + bds, ok := child.(ds.Batching) + if !ok { + return nil, ds.NewKey(""), ds.ErrBatchUnsupported + } + var err error + t, err = bds.Batch() + if err != nil { + return nil, ds.NewKey(""), err + } + mt.mounts[loc.String()] = t + } + return t, rest, nil +} + +func (mt *mountBatch) Put(key ds.Key, val interface{}) error { + t, rest, err := mt.lookupBatch(key) + if err != nil { + return err + } + + return t.Put(rest, val) +} + +func (mt *mountBatch) Delete(key ds.Key) error { + t, rest, err := mt.lookupBatch(key) + if err != nil { + return err + } + + return t.Delete(rest) +} + +func (mt *mountBatch) Commit() error { + for _, t := range mt.mounts { + err := t.Commit() + if err != nil { + return err + } + } + return nil +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/syncmount/mount_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/syncmount/mount_test.go new file mode 100644 index 000000000..40feb2d03 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/syncmount/mount_test.go @@ -0,0 +1,241 @@ +package syncmount_test + +import ( + "testing" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/mount" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/query" +) + +func TestPutBadNothing(t *testing.T) { + m := mount.New(nil) + + err := m.Put(datastore.NewKey("quux"), []byte("foobar")) + if g, e := err, mount.ErrNoMount; g != e { + t.Fatalf("Put got wrong error: %v != %v", g, e) + } +} + +func TestPutBadNoMount(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/redherring"), Datastore: mapds}, + }) + + err := m.Put(datastore.NewKey("/quux/thud"), []byte("foobar")) + if g, e := err, mount.ErrNoMount; g != e { + t.Fatalf("expected ErrNoMount, got: %v\n", g) + } +} + +func TestPut(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + if err := m.Put(datastore.NewKey("/quux/thud"), []byte("foobar")); err != nil { + t.Fatalf("Put error: %v", err) + } + + val, err := mapds.Get(datastore.NewKey("/thud")) + if err != nil { + t.Fatalf("Get error: %v", err) + } + buf, ok := val.([]byte) + if !ok { + t.Fatalf("Get value is not []byte: %T %v", val, val) + } + if g, e := string(buf), "foobar"; g != e { + t.Errorf("wrong value: %q != %q", g, e) + } +} + +func TestGetBadNothing(t *testing.T) { + m := mount.New([]mount.Mount{}) + + _, err := m.Get(datastore.NewKey("/quux/thud")) + if g, e := err, datastore.ErrNotFound; g != e { + t.Fatalf("expected ErrNotFound, got: %v\n", g) + } +} + +func TestGetBadNoMount(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/redherring"), Datastore: mapds}, + }) + + _, err := m.Get(datastore.NewKey("/quux/thud")) + if g, e := err, datastore.ErrNotFound; g != e { + t.Fatalf("expected ErrNotFound, got: %v\n", g) + } +} + +func TestGetNotFound(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + _, err := m.Get(datastore.NewKey("/quux/thud")) + if g, e := err, datastore.ErrNotFound; g != e { + t.Fatalf("expected ErrNotFound, got: %v\n", g) + } +} + +func TestGet(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil { + t.Fatalf("Get error: %v", err) + } + + val, err := m.Get(datastore.NewKey("/quux/thud")) + if err != nil { + t.Fatalf("Put error: %v", err) + } + + buf, ok := val.([]byte) + if !ok { + t.Fatalf("Get value is not []byte: %T %v", val, val) + } + if g, e := string(buf), "foobar"; g != e { + t.Errorf("wrong value: %q != %q", g, e) + } +} + +func TestHasBadNothing(t *testing.T) { + m := mount.New([]mount.Mount{}) + + found, err := m.Has(datastore.NewKey("/quux/thud")) + if err != nil { + t.Fatalf("Has error: %v", err) + } + if g, e := found, false; g != e { + t.Fatalf("wrong value: %v != %v", g, e) + } +} + +func TestHasBadNoMount(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/redherring"), Datastore: mapds}, + }) + + found, err := m.Has(datastore.NewKey("/quux/thud")) + if err != nil { + t.Fatalf("Has error: %v", err) + } + if g, e := found, false; g != e { + t.Fatalf("wrong value: %v != %v", g, e) + } +} + +func TestHasNotFound(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + found, err := m.Has(datastore.NewKey("/quux/thud")) + if err != nil { + t.Fatalf("Has error: %v", err) + } + if g, e := found, false; g != e { + t.Fatalf("wrong value: %v != %v", g, e) + } +} + +func TestHas(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil { + t.Fatalf("Put error: %v", err) + } + + found, err := m.Has(datastore.NewKey("/quux/thud")) + if err != nil { + t.Fatalf("Has error: %v", err) + } + if g, e := found, true; g != e { + t.Fatalf("wrong value: %v != %v", g, e) + } +} + +func TestDeleteNotFound(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + err := m.Delete(datastore.NewKey("/quux/thud")) + if g, e := err, datastore.ErrNotFound; g != e { + t.Fatalf("expected ErrNotFound, got: %v\n", g) + } +} + +func TestDelete(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil { + t.Fatalf("Put error: %v", err) + } + + err := m.Delete(datastore.NewKey("/quux/thud")) + if err != nil { + t.Fatalf("Delete error: %v", err) + } + + // make sure it disappeared + found, err := mapds.Has(datastore.NewKey("/thud")) + if err != nil { + t.Fatalf("Has error: %v", err) + } + if g, e := found, false; g != e { + t.Fatalf("wrong value: %v != %v", g, e) + } +} + +func TestQuerySimple(t *testing.T) { + mapds := datastore.NewMapDatastore() + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/quux"), Datastore: mapds}, + }) + + const myKey = "/quux/thud" + if err := m.Put(datastore.NewKey(myKey), []byte("foobar")); err != nil { + t.Fatalf("Put error: %v", err) + } + + res, err := m.Query(query.Query{Prefix: "/quux"}) + if err != nil { + t.Fatalf("Query fail: %v\n", err) + } + entries, err := res.Rest() + if err != nil { + t.Fatalf("Query Results.Rest fail: %v\n", err) + } + seen := false + for _, e := range entries { + switch e.Key { + case datastore.NewKey(myKey).String(): + seen = true + default: + t.Errorf("saw unexpected key: %q", e.Key) + } + } + if !seen { + t.Errorf("did not see wanted key %q in %+v", myKey, entries) + } +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/test/test_util.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/test/test_util.go index 360115acc..bb272c3fb 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/test/test_util.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/test/test_util.go @@ -9,7 +9,7 @@ import ( dstore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ) -func RunBatchTest(t *testing.T, ds dstore.BatchingDatastore) { +func RunBatchTest(t *testing.T, ds dstore.Batching) { batch, err := ds.Batch() if err != nil { t.Fatal(err) @@ -58,7 +58,7 @@ func RunBatchTest(t *testing.T, ds dstore.BatchingDatastore) { } } -func RunBatchDeleteTest(t *testing.T, ds dstore.BatchingDatastore) { +func RunBatchDeleteTest(t *testing.T, ds dstore.Batching) { r := rand.New() var keys []dstore.Key for i := 0; i < 20; i++ { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered.go index 46a4d57be..8a16b5b2d 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered.go @@ -13,7 +13,7 @@ type tiered []ds.Datastore // New returns a tiered datastore. Puts and Deletes will write-through to // all datastores, Has and Get will try each datastore sequentially, and // Query will always try the last one (most complete) first. -func New(dses ...ds.Datastore) ds.Datastore { +func New(dses ...ds.Datastore) tiered { return tiered(dses) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered_test.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered_test.go index 409814fa6..3bff9a5eb 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/tiered/tiered_test.go @@ -49,19 +49,19 @@ func TestTiered(t *testing.T) { td := New(d1, d2, d3, d4) td.Put(ds.NewKey("foo"), "bar") testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar") - testHas(t, td.(tiered), ds.NewKey("foo"), "bar") // all children + testHas(t, td, ds.NewKey("foo"), "bar") // all children // remove it from, say, caches. d1.Delete(ds.NewKey("foo")) d2.Delete(ds.NewKey("foo")) testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar") - testHas(t, td.(tiered)[2:], ds.NewKey("foo"), "bar") - testNotHas(t, td.(tiered)[:2], ds.NewKey("foo")) + testHas(t, td[2:], ds.NewKey("foo"), "bar") + testNotHas(t, td[:2], ds.NewKey("foo")) // write it again. td.Put(ds.NewKey("foo"), "bar2") testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar2") - testHas(t, td.(tiered), ds.NewKey("foo"), "bar2") + testHas(t, td, ds.NewKey("foo"), "bar2") } func TestQueryCallsLast(t *testing.T) { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/timecache/timecache.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/timecache/timecache.go index 8b1e840ac..1da1ef02c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/timecache/timecache.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/timecache/timecache.go @@ -1,6 +1,7 @@ package timecache import ( + "io" "sync" "time" @@ -24,13 +25,13 @@ type datastore struct { ttls map[ds.Key]time.Time } -func WithTTL(ttl time.Duration) ds.Datastore { +func WithTTL(ttl time.Duration) *datastore { return WithCache(ds.NewMapDatastore(), ttl) } // WithCache wraps a given datastore as a timecache. // Get + Has requests are considered expired after a TTL. -func WithCache(d ds.Datastore, ttl time.Duration) ds.Datastore { +func WithCache(d ds.Datastore, ttl time.Duration) *datastore { return &datastore{cache: d, ttl: ttl, ttls: make(map[ds.Key]time.Time)} } @@ -94,3 +95,10 @@ func (d *datastore) Delete(key ds.Key) (err error) { func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { return d.cache.Query(q) } + +func (d *datastore) Close() error { + if c, ok := d.cache.(io.Closer); ok { + return c.Close() + } + return nil +} diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 90a3b9264..5558bf3b2 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -43,7 +43,7 @@ func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore { } type blockstore struct { - datastore ds.BatchingDatastore + datastore ds.Batching // cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it. // we do check it on `NewBlockstore` though. } diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index c9f9170c0..2366e6e21 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -94,10 +94,6 @@ type FSRepo struct { lockfile io.Closer config *config.Config ds ds.ThreadSafeDatastore - // tracked separately for use in Close; do not use directly. - leveldbDS levelds.Datastore - metricsBlocks measure.DatastoreCloser - metricsLevelDB measure.DatastoreCloser } var _ repo.Repo = (*FSRepo)(nil) @@ -352,7 +348,7 @@ func (r *FSRepo) openDatastore() error { leveldbPath := path.Join(r.path, leveldbDirectory) var err error // save leveldb reference so it can be neatly closed afterward - r.leveldbDS, err = levelds.NewDatastore(leveldbPath, &levelds.Options{ + leveldbDS, err := levelds.NewDatastore(leveldbPath, &levelds.Options{ Compression: ldbopts.NoCompression, }) if err != nil { @@ -382,16 +378,16 @@ func (r *FSRepo) openDatastore() error { id = fmt.Sprintf("uninitialized_%p", r) } prefix := "fsrepo." + id + ".datastore." - r.metricsBlocks = measure.New(prefix+"blocks", blocksDS) - r.metricsLevelDB = measure.New(prefix+"leveldb", r.leveldbDS) + metricsBlocks := measure.New(prefix+"blocks", blocksDS) + metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS) mountDS := mount.New([]mount.Mount{ { Prefix: ds.NewKey("/blocks"), - Datastore: r.metricsBlocks, + Datastore: metricsBlocks, }, { Prefix: ds.NewKey("/"), - Datastore: r.metricsLevelDB, + Datastore: metricsLevelDB, }, }) // Make sure it's ok to claim the virtual datastore from mount as @@ -400,7 +396,7 @@ func (r *FSRepo) openDatastore() error { // variants. This is the same dilemma as the `[].byte` attempt at // introducing const types to Go. var _ ds.ThreadSafeDatastore = blocksDS - var _ ds.ThreadSafeDatastore = r.leveldbDS + var _ ds.ThreadSafeDatastore = leveldbDS r.ds = ds2.ClaimThreadSafe{mountDS} return nil } @@ -420,13 +416,7 @@ func (r *FSRepo) Close() error { return errors.New("repo is closed") } - if err := r.metricsBlocks.Close(); err != nil { - return err - } - if err := r.metricsLevelDB.Close(); err != nil { - return err - } - if err := r.leveldbDS.Close(); err != nil { + if err := r.ds.(io.Closer).Close(); err != nil { return err } diff --git a/util/datastore2/datastore_closer.go b/util/datastore2/datastore_closer.go index f9168d2a3..e1a1dd30a 100644 --- a/util/datastore2/datastore_closer.go +++ b/util/datastore2/datastore_closer.go @@ -26,7 +26,7 @@ func (w *datastoreCloserWrapper) Close() error { } func (w *datastoreCloserWrapper) Batch() (datastore.Batch, error) { - bds, ok := w.ThreadSafeDatastore.(datastore.BatchingDatastore) + bds, ok := w.ThreadSafeDatastore.(datastore.Batching) if !ok { return nil, datastore.ErrBatchUnsupported } diff --git a/util/datastore2/threadsafe.go b/util/datastore2/threadsafe.go index 880d0e80d..82584bdf3 100644 --- a/util/datastore2/threadsafe.go +++ b/util/datastore2/threadsafe.go @@ -1,15 +1,22 @@ package datastore2 import ( + "io" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ) // ClaimThreadSafe claims that a Datastore is threadsafe, even when // it's type does not guarantee this. Use carefully. type ClaimThreadSafe struct { - datastore.BatchingDatastore + datastore.Batching } var _ datastore.ThreadSafeDatastore = ClaimThreadSafe{} func (ClaimThreadSafe) IsThreadSafe() {} + +// TEMP UNTIL dev0.4.0 merges and solves this ugly interface stuff +func (c ClaimThreadSafe) Close() error { + return c.Batching.(io.Closer).Close() +}