ceremonyclient/node/store/consensus.go
Cassandra Heart c797d482f9
v2.1.0.5 (#457)
* wip: conversion of hotstuff from flow into Q-oriented model

* bulk of tests

* remaining non-integration tests

* add integration test, adjust log interface, small tweaks

* further adjustments, restore full pacemaker shape

* add component lifecycle management+supervisor

* further refinements

* resolve timeout hanging

* mostly finalized state for consensus

* bulk of engine swap out

* lifecycle-ify most types

* wiring nearly complete, missing needed hooks for proposals

* plugged in, vetting message validation paths

* global consensus, plugged in and verified

* app shard now wired in too

* do not decode empty keys.yml (#456)

* remove obsolete engine.maxFrames config parameter (#454)

* default to Info log level unless debug is enabled (#453)

* respect config's  "logging" section params, remove obsolete single-file logging (#452)

* Trivial code cleanup aiming to reduce Go compiler warnings (#451)

* simplify range traversal

* simplify channel read for single select case

* delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24

* simplify range traversal

* simplify channel read for single select case

* remove redundant type from array

* simplify range traversal

* simplify channel read for single select case

* RC slate

* finalize 2.1.0.5

* Update comments in StrictMonotonicCounter

Fix comment formatting and clarify description.

---------

Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2025-11-11 05:00:17 -06:00

354 lines
8.9 KiB
Go

package store
import (
"bytes"
"encoding/binary"
"slices"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
)
type PebbleConsensusStore struct {
db store.KVDB
logger *zap.Logger
}
var _ consensus.ConsensusStore[*protobufs.ProposalVote] = (*PebbleConsensusStore)(nil)
func NewPebbleConsensusStore(
db store.KVDB,
logger *zap.Logger,
) *PebbleConsensusStore {
return &PebbleConsensusStore{
db,
logger,
}
}
// GetConsensusState implements consensus.ConsensusStore.
func (p *PebbleConsensusStore) GetConsensusState(filter []byte) (
*models.ConsensusState[*protobufs.ProposalVote],
error,
) {
value, closer, err := p.db.Get(
slices.Concat([]byte{CONSENSUS, CONSENSUS_STATE}, filter),
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get consensus state")
}
defer closer.Close()
c := slices.Clone(value)
if len(c) < 24 {
return nil, errors.Wrap(errors.New("invalid data"), "get consensus state")
}
state := &models.ConsensusState[*protobufs.ProposalVote]{}
buf := bytes.NewBuffer(c)
var filterLen uint32
if err := binary.Read(buf, binary.BigEndian, &filterLen); err != nil {
return nil, errors.Wrap(err, "get consensus state")
}
if filterLen > 0 {
filterBytes := make([]byte, filterLen)
if _, err := buf.Read(filterBytes); err != nil {
return nil, errors.Wrap(err, "get consensus state")
}
state.Filter = filterBytes
}
if err := binary.Read(
buf,
binary.BigEndian,
&state.FinalizedRank,
); err != nil {
return nil, errors.Wrap(err, "get consensus state")
}
if err := binary.Read(
buf,
binary.BigEndian,
&state.LatestAcknowledgedRank,
); err != nil {
return nil, errors.Wrap(err, "get consensus state")
}
var latestTimeoutLen uint32
if err := binary.Read(buf, binary.BigEndian, &latestTimeoutLen); err != nil {
return nil, errors.Wrap(err, "get consensus state")
}
if latestTimeoutLen > 0 {
latestTimeoutBytes := make([]byte, latestTimeoutLen)
if _, err := buf.Read(latestTimeoutBytes); err != nil {
return nil, errors.Wrap(err, "get consensus state")
}
lt := &protobufs.TimeoutState{}
if err := lt.FromCanonicalBytes(latestTimeoutBytes); err != nil {
return nil, errors.Wrap(err, "get consensus state")
}
state.LatestTimeout = &models.TimeoutState[*protobufs.ProposalVote]{
Rank: lt.Vote.Rank,
LatestQuorumCertificate: lt.LatestQuorumCertificate,
PriorRankTimeoutCertificate: lt.PriorRankTimeoutCertificate,
Vote: &lt.Vote,
TimeoutTick: lt.TimeoutTick,
}
}
return state, nil
}
// GetLivenessState implements consensus.ConsensusStore.
func (p *PebbleConsensusStore) GetLivenessState(filter []byte) (
*models.LivenessState,
error,
) {
value, closer, err := p.db.Get(
slices.Concat([]byte{CONSENSUS, CONSENSUS_LIVENESS}, filter),
)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get liveness state")
}
defer closer.Close()
c := slices.Clone(value)
if len(c) < 20 {
return nil, errors.Wrap(errors.New("invalid data"), "get liveness state")
}
state := &models.LivenessState{}
buf := bytes.NewBuffer(c)
var filterLen uint32
if err := binary.Read(buf, binary.BigEndian, &filterLen); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
if filterLen > 0 {
filterBytes := make([]byte, filterLen)
if _, err := buf.Read(filterBytes); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
state.Filter = filterBytes
}
if err := binary.Read(
buf,
binary.BigEndian,
&state.CurrentRank,
); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
var latestQCLen uint32
if err := binary.Read(buf, binary.BigEndian, &latestQCLen); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
if latestQCLen > 0 {
latestQCBytes := make([]byte, latestQCLen)
if _, err := buf.Read(latestQCBytes); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
lt := &protobufs.QuorumCertificate{}
if err := lt.FromCanonicalBytes(latestQCBytes); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
state.LatestQuorumCertificate = lt
}
var priorTCLen uint32
if err := binary.Read(buf, binary.BigEndian, &priorTCLen); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
if priorTCLen > 0 {
priorTCBytes := make([]byte, priorTCLen)
if _, err := buf.Read(priorTCBytes); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
lt := &protobufs.TimeoutCertificate{}
if err := lt.FromCanonicalBytes(priorTCBytes); err != nil {
return nil, errors.Wrap(err, "get liveness state")
}
state.PriorRankTimeoutCertificate = lt
}
return state, nil
}
// PutConsensusState implements consensus.ConsensusStore.
func (p *PebbleConsensusStore) PutConsensusState(
state *models.ConsensusState[*protobufs.ProposalVote],
) error {
buf := new(bytes.Buffer)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(state.Filter)),
); err != nil {
return errors.Wrap(err, "put consensus state")
}
if _, err := buf.Write(state.Filter); err != nil {
return errors.Wrap(err, "put consensus state")
}
if err := binary.Write(
buf,
binary.BigEndian,
state.FinalizedRank,
); err != nil {
return errors.Wrap(err, "put consensus state")
}
if err := binary.Write(
buf,
binary.BigEndian,
state.LatestAcknowledgedRank,
); err != nil {
return errors.Wrap(err, "put consensus state")
}
if state.LatestTimeout == nil {
if err := binary.Write(
buf,
binary.BigEndian,
uint32(0),
); err != nil {
return errors.Wrap(err, "put consensus state")
}
} else {
var priorTC *protobufs.TimeoutCertificate
if state.LatestTimeout.PriorRankTimeoutCertificate != nil {
priorTC = state.LatestTimeout.PriorRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
}
lt := &protobufs.TimeoutState{
LatestQuorumCertificate: state.LatestTimeout.LatestQuorumCertificate.(*protobufs.QuorumCertificate),
PriorRankTimeoutCertificate: priorTC,
Vote: *state.LatestTimeout.Vote,
TimeoutTick: state.LatestTimeout.TimeoutTick,
}
timeoutBytes, err := lt.ToCanonicalBytes()
if err != nil {
return errors.Wrap(err, "put consensus state")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(timeoutBytes)),
); err != nil {
return errors.Wrap(err, "put consensus state")
}
if _, err := buf.Write(timeoutBytes); err != nil {
return errors.Wrap(err, "put consensus state")
}
}
return errors.Wrap(
p.db.Set(
slices.Concat([]byte{CONSENSUS, CONSENSUS_STATE}, state.Filter),
buf.Bytes(),
),
"put consensus state",
)
}
// PutLivenessState implements consensus.ConsensusStore.
func (p *PebbleConsensusStore) PutLivenessState(
state *models.LivenessState,
) error {
buf := new(bytes.Buffer)
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(state.Filter)),
); err != nil {
return errors.Wrap(err, "put liveness state")
}
if _, err := buf.Write(state.Filter); err != nil {
return errors.Wrap(err, "put liveness state")
}
if err := binary.Write(
buf,
binary.BigEndian,
state.CurrentRank,
); err != nil {
return errors.Wrap(err, "put liveness state")
}
if state.LatestQuorumCertificate == nil {
if err := binary.Write(
buf,
binary.BigEndian,
uint32(0),
); err != nil {
return errors.Wrap(err, "put liveness state")
}
} else {
qc := state.LatestQuorumCertificate.(*protobufs.QuorumCertificate)
qcBytes, err := qc.ToCanonicalBytes()
if err != nil {
return errors.Wrap(err, "put liveness state")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(qcBytes)),
); err != nil {
return errors.Wrap(err, "put liveness state")
}
if _, err := buf.Write(qcBytes); err != nil {
return errors.Wrap(err, "put liveness state")
}
}
if state.PriorRankTimeoutCertificate == nil {
if err := binary.Write(
buf,
binary.BigEndian,
uint32(0),
); err != nil {
return errors.Wrap(err, "put liveness state")
}
} else {
tc := state.PriorRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
timeoutBytes, err := tc.ToCanonicalBytes()
if err != nil {
return errors.Wrap(err, "put liveness state")
}
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(timeoutBytes)),
); err != nil {
return errors.Wrap(err, "put liveness state")
}
if _, err := buf.Write(timeoutBytes); err != nil {
return errors.Wrap(err, "put liveness state")
}
}
return errors.Wrap(
p.db.Set(
slices.Concat([]byte{CONSENSUS, CONSENSUS_LIVENESS}, state.Filter),
buf.Bytes(),
),
"put liveness state",
)
}