From 61fcbf35f9639338f46307ae44e5cc7e48c7fe79 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 6 Nov 2025 04:02:16 -0600 Subject: [PATCH] global consensus, plugged in and verified --- consensus/integration/instance_test.go | 7 +- consensus/mocks/vote_processor_factory.go | 16 +- .../pubsub/vote_collector_distributor.go | 4 +- consensus/pacemaker/rank_tracker.go | 1 + consensus/participant/participant.go | 2 +- .../weighted_signature_aggregator.go | 6 +- consensus/timeoutcollector/aggregation.go | 5 +- consensus/timeoutcollector/factory.go | 6 + .../timeout_processor_test.go | 6 +- consensus/validator/validator.go | 23 ++- consensus/verification/common.go | 33 +++- consensus/vote_collector.go | 1 + consensus/voteaggregator/vote_aggregator.go | 3 +- .../voteaggregator/vote_aggregator_test.go | 2 +- consensus/votecollector/factory.go | 6 +- consensus/votecollector/factory_test.go | 32 ++-- consensus/votecollector/statemachine.go | 7 + consensus/votecollector/statemachine_test.go | 6 +- consensus/votecollector/vote_processor.go | 3 +- .../consensus_signature_aggregator_wrapper.go | 4 +- node/consensus/app/app_consensus_engine.go | 24 ++- ...consensus_engine_chaos_integration_test.go | 19 +-- .../app_consensus_engine_integration_test.go | 37 ++--- node/consensus/app/consensus_sync_provider.go | 2 +- node/consensus/app/integration_helper_test.go | 3 + node/consensus/app/message_processors.go | 4 +- .../events/global_event_distributor.go | 1 + .../global/consensus_leader_provider.go | 5 + .../global/consensus_sync_provider.go | 18 ++- .../global/consensus_voting_provider.go | 39 +++-- node/consensus/global/event_distributor.go | 12 +- node/consensus/global/genesis.go | 28 ++-- .../global/global_consensus_engine.go | 127 +++++++++++---- ...lobal_consensus_engine_integration_test.go | 101 +++++------- node/consensus/global/message_processors.go | 146 ++++++++---------- node/consensus/global/message_subscription.go | 14 +- node/consensus/time/app_time_reel.go | 1 - node/consensus/time/app_time_reel_test.go | 122 +++++++-------- node/consensus/time/global_time_reel.go | 2 +- .../global_time_reel_equivocation_test.go | 18 +-- node/consensus/time/global_time_reel_test.go | 104 ++++++------- .../time/simple_equivocation_test.go | 6 +- node/consensus/voting/voting_aggregator.go | 24 ++- node/p2p/blossomsub.go | 2 + 44 files changed, 575 insertions(+), 457 deletions(-) diff --git a/consensus/integration/instance_test.go b/consensus/integration/instance_test.go index 2d7faa6..1b10056 100644 --- a/consensus/integration/instance_test.go +++ b/consensus/integration/instance_test.go @@ -411,10 +411,11 @@ func NewInstance(t *testing.T, options ...Option) *Instance { } voteProcessorFactory := mocks.NewVoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer](t) - voteProcessorFactory.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( - func(tracer consensus.TraceLogger, proposal *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] { + voteProcessorFactory.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + func(tracer consensus.TraceLogger, filter []byte, proposal *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator 
consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote] { processor, err := votecollector.NewBootstrapVoteProcessor[*helper.TestState, *helper.TestVote, *helper.TestPeer]( in.logger, + filter, in.committee, proposal.State, onQCCreated, @@ -475,7 +476,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance { }, nil }).Maybe() sigAgg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Maybe() - createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg, in.voting) + createCollectorFactoryMethod := votecollector.NewStateMachineFactory(in.logger, []byte{}, voteAggregationDistributor, voteProcessorFactory.Create, []byte{}, sigAgg, in.voting) voteCollectors := voteaggregator.NewVoteCollectors[*helper.TestState, *helper.TestVote](in.logger, livenessData.CurrentRank, workerpool.New(2), createCollectorFactoryMethod) // initialize the vote aggregator diff --git a/consensus/mocks/vote_processor_factory.go b/consensus/mocks/vote_processor_factory.go index d1b0a95..9c3a48a 100644 --- a/consensus/mocks/vote_processor_factory.go +++ b/consensus/mocks/vote_processor_factory.go @@ -15,8 +15,8 @@ type VoteProcessorFactory[StateT models.Unique, VoteT models.Unique, PeerIDT mod } // Create provides a mock function with given fields: tracer, filter, proposal, dsTag, aggregator, voter -func (_m *VoteProcessorFactory[StateT, VoteT, PeerIDT]) Create(tracer consensus.TraceLogger, proposal *models.SignedProposal[StateT, VoteT], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[StateT, VoteT, PeerIDT]) (consensus.VerifyingVoteProcessor[StateT, VoteT], error) { - ret := _m.Called(tracer, proposal, dsTag, aggregator, voter) +func (_m *VoteProcessorFactory[StateT, VoteT, PeerIDT]) Create(tracer consensus.TraceLogger, filter []byte, proposal *models.SignedProposal[StateT, VoteT], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[StateT, VoteT, PeerIDT]) (consensus.VerifyingVoteProcessor[StateT, VoteT], error) { + ret := _m.Called(tracer, filter, proposal, dsTag, aggregator, voter) if len(ret) == 0 { panic("no return value specified for Create") @@ -24,19 +24,19 @@ func (_m *VoteProcessorFactory[StateT, VoteT, PeerIDT]) Create(tracer consensus.
var r0 consensus.VerifyingVoteProcessor[StateT, VoteT] var r1 error - if rf, ok := ret.Get(0).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) (consensus.VerifyingVoteProcessor[StateT, VoteT], error)); ok { - return rf(tracer, proposal, dsTag, aggregator, voter) + if rf, ok := ret.Get(0).(func(consensus.TraceLogger, []byte, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) (consensus.VerifyingVoteProcessor[StateT, VoteT], error)); ok { + return rf(tracer, filter, proposal, dsTag, aggregator, voter) } - if rf, ok := ret.Get(0).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) consensus.VerifyingVoteProcessor[StateT, VoteT]); ok { - r0 = rf(tracer, proposal, dsTag, aggregator, voter) + if rf, ok := ret.Get(0).(func(consensus.TraceLogger, []byte, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) consensus.VerifyingVoteProcessor[StateT, VoteT]); ok { + r0 = rf(tracer, filter, proposal, dsTag, aggregator, voter) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(consensus.VerifyingVoteProcessor[StateT, VoteT]) } } - if rf, ok := ret.Get(1).(func(consensus.TraceLogger, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) error); ok { - r1 = rf(tracer, proposal, dsTag, aggregator, voter) + if rf, ok := ret.Get(1).(func(consensus.TraceLogger, []byte, *models.SignedProposal[StateT, VoteT], []byte, consensus.SignatureAggregator, consensus.VotingProvider[StateT, VoteT, PeerIDT]) error); ok { + r1 = rf(tracer, filter, proposal, dsTag, aggregator, voter) } else { r1 = ret.Error(1) } diff --git a/consensus/notifications/pubsub/vote_collector_distributor.go b/consensus/notifications/pubsub/vote_collector_distributor.go index 58f3ad9..89af85b 100644 --- a/consensus/notifications/pubsub/vote_collector_distributor.go +++ b/consensus/notifications/pubsub/vote_collector_distributor.go @@ -8,8 +8,8 @@ import ( ) // VoteCollectorDistributor ingests notifications about vote aggregation and -// distributes them to consumers. Such notifications are produced by the vote aggregation logic. -// Concurrency safe. +// distributes them to consumers. Such notifications are produced by the vote +// aggregation logic. Concurrency safe. 
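+// The lock field below guards the consumers slice.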
type VoteCollectorDistributor[VoteT models.Unique] struct { consumers []consensus.VoteCollectorConsumer[VoteT] lock sync.RWMutex diff --git a/consensus/pacemaker/rank_tracker.go b/consensus/pacemaker/rank_tracker.go index 1ece370..b844a55 100644 --- a/consensus/pacemaker/rank_tracker.go +++ b/consensus/pacemaker/rank_tracker.go @@ -108,6 +108,7 @@ func (vt *rankTracker[StateT, VoteT]) ReceiveTimeoutCertificate( tc models.TimeoutCertificate, ) (uint64, error) { rank := vt.livenessState.CurrentRank + if tc == nil { return rank, nil } diff --git a/consensus/participant/participant.go b/consensus/participant/participant.go index 186eef3..51d4339 100644 --- a/consensus/participant/participant.go +++ b/consensus/participant/participant.go @@ -142,7 +142,7 @@ func NewParticipant[ loop, err := eventloop.NewEventLoop( logger, eventHandler, - time.Now().Add(10*time.Second), + time.Now(), ) if err != nil { return nil, fmt.Errorf("could not initialize event loop: %w", err) diff --git a/consensus/signature/weighted_signature_aggregator.go b/consensus/signature/weighted_signature_aggregator.go index ce98d34..b58e1ad 100644 --- a/consensus/signature/weighted_signature_aggregator.go +++ b/consensus/signature/weighted_signature_aggregator.go @@ -108,8 +108,12 @@ func (w *WeightedSignatureAggregator) Verify( ok = w.aggregator.VerifySignatureRaw(info.pk, sig, w.message, w.dsTag) if !ok { return fmt.Errorf( - "invalid signature from %s: %w", + "invalid signature %x from %x (pk: %x, msg: %x, dsTag: %x): %w", + sig, signerID, + info.pk, + w.message, + w.dsTag, models.ErrInvalidSignature, ) } diff --git a/consensus/timeoutcollector/aggregation.go b/consensus/timeoutcollector/aggregation.go index 51db397..78a8610 100644 --- a/consensus/timeoutcollector/aggregation.go +++ b/consensus/timeoutcollector/aggregation.go @@ -36,6 +36,7 @@ type sigInfo struct { // verification was done outside the module. Implementation is thread-safe. type TimeoutSignatureAggregator struct { lock sync.RWMutex + filter []byte dsTag []byte aggregator consensus.SignatureAggregator idToInfo map[models.Identity]signerInfo // auxiliary map to lookup signer weight and public key (only gets updated by constructor) @@ -61,6 +62,7 @@ var _ consensus.TimeoutSignatureAggregator = (*TimeoutSignatureAggregator)(nil) // in the protocol. 
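+// The filter argument is prepended to every signed timeout message (see MakeTimeoutMessage), so signatures collected under one filter do not verify under another; tests pass an empty filter.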
func NewTimeoutSignatureAggregator( aggregator consensus.SignatureAggregator, + filter []byte, rank uint64, // rank for which we are aggregating signatures ids []models.WeightedIdentity, // list of all authorized signers dsTag []byte, // domain separation tag used by the signature @@ -83,6 +85,7 @@ func NewTimeoutSignatureAggregator( return &TimeoutSignatureAggregator{ aggregator: aggregator, + filter: filter, dsTag: dsTag, idToInfo: idToInfo, idToSignature: make(map[models.Identity]sigInfo), @@ -125,7 +128,7 @@ func (a *TimeoutSignatureAggregator) VerifyAndAdd( ) } - msg := verification.MakeTimeoutMessage(a.rank, newestQCRank) + msg := verification.MakeTimeoutMessage(a.filter, a.rank, newestQCRank) valid := a.aggregator.VerifySignatureRaw(info.pk, sig, msg, a.dsTag) if !valid { return a.TotalWeight(), fmt.Errorf( diff --git a/consensus/timeoutcollector/factory.go b/consensus/timeoutcollector/factory.go index 754cf66..b4ac96d 100644 --- a/consensus/timeoutcollector/factory.go +++ b/consensus/timeoutcollector/factory.go @@ -60,6 +60,7 @@ type TimeoutProcessorFactory[ PeerIDT models.Unique, ] struct { tracer consensus.TraceLogger + filter []byte aggregator consensus.SignatureAggregator committee consensus.Replicas notifier consensus.TimeoutCollectorConsumer[VoteT] @@ -78,18 +79,22 @@ func NewTimeoutProcessorFactory[ PeerIDT models.Unique, ]( tracer consensus.TraceLogger, + filter []byte, aggregator consensus.SignatureAggregator, notifier consensus.TimeoutCollectorConsumer[VoteT], committee consensus.Replicas, validator consensus.Validator[StateT, VoteT], + voting consensus.VotingProvider[StateT, VoteT, PeerIDT], domainSeparationTag []byte, ) *TimeoutProcessorFactory[StateT, VoteT, PeerIDT] { return &TimeoutProcessorFactory[StateT, VoteT, PeerIDT]{ tracer: tracer, + filter: filter, aggregator: aggregator, committee: committee, notifier: notifier, validator: validator, + voting: voting, domainSeparationTag: domainSeparationTag, } } @@ -110,6 +115,7 @@ func (f *TimeoutProcessorFactory[StateT, VoteT, PeerIDT]) Create(rank uint64) ( sigAggregator, err := NewTimeoutSignatureAggregator( f.aggregator, + f.filter, rank, allParticipants, f.domainSeparationTag, diff --git a/consensus/timeoutcollector/timeout_processor_test.go b/consensus/timeoutcollector/timeout_processor_test.go index 1b51b5c..2a92e01 100644 --- a/consensus/timeoutcollector/timeout_processor_test.go +++ b/consensus/timeoutcollector/timeout_processor_test.go @@ -580,7 +580,7 @@ func TestTimeoutProcessor_BuildVerifyTC(t *testing.T) { sigagg.On("VerifySignatureRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true) sigagg.On("Aggregate", mock.Anything, mock.Anything).Return(&helper.TestAggregatedSignature{PublicKey: make([]byte, 585), Signature: make([]byte, 74), Bitmask: []byte{0b11111111, 0b00000111}}, nil) - aggregator, err := NewTimeoutSignatureAggregator(sigagg, rank, provingSignersSkeleton, []byte{}) + aggregator, err := NewTimeoutSignatureAggregator(sigagg, []byte{}, rank, provingSignersSkeleton, []byte{}) require.NoError(t, err) notifier := mocks.NewTimeoutCollectorConsumer[*helper.TestVote](t) @@ -609,7 +609,7 @@ func TestTimeoutProcessor_BuildVerifyTC(t *testing.T) { // at this point we have created QCs for rank N-1 and N additionally a TC for rank N, we can create TC for rank N+1 // with timeout states containing both QC and TC for rank N - aggregator, err = NewTimeoutSignatureAggregator(sigagg, rank+1, provingSignersSkeleton, []byte{}) + aggregator, err = NewTimeoutSignatureAggregator(sigagg, []byte{}, 
rank+1, provingSignersSkeleton, []byte{}) require.NoError(t, err) notifier = mocks.NewTimeoutCollectorConsumer[*helper.TestVote](t) @@ -663,7 +663,7 @@ func createRealQC( Timestamp: uint64(time.Now().UnixMilli()), AggregatedSignature: &helper.TestAggregatedSignature{PublicKey: make([]byte, 585), Signature: make([]byte, 74), Bitmask: []byte{0b11111111, 0b00000111}}, }, nil) - voteProcessor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, sigagg, votingProvider) + voteProcessor, err := voteProcessorFactory.Create(helper.Logger(), []byte{}, proposal, []byte{}, sigagg, votingProvider) require.NoError(t, err) for _, signer := range signers[1:] { diff --git a/consensus/validator/validator.go b/consensus/validator/validator.go index 2cff81e..8340b3f 100644 --- a/consensus/validator/validator.go +++ b/consensus/validator/validator.go @@ -68,12 +68,11 @@ func (v *Validator[StateT, VoteT]) ValidateTimeoutCertificate( signerIDs := []models.WeightedIdentity{} sigIndices := tc.GetAggregatedSignature().GetBitmask() totalWeight := uint64(0) + if len(sigIndices) < (len(allParticipants)+7)/8 { + return models.NewInsufficientSignaturesErrorf("insufficient signatures") + } for i, member := range allParticipants { - if len(sigIndices) < (i/8)+1 { - return models.NewInsufficientSignaturesErrorf("insufficient signatures") - } - - if sigIndices[i/8]>>i%8&1 == 1 { + if sigIndices[i/8]&(1<<(i%8)) == (1 << (i % 8)) { signerIDs = append(signerIDs, member) totalWeight += member.Weight() } @@ -217,14 +216,14 @@ func (v *Validator[StateT, VoteT]) ValidateQuorumCertificate( signerIDs := []models.WeightedIdentity{} sigIndices := qc.GetAggregatedSignature().GetBitmask() totalWeight := uint64(0) + if len(sigIndices) < (len(allParticipants)+7)/8 { + return newInvalidQuorumCertificateError( + qc, + models.NewInsufficientSignaturesErrorf("insufficient signatures"), + ) + } for i, member := range allParticipants { - if len(sigIndices) < (i/8)+1 { - return newInvalidQuorumCertificateError( - qc, - models.NewInsufficientSignaturesErrorf("insufficient signatures"), - ) - } - if sigIndices[i/8]>>i%8&1 == 1 { + if sigIndices[i/8]&(1<<(i%8)) == (1 << (i % 8)) { signerIDs = append(signerIDs, member) totalWeight += member.Weight() } diff --git a/consensus/verification/common.go b/consensus/verification/common.go index 25d5422..fa79301 100644 --- a/consensus/verification/common.go +++ b/consensus/verification/common.go @@ -3,6 +3,7 @@ package verification import ( "encoding/binary" "fmt" + "slices" "source.quilibrium.com/quilibrium/monorepo/consensus" "source.quilibrium.com/quilibrium/monorepo/consensus/models" @@ -13,21 +14,33 @@ import ( // structure that is signed contains the sometimes redundant rank number and // state ID; this allows us to create the signed message and verify the signed // message without having the full state contents. -func MakeVoteMessage(rank uint64, stateID models.Identity) []byte { - msg := []byte{} - binary.BigEndian.AppendUint64(msg, rank) - msg = append(msg, stateID[:]...) - return msg +func MakeVoteMessage( + filter []byte, + rank uint64, + stateID models.Identity, +) []byte { + return slices.Concat( + filter, + binary.BigEndian.AppendUint64( + slices.Clone([]byte(stateID)), + rank, + ), + ) } // MakeTimeoutMessage generates the message we have to sign in order to be able // to contribute to Active Pacemaker protocol. Each replica signs with the // highest QC rank known to that replica. 
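+// The signed message is filter || rank || newestQCRank, with both ranks encoded as big-endian uint64; MakeVoteMessage analogously signs filter || stateID || rank.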
-func MakeTimeoutMessage(rank uint64, newestQCRank uint64) []byte { +func MakeTimeoutMessage( + filter []byte, + rank uint64, + newestQCRank uint64, +) []byte { msg := make([]byte, 16) binary.BigEndian.PutUint64(msg[:8], rank) binary.BigEndian.PutUint64(msg[8:], newestQCRank) - return msg + + return slices.Concat(filter, msg) } // verifyAggregatedSignatureOneMessage encapsulates the logic of verifying an @@ -79,6 +92,7 @@ func verifyAggregatedSignatureOneMessage( // edge cases in the logic (i.e. as fatal) func verifyTCSignatureManyMessages( validator consensus.SignatureAggregator, + filter []byte, pks [][]byte, sigData []byte, rank uint64, @@ -91,7 +105,10 @@ func verifyTCSignatureManyMessages( messages := make([][]byte, 0, len(pks)) for i := 0; i < len(pks); i++ { - messages = append(messages, MakeTimeoutMessage(rank, highQCRanks[i])) + messages = append( + messages, + MakeTimeoutMessage(filter, rank, highQCRanks[i]), + ) } valid := validator.VerifySignatureMultiMessage( diff --git a/consensus/vote_collector.go b/consensus/vote_collector.go index a82790a..7caeeb7 100644 --- a/consensus/vote_collector.go +++ b/consensus/vote_collector.go @@ -134,6 +134,7 @@ type VoteProcessorFactory[ // * models.InvalidProposalError - proposal has invalid proposer vote Create( tracer TraceLogger, + filter []byte, proposal *models.SignedProposal[StateT, VoteT], dsTag []byte, aggregator SignatureAggregator, diff --git a/consensus/voteaggregator/vote_aggregator.go b/consensus/voteaggregator/vote_aggregator.go index f59a70e..8720c2c 100644 --- a/consensus/voteaggregator/vote_aggregator.go +++ b/consensus/voteaggregator/vote_aggregator.go @@ -293,9 +293,10 @@ func (va *VoteAggregator[StateT, VoteT]) processQueuedState( // VoteAggregator needs to make sure that it's submitting for processing // ONLY valid states. 
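+ // Wrap the underlying error so callers can still classify it via errors.Is / errors.As.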
return fmt.Errorf( - "received invalid state for processing %x at rank %d", + "received invalid state for processing %x at rank %d: %+w", state.State.Identifier, state.State.Rank, + err, ) } return fmt.Errorf( diff --git a/consensus/voteaggregator/vote_aggregator_test.go b/consensus/voteaggregator/vote_aggregator_test.go index 02225fc..11d3f5d 100644 --- a/consensus/voteaggregator/vote_aggregator_test.go +++ b/consensus/voteaggregator/vote_aggregator_test.go @@ -101,7 +101,7 @@ func (s *VoteAggregatorTestSuite) TestProcessInvalidState() { select { case err := <-s.errs: require.Error(s.T(), err) - require.False(s.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) + require.True(s.T(), models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) case <-time.After(100 * time.Millisecond): s.T().Fatalf("expected error but haven't received anything") } diff --git a/consensus/votecollector/factory.go b/consensus/votecollector/factory.go index 2a854f5..a39cee6 100644 --- a/consensus/votecollector/factory.go +++ b/consensus/votecollector/factory.go @@ -23,6 +23,7 @@ type baseFactory[ PeerIDT models.Unique, ] func( tracer consensus.TraceLogger, + filter []byte, state *models.State[StateT], dsTag []byte, aggregator consensus.SignatureAggregator, @@ -54,6 +55,7 @@ var _ consensus.VoteProcessorFactory[*nilUnique, *nilUnique, *nilUnique] = (*Vot // * models.InvalidProposalError - proposal has invalid proposer vote func (f *VoteProcessorFactory[StateT, VoteT, PeerIDT]) Create( tracer consensus.TraceLogger, + filter []byte, proposal *models.SignedProposal[StateT, VoteT], dsTag []byte, aggregator consensus.SignatureAggregator, @@ -61,6 +63,7 @@ func (f *VoteProcessorFactory[StateT, VoteT, PeerIDT]) Create( ) (consensus.VerifyingVoteProcessor[StateT, VoteT], error) { processor, err := f.baseFactory( tracer, + filter, proposal.State, dsTag, aggregator, @@ -127,6 +130,7 @@ func NewBootstrapVoteProcessor[ PeerIDT models.Unique, ]( tracer consensus.TraceLogger, + filter []byte, committee consensus.DynamicCommittee, state *models.State[StateT], onQCCreated consensus.OnQuorumCertificateCreated, @@ -138,7 +142,7 @@ func NewBootstrapVoteProcessor[ committee: committee, onQCCreated: onQCCreated, } - return factory.Create(tracer, state, dsTag, aggregator, votingProvider) + return factory.Create(tracer, filter, state, dsTag, aggregator, votingProvider) } // Type used to satisfy generic arguments in compiler time type assertion check diff --git a/consensus/votecollector/factory_test.go b/consensus/votecollector/factory_test.go index d9eb008..b0e86bb 100644 --- a/consensus/votecollector/factory_test.go +++ b/consensus/votecollector/factory_test.go @@ -23,15 +23,15 @@ func TestVoteProcessorFactory_CreateWithValidProposal(t *testing.T) { vote, err := proposal.ProposerVote() require.NoError(t, err) mockedProcessor.On("Process", vote).Return(nil).Once() - mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() + mockedFactory.On("Create", helper.Logger(), []byte{}, proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ - baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) 
(consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { - return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider) + baseFactory: func(log consensus.TraceLogger, filter []byte, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, filter, proposal, dsTag, aggregator, votingProvider) }, } - processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + processor, err := voteProcessorFactory.Create(helper.Logger(), []byte{}, proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) require.NoError(t, err) require.NotNil(t, processor) @@ -50,15 +50,15 @@ func TestVoteProcessorFactory_CreateWithInvalidVote(t *testing.T) { vote, err := proposal.ProposerVote() require.NoError(t, err) mockedProcessor.On("Process", vote).Return(models.NewInvalidVoteErrorf(vote, "")).Once() - mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() + mockedFactory.On("Create", helper.Logger(), []byte{}, proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ - baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { - return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider) + baseFactory: func(log consensus.TraceLogger, filter []byte, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, filter, proposal, dsTag, aggregator, votingProvider) }, } - processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + processor, err := voteProcessorFactory.Create(helper.Logger(), []byte{}, proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) require.Error(t, err) require.Nil(t, processor) require.True(t, models.IsInvalidProposalError[*helper.TestState, *helper.TestVote](err)) @@ -73,15 +73,15 @@ func TestVoteProcessorFactory_CreateWithInvalidVote(t *testing.T) { require.NoError(t, err) mockedProcessor.On("Process", vote).Return(exception).Once() - mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() + mockedFactory.On("Create", helper.Logger(), []byte{}, proposal, mock.Anything, mock.Anything, mock.Anything).Return(mockedProcessor, nil).Once() voteProcessorFactory := 
&VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ - baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { - return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider) + baseFactory: func(log consensus.TraceLogger, filter []byte, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, filter, proposal, dsTag, aggregator, votingProvider) }, } - processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + processor, err := voteProcessorFactory.Create(helper.Logger(), []byte{}, proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) require.ErrorIs(t, err, exception) require.Nil(t, processor) // an unexpected exception should _not_ be interpreted as the state being invalid @@ -101,14 +101,14 @@ func TestVoteProcessorFactory_CreateProcessException(t *testing.T) { proposal := helper.MakeSignedProposal[*helper.TestState, *helper.TestVote]() exception := errors.New("create-exception") - mockedFactory.On("Create", helper.Logger(), proposal, mock.Anything, mock.Anything, mock.Anything).Return(nil, exception).Once() + mockedFactory.On("Create", helper.Logger(), []byte{}, proposal, mock.Anything, mock.Anything, mock.Anything).Return(nil, exception).Once() voteProcessorFactory := &VoteProcessorFactory[*helper.TestState, *helper.TestVote, *helper.TestPeer]{ - baseFactory: func(log consensus.TraceLogger, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { - return mockedFactory.Create(log, proposal, dsTag, aggregator, votingProvider) + baseFactory: func(log consensus.TraceLogger, filter []byte, state *models.State[*helper.TestState], dsTag []byte, aggregator consensus.SignatureAggregator, votingProvider consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + return mockedFactory.Create(log, filter, proposal, dsTag, aggregator, votingProvider) }, } - processor, err := voteProcessorFactory.Create(helper.Logger(), proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) + processor, err := voteProcessorFactory.Create(helper.Logger(), []byte{}, proposal, []byte{}, mocks.NewSignatureAggregator(t), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](t)) require.ErrorIs(t, err, exception) require.Nil(t, processor) // an unexpected exception should _not_ be interpreted as the state being invalid diff --git a/consensus/votecollector/statemachine.go b/consensus/votecollector/statemachine.go index b40f42d..e314422 100644 
--- a/consensus/votecollector/statemachine.go +++ b/consensus/votecollector/statemachine.go @@ -24,6 +24,7 @@ type VerifyingVoteProcessorFactory[ PeerIDT models.Unique, ] = func( tracer consensus.TraceLogger, + filter []byte, proposal *models.SignedProposal[StateT, VoteT], dsTag []byte, aggregator consensus.SignatureAggregator, @@ -39,6 +40,7 @@ type VoteCollector[ ] struct { sync.Mutex tracer consensus.TraceLogger + filter []byte workers consensus.Workers notifier consensus.VoteAggregationConsumer[StateT, VoteT] createVerifyingProcessor VerifyingVoteProcessorFactory[StateT, VoteT, PeerIDT] @@ -72,6 +74,7 @@ func NewStateMachineFactory[ PeerIDT models.Unique, ]( tracer consensus.TraceLogger, + filter []byte, notifier consensus.VoteAggregationConsumer[StateT, VoteT], verifyingVoteProcessorFactory VerifyingVoteProcessorFactory[ StateT, @@ -88,6 +91,7 @@ func NewStateMachineFactory[ ) { return NewStateMachine[StateT, VoteT]( rank, + filter, tracer, workers, notifier, @@ -105,6 +109,7 @@ func NewStateMachine[ PeerIDT models.Unique, ]( rank uint64, + filter []byte, tracer consensus.TraceLogger, workers consensus.Workers, notifier consensus.VoteAggregationConsumer[StateT, VoteT], @@ -119,6 +124,7 @@ func NewStateMachine[ ) *VoteCollector[StateT, VoteT, PeerIDT] { sm := &VoteCollector[StateT, VoteT, PeerIDT]{ tracer: tracer, + filter: filter, workers: workers, notifier: notifier, createVerifyingProcessor: verifyingVoteProcessorFactory, @@ -346,6 +352,7 @@ func (m *VoteCollector[StateT, VoteT, PeerIDT]) caching2Verifying( stateID := proposal.State.Identifier newProc, err := m.createVerifyingProcessor( m.tracer, + m.filter, proposal, m.dsTag, m.aggregator, diff --git a/consensus/votecollector/statemachine_test.go b/consensus/votecollector/statemachine_test.go index 8423ae4..3b5978e 100644 --- a/consensus/votecollector/statemachine_test.go +++ b/consensus/votecollector/statemachine_test.go @@ -48,7 +48,7 @@ func (s *StateMachineTestSuite) SetupTest() { s.mockedProcessors = make(map[models.Identity]*mocks.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote]) s.notifier = mocks.NewVoteAggregationConsumer[*helper.TestState, *helper.TestVote](s.T()) - s.factoryMethod = func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + s.factoryMethod = func(log consensus.TraceLogger, filter []byte, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { if processor, found := s.mockedProcessors[state.State.Identifier]; found { return processor, nil } @@ -56,7 +56,7 @@ func (s *StateMachineTestSuite) SetupTest() { } s.workerPool = workerpool.New(4) - s.collector = NewStateMachine(s.rank, helper.Logger(), s.workerPool, s.notifier, s.factoryMethod, []byte{}, consensus.SignatureAggregator(mocks.NewSignatureAggregator(s.T())), mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](s.T())) + s.collector = NewStateMachine(s.rank, []byte{}, helper.Logger(), s.workerPool, s.notifier, s.factoryMethod, []byte{}, consensus.SignatureAggregator(mocks.NewSignatureAggregator(s.T())), 
mocks.NewVotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer](s.T())) } // prepareMockedProcessor prepares a mocked processor and stores it in map, later it will be used @@ -96,7 +96,7 @@ func (s *StateMachineTestSuite) TestStatus_StateTransitions() { // factory are handed through (potentially wrapped), but are not replaced. func (s *StateMachineTestSuite) Test_FactoryErrorPropagation() { factoryError := errors.New("factory error") - factory := func(log consensus.TraceLogger, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { + factory := func(log consensus.TraceLogger, filter []byte, state *models.SignedProposal[*helper.TestState, *helper.TestVote], dsTag []byte, aggregator consensus.SignatureAggregator, voter consensus.VotingProvider[*helper.TestState, *helper.TestVote, *helper.TestPeer]) (consensus.VerifyingVoteProcessor[*helper.TestState, *helper.TestVote], error) { return nil, factoryError } s.collector.createVerifyingProcessor = factory diff --git a/consensus/votecollector/vote_processor.go b/consensus/votecollector/vote_processor.go index 3a93e03..d542ebe 100644 --- a/consensus/votecollector/vote_processor.go +++ b/consensus/votecollector/vote_processor.go @@ -36,6 +36,7 @@ type provingVoteProcessorFactoryBase[ // Caller must treat all errors as exceptions func (f *provingVoteProcessorFactoryBase[StateT, VoteT, PeerIDT]) Create( tracer consensus.TraceLogger, + filter []byte, state *models.State[StateT], dsTag []byte, aggregator consensus.SignatureAggregator, @@ -47,7 +48,7 @@ func (f *provingVoteProcessorFactoryBase[StateT, VoteT, PeerIDT]) Create( } // message that has to be verified against aggregated signature - msg := verification.MakeVoteMessage(state.Rank, state.Identifier) + msg := verification.MakeVoteMessage(filter, state.Rank, state.Identifier) // prepare the proving public keys of participants provingKeys := make([][]byte, 0, len(allParticipants)) diff --git a/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go b/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go index f9585bc..187a475 100644 --- a/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go +++ b/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go @@ -60,8 +60,8 @@ func (c *ConsensusSignatureAggregatorWrapper) Aggregate( bitmask := make([]byte, (len(provers)+7)/8) for i, p := range provers { - if _, ok := pubs[string(p.PublicKey)]; !ok { - bitmask[i/8] |= 1 << (i % 8) + if _, ok := pubs[string(p.PublicKey)]; ok { + bitmask[i/8] |= (1 << (i % 8)) } } diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index f859246..68400b7 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -435,10 +435,14 @@ func NewAppConsensusEngine( engine.voteAggregator, err = voting.NewAppShardVoteAggregator[PeerID]( tracing.NewZapTracer(logger), + appAddress, engine, voteAggregationDistributor, engine.signatureAggregator, engine.votingProvider, + func(qc models.QuorumCertificate) { + engine.consensusParticipant.OnQuorumCertificateConstructedFromVotes(qc) + }, (*initialState).GetRank(), ) if err != nil { @@ -446,10 +450,12 @@ func NewAppConsensusEngine( } engine.timeoutAggregator, err = 
voting.NewAppShardTimeoutAggregator[PeerID]( tracing.NewZapTracer(logger), + appAddress, engine, engine, engine.signatureAggregator, timeoutAggregationDistributor, + engine.votingProvider, (*initialState).GetRank(), ) @@ -1112,7 +1118,7 @@ func (e *AppConsensusEngine) initializeGenesis() *protobufs.AppShardFrame { e.frameStore[string(frameID)] = genesisFrame e.frameStoreMu.Unlock() - if err := e.appTimeReel.Insert(e.ctx, genesisFrame); err != nil { + if err := e.appTimeReel.Insert(genesisFrame); err != nil { e.logger.Error("failed to add genesis frame to time reel", zap.Error(err)) e.frameStoreMu.Lock() delete(e.frameStore, string(frameID)) @@ -1667,9 +1673,13 @@ func (e *AppConsensusEngine) VerifyQuorumCertificate( pubkeys := [][]byte{} signatures := [][]byte{} - if (len(provers) + 7/8) > len(qc.AggregateSignature.Bitmask) { + if ((len(provers) + 7) / 8) > len(qc.AggregateSignature.Bitmask) { return errors.Wrap( - errors.New("bitmask invalid for prover set"), + errors.Errorf( + "bitmask invalid for prover set, expected: %d, actual: %d", + ((len(provers)+7)/8), + len(qc.AggregateSignature.Bitmask), + ), "verify quorum certificate", ) } @@ -1739,9 +1749,13 @@ func (e *AppConsensusEngine) VerifyTimeoutCertificate( pubkeys := [][]byte{} signatures := [][]byte{} - if (len(provers) + 7/8) > len(tc.AggregateSignature.Bitmask) { + if ((len(provers) + 7) / 8) > len(tc.AggregateSignature.Bitmask) { return errors.Wrap( - errors.New("bitmask invalid for prover set"), + errors.Errorf( + "bitmask invalid for prover set, expected: %d, actual: %d", + ((len(provers)+7)/8), + len(tc.AggregateSignature.Bitmask), + ), "verify timeout certificate", ) } diff --git a/node/consensus/app/app_consensus_engine_chaos_integration_test.go b/node/consensus/app/app_consensus_engine_chaos_integration_test.go index ff7a9cb..b1a7235 100644 --- a/node/consensus/app/app_consensus_engine_chaos_integration_test.go +++ b/node/consensus/app/app_consensus_engine_chaos_integration_test.go @@ -327,10 +327,10 @@ func TestAppConsensusEngine_Integration_ChaosScenario(t *testing.T) { // Subscribe to frames pubsub.Subscribe(engine.getConsensusMessageBitmask(), func(message *pb.Message) error { - frame := &protobufs.AppShardFrame{} + frame := &protobufs.AppShardProposal{} if err := frame.FromCanonicalBytes(message.Data); err == nil { node.mu.Lock() - node.frameHistory = append(node.frameHistory, frame) + node.frameHistory = append(node.frameHistory, frame.State) node.mu.Unlock() } return nil @@ -657,14 +657,16 @@ func TestAppConsensusEngine_Integration_ChaosScenario(t *testing.T) { voterAddress := nodes[nodeIdx].engine.getAddressFromPublicKey(publicKey) // Create vote message - vote := &protobufs.FrameVote{ + vote := &protobufs.ProposalVote{ FrameNumber: frame.Header.FrameNumber, - Proposer: frame.Header.Prover, - Approve: true, + Filter: frame.Header.Address, + Rank: frame.GetRank(), + Selector: []byte(frame.Identity()), PublicKeySignatureBls48581: &protobufs.BLS48581AddressedSignature{ Address: voterAddress, Signature: sig, }, + Timestamp: uint64(time.Now().UnixMilli()), } // Serialize and publish @@ -979,13 +981,6 @@ func TestAppConsensusEngine_Integration_ChaosScenario(t *testing.T) { // Stop all nodes t.Log("\nStep 8: Cleanup") for i, node := range nodes { - // Unregister executors - node.mu.RLock() - for name := range node.executors { - node.engine.UnregisterExecutor(name, 0, true) - } - node.mu.RUnlock() - // Stop engine node.engine.Stop(true) close(node.quit) diff --git 
a/node/consensus/app/app_consensus_engine_integration_test.go b/node/consensus/app/app_consensus_engine_integration_test.go index 2f4aae4..57ddac4 100644 --- a/node/consensus/app/app_consensus_engine_integration_test.go +++ b/node/consensus/app/app_consensus_engine_integration_test.go @@ -252,30 +252,24 @@ func TestAppConsensusEngine_Integration_BasicFrameProgression(t *testing.T) { typePrefix := binary.BigEndian.Uint32(message.Data[:4]) switch typePrefix { - case protobufs.AppShardFrameType: - frame := &protobufs.AppShardFrame{} + case protobufs.AppShardProposalType: + frame := &protobufs.AppShardProposal{} if err := frame.FromCanonicalBytes(message.Data); err != nil { return errors.New("error") } framesMu.Lock() - frameHistory = append(frameHistory, frame) + frameHistory = append(frameHistory, frame.State) framesMu.Unlock() - case protobufs.ProverLivenessCheckType: - livenessCheck := &protobufs.ProverLivenessCheck{} - if err := livenessCheck.FromCanonicalBytes(message.Data); err != nil { - return errors.New("error") - } - - case protobufs.FrameVoteType: - vote := &protobufs.FrameVote{} + case protobufs.ProposalVoteType: + vote := &protobufs.ProposalVote{} if err := vote.FromCanonicalBytes(message.Data); err != nil { return errors.New("error") } - case protobufs.FrameConfirmationType: - confirmation := &protobufs.FrameConfirmation{} - if err := confirmation.FromCanonicalBytes(message.Data); err != nil { + case protobufs.TimeoutStateType: + state := &protobufs.TimeoutState{} + if err := state.FromCanonicalBytes(message.Data); err != nil { return errors.New("error") } @@ -359,7 +353,6 @@ func TestAppConsensusEngine_Integration_BasicFrameProgression(t *testing.T) { // Stop t.Log("Step 8: Cleaning up") - engine.UnregisterExecutor("test-executor", 0, false) engine.Stop(false) } @@ -1253,10 +1246,6 @@ func TestAppConsensusEngine_Integration_GlobalAppCoordination(t *testing.T) { } }() - // Start event distributor - err = eventDistributor.Start(ctx) - require.NoError(t, err) - // Don't add initial frame - let the time reel initialize itself // Create app engine @@ -1374,7 +1363,6 @@ func TestAppConsensusEngine_Integration_GlobalAppCoordination(t *testing.T) { eventsMu.Unlock() eventDistributor.Unsubscribe("test-tracker") - eventDistributor.Stop() engine.Stop(false) } @@ -3203,14 +3191,14 @@ func TestAppConsensusEngine_Integration_AlertStopsProgression(t *testing.T) { typePrefix := binary.BigEndian.Uint32(data[:4]) // Check if it's a GlobalFrame - if typePrefix == protobufs.AppShardFrameType { - frame := &protobufs.AppShardFrame{} + if typePrefix == protobufs.AppShardProposalType { + frame := &protobufs.AppShardProposal{} if err := frame.FromCanonicalBytes(data); err == nil { mu.Lock() if afterAlert { - afterAlertFrames = append(afterAlertFrames, frame) + afterAlertFrames = append(afterAlertFrames, frame.State) } else { - publishedFrames = append(publishedFrames, frame) + publishedFrames = append(publishedFrames, frame.State) } mu.Unlock() } @@ -3258,6 +3246,5 @@ func TestAppConsensusEngine_Integration_AlertStopsProgression(t *testing.T) { require.Equal(t, 0, afterAlertCount) // Stop - engine.UnregisterExecutor("test-executor", 0, false) engine.Stop(false) } diff --git a/node/consensus/app/consensus_sync_provider.go b/node/consensus/app/consensus_sync_provider.go index 20e94ab..2bfdc9f 100644 --- a/node/consensus/app/consensus_sync_provider.go +++ b/node/consensus/app/consensus_sync_provider.go @@ -317,7 +317,7 @@ func (p *AppSyncProvider) syncWithPeer( return latest, errors.Wrap(err, "sync") } - 
err = p.engine.appTimeReel.Insert(p.engine.ctx, response.Frame) + err = p.engine.appTimeReel.Insert(response.Frame) if err != nil { return latest, errors.Wrap(err, "sync") } diff --git a/node/consensus/app/integration_helper_test.go b/node/consensus/app/integration_helper_test.go index 37a7946..9de38f6 100644 --- a/node/consensus/app/integration_helper_test.go +++ b/node/consensus/app/integration_helper_test.go @@ -497,6 +497,9 @@ func registerProverInHypergraphWithFilter(t *testing.T, hg thypergraph.Hypergrap t.Fatalf("Failed to insert status: %v", err) } + err = tree.Insert([]byte{3 << 2}, []byte{0, 0, 0, 0, 0, 0, 3, 232}, nil, big.NewInt(0)) // seniority = 1000 + require.NoError(t, err) + // Type Index: typeBI, _ := poseidon.HashBytes( slices.Concat(bytes.Repeat([]byte{0xff}, 32), []byte("prover:Prover")), diff --git a/node/consensus/app/message_processors.go b/node/consensus/app/message_processors.go index 93f0cd7..d77dab8 100644 --- a/node/consensus/app/message_processors.go +++ b/node/consensus/app/message_processors.go @@ -194,7 +194,7 @@ func (e *AppConsensusEngine) handleFrameMessage(message *pb.Message) { e.frameStore[string(frameID)] = frame e.frameStoreMu.Unlock() - if err := e.appTimeReel.Insert(e.ctx, frame); err != nil { + if err := e.appTimeReel.Insert(frame); err != nil { // Success metric recorded at the end of processing framesProcessedTotal.WithLabelValues("error").Inc() return @@ -274,7 +274,7 @@ func (e *AppConsensusEngine) handleGlobalFrameMessage(message *pb.Message) { return } - if err := e.globalTimeReel.Insert(e.ctx, frame); err != nil { + if err := e.globalTimeReel.Insert(frame); err != nil { // Success metric recorded at the end of processing globalFramesProcessedTotal.WithLabelValues("error").Inc() return diff --git a/node/consensus/events/global_event_distributor.go b/node/consensus/events/global_event_distributor.go index cbed453..ceffbd8 100644 --- a/node/consensus/events/global_event_distributor.go +++ b/node/consensus/events/global_event_distributor.go @@ -52,6 +52,7 @@ func (g *GlobalEventDistributor) Start( go g.trackUptime() <-ctx.Done() + g.wg.Wait() g.mu.Lock() g.running = false for _, ch := range g.subscribers { diff --git a/node/consensus/global/consensus_leader_provider.go b/node/consensus/global/consensus_leader_provider.go index 7cfd74b..2dd3748 100644 --- a/node/consensus/global/consensus_leader_provider.go +++ b/node/consensus/global/consensus_leader_provider.go @@ -68,6 +68,11 @@ func (p *GlobalLeaderProvider) ProveNextState( filter []byte, priorState models.Identity, ) (**protobufs.GlobalFrame, error) { + _, err := p.engine.livenessProvider.Collect(ctx) + if err != nil { + return nil, errors.Wrap(err, "prove next state") + } + timer := prometheus.NewTimer(frameProvingDuration) defer timer.ObserveDuration() diff --git a/node/consensus/global/consensus_sync_provider.go b/node/consensus/global/consensus_sync_provider.go index 7d6bbb4..5610bf7 100644 --- a/node/consensus/global/consensus_sync_provider.go +++ b/node/consensus/global/consensus_sync_provider.go @@ -84,7 +84,7 @@ func (p *GlobalSyncProvider) Synchronize( return } - err = p.syncWithMesh() + err = p.syncWithMesh(ctx) if err != nil { dataCh <- existing errCh <- err @@ -112,7 +112,7 @@ func (p *GlobalSyncProvider) Synchronize( return dataCh, errCh } -func (p *GlobalSyncProvider) syncWithMesh() error { +func (p *GlobalSyncProvider) syncWithMesh(ctx context.Context) error { p.engine.logger.Info("synchronizing with peers") latest, err := p.engine.globalTimeReel.GetHead() @@ -162,7 +162,7 
@@ func (p *GlobalSyncProvider) syncWithMesh() error { latest = head } - latest, err = p.syncWithPeer(latest, []byte(peerID)) + latest, err = p.syncWithPeer(ctx, latest, []byte(peerID)) if err != nil { p.engine.logger.Debug("error syncing frame", zap.Error(err)) } @@ -178,6 +178,7 @@ func (p *GlobalSyncProvider) syncWithMesh() error { } func (p *GlobalSyncProvider) syncWithPeer( + ctx context.Context, latest *protobufs.GlobalFrame, peerId []byte, ) (*protobufs.GlobalFrame, error) { @@ -188,7 +189,7 @@ func (p *GlobalSyncProvider) syncWithPeer( ) syncTimeout := p.engine.config.Engine.SyncTimeout - dialCtx, cancelDial := context.WithTimeout(p.engine.ctx, syncTimeout) + dialCtx, cancelDial := context.WithTimeout(ctx, syncTimeout) defer cancelDial() cc, err := p.engine.pubsub.GetDirectChannel(dialCtx, peerId, "sync") if err != nil { @@ -206,7 +207,7 @@ func (p *GlobalSyncProvider) syncWithPeer( client := protobufs.NewGlobalServiceClient(cc) for { - getCtx, cancelGet := context.WithTimeout(p.engine.ctx, syncTimeout) + getCtx, cancelGet := context.WithTimeout(ctx, syncTimeout) response, err := client.GetGlobalFrame( getCtx, &protobufs.GetGlobalFrameRequest{ @@ -254,7 +255,7 @@ func (p *GlobalSyncProvider) syncWithPeer( return latest, errors.Wrap(err, "sync") } - err = p.engine.globalTimeReel.Insert(p.engine.ctx, response.Frame) + err = p.engine.globalTimeReel.Insert(response.Frame) if err != nil { return latest, errors.Wrap(err, "sync") } @@ -264,6 +265,7 @@ func (p *GlobalSyncProvider) syncWithPeer( } func (p *GlobalSyncProvider) hyperSyncWithProver( + ctx context.Context, prover []byte, shardKey tries.ShardKey, ) { @@ -275,7 +277,7 @@ func (p *GlobalSyncProvider) hyperSyncWithProver( peerId, err := peer.IDFromPublicKey(pubKey) if err == nil { ch, err := p.engine.pubsub.GetDirectChannel( - p.engine.ctx, + ctx, []byte(peerId), "sync", ) @@ -283,7 +285,7 @@ func (p *GlobalSyncProvider) hyperSyncWithProver( if err == nil { defer ch.Close() client := protobufs.NewHypergraphComparisonServiceClient(ch) - str, err := client.HyperStream(p.engine.ctx) + str, err := client.HyperStream(ctx) if err != nil { p.engine.logger.Error("error from sync", zap.Error(err)) } else { diff --git a/node/consensus/global/consensus_voting_provider.go b/node/consensus/global/consensus_voting_provider.go index b536aa2..a054277 100644 --- a/node/consensus/global/consensus_voting_provider.go +++ b/node/consensus/global/consensus_voting_provider.go @@ -2,7 +2,6 @@ package global import ( "context" - "encoding/binary" "time" "github.com/pkg/errors" @@ -25,11 +24,17 @@ func (p *GlobalVotingProvider) FinalizeQuorumCertificate( aggregatedSignature models.AggregatedSignature, ) (models.QuorumCertificate, error) { return &protobufs.QuorumCertificate{ - Rank: (*state.State).GetRank(), - FrameNumber: (*state.State).Header.FrameNumber, - Selector: []byte((*state.State).Identity()), - Timestamp: uint64(time.Now().UnixMilli()), - AggregateSignature: aggregatedSignature.(*protobufs.BLS48581AggregateSignature), + Rank: (*state.State).GetRank(), + FrameNumber: (*state.State).Header.FrameNumber, + Selector: []byte((*state.State).Identity()), + Timestamp: uint64(time.Now().UnixMilli()), + AggregateSignature: &protobufs.BLS48581AggregateSignature{ + Signature: aggregatedSignature.GetSignature(), + PublicKey: &protobufs.BLS48581G2PublicKey{ + KeyValue: aggregatedSignature.GetPubKey(), + }, + Bitmask: aggregatedSignature.GetBitmask(), + }, }, nil } @@ -46,7 +51,13 @@ func (p *GlobalVotingProvider) FinalizeTimeout( LatestRanks: 
latestQuorumCertificateRanks, LatestQuorumCertificate: latestQuorumCertificate.(*protobufs.QuorumCertificate), Timestamp: uint64(time.Now().UnixMilli()), - AggregateSignature: aggregatedSignature.(*protobufs.BLS48581AggregateSignature), + AggregateSignature: &protobufs.BLS48581AggregateSignature{ + Signature: aggregatedSignature.GetSignature(), + PublicKey: &protobufs.BLS48581G2PublicKey{ + KeyValue: aggregatedSignature.GetPubKey(), + }, + Bitmask: aggregatedSignature.GetBitmask(), + }, }, nil } @@ -69,6 +80,7 @@ func (p *GlobalVotingProvider) SignTimeoutVote( // Create vote (signature) signatureData := verification.MakeTimeoutMessage( + nil, currentRank, newestQuorumCertificateRank, ) @@ -85,7 +97,7 @@ func (p *GlobalVotingProvider) SignTimeoutVote( vote := &protobufs.ProposalVote{ FrameNumber: 0, Rank: currentRank, - Selector: binary.BigEndian.AppendUint64(nil, currentRank), + Selector: nil, Timestamp: uint64(time.Now().UnixMilli()), PublicKeySignatureBls48581: &protobufs.BLS48581AddressedSignature{ Address: voterAddress, @@ -112,14 +124,11 @@ func (p *GlobalVotingProvider) SignVote( } // Create vote (signature) - signatureData, err := p.engine.frameProver.GetGlobalFrameSignaturePayload( - (*state.State).Header, + signatureData := verification.MakeVoteMessage( + nil, + state.Rank, + state.Identifier, ) - if err != nil { - p.engine.logger.Error("could not get signature payload", zap.Error(err)) - return nil, errors.Wrap(err, "sign vote") - } - sig, err := signer.SignWithDomain(signatureData, []byte("global")) if err != nil { p.engine.logger.Error("could not sign vote", zap.Error(err)) diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 6585540..a06995d 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -2,6 +2,7 @@ package global import ( "bytes" + "context" "encoding/hex" "fmt" "math/rand" @@ -91,7 +92,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( allocated = allocated && w.Allocated } if !allocated { - e.evaluateForProposals(data) + e.evaluateForProposals(ctx, data) } } } @@ -204,7 +205,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( go func() { for { select { - case <-e.ctx.Done(): + case <-ctx.Done(): return case <-time.After(10 * time.Second): e.logger.Error( @@ -226,7 +227,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( go func() { for { select { - case <-e.ctx.Done(): + case <-ctx.Done(): return case <-time.After(10 * time.Second): e.logger.Error( @@ -357,6 +358,7 @@ func (e *GlobalConsensusEngine) estimateSeniorityFromConfig() uint64 { } func (e *GlobalConsensusEngine) evaluateForProposals( + ctx context.Context, data *consensustime.GlobalEvent, ) { self, err := e.proverRegistry.GetProverInfo(e.getProverAddress()) @@ -393,7 +395,7 @@ func (e *GlobalConsensusEngine) evaluateForProposals( } idx := rand.Int63n(int64(len(ps))) - e.syncProvider.hyperSyncWithProver(ps[idx].Address, key) + e.syncProvider.hyperSyncWithProver(ctx, ps[idx].Address, key) for _, shard := range shards { path := []int{} @@ -442,7 +444,7 @@ func (e *GlobalConsensusEngine) evaluateForProposals( size := e.hypergraph.GetSize(&key, path) resp, err := e.hypergraph.GetChildrenForPath( - e.ctx, + ctx, &protobufs.GetChildrenForPathRequest{ ShardKey: slices.Concat(key.L1[:], key.L2[:]), Path: shard.Path, diff --git a/node/consensus/global/genesis.go b/node/consensus/global/genesis.go index 4454a2f..0dc797c 100644 --- a/node/consensus/global/genesis.go +++ 
b/node/consensus/global/genesis.go @@ -1,6 +1,7 @@ package global import ( + "bytes" _ "embed" "encoding/base64" "encoding/binary" @@ -271,27 +272,30 @@ func (e *GlobalConsensusEngine) initializeGenesis() ( if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil { txn.Abort() e.logger.Error("could not add frame", zap.Error(err)) - e.ctx.Throw(err) return nil, nil } genesisQC := &protobufs.QuorumCertificate{ - Rank: 0, - Filter: []byte{}, - FrameNumber: genesisFrame.Header.FrameNumber, - Selector: []byte(genesisFrame.Identity()), - Timestamp: 0, - AggregateSignature: &protobufs.BLS48581AggregateSignature{}, + Rank: 0, + Filter: []byte{}, + FrameNumber: genesisFrame.Header.FrameNumber, + Selector: []byte(genesisFrame.Identity()), + Timestamp: 0, + AggregateSignature: &protobufs.BLS48581AggregateSignature{ + PublicKey: &protobufs.BLS48581G2PublicKey{ + KeyValue: make([]byte, 585), + }, + Signature: make([]byte, 74), + Bitmask: bytes.Repeat([]byte{0xff}, 32), + }, } if err := e.clockStore.PutQuorumCertificate(genesisQC, txn); err != nil { txn.Abort() e.logger.Error("could not add quorum certificate", zap.Error(err)) - e.ctx.Throw(err) return nil, nil } if err := txn.Commit(); err != nil { txn.Abort() e.logger.Error("could not add frame", zap.Error(err)) - e.ctx.Throw(err) return nil, nil } if err = e.consensusStore.PutLivenessState( @@ -301,7 +305,6 @@ func (e *GlobalConsensusEngine) initializeGenesis() ( }, ); err != nil { e.logger.Error("could not add liveness state", zap.Error(err)) - e.ctx.Throw(err) return nil, nil } if err = e.consensusStore.PutConsensusState( @@ -311,7 +314,6 @@ func (e *GlobalConsensusEngine) initializeGenesis() ( }, ); err != nil { e.logger.Error("could not add consensus state", zap.Error(err)) - e.ctx.Throw(err) return nil, nil } @@ -444,7 +446,7 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame { state = hgstate.NewHypergraphState(e.hypergraph) for _, pubkey := range proverPubKeys { - err = e.addGenesisProver(rdfMultiprover, state, pubkey, 0, 0) + err = e.addGenesisProver(rdfMultiprover, state, pubkey, 1000, 0) if err != nil { e.logger.Error("error adding prover", zap.Error(err)) return nil @@ -522,13 +524,11 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame { if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil { txn.Abort() e.logger.Error("could not add frame", zap.Error(err)) - e.ctx.Throw(err) return nil } if err := txn.Commit(); err != nil { txn.Abort() e.logger.Error("could not add frame", zap.Error(err)) - e.ctx.Throw(err) return nil } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 6806838..7641de0 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -139,8 +139,6 @@ type GlobalConsensusEngine struct { halt context.CancelFunc // Internal state - ctx lifecycle.SignalerContext - cancel context.CancelFunc quit chan struct{} wg sync.WaitGroup minimumProvers func() uint64 @@ -467,6 +465,7 @@ func NewGlobalConsensusEngine( ready lifecycle.ReadyFunc, ) { if err := engine.workerManager.Start(ctx); err != nil { + engine.logger.Error("could not start worker manager", zap.Error(err)) ctx.Throw(err) return } @@ -516,17 +515,26 @@ func NewGlobalConsensusEngine( voteAggregationDistributor, engine.signatureAggregator, engine.votingProvider, + func(qc models.QuorumCertificate) { + select { + case <-engine.haltCtx.Done(): + return + default: + } 
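+ // Non-blocking halt check: once haltCtx is done, drop the QC instead of forwarding it to the consensus participant.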
+ engine.consensusParticipant.OnQuorumCertificateConstructedFromVotes(qc) + }, state.Rank(), ) if err != nil { return nil, err } - engine.timeoutAggregator, err = voting.NewAppShardTimeoutAggregator[GlobalPeerID]( + engine.timeoutAggregator, err = voting.NewGlobalTimeoutAggregator[GlobalPeerID]( tracing.NewZapTracer(logger), engine, engine, engine.signatureAggregator, timeoutAggregationDistributor, + engine.votingProvider, state.Rank(), ) @@ -536,6 +544,7 @@ func NewGlobalConsensusEngine( ready lifecycle.ReadyFunc, ) { if err := engine.startConsensus(state, ctx, ready); err != nil { + engine.logger.Error("could not start consensus", zap.Error(err)) ctx.Throw(err) return } @@ -549,42 +558,36 @@ func NewGlobalConsensusEngine( // Subscribe to global consensus if participating err = engine.subscribeToGlobalConsensus() if err != nil { - engine.ctx.Throw(errors.Wrap(err, "start")) return nil, err } // Subscribe to shard consensus messages to broker lock agreement err = engine.subscribeToShardConsensusMessages() if err != nil { - engine.ctx.Throw(err) return nil, errors.Wrap(err, "start") } // Subscribe to frames err = engine.subscribeToFrameMessages() if err != nil { - engine.ctx.Throw(err) return nil, errors.Wrap(err, "start") } // Subscribe to prover messages err = engine.subscribeToProverMessages() if err != nil { - engine.ctx.Throw(err) return nil, errors.Wrap(err, "start") } // Subscribe to peer info messages err = engine.subscribeToPeerInfoMessages() if err != nil { - engine.ctx.Throw(err) return nil, errors.Wrap(err, "start") } // Subscribe to alert messages err = engine.subscribeToAlertMessages() if err != nil { - engine.ctx.Throw(err) return nil, errors.Wrap(err, "start") } @@ -2160,7 +2163,7 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin( wg.Go(func() error { client := protobufs.NewDataIPCServiceClient(svc) resp, err := client.CreateJoinProof( - e.ctx, + context.TODO(), &protobufs.CreateJoinProofRequest{ Challenge: challenge[:], Difficulty: frame.Header.Difficulty, @@ -2393,6 +2396,9 @@ func (e *GlobalConsensusEngine) startConsensus( } ready() + e.voteAggregator.Start(ctx) + e.timeoutAggregator.Start(ctx) + <-lifecycle.AllReady(e.voteAggregator, e.timeoutAggregator) e.consensusParticipant.Start(ctx) return nil } @@ -2424,6 +2430,11 @@ func (e *GlobalConsensusEngine) OnDoubleProposeDetected( proposal1 *models.State[*protobufs.GlobalFrame], proposal2 *models.State[*protobufs.GlobalFrame], ) { + select { + case <-e.haltCtx.Done(): + return + default: + } e.eventDistributor.Publish(typesconsensus.ControlEvent{ Type: typesconsensus.ControlEventGlobalEquivocation, Data: &consensustime.GlobalEvent{ @@ -2467,7 +2478,12 @@ func (e *GlobalConsensusEngine) OnOwnProposal( ], targetPublicationTime time.Time, ) { - var priorTC *protobufs.TimeoutCertificate + select { + case <-e.haltCtx.Done(): + return + default: + } + var priorTC *protobufs.TimeoutCertificate = nil if proposal.PreviousRankTimeoutCertificate != nil { priorTC = proposal.PreviousRankTimeoutCertificate.(*protobufs.TimeoutCertificate) @@ -2485,6 +2501,7 @@ func (e *GlobalConsensusEngine) OnOwnProposal( return } + e.voteAggregator.AddState(proposal) e.consensusParticipant.SubmitProposal(proposal) if err := e.pubsub.PublishToBitmask( @@ -2499,6 +2516,12 @@ func (e *GlobalConsensusEngine) OnOwnProposal( func (e *GlobalConsensusEngine) OnOwnTimeout( timeout *models.TimeoutState[*protobufs.ProposalVote], ) { + select { + case <-e.haltCtx.Done(): + return + default: + } + var priorTC *protobufs.TimeoutCertificate if 
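// The readiness gate used in startConsensus above (<-lifecycle.AllReady(...)),
// sketched in isolation: the receive blocks until every component has
// signalled ready. An illustrative shape, assuming each component exposes a
// Ready() <-chan struct{} — this is not the library's actual implementation:

func allReady(cs ...interface{ Ready() <-chan struct{} }) <-chan struct{} {
	out := make(chan struct{})
	go func() {
		for _, c := range cs {
			<-c.Ready() // wait for each component in turn
		}
		close(out) // all ready: unblock the receiver
	}()
	return out
}

// Starting the aggregators first and gating on readiness ensures the
// participant cannot observe votes or timeouts before their collectors exist.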
timeout.PriorRankTimeoutCertificate != nil { priorTC = @@ -2533,6 +2556,12 @@ func (e *GlobalConsensusEngine) OnOwnVote( vote **protobufs.ProposalVote, recipientID models.Identity, ) { + select { + case <-e.haltCtx.Done(): + return + default: + } + data, err := (*vote).ToCanonicalBytes() if err != nil { e.logger.Error("could not serialize timeout", zap.Error(err)) @@ -2605,6 +2634,9 @@ func (e *GlobalConsensusEngine) OnStartingTimeout( func (e *GlobalConsensusEngine) OnStateIncorporated( state *models.State[*protobufs.GlobalFrame], ) { + if err := e.globalTimeReel.Insert(*state.State); err != nil { + e.logger.Error("unable to insert frame into time reel", zap.Error(err)) + } } // OnTimeoutCertificateTriggeredRankChange implements consensus.Consumer. @@ -2638,9 +2670,13 @@ func (e *GlobalConsensusEngine) VerifyQuorumCertificate( pubkeys := [][]byte{} signatures := [][]byte{} - if (len(provers) + 7/8) > len(qc.AggregateSignature.Bitmask) { + if ((len(provers) + 7) / 8) > len(qc.AggregateSignature.Bitmask) { return errors.Wrap( - errors.New("bitmask invalid for prover set"), + errors.Errorf( + "bitmask invalid for prover set, expected: %d, actual: %d", + ((len(provers)+7)/8), + len(qc.AggregateSignature.Bitmask), + ), "verify quorum certificate", ) } @@ -2669,13 +2705,7 @@ func (e *GlobalConsensusEngine) VerifyQuorumCertificate( if valid := e.blsConstructor.VerifySignatureRaw( qc.AggregateSignature.GetPubKey(), qc.AggregateSignature.GetSignature(), - binary.BigEndian.AppendUint64( - binary.BigEndian.AppendUint64( - slices.Concat(qc.Filter, qc.Selector), - qc.Rank, - ), - qc.FrameNumber, - ), + verification.MakeVoteMessage(nil, qc.Rank, qc.Identity()), []byte("global"), ); !valid { return errors.Wrap( @@ -2710,9 +2740,13 @@ func (e *GlobalConsensusEngine) VerifyTimeoutCertificate( pubkeys := [][]byte{} signatures := [][]byte{} - if (len(provers) + 7/8) > len(tc.AggregateSignature.Bitmask) { + if ((len(provers) + 7) / 8) > len(tc.AggregateSignature.Bitmask) { return errors.Wrap( - errors.New("bitmask invalid for prover set"), + errors.Errorf( + "bitmask invalid for prover set, expected: %d, actual: %d", + ((len(provers)+7)/8), + len(tc.AggregateSignature.Bitmask), + ), "verify timeout certificate", ) } @@ -2741,12 +2775,10 @@ func (e *GlobalConsensusEngine) VerifyTimeoutCertificate( if valid := e.blsConstructor.VerifySignatureRaw( tc.AggregateSignature.GetPubKey(), tc.AggregateSignature.GetSignature(), - binary.BigEndian.AppendUint64( - binary.BigEndian.AppendUint64( - slices.Clone(tc.Filter), - tc.Rank, - ), - tc.LatestQuorumCertificate.GetRank(), + verification.MakeTimeoutMessage( + nil, + tc.Rank, + tc.LatestQuorumCertificate.Rank, ), []byte("globaltimeout"), ); !valid { @@ -2763,7 +2795,44 @@ func (e *GlobalConsensusEngine) VerifyTimeoutCertificate( func (e *GlobalConsensusEngine) VerifyVote( vote **protobufs.ProposalVote, ) error { - panic("unimplemented") + if vote == nil || *vote == nil { + return errors.Wrap(errors.New("nil vote"), "verify vote") + } + + if err := (*vote).Validate(); err != nil { + return errors.Wrap(err, "verify vote") + } + + provers, err := e.proverRegistry.GetActiveProvers(nil) + if err != nil { + return errors.Wrap(err, "verify vote") + } + + var pubkey []byte + for _, p := range provers { + if bytes.Equal(p.Address, (*vote).PublicKeySignatureBls48581.Address) { + pubkey = p.PublicKey + break + } + } + + if bytes.Equal(pubkey, []byte{}) { + return errors.Wrap(errors.New("invalid prover"), "verify vote") + } + + if valid := e.blsConstructor.VerifySignatureRaw( + 
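// The corrected bitmask bound above is ceiling division of signer count to
// bytes. The old expression (len(provers) + 7/8) collapsed to len(provers),
// because integer division makes 7/8 == 0; the fix parenthesises it properly:

func bitmaskBytes(provers int) int {
	return (provers + 7) / 8 // ceil(provers/8)
}

// bitmaskBytes(1) == 1, bitmaskBytes(8) == 1, bitmaskBytes(9) == 2 — one bit
// per prover, rounded up to whole bytes.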
pubkey, + (*vote).PublicKeySignatureBls48581.Signature, + verification.MakeVoteMessage(nil, (*vote).Rank, (*vote).Source()), + []byte("global"), + ); !valid { + return errors.Wrap( + errors.New("invalid signature"), + "verify vote", + ) + } + + return nil } var _ consensus.DynamicCommittee = (*GlobalConsensusEngine)(nil) diff --git a/node/consensus/global/global_consensus_engine_integration_test.go b/node/consensus/global/global_consensus_engine_integration_test.go index fef1a6f..bcf4cd9 100644 --- a/node/consensus/global/global_consensus_engine_integration_test.go +++ b/node/consensus/global/global_consensus_engine_integration_test.go @@ -119,11 +119,11 @@ func (m *mockIntegrationPubSub) PublishToBitmask(bitmask []byte, data []byte) er typePrefix := binary.BigEndian.Uint32(data[:4]) // Check if it's a GlobalFrame - if typePrefix == protobufs.GlobalFrameType { - frame := &protobufs.GlobalFrame{} + if typePrefix == protobufs.GlobalProposalType { + frame := &protobufs.GlobalProposal{} if err := frame.FromCanonicalBytes(data); err == nil { m.mu.Lock() - m.frames = append(m.frames, frame) + m.frames = append(m.frames, frame.State) m.mu.Unlock() } } @@ -276,6 +276,9 @@ func registerProverInHypergraph(t *testing.T, hg thypergraph.Hypergraph, publicK t.Fatalf("Failed to insert status: %v", err) } + err = tree.Insert([]byte{3 << 2}, []byte{0, 0, 0, 0, 0, 0, 3, 232}, nil, big.NewInt(0)) // seniority = 1000 + require.NoError(t, err) + // Type Index: typeBI, _ := poseidon.HashBytes( slices.Concat(bytes.Repeat([]byte{0xff}, 32), []byte("prover:Prover")), @@ -545,7 +548,7 @@ func createIntegrationTestGlobalConsensusEngineWithHypergraphAndKey( &verenc.MPCitHVerifiableEncryptor{}, // verEnc &bulletproofs.Decaf448KeyConstructor{}, // decafConstructor compiler.NewBedlamCompiler(), - nil, + bc, qp2p.NewInMemoryPeerInfoManager(logger), ) require.NoError(t, err) @@ -574,11 +577,11 @@ func TestGlobalConsensusEngine_Integration_BasicFrameProgression(t *testing.T) { typePrefix := binary.BigEndian.Uint32(data[:4]) // Check if it's a GlobalFrame - if typePrefix == protobufs.GlobalFrameType { - frame := &protobufs.GlobalFrame{} + if typePrefix == protobufs.GlobalProposalType { + frame := &protobufs.GlobalProposal{} if err := frame.FromCanonicalBytes(data); err == nil { mu.Lock() - publishedFrames = append(publishedFrames, frame) + publishedFrames = append(publishedFrames, frame.State) mu.Unlock() } } @@ -586,7 +589,7 @@ func TestGlobalConsensusEngine_Integration_BasicFrameProgression(t *testing.T) { } // Start the engine - ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background()) + ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background()) err := engine.Start(ctx) require.NoError(t, err) @@ -618,44 +621,7 @@ func TestGlobalConsensusEngine_Integration_BasicFrameProgression(t *testing.T) { t.Logf("Published %d frames", frameCount) // Stop the engine - <-engine.Stop(false) -} - -func TestGlobalConsensusEngine_Integration_StateTransitions(t *testing.T) { - // Generate hosts for testing - _, m, cleanupHosts := tests.GenerateSimnetHosts(t, 1, []libp2p.Option{}) - defer cleanupHosts() - - engine, _, _, _ := createIntegrationTestGlobalConsensusEngine(t, []byte(m.Nodes[0].ID()), 99, m.Nodes[0], m.Keys[0], m.Nodes) - - // Track state transitions - transitions := make([]string, 0) - var mu sync.Mutex - - // Start the engine - ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background()) - err := engine.Start(ctx) - require.NoError(t, err) - - // Check for startup errors - select 
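// The newly implemented VerifyVote, summarised as a sketch: resolve the
// voter's registered BLS key from the active prover set, then verify the
// signature over the canonical vote message under the "global" domain tag
// (the nil first argument to MakeVoteMessage mirrors the empty global filter):

msg := verification.MakeVoteMessage(nil, (*vote).Rank, (*vote).Source())
ok := e.blsConstructor.VerifySignatureRaw(
	pubkey,
	(*vote).PublicKeySignatureBls48581.Signature,
	msg,
	[]byte("global"),
)

// A vote whose address is not in the prover set fails before any signature
// work, which keeps the hot path cheap for junk traffic.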
{ - case err := <-errChan: - require.NoError(t, err) - case <-time.After(100 * time.Millisecond): - // No error is good - } - - // Wait for state transitions - time.Sleep(10 * time.Second) - - // Verify we had some state transitions - mu.Lock() - transitionCount := len(transitions) - mu.Unlock() - - assert.Greater(t, transitionCount, 0, "Expected at least one state transition") - - // Stop the engine + cancel() <-engine.Stop(false) } @@ -749,13 +715,13 @@ func TestGlobalConsensusEngine_Integration_MultiNodeConsensus(t *testing.T) { typePrefix := binary.BigEndian.Uint32(data[:4]) // Check if it's a GlobalFrame - if typePrefix == protobufs.GlobalFrameType { - frame := &protobufs.GlobalFrame{} + if typePrefix == protobufs.GlobalProposalType { + frame := &protobufs.GlobalProposal{} if err := frame.FromCanonicalBytes(data); err == nil { mu.Lock() - allFrames[nodeIdx] = append(allFrames[nodeIdx], frame) + allFrames[nodeIdx] = append(allFrames[nodeIdx], frame.State) mu.Unlock() - t.Logf("Node %d published frame %d", nodeIdx+1, frame.Header.FrameNumber) + t.Logf("Node %d published frame %d", nodeIdx+1, frame.State.Header.FrameNumber) } } } @@ -783,11 +749,13 @@ func TestGlobalConsensusEngine_Integration_MultiNodeConsensus(t *testing.T) { }) } + cancels := []func(){} // Start all engines for i := 0; i < 6; i++ { - ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background()) + ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background()) err := engines[i].Start(ctx) require.NoError(t, err) + cancels = append(cancels, cancel) // Check for startup errors select { @@ -869,6 +837,7 @@ loop: // Stop all engines for i := 0; i < 6; i++ { + cancels[i]() <-engines[i].Stop(false) } } @@ -927,7 +896,7 @@ func TestGlobalConsensusEngine_Integration_ShardCoverage(t *testing.T) { engine.eventDistributor = eventDistributor // Start the event distributor - ctx, _, _ := lifecycle.WithSignallerAndCancel(context.Background()) + ctx, cancel, _ := lifecycle.WithSignallerAndCancel(context.Background()) err := engine.Start(ctx) require.NoError(t, err) @@ -954,7 +923,8 @@ func TestGlobalConsensusEngine_Integration_ShardCoverage(t *testing.T) { require.False(t, newHeadAfter) // Stop the event distributor - engine.Stop(false) + cancel() + <-engine.Stop(false) } // TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying tests that engines @@ -974,7 +944,7 @@ func TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying(t *testing. engines := make([]*GlobalConsensusEngine, numNodes) pubsubs := make([]*mockIntegrationPubSub, numNodes) - quits := make([]chan struct{}, numNodes) + cancels := make([]func(), numNodes) // Create shared hypergraph with NO provers registered inclusionProver := bls48581.NewKZGInclusionProver(logger) @@ -1083,13 +1053,12 @@ func TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying(t *testing. &verenc.MPCitHVerifiableEncryptor{}, // verEnc &bulletproofs.Decaf448KeyConstructor{}, // decafConstructor compiler.NewBedlamCompiler(), - nil, // blsConstructor + bc, // blsConstructor qp2p.NewInMemoryPeerInfoManager(logger), ) require.NoError(t, err) engines[i] = engine - quits[i] = make(chan struct{}) } // Wire up all pubsubs to each other @@ -1103,9 +1072,11 @@ func TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying(t *testing. 
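// The lifecycle change these tests adopt, in miniature: Start takes a
// signaller context, and shutdown is "cancel, then wait on Stop". A sketch
// using the helper signature shown above:

ctx, cancel, _ := lifecycle.WithSignallerAndCancel(context.Background())
require.NoError(t, engine.Start(ctx))
// ... exercise the engine ...
cancel()             // tear down the signaller context first
<-engine.Stop(false) // then block until shutdown completes

// Collecting one cancel per engine (the cancels slice) lets the multi-node
// tests stop every instance deterministically instead of leaking goroutines.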
// Start all engines for i := 0; i < numNodes; i++ { - ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background()) + ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background()) err := engines[i].Start(ctx) require.NoError(t, err) + cancels[i] = cancel + select { case err := <-errChan: require.NoError(t, err) @@ -1130,7 +1101,7 @@ func TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying(t *testing. // Stop all engines for i := 0; i < numNodes; i++ { - close(quits[i]) + cancels[i]() <-engines[i].Stop(false) } @@ -1159,14 +1130,14 @@ func TestGlobalConsensusEngine_Integration_AlertStopsProgression(t *testing.T) { typePrefix := binary.BigEndian.Uint32(data[:4]) // Check if it's a GlobalFrame - if typePrefix == protobufs.GlobalFrameType { - frame := &protobufs.GlobalFrame{} + if typePrefix == protobufs.GlobalProposalType { + frame := &protobufs.GlobalProposal{} if err := frame.FromCanonicalBytes(data); err == nil { mu.Lock() if afterAlert { - afterAlertFrames = append(afterAlertFrames, frame) + afterAlertFrames = append(afterAlertFrames, frame.State) } else { - publishedFrames = append(publishedFrames, frame) + publishedFrames = append(publishedFrames, frame.State) } mu.Unlock() } @@ -1175,7 +1146,7 @@ func TestGlobalConsensusEngine_Integration_AlertStopsProgression(t *testing.T) { } // Start the engine - ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background()) + ctx, cancel, errChan := lifecycle.WithSignallerAndCancel(context.Background()) err := engine.Start(ctx) require.NoError(t, err) @@ -1236,6 +1207,7 @@ func TestGlobalConsensusEngine_Integration_AlertStopsProgression(t *testing.T) { require.Equal(t, 0, afterAlertCount) // Stop the engine + cancel() <-engine.Stop(false) } @@ -1261,6 +1233,9 @@ func registerProverInHypergraphWithFilter(t *testing.T, hg thypergraph.Hypergrap t.Fatalf("Failed to insert status: %v", err) } + err = tree.Insert([]byte{3 << 2}, []byte{0, 0, 0, 0, 0, 0, 3, 232}, nil, big.NewInt(0)) // seniority = 1000 + require.NoError(t, err) + // Type Index: typeBI, _ := poseidon.HashBytes( slices.Concat(bytes.Repeat([]byte{0xff}, 32), []byte("prover:Prover")), diff --git a/node/consensus/global/message_processors.go b/node/consensus/global/message_processors.go index 9c0d946..b2e2680 100644 --- a/node/consensus/global/message_processors.go +++ b/node/consensus/global/message_processors.go @@ -2,6 +2,7 @@ package global import ( "bytes" + "context" "encoding/binary" "encoding/hex" "fmt" @@ -87,7 +88,7 @@ func (e *GlobalConsensusEngine) processFrameMessageQueue( case <-ctx.Done(): return case message := <-e.globalFrameMessageQueue: - e.handleFrameMessage(message) + e.handleFrameMessage(ctx, message) } } } @@ -217,7 +218,10 @@ func (e *GlobalConsensusEngine) handleProverMessage(message *pb.Message) { } } -func (e *GlobalConsensusEngine) handleFrameMessage(message *pb.Message) { +func (e *GlobalConsensusEngine) handleFrameMessage( + ctx context.Context, + message *pb.Message, +) { defer func() { if r := recover(); r != nil { e.logger.Error( @@ -249,7 +253,7 @@ func (e *GlobalConsensusEngine) handleFrameMessage(message *pb.Message) { clone := frame.Clone().(*protobufs.GlobalFrame) e.frameStoreMu.Unlock() - if err := e.globalTimeReel.Insert(e.ctx, clone); err != nil { + if err := e.globalTimeReel.Insert(clone); err != nil { // Success metric recorded at the end of processing framesProcessedTotal.WithLabelValues("error").Inc() return @@ -799,6 +803,11 @@ func (e *GlobalConsensusEngine) handleAlertMessage(message 
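// The guard added at the top of handleProposal, handleVote and
// handleTimeoutState below is a self-origin filter: gossip echoes of our own
// publishes are dropped before any deserialisation or metrics work.

if bytes.Equal(message.From, e.pubsub.GetPeerID()) {
	return // our own message; the local submission path already handled it
}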
*pb.Message) { } func (e *GlobalConsensusEngine) handleProposal(message *pb.Message) { + // Skip our own messages + if bytes.Equal(message.From, e.pubsub.GetPeerID()) { + return + } + timer := prometheus.NewTimer(proposalProcessingDuration) defer timer.ObserveDuration() @@ -815,28 +824,46 @@ func (e *GlobalConsensusEngine) handleProposal(message *pb.Message) { e.frameStore[string(frameID)] = proposal.State e.frameStoreMu.Unlock() - e.consensusParticipant.SubmitProposal( - &models.SignedProposal[*protobufs.GlobalFrame, *protobufs.ProposalVote]{ - Proposal: models.Proposal[*protobufs.GlobalFrame]{ - State: &models.State[*protobufs.GlobalFrame]{ - Rank: proposal.State.GetRank(), - Identifier: proposal.State.Identity(), - ProposerID: proposal.Vote.Identity(), - ParentQuorumCertificate: proposal.ParentQuorumCertificate, - Timestamp: proposal.State.GetTimestamp(), - State: &proposal.State, - }, - PreviousRankTimeoutCertificate: proposal.PriorRankTimeoutCertificate, + // Small gotcha: the proposal structure uses interfaces, so we can't assign + // directly, otherwise the nil values for the structs will fail the nil + // check on the interfaces (and would incur costly reflection if we wanted + // to check it directly) + pqc := proposal.ParentQuorumCertificate + prtc := proposal.PriorRankTimeoutCertificate + signedProposal := &models.SignedProposal[*protobufs.GlobalFrame, *protobufs.ProposalVote]{ + Proposal: models.Proposal[*protobufs.GlobalFrame]{ + State: &models.State[*protobufs.GlobalFrame]{ + Rank: proposal.State.GetRank(), + Identifier: proposal.State.Identity(), + ProposerID: proposal.Vote.Identity(), + Timestamp: proposal.State.GetTimestamp(), + State: &proposal.State, }, - Vote: &proposal.Vote, }, - ) + Vote: &proposal.Vote, + } + + if pqc != nil { + signedProposal.Proposal.State.ParentQuorumCertificate = pqc + } + + if prtc != nil { + signedProposal.PreviousRankTimeoutCertificate = prtc + } + + e.voteAggregator.AddState(signedProposal) + e.consensusParticipant.SubmitProposal(signedProposal) // Success metric recorded at the end of processing proposalProcessedTotal.WithLabelValues("success").Inc() } func (e *GlobalConsensusEngine) handleVote(message *pb.Message) { + // Skip our own messages + if bytes.Equal(message.From, e.pubsub.GetPeerID()) { + return + } + timer := prometheus.NewTimer(voteProcessingDuration) defer timer.ObserveDuration() @@ -859,7 +886,6 @@ func (e *GlobalConsensusEngine) handleVote(message *pb.Message) { voteProcessedTotal.WithLabelValues("error").Inc() } - // Validate the voter's signature proverSet, err := e.proverRegistry.GetActiveProvers(nil) if err != nil { e.logger.Error("could not get active provers", zap.Error(err)) @@ -893,65 +919,17 @@ func (e *GlobalConsensusEngine) handleVote(message *pb.Message) { return } - // Find the proposal frame for this vote - e.frameStoreMu.RLock() - var proposalFrame *protobufs.GlobalFrame = nil - for _, frame := range e.frameStore { - if frame.Header != nil && - frame.Header.FrameNumber == vote.FrameNumber && - bytes.Equal( - e.getAddressFromPublicKey( - frame.Header.PublicKeySignatureBls48581.PublicKey.KeyValue, - ), - []byte(vote.Identity()), - ) { - proposalFrame = frame - break - } - } - e.frameStoreMu.RUnlock() - - if proposalFrame == nil { - e.logger.Warn( - "vote for unknown proposal", - zap.Uint64("frame_number", vote.FrameNumber), - zap.String("proposer", hex.EncodeToString([]byte(vote.Identity()))), - ) - voteProcessedTotal.WithLabelValues("error").Inc() - return - } - - // Get the signature payload for the proposal - 
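// The "small gotcha" comment above is the classic typed-nil interface trap:
// storing a nil *TimeoutCertificate in an interface-typed field produces an
// interface that is not nil. A self-contained demonstration:

var tc *protobufs.TimeoutCertificate // typed nil pointer
var anyTC interface{} = tc           // interface now holds (*TimeoutCertificate)(nil)
fmt.Println(tc == nil)    // true
fmt.Println(anyTC == nil) // false — the interface carries a type, so it is non-nil

// Hence the pqc/prtc locals: the fields are only assigned when the concrete
// pointers are non-nil, so downstream nil checks on the interfaces stay honest.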
signatureData, err := e.frameProver.GetGlobalFrameSignaturePayload( - proposalFrame.Header, - ) - if err != nil { - e.logger.Error("could not get signature payload", zap.Error(err)) - voteProcessedTotal.WithLabelValues("error").Inc() - return - } - - // Validate the vote signature - valid, err := e.keyManager.ValidateSignature( - crypto.KeyTypeBLS48581G1, - voterPublicKey, - signatureData, - vote.PublicKeySignatureBls48581.Signature, - []byte("global"), - ) - - if err != nil || !valid { - e.logger.Error("invalid vote signature", zap.Error(err)) - voteProcessedTotal.WithLabelValues("error").Inc() - return - } - e.voteAggregator.AddVote(&vote) voteProcessedTotal.WithLabelValues("success").Inc() } func (e *GlobalConsensusEngine) handleTimeoutState(message *pb.Message) { + // Skip our own messages + if bytes.Equal(message.From, e.pubsub.GetPeerID()) { + return + } + timer := prometheus.NewTimer(voteProcessingDuration) defer timer.ObserveDuration() @@ -969,13 +947,25 @@ func (e *GlobalConsensusEngine) handleTimeoutState(message *pb.Message) { return } - e.timeoutAggregator.AddTimeout(&models.TimeoutState[*protobufs.ProposalVote]{ - Rank: timeoutState.Vote.Rank, - LatestQuorumCertificate: timeoutState.LatestQuorumCertificate, - PriorRankTimeoutCertificate: timeoutState.PriorRankTimeoutCertificate, - Vote: &timeoutState.Vote, - TimeoutTick: timeoutState.TimeoutTick, - }) + // Small gotcha: the timeout structure uses interfaces, so we can't assign + // directly, otherwise the nil values for the structs will fail the nil + // check on the interfaces (and would incur costly reflection if we wanted + // to check it directly) + lqc := timeoutState.LatestQuorumCertificate + prtc := timeoutState.PriorRankTimeoutCertificate + timeout := &models.TimeoutState[*protobufs.ProposalVote]{ + Rank: timeoutState.Vote.Rank, + Vote: &timeoutState.Vote, + TimeoutTick: timeoutState.TimeoutTick, + } + if lqc != nil { + timeout.LatestQuorumCertificate = lqc + } + if prtc != nil { + timeout.PriorRankTimeoutCertificate = prtc + } + + e.timeoutAggregator.AddTimeout(timeout) voteProcessedTotal.WithLabelValues("success").Inc() } diff --git a/node/consensus/global/message_subscription.go b/node/consensus/global/message_subscription.go index bd370f3..7089c2f 100644 --- a/node/consensus/global/message_subscription.go +++ b/node/consensus/global/message_subscription.go @@ -28,7 +28,7 @@ func (e *GlobalConsensusEngine) subscribeToGlobalConsensus() error { return nil case e.globalConsensusMessageQueue <- message: return nil - case <-e.ctx.Done(): + case <-e.ShutdownSignal(): return errors.New("context cancelled") default: e.logger.Warn("global message queue full, dropping message") @@ -59,7 +59,7 @@ func (e *GlobalConsensusEngine) subscribeToGlobalConsensus() error { return nil case e.appFramesMessageQueue <- message: return nil - case <-e.ctx.Done(): + case <-e.ShutdownSignal(): return errors.New("context cancelled") default: e.logger.Warn("app frames message queue full, dropping message") @@ -100,7 +100,7 @@ func (e *GlobalConsensusEngine) subscribeToShardConsensusMessages() error { return nil case e.shardConsensusMessageQueue <- message: return nil - case <-e.ctx.Done(): + case <-e.ShutdownSignal(): return errors.New("context cancelled") default: e.logger.Warn("shard consensus queue full, dropping message") @@ -137,7 +137,7 @@ func (e *GlobalConsensusEngine) subscribeToFrameMessages() error { return nil case e.globalFrameMessageQueue <- message: return nil - case <-e.ctx.Done(): + case <-e.ShutdownSignal(): return 
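// Every subscription handler in message_subscription.go now follows the same
// non-blocking enqueue shape — deliver if the queue has room, abort on
// shutdown, otherwise drop with a warning rather than back-pressure the
// gossip layer (queue here stands in for the handler's buffered channel):

select {
case queue <- message:
	return nil
case <-e.ShutdownSignal():
	return errors.New("context cancelled")
default:
	e.logger.Warn("queue full, dropping message")
	return nil
}

// Swapping <-e.ctx.Done() for <-e.ShutdownSignal() removes the engine's
// stored context in favour of the lifecycle-provided shutdown channel.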
errors.New("context cancelled") default: e.logger.Warn("global frame queue full, dropping message") @@ -177,7 +177,7 @@ func (e *GlobalConsensusEngine) subscribeToProverMessages() error { case e.globalProverMessageQueue <- message: e.logger.Debug("received prover message") return nil - case <-e.ctx.Done(): + case <-e.ShutdownSignal(): return errors.New("context cancelled") default: e.logger.Warn("global prover message queue full, dropping message") @@ -211,7 +211,7 @@ func (e *GlobalConsensusEngine) subscribeToPeerInfoMessages() error { return nil case e.globalPeerInfoMessageQueue <- message: return nil - case <-e.ctx.Done(): + case <-e.ShutdownSignal(): return errors.New("context cancelled") default: e.logger.Warn("peer info message queue full, dropping message") @@ -243,7 +243,7 @@ func (e *GlobalConsensusEngine) subscribeToAlertMessages() error { select { case e.globalAlertMessageQueue <- message: return nil - case <-e.ctx.Done(): + case <-e.ShutdownSignal(): return errors.New("context cancelled") default: e.logger.Warn("alert message queue full, dropping message") diff --git a/node/consensus/time/app_time_reel.go b/node/consensus/time/app_time_reel.go index a427c68..7b06be9 100644 --- a/node/consensus/time/app_time_reel.go +++ b/node/consensus/time/app_time_reel.go @@ -219,7 +219,6 @@ func (a *AppTimeReel) sendEvent(event AppEvent) { // Insert inserts an app frame header into the tree structure func (a *AppTimeReel) Insert( - ctx context.Context, frame *protobufs.AppShardFrame, ) error { // Start timing diff --git a/node/consensus/time/app_time_reel_test.go b/node/consensus/time/app_time_reel_test.go index a2483cc..5c46450 100644 --- a/node/consensus/time/app_time_reel_test.go +++ b/node/consensus/time/app_time_reel_test.go @@ -72,7 +72,7 @@ func TestAppTimeReel_BasicOperations(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) assert.NoError(t, err) // Check that genesis became head @@ -93,7 +93,7 @@ func TestAppTimeReel_BasicOperations(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) // Check new head @@ -152,7 +152,7 @@ func TestAppTimeReel_WrongAddress(t *testing.T) { }, } - err = atr.Insert(ctx, wrongFrame) + err = atr.Insert(wrongFrame) assert.Error(t, err) assert.Contains(t, err.Error(), "frame address does not match reel address") } @@ -191,7 +191,7 @@ func TestAppTimeReel_Equivocation(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) assert.NoError(t, err) // Drain any events @@ -216,7 +216,7 @@ func TestAppTimeReel_Equivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) // Drain any events @@ -241,7 +241,7 @@ func TestAppTimeReel_Equivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Equivocation) + err = atr.Insert(frame1Equivocation) assert.NoError(t, err) // Give the goroutine time to send the event @@ -283,7 +283,7 @@ func TestAppTimeReel_Fork(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) assert.NoError(t, err) // Insert valid frame 1 with BLS signature @@ -302,7 +302,7 @@ func TestAppTimeReel_Fork(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) // Try to insert forking frame 1 with non-overlapping bitmask (different signers) @@ -322,7 +322,7 @@ func TestAppTimeReel_Fork(t *testing.T) { } // This should succeed - it's a fork, not equivocation - err = atr.Insert(ctx, frame1Fork) + err = 
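// The remaining churn in these time-reel tests is mechanical: Insert on both
// AppTimeReel and GlobalTimeReel dropped its context parameter (the global
// reel documents Insert as non-blocking), so every call site shrinks the
// same way:
//
//	before: err = atr.Insert(ctx, frame)
//	after:  err = atr.Insert(frame)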
atr.Insert(frame1Fork) assert.NoError(t, err) // Head should still be the original frame1 @@ -356,7 +356,7 @@ func TestAppTimeReel_ParentValidation(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) assert.NoError(t, err) // Insert valid frame 1 @@ -372,7 +372,7 @@ func TestAppTimeReel_ParentValidation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) // Try to insert frame with a completely invalid parent selector that doesn't match any existing frame @@ -389,7 +389,7 @@ func TestAppTimeReel_ParentValidation(t *testing.T) { } // This should succeed (goes to pending since parent not found) - err = atr.Insert(ctx, badFrame) + err = atr.Insert(badFrame) assert.NoError(t, err) // Check that it's in pending frames @@ -462,7 +462,7 @@ func TestAppTimeReel_ForkDetection(t *testing.T) { // Insert chain for _, frame := range frames { - err := atr.Insert(ctx, frame) + err := atr.Insert(frame) require.NoError(t, err) } @@ -511,7 +511,7 @@ func TestAppTimeReel_ForkChoice_MoreSignatures(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -536,7 +536,7 @@ func TestAppTimeReel_ForkChoice_MoreSignatures(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Weak) + err = atr.Insert(frame1Weak) require.NoError(t, err) // Verify weak frame is initially head @@ -567,7 +567,7 @@ func TestAppTimeReel_ForkChoice_MoreSignatures(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Strong) + err = atr.Insert(frame1Strong) require.NoError(t, err) // Give the goroutine time to send the event @@ -621,7 +621,7 @@ func TestAppTimeReel_ForkChoice_NoReplacement(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -646,7 +646,7 @@ func TestAppTimeReel_ForkChoice_NoReplacement(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Strong) + err = atr.Insert(frame1Strong) require.NoError(t, err) // Verify strong frame is head @@ -677,7 +677,7 @@ func TestAppTimeReel_ForkChoice_NoReplacement(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Weak) + err = atr.Insert(frame1Weak) require.NoError(t, err) // Give some time for any potential events @@ -743,7 +743,7 @@ func TestAppTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { []byte("prover8"), }, nil) - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -780,7 +780,7 @@ func TestAppTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { []byte("prover8"), }, nil) - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) require.NoError(t, err) select { case <-eventCh: @@ -870,21 +870,21 @@ func TestAppTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { }, nil) // Insert chain A frames in order: 2A, 3A, 4A - err = atr.Insert(ctx, frame2A) + err = atr.Insert(frame2A) require.NoError(t, err) select { case <-eventCh: case <-time.After(50 * time.Millisecond): } - err = atr.Insert(ctx, frame3A) + err = atr.Insert(frame3A) require.NoError(t, err) select { case <-eventCh: case <-time.After(50 * time.Millisecond): } - err = atr.Insert(ctx, frame4A) + err = atr.Insert(frame4A) require.NoError(t, err) select { case <-eventCh: @@ -971,7 +971,7 @@ func TestAppTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { // This should work because the time reel should handle out-of-order insertion // Insert frame 4B first - err = atr.Insert(ctx, frame4B) + err = atr.Insert(frame4B) 
require.NoError(t, err, "inserting 4B should succeed even without its parents") select { case <-eventCh: @@ -985,7 +985,7 @@ func TestAppTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { assert.Equal(t, []byte("frame4A_output"), head.Header.Output, "should still be chain A") // Insert frame 3B - err = atr.Insert(ctx, frame3B) + err = atr.Insert(frame3B) require.NoError(t, err, "inserting 3B should succeed") select { case <-eventCh: @@ -999,7 +999,7 @@ func TestAppTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { assert.Equal(t, []byte("frame4A_output"), head.Header.Output, "should still be chain A") // Insert frame 2B - this completes the chain B lineage - err = atr.Insert(ctx, frame2B) + err = atr.Insert(frame2B) require.NoError(t, err, "inserting 2B should succeed and complete chain B") // Give time for reorganization @@ -1063,7 +1063,7 @@ func TestAppTimeReel_MultipleProvers(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Build chain with alternating provers @@ -1081,7 +1081,7 @@ func TestAppTimeReel_MultipleProvers(t *testing.T) { }, } - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) prevOutput = frame.Header.Output @@ -1292,19 +1292,19 @@ func TestAppTimeReel_ComplexForkWithOutOfOrderInsertion(t *testing.T) { // Now insert in the specified order: 1, 3', 3, 2, 3'', 2' // Step 1: Insert genesis first (needed as base) - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) time.Sleep(50 * time.Millisecond) // Step 2: Insert frame 1 t.Log("Inserting frame 1") - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) require.NoError(t, err) time.Sleep(50 * time.Millisecond) // Step 3: Insert frame 3' (should go to pending since 2' doesn't exist yet) t.Log("Inserting frame 3'") - err = atr.Insert(ctx, frame3Prime) + err = atr.Insert(frame3Prime) require.NoError(t, err) time.Sleep(50 * time.Millisecond) @@ -1314,13 +1314,13 @@ func TestAppTimeReel_ComplexForkWithOutOfOrderInsertion(t *testing.T) { // Step 4: Insert frame 3 (should also go to pending since 2 doesn't exist yet) t.Log("Inserting frame 3") - err = atr.Insert(ctx, frame3) + err = atr.Insert(frame3) require.NoError(t, err) time.Sleep(50 * time.Millisecond) // Step 5: Insert frame 2 (this should complete the 1->2->3 chain) t.Log("Inserting frame 2") - err = atr.Insert(ctx, frame2) + err = atr.Insert(frame2) require.NoError(t, err) time.Sleep(100 * time.Millisecond) // Give more time for processing @@ -1332,13 +1332,13 @@ func TestAppTimeReel_ComplexForkWithOutOfOrderInsertion(t *testing.T) { // Step 6: Insert frame 3'' (another competing frame on 2') t.Log("Inserting frame 3''") - err = atr.Insert(ctx, frame3DoublePrime) + err = atr.Insert(frame3DoublePrime) require.NoError(t, err) time.Sleep(50 * time.Millisecond) // Step 7: Insert frame 2' (this completes the 1->2'->3' and 1->2'->3'' chains) t.Log("Inserting frame 2'") - err = atr.Insert(ctx, frame2Prime) + err = atr.Insert(frame2Prime) require.NoError(t, err) time.Sleep(200 * time.Millisecond) // Give ample time for fork choice evaluation @@ -1399,7 +1399,7 @@ func TestAppTimeReel_TreePruning(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Build a long chain that will trigger pruning (370 frames total) @@ -1417,7 +1417,7 @@ func TestAppTimeReel_TreePruning(t *testing.T) { }, } - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) prevOutput = 
frame.Header.Output @@ -1487,7 +1487,7 @@ func TestAppTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Build main chain for 365 frames @@ -1506,7 +1506,7 @@ func TestAppTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) if i == 5 { @@ -1533,7 +1533,7 @@ func TestAppTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, forkFrame) + err = atr.Insert(forkFrame) require.NoError(t, err) // Continue main chain for 375 more frames to trigger deep pruning @@ -1550,7 +1550,7 @@ func TestAppTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) prevOutput = frame.Header.Output @@ -1628,7 +1628,7 @@ drained: []byte("prover8"), }, nil) - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -1712,14 +1712,14 @@ drained: }, nil) // Insert weak branch first - err = atr.Insert(ctx, frame1A) + err = atr.Insert(frame1A) require.NoError(t, err) select { case <-eventCh: case <-time.After(50 * time.Millisecond): } - err = atr.Insert(ctx, frame2A) + err = atr.Insert(frame2A) require.NoError(t, err) select { case <-eventCh: @@ -1763,7 +1763,7 @@ drained: }, nil) // Insert stronger branch out of order: first 2B (goes to pending), then 1B - err = atr.Insert(ctx, frame2B) + err = atr.Insert(frame2B) require.NoError(t, err, "should accept frame 2B into pending") // Head should still be weak branch @@ -1772,7 +1772,7 @@ drained: assert.Equal(t, []byte("frame2A_output"), head.Header.Output, "head should still be weak branch") // Now insert 1B, which should complete the strong branch and trigger fork choice - err = atr.Insert(ctx, frame1B) + err = atr.Insert(frame1B) require.NoError(t, err) // Give time for fork choice to process @@ -1871,7 +1871,7 @@ func TestAppTimeReel_ForkEventsWithReplay(t *testing.T) { // Insert initial chain for _, frame := range []*protobufs.AppShardFrame{genesis, frame1, frame2, frame3} { - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) time.Sleep(10 * time.Millisecond) // Allow events to be sent } @@ -1930,7 +1930,7 @@ func TestAppTimeReel_ForkEventsWithReplay(t *testing.T) { // Insert stronger fork - this should trigger a reorganization for _, frame := range []*protobufs.AppShardFrame{frame2Prime, frame3Prime, frame4Prime} { - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) time.Sleep(50 * time.Millisecond) // Allow events to propagate } @@ -2017,7 +2017,7 @@ func TestAppTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Insert valid frame 1 @@ -2036,7 +2036,7 @@ func TestAppTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Valid) + err = atr.Insert(frame1Valid) require.NoError(t, err) // Test Case 1: Complete overlap - same signers, different content @@ -2055,7 +2055,7 @@ func TestAppTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Equivocation1) + err = atr.Insert(frame1Equivocation1) assert.NoError(t, err) // Test Case 2: Partial overlap - some same signers @@ -2074,7 +2074,7 @@ func TestAppTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Equivocation2) + err = atr.Insert(frame1Equivocation2) 
assert.NoError(t, err) // Test Case 3: No overlap - should be allowed (fork) @@ -2093,7 +2093,7 @@ func TestAppTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Fork) + err = atr.Insert(frame1Fork) assert.NoError(t, err, "should allow fork with no overlapping signers") // Wait for events to be processed @@ -2163,7 +2163,7 @@ func TestAppTimeReel_ProverRegistryForkChoice(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -2213,7 +2213,7 @@ func TestAppTimeReel_ProverRegistryForkChoice(t *testing.T) { } // Insert frame with wrong prover first - err = atr.Insert(ctx, frame1b) + err = atr.Insert(frame1b) require.NoError(t, err) // Should become head initially @@ -2226,7 +2226,7 @@ func TestAppTimeReel_ProverRegistryForkChoice(t *testing.T) { } // Insert frame with correct prover - err = atr.Insert(ctx, frame1a) + err = atr.Insert(frame1a) require.NoError(t, err) // Should trigger fork choice and frame1a should win @@ -2292,7 +2292,7 @@ func TestAppTimeReel_ProverRegistryWithOrderedProvers(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Create three competing frames with different provers from the ordered list @@ -2358,7 +2358,7 @@ func TestAppTimeReel_ProverRegistryWithOrderedProvers(t *testing.T) { // Insert in reverse order of preference t.Logf("Inserting frame1a with prover: %s", frame1a.Header.Prover) - err = atr.Insert(ctx, frame1a) + err = atr.Insert(frame1a) require.NoError(t, err) // Drain events for frame1a @@ -2381,7 +2381,7 @@ func TestAppTimeReel_ProverRegistryWithOrderedProvers(t *testing.T) { t.Logf("Head after frame1a: %s", head1.Header.Output) t.Logf("Inserting frame1b with prover: %s", frame1b.Header.Prover) - err = atr.Insert(ctx, frame1b) + err = atr.Insert(frame1b) require.NoError(t, err) drainEvents("frame1b") @@ -2391,7 +2391,7 @@ func TestAppTimeReel_ProverRegistryWithOrderedProvers(t *testing.T) { t.Logf("Head after frame1b: %s", head2.Header.Output) t.Logf("Inserting frame1c with prover: %s", frame1c.Header.Prover) - err = atr.Insert(ctx, frame1c) + err = atr.Insert(frame1c) require.NoError(t, err) drainEvents("frame1c") diff --git a/node/consensus/time/global_time_reel.go b/node/consensus/time/global_time_reel.go index 99f2707..25bb716 100644 --- a/node/consensus/time/global_time_reel.go +++ b/node/consensus/time/global_time_reel.go @@ -204,6 +204,7 @@ func (g *GlobalTimeReel) Start( // Warm the in-memory tree/cache from store. 
if err := g.bootstrapFromStore(); err != nil { + g.logger.Error("could not bootstrap from store", zap.Error(err)) ctx.Throw(err) return } @@ -234,7 +235,6 @@ func (g *GlobalTimeReel) sendEvent(event GlobalEvent) { // Insert inserts a global frame header into the tree structure (non-blocking) func (g *GlobalTimeReel) Insert( - ctx context.Context, frame *protobufs.GlobalFrame, ) error { // Start timing diff --git a/node/consensus/time/global_time_reel_equivocation_test.go b/node/consensus/time/global_time_reel_equivocation_test.go index 10dfee2..e459162 100644 --- a/node/consensus/time/global_time_reel_equivocation_test.go +++ b/node/consensus/time/global_time_reel_equivocation_test.go @@ -66,7 +66,7 @@ func TestGlobalTimeReel_MassiveEquivocationForkChoice(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Build chain A: 200 frames with bitmask 0b11100011 (signers 0,1,5,6,7) @@ -85,7 +85,7 @@ func TestGlobalTimeReel_MassiveEquivocationForkChoice(t *testing.T) { }, } - err = atr.Insert(ctx, frameA) + err = atr.Insert(frameA) require.NoError(t, err) prevOutput = frameA.Header.Output } @@ -114,7 +114,7 @@ func TestGlobalTimeReel_MassiveEquivocationForkChoice(t *testing.T) { }, } - err = atr.Insert(ctx, frameB) + err = atr.Insert(frameB) // Should now succeed even with equivocation assert.NoError(t, err, "Should accept frame despite equivocation at frame %d", i) prevOutput = frameB.Header.Output @@ -175,7 +175,7 @@ func TestGlobalTimeReel_EquivocationWithForkChoice(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -198,7 +198,7 @@ func TestGlobalTimeReel_EquivocationWithForkChoice(t *testing.T) { }, } - err = atr.Insert(ctx, frame1A) + err = atr.Insert(frame1A) require.NoError(t, err) // Drain new head event @@ -222,7 +222,7 @@ func TestGlobalTimeReel_EquivocationWithForkChoice(t *testing.T) { } // This should succeed now, but generate an equivocation event - err = atr.Insert(ctx, frame1B) + err = atr.Insert(frame1B) assert.NoError(t, err) // Wait for equivocation event @@ -267,7 +267,7 @@ func TestGlobalTimeReel_NonOverlappingForks(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Build two non-overlapping chains @@ -289,7 +289,7 @@ func TestGlobalTimeReel_NonOverlappingForks(t *testing.T) { }, }, } - err = atr.Insert(ctx, frameA) + err = atr.Insert(frameA) require.NoError(t, err) prevOutputA = frameA.Header.Output } @@ -309,7 +309,7 @@ func TestGlobalTimeReel_NonOverlappingForks(t *testing.T) { }, }, } - err = atr.Insert(ctx, frameB) + err = atr.Insert(frameB) require.NoError(t, err, "non-overlapping fork should be allowed") prevOutputB = frameB.Header.Output } diff --git a/node/consensus/time/global_time_reel_test.go b/node/consensus/time/global_time_reel_test.go index 7c318fc..aecc4ff 100644 --- a/node/consensus/time/global_time_reel_test.go +++ b/node/consensus/time/global_time_reel_test.go @@ -44,7 +44,7 @@ func TestGlobalTimeReel_BasicOperations(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) assert.NoError(t, err) // Check that genesis became head @@ -66,7 +66,7 @@ func TestGlobalTimeReel_BasicOperations(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) // Check new head @@ -133,7 +133,7 @@ func TestGlobalTimeReel_Equivocation(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = 
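// The rule these equivocation tests pin down, sketched: two frames at the
// same height with different content equivocate when their signer bitmasks
// overlap (0b11100011 and 0b11111100 share signers 5, 6 and 7), and form an
// honest fork when the bitmasks are disjoint. An illustrative check:

func bitmasksOverlap(a, b []byte) bool {
	n := len(a)
	if len(b) < n {
		n = len(b)
	}
	for i := 0; i < n; i++ {
		if a[i]&b[i] != 0 {
			return true // at least one signer appears in both frames
		}
	}
	return false
}

// Equivocating frames are now accepted rather than rejected — the insert
// succeeds and the equivocators are tracked — which is why these assertions
// changed from expecting errors to expecting equivocation events.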
atr.Insert(genesis) assert.NoError(t, err) // Drain any events @@ -156,7 +156,7 @@ func TestGlobalTimeReel_Equivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) // Drain any events @@ -182,7 +182,7 @@ func TestGlobalTimeReel_Equivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Equivocation) + err = atr.Insert(frame1Equivocation) assert.NoError(t, err) // Give the goroutine time to send the event @@ -223,7 +223,7 @@ func TestGlobalTimeReel_Fork(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) assert.NoError(t, err) // Insert valid frame 1 with BLS signature @@ -240,7 +240,7 @@ func TestGlobalTimeReel_Fork(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) assertLatestNumOutput(t, s, 1, frame1.Header.Output) assertStoreNumOutput(t, s, 1, frame1.Header.Output) @@ -260,7 +260,7 @@ func TestGlobalTimeReel_Fork(t *testing.T) { } // This should succeed - it's a fork, not equivocation - err = atr.Insert(ctx, frame1Fork) + err = atr.Insert(frame1Fork) assert.NoError(t, err) time.Sleep(50 * time.Millisecond) @@ -294,7 +294,7 @@ func TestGlobalTimeReel_ParentValidation(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) assert.NoError(t, err) // Insert valid frame 1 @@ -308,7 +308,7 @@ func TestGlobalTimeReel_ParentValidation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) assert.NoError(t, err) assertLatestNumOutput(t, s, 1, frame1.Header.Output) @@ -324,7 +324,7 @@ func TestGlobalTimeReel_ParentValidation(t *testing.T) { } // This should succeed (goes to pending since parent not found) - err = atr.Insert(ctx, badFrame) + err = atr.Insert(badFrame) assert.NoError(t, err) assertNoGlobalAt(t, s, 2) @@ -393,7 +393,7 @@ func TestGlobalTimeReel_ForkDetection(t *testing.T) { // Insert chain for _, frame := range frames { - err := atr.Insert(ctx, frame) + err := atr.Insert(frame) require.NoError(t, err) } @@ -442,7 +442,7 @@ func TestGlobalTimeReel_ForkChoice_MoreSignatures(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -465,7 +465,7 @@ func TestGlobalTimeReel_ForkChoice_MoreSignatures(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Weak) + err = atr.Insert(frame1Weak) require.NoError(t, err) // Verify weak frame is initially head @@ -495,7 +495,7 @@ func TestGlobalTimeReel_ForkChoice_MoreSignatures(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Strong) + err = atr.Insert(frame1Strong) require.NoError(t, err) // Verify strong frame is now head @@ -554,7 +554,7 @@ func TestGlobalTimeReel_ForkChoice_NoReplacement(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -577,7 +577,7 @@ func TestGlobalTimeReel_ForkChoice_NoReplacement(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Strong) + err = atr.Insert(frame1Strong) require.NoError(t, err) // Verify strong frame is head @@ -607,7 +607,7 @@ func TestGlobalTimeReel_ForkChoice_NoReplacement(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Weak) + err = atr.Insert(frame1Weak) require.NoError(t, err) // Give some time for any potential events @@ -658,7 +658,7 @@ func TestGlobalTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -681,7 +681,7 @@ func 
TestGlobalTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { }, } - err = atr.Insert(ctx, frame1) + err = atr.Insert(frame1) require.NoError(t, err) select { case <-eventCh: @@ -729,21 +729,21 @@ func TestGlobalTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { } // Insert chain A frames in order: 2A, 3A, 4A - err = atr.Insert(ctx, frame2A) + err = atr.Insert(frame2A) require.NoError(t, err) select { case <-eventCh: case <-time.After(50 * time.Millisecond): } - err = atr.Insert(ctx, frame3A) + err = atr.Insert(frame3A) require.NoError(t, err) select { case <-eventCh: case <-time.After(50 * time.Millisecond): } - err = atr.Insert(ctx, frame4A) + err = atr.Insert(frame4A) require.NoError(t, err) select { case <-eventCh: @@ -801,7 +801,7 @@ func TestGlobalTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { // This should work because the time reel should handle out-of-order insertion // Insert frame 4B first - err = atr.Insert(ctx, frame4B) + err = atr.Insert(frame4B) require.NoError(t, err, "inserting 4B should succeed even without its parents") select { case <-eventCh: @@ -815,7 +815,7 @@ func TestGlobalTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { assert.Equal(t, []byte("frame4A_output"), head.Header.Output, "should still be chain A") // Insert frame 3B - err = atr.Insert(ctx, frame3B) + err = atr.Insert(frame3B) require.NoError(t, err, "inserting 3B should succeed") select { case <-eventCh: @@ -829,7 +829,7 @@ func TestGlobalTimeReel_DeepForkChoice_ReverseInsertion(t *testing.T) { assert.Equal(t, []byte("frame4A_output"), head.Header.Output, "should still be chain A") // Insert frame 2B - this completes the chain B lineage - err = atr.Insert(ctx, frame2B) + err = atr.Insert(frame2B) require.NoError(t, err, "inserting 2B should succeed and complete chain B") // Give time for reorganization @@ -881,7 +881,7 @@ func TestGlobalTimeReel_TreePruning(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Build a long chain that will trigger pruning (370 frames total) @@ -897,7 +897,7 @@ func TestGlobalTimeReel_TreePruning(t *testing.T) { }, } - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) prevOutput = frame.Header.Output @@ -964,7 +964,7 @@ func TestGlobalTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Build main chain for 365 frames @@ -981,7 +981,7 @@ func TestGlobalTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) if i == 5 { @@ -1006,7 +1006,7 @@ func TestGlobalTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, forkFrame) + err = atr.Insert(forkFrame) require.NoError(t, err) // Continue main chain for 375 more frames to trigger deep pruning @@ -1021,7 +1021,7 @@ func TestGlobalTimeReel_TreePruningWithForks(t *testing.T) { }, } - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) prevOutput = frame.Header.Output @@ -1084,7 +1084,7 @@ loop: }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Drain genesis event @@ -1140,14 +1140,14 @@ loop: } // Insert weak branch first - err = atr.Insert(ctx, frame1A) + err = atr.Insert(frame1A) require.NoError(t, err) select { case <-eventCh: case <-time.After(50 * time.Millisecond): } - err = atr.Insert(ctx, frame2A) + err = atr.Insert(frame2A) require.NoError(t, err) select { case 
<-eventCh: @@ -1178,7 +1178,7 @@ loop: frame2B.Header.ParentSelector = computeGlobalPoseidonHash(frame1B.Header.Output) // Insert stronger branch out of order: first 2B (goes to pending), then 1B - err = atr.Insert(ctx, frame2B) + err = atr.Insert(frame2B) require.NoError(t, err, "should accept frame 2B into pending") // Head should still be weak branch @@ -1187,7 +1187,7 @@ loop: assert.Equal(t, []byte("frame2A_output"), head.Header.Output, "head should still be weak branch") // Now insert 1B, which should complete the strong branch and trigger fork choice - err = atr.Insert(ctx, frame1B) + err = atr.Insert(frame1B) require.NoError(t, err) // Give time for fork choice to process @@ -1277,7 +1277,7 @@ func TestGlobalTimeReel_ForkEventsWithReplay(t *testing.T) { // Insert initial chain for _, frame := range []*protobufs.GlobalFrame{genesis, frame1, frame2, frame3} { - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) time.Sleep(10 * time.Millisecond) // Allow events to be sent } @@ -1330,7 +1330,7 @@ func TestGlobalTimeReel_ForkEventsWithReplay(t *testing.T) { // Insert stronger fork - this should trigger a reorganization for _, frame := range []*protobufs.GlobalFrame{frame2Prime, frame3Prime, frame4Prime} { - err = atr.Insert(ctx, frame) + err = atr.Insert(frame) require.NoError(t, err) time.Sleep(50 * time.Millisecond) // Allow events to propagate } @@ -1414,7 +1414,7 @@ func TestGlobalTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, genesis) + err = atr.Insert(genesis) require.NoError(t, err) // Insert valid frame 1 @@ -1431,7 +1431,7 @@ func TestGlobalTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Valid) + err = atr.Insert(frame1Valid) require.NoError(t, err) // Test Case 1: Complete overlap - same signers, different content @@ -1448,7 +1448,7 @@ func TestGlobalTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Equivocation1) + err = atr.Insert(frame1Equivocation1) assert.NoError(t, err) // Test Case 2: Partial overlap - some same signers @@ -1465,7 +1465,7 @@ func TestGlobalTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Equivocation2) + err = atr.Insert(frame1Equivocation2) assert.NoError(t, err) // Test Case 3: No overlap - should be allowed (fork) @@ -1482,7 +1482,7 @@ func TestGlobalTimeReel_ComprehensiveEquivocation(t *testing.T) { }, } - err = atr.Insert(ctx, frame1Fork) + err = atr.Insert(frame1Fork) assert.NoError(t, err, "should allow fork with no overlapping signers") // Wait for events to be processed @@ -1565,7 +1565,7 @@ func TestGlobalTimeReel_NonArchive_SnapForward_WhenGapExceeds360(t *testing.T) { ParentSelector: []byte("unknown_parent"), }, } - require.NoError(t, tr.Insert(context.Background(), future)) + require.NoError(t, tr.Insert(future)) newHead, err := tr.GetHead() require.NoError(t, err) @@ -1588,7 +1588,7 @@ func TestGlobalTimeReel_NonArchive_PrunesStore_AsHeadAdvances(t *testing.T) { var prev *protobufs.GlobalFrame for n := uint64(1); n <= uint64(maxGlobalTreeDepth)+25; n++ { f := createGlobalFrame(n, prev, []byte(fmt.Sprintf("out%d", n))) - require.NoError(t, tr.Insert(context.Background(), f)) + require.NoError(t, tr.Insert(f)) prev = f } @@ -1624,7 +1624,7 @@ func TestGlobalTimeReel_NonArchive_PendingResolves_WhenParentArrives(t *testing. 
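// What the pending-resolution test below relies on, sketched with assumed
// internals: a frame whose parent selector is unknown parks in a pending
// index keyed by that selector and is replayed once the parent arrives.

type pendingIndex map[string][]*protobufs.GlobalFrame

func (p pendingIndex) park(f *protobufs.GlobalFrame) {
	key := string(f.Header.ParentSelector)
	p[key] = append(p[key], f)
}

func (p pendingIndex) resolve(parentSelector []byte, attach func(*protobufs.GlobalFrame)) {
	key := string(parentSelector)
	for _, child := range p[key] {
		attach(child) // attaching may recursively resolve the child's children
	}
	delete(p, key)
}

// The same mechanism is what lets the reverse-insertion fork tests feed 4B,
// 3B, then 2B and only observe the reorganisation once the lineage completes.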
var prev *protobufs.GlobalFrame for n := uint64(90); n <= 99; n++ { f := createGlobalFrame(n, prev, []byte(fmt.Sprintf("base_%d", n))) - require.NoError(t, tr.Insert(ctx, f)) + require.NoError(t, tr.Insert(f)) prev = f } @@ -1640,7 +1640,7 @@ func TestGlobalTimeReel_NonArchive_PendingResolves_WhenParentArrives(t *testing. ParentSelector: computeGlobalPoseidonHash(out100), // points to future parent 100 }, } - require.NoError(t, tr.Insert(ctx, child101)) + require.NoError(t, tr.Insert(child101)) // Should appear in pending (under the selector for out100). pending := tr.GetPendingFrames() @@ -1655,7 +1655,7 @@ func TestGlobalTimeReel_NonArchive_PendingResolves_WhenParentArrives(t *testing. ParentSelector: computeGlobalPoseidonHash([]byte("base_99")), }, } - require.NoError(t, tr.Insert(ctx, parent100)) + require.NoError(t, tr.Insert(parent100)) // Give a beat for pending processing. time.Sleep(25 * time.Millisecond) @@ -1711,7 +1711,7 @@ drain: ParentSelector: []byte("unknown"), }, } - require.NoError(t, tr.Insert(ctx, snapTip)) + require.NoError(t, tr.Insert(snapTip)) // We should get a fork select { @@ -1734,7 +1734,7 @@ drain: ParentSelector: computeGlobalPoseidonHash(prev.Header.Output), }, } - require.NoError(t, tr.Insert(ctx, f)) + require.NoError(t, tr.Insert(f)) prev = f // Expect exactly one new-head event per append, and zero fork events. @@ -1826,7 +1826,7 @@ func buildAndPersistChain(t *testing.T, s *store.PebbleClockStore, start, end ui var prev *protobufs.GlobalFrame for n := start; n <= end; n++ { f := createGlobalFrame(n, prev, []byte(fmt.Sprintf("out%d", n))) - require.NoError(t, reel.Insert(context.Background(), f)) + require.NoError(t, reel.Insert(f)) prev = f } } diff --git a/node/consensus/time/simple_equivocation_test.go b/node/consensus/time/simple_equivocation_test.go index f7442a8..8dc558c 100644 --- a/node/consensus/time/simple_equivocation_test.go +++ b/node/consensus/time/simple_equivocation_test.go @@ -33,7 +33,7 @@ func TestGlobalTimeReel_SimpleEquivocation(t *testing.T) { }, } - err = globalReel.Insert(context.Background(), genesis) + err = globalReel.Insert(genesis) require.NoError(t, err) parentSelector := computeGlobalPoseidonHash(genesis.Header.Output) @@ -50,7 +50,7 @@ func TestGlobalTimeReel_SimpleEquivocation(t *testing.T) { }, } - err = globalReel.Insert(context.Background(), frame1A) + err = globalReel.Insert(frame1A) require.NoError(t, err) // Insert frame 1B with signers 2,3,4,5,6,7 (bitmask 0b11111100) @@ -66,7 +66,7 @@ func TestGlobalTimeReel_SimpleEquivocation(t *testing.T) { }, } - err = globalReel.Insert(context.Background(), frame1B) + err = globalReel.Insert(frame1B) require.NoError(t, err, "Should accept frame despite equivocation") // Check equivocators are tracked diff --git a/node/consensus/voting/voting_aggregator.go b/node/consensus/voting/voting_aggregator.go index 1f163d8..a66b4d6 100644 --- a/node/consensus/voting/voting_aggregator.go +++ b/node/consensus/voting/voting_aggregator.go @@ -26,6 +26,7 @@ func NewAppShardVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[ func NewAppShardVoteAggregator[PeerIDT models.Unique]( logger consensus.TraceLogger, + filter []byte, committee consensus.DynamicCommittee, voteAggregationDistributor *pubsub.VoteAggregationDistributor[ *protobufs.AppShardFrame, @@ -37,6 +38,7 @@ func NewAppShardVoteAggregator[PeerIDT models.Unique]( *protobufs.ProposalVote, PeerIDT, ], + onQCCreated consensus.OnQuorumCertificateCreated, currentRank uint64, ) ( 
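// Filter threading in the updated aggregator constructors, side by side
// (argument lists elided): app-shard collectors bind to the shard's filter
// bytes, while the global collectors pass nil and lean on the "global" and
// "globaltimeout" domain tags for separation.
//
//	votecollector.NewStateMachineFactory(logger, filter, /* ... */) // app shard
//	votecollector.NewStateMachineFactory(logger, nil,    /* ... */) // global
//
// Both vote aggregators likewise trade the old no-op
// func(qc models.QuorumCertificate) {} for a caller-supplied onQCCreated.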
consensus.VoteAggregator[*protobufs.AppShardFrame, *protobufs.ProposalVote], @@ -46,10 +48,11 @@ func NewAppShardVoteAggregator[PeerIDT models.Unique]( *protobufs.AppShardFrame, *protobufs.ProposalVote, PeerIDT, - ](committee, func(qc models.QuorumCertificate) {}) + ](committee, onQCCreated) createCollectorFactoryMethod := votecollector.NewStateMachineFactory( logger, + filter, voteAggregationDistributor, votecollector.VerifyingVoteProcessorFactory[ *protobufs.AppShardFrame, @@ -86,10 +89,16 @@ func NewAppShardTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistri func NewAppShardTimeoutAggregator[PeerIDT models.Unique]( logger consensus.TraceLogger, + filter []byte, committee consensus.DynamicCommittee, consensusVerifier consensus.Verifier[*protobufs.ProposalVote], signatureAggregator consensus.SignatureAggregator, timeoutAggregationDistributor *pubsub.TimeoutAggregationDistributor[*protobufs.ProposalVote], + votingProvider consensus.VotingProvider[ + *protobufs.AppShardFrame, + *protobufs.ProposalVote, + PeerIDT, + ], currentRank uint64, ) (consensus.TimeoutAggregator[*protobufs.ProposalVote], error) { // initialize the Validator @@ -104,10 +113,12 @@ func NewAppShardTimeoutAggregator[PeerIDT models.Unique]( PeerIDT, ]( logger, + filter, signatureAggregator, timeoutAggregationDistributor, committee, validator, + votingProvider, []byte("appshardtimeout"), ) @@ -155,6 +166,7 @@ func NewGlobalVoteAggregator[PeerIDT models.Unique]( *protobufs.ProposalVote, PeerIDT, ], + onQCCreated consensus.OnQuorumCertificateCreated, currentRank uint64, ) ( consensus.VoteAggregator[*protobufs.GlobalFrame, *protobufs.ProposalVote], @@ -164,10 +176,11 @@ func NewGlobalVoteAggregator[PeerIDT models.Unique]( *protobufs.GlobalFrame, *protobufs.ProposalVote, PeerIDT, - ](committee, func(qc models.QuorumCertificate) {}) + ](committee, onQCCreated) createCollectorFactoryMethod := votecollector.NewStateMachineFactory( logger, + nil, voteAggregationDistributor, votecollector.VerifyingVoteProcessorFactory[ *protobufs.GlobalFrame, @@ -208,6 +221,11 @@ func NewGlobalTimeoutAggregator[PeerIDT models.Unique]( consensusVerifier consensus.Verifier[*protobufs.ProposalVote], signatureAggregator consensus.SignatureAggregator, timeoutAggregationDistributor *pubsub.TimeoutAggregationDistributor[*protobufs.ProposalVote], + votingProvider consensus.VotingProvider[ + *protobufs.GlobalFrame, + *protobufs.ProposalVote, + PeerIDT, + ], currentRank uint64, ) (consensus.TimeoutAggregator[*protobufs.ProposalVote], error) { // initialize the Validator @@ -222,10 +240,12 @@ func NewGlobalTimeoutAggregator[PeerIDT models.Unique]( PeerIDT, ]( logger, + nil, signatureAggregator, timeoutAggregationDistributor, committee, validator, + votingProvider, []byte("globaltimeout"), ) diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index e7afe7a..312add8 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -12,6 +12,7 @@ import ( "math/big" "math/bits" "net" + "runtime/debug" "sync" "sync/atomic" "time" @@ -1018,6 +1019,7 @@ func (b *BlossomSub) subscribeHandler( b.logger.Error( "message handler panicked, recovering", zap.Any("panic", r), + zap.String("stack", string(debug.Stack())), ) } }()
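// The recovery pattern extended above, as a standalone sketch: capturing
// debug.Stack() inside recover() is what turns "message handler panicked"
// from an opaque report into a diagnosable one.

defer func() {
	if r := recover(); r != nil {
		b.logger.Error(
			"message handler panicked, recovering",
			zap.Any("panic", r),
			zap.String("stack", string(debug.Stack())),
		)
	}
}()

// debug.Stack (runtime/debug) snapshots the calling goroutine's stack; taken
// inside the deferred function, it still includes the panicking frames.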