From 1f97170ef9aeb266a29ca6624490266ee0b3774c Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Tue, 4 Jul 2017 22:15:46 -0400 Subject: [PATCH] Use DatastoreConfig abstraction to create datastores. License: MIT Signed-off-by: Kevin Atkinson --- repo/fsrepo/config_test.go | 14 +- repo/fsrepo/datastores.go | 256 ++++++++++++++++++++++++++----------- repo/fsrepo/fsrepo.go | 9 +- 3 files changed, 195 insertions(+), 84 deletions(-) diff --git a/repo/fsrepo/config_test.go b/repo/fsrepo/config_test.go index 4b9b41b22..01708999a 100644 --- a/repo/fsrepo/config_test.go +++ b/repo/fsrepo/config_test.go @@ -7,10 +7,6 @@ import ( "reflect" "testing" - //"fmt" - - syncmount "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/syncmount" - //levelds "gx/ipfs/QmaHHmfEozrrotyhyN44omJouyuEtx6ahddqV6W5yRaUSQ/go-ds-leveldb" config "github.com/ipfs/go-ipfs/repo/config" ) @@ -90,10 +86,6 @@ func TestDefaultDatastoreConfig(t *testing.T) { if err != nil { t.Fatal(err) } - _, ok := ds.(*syncmount.Datastore) - if !ok { - t.Fatal("expected mount datastore at top level") - } if typ := reflect.TypeOf(ds).String(); typ != "*syncmount.Datastore" { t.Errorf("expected '*syncmount.Datastore' got '%s'", typ) } @@ -122,7 +114,7 @@ func TestLevelDbConfig(t *testing.T) { t.Fatal(err) } if typ := reflect.TypeOf(ds).String(); typ != "*leveldb.datastore" { - t.Errorf("expected '*syncmount.Datastore' got '%s'", typ) + t.Errorf("expected '*leveldb.datastore' got '%s'", typ) } } @@ -149,7 +141,7 @@ func TestFlatfsConfig(t *testing.T) { t.Fatal(err) } if typ := reflect.TypeOf(ds).String(); typ != "*flatfs.Datastore" { - t.Errorf("expected '*syncmount.Datastore' got '%s'", typ) + t.Errorf("expected '*flatfs.Datastore' got '%s'", typ) } } @@ -176,6 +168,6 @@ func TestMeasureConfig(t *testing.T) { t.Fatal(err) } if typ := reflect.TypeOf(ds).String(); typ != "*measure.measure" { - t.Errorf("expected '*syncmount.Datastore' got '%s'", typ) + t.Errorf("expected '*measure.measure' got '%s'", typ) } } diff --git a/repo/fsrepo/datastores.go b/repo/fsrepo/datastores.go index 7be3783ed..cfe60dbf7 100644 --- a/repo/fsrepo/datastores.go +++ b/repo/fsrepo/datastores.go @@ -14,63 +14,65 @@ import ( ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt" ) -func (r *FSRepo) constructDatastore(params map[string]interface{}) (repo.Datastore, error) { - switch params["type"] { - case "mount": - mounts, ok := params["mounts"].([]interface{}) - if !ok { - return nil, fmt.Errorf("'mounts' field is missing or not an array") - } +// ConfigFromMap creates a new datastore config from a map +type ConfigFromMap func(map[string]interface{}) (DatastoreConfig, error) - return r.openMountDatastore(mounts) - case "flatfs": - return r.openFlatfsDatastore(params) - case "mem": - return ds.NewMapDatastore(), nil - case "log": - childField, ok := params["child"].(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("'child' field is missing or not a map") - } - child, err := r.constructDatastore(childField) - if err != nil { - return nil, err - } - nameField, ok := params["name"].(string) - if !ok { - return nil, fmt.Errorf("'name' field was missing or not a string") - } - return ds.NewLogDatastore(child, nameField), nil - case "measure": - childField, ok := params["child"].(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("'child' field was missing or not a map") - } - child, err := r.constructDatastore(childField) - if err != nil { - return nil, err - } +type DatastoreConfig interface { + // DiskId is a unique id representing the Datastore config as stored on disk, runtime config values are not + // part of this Id. No length limit. + //DiskId() string - prefix, ok := params["prefix"].(string) - if !ok { - return nil, fmt.Errorf("'prefix' field was missing or not a string") - } + // Create instantiate a new datastore from this config + Create(path string) (repo.Datastore, error) +} - return r.openMeasureDB(prefix, child) +var datastores map[string]ConfigFromMap - case "levelds": - return r.openLeveldbDatastore(params) - default: - return nil, fmt.Errorf("unknown datastore type: %s", params["type"]) +func init() { + datastores = map[string]ConfigFromMap{ + "mount": MountDatastoreConfig, + "flatfs": FlatfsDatastoreConfig, + "levelds": LeveldsDatastoreConfig, + "mem": MemDatastoreConfig, + "log": LogDatastoreConfig, + "measure": MeasureDatastoreConfig, } } -func (r *FSRepo) openMountDatastore(mountcfg []interface{}) (repo.Datastore, error) { - var mounts []mount.Mount - for _, iface := range mountcfg { - cfg := iface.(map[string]interface{}) +func AnyDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) { + which, ok := params["type"].(string) + if !ok { + return nil, fmt.Errorf("'type' field missing or not a string") + } + fun, ok := datastores[which] + if !ok { + return nil, fmt.Errorf("unknown datastore type: %s", which) + } + return fun(params) +} - child, err := r.constructDatastore(cfg) +type mountDatastoreConfig struct { + mounts []premount +} + +type premount struct { + ds DatastoreConfig + prefix ds.Key +} + +func MountDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) { + var res mountDatastoreConfig + mounts, ok := params["mounts"].([]interface{}) + if !ok { + return nil, fmt.Errorf("'mounts' field is missing or not an array") + } + for _, iface := range mounts { + cfg, ok := iface.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("expected map for mountpoint") + } + + child, err := AnyDatastoreConfig(cfg) if err != nil { return nil, err } @@ -80,65 +82,175 @@ func (r *FSRepo) openMountDatastore(mountcfg []interface{}) (repo.Datastore, err return nil, fmt.Errorf("no 'mountpoint' on mount") } - mounts = append(mounts, mount.Mount{ - Datastore: child, - Prefix: ds.NewKey(prefix.(string)), + res.mounts = append(res.mounts, premount{ + ds: child, + prefix: ds.NewKey(prefix.(string)), }) } + return &res, nil +} + +func (c *mountDatastoreConfig) Create(path string) (repo.Datastore, error) { + mounts := make([]mount.Mount, len(c.mounts)) + for i, m := range c.mounts { + ds, err := m.ds.Create(path) + if err != nil { + return nil, err + } + mounts[i].Datastore = ds + mounts[i].Prefix = m.prefix + } return mount.New(mounts), nil } -func (r *FSRepo) openFlatfsDatastore(params map[string]interface{}) (repo.Datastore, error) { - p, ok := params["path"].(string) +type flatfsDatastoreConfig struct { + path string + shardFun *flatfs.ShardIdV1 + syncField bool +} + +func FlatfsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) { + var c flatfsDatastoreConfig + var ok bool + var err error + + c.path, ok = params["path"].(string) if !ok { return nil, fmt.Errorf("'path' field is missing or not boolean") } - if !filepath.IsAbs(p) { - p = filepath.Join(r.path, p) - } sshardFun, ok := params["shardFunc"].(string) if !ok { return nil, fmt.Errorf("'shardFunc' field is missing or not a string") } - shardFun, err := flatfs.ParseShardFunc(sshardFun) + c.shardFun, err = flatfs.ParseShardFunc(sshardFun) if err != nil { return nil, err } - syncField, ok := params["sync"].(bool) + c.syncField, ok = params["sync"].(bool) if !ok { return nil, fmt.Errorf("'sync' field is missing or not boolean") } - return flatfs.CreateOrOpen(p, shardFun, syncField) + return &c, nil } -func (r *FSRepo) openLeveldbDatastore(params map[string]interface{}) (repo.Datastore, error) { - p, ok := params["path"].(string) +func (c *flatfsDatastoreConfig) Create(path string) (repo.Datastore, error) { + p := c.path + if !filepath.IsAbs(p) { + p = filepath.Join(path, p) + } + + return flatfs.CreateOrOpen(p, c.shardFun, c.syncField) +} + +type leveldsDatastoreConfig struct { + path string + compression ldbopts.Compression +} + +func LeveldsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) { + var c leveldsDatastoreConfig + var ok bool + + c.path, ok = params["path"].(string) if !ok { return nil, fmt.Errorf("'path' field is missing or not string") } - if !filepath.IsAbs(p) { - p = filepath.Join(r.path, p) - } - var c ldbopts.Compression switch params["compression"].(string) { case "none": - c = ldbopts.NoCompression + c.compression = ldbopts.NoCompression case "snappy": - c = ldbopts.SnappyCompression + c.compression = ldbopts.SnappyCompression case "": fallthrough default: - c = ldbopts.DefaultCompression + c.compression = ldbopts.DefaultCompression } + + return &c, nil +} + +func (c *leveldsDatastoreConfig) Create(path string) (repo.Datastore, error) { + p := c.path + if !filepath.IsAbs(p) { + p = filepath.Join(path, p) + } + return levelds.NewDatastore(p, &levelds.Options{ - Compression: c, + Compression: c.compression, }) } -func (r *FSRepo) openMeasureDB(prefix string, child repo.Datastore) (repo.Datastore, error) { - return measure.New(prefix, child), nil +type memDatastoreConfig struct { + cfg map[string]interface{} +} + +func MemDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) { + return &memDatastoreConfig{params}, nil +} + +func (c *memDatastoreConfig) Create(string) (repo.Datastore, error) { + return ds.NewMapDatastore(), nil +} + +type logDatastoreConfig struct { + child DatastoreConfig + name string +} + +func LogDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) { + childField, ok := params["child"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("'child' field is missing or not a map") + } + child, err := AnyDatastoreConfig(childField) + if err != nil { + return nil, err + } + name, ok := params["name"].(string) + if !ok { + return nil, fmt.Errorf("'name' field was missing or not a string") + } + return &logDatastoreConfig{child, name}, nil + +} + +func (c *logDatastoreConfig) Create(path string) (repo.Datastore, error) { + child, err := c.child.Create(path) + if err != nil { + return nil, err + } + return ds.NewLogDatastore(child, c.name), nil +} + +type measureDatastoreConfig struct { + child DatastoreConfig + prefix string +} + +func MeasureDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) { + childField, ok := params["child"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("'child' field is missing or not a map") + } + child, err := AnyDatastoreConfig(childField) + if err != nil { + return nil, err + } + prefix, ok := params["prefix"].(string) + if !ok { + return nil, fmt.Errorf("'prefix' field was missing or not a string") + } + return &measureDatastoreConfig{child, prefix}, nil +} + +func (c measureDatastoreConfig) Create(path string) (repo.Datastore, error) { + child, err := c.child.Create(path) + if err != nil { + return nil, err + } + return measure.New(c.prefix, child), nil } diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index bfe2c20fc..971b25611 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -362,7 +362,6 @@ func (r *FSRepo) openDatastore() error { if err != nil { return err } - r.ds = d } else if r.config.Datastore.Type != "" || r.config.Datastore.Path != "" { return fmt.Errorf("old style datatstore config detected") @@ -377,6 +376,14 @@ func (r *FSRepo) openDatastore() error { return nil } +func (r *FSRepo) constructDatastore(params map[string]interface{}) (repo.Datastore, error) { + cfg, err := AnyDatastoreConfig(params) + if err != nil { + return nil, err + } + return cfg.Create(r.path) +} + // Close closes the FSRepo, releasing held resources. func (r *FSRepo) Close() error { packageLock.Lock()