mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
274 lines
7.3 KiB
Go
274 lines
7.3 KiB
Go
package voting
|
|
|
|
import (
|
|
"slices"
|
|
|
|
"github.com/gammazero/workerpool"
|
|
"github.com/pkg/errors"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/notifications/pubsub"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutaggregator"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/timeoutcollector"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/validator"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/voteaggregator"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/votecollector"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
)
|
|
|
|
func NewAppShardVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
] {
|
|
return pubsub.NewVoteAggregationDistributor[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
]()
|
|
}
|
|
|
|
func NewAppShardVoteAggregator[PeerIDT models.Unique](
|
|
logger consensus.TraceLogger,
|
|
filter []byte,
|
|
committee consensus.DynamicCommittee,
|
|
voteAggregationDistributor *pubsub.VoteAggregationDistributor[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
],
|
|
signatureAggregator consensus.SignatureAggregator,
|
|
votingProvider consensus.VotingProvider[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
],
|
|
onQCCreated consensus.OnQuorumCertificateCreated,
|
|
currentRank uint64,
|
|
) (
|
|
consensus.VoteAggregator[*protobufs.AppShardFrame, *protobufs.ProposalVote],
|
|
error,
|
|
) {
|
|
voteProcessorFactory := votecollector.NewVoteProcessorFactory[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
](committee, onQCCreated)
|
|
|
|
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(
|
|
logger,
|
|
filter,
|
|
voteAggregationDistributor,
|
|
votecollector.VerifyingVoteProcessorFactory[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
](
|
|
voteProcessorFactory.Create,
|
|
),
|
|
slices.Concat([]byte("appshard"), filter),
|
|
signatureAggregator,
|
|
votingProvider,
|
|
)
|
|
voteCollectors := voteaggregator.NewVoteCollectors(
|
|
logger,
|
|
currentRank,
|
|
workerpool.New(2),
|
|
createCollectorFactoryMethod,
|
|
)
|
|
|
|
// initialize the vote aggregator
|
|
voteAggregator, err := voteaggregator.NewVoteAggregator(
|
|
logger,
|
|
voteAggregationDistributor,
|
|
currentRank,
|
|
voteCollectors,
|
|
)
|
|
|
|
return voteAggregator, errors.Wrap(err, "new global vote aggregator")
|
|
}
|
|
|
|
func NewAppShardTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistributor[*protobufs.ProposalVote] {
|
|
return pubsub.NewTimeoutAggregationDistributor[*protobufs.ProposalVote]()
|
|
}
|
|
|
|
func NewAppShardTimeoutAggregator[PeerIDT models.Unique](
|
|
logger consensus.TraceLogger,
|
|
filter []byte,
|
|
committee consensus.DynamicCommittee,
|
|
consensusVerifier consensus.Verifier[*protobufs.ProposalVote],
|
|
signatureAggregator consensus.SignatureAggregator,
|
|
timeoutAggregationDistributor *pubsub.TimeoutAggregationDistributor[*protobufs.ProposalVote],
|
|
votingProvider consensus.VotingProvider[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
],
|
|
currentRank uint64,
|
|
) (consensus.TimeoutAggregator[*protobufs.ProposalVote], error) {
|
|
// initialize the Validator
|
|
validator := validator.NewValidator[*protobufs.AppShardFrame](
|
|
committee,
|
|
consensusVerifier,
|
|
)
|
|
|
|
timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[
|
|
*protobufs.AppShardFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
](
|
|
logger,
|
|
filter,
|
|
signatureAggregator,
|
|
timeoutAggregationDistributor,
|
|
committee,
|
|
validator,
|
|
votingProvider,
|
|
slices.Concat([]byte("appshardtimeout"), filter),
|
|
)
|
|
|
|
timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
|
|
logger,
|
|
timeoutAggregationDistributor,
|
|
timeoutProcessorFactory,
|
|
)
|
|
timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(
|
|
logger,
|
|
currentRank,
|
|
timeoutCollectorFactory,
|
|
)
|
|
|
|
// initialize the timeout aggregator
|
|
timeoutAggregator, err := timeoutaggregator.NewTimeoutAggregator(
|
|
logger,
|
|
currentRank,
|
|
timeoutCollectors,
|
|
)
|
|
|
|
return timeoutAggregator, errors.Wrap(err, "new global timeout aggregator")
|
|
}
|
|
|
|
func NewGlobalVoteAggregationDistributor() *pubsub.VoteAggregationDistributor[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
] {
|
|
return pubsub.NewVoteAggregationDistributor[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
]()
|
|
}
|
|
|
|
func NewGlobalVoteAggregator[PeerIDT models.Unique](
|
|
logger consensus.TraceLogger,
|
|
committee consensus.DynamicCommittee,
|
|
voteAggregationDistributor *pubsub.VoteAggregationDistributor[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
],
|
|
signatureAggregator consensus.SignatureAggregator,
|
|
votingProvider consensus.VotingProvider[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
],
|
|
onQCCreated consensus.OnQuorumCertificateCreated,
|
|
currentRank uint64,
|
|
) (
|
|
consensus.VoteAggregator[*protobufs.GlobalFrame, *protobufs.ProposalVote],
|
|
error,
|
|
) {
|
|
voteProcessorFactory := votecollector.NewVoteProcessorFactory[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
](committee, onQCCreated)
|
|
|
|
createCollectorFactoryMethod := votecollector.NewStateMachineFactory(
|
|
logger,
|
|
nil,
|
|
voteAggregationDistributor,
|
|
votecollector.VerifyingVoteProcessorFactory[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
](
|
|
voteProcessorFactory.Create,
|
|
),
|
|
[]byte("global"),
|
|
signatureAggregator,
|
|
votingProvider,
|
|
)
|
|
voteCollectors := voteaggregator.NewVoteCollectors(
|
|
logger,
|
|
currentRank,
|
|
workerpool.New(2),
|
|
createCollectorFactoryMethod,
|
|
)
|
|
|
|
// initialize the vote aggregator
|
|
voteAggregator, err := voteaggregator.NewVoteAggregator(
|
|
logger,
|
|
voteAggregationDistributor,
|
|
currentRank,
|
|
voteCollectors,
|
|
)
|
|
|
|
return voteAggregator, errors.Wrap(err, "new global vote aggregator")
|
|
}
|
|
|
|
func NewGlobalTimeoutAggregationDistributor() *pubsub.TimeoutAggregationDistributor[*protobufs.ProposalVote] {
|
|
return pubsub.NewTimeoutAggregationDistributor[*protobufs.ProposalVote]()
|
|
}
|
|
|
|
func NewGlobalTimeoutAggregator[PeerIDT models.Unique](
|
|
logger consensus.TraceLogger,
|
|
committee consensus.DynamicCommittee,
|
|
consensusVerifier consensus.Verifier[*protobufs.ProposalVote],
|
|
signatureAggregator consensus.SignatureAggregator,
|
|
timeoutAggregationDistributor *pubsub.TimeoutAggregationDistributor[*protobufs.ProposalVote],
|
|
votingProvider consensus.VotingProvider[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
],
|
|
currentRank uint64,
|
|
) (consensus.TimeoutAggregator[*protobufs.ProposalVote], error) {
|
|
// initialize the Validator
|
|
validator := validator.NewValidator[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
](committee, consensusVerifier)
|
|
|
|
timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory[
|
|
*protobufs.GlobalFrame,
|
|
*protobufs.ProposalVote,
|
|
PeerIDT,
|
|
](
|
|
logger,
|
|
nil,
|
|
signatureAggregator,
|
|
timeoutAggregationDistributor,
|
|
committee,
|
|
validator,
|
|
votingProvider,
|
|
[]byte("globaltimeout"),
|
|
)
|
|
|
|
timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
|
|
logger,
|
|
timeoutAggregationDistributor,
|
|
timeoutProcessorFactory,
|
|
)
|
|
timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(
|
|
logger,
|
|
currentRank,
|
|
timeoutCollectorFactory,
|
|
)
|
|
|
|
// initialize the timeout aggregator
|
|
timeoutAggregator, err := timeoutaggregator.NewTimeoutAggregator(
|
|
logger,
|
|
currentRank,
|
|
timeoutCollectors,
|
|
)
|
|
|
|
return timeoutAggregator, errors.Wrap(err, "new global timeout aggregator")
|
|
}
|