Merge pull request #6771 from MichaelMure/extract-pinner

extract the pinner to go-ipfs-pinner and dagutils into go-merkledag
This commit is contained in:
Steven Allen 2019-12-02 15:57:26 -05:00 committed by GitHub
commit eb11f569b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 73 additions and 2827 deletions

View File

@ -48,9 +48,6 @@ ifneq ($(filter coverage% clean distclean test/unit/gotest.junit.xml,$(MAKECMDGO
include $(dir)/Rules.mk
endif
dir := pin/internal/pb
include $(dir)/Rules.mk
# -------------------- #
# universal rules #
# -------------------- #

View File

@ -6,10 +6,9 @@ import (
"fmt"
"io"
"github.com/ipfs/go-ipfs/pin"
cid "github.com/ipfs/go-cid"
bs "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-pinner"
)
// RemovedBlock is used to represent the result of removing a block.

View File

@ -4,11 +4,11 @@ import (
"fmt"
"io"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/dagutils"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-merkledag/dagutils"
path "github.com/ipfs/interface-go-ipfs-core/path"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
)
const (
@ -70,7 +70,7 @@ Example:
out := make([]*dagutils.Change, len(changes))
for i, change := range changes {
out[i] = &dagutils.Change{
Type: change.Type,
Type: dagutils.ChangeType(change.Type),
Path: change.Path,
}

View File

@ -8,23 +8,23 @@ import (
"os"
"time"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
pin "github.com/ipfs/go-ipfs/pin"
bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidenc "github.com/ipfs/go-cidutil/cidenc"
cmds "github.com/ipfs/go-ipfs-cmds"
offline "github.com/ipfs/go-ipfs-exchange-offline"
pin "github.com/ipfs/go-ipfs-pinner"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
verifcid "github.com/ipfs/go-verifcid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
options "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
)
var PinCmd = &cmds.Command{

View File

@ -14,15 +14,7 @@ import (
"io"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/fuse/mount"
"github.com/ipfs/go-ipfs/namesys"
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs-pinner"
bserv "github.com/ipfs/go-blockservice"
bstore "github.com/ipfs/go-ipfs-blockstore"
@ -47,6 +39,15 @@ import (
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/p2p/discovery"
p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/fuse/mount"
"github.com/ipfs/go-ipfs/namesys"
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/repo"
)
var log = logging.Logger("core")

View File

@ -7,14 +7,14 @@ import (
"io"
"io/ioutil"
util "github.com/ipfs/go-ipfs/blocks/blockstoreutil"
pin "github.com/ipfs/go-ipfs/pin"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
pin "github.com/ipfs/go-ipfs-pinner"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
util "github.com/ipfs/go-ipfs/blocks/blockstoreutil"
)
type BlockAPI CoreAPI

View File

@ -18,16 +18,11 @@ import (
"errors"
"fmt"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/repo"
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface"
offlinexch "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-provider"
offlineroute "github.com/ipfs/go-ipfs-routing/offline"
ipld "github.com/ipfs/go-ipld-format"
@ -42,6 +37,11 @@ import (
routing "github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/namesys"
"github.com/ipfs/go-ipfs/repo"
)
var log = logging.Logger("core/coreapi")

View File

@ -3,9 +3,8 @@ package coreapi
import (
"context"
"github.com/ipfs/go-ipfs/pin"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-pinner"
ipld "github.com/ipfs/go-ipld-format"
)

View File

@ -11,12 +11,11 @@ import (
"io"
"io/ioutil"
"github.com/ipfs/go-ipfs/dagutils"
"github.com/ipfs/go-ipfs/pin"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-pinner"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
ft "github.com/ipfs/go-unixfs"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
@ -307,7 +306,7 @@ func (api *ObjectAPI) Diff(ctx context.Context, before ipath.Path, after ipath.P
out := make([]coreiface.ObjectChange, len(changes))
for i, change := range changes {
out[i] = coreiface.ObjectChange{
Type: change.Type,
Type: coreiface.ChangeType(change.Type),
Path: change.Path,
}

View File

@ -6,7 +6,7 @@ import (
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-ipfs/pin"
pin "github.com/ipfs/go-ipfs-pinner"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"

View File

@ -7,13 +7,13 @@ import (
"time"
"github.com/ipfs/go-ipfs/core"
gc "github.com/ipfs/go-ipfs/pin/gc"
repo "github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/gc"
"github.com/ipfs/go-ipfs/repo"
humanize "github.com/dustin/go-humanize"
cid "github.com/ipfs/go-cid"
"github.com/dustin/go-humanize"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
mfs "github.com/ipfs/go-mfs"
"github.com/ipfs/go-mfs"
)
var log = logging.Logger("corerepo")

View File

@ -8,12 +8,11 @@ import (
gopath "path"
"strconv"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
chunker "github.com/ipfs/go-ipfs-chunker"
"github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-posinfo"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"

View File

@ -12,13 +12,13 @@ import (
"time"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/pin/gc"
"github.com/ipfs/go-ipfs/gc"
"github.com/ipfs/go-ipfs/repo"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
config "github.com/ipfs/go-ipfs-config"

View File

@ -4,10 +4,6 @@ import (
"context"
"fmt"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
@ -16,6 +12,7 @@ import (
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface"
"github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-mfs"
@ -23,6 +20,9 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
)
// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks

View File

@ -5,16 +5,16 @@ import (
"fmt"
"time"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-provider"
q "github.com/ipfs/go-ipfs-provider/queue"
"github.com/ipfs/go-ipfs-provider/simple"
ipld "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
)
const kReprovideFrequency = time.Hour * 12

View File

@ -1,211 +0,0 @@
package dagutils
import (
"context"
"fmt"
"path"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
)
// These constants define the changes that can be applied to a DAG.
const (
	Add = iota // a link was added
	Remove     // a link was removed
	Mod        // a link's target was replaced
)

// Change represents a change to a DAG and contains a reference to the old and
// new CIDs.
type Change struct {
	Type   coreiface.ChangeType // one of Add, Remove or Mod
	Path   string               // slash-separated link path from the root to the changed link
	Before cid.Cid              // CID before the change (undefined for Add)
	After  cid.Cid              // CID after the change (undefined for Remove)
}
// String prints a human-friendly line about a change.
func (c *Change) String() string {
	switch c.Type {
	case Add:
		return fmt.Sprintf("Added %s at %s", c.After.String(), c.Path)
	case Remove:
		return fmt.Sprintf("Removed %s from %s", c.Before.String(), c.Path)
	case Mod:
		return fmt.Sprintf("Changed %s to %s at %s", c.Before.String(), c.After.String(), c.Path)
	default:
		// Type is always Add/Remove/Mod; anything else is a programmer
		// error. Panic with a message that identifies the problem instead
		// of the original uninformative "nope".
		panic(fmt.Sprintf("dagutils: unknown change type %d", c.Type))
	}
}
// ApplyChange applies the requested changes to the given node in the given dag.
//
// Changes are staged through a temporary in-memory editor; the resulting tree
// is written back to ds by Finalize, which returns the new root node.
func ApplyChange(ctx context.Context, ds ipld.DAGService, nd *dag.ProtoNode, cs []*Change) (*dag.ProtoNode, error) {
	e := NewDagEditor(nd, ds)
	for _, c := range cs {
		switch c.Type {
		case Add:
			child, err := ds.Get(ctx, c.After)
			if err != nil {
				return nil, err
			}
			childpb, ok := child.(*dag.ProtoNode)
			if !ok {
				return nil, dag.ErrNotProtobuf
			}

			err = e.InsertNodeAtPath(ctx, c.Path, childpb, nil)
			if err != nil {
				return nil, err
			}

		case Remove:
			err := e.RmLink(ctx, c.Path)
			if err != nil {
				return nil, err
			}

		case Mod:
			// Mod is implemented as a remove of the old link followed by an
			// insert of the new child at the same path.
			err := e.RmLink(ctx, c.Path)
			if err != nil {
				return nil, err
			}
			child, err := ds.Get(ctx, c.After)
			if err != nil {
				return nil, err
			}
			childpb, ok := child.(*dag.ProtoNode)
			if !ok {
				return nil, dag.ErrNotProtobuf
			}

			err = e.InsertNodeAtPath(ctx, c.Path, childpb, nil)
			if err != nil {
				return nil, err
			}
		}
	}

	return e.Finalize(ctx, ds)
}
// Diff returns a set of changes that transform node 'a' into node 'b'.
// It only traverses links in the following cases:
// 1. two node's links number are greater than 0.
// 2. both of two nodes are ProtoNode.
// Otherwise, it compares the cid and emits a Mod change object.
func Diff(ctx context.Context, ds ipld.DAGService, a, b ipld.Node) ([]*Change, error) {
	// Base case where both nodes are leaves, just compare
	// their CIDs.
	if len(a.Links()) == 0 && len(b.Links()) == 0 {
		return getChange(a, b)
	}

	var out []*Change
	cleanA, okA := a.Copy().(*dag.ProtoNode)
	cleanB, okB := b.Copy().(*dag.ProtoNode)
	if !okA || !okB {
		// Non-protobuf nodes are not traversed; fall back to a plain CID
		// comparison.
		return getChange(a, b)
	}

	// strip out unchanged stuff
	for _, lnk := range a.Links() {
		l, _, err := b.ResolveLink([]string{lnk.Name})
		if err == nil {
			if l.Cid.Equals(lnk.Cid) {
				// no change... ignore it
			} else {
				// Same link name, different target: recurse and prefix each
				// sub-change's path with this link's name.
				anode, err := lnk.GetNode(ctx, ds)
				if err != nil {
					return nil, err
				}

				bnode, err := l.GetNode(ctx, ds)
				if err != nil {
					return nil, err
				}

				sub, err := Diff(ctx, ds, anode, bnode)
				if err != nil {
					return nil, err
				}

				for _, subc := range sub {
					subc.Path = path.Join(lnk.Name, subc.Path)
					out = append(out, subc)
				}
			}
			// This name was handled (unchanged or recursed into); drop it
			// from both working copies so only pure additions/removals
			// remain below. Errors are ignored deliberately.
			_ = cleanA.RemoveNodeLink(l.Name)
			_ = cleanB.RemoveNodeLink(l.Name)
		}
	}

	// Whatever is left in cleanA exists only in 'a': a removal.
	for _, lnk := range cleanA.Links() {
		out = append(out, &Change{
			Type:   Remove,
			Path:   lnk.Name,
			Before: lnk.Cid,
		})
	}
	// Whatever is left in cleanB exists only in 'b': an addition.
	for _, lnk := range cleanB.Links() {
		out = append(out, &Change{
			Type:  Add,
			Path:  lnk.Name,
			After: lnk.Cid,
		})
	}

	return out, nil
}
// Conflict represents two incompatible changes and is returned by MergeDiffs().
type Conflict struct {
	A *Change // the conflicting change from the first diff
	B *Change // the conflicting change from the second diff (same Path as A)
}
// MergeDiffs takes two slice of changes and adds them to a single slice.
// When a Change from b happens to the same path of an existing change in a,
// a conflict is created and b is not added to the merged slice.
// A slice of Conflicts is returned and contains pointers to the
// Changes involved (which share the same path).
func MergeDiffs(a, b []*Change) ([]*Change, []Conflict) {
	// Index a's changes by path so clashes with b can be detected.
	byPath := make(map[string]*Change, len(a))
	for _, ch := range a {
		byPath[ch.Path] = ch
	}

	var merged []*Change
	var conflicts []Conflict
	for _, ch := range b {
		prior, clash := byPath[ch.Path]
		if clash {
			conflicts = append(conflicts, Conflict{A: prior, B: ch})
		} else {
			merged = append(merged, ch)
		}
	}

	// Append all of a's changes after b's non-conflicting ones.
	for _, ch := range byPath {
		merged = append(merged, ch)
	}
	return merged, conflicts
}
// getChange compares the CIDs of a and b and reports a single Mod change
// when they differ, or no changes at all when they are equal.
func getChange(a, b ipld.Node) ([]*Change, error) {
	if a.Cid().Equals(b.Cid()) {
		return []*Change{}, nil
	}
	mod := &Change{
		Type:   Mod,
		Before: a.Cid(),
		After:  b.Cid(),
	}
	return []*Change{mod}, nil
}

View File

@ -1,99 +0,0 @@
package dagutils
import (
"context"
"fmt"
mdag "github.com/ipfs/go-merkledag"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
// DiffEnumerate fetches every object in the graph pointed to by 'to' that is
// not in 'from'. This can be used to more efficiently fetch a graph if you can
// guarantee you already have the entirety of 'from'.
func DiffEnumerate(ctx context.Context, dserv ipld.NodeGetter, from, to cid.Cid) error {
	fnd, err := dserv.Get(ctx, from)
	if err != nil {
		return fmt.Errorf("get %s: %s", from, err)
	}

	tnd, err := dserv.Get(ctx, to)
	if err != nil {
		return fmt.Errorf("get %s: %s", to, err)
	}

	diff := getLinkDiff(fnd, tnd)

	sset := cid.NewSet()
	for _, c := range diff {
		// Since we're already assuming we have everything in the 'from' graph,
		// add all those cids to our 'already seen' set to avoid potentially
		// enumerating them later
		if c.bef.Defined() {
			sset.Add(c.bef)
		}
	}
	for _, c := range diff {
		if !c.bef.Defined() {
			// Newly added subtree: walk it in full, skipping anything
			// already visited.
			if sset.Has(c.aft) {
				continue
			}
			err := mdag.Walk(ctx, mdag.GetLinksDirect(dserv), c.aft, sset.Visit, mdag.Concurrent())
			if err != nil {
				return err
			}
		} else {
			// Replaced subtree: recurse to fetch only what changed below it.
			err := DiffEnumerate(ctx, dserv, c.bef, c.aft)
			if err != nil {
				return err
			}
		}
	}

	return nil
}
// diffpair describes one changed link between two nodes:
// if both bef and aft are defined, then bef was replaced with aft.
// if bef is undefined and aft is not, that means aft was newly added.
// if aft is undefined and bef is not, that means bef was deleted.
type diffpair struct {
	bef, aft cid.Cid
}
// getLinkDiff returns a changeset between nodes 'a' and 'b'. Links that only
// exist on the 'a' side are paired positionally with links that only exist on
// the 'b' side (treated as replacements); leftover b-only links are reported
// as additions. Currently does not log deletions as our usecase doesn't call
// for this.
func getLinkDiff(a, b ipld.Node) []diffpair {
	aLinks := make(map[string]*ipld.Link)
	bLinks := make(map[string]*ipld.Link)
	for _, lnk := range b.Links() {
		bLinks[lnk.Cid.KeyString()] = lnk
	}

	// Collect the CIDs that appear only on the 'a' side.
	var removed []cid.Cid
	for _, lnk := range a.Links() {
		key := lnk.Cid.KeyString()
		aLinks[key] = lnk
		if bLinks[key] == nil {
			removed = append(removed, lnk.Cid)
		}
	}

	var pairs []diffpair
	next := 0
	for _, lnk := range b.Links() {
		if aLinks[lnk.Cid.KeyString()] != nil {
			// Present on both sides: unchanged, nothing to report.
			continue
		}
		if next < len(removed) {
			pairs = append(pairs, diffpair{bef: removed[next], aft: lnk.Cid})
			next++
		} else {
			pairs = append(pairs, diffpair{aft: lnk.Cid})
		}
	}
	return pairs
}

View File

@ -1,249 +0,0 @@
package dagutils
import (
"context"
"fmt"
"testing"
dag "github.com/ipfs/go-merkledag"
mdtest "github.com/ipfs/go-merkledag/test"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
// buildNode recursively constructs the node called 'name' from the graph
// description desc, memoizing every constructed node in out. Panics if a
// link cannot be added (test helper).
func buildNode(name string, desc map[string]ndesc, out map[string]ipld.Node) ipld.Node {
	spec := desc[name]
	node := new(dag.ProtoNode)
	node.SetData([]byte(name))
	for linkName, childName := range spec {
		child, seen := out[childName]
		if !seen {
			child = buildNode(childName, desc, out)
			out[childName] = child
		}
		if err := node.AddNodeLink(linkName, child); err != nil {
			panic(err)
		}
	}
	return node
}
// ndesc describes a node's links: link name -> name of the child node.
type ndesc map[string]string
// mkGraph materializes every node described in desc and returns them by name.
func mkGraph(desc map[string]ndesc) map[string]ipld.Node {
	nodes := make(map[string]ipld.Node)
	for name := range desc {
		if _, done := nodes[name]; done {
			continue
		}
		nodes[name] = buildNode(name, desc, nodes)
	}
	return nodes
}
// Test graph fixtures: each tgN maps a node name to its link descriptions
// (link name -> child node name). a1 and a2 are the two roots the tests diff
// against each other. The redundant `ndesc` element types in the original
// composite literals are removed (gofmt -s simplification).

// tg1: a2 adds a "bar" link to c compared to a1.
var tg1 = map[string]ndesc{
	"a1": {"foo": "b"},
	"b":  {},
	"a2": {"foo": "b", "bar": "c"},
	"c":  {},
}

// tg2: like tg1, but c gained a child d.
var tg2 = map[string]ndesc{
	"a1": {"foo": "b"},
	"b":  {},
	"a2": {"foo": "b", "bar": "c"},
	"c":  {"baz": "d"},
	"d":  {},
}

// tg3: "bar" points to c in a1 but to d in a2.
var tg3 = map[string]ndesc{
	"a1": {"foo": "b", "bar": "c"},
	"b":  {},
	"a2": {"foo": "b", "bar": "d"},
	"c":  {},
	"d":  {},
}

// tg4: roots share key1 but differ on key2.
var tg4 = map[string]ndesc{
	"a1": {"key1": "b", "key2": "c"},
	"a2": {"key1": "b", "key2": "d"},
}

// tg5: roots share no link targets at all.
var tg5 = map[string]ndesc{
	"a1": {"key1": "a", "key2": "b"},
	"a2": {"key1": "c", "key2": "d"},
}
// TestNameMatching: tg4's roots differ in exactly one link target, so the
// link diff must contain a single pair.
func TestNameMatching(t *testing.T) {
	nodes := mkGraph(tg4)
	pairs := getLinkDiff(nodes["a1"], nodes["a2"])
	if len(pairs) != 1 {
		t.Fatal(fmt.Errorf("node diff didn't match by name"))
	}
}
// TestNameMatching2: tg5's roots share no link targets, so both links must
// be reported, paired positionally (first a-link with first b-link).
func TestNameMatching2(t *testing.T) {
	nodes := mkGraph(tg5)
	pairs := getLinkDiff(nodes["a1"], nodes["a2"])
	if len(pairs) != 2 {
		t.Fatal(fmt.Errorf("incorrect number of link diff elements"))
	}
	firstMatches := pairs[0].bef.Equals(nodes["a1"].Links()[0].Cid) &&
		pairs[0].aft.Equals(nodes["a2"].Links()[0].Cid)
	if !firstMatches {
		t.Fatal(fmt.Errorf("node diff didn't match by name"))
	}
}
// TestDiffEnumBasic checks that DiffEnumerate fetches exactly the two roots
// plus the node that is new on the 'to' side (c in tg1).
func TestDiffEnumBasic(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nds := mkGraph(tg1)
	ds := mdtest.Mock()
	lgds := &getLogger{ds: ds}
	for _, nd := range nds {
		err := ds.Add(ctx, nd)
		if err != nil {
			t.Fatal(err)
		}
	}

	err := DiffEnumerate(ctx, lgds, nds["a1"].Cid(), nds["a2"].Cid())
	if err != nil {
		t.Fatal(err)
	}

	// Only a1, a2 and c should have been fetched through the logger.
	err = assertCidList(lgds.log, []cid.Cid{nds["a1"].Cid(), nds["a2"].Cid(), nds["c"].Cid()})
	if err != nil {
		t.Fatal(err)
	}
}
// getLogger wraps an ipld.NodeGetter and records, in log, the CID of every
// node it successfully fetches.
type getLogger struct {
	ds  ipld.NodeGetter // the wrapped getter
	log []cid.Cid       // CIDs of successfully fetched nodes, in fetch order
}
// Get fetches c from the wrapped getter, recording c in gl.log on success.
func (gl *getLogger) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
	node, err := gl.ds.Get(ctx, c)
	if err != nil {
		return nil, err
	}
	gl.log = append(gl.log, c)
	return node, nil
}
// GetMany fetches cids from the wrapped getter, recording every successfully
// fetched node's CID in gl.log, and forwards the responses to the caller.
func (gl *getLogger) GetMany(ctx context.Context, cids []cid.Cid) <-chan *ipld.NodeOption {
	outCh := make(chan *ipld.NodeOption, len(cids))
	nds := gl.ds.GetMany(ctx, cids)
	for no := range nds {
		if no.Err == nil {
			gl.log = append(gl.log, no.Node.Cid())
		}
		select {
		case outCh <- no:
		default:
			panic("too many responses")
		}
	}
	// BUG FIX: the original returned nds, which the loop above has already
	// drained, so callers could never see any of the responses. Return the
	// buffered channel the responses were forwarded into, closed so that
	// receivers terminate.
	close(outCh)
	return outCh
}
// assertCidList returns an error unless a and b contain exactly the same
// CIDs in the same order.
func assertCidList(a, b []cid.Cid) error {
	if len(a) != len(b) {
		return fmt.Errorf("got different number of cids than expected")
	}
	for i := range a {
		if !a[i].Equals(b[i]) {
			return fmt.Errorf("expected %s, got %s", a[i], b[i])
		}
	}
	return nil
}
// TestDiffEnumFail checks that DiffEnumerate returns ErrNotFound when a node
// on the 'to' side ("d", deliberately never added to the store) is missing,
// and that the nodes fetched before the failure are still logged.
func TestDiffEnumFail(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nds := mkGraph(tg2)
	ds := mdtest.Mock()
	lgds := &getLogger{ds: ds}
	// Note: "d" is intentionally left out of the store.
	for _, s := range []string{"a1", "a2", "b", "c"} {
		err := ds.Add(ctx, nds[s])
		if err != nil {
			t.Fatal(err)
		}
	}

	err := DiffEnumerate(ctx, lgds, nds["a1"].Cid(), nds["a2"].Cid())
	if err != ipld.ErrNotFound {
		t.Fatal("expected err not found")
	}

	err = assertCidList(lgds.log, []cid.Cid{nds["a1"].Cid(), nds["a2"].Cid(), nds["c"].Cid()})
	if err != nil {
		t.Fatal(err)
	}
}
// TestDiffEnumRecurse checks that a replaced subtree ("bar": c -> d in tg3)
// causes DiffEnumerate to fetch both roots plus c and d.
func TestDiffEnumRecurse(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nds := mkGraph(tg3)
	ds := mdtest.Mock()
	lgds := &getLogger{ds: ds}
	for _, s := range []string{"a1", "a2", "b", "c", "d"} {
		err := ds.Add(ctx, nds[s])
		if err != nil {
			t.Fatal(err)
		}
	}

	err := DiffEnumerate(ctx, lgds, nds["a1"].Cid(), nds["a2"].Cid())
	if err != nil {
		t.Fatal(err)
	}

	err = assertCidList(lgds.log, []cid.Cid{nds["a1"].Cid(), nds["a2"].Cid(), nds["c"].Cid(), nds["d"].Cid()})
	if err != nil {
		t.Fatal(err)
	}
}

View File

@ -1,234 +0,0 @@
package dagutils
import (
"context"
"errors"
bserv "github.com/ipfs/go-blockservice"
dag "github.com/ipfs/go-merkledag"
path "github.com/ipfs/go-path"
ds "github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
)
// Editor represents a ProtoNode tree editor and provides methods to
// modify it. Edits are staged in the in-memory tmp store and only written
// out by Finalize.
type Editor struct {
	root *dag.ProtoNode

	// tmp is a temporary in memory (for now) dagstore for all of the
	// intermediary nodes to be stored in
	tmp ipld.DAGService

	// src is the dagstore with *all* of the data on it, it is used to pull
	// nodes from for modification (nil is a valid value)
	src ipld.DAGService
}
// NewMemoryDagService returns a new, thread-safe in-memory DAGService.
func NewMemoryDagService() ipld.DAGService {
	// Back the service with a mutex-wrapped map datastore and an offline
	// exchange: everything stays in memory.
	store := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
	return dag.NewDAGService(bserv.New(store, offline.Exchange(store)))
}
// NewDagEditor returns a ProtoNode editor.
//
// * root is the node to be modified
// * source is the dagstore to pull nodes from (optional)
func NewDagEditor(root *dag.ProtoNode, source ipld.DAGService) *Editor {
	return &Editor{
		root: root,
		tmp:  NewMemoryDagService(),
		src:  source,
	}
}
// GetNode returns a copy of the root node being edited.
func (e *Editor) GetNode() *dag.ProtoNode {
	return e.root.Copy().(*dag.ProtoNode)
}

// GetDagService returns the (temporary, in-memory) DAGService used by this
// editor.
func (e *Editor) GetDagService() ipld.DAGService {
	return e.tmp
}
// addLink stores childnd in ds and links it into root under childname,
// replacing any existing link of that name. The modified root (whose CID has
// changed) is re-added to ds and returned. Fails if childname is empty.
func addLink(ctx context.Context, ds ipld.DAGService, root *dag.ProtoNode, childname string, childnd ipld.Node) (*dag.ProtoNode, error) {
	if childname == "" {
		return nil, errors.New("cannot create link with no name")
	}

	// ensure that the node we are adding is in the dagservice
	err := ds.Add(ctx, childnd)
	if err != nil {
		return nil, err
	}

	// Adding the link changes root's CID; drop the stale entry.
	_ = ds.Remove(ctx, root.Cid())

	// ensure no link with that name already exists
	_ = root.RemoveNodeLink(childname) // ignore error, only option is ErrNotFound

	if err := root.AddNodeLink(childname, childnd); err != nil {
		return nil, err
	}

	if err := ds.Add(ctx, root); err != nil {
		return nil, err
	}
	return root, nil
}
// InsertNodeAtPath inserts a new node in the tree and replaces the current
// root with the new one.
func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert ipld.Node, create func() *dag.ProtoNode) error {
	newRoot, err := e.insertNodeAtPath(ctx, e.root, path.SplitList(pth), toinsert, create)
	if err != nil {
		return err
	}
	e.root = newRoot
	return nil
}
// insertNodeAtPath recursively descends path, linking toinsert at the final
// segment and rebuilding every node on the way back up. When a segment is
// missing and create is non-nil, intermediate directories are created with
// it; otherwise missing nodes are looked up in the source dagstore.
func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path []string, toinsert ipld.Node, create func() *dag.ProtoNode) (*dag.ProtoNode, error) {
	if len(path) == 1 {
		return addLink(ctx, e.tmp, root, path[0], toinsert)
	}

	nd, err := root.GetLinkedProtoNode(ctx, e.tmp, path[0])
	if err != nil {
		// if 'create' is true, we create directories on the way down as needed
		if err == dag.ErrLinkNotFound && create != nil {
			nd = create()
			err = nil // no longer an error case
		} else if err == ipld.ErrNotFound {
			// try finding it in our source dagstore
			nd, err = root.GetLinkedProtoNode(ctx, e.src, path[0])
		}

		// if we receive an ErrNotFound, then our second 'GetLinkedNode' call
		// also fails, we want to error out
		if err != nil {
			return nil, err
		}
	}

	ndprime, err := e.insertNodeAtPath(ctx, nd, path[1:], toinsert, create)
	if err != nil {
		return nil, err
	}

	// Swap the old child for the rebuilt one. The root's CID changes, so the
	// stale entry is dropped before re-adding; errors from the removals are
	// deliberately ignored.
	_ = e.tmp.Remove(ctx, root.Cid())
	_ = root.RemoveNodeLink(path[0])
	err = root.AddNodeLink(path[0], ndprime)
	if err != nil {
		return nil, err
	}

	err = e.tmp.Add(ctx, root)
	if err != nil {
		return nil, err
	}

	return root, nil
}
// RmLink removes the link with the given name and updates the root node of
// the editor.
func (e *Editor) RmLink(ctx context.Context, pth string) error {
	newRoot, err := e.rmLink(ctx, e.root, path.SplitList(pth))
	if err != nil {
		return err
	}
	e.root = newRoot
	return nil
}
// rmLink recursively descends path, removes the link named by the final
// segment, and rebuilds (re-adds to the tmp store) every node on the way
// back up.
func (e *Editor) rmLink(ctx context.Context, root *dag.ProtoNode, path []string) (*dag.ProtoNode, error) {
	if len(path) == 1 {
		// base case, remove node in question
		err := root.RemoveNodeLink(path[0])
		if err != nil {
			return nil, err
		}

		err = e.tmp.Add(ctx, root)
		if err != nil {
			return nil, err
		}

		return root, nil
	}

	// search for node in both tmp dagstore and source dagstore
	nd, err := root.GetLinkedProtoNode(ctx, e.tmp, path[0])
	if err == ipld.ErrNotFound {
		nd, err = root.GetLinkedProtoNode(ctx, e.src, path[0])
	}
	if err != nil {
		return nil, err
	}

	nnode, err := e.rmLink(ctx, nd, path[1:])
	if err != nil {
		return nil, err
	}

	// Re-link the rebuilt child under the same name. The root's CID changes,
	// so the stale entry is dropped first; removal errors are ignored.
	_ = e.tmp.Remove(ctx, root.Cid())
	_ = root.RemoveNodeLink(path[0])
	err = root.AddNodeLink(path[0], nnode)
	if err != nil {
		return nil, err
	}

	err = e.tmp.Add(ctx, root)
	if err != nil {
		return nil, err
	}

	return root, nil
}
// Finalize writes the new DAG to the given DAGService and returns the modified
// root node. Only nodes present in the editor's temporary store (i.e. the
// modified ones) are copied; see copyDag.
func (e *Editor) Finalize(ctx context.Context, ds ipld.DAGService) (*dag.ProtoNode, error) {
	nd := e.GetNode()
	err := copyDag(ctx, nd, e.tmp, ds)
	return nd, err
}
// copyDag copies nd and, recursively, every descendant that exists in 'from'
// into 'to'. Descendants missing from 'from' are skipped (assumed unmodified
// and already present in the target).
func copyDag(ctx context.Context, nd ipld.Node, from, to ipld.DAGService) error {
	// TODO(#4609): make this batch.
	err := to.Add(ctx, nd)
	if err != nil {
		return err
	}

	for _, lnk := range nd.Links() {
		child, err := lnk.GetNode(ctx, from)
		if err != nil {
			if err == ipld.ErrNotFound {
				// not found means we didn't modify it, and it should
				// already be in the target datastore
				continue
			}
			return err
		}

		err = copyDag(ctx, child, from, to)
		if err != nil {
			return err
		}
	}
	return nil
}

View File

@ -1,114 +0,0 @@
package dagutils
import (
"context"
"testing"
dag "github.com/ipfs/go-merkledag"
mdtest "github.com/ipfs/go-merkledag/test"
path "github.com/ipfs/go-path"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
// TestAddLink checks that addLink links an existing node under a new name and
// that the link resolves back to the same node.
func TestAddLink(t *testing.T) {
	// The original shadowed the context package with its cancel func
	// (ctx, context := context.WithCancel(...)); use the conventional
	// ctx/cancel names instead.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ds := mdtest.Mock()
	fishnode := dag.NodeWithData([]byte("fishcakes!"))

	err := ds.Add(ctx, fishnode)
	if err != nil {
		t.Fatal(err)
	}

	nd := new(dag.ProtoNode)
	nnode, err := addLink(ctx, ds, nd, "fish", fishnode)
	if err != nil {
		t.Fatal(err)
	}

	fnprime, err := nnode.GetLinkedNode(ctx, ds, "fish")
	if err != nil {
		t.Fatal(err)
	}

	fnpkey := fnprime.Cid()
	if !fnpkey.Equals(fishnode.Cid()) {
		t.Fatal("wrong child node found!")
	}
}
// assertNodeAtPath walks pth from root through ds and fails the test unless
// the node at the end of the path has CID exp.
func assertNodeAtPath(t *testing.T, ds ipld.DAGService, root *dag.ProtoNode, pth string, exp cid.Cid) {
	parts := path.SplitList(pth)
	cur := root
	for _, e := range parts {
		nxt, err := cur.GetLinkedProtoNode(context.Background(), ds, e)
		if err != nil {
			t.Fatal(err)
		}
		cur = nxt
	}

	curc := cur.Cid()
	if !curc.Equals(exp) {
		t.Fatal("node not as expected at end of path")
	}
}
// TestInsertNode exercises InsertNodeAtPath: plain inserts, deep inserts with
// and without directory creation, overwrites, and empty link names.
func TestInsertNode(t *testing.T) {
	root := new(dag.ProtoNode)
	e := NewDagEditor(root, nil)

	testInsert(t, e, "a", "anodefortesting", false, "")
	testInsert(t, e, "a/b", "data", false, "")
	// Inserting along a missing path without create must fail...
	testInsert(t, e, "a/b/c/d/e", "blah", false, "no link by that name")
	// ...and succeed once create is allowed.
	testInsert(t, e, "a/b/c/d/e", "foo", true, "")
	testInsert(t, e, "a/b/c/d/f", "baz", true, "")
	// Overwriting an existing link is allowed.
	testInsert(t, e, "a/b/c/d/f", "bar", true, "")
	testInsert(t, e, "", "bar", true, "cannot create link with no name")
	testInsert(t, e, "////", "slashes", true, "cannot create link with no name")

	c := e.GetNode().Cid()

	// Golden CID of the final tree; any behavior change shows up here.
	if c.String() != "QmZ8yeT9uD6ouJPNAYt62XffYuXBT6b4mP4obRSE9cJrSt" {
		t.Fatal("output was different than expected: ", c)
	}
}
// testInsert inserts a node holding data at path in editor e, expecting the
// error message experr ("" means success), and on success verifies the node
// landed at the requested path.
func testInsert(t *testing.T, e *Editor, path, data string, create bool, experr string) {
	child := dag.NodeWithData([]byte(data))
	err := e.tmp.Add(context.Background(), child)
	if err != nil {
		t.Fatal(err)
	}

	// Only supply a directory-creation callback when asked to.
	var c func() *dag.ProtoNode
	if create {
		c = func() *dag.ProtoNode {
			return &dag.ProtoNode{}
		}
	}

	err = e.InsertNodeAtPath(context.Background(), path, child, c)
	if experr != "" {
		var got string
		if err != nil {
			got = err.Error()
		}
		if got != experr {
			t.Fatalf("expected '%s' but got '%s'", experr, got)
		}
		return
	}

	if err != nil {
		t.Fatal(err, path, data, create, experr)
	}

	assertNodeAtPath(t, e.tmp, e.root, path, child.Cid())
}

View File

@ -8,15 +8,14 @@ import (
"strings"
bserv "github.com/ipfs/go-blockservice"
pin "github.com/ipfs/go-ipfs/pin"
dag "github.com/ipfs/go-merkledag"
cid "github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
pin "github.com/ipfs/go-ipfs-pinner"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
dag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-verifcid"
)

5
go.mod
View File

@ -35,6 +35,7 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-pinner v0.0.2
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.3.0
github.com/ipfs/go-ipfs-routing v0.1.0
@ -44,7 +45,7 @@ require (
github.com/ipfs/go-ipld-git v0.0.2
github.com/ipfs/go-ipns v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-merkledag v0.2.3
github.com/ipfs/go-merkledag v0.3.0
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-metrics-prometheus v0.0.2
github.com/ipfs/go-mfs v0.1.1
@ -107,4 +108,4 @@ require (
gopkg.in/cheggaaa/pb.v1 v1.0.28
)
go 1.13
go 1.12

14
go.sum
View File

@ -8,6 +8,10 @@ github.com/AndreasBriese/bbloom v0.0.0-20190823232136-616930265c33/go.mod h1:bOv
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/go-ipfs-pinner v0.0.0-20191120152055-f93a308647da h1:VujEWo1PUA+EIMvYU26fNvqykLMBuyXkILKbue86bzg=
github.com/MichaelMure/go-ipfs-pinner v0.0.0-20191120152055-f93a308647da/go.mod h1:0G0RKJr9eZwNvcwdLNVymjCcFhWtX9aXVGOxCZglwMg=
github.com/MichaelMure/go-merkledag v0.2.1-0.20191119160700-c20b9a52f504 h1:2qPGrw2YNfWNaOTPHY/WiSQZUKgiGQWh34my2zSppzo=
github.com/MichaelMure/go-merkledag v0.2.1-0.20191119160700-c20b9a52f504/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/Stebalien/go-bitfield v0.0.0-20180330043415-076a62f9ce6e/go.mod h1:3oM7gXIttpYDAJXpVNnSCiUMYBLIZ6cb1t+Ip982MRo=
github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo=
github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
@ -209,6 +213,8 @@ github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN
github.com/ipfs/go-ipfs-files v0.0.4 h1:WzRCivcybUQch/Qh6v8LBRhKtRsjnwyiuOV09mK7mrE=
github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-flags v0.0.1/go.mod h1:RnXBb9WV53GSfTrSDVK61NLTFKvWc60n+K9EgCDh+rA=
github.com/ipfs/go-ipfs-pinner v0.0.2 h1:KRXt2V0TzoTd3mO1aONSw8C9wnZtl7RLpPruN/XDnlQ=
github.com/ipfs/go-ipfs-pinner v0.0.2/go.mod h1:KZGyGAR+yLthGEkG9tuA2zweB7O6auXaJNjX6IbEbOs=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
@ -238,6 +244,9 @@ github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKy
github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-merkledag v0.2.3 h1:aMdkK9G1hEeNvn3VXfiEMLY0iJnbiQQUHnM0HFJREsE=
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-merkledag v0.2.4/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-merkledag v0.3.0 h1:1bXv/ZRPZLVdij/a33CkXMVdxUdred9sz4xyph+0ls0=
github.com/ipfs/go-merkledag v0.3.0/go.mod h1:4pymaZLhSLNVuiCITYrpViD6vmfZ/Ws4n/L9tfNv3S4=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-metrics-prometheus v0.0.2 h1:9i2iljLg12S78OhC6UAiXi176xvQGiZaGVF1CUVdE+s=
@ -259,9 +268,14 @@ github.com/ipfs/go-unixfs v0.2.1 h1:g51t9ODICFZ3F51FPivm8dE7NzYcdAQNUL9wGP5AYa0=
github.com/ipfs/go-unixfs v0.2.1/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipfs/interface-go-ipfs-core v0.2.3 h1:E6uQ+1fJjkxJWlL9lAE72a5FWeyeeNL3GitLy8+jq3Y=
github.com/ipfs/interface-go-ipfs-core v0.2.3 h1:E6uQ+1fJjkxJWlL9lAE72a5FWeyeeNL3GitLy8+jq3Y=
github.com/ipfs/interface-go-ipfs-core v0.2.3/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s=
github.com/ipfs/interface-go-ipfs-core v0.2.3/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s=
github.com/ipfs/interface-go-ipfs-core v0.2.5 h1:/rspOe8RbIxwtssEXHB+X9JXhOBDCQt8x50d2kFPXL8=
github.com/ipfs/interface-go-ipfs-core v0.2.5/go.mod h1:Tihp8zxGpUeE3Tokr94L6zWZZdkRQvG5TL6i9MuNE+s=
github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1 h1:i0LektDkO1QlrTm/cSuP+PyBCDnYvjPLGl4LdWEMiaA=

View File

@ -6,11 +6,10 @@ import (
"sync"
"time"
pin "github.com/ipfs/go-ipfs/pin"
proto "github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
dsquery "github.com/ipfs/go-datastore/query"
pin "github.com/ipfs/go-ipfs-pinner"
ipns "github.com/ipfs/go-ipns"
pb "github.com/ipfs/go-ipns/pb"
path "github.com/ipfs/go-path"

View File

@ -1,8 +0,0 @@
include mk/header.mk
PB_$(d) = $(wildcard $(d)/*.proto)
TGTS_$(d) = $(PB_$(d):.proto=.pb.go)
#DEPS_GO += $(TGTS_$(d))
include mk/footer.mk

View File

@ -1,3 +0,0 @@
package pb
//go:generate protoc --gogo_out=. header.proto

View File

@ -1,381 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pin/internal/pb/header.proto
package pb
import (
encoding_binary "encoding/binary"
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Set struct {
// 1 for now, library will refuse to handle entries with an unrecognized version.
Version uint32 `protobuf:"varint,1,opt,name=version" json:"version"`
// how many of the links are subtrees
Fanout uint32 `protobuf:"varint,2,opt,name=fanout" json:"fanout"`
// hash seed for subtree selection, a random number
Seed uint32 `protobuf:"fixed32,3,opt,name=seed" json:"seed"`
}
func (m *Set) Reset() { *m = Set{} }
func (m *Set) String() string { return proto.CompactTextString(m) }
func (*Set) ProtoMessage() {}
func (*Set) Descriptor() ([]byte, []int) {
return fileDescriptor_cda303a5a3ed87e7, []int{0}
}
func (m *Set) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Set) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Set.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Set) XXX_Merge(src proto.Message) {
xxx_messageInfo_Set.Merge(m, src)
}
func (m *Set) XXX_Size() int {
return m.Size()
}
func (m *Set) XXX_DiscardUnknown() {
xxx_messageInfo_Set.DiscardUnknown(m)
}
var xxx_messageInfo_Set proto.InternalMessageInfo
func (m *Set) GetVersion() uint32 {
if m != nil {
return m.Version
}
return 0
}
func (m *Set) GetFanout() uint32 {
if m != nil {
return m.Fanout
}
return 0
}
func (m *Set) GetSeed() uint32 {
if m != nil {
return m.Seed
}
return 0
}
func init() {
proto.RegisterType((*Set)(nil), "ipfs.pin.Set")
}
func init() { proto.RegisterFile("pin/internal/pb/header.proto", fileDescriptor_cda303a5a3ed87e7) }
var fileDescriptor_cda303a5a3ed87e7 = []byte{
// 162 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x29, 0xc8, 0xcc, 0xd3,
0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1, 0x2f, 0x48, 0xd2, 0xcf, 0x48, 0x4d, 0x4c,
0x49, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xc8, 0x2c, 0x48, 0x2b, 0xd6, 0x2b,
0xc8, 0xcc, 0x53, 0x8a, 0xe5, 0x62, 0x0e, 0x4e, 0x2d, 0x11, 0x92, 0xe3, 0x62, 0x2f, 0x4b, 0x2d,
0x2a, 0xce, 0xcc, 0xcf, 0x93, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x75, 0x62, 0x39, 0x71, 0x4f, 0x9e,
0x21, 0x08, 0x26, 0x28, 0x24, 0xc3, 0xc5, 0x96, 0x96, 0x98, 0x97, 0x5f, 0x5a, 0x22, 0xc1, 0x84,
0x24, 0x0d, 0x15, 0x13, 0x92, 0xe0, 0x62, 0x29, 0x4e, 0x4d, 0x4d, 0x91, 0x60, 0x56, 0x60, 0xd4,
0x60, 0x87, 0xca, 0x81, 0x45, 0x9c, 0x64, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1,
0xc1, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e,
0x21, 0x8a, 0xa9, 0x20, 0x09, 0x10, 0x00, 0x00, 0xff, 0xff, 0x20, 0x85, 0x2f, 0x24, 0xa5, 0x00,
0x00, 0x00,
}
func (m *Set) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Set) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0x8
i++
i = encodeVarintHeader(dAtA, i, uint64(m.Version))
dAtA[i] = 0x10
i++
i = encodeVarintHeader(dAtA, i, uint64(m.Fanout))
dAtA[i] = 0x1d
i++
encoding_binary.LittleEndian.PutUint32(dAtA[i:], uint32(m.Seed))
i += 4
return i, nil
}
func encodeVarintHeader(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *Set) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
n += 1 + sovHeader(uint64(m.Version))
n += 1 + sovHeader(uint64(m.Fanout))
n += 5
return n
}
func sovHeader(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozHeader(x uint64) (n int) {
return sovHeader(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Set) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHeader
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Set: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Set: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
}
m.Version = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHeader
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Version |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Fanout", wireType)
}
m.Fanout = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHeader
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Fanout |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 5 {
return fmt.Errorf("proto: wrong wireType = %d for field Seed", wireType)
}
m.Seed = 0
if (iNdEx + 4) > l {
return io.ErrUnexpectedEOF
}
m.Seed = uint32(encoding_binary.LittleEndian.Uint32(dAtA[iNdEx:]))
iNdEx += 4
default:
iNdEx = preIndex
skippy, err := skipHeader(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthHeader
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthHeader
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipHeader(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHeader
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHeader
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHeader
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthHeader
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthHeader
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHeader
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipHeader(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthHeader
}
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthHeader = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowHeader = fmt.Errorf("proto: integer overflow")
)

View File

@ -1,14 +0,0 @@
syntax = "proto2";
package ipfs.pin;
option go_package = "pb";
message Set {
// 1 for now, library will refuse to handle entries with an unrecognized version.
optional uint32 version = 1;
// how many of the links are subtrees
optional uint32 fanout = 2;
// hash seed for subtree selection, a random number
optional fixed32 seed = 3;
}

View File

@ -1,634 +0,0 @@
// Package pin implements structures and methods to keep track of
// which objects a user wants to keep stored locally.
package pin
import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/ipfs/go-ipfs/dagutils"
mdag "github.com/ipfs/go-merkledag"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("pin")
var pinDatastoreKey = ds.NewKey("/local/pins")
var emptyKey cid.Cid
func init() {
e, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
if err != nil {
log.Error("failed to decode empty key constant")
os.Exit(1)
}
emptyKey = e
}
const (
linkRecursive = "recursive"
linkDirect = "direct"
linkIndirect = "indirect"
linkInternal = "internal"
linkNotPinned = "not pinned"
linkAny = "any"
linkAll = "all"
)
// Mode allows to specify different types of pin (recursive, direct etc.).
// See the Pin Modes constants for a full list.
type Mode int
// Pin Modes
const (
// Recursive pins pin the target cids along with any reachable children.
Recursive Mode = iota
// Direct pins pin just the target cid.
Direct
// Indirect pins are cids who have some ancestor pinned recursively.
Indirect
// Internal pins are cids used to keep the internal state of the pinner.
Internal
// NotPinned
NotPinned
// Any refers to any pinned cid
Any
)
// ModeToString returns a human-readable name for the Mode.
func ModeToString(mode Mode) (string, bool) {
m := map[Mode]string{
Recursive: linkRecursive,
Direct: linkDirect,
Indirect: linkIndirect,
Internal: linkInternal,
NotPinned: linkNotPinned,
Any: linkAny,
}
s, ok := m[mode]
return s, ok
}
// StringToMode parses the result of ModeToString() back to a Mode.
// It returns a boolean which is set to false if the mode is unknown.
func StringToMode(s string) (Mode, bool) {
m := map[string]Mode{
linkRecursive: Recursive,
linkDirect: Direct,
linkIndirect: Indirect,
linkInternal: Internal,
linkNotPinned: NotPinned,
linkAny: Any,
linkAll: Any, // "all" and "any" means the same thing
}
mode, ok := m[s]
return mode, ok
}
// A Pinner provides the necessary methods to keep track of Nodes which are
// to be kept locally, according to a pin mode. In practice, a Pinner is in
// in charge of keeping the list of items from the local storage that should
// not be garbage-collected.
type Pinner interface {
// IsPinned returns whether or not the given cid is pinned
// and an explanation of why its pinned
IsPinned(ctx context.Context, c cid.Cid) (string, bool, error)
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error)
// Pin the given node, optionally recursively.
Pin(ctx context.Context, node ipld.Node, recursive bool) error
// Unpin the given cid. If recursive is true, removes either a recursive or
// a direct pin. If recursive is false, only removes a direct pin.
Unpin(ctx context.Context, cid cid.Cid, recursive bool) error
// Update updates a recursive pin from one cid to another
// this is more efficient than simply pinning the new one and unpinning the
// old one
Update(ctx context.Context, from, to cid.Cid, unpin bool) error
// Check if a set of keys are pinned, more efficient than
// calling IsPinned for each key
CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]Pinned, error)
// PinWithMode is for manually editing the pin structure. Use with
// care! If used improperly, garbage collection may not be
// successful.
PinWithMode(cid.Cid, Mode)
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
RemovePinWithMode(cid.Cid, Mode)
// Flush writes the pin state to the backing datastore
Flush(ctx context.Context) error
// DirectKeys returns all directly pinned cids
DirectKeys(ctx context.Context) ([]cid.Cid, error)
// DirectKeys returns all recursively pinned cids
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins(ctx context.Context) ([]cid.Cid, error)
}
// Pinned represents CID which has been pinned with a pinning strategy.
// The Via field allows to identify the pinning parent of this CID, in the
// case that the item is not pinned directly (but rather pinned recursively
// by some ascendant).
type Pinned struct {
Key cid.Cid
Mode Mode
Via cid.Cid
}
// Pinned returns whether or not the given cid is pinned
func (p Pinned) Pinned() bool {
return p.Mode != NotPinned
}
// String Returns pin status as string
func (p Pinned) String() string {
switch p.Mode {
case NotPinned:
return "not pinned"
case Indirect:
return fmt.Sprintf("pinned via %s", p.Via)
default:
modeStr, _ := ModeToString(p.Mode)
return fmt.Sprintf("pinned: %s", modeStr)
}
}
// pinner implements the Pinner interface
type pinner struct {
lock sync.RWMutex
recursePin *cid.Set
directPin *cid.Set
// Track the keys used for storing the pinning state, so gc does
// not delete them.
internalPin *cid.Set
dserv ipld.DAGService
internal ipld.DAGService // dagservice used to store internal objects
dstore ds.Datastore
}
// NewPinner creates a new pinner using the given datastore as a backend
func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner {
rcset := cid.NewSet()
dirset := cid.NewSet()
return &pinner{
recursePin: rcset,
directPin: dirset,
dserv: serv,
dstore: dstore,
internal: internal,
internalPin: cid.NewSet(),
}
}
// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
err := p.dserv.Add(ctx, node)
if err != nil {
return err
}
c := node.Cid()
p.lock.Lock()
defer p.lock.Unlock()
if recurse {
if p.recursePin.Has(c) {
return nil
}
p.lock.Unlock()
// temporary unlock to fetch the entire graph
err := mdag.FetchGraph(ctx, c, p.dserv)
p.lock.Lock()
if err != nil {
return err
}
if p.recursePin.Has(c) {
return nil
}
if p.directPin.Has(c) {
p.directPin.Remove(c)
}
p.recursePin.Add(c)
} else {
if p.recursePin.Has(c) {
return fmt.Errorf("%s already pinned recursively", c.String())
}
p.directPin.Add(c)
}
return nil
}
// ErrNotPinned is returned when trying to unpin items which are not pinned.
var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly")
// Unpin a given key
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.recursePin.Has(c) {
if !recursive {
return fmt.Errorf("%s is pinned recursively", c)
}
p.recursePin.Remove(c)
return nil
}
if p.directPin.Has(c) {
p.directPin.Remove(c)
return nil
}
return ErrNotPinned
}
func (p *pinner) isInternalPin(c cid.Cid) bool {
return p.internalPin.Has(c)
}
// IsPinned returns whether or not the given key is pinned
// and an explanation of why its pinned
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, Any)
}
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, mode)
}
// isPinnedWithType is the implementation of IsPinnedWithType that does not lock.
// intended for use by other pinned methods that already take locks
func (p *pinner) isPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error) {
switch mode {
case Any, Direct, Indirect, Recursive, Internal:
default:
err := fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}",
mode, Direct, Indirect, Recursive, Internal, Any)
return "", false, err
}
if (mode == Recursive || mode == Any) && p.recursePin.Has(c) {
return linkRecursive, true, nil
}
if mode == Recursive {
return "", false, nil
}
if (mode == Direct || mode == Any) && p.directPin.Has(c) {
return linkDirect, true, nil
}
if mode == Direct {
return "", false, nil
}
if (mode == Internal || mode == Any) && p.isInternalPin(c) {
return linkInternal, true, nil
}
if mode == Internal {
return "", false, nil
}
// Default is Indirect
visitedSet := cid.NewSet()
for _, rc := range p.recursePin.Keys() {
has, err := hasChild(ctx, p.dserv, rc, c, visitedSet.Visit)
if err != nil {
return "", false, err
}
if has {
return rc.String(), true, nil
}
}
return "", false, nil
}
// CheckIfPinned Checks if a set of keys are pinned, more efficient than
// calling IsPinned for each key, returns the pinned status of cid(s)
func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]Pinned, error) {
p.lock.RLock()
defer p.lock.RUnlock()
pinned := make([]Pinned, 0, len(cids))
toCheck := cid.NewSet()
// First check for non-Indirect pins directly
for _, c := range cids {
if p.recursePin.Has(c) {
pinned = append(pinned, Pinned{Key: c, Mode: Recursive})
} else if p.directPin.Has(c) {
pinned = append(pinned, Pinned{Key: c, Mode: Direct})
} else if p.isInternalPin(c) {
pinned = append(pinned, Pinned{Key: c, Mode: Internal})
} else {
toCheck.Add(c)
}
}
// Now walk all recursive pins to check for indirect pins
var checkChildren func(cid.Cid, cid.Cid) error
checkChildren = func(rk, parentKey cid.Cid) error {
links, err := ipld.GetLinks(ctx, p.dserv, parentKey)
if err != nil {
return err
}
for _, lnk := range links {
c := lnk.Cid
if toCheck.Has(c) {
pinned = append(pinned,
Pinned{Key: c, Mode: Indirect, Via: rk})
toCheck.Remove(c)
}
err := checkChildren(rk, c)
if err != nil {
return err
}
if toCheck.Len() == 0 {
return nil
}
}
return nil
}
for _, rk := range p.recursePin.Keys() {
err := checkChildren(rk, rk)
if err != nil {
return nil, err
}
if toCheck.Len() == 0 {
break
}
}
// Anything left in toCheck is not pinned
for _, k := range toCheck.Keys() {
pinned = append(pinned, Pinned{Key: k, Mode: NotPinned})
}
return pinned, nil
}
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
func (p *pinner) RemovePinWithMode(c cid.Cid, mode Mode) {
p.lock.Lock()
defer p.lock.Unlock()
switch mode {
case Direct:
p.directPin.Remove(c)
case Recursive:
p.recursePin.Remove(c)
default:
// programmer error, panic OK
panic("unrecognized pin type")
}
}
func cidSetWithValues(cids []cid.Cid) *cid.Set {
out := cid.NewSet()
for _, c := range cids {
out.Add(c)
}
return out
}
// LoadPinner loads a pinner and its keysets from the given datastore
func LoadPinner(d ds.Datastore, dserv, internal ipld.DAGService) (Pinner, error) {
p := new(pinner)
rootKey, err := d.Get(pinDatastoreKey)
if err != nil {
return nil, fmt.Errorf("cannot load pin state: %v", err)
}
rootCid, err := cid.Cast(rootKey)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
defer cancel()
root, err := internal.Get(ctx, rootCid)
if err != nil {
return nil, fmt.Errorf("cannot find pinning root object: %v", err)
}
rootpb, ok := root.(*mdag.ProtoNode)
if !ok {
return nil, mdag.ErrNotProtobuf
}
internalset := cid.NewSet()
internalset.Add(rootCid)
recordInternal := internalset.Add
{ // load recursive set
recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load recursive pins: %v", err)
}
p.recursePin = cidSetWithValues(recurseKeys)
}
{ // load direct set
directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load direct pins: %v", err)
}
p.directPin = cidSetWithValues(directKeys)
}
p.internalPin = internalset
// assign services
p.dserv = dserv
p.dstore = d
p.internal = internal
return p, nil
}
// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.directPin.Keys(), nil
}
// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.recursePin.Keys(), nil
}
// Update updates a recursive pin from one cid to another
// this is more efficient than simply pinning the new one and unpinning the
// old one
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
if from == to {
// Nothing to do. Don't remove this check or we'll end up
// _removing_ the pin.
//
// See #6648
return nil
}
p.lock.Lock()
defer p.lock.Unlock()
if !p.recursePin.Has(from) {
return fmt.Errorf("'from' cid was not recursively pinned already")
}
err := dagutils.DiffEnumerate(ctx, p.dserv, from, to)
if err != nil {
return err
}
p.recursePin.Add(to)
if unpin {
p.recursePin.Remove(from)
}
return nil
}
// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush(ctx context.Context) error {
p.lock.Lock()
defer p.lock.Unlock()
internalset := cid.NewSet()
recordInternal := internalset.Add
root := &mdag.ProtoNode{}
{
n, err := storeSet(ctx, p.internal, p.directPin.Keys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkDirect, n); err != nil {
return err
}
}
{
n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkRecursive, n); err != nil {
return err
}
}
// add the empty node, its referenced by the pin sets but never created
err := p.internal.Add(ctx, new(mdag.ProtoNode))
if err != nil {
return err
}
err = p.internal.Add(ctx, root)
if err != nil {
return err
}
k := root.Cid()
internalset.Add(k)
if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil {
return fmt.Errorf("cannot store pin state: %v", err)
}
p.internalPin = internalset
return nil
}
// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
p.lock.Lock()
defer p.lock.Unlock()
var out []cid.Cid
out = append(out, p.internalPin.Keys()...)
return out, nil
}
// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(c cid.Cid, mode Mode) {
p.lock.Lock()
defer p.lock.Unlock()
switch mode {
case Recursive:
p.recursePin.Add(c)
case Direct:
p.directPin.Add(c)
}
}
// hasChild recursively looks for a Cid among the children of a root Cid.
// The visit function can be used to shortcut already-visited branches.
func hasChild(ctx context.Context, ng ipld.NodeGetter, root cid.Cid, child cid.Cid, visit func(cid.Cid) bool) (bool, error) {
links, err := ipld.GetLinks(ctx, ng, root)
if err != nil {
return false, err
}
for _, lnk := range links {
c := lnk.Cid
if lnk.Cid.Equals(child) {
return true, nil
}
if visit(c) {
has, err := hasChild(ctx, ng, c, child, visit)
if err != nil {
return false, err
}
if has {
return has, nil
}
}
}
return false, nil
}

View File

@ -1,416 +0,0 @@
package pin
import (
"context"
"io"
"testing"
"time"
bs "github.com/ipfs/go-blockservice"
mdag "github.com/ipfs/go-merkledag"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
util "github.com/ipfs/go-ipfs-util"
)
var rand = util.NewTimeSeededRand()
func randNode() (*mdag.ProtoNode, cid.Cid) {
nd := new(mdag.ProtoNode)
nd.SetData(make([]byte, 32))
_, err := io.ReadFull(rand, nd.Data())
if err != nil {
panic(err)
}
k := nd.Cid()
return nd, k
}
func assertPinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
}
if !pinned {
t.Fatal(failmsg)
}
}
func assertUnpinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
}
if pinned {
t.Fatal(failmsg)
}
}
func TestPinnerBasic(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv, dserv)
a, ak := randNode()
err := dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
// Pin A{}
err = p.Pin(ctx, a, false)
if err != nil {
t.Fatal(err)
}
assertPinned(t, p, ak, "Failed to find key")
// create new node c, to be indirectly pinned through b
c, _ := randNode()
err = dserv.Add(ctx, c)
if err != nil {
t.Fatal(err)
}
ck := c.Cid()
// Create new node b, to be parent to a and c
b, _ := randNode()
err = b.AddNodeLink("child", a)
if err != nil {
t.Fatal(err)
}
err = b.AddNodeLink("otherchild", c)
if err != nil {
t.Fatal(err)
}
err = dserv.Add(ctx, b)
if err != nil {
t.Fatal(err)
}
bk := b.Cid()
// recursively pin B{A,C}
err = p.Pin(ctx, b, true)
if err != nil {
t.Fatal(err)
}
assertPinned(t, p, ck, "child of recursively pinned node not found")
assertPinned(t, p, bk, "Recursively pinned node not found..")
d, _ := randNode()
_ = d.AddNodeLink("a", a)
_ = d.AddNodeLink("c", c)
e, _ := randNode()
_ = d.AddNodeLink("e", e)
// Must be in dagserv for unpin to work
err = dserv.Add(ctx, e)
if err != nil {
t.Fatal(err)
}
err = dserv.Add(ctx, d)
if err != nil {
t.Fatal(err)
}
// Add D{A,C,E}
err = p.Pin(ctx, d, true)
if err != nil {
t.Fatal(err)
}
dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")
// Test recursive unpin
err = p.Unpin(ctx, dk, true)
if err != nil {
t.Fatal(err)
}
err = p.Flush(ctx)
if err != nil {
t.Fatal(err)
}
np, err := LoadPinner(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
// Test directly pinned
assertPinned(t, np, ak, "Could not find pinned node!")
// Test recursively pinned
assertPinned(t, np, bk, "could not find recursively pinned node")
}
func TestIsPinnedLookup(t *testing.T) {
// We are going to test that lookups work in pins which share
// the same branches. For that we will construct this tree:
//
// A5->A4->A3->A2->A1->A0
// / /
// B------- /
// \ /
// C---------------
//
// We will ensure that IsPinned works for all objects both when they
// are pinned and once they have been unpinned.
aBranchLen := 6
if aBranchLen < 3 {
t.Fatal("set aBranchLen to at least 3")
}
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv, dserv)
aNodes := make([]*mdag.ProtoNode, aBranchLen)
aKeys := make([]cid.Cid, aBranchLen)
for i := 0; i < aBranchLen; i++ {
a, _ := randNode()
if i >= 1 {
err := a.AddNodeLink("child", aNodes[i-1])
if err != nil {
t.Fatal(err)
}
}
err := dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
//t.Logf("a[%d] is %s", i, ak)
aNodes[i] = a
aKeys[i] = a.Cid()
}
// Pin A5 recursively
if err := p.Pin(ctx, aNodes[aBranchLen-1], true); err != nil {
t.Fatal(err)
}
// Create node B and add A3 as child
b, _ := randNode()
if err := b.AddNodeLink("mychild", aNodes[3]); err != nil {
t.Fatal(err)
}
// Create C node
c, _ := randNode()
// Add A0 as child of C
if err := c.AddNodeLink("child", aNodes[0]); err != nil {
t.Fatal(err)
}
// Add C
err := dserv.Add(ctx, c)
if err != nil {
t.Fatal(err)
}
ck := c.Cid()
//t.Logf("C is %s", ck)
// Add C to B and Add B
if err := b.AddNodeLink("myotherchild", c); err != nil {
t.Fatal(err)
}
err = dserv.Add(ctx, b)
if err != nil {
t.Fatal(err)
}
bk := b.Cid()
//t.Logf("B is %s", bk)
// Pin C recursively
if err := p.Pin(ctx, c, true); err != nil {
t.Fatal(err)
}
// Pin B recursively
if err := p.Pin(ctx, b, true); err != nil {
t.Fatal(err)
}
assertPinned(t, p, aKeys[0], "A0 should be pinned")
assertPinned(t, p, aKeys[1], "A1 should be pinned")
assertPinned(t, p, ck, "C should be pinned")
assertPinned(t, p, bk, "B should be pinned")
// Unpin A5 recursively
if err := p.Unpin(ctx, aKeys[5], true); err != nil {
t.Fatal(err)
}
assertPinned(t, p, aKeys[0], "A0 should still be pinned through B")
assertUnpinned(t, p, aKeys[4], "A4 should be unpinned")
// Unpin B recursively
if err := p.Unpin(ctx, bk, true); err != nil {
t.Fatal(err)
}
assertUnpinned(t, p, bk, "B should be unpinned")
assertUnpinned(t, p, aKeys[1], "A1 should be unpinned")
assertPinned(t, p, aKeys[0], "A0 should still be pinned through C")
}
func TestDuplicateSemantics(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv, dserv)
a, _ := randNode()
err := dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
// pin is recursively
err = p.Pin(ctx, a, true)
if err != nil {
t.Fatal(err)
}
// pinning directly should fail
err = p.Pin(ctx, a, false)
if err == nil {
t.Fatal("expected direct pin to fail")
}
// pinning recursively again should succeed
err = p.Pin(ctx, a, true)
if err != nil {
t.Fatal(err)
}
}
func TestFlush(t *testing.T) {
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p := NewPinner(dstore, dserv, dserv)
_, k := randNode()
p.PinWithMode(k, Recursive)
if err := p.Flush(context.Background()); err != nil {
t.Fatal(err)
}
assertPinned(t, p, k, "expected key to still be pinned")
}
// TestPinRecursiveFail exercises recursive pinning of a DAG whose child
// node is missing from the dagservice: the pin must fail, and then succeed
// once both nodes have been added.
func TestPinRecursiveFail(t *testing.T) {
	ctx := context.Background()

	dstore := dssync.MutexWrap(ds.NewMapDatastore())
	bstore := blockstore.NewBlockstore(dstore)
	dserv := mdag.NewDAGService(bs.New(bstore, offline.Exchange(bstore)))

	p := NewPinner(dstore, dserv, dserv)

	parent, _ := randNode()
	child, _ := randNode()
	if err := parent.AddNodeLink("child", child); err != nil {
		t.Fatal(err)
	}

	// NOTE: This isnt a time based test, we expect the pin to fail
	mctx, cancel := context.WithTimeout(ctx, time.Millisecond)
	defer cancel()

	if err := p.Pin(mctx, parent, true); err == nil {
		t.Fatal("should have failed to pin here")
	}

	if err := dserv.Add(ctx, child); err != nil {
		t.Fatal(err)
	}
	if err := dserv.Add(ctx, parent); err != nil {
		t.Fatal(err)
	}

	// this one is time based... but shouldnt cause any issues
	mctx, cancel = context.WithTimeout(ctx, time.Second)
	defer cancel()

	if err := p.Pin(mctx, parent, true); err != nil {
		t.Fatal(err)
	}
}
// TestPinUpdate verifies Update semantics: with unpin=true the old root is
// released when the pin moves, and with unpin=false both roots stay pinned.
func TestPinUpdate(t *testing.T) {
	ctx := context.Background()

	dstore := dssync.MutexWrap(ds.NewMapDatastore())
	bstore := blockstore.NewBlockstore(dstore)
	dserv := mdag.NewDAGService(bs.New(bstore, offline.Exchange(bstore)))

	p := NewPinner(dstore, dserv, dserv)

	n1, c1 := randNode()
	n2, c2 := randNode()
	if err := dserv.Add(ctx, n1); err != nil {
		t.Fatal(err)
	}
	if err := dserv.Add(ctx, n2); err != nil {
		t.Fatal(err)
	}

	if err := p.Pin(ctx, n1, true); err != nil {
		t.Fatal(err)
	}

	// Move the pin from c1 to c2, releasing c1.
	if err := p.Update(ctx, c1, c2, true); err != nil {
		t.Fatal(err)
	}
	assertPinned(t, p, c2, "c2 should be pinned now")
	assertUnpinned(t, p, c1, "c1 should no longer be pinned")

	// Move it back without unpinning, so both roots remain pinned.
	if err := p.Update(ctx, c2, c1, false); err != nil {
		t.Fatal(err)
	}
	assertPinned(t, p, c2, "c2 should be pinned still")
	assertPinned(t, p, c1, "c1 should be pinned now")
}

View File

@ -1,297 +0,0 @@
package pin
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"hash/fnv"
"sort"
"github.com/ipfs/go-ipfs/pin/internal/pb"
"github.com/ipfs/go-merkledag"
"github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
const (
	// defaultFanout specifies the default number of fan-out (bucket) links
	// per pin-set layer.
	defaultFanout = 256

	// maxItems is the maximum number of items that will fit in a single
	// bucket before a layer is sharded into recursive child sets.
	maxItems = 8192
)
// hash mixes a 32-bit seed with a CID to select a pin-set bucket, using
// FNV-1a over the little-endian seed bytes followed by the CID bytes.
func hash(seed uint32, c cid.Cid) uint32 {
	h := fnv.New32a()
	var seedBytes [4]byte
	binary.LittleEndian.PutUint32(seedBytes[:], seed)
	// Hash writes never fail; errors are deliberately ignored.
	_, _ = h.Write(seedBytes[:])
	_, _ = h.Write(c.Bytes())
	return h.Sum32()
}
// itemIterator yields CIDs one at a time; ok reports false once the
// underlying source is exhausted.
type itemIterator func() (c cid.Cid, ok bool)

// keyObserver is notified of every internal (bookkeeping) CID created or
// referenced while building a pin set.
type keyObserver func(cid.Cid)

// sortByHash orders links by the raw bytes of their CIDs so that the
// serialized pin-set layout is deterministic.
type sortByHash struct {
	links []*ipld.Link
}

func (s sortByHash) Len() int {
	return len(s.links)
}

func (s sortByHash) Less(i, j int) bool {
	return bytes.Compare(s.links[i].Cid.Bytes(), s.links[j].Cid.Bytes()) < 0
}

func (s sortByHash) Swap(i, j int) {
	s.links[i], s.links[j] = s.links[j], s.links[i]
}
// storeItems builds one layer of the sharded pin set as a ProtoNode.
// The node's first defaultFanout links are bucket slots (initialized to the
// emptyKey sentinel) and any links after that are items stored directly in
// this layer. Items that do not fit inline are hashed into buckets, and each
// non-empty bucket becomes a recursively built child set whose link
// overwrites the corresponding empty slot.
//
// estimatedLen is only a sizing hint; iter is always drained. depth doubles
// as the hash seed so each layer buckets with an independent hash function.
// Every internal node created along the way is reported to internalKeys.
func storeItems(ctx context.Context, dag ipld.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
	links := make([]*ipld.Link, 0, defaultFanout+maxItems)
	for i := 0; i < defaultFanout; i++ {
		links = append(links, &ipld.Link{Cid: emptyKey})
	}

	// add emptyKey to our set of internal pinset objects
	n := &merkledag.ProtoNode{}
	n.SetLinks(links)

	internalKeys(emptyKey)

	// The header records the format version, the fan-out width, and the
	// hash seed (the recursion depth) used to bucket items at this layer.
	hdr := &pb.Set{
		Version: 1,
		Fanout:  defaultFanout,
		Seed:    depth,
	}
	if err := writeHdr(n, hdr); err != nil {
		return nil, err
	}

	if estimatedLen < maxItems {
		// it'll probably fit: store the items inline in this node
		links := n.Links()
		for i := 0; i < maxItems; i++ {
			k, ok := iter()
			if !ok {
				// all done
				break
			}

			links = append(links, &ipld.Link{Cid: k})
		}

		n.SetLinks(links)

		// sort by hash, also swap item Data
		s := sortByHash{
			links: n.Links()[defaultFanout:],
		}
		sort.Stable(s)
	}

	hashed := make([][]cid.Cid, defaultFanout)
	for {
		// This loop essentially enumerates every single item in the set
		// and maps them all into a set of buckets. Each bucket will be recursively
		// turned into its own sub-set, and so on down the chain. Each sub-set
		// gets added to the dagservice, and put into its place in a set nodes
		// links array.
		//
		// Previously, the bucket was selected by taking an int32 from the hash of
		// the input key + seed. This was erroneous as we would later be assigning
		// the created sub-sets into an array of length 256 by the modulus of the
		// int32 hash value with 256. This resulted in overwriting existing sub-sets
		// and losing pins. The fix (a few lines down from this comment), is to
		// map the hash value down to the 8 bit keyspace here while creating the
		// buckets. This way, we avoid any overlapping later on.
		k, ok := iter()
		if !ok {
			break
		}
		h := hash(depth, k) % defaultFanout
		hashed[h] = append(hashed[h], k)
	}

	for h, items := range hashed {
		if len(items) == 0 {
			// recursion base case: empty bucket keeps its emptyKey link
			continue
		}

		childIter := getCidListIterator(items)

		// recursively create a pinset from the items for this bucket index
		child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys)
		if err != nil {
			return nil, err
		}

		size, err := child.Size()
		if err != nil {
			return nil, err
		}

		err = dag.Add(ctx, child)
		if err != nil {
			return nil, err
		}
		childKey := child.Cid()

		internalKeys(childKey)

		// overwrite the 'empty key' in the existing links array
		n.Links()[h] = &ipld.Link{
			Cid:  childKey,
			Size: size,
		}
	}
	return n, nil
}
// readHdr decodes the pb.Set header stored in a pin-set node's Data field:
// a uvarint length prefix followed by the marshaled protobuf. It rejects
// malformed prefixes, unsupported versions, and a Fanout larger than the
// node's link count.
func readHdr(n *merkledag.ProtoNode) (*pb.Set, error) {
	data := n.Data()
	hdrLenRaw, consumed := binary.Uvarint(data)
	if consumed <= 0 {
		return nil, errors.New("invalid Set header length")
	}

	rest := data[consumed:]
	if hdrLenRaw > uint64(len(rest)) {
		return nil, errors.New("impossibly large Set header length")
	}

	var hdr pb.Set
	// hdrLenRaw was bounded above by len(rest), so it fits in an int.
	if err := proto.Unmarshal(rest[:int(hdrLenRaw)], &hdr); err != nil {
		return nil, err
	}

	if v := hdr.GetVersion(); v != 1 {
		return nil, fmt.Errorf("unsupported Set version: %d", v)
	}
	if uint64(hdr.GetFanout()) > uint64(len(n.Links())) {
		return nil, errors.New("impossibly large Fanout")
	}
	return &hdr, nil
}
// writeHdr serializes hdr into n's Data field as a uvarint length prefix
// followed by the marshaled protobuf bytes.
func writeHdr(n *merkledag.ProtoNode, hdr *pb.Set) error {
	hdrData, err := proto.Marshal(hdr)
	if err != nil {
		return err
	}

	// Reserve room for the largest possible uvarint prefix plus the payload.
	buf := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData))
	// Write the length prefix, then append the payload right after it.
	prefixLen := binary.PutUvarint(buf, uint64(len(hdrData)))
	buf = append(buf[:prefixLen], hdrData...)

	n.SetData(buf)
	return nil
}
// walkerFunc is called for each stored item link; idx is the link's position
// within the item region of its layer (i.e. after the fan-out bucket links).
type walkerFunc func(idx int, link *ipld.Link) error

// walkItems traverses a pin-set layer depth-first: it invokes fn on every
// item link in n, reports every bucket link CID to children, and recurses
// into each non-empty bucket.
func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode, fn walkerFunc, children keyObserver) error {
	hdr, err := readHdr(n)
	if err != nil {
		return err
	}
	// readHdr guarantees fanout is a safe value
	fanout := hdr.GetFanout()
	// Links after the fan-out region are items stored inline at this layer.
	for i, l := range n.Links()[fanout:] {
		if err := fn(i, l); err != nil {
			return err
		}
	}

	// The first fanout links are bucket slots; recurse into non-empty ones.
	for _, l := range n.Links()[:fanout] {
		c := l.Cid
		children(c)
		if c.Equals(emptyKey) {
			// empty-bucket sentinel; nothing beneath it
			continue
		}
		subtree, err := l.GetNode(ctx, dag)
		if err != nil {
			return err
		}

		stpb, ok := subtree.(*merkledag.ProtoNode)
		if !ok {
			return merkledag.ErrNotProtobuf
		}

		if err := walkItems(ctx, dag, stpb, fn, children); err != nil {
			return err
		}
	}
	return nil
}
// loadSet reads back the flat list of pinned CIDs stored under the named
// link of root, reporting every internal pin-set node to internalKeys.
func loadSet(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, internalKeys keyObserver) ([]cid.Cid, error) {
	link, err := root.GetNodeLink(name)
	if err != nil {
		return nil, err
	}
	// The set root itself is an internal bookkeeping node.
	internalKeys(link.Cid)

	nd, err := link.GetNode(ctx, dag)
	if err != nil {
		return nil, err
	}
	pbn, ok := nd.(*merkledag.ProtoNode)
	if !ok {
		return nil, merkledag.ErrNotProtobuf
	}

	var out []cid.Cid
	collect := func(_ int, l *ipld.Link) error {
		out = append(out, l.Cid)
		return nil
	}
	if err := walkItems(ctx, dag, pbn, collect, internalKeys); err != nil {
		return nil, err
	}
	return out, nil
}
// getCidListIterator wraps a slice of CIDs in an itemIterator that consumes
// the slice front to back. The closure mutates its own view of the slice,
// not the caller's.
func getCidListIterator(cids []cid.Cid) itemIterator {
	remaining := cids
	return func() (cid.Cid, bool) {
		if len(remaining) == 0 {
			return cid.Cid{}, false
		}
		next := remaining[0]
		remaining = remaining[1:]
		return next, true
	}
}
// storeSet builds a sharded pin set from cids, adds the root node to the
// dagservice, and reports all internal nodes (root included) to internalKeys.
func storeSet(ctx context.Context, dag ipld.DAGService, cids []cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
	root, err := storeItems(ctx, dag, uint64(len(cids)), 0, getCidListIterator(cids), internalKeys)
	if err != nil {
		return nil, err
	}
	if err := dag.Add(ctx, root); err != nil {
		return nil, err
	}
	internalKeys(root.Cid())
	return root, nil
}

View File

@ -1,101 +0,0 @@
package pin
import (
"context"
"encoding/binary"
"testing"
bserv "github.com/ipfs/go-blockservice"
dag "github.com/ipfs/go-merkledag"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
)
// ignoreCids is a keyObserver that discards every key it is given.
func ignoreCids(_ cid.Cid) {}
// objCount returns the number of keys currently stored in d.
//
// It issues a keys-only query and counts the results synchronously,
// panicking on query failure (acceptable in this test helper).
func objCount(d ds.Datastore) int {
	q := dsq.Query{KeysOnly: true}
	res, err := d.Query(q)
	if err != nil {
		panic(err)
	}
	// Fix: the Results were never closed, leaking the query's resources.
	// Close error is deliberately ignored in this test helper.
	defer res.Close()

	var count int
	for {
		_, ok := res.NextSync()
		if !ok {
			break
		}

		count++
	}
	return count
}
// TestSet round-trips a pin set large enough to force recursive sharding
// and verifies that (a) re-storing produces a deterministic layout and
// (b) every input CID survives the store/load cycle.
func TestSet(t *testing.T) {
	dstore := ds.NewMapDatastore()
	bstore := blockstore.NewBlockstore(dstore)
	dserv := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore)))

	// this value triggers the creation of a recursive shard.
	// If the recursive sharding is done improperly, this will result in
	// an infinite recursion and crash (OOM)
	limit := uint32((defaultFanout * maxItems) + 1)

	inputs := make([]cid.Cid, 0, limit)
	buf := make([]byte, 4)
	for i := uint32(0); i < limit; i++ {
		binary.BigEndian.PutUint32(buf, i)
		inputs = append(inputs, dag.NewRawNode(buf).Cid())
	}

	if _, err := storeSet(context.Background(), dserv, inputs[:len(inputs)-1], ignoreCids); err != nil {
		t.Fatal(err)
	}
	objs1 := objCount(dstore)

	out, err := storeSet(context.Background(), dserv, inputs, ignoreCids)
	if err != nil {
		t.Fatal(err)
	}
	objs2 := objCount(dstore)
	if objs2-objs1 > 2 {
		t.Fatal("set sharding does not appear to be deterministic")
	}

	// weird wrapper node because loadSet expects us to pass an
	// object pointing to multiple named sets
	setroot := &dag.ProtoNode{}
	if err := setroot.AddNodeLink("foo", out); err != nil {
		t.Fatal(err)
	}

	outset, err := loadSet(context.Background(), dserv, setroot, "foo", ignoreCids)
	if err != nil {
		t.Fatal(err)
	}
	if uint32(len(outset)) != limit {
		t.Fatal("got wrong number", len(outset), limit)
	}

	seen := cid.NewSet()
	for _, c := range outset {
		seen.Add(c)
	}
	for _, c := range inputs {
		if !seen.Has(c) {
			t.Fatalf("expected to have '%s', didnt find it", c)
		}
	}
}

View File

@ -8,8 +8,8 @@ import (
"io"
"strings"
"github.com/ipfs/go-ipfs/dagutils"
dag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
path "github.com/ipfs/go-path"
importer "github.com/ipfs/go-unixfs/importer"
uio "github.com/ipfs/go-unixfs/io"