diff --git a/repo/fsrepo/config_test.go b/repo/fsrepo/config_test.go index 01708999a..1c62241a2 100644 --- a/repo/fsrepo/config_test.go +++ b/repo/fsrepo/config_test.go @@ -75,17 +75,28 @@ func TestDefaultDatastoreConfig(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(dir) // clean up - repo := FSRepo{path: dir} config := new(config.Datastore) err = json.Unmarshal(defaultConfig, config) if err != nil { t.Fatal(err) } - ds, err := repo.constructDatastore(config.Spec) + + dsc, err := AnyDatastoreConfig(config.Spec) if err != nil { t.Fatal(err) } + + expected := "/blocks:{flatfs;blocks;/repo/flatfs/shard/v1/next-to-last/2};/:{levelds;datastore};" + if dsc.DiskId() != expected { + t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId()) + } + + ds, err := dsc.Create(dir) + if err != nil { + t.Fatal(err) + } + if typ := reflect.TypeOf(ds).String(); typ != "*syncmount.Datastore" { t.Errorf("expected '*syncmount.Datastore' got '%s'", typ) } @@ -102,17 +113,28 @@ func TestLevelDbConfig(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(dir) // clean up - repo := FSRepo{path: dir} spec := make(map[string]interface{}) err = json.Unmarshal(leveldbConfig, &spec) if err != nil { t.Fatal(err) } - ds, err := repo.constructDatastore(spec) + + dsc, err := AnyDatastoreConfig(spec) if err != nil { t.Fatal(err) } + + expected := "levelds;datastore" + if dsc.DiskId() != expected { + t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId()) + } + + ds, err := dsc.Create(dir) + if err != nil { + t.Fatal(err) + } + if typ := reflect.TypeOf(ds).String(); typ != "*leveldb.datastore" { t.Errorf("expected '*leveldb.datastore' got '%s'", typ) } @@ -129,17 +151,28 @@ func TestFlatfsConfig(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(dir) // clean up - repo := FSRepo{path: dir} spec := make(map[string]interface{}) err = json.Unmarshal(flatfsConfig, &spec) if err != nil { t.Fatal(err) } - ds, err := repo.constructDatastore(spec) + + dsc, err := AnyDatastoreConfig(spec) if err != nil { t.Fatal(err) } + + expected := "flatfs;blocks;/repo/flatfs/shard/v1/next-to-last/2" + if dsc.DiskId() != expected { + t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId()) + } + + ds, err := dsc.Create(dir) + if err != nil { + t.Fatal(err) + } + if typ := reflect.TypeOf(ds).String(); typ != "*flatfs.Datastore" { t.Errorf("expected '*flatfs.Datastore' got '%s'", typ) } @@ -156,17 +189,28 @@ func TestMeasureConfig(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(dir) // clean up - repo := FSRepo{path: dir} spec := make(map[string]interface{}) err = json.Unmarshal(measureConfig, &spec) if err != nil { t.Fatal(err) } - ds, err := repo.constructDatastore(spec) + + dsc, err := AnyDatastoreConfig(spec) if err != nil { t.Fatal(err) } + + expected := "flatfs;blocks;/repo/flatfs/shard/v1/next-to-last/2" + if dsc.DiskId() != expected { + t.Errorf("expected '%s' got '%s' as DiskId", expected, dsc.DiskId()) + } + + ds, err := dsc.Create(dir) + if err != nil { + t.Fatal(err) + } + if typ := reflect.TypeOf(ds).String(); typ != "*measure.measure" { t.Errorf("expected '*measure.measure' got '%s'", typ) } diff --git a/repo/fsrepo/datastores.go b/repo/fsrepo/datastores.go index cfe60dbf7..807097ac3 100644 --- a/repo/fsrepo/datastores.go +++ b/repo/fsrepo/datastores.go @@ -1,6 +1,7 @@ package fsrepo import ( + "bytes" "fmt" "path/filepath" @@ -18,9 +19,11 @@ import ( type ConfigFromMap func(map[string]interface{}) (DatastoreConfig, error) type DatastoreConfig interface { - // DiskId is a unique id representing the Datastore config as stored on disk, runtime config values are not - // part of this Id. No length limit. - //DiskId() string + // DiskId is a unique id representing the Datastore config as + // stored on disk, runtime config values are not part of this Id. + // Returns an empty string if the datastore does not have an on + // disk representation. No length limit. + DiskId() string // Create instantiate a new datastore from this config Create(path string) (repo.Datastore, error) @@ -91,6 +94,14 @@ func MountDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error return &res, nil } +func (c *mountDatastoreConfig) DiskId() string { + buf := new(bytes.Buffer) + for _, m := range c.mounts { + fmt.Fprintf(buf, "%s:{%s};", m.prefix.String(), m.ds.DiskId()) + } + return buf.String() +} + func (c *mountDatastoreConfig) Create(path string) (repo.Datastore, error) { mounts := make([]mount.Mount, len(c.mounts)) for i, m := range c.mounts { @@ -136,6 +147,10 @@ func FlatfsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, erro return &c, nil } +func (c *flatfsDatastoreConfig) DiskId() string { + return fmt.Sprintf("flatfs;%s;%s", c.path, c.shardFun.String()) +} + func (c *flatfsDatastoreConfig) Create(path string) (repo.Datastore, error) { p := c.path if !filepath.IsAbs(p) { @@ -173,6 +188,10 @@ func LeveldsDatastoreConfig(params map[string]interface{}) (DatastoreConfig, err return &c, nil } +func (c *leveldsDatastoreConfig) DiskId() string { + return fmt.Sprintf("levelds;%s", c.path) +} + func (c *leveldsDatastoreConfig) Create(path string) (repo.Datastore, error) { p := c.path if !filepath.IsAbs(p) { @@ -192,6 +211,10 @@ func MemDatastoreConfig(params map[string]interface{}) (DatastoreConfig, error) return &memDatastoreConfig{params}, nil } +func (c *memDatastoreConfig) DiskId() string { + return "" +} + func (c *memDatastoreConfig) Create(string) (repo.Datastore, error) { return ds.NewMapDatastore(), nil } @@ -226,6 +249,10 @@ func (c *logDatastoreConfig) Create(path string) (repo.Datastore, error) { return ds.NewLogDatastore(child, c.name), nil } +func (c *logDatastoreConfig) DiskId() string { + return c.child.DiskId() +} + type measureDatastoreConfig struct { child DatastoreConfig prefix string @@ -247,6 +274,10 @@ func MeasureDatastoreConfig(params map[string]interface{}) (DatastoreConfig, err return &measureDatastoreConfig{child, prefix}, nil } +func (c *measureDatastoreConfig) DiskId() string { + return c.child.DiskId() +} + func (c measureDatastoreConfig) Create(path string) (repo.Datastore, error) { child, err := c.child.Create(path) if err != nil { diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 971b25611..8b6a3717c 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -1,6 +1,7 @@ package fsrepo import ( + "bytes" "errors" "fmt" "io" @@ -357,18 +358,39 @@ func (r *FSRepo) openKeystore() error { // openDatastore returns an error if the config file is not present. func (r *FSRepo) openDatastore() error { - if r.config.Datastore.Spec != nil { - d, err := r.constructDatastore(r.config.Datastore.Spec) + if r.config.Datastore.Type != "" || r.config.Datastore.Path != "" { + return fmt.Errorf("old style datatstore config detected") + } else if r.config.Datastore.Spec == nil { + return fmt.Errorf("required Datastore.Spec entry missing form config file") + } + + dsc, err := AnyDatastoreConfig(r.config.Datastore.Spec) + if err != nil { + return err + } + diskId := dsc.DiskId() + + oldDiskId, err := r.readDiskId() + if err == nil { + if oldDiskId != diskId { + return fmt.Errorf("Datastore configuration of '%s' does not match what is on disk '%s'", + oldDiskId, diskId) + } + } else if os.IsNotExist(err) { + err := r.writeDiskId(diskId) if err != nil { return err } - r.ds = d - } else if r.config.Datastore.Type != "" || r.config.Datastore.Path != "" { - return fmt.Errorf("old style datatstore config detected") } else { - return fmt.Errorf("required Datastore.Spec entry missing form config file") + return err } + d, err := dsc.Create(r.path) + if err != nil { + return err + } + r.ds = d + // Wrap it with metrics gathering prefix := "ipfs.fsrepo.datastore" r.ds = measure.New(prefix, r.ds) @@ -376,12 +398,31 @@ func (r *FSRepo) openDatastore() error { return nil } -func (r *FSRepo) constructDatastore(params map[string]interface{}) (repo.Datastore, error) { - cfg, err := AnyDatastoreConfig(params) +var DiskIdFn = "dsid" + +func (r *FSRepo) readDiskId() (string, error) { + fn, err := config.Path(r.path, DiskIdFn) if err != nil { - return nil, err + return "", err } - return cfg.Create(r.path) + b, err := ioutil.ReadFile(fn) + if err != nil { + return "", err + } + b = bytes.TrimSpace(b) + return string(b), nil +} + +func (r *FSRepo) writeDiskId(newId string) error { + fn, err := config.Path(r.path, DiskIdFn) + if err != nil { + return err + } + err = ioutil.WriteFile(fn, []byte(newId), 0666) + if err != nil { + return err + } + return nil } // Close closes the FSRepo, releasing held resources.