package integration

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"testing"
	"time"

	"github.com/gammazero/workerpool"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"

	"source.quilibrium.com/quilibrium/monorepo/consensus"
	"source.quilibrium.com/quilibrium/monorepo/consensus/counters"
	"source.quilibrium.com/quilibrium/monorepo/consensus/eventhandler"
	"source.quilibrium.com/quilibrium/monorepo/consensus/forks"
	"source.quilibrium.com/quilibrium/monorepo/consensus/helper"
	"source.quilibrium.com/quilibrium/monorepo/consensus/mocks"
	"source.quilibrium.com/quilibrium/monorepo/consensus/models"
	"source.quilibrium.com/quilibrium/monorepo/consensus/notifications"
	"source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub"
	"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker"
	"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker/timeout"
	"source.quilibrium.com/quilibrium/monorepo/consensus/safetyrules"
	"source.quilibrium.com/quilibrium/monorepo/consensus/stateproducer"
	"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutaggregator"
	"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutcollector"
	"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
	"source.quilibrium.com/quilibrium/monorepo/consensus/voteaggregator"
	"source.quilibrium.com/quilibrium/monorepo/consensus/votecollector"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle/unittest"
)

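// Instance wires a single consensus participant for integration testing: its
// configuration parameters, the mocked dependencies that simulate the environment,
// the real consensus components under test, and the queue connecting them.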
type Instance struct {

	// instance parameters
	logger              consensus.TraceLogger
	participants        []models.WeightedIdentity
	localID             models.Identity
	dropVoteIn          VoteFilter
	dropVoteOut         VoteFilter
	dropPropIn          ProposalFilter
	dropPropOut         ProposalFilter
	dropTimeoutStateIn  TimeoutStateFilter
	dropTimeoutStateOut TimeoutStateFilter
	stop                Condition

	// instance data
	queue          chan interface{}
	updatingStates sync.RWMutex
	headers        map[models.Identity]*models.State[*helper.TestState]
	pendings       map[models.Identity]*models.SignedProposal[*helper.TestState, *helper.TestVote] // indexed by parent ID

	// mocked dependencies
	committee *mocks.DynamicCommittee
	builder   *mocks.LeaderProvider[*helper.TestState, *helper.TestPeer, *helper.TestCollected]
	finalizer *mocks.Finalizer
	persist   *mocks.ConsensusStore[*helper.TestVote]
	signer    *mocks.Signer[*helper.TestState, *helper.TestVote]
	verifier  *mocks.Verifier[*helper.TestVote]
	notifier  *MockedCommunicatorConsumer
	voting    *mocks.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]

	// real dependencies
	pacemaker         consensus.Pacemaker
	producer          *stateproducer.StateProducer[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected]
	forks             *forks.Forks[*helper.TestState, *helper.TestVote]
	voteAggregator    *voteaggregator.VoteAggregator[*helper.TestState, *helper.TestVote]
	timeoutAggregator *timeoutaggregator.TimeoutAggregator[*helper.TestVote]
	safetyRules       *safetyrules.SafetyRules[*helper.TestState, *helper.TestVote]
	validator         *validator.Validator[*helper.TestState, *helper.TestVote]

	// main logic
	handler *eventhandler.EventHandler[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected]
}

type MockedCommunicatorConsumer struct {
	notifications.NoopProposalViolationConsumer[*helper.TestState, *helper.TestVote]
	notifications.NoopParticipantConsumer[*helper.TestState, *helper.TestVote]
	notifications.NoopFinalizationConsumer[*helper.TestState]
	*mocks.CommunicatorConsumer[*helper.TestState, *helper.TestVote]
}

func NewMockedCommunicatorConsumer() *MockedCommunicatorConsumer {
	return &MockedCommunicatorConsumer{
		CommunicatorConsumer: &mocks.CommunicatorConsumer[*helper.TestState, *helper.TestVote]{},
	}
}

var _ consensus.Consumer[*helper.TestState, *helper.TestVote] = (*MockedCommunicatorConsumer)(nil)
var _ consensus.TimeoutCollectorConsumer[*helper.TestVote] = (*Instance)(nil)

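// NewInstance assembles one consensus participant with a default single-node
// configuration, applies the given options, and wires mocked committee, builder,
// signer, verifier, persister, and communicator behaviour around the real
// pacemaker, forks, aggregators, safety rules, state producer, and event handler.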
func NewInstance(t *testing.T, options ...Option) *Instance {

	// generate random default identity
	identity := helper.MakeIdentity()

	// initialize the default configuration
	cfg := Config{
		Logger: helper.Logger(),
		Root:   DefaultRoot(),
		Participants: []models.WeightedIdentity{&helper.TestWeightedIdentity{
			ID: identity,
		}},
		LocalID:               identity,
		Timeouts:              timeout.DefaultConfig,
		IncomingVotes:         DropNoVotes,
		OutgoingVotes:         DropNoVotes,
		IncomingProposals:     DropNoProposals,
		OutgoingProposals:     DropNoProposals,
		IncomingTimeoutStates: DropNoTimeoutStates,
		OutgoingTimeoutStates: DropNoTimeoutStates,
		StopCondition:         RightAway,
	}

	// apply the custom options
	for _, option := range options {
		option(&cfg)
	}

	// check the local ID is a participant
	takesPart := false
	for _, participant := range cfg.Participants {
		if participant.Identity() == cfg.LocalID {
			takesPart = true
			break
		}
	}
	require.True(t, takesPart)

	// initialize the instance
	in := Instance{

		// instance parameters
		logger:              cfg.Logger,
		participants:        cfg.Participants,
		localID:             cfg.LocalID,
		dropVoteIn:          cfg.IncomingVotes,
		dropVoteOut:         cfg.OutgoingVotes,
		dropPropIn:          cfg.IncomingProposals,
		dropPropOut:         cfg.OutgoingProposals,
		dropTimeoutStateIn:  cfg.IncomingTimeoutStates,
		dropTimeoutStateOut: cfg.OutgoingTimeoutStates,
		stop:                cfg.StopCondition,

		// instance data
		pendings: make(map[models.Identity]*models.SignedProposal[*helper.TestState, *helper.TestVote]),
		headers:  make(map[models.Identity]*models.State[*helper.TestState]),
		queue:    make(chan interface{}, 1024),

		// instance mocks
		committee: &mocks.DynamicCommittee{},
		builder:   &mocks.LeaderProvider[*helper.TestState, *helper.TestPeer, *helper.TestCollected]{},
		persist:   &mocks.ConsensusStore[*helper.TestVote]{},
		signer:    &mocks.Signer[*helper.TestState, *helper.TestVote]{},
		verifier:  &mocks.Verifier[*helper.TestVote]{},
		notifier:  NewMockedCommunicatorConsumer(),
		finalizer: &mocks.Finalizer{},
		voting:    &mocks.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]{},
	}

	// insert root state into headers register
	in.headers[cfg.Root.Identifier] = cfg.Root

	// program the hotstuff committee state
	in.committee.On("IdentitiesByRank", mock.Anything).Return(
		func(_ uint64) []models.WeightedIdentity {
			return in.participants
		},
		nil,
	)
	in.committee.On("IdentitiesByState", mock.Anything).Return(
		func(_ models.Identity) []models.WeightedIdentity {
			return in.participants
		},
		nil,
	)
	for _, participant := range in.participants {
		in.committee.On("IdentityByState", mock.Anything, participant.Identity()).Return(participant, nil)
		in.committee.On("IdentityByRank", mock.Anything, participant.Identity()).Return(participant, nil)
	}
	in.committee.On("Self").Return(in.localID)
	in.committee.On("LeaderForRank", mock.Anything).Return(
		func(rank uint64) models.Identity {
			return in.participants[int(rank)%len(in.participants)].Identity()
		}, nil,
	)
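	// the thresholds below are two thirds of the committee's total weight, assuming
	// every test participant carries a weight of 1000 (n*2000/3 == 2/3 of n*1000)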
in.committee.On("QuorumThresholdForRank", mock.Anything).Return(uint64(len(in.participants)*2000/3), nil)
|
|
in.committee.On("TimeoutThresholdForRank", mock.Anything).Return(uint64(len(in.participants)*2000/3), nil)
|
|
|
|
// program the builder module behaviour
|
|
in.builder.On("ProveNextState", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
|
|
func(ctx context.Context, rank uint64, filter []byte, parentID models.Identity) **helper.TestState {
|
|
in.updatingStates.Lock()
|
|
defer in.updatingStates.Unlock()
|
|
|
|
_, ok := in.headers[parentID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
s := &helper.TestState{
|
|
Rank: rank,
|
|
Signature: []byte{},
|
|
Timestamp: uint64(time.Now().UnixMilli()),
|
|
ID: helper.MakeIdentity(),
|
|
Prover: in.localID,
|
|
}
|
|
return &s
|
|
},
|
|
func(ctx context.Context, rank uint64, filter []byte, parentID models.Identity) error {
|
|
in.updatingStates.RLock()
|
|
_, ok := in.headers[parentID]
|
|
in.updatingStates.RUnlock()
|
|
if !ok {
|
|
return fmt.Errorf("parent state not found (parent: %x)", parentID)
|
|
}
|
|
return nil
|
|
},
|
|
)
|
|
|
|
	// program the persister behaviour
	in.persist.On("PutConsensusState", mock.Anything).Return(nil)
	in.persist.On("PutLivenessState", mock.Anything).Return(nil)

	// program the hotstuff signer behaviour
	in.signer.On("CreateVote", mock.Anything).Return(
		func(state *models.State[*helper.TestState]) **helper.TestVote {
			vote := &helper.TestVote{
				Rank:      state.Rank,
				StateID:   state.Identifier,
				ID:        in.localID,
				Signature: make([]byte, 74),
			}
			return &vote
		},
		nil,
	)
	in.signer.On("CreateTimeout", mock.Anything, mock.Anything, mock.Anything).Return(
		func(curRank uint64, newestQC models.QuorumCertificate, previousRankTimeoutCert models.TimeoutCertificate) *models.TimeoutState[*helper.TestVote] {
			v := &helper.TestVote{
				Rank:      curRank,
				Signature: make([]byte, 74),
				Timestamp: uint64(time.Now().UnixMilli()),
				ID:        in.localID,
			}
			timeoutState := &models.TimeoutState[*helper.TestVote]{
				Rank:                        curRank,
				LatestQuorumCertificate:     newestQC,
				PriorRankTimeoutCertificate: previousRankTimeoutCert,
				Vote:                        &v,
			}
			return timeoutState
		},
		nil,
	)
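	// the mocked QC carries a dummy aggregated signature: one bit per voter in a
	// two-byte bitmask (at most 16 signers) plus fixed-size placeholder signature
	// and public key material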
in.signer.On("CreateQuorumCertificate", mock.Anything).Return(
|
|
func(votes []*helper.TestVote) models.QuorumCertificate {
|
|
voterIDs := make([]models.Identity, 0, len(votes))
|
|
bitmask := []byte{0, 0}
|
|
for i, vote := range votes {
|
|
bitmask[i/8] |= 1 << (i % 8)
|
|
voterIDs = append(voterIDs, vote.ID)
|
|
}
|
|
|
|
qc := &helper.TestQuorumCertificate{
|
|
Rank: votes[0].Rank,
|
|
FrameNumber: votes[0].Rank,
|
|
Selector: votes[0].StateID,
|
|
Timestamp: uint64(time.Now().UnixMilli()),
|
|
AggregatedSignature: &helper.TestAggregatedSignature{
|
|
Signature: make([]byte, 74),
|
|
Bitmask: bitmask,
|
|
PublicKey: make([]byte, 585),
|
|
},
|
|
}
|
|
return qc
|
|
},
|
|
nil,
|
|
)
|
|
|
|
	// program the hotstuff verifier behaviour
	in.verifier.On("VerifyVote", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
	in.verifier.On("VerifyQuorumCertificate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
	in.verifier.On("VerifyTimeoutCertificate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

	// program the hotstuff communicator behaviour
	in.notifier.CommunicatorConsumer.On("OnOwnProposal", mock.Anything, mock.Anything).Run(
		func(args mock.Arguments) {
			proposal, ok := args[0].(*models.SignedProposal[*helper.TestState, *helper.TestVote])
			require.True(t, ok)

			// the sender should always have the parent
			in.updatingStates.RLock()
			_, exists := in.headers[proposal.State.ParentQuorumCertificate.Identity()]
			in.updatingStates.RUnlock()

			if !exists {
				t.Fatalf("parent for proposal not found (parent: %x)", proposal.State.ParentQuorumCertificate.Identity())
			}

			// store locally and loop back to engine for processing
			in.ProcessState(proposal)
		},
	)
	in.notifier.CommunicatorConsumer.On("OnOwnTimeout", mock.Anything).Run(func(args mock.Arguments) {
		timeoutState, ok := args[0].(*models.TimeoutState[*helper.TestVote])
		require.True(t, ok)
		in.queue <- timeoutState
	})
	// in a single-node setup we simply forward the vote to our own node;
	// for a multi-node setup this method will be overridden
	in.notifier.CommunicatorConsumer.On("OnOwnVote", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		vote, ok := args[0].(**helper.TestVote)
		require.True(t, ok)
		in.queue <- *vote
	})

	// program the finalizer module behaviour
	in.finalizer.On("MakeFinal", mock.Anything).Return(
		func(stateID models.Identity) error {

			// as we don't use mocks to assert expectations, but only to
			// simulate behaviour, we should drop the call data regularly
			in.updatingStates.RLock()
			state, found := in.headers[stateID]
			in.updatingStates.RUnlock()
			if !found {
				return fmt.Errorf("can't broadcast with unknown parent")
			}
			if state.Rank%100 == 0 {
				in.committee.Calls = nil
				in.builder.Calls = nil
				in.signer.Calls = nil
				in.verifier.Calls = nil
				in.notifier.CommunicatorConsumer.Calls = nil
				in.finalizer.Calls = nil
			}

			return nil
		},
	)

	// initialize error handling and logging
	var err error

	notifier := pubsub.NewDistributor[*helper.TestState, *helper.TestVote]()
	notifier.AddConsumer(in.notifier)
	logConsumer := notifications.NewLogConsumer[*helper.TestState, *helper.TestVote](in.logger)
	notifier.AddConsumer(logConsumer)

	// construct the certified root state
	var rootState *models.State[*helper.TestState]
	if cfg.Root.ParentQuorumCertificate != nil {
		rootState = models.StateFrom(cfg.Root.State, cfg.Root.ParentQuorumCertificate)
	} else {
		rootState = models.GenesisStateFrom(cfg.Root.State)
	}

	rootQC := &helper.TestQuorumCertificate{
		Rank:        rootState.Rank,
		FrameNumber: rootState.Rank,
		Selector:    rootState.Identifier,
		Timestamp:   uint64(time.Now().UnixMilli()),
		AggregatedSignature: &helper.TestAggregatedSignature{
			Signature: make([]byte, 74),
			Bitmask:   []byte{0b11111111, 0b00000000},
			PublicKey: make([]byte, 585),
		},
	}
	certifiedRootState, err := models.NewCertifiedState(rootState, rootQC)
	require.NoError(t, err)

	livenessData := &models.LivenessState{
		CurrentRank:             rootQC.Rank + 1,
		LatestQuorumCertificate: rootQC,
	}

	in.persist.On("GetLivenessState", mock.Anything).Return(livenessData, nil).Once()

	// initialize the pacemaker
	controller := timeout.NewController(cfg.Timeouts)
	in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](nil, controller, pacemaker.NoProposalDelay(), notifier, in.persist, in.logger)
	require.NoError(t, err)

	// initialize the forks handler
	in.forks, err = forks.NewForks(certifiedRootState, in.finalizer, notifier)
	require.NoError(t, err)

	// initialize the validator
	in.validator = validator.NewValidator[*helper.TestState, *helper.TestVote](in.committee, in.verifier)

	packer := &mocks.Packer{}
	packer.On("Pack", mock.Anything, mock.Anything).Return(
		func(rank uint64, sig *consensus.StateSignatureData) ([]byte, []byte, error) {
			indices := []byte{0, 0}
			for i := range sig.Signers {
				indices[i/8] |= 1 << (i % 8)
			}

			return indices, make([]byte, 74), nil
		},
	).Maybe()

	onQCCreated := func(qc models.QuorumCertificate) {
		in.queue <- qc
	}

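	// the vote processor factory creates a real bootstrap vote processor and
	// immediately feeds it the proposer's own vote, so a proposal with an invalid
	// embedded vote fails the test right away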
	voteProcessorFactory := mocks.NewVoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)
	voteProcessorFactory.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
		func(tracer consensus.TraceLogger, filter []byte, proposal *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] {
			processor, err := votecollector.NewBootstrapVoteProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer](
				in.logger,
				filter,
				in.committee,
				proposal.State,
				onQCCreated,
				[]byte{},
				aggregator,
				in.voting,
			)
			require.NoError(t, err)

			vote, err := proposal.ProposerVote()
			require.NoError(t, err)

			err = processor.Process(vote)
			if err != nil {
				t.Fatalf("invalid vote for own proposal: %v", err)
			}
			return processor
		}, nil).Maybe()
	in.voting.On("FinalizeQuorumCertificate", mock.Anything, mock.Anything, mock.Anything).Return(
		func(
			ctx context.Context,
			state *models.State[*helper.TestState],
			aggregatedSignature models.AggregatedSignature,
		) (models.QuorumCertificate, error) {
			return &helper.TestQuorumCertificate{
				Rank:                state.Rank,
				Timestamp:           state.Timestamp,
				FrameNumber:         state.Rank,
				Selector:            state.Identifier,
				AggregatedSignature: aggregatedSignature,
			}, nil
		},
	)
	in.voting.On("FinalizeTimeout", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
		func(ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, latestQuorumCertificateRanks []consensus.TimeoutSignerInfo, aggregatedSignature models.AggregatedSignature) (models.TimeoutCertificate, error) {
			ranks := []uint64{}
			for _, i := range latestQuorumCertificateRanks {
				ranks = append(ranks, i.NewestQCRank)
			}
			return &helper.TestTimeoutCertificate{
				Filter:              nil,
				Rank:                rank,
				LatestRanks:         ranks,
				LatestQuorumCert:    latestQuorumCertificate,
				AggregatedSignature: aggregatedSignature,
			}, nil
		},
	)

	voteAggregationDistributor := pubsub.NewVoteAggregationDistributor[*helper.TestState, *helper.TestVote]()
	sigAgg := mocks.NewSignatureAggregator(t)
	sigAgg.On("Aggregate", mock.Anything, mock.Anything).Return(
		func(publicKeys [][]byte, signatures [][]byte) (models.AggregatedSignature, error) {
			bitmask := []byte{0, 0}
			for i := range publicKeys {
				bitmask[i/8] |= 1 << (i % 8)
			}
			return &helper.TestAggregatedSignature{
				Signature: make([]byte, 74),
				Bitmask:   bitmask,
				PublicKey: make([]byte, 585),
			}, nil
		}).Maybe()
	sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Maybe()
	createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, []byte{}, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg, in.voting)
	voteCollectors := voteaggregator.NewVoteCollectors[*helper.TestState, *helper.TestVote](in.logger, livenessData.CurrentRank, workerpool.New(2), createCollectorFactoryMethod)

	// initialize the vote aggregator
	in.voteAggregator, err = voteaggregator.NewVoteAggregator[*helper.TestState, *helper.TestVote](
		in.logger,
		voteAggregationDistributor,
		livenessData.CurrentRank,
		voteCollectors,
	)
	require.NoError(t, err)

	// initialize factories for timeout collector and timeout processor
	timeoutAggregationDistributor := pubsub.NewTimeoutAggregationDistributor[*helper.TestVote]()
	timeoutProcessorFactory := mocks.NewTimeoutProcessorFactory[*helper.TestVote](t)
	timeoutProcessorFactory.On("Create", mock.Anything).Return(
		func(rank uint64) consensus.TimeoutProcessor[*helper.TestVote] {
			// mock signature aggregator which doesn't perform any crypto operations and just tracks total weight
			aggregator := &mocks.TimeoutSignatureAggregator{}
			totalWeight := atomic.NewUint64(0)
			newestRank := counters.NewMonotonicCounter(0)
			bits := counters.NewMonotonicCounter(0)
			aggregator.On("Rank").Return(rank).Maybe()
			aggregator.On("TotalWeight").Return(func() uint64 {
				return totalWeight.Load()
			}).Maybe()
			aggregator.On("VerifyAndAdd", mock.Anything, mock.Anything, mock.Anything).Return(
				func(signerID models.Identity, _ []byte, newestQCRank uint64) uint64 {
					newestRank.Set(newestQCRank)
					var signer models.WeightedIdentity
					for _, p := range in.participants {
						if p.Identity() == signerID {
							signer = p
						}
					}
					require.NotNil(t, signer)
					bits.Increment()
					return totalWeight.Add(signer.Weight())
				}, nil,
			).Maybe()
			aggregator.On("Aggregate").Return(
				func() []consensus.TimeoutSignerInfo {
					signersData := make([]consensus.TimeoutSignerInfo, 0, len(in.participants))
					newestQCRank := newestRank.Value()
					for _, signer := range in.participants {
						signersData = append(signersData, consensus.TimeoutSignerInfo{
							NewestQCRank: newestQCRank,
							Signer:       signer.Identity(),
						})
					}
					return signersData
				},
				func() models.AggregatedSignature {
					bitCount := bits.Value()
					bitmask := []byte{0, 0}
					for i := range bitCount {
						pos := i / 8
						bitmask[pos] |= 1 << (i % 8)
					}
					return &helper.TestAggregatedSignature{
						Signature: make([]byte, 74),
						Bitmask:   bitmask,
						PublicKey: make([]byte, 585),
					}
				},
				nil,
			).Maybe()

			p, err := timeoutcollector.NewTimeoutProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer](
				in.logger,
				in.committee,
				in.validator,
				aggregator,
				timeoutAggregationDistributor,
				in.voting,
			)
			require.NoError(t, err)
			return p
		}, nil).Maybe()
	timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
		in.logger,
		timeoutAggregationDistributor,
		timeoutProcessorFactory,
	)
	timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(
		in.logger,
		livenessData.CurrentRank,
		timeoutCollectorFactory,
	)

	// initialize the timeout aggregator
	in.timeoutAggregator, err = timeoutaggregator.NewTimeoutAggregator(
		in.logger,
		livenessData.CurrentRank,
		timeoutCollectors,
	)
	require.NoError(t, err)

	safetyData := &models.ConsensusState[*helper.TestVote]{
		FinalizedRank:          rootState.Rank,
		LatestAcknowledgedRank: rootState.Rank,
	}
	in.persist.On("GetConsensusState", mock.Anything).Return(safetyData, nil).Once()

	// initialize the safety rules
	in.safetyRules, err = safetyrules.NewSafetyRules(nil, in.signer, in.persist, in.committee)
	require.NoError(t, err)

	// initialize the state producer
	in.producer, err = stateproducer.NewStateProducer[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected](in.safetyRules, in.committee, in.builder)
	require.NoError(t, err)

	// initialize the event handler
	in.handler, err = eventhandler.NewEventHandler[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected](
		in.pacemaker,
		in.producer,
		in.forks,
		in.persist,
		in.committee,
		in.safetyRules,
		notifier,
		in.logger,
	)
	require.NoError(t, err)

	timeoutAggregationDistributor.AddTimeoutCollectorConsumer(logConsumer)
	timeoutAggregationDistributor.AddTimeoutCollectorConsumer(&in)
	voteAggregationDistributor.AddVoteCollectorConsumer(logConsumer)

	return &in
}

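// Run starts the vote and timeout aggregators plus the event handler, then drives
// the instance's event loop: local timeouts are drained with priority, and queued
// proposals, votes, timeout states, QCs, TCs, and partial TCs are dispatched to
// the appropriate components. It returns errStopCondition once the configured
// stop condition is reached.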
func (in *Instance) Run(t *testing.T) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer func() {
		cancel()
		<-lifecycle.AllDone(in.voteAggregator, in.timeoutAggregator)
	}()
	signalerCtx := unittest.NewMockSignalerContext(t, ctx)
	in.voteAggregator.Start(signalerCtx)
	in.timeoutAggregator.Start(signalerCtx)
	<-lifecycle.AllReady(in.voteAggregator, in.timeoutAggregator)

	// start the event handler
	err := in.handler.Start(ctx)
	if err != nil {
		return fmt.Errorf("could not start event handler: %w", err)
	}

	// run until an error or stop condition is reached
	for {
		// check on stop conditions
		if in.stop(in) {
			return errStopCondition
		}

		// we handle timeouts with priority
		select {
		case <-in.handler.TimeoutChannel():
			err := in.handler.OnLocalTimeout()
			if err != nil {
				panic(fmt.Errorf("could not process timeout: %w", err))
			}
		default:
		}

		// check on stop conditions
		if in.stop(in) {
			return errStopCondition
		}

		// otherwise, process first received event
		select {
		case <-in.handler.TimeoutChannel():
			err := in.handler.OnLocalTimeout()
			if err != nil {
				return fmt.Errorf("could not process timeout: %w", err)
			}
		case msg := <-in.queue:
			switch m := msg.(type) {
			case *models.SignedProposal[*helper.TestState, *helper.TestVote]:
				// add state to aggregator
				in.voteAggregator.AddState(m)
				// then pass to event handler
				err := in.handler.OnReceiveProposal(m)
				if err != nil {
					return fmt.Errorf("could not process proposal: %w", err)
				}
			case *helper.TestVote:
				in.voteAggregator.AddVote(&m)
			case *models.TimeoutState[*helper.TestVote]:
				in.timeoutAggregator.AddTimeout(m)
			case models.QuorumCertificate:
				err := in.handler.OnReceiveQuorumCertificate(m)
				if err != nil {
					return fmt.Errorf("could not process received QC: %w", err)
				}
			case models.TimeoutCertificate:
				err := in.handler.OnReceiveTimeoutCertificate(m)
				if err != nil {
					return fmt.Errorf("could not process received TC: %w", err)
				}
			case *consensus.PartialTimeoutCertificateCreated:
				err := in.handler.OnPartialTimeoutCertificateCreated(m)
				if err != nil {
					return fmt.Errorf("could not process partial TC: %w", err)
				}
			default:
				fmt.Printf("unhandled queue event: %s\n", reflect.ValueOf(msg).Type().String())
			}
		}
	}
}

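// ProcessState records an own proposal in the instance's headers register and loops
// it back onto the processing queue, following the pendings cache for further
// proposals to replay. Proposals whose parent is not yet known are parked in
// pendings, keyed by their parent ID.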
func (in *Instance) ProcessState(proposal *models.SignedProposal[*helper.TestState, *helper.TestVote]) {
	in.updatingStates.Lock()
	defer in.updatingStates.Unlock()
	_, parentExists := in.headers[proposal.State.ParentQuorumCertificate.Identity()]

	if parentExists {
		next := proposal
		for next != nil {
			in.headers[next.State.Identifier] = next.State

			in.queue <- next
			// keep processing the pending states
			next = in.pendings[next.State.ParentQuorumCertificate.Identity()]
		}
	} else {
		// cache it in pendings by ParentID
		in.pendings[proposal.State.ParentQuorumCertificate.Identity()] = proposal
	}
}

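// OnTimeoutCertificateConstructedFromTimeouts and the remaining callbacks below
// satisfy consensus.TimeoutCollectorConsumer by forwarding collector notifications
// onto the instance's processing queue (OnTimeoutProcessed is intentionally a no-op).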
func (in *Instance) OnTimeoutCertificateConstructedFromTimeouts(tc models.TimeoutCertificate) {
	in.queue <- tc
}

func (in *Instance) OnPartialTimeoutCertificateCreated(rank uint64, newestQC models.QuorumCertificate, previousRankTimeoutCert models.TimeoutCertificate) {
	in.queue <- &consensus.PartialTimeoutCertificateCreated{
		Rank:                        rank,
		NewestQuorumCertificate:     newestQC,
		PriorRankTimeoutCertificate: previousRankTimeoutCert,
	}
}

func (in *Instance) OnNewQuorumCertificateDiscovered(qc models.QuorumCertificate) {
	in.queue <- qc
}

func (in *Instance) OnNewTimeoutCertificateDiscovered(tc models.TimeoutCertificate) {
	in.queue <- tc
}

func (in *Instance) OnTimeoutProcessed(*models.TimeoutState[*helper.TestVote]) {
}