package coreunix

import (
	"bytes"
	"context"
	"io"
	"io/ioutil"
	"math/rand"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/ipfs/go-ipfs/core"
	"github.com/ipfs/go-ipfs/pin/gc"
	"github.com/ipfs/go-ipfs/repo"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-blockservice"
	cid "github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	syncds "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	config "github.com/ipfs/go-ipfs-config"
	files "github.com/ipfs/go-ipfs-files"
	pi "github.com/ipfs/go-ipfs-posinfo"
	dag "github.com/ipfs/go-merkledag"
	coreiface "github.com/ipfs/interface-go-ipfs-core"
)

const testPeerID = "QmTFauExutTsy4XP6JbMFcw2Wa9645HJt2bTqL6qYDCKfe"
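
// TestAddMultipleGCLive runs two GC passes while an add is paused mid-file,
// checking that each GC only acquires the lock once the in-flight file is
// fully written, and that no freshly added hash ends up garbage collected.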
func TestAddMultipleGCLive(t *testing.T) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	out := make(chan interface{}, 10)
	adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = out

	// Make two files with pipes so we can 'pause' the add for the timing of the test.
	piper1, pipew1 := io.Pipe()
	hangfile1 := files.NewReaderFile(piper1)

	piper2, pipew2 := io.Pipe()
	hangfile2 := files.NewReaderFile(piper2)

	rfc := files.NewBytesFile([]byte("testfileA"))

	slf := files.NewMapDirectory(map[string]files.Node{
		"a": hangfile1,
		"b": hangfile2,
		"c": rfc,
	})

	go func() {
		defer close(out)
		_, _ = adder.AddAllAndPin(slf)
		// Ignore the error for clarity - the real bug would be GC'ing files
		// while adding them, not this resultant error.
	}()

	// Start writing the first file but don't close the stream.
	if _, err := pipew1.Write([]byte("some data for file a")); err != nil {
		t.Fatal(err)
	}

	var gc1out <-chan gc.Result
	gc1started := make(chan struct{})
	go func() {
		defer close(gc1started)
		gc1out = gc.GC(context.Background(), node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// GC shouldn't get the lock until after the file is completely added.
	select {
	case <-gc1started:
		t.Fatal("gc shouldn't have started yet")
	default:
	}

	// Finish the write and unblock the GC.
	pipew1.Close()

	// Should have gotten the lock at this point.
	<-gc1started

	removedHashes := make(map[string]struct{})
	for r := range gc1out {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		removedHashes[r.KeyRemoved.String()] = struct{}{}
	}

	if _, err := pipew2.Write([]byte("some data for file b")); err != nil {
		t.Fatal(err)
	}

	var gc2out <-chan gc.Result
	gc2started := make(chan struct{})
	go func() {
		defer close(gc2started)
		gc2out = gc.GC(context.Background(), node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	select {
	case <-gc2started:
		t.Fatal("gc shouldn't have started yet")
	default:
	}

	pipew2.Close()

	<-gc2started

	for r := range gc2out {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		removedHashes[r.KeyRemoved.String()] = struct{}{}
	}

	for o := range out {
		if _, ok := removedHashes[o.(*coreiface.AddEvent).Path.Cid().String()]; ok {
			t.Fatal("gc'ed a hash we just added")
		}
	}
}
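
// TestAddGCLive pauses an in-progress add with a pipe-backed file, triggers a
// GC while the adder still holds the lock, and verifies both that no added
// hash was collected and that the final DAG is fully traversable afterwards.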
func TestAddGCLive(t *testing.T) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	out := make(chan interface{})
	adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = out

	rfa := files.NewBytesFile([]byte("testfileA"))

	// Make a file with a pipe so we can 'pause' the add for the timing of the test.
	piper, pipew := io.Pipe()
	hangfile := files.NewReaderFile(piper)

	rfd := files.NewBytesFile([]byte("testfileD"))

	slf := files.NewMapDirectory(map[string]files.Node{
		"a": rfa,
		"b": hangfile,
		"d": rfd,
	})

	addDone := make(chan struct{})
	go func() {
		defer close(addDone)
		defer close(out)
		_, err := adder.AddAllAndPin(slf)
		if err != nil {
			t.Error(err)
		}
	}()

	addedHashes := make(map[string]struct{})
	select {
	case o := <-out:
		addedHashes[o.(*coreiface.AddEvent).Path.Cid().String()] = struct{}{}
	case <-addDone:
		t.Fatal("add shouldn't complete yet")
	}

	var gcout <-chan gc.Result
	gcstarted := make(chan struct{})
	go func() {
		defer close(gcstarted)
		gcout = gc.GC(context.Background(), node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// GC shouldn't start until we let the add finish its current file.
	if _, err := pipew.Write([]byte("some data for file b")); err != nil {
		t.Fatal(err)
	}

	select {
	case <-gcstarted:
		t.Fatal("gc shouldn't have started yet")
	default:
	}

	time.Sleep(time.Millisecond * 100) // make sure the GC gets to requesting the lock

	// Finish the write and unblock the GC.
	pipew.Close()

	// Receive the next object from the adder.
	o := <-out
	addedHashes[o.(*coreiface.AddEvent).Path.Cid().String()] = struct{}{}

	<-gcstarted

	for r := range gcout {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		if _, ok := addedHashes[r.KeyRemoved.String()]; ok {
			t.Fatal("gc'ed a hash we just added")
		}
	}

	// Wait for the add to finish by draining the remaining events, keeping
	// the last CID emitted.
	var last cid.Cid
	for a := range out {
		c, err := cid.Decode(a.(*coreiface.AddEvent).Path.Cid().String())
		if err != nil {
			t.Fatal(err)
		}
		last = c
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Verify the DAG under the last CID is still fully reachable.
	set := cid.NewSet()
	err = dag.Walk(ctx, dag.GetLinksWithDAG(node.DAG), last, set.Visit)
	if err != nil {
		t.Fatal(err)
	}
}
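
// testAddWPosInfo adds a 5 MiB pseudo-random file through a wrapped
// blockstore and counts how many stored blocks carry filestore position
// info at offset zero versus a non-zero offset, with and without raw leaves.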
func testAddWPosInfo(t *testing.T, rawLeaves bool) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: filepath.Join(os.TempDir(), "foo.txt"), t: t}
	bserv := blockservice.New(bs, node.Exchange)
	dserv := dag.NewDAGService(bserv)
	adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
	if err != nil {
		t.Fatal(err)
	}
	out := make(chan interface{})
	adder.Out = out
	adder.Progress = true
	adder.RawLeaves = rawLeaves
	adder.NoCopy = true

	data := make([]byte, 5*1024*1024)
	rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
	fileData := ioutil.NopCloser(bytes.NewBuffer(data))
	fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
	file, _ := files.NewReaderPathFile(filepath.Join(os.TempDir(), "foo.txt"), fileData, &fileInfo)

	go func() {
		defer close(adder.Out)
		_, err = adder.AddAllAndPin(file)
		if err != nil {
			t.Error(err)
		}
	}()
	for range out {
	}

	// Position info is only expected when raw leaves are used.
	exp := 0
	nonOffZero := 0
	if rawLeaves {
		exp = 1
		nonOffZero = 19
	}
	if bs.countAtOffsetZero != exp {
		t.Fatalf("expected %d blocks with an offset of zero, got %d", exp, bs.countAtOffsetZero)
	}
	if bs.countAtOffsetNonZero != nonOffZero {
		// Note: the exact number will depend on the size and the sharding algorithm used.
		t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero)
	}
}

func TestAddWPosInfo(t *testing.T) {
	testAddWPosInfo(t, false)
}

func TestAddWPosInfoAndRawLeafs(t *testing.T) {
	testAddWPosInfo(t, true)
}
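
// testBlockstore wraps a GCBlockstore and inspects every block written
// through it, tallying filestore position info by offset.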
type testBlockstore struct {
	blockstore.GCBlockstore
	expectedPath         string
	t                    *testing.T
	countAtOffsetZero    int
	countAtOffsetNonZero int
}
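
// Put checks the block for position info before storing it.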
func (bs *testBlockstore) Put(block blocks.Block) error {
	bs.CheckForPosInfo(block)
	return bs.GCBlockstore.Put(block)
}
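
// PutMany checks each block for position info before storing the batch.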
func (bs *testBlockstore) PutMany(blocks []blocks.Block) error {
	for _, blk := range blocks {
		bs.CheckForPosInfo(blk)
	}
	return bs.GCBlockstore.PutMany(blocks)
}
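
// CheckForPosInfo verifies that a filestore node carries the expected path
// and records whether its offset is zero or non-zero. Non-filestore blocks
// are ignored.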
func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) {
	fsn, ok := block.(*pi.FilestoreNode)
	if !ok {
		return
	}
	posInfo := fsn.PosInfo
	if posInfo.FullPath != bs.expectedPath {
		// Report with Error rather than Fatal: this runs on the adder
		// goroutine, and Fatal must only be called from the test goroutine.
		bs.t.Errorf("PosInfo has path %q, expected %q", posInfo.FullPath, bs.expectedPath)
	}
	if posInfo.Offset == 0 {
		bs.countAtOffsetZero++
	} else {
		bs.countAtOffsetNonZero++
	}
}
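
// dummyFileInfo implements os.FileInfo for an in-memory test file.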
type dummyFileInfo struct {
	name    string
	size    int64
	modTime time.Time
}

func (fi *dummyFileInfo) Name() string       { return fi.name }
func (fi *dummyFileInfo) Size() int64        { return fi.size }
func (fi *dummyFileInfo) Mode() os.FileMode  { return 0 }
func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime }
func (fi *dummyFileInfo) IsDir() bool        { return false }
func (fi *dummyFileInfo) Sys() interface{}   { return nil }