From 9ab59e44ad25d241ca734553844b47db8866b5b2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 8 Mar 2015 17:59:42 -0700 Subject: [PATCH 1/9] implement in memory model for ipns filesystem, to be used as backing for ipns fuse interface --- core/core.go | 22 ++- core/mock.go | 7 +- ipnsfs/dir.go | 340 ++++++++++++++++++++++++++++++++++++++++++ ipnsfs/file.go | 145 ++++++++++++++++++ ipnsfs/system.go | 285 +++++++++++++++++++++++++++++++++++ ipnsfs/system_test.go | 92 ++++++++++++ 6 files changed, 888 insertions(+), 3 deletions(-) create mode 100644 ipnsfs/dir.go create mode 100644 ipnsfs/file.go create mode 100644 ipnsfs/system.go create mode 100644 ipnsfs/system_test.go diff --git a/core/core.go b/core/core.go index 45767728b..5d5eb3df4 100644 --- a/core/core.go +++ b/core/core.go @@ -37,6 +37,7 @@ import ( rp "github.com/jbenet/go-ipfs/exchange/reprovide" mount "github.com/jbenet/go-ipfs/fuse/mount" + ipnsfs "github.com/jbenet/go-ipfs/ipnsfs" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" path "github.com/jbenet/go-ipfs/path" @@ -89,6 +90,8 @@ type IpfsNode struct { Diagnostics *diag.Diagnostics // the diagnostics service Reprovider *rp.Reprovider // the value reprovider system + IpnsFs *ipnsfs.Filesystem + ctxgroup.ContextGroup mode mode @@ -138,6 +141,16 @@ func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG) } node.Resolver = &path.Resolver{DAG: node.DAG} + + // Setup the mutable ipns filesystem structure + if node.OnlineMode() { + fs, err := ipnsfs.NewFilesystem(ctx, node.DAG, node.Namesys, node.Pinning, node.PrivateKey) + if err != nil { + return nil, debugerror.Wrap(err) + } + node.IpnsFs = fs + } + success = true return node, nil } @@ -268,6 +281,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost // setup name system n.Namesys = namesys.NewNameSystem(n.Routing) + return nil } @@ -278,7 +292,6 @@ func (n *IpfsNode) teardown() error { // owned objects are closed in this teardown to ensure that they're closed // regardless of which constructor was used to add them to the node. closers := []io.Closer{ - n.Blocks, n.Exchange, n.Repo, } @@ -288,6 +301,13 @@ func (n *IpfsNode) teardown() error { } } + if n.Blocks != nil { + addCloser(n.Blocks) + } + if n.IpnsFs != nil { + addCloser(n.IpnsFs) + } + addCloser(n.Bootstrapper) if dht, ok := n.Routing.(*dht.IpfsDHT); ok { addCloser(dht) diff --git a/core/mock.go b/core/mock.go index 987ba02b0..149795cef 100644 --- a/core/mock.go +++ b/core/mock.go @@ -13,8 +13,9 @@ import ( mocknet "github.com/jbenet/go-ipfs/p2p/net/mock" peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" + pin "github.com/jbenet/go-ipfs/pin" "github.com/jbenet/go-ipfs/repo" - mockrouting "github.com/jbenet/go-ipfs/routing/mock" + offrt "github.com/jbenet/go-ipfs/routing/offline" ds2 "github.com/jbenet/go-ipfs/util/datastore2" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -54,7 +55,7 @@ func NewMockNode() (*IpfsNode, error) { } // Routing - nd.Routing = mockrouting.NewServer().Client(ident) + nd.Routing = offrt.NewOfflineRouter(nd.Repo.Datastore(), nd.PrivateKey) // Bitswap bstore := blockstore.NewBlockstore(nd.Repo.Datastore()) @@ -65,6 +66,8 @@ func NewMockNode() (*IpfsNode, error) { nd.DAG = mdag.NewDAGService(bserv) + nd.Pinning = pin.NewPinner(nd.Repo.Datastore(), nd.DAG) + // Namespace resolver nd.Namesys = nsys.NewNameSystem(nd.Routing) diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go new file mode 100644 index 000000000..979ace875 --- /dev/null +++ b/ipnsfs/dir.go @@ -0,0 +1,340 @@ +package ipnsfs + +import ( + "errors" + "fmt" + "os" + "sync" + + dag "github.com/jbenet/go-ipfs/merkledag" + ft "github.com/jbenet/go-ipfs/unixfs" + ufspb "github.com/jbenet/go-ipfs/unixfs/pb" +) + +type Directory struct { + fs *Filesystem + parent childCloser + childDirs map[string]*Directory + files map[string]*file + + node *dag.Node + name string + lock sync.Mutex + + ref int + refLock sync.Mutex +} + +func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory { + return &Directory{ + fs: fs, + name: name, + node: node, + parent: parent, + childDirs: make(map[string]*Directory), + files: make(map[string]*file), + } +} + +func (d *Directory) Open(tpath []string, mode int) (File, error) { + if len(tpath) == 0 { + return nil, ErrIsDirectory + } + if len(tpath) == 1 { + fi, err := d.childFile(tpath[0]) + if err == nil { + return fi.withMode(mode), nil + } + + if mode|os.O_CREATE != 0 { + fnode := new(dag.Node) + fnode.Data = ft.FilePBData(nil, 0) + nfi, err := NewFile(tpath[0], fnode, d, d.fs) + if err != nil { + return nil, err + } + d.files[tpath[0]] = nfi + return nfi.withMode(mode), nil + } + + return nil, ErrNoSuch + } + + dir, err := d.childDir(tpath[0]) + if err != nil { + return nil, err + } + return dir.Open(tpath[1:], mode) +} + +// consider combining into a single method... +type childCloser interface { + closeChild(string, *dag.Node) error +} + +func (d *Directory) closeChild(name string, nd *dag.Node) error { + _, err := d.fs.dserv.Add(nd) + if err != nil { + return err + } + + d.lock.Lock() + err = d.node.RemoveNodeLink(name) + if err != nil && err != dag.ErrNotFound { + d.lock.Unlock() + return err + } + + err = d.node.AddNodeLinkClean(name, nd) + if err != nil { + d.lock.Unlock() + return err + } + d.lock.Unlock() + + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) Type() NodeType { + return TDir +} + +func (d *Directory) childFile(name string) (*file, error) { + fi, ok := d.files[name] + if ok { + return fi, nil + } + + // search dag + for _, lnk := range d.node.Links { + if lnk.Name == name { + nd, err := lnk.GetNode(d.fs.dserv) + if err != nil { + return nil, err + } + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + return nil, ErrIsDirectory + case ufspb.Data_File: + nfi, err := NewFile(name, nd, d, d.fs) + if err != nil { + return nil, err + } + d.files[name] = nfi + return nfi, nil + case ufspb.Data_Metadata: + panic("NOT YET IMPLEMENTED") + default: + panic("NO!") + } + } + } + return nil, ErrNoSuch +} + +func (d *Directory) childDir(name string) (*Directory, error) { + dir, ok := d.childDirs[name] + if ok { + return dir, nil + } + + for _, lnk := range d.node.Links { + if lnk.Name == name { + nd, err := lnk.GetNode(d.fs.dserv) + if err != nil { + return nil, err + } + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + ndir := NewDirectory(name, nd, d, d.fs) + d.childDirs[name] = ndir + return ndir, nil + case ufspb.Data_File: + return nil, fmt.Errorf("%s is not a directory", name) + case ufspb.Data_Metadata: + panic("NOT YET IMPLEMENTED") + default: + panic("NO!") + } + } + + } + + return nil, ErrNoSuch +} + +func (d *Directory) Child(name string) (FSNode, error) { + d.lock.Lock() + defer d.lock.Unlock() + dir, err := d.childDir(name) + if err == nil { + return dir, nil + } + fi, err := d.childFile(name) + if err == nil { + return fi, nil + } + + return nil, ErrNoSuch +} + +func (d *Directory) List() []string { + d.lock.Lock() + defer d.lock.Unlock() + + var out []string + for _, lnk := range d.node.Links { + out = append(out, lnk.Name) + } + return out +} + +func (d *Directory) Mkdir(name string) (*Directory, error) { + d.lock.Lock() + + _, err := d.childDir(name) + if err == nil { + d.lock.Unlock() + return nil, errors.New("directory by that name already exists") + } + _, err = d.childFile(name) + if err == nil { + d.lock.Unlock() + return nil, errors.New("file by that name already exists") + } + + ndir := &dag.Node{Data: ft.FolderPBData()} + err = d.node.AddNodeLinkClean(name, ndir) + if err != nil { + d.lock.Unlock() + return nil, err + } + d.lock.Unlock() + + err = d.parent.closeChild(d.name, d.node) + if err != nil { + return nil, err + } + + d.lock.Lock() + defer d.lock.Unlock() + + return d.childDir(name) +} + +func (d *Directory) Unlink(name string) error { + d.lock.Lock() + delete(d.childDirs, name) + delete(d.files, name) + + err := d.node.RemoveNodeLink(name) + if err != nil { + d.lock.Unlock() + return err + } + d.lock.Unlock() + + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) RenameEntry(oldname, newname string) error { + dir, err := d.childDir(oldname) + if err == nil { + dir.name = newname + + err := d.node.RemoveNodeLink(oldname) + if err != nil { + return err + } + err = d.node.AddNodeLinkClean(newname, dir.node) + if err != nil { + return err + } + + delete(d.childDirs, oldname) + d.childDirs[newname] = dir + return d.parent.closeChild(d.name, d.node) + } + + fi, err := d.childFile(oldname) + if err == nil { + fi.name = newname + + err := d.node.RemoveNodeLink(oldname) + if err != nil { + return err + } + + nd, err := fi.GetNode() + if err != nil { + return err + } + + err = d.node.AddNodeLinkClean(newname, nd) + if err != nil { + return err + } + + delete(d.childDirs, oldname) + d.files[newname] = fi + return d.parent.closeChild(d.name, d.node) + } + return ErrNoSuch +} + +func (d *Directory) AddChild(name string, nd *dag.Node) error { + pbn, err := ft.FromBytes(nd.Data) + if err != nil { + return err + } + + _, err = d.Child(name) + if err == nil { + return errors.New("directory already has entry by that name") + } + + err = d.node.AddNodeLinkClean(name, nd) + if err != nil { + return err + } + + switch pbn.GetType() { + case ft.TDirectory: + d.childDirs[name] = NewDirectory(name, nd, d, d.fs) + case ft.TFile, ft.TMetadata, ft.TRaw: + nfi, err := NewFile(name, nd, d, d.fs) + if err != nil { + return err + } + d.files[name] = nfi + default: + panic("invalid unixfs node") + } + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) GetNode() (*dag.Node, error) { + return d.node, nil +} + +func (d *Directory) Upref() { + d.refLock.Lock() + d.ref++ + d.refLock.Unlock() +} + +func (d *Directory) Deref() { + d.refLock.Lock() + d.ref-- + d.refLock.Unlock() +} diff --git a/ipnsfs/file.go b/ipnsfs/file.go new file mode 100644 index 000000000..d5c951690 --- /dev/null +++ b/ipnsfs/file.go @@ -0,0 +1,145 @@ +package ipnsfs + +import ( + "errors" + "io" + "os" + "sync" + + chunk "github.com/jbenet/go-ipfs/importer/chunk" + dag "github.com/jbenet/go-ipfs/merkledag" + mod "github.com/jbenet/go-ipfs/unixfs/mod" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +type File interface { + io.ReadWriteCloser + io.WriterAt + Seek(int64, int) (int64, error) + Size() (int64, error) + Flush() error + Truncate(int64) error + FSNode +} + +type file struct { + parent childCloser + fs *Filesystem + + name string + hasChanges bool + + // TODO: determine whether or not locking here is actually required... + lk sync.Mutex + mod *mod.DagModifier +} + +func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*file, error) { + dmod, err := mod.NewDagModifier(context.TODO(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) + if err != nil { + return nil, err + } + + return &file{ + fs: fs, + parent: parent, + name: name, + mod: dmod, + }, nil +} + +func (fi *file) Write(b []byte) (int, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + fi.hasChanges = true + return fi.mod.Write(b) +} + +func (fi *file) Read(b []byte) (int, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Read(b) +} + +func (fi *file) Close() error { + fi.lk.Lock() + defer fi.lk.Unlock() + if fi.hasChanges { + err := fi.mod.Flush() + if err != nil { + return err + } + + nd, err := fi.mod.GetNode() + if err != nil { + return err + } + + err = fi.parent.closeChild(fi.name, nd) + if err != nil { + return err + } + + fi.hasChanges = false + } + + return nil +} + +func (fi *file) Flush() error { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Flush() +} + +func (fi *file) withMode(mode int) File { + if mode == os.O_RDONLY { + return &readOnlyFile{fi} + } + return fi +} + +func (fi *file) Seek(offset int64, whence int) (int64, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Seek(offset, whence) +} + +func (fi *file) WriteAt(b []byte, at int64) (int, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + fi.hasChanges = true + return fi.mod.WriteAt(b, at) +} + +func (fi *file) Size() (int64, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.Size() +} + +func (fi *file) GetNode() (*dag.Node, error) { + fi.lk.Lock() + defer fi.lk.Unlock() + return fi.mod.GetNode() +} + +func (fi *file) Truncate(size int64) error { + fi.lk.Lock() + defer fi.lk.Unlock() + fi.hasChanges = true + return fi.mod.Truncate(size) +} + +func (fi *file) Type() NodeType { + return TFile +} + +type readOnlyFile struct { + *file +} + +func (ro *readOnlyFile) Write([]byte) (int, error) { + return 0, errors.New("permission denied: file readonly") +} diff --git a/ipnsfs/system.go b/ipnsfs/system.go new file mode 100644 index 000000000..e3f25fcc6 --- /dev/null +++ b/ipnsfs/system.go @@ -0,0 +1,285 @@ +package ipnsfs + +import ( + "errors" + "fmt" + "strings" + "time" + + dag "github.com/jbenet/go-ipfs/merkledag" + namesys "github.com/jbenet/go-ipfs/namesys" + ci "github.com/jbenet/go-ipfs/p2p/crypto" + pin "github.com/jbenet/go-ipfs/pin" + ft "github.com/jbenet/go-ipfs/unixfs" + u "github.com/jbenet/go-ipfs/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" +) + +var log = eventlog.Logger("ipnsfs") + +var ErrIsDirectory = errors.New("error: is a directory") + +var ErrNoSuch = errors.New("no such file or directory") + +// Filesystem is the writeable fuse filesystem structure +type Filesystem struct { + dserv dag.DAGService + + nsys namesys.NameSystem + + pins pin.Pinner + + roots map[string]*KeyRoot +} + +func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) { + roots := make(map[string]*KeyRoot) + fs := &Filesystem{ + roots: roots, + nsys: nsys, + dserv: ds, + pins: pins, + } + for _, k := range keys { + pkh, err := k.GetPublic().Hash() + if err != nil { + return nil, err + } + + root, err := fs.NewKeyRoot(ctx, k) + if err != nil { + return nil, err + } + roots[u.Key(pkh).Pretty()] = root + } + + return fs, nil +} + +func (fs *Filesystem) Open(tpath string, mode int) (File, error) { + pathelem := strings.Split(tpath, "/") + r, ok := fs.roots[pathelem[0]] + if !ok { + return nil, ErrNoSuch + } + + return r.Open(pathelem[1:], mode) +} + +func (fs *Filesystem) Close() error { + for _, r := range fs.roots { + err := r.Publish(context.TODO()) + if err != nil { + return err + } + } + return nil +} + +func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { + r, ok := fs.roots[name] + if ok { + return r, nil + } + return nil, ErrNoSuch +} + +type NodeType int + +const ( + TFile NodeType = iota + TDir +) + +type FSNode interface { + GetNode() (*dag.Node, error) + Type() NodeType +} + +// KeyRoot represents the root of a filesystem tree pointed to by a given keypair +type KeyRoot struct { + key ci.PrivKey + + // node is the merkledag node pointed to by this keypair + node *dag.Node + + // A pointer to the filesystem to access components + fs *Filesystem + + // val represents the node pointed to by this key. It can either be a File or a Directory + val FSNode + + repub *Republisher +} + +func (fs *Filesystem) NewKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { + hash, err := k.GetPublic().Hash() + if err != nil { + return nil, err + } + + name := u.Key(hash).Pretty() + + root := new(KeyRoot) + root.key = k + root.fs = fs + + ctx, cancel := context.WithCancel(parent) + defer cancel() + + pointsTo, err := fs.nsys.Resolve(ctx, name) + if err != nil { + err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k) + if err != nil { + return nil, err + } + + pointsTo, err = fs.nsys.Resolve(ctx, name) + if err != nil { + return nil, err + } + } + + mnode, err := fs.dserv.Get(pointsTo) + if err != nil { + return nil, err + } + + root.node = mnode + + root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3) + go root.repub.Run(parent) + + pbn, err := ft.FromBytes(mnode.Data) + if err != nil { + log.Error("IPNS pointer was not unixfs node") + return nil, err + } + + switch pbn.GetType() { + case ft.TDirectory: + root.val = NewDirectory(pointsTo.B58String(), mnode, root, fs) + case ft.TFile, ft.TMetadata, ft.TRaw: + fi, err := NewFile(pointsTo.B58String(), mnode, root, fs) + if err != nil { + return nil, err + } + root.val = fi + default: + panic("unrecognized! (NYI)") + } + return root, nil +} + +func (kr *KeyRoot) GetValue() FSNode { + return kr.val +} + +func (kr *KeyRoot) Open(tpath []string, mode int) (File, error) { + if kr.val == nil { + // No entry here... what should we do? + panic("nyi") + } + if len(tpath) > 0 { + // Make sure our root is a directory + dir, ok := kr.val.(*Directory) + if !ok { + return nil, fmt.Errorf("no such file or directory: %s", tpath[0]) + } + + return dir.Open(tpath, mode) + } + + switch t := kr.val.(type) { + case *Directory: + return nil, ErrIsDirectory + case File: + return t, nil + default: + panic("unrecognized type, should not happen") + } +} + +// closeChild implements the childCloser interface, and signals to the publisher that +// there are changes ready to be published +func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error { + kr.repub.Touch() + return nil +} + +// Publish publishes the ipns entry associated with this key +func (kr *KeyRoot) Publish(ctx context.Context) error { + child, ok := kr.val.(FSNode) + if !ok { + return errors.New("child of key root not valid type") + } + + nd, err := child.GetNode() + if err != nil { + return err + } + + k, err := kr.fs.dserv.Add(nd) + if err != nil { + return err + } + + fmt.Println("Publishing!") + return kr.fs.nsys.Publish(ctx, kr.key, k) +} + +// Republisher manages when to publish the ipns entry associated with a given key +type Republisher struct { + TimeoutLong time.Duration + TimeoutShort time.Duration + Publish chan struct{} + root *KeyRoot +} + +func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { + return &Republisher{ + TimeoutShort: tshort, + TimeoutLong: tlong, + Publish: make(chan struct{}, 1), + root: root, + } +} + +func (np *Republisher) Touch() { + select { + case np.Publish <- struct{}{}: + default: + } +} + +func (np *Republisher) Run(ctx context.Context) { + for { + select { + case <-np.Publish: + quick := time.After(np.TimeoutShort) + longer := time.After(np.TimeoutLong) + + wait: + select { + case <-quick: + case <-longer: + case <-ctx.Done(): + return + case <-np.Publish: + quick = time.After(np.TimeoutShort) + goto wait + } + + log.Info("Publishing Changes!") + err := np.root.Publish(ctx) + if err != nil { + log.Critical("republishRoot error: %s", err) + } + + case <-ctx.Done(): + return + } + } +} diff --git a/ipnsfs/system_test.go b/ipnsfs/system_test.go new file mode 100644 index 000000000..355cd3f87 --- /dev/null +++ b/ipnsfs/system_test.go @@ -0,0 +1,92 @@ +package ipnsfs_test + +import ( + "bytes" + "io/ioutil" + "os" + "path" + "testing" + + core "github.com/jbenet/go-ipfs/core" + . "github.com/jbenet/go-ipfs/ipnsfs" + u "github.com/jbenet/go-ipfs/util" +) + +func testFS(t *testing.T, nd *core.IpfsNode) *Filesystem { + fs, err := NewFilesystem(nd.Context(), nd.DAG, nd.Namesys, nd.Pinning, nd.PrivateKey) + if err != nil { + t.Fatal(err) + } + + return fs +} + +func TestBasic(t *testing.T) { + mock, err := core.NewMockNode() + if err != nil { + t.Fatal(err) + } + + fs := testFS(t, mock) + + k := u.Key(mock.Identity) + p := path.Join(k.B58String(), "file") + fi, err := fs.Open(p, os.O_CREATE) + if err != nil { + t.Fatal(err) + } + + data := []byte("Hello World") + n, err := fi.Write(data) + if err != nil { + t.Fatal(err) + } + + if n != len(data) { + t.Fatal("wrote incorrect amount") + } + + err = fi.Close() + if err != nil { + t.Fatal(err) + } + + nfi, err := fs.Open(p, os.O_RDONLY) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(nfi) + if err != nil { + t.Fatal(err) + } + + err = nfi.Close() + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(out, data) { + t.Fatal("Write failed.") + } + + err = fs.Close() + if err != nil { + t.Fatal(err) + } + + // Open the filesystem again, and try to read our file + nfs := testFS(t, mock) + + fi, err = nfs.Open(p, os.O_RDONLY) + nb, err := ioutil.ReadAll(fi) + if err != nil { + t.Fatal(err) + } + + t.Log(nb) + + if !bytes.Equal(nb, data) { + t.Fatal("data not the same after closing down fs") + } +} From 4f76e473d06409aaf24c221a2d2a23f352e14c4b Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 12 Mar 2015 13:34:52 -0700 Subject: [PATCH 2/9] a bit of cleanup --- ipnsfs/dir.go | 17 ++++++++++++----- ipnsfs/file.go | 23 +---------------------- ipnsfs/system_test.go | 2 ++ 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go index 979ace875..4ab665bb1 100644 --- a/ipnsfs/dir.go +++ b/ipnsfs/dir.go @@ -11,6 +11,9 @@ import ( ufspb "github.com/jbenet/go-ipfs/unixfs/pb" ) +var ErrNotYetImplemented = errors.New("not yet implemented") +var ErrInvalidChild = errors.New("invalid child node") + type Directory struct { fs *Filesystem parent childCloser @@ -128,9 +131,9 @@ func (d *Directory) childFile(name string) (*file, error) { d.files[name] = nfi return nfi, nil case ufspb.Data_Metadata: - panic("NOT YET IMPLEMENTED") + return nil, ErrNotYetImplemented default: - panic("NO!") + return nil, ErrInvalidChild } } } @@ -162,9 +165,9 @@ func (d *Directory) childDir(name string) (*Directory, error) { case ufspb.Data_File: return nil, fmt.Errorf("%s is not a directory", name) case ufspb.Data_Metadata: - panic("NOT YET IMPLEMENTED") + return nil, ErrNotYetImplemented default: - panic("NO!") + return nil, ErrInvalidChild } } @@ -247,7 +250,9 @@ func (d *Directory) Unlink(name string) error { return d.parent.closeChild(d.name, d.node) } +// RenameEntry renames the child by 'oldname' of this directory to 'newname' func (d *Directory) RenameEntry(oldname, newname string) error { + // Is the child a directory? dir, err := d.childDir(oldname) if err == nil { dir.name = newname @@ -266,6 +271,7 @@ func (d *Directory) RenameEntry(oldname, newname string) error { return d.parent.closeChild(d.name, d.node) } + // Is the child a file? fi, err := d.childFile(oldname) if err == nil { fi.name = newname @@ -292,6 +298,7 @@ func (d *Directory) RenameEntry(oldname, newname string) error { return ErrNoSuch } +// AddChild adds the node 'nd' under this directory giving it the name 'name' func (d *Directory) AddChild(name string, nd *dag.Node) error { pbn, err := ft.FromBytes(nd.Data) if err != nil { @@ -318,7 +325,7 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error { } d.files[name] = nfi default: - panic("invalid unixfs node") + return ErrInvalidChild } return d.parent.closeChild(d.name, d.node) } diff --git a/ipnsfs/file.go b/ipnsfs/file.go index d5c951690..84cd764d6 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -4,7 +4,6 @@ import ( "errors" "io" "os" - "sync" chunk "github.com/jbenet/go-ipfs/importer/chunk" dag "github.com/jbenet/go-ipfs/merkledag" @@ -30,13 +29,11 @@ type file struct { name string hasChanges bool - // TODO: determine whether or not locking here is actually required... - lk sync.Mutex mod *mod.DagModifier } func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*file, error) { - dmod, err := mod.NewDagModifier(context.TODO(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) + dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) if err != nil { return nil, err } @@ -50,21 +47,15 @@ func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (* } func (fi *file) Write(b []byte) (int, error) { - fi.lk.Lock() - defer fi.lk.Unlock() fi.hasChanges = true return fi.mod.Write(b) } func (fi *file) Read(b []byte) (int, error) { - fi.lk.Lock() - defer fi.lk.Unlock() return fi.mod.Read(b) } func (fi *file) Close() error { - fi.lk.Lock() - defer fi.lk.Unlock() if fi.hasChanges { err := fi.mod.Flush() if err != nil { @@ -88,8 +79,6 @@ func (fi *file) Close() error { } func (fi *file) Flush() error { - fi.lk.Lock() - defer fi.lk.Unlock() return fi.mod.Flush() } @@ -101,33 +90,23 @@ func (fi *file) withMode(mode int) File { } func (fi *file) Seek(offset int64, whence int) (int64, error) { - fi.lk.Lock() - defer fi.lk.Unlock() return fi.mod.Seek(offset, whence) } func (fi *file) WriteAt(b []byte, at int64) (int, error) { - fi.lk.Lock() - defer fi.lk.Unlock() fi.hasChanges = true return fi.mod.WriteAt(b, at) } func (fi *file) Size() (int64, error) { - fi.lk.Lock() - defer fi.lk.Unlock() return fi.mod.Size() } func (fi *file) GetNode() (*dag.Node, error) { - fi.lk.Lock() - defer fi.lk.Unlock() return fi.mod.GetNode() } func (fi *file) Truncate(size int64) error { - fi.lk.Lock() - defer fi.lk.Unlock() fi.hasChanges = true return fi.mod.Truncate(size) } diff --git a/ipnsfs/system_test.go b/ipnsfs/system_test.go index 355cd3f87..601f1545f 100644 --- a/ipnsfs/system_test.go +++ b/ipnsfs/system_test.go @@ -21,6 +21,8 @@ func testFS(t *testing.T, nd *core.IpfsNode) *Filesystem { return fs } +// Test some basic operations +// testing in fuse/ipns is sufficient to prove this code works properly func TestBasic(t *testing.T) { mock, err := core.NewMockNode() if err != nil { From dc80116d7641544e67ef09838a100e92e1507de8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 13 Mar 2015 21:17:20 -0700 Subject: [PATCH 3/9] fix locking and race conditions throughout ipnsfs and the pinner --- ipnsfs/dir.go | 36 ++++++++++------------- ipnsfs/file.go | 76 ++++++++++++++++++++++++------------------------ ipnsfs/system.go | 12 ++++++-- 3 files changed, 63 insertions(+), 61 deletions(-) diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go index 4ab665bb1..fc44d97ab 100644 --- a/ipnsfs/dir.go +++ b/ipnsfs/dir.go @@ -15,17 +15,16 @@ var ErrNotYetImplemented = errors.New("not yet implemented") var ErrInvalidChild = errors.New("invalid child node") type Directory struct { - fs *Filesystem - parent childCloser + fs *Filesystem + parent childCloser + childDirs map[string]*Directory - files map[string]*file + files map[string]*File - node *dag.Node - name string lock sync.Mutex + node *dag.Node - ref int - refLock sync.Mutex + name string } func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory { @@ -35,18 +34,19 @@ func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesyste node: node, parent: parent, childDirs: make(map[string]*Directory), - files: make(map[string]*file), + files: make(map[string]*File), } } -func (d *Directory) Open(tpath []string, mode int) (File, error) { +// Open opens a file at the given path 'tpath' +func (d *Directory) Open(tpath []string, mode int) (*File, error) { if len(tpath) == 0 { return nil, ErrIsDirectory } if len(tpath) == 1 { fi, err := d.childFile(tpath[0]) if err == nil { - return fi.withMode(mode), nil + return fi, nil } if mode|os.O_CREATE != 0 { @@ -57,7 +57,7 @@ func (d *Directory) Open(tpath []string, mode int) (File, error) { return nil, err } d.files[tpath[0]] = nfi - return nfi.withMode(mode), nil + return nfi, nil } return nil, ErrNoSuch @@ -102,7 +102,7 @@ func (d *Directory) Type() NodeType { return TDir } -func (d *Directory) childFile(name string) (*file, error) { +func (d *Directory) childFile(name string) (*File, error) { fi, ok := d.files[name] if ok { return fi, nil @@ -334,14 +334,10 @@ func (d *Directory) GetNode() (*dag.Node, error) { return d.node, nil } -func (d *Directory) Upref() { - d.refLock.Lock() - d.ref++ - d.refLock.Unlock() +func (d *Directory) Lock() { + d.lock.Lock() } -func (d *Directory) Deref() { - d.refLock.Lock() - d.ref-- - d.refLock.Unlock() +func (d *Directory) Unlock() { + d.lock.Unlock() } diff --git a/ipnsfs/file.go b/ipnsfs/file.go index 84cd764d6..01ada9c89 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -1,9 +1,7 @@ package ipnsfs import ( - "errors" - "io" - "os" + "sync" chunk "github.com/jbenet/go-ipfs/importer/chunk" dag "github.com/jbenet/go-ipfs/merkledag" @@ -12,33 +10,24 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) -type File interface { - io.ReadWriteCloser - io.WriterAt - Seek(int64, int) (int64, error) - Size() (int64, error) - Flush() error - Truncate(int64) error - FSNode -} - -type file struct { +type File struct { parent childCloser fs *Filesystem name string hasChanges bool - mod *mod.DagModifier + mod *mod.DagModifier + lock sync.Mutex } -func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*file, error) { +func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) { dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) if err != nil { return nil, err } - return &file{ + return &File{ fs: fs, parent: parent, name: name, @@ -46,16 +35,20 @@ func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (* }, nil } -func (fi *file) Write(b []byte) (int, error) { +func (fi *File) Write(b []byte) (int, error) { fi.hasChanges = true return fi.mod.Write(b) } -func (fi *file) Read(b []byte) (int, error) { +func (fi *File) Read(b []byte) (int, error) { + fi.lock.Lock() + defer fi.lock.Unlock() return fi.mod.Read(b) } -func (fi *file) Close() error { +func (fi *File) Close() error { + fi.lock.Lock() + defer fi.lock.Unlock() if fi.hasChanges { err := fi.mod.Flush() if err != nil { @@ -67,7 +60,9 @@ func (fi *file) Close() error { return err } + fi.lock.Unlock() err = fi.parent.closeChild(fi.name, nd) + fi.lock.Lock() if err != nil { return err } @@ -78,47 +73,52 @@ func (fi *file) Close() error { return nil } -func (fi *file) Flush() error { +func (fi *File) Flush() error { + fi.lock.Lock() + defer fi.lock.Unlock() return fi.mod.Flush() } -func (fi *file) withMode(mode int) File { - if mode == os.O_RDONLY { - return &readOnlyFile{fi} - } - return fi -} - -func (fi *file) Seek(offset int64, whence int) (int64, error) { +func (fi *File) Seek(offset int64, whence int) (int64, error) { + fi.lock.Lock() + defer fi.lock.Unlock() return fi.mod.Seek(offset, whence) } -func (fi *file) WriteAt(b []byte, at int64) (int, error) { +func (fi *File) WriteAt(b []byte, at int64) (int, error) { + fi.lock.Lock() + defer fi.lock.Unlock() fi.hasChanges = true return fi.mod.WriteAt(b, at) } -func (fi *file) Size() (int64, error) { +func (fi *File) Size() (int64, error) { + fi.lock.Lock() + defer fi.lock.Unlock() return fi.mod.Size() } -func (fi *file) GetNode() (*dag.Node, error) { +func (fi *File) GetNode() (*dag.Node, error) { + fi.lock.Lock() + defer fi.lock.Unlock() return fi.mod.GetNode() } -func (fi *file) Truncate(size int64) error { +func (fi *File) Truncate(size int64) error { + fi.lock.Lock() + defer fi.lock.Unlock() fi.hasChanges = true return fi.mod.Truncate(size) } -func (fi *file) Type() NodeType { +func (fi *File) Type() NodeType { return TFile } -type readOnlyFile struct { - *file +func (fi *File) Lock() { + fi.lock.Lock() } -func (ro *readOnlyFile) Write([]byte) (int, error) { - return 0, errors.New("permission denied: file readonly") +func (fi *File) Unlock() { + fi.lock.Unlock() } diff --git a/ipnsfs/system.go b/ipnsfs/system.go index e3f25fcc6..e516e8668 100644 --- a/ipnsfs/system.go +++ b/ipnsfs/system.go @@ -58,7 +58,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst return fs, nil } -func (fs *Filesystem) Open(tpath string, mode int) (File, error) { +func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { pathelem := strings.Split(tpath, "/") r, ok := fs.roots[pathelem[0]] if !ok { @@ -96,6 +96,8 @@ const ( type FSNode interface { GetNode() (*dag.Node, error) Type() NodeType + Lock() + Unlock() } // KeyRoot represents the root of a filesystem tree pointed to by a given keypair @@ -177,7 +179,7 @@ func (kr *KeyRoot) GetValue() FSNode { return kr.val } -func (kr *KeyRoot) Open(tpath []string, mode int) (File, error) { +func (kr *KeyRoot) Open(tpath []string, mode int) (*File, error) { if kr.val == nil { // No entry here... what should we do? panic("nyi") @@ -195,7 +197,7 @@ func (kr *KeyRoot) Open(tpath []string, mode int) (File, error) { switch t := kr.val.(type) { case *Directory: return nil, ErrIsDirectory - case File: + case *File: return t, nil default: panic("unrecognized type, should not happen") @@ -221,10 +223,14 @@ func (kr *KeyRoot) Publish(ctx context.Context) error { return err } + child.Lock() k, err := kr.fs.dserv.Add(nd) if err != nil { + child.Unlock() return err } + child.Unlock() + // Dont want to hold the lock while we publish fmt.Println("Publishing!") return kr.fs.nsys.Publish(ctx, kr.key, k) From eb228eb615e6c2ce02acfaf9a385a6b690be6888 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 14 Mar 2015 15:05:48 -0700 Subject: [PATCH 4/9] remove addCloser --- core/core.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/core/core.go b/core/core.go index 5d5eb3df4..eba3c4843 100644 --- a/core/core.go +++ b/core/core.go @@ -295,24 +295,25 @@ func (n *IpfsNode) teardown() error { n.Exchange, n.Repo, } - addCloser := func(c io.Closer) { // use when field may be nil - if c != nil { - closers = append(closers, c) - } - } if n.Blocks != nil { - addCloser(n.Blocks) + closers = append(closers, n.Blocks) } if n.IpnsFs != nil { - addCloser(n.IpnsFs) + closers = append(closers, n.IpnsFs) } - addCloser(n.Bootstrapper) - if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - addCloser(dht) + if n.Bootstrapper != nil { + closers = append(closers, n.Bootstrapper) + } + + if dht, ok := n.Routing.(*dht.IpfsDHT); ok { + closers = append(closers, dht) + } + + if n.PeerHost != nil { + closers = append(closers, n.PeerHost) } - addCloser(n.PeerHost) var errs []error for _, closer := range closers { From 0718bd73a72cc49a70d8eea19e5c9d99f1e8135f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 14 Mar 2015 21:45:52 -0700 Subject: [PATCH 5/9] fix locking --- ipnsfs/dir.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go index fc44d97ab..569d1fdaf 100644 --- a/ipnsfs/dir.go +++ b/ipnsfs/dir.go @@ -70,7 +70,6 @@ func (d *Directory) Open(tpath []string, mode int) (*File, error) { return dir.Open(tpath[1:], mode) } -// consider combining into a single method... type childCloser interface { closeChild(string, *dag.Node) error } @@ -82,18 +81,16 @@ func (d *Directory) closeChild(name string, nd *dag.Node) error { } d.lock.Lock() + defer d.lock.Unlock() err = d.node.RemoveNodeLink(name) if err != nil && err != dag.ErrNotFound { - d.lock.Unlock() return err } err = d.node.AddNodeLinkClean(name, nd) if err != nil { - d.lock.Unlock() return err } - d.lock.Unlock() return d.parent.closeChild(d.name, d.node) } @@ -179,6 +176,10 @@ func (d *Directory) childDir(name string) (*Directory, error) { func (d *Directory) Child(name string) (FSNode, error) { d.lock.Lock() defer d.lock.Unlock() + return d.childUnsync(name) +} + +func (d *Directory) childUnsync(name string) (FSNode, error) { dir, err := d.childDir(name) if err == nil { return dir, nil @@ -204,54 +205,50 @@ func (d *Directory) List() []string { func (d *Directory) Mkdir(name string) (*Directory, error) { d.lock.Lock() + defer d.lock.Unlock() _, err := d.childDir(name) if err == nil { - d.lock.Unlock() - return nil, errors.New("directory by that name already exists") + return nil, os.ErrExist } _, err = d.childFile(name) if err == nil { - d.lock.Unlock() - return nil, errors.New("file by that name already exists") + return nil, os.ErrExist } ndir := &dag.Node{Data: ft.FolderPBData()} err = d.node.AddNodeLinkClean(name, ndir) if err != nil { - d.lock.Unlock() return nil, err } - d.lock.Unlock() err = d.parent.closeChild(d.name, d.node) if err != nil { return nil, err } - d.lock.Lock() - defer d.lock.Unlock() - return d.childDir(name) } func (d *Directory) Unlink(name string) error { d.lock.Lock() + defer d.lock.Unlock() + delete(d.childDirs, name) delete(d.files, name) err := d.node.RemoveNodeLink(name) if err != nil { - d.lock.Unlock() return err } - d.lock.Unlock() return d.parent.closeChild(d.name, d.node) } // RenameEntry renames the child by 'oldname' of this directory to 'newname' func (d *Directory) RenameEntry(oldname, newname string) error { + d.Lock() + defer d.Unlock() // Is the child a directory? dir, err := d.childDir(oldname) if err == nil { @@ -300,12 +297,14 @@ func (d *Directory) RenameEntry(oldname, newname string) error { // AddChild adds the node 'nd' under this directory giving it the name 'name' func (d *Directory) AddChild(name string, nd *dag.Node) error { + d.Lock() + defer d.Unlock() pbn, err := ft.FromBytes(nd.Data) if err != nil { return err } - _, err = d.Child(name) + _, err = d.childUnsync(name) if err == nil { return errors.New("directory already has entry by that name") } From 0ee7091cbcb86f08a93d23418e7f9737fca767f7 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 15 Mar 2015 00:14:57 -0700 Subject: [PATCH 6/9] code cleanup --- ipnsfs/dir.go | 170 ++++++++++++++++++----------------------------- ipnsfs/file.go | 38 ++++++----- ipnsfs/system.go | 7 +- 3 files changed, 86 insertions(+), 129 deletions(-) diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go index 569d1fdaf..168d25a0f 100644 --- a/ipnsfs/dir.go +++ b/ipnsfs/dir.go @@ -60,7 +60,7 @@ func (d *Directory) Open(tpath []string, mode int) (*File, error) { return nfi, nil } - return nil, ErrNoSuch + return nil, os.ErrNotExist } dir, err := d.childDir(tpath[0]) @@ -99,86 +99,92 @@ func (d *Directory) Type() NodeType { return TDir } +// childFile returns a file under this directory by the given name if it exists func (d *Directory) childFile(name string) (*File, error) { fi, ok := d.files[name] if ok { return fi, nil } - // search dag - for _, lnk := range d.node.Links { - if lnk.Name == name { - nd, err := lnk.GetNode(d.fs.dserv) - if err != nil { - return nil, err - } - i, err := ft.FromBytes(nd.Data) - if err != nil { - return nil, err - } - - switch i.GetType() { - case ufspb.Data_Directory: - return nil, ErrIsDirectory - case ufspb.Data_File: - nfi, err := NewFile(name, nd, d, d.fs) - if err != nil { - return nil, err - } - d.files[name] = nfi - return nfi, nil - case ufspb.Data_Metadata: - return nil, ErrNotYetImplemented - default: - return nil, ErrInvalidChild - } - } + nd, err := d.childFromDag(name) + if err != nil { + return nil, err + } + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + return nil, ErrIsDirectory + case ufspb.Data_File: + nfi, err := NewFile(name, nd, d, d.fs) + if err != nil { + return nil, err + } + d.files[name] = nfi + return nfi, nil + case ufspb.Data_Metadata: + return nil, ErrNotYetImplemented + default: + return nil, ErrInvalidChild } - return nil, ErrNoSuch } +// childDir returns a directory under this directory by the given name if it +// exists. func (d *Directory) childDir(name string) (*Directory, error) { dir, ok := d.childDirs[name] if ok { return dir, nil } - for _, lnk := range d.node.Links { - if lnk.Name == name { - nd, err := lnk.GetNode(d.fs.dserv) - if err != nil { - return nil, err - } - i, err := ft.FromBytes(nd.Data) - if err != nil { - return nil, err - } - - switch i.GetType() { - case ufspb.Data_Directory: - ndir := NewDirectory(name, nd, d, d.fs) - d.childDirs[name] = ndir - return ndir, nil - case ufspb.Data_File: - return nil, fmt.Errorf("%s is not a directory", name) - case ufspb.Data_Metadata: - return nil, ErrNotYetImplemented - default: - return nil, ErrInvalidChild - } - } - + nd, err := d.childFromDag(name) + if err != nil { + return nil, err } - return nil, ErrNoSuch + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + ndir := NewDirectory(name, nd, d, d.fs) + d.childDirs[name] = ndir + return ndir, nil + case ufspb.Data_File: + return nil, fmt.Errorf("%s is not a directory", name) + case ufspb.Data_Metadata: + return nil, ErrNotYetImplemented + default: + return nil, ErrInvalidChild + } } +// childFromDag searches through this directories dag node for a child link +// with the given name +func (d *Directory) childFromDag(name string) (*dag.Node, error) { + for _, lnk := range d.node.Links { + if lnk.Name == name { + return lnk.GetNode(d.fs.dserv) + } + } + + return nil, os.ErrNotExist +} + +// Child returns the child of this directory by the given name func (d *Directory) Child(name string) (FSNode, error) { d.lock.Lock() defer d.lock.Unlock() return d.childUnsync(name) } +// childUnsync returns the child under this directory by the given name +// without locking, useful for operations which already hold a lock func (d *Directory) childUnsync(name string) (FSNode, error) { dir, err := d.childDir(name) if err == nil { @@ -189,7 +195,7 @@ func (d *Directory) childUnsync(name string) (FSNode, error) { return fi, nil } - return nil, ErrNoSuch + return nil, os.ErrNotExist } func (d *Directory) List() []string { @@ -245,56 +251,6 @@ func (d *Directory) Unlink(name string) error { return d.parent.closeChild(d.name, d.node) } -// RenameEntry renames the child by 'oldname' of this directory to 'newname' -func (d *Directory) RenameEntry(oldname, newname string) error { - d.Lock() - defer d.Unlock() - // Is the child a directory? - dir, err := d.childDir(oldname) - if err == nil { - dir.name = newname - - err := d.node.RemoveNodeLink(oldname) - if err != nil { - return err - } - err = d.node.AddNodeLinkClean(newname, dir.node) - if err != nil { - return err - } - - delete(d.childDirs, oldname) - d.childDirs[newname] = dir - return d.parent.closeChild(d.name, d.node) - } - - // Is the child a file? - fi, err := d.childFile(oldname) - if err == nil { - fi.name = newname - - err := d.node.RemoveNodeLink(oldname) - if err != nil { - return err - } - - nd, err := fi.GetNode() - if err != nil { - return err - } - - err = d.node.AddNodeLinkClean(newname, nd) - if err != nil { - return err - } - - delete(d.childDirs, oldname) - d.files[newname] = fi - return d.parent.closeChild(d.name, d.node) - } - return ErrNoSuch -} - // AddChild adds the node 'nd' under this directory giving it the name 'name' func (d *Directory) AddChild(name string, nd *dag.Node) error { d.Lock() diff --git a/ipnsfs/file.go b/ipnsfs/file.go index 01ada9c89..aa5a89b47 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -36,19 +36,21 @@ func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (* } func (fi *File) Write(b []byte) (int, error) { + fi.Lock() + defer fi.Unlock() fi.hasChanges = true return fi.mod.Write(b) } func (fi *File) Read(b []byte) (int, error) { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() return fi.mod.Read(b) } func (fi *File) Close() error { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() if fi.hasChanges { err := fi.mod.Flush() if err != nil { @@ -60,9 +62,9 @@ func (fi *File) Close() error { return err } - fi.lock.Unlock() + fi.Unlock() err = fi.parent.closeChild(fi.name, nd) - fi.lock.Lock() + fi.Lock() if err != nil { return err } @@ -74,39 +76,39 @@ func (fi *File) Close() error { } func (fi *File) Flush() error { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() return fi.mod.Flush() } func (fi *File) Seek(offset int64, whence int) (int64, error) { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() return fi.mod.Seek(offset, whence) } func (fi *File) WriteAt(b []byte, at int64) (int, error) { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() fi.hasChanges = true return fi.mod.WriteAt(b, at) } func (fi *File) Size() (int64, error) { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() return fi.mod.Size() } func (fi *File) GetNode() (*dag.Node, error) { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() return fi.mod.GetNode() } func (fi *File) Truncate(size int64) error { - fi.lock.Lock() - defer fi.lock.Unlock() + fi.Lock() + defer fi.Unlock() fi.hasChanges = true return fi.mod.Truncate(size) } diff --git a/ipnsfs/system.go b/ipnsfs/system.go index e516e8668..f14aa94d8 100644 --- a/ipnsfs/system.go +++ b/ipnsfs/system.go @@ -3,6 +3,7 @@ package ipnsfs import ( "errors" "fmt" + "os" "strings" "time" @@ -21,8 +22,6 @@ var log = eventlog.Logger("ipnsfs") var ErrIsDirectory = errors.New("error: is a directory") -var ErrNoSuch = errors.New("no such file or directory") - // Filesystem is the writeable fuse filesystem structure type Filesystem struct { dserv dag.DAGService @@ -62,7 +61,7 @@ func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { pathelem := strings.Split(tpath, "/") r, ok := fs.roots[pathelem[0]] if !ok { - return nil, ErrNoSuch + return nil, os.ErrNotExist } return r.Open(pathelem[1:], mode) @@ -83,7 +82,7 @@ func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { if ok { return r, nil } - return nil, ErrNoSuch + return nil, os.ErrNotExist } type NodeType int From f679127d835563a1dd29f787fc58a063314f88c6 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 15 Mar 2015 11:13:59 -0700 Subject: [PATCH 7/9] comments! and cleanup --- core/core.go | 9 +++++--- ipnsfs/dir.go | 6 ++--- ipnsfs/file.go | 14 ++++++++++++ ipnsfs/system.go | 59 +++++++++++++++++++++++++++++++++++++++--------- 4 files changed, 70 insertions(+), 18 deletions(-) diff --git a/core/core.go b/core/core.go index eba3c4843..f7bfb7757 100644 --- a/core/core.go +++ b/core/core.go @@ -296,13 +296,16 @@ func (n *IpfsNode) teardown() error { n.Repo, } - if n.Blocks != nil { - closers = append(closers, n.Blocks) - } + // Filesystem needs to be closed before network, dht, and blockservice + // so it can use them as its shutting down if n.IpnsFs != nil { closers = append(closers, n.IpnsFs) } + if n.Blocks != nil { + closers = append(closers, n.Blocks) + } + if n.Bootstrapper != nil { closers = append(closers, n.Bootstrapper) } diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go index 168d25a0f..edc36db28 100644 --- a/ipnsfs/dir.go +++ b/ipnsfs/dir.go @@ -70,10 +70,8 @@ func (d *Directory) Open(tpath []string, mode int) (*File, error) { return dir.Open(tpath[1:], mode) } -type childCloser interface { - closeChild(string, *dag.Node) error -} - +// closeChild updates the child by the given name to the dag node 'nd' +// and changes its own dag node, then propogates the changes upward func (d *Directory) closeChild(name string, nd *dag.Node) error { _, err := d.fs.dserv.Add(nd) if err != nil { diff --git a/ipnsfs/file.go b/ipnsfs/file.go index aa5a89b47..d820d5b45 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -21,6 +21,7 @@ type File struct { lock sync.Mutex } +// NewFile returns a NewFile object with the given parameters func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) { dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) if err != nil { @@ -35,6 +36,7 @@ func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (* }, nil } +// Write writes the given data to the file at its current offset func (fi *File) Write(b []byte) (int, error) { fi.Lock() defer fi.Unlock() @@ -42,12 +44,15 @@ func (fi *File) Write(b []byte) (int, error) { return fi.mod.Write(b) } +// Read reads into the given buffer from the current offset func (fi *File) Read(b []byte) (int, error) { fi.Lock() defer fi.Unlock() return fi.mod.Read(b) } +// Close flushes, then propogates the modified dag node up the directory structure +// and signals a republish to occur func (fi *File) Close() error { fi.Lock() defer fi.Unlock() @@ -75,18 +80,21 @@ func (fi *File) Close() error { return nil } +// Flush flushes the changes in the file to disk func (fi *File) Flush() error { fi.Lock() defer fi.Unlock() return fi.mod.Flush() } +// Seek implements io.Seeker func (fi *File) Seek(offset int64, whence int) (int64, error) { fi.Lock() defer fi.Unlock() return fi.mod.Seek(offset, whence) } +// Write At writes the given bytes at the offset 'at' func (fi *File) WriteAt(b []byte, at int64) (int, error) { fi.Lock() defer fi.Unlock() @@ -94,18 +102,21 @@ func (fi *File) WriteAt(b []byte, at int64) (int, error) { return fi.mod.WriteAt(b, at) } +// Size returns the size of this file func (fi *File) Size() (int64, error) { fi.Lock() defer fi.Unlock() return fi.mod.Size() } +// GetNode returns the dag node associated with this file func (fi *File) GetNode() (*dag.Node, error) { fi.Lock() defer fi.Unlock() return fi.mod.GetNode() } +// Truncate truncates the file to size func (fi *File) Truncate(size int64) error { fi.Lock() defer fi.Unlock() @@ -113,14 +124,17 @@ func (fi *File) Truncate(size int64) error { return fi.mod.Truncate(size) } +// Type returns the type FSNode this is func (fi *File) Type() NodeType { return TFile } +// Lock the file func (fi *File) Lock() { fi.lock.Lock() } +// Unlock the file func (fi *File) Unlock() { fi.lock.Unlock() } diff --git a/ipnsfs/system.go b/ipnsfs/system.go index f14aa94d8..4e7395b06 100644 --- a/ipnsfs/system.go +++ b/ipnsfs/system.go @@ -1,3 +1,13 @@ +// package ipnsfs implements an in memory model of a mutable ipns filesystem, +// to be used by the fuse filesystem. +// +// It consists of four main structs: +// 1) The Filesystem +// The filesystem serves as a container and entry point for the ipns filesystem +// 2) KeyRoots +// KeyRoots represent the root of the keyspace controlled by a given keypair +// 3) Directories +// 4) Files package ipnsfs import ( @@ -5,6 +15,7 @@ import ( "fmt" "os" "strings" + "sync" "time" dag "github.com/jbenet/go-ipfs/merkledag" @@ -33,6 +44,7 @@ type Filesystem struct { roots map[string]*KeyRoot } +// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) { roots := make(map[string]*KeyRoot) fs := &Filesystem{ @@ -47,7 +59,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst return nil, err } - root, err := fs.NewKeyRoot(ctx, k) + root, err := fs.newKeyRoot(ctx, k) if err != nil { return nil, err } @@ -57,6 +69,7 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst return fs, nil } +// Open opens a file at the given path func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { pathelem := strings.Split(tpath, "/") r, ok := fs.roots[pathelem[0]] @@ -68,15 +81,23 @@ func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { } func (fs *Filesystem) Close() error { + wg := sync.WaitGroup{} for _, r := range fs.roots { - err := r.Publish(context.TODO()) - if err != nil { - return err - } + wg.Add(1) + go func(r *KeyRoot) { + defer wg.Done() + err := r.Publish(context.TODO()) + if err != nil { + log.Error(err) + return + } + }(r) } + wg.Wait() return nil } +// GetRoot returns the KeyRoot of the given name func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { r, ok := fs.roots[name] if ok { @@ -85,6 +106,10 @@ func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { return nil, os.ErrNotExist } +type childCloser interface { + closeChild(string, *dag.Node) error +} + type NodeType int const ( @@ -92,6 +117,7 @@ const ( TDir ) +// FSNode represents any node (directory, root, or file) in the ipns filesystem type FSNode interface { GetNode() (*dag.Node, error) Type() NodeType @@ -115,7 +141,9 @@ type KeyRoot struct { repub *Republisher } -func (fs *Filesystem) NewKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { +// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine +// for it +func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { hash, err := k.GetPublic().Hash() if err != nil { return nil, err @@ -180,14 +208,14 @@ func (kr *KeyRoot) GetValue() FSNode { func (kr *KeyRoot) Open(tpath []string, mode int) (*File, error) { if kr.val == nil { - // No entry here... what should we do? - panic("nyi") + // No entry here. KeyRoot was created incorrectly + panic("nil keyroot.val, improperly constructed keyroot") } if len(tpath) > 0 { // Make sure our root is a directory dir, ok := kr.val.(*Directory) if !ok { - return nil, fmt.Errorf("no such file or directory: %s", tpath[0]) + return nil, os.ErrNotExist } return dir.Open(tpath, mode) @@ -222,6 +250,7 @@ func (kr *KeyRoot) Publish(ctx context.Context) error { return err } + // Holding this lock so our child doesnt change out from under us child.Lock() k, err := kr.fs.dserv.Add(nd) if err != nil { @@ -230,6 +259,8 @@ func (kr *KeyRoot) Publish(ctx context.Context) error { } child.Unlock() // Dont want to hold the lock while we publish + // otherwise we are holding the lock through a costly + // network operation fmt.Println("Publishing!") return kr.fs.nsys.Publish(ctx, kr.key, k) @@ -243,6 +274,8 @@ type Republisher struct { root *KeyRoot } +// NewRepublisher creates a new Republisher object to republish the given keyroot +// using the given short and long time intervals func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { return &Republisher{ TimeoutShort: tshort, @@ -252,6 +285,9 @@ func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { } } +// Touch signals that an update has occurred since the last publish. +// Multiple consecutive touches may extend the time period before +// the next Publish occurs in order to more efficiently batch updates func (np *Republisher) Touch() { select { case np.Publish <- struct{}{}: @@ -259,6 +295,7 @@ func (np *Republisher) Touch() { } } +// Run is the main republisher loop func (np *Republisher) Run(ctx context.Context) { for { select { @@ -268,13 +305,13 @@ func (np *Republisher) Run(ctx context.Context) { wait: select { - case <-quick: - case <-longer: case <-ctx.Done(): return case <-np.Publish: quick = time.After(np.TimeoutShort) goto wait + case <-quick: + case <-longer: } log.Info("Publishing Changes!") From d588636ce82888ec136d60ff99e0a6fe48f12783 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 15 Mar 2015 17:21:36 -0700 Subject: [PATCH 8/9] remove 'Open' from ipnsfs --- ipnsfs/dir.go | 32 --------------- ipnsfs/system.go | 37 ----------------- ipnsfs/system_test.go | 94 ------------------------------------------- 3 files changed, 163 deletions(-) delete mode 100644 ipnsfs/system_test.go diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go index edc36db28..ce50a874b 100644 --- a/ipnsfs/dir.go +++ b/ipnsfs/dir.go @@ -38,38 +38,6 @@ func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesyste } } -// Open opens a file at the given path 'tpath' -func (d *Directory) Open(tpath []string, mode int) (*File, error) { - if len(tpath) == 0 { - return nil, ErrIsDirectory - } - if len(tpath) == 1 { - fi, err := d.childFile(tpath[0]) - if err == nil { - return fi, nil - } - - if mode|os.O_CREATE != 0 { - fnode := new(dag.Node) - fnode.Data = ft.FilePBData(nil, 0) - nfi, err := NewFile(tpath[0], fnode, d, d.fs) - if err != nil { - return nil, err - } - d.files[tpath[0]] = nfi - return nfi, nil - } - - return nil, os.ErrNotExist - } - - dir, err := d.childDir(tpath[0]) - if err != nil { - return nil, err - } - return dir.Open(tpath[1:], mode) -} - // closeChild updates the child by the given name to the dag node 'nd' // and changes its own dag node, then propogates the changes upward func (d *Directory) closeChild(name string, nd *dag.Node) error { diff --git a/ipnsfs/system.go b/ipnsfs/system.go index 4e7395b06..9ad561a49 100644 --- a/ipnsfs/system.go +++ b/ipnsfs/system.go @@ -14,7 +14,6 @@ import ( "errors" "fmt" "os" - "strings" "sync" "time" @@ -69,17 +68,6 @@ func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSyst return fs, nil } -// Open opens a file at the given path -func (fs *Filesystem) Open(tpath string, mode int) (*File, error) { - pathelem := strings.Split(tpath, "/") - r, ok := fs.roots[pathelem[0]] - if !ok { - return nil, os.ErrNotExist - } - - return r.Open(pathelem[1:], mode) -} - func (fs *Filesystem) Close() error { wg := sync.WaitGroup{} for _, r := range fs.roots { @@ -206,31 +194,6 @@ func (kr *KeyRoot) GetValue() FSNode { return kr.val } -func (kr *KeyRoot) Open(tpath []string, mode int) (*File, error) { - if kr.val == nil { - // No entry here. KeyRoot was created incorrectly - panic("nil keyroot.val, improperly constructed keyroot") - } - if len(tpath) > 0 { - // Make sure our root is a directory - dir, ok := kr.val.(*Directory) - if !ok { - return nil, os.ErrNotExist - } - - return dir.Open(tpath, mode) - } - - switch t := kr.val.(type) { - case *Directory: - return nil, ErrIsDirectory - case *File: - return t, nil - default: - panic("unrecognized type, should not happen") - } -} - // closeChild implements the childCloser interface, and signals to the publisher that // there are changes ready to be published func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error { diff --git a/ipnsfs/system_test.go b/ipnsfs/system_test.go deleted file mode 100644 index 601f1545f..000000000 --- a/ipnsfs/system_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package ipnsfs_test - -import ( - "bytes" - "io/ioutil" - "os" - "path" - "testing" - - core "github.com/jbenet/go-ipfs/core" - . "github.com/jbenet/go-ipfs/ipnsfs" - u "github.com/jbenet/go-ipfs/util" -) - -func testFS(t *testing.T, nd *core.IpfsNode) *Filesystem { - fs, err := NewFilesystem(nd.Context(), nd.DAG, nd.Namesys, nd.Pinning, nd.PrivateKey) - if err != nil { - t.Fatal(err) - } - - return fs -} - -// Test some basic operations -// testing in fuse/ipns is sufficient to prove this code works properly -func TestBasic(t *testing.T) { - mock, err := core.NewMockNode() - if err != nil { - t.Fatal(err) - } - - fs := testFS(t, mock) - - k := u.Key(mock.Identity) - p := path.Join(k.B58String(), "file") - fi, err := fs.Open(p, os.O_CREATE) - if err != nil { - t.Fatal(err) - } - - data := []byte("Hello World") - n, err := fi.Write(data) - if err != nil { - t.Fatal(err) - } - - if n != len(data) { - t.Fatal("wrote incorrect amount") - } - - err = fi.Close() - if err != nil { - t.Fatal(err) - } - - nfi, err := fs.Open(p, os.O_RDONLY) - if err != nil { - t.Fatal(err) - } - - out, err := ioutil.ReadAll(nfi) - if err != nil { - t.Fatal(err) - } - - err = nfi.Close() - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(out, data) { - t.Fatal("Write failed.") - } - - err = fs.Close() - if err != nil { - t.Fatal(err) - } - - // Open the filesystem again, and try to read our file - nfs := testFS(t, mock) - - fi, err = nfs.Open(p, os.O_RDONLY) - nb, err := ioutil.ReadAll(fi) - if err != nil { - t.Fatal(err) - } - - t.Log(nb) - - if !bytes.Equal(nb, data) { - t.Fatal("data not the same after closing down fs") - } -} From 7ad8dd85238aeaeafe7c0ca58180cc2e92db7bef Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 17 Mar 2015 21:03:37 -0700 Subject: [PATCH 9/9] ignore bootstrap failures in namesys initialization --- core/core.go | 3 ++- routing/dht/lookup.go | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/core.go b/core/core.go index f7bfb7757..db16a153f 100644 --- a/core/core.go +++ b/core/core.go @@ -26,6 +26,7 @@ import ( routing "github.com/jbenet/go-ipfs/routing" dht "github.com/jbenet/go-ipfs/routing/dht" + kb "github.com/jbenet/go-ipfs/routing/kbucket" offroute "github.com/jbenet/go-ipfs/routing/offline" bstore "github.com/jbenet/go-ipfs/blocks/blockstore" @@ -145,7 +146,7 @@ func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) // Setup the mutable ipns filesystem structure if node.OnlineMode() { fs, err := ipnsfs.NewFilesystem(ctx, node.DAG, node.Namesys, node.Pinning, node.PrivateKey) - if err != nil { + if err != nil && err != kb.ErrLookupFailure { return nil, debugerror.Wrap(err) } node.IpnsFs = fs diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go index 59ef3911f..1f01a082a 100644 --- a/routing/dht/lookup.go +++ b/routing/dht/lookup.go @@ -6,7 +6,6 @@ import ( peer "github.com/jbenet/go-ipfs/p2p/peer" kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" - errors "github.com/jbenet/go-ipfs/util/debugerror" pset "github.com/jbenet/go-ipfs/util/peerset" ) @@ -26,7 +25,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key u.Key) (<-chan peer e := log.EventBegin(ctx, "getClosestPeers", &key) tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) if len(tablepeers) == 0 { - return nil, errors.Wrap(kb.ErrLookupFailure) + return nil, kb.ErrLookupFailure } out := make(chan peer.ID, KValue)