mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 10:27:26 +08:00
* wip: conversion of hotstuff from flow into Q-oriented model * bulk of tests * remaining non-integration tests * add integration test, adjust log interface, small tweaks * further adjustments, restore full pacemaker shape * add component lifecycle management+supervisor * further refinements * resolve timeout hanging * mostly finalized state for consensus * bulk of engine swap out * lifecycle-ify most types * wiring nearly complete, missing needed hooks for proposals * plugged in, vetting message validation paths * global consensus, plugged in and verified * app shard now wired in too * do not decode empty keys.yml (#456) * remove obsolete engine.maxFrames config parameter (#454) * default to Info log level unless debug is enabled (#453) * respect config's "logging" section params, remove obsolete single-file logging (#452) * Trivial code cleanup aiming to reduce Go compiler warnings (#451) * simplify range traversal * simplify channel read for single select case * delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24 * simplify range traversal * simplify channel read for single select case * remove redundant type from array * simplify range traversal * simplify channel read for single select case * RC slate * finalize 2.1.0.5 * Update comments in StrictMonotonicCounter Fix comment formatting and clarify description. --------- Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
128 lines
4.5 KiB
Go
128 lines
4.5 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
|
|
)
|
|
|
|
// Distributor distributes notifications to a list of consumers (event
|
|
// consumers).
|
|
//
|
|
// It allows thread-safe subscription of multiple consumers to events.
|
|
type Distributor[StateT models.Unique, VoteT models.Unique] struct {
|
|
*FollowerDistributor[StateT, VoteT]
|
|
*CommunicatorDistributor[StateT, VoteT]
|
|
*ParticipantDistributor[StateT, VoteT]
|
|
}
|
|
|
|
var _ consensus.Consumer[*nilUnique, *nilUnique] = (*Distributor[*nilUnique, *nilUnique])(nil)
|
|
|
|
func NewDistributor[
|
|
StateT models.Unique,
|
|
VoteT models.Unique,
|
|
]() *Distributor[StateT, VoteT] {
|
|
return &Distributor[StateT, VoteT]{
|
|
FollowerDistributor: NewFollowerDistributor[StateT, VoteT](),
|
|
CommunicatorDistributor: NewCommunicatorDistributor[StateT, VoteT](),
|
|
ParticipantDistributor: NewParticipantDistributor[StateT, VoteT](),
|
|
}
|
|
}
|
|
|
|
// AddConsumer adds an event consumer to the Distributor
|
|
func (p *Distributor[StateT, VoteT]) AddConsumer(
|
|
consumer consensus.Consumer[StateT, VoteT],
|
|
) {
|
|
p.FollowerDistributor.AddFollowerConsumer(consumer)
|
|
p.CommunicatorDistributor.AddCommunicatorConsumer(consumer)
|
|
p.ParticipantDistributor.AddParticipantConsumer(consumer)
|
|
}
|
|
|
|
// FollowerDistributor ingests consensus follower events and distributes it to
|
|
// consumers. It allows thread-safe subscription of multiple consumers to
|
|
// events.
|
|
type FollowerDistributor[StateT models.Unique, VoteT models.Unique] struct {
|
|
*ProposalViolationDistributor[StateT, VoteT]
|
|
*FinalizationDistributor[StateT]
|
|
}
|
|
|
|
var _ consensus.FollowerConsumer[*nilUnique, *nilUnique] = (*FollowerDistributor[*nilUnique, *nilUnique])(nil)
|
|
|
|
func NewFollowerDistributor[
|
|
StateT models.Unique,
|
|
VoteT models.Unique,
|
|
]() *FollowerDistributor[StateT, VoteT] {
|
|
return &FollowerDistributor[StateT, VoteT]{
|
|
ProposalViolationDistributor: NewProposalViolationDistributor[StateT, VoteT](),
|
|
FinalizationDistributor: NewFinalizationDistributor[StateT](),
|
|
}
|
|
}
|
|
|
|
// AddFollowerConsumer registers the input `consumer` to be notified on
|
|
// `consensus.ConsensusFollowerConsumer` events.
|
|
func (d *FollowerDistributor[StateT, VoteT]) AddFollowerConsumer(
|
|
consumer consensus.FollowerConsumer[StateT, VoteT],
|
|
) {
|
|
d.FinalizationDistributor.AddFinalizationConsumer(consumer)
|
|
d.ProposalViolationDistributor.AddProposalViolationConsumer(consumer)
|
|
}
|
|
|
|
// TimeoutAggregationDistributor ingests timeout aggregation events and
|
|
// distributes it to consumers. It allows thread-safe subscription of multiple
|
|
// consumers to events.
|
|
type TimeoutAggregationDistributor[VoteT models.Unique] struct {
|
|
*TimeoutAggregationViolationDistributor[VoteT]
|
|
*TimeoutCollectorDistributor[VoteT]
|
|
}
|
|
|
|
var _ consensus.TimeoutAggregationConsumer[*nilUnique] = (*TimeoutAggregationDistributor[*nilUnique])(nil)
|
|
|
|
func NewTimeoutAggregationDistributor[
|
|
VoteT models.Unique,
|
|
]() *TimeoutAggregationDistributor[VoteT] {
|
|
return &TimeoutAggregationDistributor[VoteT]{
|
|
TimeoutAggregationViolationDistributor: NewTimeoutAggregationViolationDistributor[VoteT](),
|
|
TimeoutCollectorDistributor: NewTimeoutCollectorDistributor[VoteT](),
|
|
}
|
|
}
|
|
|
|
func (d *TimeoutAggregationDistributor[VoteT]) AddTimeoutAggregationConsumer(
|
|
consumer consensus.TimeoutAggregationConsumer[VoteT],
|
|
) {
|
|
d.TimeoutAggregationViolationDistributor.
|
|
AddTimeoutAggregationViolationConsumer(consumer)
|
|
d.TimeoutCollectorDistributor.AddTimeoutCollectorConsumer(consumer)
|
|
}
|
|
|
|
// VoteAggregationDistributor ingests vote aggregation events and distributes it
|
|
// to consumers. It allows thread-safe subscription of multiple consumers to
|
|
// events.
|
|
type VoteAggregationDistributor[
|
|
StateT models.Unique,
|
|
VoteT models.Unique,
|
|
] struct {
|
|
*VoteAggregationViolationDistributor[StateT, VoteT]
|
|
*VoteCollectorDistributor[VoteT]
|
|
}
|
|
|
|
var _ consensus.VoteAggregationConsumer[*nilUnique, *nilUnique] = (*VoteAggregationDistributor[*nilUnique, *nilUnique])(nil)
|
|
|
|
func NewVoteAggregationDistributor[
|
|
StateT models.Unique,
|
|
VoteT models.Unique,
|
|
]() *VoteAggregationDistributor[StateT, VoteT] {
|
|
return &VoteAggregationDistributor[StateT, VoteT]{
|
|
VoteAggregationViolationDistributor: NewVoteAggregationViolationDistributor[StateT, VoteT](),
|
|
VoteCollectorDistributor: NewQCCreatedDistributor[VoteT](),
|
|
}
|
|
}
|
|
|
|
func (
|
|
d *VoteAggregationDistributor[StateT, VoteT],
|
|
) AddVoteAggregationConsumer(
|
|
consumer consensus.VoteAggregationConsumer[StateT, VoteT],
|
|
) {
|
|
d.VoteAggregationViolationDistributor.
|
|
AddVoteAggregationViolationConsumer(consumer)
|
|
d.VoteCollectorDistributor.AddVoteCollectorConsumer(consumer)
|
|
}
|