ceremonyclient/consensus/eventloop/event_loop.go

package eventloop

import (
	"context"
	"fmt"
	"time"

	"source.quilibrium.com/quilibrium/monorepo/consensus"
	"source.quilibrium.com/quilibrium/monorepo/consensus/models"
	"source.quilibrium.com/quilibrium/monorepo/consensus/tracker"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle"
)
// queuedProposal is a helper structure used to transmit a proposal over a
// channel. It carries an insertionTime recording when the proposal was
// queued, so that we can measure how long a proposal waited between being
// queued and actually being processed by the `EventHandler`.
type queuedProposal[StateT models.Unique, VoteT models.Unique] struct {
	proposal      *models.SignedProposal[StateT, VoteT]
	insertionTime time.Time
}
// EventLoop buffers all incoming events to the HotStuff EventHandler, and
// feeds the EventHandler one event at a time.
type EventLoop[StateT models.Unique, VoteT models.Unique] struct {
	*lifecycle.ComponentManager

	eventHandler                             consensus.EventHandler[StateT, VoteT]
	proposals                                chan queuedProposal[StateT, VoteT]
	newestSubmittedTimeoutCertificate        *tracker.NewestTCTracker
	newestSubmittedQc                        *tracker.NewestQCTracker
	newestSubmittedPartialTimeoutCertificate *tracker.NewestPartialTimeoutCertificateTracker
	tcSubmittedNotifier                      chan struct{}
	qcSubmittedNotifier                      chan struct{}
	partialTimeoutCertificateCreatedNotifier chan struct{}
	startTime                                time.Time
	tracer                                   consensus.TraceLogger
}
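// Design note: each certificate kind is paired with a tracker and a size-1
// notifier channel. Submitters store the newest certificate in the tracker
// and signal the notifier; the event loop drains the signal and then reads
// the latest value from the tracker. This decouples submission from
// processing and collapses bursts into a single pending notification.
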
var _ consensus.EventLoop[*nilUnique, *nilUnique] = (*EventLoop[*nilUnique, *nilUnique])(nil)
// NewEventLoop creates an instance of EventLoop.
func NewEventLoop[StateT models.Unique, VoteT models.Unique](
	tracer consensus.TraceLogger,
	eventHandler consensus.EventHandler[StateT, VoteT],
	startTime time.Time,
) (*EventLoop[StateT, VoteT], error) {
	// We use a buffered channel to avoid blocking the caller. We can't afford
	// to drop messages, since that would undermine liveness, but we also want
	// to avoid blocking the compliance engine. We assume that we can process
	// proposals faster than the compliance engine feeds them; in the worst
	// case we fill the buffer and stall a compliance engine worker, but that
	// should only happen if the compliance engine receives a large number of
	// states in a short period of time (when catching up, for instance).
	proposals := make(chan queuedProposal[StateT, VoteT], 1000)

	el := &EventLoop[StateT, VoteT]{
		tracer:              tracer,
		eventHandler:        eventHandler,
		proposals:           proposals,
		tcSubmittedNotifier: make(chan struct{}, 1),
		qcSubmittedNotifier: make(chan struct{}, 1),
		partialTimeoutCertificateCreatedNotifier: make(chan struct{}, 1),
		newestSubmittedTimeoutCertificate:        tracker.NewNewestTCTracker(),
		newestSubmittedQc:                        tracker.NewNewestQCTracker(),
		newestSubmittedPartialTimeoutCertificate: tracker.NewNewestPartialTimeoutCertificateTracker(),
		startTime:                                startTime,
	}

	componentBuilder := lifecycle.NewComponentManagerBuilder()
	componentBuilder.AddWorker(func(
		ctx lifecycle.SignalerContext,
		ready lifecycle.ReadyFunc,
	) {
		ready()

		// launch when scheduled by el.startTime
		el.tracer.Trace(fmt.Sprintf("event loop will start at: %v", el.startTime))
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Until(el.startTime)):
			el.tracer.Trace("starting event loop")
			err := el.loop(ctx)
			if err != nil {
				el.tracer.Error("irrecoverable event loop error", err)
				ctx.Throw(err)
			}
		}
	})
	el.ComponentManager = componentBuilder.Build()

	return el, nil
}
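// A minimal usage sketch (hypothetical caller; `tracer`, `handler`,
// `proposal`, and the way the supervisor runs components are assumptions,
// not part of this package's verified API):
//
//	el, err := NewEventLoop(tracer, handler, time.Now())
//	if err != nil {
//		return err
//	}
//	// the lifecycle supervisor is assumed to start el's worker goroutine
//	// via the embedded ComponentManager and wait for it to become ready
//	el.SubmitProposal(proposal)
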
// loop executes the core HotStuff logic in a single thread. It picks inputs
// from the various inbound channels and executes the EventHandler's respective
// method for processing this input. During normal operations, the EventHandler
// is not expected to return any errors, as all inputs are assumed to be fully
// validated (or produced by trusted components within the node). Therefore,
// any error is a symptom of state corruption, bugs or violation of API
// contracts. In all cases, continuing operations is not an option, i.e. we exit
// the event loop and return an exception.
func (el *EventLoop[StateT, VoteT]) loop(ctx context.Context) error {
	err := el.eventHandler.Start(ctx)
	if err != nil {
		return fmt.Errorf("could not start event handler: %w", err)
	}

	shutdownSignaled := ctx.Done()
	timeoutCertificates := el.tcSubmittedNotifier
	quorumCertificates := el.qcSubmittedNotifier
	partialTCs := el.partialTimeoutCertificateCreatedNotifier

	for {
		// Give timeout events priority, so they are processed first. This
		// prevents attacks from malicious nodes that attempt to keep honest
		// nodes' pacemakers from progressing by flooding them with other
		// events.
		timeoutChannel := el.eventHandler.TimeoutChannel()

		// the first select makes sure we process timeouts with priority
		select {
		// if we receive the shutdown signal, exit the loop
		case <-shutdownSignaled:
			el.tracer.Trace("shutting down event loop")
			return nil

		// processing a timeout or partial TC event is top priority, since
		// they allow the node to contribute to TC aggregation when replicas
		// can't make progress on the happy path
		case <-timeoutChannel:
			el.tracer.Trace("received timeout")
			err = el.eventHandler.OnLocalTimeout()
			if err != nil {
				return fmt.Errorf("could not process timeout: %w", err)
			}

			// At this point, we have received and processed an event from the
			// timeout channel. A timeout also means that we have made
			// progress: a new timeout will have been started, and
			// el.eventHandler.TimeoutChannel() will be a NEW channel (for the
			// just-started timeout). It is very important to restart the for
			// loop from the beginning, so that we continue with the new
			// timeout channel!
			continue
		case <-partialTCs:
			el.tracer.Trace("received partial timeout")
			err = el.eventHandler.OnPartialTimeoutCertificateCreated(
				el.newestSubmittedPartialTimeoutCertificate.NewestPartialTimeoutCertificate(),
			)
			if err != nil {
				return fmt.Errorf("could not process partial created TC event: %w", err)
			}

			// At this point, we have received and processed a partial TC
			// event, which could have resulted in one of several scenarios:
			// 1. a rank change, with potential voting or proposal creation
			// 2. a created and broadcast timeout state
			// 3. no rank change from the QC and TC, and no timeout created,
			//    because we had already timed out or the partial TC was
			//    created for a rank different from the current one
			continue
		default:
			el.tracer.Trace("non-priority event")
			// fall through to non-priority events
		}

		// select for state headers/QCs here
		select {
		// same as before
		case <-shutdownSignaled:
			el.tracer.Trace("shutting down event loop")
			return nil

		// same as before
		case <-timeoutChannel:
			el.tracer.Trace("received timeout")
			err = el.eventHandler.OnLocalTimeout()
			if err != nil {
				return fmt.Errorf("could not process timeout: %w", err)
			}

		// if we have a new proposal, process it
		case queuedItem := <-el.proposals:
			el.tracer.Trace("received proposal")
			proposal := queuedItem.proposal
			err = el.eventHandler.OnReceiveProposal(proposal)
			if err != nil {
				return fmt.Errorf(
					"could not process proposal %x: %w",
					proposal.State.Identifier,
					err,
				)
			}
			el.tracer.Trace(
				"state proposal has been processed successfully",
				consensus.Uint64Param("rank", proposal.State.Rank),
			)

		// if we have a new QC, process it
		case <-quorumCertificates:
			el.tracer.Trace("received quorum certificate")
			err = el.eventHandler.OnReceiveQuorumCertificate(
				*el.newestSubmittedQc.NewestQC(),
			)
			if err != nil {
				return fmt.Errorf("could not process QC: %w", err)
			}

		// if we have a new TC, process it
		case <-timeoutCertificates:
			el.tracer.Trace("received timeout certificate")
			err = el.eventHandler.OnReceiveTimeoutCertificate(
				*el.newestSubmittedTimeoutCertificate.NewestTC(),
			)
			if err != nil {
				return fmt.Errorf("could not process TC: %w", err)
			}
		case <-partialTCs:
			el.tracer.Trace("received partial timeout certificate")
			err = el.eventHandler.OnPartialTimeoutCertificateCreated(
				el.newestSubmittedPartialTimeoutCertificate.NewestPartialTimeoutCertificate(),
			)
			if err != nil {
				return fmt.Errorf("could not process partial created TC event: %w", err)
			}
		}
	}
}
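// The two-stage select in loop above is a general Go idiom for giving one
// channel priority over others. A minimal standalone sketch of the pattern
// (the channels `priority` and `regular` and the function `handle` are
// hypothetical, for illustration only):
//
//	for {
//		select {
//		case v := <-priority:
//			handle(v)
//			continue
//		default:
//		}
//		select {
//		case v := <-priority:
//			handle(v)
//		case v := <-regular:
//			handle(v)
//		}
//	}
//
// The second select must include the priority channel again; otherwise the
// loop could block on `regular` while a priority event is already pending.
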
// SubmitProposal pushes the received proposal onto the proposals channel, to
// be processed by the event loop.
func (el *EventLoop[StateT, VoteT]) SubmitProposal(
	proposal *models.SignedProposal[StateT, VoteT],
) {
	queueItem := queuedProposal[StateT, VoteT]{
		proposal:      proposal,
		insertionTime: time.Now(),
	}
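	// The send below can block when the proposals buffer is full; the
	// shutdown case ensures a blocked caller is released once the component
	// begins shutting down, instead of hanging forever.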
	select {
	case el.proposals <- queueItem:
	case <-el.ComponentManager.ShutdownSignal():
		return
	}
}
// onTrustedQC tracks the received QC (which MUST be validated) and, if it is
// the newest QC seen so far, signals the qcSubmittedNotifier channel.
func (el *EventLoop[StateT, VoteT]) onTrustedQC(qc *models.QuorumCertificate) {
	if el.newestSubmittedQc.Track(qc) {
		select {
		case el.qcSubmittedNotifier <- struct{}{}:
		default:
		}
	}
}
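// The Track-then-notify pattern above coalesces bursts of certificates: the
// tracker always holds the newest value, and the size-1 notifier channel
// holds at most one pending signal, so the event loop only ever reads the
// latest certificate no matter how many were submitted in between.
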
// onTrustedTC tracks the received TC (which MUST be validated) and, if it is
// the newest TC seen so far, signals the tcSubmittedNotifier channel.
func (el *EventLoop[StateT, VoteT]) onTrustedTC(tc *models.TimeoutCertificate) {
	if el.newestSubmittedTimeoutCertificate.Track(tc) {
		select {
		case el.tcSubmittedNotifier <- struct{}{}:
		default:
		}
	} else {
		// even when the TC itself is stale, the QC embedded within it may
		// still be newer than any QC we have seen so far
		qc := (*tc).GetLatestQuorumCert()
		if el.newestSubmittedQc.Track(&qc) {
			select {
			case el.qcSubmittedNotifier <- struct{}{}:
			default:
			}
		}
	}
}
// OnTimeoutCertificateConstructedFromTimeouts pushes the received TC to the
// timeoutCertificates channel
func (el *EventLoop[StateT, VoteT]) OnTimeoutCertificateConstructedFromTimeouts(
	tc models.TimeoutCertificate,
) {
	el.onTrustedTC(&tc)
}
// OnPartialTimeoutCertificateCreated creates a
// consensus.PartialTimeoutCertificateCreated payload and, if it is the newest
// such event seen so far, signals the partialTimeoutCertificateCreatedNotifier
// buffered channel for further processing by the EventHandler. The send is
// non-blocking, so this function never blocks: the size-1 buffer coalesces
// repeated notifications.
func (el *EventLoop[StateT, VoteT]) OnPartialTimeoutCertificateCreated(
	rank uint64,
	newestQC models.QuorumCertificate,
	previousRankTimeoutCert models.TimeoutCertificate,
) {
	event := &consensus.PartialTimeoutCertificateCreated{
		Rank:                        rank,
		NewestQuorumCertificate:     newestQC,
		PriorRankTimeoutCertificate: previousRankTimeoutCert,
	}
	if el.newestSubmittedPartialTimeoutCertificate.Track(event) {
		select {
		case el.partialTimeoutCertificateCreatedNotifier <- struct{}{}:
		default:
		}
	}
}
// OnNewQuorumCertificateDiscovered pushes already validated QCs that were
// submitted from the TimeoutAggregator to the event handler
func (el *EventLoop[StateT, VoteT]) OnNewQuorumCertificateDiscovered(
	qc models.QuorumCertificate,
) {
	el.onTrustedQC(&qc)
}

// OnNewTimeoutCertificateDiscovered pushes already validated TCs that were
// submitted from the TimeoutAggregator to the event handler
func (el *EventLoop[StateT, VoteT]) OnNewTimeoutCertificateDiscovered(
	tc models.TimeoutCertificate,
) {
	el.onTrustedTC(&tc)
}

// OnQuorumCertificateConstructedFromVotes implements
// consensus.VoteCollectorConsumer and pushes the received QC into the
// processing pipeline.
func (el *EventLoop[StateT, VoteT]) OnQuorumCertificateConstructedFromVotes(
	qc models.QuorumCertificate,
) {
	el.onTrustedQC(&qc)
}
// OnTimeoutProcessed implements consensus.TimeoutCollectorConsumer and is a
// no-op.
func (el *EventLoop[StateT, VoteT]) OnTimeoutProcessed(
	timeout *models.TimeoutState[VoteT],
) {
}

// OnVoteProcessed implements consensus.VoteCollectorConsumer and is a no-op.
func (el *EventLoop[StateT, VoteT]) OnVoteProcessed(vote *VoteT) {}
// nilUnique is used to satisfy the generic type arguments in the compile-time
// interface assertion checks.
type nilUnique struct{}

// GetSignature implements models.Unique.
func (n *nilUnique) GetSignature() []byte {
	panic("unimplemented")
}

// GetTimestamp implements models.Unique.
func (n *nilUnique) GetTimestamp() uint64 {
	panic("unimplemented")
}

// Source implements models.Unique.
func (n *nilUnique) Source() models.Identity {
	panic("unimplemented")
}

// Clone implements models.Unique.
func (n *nilUnique) Clone() models.Unique {
	panic("unimplemented")
}

// GetRank implements models.Unique.
func (n *nilUnique) GetRank() uint64 {
	panic("unimplemented")
}

// Identity implements models.Unique.
func (n *nilUnique) Identity() models.Identity {
	panic("unimplemented")
}

var _ models.Unique = (*nilUnique)(nil)