mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 18:37:45 +08:00
fsrepo components simplification: directly use datastore
This commit is contained in:
parent
695b2c491a
commit
24a32af3f1
@ -1,15 +0,0 @@
|
||||
package component
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/jbenet/go-ipfs/repo/config"
|
||||
)
|
||||
|
||||
type Component interface {
|
||||
Open(*config.Config) error
|
||||
io.Closer
|
||||
SetPath(string)
|
||||
}
|
||||
type Initializer func(path string, conf *config.Config) error
|
||||
type InitializationChecker func(path string) bool
|
||||
@ -1,113 +0,0 @@
|
||||
package component
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
levelds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
|
||||
ldbopts "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
|
||||
config "github.com/jbenet/go-ipfs/repo/config"
|
||||
counter "github.com/jbenet/go-ipfs/repo/fsrepo/counter"
|
||||
dir "github.com/jbenet/go-ipfs/thirdparty/dir"
|
||||
util "github.com/jbenet/go-ipfs/util"
|
||||
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
|
||||
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultDataStoreDirectory = "datastore"
|
||||
)
|
||||
|
||||
var (
|
||||
_ Component = &DatastoreComponent{}
|
||||
_ Initializer = InitDatastoreComponent
|
||||
_ InitializationChecker = DatastoreComponentIsInitialized
|
||||
|
||||
dsLock sync.Mutex // protects openersCounter and datastores
|
||||
openersCounter *counter.Openers
|
||||
datastores map[string]ds2.ThreadSafeDatastoreCloser
|
||||
)
|
||||
|
||||
func init() {
|
||||
openersCounter = counter.NewOpenersCounter()
|
||||
datastores = make(map[string]ds2.ThreadSafeDatastoreCloser)
|
||||
}
|
||||
|
||||
func InitDatastoreComponent(dspath string, conf *config.Config) error {
|
||||
// The actual datastore contents are initialized lazily when Opened.
|
||||
// During Init, we merely check that the directory is writeable.
|
||||
p := path.Join(dspath, DefaultDataStoreDirectory)
|
||||
if err := dir.Writable(p); err != nil {
|
||||
return debugerror.Errorf("datastore: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DatastoreComponentIsInitialized returns true if the datastore dir exists.
|
||||
func DatastoreComponentIsInitialized(dspath string) bool {
|
||||
if !util.FileExists(path.Join(dspath, DefaultDataStoreDirectory)) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// DatastoreComponent abstracts the datastore component of the FSRepo.
|
||||
type DatastoreComponent struct {
|
||||
path string // required
|
||||
ds ds2.ThreadSafeDatastoreCloser // assigned when repo is opened
|
||||
}
|
||||
|
||||
func (dsc *DatastoreComponent) SetPath(p string) {
|
||||
dsc.path = path.Join(p, DefaultDataStoreDirectory)
|
||||
}
|
||||
|
||||
func (dsc *DatastoreComponent) Datastore() datastore.ThreadSafeDatastore { return dsc.ds }
|
||||
|
||||
// Open returns an error if the config file is not present.
|
||||
func (dsc *DatastoreComponent) Open(*config.Config) error {
|
||||
|
||||
dsLock.Lock()
|
||||
defer dsLock.Unlock()
|
||||
|
||||
// if no other goroutines have the datastore Open, initialize it and assign
|
||||
// it to the package-scoped map for the goroutines that follow.
|
||||
if openersCounter.NumOpeners(dsc.path) == 0 {
|
||||
ds, err := levelds.NewDatastore(dsc.path, &levelds.Options{
|
||||
Compression: ldbopts.NoCompression,
|
||||
})
|
||||
if err != nil {
|
||||
return debugerror.New("unable to open leveldb datastore")
|
||||
}
|
||||
datastores[dsc.path] = ds
|
||||
}
|
||||
|
||||
// get the datastore from the package-scoped map and record self as an
|
||||
// opener.
|
||||
ds, dsIsPresent := datastores[dsc.path]
|
||||
if !dsIsPresent {
|
||||
// This indicates a programmer error has occurred.
|
||||
return errors.New("datastore should be available, but it isn't")
|
||||
}
|
||||
dsc.ds = ds
|
||||
openersCounter.AddOpener(dsc.path) // only after success
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dsc *DatastoreComponent) Close() error {
|
||||
|
||||
dsLock.Lock()
|
||||
defer dsLock.Unlock()
|
||||
|
||||
// decrement the Opener count. if this goroutine is the last, also close
|
||||
// the underlying datastore (and remove its reference from the map)
|
||||
|
||||
openersCounter.RemoveOpener(dsc.path)
|
||||
|
||||
if openersCounter.NumOpeners(dsc.path) == 0 {
|
||||
delete(datastores, dsc.path) // remove the reference
|
||||
return dsc.ds.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -1,30 +0,0 @@
|
||||
package component
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/jbenet/go-ipfs/thirdparty/assert"
|
||||
)
|
||||
|
||||
// swap arg order
|
||||
func testRepoPath(t *testing.T, path ...string) string {
|
||||
name, err := ioutil.TempDir("", filepath.Join(path...))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return name
|
||||
}
|
||||
|
||||
func TestOpenMoreThanOnceInSameProcess(t *testing.T) {
|
||||
t.Parallel()
|
||||
path := testRepoPath(t)
|
||||
dsc1 := DatastoreComponent{path: path}
|
||||
dsc2 := DatastoreComponent{path: path}
|
||||
assert.Nil(dsc1.Open(nil), t, "first repo should open successfully")
|
||||
assert.Nil(dsc2.Open(nil), t, "second repo should open successfully")
|
||||
|
||||
assert.Nil(dsc1.Close(), t)
|
||||
assert.Nil(dsc2.Close(), t)
|
||||
}
|
||||
@ -10,10 +10,11 @@ import (
|
||||
"sync"
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
levelds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
|
||||
ldbopts "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
|
||||
repo "github.com/jbenet/go-ipfs/repo"
|
||||
"github.com/jbenet/go-ipfs/repo/common"
|
||||
config "github.com/jbenet/go-ipfs/repo/config"
|
||||
component "github.com/jbenet/go-ipfs/repo/fsrepo/component"
|
||||
counter "github.com/jbenet/go-ipfs/repo/fsrepo/counter"
|
||||
lockfile "github.com/jbenet/go-ipfs/repo/fsrepo/lock"
|
||||
serialize "github.com/jbenet/go-ipfs/repo/fsrepo/serialize"
|
||||
@ -21,9 +22,14 @@ import (
|
||||
"github.com/jbenet/go-ipfs/thirdparty/eventlog"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
util "github.com/jbenet/go-ipfs/util"
|
||||
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
|
||||
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultDataStoreDirectory = "datastore"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
// packageLock must be held to while performing any operation that modifies an
|
||||
@ -40,11 +46,19 @@ var (
|
||||
// If an operation is used when repo is Open and the operation does not
|
||||
// change the repo's state, the package lock does not need to be acquired.
|
||||
openersCounter *counter.Openers
|
||||
|
||||
// protects dsOpenersCounter and datastores
|
||||
dsLock sync.Mutex
|
||||
dsOpenersCounter *counter.Openers
|
||||
datastores map[string]ds2.ThreadSafeDatastoreCloser
|
||||
)
|
||||
|
||||
func init() {
|
||||
openersCounter = counter.NewOpenersCounter()
|
||||
lockfiles = make(map[string]io.Closer)
|
||||
|
||||
dsOpenersCounter = counter.NewOpenersCounter()
|
||||
datastores = make(map[string]ds2.ThreadSafeDatastoreCloser)
|
||||
}
|
||||
|
||||
// FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple
|
||||
@ -56,19 +70,12 @@ type FSRepo struct {
|
||||
path string
|
||||
// config is set on Open, guarded by packageLock
|
||||
config *config.Config
|
||||
|
||||
// TODO test
|
||||
datastoreComponent component.DatastoreComponent
|
||||
// ds is set on Open
|
||||
ds ds2.ThreadSafeDatastoreCloser
|
||||
}
|
||||
|
||||
var _ repo.Repo = (*FSRepo)(nil)
|
||||
|
||||
type componentBuilder struct {
|
||||
Init component.Initializer
|
||||
IsInitialized component.InitializationChecker
|
||||
OpenHandler func(*FSRepo) error
|
||||
}
|
||||
|
||||
// At returns a handle to an FSRepo at the provided |path|.
|
||||
func At(repoPath string) *FSRepo {
|
||||
// This method must not have side-effects.
|
||||
@ -141,10 +148,11 @@ func Init(repoPath string, conf *config.Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, b := range componentBuilders() {
|
||||
if err := b.Init(repoPath, conf); err != nil {
|
||||
return err
|
||||
}
|
||||
// The actual datastore contents are initialized lazily when Opened.
|
||||
// During Init, we merely check that the directory is writeable.
|
||||
p := path.Join(repoPath, defaultDataStoreDirectory)
|
||||
if err := dir.Writable(p); err != nil {
|
||||
return debugerror.Errorf("datastore: %s", err)
|
||||
}
|
||||
|
||||
if err := dir.Writable(path.Join(repoPath, "logs")); err != nil {
|
||||
@ -196,6 +204,37 @@ func (r *FSRepo) openConfig() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// openDatastore returns an error if the config file is not present.
|
||||
func (r *FSRepo) openDatastore() error {
|
||||
dsLock.Lock()
|
||||
defer dsLock.Unlock()
|
||||
|
||||
dsPath := path.Join(r.path, defaultDataStoreDirectory)
|
||||
|
||||
// if no other goroutines have the datastore Open, initialize it and assign
|
||||
// it to the package-scoped map for the goroutines that follow.
|
||||
if dsOpenersCounter.NumOpeners(dsPath) == 0 {
|
||||
ds, err := levelds.NewDatastore(dsPath, &levelds.Options{
|
||||
Compression: ldbopts.NoCompression,
|
||||
})
|
||||
if err != nil {
|
||||
return debugerror.New("unable to open leveldb datastore")
|
||||
}
|
||||
datastores[dsPath] = ds
|
||||
}
|
||||
|
||||
// get the datastore from the package-scoped map and record self as an
|
||||
// opener.
|
||||
ds, dsIsPresent := datastores[dsPath]
|
||||
if !dsIsPresent {
|
||||
// This indicates a programmer error has occurred.
|
||||
return errors.New("datastore should be available, but it isn't")
|
||||
}
|
||||
r.ds = ds
|
||||
dsOpenersCounter.AddOpener(dsPath) // only after success
|
||||
return nil
|
||||
}
|
||||
|
||||
func configureEventLoggerAtRepoPath(c *config.Config, repoPath string) {
|
||||
eventlog.Configure(eventlog.LevelInfo)
|
||||
eventlog.Configure(eventlog.LdJSONFormatter)
|
||||
@ -240,10 +279,8 @@ func (r *FSRepo) Open() error {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, b := range componentBuilders() {
|
||||
if err := b.OpenHandler(r); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.openDatastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// log.Debugf("writing eventlogs to ...", c.path)
|
||||
@ -252,6 +289,24 @@ func (r *FSRepo) Open() error {
|
||||
return r.transitionToOpened()
|
||||
}
|
||||
|
||||
func (r *FSRepo) closeDatastore() error {
|
||||
dsLock.Lock()
|
||||
defer dsLock.Unlock()
|
||||
|
||||
dsPath := path.Join(r.path, defaultDataStoreDirectory)
|
||||
|
||||
// decrement the Opener count. if this goroutine is the last, also close
|
||||
// the underlying datastore (and remove its reference from the map)
|
||||
|
||||
dsOpenersCounter.RemoveOpener(dsPath)
|
||||
|
||||
if dsOpenersCounter.NumOpeners(dsPath) == 0 {
|
||||
delete(datastores, dsPath) // remove the reference
|
||||
return r.ds.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the FSRepo, releasing held resources.
|
||||
func (r *FSRepo) Close() error {
|
||||
packageLock.Lock()
|
||||
@ -261,10 +316,8 @@ func (r *FSRepo) Close() error {
|
||||
return debugerror.Errorf("repo is %s", r.state)
|
||||
}
|
||||
|
||||
for _, closer := range r.components() {
|
||||
if err := closer.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.closeDatastore(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// This code existed in the previous versions, but
|
||||
@ -395,7 +448,7 @@ func (r *FSRepo) SetConfigKey(key string, value interface{}) error {
|
||||
// is undefined.
|
||||
func (r *FSRepo) Datastore() ds.ThreadSafeDatastore {
|
||||
packageLock.Lock()
|
||||
d := r.datastoreComponent.Datastore()
|
||||
d := r.ds
|
||||
packageLock.Unlock()
|
||||
return d
|
||||
}
|
||||
@ -421,10 +474,8 @@ func isInitializedUnsynced(repoPath string) bool {
|
||||
if !configIsInitialized(repoPath) {
|
||||
return false
|
||||
}
|
||||
for _, b := range componentBuilders() {
|
||||
if !b.IsInitialized(repoPath) {
|
||||
return false
|
||||
}
|
||||
if !util.FileExists(path.Join(repoPath, defaultDataStoreDirectory)) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
@ -462,30 +513,3 @@ func (r *FSRepo) transitionToClosed() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// components returns the FSRepo's constituent components
|
||||
func (r *FSRepo) components() []component.Component {
|
||||
return []component.Component{
|
||||
&r.datastoreComponent,
|
||||
}
|
||||
}
|
||||
|
||||
func componentBuilders() []componentBuilder {
|
||||
return []componentBuilder{
|
||||
|
||||
// DatastoreComponent
|
||||
componentBuilder{
|
||||
Init: component.InitDatastoreComponent,
|
||||
IsInitialized: component.DatastoreComponentIsInitialized,
|
||||
OpenHandler: func(r *FSRepo) error {
|
||||
c := component.DatastoreComponent{}
|
||||
c.SetPath(r.path)
|
||||
if err := c.Open(r.config); err != nil {
|
||||
return err
|
||||
}
|
||||
r.datastoreComponent = c
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,6 +135,7 @@ func TestOpenMoreThanOnceInSameProcess(t *testing.T) {
|
||||
r2 := At(path)
|
||||
assert.Nil(r1.Open(), t, "first repo should open successfully")
|
||||
assert.Nil(r2.Open(), t, "second repo should open successfully")
|
||||
assert.True(r1.ds == r2.ds, t, "repos should share the datastore")
|
||||
|
||||
assert.Nil(r1.Close(), t)
|
||||
assert.Nil(r2.Close(), t)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user