ceremonyclient/consensus/pacemaker/pacemaker.go
Cassandra Heart c797d482f9
v2.1.0.5 (#457)
* wip: conversion of hotstuff from flow into Q-oriented model

* bulk of tests

* remaining non-integration tests

* add integration test, adjust log interface, small tweaks

* further adjustments, restore full pacemaker shape

* add component lifecycle management+supervisor

* further refinements

* resolve timeout hanging

* mostly finalized state for consensus

* bulk of engine swap out

* lifecycle-ify most types

* wiring nearly complete, missing needed hooks for proposals

* plugged in, vetting message validation paths

* global consensus, plugged in and verified

* app shard now wired in too

* do not decode empty keys.yml (#456)

* remove obsolete engine.maxFrames config parameter (#454)

* default to Info log level unless debug is enabled (#453)

* respect config's "logging" section params, remove obsolete single-file logging (#452)

* Trivial code cleanup aiming to reduce Go compiler warnings (#451)

* simplify range traversal

* simplify channel read for single select case

* delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24

* simplify range traversal

* simplify channel read for single select case

* remove redundant type from array

* simplify range traversal

* simplify channel read for single select case

* RC slate

* finalize 2.1.0.5

* Update comments in StrictMonotonicCounter

Fix comment formatting and clarify description.

---------

Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-11-11 05:00:17 -06:00

332 lines
11 KiB
Go

package pacemaker
import (
"context"
"fmt"
"time"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/pacemaker/timeout"
"source.quilibrium.com/quilibrium/monorepo/consensus/tracker"
)
// Pacemaker implements consensus.Pacemaker.
// Conceptually, we use the Pacemaker algorithm first proposed in [1]
// (specifically Jolteon) and described in more detail in [2] (aka DiemBFT v4).
// [1] https://arxiv.org/abs/2106.10362
// [2] https://developers.diem.com/papers/diem-consensus-state-machine-replication-in-the-diem-statechain/2021-08-17.pdf
//
// To enter a new rank `r`, the Pacemaker must observe a valid QC or TC for rank
// `r-1`. The Pacemaker also controls when a node should locally time out for a
// given rank. Locally timing out of a rank does not by itself cause a rank
// change. A local timeout for a rank `r` causes a node to:
//   - never produce a vote for any proposal with rank ≤ `r`, after the timeout
//   - produce and broadcast a timeout object, which can form a part of the TC
//     for the timed out rank
//
// Not concurrency safe.
type Pacemaker[StateT models.Unique, VoteT models.Unique] struct {
	// Embedded so its methods are promoted onto Pacemaker; populated from the
	// proposalDurationProvider argument of NewPacemaker.
	consensus.ProposalDurationProvider
	// ctx is the context handed to Start. It is reused for every timeout that is
	// started on a later rank change, and is nil until Start has been called.
	ctx context.Context
	// tracer emits trace events (e.g. when a TC is received).
	tracer consensus.TraceLogger
	// timeoutControl arms the per-rank local timeout and is told whether a rank
	// ended through progress (QC) or through a timeout (TC).
	timeoutControl *timeout.Controller
	// notifier receives rank-change and timeout-start events.
	notifier consensus.ParticipantConsumer[StateT, VoteT]
	// rankTracker owns the current rank and the newest QC/TC seen so far.
	rankTracker rankTracker[StateT, VoteT]
	// started records whether Start has already run; repeated calls are no-ops.
	started bool
}
// Compile-time proof that Pacemaker implements the consensus interfaces it is
// meant to satisfy. nilUnique merely supplies concrete type arguments so the
// generic type can be instantiated for the check.
var (
	_ consensus.Pacemaker                = (*Pacemaker[*nilUnique, *nilUnique])(nil)
	_ consensus.ProposalDurationProvider = (*Pacemaker[*nilUnique, *nilUnique])(nil)
)
// NewPacemaker creates a new Pacemaker instance. The returned Pacemaker is not
// running yet; call Start to arm the timeout for the initial rank.
//
// Parameters:
//   - filter: passed, together with store, to newRankTracker; presumably
//     selects which persisted consensus state to load (TODO confirm).
//   - timeoutController: controls the local timeout trigger for each rank.
//   - proposalDurationProvider: embedded into the Pacemaker so its methods are
//     exposed directly on the Pacemaker.
//   - notifier: receives pacemaker events (rank changes, timeout starts).
//   - store: consensus store the rank tracker is initialized from.
//   - tracer: receives trace logging from the Pacemaker.
//   - recovery: optional recovery actions (see WithQCs / WithTCs), applied in
//     the given order after the rank tracker has been initialized.
//
// Expected error conditions:
//   - models.ConfigurationError if the initial LivenessState is invalid
//     (surfaced from rank-tracker initialization).
//
// Errors from rank-tracker initialization and from any recovery action are
// wrapped and returned; no Pacemaker is returned in that case.
func NewPacemaker[StateT models.Unique, VoteT models.Unique](
	filter []byte,
	timeoutController *timeout.Controller,
	proposalDurationProvider consensus.ProposalDurationProvider,
	notifier consensus.Consumer[StateT, VoteT],
	store consensus.ConsensusStore[VoteT],
	tracer consensus.TraceLogger,
	recovery ...recoveryInformation[StateT, VoteT],
) (*Pacemaker[StateT, VoteT], error) {
	rt, err := newRankTracker[StateT, VoteT](filter, store)
	if err != nil {
		return nil, fmt.Errorf("initializing rank tracker failed: %w", err)
	}

	// started keeps its zero value (false) until Start is called; ctx is
	// likewise only set by Start.
	pm := &Pacemaker[StateT, VoteT]{
		ProposalDurationProvider: proposalDurationProvider,
		timeoutControl:           timeoutController,
		notifier:                 notifier,
		rankTracker:              rt,
		tracer:                   tracer,
	}

	for _, apply := range recovery {
		if err := apply(pm); err != nil {
			return nil, fmt.Errorf("ingesting recovery information failed: %w", err)
		}
	}
	return pm, nil
}
// CurrentRank reports the rank the Pacemaker is currently in, as maintained by
// its rank tracker.
func (p *Pacemaker[StateT, VoteT]) CurrentRank() uint64 {
	rank := p.rankTracker.CurrentRank()
	return rank
}
// LatestQuorumCertificate reports the highest-ranked QC the Pacemaker has
// discovered so far, as recorded by its rank tracker.
func (p *Pacemaker[StateT, VoteT]) LatestQuorumCertificate() models.QuorumCertificate {
	newest := p.rankTracker.LatestQuorumCertificate()
	return newest
}
// PriorRankTimeoutCertificate reports the TC for the previous rank, as
// recorded by the rank tracker. The result is nil only when the current rank
// was entered via a QC rather than a TC.
func (p *Pacemaker[StateT, VoteT]) PriorRankTimeoutCertificate() models.TimeoutCertificate {
	prior := p.rankTracker.PriorRankTimeoutCertificate()
	return prior
}
// TimeoutCh exposes the channel of the currently active timeout.
// The channel yields only that single timeout; once a new timeout has been
// started (e.g. after a rank change), TimeoutCh must be called again to obtain
// the channel for it.
func (p *Pacemaker[StateT, VoteT]) TimeoutCh() <-chan time.Time {
	ch := p.timeoutControl.Channel()
	return ch
}
// ReceiveQuorumCertificate hands a QC to the Pacemaker, which may let it
// fast-forward to a higher rank.
//
// When the QC advances the rank, the timeout controller is told that progress
// happened before the timeout, the notifier receives the QC-triggered rank
// change followed by the generic rank change, a timeout is started for the new
// rank, and the details of that timeout are returned. When the rank does not
// advance, the result is (nil, nil).
//
// Unlike ReceiveTimeoutCertificate, a nil QC is not a supported input.
// No errors are expected; any error should be treated as an exception.
func (p *Pacemaker[StateT, VoteT]) ReceiveQuorumCertificate(
	qc models.QuorumCertificate,
) (*models.NextRank, error) {
	oldRank := p.CurrentRank()
	newRank, err := p.rankTracker.ReceiveQuorumCertificate(qc)
	switch {
	case err != nil:
		return nil, fmt.Errorf(
			"unexpected exception in rankTracker while processing QC for rank %d: %w",
			qc.GetRank(),
			err,
		)
	case newRank <= oldRank:
		// The QC did not move us forward; nothing else to do.
		return nil, nil
	}

	// The rank ended through progress (a QC), not through a local timeout.
	p.timeoutControl.OnProgressBeforeTimeout()
	p.notifier.OnQuorumCertificateTriggeredRankChange(oldRank, newRank, qc)
	p.notifier.OnRankChange(oldRank, newRank)

	info := p.timeoutControl.StartTimeout(p.ctx, newRank)
	deadline := info.StartTime.Add(info.Duration)
	p.notifier.OnStartingTimeout(info.StartTime, deadline)

	return &models.NextRank{
		Rank:  info.Rank,
		Start: info.StartTime,
		End:   deadline,
	}, nil
}
// ReceiveTimeoutCertificate notifies the Pacemaker of a new timeout
// certificate, which may allow the Pacemaker to fast-forward its current rank.
// A nil TC is an expected, valid input, so that callers may pass in e.g.
// `Proposal.PriorRankTimeoutCertificate`, which may or may not have a value.
//
// When the TC advances the rank, the timeout controller is informed via
// OnTimeout, the notifier receives the TC-triggered rank change followed by
// the generic rank change, a timeout is started for the new rank, and the
// details of that timeout are returned. Otherwise the result is (nil, nil).
//
// No errors are expected; any error should be treated as an exception.
func (p *Pacemaker[StateT, VoteT]) ReceiveTimeoutCertificate(
	tc models.TimeoutCertificate,
) (*models.NextRank, error) {
	initialRank := p.CurrentRank()
	resultingRank, err := p.rankTracker.ReceiveTimeoutCertificate(tc)
	if err != nil {
		// tc is allowed to be nil (see above). Calling tc.GetRank() on a nil
		// interface would panic here and hide the rank tracker's error.
		if tc == nil {
			return nil, fmt.Errorf(
				"unexpected exception in rankTracker while processing nil TC: %w",
				err,
			)
		}
		return nil, fmt.Errorf(
			"unexpected exception in rankTracker while processing TC for rank %d: %w",
			tc.GetRank(),
			err,
		)
	}
	p.tracer.Trace(
		"pacemaker receive tc",
		consensus.Uint64Param("resulting_rank", resultingRank),
		consensus.Uint64Param("initial_rank", initialRank),
	)
	if resultingRank <= initialRank {
		return nil, nil
	}

	// TC-triggered rank change: reported to the timeout controller as a
	// timeout (contrast with OnProgressBeforeTimeout on the QC path).
	p.timeoutControl.OnTimeout()
	p.notifier.OnTimeoutCertificateTriggeredRankChange(
		initialRank,
		resultingRank,
		tc,
	)
	p.notifier.OnRankChange(initialRank, resultingRank)

	timerInfo := p.timeoutControl.StartTimeout(p.ctx, resultingRank)
	end := timerInfo.StartTime.Add(timerInfo.Duration)
	p.notifier.OnStartingTimeout(timerInfo.StartTime, end)

	return &models.NextRank{
		Rank:  timerInfo.Rank,
		Start: timerInfo.StartTime,
		End:   end,
	}, nil
}
// Start launches the Pacemaker. It remembers ctx (which is reused for every
// timeout started on later rank changes), arms the timer for the current rank,
// and notifies the consumer that a timeout has begun. Only the first call has
// any effect; subsequent calls return immediately.
// CAUTION: Pacemaker is not concurrency safe. The Start method must
// be executed by the same goroutine that also calls the other business logic
// methods, or concurrency safety has to be implemented externally.
func (p *Pacemaker[StateT, VoteT]) Start(ctx context.Context) {
	if p.started {
		return // already running
	}
	p.started, p.ctx = true, ctx

	rank := p.CurrentRank()
	info := p.timeoutControl.StartTimeout(ctx, rank)
	deadline := info.StartTime.Add(info.Duration)
	p.notifier.OnStartingTimeout(info.StartTime, deadline)
}
/* ------------------------------------ recovery parameters for Pacemaker ------------------------------------ */
// recoveryInformation is an optional construction-time hook that feeds the
// Pacemaker information which may have been lost in a crash or restart.
// In keeping with the "information-driven" approach, information that is
// older than, or redundant with, what the Pacemaker already knows is treated
// as consistent with it, i.e. applying it is a no-op.
type recoveryInformation[StateT, VoteT models.Unique] func(p *Pacemaker[StateT, VoteT]) error
// WithQCs informs the Pacemaker about the given QCs. Old and nil QCs are
// accepted (no-op).
//
// To avoid excessive database writes during initialization, only the newest of
// the given QCs is handed to the rankTracker. For recovery, nil QCs are
// permitted because the genesis state has no QC.
func WithQCs[
	StateT models.Unique,
	VoteT models.Unique,
](qcs ...models.QuorumCertificate) recoveryInformation[StateT, VoteT] {
	// Named qcTracker (not tracker) so the tracker package is not shadowed.
	qcTracker := tracker.NewNewestQCTracker()
	for _, qc := range qcs {
		if qc == nil {
			continue // no-op
		}
		// Track takes a pointer and NewestQC returns one that is dereferenced
		// below, so the tracker presumably keeps the address it is given. Pass
		// the address of a per-iteration copy: before Go 1.22 the range variable
		// is a single variable reused by every iteration, so &qc would alias
		// whatever value (possibly nil) the loop assigned last.
		candidate := qc
		qcTracker.Track(&candidate)
	}
	newestQC := qcTracker.NewestQC()
	if newestQC == nil {
		return func(p *Pacemaker[StateT, VoteT]) error { return nil } // no-op
	}
	return func(p *Pacemaker[StateT, VoteT]) error {
		_, err := p.rankTracker.ReceiveQuorumCertificate(*newestQC)
		return err
	}
}
// WithTCs informs the Pacemaker about the given TCs. Old and nil TCs are
// accepted (no-op).
//
// Only the newest TC, and the newest QC embedded in any of the given TCs, are
// handed to the rankTracker: first the TC, then the QC.
func WithTCs[
	StateT models.Unique,
	VoteT models.Unique,
](tcs ...models.TimeoutCertificate) recoveryInformation[StateT, VoteT] {
	qcTracker := tracker.NewNewestQCTracker()
	tcTracker := tracker.NewNewestTCTracker()
	for _, tc := range tcs {
		if tc == nil {
			continue // no-op
		}
		// The trackers take pointers and return pointers that are dereferenced
		// below, so they presumably keep the addresses they are given. Before
		// Go 1.22 the range variable is shared by all iterations, so pass the
		// address of a per-iteration copy instead of &tc.
		candidate := tc
		tcTracker.Track(&candidate)
		// qc is declared inside the loop body, so it is already a fresh
		// variable on every iteration.
		qc := candidate.GetLatestQuorumCert()
		qcTracker.Track(&qc)
	}
	newestTC := tcTracker.NewestTC()
	newestQC := qcTracker.NewestQC()
	if newestTC == nil { // shortcut if no TCs provided
		return func(p *Pacemaker[StateT, VoteT]) error { return nil } // no-op
	}
	return func(p *Pacemaker[StateT, VoteT]) error {
		_, err := p.rankTracker.ReceiveTimeoutCertificate(*newestTC) // allows nil inputs
		if err != nil {
			return fmt.Errorf(
				"rankTracker failed to process newest TC provided in constructor: %w",
				err,
			)
		}
		// newestQC should never be nil here, because a valid TC always contains
		// a QC.
		_, err = p.rankTracker.ReceiveQuorumCertificate(*newestQC)
		if err != nil {
			return fmt.Errorf(
				"rankTracker failed to process newest QC extracted from the TCs provided in constructor: %w",
				err,
			)
		}
		return nil
	}
}
// nilUnique is a placeholder implementation of models.Unique. It exists only to
// provide concrete type arguments for the compile-time interface assertions on
// Pacemaker; none of its methods are meant to be called, and each one panics.
type nilUnique struct{}

var _ models.Unique = (*nilUnique)(nil)

// GetSignature implements models.Unique.
func (*nilUnique) GetSignature() []byte { panic("unimplemented") }

// GetTimestamp implements models.Unique.
func (*nilUnique) GetTimestamp() uint64 { panic("unimplemented") }

// Source implements models.Unique.
func (*nilUnique) Source() models.Identity { panic("unimplemented") }

// Clone implements models.Unique.
func (*nilUnique) Clone() models.Unique { panic("unimplemented") }

// GetRank implements models.Unique.
func (*nilUnique) GetRank() uint64 { panic("unimplemented") }

// Identity implements models.Unique.
func (*nilUnique) Identity() models.Identity { panic("unimplemented") }