diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 00ea3b9..98a617e 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -21,9 +21,12 @@ import ( "google.golang.org/grpc" "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/consensus" + "source.quilibrium.com/quilibrium/monorepo/consensus/forks" "source.quilibrium.com/quilibrium/monorepo/consensus/models" "source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub" "source.quilibrium.com/quilibrium/monorepo/consensus/participant" + "source.quilibrium.com/quilibrium/monorepo/consensus/validator" + "source.quilibrium.com/quilibrium/monorepo/consensus/verification" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" "source.quilibrium.com/quilibrium/monorepo/lifecycle" "source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator" @@ -31,6 +34,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/consensus/reward" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" "source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing" + "source.quilibrium.com/quilibrium/monorepo/node/consensus/voting" "source.quilibrium.com/quilibrium/monorepo/node/dispatch" "source.quilibrium.com/quilibrium/monorepo/node/execution/manager" hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph" @@ -97,7 +101,7 @@ type AppConsensusEngine struct { currentDifficultyMu sync.RWMutex pendingMessages []*protobufs.Message pendingMessagesMu sync.RWMutex - collectedMessages map[string][]*protobufs.Message + collectedMessages []*protobufs.Message collectedMessagesMu sync.RWMutex lastProvenFrameTime time.Time lastProvenFrameTimeMu sync.RWMutex @@ -130,12 +134,11 @@ type AppConsensusEngine struct { ] // Consensus plugins - signatureAggregator consensus.SignatureAggregator - voteAggregationDistributor *pubsub.VoteAggregationDistributor[ - *protobufs.AppShardFrame, - *protobufs.ProposalVote, - ] + signatureAggregator consensus.SignatureAggregator + voteCollectorDistributor *pubsub.VoteCollectorDistributor[*protobufs.ProposalVote] timeoutCollectorDistributor *pubsub.TimeoutCollectorDistributor[*protobufs.ProposalVote] + voteAggregator consensus.VoteAggregator[*protobufs.AppShardFrame, *protobufs.ProposalVote] + timeoutAggregator consensus.TimeoutAggregator[*protobufs.ProposalVote] // Provider implementations syncProvider *AppSyncProvider @@ -227,7 +230,7 @@ func NewAppConsensusEngine( peerInfoManager: peerInfoManager, executors: make(map[string]execution.ShardExecutionEngine), frameStore: make(map[string]*protobufs.AppShardFrame), - collectedMessages: make(map[string][]*protobufs.Message), + collectedMessages: []*protobufs.Message{}, consensusMessageQueue: make(chan *pb.Message, 1000), proverMessageQueue: make(chan *pb.Message, 1000), frameMessageQueue: make(chan *pb.Message, 100), @@ -240,16 +243,22 @@ func NewAppConsensusEngine( alertPublicKey: []byte{}, } + engine.syncProvider = &AppSyncProvider{engine: engine} + engine.votingProvider = &AppVotingProvider{engine: engine} + engine.leaderProvider = &AppLeaderProvider{engine: engine} + engine.livenessProvider = &AppLivenessProvider{engine: engine} engine.signatureAggregator = aggregator.WrapSignatureAggregator( engine.blsConstructor, engine.proverRegistry, nil, ) - engine.voteAggregationDistributor = pubsub.NewVoteAggregationDistributor[ - *protobufs.AppShardFrame, - *protobufs.ProposalVote, - ]() - engine.timeoutCollectorDistributor = pubsub.NewTimeoutCollectorDistributor[*protobufs.ProposalVote]() + voteAggregationDistributor := voting.NewAppShardVoteAggregationDistributor() + engine.voteCollectorDistributor = + voteAggregationDistributor.VoteCollectorDistributor + timeoutAggregationDistributor := + voting.NewAppShardTimeoutAggregationDistributor() + engine.timeoutCollectorDistributor = + timeoutAggregationDistributor.TimeoutCollectorDistributor if config.Engine.AlertKey != "" { alertPublicKey, err := hex.DecodeString(config.Engine.AlertKey) @@ -352,11 +361,6 @@ func NewAppConsensusEngine( return nil, errors.Wrap(err, "failed to initialize execution engines") } - engine.syncProvider = &AppSyncProvider{engine: engine} - engine.votingProvider = &AppVotingProvider{engine: engine} - engine.leaderProvider = &AppLeaderProvider{engine: engine} - engine.livenessProvider = &AppLivenessProvider{engine: engine} - appTimeReel.SetMaterializeFunc(engine.materialize) appTimeReel.SetRevertFunc(engine.revert) @@ -434,6 +438,26 @@ func NewAppConsensusEngine( initialState = &frame } + engine.voteAggregator, err = voting.NewAppShardVoteAggregator[PeerID]( + tracing.NewZapTracer(logger), + engine, + voteAggregationDistributor, + engine.signatureAggregator, + engine.votingProvider, + (*initialState).GetRank(), + ) + if err != nil { + return nil, err + } + engine.timeoutAggregator, err = voting.NewAppShardTimeoutAggregator[PeerID]( + tracing.NewZapTracer(logger), + engine, + engine, + engine.signatureAggregator, + timeoutAggregationDistributor, + (*initialState).GetRank(), + ) + componentBuilder.AddWorker(func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, @@ -1418,7 +1442,21 @@ func (e *AppConsensusEngine) startConsensus( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) error { - var err error + trustedRoot := &models.CertifiedState[*protobufs.AppShardFrame]{ + State: &models.State[*protobufs.AppShardFrame]{ + State: initialFrame, + }, + } + notifier := pubsub.NewDistributor[ + *protobufs.AppShardFrame, + *protobufs.ProposalVote, + ]() + + forks, err := forks.NewForks(trustedRoot, e, notifier) + if err != nil { + return err + } + e.consensusParticipant, err = participant.NewParticipant[ *protobufs.AppShardFrame, *protobufs.ProposalVote, @@ -1427,21 +1465,29 @@ func (e *AppConsensusEngine) startConsensus( ]( tracing.NewZapTracer(e.logger), // logger e, // committee - e.leaderProvider, // prover - e.votingProvider, // voter - e.consensusStore, // consensusStore - e.signatureAggregator, // signatureAggregator - e, // consensusVerifier - e.voteAggregationDistributor, // voteNotifier - e.timeoutCollectorDistributor, // timeoutNotifier - e, // consumer - e, // finalizer - nil, // filter - &models.CertifiedState[*protobufs.AppShardFrame]{ // trustedRoot - State: &models.State[*protobufs.AppShardFrame]{ - State: initialFrame, - }, - }, + verification.NewSigner[ + *protobufs.AppShardFrame, + *protobufs.ProposalVote, + PeerID, + ](e.votingProvider), // signer + e.leaderProvider, // prover + e.votingProvider, // voter + notifier, // notifier + e.consensusStore, // consensusStore + e.signatureAggregator, // signatureAggregator + e, // consensusVerifier + e.voteCollectorDistributor, // voteCollectorDistributor + e.timeoutCollectorDistributor, // timeoutCollectorDistributor + forks, // forks + validator.NewValidator[ + *protobufs.AppShardFrame, + *protobufs.ProposalVote, + ](e, e), // validator + e.voteAggregator, // voteAggregator + e.timeoutAggregator, // timeoutAggregator + e, // finalizer + nil, // filter + trustedRoot, ) if err != nil { return err @@ -1754,4 +1800,44 @@ func (e *AppConsensusEngine) VerifyVote( panic("unimplemented") } +// IdentitiesByRank implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) IdentitiesByRank(rank uint64) ([]models.WeightedIdentity, error) { + panic("unimplemented") +} + +// IdentitiesByState implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) IdentitiesByState(stateID models.Identity) ([]models.WeightedIdentity, error) { + panic("unimplemented") +} + +// IdentityByRank implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) IdentityByRank(rank uint64, participantID models.Identity) (models.WeightedIdentity, error) { + panic("unimplemented") +} + +// IdentityByState implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) IdentityByState(stateID models.Identity, participantID models.Identity) (models.WeightedIdentity, error) { + panic("unimplemented") +} + +// LeaderForRank implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) LeaderForRank(rank uint64) (models.Identity, error) { + panic("unimplemented") +} + +// QuorumThresholdForRank implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) QuorumThresholdForRank(rank uint64) (uint64, error) { + panic("unimplemented") +} + +// Self implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) Self() models.Identity { + panic("unimplemented") +} + +// TimeoutThresholdForRank implements consensus.DynamicCommittee. +func (e *AppConsensusEngine) TimeoutThresholdForRank(rank uint64) (uint64, error) { + panic("unimplemented") +} + var _ consensus.DynamicCommittee = (*AppConsensusEngine)(nil) diff --git a/node/consensus/app/consensus_leader_provider.go b/node/consensus/app/consensus_leader_provider.go index 7ad8c35..567165a 100644 --- a/node/consensus/app/consensus_leader_provider.go +++ b/node/consensus/app/consensus_leader_provider.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "source.quilibrium.com/quilibrium/monorepo/consensus" "source.quilibrium.com/quilibrium/monorepo/consensus/models" "source.quilibrium.com/quilibrium/monorepo/protobufs" ) @@ -20,8 +21,8 @@ type AppLeaderProvider struct { } func (p *AppLeaderProvider) GetNextLeaders( - prior **protobufs.AppShardFrame, ctx context.Context, + prior **protobufs.AppShardFrame, ) ([]PeerID, error) { // Get the parent selector for next prover calculation var parentSelector []byte @@ -60,16 +61,22 @@ func (p *AppLeaderProvider) GetNextLeaders( } func (p *AppLeaderProvider) ProveNextState( - prior **protobufs.AppShardFrame, - collected CollectedCommitments, ctx context.Context, + rank uint64, + filter []byte, + priorState models.Identity, ) (**protobufs.AppShardFrame, error) { timer := prometheus.NewTimer(frameProvingDuration.WithLabelValues( p.engine.appAddressHex, )) defer timer.ObserveDuration() - if prior == nil || *prior == nil { + prior, err := p.engine.appTimeReel.GetFrame(priorState) + if err != nil { + return nil, errors.Wrap(err, "prove next state") + } + + if prior == nil { frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc() return nil, errors.Wrap(errors.New("nil prior frame"), "prove next state") } @@ -97,20 +104,10 @@ func (p *AppLeaderProvider) ProveNextState( } // Get collected messages to include in frame - p.engine.pendingMessagesMu.RLock() - messages := make([]*protobufs.Message, len(p.engine.collectedMessages[string( - collected.commitmentHash[:32], - )])) - copy(messages, p.engine.collectedMessages[string( - collected.commitmentHash[:32], - )]) - p.engine.pendingMessagesMu.RUnlock() - - // Clear collected messages after copying p.engine.collectedMessagesMu.Lock() - p.engine.collectedMessages[string( - collected.commitmentHash[:32], - )] = []*protobufs.Message{} + messages := make([]*protobufs.Message, len(p.engine.collectedMessages)) + copy(messages, p.engine.collectedMessages) + p.engine.collectedMessages = []*protobufs.Message{} p.engine.collectedMessagesMu.Unlock() // Update pending messages metric @@ -123,7 +120,7 @@ func (p *AppLeaderProvider) ProveNextState( ) // Prove the frame - newFrame, err := p.engine.internalProveFrame(messages, (*prior)) + newFrame, err := p.engine.internalProveFrame(messages, prior) if err != nil { frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc() return nil, errors.Wrap(err, "prove frame") @@ -146,3 +143,9 @@ func (p *AppLeaderProvider) ProveNextState( return &newFrame, nil } + +var _ consensus.LeaderProvider[ + *protobufs.AppShardFrame, + PeerID, + CollectedCommitments, +] = (*AppLeaderProvider)(nil) diff --git a/node/consensus/app/consensus_liveness_provider.go b/node/consensus/app/consensus_liveness_provider.go index 0420c3c..db6def8 100644 --- a/node/consensus/app/consensus_liveness_provider.go +++ b/node/consensus/app/consensus_liveness_provider.go @@ -3,7 +3,6 @@ package app import ( "context" "slices" - "time" "github.com/pkg/errors" "go.uber.org/zap" @@ -91,7 +90,7 @@ func (p *AppLivenessProvider) Collect( } p.engine.collectedMessagesMu.Lock() - p.engine.collectedMessages[string(commitment[:32])] = finalizedMessages + p.engine.collectedMessages = finalizedMessages p.engine.collectedMessagesMu.Unlock() return CollectedCommitments{ @@ -101,77 +100,6 @@ func (p *AppLivenessProvider) Collect( }, nil } -func (p *AppLivenessProvider) SendLiveness( - prior **protobufs.AppShardFrame, - collected CollectedCommitments, - ctx context.Context, -) error { - // Get prover key - signer, _, publicKey, _ := p.engine.GetProvingKey(p.engine.config.Engine) - if publicKey == nil { - return errors.New("no proving key available for liveness check") - } - - frameNumber := uint64(0) - if prior != nil && (*prior).Header != nil { - frameNumber = (*prior).Header.FrameNumber + 1 - } - - lastProcessed := p.engine.GetFrame() - if lastProcessed != nil && lastProcessed.Header.FrameNumber > frameNumber { - return errors.New("out of sync, forcing resync") - } - - // Create liveness check message - livenessCheck := &protobufs.ProverLivenessCheck{ - Filter: p.engine.appAddress, - FrameNumber: frameNumber, - Timestamp: time.Now().UnixMilli(), - CommitmentHash: collected.commitmentHash, - } - - // Sign the message - signatureData, err := livenessCheck.ConstructSignaturePayload() - if err != nil { - return errors.Wrap(err, "send liveness") - } - - sig, err := signer.SignWithDomain( - signatureData, - livenessCheck.GetSignatureDomain(), - ) - if err != nil { - return errors.Wrap(err, "send liveness") - } - - proverAddress := p.engine.getAddressFromPublicKey(publicKey) - livenessCheck.PublicKeySignatureBls48581 = - &protobufs.BLS48581AddressedSignature{ - Address: proverAddress, - Signature: sig, - } - - // Serialize using canonical bytes - data, err := livenessCheck.ToCanonicalBytes() - if err != nil { - return errors.Wrap(err, "serialize liveness check") - } - - if err := p.engine.pubsub.PublishToBitmask( - p.engine.getConsensusMessageBitmask(), - data, - ); err != nil { - return errors.Wrap(err, "send liveness") - } - - p.engine.logger.Info( - "sent liveness check", - zap.Uint64("frame_number", frameNumber), - ) - - return nil -} - func (p *AppLivenessProvider) validateAndLockMessage( frameNumber uint64, i int, diff --git a/node/consensus/app/message_processors.go b/node/consensus/app/message_processors.go index 5057532..975bd3d 100644 --- a/node/consensus/app/message_processors.go +++ b/node/consensus/app/message_processors.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/crypto/sha3" + "source.quilibrium.com/quilibrium/monorepo/consensus/models" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" "source.quilibrium.com/quilibrium/monorepo/lifecycle" "source.quilibrium.com/quilibrium/monorepo/protobufs" @@ -193,7 +194,7 @@ func (e *AppConsensusEngine) handleFrameMessage(message *pb.Message) { e.frameStore[string(frameID)] = frame e.frameStoreMu.Unlock() - if err := e.appTimeReel.Insert(ctx, frame); err != nil { + if err := e.appTimeReel.Insert(e.ctx, frame); err != nil { // Success metric recorded at the end of processing framesProcessedTotal.WithLabelValues("error").Inc() return @@ -273,7 +274,7 @@ func (e *AppConsensusEngine) handleGlobalFrameMessage(message *pb.Message) { return } - if err := e.globalTimeReel.Insert(ctx, frame); err != nil { + if err := e.globalTimeReel.Insert(e.ctx, frame); err != nil { // Success metric recorded at the end of processing globalFramesProcessedTotal.WithLabelValues("error").Inc() return @@ -379,7 +380,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) { } if err := e.dispatchService.AddInboxMessage( - ctx, + e.ctx, envelope, ); err != nil { e.logger.Debug("failed to add inbox message", zap.Error(err)) @@ -392,7 +393,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) { } if err := e.dispatchService.AddHubInboxAssociation( - ctx, + e.ctx, envelope, ); err != nil { e.logger.Debug("failed to add inbox message", zap.Error(err)) @@ -405,7 +406,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) { } if err := e.dispatchService.DeleteHubInboxAssociation( - ctx, + e.ctx, envelope, ); err != nil { e.logger.Debug("failed to add inbox message", zap.Error(err)) @@ -449,10 +450,7 @@ func (e *AppConsensusEngine) handleProposal(message *pb.Message) { e.frameStore[string(frameID)] = frame.Clone().(*protobufs.AppShardFrame) e.frameStoreMu.Unlock() - e.stateMachine.ReceiveProposal( - PeerID{ID: frame.Header.Prover}, - &frame, - ) + e.consensusParticipant.SubmitProposal() proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc() } } @@ -506,64 +504,17 @@ func (e *AppConsensusEngine) handleTimeoutState(message *pb.Message) { return } - if !bytes.Equal(timeoutState.Filter, e.appAddress) { + if !bytes.Equal(timeoutState.Vote.Filter, e.appAddress) { return } - e.frameStoreMu.RLock() - var matchingFrame *protobufs.AppShardFrame - for _, frame := range e.frameStore { - if frame.Header != nil && - frame.Header.FrameNumber == timeoutState.FrameNumber { - frameSelector := e.calculateFrameSelector(frame.Header) - if bytes.Equal(frameSelector, timeoutState.Selector) { - matchingFrame = frame - break - } - } - } - - if matchingFrame == nil { - e.frameStoreMu.RUnlock() - return - } - - e.frameStoreMu.RUnlock() - e.frameStoreMu.Lock() - defer e.frameStoreMu.Unlock() - - matchingFrame.Header.PublicKeySignatureBls48581 = - timeoutState.AggregateSignature - valid, err := e.frameValidator.Validate(matchingFrame) - if !valid || err != nil { - e.logger.Error("received invalid timeoutState", zap.Error(err)) - timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc() - return - } - - if matchingFrame.Header.Prover == nil { - e.logger.Error("timeoutState with no matched prover") - timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc() - return - } - - if err := e.stateMachine.ReceivetimeoutState( - PeerID{ID: matchingFrame.Header.Prover}, - &matchingFrame, - ); err != nil { - e.logger.Error("could not receive timeoutState", zap.Error(err)) - timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc() - return - } - - if err := e.appTimeReel.Insert(ctx, matchingFrame); err != nil { - e.logger.Error( - "could not insert into time reel", - zap.Error(err), - ) - timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc() - return - } + e.timeoutAggregator.AddTimeout(&models.TimeoutState[*protobufs.ProposalVote]{ + Rank: timeoutState.Vote.Rank, + LatestQuorumCertificate: timeoutState.LatestQuorumCertificate, + PriorRankTimeoutCertificate: timeoutState.PriorRankTimeoutCertificate, + Vote: &timeoutState.Vote, + TimeoutTick: timeoutState.TimeoutTick, + }) timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc() } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 600693c..ec2a4b2 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -27,9 +27,12 @@ import ( "google.golang.org/grpc" "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/consensus" + "source.quilibrium.com/quilibrium/monorepo/consensus/forks" "source.quilibrium.com/quilibrium/monorepo/consensus/models" "source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub" "source.quilibrium.com/quilibrium/monorepo/consensus/participant" + "source.quilibrium.com/quilibrium/monorepo/consensus/validator" + "source.quilibrium.com/quilibrium/monorepo/consensus/verification" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" "source.quilibrium.com/quilibrium/monorepo/lifecycle" "source.quilibrium.com/quilibrium/monorepo/node/consensus/aggregator" @@ -37,6 +40,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/consensus/reward" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" "source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing" + "source.quilibrium.com/quilibrium/monorepo/node/consensus/voting" "source.quilibrium.com/quilibrium/monorepo/node/dispatch" "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global" "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global/compat" @@ -165,12 +169,11 @@ type GlobalConsensusEngine struct { ] // Consensus plugins - signatureAggregator consensus.SignatureAggregator - voteAggregationDistributor *pubsub.VoteAggregationDistributor[ - *protobufs.GlobalFrame, - *protobufs.ProposalVote, - ] + signatureAggregator consensus.SignatureAggregator + voteCollectorDistributor *pubsub.VoteCollectorDistributor[*protobufs.ProposalVote] timeoutCollectorDistributor *pubsub.TimeoutCollectorDistributor[*protobufs.ProposalVote] + voteAggregator consensus.VoteAggregator[*protobufs.GlobalFrame, *protobufs.ProposalVote] + timeoutAggregator consensus.TimeoutAggregator[*protobufs.ProposalVote] // Provider implementations syncProvider *GlobalSyncProvider @@ -311,16 +314,23 @@ func NewGlobalConsensusEngine( } } + // Create provider implementations + engine.syncProvider = &GlobalSyncProvider{engine: engine} + engine.votingProvider = &GlobalVotingProvider{engine: engine} + engine.leaderProvider = &GlobalLeaderProvider{engine: engine} + engine.livenessProvider = &GlobalLivenessProvider{engine: engine} engine.signatureAggregator = aggregator.WrapSignatureAggregator( engine.blsConstructor, engine.proverRegistry, nil, ) - engine.voteAggregationDistributor = pubsub.NewVoteAggregationDistributor[ - *protobufs.GlobalFrame, - *protobufs.ProposalVote, - ]() - engine.timeoutCollectorDistributor = pubsub.NewTimeoutCollectorDistributor[*protobufs.ProposalVote]() + voteAggregationDistributor := voting.NewGlobalVoteAggregationDistributor() + engine.voteCollectorDistributor = + voteAggregationDistributor.VoteCollectorDistributor + timeoutAggregationDistributor := + voting.NewGlobalTimeoutAggregationDistributor() + engine.timeoutCollectorDistributor = + timeoutAggregationDistributor.TimeoutCollectorDistributor // Create the worker manager engine.workerManager = mgr.NewWorkerManager( @@ -379,12 +389,6 @@ func NewGlobalConsensusEngine( // Establish alert halt context engine.haltCtx, engine.halt = context.WithCancel(context.Background()) - // Create provider implementations - engine.syncProvider = &GlobalSyncProvider{engine: engine} - engine.votingProvider = &GlobalVotingProvider{engine: engine} - engine.leaderProvider = &GlobalLeaderProvider{engine: engine} - engine.livenessProvider = &GlobalLivenessProvider{engine: engine} - // Create dispatch service engine.dispatchService = dispatch.NewDispatchService( inboxStore, @@ -489,6 +493,26 @@ func NewGlobalConsensusEngine( initialState = &frame } + engine.voteAggregator, err = voting.NewGlobalVoteAggregator[GlobalPeerID]( + tracing.NewZapTracer(logger), + engine, + voteAggregationDistributor, + engine.signatureAggregator, + engine.votingProvider, + (*initialState).GetRank(), + ) + if err != nil { + return nil, err + } + engine.timeoutAggregator, err = voting.NewAppShardTimeoutAggregator[GlobalPeerID]( + tracing.NewZapTracer(logger), + engine, + engine, + engine.signatureAggregator, + timeoutAggregationDistributor, + (*initialState).GetRank(), + ) + if engine.config.P2P.Network == 99 || engine.config.Engine.ArchiveMode { componentBuilder.AddWorker(func( ctx lifecycle.SignalerContext, @@ -2308,7 +2332,21 @@ func (e *GlobalConsensusEngine) startConsensus( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, ) error { - var err error + trustedRoot := &models.CertifiedState[*protobufs.GlobalFrame]{ + State: &models.State[*protobufs.GlobalFrame]{ + State: initialFrame, + }, + } + notifier := pubsub.NewDistributor[ + *protobufs.GlobalFrame, + *protobufs.ProposalVote, + ]() + + forks, err := forks.NewForks(trustedRoot, e, notifier) + if err != nil { + return err + } + e.consensusParticipant, err = participant.NewParticipant[ *protobufs.GlobalFrame, *protobufs.ProposalVote, @@ -2317,27 +2355,36 @@ func (e *GlobalConsensusEngine) startConsensus( ]( tracing.NewZapTracer(e.logger), // logger e, // committee - e.leaderProvider, // prover - e.votingProvider, // voter - e.consensusStore, // consensusStore - e.signatureAggregator, // signatureAggregator - e, // consensusVerifier - e.voteAggregationDistributor, // voteNotifier - e.timeoutCollectorDistributor, // timeoutNotifier - e, // consumer - e, // finalizer - nil, // filter - &models.CertifiedState[*protobufs.GlobalFrame]{ // trustedRoot - State: &models.State[*protobufs.GlobalFrame]{ - State: initialFrame, - }, - }, + verification.NewSigner[ + *protobufs.GlobalFrame, + *protobufs.ProposalVote, + GlobalPeerID, + ](e.votingProvider), // signer + e.leaderProvider, // prover + e.votingProvider, // voter + notifier, // notifier + e.consensusStore, // consensusStore + e.signatureAggregator, // signatureAggregator + e, // consensusVerifier + e.voteCollectorDistributor, // voteCollectorDistributor + e.timeoutCollectorDistributor, // timeoutCollectorDistributor + forks, // forks + validator.NewValidator[ + *protobufs.GlobalFrame, + *protobufs.ProposalVote, + ](e, e), // validator + e.voteAggregator, // voteAggregator + e.timeoutAggregator, // timeoutAggregator + e, // finalizer + nil, // filter + trustedRoot, ) if err != nil { return err } - e.consensusParticipant.Start(e.ctx) + ready() + e.consensusParticipant.Start(ctx) return nil } diff --git a/node/consensus/voting/voting_aggregator.go b/node/consensus/voting/voting_aggregator.go index 6b86e71..1f163d8 100644 --- a/node/consensus/voting/voting_aggregator.go +++ b/node/consensus/voting/voting_aggregator.go @@ -11,7 +11,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/consensus/validator" "source.quilibrium.com/quilibrium/monorepo/consensus/voteaggregator" "source.quilibrium.com/quilibrium/monorepo/consensus/votecollector" - "source.quilibrium.com/quilibrium/monorepo/node/consensus/global" "source.quilibrium.com/quilibrium/monorepo/protobufs" ) @@ -25,7 +24,7 @@ func NewAppShardVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[ ]() } -func NewAppShardVoteAggregator( +func NewAppShardVoteAggregator[PeerIDT models.Unique]( logger consensus.TraceLogger, committee consensus.DynamicCommittee, voteAggregationDistributor *pubsub.VoteAggregationDistributor[ @@ -36,7 +35,7 @@ func NewAppShardVoteAggregator( votingProvider consensus.VotingProvider[ *protobufs.AppShardFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ], currentRank uint64, ) ( @@ -46,7 +45,7 @@ func NewAppShardVoteAggregator( voteProcessorFactory := votecollector.NewVoteProcessorFactory[ *protobufs.AppShardFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ](committee, func(qc models.QuorumCertificate) {}) createCollectorFactoryMethod := votecollector.NewStateMachineFactory( @@ -55,7 +54,7 @@ func NewAppShardVoteAggregator( votecollector.VerifyingVoteProcessorFactory[ *protobufs.AppShardFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ]( voteProcessorFactory.Create, ), @@ -85,7 +84,7 @@ func NewAppShardTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistri return pubsub.NewTimeoutAggregationDistributor[*protobufs.ProposalVote]() } -func NewAppShardTimeoutAggregator( +func NewAppShardTimeoutAggregator[PeerIDT models.Unique]( logger consensus.TraceLogger, committee consensus.DynamicCommittee, consensusVerifier consensus.Verifier[*protobufs.ProposalVote], @@ -102,7 +101,7 @@ func NewAppShardTimeoutAggregator( timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[ *protobufs.AppShardFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ]( logger, signatureAggregator, @@ -143,7 +142,7 @@ func NewGlobalVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[ ]() } -func NewGlobalVoteAggregator( +func NewGlobalVoteAggregator[PeerIDT models.Unique]( logger consensus.TraceLogger, committee consensus.DynamicCommittee, voteAggregationDistributor *pubsub.VoteAggregationDistributor[ @@ -154,7 +153,7 @@ func NewGlobalVoteAggregator( votingProvider consensus.VotingProvider[ *protobufs.GlobalFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ], currentRank uint64, ) ( @@ -164,7 +163,7 @@ func NewGlobalVoteAggregator( voteProcessorFactory := votecollector.NewVoteProcessorFactory[ *protobufs.GlobalFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ](committee, func(qc models.QuorumCertificate) {}) createCollectorFactoryMethod := votecollector.NewStateMachineFactory( @@ -173,7 +172,7 @@ func NewGlobalVoteAggregator( votecollector.VerifyingVoteProcessorFactory[ *protobufs.GlobalFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ]( voteProcessorFactory.Create, ), @@ -203,7 +202,7 @@ func NewGlobalTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistribu return pubsub.NewTimeoutAggregationDistributor[*protobufs.ProposalVote]() } -func NewGlobalTimeoutAggregator( +func NewGlobalTimeoutAggregator[PeerIDT models.Unique]( logger consensus.TraceLogger, committee consensus.DynamicCommittee, consensusVerifier consensus.Verifier[*protobufs.ProposalVote], @@ -220,7 +219,7 @@ func NewGlobalTimeoutAggregator( timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[ *protobufs.GlobalFrame, *protobufs.ProposalVote, - global.GlobalPeerID, + PeerIDT, ]( logger, signatureAggregator,