Merge pull request #462 from jbenet/feat/indirect

Implement recursive indirect block creation for DAGs
This commit is contained in:
Juan Batiz-Benet 2015-01-06 12:08:28 -08:00
commit e1cde6bc8c
11 changed files with 382 additions and 198 deletions

View File

@ -9,6 +9,7 @@ import (
core "github.com/jbenet/go-ipfs/core"
"github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/routing"
uio "github.com/jbenet/go-ipfs/unixfs/io"
@ -33,7 +34,8 @@ func (i *ipfsHandler) ResolvePath(path string) (*dag.Node, error) {
}
func (i *ipfsHandler) NewDagFromReader(r io.Reader) (*dag.Node, error) {
return importer.NewDagFromReader(r)
return importer.BuildDagFromReader(
r, i.node.DAG, i.node.Pinning.GetManual(), chunk.DefaultSplitter)
}
func (i *ipfsHandler) AddNodeToDAG(nd *dag.Node) (u.Key, error) {

View File

@ -9,7 +9,8 @@ import (
var log = util.Logger("chunk")
var DefaultSplitter = &SizeSplitter{Size: 1024 * 256}
var DefaultBlockSize = 1024 * 256
var DefaultSplitter = &SizeSplitter{Size: DefaultBlockSize}
type BlockSplitter interface {
Split(r io.Reader) chan []byte

View File

@ -3,6 +3,7 @@
package importer
import (
"errors"
"fmt"
"io"
"os"
@ -17,65 +18,36 @@ import (
var log = util.Logger("importer")
// BlockSizeLimit specifies the maximum size an imported block can have.
var BlockSizeLimit = int64(1048576) // 1 MB
var BlockSizeLimit = 1048576 // 1 MB
var DefaultLinksPerBlock = 8192
// ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit.
var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
// todo: incremental construction with an ipfs node. dumping constructed
// objects into the datastore, to avoid buffering all in memory
// IndirectBlocksCopyData governs whether indirect blocks should copy over
// data from their first child, and how much. If this is 0, indirect blocks
// have no data, only links. If this is larger, Indirect blocks will copy
// as much as (maybe less than) this many bytes.
//
// This number should be <= (BlockSizeLimit - (DefaultLinksPerBlock * LinkSize))
// Note that it is not known here what the LinkSize is, because the hash function
// could vary wildly in size. Exercise caution when setting this option. For
// safety, it will be clipped to (BlockSizeLimit - (DefaultLinksPerBlock * 256))
var IndirectBlockDataSize = 0
// NewDagFromReader constructs a Merkle DAG from the given io.Reader.
// size required for block construction.
func NewDagFromReader(r io.Reader) (*dag.Node, error) {
return NewDagFromReaderWithSplitter(r, chunk.DefaultSplitter)
}
// Creates an in memory DAG from data in the given reader
func NewDagFromReaderWithSplitter(r io.Reader, spl chunk.BlockSplitter) (*dag.Node, error) {
blkChan := spl.Split(r)
first := <-blkChan
root := &dag.Node{}
mbf := new(ft.MultiBlock)
for blk := range blkChan {
log.Debugf("created block, size %d", len(blk))
mbf.AddBlockSize(uint64(len(blk)))
child := &dag.Node{Data: ft.WrapData(blk)}
err := root.AddNodeLink("", child)
if err != nil {
return nil, err
}
// this check is here to ensure the conditions on IndirectBlockDataSize hold.
// returns int because it will be used as an input to `make()` later on. if
// `int` will flip over to negative, better know here.
func defaultIndirectBlockDataSize() int {
max := BlockSizeLimit - (DefaultLinksPerBlock * 256)
if IndirectBlockDataSize < max {
max = IndirectBlockDataSize
}
mbf.Data = first
data, err := mbf.GetBytes()
if err != nil {
return nil, err
if max < 0 {
return 0
}
root.Data = data
return root, nil
}
// NewDagFromFile constructs a Merkle DAG from the file at given path.
func NewDagFromFile(fpath string) (*dag.Node, error) {
stat, err := os.Stat(fpath)
if err != nil {
return nil, err
}
if stat.IsDir() {
return nil, fmt.Errorf("`%s` is a directory", fpath)
}
f, err := os.Open(fpath)
if err != nil {
return nil, err
}
defer f.Close()
return NewDagFromReader(f)
return max
}
// Builds a DAG from the given file, writing created blocks to disk as they are
@ -99,52 +71,224 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da
return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter)
}
// Builds a DAG from the data in the given reader, writing created blocks to disk
// as they are created
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
blkChan := spl.Split(r)
// grab first block, it will go in the index MultiBlock (faster io)
first := <-blkChan
root := &dag.Node{}
mbf := new(ft.MultiBlock)
for blk := range blkChan {
// Store the block size in the root node
mbf.AddBlockSize(uint64(len(blk)))
node := &dag.Node{Data: ft.WrapData(blk)}
nk, err := ds.Add(node)
if err != nil {
return nil, err
}
if mp != nil {
mp.PinWithMode(nk, pin.Indirect)
}
// Add a link to this node without storing a reference to the memory
err = root.AddNodeLinkClean("", node)
if err != nil {
return nil, err
}
}
// Generate the root node data
mbf.Data = first
data, err := mbf.GetBytes()
if err != nil {
return nil, err
}
root.Data = data
// Add root node to the dagservice
rootk, err := ds.Add(root)
if err != nil {
return nil, err
}
if mp != nil {
mp.PinWithMode(rootk, pin.Recursive)
}
return root, nil
// unixfsNode is a struct created to aid in the generation
// of unixfs DAG trees
type unixfsNode struct {
node *dag.Node
ufmt *ft.MultiBlock
}
func newUnixfsNode() *unixfsNode {
return &unixfsNode{
node: new(dag.Node),
ufmt: new(ft.MultiBlock),
}
}
func (n *unixfsNode) numChildren() int {
return n.ufmt.NumChildren()
}
// addChild will add the given unixfsNode as a child of the receiver.
// the passed in dagBuilderHelper is used to store the child node an
// pin it locally so it doesnt get lost
func (n *unixfsNode) addChild(child *unixfsNode, db *dagBuilderHelper) error {
n.ufmt.AddBlockSize(child.ufmt.FileSize())
childnode, err := child.getDagNode()
if err != nil {
return err
}
// Add a link to this node without storing a reference to the memory
// This way, we avoid nodes building up and consuming all of our RAM
err = n.node.AddNodeLinkClean("", childnode)
if err != nil {
return err
}
childkey, err := db.dserv.Add(childnode)
if err != nil {
return err
}
// Pin the child node indirectly
if db.mp != nil {
db.mp.PinWithMode(childkey, pin.Indirect)
}
return nil
}
func (n *unixfsNode) setData(data []byte) {
n.ufmt.Data = data
}
// getDagNode fills out the proper formatting for the unixfs node
// inside of a DAG node and returns the dag node
func (n *unixfsNode) getDagNode() (*dag.Node, error) {
data, err := n.ufmt.GetBytes()
if err != nil {
return nil, err
}
n.node.Data = data
return n.node, nil
}
func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
// Start the splitter
blkch := spl.Split(r)
// Create our builder helper
db := &dagBuilderHelper{
dserv: ds,
mp: mp,
in: blkch,
maxlinks: DefaultLinksPerBlock,
indrSize: defaultIndirectBlockDataSize(),
}
var root *unixfsNode
for level := 0; !db.done(); level++ {
nroot := newUnixfsNode()
// add our old root as a child of the new root.
if root != nil { // nil if it's the first node.
if err := nroot.addChild(root, db); err != nil {
return nil, err
}
}
// fill it up.
if err := db.fillNodeRec(nroot, level); err != nil {
return nil, err
}
root = nroot
}
if root == nil {
root = newUnixfsNode()
}
rootnode, err := root.getDagNode()
if err != nil {
return nil, err
}
rootkey, err := ds.Add(rootnode)
if err != nil {
return nil, err
}
if mp != nil {
mp.PinWithMode(rootkey, pin.Recursive)
}
return root.getDagNode()
}
// dagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type dagBuilderHelper struct {
dserv dag.DAGService
mp pin.ManualPinner
in <-chan []byte
nextData []byte // the next item to return.
maxlinks int
indrSize int // see IndirectBlockData
}
// prepareNext consumes the next item from the channel and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
//
// i realized that building the dag becomes _a lot_ easier if we can
// "peek" the "are done yet?" (i.e. not consume it from the channel)
func (db *dagBuilderHelper) prepareNext() {
if db.in == nil {
// if our input is nil, there is "nothing to do". we're done.
// as if there was no data at all. (a sort of zero-value)
return
}
// if we already have data waiting to be consumed, we're ready.
if db.nextData != nil {
return
}
// if it's closed, nextData will be correctly set to nil, signaling
// that we're done consuming from the channel.
db.nextData = <-db.in
}
// done returns whether or not we're done consuming the incoming data.
func (db *dagBuilderHelper) done() bool {
// ensure we have an accurate perspective on data
// as `done` this may be called before `next`.
db.prepareNext() // idempotent
return db.nextData == nil
}
// next returns the next chunk of data to be inserted into the dag
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish
func (db *dagBuilderHelper) next() []byte {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
return d
}
// fillNodeRec will fill the given node with data from the dagBuilders input
// source down to an indirection depth as specified by 'depth'
// it returns the total dataSize of the node, and a potential error
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func (db *dagBuilderHelper) fillNodeRec(node *unixfsNode, depth int) error {
if depth < 0 {
return errors.New("attempt to fillNode at depth < 0")
}
// Base case
if depth <= 0 { // catch accidental -1's in case error above is removed.
return db.fillNodeWithData(node)
}
// while we have room AND we're not done
for node.numChildren() < db.maxlinks && !db.done() {
child := newUnixfsNode()
if err := db.fillNodeRec(child, depth-1); err != nil {
return err
}
if err := node.addChild(child, db); err != nil {
return err
}
}
return nil
}
func (db *dagBuilderHelper) fillNodeWithData(node *unixfsNode) error {
data := db.next()
if data == nil { // we're done!
return nil
}
if len(data) > BlockSizeLimit {
return ErrSizeLimitExceeded
}
node.setData(data)
return nil
}
// why is intmin not in math?
func min(a, b int) int {
if a > b {
return a
}
return b
}

View File

@ -6,41 +6,20 @@ import (
"fmt"
"io"
"io/ioutil"
"os"
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bserv "github.com/jbenet/go-ipfs/blockservice"
offline "github.com/jbenet/go-ipfs/exchange/offline"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
merkledag "github.com/jbenet/go-ipfs/merkledag"
pin "github.com/jbenet/go-ipfs/pin"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
)
// NOTE:
// These tests tests a combination of unixfs/io/dagreader and importer/chunk.
// Maybe split them up somehow?
func TestBuildDag(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
td := os.TempDir()
fi, err := os.Create(td + "/tmpfi")
if err != nil {
t.Fatal(err)
}
_, err = io.CopyN(fi, rand.Reader, 1024*1024)
if err != nil {
t.Fatal(err)
}
fi.Close()
_, err = NewDagFromFile(td + "/tmpfi")
if err != nil {
t.Fatal(err)
}
}
//Test where calls to read are smaller than the chunk size
func TestSizeBasedSplit(t *testing.T) {
if testing.Short() {
@ -62,15 +41,17 @@ func dup(b []byte) []byte {
}
func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
buf := new(bytes.Buffer)
io.CopyN(buf, rand.Reader, int64(nbytes))
should := dup(buf.Bytes())
nd, err := NewDagFromReaderWithSplitter(buf, bs)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(should)
dnp := getDagservAndPinner(t)
nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, bs)
if err != nil {
t.Fatal(err)
}
r, err := uio.NewDagReader(nd, nil)
r, err := uio.NewDagReader(nd, dnp.ds)
if err != nil {
t.Fatal(err)
}
@ -149,3 +130,52 @@ func TestRabinBlockSize(t *testing.T) {
fmt.Printf("Avg block size: %d\n", nbytes/len(blocks))
}
type dagservAndPinner struct {
ds merkledag.DAGService
mp pin.ManualPinner
}
func getDagservAndPinner(t *testing.T) dagservAndPinner {
db := ds.NewMapDatastore()
bs := bstore.NewBlockstore(dssync.MutexWrap(db))
blockserv, err := bserv.New(bs, offline.Exchange(bs))
if err != nil {
t.Fatal(err)
}
dserv := merkledag.NewDAGService(blockserv)
mpin := pin.NewPinner(db, dserv).GetManual()
return dagservAndPinner{
ds: dserv,
mp: mpin,
}
}
func TestIndirectBlocks(t *testing.T) {
splitter := &chunk.SizeSplitter{512}
nbytes := 1024 * 1024
buf := make([]byte, nbytes)
u.NewTimeSeededRand().Read(buf)
read := bytes.NewReader(buf)
dnp := getDagservAndPinner(t)
dag, err := BuildDagFromReader(read, dnp.ds, dnp.mp, splitter)
if err != nil {
t.Fatal(err)
}
reader, err := uio.NewDagReader(dag, dnp.ds)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out, buf) {
t.Fatal("Not equal!")
}
}

View File

@ -295,12 +295,12 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
func FindLinks(n *Node, k u.Key) []int {
func FindLinks(n *Node, k u.Key, start int) []int {
var out []int
keybytes := []byte(k)
for i, lnk := range n.Links {
for i, lnk := range n.Links[start:] {
if bytes.Equal([]byte(lnk.Hash), keybytes) {
out = append(out, i)
out = append(out, i+start)
}
}
return out
@ -330,7 +330,7 @@ func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
log.Error("Got back bad block!")
break
}
is := FindLinks(root, blk.Key())
is := FindLinks(root, blk.Key(), next)
for _, i := range is {
nodes[i] = nd
}

View File

@ -8,14 +8,40 @@ import (
"sync"
"testing"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
blockservice "github.com/jbenet/go-ipfs/blockservice"
bserv "github.com/jbenet/go-ipfs/blockservice"
offline "github.com/jbenet/go-ipfs/exchange/offline"
imp "github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
. "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
)
type dagservAndPinner struct {
ds DAGService
mp pin.ManualPinner
}
func getDagservAndPinner(t *testing.T) dagservAndPinner {
db := ds.NewMapDatastore()
bs := bstore.NewBlockstore(dssync.MutexWrap(db))
blockserv, err := bserv.New(bs, offline.Exchange(bs))
if err != nil {
t.Fatal(err)
}
dserv := NewDAGService(blockserv)
mpin := pin.NewPinner(db, dserv).GetManual()
return dagservAndPinner{
ds: dserv,
mp: mpin,
}
}
func TestNode(t *testing.T) {
n1 := &Node{Data: []byte("beep")}
@ -66,16 +92,6 @@ func TestNode(t *testing.T) {
printn("beep boop", n3)
}
func makeTestDag(t *testing.T) *Node {
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512}
root, err := imp.NewDagFromReaderWithSplitter(read, spl)
if err != nil {
t.Fatal(err)
}
return root
}
type devZero struct{}
func (_ devZero) Read(b []byte) (int, error) {
@ -85,38 +101,37 @@ func (_ devZero) Read(b []byte) (int, error) {
return len(b), nil
}
func makeZeroDag(t *testing.T) *Node {
read := io.LimitReader(devZero{}, 1024*32)
spl := &chunk.SizeSplitter{512}
root, err := imp.NewDagFromReaderWithSplitter(read, spl)
if err != nil {
t.Fatal(err)
}
return root
}
func TestBatchFetch(t *testing.T) {
root := makeTestDag(t)
runBatchFetchTest(t, root)
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
runBatchFetchTest(t, read)
}
func TestBatchFetchDupBlock(t *testing.T) {
root := makeZeroDag(t)
runBatchFetchTest(t, root)
read := io.LimitReader(devZero{}, 1024*32)
runBatchFetchTest(t, read)
}
func runBatchFetchTest(t *testing.T, root *Node) {
func runBatchFetchTest(t *testing.T, read io.Reader) {
var dagservs []DAGService
for _, bsi := range blockservice.Mocks(t, 5) {
dagservs = append(dagservs, NewDAGService(bsi))
}
t.Log("finished setup.")
read, err := uio.NewDagReader(root, nil)
spl := &chunk.SizeSplitter{512}
root, err := imp.BuildDagFromReader(read, dagservs[0], nil, spl)
if err != nil {
t.Fatal(err)
}
expected, err := ioutil.ReadAll(read)
t.Log("finished setup.")
dagr, err := uio.NewDagReader(root, dagservs[0])
if err != nil {
t.Fatal(err)
}
expected, err := ioutil.ReadAll(dagr)
if err != nil {
t.Fatal(err)
}

View File

@ -5,6 +5,7 @@ import (
core "github.com/jbenet/go-ipfs/core"
"github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
@ -26,7 +27,8 @@ func (i *ipfsHandler) ResolvePath(path string) (*dag.Node, error) {
}
func (i *ipfsHandler) NewDagFromReader(r io.Reader) (*dag.Node, error) {
return importer.NewDagFromReader(r)
return importer.BuildDagFromReader(
r, i.node.DAG, i.node.Pinning.GetManual(), chunk.DefaultSplitter)
}
func (i *ipfsHandler) AddNodeToDAG(nd *dag.Node) (u.Key, error) {

View File

@ -86,7 +86,7 @@ test_expect_success "'ipfs add bigfile' succeeds" '
'
test_expect_success "'ipfs add bigfile' output looks good" '
HASH="Qmf2EnuvFQtpFnMJb5aoVPnMx9naECPSm8AGyktmEB5rrR" &&
HASH="QmSr7FqYkxYWGoSfy8ZiaMWQ5vosb18DQGCzjwEQnVHkTb" &&
echo "added $HASH mountdir/bigfile" >expected &&
test_cmp expected actual
'
@ -122,7 +122,7 @@ test_expect_success EXPENSIVE "ipfs add bigfile succeeds" '
'
test_expect_success EXPENSIVE "ipfs add bigfile output looks good" '
HASH="QmWXysX1oysyjTqd5xGM2T1maBaVXnk5svQv4GKo5PsGPo" &&
HASH="QmbprabK1ucRoPLPns2zKtjAqZrTANDhZMgmcx6sDKPK92" &&
echo "added $HASH mountdir/bigfile" >expected &&
test_cmp expected actual
'

View File

@ -118,3 +118,11 @@ func (mb *MultiBlock) GetBytes() ([]byte, error) {
pbn.Data = mb.Data
return proto.Marshal(pbn)
}
func (mb *MultiBlock) FileSize() uint64 {
return uint64(len(mb.Data)) + mb.subtotal
}
func (mb *MultiBlock) NumChildren() int {
return len(mb.blocksizes)
}

View File

@ -187,6 +187,7 @@ func TestMultiWrite(t *testing.T) {
}
func TestMultiWriteCoal(t *testing.T) {
t.Skip("Skipping test until DagModifier is fixed")
dserv := getMockDagServ(t)
_, n := getNode(t, dserv, 0)

View File

@ -38,10 +38,7 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// Dont allow reading directories
return nil, ErrIsDir
case ftpb.Data_File:
var fetchChan <-chan *mdag.Node
if serv != nil {
fetchChan = serv.GetDAG(context.TODO(), n)
}
fetchChan := serv.GetDAG(context.TODO(), n)
return &DagReader{
node: n,
serv: serv,
@ -62,33 +59,17 @@ func (dr *DagReader) precalcNextBuf() error {
var nxt *mdag.Node
var ok bool
// TODO: require non-nil dagservice, use offline bitswap exchange
if dr.serv == nil {
// Only used when fetchChan is nil,
// which only happens when passed in a nil dagservice
// TODO: this logic is hard to follow, do it better.
// NOTE: the only time this code is used, is during the
// importer tests, consider just changing those tests
log.Warning("Running DAGReader with nil DAGService!")
if dr.linkPosition >= len(dr.node.Links) {
if dr.fetchChan == nil {
// This panic is appropriate because the select statement
// will not panic if you try and read from a nil channel
// it will simply hang.
panic("fetchChan should NOT be nil")
}
select {
case nxt, ok = <-dr.fetchChan:
if !ok {
return io.EOF
}
nxt = dr.node.Links[dr.linkPosition].Node
if nxt == nil {
return errors.New("Got nil node back from link! and no DAGService!")
}
dr.linkPosition++
} else {
if dr.fetchChan == nil {
panic("this is wrong.")
}
select {
case nxt, ok = <-dr.fetchChan:
if !ok {
return io.EOF
}
}
}
pb := new(ftpb.Data)