mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
* wip: conversion of hotstuff from flow into Q-oriented model * bulk of tests * remaining non-integration tests * add integration test, adjust log interface, small tweaks * further adjustments, restore full pacemaker shape * add component lifecycle management+supervisor * further refinements * resolve timeout hanging * mostly finalized state for consensus * bulk of engine swap out * lifecycle-ify most types * wiring nearly complete, missing needed hooks for proposals * plugged in, vetting message validation paths * global consensus, plugged in and verified * app shard now wired in too * do not decode empty keys.yml (#456) * remove obsolete engine.maxFrames config parameter (#454) * default to Info log level unless debug is enabled (#453) * respect config's "logging" section params, remove obsolete single-file logging (#452) * Trivial code cleanup aiming to reduce Go compiler warnings (#451) * simplify range traversal * simplify channel read for single select case * delete rand.Seed() deprecated in Go 1.20 and no-op as of Go 1.24 * simplify range traversal * simplify channel read for single select case * remove redundant type from array * simplify range traversal * simplify channel read for single select case * RC slate * finalize 2.1.0.5 * Update comments in StrictMonotonicCounter Fix comment formatting and clarify description. --------- Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
354 lines
8.9 KiB
Go
354 lines
8.9 KiB
Go
package store
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"slices"
|
|
|
|
"github.com/cockroachdb/pebble"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/store"
|
|
)
|
|
|
|
type PebbleConsensusStore struct {
|
|
db store.KVDB
|
|
logger *zap.Logger
|
|
}
|
|
|
|
var _ consensus.ConsensusStore[*protobufs.ProposalVote] = (*PebbleConsensusStore)(nil)
|
|
|
|
func NewPebbleConsensusStore(
|
|
db store.KVDB,
|
|
logger *zap.Logger,
|
|
) *PebbleConsensusStore {
|
|
return &PebbleConsensusStore{
|
|
db,
|
|
logger,
|
|
}
|
|
}
|
|
|
|
// GetConsensusState implements consensus.ConsensusStore.
|
|
func (p *PebbleConsensusStore) GetConsensusState(filter []byte) (
|
|
*models.ConsensusState[*protobufs.ProposalVote],
|
|
error,
|
|
) {
|
|
value, closer, err := p.db.Get(
|
|
slices.Concat([]byte{CONSENSUS, CONSENSUS_STATE}, filter),
|
|
)
|
|
if err != nil {
|
|
if errors.Is(err, pebble.ErrNotFound) {
|
|
return nil, ErrNotFound
|
|
}
|
|
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
defer closer.Close()
|
|
|
|
c := slices.Clone(value)
|
|
if len(c) < 24 {
|
|
return nil, errors.Wrap(errors.New("invalid data"), "get consensus state")
|
|
}
|
|
|
|
state := &models.ConsensusState[*protobufs.ProposalVote]{}
|
|
buf := bytes.NewBuffer(c)
|
|
|
|
var filterLen uint32
|
|
if err := binary.Read(buf, binary.BigEndian, &filterLen); err != nil {
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
if filterLen > 0 {
|
|
filterBytes := make([]byte, filterLen)
|
|
if _, err := buf.Read(filterBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
state.Filter = filterBytes
|
|
}
|
|
|
|
if err := binary.Read(
|
|
buf,
|
|
binary.BigEndian,
|
|
&state.FinalizedRank,
|
|
); err != nil {
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
|
|
if err := binary.Read(
|
|
buf,
|
|
binary.BigEndian,
|
|
&state.LatestAcknowledgedRank,
|
|
); err != nil {
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
|
|
var latestTimeoutLen uint32
|
|
if err := binary.Read(buf, binary.BigEndian, &latestTimeoutLen); err != nil {
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
if latestTimeoutLen > 0 {
|
|
latestTimeoutBytes := make([]byte, latestTimeoutLen)
|
|
if _, err := buf.Read(latestTimeoutBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
lt := &protobufs.TimeoutState{}
|
|
if err := lt.FromCanonicalBytes(latestTimeoutBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get consensus state")
|
|
}
|
|
state.LatestTimeout = &models.TimeoutState[*protobufs.ProposalVote]{
|
|
Rank: lt.Vote.Rank,
|
|
LatestQuorumCertificate: lt.LatestQuorumCertificate,
|
|
PriorRankTimeoutCertificate: lt.PriorRankTimeoutCertificate,
|
|
Vote: <.Vote,
|
|
TimeoutTick: lt.TimeoutTick,
|
|
}
|
|
}
|
|
|
|
return state, nil
|
|
}
|
|
|
|
// GetLivenessState implements consensus.ConsensusStore.
|
|
func (p *PebbleConsensusStore) GetLivenessState(filter []byte) (
|
|
*models.LivenessState,
|
|
error,
|
|
) {
|
|
value, closer, err := p.db.Get(
|
|
slices.Concat([]byte{CONSENSUS, CONSENSUS_LIVENESS}, filter),
|
|
)
|
|
if err != nil {
|
|
if errors.Is(err, pebble.ErrNotFound) {
|
|
return nil, ErrNotFound
|
|
}
|
|
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
defer closer.Close()
|
|
|
|
c := slices.Clone(value)
|
|
if len(c) < 20 {
|
|
return nil, errors.Wrap(errors.New("invalid data"), "get liveness state")
|
|
}
|
|
|
|
state := &models.LivenessState{}
|
|
buf := bytes.NewBuffer(c)
|
|
|
|
var filterLen uint32
|
|
if err := binary.Read(buf, binary.BigEndian, &filterLen); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
if filterLen > 0 {
|
|
filterBytes := make([]byte, filterLen)
|
|
if _, err := buf.Read(filterBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
state.Filter = filterBytes
|
|
}
|
|
|
|
if err := binary.Read(
|
|
buf,
|
|
binary.BigEndian,
|
|
&state.CurrentRank,
|
|
); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
|
|
var latestQCLen uint32
|
|
if err := binary.Read(buf, binary.BigEndian, &latestQCLen); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
if latestQCLen > 0 {
|
|
latestQCBytes := make([]byte, latestQCLen)
|
|
if _, err := buf.Read(latestQCBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
lt := &protobufs.QuorumCertificate{}
|
|
if err := lt.FromCanonicalBytes(latestQCBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
state.LatestQuorumCertificate = lt
|
|
}
|
|
|
|
var priorTCLen uint32
|
|
if err := binary.Read(buf, binary.BigEndian, &priorTCLen); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
if priorTCLen > 0 {
|
|
priorTCBytes := make([]byte, priorTCLen)
|
|
if _, err := buf.Read(priorTCBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
lt := &protobufs.TimeoutCertificate{}
|
|
if err := lt.FromCanonicalBytes(priorTCBytes); err != nil {
|
|
return nil, errors.Wrap(err, "get liveness state")
|
|
}
|
|
state.PriorRankTimeoutCertificate = lt
|
|
}
|
|
|
|
return state, nil
|
|
}
|
|
|
|
// PutConsensusState implements consensus.ConsensusStore.
|
|
func (p *PebbleConsensusStore) PutConsensusState(
|
|
state *models.ConsensusState[*protobufs.ProposalVote],
|
|
) error {
|
|
buf := new(bytes.Buffer)
|
|
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(len(state.Filter)),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
if _, err := buf.Write(state.Filter); err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
state.FinalizedRank,
|
|
); err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
state.LatestAcknowledgedRank,
|
|
); err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
|
|
if state.LatestTimeout == nil {
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(0),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
} else {
|
|
var priorTC *protobufs.TimeoutCertificate
|
|
if state.LatestTimeout.PriorRankTimeoutCertificate != nil {
|
|
priorTC = state.LatestTimeout.PriorRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
|
|
}
|
|
lt := &protobufs.TimeoutState{
|
|
LatestQuorumCertificate: state.LatestTimeout.LatestQuorumCertificate.(*protobufs.QuorumCertificate),
|
|
PriorRankTimeoutCertificate: priorTC,
|
|
Vote: *state.LatestTimeout.Vote,
|
|
TimeoutTick: state.LatestTimeout.TimeoutTick,
|
|
}
|
|
timeoutBytes, err := lt.ToCanonicalBytes()
|
|
if err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(len(timeoutBytes)),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
if _, err := buf.Write(timeoutBytes); err != nil {
|
|
return errors.Wrap(err, "put consensus state")
|
|
}
|
|
}
|
|
|
|
return errors.Wrap(
|
|
p.db.Set(
|
|
slices.Concat([]byte{CONSENSUS, CONSENSUS_STATE}, state.Filter),
|
|
buf.Bytes(),
|
|
),
|
|
"put consensus state",
|
|
)
|
|
}
|
|
|
|
// PutLivenessState implements consensus.ConsensusStore.
|
|
func (p *PebbleConsensusStore) PutLivenessState(
|
|
state *models.LivenessState,
|
|
) error {
|
|
buf := new(bytes.Buffer)
|
|
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(len(state.Filter)),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
if _, err := buf.Write(state.Filter); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
state.CurrentRank,
|
|
); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
|
|
if state.LatestQuorumCertificate == nil {
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(0),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
} else {
|
|
qc := state.LatestQuorumCertificate.(*protobufs.QuorumCertificate)
|
|
qcBytes, err := qc.ToCanonicalBytes()
|
|
if err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(len(qcBytes)),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
if _, err := buf.Write(qcBytes); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
}
|
|
|
|
if state.PriorRankTimeoutCertificate == nil {
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(0),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
} else {
|
|
tc := state.PriorRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
|
|
timeoutBytes, err := tc.ToCanonicalBytes()
|
|
if err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
if err := binary.Write(
|
|
buf,
|
|
binary.BigEndian,
|
|
uint32(len(timeoutBytes)),
|
|
); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
if _, err := buf.Write(timeoutBytes); err != nil {
|
|
return errors.Wrap(err, "put liveness state")
|
|
}
|
|
}
|
|
|
|
return errors.Wrap(
|
|
p.db.Set(
|
|
slices.Concat([]byte{CONSENSUS, CONSENSUS_LIVENESS}, state.Filter),
|
|
buf.Bytes(),
|
|
),
|
|
"put liveness state",
|
|
)
|
|
}
|