From 63e15abd8f22cfed3266bb08bfdf4bde5453c667 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 7 Mar 2015 00:58:04 -0800 Subject: [PATCH] refactor dagmodifier to work with trickledag format --- fuse/ipns/ipns_unix.go | 13 +- importer/trickle/trickledag.go | 14 + unixfs/io/dagmodifier.go | 202 -------------- unixfs/io/dagmodifier_test.go | 247 ------------------ unixfs/mod/dagmodifier.go | 450 ++++++++++++++++++++++++++++++++ unixfs/mod/dagmodifier_test.go | 464 +++++++++++++++++++++++++++++++++ 6 files changed, 937 insertions(+), 453 deletions(-) delete mode 100644 unixfs/io/dagmodifier.go delete mode 100644 unixfs/io/dagmodifier_test.go create mode 100644 unixfs/mod/dagmodifier.go create mode 100644 unixfs/mod/dagmodifier_test.go diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index 645b9ce27..5a4ca4960 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -24,6 +24,7 @@ import ( path "github.com/jbenet/go-ipfs/path" ft "github.com/jbenet/go-ipfs/unixfs" uio "github.com/jbenet/go-ipfs/unixfs/io" + mod "github.com/jbenet/go-ipfs/unixfs/mod" ftpb "github.com/jbenet/go-ipfs/unixfs/pb" u "github.com/jbenet/go-ipfs/util" lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" @@ -211,7 +212,7 @@ type Node struct { Ipfs *core.IpfsNode Nd *mdag.Node - dagMod *uio.DagModifier + dagMod *mod.DagModifier cached *ftpb.Data } @@ -238,7 +239,11 @@ func (s *Node) Attr() fuse.Attr { size = 0 } if size == 0 { - size = s.dagMod.Size() + dmsize, err := s.dagMod.Size() + if err != nil { + log.Error(err) + } + size = uint64(dmsize) } mode := os.FileMode(0666) @@ -344,13 +349,13 @@ func (n *Node) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wri if n.dagMod == nil { // Create a DagModifier to allow us to change the existing dag node - dmod, err := uio.NewDagModifier(n.Nd, n.Ipfs.DAG, chunk.DefaultSplitter) + dmod, err := mod.NewDagModifier(ctx, n.Nd, n.Ipfs.DAG, n.Ipfs.Pinning.GetManual(), chunk.DefaultSplitter) if err != nil { return err } n.dagMod = dmod } - wrote, err := n.dagMod.WriteAt(req.Data, uint64(req.Offset)) + wrote, err := n.dagMod.WriteAt(req.Data, int64(req.Offset)) if err != nil { return err } diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go index c090a8ab0..76a7110b6 100644 --- a/importer/trickle/trickledag.go +++ b/importer/trickle/trickledag.go @@ -244,6 +244,20 @@ func verifyTDagRec(nd *dag.Node, depth, direct, layerRepeat int, ds dag.DAGServi return nil } + // Verify this is a branch node + pbn, err := ft.FromBytes(nd.Data) + if err != nil { + return err + } + + if pbn.GetType() != ft.TFile { + return errors.New("expected file as branch node") + } + + if len(pbn.Data) > 0 { + return errors.New("branch node should not have data") + } + for i := 0; i < len(nd.Links); i++ { child, err := nd.Links[i].GetNode(ds) if err != nil { diff --git a/unixfs/io/dagmodifier.go b/unixfs/io/dagmodifier.go deleted file mode 100644 index e155d8b38..000000000 --- a/unixfs/io/dagmodifier.go +++ /dev/null @@ -1,202 +0,0 @@ -package io - -import ( - "bytes" - "errors" - - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - - chunk "github.com/jbenet/go-ipfs/importer/chunk" - mdag "github.com/jbenet/go-ipfs/merkledag" - ft "github.com/jbenet/go-ipfs/unixfs" - ftpb "github.com/jbenet/go-ipfs/unixfs/pb" - u "github.com/jbenet/go-ipfs/util" -) - -var log = u.Logger("dagio") - -// DagModifier is the only struct licensed and able to correctly -// perform surgery on a DAG 'file' -// Dear god, please rename this to something more pleasant -type DagModifier struct { - dagserv mdag.DAGService - curNode *mdag.Node - - pbdata *ftpb.Data - splitter chunk.BlockSplitter -} - -func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) { - pbd, err := ft.FromBytes(from.Data) - if err != nil { - return nil, err - } - - return &DagModifier{ - curNode: from.Copy(), - dagserv: serv, - pbdata: pbd, - splitter: spl, - }, nil -} - -// WriteAt will modify a dag file in place -// NOTE: it currently assumes only a single level of indirection -func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { - - // Check bounds - if dm.pbdata.GetFilesize() < offset { - return 0, errors.New("Attempted to perform write starting past end of file") - } - - // First need to find where we are writing at - end := uint64(len(b)) + offset - - // This shouldnt be necessary if we do subblocks sizes properly - newsize := dm.pbdata.GetFilesize() - if end > dm.pbdata.GetFilesize() { - newsize = end - } - zeroblocklen := uint64(len(dm.pbdata.Data)) - origlen := len(b) - - if end <= zeroblocklen { - log.Debug("Writing into zero block") - // Replacing zeroeth data block (embedded in the root node) - //TODO: check chunking here - copy(dm.pbdata.Data[offset:], b) - return len(b), nil - } - - // Find where write should start - var traversed uint64 - startsubblk := len(dm.pbdata.Blocksizes) - if offset < zeroblocklen { - dm.pbdata.Data = dm.pbdata.Data[:offset] - startsubblk = 0 - } else { - traversed = uint64(zeroblocklen) - for i, size := range dm.pbdata.Blocksizes { - if uint64(offset) < traversed+size { - log.Debugf("Starting mod at block %d. [%d < %d + %d]", i, offset, traversed, size) - // Here is where we start - startsubblk = i - lnk := dm.curNode.Links[i] - node, err := dm.dagserv.Get(u.Key(lnk.Hash)) - if err != nil { - return 0, err - } - data, err := ft.UnwrapData(node.Data) - if err != nil { - return 0, err - } - - // We have to rewrite the data before our write in this block. - b = append(data[:offset-traversed], b...) - break - } - traversed += size - } - if startsubblk == len(dm.pbdata.Blocksizes) { - // TODO: Im not sure if theres any case that isnt being handled here. - // leaving this note here as a future reference in case something breaks - } - } - - // Find blocks that need to be overwritten - var changed []int - mid := -1 - var midoff uint64 - for i, size := range dm.pbdata.Blocksizes[startsubblk:] { - if end > traversed { - changed = append(changed, i+startsubblk) - } else { - break - } - traversed += size - if end < traversed { - mid = i + startsubblk - midoff = end - (traversed - size) - break - } - } - - // If our write starts in the middle of a block... - var midlnk *mdag.Link - if mid >= 0 { - midlnk = dm.curNode.Links[mid] - midnode, err := dm.dagserv.Get(u.Key(midlnk.Hash)) - if err != nil { - return 0, err - } - - // NOTE: this may have to be changed later when we have multiple - // layers of indirection - data, err := ft.UnwrapData(midnode.Data) - if err != nil { - return 0, err - } - b = append(b, data[midoff:]...) - } - - // Generate new sub-blocks, and sizes - subblocks := splitBytes(b, dm.splitter) - var links []*mdag.Link - var sizes []uint64 - for _, sb := range subblocks { - n := &mdag.Node{Data: ft.WrapData(sb)} - _, err := dm.dagserv.Add(n) - if err != nil { - log.Warningf("Failed adding node to DAG service: %s", err) - return 0, err - } - lnk, err := mdag.MakeLink(n) - if err != nil { - return 0, err - } - links = append(links, lnk) - sizes = append(sizes, uint64(len(sb))) - } - - // This is disgusting (and can be rewritten if performance demands) - if len(changed) > 0 { - sechalflink := append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...) - dm.curNode.Links = append(dm.curNode.Links[:changed[0]], sechalflink...) - sechalfblks := append(sizes, dm.pbdata.Blocksizes[changed[len(changed)-1]+1:]...) - dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes[:changed[0]], sechalfblks...) - } else { - dm.curNode.Links = append(dm.curNode.Links, links...) - dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes, sizes...) - } - dm.pbdata.Filesize = proto.Uint64(newsize) - - return origlen, nil -} - -func (dm *DagModifier) Size() uint64 { - if dm == nil { - return 0 - } - return dm.pbdata.GetFilesize() -} - -// splitBytes uses a splitterFunc to turn a large array of bytes -// into many smaller arrays of bytes -func splitBytes(b []byte, spl chunk.BlockSplitter) [][]byte { - out := spl.Split(bytes.NewReader(b)) - var arr [][]byte - for blk := range out { - arr = append(arr, blk) - } - return arr -} - -// GetNode gets the modified DAG Node -func (dm *DagModifier) GetNode() (*mdag.Node, error) { - b, err := proto.Marshal(dm.pbdata) - if err != nil { - return nil, err - } - dm.curNode.Data = b - return dm.curNode.Copy(), nil -} diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go deleted file mode 100644 index ca9c42004..000000000 --- a/unixfs/io/dagmodifier_test.go +++ /dev/null @@ -1,247 +0,0 @@ -package io - -import ( - "fmt" - "io" - "io/ioutil" - "testing" - - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - "github.com/jbenet/go-ipfs/blocks/blockstore" - bs "github.com/jbenet/go-ipfs/blockservice" - "github.com/jbenet/go-ipfs/exchange/offline" - imp "github.com/jbenet/go-ipfs/importer" - "github.com/jbenet/go-ipfs/importer/chunk" - mdag "github.com/jbenet/go-ipfs/merkledag" - ft "github.com/jbenet/go-ipfs/unixfs" - u "github.com/jbenet/go-ipfs/util" - - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" -) - -func getMockDagServ(t *testing.T) mdag.DAGService { - dstore := ds.NewMapDatastore() - tsds := sync.MutexWrap(dstore) - bstore := blockstore.NewBlockstore(tsds) - bserv, err := bs.New(bstore, offline.Exchange(bstore)) - if err != nil { - t.Fatal(err) - } - return mdag.NewDAGService(bserv) -} - -func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) { - in := io.LimitReader(u.NewTimeSeededRand(), size) - node, err := imp.BuildDagFromReader(in, dserv, nil, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } - - dr, err := NewDagReader(context.Background(), node, dserv) - if err != nil { - t.Fatal(err) - } - - b, err := ioutil.ReadAll(dr) - if err != nil { - t.Fatal(err) - } - - return b, node -} - -func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte { - newdata := make([]byte, size) - r := u.NewTimeSeededRand() - r.Read(newdata) - - if size+beg > uint64(len(orig)) { - orig = append(orig, make([]byte, (size+beg)-uint64(len(orig)))...) - } - copy(orig[beg:], newdata) - - nmod, err := dm.WriteAt(newdata, uint64(beg)) - if err != nil { - t.Fatal(err) - } - - if nmod != int(size) { - t.Fatalf("Mod length not correct! %d != %d", nmod, size) - } - - nd, err := dm.GetNode() - if err != nil { - t.Fatal(err) - } - - rd, err := NewDagReader(context.Background(), nd, dm.dagserv) - if err != nil { - t.Fatal(err) - } - - after, err := ioutil.ReadAll(rd) - if err != nil { - t.Fatal(err) - } - - err = arrComp(after, orig) - if err != nil { - t.Fatal(err) - } - return orig -} - -func TestDagModifierBasic(t *testing.T) { - t.Skip("DAGModifier needs to be fixed to work with indirect blocks.") - if err := u.SetLogLevel("blockservice", "critical"); err != nil { - t.Fatalf("testlog prepare failed: %s", err) - } - if err := u.SetLogLevel("merkledag", "critical"); err != nil { - t.Fatalf("testlog prepare failed: %s", err) - } - dserv := getMockDagServ(t) - b, n := getNode(t, dserv, 50000) - - dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512}) - if err != nil { - t.Fatal(err) - } - - // Within zero block - beg := uint64(15) - length := uint64(60) - - t.Log("Testing mod within zero block") - b = testModWrite(t, beg, length, b, dagmod) - - // Within bounds of existing file - beg = 1000 - length = 4000 - t.Log("Testing mod within bounds of existing file.") - b = testModWrite(t, beg, length, b, dagmod) - - // Extend bounds - beg = 49500 - length = 4000 - - t.Log("Testing mod that extends file.") - b = testModWrite(t, beg, length, b, dagmod) - - // "Append" - beg = uint64(len(b)) - length = 3000 - b = testModWrite(t, beg, length, b, dagmod) - - // Verify reported length - node, err := dagmod.GetNode() - if err != nil { - t.Fatal(err) - } - - size, err := ft.DataSize(node.Data) - if err != nil { - t.Fatal(err) - } - - expected := uint64(50000 + 3500 + 3000) - if size != expected { - t.Fatalf("Final reported size is incorrect [%d != %d]", size, expected) - } -} - -func TestMultiWrite(t *testing.T) { - t.Skip("DAGModifier needs to be fixed to work with indirect blocks.") - dserv := getMockDagServ(t) - _, n := getNode(t, dserv, 0) - - dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512}) - if err != nil { - t.Fatal(err) - } - - data := make([]byte, 4000) - u.NewTimeSeededRand().Read(data) - - for i := 0; i < len(data); i++ { - n, err := dagmod.WriteAt(data[i:i+1], uint64(i)) - if err != nil { - t.Fatal(err) - } - if n != 1 { - t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)") - } - } - nd, err := dagmod.GetNode() - if err != nil { - t.Fatal(err) - } - - read, err := NewDagReader(context.Background(), nd, dserv) - if err != nil { - t.Fatal(err) - } - rbuf, err := ioutil.ReadAll(read) - if err != nil { - t.Fatal(err) - } - - err = arrComp(rbuf, data) - if err != nil { - t.Fatal(err) - } -} - -func TestMultiWriteCoal(t *testing.T) { - t.Skip("Skipping test until DagModifier is fixed") - dserv := getMockDagServ(t) - _, n := getNode(t, dserv, 0) - - dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512}) - if err != nil { - t.Fatal(err) - } - - data := make([]byte, 4000) - u.NewTimeSeededRand().Read(data) - - for i := 0; i < len(data); i++ { - n, err := dagmod.WriteAt(data[:i+1], 0) - if err != nil { - t.Fatal(err) - } - if n != i+1 { - t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)") - } - } - nd, err := dagmod.GetNode() - if err != nil { - t.Fatal(err) - } - - read, err := NewDagReader(context.Background(), nd, dserv) - if err != nil { - t.Fatal(err) - } - rbuf, err := ioutil.ReadAll(read) - if err != nil { - t.Fatal(err) - } - - err = arrComp(rbuf, data) - if err != nil { - t.Fatal(err) - } -} - -func arrComp(a, b []byte) error { - if len(a) != len(b) { - return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b)) - } - for i, v := range a { - if v != b[i] { - return fmt.Errorf("Arrays differ at index: %d", i) - } - } - return nil -} diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go new file mode 100644 index 000000000..4e5b31088 --- /dev/null +++ b/unixfs/mod/dagmodifier.go @@ -0,0 +1,450 @@ +package mod + +import ( + "bytes" + "errors" + "io" + "os" + + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + + chunk "github.com/jbenet/go-ipfs/importer/chunk" + help "github.com/jbenet/go-ipfs/importer/helpers" + trickle "github.com/jbenet/go-ipfs/importer/trickle" + mdag "github.com/jbenet/go-ipfs/merkledag" + pin "github.com/jbenet/go-ipfs/pin" + ft "github.com/jbenet/go-ipfs/unixfs" + uio "github.com/jbenet/go-ipfs/unixfs/io" + ftpb "github.com/jbenet/go-ipfs/unixfs/pb" + u "github.com/jbenet/go-ipfs/util" +) + +// 2MB +var writebufferSize = 1 << 21 + +var log = u.Logger("dagio") + +// DagModifier is the only struct licensed and able to correctly +// perform surgery on a DAG 'file' +// Dear god, please rename this to something more pleasant +type DagModifier struct { + dagserv mdag.DAGService + curNode *mdag.Node + mp pin.ManualPinner + + splitter chunk.BlockSplitter + ctx context.Context + readCancel func() + + writeStart uint64 + curWrOff uint64 + wrBuf *bytes.Buffer + + read *uio.DagReader +} + +func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) { + return &DagModifier{ + curNode: from.Copy(), + dagserv: serv, + splitter: spl, + ctx: ctx, + mp: mp, + }, nil +} + +// WriteAt will modify a dag file in place +// NOTE: it currently assumes only a single level of indirection +func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) { + // TODO: this is currently VERY inneficient + if uint64(offset) != dm.curWrOff { + size, err := dm.Size() + if err != nil { + return 0, err + } + if offset > size { + err := dm.expandSparse(offset - size) + if err != nil { + return 0, err + } + } + + err = dm.Flush() + if err != nil { + return 0, err + } + dm.writeStart = uint64(offset) + } + + return dm.Write(b) +} + +// A reader that just returns zeros +type zeroReader struct{} + +func (zr zeroReader) Read(b []byte) (int, error) { + for i, _ := range b { + b[i] = 0 + } + return len(b), nil +} + +func (dm *DagModifier) expandSparse(size int64) error { + spl := chunk.SizeSplitter{4096} + r := io.LimitReader(zeroReader{}, size) + blks := spl.Split(r) + nnode, err := dm.appendData(dm.curNode, blks) + if err != nil { + return err + } + _, err = dm.dagserv.Add(nnode) + if err != nil { + return err + } + dm.curNode = nnode + return nil +} + +func (dm *DagModifier) Write(b []byte) (int, error) { + if dm.read != nil { + dm.read = nil + } + if dm.wrBuf == nil { + dm.wrBuf = new(bytes.Buffer) + } + n, err := dm.wrBuf.Write(b) + if err != nil { + return n, err + } + dm.curWrOff += uint64(n) + if dm.wrBuf.Len() > writebufferSize { + err := dm.Flush() + if err != nil { + return n, err + } + } + return n, nil +} + +func (dm *DagModifier) Size() (int64, error) { + // TODO: compute size without flushing, should be easy + err := dm.Flush() + if err != nil { + return 0, err + } + + pbn, err := ft.FromBytes(dm.curNode.Data) + if err != nil { + return 0, err + } + + return int64(pbn.GetFilesize()), nil +} + +func (dm *DagModifier) Flush() error { + if dm.wrBuf == nil { + return nil + } + + // If we have an active reader, kill it + if dm.read != nil { + dm.read = nil + dm.readCancel() + } + + buflen := dm.wrBuf.Len() + + k, _, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf) + if err != nil { + return err + } + + nd, err := dm.dagserv.Get(k) + if err != nil { + return err + } + + dm.curNode = nd + + if !done { + blks := dm.splitter.Split(dm.wrBuf) + nd, err = dm.appendData(dm.curNode, blks) + if err != nil { + return err + } + + _, err := dm.dagserv.Add(nd) + if err != nil { + return err + } + + dm.curNode = nd + } + + dm.writeStart += uint64(buflen) + + dm.wrBuf = nil + return nil +} + +func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (u.Key, int, bool, error) { + f, err := ft.FromBytes(node.Data) + if err != nil { + return "", 0, false, err + } + + if len(node.Links) == 0 && (f.GetType() == ftpb.Data_Raw || f.GetType() == ftpb.Data_File) { + n, err := data.Read(f.Data[offset:]) + if err != nil && err != io.EOF { + return "", 0, false, err + } + + // Update newly written node.. + b, err := proto.Marshal(f) + if err != nil { + return "", 0, false, err + } + + nd := &mdag.Node{Data: b} + k, err := dm.dagserv.Add(nd) + if err != nil { + return "", 0, false, err + } + + // Hey look! we're done! + var done bool + if n < len(f.Data) { + done = true + } + + return k, n, done, nil + } + + var cur uint64 + var done bool + var totread int + for i, bs := range f.GetBlocksizes() { + if cur+bs > offset { + child, err := node.Links[i].GetNode(dm.dagserv) + if err != nil { + return "", 0, false, err + } + k, nread, sdone, err := dm.modifyDag(child, offset-cur, data) + if err != nil { + return "", 0, false, err + } + totread += nread + + offset += bs + node.Links[i].Hash = mh.Multihash(k) + + if sdone { + done = true + break + } + } + cur += bs + } + + k, err := dm.dagserv.Add(node) + return k, totread, done, err +} + +func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.Node, error) { + dbp := &help.DagBuilderParams{ + Dagserv: dm.dagserv, + Maxlinks: help.DefaultLinksPerBlock, + Pinner: dm.mp, + } + + return trickle.TrickleAppend(node, dbp.New(blks)) +} + +func (dm *DagModifier) Read(b []byte) (int, error) { + err := dm.Flush() + if err != nil { + return 0, err + } + + if dm.read == nil { + dr, err := uio.NewDagReader(dm.ctx, dm.curNode, dm.dagserv) + if err != nil { + return 0, err + } + + i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET) + if err != nil { + return 0, err + } + + if i != int64(dm.curWrOff) { + return 0, errors.New("failed to seek properly") + } + + dm.read = dr + } + + n, err := dm.read.Read(b) + dm.curWrOff += uint64(n) + return n, err +} + +// splitBytes uses a splitterFunc to turn a large array of bytes +// into many smaller arrays of bytes +func (dm *DagModifier) splitBytes(in io.Reader) ([]u.Key, error) { + var out []u.Key + blks := dm.splitter.Split(in) + for blk := range blks { + nd := help.NewUnixfsNode() + nd.SetData(blk) + dagnd, err := nd.GetDagNode() + if err != nil { + return nil, err + } + + k, err := dm.dagserv.Add(dagnd) + if err != nil { + return nil, err + } + out = append(out, k) + } + return out, nil +} + +// GetNode gets the modified DAG Node +func (dm *DagModifier) GetNode() (*mdag.Node, error) { + err := dm.Flush() + if err != nil { + return nil, err + } + return dm.curNode.Copy(), nil +} + +func (dm *DagModifier) HasChanges() bool { + return dm.wrBuf != nil +} + +func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) { + err := dm.Flush() + if err != nil { + return 0, err + } + + switch whence { + case os.SEEK_CUR: + dm.curWrOff += uint64(offset) + dm.writeStart = dm.curWrOff + case os.SEEK_SET: + dm.curWrOff = uint64(offset) + dm.writeStart = uint64(offset) + case os.SEEK_END: + return 0, errors.New("SEEK_END currently not implemented") + default: + return 0, errors.New("unrecognized whence") + } + + if dm.read != nil { + _, err = dm.read.Seek(offset, whence) + if err != nil { + return 0, err + } + } + + return int64(dm.curWrOff), nil +} + +func (dm *DagModifier) Truncate(size int64) error { + err := dm.Flush() + if err != nil { + return err + } + + realSize, err := dm.Size() + if err != nil { + return err + } + + if size > int64(realSize) { + return errors.New("Cannot extend file through truncate") + } + + nnode, err := dagTruncate(dm.curNode, uint64(size), dm.dagserv) + if err != nil { + return err + } + + _, err = dm.dagserv.Add(nnode) + if err != nil { + return err + } + + dm.curNode = nnode + return nil +} + +func dagTruncate(nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) { + if len(nd.Links) == 0 { + // TODO: this can likely be done without marshaling and remarshaling + pbn, err := ft.FromBytes(nd.Data) + if err != nil { + return nil, err + } + + nd.Data = ft.WrapData(pbn.Data[:size]) + return nd, nil + } + + var cur uint64 + end := 0 + var modified *mdag.Node + ndata := new(ft.FSNode) + for i, lnk := range nd.Links { + child, err := lnk.GetNode(ds) + if err != nil { + return nil, err + } + + childsize, err := ft.DataSize(child.Data) + if err != nil { + return nil, err + } + + if size < cur+childsize { + nchild, err := dagTruncate(child, size-cur, ds) + if err != nil { + return nil, err + } + + // TODO: sanity check size of truncated block + ndata.AddBlockSize(size - cur) + + modified = nchild + end = i + break + } + cur += childsize + ndata.AddBlockSize(childsize) + } + + _, err := ds.Add(modified) + if err != nil { + return nil, err + } + + nd.Links = nd.Links[:end] + err = nd.AddNodeLinkClean("", modified) + if err != nil { + return nil, err + } + + d, err := ndata.GetBytes() + if err != nil { + return nil, err + } + + nd.Data = d + + return nd, nil +} diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go new file mode 100644 index 000000000..ac4e6f507 --- /dev/null +++ b/unixfs/mod/dagmodifier_test.go @@ -0,0 +1,464 @@ +package mod + +import ( + "fmt" + "io" + "io/ioutil" + "testing" + + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + "github.com/jbenet/go-ipfs/blocks/blockstore" + bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/exchange/offline" + imp "github.com/jbenet/go-ipfs/importer" + "github.com/jbenet/go-ipfs/importer/chunk" + h "github.com/jbenet/go-ipfs/importer/helpers" + trickle "github.com/jbenet/go-ipfs/importer/trickle" + mdag "github.com/jbenet/go-ipfs/merkledag" + ft "github.com/jbenet/go-ipfs/unixfs" + uio "github.com/jbenet/go-ipfs/unixfs/io" + u "github.com/jbenet/go-ipfs/util" + + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +func getMockDagServ(t *testing.T) mdag.DAGService { + dstore := ds.NewMapDatastore() + tsds := sync.MutexWrap(dstore) + bstore := blockstore.NewBlockstore(tsds) + bserv, err := bs.New(bstore, offline.Exchange(bstore)) + if err != nil { + t.Fatal(err) + } + return mdag.NewDAGService(bserv) +} + +func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) { + in := io.LimitReader(u.NewTimeSeededRand(), size) + node, err := imp.BuildTrickleDagFromReader(in, dserv, nil, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + dr, err := uio.NewDagReader(context.Background(), node, dserv) + if err != nil { + t.Fatal(err) + } + + b, err := ioutil.ReadAll(dr) + if err != nil { + t.Fatal(err) + } + + return b, node +} + +func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte { + newdata := make([]byte, size) + r := u.NewTimeSeededRand() + r.Read(newdata) + + if size+beg > uint64(len(orig)) { + orig = append(orig, make([]byte, (size+beg)-uint64(len(orig)))...) + } + copy(orig[beg:], newdata) + + nmod, err := dm.WriteAt(newdata, int64(beg)) + if err != nil { + t.Fatal(err) + } + + if nmod != int(size) { + t.Fatalf("Mod length not correct! %d != %d", nmod, size) + } + + nd, err := dm.GetNode() + if err != nil { + t.Fatal(err) + } + + err = trickle.VerifyTrickleDagStructure(nd, dm.dagserv, h.DefaultLinksPerBlock, 4) + if err != nil { + t.Fatal(err) + } + + rd, err := uio.NewDagReader(context.Background(), nd, dm.dagserv) + if err != nil { + t.Fatal(err) + } + + after, err := ioutil.ReadAll(rd) + if err != nil { + t.Fatal(err) + } + + err = arrComp(after, orig) + if err != nil { + t.Fatal(err) + } + return orig +} + +func TestDagModifierBasic(t *testing.T) { + dserv := getMockDagServ(t) + b, n := getNode(t, dserv, 50000) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + // Within zero block + beg := uint64(15) + length := uint64(60) + + t.Log("Testing mod within zero block") + b = testModWrite(t, beg, length, b, dagmod) + + // Within bounds of existing file + beg = 1000 + length = 4000 + t.Log("Testing mod within bounds of existing multiblock file.") + b = testModWrite(t, beg, length, b, dagmod) + + // Extend bounds + beg = 49500 + length = 4000 + + t.Log("Testing mod that extends file.") + b = testModWrite(t, beg, length, b, dagmod) + + // "Append" + beg = uint64(len(b)) + length = 3000 + t.Log("Testing pure append") + b = testModWrite(t, beg, length, b, dagmod) + + // Verify reported length + node, err := dagmod.GetNode() + if err != nil { + t.Fatal(err) + } + + size, err := ft.DataSize(node.Data) + if err != nil { + t.Fatal(err) + } + + expected := uint64(50000 + 3500 + 3000) + if size != expected { + t.Fatalf("Final reported size is incorrect [%d != %d]", size, expected) + } +} + +func TestMultiWrite(t *testing.T) { + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + data := make([]byte, 4000) + u.NewTimeSeededRand().Read(data) + + for i := 0; i < len(data); i++ { + n, err := dagmod.WriteAt(data[i:i+1], int64(i)) + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)") + } + } + nd, err := dagmod.GetNode() + if err != nil { + t.Fatal(err) + } + + read, err := uio.NewDagReader(context.Background(), nd, dserv) + if err != nil { + t.Fatal(err) + } + rbuf, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + err = arrComp(rbuf, data) + if err != nil { + t.Fatal(err) + } +} + +func TestMultiWriteAndFlush(t *testing.T) { + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + data := make([]byte, 20) + u.NewTimeSeededRand().Read(data) + + for i := 0; i < len(data); i++ { + n, err := dagmod.WriteAt(data[i:i+1], int64(i)) + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)") + } + err = dagmod.Flush() + if err != nil { + t.Fatal(err) + } + } + nd, err := dagmod.GetNode() + if err != nil { + t.Fatal(err) + } + + read, err := uio.NewDagReader(context.Background(), nd, dserv) + if err != nil { + t.Fatal(err) + } + rbuf, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + err = arrComp(rbuf, data) + if err != nil { + t.Fatal(err) + } +} + +func TestWriteNewFile(t *testing.T) { + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + towrite := make([]byte, 2000) + u.NewTimeSeededRand().Read(towrite) + + nw, err := dagmod.Write(towrite) + if err != nil { + t.Fatal(err) + } + if nw != len(towrite) { + t.Fatal("Wrote wrong amount") + } + + nd, err := dagmod.GetNode() + if err != nil { + t.Fatal(err) + } + + read, err := uio.NewDagReader(ctx, nd, dserv) + if err != nil { + t.Fatal(err) + } + + data, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + if err := arrComp(data, towrite); err != nil { + t.Fatal(err) + } +} + +func TestMultiWriteCoal(t *testing.T) { + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + data := make([]byte, 1000) + u.NewTimeSeededRand().Read(data) + + for i := 0; i < len(data); i++ { + n, err := dagmod.WriteAt(data[:i+1], 0) + if err != nil { + fmt.Println("FAIL AT ", i) + t.Fatal(err) + } + if n != i+1 { + t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)") + } + + // TEMP + nn, err := dagmod.GetNode() + if err != nil { + t.Fatal(err) + } + + r, err := uio.NewDagReader(ctx, nn, dserv) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + if err := arrComp(out, data[:i+1]); err != nil { + fmt.Println("A ", len(out)) + fmt.Println(out) + fmt.Println(data[:i+1]) + t.Fatal(err) + } + // + } + nd, err := dagmod.GetNode() + if err != nil { + t.Fatal(err) + } + + read, err := uio.NewDagReader(context.Background(), nd, dserv) + if err != nil { + t.Fatal(err) + } + rbuf, err := ioutil.ReadAll(read) + if err != nil { + t.Fatal(err) + } + + err = arrComp(rbuf, data) + if err != nil { + t.Fatal(err) + } +} + +func TestLargeWriteChunks(t *testing.T) { + dserv := getMockDagServ(t) + _, n := getNode(t, dserv, 0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + wrsize := 1000 + datasize := 10000000 + data := make([]byte, datasize) + + u.NewTimeSeededRand().Read(data) + + for i := 0; i < datasize/wrsize; i++ { + n, err := dagmod.WriteAt(data[i*wrsize:(i+1)*wrsize], int64(i*wrsize)) + if err != nil { + t.Fatal(err) + } + if n != wrsize { + t.Fatal("failed to write buffer") + } + } + + out, err := ioutil.ReadAll(dagmod) + if err != nil { + t.Fatal(err) + } + + if err = arrComp(out, data); err != nil { + t.Fatal(err) + } + +} + +func TestDagTruncate(t *testing.T) { + dserv := getMockDagServ(t) + b, n := getNode(t, dserv, 50000) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512}) + if err != nil { + t.Fatal(err) + } + + err = dagmod.Truncate(12345) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(dagmod) + if err != nil { + t.Fatal(err) + } + + if err = arrComp(out, b[:12345]); err != nil { + t.Fatal(err) + } +} + +func arrComp(a, b []byte) error { + if len(a) != len(b) { + return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b)) + } + for i, v := range a { + if v != b[i] { + return fmt.Errorf("Arrays differ at index: %d", i) + } + } + return nil +} + +func printDag(nd *mdag.Node, ds mdag.DAGService, indent int) { + pbd, err := ft.FromBytes(nd.Data) + if err != nil { + panic(err) + } + + for i := 0; i < indent; i++ { + fmt.Print(" ") + } + fmt.Printf("{size = %d, type = %s, children = %d", pbd.GetFilesize(), pbd.GetType().String(), len(pbd.GetBlocksizes())) + if len(nd.Links) > 0 { + fmt.Println() + } + for _, lnk := range nd.Links { + child, err := lnk.GetNode(ds) + if err != nil { + panic(err) + } + printDag(child, ds, indent+1) + } + if len(nd.Links) > 0 { + for i := 0; i < indent; i++ { + fmt.Print(" ") + } + } + fmt.Println("}") +}