kubo/core/coreunix/add_test.go
Marcin Rataj 1301710a91
fix(ci): parallelize gotest, cleanup output, flakiness (#11113)
* ci: parallelize gotest by separating test/cli into own job

split the Go Test workflow into two parallel jobs:
- `unit-tests`: runs unit tests (excluding test/cli)
- `cli-tests`: runs test/cli end-to-end tests

test/cli takes ~3 minutes (~50% of total gotest time), so running
it in parallel should reduce wall-clock CI time by ~1.5-2.5 minutes.

both jobs produce JUnit XML and HTML reports for consistent debugging.

* ci(gotest): reduce noise on test timeout panics

add GOTRACEBACK=single to show only one goroutine stack instead of all
when a test timeout panic occurs. this makes CI output much cleaner
when tests hang.
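
for reference, the Go runtime exposes the same knob programmatically via
runtime/debug; a minimal sketch (the CI change itself only exports the
GOTRACEBACK environment variable, it does not touch Go code):

    package main

    import "runtime/debug"

    func main() {
    	// "single" prints only the crashing goroutine's stack on a fatal
    	// panic instead of dumping every goroutine.
    	debug.SetTraceback("single")
    }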

* fix(ci): prevent stderr from corrupting test JSON output

- remove 2>&1 which mixed "go: downloading" stderr messages into JSON
- add JSON validation before parsing
- print failed test names for easier debugging

* ci(gotest): use gotestsum for human-readable test output

- replace per-package coverage loop with single gotestsum invocation
- both unit-tests and cli-tests now show human-readable output
- simplified coverage collection (single -coverprofile, no gocovmerge)
- clarified step names to indicate they run tests

* ci: fix codecov uploads by adding token

- add CODECOV_TOKEN to gotest.yml and sharness.yml
- update codecov-action to v5.5.2
- add fail_ci_if_error: false for robustness

codecov stopped receiving coverage data ~1 year ago when they
started requiring tokens for public repos

* refactor(make): add test_unit and test_cli targets

- add `make test_unit` for unit tests with coverage (used by CI)
- add `make test_cli` for CLI integration tests (used by CI)
- only disable colors when CI env var is set (local dev gets colors)
- remove legacy targets: test_go_test, test_go_short, test_go_race, test_go_expensive
- update gotest.yml to use make targets instead of inline commands
- add test artifacts to .gitignore

* fix(ci): move client/rpc tests to cli-tests job

client/rpc tests use test/cli/harness, which requires the ipfs binary.
Move them from test_unit to test_cli where the binary is built.

also:
- update gotestsum to v1.13.0
- simplify workflow step names

* fix(ci): use build tags when listing test packages

go list needs build tags to properly exclude packages like fuse/mfs
when running with TEST_FUSE=0 (nofuse tag).

* fix(ci): move test/integration to cli-tests job

test/integration tests need the ipfs binary; move them from test_unit
to test_cli.

* fix(test): fix flaky kubo-as-a-library and GetClosestPeers tests

kubo-as-a-library: use `Bootstrap()` instead of raw `Swarm().Connect()`
to fix race condition between swarm connection and bitswap peer
discovery. `Bootstrap()` properly integrates peers into the routing
system, ensuring bitswap learns about connected peers synchronously.
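
a minimal sketch of the pattern, assuming the core.IpfsNode and
core/bootstrap APIs (nodeA/nodeB and connectViaBootstrap are
illustrative names, not the actual example code):

    package example

    import (
    	"github.com/ipfs/kubo/core"
    	"github.com/ipfs/kubo/core/bootstrap"
    	"github.com/libp2p/go-libp2p/core/peer"
    )

    // connectViaBootstrap feeds nodeA's addresses to nodeB's bootstrapper
    // so the peer is integrated into routing (and bitswap learns about it)
    // rather than merely dialed.
    func connectViaBootstrap(nodeA, nodeB *core.IpfsNode) error {
    	pi := peer.AddrInfo{ID: nodeA.Identity, Addrs: nodeA.PeerHost.Addrs()}
    	return nodeB.Bootstrap(bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{pi}))
    }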

GetClosestPeers: simplify retry logic using `EventuallyWithT` with
10-minute timeout. tests all 4 routing types (`auto`, `autoclient`,
`dht`, `dhtclient`) against real bootstrap peers with patient polling.
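
the retry shape, as a sketch (findClosestPeers is a hypothetical
stand-in for one GetClosestPeers query; only the testify polling
pattern is the point here):

    package example

    import (
    	"errors"
    	"testing"
    	"time"

    	"github.com/stretchr/testify/assert"
    	"github.com/stretchr/testify/require"
    )

    // findClosestPeers stands in for a single GetClosestPeers query
    // against the real bootstrap peers.
    func findClosestPeers() ([]string, error) { return nil, errors.New("not yet") }

    func TestClosestPeersShape(t *testing.T) {
    	// poll patiently instead of hand-rolling retry loops; failed
    	// assertions inside the callback are collected, not fatal,
    	// until the overall timeout expires.
    	require.EventuallyWithT(t, func(c *assert.CollectT) {
    		peers, err := findClosestPeers()
    		assert.NoError(c, err)
    		assert.NotEmpty(c, peers)
    	}, 10*time.Minute, 10*time.Second)
    }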

* fix(example): use bidirectional Swarm().Connect() for reliable bitswap

- connect nodes bidirectionally (A→B and B→A) to simulate mutual peering; see the sketch after this list
- mutual peering protects connection from resource manager culling
- use port 0 for random available ports (avoids CI conflicts)
- enable LoopbackAddressesOnLanDHT for local testing
- move retry logic to test file using require.Eventually
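
a sketch of the mutual connect, assuming the CoreAPI interface (the
helper name and its callers are illustrative):

    package example

    import (
    	"context"

    	coreiface "github.com/ipfs/kubo/core/coreiface"
    	"github.com/libp2p/go-libp2p/core/peer"
    )

    // connectBidirectional dials in both directions so each side's
    // resource manager treats the link like configured peering.
    func connectBidirectional(ctx context.Context, apiA, apiB coreiface.CoreAPI, infoA, infoB peer.AddrInfo) error {
    	if err := apiA.Swarm().Connect(ctx, infoB); err != nil { // A→B
    		return err
    	}
    	return apiB.Swarm().Connect(ctx, infoA) // B→A
    }

the test then polls with require.Eventually until a fetch over bitswap
succeeds, rather than retrying inside the example itself.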

* fix(ci): add test_examples target and parallel example-tests job

- add `make test_examples` target to mk/golang.mk for consistency with test_unit/test_cli
- move example tests to separate parallel CI job (example-tests)
- example: use Bootstrap() with autoconf.FallbackBootstrapPeers for reliable bitswap
- example: increase context timeout to 10 minutes
- test: add 60s per-request timeout to GetClosestPeers (server has 30s routing timeout)
- test: reduce EventuallyWithT to 3 minutes (locally passes in under 1 minute)

* fix(ci): improve test targets, exclusion patterns, and artifact naming

- define COVERPKG_EXCLUDE and UNIT_EXCLUDE as documented variables
- use grep -vE with single regex instead of multiple grep -v calls
- add mkdir -p before rm to ensure directories exist
- add DEPS_GO dependency to test_cli target
- make CLI test timeout configurable via TEST_CLI_TIMEOUT (default 10m)
- fix test_examples cleanup on failure using subshell
- reduce GetClosestPeers test wait time from 3m to 2m
- rename artifacts to match job names: unit-tests-{junit,html}, cli-tests-{junit,html}
- update cli-tests upload-artifact from v5 to v6

* fix(ci): fix unit test exclusion and speed up example test

- fix UNIT_EXCLUDE regex to match client/rpc at end of path
- remove public bootstrap peers from example (only connect to nodeA)
- example test now runs in ~3s instead of timing out

* fix(test): fix flaky TestAddMultipleGCLive race condition

added time.Sleep after spawning GC goroutines to ensure they reach
GCLock() before the test proceeds. without this, the adder's
maybePauseForGC() might check GCRequested() before GC has even
requested the lock, so the lock is never released and GC blocks
indefinitely.

this matches the existing pattern in TestAddGCLive which already
had this sleep.

also replaced context.Background() with t.Context() in both
TestAddMultipleGCLive and TestAddGCLive for proper test lifecycle
management.

* fix(example): use test harness settings for reliable CI

the kubo-as-a-library example was flaky on CI. applied test-harness-like
settings that match what transports_test.go uses:

- TCP-only on 127.0.0.1 with random port (no QUIC/UDP)
- explicitly disable non-TCP transports (QUIC, Relay, WebTransport, etc.)
- use NilRouterOption (no routing) since we connect peers directly
- bitswap works with directly connected peers without DHT lookups
- 2-minute context timeout
- streaming output in test for debugging
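
a sketch of the no-routing node construction, assuming kubo's
core.BuildCfg and libp2p.NilRouterOption (the repo config restricting
the swarm to TCP on 127.0.0.1 with port 0 is set up separately):

    package example

    import (
    	"context"

    	"github.com/ipfs/kubo/core"
    	"github.com/ipfs/kubo/core/node/libp2p"
    	"github.com/ipfs/kubo/repo"
    )

    // newDirectConnectNode builds an online node without any router;
    // peers must be connected directly, and bitswap serves them without
    // DHT lookups.
    func newDirectConnectNode(ctx context.Context, r repo.Repo) (*core.IpfsNode, error) {
    	return core.NewNode(ctx, &core.BuildCfg{
    		Repo:    r,
    		Online:  true,
    		Routing: libp2p.NilRouterOption,
    	})
    }
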
2026-01-08 05:07:08 +01:00

386 lines
9.4 KiB
Go

package coreunix

import (
	"bytes"
	"context"
	"io"
	"math/rand"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/ipfs/boxo/blockservice"
	blockstore "github.com/ipfs/boxo/blockstore"
	"github.com/ipfs/boxo/files"
	pi "github.com/ipfs/boxo/filestore/posinfo"
	dag "github.com/ipfs/boxo/ipld/merkledag"
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	syncds "github.com/ipfs/go-datastore/sync"
	config "github.com/ipfs/kubo/config"
	"github.com/ipfs/kubo/core"
	coreiface "github.com/ipfs/kubo/core/coreiface"
	"github.com/ipfs/kubo/gc"
	"github.com/ipfs/kubo/repo"
)

const testPeerID = "QmTFauExutTsy4XP6JbMFcw2Wa9645HJt2bTqL6qYDCKfe"

func TestAddMultipleGCLive(t *testing.T) {
	ctx := t.Context()
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	out := make(chan interface{}, 10)
	adder, err := NewAdder(ctx, node.Pinning, node.Blockstore, node.DAG)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = out

	// make two files with pipes so we can 'pause' the add for timing of the test
	piper1, pipew1 := io.Pipe()
	hangfile1 := files.NewReaderFile(piper1)

	piper2, pipew2 := io.Pipe()
	hangfile2 := files.NewReaderFile(piper2)

	rfc := files.NewBytesFile([]byte("testfileA"))

	slf := files.NewMapDirectory(map[string]files.Node{
		"a": hangfile1,
		"b": hangfile2,
		"c": rfc,
	})

	go func() {
		defer close(out)
		_, _ = adder.AddAllAndPin(ctx, slf)
		// Ignore errors for clarity - the real bug would be gc'ing files while adding them, not this resultant error
	}()

	// Start writing the first file but don't close the stream
	if _, err := pipew1.Write([]byte("some data for file a")); err != nil {
		t.Fatal(err)
	}

	var gc1out <-chan gc.Result
	gc1started := make(chan struct{})
	go func() {
		defer close(gc1started)
		gc1out = gc.GC(ctx, node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// Give GC goroutine time to reach GCLock (will block there waiting for adder)
	time.Sleep(time.Millisecond * 100)

	// GC shouldn't get the lock until after the file is completely added
	select {
	case <-gc1started:
		t.Fatal("gc shouldn't have started yet")
	default:
	}

	// finish write and unblock gc
	pipew1.Close()

	// Wait for GC to acquire the lock
	// The adder needs to finish processing file 'a' and call maybePauseForGC
	// when starting file 'b' before GC can proceed
	select {
	case <-gc1started:
		// GC got the lock as expected
	case <-time.After(5 * time.Second):
		t.Fatal("timeout waiting for GC to start - possible deadlock")
	}

	removedHashes := make(map[string]struct{})
	for r := range gc1out {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		removedHashes[r.KeyRemoved.String()] = struct{}{}
	}

	if _, err := pipew2.Write([]byte("some data for file b")); err != nil {
		t.Fatal(err)
	}

	var gc2out <-chan gc.Result
	gc2started := make(chan struct{})
	go func() {
		defer close(gc2started)
		gc2out = gc.GC(ctx, node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// Give GC goroutine time to reach GCLock
	time.Sleep(time.Millisecond * 100)

	select {
	case <-gc2started:
		t.Fatal("gc shouldn't have started yet")
	default:
	}

	pipew2.Close()

	// Wait for second GC to acquire the lock
	// The adder needs to finish processing file 'b' and call maybePauseForGC
	// when starting file 'c' before GC can proceed
	select {
	case <-gc2started:
		// GC got the lock as expected
	case <-time.After(5 * time.Second):
		t.Fatal("timeout waiting for second GC to start - possible deadlock")
	}

	for r := range gc2out {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		removedHashes[r.KeyRemoved.String()] = struct{}{}
	}

	for o := range out {
		if _, ok := removedHashes[o.(*coreiface.AddEvent).Path.RootCid().String()]; ok {
			t.Fatal("gc'ed a hash we just added")
		}
	}
}

func TestAddGCLive(t *testing.T) {
	ctx := t.Context()
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	out := make(chan interface{})
	adder, err := NewAdder(ctx, node.Pinning, node.Blockstore, node.DAG)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = out

	rfa := files.NewBytesFile([]byte("testfileA"))

	// make two files with pipes so we can 'pause' the add for timing of the test
	piper, pipew := io.Pipe()
	hangfile := files.NewReaderFile(piper)

	rfd := files.NewBytesFile([]byte("testfileD"))

	slf := files.NewMapDirectory(map[string]files.Node{
		"a": rfa,
		"b": hangfile,
		"d": rfd,
	})

	addDone := make(chan struct{})
	go func() {
		defer close(addDone)
		defer close(out)
		_, err := adder.AddAllAndPin(ctx, slf)
		if err != nil {
			t.Error(err)
		}
	}()

	addedHashes := make(map[string]struct{})
	select {
	case o := <-out:
		addedHashes[o.(*coreiface.AddEvent).Path.RootCid().String()] = struct{}{}
	case <-addDone:
		t.Fatal("add shouldn't complete yet")
	}

	var gcout <-chan gc.Result
	gcstarted := make(chan struct{})
	go func() {
		defer close(gcstarted)
		gcout = gc.GC(ctx, node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// gc shouldn't start until we let the add finish its current file.
	if _, err := pipew.Write([]byte("some data for file b")); err != nil {
		t.Fatal(err)
	}

	select {
	case <-gcstarted:
		t.Fatal("gc shouldn't have started yet")
	default:
	}

	time.Sleep(time.Millisecond * 100) // make sure gc gets to requesting lock

	// finish write and unblock gc
	pipew.Close()

	// receive next object from adder
	o := <-out
	addedHashes[o.(*coreiface.AddEvent).Path.RootCid().String()] = struct{}{}

	<-gcstarted

	for r := range gcout {
		if r.Error != nil {
			t.Fatal(r.Error)
		}
		if _, ok := addedHashes[r.KeyRemoved.String()]; ok {
			t.Fatal("gc'ed a hash we just added")
		}
	}

	var last cid.Cid
	for a := range out {
		// wait for it to finish
		c, err := cid.Decode(a.(*coreiface.AddEvent).Path.RootCid().String())
		if err != nil {
			t.Fatal(err)
		}
		last = c
	}

	set := cid.NewSet()
	err = dag.Walk(ctx, dag.GetLinksWithDAG(node.DAG), last, set.Visit)
	if err != nil {
		t.Fatal(err)
	}
}

func testAddWPosInfo(t *testing.T, rawLeaves bool) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: filepath.Join(os.TempDir(), "foo.txt"), t: t}
	bserv := blockservice.New(bs, node.Exchange)
	dserv := dag.NewDAGService(bserv)
	adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
	if err != nil {
		t.Fatal(err)
	}
	out := make(chan interface{})
	adder.Out = out
	adder.Progress = true
	adder.RawLeaves = rawLeaves
	adder.NoCopy = true

	data := make([]byte, 5*1024*1024)
	rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
	fileData := io.NopCloser(bytes.NewBuffer(data))
	fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
	file, _ := files.NewReaderPathFile(filepath.Join(os.TempDir(), "foo.txt"), fileData, &fileInfo)

	go func() {
		defer close(adder.Out)
		_, err = adder.AddAllAndPin(context.Background(), file)
		if err != nil {
			t.Error(err)
		}
	}()
	for range out {
	}

	exp := 0
	nonOffZero := 0
	if rawLeaves {
		exp = 1
		nonOffZero = 19
	}
	if bs.countAtOffsetZero != exp {
		t.Fatalf("expected %d blocks with an offset at zero (one root and one leaf), got %d", exp, bs.countAtOffsetZero)
	}
	if bs.countAtOffsetNonZero != nonOffZero {
		// note: the exact number will depend on the size and the sharding algo. used
		t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero)
	}
}

func TestAddWPosInfo(t *testing.T) {
	testAddWPosInfo(t, false)
}

func TestAddWPosInfoAndRawLeafs(t *testing.T) {
	testAddWPosInfo(t, true)
}

type testBlockstore struct {
	blockstore.GCBlockstore
	expectedPath         string
	t                    *testing.T
	countAtOffsetZero    int
	countAtOffsetNonZero int
}

func (bs *testBlockstore) Put(ctx context.Context, block blocks.Block) error {
	bs.CheckForPosInfo(block)
	return bs.GCBlockstore.Put(ctx, block)
}

func (bs *testBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
	for _, blk := range blocks {
		bs.CheckForPosInfo(blk)
	}
	return bs.GCBlockstore.PutMany(ctx, blocks)
}

func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) {
	fsn, ok := block.(*pi.FilestoreNode)
	if ok {
		posInfo := fsn.PosInfo
		if posInfo.FullPath != bs.expectedPath {
			bs.t.Fatal("PosInfo does not have the expected path")
		}
		if posInfo.Offset == 0 {
			bs.countAtOffsetZero++
		} else {
			bs.countAtOffsetNonZero++
		}
	}
}

type dummyFileInfo struct {
	name    string
	size    int64
	modTime time.Time
}

func (fi *dummyFileInfo) Name() string       { return fi.name }
func (fi *dummyFileInfo) Size() int64        { return fi.size }
func (fi *dummyFileInfo) Mode() os.FileMode  { return 0 }
func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime }
func (fi *dummyFileInfo) IsDir() bool        { return false }
func (fi *dummyFileInfo) Sys() interface{}   { return nil }