mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 18:37:45 +08:00
working on dag modification structures, factored out the data format into an importer subpackage and added more ipns tests
This commit is contained in:
parent
a13baff33d
commit
5592030ed3
@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/jbenet/go-ipfs/core"
|
||||
"github.com/jbenet/go-ipfs/importer"
|
||||
ft "github.com/jbenet/go-ipfs/importer/format"
|
||||
dag "github.com/jbenet/go-ipfs/merkledag"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -75,7 +76,7 @@ func AddPath(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
|
||||
}
|
||||
|
||||
func addDir(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
|
||||
tree := &dag.Node{Data: dag.FolderPBData()}
|
||||
tree := &dag.Node{Data: ft.FolderPBData()}
|
||||
|
||||
files, err := ioutil.ReadDir(fpath)
|
||||
if err != nil {
|
||||
|
||||
@ -63,6 +63,7 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M
|
||||
return node, mnt
|
||||
}
|
||||
|
||||
// Test writing a file and reading it back
|
||||
func TestIpnsBasicIO(t *testing.T) {
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
@ -80,6 +81,7 @@ func TestIpnsBasicIO(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test to make sure file changes persist over mounts of ipns
|
||||
func TestFilePersistence(t *testing.T) {
|
||||
node, mnt := setupIpnsTest(t, nil)
|
||||
|
||||
@ -104,6 +106,7 @@ func TestFilePersistence(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test to make sure the filesystem reports file sizes correctly
|
||||
func TestFileSizeReporting(t *testing.T) {
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
@ -120,3 +123,91 @@ func TestFileSizeReporting(t *testing.T) {
|
||||
t.Fatal("Read incorrect size from stat!")
|
||||
}
|
||||
}
|
||||
|
||||
// Test to make sure you cant create multiple entries with the same name
|
||||
func TestDoubleEntryFailure(t *testing.T) {
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
|
||||
dname := mnt.Dir + "/local/thisisadir"
|
||||
err := os.Mkdir(dname, 0777)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = os.Mkdir(dname, 0777)
|
||||
if err == nil {
|
||||
t.Fatal("Should have gotten error one creating new directory.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendFile(t *testing.T) {
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
|
||||
fname := mnt.Dir + "/local/file"
|
||||
data := writeFile(t, 1300, fname)
|
||||
|
||||
fi, err := os.OpenFile(fname, os.O_RDWR|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
nudata := randBytes(500)
|
||||
|
||||
n, err := fi.Write(nudata)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = fi.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if n != len(nudata) {
|
||||
t.Fatal("Failed to write enough bytes.")
|
||||
}
|
||||
|
||||
data = append(data, nudata...)
|
||||
|
||||
rbuf, err := ioutil.ReadFile(fname)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(rbuf, data) {
|
||||
t.Fatal("Data inconsistent!")
|
||||
}
|
||||
}
|
||||
|
||||
// Test writing a medium sized file one byte at a time
|
||||
func TestMultiWrite(t *testing.T) {
|
||||
_, mnt := setupIpnsTest(t, nil)
|
||||
defer mnt.Close()
|
||||
|
||||
fpath := mnt.Dir + "/local/file"
|
||||
fi, err := os.Create(fpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
data := randBytes(1001)
|
||||
for i := 0; i < len(data); i++ {
|
||||
n, err := fi.Write(data[i : i+1])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatal("Somehow wrote the wrong number of bytes! (n != 1)")
|
||||
}
|
||||
}
|
||||
fi.Close()
|
||||
|
||||
rbuf, err := ioutil.ReadFile(fpath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(rbuf, data) {
|
||||
t.Fatal("File on disk did not match bytes written")
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/jbenet/go-ipfs/core"
|
||||
ci "github.com/jbenet/go-ipfs/crypto"
|
||||
imp "github.com/jbenet/go-ipfs/importer"
|
||||
ft "github.com/jbenet/go-ipfs/importer/format"
|
||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -77,7 +78,7 @@ func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, er
|
||||
pointsTo, err := n.Namesys.Resolve(name)
|
||||
if err != nil {
|
||||
log.Warning("Could not resolve value for local ipns entry, providing empty dir")
|
||||
nd.Nd = &mdag.Node{Data: mdag.FolderPBData()}
|
||||
nd.Nd = &mdag.Node{Data: ft.FolderPBData()}
|
||||
root.LocalDirs[name] = nd
|
||||
continue
|
||||
}
|
||||
@ -199,14 +200,14 @@ type Node struct {
|
||||
Ipfs *core.IpfsNode
|
||||
Nd *mdag.Node
|
||||
fd *mdag.DagReader
|
||||
cached *mdag.PBData
|
||||
cached *ft.PBData
|
||||
|
||||
// For writing
|
||||
writerBuf WriteAtBuf
|
||||
}
|
||||
|
||||
func (s *Node) loadData() error {
|
||||
s.cached = new(mdag.PBData)
|
||||
s.cached = new(ft.PBData)
|
||||
return proto.Unmarshal(s.Nd.Data, s.cached)
|
||||
}
|
||||
|
||||
@ -216,10 +217,10 @@ func (s *Node) Attr() fuse.Attr {
|
||||
s.loadData()
|
||||
}
|
||||
switch s.cached.GetType() {
|
||||
case mdag.PBData_Directory:
|
||||
case ft.PBData_Directory:
|
||||
return fuse.Attr{Mode: os.ModeDir | 0555}
|
||||
case mdag.PBData_File, mdag.PBData_Raw:
|
||||
size, err := s.Nd.DataSize()
|
||||
case ft.PBData_File, ft.PBData_Raw:
|
||||
size, err := ft.DataSize(s.Nd.Data)
|
||||
if err != nil {
|
||||
log.Error("Error getting size of file: %s", err)
|
||||
size = 0
|
||||
@ -414,7 +415,7 @@ func (n *Node) Fsync(req *fuse.FsyncRequest, intr fs.Intr) fuse.Error {
|
||||
|
||||
func (n *Node) Mkdir(req *fuse.MkdirRequest, intr fs.Intr) (fs.Node, fuse.Error) {
|
||||
log.Debug("Got mkdir request!")
|
||||
dagnd := &mdag.Node{Data: mdag.FolderPBData()}
|
||||
dagnd := &mdag.Node{Data: ft.FolderPBData()}
|
||||
nnode := n.Nd.Copy()
|
||||
nnode.AddNodeLink(req.Name, dagnd)
|
||||
|
||||
@ -448,6 +449,12 @@ func (n *Node) Mkdir(req *fuse.MkdirRequest, intr fs.Intr) (fs.Node, fuse.Error)
|
||||
func (n *Node) Open(req *fuse.OpenRequest, resp *fuse.OpenResponse, intr fs.Intr) (fs.Handle, fuse.Error) {
|
||||
//log.Debug("[%s] Received open request! flags = %s", n.name, req.Flags.String())
|
||||
//TODO: check open flags and truncate if necessary
|
||||
if req.Flags&fuse.OpenTruncate != 0 {
|
||||
log.Warning("Need to truncate file!")
|
||||
}
|
||||
if req.Flags&fuse.OpenAppend != 0 {
|
||||
log.Warning("Need to append to file!")
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
@ -460,7 +467,7 @@ func (n *Node) Create(req *fuse.CreateRequest, resp *fuse.CreateResponse, intr f
|
||||
log.Debug("Got create request: %s", req.Name)
|
||||
|
||||
// New 'empty' file
|
||||
nd := &mdag.Node{Data: mdag.FilePBData(nil, 0)}
|
||||
nd := &mdag.Node{Data: ft.FilePBData(nil, 0)}
|
||||
child := n.makeChild(req.Name, nd)
|
||||
|
||||
nnode := n.Nd.Copy()
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
|
||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
|
||||
core "github.com/jbenet/go-ipfs/core"
|
||||
ft "github.com/jbenet/go-ipfs/importer/format"
|
||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -79,11 +80,11 @@ type Node struct {
|
||||
Ipfs *core.IpfsNode
|
||||
Nd *mdag.Node
|
||||
fd *mdag.DagReader
|
||||
cached *mdag.PBData
|
||||
cached *ft.PBData
|
||||
}
|
||||
|
||||
func (s *Node) loadData() error {
|
||||
s.cached = new(mdag.PBData)
|
||||
s.cached = new(ft.PBData)
|
||||
return proto.Unmarshal(s.Nd.Data, s.cached)
|
||||
}
|
||||
|
||||
@ -94,9 +95,9 @@ func (s *Node) Attr() fuse.Attr {
|
||||
s.loadData()
|
||||
}
|
||||
switch s.cached.GetType() {
|
||||
case mdag.PBData_Directory:
|
||||
case ft.PBData_Directory:
|
||||
return fuse.Attr{Mode: os.ModeDir | 0555}
|
||||
case mdag.PBData_File, mdag.PBData_Raw:
|
||||
case ft.PBData_File, ft.PBData_Raw:
|
||||
size, _ := s.Nd.Size()
|
||||
return fuse.Attr{
|
||||
Mode: 0444,
|
||||
|
||||
80
importer/dagwriter/dagwriter.go
Normal file
80
importer/dagwriter/dagwriter.go
Normal file
@ -0,0 +1,80 @@
|
||||
package dagwriter
|
||||
|
||||
import (
|
||||
imp "github.com/jbenet/go-ipfs/importer"
|
||||
ft "github.com/jbenet/go-ipfs/importer/format"
|
||||
dag "github.com/jbenet/go-ipfs/merkledag"
|
||||
"github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
var log = util.Logger("dagwriter")
|
||||
|
||||
type DagWriter struct {
|
||||
dagserv *dag.DAGService
|
||||
node *dag.Node
|
||||
totalSize int64
|
||||
splChan chan []byte
|
||||
done chan struct{}
|
||||
splitter imp.StreamSplitter
|
||||
seterr error
|
||||
}
|
||||
|
||||
func NewDagWriter(ds *dag.DAGService, splitter imp.StreamSplitter) *DagWriter {
|
||||
dw := new(DagWriter)
|
||||
dw.dagserv = ds
|
||||
dw.splChan = make(chan []byte, 8)
|
||||
dw.splitter = splitter
|
||||
dw.done = make(chan struct{})
|
||||
go dw.startSplitter()
|
||||
return dw
|
||||
}
|
||||
|
||||
func (dw *DagWriter) startSplitter() {
|
||||
blkchan := dw.splitter.Split(dw.splChan)
|
||||
first := <-blkchan
|
||||
root := new(dag.Node)
|
||||
fileSize := uint64(0)
|
||||
for blkData := range blkchan {
|
||||
fileSize += uint64(len(blkData))
|
||||
node := &dag.Node{Data: ft.WrapData(blkData)}
|
||||
_, err := dw.dagserv.Add(node)
|
||||
if err != nil {
|
||||
dw.seterr = err
|
||||
log.Critical("Got error adding created node to dagservice: %s", err)
|
||||
return
|
||||
}
|
||||
err = root.AddNodeLinkClean("", node)
|
||||
if err != nil {
|
||||
dw.seterr = err
|
||||
log.Critical("Got error adding created node to root node: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
root.Data = ft.FilePBData(first, fileSize)
|
||||
_, err := dw.dagserv.Add(root)
|
||||
if err != nil {
|
||||
dw.seterr = err
|
||||
log.Critical("Got error adding created node to dagservice: %s", err)
|
||||
return
|
||||
}
|
||||
dw.node = root
|
||||
dw.done <- struct{}{}
|
||||
}
|
||||
|
||||
func (dw *DagWriter) Write(b []byte) (int, error) {
|
||||
if dw.seterr != nil {
|
||||
return 0, dw.seterr
|
||||
}
|
||||
dw.splChan <- b
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (dw *DagWriter) Close() error {
|
||||
close(dw.splChan)
|
||||
<-dw.done
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dw *DagWriter) GetNode() *dag.Node {
|
||||
return dw.node
|
||||
}
|
||||
102
importer/dagwriter/dagwriter_test.go
Normal file
102
importer/dagwriter/dagwriter_test.go
Normal file
@ -0,0 +1,102 @@
|
||||
package dagwriter
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"io"
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
||||
bs "github.com/jbenet/go-ipfs/blockservice"
|
||||
imp "github.com/jbenet/go-ipfs/importer"
|
||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||
)
|
||||
|
||||
type datasource struct {
|
||||
i int
|
||||
}
|
||||
|
||||
func (d *datasource) Read(b []byte) (int, error) {
|
||||
for i, _ := range b {
|
||||
b[i] = byte(d.i % 256)
|
||||
d.i++
|
||||
}
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (d *datasource) Matches(t *testing.T, r io.Reader, length int) bool {
|
||||
b := make([]byte, 100)
|
||||
i := 0
|
||||
for {
|
||||
n, err := r.Read(b)
|
||||
if err != nil && err != io.EOF {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, v := range b[:n] {
|
||||
if v != byte(i%256) {
|
||||
t.Fatalf("Buffers differed at byte: %d (%d != %d)", i, v, (i % 256))
|
||||
}
|
||||
i++
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
if i != length {
|
||||
t.Fatalf("Incorrect length. (%d != %d)", i, length)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func TestDagWriter(t *testing.T) {
|
||||
dstore := ds.NewMapDatastore()
|
||||
bserv, err := bs.NewBlockService(dstore, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dag := &mdag.DAGService{bserv}
|
||||
dw := NewDagWriter(dag, &imp.SizeSplitter2{4096})
|
||||
|
||||
nbytes := int64(1024 * 1024 * 2)
|
||||
n, err := io.CopyN(dw, &datasource{}, nbytes)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if n != nbytes {
|
||||
t.Fatal("Copied incorrect amount of bytes!")
|
||||
}
|
||||
|
||||
dw.Close()
|
||||
|
||||
node := dw.GetNode()
|
||||
read, err := mdag.NewDagReader(node, dag)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
d := &datasource{}
|
||||
if !d.Matches(t, read, int(nbytes)) {
|
||||
t.Fatal("Failed to validate!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMassiveWrite(t *testing.T) {
|
||||
t.SkipNow()
|
||||
dstore := ds.NewNullDatastore()
|
||||
bserv, err := bs.NewBlockService(dstore, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
dag := &mdag.DAGService{bserv}
|
||||
dw := NewDagWriter(dag, &imp.SizeSplitter2{4096})
|
||||
|
||||
nbytes := int64(1024 * 1024 * 1024 * 16)
|
||||
n, err := io.CopyN(dw, &datasource{}, nbytes)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != nbytes {
|
||||
t.Fatal("Incorrect copy size.")
|
||||
}
|
||||
dw.Close()
|
||||
}
|
||||
5
importer/format/Makefile
Normal file
5
importer/format/Makefile
Normal file
@ -0,0 +1,5 @@
|
||||
all: data.pb.go
|
||||
|
||||
data.pb.go: data.proto
|
||||
protoc --go_out=. data.proto
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package merkledag is a generated protocol buffer package.
|
||||
Package format is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
data.proto
|
||||
@ -11,7 +11,7 @@ It is generated from these files:
|
||||
It has these top-level messages:
|
||||
PBData
|
||||
*/
|
||||
package merkledag
|
||||
package format
|
||||
|
||||
import proto "code.google.com/p/goprotobuf/proto"
|
||||
import math "math"
|
||||
@ -57,9 +57,10 @@ func (x *PBData_DataType) UnmarshalJSON(data []byte) error {
|
||||
}
|
||||
|
||||
type PBData struct {
|
||||
Type *PBData_DataType `protobuf:"varint,1,req,enum=merkledag.PBData_DataType" json:"Type,omitempty"`
|
||||
Type *PBData_DataType `protobuf:"varint,1,req,enum=format.PBData_DataType" json:"Type,omitempty"`
|
||||
Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"`
|
||||
Filesize *uint64 `protobuf:"varint,3,opt,name=filesize" json:"filesize,omitempty"`
|
||||
Blocksizes []uint64 `protobuf:"varint,4,rep,name=blocksizes" json:"blocksizes,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
@ -88,6 +89,13 @@ func (m *PBData) GetFilesize() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("merkledag.PBData_DataType", PBData_DataType_name, PBData_DataType_value)
|
||||
func (m *PBData) GetBlocksizes() []uint64 {
|
||||
if m != nil {
|
||||
return m.Blocksizes
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("format.PBData_DataType", PBData_DataType_name, PBData_DataType_value)
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
package merkledag;
|
||||
package format;
|
||||
|
||||
message PBData {
|
||||
enum DataType {
|
||||
@ -10,4 +10,5 @@ message PBData {
|
||||
required DataType Type = 1;
|
||||
optional bytes Data = 2;
|
||||
optional uint64 filesize = 3;
|
||||
repeated uint64 blocksizes = 4;
|
||||
}
|
||||
71
importer/format/format.go
Normal file
71
importer/format/format.go
Normal file
@ -0,0 +1,71 @@
|
||||
// Package format implements a data format for files in the ipfs filesystem
|
||||
// It is not the only format in ipfs, but it is the one that the filesystem assumes
|
||||
package format
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
)
|
||||
|
||||
func FilePBData(data []byte, totalsize uint64) []byte {
|
||||
pbfile := new(PBData)
|
||||
typ := PBData_File
|
||||
pbfile.Type = &typ
|
||||
pbfile.Data = data
|
||||
pbfile.Filesize = proto.Uint64(totalsize)
|
||||
|
||||
data, err := proto.Marshal(pbfile)
|
||||
if err != nil {
|
||||
//this really shouldnt happen, i promise
|
||||
panic(err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func FolderPBData() []byte {
|
||||
pbfile := new(PBData)
|
||||
typ := PBData_Directory
|
||||
pbfile.Type = &typ
|
||||
|
||||
data, err := proto.Marshal(pbfile)
|
||||
if err != nil {
|
||||
//this really shouldnt happen, i promise
|
||||
panic(err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func WrapData(b []byte) []byte {
|
||||
pbdata := new(PBData)
|
||||
typ := PBData_Raw
|
||||
pbdata.Data = b
|
||||
pbdata.Type = &typ
|
||||
|
||||
out, err := proto.Marshal(pbdata)
|
||||
if err != nil {
|
||||
// This shouldnt happen. seriously.
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func DataSize(data []byte) (uint64, error) {
|
||||
pbdata := new(PBData)
|
||||
err := proto.Unmarshal(data, pbdata)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
switch pbdata.GetType() {
|
||||
case PBData_Directory:
|
||||
return 0, errors.New("Cant get data size of directory!")
|
||||
case PBData_File:
|
||||
return pbdata.GetFilesize(), nil
|
||||
case PBData_Raw:
|
||||
return uint64(len(pbdata.GetData())), nil
|
||||
default:
|
||||
return 0, errors.New("Unrecognized node data type!")
|
||||
}
|
||||
}
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
|
||||
ft "github.com/jbenet/go-ipfs/importer/format"
|
||||
dag "github.com/jbenet/go-ipfs/merkledag"
|
||||
)
|
||||
|
||||
@ -34,7 +35,7 @@ func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, er
|
||||
totalsize := uint64(len(first))
|
||||
for blk := range blkChan {
|
||||
totalsize += uint64(len(blk))
|
||||
child := &dag.Node{Data: dag.WrapData(blk)}
|
||||
child := &dag.Node{Data: ft.WrapData(blk)}
|
||||
err := root.AddNodeLink(fmt.Sprintf("%d", i), child)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -42,7 +43,7 @@ func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, er
|
||||
i++
|
||||
}
|
||||
|
||||
root.Data = dag.FilePBData(first, totalsize)
|
||||
root.Data = ft.FilePBData(first, totalsize)
|
||||
return root, nil
|
||||
}
|
||||
|
||||
|
||||
@ -6,10 +6,16 @@ import (
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
// OLD
|
||||
type BlockSplitter interface {
|
||||
Split(io.Reader) chan []byte
|
||||
}
|
||||
|
||||
// NEW
|
||||
type StreamSplitter interface {
|
||||
Split(chan []byte) chan []byte
|
||||
}
|
||||
|
||||
type SizeSplitter struct {
|
||||
Size int
|
||||
}
|
||||
@ -39,3 +45,26 @@ func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
|
||||
}()
|
||||
return out
|
||||
}
|
||||
|
||||
type SizeSplitter2 struct {
|
||||
Size int
|
||||
}
|
||||
|
||||
func (ss *SizeSplitter2) Split(in chan []byte) chan []byte {
|
||||
out := make(chan []byte)
|
||||
go func() {
|
||||
defer close(out)
|
||||
var buf []byte
|
||||
for b := range in {
|
||||
buf = append(buf, b...)
|
||||
for len(buf) > ss.Size {
|
||||
out <- buf[:ss.Size]
|
||||
buf = buf[ss.Size:]
|
||||
}
|
||||
}
|
||||
if len(buf) > 0 {
|
||||
out <- buf
|
||||
}
|
||||
}()
|
||||
return out
|
||||
}
|
||||
|
||||
@ -1,11 +1,8 @@
|
||||
|
||||
all: node.pb.go data.pb.go
|
||||
all: node.pb.go
|
||||
|
||||
node.pb.go: node.proto
|
||||
protoc --gogo_out=. --proto_path=../../../../:/usr/local/opt/protobuf/include:. $<
|
||||
|
||||
data.pb.go: data.proto
|
||||
protoc --go_out=. data.proto
|
||||
|
||||
clean:
|
||||
rm node.pb.go
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"io"
|
||||
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
ft "github.com/jbenet/go-ipfs/importer/format"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
@ -20,21 +21,21 @@ type DagReader struct {
|
||||
}
|
||||
|
||||
func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) {
|
||||
pb := new(PBData)
|
||||
pb := new(ft.PBData)
|
||||
err := proto.Unmarshal(n.Data, pb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch pb.GetType() {
|
||||
case PBData_Directory:
|
||||
case ft.PBData_Directory:
|
||||
return nil, ErrIsDir
|
||||
case PBData_File:
|
||||
case ft.PBData_File:
|
||||
return &DagReader{
|
||||
node: n,
|
||||
serv: serv,
|
||||
buf: bytes.NewBuffer(pb.GetData()),
|
||||
}, nil
|
||||
case PBData_Raw:
|
||||
case ft.PBData_Raw:
|
||||
return bytes.NewBuffer(pb.GetData()), nil
|
||||
default:
|
||||
panic("Unrecognized node type!")
|
||||
@ -54,7 +55,7 @@ func (dr *DagReader) precalcNextBuf() error {
|
||||
}
|
||||
nxt = nxtNode
|
||||
}
|
||||
pb := new(PBData)
|
||||
pb := new(ft.PBData)
|
||||
err := proto.Unmarshal(nxt.Data, pb)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -62,13 +63,13 @@ func (dr *DagReader) precalcNextBuf() error {
|
||||
dr.position++
|
||||
|
||||
switch pb.GetType() {
|
||||
case PBData_Directory:
|
||||
case ft.PBData_Directory:
|
||||
panic("Why is there a directory under a file?")
|
||||
case PBData_File:
|
||||
case ft.PBData_File:
|
||||
//TODO: this *should* work, needs testing first
|
||||
//return NewDagReader(nxt, dr.serv)
|
||||
panic("Not yet handling different layers of indirection!")
|
||||
case PBData_Raw:
|
||||
case ft.PBData_Raw:
|
||||
dr.buf = bytes.NewBuffer(pb.GetData())
|
||||
return nil
|
||||
default:
|
||||
|
||||
@ -1,11 +1,8 @@
|
||||
package merkledag
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||
|
||||
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
bserv "github.com/jbenet/go-ipfs/blockservice"
|
||||
@ -37,6 +34,9 @@ type Link struct {
|
||||
// cumulative size of target object
|
||||
Size uint64
|
||||
|
||||
// cumulative size of data stored in object
|
||||
DataSize uint64
|
||||
|
||||
// multihash of the target object
|
||||
Hash mh.Multihash
|
||||
|
||||
@ -46,14 +46,6 @@ type Link struct {
|
||||
|
||||
// AddNodeLink adds a link to another node.
|
||||
func (n *Node) AddNodeLink(name string, that *Node) error {
|
||||
// DEBUG CODE
|
||||
for _, l := range n.Links {
|
||||
if l.Name == name {
|
||||
panic("Trying to add child that already exists!")
|
||||
}
|
||||
}
|
||||
//
|
||||
|
||||
s, err := that.Size()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -73,6 +65,27 @@ func (n *Node) AddNodeLink(name string, that *Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddNodeLink adds a link to another node. without keeping a reference to
|
||||
// the child node
|
||||
func (n *Node) AddNodeLinkClean(name string, that *Node) error {
|
||||
s, err := that.Size()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h, err := that.Multihash()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n.Links = append(n.Links, &Link{
|
||||
Name: name,
|
||||
Size: s,
|
||||
Hash: h,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) RemoveNodeLink(name string) error {
|
||||
for i, l := range n.Links {
|
||||
if l.Name == name {
|
||||
@ -83,6 +96,8 @@ func (n *Node) RemoveNodeLink(name string) error {
|
||||
return u.ErrNotFound
|
||||
}
|
||||
|
||||
// Copy returns a copy of the node.
|
||||
// NOTE: does not make copies of Node objects in the links.
|
||||
func (n *Node) Copy() *Node {
|
||||
nnode := new(Node)
|
||||
nnode.Data = make([]byte, len(n.Data))
|
||||
@ -108,25 +123,6 @@ func (n *Node) Size() (uint64, error) {
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (n *Node) DataSize() (uint64, error) {
|
||||
pbdata := new(PBData)
|
||||
err := proto.Unmarshal(n.Data, pbdata)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
switch pbdata.GetType() {
|
||||
case PBData_Directory:
|
||||
return 0, errors.New("Cant get data size of directory!")
|
||||
case PBData_File:
|
||||
return pbdata.GetFilesize(), nil
|
||||
case PBData_Raw:
|
||||
return uint64(len(pbdata.GetData())), nil
|
||||
default:
|
||||
return 0, errors.New("Unrecognized node data type!")
|
||||
}
|
||||
}
|
||||
|
||||
// Multihash hashes the encoded data of this node.
|
||||
func (n *Node) Multihash() (mh.Multihash, error) {
|
||||
b, err := n.Encoded(false)
|
||||
@ -230,46 +226,3 @@ func (n *DAGService) Get(k u.Key) (*Node, error) {
|
||||
|
||||
return Decoded(b.Data)
|
||||
}
|
||||
|
||||
func FilePBData(data []byte, totalsize uint64) []byte {
|
||||
pbfile := new(PBData)
|
||||
typ := PBData_File
|
||||
pbfile.Type = &typ
|
||||
pbfile.Data = data
|
||||
pbfile.Filesize = proto.Uint64(totalsize)
|
||||
|
||||
data, err := proto.Marshal(pbfile)
|
||||
if err != nil {
|
||||
//this really shouldnt happen, i promise
|
||||
panic(err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func FolderPBData() []byte {
|
||||
pbfile := new(PBData)
|
||||
typ := PBData_Directory
|
||||
pbfile.Type = &typ
|
||||
|
||||
data, err := proto.Marshal(pbfile)
|
||||
if err != nil {
|
||||
//this really shouldnt happen, i promise
|
||||
panic(err)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func WrapData(b []byte) []byte {
|
||||
pbdata := new(PBData)
|
||||
typ := PBData_Raw
|
||||
pbdata.Data = b
|
||||
pbdata.Type = &typ
|
||||
|
||||
out, err := proto.Marshal(pbdata)
|
||||
if err != nil {
|
||||
// This shouldnt happen. seriously.
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@ -2,8 +2,9 @@ package merkledag
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
"testing"
|
||||
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
func TestNode(t *testing.T) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user