Merge pull request #923 from jbenet/ipns/ipnsfs

ipnsfs
This commit is contained in:
Juan Batiz-Benet 2015-03-17 22:12:03 -07:00
commit c5fbc6673a
6 changed files with 734 additions and 13 deletions

View File

@ -26,6 +26,7 @@ import (
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
offroute "github.com/jbenet/go-ipfs/routing/offline"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
@ -37,6 +38,7 @@ import (
rp "github.com/jbenet/go-ipfs/exchange/reprovide"
mount "github.com/jbenet/go-ipfs/fuse/mount"
ipnsfs "github.com/jbenet/go-ipfs/ipnsfs"
merkledag "github.com/jbenet/go-ipfs/merkledag"
namesys "github.com/jbenet/go-ipfs/namesys"
path "github.com/jbenet/go-ipfs/path"
@ -89,6 +91,8 @@ type IpfsNode struct {
Diagnostics *diag.Diagnostics // the diagnostics service
Reprovider *rp.Reprovider // the value reprovider system
IpnsFs *ipnsfs.Filesystem
ctxgroup.ContextGroup
mode mode
@ -138,6 +142,16 @@ func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error)
node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG)
}
node.Resolver = &path.Resolver{DAG: node.DAG}
// Setup the mutable ipns filesystem structure
if node.OnlineMode() {
fs, err := ipnsfs.NewFilesystem(ctx, node.DAG, node.Namesys, node.Pinning, node.PrivateKey)
if err != nil && err != kb.ErrLookupFailure {
return nil, debugerror.Wrap(err)
}
node.IpnsFs = fs
}
success = true
return node, nil
}
@ -268,6 +282,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
// setup name system
n.Namesys = namesys.NewNameSystem(n.Routing)
return nil
}
@ -278,21 +293,31 @@ func (n *IpfsNode) teardown() error {
// owned objects are closed in this teardown to ensure that they're closed
// regardless of which constructor was used to add them to the node.
closers := []io.Closer{
n.Blocks,
n.Exchange,
n.Repo,
}
addCloser := func(c io.Closer) { // use when field may be nil
if c != nil {
closers = append(closers, c)
}
// Filesystem needs to be closed before network, dht, and blockservice
// so it can use them as its shutting down
if n.IpnsFs != nil {
closers = append(closers, n.IpnsFs)
}
addCloser(n.Bootstrapper)
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
addCloser(dht)
if n.Blocks != nil {
closers = append(closers, n.Blocks)
}
if n.Bootstrapper != nil {
closers = append(closers, n.Bootstrapper)
}
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht)
}
if n.PeerHost != nil {
closers = append(closers, n.PeerHost)
}
addCloser(n.PeerHost)
var errs []error
for _, closer := range closers {

View File

@ -13,8 +13,9 @@ import (
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
peer "github.com/jbenet/go-ipfs/p2p/peer"
path "github.com/jbenet/go-ipfs/path"
pin "github.com/jbenet/go-ipfs/pin"
"github.com/jbenet/go-ipfs/repo"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
offrt "github.com/jbenet/go-ipfs/routing/offline"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
@ -54,7 +55,7 @@ func NewMockNode() (*IpfsNode, error) {
}
// Routing
nd.Routing = mockrouting.NewServer().Client(ident)
nd.Routing = offrt.NewOfflineRouter(nd.Repo.Datastore(), nd.PrivateKey)
// Bitswap
bstore := blockstore.NewBlockstore(nd.Repo.Datastore())
@ -65,6 +66,8 @@ func NewMockNode() (*IpfsNode, error) {
nd.DAG = mdag.NewDAGService(bserv)
nd.Pinning = pin.NewPinner(nd.Repo.Datastore(), nd.DAG)
// Namespace resolver
nd.Namesys = nsys.NewNameSystem(nd.Routing)

264
ipnsfs/dir.go Normal file
View File

@ -0,0 +1,264 @@
package ipnsfs
import (
"errors"
"fmt"
"os"
"sync"
dag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
ufspb "github.com/jbenet/go-ipfs/unixfs/pb"
)
var ErrNotYetImplemented = errors.New("not yet implemented")
var ErrInvalidChild = errors.New("invalid child node")
type Directory struct {
fs *Filesystem
parent childCloser
childDirs map[string]*Directory
files map[string]*File
lock sync.Mutex
node *dag.Node
name string
}
func NewDirectory(name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory {
return &Directory{
fs: fs,
name: name,
node: node,
parent: parent,
childDirs: make(map[string]*Directory),
files: make(map[string]*File),
}
}
// closeChild updates the child by the given name to the dag node 'nd'
// and changes its own dag node, then propogates the changes upward
func (d *Directory) closeChild(name string, nd *dag.Node) error {
_, err := d.fs.dserv.Add(nd)
if err != nil {
return err
}
d.lock.Lock()
defer d.lock.Unlock()
err = d.node.RemoveNodeLink(name)
if err != nil && err != dag.ErrNotFound {
return err
}
err = d.node.AddNodeLinkClean(name, nd)
if err != nil {
return err
}
return d.parent.closeChild(d.name, d.node)
}
func (d *Directory) Type() NodeType {
return TDir
}
// childFile returns a file under this directory by the given name if it exists
func (d *Directory) childFile(name string) (*File, error) {
fi, ok := d.files[name]
if ok {
return fi, nil
}
nd, err := d.childFromDag(name)
if err != nil {
return nil, err
}
i, err := ft.FromBytes(nd.Data)
if err != nil {
return nil, err
}
switch i.GetType() {
case ufspb.Data_Directory:
return nil, ErrIsDirectory
case ufspb.Data_File:
nfi, err := NewFile(name, nd, d, d.fs)
if err != nil {
return nil, err
}
d.files[name] = nfi
return nfi, nil
case ufspb.Data_Metadata:
return nil, ErrNotYetImplemented
default:
return nil, ErrInvalidChild
}
}
// childDir returns a directory under this directory by the given name if it
// exists.
func (d *Directory) childDir(name string) (*Directory, error) {
dir, ok := d.childDirs[name]
if ok {
return dir, nil
}
nd, err := d.childFromDag(name)
if err != nil {
return nil, err
}
i, err := ft.FromBytes(nd.Data)
if err != nil {
return nil, err
}
switch i.GetType() {
case ufspb.Data_Directory:
ndir := NewDirectory(name, nd, d, d.fs)
d.childDirs[name] = ndir
return ndir, nil
case ufspb.Data_File:
return nil, fmt.Errorf("%s is not a directory", name)
case ufspb.Data_Metadata:
return nil, ErrNotYetImplemented
default:
return nil, ErrInvalidChild
}
}
// childFromDag searches through this directories dag node for a child link
// with the given name
func (d *Directory) childFromDag(name string) (*dag.Node, error) {
for _, lnk := range d.node.Links {
if lnk.Name == name {
return lnk.GetNode(d.fs.dserv)
}
}
return nil, os.ErrNotExist
}
// Child returns the child of this directory by the given name
func (d *Directory) Child(name string) (FSNode, error) {
d.lock.Lock()
defer d.lock.Unlock()
return d.childUnsync(name)
}
// childUnsync returns the child under this directory by the given name
// without locking, useful for operations which already hold a lock
func (d *Directory) childUnsync(name string) (FSNode, error) {
dir, err := d.childDir(name)
if err == nil {
return dir, nil
}
fi, err := d.childFile(name)
if err == nil {
return fi, nil
}
return nil, os.ErrNotExist
}
func (d *Directory) List() []string {
d.lock.Lock()
defer d.lock.Unlock()
var out []string
for _, lnk := range d.node.Links {
out = append(out, lnk.Name)
}
return out
}
func (d *Directory) Mkdir(name string) (*Directory, error) {
d.lock.Lock()
defer d.lock.Unlock()
_, err := d.childDir(name)
if err == nil {
return nil, os.ErrExist
}
_, err = d.childFile(name)
if err == nil {
return nil, os.ErrExist
}
ndir := &dag.Node{Data: ft.FolderPBData()}
err = d.node.AddNodeLinkClean(name, ndir)
if err != nil {
return nil, err
}
err = d.parent.closeChild(d.name, d.node)
if err != nil {
return nil, err
}
return d.childDir(name)
}
func (d *Directory) Unlink(name string) error {
d.lock.Lock()
defer d.lock.Unlock()
delete(d.childDirs, name)
delete(d.files, name)
err := d.node.RemoveNodeLink(name)
if err != nil {
return err
}
return d.parent.closeChild(d.name, d.node)
}
// AddChild adds the node 'nd' under this directory giving it the name 'name'
func (d *Directory) AddChild(name string, nd *dag.Node) error {
d.Lock()
defer d.Unlock()
pbn, err := ft.FromBytes(nd.Data)
if err != nil {
return err
}
_, err = d.childUnsync(name)
if err == nil {
return errors.New("directory already has entry by that name")
}
err = d.node.AddNodeLinkClean(name, nd)
if err != nil {
return err
}
switch pbn.GetType() {
case ft.TDirectory:
d.childDirs[name] = NewDirectory(name, nd, d, d.fs)
case ft.TFile, ft.TMetadata, ft.TRaw:
nfi, err := NewFile(name, nd, d, d.fs)
if err != nil {
return err
}
d.files[name] = nfi
default:
return ErrInvalidChild
}
return d.parent.closeChild(d.name, d.node)
}
func (d *Directory) GetNode() (*dag.Node, error) {
return d.node, nil
}
func (d *Directory) Lock() {
d.lock.Lock()
}
func (d *Directory) Unlock() {
d.lock.Unlock()
}

140
ipnsfs/file.go Normal file
View File

@ -0,0 +1,140 @@
package ipnsfs
import (
"sync"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
mod "github.com/jbenet/go-ipfs/unixfs/mod"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
type File struct {
parent childCloser
fs *Filesystem
name string
hasChanges bool
mod *mod.DagModifier
lock sync.Mutex
}
// NewFile returns a NewFile object with the given parameters
func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) {
dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins.GetManual(), chunk.DefaultSplitter)
if err != nil {
return nil, err
}
return &File{
fs: fs,
parent: parent,
name: name,
mod: dmod,
}, nil
}
// Write writes the given data to the file at its current offset
func (fi *File) Write(b []byte) (int, error) {
fi.Lock()
defer fi.Unlock()
fi.hasChanges = true
return fi.mod.Write(b)
}
// Read reads into the given buffer from the current offset
func (fi *File) Read(b []byte) (int, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.Read(b)
}
// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *File) Close() error {
fi.Lock()
defer fi.Unlock()
if fi.hasChanges {
err := fi.mod.Flush()
if err != nil {
return err
}
nd, err := fi.mod.GetNode()
if err != nil {
return err
}
fi.Unlock()
err = fi.parent.closeChild(fi.name, nd)
fi.Lock()
if err != nil {
return err
}
fi.hasChanges = false
}
return nil
}
// Flush flushes the changes in the file to disk
func (fi *File) Flush() error {
fi.Lock()
defer fi.Unlock()
return fi.mod.Flush()
}
// Seek implements io.Seeker
func (fi *File) Seek(offset int64, whence int) (int64, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.Seek(offset, whence)
}
// Write At writes the given bytes at the offset 'at'
func (fi *File) WriteAt(b []byte, at int64) (int, error) {
fi.Lock()
defer fi.Unlock()
fi.hasChanges = true
return fi.mod.WriteAt(b, at)
}
// Size returns the size of this file
func (fi *File) Size() (int64, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.Size()
}
// GetNode returns the dag node associated with this file
func (fi *File) GetNode() (*dag.Node, error) {
fi.Lock()
defer fi.Unlock()
return fi.mod.GetNode()
}
// Truncate truncates the file to size
func (fi *File) Truncate(size int64) error {
fi.Lock()
defer fi.Unlock()
fi.hasChanges = true
return fi.mod.Truncate(size)
}
// Type returns the type FSNode this is
func (fi *File) Type() NodeType {
return TFile
}
// Lock the file
func (fi *File) Lock() {
fi.lock.Lock()
}
// Unlock the file
func (fi *File) Unlock() {
fi.lock.Unlock()
}

290
ipnsfs/system.go Normal file
View File

@ -0,0 +1,290 @@
// package ipnsfs implements an in memory model of a mutable ipns filesystem,
// to be used by the fuse filesystem.
//
// It consists of four main structs:
// 1) The Filesystem
// The filesystem serves as a container and entry point for the ipns filesystem
// 2) KeyRoots
// KeyRoots represent the root of the keyspace controlled by a given keypair
// 3) Directories
// 4) Files
package ipnsfs
import (
"errors"
"fmt"
"os"
"sync"
"time"
dag "github.com/jbenet/go-ipfs/merkledag"
namesys "github.com/jbenet/go-ipfs/namesys"
ci "github.com/jbenet/go-ipfs/p2p/crypto"
pin "github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
)
var log = eventlog.Logger("ipnsfs")
var ErrIsDirectory = errors.New("error: is a directory")
// Filesystem is the writeable fuse filesystem structure
type Filesystem struct {
dserv dag.DAGService
nsys namesys.NameSystem
pins pin.Pinner
roots map[string]*KeyRoot
}
// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys
func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) {
roots := make(map[string]*KeyRoot)
fs := &Filesystem{
roots: roots,
nsys: nsys,
dserv: ds,
pins: pins,
}
for _, k := range keys {
pkh, err := k.GetPublic().Hash()
if err != nil {
return nil, err
}
root, err := fs.newKeyRoot(ctx, k)
if err != nil {
return nil, err
}
roots[u.Key(pkh).Pretty()] = root
}
return fs, nil
}
func (fs *Filesystem) Close() error {
wg := sync.WaitGroup{}
for _, r := range fs.roots {
wg.Add(1)
go func(r *KeyRoot) {
defer wg.Done()
err := r.Publish(context.TODO())
if err != nil {
log.Error(err)
return
}
}(r)
}
wg.Wait()
return nil
}
// GetRoot returns the KeyRoot of the given name
func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) {
r, ok := fs.roots[name]
if ok {
return r, nil
}
return nil, os.ErrNotExist
}
type childCloser interface {
closeChild(string, *dag.Node) error
}
type NodeType int
const (
TFile NodeType = iota
TDir
)
// FSNode represents any node (directory, root, or file) in the ipns filesystem
type FSNode interface {
GetNode() (*dag.Node, error)
Type() NodeType
Lock()
Unlock()
}
// KeyRoot represents the root of a filesystem tree pointed to by a given keypair
type KeyRoot struct {
key ci.PrivKey
// node is the merkledag node pointed to by this keypair
node *dag.Node
// A pointer to the filesystem to access components
fs *Filesystem
// val represents the node pointed to by this key. It can either be a File or a Directory
val FSNode
repub *Republisher
}
// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine
// for it
func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) {
hash, err := k.GetPublic().Hash()
if err != nil {
return nil, err
}
name := u.Key(hash).Pretty()
root := new(KeyRoot)
root.key = k
root.fs = fs
ctx, cancel := context.WithCancel(parent)
defer cancel()
pointsTo, err := fs.nsys.Resolve(ctx, name)
if err != nil {
err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k)
if err != nil {
return nil, err
}
pointsTo, err = fs.nsys.Resolve(ctx, name)
if err != nil {
return nil, err
}
}
mnode, err := fs.dserv.Get(pointsTo)
if err != nil {
return nil, err
}
root.node = mnode
root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3)
go root.repub.Run(parent)
pbn, err := ft.FromBytes(mnode.Data)
if err != nil {
log.Error("IPNS pointer was not unixfs node")
return nil, err
}
switch pbn.GetType() {
case ft.TDirectory:
root.val = NewDirectory(pointsTo.B58String(), mnode, root, fs)
case ft.TFile, ft.TMetadata, ft.TRaw:
fi, err := NewFile(pointsTo.B58String(), mnode, root, fs)
if err != nil {
return nil, err
}
root.val = fi
default:
panic("unrecognized! (NYI)")
}
return root, nil
}
func (kr *KeyRoot) GetValue() FSNode {
return kr.val
}
// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error {
kr.repub.Touch()
return nil
}
// Publish publishes the ipns entry associated with this key
func (kr *KeyRoot) Publish(ctx context.Context) error {
child, ok := kr.val.(FSNode)
if !ok {
return errors.New("child of key root not valid type")
}
nd, err := child.GetNode()
if err != nil {
return err
}
// Holding this lock so our child doesnt change out from under us
child.Lock()
k, err := kr.fs.dserv.Add(nd)
if err != nil {
child.Unlock()
return err
}
child.Unlock()
// Dont want to hold the lock while we publish
// otherwise we are holding the lock through a costly
// network operation
fmt.Println("Publishing!")
return kr.fs.nsys.Publish(ctx, kr.key, k)
}
// Republisher manages when to publish the ipns entry associated with a given key
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
Publish chan struct{}
root *KeyRoot
}
// NewRepublisher creates a new Republisher object to republish the given keyroot
// using the given short and long time intervals
func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher {
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
Publish: make(chan struct{}, 1),
root: root,
}
}
// Touch signals that an update has occurred since the last publish.
// Multiple consecutive touches may extend the time period before
// the next Publish occurs in order to more efficiently batch updates
func (np *Republisher) Touch() {
select {
case np.Publish <- struct{}{}:
default:
}
}
// Run is the main republisher loop
func (np *Republisher) Run(ctx context.Context) {
for {
select {
case <-np.Publish:
quick := time.After(np.TimeoutShort)
longer := time.After(np.TimeoutLong)
wait:
select {
case <-ctx.Done():
return
case <-np.Publish:
quick = time.After(np.TimeoutShort)
goto wait
case <-quick:
case <-longer:
}
log.Info("Publishing Changes!")
err := np.root.Publish(ctx)
if err != nil {
log.Critical("republishRoot error: %s", err)
}
case <-ctx.Done():
return
}
}
}

View File

@ -6,7 +6,6 @@ import (
peer "github.com/jbenet/go-ipfs/p2p/peer"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
pset "github.com/jbenet/go-ipfs/util/peerset"
)
@ -26,7 +25,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key u.Key) (<-chan peer
e := log.EventBegin(ctx, "getClosestPeers", &key)
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
if len(tablepeers) == 0 {
return nil, errors.Wrap(kb.ErrLookupFailure)
return nil, kb.ErrLookupFailure
}
out := make(chan peer.ID, KValue)