kubo/test/cli/pubsub_test.go
Marcin Rataj e725f6a2ab feat(pubsub): persistent seqno validation and diagnostic commands
- upgrade go-libp2p-pubsub to v0.15.0
- add persistent seqno validator using BasicSeqnoValidator
  stores max seen seqno per peer at /pubsub/seqno/<peerid>
  survives daemon restarts, addresses message cycling in large networks (#9665)
- add `ipfs pubsub reset` command to clear validator state
- add `ipfs diag datastore get/count` commands for datastore inspection
  requires daemon to be stopped, useful for debugging
- change pubsub status from Deprecated to Experimental
- add CLI tests for pubsub and diag datastore commands
- remove flaky pubsub_msg_seen_cache_test.go (replaced by CLI tests)
2025-12-15 23:06:26 +01:00

391 lines
11 KiB
Go

package cli
import (
"context"
"encoding/json"
"strings"
"testing"
"time"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestPubsub tests pubsub functionality and the persistent seqno validator.
//
// Pubsub has two deduplication layers:
//
// Layer 1: MessageID-based TimeCache (in-memory)
// - Controlled by Pubsub.SeenMessagesTTL config (default 120s)
// - Tested in go-libp2p-pubsub (see timecache in github.com/libp2p/go-libp2p-pubsub)
// - Only tested implicitly here via message delivery (timing-sensitive, not practical for CLI tests)
//
// Layer 2: Per-peer seqno validator (persistent in datastore)
// - Stores max seen seqno per peer at /pubsub/seqno/<peerid>
// - Tested directly below: persistence, updates, reset, survives restart
// - Validator: go-libp2p-pubsub BasicSeqnoValidator
func TestPubsub(t *testing.T) {
	t.Parallel()

	// enablePubsub configures a node with pubsub enabled.
	// Routing is disabled so nodes only talk over the explicit Connect()
	// links made below, keeping the test topology deterministic.
	enablePubsub := func(n *harness.Node) {
		n.SetIPFSConfig("Pubsub.Enabled", true)
		n.SetIPFSConfig("Routing.Type", "none") // simplify test setup
	}

	// Smoke test: a message published on one node reaches a subscriber on a
	// connected node. Delivery is timing-dependent, so a timeout is treated
	// as acceptable rather than a failure (see the ctx.Done case below).
	t.Run("basic pub/sub message delivery", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		// Create two connected nodes with pubsub enabled
		nodes := h.NewNodes(2).Init()
		nodes.ForEachPar(enablePubsub)
		nodes = nodes.StartDaemons().Connect()
		defer nodes.StopDaemons()
		subscriber := nodes[0]
		publisher := nodes[1]
		const topic = "test-topic"
		const message = "hello pubsub"
		// Start subscriber in background
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		// Use a channel to receive the message (buffered so the goroutine
		// never blocks on send if the test has already moved on)
		msgChan := make(chan string, 1)
		go func() {
			// Subscribe and wait for one message.
			// NOTE(review): this call presumably blocks until the daemon
			// stops; the deferred StopDaemons above should unblock it —
			// confirm against the harness RunIPFS semantics.
			res := subscriber.RunIPFS("pubsub", "sub", "--enc=json", topic)
			if res.Err == nil {
				// Parse JSON output to get message data
				lines := res.Stdout.Lines()
				if len(lines) > 0 {
					var msg struct {
						Data []byte `json:"data"`
					}
					if json.Unmarshal([]byte(lines[0]), &msg) == nil {
						msgChan <- string(msg.Data)
					}
				}
			}
		}()
		// Wait for subscriber to be ready
		time.Sleep(500 * time.Millisecond)
		// Publish message
		publisher.PipeStrToIPFS(message, "pubsub", "pub", topic)
		// Wait for message or timeout
		select {
		case received := <-msgChan:
			assert.Equal(t, message, received)
		case <-ctx.Done():
			// Subscriber may not receive in time due to test timing - that's OK
			// The main goal is to test the seqno validator state persistence
			t.Log("subscriber did not receive message in time (this is acceptable)")
		}
	})

	// Verifies Layer 2: after messages flow, a /pubsub/seqno/<peerid> entry
	// for the publisher exists in the subscriber's datastore on disk.
	t.Run("seqno validator state is persisted", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		// Create two connected nodes with pubsub
		nodes := h.NewNodes(2).Init()
		nodes.ForEachPar(enablePubsub)
		nodes = nodes.StartDaemons().Connect()
		node1 := nodes[0]
		node2 := nodes[1]
		node2PeerID := node2.PeerID().String()
		const topic = "seqno-test"
		// Start subscriber on node1 (goroutine ends when daemons stop)
		go func() {
			node1.RunIPFS("pubsub", "sub", topic)
		}()
		time.Sleep(500 * time.Millisecond)
		// Publish multiple messages from node2 to trigger seqno validation
		for i := 0; i < 3; i++ {
			node2.PipeStrToIPFS("msg", "pubsub", "pub", topic)
			time.Sleep(100 * time.Millisecond)
		}
		// Give time for messages to propagate and seqno to be stored
		time.Sleep(1 * time.Second)
		// Stop daemons to check datastore (diag datastore requires daemon to be stopped)
		nodes.StopDaemons()
		// Check that seqno state exists using diag datastore count
		res := node1.IPFS("diag", "datastore", "count", "/pubsub/seqno/")
		count := strings.TrimSpace(res.Stdout.String())
		t.Logf("seqno entries count: %s", count)
		// There should be at least one seqno entry (from node2)
		assert.NotEqual(t, "0", count, "expected seqno state to be persisted")
		// Verify the specific peer's key exists
		key := "/pubsub/seqno/" + node2PeerID
		res = node1.RunIPFS("diag", "datastore", "get", "--hex", key)
		if res.Err == nil {
			t.Logf("seqno for peer %s:\n%s", node2PeerID, res.Stdout.String())
			assert.Contains(t, res.Stdout.String(), "Hex Dump:")
		} else {
			// Key might not exist if messages didn't propagate - log but don't fail
			t.Logf("seqno key not found for peer %s (messages may not have propagated)", node2PeerID)
		}
	})

	// Verifies the stored seqno is updated across a publish / restart /
	// publish cycle. Because exact byte ordering of the stored value is not
	// specified here, the assertion is limited to the value being a valid
	// 8-byte uint64 when both reads succeed.
	t.Run("seqno updates when receiving multiple messages", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		// Create two connected nodes with pubsub
		nodes := h.NewNodes(2).Init()
		nodes.ForEachPar(enablePubsub)
		nodes = nodes.StartDaemons().Connect()
		node1 := nodes[0]
		node2 := nodes[1]
		node2PeerID := node2.PeerID().String()
		const topic = "seqno-update-test"
		seqnoKey := "/pubsub/seqno/" + node2PeerID
		// Start subscriber on node1
		go func() {
			node1.RunIPFS("pubsub", "sub", topic)
		}()
		time.Sleep(500 * time.Millisecond)
		// Send first message
		node2.PipeStrToIPFS("msg1", "pubsub", "pub", topic)
		time.Sleep(500 * time.Millisecond)
		// Stop daemons to check seqno (diag datastore requires daemon to be stopped)
		nodes.StopDaemons()
		// Get seqno after first message
		res1 := node1.RunIPFS("diag", "datastore", "get", seqnoKey)
		var seqno1 []byte
		if res1.Err == nil {
			seqno1 = res1.Stdout.Bytes()
			t.Logf("seqno after first message: %d bytes", len(seqno1))
		} else {
			t.Logf("seqno not found after first message (message may not have propagated)")
		}
		// Restart daemons for second message
		nodes = nodes.StartDaemons().Connect()
		time.Sleep(500 * time.Millisecond)
		// Resubscribe (the previous subscription died with the daemon)
		go func() {
			node1.RunIPFS("pubsub", "sub", topic)
		}()
		time.Sleep(500 * time.Millisecond)
		// Send second message
		node2.PipeStrToIPFS("msg2", "pubsub", "pub", topic)
		time.Sleep(500 * time.Millisecond)
		// Stop daemons to check seqno
		nodes.StopDaemons()
		// Get seqno after second message
		res2 := node1.RunIPFS("diag", "datastore", "get", seqnoKey)
		var seqno2 []byte
		if res2.Err == nil {
			seqno2 = res2.Stdout.Bytes()
			t.Logf("seqno after second message: %d bytes", len(seqno2))
		} else {
			t.Logf("seqno not found after second message")
		}
		// If both messages were received, seqno should have been updated
		// The seqno is a uint64 that should increase with each message
		if len(seqno1) > 0 && len(seqno2) > 0 {
			// seqno2 should be >= seqno1 (it's the max seen seqno)
			// We just verify they're both non-empty and potentially different
			t.Logf("seqno1: %x", seqno1)
			t.Logf("seqno2: %x", seqno2)
			// The seqno validator stores the max seqno seen, so seqno2 >= seqno1
			// We can't do a simple byte comparison due to potential endianness
			// but both should be valid uint64 values (8 bytes)
			assert.Equal(t, 8, len(seqno2), "seqno should be 8 bytes (uint64)")
		}
	})

	// Verifies `ipfs pubsub reset` (no flags) deletes ALL seqno entries.
	t.Run("pubsub reset clears seqno state", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		// Create two connected nodes
		nodes := h.NewNodes(2).Init()
		nodes.ForEachPar(enablePubsub)
		nodes = nodes.StartDaemons().Connect()
		node1 := nodes[0]
		node2 := nodes[1]
		const topic = "reset-test"
		// Start subscriber and exchange messages
		go func() {
			node1.RunIPFS("pubsub", "sub", topic)
		}()
		time.Sleep(500 * time.Millisecond)
		for i := 0; i < 3; i++ {
			node2.PipeStrToIPFS("msg", "pubsub", "pub", topic)
			time.Sleep(100 * time.Millisecond)
		}
		time.Sleep(1 * time.Second)
		// Stop daemons to check initial count
		nodes.StopDaemons()
		// Get initial count
		res := node1.IPFS("diag", "datastore", "count", "/pubsub/seqno/")
		initialCount := strings.TrimSpace(res.Stdout.String())
		t.Logf("initial seqno count: %s", initialCount)
		// Restart daemon to run pubsub reset (reset requires a running daemon)
		node1.StartDaemon()
		// Reset all seqno state
		res = node1.IPFS("pubsub", "reset")
		assert.NoError(t, res.Err)
		t.Logf("reset output: %s", res.Stdout.String())
		// Stop daemon to verify state was cleared
		node1.StopDaemon()
		// Verify state was cleared
		res = node1.IPFS("diag", "datastore", "count", "/pubsub/seqno/")
		finalCount := strings.TrimSpace(res.Stdout.String())
		t.Logf("final seqno count: %s", finalCount)
		assert.Equal(t, "0", finalCount, "seqno state should be cleared after reset")
	})

	// Verifies `ipfs pubsub reset --peer <id>` removes only that peer's
	// entry, leaving other peers' seqno state intact.
	t.Run("pubsub reset with peer flag", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		// Create three connected nodes
		nodes := h.NewNodes(3).Init()
		nodes.ForEachPar(enablePubsub)
		nodes = nodes.StartDaemons().Connect()
		node1 := nodes[0]
		node2 := nodes[1]
		node3 := nodes[2]
		node2PeerID := node2.PeerID().String()
		node3PeerID := node3.PeerID().String()
		const topic = "peer-reset-test"
		// Start subscriber on node1
		go func() {
			node1.RunIPFS("pubsub", "sub", topic)
		}()
		time.Sleep(500 * time.Millisecond)
		// Publish from both node2 and node3
		for i := 0; i < 3; i++ {
			node2.PipeStrToIPFS("msg2", "pubsub", "pub", topic)
			node3.PipeStrToIPFS("msg3", "pubsub", "pub", topic)
			time.Sleep(100 * time.Millisecond)
		}
		time.Sleep(1 * time.Second)
		// Stop other daemons (keep node1 running to do the reset)
		node2.StopDaemon()
		node3.StopDaemon()
		// Reset only node2's state (while node1 daemon is running)
		res := node1.IPFS("pubsub", "reset", "--peer", node2PeerID)
		require.NoError(t, res.Err)
		t.Logf("reset output: %s", res.Stdout.String())
		// Stop node1 daemon to check datastore
		node1.StopDaemon()
		// Check that node2's key is gone
		res = node1.RunIPFS("diag", "datastore", "get", "/pubsub/seqno/"+node2PeerID)
		assert.Error(t, res.Err, "node2's seqno key should be deleted")
		// Check that node3's key still exists (if it was created)
		res = node1.RunIPFS("diag", "datastore", "get", "/pubsub/seqno/"+node3PeerID)
		// Note: node3's key might not exist if messages didn't propagate
		// So we just log the result without asserting
		if res.Err == nil {
			t.Logf("node3's seqno key still exists (as expected)")
		} else {
			t.Logf("node3's seqno key not found (messages may not have propagated)")
		}
	})

	// Verifies the seqno entry count is unchanged across a daemon
	// stop/start cycle — the core claim of the persistent validator.
	t.Run("seqno state survives daemon restart", func(t *testing.T) {
		t.Parallel()
		h := harness.NewT(t)
		// Create and start single node
		node := h.NewNode().Init()
		enablePubsub(node)
		node.StartDaemon()
		// We need another node to publish messages
		node2 := h.NewNode().Init()
		enablePubsub(node2)
		node2.StartDaemon()
		node.Connect(node2)
		const topic = "restart-test"
		// Start subscriber and exchange messages
		go func() {
			node.RunIPFS("pubsub", "sub", topic)
		}()
		time.Sleep(500 * time.Millisecond)
		for i := 0; i < 3; i++ {
			node2.PipeStrToIPFS("msg", "pubsub", "pub", topic)
			time.Sleep(100 * time.Millisecond)
		}
		time.Sleep(1 * time.Second)
		// Stop daemons to check datastore
		node.StopDaemon()
		node2.StopDaemon()
		// Get count before restart
		res := node.IPFS("diag", "datastore", "count", "/pubsub/seqno/")
		beforeCount := strings.TrimSpace(res.Stdout.String())
		t.Logf("seqno count before restart: %s", beforeCount)
		// Restart daemon (simulate restart scenario)
		time.Sleep(500 * time.Millisecond)
		node.StartDaemon()
		time.Sleep(500 * time.Millisecond)
		// Stop daemon to check datastore again
		node.StopDaemon()
		// Get count after restart
		res = node.IPFS("diag", "datastore", "count", "/pubsub/seqno/")
		afterCount := strings.TrimSpace(res.Stdout.String())
		t.Logf("seqno count after restart: %s", afterCount)
		// Count should be the same (state persisted)
		assert.Equal(t, beforeCount, afterCount, "seqno state should survive daemon restart")
	})
}