ceremonyclient/consensus/integration/liveness_test.go

package integration
import (
"encoding/hex"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/helper"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker/timeout"
"source.quilibrium.com/quilibrium/monorepo/lifecycle/unittest"
)
// pacemaker timeout
// if your laptop is fast enough, 10 ms is enough
const pmTimeout = 100 * time.Millisecond
// maxTimeoutRebroadcast specifies how often the PaceMaker rebroadcasts
// its timeout state in case there is no progress. We keep the value
// small to reduce latency
const maxTimeoutRebroadcast = 1 * time.Second
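// The calls to timeout.NewConfig below combine these constants with a
// multiplier of 1.5 and the package-level happyPathMaxRoundFailures. As an
// illustrative sketch only (an assumption about the usual HotStuff-style
// backoff, not the timeout package's actual implementation), the replica
// timeout typically stays at its base value while at most happyPathFailures
// consecutive rounds fail, and grows by the multiplier for each additional
// failure:
func expectedReplicaTimeout(base time.Duration, multiplier float64, happyPathFailures, observedFailures uint64) time.Duration {
	// hypothetical helper, not used by the tests in this file
	d := float64(base)
	for i := happyPathFailures; i < observedFailures; i++ {
		d *= multiplier
	}
	return time.Duration(d)
}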
// If 2 nodes are down in a 7-node cluster, the remaining 5 nodes can
// still make progress and reach consensus
func Test2TimeoutOutof7Instances(t *testing.T) {
healthyReplicas := 5
notVotingReplicas := 2
finalRank := uint64(30)
// generate the seven hotstuff participants
participants := helper.WithWeightedIdentityList(healthyReplicas + notVotingReplicas)
instances := make([]*Instance, 0, healthyReplicas+notVotingReplicas)
root := DefaultRoot()
timeouts, err := timeout.NewConfig(pmTimeout, pmTimeout, 1.5, happyPathMaxRoundFailures, maxTimeoutRebroadcast)
require.NoError(t, err)
// set up five instances that work fully
for n := 0; n < healthyReplicas; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithTimeouts(timeouts),
WithBufferLogger(),
WithLocalID(participants[n].Identity()),
WithLoggerParams(consensus.StringParam("status", "healthy")),
WithStopCondition(RankFinalized(finalRank)),
)
instances = append(instances, in)
}
// set up two instances which can neither vote nor propose
for n := healthyReplicas; n < healthyReplicas+notVotingReplicas; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithTimeouts(timeouts),
WithBufferLogger(),
WithLocalID(participants[n].Identity()),
WithLoggerParams(consensus.StringParam("status", "unhealthy")),
WithStopCondition(RankFinalized(finalRank)),
WithOutgoingVotes(DropAllVotes),
WithOutgoingProposals(DropAllProposals),
)
instances = append(instances, in)
}
// connect the communicators of the instances together
Connect(t, instances)
// start all seven instances and wait for them to wrap up
var wg sync.WaitGroup
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
}
unittest.AssertReturnsBefore(t, wg.Wait, 20*time.Second, "expect to finish before timeout")
for i, in := range instances {
fmt.Println("=============================================================================")
fmt.Println("INSTANCE", i, "-", hex.EncodeToString([]byte(in.localID)))
fmt.Println("=============================================================================")
in.logger.(*helper.BufferLog).Flush()
}
// check that all instances have the same finalized state
ref := instances[0]
assert.Equal(t, finalRank, ref.forks.FinalizedState().Rank, "expected instance 0 to make enough progress, but it didn't")
finalizedRanks := FinalizedRanks(ref)
for i := 1; i < healthyReplicas; i++ {
assert.Equal(t, ref.forks.FinalizedState(), instances[i].forks.FinalizedState(), "instance %d should have same finalized state as first instance", i)
assert.Equal(t, finalizedRanks, FinalizedRanks(instances[i]), "instance %d should have same finalized rank as first instance", i)
}
}
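// For reference, the vote filters installed above via WithOutgoingVotes share
// the shape of the inline filter in TestAsyncClusterStartup below: a predicate
// over *helper.TestVote that returns true for votes that should be dropped.
// A hypothetical filter (illustrative only, not used by these tests) that
// drops votes for even ranks would look like this:
func dropEvenRankVotes(vote *helper.TestVote) bool {
	// true means "drop this vote", mirroring how DropAllVotes is used above
	return vote.Rank%2 == 0
}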
// 2 nodes in a 4-node cluster are configured so that they can only send timeout messages (no voting or proposing).
// The other 2 unconstrained nodes should be able to make progress through the recovery path by creating TCs
// for every round, but no state will be finalized, because finalization requires a direct 1-chain and a QC.
func Test2TimeoutOutof4Instances(t *testing.T) {
healthyReplicas := 2
replicasDroppingHappyPathMsgs := 2
finalRank := uint64(30)
// generate the 4 hotstuff participants
participants := helper.WithWeightedIdentityList(healthyReplicas + replicasDroppingHappyPathMsgs)
instances := make([]*Instance, 0, healthyReplicas+replicasDroppingHappyPathMsgs)
root := DefaultRoot()
timeouts, err := timeout.NewConfig(10*time.Millisecond, 50*time.Millisecond, 1.5, happyPathMaxRoundFailures, maxTimeoutRebroadcast)
require.NoError(t, err)
// set up two instances that work fully
for n := 0; n < healthyReplicas; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithLoggerParams(consensus.StringParam("status", "healthy")),
WithStopCondition(RankReached(finalRank)),
)
instances = append(instances, in)
}
// set up instances which can neither vote nor propose
for n := healthyReplicas; n < healthyReplicas+replicasDroppingHappyPathMsgs; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithLoggerParams(consensus.StringParam("status", "unhealthy")),
WithStopCondition(RankReached(finalRank)),
WithOutgoingVotes(DropAllVotes),
WithIncomingVotes(DropAllVotes),
WithOutgoingProposals(DropAllProposals),
)
instances = append(instances, in)
}
// connect the communicators of the instances together
Connect(t, instances)
// start the instances and wait for them to finish
var wg sync.WaitGroup
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run(t)
require.True(t, errors.Is(err, errStopCondition), "should run until stop condition")
wg.Done()
}(in)
}
unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second, "expect to finish before timeout")
// check that all instances have the same finalized state
ref := instances[0]
finalizedRanks := FinalizedRanks(ref)
assert.Equal(t, []uint64{0}, finalizedRanks, "no rank was finalized, because finalization requires a direct 1-chain plus a QC, which never happens in this case")
assert.Equal(t, finalRank, ref.pacemaker.CurrentRank(), "expected instance 0 to make enough progress, but it didn't")
for i := 1; i < healthyReplicas; i++ {
assert.Equal(t, ref.forks.FinalizedState(), instances[i].forks.FinalizedState(), "instance %d should have same finalized state as first instance", i)
assert.Equal(t, finalizedRanks, FinalizedRanks(instances[i]), "instance %d should have same finalized rank as first instance", i)
assert.Equal(t, finalRank, instances[i].pacemaker.CurrentRank(), "instance %d should have same active rank as first instance", i)
}
}
// If 1 node is down in a 5-node cluster, the remaining 4 nodes can
// still make progress and reach consensus
func Test1TimeoutOutof5Instances(t *testing.T) {
healthyReplicas := 4
downReplicas := 1
finalRank := uint64(30)
// generate the five hotstuff participants
participants := helper.WithWeightedIdentityList(healthyReplicas + downReplicas)
instances := make([]*Instance, 0, healthyReplicas+downReplicas)
root := DefaultRoot()
timeouts, err := timeout.NewConfig(pmTimeout, pmTimeout, 1.5, happyPathMaxRoundFailures, maxTimeoutRebroadcast)
require.NoError(t, err)
// set up instances that work fully
for n := 0; n < healthyReplicas; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithLoggerParams(consensus.StringParam("status", "healthy")),
WithStopCondition(RankFinalized(finalRank)),
)
instances = append(instances, in)
}
// set up one instance which can neither vote nor propose
for n := healthyReplicas; n < healthyReplicas+downReplicas; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithLoggerParams(consensus.StringParam("status", "unhealthy")),
WithStopCondition(RankReached(finalRank)),
WithOutgoingVotes(DropAllVotes),
WithOutgoingProposals(DropAllProposals),
)
instances = append(instances, in)
}
// connect the communicators of the instances together
Connect(t, instances)
// start all five instances and wait for them to wrap up
var wg sync.WaitGroup
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
}
success := unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second, "expect to finish before timeout")
if !success {
t.Logf("dumping state of system:")
for i, inst := range instances {
t.Logf(
"instance %d: %d %d %d",
i,
inst.pacemaker.CurrentRank(),
inst.pacemaker.LatestQuorumCertificate().GetRank(),
inst.forks.FinalizedState().Rank,
)
}
}
// check that all instances have the same finalized state
ref := instances[0]
finalizedRanks := FinalizedRanks(ref)
assert.Equal(t, finalRank, ref.forks.FinalizedState().Rank, "expected instance 0 to make enough progress, but it didn't")
for i := 1; i < healthyReplicas; i++ {
assert.Equal(t, ref.forks.FinalizedState(), instances[i].forks.FinalizedState(), "instance %d should have same finalized state as first instance", i)
assert.Equal(t, finalizedRanks, FinalizedRanks(instances[i]), "instance %d should have same finalized rank as first instance", i)
}
}
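// The failure dump above can be factored into a small helper; this is a
// hypothetical sketch (not called by the tests) built only from calls that
// already appear in this file:
func dumpInstanceState(t *testing.T, instances []*Instance) {
	for i, inst := range instances {
		// log current rank, rank of the latest QC, and the latest finalized rank
		t.Logf(
			"instance %d: rank=%d latestQC=%d finalized=%d",
			i,
			inst.pacemaker.CurrentRank(),
			inst.pacemaker.LatestQuorumCertificate().GetRank(),
			inst.forks.FinalizedState().Rank,
		)
	}
}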
// TestStateDelayIsHigherThanTimeout tests a protocol edge case, where
// - The state arrives in time for replicas to vote.
// - The next primary does not respond in time with a follow-up proposal,
// so nodes start sending TimeoutStates.
// - However, eventually, the next primary successfully constructs a QC and a new
// state before a TC leads to the round timing out.
//
// This test verifies that nodes still make progress on the happy path (QC constructed),
// despite already having initiated the timeout.
// Example scenarios in which this timing edge case could manifest:
// - state delay is very close to (or larger than) the round duration
// - delayed message transmission (specifically votes) within network
// - overwhelmed / slowed-down primary
// - byzantine primary
//
// Implementation:
// - We have 4 nodes in total, and the TimeoutStates from two of them are always
// discarded. Therefore, no TC can be constructed.
// - To force nodes to initiate the timeout (i.e. send TimeoutStates), we set
// the `stateRateDelay` to _twice_ the PaceMaker Timeout. Furthermore, we configure
// the PaceMaker to only increase timeout duration after 6 successive round failures.
func TestStateDelayIsHigherThanTimeout(t *testing.T) {
healthyReplicas := 2
replicasNotGeneratingTimeouts := 2
finalRank := uint64(20)
// generate the 4 hotstuff participants
participants := helper.WithWeightedIdentityList(healthyReplicas + replicasNotGeneratingTimeouts)
instances := make([]*Instance, 0, healthyReplicas+replicasNotGeneratingTimeouts)
root := DefaultRoot()
timeouts, err := timeout.NewConfig(pmTimeout, pmTimeout, 1.5, happyPathMaxRoundFailures, maxTimeoutRebroadcast)
require.NoError(t, err)
// set up 2 instances that fully work (incl. sending TimeoutStates)
for n := 0; n < healthyReplicas; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithStopCondition(RankFinalized(finalRank)),
)
instances = append(instances, in)
}
// set up two instances which neither generate nor receive TimeoutStates
for n := healthyReplicas; n < healthyReplicas+replicasNotGeneratingTimeouts; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithStopCondition(RankFinalized(finalRank)),
WithIncomingTimeoutStates(DropAllTimeoutStates),
WithOutgoingTimeoutStates(DropAllTimeoutStates),
)
instances = append(instances, in)
}
// connect the communicators of the instances together
Connect(t, instances)
// start all 4 instances and wait for them to wrap up
var wg sync.WaitGroup
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
}
unittest.AssertReturnsBefore(t, wg.Wait, 10*time.Second, "expect to finish before timeout")
// check that all instances have the same finalized state
ref := instances[0]
assert.Equal(t, finalRank, ref.forks.FinalizedState().Rank, "expected instance 0 to make enough progress, but it didn't")
finalizedRanks := FinalizedRanks(ref)
// in this test we rely on a QC being produced in every rank,
// so the finalized ranks must form a contiguous sequence with no gaps
for i := 1; i < len(finalizedRanks); i++ {
// finalized ranks are sorted in descending order
if finalizedRanks[i-1] != finalizedRanks[i]+1 {
t.Fatalf("finalized ranks series has gap, this is not expected: %v", finalizedRanks)
return
}
}
for i := 1; i < healthyReplicas; i++ {
assert.Equal(t, ref.forks.FinalizedState(), instances[i].forks.FinalizedState(), "instance %d should have same finalized state as first instance", i)
assert.Equal(t, finalizedRanks, FinalizedRanks(instances[i]), "instance %d should have same finalized rank as first instance", i)
}
}
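// The gap check above can also be expressed as a predicate; a hypothetical
// sketch (not used by the tests), assuming as in the loop above that
// FinalizedRanks returns ranks in descending order:
func ranksAreContiguousDescending(ranks []uint64) bool {
	for i := 1; i < len(ranks); i++ {
		// each rank must be exactly one less than its predecessor
		if ranks[i-1] != ranks[i]+1 {
			return false
		}
	}
	return true
}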
// TestAsyncClusterStartup tests a realistic scenario where nodes are started asynchronously:
// - Replicas are started in sequential order.
// - Each replica skips voting for the first state (emulating message omission).
// - Each replica skips its first TimeoutState (emulating message omission).
// - At this point the protocol loses liveness unless a super-majority of replicas rebroadcasts its timeouts.
//
// This test verifies that nodes still make progress, despite the first timeout messages being lost.
// Implementation:
// - We have 4 replicas in total, each of them skips voting for the first rank to force a timeout.
// - TimeoutStates are dropped for the whole committee until each replica has generated its first one.
// - Once every replica has generated a timeout, subsequent timeout rebroadcasts are allowed through so the protocol can make progress.
func TestAsyncClusterStartup(t *testing.T) {
replicas := 4
finalRank := uint64(20)
// generate the four hotstuff participants
participants := helper.WithWeightedIdentityList(replicas)
instances := make([]*Instance, 0, replicas)
root := DefaultRoot()
timeouts, err := timeout.NewConfig(pmTimeout, pmTimeout, 1.5, 6, maxTimeoutRebroadcast)
require.NoError(t, err)
// set up instances that work fully
var lock sync.Mutex
timeoutStateGenerated := make(map[models.Identity]struct{})
for n := 0; n < replicas; n++ {
in := NewInstance(t,
WithRoot(root),
WithParticipants(participants),
WithLocalID(participants[n].Identity()),
WithTimeouts(timeouts),
WithStopCondition(RankFinalized(finalRank)),
WithOutgoingVotes(func(vote *helper.TestVote) bool {
return vote.Rank == 1
}),
WithOutgoingTimeoutStates(func(object *models.TimeoutState[*helper.TestVote]) bool {
lock.Lock()
defer lock.Unlock()
timeoutStateGenerated[(*object.Vote).ID] = struct{}{}
// start allowing timeouts once every node has generated one;
// when nodes rebroadcast, their timeouts will go through
return len(timeoutStateGenerated) != replicas
}),
)
instances = append(instances, in)
}
// connect the communicators of the instances together
Connect(t, instances)
// start each node only after previous one has started
var wg sync.WaitGroup
for _, in := range instances {
wg.Add(1)
go func(in *Instance) {
err := in.Run(t)
require.ErrorIs(t, err, errStopCondition)
wg.Done()
}(in)
}
unittest.AssertReturnsBefore(t, wg.Wait, 20*time.Second, "expect to finish before timeout")
// check that all instances have the same finalized state
ref := instances[0]
assert.Equal(t, finalRank, ref.forks.FinalizedState().Rank, "expected instance 0 to make enough progress, but it didn't")
finalizedRanks := FinalizedRanks(ref)
for i := 1; i < replicas; i++ {
assert.Equal(t, ref.forks.FinalizedState(), instances[i].forks.FinalizedState(), "instance %d should have same finalized state as first instance", i)
assert.Equal(t, finalizedRanks, FinalizedRanks(instances[i]), "instance %d should have same finalized rank as first instance", i)
}
}
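// The inline TimeoutState filter in TestAsyncClusterStartup above keeps shared
// state behind a mutex to decide when to stop dropping. A hypothetical variant
// (illustrative only, not used by the tests) that drops exactly the first
// TimeoutState from each replica and then lets rebroadcasts through could be
// written as a closure with the same signature:
func dropFirstTimeoutPerReplica() func(*models.TimeoutState[*helper.TestVote]) bool {
	var lock sync.Mutex
	seen := make(map[models.Identity]struct{})
	return func(object *models.TimeoutState[*helper.TestVote]) bool {
		lock.Lock()
		defer lock.Unlock()
		id := (*object.Vote).ID
		if _, ok := seen[id]; ok {
			// a TimeoutState from this replica was already dropped once;
			// let this (rebroadcast) one pass through
			return false
		}
		seen[id] = struct{}{}
		// drop the first TimeoutState observed from this replica
		return true
	}
}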