From 9397f9649b69315a4ceff5afd2fc2015d3d681c4 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Fri, 9 Jan 2026 19:49:03 +0100 Subject: [PATCH] refactor(test): use waitForSubscription helper in pubsub tests replaces hardcoded time.Sleep(500ms) with deterministic polling via `ipfs pubsub ls`, following the pattern established in p2p_test.go addresses review feedback from #11110 --- test/cli/pubsub_test.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/test/cli/pubsub_test.go b/test/cli/pubsub_test.go index 632265782..b583b649f 100644 --- a/test/cli/pubsub_test.go +++ b/test/cli/pubsub_test.go @@ -3,6 +3,7 @@ package cli import ( "context" "encoding/json" + "slices" "strings" "testing" "time" @@ -12,6 +13,18 @@ import ( "github.com/stretchr/testify/require" ) +// waitForSubscription waits until the node has a subscription to the given topic. +func waitForSubscription(t *testing.T, node *harness.Node, topic string) { + t.Helper() + require.Eventually(t, func() bool { + res := node.RunIPFS("pubsub", "ls") + if res.Err != nil { + return false + } + return slices.Contains(res.Stdout.Lines(), topic) + }, 5*time.Second, 100*time.Millisecond, "expected subscription to topic %s", topic) +} + // TestPubsub tests pubsub functionality and the persistent seqno validator. // // Pubsub has two deduplication layers: @@ -74,7 +87,7 @@ func TestPubsub(t *testing.T) { }() // Wait for subscriber to be ready - time.Sleep(500 * time.Millisecond) + waitForSubscription(t, subscriber, topic) // Publish message publisher.PipeStrToIPFS(message, "pubsub", "pub", topic) @@ -109,7 +122,7 @@ func TestPubsub(t *testing.T) { go func() { node1.RunIPFS("pubsub", "sub", topic) }() - time.Sleep(500 * time.Millisecond) + waitForSubscription(t, node1, topic) // Publish multiple messages from node2 to trigger seqno validation for i := 0; i < 3; i++ { @@ -163,7 +176,7 @@ func TestPubsub(t *testing.T) { go func() { node1.RunIPFS("pubsub", "sub", topic) }() - time.Sleep(500 * time.Millisecond) + waitForSubscription(t, node1, topic) // Send first message node2.PipeStrToIPFS("msg1", "pubsub", "pub", topic) @@ -184,13 +197,12 @@ func TestPubsub(t *testing.T) { // Restart daemons for second message nodes = nodes.StartDaemons().Connect() - time.Sleep(500 * time.Millisecond) // Resubscribe go func() { node1.RunIPFS("pubsub", "sub", topic) }() - time.Sleep(500 * time.Millisecond) + waitForSubscription(t, node1, topic) // Send second message node2.PipeStrToIPFS("msg2", "pubsub", "pub", topic) @@ -241,7 +253,7 @@ func TestPubsub(t *testing.T) { go func() { node1.RunIPFS("pubsub", "sub", topic) }() - time.Sleep(500 * time.Millisecond) + waitForSubscription(t, node1, topic) for i := 0; i < 3; i++ { node2.PipeStrToIPFS("msg", "pubsub", "pub", topic) @@ -296,7 +308,7 @@ func TestPubsub(t *testing.T) { go func() { node1.RunIPFS("pubsub", "sub", topic) }() - time.Sleep(500 * time.Millisecond) + waitForSubscription(t, node1, topic) // Publish from both node2 and node3 for i := 0; i < 3; i++ { @@ -354,7 +366,7 @@ func TestPubsub(t *testing.T) { go func() { node.RunIPFS("pubsub", "sub", topic) }() - time.Sleep(500 * time.Millisecond) + waitForSubscription(t, node, topic) for i := 0; i < 3; i++ { node2.PipeStrToIPFS("msg", "pubsub", "pub", topic)