ceremonyclient/consensus/integration/instance_test.go
Cassandra Heart b507ff26f2
v2.1.0.10
2025-11-19 00:00:31 -06:00

739 lines
26 KiB
Go

package integration
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
"time"
"github.com/gammazero/workerpool"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/counters"
"source.quilibrium.com/quilibrium/monorepo/consensus/eventhandler"
"source.quilibrium.com/quilibrium/monorepo/consensus/forks"
"source.quilibrium.com/quilibrium/monorepo/consensus/helper"
"source.quilibrium.com/quilibrium/monorepo/consensus/mocks"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/notifications"
"source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub"
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker"
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker/timeout"
"source.quilibrium.com/quilibrium/monorepo/consensus/safetyrules"
"source.quilibrium.com/quilibrium/monorepo/consensus/stateproducer"
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutaggregator"
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutcollector"
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/voteaggregator"
"source.quilibrium.com/quilibrium/monorepo/consensus/votecollector"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/lifecycle/unittest"
)
type Instance struct {
// instance parameters
logger consensus.TraceLogger
participants []models.WeightedIdentity
localID models.Identity
dropVoteIn VoteFilter
dropVoteOut VoteFilter
dropPropIn ProposalFilter
dropPropOut ProposalFilter
dropTimeoutStateIn TimeoutStateFilter
dropTimeoutStateOut TimeoutStateFilter
stop Condition
// instance data
queue chan interface{}
updatingStates sync.RWMutex
headers map[models.Identity]*models.State[*helper.TestState]
pendings map[models.Identity]*models.SignedProposal[*helper.TestState, *helper.TestVote] // indexed by parent ID
// mocked dependencies
committee *mocks.DynamicCommittee
builder *mocks.LeaderProvider[*helper.TestState, *helper.TestPeer, *helper.TestCollected]
finalizer *mocks.Finalizer
persist *mocks.ConsensusStore[*helper.TestVote]
signer *mocks.Signer[*helper.TestState, *helper.TestVote]
verifier *mocks.Verifier[*helper.TestVote]
notifier *MockedCommunicatorConsumer
voting *mocks.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]
// real dependencies
pacemaker consensus.Pacemaker
producer *stateproducer.StateProducer[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected]
forks *forks.Forks[*helper.TestState, *helper.TestVote]
voteAggregator *voteaggregator.VoteAggregator[*helper.TestState, *helper.TestVote]
timeoutAggregator *timeoutaggregator.TimeoutAggregator[*helper.TestVote]
safetyRules *safetyrules.SafetyRules[*helper.TestState, *helper.TestVote]
validator *validator.Validator[*helper.TestState, *helper.TestVote]
// main logic
handler *eventhandler.EventHandler[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected]
}
type MockedCommunicatorConsumer struct {
notifications.NoopProposalViolationConsumer[*helper.TestState, *helper.TestVote]
notifications.NoopParticipantConsumer[*helper.TestState, *helper.TestVote]
notifications.NoopFinalizationConsumer[*helper.TestState]
*mocks.CommunicatorConsumer[*helper.TestState, *helper.TestVote]
}
func NewMockedCommunicatorConsumer() *MockedCommunicatorConsumer {
return &MockedCommunicatorConsumer{
CommunicatorConsumer: &mocks.CommunicatorConsumer[*helper.TestState, *helper.TestVote]{},
}
}
var _ consensus.Consumer[*helper.TestState, *helper.TestVote] = (*MockedCommunicatorConsumer)(nil)
var _ consensus.TimeoutCollectorConsumer[*helper.TestVote] = (*Instance)(nil)
func NewInstance(t *testing.T, options ...Option) *Instance {
// generate random default identity
identity := helper.MakeIdentity()
// initialize the default configuration
cfg := Config{
Logger: helper.Logger(),
Root: DefaultRoot(),
Participants: []models.WeightedIdentity{&helper.TestWeightedIdentity{
ID: identity,
}},
LocalID: identity,
Timeouts: timeout.DefaultConfig,
IncomingVotes: DropNoVotes,
OutgoingVotes: DropNoVotes,
IncomingProposals: DropNoProposals,
OutgoingProposals: DropNoProposals,
IncomingTimeoutStates: DropNoTimeoutStates,
OutgoingTimeoutStates: DropNoTimeoutStates,
StopCondition: RightAway,
}
// apply the custom options
for _, option := range options {
option(&cfg)
}
// check the local ID is a participant
takesPart := false
for _, participant := range cfg.Participants {
if participant.Identity() == cfg.LocalID {
takesPart = true
break
}
}
require.True(t, takesPart)
// initialize the instance
in := Instance{
// instance parameters
logger: cfg.Logger,
participants: cfg.Participants,
localID: cfg.LocalID,
dropVoteIn: cfg.IncomingVotes,
dropVoteOut: cfg.OutgoingVotes,
dropPropIn: cfg.IncomingProposals,
dropPropOut: cfg.OutgoingProposals,
dropTimeoutStateIn: cfg.IncomingTimeoutStates,
dropTimeoutStateOut: cfg.OutgoingTimeoutStates,
stop: cfg.StopCondition,
// instance data
pendings: make(map[models.Identity]*models.SignedProposal[*helper.TestState, *helper.TestVote]),
headers: make(map[models.Identity]*models.State[*helper.TestState]),
queue: make(chan interface{}, 1024),
// instance mocks
committee: &mocks.DynamicCommittee{},
builder: &mocks.LeaderProvider[*helper.TestState, *helper.TestPeer, *helper.TestCollected]{},
persist: &mocks.ConsensusStore[*helper.TestVote]{},
signer: &mocks.Signer[*helper.TestState, *helper.TestVote]{},
verifier: &mocks.Verifier[*helper.TestVote]{},
notifier: NewMockedCommunicatorConsumer(),
finalizer: &mocks.Finalizer{},
voting: &mocks.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]{},
}
// insert root state into headers register
in.headers[cfg.Root.Identifier] = cfg.Root
// program the hotstuff committee state
in.committee.On("IdentitiesByRank", mock.Anything).Return(
func(_ uint64) []models.WeightedIdentity {
return in.participants
},
nil,
)
in.committee.On("IdentitiesByState", mock.Anything).Return(
func(_ models.Identity) []models.WeightedIdentity {
return in.participants
},
nil,
)
for _, participant := range in.participants {
in.committee.On("IdentityByState", mock.Anything, participant.Identity()).Return(participant, nil)
in.committee.On("IdentityByRank", mock.Anything, participant.Identity()).Return(participant, nil)
}
in.committee.On("Self").Return(in.localID)
in.committee.On("LeaderForRank", mock.Anything).Return(
func(rank uint64) models.Identity {
return in.participants[int(rank)%len(in.participants)].Identity()
}, nil,
)
in.committee.On("QuorumThresholdForRank", mock.Anything).Return(uint64(len(in.participants)*2000/3), nil)
in.committee.On("TimeoutThresholdForRank", mock.Anything).Return(uint64(len(in.participants)*2000/3), nil)
// program the builder module behaviour
in.builder.On("ProveNextState", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
func(ctx context.Context, rank uint64, filter []byte, parentID models.Identity) **helper.TestState {
in.updatingStates.Lock()
defer in.updatingStates.Unlock()
_, ok := in.headers[parentID]
if !ok {
return nil
}
s := &helper.TestState{
Rank: rank,
Signature: []byte{},
Timestamp: uint64(time.Now().UnixMilli()),
ID: helper.MakeIdentity(),
Prover: in.localID,
}
return &s
},
func(ctx context.Context, rank uint64, filter []byte, parentID models.Identity) error {
in.updatingStates.RLock()
_, ok := in.headers[parentID]
in.updatingStates.RUnlock()
if !ok {
return fmt.Errorf("parent state not found (parent: %x)", parentID)
}
return nil
},
)
// check on stop condition, stop the tests as soon as entering a certain rank
in.persist.On("PutConsensusState", mock.Anything).Return(nil)
in.persist.On("PutLivenessState", mock.Anything).Return(nil)
// program the hotstuff signer behaviour
in.signer.On("CreateVote", mock.Anything).Return(
func(state *models.State[*helper.TestState]) **helper.TestVote {
vote := &helper.TestVote{
Rank: state.Rank,
StateID: state.Identifier,
ID: in.localID,
Signature: make([]byte, 74),
}
return &vote
},
nil,
)
in.signer.On("CreateTimeout", mock.Anything, mock.Anything, mock.Anything).Return(
func(curRank uint64, newestQC models.QuorumCertificate, previousRankTimeoutCert models.TimeoutCertificate) *models.TimeoutState[*helper.TestVote] {
v := &helper.TestVote{
Rank: curRank,
Signature: make([]byte, 74),
Timestamp: uint64(time.Now().UnixMilli()),
ID: in.localID,
}
timeoutState := &models.TimeoutState[*helper.TestVote]{
Rank: curRank,
LatestQuorumCertificate: newestQC,
PriorRankTimeoutCertificate: previousRankTimeoutCert,
Vote: &v,
}
return timeoutState
},
nil,
)
in.signer.On("CreateQuorumCertificate", mock.Anything).Return(
func(votes []*helper.TestVote) models.QuorumCertificate {
voterIDs := make([]models.Identity, 0, len(votes))
bitmask := []byte{0, 0}
for i, vote := range votes {
bitmask[i/8] |= 1 << (i % 8)
voterIDs = append(voterIDs, vote.ID)
}
qc := &helper.TestQuorumCertificate{
Rank: votes[0].Rank,
FrameNumber: votes[0].Rank,
Selector: votes[0].StateID,
Timestamp: uint64(time.Now().UnixMilli()),
AggregatedSignature: &helper.TestAggregatedSignature{
Signature: make([]byte, 74),
Bitmask: bitmask,
PublicKey: make([]byte, 585),
},
}
return qc
},
nil,
)
// program the hotstuff verifier behaviour
in.verifier.On("VerifyVote", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
in.verifier.On("VerifyQuorumCertificate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
in.verifier.On("VerifyTimeoutCertificate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
// program the hotstuff communicator behaviour
in.notifier.CommunicatorConsumer.On("OnOwnProposal", mock.Anything, mock.Anything).Run(
func(args mock.Arguments) {
proposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
require.True(t, ok)
// sender should always have the parent
in.updatingStates.RLock()
_, exists := in.headers[proposal.State.ParentQuorumCertificate.Identity()]
in.updatingStates.RUnlock()
if !exists {
t.Fatalf("parent for proposal not found parent: %x", proposal.State.ParentQuorumCertificate.Identity())
}
// store locally and loop back to engine for processing
in.ProcessState(proposal)
},
)
in.notifier.CommunicatorConsumer.On("OnOwnTimeout", mock.Anything).Run(func(args mock.Arguments) {
timeoutState, ok := args[0].(*models.TimeoutState[*helper.TestVote])
require.True(t, ok)
in.queue <- timeoutState
},
)
// in case of single node setup we should just forward vote to our own node
// for multi-node setup this method will be overridden
in.notifier.CommunicatorConsumer.On("OnOwnVote", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
vote, ok := args[0].(**helper.TestVote)
require.True(t, ok)
in.queue <- *vote
})
// program the finalizer module behaviour
in.finalizer.On("MakeFinal", mock.Anything).Return(
func(stateID models.Identity) error {
// as we don't use mocks to assert expectations, but only to
// simulate behaviour, we should drop the call data regularly
in.updatingStates.RLock()
state, found := in.headers[stateID]
in.updatingStates.RUnlock()
if !found {
return fmt.Errorf("can't broadcast with unknown parent")
}
if state.Rank%100 == 0 {
in.committee.Calls = nil
in.builder.Calls = nil
in.signer.Calls = nil
in.verifier.Calls = nil
in.notifier.CommunicatorConsumer.Calls = nil
in.finalizer.Calls = nil
}
return nil
},
)
// initialize error handling and logging
var err error
notifier := pubsub.NewDistributor[*helper.TestState, *helper.TestVote]()
notifier.AddConsumer(in.notifier)
logConsumer := notifications.NewLogConsumer[*helper.TestState, *helper.TestVote](in.logger)
notifier.AddConsumer(logConsumer)
// initialize the finalizer
var rootState *models.State[*helper.TestState]
if cfg.Root.ParentQuorumCertificate != nil {
rootState = models.StateFrom(cfg.Root.State, cfg.Root.ParentQuorumCertificate)
} else {
rootState = models.GenesisStateFrom(cfg.Root.State)
}
rootQC := &helper.TestQuorumCertificate{
Rank: rootState.Rank,
FrameNumber: rootState.Rank,
Selector: rootState.Identifier,
Timestamp: uint64(time.Now().UnixMilli()),
AggregatedSignature: &helper.TestAggregatedSignature{
Signature: make([]byte, 74),
Bitmask: []byte{0b11111111, 0b00000000},
PublicKey: make([]byte, 585),
},
}
certifiedRootState, err := models.NewCertifiedState(rootState, rootQC)
require.NoError(t, err)
livenessData := &models.LivenessState{
CurrentRank: rootQC.Rank + 1,
LatestQuorumCertificate: rootQC,
}
in.persist.On("GetLivenessState", mock.Anything).Return(livenessData, nil).Once()
// initialize the pacemaker
controller := timeout.NewController(cfg.Timeouts)
in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](nil, controller, pacemaker.NoProposalDelay(), notifier, in.persist, in.logger)
require.NoError(t, err)
// initialize the forks handler
in.forks, err = forks.NewForks(certifiedRootState, in.finalizer, notifier)
require.NoError(t, err)
// initialize the validator
in.validator = validator.NewValidator[*helper.TestState, *helper.TestVote](in.committee, in.verifier)
packer := &mocks.Packer{}
packer.On("Pack", mock.Anything, mock.Anything).Return(
func(rank uint64, sig *consensus.StateSignatureData) ([]byte, []byte, error) {
indices := []byte{0, 0}
for i := range sig.Signers {
indices[i/8] |= 1 << (i % 8)
}
return indices, make([]byte, 74), nil
},
).Maybe()
onQCCreated := func(qc models.QuorumCertificate) {
in.queue <- qc
}
voteProcessorFactory := mocks.NewVoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)
voteProcessorFactory.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
func(tracer consensus.TraceLogger, filter []byte, proposal *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] {
processor, err := votecollector.NewBootstrapVoteProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer](
in.logger,
filter,
in.committee,
proposal.State,
onQCCreated,
[]byte{},
aggregator,
in.voting,
)
require.NoError(t, err)
vote, err := proposal.ProposerVote()
require.NoError(t, err)
err = processor.Process(vote)
if err != nil {
t.Fatalf("invalid vote for own proposal: %v", err)
}
return processor
}, nil).Maybe()
in.voting.On("FinalizeQuorumCertificate", mock.Anything, mock.Anything, mock.Anything).Return(
func(
ctx context.Context,
state *models.State[*helper.TestState],
aggregatedSignature models.AggregatedSignature,
) (models.QuorumCertificate, error) {
return &helper.TestQuorumCertificate{
Rank: state.Rank,
Timestamp: state.Timestamp,
FrameNumber: state.Rank,
Selector: state.Identifier,
AggregatedSignature: aggregatedSignature,
}, nil
},
)
in.voting.On("FinalizeTimeout", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
func(ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, latestQuorumCertificateRanks []consensus.TimeoutSignerInfo, aggregatedSignature models.AggregatedSignature) (models.TimeoutCertificate, error) {
ranks := []uint64{}
for _, i := range latestQuorumCertificateRanks {
ranks = append(ranks, i.NewestQCRank)
}
return &helper.TestTimeoutCertificate{
Filter: nil,
Rank: rank,
LatestRanks: ranks,
LatestQuorumCert: latestQuorumCertificate,
AggregatedSignature: aggregatedSignature,
}, nil
},
)
voteAggregationDistributor := pubsub.NewVoteAggregationDistributor[*helper.TestState, *helper.TestVote]()
sigAgg := mocks.NewSignatureAggregator(t)
sigAgg.On("Aggregate", mock.Anything, mock.Anything).Return(
func(publicKeys [][]byte, signatures [][]byte) (models.AggregatedSignature, error) {
bitmask := []byte{0, 0}
for i := range publicKeys {
bitmask[i/8] |= 1 << (i % 8)
}
return &helper.TestAggregatedSignature{
Signature: make([]byte, 74),
Bitmask: bitmask,
PublicKey: make([]byte, 585),
}, nil
}).Maybe()
sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Maybe()
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, []byte{}, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg, in.voting)
voteCollectors := voteaggregator.NewVoteCollectors[*helper.TestState, *helper.TestVote](in.logger, livenessData.CurrentRank, workerpool.New(2), createCollectorFactoryMethod)
// initialize the vote aggregator
in.voteAggregator, err = voteaggregator.NewVoteAggregator[*helper.TestState, *helper.TestVote](
in.logger,
voteAggregationDistributor,
livenessData.CurrentRank,
voteCollectors,
)
require.NoError(t, err)
// initialize factories for timeout collector and timeout processor
timeoutAggregationDistributor := pubsub.NewTimeoutAggregationDistributor[*helper.TestVote]()
timeoutProcessorFactory := mocks.NewTimeoutProcessorFactory[*helper.TestVote](t)
timeoutProcessorFactory.On("Create", mock.Anything).Return(
func(rank uint64) consensus.TimeoutProcessor[*helper.TestVote] {
// mock signature aggregator which doesn't perform any crypto operations and just tracks total weight
aggregator := &mocks.TimeoutSignatureAggregator{}
totalWeight := atomic.NewUint64(0)
newestRank := counters.NewMonotonicCounter(0)
bits := counters.NewMonotonicCounter(0)
aggregator.On("Rank").Return(rank).Maybe()
aggregator.On("TotalWeight").Return(func() uint64 {
return totalWeight.Load()
}).Maybe()
aggregator.On("VerifyAndAdd", mock.Anything, mock.Anything, mock.Anything).Return(
func(signerID models.Identity, _ []byte, newestQCRank uint64) uint64 {
newestRank.Set(newestQCRank)
var signer models.WeightedIdentity
for _, p := range in.participants {
if p.Identity() == signerID {
signer = p
}
}
require.NotNil(t, signer)
bits.Increment()
return totalWeight.Add(signer.Weight())
}, nil,
).Maybe()
aggregator.On("Aggregate").Return(
func() []consensus.TimeoutSignerInfo {
signersData := make([]consensus.TimeoutSignerInfo, 0, len(in.participants))
newestQCRank := newestRank.Value()
for _, signer := range in.participants {
signersData = append(signersData, consensus.TimeoutSignerInfo{
NewestQCRank: newestQCRank,
Signer: signer.Identity(),
})
}
return signersData
},
func() models.AggregatedSignature {
bitCount := bits.Value()
bitmask := []byte{0, 0}
for i := range bitCount {
pos := i / 8
bitmask[pos] |= 1 << (i % 8)
}
return &helper.TestAggregatedSignature{
Signature: make([]byte, 74),
Bitmask: bitmask,
PublicKey: make([]byte, 585),
}
},
nil,
).Maybe()
p, err := timeoutcollector.NewTimeoutProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer](
in.logger,
in.committee,
in.validator,
aggregator,
timeoutAggregationDistributor,
in.voting,
)
require.NoError(t, err)
return p
}, nil).Maybe()
timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
in.logger,
timeoutAggregationDistributor,
timeoutProcessorFactory,
)
timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(
in.logger,
livenessData.CurrentRank,
timeoutCollectorFactory,
)
// initialize the timeout aggregator
in.timeoutAggregator, err = timeoutaggregator.NewTimeoutAggregator(
in.logger,
livenessData.CurrentRank,
timeoutCollectors,
)
require.NoError(t, err)
safetyData := &models.ConsensusState[*helper.TestVote]{
FinalizedRank: rootState.Rank,
LatestAcknowledgedRank: rootState.Rank,
}
in.persist.On("GetConsensusState", mock.Anything).Return(safetyData, nil).Once()
// initialize the safety rules
in.safetyRules, err = safetyrules.NewSafetyRules(nil, in.signer, in.persist, in.committee)
require.NoError(t, err)
// initialize the state producer
in.producer, err = stateproducer.NewStateProducer[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected](in.safetyRules, in.committee, in.builder)
require.NoError(t, err)
// initialize the event handler
in.handler, err = eventhandler.NewEventHandler[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected](
in.pacemaker,
in.producer,
in.forks,
in.persist,
in.committee,
in.safetyRules,
notifier,
in.logger,
)
require.NoError(t, err)
timeoutAggregationDistributor.AddTimeoutCollectorConsumer(logConsumer)
timeoutAggregationDistributor.AddTimeoutCollectorConsumer(&in)
voteAggregationDistributor.AddVoteCollectorConsumer(logConsumer)
return &in
}
func (in *Instance) Run(t *testing.T) error {
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
<-lifecycle.AllDone(in.voteAggregator, in.timeoutAggregator)
}()
signalerCtx := unittest.NewMockSignalerContext(t, ctx)
in.voteAggregator.Start(signalerCtx)
in.timeoutAggregator.Start(signalerCtx)
<-lifecycle.AllReady(in.voteAggregator, in.timeoutAggregator)
// start the event handler
err := in.handler.Start(ctx)
if err != nil {
return fmt.Errorf("could not start event handler: %w", err)
}
// run until an error or stop condition is reached
for {
// check on stop conditions
if in.stop(in) {
return errStopCondition
}
// we handle timeouts with priority
select {
case <-in.handler.TimeoutChannel():
err := in.handler.OnLocalTimeout()
if err != nil {
panic(fmt.Errorf("could not process timeout: %w", err))
}
default:
}
// check on stop conditions
if in.stop(in) {
return errStopCondition
}
// otherwise, process first received event
select {
case <-in.handler.TimeoutChannel():
err := in.handler.OnLocalTimeout()
if err != nil {
return fmt.Errorf("could not process timeout: %w", err)
}
case msg := <-in.queue:
switch m := msg.(type) {
case *models.SignedProposal[*helper.TestState, *helper.TestVote]:
// add state to aggregator
in.voteAggregator.AddState(m)
// then pass to event handler
err := in.handler.OnReceiveProposal(m)
if err != nil {
return fmt.Errorf("could not process proposal: %w", err)
}
case *helper.TestVote:
in.voteAggregator.AddVote(&m)
case *models.TimeoutState[*helper.TestVote]:
in.timeoutAggregator.AddTimeout(m)
case models.QuorumCertificate:
err := in.handler.OnReceiveQuorumCertificate(m)
if err != nil {
return fmt.Errorf("could not process received QC: %w", err)
}
case models.TimeoutCertificate:
err := in.handler.OnReceiveTimeoutCertificate(m)
if err != nil {
return fmt.Errorf("could not process received TC: %w", err)
}
case *consensus.PartialTimeoutCertificateCreated:
err := in.handler.OnPartialTimeoutCertificateCreated(m)
if err != nil {
return fmt.Errorf("could not process partial TC: %w", err)
}
default:
fmt.Printf("unhandled queue event: %s\n", reflect.ValueOf(msg).Type().String())
}
}
}
}
func (in *Instance) ProcessState(proposal *models.SignedProposal[*helper.TestState, *helper.TestVote]) {
in.updatingStates.Lock()
defer in.updatingStates.Unlock()
_, parentExists := in.headers[proposal.State.ParentQuorumCertificate.Identity()]
if parentExists {
next := proposal
for next != nil {
in.headers[next.State.Identifier] = next.State
in.queue <- next
// keep processing the pending states
next = in.pendings[next.State.ParentQuorumCertificate.Identity()]
}
} else {
// cache it in pendings by ParentID
in.pendings[proposal.State.ParentQuorumCertificate.Identity()] = proposal
}
}
func (in *Instance) OnTimeoutCertificateConstructedFromTimeouts(tc models.TimeoutCertificate) {
in.queue <- tc
}
func (in *Instance) OnPartialTimeoutCertificateCreated(rank uint64, newestQC models.QuorumCertificate, previousRankTimeoutCert models.TimeoutCertificate) {
in.queue <- &consensus.PartialTimeoutCertificateCreated{
Rank: rank,
NewestQuorumCertificate: newestQC,
PriorRankTimeoutCertificate: previousRankTimeoutCert,
}
}
func (in *Instance) OnNewQuorumCertificateDiscovered(qc models.QuorumCertificate) {
in.queue <- qc
}
func (in *Instance) OnNewTimeoutCertificateDiscovered(tc models.TimeoutCertificate) {
in.queue <- tc
}
func (in *Instance) OnTimeoutProcessed(*models.TimeoutState[*helper.TestVote]) {
}