From 006b68b558c0de83db2c5519caae4a4cfc3a46ea Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 26 Sep 2014 00:09:27 -0700 Subject: [PATCH] WIP: getting closer to being able to write in ipns dirs --- fuse/ipns/ipns_unix.go | 22 +++++++++++++++++----- importer/importer.go | 23 ++++++++++++++++++++++- merkledag/merkledag.go | 38 +++++++++++++++++++++++++++++++------- routing/dht/routing.go | 2 +- 4 files changed, 71 insertions(+), 14 deletions(-) diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index d8cff4413..56e2dfb8e 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -296,23 +296,35 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error { // This operation holds everything in memory, // should be changed to stream the block creation/storage // but for now, since the buf is all in memory anyways... - nnode, err := imp.NewDagFromReader(n.dataBuf) + err := imp.NewDagInNode(n.dataBuf, n.Nd) if err != nil { log.Error("ipns Flush error: %s", err) // return fuse.EVERYBAD return fuse.ENODATA } - err = n.Ipfs.DAG.AddRecursive(nnode) + var root *Node + if n.nsRoot != nil { + root = n.nsRoot + } else { + root = n + } + + err = root.Nd.Update() + if err != nil { + log.Error("ipns dag tree update failed: %s", err) + return fuse.ENODATA + } + + err = n.Ipfs.DAG.AddRecursive(root.Nd) if err != nil { log.Critical("ipns Dag Add Error: %s", err) } - n.Nd = nnode n.changed = false n.dataBuf = nil - ndkey, err := nnode.Key() + ndkey, err := root.Nd.Key() if err != nil { log.Error("getKey error: %s", err) // return fuse.ETHISREALLYSUCKS @@ -320,7 +332,7 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error { } log.Debug("Publishing changes!") - err = n.Ipfs.Publisher.Publish(n.key, ndkey) + err = n.Ipfs.Publisher.Publish(root.key, ndkey) if err != nil { log.Error("ipns Publish Failed: %s", err) } diff --git a/importer/importer.go b/importer/importer.go index 197eaef19..5a60167ac 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -14,13 +14,15 @@ var BlockSizeLimit = int64(1048576) // 1 MB // ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit. var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded") +var DefaultSplitter = &SizeSplitter{1024 * 512} + // todo: incremental construction with an ipfs node. dumping constructed // objects into the datastore, to avoid buffering all in memory // NewDagFromReader constructs a Merkle DAG from the given io.Reader. // size required for block construction. func NewDagFromReader(r io.Reader) (*dag.Node, error) { - return NewDagFromReaderWithSplitter(r, &SizeSplitter{1024 * 512}) + return NewDagFromReaderWithSplitter(r, DefaultSplitter) } func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, error) { @@ -58,3 +60,22 @@ func NewDagFromFile(fpath string) (*dag.Node, error) { return NewDagFromReader(f) } + +// TODO: this needs a better name +func NewDagInNode(r io.Reader, n *dag.Node) error { + n.Links = nil + + blkChan := DefaultSplitter.Split(r) + first := <-blkChan + n.Data = first + + for blk := range blkChan { + child := &dag.Node{Data: dag.WrapData(blk)} + err := n.AddNodeLink("", child) + if err != nil { + return err + } + } + + return nil +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 1cc262783..4c57fb95c 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -12,7 +12,7 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -var log = logging.MustGetLogger("commands") +var log = logging.MustGetLogger("merkledag") // NodeMap maps u.Keys to Nodes. // We cannot use []byte/Multihash for keys :( @@ -96,6 +96,31 @@ func (n *Node) Key() (u.Key, error) { return u.Key(h), err } +// Recursively update all hash links and size values in the tree +func (n *Node) Update() error { + log.Debug("node update") + for _, l := range n.Links { + if l.Node != nil { + err := l.Node.Update() + if err != nil { + return err + } + nhash, err := l.Node.Multihash() + if err != nil { + return err + } + l.Hash = nhash + size, err := l.Node.Size() + if err != nil { + return err + } + l.Size = size + } + } + _, err := n.Encoded(true) + return err +} + // DAGService is an IPFS Merkle DAG service. // - the root is virtual (like a forest) // - stores nodes' data in a BlockService @@ -134,12 +159,11 @@ func (n *DAGService) AddRecursive(nd *Node) error { } for _, link := range nd.Links { - if link.Node == nil { - panic("Why does this node have a nil link?\n") - } - err := n.AddRecursive(link.Node) - if err != nil { - return err + if link.Node != nil { + err := n.AddRecursive(link.Node) + if err != nil { + return err + } } } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 65e4e3b54..e7f98ad4e 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -18,7 +18,7 @@ import ( // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error { - log.Debug("[%s] PutValue %v %v", dht.self.ID.Pretty(), key, value) + log.Debug("[%s] PutValue %v %v", dht.self.ID.Pretty(), key.Pretty(), value) err := dht.putLocal(key, value) if err != nil { return err