mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-22 10:57:42 +08:00
Make pinset sharding deterministic
Making this deterministic keeps us from creating an exponential number of objects as the number of pins in the set increases.

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
parent
65e0704e1b
commit
728ff6dd21
23
pin/set.go
23
pin/set.go
@ -3,7 +3,6 @@ package pin
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -26,14 +25,6 @@ const (
|
||||
maxItems = 8192
|
||||
)
|
||||
|
||||
func randomSeed() (uint32, error) {
|
||||
var buf [4]byte
|
||||
if _, err := rand.Read(buf[:]); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return binary.LittleEndian.Uint32(buf[:]), nil
|
||||
}
|
||||
|
||||
func hash(seed uint32, c *cid.Cid) uint32 {
|
||||
var buf [4]byte
|
||||
binary.LittleEndian.PutUint32(buf[:], seed)
|
||||
@ -63,11 +54,7 @@ func (s sortByHash) Swap(a, b int) {
|
||||
s.links[a], s.links[b] = s.links[b], s.links[a]
|
||||
}
|
||||
|
||||
func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
|
||||
seed, err := randomSeed()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
|
||||
links := make([]*node.Link, 0, defaultFanout+maxItems)
|
||||
for i := 0; i < defaultFanout; i++ {
|
||||
links = append(links, &node.Link{Cid: emptyKey})
|
||||
@ -82,7 +69,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
|
||||
hdr := &pb.Set{
|
||||
Version: proto.Uint32(1),
|
||||
Fanout: proto.Uint32(defaultFanout),
|
||||
Seed: proto.Uint32(seed),
|
||||
Seed: proto.Uint32(depth),
|
||||
}
|
||||
if err := writeHdr(n, hdr); err != nil {
|
||||
return nil, err
|
||||
@ -129,7 +116,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
h := hash(seed, k) % defaultFanout
|
||||
h := hash(depth, k) % defaultFanout
|
||||
hashed[h] = append(hashed[h], k)
|
||||
}
|
||||
|
||||
@ -142,7 +129,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
|
||||
childIter := getCidListIterator(items)
|
||||
|
||||
// recursively create a pinset from the items for this bucket index
|
||||
child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys)
|
||||
child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -296,7 +283,7 @@ func getCidListIterator(cids []*cid.Cid) itemIterator {
|
||||
func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
|
||||
iter := getCidListIterator(cids)
|
||||
|
||||
n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys)
|
||||
n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -2,40 +2,75 @@ package pin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||
bserv "github.com/ipfs/go-ipfs/blockservice"
|
||||
offline "github.com/ipfs/go-ipfs/exchange/offline"
|
||||
dag "github.com/ipfs/go-ipfs/merkledag"
|
||||
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
|
||||
|
||||
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
|
||||
dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
|
||||
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
|
||||
)
|
||||
|
||||
func ignoreCids(_ *cid.Cid) {}
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
ds := mdtest.Mock()
|
||||
limit := 10000 // 10000 reproduces the pinloss issue fairly reliably
|
||||
|
||||
if os.Getenv("STRESS_IT_OUT_YO") != "" {
|
||||
limit = 10000000
|
||||
func objCount(d ds.Datastore) int {
|
||||
q := dsq.Query{KeysOnly: true}
|
||||
res, err := d.Query(q)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var inputs []*cid.Cid
|
||||
for i := 0; i < limit; i++ {
|
||||
c, err := ds.Add(dag.NodeWithData([]byte(fmt.Sprint(i))))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
var count int
|
||||
for {
|
||||
_, ok := res.NextSync()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
count++
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
dst := ds.NewMapDatastore()
|
||||
bstore := blockstore.NewBlockstore(dst)
|
||||
ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore)))
|
||||
|
||||
// this value triggers the creation of a recursive shard.
|
||||
// If the recursive sharding is done improperly, this will result in
|
||||
// an infinite recursion and crash (OOM)
|
||||
limit := uint32((defaultFanout * maxItems) + 1)
|
||||
|
||||
var inputs []*cid.Cid
|
||||
buf := make([]byte, 4)
|
||||
for i := uint32(0); i < limit; i++ {
|
||||
binary.BigEndian.PutUint32(buf, i)
|
||||
c := dag.NewRawNode(buf).Cid()
|
||||
inputs = append(inputs, c)
|
||||
}
|
||||
|
||||
_, err := storeSet(context.Background(), ds, inputs[:len(inputs)-1], ignoreCids)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
objs1 := objCount(dst)
|
||||
|
||||
out, err := storeSet(context.Background(), ds, inputs, ignoreCids)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
objs2 := objCount(dst)
|
||||
if objs2-objs1 > 2 {
|
||||
t.Fatal("set sharding does not appear to be deterministic")
|
||||
}
|
||||
|
||||
// weird wrapper node because loadSet expects us to pass an
|
||||
// object pointing to multiple named sets
|
||||
setroot := &dag.ProtoNode{}
|
||||
@ -49,7 +84,7 @@ func TestSet(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(outset) != limit {
|
||||
if uint32(len(outset)) != limit {
|
||||
t.Fatal("got wrong number", len(outset), limit)
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user