Merge pull request #559 from jbenet/feat/repo-fsrepo-filelock

refactor(fsrepo.FSRepo): manage the Datastore and daemon.lock
This commit is contained in:
Brian Tiger Chow 2015-01-14 10:20:47 -08:00
commit 2dd8eaae3f
22 changed files with 857 additions and 208 deletions

6
Godeps/Godeps.json generated
View File

@ -1,6 +1,6 @@
{
"ImportPath": "github.com/jbenet/go-ipfs",
"GoVersion": "go1.3",
"GoVersion": "go1.4",
"Packages": [
"./..."
],
@ -68,6 +68,10 @@
"ImportPath": "github.com/dustin/go-humanize",
"Rev": "b198514c204f20799b91c93b6ffd8b26be04c2c9"
},
{
"ImportPath": "github.com/facebookgo/atomicfile",
"Rev": "6f117f2e7f224fb03eb5e5fba370eade6e2b90c8"
},
{
"ImportPath": "github.com/facebookgo/stack",
"Rev": "4da6d991fc3c389efa512151354d643eb5fae4e2"

View File

@ -0,0 +1,24 @@
language: go
go:
- 1.2
- 1.3
matrix:
fast_finish: true
before_install:
- go get -v code.google.com/p/go.tools/cmd/vet
- go get -v github.com/golang/lint/golint
- go get -v code.google.com/p/go.tools/cmd/cover
install:
- go install -race -v std
- go get -race -t -v ./...
- go install -race -v ./...
script:
- go vet ./...
- $HOME/gopath/bin/golint .
- go test -cpu=2 -race -v ./...
- go test -cpu=2 -covermode=atomic ./...

View File

@ -0,0 +1,54 @@
// Package atomicfile provides the ability to write a file with an eventual
// rename on Close. This allows for a file to always be in a consistent state
// and never represent an in-progress write.
package atomicfile
import (
"io/ioutil"
"os"
"path/filepath"
)
// File behaves like os.File, but does an atomic rename operation at Close.
type File struct {
*os.File
path string
}
// New creates a new temporary file that will replace the file at the given
// path when Closed.
func New(path string, mode os.FileMode) (*File, error) {
f, err := ioutil.TempFile(filepath.Dir(path), filepath.Base(path))
if err != nil {
return nil, err
}
if err := os.Chmod(f.Name(), mode); err != nil {
os.Remove(f.Name())
return nil, err
}
return &File{File: f, path: path}, nil
}
// Close the file replacing the configured file.
func (f *File) Close() error {
if err := f.File.Close(); err != nil {
return err
}
if err := os.Rename(f.Name(), f.path); err != nil {
return err
}
return nil
}
// Abort closes the file and removes it instead of replacing the configured
// file. This is useful if after starting to write to the file you decide you
// don't want it anymore.
func (f *File) Abort() error {
if err := f.File.Close(); err != nil {
return err
}
if err := os.Remove(f.Name()); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,86 @@
package atomicfile_test
import (
"bytes"
"io/ioutil"
"os"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/facebookgo/atomicfile"
)
func test(t *testing.T, dir, prefix string) {
t.Parallel()
tmpfile, err := ioutil.TempFile(dir, prefix)
if err != nil {
t.Fatal(err)
}
name := tmpfile.Name()
if err := os.Remove(name); err != nil {
t.Fatal(err)
}
defer os.Remove(name)
f, err := atomicfile.New(name, os.FileMode(0666))
if err != nil {
t.Fatal(err)
}
f.Write([]byte("foo"))
if _, err := os.Stat(name); !os.IsNotExist(err) {
t.Fatal("did not expect file to exist")
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(name); err != nil {
t.Fatalf("expected file to exist: %s", err)
}
}
func TestCurrentDir(t *testing.T) {
cwd, _ := os.Getwd()
test(t, cwd, "atomicfile-current-dir-")
}
func TestRootTmpDir(t *testing.T) {
test(t, "/tmp", "atomicfile-root-tmp-dir-")
}
func TestDefaultTmpDir(t *testing.T) {
test(t, "", "atomicfile-default-tmp-dir-")
}
func TestAbort(t *testing.T) {
contents := []byte("the answer is 42")
t.Parallel()
tmpfile, err := ioutil.TempFile("", "atomicfile-abort-")
if err != nil {
t.Fatal(err)
}
name := tmpfile.Name()
if _, err := tmpfile.Write(contents); err != nil {
t.Fatal(err)
}
defer os.Remove(name)
f, err := atomicfile.New(name, os.FileMode(0666))
if err != nil {
t.Fatal(err)
}
f.Write([]byte("foo"))
if err := f.Abort(); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(name); err != nil {
t.Fatalf("expected file to exist: %s", err)
}
actual, err := ioutil.ReadFile(name)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(contents, actual) {
t.Fatalf(`did not find expected "%s" instead found "%s"`, contents, actual)
}
}

View File

@ -0,0 +1,30 @@
BSD License
For atomicfile software
Copyright (c) 2014, Facebook, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,23 @@
Additional Grant of Patent Rights
"Software" means the atomicfile software distributed by Facebook, Inc.
Facebook hereby grants you a perpetual, worldwide, royalty-free, non-exclusive,
irrevocable (subject to the termination provision below) license under any
rights in any patent claims owned by Facebook, to make, have made, use, sell,
offer to sell, import, and otherwise transfer the Software. For avoidance of
doubt, no license is granted under Facebooks rights in any patent claims that
are infringed by (i) modifications to the Software made by you or a third party,
or (ii) the Software in combination with any software or other technology
provided by you or a third party.
The license granted hereunder will terminate, automatically and without notice,
for anyone that makes any claim (including by filing any lawsuit, assertion or
other action) alleging (a) direct, indirect, or contributory infringement or
inducement to infringe any patent: (i) by Facebook or any of its subsidiaries or
affiliates, whether or not such claim is related to the Software, (ii) by any
party if such claim arises in whole or in part from any software, product or
service of Facebook or any of its subsidiaries or affiliates, whether or not
such claim is related to the Software, or (iii) by any party relating to the
Software; or (b) that any right in any patent claim of Facebook is invalid or
unenforceable.

View File

@ -0,0 +1,4 @@
atomicfile [![Build Status](https://secure.travis-ci.org/facebookgo/atomicfile.png)](http://travis-ci.org/facebookgo/atomicfile)
==========
Documentation: http://godoc.org/github.com/facebookgo/atomicfile

View File

@ -12,7 +12,7 @@ import (
cmdsHttp "github.com/jbenet/go-ipfs/commands/http"
core "github.com/jbenet/go-ipfs/core"
commands "github.com/jbenet/go-ipfs/core/commands"
daemon "github.com/jbenet/go-ipfs/core/daemon"
fsrepo "github.com/jbenet/go-ipfs/repo/fsrepo"
util "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/debugerror"
)
@ -89,13 +89,13 @@ func daemonFunc(req cmds.Request) (interface{}, error) {
return nil, err
}
// acquire the daemon lock _before_ constructing a node. we need to make
// acquire the repo lock _before_ constructing a node. we need to make
// sure we are permitted to access the resources (datastore, etc.)
lock, err := daemon.Lock(req.Context().ConfigRoot)
if err != nil {
repo := fsrepo.At(req.Context().ConfigRoot)
if err := repo.Open(); err != nil {
return nil, debugerror.Errorf("Couldn't obtain lock. Is another daemon already running?")
}
defer lock.Close()
defer repo.Close()
// OK!!! Now we're ready to construct the node.
// make sure we construct an online node.

View File

@ -20,7 +20,6 @@ import (
cmdsCli "github.com/jbenet/go-ipfs/commands/cli"
cmdsHttp "github.com/jbenet/go-ipfs/commands/http"
core "github.com/jbenet/go-ipfs/core"
daemon "github.com/jbenet/go-ipfs/core/daemon"
repo "github.com/jbenet/go-ipfs/repo"
config "github.com/jbenet/go-ipfs/repo/config"
fsrepo "github.com/jbenet/go-ipfs/repo/fsrepo"
@ -392,7 +391,7 @@ func commandShouldRunOnDaemon(details cmdDetails, req cmds.Request, root *cmds.C
// at this point need to know whether daemon is running. we defer
// to this point so that some commands dont open files unnecessarily.
daemonLocked := daemon.Locked(req.Context().ConfigRoot)
daemonLocked := fsrepo.LockedByOtherProcess(req.Context().ConfigRoot)
if daemonLocked {
@ -445,12 +444,7 @@ func getConfigRoot(req cmds.Request) (string, error) {
}
func loadConfig(path string) (*config.Config, error) {
r := fsrepo.At(path)
if err := r.Open(); err != nil {
return nil, err
}
defer r.Close()
return r.Config(), nil
return fsrepo.ConfigAt(path)
}
// startProfiling begins CPU profiling and returns a `stop` function to be

View File

@ -188,6 +188,7 @@ func tourGet(id tour.ID) (*tour.Topic, error) {
// TODO share func
func writeConfig(path string, cfg *config.Config) error {
// NB: This needs to run on the daemon.
r := fsrepo.At(path)
if err := r.Open(); err != nil {
return err

View File

@ -0,0 +1,15 @@
package component
import (
"io"
"github.com/jbenet/go-ipfs/repo/config"
)
type Component interface {
Open() error
io.Closer
SetPath(string)
}
type Initializer func(path string, conf *config.Config) error
type InitializationChecker func(path string) bool

View File

@ -0,0 +1,148 @@
package component
import (
common "github.com/jbenet/go-ipfs/repo/common"
config "github.com/jbenet/go-ipfs/repo/config"
serialize "github.com/jbenet/go-ipfs/repo/fsrepo/serialize"
util "github.com/jbenet/go-ipfs/util"
)
var _ Component = &ConfigComponent{}
var _ Initializer = InitConfigComponent
var _ InitializationChecker = ConfigComponentIsInitialized
// ConfigComponent abstracts the config component of the FSRepo.
// NB: create with makeConfigComponent function.
// NOT THREAD-SAFE
type ConfigComponent struct {
path string // required at instantiation
config *config.Config // assigned on Open()
}
// fsrepoConfigInit initializes the FSRepo's ConfigComponent.
func InitConfigComponent(path string, conf *config.Config) error {
if ConfigComponentIsInitialized(path) {
return nil
}
configFilename, err := config.Filename(path)
if err != nil {
return err
}
// initialization is the one time when it's okay to write to the config
// without reading the config from disk and merging any user-provided keys
// that may exist.
if err := serialize.WriteConfigFile(configFilename, conf); err != nil {
return err
}
return nil
}
// Open returns an error if the config file is not present.
func (c *ConfigComponent) Open() error {
configFilename, err := config.Filename(c.path)
if err != nil {
return err
}
conf, err := serialize.Load(configFilename)
if err != nil {
return err
}
c.config = conf
return nil
}
// Close satisfies the fsrepoComponent interface.
func (c *ConfigComponent) Close() error {
return nil // config doesn't need to be closed.
}
func (c *ConfigComponent) Config() *config.Config {
return c.config
}
// SetConfig updates the config file.
func (c *ConfigComponent) SetConfig(updated *config.Config) error {
return c.setConfigUnsynced(updated)
}
// GetConfigKey retrieves only the value of a particular key.
func (c *ConfigComponent) GetConfigKey(key string) (interface{}, error) {
filename, err := config.Filename(c.path)
if err != nil {
return nil, err
}
var cfg map[string]interface{}
if err := serialize.ReadConfigFile(filename, &cfg); err != nil {
return nil, err
}
return common.MapGetKV(cfg, key)
}
// SetConfigKey writes the value of a particular key.
func (c *ConfigComponent) SetConfigKey(key string, value interface{}) error {
filename, err := config.Filename(c.path)
if err != nil {
return err
}
var mapconf map[string]interface{}
if err := serialize.ReadConfigFile(filename, &mapconf); err != nil {
return err
}
if err := common.MapSetKV(mapconf, key, value); err != nil {
return err
}
if err := serialize.WriteConfigFile(filename, mapconf); err != nil {
return err
}
// in order to get the updated values, read updated config from the
// file-system.
conf, err := config.FromMap(mapconf)
if err != nil {
return err
}
return c.setConfigUnsynced(conf) // TODO roll this into this method
}
func (c *ConfigComponent) SetPath(p string) {
c.path = p
}
// ConfigComponentIsInitialized returns true if the repo is initialized at
// provided |path|.
func ConfigComponentIsInitialized(path string) bool {
configFilename, err := config.Filename(path)
if err != nil {
return false
}
if !util.FileExists(configFilename) {
return false
}
return true
}
// setConfigUnsynced is for private use.
func (r *ConfigComponent) setConfigUnsynced(updated *config.Config) error {
configFilename, err := config.Filename(r.path)
if err != nil {
return err
}
// to avoid clobbering user-provided keys, must read the config from disk
// as a map, write the updated struct values to the map and write the map
// to disk.
var mapconf map[string]interface{}
if err := serialize.ReadConfigFile(configFilename, &mapconf); err != nil {
return err
}
m, err := config.ToMap(updated)
if err != nil {
return err
}
for k, v := range m {
mapconf[k] = v
}
if err := serialize.WriteConfigFile(configFilename, mapconf); err != nil {
return err
}
*r.config = *updated // copy so caller cannot modify this private config
return nil
}

View File

@ -0,0 +1,64 @@
package component
import (
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
levelds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/leveldb"
ldbopts "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt"
config "github.com/jbenet/go-ipfs/repo/config"
dir "github.com/jbenet/go-ipfs/repo/fsrepo/dir"
util "github.com/jbenet/go-ipfs/util"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
)
var _ Component = &DatastoreComponent{}
var _ Initializer = InitDatastoreComponent
var _ InitializationChecker = DatastoreComponentIsInitialized
func InitDatastoreComponent(path string, conf *config.Config) error {
// The actual datastore contents are initialized lazily when Opened.
// During Init, we merely check that the directory is writeable.
dspath, err := config.DataStorePath(path)
if err != nil {
return err
}
if err := dir.Writable(dspath); err != nil {
return debugerror.Errorf("datastore: %s", err)
}
return nil
}
// DatastoreComponentIsInitialized returns true if the datastore dir exists.
func DatastoreComponentIsInitialized(path string) bool {
dspath, err := config.DataStorePath(path)
if err != nil {
return false
}
if !util.FileExists(dspath) {
return false
}
return true
}
// DatastoreComponent abstracts the datastore component of the FSRepo.
// NB: create with makeDatastoreComponent function.
type DatastoreComponent struct {
path string
ds ds2.ThreadSafeDatastoreCloser
}
// Open returns an error if the config file is not present.
func (dsc *DatastoreComponent) Open() error {
ds, err := levelds.NewDatastore(dsc.path, &levelds.Options{
Compression: ldbopts.NoCompression,
})
if err != nil {
return err
}
dsc.ds = ds
return nil
}
func (dsc *DatastoreComponent) Close() error { return dsc.ds.Close() }
func (dsc *DatastoreComponent) SetPath(p string) { dsc.path = p }
func (dsc *DatastoreComponent) Datastore() datastore.ThreadSafeDatastore { return dsc.ds }

View File

@ -1,52 +1,41 @@
package fsrepo
package counter
import (
"path"
"sync"
)
import "path"
type Counter struct {
// lock protects repos
lock sync.Mutex
// TODO this could be made into something more generic.
type Openers struct {
// repos maps repo paths to the number of openers holding an FSRepo handle
// to it
repos map[string]int
}
func NewCounter() *Counter {
return &Counter{
func NewOpenersCounter() *Openers {
return &Openers{
repos: make(map[string]int),
}
}
// Lock must be held to while performing any operation that modifies an
// FSRepo's state field. This includes Init, Open, Close, and Remove.
func (l *Counter) Lock() {
l.lock.Lock()
}
func (l *Counter) Unlock() {
l.lock.Unlock()
}
// NumOpeners returns the number of FSRepos holding a handle to the repo at
// this path. This method is not thread-safe. The caller must have this object
// locked.
func (l *Counter) NumOpeners(repoPath string) int {
func (l *Openers) NumOpeners(repoPath string) int {
return l.repos[key(repoPath)]
}
// AddOpener messages that an FSRepo holds a handle to the repo at this path.
// This method is not thread-safe. The caller must have this object locked.
func (l *Counter) AddOpener(repoPath string) {
func (l *Openers) AddOpener(repoPath string) error {
l.repos[key(repoPath)]++
return nil
}
// RemoveOpener messgaes that an FSRepo no longer holds a handle to the repo at
// this path. This method is not thread-safe. The caller must have this object
// locked.
func (l *Counter) RemoveOpener(repoPath string) {
func (l *Openers) RemoveOpener(repoPath string) error {
l.repos[key(repoPath)]--
return nil
}
func key(repoPath string) string {

24
repo/fsrepo/dir/dir.go Normal file
View File

@ -0,0 +1,24 @@
package dir
// TODO move somewhere generic
import (
"errors"
"os"
"path/filepath"
)
// Writable ensures the directory exists and is writable
func Writable(path string) error {
// Construct the path if missing
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return err
}
// Check the directory is writeable
if f, err := os.Create(filepath.Join(path, "._check_writeable")); err == nil {
os.Remove(f.Name())
} else {
return errors.New("'" + path + "' is not writeable")
}
return nil
}

22
repo/fsrepo/doc.go Normal file
View File

@ -0,0 +1,22 @@
// package fsrepo
//
// TODO explain the package roadmap...
//
// .go-ipfs/
// ├── client/
// | ├── client.lock <------ protects client/ + signals its own pid
// │ ├── ipfs-client.cpuprof
// │ ├── ipfs-client.memprof
// │ └── logs/
// ├── config
// ├── daemon/
// │ ├── daemon.lock <------ protects daemon/ + signals its own address
// │ ├── ipfs-daemon.cpuprof
// │ ├── ipfs-daemon.memprof
// │ └── logs/
// ├── datastore/
// ├── repo.lock <------ protects datastore/ and config
// └── version
package fsrepo
// TODO prevent multiple daemons from running

View File

@ -5,79 +5,145 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"path"
"sync"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
repo "github.com/jbenet/go-ipfs/repo"
common "github.com/jbenet/go-ipfs/repo/common"
config "github.com/jbenet/go-ipfs/repo/config"
opener "github.com/jbenet/go-ipfs/repo/fsrepo/opener"
util "github.com/jbenet/go-ipfs/util"
component "github.com/jbenet/go-ipfs/repo/fsrepo/component"
counter "github.com/jbenet/go-ipfs/repo/fsrepo/counter"
dir "github.com/jbenet/go-ipfs/repo/fsrepo/dir"
lockfile "github.com/jbenet/go-ipfs/repo/fsrepo/lock"
serialize "github.com/jbenet/go-ipfs/repo/fsrepo/serialize"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
)
var (
// openerCounter prevents the fsrepo from being removed while there exist open
// packageLock must be held to while performing any operation that modifies an
// FSRepo's state field. This includes Init, Open, Close, and Remove.
packageLock sync.Mutex // protects openersCounter and lockfiles
// lockfiles holds references to the Closers that ensure that repos are
// only accessed by one process at a time.
lockfiles map[string]io.Closer
// openersCounter prevents the fsrepo from being removed while there exist open
// FSRepo handles. It also ensures that the Init is atomic.
//
// packageLock also protects numOpenedRepos
//
// If an operation is used when repo is Open and the operation does not
// change the repo's state, the package lock does not need to be acquired.
openerCounter *opener.Counter
openersCounter *counter.Openers
)
func init() {
openerCounter = opener.NewCounter()
openersCounter = counter.NewOpenersCounter()
lockfiles = make(map[string]io.Closer)
}
// FSRepo represents an IPFS FileSystem Repo. It is not thread-safe.
// FSRepo represents an IPFS FileSystem Repo. It is safe for use by multiple
// callers.
type FSRepo struct {
state state
path string
config *config.Config
// state is the FSRepo's state (unopened, opened, closed)
state state
// path is the file-system path
path string
// configComponent is loaded when FSRepo is opened and kept up to date when
// the FSRepo is modified.
// TODO test
configComponent component.ConfigComponent
datastoreComponent component.DatastoreComponent
}
type componentBuilder struct {
Init component.Initializer
IsInitialized component.InitializationChecker
OpenHandler func(*FSRepo) error
}
// At returns a handle to an FSRepo at the provided |path|.
func At(path string) *FSRepo {
func At(repoPath string) *FSRepo {
// This method must not have side-effects.
return &FSRepo{
path: path,
path: path.Clean(repoPath),
state: unopened, // explicitly set for clarity
}
}
// ConfigAt returns an error if the FSRepo at the given path is not
// initialized. This function allows callers to read the config file even when
// another process is running and holding the lock.
func ConfigAt(repoPath string) (*config.Config, error) {
// packageLock must be held to ensure that the Read is atomic.
packageLock.Lock()
defer packageLock.Unlock()
configFilename, err := config.Filename(repoPath)
if err != nil {
return nil, err
}
return serialize.Load(configFilename)
}
// Init initializes a new FSRepo at the given path with the provided config.
// TODO add support for custom datastores.
func Init(path string, conf *config.Config) error {
openerCounter.Lock() // lock must be held to ensure atomicity (prevent Removal)
defer openerCounter.Unlock()
// packageLock must be held to ensure that the repo is not initialized more
// than once.
packageLock.Lock()
defer packageLock.Unlock()
if isInitializedUnsynced(path) {
return nil
}
configFilename, err := config.Filename(path)
if err != nil {
return err
}
if err := writeConfigFile(configFilename, conf); err != nil {
return err
for _, b := range componentBuilders() {
if err := b.Init(path, conf); err != nil {
return err
}
}
return nil
}
// Remove recursively removes the FSRepo at |path|.
func Remove(path string) error {
openerCounter.Lock()
defer openerCounter.Unlock()
if openerCounter.NumOpeners(path) != 0 {
func Remove(repoPath string) error {
repoPath = path.Clean(repoPath)
// packageLock must be held to ensure that the repo is not removed while
// being accessed by others.
packageLock.Lock()
defer packageLock.Unlock()
if openersCounter.NumOpeners(repoPath) != 0 {
return errors.New("repo in use")
}
return os.RemoveAll(path)
return os.RemoveAll(repoPath)
}
// LockedByOtherProcess returns true if the FSRepo is locked by another
// process. If true, then the repo cannot be opened by this process.
func LockedByOtherProcess(repoPath string) bool {
repoPath = path.Clean(repoPath)
// packageLock must be held to check the number of openers.
packageLock.Lock()
defer packageLock.Unlock()
// NB: the lock is only held when repos are Open
return lockfile.Locked(repoPath) && openersCounter.NumOpeners(repoPath) == 0
}
// Open returns an error if the repo is not initialized.
func (r *FSRepo) Open() error {
openerCounter.Lock()
defer openerCounter.Unlock()
// packageLock must be held to make sure that the repo is not destroyed by
// another caller. It must not be released until initialization is complete
// and the number of openers is incremeneted.
packageLock.Lock()
defer packageLock.Unlock()
if r.state != unopened {
return debugerror.Errorf("repo is %s", r.state)
}
@ -87,40 +153,42 @@ func (r *FSRepo) Open() error {
// check repo path, then check all constituent parts.
// TODO acquire repo lock
// TODO if err := initCheckDir(logpath); err != nil { // }
if err := initCheckDir(r.path); err != nil {
if err := dir.Writable(r.path); err != nil {
return err
}
configFilename, err := config.Filename(r.path)
if err != nil {
return err
}
conf, err := load(configFilename)
if err != nil {
return err
}
r.config = conf
// datastore
dspath, err := config.DataStorePath("")
if err != nil {
return err
}
if err := initCheckDir(dspath); err != nil {
return debugerror.Errorf("datastore: %s", err)
for _, b := range componentBuilders() {
if err := b.OpenHandler(r); err != nil {
return err
}
}
logpath, err := config.LogsPath("")
if err != nil {
return debugerror.Wrap(err)
}
if err := initCheckDir(logpath); err != nil {
if err := dir.Writable(logpath); err != nil {
return debugerror.Errorf("logs: %s", err)
}
r.state = opened
openerCounter.AddOpener(r.path)
return nil
return r.transitionToOpened()
}
// Close closes the FSRepo, releasing held resources.
func (r *FSRepo) Close() error {
packageLock.Lock()
defer packageLock.Unlock()
if r.state != opened {
return debugerror.Errorf("repo is %s", r.state)
}
for _, closer := range r.components() {
if err := closer.Close(); err != nil {
return err
}
}
return r.transitionToClosed()
}
// Config returns the FSRepo's config. This method must not be called if the
@ -128,97 +196,60 @@ func (r *FSRepo) Open() error {
//
// Result when not Open is undefined. The method may panic if it pleases.
func (r *FSRepo) Config() *config.Config {
// no lock necessary because repo is either Open (and thus protected from
// Removal) or has no side-effect
// It is not necessary to hold the package lock since the repo is in an
// opened state. The package lock is _not_ meant to ensure that the repo is
// thread-safe. The package lock is only meant to guard againt removal and
// coordinate the lockfile. However, we provide thread-safety to keep
// things simple.
packageLock.Lock()
defer packageLock.Unlock()
if r.state != opened {
panic(fmt.Sprintln("repo is", r.state))
}
return r.config
return r.configComponent.Config()
}
// SetConfig updates the FSRepo's config.
func (r *FSRepo) SetConfig(updated *config.Config) error {
// no lock required because repo should be Open
if r.state != opened {
panic(fmt.Sprintln("repo is", r.state))
}
configFilename, err := config.Filename(r.path)
if err != nil {
return err
}
// to avoid clobbering user-provided keys, must read the config from disk
// as a map, write the updated struct values to the map and write the map
// to disk.
var mapconf map[string]interface{}
if err := readConfigFile(configFilename, &mapconf); err != nil {
return err
}
m, err := config.ToMap(updated)
if err != nil {
return err
}
for k, v := range m {
mapconf[k] = v
}
if err := writeConfigFile(configFilename, mapconf); err != nil {
return err
}
*r.config = *updated // copy so caller cannot modify this private config
return nil
// packageLock is held to provide thread-safety.
packageLock.Lock()
defer packageLock.Unlock()
return r.configComponent.SetConfig(updated)
}
// GetConfigKey retrieves only the value of a particular key.
func (r *FSRepo) GetConfigKey(key string) (interface{}, error) {
packageLock.Lock()
defer packageLock.Unlock()
if r.state != opened {
return nil, debugerror.Errorf("repo is %s", r.state)
}
filename, err := config.Filename(r.path)
if err != nil {
return nil, err
}
var cfg map[string]interface{}
if err := readConfigFile(filename, &cfg); err != nil {
return nil, err
}
return common.MapGetKV(cfg, key)
return r.configComponent.GetConfigKey(key)
}
// SetConfigKey writes the value of a particular key.
func (r *FSRepo) SetConfigKey(key string, value interface{}) error {
// no lock required because repo should be Open
packageLock.Lock()
defer packageLock.Unlock()
if r.state != opened {
return debugerror.Errorf("repo is %s", r.state)
}
filename, err := config.Filename(r.path)
if err != nil {
return err
}
var mapconf map[string]interface{}
if err := readConfigFile(filename, &mapconf); err != nil {
return err
}
if err := common.MapSetKV(mapconf, key, value); err != nil {
return err
}
if err := writeConfigFile(filename, mapconf); err != nil {
return err
}
conf, err := config.FromMap(mapconf)
if err != nil {
return err
}
return r.SetConfig(conf)
return r.configComponent.SetConfigKey(key, value)
}
// Close closes the FSRepo, releasing held resources.
func (r *FSRepo) Close() error {
openerCounter.Lock()
defer openerCounter.Unlock()
if r.state != opened {
return debugerror.Errorf("repo is %s", r.state)
}
openerCounter.RemoveOpener(r.path)
return nil // TODO release repo lock
// Datastore returns a repo-owned datastore. If FSRepo is Closed, return value
// is undefined.
func (r *FSRepo) Datastore() ds.ThreadSafeDatastore {
packageLock.Lock()
d := r.datastoreComponent.Datastore()
packageLock.Unlock()
return d
}
var _ io.Closer = &FSRepo{}
@ -226,35 +257,100 @@ var _ repo.Repo = &FSRepo{}
// IsInitialized returns true if the repo is initialized at provided |path|.
func IsInitialized(path string) bool {
openerCounter.Lock()
defer openerCounter.Unlock()
// packageLock is held to ensure that another caller doesn't attempt to
// Init or Remove the repo while this call is in progress.
packageLock.Lock()
defer packageLock.Unlock()
return isInitializedUnsynced(path)
}
// private methods below this point. NB: packageLock must held by caller.
// isInitializedUnsynced reports whether the repo is initialized. Caller must
// hold openerCounter lock.
// hold the packageLock.
func isInitializedUnsynced(path string) bool {
configFilename, err := config.Filename(path)
if err != nil {
return false
}
if !util.FileExists(configFilename) {
return false
for _, b := range componentBuilders() {
if !b.IsInitialized(path) {
return false
}
}
return true
}
// initCheckDir ensures the directory exists and is writable
func initCheckDir(path string) error {
// Construct the path if missing
if err := os.MkdirAll(path, os.ModePerm); err != nil {
// transitionToOpened manages the state transition to |opened|. Caller must hold
// the package mutex.
func (r *FSRepo) transitionToOpened() error {
r.state = opened
if countBefore := openersCounter.NumOpeners(r.path); countBefore == 0 { // #first
closer, err := lockfile.Lock(r.path)
if err != nil {
return err
}
lockfiles[r.path] = closer
}
return openersCounter.AddOpener(r.path)
}
// transitionToClosed manages the state transition to |closed|. Caller must
// hold the package mutex.
func (r *FSRepo) transitionToClosed() error {
r.state = closed
if err := openersCounter.RemoveOpener(r.path); err != nil {
return err
}
// Check the directory is writeable
if f, err := os.Create(filepath.Join(path, "._check_writeable")); err == nil {
os.Remove(f.Name())
} else {
return debugerror.New("'" + path + "' is not writeable")
if countAfter := openersCounter.NumOpeners(r.path); countAfter == 0 {
closer, ok := lockfiles[r.path]
if !ok {
return errors.New("package error: lockfile is not held")
}
if err := closer.Close(); err != nil {
return err
}
delete(lockfiles, r.path)
}
return nil
}
// components returns the FSRepo's constituent components
func (r *FSRepo) components() []component.Component {
return []component.Component{
&r.configComponent,
&r.datastoreComponent,
}
}
func componentBuilders() []componentBuilder {
return []componentBuilder{
// ConfigComponent
componentBuilder{
Init: component.InitConfigComponent,
IsInitialized: component.ConfigComponentIsInitialized,
OpenHandler: func(r *FSRepo) error {
c := component.ConfigComponent{}
c.SetPath(r.path)
if err := c.Open(); err != nil {
return err
}
r.configComponent = c
return nil
},
},
// DatastoreComponent
componentBuilder{
Init: component.InitDatastoreComponent,
IsInitialized: component.DatastoreComponentIsInitialized,
OpenHandler: func(r *FSRepo) error {
c := component.DatastoreComponent{}
c.SetPath(r.path)
if err := c.Open(); err != nil {
return err
}
r.datastoreComponent = c
return nil
},
},
}
}

View File

@ -1,12 +1,15 @@
package fsrepo
import (
"bytes"
"io/ioutil"
"testing"
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/jbenet/go-ipfs/repo/config"
)
// swap arg order
func testRepoPath(p string, t *testing.T) string {
name, err := ioutil.TempDir("", p)
if err != nil {
@ -15,7 +18,22 @@ func testRepoPath(p string, t *testing.T) string {
return name
}
func TestInitIdempotence(t *testing.T) {
t.Parallel()
path := testRepoPath("", t)
for i := 0; i < 10; i++ {
AssertNil(Init(path, &config.Config{}), t, "multiple calls to init should succeed")
}
}
func TestRemove(t *testing.T) {
t.Parallel()
path := testRepoPath("foo", t)
AssertNil(Remove(path), t, "should be able to remove after closed")
}
func TestCannotRemoveIfOpen(t *testing.T) {
t.Parallel()
path := testRepoPath("TestCannotRemoveIfOpen", t)
AssertNil(Init(path, &config.Config{}), t, "should initialize successfully")
r := At(path)
@ -25,19 +43,37 @@ func TestCannotRemoveIfOpen(t *testing.T) {
AssertNil(Remove(path), t, "should be able to remove after closed")
}
func TestCannotBeReopened(t *testing.T) {
t.Parallel()
path := testRepoPath("", t)
AssertNil(Init(path, &config.Config{}), t)
r := At(path)
AssertNil(r.Open(), t)
AssertNil(r.Close(), t)
AssertErr(r.Open(), t, "shouldn't be possible to re-open the repo")
// mutable state is the enemy. Take Close() as an opportunity to reduce
// entropy. Callers ought to start fresh with a new handle by calling `At`.
}
func TestCanManageReposIndependently(t *testing.T) {
t.Parallel()
pathA := testRepoPath("a", t)
pathB := testRepoPath("b", t)
t.Log("initialize two repos")
AssertNil(Init(pathA, &config.Config{}), t, "should initialize successfully")
AssertNil(Init(pathB, &config.Config{}), t, "should initialize successfully")
AssertNil(Init(pathA, &config.Config{}), t, "a", "should initialize successfully")
AssertNil(Init(pathB, &config.Config{}), t, "b", "should initialize successfully")
t.Log("ensure repos initialized")
Assert(IsInitialized(pathA), t, "a should be initialized")
Assert(IsInitialized(pathB), t, "b should be initialized")
t.Log("open the two repos")
repoA := At(pathA)
repoB := At(pathB)
AssertNil(repoA.Open(), t)
AssertNil(repoB.Open(), t)
AssertNil(repoA.Open(), t, "a")
AssertNil(repoB.Open(), t, "b")
t.Log("close and remove b while a is open")
AssertNil(repoB.Close(), t, "close b")
@ -48,14 +84,61 @@ func TestCanManageReposIndependently(t *testing.T) {
AssertNil(Remove(pathA), t)
}
func TestDatastoreGetNotAllowedAfterClose(t *testing.T) {
t.Parallel()
path := testRepoPath("test", t)
Assert(!IsInitialized(path), t, "should NOT be initialized")
AssertNil(Init(path, &config.Config{}), t, "should initialize successfully")
r := At(path)
AssertNil(r.Open(), t, "should open successfully")
k := "key"
data := []byte(k)
AssertNil(r.Datastore().Put(datastore.NewKey(k), data), t, "Put should be successful")
AssertNil(r.Close(), t)
_, err := r.Datastore().Get(datastore.NewKey(k))
AssertErr(err, t, "after closer, Get should be fail")
}
func TestDatastorePersistsFromRepoToRepo(t *testing.T) {
t.Parallel()
path := testRepoPath("test", t)
AssertNil(Init(path, &config.Config{}), t)
r1 := At(path)
AssertNil(r1.Open(), t)
k := "key"
expected := []byte(k)
AssertNil(r1.Datastore().Put(datastore.NewKey(k), expected), t, "using first repo, Put should be successful")
AssertNil(r1.Close(), t)
r2 := At(path)
AssertNil(r2.Open(), t)
v, err := r2.Datastore().Get(datastore.NewKey(k))
AssertNil(err, t, "using second repo, Get should be successful")
actual, ok := v.([]byte)
Assert(ok, t, "value should be the []byte from r1's Put")
AssertNil(r2.Close(), t)
Assert(bytes.Compare(expected, actual) == 0, t, "data should match")
}
func AssertNil(err error, t *testing.T, msgs ...string) {
if err != nil {
t.Error(msgs, "error:", err)
t.Fatal(msgs, "error:", err)
}
}
func Assert(v bool, t *testing.T, msgs ...string) {
if !v {
t.Fatal(msgs)
}
}
func AssertErr(err error, t *testing.T, msgs ...string) {
if err == nil {
t.Error(msgs, "error:", err)
t.Fatal(msgs, "error:", err)
}
}

View File

@ -1,4 +1,4 @@
package daemon
package lock
import (
"io"
@ -10,6 +10,7 @@ import (
)
// LockFile is the filename of the daemon lock, relative to config dir
// TODO rename repo lock and hide name
const LockFile = "daemon.lock"
func Lock(confdir string) (io.Closer, error) {
@ -23,7 +24,6 @@ func Locked(confdir string) bool {
}
if lk, err := Lock(confdir); err != nil {
return true
} else {
lk.Close()
return false

View File

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/facebookgo/atomicfile"
"github.com/jbenet/go-ipfs/repo/config"
"github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/debugerror"
@ -14,8 +15,8 @@ import (
var log = util.Logger("fsrepo")
// readConfigFile reads the config from `filename` into `cfg`.
func readConfigFile(filename string, cfg interface{}) error {
// ReadConfigFile reads the config from `filename` into `cfg`.
func ReadConfigFile(filename string, cfg interface{}) error {
f, err := os.Open(filename)
if err != nil {
return err
@ -27,14 +28,14 @@ func readConfigFile(filename string, cfg interface{}) error {
return nil
}
// writeConfigFile writes the config from `cfg` into `filename`.
func writeConfigFile(filename string, cfg interface{}) error {
// WriteConfigFile writes the config from `cfg` into `filename`.
func WriteConfigFile(filename string, cfg interface{}) error {
err := os.MkdirAll(filepath.Dir(filename), 0775)
if err != nil {
return err
}
f, err := os.Create(filename)
f, err := atomicfile.New(filename, 0775)
if err != nil {
return err
}
@ -43,23 +44,6 @@ func writeConfigFile(filename string, cfg interface{}) error {
return encode(f, cfg)
}
// writeFile writes the buffer at filename
func writeFile(filename string, buf []byte) error {
err := os.MkdirAll(filepath.Dir(filename), 0775)
if err != nil {
return err
}
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(buf)
return err
}
// encode configuration with JSON
func encode(w io.Writer, value interface{}) error {
// need to prettyprint, hence MarshalIndent, instead of Encoder
@ -71,20 +55,21 @@ func encode(w io.Writer, value interface{}) error {
return err
}
// load reads given file and returns the read config, or error.
func load(filename string) (*config.Config, error) {
// Load reads given file and returns the read config, or error.
func Load(filename string) (*config.Config, error) {
// if nothing is there, fail. User must run 'ipfs init'
if !util.FileExists(filename) {
return nil, debugerror.New("ipfs not initialized, please run 'ipfs init'")
}
var cfg config.Config
err := readConfigFile(filename, &cfg)
err := ReadConfigFile(filename, &cfg)
if err != nil {
return nil, err
}
// tilde expansion on datastore path
// TODO why is this here??
cfg.Datastore.Path, err = util.TildeExpansion(cfg.Datastore.Path)
if err != nil {
return nil, err

View File

@ -11,11 +11,11 @@ func TestConfig(t *testing.T) {
const dsPath = "/path/to/datastore"
cfgWritten := new(config.Config)
cfgWritten.Datastore.Path = dsPath
err := writeConfigFile(filename, cfgWritten)
err := WriteConfigFile(filename, cfgWritten)
if err != nil {
t.Error(err)
}
cfgRead, err := load(filename)
cfgRead, err := Load(filename)
if err != nil {
t.Error(err)
return

View File

@ -1,6 +1,7 @@
package repo
import (
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
config "github.com/jbenet/go-ipfs/repo/config"
util "github.com/jbenet/go-ipfs/util"
)
@ -11,6 +12,8 @@ type Repo interface {
SetConfigKey(key string, value interface{}) error
GetConfigKey(key string) (interface{}, error)
Datastore() datastore.ThreadSafeDatastore
}
// IsInitialized returns true if the path is home to an initialized IPFS