diff --git a/core/core.go b/core/core.go index efbbdcd6d..29d929b1a 100644 --- a/core/core.go +++ b/core/core.go @@ -47,7 +47,6 @@ import ( rp "github.com/ipfs/go-ipfs/exchange/reprovide" mount "github.com/ipfs/go-ipfs/fuse/mount" - ipnsfs "github.com/ipfs/go-ipfs/ipnsfs" merkledag "github.com/ipfs/go-ipfs/merkledag" namesys "github.com/ipfs/go-ipfs/namesys" ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" @@ -107,8 +106,6 @@ type IpfsNode struct { Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher - IpnsFs *ipnsfs.Filesystem - proc goprocess.Process ctx context.Context @@ -334,12 +331,6 @@ func (n *IpfsNode) teardown() error { closers = append(closers, mount.Closer(n.Mounts.Ipns)) } - // Filesystem needs to be closed before network, dht, and blockservice - // so it can use them as its shutting down - if n.IpnsFs != nil { - closers = append(closers, n.IpnsFs) - } - if n.Blocks != nil { closers = append(closers, n.Blocks) } diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index fdee57418..c5f8d6a73 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -16,7 +16,7 @@ import ( context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" core "github.com/ipfs/go-ipfs/core" - nsfs "github.com/ipfs/go-ipfs/ipnsfs" + //mfs "github.com/ipfs/go-ipfs/mfs" namesys "github.com/ipfs/go-ipfs/namesys" offroute "github.com/ipfs/go-ipfs/routing/offline" u "github.com/ipfs/go-ipfs/util" @@ -115,12 +115,10 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M node.Routing = offroute.NewOfflineRouter(node.Repo.Datastore(), node.PrivateKey) node.Namesys = namesys.NewNameSystem(node.Routing, node.Repo.Datastore(), 0) - ipnsfs, err := nsfs.NewFilesystem(context.Background(), node.DAG, node.Namesys, node.Pinning, node.PrivateKey) + err = InitializeKeyspace(node, node.PrivateKey) if err != nil { t.Fatal(err) } - - node.IpnsFs = ipnsfs } fs, err := NewFileSystem(node, node.PrivateKey, "", "") diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index c6759531d..18d5255c4 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -17,9 +17,10 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" core "github.com/ipfs/go-ipfs/core" - nsfs "github.com/ipfs/go-ipfs/ipnsfs" dag "github.com/ipfs/go-ipfs/merkledag" + mfs "github.com/ipfs/go-ipfs/mfs" ci "github.com/ipfs/go-ipfs/p2p/crypto" + path "github.com/ipfs/go-ipfs/path" ft "github.com/ipfs/go-ipfs/unixfs" ) @@ -33,10 +34,15 @@ type FileSystem struct { // NewFileSystem constructs new fs using given core.IpfsNode instance. func NewFileSystem(ipfs *core.IpfsNode, sk ci.PrivKey, ipfspath, ipnspath string) (*FileSystem, error) { - root, err := CreateRoot(ipfs, []ci.PrivKey{sk}, ipfspath, ipnspath) + + kmap := map[string]ci.PrivKey{ + "local": sk, + } + root, err := CreateRoot(ipfs, kmap, ipfspath, ipnspath) if err != nil { return nil, err } + return &FileSystem{Ipfs: ipfs, RootNode: root}, nil } @@ -56,53 +62,95 @@ func (f *FileSystem) Destroy() { // Root is the root object of the filesystem tree. type Root struct { Ipfs *core.IpfsNode - Keys []ci.PrivKey + Keys map[string]ci.PrivKey // Used for symlinking into ipfs IpfsRoot string IpnsRoot string LocalDirs map[string]fs.Node - Roots map[string]*nsfs.KeyRoot + Roots map[string]*keyRoot - fs *nsfs.Filesystem - LocalLink *Link + LocalLinks map[string]*Link } -func CreateRoot(ipfs *core.IpfsNode, keys []ci.PrivKey, ipfspath, ipnspath string) (*Root, error) { +func ipnsPubFunc(ipfs *core.IpfsNode, k ci.PrivKey) mfs.PubFunc { + return func(ctx context.Context, key key.Key) error { + return ipfs.Namesys.Publish(ctx, k, path.FromKey(key)) + } +} + +func loadRoot(ctx context.Context, rt *keyRoot, ipfs *core.IpfsNode, name string) (fs.Node, error) { + p, err := path.ParsePath("/ipns/" + name) + if err != nil { + log.Errorf("mkpath %s: %s", name, err) + return nil, err + } + + node, err := core.Resolve(ctx, ipfs, p) + if err != nil { + log.Errorf("looking up %s: %s", p, err) + return nil, err + } + + root, err := mfs.NewRoot(ctx, ipfs.DAG, node, ipnsPubFunc(ipfs, rt.k)) + if err != nil { + return nil, err + } + + rt.root = root + + switch val := root.GetValue().(type) { + case *mfs.Directory: + return &Directory{dir: val}, nil + case *mfs.File: + return &File{fi: val}, nil + default: + return nil, errors.New("unrecognized type") + } + + panic("not reached") +} + +type keyRoot struct { + k ci.PrivKey + alias string + root *mfs.Root +} + +func CreateRoot(ipfs *core.IpfsNode, keys map[string]ci.PrivKey, ipfspath, ipnspath string) (*Root, error) { ldirs := make(map[string]fs.Node) - roots := make(map[string]*nsfs.KeyRoot) - for _, k := range keys { + roots := make(map[string]*keyRoot) + links := make(map[string]*Link) + for alias, k := range keys { pkh, err := k.GetPublic().Hash() if err != nil { return nil, err } name := key.Key(pkh).B58String() - root, err := ipfs.IpnsFs.GetRoot(name) + + kr := &keyRoot{k: k, alias: alias} + fsn, err := loadRoot(ipfs.Context(), kr, ipfs, name) if err != nil { return nil, err } - roots[name] = root + roots[name] = kr + ldirs[name] = fsn - switch val := root.GetValue().(type) { - case *nsfs.Directory: - ldirs[name] = &Directory{dir: val} - case *nsfs.File: - ldirs[name] = &File{fi: val} - default: - return nil, errors.New("unrecognized type") + // set up alias symlink + links[alias] = &Link{ + Target: name, } } return &Root{ - fs: ipfs.IpnsFs, - Ipfs: ipfs, - IpfsRoot: ipfspath, - IpnsRoot: ipnspath, - Keys: keys, - LocalDirs: ldirs, - LocalLink: &Link{ipfs.Identity.Pretty()}, - Roots: roots, + Ipfs: ipfs, + IpfsRoot: ipfspath, + IpnsRoot: ipnspath, + Keys: keys, + LocalDirs: ldirs, + LocalLinks: links, + Roots: roots, }, nil } @@ -121,12 +169,8 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) { return nil, fuse.ENOENT } - // Local symlink to the node ID keyspace - if name == "local" { - if s.LocalLink == nil { - return nil, fuse.ENOENT - } - return s.LocalLink, nil + if lnk, ok := s.LocalLinks[name]; ok { + return lnk, nil } nd, ok := s.LocalDirs[name] @@ -152,15 +196,15 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) { if segments[0] == "ipfs" { p := strings.Join(resolved.Segments()[1:], "/") return &Link{s.IpfsRoot + "/" + p}, nil - } else { - log.Error("Invalid path.Path: ", resolved) - return nil, errors.New("invalid path from ipns record") } + + log.Error("Invalid path.Path: ", resolved) + return nil, errors.New("invalid path from ipns record") } func (r *Root) Close() error { - for _, kr := range r.Roots { - err := kr.Publish(r.Ipfs.Context()) + for _, mr := range r.Roots { + err := mr.root.Close() if err != nil { return err } @@ -181,13 +225,9 @@ func (r *Root) Forget() { // as well as a symlink to the peerID key func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { log.Debug("Root ReadDirAll") - listing := []fuse.Dirent{ - { - Name: "local", - Type: fuse.DT_Link, - }, - } - for _, k := range r.Keys { + + var listing []fuse.Dirent + for alias, k := range r.Keys { pub := k.GetPublic() hash, err := pub.Hash() if err != nil { @@ -197,21 +237,25 @@ func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { Name: key.Key(hash).Pretty(), Type: fuse.DT_Dir, } - listing = append(listing, ent) + link := fuse.Dirent{ + Name: alias, + Type: fuse.DT_Link, + } + listing = append(listing, ent, link) } return listing, nil } -// Directory is wrapper over an ipnsfs directory to satisfy the fuse fs interface +// Directory is wrapper over an mfs directory to satisfy the fuse fs interface type Directory struct { - dir *nsfs.Directory + dir *mfs.Directory fs.NodeRef } -// File is wrapper over an ipnsfs file to satisfy the fuse fs interface +// File is wrapper over an mfs file to satisfy the fuse fs interface type File struct { - fi *nsfs.File + fi *mfs.File fs.NodeRef } @@ -249,9 +293,9 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) { } switch child := child.(type) { - case *nsfs.Directory: + case *mfs.Directory: return &Directory{dir: child}, nil - case *nsfs.File: + case *mfs.File: return &File{fi: child}, nil default: // NB: if this happens, we do not want to continue, unpredictable behaviour @@ -263,19 +307,17 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) { // ReadDirAll reads the link structure as directory entries func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { var entries []fuse.Dirent - for _, name := range dir.dir.List() { - dirent := fuse.Dirent{Name: name} + listing, err := dir.dir.List() + if err != nil { + return nil, err + } + for _, entry := range listing { + dirent := fuse.Dirent{Name: entry.Name} - // TODO: make dir.dir.List() return dirinfos - child, err := dir.dir.Child(name) - if err != nil { - return nil, err - } - - switch child.Type() { - case nsfs.TDir: + switch mfs.NodeType(entry.Type) { + case mfs.TDir: dirent.Type = fuse.DT_Dir - case nsfs.TFile: + case mfs.TFile: dirent.Type = fuse.DT_File } @@ -419,7 +461,7 @@ func (dir *Directory) Create(ctx context.Context, req *fuse.CreateRequest, resp return nil, nil, err } - fi, ok := child.(*nsfs.File) + fi, ok := child.(*mfs.File) if !ok { return nil, nil, errors.New("child creation failed") } diff --git a/fuse/ipns/mount_unix.go b/fuse/ipns/mount_unix.go index 620ce9fa7..57b234db8 100644 --- a/fuse/ipns/mount_unix.go +++ b/fuse/ipns/mount_unix.go @@ -6,7 +6,6 @@ package ipns import ( core "github.com/ipfs/go-ipfs/core" mount "github.com/ipfs/go-ipfs/fuse/mount" - ipnsfs "github.com/ipfs/go-ipfs/ipnsfs" ) // Mount mounts ipns at a given location, and returns a mount.Mount instance. @@ -18,14 +17,6 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) { allow_other := cfg.Mounts.FuseAllowOther - if ipfs.IpnsFs == nil { - fs, err := ipnsfs.NewFilesystem(ipfs.Context(), ipfs.DAG, ipfs.Namesys, ipfs.Pinning, ipfs.PrivateKey) - if err != nil { - return nil, err - } - ipfs.IpnsFs = fs - } - fsys, err := NewFileSystem(ipfs, ipfs.PrivateKey, ipfsmp, ipnsmp) if err != nil { return nil, err diff --git a/ipnsfs/system.go b/ipnsfs/system.go deleted file mode 100644 index 4fe935d03..000000000 --- a/ipnsfs/system.go +++ /dev/null @@ -1,304 +0,0 @@ -// package ipnsfs implements an in memory model of a mutable ipns filesystem, -// to be used by the fuse filesystem. -// -// It consists of four main structs: -// 1) The Filesystem -// The filesystem serves as a container and entry point for the ipns filesystem -// 2) KeyRoots -// KeyRoots represent the root of the keyspace controlled by a given keypair -// 3) Directories -// 4) Files -package ipnsfs - -import ( - "errors" - "os" - "sync" - "time" - - key "github.com/ipfs/go-ipfs/blocks/key" - dag "github.com/ipfs/go-ipfs/merkledag" - namesys "github.com/ipfs/go-ipfs/namesys" - ci "github.com/ipfs/go-ipfs/p2p/crypto" - path "github.com/ipfs/go-ipfs/path" - pin "github.com/ipfs/go-ipfs/pin" - ft "github.com/ipfs/go-ipfs/unixfs" - - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" - logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" -) - -var log = logging.Logger("ipnsfs") - -var ErrIsDirectory = errors.New("error: is a directory") - -// Filesystem is the writeable fuse filesystem structure -type Filesystem struct { - ctx context.Context - - dserv dag.DAGService - - nsys namesys.NameSystem - - resolver *path.Resolver - - pins pin.Pinner - - roots map[string]*KeyRoot -} - -// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys -func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) { - roots := make(map[string]*KeyRoot) - fs := &Filesystem{ - ctx: ctx, - roots: roots, - nsys: nsys, - dserv: ds, - pins: pins, - resolver: &path.Resolver{DAG: ds}, - } - for _, k := range keys { - pkh, err := k.GetPublic().Hash() - if err != nil { - return nil, err - } - - root, err := fs.newKeyRoot(ctx, k) - if err != nil { - return nil, err - } - roots[key.Key(pkh).Pretty()] = root - } - - return fs, nil -} - -func (fs *Filesystem) Close() error { - wg := sync.WaitGroup{} - for _, r := range fs.roots { - wg.Add(1) - go func(r *KeyRoot) { - defer wg.Done() - err := r.Publish(fs.ctx) - if err != nil { - log.Info(err) - return - } - }(r) - } - wg.Wait() - return nil -} - -// GetRoot returns the KeyRoot of the given name -func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) { - r, ok := fs.roots[name] - if ok { - return r, nil - } - return nil, os.ErrNotExist -} - -type childCloser interface { - closeChild(string, *dag.Node) error -} - -type NodeType int - -const ( - TFile NodeType = iota - TDir -) - -// FSNode represents any node (directory, root, or file) in the ipns filesystem -type FSNode interface { - GetNode() (*dag.Node, error) - Type() NodeType - Lock() - Unlock() -} - -// KeyRoot represents the root of a filesystem tree pointed to by a given keypair -type KeyRoot struct { - key ci.PrivKey - name string - - // node is the merkledag node pointed to by this keypair - node *dag.Node - - // A pointer to the filesystem to access components - fs *Filesystem - - // val represents the node pointed to by this key. It can either be a File or a Directory - val FSNode - - repub *Republisher -} - -// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine -// for it -func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) { - hash, err := k.GetPublic().Hash() - if err != nil { - return nil, err - } - - name := "/ipns/" + key.Key(hash).String() - - root := new(KeyRoot) - root.key = k - root.fs = fs - root.name = name - - ctx, cancel := context.WithCancel(parent) - defer cancel() - - pointsTo, err := fs.nsys.Resolve(ctx, name) - if err != nil { - err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k) - if err != nil { - return nil, err - } - - pointsTo, err = fs.nsys.Resolve(ctx, name) - if err != nil { - return nil, err - } - } - - mnode, err := fs.resolver.ResolvePath(ctx, pointsTo) - if err != nil { - log.Errorf("Failed to retrieve value '%s' for ipns entry: %s\n", pointsTo, err) - return nil, err - } - - root.node = mnode - - root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3) - go root.repub.Run(parent) - - pbn, err := ft.FromBytes(mnode.Data) - if err != nil { - log.Error("IPNS pointer was not unixfs node") - return nil, err - } - - switch pbn.GetType() { - case ft.TDirectory: - root.val = NewDirectory(ctx, pointsTo.String(), mnode, root, fs) - case ft.TFile, ft.TMetadata, ft.TRaw: - fi, err := NewFile(pointsTo.String(), mnode, root, fs) - if err != nil { - return nil, err - } - root.val = fi - default: - panic("unrecognized! (NYI)") - } - return root, nil -} - -func (kr *KeyRoot) GetValue() FSNode { - return kr.val -} - -// closeChild implements the childCloser interface, and signals to the publisher that -// there are changes ready to be published -func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error { - kr.repub.Touch() - return nil -} - -// Publish publishes the ipns entry associated with this key -func (kr *KeyRoot) Publish(ctx context.Context) error { - child, ok := kr.val.(FSNode) - if !ok { - return errors.New("child of key root not valid type") - } - - nd, err := child.GetNode() - if err != nil { - return err - } - - // Holding this lock so our child doesnt change out from under us - child.Lock() - k, err := kr.fs.dserv.Add(nd) - if err != nil { - child.Unlock() - return err - } - child.Unlock() - // Dont want to hold the lock while we publish - // otherwise we are holding the lock through a costly - // network operation - - kp := path.FromKey(k) - - ev := &logging.Metadata{"name": kr.name, "key": kp} - defer log.EventBegin(ctx, "ipnsfsPublishing", ev).Done() - log.Info("ipnsfs publishing %s -> %s", kr.name, kp) - - return kr.fs.nsys.Publish(ctx, kr.key, kp) -} - -// Republisher manages when to publish the ipns entry associated with a given key -type Republisher struct { - TimeoutLong time.Duration - TimeoutShort time.Duration - Publish chan struct{} - root *KeyRoot -} - -// NewRepublisher creates a new Republisher object to republish the given keyroot -// using the given short and long time intervals -func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher { - return &Republisher{ - TimeoutShort: tshort, - TimeoutLong: tlong, - Publish: make(chan struct{}, 1), - root: root, - } -} - -// Touch signals that an update has occurred since the last publish. -// Multiple consecutive touches may extend the time period before -// the next Publish occurs in order to more efficiently batch updates -func (np *Republisher) Touch() { - select { - case np.Publish <- struct{}{}: - default: - } -} - -// Run is the main republisher loop -func (np *Republisher) Run(ctx context.Context) { - for { - select { - case <-np.Publish: - quick := time.After(np.TimeoutShort) - longer := time.After(np.TimeoutLong) - - wait: - select { - case <-ctx.Done(): - return - case <-np.Publish: - quick = time.After(np.TimeoutShort) - goto wait - case <-quick: - case <-longer: - } - - log.Info("Publishing Changes!") - err := np.root.Publish(ctx) - if err != nil { - log.Error("republishRoot error: %s", err) - } - - case <-ctx.Done(): - return - } - } -} diff --git a/ipnsfs/dir.go b/mfs/dir.go similarity index 80% rename from ipnsfs/dir.go rename to mfs/dir.go index a7e264f96..c33032baf 100644 --- a/ipnsfs/dir.go +++ b/mfs/dir.go @@ -1,4 +1,4 @@ -package ipnsfs +package mfs import ( "errors" @@ -15,9 +15,10 @@ import ( var ErrNotYetImplemented = errors.New("not yet implemented") var ErrInvalidChild = errors.New("invalid child node") +var ErrDirExists = errors.New("directory already has entry by that name") type Directory struct { - fs *Filesystem + dserv dag.DAGService parent childCloser childDirs map[string]*Directory @@ -30,10 +31,10 @@ type Directory struct { name string } -func NewDirectory(ctx context.Context, name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory { +func NewDirectory(ctx context.Context, name string, node *dag.Node, parent childCloser, dserv dag.DAGService) *Directory { return &Directory{ + dserv: dserv, ctx: ctx, - fs: fs, name: name, node: node, parent: parent, @@ -45,7 +46,7 @@ func NewDirectory(ctx context.Context, name string, node *dag.Node, parent child // closeChild updates the child by the given name to the dag node 'nd' // and changes its own dag node, then propogates the changes upward func (d *Directory) closeChild(name string, nd *dag.Node) error { - _, err := d.fs.dserv.Add(nd) + _, err := d.dserv.Add(nd) if err != nil { return err } @@ -89,7 +90,7 @@ func (d *Directory) childFile(name string) (*File, error) { case ufspb.Data_Directory: return nil, ErrIsDirectory case ufspb.Data_File: - nfi, err := NewFile(name, nd, d, d.fs) + nfi, err := NewFile(name, nd, d, d.dserv) if err != nil { return nil, err } @@ -122,7 +123,7 @@ func (d *Directory) childDir(name string) (*Directory, error) { switch i.GetType() { case ufspb.Data_Directory: - ndir := NewDirectory(d.ctx, name, nd, d, d.fs) + ndir := NewDirectory(d.ctx, name, nd, d, d.dserv) d.childDirs[name] = ndir return ndir, nil case ufspb.Data_File: @@ -139,7 +140,7 @@ func (d *Directory) childDir(name string) (*Directory, error) { func (d *Directory) childFromDag(name string) (*dag.Node, error) { for _, lnk := range d.node.Links { if lnk.Name == name { - return lnk.GetNode(d.ctx, d.fs.dserv) + return lnk.GetNode(d.ctx, d.dserv) } } @@ -156,6 +157,7 @@ func (d *Directory) Child(name string) (FSNode, error) { // childUnsync returns the child under this directory by the given name // without locking, useful for operations which already hold a lock func (d *Directory) childUnsync(name string) (FSNode, error) { + dir, err := d.childDir(name) if err == nil { return dir, nil @@ -168,15 +170,51 @@ func (d *Directory) childUnsync(name string) (FSNode, error) { return nil, os.ErrNotExist } -func (d *Directory) List() []string { +type NodeListing struct { + Name string + Type int + Size int64 + Hash string +} + +func (d *Directory) List() ([]NodeListing, error) { d.lock.Lock() defer d.lock.Unlock() - var out []string - for _, lnk := range d.node.Links { - out = append(out, lnk.Name) + var out []NodeListing + for _, l := range d.node.Links { + child := NodeListing{} + child.Name = l.Name + + c, err := d.childUnsync(l.Name) + if err != nil { + return nil, err + } + + child.Type = int(c.Type()) + if c, ok := c.(*File); ok { + size, err := c.Size() + if err != nil { + return nil, err + } + child.Size = size + } + nd, err := c.GetNode() + if err != nil { + return nil, err + } + + k, err := nd.Key() + if err != nil { + return nil, err + } + + child.Hash = k.B58String() + + out = append(out, child) } - return out + + return out, nil } func (d *Directory) Mkdir(name string) (*Directory, error) { @@ -193,6 +231,12 @@ func (d *Directory) Mkdir(name string) (*Directory, error) { } ndir := &dag.Node{Data: ft.FolderPBData()} + + _, err = d.dserv.Add(ndir) + if err != nil { + return nil, err + } + err = d.node.AddNodeLinkClean(name, ndir) if err != nil { return nil, err @@ -225,6 +269,7 @@ func (d *Directory) Unlink(name string) error { func (d *Directory) AddChild(name string, nd *dag.Node) error { d.Lock() defer d.Unlock() + pbn, err := ft.FromBytes(nd.Data) if err != nil { return err @@ -232,7 +277,7 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error { _, err = d.childUnsync(name) if err == nil { - return errors.New("directory already has entry by that name") + return ErrDirExists } err = d.node.AddNodeLinkClean(name, nd) @@ -242,9 +287,9 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error { switch pbn.GetType() { case ft.TDirectory: - d.childDirs[name] = NewDirectory(d.ctx, name, nd, d, d.fs) + d.childDirs[name] = NewDirectory(d.ctx, name, nd, d, d.dserv) case ft.TFile, ft.TMetadata, ft.TRaw: - nfi, err := NewFile(name, nd, d, d.fs) + nfi, err := NewFile(name, nd, d, d.dserv) if err != nil { return err } diff --git a/ipnsfs/file.go b/mfs/file.go similarity index 91% rename from ipnsfs/file.go rename to mfs/file.go index b6dc9108b..fea1112dc 100644 --- a/ipnsfs/file.go +++ b/mfs/file.go @@ -1,4 +1,4 @@ -package ipnsfs +package mfs import ( "sync" @@ -12,7 +12,6 @@ import ( type File struct { parent childCloser - fs *Filesystem name string hasChanges bool @@ -22,14 +21,13 @@ type File struct { } // NewFile returns a NewFile object with the given parameters -func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) { - dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins, chunk.DefaultSplitter) +func NewFile(name string, node *dag.Node, parent childCloser, dserv dag.DAGService) (*File, error) { + dmod, err := mod.NewDagModifier(context.Background(), node, dserv, chunk.DefaultSplitter) if err != nil { return nil, err } return &File{ - fs: fs, parent: parent, name: name, mod: dmod, diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go new file mode 100644 index 000000000..609d81a29 --- /dev/null +++ b/mfs/mfs_test.go @@ -0,0 +1,476 @@ +package mfs + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "sort" + "strings" + "testing" + + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + + bstore "github.com/ipfs/go-ipfs/blocks/blockstore" + key "github.com/ipfs/go-ipfs/blocks/key" + bserv "github.com/ipfs/go-ipfs/blockservice" + offline "github.com/ipfs/go-ipfs/exchange/offline" + importer "github.com/ipfs/go-ipfs/importer" + chunk "github.com/ipfs/go-ipfs/importer/chunk" + dag "github.com/ipfs/go-ipfs/merkledag" + ft "github.com/ipfs/go-ipfs/unixfs" + uio "github.com/ipfs/go-ipfs/unixfs/io" + u "github.com/ipfs/go-ipfs/util" +) + +func getDagserv(t *testing.T) dag.DAGService { + db := dssync.MutexWrap(ds.NewMapDatastore()) + bs := bstore.NewBlockstore(db) + blockserv := bserv.New(bs, offline.Exchange(bs)) + return dag.NewDAGService(blockserv) +} + +func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.Node { + r := io.LimitReader(u.NewTimeSeededRand(), size) + nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r)) + if err != nil { + t.Fatal(err) + } + return nd +} + +func mkdirP(t *testing.T, root *Directory, path string) *Directory { + dirs := strings.Split(path, "/") + cur := root + for _, d := range dirs { + n, err := cur.Mkdir(d) + if err != nil && err != os.ErrExist { + t.Fatal(err) + } + if err == os.ErrExist { + fsn, err := cur.Child(d) + if err != nil { + t.Fatal(err) + } + switch fsn := fsn.(type) { + case *Directory: + n = fsn + case *File: + t.Fatal("tried to make a directory where a file already exists") + } + } + + cur = n + } + return cur +} + +func assertDirAtPath(root *Directory, path string, children []string) error { + fsn, err := DirLookup(root, path) + if err != nil { + return err + } + + dir, ok := fsn.(*Directory) + if !ok { + return fmt.Errorf("%s was not a directory", path) + } + + listing, err := dir.List() + if err != nil { + return err + } + + var names []string + for _, d := range listing { + names = append(names, d.Name) + } + + sort.Strings(children) + sort.Strings(names) + if !compStrArrs(children, names) { + return errors.New("directories children did not match!") + } + + return nil +} + +func compStrArrs(a, b []string) bool { + if len(a) != len(b) { + return false + } + + for i := 0; i < len(a); i++ { + if a[i] != b[i] { + return false + } + } + + return true +} + +func assertFileAtPath(ds dag.DAGService, root *Directory, exp *dag.Node, path string) error { + parts := strings.Split(path, "/") + cur := root + for i, d := range parts[:len(parts)-1] { + next, err := cur.Child(d) + if err != nil { + return fmt.Errorf("looking for %s failed: %s", path, err) + } + + nextDir, ok := next.(*Directory) + if !ok { + return fmt.Errorf("%s points to a non-directory", parts[:i+1]) + } + + cur = nextDir + } + + last := parts[len(parts)-1] + finaln, err := cur.Child(last) + if err != nil { + return err + } + + file, ok := finaln.(*File) + if !ok { + return fmt.Errorf("%s was not a file!", path) + } + + out, err := ioutil.ReadAll(file) + if err != nil { + return err + } + + expbytes, err := catNode(ds, exp) + if err != nil { + return err + } + + if !bytes.Equal(out, expbytes) { + return fmt.Errorf("Incorrect data at path!") + } + return nil +} + +func catNode(ds dag.DAGService, nd *dag.Node) ([]byte, error) { + r, err := uio.NewDagReader(context.TODO(), nd, ds) + if err != nil { + return nil, err + } + defer r.Close() + + return ioutil.ReadAll(r) +} + +func setupRoot(ctx context.Context, t *testing.T) (dag.DAGService, *Root) { + ds := getDagserv(t) + + root := &dag.Node{Data: ft.FolderPBData()} + rt, err := NewRoot(ctx, ds, root, func(ctx context.Context, k key.Key) error { + fmt.Println("PUBLISHED: ", k) + return nil + }) + + if err != nil { + t.Fatal(err) + } + + return ds, rt +} + +func TestBasic(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ds, rt := setupRoot(ctx, t) + + rootdir := rt.GetValue().(*Directory) + + // test making a basic dir + _, err := rootdir.Mkdir("a") + if err != nil { + t.Fatal(err) + } + + path := "a/b/c/d/e/f/g" + d := mkdirP(t, rootdir, path) + + fi := getRandFile(t, ds, 1000) + + // test inserting that file + err = d.AddChild("afile", fi) + if err != nil { + t.Fatal(err) + } + + err = assertFileAtPath(ds, rootdir, fi, "a/b/c/d/e/f/g/afile") + if err != nil { + t.Fatal(err) + } +} + +func TestMkdir(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, rt := setupRoot(ctx, t) + + rootdir := rt.GetValue().(*Directory) + + dirsToMake := []string{"a", "B", "foo", "bar", "cats", "fish"} + sort.Strings(dirsToMake) // sort for easy comparing later + + for _, d := range dirsToMake { + _, err := rootdir.Mkdir(d) + if err != nil { + t.Fatal(err) + } + } + + err := assertDirAtPath(rootdir, "/", dirsToMake) + if err != nil { + t.Fatal(err) + } + + for _, d := range dirsToMake { + mkdirP(t, rootdir, "a/"+d) + } + + err = assertDirAtPath(rootdir, "/a", dirsToMake) + if err != nil { + t.Fatal(err) + } + + // mkdir over existing dir should fail + _, err = rootdir.Mkdir("a") + if err == nil { + t.Fatal("should have failed!") + } +} + +func TestDirectoryLoadFromDag(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ds, rt := setupRoot(ctx, t) + + rootdir := rt.GetValue().(*Directory) + + nd := getRandFile(t, ds, 1000) + _, err := ds.Add(nd) + if err != nil { + t.Fatal(err) + } + + fihash, err := nd.Multihash() + if err != nil { + t.Fatal(err) + } + + dir := &dag.Node{Data: ft.FolderPBData()} + _, err = ds.Add(dir) + if err != nil { + t.Fatal(err) + } + + dirhash, err := dir.Multihash() + if err != nil { + t.Fatal(err) + } + + top := &dag.Node{ + Data: ft.FolderPBData(), + Links: []*dag.Link{ + &dag.Link{ + Name: "a", + Hash: fihash, + }, + &dag.Link{ + Name: "b", + Hash: dirhash, + }, + }, + } + + err = rootdir.AddChild("foo", top) + if err != nil { + t.Fatal(err) + } + + // get this dir + topi, err := rootdir.Child("foo") + if err != nil { + t.Fatal(err) + } + + topd := topi.(*Directory) + + // mkdir over existing but unloaded child file should fail + _, err = topd.Mkdir("a") + if err == nil { + t.Fatal("expected to fail!") + } + + // mkdir over existing but unloaded child dir should fail + _, err = topd.Mkdir("b") + if err == nil { + t.Fatal("expected to fail!") + } + + // adding a child over an existing path fails + err = topd.AddChild("b", nd) + if err == nil { + t.Fatal("expected to fail!") + } + + err = assertFileAtPath(ds, rootdir, nd, "foo/a") + if err != nil { + t.Fatal(err) + } + + err = assertDirAtPath(rootdir, "foo/b", nil) + if err != nil { + t.Fatal(err) + } + + err = rootdir.Unlink("foo") + if err != nil { + t.Fatal(err) + } + + err = assertDirAtPath(rootdir, "", nil) + if err != nil { + t.Fatal(err) + } +} + +func TestMfsFile(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ds, rt := setupRoot(ctx, t) + + rootdir := rt.GetValue().(*Directory) + + fisize := 1000 + nd := getRandFile(t, ds, 1000) + + err := rootdir.AddChild("file", nd) + if err != nil { + t.Fatal(err) + } + + fsn, err := rootdir.Child("file") + if err != nil { + t.Fatal(err) + } + + fi := fsn.(*File) + + if fi.Type() != TFile { + t.Fatal("some is seriously wrong here") + } + + // assert size is as expected + size, err := fi.Size() + if size != int64(fisize) { + t.Fatal("size isnt correct") + } + + // write to beginning of file + b := []byte("THIS IS A TEST") + n, err := fi.Write(b) + if err != nil { + t.Fatal(err) + } + + if n != len(b) { + t.Fatal("didnt write correct number of bytes") + } + + // sync file + err = fi.Sync() + if err != nil { + t.Fatal(err) + } + + // make sure size hasnt changed + size, err = fi.Size() + if size != int64(fisize) { + t.Fatal("size isnt correct") + } + + // seek back to beginning + ns, err := fi.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + + if ns != 0 { + t.Fatal("didnt seek to beginning") + } + + // read back bytes we wrote + buf := make([]byte, len(b)) + n, err = fi.Read(buf) + if err != nil { + t.Fatal(err) + } + + if n != len(buf) { + t.Fatal("didnt read enough") + } + + if !bytes.Equal(buf, b) { + t.Fatal("data read was different than data written") + } + + // truncate file to ten bytes + err = fi.Truncate(10) + if err != nil { + t.Fatal(err) + } + + size, err = fi.Size() + if err != nil { + t.Fatal(err) + } + + if size != 10 { + t.Fatal("size was incorrect: ", size) + } + + // 'writeAt' to extend it + data := []byte("this is a test foo foo foo") + nwa, err := fi.WriteAt(data, 5) + if err != nil { + t.Fatal(err) + } + + if nwa != len(data) { + t.Fatal(err) + } + + // assert size once more + size, err = fi.Size() + if err != nil { + t.Fatal(err) + } + + if size != int64(5+len(data)) { + t.Fatal("size was incorrect") + } + + // make sure we can get node. TODO: verify it later + _, err = fi.GetNode() + if err != nil { + t.Fatal(err) + } + + // close it out! + err = fi.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/mfs/ops.go b/mfs/ops.go new file mode 100644 index 000000000..75f187f52 --- /dev/null +++ b/mfs/ops.go @@ -0,0 +1,43 @@ +package mfs + +import ( + "errors" + "fmt" + "strings" +) + +func rootLookup(r *Root, path string) (FSNode, error) { + dir, ok := r.GetValue().(*Directory) + if !ok { + return nil, errors.New("root was not a directory") + } + + return DirLookup(dir, path) +} + +// DirLookup will look up a file or directory at the given path +// under the directory 'd' +func DirLookup(d *Directory, path string) (FSNode, error) { + path = strings.Trim(path, "/") + parts := strings.Split(path, "/") + if len(parts) == 1 && parts[0] == "" { + return d, nil + } + + var cur FSNode + cur = d + for i, p := range parts { + chdir, ok := cur.(*Directory) + if !ok { + return nil, fmt.Errorf("cannot access %s: Not a directory", strings.Join(parts[:i+1], "/")) + } + + child, err := chdir.Child(p) + if err != nil { + return nil, err + } + + cur = child + } + return cur, nil +} diff --git a/mfs/repub_test.go b/mfs/repub_test.go new file mode 100644 index 000000000..36db90e80 --- /dev/null +++ b/mfs/repub_test.go @@ -0,0 +1,78 @@ +package mfs + +import ( + "testing" + "time" + + key "github.com/ipfs/go-ipfs/blocks/key" + ci "github.com/ipfs/go-ipfs/util/testutil/ci" + + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +func TestRepublisher(t *testing.T) { + if ci.IsRunning() { + t.Skip("dont run timing tests in CI") + } + + ctx := context.TODO() + + pub := make(chan struct{}) + + pf := func(ctx context.Context, k key.Key) error { + pub <- struct{}{} + return nil + } + + tshort := time.Millisecond * 50 + tlong := time.Second / 2 + + rp := NewRepublisher(ctx, pf, tshort, tlong) + go rp.Run() + + rp.Update("test") + + // should hit short timeout + select { + case <-time.After(tshort * 2): + t.Fatal("publish didnt happen in time") + case <-pub: + } + + cctx, cancel := context.WithCancel(context.Background()) + + go func() { + for { + rp.Update("a") + time.Sleep(time.Millisecond * 10) + select { + case <-cctx.Done(): + return + default: + } + } + }() + + select { + case <-pub: + t.Fatal("shouldnt have received publish yet!") + case <-time.After((tlong * 9) / 10): + } + select { + case <-pub: + case <-time.After(tlong / 2): + t.Fatal("waited too long for pub!") + } + + cancel() + + go func() { + err := rp.Close() + if err != nil { + t.Fatal(err) + } + }() + + // final pub from closing + <-pub +} diff --git a/mfs/system.go b/mfs/system.go new file mode 100644 index 000000000..d2819479f --- /dev/null +++ b/mfs/system.go @@ -0,0 +1,237 @@ +// package mfs implements an in memory model of a mutable ipfs filesystem. +// +// It consists of four main structs: +// 1) The Filesystem +// The filesystem serves as a container and entry point for various mfs filesystems +// 2) Root +// Root represents an individual filesystem mounted within the mfs system as a whole +// 3) Directories +// 4) Files +package mfs + +import ( + "errors" + "sync" + "time" + + key "github.com/ipfs/go-ipfs/blocks/key" + dag "github.com/ipfs/go-ipfs/merkledag" + ft "github.com/ipfs/go-ipfs/unixfs" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" +) + +var ErrNotExist = errors.New("no such rootfs") + +var log = logging.Logger("mfs") + +var ErrIsDirectory = errors.New("error: is a directory") + +type childCloser interface { + closeChild(string, *dag.Node) error +} + +type NodeType int + +const ( + TFile NodeType = iota + TDir +) + +// FSNode represents any node (directory, root, or file) in the ipns filesystem +type FSNode interface { + GetNode() (*dag.Node, error) + Type() NodeType + Lock() + Unlock() +} + +// Root represents the root of a filesystem tree pointed to by a given keypair +type Root struct { + // node is the merkledag node pointed to by this keypair + node *dag.Node + + // val represents the node pointed to by this key. It can either be a File or a Directory + val FSNode + + repub *Republisher + + dserv dag.DAGService + + Type string +} + +type PubFunc func(context.Context, key.Key) error + +// newRoot creates a new Root for the given key, and starts up a republisher routine +// for it +func NewRoot(parent context.Context, ds dag.DAGService, node *dag.Node, pf PubFunc) (*Root, error) { + ndk, err := node.Key() + if err != nil { + return nil, err + } + + root := &Root{ + node: node, + repub: NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3), + dserv: ds, + } + + root.repub.setVal(ndk) + go root.repub.Run() + + pbn, err := ft.FromBytes(node.Data) + if err != nil { + log.Error("IPNS pointer was not unixfs node") + return nil, err + } + + switch pbn.GetType() { + case ft.TDirectory: + root.val = NewDirectory(parent, ndk.String(), node, root, ds) + case ft.TFile, ft.TMetadata, ft.TRaw: + fi, err := NewFile(ndk.String(), node, root, ds) + if err != nil { + return nil, err + } + root.val = fi + default: + panic("unrecognized! (NYI)") + } + return root, nil +} + +func (kr *Root) GetValue() FSNode { + return kr.val +} + +// closeChild implements the childCloser interface, and signals to the publisher that +// there are changes ready to be published +func (kr *Root) closeChild(name string, nd *dag.Node) error { + k, err := kr.dserv.Add(nd) + if err != nil { + return err + } + + kr.repub.Update(k) + return nil +} + +func (kr *Root) Close() error { + return kr.repub.Close() +} + +// Republisher manages when to publish the ipns entry associated with a given key +type Republisher struct { + TimeoutLong time.Duration + TimeoutShort time.Duration + Publish chan struct{} + pubfunc PubFunc + pubnowch chan struct{} + + ctx context.Context + cancel func() + + lk sync.Mutex + val key.Key + lastpub key.Key +} + +func (rp *Republisher) getVal() key.Key { + rp.lk.Lock() + defer rp.lk.Unlock() + return rp.val +} + +// NewRepublisher creates a new Republisher object to republish the given keyroot +// using the given short and long time intervals +func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher { + ctx, cancel := context.WithCancel(ctx) + return &Republisher{ + TimeoutShort: tshort, + TimeoutLong: tlong, + Publish: make(chan struct{}, 1), + pubfunc: pf, + pubnowch: make(chan struct{}), + ctx: ctx, + cancel: cancel, + } +} + +func (p *Republisher) setVal(k key.Key) { + p.lk.Lock() + defer p.lk.Unlock() + p.val = k +} + +func (p *Republisher) pubNow() { + select { + case p.pubnowch <- struct{}{}: + default: + } +} + +func (p *Republisher) Close() error { + err := p.publish(p.ctx) + p.cancel() + return err +} + +// Touch signals that an update has occurred since the last publish. +// Multiple consecutive touches may extend the time period before +// the next Publish occurs in order to more efficiently batch updates +func (np *Republisher) Update(k key.Key) { + np.setVal(k) + select { + case np.Publish <- struct{}{}: + default: + } +} + +// Run is the main republisher loop +func (np *Republisher) Run() { + for { + select { + case <-np.Publish: + quick := time.After(np.TimeoutShort) + longer := time.After(np.TimeoutLong) + + wait: + select { + case <-np.ctx.Done(): + return + case <-np.Publish: + quick = time.After(np.TimeoutShort) + goto wait + case <-quick: + case <-longer: + case <-np.pubnowch: + } + + err := np.publish(np.ctx) + if err != nil { + log.Error("republishRoot error: %s", err) + } + + case <-np.ctx.Done(): + return + } + } +} + +func (np *Republisher) publish(ctx context.Context) error { + np.lk.Lock() + topub := np.val + np.lk.Unlock() + + log.Info("Publishing Changes!") + err := np.pubfunc(ctx, topub) + if err != nil { + return err + } + np.lk.Lock() + np.lastpub = topub + np.lk.Unlock() + return nil +} diff --git a/unixfs/format.go b/unixfs/format.go index 9193ddede..472a575e7 100644 --- a/unixfs/format.go +++ b/unixfs/format.go @@ -67,6 +67,7 @@ func WrapData(b []byte) []byte { typ := pb.Data_Raw pbdata.Data = b pbdata.Type = &typ + pbdata.Filesize = proto.Uint64(uint64(len(b))) out, err := proto.Marshal(pbdata) if err != nil { diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index 481005c2f..3c6a110f6 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -15,7 +15,6 @@ import ( help "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" mdag "github.com/ipfs/go-ipfs/merkledag" - pin "github.com/ipfs/go-ipfs/pin" ft "github.com/ipfs/go-ipfs/unixfs" uio "github.com/ipfs/go-ipfs/unixfs/io" logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log" @@ -36,7 +35,6 @@ var log = logging.Logger("dagio") type DagModifier struct { dagserv mdag.DAGService curNode *mdag.Node - mp pin.Pinner splitter chunk.SplitterGen ctx context.Context @@ -49,13 +47,12 @@ type DagModifier struct { read *uio.DagReader } -func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.Pinner, spl chunk.SplitterGen) (*DagModifier, error) { +func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) { return &DagModifier{ curNode: from.Copy(), dagserv: serv, splitter: spl, ctx: ctx, - mp: mp, }, nil } @@ -174,7 +171,7 @@ func (dm *DagModifier) Sync() error { buflen := dm.wrBuf.Len() // Grab key for unpinning after mod operation - curk, err := dm.curNode.Key() + _, err := dm.curNode.Key() if err != nil { return err } @@ -208,15 +205,6 @@ func (dm *DagModifier) Sync() error { dm.curNode = nd } - // Finalize correct pinning, and flush pinner. - // Be careful about the order, as curk might equal thisk. - dm.mp.RemovePinWithMode(curk, pin.Recursive) - dm.mp.PinWithMode(thisk, pin.Recursive) - err = dm.mp.Flush() - if err != nil { - return err - } - dm.writeStart += uint64(buflen) dm.wrBuf = nil diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index 48be0545e..6f53a90d1 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "os" "testing" @@ -17,8 +16,6 @@ import ( h "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" mdag "github.com/ipfs/go-ipfs/merkledag" - pin "github.com/ipfs/go-ipfs/pin" - gc "github.com/ipfs/go-ipfs/pin/gc" ft "github.com/ipfs/go-ipfs/unixfs" uio "github.com/ipfs/go-ipfs/unixfs/io" u "github.com/ipfs/go-ipfs/util" @@ -27,25 +24,24 @@ import ( context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) -func getMockDagServ(t testing.TB) (mdag.DAGService, pin.Pinner) { +func getMockDagServ(t testing.TB) mdag.DAGService { + dstore := ds.NewMapDatastore() + tsds := sync.MutexWrap(dstore) + bstore := blockstore.NewBlockstore(tsds) + bserv := bs.New(bstore, offline.Exchange(bstore)) + return mdag.NewDAGService(bserv) +} + +func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore) { dstore := ds.NewMapDatastore() tsds := sync.MutexWrap(dstore) bstore := blockstore.NewBlockstore(tsds) bserv := bs.New(bstore, offline.Exchange(bstore)) dserv := mdag.NewDAGService(bserv) - return dserv, pin.NewPinner(tsds, dserv) + return dserv, bstore } -func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore, pin.Pinner) { - dstore := ds.NewMapDatastore() - tsds := sync.MutexWrap(dstore) - bstore := blockstore.NewBlockstore(tsds) - bserv := bs.New(bstore, offline.Exchange(bstore)) - dserv := mdag.NewDAGService(bserv) - return dserv, bstore, pin.NewPinner(tsds, dserv) -} - -func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.Pinner) ([]byte, *mdag.Node) { +func getNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) { in := io.LimitReader(u.NewTimeSeededRand(), size) node, err := imp.BuildTrickleDagFromReader(dserv, sizeSplitterGen(500)(in)) if err != nil { @@ -118,12 +114,12 @@ func sizeSplitterGen(size int64) chunk.SplitterGen { } func TestDagModifierBasic(t *testing.T) { - dserv, pin := getMockDagServ(t) - b, n := getNode(t, dserv, 50000, pin) + dserv := getMockDagServ(t) + b, n := getNode(t, dserv, 50000) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pin, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -172,13 +168,13 @@ func TestDagModifierBasic(t *testing.T) { } func TestMultiWrite(t *testing.T) { - dserv, pins := getMockDagServ(t) - _, n := getNode(t, dserv, 0, pins) + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -225,13 +221,13 @@ func TestMultiWrite(t *testing.T) { } func TestMultiWriteAndFlush(t *testing.T) { - dserv, pins := getMockDagServ(t) - _, n := getNode(t, dserv, 0, pins) + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -273,13 +269,13 @@ func TestMultiWriteAndFlush(t *testing.T) { } func TestWriteNewFile(t *testing.T) { - dserv, pins := getMockDagServ(t) - _, n := getNode(t, dserv, 0, pins) + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -316,13 +312,13 @@ func TestWriteNewFile(t *testing.T) { } func TestMultiWriteCoal(t *testing.T) { - dserv, pins := getMockDagServ(t) - _, n := getNode(t, dserv, 0, pins) + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -362,13 +358,13 @@ func TestMultiWriteCoal(t *testing.T) { } func TestLargeWriteChunks(t *testing.T) { - dserv, pins := getMockDagServ(t) - _, n := getNode(t, dserv, 0, pins) + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -401,12 +397,12 @@ func TestLargeWriteChunks(t *testing.T) { } func TestDagTruncate(t *testing.T) { - dserv, pins := getMockDagServ(t) - b, n := getNode(t, dserv, 50000, pins) + dserv := getMockDagServ(t) + b, n := getNode(t, dserv, 50000) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -415,6 +411,14 @@ func TestDagTruncate(t *testing.T) { if err != nil { t.Fatal(err) } + size, err := dagmod.Size() + if err != nil { + t.Fatal(err) + } + + if size != 12345 { + t.Fatal("size was incorrect!") + } _, err = dagmod.Seek(0, os.SEEK_SET) if err != nil { @@ -429,15 +433,29 @@ func TestDagTruncate(t *testing.T) { if err = arrComp(out, b[:12345]); err != nil { t.Fatal(err) } + + err = dagmod.Truncate(10) + if err != nil { + t.Fatal(err) + } + + size, err = dagmod.Size() + if err != nil { + t.Fatal(err) + } + + if size != 10 { + t.Fatal("size was incorrect!") + } } func TestSparseWrite(t *testing.T) { - dserv, pins := getMockDagServ(t) - _, n := getNode(t, dserv, 0, pins) + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { t.Fatal(err) } @@ -469,110 +487,16 @@ func TestSparseWrite(t *testing.T) { } } -func basicGC(t *testing.T, bs blockstore.GCBlockstore, pins pin.Pinner) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // in case error occurs during operation - out, err := gc.GC(ctx, bs, pins) - if err != nil { - t.Fatal(err) - } - for range out { - } -} - -func TestCorrectPinning(t *testing.T) { - dserv, bstore, pins := getMockDagServAndBstore(t) - b, n := getNode(t, dserv, 50000, pins) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) - if err != nil { - t.Fatal(err) - } - - buf := make([]byte, 1024) - for i := 0; i < 100; i++ { - size, err := dagmod.Size() - if err != nil { - t.Fatal(err) - } - offset := rand.Intn(int(size)) - u.NewTimeSeededRand().Read(buf) - - if offset+len(buf) > int(size) { - b = append(b[:offset], buf...) - } else { - copy(b[offset:], buf) - } - - n, err := dagmod.WriteAt(buf, int64(offset)) - if err != nil { - t.Fatal(err) - } - if n != len(buf) { - t.Fatal("wrote incorrect number of bytes") - } - } - - fisize, err := dagmod.Size() - if err != nil { - t.Fatal(err) - } - - if int(fisize) != len(b) { - t.Fatal("reported filesize incorrect", fisize, len(b)) - } - - // Run a GC, then ensure we can still read the file correctly - basicGC(t, bstore, pins) - - nd, err := dagmod.GetNode() - if err != nil { - t.Fatal(err) - } - read, err := uio.NewDagReader(context.Background(), nd, dserv) - if err != nil { - t.Fatal(err) - } - - out, err := ioutil.ReadAll(read) - if err != nil { - t.Fatal(err) - } - - if err = arrComp(out, b); err != nil { - t.Fatal(err) - } - - rootk, err := nd.Key() - if err != nil { - t.Fatal(err) - } - - // Verify only one recursive pin - recpins := pins.RecursiveKeys() - if len(recpins) != 1 { - t.Fatal("Incorrect number of pinned entries") - } - - // verify the correct node is pinned - if recpins[0] != rootk { - t.Fatal("Incorrect node recursively pinned") - } - -} - func BenchmarkDagmodWrite(b *testing.B) { b.StopTimer() - dserv, pins := getMockDagServ(b) - _, n := getNode(b, dserv, 0, pins) + dserv := getMockDagServ(b) + _, n := getNode(b, dserv, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() wrsize := 4096 - dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512)) + dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512)) if err != nil { b.Fatal(err) }