ceremonyclient/node/consensus/events/distributor_test.go
Cassandra Heart c797d482f9
v2.1.0.5 (#457)
* wip: conversion of hotstuff from flow into Q-oriented model

* bulk of tests

* remaining non-integration tests

* add integration test, adjust log interface, small tweaks

* further adjustments, restore full pacemaker shape

* add component lifecycle management+supervisor

* further refinements

* resolve timeout hanging

* mostly finalized state for consensus

* bulk of engine swap out

* lifecycle-ify most types

* wiring nearly complete, missing needed hooks for proposals

* plugged in, vetting message validation paths

* global consensus, plugged in and verified

* app shard now wired in too

* do not decode empty keys.yml (#456)

* remove obsolete engine.maxFrames config parameter (#454)

* default to Info log level unless debug is enabled (#453)

* respect config's  "logging" section params, remove obsolete single-file logging (#452)

* Trivial code cleanup aiming to reduce Go compiler warnings (#451)

* simplify range traversal

* simplify channel read for single select case

* delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24

* simplify range traversal

* simplify channel read for single select case

* remove redundant type from array

* simplify range traversal

* simplify channel read for single select case

* RC slate

* finalize 2.1.0.5

* Update comments in StrictMonotonicCounter

Fix comment formatting and clarify description.

---------

Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-11-11 05:00:17 -06:00

678 lines
18 KiB
Go

package events
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
)
// Helper function to create a test global event
func createTestGlobalEvent(eventType consensustime.TimeReelEventType, frameNumber uint64) consensustime.GlobalEvent {
frameHeader := &protobufs.GlobalFrameHeader{
FrameNumber: frameNumber,
Timestamp: time.Now().UnixMilli(),
Output: []byte("test-output"),
}
frame := &protobufs.GlobalFrame{
Header: frameHeader,
}
event := consensustime.GlobalEvent{
Type: eventType,
Frame: frame,
Message: "test message",
}
if eventType == consensustime.TimeReelEventForkDetected {
event.OldHead = &protobufs.GlobalFrame{
Header: &protobufs.GlobalFrameHeader{
FrameNumber: frameNumber - 1,
Timestamp: time.Now().UnixMilli() - 10000,
Output: []byte("old-output"),
},
}
}
return event
}
// Helper function to create a test app event
func createTestAppEvent(eventType consensustime.TimeReelEventType, frameNumber uint64) consensustime.AppEvent {
frame := &protobufs.AppShardFrame{
Header: &protobufs.FrameHeader{
FrameNumber: frameNumber,
Timestamp: time.Now().UnixMilli(),
Prover: []byte("test-prover"),
Output: []byte("test-output"),
Address: []byte("test-address"),
},
}
event := consensustime.AppEvent{
Type: eventType,
Frame: frame,
Message: "test message",
}
if eventType == consensustime.TimeReelEventForkDetected {
event.OldHead = &protobufs.AppShardFrame{
Header: &protobufs.FrameHeader{
FrameNumber: frameNumber - 1,
Timestamp: time.Now().UnixMilli() - 10000,
Prover: []byte("test-prover"),
Output: []byte("old-output"),
Address: []byte("test-address"),
},
}
}
return event
}
func TestGlobalEventDistributor_StartStop(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
distributor := NewGlobalEventDistributor(globalEventCh)
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
// Test starting
go distributor.Start(ctx, func() {})
// Test stopping
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
}
func TestGlobalEventDistributor_Subscribe(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
distributor := NewGlobalEventDistributor(globalEventCh)
// Subscribe multiple subscribers
sub1Ch := distributor.Subscribe("subscriber1")
sub2Ch := distributor.Subscribe("subscriber2")
sub3Ch := distributor.Subscribe("subscriber3")
assert.NotNil(t, sub1Ch)
assert.NotNil(t, sub2Ch)
assert.NotNil(t, sub3Ch)
// Start the distributor
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Send a test event
testEvent := createTestGlobalEvent(consensustime.TimeReelEventNewHead, 100)
globalEventCh <- testEvent
// All subscribers should receive the event
timeout := time.After(1 * time.Second)
for i := 0; i < 3; i++ {
select {
case event1 := <-sub1Ch:
assert.Equal(t, consensus.ControlEventGlobalNewHead, event1.Type)
eventData := event1.Data.(*consensustime.GlobalEvent)
assert.Equal(t, testEvent, *eventData)
case event2 := <-sub2Ch:
assert.Equal(t, consensus.ControlEventGlobalNewHead, event2.Type)
eventData := event2.Data.(*consensustime.GlobalEvent)
assert.Equal(t, testEvent, *eventData)
case event3 := <-sub3Ch:
assert.Equal(t, consensus.ControlEventGlobalNewHead, event3.Type)
eventData := event3.Data.(*consensustime.GlobalEvent)
assert.Equal(t, testEvent, *eventData)
case <-timeout:
t.Fatal("Timeout waiting for events")
}
}
// Stop the distributor
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
}
func TestGlobalEventDistributor_Unsubscribe(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
distributor := NewGlobalEventDistributor(globalEventCh)
// Subscribe
sub1Ch := distributor.Subscribe("subscriber1")
sub2Ch := distributor.Subscribe("subscriber2")
// Start the distributor
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Unsubscribe subscriber1
distributor.Unsubscribe("subscriber1")
// Give a moment for the unsubscribe to take effect
time.Sleep(10 * time.Millisecond)
// Send a test event
testEvent := createTestGlobalEvent(consensustime.TimeReelEventNewHead, 100)
globalEventCh <- testEvent
// Only subscriber2 should receive the event
timeout := time.After(100 * time.Millisecond)
select {
case event := <-sub2Ch:
assert.Equal(t, consensus.ControlEventGlobalNewHead, event.Type)
case <-timeout:
t.Fatal("Timeout waiting for event on subscriber2")
}
// Verify sub1Ch doesn't receive the event
select {
case _, ok := <-sub1Ch:
if ok {
t.Fatal("Unsubscribed channel should not receive events")
}
case <-time.After(50 * time.Millisecond):
// Expected - no event received
}
// Verify sub1Ch is closed
_, ok := <-sub1Ch
assert.False(t, ok, "Unsubscribed channel should be closed")
// Stop the distributor
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
}
func TestGlobalEventDistributor_EventTypes(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
distributor := NewGlobalEventDistributor(globalEventCh)
// Subscribe
subCh := distributor.Subscribe("test-subscriber")
// Start the distributor
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Test NewHead event
newHeadEvent := createTestGlobalEvent(consensustime.TimeReelEventNewHead, 100)
globalEventCh <- newHeadEvent
event := <-subCh
assert.Equal(t, consensus.ControlEventGlobalNewHead, event.Type)
eventData := event.Data.(*consensustime.GlobalEvent)
assert.Equal(t, newHeadEvent, *eventData)
// Test Fork event
forkEvent := createTestGlobalEvent(consensustime.TimeReelEventForkDetected, 101)
globalEventCh <- forkEvent
event = <-subCh
assert.Equal(t, consensus.ControlEventGlobalFork, event.Type)
eventData = event.Data.(*consensustime.GlobalEvent)
assert.Equal(t, forkEvent, *eventData)
// Test Equivocation event
equivocationEvent := createTestGlobalEvent(consensustime.TimeReelEventEquivocationDetected, 102)
globalEventCh <- equivocationEvent
event = <-subCh
assert.Equal(t, consensus.ControlEventGlobalEquivocation, event.Type)
eventData = event.Data.(*consensustime.GlobalEvent)
assert.Equal(t, equivocationEvent, *eventData)
// Stop the distributor
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
}
func TestGlobalEventDistributor_ContextCancellation(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
distributor := NewGlobalEventDistributor(globalEventCh)
// Create a cancellable context
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
// Start the distributor
go distributor.Start(ctx, func() {})
// Subscribe
subCh := distributor.Subscribe("test-subscriber")
// Cancel the context
cancel()
// Give some time for the goroutine to exit
time.Sleep(100 * time.Millisecond)
// Stop should work gracefully
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
// Channel should be closed
_, ok := <-subCh
assert.False(t, ok)
close(globalEventCh)
}
func TestAppEventDistributor_StartStop(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
appEventCh := make(chan consensustime.AppEvent, 10)
distributor := NewAppEventDistributor(globalEventCh, appEventCh)
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
// Test starting
go distributor.Start(ctx, func() {})
// Test stopping
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
close(appEventCh)
}
func TestAppEventDistributor_GlobalAndAppEvents(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
appEventCh := make(chan consensustime.AppEvent, 10)
distributor := NewAppEventDistributor(globalEventCh, appEventCh)
// Subscribe
subCh := distributor.Subscribe("test-subscriber")
// Start the distributor
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Test Global event
globalEvent := createTestGlobalEvent(consensustime.TimeReelEventNewHead, 100)
globalEventCh <- globalEvent
event := <-subCh
assert.Equal(t, consensus.ControlEventGlobalNewHead, event.Type)
globalEventData := event.Data.(*consensustime.GlobalEvent)
assert.Equal(t, globalEvent, *globalEventData)
// Test App event
appEvent := createTestAppEvent(consensustime.TimeReelEventNewHead, 200)
appEventCh <- appEvent
event = <-subCh
assert.Equal(t, consensus.ControlEventAppNewHead, event.Type)
appEventData := event.Data.(*consensustime.AppEvent)
assert.Equal(t, appEvent, *appEventData)
// Stop the distributor
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
close(appEventCh)
}
func TestAppEventDistributor_AllEventTypes(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
appEventCh := make(chan consensustime.AppEvent, 10)
distributor := NewAppEventDistributor(globalEventCh, appEventCh)
// Subscribe
subCh := distributor.Subscribe("test-subscriber")
// Start the distributor
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Test all global event types
globalNewHead := createTestGlobalEvent(consensustime.TimeReelEventNewHead, 100)
globalEventCh <- globalNewHead
event := <-subCh
assert.Equal(t, consensus.ControlEventGlobalNewHead, event.Type)
globalFork := createTestGlobalEvent(consensustime.TimeReelEventForkDetected, 101)
globalEventCh <- globalFork
event = <-subCh
assert.Equal(t, consensus.ControlEventGlobalFork, event.Type)
globalEquivocation := createTestGlobalEvent(consensustime.TimeReelEventEquivocationDetected, 102)
globalEventCh <- globalEquivocation
event = <-subCh
assert.Equal(t, consensus.ControlEventGlobalEquivocation, event.Type)
// Test all app event types
appNewHead := createTestAppEvent(consensustime.TimeReelEventNewHead, 200)
appEventCh <- appNewHead
event = <-subCh
assert.Equal(t, consensus.ControlEventAppNewHead, event.Type)
appFork := createTestAppEvent(consensustime.TimeReelEventForkDetected, 201)
appEventCh <- appFork
event = <-subCh
assert.Equal(t, consensus.ControlEventAppFork, event.Type)
appEquivocation := createTestAppEvent(consensustime.TimeReelEventEquivocationDetected, 202)
appEventCh <- appEquivocation
event = <-subCh
assert.Equal(t, consensus.ControlEventAppEquivocation, event.Type)
// Stop the distributor
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
close(appEventCh)
}
func TestAppEventDistributor_MultipleSubscribers(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
appEventCh := make(chan consensustime.AppEvent, 10)
distributor := NewAppEventDistributor(globalEventCh, appEventCh)
// Subscribe multiple subscribers
sub1Ch := distributor.Subscribe("subscriber1")
sub2Ch := distributor.Subscribe("subscriber2")
// Start the distributor
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Send events
globalEvent := createTestGlobalEvent(consensustime.TimeReelEventNewHead, 100)
appEvent := createTestAppEvent(consensustime.TimeReelEventNewHead, 200)
globalEventCh <- globalEvent
appEventCh <- appEvent
// Both subscribers should receive both events
receivedGlobal := 0
receivedApp := 0
timeout := time.After(1 * time.Second)
for receivedGlobal < 2 || receivedApp < 2 {
select {
case event := <-sub1Ch:
if event.Type == consensus.ControlEventGlobalNewHead {
receivedGlobal++
} else if event.Type == consensus.ControlEventAppNewHead {
receivedApp++
}
case event := <-sub2Ch:
if event.Type == consensus.ControlEventGlobalNewHead {
receivedGlobal++
} else if event.Type == consensus.ControlEventAppNewHead {
receivedApp++
}
case <-timeout:
t.Fatal("Timeout waiting for events")
}
}
assert.Equal(t, 2, receivedGlobal)
assert.Equal(t, 2, receivedApp)
// Stop the distributor
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
close(globalEventCh)
close(appEventCh)
}
func TestAppEventDistributor_ChannelClosure(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
appEventCh := make(chan consensustime.AppEvent, 10)
distributor := NewAppEventDistributor(globalEventCh, appEventCh)
// Subscribe
subCh := distributor.Subscribe("test-subscriber")
// Start the distributor
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Close the input channels
close(globalEventCh)
// Give some time for the goroutine to exit
time.Sleep(100 * time.Millisecond)
// Stop should work gracefully
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
// Subscriber channel should be closed
_, ok := <-subCh
assert.False(t, ok)
close(appEventCh)
}
func TestControlEventDataTypes(t *testing.T) {
// Test that all event data types implement the interface
var _ consensus.ControlEventData = StartEventData{}
var _ consensus.ControlEventData = StopEventData{}
var _ consensus.ControlEventData = HaltEventData{}
var _ consensus.ControlEventData = ResumeEventData{}
var _ consensus.ControlEventData = &consensustime.GlobalEvent{}
var _ consensus.ControlEventData = &consensustime.AppEvent{}
}
func TestConcurrentSubscribeUnsubscribe(t *testing.T) {
globalEventCh := make(chan consensustime.GlobalEvent, 10)
distributor := NewGlobalEventDistributor(globalEventCh)
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
// Concurrently subscribe and unsubscribe
done := make(chan bool)
// Subscriber goroutines
for i := 0; i < 10; i++ {
go func(id int) {
subID := fmt.Sprintf("subscriber%d", id)
ch := distributor.Subscribe(subID)
// Wait for an event
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
}
distributor.Unsubscribe(subID)
done <- true
}(i)
}
// Send events while subscribing/unsubscribing
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
event := createTestGlobalEvent(consensustime.TimeReelEventNewHead, uint64(i))
globalEventCh <- event
time.Sleep(10 * time.Millisecond)
}
}()
// Wait for all goroutines to complete
for i := 0; i < 10; i++ {
<-done
}
// Stop the distributor
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(t, err)
}
wg.Wait()
close(globalEventCh)
}
// Benchmark tests
func BenchmarkGlobalEventDistributor_Broadcast(b *testing.B) {
globalEventCh := make(chan consensustime.GlobalEvent, 1000)
distributor := NewGlobalEventDistributor(globalEventCh)
// Subscribe 100 subscribers with consumers
var wg sync.WaitGroup
done := make(chan struct{})
for i := 0; i < 100; i++ {
ch := distributor.Subscribe(fmt.Sprintf("subscriber%d", i))
wg.Add(1)
go func(subCh <-chan consensus.ControlEvent) {
defer wg.Done()
for {
select {
case <-subCh:
case <-done:
return
}
}
}(ch)
}
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
b.ResetTimer()
// Send b.N events
for i := 0; i < b.N; i++ {
event := createTestGlobalEvent(consensustime.TimeReelEventNewHead, uint64(i))
globalEventCh <- event
}
// Wait a bit for events to be processed
time.Sleep(100 * time.Millisecond)
b.StopTimer()
// Signal consumers to stop
close(done)
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(b, err)
}
close(globalEventCh)
// Wait for all consumers to finish
wg.Wait()
}
func BenchmarkAppEventDistributor_MixedEvents(b *testing.B) {
globalEventCh := make(chan consensustime.GlobalEvent, 1000)
appEventCh := make(chan consensustime.AppEvent, 1000)
distributor := NewAppEventDistributor(globalEventCh, appEventCh)
// Subscribe 100 subscribers with consumers
var wg sync.WaitGroup
done := make(chan struct{})
for i := 0; i < 100; i++ {
ch := distributor.Subscribe(fmt.Sprintf("subscriber%d", i))
wg.Add(1)
go func(subCh <-chan consensus.ControlEvent) {
defer wg.Done()
for {
select {
case <-subCh:
case <-done:
return
}
}
}(ch)
}
ctx, cancel, errCh := lifecycle.WithSignallerAndCancel(context.Background())
go distributor.Start(ctx, func() {})
b.ResetTimer()
// Send b.N/2 events of each type
for i := 0; i < b.N/2; i++ {
globalEvent := createTestGlobalEvent(consensustime.TimeReelEventNewHead, uint64(i))
appEvent := createTestAppEvent(consensustime.TimeReelEventNewHead, uint64(i))
globalEventCh <- globalEvent
appEventCh <- appEvent
}
// Wait a bit for events to be processed
time.Sleep(100 * time.Millisecond)
b.StopTimer()
// Signal consumers to stop
close(done)
cancel()
select {
case <-ctx.Done():
case err, _ := <-errCh:
require.NoError(b, err)
}
close(globalEventCh)
close(appEventCh)
// Wait for all consumers to finish
wg.Wait()
}