Refactor ipnsfs into a more generic and well-tested mfs

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy 2015-09-09 15:02:46 -07:00
parent e49e610b07
commit 78a8088410
14 changed files with 1068 additions and 560 deletions

View File

@ -47,7 +47,6 @@ import (
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
mount "github.com/ipfs/go-ipfs/fuse/mount"
ipnsfs "github.com/ipfs/go-ipfs/ipnsfs"
merkledag "github.com/ipfs/go-ipfs/merkledag"
namesys "github.com/ipfs/go-ipfs/namesys"
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
@ -107,8 +106,6 @@ type IpfsNode struct {
Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher
IpnsFs *ipnsfs.Filesystem
proc goprocess.Process
ctx context.Context
@ -334,12 +331,6 @@ func (n *IpfsNode) teardown() error {
closers = append(closers, mount.Closer(n.Mounts.Ipns))
}
// Filesystem needs to be closed before network, dht, and blockservice
// so it can use them as its shutting down
if n.IpnsFs != nil {
closers = append(closers, n.IpnsFs)
}
if n.Blocks != nil {
closers = append(closers, n.Blocks)
}

View File

@ -16,7 +16,7 @@ import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
core "github.com/ipfs/go-ipfs/core"
nsfs "github.com/ipfs/go-ipfs/ipnsfs"
//mfs "github.com/ipfs/go-ipfs/mfs"
namesys "github.com/ipfs/go-ipfs/namesys"
offroute "github.com/ipfs/go-ipfs/routing/offline"
u "github.com/ipfs/go-ipfs/util"
@ -115,12 +115,10 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
node.Routing = offroute.NewOfflineRouter(node.Repo.Datastore(), node.PrivateKey)
node.Namesys = namesys.NewNameSystem(node.Routing, node.Repo.Datastore(), 0)
ipnsfs, err := nsfs.NewFilesystem(context.Background(), node.DAG, node.Namesys, node.Pinning, node.PrivateKey)
err = InitializeKeyspace(node, node.PrivateKey)
if err != nil {
t.Fatal(err)
}
node.IpnsFs = ipnsfs
}
fs, err := NewFileSystem(node, node.PrivateKey, "", "")

View File

@ -17,9 +17,10 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key"
core "github.com/ipfs/go-ipfs/core"
nsfs "github.com/ipfs/go-ipfs/ipnsfs"
dag "github.com/ipfs/go-ipfs/merkledag"
mfs "github.com/ipfs/go-ipfs/mfs"
ci "github.com/ipfs/go-ipfs/p2p/crypto"
path "github.com/ipfs/go-ipfs/path"
ft "github.com/ipfs/go-ipfs/unixfs"
)
@ -33,10 +34,15 @@ type FileSystem struct {
// NewFileSystem constructs new fs using given core.IpfsNode instance.
func NewFileSystem(ipfs *core.IpfsNode, sk ci.PrivKey, ipfspath, ipnspath string) (*FileSystem, error) {
root, err := CreateRoot(ipfs, []ci.PrivKey{sk}, ipfspath, ipnspath)
kmap := map[string]ci.PrivKey{
"local": sk,
}
root, err := CreateRoot(ipfs, kmap, ipfspath, ipnspath)
if err != nil {
return nil, err
}
return &FileSystem{Ipfs: ipfs, RootNode: root}, nil
}
@ -56,53 +62,95 @@ func (f *FileSystem) Destroy() {
// Root is the root object of the filesystem tree.
type Root struct {
Ipfs *core.IpfsNode
Keys []ci.PrivKey
Keys map[string]ci.PrivKey
// Used for symlinking into ipfs
IpfsRoot string
IpnsRoot string
LocalDirs map[string]fs.Node
Roots map[string]*nsfs.KeyRoot
Roots map[string]*keyRoot
fs *nsfs.Filesystem
LocalLink *Link
LocalLinks map[string]*Link
}
func CreateRoot(ipfs *core.IpfsNode, keys []ci.PrivKey, ipfspath, ipnspath string) (*Root, error) {
// ipnsPubFunc builds an mfs.PubFunc that publishes a dag key to ipns
// through the node's name system, signing the record with k.
func ipnsPubFunc(ipfs *core.IpfsNode, k ci.PrivKey) mfs.PubFunc {
	return func(ctx context.Context, dk key.Key) error {
		// Turn the dag key into an /ipfs path and publish it under k.
		p := path.FromKey(dk)
		return ipfs.Namesys.Publish(ctx, k, p)
	}
}
// loadRoot resolves the ipns record for the given name, wraps the
// resolved dag node in an mfs.Root (stored on rt for later publishing),
// and returns the corresponding fuse node.
//
// It returns an error if the name cannot be parsed, the record cannot
// be resolved, or the resolved node is of an unrecognized type.
func loadRoot(ctx context.Context, rt *keyRoot, ipfs *core.IpfsNode, name string) (fs.Node, error) {
	p, err := path.ParsePath("/ipns/" + name)
	if err != nil {
		log.Errorf("mkpath %s: %s", name, err)
		return nil, err
	}

	node, err := core.Resolve(ctx, ipfs, p)
	if err != nil {
		log.Errorf("looking up %s: %s", p, err)
		return nil, err
	}

	root, err := mfs.NewRoot(ctx, ipfs.DAG, node, ipnsPubFunc(ipfs, rt.k))
	if err != nil {
		return nil, err
	}

	// Remember the mfs root so the caller can close/publish it later.
	rt.root = root

	// Every case of this switch returns, so the function ends here; the
	// previous trailing panic("not reached") was unreachable (go vet)
	// and has been removed.
	switch val := root.GetValue().(type) {
	case *mfs.Directory:
		return &Directory{dir: val}, nil
	case *mfs.File:
		return &File{fi: val}, nil
	default:
		return nil, errors.New("unrecognized type")
	}
}
// keyRoot ties a private key to the mfs root published under that
// key's hash, plus the human-readable alias it is exposed as.
type keyRoot struct {
k ci.PrivKey // signing key; its public-key hash is the ipns name
alias string // local alias symlinked to the hash name (see CreateRoot)
root *mfs.Root // set by loadRoot once the ipns record is resolved
}
func CreateRoot(ipfs *core.IpfsNode, keys map[string]ci.PrivKey, ipfspath, ipnspath string) (*Root, error) {
ldirs := make(map[string]fs.Node)
roots := make(map[string]*nsfs.KeyRoot)
for _, k := range keys {
roots := make(map[string]*keyRoot)
links := make(map[string]*Link)
for alias, k := range keys {
pkh, err := k.GetPublic().Hash()
if err != nil {
return nil, err
}
name := key.Key(pkh).B58String()
root, err := ipfs.IpnsFs.GetRoot(name)
kr := &keyRoot{k: k, alias: alias}
fsn, err := loadRoot(ipfs.Context(), kr, ipfs, name)
if err != nil {
return nil, err
}
roots[name] = root
roots[name] = kr
ldirs[name] = fsn
switch val := root.GetValue().(type) {
case *nsfs.Directory:
ldirs[name] = &Directory{dir: val}
case *nsfs.File:
ldirs[name] = &File{fi: val}
default:
return nil, errors.New("unrecognized type")
// set up alias symlink
links[alias] = &Link{
Target: name,
}
}
return &Root{
fs: ipfs.IpnsFs,
Ipfs: ipfs,
IpfsRoot: ipfspath,
IpnsRoot: ipnspath,
Keys: keys,
LocalDirs: ldirs,
LocalLink: &Link{ipfs.Identity.Pretty()},
Roots: roots,
Ipfs: ipfs,
IpfsRoot: ipfspath,
IpnsRoot: ipnspath,
Keys: keys,
LocalDirs: ldirs,
LocalLinks: links,
Roots: roots,
}, nil
}
@ -121,12 +169,8 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) {
return nil, fuse.ENOENT
}
// Local symlink to the node ID keyspace
if name == "local" {
if s.LocalLink == nil {
return nil, fuse.ENOENT
}
return s.LocalLink, nil
if lnk, ok := s.LocalLinks[name]; ok {
return lnk, nil
}
nd, ok := s.LocalDirs[name]
@ -152,15 +196,15 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) {
if segments[0] == "ipfs" {
p := strings.Join(resolved.Segments()[1:], "/")
return &Link{s.IpfsRoot + "/" + p}, nil
} else {
log.Error("Invalid path.Path: ", resolved)
return nil, errors.New("invalid path from ipns record")
}
log.Error("Invalid path.Path: ", resolved)
return nil, errors.New("invalid path from ipns record")
}
func (r *Root) Close() error {
for _, kr := range r.Roots {
err := kr.Publish(r.Ipfs.Context())
for _, mr := range r.Roots {
err := mr.root.Close()
if err != nil {
return err
}
@ -181,13 +225,9 @@ func (r *Root) Forget() {
// as well as a symlink to the peerID key
func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
log.Debug("Root ReadDirAll")
listing := []fuse.Dirent{
{
Name: "local",
Type: fuse.DT_Link,
},
}
for _, k := range r.Keys {
var listing []fuse.Dirent
for alias, k := range r.Keys {
pub := k.GetPublic()
hash, err := pub.Hash()
if err != nil {
@ -197,21 +237,25 @@ func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
Name: key.Key(hash).Pretty(),
Type: fuse.DT_Dir,
}
listing = append(listing, ent)
link := fuse.Dirent{
Name: alias,
Type: fuse.DT_Link,
}
listing = append(listing, ent, link)
}
return listing, nil
}
// Directory is wrapper over an ipnsfs directory to satisfy the fuse fs interface
// Directory is wrapper over an mfs directory to satisfy the fuse fs interface
type Directory struct {
dir *nsfs.Directory
dir *mfs.Directory
fs.NodeRef
}
// File is wrapper over an ipnsfs file to satisfy the fuse fs interface
// File is wrapper over an mfs file to satisfy the fuse fs interface
type File struct {
fi *nsfs.File
fi *mfs.File
fs.NodeRef
}
@ -249,9 +293,9 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) {
}
switch child := child.(type) {
case *nsfs.Directory:
case *mfs.Directory:
return &Directory{dir: child}, nil
case *nsfs.File:
case *mfs.File:
return &File{fi: child}, nil
default:
// NB: if this happens, we do not want to continue, unpredictable behaviour
@ -263,19 +307,17 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) {
// ReadDirAll reads the link structure as directory entries
func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
var entries []fuse.Dirent
for _, name := range dir.dir.List() {
dirent := fuse.Dirent{Name: name}
listing, err := dir.dir.List()
if err != nil {
return nil, err
}
for _, entry := range listing {
dirent := fuse.Dirent{Name: entry.Name}
// TODO: make dir.dir.List() return dirinfos
child, err := dir.dir.Child(name)
if err != nil {
return nil, err
}
switch child.Type() {
case nsfs.TDir:
switch mfs.NodeType(entry.Type) {
case mfs.TDir:
dirent.Type = fuse.DT_Dir
case nsfs.TFile:
case mfs.TFile:
dirent.Type = fuse.DT_File
}
@ -419,7 +461,7 @@ func (dir *Directory) Create(ctx context.Context, req *fuse.CreateRequest, resp
return nil, nil, err
}
fi, ok := child.(*nsfs.File)
fi, ok := child.(*mfs.File)
if !ok {
return nil, nil, errors.New("child creation failed")
}

View File

@ -6,7 +6,6 @@ package ipns
import (
core "github.com/ipfs/go-ipfs/core"
mount "github.com/ipfs/go-ipfs/fuse/mount"
ipnsfs "github.com/ipfs/go-ipfs/ipnsfs"
)
// Mount mounts ipns at a given location, and returns a mount.Mount instance.
@ -18,14 +17,6 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) {
allow_other := cfg.Mounts.FuseAllowOther
if ipfs.IpnsFs == nil {
fs, err := ipnsfs.NewFilesystem(ipfs.Context(), ipfs.DAG, ipfs.Namesys, ipfs.Pinning, ipfs.PrivateKey)
if err != nil {
return nil, err
}
ipfs.IpnsFs = fs
}
fsys, err := NewFileSystem(ipfs, ipfs.PrivateKey, ipfsmp, ipnsmp)
if err != nil {
return nil, err

View File

@ -1,304 +0,0 @@
// package ipnsfs implements an in memory model of a mutable ipns filesystem,
// to be used by the fuse filesystem.
//
// It consists of four main structs:
// 1) The Filesystem
// The filesystem serves as a container and entry point for the ipns filesystem
// 2) KeyRoots
// KeyRoots represent the root of the keyspace controlled by a given keypair
// 3) Directories
// 4) Files
package ipnsfs
import (
"errors"
"os"
"sync"
"time"
key "github.com/ipfs/go-ipfs/blocks/key"
dag "github.com/ipfs/go-ipfs/merkledag"
namesys "github.com/ipfs/go-ipfs/namesys"
ci "github.com/ipfs/go-ipfs/p2p/crypto"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)
var log = logging.Logger("ipnsfs")
var ErrIsDirectory = errors.New("error: is a directory")
// Filesystem is the writeable fuse filesystem structure
type Filesystem struct {
ctx context.Context
dserv dag.DAGService
nsys namesys.NameSystem
resolver *path.Resolver
pins pin.Pinner
roots map[string]*KeyRoot
}
// NewFilesystem instantiates an ipns filesystem using the given parameters and locally owned keys
func NewFilesystem(ctx context.Context, ds dag.DAGService, nsys namesys.NameSystem, pins pin.Pinner, keys ...ci.PrivKey) (*Filesystem, error) {
roots := make(map[string]*KeyRoot)
fs := &Filesystem{
ctx: ctx,
roots: roots,
nsys: nsys,
dserv: ds,
pins: pins,
resolver: &path.Resolver{DAG: ds},
}
for _, k := range keys {
pkh, err := k.GetPublic().Hash()
if err != nil {
return nil, err
}
root, err := fs.newKeyRoot(ctx, k)
if err != nil {
return nil, err
}
roots[key.Key(pkh).Pretty()] = root
}
return fs, nil
}
// Close publishes every key root concurrently and waits for all of the
// publishes to finish. Individual publish failures are logged and then
// ignored; Close always returns nil.
func (fs *Filesystem) Close() error {
	var wg sync.WaitGroup
	for _, kr := range fs.roots {
		wg.Add(1)
		go func(kr *KeyRoot) {
			defer wg.Done()
			if err := kr.Publish(fs.ctx); err != nil {
				log.Info(err)
			}
		}(kr)
	}
	wg.Wait()
	return nil
}
// GetRoot looks up the KeyRoot registered under the given name,
// returning os.ErrNotExist when no such root is known.
func (fs *Filesystem) GetRoot(name string) (*KeyRoot, error) {
	if r, ok := fs.roots[name]; ok {
		return r, nil
	}
	return nil, os.ErrNotExist
}
type childCloser interface {
closeChild(string, *dag.Node) error
}
type NodeType int
const (
TFile NodeType = iota
TDir
)
// FSNode represents any node (directory, root, or file) in the ipns filesystem
type FSNode interface {
GetNode() (*dag.Node, error)
Type() NodeType
Lock()
Unlock()
}
// KeyRoot represents the root of a filesystem tree pointed to by a given keypair
type KeyRoot struct {
key ci.PrivKey
name string
// node is the merkledag node pointed to by this keypair
node *dag.Node
// A pointer to the filesystem to access components
fs *Filesystem
// val represents the node pointed to by this key. It can either be a File or a Directory
val FSNode
repub *Republisher
}
// newKeyRoot creates a new KeyRoot for the given key, and starts up a republisher routine
// for it
//
// It resolves the key's current ipns record (initializing the keyspace
// first if no record exists yet), fetches the dag node the record
// points to, and wraps that node as either a Directory or a File.
func (fs *Filesystem) newKeyRoot(parent context.Context, k ci.PrivKey) (*KeyRoot, error) {
hash, err := k.GetPublic().Hash()
if err != nil {
return nil, err
}
// The ipns name for a key is derived from its public key hash.
name := "/ipns/" + key.Key(hash).String()
root := new(KeyRoot)
root.key = k
root.fs = fs
root.name = name
ctx, cancel := context.WithCancel(parent)
defer cancel()
pointsTo, err := fs.nsys.Resolve(ctx, name)
if err != nil {
// No resolvable record for this key yet: publish an initial
// keyspace, then resolve again.
err = namesys.InitializeKeyspace(ctx, fs.dserv, fs.nsys, fs.pins, k)
if err != nil {
return nil, err
}
pointsTo, err = fs.nsys.Resolve(ctx, name)
if err != nil {
return nil, err
}
}
mnode, err := fs.resolver.ResolvePath(ctx, pointsTo)
if err != nil {
log.Errorf("Failed to retrieve value '%s' for ipns entry: %s\n", pointsTo, err)
return nil, err
}
root.node = mnode
// Start the republisher: changes are debounced at 300ms, with a 3s
// cap on how long publishing can be deferred. It runs until the
// parent context is cancelled.
root.repub = NewRepublisher(root, time.Millisecond*300, time.Second*3)
go root.repub.Run(parent)
pbn, err := ft.FromBytes(mnode.Data)
if err != nil {
log.Error("IPNS pointer was not unixfs node")
return nil, err
}
// Wrap the resolved node in the matching FSNode implementation.
switch pbn.GetType() {
case ft.TDirectory:
root.val = NewDirectory(ctx, pointsTo.String(), mnode, root, fs)
case ft.TFile, ft.TMetadata, ft.TRaw:
fi, err := NewFile(pointsTo.String(), mnode, root, fs)
if err != nil {
return nil, err
}
root.val = fi
default:
// Unknown unixfs node type; not yet implemented.
panic("unrecognized! (NYI)")
}
return root, nil
}
func (kr *KeyRoot) GetValue() FSNode {
return kr.val
}
// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *KeyRoot) closeChild(name string, nd *dag.Node) error {
kr.repub.Touch()
return nil
}
// Publish publishes the ipns entry associated with this key.
//
// It adds the root's current dag node to the dag service and then
// publishes the resulting path under kr.key via the name system.
func (kr *KeyRoot) Publish(ctx context.Context) error {
	child, ok := kr.val.(FSNode)
	if !ok {
		return errors.New("child of key root not valid type")
	}

	// NOTE(review): GetNode is called before child.Lock() is taken, so
	// the node could in principle change before the Add below — confirm
	// whether GetNode performs its own locking.
	nd, err := child.GetNode()
	if err != nil {
		return err
	}

	// Holding this lock so our child doesnt change out from under us
	child.Lock()
	k, err := kr.fs.dserv.Add(nd)
	if err != nil {
		child.Unlock()
		return err
	}
	child.Unlock()
	// Dont want to hold the lock while we publish
	// otherwise we are holding the lock through a costly
	// network operation

	kp := path.FromKey(k)
	ev := &logging.Metadata{"name": kr.name, "key": kp}
	defer log.EventBegin(ctx, "ipnsfsPublishing", ev).Done()
	// BUG FIX: log.Info does not interpret printf verbs; Infof is
	// required for the name and path to actually be interpolated.
	log.Infof("ipnsfs publishing %s -> %s", kr.name, kp)

	return kr.fs.nsys.Publish(ctx, kr.key, kp)
}
// Republisher manages when to publish the ipns entry associated with a given key
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
Publish chan struct{}
root *KeyRoot
}
// NewRepublisher creates a new Republisher object to republish the given keyroot
// using the given short and long time intervals
func NewRepublisher(root *KeyRoot, tshort, tlong time.Duration) *Republisher {
	rp := &Republisher{
		root:         root,
		Publish:      make(chan struct{}, 1),
		TimeoutShort: tshort,
		TimeoutLong:  tlong,
	}
	return rp
}
// Touch signals that an update has occurred since the last publish.
// Multiple consecutive touches may extend the time period before
// the next Publish occurs in order to more efficiently batch updates
//
// The send is non-blocking: Publish has a buffer of one, so if a
// signal is already pending the new touch coalesces into it.
func (np *Republisher) Touch() {
select {
case np.Publish <- struct{}{}:
default:
}
}
// Run is the main republisher loop.
//
// Each Touch arms two timers: a short one that is reset by every
// subsequent Touch (debouncing bursts of updates) and a long one that
// caps how far a publish can be deferred under a steady stream of
// updates. When either fires, the root is published. Run exits when
// ctx is cancelled.
func (np *Republisher) Run(ctx context.Context) {
	for {
		select {
		case <-np.Publish:
			quick := time.After(np.TimeoutShort)
			longer := time.After(np.TimeoutLong)

		wait:
			select {
			case <-ctx.Done():
				return
			case <-np.Publish:
				// Another update arrived: restart the short timer but
				// leave the long timer running.
				quick = time.After(np.TimeoutShort)
				goto wait
			case <-quick:
			case <-longer:
			}

			log.Info("Publishing Changes!")
			err := np.root.Publish(ctx)
			if err != nil {
				// BUG FIX: log.Error does not interpret printf verbs;
				// Errorf is required to interpolate err.
				log.Errorf("republishRoot error: %s", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

View File

@ -1,4 +1,4 @@
package ipnsfs
package mfs
import (
"errors"
@ -15,9 +15,10 @@ import (
var ErrNotYetImplemented = errors.New("not yet implemented")
var ErrInvalidChild = errors.New("invalid child node")
var ErrDirExists = errors.New("directory already has entry by that name")
type Directory struct {
fs *Filesystem
dserv dag.DAGService
parent childCloser
childDirs map[string]*Directory
@ -30,10 +31,10 @@ type Directory struct {
name string
}
func NewDirectory(ctx context.Context, name string, node *dag.Node, parent childCloser, fs *Filesystem) *Directory {
func NewDirectory(ctx context.Context, name string, node *dag.Node, parent childCloser, dserv dag.DAGService) *Directory {
return &Directory{
dserv: dserv,
ctx: ctx,
fs: fs,
name: name,
node: node,
parent: parent,
@ -45,7 +46,7 @@ func NewDirectory(ctx context.Context, name string, node *dag.Node, parent child
// closeChild updates the child by the given name to the dag node 'nd'
// and changes its own dag node, then propogates the changes upward
func (d *Directory) closeChild(name string, nd *dag.Node) error {
_, err := d.fs.dserv.Add(nd)
_, err := d.dserv.Add(nd)
if err != nil {
return err
}
@ -89,7 +90,7 @@ func (d *Directory) childFile(name string) (*File, error) {
case ufspb.Data_Directory:
return nil, ErrIsDirectory
case ufspb.Data_File:
nfi, err := NewFile(name, nd, d, d.fs)
nfi, err := NewFile(name, nd, d, d.dserv)
if err != nil {
return nil, err
}
@ -122,7 +123,7 @@ func (d *Directory) childDir(name string) (*Directory, error) {
switch i.GetType() {
case ufspb.Data_Directory:
ndir := NewDirectory(d.ctx, name, nd, d, d.fs)
ndir := NewDirectory(d.ctx, name, nd, d, d.dserv)
d.childDirs[name] = ndir
return ndir, nil
case ufspb.Data_File:
@ -139,7 +140,7 @@ func (d *Directory) childDir(name string) (*Directory, error) {
func (d *Directory) childFromDag(name string) (*dag.Node, error) {
for _, lnk := range d.node.Links {
if lnk.Name == name {
return lnk.GetNode(d.ctx, d.fs.dserv)
return lnk.GetNode(d.ctx, d.dserv)
}
}
@ -156,6 +157,7 @@ func (d *Directory) Child(name string) (FSNode, error) {
// childUnsync returns the child under this directory by the given name
// without locking, useful for operations which already hold a lock
func (d *Directory) childUnsync(name string) (FSNode, error) {
dir, err := d.childDir(name)
if err == nil {
return dir, nil
@ -168,15 +170,51 @@ func (d *Directory) childUnsync(name string) (FSNode, error) {
return nil, os.ErrNotExist
}
func (d *Directory) List() []string {
type NodeListing struct {
Name string
Type int
Size int64
Hash string
}
func (d *Directory) List() ([]NodeListing, error) {
d.lock.Lock()
defer d.lock.Unlock()
var out []string
for _, lnk := range d.node.Links {
out = append(out, lnk.Name)
var out []NodeListing
for _, l := range d.node.Links {
child := NodeListing{}
child.Name = l.Name
c, err := d.childUnsync(l.Name)
if err != nil {
return nil, err
}
child.Type = int(c.Type())
if c, ok := c.(*File); ok {
size, err := c.Size()
if err != nil {
return nil, err
}
child.Size = size
}
nd, err := c.GetNode()
if err != nil {
return nil, err
}
k, err := nd.Key()
if err != nil {
return nil, err
}
child.Hash = k.B58String()
out = append(out, child)
}
return out
return out, nil
}
func (d *Directory) Mkdir(name string) (*Directory, error) {
@ -193,6 +231,12 @@ func (d *Directory) Mkdir(name string) (*Directory, error) {
}
ndir := &dag.Node{Data: ft.FolderPBData()}
_, err = d.dserv.Add(ndir)
if err != nil {
return nil, err
}
err = d.node.AddNodeLinkClean(name, ndir)
if err != nil {
return nil, err
@ -225,6 +269,7 @@ func (d *Directory) Unlink(name string) error {
func (d *Directory) AddChild(name string, nd *dag.Node) error {
d.Lock()
defer d.Unlock()
pbn, err := ft.FromBytes(nd.Data)
if err != nil {
return err
@ -232,7 +277,7 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error {
_, err = d.childUnsync(name)
if err == nil {
return errors.New("directory already has entry by that name")
return ErrDirExists
}
err = d.node.AddNodeLinkClean(name, nd)
@ -242,9 +287,9 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error {
switch pbn.GetType() {
case ft.TDirectory:
d.childDirs[name] = NewDirectory(d.ctx, name, nd, d, d.fs)
d.childDirs[name] = NewDirectory(d.ctx, name, nd, d, d.dserv)
case ft.TFile, ft.TMetadata, ft.TRaw:
nfi, err := NewFile(name, nd, d, d.fs)
nfi, err := NewFile(name, nd, d, d.dserv)
if err != nil {
return err
}

View File

@ -1,4 +1,4 @@
package ipnsfs
package mfs
import (
"sync"
@ -12,7 +12,6 @@ import (
type File struct {
parent childCloser
fs *Filesystem
name string
hasChanges bool
@ -22,14 +21,13 @@ type File struct {
}
// NewFile returns a NewFile object with the given parameters
func NewFile(name string, node *dag.Node, parent childCloser, fs *Filesystem) (*File, error) {
dmod, err := mod.NewDagModifier(context.Background(), node, fs.dserv, fs.pins, chunk.DefaultSplitter)
func NewFile(name string, node *dag.Node, parent childCloser, dserv dag.DAGService) (*File, error) {
dmod, err := mod.NewDagModifier(context.Background(), node, dserv, chunk.DefaultSplitter)
if err != nil {
return nil, err
}
return &File{
fs: fs,
parent: parent,
name: name,
mod: dmod,

476
mfs/mfs_test.go Normal file
View File

@ -0,0 +1,476 @@
package mfs
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strings"
"testing"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
importer "github.com/ipfs/go-ipfs/importer"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
u "github.com/ipfs/go-ipfs/util"
)
// getDagserv builds an in-memory, offline DAGService for use in tests.
func getDagserv(t *testing.T) dag.DAGService {
	store := dssync.MutexWrap(ds.NewMapDatastore())
	blocks := bstore.NewBlockstore(store)
	return dag.NewDAGService(bserv.New(blocks, offline.Exchange(blocks)))
}
// getRandFile imports `size` bytes of random data into ds and returns
// the resulting dag node, failing the test on any import error.
func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.Node {
	data := io.LimitReader(u.NewTimeSeededRand(), size)
	nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(data))
	if err != nil {
		t.Fatal(err)
	}
	return nd
}
// mkdirP creates every directory along the slash-separated path beneath
// root, reusing segments that already exist, and returns the deepest
// directory. The test fails if a segment is occupied by a file or any
// mkdir/lookup errors out.
func mkdirP(t *testing.T, root *Directory, path string) *Directory {
dirs := strings.Split(path, "/")
cur := root
for _, d := range dirs {
n, err := cur.Mkdir(d)
if err != nil && err != os.ErrExist {
t.Fatal(err)
}
if err == os.ErrExist {
// Entry already present: look it up and reuse it if it is a directory.
fsn, err := cur.Child(d)
if err != nil {
t.Fatal(err)
}
switch fsn := fsn.(type) {
case *Directory:
n = fsn
case *File:
t.Fatal("tried to make a directory where a file already exists")
}
}
cur = n
}
return cur
}
func assertDirAtPath(root *Directory, path string, children []string) error {
fsn, err := DirLookup(root, path)
if err != nil {
return err
}
dir, ok := fsn.(*Directory)
if !ok {
return fmt.Errorf("%s was not a directory", path)
}
listing, err := dir.List()
if err != nil {
return err
}
var names []string
for _, d := range listing {
names = append(names, d.Name)
}
sort.Strings(children)
sort.Strings(names)
if !compStrArrs(children, names) {
return errors.New("directories children did not match!")
}
return nil
}
// compStrArrs reports whether a and b hold the same strings in the
// same order; slices of different lengths are never equal.
func compStrArrs(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i, s := range a {
		if b[i] != s {
			return false
		}
	}
	return true
}
func assertFileAtPath(ds dag.DAGService, root *Directory, exp *dag.Node, path string) error {
parts := strings.Split(path, "/")
cur := root
for i, d := range parts[:len(parts)-1] {
next, err := cur.Child(d)
if err != nil {
return fmt.Errorf("looking for %s failed: %s", path, err)
}
nextDir, ok := next.(*Directory)
if !ok {
return fmt.Errorf("%s points to a non-directory", parts[:i+1])
}
cur = nextDir
}
last := parts[len(parts)-1]
finaln, err := cur.Child(last)
if err != nil {
return err
}
file, ok := finaln.(*File)
if !ok {
return fmt.Errorf("%s was not a file!", path)
}
out, err := ioutil.ReadAll(file)
if err != nil {
return err
}
expbytes, err := catNode(ds, exp)
if err != nil {
return err
}
if !bytes.Equal(out, expbytes) {
return fmt.Errorf("Incorrect data at path!")
}
return nil
}
// catNode reads back the full contents of the unixfs file rooted at nd.
func catNode(ds dag.DAGService, nd *dag.Node) ([]byte, error) {
	reader, err := uio.NewDagReader(context.TODO(), nd, ds)
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	return ioutil.ReadAll(reader)
}
func setupRoot(ctx context.Context, t *testing.T) (dag.DAGService, *Root) {
ds := getDagserv(t)
root := &dag.Node{Data: ft.FolderPBData()}
rt, err := NewRoot(ctx, ds, root, func(ctx context.Context, k key.Key) error {
fmt.Println("PUBLISHED: ", k)
return nil
})
if err != nil {
t.Fatal(err)
}
return ds, rt
}
func TestBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ds, rt := setupRoot(ctx, t)
rootdir := rt.GetValue().(*Directory)
// test making a basic dir
_, err := rootdir.Mkdir("a")
if err != nil {
t.Fatal(err)
}
path := "a/b/c/d/e/f/g"
d := mkdirP(t, rootdir, path)
fi := getRandFile(t, ds, 1000)
// test inserting that file
err = d.AddChild("afile", fi)
if err != nil {
t.Fatal(err)
}
err = assertFileAtPath(ds, rootdir, fi, "a/b/c/d/e/f/g/afile")
if err != nil {
t.Fatal(err)
}
}
func TestMkdir(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, rt := setupRoot(ctx, t)
rootdir := rt.GetValue().(*Directory)
dirsToMake := []string{"a", "B", "foo", "bar", "cats", "fish"}
sort.Strings(dirsToMake) // sort for easy comparing later
for _, d := range dirsToMake {
_, err := rootdir.Mkdir(d)
if err != nil {
t.Fatal(err)
}
}
err := assertDirAtPath(rootdir, "/", dirsToMake)
if err != nil {
t.Fatal(err)
}
for _, d := range dirsToMake {
mkdirP(t, rootdir, "a/"+d)
}
err = assertDirAtPath(rootdir, "/a", dirsToMake)
if err != nil {
t.Fatal(err)
}
// mkdir over existing dir should fail
_, err = rootdir.Mkdir("a")
if err == nil {
t.Fatal("should have failed!")
}
}
func TestDirectoryLoadFromDag(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ds, rt := setupRoot(ctx, t)
rootdir := rt.GetValue().(*Directory)
nd := getRandFile(t, ds, 1000)
_, err := ds.Add(nd)
if err != nil {
t.Fatal(err)
}
fihash, err := nd.Multihash()
if err != nil {
t.Fatal(err)
}
dir := &dag.Node{Data: ft.FolderPBData()}
_, err = ds.Add(dir)
if err != nil {
t.Fatal(err)
}
dirhash, err := dir.Multihash()
if err != nil {
t.Fatal(err)
}
top := &dag.Node{
Data: ft.FolderPBData(),
Links: []*dag.Link{
&dag.Link{
Name: "a",
Hash: fihash,
},
&dag.Link{
Name: "b",
Hash: dirhash,
},
},
}
err = rootdir.AddChild("foo", top)
if err != nil {
t.Fatal(err)
}
// get this dir
topi, err := rootdir.Child("foo")
if err != nil {
t.Fatal(err)
}
topd := topi.(*Directory)
// mkdir over existing but unloaded child file should fail
_, err = topd.Mkdir("a")
if err == nil {
t.Fatal("expected to fail!")
}
// mkdir over existing but unloaded child dir should fail
_, err = topd.Mkdir("b")
if err == nil {
t.Fatal("expected to fail!")
}
// adding a child over an existing path fails
err = topd.AddChild("b", nd)
if err == nil {
t.Fatal("expected to fail!")
}
err = assertFileAtPath(ds, rootdir, nd, "foo/a")
if err != nil {
t.Fatal(err)
}
err = assertDirAtPath(rootdir, "foo/b", nil)
if err != nil {
t.Fatal(err)
}
err = rootdir.Unlink("foo")
if err != nil {
t.Fatal(err)
}
err = assertDirAtPath(rootdir, "", nil)
if err != nil {
t.Fatal(err)
}
}
// TestMfsFile exercises the basic mfs File operations end to end:
// type check, size, write, sync, seek, read, truncate, extension via
// WriteAt, GetNode, and close.
func TestMfsFile(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ds, rt := setupRoot(ctx, t)

	rootdir := rt.GetValue().(*Directory)

	fisize := 1000
	// CONSISTENCY FIX: derive the file size from fisize instead of
	// duplicating the literal 1000.
	nd := getRandFile(t, ds, int64(fisize))

	err := rootdir.AddChild("file", nd)
	if err != nil {
		t.Fatal(err)
	}

	fsn, err := rootdir.Child("file")
	if err != nil {
		t.Fatal(err)
	}

	fi := fsn.(*File)

	if fi.Type() != TFile {
		t.Fatal("some is seriously wrong here")
	}

	// assert size is as expected
	size, err := fi.Size()
	if err != nil {
		// BUG FIX: this error was previously ignored, which would make
		// the size comparison below meaningless on failure.
		t.Fatal(err)
	}
	if size != int64(fisize) {
		t.Fatal("size isnt correct")
	}

	// write to beginning of file
	b := []byte("THIS IS A TEST")
	n, err := fi.Write(b)
	if err != nil {
		t.Fatal(err)
	}

	if n != len(b) {
		t.Fatal("didnt write correct number of bytes")
	}

	// sync file
	err = fi.Sync()
	if err != nil {
		t.Fatal(err)
	}

	// make sure size hasnt changed
	size, err = fi.Size()
	if err != nil {
		// BUG FIX: error previously ignored here too.
		t.Fatal(err)
	}
	if size != int64(fisize) {
		t.Fatal("size isnt correct")
	}

	// seek back to beginning
	ns, err := fi.Seek(0, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}

	if ns != 0 {
		t.Fatal("didnt seek to beginning")
	}

	// read back bytes we wrote
	buf := make([]byte, len(b))
	n, err = fi.Read(buf)
	if err != nil {
		t.Fatal(err)
	}
	if n != len(buf) {
		t.Fatal("didnt read enough")
	}

	if !bytes.Equal(buf, b) {
		t.Fatal("data read was different than data written")
	}

	// truncate file to ten bytes
	err = fi.Truncate(10)
	if err != nil {
		t.Fatal(err)
	}

	size, err = fi.Size()
	if err != nil {
		t.Fatal(err)
	}

	if size != 10 {
		t.Fatal("size was incorrect: ", size)
	}

	// 'writeAt' to extend it
	data := []byte("this is a test foo foo foo")
	nwa, err := fi.WriteAt(data, 5)
	if err != nil {
		t.Fatal(err)
	}
	if nwa != len(data) {
		t.Fatal(err)
	}

	// assert size once more
	size, err = fi.Size()
	if err != nil {
		t.Fatal(err)
	}

	if size != int64(5+len(data)) {
		t.Fatal("size was incorrect")
	}

	// make sure we can get node. TODO: verify it later
	_, err = fi.GetNode()
	if err != nil {
		t.Fatal(err)
	}

	// close it out!
	err = fi.Close()
	if err != nil {
		t.Fatal(err)
	}
}

43
mfs/ops.go Normal file
View File

@ -0,0 +1,43 @@
package mfs
import (
"errors"
"fmt"
"strings"
)
// rootLookup resolves path relative to the value held by r, which must
// be a directory.
func rootLookup(r *Root, path string) (FSNode, error) {
	val := r.GetValue()
	dir, ok := val.(*Directory)
	if !ok {
		return nil, errors.New("root was not a directory")
	}
	return DirLookup(dir, path)
}
// DirLookup will look up a file or directory at the given path
// under the directory 'd'
func DirLookup(d *Directory, path string) (FSNode, error) {
	segs := strings.Split(strings.Trim(path, "/"), "/")

	// An empty (or all-slash) path refers to d itself.
	if len(segs) == 1 && segs[0] == "" {
		return d, nil
	}

	cur := FSNode(d)
	for i, seg := range segs {
		// Every intermediate component must be a directory.
		dir, ok := cur.(*Directory)
		if !ok {
			return nil, fmt.Errorf("cannot access %s: Not a directory", strings.Join(segs[:i+1], "/"))
		}

		next, err := dir.Child(seg)
		if err != nil {
			return nil, err
		}
		cur = next
	}
	return cur, nil
}

78
mfs/repub_test.go Normal file
View File

@ -0,0 +1,78 @@
package mfs
import (
"testing"
"time"
key "github.com/ipfs/go-ipfs/blocks/key"
ci "github.com/ipfs/go-ipfs/util/testutil/ci"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// TestRepublisher checks the republisher's debounce behavior: a single
// update publishes after the short timeout, a constant stream of
// updates is deferred until the long timeout, and Close forces a final
// publish.
func TestRepublisher(t *testing.T) {
	if ci.IsRunning() {
		t.Skip("dont run timing tests in CI")
	}

	ctx := context.TODO()

	pub := make(chan struct{})

	pf := func(ctx context.Context, k key.Key) error {
		pub <- struct{}{}
		return nil
	}

	tshort := time.Millisecond * 50
	tlong := time.Second / 2

	rp := NewRepublisher(ctx, pf, tshort, tlong)
	go rp.Run()

	rp.Update("test")

	// should hit short timeout
	select {
	case <-time.After(tshort * 2):
		t.Fatal("publish didnt happen in time")
	case <-pub:
	}

	cctx, cancel := context.WithCancel(context.Background())

	// Keep updating faster than the short timeout so the publish is
	// repeatedly deferred.
	go func() {
		for {
			rp.Update("a")
			time.Sleep(time.Millisecond * 10)
			select {
			case <-cctx.Done():
				return
			default:
			}
		}
	}()

	select {
	case <-pub:
		t.Fatal("shouldnt have received publish yet!")
	case <-time.After((tlong * 9) / 10):
	}

	select {
	case <-pub:
	case <-time.After(tlong / 2):
		t.Fatal("waited too long for pub!")
	}

	cancel()

	go func() {
		err := rp.Close()
		if err != nil {
			// BUG FIX: t.Fatal (FailNow) must not be called from any
			// goroutine other than the one running the test function;
			// t.Error is safe to call concurrently.
			t.Error(err)
		}
	}()

	// final pub from closing
	<-pub
}

237
mfs/system.go Normal file
View File

@ -0,0 +1,237 @@
// package mfs implements an in memory model of a mutable ipfs filesystem.
//
// It consists of four main structs:
// 1) The Filesystem
// The filesystem serves as a container and entry point for various mfs filesystems
// 2) Root
// Root represents an individual filesystem mounted within the mfs system as a whole
// 3) Directories
// 4) Files
package mfs
import (
"errors"
"sync"
"time"
key "github.com/ipfs/go-ipfs/blocks/key"
dag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)
// ErrNotExist signals that a named root filesystem has not been mounted
// ("no such rootfs").
var ErrNotExist = errors.New("no such rootfs")
// log is the package-level logger for mfs.
var log = logging.Logger("mfs")
// ErrIsDirectory signals that a file-only operation was attempted on a
// directory node.
var ErrIsDirectory = errors.New("error: is a directory")
// childCloser is the interface through which a child hands its updated dag
// node back to its parent for persistence. Root implements it via
// (*Root).closeChild; the string argument is the child's name.
type childCloser interface {
	closeChild(string, *dag.Node) error
}
// NodeType distinguishes the kinds of FSNode (see FSNode.Type).
type NodeType int
const (
	// TFile marks an FSNode backed by a file.
	TFile NodeType = iota
	// TDir marks an FSNode backed by a directory.
	TDir
)
// FSNode represents any node (directory, root, or file) in the mfs filesystem
type FSNode interface {
	// GetNode returns the merkledag node backing this FSNode.
	GetNode() (*dag.Node, error)
	// Type reports whether this node is a file (TFile) or a directory (TDir).
	Type() NodeType
	// Lock and Unlock guard the node's mutable state.
	Lock()
	Unlock()
}
// Root represents the root of a filesystem tree pointed to by a given keypair
type Root struct {
	// node is the merkledag node pointed to by this keypair
	node *dag.Node
	// val represents the node pointed to by this key. It can either be a File or a Directory
	val FSNode
	// repub batches updates to this root and publishes the resulting keys.
	repub *Republisher
	// dserv is the DAGService used to persist updated nodes.
	dserv dag.DAGService
	// Type is not used within this file; NOTE(review): confirm its
	// meaning against callers before documenting further.
	Type string
}
// PubFunc is the callback a Republisher invokes to publish a key.
type PubFunc func(context.Context, key.Key) error
// NewRoot creates a new Root for the given node, and starts up a republisher
// routine for it once construction has succeeded.
func NewRoot(parent context.Context, ds dag.DAGService, node *dag.Node, pf PubFunc) (*Root, error) {
	ndk, err := node.Key()
	if err != nil {
		return nil, err
	}

	pbn, err := ft.FromBytes(node.Data)
	if err != nil {
		log.Error("IPNS pointer was not unixfs node")
		return nil, err
	}

	root := &Root{
		node:  node,
		repub: NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3),
		dserv: ds,
	}
	root.repub.setVal(ndk)

	switch pbn.GetType() {
	case ft.TDirectory:
		root.val = NewDirectory(parent, ndk.String(), node, root, ds)
	case ft.TFile, ft.TMetadata, ft.TRaw:
		fi, err := NewFile(ndk.String(), node, root, ds)
		if err != nil {
			// release the republisher's derived context before bailing
			root.repub.cancel()
			return nil, err
		}
		root.val = fi
	default:
		// don't panic in library code; surface the problem to the caller
		root.repub.cancel()
		return nil, errors.New("unrecognized node type in root")
	}

	// Start the republisher only after all error paths are past, so a
	// failed construction cannot leak its goroutine.
	go root.repub.Run()

	return root, nil
}
// GetValue returns the FSNode (File or Directory) at the root of this tree.
func (kr *Root) GetValue() FSNode {
	return kr.val
}
// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *Root) closeChild(name string, nd *dag.Node) error {
	// Persist the updated node; its key becomes the next value to publish.
	// The name argument is unused here: the whole root is republished.
	k, err := kr.dserv.Add(nd)
	if err != nil {
		return err
	}
	kr.repub.Update(k)
	return nil
}
// Close publishes any outstanding changes one final time and shuts down
// this root's republisher.
func (kr *Root) Close() error {
	return kr.repub.Close()
}
// Republisher manages when to publish the ipns entry associated with a given key
type Republisher struct {
	// TimeoutLong bounds the total delay between an update and its publish.
	TimeoutLong time.Duration
	// TimeoutShort is the quiet period after the most recent update before
	// a publish fires (each new update resets it; see Run).
	TimeoutShort time.Duration
	// Publish is signaled (non-blocking, capacity 1) whenever Update is called.
	Publish chan struct{}
	// pubfunc performs the actual publish of a key.
	pubfunc PubFunc
	// pubnowch requests an immediate publish (see pubNow).
	pubnowch chan struct{}
	ctx context.Context
	cancel func()
	// lk guards val and lastpub.
	lk sync.Mutex
	// val is the most recently updated key, pending publish.
	val key.Key
	// lastpub is the last key successfully published.
	lastpub key.Key
}
// getVal returns the currently pending key under lock.
func (rp *Republisher) getVal() key.Key {
	rp.lk.Lock()
	defer rp.lk.Unlock()
	return rp.val
}
// NewRepublisher creates a new Republisher that publishes keys via pf,
// batching updates using the given short and long time intervals.
// The returned Republisher is driven by its Run method; its lifetime is
// bounded by ctx.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
	ctx, cancel := context.WithCancel(ctx)
	return &Republisher{
		TimeoutShort: tshort,
		TimeoutLong:  tlong,
		Publish:      make(chan struct{}, 1),
		pubfunc:      pf,
		pubnowch:     make(chan struct{}),
		ctx:          ctx,
		cancel:       cancel,
	}
}
// setVal records k as the pending key under lock.
func (p *Republisher) setVal(k key.Key) {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.val = k
}
// pubNow requests an immediate publish. The send is non-blocking: if the
// Run loop is not currently waiting on pubnowch, the request is dropped.
func (p *Republisher) pubNow() {
	select {
	case p.pubnowch <- struct{}{}:
	default:
	}
}
// Close publishes the pending value one final time, then cancels the
// republisher's context, stopping its Run loop.
func (p *Republisher) Close() error {
	err := p.publish(p.ctx)
	p.cancel()
	return err
}
// Update records k as the value to publish and signals that an update has
// occurred since the last publish. Multiple consecutive updates may extend
// the time period before the next Publish occurs in order to more
// efficiently batch updates.
func (np *Republisher) Update(k key.Key) {
	np.setVal(k)
	// non-blocking: Publish has capacity 1, so a pending signal is enough
	select {
	case np.Publish <- struct{}{}:
	default:
	}
}
// Run is the main republisher loop. After an update is signaled it waits
// for a quiet period of TimeoutShort (each further update resets it),
// bounded overall by TimeoutLong, then publishes. An immediate publish can
// be forced via pubnowch. The loop exits when the context is cancelled.
func (np *Republisher) Run() {
	for {
		select {
		case <-np.Publish:
			quick := time.After(np.TimeoutShort)
			longer := time.After(np.TimeoutLong)

		wait:
			select {
			case <-np.ctx.Done():
				return
			case <-np.Publish:
				// another update arrived; restart the quiet period
				quick = time.After(np.TimeoutShort)
				goto wait
			case <-quick:
			case <-longer:
			case <-np.pubnowch:
			}

			err := np.publish(np.ctx)
			if err != nil {
				// Errorf, not Error: the message is a format string
				log.Errorf("republishRoot error: %s", err)
			}
		case <-np.ctx.Done():
			return
		}
	}
}
// publish pushes the currently pending key through pubfunc, recording it
// as lastpub on success.
func (np *Republisher) publish(ctx context.Context) error {
	np.lk.Lock()
	current := np.val
	np.lk.Unlock()

	log.Info("Publishing Changes!")
	if err := np.pubfunc(ctx, current); err != nil {
		return err
	}

	np.lk.Lock()
	np.lastpub = current
	np.lk.Unlock()
	return nil
}

View File

@ -67,6 +67,7 @@ func WrapData(b []byte) []byte {
typ := pb.Data_Raw
pbdata.Data = b
pbdata.Type = &typ
pbdata.Filesize = proto.Uint64(uint64(len(b)))
out, err := proto.Marshal(pbdata)
if err != nil {

View File

@ -15,7 +15,6 @@ import (
help "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
mdag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
@ -36,7 +35,6 @@ var log = logging.Logger("dagio")
type DagModifier struct {
dagserv mdag.DAGService
curNode *mdag.Node
mp pin.Pinner
splitter chunk.SplitterGen
ctx context.Context
@ -49,13 +47,12 @@ type DagModifier struct {
read *uio.DagReader
}
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.Pinner, spl chunk.SplitterGen) (*DagModifier, error) {
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, spl chunk.SplitterGen) (*DagModifier, error) {
return &DagModifier{
curNode: from.Copy(),
dagserv: serv,
splitter: spl,
ctx: ctx,
mp: mp,
}, nil
}
@ -174,7 +171,7 @@ func (dm *DagModifier) Sync() error {
buflen := dm.wrBuf.Len()
// Grab key for unpinning after mod operation
curk, err := dm.curNode.Key()
_, err := dm.curNode.Key()
if err != nil {
return err
}
@ -208,15 +205,6 @@ func (dm *DagModifier) Sync() error {
dm.curNode = nd
}
// Finalize correct pinning, and flush pinner.
// Be careful about the order, as curk might equal thisk.
dm.mp.RemovePinWithMode(curk, pin.Recursive)
dm.mp.PinWithMode(thisk, pin.Recursive)
err = dm.mp.Flush()
if err != nil {
return err
}
dm.writeStart += uint64(buflen)
dm.wrBuf = nil

View File

@ -4,7 +4,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
@ -17,8 +16,6 @@ import (
h "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"
mdag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
gc "github.com/ipfs/go-ipfs/pin/gc"
ft "github.com/ipfs/go-ipfs/unixfs"
uio "github.com/ipfs/go-ipfs/unixfs/io"
u "github.com/ipfs/go-ipfs/util"
@ -27,25 +24,24 @@ import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func getMockDagServ(t testing.TB) (mdag.DAGService, pin.Pinner) {
func getMockDagServ(t testing.TB) mdag.DAGService {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv := bs.New(bstore, offline.Exchange(bstore))
return mdag.NewDAGService(bserv)
}
func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore) {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
return dserv, pin.NewPinner(tsds, dserv)
return dserv, bstore
}
func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore, pin.Pinner) {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
return dserv, bstore, pin.NewPinner(tsds, dserv)
}
func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.Pinner) ([]byte, *mdag.Node) {
func getNode(t testing.TB, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
in := io.LimitReader(u.NewTimeSeededRand(), size)
node, err := imp.BuildTrickleDagFromReader(dserv, sizeSplitterGen(500)(in))
if err != nil {
@ -118,12 +114,12 @@ func sizeSplitterGen(size int64) chunk.SplitterGen {
}
func TestDagModifierBasic(t *testing.T) {
dserv, pin := getMockDagServ(t)
b, n := getNode(t, dserv, 50000, pin)
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pin, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -172,13 +168,13 @@ func TestDagModifierBasic(t *testing.T) {
}
func TestMultiWrite(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -225,13 +221,13 @@ func TestMultiWrite(t *testing.T) {
}
func TestMultiWriteAndFlush(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -273,13 +269,13 @@ func TestMultiWriteAndFlush(t *testing.T) {
}
func TestWriteNewFile(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -316,13 +312,13 @@ func TestWriteNewFile(t *testing.T) {
}
func TestMultiWriteCoal(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -362,13 +358,13 @@ func TestMultiWriteCoal(t *testing.T) {
}
func TestLargeWriteChunks(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -401,12 +397,12 @@ func TestLargeWriteChunks(t *testing.T) {
}
func TestDagTruncate(t *testing.T) {
dserv, pins := getMockDagServ(t)
b, n := getNode(t, dserv, 50000, pins)
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -415,6 +411,14 @@ func TestDagTruncate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
size, err := dagmod.Size()
if err != nil {
t.Fatal(err)
}
if size != 12345 {
t.Fatal("size was incorrect!")
}
_, err = dagmod.Seek(0, os.SEEK_SET)
if err != nil {
@ -429,15 +433,29 @@ func TestDagTruncate(t *testing.T) {
if err = arrComp(out, b[:12345]); err != nil {
t.Fatal(err)
}
err = dagmod.Truncate(10)
if err != nil {
t.Fatal(err)
}
size, err = dagmod.Size()
if err != nil {
t.Fatal(err)
}
if size != 10 {
t.Fatal("size was incorrect!")
}
}
func TestSparseWrite(t *testing.T) {
dserv, pins := getMockDagServ(t)
_, n := getNode(t, dserv, 0, pins)
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
@ -469,110 +487,16 @@ func TestSparseWrite(t *testing.T) {
}
}
func basicGC(t *testing.T, bs blockstore.GCBlockstore, pins pin.Pinner) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // in case error occurs during operation
out, err := gc.GC(ctx, bs, pins)
if err != nil {
t.Fatal(err)
}
for range out {
}
}
func TestCorrectPinning(t *testing.T) {
dserv, bstore, pins := getMockDagServAndBstore(t)
b, n := getNode(t, dserv, 50000, pins)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 1024)
for i := 0; i < 100; i++ {
size, err := dagmod.Size()
if err != nil {
t.Fatal(err)
}
offset := rand.Intn(int(size))
u.NewTimeSeededRand().Read(buf)
if offset+len(buf) > int(size) {
b = append(b[:offset], buf...)
} else {
copy(b[offset:], buf)
}
n, err := dagmod.WriteAt(buf, int64(offset))
if err != nil {
t.Fatal(err)
}
if n != len(buf) {
t.Fatal("wrote incorrect number of bytes")
}
}
fisize, err := dagmod.Size()
if err != nil {
t.Fatal(err)
}
if int(fisize) != len(b) {
t.Fatal("reported filesize incorrect", fisize, len(b))
}
// Run a GC, then ensure we can still read the file correctly
basicGC(t, bstore, pins)
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := uio.NewDagReader(context.Background(), nd, dserv)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
if err = arrComp(out, b); err != nil {
t.Fatal(err)
}
rootk, err := nd.Key()
if err != nil {
t.Fatal(err)
}
// Verify only one recursive pin
recpins := pins.RecursiveKeys()
if len(recpins) != 1 {
t.Fatal("Incorrect number of pinned entries")
}
// verify the correct node is pinned
if recpins[0] != rootk {
t.Fatal("Incorrect node recursively pinned")
}
}
func BenchmarkDagmodWrite(b *testing.B) {
b.StopTimer()
dserv, pins := getMockDagServ(b)
_, n := getNode(b, dserv, 0, pins)
dserv := getMockDagServ(b)
_, n := getNode(b, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wrsize := 4096
dagmod, err := NewDagModifier(ctx, n, dserv, pins, sizeSplitterGen(512))
dagmod, err := NewDagModifier(ctx, n, dserv, sizeSplitterGen(512))
if err != nil {
b.Fatal(err)
}