plugged in, vetting message validation paths

This commit is contained in:
Cassandra Heart 2025-11-05 08:23:07 -06:00
parent c86674b97f
commit 68daf0c783
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
40 changed files with 3941 additions and 1475 deletions

View File

@ -13,6 +13,6 @@ type ConsensusStore[VoteT models.Unique] interface {
// ReadOnlyConsensusStore defines the methods required for reading internal
// state persisted between restarts of the consensus engine.
type ReadOnlyConsensusStore[VoteT models.Unique] interface {
GetConsensusState() (*models.ConsensusState[VoteT], error)
GetLivenessState() (*models.LivenessState, error)
GetConsensusState(filter []byte) (*models.ConsensusState[VoteT], error)
GetLivenessState(filter []byte) (*models.LivenessState, error)
}

View File

@ -491,6 +491,11 @@ func (e *EventHandler[
]) proposeForNewRankIfPrimary() error {
start := time.Now() // track the start time
curRank := e.paceMaker.CurrentRank()
e.tracer.Trace(
"deciding to propose",
consensus.Uint64Param("current_rank", curRank),
consensus.IdentityParam("self", e.committee.Self()),
)
currentLeader, err := e.committee.LeaderForRank(curRank)
if err != nil {
return fmt.Errorf(

View File

@ -49,7 +49,7 @@ func NewTestPacemaker[
notifier consensus.Consumer[StateT, VoteT],
store consensus.ConsensusStore[VoteT],
) *TestPacemaker[StateT, VoteT, PeerIDT, CollectedT] {
p, err := pacemaker.NewPacemaker[StateT, VoteT](timeoutController, proposalDelayProvider, notifier, store, helper.Logger())
p, err := pacemaker.NewPacemaker[StateT, VoteT](nil, timeoutController, proposalDelayProvider, notifier, store, helper.Logger())
if err != nil {
panic(err)
}
@ -114,7 +114,7 @@ func initPacemaker(t require.TestingT, ctx context.Context, livenessData *models
require.NoError(t, err)
persist := &mocks.ConsensusStore[*helper.TestVote]{}
persist.On("PutLivenessState", mock.Anything).Return(nil).Maybe()
persist.On("GetLivenessState").Return(livenessData, nil).Once()
persist.On("GetLivenessState", mock.Anything).Return(livenessData, nil).Once()
pm := NewTestPacemaker[*helper.TestState, *helper.TestVote, *helper.TestPeer, *helper.TestCollected](timeout.NewController(tc), pacemaker.NoProposalDelay(), notifier, persist)
notifier.On("OnStartingTimeout", mock.Anything, mock.Anything).Return()
notifier.On("OnQuorumCertificateTriggeredRankChange", mock.Anything, mock.Anything, mock.Anything).Return()

View File

@ -34,6 +34,11 @@ func NewForks[StateT models.Unique, VoteT models.Unique](
finalizationCallback consensus.Finalizer,
notifier consensus.FollowerConsumer[StateT, VoteT],
) (*Forks[StateT, VoteT], error) {
if trustedRoot == nil {
return nil,
models.NewConfigurationErrorf("invalid root: root is nil")
}
if (trustedRoot.State.Identifier != trustedRoot.CertifyingQuorumCertificate.Identity()) ||
(trustedRoot.State.Rank != trustedRoot.CertifyingQuorumCertificate.GetRank()) {
return nil,

View File

@ -380,11 +380,11 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
LatestQuorumCertificate: rootQC,
}
in.persist.On("GetLivenessState").Return(livenessData, nil).Once()
in.persist.On("GetLivenessState", mock.Anything).Return(livenessData, nil).Once()
// initialize the pacemaker
controller := timeout.NewController(cfg.Timeouts)
in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](controller, pacemaker.NoProposalDelay(), notifier, in.persist, in.logger)
in.pacemaker, err = pacemaker.NewPacemaker[*helper.TestState, *helper.TestVote](nil, controller, pacemaker.NoProposalDelay(), notifier, in.persist, in.logger)
require.NoError(t, err)
// initialize the forks handler
@ -580,7 +580,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
in.persist.On("GetConsensusState", mock.Anything).Return(safetyData, nil).Once()
// initialize the safety rules
in.safetyRules, err = safetyrules.NewSafetyRules(in.signer, in.persist, in.committee)
in.safetyRules, err = safetyrules.NewSafetyRules(nil, in.signer, in.persist, in.committee)
require.NoError(t, err)
// initialize the state producer

View File

@ -13,8 +13,8 @@ type ConsensusStore[VoteT models.Unique] struct {
}
// GetConsensusState provides a mock function with no fields
func (_m *ConsensusStore[VoteT]) GetConsensusState() (*models.ConsensusState[VoteT], error) {
ret := _m.Called()
func (_m *ConsensusStore[VoteT]) GetConsensusState(filter []byte) (*models.ConsensusState[VoteT], error) {
ret := _m.Called(filter)
if len(ret) == 0 {
panic("no return value specified for GetConsensusState")
@ -22,19 +22,19 @@ func (_m *ConsensusStore[VoteT]) GetConsensusState() (*models.ConsensusState[Vot
var r0 *models.ConsensusState[VoteT]
var r1 error
if rf, ok := ret.Get(0).(func() (*models.ConsensusState[VoteT], error)); ok {
return rf()
if rf, ok := ret.Get(0).(func(filter []byte) (*models.ConsensusState[VoteT], error)); ok {
return rf(filter)
}
if rf, ok := ret.Get(0).(func() *models.ConsensusState[VoteT]); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(filter []byte) *models.ConsensusState[VoteT]); ok {
r0 = rf(filter)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*models.ConsensusState[VoteT])
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
if rf, ok := ret.Get(1).(func(filter []byte) error); ok {
r1 = rf(filter)
} else {
r1 = ret.Error(1)
}
@ -43,8 +43,8 @@ func (_m *ConsensusStore[VoteT]) GetConsensusState() (*models.ConsensusState[Vot
}
// GetLivenessState provides a mock function with no fields
func (_m *ConsensusStore[VoteT]) GetLivenessState() (*models.LivenessState, error) {
ret := _m.Called()
func (_m *ConsensusStore[VoteT]) GetLivenessState(filter []byte) (*models.LivenessState, error) {
ret := _m.Called(filter)
if len(ret) == 0 {
panic("no return value specified for GetLivenessState")
@ -52,19 +52,19 @@ func (_m *ConsensusStore[VoteT]) GetLivenessState() (*models.LivenessState, erro
var r0 *models.LivenessState
var r1 error
if rf, ok := ret.Get(0).(func() (*models.LivenessState, error)); ok {
return rf()
if rf, ok := ret.Get(0).(func(filter []byte) (*models.LivenessState, error)); ok {
return rf(filter)
}
if rf, ok := ret.Get(0).(func() *models.LivenessState); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(filter []byte) *models.LivenessState); ok {
r0 = rf(filter)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*models.LivenessState)
}
}
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
if rf, ok := ret.Get(1).(func(filter []byte) error); ok {
r1 = rf(filter)
} else {
r1 = ret.Error(1)
}

View File

@ -48,6 +48,7 @@ var _ consensus.ProposalDurationProvider = (*Pacemaker[*nilUnique, *nilUnique])(
// Expected error conditions:
// * models.ConfigurationError if initial LivenessState is invalid
func NewPacemaker[StateT models.Unique, VoteT models.Unique](
filter []byte,
timeoutController *timeout.Controller,
proposalDurationProvider consensus.ProposalDurationProvider,
notifier consensus.Consumer[StateT, VoteT],
@ -55,7 +56,7 @@ func NewPacemaker[StateT models.Unique, VoteT models.Unique](
tracer consensus.TraceLogger,
recovery ...recoveryInformation[StateT, VoteT],
) (*Pacemaker[StateT, VoteT], error) {
vt, err := newRankTracker[StateT, VoteT](store)
vt, err := newRankTracker[StateT, VoteT](filter, store)
if err != nil {
return nil, fmt.Errorf("initializing rank tracker failed: %w", err)
}

View File

@ -67,10 +67,10 @@ func (s *PacemakerTestSuite) SetupTest() {
PriorRankTimeoutCertificate: nil,
LatestQuorumCertificate: s.initialQC,
}
s.store.On("GetLivenessState").Return(livenessState, nil)
s.store.On("GetLivenessState", mock.Anything).Return(livenessState, nil)
// init Pacemaker and start
s.pacemaker, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger())
s.pacemaker, err = NewPacemaker(nil, timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger())
require.NoError(s.T(), err)
var ctx context.Context
@ -335,6 +335,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// test that the constructor finds the newest QC and TC
s.Run("Random TCs and QCs combined", func() {
pm, err := NewPacemaker(
nil,
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(),
WithQCs[*helper.TestState, *helper.TestVote](qcs...), WithTCs[*helper.TestState, *helper.TestVote](tcs...),
)
@ -355,6 +356,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
tcs[45] = helper.MakeTC(helper.WithTCRank(highestRank+15), helper.WithTCNewestQC(QC(highestRank+12)))
pm, err := NewPacemaker(
nil,
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(),
WithTCs[*helper.TestState, *helper.TestVote](tcs...), WithQCs[*helper.TestState, *helper.TestVote](qcs...),
)
@ -375,6 +377,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
tcs[45] = helper.MakeTC(helper.WithTCRank(highestRank+15), helper.WithTCNewestQC(QC(highestRank+15)))
pm, err := NewPacemaker(
nil,
timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(),
WithTCs[*helper.TestState, *helper.TestVote](tcs...), WithQCs[*helper.TestState, *helper.TestVote](qcs...),
)
@ -391,11 +394,11 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// Verify that WithTCs still works correctly if no TCs are given:
// the list of TCs is empty or all contained TCs are nil
s.Run("Only nil TCs", func() {
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote]())
pm, err := NewPacemaker(nil, timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote]())
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
pm, err = NewPacemaker(nil, timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithTCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
})
@ -403,11 +406,11 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// Verify that WithQCs still works correctly if no QCs are given:
// the list of QCs is empty or all contained QCs are nil
s.Run("Only nil QCs", func() {
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote]())
pm, err := NewPacemaker(nil, timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote]())
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
pm, err = NewPacemaker(timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
pm, err = NewPacemaker(nil, timeout.NewController(s.timeoutConf), NoProposalDelay(), s.notifier, s.store, helper.Logger(), WithQCs[*helper.TestState, *helper.TestVote](nil, nil, nil))
require.NoError(s.T(), err)
require.Equal(s.T(), s.initialRank, pm.CurrentRank())
})
@ -417,7 +420,7 @@ func (s *PacemakerTestSuite) Test_Initialization() {
// TestProposalDuration tests that the active pacemaker forwards proposal duration values from the provider.
func (s *PacemakerTestSuite) TestProposalDuration() {
proposalDurationProvider := NewStaticProposalDurationProvider(time.Millisecond * 500)
pm, err := NewPacemaker(timeout.NewController(s.timeoutConf), &proposalDurationProvider, s.notifier, s.store, helper.Logger())
pm, err := NewPacemaker(nil, timeout.NewController(s.timeoutConf), &proposalDurationProvider, s.notifier, s.store, helper.Logger())
require.NoError(s.T(), err)
now := time.Now().UTC()

View File

@ -28,9 +28,10 @@ type rankTracker[StateT models.Unique, VoteT models.Unique] struct {
// newRankTracker instantiates a rankTracker.
func newRankTracker[StateT models.Unique, VoteT models.Unique](
filter []byte,
store consensus.ConsensusStore[VoteT],
) (rankTracker[StateT, VoteT], error) {
livenessState, err := store.GetLivenessState()
livenessState, err := store.GetLivenessState(filter)
if err != nil {
return rankTracker[StateT, VoteT]{},
fmt.Errorf("could not load liveness data: %w", err)

View File

@ -40,10 +40,10 @@ func (s *RankTrackerTestSuite) SetupTest() {
CurrentRank: s.initialRank, // we entered rank 5 by observing a QC for rank 4
}
s.store = mocks.NewConsensusStore[*helper.TestVote](s.T())
s.store.On("GetLivenessState").Return(s.livenessState, nil).Once()
s.store.On("GetLivenessState", mock.Anything).Return(s.livenessState, nil).Once()
var err error
s.tracker, err = newRankTracker[*helper.TestState, *helper.TestVote](s.store)
s.tracker, err = newRankTracker[*helper.TestState, *helper.TestVote](nil, s.store)
require.NoError(s.T(), err)
}

View File

@ -42,8 +42,8 @@ func NewParticipant[
trustedRoot *models.CertifiedState[StateT],
) (*eventloop.EventLoop[StateT, VoteT], error) {
cfg, err := timeout.NewConfig(
1*time.Second,
3*time.Second,
10*time.Second,
30*time.Second,
1.2,
6,
10*time.Second,
@ -52,7 +52,7 @@ func NewParticipant[
return nil, err
}
livenessState, err := consensusStore.GetLivenessState()
livenessState, err := consensusStore.GetLivenessState(filter)
if err != nil {
livenessState = &models.LivenessState{
Filter: filter,
@ -66,7 +66,7 @@ func NewParticipant[
}
}
consensusState, err := consensusStore.GetConsensusState()
consensusState, err := consensusStore.GetConsensusState(filter)
if err != nil {
consensusState = &models.ConsensusState[VoteT]{
FinalizedRank: trustedRoot.Rank(),
@ -82,21 +82,10 @@ func NewParticipant[
voteAggregator.PruneUpToRank(trustedRoot.Rank())
timeoutAggregator.PruneUpToRank(trustedRoot.Rank())
// initialize dynamically updatable timeout config
timeoutConfig, err := timeout.NewConfig(
time.Duration(cfg.MinReplicaTimeout),
time.Duration(cfg.MaxReplicaTimeout),
cfg.TimeoutAdjustmentFactor,
cfg.HappyPathMaxRoundFailures,
time.Duration(cfg.MaxTimeoutStateRebroadcastInterval),
)
if err != nil {
return nil, fmt.Errorf("could not initialize timeout config: %w", err)
}
// initialize the pacemaker
controller := timeout.NewController(timeoutConfig)
pacemaker, err := pacemaker.NewPacemaker(
controller := timeout.NewController(cfg)
pacemaker, err := pacemaker.NewPacemaker[StateT, VoteT](
filter,
controller,
pacemaker.NoProposalDelay(),
notifier,
@ -108,7 +97,8 @@ func NewParticipant[
}
// initialize the safetyRules
safetyRules, err := safetyrules.NewSafetyRules(
safetyRules, err := safetyrules.NewSafetyRules[StateT, VoteT](
filter,
signer,
consensusStore,
committee,

View File

@ -42,12 +42,13 @@ var _ consensus.SafetyRules[*nilUnique, *nilUnique] = (*SafetyRules[*nilUnique,
// NewSafetyRules creates a new SafetyRules instance
func NewSafetyRules[StateT models.Unique, VoteT models.Unique](
filter []byte,
signer consensus.Signer[StateT, VoteT],
store consensus.ConsensusStore[VoteT],
committee consensus.DynamicCommittee,
) (*SafetyRules[StateT, VoteT], error) {
// get the last stored safety data
consensusState, err := store.GetConsensusState()
consensusState, err := store.GetConsensusState(filter)
if err != nil {
return nil, fmt.Errorf("could not load safety data: %w", err)
}
@ -330,7 +331,11 @@ func (r *SafetyRules[StateT, VoteT]) SignOwnProposal(
) (*VoteT, error) {
// check that the state is created by us
if unsignedProposal.State.ProposerID != r.committee.Self() {
return nil, fmt.Errorf("can't sign proposal for someone else's state")
return nil, fmt.Errorf(
"can't sign proposal for someone else's state, proposer: %x, self: %x",
unsignedProposal.State.ProposerID,
r.committee.Self(),
)
}
return r.produceVote(unsignedProposal, unsignedProposal.State.Rank)
@ -480,7 +485,9 @@ func (r *SafetyRules[StateT, VoteT]) validateEvidenceForEnteringRank(
// Condition 4:
if previousRankTimeoutCert == nil {
return fmt.Errorf(
"expecting TC because QC is not for prior rank; but didn't get any TC",
"expecting TC because QC (%d) is not for prior rank (%d - 1); but didn't get any TC",
newestQC.GetRank(),
rank,
)
}
if previousRankTimeoutCert.GetRank()+1 != rank {

View File

@ -66,9 +66,9 @@ func (s *SafetyRulesTestSuite) SetupTest() {
LatestAcknowledgedRank: s.bootstrapState.Rank,
}
s.persister.On("GetConsensusState").Return(s.safetyData, nil).Once()
s.persister.On("GetConsensusState", mock.Anything).Return(s.safetyData, nil).Once()
var err error
s.safety, err = NewSafetyRules(s.signer, s.persister, s.committee)
s.safety, err = NewSafetyRules(nil, s.signer, s.persister, s.committee)
require.NoError(s.T(), err)
}

View File

@ -1,7 +1,10 @@
package app
import (
"context"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/global"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/execution/manager"
@ -84,7 +87,28 @@ func (d *DHTNode) Stop() {
func (m *MasterNode) Start(quitCh chan struct{}) error {
// Start the global consensus engine
m.quit = quitCh
errChan := m.globalConsensus.Start(quitCh)
supervisor, err := lifecycle.NewSupervisor(
[]*lifecycle.Node{
&lifecycle.Node{
Name: "master node",
Factory: func() (lifecycle.Component, error) {
return m.globalConsensus, nil
},
OnError: func(err error) lifecycle.ErrorHandlingBehavior {
return lifecycle.ErrorShouldShutdown
},
},
},
)
if err != nil {
return err
}
errChan := make(chan error)
go func() {
errChan <- supervisor.Start(context.Background())
}()
select {
case err := <-errChan:
if err != nil {

View File

@ -10,6 +10,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/bulletproofs"
"source.quilibrium.com/quilibrium/monorepo/channel"
"source.quilibrium.com/quilibrium/monorepo/config"
qconsensus "source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/node/compiler"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/app"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/difficulty"
@ -26,6 +27,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tests"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
tchannel "source.quilibrium.com/quilibrium/monorepo/types/channel"
tcompiler "source.quilibrium.com/quilibrium/monorepo/types/compiler"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
@ -90,6 +92,7 @@ var storeSet = wire.NewSet(
store.NewPeerstoreDatastore,
store.NewPebbleShardsStore,
store.NewPebbleWorkerStore,
store.NewPebbleConsensusStore,
wire.Bind(new(tstore.ClockStore), new(*store.PebbleClockStore)),
wire.Bind(new(tstore.TokenStore), new(*store.PebbleTokenStore)),
wire.Bind(new(tstore.DataProofStore), new(*store.PebbleDataProofStore)),
@ -99,6 +102,10 @@ var storeSet = wire.NewSet(
wire.Bind(new(tries.TreeBackingStore), new(*store.PebbleHypergraphStore)),
wire.Bind(new(tstore.ShardsStore), new(*store.PebbleShardsStore)),
wire.Bind(new(tstore.WorkerStore), new(*store.PebbleWorkerStore)),
wire.Bind(
new(qconsensus.ConsensusStore[*protobufs.ProposalVote]),
new(*store.PebbleConsensusStore),
),
)
var pubSubSet = wire.NewSet(

View File

@ -13,6 +13,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/bulletproofs"
"source.quilibrium.com/quilibrium/monorepo/channel"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/node/compiler"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/app"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/difficulty"
@ -29,9 +30,10 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
store2 "source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tests"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
channel2 "source.quilibrium.com/quilibrium/monorepo/types/channel"
compiler2 "source.quilibrium.com/quilibrium/monorepo/types/compiler"
"source.quilibrium.com/quilibrium/monorepo/types/consensus"
consensus2 "source.quilibrium.com/quilibrium/monorepo/types/consensus"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
keys2 "source.quilibrium.com/quilibrium/monorepo/types/keys"
@ -227,9 +229,10 @@ func NewMasterNode(logger *zap.Logger, config2 *config.Config, coreId uint) (*Ma
pebbleInboxStore := store2.NewPebbleInboxStore(pebbleDB, logger)
pebbleShardsStore := store2.NewPebbleShardsStore(pebbleDB, logger)
pebbleWorkerStore := store2.NewPebbleWorkerStore(pebbleDB, logger)
pebbleConsensusStore := store2.NewPebbleConsensusStore(pebbleDB, logger)
doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel()
bedlamCompiler := compiler.NewBedlamCompiler()
consensusEngineFactory := global.NewConsensusEngineFactory(logger, config2, blossomSub, hypergraph, fileKeyManager, pebbleKeyStore, frameProver, kzgInclusionProver, cachedSignerRegistry, proverRegistry, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, pebbleClockStore, pebbleInboxStore, pebbleHypergraphStore, pebbleShardsStore, pebbleWorkerStore, doubleRatchetEncryptedChannel, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, bls48581KeyConstructor, inMemoryPeerInfoManager)
consensusEngineFactory := global.NewConsensusEngineFactory(logger, config2, blossomSub, hypergraph, fileKeyManager, pebbleKeyStore, frameProver, kzgInclusionProver, cachedSignerRegistry, proverRegistry, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, pebbleClockStore, pebbleInboxStore, pebbleHypergraphStore, pebbleShardsStore, pebbleWorkerStore, pebbleConsensusStore, doubleRatchetEncryptedChannel, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, bls48581KeyConstructor, inMemoryPeerInfoManager)
globalConsensusComponents, err := provideGlobalConsensusComponents(consensusEngineFactory, config2)
if err != nil {
return nil, err
@ -272,7 +275,11 @@ var verencSet = wire.NewSet(
),
)
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store2.NewPebbleDB, wire.Bind(new(store.KVDB), new(*store2.PebbleDB)), store2.NewPebbleClockStore, store2.NewPebbleTokenStore, store2.NewPebbleDataProofStore, store2.NewPebbleHypergraphStore, store2.NewPebbleInboxStore, store2.NewPebbleKeyStore, store2.NewPeerstoreDatastore, store2.NewPebbleShardsStore, store2.NewPebbleWorkerStore, wire.Bind(new(store.ClockStore), new(*store2.PebbleClockStore)), wire.Bind(new(store.TokenStore), new(*store2.PebbleTokenStore)), wire.Bind(new(store.DataProofStore), new(*store2.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store2.PebbleHypergraphStore)), wire.Bind(new(store.InboxStore), new(*store2.PebbleInboxStore)), wire.Bind(new(store.KeyStore), new(*store2.PebbleKeyStore)), wire.Bind(new(tries.TreeBackingStore), new(*store2.PebbleHypergraphStore)), wire.Bind(new(store.ShardsStore), new(*store2.PebbleShardsStore)), wire.Bind(new(store.WorkerStore), new(*store2.PebbleWorkerStore)))
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store2.NewPebbleDB, wire.Bind(new(store.KVDB), new(*store2.PebbleDB)), store2.NewPebbleClockStore, store2.NewPebbleTokenStore, store2.NewPebbleDataProofStore, store2.NewPebbleHypergraphStore, store2.NewPebbleInboxStore, store2.NewPebbleKeyStore, store2.NewPeerstoreDatastore, store2.NewPebbleShardsStore, store2.NewPebbleWorkerStore, store2.NewPebbleConsensusStore, wire.Bind(new(store.ClockStore), new(*store2.PebbleClockStore)), wire.Bind(new(store.TokenStore), new(*store2.PebbleTokenStore)), wire.Bind(new(store.DataProofStore), new(*store2.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store2.PebbleHypergraphStore)), wire.Bind(new(store.InboxStore), new(*store2.PebbleInboxStore)), wire.Bind(new(store.KeyStore), new(*store2.PebbleKeyStore)), wire.Bind(new(tries.TreeBackingStore), new(*store2.PebbleHypergraphStore)), wire.Bind(new(store.ShardsStore), new(*store2.PebbleShardsStore)), wire.Bind(new(store.WorkerStore), new(*store2.PebbleWorkerStore)), wire.Bind(
new(consensus.ConsensusStore[*protobufs.ProposalVote]),
new(*store2.PebbleConsensusStore),
),
)
var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), wire.FieldsOf(new(*config.Config), "Engine"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, channel.NewDoubleRatchetEncryptedChannel, wire.Bind(new(p2p2.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p2.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)), wire.Bind(
new(channel2.EncryptedChannel),
@ -302,21 +309,21 @@ var hypergraphSet = wire.NewSet(
)
var validatorSet = wire.NewSet(registration.NewCachedSignerRegistry, wire.Bind(
new(consensus.SignerRegistry),
new(consensus2.SignerRegistry),
new(*registration.CachedSignerRegistry),
), provers.NewProverRegistry, fees.NewDynamicFeeManager, validator.NewBLSGlobalFrameValidator, wire.Bind(
new(consensus.GlobalFrameValidator),
new(consensus2.GlobalFrameValidator),
new(*validator.BLSGlobalFrameValidator),
), validator.NewBLSAppFrameValidator, wire.Bind(
new(consensus.AppFrameValidator),
new(consensus2.AppFrameValidator),
new(*validator.BLSAppFrameValidator),
), provideDifficultyAnchorFrameNumber,
provideDifficultyAnchorParentTime,
provideDifficultyAnchorDifficulty, difficulty.NewAsertDifficultyAdjuster, wire.Bind(
new(consensus.DifficultyAdjuster),
new(consensus2.DifficultyAdjuster),
new(*difficulty.AsertDifficultyAdjuster),
), reward.NewOptRewardIssuance, wire.Bind(
new(consensus.RewardIssuance),
new(consensus2.RewardIssuance),
new(*reward.OptimizedProofOfMeaningfulWorkRewardIssuance),
),
)
@ -348,8 +355,8 @@ func NewDataWorkerNode(
func provideDataWorkerIPC(
rpcMultiaddr string, config2 *config.Config,
signerRegistry consensus.SignerRegistry,
proverRegistry consensus.ProverRegistry,
signerRegistry consensus2.SignerRegistry,
proverRegistry consensus2.ProverRegistry,
appConsensusEngineFactory *app.AppConsensusEngineFactory,
peerInfoManager p2p2.PeerInfoManager,
frameProver crypto.FrameProver,

View File

@ -356,11 +356,6 @@ func NewAppConsensusEngine(
ps,
)
// Initialize execution engines
if err := engine.executionManager.InitializeEngines(); err != nil {
return nil, errors.Wrap(err, "failed to initialize execution engines")
}
appTimeReel.SetMaterializeFunc(engine.materialize)
appTimeReel.SetRevertFunc(engine.revert)

View File

@ -140,7 +140,7 @@ func (e *AppConsensusEngine) handleConsensusMessage(message *pb.Message) {
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.AppShardFrameType:
case protobufs.AppShardProposalType:
e.handleProposal(message)
case protobufs.ProposalVoteType:
@ -426,15 +426,16 @@ func (e *AppConsensusEngine) handleProposal(message *pb.Message) {
)
defer timer.ObserveDuration()
frame := &protobufs.AppShardFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal frame", zap.Error(err))
proposal := &protobufs.AppShardProposal{}
if err := proposal.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal proposal", zap.Error(err))
proposalProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
if frame.Header != nil && frame.Header.Prover != nil {
valid, err := e.frameValidator.Validate(frame)
if proposal.State != nil && proposal.State.Header != nil &&
proposal.State.Header.Prover != nil {
valid, err := e.frameValidator.Validate(proposal.State)
if !valid || err != nil {
e.logger.Error("received invalid frame", zap.Error(err))
proposalProcessedTotal.WithLabelValues(
@ -444,13 +445,29 @@ func (e *AppConsensusEngine) handleProposal(message *pb.Message) {
return
}
frameIDBI, _ := poseidon.HashBytes(frame.Header.Output)
frameIDBI, _ := poseidon.HashBytes(proposal.State.Header.Output)
frameID := frameIDBI.FillBytes(make([]byte, 32))
e.frameStoreMu.Lock()
e.frameStore[string(frameID)] = frame.Clone().(*protobufs.AppShardFrame)
e.frameStore[string(frameID)] =
proposal.State.Clone().(*protobufs.AppShardFrame)
e.frameStoreMu.Unlock()
e.consensusParticipant.SubmitProposal()
e.consensusParticipant.SubmitProposal(
&models.SignedProposal[*protobufs.AppShardFrame, *protobufs.ProposalVote]{
Proposal: models.Proposal[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
Rank: proposal.State.GetRank(),
Identifier: proposal.State.Identity(),
ProposerID: proposal.Vote.Identity(),
ParentQuorumCertificate: proposal.ParentQuorumCertificate,
Timestamp: proposal.State.GetTimestamp(),
State: &proposal.State,
},
PreviousRankTimeoutCertificate: proposal.PriorRankTimeoutCertificate,
},
Vote: &proposal.Vote,
},
)
proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
}
}
@ -478,15 +495,7 @@ func (e *AppConsensusEngine) handleVote(message *pb.Message) {
return
}
if err := e.stateMachine.ReceiveVote(
PeerID{ID: vote.Proposer},
PeerID{ID: vote.PublicKeySignatureBls48581.Address},
&vote,
); err != nil {
e.logger.Error("could not receive vote", zap.Error(err))
voteProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
e.voteAggregator.AddVote(&vote)
voteProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
}

View File

@ -33,42 +33,42 @@ func (e *AppConsensusEngine) validateConsensusMessage(
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
switch typePrefix {
case protobufs.AppShardFrameType:
case protobufs.AppShardProposalType:
timer := prometheus.NewTimer(
proposalValidationDuration.WithLabelValues(e.appAddressHex),
)
defer timer.ObserveDuration()
frame := &protobufs.AppShardFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
proposal := &protobufs.AppShardProposal{}
if err := proposal.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal frame", zap.Error(err))
proposalValidationTotal.WithLabelValues(e.appAddressHex, "reject").Inc()
return p2p.ValidationResultReject
}
if frame.Header == nil {
e.logger.Debug("frame has no header")
if err := proposal.Validate(); err != nil {
e.logger.Error("invalid proposal", zap.Error(err))
proposalValidationTotal.WithLabelValues(e.appAddressHex, "reject").Inc()
return p2p.ValidationResultReject
}
if !bytes.Equal(frame.Header.Address, e.appAddress) {
if !bytes.Equal(proposal.State.Header.Address, e.appAddress) {
proposalValidationTotal.WithLabelValues(e.appAddressHex, "ignore").Inc()
return p2p.ValidationResultIgnore
}
if frametime.AppFrameSince(frame) > 20*time.Second {
if frametime.AppFrameSince(proposal.State) > 20*time.Second {
proposalValidationTotal.WithLabelValues(e.appAddressHex, "ignore").Inc()
return p2p.ValidationResultIgnore
}
if frame.Header.PublicKeySignatureBls48581 != nil {
if proposal.State.Header.PublicKeySignatureBls48581 != nil {
e.logger.Debug("frame validation has signature")
proposalValidationTotal.WithLabelValues(e.appAddressHex, "reject").Inc()
return p2p.ValidationResultReject
}
valid, err := e.frameValidator.Validate(frame)
valid, err := e.frameValidator.Validate(proposal.State)
if err != nil {
e.logger.Debug("frame validation error", zap.Error(err))
proposalValidationTotal.WithLabelValues(e.appAddressHex, "reject").Inc()

View File

@ -119,7 +119,13 @@ func (e *GlobalConsensusEngine) LeaderForRank(
}
}
selector := found.Identity()
var selector models.Identity
if found == nil {
selector = models.Identity(make([]byte, 32))
} else {
selector = found.Identity()
}
prover, err := e.proverRegistry.GetNextProver([32]byte([]byte(selector)), nil)
if err != nil {
return "", errors.Wrap(err, "leader for rank")

View File

@ -71,7 +71,7 @@ func (p *GlobalLeaderProvider) ProveNextState(
timer := prometheus.NewTimer(frameProvingDuration)
defer timer.ObserveDuration()
prior, err := p.engine.globalTimeReel.GetFrame(priorState)
prior, err := p.engine.clockStore.GetLatestGlobalClockFrame()
if err != nil {
return nil, errors.Wrap(err, "prove next state")
}
@ -81,6 +81,13 @@ func (p *GlobalLeaderProvider) ProveNextState(
return nil, errors.Wrap(errors.New("nil prior frame"), "prove next state")
}
if prior.Identity() != priorState {
return nil, errors.Wrap(
errors.New("missing prior frame"),
"prove next state",
)
}
// Get prover index
provers, err := p.engine.proverRegistry.GetActiveProvers(nil)
if err != nil {
@ -150,6 +157,7 @@ func (p *GlobalLeaderProvider) ProveNextState(
frameProvingTotal.WithLabelValues("error").Inc()
return nil, errors.Wrap(err, "prove next state")
}
newHeader.Rank = rank
// Convert collected messages to MessageBundles
requests := make(

View File

@ -80,10 +80,7 @@ func (p *GlobalSyncProvider) Synchronize(
}
if !hasFrame {
p.engine.logger.Info("initializing genesis")
genesis := p.engine.initializeGenesis()
dataCh <- &genesis
errCh <- nil
errCh <- errors.New("no frame")
return
}

View File

@ -163,6 +163,7 @@ func (f *ConsensusEngineFactory) CreateGlobalConsensusEngine(
f.inboxStore,
f.hypergraphStore,
f.shardsStore,
f.consensusStore,
f.workerStore,
f.encryptedChannel,
f.bulletproofProver,

View File

@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mr-tron/base58"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
globalintrinsics "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global/compat"
@ -54,7 +55,10 @@ func (e *GlobalConsensusEngine) getMainnetGenesisJSON() *GenesisJson {
}
// TODO[2.1.1+]: Refactor out direct hypergraph access
func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
func (e *GlobalConsensusEngine) initializeGenesis() (
*protobufs.GlobalFrame,
*protobufs.QuorumCertificate,
) {
e.logger.Info("initializing genesis frame for global consensus")
var genesisFrame *protobufs.GlobalFrame
@ -63,7 +67,7 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
if e.config.P2P.Network == 0 {
genesisData := e.getMainnetGenesisJSON()
if genesisData == nil {
return nil
return nil, nil
}
// Decode base64 encoded fields
@ -72,13 +76,13 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
)
if err != nil {
e.logger.Error("failed to decode parent selector", zap.Error(err))
return nil
return nil, nil
}
output, err := base64.StdEncoding.DecodeString(genesisData.Output)
if err != nil {
e.logger.Error("failed to decode output", zap.Error(err))
return nil
return nil, nil
}
// Create genesis header with actual data
@ -124,7 +128,7 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
zap.String("value", base64Value),
zap.Error(err),
)
return nil
return nil, nil
}
l1 := up2p.GetBloomFilterIndices(keyBytes, 256, 3)
@ -148,7 +152,7 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
zap.Error(err),
)
txn.Abort()
return nil
return nil, nil
}
}
}
@ -169,19 +173,19 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
err = e.establishMainnetGenesisProvers(state, genesisData)
if err != nil {
e.logger.Error("failed to establish provers", zap.Error(err))
return nil
return nil, nil
}
err = state.Commit()
if err != nil {
e.logger.Error("failed to commit", zap.Error(err))
return nil
return nil, nil
}
roots, err := e.hypergraph.Commit(0)
if err != nil {
e.logger.Error("could not commit", zap.Error(err))
return nil
return nil, nil
}
proverRoots := roots[tries.ShardKey{
@ -224,7 +228,7 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
"failed to place app shard",
zap.Error(err),
)
return nil
return nil, nil
}
l1 := up2p.GetBloomFilterIndices(token.QUIL_TOKEN_ADDRESS, 256, 3)
@ -240,7 +244,7 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
zap.Error(err),
)
txn.Abort()
return nil
return nil, nil
}
if err = txn.Commit(); err != nil {
e.logger.Error(
@ -248,7 +252,7 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
zap.Error(err),
)
txn.Abort()
return nil
return nil, nil
}
}
@ -260,18 +264,61 @@ func (e *GlobalConsensusEngine) initializeGenesis() *protobufs.GlobalFrame {
e.frameStoreMu.Unlock()
// Add to time reel
if err := e.globalTimeReel.Insert(e.ctx, genesisFrame); err != nil {
e.logger.Error("failed to add genesis frame to time reel", zap.Error(err))
// Clean up on error
e.frameStoreMu.Lock()
delete(e.frameStore, string(frameID))
e.frameStoreMu.Unlock()
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
e.ctx.Throw(err)
return nil, nil
}
genesisQC := &protobufs.QuorumCertificate{
Rank: 0,
Filter: []byte{},
FrameNumber: genesisFrame.Header.FrameNumber,
Selector: []byte(genesisFrame.Identity()),
Timestamp: 0,
AggregateSignature: &protobufs.BLS48581AggregateSignature{},
}
if err := e.clockStore.PutQuorumCertificate(genesisQC, txn); err != nil {
txn.Abort()
e.logger.Error("could not add quorum certificate", zap.Error(err))
e.ctx.Throw(err)
return nil, nil
}
if err := txn.Commit(); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
e.ctx.Throw(err)
return nil, nil
}
if err = e.consensusStore.PutLivenessState(
&models.LivenessState{
CurrentRank: 1,
LatestQuorumCertificate: genesisQC,
},
); err != nil {
e.logger.Error("could not add liveness state", zap.Error(err))
e.ctx.Throw(err)
return nil, nil
}
if err = e.consensusStore.PutConsensusState(
&models.ConsensusState[*protobufs.ProposalVote]{
FinalizedRank: 0,
LatestAcknowledgedRank: 0,
},
); err != nil {
e.logger.Error("could not add consensus state", zap.Error(err))
e.ctx.Throw(err)
return nil, nil
}
e.proverRegistry.Refresh()
e.logger.Info("initialized genesis frame for global consensus")
return genesisFrame
return genesisFrame, genesisQC
}
// createStubGenesis creates a stub genesis frame for non-mainnet networks
@ -468,12 +515,21 @@ func (e *GlobalConsensusEngine) createStubGenesis() *protobufs.GlobalFrame {
e.frameStoreMu.Unlock()
// Add to time reel
if err := e.globalTimeReel.Insert(e.ctx, genesisFrame); err != nil {
e.logger.Error("failed to add genesis frame to time reel", zap.Error(err))
// Clean up on error
e.frameStoreMu.Lock()
delete(e.frameStore, string(frameID))
e.frameStoreMu.Unlock()
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
if err := e.clockStore.PutGlobalClockFrame(genesisFrame, txn); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
e.ctx.Throw(err)
return nil
}
if err := txn.Commit(); err != nil {
txn.Abort()
e.logger.Error("could not add frame", zap.Error(err))
e.ctx.Throw(err)
return nil
}
return genesisFrame

View File

@ -227,6 +227,7 @@ func NewGlobalConsensusEngine(
inboxStore store.InboxStore,
hypergraphStore store.HypergraphStore,
shardsStore store.ShardsStore,
consensusStore consensus.ConsensusStore[*protobufs.ProposalVote],
workerStore store.WorkerStore,
encryptedChannel channel.EncryptedChannel,
bulletproofProver crypto.BulletproofProver,
@ -245,6 +246,7 @@ func NewGlobalConsensusEngine(
keyStore: keyStore,
clockStore: clockStore,
shardsStore: shardsStore,
consensusStore: consensusStore,
frameProver: frameProver,
inclusionProver: inclusionProver,
signerRegistry: signerRegistry,
@ -421,11 +423,6 @@ func NewGlobalConsensusEngine(
}
engine.executionManager = executionManager
// Initialize execution engines
if err := engine.executionManager.InitializeEngines(); err != nil {
return nil, errors.Wrap(err, "new global consensus engine")
}
// Initialize metrics
engineState.Set(0) // EngineStateStopped
currentDifficulty.Set(float64(config.Engine.Difficulty))
@ -483,14 +480,34 @@ func NewGlobalConsensusEngine(
componentBuilder.AddWorker(engine.eventDistributor.Start)
componentBuilder.AddWorker(engine.globalTimeReel.Start)
frame, err := engine.clockStore.GetLatestGlobalClockFrame()
latest, err := engine.clockStore.GetLatestCertifiedGlobalState()
var state *models.CertifiedState[*protobufs.GlobalFrame]
if err != nil {
frame = engine.initializeGenesis()
}
var initialState **protobufs.GlobalFrame = nil
if frame != nil {
initialState = &frame
frame, qc := engine.initializeGenesis()
state = &models.CertifiedState[*protobufs.GlobalFrame]{
State: &models.State[*protobufs.GlobalFrame]{
Rank: 0,
Identifier: frame.Identity(),
State: &frame,
},
CertifyingQuorumCertificate: qc,
}
} else {
qc, err := engine.clockStore.GetLatestQuorumCertificate(nil)
if err != nil {
panic(err)
}
state = &models.CertifiedState[*protobufs.GlobalFrame]{
State: &models.State[*protobufs.GlobalFrame]{
Rank: latest.GetRank(),
Identifier: latest.State.Identity(),
ProposerID: qc.Source(),
ParentQuorumCertificate: latest.ParentQuorumCertificate,
Timestamp: latest.State.GetTimestamp(),
State: &latest.State,
},
CertifyingQuorumCertificate: qc,
}
}
engine.voteAggregator, err = voting.NewGlobalVoteAggregator[GlobalPeerID](
@ -499,7 +516,7 @@ func NewGlobalConsensusEngine(
voteAggregationDistributor,
engine.signatureAggregator,
engine.votingProvider,
(*initialState).GetRank(),
state.Rank(),
)
if err != nil {
return nil, err
@ -510,7 +527,7 @@ func NewGlobalConsensusEngine(
engine,
engine.signatureAggregator,
timeoutAggregationDistributor,
(*initialState).GetRank(),
state.Rank(),
)
if engine.config.P2P.Network == 99 || engine.config.Engine.ArchiveMode {
@ -518,7 +535,7 @@ func NewGlobalConsensusEngine(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
if err := engine.startConsensus(initialState, ctx, ready); err != nil {
if err := engine.startConsensus(state, ctx, ready); err != nil {
ctx.Throw(err)
return
}
@ -689,6 +706,7 @@ func NewGlobalConsensusEngine(
}
engine.ComponentManager = componentBuilder.Build()
return engine, nil
}
@ -793,11 +811,6 @@ func (e *GlobalConsensusEngine) getAddressFromPublicKey(
func (e *GlobalConsensusEngine) Stop(force bool) <-chan error {
errChan := make(chan error, 1)
// Cancel context
if e.cancel != nil {
e.cancel()
}
// Unsubscribe from pubsub
if e.config.Engine.ArchiveMode || e.config.P2P.Network == 99 {
e.pubsub.Unsubscribe(GLOBAL_CONSENSUS_BITMASK, false)
@ -824,7 +837,7 @@ func (e *GlobalConsensusEngine) Stop(force bool) <-chan error {
e.pubsub.UnregisterValidator(GLOBAL_ALERT_BITMASK)
select {
case <-e.ctx.Done():
case <-e.Done():
// Clean shutdown
case <-time.After(30 * time.Second):
if !force {
@ -2328,19 +2341,15 @@ func (e *GlobalConsensusEngine) DecideWorkerJoins(
}
func (e *GlobalConsensusEngine) startConsensus(
initialFrame **protobufs.GlobalFrame,
trustedRoot *models.CertifiedState[*protobufs.GlobalFrame],
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) error {
trustedRoot := &models.CertifiedState[*protobufs.GlobalFrame]{
State: &models.State[*protobufs.GlobalFrame]{
State: initialFrame,
},
}
notifier := pubsub.NewDistributor[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
]()
notifier.AddConsumer(e)
forks, err := forks.NewForks(trustedRoot, e, notifier)
if err != nil {
@ -2458,12 +2467,65 @@ func (e *GlobalConsensusEngine) OnOwnProposal(
],
targetPublicationTime time.Time,
) {
var priorTC *protobufs.TimeoutCertificate
if proposal.PreviousRankTimeoutCertificate != nil {
priorTC =
proposal.PreviousRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
}
pbProposal := &protobufs.GlobalProposal{
State: *proposal.State.State,
ParentQuorumCertificate: proposal.Proposal.State.ParentQuorumCertificate.(*protobufs.QuorumCertificate),
PriorRankTimeoutCertificate: priorTC,
Vote: *proposal.Vote,
}
data, err := pbProposal.ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize proposal", zap.Error(err))
return
}
e.consensusParticipant.SubmitProposal(proposal)
if err := e.pubsub.PublishToBitmask(
GLOBAL_CONSENSUS_BITMASK,
data,
); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
}
// OnOwnTimeout implements consensus.Consumer.
//
// Broadcasts this node's own timeout: the consensus-layer timeout state is
// converted to its protobuf wire form, handed to the local timeout
// aggregator, and published on the global consensus bitmask.
func (e *GlobalConsensusEngine) OnOwnTimeout(
	timeout *models.TimeoutState[*protobufs.ProposalVote],
) {
	// The prior-rank timeout certificate is optional; narrow it only when set.
	var previousTC *protobufs.TimeoutCertificate
	if tc := timeout.PriorRankTimeoutCertificate; tc != nil {
		previousTC = tc.(*protobufs.TimeoutCertificate)
	}
	wire := &protobufs.TimeoutState{
		LatestQuorumCertificate:     timeout.LatestQuorumCertificate.(*protobufs.QuorumCertificate),
		PriorRankTimeoutCertificate: previousTC,
		Vote:                        *timeout.Vote,
		TimeoutTick:                 timeout.TimeoutTick,
		Timestamp:                   uint64(time.Now().UnixMilli()),
	}
	data, err := wire.ToCanonicalBytes()
	if err != nil {
		e.logger.Error("could not serialize timeout", zap.Error(err))
		return
	}
	// Deliver locally first, then gossip the serialized timeout.
	e.timeoutAggregator.AddTimeout(timeout)
	err = e.pubsub.PublishToBitmask(GLOBAL_CONSENSUS_BITMASK, data)
	if err != nil {
		e.logger.Error("could not publish", zap.Error(err))
	}
}
// OnOwnVote implements consensus.Consumer.
@ -2471,6 +2533,20 @@ func (e *GlobalConsensusEngine) OnOwnVote(
vote **protobufs.ProposalVote,
recipientID models.Identity,
) {
data, err := (*vote).ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize timeout", zap.Error(err))
return
}
e.voteAggregator.AddVote(vote)
if err := e.pubsub.PublishToBitmask(
GLOBAL_CONSENSUS_BITMASK,
data,
); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
}
// OnPartialTimeoutCertificate implements consensus.Consumer.

View File

@ -33,9 +33,9 @@ import (
"source.quilibrium.com/quilibrium/monorepo/bulletproofs"
"source.quilibrium.com/quilibrium/monorepo/channel"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/compiler"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/difficulty"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/events"
@ -61,16 +61,6 @@ import (
"source.quilibrium.com/quilibrium/monorepo/verenc"
)
// testTransitionListener adapts a plain callback into a state-transition
// listener for tests.
type testTransitionListener struct {
	// onTransition is invoked for every observed transition; may be nil.
	onTransition func(from, to consensus.State, event consensus.Event)
}

// OnTransition forwards the transition to the configured callback, if any.
func (l *testTransitionListener) OnTransition(from, to consensus.State, event consensus.Event) {
	if l.onTransition == nil {
		return
	}
	l.onTransition(from, to, event)
}
// mockIntegrationPubSub is a pubsub mock for integration testing
type mockIntegrationPubSub struct {
mock.Mock
@ -548,6 +538,7 @@ func createIntegrationTestGlobalConsensusEngineWithHypergraphAndKey(
nil, // inboxStore
nil, // hypergraphStore
store.NewPebbleShardsStore(pebbleDB, logger),
store.NewPebbleConsensusStore(pebbleDB, logger),
store.NewPebbleWorkerStore(pebbleDB, logger),
channel.NewDoubleRatchetEncryptedChannel(), // encryptedChannel
&bulletproofs.Decaf448BulletproofProver{}, // bulletproofProver
@ -595,8 +586,9 @@ func TestGlobalConsensusEngine_Integration_BasicFrameProgression(t *testing.T) {
}
// Start the engine
quit := make(chan struct{})
errChan := engine.Start(quit)
ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background())
err := engine.Start(ctx)
require.NoError(t, err)
// Check for startup errors
select {
@ -607,7 +599,7 @@ func TestGlobalConsensusEngine_Integration_BasicFrameProgression(t *testing.T) {
}
// Wait for state transitions
time.Sleep(2 * time.Second)
time.Sleep(20 * time.Second)
// Verify engine is in an active state
state := engine.GetState()
@ -626,7 +618,6 @@ func TestGlobalConsensusEngine_Integration_BasicFrameProgression(t *testing.T) {
t.Logf("Published %d frames", frameCount)
// Stop the engine
close(quit)
<-engine.Stop(false)
}
@ -641,19 +632,10 @@ func TestGlobalConsensusEngine_Integration_StateTransitions(t *testing.T) {
transitions := make([]string, 0)
var mu sync.Mutex
listener := &testTransitionListener{
onTransition: func(from, to consensus.State, event consensus.Event) {
mu.Lock()
transitions = append(transitions, fmt.Sprintf("%s->%s", from, to))
mu.Unlock()
t.Logf("State transition: %s -> %s (event: %s)", from, to, event)
},
}
// Start the engine
quit := make(chan struct{})
errChan := engine.Start(quit)
engine.stateMachine.AddListener(listener)
ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background())
err := engine.Start(ctx)
require.NoError(t, err)
// Check for startup errors
select {
@ -674,7 +656,6 @@ func TestGlobalConsensusEngine_Integration_StateTransitions(t *testing.T) {
assert.Greater(t, transitionCount, 0, "Expected at least one state transition")
// Stop the engine
close(quit)
<-engine.Stop(false)
}
@ -790,9 +771,9 @@ func TestGlobalConsensusEngine_Integration_MultiNodeConsensus(t *testing.T) {
mu.Lock()
defer mu.Unlock()
switch typePrefix {
case protobufs.GlobalFrameType:
case protobufs.GlobalProposalType:
proposalCount[nodeIdx]++
case protobufs.FrameVoteType:
case protobufs.ProposalVoteType:
voteCount[nodeIdx]++
case protobufs.ProverLivenessCheckType:
livenessCount[nodeIdx]++
@ -803,10 +784,10 @@ func TestGlobalConsensusEngine_Integration_MultiNodeConsensus(t *testing.T) {
}
// Start all engines
quits := make([]chan struct{}, 6)
for i := 0; i < 6; i++ {
quits[i] = make(chan struct{})
errChan := engines[i].Start(quits[i])
ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background())
err := engines[i].Start(ctx)
require.NoError(t, err)
// Check for startup errors
select {
@ -888,7 +869,6 @@ loop:
// Stop all engines
for i := 0; i < 6; i++ {
close(quits[i])
<-engines[i].Stop(false)
}
}
@ -947,10 +927,12 @@ func TestGlobalConsensusEngine_Integration_ShardCoverage(t *testing.T) {
engine.eventDistributor = eventDistributor
// Start the event distributor
engine.Start(make(chan struct{}))
ctx, _, _ := lifecycle.WithSignallerAndCancel(context.Background())
err := engine.Start(ctx)
require.NoError(t, err)
// Run shard coverage check
err := engine.checkShardCoverage(1)
err = engine.checkShardCoverage(1)
require.NoError(t, err)
// Wait for event processing and possible new app shard head
@ -972,7 +954,7 @@ func TestGlobalConsensusEngine_Integration_ShardCoverage(t *testing.T) {
require.False(t, newHeadAfter)
// Stop the event distributor
eventDistributor.Stop()
engine.Stop(false)
}
// TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying tests that engines
@ -1094,6 +1076,7 @@ func TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying(t *testing.
nil, // inboxStore
nil, // hypergraphStore
store.NewPebbleShardsStore(pebbleDB, logger),
store.NewPebbleConsensusStore(pebbleDB, logger),
store.NewPebbleWorkerStore(pebbleDB, logger),
channel.NewDoubleRatchetEncryptedChannel(),
&bulletproofs.Decaf448BulletproofProver{}, // bulletproofProver
@ -1120,7 +1103,9 @@ func TestGlobalConsensusEngine_Integration_NoProversStaysInVerifying(t *testing.
// Start all engines
for i := 0; i < numNodes; i++ {
errChan := engines[i].Start(quits[i])
ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background())
err := engines[i].Start(ctx)
require.NoError(t, err)
select {
case err := <-errChan:
require.NoError(t, err)
@ -1190,8 +1175,9 @@ func TestGlobalConsensusEngine_Integration_AlertStopsProgression(t *testing.T) {
}
// Start the engine
quit := make(chan struct{})
errChan := engine.Start(quit)
ctx, _, errChan := lifecycle.WithSignallerAndCancel(context.Background())
err := engine.Start(ctx)
require.NoError(t, err)
// Check for startup errors
select {
@ -1250,7 +1236,6 @@ func TestGlobalConsensusEngine_Integration_AlertStopsProgression(t *testing.T) {
require.Equal(t, 0, afterAlertCount)
// Stop the engine
close(quit)
<-engine.Stop(false)
}

View File

@ -12,11 +12,11 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
var keyRegistryDomain = []byte("KEY_REGISTRY")
@ -136,7 +136,7 @@ func (e *GlobalConsensusEngine) handleGlobalConsensusMessage(
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.GlobalFrameType:
case protobufs.GlobalProposalType:
e.handleProposal(message)
case protobufs.ProposalVoteType:
@ -175,11 +175,11 @@ func (e *GlobalConsensusEngine) handleShardConsensusMessage(
case protobufs.AppShardFrameType:
e.handleShardProposal(message)
case protobufs.ProverLivenessCheckType:
e.handleShardLivenessCheck(message)
case protobufs.ProposalVoteType:
e.handleShardVote(message)
case protobufs.TimeoutStateType:
// e.handleShardTimeout(message)
}
}
@ -802,37 +802,35 @@ func (e *GlobalConsensusEngine) handleProposal(message *pb.Message) {
timer := prometheus.NewTimer(proposalProcessingDuration)
defer timer.ObserveDuration()
frame := &protobufs.GlobalFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal frame", zap.Error(err))
proposal := &protobufs.GlobalProposal{}
if err := proposal.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal proposal", zap.Error(err))
proposalProcessedTotal.WithLabelValues("error").Inc()
return
}
frameIDBI, _ := poseidon.HashBytes(frame.Header.Output)
frameIDBI, _ := poseidon.HashBytes(proposal.State.Header.Output)
frameID := frameIDBI.FillBytes(make([]byte, 32))
e.frameStoreMu.Lock()
e.frameStore[string(frameID)] = frame
e.frameStore[string(frameID)] = proposal.State
e.frameStoreMu.Unlock()
// For proposals, we need to identify the proposer differently
// The proposer's address should be determinable from the frame header
proposerAddress := e.getAddressFromPublicKey(
frame.Header.PublicKeySignatureBls48581.PublicKey.KeyValue,
)
if len(proposerAddress) > 0 {
clonedFrame := frame.Clone().(*protobufs.GlobalFrame)
if err := e.stateMachine.ReceiveProposal(
GlobalPeerID{
ID: proposerAddress,
e.consensusParticipant.SubmitProposal(
&models.SignedProposal[*protobufs.GlobalFrame, *protobufs.ProposalVote]{
Proposal: models.Proposal[*protobufs.GlobalFrame]{
State: &models.State[*protobufs.GlobalFrame]{
Rank: proposal.State.GetRank(),
Identifier: proposal.State.Identity(),
ProposerID: proposal.Vote.Identity(),
ParentQuorumCertificate: proposal.ParentQuorumCertificate,
Timestamp: proposal.State.GetTimestamp(),
State: &proposal.State,
},
PreviousRankTimeoutCertificate: proposal.PriorRankTimeoutCertificate,
},
&clonedFrame,
); err != nil {
e.logger.Error("could not receive proposal", zap.Error(err))
proposalProcessedTotal.WithLabelValues("error").Inc()
return
}
}
Vote: &proposal.Vote,
},
)
// Success metric recorded at the end of processing
proposalProcessedTotal.WithLabelValues("success").Inc()
@ -905,7 +903,7 @@ func (e *GlobalConsensusEngine) handleVote(message *pb.Message) {
e.getAddressFromPublicKey(
frame.Header.PublicKeySignatureBls48581.PublicKey.KeyValue,
),
vote.Proposer,
[]byte(vote.Identity()),
) {
proposalFrame = frame
break
@ -917,7 +915,7 @@ func (e *GlobalConsensusEngine) handleVote(message *pb.Message) {
e.logger.Warn(
"vote for unknown proposal",
zap.Uint64("frame_number", vote.FrameNumber),
zap.String("proposer", hex.EncodeToString(vote.Proposer)),
zap.String("proposer", hex.EncodeToString([]byte(vote.Identity()))),
)
voteProcessedTotal.WithLabelValues("error").Inc()
return
@ -948,16 +946,7 @@ func (e *GlobalConsensusEngine) handleVote(message *pb.Message) {
return
}
// Signature is valid, process the vote
if err := e.stateMachine.ReceiveVote(
GlobalPeerID{ID: vote.Proposer},
GlobalPeerID{ID: vote.PublicKeySignatureBls48581.Address},
&vote,
); err != nil {
e.logger.Error("could not receive vote", zap.Error(err))
voteProcessedTotal.WithLabelValues("error").Inc()
return
}
e.voteAggregator.AddVote(&vote)
voteProcessedTotal.WithLabelValues("success").Inc()
}
@ -966,56 +955,27 @@ func (e *GlobalConsensusEngine) handleTimeoutState(message *pb.Message) {
timer := prometheus.NewTimer(voteProcessingDuration)
defer timer.ObserveDuration()
state := &protobufs.TimeoutState{}
if err := state.FromCanonicalBytes(message.Data); err != nil {
timeoutState := &protobufs.TimeoutState{}
if err := timeoutState.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal timeout", zap.Error(err))
voteProcessedTotal.WithLabelValues("error").Inc()
return
}
// Validate the vote structure
if err := state.Validate(); err != nil {
if err := timeoutState.Validate(); err != nil {
e.logger.Debug("invalid timeout", zap.Error(err))
voteProcessedTotal.WithLabelValues("error").Inc()
return
}
// Validate the voter's signature
proverSet, err := e.proverRegistry.GetActiveProvers(nil)
if err != nil {
e.logger.Error("could not get active provers", zap.Error(err))
voteProcessedTotal.WithLabelValues("error").Inc()
return
}
// Find the voter's public key
var voterPublicKey []byte = nil
for _, prover := range proverSet {
if bytes.Equal(
prover.Address,
state.Vote.PublicKeySignatureBls48581.Address,
) {
voterPublicKey = prover.PublicKey
break
}
}
if voterPublicKey == nil {
e.logger.Warn(
"invalid vote - voter not found",
zap.String(
"voter",
hex.EncodeToString(
state.Vote.PublicKeySignatureBls48581.Address,
),
),
)
voteProcessedTotal.WithLabelValues("error").Inc()
return
}
// Signature is valid, process the vote
if err := e.timeoutCollectorDistributor.OnTimeoutProcessed(state)
e.timeoutAggregator.AddTimeout(&models.TimeoutState[*protobufs.ProposalVote]{
Rank: timeoutState.Vote.Rank,
LatestQuorumCertificate: timeoutState.LatestQuorumCertificate,
PriorRankTimeoutCertificate: timeoutState.PriorRankTimeoutCertificate,
Vote: &timeoutState.Vote,
TimeoutTick: timeoutState.TimeoutTick,
})
voteProcessedTotal.WithLabelValues("success").Inc()
}
@ -1087,139 +1047,11 @@ func (e *GlobalConsensusEngine) handleShardProposal(message *pb.Message) {
shardProposalProcessedTotal.WithLabelValues("success").Inc()
}
// handleShardLivenessCheck processes a prover liveness check received on the
// shard consensus topic. It decodes and validates the message, confirms the
// sender is an active prover for the check's filter by verifying its
// BLS48-581 signature, and, when the check carries commitment data, records
// the committed transactions into the per-frame transaction lock map.
func (e *GlobalConsensusEngine) handleShardLivenessCheck(message *pb.Message) {
	// Observe processing latency for this handler.
	timer := prometheus.NewTimer(shardLivenessCheckProcessingDuration)
	defer timer.ObserveDuration()
	livenessCheck := &protobufs.ProverLivenessCheck{}
	if err := livenessCheck.FromCanonicalBytes(message.Data); err != nil {
		e.logger.Debug("failed to unmarshal liveness check", zap.Error(err))
		shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
		return
	}
	// Validate the liveness check structure
	if err := livenessCheck.Validate(); err != nil {
		e.logger.Debug("invalid liveness check", zap.Error(err))
		shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
		return
	}
	// Only provers active for this check's filter may attest liveness.
	proverSet, err := e.proverRegistry.GetActiveProvers(livenessCheck.Filter)
	if err != nil {
		e.logger.Error("could not receive liveness check", zap.Error(err))
		shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
		return
	}
	// Locate the sender in the active prover set and verify its signature.
	// found holds the sender's public key once verification succeeds.
	var found []byte = nil
	for _, prover := range proverSet {
		if bytes.Equal(
			prover.Address,
			livenessCheck.PublicKeySignatureBls48581.Address,
		) {
			lcBytes, err := livenessCheck.ConstructSignaturePayload()
			if err != nil {
				e.logger.Error(
					"could not construct signature message for liveness check",
					zap.Error(err),
				)
				// NOTE(review): this break leaves found == nil, so the
				// "invalid liveness check" branch below increments the error
				// counter a second time — confirm the double count is intended.
				shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
				break
			}
			valid, err := e.keyManager.ValidateSignature(
				crypto.KeyTypeBLS48581G1,
				prover.PublicKey,
				lcBytes,
				livenessCheck.PublicKeySignatureBls48581.Signature,
				livenessCheck.GetSignatureDomain(),
			)
			if err != nil || !valid {
				e.logger.Error(
					"could not validate signature for liveness check",
					zap.Error(err),
				)
				// NOTE(review): same double error-count as above via the
				// found == nil branch.
				shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
				break
			}
			found = prover.PublicKey
			break
		}
	}
	if found == nil {
		// Sender is not an active prover, or its signature failed to verify.
		e.logger.Warn(
			"invalid liveness check",
			zap.String(
				"prover",
				hex.EncodeToString(
					livenessCheck.PublicKeySignatureBls48581.Address,
				),
			),
		)
		shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
		return
	}
	// A commitment hash longer than 32 bytes carries a serialized commitment
	// trie after the first 32 bytes (presumably the leading 32 bytes are the
	// root hash — TODO confirm). Record each committed transaction under the
	// check's frame number and shard filter.
	if len(livenessCheck.CommitmentHash) > 32 {
		e.txLockMu.Lock()
		if _, ok := e.txLockMap[livenessCheck.FrameNumber]; !ok {
			e.txLockMap[livenessCheck.FrameNumber] = make(
				map[string]map[string]*LockedTransaction,
			)
		}
		_, ok := e.txLockMap[livenessCheck.FrameNumber][string(livenessCheck.Filter)]
		if !ok {
			e.txLockMap[livenessCheck.FrameNumber][string(livenessCheck.Filter)] =
				make(map[string]*LockedTransaction)
		}
		filter := string(livenessCheck.Filter)
		commits, err := tries.DeserializeNonLazyTree(
			livenessCheck.CommitmentHash[32:],
		)
		if err != nil {
			// Unlock before returning; the deferred timer still fires.
			e.txLockMu.Unlock()
			e.logger.Error("could not deserialize commitment trie", zap.Error(err))
			shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
			return
		}
		leaves := tries.GetAllPreloadedLeaves(commits.Root)
		for _, leaf := range leaves {
			// Accumulate this prover's address onto any existing lock entry
			// for the same transaction hash.
			existing, ok := e.txLockMap[livenessCheck.FrameNumber][filter][string(leaf.Key)]
			prover := []byte{}
			if ok {
				prover = existing.Prover
			}
			prover = append(
				prover,
				livenessCheck.PublicKeySignatureBls48581.Address...,
			)
			// Each leaf value is split into 64-byte shard address chunks.
			e.txLockMap[livenessCheck.FrameNumber][filter][string(leaf.Key)] =
				&LockedTransaction{
					TransactionHash: leaf.Key,
					ShardAddresses:  slices.Collect(slices.Chunk(leaf.Value, 64)),
					Prover:          prover,
					Committed:       false,
					Filled:          false,
				}
		}
		e.txLockMu.Unlock()
	}
	shardLivenessCheckProcessedTotal.WithLabelValues("success").Inc()
}
func (e *GlobalConsensusEngine) handleShardVote(message *pb.Message) {
timer := prometheus.NewTimer(shardVoteProcessingDuration)
defer timer.ObserveDuration()
vote := &protobufs.FrameVote{}
vote := &protobufs.ProposalVote{}
if err := vote.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal vote", zap.Error(err))
shardVoteProcessedTotal.WithLabelValues("error").Inc()
@ -1274,7 +1106,7 @@ func (e *GlobalConsensusEngine) handleShardVote(message *pb.Message) {
}
e.appFrameStoreMu.Lock()
frameID := fmt.Sprintf("%x%d", vote.Proposer, vote.FrameNumber)
frameID := fmt.Sprintf("%x%d", vote.Identity(), vote.FrameNumber)
proposalFrame := e.appFrameStore[frameID]
e.appFrameStoreMu.Unlock()
@ -1300,7 +1132,7 @@ func (e *GlobalConsensusEngine) handleShardVote(message *pb.Message) {
voterPublicKey,
signatureData,
vote.PublicKeySignatureBls48581.Signature,
[]byte("global"),
[]byte("appshard"),
)
if err != nil || !valid {

View File

@ -30,33 +30,25 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
switch typePrefix {
case protobufs.GlobalFrameType:
case protobufs.GlobalProposalType:
start := time.Now()
defer func() {
proposalValidationDuration.Observe(time.Since(start).Seconds())
}()
frame := &protobufs.GlobalFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
proposal := &protobufs.GlobalProposal{}
if err := proposal.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal frame", zap.Error(err))
proposalValidationTotal.WithLabelValues("reject").Inc()
return tp2p.ValidationResultReject
}
if frametime.GlobalFrameSince(frame) > 20*time.Second {
if frametime.GlobalFrameSince(proposal.State) > 20*time.Second {
proposalValidationTotal.WithLabelValues("reject").Inc()
return tp2p.ValidationResultIgnore
}
if frame.Header.PublicKeySignatureBls48581 == nil ||
frame.Header.PublicKeySignatureBls48581.PublicKey == nil ||
frame.Header.PublicKeySignatureBls48581.PublicKey.KeyValue == nil {
e.logger.Debug("global frame validation missing signature")
proposalValidationTotal.WithLabelValues("reject").Inc()
return tp2p.ValidationResultReject
}
valid, err := e.frameValidator.Validate(frame)
valid, err := e.frameValidator.Validate(proposal.State)
if err != nil {
e.logger.Debug("global frame validation error", zap.Error(err))
proposalValidationTotal.WithLabelValues("reject").Inc()
@ -104,7 +96,7 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
timeoutStateValidationDuration.Observe(time.Since(start).Seconds())
}()
timeoutState := &protobufs.QuorumCertificate{}
timeoutState := &protobufs.TimeoutState{}
if err := timeoutState.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal timeoutState", zap.Error(err))
timeoutStateValidationTotal.WithLabelValues("reject").Inc()

View File

@ -418,6 +418,7 @@ func createTestGlobalConsensusEngine(t *testing.T) (
nil,
nil,
nil,
nil,
&mockEncryptedChannel{},
&mocks.MockBulletproofProver{},
&mocks.MockVerifiableEncryptor{},

View File

@ -18,6 +18,7 @@ import (
health "google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/registration"
"source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
@ -276,7 +277,7 @@ func TestOnionGRPC_RealRelayAndKeys(t *testing.T) {
logger := zap.NewNop()
// 1) Spin up a real gRPC health server (ephemeral port)
lis, err := net.Listen("tcp", "127.0.0.1:8080")
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
s := grpc.NewServer()
hs := health.NewServer()
@ -345,8 +346,10 @@ func TestOnionGRPC_RealRelayAndKeys(t *testing.T) {
// 6) PeerInfoManager ordering (entry->middle->exit)
pm := p2p.NewInMemoryPeerInfoManager(logger)
pm.Start()
defer pm.Stop()
ctx, cancel, _ := lifecycle.WithSignallerAndCancel(context.Background())
go pm.Start(ctx, func() {})
time.Sleep(100 * time.Millisecond)
defer cancel()
pm.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relay1"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
pm.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relay2"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
pm.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relay3"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
@ -372,9 +375,9 @@ func TestOnionGRPC_RealRelayAndKeys(t *testing.T) {
)
// 8) Build a 3-hop circuit
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
circ, err := or.BuildCircuit(ctx, 3)
hctx, hcancel := context.WithTimeout(context.Background(), 5*time.Second)
defer hcancel()
circ, err := or.BuildCircuit(hctx, 3)
require.NoError(t, err)
// 9) gRPC dial through onion using MULTIADDR as "addr" (relay expects MA bytes in BEGIN)
@ -458,15 +461,16 @@ func TestHiddenService_RemoteRendezvous(t *testing.T) {
// Peer managers (client knows R, service knows A then R)
pmClient := p2p.NewInMemoryPeerInfoManager(logger)
pmClient.Start()
defer pmClient.Stop()
ctx, cancel, _ := lifecycle.WithSignallerAndCancel(context.Background())
go pmClient.Start(ctx, func() {})
defer cancel()
// client knows three rendezvous relays
for _, id := range [][]byte{[]byte("relayR1"), []byte("relayR2"), []byte("relayR3")} {
pmClient.AddPeerInfo(&protobufs.PeerInfo{PeerId: id, Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
}
pmService := p2p.NewInMemoryPeerInfoManager(logger)
pmService.Start()
defer pmService.Stop()
go pmService.Start(ctx, func() {})
// service knows three intro relays
for _, id := range [][]byte{[]byte("relayA1"), []byte("relayA2"), []byte("relayA3")} {
pmService.AddPeerInfo(&protobufs.PeerInfo{PeerId: id, Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
@ -487,17 +491,17 @@ func TestHiddenService_RemoteRendezvous(t *testing.T) {
onion.WithKeyConstructor(func() ([]byte, []byte, error) { k := keys.NewX448Key(); return k.Public(), k.Private(), nil }),
onion.WithSharedSecret(func(priv, pub []byte) ([]byte, error) { e, _ := keys.X448KeyFromBytes(priv); return e.AgreeWith(pub) }),
)
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
hctx, hcancel := context.WithTimeout(context.Background(), 6*time.Second)
defer hcancel()
var serviceID [32]byte
copy(serviceID[:], []byte("service-id-32-bytes-------------")[:32])
_, err = orService.RegisterIntro(ctx, []byte("relayA1"), serviceID)
_, err = orService.RegisterIntro(hctx, []byte("relayA1"), serviceID)
require.NoError(t, err)
// CLIENT: build circuit to rendezvous relay and send REND1
cR, err := orClient.BuildCircuitToExit(ctx, 3, []byte("relayR1"))
cR, err := orClient.BuildCircuitToExit(hctx, 3, []byte("relayR1"))
require.NoError(t, err)
var cookie [16]byte
_, _ = rand.Read(cookie[:])
@ -506,8 +510,7 @@ func TestHiddenService_RemoteRendezvous(t *testing.T) {
// CLIENT: build circuit to intro relay and send INTRODUCE(serviceID, "relayR", cookie, clientSid)
pmIntro := p2p.NewInMemoryPeerInfoManager(logger)
pmIntro.Start()
defer pmIntro.Stop()
go pmIntro.Start(ctx, func() {})
pmIntro.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relayA1"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
pmIntro.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relayA2"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
pmIntro.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relayA3"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
@ -518,7 +521,7 @@ func TestHiddenService_RemoteRendezvous(t *testing.T) {
onion.WithSharedSecret(func(priv, pub []byte) ([]byte, error) { e, _ := keys.X448KeyFromBytes(priv); return e.AgreeWith(pub) }),
)
cI, err := orIntro.BuildCircuit(ctx, 3)
cI, err := orIntro.BuildCircuit(hctx, 3)
require.NoError(t, err)
require.NoError(t, orClient.ClientIntroduce(cI, serviceID, "relayR1", cookie, clientSid))
@ -527,7 +530,7 @@ func TestHiddenService_RemoteRendezvous(t *testing.T) {
pmService.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relayR2"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
pmService.AddPeerInfo(&protobufs.PeerInfo{PeerId: []byte("relayR3"), Capabilities: []*protobufs.Capability{{ProtocolIdentifier: onion.ProtocolRouting}}})
time.Sleep(150 * time.Millisecond)
cRS, err := orService.BuildCircuitToExit(ctx, 3, []byte("relayR1"))
cRS, err := orService.BuildCircuitToExit(hctx, 3, []byte("relayR1"))
require.NoError(t, err)
serviceSid := uint16(0xD777)
require.NoError(t, orService.ServiceCompleteRendezvous(cRS, cookie, serviceSid))

View File

@ -36,6 +36,7 @@ func (m *InMemoryPeerInfoManager) Start(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
m.ctx = ctx
ready()
for {
select {
@ -74,7 +75,7 @@ func (m *InMemoryPeerInfoManager) Start(
LastSeen: seen,
})
m.peerInfoMx.Unlock()
case <-m.ctx.Done():
case <-ctx.Done():
return
}
}

File diff suppressed because it is too large Load Diff

353
node/store/consensus.go Normal file
View File

@ -0,0 +1,353 @@
package store
import (
"bytes"
"encoding/binary"
"slices"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
// PebbleConsensusStore persists consensus and liveness state in a
// Pebble-backed key/value store. Records are keyed by a caller-supplied
// filter under the CONSENSUS key prefix.
type PebbleConsensusStore struct {
	db     store.KVDB
	logger *zap.Logger
}

// Compile-time check that PebbleConsensusStore implements
// consensus.ConsensusStore for *protobufs.ProposalVote.
var _ consensus.ConsensusStore[*protobufs.ProposalVote] = (*PebbleConsensusStore)(nil)

// NewPebbleConsensusStore returns a PebbleConsensusStore backed by db.
// The logger is stored for use by the store's methods.
func NewPebbleConsensusStore(
	db store.KVDB,
	logger *zap.Logger,
) *PebbleConsensusStore {
	return &PebbleConsensusStore{
		db,
		logger,
	}
}
// GetConsensusState implements consensus.ConsensusStore.
//
// The record is stored under CONSENSUS/CONSENSUS_STATE + filter and encoded
// as: filter (uint32 big-endian length + bytes), FinalizedRank (uint64),
// LatestAcknowledgedRank (uint64), then an optional canonical TimeoutState
// (uint32 length + bytes; zero length means absent). Returns ErrNotFound
// when no state has been persisted for the filter.
func (p *PebbleConsensusStore) GetConsensusState(filter []byte) (
	*models.ConsensusState[*protobufs.ProposalVote],
	error,
) {
	value, closer, err := p.db.Get(
		slices.Concat([]byte{CONSENSUS, CONSENSUS_STATE}, filter),
	)
	if err != nil {
		if errors.Is(err, pebble.ErrNotFound) {
			return nil, ErrNotFound
		}
		return nil, errors.Wrap(err, "get consensus state")
	}
	defer closer.Close()

	// Clone before closer.Close invalidates the backing storage.
	c := slices.Clone(value)
	// Minimum size: 4 (filter len) + 8 + 8 (ranks) + 4 (timeout len).
	if len(c) < 24 {
		return nil, errors.Wrap(errors.New("invalid data"), "get consensus state")
	}

	state := &models.ConsensusState[*protobufs.ProposalVote]{}
	buf := bytes.NewBuffer(c)

	// readChunk reads a uint32 length prefix and exactly that many bytes.
	// The declared length is bounded by the bytes actually remaining so a
	// corrupted record cannot force a huge allocation, and short reads are
	// impossible afterwards (bytes.Buffer.Read returns fewer bytes than
	// requested without an error). Returns nil for a zero-length chunk.
	readChunk := func(label string) ([]byte, error) {
		var n uint32
		if err := binary.Read(buf, binary.BigEndian, &n); err != nil {
			return nil, errors.Wrap(err, "get consensus state")
		}
		if n == 0 {
			return nil, nil
		}
		if uint64(n) > uint64(buf.Len()) {
			return nil, errors.Wrap(
				errors.New("truncated "+label),
				"get consensus state",
			)
		}
		chunk := make([]byte, n)
		if _, err := buf.Read(chunk); err != nil {
			return nil, errors.Wrap(err, "get consensus state")
		}
		return chunk, nil
	}

	filterBytes, err := readChunk("filter")
	if err != nil {
		return nil, err
	}
	if filterBytes != nil {
		state.Filter = filterBytes
	}

	if err := binary.Read(
		buf,
		binary.BigEndian,
		&state.FinalizedRank,
	); err != nil {
		return nil, errors.Wrap(err, "get consensus state")
	}

	if err := binary.Read(
		buf,
		binary.BigEndian,
		&state.LatestAcknowledgedRank,
	); err != nil {
		return nil, errors.Wrap(err, "get consensus state")
	}

	latestTimeoutBytes, err := readChunk("timeout state")
	if err != nil {
		return nil, err
	}
	if latestTimeoutBytes != nil {
		lt := &protobufs.TimeoutState{}
		if err := lt.FromCanonicalBytes(latestTimeoutBytes); err != nil {
			return nil, errors.Wrap(err, "get consensus state")
		}
		state.LatestTimeout = &models.TimeoutState[*protobufs.ProposalVote]{
			Rank:                        lt.Vote.Rank,
			LatestQuorumCertificate:     lt.LatestQuorumCertificate,
			PriorRankTimeoutCertificate: lt.PriorRankTimeoutCertificate,
			Vote:                        &lt.Vote,
			TimeoutTick:                 lt.TimeoutTick,
		}
	}

	return state, nil
}
// GetLivenessState implements consensus.ConsensusStore.
//
// The record is stored under CONSENSUS/CONSENSUS_LIVENESS + filter and
// encoded as: filter (uint32 big-endian length + bytes), CurrentRank
// (uint64), then optional canonical QuorumCertificate and
// TimeoutCertificate (each uint32 length + bytes; zero length means
// absent). Returns ErrNotFound when nothing is stored for the filter.
func (p *PebbleConsensusStore) GetLivenessState(filter []byte) (
	*models.LivenessState,
	error,
) {
	value, closer, err := p.db.Get(
		slices.Concat([]byte{CONSENSUS, CONSENSUS_LIVENESS}, filter),
	)
	if err != nil {
		if errors.Is(err, pebble.ErrNotFound) {
			return nil, ErrNotFound
		}
		return nil, errors.Wrap(err, "get liveness state")
	}
	defer closer.Close()

	// Clone before closer.Close invalidates the backing storage.
	c := slices.Clone(value)
	// Minimum size: 4 (filter len) + 8 (rank) + 4 + 4 (cert lens).
	if len(c) < 20 {
		return nil, errors.Wrap(errors.New("invalid data"), "get liveness state")
	}

	state := &models.LivenessState{}
	buf := bytes.NewBuffer(c)

	// readChunk reads a uint32 length prefix and exactly that many bytes,
	// bounding the declared length by the remaining buffer so a corrupted
	// record cannot force a huge allocation or a silently short read
	// (bytes.Buffer.Read returns short counts without an error).
	readChunk := func(label string) ([]byte, error) {
		var n uint32
		if err := binary.Read(buf, binary.BigEndian, &n); err != nil {
			return nil, errors.Wrap(err, "get liveness state")
		}
		if n == 0 {
			return nil, nil
		}
		if uint64(n) > uint64(buf.Len()) {
			return nil, errors.Wrap(
				errors.New("truncated "+label),
				"get liveness state",
			)
		}
		chunk := make([]byte, n)
		if _, err := buf.Read(chunk); err != nil {
			return nil, errors.Wrap(err, "get liveness state")
		}
		return chunk, nil
	}

	filterBytes, err := readChunk("filter")
	if err != nil {
		return nil, err
	}
	if filterBytes != nil {
		state.Filter = filterBytes
	}

	if err := binary.Read(
		buf,
		binary.BigEndian,
		&state.CurrentRank,
	); err != nil {
		return nil, errors.Wrap(err, "get liveness state")
	}

	qcBytes, err := readChunk("quorum certificate")
	if err != nil {
		return nil, err
	}
	if qcBytes != nil {
		qc := &protobufs.QuorumCertificate{}
		if err := qc.FromCanonicalBytes(qcBytes); err != nil {
			return nil, errors.Wrap(err, "get liveness state")
		}
		state.LatestQuorumCertificate = qc
	}

	tcBytes, err := readChunk("timeout certificate")
	if err != nil {
		return nil, err
	}
	if tcBytes != nil {
		tc := &protobufs.TimeoutCertificate{}
		if err := tc.FromCanonicalBytes(tcBytes); err != nil {
			return nil, errors.Wrap(err, "get liveness state")
		}
		state.PriorRankTimeoutCertificate = tc
	}

	return state, nil
}
// PutConsensusState implements consensus.ConsensusStore.
//
// Serializes state as: filter (uint32 big-endian length + bytes),
// FinalizedRank (uint64), LatestAcknowledgedRank (uint64), then the
// optional LatestTimeout as a canonical TimeoutState (uint32 length +
// bytes; zero length when absent), and writes it under
// CONSENSUS/CONSENSUS_STATE + state.Filter.
//
// Type assertions on the timeout's certificates are checked so that an
// unexpected concrete type (or a nil Vote) yields an error instead of a
// panic.
func (p *PebbleConsensusStore) PutConsensusState(
	state *models.ConsensusState[*protobufs.ProposalVote],
) error {
	buf := new(bytes.Buffer)

	// writeChunk emits a uint32 length prefix followed by the payload.
	writeChunk := func(chunk []byte) error {
		if err := binary.Write(
			buf,
			binary.BigEndian,
			uint32(len(chunk)),
		); err != nil {
			return errors.Wrap(err, "put consensus state")
		}
		if _, err := buf.Write(chunk); err != nil {
			return errors.Wrap(err, "put consensus state")
		}
		return nil
	}

	if err := writeChunk(state.Filter); err != nil {
		return err
	}
	if err := binary.Write(
		buf,
		binary.BigEndian,
		state.FinalizedRank,
	); err != nil {
		return errors.Wrap(err, "put consensus state")
	}
	if err := binary.Write(
		buf,
		binary.BigEndian,
		state.LatestAcknowledgedRank,
	); err != nil {
		return errors.Wrap(err, "put consensus state")
	}

	if state.LatestTimeout == nil {
		// Zero length marks the timeout state as absent.
		if err := writeChunk(nil); err != nil {
			return err
		}
	} else {
		if state.LatestTimeout.Vote == nil {
			return errors.Wrap(
				errors.New("missing timeout vote"),
				"put consensus state",
			)
		}
		qc, ok := state.LatestTimeout.LatestQuorumCertificate.(*protobufs.QuorumCertificate)
		if !ok {
			return errors.Wrap(
				errors.New("unexpected quorum certificate type"),
				"put consensus state",
			)
		}
		var priorTC *protobufs.TimeoutCertificate
		if state.LatestTimeout.PriorRankTimeoutCertificate != nil {
			priorTC, ok = state.LatestTimeout.PriorRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
			if !ok {
				return errors.Wrap(
					errors.New("unexpected timeout certificate type"),
					"put consensus state",
				)
			}
		}
		lt := &protobufs.TimeoutState{
			LatestQuorumCertificate:     qc,
			PriorRankTimeoutCertificate: priorTC,
			Vote:                        *state.LatestTimeout.Vote,
			TimeoutTick:                 state.LatestTimeout.TimeoutTick,
		}
		timeoutBytes, err := lt.ToCanonicalBytes()
		if err != nil {
			return errors.Wrap(err, "put consensus state")
		}
		if err := writeChunk(timeoutBytes); err != nil {
			return err
		}
	}

	return errors.Wrap(
		p.db.Set(
			slices.Concat([]byte{CONSENSUS, CONSENSUS_STATE}, state.Filter),
			buf.Bytes(),
		),
		"put consensus state",
	)
}
// PutLivenessState implements consensus.ConsensusStore.
//
// Serializes state as: filter (uint32 big-endian length + bytes),
// CurrentRank (uint64), then the optional latest quorum certificate and
// prior-rank timeout certificate, each as canonical bytes with a uint32
// length prefix (zero length when absent), and writes it under
// CONSENSUS/CONSENSUS_LIVENESS + state.Filter.
//
// Type assertions on the certificates are checked so an unexpected
// concrete type yields an error instead of a panic.
func (p *PebbleConsensusStore) PutLivenessState(
	state *models.LivenessState,
) error {
	buf := new(bytes.Buffer)

	// writeChunk emits a uint32 length prefix followed by the payload.
	writeChunk := func(chunk []byte) error {
		if err := binary.Write(
			buf,
			binary.BigEndian,
			uint32(len(chunk)),
		); err != nil {
			return errors.Wrap(err, "put liveness state")
		}
		if _, err := buf.Write(chunk); err != nil {
			return errors.Wrap(err, "put liveness state")
		}
		return nil
	}

	if err := writeChunk(state.Filter); err != nil {
		return err
	}
	if err := binary.Write(
		buf,
		binary.BigEndian,
		state.CurrentRank,
	); err != nil {
		return errors.Wrap(err, "put liveness state")
	}

	if state.LatestQuorumCertificate == nil {
		// Zero length marks the certificate as absent.
		if err := writeChunk(nil); err != nil {
			return err
		}
	} else {
		qc, ok := state.LatestQuorumCertificate.(*protobufs.QuorumCertificate)
		if !ok {
			return errors.Wrap(
				errors.New("unexpected quorum certificate type"),
				"put liveness state",
			)
		}
		qcBytes, err := qc.ToCanonicalBytes()
		if err != nil {
			return errors.Wrap(err, "put liveness state")
		}
		if err := writeChunk(qcBytes); err != nil {
			return err
		}
	}

	if state.PriorRankTimeoutCertificate == nil {
		if err := writeChunk(nil); err != nil {
			return err
		}
	} else {
		tc, ok := state.PriorRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
		if !ok {
			return errors.Wrap(
				errors.New("unexpected timeout certificate type"),
				"put liveness state",
			)
		}
		timeoutBytes, err := tc.ToCanonicalBytes()
		if err != nil {
			return errors.Wrap(err, "put liveness state")
		}
		if err := writeChunk(timeoutBytes); err != nil {
			return err
		}
	}

	return errors.Wrap(
		p.db.Set(
			slices.Concat([]byte{CONSENSUS, CONSENSUS_LIVENESS}, state.Filter),
			buf.Bytes(),
		),
		"put liveness state",
	)
}

View File

@ -14,27 +14,55 @@ const (
HYPERGRAPH_SHARD = 0x09
SHARD = 0x0A
INBOX = 0x0B
CONSENSUS = 0x0C
MIGRATION = 0xF0
WORKER = 0xFF
)
// Clock store indexes:
const (
CLOCK_GLOBAL_FRAME = 0x00
CLOCK_SHARD_FRAME_SHARD = 0x01
CLOCK_SHARD_FRAME_CANDIDATE_SHARD = 0x02
CLOCK_SHARD_FRAME_FRECENCY_SHARD = 0x03
CLOCK_SHARD_FRAME_DISTANCE_SHARD = 0x04
CLOCK_COMPACTION_SHARD = 0x05
CLOCK_SHARD_FRAME_SENIORITY_SHARD = 0x06
CLOCK_SHARD_FRAME_STATE_TREE = 0x07
CLOCK_GLOBAL_FRAME_REQUEST = 0x08
CLOCK_GLOBAL_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_GLOBAL_FRAME
CLOCK_GLOBAL_FRAME_INDEX_LATEST = 0x20 | CLOCK_GLOBAL_FRAME
CLOCK_GLOBAL_FRAME_INDEX_PARENT = 0x30 | CLOCK_GLOBAL_FRAME
CLOCK_SHARD_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_SHARD_FRAME_SHARD
CLOCK_SHARD_FRAME_INDEX_LATEST = 0x20 | CLOCK_SHARD_FRAME_SHARD
CLOCK_SHARD_FRAME_INDEX_PARENT = 0x30 | CLOCK_SHARD_FRAME_SHARD
CLOCK_GLOBAL_FRAME = 0x00
CLOCK_SHARD_FRAME_SHARD = 0x01
CLOCK_SHARD_FRAME_CANDIDATE_SHARD = 0x02
CLOCK_SHARD_FRAME_FRECENCY_SHARD = 0x03
CLOCK_SHARD_FRAME_DISTANCE_SHARD = 0x04
CLOCK_COMPACTION_SHARD = 0x05
CLOCK_SHARD_FRAME_SENIORITY_SHARD = 0x06
CLOCK_SHARD_FRAME_STATE_TREE = 0x07
CLOCK_GLOBAL_FRAME_REQUEST = 0x08
CLOCK_GLOBAL_CERTIFIED_STATE = 0x09
CLOCK_SHARD_CERTIFIED_STATE = 0x0A
CLOCK_QUORUM_CERTIFICATE = 0x0B
CLOCK_TIMEOUT_CERTIFICATE = 0x0C
CLOCK_GLOBAL_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_GLOBAL_FRAME
CLOCK_GLOBAL_FRAME_INDEX_LATEST = 0x20 | CLOCK_GLOBAL_FRAME
CLOCK_GLOBAL_FRAME_INDEX_PARENT = 0x30 | CLOCK_GLOBAL_FRAME
CLOCK_SHARD_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_SHARD_FRAME_SHARD
CLOCK_SHARD_FRAME_INDEX_LATEST = 0x20 | CLOCK_SHARD_FRAME_SHARD
CLOCK_SHARD_FRAME_INDEX_PARENT = 0x30 | CLOCK_SHARD_FRAME_SHARD
CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_EARLIEST = 0x10 |
CLOCK_GLOBAL_CERTIFIED_STATE
CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_LATEST = 0x20 |
CLOCK_GLOBAL_CERTIFIED_STATE
CLOCK_SHARD_CERTIFIED_STATE_INDEX_EARLIEST = 0x10 |
CLOCK_SHARD_CERTIFIED_STATE
CLOCK_SHARD_CERTIFIED_STATE_INDEX_LATEST = 0x20 |
CLOCK_SHARD_CERTIFIED_STATE
CLOCK_QUORUM_CERTIFICATE_INDEX_EARLIEST = 0x10 |
CLOCK_QUORUM_CERTIFICATE
CLOCK_QUORUM_CERTIFICATE_INDEX_LATEST = 0x20 |
CLOCK_QUORUM_CERTIFICATE
CLOCK_TIMEOUT_CERTIFICATE_INDEX_EARLIEST = 0x10 |
CLOCK_TIMEOUT_CERTIFICATE
CLOCK_TIMEOUT_CERTIFICATE_INDEX_LATEST = 0x20 |
CLOCK_TIMEOUT_CERTIFICATE
CLOCK_SHARD_FRAME_CANDIDATE_INDEX_LATEST = 0x20 |
CLOCK_SHARD_FRAME_CANDIDATE_SHARD
)
@ -132,3 +160,9 @@ const (
WORKER_BY_CORE = 0x00
WORKER_BY_FILTER = 0x01
)
// Consensus store indexes:
const (
CONSENSUS_STATE = 0x00
CONSENSUS_LIVENESS = 0x01
)

View File

@ -60,6 +60,8 @@ const (
PathType uint32 = 0x0314
TraversalSubProofType uint32 = 0x0315
TraversalProofType uint32 = 0x0316
GlobalProposalType uint32 = 0x0317
AppShardProposalType uint32 = 0x0318
TimeoutStateType uint32 = 0x031C
TimeoutCertificateType uint32 = 0x031D

View File

@ -142,7 +142,14 @@ func (g *GlobalFrame) Identity() models.Identity {
// Source implements models.Unique.
func (g *GlobalFrame) Source() models.Identity {
return g.Header.PublicKeySignatureBls48581.Identity()
id, err := poseidon.HashBytes(
g.Header.PublicKeySignatureBls48581.PublicKey.KeyValue,
)
if err != nil {
return ""
}
return models.Identity(id.FillBytes(make([]byte, 32)))
}
func (a *AppShardFrame) Clone() models.Unique {
@ -176,7 +183,391 @@ func (a *AppShardFrame) Identity() models.Identity {
// Source implements models.Unique.
func (a *AppShardFrame) Source() models.Identity {
return a.Header.PublicKeySignatureBls48581.Identity()
return models.Identity(a.Header.Prover)
}
// GetRank returns the highest rank among the proposal's state, parent
// quorum certificate, and prior-rank timeout certificate, treating nil
// fields as rank zero.
func (s *AppShardProposal) GetRank() uint64 {
	var highest uint64
	if s.State != nil {
		if r := s.State.GetRank(); r > highest {
			highest = r
		}
	}
	if qc := s.ParentQuorumCertificate; qc != nil {
		if r := qc.GetRank(); r > highest {
			highest = r
		}
	}
	if tc := s.PriorRankTimeoutCertificate; tc != nil {
		if r := tc.GetRank(); r > highest {
			highest = r
		}
	}
	return highest
}
// ToCanonicalBytes serializes the proposal as: the AppShardProposalType
// prefix, then the state, parent quorum certificate, optional prior-rank
// timeout certificate (zero length when absent), and vote — each as
// canonical bytes behind a uint32 big-endian length prefix.
func (s *AppShardProposal) ToCanonicalBytes() ([]byte, error) {
	buf := new(bytes.Buffer)

	// writeChunk emits a uint32 length prefix followed by the payload.
	writeChunk := func(chunk []byte) error {
		if err := binary.Write(
			buf,
			binary.BigEndian,
			uint32(len(chunk)),
		); err != nil {
			return errors.Wrap(err, "to canonical bytes")
		}
		if _, err := buf.Write(chunk); err != nil {
			return errors.Wrap(err, "to canonical bytes")
		}
		return nil
	}

	if err := binary.Write(
		buf,
		binary.BigEndian,
		AppShardProposalType,
	); err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}

	stateBytes, err := s.State.ToCanonicalBytes()
	if err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}
	if err := writeChunk(stateBytes); err != nil {
		return nil, err
	}

	parentQCBytes, err := s.ParentQuorumCertificate.ToCanonicalBytes()
	if err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}
	if err := writeChunk(parentQCBytes); err != nil {
		return nil, err
	}

	if s.PriorRankTimeoutCertificate == nil {
		// Zero length marks the certificate as absent.
		if err := writeChunk(nil); err != nil {
			return nil, err
		}
	} else {
		priorTCBytes, err := s.PriorRankTimeoutCertificate.ToCanonicalBytes()
		if err != nil {
			return nil, errors.Wrap(err, "to canonical bytes")
		}
		if err := writeChunk(priorTCBytes); err != nil {
			return nil, err
		}
	}

	voteBytes, err := s.Vote.ToCanonicalBytes()
	if err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}
	if err := writeChunk(voteBytes); err != nil {
		return nil, err
	}

	return buf.Bytes(), nil
}
// FromCanonicalBytes parses a proposal produced by ToCanonicalBytes.
//
// This runs on untrusted network input, so every uint32 length prefix is
// bounded by the bytes actually remaining before allocation: the original
// pattern make([]byte, n) + buf.Read allowed a hostile prefix to request
// up to 4 GiB and silently accepted short reads (bytes.Buffer.Read
// returns fewer bytes than requested without an error).
func (s *AppShardProposal) FromCanonicalBytes(data []byte) error {
	buf := bytes.NewBuffer(data)

	// readChunk reads a uint32 length prefix and exactly that many bytes,
	// rejecting lengths that exceed the remaining input. Returns nil for a
	// zero-length chunk.
	readChunk := func() ([]byte, error) {
		var n uint32
		if err := binary.Read(buf, binary.BigEndian, &n); err != nil {
			return nil, errors.Wrap(err, "from canonical bytes")
		}
		if n == 0 {
			return nil, nil
		}
		if uint64(n) > uint64(buf.Len()) {
			return nil, errors.Wrap(
				errors.New("truncated field"),
				"from canonical bytes",
			)
		}
		chunk := make([]byte, n)
		if _, err := buf.Read(chunk); err != nil {
			return nil, errors.Wrap(err, "from canonical bytes")
		}
		return chunk, nil
	}

	// Read and verify the type prefix.
	var typePrefix uint32
	if err := binary.Read(buf, binary.BigEndian, &typePrefix); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}
	if typePrefix != AppShardProposalType {
		return errors.Wrap(
			errors.New("invalid type prefix"),
			"from canonical bytes",
		)
	}

	// Read state.
	stateBytes, err := readChunk()
	if err != nil {
		return err
	}
	s.State = &AppShardFrame{}
	if err := s.State.FromCanonicalBytes(stateBytes); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}

	// Read parent_quorum_certificate.
	parentQCBytes, err := readChunk()
	if err != nil {
		return err
	}
	s.ParentQuorumCertificate = &QuorumCertificate{}
	if err := s.ParentQuorumCertificate.FromCanonicalBytes(
		parentQCBytes,
	); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}

	// Read prior_rank_timeout_certificate (zero length means absent).
	priorRankTCBytes, err := readChunk()
	if err != nil {
		return err
	}
	if len(priorRankTCBytes) != 0 {
		s.PriorRankTimeoutCertificate = &TimeoutCertificate{}
		if err := s.PriorRankTimeoutCertificate.FromCanonicalBytes(
			priorRankTCBytes,
		); err != nil {
			return errors.Wrap(err, "from canonical bytes")
		}
	}

	// Read vote.
	voteBytes, err := readChunk()
	if err != nil {
		return err
	}
	s.Vote = &ProposalVote{}
	if err := s.Vote.FromCanonicalBytes(voteBytes); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}

	return nil
}
// GetRank returns the highest rank among the proposal's state, parent
// quorum certificate, and prior-rank timeout certificate, treating nil
// fields as rank zero.
func (s *GlobalProposal) GetRank() uint64 {
	var highest uint64
	if s.State != nil {
		if r := s.State.GetRank(); r > highest {
			highest = r
		}
	}
	if qc := s.ParentQuorumCertificate; qc != nil {
		if r := qc.GetRank(); r > highest {
			highest = r
		}
	}
	if tc := s.PriorRankTimeoutCertificate; tc != nil {
		if r := tc.GetRank(); r > highest {
			highest = r
		}
	}
	return highest
}
// ToCanonicalBytes serializes the proposal as: the GlobalProposalType
// prefix, then the state, parent quorum certificate, optional prior-rank
// timeout certificate (zero length when absent), and vote — each as
// canonical bytes behind a uint32 big-endian length prefix.
func (s *GlobalProposal) ToCanonicalBytes() ([]byte, error) {
	buf := new(bytes.Buffer)

	// writeChunk emits a uint32 length prefix followed by the payload.
	writeChunk := func(chunk []byte) error {
		if err := binary.Write(
			buf,
			binary.BigEndian,
			uint32(len(chunk)),
		); err != nil {
			return errors.Wrap(err, "to canonical bytes")
		}
		if _, err := buf.Write(chunk); err != nil {
			return errors.Wrap(err, "to canonical bytes")
		}
		return nil
	}

	if err := binary.Write(
		buf,
		binary.BigEndian,
		GlobalProposalType,
	); err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}

	stateBytes, err := s.State.ToCanonicalBytes()
	if err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}
	if err := writeChunk(stateBytes); err != nil {
		return nil, err
	}

	parentQCBytes, err := s.ParentQuorumCertificate.ToCanonicalBytes()
	if err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}
	if err := writeChunk(parentQCBytes); err != nil {
		return nil, err
	}

	if s.PriorRankTimeoutCertificate == nil {
		// Zero length marks the certificate as absent.
		if err := writeChunk(nil); err != nil {
			return nil, err
		}
	} else {
		priorTCBytes, err := s.PriorRankTimeoutCertificate.ToCanonicalBytes()
		if err != nil {
			return nil, errors.Wrap(err, "to canonical bytes")
		}
		if err := writeChunk(priorTCBytes); err != nil {
			return nil, err
		}
	}

	voteBytes, err := s.Vote.ToCanonicalBytes()
	if err != nil {
		return nil, errors.Wrap(err, "to canonical bytes")
	}
	if err := writeChunk(voteBytes); err != nil {
		return nil, err
	}

	return buf.Bytes(), nil
}
// FromCanonicalBytes parses a proposal produced by ToCanonicalBytes.
//
// This runs on untrusted network input, so every uint32 length prefix is
// bounded by the bytes actually remaining before allocation: the original
// pattern make([]byte, n) + buf.Read allowed a hostile prefix to request
// up to 4 GiB and silently accepted short reads (bytes.Buffer.Read
// returns fewer bytes than requested without an error).
func (s *GlobalProposal) FromCanonicalBytes(data []byte) error {
	buf := bytes.NewBuffer(data)

	// readChunk reads a uint32 length prefix and exactly that many bytes,
	// rejecting lengths that exceed the remaining input. Returns nil for a
	// zero-length chunk.
	readChunk := func() ([]byte, error) {
		var n uint32
		if err := binary.Read(buf, binary.BigEndian, &n); err != nil {
			return nil, errors.Wrap(err, "from canonical bytes")
		}
		if n == 0 {
			return nil, nil
		}
		if uint64(n) > uint64(buf.Len()) {
			return nil, errors.Wrap(
				errors.New("truncated field"),
				"from canonical bytes",
			)
		}
		chunk := make([]byte, n)
		if _, err := buf.Read(chunk); err != nil {
			return nil, errors.Wrap(err, "from canonical bytes")
		}
		return chunk, nil
	}

	// Read and verify the type prefix.
	var typePrefix uint32
	if err := binary.Read(buf, binary.BigEndian, &typePrefix); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}
	if typePrefix != GlobalProposalType {
		return errors.Wrap(
			errors.New("invalid type prefix"),
			"from canonical bytes",
		)
	}

	// Read state.
	stateBytes, err := readChunk()
	if err != nil {
		return err
	}
	s.State = &GlobalFrame{}
	if err := s.State.FromCanonicalBytes(stateBytes); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}

	// Read parent_quorum_certificate.
	parentQCBytes, err := readChunk()
	if err != nil {
		return err
	}
	s.ParentQuorumCertificate = &QuorumCertificate{}
	if err := s.ParentQuorumCertificate.FromCanonicalBytes(
		parentQCBytes,
	); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}

	// Read prior_rank_timeout_certificate (zero length means absent).
	priorRankTCBytes, err := readChunk()
	if err != nil {
		return err
	}
	if len(priorRankTCBytes) != 0 {
		s.PriorRankTimeoutCertificate = &TimeoutCertificate{}
		if err := s.PriorRankTimeoutCertificate.FromCanonicalBytes(
			priorRankTCBytes,
		); err != nil {
			return errors.Wrap(err, "from canonical bytes")
		}
	}

	// Read vote.
	voteBytes, err := readChunk()
	if err != nil {
		return err
	}
	s.Vote = &ProposalVote{}
	if err := s.Vote.FromCanonicalBytes(voteBytes); err != nil {
		return errors.Wrap(err, "from canonical bytes")
	}

	return nil
}
func (s *SeniorityMerge) ToCanonicalBytes() ([]byte, error) {
@ -4398,11 +4789,103 @@ func (f *ProposalVote) Validate() error {
return nil
}
var _ ValidatableMessage = (*AppShardProposal)(nil)
// Validate checks structural completeness of the proposal: state, parent
// quorum certificate, and vote must be present and individually valid;
// the prior-rank timeout certificate is optional but validated when set.
func (f *AppShardProposal) Validate() error {
	if f == nil {
		return errors.Wrap(errors.New("nil proposal"), "validate")
	}
	// Checks run in declaration order so the first failure is reported,
	// matching the field layout of the message.
	checks := []func() error{
		func() error {
			if f.State == nil {
				return errors.Wrap(
					errors.New("missing state"),
					"validate",
				)
			}
			return f.State.Validate()
		},
		func() error {
			if f.ParentQuorumCertificate == nil {
				return errors.Wrap(
					errors.New("missing parent quorum certificate"),
					"validate",
				)
			}
			return f.ParentQuorumCertificate.Validate()
		},
		func() error {
			if f.PriorRankTimeoutCertificate == nil {
				return nil
			}
			return f.PriorRankTimeoutCertificate.Validate()
		},
		func() error {
			if f.Vote == nil {
				return errors.Wrap(errors.New("missing vote"), "validate")
			}
			return f.Vote.Validate()
		},
	}
	for _, check := range checks {
		if err := check(); err != nil {
			return err
		}
	}
	return nil
}
var _ ValidatableMessage = (*GlobalProposal)(nil)
// Validate checks structural completeness of the proposal: state, parent
// quorum certificate, and vote must be present and individually valid;
// the prior-rank timeout certificate is optional but validated when set.
func (f *GlobalProposal) Validate() error {
	if f == nil {
		return errors.Wrap(errors.New("nil proposal"), "validate")
	}
	// Checks run in declaration order so the first failure is reported,
	// matching the field layout of the message.
	checks := []func() error{
		func() error {
			if f.State == nil {
				return errors.Wrap(
					errors.New("missing state"),
					"validate",
				)
			}
			return f.State.Validate()
		},
		func() error {
			if f.ParentQuorumCertificate == nil {
				return errors.Wrap(
					errors.New("missing parent quorum certificate"),
					"validate",
				)
			}
			return f.ParentQuorumCertificate.Validate()
		},
		func() error {
			if f.PriorRankTimeoutCertificate == nil {
				return nil
			}
			return f.PriorRankTimeoutCertificate.Validate()
		},
		func() error {
			if f.Vote == nil {
				return errors.Wrap(errors.New("missing vote"), "validate")
			}
			return f.Vote.Validate()
		},
	}
	for _, check := range checks {
		if err := check(); err != nil {
			return err
		}
	}
	return nil
}
var _ ValidatableMessage = (*TimeoutState)(nil)
func (f *TimeoutState) Validate() error {
if f == nil {
return errors.Wrap(errors.New("nil vote"), "validate")
return errors.Wrap(errors.New("nil timeout state"), "validate")
}
if f.LatestQuorumCertificate != nil {

File diff suppressed because it is too large Load Diff

View File

@ -215,6 +215,28 @@ message ProverLivenessCheck {
quilibrium.node.keys.pb.BLS48581AddressedSignature public_key_signature_bls48581 = 6;
}
message AppShardProposal {
// The associated state for the proposal
AppShardFrame state = 1;
// The parent quorum certificate to this state
QuorumCertificate parent_quorum_certificate = 2;
// The previous rank's timeout certificate, if applicable
TimeoutCertificate prior_rank_timeout_certificate = 3;
// The proposer's vote
ProposalVote vote = 4;
}
message GlobalProposal {
// The associated state for the proposal
GlobalFrame state = 1;
// The parent quorum certificate to this state
QuorumCertificate parent_quorum_certificate = 2;
// The previous rank's timeout certificate, if applicable
TimeoutCertificate prior_rank_timeout_certificate = 3;
// The proposer's vote
ProposalVote vote = 4;
}
message ProposalVote {
// The filter for the prover's commitment in the trie
bytes filter = 1;

View File

@ -16,6 +16,242 @@ type MockClockStore struct {
mock.Mock
}
// GetCertifiedAppShardState implements store.ClockStore.
// Delegates to testify's mock framework and returns the programmed values.
func (m *MockClockStore) GetCertifiedAppShardState(
	filter []byte,
	rank uint64,
) (*protobufs.AppShardProposal, error) {
	res := m.Called(filter, rank)
	return res.Get(0).(*protobufs.AppShardProposal), res.Error(1)
}
// GetCertifiedGlobalState implements store.ClockStore.
// Delegates to testify's mock framework and returns the programmed values.
func (m *MockClockStore) GetCertifiedGlobalState(rank uint64) (
	*protobufs.GlobalProposal,
	error,
) {
	res := m.Called(rank)
	return res.Get(0).(*protobufs.GlobalProposal), res.Error(1)
}
// GetEarliestCertifiedAppShardState implements store.ClockStore.
// Delegates to testify's mock framework and returns the programmed values.
func (m *MockClockStore) GetEarliestCertifiedAppShardState(
	filter []byte,
) (*protobufs.AppShardProposal, error) {
	res := m.Called(filter)
	return res.Get(0).(*protobufs.AppShardProposal), res.Error(1)
}
// GetEarliestCertifiedGlobalState implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetEarliestCertifiedGlobalState() (
	*protobufs.GlobalProposal,
	error,
) {
	args := m.Called()
	// Guard the assertion so Return(nil, err) does not panic.
	var state *protobufs.GlobalProposal
	if v := args.Get(0); v != nil {
		state = v.(*protobufs.GlobalProposal)
	}
	return state, args.Error(1)
}
// GetEarliestQuorumCertificate implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetEarliestQuorumCertificate(filter []byte) (
	*protobufs.QuorumCertificate,
	error,
) {
	args := m.Called(filter)
	// Guard the assertion so Return(nil, err) does not panic.
	var qc *protobufs.QuorumCertificate
	if v := args.Get(0); v != nil {
		qc = v.(*protobufs.QuorumCertificate)
	}
	return qc, args.Error(1)
}
// GetEarliestTimeoutCertificate implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetEarliestTimeoutCertificate(filter []byte) (
	*protobufs.TimeoutCertificate,
	error,
) {
	args := m.Called(filter)
	// Guard the assertion so Return(nil, err) does not panic.
	var tc *protobufs.TimeoutCertificate
	if v := args.Get(0); v != nil {
		tc = v.(*protobufs.TimeoutCertificate)
	}
	return tc, args.Error(1)
}
// GetLatestCertifiedAppShardState implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetLatestCertifiedAppShardState(filter []byte) (
	*protobufs.AppShardProposal,
	error,
) {
	args := m.Called(filter)
	// Guard the assertion so Return(nil, err) does not panic.
	var state *protobufs.AppShardProposal
	if v := args.Get(0); v != nil {
		state = v.(*protobufs.AppShardProposal)
	}
	return state, args.Error(1)
}
// GetLatestCertifiedGlobalState implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetLatestCertifiedGlobalState() (
	*protobufs.GlobalProposal,
	error,
) {
	args := m.Called()
	// Guard the assertion so Return(nil, err) does not panic.
	var state *protobufs.GlobalProposal
	if v := args.Get(0); v != nil {
		state = v.(*protobufs.GlobalProposal)
	}
	return state, args.Error(1)
}
// GetLatestQuorumCertificate implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetLatestQuorumCertificate(filter []byte) (
	*protobufs.QuorumCertificate,
	error,
) {
	args := m.Called(filter)
	// Guard the assertion so Return(nil, err) does not panic.
	var qc *protobufs.QuorumCertificate
	if v := args.Get(0); v != nil {
		qc = v.(*protobufs.QuorumCertificate)
	}
	return qc, args.Error(1)
}
// GetLatestTimeoutCertificate implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetLatestTimeoutCertificate(filter []byte) (
	*protobufs.TimeoutCertificate,
	error,
) {
	args := m.Called(filter)
	// Guard the assertion so Return(nil, err) does not panic.
	var tc *protobufs.TimeoutCertificate
	if v := args.Get(0); v != nil {
		tc = v.(*protobufs.TimeoutCertificate)
	}
	return tc, args.Error(1)
}
// GetQuorumCertificate implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetQuorumCertificate(filter []byte, rank uint64) (
	*protobufs.QuorumCertificate,
	error,
) {
	args := m.Called(filter, rank)
	// Guard the assertion so Return(nil, err) does not panic.
	var qc *protobufs.QuorumCertificate
	if v := args.Get(0); v != nil {
		qc = v.(*protobufs.QuorumCertificate)
	}
	return qc, args.Error(1)
}
// GetTimeoutCertificate implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) GetTimeoutCertificate(filter []byte, rank uint64) (
	*protobufs.TimeoutCertificate,
	error,
) {
	args := m.Called(filter, rank)
	// Guard the assertion so Return(nil, err) does not panic.
	var tc *protobufs.TimeoutCertificate
	if v := args.Get(0); v != nil {
		tc = v.(*protobufs.TimeoutCertificate)
	}
	return tc, args.Error(1)
}
// PutCertifiedAppShardState implements store.ClockStore.
// Records the invocation and returns the error configured on the mock.
func (m *MockClockStore) PutCertifiedAppShardState(
	state *protobufs.AppShardProposal,
	txn store.Transaction,
) error {
	return m.Called(state, txn).Error(0)
}
// PutCertifiedGlobalState implements store.ClockStore.
// Records the invocation and returns the error configured on the mock.
func (m *MockClockStore) PutCertifiedGlobalState(
	state *protobufs.GlobalProposal,
	txn store.Transaction,
) error {
	return m.Called(state, txn).Error(0)
}
// PutQuorumCertificate implements store.ClockStore.
// Records the invocation and returns the error configured on the mock.
func (m *MockClockStore) PutQuorumCertificate(
	qc *protobufs.QuorumCertificate,
	txn store.Transaction,
) error {
	return m.Called(qc, txn).Error(0)
}
// PutTimeoutCertificate implements store.ClockStore.
// Records the invocation and returns the error configured on the mock.
func (m *MockClockStore) PutTimeoutCertificate(
	timeoutCertificate *protobufs.TimeoutCertificate,
	txn store.Transaction,
) error {
	return m.Called(timeoutCertificate, txn).Error(0)
}
// RangeCertifiedAppShardStates implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) RangeCertifiedAppShardStates(
	filter []byte,
	startRank uint64,
	endRank uint64,
) (store.TypedIterator[*protobufs.AppShardProposal], error) {
	args := m.Called(filter, startRank, endRank)
	// Guard the assertion so Return(nil, err) does not panic.
	var it store.TypedIterator[*protobufs.AppShardProposal]
	if v := args.Get(0); v != nil {
		it = v.(store.TypedIterator[*protobufs.AppShardProposal])
	}
	return it, args.Error(1)
}
// RangeCertifiedGlobalStates implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) RangeCertifiedGlobalStates(
	startRank uint64,
	endRank uint64,
) (store.TypedIterator[*protobufs.GlobalProposal], error) {
	args := m.Called(startRank, endRank)
	// Guard the assertion so Return(nil, err) does not panic.
	var it store.TypedIterator[*protobufs.GlobalProposal]
	if v := args.Get(0); v != nil {
		it = v.(store.TypedIterator[*protobufs.GlobalProposal])
	}
	return it, args.Error(1)
}
// RangeQuorumCertificates implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) RangeQuorumCertificates(
	filter []byte,
	startRank uint64,
	endRank uint64,
) (store.TypedIterator[*protobufs.QuorumCertificate], error) {
	args := m.Called(filter, startRank, endRank)
	// Guard the assertion so Return(nil, err) does not panic.
	var it store.TypedIterator[*protobufs.QuorumCertificate]
	if v := args.Get(0); v != nil {
		it = v.(store.TypedIterator[*protobufs.QuorumCertificate])
	}
	return it, args.Error(1)
}
// RangeTimeoutCertificates implements store.ClockStore.
// It replays the values configured via On(...).Return(...); a nil first
// return value (Return(nil, err)) is tolerated instead of panicking.
func (m *MockClockStore) RangeTimeoutCertificates(
	filter []byte,
	startRank uint64,
	endRank uint64,
) (store.TypedIterator[*protobufs.TimeoutCertificate], error) {
	args := m.Called(filter, startRank, endRank)
	// Guard the assertion so Return(nil, err) does not panic.
	var it store.TypedIterator[*protobufs.TimeoutCertificate]
	if v := args.Get(0); v != nil {
		it = v.(store.TypedIterator[*protobufs.TimeoutCertificate])
	}
	return it, args.Error(1)
}
// CommitShardClockFrame implements store.ClockStore.
func (m *MockClockStore) CommitShardClockFrame(
filter []byte,

View File

@ -17,6 +17,55 @@ type ClockStore interface {
endFrameNumber uint64,
) (TypedIterator[*protobufs.GlobalFrame], error)
PutGlobalClockFrame(frame *protobufs.GlobalFrame, txn Transaction) error
GetLatestCertifiedGlobalState() (*protobufs.GlobalProposal, error)
GetEarliestCertifiedGlobalState() (*protobufs.GlobalProposal, error)
GetCertifiedGlobalState(rank uint64) (*protobufs.GlobalProposal, error)
RangeCertifiedGlobalStates(
startRank uint64,
endRank uint64,
) (TypedIterator[*protobufs.GlobalProposal], error)
PutCertifiedGlobalState(
state *protobufs.GlobalProposal,
txn Transaction,
) error
GetLatestQuorumCertificate(
filter []byte,
) (*protobufs.QuorumCertificate, error)
GetEarliestQuorumCertificate(
filter []byte,
) (*protobufs.QuorumCertificate, error)
GetQuorumCertificate(
filter []byte,
rank uint64,
) (*protobufs.QuorumCertificate, error)
RangeQuorumCertificates(
filter []byte,
startRank uint64,
endRank uint64,
) (TypedIterator[*protobufs.QuorumCertificate], error)
PutQuorumCertificate(
qc *protobufs.QuorumCertificate,
txn Transaction,
) error
GetLatestTimeoutCertificate(
filter []byte,
) (*protobufs.TimeoutCertificate, error)
GetEarliestTimeoutCertificate(
filter []byte,
) (*protobufs.TimeoutCertificate, error)
GetTimeoutCertificate(
filter []byte,
rank uint64,
) (*protobufs.TimeoutCertificate, error)
RangeTimeoutCertificates(
filter []byte,
startRank uint64,
endRank uint64,
) (TypedIterator[*protobufs.TimeoutCertificate], error)
PutTimeoutCertificate(
timeoutCertificate *protobufs.TimeoutCertificate,
txn Transaction,
) error
GetLatestShardClockFrame(
filter []byte,
) (*protobufs.AppShardFrame, []*tries.RollingFrecencyCritbitTrie, error)
@ -58,6 +107,25 @@ type ClockStore interface {
filter []byte,
frameNumber uint64,
) error
GetLatestCertifiedAppShardState(
filter []byte,
) (*protobufs.AppShardProposal, error)
GetEarliestCertifiedAppShardState(
filter []byte,
) (*protobufs.AppShardProposal, error)
GetCertifiedAppShardState(
filter []byte,
rank uint64,
) (*protobufs.AppShardProposal, error)
RangeCertifiedAppShardStates(
filter []byte,
startRank uint64,
endRank uint64,
) (TypedIterator[*protobufs.AppShardProposal], error)
PutCertifiedAppShardState(
state *protobufs.AppShardProposal,
txn Transaction,
) error
ResetGlobalClockFrames() error
ResetShardClockFrames(filter []byte) error
Compact(