mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
* wip: conversion of hotstuff from flow into Q-oriented model * bulk of tests * remaining non-integration tests * add integration test, adjust log interface, small tweaks * further adjustments, restore full pacemaker shape * add component lifecycle management+supervisor * further refinements * resolve timeout hanging * mostly finalized state for consensus * bulk of engine swap out * lifecycle-ify most types * wiring nearly complete, missing needed hooks for proposals * plugged in, vetting message validation paths * global consensus, plugged in and verified * app shard now wired in too * do not decode empty keys.yml (#456) * remove obsolete engine.maxFrames config parameter (#454) * default to Info log level unless debug is enabled (#453) * respect config's "logging" section params, remove obsolete single-file logging (#452) * Trivial code cleanup aiming to reduce Go compiler warnings (#451) * simplify range traversal * simplify channel read for single select case * delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24 * simplify range traversal * simplify channel read for single select case * remove redundant type from array * simplify range traversal * simplify channel read for single select case * RC slate * finalize 2.1.0.5 * Update comments in StrictMonotonicCounter Fix comment formatting and clarify description. --------- Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
1104 lines
46 KiB
Go
1104 lines
46 KiB
Go
package eventhandler
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/helper"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/mocks"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker/timeout"
|
|
)
|
|
|
|
const (
|
|
minRepTimeout float64 = 100.0 // Milliseconds
|
|
maxRepTimeout float64 = 600.0 // Milliseconds
|
|
multiplicativeIncrease float64 = 1.5 // multiplicative factor
|
|
happyPathMaxRoundFailures uint64 = 6 // number of failed rounds before first timeout increase
|
|
)
|
|
|
|
// TestPacemaker is a real pacemaker module with logging for rank changes
|
|
type TestPacemaker[
|
|
StateT models.Unique,
|
|
VoteT models.Unique,
|
|
PeerIDT models.Unique,
|
|
CollectedT models.Unique,
|
|
] struct {
|
|
consensus.Pacemaker
|
|
}
|
|
|
|
var _ consensus.Pacemaker = (*TestPacemaker[*nilUnique, *nilUnique, *nilUnique, *nilUnique])(nil)
|
|
|
|
func NewTestPacemaker[
|
|
StateT models.Unique,
|
|
VoteT models.Unique,
|
|
PeerIDT models.Unique,
|
|
CollectedT models.Unique,
|
|
](
|
|
timeoutController *timeout.Controller,
|
|
proposalDelayProvider consensus.ProposalDurationProvider,
|
|
notifier consensus.Consumer[StateT, VoteT],
|
|
store consensus.ConsensusStore[VoteT],
|
|
) *TestPacemaker[StateT, VoteT, PeerIDT, CollectedT] {
|
|
p, err := pacemaker.NewPacemaker[StateT, VoteT](nil, timeoutController, proposalDelayProvider, notifier, store, helper.Logger())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return &TestPacemaker[StateT, VoteT, PeerIDT, CollectedT]{p}
|
|
}
|
|
|
|
func (p *TestPacemaker[
|
|
StateT,
|
|
VoteT,
|
|
PeerIDT,
|
|
CollectedT,
|
|
]) ReceiveQuorumCertificate(qc models.QuorumCertificate) (*models.NextRank, error) {
|
|
oldRank := p.CurrentRank()
|
|
newRank, err := p.Pacemaker.ReceiveQuorumCertificate(qc)
|
|
fmt.Printf("pacemaker.ReceiveQuorumCertificate old rank: %v, new rank: %v\n", oldRank, p.CurrentRank())
|
|
return newRank, err
|
|
}
|
|
|
|
func (p *TestPacemaker[
|
|
StateT,
|
|
VoteT,
|
|
PeerIDT,
|
|
CollectedT,
|
|
]) ReceiveTimeoutCertificate(tc models.TimeoutCertificate) (*models.NextRank, error) {
|
|
oldRank := p.CurrentRank()
|
|
newRank, err := p.Pacemaker.ReceiveTimeoutCertificate(tc)
|
|
fmt.Printf("pacemaker.ReceiveTimeoutCertificate old rank: %v, new rank: %v\n", oldRank, p.CurrentRank())
|
|
return newRank, err
|
|
}
|
|
|
|
func (p *TestPacemaker[
|
|
StateT,
|
|
VoteT,
|
|
PeerIDT,
|
|
CollectedT,
|
|
]) LatestQuorumCertificate() models.QuorumCertificate {
|
|
return p.Pacemaker.LatestQuorumCertificate()
|
|
}
|
|
|
|
func (p *TestPacemaker[
|
|
StateT,
|
|
VoteT,
|
|
PeerIDT,
|
|
CollectedT,
|
|
]) PriorRankTimeoutCertificate() models.TimeoutCertificate {
|
|
return p.Pacemaker.PriorRankTimeoutCertificate()
|
|
}
|
|
|
|
type nodelay struct{}
|
|
|
|
// TargetPublicationTime implements consensus.ProposalDurationProvider.
|
|
func (n *nodelay) TargetPublicationTime(proposalRank uint64, timeRankEntered time.Time, parentStateId models.Identity) time.Time {
|
|
return timeRankEntered
|
|
}
|
|
|
|
var _ consensus.ProposalDurationProvider = (*nodelay)(nil)
|
|
|
|
// using a real pacemaker for testing event handler
|
|
func initPacemaker(t require.TestingT, ctx context.Context, livenessData *models.LivenessState) consensus.Pacemaker {
|
|
notifier := &mocks.Consumer[*helper.TestState, *helper.TestVote]{}
|
|
tc, err := timeout.NewConfig(time.Duration(minRepTimeout*1e6), time.Duration(maxRepTimeout*1e6), multiplicativeIncrease, happyPathMaxRoundFailures, time.Duration(maxRepTimeout*1e6))
|
|
require.NoError(t, err)
|
|
persist := &mocks.ConsensusStore[*helper.TestVote]{}
|
|
persist.On("PutLivenessState", mock.Anything).Return(nil).Maybe()
|
|
persist.On("GetLivenessState", mock.Anything).Return(livenessData, nil).Once()
|
|
pm := NewTestPacemaker[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected](timeout.NewController(tc), pacemaker.NoProposalDelay(), notifier, persist)
|
|
notifier.On("OnStartingTimeout", mock.Anything, mock.Anything).Return()
|
|
notifier.On("OnQuorumCertificateTriggeredRankChange", mock.Anything, mock.Anything, mock.Anything).Return()
|
|
notifier.On("OnTimeoutCertificateTriggeredRankChange", mock.Anything, mock.Anything, mock.Anything).Return()
|
|
notifier.On("OnRankChange", mock.Anything, mock.Anything).Maybe()
|
|
pm.Start(ctx)
|
|
return pm
|
|
}
|
|
|
|
// Committee mocks hotstuff.DynamicCommittee and allows to easily control leader for some rank.
|
|
type Committee struct {
|
|
*mocks.Replicas
|
|
// to mock I'm the leader of a certain rank, add the rank into the keys of leaders field
|
|
leaders map[uint64]struct{}
|
|
}
|
|
|
|
func NewCommittee(t *testing.T) *Committee {
|
|
committee := &Committee{
|
|
Replicas: mocks.NewReplicas(t),
|
|
leaders: make(map[uint64]struct{}),
|
|
}
|
|
committee.On("LeaderForRank", mock.Anything).Return(func(rank uint64) models.Identity {
|
|
_, isLeader := committee.leaders[rank]
|
|
if isLeader {
|
|
return "1"
|
|
}
|
|
return "0"
|
|
}, func(rank uint64) error {
|
|
return nil
|
|
}).Maybe()
|
|
|
|
committee.On("Self").Return("1").Maybe()
|
|
|
|
return committee
|
|
}
|
|
|
|
// The SafetyRules mock will not vote for any state unless the state's ID exists in votable field's key
|
|
type SafetyRules struct {
|
|
*mocks.SafetyRules[*helper.TestState, *helper.TestVote]
|
|
votable map[models.Identity]struct{}
|
|
}
|
|
|
|
func NewSafetyRules(t *testing.T) *SafetyRules {
|
|
safetyRules := &SafetyRules{
|
|
SafetyRules: mocks.NewSafetyRules[*helper.TestState, *helper.TestVote](t),
|
|
votable: make(map[models.Identity]struct{}),
|
|
}
|
|
|
|
// SafetyRules will not vote for any state, unless the stateID exists in votable map
|
|
safetyRules.On("ProduceVote", mock.Anything, mock.Anything).Return(
|
|
func(state *models.SignedProposal[*helper.TestState, *helper.TestVote], _ uint64) **helper.TestVote {
|
|
_, ok := safetyRules.votable[state.State.Identifier]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
v := createVote(state.State)
|
|
return &v
|
|
},
|
|
func(state *models.SignedProposal[*helper.TestState, *helper.TestVote], _ uint64) error {
|
|
_, ok := safetyRules.votable[state.State.Identifier]
|
|
if !ok {
|
|
return models.NewNoVoteErrorf("state not found")
|
|
}
|
|
return nil
|
|
}).Maybe()
|
|
|
|
safetyRules.On("ProduceTimeout", mock.Anything, mock.Anything, mock.Anything).Return(
|
|
func(curRank uint64, newestQC models.QuorumCertificate, lastRankTC models.TimeoutCertificate) *models.TimeoutState[*helper.TestVote] {
|
|
return helper.TimeoutStateFixture(func(timeout *models.TimeoutState[*helper.TestVote]) {
|
|
timeout.Rank = curRank
|
|
timeout.LatestQuorumCertificate = newestQC
|
|
timeout.PriorRankTimeoutCertificate = lastRankTC
|
|
}, helper.WithTimeoutVote(&helper.TestVote{Rank: curRank, ID: helper.MakeIdentity()}))
|
|
},
|
|
func(uint64, models.QuorumCertificate, models.TimeoutCertificate) error { return nil }).Maybe()
|
|
|
|
return safetyRules
|
|
}
|
|
|
|
// Forks mock allows to customize the AddState function by specifying the addProposal callbacks
|
|
type Forks struct {
|
|
*mocks.Forks[*helper.TestState]
|
|
// proposals stores all the proposals that have been added to the forks
|
|
proposals map[models.Identity]*models.State[*helper.TestState]
|
|
finalized uint64
|
|
t require.TestingT
|
|
// addProposal is to customize the logic to change finalized rank
|
|
addProposal func(state *models.State[*helper.TestState]) error
|
|
}
|
|
|
|
func NewForks(t *testing.T, finalized uint64) *Forks {
|
|
f := &Forks{
|
|
Forks: mocks.NewForks[*helper.TestState](t),
|
|
proposals: make(map[models.Identity]*models.State[*helper.TestState]),
|
|
finalized: finalized,
|
|
}
|
|
|
|
f.On("AddValidatedState", mock.Anything).Return(func(proposal *models.State[*helper.TestState]) error {
|
|
fmt.Printf("forks.AddValidatedState received State proposal for rank: %v, QC: %v\n", proposal.Rank, proposal.ParentQuorumCertificate.GetRank())
|
|
return f.addProposal(proposal)
|
|
}).Maybe()
|
|
|
|
f.On("FinalizedRank").Return(func() uint64 {
|
|
return f.finalized
|
|
}).Maybe()
|
|
|
|
f.On("GetState", mock.Anything).Return(func(stateID models.Identity) *models.State[*helper.TestState] {
|
|
b := f.proposals[stateID]
|
|
return b
|
|
}, func(stateID models.Identity) bool {
|
|
b, ok := f.proposals[stateID]
|
|
var rank uint64
|
|
if ok {
|
|
rank = b.Rank
|
|
}
|
|
fmt.Printf("forks.GetState found %v: rank: %v\n", ok, rank)
|
|
return ok
|
|
}).Maybe()
|
|
|
|
f.On("GetStatesForRank", mock.Anything).Return(func(rank uint64) []*models.State[*helper.TestState] {
|
|
proposals := make([]*models.State[*helper.TestState], 0)
|
|
for _, b := range f.proposals {
|
|
if b.Rank == rank {
|
|
proposals = append(proposals, b)
|
|
}
|
|
}
|
|
fmt.Printf("forks.GetStatesForRank found %v state(s) for rank %v\n", len(proposals), rank)
|
|
return proposals
|
|
}).Maybe()
|
|
|
|
f.addProposal = func(state *models.State[*helper.TestState]) error {
|
|
f.proposals[state.Identifier] = state
|
|
if state.ParentQuorumCertificate == nil {
|
|
panic(fmt.Sprintf("state has no QC: %v", state.Rank))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return f
|
|
}
|
|
|
|
// StateProducer mock will always make a valid state, exactly once per rank.
|
|
// If it is requested to make a state twice for the same rank, returns models.NoVoteError
|
|
type StateProducer struct {
|
|
proposerID models.Identity
|
|
producedStateForRank map[uint64]bool
|
|
}
|
|
|
|
func NewStateProducer(proposerID models.Identity) *StateProducer {
|
|
return &StateProducer{
|
|
proposerID: proposerID,
|
|
producedStateForRank: make(map[uint64]bool),
|
|
}
|
|
}
|
|
|
|
func (b *StateProducer) MakeStateProposal(rank uint64, qc models.QuorumCertificate, lastRankTC models.TimeoutCertificate) (*models.SignedProposal[*helper.TestState, *helper.TestVote], error) {
|
|
if b.producedStateForRank[rank] {
|
|
return nil, models.NewNoVoteErrorf("state already produced")
|
|
}
|
|
b.producedStateForRank[rank] = true
|
|
return helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](
|
|
helper.WithProposal[*helper.TestState, *helper.TestVote](
|
|
helper.MakeProposal(helper.WithState(helper.MakeState(
|
|
helper.WithStateRank[*helper.TestState](rank),
|
|
helper.WithStateQC[*helper.TestState](qc),
|
|
helper.WithStateProposer[*helper.TestState](b.proposerID))),
|
|
helper.WithPreviousRankTimeoutCertificate[*helper.TestState](lastRankTC)))), nil
|
|
}
|
|
|
|
func TestEventHandler(t *testing.T) {
|
|
suite.Run(t, new(EventHandlerSuite))
|
|
}
|
|
|
|
// EventHandlerSuite contains mocked state for testing event handler under different scenarios.
|
|
type EventHandlerSuite struct {
|
|
suite.Suite
|
|
|
|
eventhandler *EventHandler[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected]
|
|
|
|
paceMaker consensus.Pacemaker
|
|
forks *Forks
|
|
persist *mocks.ConsensusStore[*helper.TestVote]
|
|
stateProducer *StateProducer
|
|
committee *Committee
|
|
notifier *mocks.Consumer[*helper.TestState, *helper.TestVote]
|
|
safetyRules *SafetyRules
|
|
|
|
initRank uint64 // the current rank at the beginning of the test case
|
|
endRank uint64 // the expected current rank at the end of the test case
|
|
parentProposal *models.SignedProposal[*helper.TestState, *helper.TestVote]
|
|
votingProposal *models.SignedProposal[*helper.TestState, *helper.TestVote]
|
|
qc models.QuorumCertificate
|
|
tc models.TimeoutCertificate
|
|
newrank *models.NextRank
|
|
ctx context.Context
|
|
stop context.CancelFunc
|
|
}
|
|
|
|
func (es *EventHandlerSuite) SetupTest() {
|
|
finalized := uint64(3)
|
|
|
|
es.parentProposal = createProposal(4, 3)
|
|
newestQC := createQC(es.parentProposal.State)
|
|
|
|
livenessData := &models.LivenessState{
|
|
CurrentRank: newestQC.GetRank() + 1,
|
|
LatestQuorumCertificate: newestQC,
|
|
}
|
|
|
|
es.ctx, es.stop = context.WithCancel(context.Background())
|
|
|
|
es.committee = NewCommittee(es.T())
|
|
es.paceMaker = initPacemaker(es.T(), es.ctx, livenessData)
|
|
es.forks = NewForks(es.T(), finalized)
|
|
es.persist = mocks.NewConsensusStore[*helper.TestVote](es.T())
|
|
es.persist.On("PutStarted", mock.Anything).Return(nil).Maybe()
|
|
es.stateProducer = NewStateProducer(es.committee.Self())
|
|
es.safetyRules = NewSafetyRules(es.T())
|
|
es.notifier = mocks.NewConsumer[*helper.TestState, *helper.TestVote](es.T())
|
|
es.notifier.On("OnEventProcessed").Maybe()
|
|
es.notifier.On("OnEnteringRank", mock.Anything, mock.Anything).Maybe()
|
|
es.notifier.On("OnStart", mock.Anything).Maybe()
|
|
es.notifier.On("OnReceiveProposal", mock.Anything, mock.Anything).Maybe()
|
|
es.notifier.On("OnReceiveQuorumCertificate", mock.Anything, mock.Anything).Maybe()
|
|
es.notifier.On("OnReceiveTimeoutCertificate", mock.Anything, mock.Anything).Maybe()
|
|
es.notifier.On("OnPartialTimeoutCertificate", mock.Anything, mock.Anything).Maybe()
|
|
es.notifier.On("OnLocalTimeout", mock.Anything).Maybe()
|
|
es.notifier.On("OnCurrentRankDetails", mock.Anything, mock.Anything, mock.Anything).Maybe()
|
|
|
|
eventhandler, err := NewEventHandler[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected](
|
|
es.paceMaker,
|
|
es.stateProducer,
|
|
es.forks,
|
|
es.persist,
|
|
es.committee,
|
|
es.safetyRules,
|
|
es.notifier,
|
|
helper.Logger(),
|
|
)
|
|
require.NoError(es.T(), err)
|
|
|
|
es.eventhandler = eventhandler
|
|
|
|
es.initRank = livenessData.CurrentRank
|
|
es.endRank = livenessData.CurrentRank
|
|
// voting state is a state for the current rank, which will trigger rank change
|
|
es.votingProposal = createProposal(es.paceMaker.CurrentRank(), es.parentProposal.State.Rank)
|
|
es.qc = helper.MakeQC(helper.WithQCState[*helper.TestState](es.votingProposal.State))
|
|
|
|
// create a TC that will trigger rank change for current rank, based on newest QC
|
|
es.tc = helper.MakeTC(helper.WithTCRank(es.paceMaker.CurrentRank()),
|
|
helper.WithTCNewestQC(es.votingProposal.State.ParentQuorumCertificate))
|
|
es.newrank = &models.NextRank{
|
|
Rank: es.votingProposal.State.Rank + 1, // the vote for the voting proposals will trigger a rank change to the next rank
|
|
}
|
|
|
|
// add es.parentProposal into forks, otherwise we won't vote or propose based on it's QC sicne the parent is unknown
|
|
es.forks.proposals[es.parentProposal.State.Identifier] = es.parentProposal.State
|
|
}
|
|
|
|
// TestStartNewRank_ParentProposalNotFound tests next scenario: constructed TC, it contains NewestQC that references state that we
|
|
// don't know about, proposal can't be generated because we can't be sure that resulting state payload is valid.
|
|
func (es *EventHandlerSuite) TestStartNewRank_ParentProposalNotFound() {
|
|
newestQC := helper.MakeQC(helper.WithQCRank(es.initRank + 10))
|
|
tc := helper.MakeTC(helper.WithTCRank(newestQC.GetRank()+1),
|
|
helper.WithTCNewestQC(newestQC))
|
|
|
|
es.endRank = tc.GetRank() + 1
|
|
|
|
// I'm leader for next state
|
|
es.committee.leaders[es.endRank] = struct{}{}
|
|
|
|
err := es.eventhandler.OnReceiveTimeoutCertificate(tc)
|
|
require.NoError(es.T(), err)
|
|
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.forks.AssertCalled(es.T(), "GetState", newestQC.Identity())
|
|
es.notifier.AssertNotCalled(es.T(), "OnOwnProposal", mock.Anything, mock.Anything)
|
|
}
|
|
|
|
// TestOnReceiveProposal_StaleProposal test that proposals lower than finalized rank are not processed at all
|
|
// we are not interested in this data because we already performed finalization of that height.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_StaleProposal() {
|
|
proposal := createProposal(es.forks.FinalizedRank()-1, es.forks.FinalizedRank()-2)
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
es.forks.AssertNotCalled(es.T(), "AddState", proposal)
|
|
}
|
|
|
|
// TestOnReceiveProposal_QCOlderThanCurrentRank tests scenario: received a valid proposal with QC that has older rank,
|
|
// the proposal's QC shouldn't trigger rank change.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_QCOlderThanCurrentRank() {
|
|
proposal := createProposal(es.initRank-1, es.initRank-2)
|
|
|
|
// should not trigger rank change
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.forks.AssertCalled(es.T(), "AddValidatedState", proposal.State)
|
|
}
|
|
|
|
// TestOnReceiveProposal_TCOlderThanCurrentRank tests scenario: received a valid proposal with QC and TC that has older rank,
|
|
// the proposal's QC shouldn't trigger rank change.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_TCOlderThanCurrentRank() {
|
|
proposal := createProposal(es.initRank-1, es.initRank-3)
|
|
proposal.PreviousRankTimeoutCertificate = helper.MakeTC(helper.WithTCRank(proposal.State.Rank-1), helper.WithTCNewestQC(proposal.State.ParentQuorumCertificate))
|
|
|
|
// should not trigger rank change
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.forks.AssertCalled(es.T(), "AddValidatedState", proposal.State)
|
|
}
|
|
|
|
// TestOnReceiveProposal_NoVote tests scenario: received a valid proposal for cur rank, but not a safe node to vote, and I'm the next leader
|
|
// should not vote.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_NoVote() {
|
|
proposal := createProposal(es.initRank, es.initRank-1)
|
|
|
|
// I'm the next leader
|
|
es.committee.leaders[es.initRank+1] = struct{}{}
|
|
// no vote for this proposal
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.forks.AssertCalled(es.T(), "AddValidatedState", proposal.State)
|
|
}
|
|
|
|
// TestOnReceiveProposal_NoVote_ParentProposalNotFound tests scenario: received a valid proposal for cur rank, no parent for this proposal found
|
|
// should not vote.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_NoVote_ParentProposalNotFound() {
|
|
proposal := createProposal(es.initRank, es.initRank-1)
|
|
|
|
// remove parent from known proposals
|
|
delete(es.forks.proposals, proposal.State.ParentQuorumCertificate.Identity())
|
|
|
|
// no vote for this proposal, no parent found
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.Error(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.forks.AssertCalled(es.T(), "AddValidatedState", proposal.State)
|
|
}
|
|
|
|
// TestOnReceiveProposal_Vote_NextLeader tests scenario: received a valid proposal for cur rank, safe to vote, I'm the next leader
|
|
// should vote and add vote to VoteAggregator.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_Vote_NextLeader() {
|
|
proposal := createProposal(es.initRank, es.initRank-1)
|
|
|
|
// I'm the next leader
|
|
es.committee.leaders[es.initRank+1] = struct{}{}
|
|
|
|
// proposal is safe to vote
|
|
es.safetyRules.votable[proposal.State.Identifier] = struct{}{}
|
|
|
|
vote := &helper.TestVote{
|
|
StateID: proposal.State.Identifier,
|
|
Rank: proposal.State.Rank,
|
|
}
|
|
|
|
es.notifier.On("OnOwnVote", mock.MatchedBy(func(v **helper.TestVote) bool { return vote.Rank == (*v).Rank && vote.StateID == (*v).StateID }), mock.Anything).Once()
|
|
|
|
// vote should be created for this proposal
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestOnReceiveProposal_Vote_NotNextLeader tests scenario: received a valid proposal for cur rank, safe to vote, I'm not the next leader
|
|
// should vote and send vote to next leader.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_Vote_NotNextLeader() {
|
|
proposal := createProposal(es.initRank, es.initRank-1)
|
|
|
|
// proposal is safe to vote
|
|
es.safetyRules.votable[proposal.State.Identifier] = struct{}{}
|
|
|
|
vote := &helper.TestVote{
|
|
StateID: proposal.State.Identifier,
|
|
Rank: proposal.State.Rank,
|
|
ID: "0",
|
|
}
|
|
|
|
es.notifier.On("OnOwnVote", mock.MatchedBy(func(v **helper.TestVote) bool {
|
|
return vote.Rank == (*v).Rank && vote.StateID == (*v).StateID && vote.ID == (*v).ID
|
|
}), mock.Anything).Once()
|
|
|
|
// vote should be created for this proposal
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestOnReceiveProposal_ProposeAfterReceivingTC tests a scenario where we have received TC which advances to rank where we are
|
|
// leader but no proposal can be created because we don't have parent proposal. After receiving missing parent proposal we have
|
|
// all available data to construct a valid proposal. We need to ensure this.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_ProposeAfterReceivingQC() {
|
|
|
|
qc := es.qc
|
|
|
|
// first process QC this should advance rank
|
|
err := es.eventhandler.OnReceiveQuorumCertificate(qc)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), qc.GetRank()+1, es.paceMaker.CurrentRank(), "expect a rank change")
|
|
es.notifier.AssertNotCalled(es.T(), "OnOwnProposal", mock.Anything, mock.Anything)
|
|
|
|
// we are leader for current rank
|
|
es.committee.leaders[es.paceMaker.CurrentRank()] = struct{}{}
|
|
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
|
proposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// it should broadcast a header as the same as current rank
|
|
require.Equal(es.T(), es.paceMaker.CurrentRank(), proposal.State.Rank)
|
|
}).Once()
|
|
|
|
// processing this proposal shouldn't trigger rank change since we have already seen QC.
|
|
// we have used QC to advance rounds, but no proposal was made because we were missing parent state
|
|
// when we have received parent state we can try proposing again.
|
|
err = es.eventhandler.OnReceiveProposal(es.votingProposal)
|
|
require.NoError(es.T(), err)
|
|
|
|
require.Equal(es.T(), qc.GetRank()+1, es.paceMaker.CurrentRank(), "expect a rank change")
|
|
}
|
|
|
|
// TestOnReceiveProposal_ProposeAfterReceivingTC tests a scenario where we have received TC which advances to rank where we are
|
|
// leader but no proposal can be created because we don't have parent proposal. After receiving missing parent proposal we have
|
|
// all available data to construct a valid proposal. We need to ensure this.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_ProposeAfterReceivingTC() {
|
|
|
|
// TC contains a QC.StateID == es.votingProposal
|
|
tc := helper.MakeTC(helper.WithTCRank(es.votingProposal.State.Rank+1),
|
|
helper.WithTCNewestQC(es.qc))
|
|
|
|
// first process TC this should advance rank
|
|
err := es.eventhandler.OnReceiveTimeoutCertificate(tc)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), tc.GetRank()+1, es.paceMaker.CurrentRank(), "expect a rank change")
|
|
es.notifier.AssertNotCalled(es.T(), "OnOwnProposal", mock.Anything, mock.Anything)
|
|
|
|
// we are leader for current rank
|
|
es.committee.leaders[es.paceMaker.CurrentRank()] = struct{}{}
|
|
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
|
proposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// it should broadcast a header as the same as current rank
|
|
require.Equal(es.T(), es.paceMaker.CurrentRank(), proposal.State.Rank)
|
|
}).Once()
|
|
|
|
// processing this proposal shouldn't trigger rank change, since we have already seen QC.
|
|
// we have used QC to advance rounds, but no proposal was made because we were missing parent state
|
|
// when we have received parent state we can try proposing again.
|
|
err = es.eventhandler.OnReceiveProposal(es.votingProposal)
|
|
require.NoError(es.T(), err)
|
|
|
|
require.Equal(es.T(), tc.GetRank()+1, es.paceMaker.CurrentRank(), "expect a rank change")
|
|
}
|
|
|
|
// TestOnReceiveQuorumCertificate_HappyPath tests that building a QC for current rank triggers rank change. We are not leader for next
|
|
// round, so no proposal is expected.
|
|
func (es *EventHandlerSuite) TestOnReceiveQuorumCertificate_HappyPath() {
|
|
// voting state exists
|
|
es.forks.proposals[es.votingProposal.State.Identifier] = es.votingProposal.State
|
|
|
|
// a qc is built
|
|
qc := createQC(es.votingProposal.State)
|
|
|
|
// new qc is added to forks
|
|
// rank changed
|
|
// I'm not the next leader
|
|
// haven't received state for next rank
|
|
// goes to the new rank
|
|
es.endRank++
|
|
// not the leader of the newrank
|
|
// don't have state for the newrank
|
|
|
|
err := es.eventhandler.OnReceiveQuorumCertificate(qc)
|
|
require.NoError(es.T(), err, "if a vote can trigger a QC to be built,"+
|
|
"and the QC triggered a rank change, then start new rank")
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.notifier.AssertNotCalled(es.T(), "OnOwnProposal", mock.Anything, mock.Anything)
|
|
}
|
|
|
|
// TestOnReceiveQuorumCertificate_FutureRank tests that building a QC for future rank triggers rank change
|
|
func (es *EventHandlerSuite) TestOnReceiveQuorumCertificate_FutureRank() {
|
|
// voting state exists
|
|
curRank := es.paceMaker.CurrentRank()
|
|
|
|
// b1 is for current rank
|
|
// b2 and b3 is for future rank, but branched out from the same parent as b1
|
|
b1 := createProposal(curRank, curRank-1)
|
|
b2 := createProposal(curRank+1, curRank-1)
|
|
b3 := createProposal(curRank+2, curRank-1)
|
|
|
|
// a qc is built
|
|
// qc3 is for future rank
|
|
// qc2 is an older than qc3
|
|
// since vote aggregator can concurrently process votes and build qcs,
|
|
// we prepare qcs at different rank to be processed, and verify the rank change.
|
|
qc1 := createQC(b1.State)
|
|
qc2 := createQC(b2.State)
|
|
qc3 := createQC(b3.State)
|
|
|
|
// all three proposals are known
|
|
es.forks.proposals[b1.State.Identifier] = b1.State
|
|
es.forks.proposals[b2.State.Identifier] = b2.State
|
|
es.forks.proposals[b3.State.Identifier] = b3.State
|
|
|
|
// test that qc for future rank should trigger rank change
|
|
err := es.eventhandler.OnReceiveQuorumCertificate(qc3)
|
|
endRank := b3.State.Rank + 1 // next rank
|
|
require.NoError(es.T(), err, "if a vote can trigger a QC to be built,"+
|
|
"and the QC triggered a rank change, then start new rank")
|
|
require.Equal(es.T(), endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
|
|
// the same qc would not trigger rank change
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(qc3)
|
|
endRank = b3.State.Rank + 1 // next rank
|
|
require.NoError(es.T(), err, "same qc should not trigger rank change")
|
|
require.Equal(es.T(), endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
|
|
// old QCs won't trigger rank change
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(qc2)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(qc1)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestOnReceiveQuorumCertificate_NextLeaderProposes tests that after receiving a valid proposal for cur rank, and I'm the next leader,
|
|
// a QC can be built for the state, triggered rank change, and I will propose
|
|
func (es *EventHandlerSuite) TestOnReceiveQuorumCertificate_NextLeaderProposes() {
|
|
proposal := createProposal(es.initRank, es.initRank-1)
|
|
qc := createQC(proposal.State)
|
|
// I'm the next leader
|
|
es.committee.leaders[es.initRank+1] = struct{}{}
|
|
// qc triggered rank change
|
|
es.endRank++
|
|
// I'm the leader of cur rank (7)
|
|
// I'm not the leader of next rank (8), trigger rank change
|
|
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
|
proposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// it should broadcast a header as the same as endRank
|
|
require.Equal(es.T(), es.endRank, proposal.State.Rank)
|
|
}).Once()
|
|
|
|
// after receiving proposal build QC and deliver it to event handler
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(qc)
|
|
require.NoError(es.T(), err)
|
|
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.forks.AssertCalled(es.T(), "AddValidatedState", proposal.State)
|
|
}
|
|
|
|
// TestOnReceiveQuorumCertificate_ProposeOnce tests that after constructing proposal we don't attempt to create another
|
|
// proposal for same rank.
|
|
func (es *EventHandlerSuite) TestOnReceiveQuorumCertificate_ProposeOnce() {
|
|
// I'm the next leader
|
|
es.committee.leaders[es.initRank+1] = struct{}{}
|
|
|
|
es.endRank++
|
|
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Once()
|
|
|
|
err := es.eventhandler.OnReceiveProposal(es.votingProposal)
|
|
require.NoError(es.T(), err)
|
|
|
|
// constructing QC triggers making state proposal
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(es.qc)
|
|
require.NoError(es.T(), err)
|
|
|
|
// receiving same proposal again triggers proposing logic
|
|
err = es.eventhandler.OnReceiveProposal(es.votingProposal)
|
|
require.NoError(es.T(), err)
|
|
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
es.notifier.AssertNumberOfCalls(es.T(), "OnOwnProposal", 1)
|
|
}
|
|
|
|
// TestOnTCConstructed_HappyPath tests that building a TC for current rank triggers rank change
|
|
func (es *EventHandlerSuite) TestOnReceiveTimeoutCertificate_HappyPath() {
|
|
// voting state exists
|
|
es.forks.proposals[es.votingProposal.State.Identifier] = es.votingProposal.State
|
|
|
|
// a tc is built
|
|
tc := helper.MakeTC(helper.WithTCRank(es.initRank), helper.WithTCNewestQC(es.votingProposal.State.ParentQuorumCertificate))
|
|
|
|
// expect a rank change
|
|
es.endRank++
|
|
|
|
err := es.eventhandler.OnReceiveTimeoutCertificate(tc)
|
|
require.NoError(es.T(), err, "TC should trigger a rank change and start of new rank")
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestOnTCConstructed_NextLeaderProposes tests that after receiving TC and advancing rank we as next leader create a proposal
|
|
// and broadcast it
|
|
func (es *EventHandlerSuite) TestOnReceiveTimeoutCertificate_NextLeaderProposes() {
|
|
es.committee.leaders[es.tc.GetRank()+1] = struct{}{}
|
|
es.endRank++
|
|
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
|
proposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// it should broadcast a header as the same as endRank
|
|
require.Equal(es.T(), es.endRank, proposal.State.Rank)
|
|
|
|
// proposed state should contain valid newest QC and lastRankTC
|
|
expectedNewestQC := es.paceMaker.LatestQuorumCertificate()
|
|
require.Equal(es.T(), expectedNewestQC, proposal.State.ParentQuorumCertificate)
|
|
require.Equal(es.T(), es.paceMaker.PriorRankTimeoutCertificate(), proposal.PreviousRankTimeoutCertificate)
|
|
}).Once()
|
|
|
|
err := es.eventhandler.OnReceiveTimeoutCertificate(es.tc)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "TC didn't trigger rank change")
|
|
}
|
|
|
|
// TestOnTimeout tests that event handler produces TimeoutState and broadcasts it to other members of consensus
|
|
// committee. Additionally, It has to contribute TimeoutState to timeout aggregation process by sending it to TimeoutAggregator.
|
|
func (es *EventHandlerSuite) TestOnTimeout() {
|
|
es.notifier.On("OnOwnTimeout", mock.Anything).Run(func(args mock.Arguments) {
|
|
timeoutState, ok := args[0].(*models.TimeoutState[*helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// it should broadcast a TO with same rank as endRank
|
|
require.Equal(es.T(), es.endRank, timeoutState.Rank)
|
|
}).Once()
|
|
|
|
err := es.eventhandler.OnLocalTimeout()
|
|
require.NoError(es.T(), err)
|
|
|
|
// TimeoutState shouldn't trigger rank change
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestOnTimeout_SanityChecks tests a specific scenario where pacemaker have seen both QC and TC for previous rank
|
|
// and EventHandler tries to produce a timeout state, such timeout state is invalid if both QC and TC is present, we
|
|
// need to make sure that EventHandler filters out TC for last rank if we know about QC for same rank.
|
|
func (es *EventHandlerSuite) TestOnTimeout_SanityChecks() {
|
|
// voting state exists
|
|
es.forks.proposals[es.votingProposal.State.Identifier] = es.votingProposal.State
|
|
|
|
// a tc is built
|
|
tc := helper.MakeTC(helper.WithTCRank(es.initRank), helper.WithTCNewestQC(es.votingProposal.State.ParentQuorumCertificate))
|
|
|
|
// expect a rank change
|
|
es.endRank++
|
|
|
|
err := es.eventhandler.OnReceiveTimeoutCertificate(tc)
|
|
require.NoError(es.T(), err, "TC should trigger a rank change and start of new rank")
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
|
|
// receive a QC for the same rank as the TC
|
|
qc := helper.MakeQC(helper.WithQCRank(tc.GetRank()))
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(qc)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "QC shouldn't trigger rank change")
|
|
require.Equal(es.T(), tc, es.paceMaker.PriorRankTimeoutCertificate(), "invalid last rank TC")
|
|
require.Equal(es.T(), qc, es.paceMaker.LatestQuorumCertificate(), "invalid newest QC")
|
|
|
|
es.notifier.On("OnOwnTimeout", mock.Anything).Run(func(args mock.Arguments) {
|
|
timeoutState, ok := args[0].(*models.TimeoutState[*helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
require.Equal(es.T(), es.endRank, timeoutState.Rank)
|
|
require.Equal(es.T(), qc, timeoutState.LatestQuorumCertificate)
|
|
require.Nil(es.T(), timeoutState.PriorRankTimeoutCertificate)
|
|
}).Once()
|
|
|
|
err = es.eventhandler.OnLocalTimeout()
|
|
require.NoError(es.T(), err)
|
|
}
|
|
|
|
// TestOnTimeout_ReplicaEjected tests that EventHandler correctly handles possible errors from SafetyRules and doesn't broadcast
|
|
// timeout states when replica is ejected.
|
|
func (es *EventHandlerSuite) TestOnTimeout_ReplicaEjected() {
|
|
es.Run("no-timeout", func() {
|
|
*es.safetyRules.SafetyRules = *mocks.NewSafetyRules[*helper.TestState, *helper.TestVote](es.T())
|
|
es.safetyRules.On("ProduceTimeout", mock.Anything, mock.Anything, mock.Anything).Return(nil, models.NewNoTimeoutErrorf(""))
|
|
err := es.eventhandler.OnLocalTimeout()
|
|
require.NoError(es.T(), err, "should be handled as sentinel error")
|
|
})
|
|
es.Run("create-timeout-exception", func() {
|
|
*es.safetyRules.SafetyRules = *mocks.NewSafetyRules[*helper.TestState, *helper.TestVote](es.T())
|
|
exception := errors.New("produce-timeout-exception")
|
|
es.safetyRules.On("ProduceTimeout", mock.Anything, mock.Anything, mock.Anything).Return(nil, exception)
|
|
err := es.eventhandler.OnLocalTimeout()
|
|
require.ErrorIs(es.T(), err, exception, "expect a wrapped exception")
|
|
})
|
|
es.notifier.AssertNotCalled(es.T(), "OnOwnTimeout", mock.Anything)
|
|
}
|
|
|
|
// Test100Timeout tests that receiving 100 TCs for increasing ranks advances rounds
|
|
func (es *EventHandlerSuite) Test100Timeout() {
|
|
for i := 0; i < 100; i++ {
|
|
tc := helper.MakeTC(helper.WithTCRank(es.initRank + uint64(i)))
|
|
err := es.eventhandler.OnReceiveTimeoutCertificate(tc)
|
|
es.endRank++
|
|
require.NoError(es.T(), err)
|
|
}
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestLeaderBuild100States tests scenario where leader builds 100 proposals one after another
|
|
func (es *EventHandlerSuite) TestLeaderBuild100States() {
|
|
require.Equal(es.T(), 1, len(es.forks.proposals), "expect Forks to contain only root state")
|
|
|
|
// I'm the leader for the first rank
|
|
es.committee.leaders[es.initRank] = struct{}{}
|
|
|
|
totalRank := 100
|
|
for i := 0; i < totalRank; i++ {
|
|
// I'm the leader for 100 ranks
|
|
// I'm the next leader
|
|
es.committee.leaders[es.initRank+uint64(i+1)] = struct{}{}
|
|
// I can build qc for all 100 ranks
|
|
proposal := createProposal(es.initRank+uint64(i), es.initRank+uint64(i)-1)
|
|
qc := createQC(proposal.State)
|
|
|
|
// for first proposal we need to store the parent otherwise it won't be voted for
|
|
if i == 0 {
|
|
parentState := helper.MakeState(func(state *models.State[*helper.TestState]) {
|
|
state.Identifier = proposal.State.ParentQuorumCertificate.Identity()
|
|
state.Rank = proposal.State.ParentQuorumCertificate.GetRank()
|
|
})
|
|
es.forks.proposals[parentState.Identifier] = parentState
|
|
}
|
|
|
|
es.safetyRules.votable[proposal.State.Identifier] = struct{}{}
|
|
// should trigger 100 rank change
|
|
es.endRank++
|
|
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
|
ownProposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
require.Equal(es.T(), proposal.State.Rank+1, ownProposal.State.Rank)
|
|
}).Once()
|
|
vote := &helper.TestVote{
|
|
Rank: proposal.State.Rank,
|
|
StateID: proposal.State.Identifier,
|
|
}
|
|
es.notifier.On("OnOwnVote", mock.MatchedBy(func(v **helper.TestVote) bool { return vote.Rank == (*v).Rank && vote.StateID == (*v).StateID }), mock.Anything).Once()
|
|
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(qc)
|
|
require.NoError(es.T(), err)
|
|
}
|
|
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
require.Equal(es.T(), totalRank+1, len(es.forks.proposals), "expect Forks to contain root state + 100 proposed states")
|
|
es.notifier.AssertExpectations(es.T())
|
|
}
|
|
|
|
// TestFollowerFollows100States tests scenario where follower receives 100 proposals one after another
|
|
func (es *EventHandlerSuite) TestFollowerFollows100States() {
|
|
// add parent proposal otherwise we can't propose
|
|
parentProposal := createProposal(es.initRank, es.initRank-1)
|
|
es.forks.proposals[parentProposal.State.Identifier] = parentProposal.State
|
|
for i := 0; i < 100; i++ {
|
|
// create each proposal as if they are created by some leader
|
|
proposal := createProposal(es.initRank+uint64(i)+1, es.initRank+uint64(i))
|
|
// as a follower, I receive these proposals
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
es.endRank++
|
|
}
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
require.Equal(es.T(), 100, len(es.forks.proposals)-2)
|
|
}
|
|
|
|
// TestFollowerReceives100Forks tests scenario where follower receives 100 forks built on top of the same state
|
|
func (es *EventHandlerSuite) TestFollowerReceives100Forks() {
|
|
for i := 0; i < 100; i++ {
|
|
// create each proposal as if they are created by some leader
|
|
proposal := createProposal(es.initRank+uint64(i)+1, es.initRank-1)
|
|
proposal.PreviousRankTimeoutCertificate = helper.MakeTC(helper.WithTCRank(es.initRank+uint64(i)),
|
|
helper.WithTCNewestQC(proposal.State.ParentQuorumCertificate))
|
|
// expect a rank change since fork can be made only if last rank has ended with TC.
|
|
es.endRank++
|
|
// as a follower, I receive these proposals
|
|
err := es.eventhandler.OnReceiveProposal(proposal)
|
|
require.NoError(es.T(), err)
|
|
}
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
require.Equal(es.T(), 100, len(es.forks.proposals)-1)
|
|
}
|
|
|
|
// TestStart_ProposeOnce tests that after starting event handler we don't create proposal in case we have already proposed
|
|
// for this rank.
|
|
func (es *EventHandlerSuite) TestStart_ProposeOnce() {
|
|
// I'm the next leader
|
|
es.committee.leaders[es.initRank+1] = struct{}{}
|
|
es.endRank++
|
|
|
|
// STEP 1: simulating events _before_ a crash: EventHandler receives proposal and then a QC for the proposal (from VoteAggregator)
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Once()
|
|
err := es.eventhandler.OnReceiveProposal(es.votingProposal)
|
|
require.NoError(es.T(), err)
|
|
|
|
// constructing QC triggers making state proposal
|
|
err = es.eventhandler.OnReceiveQuorumCertificate(es.qc)
|
|
require.NoError(es.T(), err)
|
|
es.notifier.AssertNumberOfCalls(es.T(), "OnOwnProposal", 1)
|
|
|
|
// Here, a hypothetical crash would happen.
|
|
// During crash recovery, Forks and Pacemaker are recovered to have exactly the same in-memory state as before
|
|
// Start triggers proposing logic. But as our own proposal for the rank is already in Forks, we should not propose again.
|
|
err = es.eventhandler.Start(es.ctx)
|
|
require.NoError(es.T(), err)
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
|
|
// assert that broadcast wasn't trigger again, i.e. there should have been only one event `OnOwnProposal` in total
|
|
es.notifier.AssertNumberOfCalls(es.T(), "OnOwnProposal", 1)
|
|
}
|
|
|
|
// TestCreateProposal_SanityChecks tests that proposing logic performs sanity checks when creating new state proposal.
|
|
// Specifically it tests a case where TC contains QC which: TC.Rank == TC.NewestQC.Rank
|
|
func (es *EventHandlerSuite) TestCreateProposal_SanityChecks() {
|
|
// round ended with TC where TC.Rank == TC.NewestQC.Rank
|
|
tc := helper.MakeTC(helper.WithTCRank(es.initRank),
|
|
helper.WithTCNewestQC(helper.MakeQC(helper.WithQCState(es.votingProposal.State))))
|
|
|
|
es.forks.proposals[es.votingProposal.State.Identifier] = es.votingProposal.State
|
|
|
|
// I'm the next leader
|
|
es.committee.leaders[tc.GetRank()+1] = struct{}{}
|
|
|
|
es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
|
|
proposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// we need to make sure that produced proposal contains only QC even if there is TC for previous rank as well
|
|
require.Nil(es.T(), proposal.PreviousRankTimeoutCertificate)
|
|
}).Once()
|
|
|
|
err := es.eventhandler.OnReceiveTimeoutCertificate(tc)
|
|
require.NoError(es.T(), err)
|
|
|
|
require.Equal(es.T(), tc.GetLatestQuorumCert(), es.paceMaker.LatestQuorumCertificate())
|
|
require.Equal(es.T(), tc, es.paceMaker.PriorRankTimeoutCertificate())
|
|
require.Equal(es.T(), tc.GetRank()+1, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestOnReceiveProposal_ProposalForActiveRank tests that when receiving proposal for active we don't attempt to create a proposal
|
|
// Receiving proposal can trigger proposing logic only in case we have received missing state for past ranks.
|
|
func (es *EventHandlerSuite) TestOnReceiveProposal_ProposalForActiveRank() {
|
|
// receive proposal where we are leader, meaning that we have produced this proposal
|
|
es.committee.leaders[es.votingProposal.State.Rank] = struct{}{}
|
|
|
|
err := es.eventhandler.OnReceiveProposal(es.votingProposal)
|
|
require.NoError(es.T(), err)
|
|
|
|
es.notifier.AssertNotCalled(es.T(), "OnOwnProposal", mock.Anything, mock.Anything)
|
|
}
|
|
|
|
// TestOnPartialTimeoutCertificateCreated_ProducedTimeout tests that when receiving partial TC for active rank we will create a timeout state
|
|
// immediately.
|
|
func (es *EventHandlerSuite) TestOnPartialTimeoutCertificateCreated_ProducedTimeout() {
|
|
partialTimeoutCertificate := &consensus.PartialTimeoutCertificateCreated{
|
|
Rank: es.initRank,
|
|
NewestQuorumCertificate: es.parentProposal.State.ParentQuorumCertificate,
|
|
PriorRankTimeoutCertificate: nil,
|
|
}
|
|
|
|
es.notifier.On("OnOwnTimeout", mock.Anything).Run(func(args mock.Arguments) {
|
|
timeoutState, ok := args[0].(*models.TimeoutState[*helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// it should broadcast a TO with same rank as partialTimeoutCertificate.Rank
|
|
require.Equal(es.T(), partialTimeoutCertificate.Rank, timeoutState.Rank)
|
|
}).Once()
|
|
|
|
err := es.eventhandler.OnPartialTimeoutCertificateCreated(partialTimeoutCertificate)
|
|
require.NoError(es.T(), err)
|
|
|
|
// partial TC shouldn't trigger rank change
|
|
require.Equal(es.T(), partialTimeoutCertificate.Rank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
// TestOnPartialTimeoutCertificateCreated_NotActiveRank tests that we don't create timeout state if partial TC was delivered for a past, non-current rank.
|
|
// NOTE: it is not possible to receive a partial timeout for a FUTURE rank, unless the partial timeout contains
|
|
// either a QC/TC allowing us to enter that rank, therefore that case is not covered here.
|
|
// See TestOnPartialTimeoutCertificateCreated_QcAndTimeoutCertificateProcessing instead.
|
|
func (es *EventHandlerSuite) TestOnPartialTimeoutCertificateCreated_NotActiveRank() {
|
|
partialTimeoutCertificate := &consensus.PartialTimeoutCertificateCreated{
|
|
Rank: es.initRank - 1,
|
|
NewestQuorumCertificate: es.parentProposal.State.ParentQuorumCertificate,
|
|
}
|
|
|
|
err := es.eventhandler.OnPartialTimeoutCertificateCreated(partialTimeoutCertificate)
|
|
require.NoError(es.T(), err)
|
|
|
|
// partial TC shouldn't trigger rank change
|
|
require.Equal(es.T(), es.initRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
// we don't want to create timeout if partial TC was delivered for rank different than active one.
|
|
es.notifier.AssertNotCalled(es.T(), "OnOwnTimeout", mock.Anything)
|
|
}
|
|
|
|
// TestOnPartialTimeoutCertificateCreated_QcAndTimeoutCertificateProcessing tests that EventHandler processes QC and TC included in consensus.PartialTimeoutCertificateCreated
|
|
// data structure. This tests cases like the following example:
|
|
// * the pacemaker is in rank 10
|
|
// * we observe a partial timeout for rank 11 with a QC for rank 10
|
|
// * we should change to rank 11 using the QC, then broadcast a timeout for rank 11
|
|
func (es *EventHandlerSuite) TestOnPartialTimeoutCertificateCreated_QcAndTimeoutCertificateProcessing() {
|
|
|
|
testOnPartialTimeoutCertificateCreated := func(partialTimeoutCertificate *consensus.PartialTimeoutCertificateCreated) {
|
|
es.endRank++
|
|
|
|
es.notifier.On("OnOwnTimeout", mock.Anything).Run(func(args mock.Arguments) {
|
|
timeoutState, ok := args[0].(*models.TimeoutState[*helper.TestVote])
|
|
require.True(es.T(), ok)
|
|
// it should broadcast a TO with same rank as partialTimeoutCertificate.Rank
|
|
require.Equal(es.T(), partialTimeoutCertificate.Rank, timeoutState.Rank)
|
|
}).Once()
|
|
|
|
err := es.eventhandler.OnPartialTimeoutCertificateCreated(partialTimeoutCertificate)
|
|
require.NoError(es.T(), err)
|
|
|
|
require.Equal(es.T(), es.endRank, es.paceMaker.CurrentRank(), "incorrect rank change")
|
|
}
|
|
|
|
es.Run("qc-triggered-rank-change", func() {
|
|
partialTimeoutCertificate := &consensus.PartialTimeoutCertificateCreated{
|
|
Rank: es.qc.GetRank() + 1,
|
|
NewestQuorumCertificate: es.qc,
|
|
}
|
|
testOnPartialTimeoutCertificateCreated(partialTimeoutCertificate)
|
|
})
|
|
es.Run("tc-triggered-rank-change", func() {
|
|
tc := helper.MakeTC(helper.WithTCRank(es.endRank), helper.WithTCNewestQC(es.qc))
|
|
partialTimeoutCertificate := &consensus.PartialTimeoutCertificateCreated{
|
|
Rank: tc.GetRank() + 1,
|
|
NewestQuorumCertificate: tc.GetLatestQuorumCert(),
|
|
PriorRankTimeoutCertificate: tc,
|
|
}
|
|
testOnPartialTimeoutCertificateCreated(partialTimeoutCertificate)
|
|
})
|
|
}
|
|
|
|
func createState(rank uint64) *models.State[*helper.TestState] {
|
|
return &models.State[*helper.TestState]{
|
|
Identifier: fmt.Sprintf("%d", rank),
|
|
Rank: rank,
|
|
}
|
|
}
|
|
|
|
func createStateWithQC(rank uint64, qcrank uint64) *models.State[*helper.TestState] {
|
|
state := createState(rank)
|
|
parent := createState(qcrank)
|
|
state.ParentQuorumCertificate = createQC(parent)
|
|
return state
|
|
}
|
|
|
|
func createQC(parent *models.State[*helper.TestState]) models.QuorumCertificate {
|
|
qc := &helper.TestQuorumCertificate{
|
|
Selector: parent.Identifier,
|
|
Rank: parent.Rank,
|
|
FrameNumber: parent.Rank,
|
|
AggregatedSignature: &helper.TestAggregatedSignature{
|
|
Signature: make([]byte, 74),
|
|
Bitmask: []byte{0x1},
|
|
PublicKey: make([]byte, 585),
|
|
},
|
|
}
|
|
return qc
|
|
}
|
|
|
|
func createVote(state *models.State[*helper.TestState]) *helper.TestVote {
|
|
return &helper.TestVote{
|
|
Rank: state.Rank,
|
|
StateID: state.Identifier,
|
|
ID: "0",
|
|
Signature: make([]byte, 74),
|
|
}
|
|
}
|
|
|
|
func createProposal(rank uint64, qcrank uint64) *models.SignedProposal[*helper.TestState, *helper.TestVote] {
|
|
state := createStateWithQC(rank, qcrank)
|
|
return helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](
|
|
helper.WithProposal[*helper.TestState, *helper.TestVote](
|
|
helper.MakeProposal(helper.WithState(state))))
|
|
}
|