diff --git a/core/core.go b/core/core.go index 45767728b..db16a153f 100644 --- a/core/core.go +++ b/core/core.go @@ -26,6 +26,7 @@ import ( routing "github.com/jbenet/go-ipfs/routing" dht "github.com/jbenet/go-ipfs/routing/dht" + kb "github.com/jbenet/go-ipfs/routing/kbucket" offroute "github.com/jbenet/go-ipfs/routing/offline" bstore "github.com/jbenet/go-ipfs/blocks/blockstore" @@ -37,6 +38,7 @@ import ( rp "github.com/jbenet/go-ipfs/exchange/reprovide" mount "github.com/jbenet/go-ipfs/fuse/mount" + ipnsfs "github.com/jbenet/go-ipfs/ipnsfs" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" path "github.com/jbenet/go-ipfs/path" @@ -89,6 +91,8 @@ type IpfsNode struct { Diagnostics *diag.Diagnostics // the diagnostics service Reprovider *rp.Reprovider // the value reprovider system + IpnsFs *ipnsfs.Filesystem + ctxgroup.ContextGroup mode mode @@ -138,6 +142,16 @@ func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG) } node.Resolver = &path.Resolver{DAG: node.DAG} + + // Setup the mutable ipns filesystem structure + if node.OnlineMode() { + fs, err := ipnsfs.NewFilesystem(ctx, node.DAG, node.Namesys, node.Pinning, node.PrivateKey) + if err != nil && err != kb.ErrLookupFailure { + return nil, debugerror.Wrap(err) + } + node.IpnsFs = fs + } + success = true return node, nil } @@ -268,6 +282,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost // setup name system n.Namesys = namesys.NewNameSystem(n.Routing) + return nil } @@ -278,21 +293,31 @@ func (n *IpfsNode) teardown() error { // owned objects are closed in this teardown to ensure that they're closed // regardless of which constructor was used to add them to the node. closers := []io.Closer{ - n.Blocks, n.Exchange, n.Repo, } - addCloser := func(c io.Closer) { // use when field may be nil - if c != nil { - closers = append(closers, c) - } + + // Filesystem needs to be closed before network, dht, and blockservice + // so it can use them as its shutting down + if n.IpnsFs != nil { + closers = append(closers, n.IpnsFs) } - addCloser(n.Bootstrapper) - if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - addCloser(dht) + if n.Blocks != nil { + closers = append(closers, n.Blocks) + } + + if n.Bootstrapper != nil { + closers = append(closers, n.Bootstrapper) + } + + if dht, ok := n.Routing.(*dht.IpfsDHT); ok { + closers = append(closers, dht) + } + + if n.PeerHost != nil { + closers = append(closers, n.PeerHost) } - addCloser(n.PeerHost) var errs []error for _, closer := range closers { diff --git a/core/mock.go b/core/mock.go index 987ba02b0..149795cef 100644 --- a/core/mock.go +++ b/core/mock.go @@ -13,8 +13,9 @@ import ( mocknet "github.com/jbenet/go-ipfs/p2p/net/mock" peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" + pin "github.com/jbenet/go-ipfs/pin" "github.com/jbenet/go-ipfs/repo" - mockrouting "github.com/jbenet/go-ipfs/routing/mock" + offrt "github.com/jbenet/go-ipfs/routing/offline" ds2 "github.com/jbenet/go-ipfs/util/datastore2" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -54,7 +55,7 @@ func NewMockNode() (*IpfsNode, error) { } // Routing - nd.Routing = mockrouting.NewServer().Client(ident) + nd.Routing = offrt.NewOfflineRouter(nd.Repo.Datastore(), nd.PrivateKey) // Bitswap bstore := blockstore.NewBlockstore(nd.Repo.Datastore()) @@ -65,6 +66,8 @@ func NewMockNode() (*IpfsNode, error) { nd.DAG = mdag.NewDAGService(bserv) + nd.Pinning = pin.NewPinner(nd.Repo.Datastore(), nd.DAG) + // Namespace resolver nd.Namesys = nsys.NewNameSystem(nd.Routing) diff --git a/ipnsfs/dir.go b/ipnsfs/dir.go new file mode 100644 index 000000000..ce50a874b --- /dev/null +++ b/ipnsfs/dir.go @@ -0,0 +1,264 @@ +package ipnsfs + +import ( + "errors" + "fmt" + "os" + "sync" + + dag "github.com/jbenet/go-ipfs/merkledag" + ft "github.com/jbenet/go-ipfs/unixfs" + ufspb "github.com/jbenet/go-ipfs/unixfs/pb" +) + +var ErrNotYetImplemented = errors.New("not yet implemented") +var ErrInvalidChild = errors.New("invalid child node") + +type Directory struct { + fs *Filesystem + parent childCloser + + childDirs map[string]*Directory + files map[string]*File + + lock sync.Mutex + node *dag.Node + + name string +} + +func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory { + return &Directory{ + fs: fs, + name: name, + node: node, + parent: parent, + childDirs: make(map[string]*Directory), + files: make(map[string]*File), + } +} + +// closeChild updates the child by the given name to the dag node 'nd' +// and changes its own dag node, then propogates the changes upward +func (d *Directory) closeChild(name string, nd *dag.Node) error { + _, err := d.fs.dserv.Add(nd) + if err != nil { + return err + } + + d.lock.Lock() + defer d.lock.Unlock() + err = d.node.RemoveNodeLink(name) + if err != nil && err != dag.ErrNotFound { + return err + } + + err = d.node.AddNodeLinkClean(name, nd) + if err != nil { + return err + } + + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) Type() NodeType { + return TDir +} + +// childFile returns a file under this directory by the given name if it exists +func (d *Directory) childFile(name string) (*File, error) { + fi, ok := d.files[name] + if ok { + return fi, nil + } + + nd, err := d.childFromDag(name) + if err != nil { + return nil, err + } + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + return nil, ErrIsDirectory + case ufspb.Data_File: + nfi, err := NewFile(name, nd, d, d.fs) + if err != nil { + return nil, err + } + d.files[name] = nfi + return nfi, nil + case ufspb.Data_Metadata: + return nil, ErrNotYetImplemented + default: + return nil, ErrInvalidChild + } +} + +// childDir returns a directory under this directory by the given name if it +// exists. +func (d *Directory) childDir(name string) (*Directory, error) { + dir, ok := d.childDirs[name] + if ok { + return dir, nil + } + + nd, err := d.childFromDag(name) + if err != nil { + return nil, err + } + + i, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + switch i.GetType() { + case ufspb.Data_Directory: + ndir := NewDirectory(name, nd, d, d.fs) + d.childDirs[name] = ndir + return ndir, nil + case ufspb.Data_File: + return nil, fmt.Errorf("%s is not a directory", name) + case ufspb.Data_Metadata: + return nil, ErrNotYetImplemented + default: + return nil, ErrInvalidChild + } +} + +// childFromDag searches through this directories dag node for a child link +// with the given name +func (d *Directory) childFromDag(name string) (*dag.Node, error) { + for _, lnk := range d.node.Links { + if lnk.Name == name { + return lnk.GetNode(d.fs.dserv) + } + } + + return nil, os.ErrNotExist +} + +// Child returns the child of this directory by the given name +func (d *Directory) Child(name string) (FSNode, error) { + d.lock.Lock() + defer d.lock.Unlock() + return d.childUnsync(name) +} + +// childUnsync returns the child under this directory by the given name +// without locking, useful for operations which already hold a lock +func (d *Directory) childUnsync(name string) (FSNode, error) { + dir, err := d.childDir(name) + if err == nil { + return dir, nil + } + fi, err := d.childFile(name) + if err == nil { + return fi, nil + } + + return nil, os.ErrNotExist +} + +func (d *Directory) List() []string { + d.lock.Lock() + defer d.lock.Unlock() + + var out []string + for _, lnk := range d.node.Links { + out = append(out, lnk.Name) + } + return out +} + +func (d *Directory) Mkdir(name string) (*Directory, error) { + d.lock.Lock() + defer d.lock.Unlock() + + _, err := d.childDir(name) + if err == nil { + return nil, os.ErrExist + } + _, err = d.childFile(name) + if err == nil { + return nil, os.ErrExist + } + + ndir := &dag.Node{Data: ft.FolderPBData()} + err = d.node.AddNodeLinkClean(name, ndir) + if err != nil { + return nil, err + } + + err = d.parent.closeChild(d.name, d.node) + if err != nil { + return nil, err + } + + return d.childDir(name) +} + +func (d *Directory) Unlink(name string) error { + d.lock.Lock() + defer d.lock.Unlock() + + delete(d.childDirs, name) + delete(d.files, name) + + err := d.node.RemoveNodeLink(name) + if err != nil { + return err + } + + return d.parent.closeChild(d.name, d.node) +} + +// AddChild adds the node 'nd' under this directory giving it the name 'name' +func (d *Directory) AddChild(name string, nd *dag.Node) error { + d.Lock() + defer d.Unlock() + pbn, err := ft.FromBytes(nd.Data) + if err != nil { + return err + } + + _, err = d.childUnsync(name) + if err == nil { + return errors.New("directory already has entry by that name") + } + + err = d.node.AddNodeLinkClean(name, nd) + if err != nil { + return err + } + + switch pbn.GetType() { + case ft.TDirectory: + d.childDirs[name] = NewDirectory(name, nd, d, d.fs) + case ft.TFile, ft.TMetadata, ft.TRaw: + nfi, err := NewFile(name, nd, d, d.fs) + if err != nil { + return err + } + d.files[name] = nfi + default: + return ErrInvalidChild + } + return d.parent.closeChild(d.name, d.node) +} + +func (d *Directory) GetNode() (*dag.Node, error) { + return d.node, nil +} + +func (d *Directory) Lock() { + d.lock.Lock() +} + +func (d *Directory) Unlock() { + d.lock.Unlock() +} diff --git a/ipnsfs/file.go b/ipnsfs/file.go new file mode 100644 index 000000000..d820d5b45 --- /dev/null +++ b/ipnsfs/file.go @@ -0,0 +1,140 @@ +package ipnsfs + +import ( + "sync" + + chunk "github.com/jbenet/go-ipfs/importer/chunk" + dag "github.com/jbenet/go-ipfs/merkledag" + mod "github.com/jbenet/go-ipfs/unixfs/mod" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +type File struct { + parent childCloser + fs *Filesystem + + name string + hasChanges bool + + mod *mod.DagModifier + lock sync.Mutex +} + +// NewFile returns a NewFile object with the given parameters +func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) { + dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter) + if err != nil { + return nil, err + } + + return &File{ + fs: fs, + parent: parent, + name: name, + mod: dmod, + }, nil +} + +// Write writes the given data to the file at its current offset +func (fi *File) Write(b []byte) (int, error) { + fi.Lock() + defer fi.Unlock() + fi.hasChanges = true + return fi.mod.Write(b) +} + +// Read reads into the given buffer from the current offset +func (fi *File) Read(b []byte) (int, error) { + fi.Lock() + defer fi.Unlock() + return fi.mod.Read(b) +} + +// Close flushes, then propogates the modified dag node up the directory structure +// and signals a republish to occur +func (fi *File) Close() error { + fi.Lock() + defer fi.Unlock() + if fi.hasChanges { + err := fi.mod.Flush() + if err != nil { + return err + } + + nd, err := fi.mod.GetNode() + if err != nil { + return err + } + + fi.Unlock() + err = fi.parent.closeChild(fi.name, nd) + fi.Lock() + if err != nil { + return err + } + + fi.hasChanges = false + } + + return nil +} + +// Flush flushes the changes in the file to disk +func (fi *File) Flush() error { + fi.Lock() + defer fi.Unlock() + return fi.mod.Flush() +} + +// Seek implements io.Seeker +func (fi *File) Seek(offset int64, whence int) (int64, error) { + fi.Lock() + defer fi.Unlock() + return fi.mod.Seek(offset, whence) +} + +// Write At writes the given bytes at the offset 'at' +func (fi *File) WriteAt(b []byte, at int64) (int, error) { + fi.Lock() + defer fi.Unlock() + fi.hasChanges = true + return fi.mod.WriteAt(b, at) +} + +// Size returns the size of this file +func (fi *File) Size() (int64, error) { + fi.Lock() + defer fi.Unlock() + return fi.mod.Size() +} + +// GetNode returns the dag node associated with this file +func (fi *File) GetNode() (*dag.Node, error) { + fi.Lock() + defer fi.Unlock() + return fi.mod.GetNode() +} + +// Truncate truncates the file to size +func (fi *File) Truncate(size int64) error { + fi.Lock() + defer fi.Unlock() + fi.hasChanges = true + return fi.mod.Truncate(size) +} + +// Type returns the type FSNode this is +func (fi *File) Type() NodeType { + return TFile +} + +// Lock the file +func (fi *File) Lock() { + fi.lock.Lock() +} + +// Unlock the file +func (fi *File) Unlock() { + fi.lock.Unlock() +} diff --git a/ipnsfs/system.go b/ipnsfs/system.go new file mode 100644 index 000000000..9ad561a49 --- /dev/null +++ b/ipnsfs/system.go @@ -0,0 +1,290 @@ +// package ipnsfs implements an in memory model of a mutable ipns filesystem, +// to be used by the fuse filesystem. +// +// It consists of four main structs: +// 1) The Filesystem +// The filesystem serves as a container and entry point for the ipns filesystem +// 2) KeyRoots +// KeyRoots represent the root of the keyspace controlled by a given keypair +// 3) Directories +// 4) Files +package ipnsfs + +import ( + "errors" + "fmt" + "os" + "sync" + "time" + + dag "github.com/jbenet/go-ipfs/merkledag" + namesys "github.com/jbenet/go-ipfs/namesys" + ci "github.com/jbenet/go-ipfs/p2p/crypto" + pin "github.com/jbenet/go-ipfs/pin" + ft "github.com/jbenet/go-ipfs/unixfs" + u "github.com/jbenet/go-ipfs/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" +) + +var log = eventlog.Logger("ipnsfs") + +var ErrIsDirectory = errors.New("error: is a directory") + +// Filesystem is the writeable fuse filesystem structure +type Filesystem struct { + dserv dag.DAGService + + nsys namesys.NameSystem + + pins pin.Pinner + + roots map[string]*KeyRoot +} + +// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys +func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) { + roots := make(map[string]*KeyRoot) + fs := &Filesystem{ + roots: roots, + nsys: nsys, + dserv: ds, + pins: pins, + } + for _, k := range keys { + pkh, err := k.GetPublic().Hash() + if err != nil { + return nil, err + } + + root, err := fs.newKeyRoot(ctx, k) + if err != nil { + return nil, err + } + roots[u.Key(pkh).Pretty()] = root + } + + return fs, nil +} + +func (fs *Filesystem) Close() error { + wg := sync.WaitGroup{} + for _, r := range fs.roots { + wg.Add(1) + go func(r *KeyRoot) { + defer wg.Done() + err := r.Publish(context.TODO()) + if err != nil { + log.Error(err) + return + } + }(r) + } + wg.Wait() + return nil +} + +// GetRoot returns the KeyRoot of the given name +func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { + r, ok := fs.roots[name] + if ok { + return r, nil + } + return nil, os.ErrNotExist +} + +type childCloser interface { + closeChild(string, *dag.Node) error +} + +type NodeType int + +const ( + TFile NodeType = iota + TDir +) + +// FSNode represents any node (directory, root, or file) in the ipns filesystem +type FSNode interface { + GetNode() (*dag.Node, error) + Type() NodeType + Lock() + Unlock() +} + +// KeyRoot represents the root of a filesystem tree pointed to by a given keypair +type KeyRoot struct { + key ci.PrivKey + + // node is the merkledag node pointed to by this keypair + node *dag.Node + + // A pointer to the filesystem to access components + fs *Filesystem + + // val represents the node pointed to by this key. It can either be a File or a Directory + val FSNode + + repub *Republisher +} + +// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine +// for it +func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { + hash, err := k.GetPublic().Hash() + if err != nil { + return nil, err + } + + name := u.Key(hash).Pretty() + + root := new(KeyRoot) + root.key = k + root.fs = fs + + ctx, cancel := context.WithCancel(parent) + defer cancel() + + pointsTo, err := fs.nsys.Resolve(ctx, name) + if err != nil { + err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k) + if err != nil { + return nil, err + } + + pointsTo, err = fs.nsys.Resolve(ctx, name) + if err != nil { + return nil, err + } + } + + mnode, err := fs.dserv.Get(pointsTo) + if err != nil { + return nil, err + } + + root.node = mnode + + root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3) + go root.repub.Run(parent) + + pbn, err := ft.FromBytes(mnode.Data) + if err != nil { + log.Error("IPNS pointer was not unixfs node") + return nil, err + } + + switch pbn.GetType() { + case ft.TDirectory: + root.val = NewDirectory(pointsTo.B58String(), mnode, root, fs) + case ft.TFile, ft.TMetadata, ft.TRaw: + fi, err := NewFile(pointsTo.B58String(), mnode, root, fs) + if err != nil { + return nil, err + } + root.val = fi + default: + panic("unrecognized! (NYI)") + } + return root, nil +} + +func (kr *KeyRoot) GetValue() FSNode { + return kr.val +} + +// closeChild implements the childCloser interface, and signals to the publisher that +// there are changes ready to be published +func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error { + kr.repub.Touch() + return nil +} + +// Publish publishes the ipns entry associated with this key +func (kr *KeyRoot) Publish(ctx context.Context) error { + child, ok := kr.val.(FSNode) + if !ok { + return errors.New("child of key root not valid type") + } + + nd, err := child.GetNode() + if err != nil { + return err + } + + // Holding this lock so our child doesnt change out from under us + child.Lock() + k, err := kr.fs.dserv.Add(nd) + if err != nil { + child.Unlock() + return err + } + child.Unlock() + // Dont want to hold the lock while we publish + // otherwise we are holding the lock through a costly + // network operation + + fmt.Println("Publishing!") + return kr.fs.nsys.Publish(ctx, kr.key, k) +} + +// Republisher manages when to publish the ipns entry associated with a given key +type Republisher struct { + TimeoutLong time.Duration + TimeoutShort time.Duration + Publish chan struct{} + root *KeyRoot +} + +// NewRepublisher creates a new Republisher object to republish the given keyroot +// using the given short and long time intervals +func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { + return &Republisher{ + TimeoutShort: tshort, + TimeoutLong: tlong, + Publish: make(chan struct{}, 1), + root: root, + } +} + +// Touch signals that an update has occurred since the last publish. +// Multiple consecutive touches may extend the time period before +// the next Publish occurs in order to more efficiently batch updates +func (np *Republisher) Touch() { + select { + case np.Publish <- struct{}{}: + default: + } +} + +// Run is the main republisher loop +func (np *Republisher) Run(ctx context.Context) { + for { + select { + case <-np.Publish: + quick := time.After(np.TimeoutShort) + longer := time.After(np.TimeoutLong) + + wait: + select { + case <-ctx.Done(): + return + case <-np.Publish: + quick = time.After(np.TimeoutShort) + goto wait + case <-quick: + case <-longer: + } + + log.Info("Publishing Changes!") + err := np.root.Publish(ctx) + if err != nil { + log.Critical("republishRoot error: %s", err) + } + + case <-ctx.Done(): + return + } + } +} diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go index 59ef3911f..1f01a082a 100644 --- a/routing/dht/lookup.go +++ b/routing/dht/lookup.go @@ -6,7 +6,6 @@ import ( peer "github.com/jbenet/go-ipfs/p2p/peer" kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" - errors "github.com/jbenet/go-ipfs/util/debugerror" pset "github.com/jbenet/go-ipfs/util/peerset" ) @@ -26,7 +25,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key u.Key) (<-chan peer e := log.EventBegin(ctx, "getClosestPeers", &key) tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) if len(tablepeers) == 0 { - return nil, errors.Wrap(kb.ErrLookupFailure) + return nil, kb.ErrLookupFailure } out := make(chan peer.ID, KValue)