From 59c2d7694efc21fda98a2e1616578ef95024c1a6 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Sat, 15 Nov 2025 01:38:41 -0600 Subject: [PATCH] v2.1.0.8 --- config/version.go | 2 +- consensus/consensus_voting.go | 5 +- .../counters/strict_monotonic_counter.go | 4 +- consensus/mocks/voting_provider.go | 9 +- .../timeoutaggregator/timeout_aggregator.go | 6 +- .../timeoutcollector/timeout_collector.go | 6 + .../timeoutcollector/timeout_processor.go | 46 ++++++- crates/bls48581/src/bls48581/bls256.rs | 44 ++++++ crates/bls48581/src/lib.rs | 34 +++++ crates/bls48581/src/lib.udl | 1 + node/consensus/app/app_consensus_engine.go | 19 ++- .../app/consensus_voting_provider.go | 26 +++- .../global/consensus_voting_provider.go | 27 +++- .../global/global_consensus_engine.go | 60 +++++++- node/consensus/global/message_subscription.go | 24 ++-- node/consensus/global/message_validation.go | 28 ++-- node/store/clock.go | 129 ++++++++++++++++++ node/store/constants.go | 6 + node/store/pebble.go | 2 +- types/crypto/keys.go | 6 + types/mocks/bls_constructor.go | 11 ++ types/mocks/clock_store.go | 26 ++++ types/store/clock.go | 8 ++ 23 files changed, 474 insertions(+), 55 deletions(-) diff --git a/config/version.go b/config/version.go index d7a6a2e..a5dd651 100644 --- a/config/version.go +++ b/config/version.go @@ -43,7 +43,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x07 + return 0x08 } func GetRCNumber() byte { diff --git a/consensus/consensus_voting.go b/consensus/consensus_voting.go index 593eaba..528a2ca 100644 --- a/consensus/consensus_voting.go +++ b/consensus/consensus_voting.go @@ -32,12 +32,13 @@ type VotingProvider[ state *models.State[StateT], aggregatedSignature models.AggregatedSignature, ) (models.QuorumCertificate, error) - // Produces a timeout certificate + // Produces a timeout certificate. Assumes VotingProvider will reorganize + // latestQuorumCertificateRanks in signer order. 
FinalizeTimeout( ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, - latestQuorumCertificateRanks []uint64, + latestQuorumCertificateRanks []TimeoutSignerInfo, aggregatedSignature models.AggregatedSignature, ) (models.TimeoutCertificate, error) } diff --git a/consensus/counters/strict_monotonic_counter.go b/consensus/counters/strict_monotonic_counter.go index 93b5023..6af0929 100644 --- a/consensus/counters/strict_monotonic_counter.go +++ b/consensus/counters/strict_monotonic_counter.go @@ -5,8 +5,8 @@ import "sync/atomic" // StrictMonotonicCounter is a helper struct which implements a strict monotonic // counter. StrictMonotonicCounter is implemented using atomic operations and // doesn't allow to set a value which is lower or equal to the already stored -// one. The counter is implemented solely with non-blocking atomic operations -// for concurrency safety. +// one. The counter is implemented solely with non-blocking atomic operations for +// concurrency safety. 
type StrictMonotonicCounter struct { atomicCounter uint64 } diff --git a/consensus/mocks/voting_provider.go b/consensus/mocks/voting_provider.go index 11e2688..8aff9bd 100644 --- a/consensus/mocks/voting_provider.go +++ b/consensus/mocks/voting_provider.go @@ -6,6 +6,7 @@ import ( context "context" mock "github.com/stretchr/testify/mock" + "source.quilibrium.com/quilibrium/monorepo/consensus" models "source.quilibrium.com/quilibrium/monorepo/consensus/models" ) @@ -45,7 +46,7 @@ func (_m *VotingProvider[StateT, VoteT, PeerIDT]) FinalizeQuorumCertificate(ctx } // FinalizeTimeout provides a mock function with given fields: ctx, rank, latestQuorumCertificate, latestQuorumCertificateRanks, aggregatedSignature -func (_m *VotingProvider[StateT, VoteT, PeerIDT]) FinalizeTimeout(ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, latestQuorumCertificateRanks []uint64, aggregatedSignature models.AggregatedSignature) (models.TimeoutCertificate, error) { +func (_m *VotingProvider[StateT, VoteT, PeerIDT]) FinalizeTimeout(ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, latestQuorumCertificateRanks []consensus.TimeoutSignerInfo, aggregatedSignature models.AggregatedSignature) (models.TimeoutCertificate, error) { ret := _m.Called(ctx, rank, latestQuorumCertificate, latestQuorumCertificateRanks, aggregatedSignature) if len(ret) == 0 { @@ -54,10 +55,10 @@ func (_m *VotingProvider[StateT, VoteT, PeerIDT]) FinalizeTimeout(ctx context.Co var r0 models.TimeoutCertificate var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, models.QuorumCertificate, []uint64, models.AggregatedSignature) (models.TimeoutCertificate, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint64, models.QuorumCertificate, []consensus.TimeoutSignerInfo, models.AggregatedSignature) (models.TimeoutCertificate, error)); ok { return rf(ctx, rank, latestQuorumCertificate, latestQuorumCertificateRanks, aggregatedSignature) 
} - if rf, ok := ret.Get(0).(func(context.Context, uint64, models.QuorumCertificate, []uint64, models.AggregatedSignature) models.TimeoutCertificate); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint64, models.QuorumCertificate, []consensus.TimeoutSignerInfo, models.AggregatedSignature) models.TimeoutCertificate); ok { r0 = rf(ctx, rank, latestQuorumCertificate, latestQuorumCertificateRanks, aggregatedSignature) } else { if ret.Get(0) != nil { @@ -65,7 +66,7 @@ func (_m *VotingProvider[StateT, VoteT, PeerIDT]) FinalizeTimeout(ctx context.Co } } - if rf, ok := ret.Get(1).(func(context.Context, uint64, models.QuorumCertificate, []uint64, models.AggregatedSignature) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, uint64, models.QuorumCertificate, []consensus.TimeoutSignerInfo, models.AggregatedSignature) error); ok { r1 = rf(ctx, rank, latestQuorumCertificate, latestQuorumCertificateRanks, aggregatedSignature) } else { r1 = ret.Error(1) diff --git a/consensus/timeoutaggregator/timeout_aggregator.go b/consensus/timeoutaggregator/timeout_aggregator.go index 3bd2698..f00c25c 100644 --- a/consensus/timeoutaggregator/timeout_aggregator.go +++ b/consensus/timeoutaggregator/timeout_aggregator.go @@ -161,7 +161,11 @@ func (t *TimeoutAggregator[VoteT]) processQueuedTimeout( timeoutState.Rank, err) } - t.tracer.Trace("adding timeout to collector") + t.tracer.Trace( + "adding timeout to collector", + consensus.Uint64Param("timeout_rank", timeoutState.Rank), + consensus.IdentityParam("timeout_voter", (*timeoutState.Vote).Identity()), + ) err = collector.AddTimeout(timeoutState) if err != nil { return fmt.Errorf("could not process TO for rank %d: %w", diff --git a/consensus/timeoutcollector/timeout_collector.go b/consensus/timeoutcollector/timeout_collector.go index d972cb0..9ab08ab 100644 --- a/consensus/timeoutcollector/timeout_collector.go +++ b/consensus/timeoutcollector/timeout_collector.go @@ -97,6 +97,12 @@ func (c *TimeoutCollector[VoteT]) 
processTimeout( err := c.processor.Process(timeout) if err != nil { if invalidTimeoutErr, ok := models.AsInvalidTimeoutError[VoteT](err); ok { + c.tracer.Error( + "invalid timeout detected", + err, + consensus.Uint64Param("timeout_rank", timeout.Rank), + consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), + ) c.notifier.OnInvalidTimeoutDetected(*invalidTimeoutErr) return nil } diff --git a/consensus/timeoutcollector/timeout_processor.go b/consensus/timeoutcollector/timeout_processor.go index c20466b..ed2eadc 100644 --- a/consensus/timeoutcollector/timeout_processor.go +++ b/consensus/timeoutcollector/timeout_processor.go @@ -128,6 +128,12 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( timeout *models.TimeoutState[VoteT], ) error { if p.rank != timeout.Rank { + p.tracer.Trace( + "received incompatible timeout", + consensus.Uint64Param("processor_rank", p.rank), + consensus.Uint64Param("timeout_rank", timeout.Rank), + consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), + ) return fmt.Errorf( "received incompatible timeout, expected %d got %d: %w", p.rank, @@ -137,17 +143,43 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( } if p.tcTracker.Done() { + p.tracer.Trace( + "timeout tracker done", + consensus.Uint64Param("processor_rank", p.rank), + consensus.Uint64Param("timeout_rank", timeout.Rank), + consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), + ) return nil } err := p.validateTimeout(timeout) if err != nil { + p.tracer.Error( + "timeout validation failed", + err, + consensus.Uint64Param("processor_rank", p.rank), + consensus.Uint64Param("timeout_rank", timeout.Rank), + consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), + ) return fmt.Errorf("validating timeout failed: %w", err) } if p.tcTracker.Done() { + p.tracer.Trace( + "timeout tracker done", + consensus.Uint64Param("processor_rank", p.rank), + consensus.Uint64Param("timeout_rank", timeout.Rank), + 
consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), + ) return nil } + p.tracer.Trace( + "adding timeout to signature aggregator", + consensus.Uint64Param("processor_rank", p.rank), + consensus.Uint64Param("timeout_rank", timeout.Rank), + consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), + ) + // CAUTION: for correctness it is critical that we update the // `newestQCTracker` first, _before_ we add the TO's signature to // `sigAggregator`. Reasoning: @@ -166,6 +198,13 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) Process( timeout.LatestQuorumCertificate.GetRank(), ) if err != nil { + p.tracer.Error( + "timeout signature could not be added", + err, + consensus.Uint64Param("processor_rank", p.rank), + consensus.Uint64Param("timeout_rank", timeout.Rank), + consensus.IdentityParam("timeout_voter", (*timeout.Vote).Identity()), + ) if models.IsInvalidSignerError(err) { return models.NewInvalidTimeoutErrorf( timeout, @@ -361,11 +400,6 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) buildTC() ( ) } - newestQCRanks := make([]uint64, 0, len(signersData)) - for _, data := range signersData { - newestQCRanks = append(newestQCRanks, data.NewestQCRank) - } - // Note that `newestQC` can have a larger rank than any of the ranks included // in `newestQCRanks`. 
This is because for a TO currently being processes following two operations are executed in separate steps: @@ -378,7 +412,7 @@ func (p *TimeoutProcessor[StateT, VoteT, PeerIDT]) buildTC() ( context.TODO(), p.rank, *newestQC, - newestQCRanks, + signersData, aggregatedSig, ) if err != nil { diff --git a/crates/bls48581/src/bls48581/bls256.rs b/crates/bls48581/src/bls48581/bls256.rs index 35087d4..7b553e4 100644 --- a/crates/bls48581/src/bls48581/bls256.rs +++ b/crates/bls48581/src/bls48581/bls256.rs @@ -168,3 +168,47 @@ pub fn core_verify(sig: &[u8], m: &[u8], w: &[u8]) -> isize { } BLS_FAIL } + +pub fn core_msig_verify(sig: &[u8], ms: &Vec<Vec<u8>>, ws: &Vec<Vec<u8>>) -> isize { + let mut d = ECP::frombytes(&sig); + if !pair8::g1member(&d) { + return BLS_FAIL; + } + d.neg(); + + let mut pks = Vec::<ECP8>::new(); + for kw in ws { + let pk = ECP8::frombytes(&kw); + if !pair8::g2member(&pk) { + return BLS_FAIL; + } + pks.push(pk); + } + + let mut hms = Vec::<ECP>::new(); + for m in ms { + let hm = bls_hash_to_point(m); + hms.push(hm); + } + + // Use new multi-pairing mechanism + let mut r = pair8::initmp(); + // pair8::another(&mut r,&g,&d); + unsafe { + pair8::another_pc(&mut r, &G2_TAB, &d); + } + for (pk, hm) in pks.iter().zip(hms) { + pair8::another(&mut r, &pk, &hm); + } + let mut v = pair8::miller(&mut r); + + //.. 
or alternatively + // let g = ECP8::generator(); + // let mut v = pair8::ate2(&g, &d, &pk, &hm); + + v = pair8::fexp(&v); + if v.isunity() { + return BLS_OK; + } + BLS_FAIL +} diff --git a/crates/bls48581/src/lib.rs b/crates/bls48581/src/lib.rs index 8a363e5..a4f6cb2 100644 --- a/crates/bls48581/src/lib.rs +++ b/crates/bls48581/src/lib.rs @@ -551,6 +551,18 @@ pub fn bls_verify(pk: &[u8], sig: &[u8], msg: &[u8], domain: &[u8]) -> bool { is_sig_ok == bls48581::bls256::BLS_OK } +pub fn bls_verify_msig_mmsg(pks: &Vec<Vec<u8>>, sig: &[u8], msgs: &Vec<Vec<u8>>, domain: &[u8]) -> bool { + let mut fullmsgs = Vec::<Vec<u8>>::new(); + for msg in msgs { + let mut fullmsg = domain.to_vec(); + fullmsg.extend_from_slice(&msg); + fullmsgs.push(fullmsg); + } + + let is_sig_ok = bls48581::bls256::core_msig_verify(&sig, &fullmsgs, &pks); + is_sig_ok == bls48581::bls256::BLS_OK +} + pub fn bls_aggregate(pks: &Vec<Vec<u8>>, sigs: &Vec<Vec<u8>>) -> BlsAggregateOutput { if pks.len() != sigs.len() { return BlsAggregateOutput{ @@ -912,6 +924,28 @@ mod tests { assert!(bls_verify(&blsAggregateOutput.aggregate_public_key, &blsAggregateOutput.aggregate_signature, b"test msg", b"test domain")); } + #[test] + fn bls_multi_message_sign() { + init(); + let outs: Vec<_> = (0..20).into_iter().map(|_| bls_keygen()).collect(); + let mut pks = Vec::<Vec<u8>>::new(); + let mut messages = Vec::<Vec<u8>>::new(); + let mut sigs = Vec::<Vec<u8>>::new(); + for (i, out) in outs.clone().iter().enumerate() { + assert!(bls_verify(&out.public_key.clone(), &out.proof_of_possession_sig, &out.public_key, b"BLS48_POP_SK")); + let msg = format!("test msg {i}"); + let sig = bls_sign(&out.secret_key, msg.as_bytes(), b"test domain"); + pks.push(out.public_key.clone()); + messages.push(msg.as_bytes().to_vec()); + sigs.push(sig); + } + let blsAggregateOutput = bls_aggregate( + &outs.iter().map(|out| out.public_key.clone()).collect::<Vec<Vec<u8>>>(), + &sigs, + ); + assert!(bls_verify_msig_mmsg(&pks, &blsAggregateOutput.aggregate_signature, &messages, b"test domain")); + } + #[test] + fn 
multiproof_roundtrip() { init(); // sets up the global BLS48‑581 constants diff --git a/crates/bls48581/src/lib.udl b/crates/bls48581/src/lib.udl index 326eaee..094879b 100644 --- a/crates/bls48581/src/lib.udl +++ b/crates/bls48581/src/lib.udl @@ -9,6 +9,7 @@ namespace bls48581 { sequence<u8> bls_sign([ByRef] sequence<u8> sk, [ByRef] sequence<u8> msg, [ByRef] sequence<u8> domain); boolean bls_verify([ByRef] sequence<u8> pk, [ByRef] sequence<u8> sig, [ByRef] sequence<u8> msg, [ByRef] sequence<u8> domain); BlsAggregateOutput bls_aggregate([ByRef] sequence<sequence<u8>> pks, [ByRef] sequence<sequence<u8>> sigs); + boolean bls_verify_msig_mmsg([ByRef] sequence<sequence<u8>> pks, [ByRef] sequence<u8> sig, [ByRef] sequence<sequence<u8>> msgs, [ByRef] sequence<u8> domain); }; dictionary Multiproof { diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 0cc8107..ccbac93 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -2402,14 +2402,23 @@ func (e *AppConsensusEngine) VerifyTimeoutCertificate( } pubkeys := [][]byte{} + messages := [][]byte{} signatures := [][]byte{} if ((len(provers) + 7) / 8) > len(tc.AggregateSignature.Bitmask) { return models.ErrInvalidSignature } + + idx := 0 for i, prover := range provers { if tc.AggregateSignature.Bitmask[i/8]&(1<<(i%8)) == (1 << (i % 8)) { pubkeys = append(pubkeys, prover.PublicKey) signatures = append(signatures, tc.AggregateSignature.GetSignature()) + messages = append(messages, verification.MakeTimeoutMessage( + nil, + tc.Rank, + tc.LatestRanks[idx], + )) + idx++ } } @@ -2425,14 +2434,10 @@ func (e *AppConsensusEngine) VerifyTimeoutCertificate( return models.ErrInvalidSignature } - if valid := e.blsConstructor.VerifySignatureRaw( - tc.AggregateSignature.GetPubKey(), + if valid := e.blsConstructor.VerifyMultiMessageSignatureRaw( + pubkeys, tc.AggregateSignature.GetSignature(), - verification.MakeTimeoutMessage( - nil, - tc.Rank, - tc.LatestQuorumCertificate.Rank, - ), + messages, 
slices.Concat([]byte("appshardtimeout"), e.appAddress), ); !valid { return models.ErrInvalidSignature diff --git a/node/consensus/app/consensus_voting_provider.go b/node/consensus/app/consensus_voting_provider.go index 6408b83..a904561 100644 --- a/node/consensus/app/consensus_voting_provider.go +++ b/node/consensus/app/consensus_voting_provider.go @@ -3,6 +3,7 @@ package app import ( "context" "slices" + "sort" "time" "github.com/pkg/errors" @@ -66,13 +67,34 @@ func (p *AppVotingProvider) FinalizeTimeout( ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, - latestQuorumCertificateRanks []uint64, + latestQuorumCertificateRanks []consensus.TimeoutSignerInfo, aggregatedSignature models.AggregatedSignature, ) (models.TimeoutCertificate, error) { + ranksInProverOrder := slices.Clone(latestQuorumCertificateRanks) + provers, err := p.engine.proverRegistry.GetActiveProvers(p.engine.appAddress) + if err != nil { + return nil, err + } + + proverIndexes := map[models.Identity]int{} + for i, p := range provers { + proverIndexes[models.Identity(p.Address)] = i + } + + sort.Slice(ranksInProverOrder, func(i, j int) bool { + return proverIndexes[ranksInProverOrder[i].Signer]- + proverIndexes[ranksInProverOrder[j].Signer] < 0 + }) + + ranks := []uint64{} + for _, r := range ranksInProverOrder { + ranks = append(ranks, r.NewestQCRank) + } + return &protobufs.TimeoutCertificate{ Filter: p.engine.appAddress, Rank: rank, - LatestRanks: latestQuorumCertificateRanks, + LatestRanks: ranks, LatestQuorumCertificate: latestQuorumCertificate.(*protobufs.QuorumCertificate), Timestamp: uint64(time.Now().UnixMilli()), AggregateSignature: &protobufs.BLS48581AggregateSignature{ diff --git a/node/consensus/global/consensus_voting_provider.go b/node/consensus/global/consensus_voting_provider.go index 68b9650..fe8e161 100644 --- a/node/consensus/global/consensus_voting_provider.go +++ b/node/consensus/global/consensus_voting_provider.go @@ -2,6 +2,8 @@ package global 
import ( "context" + "slices" + "sort" "time" "github.com/pkg/errors" @@ -59,12 +61,33 @@ func (p *GlobalVotingProvider) FinalizeTimeout( ctx context.Context, rank uint64, latestQuorumCertificate models.QuorumCertificate, - latestQuorumCertificateRanks []uint64, + latestQuorumCertificateRanks []consensus.TimeoutSignerInfo, aggregatedSignature models.AggregatedSignature, ) (models.TimeoutCertificate, error) { + ranksInProverOrder := slices.Clone(latestQuorumCertificateRanks) + provers, err := p.engine.proverRegistry.GetActiveProvers(nil) + if err != nil { + return nil, err + } + + proverIndexes := map[models.Identity]int{} + for i, p := range provers { + proverIndexes[models.Identity(p.Address)] = i + } + + sort.Slice(ranksInProverOrder, func(i, j int) bool { + return proverIndexes[ranksInProverOrder[i].Signer]- + proverIndexes[ranksInProverOrder[j].Signer] < 0 + }) + + ranks := []uint64{} + for _, r := range ranksInProverOrder { + ranks = append(ranks, r.NewestQCRank) + } + return &protobufs.TimeoutCertificate{ Rank: rank, - LatestRanks: latestQuorumCertificateRanks, + LatestRanks: ranks, LatestQuorumCertificate: latestQuorumCertificate.(*protobufs.QuorumCertificate), Timestamp: uint64(time.Now().UnixMilli()), AggregateSignature: &protobufs.BLS48581AggregateSignature{ diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index f1e39e4..5e6467f 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -521,6 +521,25 @@ func NewGlobalConsensusEngine( if err != nil { establishGenesis() } else { + if latest.LatestTimeout != nil { + logger.Info( + "obtained latest consensus state", + zap.Uint64("finalized_rank", latest.FinalizedRank), + zap.Uint64("latest_acknowledged_rank", latest.LatestAcknowledgedRank), + zap.Uint64("latest_timeout_rank", latest.LatestTimeout.Rank), + zap.Uint64("latest_timeout_tick", latest.LatestTimeout.TimeoutTick), + 
zap.Uint64( + "latest_timeout_qc_rank", + latest.LatestTimeout.LatestQuorumCertificate.GetRank(), + ), + ) + } else { + logger.Info( + "obtained latest consensus state", + zap.Uint64("finalized_rank", latest.FinalizedRank), + zap.Uint64("latest_acknowledged_rank", latest.LatestAcknowledgedRank), + ) + } qc, err := engine.clockStore.GetQuorumCertificate( nil, latest.FinalizedRank, @@ -2609,6 +2628,13 @@ func (e *GlobalConsensusEngine) OnOwnProposal( return } + err = e.clockStore.PutGlobalClockFrameCandidate(*proposal.State.State, txn) + if err != nil { + e.logger.Error("could not put frame candidate", zap.Error(err)) + txn.Abort() + return + } + if err := txn.Commit(); err != nil { e.logger.Error("could not commit transaction", zap.Error(err)) txn.Abort() @@ -2674,6 +2700,11 @@ func (e *GlobalConsensusEngine) OnOwnTimeout( return } + e.logger.Debug( + "aggregating own timeout", + zap.Uint64("timeout_rank", timeout.Rank), + zap.Uint64("vote_rank", (*timeout.Vote).Rank), + ) e.timeoutAggregator.AddTimeout(timeout) if err := e.pubsub.PublishToBitmask( @@ -2787,6 +2818,16 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( frame, ok := e.frameStore[qc.Identity()] e.frameStoreMu.RUnlock() + if !ok { + frame, err = e.clockStore.GetGlobalClockFrameCandidate( + qc.GetFrameNumber(), + []byte(qc.Identity()), + ) + if err == nil { + ok = true + } + } + if !ok { e.logger.Error( "no frame for quorum certificate", @@ -3110,13 +3151,22 @@ func (e *GlobalConsensusEngine) VerifyTimeoutCertificate( pubkeys := [][]byte{} signatures := [][]byte{} + messages := [][]byte{} if ((len(provers) + 7) / 8) > len(tc.AggregateSignature.Bitmask) { return models.ErrInvalidSignature } + + idx := 0 for i, prover := range provers { if tc.AggregateSignature.Bitmask[i/8]&(1<<(i%8)) == (1 << (i % 8)) { pubkeys = append(pubkeys, prover.PublicKey) signatures = append(signatures, tc.AggregateSignature.GetSignature()) + messages = append(messages, verification.MakeTimeoutMessage( + 
nil, + tc.Rank, + tc.LatestRanks[idx], + )) + idx++ } } @@ -3132,14 +3182,10 @@ func (e *GlobalConsensusEngine) VerifyTimeoutCertificate( return models.ErrInvalidSignature } - if valid := e.blsConstructor.VerifySignatureRaw( - tc.AggregateSignature.GetPubKey(), + if valid := e.blsConstructor.VerifyMultiMessageSignatureRaw( + pubkeys, tc.AggregateSignature.GetSignature(), - verification.MakeTimeoutMessage( - nil, - tc.Rank, - tc.LatestQuorumCertificate.Rank, - ), + messages, []byte("globaltimeout"), ); !valid { return models.ErrInvalidSignature diff --git a/node/consensus/global/message_subscription.go b/node/consensus/global/message_subscription.go index 1391497..00d1a36 100644 --- a/node/consensus/global/message_subscription.go +++ b/node/consensus/global/message_subscription.go @@ -26,10 +26,10 @@ func (e *GlobalConsensusEngine) subscribeToGlobalConsensus() error { select { case <-e.haltCtx.Done(): return nil - case e.globalConsensusMessageQueue <- message: - return nil case <-e.ShutdownSignal(): return errors.New("context cancelled") + case e.globalConsensusMessageQueue <- message: + return nil default: e.logger.Warn("global message queue full, dropping message") return nil @@ -57,10 +57,10 @@ func (e *GlobalConsensusEngine) subscribeToGlobalConsensus() error { select { case <-e.haltCtx.Done(): return nil - case e.appFramesMessageQueue <- message: - return nil case <-e.ShutdownSignal(): return errors.New("context cancelled") + case e.appFramesMessageQueue <- message: + return nil default: e.logger.Warn("app frames message queue full, dropping message") return nil @@ -98,10 +98,10 @@ func (e *GlobalConsensusEngine) subscribeToShardConsensusMessages() error { select { case <-e.haltCtx.Done(): return nil - case e.shardConsensusMessageQueue <- message: - return nil case <-e.ShutdownSignal(): return errors.New("context cancelled") + case e.shardConsensusMessageQueue <- message: + return nil default: e.logger.Warn("shard consensus queue full, dropping message") return 
nil @@ -140,10 +140,10 @@ func (e *GlobalConsensusEngine) subscribeToFrameMessages() error { select { case <-e.haltCtx.Done(): return nil - case e.globalFrameMessageQueue <- message: - return nil case <-e.ShutdownSignal(): return errors.New("context cancelled") + case e.globalFrameMessageQueue <- message: + return nil default: e.logger.Warn("global frame queue full, dropping message") return nil @@ -179,11 +179,11 @@ func (e *GlobalConsensusEngine) subscribeToProverMessages() error { select { case <-e.haltCtx.Done(): return nil + case <-e.ShutdownSignal(): + return errors.New("context cancelled") case e.globalProverMessageQueue <- message: e.logger.Debug("received prover message") return nil - case <-e.ShutdownSignal(): - return errors.New("context cancelled") default: e.logger.Warn("global prover message queue full, dropping message") return nil @@ -214,10 +214,10 @@ func (e *GlobalConsensusEngine) subscribeToPeerInfoMessages() error { select { case <-e.haltCtx.Done(): return nil - case e.globalPeerInfoMessageQueue <- message: - return nil case <-e.ShutdownSignal(): return errors.New("context cancelled") + case e.globalPeerInfoMessageQueue <- message: + return nil default: e.logger.Warn("peer info message queue full, dropping message") return nil diff --git a/node/consensus/global/message_validation.go b/node/consensus/global/message_validation.go index 0651569..cd6e4c0 100644 --- a/node/consensus/global/message_validation.go +++ b/node/consensus/global/message_validation.go @@ -50,7 +50,11 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage( } if e.currentRank > proposal.GetRank() { - e.logger.Debug("proposal is stale") + e.logger.Debug( + "proposal is stale", + zap.Uint64("current_rank", e.currentRank), + zap.Uint64("timeout_rank", proposal.GetRank()), + ) proposalValidationTotal.WithLabelValues("reject").Inc() return tp2p.ValidationResultIgnore } @@ -86,7 +90,11 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage( // We should still 
accept votes for the past rank – either because a peer // needs it, or because we need it to trump a TC if e.currentRank > vote.Rank+1 { - e.logger.Debug("vote is stale") + e.logger.Debug( + "vote is stale", + zap.Uint64("current_rank", e.currentRank), + zap.Uint64("timeout_rank", vote.Rank), + ) return tp2p.ValidationResultIgnore } @@ -112,12 +120,6 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage( return tp2p.ValidationResultReject } - // We should still accept votes for the past rank in case a peer needs it - if e.currentRank > timeoutState.Vote.Rank+1 { - e.logger.Debug("timeout is stale") - return tp2p.ValidationResultIgnore - } - // Validate the timeoutState if err := timeoutState.Validate(); err != nil { e.logger.Debug("invalid timeoutState", zap.Error(err)) @@ -125,6 +127,16 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage( return tp2p.ValidationResultReject } + // We should still accept votes for the past rank in case a peer needs it + if e.currentRank > timeoutState.Vote.Rank+1 { + e.logger.Debug( + "timeout is stale", + zap.Uint64("current_rank", e.currentRank), + zap.Uint64("timeout_rank", timeoutState.Vote.Rank), + ) + return tp2p.ValidationResultIgnore + } + timeoutStateValidationTotal.WithLabelValues("accept").Inc() default: diff --git a/node/store/clock.go b/node/store/clock.go index a8acb6d..47de0ba 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -532,6 +532,13 @@ func clockGlobalFrameKey(frameNumber uint64) []byte { return clockFrameKey([]byte{}, frameNumber, CLOCK_GLOBAL_FRAME) } +func clockGlobalFrameCandidateKey(frameNumber uint64, selector []byte) []byte { + key := []byte{CLOCK_FRAME, CLOCK_GLOBAL_FRAME_CANDIDATE} + key = binary.BigEndian.AppendUint64(key, frameNumber) + key = append(key, selector...) 
+ return key +} + func extractFrameNumberFromGlobalFrameKey( key []byte, ) (uint64, error) { @@ -731,6 +738,18 @@ func clockGlobalFrameRequestKey( return key } +func clockGlobalFrameRequestCandidateKey( + selector []byte, + frameNumber uint64, + requestIndex uint16, +) []byte { + key := []byte{CLOCK_FRAME, CLOCK_GLOBAL_FRAME_REQUEST_CANDIDATE} + key = append(key, selector...) + key = binary.BigEndian.AppendUint64(key, frameNumber) + key = binary.BigEndian.AppendUint16(key, requestIndex) + return key +} + func clockProposalVoteKey(rank uint64, filter []byte, identity []byte) []byte { key := []byte{CLOCK_FRAME, CLOCK_PROPOSAL_VOTE} key = binary.BigEndian.AppendUint64(key, rank) @@ -938,6 +957,116 @@ func (p *PebbleClockStore) PutGlobalClockFrame( return nil } +// PutGlobalClockFrameCandidate implements ClockStore. +func (p *PebbleClockStore) PutGlobalClockFrameCandidate( + frame *protobufs.GlobalFrame, + txn store.Transaction, +) error { + if frame.Header == nil { + return errors.Wrap( + errors.New("frame header is required"), + "put global clock frame candidate", + ) + } + + frameNumber := frame.Header.FrameNumber + + // Serialize the full header using protobuf + headerData, err := proto.Marshal(frame.Header) + if err != nil { + return errors.Wrap(err, "put global clock frame candidate") + } + + if err := txn.Set( + clockGlobalFrameCandidateKey(frameNumber, []byte(frame.Identity())), + headerData, + ); err != nil { + return errors.Wrap(err, "put global clock frame candidate") + } + + // Store requests separately with iterative keys + for i, request := range frame.Requests { + requestData, err := proto.Marshal(request) + if err != nil { + return errors.Wrap(err, "put global clock frame candidate request") + } + + if err := txn.Set( + clockGlobalFrameRequestCandidateKey( + []byte(frame.Identity()), + frameNumber, + uint16(i), + ), + requestData, + ); err != nil { + return errors.Wrap(err, "put global clock frame candidate request") + } + } + + return nil +} + +// 
GetGlobalClockFrameCandidate implements ClockStore. +func (p *PebbleClockStore) GetGlobalClockFrameCandidate( + frameNumber uint64, + selector []byte, +) (*protobufs.GlobalFrame, error) { + value, closer, err := p.db.Get(clockGlobalFrameCandidateKey( + frameNumber, + selector, + )) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, store.ErrNotFound + } + + return nil, errors.Wrap(err, "get global clock frame candidate") + } + defer closer.Close() + + // Deserialize the GlobalFrameHeader + header := &protobufs.GlobalFrameHeader{} + if err := proto.Unmarshal(value, header); err != nil { + return nil, errors.Wrap(err, "get global clock frame candidate") + } + + frame := &protobufs.GlobalFrame{ + Header: header, + } + + // Retrieve all requests for this frame + var requests []*protobufs.MessageBundle + requestIndex := uint16(0) + for { + requestKey := clockGlobalFrameRequestCandidateKey( + selector, + frameNumber, + requestIndex, + ) + requestData, closer, err := p.db.Get(requestKey) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + // No more requests + break + } + return nil, errors.Wrap(err, "get global clock frame candidate") + } + defer closer.Close() + + request := &protobufs.MessageBundle{} + if err := proto.Unmarshal(requestData, request); err != nil { + return nil, errors.Wrap(err, "get global clock frame candidate") + } + + requests = append(requests, request) + requestIndex++ + } + + frame.Requests = requests + + return frame, nil +} + // GetShardClockFrame implements ClockStore. 
func (p *PebbleClockStore) GetShardClockFrame( filter []byte, diff --git a/node/store/constants.go b/node/store/constants.go index 0b12fb4..5c5276e 100644 --- a/node/store/constants.go +++ b/node/store/constants.go @@ -36,6 +36,7 @@ const ( CLOCK_TIMEOUT_CERTIFICATE = 0x0C CLOCK_PROPOSAL_VOTE = 0x0D CLOCK_TIMEOUT_VOTE = 0x0E + CLOCK_GLOBAL_FRAME_CANDIDATE = 0x0F CLOCK_GLOBAL_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_GLOBAL_FRAME CLOCK_GLOBAL_FRAME_INDEX_LATEST = 0x20 | CLOCK_GLOBAL_FRAME @@ -67,6 +68,11 @@ const ( CLOCK_SHARD_FRAME_CANDIDATE_INDEX_LATEST = 0x20 | CLOCK_SHARD_FRAME_CANDIDATE_SHARD + + CLOCK_GLOBAL_FRAME_CANDIDATE_INDEX_LATEST = 0x20 | + CLOCK_GLOBAL_FRAME_CANDIDATE + + CLOCK_GLOBAL_FRAME_REQUEST_CANDIDATE = 0xF8 ) // Coin store indexes: diff --git a/node/store/pebble.go b/node/store/pebble.go index 7943637..9f9dca4 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -154,7 +154,7 @@ func (p *PebbleDB) migrate(logger *zap.Logger) error { return nil } - batch := p.db.NewBatch() + batch := p.db.NewIndexedBatch() for i := int(storedVersion); i < len(pebbleMigrations); i++ { logger.Warn( "performing pebble store migration", diff --git a/types/crypto/keys.go b/types/crypto/keys.go index 1a94991..1a0c2ac 100644 --- a/types/crypto/keys.go +++ b/types/crypto/keys.go @@ -28,6 +28,12 @@ type BlsConstructor interface { message []byte, context []byte, ) bool + VerifyMultiMessageSignatureRaw( + publicKeysG2 [][]byte, + signatureG1 []byte, + messages [][]byte, + context []byte, + ) bool Aggregate( publicKeys [][]byte, signatures [][]byte, diff --git a/types/mocks/bls_constructor.go b/types/mocks/bls_constructor.go index 90b44c9..4879718 100644 --- a/types/mocks/bls_constructor.go +++ b/types/mocks/bls_constructor.go @@ -23,6 +23,17 @@ func (m *MockBlsConstructor) VerifySignatureRaw( return args.Bool(0) } +// VerifyMultiMessageSignatureRaw implements crypto.BlsConstructor. 
+func (m *MockBlsConstructor) VerifyMultiMessageSignatureRaw( + publicKeysG2 [][]byte, + signatureG1 []byte, + messages [][]byte, + context []byte, +) bool { + args := m.Called(publicKeysG2, signatureG1, messages, context) + return args.Bool(0) +} + func (m *MockBlsConstructor) New() (qcrypto.Signer, []byte, error) { args := m.Called() return args.Get(0).(qcrypto.Signer), args.Get(1).([]byte), args.Error(2) diff --git a/types/mocks/clock_store.go b/types/mocks/clock_store.go index 17b6544..9a2f7e3 100644 --- a/types/mocks/clock_store.go +++ b/types/mocks/clock_store.go @@ -16,6 +16,32 @@ type MockClockStore struct { mock.Mock } +// GetGlobalClockFrameCandidate implements store.ClockStore. +func (m *MockClockStore) GetGlobalClockFrameCandidate( + frameNumber uint64, + selector []byte, +) (*protobufs.GlobalFrame, error) { + args := m.Called( + frameNumber, + selector, + ) + + return args.Get(0).(*protobufs.GlobalFrame), args.Error(1) +} + +// PutGlobalClockFrameCandidate implements store.ClockStore. +func (m *MockClockStore) PutGlobalClockFrameCandidate( + frame *protobufs.GlobalFrame, + txn store.Transaction, +) error { + args := m.Called( + frame, + txn, + ) + + return args.Error(0) +} + // GetProposalVote implements store.ClockStore. 
func (m *MockClockStore) GetProposalVote( filter []byte, diff --git a/types/store/clock.go b/types/store/clock.go index e303ab8..fcf4f10 100644 --- a/types/store/clock.go +++ b/types/store/clock.go @@ -17,6 +17,14 @@ type ClockStore interface { endFrameNumber uint64, ) (TypedIterator[*protobufs.GlobalFrame], error) PutGlobalClockFrame(frame *protobufs.GlobalFrame, txn Transaction) error + PutGlobalClockFrameCandidate( + frame *protobufs.GlobalFrame, + txn Transaction, + ) error + GetGlobalClockFrameCandidate( + frameNumber uint64, + selector []byte, + ) (*protobufs.GlobalFrame, error) GetLatestCertifiedGlobalState() (*protobufs.GlobalProposal, error) GetEarliestCertifiedGlobalState() (*protobufs.GlobalProposal, error) GetCertifiedGlobalState(rank uint64) (*protobufs.GlobalProposal, error)