refactor dagmodifier to work with trickledag format

Jeromy 2015-03-07 00:58:04 -08:00
parent 565505c4ba
commit 63e15abd8f
6 changed files with 937 additions and 453 deletions
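The refactor moves DagModifier out of unixfs/io into a new unixfs/mod package and reworks it around the trickledag format. The caller-facing signature changes are visible in the fuse diff below; as a minimal caller-side sketch (the setup names ctx, nd, dserv, pinner, data, and offset are illustrative — only the mod.NewDagModifier, WriteAt, and Size shapes come from this commit):

// Construction now takes a context and a manual pinner.
dmod, err := mod.NewDagModifier(ctx, nd, dserv, pinner, chunk.DefaultSplitter)
if err != nil {
    return err
}
// WriteAt now takes an int64 offset (previously uint64).
if _, err := dmod.WriteAt(data, int64(offset)); err != nil {
    return err
}
// Size now returns (int64, error) instead of a bare uint64.
size, err := dmod.Size()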

View File

@@ -24,6 +24,7 @@ import (
path "github.com/jbenet/go-ipfs/path"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
+ mod "github.com/jbenet/go-ipfs/unixfs/mod"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
@@ -211,7 +212,7 @@ type Node struct {
Ipfs *core.IpfsNode
Nd *mdag.Node
- dagMod *uio.DagModifier
+ dagMod *mod.DagModifier
cached *ftpb.Data
}
@@ -238,7 +239,11 @@ func (s *Node) Attr() fuse.Attr {
size = 0
}
if size == 0 {
- size = s.dagMod.Size()
+ dmsize, err := s.dagMod.Size()
+ if err != nil {
+ log.Error(err)
+ }
+ size = uint64(dmsize)
}
mode := os.FileMode(0666)
@@ -344,13 +349,13 @@ func (n *Node) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wri
if n.dagMod == nil {
// Create a DagModifier to allow us to change the existing dag node
- dmod, err := uio.NewDagModifier(n.Nd, n.Ipfs.DAG, chunk.DefaultSplitter)
+ dmod, err := mod.NewDagModifier(ctx, n.Nd, n.Ipfs.DAG, n.Ipfs.Pinning.GetManual(), chunk.DefaultSplitter)
if err != nil {
return err
}
n.dagMod = dmod
}
- wrote, err := n.dagMod.WriteAt(req.Data, uint64(req.Offset))
+ wrote, err := n.dagMod.WriteAt(req.Data, int64(req.Offset))
if err != nil {
return err
}

View File

@@ -244,6 +244,20 @@ func verifyTDagRec(nd *dag.Node, depth, direct, layerRepeat int, ds dag.DAGServi
return nil
}
+ // Verify this is a branch node
+ pbn, err := ft.FromBytes(nd.Data)
+ if err != nil {
+ return err
+ }
+ if pbn.GetType() != ft.TFile {
+ return errors.New("expected file as branch node")
+ }
+ if len(pbn.Data) > 0 {
+ return errors.New("branch node should not have data")
+ }
for i := 0; i < len(nd.Links); i++ {
child, err := nd.Links[i].GetNode(ds)
if err != nil {

View File

@@ -1,202 +0,0 @@
package io
import (
"bytes"
"errors"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("dagio")
// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
dagserv mdag.DAGService
curNode *mdag.Node
pbdata *ftpb.Data
splitter chunk.BlockSplitter
}
func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
pbd, err := ft.FromBytes(from.Data)
if err != nil {
return nil, err
}
return &DagModifier{
curNode: from.Copy(),
dagserv: serv,
pbdata: pbd,
splitter: spl,
}, nil
}
// WriteAt will modify a dag file in place
// NOTE: it currently assumes only a single level of indirection
func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
// Check bounds
if dm.pbdata.GetFilesize() < offset {
return 0, errors.New("Attempted to perform write starting past end of file")
}
// First need to find where we are writing at
end := uint64(len(b)) + offset
// This shouldn't be necessary if we do sub-block sizes properly
newsize := dm.pbdata.GetFilesize()
if end > dm.pbdata.GetFilesize() {
newsize = end
}
zeroblocklen := uint64(len(dm.pbdata.Data))
origlen := len(b)
if end <= zeroblocklen {
log.Debug("Writing into zero block")
// Replacing zeroth data block (embedded in the root node)
//TODO: check chunking here
copy(dm.pbdata.Data[offset:], b)
return len(b), nil
}
// Find where write should start
var traversed uint64
startsubblk := len(dm.pbdata.Blocksizes)
if offset < zeroblocklen {
dm.pbdata.Data = dm.pbdata.Data[:offset]
startsubblk = 0
} else {
traversed = uint64(zeroblocklen)
for i, size := range dm.pbdata.Blocksizes {
if uint64(offset) < traversed+size {
log.Debugf("Starting mod at block %d. [%d < %d + %d]", i, offset, traversed, size)
// Here is where we start
startsubblk = i
lnk := dm.curNode.Links[i]
node, err := dm.dagserv.Get(u.Key(lnk.Hash))
if err != nil {
return 0, err
}
data, err := ft.UnwrapData(node.Data)
if err != nil {
return 0, err
}
// We have to rewrite the data before our write in this block.
b = append(data[:offset-traversed], b...)
break
}
traversed += size
}
if startsubblk == len(dm.pbdata.Blocksizes) {
// TODO: I'm not sure if there's any case that isn't being handled here.
// leaving this note here as a future reference in case something breaks
}
}
// Find blocks that need to be overwritten
var changed []int
mid := -1
var midoff uint64
for i, size := range dm.pbdata.Blocksizes[startsubblk:] {
if end > traversed {
changed = append(changed, i+startsubblk)
} else {
break
}
traversed += size
if end < traversed {
mid = i + startsubblk
midoff = end - (traversed - size)
break
}
}
// If our write starts in the middle of a block...
var midlnk *mdag.Link
if mid >= 0 {
midlnk = dm.curNode.Links[mid]
midnode, err := dm.dagserv.Get(u.Key(midlnk.Hash))
if err != nil {
return 0, err
}
// NOTE: this may have to be changed later when we have multiple
// layers of indirection
data, err := ft.UnwrapData(midnode.Data)
if err != nil {
return 0, err
}
b = append(b, data[midoff:]...)
}
// Generate new sub-blocks, and sizes
subblocks := splitBytes(b, dm.splitter)
var links []*mdag.Link
var sizes []uint64
for _, sb := range subblocks {
n := &mdag.Node{Data: ft.WrapData(sb)}
_, err := dm.dagserv.Add(n)
if err != nil {
log.Warningf("Failed adding node to DAG service: %s", err)
return 0, err
}
lnk, err := mdag.MakeLink(n)
if err != nil {
return 0, err
}
links = append(links, lnk)
sizes = append(sizes, uint64(len(sb)))
}
// This is disgusting (and can be rewritten if performance demands)
if len(changed) > 0 {
sechalflink := append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...)
dm.curNode.Links = append(dm.curNode.Links[:changed[0]], sechalflink...)
sechalfblks := append(sizes, dm.pbdata.Blocksizes[changed[len(changed)-1]+1:]...)
dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes[:changed[0]], sechalfblks...)
} else {
dm.curNode.Links = append(dm.curNode.Links, links...)
dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes, sizes...)
}
dm.pbdata.Filesize = proto.Uint64(newsize)
return origlen, nil
}
func (dm *DagModifier) Size() uint64 {
if dm == nil {
return 0
}
return dm.pbdata.GetFilesize()
}
// splitBytes uses a splitterFunc to turn a large array of bytes
// into many smaller arrays of bytes
func splitBytes(b []byte, spl chunk.BlockSplitter) [][]byte {
out := spl.Split(bytes.NewReader(b))
var arr [][]byte
for blk := range out {
arr = append(arr, blk)
}
return arr
}
// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
b, err := proto.Marshal(dm.pbdata)
if err != nil {
return nil, err
}
dm.curNode.Data = b
return dm.curNode.Copy(), nil
}
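The deleted implementation operated on a flat link list: it regenerated the affected leaf blocks, then spliced the new links over the changed index range at a single level of indirection. An isolated sketch of that splice step, using first/last as shorthand for changed[0] and changed[len(changed)-1] (shorthand names assumed):

first, last := changed[0], changed[len(changed)-1]
// New links for the rewritten blocks, followed by the untouched suffix...
tail := append(links, dm.curNode.Links[last+1:]...)
// ...grafted back onto the untouched prefix.
dm.curNode.Links = append(dm.curNode.Links[:first], tail...)

The replacement below drops this single-level splice in favor of a recursive modifyDag plus trickle.TrickleAppend, which can descend through multiple layers of indirection.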

View File

@@ -1,247 +0,0 @@
package io
import (
"fmt"
"io"
"io/ioutil"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/exchange/offline"
imp "github.com/jbenet/go-ipfs/importer"
"github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func getMockDagServ(t *testing.T) mdag.DAGService {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv, err := bs.New(bstore, offline.Exchange(bstore))
if err != nil {
t.Fatal(err)
}
return mdag.NewDAGService(bserv)
}
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
in := io.LimitReader(u.NewTimeSeededRand(), size)
node, err := imp.BuildDagFromReader(in, dserv, nil, &chunk.SizeSplitter{Size: 500})
if err != nil {
t.Fatal(err)
}
dr, err := NewDagReader(context.Background(), node, dserv)
if err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadAll(dr)
if err != nil {
t.Fatal(err)
}
return b, node
}
func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
newdata := make([]byte, size)
r := u.NewTimeSeededRand()
r.Read(newdata)
if size+beg > uint64(len(orig)) {
orig = append(orig, make([]byte, (size+beg)-uint64(len(orig)))...)
}
copy(orig[beg:], newdata)
nmod, err := dm.WriteAt(newdata, uint64(beg))
if err != nil {
t.Fatal(err)
}
if nmod != int(size) {
t.Fatalf("Mod length not correct! %d != %d", nmod, size)
}
nd, err := dm.GetNode()
if err != nil {
t.Fatal(err)
}
rd, err := NewDagReader(context.Background(), nd, dm.dagserv)
if err != nil {
t.Fatal(err)
}
after, err := ioutil.ReadAll(rd)
if err != nil {
t.Fatal(err)
}
err = arrComp(after, orig)
if err != nil {
t.Fatal(err)
}
return orig
}
func TestDagModifierBasic(t *testing.T) {
t.Skip("DAGModifier needs to be fixed to work with indirect blocks.")
if err := u.SetLogLevel("blockservice", "critical"); err != nil {
t.Fatalf("testlog prepare failed: %s", err)
}
if err := u.SetLogLevel("merkledag", "critical"); err != nil {
t.Fatalf("testlog prepare failed: %s", err)
}
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
// Within zero block
beg := uint64(15)
length := uint64(60)
t.Log("Testing mod within zero block")
b = testModWrite(t, beg, length, b, dagmod)
// Within bounds of existing file
beg = 1000
length = 4000
t.Log("Testing mod within bounds of existing file.")
b = testModWrite(t, beg, length, b, dagmod)
// Extend bounds
beg = 49500
length = 4000
t.Log("Testing mod that extends file.")
b = testModWrite(t, beg, length, b, dagmod)
// "Append"
beg = uint64(len(b))
length = 3000
b = testModWrite(t, beg, length, b, dagmod)
// Verify reported length
node, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
size, err := ft.DataSize(node.Data)
if err != nil {
t.Fatal(err)
}
expected := uint64(50000 + 3500 + 3000)
if size != expected {
t.Fatalf("Final reported size is incorrect [%d != %d]", size, expected)
}
}
func TestMultiWrite(t *testing.T) {
t.Skip("DAGModifier needs to be fixed to work with indirect blocks.")
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
data := make([]byte, 4000)
u.NewTimeSeededRand().Read(data)
for i := 0; i < len(data); i++ {
n, err := dagmod.WriteAt(data[i:i+1], uint64(i))
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
}
}
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := NewDagReader(context.Background(), nd, dserv)
if err != nil {
t.Fatal(err)
}
rbuf, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
err = arrComp(rbuf, data)
if err != nil {
t.Fatal(err)
}
}
func TestMultiWriteCoal(t *testing.T) {
t.Skip("Skipping test until DagModifier is fixed")
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
data := make([]byte, 4000)
u.NewTimeSeededRand().Read(data)
for i := 0; i < len(data); i++ {
n, err := dagmod.WriteAt(data[:i+1], 0)
if err != nil {
t.Fatal(err)
}
if n != i+1 {
t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
}
}
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := NewDagReader(context.Background(), nd, dserv)
if err != nil {
t.Fatal(err)
}
rbuf, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
err = arrComp(rbuf, data)
if err != nil {
t.Fatal(err)
}
}
func arrComp(a, b []byte) error {
if len(a) != len(b) {
return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
}
for i, v := range a {
if v != b[i] {
return fmt.Errorf("Arrays differ at index: %d", i)
}
}
return nil
}

unixfs/mod/dagmodifier.go Normal file
View File

@@ -0,0 +1,450 @@
package mod
import (
"bytes"
"errors"
"io"
"os"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
help "github.com/jbenet/go-ipfs/importer/helpers"
trickle "github.com/jbenet/go-ipfs/importer/trickle"
mdag "github.com/jbenet/go-ipfs/merkledag"
pin "github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
)
// 2MB
var writebufferSize = 1 << 21
var log = u.Logger("dagio")
// DagModifier is the only struct licensed and able to correctly
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
dagserv mdag.DAGService
curNode *mdag.Node
mp pin.ManualPinner
splitter chunk.BlockSplitter
ctx context.Context
readCancel func()
writeStart uint64
curWrOff uint64
wrBuf *bytes.Buffer
read *uio.DagReader
}
func NewDagModifier(ctx context.Context, from *mdag.Node, serv mdag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*DagModifier, error) {
return &DagModifier{
curNode: from.Copy(),
dagserv: serv,
splitter: spl,
ctx: ctx,
mp: mp,
}, nil
}
// WriteAt will modify a dag file in place
// NOTE: it currently assumes only a single level of indirection
func (dm *DagModifier) WriteAt(b []byte, offset int64) (int, error) {
// TODO: this is currently VERY inefficient
if uint64(offset) != dm.curWrOff {
size, err := dm.Size()
if err != nil {
return 0, err
}
if offset > size {
err := dm.expandSparse(offset - size)
if err != nil {
return 0, err
}
}
err = dm.Flush()
if err != nil {
return 0, err
}
dm.writeStart = uint64(offset)
}
return dm.Write(b)
}
// A reader that just returns zeros
type zeroReader struct{}
func (zr zeroReader) Read(b []byte) (int, error) {
for i := range b {
b[i] = 0
}
return len(b), nil
}
func (dm *DagModifier) expandSparse(size int64) error {
spl := chunk.SizeSplitter{Size: 4096}
r := io.LimitReader(zeroReader{}, size)
blks := spl.Split(r)
nnode, err := dm.appendData(dm.curNode, blks)
if err != nil {
return err
}
_, err = dm.dagserv.Add(nnode)
if err != nil {
return err
}
dm.curNode = nnode
return nil
}
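expandSparse backfills the gap between the current end of file and a distant write offset with zero-filled 4096-byte chunks, appended through the trickle builder. A hedged sketch of the caller path that triggers it (dmod and payload assumed as in the sketch above):

// On an empty file, writing 1 MiB past the start makes WriteAt notice
// offset > Size(), call expandSparse(offset - size) to append zero
// blocks, and only then buffer the payload at the new write position.
if _, err := dmod.WriteAt(payload, 1<<20); err != nil {
    return err
}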
func (dm *DagModifier) Write(b []byte) (int, error) {
if dm.read != nil {
dm.read = nil
}
if dm.wrBuf == nil {
dm.wrBuf = new(bytes.Buffer)
}
n, err := dm.wrBuf.Write(b)
if err != nil {
return n, err
}
dm.curWrOff += uint64(n)
if dm.wrBuf.Len() > writebufferSize {
err := dm.Flush()
if err != nil {
return n, err
}
}
return n, nil
}
func (dm *DagModifier) Size() (int64, error) {
// TODO: compute size without flushing, should be easy
err := dm.Flush()
if err != nil {
return 0, err
}
pbn, err := ft.FromBytes(dm.curNode.Data)
if err != nil {
return 0, err
}
return int64(pbn.GetFilesize()), nil
}
func (dm *DagModifier) Flush() error {
if dm.wrBuf == nil {
return nil
}
// If we have an active reader, kill it
if dm.read != nil {
dm.read = nil
dm.readCancel()
}
buflen := dm.wrBuf.Len()
k, _, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf)
if err != nil {
return err
}
nd, err := dm.dagserv.Get(k)
if err != nil {
return err
}
dm.curNode = nd
if !done {
blks := dm.splitter.Split(dm.wrBuf)
nd, err = dm.appendData(dm.curNode, blks)
if err != nil {
return err
}
_, err := dm.dagserv.Add(nd)
if err != nil {
return err
}
dm.curNode = nd
}
dm.writeStart += uint64(buflen)
dm.wrBuf = nil
return nil
}
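Writes are buffered: Write appends to wrBuf and flushes automatically once the buffer crosses the 2MB writebufferSize, while Flush overwrites existing bytes via modifyDag and hands any remainder to appendData. A caller can also drive the flush explicitly before inspecting the node; a small sketch under the same assumed setup:

if dmod.HasChanges() { // true while wrBuf is non-nil
    if err := dmod.Flush(); err != nil {
        return err
    }
}
nd, err := dmod.GetNode() // GetNode also flushes before copying the node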
func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) (u.Key, int, bool, error) {
f, err := ft.FromBytes(node.Data)
if err != nil {
return "", 0, false, err
}
if len(node.Links) == 0 && (f.GetType() == ftpb.Data_Raw || f.GetType() == ftpb.Data_File) {
n, err := data.Read(f.Data[offset:])
if err != nil && err != io.EOF {
return "", 0, false, err
}
// Update the newly written node.
b, err := proto.Marshal(f)
if err != nil {
return "", 0, false, err
}
nd := &mdag.Node{Data: b}
k, err := dm.dagserv.Add(nd)
if err != nil {
return "", 0, false, err
}
// Hey look! we're done!
var done bool
if n < len(f.Data) {
done = true
}
return k, n, done, nil
}
var cur uint64
var done bool
var totread int
for i, bs := range f.GetBlocksizes() {
if cur+bs > offset {
child, err := node.Links[i].GetNode(dm.dagserv)
if err != nil {
return "", 0, false, err
}
k, nread, sdone, err := dm.modifyDag(child, offset-cur, data)
if err != nil {
return "", 0, false, err
}
totread += nread
offset += bs
node.Links[i].Hash = mh.Multihash(k)
if sdone {
done = true
break
}
}
cur += bs
}
k, err := dm.dagserv.Add(node)
return k, totread, done, err
}
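modifyDag returns the key of the re-added node, the number of bytes consumed from the reader, and a done flag that stops the walk once the input is exhausted. The child to descend into is the one whose cumulative Blocksizes span covers the target offset; pulled out as a standalone helper for illustration (childFor is not part of the commit):

// childFor returns the link index covering offset within a unixfs node,
// plus the remaining offset inside that child.
func childFor(blocksizes []uint64, offset uint64) (int, uint64) {
    var cur uint64
    for i, bs := range blocksizes {
        if cur+bs > offset {
            return i, offset - cur
        }
        cur += bs
    }
    return -1, 0 // offset lies past the last block
}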
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.Node, error) {
dbp := &help.DagBuilderParams{
Dagserv: dm.dagserv,
Maxlinks: help.DefaultLinksPerBlock,
Pinner: dm.mp,
}
return trickle.TrickleAppend(node, dbp.New(blks))
}
func (dm *DagModifier) Read(b []byte) (int, error) {
err := dm.Flush()
if err != nil {
return 0, err
}
if dm.read == nil {
dr, err := uio.NewDagReader(dm.ctx, dm.curNode, dm.dagserv)
if err != nil {
return 0, err
}
i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
if err != nil {
return 0, err
}
if i != int64(dm.curWrOff) {
return 0, errors.New("failed to seek properly")
}
dm.read = dr
}
n, err := dm.read.Read(b)
dm.curWrOff += uint64(n)
return n, err
}
// splitBytes uses a splitterFunc to turn a large array of bytes
// into many smaller arrays of bytes
func (dm *DagModifier) splitBytes(in io.Reader) ([]u.Key, error) {
var out []u.Key
blks := dm.splitter.Split(in)
for blk := range blks {
nd := help.NewUnixfsNode()
nd.SetData(blk)
dagnd, err := nd.GetDagNode()
if err != nil {
return nil, err
}
k, err := dm.dagserv.Add(dagnd)
if err != nil {
return nil, err
}
out = append(out, k)
}
return out, nil
}
// GetNode gets the modified DAG Node
func (dm *DagModifier) GetNode() (*mdag.Node, error) {
err := dm.Flush()
if err != nil {
return nil, err
}
return dm.curNode.Copy(), nil
}
func (dm *DagModifier) HasChanges() bool {
return dm.wrBuf != nil
}
func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) {
err := dm.Flush()
if err != nil {
return 0, err
}
switch whence {
case os.SEEK_CUR:
dm.curWrOff += uint64(offset)
dm.writeStart = dm.curWrOff
case os.SEEK_SET:
dm.curWrOff = uint64(offset)
dm.writeStart = uint64(offset)
case os.SEEK_END:
return 0, errors.New("SEEK_END currently not implemented")
default:
return 0, errors.New("unrecognized whence")
}
if dm.read != nil {
_, err = dm.read.Seek(offset, whence)
if err != nil {
return 0, err
}
}
return int64(dm.curWrOff), nil
}
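Read and Seek both flush pending writes before acting, and a seek repositions the write cursor and any active reader together, so the modifier can be used like an io.ReadWriteSeeker over the dag. A round-trip sketch under the same assumptions:

if _, err := dmod.Write(payload); err != nil {
    return err
}
if _, err := dmod.Seek(0, os.SEEK_SET); err != nil {
    return err
}
out, err := ioutil.ReadAll(dmod) // out should equal payload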
func (dm *DagModifier) Truncate(size int64) error {
err := dm.Flush()
if err != nil {
return err
}
realSize, err := dm.Size()
if err != nil {
return err
}
if size > realSize {
return errors.New("Cannot extend file through truncate")
}
nnode, err := dagTruncate(dm.curNode, uint64(size), dm.dagserv)
if err != nil {
return err
}
_, err = dm.dagserv.Add(nnode)
if err != nil {
return err
}
dm.curNode = nnode
return nil
}
func dagTruncate(nd *mdag.Node, size uint64, ds mdag.DAGService) (*mdag.Node, error) {
if len(nd.Links) == 0 {
// TODO: this can likely be done without marshaling and remarshaling
pbn, err := ft.FromBytes(nd.Data)
if err != nil {
return nil, err
}
nd.Data = ft.WrapData(pbn.Data[:size])
return nd, nil
}
var cur uint64
end := 0
var modified *mdag.Node
ndata := new(ft.FSNode)
for i, lnk := range nd.Links {
child, err := lnk.GetNode(ds)
if err != nil {
return nil, err
}
childsize, err := ft.DataSize(child.Data)
if err != nil {
return nil, err
}
if size < cur+childsize {
nchild, err := dagTruncate(child, size-cur, ds)
if err != nil {
return nil, err
}
// TODO: sanity check size of truncated block
ndata.AddBlockSize(size - cur)
modified = nchild
end = i
break
}
cur += childsize
ndata.AddBlockSize(childsize)
}
_, err := ds.Add(modified)
if err != nil {
return nil, err
}
nd.Links = nd.Links[:end]
err = nd.AddNodeLinkClean("", modified)
if err != nil {
return nil, err
}
d, err := ndata.GetBytes()
if err != nil {
return nil, err
}
nd.Data = d
return nd, nil
}
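Truncate only shrinks a file: a size past the current end is rejected, and dagTruncate then walks the child block sizes, recursively truncates the child that straddles the cut, drops every later link, and rewrites the node's Blocksizes. Caller-side it mirrors TestDagTruncate in the test file below:

// Cut the dag file down to its first 12345 bytes; later reads stop there.
if err := dmod.Truncate(12345); err != nil {
    return err
}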

View File

@@ -0,0 +1,464 @@
package mod
import (
"fmt"
"io"
"io/ioutil"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/exchange/offline"
imp "github.com/jbenet/go-ipfs/importer"
"github.com/jbenet/go-ipfs/importer/chunk"
h "github.com/jbenet/go-ipfs/importer/helpers"
trickle "github.com/jbenet/go-ipfs/importer/trickle"
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func getMockDagServ(t *testing.T) mdag.DAGService {
dstore := ds.NewMapDatastore()
tsds := sync.MutexWrap(dstore)
bstore := blockstore.NewBlockstore(tsds)
bserv, err := bs.New(bstore, offline.Exchange(bstore))
if err != nil {
t.Fatal(err)
}
return mdag.NewDAGService(bserv)
}
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
in := io.LimitReader(u.NewTimeSeededRand(), size)
node, err := imp.BuildTrickleDagFromReader(in, dserv, nil, &chunk.SizeSplitter{Size: 500})
if err != nil {
t.Fatal(err)
}
dr, err := uio.NewDagReader(context.Background(), node, dserv)
if err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadAll(dr)
if err != nil {
t.Fatal(err)
}
return b, node
}
func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier) []byte {
newdata := make([]byte, size)
r := u.NewTimeSeededRand()
r.Read(newdata)
if size+beg > uint64(len(orig)) {
orig = append(orig, make([]byte, (size+beg)-uint64(len(orig)))...)
}
copy(orig[beg:], newdata)
nmod, err := dm.WriteAt(newdata, int64(beg))
if err != nil {
t.Fatal(err)
}
if nmod != int(size) {
t.Fatalf("Mod length not correct! %d != %d", nmod, size)
}
nd, err := dm.GetNode()
if err != nil {
t.Fatal(err)
}
err = trickle.VerifyTrickleDagStructure(nd, dm.dagserv, h.DefaultLinksPerBlock, 4)
if err != nil {
t.Fatal(err)
}
rd, err := uio.NewDagReader(context.Background(), nd, dm.dagserv)
if err != nil {
t.Fatal(err)
}
after, err := ioutil.ReadAll(rd)
if err != nil {
t.Fatal(err)
}
err = arrComp(after, orig)
if err != nil {
t.Fatal(err)
}
return orig
}
func TestDagModifierBasic(t *testing.T) {
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
// Within zero block
beg := uint64(15)
length := uint64(60)
t.Log("Testing mod within zero block")
b = testModWrite(t, beg, length, b, dagmod)
// Within bounds of existing file
beg = 1000
length = 4000
t.Log("Testing mod within bounds of existing multiblock file.")
b = testModWrite(t, beg, length, b, dagmod)
// Extend bounds
beg = 49500
length = 4000
t.Log("Testing mod that extends file.")
b = testModWrite(t, beg, length, b, dagmod)
// "Append"
beg = uint64(len(b))
length = 3000
t.Log("Testing pure append")
b = testModWrite(t, beg, length, b, dagmod)
// Verify reported length
node, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
size, err := ft.DataSize(node.Data)
if err != nil {
t.Fatal(err)
}
expected := uint64(50000 + 3500 + 3000)
if size != expected {
t.Fatalf("Final reported size is incorrect [%d != %d]", size, expected)
}
}
func TestMultiWrite(t *testing.T) {
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
data := make([]byte, 4000)
u.NewTimeSeededRand().Read(data)
for i := 0; i < len(data); i++ {
n, err := dagmod.WriteAt(data[i:i+1], int64(i))
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
}
}
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := uio.NewDagReader(context.Background(), nd, dserv)
if err != nil {
t.Fatal(err)
}
rbuf, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
err = arrComp(rbuf, data)
if err != nil {
t.Fatal(err)
}
}
func TestMultiWriteAndFlush(t *testing.T) {
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
data := make([]byte, 20)
u.NewTimeSeededRand().Read(data)
for i := 0; i < len(data); i++ {
n, err := dagmod.WriteAt(data[i:i+1], int64(i))
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
}
err = dagmod.Flush()
if err != nil {
t.Fatal(err)
}
}
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := uio.NewDagReader(context.Background(), nd, dserv)
if err != nil {
t.Fatal(err)
}
rbuf, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
err = arrComp(rbuf, data)
if err != nil {
t.Fatal(err)
}
}
func TestWriteNewFile(t *testing.T) {
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
towrite := make([]byte, 2000)
u.NewTimeSeededRand().Read(towrite)
nw, err := dagmod.Write(towrite)
if err != nil {
t.Fatal(err)
}
if nw != len(towrite) {
t.Fatal("Wrote wrong amount")
}
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := uio.NewDagReader(ctx, nd, dserv)
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
if err := arrComp(data, towrite); err != nil {
t.Fatal(err)
}
}
func TestMultiWriteCoal(t *testing.T) {
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
data := make([]byte, 1000)
u.NewTimeSeededRand().Read(data)
for i := 0; i < len(data); i++ {
n, err := dagmod.WriteAt(data[:i+1], 0)
if err != nil {
fmt.Println("FAIL AT ", i)
t.Fatal(err)
}
if n != i+1 {
t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
}
// TEMP
nn, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
r, err := uio.NewDagReader(ctx, nn, dserv)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
if err := arrComp(out, data[:i+1]); err != nil {
fmt.Println("A ", len(out))
fmt.Println(out)
fmt.Println(data[:i+1])
t.Fatal(err)
}
//
}
nd, err := dagmod.GetNode()
if err != nil {
t.Fatal(err)
}
read, err := uio.NewDagReader(context.Background(), nd, dserv)
if err != nil {
t.Fatal(err)
}
rbuf, err := ioutil.ReadAll(read)
if err != nil {
t.Fatal(err)
}
err = arrComp(rbuf, data)
if err != nil {
t.Fatal(err)
}
}
func TestLargeWriteChunks(t *testing.T) {
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
wrsize := 1000
datasize := 10000000
data := make([]byte, datasize)
u.NewTimeSeededRand().Read(data)
for i := 0; i < datasize/wrsize; i++ {
n, err := dagmod.WriteAt(data[i*wrsize:(i+1)*wrsize], int64(i*wrsize))
if err != nil {
t.Fatal(err)
}
if n != wrsize {
t.Fatal("failed to write buffer")
}
}
out, err := ioutil.ReadAll(dagmod)
if err != nil {
t.Fatal(err)
}
if err = arrComp(out, data); err != nil {
t.Fatal(err)
}
}
func TestDagTruncate(t *testing.T) {
dserv := getMockDagServ(t)
b, n := getNode(t, dserv, 50000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dagmod, err := NewDagModifier(ctx, n, dserv, nil, &chunk.SizeSplitter{Size: 512})
if err != nil {
t.Fatal(err)
}
err = dagmod.Truncate(12345)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(dagmod)
if err != nil {
t.Fatal(err)
}
if err = arrComp(out, b[:12345]); err != nil {
t.Fatal(err)
}
}
func arrComp(a, b []byte) error {
if len(a) != len(b) {
return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
}
for i, v := range a {
if v != b[i] {
return fmt.Errorf("Arrays differ at index: %d", i)
}
}
return nil
}
func printDag(nd *mdag.Node, ds mdag.DAGService, indent int) {
pbd, err := ft.FromBytes(nd.Data)
if err != nil {
panic(err)
}
for i := 0; i < indent; i++ {
fmt.Print(" ")
}
fmt.Printf("{size = %d, type = %s, children = %d", pbd.GetFilesize(), pbd.GetType().String(), len(pbd.GetBlocksizes()))
if len(nd.Links) > 0 {
fmt.Println()
}
for _, lnk := range nd.Links {
child, err := lnk.GetNode(ds)
if err != nil {
panic(err)
}
printDag(child, ds, indent+1)
}
if len(nd.Links) > 0 {
for i := 0; i < indent; i++ {
fmt.Print(" ")
}
}
fmt.Println("}")
}