test: verifyWorkerRun and helptext (#11063)

Marcin Rataj 2025-11-17 18:51:33 +01:00 committed by GitHub
parent 798b889ba2
commit c7eda21d68
4 changed files with 1011 additions and 24 deletions


@ -5,20 +5,22 @@ import (
"errors"
"fmt"
"io"
"os"
"runtime"
"strings"
"sync"
"text/tabwriter"
"time"
oldcmds "github.com/ipfs/kubo/commands"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
coreiface "github.com/ipfs/kubo/core/coreiface"
corerepo "github.com/ipfs/kubo/core/corerepo"
fsrepo "github.com/ipfs/kubo/repo/fsrepo"
"github.com/ipfs/kubo/repo/fsrepo/migrations"
humanize "github.com/dustin/go-humanize"
bstore "github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/path"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
)
@ -226,45 +228,137 @@ Version string The repo version.
},
}
// VerifyProgress reports verification progress to the user.
// It contains either a message about a corrupt block or a progress counter.
type VerifyProgress struct {
Msg string // Message about a corrupt/healed block (empty for valid blocks)
Progress int // Number of blocks processed so far
}
// verifyState represents the state of a block after verification.
// States track both the verification result and any remediation actions taken.
type verifyState int
const (
verifyStateValid verifyState = iota // Block is valid and uncorrupted
verifyStateCorrupt // Block is corrupt, no action taken
verifyStateCorruptRemoved // Block was corrupt and successfully removed
verifyStateCorruptRemoveFailed // Block was corrupt but removal failed
verifyStateCorruptHealed // Block was corrupt, removed, and successfully re-fetched
verifyStateCorruptHealFailed // Block was corrupt and removed, but re-fetching failed
)
const (
// verifyWorkerMultiplier determines worker pool size relative to CPU count.
// Since block verification is I/O-bound (disk reads + potential network fetches),
// we use more workers than CPU cores to maximize throughput.
verifyWorkerMultiplier = 2
)
// verifyResult contains the outcome of verifying a single block.
// It includes the block's CID, its verification state, and an optional
// human-readable message describing what happened.
type verifyResult struct {
cid cid.Cid // CID of the block that was verified
state verifyState // Final state after verification and any remediation
msg string // Human-readable message (empty for valid blocks)
}
// verifyWorkerRun processes CIDs from the keys channel, verifying their integrity.
// If shouldDrop is true, corrupt blocks are removed from the blockstore.
// If shouldHeal is true (implies shouldDrop), removed blocks are re-fetched from the network.
// The api parameter must be non-nil when shouldHeal is true.
// healTimeout specifies the maximum time to wait for each block heal (0 = no timeout).
func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- *verifyResult, bs bstore.Blockstore, api coreiface.CoreAPI, shouldDrop, shouldHeal bool, healTimeout time.Duration) {
defer wg.Done()
sendResult := func(r *verifyResult) bool {
select {
case results <- r:
return true
case <-ctx.Done():
return false
}
}
for k := range keys {
_, err := bs.Get(ctx, k)
if err != nil {
// Block is corrupt
result := &verifyResult{cid: k, state: verifyStateCorrupt}
if !shouldDrop {
result.msg = fmt.Sprintf("block %s was corrupt (%s)", k, err)
if !sendResult(result) {
return
}
continue
}
// Try to delete
if delErr := bs.DeleteBlock(ctx, k); delErr != nil {
result.state = verifyStateCorruptRemoveFailed
result.msg = fmt.Sprintf("block %s was corrupt (%s), failed to remove (%s)", k, err, delErr)
if !sendResult(result) {
return
}
continue
}
if !shouldHeal {
result.state = verifyStateCorruptRemoved
result.msg = fmt.Sprintf("block %s was corrupt (%s), removed", k, err)
if !sendResult(result) {
return
}
continue
}
// Try to heal by re-fetching from network (api is guaranteed non-nil here)
healCtx := ctx
var healCancel context.CancelFunc
if healTimeout > 0 {
healCtx, healCancel = context.WithTimeout(ctx, healTimeout)
}
if _, healErr := api.Block().Get(healCtx, path.FromCid(k)); healErr != nil {
result.state = verifyStateCorruptHealFailed
result.msg = fmt.Sprintf("block %s was corrupt (%s), removed, failed to heal (%s)", k, err, healErr)
} else {
result.state = verifyStateCorruptHealed
result.msg = fmt.Sprintf("block %s was corrupt (%s), removed, healed", k, err)
}
if healCancel != nil {
healCancel()
}
if !sendResult(result) {
return
}
continue
}
// Block is valid
if !sendResult(&verifyResult{cid: k, state: verifyStateValid}) {
return
}
}
}
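// Illustrative sketch: the per-block timeout above tracks healCancel manually to
// avoid defer inside the loop. A hypothetical helper (healBlock is not a function
// in this command) could wrap the re-fetch so defer releases the timeout context:
func healBlock(ctx context.Context, api coreiface.CoreAPI, k cid.Cid, healTimeout time.Duration) error {
	// A zero healTimeout means no per-block deadline, matching verifyWorkerRun.
	if healTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, healTimeout)
		defer cancel()
	}
	// Re-fetch the block through the CoreAPI, as verifyWorkerRun does.
	_, err := api.Block().Get(ctx, path.FromCid(k))
	return err
}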
// verifyResultChan creates a channel of verification results by spawning multiple worker goroutines
// to process blocks in parallel. It returns immediately with a channel that will receive results.
func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore, api coreiface.CoreAPI, shouldDrop, shouldHeal bool, healTimeout time.Duration) <-chan *verifyResult {
results := make(chan *verifyResult)
go func() {
defer close(results)
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU()*verifyWorkerMultiplier; i++ {
wg.Add(1)
go verifyWorkerRun(ctx, &wg, keys, results, bs, api, shouldDrop, shouldHeal, healTimeout)
}
wg.Wait()
@ -276,6 +370,45 @@ func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blocks
var repoVerifyCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Verify all blocks in repo are not corrupted.",
ShortDescription: `
'ipfs repo verify' checks the integrity of all blocks in the local datastore.
Each block is read and validated against its CID to ensure data integrity.
Without any flags, this is a SAFE, read-only check that only reports corrupt
blocks without modifying the repository. This can be used as a "dry run" to
preview what --drop or --heal would do.
Use --drop to remove corrupt blocks, or --heal to remove and re-fetch from
the network.
Examples:
ipfs repo verify # safe read-only check, reports corrupt blocks
ipfs repo verify --drop # remove corrupt blocks
ipfs repo verify --heal # remove and re-fetch corrupt blocks
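ipfs repo verify --heal --heal-timeout 2m # as --heal, but allow up to 2 minutes per re-fetched block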
Exit Codes:
0: All blocks are valid, OR all corrupt blocks were successfully remediated
(with --drop or --heal)
1: Corrupt blocks detected (without flags), OR remediation failed (block
removal or healing failed with --drop or --heal)
Note: --heal requires the daemon to be running in online mode, with network
connectivity to peers that have the missing blocks. Healing will attempt to
re-fetch each corrupt block from the network after removing it; if a block
cannot be found on the network, it will remain deleted.
WARNING: Both --drop and --heal are DESTRUCTIVE operations that permanently
delete corrupt blocks from your repository. Once deleted, blocks cannot be
recovered unless --heal successfully fetches them from the network. Blocks
that cannot be healed will remain permanently deleted. Always backup your
repository before using these options.
`,
},
Options: []cmds.Option{
cmds.BoolOption("drop", "Remove corrupt blocks from datastore (destructive operation)."),
cmds.BoolOption("heal", "Remove corrupt blocks and re-fetch from network (destructive operation, implies --drop)."),
cmds.StringOption("heal-timeout", "Maximum time to wait for each block heal (e.g., \"30s\"). Only applies with --heal.").WithDefault("30s"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
@ -283,6 +416,38 @@ var repoVerifyCmd = &cmds.Command{
return err
}
drop, _ := req.Options["drop"].(bool)
heal, _ := req.Options["heal"].(bool)
if heal {
drop = true // heal implies drop
}
// Parse and validate heal-timeout
timeoutStr, _ := req.Options["heal-timeout"].(string)
healTimeout, err := time.ParseDuration(timeoutStr)
if err != nil {
return fmt.Errorf("invalid heal-timeout: %w", err)
}
if healTimeout < 0 {
return errors.New("heal-timeout must be >= 0")
}
// Check online mode and API availability for healing operation
var api coreiface.CoreAPI
if heal {
if !nd.IsOnline {
return ErrNotOnline
}
api, err = cmdenv.GetApi(env, req)
if err != nil {
return err
}
if api == nil {
return fmt.Errorf("healing requested but API is not available - make sure daemon is online and connected to other peers")
}
}
bs := &bstore.ValidatingBlockstore{Blockstore: bstore.NewBlockstore(nd.Repo.Datastore())}
keys, err := bs.AllKeysChan(req.Context)
@ -291,17 +456,47 @@ var repoVerifyCmd = &cmds.Command{
return err
}
results := verifyResultChan(req.Context, keys, bs, api, drop, heal, healTimeout)
// Track statistics for each type of outcome
var corrupted, removed, removeFailed, healed, healFailed int
var i int
for result := range results {
// Update counters based on the block's final state
switch result.state {
case verifyStateCorrupt:
// Block is corrupt but no action was taken (--drop not specified)
corrupted++
case verifyStateCorruptRemoved:
// Block was corrupt and successfully removed (--drop specified)
corrupted++
removed++
case verifyStateCorruptRemoveFailed:
// Block was corrupt but couldn't be removed
corrupted++
removeFailed++
case verifyStateCorruptHealed:
// Block was corrupt, removed, and successfully re-fetched (--heal specified)
corrupted++
removed++
healed++
case verifyStateCorruptHealFailed:
// Block was corrupt and removed, but re-fetching failed
corrupted++
removed++
healFailed++
default:
// verifyStateValid blocks are not counted (they're the expected case)
}
// Emit progress message for corrupt blocks
if result.state != verifyStateValid && result.msg != "" {
if err := res.Emit(&VerifyProgress{Msg: result.msg}); err != nil {
return err
}
}
i++
if err := res.Emit(&VerifyProgress{Progress: i}); err != nil {
return err
@ -312,8 +507,42 @@ var repoVerifyCmd = &cmds.Command{
return err
}
if corrupted > 0 {
// Build a summary of what happened with corrupt blocks
summary := fmt.Sprintf("verify complete, %d blocks corrupt", corrupted)
if removed > 0 {
summary += fmt.Sprintf(", %d removed", removed)
}
if removeFailed > 0 {
summary += fmt.Sprintf(", %d failed to remove", removeFailed)
}
if healed > 0 {
summary += fmt.Sprintf(", %d healed", healed)
}
if healFailed > 0 {
summary += fmt.Sprintf(", %d failed to heal", healFailed)
}
// Determine success/failure based on operation mode
shouldFail := false
if !drop {
// Detection-only mode: always fail if corruption found
shouldFail = true
} else if heal {
// Heal mode: fail if any removal or heal failed
shouldFail = (removeFailed > 0 || healFailed > 0)
} else {
// Drop mode: fail if any removal failed
shouldFail = (removeFailed > 0)
}
if shouldFail {
return errors.New(summary)
}
// Success: emit summary as a message instead of error
return res.Emit(&VerifyProgress{Msg: summary})
}
return res.Emit(&VerifyProgress{Msg: "verify complete, all blocks validated."})
@ -322,7 +551,7 @@ var repoVerifyCmd = &cmds.Command{
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, obj *VerifyProgress) error {
if strings.Contains(obj.Msg, "was corrupt") {
fmt.Fprintln(w, obj.Msg)
return nil
}


@ -0,0 +1,371 @@
//go:build go1.25
package commands
// This file contains unit tests for the --heal-timeout flag functionality
// using testing/synctest to avoid waiting for real timeouts.
//
// End-to-end tests for the full 'ipfs repo verify' command (including --drop
// and --heal flags) are located in test/cli/repo_verify_test.go.
import (
"bytes"
"context"
"errors"
"io"
"sync"
"testing"
"testing/synctest"
"time"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
coreiface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/core/coreiface/options"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ipfs/boxo/path"
)
func TestVerifyWorkerHealTimeout(t *testing.T) {
t.Run("heal succeeds before timeout", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const healTimeout = 5 * time.Second
testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
// Setup channels
keys := make(chan cid.Cid, 1)
keys <- testCID
close(keys)
results := make(chan *verifyResult, 1)
// Mock blockstore that returns error (simulating corruption)
mockBS := &mockBlockstore{
getError: errors.New("corrupt block"),
}
// Mock API where Block().Get() completes before timeout
mockAPI := &mockCoreAPI{
blockAPI: &mockBlockAPI{
getDelay: 2 * time.Second, // Less than healTimeout
data: []byte("healed data"),
},
}
var wg sync.WaitGroup
wg.Add(1)
// Run worker
go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout)
// Advance time past the mock delay but before timeout
time.Sleep(3 * time.Second)
synctest.Wait()
wg.Wait()
close(results)
// Verify heal succeeded
result := <-results
require.NotNil(t, result)
assert.Equal(t, verifyStateCorruptHealed, result.state)
assert.Contains(t, result.msg, "healed")
})
})
t.Run("heal fails due to timeout", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const healTimeout = 2 * time.Second
testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
// Setup channels
keys := make(chan cid.Cid, 1)
keys <- testCID
close(keys)
results := make(chan *verifyResult, 1)
// Mock blockstore that returns error (simulating corruption)
mockBS := &mockBlockstore{
getError: errors.New("corrupt block"),
}
// Mock API where Block().Get() takes longer than healTimeout
mockAPI := &mockCoreAPI{
blockAPI: &mockBlockAPI{
getDelay: 5 * time.Second, // More than healTimeout
data: []byte("healed data"),
},
}
var wg sync.WaitGroup
wg.Add(1)
// Run worker
go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout)
// Advance time past timeout
time.Sleep(3 * time.Second)
synctest.Wait()
wg.Wait()
close(results)
// Verify heal failed due to timeout
result := <-results
require.NotNil(t, result)
assert.Equal(t, verifyStateCorruptHealFailed, result.state)
assert.Contains(t, result.msg, "failed to heal")
assert.Contains(t, result.msg, "context deadline exceeded")
})
})
t.Run("heal with zero timeout still attempts heal", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const healTimeout = 0 // Zero timeout means no timeout
testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
// Setup channels
keys := make(chan cid.Cid, 1)
keys <- testCID
close(keys)
results := make(chan *verifyResult, 1)
// Mock blockstore that returns error (simulating corruption)
mockBS := &mockBlockstore{
getError: errors.New("corrupt block"),
}
// Mock API that succeeds quickly
mockAPI := &mockCoreAPI{
blockAPI: &mockBlockAPI{
getDelay: 100 * time.Millisecond,
data: []byte("healed data"),
},
}
var wg sync.WaitGroup
wg.Add(1)
// Run worker
go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout)
// Advance time to let heal complete
time.Sleep(200 * time.Millisecond)
synctest.Wait()
wg.Wait()
close(results)
// Verify heal succeeded even with zero timeout
result := <-results
require.NotNil(t, result)
assert.Equal(t, verifyStateCorruptHealed, result.state)
assert.Contains(t, result.msg, "healed")
})
})
t.Run("multiple blocks with different timeout outcomes", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const healTimeout = 3 * time.Second
testCID1 := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
testCID2 := cid.MustParse("bafybeihvvulpp4evxj7x7armbqcyg6uezzuig6jp3lktpbovlqfkjtgyby")
// Setup channels
keys := make(chan cid.Cid, 2)
keys <- testCID1
keys <- testCID2
close(keys)
results := make(chan *verifyResult, 2)
// Mock blockstore that always returns error (all blocks corrupt)
mockBS := &mockBlockstore{
getError: errors.New("corrupt block"),
}
// Both workers share a single mock API whose delay (1s) is below healTimeout,
// so both corrupt blocks are expected to heal successfully.
mockAPI := &mockCoreAPI{
blockAPI: &mockBlockAPI{
getDelay: 1 * time.Second, // Less than healTimeout - will succeed
data: []byte("healed data"),
},
}
var wg sync.WaitGroup
wg.Add(2) // Two workers
// Run two workers
go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout)
go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout)
// Advance time to let both complete
time.Sleep(2 * time.Second)
synctest.Wait()
wg.Wait()
close(results)
// Collect results
var healedCount int
for result := range results {
if result.state == verifyStateCorruptHealed {
healedCount++
}
}
// Both should heal successfully (both under timeout)
assert.Equal(t, 2, healedCount)
})
})
t.Run("valid block is not healed", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const healTimeout = 5 * time.Second
testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")
// Setup channels
keys := make(chan cid.Cid, 1)
keys <- testCID
close(keys)
results := make(chan *verifyResult, 1)
// Mock blockstore that returns valid block (no error)
mockBS := &mockBlockstore{
block: blocks.NewBlock([]byte("valid data")),
}
// Mock API (won't be called since block is valid)
mockAPI := &mockCoreAPI{
blockAPI: &mockBlockAPI{},
}
var wg sync.WaitGroup
wg.Add(1)
// Run worker with heal enabled
go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, false, true, healTimeout)
synctest.Wait()
wg.Wait()
close(results)
// Verify block is marked valid, not healed
result := <-results
require.NotNil(t, result)
assert.Equal(t, verifyStateValid, result.state)
assert.Empty(t, result.msg)
})
})
}
// mockBlockstore implements a minimal blockstore for testing
type mockBlockstore struct {
getError error
block blocks.Block
}
func (m *mockBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
if m.getError != nil {
return nil, m.getError
}
return m.block, nil
}
func (m *mockBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
return nil
}
func (m *mockBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
return m.block != nil, nil
}
func (m *mockBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
if m.block != nil {
return len(m.block.RawData()), nil
}
return 0, errors.New("block not found")
}
func (m *mockBlockstore) Put(ctx context.Context, b blocks.Block) error {
return nil
}
func (m *mockBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
return nil
}
func (m *mockBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
}
func (m *mockBlockstore) HashOnRead(enabled bool) {
}
// mockBlockAPI implements BlockAPI for testing
type mockBlockAPI struct {
getDelay time.Duration
getError error
data []byte
}
func (m *mockBlockAPI) Get(ctx context.Context, p path.Path) (io.Reader, error) {
if m.getDelay > 0 {
select {
case <-time.After(m.getDelay):
// Delay completed
case <-ctx.Done():
return nil, ctx.Err()
}
}
if m.getError != nil {
return nil, m.getError
}
return bytes.NewReader(m.data), nil
}
func (m *mockBlockAPI) Put(ctx context.Context, r io.Reader, opts ...options.BlockPutOption) (coreiface.BlockStat, error) {
return nil, errors.New("not implemented")
}
func (m *mockBlockAPI) Rm(ctx context.Context, p path.Path, opts ...options.BlockRmOption) error {
return errors.New("not implemented")
}
func (m *mockBlockAPI) Stat(ctx context.Context, p path.Path) (coreiface.BlockStat, error) {
return nil, errors.New("not implemented")
}
// mockCoreAPI implements minimal CoreAPI for testing
type mockCoreAPI struct {
blockAPI *mockBlockAPI
}
func (m *mockCoreAPI) Block() coreiface.BlockAPI {
return m.blockAPI
}
func (m *mockCoreAPI) Unixfs() coreiface.UnixfsAPI { return nil }
func (m *mockCoreAPI) Dag() coreiface.APIDagService { return nil }
func (m *mockCoreAPI) Name() coreiface.NameAPI { return nil }
func (m *mockCoreAPI) Key() coreiface.KeyAPI { return nil }
func (m *mockCoreAPI) Pin() coreiface.PinAPI { return nil }
func (m *mockCoreAPI) Object() coreiface.ObjectAPI { return nil }
func (m *mockCoreAPI) Swarm() coreiface.SwarmAPI { return nil }
func (m *mockCoreAPI) PubSub() coreiface.PubSubAPI { return nil }
func (m *mockCoreAPI) Routing() coreiface.RoutingAPI { return nil }
func (m *mockCoreAPI) ResolvePath(ctx context.Context, p path.Path) (path.ImmutablePath, []string, error) {
return path.ImmutablePath{}, nil, errors.New("not implemented")
}
func (m *mockCoreAPI) ResolveNode(ctx context.Context, p path.Path) (ipld.Node, error) {
return nil, errors.New("not implemented")
}
func (m *mockCoreAPI) WithOptions(...options.ApiOption) (coreiface.CoreAPI, error) {
return nil, errors.New("not implemented")
}
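// Illustrative sketch of the testing/synctest behavior the tests above rely on:
// inside the bubble started by synctest.Test, timers use a fake clock, so the
// 30-second sleep below completes as soon as every goroutine in the bubble is
// durably blocked. The test name is hypothetical and purely demonstrative.
func TestSynctestFakeClockSketch(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		start := time.Now()
		done := make(chan struct{})
		go func() {
			time.Sleep(30 * time.Second) // advances the fake clock, no real waiting
			close(done)
		}()
		<-done
		// Exactly 30 seconds of fake time elapse, near-instantly in real time.
		if got := time.Since(start); got != 30*time.Second {
			t.Fatalf("expected 30s of fake time, got %v", got)
		}
	})
}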


@ -0,0 +1,384 @@
package cli
import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Well-known block file names in flatfs blockstore that should not be corrupted during testing.
// Flatfs stores each block as a separate .data file on disk.
const (
// emptyFileFlatfsFilename is the flatfs filename for an empty UnixFS file block
emptyFileFlatfsFilename = "CIQL7TG2PB52XIZLLHDYIUFMHUQLMMZWBNBZSLDXFCPZ5VDNQQ2WDZQ"
// emptyDirFlatfsFilename is the flatfs filename for an empty UnixFS directory block.
// This block has special handling and may be served from memory even when corrupted on disk.
emptyDirFlatfsFilename = "CIQFTFEEHEDF6KLBT32BFAGLXEZL4UWFNWM4LFTLMXQBCERZ6CMLX3Y"
)
// getEligibleFlatfsBlockFiles returns flatfs block files (*.data) that are safe to corrupt in tests.
// Filters out well-known blocks (empty file/dir) that cause test flakiness.
//
// Note: This helper is specific to the flatfs blockstore implementation where each block
// is stored as a separate file on disk under blocks/*/*.data.
func getEligibleFlatfsBlockFiles(t *testing.T, node *harness.Node) []string {
blockFiles, err := filepath.Glob(filepath.Join(node.Dir, "blocks", "*", "*.data"))
require.NoError(t, err)
require.NotEmpty(t, blockFiles, "no flatfs block files found")
var eligible []string
for _, f := range blockFiles {
name := filepath.Base(f)
if !strings.Contains(name, emptyFileFlatfsFilename) &&
!strings.Contains(name, emptyDirFlatfsFilename) {
eligible = append(eligible, f)
}
}
return eligible
}
// corruptRandomBlock corrupts one eligible block file in the flatfs blockstore.
// Despite the name, the choice is deterministic: it picks the first file returned
// by getEligibleFlatfsBlockFiles. Returns the path to the corrupted file.
func corruptRandomBlock(t *testing.T, node *harness.Node) string {
eligible := getEligibleFlatfsBlockFiles(t, node)
require.NotEmpty(t, eligible, "no eligible blocks to corrupt")
toCorrupt := eligible[0]
err := os.WriteFile(toCorrupt, []byte("corrupted data"), 0644)
require.NoError(t, err)
return toCorrupt
}
// corruptMultipleBlocks corrupts multiple block files in the flatfs blockstore.
// Returns the paths to the corrupted files.
func corruptMultipleBlocks(t *testing.T, node *harness.Node, count int) []string {
eligible := getEligibleFlatfsBlockFiles(t, node)
require.GreaterOrEqual(t, len(eligible), count, "not enough eligible blocks to corrupt")
var corrupted []string
for i := 0; i < count && i < len(eligible); i++ {
err := os.WriteFile(eligible[i], []byte(fmt.Sprintf("corrupted data %d", i)), 0644)
require.NoError(t, err)
corrupted = append(corrupted, eligible[i])
}
return corrupted
}
func TestRepoVerify(t *testing.T) {
t.Run("healthy repo passes", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("add", "-q", "--raw-leaves=false", "-r", node.IPFSBin)
res := node.IPFS("repo", "verify")
assert.Contains(t, res.Stdout.String(), "all blocks validated")
})
t.Run("detects corruption", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFSAddStr("test content")
corruptRandomBlock(t, node)
res := node.RunIPFS("repo", "verify")
assert.Equal(t, 1, res.ExitCode())
assert.Contains(t, res.Stdout.String(), "was corrupt")
assert.Contains(t, res.Stderr.String(), "1 blocks corrupt")
})
t.Run("drop removes corrupt blocks", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
cid := node.IPFSAddStr("test content")
corruptRandomBlock(t, node)
res := node.RunIPFS("repo", "verify", "--drop")
assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks removed successfully")
output := res.Stdout.String()
assert.Contains(t, output, "1 blocks corrupt")
assert.Contains(t, output, "1 removed")
// Verify block is gone
res = node.RunIPFS("block", "stat", cid)
assert.NotEqual(t, 0, res.ExitCode())
})
t.Run("heal requires online mode", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFSAddStr("test content")
corruptRandomBlock(t, node)
res := node.RunIPFS("repo", "verify", "--heal")
assert.NotEqual(t, 0, res.ExitCode())
assert.Contains(t, res.Stderr.String(), "online mode")
})
t.Run("heal repairs from network", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// Add content to node 0
cid := nodes[0].IPFSAddStr("test content for healing")
// Wait for it to appear on node 1
nodes[1].IPFS("block", "get", cid)
// Corrupt on node 1
corruptRandomBlock(t, nodes[1])
// Heal should restore from node 0
res := nodes[1].RunIPFS("repo", "verify", "--heal")
assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks healed successfully")
output := res.Stdout.String()
// Should report corruption and healing with specific counts
assert.Contains(t, output, "1 blocks corrupt")
assert.Contains(t, output, "1 removed")
assert.Contains(t, output, "1 healed")
// Verify block is restored
nodes[1].IPFS("block", "stat", cid)
})
t.Run("healed blocks contain correct data", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// Add specific content to node 0
testContent := "this is the exact content that should be healed correctly"
cid := nodes[0].IPFSAddStr(testContent)
// Fetch to node 1 and verify the content is correct initially
nodes[1].IPFS("block", "get", cid)
res := nodes[1].IPFS("cat", cid)
assert.Equal(t, testContent, res.Stdout.String())
// Corrupt on node 1
corruptRandomBlock(t, nodes[1])
// Heal the corruption
res = nodes[1].RunIPFS("repo", "verify", "--heal")
assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks healed successfully")
output := res.Stdout.String()
assert.Contains(t, output, "1 blocks corrupt")
assert.Contains(t, output, "1 removed")
assert.Contains(t, output, "1 healed")
// Verify the healed content matches the original exactly
res = nodes[1].IPFS("cat", cid)
assert.Equal(t, testContent, res.Stdout.String(), "healed content should match original")
// Also verify via block get that the raw block data is correct
block0 := nodes[0].IPFS("block", "get", cid)
block1 := nodes[1].IPFS("block", "get", cid)
assert.Equal(t, block0.Stdout.String(), block1.Stdout.String(), "raw block data should match")
})
t.Run("multiple corrupt blocks", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Create 20 blocks
for i := 0; i < 20; i++ {
node.IPFSAddStr(strings.Repeat("test content ", i+1))
}
// Corrupt 5 blocks
corruptMultipleBlocks(t, node, 5)
// Verify detects all corruptions
res := node.RunIPFS("repo", "verify")
assert.Equal(t, 1, res.ExitCode())
// Error summary is in stderr
assert.Contains(t, res.Stderr.String(), "5 blocks corrupt")
// Test with --drop
res = node.RunIPFS("repo", "verify", "--drop")
assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks removed successfully")
assert.Contains(t, res.Stdout.String(), "5 blocks corrupt")
assert.Contains(t, res.Stdout.String(), "5 removed")
})
t.Run("empty repository", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Verify empty repo passes
res := node.IPFS("repo", "verify")
assert.Equal(t, 0, res.ExitCode())
assert.Contains(t, res.Stdout.String(), "all blocks validated")
// Should work with --drop and --heal too
res = node.IPFS("repo", "verify", "--drop")
assert.Equal(t, 0, res.ExitCode())
assert.Contains(t, res.Stdout.String(), "all blocks validated")
})
t.Run("partial heal success", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
// Start both nodes and connect them
nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// Add 5 blocks to node 0, pin them to keep available
cid1 := nodes[0].IPFSAddStr("content available for healing 1")
cid2 := nodes[0].IPFSAddStr("content available for healing 2")
cid3 := nodes[0].IPFSAddStr("content available for healing 3")
cid4 := nodes[0].IPFSAddStr("content available for healing 4")
cid5 := nodes[0].IPFSAddStr("content available for healing 5")
// Pin these on node 0 to ensure they stay available
nodes[0].IPFS("pin", "add", cid1)
nodes[0].IPFS("pin", "add", cid2)
nodes[0].IPFS("pin", "add", cid3)
nodes[0].IPFS("pin", "add", cid4)
nodes[0].IPFS("pin", "add", cid5)
// Node 1 fetches these blocks
nodes[1].IPFS("block", "get", cid1)
nodes[1].IPFS("block", "get", cid2)
nodes[1].IPFS("block", "get", cid3)
nodes[1].IPFS("block", "get", cid4)
nodes[1].IPFS("block", "get", cid5)
// Now remove some blocks from node 0 to simulate partial availability
nodes[0].IPFS("pin", "rm", cid3)
nodes[0].IPFS("pin", "rm", cid4)
nodes[0].IPFS("pin", "rm", cid5)
nodes[0].IPFS("repo", "gc")
// Verify node 1 is still connected
peers := nodes[1].IPFS("swarm", "peers")
require.Contains(t, peers.Stdout.String(), nodes[0].PeerID().String())
// Corrupt 5 blocks on node 1
corruptMultipleBlocks(t, nodes[1], 5)
// Heal should partially succeed (only cid1 and cid2 available from node 0)
res := nodes[1].RunIPFS("repo", "verify", "--heal")
assert.Equal(t, 1, res.ExitCode())
// Should show mixed results with specific counts in stderr
errOutput := res.Stderr.String()
assert.Contains(t, errOutput, "5 blocks corrupt")
assert.Contains(t, errOutput, "5 removed")
// Only cid1 and cid2 are available for healing, cid3-5 were GC'd
assert.Contains(t, errOutput, "2 healed")
assert.Contains(t, errOutput, "3 failed to heal")
})
t.Run("heal with block not available on network", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
// Start both nodes and connect
nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// Add unique content only to node 1
nodes[1].IPFSAddStr("unique content that exists nowhere else")
// Ensure nodes are connected
peers := nodes[1].IPFS("swarm", "peers")
require.Contains(t, peers.Stdout.String(), nodes[0].PeerID().String())
// Corrupt the block on node 1
corruptRandomBlock(t, nodes[1])
// Heal should fail - node 0 doesn't have this content
res := nodes[1].RunIPFS("repo", "verify", "--heal")
assert.Equal(t, 1, res.ExitCode())
// Should report heal failure with specific counts in stderr
errOutput := res.Stderr.String()
assert.Contains(t, errOutput, "1 blocks corrupt")
assert.Contains(t, errOutput, "1 removed")
assert.Contains(t, errOutput, "1 failed to heal")
})
t.Run("large repository scale test", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Create 1000 small blocks
for i := 0; i < 1000; i++ {
node.IPFSAddStr(fmt.Sprintf("content-%d", i))
}
// Corrupt 10 blocks
corruptMultipleBlocks(t, node, 10)
// Verify handles large repos efficiently
res := node.RunIPFS("repo", "verify")
assert.Equal(t, 1, res.ExitCode())
// Should report exactly 10 corrupt blocks in stderr
assert.Contains(t, res.Stderr.String(), "10 blocks corrupt")
// Test --drop at scale
res = node.RunIPFS("repo", "verify", "--drop")
assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks removed successfully")
output := res.Stdout.String()
assert.Contains(t, output, "10 blocks corrupt")
assert.Contains(t, output, "10 removed")
})
t.Run("drop with partial removal failures", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Create several blocks
for i := 0; i < 5; i++ {
node.IPFSAddStr(fmt.Sprintf("content for removal test %d", i))
}
// Corrupt 3 blocks
corruptedFiles := corruptMultipleBlocks(t, node, 3)
require.Len(t, corruptedFiles, 3)
// Make one of the corrupted files read-only to simulate removal failure
err := os.Chmod(corruptedFiles[0], 0400) // read-only
require.NoError(t, err)
defer func() { _ = os.Chmod(corruptedFiles[0], 0644) }() // cleanup
// Also make the directory read-only to prevent deletion
blockDir := filepath.Dir(corruptedFiles[0])
originalPerm, err := os.Stat(blockDir)
require.NoError(t, err)
err = os.Chmod(blockDir, 0500) // read+execute only, no write
require.NoError(t, err)
defer func() { _ = os.Chmod(blockDir, originalPerm.Mode()) }() // cleanup
// Try to drop - should fail because at least one block can't be removed
res := node.RunIPFS("repo", "verify", "--drop")
assert.Equal(t, 1, res.ExitCode(), "should exit 1 when some blocks fail to remove")
// Restore permissions for verification
_ = os.Chmod(blockDir, originalPerm.Mode())
_ = os.Chmod(corruptedFiles[0], 0644)
// Should report both successes and failures with specific counts
errOutput := res.Stderr.String()
assert.Contains(t, errOutput, "3 blocks corrupt")
assert.Contains(t, errOutput, "2 removed")
assert.Contains(t, errOutput, "1 failed to remove")
})
}


@ -3,6 +3,9 @@
# Copyright (c) 2016 Jeromy Johnson
# MIT Licensed; see the LICENSE file in this repository.
#
# NOTE: This is a legacy sharness test kept for compatibility.
# New tests for 'ipfs repo verify' should be added to test/cli/repo_verify_test.go
#
test_description="Test ipfs repo fsck"