From b9658f0cb259b6b12fe64444899b48918028c2e8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 16 Mar 2015 17:25:00 -0700 Subject: [PATCH 1/5] refactor ipns fuse to utilize ipnsfs --- fuse/ipns/ipns_test.go | 241 +++++++++++++++- fuse/ipns/ipns_unix.go | 626 ++++++++++++++-------------------------- fuse/ipns/repub_unix.go | 44 --- 3 files changed, 440 insertions(+), 471 deletions(-) delete mode 100644 fuse/ipns/repub_unix.go diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index 83f175b37..5d523f0be 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -5,16 +5,19 @@ package ipns import ( "bytes" "crypto/rand" + "fmt" "io/ioutil" + mrand "math/rand" "os" + "sync" "testing" - "time" fstest "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs/fstestutil" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + racedet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" core "github.com/jbenet/go-ipfs/core" - u "github.com/jbenet/go-ipfs/util" + nsfs "github.com/jbenet/go-ipfs/ipnsfs" ci "github.com/jbenet/go-ipfs/util/testutil/ci" ) @@ -30,6 +33,13 @@ func randBytes(size int) []byte { return b } +func mkdir(t *testing.T, path string) { + err := os.Mkdir(path, os.ModeDir) + if err != nil { + t.Fatal(err) + } +} + func writeFile(t *testing.T, size int, path string) []byte { return writeFileData(t, randBytes(size), path) } @@ -57,6 +67,38 @@ func writeFileData(t *testing.T, data []byte, path string) []byte { return data } +func verifyFile(t *testing.T, path string, data []byte) { + fi, err := os.Open(path) + if err != nil { + t.Fatal(err) + } + defer fi.Close() + + out, err := ioutil.ReadAll(fi) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(out, data) { + t.Fatal("Data not equal") + } +} + +func checkExists(t *testing.T, path string) { + _, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } +} + +func closeMount(mnt *fstest.Mount) { + if err := recover(); err != nil { + log.Error("Recovered panic") + log.Error(err) + } + mnt.Close() +} + func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.Mount) { maybeSkipFuseTests(t) @@ -66,6 +108,13 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M if err != nil { t.Fatal(err) } + + ipnsfs, err := nsfs.NewFilesystem(context.TODO(), node.DAG, node.Namesys, node.Pinning, node.PrivateKey) + if err != nil { + t.Fatal(err) + } + + node.IpnsFs = ipnsfs } fs, err := NewFileSystem(node, node.PrivateKey, "") @@ -80,17 +129,29 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M return node, mnt } +func TestIpnsLocalLink(t *testing.T) { + _, mnt := setupIpnsTest(t, nil) + defer mnt.Close() + name := mnt.Dir + "/local" + + finfo, err := os.Stat(name) + if err != nil { + t.Fatal(err) + } + + t.Log(finfo.Name()) +} + // Test writing a file and reading it back func TestIpnsBasicIO(t *testing.T) { - t.Skip("Skipping until DAGModifier can be fixed.") if testing.Short() { t.SkipNow() } _, mnt := setupIpnsTest(t, nil) - defer mnt.Close() + defer closeMount(mnt) fname := mnt.Dir + "/local/testfile" - data := writeFile(t, 12345, fname) + data := writeFile(t, 10, fname) rbuf, err := ioutil.ReadFile(fname) if err != nil { @@ -104,7 +165,6 @@ func TestIpnsBasicIO(t *testing.T) { // Test to make sure file changes persist over mounts of ipns func TestFilePersistence(t *testing.T) { - t.Skip("Skipping until DAGModifier can be fixed.") if testing.Short() { t.SkipNow() } @@ -113,11 +173,9 @@ func TestFilePersistence(t *testing.T) { fname := "/local/atestfile" data := writeFile(t, 127, mnt.Dir+fname) - // Wait for publish: TODO: make publish happen faster in tests - time.Sleep(time.Millisecond * 40) - mnt.Close() + t.Log("Closed, opening new fs") node, mnt = setupIpnsTest(t, node) defer mnt.Close() @@ -131,9 +189,45 @@ func TestFilePersistence(t *testing.T) { } } +func TestDeeperDirs(t *testing.T) { + node, mnt := setupIpnsTest(t, nil) + + t.Log("make a top level dir") + dir1 := "/local/test1" + mkdir(t, mnt.Dir+dir1) + + checkExists(t, mnt.Dir+dir1) + + t.Log("write a file in it") + data1 := writeFile(t, 4000, mnt.Dir+dir1+"/file1") + + verifyFile(t, mnt.Dir+dir1+"/file1", data1) + + t.Log("sub directory") + mkdir(t, mnt.Dir+dir1+"/dir2") + + checkExists(t, mnt.Dir+dir1+"/dir2") + + t.Log("file in that subdirectory") + data2 := writeFile(t, 5000, mnt.Dir+dir1+"/dir2/file2") + + verifyFile(t, mnt.Dir+dir1+"/dir2/file2", data2) + + mnt.Close() + t.Log("closing mount, then restarting") + + _, mnt = setupIpnsTest(t, node) + + checkExists(t, mnt.Dir+dir1) + + verifyFile(t, mnt.Dir+dir1+"/file1", data1) + + verifyFile(t, mnt.Dir+dir1+"/dir2/file2", data2) + mnt.Close() +} + // Test to make sure the filesystem reports file sizes correctly func TestFileSizeReporting(t *testing.T) { - t.Skip("Skipping until DAGModifier can be fixed.") if testing.Short() { t.SkipNow() } @@ -155,7 +249,6 @@ func TestFileSizeReporting(t *testing.T) { // Test to make sure you cant create multiple entries with the same name func TestDoubleEntryFailure(t *testing.T) { - t.Skip("Skipping until DAGModifier can be fixed.") if testing.Short() { t.SkipNow() } @@ -175,7 +268,6 @@ func TestDoubleEntryFailure(t *testing.T) { } func TestAppendFile(t *testing.T) { - t.Skip("Skipping until DAGModifier can be fixed.") if testing.Short() { t.SkipNow() } @@ -216,8 +308,126 @@ func TestAppendFile(t *testing.T) { } } +func TestConcurrentWrites(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + _, mnt := setupIpnsTest(t, nil) + defer mnt.Close() + + nactors := 4 + filesPerActor := 400 + fileSize := 2000 + + data := make([][][]byte, nactors) + + if racedet.WithRace() { + nactors = 2 + filesPerActor = 50 + } + + wg := sync.WaitGroup{} + for i := 0; i < nactors; i++ { + data[i] = make([][]byte, filesPerActor) + wg.Add(1) + go func(n int) { + defer wg.Done() + for j := 0; j < filesPerActor; j++ { + out := writeFile(t, fileSize, mnt.Dir+fmt.Sprintf("/local/%dFILE%d", n, j)) + data[n][j] = out + } + }(i) + } + wg.Wait() + + for i := 0; i < nactors; i++ { + for j := 0; j < filesPerActor; j++ { + verifyFile(t, mnt.Dir+fmt.Sprintf("/local/%dFILE%d", i, j), data[i][j]) + } + } +} + +func TestFSThrash(t *testing.T) { + files := make(map[string][]byte) + + if testing.Short() { + t.SkipNow() + } + _, mnt := setupIpnsTest(t, nil) + defer mnt.Close() + + base := mnt.Dir + "/local" + dirs := []string{base} + dirlock := sync.RWMutex{} + filelock := sync.Mutex{} + + ndirWorkers := 2 + nfileWorkers := 2 + + ndirs := 100 + nfiles := 200 + + wg := sync.WaitGroup{} + + // Spawn off workers to make directories + for i := 0; i < ndirWorkers; i++ { + wg.Add(1) + go func(worker int) { + defer wg.Done() + for j := 0; j < ndirs; j++ { + dirlock.RLock() + n := mrand.Intn(len(dirs)) + dir := dirs[n] + dirlock.RUnlock() + + newDir := fmt.Sprintf("%s/dir%d-%d", dir, worker, j) + err := os.Mkdir(newDir, os.ModeDir) + if err != nil { + t.Fatal(err) + } + dirlock.Lock() + dirs = append(dirs, newDir) + dirlock.Unlock() + } + }(i) + } + + // Spawn off workers to make files + for i := 0; i < nfileWorkers; i++ { + wg.Add(1) + go func(worker int) { + defer wg.Done() + for j := 0; j < nfiles; j++ { + dirlock.RLock() + n := mrand.Intn(len(dirs)) + dir := dirs[n] + dirlock.RUnlock() + + newFileName := fmt.Sprintf("%s/file%d-%d", dir, worker, j) + + data := writeFile(t, 2000+mrand.Intn(5000), newFileName) + filelock.Lock() + files[newFileName] = data + filelock.Unlock() + } + }(i) + } + + wg.Wait() + for name, data := range files { + out, err := ioutil.ReadFile(name) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(data, out) { + t.Fatal("Data didnt match") + } + } +} + +/* func TestFastRepublish(t *testing.T) { - t.Skip("Skipping until DAGModifier can be fixed.") if testing.Short() { t.SkipNow() } @@ -319,10 +529,11 @@ func TestFastRepublish(t *testing.T) { close(closed) } +*/ // Test writing a medium sized file one byte at a time func TestMultiWrite(t *testing.T) { - t.Skip("Skipping until DAGModifier can be fixed.") + if testing.Short() { t.SkipNow() } diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index 5a4ca4960..6b839cf78 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -8,37 +8,22 @@ import ( "errors" "io" "os" - "path/filepath" - "time" fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse" fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs" - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" core "github.com/jbenet/go-ipfs/core" - chunk "github.com/jbenet/go-ipfs/importer/chunk" - mdag "github.com/jbenet/go-ipfs/merkledag" + nsfs "github.com/jbenet/go-ipfs/ipnsfs" + dag "github.com/jbenet/go-ipfs/merkledag" ci "github.com/jbenet/go-ipfs/p2p/crypto" - path "github.com/jbenet/go-ipfs/path" ft "github.com/jbenet/go-ipfs/unixfs" - uio "github.com/jbenet/go-ipfs/unixfs/io" - mod "github.com/jbenet/go-ipfs/unixfs/mod" - ftpb "github.com/jbenet/go-ipfs/unixfs/pb" u "github.com/jbenet/go-ipfs/util" - lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" ) -const IpnsReadonly = true - var log = eventlog.Logger("fuse/ipns") -var ( - shortRepublishTimeout = time.Millisecond * 5 - longRepublishTimeout = time.Millisecond * 500 -) - // FileSystem is the readwrite IPNS Fuse Filesystem. type FileSystem struct { Ipfs *core.IpfsNode @@ -54,73 +39,17 @@ func NewFileSystem(ipfs *core.IpfsNode, sk ci.PrivKey, ipfspath string) (*FileSy return &FileSystem{Ipfs: ipfs, RootNode: root}, nil } -func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, error) { - root := new(Root) - root.LocalDirs = make(map[string]*Node) - root.Ipfs = n - abspath, err := filepath.Abs(ipfsroot) - if err != nil { - return nil, err - } - root.IpfsRoot = abspath - - root.Keys = keys - - if len(keys) == 0 { - log.Warning("No keys given for ipns root creation") - } else { - k := keys[0] - pub := k.GetPublic() - hash, err := pub.Hash() - if err != nil { - return nil, err - } - root.LocalLink = &Link{u.Key(hash).Pretty()} - } - - for _, k := range keys { - hash, err := k.GetPublic().Hash() - if err != nil { - log.Debug("failed to hash public key.") - continue - } - name := u.Key(hash).Pretty() - nd := new(Node) - nd.Ipfs = n - nd.key = k - nd.repub = NewRepublisher(nd, shortRepublishTimeout, longRepublishTimeout) - - go nd.repub.Run() - - pointsTo, err := n.Namesys.Resolve(n.Context(), name) - if err != nil { - log.Warning("Could not resolve value for local ipns entry, providing empty dir") - nd.Nd = &mdag.Node{Data: ft.FolderPBData()} - root.LocalDirs[name] = nd - continue - } - - if !u.IsValidHash(pointsTo.B58String()) { - log.Criticalf("Got back bad data from namesys resolve! [%s]", pointsTo) - return nil, nil - } - - node, err := n.Resolver.ResolvePath(path.Path(pointsTo.B58String())) - if err != nil { - log.Warning("Failed to resolve value from ipns entry in ipfs") - continue - } - - nd.Nd = node - root.LocalDirs[name] = nd - } - - return root, nil +// Root constructs the Root of the filesystem, a Root object. +func (f *FileSystem) Root() (fs.Node, error) { + log.Debug("Filesystem, get root") + return f.RootNode, nil } -// Root constructs the Root of the filesystem, a Root object. -func (f FileSystem) Root() (fs.Node, error) { - return f.RootNode, nil +func (f *FileSystem) Destroy() { + err := f.RootNode.Close() + if err != nil { + log.Errorf("Error Shutting Down Filesystem: %s\n", err) + } } // Root is the root object of the filesystem tree. @@ -130,13 +59,53 @@ type Root struct { // Used for symlinking into ipfs IpfsRoot string - LocalDirs map[string]*Node + LocalDirs map[string]fs.Node + Roots map[string]*nsfs.KeyRoot + fs *nsfs.Filesystem LocalLink *Link } +func CreateRoot(ipfs *core.IpfsNode, keys []ci.PrivKey, ipfspath string) (*Root, error) { + ldirs := make(map[string]fs.Node) + roots := make(map[string]*nsfs.KeyRoot) + for _, k := range keys { + pkh, err := k.GetPublic().Hash() + if err != nil { + return nil, err + } + name := u.Key(pkh).B58String() + root, err := ipfs.IpnsFs.GetRoot(name) + if err != nil { + return nil, err + } + + roots[name] = root + + switch val := root.GetValue().(type) { + case *nsfs.Directory: + ldirs[name] = &Directory{dir: val} + case *nsfs.File: + ldirs[name] = &File{fi: val} + default: + return nil, errors.New("unrecognized type") + } + } + + return &Root{ + fs: ipfs.IpnsFs, + Ipfs: ipfs, + IpfsRoot: ipfspath, + Keys: keys, + LocalDirs: ldirs, + LocalLink: &Link{ipfs.Identity.Pretty()}, + Roots: roots, + }, nil +} + // Attr returns file attributes. func (*Root) Attr() fuse.Attr { + log.Debug("Root Attr") return fuse.Attr{Mode: os.ModeDir | 0111} // -rw+x } @@ -148,6 +117,7 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) { return nil, fuse.ENOENT } + // Local symlink to the node ID keyspace if name == "local" { if s.LocalLink == nil { return nil, fuse.ENOENT @@ -157,9 +127,17 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) { nd, ok := s.LocalDirs[name] if ok { - return nd, nil + switch nd := nd.(type) { + case *Directory: + return nd, nil + case *File: + return nd, nil + default: + return nil, fuse.EIO + } } + // other links go through ipns resolution and are symlinked into the ipfs mountpoint resolved, err := s.Ipfs.Namesys.Resolve(s.Ipfs.Context(), name) if err != nil { log.Warningf("ipns: namesys resolve error: %s", err) @@ -169,8 +147,29 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) { return &Link{s.IpfsRoot + "/" + resolved.B58String()}, nil } -// ReadDirAll reads a particular directory. Disallowed for root. +func (r *Root) Close() error { + for _, kr := range r.Roots { + err := kr.Publish(r.Ipfs.Context()) + if err != nil { + return err + } + } + return nil +} + +// Forget is called when the filesystem is unmounted. probably. +// see comments here: http://godoc.org/bazil.org/fuse/fs#FSDestroyer +func (r *Root) Forget() { + err := r.Close() + if err != nil { + log.Error(err) + } +} + +// ReadDirAll reads a particular directory. Will show locally available keys +// as well as a symlink to the peerID key func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { + log.Debug("Root ReadDirAll") listing := []fuse.Dirent{ fuse.Dirent{ Name: "local", @@ -192,115 +191,78 @@ func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { return listing, nil } -// Node is the core object representing a filesystem tree node. -type Node struct { - root *Root - nsRoot *Node - parent *Node +// Directory is wrapper over an ipnsfs directory to satisfy the fuse fs interface +type Directory struct { + dir *nsfs.Directory - repub *Republisher - - // This nodes name in its parent dir. - // NOTE: this strategy wont work well if we allow hard links - // (im all for murdering the thought of hard links) - name string - - // Private keys held by nodes at the root of a keyspace - // WARNING(security): the PrivKey interface is currently insecure - // (holds the raw key). It will be secured later. - key ci.PrivKey - - Ipfs *core.IpfsNode - Nd *mdag.Node - dagMod *mod.DagModifier - cached *ftpb.Data + fs.NodeRef } -func (s *Node) loadData() error { - s.cached = new(ftpb.Data) - return proto.Unmarshal(s.Nd.Data, s.cached) +// File is wrapper over an ipnsfs file to satisfy the fuse fs interface +type File struct { + fi *nsfs.File + + fs.NodeRef } // Attr returns the attributes of a given node. -func (s *Node) Attr() fuse.Attr { - if s.cached == nil { - err := s.loadData() - if err != nil { - log.Debugf("Error loading PBData for file: '%s'", s.name) - } +func (d *Directory) Attr() fuse.Attr { + log.Debug("Directory Attr") + return fuse.Attr{Mode: os.ModeDir | 0555} +} + +// Attr returns the attributes of a given node. +func (fi *File) Attr() fuse.Attr { + log.Debug("File Attr") + size, err := fi.fi.Size() + if err != nil { + // In this case, the dag node in question may not be unixfs + log.Critical("Failed to get file size: %s", err) } - switch s.cached.GetType() { - case ftpb.Data_Directory: - return fuse.Attr{Mode: os.ModeDir | 0555} - case ftpb.Data_File, ftpb.Data_Raw: - size, err := ft.DataSize(s.Nd.Data) - if err != nil { - log.Debugf("Error getting size of file: %s", err) - size = 0 - } - if size == 0 { - dmsize, err := s.dagMod.Size() - if err != nil { - log.Error(err) - } - size = uint64(dmsize) - } - - mode := os.FileMode(0666) - if IpnsReadonly { - mode = 0444 - } - - return fuse.Attr{ - Mode: mode, - Size: size, - Blocks: uint64(len(s.Nd.Links)), - } - default: - log.Debug("Invalid data type.") - return fuse.Attr{} + return fuse.Attr{ + Mode: os.FileMode(0666), + Size: uint64(size), } } // Lookup performs a lookup under this node. -func (s *Node) Lookup(ctx context.Context, name string) (fs.Node, error) { - nodes, err := s.Ipfs.Resolver.ResolveLinks(s.Nd, []string{name}) +func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) { + child, err := s.dir.Child(name) if err != nil { // todo: make this error more versatile. return nil, fuse.ENOENT } - return s.makeChild(name, nodes[len(nodes)-1]), nil -} - -func (n *Node) makeChild(name string, node *mdag.Node) *Node { - child := &Node{ - Ipfs: n.Ipfs, - Nd: node, - name: name, - nsRoot: n.nsRoot, - parent: n, + switch child := child.(type) { + case *nsfs.Directory: + return &Directory{dir: child}, nil + case *nsfs.File: + return &File{fi: child}, nil + default: + panic("system has proven to be insane") } - - // Always ensure that each child knows where the root is - if n.nsRoot == nil { - child.nsRoot = n - } else { - child.nsRoot = n.nsRoot - } - - return child } // ReadDirAll reads the link structure as directory entries -func (s *Node) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { - entries := make([]fuse.Dirent, len(s.Nd.Links)) - for i, link := range s.Nd.Links { - n := link.Name - if len(n) == 0 { - n = link.Hash.B58String() +func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { + var entries []fuse.Dirent + for _, name := range dir.dir.List() { + dirent := fuse.Dirent{Name: name} + + // TODO: make dir.dir.List() return dirinfos + child, err := dir.dir.Child(name) + if err != nil { + return nil, err } - entries[i] = fuse.Dirent{Name: n, Type: fuse.DT_File} + + switch child.Type() { + case nsfs.TDir: + dirent.Type = fuse.DT_Dir + case nsfs.TFile: + dirent.Type = fuse.DT_File + } + + entries = append(entries, dirent) } if len(entries) > 0 { @@ -309,279 +271,130 @@ func (s *Node) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { return nil, fuse.ENOENT } -func (s *Node) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - k, err := s.Nd.Key() +func (fi *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { + _, err := fi.fi.Seek(req.Offset, os.SEEK_SET) if err != nil { return err } - // setup our logging event - lm := make(lgbl.DeferredMap) - lm["fs"] = "ipns" - lm["key"] = func() interface{} { return k.Pretty() } - lm["req_offset"] = req.Offset - lm["req_size"] = req.Size - defer log.EventBegin(ctx, "fuseRead", lm).Done() - - r, err := uio.NewDagReader(ctx, s.Nd, s.Ipfs.DAG) - if err != nil { - return err - } - o, err := r.Seek(req.Offset, os.SEEK_SET) - lm["res_offset"] = o + fisize, err := fi.fi.Size() if err != nil { return err } - buf := resp.Data[:min(req.Size, int(r.Size()))] - n, err := io.ReadFull(r, buf) + readsize := min(req.Size, int(fisize-req.Offset)) + n, err := io.ReadFull(fi.fi, resp.Data[:readsize]) resp.Data = resp.Data[:n] - lm["res_size"] = n return err // may be non-nil / not succeeded } -func (n *Node) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { - // log.Debugf("ipns: Node Write [%s]: flags = %s, offset = %d, size = %d", n.name, req.Flags.String(), req.Offset, len(req.Data)) - if IpnsReadonly { - log.Debug("Attempted to write on readonly ipns filesystem.") - return fuse.EPERM - } - - if n.dagMod == nil { - // Create a DagModifier to allow us to change the existing dag node - dmod, err := mod.NewDagModifier(ctx, n.Nd, n.Ipfs.DAG, n.Ipfs.Pinning.GetManual(), chunk.DefaultSplitter) - if err != nil { - return err - } - n.dagMod = dmod - } - wrote, err := n.dagMod.WriteAt(req.Data, int64(req.Offset)) +func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { + wrote, err := fi.fi.WriteAt(req.Data, req.Offset) if err != nil { return err } resp.Size = wrote + return nil } -func (n *Node) Flush(ctx context.Context, req *fuse.FlushRequest) error { - if IpnsReadonly { - return nil - } - - // If a write has happened - if n.dagMod != nil { - newNode, err := n.dagMod.GetNode() - if err != nil { - return err - } - - if n.parent != nil { - log.Error("updating self in parent!") - err := n.parent.update(n.name, newNode) - if err != nil { - log.Criticalf("error in updating ipns dag tree: %s", err) - // return fuse.ETHISISPRETTYBAD - return err - } - } - n.Nd = newNode - - /*/TEMP - dr, err := mdag.NewDagReader(n.Nd, n.Ipfs.DAG) - if err != nil { - log.Critical("Verification read failed.") - } - b, err := ioutil.ReadAll(dr) - if err != nil { - log.Critical("Verification read failed.") - } - fmt.Println("VERIFICATION READ") - fmt.Printf("READ %d BYTES\n", len(b)) - fmt.Println(string(b)) - fmt.Println(b) - //*/ - - n.dagMod = nil - - n.wasChanged() - } - return nil +func (fi *File) Flush(ctx context.Context, req *fuse.FlushRequest) error { + return fi.fi.Close() } -// Signal that a node in this tree was changed so the root can republish -func (n *Node) wasChanged() { - if IpnsReadonly { - return - } - root := n.nsRoot - if root == nil { - root = n - } - - root.repub.Publish <- struct{}{} +func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { + return fi.fi.Flush() } -func (n *Node) republishRoot() error { - - // We should already be the root, this is just a sanity check - var root *Node - if n.nsRoot != nil { - root = n.nsRoot - } else { - root = n - } - - // Add any nodes that may be new to the DAG service - err := n.Ipfs.DAG.AddRecursive(root.Nd) +func (fi *File) Forget() { + err := fi.fi.Flush() if err != nil { - log.Criticalf("ipns: Dag Add Error: %s", err) - return err + log.Debug("Forget file error: ", err) } +} - ndkey, err := root.Nd.Key() +func (dir *Directory) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { + child, err := dir.dir.Mkdir(req.Name) if err != nil { - return err + return nil, err } - err = n.Ipfs.Namesys.Publish(n.Ipfs.Context(), root.key, ndkey) - if err != nil { - return err - } - return nil + return &Directory{dir: child}, nil } -func (n *Node) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { - return nil -} - -func (n *Node) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { - if IpnsReadonly { - return nil, fuse.EPERM - } - dagnd := &mdag.Node{Data: ft.FolderPBData()} - nnode := n.Nd.Copy() - nnode.AddNodeLink(req.Name, dagnd) - - child := &Node{ - Ipfs: n.Ipfs, - Nd: dagnd, - name: req.Name, - } - - if n.nsRoot == nil { - child.nsRoot = n - } else { - child.nsRoot = n.nsRoot - } - - if n.parent != nil { - err := n.parent.update(n.name, nnode) - if err != nil { - log.Criticalf("Error updating node: %s", err) - return nil, err - } - } - n.Nd = nnode - - n.wasChanged() - - return child, nil -} - -func (n *Node) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { - //log.Debug("[%s] Received open request! flags = %s", n.name, req.Flags.String()) - //TODO: check open flags and truncate if necessary +func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { if req.Flags&fuse.OpenTruncate != 0 { log.Warning("Need to truncate file!") - n.cached = nil - n.Nd = &mdag.Node{Data: ft.FilePBData(nil, 0)} + err := fi.fi.Truncate(0) + if err != nil { + return nil, err + } } else if req.Flags&fuse.OpenAppend != 0 { log.Warning("Need to append to file!") } - return n, nil + return fi, nil } -func (n *Node) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) { - return nil, nil +func (fi *File) Release(ctx context.Context, req *fuse.ReleaseRequest) error { + return fi.fi.Close() } -func (n *Node) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { - if IpnsReadonly { - log.Debug("Attempted to call Create on a readonly filesystem.") - return nil, nil, fuse.EPERM - } - +func (dir *Directory) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { // New 'empty' file - nd := &mdag.Node{Data: ft.FilePBData(nil, 0)} - child := n.makeChild(req.Name, nd) - - nnode := n.Nd.Copy() - - err := nnode.AddNodeLink(req.Name, nd) + nd := &dag.Node{Data: ft.FilePBData(nil, 0)} + err := dir.dir.AddChild(req.Name, nd) if err != nil { return nil, nil, err } - if n.parent != nil { - err := n.parent.update(n.name, nnode) - if err != nil { - log.Criticalf("Error updating node: %s", err) - // Can we panic, please? - return nil, nil, err - } - } - n.Nd = nnode - n.wasChanged() - return child, child, nil + child, err := dir.dir.Child(req.Name) + if err != nil { + return nil, nil, err + } + + fi, ok := child.(*nsfs.File) + if !ok { + return nil, nil, errors.New("child creation failed") + } + + nodechild := &File{fi: fi} + return nodechild, nodechild, nil } -func (n *Node) Remove(ctx context.Context, req *fuse.RemoveRequest) error { - if IpnsReadonly { - return fuse.EPERM - } - - nnode := n.Nd.Copy() - err := nnode.RemoveNodeLink(req.Name) +func (dir *Directory) Remove(ctx context.Context, req *fuse.RemoveRequest) error { + err := dir.dir.Unlink(req.Name) if err != nil { return fuse.ENOENT } - - if n.parent != nil { - err := n.parent.update(n.name, nnode) - if err != nil { - log.Criticalf("Error updating node: %s", err) - return err - } - } - n.Nd = nnode - n.wasChanged() return nil } -func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error { - if IpnsReadonly { - log.Debug("Attempted to call Rename on a readonly filesystem.") - return fuse.EPERM +// Rename implements NodeRenamer +func (dir *Directory) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Node) error { + cur, err := dir.dir.Child(req.OldName) + if err != nil { + return err } - var mdn *mdag.Node - for _, l := range n.Nd.Links { - if l.Name == req.OldName { - mdn = l.Node - } + err = dir.dir.Unlink(req.OldName) + if err != nil { + return err } - if mdn == nil { - log.Critical("nil Link found on rename!") - return fuse.ENOENT - } - n.Nd.RemoveNodeLink(req.OldName) switch newDir := newDir.(type) { - case *Node: - err := newDir.Nd.AddNodeLink(req.NewName, mdn) + case *Directory: + nd, err := cur.GetNode() if err != nil { return err } + + err = newDir.dir.AddChild(req.NewName, nd) + if err != nil { + return err + } + case *File: + log.Critical("Cannot move node into a file!") + return fuse.EPERM default: log.Critical("Unknown node type for rename target dir!") return errors.New("Unknown fs node type!") @@ -589,21 +402,11 @@ func (n *Node) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.No return nil } -// Updates the child of this node, specified by name to the given newnode -func (n *Node) update(name string, newnode *mdag.Node) error { - nnode, err := n.Nd.UpdateNodeLink(name, newnode) - if err != nil { - return err +func min(a, b int) int { + if a < b { + return a } - - if n.parent != nil { - err := n.parent.update(n.name, nnode) - if err != nil { - return err - } - } - n.Nd = nnode - return nil + return b } // to check that out Node implements all the interfaces we want @@ -615,27 +418,26 @@ type ipnsRoot interface { var _ ipnsRoot = (*Root)(nil) -type ipnsNode interface { - fs.HandleFlusher +type ipnsDirectory interface { fs.HandleReadDirAller - fs.HandleReader - fs.HandleWriter fs.Node fs.NodeCreater - fs.NodeFsyncer fs.NodeMkdirer - fs.NodeMknoder - fs.NodeOpener fs.NodeRemover fs.NodeRenamer fs.NodeStringLookuper } -var _ ipnsNode = (*Node)(nil) +var _ ipnsDirectory = (*Directory)(nil) -func min(a, b int) int { - if a < b { - return a - } - return b +type ipnsFile interface { + fs.HandleFlusher + fs.HandleReader + fs.HandleWriter + fs.HandleReleaser + fs.Node + fs.NodeFsyncer + fs.NodeOpener } + +var _ ipnsFile = (*File)(nil) diff --git a/fuse/ipns/repub_unix.go b/fuse/ipns/repub_unix.go deleted file mode 100644 index 9e3d7ac5b..000000000 --- a/fuse/ipns/repub_unix.go +++ /dev/null @@ -1,44 +0,0 @@ -// +build !nofuse - -package ipns - -import "time" - -type Republisher struct { - TimeoutLong time.Duration - TimeoutShort time.Duration - Publish chan struct{} - node *Node -} - -func NewRepublisher(n *Node, tshort, tlong time.Duration) *Republisher { - return &Republisher{ - TimeoutShort: tshort, - TimeoutLong: tlong, - Publish: make(chan struct{}), - node: n, - } -} - -func (np *Republisher) Run() { - for _ = range np.Publish { - quick := time.After(np.TimeoutShort) - longer := time.After(np.TimeoutLong) - - wait: - select { - case <-quick: - case <-longer: - case <-np.Publish: - quick = time.After(np.TimeoutShort) - goto wait - } - - log.Info("Publishing Changes!") - err := np.node.republishRoot() - if err != nil { - log.Critical("republishRoot error: %s", err) - } - - } -} From 13de031b442f6699bafad49d330e05f3004b4ce1 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 18 Mar 2015 21:48:52 -0700 Subject: [PATCH 2/5] code cleanup and better naming of methods --- fuse/ipns/ipns_test.go | 37 +++++++++++---- fuse/ipns/ipns_unix.go | 87 +++++++++++++++++++++++++--------- ipnsfs/file.go | 8 ++-- unixfs/mod/dagmodifier.go | 16 +++---- unixfs/mod/dagmodifier_test.go | 2 +- 5 files changed, 105 insertions(+), 45 deletions(-) diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index 5d523f0be..48a4542fe 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -74,13 +74,23 @@ func verifyFile(t *testing.T, path string, data []byte) { } defer fi.Close() - out, err := ioutil.ReadAll(fi) - if err != nil { - t.Fatal(err) - } + buf := make([]byte, 1024) + offset := 0 + for { + n, err := fi.Read(buf) + if err != nil { + t.Fatal(err) + } - if !bytes.Equal(out, data) { - t.Fatal("Data not equal") + if !bytes.Equal(buf[:n], data[offset:offset+n]) { + t.Fatal("Data not equal") + } + + if n < len(buf) { + break + } + + offset += n } } @@ -130,16 +140,23 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M } func TestIpnsLocalLink(t *testing.T) { - _, mnt := setupIpnsTest(t, nil) + nd, mnt := setupIpnsTest(t, nil) defer mnt.Close() name := mnt.Dir + "/local" - finfo, err := os.Stat(name) + _, err := os.Stat(name) if err != nil { t.Fatal(err) } - t.Log(finfo.Name()) + linksto, err := os.Readlink(name) + if err != nil { + t.Fatal(err) + } + + if linksto != nd.Identity.Pretty() { + t.Fatal("Link invalid") + } } // Test writing a file and reading it back @@ -189,7 +206,7 @@ func TestFilePersistence(t *testing.T) { } } -func TestDeeperDirs(t *testing.T) { +func TestMultipleDirs(t *testing.T) { node, mnt := setupIpnsTest(t, nil) t.Log("make a top level dir") diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index 6b839cf78..cf9add9ad 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -239,7 +239,9 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) { case *nsfs.File: return &File{fi: child}, nil default: - panic("system has proven to be insane") + // NB: if this happens, we do not want to continue, unpredictable behaviour + // may occur. + panic("invalid type found under directory. programmer error.") } } @@ -272,42 +274,83 @@ func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { } func (fi *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - _, err := fi.fi.Seek(req.Offset, os.SEEK_SET) - if err != nil { + errs := make(chan error, 1) + go func() { + _, err := fi.fi.Seek(req.Offset, os.SEEK_SET) + if err != nil { + errs <- err + return + } + + fisize, err := fi.fi.Size() + if err != nil { + errs <- err + return + } + + readsize := min(req.Size, int(fisize-req.Offset)) + n, err := io.ReadFull(fi.fi, resp.Data[:readsize]) + resp.Data = resp.Data[:n] + errs <- err + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errs: return err } - - fisize, err := fi.fi.Size() - if err != nil { - return err - } - - readsize := min(req.Size, int(fisize-req.Offset)) - n, err := io.ReadFull(fi.fi, resp.Data[:readsize]) - resp.Data = resp.Data[:n] - return err // may be non-nil / not succeeded } func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { - wrote, err := fi.fi.WriteAt(req.Data, req.Offset) - if err != nil { - return err - } - resp.Size = wrote + errs := make(chan error, 1) + go func() { + wrote, err := fi.fi.WriteAt(req.Data, req.Offset) + if err != nil { + errs <- err + } + resp.Size = wrote + errs <- nil + }() - return nil + select { + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + } } func (fi *File) Flush(ctx context.Context, req *fuse.FlushRequest) error { - return fi.fi.Close() + errs := make(chan error, 1) + go func() { + errs <- fi.fi.Close() + }() + select { + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + } } +// Fsync flushes the content in the file to disk, but does not +// update the dag tree internally func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { - return fi.fi.Flush() + errs := make(chan error, 1) + go func() { + errs <- fi.fi.Sync() + }() + select { + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + } } func (fi *File) Forget() { - err := fi.fi.Flush() + err := fi.fi.Sync() if err != nil { log.Debug("Forget file error: ", err) } diff --git a/ipnsfs/file.go b/ipnsfs/file.go index d820d5b45..7d7edebd3 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -57,7 +57,7 @@ func (fi *File) Close() error { fi.Lock() defer fi.Unlock() if fi.hasChanges { - err := fi.mod.Flush() + err := fi.mod.Sync() if err != nil { return err } @@ -80,11 +80,11 @@ func (fi *File) Close() error { return nil } -// Flush flushes the changes in the file to disk -func (fi *File) Flush() error { +// Sync flushes the changes in the file to disk +func (fi *File) Sync() error { fi.Lock() defer fi.Unlock() - return fi.mod.Flush() + return fi.mod.Sync() } // Seek implements io.Seeker diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index e08c3bf86..cd207ed91 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -80,7 +80,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) { } } - err = dm.Flush() + err = dm.Sync() if err != nil { return 0, err } @@ -133,7 +133,7 @@ func (dm *DagModifier) Write(b []byte) (int, error) { } dm.curWrOff += uint64(n) if dm.wrBuf.Len() > writebufferSize { - err := dm.Flush() + err := dm.Sync() if err != nil { return n, err } @@ -156,8 +156,8 @@ func (dm *DagModifier) Size() (int64, error) { return int64(pbn.GetFilesize()), nil } -// Flush writes changes to this dag to disk -func (dm *DagModifier) Flush() error { +// Sync writes changes to this dag to disk +func (dm *DagModifier) Sync() error { // No buffer? Nothing to do if dm.wrBuf == nil { return nil @@ -315,7 +315,7 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No // Read data from this dag starting at the current offset func (dm *DagModifier) Read(b []byte) (int, error) { - err := dm.Flush() + err := dm.Sync() if err != nil { return 0, err } @@ -347,7 +347,7 @@ func (dm *DagModifier) Read(b []byte) (int, error) { // GetNode gets the modified DAG Node func (dm *DagModifier) GetNode() (*mdag.Node, error) { - err := dm.Flush() + err := dm.Sync() if err != nil { return nil, err } @@ -360,7 +360,7 @@ func (dm *DagModifier) HasChanges() bool { } func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) { - err := dm.Flush() + err := dm.Sync() if err != nil { return 0, err } @@ -389,7 +389,7 @@ func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) { } func (dm *DagModifier) Truncate(size int64) error { - err := dm.Flush() + err := dm.Sync() if err != nil { return err } diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index d5ae29d7d..9f8050972 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -246,7 +246,7 @@ func TestMultiWriteAndFlush(t *testing.T) { if n != 1 { t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)") } - err = dagmod.Flush() + err = dagmod.Sync() if err != nil { t.Fatal(err) } From d8bc95f43ea0452566070e848cba4ffaccea50c3 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 19 Mar 2015 22:50:28 -0700 Subject: [PATCH 3/5] invalidate merkledag cache when modifying children --- merkledag/node.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/merkledag/node.go b/merkledag/node.go index 2848cdd3a..848b228dc 100644 --- a/merkledag/node.go +++ b/merkledag/node.go @@ -87,6 +87,7 @@ func (l *Link) GetNode(serv DAGService) (*Node, error) { // AddNodeLink adds a link to another node. func (n *Node) AddNodeLink(name string, that *Node) error { + n.encoded = nil lnk, err := MakeLink(that) if err != nil { return err @@ -101,6 +102,7 @@ func (n *Node) AddNodeLink(name string, that *Node) error { // AddNodeLink adds a link to another node. without keeping a reference to // the child node func (n *Node) AddNodeLinkClean(name string, that *Node) error { + n.encoded = nil lnk, err := MakeLink(that) if err != nil { return err @@ -113,6 +115,7 @@ func (n *Node) AddNodeLinkClean(name string, that *Node) error { // Remove a link on this node by the given name func (n *Node) RemoveNodeLink(name string) error { + n.encoded = nil for i, l := range n.Links { if l.Name == name { n.Links = append(n.Links[:i], n.Links[i+1:]...) From 13c489eca1ac8565350b9566a1b194fb4ebda17e Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 23 Mar 2015 14:01:42 -0700 Subject: [PATCH 4/5] fix context respect through fuse reading --- core/commands/refs.go | 2 +- fuse/ipns/ipns_unix.go | 35 ++++++++++++++--------------------- ipnsfs/file.go | 7 +++++++ merkledag/merkledag.go | 6 ++++-- pin/pin.go | 2 +- unixfs/io/dagreader.go | 16 +++++++++++----- unixfs/mod/dagmodifier.go | 31 ++++++++++++++++++++++++++----- unixfs/tar/reader.go | 2 +- 8 files changed, 65 insertions(+), 36 deletions(-) diff --git a/core/commands/refs.go b/core/commands/refs.go index 6a5b8b6ca..dfede0e7a 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -254,7 +254,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) { return count, err } - nd, err := ng.Get() + nd, err := ng.Get(rw.Ctx) if err != nil { return count, err } diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index cf9add9ad..0ce96ec87 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -6,7 +6,6 @@ package ipns import ( "errors" - "io" "os" fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse" @@ -274,32 +273,26 @@ func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { } func (fi *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - errs := make(chan error, 1) - go func() { - _, err := fi.fi.Seek(req.Offset, os.SEEK_SET) - if err != nil { - errs <- err - return - } + _, err := fi.fi.Seek(req.Offset, os.SEEK_SET) + if err != nil { + return err + } - fisize, err := fi.fi.Size() - if err != nil { - errs <- err - return - } - - readsize := min(req.Size, int(fisize-req.Offset)) - n, err := io.ReadFull(fi.fi, resp.Data[:readsize]) - resp.Data = resp.Data[:n] - errs <- err - }() + fisize, err := fi.fi.Size() + if err != nil { + return err + } select { case <-ctx.Done(): return ctx.Err() - case err := <-errs: - return err + default: } + + readsize := min(req.Size, int(fisize-req.Offset)) + n, err := fi.fi.CtxReadFull(ctx, resp.Data[:readsize]) + resp.Data = resp.Data[:n] + return err } func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { diff --git a/ipnsfs/file.go b/ipnsfs/file.go index 7d7edebd3..73560d351 100644 --- a/ipnsfs/file.go +++ b/ipnsfs/file.go @@ -51,6 +51,13 @@ func (fi *File) Read(b []byte) (int, error) { return fi.mod.Read(b) } +// Read reads into the given buffer from the current offset +func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) { + fi.Lock() + defer fi.Unlock() + return fi.mod.CtxReadFull(ctx, b) +} + // Close flushes, then propogates the modified dag node up the directory structure // and signals a republish to occur func (fi *File) Close() error { diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 923a3d715..2084c200e 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -257,10 +257,10 @@ type nodePromise struct { // from its internal channels, subsequent calls will return the // cached node. type NodeGetter interface { - Get() (*Node, error) + Get(context.Context) (*Node, error) } -func (np *nodePromise) Get() (*Node, error) { +func (np *nodePromise) Get(ctx context.Context) (*Node, error) { if np.cache != nil { return np.cache, nil } @@ -270,6 +270,8 @@ func (np *nodePromise) Get() (*Node, error) { np.cache = blk case <-np.ctx.Done(): return nil, np.ctx.Err() + case <-ctx.Done(): + return nil, ctx.Err() } return np.cache, nil } diff --git a/pin/pin.go b/pin/pin.go index 5f726a457..6ec299388 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -177,7 +177,7 @@ func (p *pinner) pinLinks(node *mdag.Node) error { defer cancel() for _, ng := range p.dserv.GetDAG(ctx, node) { - subnode, err := ng.Get() + subnode, err := ng.Get(ctx) if err != nil { // TODO: Maybe just log and continue? return err diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 64dfff127..6bb9eb406 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -100,12 +100,13 @@ func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv md // precalcNextBuf follows the next link in line and loads it from the DAGService, // setting the next buffer to read from -func (dr *DagReader) precalcNextBuf() error { +func (dr *DagReader) precalcNextBuf(ctx context.Context) error { dr.buf.Close() // Just to make sure if dr.linkPosition >= len(dr.promises) { return io.EOF } - nxt, err := dr.promises[dr.linkPosition].Get() + + nxt, err := dr.promises[dr.linkPosition].Get(ctx) if err != nil { return err } @@ -141,6 +142,11 @@ func (dr *DagReader) Size() int64 { // Read reads data from the DAG structured file func (dr *DagReader) Read(b []byte) (int, error) { + return dr.CtxReadFull(dr.ctx, b) +} + +// CtxReadFull reads data from the DAG structured file +func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { // If no cached buffer, load one total := 0 for { @@ -161,7 +167,7 @@ func (dr *DagReader) Read(b []byte) (int, error) { } // Otherwise, load up the next block - err = dr.precalcNextBuf() + err = dr.precalcNextBuf(ctx) if err != nil { return total, err } @@ -183,7 +189,7 @@ func (dr *DagReader) WriteTo(w io.Writer) (int64, error) { } // Otherwise, load up the next block - err = dr.precalcNextBuf() + err = dr.precalcNextBuf(dr.ctx) if err != nil { if err == io.EOF { return total, nil @@ -239,7 +245,7 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { } // start sub-block request - err := dr.precalcNextBuf() + err := dr.precalcNextBuf(dr.ctx) if err != nil { return 0, err } diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index cd207ed91..133af2227 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -315,32 +315,53 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No // Read data from this dag starting at the current offset func (dm *DagModifier) Read(b []byte) (int, error) { - err := dm.Sync() + err := dm.readPrep() if err != nil { return 0, err } + n, err := dm.read.Read(b) + dm.curWrOff += uint64(n) + return n, err +} + +func (dm *DagModifier) readPrep() error { + err := dm.Sync() + if err != nil { + return err + } + if dm.read == nil { ctx, cancel := context.WithCancel(dm.ctx) dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv) if err != nil { - return 0, err + return err } i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET) if err != nil { - return 0, err + return err } if i != int64(dm.curWrOff) { - return 0, ErrSeekFail + return ErrSeekFail } dm.readCancel = cancel dm.read = dr } - n, err := dm.read.Read(b) + return nil +} + +// Read data from this dag starting at the current offset +func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) { + err := dm.readPrep() + if err != nil { + return 0, err + } + + n, err := dm.read.CtxReadFull(ctx, b) dm.curWrOff += uint64(n) return n, err } diff --git a/unixfs/tar/reader.go b/unixfs/tar/reader.go index aa15c823a..26aa772ce 100644 --- a/unixfs/tar/reader.go +++ b/unixfs/tar/reader.go @@ -90,7 +90,7 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) { defer cancel() for i, ng := range r.dag.GetDAG(ctx, dagnode) { - childNode, err := ng.Get() + childNode, err := ng.Get(ctx) if err != nil { r.emitError(err) return From 5440bb0e6d7a2dcc13662eb53cbe15f65875bc5f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 24 Mar 2015 11:41:41 -0700 Subject: [PATCH 5/5] fix WriteAt race condition --- fuse/ipns/ipns_unix.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index 0ce96ec87..d96f2b187 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -296,22 +296,13 @@ func (fi *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Read } func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { - errs := make(chan error, 1) - go func() { - wrote, err := fi.fi.WriteAt(req.Data, req.Offset) - if err != nil { - errs <- err - } - resp.Size = wrote - errs <- nil - }() - - select { - case err := <-errs: + // TODO: at some point, ensure that WriteAt here respects the context + wrote, err := fi.fi.WriteAt(req.Data, req.Offset) + if err != nil { return err - case <-ctx.Done(): - return ctx.Err() } + resp.Size = wrote + return nil } func (fi *File) Flush(ctx context.Context, req *fuse.FlushRequest) error {