diff --git a/core/commands/repo.go b/core/commands/repo.go index 622e92d7e..14956ec7c 100644 --- a/core/commands/repo.go +++ b/core/commands/repo.go @@ -5,20 +5,22 @@ import ( "errors" "fmt" "io" - "os" "runtime" "strings" "sync" "text/tabwriter" + "time" oldcmds "github.com/ipfs/kubo/commands" cmdenv "github.com/ipfs/kubo/core/commands/cmdenv" + coreiface "github.com/ipfs/kubo/core/coreiface" corerepo "github.com/ipfs/kubo/core/corerepo" fsrepo "github.com/ipfs/kubo/repo/fsrepo" "github.com/ipfs/kubo/repo/fsrepo/migrations" humanize "github.com/dustin/go-humanize" bstore "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/path" cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" ) @@ -226,45 +228,137 @@ Version string The repo version. }, } +// VerifyProgress reports verification progress to the user. +// It contains either a message about a corrupt block or a progress counter. type VerifyProgress struct { - Msg string - Progress int + Msg string // Message about a corrupt/healed block (empty for valid blocks) + Progress int // Number of blocks processed so far } -func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- string, bs bstore.Blockstore) { +// verifyState represents the state of a block after verification. +// States track both the verification result and any remediation actions taken. +type verifyState int + +const ( + verifyStateValid verifyState = iota // Block is valid and uncorrupted + verifyStateCorrupt // Block is corrupt, no action taken + verifyStateCorruptRemoved // Block was corrupt and successfully removed + verifyStateCorruptRemoveFailed // Block was corrupt but removal failed + verifyStateCorruptHealed // Block was corrupt, removed, and successfully re-fetched + verifyStateCorruptHealFailed // Block was corrupt and removed, but re-fetching failed +) + +const ( + // verifyWorkerMultiplier determines worker pool size relative to CPU count. + // Since block verification is I/O-bound (disk reads + potential network fetches), + // we use more workers than CPU cores to maximize throughput. + verifyWorkerMultiplier = 2 +) + +// verifyResult contains the outcome of verifying a single block. +// It includes the block's CID, its verification state, and an optional +// human-readable message describing what happened. +type verifyResult struct { + cid cid.Cid // CID of the block that was verified + state verifyState // Final state after verification and any remediation + msg string // Human-readable message (empty for valid blocks) +} + +// verifyWorkerRun processes CIDs from the keys channel, verifying their integrity. +// If shouldDrop is true, corrupt blocks are removed from the blockstore. +// If shouldHeal is true (implies shouldDrop), removed blocks are re-fetched from the network. +// The api parameter must be non-nil when shouldHeal is true. +// healTimeout specifies the maximum time to wait for each block heal (0 = no timeout). 
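+//
+// Sketch of the per-block decision flow implemented below (illustrative
+// summary only; the code is authoritative):
+//
+//	Get fails (block corrupt)
+//	  no --drop                  -> verifyStateCorrupt
+//	  --drop, DeleteBlock fails  -> verifyStateCorruptRemoveFailed
+//	  --drop only, block deleted -> verifyStateCorruptRemoved
+//	  --heal, re-fetch succeeds  -> verifyStateCorruptHealed
+//	  --heal, re-fetch fails     -> verifyStateCorruptHealFailed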
+func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- *verifyResult, bs bstore.Blockstore, api coreiface.CoreAPI, shouldDrop, shouldHeal bool, healTimeout time.Duration) { defer wg.Done() + sendResult := func(r *verifyResult) bool { + select { + case results <- r: + return true + case <-ctx.Done(): + return false + } + } + for k := range keys { _, err := bs.Get(ctx, k) if err != nil { - select { - case results <- fmt.Sprintf("block %s was corrupt (%s)", k, err): - case <-ctx.Done(): - return + // Block is corrupt + result := &verifyResult{cid: k, state: verifyStateCorrupt} + + if !shouldDrop { + result.msg = fmt.Sprintf("block %s was corrupt (%s)", k, err) + if !sendResult(result) { + return + } + continue } + // Try to delete + if delErr := bs.DeleteBlock(ctx, k); delErr != nil { + result.state = verifyStateCorruptRemoveFailed + result.msg = fmt.Sprintf("block %s was corrupt (%s), failed to remove (%s)", k, err, delErr) + if !sendResult(result) { + return + } + continue + } + + if !shouldHeal { + result.state = verifyStateCorruptRemoved + result.msg = fmt.Sprintf("block %s was corrupt (%s), removed", k, err) + if !sendResult(result) { + return + } + continue + } + + // Try to heal by re-fetching from network (api is guaranteed non-nil here) + healCtx := ctx + var healCancel context.CancelFunc + if healTimeout > 0 { + healCtx, healCancel = context.WithTimeout(ctx, healTimeout) + } + + if _, healErr := api.Block().Get(healCtx, path.FromCid(k)); healErr != nil { + result.state = verifyStateCorruptHealFailed + result.msg = fmt.Sprintf("block %s was corrupt (%s), removed, failed to heal (%s)", k, err, healErr) + } else { + result.state = verifyStateCorruptHealed + result.msg = fmt.Sprintf("block %s was corrupt (%s), removed, healed", k, err) + } + + if healCancel != nil { + healCancel() + } + + if !sendResult(result) { + return + } continue } - select { - case results <- "": - case <-ctx.Done(): + // Block is valid + if !sendResult(&verifyResult{cid: k, state: verifyStateValid}) { return } } } -func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore) <-chan string { - results := make(chan string) +// verifyResultChan creates a channel of verification results by spawning multiple worker goroutines +// to process blocks in parallel. It returns immediately with a channel that will receive results. +func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore, api coreiface.CoreAPI, shouldDrop, shouldHeal bool, healTimeout time.Duration) <-chan *verifyResult { + results := make(chan *verifyResult) go func() { defer close(results) var wg sync.WaitGroup - for i := 0; i < runtime.NumCPU()*2; i++ { + for i := 0; i < runtime.NumCPU()*verifyWorkerMultiplier; i++ { wg.Add(1) - go verifyWorkerRun(ctx, &wg, keys, results, bs) + go verifyWorkerRun(ctx, &wg, keys, results, bs, api, shouldDrop, shouldHeal, healTimeout) } wg.Wait() @@ -276,6 +370,45 @@ func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blocks var repoVerifyCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Verify all blocks in repo are not corrupted.", + ShortDescription: ` +'ipfs repo verify' checks integrity of all blocks in the local datastore. +Each block is read and validated against its CID to ensure data integrity. + +Without any flags, this is a SAFE, read-only check that only reports corrupt +blocks without modifying the repository. This can be used as a "dry run" to +preview what --drop or --heal would do. 
+ +Use --drop to remove corrupt blocks, or --heal to remove and re-fetch from +the network. + +Examples: + ipfs repo verify # safe read-only check, reports corrupt blocks + ipfs repo verify --drop # remove corrupt blocks + ipfs repo verify --heal # remove and re-fetch corrupt blocks + +Exit Codes: + 0: All blocks are valid, OR all corrupt blocks were successfully remediated + (with --drop or --heal) + 1: Corrupt blocks detected (without flags), OR remediation failed (block + removal or healing failed with --drop or --heal) + +Note: --heal requires the daemon to be running in online mode with network +connectivity to nodes that have the missing blocks. Make sure the daemon is +online and connected to other peers. Healing will attempt to re-fetch each +corrupt block from the network after removing it. If a block cannot be found +on the network, it will remain deleted. + +WARNING: Both --drop and --heal are DESTRUCTIVE operations that permanently +delete corrupt blocks from your repository. Once deleted, blocks cannot be +recovered unless --heal successfully fetches them from the network. Blocks +that cannot be healed will remain permanently deleted. Always backup your +repository before using these options. +`, + }, + Options: []cmds.Option{ + cmds.BoolOption("drop", "Remove corrupt blocks from datastore (destructive operation)."), + cmds.BoolOption("heal", "Remove corrupt blocks and re-fetch from network (destructive operation, implies --drop)."), + cmds.StringOption("heal-timeout", "Maximum time to wait for each block heal (e.g., \"30s\"). Only applies with --heal.").WithDefault("30s"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { nd, err := cmdenv.GetNode(env) @@ -283,6 +416,38 @@ var repoVerifyCmd = &cmds.Command{ return err } + drop, _ := req.Options["drop"].(bool) + heal, _ := req.Options["heal"].(bool) + + if heal { + drop = true // heal implies drop + } + + // Parse and validate heal-timeout + timeoutStr, _ := req.Options["heal-timeout"].(string) + healTimeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return fmt.Errorf("invalid heal-timeout: %w", err) + } + if healTimeout < 0 { + return errors.New("heal-timeout must be >= 0") + } + + // Check online mode and API availability for healing operation + var api coreiface.CoreAPI + if heal { + if !nd.IsOnline { + return ErrNotOnline + } + api, err = cmdenv.GetApi(env, req) + if err != nil { + return err + } + if api == nil { + return fmt.Errorf("healing requested but API is not available - make sure daemon is online and connected to other peers") + } + } + bs := &bstore.ValidatingBlockstore{Blockstore: bstore.NewBlockstore(nd.Repo.Datastore())} keys, err := bs.AllKeysChan(req.Context) @@ -291,17 +456,47 @@ var repoVerifyCmd = &cmds.Command{ return err } - results := verifyResultChan(req.Context, keys, bs) + results := verifyResultChan(req.Context, keys, bs, api, drop, heal, healTimeout) - var fails int + // Track statistics for each type of outcome + var corrupted, removed, removeFailed, healed, healFailed int var i int - for msg := range results { - if msg != "" { - if err := res.Emit(&VerifyProgress{Msg: msg}); err != nil { + + for result := range results { + // Update counters based on the block's final state + switch result.state { + case verifyStateCorrupt: + // Block is corrupt but no action was taken (--drop not specified) + corrupted++ + case verifyStateCorruptRemoved: + // Block was corrupt and successfully removed (--drop specified) + corrupted++ + removed++ + case 
verifyStateCorruptRemoveFailed: + // Block was corrupt but couldn't be removed + corrupted++ + removeFailed++ + case verifyStateCorruptHealed: + // Block was corrupt, removed, and successfully re-fetched (--heal specified) + corrupted++ + removed++ + healed++ + case verifyStateCorruptHealFailed: + // Block was corrupt and removed, but re-fetching failed + corrupted++ + removed++ + healFailed++ + default: + // verifyStateValid blocks are not counted (they're the expected case) + } + + // Emit progress message for corrupt blocks + if result.state != verifyStateValid && result.msg != "" { + if err := res.Emit(&VerifyProgress{Msg: result.msg}); err != nil { return err } - fails++ } + i++ if err := res.Emit(&VerifyProgress{Progress: i}); err != nil { return err @@ -312,8 +507,42 @@ var repoVerifyCmd = &cmds.Command{ return err } - if fails != 0 { - return errors.New("verify complete, some blocks were corrupt") + if corrupted > 0 { + // Build a summary of what happened with corrupt blocks + summary := fmt.Sprintf("verify complete, %d blocks corrupt", corrupted) + if removed > 0 { + summary += fmt.Sprintf(", %d removed", removed) + } + if removeFailed > 0 { + summary += fmt.Sprintf(", %d failed to remove", removeFailed) + } + if healed > 0 { + summary += fmt.Sprintf(", %d healed", healed) + } + if healFailed > 0 { + summary += fmt.Sprintf(", %d failed to heal", healFailed) + } + + // Determine success/failure based on operation mode + shouldFail := false + + if !drop { + // Detection-only mode: always fail if corruption found + shouldFail = true + } else if heal { + // Heal mode: fail if any removal or heal failed + shouldFail = (removeFailed > 0 || healFailed > 0) + } else { + // Drop mode: fail if any removal failed + shouldFail = (removeFailed > 0) + } + + if shouldFail { + return errors.New(summary) + } + + // Success: emit summary as a message instead of error + return res.Emit(&VerifyProgress{Msg: summary}) } return res.Emit(&VerifyProgress{Msg: "verify complete, all blocks validated."}) @@ -322,7 +551,7 @@ var repoVerifyCmd = &cmds.Command{ Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, obj *VerifyProgress) error { if strings.Contains(obj.Msg, "was corrupt") { - fmt.Fprintln(os.Stdout, obj.Msg) + fmt.Fprintln(w, obj.Msg) return nil } diff --git a/core/commands/repo_verify_test.go b/core/commands/repo_verify_test.go new file mode 100644 index 000000000..4b6b65a07 --- /dev/null +++ b/core/commands/repo_verify_test.go @@ -0,0 +1,371 @@ +//go:build go1.25 + +package commands + +// This file contains unit tests for the --heal-timeout flag functionality +// using testing/synctest to avoid waiting for real timeouts. +// +// End-to-end tests for the full 'ipfs repo verify' command (including --drop +// and --heal flags) are located in test/cli/repo_verify_test.go. 
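+//
+// A rough illustration of the synctest pattern these tests depend on (not a
+// test itself; assumes Go 1.25's testing/synctest semantics): inside
+// synctest.Test the clock is virtual, so sleeps and timeouts resolve as soon
+// as every goroutine in the bubble is durably blocked, without real waiting:
+//
+//	synctest.Test(t, func(t *testing.T) {
+//		start := time.Now()
+//		time.Sleep(5 * time.Second) // advances the fake clock, returns without real delay
+//		synctest.Wait()             // let any other goroutines in the bubble settle
+//		_ = time.Since(start)       // reports 5s of virtual time
+//	})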
+ +import ( + "bytes" + "context" + "errors" + "io" + "sync" + "testing" + "testing/synctest" + "time" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + coreiface "github.com/ipfs/kubo/core/coreiface" + "github.com/ipfs/kubo/core/coreiface/options" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ipfs/boxo/path" +) + +func TestVerifyWorkerHealTimeout(t *testing.T) { + t.Run("heal succeeds before timeout", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + const healTimeout = 5 * time.Second + testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") + + // Setup channels + keys := make(chan cid.Cid, 1) + keys <- testCID + close(keys) + results := make(chan *verifyResult, 1) + + // Mock blockstore that returns error (simulating corruption) + mockBS := &mockBlockstore{ + getError: errors.New("corrupt block"), + } + + // Mock API where Block().Get() completes before timeout + mockAPI := &mockCoreAPI{ + blockAPI: &mockBlockAPI{ + getDelay: 2 * time.Second, // Less than healTimeout + data: []byte("healed data"), + }, + } + + var wg sync.WaitGroup + wg.Add(1) + + // Run worker + go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout) + + // Advance time past the mock delay but before timeout + time.Sleep(3 * time.Second) + synctest.Wait() + + wg.Wait() + close(results) + + // Verify heal succeeded + result := <-results + require.NotNil(t, result) + assert.Equal(t, verifyStateCorruptHealed, result.state) + assert.Contains(t, result.msg, "healed") + }) + }) + + t.Run("heal fails due to timeout", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + const healTimeout = 2 * time.Second + testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") + + // Setup channels + keys := make(chan cid.Cid, 1) + keys <- testCID + close(keys) + results := make(chan *verifyResult, 1) + + // Mock blockstore that returns error (simulating corruption) + mockBS := &mockBlockstore{ + getError: errors.New("corrupt block"), + } + + // Mock API where Block().Get() takes longer than healTimeout + mockAPI := &mockCoreAPI{ + blockAPI: &mockBlockAPI{ + getDelay: 5 * time.Second, // More than healTimeout + data: []byte("healed data"), + }, + } + + var wg sync.WaitGroup + wg.Add(1) + + // Run worker + go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout) + + // Advance time past timeout + time.Sleep(3 * time.Second) + synctest.Wait() + + wg.Wait() + close(results) + + // Verify heal failed due to timeout + result := <-results + require.NotNil(t, result) + assert.Equal(t, verifyStateCorruptHealFailed, result.state) + assert.Contains(t, result.msg, "failed to heal") + assert.Contains(t, result.msg, "context deadline exceeded") + }) + }) + + t.Run("heal with zero timeout still attempts heal", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + const healTimeout = 0 // Zero timeout means no timeout + testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") + + // Setup channels + keys := make(chan cid.Cid, 1) + keys <- testCID + close(keys) + results := make(chan *verifyResult, 1) + + // Mock blockstore that returns error (simulating corruption) + mockBS := &mockBlockstore{ + getError: errors.New("corrupt block"), + } + + // Mock API that succeeds quickly + mockAPI := &mockCoreAPI{ + blockAPI: &mockBlockAPI{ + getDelay: 100 * 
time.Millisecond, + data: []byte("healed data"), + }, + } + + var wg sync.WaitGroup + wg.Add(1) + + // Run worker + go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout) + + // Advance time to let heal complete + time.Sleep(200 * time.Millisecond) + synctest.Wait() + + wg.Wait() + close(results) + + // Verify heal succeeded even with zero timeout + result := <-results + require.NotNil(t, result) + assert.Equal(t, verifyStateCorruptHealed, result.state) + assert.Contains(t, result.msg, "healed") + }) + }) + + t.Run("multiple blocks with different timeout outcomes", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + const healTimeout = 3 * time.Second + testCID1 := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") + testCID2 := cid.MustParse("bafybeihvvulpp4evxj7x7armbqcyg6uezzuig6jp3lktpbovlqfkjtgyby") + + // Setup channels + keys := make(chan cid.Cid, 2) + keys <- testCID1 + keys <- testCID2 + close(keys) + results := make(chan *verifyResult, 2) + + // Mock blockstore that always returns error (all blocks corrupt) + mockBS := &mockBlockstore{ + getError: errors.New("corrupt block"), + } + + // Create two mock block APIs with different delays + // We'll need to alternate which one gets used + // For simplicity, use one that succeeds fast + mockAPI := &mockCoreAPI{ + blockAPI: &mockBlockAPI{ + getDelay: 1 * time.Second, // Less than healTimeout - will succeed + data: []byte("healed data"), + }, + } + + var wg sync.WaitGroup + wg.Add(2) // Two workers + + // Run two workers + go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout) + go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, true, true, healTimeout) + + // Advance time to let both complete + time.Sleep(2 * time.Second) + synctest.Wait() + + wg.Wait() + close(results) + + // Collect results + var healedCount int + for result := range results { + if result.state == verifyStateCorruptHealed { + healedCount++ + } + } + + // Both should heal successfully (both under timeout) + assert.Equal(t, 2, healedCount) + }) + }) + + t.Run("valid block is not healed", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + const healTimeout = 5 * time.Second + testCID := cid.MustParse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") + + // Setup channels + keys := make(chan cid.Cid, 1) + keys <- testCID + close(keys) + results := make(chan *verifyResult, 1) + + // Mock blockstore that returns valid block (no error) + mockBS := &mockBlockstore{ + block: blocks.NewBlock([]byte("valid data")), + } + + // Mock API (won't be called since block is valid) + mockAPI := &mockCoreAPI{ + blockAPI: &mockBlockAPI{}, + } + + var wg sync.WaitGroup + wg.Add(1) + + // Run worker with heal enabled + go verifyWorkerRun(t.Context(), &wg, keys, results, mockBS, mockAPI, false, true, healTimeout) + + synctest.Wait() + + wg.Wait() + close(results) + + // Verify block is marked valid, not healed + result := <-results + require.NotNil(t, result) + assert.Equal(t, verifyStateValid, result.state) + assert.Empty(t, result.msg) + }) + }) +} + +// mockBlockstore implements a minimal blockstore for testing +type mockBlockstore struct { + getError error + block blocks.Block +} + +func (m *mockBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + if m.getError != nil { + return nil, m.getError + } + return m.block, nil +} + +func (m *mockBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { + return 
nil +} + +func (m *mockBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + return m.block != nil, nil +} + +func (m *mockBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + if m.block != nil { + return len(m.block.RawData()), nil + } + return 0, errors.New("block not found") +} + +func (m *mockBlockstore) Put(ctx context.Context, b blocks.Block) error { + return nil +} + +func (m *mockBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error { + return nil +} + +func (m *mockBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, errors.New("not implemented") +} + +func (m *mockBlockstore) HashOnRead(enabled bool) { +} + +// mockBlockAPI implements BlockAPI for testing +type mockBlockAPI struct { + getDelay time.Duration + getError error + data []byte +} + +func (m *mockBlockAPI) Get(ctx context.Context, p path.Path) (io.Reader, error) { + if m.getDelay > 0 { + select { + case <-time.After(m.getDelay): + // Delay completed + case <-ctx.Done(): + return nil, ctx.Err() + } + } + if m.getError != nil { + return nil, m.getError + } + return bytes.NewReader(m.data), nil +} + +func (m *mockBlockAPI) Put(ctx context.Context, r io.Reader, opts ...options.BlockPutOption) (coreiface.BlockStat, error) { + return nil, errors.New("not implemented") +} + +func (m *mockBlockAPI) Rm(ctx context.Context, p path.Path, opts ...options.BlockRmOption) error { + return errors.New("not implemented") +} + +func (m *mockBlockAPI) Stat(ctx context.Context, p path.Path) (coreiface.BlockStat, error) { + return nil, errors.New("not implemented") +} + +// mockCoreAPI implements minimal CoreAPI for testing +type mockCoreAPI struct { + blockAPI *mockBlockAPI +} + +func (m *mockCoreAPI) Block() coreiface.BlockAPI { + return m.blockAPI +} + +func (m *mockCoreAPI) Unixfs() coreiface.UnixfsAPI { return nil } +func (m *mockCoreAPI) Dag() coreiface.APIDagService { return nil } +func (m *mockCoreAPI) Name() coreiface.NameAPI { return nil } +func (m *mockCoreAPI) Key() coreiface.KeyAPI { return nil } +func (m *mockCoreAPI) Pin() coreiface.PinAPI { return nil } +func (m *mockCoreAPI) Object() coreiface.ObjectAPI { return nil } +func (m *mockCoreAPI) Swarm() coreiface.SwarmAPI { return nil } +func (m *mockCoreAPI) PubSub() coreiface.PubSubAPI { return nil } +func (m *mockCoreAPI) Routing() coreiface.RoutingAPI { return nil } + +func (m *mockCoreAPI) ResolvePath(ctx context.Context, p path.Path) (path.ImmutablePath, []string, error) { + return path.ImmutablePath{}, nil, errors.New("not implemented") +} + +func (m *mockCoreAPI) ResolveNode(ctx context.Context, p path.Path) (ipld.Node, error) { + return nil, errors.New("not implemented") +} + +func (m *mockCoreAPI) WithOptions(...options.ApiOption) (coreiface.CoreAPI, error) { + return nil, errors.New("not implemented") +} diff --git a/test/cli/repo_verify_test.go b/test/cli/repo_verify_test.go new file mode 100644 index 000000000..e75eec963 --- /dev/null +++ b/test/cli/repo_verify_test.go @@ -0,0 +1,384 @@ +package cli + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/ipfs/kubo/test/cli/harness" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Well-known block file names in flatfs blockstore that should not be corrupted during testing. +// Flatfs stores each block as a separate .data file on disk. 
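+//
+// For illustration, flatfs lays blocks out roughly as follows (the exact shard
+// directory depends on the configured sharding function):
+//
+//	$IPFS_PATH/blocks/<shard>/<base32-multihash>.data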
+const ( + // emptyFileFlatfsFilename is the flatfs filename for an empty UnixFS file block + emptyFileFlatfsFilename = "CIQL7TG2PB52XIZLLHDYIUFMHUQLMMZWBNBZSLDXFCPZ5VDNQQ2WDZQ" + // emptyDirFlatfsFilename is the flatfs filename for an empty UnixFS directory block. + // This block has special handling and may be served from memory even when corrupted on disk. + emptyDirFlatfsFilename = "CIQFTFEEHEDF6KLBT32BFAGLXEZL4UWFNWM4LFTLMXQBCERZ6CMLX3Y" +) + +// getEligibleFlatfsBlockFiles returns flatfs block files (*.data) that are safe to corrupt in tests. +// Filters out well-known blocks (empty file/dir) that cause test flakiness. +// +// Note: This helper is specific to the flatfs blockstore implementation where each block +// is stored as a separate file on disk under blocks/*/*.data. +func getEligibleFlatfsBlockFiles(t *testing.T, node *harness.Node) []string { + blockFiles, err := filepath.Glob(filepath.Join(node.Dir, "blocks", "*", "*.data")) + require.NoError(t, err) + require.NotEmpty(t, blockFiles, "no flatfs block files found") + + var eligible []string + for _, f := range blockFiles { + name := filepath.Base(f) + if !strings.Contains(name, emptyFileFlatfsFilename) && + !strings.Contains(name, emptyDirFlatfsFilename) { + eligible = append(eligible, f) + } + } + return eligible +} + +// corruptRandomBlock corrupts a random block file in the flatfs blockstore. +// Returns the path to the corrupted file. +func corruptRandomBlock(t *testing.T, node *harness.Node) string { + eligible := getEligibleFlatfsBlockFiles(t, node) + require.NotEmpty(t, eligible, "no eligible blocks to corrupt") + + toCorrupt := eligible[0] + err := os.WriteFile(toCorrupt, []byte("corrupted data"), 0644) + require.NoError(t, err) + + return toCorrupt +} + +// corruptMultipleBlocks corrupts multiple block files in the flatfs blockstore. +// Returns the paths to the corrupted files. 
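+//
+// Typical usage (mirrors the calls in the tests below):
+//
+//	corrupted := corruptMultipleBlocks(t, node, 3)
+//	require.Len(t, corrupted, 3)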
+func corruptMultipleBlocks(t *testing.T, node *harness.Node, count int) []string { + eligible := getEligibleFlatfsBlockFiles(t, node) + require.GreaterOrEqual(t, len(eligible), count, "not enough eligible blocks to corrupt") + + var corrupted []string + for i := 0; i < count && i < len(eligible); i++ { + err := os.WriteFile(eligible[i], []byte(fmt.Sprintf("corrupted data %d", i)), 0644) + require.NoError(t, err) + corrupted = append(corrupted, eligible[i]) + } + + return corrupted +} + +func TestRepoVerify(t *testing.T) { + t.Run("healthy repo passes", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("add", "-q", "--raw-leaves=false", "-r", node.IPFSBin) + + res := node.IPFS("repo", "verify") + assert.Contains(t, res.Stdout.String(), "all blocks validated") + }) + + t.Run("detects corruption", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFSAddStr("test content") + + corruptRandomBlock(t, node) + + res := node.RunIPFS("repo", "verify") + assert.Equal(t, 1, res.ExitCode()) + assert.Contains(t, res.Stdout.String(), "was corrupt") + assert.Contains(t, res.Stderr.String(), "1 blocks corrupt") + }) + + t.Run("drop removes corrupt blocks", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + cid := node.IPFSAddStr("test content") + + corruptRandomBlock(t, node) + + res := node.RunIPFS("repo", "verify", "--drop") + assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks removed successfully") + output := res.Stdout.String() + assert.Contains(t, output, "1 blocks corrupt") + assert.Contains(t, output, "1 removed") + + // Verify block is gone + res = node.RunIPFS("block", "stat", cid) + assert.NotEqual(t, 0, res.ExitCode()) + }) + + t.Run("heal requires online mode", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFSAddStr("test content") + + corruptRandomBlock(t, node) + + res := node.RunIPFS("repo", "verify", "--heal") + assert.NotEqual(t, 0, res.ExitCode()) + assert.Contains(t, res.Stderr.String(), "online mode") + }) + + t.Run("heal repairs from network", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + + // Add content to node 0 + cid := nodes[0].IPFSAddStr("test content for healing") + + // Wait for it to appear on node 1 + nodes[1].IPFS("block", "get", cid) + + // Corrupt on node 1 + corruptRandomBlock(t, nodes[1]) + + // Heal should restore from node 0 + res := nodes[1].RunIPFS("repo", "verify", "--heal") + assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks healed successfully") + output := res.Stdout.String() + + // Should report corruption and healing with specific counts + assert.Contains(t, output, "1 blocks corrupt") + assert.Contains(t, output, "1 removed") + assert.Contains(t, output, "1 healed") + + // Verify block is restored + nodes[1].IPFS("block", "stat", cid) + }) + + t.Run("healed blocks contain correct data", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + + // Add specific content to node 0 + testContent := "this is the exact content that should be healed correctly" + cid := nodes[0].IPFSAddStr(testContent) + + // Fetch to node 1 and verify the content is correct initially + nodes[1].IPFS("block", "get", cid) + res := nodes[1].IPFS("cat", cid) + assert.Equal(t, testContent, res.Stdout.String()) 
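+		// Sanity check passed: node 1 currently holds an intact copy on disk;
+		// the corruption below gives --heal something to repair.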
+ + // Corrupt on node 1 + corruptRandomBlock(t, nodes[1]) + + // Heal the corruption + res = nodes[1].RunIPFS("repo", "verify", "--heal") + assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks healed successfully") + output := res.Stdout.String() + assert.Contains(t, output, "1 blocks corrupt") + assert.Contains(t, output, "1 removed") + assert.Contains(t, output, "1 healed") + + // Verify the healed content matches the original exactly + res = nodes[1].IPFS("cat", cid) + assert.Equal(t, testContent, res.Stdout.String(), "healed content should match original") + + // Also verify via block get that the raw block data is correct + block0 := nodes[0].IPFS("block", "get", cid) + block1 := nodes[1].IPFS("block", "get", cid) + assert.Equal(t, block0.Stdout.String(), block1.Stdout.String(), "raw block data should match") + }) + + t.Run("multiple corrupt blocks", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + + // Create 20 blocks + for i := 0; i < 20; i++ { + node.IPFSAddStr(strings.Repeat("test content ", i+1)) + } + + // Corrupt 5 blocks + corruptMultipleBlocks(t, node, 5) + + // Verify detects all corruptions + res := node.RunIPFS("repo", "verify") + assert.Equal(t, 1, res.ExitCode()) + // Error summary is in stderr + assert.Contains(t, res.Stderr.String(), "5 blocks corrupt") + + // Test with --drop + res = node.RunIPFS("repo", "verify", "--drop") + assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks removed successfully") + assert.Contains(t, res.Stdout.String(), "5 blocks corrupt") + assert.Contains(t, res.Stdout.String(), "5 removed") + }) + + t.Run("empty repository", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + + // Verify empty repo passes + res := node.IPFS("repo", "verify") + assert.Equal(t, 0, res.ExitCode()) + assert.Contains(t, res.Stdout.String(), "all blocks validated") + + // Should work with --drop and --heal too + res = node.IPFS("repo", "verify", "--drop") + assert.Equal(t, 0, res.ExitCode()) + assert.Contains(t, res.Stdout.String(), "all blocks validated") + }) + + t.Run("partial heal success", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + + // Start both nodes and connect them + nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + + // Add 5 blocks to node 0, pin them to keep available + cid1 := nodes[0].IPFSAddStr("content available for healing 1") + cid2 := nodes[0].IPFSAddStr("content available for healing 2") + cid3 := nodes[0].IPFSAddStr("content available for healing 3") + cid4 := nodes[0].IPFSAddStr("content available for healing 4") + cid5 := nodes[0].IPFSAddStr("content available for healing 5") + + // Pin these on node 0 to ensure they stay available + nodes[0].IPFS("pin", "add", cid1) + nodes[0].IPFS("pin", "add", cid2) + nodes[0].IPFS("pin", "add", cid3) + nodes[0].IPFS("pin", "add", cid4) + nodes[0].IPFS("pin", "add", cid5) + + // Node 1 fetches these blocks + nodes[1].IPFS("block", "get", cid1) + nodes[1].IPFS("block", "get", cid2) + nodes[1].IPFS("block", "get", cid3) + nodes[1].IPFS("block", "get", cid4) + nodes[1].IPFS("block", "get", cid5) + + // Now remove some blocks from node 0 to simulate partial availability + nodes[0].IPFS("pin", "rm", cid3) + nodes[0].IPFS("pin", "rm", cid4) + nodes[0].IPFS("pin", "rm", cid5) + nodes[0].IPFS("repo", "gc") + + // Verify node 1 is still connected + peers := nodes[1].IPFS("swarm", "peers") + require.Contains(t, peers.Stdout.String(), nodes[0].PeerID().String()) 
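+		// Setup recap: node 0 still holds cid1 and cid2, while cid3-cid5 were
+		// garbage-collected above, so at most two of the corrupted blocks can be
+		// healed from the network.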
+ + // Corrupt 5 blocks on node 1 + corruptMultipleBlocks(t, nodes[1], 5) + + // Heal should partially succeed (only cid1 and cid2 available from node 0) + res := nodes[1].RunIPFS("repo", "verify", "--heal") + assert.Equal(t, 1, res.ExitCode()) + + // Should show mixed results with specific counts in stderr + errOutput := res.Stderr.String() + assert.Contains(t, errOutput, "5 blocks corrupt") + assert.Contains(t, errOutput, "5 removed") + // Only cid1 and cid2 are available for healing, cid3-5 were GC'd + assert.Contains(t, errOutput, "2 healed") + assert.Contains(t, errOutput, "3 failed to heal") + }) + + t.Run("heal with block not available on network", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + + // Start both nodes and connect + nodes.StartDaemons().Connect() + defer nodes.StopDaemons() + + // Add unique content only to node 1 + nodes[1].IPFSAddStr("unique content that exists nowhere else") + + // Ensure nodes are connected + peers := nodes[1].IPFS("swarm", "peers") + require.Contains(t, peers.Stdout.String(), nodes[0].PeerID().String()) + + // Corrupt the block on node 1 + corruptRandomBlock(t, nodes[1]) + + // Heal should fail - node 0 doesn't have this content + res := nodes[1].RunIPFS("repo", "verify", "--heal") + assert.Equal(t, 1, res.ExitCode()) + + // Should report heal failure with specific counts in stderr + errOutput := res.Stderr.String() + assert.Contains(t, errOutput, "1 blocks corrupt") + assert.Contains(t, errOutput, "1 removed") + assert.Contains(t, errOutput, "1 failed to heal") + }) + + t.Run("large repository scale test", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + + // Create 1000 small blocks + for i := 0; i < 1000; i++ { + node.IPFSAddStr(fmt.Sprintf("content-%d", i)) + } + + // Corrupt 10 blocks + corruptMultipleBlocks(t, node, 10) + + // Verify handles large repos efficiently + res := node.RunIPFS("repo", "verify") + assert.Equal(t, 1, res.ExitCode()) + + // Should report exactly 10 corrupt blocks in stderr + assert.Contains(t, res.Stderr.String(), "10 blocks corrupt") + + // Test --drop at scale + res = node.RunIPFS("repo", "verify", "--drop") + assert.Equal(t, 0, res.ExitCode(), "should exit 0 when all corrupt blocks removed successfully") + output := res.Stdout.String() + assert.Contains(t, output, "10 blocks corrupt") + assert.Contains(t, output, "10 removed") + }) + + t.Run("drop with partial removal failures", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + + // Create several blocks + for i := 0; i < 5; i++ { + node.IPFSAddStr(fmt.Sprintf("content for removal test %d", i)) + } + + // Corrupt 3 blocks + corruptedFiles := corruptMultipleBlocks(t, node, 3) + require.Len(t, corruptedFiles, 3) + + // Make one of the corrupted files read-only to simulate removal failure + err := os.Chmod(corruptedFiles[0], 0400) // read-only + require.NoError(t, err) + defer func() { _ = os.Chmod(corruptedFiles[0], 0644) }() // cleanup + + // Also make the directory read-only to prevent deletion + blockDir := filepath.Dir(corruptedFiles[0]) + originalPerm, err := os.Stat(blockDir) + require.NoError(t, err) + err = os.Chmod(blockDir, 0500) // read+execute only, no write + require.NoError(t, err) + defer func() { _ = os.Chmod(blockDir, originalPerm.Mode()) }() // cleanup + + // Try to drop - should fail because at least one block can't be removed + res := node.RunIPFS("repo", "verify", "--drop") + assert.Equal(t, 1, res.ExitCode(), "should exit 1 when some blocks fail 
to remove") + + // Restore permissions for verification + _ = os.Chmod(blockDir, originalPerm.Mode()) + _ = os.Chmod(corruptedFiles[0], 0644) + + // Should report both successes and failures with specific counts + errOutput := res.Stderr.String() + assert.Contains(t, errOutput, "3 blocks corrupt") + assert.Contains(t, errOutput, "2 removed") + assert.Contains(t, errOutput, "1 failed to remove") + }) +} diff --git a/test/sharness/t0086-repo-verify.sh b/test/sharness/t0086-repo-verify.sh index 612d281ef..b73a6230e 100755 --- a/test/sharness/t0086-repo-verify.sh +++ b/test/sharness/t0086-repo-verify.sh @@ -3,6 +3,9 @@ # Copyright (c) 2016 Jeromy Johnson # MIT Licensed; see the LICENSE file in this repository. # +# NOTE: This is a legacy sharness test kept for compatibility. +# New tests for 'ipfs repo verify' should be added to test/cli/repo_verify_test.go +# test_description="Test ipfs repo fsck"