Wiring nearly complete; missing needed hooks for proposals

This commit is contained in:
Cassandra Heart 2025-11-04 07:08:35 -06:00
parent aaf73efef4
commit c86674b97f
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
6 changed files with 248 additions and 234 deletions

View File

@ -21,9 +21,12 @@ import (
"google.golang.org/grpc"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/forks"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub"
"source.quilibrium.com/quilibrium/monorepo/consensus/participant"
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/verification"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator"
@ -31,6 +34,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/consensus/reward"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/voting"
"source.quilibrium.com/quilibrium/monorepo/node/dispatch"
"source.quilibrium.com/quilibrium/monorepo/node/execution/manager"
hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph"
@ -97,7 +101,7 @@ type AppConsensusEngine struct {
currentDifficultyMu sync.RWMutex
pendingMessages []*protobufs.Message
pendingMessagesMu sync.RWMutex
collectedMessages map[string][]*protobufs.Message
collectedMessages []*protobufs.Message
collectedMessagesMu sync.RWMutex
lastProvenFrameTime time.Time
lastProvenFrameTimeMu sync.RWMutex
@ -130,12 +134,11 @@ type AppConsensusEngine struct {
]
// Consensus plugins
signatureAggregator consensus.SignatureAggregator
voteAggregationDistributor *pubsub.VoteAggregationDistributor[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
]
signatureAggregator consensus.SignatureAggregator
voteCollectorDistributor *pubsub.VoteCollectorDistributor[*protobufs.ProposalVote]
timeoutCollectorDistributor *pubsub.TimeoutCollectorDistributor[*protobufs.ProposalVote]
voteAggregator consensus.VoteAggregator[*protobufs.AppShardFrame, *protobufs.ProposalVote]
timeoutAggregator consensus.TimeoutAggregator[*protobufs.ProposalVote]
// Provider implementations
syncProvider *AppSyncProvider
@ -227,7 +230,7 @@ func NewAppConsensusEngine(
peerInfoManager: peerInfoManager,
executors: make(map[string]execution.ShardExecutionEngine),
frameStore: make(map[string]*protobufs.AppShardFrame),
collectedMessages: make(map[string][]*protobufs.Message),
collectedMessages: []*protobufs.Message{},
consensusMessageQueue: make(chan *pb.Message, 1000),
proverMessageQueue: make(chan *pb.Message, 1000),
frameMessageQueue: make(chan *pb.Message, 100),
@ -240,16 +243,22 @@ func NewAppConsensusEngine(
alertPublicKey: []byte{},
}
engine.syncProvider = &AppSyncProvider{engine: engine}
engine.votingProvider = &AppVotingProvider{engine: engine}
engine.leaderProvider = &AppLeaderProvider{engine: engine}
engine.livenessProvider = &AppLivenessProvider{engine: engine}
engine.signatureAggregator = aggregator.WrapSignatureAggregator(
engine.blsConstructor,
engine.proverRegistry,
nil,
)
engine.voteAggregationDistributor = pubsub.NewVoteAggregationDistributor[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
]()
engine.timeoutCollectorDistributor = pubsub.NewTimeoutCollectorDistributor[*protobufs.ProposalVote]()
voteAggregationDistributor := voting.NewAppShardVoteAggregationDistributor()
engine.voteCollectorDistributor =
voteAggregationDistributor.VoteCollectorDistributor
timeoutAggregationDistributor :=
voting.NewAppShardTimeoutAggregationDistributor()
engine.timeoutCollectorDistributor =
timeoutAggregationDistributor.TimeoutCollectorDistributor
if config.Engine.AlertKey != "" {
alertPublicKey, err := hex.DecodeString(config.Engine.AlertKey)
@ -352,11 +361,6 @@ func NewAppConsensusEngine(
return nil, errors.Wrap(err, "failed to initialize execution engines")
}
engine.syncProvider = &AppSyncProvider{engine: engine}
engine.votingProvider = &AppVotingProvider{engine: engine}
engine.leaderProvider = &AppLeaderProvider{engine: engine}
engine.livenessProvider = &AppLivenessProvider{engine: engine}
appTimeReel.SetMaterializeFunc(engine.materialize)
appTimeReel.SetRevertFunc(engine.revert)
@ -434,6 +438,26 @@ func NewAppConsensusEngine(
initialState = &frame
}
engine.voteAggregator, err = voting.NewAppShardVoteAggregator[PeerID](
tracing.NewZapTracer(logger),
engine,
voteAggregationDistributor,
engine.signatureAggregator,
engine.votingProvider,
(*initialState).GetRank(),
)
if err != nil {
return nil, err
}
engine.timeoutAggregator, err = voting.NewAppShardTimeoutAggregator[PeerID](
tracing.NewZapTracer(logger),
engine,
engine,
engine.signatureAggregator,
timeoutAggregationDistributor,
(*initialState).GetRank(),
)
componentBuilder.AddWorker(func(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
@ -1418,7 +1442,21 @@ func (e *AppConsensusEngine) startConsensus(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) error {
var err error
trustedRoot := &models.CertifiedState[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
State: initialFrame,
},
}
notifier := pubsub.NewDistributor[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
]()
forks, err := forks.NewForks(trustedRoot, e, notifier)
if err != nil {
return err
}
e.consensusParticipant, err = participant.NewParticipant[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
@ -1427,21 +1465,29 @@ func (e *AppConsensusEngine) startConsensus(
](
tracing.NewZapTracer(e.logger), // logger
e, // committee
e.leaderProvider, // prover
e.votingProvider, // voter
e.consensusStore, // consensusStore
e.signatureAggregator, // signatureAggregator
e, // consensusVerifier
e.voteAggregationDistributor, // voteNotifier
e.timeoutCollectorDistributor, // timeoutNotifier
e, // consumer
e, // finalizer
nil, // filter
&models.CertifiedState[*protobufs.AppShardFrame]{ // trustedRoot
State: &models.State[*protobufs.AppShardFrame]{
State: initialFrame,
},
},
verification.NewSigner[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
PeerID,
](e.votingProvider), // signer
e.leaderProvider, // prover
e.votingProvider, // voter
notifier, // notifier
e.consensusStore, // consensusStore
e.signatureAggregator, // signatureAggregator
e, // consensusVerifier
e.voteCollectorDistributor, // voteCollectorDistributor
e.timeoutCollectorDistributor, // timeoutCollectorDistributor
forks, // forks
validator.NewValidator[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
](e, e), // validator
e.voteAggregator, // voteAggregator
e.timeoutAggregator, // timeoutAggregator
e, // finalizer
nil, // filter
trustedRoot,
)
if err != nil {
return err
@ -1754,4 +1800,44 @@ func (e *AppConsensusEngine) VerifyVote(
panic("unimplemented")
}
// IdentitiesByRank implements consensus.DynamicCommittee.
// Presumably it returns the weighted committee membership active at the
// given rank — TODO confirm against the consensus.DynamicCommittee docs.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) IdentitiesByRank(rank uint64) ([]models.WeightedIdentity, error) {
	panic("unimplemented")
}
// IdentitiesByState implements consensus.DynamicCommittee.
// Presumably it returns the weighted committee membership as of the state
// identified by stateID — TODO confirm intended semantics.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) IdentitiesByState(stateID models.Identity) ([]models.WeightedIdentity, error) {
	panic("unimplemented")
}
// IdentityByRank implements consensus.DynamicCommittee.
// Presumably it looks up a single participant's weighted identity within
// the committee at the given rank — TODO confirm.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) IdentityByRank(rank uint64, participantID models.Identity) (models.WeightedIdentity, error) {
	panic("unimplemented")
}
// IdentityByState implements consensus.DynamicCommittee.
// Presumably it looks up a single participant's weighted identity within
// the committee as of the state identified by stateID — TODO confirm.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) IdentityByState(stateID models.Identity, participantID models.Identity) (models.WeightedIdentity, error) {
	panic("unimplemented")
}
// LeaderForRank implements consensus.DynamicCommittee.
// Presumably it returns the identity of the designated leader (proposer)
// for the given rank — likely deferring to the engine's leaderProvider
// once wired; verify against AppLeaderProvider.GetNextLeaders.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) LeaderForRank(rank uint64) (models.Identity, error) {
	panic("unimplemented")
}
// QuorumThresholdForRank implements consensus.DynamicCommittee.
// Presumably it returns the minimum accumulated vote weight required to
// form a quorum certificate at the given rank — TODO confirm.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) QuorumThresholdForRank(rank uint64) (uint64, error) {
	panic("unimplemented")
}
// Self implements consensus.DynamicCommittee.
// Presumably it returns this node's own committee identity (its prover
// address) — TODO confirm which key/address representation is expected.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) Self() models.Identity {
	panic("unimplemented")
}
// TimeoutThresholdForRank implements consensus.DynamicCommittee.
// Presumably it returns the minimum accumulated weight required to form a
// timeout certificate at the given rank — TODO confirm.
// NOTE(review): stub — panics until the proposal hooks are wired in.
func (e *AppConsensusEngine) TimeoutThresholdForRank(rank uint64) (uint64, error) {
	panic("unimplemented")
}
var _ consensus.DynamicCommittee = (*AppConsensusEngine)(nil)

View File

@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
@ -20,8 +21,8 @@ type AppLeaderProvider struct {
}
func (p *AppLeaderProvider) GetNextLeaders(
prior **protobufs.AppShardFrame,
ctx context.Context,
prior **protobufs.AppShardFrame,
) ([]PeerID, error) {
// Get the parent selector for next prover calculation
var parentSelector []byte
@ -60,16 +61,22 @@ func (p *AppLeaderProvider) GetNextLeaders(
}
func (p *AppLeaderProvider) ProveNextState(
prior **protobufs.AppShardFrame,
collected CollectedCommitments,
ctx context.Context,
rank uint64,
filter []byte,
priorState models.Identity,
) (**protobufs.AppShardFrame, error) {
timer := prometheus.NewTimer(frameProvingDuration.WithLabelValues(
p.engine.appAddressHex,
))
defer timer.ObserveDuration()
if prior == nil || *prior == nil {
prior, err := p.engine.appTimeReel.GetFrame(priorState)
if err != nil {
return nil, errors.Wrap(err, "prove next state")
}
if prior == nil {
frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
return nil, errors.Wrap(errors.New("nil prior frame"), "prove next state")
}
@ -97,20 +104,10 @@ func (p *AppLeaderProvider) ProveNextState(
}
// Get collected messages to include in frame
p.engine.pendingMessagesMu.RLock()
messages := make([]*protobufs.Message, len(p.engine.collectedMessages[string(
collected.commitmentHash[:32],
)]))
copy(messages, p.engine.collectedMessages[string(
collected.commitmentHash[:32],
)])
p.engine.pendingMessagesMu.RUnlock()
// Clear collected messages after copying
p.engine.collectedMessagesMu.Lock()
p.engine.collectedMessages[string(
collected.commitmentHash[:32],
)] = []*protobufs.Message{}
messages := make([]*protobufs.Message, len(p.engine.collectedMessages))
copy(messages, p.engine.collectedMessages)
p.engine.collectedMessages = []*protobufs.Message{}
p.engine.collectedMessagesMu.Unlock()
// Update pending messages metric
@ -123,7 +120,7 @@ func (p *AppLeaderProvider) ProveNextState(
)
// Prove the frame
newFrame, err := p.engine.internalProveFrame(messages, (*prior))
newFrame, err := p.engine.internalProveFrame(messages, prior)
if err != nil {
frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
return nil, errors.Wrap(err, "prove frame")
@ -146,3 +143,9 @@ func (p *AppLeaderProvider) ProveNextState(
return &newFrame, nil
}
var _ consensus.LeaderProvider[
*protobufs.AppShardFrame,
PeerID,
CollectedCommitments,
] = (*AppLeaderProvider)(nil)

View File

@ -3,7 +3,6 @@ package app
import (
"context"
"slices"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -91,7 +90,7 @@ func (p *AppLivenessProvider) Collect(
}
p.engine.collectedMessagesMu.Lock()
p.engine.collectedMessages[string(commitment[:32])] = finalizedMessages
p.engine.collectedMessages = finalizedMessages
p.engine.collectedMessagesMu.Unlock()
return CollectedCommitments{
@ -101,77 +100,6 @@ func (p *AppLivenessProvider) Collect(
}, nil
}
// SendLiveness signs and broadcasts a ProverLivenessCheck on the engine's
// consensus bitmask so peers can observe that this prover is live and has
// collected the given commitments for the upcoming frame.
//
// prior is the (possibly nil) previous frame; the check targets frame
// number prior+1, or 0 when no prior frame header is available. collected
// supplies the commitment hash being attested. Returns an error when no
// proving key is available, when the local engine has already processed a
// later frame (out of sync, caller should resync), or when signing,
// serialization, or publishing fails.
//
// NOTE(review): ctx is currently unused — confirm whether the publish
// should honor cancellation.
func (p *AppLivenessProvider) SendLiveness(
	prior **protobufs.AppShardFrame,
	collected CollectedCommitments,
	ctx context.Context,
) error {
	// Get prover key; without one we cannot attest liveness at all.
	signer, _, publicKey, _ := p.engine.GetProvingKey(p.engine.config.Engine)
	if publicKey == nil {
		return errors.New("no proving key available for liveness check")
	}

	// Target the frame after the prior one (0 when there is no prior).
	frameNumber := uint64(0)
	if prior != nil && (*prior).Header != nil {
		frameNumber = (*prior).Header.FrameNumber + 1
	}

	// If a later frame was already processed locally, attesting to an
	// older frame would be stale — bail out and force a resync instead.
	lastProcessed := p.engine.GetFrame()
	if lastProcessed != nil && lastProcessed.Header.FrameNumber > frameNumber {
		return errors.New("out of sync, forcing resync")
	}

	// Create liveness check message
	livenessCheck := &protobufs.ProverLivenessCheck{
		Filter:         p.engine.appAddress,
		FrameNumber:    frameNumber,
		Timestamp:      time.Now().UnixMilli(),
		CommitmentHash: collected.commitmentHash,
	}

	// Sign the message over its canonical payload and signature domain.
	signatureData, err := livenessCheck.ConstructSignaturePayload()
	if err != nil {
		return errors.Wrap(err, "send liveness")
	}

	sig, err := signer.SignWithDomain(
		signatureData,
		livenessCheck.GetSignatureDomain(),
	)
	if err != nil {
		return errors.Wrap(err, "send liveness")
	}

	// Attach the addressed BLS signature identifying this prover.
	proverAddress := p.engine.getAddressFromPublicKey(publicKey)
	livenessCheck.PublicKeySignatureBls48581 =
		&protobufs.BLS48581AddressedSignature{
			Address:   proverAddress,
			Signature: sig,
		}

	// Serialize using canonical bytes
	data, err := livenessCheck.ToCanonicalBytes()
	if err != nil {
		return errors.Wrap(err, "serialize liveness check")
	}

	// Broadcast on the consensus bitmask for peers to pick up.
	if err := p.engine.pubsub.PublishToBitmask(
		p.engine.getConsensusMessageBitmask(),
		data,
	); err != nil {
		return errors.Wrap(err, "send liveness")
	}

	p.engine.logger.Info(
		"sent liveness check",
		zap.Uint64("frame_number", frameNumber),
	)

	return nil
}
func (p *AppLivenessProvider) validateAndLockMessage(
frameNumber uint64,
i int,

View File

@ -9,6 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
@ -193,7 +194,7 @@ func (e *AppConsensusEngine) handleFrameMessage(message *pb.Message) {
e.frameStore[string(frameID)] = frame
e.frameStoreMu.Unlock()
if err := e.appTimeReel.Insert(ctx, frame); err != nil {
if err := e.appTimeReel.Insert(e.ctx, frame); err != nil {
// Success metric recorded at the end of processing
framesProcessedTotal.WithLabelValues("error").Inc()
return
@ -273,7 +274,7 @@ func (e *AppConsensusEngine) handleGlobalFrameMessage(message *pb.Message) {
return
}
if err := e.globalTimeReel.Insert(ctx, frame); err != nil {
if err := e.globalTimeReel.Insert(e.ctx, frame); err != nil {
// Success metric recorded at the end of processing
globalFramesProcessedTotal.WithLabelValues("error").Inc()
return
@ -379,7 +380,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) {
}
if err := e.dispatchService.AddInboxMessage(
ctx,
e.ctx,
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
@ -392,7 +393,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) {
}
if err := e.dispatchService.AddHubInboxAssociation(
ctx,
e.ctx,
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
@ -405,7 +406,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) {
}
if err := e.dispatchService.DeleteHubInboxAssociation(
ctx,
e.ctx,
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
@ -449,10 +450,7 @@ func (e *AppConsensusEngine) handleProposal(message *pb.Message) {
e.frameStore[string(frameID)] = frame.Clone().(*protobufs.AppShardFrame)
e.frameStoreMu.Unlock()
e.stateMachine.ReceiveProposal(
PeerID{ID: frame.Header.Prover},
&frame,
)
e.consensusParticipant.SubmitProposal()
proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
}
}
@ -506,64 +504,17 @@ func (e *AppConsensusEngine) handleTimeoutState(message *pb.Message) {
return
}
if !bytes.Equal(timeoutState.Filter, e.appAddress) {
if !bytes.Equal(timeoutState.Vote.Filter, e.appAddress) {
return
}
e.frameStoreMu.RLock()
var matchingFrame *protobufs.AppShardFrame
for _, frame := range e.frameStore {
if frame.Header != nil &&
frame.Header.FrameNumber == timeoutState.FrameNumber {
frameSelector := e.calculateFrameSelector(frame.Header)
if bytes.Equal(frameSelector, timeoutState.Selector) {
matchingFrame = frame
break
}
}
}
if matchingFrame == nil {
e.frameStoreMu.RUnlock()
return
}
e.frameStoreMu.RUnlock()
e.frameStoreMu.Lock()
defer e.frameStoreMu.Unlock()
matchingFrame.Header.PublicKeySignatureBls48581 =
timeoutState.AggregateSignature
valid, err := e.frameValidator.Validate(matchingFrame)
if !valid || err != nil {
e.logger.Error("received invalid timeoutState", zap.Error(err))
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
if matchingFrame.Header.Prover == nil {
e.logger.Error("timeoutState with no matched prover")
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
if err := e.stateMachine.ReceivetimeoutState(
PeerID{ID: matchingFrame.Header.Prover},
&matchingFrame,
); err != nil {
e.logger.Error("could not receive timeoutState", zap.Error(err))
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
if err := e.appTimeReel.Insert(ctx, matchingFrame); err != nil {
e.logger.Error(
"could not insert into time reel",
zap.Error(err),
)
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
e.timeoutAggregator.AddTimeout(&models.TimeoutState[*protobufs.ProposalVote]{
Rank: timeoutState.Vote.Rank,
LatestQuorumCertificate: timeoutState.LatestQuorumCertificate,
PriorRankTimeoutCertificate: timeoutState.PriorRankTimeoutCertificate,
Vote: &timeoutState.Vote,
TimeoutTick: timeoutState.TimeoutTick,
})
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
}

View File

@ -27,9 +27,12 @@ import (
"google.golang.org/grpc"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/forks"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub"
"source.quilibrium.com/quilibrium/monorepo/consensus/participant"
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/verification"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator"
@ -37,6 +40,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/consensus/reward"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/voting"
"source.quilibrium.com/quilibrium/monorepo/node/dispatch"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global/compat"
@ -165,12 +169,11 @@ type GlobalConsensusEngine struct {
]
// Consensus plugins
signatureAggregator consensus.SignatureAggregator
voteAggregationDistributor *pubsub.VoteAggregationDistributor[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
]
signatureAggregator consensus.SignatureAggregator
voteCollectorDistributor *pubsub.VoteCollectorDistributor[*protobufs.ProposalVote]
timeoutCollectorDistributor *pubsub.TimeoutCollectorDistributor[*protobufs.ProposalVote]
voteAggregator consensus.VoteAggregator[*protobufs.GlobalFrame, *protobufs.ProposalVote]
timeoutAggregator consensus.TimeoutAggregator[*protobufs.ProposalVote]
// Provider implementations
syncProvider *GlobalSyncProvider
@ -311,16 +314,23 @@ func NewGlobalConsensusEngine(
}
}
// Create provider implementations
engine.syncProvider = &GlobalSyncProvider{engine: engine}
engine.votingProvider = &GlobalVotingProvider{engine: engine}
engine.leaderProvider = &GlobalLeaderProvider{engine: engine}
engine.livenessProvider = &GlobalLivenessProvider{engine: engine}
engine.signatureAggregator = aggregator.WrapSignatureAggregator(
engine.blsConstructor,
engine.proverRegistry,
nil,
)
engine.voteAggregationDistributor = pubsub.NewVoteAggregationDistributor[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
]()
engine.timeoutCollectorDistributor = pubsub.NewTimeoutCollectorDistributor[*protobufs.ProposalVote]()
voteAggregationDistributor := voting.NewGlobalVoteAggregationDistributor()
engine.voteCollectorDistributor =
voteAggregationDistributor.VoteCollectorDistributor
timeoutAggregationDistributor :=
voting.NewGlobalTimeoutAggregationDistributor()
engine.timeoutCollectorDistributor =
timeoutAggregationDistributor.TimeoutCollectorDistributor
// Create the worker manager
engine.workerManager = mgr.NewWorkerManager(
@ -379,12 +389,6 @@ func NewGlobalConsensusEngine(
// Establish alert halt context
engine.haltCtx, engine.halt = context.WithCancel(context.Background())
// Create provider implementations
engine.syncProvider = &GlobalSyncProvider{engine: engine}
engine.votingProvider = &GlobalVotingProvider{engine: engine}
engine.leaderProvider = &GlobalLeaderProvider{engine: engine}
engine.livenessProvider = &GlobalLivenessProvider{engine: engine}
// Create dispatch service
engine.dispatchService = dispatch.NewDispatchService(
inboxStore,
@ -489,6 +493,26 @@ func NewGlobalConsensusEngine(
initialState = &frame
}
engine.voteAggregator, err = voting.NewGlobalVoteAggregator[GlobalPeerID](
tracing.NewZapTracer(logger),
engine,
voteAggregationDistributor,
engine.signatureAggregator,
engine.votingProvider,
(*initialState).GetRank(),
)
if err != nil {
return nil, err
}
engine.timeoutAggregator, err = voting.NewAppShardTimeoutAggregator[GlobalPeerID](
tracing.NewZapTracer(logger),
engine,
engine,
engine.signatureAggregator,
timeoutAggregationDistributor,
(*initialState).GetRank(),
)
if engine.config.P2P.Network == 99 || engine.config.Engine.ArchiveMode {
componentBuilder.AddWorker(func(
ctx lifecycle.SignalerContext,
@ -2308,7 +2332,21 @@ func (e *GlobalConsensusEngine) startConsensus(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) error {
var err error
trustedRoot := &models.CertifiedState[*protobufs.GlobalFrame]{
State: &models.State[*protobufs.GlobalFrame]{
State: initialFrame,
},
}
notifier := pubsub.NewDistributor[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
]()
forks, err := forks.NewForks(trustedRoot, e, notifier)
if err != nil {
return err
}
e.consensusParticipant, err = participant.NewParticipant[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
@ -2317,27 +2355,36 @@ func (e *GlobalConsensusEngine) startConsensus(
](
tracing.NewZapTracer(e.logger), // logger
e, // committee
e.leaderProvider, // prover
e.votingProvider, // voter
e.consensusStore, // consensusStore
e.signatureAggregator, // signatureAggregator
e, // consensusVerifier
e.voteAggregationDistributor, // voteNotifier
e.timeoutCollectorDistributor, // timeoutNotifier
e, // consumer
e, // finalizer
nil, // filter
&models.CertifiedState[*protobufs.GlobalFrame]{ // trustedRoot
State: &models.State[*protobufs.GlobalFrame]{
State: initialFrame,
},
},
verification.NewSigner[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
GlobalPeerID,
](e.votingProvider), // signer
e.leaderProvider, // prover
e.votingProvider, // voter
notifier, // notifier
e.consensusStore, // consensusStore
e.signatureAggregator, // signatureAggregator
e, // consensusVerifier
e.voteCollectorDistributor, // voteCollectorDistributor
e.timeoutCollectorDistributor, // timeoutCollectorDistributor
forks, // forks
validator.NewValidator[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
](e, e), // validator
e.voteAggregator, // voteAggregator
e.timeoutAggregator, // timeoutAggregator
e, // finalizer
nil, // filter
trustedRoot,
)
if err != nil {
return err
}
e.consensusParticipant.Start(e.ctx)
ready()
e.consensusParticipant.Start(ctx)
return nil
}

View File

@ -11,7 +11,6 @@ import (
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/voteaggregator"
"source.quilibrium.com/quilibrium/monorepo/consensus/votecollector"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/global"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
@ -25,7 +24,7 @@ func NewAppShardVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[
]()
}
func NewAppShardVoteAggregator(
func NewAppShardVoteAggregator[PeerIDT models.Unique](
logger consensus.TraceLogger,
committee consensus.DynamicCommittee,
voteAggregationDistributor *pubsub.VoteAggregationDistributor[
@ -36,7 +35,7 @@ func NewAppShardVoteAggregator(
votingProvider consensus.VotingProvider[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
],
currentRank uint64,
) (
@ -46,7 +45,7 @@ func NewAppShardVoteAggregator(
voteProcessorFactory := votecollector.NewVoteProcessorFactory[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
](committee, func(qc models.QuorumCertificate) {})
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(
@ -55,7 +54,7 @@ func NewAppShardVoteAggregator(
votecollector.VerifyingVoteProcessorFactory[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
](
voteProcessorFactory.Create,
),
@ -85,7 +84,7 @@ func NewAppShardTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistri
return pubsub.NewTimeoutAggregationDistributor[*protobufs.ProposalVote]()
}
func NewAppShardTimeoutAggregator(
func NewAppShardTimeoutAggregator[PeerIDT models.Unique](
logger consensus.TraceLogger,
committee consensus.DynamicCommittee,
consensusVerifier consensus.Verifier[*protobufs.ProposalVote],
@ -102,7 +101,7 @@ func NewAppShardTimeoutAggregator(
timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
](
logger,
signatureAggregator,
@ -143,7 +142,7 @@ func NewGlobalVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[
]()
}
func NewGlobalVoteAggregator(
func NewGlobalVoteAggregator[PeerIDT models.Unique](
logger consensus.TraceLogger,
committee consensus.DynamicCommittee,
voteAggregationDistributor *pubsub.VoteAggregationDistributor[
@ -154,7 +153,7 @@ func NewGlobalVoteAggregator(
votingProvider consensus.VotingProvider[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
],
currentRank uint64,
) (
@ -164,7 +163,7 @@ func NewGlobalVoteAggregator(
voteProcessorFactory := votecollector.NewVoteProcessorFactory[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
](committee, func(qc models.QuorumCertificate) {})
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(
@ -173,7 +172,7 @@ func NewGlobalVoteAggregator(
votecollector.VerifyingVoteProcessorFactory[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
](
voteProcessorFactory.Create,
),
@ -203,7 +202,7 @@ func NewGlobalTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistribu
return pubsub.NewTimeoutAggregationDistributor[*protobufs.ProposalVote]()
}
func NewGlobalTimeoutAggregator(
func NewGlobalTimeoutAggregator[PeerIDT models.Unique](
logger consensus.TraceLogger,
committee consensus.DynamicCommittee,
consensusVerifier consensus.Verifier[*protobufs.ProposalVote],
@ -220,7 +219,7 @@ func NewGlobalTimeoutAggregator(
timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
global.GlobalPeerID,
PeerIDT,
](
logger,
signatureAggregator,