ceremonyclient/node/consensus/app/message_processors.go
Cassandra Heart 7a4484b05b
v2.1.0.17 (#499)
* v2.1.0.17

* add release notes
2025-12-19 12:29:23 -06:00

1952 lines
48 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package app
import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"slices"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
var keyRegistryDomain = []byte("KEY_REGISTRY")
func (e *AppConsensusEngine) processConsensusMessageQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-ctx.Done():
return
case <-e.quit:
return
case message := <-e.consensusMessageQueue:
e.handleConsensusMessage(message)
}
}
}
func (e *AppConsensusEngine) processProverMessageQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-e.haltCtx.Done():
return
case <-ctx.Done():
return
case message := <-e.proverMessageQueue:
e.handleProverMessage(message)
}
}
}
func (e *AppConsensusEngine) processFrameMessageQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-e.haltCtx.Done():
return
case <-ctx.Done():
return
case <-e.quit:
return
case message := <-e.frameMessageQueue:
e.handleFrameMessage(message)
}
}
}
func (e *AppConsensusEngine) processGlobalFrameMessageQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-e.haltCtx.Done():
return
case <-ctx.Done():
return
case <-e.quit:
return
case message := <-e.globalFrameMessageQueue:
e.handleGlobalFrameMessage(message)
}
}
}
func (e *AppConsensusEngine) processAlertMessageQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-ctx.Done():
return
case <-e.quit:
return
case message := <-e.globalAlertMessageQueue:
e.handleAlertMessage(message)
}
}
}
func (e *AppConsensusEngine) processAppShardProposalQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-ctx.Done():
return
case proposal := <-e.appShardProposalQueue:
e.handleAppShardProposal(proposal)
}
}
}
func (e *AppConsensusEngine) processPeerInfoMessageQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-e.haltCtx.Done():
return
case <-ctx.Done():
return
case <-e.quit:
return
case message := <-e.globalPeerInfoMessageQueue:
e.handlePeerInfoMessage(message)
}
}
}
func (e *AppConsensusEngine) processDispatchMessageQueue(
ctx lifecycle.SignalerContext,
) {
for {
select {
case <-ctx.Done():
return
case <-e.quit:
return
case message := <-e.dispatchMessageQueue:
e.handleDispatchMessage(message)
}
}
}
func (e *AppConsensusEngine) handleAppShardProposal(
proposal *protobufs.AppShardProposal,
) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from proposal",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
e.logger.Debug(
"handling global proposal",
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
)
// Small gotcha: the proposal structure uses interfaces, so we can't assign
// directly, otherwise the nil values for the structs will fail the nil
// check on the interfaces (and would incur costly reflection if we wanted
// to check it directly)
pqc := proposal.ParentQuorumCertificate
prtc := proposal.PriorRankTimeoutCertificate
vote := proposal.Vote
signedProposal := &models.SignedProposal[*protobufs.AppShardFrame, *protobufs.ProposalVote]{
Proposal: models.Proposal[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
Rank: proposal.State.GetRank(),
Identifier: proposal.State.Identity(),
ProposerID: proposal.State.Source(),
Timestamp: proposal.State.GetTimestamp(),
State: &proposal.State,
},
},
Vote: &vote,
}
if pqc != nil {
signedProposal.Proposal.State.ParentQuorumCertificate = pqc
}
if prtc != nil {
signedProposal.PreviousRankTimeoutCertificate = prtc
}
finalized := e.forks.FinalizedState()
finalizedRank := finalized.Rank
finalizedFrameNumber := (*finalized.State).Header.FrameNumber
frameNumber := proposal.State.Header.FrameNumber
// drop proposals if we already processed them
if frameNumber <= finalizedFrameNumber ||
proposal.State.Header.Rank <= finalizedRank {
e.logger.Debug("dropping stale proposal")
return
}
existingFrame, _, err := e.clockStore.GetShardClockFrame(
proposal.State.Header.Address,
frameNumber,
false,
)
if err == nil && existingFrame != nil {
qc, qcErr := e.clockStore.GetQuorumCertificate(
proposal.State.Header.Address,
proposal.State.GetRank(),
)
if qcErr == nil && qc != nil &&
qc.GetFrameNumber() == frameNumber &&
qc.Identity() == proposal.State.Identity() {
e.logger.Debug("dropping stale proposal")
return
}
}
if proposal.State.Header.FrameNumber != 0 {
parent, _, err := e.clockStore.GetShardClockFrame(
proposal.State.Header.Address,
proposal.State.Header.FrameNumber-1,
false,
)
if err != nil || parent == nil || !bytes.Equal(
[]byte(parent.Identity()),
proposal.State.Header.ParentSelector,
) {
e.logger.Debug(
"parent frame not stored, requesting sync",
zap.Uint64("frame_number", proposal.State.Header.FrameNumber-1),
)
e.cacheProposal(proposal)
peerID, err := e.getPeerIDOfProver(proposal.State.Header.Prover)
if err != nil {
peerID, err = e.getRandomProverPeerId()
if err != nil {
e.logger.Debug("could not get peer id for sync", zap.Error(err))
return
}
}
head, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress)
if err != nil || head == nil || head.Header == nil {
e.logger.Debug("could not get shard time reel head", zap.Error(err))
return
}
e.syncProvider.AddState(
[]byte(peerID),
head.Header.FrameNumber,
[]byte(head.Identity()),
)
return
}
}
if e.frameChainChecker != nil &&
e.frameChainChecker.CanProcessSequentialChain(finalized, proposal) {
e.deleteCachedProposal(frameNumber)
if e.processProposal(proposal) {
e.drainProposalCache(frameNumber + 1)
return
}
e.logger.Debug("failed to process sequential proposal, caching")
e.cacheProposal(proposal)
return
}
expectedFrame, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress)
if err != nil {
e.logger.Error("could not obtain app time reel head", zap.Error(err))
return
}
expectedFrameNumber := uint64(0)
if expectedFrame != nil && expectedFrame.Header != nil {
expectedFrameNumber = expectedFrame.Header.FrameNumber + 1
}
if frameNumber < expectedFrameNumber {
e.logger.Debug(
"dropping proposal behind expected frame",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("expected_frame_number", expectedFrameNumber),
)
return
}
if frameNumber == expectedFrameNumber {
e.deleteCachedProposal(frameNumber)
if e.processProposal(proposal) {
e.drainProposalCache(frameNumber + 1)
return
}
e.logger.Debug("failed to process expected proposal, caching")
e.cacheProposal(proposal)
return
}
e.cacheProposal(proposal)
e.drainProposalCache(expectedFrameNumber)
}
func (e *AppConsensusEngine) processProposal(
proposal *protobufs.AppShardProposal,
) bool {
return e.processProposalInternal(proposal, false)
}
func (e *AppConsensusEngine) processProposalInternal(
proposal *protobufs.AppShardProposal,
skipAncestors bool,
) bool {
if proposal == nil || proposal.State == nil || proposal.State.Header == nil {
return false
}
if !skipAncestors {
if ok, err := e.ensureShardAncestorStates(proposal); err != nil {
e.logger.Warn(
"failed to recover app shard ancestors",
zap.String("address", e.appAddressHex),
zap.Uint64("frame_number", proposal.State.Header.FrameNumber),
zap.Error(err),
)
e.requestShardAncestorSync(proposal)
return false
} else if !ok {
return false
}
}
e.logger.Debug(
"processing proposal",
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
)
err := e.VerifyQuorumCertificate(proposal.ParentQuorumCertificate)
if err != nil {
e.logger.Debug("proposal has invalid qc", zap.Error(err))
return false
}
if proposal.PriorRankTimeoutCertificate != nil {
err := e.VerifyTimeoutCertificate(proposal.PriorRankTimeoutCertificate)
if err != nil {
e.logger.Debug("proposal has invalid tc", zap.Error(err))
return false
}
}
if proposal.Vote != nil {
err := e.VerifyVote(&proposal.Vote)
if err != nil {
e.logger.Debug("proposal has invalid vote", zap.Error(err))
return false
}
}
err = proposal.State.Validate()
if err != nil {
e.logger.Debug("proposal is not valid", zap.Error(err))
return false
}
valid, err := e.frameValidator.Validate(proposal.State)
if !valid || err != nil {
e.logger.Debug("invalid frame in proposal", zap.Error(err))
return false
}
// Small gotcha: the proposal structure uses interfaces, so we can't assign
// directly, otherwise the nil values for the structs will fail the nil
// check on the interfaces (and would incur costly reflection if we wanted
// to check it directly)
pqc := proposal.ParentQuorumCertificate
prtc := proposal.PriorRankTimeoutCertificate
vote := proposal.Vote
signedProposal := &models.SignedProposal[*protobufs.AppShardFrame, *protobufs.ProposalVote]{
Proposal: models.Proposal[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
Rank: proposal.State.GetRank(),
Identifier: proposal.State.Identity(),
ProposerID: proposal.Vote.Identity(),
Timestamp: proposal.State.GetTimestamp(),
State: &proposal.State,
},
},
Vote: &vote,
}
if pqc != nil {
signedProposal.Proposal.State.ParentQuorumCertificate = pqc
}
if prtc != nil {
signedProposal.PreviousRankTimeoutCertificate = prtc
}
// IMPORTANT: we do not want to send old proposals to the vote aggregator or
// we risk engine shutdown if the leader selection method changed frame
// validation ensures that the proposer is valid for the proposal per time
// reel rules.
if signedProposal.State.Rank >= e.currentRank {
e.voteAggregator.AddState(signedProposal)
}
e.consensusParticipant.SubmitProposal(signedProposal)
proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
e.trySealParentWithChild(proposal)
e.registerPendingCertifiedParent(proposal)
if proposal.State != nil {
e.recordProposalRank(proposal.State.GetRank())
}
return true
}
type shardAncestorDescriptor struct {
frameNumber uint64
selector []byte
}
func (e *AppConsensusEngine) ensureShardAncestorStates(
proposal *protobufs.AppShardProposal,
) (bool, error) {
ancestors, err := e.collectMissingShardAncestors(proposal)
if err != nil {
return false, err
}
if len(ancestors) == 0 {
return true, nil
}
for i := len(ancestors) - 1; i >= 0; i-- {
ancestor, err := e.buildStoredShardProposal(ancestors[i])
if err != nil {
return false, err
}
if !e.processProposalInternal(ancestor, true) {
return false, fmt.Errorf(
"unable to process ancestor frame %d",
ancestors[i].frameNumber,
)
}
}
return true, nil
}
func (e *AppConsensusEngine) collectMissingShardAncestors(
proposal *protobufs.AppShardProposal,
) ([]shardAncestorDescriptor, error) {
header := proposal.State.Header
if header == nil || header.FrameNumber == 0 {
return nil, nil
}
finalized := e.forks.FinalizedState()
if finalized == nil || finalized.State == nil ||
(*finalized.State).Header == nil {
return nil, errors.New("finalized state unavailable")
}
finalizedFrame := (*finalized.State).Header.FrameNumber
finalizedSelector := []byte(finalized.Identifier)
parentFrame := header.FrameNumber - 1
parentSelector := slices.Clone(header.ParentSelector)
if len(parentSelector) == 0 {
return nil, nil
}
var ancestors []shardAncestorDescriptor
anchored := false
for parentFrame > finalizedFrame && len(parentSelector) > 0 {
if _, found := e.forks.GetState(
models.Identity(string(parentSelector)),
); found {
anchored = true
break
}
ancestors = append(ancestors, shardAncestorDescriptor{
frameNumber: parentFrame,
selector: slices.Clone(parentSelector),
})
frame, err := e.loadShardFrameFromStore(parentFrame, parentSelector)
if err != nil {
return nil, err
}
parentFrame--
parentSelector = slices.Clone(frame.Header.ParentSelector)
}
if !anchored {
switch {
case parentFrame == finalizedFrame:
if !bytes.Equal(parentSelector, finalizedSelector) {
return nil, fmt.Errorf(
"ancestor chain not rooted at finalized frame %d",
finalizedFrame,
)
}
anchored = true
case parentFrame < finalizedFrame:
return nil, fmt.Errorf(
"ancestor chain crossed finalized boundary (frame %d < %d)",
parentFrame,
finalizedFrame,
)
case len(parentSelector) == 0:
return nil, errors.New(
"ancestor selector missing before reaching finalized state",
)
}
}
if !anchored {
return nil, errors.New("ancestor chain could not be anchored in forks")
}
return ancestors, nil
}
func (e *AppConsensusEngine) loadShardFrameFromStore(
frameNumber uint64,
selector []byte,
) (*protobufs.AppShardFrame, error) {
frame, err := e.clockStore.GetStagedShardClockFrame(
e.appAddress,
frameNumber,
selector,
false,
)
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
return nil, err
}
frame, _, err = e.clockStore.GetShardClockFrame(
e.appAddress,
frameNumber,
false,
)
if err != nil {
return nil, err
}
if frame == nil || frame.Header == nil ||
!bytes.Equal([]byte(frame.Identity()), selector) {
return nil, fmt.Errorf(
"sealed shard frame mismatch at %d",
frameNumber,
)
}
}
if frame == nil || frame.Header == nil {
return nil, errors.New("stored shard frame missing header")
}
return frame, nil
}
func (e *AppConsensusEngine) buildStoredShardProposal(
desc shardAncestorDescriptor,
) (*protobufs.AppShardProposal, error) {
frame, err := e.loadShardFrameFromStore(desc.frameNumber, desc.selector)
if err != nil {
return nil, err
}
var parentQC *protobufs.QuorumCertificate
if frame.GetRank() > 0 {
parentQC, err = e.clockStore.GetQuorumCertificate(
e.appAddress,
frame.GetRank()-1,
)
if err != nil {
return nil, err
}
}
var priorTC *protobufs.TimeoutCertificate
if frame.GetRank() > 0 {
priorTC, err = e.clockStore.GetTimeoutCertificate(
e.appAddress,
frame.GetRank()-1,
)
if err != nil && !errors.Is(err, store.ErrNotFound) {
return nil, err
}
if errors.Is(err, store.ErrNotFound) {
priorTC = nil
}
}
vote, err := e.clockStore.GetProposalVote(
e.appAddress,
frame.GetRank(),
[]byte(frame.Identity()),
)
if err != nil {
return nil, err
}
return &protobufs.AppShardProposal{
State: frame,
ParentQuorumCertificate: parentQC,
PriorRankTimeoutCertificate: priorTC,
Vote: vote,
}, nil
}
func (e *AppConsensusEngine) requestShardAncestorSync(
proposal *protobufs.AppShardProposal,
) {
if proposal == nil || proposal.State == nil || proposal.State.Header == nil {
return
}
if e.syncProvider == nil {
return
}
peerID, err := e.getPeerIDOfProver(proposal.State.Header.Prover)
if err != nil {
peerID, err = e.getRandomProverPeerId()
if err != nil {
return
}
}
head, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress)
if err != nil || head == nil || head.Header == nil {
e.logger.Debug("could not obtain shard head for sync", zap.Error(err))
return
}
e.syncProvider.AddState(
[]byte(peerID),
head.Header.FrameNumber,
[]byte(head.Identity()),
)
}
type keyRegistryValidationResult struct {
identityPeerID []byte
proverAddress []byte
}
func (e *AppConsensusEngine) isDuplicatePeerInfo(
peerInfo *protobufs.PeerInfo,
) bool {
digest, err := hashPeerInfo(peerInfo)
if err != nil {
e.logger.Warn("failed to hash peer info", zap.Error(err))
return false
}
e.peerInfoDigestCacheMu.Lock()
defer e.peerInfoDigestCacheMu.Unlock()
if _, ok := e.peerInfoDigestCache[digest]; ok {
return true
}
e.peerInfoDigestCache[digest] = struct{}{}
return false
}
func (e *AppConsensusEngine) isDuplicateKeyRegistry(
keyRegistry *protobufs.KeyRegistry,
) bool {
digest, err := hashKeyRegistry(keyRegistry)
if err != nil {
e.logger.Warn("failed to hash key registry", zap.Error(err))
return false
}
e.keyRegistryDigestCacheMu.Lock()
defer e.keyRegistryDigestCacheMu.Unlock()
if _, ok := e.keyRegistryDigestCache[digest]; ok {
return true
}
e.keyRegistryDigestCache[digest] = struct{}{}
return false
}
func hashPeerInfo(peerInfo *protobufs.PeerInfo) (string, error) {
cloned := proto.Clone(peerInfo).(*protobufs.PeerInfo)
cloned.Timestamp = 0
data, err := cloned.ToCanonicalBytes()
if err != nil {
return "", err
}
sum := sha256.Sum256(data)
return hex.EncodeToString(sum[:]), nil
}
func hashKeyRegistry(keyRegistry *protobufs.KeyRegistry) (string, error) {
cloned := proto.Clone(keyRegistry).(*protobufs.KeyRegistry)
cloned.LastUpdated = 0
data, err := cloned.ToCanonicalBytes()
if err != nil {
return "", err
}
sum := sha256.Sum256(data)
return hex.EncodeToString(sum[:]), nil
}
func (e *AppConsensusEngine) validateKeyRegistry(
keyRegistry *protobufs.KeyRegistry,
) (*keyRegistryValidationResult, error) {
if keyRegistry.IdentityKey == nil ||
len(keyRegistry.IdentityKey.KeyValue) == 0 {
return nil, fmt.Errorf("key registry missing identity key")
}
if err := keyRegistry.IdentityKey.Validate(); err != nil {
return nil, fmt.Errorf("invalid identity key: %w", err)
}
pubKey, err := pcrypto.UnmarshalEd448PublicKey(
keyRegistry.IdentityKey.KeyValue,
)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal identity key: %w", err)
}
peerID, err := peer.IDFromPublicKey(pubKey)
if err != nil {
return nil, fmt.Errorf("failed to derive identity peer id: %w", err)
}
identityPeerID := []byte(peerID)
if keyRegistry.ProverKey == nil ||
len(keyRegistry.ProverKey.KeyValue) == 0 {
return nil, fmt.Errorf("key registry missing prover key")
}
if err := keyRegistry.ProverKey.Validate(); err != nil {
return nil, fmt.Errorf("invalid prover key: %w", err)
}
if keyRegistry.IdentityToProver == nil ||
len(keyRegistry.IdentityToProver.Signature) == 0 {
return nil, fmt.Errorf("missing identity-to-prover signature")
}
identityMsg := slices.Concat(
keyRegistryDomain,
keyRegistry.ProverKey.KeyValue,
)
valid, err := e.keyManager.ValidateSignature(
crypto.KeyTypeEd448,
keyRegistry.IdentityKey.KeyValue,
identityMsg,
keyRegistry.IdentityToProver.Signature,
nil,
)
if err != nil {
return nil, fmt.Errorf(
"identity-to-prover signature validation failed: %w",
err,
)
}
if !valid {
return nil, fmt.Errorf("identity-to-prover signature invalid")
}
if keyRegistry.ProverToIdentity == nil ||
len(keyRegistry.ProverToIdentity.Signature) == 0 {
return nil, fmt.Errorf("missing prover-to-identity signature")
}
valid, err = e.keyManager.ValidateSignature(
crypto.KeyTypeBLS48581G1,
keyRegistry.ProverKey.KeyValue,
keyRegistry.IdentityKey.KeyValue,
keyRegistry.ProverToIdentity.Signature,
keyRegistryDomain,
)
if err != nil {
return nil, fmt.Errorf(
"prover-to-identity signature validation failed: %w",
err,
)
}
if !valid {
return nil, fmt.Errorf("prover-to-identity signature invalid")
}
addrBI, err := poseidon.HashBytes(keyRegistry.ProverKey.KeyValue)
if err != nil {
return nil, fmt.Errorf("failed to derive prover key address: %w", err)
}
proverAddress := addrBI.FillBytes(make([]byte, 32))
for purpose, collection := range keyRegistry.KeysByPurpose {
if collection == nil {
continue
}
for _, key := range collection.X448Keys {
if err := e.validateSignedX448Key(
key,
identityPeerID,
proverAddress,
keyRegistry,
); err != nil {
return nil, fmt.Errorf(
"invalid x448 key (purpose %s): %w",
purpose,
err,
)
}
}
for _, key := range collection.Decaf448Keys {
if err := e.validateSignedDecaf448Key(
key,
identityPeerID,
proverAddress,
keyRegistry,
); err != nil {
return nil, fmt.Errorf(
"invalid decaf448 key (purpose %s): %w",
purpose,
err,
)
}
}
}
return &keyRegistryValidationResult{
identityPeerID: identityPeerID,
proverAddress: proverAddress,
}, nil
}
func (e *AppConsensusEngine) validateSignedX448Key(
key *protobufs.SignedX448Key,
identityPeerID []byte,
proverAddress []byte,
keyRegistry *protobufs.KeyRegistry,
) error {
if key == nil || key.Key == nil || len(key.Key.KeyValue) == 0 {
return nil
}
msg := slices.Concat(keyRegistryDomain, key.Key.KeyValue)
switch sig := key.Signature.(type) {
case *protobufs.SignedX448Key_Ed448Signature:
if sig.Ed448Signature == nil ||
len(sig.Ed448Signature.Signature) == 0 {
return fmt.Errorf("missing ed448 signature")
}
if !bytes.Equal(key.ParentKeyAddress, identityPeerID) {
return fmt.Errorf("unexpected parent for ed448 signed x448 key")
}
valid, err := e.keyManager.ValidateSignature(
crypto.KeyTypeEd448,
keyRegistry.IdentityKey.KeyValue,
msg,
sig.Ed448Signature.Signature,
nil,
)
if err != nil {
return fmt.Errorf("failed to validate ed448 signature: %w", err)
}
if !valid {
return fmt.Errorf("ed448 signature invalid")
}
case *protobufs.SignedX448Key_BlsSignature:
if sig.BlsSignature == nil ||
len(sig.BlsSignature.Signature) == 0 {
return fmt.Errorf("missing bls signature")
}
if len(proverAddress) != 0 &&
!bytes.Equal(key.ParentKeyAddress, proverAddress) {
return fmt.Errorf("unexpected parent for bls signed x448 key")
}
valid, err := e.keyManager.ValidateSignature(
crypto.KeyTypeBLS48581G1,
keyRegistry.ProverKey.KeyValue,
key.Key.KeyValue,
sig.BlsSignature.Signature,
keyRegistryDomain,
)
if err != nil {
return fmt.Errorf("failed to validate bls signature: %w", err)
}
if !valid {
return fmt.Errorf("bls signature invalid")
}
case *protobufs.SignedX448Key_DecafSignature:
return fmt.Errorf("decaf signature not supported for x448 key")
default:
return fmt.Errorf("missing signature for x448 key")
}
return nil
}
func (e *AppConsensusEngine) validateSignedDecaf448Key(
key *protobufs.SignedDecaf448Key,
identityPeerID []byte,
proverAddress []byte,
keyRegistry *protobufs.KeyRegistry,
) error {
if key == nil || key.Key == nil || len(key.Key.KeyValue) == 0 {
return nil
}
msg := slices.Concat(keyRegistryDomain, key.Key.KeyValue)
switch sig := key.Signature.(type) {
case *protobufs.SignedDecaf448Key_Ed448Signature:
if sig.Ed448Signature == nil ||
len(sig.Ed448Signature.Signature) == 0 {
return fmt.Errorf("missing ed448 signature")
}
if !bytes.Equal(key.ParentKeyAddress, identityPeerID) {
return fmt.Errorf("unexpected parent for ed448 signed decaf key")
}
valid, err := e.keyManager.ValidateSignature(
crypto.KeyTypeEd448,
keyRegistry.IdentityKey.KeyValue,
msg,
sig.Ed448Signature.Signature,
nil,
)
if err != nil {
return fmt.Errorf("failed to validate ed448 signature: %w", err)
}
if !valid {
return fmt.Errorf("ed448 signature invalid")
}
case *protobufs.SignedDecaf448Key_BlsSignature:
if sig.BlsSignature == nil ||
len(sig.BlsSignature.Signature) == 0 {
return fmt.Errorf("missing bls signature")
}
if len(proverAddress) != 0 &&
!bytes.Equal(key.ParentKeyAddress, proverAddress) {
return fmt.Errorf("unexpected parent for bls signed decaf key")
}
valid, err := e.keyManager.ValidateSignature(
crypto.KeyTypeBLS48581G1,
keyRegistry.ProverKey.KeyValue,
key.Key.KeyValue,
sig.BlsSignature.Signature,
keyRegistryDomain,
)
if err != nil {
return fmt.Errorf("failed to validate bls signature: %w", err)
}
if !valid {
return fmt.Errorf("bls signature invalid")
}
case *protobufs.SignedDecaf448Key_DecafSignature:
return fmt.Errorf("decaf signature validation not supported")
default:
return fmt.Errorf("missing signature for decaf key")
}
return nil
}
func (e *AppConsensusEngine) cacheProposal(
proposal *protobufs.AppShardProposal,
) {
if proposal == nil || proposal.State == nil || proposal.State.Header == nil {
return
}
frameNumber := proposal.State.Header.FrameNumber
e.proposalCacheMu.Lock()
e.proposalCache[frameNumber] = proposal
e.proposalCacheMu.Unlock()
e.logger.Debug(
"cached out-of-order proposal",
zap.String("address", e.appAddressHex),
zap.Uint64("frame_number", frameNumber),
)
}
func (e *AppConsensusEngine) deleteCachedProposal(frameNumber uint64) {
e.proposalCacheMu.Lock()
delete(e.proposalCache, frameNumber)
e.proposalCacheMu.Unlock()
}
func (e *AppConsensusEngine) popCachedProposal(
frameNumber uint64,
) *protobufs.AppShardProposal {
e.proposalCacheMu.Lock()
defer e.proposalCacheMu.Unlock()
proposal, ok := e.proposalCache[frameNumber]
if ok {
delete(e.proposalCache, frameNumber)
}
return proposal
}
func (e *AppConsensusEngine) drainProposalCache(startFrame uint64) {
next := startFrame
for {
prop := e.popCachedProposal(next)
if prop == nil {
return
}
if !e.processProposal(prop) {
e.logger.Debug(
"cached proposal failed processing, retaining for retry",
zap.String("address", e.appAddressHex),
zap.Uint64("frame_number", next),
)
e.cacheProposal(prop)
return
}
next++
}
}
func (e *AppConsensusEngine) registerPendingCertifiedParent(
proposal *protobufs.AppShardProposal,
) {
if proposal == nil || proposal.State == nil || proposal.State.Header == nil {
return
}
frameNumber := proposal.State.Header.FrameNumber
e.pendingCertifiedParentsMu.Lock()
e.pendingCertifiedParents[frameNumber] = proposal
e.pendingCertifiedParentsMu.Unlock()
}
func (e *AppConsensusEngine) trySealParentWithChild(
child *protobufs.AppShardProposal,
) {
if child == nil || child.State == nil || child.State.Header == nil {
return
}
header := child.State.Header
if header.FrameNumber == 0 {
return
}
parentFrame := header.FrameNumber - 1
e.pendingCertifiedParentsMu.RLock()
parent, ok := e.pendingCertifiedParents[parentFrame]
e.pendingCertifiedParentsMu.RUnlock()
if !ok || parent == nil || parent.State == nil || parent.State.Header == nil {
return
}
if !bytes.Equal(
header.ParentSelector,
[]byte(parent.State.Identity()),
) {
e.logger.Debug(
"pending parent selector mismatch, dropping entry",
zap.String("address", e.appAddressHex),
zap.Uint64("parent_frame", parent.State.Header.FrameNumber),
zap.Uint64("child_frame", header.FrameNumber),
)
e.pendingCertifiedParentsMu.Lock()
delete(e.pendingCertifiedParents, parentFrame)
e.pendingCertifiedParentsMu.Unlock()
return
}
head, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress)
if err != nil {
e.logger.Error("error fetching app time reel head", zap.Error(err))
return
}
if head != nil && head.Header != nil &&
head.Header.FrameNumber+1 == parent.State.Header.FrameNumber {
e.logger.Debug(
"sealing parent with descendant proposal",
zap.String("address", e.appAddressHex),
zap.Uint64("parent_frame", parent.State.Header.FrameNumber),
zap.Uint64("child_frame", header.FrameNumber),
)
e.addCertifiedState(parent, child)
}
e.pendingCertifiedParentsMu.Lock()
delete(e.pendingCertifiedParents, parentFrame)
e.pendingCertifiedParentsMu.Unlock()
}
func (e *AppConsensusEngine) addCertifiedState(
parent, child *protobufs.AppShardProposal,
) {
if parent == nil || parent.State == nil || parent.State.Header == nil ||
child == nil || child.State == nil || child.State.Header == nil {
e.logger.Error("cannot seal certified state: missing parent or child data")
return
}
qc := child.ParentQuorumCertificate
if qc == nil {
e.logger.Error(
"child missing parent quorum certificate",
zap.Uint64("child_frame_number", child.State.Header.FrameNumber),
)
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
aggregateSig := &protobufs.BLS48581AggregateSignature{
Signature: qc.GetAggregatedSignature().GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: qc.GetAggregatedSignature().GetPubKey(),
},
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
}
if err := e.clockStore.PutQuorumCertificate(
&protobufs.QuorumCertificate{
Filter: e.appAddress,
Rank: qc.GetRank(),
FrameNumber: qc.GetFrameNumber(),
Selector: []byte(qc.Identity()),
AggregateSignature: aggregateSig,
},
txn,
); err != nil {
e.logger.Error("could not insert quorum certificate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
parent.State.Header.PublicKeySignatureBls48581 = aggregateSig
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.materialize(
txn,
parent.State,
); err != nil {
_ = txn.Abort()
e.logger.Error("could not materialize frame requests", zap.Error(err))
return
}
if err := e.clockStore.CommitShardClockFrame(
e.appAddress,
parent.State.GetFrameNumber(),
[]byte(parent.State.Identity()),
[]*tries.RollingFrecencyCritbitTrie{},
txn,
false,
); err != nil {
_ = txn.Abort()
e.logger.Error("could not put global frame", zap.Error(err))
return
}
if err := e.clockStore.PutCertifiedAppShardState(
parent,
txn,
); err != nil {
e.logger.Error("could not insert certified state", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
if err := e.checkShardCoverage(parent.State.Header.FrameNumber); err != nil {
e.logger.Error("could not check shard coverage", zap.Error(err))
}
}
func (e *AppConsensusEngine) handleConsensusMessage(message *pb.Message) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.AppShardProposalType:
e.handleProposal(message)
case protobufs.ProposalVoteType:
e.handleVote(message)
case protobufs.TimeoutStateType:
e.handleTimeoutState(message)
case protobufs.ProverLivenessCheckType:
// Liveness checks are processed globally; nothing to do here.
default:
e.logger.Debug(
"received unknown message type on app address",
zap.Uint32("type", typePrefix),
)
}
}
func (e *AppConsensusEngine) handleFrameMessage(message *pb.Message) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
// we're already getting this from consensus
if e.IsInProverTrie(e.getProverAddress()) {
return
}
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.AppShardFrameType:
timer := prometheus.NewTimer(
frameProcessingDuration.WithLabelValues(e.appAddressHex),
)
defer timer.ObserveDuration()
frame := &protobufs.AppShardFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal frame", zap.Error(err))
framesProcessedTotal.WithLabelValues("error").Inc()
return
}
frameIDBI, _ := poseidon.HashBytes(frame.Header.Output)
frameID := frameIDBI.FillBytes(make([]byte, 32))
e.frameStoreMu.Lock()
e.frameStore[string(frameID)] = frame
e.frameStoreMu.Unlock()
if err := e.appTimeReel.Insert(frame); err != nil {
// Success metric recorded at the end of processing
framesProcessedTotal.WithLabelValues("error").Inc()
return
}
// Success metric recorded at the end of processing
framesProcessedTotal.WithLabelValues("success").Inc()
default:
e.logger.Debug(
"unknown message type",
zap.Uint32("type", typePrefix),
)
}
}
func (e *AppConsensusEngine) handleProverMessage(message *pb.Message) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
typePrefix := e.peekMessageType(message)
e.logger.Debug(
"handling prover message",
zap.Uint32("type_prefix", typePrefix),
)
switch typePrefix {
case protobufs.MessageBundleType:
hash := sha3.Sum256(message.Data)
e.addAppMessage(&protobufs.Message{
Address: e.appAddress[:32],
Hash: hash[:],
Payload: slices.Clone(message.Data),
})
e.logger.Debug(
"collected app request for execution",
zap.Uint32("type", typePrefix),
)
default:
e.logger.Debug(
"unknown message type",
zap.Uint32("type", typePrefix),
)
}
}
func (e *AppConsensusEngine) handleGlobalFrameMessage(message *pb.Message) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.GlobalFrameType:
timer := prometheus.NewTimer(globalFrameProcessingDuration)
defer timer.ObserveDuration()
frame := &protobufs.GlobalFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal frame", zap.Error(err))
globalFramesProcessedTotal.WithLabelValues("error").Inc()
return
}
// If genesis hasn't been initialized yet, send this frame to the
// genesis init channel (non-blocking)
if !e.genesisInitialized.Load() {
select {
case e.genesisInitChan <- frame:
e.logger.Debug(
"sent global frame to genesis init channel",
zap.Uint64("frame_number", frame.Header.FrameNumber),
)
default:
// Channel already has a frame, skip
}
}
if err := e.globalTimeReel.Insert(frame); err != nil {
// Success metric recorded at the end of processing
globalFramesProcessedTotal.WithLabelValues("error").Inc()
return
}
e.handleGlobalProverRoot(frame)
// Success metric recorded at the end of processing
globalFramesProcessedTotal.WithLabelValues("success").Inc()
default:
e.logger.Debug(
"unknown message type",
zap.Uint32("type", typePrefix),
)
}
}
func (e *AppConsensusEngine) handleAlertMessage(message *pb.Message) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.GlobalAlertType:
alert := &protobufs.GlobalAlert{}
if err := alert.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal peer info", zap.Error(err))
return
}
e.emitAlertEvent(alert.Message)
default:
e.logger.Debug(
"unknown message type",
zap.Uint32("type", typePrefix),
)
}
}
func (e *AppConsensusEngine) handlePeerInfoMessage(message *pb.Message) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.PeerInfoType:
peerInfo := &protobufs.PeerInfo{}
if err := peerInfo.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal peer info", zap.Error(err))
return
}
if e.peerInfoManager == nil {
e.logger.Warn(
"peer info manager unavailable; dropping peer info",
zap.ByteString("peer_id", peerInfo.PeerId),
)
return
}
if e.isDuplicatePeerInfo(peerInfo) {
if existing := e.peerInfoManager.GetPeerInfo(
peerInfo.PeerId,
); existing != nil {
existing.LastSeen = time.Now().UnixMilli()
return
}
}
// Validate signature
if !e.validatePeerInfoSignature(peerInfo) {
e.logger.Debug("invalid peer info signature",
zap.String("peer_id", peer.ID(peerInfo.PeerId).String()))
return
}
// Also add to the existing peer info manager
e.peerInfoManager.AddPeerInfo(peerInfo)
case protobufs.KeyRegistryType:
keyRegistry := &protobufs.KeyRegistry{}
if err := keyRegistry.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal key registry", zap.Error(err))
return
}
if err := keyRegistry.Validate(); err != nil {
e.logger.Debug("invalid key registry", zap.Error(err))
return
}
validation, err := e.validateKeyRegistry(keyRegistry)
if err != nil {
e.logger.Debug("invalid key registry signatures", zap.Error(err))
return
}
if e.isDuplicateKeyRegistry(keyRegistry) {
_, err := e.keyStore.GetKeyRegistry(validation.identityPeerID)
if err == nil {
return
}
}
txn, err := e.keyStore.NewTransaction()
if err != nil {
e.logger.Error("failed to create keystore txn", zap.Error(err))
return
}
commit := false
defer func() {
if !commit {
if abortErr := txn.Abort(); abortErr != nil {
e.logger.Warn("failed to abort keystore txn", zap.Error(abortErr))
}
}
}()
var identityAddress []byte
if keyRegistry.IdentityKey != nil &&
len(keyRegistry.IdentityKey.KeyValue) != 0 {
if err := e.keyStore.PutIdentityKey(
txn,
validation.identityPeerID,
keyRegistry.IdentityKey,
); err != nil {
e.logger.Error("failed to store identity key", zap.Error(err))
return
}
identityAddress = validation.identityPeerID
}
var proverAddress []byte
if keyRegistry.ProverKey != nil &&
len(keyRegistry.ProverKey.KeyValue) != 0 {
if err := e.keyStore.PutProvingKey(
txn,
validation.proverAddress,
&protobufs.BLS48581SignatureWithProofOfPossession{
PublicKey: keyRegistry.ProverKey,
},
); err != nil {
e.logger.Error("failed to store prover key", zap.Error(err))
return
}
proverAddress = validation.proverAddress
}
if len(identityAddress) != 0 && len(proverAddress) == 32 &&
keyRegistry.IdentityToProver != nil &&
len(keyRegistry.IdentityToProver.Signature) != 0 &&
keyRegistry.ProverToIdentity != nil &&
len(keyRegistry.ProverToIdentity.Signature) != 0 {
if err := e.keyStore.PutCrossSignature(
txn,
identityAddress,
proverAddress,
keyRegistry.IdentityToProver.Signature,
keyRegistry.ProverToIdentity.Signature,
); err != nil {
e.logger.Error("failed to store cross signatures", zap.Error(err))
return
}
}
for _, collection := range keyRegistry.KeysByPurpose {
for _, key := range collection.X448Keys {
if key == nil || key.Key == nil ||
len(key.Key.KeyValue) == 0 {
continue
}
addrBI, err := poseidon.HashBytes(key.Key.KeyValue)
if err != nil {
e.logger.Error("failed to derive x448 key address", zap.Error(err))
return
}
address := addrBI.FillBytes(make([]byte, 32))
if err := e.keyStore.PutSignedX448Key(txn, address, key); err != nil {
e.logger.Error("failed to store signed x448 key", zap.Error(err))
return
}
}
for _, key := range collection.Decaf448Keys {
if key == nil || key.Key == nil ||
len(key.Key.KeyValue) == 0 {
continue
}
addrBI, err := poseidon.HashBytes(key.Key.KeyValue)
if err != nil {
e.logger.Error(
"failed to derive decaf448 key address",
zap.Error(err),
)
return
}
address := addrBI.FillBytes(make([]byte, 32))
if err := e.keyStore.PutSignedDecaf448Key(
txn,
address,
key,
); err != nil {
e.logger.Error("failed to store signed decaf448 key", zap.Error(err))
return
}
}
}
if err := txn.Commit(); err != nil {
e.logger.Error("failed to commit key registry txn", zap.Error(err))
return
}
commit = true
default:
e.logger.Debug(
"unknown message type",
zap.Uint32("type", typePrefix),
)
}
}
func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
}
}()
typePrefix := e.peekMessageType(message)
switch typePrefix {
case protobufs.InboxMessageType:
envelope := &protobufs.InboxMessage{}
if err := envelope.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal envelope", zap.Error(err))
return
}
if err := e.dispatchService.AddInboxMessage(
context.Background(),
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
}
case protobufs.HubAddInboxType:
envelope := &protobufs.HubAddInboxMessage{}
if err := envelope.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal envelope", zap.Error(err))
return
}
if err := e.dispatchService.AddHubInboxAssociation(
context.Background(),
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
}
case protobufs.HubDeleteInboxType:
envelope := &protobufs.HubDeleteInboxMessage{}
if err := envelope.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal envelope", zap.Error(err))
return
}
if err := e.dispatchService.DeleteHubInboxAssociation(
context.Background(),
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
}
default:
e.logger.Debug(
"unknown message type",
zap.Uint32("type", typePrefix),
)
}
}
func (e *AppConsensusEngine) handleProposal(message *pb.Message) {
// Skip our own messages
if bytes.Equal(message.From, e.pubsub.GetPeerID()) {
return
}
timer := prometheus.NewTimer(
proposalProcessingDuration.WithLabelValues(e.appAddressHex),
)
defer timer.ObserveDuration()
proposal := &protobufs.AppShardProposal{}
if err := proposal.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal proposal", zap.Error(err))
proposalProcessedTotal.WithLabelValues("error").Inc()
return
}
if !bytes.Equal(proposal.State.Header.Address, e.appAddress) {
return
}
frameIDBI, _ := poseidon.HashBytes(proposal.State.Header.Output)
frameID := frameIDBI.FillBytes(make([]byte, 32))
e.frameStoreMu.Lock()
e.frameStore[string(frameID)] = proposal.State
e.frameStoreMu.Unlock()
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, proposal.Vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := e.clockStore.StageShardClockFrame(
[]byte(proposal.State.Identity()),
proposal.State,
txn,
); err != nil {
e.logger.Error("could not stage clock frame", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.appShardProposalQueue <- proposal
proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
}
func (e *AppConsensusEngine) AddProposal(proposal *protobufs.AppShardProposal) {
e.appShardProposalQueue <- proposal
}
func (e *AppConsensusEngine) handleVote(message *pb.Message) {
timer := prometheus.NewTimer(
voteProcessingDuration.WithLabelValues(e.appAddressHex),
)
defer timer.ObserveDuration()
vote := &protobufs.ProposalVote{}
if err := vote.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal vote", zap.Error(err))
voteProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
if !bytes.Equal(vote.Filter, e.appAddress) {
return
}
if vote.PublicKeySignatureBls48581 == nil {
e.logger.Error("vote without signature")
voteProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
proverSet, err := e.proverRegistry.GetActiveProvers(e.appAddress)
if err != nil {
e.logger.Error("could not get active provers", zap.Error(err))
voteProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
// Find the voter's public key
var voterPublicKey []byte = nil
for _, prover := range proverSet {
if bytes.Equal(
prover.Address,
vote.PublicKeySignatureBls48581.Address,
) {
voterPublicKey = prover.PublicKey
break
}
}
if voterPublicKey == nil {
e.logger.Warn(
"invalid vote - voter not found",
zap.String(
"voter",
hex.EncodeToString(
vote.PublicKeySignatureBls48581.Address,
),
),
)
voteProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.voteAggregator.AddVote(&vote)
voteProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
}
func (e *AppConsensusEngine) handleTimeoutState(message *pb.Message) {
timer := prometheus.NewTimer(
timeoutStateProcessingDuration.WithLabelValues(e.appAddressHex),
)
defer timer.ObserveDuration()
timeoutState := &protobufs.TimeoutState{}
if err := timeoutState.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal timeoutState", zap.Error(err))
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
if !bytes.Equal(timeoutState.Vote.Filter, e.appAddress) {
return
}
// Small gotcha: the timeout structure uses interfaces, so we can't assign
// directly, otherwise the nil values for the structs will fail the nil
// check on the interfaces (and would incur costly reflection if we wanted
// to check it directly)
lqc := timeoutState.LatestQuorumCertificate
prtc := timeoutState.PriorRankTimeoutCertificate
timeout := &models.TimeoutState[*protobufs.ProposalVote]{
Rank: timeoutState.Vote.Rank,
Vote: &timeoutState.Vote,
TimeoutTick: timeoutState.TimeoutTick,
}
if lqc != nil {
timeout.LatestQuorumCertificate = lqc
}
if prtc != nil {
timeout.PriorRankTimeoutCertificate = prtc
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutTimeoutVote(txn, timeoutState); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.timeoutAggregator.AddTimeout(timeout)
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
}
func (e *AppConsensusEngine) peekMessageType(message *pb.Message) uint32 {
// Check if data is long enough to contain type prefix
if len(message.Data) < 4 {
e.logger.Debug(
"message too short",
zap.Int("data_length", len(message.Data)),
)
return 0
}
// Read type prefix from first 4 bytes
return binary.BigEndian.Uint32(message.Data[:4])
}
// validatePeerInfoSignature validates the signature of a peer info message
func (e *AppConsensusEngine) validatePeerInfoSignature(
peerInfo *protobufs.PeerInfo,
) bool {
if len(peerInfo.Signature) == 0 || len(peerInfo.PublicKey) == 0 {
return false
}
// Create a copy of the peer info without the signature for validation
infoCopy := &protobufs.PeerInfo{
PeerId: peerInfo.PeerId,
Reachability: peerInfo.Reachability,
Timestamp: peerInfo.Timestamp,
Version: peerInfo.Version,
PatchNumber: peerInfo.PatchNumber,
Capabilities: peerInfo.Capabilities,
PublicKey: peerInfo.PublicKey,
LastReceivedFrame: peerInfo.LastReceivedFrame,
LastGlobalHeadFrame: peerInfo.LastGlobalHeadFrame,
// Exclude Signature field
}
// Serialize the message for signature validation
msg, err := infoCopy.ToCanonicalBytes()
if err != nil {
e.logger.Debug(
"failed to serialize peer info for validation",
zap.Error(err),
)
return false
}
// Validate the signature using pubsub's verification
valid, err := e.keyManager.ValidateSignature(
crypto.KeyTypeEd448,
peerInfo.PublicKey,
msg,
peerInfo.Signature,
[]byte{},
)
if err != nil {
e.logger.Debug(
"failed to validate signature",
zap.Error(err),
)
return false
}
return valid
}