Merge pull request #3846 from ipfs/feat/pin-update

implement ipfs pin update
This commit is contained in:
Jeromy Johnson 2017-05-17 22:44:31 -07:00 committed by GitHub
commit 8e2aed3023
8 changed files with 430 additions and 22 deletions

View File

@ -7,7 +7,6 @@ import (
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
dagutils "github.com/ipfs/go-ipfs/merkledag/utils"
path "github.com/ipfs/go-ipfs/path"
)
@ -86,19 +85,7 @@ Example:
return
}
pbobj_a, ok := obj_a.(*dag.ProtoNode)
if !ok {
res.SetError(dag.ErrNotProtobuf, cmds.ErrNormal)
return
}
pbobj_b, ok := obj_b.(*dag.ProtoNode)
if !ok {
res.SetError(dag.ErrNotProtobuf, cmds.ErrNormal)
return
}
changes, err := dagutils.Diff(ctx, node.DAG, pbobj_a, pbobj_b)
changes, err := dagutils.Diff(ctx, node.DAG, obj_a, obj_b)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return

View File

@ -24,9 +24,10 @@ var PinCmd = &cmds.Command{
},
Subcommands: map[string]*cmds.Command{
"add": addPinCmd,
"rm": rmPinCmd,
"ls": listPinCmd,
"add": addPinCmd,
"rm": rmPinCmd,
"ls": listPinCmd,
"update": updatePinCmd,
},
}
@ -332,6 +333,83 @@ Example:
},
}
// updatePinCmd implements the "update" subcommand of the pin command. It
// resolves the two path arguments to cids and asks the node's pinner to move
// the recursive pin from the first to the second. The old pin is dropped
// unless the "unpin" option is set to false (it defaults to true).
var updatePinCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Update a recursive pin",
		ShortDescription: `
Updates one pin to another, making sure that all objects in the new pin are
local. Then removes the old pin. This is an optimized version of adding the
new pin and removing the old one.
`,
	},

	Arguments: []cmds.Argument{
		cmds.StringArg("from-path", true, false, "Path to old object."),
		cmds.StringArg("to-path", true, false, "Path to new object to be pinned."),
	},
	Options: []cmds.Option{
		cmds.BoolOption("unpin", "Remove the old pin.").Default(true),
	},
	Type: PinOutput{},
	Run: func(req cmds.Request, res cmds.Response) {
		n, err := req.InvocContext().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		unpin, _, err := req.Option("unpin").Bool()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// Both arguments are declared as required above, so indexing
		// positions 0 and 1 is expected to be safe here.
		from, err := path.ParsePath(req.Arguments()[0])
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		to, err := path.ParsePath(req.Arguments()[1])
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		fromc, err := core.ResolveToCid(req.Context(), n, from)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		toc, err := core.ResolveToCid(req.Context(), n, to)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		err = n.Pinning.Update(req.Context(), fromc, toc, unpin)
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		// The output carries exactly two entries: the old path, then the new.
		res.SetOutput(&PinOutput{Pins: []string{from.String(), to.String()}})
	},
	Marshalers: cmds.MarshalerMap{
		cmds.Text: func(res cmds.Response) (io.Reader, error) {
			added, ok := res.Output().(*PinOutput)
			if !ok {
				return nil, u.ErrCast()
			}

			// Guard the indexing below: a short or malformed output must
			// produce an error rather than an index-out-of-range panic.
			if len(added.Pins) < 2 {
				return nil, fmt.Errorf("pin update output has %d entries, expected 2", len(added.Pins))
			}

			buf := new(bytes.Buffer)
			fmt.Fprintf(buf, "updated %s to %s\n", added.Pins[0], added.Pins[1])
			return buf, nil
		},
	},
}
// RefKeyObject holds a single Type string.
// NOTE(review): presumably the pin type reported for one key in pin listing
// output — its users are outside this view, confirm before relying on this.
type RefKeyObject struct {
	Type string
}

View File

@ -153,7 +153,7 @@ func (n *dagService) Remove(nd node.Node) error {
// GetLinksDirect creates a function to get the links for a node, from
// the node, bypassing the LinkService. If the node does not exist
// locally (and can not be retrieved) an error will be returned.
func GetLinksDirect(serv DAGService) GetLinks {
func GetLinksDirect(serv node.NodeGetter) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
node, err := serv.Get(ctx, c)
if err != nil {

View File

@ -1,13 +1,14 @@
package dagutils
import (
"context"
"fmt"
"path"
dag "github.com/ipfs/go-ipfs/merkledag"
context "context"
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format"
)
const (
@ -87,7 +88,8 @@ func ApplyChange(ctx context.Context, ds dag.DAGService, nd *dag.ProtoNode, cs [
return e.Finalize(ds)
}
func Diff(ctx context.Context, ds dag.DAGService, a, b *dag.ProtoNode) ([]*Change, error) {
// Diff returns a set of changes that transform node 'a' into node 'b'
func Diff(ctx context.Context, ds dag.DAGService, a, b node.Node) ([]*Change, error) {
if len(a.Links()) == 0 && len(b.Links()) == 0 {
return []*Change{
&Change{
@ -104,7 +106,7 @@ func Diff(ctx context.Context, ds dag.DAGService, a, b *dag.ProtoNode) ([]*Chang
// strip out unchanged stuff
for _, lnk := range a.Links() {
l, err := b.GetNodeLink(lnk.Name)
l, _, err := b.ResolveLink([]string{lnk.Name})
if err == nil {
if l.Cid.Equals(lnk.Cid) {
// no change... ignore it

View File

@ -0,0 +1,91 @@
package dagutils
import (
"context"
"fmt"
mdag "github.com/ipfs/go-ipfs/merkledag"
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format"
)
// DiffEnumerate fetches every object in the graph rooted at 'to' that is not
// part of the graph rooted at 'from'. It is a cheaper way of fetching 'to'
// when the caller can guarantee that all of 'from' is already available.
//
// Links that were replaced (same name, different cid) are handled by
// recursing on the old/new pair; links that are entirely new are enumerated
// in full. Errors from fetching either root are returned wrapped with the cid
// that failed; errors from enumeration are returned as-is.
func DiffEnumerate(ctx context.Context, dserv node.NodeGetter, from, to *cid.Cid) error {
	oldRoot, err := dserv.Get(ctx, from)
	if err != nil {
		return fmt.Errorf("get %s: %s", from, err)
	}

	newRoot, err := dserv.Get(ctx, to)
	if err != nil {
		return fmt.Errorf("get %s: %s", to, err)
	}

	changes := getLinkDiff(oldRoot, newRoot)

	// Everything under 'from' is assumed to be present already, so mark the
	// replaced children as seen up front to avoid enumerating them later.
	seen := cid.NewSet()
	for _, ch := range changes {
		if ch.bef != nil {
			seen.Add(ch.bef)
		}
	}

	for _, ch := range changes {
		if ch.bef != nil {
			// Replaced link: only the difference between old and new matters.
			if err := DiffEnumerate(ctx, dserv, ch.bef, ch.aft); err != nil {
				return err
			}
			continue
		}

		// Newly added link: walk it in full unless it was already covered.
		if seen.Has(ch.aft) {
			continue
		}
		if err := mdag.EnumerateChildrenAsync(ctx, mdag.GetLinksDirect(dserv), ch.aft, seen.Visit); err != nil {
			return err
		}
	}

	return nil
}
// diffpair describes one link-level change between two nodes.
//
// If both bef and aft are non-nil, bef was replaced with aft.
// If bef is nil and aft is not, aft was newly added.
// If aft is nil and bef is not, bef was deleted.
type diffpair struct {
	bef, aft *cid.Cid
}
// getLinkDiff computes the link-level changes needed to go from node 'a' to
// node 'b'. A link of 'b' whose cid already appears among the links of 'a' is
// considered unchanged; a link whose name exists in 'a' under a different cid
// is reported as a replacement; anything else is reported as an addition.
// Deletions are not reported, since the current use case does not need them.
func getLinkDiff(a, b node.Node) []diffpair {
	byCid := make(map[string]*node.Link)
	byName := make(map[string]*node.Link)
	for _, old := range a.Links() {
		byCid[old.Cid.KeyString()] = old
		byName[old.Name] = old
	}

	var changes []diffpair
	for _, cur := range b.Links() {
		// Same cid already linked from 'a' (under any name): nothing to fetch.
		if byCid[cur.Cid.KeyString()] != nil {
			continue
		}

		if prev, ok := byName[cur.Name]; ok {
			changes = append(changes, diffpair{bef: prev.Cid, aft: cur.Cid})
		} else {
			changes = append(changes, diffpair{aft: cur.Cid})
		}
	}
	return changes
}

View File

@ -0,0 +1,190 @@
package dagutils
import (
"context"
"fmt"
"testing"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid"
node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format"
)
// buildNode creates the ProtoNode called 'name' from the graph description
// 'desc'. The node's data is its own name, and one link is added per entry in
// its description. Children that have not been built yet are built
// recursively and memoized in 'out'; the returned node itself is NOT stored
// in 'out' — that is left to the caller. Panics if a link cannot be added.
func buildNode(name string, desc map[string]ndesc, out map[string]node.Node) node.Node {
	nd := new(dag.ProtoNode)
	nd.SetData([]byte(name))

	for linkName, target := range desc[name] {
		child, built := out[target]
		if !built {
			child = buildNode(target, desc, out)
			out[target] = child
		}

		if err := nd.AddNodeLink(linkName, child); err != nil {
			panic(err)
		}
	}

	return nd
}
// ndesc describes one node of a test graph: it maps each link name to the
// name of the node that link points to.
type ndesc map[string]string
// mkGraph builds every node named in 'desc' and returns them keyed by name.
// Nodes already produced as a child of an earlier node are not rebuilt.
func mkGraph(desc map[string]ndesc) map[string]node.Node {
	built := make(map[string]node.Node)
	for name := range desc {
		if _, done := built[name]; !done {
			built[name] = buildNode(name, desc, built)
		}
	}
	return built
}
// tg1 is a test graph in which a2 differs from a1 only by one extra link,
// "bar", pointing at the leaf node c.
var tg1 = map[string]ndesc{
	"a1": {
		"foo": "b",
	},
	"b": {},
	"a2": {
		"foo": "b",
		"bar": "c",
	},
	"c": {},
}
// tg2 is like tg1, except that the node c added in a2 has a child of its
// own, d, reachable through the link "baz".
var tg2 = map[string]ndesc{
	"a1": {
		"foo": "b",
	},
	"b": {},
	"a2": {
		"foo": "b",
		"bar": "c",
	},
	"c": {"baz": "d"},
	"d": {},
}
// tg3 is a test graph in which a1 and a2 share the link "foo" but point the
// link "bar" at different leaf nodes (c in a1, d in a2).
var tg3 = map[string]ndesc{
	"a1": {
		"foo": "b",
		"bar": "c",
	},
	"b": {},
	"a2": {
		"foo": "b",
		"bar": "d",
	},
	"c": {},
	"d": {},
}
// TestDiffEnumBasic stores all of tg1, runs DiffEnumerate from a1 to a2 and
// checks that exactly a1, a2 and the newly linked c were requested, in that
// order.
func TestDiffEnumBasic(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	graph := mkGraph(tg1)

	store := mdtest.Mock()
	logged := &getLogger{ds: store}

	for _, n := range graph {
		if _, err := store.Add(n); err != nil {
			t.Fatal(err)
		}
	}

	if err := DiffEnumerate(ctx, logged, graph["a1"].Cid(), graph["a2"].Cid()); err != nil {
		t.Fatal(err)
	}

	expected := []*cid.Cid{graph["a1"].Cid(), graph["a2"].Cid(), graph["c"].Cid()}
	if err := assertCidList(logged.log, expected); err != nil {
		t.Fatal(err)
	}
}
// getLogger wraps a NodeGetter and records, in call order, the cid of every
// Get that succeeds through it.
type getLogger struct {
	ds  node.NodeGetter
	log []*cid.Cid
}
// Get forwards the request to the wrapped NodeGetter. On success the
// requested cid is appended to the log; failed lookups are not logged.
func (gl *getLogger) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
	fetched, err := gl.ds.Get(ctx, c)
	if err != nil {
		return nil, err
	}

	gl.log = append(gl.log, c)
	return fetched, nil
}
// assertCidList compares the observed cid list 'a' against the expected list
// 'b', element by element and in order. It returns nil when they match, and
// otherwise an error describing the length mismatch or the first differing
// position.
func assertCidList(a, b []*cid.Cid) error {
	if len(a) != len(b) {
		return fmt.Errorf("got different number of cids than expected")
	}

	for i, got := range a {
		// 'b' holds the expected values; report them under the right label.
		if want := b[i]; !got.Equals(want) {
			return fmt.Errorf("expected %s, got %s", want, got)
		}
	}
	return nil
}
// TestDiffEnumFail stores every node of tg2 except d, then checks that
// DiffEnumerate from a1 to a2 reports dag.ErrNotFound, and that only a1, a2
// and c were successfully fetched before the failure.
func TestDiffEnumFail(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	graph := mkGraph(tg2)

	store := mdtest.Mock()
	logged := &getLogger{ds: store}

	// "d" is deliberately left out so that enumerating below c fails.
	for _, name := range []string{"a1", "a2", "b", "c"} {
		if _, err := store.Add(graph[name]); err != nil {
			t.Fatal(err)
		}
	}

	if err := DiffEnumerate(ctx, logged, graph["a1"].Cid(), graph["a2"].Cid()); err != dag.ErrNotFound {
		t.Fatal("expected err not found")
	}

	expected := []*cid.Cid{graph["a1"].Cid(), graph["a2"].Cid(), graph["c"].Cid()}
	if err := assertCidList(logged.log, expected); err != nil {
		t.Fatal(err)
	}
}
// TestDiffEnumRecurse stores all of tg3, where the link "bar" points at c in
// a1 and at d in a2, and checks that DiffEnumerate from a1 to a2 requests
// a1, a2, c and d, in that order.
func TestDiffEnumRecurse(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	graph := mkGraph(tg3)

	store := mdtest.Mock()
	logged := &getLogger{ds: store}

	for _, name := range []string{"a1", "a2", "b", "c", "d"} {
		if _, err := store.Add(graph[name]); err != nil {
			t.Fatal(err)
		}
	}

	if err := DiffEnumerate(ctx, logged, graph["a1"].Cid(), graph["a2"].Cid()); err != nil {
		t.Fatal(err)
	}

	expected := []*cid.Cid{graph["a1"].Cid(), graph["a2"].Cid(), graph["c"].Cid(), graph["d"].Cid()}
	if err := assertCidList(logged.log, expected); err != nil {
		t.Fatal(err)
	}
}

View File

@ -10,6 +10,7 @@ import (
"time"
mdag "github.com/ipfs/go-ipfs/merkledag"
dutils "github.com/ipfs/go-ipfs/merkledag/utils"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
@ -86,6 +87,11 @@ type Pinner interface {
Pin(context.Context, node.Node, bool) error
Unpin(context.Context, *cid.Cid, bool) error
// Update updates a recursive pin from one cid to another
// this is more efficient than simply pinning the new one and unpinning the
// old one
Update(ctx context.Context, from, to *cid.Cid, unpin bool) error
// Check if a set of keys are pinned, more efficient than
// calling IsPinned for each key
CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error)
@ -94,6 +100,7 @@ type Pinner interface {
// care! If used improperly, garbage collection may not be
// successful.
PinWithMode(*cid.Cid, PinMode)
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
@ -447,6 +454,26 @@ func (p *pinner) RecursiveKeys() []*cid.Cid {
return p.recursePin.Keys()
}
// Update moves a recursive pin from the cid 'from' to the cid 'to'. It
// returns an error if 'from' is not currently recursively pinned, or if the
// objects that 'to' adds on top of 'from' cannot be enumerated. On success
// 'to' is recursively pinned, and 'from' is unpinned when 'unpin' is true.
// The pinner lock is held for the whole operation.
func (p *pinner) Update(ctx context.Context, from, to *cid.Cid, unpin bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if !p.recursePin.Has(from) {
		return fmt.Errorf("'from' cid was not recursively pinned already")
	}

	// Updating a pin to itself is a no-op. Without this check the Add/Remove
	// pair below would remove the pin entirely when unpin is true.
	if from.Equals(to) {
		return nil
	}

	err := dutils.DiffEnumerate(ctx, p.dserv, from, to)
	if err != nil {
		return err
	}

	p.recursePin.Add(to)
	if unpin {
		p.recursePin.Remove(from)
	}
	return nil
}
// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush() error {
p.lock.Lock()

View File

@ -1,6 +1,7 @@
package pin
import (
"context"
"testing"
"time"
@ -9,7 +10,6 @@ import (
"github.com/ipfs/go-ipfs/exchange/offline"
mdag "github.com/ipfs/go-ipfs/merkledag"
context "context"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
dssync "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
"gx/ipfs/QmWbjfz3u6HkAdPh34dgPchGbQjob6LXLhAeCGii2TX69n/go-ipfs-util"
@ -367,3 +367,36 @@ func TestPinRecursiveFail(t *testing.T) {
t.Fatal(err)
}
}
// TestPinUpdate pins one random node recursively, updates the pin to a second
// node with unpin=true and checks that only the second is pinned, then
// updates back with unpin=false and checks that both are pinned.
func TestPinUpdate(t *testing.T) {
	dstore := dssync.MutexWrap(ds.NewMapDatastore())
	bstore := blockstore.NewBlockstore(dstore)
	bserv := bs.New(bstore, offline.Exchange(bstore))

	dserv := mdag.NewDAGService(bserv)
	p := NewPinner(dstore, dserv, dserv)
	n1, c1 := randNode()
	n2, c2 := randNode()

	// Fail fast if the fixture nodes cannot be stored; otherwise the error
	// would only show up later as a confusing pin or update failure.
	if _, err := dserv.Add(n1); err != nil {
		t.Fatal(err)
	}
	if _, err := dserv.Add(n2); err != nil {
		t.Fatal(err)
	}

	ctx := context.Background()
	if err := p.Pin(ctx, n1, true); err != nil {
		t.Fatal(err)
	}

	if err := p.Update(ctx, c1, c2, true); err != nil {
		t.Fatal(err)
	}

	assertPinned(t, p, c2, "c2 should be pinned now")
	assertUnpinned(t, p, c1, "c1 should no longer be pinned")

	if err := p.Update(ctx, c2, c1, false); err != nil {
		t.Fatal(err)
	}

	assertPinned(t, p, c2, "c2 should be pinned still")
	assertPinned(t, p, c1, "c1 should be pinned now")
}