ceremonyclient/node/rpc/proxy_blossomsub_test.go
Cassandra Heart 12996487c3
v2.1.0.18 (#508)
* experiment: reject bad peer info messages

* v2.1.0.18 preview

* add tagged sync

* Add missing hypergraph changes

* small tweaks to sync

* allow local sync, use it for provers with workers

* missing file

* resolve build error

* resolve sync issue, remove raw sync

* resolve deletion promotion bug

* resolve sync abstraction leak from tree deletion changes

* rearrange prover sync

* remove pruning from sync

* restore removed sync flag

* fix: sync, event stream deadlock, heuristic scoring of better shards

* resolve hanging shutdown + pubsub proxy issue

* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events

* fix: clean up rust ffi, background coverage events, and sync tweaks

* fix: linking issue for channel, connectivity test aggression, sync regression, join tests

* fix: disjoint sync, improper application of filter

* resolve sync/reel/validation deadlock

* adjust sync to handle no leaf edge cases, multi-path segment traversal

* use simpler sync

* faster, simpler sync with some debug extras

* migration to recalculate

* don't use batch

* square up the roots

* fix nil pointer

* fix: seniority calculation, sync race condition, migration

* make sync dumber

* fix: tree deletion issue

* fix: missing seniority merge request canonical serialization

* address issues from previous commit test

* stale workers should be cleared

* remove missing gap check

* rearrange collect, reduce sync logging noise

* fix: the disjoint leaf/branch sync case

* nuclear option on sync failures

* v2.1.0.18, finalized
2026-02-08 23:51:51 -06:00

790 lines
24 KiB
Go

package rpc_test
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
p2ptypes "source.quilibrium.com/quilibrium/monorepo/types/p2p"
)
// mockPubSub is an in-memory test double implementing the p2p.PubSub
// interface. Publishing records the payload and fans it out to every handler
// subscribed on the same bitmask.
type mockPubSub struct {
	// mu is taken by the methods that touch the maps and counters below.
	mu sync.RWMutex

	// peerID is the random identity generated by newMockPubSub.
	peerID []byte

	// subscriptions holds the handlers per bitmask (keyed by string(bitmask));
	// a bitmask may have several handlers.
	subscriptions map[string][]func(message *pb.Message) error
	// validators holds at most one validator per bitmask.
	validators map[string]func(peerID peer.ID, message *pb.Message) p2ptypes.ValidationResult
	// publishedData keeps the most recent payload published per bitmask.
	publishedData map[string][]byte

	// validatorCalls is reported by GetValidatorCallCount.
	validatorCalls int
	// messageCount is incremented per publish and used as the message seqno.
	messageCount int
	// nextSubID is reserved for generating unique subscription IDs.
	nextSubID int
}
// newMockPubSub builds a mockPubSub with a random 32-byte peer ID and empty
// subscription, validator and published-data maps.
func newMockPubSub() *mockPubSub {
	// The error returned by rand.Read is not checked here.
	id := make([]byte, 32)
	_, _ = rand.Read(id)

	mock := &mockPubSub{peerID: id}
	mock.subscriptions = map[string][]func(message *pb.Message) error{}
	mock.validators = map[string]func(peer.ID, *pb.Message) p2ptypes.ValidationResult{}
	mock.publishedData = map[string][]byte{}
	return mock
}
// Implement all p2p.PubSub interface methods
// PublishToBitmask records data as the latest payload for bitmask and
// delivers it, wrapped in a pb.Message, to every handler currently subscribed
// on that bitmask. Each handler runs in its own goroutine and its returned
// error is discarded. The method always returns nil.
func (m *mockPubSub) PublishToBitmask(bitmask []byte, data []byte) error {
	key := string(bitmask)

	m.mu.Lock()
	m.publishedData[key] = data
	// Snapshot the handlers so they can be invoked after the lock is released.
	var snapshot []func(message *pb.Message) error
	snapshot = append(snapshot, m.subscriptions[key]...)
	m.messageCount++
	seq := m.messageCount
	m.mu.Unlock()

	if len(snapshot) == 0 {
		return nil
	}

	// All handlers receive the same message instance.
	msg := &pb.Message{
		Data:    data,
		From:    m.peerID,
		Seqno:   []byte(fmt.Sprintf("%d", seq)),
		Bitmask: bitmask,
	}
	for i := range snapshot {
		deliver := snapshot[i]
		go func() {
			_ = deliver(msg)
		}()
	}
	return nil
}
// Publish forwards to PublishToBitmask, treating address as the bitmask.
func (m *mockPubSub) Publish(address []byte, data []byte) error {
	err := m.PublishToBitmask(address, data)
	return err
}
// Subscribe appends handler to the handlers registered for bitmask. Existing
// handlers for the same bitmask are kept. It always returns nil.
func (m *mockPubSub) Subscribe(bitmask []byte, handler func(message *pb.Message) error) error {
	key := string(bitmask)

	m.mu.Lock()
	current, ok := m.subscriptions[key]
	if !ok {
		current = make([]func(message *pb.Message) error, 0)
	}
	m.subscriptions[key] = append(current, handler)
	m.mu.Unlock()

	return nil
}
// Unsubscribe drops every handler registered for bitmask. The mock does not
// track individual subscriptions, so all of them are removed together; the
// raw argument is not consulted.
func (m *mockPubSub) Unsubscribe(bitmask []byte, raw bool) {
	key := string(bitmask)

	m.mu.Lock()
	delete(m.subscriptions, key)
	m.mu.Unlock()
}
// RegisterValidator stores validator as the validator for bitmask, replacing
// any previously stored one. The sync argument is not consulted. It always
// returns nil.
func (m *mockPubSub) RegisterValidator(
	bitmask []byte,
	validator func(peerID peer.ID, message *pb.Message) p2ptypes.ValidationResult,
	sync bool,
) error {
	key := string(bitmask)

	m.mu.Lock()
	m.validators[key] = validator
	m.mu.Unlock()

	return nil
}
// UnregisterValidator removes the validator stored for bitmask, if any. It
// always returns nil.
func (m *mockPubSub) UnregisterValidator(bitmask []byte) error {
	key := string(bitmask)

	m.mu.Lock()
	delete(m.validators, key)
	m.mu.Unlock()

	return nil
}
// GetPeerID returns the mock's peer ID bytes (the same slice, not a copy).
func (m *mockPubSub) GetPeerID() []byte {
	id := m.peerID
	return id
}
// GetValidatorCallCount returns the current value of the validatorCalls
// counter, read under the read lock.
func (m *mockPubSub) GetValidatorCallCount() int {
	m.mu.RLock()
	calls := m.validatorCalls
	m.mu.RUnlock()
	return calls
}
// Implement remaining interface methods with basic mocks
// GetPeerstoreCount reports a fixed peerstore size of 5.
func (m *mockPubSub) GetPeerstoreCount() int {
	return 5
}

// GetNetworkPeersCount reports a fixed network peer count of 10.
func (m *mockPubSub) GetNetworkPeersCount() int {
	return 10
}

// GetRandomPeer always answers with the mock's own peer ID and a nil error.
func (m *mockPubSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
	return m.peerID, nil
}
// GetMultiaddrOfPeerStream returns an already-closed channel, so a receiver
// gets no addresses and sees the stream end immediately.
func (m *mockPubSub) GetMultiaddrOfPeerStream(ctx context.Context, peerId []byte) <-chan multiaddr.Multiaddr {
	closed := make(chan multiaddr.Multiaddr)
	close(closed)
	return closed
}
// GetMultiaddrOfPeer returns the same fixed loopback multiaddr for any peer.
func (m *mockPubSub) GetMultiaddrOfPeer(peerId []byte) string {
	return "/ip4/127.0.0.1/tcp/8080"
}
// GetOwnMultiaddrs returns a one-element slice holding a fixed loopback
// multiaddr. The parse error for the constant address is not checked.
func (m *mockPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr {
	own, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/8080")
	addrs := make([]multiaddr.Multiaddr, 0, 1)
	return append(addrs, own)
}
// StartDirectChannelListener is a no-op in the mock and always returns nil.
func (m *mockPubSub) StartDirectChannelListener(key []byte, purpose string, server *grpc.Server) error {
	var err error
	return err
}
// GetDirectChannel does not open a connection in the mock: it returns a nil
// *grpc.ClientConn together with a nil error.
func (m *mockPubSub) GetDirectChannel(ctx context.Context, peerId []byte, purpose string) (*grpc.ClientConn, error) {
	var conn *grpc.ClientConn
	return conn, nil
}
// GetNetworkInfo returns a freshly allocated, zero-valued NetworkInfoResponse.
func (m *mockPubSub) GetNetworkInfo() *protobufs.NetworkInfoResponse {
	info := new(protobufs.NetworkInfoResponse)
	return info
}
// SignMessage returns msg unchanged in place of a signature.
func (m *mockPubSub) SignMessage(msg []byte) ([]byte, error) {
	return msg, nil
}

// GetPublicKey returns the peer ID bytes in place of a public key.
func (m *mockPubSub) GetPublicKey() []byte {
	return m.peerID
}

// GetPeerScore reports a fixed score of 100 for any peer.
func (m *mockPubSub) GetPeerScore(peerId []byte) int64 {
	return 100
}

// SetPeerScore is a no-op.
func (m *mockPubSub) SetPeerScore(peerId []byte, score int64) {
}

// AddPeerScore is a no-op.
func (m *mockPubSub) AddPeerScore(peerId []byte, scoreDelta int64) {
}

// Reconnect is a no-op that always returns nil.
func (m *mockPubSub) Reconnect(peerId []byte) error {
	return nil
}

// Bootstrap is a no-op that always returns nil.
func (m *mockPubSub) Bootstrap(ctx context.Context) error {
	return nil
}

// DiscoverPeers is a no-op that always returns nil.
func (m *mockPubSub) DiscoverPeers(ctx context.Context) error {
	return nil
}

// GetNetwork always reports network 0.
func (m *mockPubSub) GetNetwork() uint {
	return 0
}

// IsPeerConnected reports every peer as connected.
func (m *mockPubSub) IsPeerConnected(peerId []byte) bool {
	return true
}

// Reachability always reports a true BoolValue.
func (m *mockPubSub) Reachability() *wrapperspb.BoolValue {
	return wrapperspb.Bool(true)
}

// Close is a no-op that always returns nil.
func (m *mockPubSub) Close() error {
	return nil
}

// SetShutdownContext is a no-op.
func (m *mockPubSub) SetShutdownContext(ctx context.Context) {
}
// Test helper functions
// createTestConfigs returns a P2P config carrying a freshly generated Ed448
// private key (hex-encoded) with an ephemeral-port stream listen address, and
// an engine config with the master proxy enabled. Key generation or
// serialization errors are returned as-is with nil configs.
func createTestConfigs() (*config.P2PConfig, *config.EngineConfig, error) {
	key, _, err := crypto.GenerateEd448Key(rand.Reader)
	if err != nil {
		return nil, nil, err
	}

	rawKey, err := key.Raw()
	if err != nil {
		return nil, nil, err
	}

	p2pCfg := new(config.P2PConfig)
	p2pCfg.StreamListenMultiaddr = "/ip4/127.0.0.1/tcp/0" // port 0: ephemeral
	p2pCfg.PeerPrivKey = hex.EncodeToString(rawKey)

	engineCfg := new(config.EngineConfig)
	engineCfg.EnableMasterProxy = true

	return p2pCfg, engineCfg, nil
}
// setupTestServer starts a TLS-enabled gRPC server on an ephemeral loopback
// port that exposes mockPubSub through the PubSubProxy service.
//
// The server's TLS credentials are derived from p2pConfig. It returns the
// "host:port" address the server listens on and a cleanup function that stops
// the server. Any setup failure aborts the calling test via require.
func setupTestServer(t *testing.T, mockPubSub *mockPubSub, p2pConfig *config.P2PConfig) (string, func()) {
	t.Helper()

	// Create TLS credentials for the test server using the provided config.
	tlsCreds, err := p2p.NewPeerAuthenticator(
		zap.NewNop(),
		p2pConfig,
		nil,
		nil,
		nil,
		nil,
		nil,
		map[string]channel.AllowedPeerPolicyType{
			"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
		},
		nil,
	).CreateServerTLSCredentials()
	require.NoError(t, err)

	// Bind an ephemeral port and keep this listener for the server. Closing
	// it and listening again on the reported address would leave a window in
	// which another process could claim the port.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	addr := listener.Addr().String()

	// Create gRPC server with TLS and register the proxy service.
	server := grpc.NewServer(grpc.Creds(tlsCreds))
	proxyServer := rpc.NewPubSubProxyServer(mockPubSub, zap.NewNop())
	protobufs.RegisterPubSubProxyServer(server, proxyServer)

	go func() {
		if err := server.Serve(listener); err != nil {
			t.Logf("Server error: %v", err)
		}
	}()

	// Give the Serve goroutine a moment to start accepting.
	time.Sleep(100 * time.Millisecond)

	return addr, func() {
		server.Stop()
		// Close explicitly as well; the error is ignored because Stop may
		// already have closed the listener.
		_ = listener.Close()
	}
}
// TestProxyBlossomSubCreation checks the constructor's argument validation:
// core ID 0 and a disabled master proxy must both be rejected. It then checks
// that a proxy created without a running server cannot publish.
func TestProxyBlossomSubCreation(t *testing.T) {
	p2pConfig, engineConfig, err := createTestConfigs()
	require.NoError(t, err)

	// Test with coreId = 0 (should fail). require.Error stops the test when
	// no error is returned, so err.Error() below cannot dereference nil.
	_, err = rpc.NewProxyBlossomSub(p2pConfig, engineConfig, zap.NewNop(), 0)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "proxy blossomsub should not be used for master node")

	// Test with proxy disabled (should fail).
	engineConfig.EnableMasterProxy = false
	_, err = rpc.NewProxyBlossomSub(p2pConfig, engineConfig, zap.NewNop(), 1)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "proxy mode is not enabled")

	// Enable proxy mode.
	engineConfig.EnableMasterProxy = true

	// Test with valid config but no server running. Creation may either
	// succeed or fail here; both outcomes are accepted.
	proxy, err := rpc.NewProxyBlossomSub(p2pConfig, engineConfig, zap.NewNop(), 1)
	if err != nil {
		// Creation failed, which is also acceptable.
		return
	}

	// NOTE(review): the proxy is closed before publishing, so the failure
	// asserted below may come from the closed proxy rather than from the
	// missing server — confirm which condition this is meant to cover.
	proxy.Close()

	err = proxy.PublishToBitmask([]byte("test"), []byte("data"))
	assert.Error(t, err, "should fail when server is not available")
}
// TestBasicPublishSubscribe exercises a single proxy client against the mock
// backend: a publish must reach the mock, a subscription must receive a
// subsequently published message, and the simple getters must return the
// mock's values.
func TestBasicPublishSubscribe(t *testing.T) {
	p2pConfig, engineConfig, err := createTestConfigs()
	require.NoError(t, err)

	mockPubSub := newMockPubSub()
	serverAddr, cleanup := setupTestServer(t, mockPubSub, p2pConfig)
	defer cleanup()

	// Point the client at the test server's address.
	host, port, err := net.SplitHostPort(serverAddr)
	require.NoError(t, err)
	p2pConfig.StreamListenMultiaddr = fmt.Sprintf("/ip4/%s/tcp/%s", host, port)

	// Create proxy client.
	proxy, err := rpc.NewProxyBlossomSub(p2pConfig, engineConfig, zap.NewNop(), 1)
	require.NoError(t, err)
	defer proxy.Close()

	testBitmask := []byte("test-bitmask")
	testData := []byte("hello world")

	// Test publish.
	err = proxy.PublishToBitmask(testBitmask, testData)
	assert.NoError(t, err)

	// Verify data was published to mock.
	mockPubSub.mu.RLock()
	publishedData := mockPubSub.publishedData[string(testBitmask)]
	mockPubSub.mu.RUnlock()
	assert.Equal(t, testData, publishedData)

	// Test subscribe. The channel has room for the single message published
	// below, so the handler does not block.
	receivedMessages := make(chan *pb.Message, 1)
	err = proxy.Subscribe(testBitmask, func(message *pb.Message) error {
		receivedMessages <- message
		return nil
	})
	// Without a subscription the wait below can only time out, so stop here.
	require.NoError(t, err)

	// Give subscription time to be established through gRPC stream.
	time.Sleep(200 * time.Millisecond)

	// Publish another message.
	testData2 := []byte("hello again")
	err = proxy.PublishToBitmask(testBitmask, testData2)
	assert.NoError(t, err)

	// Wait for message.
	select {
	case msg := <-receivedMessages:
		assert.Equal(t, testData2, msg.Data)
		assert.Equal(t, testBitmask, msg.Bitmask)
	case <-time.After(2 * time.Second):
		t.Fatal("timeout waiting for message")
	}

	// Test other basic methods; the expected counts are the mock's constants.
	peerID := proxy.GetPeerID()
	assert.NotNil(t, peerID)

	count := proxy.GetPeerstoreCount()
	assert.Equal(t, 5, count)

	networkCount := proxy.GetNetworkPeersCount()
	assert.Equal(t, 10, networkCount)
}
// TestValidatorRegistration registers a validator through the proxy and then
// unregisters it, asserting that neither call returns an error. The validator
// itself is not invoked by this test.
func TestValidatorRegistration(t *testing.T) {
	p2pConfig, engineConfig, err := createTestConfigs()
	require.NoError(t, err)

	backend := newMockPubSub()
	serverAddr, stopServer := setupTestServer(t, backend, p2pConfig)
	defer stopServer()

	host, port, err := net.SplitHostPort(serverAddr)
	require.NoError(t, err)
	p2pConfig.StreamListenMultiaddr = fmt.Sprintf("/ip4/%s/tcp/%s", host, port)

	proxy, err := rpc.NewProxyBlossomSub(p2pConfig, engineConfig, zap.NewNop(), 1)
	require.NoError(t, err)
	defer proxy.Close()

	bitmask := []byte("validator-test")
	results := make(chan p2ptypes.ValidationResult, 1)

	// Accept only messages whose payload is exactly "valid"; each verdict is
	// also sent on the results channel.
	validator := func(peerID peer.ID, message *pb.Message) p2ptypes.ValidationResult {
		verdict := p2ptypes.ValidationResultReject
		if string(message.Data) == "valid" {
			verdict = p2ptypes.ValidationResultAccept
		}
		results <- verdict
		return verdict
	}

	err = proxy.RegisterValidator(bitmask, validator, false)
	assert.NoError(t, err)

	// Give some time for validator to be registered.
	time.Sleep(100 * time.Millisecond)

	err = proxy.UnregisterValidator(bitmask)
	assert.NoError(t, err)
}
// TestTLSConnection builds server TLS credentials from a generated Ed448 key,
// serves the proxy over them, and verifies that a proxy client created from
// the same config can publish through the TLS connection.
func TestTLSConnection(t *testing.T) {
	p2pConfig, engineConfig, err := createTestConfigs()
	require.NoError(t, err)

	// Test TLS credential creation with valid Ed448 key.
	tlsCreds, err := p2p.NewPeerAuthenticator(
		zap.NewNop(),
		p2pConfig,
		nil,
		nil,
		nil,
		nil,
		nil,
		map[string]channel.AllowedPeerPolicyType{
			"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
		},
		nil,
	).CreateServerTLSCredentials()
	require.NoError(t, err, "should be able to create TLS credentials with valid Ed448 key")
	assert.NotNil(t, tlsCreds, "TLS credentials should not be nil")

	mockPubSub := newMockPubSub()

	// Bind an ephemeral port and keep the listener open for the server.
	// Closing and re-listening on the same address would let another process
	// take the port in between.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	defer listener.Close()
	addr := listener.Addr().String()

	// Create gRPC server with TLS credentials.
	server := grpc.NewServer(grpc.Creds(tlsCreds))
	proxyServer := rpc.NewPubSubProxyServer(mockPubSub, zap.NewNop())
	protobufs.RegisterPubSubProxyServer(server, proxyServer)

	go func() {
		if err := server.Serve(listener); err != nil {
			t.Logf("TLS Server error: %v", err)
		}
	}()
	defer server.Stop()

	// Wait for server to start.
	time.Sleep(100 * time.Millisecond)

	// Update p2p config to use the TLS server address.
	host, port, err := net.SplitHostPort(addr)
	require.NoError(t, err)
	p2pConfig.StreamListenMultiaddr = fmt.Sprintf("/ip4/%s/tcp/%s", host, port)

	// Create proxy client that should connect with TLS.
	proxy, err := rpc.NewProxyBlossomSub(p2pConfig, engineConfig, zap.NewNop(), 1)
	require.NoError(t, err, "should be able to create proxy with TLS connection")
	defer proxy.Close()

	// Test that we can actually use the TLS connection.
	testBitmask := []byte("tls-test")
	testData := []byte("tls message")
	err = proxy.PublishToBitmask(testBitmask, testData)
	assert.NoError(t, err, "should be able to publish over TLS connection")

	// Verify the message was received by the server.
	mockPubSub.mu.RLock()
	publishedData := mockPubSub.publishedData[string(testBitmask)]
	mockPubSub.mu.RUnlock()
	assert.Equal(t, testData, publishedData, "message should have been transmitted over TLS")
}
// TestTLSXSignConnection serves the proxy over TLS and connects with a
// manually built gRPC client whose TLS credentials are pinned to the server's
// peer ID, then verifies that a publish reaches the mock backend.
//
// NOTE(review): both the server and the client credentials are built from
// serverP2PConfig. A separate client config with its own key used to be
// generated here but was never passed to anything, so it has been dropped —
// confirm whether a distinct client identity was intended.
func TestTLSXSignConnection(t *testing.T) {
	serverP2PConfig, _, err := createTestConfigs()
	require.NoError(t, err)

	serverTLSCreds, err := p2p.NewPeerAuthenticator(
		zap.NewNop(),
		serverP2PConfig,
		nil,
		nil,
		nil,
		nil,
		nil,
		map[string]channel.AllowedPeerPolicyType{
			"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
		},
		nil,
	).CreateServerTLSCredentials()
	require.NoError(t, err, "should be able to create TLS credentials with valid Ed448 key")
	assert.NotNil(t, serverTLSCreds, "TLS credentials should not be nil")

	mockPubSub := newMockPubSub()

	// Bind an ephemeral port and keep the listener open for the server.
	// Closing and re-listening on the same address would let another process
	// take the port in between.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	defer listener.Close()
	addr := listener.Addr().String()

	// Create gRPC server with TLS credentials.
	server := grpc.NewServer(grpc.Creds(serverTLSCreds))
	proxyServer := rpc.NewPubSubProxyServer(mockPubSub, zap.NewNop())
	protobufs.RegisterPubSubProxyServer(server, proxyServer)

	go func() {
		if err := server.Serve(listener); err != nil {
			t.Logf("TLS Server error: %v", err)
		}
	}()
	defer server.Stop()

	// Wait for server to start.
	time.Sleep(100 * time.Millisecond)

	// Update p2p config to use the TLS server address.
	host, port, err := net.SplitHostPort(addr)
	require.NoError(t, err)
	serverP2PConfig.StreamListenMultiaddr = fmt.Sprintf("/ip4/%s/tcp/%s", host, port)

	// Derive the server's peer ID from its configured key. The bytes after
	// offset 57 are handed to the public-key parser; presumably the raw key
	// is a 57-byte private part followed by the public key — confirm.
	pkb, err := hex.DecodeString(serverP2PConfig.PeerPrivKey)
	require.NoError(t, err)
	require.True(t, len(pkb) > 57, "raw key too short to contain a public key: %d bytes", len(pkb))
	pk, err := crypto.UnmarshalEd448PublicKey(pkb[57:])
	require.NoError(t, err)
	peerid, err := peer.IDFromPublicKey(pk)
	require.NoError(t, err)

	// Build client TLS credentials bound to that peer ID.
	tlsCreds, err := p2p.NewPeerAuthenticator(
		zap.NewNop(),
		serverP2PConfig,
		nil,
		nil,
		nil,
		nil,
		nil,
		map[string]channel.AllowedPeerPolicyType{
			"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
		},
		nil,
	).CreateClientTLSCredentials([]byte(peerid))
	require.NoError(t, err)

	// Create gRPC connection with TLS. require (not assert) so that a failed
	// dial does not reach the deferred Close with a nil connection.
	conn, err := grpc.Dial(
		fmt.Sprintf("%s:%s", host, port),
		grpc.WithTransportCredentials(tlsCreds),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(100*1024*1024)),
	)
	require.NoError(t, err)
	defer conn.Close()

	// Create the proxy client.
	client := rpc.NewPubSubProxyClient(context.TODO(), conn, zap.NewNop())

	// Test that we can actually use the TLS connection.
	testBitmask := []byte("tls-test")
	testData := []byte("tls message")
	err = client.PublishToBitmask(testBitmask, testData)
	assert.NoError(t, err, "should be able to publish over TLS connection")

	// Verify the message was received by the server.
	mockPubSub.mu.RLock()
	publishedData := mockPubSub.publishedData[string(testBitmask)]
	mockPubSub.mu.RUnlock()
	assert.Equal(t, testData, publishedData, "message should have been transmitted over TLS")
}
// TestTLSKeyMismatch starts a TLS server with one key and connects a proxy
// client configured with a different key. Either proxy creation or the first
// publish must fail with an authentication handshake error.
func TestTLSKeyMismatch(t *testing.T) {
	// Create server with one set of keys.
	serverP2PConfig, _, err := createTestConfigs()
	require.NoError(t, err)

	// Create client with different keys.
	clientP2PConfig, clientEngineConfig, err := createTestConfigs()
	require.NoError(t, err)

	// Make sure client has different keys than server by regenerating.
	privKey, _, err := crypto.GenerateEd448Key(rand.Reader)
	require.NoError(t, err)
	privKeyBytes, err := privKey.Raw()
	require.NoError(t, err)
	clientP2PConfig.PeerPrivKey = hex.EncodeToString(privKeyBytes)

	serverTLSCreds, err := p2p.NewPeerAuthenticator(zap.NewNop(), serverP2PConfig, nil, nil, nil, nil, nil,
		map[string]channel.AllowedPeerPolicyType{
			"quilibrium.node.proxy.pb.PubSubProxy": channel.OnlySelfPeer,
		}, nil).CreateServerTLSCredentials()
	require.NoError(t, err)

	// Set up TLS server with server keys.
	mockPubSub := newMockPubSub()

	// Bind an ephemeral port and keep the listener open for the server.
	// Closing and re-listening on the same address would let another process
	// take the port in between.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	defer listener.Close()
	addr := listener.Addr().String()

	// Create server with server TLS credentials.
	server := grpc.NewServer(grpc.Creds(serverTLSCreds))
	proxyServer := rpc.NewPubSubProxyServer(mockPubSub, zap.NewNop())
	protobufs.RegisterPubSubProxyServer(server, proxyServer)

	go func() {
		if err := server.Serve(listener); err != nil {
			t.Logf("TLS Server error: %v", err)
		}
	}()
	defer server.Stop()

	time.Sleep(100 * time.Millisecond)

	// Update client config to connect to server.
	host, port, err := net.SplitHostPort(addr)
	require.NoError(t, err)
	clientP2PConfig.StreamListenMultiaddr = fmt.Sprintf("/ip4/%s/tcp/%s", host, port)

	// Try to create proxy client with different keys. The failure may occur
	// at creation or on first use.
	failureMsg := "should fail to create proxy with mismatched TLS keys"
	proxy, err := rpc.NewProxyBlossomSub(clientP2PConfig, clientEngineConfig, zap.NewNop(), 1)
	if err == nil {
		// Creation succeeded, so the operation itself must fail.
		defer proxy.Close()
		err = proxy.PublishToBitmask([]byte("test"), []byte("data"))
		failureMsg = "operations should fail with TLS key mismatch"
	}

	// require.Error stops the test when no error occurred, so err.Error()
	// below cannot dereference nil.
	require.Error(t, err, failureMsg)
	assert.Contains(t, err.Error(), "authentication handshake failed", "error should be authentication-related")
}
// TestTLSBidirectionalVerification is intended to cover TLS verification in
// both directions (client verifies server certificate, server verifies client
// certificate). Server and client are built from the same key config, and a
// publish through the proxy must reach the mock backend.
func TestTLSBidirectionalVerification(t *testing.T) {
	// One key config is shared by the server and the client.
	sharedP2PConfig, engineConfig, err := createTestConfigs()
	require.NoError(t, err)

	backend := newMockPubSub()

	serverAddr, stopServer := setupTestServer(t, backend, sharedP2PConfig)
	defer stopServer()

	// Point the shared config at the running server for the client side.
	host, port, err := net.SplitHostPort(serverAddr)
	require.NoError(t, err)
	sharedP2PConfig.StreamListenMultiaddr = fmt.Sprintf("/ip4/%s/tcp/%s", host, port)

	// Matching keys: creation is expected to succeed.
	proxy, err := rpc.NewProxyBlossomSub(sharedP2PConfig, engineConfig, zap.NewNop(), 1)
	require.NoError(t, err, "should successfully connect with matching keys")
	defer proxy.Close()

	// The connection must be usable for a real operation.
	bitmask := []byte("bidirectional-test")
	payload := []byte("bidirectional message")
	publishErr := proxy.PublishToBitmask(bitmask, payload)
	assert.NoError(t, publishErr, "should be able to publish over bidirectionally verified TLS")

	// Read back what the backend recorded for this bitmask.
	backend.mu.RLock()
	recorded := backend.publishedData[string(bitmask)]
	backend.mu.RUnlock()
	assert.Equal(t, payload, recorded, "message should have been transmitted successfully")
}
// TestFullProxyIntegration runs three proxy clients against one mock backend.
// Every client subscribes to the same bitmask, each client publishes one
// message, and every client is expected to receive all three messages.
func TestFullProxyIntegration(t *testing.T) {
	t.Log("Starting full proxy integration test")

	p2pConfig, engineConfig, err := createTestConfigs()
	require.NoError(t, err)

	backend := newMockPubSub()
	serverAddr, stopServer := setupTestServer(t, backend, p2pConfig)
	defer stopServer()

	host, port, err := net.SplitHostPort(serverAddr)
	require.NoError(t, err)
	p2pConfig.StreamListenMultiaddr = fmt.Sprintf("/ip4/%s/tcp/%s", host, port)

	// Create multiple proxy clients to simulate workers (core IDs 1..3).
	// The deferred Close calls run when the test function returns.
	const workerCount = 3
	workers := make([]*rpc.ProxyBlossomSub, 0, workerCount)
	for coreID := uint(1); coreID <= workerCount; coreID++ {
		worker, err := rpc.NewProxyBlossomSub(p2pConfig, engineConfig, zap.NewNop(), coreID)
		require.NoError(t, err)
		workers = append(workers, worker)
		defer worker.Close()
	}

	topic := []byte("integration-test")

	// received[i] counts deliveries to worker i; pending tracks the
	// deliveries still outstanding.
	var (
		receivedMu sync.Mutex
		pending    sync.WaitGroup
	)
	received := make([]int, len(workers))

	for idx := range workers {
		idx := idx // per-iteration copy for the closure
		err := workers[idx].Subscribe(topic, func(message *pb.Message) error {
			receivedMu.Lock()
			received[idx]++
			seen := received[idx]
			receivedMu.Unlock()
			t.Logf("Proxy %d received message %d: %s", idx, seen, string(message.Data))
			pending.Done()
			return nil
		})
		require.NoError(t, err)
	}

	// Give subscriptions time to be established.
	time.Sleep(200 * time.Millisecond)

	// Message i is published by worker i; every worker should receive every
	// message, hence workers x messages expected deliveries.
	payloads := []string{"msg1", "msg2", "msg3"}
	pending.Add(len(workers) * len(payloads))
	for i := range payloads {
		t.Logf("Publishing message %d: %s from proxy %d", i, payloads[i], i)
		publishErr := workers[i].PublishToBitmask(topic, []byte(payloads[i]))
		require.NoError(t, publishErr)
		// Add small delay between messages.
		time.Sleep(50 * time.Millisecond)
	}

	// Wait for all deliveries, bounded by a timeout.
	allDelivered := make(chan struct{})
	go func() {
		defer close(allDelivered)
		pending.Wait()
	}()

	select {
	case <-allDelivered:
		t.Log("All messages received successfully")
	case <-time.After(5 * time.Second):
		t.Fatal("timeout waiting for messages")
	}

	// Verify each proxy received all messages (correct pubsub behavior).
	for i := range received {
		got := received[i]
		t.Logf("Proxy %d received %d messages", i, got)
		assert.Equal(t, len(payloads), got,
			"proxy %d should have received %d messages, got %d", i, len(payloads), got)
	}
}