ceremonyclient/node/consensus/voting/voting_aggregator.go
2025-11-26 03:22:48 -06:00

274 lines
7.3 KiB
Go

package voting
import (
"slices"
"github.com/gammazero/workerpool"
"github.com/pkg/errors"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub"
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutaggregator"
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutcollector"
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
"source.quilibrium.com/quilibrium/monorepo/consensus/voteaggregator"
"source.quilibrium.com/quilibrium/monorepo/consensus/votecollector"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
// NewAppShardVoteAggregationDistributor returns a newly constructed
// pubsub.VoteAggregationDistributor instantiated for app shard frames
// (*protobufs.AppShardFrame) and proposal votes (*protobufs.ProposalVote).
func NewAppShardVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[
	*protobufs.AppShardFrame,
	*protobufs.ProposalVote,
] {
	distributor := pubsub.NewVoteAggregationDistributor[
		*protobufs.AppShardFrame,
		*protobufs.ProposalVote,
	]()
	return distributor
}
// NewAppShardVoteAggregator assembles the vote aggregation pipeline for app
// shard frames and returns it as a consensus.VoteAggregator.
//
// Construction proceeds in four steps:
//   - a vote processor factory is built from committee and onQCCreated;
//   - a collector state machine factory is built from logger, filter,
//     voteAggregationDistributor, the verifying wrapper around the processor
//     factory's Create method, signatureAggregator and votingProvider;
//   - vote collectors are created for currentRank, backed by a worker pool
//     obtained from workerpool.New(2);
//   - the aggregator itself is created from logger, the distributor,
//     currentRank and the collectors.
//
// The byte string "appshard" followed by filter is handed to the state
// machine factory.
// NOTE(review): this is presumably a domain separation tag — confirm against
// votecollector.NewStateMachineFactory.
//
// The returned error is nil on success; otherwise it is the error produced
// by voteaggregator.NewVoteAggregator, wrapped with this constructor's name.
func NewAppShardVoteAggregator[PeerIDT models.Unique](
	logger consensus.TraceLogger,
	filter []byte,
	committee consensus.DynamicCommittee,
	voteAggregationDistributor *pubsub.VoteAggregationDistributor[
		*protobufs.AppShardFrame,
		*protobufs.ProposalVote,
	],
	signatureAggregator consensus.SignatureAggregator,
	votingProvider consensus.VotingProvider[
		*protobufs.AppShardFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	],
	onQCCreated consensus.OnQuorumCertificateCreated,
	currentRank uint64,
) (
	consensus.VoteAggregator[*protobufs.AppShardFrame, *protobufs.ProposalVote],
	error,
) {
	voteProcessorFactory := votecollector.NewVoteProcessorFactory[
		*protobufs.AppShardFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	](committee, onQCCreated)

	createCollectorFactoryMethod := votecollector.NewStateMachineFactory(
		logger,
		filter,
		voteAggregationDistributor,
		votecollector.VerifyingVoteProcessorFactory[
			*protobufs.AppShardFrame,
			*protobufs.ProposalVote,
			PeerIDT,
		](
			voteProcessorFactory.Create,
		),
		// slices.Concat allocates a fresh slice, so filter is not aliased.
		slices.Concat([]byte("appshard"), filter),
		signatureAggregator,
		votingProvider,
	)

	voteCollectors := voteaggregator.NewVoteCollectors(
		logger,
		currentRank,
		workerpool.New(2),
		createCollectorFactoryMethod,
	)

	// initialize the vote aggregator
	voteAggregator, err := voteaggregator.NewVoteAggregator(
		logger,
		voteAggregationDistributor,
		currentRank,
		voteCollectors,
	)

	// Wrap with this constructor's own name so a failure is attributed to the
	// app shard aggregator and not to the global one. errors.Wrap returns nil
	// when err is nil, so the success path is unaffected.
	return voteAggregator, errors.Wrap(err, "new app shard vote aggregator")
}
// NewAppShardTimeoutAggregationDistributor returns a newly constructed
// pubsub.TimeoutAggregationDistributor instantiated for proposal votes
// (*protobufs.ProposalVote).
func NewAppShardTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistributor[
	*protobufs.ProposalVote,
] {
	distributor := pubsub.NewTimeoutAggregationDistributor[
		*protobufs.ProposalVote,
	]()
	return distributor
}
// NewAppShardTimeoutAggregator assembles the timeout aggregation pipeline for
// app shard frames and returns it as a consensus.TimeoutAggregator.
//
// Construction proceeds in five steps:
//   - a validator is built from committee and consensusVerifier;
//   - a timeout processor factory is built from logger, filter,
//     signatureAggregator, timeoutAggregationDistributor, committee, the
//     validator and votingProvider;
//   - a timeout collector factory is built from logger, the distributor and
//     the processor factory;
//   - timeout collectors are created for currentRank;
//   - the aggregator itself is created from logger, currentRank and the
//     collectors.
//
// The byte string "appshardtimeout" followed by filter is handed to the
// timeout processor factory.
// NOTE(review): this is presumably a domain separation tag — confirm against
// timeoutcollector.NewTimeoutProcessorFactory.
//
// The returned error is nil on success; otherwise it is the error produced
// by timeoutaggregator.NewTimeoutAggregator, wrapped with this constructor's
// name.
func NewAppShardTimeoutAggregator[PeerIDT models.Unique](
	logger consensus.TraceLogger,
	filter []byte,
	committee consensus.DynamicCommittee,
	consensusVerifier consensus.Verifier[*protobufs.ProposalVote],
	signatureAggregator consensus.SignatureAggregator,
	timeoutAggregationDistributor *pubsub.TimeoutAggregationDistributor[
		*protobufs.ProposalVote,
	],
	votingProvider consensus.VotingProvider[
		*protobufs.AppShardFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	],
	currentRank uint64,
) (consensus.TimeoutAggregator[*protobufs.ProposalVote], error) {
	// initialize the Validator; the local is deliberately not named
	// "validator" so that it does not shadow the imported package.
	frameValidator := validator.NewValidator[*protobufs.AppShardFrame](
		committee,
		consensusVerifier,
	)

	timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[
		*protobufs.AppShardFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	](
		logger,
		filter,
		signatureAggregator,
		timeoutAggregationDistributor,
		committee,
		frameValidator,
		votingProvider,
		// slices.Concat allocates a fresh slice, so filter is not aliased.
		slices.Concat([]byte("appshardtimeout"), filter),
	)

	timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
		logger,
		timeoutAggregationDistributor,
		timeoutProcessorFactory,
	)

	timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(
		logger,
		currentRank,
		timeoutCollectorFactory,
	)

	// initialize the timeout aggregator
	timeoutAggregator, err := timeoutaggregator.NewTimeoutAggregator(
		logger,
		currentRank,
		timeoutCollectors,
	)

	// Wrap with this constructor's own name so a failure is attributed to the
	// app shard aggregator and not to the global one. errors.Wrap returns nil
	// when err is nil, so the success path is unaffected.
	return timeoutAggregator, errors.Wrap(
		err,
		"new app shard timeout aggregator",
	)
}
// NewGlobalVoteAggregationDistributor returns a newly constructed
// pubsub.VoteAggregationDistributor instantiated for global frames
// (*protobufs.GlobalFrame) and proposal votes (*protobufs.ProposalVote).
func NewGlobalVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[
	*protobufs.GlobalFrame,
	*protobufs.ProposalVote,
] {
	distributor := pubsub.NewVoteAggregationDistributor[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
	]()
	return distributor
}
// NewGlobalVoteAggregator assembles the vote aggregation pipeline for global
// frames and returns it as a consensus.VoteAggregator.
//
// Unlike the app shard variant it takes no filter: nil is passed in the
// filter position of the state machine factory, together with the fixed byte
// string "global".
// NOTE(review): "global" is presumably a domain separation tag — confirm
// against votecollector.NewStateMachineFactory.
//
// The vote collectors are created for currentRank and are backed by a worker
// pool obtained from workerpool.New(2). The returned error is nil on success;
// otherwise it is the error from voteaggregator.NewVoteAggregator, wrapped
// with this constructor's name.
func NewGlobalVoteAggregator[PeerIDT models.Unique](
	logger consensus.TraceLogger,
	committee consensus.DynamicCommittee,
	voteAggregationDistributor *pubsub.VoteAggregationDistributor[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
	],
	signatureAggregator consensus.SignatureAggregator,
	votingProvider consensus.VotingProvider[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	],
	onQCCreated consensus.OnQuorumCertificateCreated,
	currentRank uint64,
) (
	consensus.VoteAggregator[*protobufs.GlobalFrame, *protobufs.ProposalVote],
	error,
) {
	// Processor factory, then its verifying wrapper.
	processors := votecollector.NewVoteProcessorFactory[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	](committee, onQCCreated)
	verifying := votecollector.VerifyingVoteProcessorFactory[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	](processors.Create)

	// Factory that produces one collector state machine per rank.
	newCollector := votecollector.NewStateMachineFactory(
		logger,
		nil,
		voteAggregationDistributor,
		verifying,
		[]byte("global"),
		signatureAggregator,
		votingProvider,
	)

	pool := workerpool.New(2)
	collectors := voteaggregator.NewVoteCollectors(
		logger,
		currentRank,
		pool,
		newCollector,
	)

	// Build the aggregator on top of the collectors.
	aggregator, err := voteaggregator.NewVoteAggregator(
		logger,
		voteAggregationDistributor,
		currentRank,
		collectors,
	)
	return aggregator, errors.Wrap(err, "new global vote aggregator")
}
// NewGlobalTimeoutAggregationDistributor returns a newly constructed
// pubsub.TimeoutAggregationDistributor instantiated for proposal votes
// (*protobufs.ProposalVote).
func NewGlobalTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistributor[
	*protobufs.ProposalVote,
] {
	distributor := pubsub.NewTimeoutAggregationDistributor[
		*protobufs.ProposalVote,
	]()
	return distributor
}
// NewGlobalTimeoutAggregator assembles the timeout aggregation pipeline for
// global frames and returns it as a consensus.TimeoutAggregator.
//
// Unlike the app shard variant it takes no filter: nil is passed in the
// filter position of the timeout processor factory, together with the fixed
// byte string "globaltimeout".
// NOTE(review): "globaltimeout" is presumably a domain separation tag —
// confirm against timeoutcollector.NewTimeoutProcessorFactory.
//
// The returned error is nil on success; otherwise it is the error from
// timeoutaggregator.NewTimeoutAggregator, wrapped with this constructor's
// name.
func NewGlobalTimeoutAggregator[PeerIDT models.Unique](
	logger consensus.TraceLogger,
	committee consensus.DynamicCommittee,
	consensusVerifier consensus.Verifier[*protobufs.ProposalVote],
	signatureAggregator consensus.SignatureAggregator,
	timeoutAggregationDistributor *pubsub.TimeoutAggregationDistributor[
		*protobufs.ProposalVote,
	],
	votingProvider consensus.VotingProvider[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	],
	currentRank uint64,
) (consensus.TimeoutAggregator[*protobufs.ProposalVote], error) {
	// Validator built from the committee and the consensus verifier.
	frameValidator := validator.NewValidator[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
	](committee, consensusVerifier)

	// Processor factory -> collector factory -> collectors.
	processors := timeoutcollector.NewTimeoutProcessorFactory[
		*protobufs.GlobalFrame,
		*protobufs.ProposalVote,
		PeerIDT,
	](
		logger,
		nil,
		signatureAggregator,
		timeoutAggregationDistributor,
		committee,
		frameValidator,
		votingProvider,
		[]byte("globaltimeout"),
	)
	collectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
		logger,
		timeoutAggregationDistributor,
		processors,
	)
	collectors := timeoutaggregator.NewTimeoutCollectors(
		logger,
		currentRank,
		collectorFactory,
	)

	// Build the aggregator on top of the collectors.
	aggregator, err := timeoutaggregator.NewTimeoutAggregator(
		logger,
		currentRank,
		collectors,
	)
	return aggregator, errors.Wrap(err, "new global timeout aggregator")
}