refactor(test): use waitForSubscription helper in pubsub tests

replaces hardcoded time.Sleep(500ms) with deterministic polling
via `ipfs pubsub ls`, following the pattern established in p2p_test.go

addresses review feedback from #11110
This commit is contained in:
Marcin Rataj 2026-01-09 19:49:03 +01:00
parent fada2b6dc4
commit 9397f9649b

View File

@ -3,6 +3,7 @@ package cli
import (
"context"
"encoding/json"
"slices"
"strings"
"testing"
"time"
@ -12,6 +13,18 @@ import (
"github.com/stretchr/testify/require"
)
// waitForSubscription waits until the node has a subscription to the given topic.
func waitForSubscription(t *testing.T, node *harness.Node, topic string) {
t.Helper()
require.Eventually(t, func() bool {
res := node.RunIPFS("pubsub", "ls")
if res.Err != nil {
return false
}
return slices.Contains(res.Stdout.Lines(), topic)
}, 5*time.Second, 100*time.Millisecond, "expected subscription to topic %s", topic)
}
// TestPubsub tests pubsub functionality and the persistent seqno validator.
//
// Pubsub has two deduplication layers:
@ -74,7 +87,7 @@ func TestPubsub(t *testing.T) {
}()
// Wait for subscriber to be ready
time.Sleep(500 * time.Millisecond)
waitForSubscription(t, subscriber, topic)
// Publish message
publisher.PipeStrToIPFS(message, "pubsub", "pub", topic)
@ -109,7 +122,7 @@ func TestPubsub(t *testing.T) {
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
time.Sleep(500 * time.Millisecond)
waitForSubscription(t, node1, topic)
// Publish multiple messages from node2 to trigger seqno validation
for i := 0; i < 3; i++ {
@ -163,7 +176,7 @@ func TestPubsub(t *testing.T) {
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
time.Sleep(500 * time.Millisecond)
waitForSubscription(t, node1, topic)
// Send first message
node2.PipeStrToIPFS("msg1", "pubsub", "pub", topic)
@ -184,13 +197,12 @@ func TestPubsub(t *testing.T) {
// Restart daemons for second message
nodes = nodes.StartDaemons().Connect()
time.Sleep(500 * time.Millisecond)
// Resubscribe
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
time.Sleep(500 * time.Millisecond)
waitForSubscription(t, node1, topic)
// Send second message
node2.PipeStrToIPFS("msg2", "pubsub", "pub", topic)
@ -241,7 +253,7 @@ func TestPubsub(t *testing.T) {
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
time.Sleep(500 * time.Millisecond)
waitForSubscription(t, node1, topic)
for i := 0; i < 3; i++ {
node2.PipeStrToIPFS("msg", "pubsub", "pub", topic)
@ -296,7 +308,7 @@ func TestPubsub(t *testing.T) {
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
time.Sleep(500 * time.Millisecond)
waitForSubscription(t, node1, topic)
// Publish from both node2 and node3
for i := 0; i < 3; i++ {
@ -354,7 +366,7 @@ func TestPubsub(t *testing.T) {
go func() {
node.RunIPFS("pubsub", "sub", topic)
}()
time.Sleep(500 * time.Millisecond)
waitForSubscription(t, node, topic)
for i := 0; i < 3; i++ {
node2.PipeStrToIPFS("msg", "pubsub", "pub", topic)