Use DatastoreConfig abstraction to create datastores.

License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
This commit is contained in:
Kevin Atkinson 2017-07-04 22:15:46 -04:00 committed by Jeromy
parent 5a8c17104d
commit 1f97170ef9
3 changed files with 195 additions and 84 deletions

View File

@ -7,10 +7,6 @@ import (
"reflect"
"testing"
//"fmt"
syncmount "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/syncmount"
//levelds "gx/ipfs/QmaHHmfEozrrotyhyN44omJouyuEtx6ahddqV6W5yRaUSQ/go-ds-leveldb"
config "github.com/ipfs/go-ipfs/repo/config"
)
@ -90,10 +86,6 @@ func TestDefaultDatastoreConfig(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, ok := ds.(*syncmount.Datastore)
if !ok {
t.Fatal("expected mount datastore at top level")
}
if typ := reflect.TypeOf(ds).String(); typ != "*syncmount.Datastore" {
t.Errorf("expected '*syncmount.Datastore' got '%s'", typ)
}
@ -122,7 +114,7 @@ func TestLevelDbConfig(t *testing.T) {
t.Fatal(err)
}
if typ := reflect.TypeOf(ds).String(); typ != "*leveldb.datastore" {
t.Errorf("expected '*syncmount.Datastore' got '%s'", typ)
t.Errorf("expected '*leveldb.datastore' got '%s'", typ)
}
}
@ -149,7 +141,7 @@ func TestFlatfsConfig(t *testing.T) {
t.Fatal(err)
}
if typ := reflect.TypeOf(ds).String(); typ != "*flatfs.Datastore" {
t.Errorf("expected '*syncmount.Datastore' got '%s'", typ)
t.Errorf("expected '*flatfs.Datastore' got '%s'", typ)
}
}
@ -176,6 +168,6 @@ func TestMeasureConfig(t *testing.T) {
t.Fatal(err)
}
if typ := reflect.TypeOf(ds).String(); typ != "*measure.measure" {
t.Errorf("expected '*syncmount.Datastore' got '%s'", typ)
t.Errorf("expected '*measure.measure' got '%s'", typ)
}
}

View File

@ -14,63 +14,65 @@ import (
ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
)
func (r *FSRepo) constructDatastore(params map[string]interface{}) (repo.Datastore, error) {
switch params["type"] {
case "mount":
mounts, ok := params["mounts"].([]interface{})
if !ok {
return nil, fmt.Errorf("'mounts' field is missing or not an array")
}
// ConfigFromMap creates a new datastore config from a map
type ConfigFromMap func(map[string]interface{}) (DatastoreConfig, error)
return r.openMountDatastore(mounts)
case "flatfs":
return r.openFlatfsDatastore(params)
case "mem":
return ds.NewMapDatastore(), nil
case "log":
childField, ok := params["child"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("'child' field is missing or not a map")
}
child, err := r.constructDatastore(childField)
if err != nil {
return nil, err
}
nameField, ok := params["name"].(string)
if !ok {
return nil, fmt.Errorf("'name' field was missing or not a string")
}
return ds.NewLogDatastore(child, nameField), nil
case "measure":
childField, ok := params["child"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("'child' field was missing or not a map")
}
child, err := r.constructDatastore(childField)
if err != nil {
return nil, err
}
type DatastoreConfig interface {
// DiskId is a unique id representing the Datastore config as stored on disk, runtime config values are not
// part of this Id. No length limit.
//DiskId() string
prefix, ok := params["prefix"].(string)
if !ok {
return nil, fmt.Errorf("'prefix' field was missing or not a string")
}
// Create instantiate a new datastore from this config
Create(path string) (repo.Datastore, error)
}
return r.openMeasureDB(prefix, child)
var datastores map[string]ConfigFromMap
case "levelds":
return r.openLeveldbDatastore(params)
default:
return nil, fmt.Errorf("unknown datastore type: %s", params["type"])
func init() {
datastores = map[string]ConfigFromMap{
"mount": MountDatastoreConfig,
"flatfs": FlatfsDatastoreConfig,
"levelds": LeveldsDatastoreConfig,
"mem": MemDatastoreConfig,
"log": LogDatastoreConfig,
"measure": MeasureDatastoreConfig,
}
}
func (r *FSRepo) openMountDatastore(mountcfg []interface{}) (repo.Datastore, error) {
var mounts []mount.Mount
for _, iface := range mountcfg {
cfg := iface.(map[string]interface{})
func AnyDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
which, ok := params["type"].(string)
if !ok {
return nil, fmt.Errorf("'type' field missing or not a string")
}
fun, ok := datastores[which]
if !ok {
return nil, fmt.Errorf("unknown datastore type: %s", which)
}
return fun(params)
}
child, err := r.constructDatastore(cfg)
type mountDatastoreConfig struct {
mounts []premount
}
type premount struct {
ds DatastoreConfig
prefix ds.Key
}
func MountDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
var res mountDatastoreConfig
mounts, ok := params["mounts"].([]interface{})
if !ok {
return nil, fmt.Errorf("'mounts' field is missing or not an array")
}
for _, iface := range mounts {
cfg, ok := iface.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("expected map for mountpoint")
}
child, err := AnyDatastoreConfig(cfg)
if err != nil {
return nil, err
}
@ -80,65 +82,175 @@ func (r *FSRepo) openMountDatastore(mountcfg []interface{}) (repo.Datastore, err
return nil, fmt.Errorf("no 'mountpoint' on mount")
}
mounts = append(mounts, mount.Mount{
Datastore: child,
Prefix: ds.NewKey(prefix.(string)),
res.mounts = append(res.mounts, premount{
ds: child,
prefix: ds.NewKey(prefix.(string)),
})
}
return &res, nil
}
func (c *mountDatastoreConfig) Create(path string) (repo.Datastore, error) {
mounts := make([]mount.Mount, len(c.mounts))
for i, m := range c.mounts {
ds, err := m.ds.Create(path)
if err != nil {
return nil, err
}
mounts[i].Datastore = ds
mounts[i].Prefix = m.prefix
}
return mount.New(mounts), nil
}
func (r *FSRepo) openFlatfsDatastore(params map[string]interface{}) (repo.Datastore, error) {
p, ok := params["path"].(string)
type flatfsDatastoreConfig struct {
path string
shardFun *flatfs.ShardIdV1
syncField bool
}
func FlatfsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
var c flatfsDatastoreConfig
var ok bool
var err error
c.path, ok = params["path"].(string)
if !ok {
return nil, fmt.Errorf("'path' field is missing or not boolean")
}
if !filepath.IsAbs(p) {
p = filepath.Join(r.path, p)
}
sshardFun, ok := params["shardFunc"].(string)
if !ok {
return nil, fmt.Errorf("'shardFunc' field is missing or not a string")
}
shardFun, err := flatfs.ParseShardFunc(sshardFun)
c.shardFun, err = flatfs.ParseShardFunc(sshardFun)
if err != nil {
return nil, err
}
syncField, ok := params["sync"].(bool)
c.syncField, ok = params["sync"].(bool)
if !ok {
return nil, fmt.Errorf("'sync' field is missing or not boolean")
}
return flatfs.CreateOrOpen(p, shardFun, syncField)
return &c, nil
}
func (r *FSRepo) openLeveldbDatastore(params map[string]interface{}) (repo.Datastore, error) {
p, ok := params["path"].(string)
func (c *flatfsDatastoreConfig) Create(path string) (repo.Datastore, error) {
p := c.path
if !filepath.IsAbs(p) {
p = filepath.Join(path, p)
}
return flatfs.CreateOrOpen(p, c.shardFun, c.syncField)
}
type leveldsDatastoreConfig struct {
path string
compression ldbopts.Compression
}
func LeveldsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
var c leveldsDatastoreConfig
var ok bool
c.path, ok = params["path"].(string)
if !ok {
return nil, fmt.Errorf("'path' field is missing or not string")
}
if !filepath.IsAbs(p) {
p = filepath.Join(r.path, p)
}
var c ldbopts.Compression
switch params["compression"].(string) {
case "none":
c = ldbopts.NoCompression
c.compression = ldbopts.NoCompression
case "snappy":
c = ldbopts.SnappyCompression
c.compression = ldbopts.SnappyCompression
case "":
fallthrough
default:
c = ldbopts.DefaultCompression
c.compression = ldbopts.DefaultCompression
}
return &c, nil
}
func (c *leveldsDatastoreConfig) Create(path string) (repo.Datastore, error) {
p := c.path
if !filepath.IsAbs(p) {
p = filepath.Join(path, p)
}
return levelds.NewDatastore(p, &levelds.Options{
Compression: c,
Compression: c.compression,
})
}
func (r *FSRepo) openMeasureDB(prefix string, child repo.Datastore) (repo.Datastore, error) {
return measure.New(prefix, child), nil
type memDatastoreConfig struct {
cfg map[string]interface{}
}
func MemDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
return &memDatastoreConfig{params}, nil
}
func (c *memDatastoreConfig) Create(string) (repo.Datastore, error) {
return ds.NewMapDatastore(), nil
}
type logDatastoreConfig struct {
child DatastoreConfig
name string
}
func LogDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
childField, ok := params["child"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("'child' field is missing or not a map")
}
child, err := AnyDatastoreConfig(childField)
if err != nil {
return nil, err
}
name, ok := params["name"].(string)
if !ok {
return nil, fmt.Errorf("'name' field was missing or not a string")
}
return &logDatastoreConfig{child, name}, nil
}
func (c *logDatastoreConfig) Create(path string) (repo.Datastore, error) {
child, err := c.child.Create(path)
if err != nil {
return nil, err
}
return ds.NewLogDatastore(child, c.name), nil
}
type measureDatastoreConfig struct {
child DatastoreConfig
prefix string
}
func MeasureDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) {
childField, ok := params["child"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("'child' field is missing or not a map")
}
child, err := AnyDatastoreConfig(childField)
if err != nil {
return nil, err
}
prefix, ok := params["prefix"].(string)
if !ok {
return nil, fmt.Errorf("'prefix' field was missing or not a string")
}
return &measureDatastoreConfig{child, prefix}, nil
}
func (c measureDatastoreConfig) Create(path string) (repo.Datastore, error) {
child, err := c.child.Create(path)
if err != nil {
return nil, err
}
return measure.New(c.prefix, child), nil
}

View File

@ -362,7 +362,6 @@ func (r *FSRepo) openDatastore() error {
if err != nil {
return err
}
r.ds = d
} else if r.config.Datastore.Type != "" || r.config.Datastore.Path != "" {
return fmt.Errorf("old style datatstore config detected")
@ -377,6 +376,14 @@ func (r *FSRepo) openDatastore() error {
return nil
}
func (r *FSRepo) constructDatastore(params map[string]interface{}) (repo.Datastore, error) {
cfg, err := AnyDatastoreConfig(params)
if err != nil {
return nil, err
}
return cfg.Create(r.path)
}
// Close closes the FSRepo, releasing held resources.
func (r *FSRepo) Close() error {
packageLock.Lock()