From 18ada93ec333dfd475139395bbe51e48fa5e61fb Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 30 Oct 2014 03:13:08 +0000 Subject: [PATCH] rewrite add command to use dagwriter, moved a pinner into the dagwriter for inline pinning --- core/commands/add.go | 4 ++-- importer/importer.go | 41 +++++++++++++++++++++++++++++++++++++++++ pin/pin.go | 26 ++++++++++++++++++++++++++ unixfs/io/dagwriter.go | 12 ++++++++++-- 4 files changed, 79 insertions(+), 4 deletions(-) diff --git a/core/commands/add.go b/core/commands/add.go index b6d1708f8..8bf5a66a1 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -87,7 +87,7 @@ func addDir(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node } func addFile(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node, error) { - root, err := importer.NewDagFromFile(fpath) + root, err := importer.NewDagFromFileWServer(fpath, n.DAG, n.Pinning) if err != nil { return nil, err } @@ -98,7 +98,7 @@ func addFile(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Nod log.Info("adding subblock: %s %s", l.Name, l.Hash.B58String()) } - return root, addNode(n, root, fpath, out) + return root, nil } // addNode adds the node to the graph + local storage diff --git a/importer/importer.go b/importer/importer.go index 0a4d9848e..b417555de 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -1,13 +1,16 @@ package importer import ( + "errors" "fmt" "io" "os" "github.com/jbenet/go-ipfs/importer/chunk" dag "github.com/jbenet/go-ipfs/merkledag" + "github.com/jbenet/go-ipfs/pin" ft "github.com/jbenet/go-ipfs/unixfs" + uio "github.com/jbenet/go-ipfs/unixfs/io" "github.com/jbenet/go-ipfs/util" ) @@ -72,3 +75,41 @@ func NewDagFromFile(fpath string) (*dag.Node, error) { return NewDagFromReader(f) } + +func NewDagFromFileWServer(fpath string, dserv dag.DAGService, p pin.Pinner) (*dag.Node, error) { + stat, err := os.Stat(fpath) + if err != nil { + return nil, err + } + + if stat.IsDir() { + return nil, fmt.Errorf("`%s` is a directory", fpath) + } + + f, err := os.Open(fpath) + if err != nil { + return nil, err + } + defer f.Close() + + return NewDagFromReaderWServer(f, dserv, p) +} + +func NewDagFromReaderWServer(r io.Reader, dserv dag.DAGService, p pin.Pinner) (*dag.Node, error) { + dw := uio.NewDagWriter(dserv, chunk.DefaultSplitter) + + mp, ok := p.(pin.ManualPinner) + if !ok { + return nil, errors.New("Needed to be passed a manual pinner!") + } + dw.Pinner = mp + _, err := io.Copy(dw, r) + if err != nil { + return nil, err + } + err = dw.Close() + if err != nil { + return nil, err + } + return dw.GetNode(), nil +} diff --git a/pin/pin.go b/pin/pin.go index a3f0e260b..dba14a977 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -20,6 +20,14 @@ var recursePinDatastoreKey = ds.NewKey("/local/pins/recursive/keys") var directPinDatastoreKey = ds.NewKey("/local/pins/direct/keys") var indirectPinDatastoreKey = ds.NewKey("/local/pins/indirect/keys") +type PinMode int + +const ( + Recursive PinMode = iota + Direct + Indirect +) + type Pinner interface { IsPinned(util.Key) bool Pin(*mdag.Node, bool) error @@ -27,6 +35,13 @@ type Pinner interface { Flush() error } +// ManualPinner is for manually editing the pin structure +// Use with care +type ManualPinner interface { + PinWithMode(util.Key, PinMode) + Pinner +} + type pinner struct { lock sync.RWMutex recursePin set.BlockSet @@ -228,3 +243,14 @@ func loadSet(d ds.Datastore, k ds.Key, val interface{}) error { } return json.Unmarshal(bf, val) } + +func (p *pinner) PinWithMode(k util.Key, mode PinMode) { + switch mode { + case Recursive: + p.recursePin.AddBlock(k) + case Direct: + p.directPin.AddBlock(k) + case Indirect: + p.indirPin.Increment(k) + } +} diff --git a/unixfs/io/dagwriter.go b/unixfs/io/dagwriter.go index c9b91cc58..6575b1edf 100644 --- a/unixfs/io/dagwriter.go +++ b/unixfs/io/dagwriter.go @@ -3,6 +3,7 @@ package io import ( "github.com/jbenet/go-ipfs/importer/chunk" dag "github.com/jbenet/go-ipfs/merkledag" + "github.com/jbenet/go-ipfs/pin" ft "github.com/jbenet/go-ipfs/unixfs" "github.com/jbenet/go-ipfs/util" ) @@ -17,6 +18,7 @@ type DagWriter struct { done chan struct{} splitter chunk.BlockSplitter seterr error + Pinner pin.ManualPinner } func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter { @@ -48,7 +50,10 @@ func (dw *DagWriter) startSplitter() { // Store the block size in the root node mbf.AddBlockSize(uint64(len(blkData))) node := &dag.Node{Data: ft.WrapData(blkData)} - _, err := dw.dagserv.Add(node) + nk, err := dw.dagserv.Add(node) + if dw.Pinner != nil { + dw.Pinner.PinWithMode(nk, pin.Indirect) + } if err != nil { dw.seterr = err log.Critical("Got error adding created node to dagservice: %s", err) @@ -75,12 +80,15 @@ func (dw *DagWriter) startSplitter() { root.Data = data // Add root node to the dagservice - _, err = dw.dagserv.Add(root) + rootk, err := dw.dagserv.Add(root) if err != nil { dw.seterr = err log.Critical("Got error adding created node to dagservice: %s", err) return } + if dw.Pinner != nil { + dw.Pinner.PinWithMode(rootk, pin.Recursive) + } dw.node = root dw.done <- struct{}{} }