pin: follow async pinner changes

See https://github.com/ipfs/boxo/pull/290

This PR follow the changes in the Pinner to make listing recursive and direct pins asynchronous, which in turns allow pin/ls to build and emit results without having to wait anything, or accumulate too much in memory.

Note: there is a tradeoff for pin/ls?type=all:
- keep the recursive pins in memory (which I chose)
- ask the pinner twice for the recursive pins, and limit memory usage

Also, follow the changes in the GC with similar benefit of not having to wait the full pin list. Add a test.
Also, follow the changes in pin.Verify.
This commit is contained in:
Michael Muré 2023-05-04 13:50:02 +02:00 committed by Michael Muré
parent a6f446a4ba
commit a197125b8f
No known key found for this signature in database
GPG Key ID: A4457C029293126F
6 changed files with 223 additions and 95 deletions

View File

@ -675,10 +675,6 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
@ -722,11 +718,15 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
out := make(chan interface{})
go func() {
defer close(out)
for _, cid := range recPins {
pinStatus := checkPin(cid)
for p := range n.Pinning.RecursiveKeys(ctx) {
if p.Err != nil {
out <- p.Err
return
}
pinStatus := checkPin(p.C)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{enc.Encode(cid), pinStatus}:
case out <- &PinVerifyRes{enc.Encode(p.C), pinStatus}:
case <-ctx.Done():
return
}

View File

@ -12,9 +12,10 @@ import (
"github.com/ipfs/boxo/ipld/merkledag"
pin "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/go-cid"
"github.com/ipfs/kubo/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/ipfs/kubo/tracing"
)
type PinAPI CoreAPI
@ -156,6 +157,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
}
type pinStatus struct {
err error
cid cid.Cid
ok bool
badNodes []coreiface.BadPinNode
@ -175,6 +177,10 @@ func (s *pinStatus) BadNodes() []coreiface.BadPinNode {
return s.badNodes
}
func (s *pinStatus) Err() error {
return s.err
}
func (n *badNode) Path() path.Resolved {
return n.path
}
@ -191,10 +197,6 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
recPins, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
var checkPin func(root cid.Cid) *pinStatus
checkPin = func(root cid.Cid) *pinStatus {
@ -229,8 +231,18 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
out := make(chan coreiface.PinStatus)
go func() {
defer close(out)
for _, c := range recPins {
out <- checkPin(c)
for p := range api.pinning.RecursiveKeys(ctx) {
var res *pinStatus
if p.Err != nil {
res = &pinStatus{err: p.Err}
} else {
res = checkPin(p.C)
}
select {
case <-ctx.Done():
return
case out <- res:
}
}
}()
@ -262,63 +274,57 @@ func (p *pinInfo) Err() error {
func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreiface.Pin {
out := make(chan coreiface.Pin, 1)
keys := cid.NewSet()
emittedSet := cid.NewSet()
AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
select {
case out <- &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
}:
case <-ctx.Done():
return ctx.Err()
}
AddToResultKeys := func(c cid.Cid, typeStr string) error {
if emittedSet.Visit(c) {
select {
case out <- &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
VisitKeys := func(keyList []cid.Cid) {
for _, c := range keyList {
keys.Visit(c)
}
}
go func() {
defer close(out)
var dkeys, rkeys []cid.Cid
var rkeys []cid.Cid
var err error
if typeStr == "recursive" || typeStr == "all" {
rkeys, err = api.pinning.RecursiveKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
if err = AddToResultKeys(rkeys, "recursive"); err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.RecursiveKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
if err = AddToResultKeys(streamedCid.C, "recursive"); err != nil {
out <- &pinInfo{err: err}
return
}
}
}
if typeStr == "direct" || typeStr == "all" {
dkeys, err = api.pinning.DirectKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
if err = AddToResultKeys(dkeys, "direct"); err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.DirectKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
if err = AddToResultKeys(streamedCid.C, "direct"); err != nil {
out <- &pinInfo{err: err}
return
}
}
}
if typeStr == "all" {
set := cid.NewSet()
walkingSet := cid.NewSet()
for _, k := range rkeys {
err = merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
walkingSet.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
@ -326,7 +332,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac
return
}
}
if err = AddToResultKeys(set.Keys(), "indirect"); err != nil {
err = walkingSet.ForEach(func(c cid.Cid) error {
return AddToResultKeys(c, "indirect")
})
if err != nil {
out <- &pinInfo{err: err}
return
}
@ -335,25 +344,27 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac
// We need to first visit the direct pins that have priority
// without emitting them
dkeys, err = api.pinning.DirectKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.DirectKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
emittedSet.Add(streamedCid.C)
}
VisitKeys(dkeys)
rkeys, err = api.pinning.RecursiveKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.RecursiveKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
emittedSet.Add(streamedCid.C)
}
VisitKeys(rkeys)
set := cid.NewSet()
walkingSet := cid.NewSet()
for _, k := range rkeys {
err = merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
walkingSet.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
@ -361,7 +372,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac
return
}
}
if err = AddToResultKeys(set.Keys(), "indirect"); err != nil {
err = emittedSet.ForEach(func(c cid.Cid) error {
return AddToResultKeys(c, "indirect")
})
if err != nil {
out <- &pinInfo{err: err}
return
}

View File

@ -154,7 +154,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
// Descendants recursively finds all the descendants of the given roots and
// adds them to the given cid.Set, using the provided dag.GetLinks function
// to walk the tree.
func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []cid.Cid) error {
func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots <-chan pin.StreamedCid) error {
verifyGetLinks := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
err := verifcid.ValidateCid(c)
if err != nil {
@ -167,7 +167,7 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots
verboseCidError := func(err error) error {
if strings.Contains(err.Error(), verifcid.ErrBelowMinimumHashLength.Error()) ||
strings.Contains(err.Error(), verifcid.ErrPossiblyInsecureHashFunction.Error()) {
err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ //nolint
err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ // nolint
" to list insecure hashes. If you want to read them,"+
" please downgrade your go-ipfs to 0.4.13\n", err)
log.Error(err)
@ -175,19 +175,29 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots
return err
}
for _, c := range roots {
// Walk recursively walks the dag and adds the keys to the given set
err := dag.Walk(ctx, verifyGetLinks, c, func(k cid.Cid) bool {
return set.Visit(toCidV1(k))
}, dag.Concurrent())
for {
select {
case <-ctx.Done():
return ctx.Err()
case wrapper, ok := <-roots:
if !ok {
return nil
}
if wrapper.Err != nil {
return wrapper.Err
}
if err != nil {
err = verboseCidError(err)
return err
// Walk recursively walks the dag and adds the keys to the given set
err := dag.Walk(ctx, verifyGetLinks, wrapper.C, func(k cid.Cid) bool {
return set.Visit(toCidV1(k))
}, dag.Concurrent())
if err != nil {
err = verboseCidError(err)
return err
}
}
}
return nil
}
// toCidV1 converts any CIDv0s to CIDv1s.
@ -217,11 +227,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
return links, nil
}
rkeys, err := pn.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, rkeys)
rkeys := pn.RecursiveKeys(ctx)
err := Descendants(ctx, getLinks, gcs, rkeys)
if err != nil {
errors = true
select {
@ -243,7 +250,18 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
return links, nil
}
err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots)
bestEffortRootsChan := make(chan pin.StreamedCid)
go func() {
defer close(bestEffortRootsChan)
for _, root := range bestEffortRoots {
select {
case <-ctx.Done():
return
case bestEffortRootsChan <- pin.StreamedCid{C: root}:
}
}
}()
err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRootsChan)
if err != nil {
errors = true
select {
@ -253,18 +271,15 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
}
dkeys, err := pn.DirectKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range dkeys {
gcs.Add(toCidV1(k))
dkeys := pn.DirectKeys(ctx)
for k := range dkeys {
if k.Err != nil {
return nil, k.Err
}
gcs.Add(toCidV1(k.C))
}
ikeys, err := pn.InternalPins(ctx)
if err != nil {
return nil, err
}
ikeys := pn.InternalPins(ctx)
err = Descendants(ctx, getLinks, gcs, ikeys)
if err != nil {
errors = true

96
gc/gc_test.go Normal file
View File

@ -0,0 +1,96 @@
package gc
import (
"context"
"testing"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange/offline"
"github.com/ipfs/boxo/ipld/merkledag"
mdutils "github.com/ipfs/boxo/ipld/merkledag/test"
pin "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/boxo/pinning/pinner/dspinner"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)
func TestGC(t *testing.T) {
ctx := context.Background()
ds := dssync.MutexWrap(datastore.NewMapDatastore())
bs := blockstore.NewGCBlockstore(blockstore.NewBlockstore(ds), blockstore.NewGCLocker())
bserv := blockservice.New(bs, offline.Exchange(bs))
dserv := merkledag.NewDAGService(bserv)
pinner, err := dspinner.New(ctx, ds, dserv)
require.NoError(t, err)
daggen := mdutils.NewDAGGenerator()
var expectedKept []multihash.Multihash
var expectedDiscarded []multihash.Multihash
// add some pins
for i := 0; i < 5; i++ {
// direct
root, _, err := daggen.MakeDagNode(dserv.Add, 0, 1)
require.NoError(t, err)
err = pinner.PinWithMode(ctx, root, pin.Direct)
require.NoError(t, err)
expectedKept = append(expectedKept, root.Hash())
// recursive
root, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2)
require.NoError(t, err)
err = pinner.PinWithMode(ctx, root, pin.Recursive)
require.NoError(t, err)
expectedKept = append(expectedKept, toMHs(allCids)...)
}
err = pinner.Flush(ctx)
require.NoError(t, err)
// add more dags to be GCed
for i := 0; i < 5; i++ {
_, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2)
require.NoError(t, err)
expectedDiscarded = append(expectedDiscarded, toMHs(allCids)...)
}
// and some other as "best effort roots"
var bestEffortRoots []cid.Cid
for i := 0; i < 5; i++ {
root, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2)
require.NoError(t, err)
bestEffortRoots = append(bestEffortRoots, root)
expectedKept = append(expectedKept, toMHs(allCids)...)
}
ch := GC(ctx, bs, ds, pinner, bestEffortRoots)
var discarded []multihash.Multihash
for res := range ch {
require.NoError(t, res.Error)
discarded = append(discarded, res.KeyRemoved.Hash())
}
allKeys, err := bs.AllKeysChan(ctx)
require.NoError(t, err)
var kept []multihash.Multihash
for key := range allKeys {
kept = append(kept, key.Hash())
}
require.ElementsMatch(t, expectedDiscarded, discarded)
require.ElementsMatch(t, expectedKept, kept)
}
func toMHs(cids []cid.Cid) []multihash.Multihash {
res := make([]multihash.Multihash, len(cids))
for i, c := range cids {
res[i] = c.Hash()
}
return res
}

3
go.mod
View File

@ -1,5 +1,8 @@
module github.com/ipfs/kubo
// https://github.com/ipfs/boxo/pull/290
replace github.com/ipfs/boxo => github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f
require (
bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc
contrib.go.opencensus.io/exporter/prometheus v0.4.2

4
go.sum
View File

@ -49,6 +49,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f h1:2UbpOJ6cIC43V/hIDxgvP0VLbJIk+cBofPAWmXBlSrg=
github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
@ -356,8 +358,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866 h1:ThRTXD/EyoLb/jz+YW+ZlOLbjX9FyaxP0dEpgUp3cCE=
github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=