ceremonyclient/consensus/votecollector/vote_processor.go
Cassandra Heart c797d482f9
v2.1.0.5 (#457)
* wip: conversion of hotstuff from flow into Q-oriented model

* bulk of tests

* remaining non-integration tests

* add integration test, adjust log interface, small tweaks

* further adjustments, restore full pacemaker shape

* add component lifecycle management+supervisor

* further refinements

* resolve timeout hanging

* mostly finalized state for consensus

* bulk of engine swap out

* lifecycle-ify most types

* wiring nearly complete, missing needed hooks for proposals

* plugged in, vetting message validation paths

* global consensus, plugged in and verified

* app shard now wired in too

* do not decode empty keys.yml (#456)

* remove obsolete engine.maxFrames config parameter (#454)

* default to Info log level unless debug is enabled (#453)

* respect config's  "logging" section params, remove obsolete single-file logging (#452)

* Trivial code cleanup aiming to reduce Go compiler warnings (#451)

* simplify range traversal

* simplify channel read for single select case

* delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24

* simplify range traversal

* simplify channel read for single select case

* remove redundant type from array

* simplify range traversal

* simplify channel read for single select case

* RC slate

* finalize 2.1.0.5

* Update comments in StrictMonotonicCounter

Fix comment formatting and clarify description.

---------

Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-11-11 05:00:17 -06:00

242 lines
7.4 KiB
Go

package votecollector
import (
"context"
"errors"
"fmt"
"go.uber.org/atomic"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/consensus/signature"
"source.quilibrium.com/quilibrium/monorepo/consensus/verification"
)
/* ***************** Base-Factory for VoteProcessor ****************** */
// provingVoteProcessorFactoryBase implements a factory for creating
// VoteProcessor holds needed dependencies to initialize VoteProcessor.
// CAUTION:
// this base factory only creates the VerifyingVoteProcessor for the given
// state. It does _not_ check the proposer's vote for its own state, i.e. it
// does _not_ implement `consensus.VoteProcessorFactory`. This base factory
// should be wrapped by `votecollector.VoteProcessorFactory` which adds the
// logic to verify the proposer's vote (decorator pattern).
type provingVoteProcessorFactoryBase[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
] struct {
committee consensus.DynamicCommittee
onQCCreated consensus.OnQuorumCertificateCreated
}
// Create creates VoteProcessor for processing votes for the given state.
// Caller must treat all errors as exceptions
func (f *provingVoteProcessorFactoryBase[StateT, VoteT, PeerIDT]) Create(
tracer consensus.TraceLogger,
filter []byte,
state *models.State[StateT],
dsTag []byte,
aggregator consensus.SignatureAggregator,
votingProvider consensus.VotingProvider[StateT, VoteT, PeerIDT],
) (consensus.VerifyingVoteProcessor[StateT, VoteT], error) {
allParticipants, err := f.committee.IdentitiesByState(state.Identifier)
if err != nil {
return nil, fmt.Errorf("error retrieving consensus participants: %w", err)
}
// message that has to be verified against aggregated signature
msg := verification.MakeVoteMessage(filter, state.Rank, state.Identifier)
// prepare the proving public keys of participants
provingKeys := make([][]byte, 0, len(allParticipants))
for _, participant := range allParticipants {
provingKeys = append(provingKeys, participant.PublicKey())
}
provingSigAggtor, err := signature.NewWeightedSignatureAggregator(
allParticipants,
provingKeys,
msg,
dsTag,
aggregator,
)
if err != nil {
return nil, fmt.Errorf(
"could not create aggregator for proving signatures: %w",
err,
)
}
minRequiredWeight, err := f.committee.QuorumThresholdForRank(state.Rank)
if err != nil {
return nil, fmt.Errorf(
"could not get weight threshold for rank %d: %w",
state.Rank,
err,
)
}
return &VoteProcessor[StateT, VoteT, PeerIDT]{
tracer: tracer,
state: state,
provingSigAggtor: provingSigAggtor,
votingProvider: votingProvider,
onQCCreated: f.onQCCreated,
minRequiredWeight: minRequiredWeight,
done: *atomic.NewBool(false),
allParticipants: allParticipants,
}, nil
}
/* ****************** VoteProcessor Implementation ******************* */
// VoteProcessor implements the consensus.VerifyingVoteProcessor interface.
// It processes hotstuff votes from a collector cluster, where participants vote
// in favour of a state by proving their proving key consensus.
// Concurrency safe.
type VoteProcessor[
StateT models.Unique,
VoteT models.Unique,
PeerIDT models.Unique,
] struct {
tracer consensus.TraceLogger
state *models.State[StateT]
provingSigAggtor consensus.WeightedSignatureAggregator
onQCCreated consensus.OnQuorumCertificateCreated
votingProvider consensus.VotingProvider[StateT, VoteT, PeerIDT]
minRequiredWeight uint64
done atomic.Bool
allParticipants []models.WeightedIdentity
}
// State returns state that is part of proposal that we are processing votes for.
func (p *VoteProcessor[StateT, VoteT, PeerIDT]) State() *models.State[StateT] {
return p.state
}
// Status returns status of this vote processor, it's always verifying.
func (p *VoteProcessor[
StateT,
VoteT,
PeerIDT,
]) Status() consensus.VoteCollectorStatus {
return consensus.VoteCollectorStatusVerifying
}
// Process performs processing of single vote in concurrent safe way. This
// function is implemented to be called by multiple goroutines at the same time.
// Supports processing of both proving and threshold signatures. Design of this
// function is event driven, as soon as we collect enough weight to create a QC
// we will immediately do this and submit it via callback for further
// processing.
// Expected error returns during normal operations:
// * VoteForIncompatibleStateError - submitted vote for incompatible state
// * VoteForIncompatibleRankError - submitted vote for incompatible rank
// * models.InvalidVoteError - submitted vote with invalid signature
// All other errors should be treated as exceptions.
func (p *VoteProcessor[StateT, VoteT, PeerIDT]) Process(vote *VoteT) error {
err := EnsureVoteForState[StateT, VoteT](vote, p.state)
if err != nil {
return fmt.Errorf("received incompatible vote: %w", err)
}
// Vote Processing state machine
if p.done.Load() {
return nil
}
err = p.provingSigAggtor.Verify((*vote).Identity(), (*vote).GetSignature())
if err != nil {
if models.IsInvalidSignerError(err) {
return models.NewInvalidVoteErrorf(
vote,
"vote %x for rank %d is not signed by an authorized consensus participant: %w",
(*vote).Identity(),
(*vote).GetRank(),
err,
)
}
if errors.Is(err, models.ErrInvalidSignature) {
return models.NewInvalidVoteErrorf(
vote,
"vote %x for rank %d has an invalid proving signature: %w",
(*vote).Identity(),
(*vote).GetRank(),
err,
)
}
return fmt.Errorf("internal error checking signature validity: %w", err)
}
if p.done.Load() {
return nil
}
totalWeight, err := p.provingSigAggtor.TrustedAdd(
(*vote).Identity(),
(*vote).GetSignature(),
)
if err != nil {
// we don't expect any errors here during normal operation, as we previously
// checked for duplicated votes from the same signer and verified the
// signer+signature
return fmt.Errorf(
"unexpected exception adding signature from vote %x to proving aggregator: %w",
(*vote).Identity(),
err,
)
}
p.tracer.Trace(fmt.Sprintf(
"processed vote, total weight=(%d), required=(%d)",
totalWeight,
p.minRequiredWeight,
))
// checking of conditions for building QC are satisfied
if totalWeight < p.minRequiredWeight {
return nil
}
// At this point, we have enough signatures to build a QC. Another routine
// might just be at this point. To avoid duplicate work, only one routine can
// pass:
if !p.done.CompareAndSwap(false, true) {
return nil
}
qc, err := p.buildQC()
if err != nil {
return fmt.Errorf("internal error constructing QC from votes: %w", err)
}
p.tracer.Trace("new QC has been created")
p.onQCCreated(qc)
return nil
}
// buildQC performs aggregation of signatures when we have collected enough
// weight for building QC. This function is run only once by single worker.
// Any error should be treated as exception.
func (p *VoteProcessor[StateT, VoteT, PeerIDT]) buildQC() (
models.QuorumCertificate,
error,
) {
_, aggregatedSig, err := p.provingSigAggtor.Aggregate()
if err != nil {
return nil, fmt.Errorf("could not aggregate proving signature: %w", err)
}
qc, err := p.votingProvider.FinalizeQuorumCertificate(
context.Background(),
p.state,
aggregatedSig,
)
if err != nil {
return nil, fmt.Errorf("could not build quorum certificate: %w", err)
}
return qc, nil
}