From 553ce004c0ff32f0a5a48dcadcddaaae2f08808e Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 30 Oct 2025 05:23:26 -0500 Subject: [PATCH] remaining non-integration tests --- consensus/consensus_committee.go | 36 +- consensus/consensus_safety_rules.go | 2 +- consensus/consensus_timeout.go | 8 +- consensus/eventhandler/event_handler.go | 2 +- consensus/eventhandler/event_handler_test.go | 2 +- consensus/go.mod | 1 + consensus/go.sum | 2 + consensus/helper/state.go | 28 +- consensus/safetyrules/safety_rules.go | 6 +- consensus/safetyrules/safety_rules_test.go | 4 +- consensus/signature/state_signer_decoder.go | 2 +- .../timeout_aggregator_test.go | 8 +- .../timeoutaggregator/timeout_collectors.go | 2 +- .../timeout_collectors_test.go | 4 +- consensus/timeoutcollector/factory.go | 4 +- .../timeoutcollector/timeout_processor.go | 10 +- .../timeout_processor_test.go | 6 +- consensus/tracker/tracker_test.go | 154 +++ consensus/validator/validator.go | 65 +- consensus/validator/validator_test.go | 933 ++++++++++++++++++ consensus/vote_collector.go | 2 +- consensus/voteaggregator/vote_aggregator.go | 41 +- .../voteaggregator/vote_aggregator_test.go | 107 ++ .../voteaggregator/vote_collectors_test.go | 158 +++ consensus/votecollector/factory_test.go | 118 +++ consensus/votecollector/statemachine_test.go | 286 ++++++ consensus/votecollector/testutil.go | 51 + consensus/votecollector/vote_cache_test.go | 189 ++++ .../votecollector/vote_processor_test.go | 269 +++++ 29 files changed, 2400 insertions(+), 100 deletions(-) create mode 100644 consensus/tracker/tracker_test.go create mode 100644 consensus/validator/validator_test.go create mode 100644 consensus/voteaggregator/vote_aggregator_test.go create mode 100644 consensus/voteaggregator/vote_collectors_test.go create mode 100644 consensus/votecollector/factory_test.go create mode 100644 consensus/votecollector/statemachine_test.go create mode 100644 consensus/votecollector/testutil.go create mode 100644 consensus/votecollector/vote_cache_test.go create mode 100644 consensus/votecollector/vote_processor_test.go diff --git a/consensus/consensus_committee.go b/consensus/consensus_committee.go index 4eb0d4d..83befed 100644 --- a/consensus/consensus_committee.go +++ b/consensus/consensus_committee.go @@ -9,23 +9,23 @@ import "source.quilibrium.com/quilibrium/monorepo/consensus/models" // // For the purposes of validating votes, timeouts, quorum certificates, and // timeout certificates we consider a committee which is static over the course -// of an epoch. Although committee members may be ejected, or have their weight -// change during an epoch, we ignore these changes. For these purposes we use -// the Replicas and *ByEpoch methods. +// of an rank. Although committee members may be ejected, or have their weight +// change during an rank, we ignore these changes. For these purposes we use +// the Replicas and *ByRank methods. // // When validating proposals, we take into account changes to the committee -// during the course of an epoch. In particular, if a node is ejected, we will +// during the course of an rank. In particular, if a node is ejected, we will // immediately reject all future proposals from that node. For these purposes we // use the DynamicCommittee and *ByState methods. // Replicas defines the consensus committee for the purposes of validating // votes, timeouts, quorum certificates, and timeout certificates. Any consensus // committee member who was authorized to contribute to consensus AT THE -// BEGINNING of the epoch may produce valid votes and timeouts for the entire -// epoch, even if they are later ejected. So for validating votes/timeouts we -// use *ByEpoch methods. +// BEGINNING of the rank may produce valid votes and timeouts for the entire +// rank, even if they are later ejected. So for validating votes/timeouts we +// use *ByRank methods. // -// Since the voter committee is considered static over an epoch: +// Since the voter committee is considered static over an rank: // - we can query identities by rank // - we don't need the full state ancestry prior to validating messages type Replicas interface { @@ -36,24 +36,24 @@ type Replicas interface { // slots even if it is slashed. Its proposal is simply considered // invalid, as it is not from a legitimate participant. // Returns the following expected errors for invalid inputs: - // - model.ErrRankUnknown if no epoch containing the given rank is + // - model.ErrRankUnknown if no rank containing the given rank is // known LeaderForRank(rank uint64) (models.Identity, error) // QuorumThresholdForRank returns the minimum total weight for a supermajority // at the given rank. This weight threshold is computed using the total weight - // of the initial committee and is static over the course of an epoch. + // of the initial committee and is static over the course of an rank. // Returns the following expected errors for invalid inputs: - // - model.ErrRankUnknown if no epoch containing the given rank is + // - model.ErrRankUnknown if no rank containing the given rank is // known QuorumThresholdForRank(rank uint64) (uint64, error) // TimeoutThresholdForRank returns the minimum total weight of observed // timeout states required to safely timeout for the given rank. This weight // threshold is computed using the total weight of the initial committee and - // is static over the course of an epoch. + // is static over the course of an rank. // Returns the following expected errors for invalid inputs: - // - model.ErrRankUnknown if no epoch containing the given rank is + // - model.ErrRankUnknown if no rank containing the given rank is // known TimeoutThresholdForRank(rank uint64) (uint64, error) @@ -65,22 +65,22 @@ type Replicas interface { Self() models.Identity // IdentitiesByRank returns a list of the legitimate HotStuff participants - // for the epoch given by the input rank. + // for the rank given by the input rank. // The returned list of HotStuff participants: // - contains nodes that are allowed to submit votes or timeouts within the - // given epoch (un-ejected, non-zero weight at the beginning of the epoch) + // given rank (un-ejected, non-zero weight at the beginning of the rank) // - is ordered in the canonical order // - contains no duplicates. // // CAUTION: DO NOT use this method for validating state proposals. // // Returns the following expected errors for invalid inputs: - // - model.ErrRankUnknown if no epoch containing the given rank is + // - model.ErrRankUnknown if no rank containing the given rank is // known // IdentitiesByRank(rank uint64) ([]models.WeightedIdentity, error) - // IdentityByEpoch returns the full Identity for specified HotStuff + // IdentityByRank returns the full Identity for specified HotStuff // participant. The node must be a legitimate HotStuff participant with // NON-ZERO WEIGHT at the specified state. // @@ -89,7 +89,7 @@ type Replicas interface { // authorized HotStuff participant at the specified state. // // Returns the following expected errors for invalid inputs: - // - model.ErrRankUnknown if no epoch containing the given rank is + // - model.ErrRankUnknown if no rank containing the given rank is // known // IdentityByRank( diff --git a/consensus/consensus_safety_rules.go b/consensus/consensus_safety_rules.go index 1148bd2..a056cfb 100644 --- a/consensus/consensus_safety_rules.go +++ b/consensus/consensus_safety_rules.go @@ -41,7 +41,7 @@ type SafetyRules[StateT models.Unique, VoteT models.Unique] interface { // * (nil, model.NoTimeoutError): If replica is not part of the authorized // consensus committee (anymore) and therefore is not authorized to produce // a valid timeout state. This sentinel error is _expected_ during normal - // operation, e.g. during the grace-period after Epoch switchover or after + // operation, e.g. during the grace-period after Rank switchover or after // the replica self-ejected. // All other errors are unexpected and potential symptoms of uncovered edge // cases or corrupted internal state (fatal). diff --git a/consensus/consensus_timeout.go b/consensus/consensus_timeout.go index cf61580..a98a45a 100644 --- a/consensus/consensus_timeout.go +++ b/consensus/consensus_timeout.go @@ -78,7 +78,7 @@ type TimeoutProcessor[VoteT models.Unique] interface { type TimeoutCollectorFactory[VoteT models.Unique] interface { // Create is a factory method to generate a TimeoutCollector for a given rank // Expected error returns during normal operations: - // * models.ErrRankUnknown no epoch containing the given rank is known + // * models.ErrRankUnknown no rank containing the given rank is known // All other errors should be treated as exceptions. Create(rank uint64) (TimeoutCollector[VoteT], error) } @@ -88,7 +88,7 @@ type TimeoutCollectorFactory[VoteT models.Unique] interface { type TimeoutProcessorFactory[VoteT models.Unique] interface { // Create is a factory method to generate a TimeoutProcessor for a given rank // Expected error returns during normal operations: - // * models.ErrRankUnknown no epoch containing the given rank is known + // * models.ErrRankUnknown no rank containing the given rank is known // All other errors should be treated as exceptions. Create(rank uint64) (TimeoutProcessor[VoteT], error) } @@ -102,7 +102,7 @@ type TimeoutCollectors[VoteT models.Unique] interface { // GetOrCreateCollector retrieves the TimeoutCollector for the specified // rank or creates one if none exists. When creating a timeout collector, // the rank is used to query the consensus committee for the respective - // Epoch the rank belongs to. + // Rank the rank belongs to. // It returns: // - (collector, true, nil) if no collector can be found by the rank, and a // new collector was created. @@ -111,7 +111,7 @@ type TimeoutCollectors[VoteT models.Unique] interface { // collector. // Expected error returns during normal operations: // * models.BelowPrunedThresholdError if rank is below the pruning threshold - // * models.ErrRankUnknown if rank is not yet pruned but no epoch containing + // * models.ErrRankUnknown if rank is not yet pruned but no rank containing // the given rank is known GetOrCreateCollector(rank uint64) ( collector TimeoutCollector[VoteT], diff --git a/consensus/eventhandler/event_handler.go b/consensus/eventhandler/event_handler.go index b3d2261..94c791f 100644 --- a/consensus/eventhandler/event_handler.go +++ b/consensus/eventhandler/event_handler.go @@ -343,7 +343,7 @@ func (e *EventHandler[ // models.TimeoutState, adds it to `timeoutAggregator` and broadcasts it to the // consensus commettee. We check, whether this node, at the current rank, is // part of the consensus committee. Otherwise, this method is functionally a -// no-op. For example, right after an epoch switchover a consensus node might +// no-op. For example, right after an rank switchover a consensus node might // still be online but not part of the _active_ consensus committee anymore. // Consequently, it should not broadcast timeouts anymore. No errors are // expected during normal operation. diff --git a/consensus/eventhandler/event_handler_test.go b/consensus/eventhandler/event_handler_test.go index 8b8dd33..a9b6b79 100644 --- a/consensus/eventhandler/event_handler_test.go +++ b/consensus/eventhandler/event_handler_test.go @@ -204,7 +204,7 @@ func NewSafetyRules(t *testing.T) *SafetyRules { timeout.Rank = curRank timeout.LatestQuorumCertificate = newestQC timeout.PriorRankTimeoutCertificate = lastRankTC - }) + }, helper.WithTimeoutVote(&helper.TestVote{Rank: curRank, ID: helper.MakeIdentity()})) }, func(uint64, models.QuorumCertificate, models.TimeoutCertificate) error { return nil }).Maybe() diff --git a/consensus/go.mod b/consensus/go.mod index 21e7be1..1775dd0 100644 --- a/consensus/go.mod +++ b/consensus/go.mod @@ -34,5 +34,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.11.1 go.uber.org/atomic v1.11.0 + golang.org/x/sync v0.17.0 golang.org/x/sys v0.33.0 // indirect ) diff --git a/consensus/go.sum b/consensus/go.sum index a5cbf1f..4eba35a 100644 --- a/consensus/go.sum +++ b/consensus/go.sum @@ -37,6 +37,8 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/consensus/helper/state.go b/consensus/helper/state.go index aee6672..cfccaa7 100644 --- a/consensus/helper/state.go +++ b/consensus/helper/state.go @@ -311,18 +311,28 @@ func WithWeightedIdentityList(count int) []models.WeightedIdentity { return wi } -func VoteForStateFixture[StateT models.Unique, VoteT models.Unique](state *models.State[StateT], ops ...func(vote *VoteT)) VoteT { - v := new(VoteT) - for _, op := range ops { - op(v) +func VoteForStateFixture(state *models.State[*TestState], ops ...func(vote **TestVote)) *TestVote { + v := &TestVote{ + Rank: state.Rank, + ID: MakeIdentity(), + StateID: state.Identifier, + Signature: make([]byte, 74), } - return *v + for _, op := range ops { + op(&v) + } + return v } -func VoteFixture[VoteT models.Unique](op func(vote *VoteT)) VoteT { - v := new(VoteT) - op(v) - return *v +func VoteFixture(op func(vote **TestVote)) *TestVote { + v := &TestVote{ + Rank: rand.Uint64(), + ID: MakeIdentity(), + StateID: MakeIdentity(), + Signature: make([]byte, 74), + } + op(&v) + return v } type FmtLog struct{} diff --git a/consensus/safetyrules/safety_rules.go b/consensus/safetyrules/safety_rules.go index 9346276..43e3936 100644 --- a/consensus/safetyrules/safety_rules.go +++ b/consensus/safetyrules/safety_rules.go @@ -176,7 +176,7 @@ func (r *SafetyRules[StateT, VoteT]) produceVote( if models.IsInvalidSignerError(err) { // the proposer must be ejected since the proposal has already been // validated, which ensures that the proposer was a valid committee member - // at the start of the epoch + // at the start of the rank return nil, models.NewNoVoteErrorf("proposer ejected: %w", err) } if err != nil { @@ -190,7 +190,7 @@ func (r *SafetyRules[StateT, VoteT]) produceVote( // (ii) Do not produce a vote for states where we are not an active // committee member. The HotStuff state machine may request to vote during - // grace periods outside the epochs, where the node is authorized to + // grace periods outside the ranks, where the node is authorized to // actively participate. If we voted during those grace periods, we would // needlessly waste network bandwidth, as such votes can't be used to // produce valid QCs. @@ -235,7 +235,7 @@ func (r *SafetyRules[StateT, VoteT]) produceVote( // - (nil, models.NoTimeoutError): If replica is not part of the authorized // consensus committee (anymore) and therefore is not authorized to produce // a valid timeout state. This sentinel error is _expected_ during normal -// operation, e.g. during the grace-period after Epoch switchover or after +// operation, e.g. during the grace-period after Rank switchover or after // the replica self-ejected. // // All other errors are unexpected and potential symptoms of uncovered edge diff --git a/consensus/safetyrules/safety_rules_test.go b/consensus/safetyrules/safety_rules_test.go index f47d2de..2c7b65d 100644 --- a/consensus/safetyrules/safety_rules_test.go +++ b/consensus/safetyrules/safety_rules_test.go @@ -285,7 +285,7 @@ func (s *SafetyRulesTestSuite) TestProduceVote_InvalidProposerIdentity() { } // TestProduceVote_NodeNotAuthorizedToVote tests that no vote is created if the voter is not authorized to vote. -// Nodes have zero weight in the grace periods around the epochs where they are authorized to participate. +// Nodes have zero weight in the grace periods around the ranks where they are authorized to participate. // We don't want zero-weight nodes to vote in the first place, to avoid unnecessary traffic. // Note: this also covers ejected nodes. In both cases, the committee will return an `InvalidSignerError`. func (s *SafetyRulesTestSuite) TestProduceVote_NodeEjected() { @@ -733,7 +733,7 @@ func (s *SafetyRulesTestSuite) TestProduceTimeout_InvalidProposerIdentity() { } // TestProduceTimeout_NodeEjected tests that no timeout is created if the replica is not authorized to create timeout. -// Nodes have zero weight in the grace periods around the epochs where they are authorized to participate. +// Nodes have zero weight in the grace periods around the ranks where they are authorized to participate. // We don't want zero-weight nodes to participate in the first place, to avoid unnecessary traffic. // Note: this also covers ejected nodes. In both cases, the committee will return an `InvalidSignerError`. func (s *SafetyRulesTestSuite) TestProduceTimeout_NodeEjected() { diff --git a/consensus/signature/state_signer_decoder.go b/consensus/signature/state_signer_decoder.go index fc8ddae..51d15ea 100644 --- a/consensus/signature/state_signer_decoder.go +++ b/consensus/signature/state_signer_decoder.go @@ -48,7 +48,7 @@ func (b *StateSignerDecoder[StateT]) DecodeSignerIDs( members, err := b.IdentitiesByRank(state.ParentQuorumCertificate.GetRank()) if err != nil { if errors.Is(err, models.ErrRankUnknown) { - // possibly, we request epoch which is far behind in the past, in this + // possibly, we request rank which is far behind in the past, in this // case we won't have it in cache. try asking by parent ID byStateMembers, err := b.IdentitiesByState( state.ParentQuorumCertificate.GetSelector(), diff --git a/consensus/timeoutaggregator/timeout_aggregator_test.go b/consensus/timeoutaggregator/timeout_aggregator_test.go index 95c541e..388b864 100644 --- a/consensus/timeoutaggregator/timeout_aggregator_test.go +++ b/consensus/timeoutaggregator/timeout_aggregator_test.go @@ -71,7 +71,7 @@ func (s *TimeoutAggregatorTestSuite) TestAddTimeout_HappyPath() { start.Add(timeoutsCount) for i := 0; i < timeoutsCount; i++ { go func() { - timeout := helper.TimeoutStateFixture[*helper.TestVote](helper.WithTimeoutStateRank[*helper.TestVote](s.lowestRetainedRank)) + timeout := helper.TimeoutStateFixture[*helper.TestVote](helper.WithTimeoutStateRank[*helper.TestVote](s.lowestRetainedRank), helper.WithTimeoutVote(&helper.TestVote{Rank: s.lowestRetainedRank, ID: helper.MakeIdentity()})) start.Done() // Wait for last worker routine to signal ready. Then, @@ -89,9 +89,9 @@ func (s *TimeoutAggregatorTestSuite) TestAddTimeout_HappyPath() { }, time.Second, time.Millisecond*20) } -// TestAddTimeout_EpochUnknown tests if timeout states targeting unknown epoch should be ignored -func (s *TimeoutAggregatorTestSuite) TestAddTimeout_EpochUnknown() { - timeout := helper.TimeoutStateFixture(helper.WithTimeoutStateRank[*helper.TestVote](s.lowestRetainedRank)) +// TestAddTimeout_RankUnknown tests if timeout states targeting unknown rank should be ignored +func (s *TimeoutAggregatorTestSuite) TestAddTimeout_RankUnknown() { + timeout := helper.TimeoutStateFixture(helper.WithTimeoutStateRank[*helper.TestVote](s.lowestRetainedRank), helper.WithTimeoutVote(&helper.TestVote{Rank: s.lowestRetainedRank, ID: helper.MakeIdentity()})) *s.collectors = *mocks.NewTimeoutCollectors[*helper.TestVote](s.T()) done := make(chan struct{}) s.collectors.On("GetOrCreateCollector", timeout.Rank).Return(nil, false, models.ErrRankUnknown).Run(func(args mock.Arguments) { diff --git a/consensus/timeoutaggregator/timeout_collectors.go b/consensus/timeoutaggregator/timeout_collectors.go index f8395e6..7e58936 100644 --- a/consensus/timeoutaggregator/timeout_collectors.go +++ b/consensus/timeoutaggregator/timeout_collectors.go @@ -47,7 +47,7 @@ func NewTimeoutCollectors[VoteT models.Unique]( // // Expected error returns during normal operations: // - models.BelowPrunedThresholdError if rank is below the pruning threshold -// - models.ErrRankUnknown if rank is not yet pruned but no epoch containing +// - models.ErrRankUnknown if rank is not yet pruned but no rank containing // the given rank is known, this error // // can be returned from factory method. diff --git a/consensus/timeoutaggregator/timeout_collectors_test.go b/consensus/timeoutaggregator/timeout_collectors_test.go index 5b8e663..3300cd7 100644 --- a/consensus/timeoutaggregator/timeout_collectors_test.go +++ b/consensus/timeoutaggregator/timeout_collectors_test.go @@ -78,9 +78,9 @@ func (s *TimeoutCollectorsTestSuite) TestGetOrCreateCollector_RankLowerThanLowes require.True(s.T(), models.IsBelowPrunedThresholdError(err)) } -// TestGetOrCreateCollector_UnknownEpoch tests a scenario where caller tries to create a collector with rank referring epoch +// TestGetOrCreateCollector_UnknownRank tests a scenario where caller tries to create a collector with rank referring rank // that we don't know about. This should result in sentinel error ` -func (s *TimeoutCollectorsTestSuite) TestGetOrCreateCollector_UnknownEpoch() { +func (s *TimeoutCollectorsTestSuite) TestGetOrCreateCollector_UnknownRank() { *s.factoryMethod = *mocks.NewTimeoutCollectorFactory[*helper.TestVote](s.T()) s.factoryMethod.On("Create", mock.Anything).Return(nil, models.ErrRankUnknown) collector, created, err := s.collectors.GetOrCreateCollector(s.lowestRank + 100) diff --git a/consensus/timeoutcollector/factory.go b/consensus/timeoutcollector/factory.go index c201df9..754cf66 100644 --- a/consensus/timeoutcollector/factory.go +++ b/consensus/timeoutcollector/factory.go @@ -33,7 +33,7 @@ func NewTimeoutCollectorFactory[VoteT models.Unique]( // Create is a factory method to generate a TimeoutCollector for a given rank // Expected error returns during normal operations: -// - models.ErrRankUnknown if rank is not yet pruned but no epoch containing +// - models.ErrRankUnknown if rank is not yet pruned but no rank containing // the given rank is known // // All other errors should be treated as exceptions. @@ -96,7 +96,7 @@ func NewTimeoutProcessorFactory[ // Create is a factory method to generate a TimeoutProcessor for a given rank // Expected error returns during normal operations: -// - models.ErrRankUnknown no epoch containing the given rank is known +// - models.ErrRankUnknown no rank containing the given rank is known // // All other errors should be treated as exceptions. func (f *TimeoutProcessorFactory[StateT, VoteT, PeerIDT]) Create(rank uint64) ( diff --git a/consensus/timeoutcollector/timeout_processor.go b/consensus/timeoutcollector/timeout_processor.go index ba51d8e..e062216 100644 --- a/consensus/timeoutcollector/timeout_processor.go +++ b/consensus/timeoutcollector/timeout_processor.go @@ -58,7 +58,7 @@ var _ consensus.TimeoutProcessor[*nilUnique] = (*TimeoutProcessor[*nilUnique, *n // NewTimeoutProcessor creates new instance of TimeoutProcessor // Returns the following expected errors for invalid inputs: -// - models.ErrRankUnknown if no epoch containing the given rank is known +// - models.ErrRankUnknown if no rank containing the given rank is known // // All other errors should be treated as exceptions. func NewTimeoutProcessor[ @@ -296,12 +296,12 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) validateTimeout( } if errors.Is(err, models.ErrRankUnknown) { // We require each replica to be bootstrapped with a QC pointing to a - // finalized state. Therefore, we should know the Epoch for any QC.Rank + // finalized state. Therefore, we should know the Rank for any QC.Rank // and TC.Rank we encounter. Receiving a `models.ErrRankUnknown` is // conceptually impossible, i.e. a symptom of an internal bug or invalid // bootstrapping information. return fmt.Errorf( - "no Epoch information availalbe for QC that was included in TO; symptom of internal bug or invalid bootstrapping information: %s", + "no Rank information availalbe for QC that was included in TO; symptom of internal bug or invalid bootstrapping information: %s", err.Error(), ) } @@ -323,12 +323,12 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) validateTimeout( } if errors.Is(err, models.ErrRankUnknown) { // We require each replica to be bootstrapped with a QC pointing to a - // finalized state. Therefore, we should know the Epoch for any QC.Rank + // finalized state. Therefore, we should know the Rank for any QC.Rank // and TC.Rank we encounter. Receiving a `models.ErrRankUnknown` is // conceptually impossible, i.e. a symptom of an internal bug or invalid // bootstrapping information. return fmt.Errorf( - "no Epoch information availalbe for TC that was included in TO; symptom of internal bug or invalid bootstrapping information: %s", + "no Rank information availalbe for TC that was included in TO; symptom of internal bug or invalid bootstrapping information: %s", err.Error(), ) } diff --git a/consensus/timeoutcollector/timeout_processor_test.go b/consensus/timeoutcollector/timeout_processor_test.go index 891135b..6c90576 100644 --- a/consensus/timeoutcollector/timeout_processor_test.go +++ b/consensus/timeoutcollector/timeout_processor_test.go @@ -58,7 +58,7 @@ func (s *TimeoutProcessorTestSuite) SetupTest() { s.committee.On("QuorumThresholdForRank", mock.Anything).Return(uint64(8000), nil).Maybe() s.committee.On("TimeoutThresholdForRank", mock.Anything).Return(uint64(8000), nil).Maybe() - s.committee.On("IdentityByEpoch", mock.Anything, mock.Anything).Return(s.signer, nil).Maybe() + s.committee.On("IdentityByRank", mock.Anything, mock.Anything).Return(s.signer, nil).Maybe() s.sigAggregator.On("Rank").Return(s.rank).Maybe() s.sigAggregator.On("VerifyAndAdd", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { s.totalWeight.Add(s.sigWeight) @@ -208,7 +208,7 @@ func (s *TimeoutProcessorTestSuite) TestProcess_IncludedQCInvalid() { require.ErrorIs(s.T(), err, exception) require.False(s.T(), models.IsInvalidTimeoutError[*helper.TestVote](err)) }) - s.Run("invalid-qc-err-rank-for-unknown-epoch", func() { + s.Run("invalid-qc-err-rank-for-unknown-rank", func() { *s.validator = *mocks.NewValidator[*helper.TestState, *helper.TestVote](s.T()) s.validator.On("ValidateQuorumCertificate", timeout.LatestQuorumCertificate).Return(models.ErrRankUnknown).Once() @@ -242,7 +242,7 @@ func (s *TimeoutProcessorTestSuite) TestProcess_IncludedTCInvalid() { require.ErrorIs(s.T(), err, exception) require.False(s.T(), models.IsInvalidTimeoutError[*helper.TestVote](err)) }) - s.Run("invalid-tc-err-rank-for-unknown-epoch", func() { + s.Run("invalid-tc-err-rank-for-unknown-rank", func() { *s.validator = *mocks.NewValidator[*helper.TestState, *helper.TestVote](s.T()) s.validator.On("ValidateQuorumCertificate", timeout.LatestQuorumCertificate).Return(nil) s.validator.On("ValidateTimeoutCertificate", timeout.PriorRankTimeoutCertificate).Return(models.ErrRankUnknown).Once() diff --git a/consensus/tracker/tracker_test.go b/consensus/tracker/tracker_test.go new file mode 100644 index 0000000..0ca65b3 --- /dev/null +++ b/consensus/tracker/tracker_test.go @@ -0,0 +1,154 @@ +package tracker + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +// TestNewNewestQCTracker checks that new instance returns nil tracked value. +func TestNewNewestQCTracker(t *testing.T) { + tracker := NewNewestQCTracker() + require.Nil(t, tracker.NewestQC()) +} + +// TestNewestQCTracker_Track this test is needed to make sure that concurrent updates on NewestQCTracker are performed correctly, +// and it always tracks the newest QC, especially in scenario of shared access. This test is structured in a way that it +// starts multiple goroutines that will try to submit their QCs simultaneously to the tracker. Once all goroutines are started +// we will use a wait group to execute all operations as concurrent as possible, after that we will observe if resulted value +// is indeed expected. This test will run multiple times. +func TestNewestQCTracker_Track(t *testing.T) { + tracker := NewNewestQCTracker() + samples := 20 // number of concurrent updates per test case + times := 20 // number of times we run the test case + + // setup initial value + initialQC := helper.MakeQC(helper.WithQCRank(0)) + tracker.Track(&initialQC) + + for i := 0; i < times; i++ { + startRank := (*tracker.NewestQC()).GetRank() + var readyWg, startWg, doneWg sync.WaitGroup + startWg.Add(1) + readyWg.Add(samples) + doneWg.Add(samples) + for s := 0; s < samples; s++ { + qc := helper.MakeQC(helper.WithQCRank(startRank + uint64(s+1))) + go func(newestQC *models.QuorumCertificate) { + defer doneWg.Done() + readyWg.Done() + startWg.Wait() + tracker.Track(newestQC) + }(&qc) + } + + // wait for all goroutines to be ready + readyWg.Wait() + // since we have waited for all goroutines to be ready this `Done` will start all goroutines + startWg.Done() + // wait for all of them to finish execution + doneWg.Wait() + + // at this point tracker MUST have the newest QC + require.Equal(t, startRank+uint64(samples), (*tracker.NewestQC()).GetRank()) + } +} + +// TestNewNewestTCTracker checks that new instance returns nil tracked value. +func TestNewNewestTCTracker(t *testing.T) { + tracker := NewNewestTCTracker() + require.Nil(t, tracker.NewestTC()) +} + +// TestNewestTCTracker_Track this test is needed to make sure that concurrent updates on NewestTCTracker are performed correctly, +// and it always tracks the newest TC, especially in scenario of shared access. This test is structured in a way that it +// starts multiple goroutines that will try to submit their TCs simultaneously to the tracker. Once all goroutines are started +// we will use a wait group to execute all operations as concurrent as possible, after that we will observe if resulted value +// is indeed expected. This test will run multiple times. +func TestNewestTCTracker_Track(t *testing.T) { + tracker := NewNewestTCTracker() + samples := 20 + times := 20 + + // setup initial value + initialTc := helper.MakeTC(helper.WithTCRank(0)) + tracker.Track(&initialTc) + + for i := 0; i < times; i++ { + startRank := (*tracker.NewestTC()).GetRank() + var readyWg, startWg, doneWg sync.WaitGroup + startWg.Add(1) + readyWg.Add(samples) + doneWg.Add(samples) + for s := 0; s < samples; s++ { + tc := helper.MakeTC(helper.WithTCRank(startRank + uint64(s+1))) + go func(newestTC *models.TimeoutCertificate) { + defer doneWg.Done() + readyWg.Done() + startWg.Wait() + tracker.Track(newestTC) + }(&tc) + } + + // wait for all goroutines to be ready + readyWg.Wait() + // since we have waited for all goroutines to be ready this `Done` will start all goroutines + startWg.Done() + // wait for all of them to finish execution + doneWg.Wait() + + // at this point tracker MUST have the newest TC + require.Equal(t, startRank+uint64(samples), (*tracker.NewestTC()).GetRank()) + } +} + +// TestNewNewestStateTracker checks that new instance returns nil tracked value. +func TestNewNewestStateTracker(t *testing.T) { + tracker := NewNewestStateTracker[*helper.TestState]() + require.Nil(t, tracker.NewestState()) +} + +// TestNewestStateTracker_Track this test is needed to make sure that concurrent updates on NewestStateTracker are performed correctly, +// and it always tracks the newest state, especially in scenario of shared access. This test is structured in a way that it +// starts multiple goroutines that will try to submit their states simultaneously to the tracker. Once all goroutines are started +// we will use a wait group to execute all operations as concurrent as possible, after that we will observe if resulted value +// is indeed expected. This test will run multiple times. +func TestNewestStateTracker_Track(t *testing.T) { + tracker := NewNewestStateTracker[*helper.TestState]() + samples := 20 // number of concurrent updates per test case + times := 20 // number of times we run the test case + + // setup initial value + tracker.Track(helper.MakeState(helper.WithStateRank[*helper.TestState](0))) + + for i := 0; i < times; i++ { + startRank := tracker.NewestState().Rank + var readyWg, startWg, doneWg sync.WaitGroup + startWg.Add(1) + readyWg.Add(samples) + doneWg.Add(samples) + for s := 0; s < samples; s++ { + state := helper.MakeState(helper.WithStateRank[*helper.TestState](startRank + uint64(s+1))) + go func(newestState *models.State[*helper.TestState]) { + defer doneWg.Done() + readyWg.Done() + startWg.Wait() + tracker.Track(newestState) + }(state) + } + + // wait for all goroutines to be ready + readyWg.Wait() + // since we have waited for all goroutines to be ready this `Done` will start all goroutines + startWg.Done() + // wait for all of them to finish execution + doneWg.Wait() + + // at this point tracker MUST have the newest state + require.Equal(t, startRank+uint64(samples), tracker.NewestState().Rank) + } +} diff --git a/consensus/validator/validator.go b/consensus/validator/validator.go index df2fff2..56bd343 100644 --- a/consensus/validator/validator.go +++ b/consensus/validator/validator.go @@ -30,7 +30,7 @@ func NewValidator[StateT models.Unique, VoteT models.Unique]( // ValidateTimeoutCertificate validates the TimeoutCertificate `TC`. // During normal operations, the following error returns are expected: // - models.InvalidTCError if the TC is invalid -// - models.ErrRankUnknown if the TC refers unknown epoch +// - models.ErrRankUnknown if the TC refers unknown rank // // Any other error should be treated as exception func (v *Validator[StateT, VoteT]) ValidateTimeoutCertificate( @@ -69,6 +69,10 @@ func (v *Validator[StateT, VoteT]) ValidateTimeoutCertificate( sigIndices := tc.GetAggregatedSignature().GetBitmask() totalWeight := uint64(0) for i, member := range allParticipants { + if len(sigIndices) < (i/8)+1 { + return models.NewInsufficientSignaturesErrorf("insufficient signatures") + } + if sigIndices[i/8]>>i%8&1 == 1 { signerIDs = append(signerIDs, member) totalWeight += member.Weight() @@ -79,10 +83,13 @@ func (v *Validator[StateT, VoteT]) ValidateTimeoutCertificate( // consensus threshold, err := v.committee.QuorumThresholdForRank(tc.GetRank()) if err != nil { - return fmt.Errorf( - "could not get weight threshold for rank %d: %w", - tc.GetRank(), - err, + return newInvalidTimeoutCertificateError( + tc, + fmt.Errorf( + "could not get weight threshold for rank %d: %w", + tc.GetRank(), + err, + ), ) } @@ -166,11 +173,11 @@ func (v *Validator[StateT, VoteT]) ValidateTimeoutCertificate( // must include signatures from a supermajority of replicas, including at // least one honest replica, which attest to their locally highest known // QC. Hence, any QC included in a TC must be the root QC or newer. - // Therefore, we should know the Epoch for any QC we encounter. Receiving + // Therefore, we should know the rank for any QC we encounter. Receiving // a `models.ErrRankUnknown` is conceptually impossible, i.e. a symptom of // an internal bug or invalid bootstrapping information. return fmt.Errorf( - "no Epoch information availalbe for QC that was included in TC; symptom of internal bug or invalid bootstrapping information: %s", + "no rank information availalbe for QC that was included in TC; symptom of internal bug or invalid bootstrapping information: %s", err.Error(), ) } @@ -186,18 +193,18 @@ func (v *Validator[StateT, VoteT]) ValidateTimeoutCertificate( // ValidateQuorumCertificate validates the Quorum Certificate `qc`. // During normal operations, the following error returns are expected: // - models.InvalidQCError if the QC is invalid -// - models.ErrRankUnknown if the QC refers unknown epoch +// - models.ErrRankUnknown if the QC refers unknown rank // // Any other error should be treated as exception func (v *Validator[StateT, VoteT]) ValidateQuorumCertificate( qc models.QuorumCertificate, ) error { - // Retrieve the initial identities of consensus participants for this epoch, + // Retrieve the initial identities of consensus participants for this rank, // and those that signed the QC. IdentitiesByRank contains all nodes that were - // authorized to sign during this epoch. Ejection and dynamic weight - // adjustments are not taken into account here. By using an epoch-static set - // of authorized - // signers, we can check QC validity without needing all ancestor states. + // authorized to sign during this rank. Ejection and dynamic weight + // adjustments are not taken into account here. By using an rank-static set + // of authorized signers, we can check QC validity without needing all + // ancestor states. allParticipants, err := v.committee.IdentitiesByRank(qc.GetRank()) if err != nil { return fmt.Errorf( @@ -211,6 +218,12 @@ func (v *Validator[StateT, VoteT]) ValidateQuorumCertificate( sigIndices := qc.GetAggregatedSignature().GetBitmask() totalWeight := uint64(0) for i, member := range allParticipants { + if len(sigIndices) < (i/8)+1 { + return newInvalidQuorumCertificateError( + qc, + models.NewInsufficientSignaturesErrorf("insufficient signatures"), + ) + } if sigIndices[i/8]>>i%8&1 == 1 { signerIDs = append(signerIDs, member) totalWeight += member.Weight() @@ -221,12 +234,16 @@ func (v *Validator[StateT, VoteT]) ValidateQuorumCertificate( // consensus threshold, err := v.committee.QuorumThresholdForRank(qc.GetRank()) if err != nil { - return fmt.Errorf( - "could not get weight threshold for rank %d: %w", - qc.GetRank(), - err, + return newInvalidQuorumCertificateError( + qc, + fmt.Errorf( + "could not get weight threshold for rank %d: %w", + qc.GetRank(), + err, + ), ) } + if totalWeight < threshold { return newInvalidQuorumCertificateError( qc, @@ -265,7 +282,7 @@ func (v *Validator[StateT, VoteT]) ValidateQuorumCertificate( // ErrRankUnknown. To avoid confusion with expected sentinel errors, we // only preserve the error messages here, but not the error types. return fmt.Errorf( - "internal error, as querying identities for rank %d succeeded earlier but now the rank supposedly belongs to an unknown epoch: %s", + "internal error, as querying identities for rank %d succeeded earlier but now the rank supposedly belongs to an unknown rank: %s", qc.GetRank(), err.Error(), ) @@ -286,7 +303,7 @@ func (v *Validator[StateT, VoteT]) ValidateQuorumCertificate( // Note it doesn't check if it's conflicting with finalized state // During normal operations, the following error returns are expected: // - models.InvalidProposalError if the state is invalid -// - models.ErrRankUnknown if the proposal refers unknown epoch +// - models.ErrRankUnknown if the proposal refers unknown rank // // Any other error should be treated as exception func (v *Validator[StateT, VoteT]) ValidateProposal( @@ -395,7 +412,7 @@ func (v *Validator[StateT, VoteT]) ValidateProposal( // conceptually impossible, i.e. a symptom of an internal bug or invalid // bootstrapping information. return fmt.Errorf( - "no Epoch information availalbe for QC that was included in proposal; symptom of internal bug or invalid bootstrapping information: %s", + "no rank information availalbe for QC that was included in proposal; symptom of internal bug or invalid bootstrapping information: %s", err.Error(), ) } @@ -415,12 +432,12 @@ func (v *Validator[StateT, VoteT]) ValidateProposal( } if errors.Is(err, models.ErrRankUnknown) { // We require each replica to be bootstrapped with a QC pointing to a - // finalized state. Therefore, we should know the Epoch for any QC.Rank + // finalized state. Therefore, we should know the rank for any QC.Rank // and TC.Rank we encounter. Receiving a `models.ErrRankUnknown` is // conceptually impossible, i.e. a symptom of an internal bug or invalid // bootstrapping information. return fmt.Errorf( - "no Epoch information availalbe for QC that was included in TC; symptom of internal bug or invalid bootstrapping information: %s", + "no rank information availalbe for QC that was included in TC; symptom of internal bug or invalid bootstrapping information: %s", err.Error(), ) } @@ -438,7 +455,7 @@ func (v *Validator[StateT, VoteT]) ValidateProposal( // signed the vote - the vote to be validated // During normal operations, the following error returns are expected: // - models.InvalidVoteError for invalid votes -// - models.ErrRankUnknown if the vote refers unknown epoch +// - models.ErrRankUnknown if the vote refers unknown rank // // Any other error should be treated as exception func (v *Validator[StateT, VoteT]) ValidateVote(vote *VoteT) ( @@ -472,7 +489,7 @@ func (v *Validator[StateT, VoteT]) ValidateVote(vote *VoteT) ( } if errors.Is(err, models.ErrRankUnknown) { return nil, fmt.Errorf( - "no Epoch information available for vote; symptom of internal bug or invalid bootstrapping information: %s", + "no rank information available for vote; symptom of internal bug or invalid bootstrapping information: %s", err.Error(), ) } diff --git a/consensus/validator/validator_test.go b/consensus/validator/validator_test.go new file mode 100644 index 0000000..b8d16cd --- /dev/null +++ b/consensus/validator/validator_test.go @@ -0,0 +1,933 @@ +package validator + +import ( + "errors" + "fmt" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/mocks" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +func TestValidateProposal(t *testing.T) { + suite.Run(t, new(ProposalSuite)) +} + +type ProposalSuite struct { + suite.Suite + participants []models.WeightedIdentity + indices []byte + leader models.WeightedIdentity + finalized uint64 + parent *models.State[*helper.TestState] + state *models.State[*helper.TestState] + voters []models.WeightedIdentity + proposal *models.SignedProposal[*helper.TestState, *helper.TestVote] + vote *helper.TestVote + voter models.WeightedIdentity + committee *mocks.Replicas + verifier *mocks.Verifier[*helper.TestVote] + validator *Validator[*helper.TestState, *helper.TestVote] +} + +func (ps *ProposalSuite) SetupTest() { + // the leader is a random node for now + ps.finalized = uint64(rand.Uint32() + 1) + ps.participants = helper.WithWeightedIdentityList(8) + ps.leader = ps.participants[0] + + // the parent is the last finalized state, followed directly by a state from the leader + ps.parent = helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.finalized), + ) + + var err error + + ps.indices = []byte{0b11111111} + + ps.state = helper.MakeState( + helper.WithStateRank[*helper.TestState](ps.finalized+1), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentState(ps.parent), + helper.WithParentSigners[*helper.TestState](ps.indices), + ) + + ps.voters = ps.participants + vt := &helper.TestVote{ + Rank: ps.state.Rank, + ID: ps.leader.Identity(), + Signature: make([]byte, 74), + StateID: ps.state.Identifier, + } + ps.proposal = helper.MakeSignedProposal( + helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal(helper.WithState(ps.state))), + helper.WithVote[*helper.TestState, *helper.TestVote](&vt), + ) + vote, err := ps.proposal.ProposerVote() + require.NoError(ps.T(), err) + ps.vote = *vote + ps.voter = ps.leader + + // set up the mocked hotstuff Replicas state + ps.committee = &mocks.Replicas{} + ps.committee.On("LeaderForRank", ps.state.Rank).Return(ps.leader.Identity(), nil) + ps.committee.On("QuorumThresholdForRank", mock.Anything).Return(uint64(8000), nil) + ps.committee.On("IdentitiesByRank", mock.Anything).Return( + func(_ uint64) []models.WeightedIdentity { + return ps.participants + }, + nil, + ) + for _, participant := range ps.participants { + ps.committee.On("IdentityByRank", mock.Anything, participant.Identity()).Return(participant, nil) + } + + // set up the mocked verifier + ps.verifier = &mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(nil).Maybe() + ps.verifier.On("VerifyVote", &ps.vote).Return(nil).Maybe() + + // set up the validator with the mocked dependencies + ps.validator = NewValidator[*helper.TestState, *helper.TestVote](ps.committee, ps.verifier) +} + +func (ps *ProposalSuite) TestProposalOK() { + err := ps.validator.ValidateProposal(ps.proposal) + assert.NoError(ps.T(), err, "a valid proposal should be accepted") +} + +func (ps *ProposalSuite) TestProposalSignatureError() { + + // change the verifier to error on signature validation with unspecific error + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(nil) + ps.verifier.On("VerifyVote", &ps.vote).Return(errors.New("dummy error")) + + // check that validation now fails + err := ps.validator.ValidateProposal(ps.proposal) + assert.Error(ps.T(), err, "a proposal should be rejected if signature check fails") + + // check that the error is not one that leads to invalid + assert.False(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), "if signature check fails, we should not receive an ErrorInvalidState") +} + +func (ps *ProposalSuite) TestProposalSignatureInvalidFormat() { + + // change the verifier to fail signature validation with InvalidFormatError error + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(nil) + ps.verifier.On("VerifyVote", &ps.vote).Return(models.NewInvalidFormatErrorf("")) + + // check that validation now fails + err := ps.validator.ValidateProposal(ps.proposal) + assert.Error(ps.T(), err, "a proposal with an invalid signature should be rejected") + + // check that the error is an invalid proposal error to allow creating slashing challenge + assert.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), "if signature is invalid, we should generate an invalid error") +} + +func (ps *ProposalSuite) TestProposalSignatureInvalid() { + + // change the verifier to fail signature validation + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(nil) + ps.verifier.On("VerifyVote", &ps.vote).Return(models.ErrInvalidSignature) + + // check that validation now fails + err := ps.validator.ValidateProposal(ps.proposal) + assert.Error(ps.T(), err, "a proposal with an invalid signature should be rejected") + + // check that the error is an invalid proposal error to allow creating slashing challenge + assert.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), "if signature is invalid, we should generate an invalid error") +} + +func (ps *ProposalSuite) TestProposalWrongLeader() { + + // change the consensus.Replicas to return a different leader + *ps.committee = mocks.Replicas{} + ps.committee.On("LeaderForRank", ps.state.Rank).Return(ps.participants[1].Identity(), nil) + for _, participant := range ps.participants { + ps.committee.On("IdentityByRank", mock.Anything, participant.Identity()).Return(participant, nil) + } + + // check that validation fails now + err := ps.validator.ValidateProposal(ps.proposal) + assert.Error(ps.T(), err, "a proposal from the wrong proposer should be rejected") + + // check that the error is an invalid proposal error to allow creating slashing challenge + assert.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), "if the proposal has wrong proposer, we should generate a invalid error") +} + +// TestProposalQCInvalid checks that Validator handles the verifier's error returns correctly. +// In case of `models.InvalidFormatError` and models.ErrInvalidSignature`, we expect the Validator +// to recognize those as an invalid QC, i.e. returns an `models.InvalidProposalError`. +// In contrast, unexpected exceptions and `models.InvalidSignerError` should _not_ be +// interpreted as a sign of an invalid QC. +func (ps *ProposalSuite) TestProposalQCInvalid() { + ps.Run("invalid-signature", func() { + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return( + fmt.Errorf("invalid qc: %w", models.ErrInvalidSignature)) + ps.verifier.On("VerifyVote", &ps.vote).Return(nil) + + // check that validation fails and the failure case is recognized as an invalid state + err := ps.validator.ValidateProposal(ps.proposal) + assert.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), "if the state's QC signature is invalid, an ErrorInvalidState error should be raised") + }) + + ps.Run("invalid-format", func() { + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(models.NewInvalidFormatErrorf("invalid qc")) + ps.verifier.On("VerifyVote", &ps.vote).Return(nil) + + // check that validation fails and the failure case is recognized as an invalid state + err := ps.validator.ValidateProposal(ps.proposal) + assert.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), "if the state's QC has an invalid format, an ErrorInvalidState error should be raised") + }) + + ps.Run("invalid-signer", func() { + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return( + fmt.Errorf("invalid qc: %w", models.NewInvalidSignerErrorf(""))) + ps.verifier.On("VerifyVote", &ps.vote).Return(nil) + + // check that validation fails and the failure case is recognized as an invalid state + err := ps.validator.ValidateProposal(ps.proposal) + assert.Error(ps.T(), err) + assert.False(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + }) + + ps.Run("unknown-exception", func() { + exception := errors.New("exception") + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(exception) + ps.verifier.On("VerifyVote", &ps.vote).Return(nil) + + // check that validation fails and the failure case is recognized as an invalid state + err := ps.validator.ValidateProposal(ps.proposal) + assert.ErrorIs(ps.T(), err, exception) + assert.False(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + }) + + ps.Run("verify-qc-err-rank-for-unknown-rank", func() { + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(models.ErrRankUnknown) + ps.verifier.On("VerifyVote", &ps.vote).Return(nil) + + // check that validation fails and the failure is considered internal exception and NOT an InvalidProposal error + err := ps.validator.ValidateProposal(ps.proposal) + assert.Error(ps.T(), err) + assert.NotErrorIs(ps.T(), err, models.ErrRankUnknown) + assert.False(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + }) +} + +func (ps *ProposalSuite) TestProposalQCError() { + + // change verifier to fail on QC validation + *ps.verifier = mocks.Verifier[*helper.TestVote]{} + ps.verifier.On("VerifyQuorumCertificate", ps.state.ParentQuorumCertificate).Return(fmt.Errorf("some exception")) + ps.verifier.On("VerifyVote", &ps.vote).Return(nil) + + // check that validation fails now + err := ps.validator.ValidateProposal(ps.proposal) + assert.Error(ps.T(), err, "a proposal with an invalid QC should be rejected") + + // check that the error is an invalid proposal error to allow creating slashing challenge + assert.False(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), "if we can't verify the QC, we should not generate a invalid error") +} + +// TestProposalWithPreviousRankTimeoutCertificate tests different scenarios where last rank has ended with TC +// this requires including a valid PreviousRankTimeoutCertificate. +func (ps *ProposalSuite) TestProposalWithPreviousRankTimeoutCertificate() { + // assume all proposals are created by valid leader + ps.verifier.On("VerifyVote", mock.Anything).Return(nil) + ps.committee.On("LeaderForRank", mock.Anything).Return(ps.leader.Identity(), nil) + + ps.Run("happy-path", func() { + state := helper.MakeState( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: ps.state.Rank + 2, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal(helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(ps.indices), + helper.WithTCRank(ps.state.Rank+1), + helper.WithTCNewestQC(ps.state.ParentQuorumCertificate))), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + ps.verifier.On("VerifyTimeoutCertificate", proposal.PreviousRankTimeoutCertificate).Return(nil).Once() + err := ps.validator.ValidateProposal(proposal) + require.NoError(ps.T(), err) + }) + ps.Run("no-tc", func() { + state := helper.MakeState( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: ps.state.Rank + 2, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal(helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + // in this case proposal without PreviousRankTimeoutCertificate is considered invalid + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + ps.verifier.AssertNotCalled(ps.T(), "VerifyQuorumCertificate") + ps.verifier.AssertNotCalled(ps.T(), "VerifyTimeoutCertificate") + }) + ps.Run("tc-for-wrong-rank", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: ps.state.Rank + 2, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(ps.indices), + helper.WithTCRank(ps.state.Rank+10), // PreviousRankTimeoutCertificate.Rank must be equal to State.Rank-1 + helper.WithTCNewestQC(ps.state.ParentQuorumCertificate))), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + ps.verifier.AssertNotCalled(ps.T(), "VerifyQuorumCertificate") + ps.verifier.AssertNotCalled(ps.T(), "VerifyTimeoutCertificate") + }) + ps.Run("proposal-not-safe-to-extend", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: state.Rank, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(ps.indices), + helper.WithTCRank(ps.state.Rank+1), + // proposal is not safe to extend because included QC.Rank is higher that State.QC.Rank + helper.WithTCNewestQC(helper.MakeQC(helper.WithQCRank(ps.state.Rank+1))))), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + ps.verifier.AssertNotCalled(ps.T(), "VerifyQuorumCertificate") + ps.verifier.AssertNotCalled(ps.T(), "VerifyTimeoutCertificate") + }) + ps.Run("included-tc-highest-qc-not-highest", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: state.Rank, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(ps.indices), + helper.WithTCRank(ps.state.Rank+1), + helper.WithTCNewestQC(ps.state.ParentQuorumCertificate), + )), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + ps.verifier.On("VerifyTimeoutCertificate", proposal.PreviousRankTimeoutCertificate).Return(nil).Once() + + // this is considered an invalid TC, because highest QC's rank is not equal to max{NewestQCRanks} + proposal.PreviousRankTimeoutCertificate.(*helper.TestTimeoutCertificate).LatestRanks[0] = proposal.PreviousRankTimeoutCertificate.GetLatestQuorumCert().GetRank() + 1 + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err) && models.IsInvalidTimeoutCertificateError(err)) + ps.verifier.AssertNotCalled(ps.T(), "VerifyTimeoutCertificate") + }) + ps.Run("included-tc-threshold-not-reached", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: state.Rank, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + // TC is signed by only one signer - insufficient to reach weight threshold + insufficientSignerIndices := []byte{0b00000001} + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(insufficientSignerIndices), // one signer is not enough to reach threshold + helper.WithTCRank(ps.state.Rank+1), + helper.WithTCNewestQC(ps.state.ParentQuorumCertificate), + )), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err) && models.IsInvalidTimeoutCertificateError(err)) + ps.verifier.AssertNotCalled(ps.T(), "VerifyTimeoutCertificate") + }) + ps.Run("included-tc-highest-qc-invalid", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: state.Rank, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + // QC included in TC has rank below QC included in proposal + qc := helper.MakeQC( + helper.WithQCRank(ps.state.ParentQuorumCertificate.GetRank()-1), + helper.WithQCSigners(ps.indices)) + + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(ps.indices), + helper.WithTCRank(ps.state.Rank+1), + helper.WithTCNewestQC(qc))), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + ps.verifier.On("VerifyTimeoutCertificate", proposal.PreviousRankTimeoutCertificate).Return(nil).Once() + ps.verifier.On("VerifyQuorumCertificate", qc).Return(models.ErrInvalidSignature).Once() + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err) && models.IsInvalidTimeoutCertificateError(err)) + }) + ps.Run("verify-qc-err-rank-for-unknown-rank", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + newestQC := helper.MakeQC( + helper.WithQCRank(ps.state.ParentQuorumCertificate.GetRank()-2), + helper.WithQCSigners(ps.indices)) + vote := &helper.TestVote{ + Rank: state.Rank, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(ps.indices), + helper.WithTCRank(ps.state.Rank+1), + helper.WithTCNewestQC(newestQC))), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + ps.verifier.On("VerifyTimeoutCertificate", proposal.PreviousRankTimeoutCertificate).Return(nil).Once() + // Validating QC included in TC returns ErrRankUnknown + ps.verifier.On("VerifyQuorumCertificate", newestQC).Return(models.ErrRankUnknown).Once() + err := ps.validator.ValidateProposal(proposal) + require.Error(ps.T(), err) + require.False(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + require.False(ps.T(), models.IsInvalidTimeoutCertificateError(err)) + require.NotErrorIs(ps.T(), err, models.ErrRankUnknown) + }) + ps.Run("included-tc-invalid-sig", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.state.Rank+2), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithStateQC[*helper.TestState](ps.state.ParentQuorumCertificate)) + vote := &helper.TestVote{ + Rank: state.Rank, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC( + helper.WithTCSigners(ps.indices), + helper.WithTCRank(ps.state.Rank+1), + helper.WithTCNewestQC(ps.state.ParentQuorumCertificate))), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + ps.verifier.On("VerifyTimeoutCertificate", proposal.PreviousRankTimeoutCertificate).Return(models.ErrInvalidSignature).Once() + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err) && models.IsInvalidTimeoutCertificateError(err)) + ps.verifier.AssertCalled(ps.T(), "VerifyTimeoutCertificate", proposal.PreviousRankTimeoutCertificate) + }) + ps.Run("last-rank-successful-but-includes-tc", func() { + state := helper.MakeState[*helper.TestState]( + helper.WithStateRank[*helper.TestState](ps.finalized+1), + helper.WithStateProposer[*helper.TestState](ps.leader.Identity()), + helper.WithParentSigners[*helper.TestState](ps.indices), + helper.WithParentState(ps.parent)) + vote := &helper.TestVote{ + Rank: state.Rank, + ID: ps.leader.Identity(), + StateID: state.Identifier, + Signature: make([]byte, 74), + } + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState(state), + helper.WithPreviousRankTimeoutCertificate[*helper.TestState](helper.MakeTC()), + )), helper.WithVote[*helper.TestState, *helper.TestVote](&vote)) + err := ps.validator.ValidateProposal(proposal) + require.True(ps.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + ps.verifier.AssertNotCalled(ps.T(), "VerifyTimeoutCertificate") + }) + ps.verifier.AssertExpectations(ps.T()) +} + +func TestValidateVote(t *testing.T) { + suite.Run(t, new(VoteSuite)) +} + +type VoteSuite struct { + suite.Suite + signer models.WeightedIdentity + state *models.State[*helper.TestState] + vote *helper.TestVote + verifier *mocks.Verifier[*helper.TestVote] + committee *mocks.Replicas + validator *Validator[*helper.TestState, *helper.TestVote] +} + +func (vs *VoteSuite) SetupTest() { + + // create a random signing identity + vs.signer = helper.WithWeightedIdentityList(1)[0] + + // create a state that should be signed + vs.state = helper.MakeState[*helper.TestState]() + + // create a vote for this state + vs.vote = &helper.TestVote{ + Rank: vs.state.Rank, + ID: vs.signer.Identity(), + StateID: vs.state.Identifier, + Signature: []byte{}, + } + + // set up the mocked verifier + vs.verifier = &mocks.Verifier[*helper.TestVote]{} + vs.verifier.On("VerifyVote", &vs.vote).Return(nil) + + // the leader for the state rank is the correct one + vs.committee = &mocks.Replicas{} + vs.committee.On("IdentityByRank", mock.Anything, vs.signer.Identity()).Return(vs.signer, nil) + + // set up the validator with the mocked dependencies + vs.validator = NewValidator[*helper.TestState, *helper.TestVote](vs.committee, vs.verifier) +} + +// TestVoteOK checks the happy case, which is the default for the suite +func (vs *VoteSuite) TestVoteOK() { + _, err := vs.validator.ValidateVote(&vs.vote) + assert.NoError(vs.T(), err, "a valid vote should be accepted") +} + +// TestVoteSignatureError checks that the Validator does not misinterpret +// unexpected exceptions for invalid votes. +func (vs *VoteSuite) TestVoteSignatureError() { + *vs.verifier = mocks.Verifier[*helper.TestVote]{} + vs.verifier.On("VerifyVote", &vs.vote).Return(fmt.Errorf("some exception")) + + // check that the vote is no longer validated + _, err := vs.validator.ValidateVote(&vs.vote) + assert.Error(vs.T(), err, "a vote with error on signature validation should be rejected") + assert.False(vs.T(), models.IsInvalidVoteError[*helper.TestVote](err), "internal exception should not be interpreted as invalid vote") +} + +// TestVoteVerifyVote_ErrRankUnknown tests if ValidateVote correctly handles VerifyVote's ErrRankUnknown sentinel error +// Validator shouldn't return a sentinel error here because this behavior is a symptom of internal bug, this behavior is not expected. +func (vs *VoteSuite) TestVoteVerifyVote_ErrRankUnknown() { + *vs.verifier = mocks.Verifier[*helper.TestVote]{} + vs.verifier.On("VerifyVote", &vs.vote).Return(models.ErrRankUnknown) + + // check that the vote is no longer validated + _, err := vs.validator.ValidateVote(&vs.vote) + assert.Error(vs.T(), err) + assert.False(vs.T(), models.IsInvalidVoteError[*helper.TestVote](err), "internal exception should not be interpreted as invalid vote") + assert.NotErrorIs(vs.T(), err, models.ErrRankUnknown, "we don't expect a sentinel error here") +} + +// TestVoteInvalidSignerID checks that the Validator correctly handles a vote +// with a SignerID that does not correspond to a valid consensus participant. +// In this case, the `consensus.DynamicCommittee` returns a `models.InvalidSignerError`, +// which the Validator should recognize as a symptom for an invalid vote. +// Hence, we expect the validator to return a `models.InvalidVoteError`. +func (vs *VoteSuite) TestVoteInvalidSignerID() { + *vs.committee = mocks.Replicas{} + vs.committee.On("IdentityByRank", vs.state.Rank, vs.vote.ID).Return(nil, models.NewInvalidSignerErrorf("")) + + // A `models.InvalidSignerError` from the committee should be interpreted as + // the Vote being invalid, i.e. we expect an InvalidVoteError to be returned + _, err := vs.validator.ValidateVote(&vs.vote) + assert.Error(vs.T(), err, "a vote with unknown SignerID should be rejected") + assert.True(vs.T(), models.IsInvalidVoteError[*helper.TestVote](err), "a vote with unknown SignerID should be rejected") +} + +// TestVoteSignatureInvalid checks that the Validator correctly handles votes +// with cryptographically invalid consensus. In this case, the `consensus.Verifier` +// returns a `models.ErrInvalidSignature`, which the Validator should recognize as +// a symptom for an invalid vote. +// Hence, we expect the validator to return a `models.InvalidVoteError`. +func (vs *VoteSuite) TestVoteSignatureInvalid() { + *vs.verifier = mocks.Verifier[*helper.TestVote]{} + vs.verifier.On("VerifyVote", &vs.vote).Return(fmt.Errorf("staking sig is invalid: %w", models.ErrInvalidSignature)) + + // A `models.ErrInvalidSignature` from the `consensus.Verifier` should be interpreted as + // the Vote being invalid, i.e. we expect an InvalidVoteError to be returned + _, err := vs.validator.ValidateVote(&vs.vote) + assert.Error(vs.T(), err, "a vote with an invalid signature should be rejected") + assert.True(vs.T(), models.IsInvalidVoteError[*helper.TestVote](err), "a vote with an invalid signature should be rejected") +} + +func TestValidateQuorumCertificate(t *testing.T) { + suite.Run(t, new(QCSuite)) +} + +type QCSuite struct { + suite.Suite + participants []models.WeightedIdentity + signers []models.WeightedIdentity + state *models.State[*helper.TestState] + qc models.QuorumCertificate + committee *mocks.Replicas + verifier *mocks.Verifier[*helper.TestVote] + validator *Validator[*helper.TestState, *helper.TestVote] +} + +func (qs *QCSuite) SetupTest() { + // create a list of 10 nodes with 1-weight each + qs.participants = helper.WithWeightedIdentityList(10) + + // signers are a qualified majority at 7 + qs.signers = qs.participants[:7] + + // create a state that has the signers in its QC + qs.state = helper.MakeState[*helper.TestState]() + indices := []byte{0b01111111, 0b00000000} + + qs.qc = helper.MakeQC(helper.WithQCState[*helper.TestState](qs.state), helper.WithQCSigners(indices)) + + // return the correct participants and identities from rank state + qs.committee = &mocks.Replicas{} + qs.committee.On("IdentitiesByRank", mock.Anything).Return( + func(_ uint64) []models.WeightedIdentity { + return qs.participants + }, + nil, + ) + qs.committee.On("QuorumThresholdForRank", mock.Anything).Return(uint64(7000), nil) + + // set up the mocked verifier to verify the QC correctly + qs.verifier = &mocks.Verifier[*helper.TestVote]{} + qs.verifier.On("VerifyQuorumCertificate", qs.qc).Return(nil) + + // set up the validator with the mocked dependencies + qs.validator = NewValidator[*helper.TestState, *helper.TestVote](qs.committee, qs.verifier) +} + +// TestQCOK verifies the default happy case +func (qs *QCSuite) TestQCOK() { + + // check the default happy case passes + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.NoError(qs.T(), err, "a valid QC should be accepted") +} + +// TestQCRetrievingParticipantsError tests that validation errors if: +// there is an error retrieving identities of consensus participants +func (qs *QCSuite) TestQCRetrievingParticipantsError() { + // change the consensus.DynamicCommittee to fail on retrieving participants + *qs.committee = mocks.Replicas{} + qs.committee.On("IdentitiesByRank", mock.Anything).Return(qs.participants, errors.New("FATAL internal error")) + + // verifier should escalate unspecific internal error to surrounding logic, but NOT as ErrorInvalidQC + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.Error(qs.T(), err, "unspecific error when retrieving consensus participants should be escalated to surrounding logic") + assert.False(qs.T(), models.IsInvalidQuorumCertificateError(err), "unspecific internal errors should not result in ErrorInvalidQC error") +} + +// TestQCSignersError tests that a qc fails validation if: +// QC signer's have insufficient weight (but are all valid consensus participants otherwise) +func (qs *QCSuite) TestQCInsufficientWeight() { + // signers only have weight 6 out of 10 total (NOT have a supermajority) + qs.signers = qs.participants[:6] + indices := []byte{0b00111111, 0b00000000} + + qs.qc = helper.MakeQC(helper.WithQCState[*helper.TestState](qs.state), helper.WithQCSigners(indices)) + + // the QC should not be validated anymore + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.Error(qs.T(), err, "a QC should be rejected if it has insufficient voted weight") + + // we should get a threshold error to bubble up for extra info + assert.True(qs.T(), models.IsInvalidQuorumCertificateError(err), "if there is insufficient voted weight, an invalid state error should be raised") +} + +// TestQCSignatureError tests that validation errors if: +// there is an unspecific internal error while validating the signature +func (qs *QCSuite) TestQCSignatureError() { + + // set up the verifier to fail QC verification + *qs.verifier = mocks.Verifier[*helper.TestVote]{} + qs.verifier.On("VerifyQuorumCertificate", qs.qc).Return(errors.New("dummy error")) + + // verifier should escalate unspecific internal error to surrounding logic, but NOT as ErrorInvalidQC + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.Error(qs.T(), err, "unspecific sig verification error should be escalated to surrounding logic") + assert.False(qs.T(), models.IsInvalidQuorumCertificateError(err), "unspecific internal errors should not result in ErrorInvalidQC error") +} + +// TestQCSignatureInvalid verifies that the Validator correctly handles the models.ErrInvalidSignature. +// This error return from `Verifier.VerifyQuorumCertificate` is an expected failure case in case of a byzantine input, where +// one of the signatures in the QC is broken. Hence, the Validator should wrap it as InvalidProposalError. +func (qs *QCSuite) TestQCSignatureInvalid() { + // change the verifier to fail the QC signature + *qs.verifier = mocks.Verifier[*helper.TestVote]{} + qs.verifier.On("VerifyQuorumCertificate", qs.qc).Return(fmt.Errorf("invalid qc: %w", models.ErrInvalidSignature)) + + // the QC should no longer pass validation + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.True(qs.T(), models.IsInvalidQuorumCertificateError(err), "if the signature is invalid an ErrorInvalidQC error should be raised") +} + +// TestQCVerifyQuorumCertificate_ErrRankUnknown tests if ValidateQuorumCertificate correctly handles VerifyQuorumCertificate's ErrRankUnknown sentinel error +// Validator shouldn't return a sentinel error here because this behavior is a symptom of internal bug, this behavior is not expected. +func (qs *QCSuite) TestQCVerifyQuorumCertificate_ErrRankUnknown() { + *qs.verifier = mocks.Verifier[*helper.TestVote]{} + qs.verifier.On("VerifyQuorumCertificate", qs.qc).Return(models.ErrRankUnknown) + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.Error(qs.T(), err) + assert.False(qs.T(), models.IsInvalidQuorumCertificateError(err), "we don't expect a sentinel error here") + assert.NotErrorIs(qs.T(), err, models.ErrRankUnknown, "we don't expect a sentinel error here") +} + +// TestQCSignatureInvalidFormat verifies that the Validator correctly handles the models.InvalidFormatError. +// This error return from `Verifier.VerifyQuorumCertificate` is an expected failure case in case of a byzantine input, where +// some binary vector (e.g. `sigData`) is broken. Hence, the Validator should wrap it as InvalidProposalError. +func (qs *QCSuite) TestQCSignatureInvalidFormat() { + // change the verifier to fail the QC signature + *qs.verifier = mocks.Verifier[*helper.TestVote]{} + qs.verifier.On("VerifyQuorumCertificate", qs.qc).Return(models.NewInvalidFormatErrorf("invalid sigType")) + + // the QC should no longer pass validation + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.True(qs.T(), models.IsInvalidQuorumCertificateError(err), "if the signature has an invalid format, an ErrorInvalidQC error should be raised") +} + +// TestQCEmptySigners verifies that the Validator correctly handles the models.InsufficientSignaturesError: +// In the validator, we previously checked the total weight of all signers meets the supermajority threshold, +// which is a _positive_ number. Hence, there must be at least one signer. Hence, `Verifier.VerifyQuorumCertificate` +// returning this error would be a symptom of a fatal internal bug. The Validator should _not_ interpret +// this error as an invalid QC / invalid state, i.e. it should _not_ return an `InvalidProposalError`. +func (qs *QCSuite) TestQCEmptySigners() { + *qs.verifier = mocks.Verifier[*helper.TestVote]{} + qs.verifier.On("VerifyQuorumCertificate", qs.qc).Return( + fmt.Errorf("%w", models.NewInsufficientSignaturesErrorf(""))) + + // the Validator should _not_ interpret this as a invalid QC, but as an internal error + err := qs.validator.ValidateQuorumCertificate(qs.qc) + assert.True(qs.T(), models.IsInsufficientSignaturesError(err)) // unexpected error should be wrapped and propagated upwards + assert.False(qs.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err), err, "should _not_ interpret this as a invalid QC, but as an internal error") +} + +func TestValidateTimeoutCertificate(t *testing.T) { + suite.Run(t, new(TCSuite)) +} + +type TCSuite struct { + suite.Suite + participants []models.WeightedIdentity + signers []models.WeightedIdentity + indices []byte + state *models.State[*helper.TestState] + tc models.TimeoutCertificate + committee *mocks.DynamicCommittee + verifier *mocks.Verifier[*helper.TestVote] + validator *Validator[*helper.TestState, *helper.TestVote] +} + +func (s *TCSuite) SetupTest() { + + // create a list of 10 nodes with 1-weight each + s.participants = helper.WithWeightedIdentityList(10) + + // signers are a qualified majority at 7 + s.signers = s.participants[:7] + + var err error + s.indices = []byte{0b01111111, 0b00000000} + require.NoError(s.T(), err) + + rank := uint64(int(rand.Uint32()) + len(s.participants)) + + highQCRanks := make([]uint64, 0, len(s.signers)) + for i := range s.signers { + highQCRanks = append(highQCRanks, rank-uint64(i)-1) + } + + rand.Shuffle(len(highQCRanks), func(i, j int) { + highQCRanks[i], highQCRanks[j] = highQCRanks[j], highQCRanks[i] + }) + + // create a state that has the signers in its QC + parent := helper.MakeState[*helper.TestState](helper.WithStateRank[*helper.TestState](rank - 1)) + s.state = helper.MakeState[*helper.TestState](helper.WithStateRank[*helper.TestState](rank), + helper.WithParentState(parent), + helper.WithParentSigners[*helper.TestState](s.indices)) + s.tc = helper.MakeTC(helper.WithTCNewestQC(s.state.ParentQuorumCertificate), + helper.WithTCRank(rank+1), + helper.WithTCSigners(s.indices), + helper.WithTCHighQCRanks(highQCRanks)) + + // return the correct participants and identities from rank state + s.committee = &mocks.DynamicCommittee{} + s.committee.On("IdentitiesByRank", mock.Anything, mock.Anything).Return( + func(rank uint64) []models.WeightedIdentity { + return s.participants + }, + nil, + ) + s.committee.On("QuorumThresholdForRank", mock.Anything).Return(uint64(7000), nil) + + s.verifier = &mocks.Verifier[*helper.TestVote]{} + s.verifier.On("VerifyQuorumCertificate", s.state.ParentQuorumCertificate).Return(nil) + + // set up the validator with the mocked dependencies + s.validator = NewValidator[*helper.TestState, *helper.TestVote](s.committee, s.verifier) +} + +// TestTCOk tests if happy-path returns correct result +func (s *TCSuite) TestTCOk() { + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(nil).Once() + + // check the default happy case passes + err := s.validator.ValidateTimeoutCertificate(s.tc) + assert.NoError(s.T(), err, "a valid TC should be accepted") +} + +// TestTCNewestQCFromFuture tests if correct error is returned when included QC is higher than TC's rank +func (s *TCSuite) TestTCNewestQCFromFuture() { + // highest QC from future rank + s.tc.(*helper.TestTimeoutCertificate).LatestQuorumCert.(*helper.TestQuorumCertificate).Rank = s.tc.GetRank() + 1 + err := s.validator.ValidateTimeoutCertificate(s.tc) // the QC should not be validated anymore + assert.True(s.T(), models.IsInvalidTimeoutCertificateError(err), "if NewestQC.Rank > TC.Rank, an ErrorInvalidTC error should be raised") +} + +// TestTCNewestQCIsNotHighest tests if correct error is returned when included QC is not highest +func (s *TCSuite) TestTCNewestQCIsNotHighest() { + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(nil).Once() + + // highest QC rank is not equal to max(TONewestQCRanks) + s.tc.(*helper.TestTimeoutCertificate).LatestRanks[0] = s.tc.GetLatestQuorumCert().GetRank() + 1 + err := s.validator.ValidateTimeoutCertificate(s.tc) // the QC should not be validated anymore + assert.True(s.T(), models.IsInvalidTimeoutCertificateError(err), "if max(highQCRanks) != NewestQC.Rank, an ErrorInvalidTC error should be raised") +} + +// TestTCInvalidSigners tests if correct error is returned when signers are invalid +func (s *TCSuite) TestTCInvalidSigners() { + s.participants = s.participants[:6] // remove participant[6+] from the list of valid consensus participant + err := s.validator.ValidateTimeoutCertificate(s.tc) // the QC should not be validated anymore + assert.True(s.T(), models.IsInvalidTimeoutCertificateError(err), "if some signers are invalid consensus participants, an ErrorInvalidTC error should be raised") +} + +// TestTCThresholdNotReached tests if correct error is returned when TC's singers don't have enough weight +func (s *TCSuite) TestTCThresholdNotReached() { + // signers only have weight 1 out of 10 total (NOT have a supermajority) + s.signers = s.participants[:1] + indices := []byte{0b00000001, 0b00000000} + + s.tc.(*helper.TestTimeoutCertificate).AggregatedSignature.(*helper.TestAggregatedSignature).Bitmask = indices + + // adjust signers to be less than total weight + err := s.validator.ValidateTimeoutCertificate(s.tc) // the QC should not be validated anymore + assert.True(s.T(), models.IsInvalidTimeoutCertificateError(err), "if signers don't have enough weight, an ErrorInvalidTC error should be raised") +} + +// TestTCInvalidNewestQC tests if correct error is returned when included highest QC is invalid +func (s *TCSuite) TestTCInvalidNewestQC() { + *s.verifier = mocks.Verifier[*helper.TestVote]{} + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(nil).Once() + s.verifier.On("VerifyQuorumCertificate", s.tc.GetLatestQuorumCert()).Return(models.NewInvalidFormatErrorf("invalid qc")).Once() + err := s.validator.ValidateTimeoutCertificate(s.tc) // the QC should not be validated anymore + assert.True(s.T(), models.IsInvalidTimeoutCertificateError(err), "if included QC is invalid, an ErrorInvalidTC error should be raised") +} + +// TestTCVerifyQuorumCertificate_ErrRankUnknown tests if ValidateTimeoutCertificate correctly handles VerifyQuorumCertificate's ErrRankUnknown sentinel error +// Validator shouldn't return a sentinel error here because this behavior is a symptom of internal bug, this behavior is not expected. +func (s *TCSuite) TestTCVerifyQuorumCertificate_ErrRankUnknown() { + *s.verifier = mocks.Verifier[*helper.TestVote]{} + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(nil).Once() + s.verifier.On("VerifyQuorumCertificate", s.tc.GetLatestQuorumCert()).Return(models.ErrRankUnknown).Once() + err := s.validator.ValidateTimeoutCertificate(s.tc) // the QC should not be validated anymore + assert.Error(s.T(), err) + assert.False(s.T(), models.IsInvalidTimeoutCertificateError(err), "we don't expect a sentinel error here") + assert.NotErrorIs(s.T(), err, models.ErrRankUnknown, "we don't expect a sentinel error here") +} + +// TestTCInvalidSignature tests a few scenarios when the signature is invalid or TC signers is malformed +func (s *TCSuite) TestTCInvalidSignature() { + s.Run("insufficient-signatures", func() { + *s.verifier = mocks.Verifier[*helper.TestVote]{} + s.verifier.On("VerifyQuorumCertificate", mock.Anything).Return(nil).Once() + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(models.NewInsufficientSignaturesErrorf("")).Once() + + // the Validator should _not_ interpret this as an invalid TC, but as an internal error + err := s.validator.ValidateTimeoutCertificate(s.tc) + assert.True(s.T(), models.IsInsufficientSignaturesError(err)) // unexpected error should be wrapped and propagated upwards + assert.False(s.T(), models.IsInvalidTimeoutCertificateError(err), "should _not_ interpret this as a invalid TC, but as an internal error") + }) + s.Run("invalid-format", func() { + *s.verifier = mocks.Verifier[*helper.TestVote]{} + s.verifier.On("VerifyQuorumCertificate", mock.Anything).Return(nil).Once() + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(models.NewInvalidFormatErrorf("")).Once() + err := s.validator.ValidateTimeoutCertificate(s.tc) + assert.True(s.T(), models.IsInvalidTimeoutCertificateError(err), "if included TC's inputs are invalid, an ErrorInvalidTC error should be raised") + }) + s.Run("invalid-signature", func() { + *s.verifier = mocks.Verifier[*helper.TestVote]{} + s.verifier.On("VerifyQuorumCertificate", mock.Anything).Return(nil).Once() + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(models.ErrInvalidSignature).Once() + err := s.validator.ValidateTimeoutCertificate(s.tc) + assert.True(s.T(), models.IsInvalidTimeoutCertificateError(err), "if included TC's signature is invalid, an ErrorInvalidTC error should be raised") + }) + s.Run("verify-sig-exception", func() { + exception := errors.New("verify-sig-exception") + *s.verifier = mocks.Verifier[*helper.TestVote]{} + s.verifier.On("VerifyQuorumCertificate", mock.Anything).Return(nil).Once() + s.verifier.On("VerifyTimeoutCertificate", s.tc).Return(exception).Once() + err := s.validator.ValidateTimeoutCertificate(s.tc) + assert.ErrorIs(s.T(), err, exception, "if included TC's signature is invalid, an exception should be propagated") + assert.False(s.T(), models.IsInvalidTimeoutCertificateError(err)) + }) +} diff --git a/consensus/vote_collector.go b/consensus/vote_collector.go index 40084f5..a82790a 100644 --- a/consensus/vote_collector.go +++ b/consensus/vote_collector.go @@ -94,7 +94,7 @@ type VoteProcessor[VoteT models.Unique] interface { // Process performs processing of single vote. This function is safe to call // from multiple goroutines. // Expected error returns during normal operations: - // * VoteForIncompatibleBlockError - submitted vote for incompatible state + // * VoteForIncompatibleStateError - submitted vote for incompatible state // * VoteForIncompatibleRankError - submitted vote for incompatible rank // * models.InvalidVoteError - submitted vote with invalid signature // * models.DuplicatedSignerError - vote from a signer whose vote was diff --git a/consensus/voteaggregator/vote_aggregator.go b/consensus/voteaggregator/vote_aggregator.go index 679ecb3..98ab49b 100644 --- a/consensus/voteaggregator/vote_aggregator.go +++ b/consensus/voteaggregator/vote_aggregator.go @@ -3,9 +3,9 @@ package voteaggregator import ( "context" "fmt" - "sync" "sync/atomic" + "golang.org/x/sync/errgroup" "source.quilibrium.com/quilibrium/monorepo/consensus" "source.quilibrium.com/quilibrium/monorepo/consensus/models" ) @@ -35,7 +35,7 @@ type VoteAggregator[StateT models.Unique, VoteT models.Unique] struct { finalizedRank atomic.Uint64 // cache the last finalized rank to queue up the pruning work, and unstate the caller who's delivering the finalization event. queuedVotes chan *VoteT queuedStates chan *models.SignedProposal[StateT, VoteT] - wg sync.WaitGroup + wg errgroup.Group } var _ consensus.VoteAggregator[*nilUnique, *nilUnique] = (*VoteAggregator[*nilUnique, *nilUnique])(nil) @@ -64,7 +64,7 @@ func NewVoteAggregator[StateT models.Unique, VoteT models.Unique]( queuedStates: queuedStates, queuedMessagesNotifier: make(chan struct{}, 1), finalizationEventsNotifier: make(chan struct{}, 1), - wg: sync.WaitGroup{}, + wg: errgroup.Group{}, } aggregator.lowestRetainedRank.Store(lowestRetainedRank) @@ -74,16 +74,19 @@ func NewVoteAggregator[StateT models.Unique, VoteT models.Unique]( } func (va *VoteAggregator[StateT, VoteT]) Start(ctx context.Context) error { - va.wg.Add(defaultVoteAggregatorWorkers + 1) + internalCtx, internalCancel := context.WithCancel(ctx) + va.wg.SetLimit(defaultVoteAggregatorWorkers + 1) for i := 0; i < defaultVoteAggregatorWorkers; i++ { // manager for worker routines that process inbound messages - go func() { - defer va.wg.Done() - va.queuedMessagesProcessingLoop(ctx) - }() + va.wg.Go(func() error { + err := va.queuedMessagesProcessingLoop(internalCtx) + if err != nil { + internalCancel() + } + return err + }) } - go func() { - defer va.wg.Done() + va.wg.Go(func() error { // create new context which is not connected to parent // we need to ensure that our internal workers stop before asking // vote collectors to stop. We want to avoid delivering events to already @@ -93,14 +96,15 @@ func (va *VoteAggregator[StateT, VoteT]) Start(ctx context.Context) error { // start vote collectors err := va.collectors.Start(innerCtx) if err != nil { - return + internalCancel() + return err } // Handle the component lifecycle in a separate goroutine so we can capture // any errors thrown during initialization in the main goroutine. go func() { select { - case <-ctx.Done(): + case <-internalCtx.Done(): // wait for internal workers to stop, then signal vote collectors to // stop va.wg.Wait() @@ -108,19 +112,20 @@ func (va *VoteAggregator[StateT, VoteT]) Start(ctx context.Context) error { } }() - va.finalizationProcessingLoop(ctx) - }() - return nil + va.finalizationProcessingLoop(internalCtx) + return nil + }) + return va.wg.Wait() } func (va *VoteAggregator[StateT, VoteT]) queuedMessagesProcessingLoop( ctx context.Context, -) { +) error { notifier := va.queuedMessagesNotifier for { select { case <-ctx.Done(): - return + return nil case <-notifier: err := va.processQueuedMessages(ctx) if err != nil { @@ -128,7 +133,7 @@ func (va *VoteAggregator[StateT, VoteT]) queuedMessagesProcessingLoop( "stopping mesage processing loop", fmt.Errorf("internal error processing queued messages: %w", err), ) - return + return err } } } diff --git a/consensus/voteaggregator/vote_aggregator_test.go b/consensus/voteaggregator/vote_aggregator_test.go new file mode 100644 index 0000000..345bdab --- /dev/null +++ b/consensus/voteaggregator/vote_aggregator_test.go @@ -0,0 +1,107 @@ +package voteaggregator + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/mocks" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +func TestVoteAggregator(t *testing.T) { + ts := new(VoteAggregatorTestSuite) + ts.errs = make(chan error, 1) + suite.Run(t, ts) +} + +// VoteAggregatorTestSuite is a test suite for isolated testing of VoteAggregator. +// Contains mocked state which is used to verify correct behavior of VoteAggregator. +// Automatically starts and stops module.Startable in SetupTest and TearDownTest respectively. +type VoteAggregatorTestSuite struct { + suite.Suite + + aggregator *VoteAggregator[*helper.TestState, *helper.TestVote] + collectors *mocks.VoteCollectors[*helper.TestState, *helper.TestVote] + consumer *mocks.VoteAggregationConsumer[*helper.TestState, *helper.TestVote] + stopAggregator context.CancelFunc + errs chan error +} + +func (s *VoteAggregatorTestSuite) SetupTest() { + var err error + s.collectors = mocks.NewVoteCollectors[*helper.TestState, *helper.TestVote](s.T()) + s.consumer = mocks.NewVoteAggregationConsumer[*helper.TestState, *helper.TestVote](s.T()) + + s.collectors.On("Start", mock.Anything).Return(nil).Once() + + s.aggregator, err = NewVoteAggregator( + helper.Logger(), + s.consumer, + 0, + s.collectors, + ) + require.NoError(s.T(), err) + + ctx, cancel := context.WithCancel(context.Background()) + signalerCtx := ctx + s.stopAggregator = cancel + go func() { + err := s.aggregator.Start(signalerCtx) + if err != nil { + s.errs <- err + } + }() +} + +func (s *VoteAggregatorTestSuite) TearDownTest() { + s.stopAggregator() +} + +// TestOnFinalizedState tests if finalized state gets processed when send through `VoteAggregator`. +// Tests the whole processing pipeline. +func (s *VoteAggregatorTestSuite) TestOnFinalizedState() { + finalizedState := helper.MakeState(helper.WithStateRank[*helper.TestState](100)) + done := make(chan struct{}) + s.collectors.On("PruneUpToRank", uint64(100)).Run(func(args mock.Arguments) { + close(done) + }).Once() + s.aggregator.OnFinalizedState(finalizedState) + time.Sleep(100 * time.Millisecond) +} + +// TestProcessInvalidState tests that processing invalid state results in exception, when given as +// an input to AddState (only expects _valid_ states per API contract). +// The exception should be propagated to the VoteAggregator's internal `ComponentManager`. +func (s *VoteAggregatorTestSuite) TestProcessInvalidState() { + state := helper.MakeSignedProposal(helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal( + helper.WithState( + helper.MakeState( + helper.WithStateRank[*helper.TestState](100), + ), + ), + ))) + processed := make(chan struct{}) + collector := mocks.NewVoteCollector[*helper.TestState, *helper.TestVote](s.T()) + collector.On("ProcessState", state).Run(func(_ mock.Arguments) { + close(processed) + }).Return(models.InvalidProposalError[*helper.TestState, *helper.TestVote]{}) + s.collectors.On("GetOrCreateCollector", state.State.Rank).Return(collector, true, nil).Once() + + // submit state for processing + s.aggregator.AddState(state) + + // expect a thrown error + select { + case err := <-s.errs: + require.Error(s.T(), err) + require.False(s.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + case <-time.After(100 * time.Millisecond): + s.T().Fatalf("expected error but haven't received anything") + } +} diff --git a/consensus/voteaggregator/vote_collectors_test.go b/consensus/voteaggregator/vote_collectors_test.go new file mode 100644 index 0000000..db78991 --- /dev/null +++ b/consensus/voteaggregator/vote_collectors_test.go @@ -0,0 +1,158 @@ +package voteaggregator + +import ( + "errors" + "fmt" + "sync" + "testing" + + "github.com/gammazero/workerpool" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.uber.org/atomic" + + "source.quilibrium.com/quilibrium/monorepo/consensus" + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/mocks" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +var factoryError = errors.New("factory error") + +func TestVoteCollectors(t *testing.T) { + suite.Run(t, new(VoteCollectorsTestSuite)) +} + +// VoteCollectorsTestSuite is a test suite for isolated testing of VoteCollectors. +// Contains helper methods and mocked state which is used to verify correct behavior of VoteCollectors. +type VoteCollectorsTestSuite struct { + suite.Suite + + mockedCollectors map[uint64]*mocks.VoteCollector[*helper.TestState, *helper.TestVote] + factoryMethod NewCollectorFactoryMethod[*helper.TestState, *helper.TestVote] + collectors *VoteCollectors[*helper.TestState, *helper.TestVote] + lowestLevel uint64 + workerPool *workerpool.WorkerPool +} + +func (s *VoteCollectorsTestSuite) SetupTest() { + s.lowestLevel = 1000 + s.mockedCollectors = make(map[uint64]*mocks.VoteCollector[*helper.TestState, *helper.TestVote]) + s.workerPool = workerpool.New(2) + s.factoryMethod = func(rank uint64, _ consensus.Workers) (consensus.VoteCollector[*helper.TestState, *helper.TestVote], error) { + if collector, found := s.mockedCollectors[rank]; found { + return collector, nil + } + return nil, fmt.Errorf("mocked collector %v not found: %w", rank, factoryError) + } + s.collectors = NewVoteCollectors(helper.Logger(), s.lowestLevel, s.workerPool, s.factoryMethod) +} + +func (s *VoteCollectorsTestSuite) TearDownTest() { + s.workerPool.StopWait() +} + +// prepareMockedCollector prepares a mocked collector and stores it in map, later it will be used +// to mock behavior of vote collectors. +func (s *VoteCollectorsTestSuite) prepareMockedCollector(rank uint64) *mocks.VoteCollector[*helper.TestState, *helper.TestVote] { + collector := &mocks.VoteCollector[*helper.TestState, *helper.TestVote]{} + collector.On("Rank").Return(rank).Maybe() + s.mockedCollectors[rank] = collector + return collector +} + +// TestGetOrCreatorCollector_RankLowerThanLowest tests a scenario where caller tries to create a collector with rank +// lower than already pruned one. This should result in sentinel error `BelowPrunedThresholdError` +func (s *VoteCollectorsTestSuite) TestGetOrCreatorCollector_RankLowerThanLowest() { + collector, created, err := s.collectors.GetOrCreateCollector(s.lowestLevel - 10) + require.Nil(s.T(), collector) + require.False(s.T(), created) + require.Error(s.T(), err) + require.True(s.T(), models.IsBelowPrunedThresholdError(err)) +} + +// TestGetOrCreateCollector_ValidCollector tests a happy path scenario where we try first to create and then retrieve cached collector. +func (s *VoteCollectorsTestSuite) TestGetOrCreateCollector_ValidCollector() { + rank := s.lowestLevel + 10 + s.prepareMockedCollector(rank) + collector, created, err := s.collectors.GetOrCreateCollector(rank) + require.NoError(s.T(), err) + require.True(s.T(), created) + require.Equal(s.T(), rank, collector.Rank()) + + cached, cachedCreated, err := s.collectors.GetOrCreateCollector(rank) + require.NoError(s.T(), err) + require.False(s.T(), cachedCreated) + require.Equal(s.T(), collector, cached) +} + +// TestGetOrCreateCollector_FactoryError tests that error from factory method is propagated to caller. +func (s *VoteCollectorsTestSuite) TestGetOrCreateCollector_FactoryError() { + // creating collector without calling prepareMockedCollector will yield factoryError. + collector, created, err := s.collectors.GetOrCreateCollector(s.lowestLevel + 10) + require.Nil(s.T(), collector) + require.False(s.T(), created) + require.ErrorIs(s.T(), err, factoryError) +} + +// TestGetOrCreateCollectors_ConcurrentAccess tests that concurrently accessing of GetOrCreateCollector creates +// only one collector and all other instances are retrieved from cache. +func (s *VoteCollectorsTestSuite) TestGetOrCreateCollectors_ConcurrentAccess() { + createdTimes := atomic.NewUint64(0) + rank := s.lowestLevel + 10 + s.prepareMockedCollector(rank) + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + _, created, err := s.collectors.GetOrCreateCollector(rank) + require.NoError(s.T(), err) + if created { + createdTimes.Add(1) + } + wg.Done() + }() + } + + wg.Wait() + require.Equal(s.T(), uint64(1), createdTimes.Load()) +} + +// TestPruneUpToRank tests pruning removes item below pruning height and leaves unmodified other items. +func (s *VoteCollectorsTestSuite) TestPruneUpToRank() { + numberOfCollectors := uint64(10) + prunedRanks := make([]uint64, 0) + for i := uint64(0); i < numberOfCollectors; i++ { + rank := s.lowestLevel + i + s.prepareMockedCollector(rank) + _, _, err := s.collectors.GetOrCreateCollector(rank) + require.NoError(s.T(), err) + prunedRanks = append(prunedRanks, rank) + } + + pruningHeight := s.lowestLevel + numberOfCollectors + + expectedCollectors := make([]consensus.VoteCollector[*helper.TestState, *helper.TestVote], 0) + for i := uint64(0); i < numberOfCollectors; i++ { + rank := pruningHeight + i + s.prepareMockedCollector(rank) + collector, _, err := s.collectors.GetOrCreateCollector(rank) + require.NoError(s.T(), err) + expectedCollectors = append(expectedCollectors, collector) + } + + // after this operation collectors below pruning height should be pruned and everything higher + // should be left unmodified + s.collectors.PruneUpToRank(pruningHeight) + + for _, prunedRank := range prunedRanks { + _, _, err := s.collectors.GetOrCreateCollector(prunedRank) + require.Error(s.T(), err) + require.True(s.T(), models.IsBelowPrunedThresholdError(err)) + } + + for _, collector := range expectedCollectors { + cached, _, _ := s.collectors.GetOrCreateCollector(collector.Rank()) + require.Equal(s.T(), collector, cached) + } +} diff --git a/consensus/votecollector/factory_test.go b/consensus/votecollector/factory_test.go new file mode 100644 index 0000000..7aa8d1a --- /dev/null +++ b/consensus/votecollector/factory_test.go @@ -0,0 +1,118 @@ +package votecollector + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "source.quilibrium.com/quilibrium/monorepo/consensus" + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/mocks" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +// TestVoteProcessorFactory_CreateWithValidProposal checks if +// VoteProcessorFactory checks the proposer vote based on submitted proposal +func TestVoteProcessorFactory_CreateWithValidProposal(t *testing.T) { + mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote]{} + + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]() + mockedProcessor := &mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]{} + vote, err := proposal.ProposerVote() + require.NoError(t, err) + mockedProcessor.On("Process", vote).Return(nil).Once() + mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() + + voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ + baseFactory: func(log consensus.TraceLogger, block *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, proposal, dsTag, aggregator) + }, + } + + processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + require.NoError(t, err) + require.NotNil(t, processor) + + mockedProcessor.AssertExpectations(t) + mockedFactory.AssertExpectations(t) +} + +// TestVoteProcessorFactory_CreateWithInvalidVote tests that processing proposal with invalid vote doesn't return +// vote processor and returns correct error(sentinel or exception). +func TestVoteProcessorFactory_CreateWithInvalidVote(t *testing.T) { + mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote]{} + + t.Run("invalid-vote", func(t *testing.T) { + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]() + mockedProcessor := &mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]{} + vote, err := proposal.ProposerVote() + require.NoError(t, err) + mockedProcessor.On("Process", vote).Return(models.NewInvalidVoteErrorf(vote, "")).Once() + mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() + + voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ + baseFactory: func(log consensus.TraceLogger, block *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, proposal, dsTag, aggregator) + }, + } + + processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + require.Error(t, err) + require.Nil(t, processor) + require.True(t, models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + + mockedProcessor.AssertExpectations(t) + }) + t.Run("process-vote-exception", func(t *testing.T) { + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]() + mockedProcessor := &mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]{} + exception := errors.New("process-exception") + vote, err := proposal.ProposerVote() + require.NoError(t, err) + mockedProcessor.On("Process", vote).Return(exception).Once() + + mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() + + voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ + baseFactory: func(log consensus.TraceLogger, block *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, proposal, dsTag, aggregator) + }, + } + + processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + require.ErrorIs(t, err, exception) + require.Nil(t, processor) + // an unexpected exception should _not_ be interpreted as the block being invalid + require.False(t, models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + + mockedProcessor.AssertExpectations(t) + }) + + mockedFactory.AssertExpectations(t) +} + +// TestVoteProcessorFactory_CreateProcessException tests that VoteProcessorFactory correctly handles exception +// while creating processor for requested proposal. +func TestVoteProcessorFactory_CreateProcessException(t *testing.T) { + mockedFactory := mocks.VoteProcessorFactory[*helper.TestState, *helper.TestVote]{} + + proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]() + exception := errors.New("create-exception") + + mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything).Return(nil, exception).Once() + voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ + baseFactory: func(log consensus.TraceLogger, block *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, proposal, dsTag, aggregator) + }, + } + + processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + require.ErrorIs(t, err, exception) + require.Nil(t, processor) + // an unexpected exception should _not_ be interpreted as the block being invalid + require.False(t, models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + + mockedFactory.AssertExpectations(t) +} diff --git a/consensus/votecollector/statemachine_test.go b/consensus/votecollector/statemachine_test.go new file mode 100644 index 0000000..584cada --- /dev/null +++ b/consensus/votecollector/statemachine_test.go @@ -0,0 +1,286 @@ +package votecollector + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/gammazero/workerpool" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "source.quilibrium.com/quilibrium/monorepo/consensus" + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/mocks" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +func TestStateMachine(t *testing.T) { + suite.Run(t, new(StateMachineTestSuite)) +} + +var factoryError = errors.New("factory error") + +// StateMachineTestSuite is a test suite for testing VoteCollector. It stores mocked +// VoteProcessors internally for testing behavior and state transitions for VoteCollector. +type StateMachineTestSuite struct { + suite.Suite + + rank uint64 + notifier *mocks.VoteAggregationConsumer[*helper.TestState, *helper.TestVote] + workerPool *workerpool.WorkerPool + factoryMethod VerifyingVoteProcessorFactory[*helper.TestState, *helper.TestVote] + mockedProcessors map[models.Identity]*mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] + collector *VoteCollector[*helper.TestState, *helper.TestVote] +} + +func (s *StateMachineTestSuite) TearDownTest() { + // Without this line we are risking running into weird situations where one test has finished but there are active workers + // that are executing some work on the shared pool. Need to ensure that all pending work has been executed before + // starting next test. + s.workerPool.StopWait() +} + +func (s *StateMachineTestSuite) SetupTest() { + s.rank = 1000 + s.mockedProcessors = make(map[models.Identity]*mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]) + s.notifier = mocks.NewVoteAggregationConsumer[*helper.TestState, *helper.TestVote](s.T()) + + s.factoryMethod = func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + if processor, found := s.mockedProcessors[state.State.Identifier]; found { + return processor, nil + } + return nil, fmt.Errorf("mocked processor %v not found: %w", state.State.Identifier, factoryError) + } + + s.workerPool = workerpool.New(4) + s.collector = NewStateMachine(s.rank, helper.Logger(), s.workerPool, s.notifier, s.factoryMethod) +} + +// prepareMockedProcessor prepares a mocked processor and stores it in map, later it will be used +// to mock behavior of verifying vote processor. +func (s *StateMachineTestSuite) prepareMockedProcessor(proposal *models.SignedProposal[*helper.TestState, *helper.TestVote]) *mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] { + processor := &mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]{} + processor.On("State").Return(func() *models.State[*helper.TestState] { + return proposal.State + }).Maybe() + processor.On("Status").Return(consensus.VoteCollectorStatusVerifying) + s.mockedProcessors[proposal.State.Identifier] = processor + return processor +} + +// TestStatus_StateTransitions tests that Status returns correct state of VoteCollector in different scenarios +// when proposal processing can possibly change state of collector +func (s *StateMachineTestSuite) TestStatus_StateTransitions() { + state := helper.MakeState(helper.WithStateRank[*helper.TestState](s.rank)) + proposal := helper.MakeSignedProposal(helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal(helper.WithState(state)))) + s.prepareMockedProcessor(proposal) + + // by default, we should create in caching status + require.Equal(s.T(), consensus.VoteCollectorStatusCaching, s.collector.Status()) + + // after processing state we should get into verifying status + err := s.collector.ProcessState(proposal) + require.NoError(s.T(), err) + require.Equal(s.T(), consensus.VoteCollectorStatusVerifying, s.collector.Status()) + + // after submitting double proposal we should transfer into invalid state + err = s.collector.ProcessState(makeSignedProposalWithRank(s.rank)) + require.NoError(s.T(), err) + require.Equal(s.T(), consensus.VoteCollectorStatusInvalid, s.collector.Status()) +} + +// TestStatus_FactoryErrorPropagation verifies that errors from the injected +// factory are handed through (potentially wrapped), but are not replaced. +func (s *StateMachineTestSuite) Test_FactoryErrorPropagation() { + factoryError := errors.New("factory error") + factory := func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return nil, factoryError + } + s.collector.createVerifyingProcessor = factory + + // failing to create collector has to result in error and won't change state + proposal := makeSignedProposalWithRank(s.rank) + err := s.collector.ProcessState(proposal) + require.ErrorIs(s.T(), err, factoryError) + require.Equal(s.T(), consensus.VoteCollectorStatusCaching, s.collector.Status()) +} + +// TestAddVote_VerifyingState tests that AddVote correctly process valid and invalid votes as well +// as repeated, invalid and double votes in verifying state +func (s *StateMachineTestSuite) TestAddVote_VerifyingState() { + proposal := makeSignedProposalWithRank(s.rank) + state := proposal.State + processor := s.prepareMockedProcessor(proposal) + err := s.collector.ProcessState(proposal) + require.NoError(s.T(), err) + s.T().Run("add-valid-vote", func(t *testing.T) { + vote := helper.VoteForStateFixture(state) + s.notifier.On("OnVoteProcessed", &vote).Once() + processor.On("Process", &vote).Return(nil).Once() + err := s.collector.AddVote(&vote) + require.NoError(t, err) + processor.AssertCalled(t, "Process", &vote) + }) + s.T().Run("add-double-vote", func(t *testing.T) { + firstVote := helper.VoteForStateFixture(state) + s.notifier.On("OnVoteProcessed", &firstVote).Once() + processor.On("Process", &firstVote).Return(nil).Once() + err := s.collector.AddVote(&firstVote) + require.NoError(t, err) + + secondVote := helper.VoteFixture(func(vote **helper.TestVote) { + (*vote).Rank = firstVote.Rank + (*vote).ID = firstVote.ID + }) // voted stateID is randomly sampled, i.e. it will be different from firstVote + s.notifier.On("OnDoubleVotingDetected", &firstVote, &secondVote).Return(nil).Once() + + err = s.collector.AddVote(&secondVote) + // we shouldn't get an error + require.NoError(t, err) + + // but should get notified about double voting + s.notifier.AssertCalled(t, "OnDoubleVotingDetected", &firstVote, &secondVote) + processor.AssertCalled(t, "Process", &firstVote) + }) + s.T().Run("add-invalid-vote", func(t *testing.T) { + vote := helper.VoteForStateFixture(state, func(vote **helper.TestVote) { + (*vote).Rank = s.rank + }) + processor.On("Process", &vote).Return(models.NewInvalidVoteErrorf[*helper.TestVote](&vote, "")).Once() + s.notifier.On("OnInvalidVoteDetected", mock.Anything).Run(func(args mock.Arguments) { + invalidVoteErr := args.Get(0).(models.InvalidVoteError[*helper.TestVote]) + require.Equal(s.T(), &vote, invalidVoteErr.Vote) + }).Return(nil).Once() + err := s.collector.AddVote(&vote) + // in case process returns models.InvalidVoteError we should silently ignore this error + require.NoError(t, err) + + // but should get notified about invalid vote + s.notifier.AssertCalled(t, "OnInvalidVoteDetected", mock.Anything) + processor.AssertCalled(t, "Process", &vote) + }) + s.T().Run("add-repeated-vote", func(t *testing.T) { + vote := helper.VoteForStateFixture(state) + s.notifier.On("OnVoteProcessed", &vote).Once() + processor.On("Process", &vote).Return(nil).Once() + err := s.collector.AddVote(&vote) + require.NoError(t, err) + + // calling with same vote should exit early without error and don't do any extra processing + err = s.collector.AddVote(&vote) + require.NoError(t, err) + + processor.AssertCalled(t, "Process", &vote) + }) + s.T().Run("add-incompatible-rank-vote", func(t *testing.T) { + vote := helper.VoteForStateFixture(state, func(vote **helper.TestVote) { + (*vote).Rank = s.rank + 1 + }) + err := s.collector.AddVote(&vote) + require.ErrorIs(t, err, VoteForIncompatibleRankError) + }) + s.T().Run("add-incompatible-state-vote", func(t *testing.T) { + vote := helper.VoteForStateFixture(state, func(vote **helper.TestVote) { + (*vote).Rank = s.rank + }) + processor.On("Process", &vote).Return(VoteForIncompatibleStateError).Once() + err := s.collector.AddVote(&vote) + // in case process returns VoteForIncompatibleStateError we should silently ignore this error + require.NoError(t, err) + processor.AssertCalled(t, "Process", &vote) + }) + s.T().Run("unexpected-VoteProcessor-errors-are-passed-up", func(t *testing.T) { + unexpectedError := errors.New("some unexpected error") + vote := helper.VoteForStateFixture(state, func(vote **helper.TestVote) { + (*vote).Rank = s.rank + }) + processor.On("Process", &vote).Return(unexpectedError).Once() + err := s.collector.AddVote(&vote) + require.ErrorIs(t, err, unexpectedError) + }) +} + +// TestProcessState_ProcessingOfCachedVotes tests that after processing state proposal are cached votes +// are sent to vote processor +func (s *StateMachineTestSuite) TestProcessState_ProcessingOfCachedVotes() { + votes := 10 + proposal := makeSignedProposalWithRank(s.rank) + state := proposal.State + processor := s.prepareMockedProcessor(proposal) + for i := 0; i < votes; i++ { + vote := helper.VoteForStateFixture(state) + // once when caching vote, and once when processing cached vote + s.notifier.On("OnVoteProcessed", &vote).Twice() + // eventually it has to be processed by processor + processor.On("Process", &vote).Return(nil).Once() + require.NoError(s.T(), s.collector.AddVote(&vote)) + } + + err := s.collector.ProcessState(proposal) + require.NoError(s.T(), err) + + time.Sleep(100 * time.Millisecond) + + processor.AssertExpectations(s.T()) +} + +// Test_VoteProcessorErrorPropagation verifies that unexpected errors from the `VoteProcessor` +// are propagated up the call stack (potentially wrapped), but are not replaced. +func (s *StateMachineTestSuite) Test_VoteProcessorErrorPropagation() { + proposal := makeSignedProposalWithRank(s.rank) + state := proposal.State + processor := s.prepareMockedProcessor(proposal) + + err := s.collector.ProcessState(helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]( + helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal(helper.WithState[*helper.TestState](state))))) + require.NoError(s.T(), err) + + unexpectedError := errors.New("some unexpected error") + vote := helper.VoteForStateFixture(state, func(vote **helper.TestVote) { + (*vote).Rank = s.rank + }) + processor.On("Process", &vote).Return(unexpectedError).Once() + err = s.collector.AddVote(&vote) + require.ErrorIs(s.T(), err, unexpectedError) +} + +// RegisterVoteConsumer verifies that after registering vote consumer we are receiving all new and past votes +// in strict ordering of arrival. +func (s *StateMachineTestSuite) RegisterVoteConsumer() { + votes := 10 + proposal := makeSignedProposalWithRank(s.rank) + state := proposal.State + processor := s.prepareMockedProcessor(proposal) + expectedVotes := make([]*helper.TestVote, 0) + for i := 0; i < votes; i++ { + vote := helper.VoteForStateFixture(state) + // eventually it has to be process by processor + processor.On("Process", &vote).Return(nil).Once() + require.NoError(s.T(), s.collector.AddVote(&vote)) + expectedVotes = append(expectedVotes, vote) + } + + actualVotes := make([]*helper.TestVote, 0) + consumer := func(vote **helper.TestVote) { + actualVotes = append(actualVotes, *vote) + } + + s.collector.RegisterVoteConsumer(consumer) + + for i := 0; i < votes; i++ { + vote := helper.VoteForStateFixture(state) + // eventually it has to be process by processor + processor.On("Process", &vote).Return(nil).Once() + require.NoError(s.T(), s.collector.AddVote(&vote)) + expectedVotes = append(expectedVotes, vote) + } + + require.Equal(s.T(), expectedVotes, actualVotes) +} + +func makeSignedProposalWithRank(rank uint64) *models.SignedProposal[*helper.TestState, *helper.TestVote] { + return helper.MakeSignedProposal[*helper.TestState, *helper.TestVote](helper.WithProposal[*helper.TestState, *helper.TestVote](helper.MakeProposal(helper.WithState(helper.MakeState(helper.WithStateRank[*helper.TestState](rank)))))) +} diff --git a/consensus/votecollector/testutil.go b/consensus/votecollector/testutil.go new file mode 100644 index 0000000..d49cd6b --- /dev/null +++ b/consensus/votecollector/testutil.go @@ -0,0 +1,51 @@ +package votecollector + +import ( + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + mockconsensus "source.quilibrium.com/quilibrium/monorepo/consensus/mocks" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +type VoteProcessorTestSuiteBase struct { + suite.Suite + + sigWeight uint64 + provingTotalWeight uint64 + onQCCreatedState mock.Mock + + provingAggregator *mockconsensus.WeightedSignatureAggregator + minRequiredWeight uint64 + proposal *models.SignedProposal[*helper.TestState, *helper.TestVote] +} + +func (s *VoteProcessorTestSuiteBase) SetupTest() { + s.provingAggregator = &mockconsensus.WeightedSignatureAggregator{} + s.proposal = helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]() + + // let's assume we have 19 nodes each with weight 100 + s.sigWeight = 100 + s.minRequiredWeight = 1300 // we require at least 13 sigs to collect min weight + s.provingTotalWeight = 0 + + // setup proving signature aggregator + s.provingAggregator.On("TrustedAdd", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + s.provingTotalWeight += s.sigWeight + }).Return(func(signerID models.Identity, sig []byte) uint64 { + return s.provingTotalWeight + }, func(signerID models.Identity, sig []byte) error { + return nil + }).Maybe() + s.provingAggregator.On("TotalWeight").Return(func() uint64 { + return s.provingTotalWeight + }).Maybe() +} + +// onQCCreated is a special function that registers call in mocked state. +// ATTENTION: don't change name of this function since the same name is used in: +// s.onQCCreatedState.On("onQCCreated") statements +func (s *VoteProcessorTestSuiteBase) onQCCreated(qc models.QuorumCertificate) { + s.onQCCreatedState.Called(qc) +} diff --git a/consensus/votecollector/vote_cache_test.go b/consensus/votecollector/vote_cache_test.go new file mode 100644 index 0000000..c636216 --- /dev/null +++ b/consensus/votecollector/vote_cache_test.go @@ -0,0 +1,189 @@ +package votecollector + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +// TestVotesCache_Rank tests that Rank returns same value that was set by constructor +func TestVotesCache_Rank(t *testing.T) { + rank := uint64(100) + cache := NewVotesCache[*helper.TestVote](rank) + require.Equal(t, rank, cache.Rank()) +} + +// TestVotesCache_AddVoteRepeatedVote tests that AddVote skips duplicated votes +func TestVotesCache_AddVoteRepeatedVote(t *testing.T) { + t.Parallel() + + rank := uint64(100) + cache := NewVotesCache[*helper.TestVote](rank) + vote := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + }) + + require.NoError(t, cache.AddVote(&vote)) + err := cache.AddVote(&vote) + require.ErrorIs(t, err, RepeatedVoteErr) +} + +// TestVotesCache_AddVoteIncompatibleRank tests that adding vote with incompatible rank results in error +func TestVotesCache_AddVoteIncompatibleRank(t *testing.T) { + t.Parallel() + + rank := uint64(100) + cache := NewVotesCache[*helper.TestVote](rank) + vote := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + 1 + }) + err := cache.AddVote(&vote) + require.ErrorIs(t, err, VoteForIncompatibleRankError) +} + +// TestVotesCache_GetVote tests that GetVote method +func TestVotesCache_GetVote(t *testing.T) { + rank := uint64(100) + knownVote := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + }) + doubleVote := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + (*v).ID = knownVote.ID + }) + + cache := NewVotesCache[*helper.TestVote](rank) + + // unknown vote + vote, found := cache.GetVote(helper.MakeIdentity()) + assert.Nil(t, vote) + assert.False(t, found) + + // known vote + err := cache.AddVote(&knownVote) + assert.NoError(t, err) + vote, found = cache.GetVote(knownVote.ID) + assert.Equal(t, &knownVote, vote) + assert.True(t, found) + + // for a signer ID with a known vote, the cache should memorize the _first_ encountered vote + err = cache.AddVote(&doubleVote) + assert.True(t, models.IsDoubleVoteError[*helper.TestVote](err)) + vote, found = cache.GetVote(doubleVote.ID) + assert.Equal(t, &knownVote, vote) + assert.True(t, found) +} + +// TestVotesCache_All tests that All returns previously added votes in same order +func TestVotesCache_All(t *testing.T) { + t.Parallel() + + rank := uint64(100) + cache := NewVotesCache[*helper.TestVote](rank) + expectedVotes := make([]**helper.TestVote, 0, 5) + for i := range expectedVotes { + vote := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + }) + expectedVotes[i] = &vote + require.NoError(t, cache.AddVote(&vote)) + } + require.Equal(t, expectedVotes, cache.All()) +} + +// TestVotesCache_RegisterVoteConsumer tests that registered vote consumer receives all previously added votes as well as +// new ones in expected order. +func TestVotesCache_RegisterVoteConsumer(t *testing.T) { + t.Parallel() + + rank := uint64(100) + cache := NewVotesCache[*helper.TestVote](rank) + votesBatchSize := 5 + expectedVotes := make([]*helper.TestVote, 0, votesBatchSize) + // produce first batch before registering vote consumer + for i := range expectedVotes { + vote := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + }) + expectedVotes[i] = vote + require.NoError(t, cache.AddVote(&vote)) + } + + consumedVotes := make([]*helper.TestVote, 0) + voteConsumer := func(vote **helper.TestVote) { + consumedVotes = append(consumedVotes, *vote) + } + + // registering vote consumer has to fill consumedVotes using callback + cache.RegisterVoteConsumer(voteConsumer) + // all cached votes should be fed into the consumer right away + require.Equal(t, expectedVotes, consumedVotes) + + // produce second batch after registering vote consumer + for i := 0; i < votesBatchSize; i++ { + vote := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + }) + expectedVotes = append(expectedVotes, vote) + require.NoError(t, cache.AddVote(&vote)) + } + + // at this point consumedVotes has to have all votes created before and after registering vote + // consumer, and they must be in same order + require.Equal(t, expectedVotes, consumedVotes) +} + +// BenchmarkAdd measured the time it takes to add `numberVotes` concurrently to the VotesCache. +// On MacBook with Intel i7-7820HQ CPU @ 2.90GHz: +// adding 1 million votes in total, with 20 threads concurrently, took 0.48s +func BenchmarkAdd(b *testing.B) { + numberVotes := 1_000_000 + threads := 20 + + // Setup: create worker routines and votes to feed + rank := uint64(10) + cache := NewVotesCache[*helper.TestVote](rank) + + var start sync.WaitGroup + start.Add(threads) + var done sync.WaitGroup + done.Add(threads) + + blockID := helper.MakeIdentity() + n := numberVotes / threads + + for ; threads > 0; threads-- { + go func(i int) { + // create votes and signal ready + votes := make([]*helper.TestVote, 0, n) + for len(votes) < n { + v := helper.VoteFixture(func(v **helper.TestVote) { + (*v).Rank = rank + (*v).StateID = blockID + }) + votes = append(votes, v) + } + start.Done() + + // Wait for last worker routine to signal ready. Then, + // feed all votes into cache + start.Wait() + for _, v := range votes { + err := cache.AddVote(&v) + assert.NoError(b, err) + } + done.Done() + }(threads) + } + start.Wait() + t1 := time.Now() + done.Wait() + duration := time.Since(t1) + fmt.Printf("=> adding %d votes to Cache took %f seconds\n", cache.Size(), duration.Seconds()) +} diff --git a/consensus/votecollector/vote_processor_test.go b/consensus/votecollector/vote_processor_test.go new file mode 100644 index 0000000..8ba9fe7 --- /dev/null +++ b/consensus/votecollector/vote_processor_test.go @@ -0,0 +1,269 @@ +package votecollector + +import ( + "errors" + "sync" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.uber.org/atomic" + + "source.quilibrium.com/quilibrium/monorepo/consensus" + "source.quilibrium.com/quilibrium/monorepo/consensus/helper" + "source.quilibrium.com/quilibrium/monorepo/consensus/mocks" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" +) + +func TestVoteProcessor(t *testing.T) { + suite.Run(t, new(VoteProcessorTestSuite)) +} + +// VoteProcessorTestSuite is a test suite that holds mocked state for isolated testing of VoteProcessor. +type VoteProcessorTestSuite struct { + VoteProcessorTestSuiteBase + + processor *VoteProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer] + allParticipants []models.WeightedIdentity +} + +func (s *VoteProcessorTestSuite) SetupTest() { + s.VoteProcessorTestSuiteBase.SetupTest() + s.allParticipants = helper.WithWeightedIdentityList(14) + votingProvider := mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](s.T()) + s.processor = &VoteProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ + tracer: helper.Logger(), + state: s.proposal.State, + provingSigAggtor: s.provingAggregator, + onQCCreated: s.onQCCreated, + minRequiredWeight: s.minRequiredWeight, + done: *atomic.NewBool(false), + allParticipants: s.allParticipants, + votingProvider: votingProvider, + } +} + +// TestInitialState tests that State() and Status() return correct values after calling constructor +func (s *VoteProcessorTestSuite) TestInitialState() { + require.Equal(s.T(), s.proposal.State, s.processor.State()) + require.Equal(s.T(), consensus.VoteCollectorStatusVerifying, s.processor.Status()) +} + +// TestProcess_VoteNotForProposal tests that vote should pass to validation only if it has correct +// rank and state ID matching proposal that is locked in VoteProcessor +func (s *VoteProcessorTestSuite) TestProcess_VoteNotForProposal() { + v := helper.VoteForStateFixture(s.proposal.State) + v.StateID = "" + err := s.processor.Process(&v) + require.ErrorAs(s.T(), err, &VoteForIncompatibleStateError) + require.False(s.T(), models.IsInvalidVoteError[*helper.TestVote](err)) + + v = helper.VoteForStateFixture(s.proposal.State) + v.Rank = 0 + err = s.processor.Process(&v) + require.ErrorAs(s.T(), err, &VoteForIncompatibleRankError) + require.False(s.T(), models.IsInvalidVoteError[*helper.TestVote](err)) + + s.provingAggregator.AssertNotCalled(s.T(), "Verify") +} + +// TestProcess_InvalidSignature tests that VoteProcessor doesn't collect signatures for votes with invalid consensus. +// Checks are made for cases where both proving and threshold signatures were submitted. +func (s *VoteProcessorTestSuite) TestProcess_InvalidSignature() { + exception := errors.New("unexpected-exception") + + // sentinel error from `InvalidSignerError` should be wrapped as `InvalidVoteError` + voteA := helper.VoteForStateFixture(s.proposal.State) + s.provingAggregator.On("Verify", voteA.ID, mock.Anything).Return(models.NewInvalidSignerErrorf("")).Once() + err := s.processor.Process(&voteA) + require.Error(s.T(), err) + require.True(s.T(), models.IsInvalidVoteError[*helper.TestVote](err)) + require.True(s.T(), models.IsInvalidSignerError(err)) + + // sentinel error from `ErrInvalidSignature` should be wrapped as `InvalidVoteError` + voteB := helper.VoteForStateFixture(s.proposal.State) + s.provingAggregator.On("Verify", voteB.ID, mock.Anything).Return(models.ErrInvalidSignature).Once() + err = s.processor.Process(&voteB) + require.Error(s.T(), err) + require.True(s.T(), models.IsInvalidVoteError[*helper.TestVote](err)) + require.ErrorAs(s.T(), err, &models.ErrInvalidSignature) + + // unexpected errors from `Verify` should be propagated, but should _not_ be wrapped as `InvalidVoteError` + voteC := helper.VoteForStateFixture(s.proposal.State) + s.provingAggregator.On("Verify", voteC.ID, mock.Anything).Return(exception) + err = s.processor.Process(&voteC) + require.ErrorIs(s.T(), err, exception) // unexpected errors from verifying the vote signature should be propagated + require.False(s.T(), models.IsInvalidVoteError[*helper.TestVote](err)) // but not interpreted as an invalid vote + + s.provingAggregator.AssertNotCalled(s.T(), "TrustedAdd") +} + +// TestProcess_TrustedAdd_Exception tests that unexpected exceptions returned by +// WeightedSignatureAggregator.TrustedAdd(..) are _not_ interpreted as invalid votes +func (s *VoteProcessorTestSuite) TestProcess_TrustedAdd_Exception() { + exception := errors.New("unexpected-exception") + provingVote := helper.VoteForStateFixture(s.proposal.State) + s.provingAggregator = mocks.NewWeightedSignatureAggregator(s.T()) + s.provingAggregator.On("Verify", provingVote.ID, mock.Anything).Return(nil).Once() + s.provingAggregator.On("TrustedAdd", provingVote.ID, mock.Anything).Return(uint64(0), exception).Once() + s.processor.provingSigAggtor = s.provingAggregator + err := s.processor.Process(&provingVote) + require.ErrorIs(s.T(), err, exception) + require.False(s.T(), models.IsInvalidVoteError[*helper.TestVote](err)) + s.provingAggregator.AssertExpectations(s.T()) +} + +// TestProcess_BuildQCError tests error path during process of building QC. +// Building QC is a one time operation, we need to make sure that failing in one of the steps leads to exception. +func (s *VoteProcessorTestSuite) TestProcess_BuildQCError() { + // In this test we will mock all dependencies for happy path, and replace some branches with unhappy path + // to simulate errors along the branches. + vote := helper.VoteForStateFixture(s.proposal.State) + + // in this test case we aren't able to aggregate proving signature + exception := errors.New("proving-aggregate-exception") + provingSigAggregator := mocks.NewWeightedSignatureAggregator(s.T()) + provingSigAggregator.On("Verify", mock.Anything, mock.Anything).Return(nil).Once() + provingSigAggregator.On("TrustedAdd", mock.Anything, mock.Anything).Return(s.minRequiredWeight, nil).Once() + provingSigAggregator.On("Aggregate").Return(nil, nil, exception).Once() + + s.processor.provingSigAggtor = provingSigAggregator + err := s.processor.Process(&vote) + require.ErrorIs(s.T(), err, exception) + provingSigAggregator.AssertExpectations(s.T()) +} + +// TestProcess_NotEnoughWeight tests a scenario where we first don't have enough weight, +// then we iteratively increase it to the point where we have enough proving weight. No QC should be created. +func (s *VoteProcessorTestSuite) TestProcess_NotEnoughWeight() { + for i := s.sigWeight; i < s.minRequiredWeight; i += s.sigWeight { + vote := helper.VoteForStateFixture(s.proposal.State) + s.provingAggregator.On("Verify", vote.ID, []byte(vote.Signature)).Return(nil).Once() + err := s.processor.Process(&vote) + require.NoError(s.T(), err) + } + require.False(s.T(), s.processor.done.Load()) + s.onQCCreatedState.AssertNotCalled(s.T(), "onQCCreated") + s.provingAggregator.AssertExpectations(s.T()) +} + +// TestProcess_CreatingQC tests a scenario when we have collected enough proving weight +// and proceed to build QC. Created QC has to have all signatures and identities aggregated by +// aggregator. +func (s *VoteProcessorTestSuite) TestProcess_CreatingQC() { + // prepare test setup: 13 votes with proving sigs + provingSigners := s.allParticipants[:14] + signerIndices := []byte{0b11111111, 0b00011111} + + // setup aggregator + s.provingAggregator = mocks.NewWeightedSignatureAggregator(s.T()) + expectedSig := &helper.TestAggregatedSignature{ + Signature: make([]byte, 74), + PublicKey: make([]byte, 585), + Bitmask: signerIndices, + } + s.provingAggregator.On("Aggregate").Return(provingSigners, expectedSig, nil).Once() + s.processor.provingSigAggtor = s.provingAggregator + s.processor.votingProvider.(*mocks.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]).On( + "FinalizeQuorumCertificate", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(&helper.TestQuorumCertificate{ + Filter: nil, + Rank: s.proposal.State.Rank, + Selector: s.proposal.State.Identifier, + AggregatedSignature: expectedSig, + }, nil) + // expected QC + s.onQCCreatedState.On("onQCCreated", mock.Anything).Run(func(args mock.Arguments) { + qc := args.Get(0).(models.QuorumCertificate) + // ensure that QC contains correct field + expectedQC := &helper.TestQuorumCertificate{ + Rank: s.proposal.State.Rank, + Selector: s.proposal.State.Identifier, + AggregatedSignature: expectedSig, + } + require.Equal(s.T(), expectedQC, qc) + }).Return(nil).Once() + + // add votes + for _, signer := range provingSigners { + vote := helper.VoteForStateFixture(s.proposal.State) + vote.ID = signer.Identity() + expectedSig := []byte(vote.Signature) + s.provingAggregator.On("Verify", vote.ID, expectedSig).Return(nil).Once() + s.provingAggregator.On("TrustedAdd", vote.ID, expectedSig).Run(func(args mock.Arguments) { + s.provingTotalWeight += s.sigWeight + }).Return(s.provingTotalWeight, nil).Once() + err := s.processor.Process(&vote) + require.NoError(s.T(), err) + } + + require.True(s.T(), s.processor.done.Load()) + s.onQCCreatedState.AssertExpectations(s.T()) + s.provingAggregator.AssertExpectations(s.T()) + + // processing extra votes shouldn't result in creating new QCs + vote := helper.VoteForStateFixture(s.proposal.State) + err := s.processor.Process(&vote) + require.NoError(s.T(), err) + + s.onQCCreatedState.AssertExpectations(s.T()) +} + +// TestProcess_ConcurrentCreatingQC tests a scenario where multiple goroutines process vote at same time, +// we expect only one QC created in this scenario. +func (s *VoteProcessorTestSuite) TestProcess_ConcurrentCreatingQC() { + provingSigners := s.allParticipants[:10] + // mock aggregators, so we have enough weight and shares for creating QC + s.provingAggregator = mocks.NewWeightedSignatureAggregator(s.T()) + s.provingAggregator.On("Verify", mock.Anything, mock.Anything).Return(nil) + s.provingAggregator.On("TrustedAdd", mock.Anything, mock.Anything).Return(s.minRequiredWeight, nil) + expectedSig := &helper.TestAggregatedSignature{ + Signature: make([]byte, 74), + PublicKey: make([]byte, 585), + Bitmask: []byte{0b11111111, 0b00000011}, + } + s.provingAggregator.On("Aggregate").Return(provingSigners, expectedSig, nil) + s.processor.provingSigAggtor = s.provingAggregator + + // at this point sending any vote should result in creating QC. + s.onQCCreatedState.On("onQCCreated", mock.Anything).Return(nil).Once() + + s.processor.votingProvider.(*mocks.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]).On( + "FinalizeQuorumCertificate", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(&helper.TestQuorumCertificate{ + Filter: nil, + Rank: s.proposal.State.Rank, + Selector: s.proposal.State.Identifier, + FrameNumber: s.proposal.State.Rank, + Timestamp: int64(s.proposal.State.Timestamp), + AggregatedSignature: expectedSig, + }, nil) + var startupWg, shutdownWg sync.WaitGroup + + vote := helper.VoteForStateFixture(s.proposal.State) + startupWg.Add(1) + // prepare goroutines, so they are ready to submit a vote at roughly same time + for i := 0; i < 5; i++ { + shutdownWg.Add(1) + go func() { + defer shutdownWg.Done() + startupWg.Wait() + err := s.processor.Process(&vote) + require.NoError(s.T(), err) + }() + } + + startupWg.Done() + + // wait for all routines to finish + shutdownWg.Wait() + + s.onQCCreatedState.AssertNumberOfCalls(s.T(), "onQCCreated", 1) +}