finalize 2.1.0.5

Cassandra Heart 2025-11-11 04:57:55 -06:00
parent 268b98eaa3
commit 8e509ec5a8
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
30 changed files with 3142 additions and 1054 deletions

View File

@ -199,7 +199,10 @@ func (el *EventLoop[StateT, VoteT]) loop(ctx context.Context) error {
)
}
el.tracer.Trace("state proposal has been processed successfully")
el.tracer.Trace(
"state proposal has been processed successfully",
consensus.Uint64Param("rank", proposal.State.Rank),
)
// if we have a new QC, process it
case <-quorumCertificates:

View File

@ -44,11 +44,11 @@ func NewParticipant[
pending []*models.SignedProposal[StateT, VoteT],
) (*eventloop.EventLoop[StateT, VoteT], error) {
cfg, err := timeout.NewConfig(
10*time.Second,
30*time.Second,
20*time.Second,
3*time.Minute,
1.2,
6,
10*time.Second,
28*time.Second,
)
if err != nil {
return nil, err
@ -103,7 +103,7 @@ func NewParticipant[
pacemaker, err := pacemaker.NewPacemaker[StateT, VoteT](
filter,
controller,
pacemaker.NoProposalDelay(),
pacemaker.NewStaticProposalDurationProvider(8*time.Second),
notifier,
consensusStore,
logger,

View File

@ -361,11 +361,11 @@ func provideDifficultyAnchorFrameNumber(config *config.Config) uint64 {
}
func provideDifficultyAnchorParentTime() int64 {
return 1761217200000
return 1762862400000
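// 1762862400000 ms = 2025-11-11T12:00:00Z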
}
func provideDifficultyAnchorDifficulty() uint32 {
return 160000 // Initial difficulty
return 80000 // Initial difficulty
}
func provideGlobalTimeReel(

View File

@ -423,11 +423,11 @@ func provideDifficultyAnchorFrameNumber(config2 *config.Config) uint64 {
}
func provideDifficultyAnchorParentTime() int64 {
return 1761217200000
return 1762862400000
}
func provideDifficultyAnchorDifficulty() uint32 {
return 160000
return 80000
}
func provideGlobalTimeReel(

View File

@ -96,34 +96,38 @@ type AppConsensusEngine struct {
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
]
encryptedChannel channel.EncryptedChannel
dispatchService *dispatch.DispatchService
blsConstructor crypto.BlsConstructor
minimumProvers func() uint64
executors map[string]execution.ShardExecutionEngine
executorsMu sync.RWMutex
executionManager *manager.ExecutionEngineManager
peerInfoManager tp2p.PeerInfoManager
currentDifficulty uint32
currentDifficultyMu sync.RWMutex
pendingMessages []*protobufs.Message
pendingMessagesMu sync.RWMutex
collectedMessages []*protobufs.Message
collectedMessagesMu sync.RWMutex
lastProvenFrameTime time.Time
lastProvenFrameTimeMu sync.RWMutex
frameStore map[string]*protobufs.AppShardFrame
frameStoreMu sync.RWMutex
proposalCache map[string]*protobufs.AppShardProposal
proposalCacheMu sync.RWMutex
proofCache map[uint64][516]byte
proofCacheMu sync.RWMutex
ctx lifecycle.SignalerContext
cancel context.CancelFunc
quit chan struct{}
canRunStandalone bool
blacklistMap map[string]bool
alertPublicKey []byte
encryptedChannel channel.EncryptedChannel
dispatchService *dispatch.DispatchService
blsConstructor crypto.BlsConstructor
minimumProvers func() uint64
executors map[string]execution.ShardExecutionEngine
executorsMu sync.RWMutex
executionManager *manager.ExecutionEngineManager
peerInfoManager tp2p.PeerInfoManager
currentDifficulty uint32
currentDifficultyMu sync.RWMutex
pendingMessages []*protobufs.Message
pendingMessagesMu sync.RWMutex
collectedMessages []*protobufs.Message
collectedMessagesMu sync.RWMutex
provingMessages []*protobufs.Message
provingMessagesMu sync.RWMutex
lastProvenFrameTime time.Time
lastProvenFrameTimeMu sync.RWMutex
frameStore map[string]*protobufs.AppShardFrame
frameStoreMu sync.RWMutex
proposalCache map[uint64]*protobufs.AppShardProposal
proposalCacheMu sync.RWMutex
pendingCertifiedParents map[uint64]*protobufs.AppShardProposal
pendingCertifiedParentsMu sync.RWMutex
proofCache map[uint64][516]byte
proofCacheMu sync.RWMutex
ctx lifecycle.SignalerContext
cancel context.CancelFunc
quit chan struct{}
canRunStandalone bool
blacklistMap map[string]bool
alertPublicKey []byte
// Message queues
consensusMessageQueue chan *pb.Message
@ -244,9 +248,12 @@ func NewAppConsensusEngine(
peerInfoManager: peerInfoManager,
executors: make(map[string]execution.ShardExecutionEngine),
frameStore: make(map[string]*protobufs.AppShardFrame),
proposalCache: make(map[string]*protobufs.AppShardProposal),
proposalCache: make(map[uint64]*protobufs.AppShardProposal),
pendingCertifiedParents: make(map[uint64]*protobufs.AppShardProposal),
proofCache: make(map[uint64][516]byte),
pendingMessages: []*protobufs.Message{},
collectedMessages: []*protobufs.Message{},
provingMessages: []*protobufs.Message{},
consensusMessageQueue: make(chan *pb.Message, 1000),
proverMessageQueue: make(chan *pb.Message, 1000),
frameMessageQueue: make(chan *pb.Message, 100),
@ -1695,42 +1702,44 @@ func (e *AppConsensusEngine) OnOwnProposal(
],
targetPublicationTime time.Time,
) {
select {
case <-e.haltCtx.Done():
return
default:
}
var priorTC *protobufs.TimeoutCertificate = nil
if proposal.PreviousRankTimeoutCertificate != nil {
priorTC =
proposal.PreviousRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
}
go func() {
select {
case <-time.After(time.Until(targetPublicationTime)):
case <-e.ShutdownSignal():
return
}
var priorTC *protobufs.TimeoutCertificate = nil
if proposal.PreviousRankTimeoutCertificate != nil {
priorTC =
proposal.PreviousRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
}
// Manually override the signature as the vdf prover's signature is invalid
(*proposal.State.State).Header.PublicKeySignatureBls48581.Signature =
(*proposal.Vote).PublicKeySignatureBls48581.Signature
// Manually override the signature as the vdf prover's signature is invalid
(*proposal.State.State).Header.PublicKeySignatureBls48581.Signature =
(*proposal.Vote).PublicKeySignatureBls48581.Signature
pbProposal := &protobufs.AppShardProposal{
State: *proposal.State.State,
ParentQuorumCertificate: proposal.Proposal.State.ParentQuorumCertificate.(*protobufs.QuorumCertificate),
PriorRankTimeoutCertificate: priorTC,
Vote: *proposal.Vote,
}
data, err := pbProposal.ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize proposal", zap.Error(err))
return
}
pbProposal := &protobufs.AppShardProposal{
State: *proposal.State.State,
ParentQuorumCertificate: proposal.Proposal.State.ParentQuorumCertificate.(*protobufs.QuorumCertificate),
PriorRankTimeoutCertificate: priorTC,
Vote: *proposal.Vote,
}
data, err := pbProposal.ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize proposal", zap.Error(err))
return
}
e.voteAggregator.AddState(proposal)
e.consensusParticipant.SubmitProposal(proposal)
e.voteAggregator.AddState(proposal)
e.consensusParticipant.SubmitProposal(proposal)
if err := e.pubsub.PublishToBitmask(
e.getConsensusMessageBitmask(),
data,
); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
if err := e.pubsub.PublishToBitmask(
e.getConsensusMessageBitmask(),
data,
); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
}()
}
// OnOwnTimeout implements consensus.Consumer.
@ -1899,6 +1908,20 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange(
e.logger.Debug("no prior rank TC to include", zap.Uint64("rank", newRank-1))
}
vote, err := e.clockStore.GetProposalVote(
e.appAddress,
frame.GetRank(),
[]byte(frame.Source()),
)
if err != nil {
e.logger.Error(
"cannot find proposer's vote",
zap.Uint64("rank", newRank-1),
zap.String("proposer", hex.EncodeToString([]byte(frame.Source()))),
)
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
@ -1910,6 +1933,7 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange(
State: frame,
ParentQuorumCertificate: parentQC,
PriorRankTimeoutCertificate: priorRankTC,
Vote: vote,
},
txn,
); err != nil {
@ -1964,7 +1988,220 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange(
}
// OnRankChange implements consensus.Consumer.
func (e *AppConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) {}
func (e *AppConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) {
err := e.ensureGlobalClient()
if err != nil {
e.logger.Error("cannot confirm cross-shard locks", zap.Error(err))
return
}
frame, err := e.appTimeReel.GetHead()
if err != nil {
e.logger.Error("cannot obtain time reel head", zap.Error(err))
return
}
res, err := e.globalClient.GetLockedAddresses(
context.Background(),
&protobufs.GetLockedAddressesRequest{
ShardAddress: e.appAddress,
FrameNumber: frame.Header.FrameNumber,
},
)
if err != nil {
e.logger.Error("cannot confirm cross-shard locks", zap.Error(err))
return
}
// Build a map of transaction hashes to their committed status
txMap := map[string]bool{}
txIncluded := map[string]bool{}
txMessageMap := map[string]*protobufs.Message{}
txHashesInOrder := []string{}
txShardRefs := map[string]map[string]struct{}{}
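// Snapshot and clear the collected messages; only transactions confirmed
// below are promoted to provingMessages.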
e.collectedMessagesMu.Lock()
collected := make([]*protobufs.Message, len(e.collectedMessages))
copy(collected, e.collectedMessages)
e.collectedMessages = []*protobufs.Message{}
e.collectedMessagesMu.Unlock()
e.provingMessagesMu.Lock()
e.provingMessages = []*protobufs.Message{}
e.provingMessagesMu.Unlock()
for _, req := range collected {
tx, err := req.ToCanonicalBytes()
if err != nil {
e.logger.Error("cannot confirm cross-shard locks", zap.Error(err))
return
}
txHash := sha3.Sum256(tx)
e.logger.Debug(
"adding transaction in frame to commit check",
zap.String("tx_hash", hex.EncodeToString(txHash[:])),
)
hashStr := string(txHash[:])
txMap[hashStr] = false
txIncluded[hashStr] = true
txMessageMap[hashStr] = req
txHashesInOrder = append(txHashesInOrder, hashStr)
}
// Check that transactions are committed in our shard and collect shard
// addresses
shardAddressesSet := make(map[string]bool)
for _, tx := range res.Transactions {
e.logger.Debug(
"checking transaction from global map",
zap.String("tx_hash", hex.EncodeToString(tx.TransactionHash)),
)
hashStr := string(tx.TransactionHash)
if _, ok := txMap[hashStr]; ok {
txMap[hashStr] = tx.Committed
// Extract shard addresses from each locked transaction's shard addresses
for _, shardAddr := range tx.ShardAddresses {
// Extract the applicable shard address (can be shorter than the full
// address)
extractedShards := e.extractShardAddresses(shardAddr)
for _, extractedShard := range extractedShards {
shardAddrStr := string(extractedShard)
shardAddressesSet[shardAddrStr] = true
if txShardRefs[hashStr] == nil {
txShardRefs[hashStr] = make(map[string]struct{})
}
txShardRefs[hashStr][shardAddrStr] = struct{}{}
}
}
}
}
// Check that all transactions are committed in our shard
for _, committed := range txMap {
if !committed {
e.logger.Error("transaction not committed in local shard")
return
}
}
// Check cross-shard locks for each unique shard address
for shardAddrStr := range shardAddressesSet {
shardAddr := []byte(shardAddrStr)
// Skip our own shard since we already checked it
if bytes.Equal(shardAddr, e.appAddress) {
continue
}
// Query the global client for locked addresses in this shard
shardRes, err := e.globalClient.GetLockedAddresses(
context.Background(),
&protobufs.GetLockedAddressesRequest{
ShardAddress: shardAddr,
FrameNumber: frame.Header.FrameNumber,
},
)
if err != nil {
e.logger.Error(
"failed to get locked addresses for shard",
zap.String("shard_addr", hex.EncodeToString(shardAddr)),
zap.Error(err),
)
for hashStr, shards := range txShardRefs {
if _, ok := shards[shardAddrStr]; ok {
txIncluded[hashStr] = false
}
}
continue
}
// Check that all our transactions are committed in this shard
for txHashStr := range txMap {
committedInShard := false
for _, tx := range shardRes.Transactions {
if string(tx.TransactionHash) == txHashStr {
committedInShard = tx.Committed
break
}
}
if !committedInShard {
e.logger.Error("cannot confirm cross-shard locks")
txIncluded[txHashStr] = false
}
}
}
e.provingMessagesMu.Lock()
e.provingMessages = e.provingMessages[:0]
for _, hashStr := range txHashesInOrder {
if txIncluded[hashStr] {
e.provingMessages = append(e.provingMessages, txMessageMap[hashStr])
}
}
e.provingMessagesMu.Unlock()
commitments, err := e.livenessProvider.Collect(context.Background())
if err != nil {
e.logger.Error("could not collect commitments", zap.Error(err))
return
}
if err := e.broadcastLivenessCheck(newRank, commitments); err != nil {
e.logger.Error("could not broadcast liveness check", zap.Error(err))
}
}
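// broadcastLivenessCheck signs a ProverLivenessCheck for the new rank with
// the engine's proving key and publishes it on the consensus bitmask.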
func (e *AppConsensusEngine) broadcastLivenessCheck(
newRank uint64,
commitments CollectedCommitments,
) error {
signer, _, publicKey, _ := e.GetProvingKey(e.config.Engine)
if signer == nil || publicKey == nil {
return errors.Wrap(
errors.New("no proving key available"),
"broadcast liveness check",
)
}
check := &protobufs.ProverLivenessCheck{
Filter: slices.Clone(e.appAddress),
Rank: newRank,
FrameNumber: commitments.frameNumber,
Timestamp: time.Now().UnixMilli(),
CommitmentHash: slices.Clone(commitments.commitmentHash),
}
payload, err := check.ConstructSignaturePayload()
if err != nil {
return errors.Wrap(err, "construct liveness payload")
}
sig, err := signer.SignWithDomain(payload, check.GetSignatureDomain())
if err != nil {
return errors.Wrap(err, "sign liveness check")
}
check.PublicKeySignatureBls48581 = &protobufs.BLS48581AddressedSignature{
Address: e.getAddressFromPublicKey(publicKey),
Signature: sig,
}
bytes, err := check.ToCanonicalBytes()
if err != nil {
return errors.Wrap(err, "marshal liveness check")
}
if err := e.pubsub.PublishToBitmask(
e.getConsensusMessageBitmask(),
bytes,
); err != nil {
return errors.Wrap(err, "publish liveness check")
}
return nil
}
// OnReceiveProposal implements consensus.Consumer.
func (e *AppConsensusEngine) OnReceiveProposal(
@ -2043,6 +2280,13 @@ func (e *AppConsensusEngine) OnTimeoutCertificateTriggeredRankChange(
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
},
},
AggregateSignature: &protobufs.BLS48581AggregateSignature{
Signature: tc.GetAggregatedSignature().GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: tc.GetAggregatedSignature().GetPubKey(),
},
Bitmask: tc.GetAggregatedSignature().GetBitmask(),
},
}, txn)
if err != nil {
txn.Abort()
@ -2086,7 +2330,7 @@ func (e *AppConsensusEngine) VerifyQuorumCertificate(
}
}
provers, err := e.proverRegistry.GetActiveProvers(nil)
provers, err := e.proverRegistry.GetActiveProvers(e.appAddress)
if err != nil {
return errors.Wrap(err, "verify quorum certificate")
}
@ -2145,7 +2389,7 @@ func (e *AppConsensusEngine) VerifyTimeoutCertificate(
)
}
provers, err := e.proverRegistry.GetActiveProvers(nil)
provers, err := e.proverRegistry.GetActiveProvers(e.appAddress)
if err != nil {
return errors.Wrap(err, "verify timeout certificate")
}
@ -2204,7 +2448,7 @@ func (e *AppConsensusEngine) VerifyVote(
)
}
provers, err := e.proverRegistry.GetActiveProvers(nil)
provers, err := e.proverRegistry.GetActiveProvers(e.appAddress)
if err != nil {
return errors.Wrap(err, "verify vote")
}
@ -2367,4 +2611,99 @@ func (e *AppConsensusEngine) getRandomProverPeerId() (peer.ID, error) {
return id, nil
}
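// getPeerIDOfProver resolves a prover address to a libp2p peer ID by looking
// up the prover's identity key in the signer registry.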
func (e *AppConsensusEngine) getPeerIDOfProver(
prover []byte,
) (peer.ID, error) {
registry, err := e.signerRegistry.GetKeyRegistryByProver(prover)
if err != nil {
e.logger.Debug(
"could not get registry for prover",
zap.Error(err),
)
return "", err
}
if registry == nil || registry.IdentityKey == nil {
e.logger.Debug("registry for prover not found")
return "", errors.New("registry not found for prover")
}
pk, err := pcrypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue)
if err != nil {
e.logger.Debug(
"could not parse pub key",
zap.Error(err),
)
return "", err
}
id, err := peer.IDFromPublicKey(pk)
if err != nil {
e.logger.Debug(
"could not derive peer id",
zap.Error(err),
)
return "", err
}
return id, nil
}
// extractShardAddresses extracts all possible shard addresses from a
// transaction address
func (e *AppConsensusEngine) extractShardAddresses(txAddress []byte) [][]byte {
var shardAddresses [][]byte
// Get the full path from the transaction address
path := GetFullPath(txAddress)
// The first 43 nibbles (258 bits) represent the base shard address
// We need to extract all possible shard addresses by considering path
// segments after the 43rd nibble
if len(path) <= 43 {
// If the path is too short, just return the original address truncated to
// 32 bytes
if len(txAddress) >= 32 {
shardAddresses = append(shardAddresses, txAddress[:32])
}
return shardAddresses
}
// Convert the first 43 nibbles to bytes (base shard address)
baseShardAddr := txAddress[:32]
l1 := up2p.GetBloomFilterIndices(baseShardAddr, 256, 3)
candidates := map[string]struct{}{}
// Now generate all possible shard addresses by extending the path
// Each additional nibble after the 43rd creates a new shard address
for i := 43; i < len(path); i++ {
// Create a new shard address by extending the base with this path segment
extendedAddr := make([]byte, 32)
copy(extendedAddr, baseShardAddr)
// Add the path segment as a byte
extendedAddr = append(extendedAddr, byte(path[i]))
candidates[string(extendedAddr)] = struct{}{}
}
shards, err := e.shardsStore.GetAppShards(
slices.Concat(l1, baseShardAddr),
[]uint32{},
)
if err != nil {
return [][]byte{}
}
for _, shard := range shards {
if _, ok := candidates[string(
slices.Concat(shard.L2, uint32ToBytes(shard.Path)),
)]; ok {
shardAddresses = append(shardAddresses, shard.L2)
}
}
return shardAddresses
}
var _ consensus.DynamicCommittee = (*AppConsensusEngine)(nil)

View File

@ -66,11 +66,6 @@ func (p *AppLeaderProvider) ProveNextState(
filter []byte,
priorState models.Identity,
) (**protobufs.AppShardFrame, error) {
_, err := p.engine.livenessProvider.Collect(ctx)
if err != nil {
return nil, models.NewNoVoteErrorf("could not collect: %+w", err)
}
timer := prometheus.NewTimer(frameProvingDuration.WithLabelValues(
p.engine.appAddressHex,
))
@ -122,11 +117,21 @@ func (p *AppLeaderProvider) ProveNextState(
}
// Get collected messages to include in frame
p.engine.collectedMessagesMu.Lock()
messages := make([]*protobufs.Message, len(p.engine.collectedMessages))
copy(messages, p.engine.collectedMessages)
p.engine.collectedMessages = []*protobufs.Message{}
p.engine.collectedMessagesMu.Unlock()
p.engine.provingMessagesMu.Lock()
messages := make([]*protobufs.Message, len(p.engine.provingMessages))
copy(messages, p.engine.provingMessages)
p.engine.provingMessages = []*protobufs.Message{}
p.engine.provingMessagesMu.Unlock()
if len(messages) == 0 {
p.engine.collectedMessagesMu.Lock()
if len(p.engine.collectedMessages) > 0 {
messages = make([]*protobufs.Message, len(p.engine.collectedMessages))
copy(messages, p.engine.collectedMessages)
p.engine.collectedMessages = []*protobufs.Message{}
}
p.engine.collectedMessagesMu.Unlock()
}
// Update pending messages metric
pendingMessagesCount.WithLabelValues(p.engine.appAddressHex).Set(0)

View File

@ -82,20 +82,6 @@ func (p *AppSyncProvider) Start(
request.frameNumber,
request.peerId,
)
case <-time.After(6 * time.Minute):
finalized := p.engine.forks.FinalizedState()
id, err := p.engine.getRandomProverPeerId()
if err != nil {
p.engine.logger.Debug("could not get random prover", zap.Error(err))
}
p.engine.logger.Info(
"synchronizing with peer",
zap.String("peer", id.String()),
zap.Uint64("finalized_rank", finalized.Rank),
)
p.processState(ctx, (*finalized.State).Header.FrameNumber, []byte(id))
}
}
}
@ -145,6 +131,39 @@ func (p *AppSyncProvider) Synchronize(
}
peerCount := p.engine.pubsub.GetPeerstoreCount()
requiredPeers := p.engine.config.Engine.MinimumPeersRequired
if peerCount < requiredPeers {
p.engine.logger.Info(
"waiting for minimum peers",
zap.Int("current", peerCount),
zap.Int("required", requiredPeers),
)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
waitPeers:
for {
select {
case <-ctx.Done():
errCh <- errors.Wrap(
ctx.Err(),
"synchronize cancelled while waiting for peers",
)
return
case <-ticker.C:
peerCount = p.engine.pubsub.GetPeerstoreCount()
if peerCount >= requiredPeers {
p.engine.logger.Info(
"minimum peers reached",
zap.Int("peers", peerCount),
)
break waitPeers
}
}
}
}
if peerCount < int(p.engine.minimumProvers()) {
errCh <- errors.Wrap(
errors.New("minimum provers not reached"),
@ -389,7 +408,10 @@ func (p *AppSyncProvider) syncWithPeer(
defer func() {
if err := cc.Close(); err != nil {
p.engine.logger.Error("error while closing connection", zap.Error(err))
p.engine.logger.Error(
"error while closing connection",
zap.Error(err),
)
}
}()
@ -402,7 +424,7 @@ func (p *AppSyncProvider) syncWithPeer(
getCtx,
&protobufs.GetAppShardProposalRequest{
Filter: p.engine.appAddress,
FrameNumber: frameNumber + 1,
FrameNumber: frameNumber,
},
// The message size limits are swapped because the server is the one
// sending the data.
@ -435,13 +457,20 @@ func (p *AppSyncProvider) syncWithPeer(
}
if response.Proposal == nil || response.Proposal.State == nil ||
response.Proposal.State.Header == nil ||
response.Proposal.State.Header.FrameNumber != frameNumber+1 {
response.Proposal.State.Header.FrameNumber != frameNumber {
p.engine.logger.Debug("received empty response from peer")
return nil
}
if err := response.Proposal.Validate(); err != nil {
p.engine.logger.Debug("received invalid response from peer")
return nil
}
p.engine.logger.Info(
"received new leading frame",
zap.Uint64("frame_number", response.Proposal.State.Header.FrameNumber),
zap.Uint64(
"frame_number",
response.Proposal.State.Header.FrameNumber,
),
zap.Duration(
"frame_age",
frametime.AppFrameSince(response.Proposal.State),
@ -666,7 +695,7 @@ func (p *AppSyncProvider) hyperSyncWithProver(
peerId, err := peer.IDFromPublicKey(pubKey)
if err == nil {
ch, err := p.engine.pubsub.GetDirectChannel(
p.engine.ctx,
context.Background(),
[]byte(peerId),
"sync",
)
@ -674,7 +703,7 @@ func (p *AppSyncProvider) hyperSyncWithProver(
if err == nil {
defer ch.Close()
client := protobufs.NewHypergraphComparisonServiceClient(ch)
str, err := client.HyperStream(p.engine.ctx)
str, err := client.HyperStream(context.Background())
if err != nil {
p.engine.logger.Error("error from sync", zap.Error(err))
} else {

View File

@ -2,6 +2,7 @@ package app
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
@ -166,16 +167,12 @@ func (e *AppConsensusEngine) handleAppShardProposal(
pqc := proposal.ParentQuorumCertificate
prtc := proposal.PriorRankTimeoutCertificate
vote := proposal.Vote
proposer := models.Identity("")
if vote != nil {
proposer = vote.Identity()
}
signedProposal := &models.SignedProposal[*protobufs.AppShardFrame, *protobufs.ProposalVote]{
Proposal: models.Proposal[*protobufs.AppShardFrame]{
State: &models.State[*protobufs.AppShardFrame]{
Rank: proposal.State.GetRank(),
Identifier: proposal.State.Identity(),
ProposerID: proposer,
ProposerID: proposal.State.Source(),
Timestamp: proposal.State.GetTimestamp(),
State: &proposal.State,
},
@ -210,24 +207,14 @@ func (e *AppConsensusEngine) handleAppShardProposal(
e.logger.Debug("dropping stale proposal")
return
}
// if we have a parent, cache and move on
e.proposalCacheMu.RLock()
_, ok := e.proposalCache[string(proposal.State.Header.ParentSelector)]
e.proposalCacheMu.RUnlock()
if ok {
e.proposalCacheMu.Lock()
e.proposalCache[proposal.State.Identity()] = proposal
e.proposalCacheMu.Unlock()
return
}
if proposal.State.Header.FrameNumber != 0 {
// also check with persistence layer
parent, _, err := e.clockStore.GetShardClockFrame(
proposal.State.Header.Address,
proposal.State.Header.FrameNumber-1,
false,
)
if err != nil || !bytes.Equal(
if err != nil || parent == nil || !bytes.Equal(
[]byte(parent.Identity()),
proposal.State.Header.ParentSelector,
) {
@ -235,31 +222,68 @@ func (e *AppConsensusEngine) handleAppShardProposal(
"parent frame not stored, requesting sync",
zap.Uint64("frame_number", proposal.State.Header.FrameNumber-1),
)
e.proposalCacheMu.Lock()
e.proposalCache[proposal.State.Identity()] = proposal
e.proposalCacheMu.Unlock()
var peerId []byte
if proposal.Vote != nil {
peerId = []byte(proposal.Vote.Identity())
} else {
id, err := e.getRandomProverPeerId()
e.cacheProposal(proposal)
peerID, err := e.getPeerIDOfProver(proposal.State.Header.Prover)
if err != nil {
peerID, err = e.getRandomProverPeerId()
if err != nil {
e.logger.Debug("could not get random peer", zap.Error(err))
e.logger.Debug("could not get peer id for sync", zap.Error(err))
return
}
peerId = []byte(id)
}
e.syncProvider.AddState(peerId, proposal.State.Header.FrameNumber-1)
head, err := e.appTimeReel.GetHead()
if err != nil || head == nil || head.Header == nil {
e.logger.Debug("could not get shard time reel head", zap.Error(err))
return
}
e.syncProvider.AddState([]byte(peerID), head.Header.FrameNumber)
return
}
}
e.processProposal(proposal)
frameNumber := proposal.State.Header.FrameNumber
expectedFrame, err := e.appTimeReel.GetHead()
if err != nil {
e.logger.Error("could not obtain app time reel head", zap.Error(err))
return
}
expectedFrameNumber := uint64(0)
if expectedFrame != nil && expectedFrame.Header != nil {
expectedFrameNumber = expectedFrame.Header.FrameNumber + 1
}
if frameNumber < expectedFrameNumber {
e.logger.Debug(
"dropping proposal behind expected frame",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("expected_frame_number", expectedFrameNumber),
)
return
}
if frameNumber == expectedFrameNumber {
e.deleteCachedProposal(frameNumber)
if e.processProposal(proposal) {
e.drainProposalCache(frameNumber + 1)
return
}
e.logger.Debug("failed to process expected proposal, caching")
e.cacheProposal(proposal)
return
}
e.cacheProposal(proposal)
e.drainProposalCache(expectedFrameNumber)
}
func (e *AppConsensusEngine) processProposal(
proposal *protobufs.AppShardProposal,
) {
) bool {
e.logger.Debug(
"processing proposal",
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
@ -268,14 +292,14 @@ func (e *AppConsensusEngine) processProposal(
err := e.VerifyQuorumCertificate(proposal.ParentQuorumCertificate)
if err != nil {
e.logger.Debug("proposal has invalid qc", zap.Error(err))
return
return false
}
if proposal.PriorRankTimeoutCertificate != nil {
err := e.VerifyTimeoutCertificate(proposal.PriorRankTimeoutCertificate)
if err != nil {
e.logger.Debug("proposal has invalid tc", zap.Error(err))
return
return false
}
}
@ -283,20 +307,20 @@ func (e *AppConsensusEngine) processProposal(
err := e.VerifyVote(&proposal.Vote)
if err != nil {
e.logger.Debug("proposal has invalid vote", zap.Error(err))
return
return false
}
}
err = proposal.State.Validate()
if err != nil {
e.logger.Debug("proposal is not valid", zap.Error(err))
return
return false
}
valid, err := e.frameValidator.Validate(proposal.State)
if !valid || err != nil {
e.logger.Debug("invalid frame in proposal", zap.Error(err))
return
return false
}
// Small gotcha: the proposal structure uses interfaces, so we can't assign
@ -327,28 +351,243 @@ func (e *AppConsensusEngine) processProposal(
signedProposal.PreviousRankTimeoutCertificate = prtc
}
e.voteAggregator.AddState(signedProposal)
e.consensusParticipant.SubmitProposal(signedProposal)
proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
e.proposalCacheMu.RLock()
props := []*protobufs.AppShardProposal{}
removes := []string{}
for id, prop := range e.proposalCache {
if bytes.Equal(
prop.State.Header.ParentSelector,
[]byte(proposal.State.Identity()),
) {
props = append(props, prop)
removes = append(removes, id)
}
e.trySealParentWithChild(proposal)
e.registerPendingCertifiedParent(proposal)
return true
}
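// cacheProposal stores an out-of-order proposal keyed by frame number so it
// can be replayed once the preceding frames have been processed.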
func (e *AppConsensusEngine) cacheProposal(
proposal *protobufs.AppShardProposal,
) {
if proposal == nil || proposal.State == nil || proposal.State.Header == nil {
return
}
e.proposalCacheMu.RUnlock()
for i := range props {
prop := props[i]
remove := removes[i]
e.processProposal(prop)
e.proposalCacheMu.Lock()
delete(e.proposalCache, remove)
e.proposalCacheMu.Unlock()
frameNumber := proposal.State.Header.FrameNumber
e.proposalCacheMu.Lock()
e.proposalCache[frameNumber] = proposal
e.proposalCacheMu.Unlock()
e.logger.Debug(
"cached out-of-order proposal",
zap.String("address", e.appAddressHex),
zap.Uint64("frame_number", frameNumber),
)
}
func (e *AppConsensusEngine) deleteCachedProposal(frameNumber uint64) {
e.proposalCacheMu.Lock()
delete(e.proposalCache, frameNumber)
e.proposalCacheMu.Unlock()
}
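// popCachedProposal removes and returns the cached proposal for frameNumber,
// or nil if none is cached.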
func (e *AppConsensusEngine) popCachedProposal(
frameNumber uint64,
) *protobufs.AppShardProposal {
e.proposalCacheMu.Lock()
defer e.proposalCacheMu.Unlock()
proposal, ok := e.proposalCache[frameNumber]
if ok {
delete(e.proposalCache, frameNumber)
}
return proposal
}
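// drainProposalCache replays cached proposals in ascending frame order
// starting at startFrame, stopping at the first gap or at a proposal that
// fails processing (which is re-cached for retry).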
func (e *AppConsensusEngine) drainProposalCache(startFrame uint64) {
next := startFrame
for {
prop := e.popCachedProposal(next)
if prop == nil {
return
}
if !e.processProposal(prop) {
e.logger.Debug(
"cached proposal failed processing, retaining for retry",
zap.String("address", e.appAddressHex),
zap.Uint64("frame_number", next),
)
e.cacheProposal(prop)
return
}
next++
}
}
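// registerPendingCertifiedParent records a processed proposal so a later
// child carrying its quorum certificate can seal it via
// trySealParentWithChild.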
func (e *AppConsensusEngine) registerPendingCertifiedParent(
proposal *protobufs.AppShardProposal,
) {
if proposal == nil || proposal.State == nil || proposal.State.Header == nil {
return
}
frameNumber := proposal.State.Header.FrameNumber
e.pendingCertifiedParentsMu.Lock()
e.pendingCertifiedParents[frameNumber] = proposal
e.pendingCertifiedParentsMu.Unlock()
}
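// trySealParentWithChild looks up the pending parent of the given child
// proposal, confirms the child's parent selector matches it, and, when the
// time reel head sits directly behind the parent, commits the child's quorum
// certificate via addCertifiedState.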
func (e *AppConsensusEngine) trySealParentWithChild(
child *protobufs.AppShardProposal,
) {
if child == nil || child.State == nil || child.State.Header == nil {
return
}
header := child.State.Header
if header.FrameNumber == 0 {
return
}
parentFrame := header.FrameNumber - 1
e.pendingCertifiedParentsMu.RLock()
parent, ok := e.pendingCertifiedParents[parentFrame]
e.pendingCertifiedParentsMu.RUnlock()
if !ok || parent == nil || parent.State == nil || parent.State.Header == nil {
return
}
if !bytes.Equal(
header.ParentSelector,
[]byte(parent.State.Identity()),
) {
e.logger.Debug(
"pending parent selector mismatch, dropping entry",
zap.String("address", e.appAddressHex),
zap.Uint64("parent_frame", parent.State.Header.FrameNumber),
zap.Uint64("child_frame", header.FrameNumber),
)
e.pendingCertifiedParentsMu.Lock()
delete(e.pendingCertifiedParents, parentFrame)
e.pendingCertifiedParentsMu.Unlock()
return
}
head, err := e.appTimeReel.GetHead()
if err != nil {
e.logger.Error("error fetching app time reel head", zap.Error(err))
return
}
if head != nil && head.Header != nil &&
head.Header.FrameNumber+1 == parent.State.Header.FrameNumber {
e.logger.Debug(
"sealing parent with descendant proposal",
zap.String("address", e.appAddressHex),
zap.Uint64("parent_frame", parent.State.Header.FrameNumber),
zap.Uint64("child_frame", header.FrameNumber),
)
e.addCertifiedState(parent, child)
}
e.pendingCertifiedParentsMu.Lock()
delete(e.pendingCertifiedParents, parentFrame)
e.pendingCertifiedParentsMu.Unlock()
}
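// addCertifiedState persists the quorum certificate carried by the child
// proposal, stamps its aggregate signature onto the child frame header,
// inserts the frame into the app time reel, and, once the reel head matches,
// stores the certified shard state.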
func (e *AppConsensusEngine) addCertifiedState(
parent, child *protobufs.AppShardProposal,
) {
if parent == nil || parent.State == nil || parent.State.Header == nil ||
child == nil || child.State == nil || child.State.Header == nil {
e.logger.Error("cannot seal certified state: missing parent or child data")
return
}
qc := child.ParentQuorumCertificate
if qc == nil {
e.logger.Error(
"child missing parent quorum certificate",
zap.Uint64("child_frame_number", child.State.Header.FrameNumber),
)
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
aggregateSig := &protobufs.BLS48581AggregateSignature{
Signature: qc.GetAggregatedSignature().GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: qc.GetAggregatedSignature().GetPubKey(),
},
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
}
if err := e.clockStore.PutQuorumCertificate(
&protobufs.QuorumCertificate{
Filter: e.appAddress,
Rank: qc.GetRank(),
FrameNumber: qc.GetFrameNumber(),
Selector: []byte(qc.Identity()),
AggregateSignature: aggregateSig,
},
txn,
); err != nil {
e.logger.Error("could not insert quorum certificate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
child.State.Header.PublicKeySignatureBls48581 = aggregateSig
if err := e.appTimeReel.Insert(child.State); err != nil {
e.logger.Error("could not insert frame into app time reel", zap.Error(err))
return
}
head, err := e.appTimeReel.GetHead()
if err != nil {
e.logger.Error("could not get app time reel head", zap.Error(err))
return
}
if head == nil || head.Header == nil ||
!bytes.Equal(child.State.Header.Output, head.Header.Output) {
e.logger.Error(
"app frames not aligned",
zap.String("address", e.appAddressHex),
zap.Uint64("new_frame_number", child.State.Header.FrameNumber),
)
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutCertifiedAppShardState(
child,
txn,
); err != nil {
e.logger.Error("could not insert certified state", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
}
@ -374,6 +613,9 @@ func (e *AppConsensusEngine) handleConsensusMessage(message *pb.Message) {
case protobufs.TimeoutStateType:
e.handleTimeoutState(message)
case protobufs.ProverLivenessCheckType:
// Liveness checks are processed globally; nothing to do here.
default:
e.logger.Debug(
"received unknown message type on app address",
@ -448,7 +690,10 @@ func (e *AppConsensusEngine) handleProverMessage(message *pb.Message) {
typePrefix := e.peekMessageType(message)
e.logger.Debug("handling prover message", zap.Uint32("type_prefix", typePrefix))
e.logger.Debug(
"handling prover message",
zap.Uint32("type_prefix", typePrefix),
)
switch typePrefix {
case protobufs.MessageBundleType:
// MessageBundle messages need to be collected for execution
@ -605,7 +850,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) {
}
if err := e.dispatchService.AddInboxMessage(
e.ctx,
context.Background(),
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
@ -618,7 +863,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) {
}
if err := e.dispatchService.AddHubInboxAssociation(
e.ctx,
context.Background(),
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
@ -631,7 +876,7 @@ func (e *AppConsensusEngine) handleDispatchMessage(message *pb.Message) {
}
if err := e.dispatchService.DeleteHubInboxAssociation(
e.ctx,
context.Background(),
envelope,
); err != nil {
e.logger.Debug("failed to add inbox message", zap.Error(err))
@ -663,12 +908,34 @@ func (e *AppConsensusEngine) handleProposal(message *pb.Message) {
return
}
if !bytes.Equal(proposal.State.Header.Address, e.appAddress) {
return
}
frameIDBI, _ := poseidon.HashBytes(proposal.State.Header.Output)
frameID := frameIDBI.FillBytes(make([]byte, 32))
e.frameStoreMu.Lock()
e.frameStore[string(frameID)] = proposal.State
e.frameStoreMu.Unlock()
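// Persist the proposer's vote so it can be served alongside the frame when
// peers request this proposal during sync.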
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, proposal.Vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.appShardProposalQueue <- proposal
proposalProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()
@ -697,10 +964,10 @@ func (e *AppConsensusEngine) handleVote(message *pb.Message) {
return
}
proverSet, err := e.proverRegistry.GetActiveProvers(nil)
proverSet, err := e.proverRegistry.GetActiveProvers(e.appAddress)
if err != nil {
e.logger.Error("could not get active provers", zap.Error(err))
voteProcessedTotal.WithLabelValues("error").Inc()
voteProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
@ -726,7 +993,25 @@ func (e *AppConsensusEngine) handleVote(message *pb.Message) {
),
),
)
voteProcessedTotal.WithLabelValues("error").Inc()
voteProcessedTotal.WithLabelValues(e.appAddressHex, "error").Inc()
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
@ -770,6 +1055,24 @@ func (e *AppConsensusEngine) handleTimeoutState(message *pb.Message) {
timeout.PriorRankTimeoutCertificate = prtc
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutTimeoutVote(txn, timeoutState); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.timeoutAggregator.AddTimeout(timeout)
timeoutStateProcessedTotal.WithLabelValues(e.appAddressHex, "success").Inc()

View File

@ -146,6 +146,22 @@ func (e *AppConsensusEngine) validateConsensusMessage(
timeoutStateValidationTotal.WithLabelValues(e.appAddressHex, "accept").Inc()
case protobufs.ProverLivenessCheckType:
check := &protobufs.ProverLivenessCheck{}
if err := check.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal liveness check", zap.Error(err))
return p2p.ValidationResultReject
}
if err := check.Validate(); err != nil {
e.logger.Debug("invalid liveness check", zap.Error(err))
return p2p.ValidationResultReject
}
if len(check.Filter) != 0 && !bytes.Equal(check.Filter, e.appAddress) {
return p2p.ValidationResultIgnore
}
default:
return p2p.ValidationResultReject
}

View File

@ -107,6 +107,15 @@ func (e *AppConsensusEngine) GetAppShardProposal(
return &protobufs.AppShardProposalResponse{}, nil
}
vote, err := e.clockStore.GetProposalVote(
request.Filter,
frame.GetRank(),
[]byte(frame.Source()),
)
if err != nil {
return &protobufs.AppShardProposalResponse{}, nil
}
parent, _, err := e.clockStore.GetShardClockFrame(
request.Filter,
request.FrameNumber-1,
@ -144,6 +153,7 @@ func (e *AppConsensusEngine) GetAppShardProposal(
State: frame,
ParentQuorumCertificate: parentQC,
PriorRankTimeoutCertificate: priorRankTC,
Vote: vote,
}
return &protobufs.AppShardProposalResponse{
Proposal: proposal,

View File

@ -5,15 +5,18 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"math/big"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"source.quilibrium.com/quilibrium/monorepo/consensus"
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
// GlobalLeaderProvider implements LeaderProvider
@ -149,22 +152,6 @@ func (p *GlobalLeaderProvider) ProveNextState(
p.engine.currentDifficulty = uint32(difficulty)
p.engine.currentDifficultyMu.Unlock()
// Prove the global frame header
newHeader, err := p.engine.frameProver.ProveGlobalFrameHeader(
(*prior).Header,
p.engine.shardCommitments,
p.engine.proverRoot,
signer,
timestamp,
uint32(difficulty),
proverIndex,
)
if err != nil {
frameProvingTotal.WithLabelValues("error").Inc()
return nil, errors.Wrap(err, "prove next state")
}
newHeader.Rank = rank
// Convert collected messages to MessageBundles
requests := make(
[]*protobufs.MessageBundle,
@ -175,6 +162,7 @@ func (p *GlobalLeaderProvider) ProveNextState(
"including messages",
zap.Int("message_count", len(p.engine.collectedMessages)),
)
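// Build a vector commitment over the raw request payloads so the proved
// frame header can commit to the set of included messages.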
requestTree := &tries.VectorCommitmentTree{}
for _, msgData := range p.engine.collectedMessages {
// Check if data is long enough to contain type prefix
if len(msgData) < 4 {
@ -211,9 +199,40 @@ func (p *GlobalLeaderProvider) ProveNextState(
if messageBundle.Timestamp == 0 {
messageBundle.Timestamp = time.Now().UnixMilli()
}
id := sha3.Sum256(msgData)
err := requestTree.Insert(id[:], msgData, nil, big.NewInt(0))
if err != nil {
p.engine.logger.Warn(
"failed to add global request",
zap.Error(err),
)
continue
}
requests = append(requests, messageBundle)
}
requestRoot := requestTree.Commit(p.engine.inclusionProver, false)
// Prove the global frame header
newHeader, err := p.engine.frameProver.ProveGlobalFrameHeader(
(*prior).Header,
p.engine.shardCommitments,
p.engine.proverRoot,
requestRoot,
signer,
timestamp,
uint32(difficulty),
proverIndex,
)
if err != nil {
frameProvingTotal.WithLabelValues("error").Inc()
return nil, errors.Wrap(err, "prove next state")
}
newHeader.Prover = p.engine.getProverAddress()
newHeader.Rank = rank
// Create the new global frame with requests
newFrame := &protobufs.GlobalFrame{
Header: newHeader,

View File

@ -70,20 +70,6 @@ func (p *GlobalSyncProvider) Start(
request.frameNumber,
request.peerId,
)
case <-time.After(6 * time.Minute):
finalized := p.engine.forks.FinalizedState()
id, err := p.engine.getRandomProverPeerId()
if err != nil {
p.engine.logger.Debug("could not get random prover", zap.Error(err))
}
p.engine.logger.Info(
"synchronizing with peer",
zap.String("peer", id.String()),
zap.Uint64("finalized_rank", finalized.Rank),
)
p.processState(ctx, (*finalized.State).Header.FrameNumber, []byte(id))
}
}
}
@ -340,7 +326,7 @@ func (p *GlobalSyncProvider) syncWithPeer(
response, err := client.GetGlobalProposal(
getCtx,
&protobufs.GetGlobalProposalRequest{
FrameNumber: frameNumber + 1,
FrameNumber: frameNumber,
},
// The message size limits are swapped because the server is the one
// sending the data.
@ -374,10 +360,18 @@ func (p *GlobalSyncProvider) syncWithPeer(
if response.Proposal == nil || response.Proposal.State == nil ||
response.Proposal.State.Header == nil ||
response.Proposal.State.Header.FrameNumber != frameNumber+1 {
response.Proposal.State.Header.FrameNumber != frameNumber {
p.engine.logger.Debug("received empty response from peer")
return nil
}
if err := response.Proposal.Validate(); err != nil {
p.engine.logger.Debug(
"received invalid response from peer",
zap.Error(err),
)
return nil
}
p.engine.logger.Info(
"received new leading frame",
zap.Uint64("frame_number", response.Proposal.State.Header.FrameNumber),

View File

@ -128,6 +128,7 @@ type GlobalConsensusEngine struct {
peerInfoManager tp2p.PeerInfoManager
workerManager worker.WorkerManager
proposer *provers.Manager
currentRank uint64
alertPublicKey []byte
hasSentKeyBundle bool
@ -146,24 +147,26 @@ type GlobalConsensusEngine struct {
halt context.CancelFunc
// Internal state
quit chan struct{}
wg sync.WaitGroup
minimumProvers func() uint64
blacklistMap map[string]bool
blacklistMu sync.RWMutex
pendingMessages [][]byte
pendingMessagesMu sync.RWMutex
currentDifficulty uint32
currentDifficultyMu sync.RWMutex
lastProvenFrameTime time.Time
lastProvenFrameTimeMu sync.RWMutex
frameStore map[string]*protobufs.GlobalFrame
frameStoreMu sync.RWMutex
proposalCache map[string]*protobufs.GlobalProposal
proposalCacheMu sync.RWMutex
appFrameStore map[string]*protobufs.AppShardFrame
appFrameStoreMu sync.RWMutex
lowCoverageStreak map[string]*coverageStreak
quit chan struct{}
wg sync.WaitGroup
minimumProvers func() uint64
blacklistMap map[string]bool
blacklistMu sync.RWMutex
pendingMessages [][]byte
pendingMessagesMu sync.RWMutex
currentDifficulty uint32
currentDifficultyMu sync.RWMutex
lastProvenFrameTime time.Time
lastProvenFrameTimeMu sync.RWMutex
frameStore map[string]*protobufs.GlobalFrame
frameStoreMu sync.RWMutex
proposalCache map[uint64]*protobufs.GlobalProposal
proposalCacheMu sync.RWMutex
pendingCertifiedParents map[uint64]*protobufs.GlobalProposal
pendingCertifiedParentsMu sync.RWMutex
appFrameStore map[string]*protobufs.AppShardFrame
appFrameStoreMu sync.RWMutex
lowCoverageStreak map[string]*coverageStreak
// Transaction cross-shard lock tracking
txLockMap map[uint64]map[string]map[string]*LockedTransaction
@ -269,7 +272,8 @@ func NewGlobalConsensusEngine(
peerInfoManager: peerInfoManager,
frameStore: make(map[string]*protobufs.GlobalFrame),
appFrameStore: make(map[string]*protobufs.AppShardFrame),
proposalCache: make(map[string]*protobufs.GlobalProposal),
proposalCache: make(map[uint64]*protobufs.GlobalProposal),
pendingCertifiedParents: make(map[uint64]*protobufs.GlobalProposal),
globalConsensusMessageQueue: make(chan *pb.Message, 1000),
globalFrameMessageQueue: make(chan *pb.Message, 100),
globalProverMessageQueue: make(chan *pb.Message, 1000),
@ -551,6 +555,11 @@ func NewGlobalConsensusEngine(
pending = engine.getPendingProposals(frame.Header.FrameNumber)
}
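// Restore the current rank from the persisted liveness state, if available.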
liveness, err := engine.consensusStore.GetLivenessState(nil)
if err == nil {
engine.currentRank = liveness.CurrentRank
}
engine.voteAggregator, err = voting.NewGlobalVoteAggregator[GlobalPeerID](
tracing.NewZapTracer(logger),
engine,
@ -2539,42 +2548,62 @@ func (e *GlobalConsensusEngine) OnOwnProposal(
],
targetPublicationTime time.Time,
) {
select {
case <-e.haltCtx.Done():
return
default:
}
var priorTC *protobufs.TimeoutCertificate = nil
if proposal.PreviousRankTimeoutCertificate != nil {
priorTC =
proposal.PreviousRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
}
go func() {
select {
case <-time.After(time.Until(targetPublicationTime)):
case <-e.ShutdownSignal():
return
}
var priorTC *protobufs.TimeoutCertificate = nil
if proposal.PreviousRankTimeoutCertificate != nil {
priorTC =
proposal.PreviousRankTimeoutCertificate.(*protobufs.TimeoutCertificate)
}
// Manually override the signature as the vdf prover's signature is invalid
(*proposal.State.State).Header.PublicKeySignatureBls48581.Signature =
(*proposal.Vote).PublicKeySignatureBls48581.Signature
// Manually override the signature as the vdf prover's signature is invalid
(*proposal.State.State).Header.PublicKeySignatureBls48581.Signature =
(*proposal.Vote).PublicKeySignatureBls48581.Signature
pbProposal := &protobufs.GlobalProposal{
State: *proposal.State.State,
ParentQuorumCertificate: proposal.Proposal.State.ParentQuorumCertificate.(*protobufs.QuorumCertificate),
PriorRankTimeoutCertificate: priorTC,
Vote: *proposal.Vote,
}
data, err := pbProposal.ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize proposal", zap.Error(err))
return
}
pbProposal := &protobufs.GlobalProposal{
State: *proposal.State.State,
ParentQuorumCertificate: proposal.Proposal.State.ParentQuorumCertificate.(*protobufs.QuorumCertificate),
PriorRankTimeoutCertificate: priorTC,
Vote: *proposal.Vote,
}
data, err := pbProposal.ToCanonicalBytes()
if err != nil {
e.logger.Error("could not serialize proposal", zap.Error(err))
return
}
e.voteAggregator.AddState(proposal)
e.consensusParticipant.SubmitProposal(proposal)
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.pubsub.PublishToBitmask(
GLOBAL_CONSENSUS_BITMASK,
data,
); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
if err := e.clockStore.PutProposalVote(txn, *proposal.Vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.voteAggregator.AddState(proposal)
e.consensusParticipant.SubmitProposal(proposal)
if err := e.pubsub.PublishToBitmask(
GLOBAL_CONSENSUS_BITMASK,
data,
); err != nil {
e.logger.Error("could not publish", zap.Error(err))
}
}()
}
// OnOwnTimeout implements consensus.Consumer.
@ -2606,6 +2635,24 @@ func (e *GlobalConsensusEngine) OnOwnTimeout(
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutTimeoutVote(txn, pbTimeout); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.timeoutAggregator.AddTimeout(timeout)
if err := e.pubsub.PublishToBitmask(
@ -2633,6 +2680,24 @@ func (e *GlobalConsensusEngine) OnOwnVote(
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, *vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.voteAggregator.AddVote(vote)
if err := e.pubsub.PublishToBitmask(
@ -2707,6 +2772,20 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
zap.Uint64("rank", newRank-1),
zap.Uint64("frame_number", qc.GetFrameNumber()),
)
current, err := e.globalTimeReel.GetHead()
if err != nil {
e.logger.Error("could not get time reel head", zap.Error(err))
return
}
peer, err := e.getRandomProverPeerId()
if err != nil {
e.logger.Error("could not get random peer", zap.Error(err))
return
}
e.syncProvider.AddState(
[]byte(peer),
current.Header.FrameNumber,
)
return
}
@ -2718,6 +2797,38 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
return
}
current, err := e.globalTimeReel.GetHead()
if err != nil {
e.logger.Error("could not get time reel head", zap.Error(err))
return
}
if !bytes.Equal(frame.Header.Output, current.Header.Output) {
e.logger.Error(
"frames not aligned, might need sync",
zap.Uint64("new_frame_number", frame.Header.FrameNumber),
zap.Uint64("reel_frame_number", current.Header.FrameNumber),
zap.Uint64("new_frame_rank", frame.Header.Rank),
zap.Uint64("reel_frame_rank", current.Header.Rank),
zap.String("new_frame_id", hex.EncodeToString([]byte(frame.Identity()))),
zap.String(
"reel_frame_id",
hex.EncodeToString([]byte(current.Identity())),
),
)
peerID, err := e.getPeerIDOfProver(frame.Header.Prover)
if err != nil {
return
}
e.syncProvider.AddState(
[]byte(peerID),
current.Header.FrameNumber,
)
return
}
if !bytes.Equal(frame.Header.ParentSelector, parentQC.Selector) {
e.logger.Error(
"quorum certificate does not match frame parent",
@ -2739,6 +2850,20 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
e.logger.Debug("no prior rank TC to include", zap.Uint64("rank", newRank-1))
}
vote, err := e.clockStore.GetProposalVote(
nil,
frame.GetRank(),
[]byte(frame.Source()),
)
if err != nil {
e.logger.Error(
"cannot find proposer's vote",
zap.Uint64("rank", newRank-1),
zap.String("proposer", hex.EncodeToString([]byte(frame.Source()))),
)
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
@ -2750,6 +2875,7 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
State: frame,
ParentQuorumCertificate: parentQC,
PriorRankTimeoutCertificate: priorRankTC,
Vote: vote,
},
txn,
); err != nil {
@ -2765,7 +2891,9 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
}
// OnRankChange implements consensus.Consumer.
func (e *GlobalConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) {}
func (e *GlobalConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) {
e.currentRank = newRank
}
// OnReceiveProposal implements consensus.Consumer.
func (e *GlobalConsensusEngine) OnReceiveProposal(
@ -2843,6 +2971,13 @@ func (e *GlobalConsensusEngine) OnTimeoutCertificateTriggeredRankChange(
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
},
},
AggregateSignature: &protobufs.BLS48581AggregateSignature{
Signature: tc.GetAggregatedSignature().GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: tc.GetAggregatedSignature().GetPubKey(),
},
Bitmask: tc.GetAggregatedSignature().GetBitmask(),
},
}, txn)
if err != nil {
txn.Abort()
@ -3114,20 +3249,11 @@ func (e *GlobalConsensusEngine) getPendingProposals(
return result
}
func (e *GlobalConsensusEngine) getRandomProverPeerId() (peer.ID, error) {
provers, err := e.proverRegistry.GetActiveProvers(nil)
if err != nil {
e.logger.Error(
"could not get active provers for sync",
zap.Error(err),
)
}
if len(provers) == 0 {
return "", err
}
index := rand.Intn(len(provers))
func (e *GlobalConsensusEngine) getPeerIDOfProver(
prover []byte,
) (peer.ID, error) {
registry, err := e.signerRegistry.GetKeyRegistryByProver(
provers[index].Address,
prover,
)
if err != nil {
e.logger.Debug(
@ -3163,4 +3289,28 @@ func (e *GlobalConsensusEngine) getRandomProverPeerId() (peer.ID, error) {
return id, nil
}
func (e *GlobalConsensusEngine) getRandomProverPeerId() (peer.ID, error) {
provers, err := e.proverRegistry.GetActiveProvers(nil)
if err != nil {
e.logger.Error(
"could not get active provers for sync",
zap.Error(err),
)
}
if len(provers) == 0 {
return "", err
}
otherProvers := []*typesconsensus.ProverInfo{}
for _, p := range provers {
if bytes.Equal(p.Address, e.getProverAddress()) {
continue
}
otherProvers = append(otherProvers, p)
}
index := rand.Intn(len(otherProvers))
return e.getPeerIDOfProver(otherProvers[index].Address)
}
var _ consensus.DynamicCommittee = (*GlobalConsensusEngine)(nil)

View File

@ -13,6 +13,7 @@
"QmestbFp8PddwRk6ysBRrmWZEiHun5aRidHkqFxgeFaWVK": "030d8e130fd666160ff5df6dbf935adb69b4906e6ac074f675a268b59b470b9393f45273143924d6a8d647fbce93d4bbfaad10bc13c72770b68070ad0898fd98f2f9882d614c00eb909a07dcccc8d7d7472b13617832ebd1943fd66d8a05cda88171990b15e8a99514a7817619f7fb2577ee82ca496ca36aaa52f31052c2e76039b10f0fcb0b44c2a0dd830e4fdf40fb5647e903c3e44689df8cf38830b9edc6a1746c38672b255b956a434d6538c8807d9be066e9a7121da436a6ac215cafe10a8b44192b252bd5683c3e0c7805155e8f7ad8dfc3b01467f2f029f91100caf4907600d8562b3442952ba8dc086661aa5cafaf09acbf01ec55b950032b3d358b2fa6ee282fd72a4cb28022052bd48656e5aac56a9c50eae3e187eda525fc4552010cb1419507b271c7ff3bd7aabb9ee9a1eb1dc4ad88a76f42808e716490c02efddc3c0a5e2386efcdb0b83dccdc7c543d1571e2d9c927ff433ca8aa03b091e0138549c79fe2c1a90e669cef03540353d6ea1ca5ca71b43dc27caa64447b3c49bd4a66dd12f457468b1d32515efc471e2189bc2bb4ce43b5a526c1e6aa7d60e1e1b6e6f795232a823c37ea6ab2a8af2834a3980a758bec050507b416dad50a3645cf757b48b7f43e1b786d1d9b00c270bcd650ae9e1e325765a751c566776d6e5558a6fcd054a2b4947d5b4adf4c528dac71bde90fcca346505869180f4ed740085197cd2a40c6f8ebbeb90d8c5bf359e21870626fbb2151c9f5d507f5903f7abfaeaea88ca14af4cb51b45d9624c8ebecec5f98b9b41182c539d13ae3fe21c2cafd80abdfbba14b75",
"QmaMcbX3NH5d6BXy87C1n2xT68XzCwtkbvMRqdLYphh19b": "02108cc721d540c80e378b84b06cb67ecaf9f4bd091a52c1d85b3ef26d536e8482b72cf6651b445bcd0943b40d98bf03c66208170c04689d510cc2317c7ab5c26bb00bb8a1186aee4cc80f6041aca5676b575e65801c88f9e039845338f7ec42fa825b7a7d79138b8ea3427675398b9b6cafd90e5acb33445059946298e675917fdd40fb5810b2dc9de51d7798f271d6eb6cee0353ffb24982dd2e5ac9e482e0fdac819ded3115fe2ce5a28b581a22e74c79aba7895f4ec758c7f57d85c481b4393a0bea7a6b37a0e64a7a4674818f59c9acdb40538103fba190a89f0f05c46a96b24b93c643279c929f6f81f43d17ce0b5ec29e23bf6bcc22efafc6bd8aba77fabf128742fcf5c5f77266c5690118f2b331d4fc6283b9a56f905f94421b9fe9b8789d4b5e07b1b4ae71cb2c064309303fa348a0e58c009f2e34087ac5fcf89c02e6a20b70e7840483bdbbd98f79d4586c4478e869711f70a8bea4baeb789ec63575d42975eb148c79f7f1a02e8d1199214724596cab28dede4c585feab18fa25d9640b117872d8e8af563e1c5dc8a63e7b971ba3fbb79a744c2672dcec227d78583461e66f08ff2cdd6772dd310c9b008a85b1d3a3b010ff224faf8b4863056812a6c0f038a1f29af7c1d23d88f06e722c50f12f59543e550d22155cb7696325bba91055fd1f136dbdf3183c4b39eed32350dc8d11d2a8aaea57d5a77370de21169b96cb673a97aefc2eaa222a8f7964f10618bb25f61f3eb5bcea94e130fcb1a33bd36d3ac612d8e72de81eff2ceefca9c85efcd3218f61737c11c070b4f14790808e591170d2533",
"QmS3xJKbAmQxDiry9HpXV6bJyRvyd47pbufpZwEmgY1cy6": "0202e6667d72cf6e25691421b84774f38399aa0580e6807ec71053cb7c67571b2d36e992bf65862c81568860c9a649c413d63785f431f8484f3372d2f61de79485ee31f3670f6dc0340d0142b89c4b17972a0ee8a90e923c237e49dd9d0a658f93a008cb962f5ccb5fbe774c21ecf3bf228944114b8dbbd98128bf40299d52a30f8447db3c755e94a942b68f950e50e26c94d7126265216e69e995337443ec72baf1a5c61c72195e16923b7d04d52802cbd1a27d0b92bce34b6b755fdc7021427a6678d9cbf209874884993effb96181e6caa04dfa43586f72f262bc0a327d6b05f8754c4ffcd2e0a94745917a544fcb808043745d24fc816d4c5a84b03358b0ab24f26f92f409fad55206142aab29952d27f094394ee8b00b5f418a89d1caf95455dd6551067b0ac9540624097b283eeb59ca2b2f8c4e82bb06d6952be97a6e61ed55878aec3a13496a2d9e1015c7a456525552f8c0e9ee8cc8c5c989bc1feb57b8d630d24a05ccf824ee66031a0060729318061c6b933ca1e9659e44f3a11c3f65e3f8d2c2bc7944124290192355913ead6be3ca047a01d2b7a66f48aeed19b96b9209fab73922a1424d4006c42270f8814bf5c544080db0b783402eefcc7a5b41b52d8f6c287dc1c6806994d74a77566fb0cfb946a08329478d0b255d9afbdfa860051060e73b04bb817d86888115bb1b74078a479e9dda2a957e14780ca5100ac7fea80f497bac01b6b9f44e6137de16616961501dcb28b0e766cf3c1fdc87c5ab701510560041857ff32f629fba9077ef7d1473ecd69d0e39ee9c899d2d2afcd2013929670d25",
"QmZKERVN8UkwLp9mPCZw4aaRx9N8Ewnkv7VQh1zyZwBSir": "0301d31c4a06e16789184aa15898d96df20e4286569cfde2f26ae44407705a3ab2969876a146e360f33516422bec027809143183b07c3d84c578dbfa87a690a50e35f450c281be0433c70e9e5d2b0aa1967719d06af2c9c2e3e257624567e4c8f9882328ed2011d327ece7cdc1a23ec370ede0ad28a00cf476156c0d7b0968e16b21e01bae11993d988415f18173bcb99887e00137202680a818549aa6944360ac03f234e9aaaa3b333ee96a9f19f693cac97ec5c736b216d210550311507766b72779021b4023d354bd35fc0f2834014911a4ea8fddff19a7a8f69e030cb119d64190fb81d3635721014b05695566d0cb890f5d86ad0d007ea2a8b3008717d89ff9775950083439969873cfacd258be04d05128de5ae60bfb704174592f6565c5539d8e6804a2e899e19acb512eeba676a5b0c64b868937b578f3741a671938aedba2329c17d21a4d910d2b2b886b5efa502c1de3f05495eef88e2247d4d751983a81a928f9b957eabdfb7f7e510ec5dedf9bcdaff92126aff162773299ab920f390fdb1b3bd9e6ae46eb3b16a07ffd69fb38c916c77ed6deb721b0355c21cb9d9cb4b22e8a41756a40c2d48a4764f6781c865a700614126c1008d910a7bbd758261bee38914b753d15259c094b57f301acff008fbad5161aff0204a96290f395206535feaedcedb0cb6121fdf31c28ab9c7d85c7dec473f531347b4f76c12a0c5eb7f0c3c0077697373a409dba2a0813be122807ae6df88e1aa5d086e265e9ea394e5b98a0d96527cf69bc794ea17c54cfa68fd5c75856a6c6d3ff9e7a9df0f22853e20ac9b6442d"
"QmZKERVN8UkwLp9mPCZw4aaRx9N8Ewnkv7VQh1zyZwBSir": "0301d31c4a06e16789184aa15898d96df20e4286569cfde2f26ae44407705a3ab2969876a146e360f33516422bec027809143183b07c3d84c578dbfa87a690a50e35f450c281be0433c70e9e5d2b0aa1967719d06af2c9c2e3e257624567e4c8f9882328ed2011d327ece7cdc1a23ec370ede0ad28a00cf476156c0d7b0968e16b21e01bae11993d988415f18173bcb99887e00137202680a818549aa6944360ac03f234e9aaaa3b333ee96a9f19f693cac97ec5c736b216d210550311507766b72779021b4023d354bd35fc0f2834014911a4ea8fddff19a7a8f69e030cb119d64190fb81d3635721014b05695566d0cb890f5d86ad0d007ea2a8b3008717d89ff9775950083439969873cfacd258be04d05128de5ae60bfb704174592f6565c5539d8e6804a2e899e19acb512eeba676a5b0c64b868937b578f3741a671938aedba2329c17d21a4d910d2b2b886b5efa502c1de3f05495eef88e2247d4d751983a81a928f9b957eabdfb7f7e510ec5dedf9bcdaff92126aff162773299ab920f390fdb1b3bd9e6ae46eb3b16a07ffd69fb38c916c77ed6deb721b0355c21cb9d9cb4b22e8a41756a40c2d48a4764f6781c865a700614126c1008d910a7bbd758261bee38914b753d15259c094b57f301acff008fbad5161aff0204a96290f395206535feaedcedb0cb6121fdf31c28ab9c7d85c7dec473f531347b4f76c12a0c5eb7f0c3c0077697373a409dba2a0813be122807ae6df88e1aa5d086e265e9ea394e5b98a0d96527cf69bc794ea17c54cfa68fd5c75856a6c6d3ff9e7a9df0f22853e20ac9b6442d",
"QmcuXdV3mdgwmhUv9kzRnZmjJyBwE7erNWVo8Q2ikrcjzX": "030a9616776ff90a2027c4a9df7234e12bbc6747ab47767502f3b0c2853e93c2be31c5f6abe2be2b5e331d389a1ecead12dbddc8edf0ee9ba5a8ecf7732b0fc3c967322b2aedbb5064b80a8e3434181d75be04d8033923a2beed4f1f9a28f3ce3b9734fddd7520ae1513bba4843200ffe8e7b43bb3ef7bf532505bb70baa86b68dfd4d419ac63ce13f7310fcbebb7e944aa0640331a9c9ce0f8feede477d47ceb78c988c46e3c87ae561b5bf4d59b187693a2bf47182c503d2b0e937ea87187bfc0e7526775a5d1355c71a3200a1eb5256ef965b57a5429305a6fa8d092b104f4038b09c59f0720d2fc25ccd418e51e59f30417b9855d7e1bcd36b9799c0a6e1d2dbf7997650a9159515c72a8b37e2817c27088d1c92e59821c58e909cac8551ba2998dc7d00c903bdb96234d57f9db4e70320257aa55f8f21ce0cb59231e0517b361ed4e03d6445da94bb55fb536f7ccebea6e048efb8e9d7c77755c9c16fb749209a77f99e9d603437e45aab3711aceba7ac7a2840f3665d2ee315a7f034205af39f277f7fbd9bbabe7e4a0dd3cdf0a820d44c93573738b084952453483ce110e0a977e8ac2d460baffd55ba9d10304c0def3d8a962a07aa879d931a9a6cb39458ea7a49bb33a30059de8c1946d88ab73faf2ff471108cf917206f4ee82b674b1d438f6016f7ca2d0a6108d7618e4bda97f9808b05115e8673b934ca3a31e30e323e8680847f76804e505b35d43ecf377ea123318b3b778c64e32cba8816cef307692d4875e0af4d5c6cdf1c3eebc042365fe2dd667a1a50003d41bfd04b754a94d388dfff35a1cd"
}
}

View File

@ -19,6 +19,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/lifecycle"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
var keyRegistryDomain = []byte("KEY_REGISTRY")
@ -193,6 +194,9 @@ func (e *GlobalConsensusEngine) handleShardConsensusMessage(
case protobufs.ProposalVoteType:
e.handleShardVote(message)
case protobufs.ProverLivenessCheckType:
e.handleShardLivenessCheck(message)
case protobufs.TimeoutStateType:
// e.handleShardTimeout(message)
}
@ -841,16 +845,12 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
pqc := proposal.ParentQuorumCertificate
prtc := proposal.PriorRankTimeoutCertificate
vote := proposal.Vote
proposer := models.Identity("")
if vote != nil {
proposer = vote.Identity()
}
signedProposal := &models.SignedProposal[*protobufs.GlobalFrame, *protobufs.ProposalVote]{
Proposal: models.Proposal[*protobufs.GlobalFrame]{
State: &models.State[*protobufs.GlobalFrame]{
Rank: proposal.State.GetRank(),
Identifier: proposal.State.Identity(),
ProposerID: proposer,
ProposerID: proposal.Vote.Identity(),
Timestamp: proposal.State.GetTimestamp(),
State: &proposal.State,
},
@ -884,16 +884,6 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
}
// if we have a parent, cache and move on
e.proposalCacheMu.RLock()
_, ok := e.proposalCache[string(proposal.State.Header.ParentSelector)]
e.proposalCacheMu.RUnlock()
if ok {
e.proposalCacheMu.Lock()
e.proposalCache[proposal.State.Identity()] = proposal
e.proposalCacheMu.Unlock()
return
}
if proposal.State.Header.FrameNumber != 0 {
// also check with persistence layer
parent, err := e.clockStore.GetGlobalClockFrame(
@ -907,31 +897,66 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
"parent frame not stored, requesting sync",
zap.Uint64("frame_number", proposal.State.Header.FrameNumber-1),
)
e.proposalCacheMu.Lock()
e.proposalCache[proposal.State.Identity()] = proposal
e.proposalCacheMu.Unlock()
var peerId []byte
if proposal.Vote != nil {
peerId = []byte(proposal.Vote.Identity())
} else {
id, err := e.getRandomProverPeerId()
e.cacheProposal(proposal)
peerID, err := e.getPeerIDOfProver(proposal.State.Header.Prover)
if err != nil {
peerID, err = e.getRandomProverPeerId()
if err != nil {
e.logger.Debug("could not get random peer", zap.Error(err))
return
}
peerId = []byte(id)
}
e.syncProvider.AddState(peerId, proposal.State.Header.FrameNumber-1)
head, err := e.globalTimeReel.GetHead()
if err != nil {
return
}
e.syncProvider.AddState(
[]byte(peerID),
head.Header.FrameNumber,
)
return
}
}
e.processProposal(proposal)
frameNumber := proposal.State.Header.FrameNumber
expectedFrame, err := e.globalTimeReel.GetHead()
if err != nil {
e.logger.Error("could not obtain time reel head", zap.Error(err))
return
}
expectedFrameNumber := expectedFrame.Header.FrameNumber + 1
if frameNumber < expectedFrameNumber {
e.logger.Debug(
"dropping proposal behind expected frame",
zap.Uint64("frame_number", frameNumber),
zap.Uint64("expected_frame_number", expectedFrameNumber),
)
return
}
if frameNumber == expectedFrameNumber {
e.deleteCachedProposal(frameNumber)
if e.processProposal(proposal) {
e.drainProposalCache(frameNumber + 1)
return
}
e.logger.Debug("failed to process expected proposal, caching")
e.cacheProposal(proposal)
return
}
e.cacheProposal(proposal)
e.drainProposalCache(expectedFrameNumber)
}
func (e *GlobalConsensusEngine) processProposal(
proposal *protobufs.GlobalProposal,
) {
) bool {
e.logger.Debug(
"processing proposal",
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
@ -940,35 +965,33 @@ func (e *GlobalConsensusEngine) processProposal(
err := e.VerifyQuorumCertificate(proposal.ParentQuorumCertificate)
if err != nil {
e.logger.Debug("proposal has invalid qc", zap.Error(err))
return
return false
}
if proposal.PriorRankTimeoutCertificate != nil {
err := e.VerifyTimeoutCertificate(proposal.PriorRankTimeoutCertificate)
if err != nil {
e.logger.Debug("proposal has invalid tc", zap.Error(err))
return
return false
}
}
if proposal.Vote != nil {
err := e.VerifyVote(&proposal.Vote)
if err != nil {
e.logger.Debug("proposal has invalid vote", zap.Error(err))
return
}
err = e.VerifyVote(&proposal.Vote)
if err != nil {
e.logger.Debug("proposal has invalid vote", zap.Error(err))
return false
}
err = proposal.State.Validate()
if err != nil {
e.logger.Debug("proposal is not valid", zap.Error(err))
return
return false
}
valid, err := e.frameValidator.Validate(proposal.State)
if !valid || err != nil {
e.logger.Debug("invalid frame in proposal", zap.Error(err))
return
return false
}
// Small gotcha: the proposal structure uses interfaces, so we can't assign
@ -983,7 +1006,7 @@ func (e *GlobalConsensusEngine) processProposal(
State: &models.State[*protobufs.GlobalFrame]{
Rank: proposal.State.GetRank(),
Identifier: proposal.State.Identity(),
ProposerID: proposal.Vote.Identity(),
ProposerID: vote.Identity(),
Timestamp: proposal.State.GetTimestamp(),
State: &proposal.State,
},
@ -1001,27 +1024,241 @@ func (e *GlobalConsensusEngine) processProposal(
e.voteAggregator.AddState(signedProposal)
e.consensusParticipant.SubmitProposal(signedProposal)
e.proposalCacheMu.RLock()
props := []*protobufs.GlobalProposal{}
removes := []string{}
for id, prop := range e.proposalCache {
if bytes.Equal(
prop.State.Header.ParentSelector,
[]byte(proposal.State.Identity()),
) {
props = append(props, prop)
removes = append(removes, id)
}
}
e.proposalCacheMu.RUnlock()
e.trySealParentWithChild(proposal)
e.registerPendingCertifiedParent(proposal)
for i := range props {
prop := props[i]
remove := removes[i]
e.processProposal(prop)
e.proposalCacheMu.Lock()
delete(e.proposalCache, remove)
e.proposalCacheMu.Unlock()
return true
}
func (e *GlobalConsensusEngine) cacheProposal(
proposal *protobufs.GlobalProposal,
) {
frameNumber := proposal.State.Header.FrameNumber
e.proposalCacheMu.Lock()
e.proposalCache[frameNumber] = proposal
e.proposalCacheMu.Unlock()
e.logger.Debug(
"cached out-of-order proposal",
zap.Uint64("frame_number", frameNumber),
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
)
}
func (e *GlobalConsensusEngine) deleteCachedProposal(frameNumber uint64) {
e.proposalCacheMu.Lock()
delete(e.proposalCache, frameNumber)
e.proposalCacheMu.Unlock()
}
func (e *GlobalConsensusEngine) popCachedProposal(
frameNumber uint64,
) *protobufs.GlobalProposal {
e.proposalCacheMu.Lock()
defer e.proposalCacheMu.Unlock()
proposal, ok := e.proposalCache[frameNumber]
if ok {
delete(e.proposalCache, frameNumber)
}
return proposal
}
func (e *GlobalConsensusEngine) drainProposalCache(startFrame uint64) {
next := startFrame
for {
prop := e.popCachedProposal(next)
if prop == nil {
return
}
if !e.processProposal(prop) {
e.logger.Debug(
"cached proposal failed processing, retaining for retry",
zap.Uint64("frame_number", next),
)
e.cacheProposal(prop)
return
}
next++
}
}
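The handler above buffers out-of-order proposals keyed by frame number and replays them in ascending order once the expected frame (time reel head + 1) has been processed. A minimal, self-contained sketch of that buffering discipline follows; the names are hypothetical stand-ins, and unlike the engine, which re-derives the expected frame from the time reel on every message, the sketch tracks it locally for brevity.

package main

import "fmt"

// proposalBuffer is a toy model of the frame-number-keyed cache used by the
// handler above: proposals ahead of the expected frame are held back and
// drained in ascending order once the gap closes.
type proposalBuffer struct {
	expected uint64
	cache    map[uint64]string // frame number -> proposal id (stand-in for *GlobalProposal)
}

func (b *proposalBuffer) handle(frame uint64, id string, process func(string) bool) {
	switch {
	case frame < b.expected:
		fmt.Printf("dropping stale frame %d\n", frame)
	case frame == b.expected:
		if process(id) {
			b.expected++
			b.drain(process)
		} else {
			b.cache[frame] = id // failed, retain for retry
		}
	default:
		b.cache[frame] = id // out of order, cache until its turn
	}
}

func (b *proposalBuffer) drain(process func(string) bool) {
	for {
		id, ok := b.cache[b.expected]
		if !ok {
			return
		}
		delete(b.cache, b.expected)
		if !process(id) {
			b.cache[b.expected] = id // retain for retry, as drainProposalCache does
			return
		}
		b.expected++
	}
}

func main() {
	b := &proposalBuffer{expected: 10, cache: map[uint64]string{}}
	ok := func(id string) bool { fmt.Println("processed", id); return true }
	b.handle(12, "p12", ok) // cached
	b.handle(11, "p11", ok) // cached
	b.handle(10, "p10", ok) // processed, then p11 and p12 drained in order
}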
func (e *GlobalConsensusEngine) registerPendingCertifiedParent(
proposal *protobufs.GlobalProposal,
) {
if proposal == nil || proposal.State == nil || proposal.State.Header == nil {
return
}
frameNumber := proposal.State.Header.FrameNumber
e.pendingCertifiedParentsMu.Lock()
e.pendingCertifiedParents[frameNumber] = proposal
e.pendingCertifiedParentsMu.Unlock()
}
func (e *GlobalConsensusEngine) trySealParentWithChild(
child *protobufs.GlobalProposal,
) {
if child == nil || child.State == nil || child.State.Header == nil {
return
}
header := child.State.Header
if header.FrameNumber == 0 {
return
}
parentFrame := header.FrameNumber - 1
e.pendingCertifiedParentsMu.RLock()
parent, ok := e.pendingCertifiedParents[parentFrame]
e.pendingCertifiedParentsMu.RUnlock()
if !ok || parent == nil || parent.State == nil || parent.State.Header == nil {
return
}
if !bytes.Equal(
header.ParentSelector,
[]byte(parent.State.Identity()),
) {
e.logger.Debug(
"pending parent selector mismatch, dropping entry",
zap.Uint64("parent_frame", parent.State.Header.FrameNumber),
zap.Uint64("child_frame", header.FrameNumber),
)
e.pendingCertifiedParentsMu.Lock()
delete(e.pendingCertifiedParents, parentFrame)
e.pendingCertifiedParentsMu.Unlock()
return
}
e.logger.Debug(
"sealing parent with descendant proposal",
zap.Uint64("parent_frame", parent.State.Header.FrameNumber),
zap.Uint64("child_frame", header.FrameNumber),
)
head, err := e.globalTimeReel.GetHead()
if err != nil {
e.logger.Error("error fetching time reel head", zap.Error(err))
return
}
if head.Header.FrameNumber+1 == parent.State.Header.FrameNumber {
e.addCertifiedState(parent, child)
}
e.pendingCertifiedParentsMu.Lock()
delete(e.pendingCertifiedParents, parentFrame)
e.pendingCertifiedParentsMu.Unlock()
}
func (e *GlobalConsensusEngine) addCertifiedState(
parent, child *protobufs.GlobalProposal,
) {
if parent == nil || parent.State == nil || parent.State.Header == nil ||
child == nil || child.State == nil || child.State.Header == nil {
e.logger.Error("cannot seal certified state: missing parent or child data")
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
qc := child.ParentQuorumCertificate
if qc == nil {
e.logger.Error(
"child missing parent quorum certificate",
zap.Uint64("child_frame_number", child.State.Header.FrameNumber),
)
txn.Abort()
return
}
aggregateSig := &protobufs.BLS48581AggregateSignature{
Signature: qc.GetAggregatedSignature().GetSignature(),
PublicKey: &protobufs.BLS48581G2PublicKey{
KeyValue: qc.GetAggregatedSignature().GetPubKey(),
},
Bitmask: qc.GetAggregatedSignature().GetBitmask(),
}
if err := e.clockStore.PutQuorumCertificate(
&protobufs.QuorumCertificate{
Rank: qc.GetRank(),
FrameNumber: qc.GetFrameNumber(),
Selector: []byte(qc.Identity()),
AggregateSignature: aggregateSig,
},
txn,
); err != nil {
e.logger.Error("could not insert quorum certificate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
parent.State.Header.PublicKeySignatureBls48581 = aggregateSig
err = e.globalTimeReel.Insert(parent.State)
if err != nil {
e.logger.Error("could not insert frame into time reel", zap.Error(err))
return
}
current, err := e.globalTimeReel.GetHead()
if err != nil {
e.logger.Error("could not get time reel head", zap.Error(err))
return
}
if !bytes.Equal(parent.State.Header.Output, current.Header.Output) {
e.logger.Error(
"frames not aligned",
zap.Uint64("parent_frame_number", parent.State.Header.FrameNumber),
zap.Uint64("new_frame_number", child.State.Header.FrameNumber),
zap.Uint64("reel_frame_number", current.Header.FrameNumber),
zap.Uint64("new_frame_rank", child.State.Header.Rank),
zap.Uint64("reel_frame_rank", current.Header.Rank),
zap.String(
"new_frame_id",
hex.EncodeToString([]byte(child.State.Identity())),
),
zap.String(
"reel_frame_id",
hex.EncodeToString([]byte(current.Identity())),
),
)
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutCertifiedGlobalState(
parent,
txn,
); err != nil {
e.logger.Error("could not insert certified state", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
}
@ -1047,6 +1284,24 @@ func (e *GlobalConsensusEngine) handleProposal(message *pb.Message) {
e.frameStore[string(frameID)] = proposal.State
e.frameStoreMu.Unlock()
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, proposal.Vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.globalProposalQueue <- proposal
// Success metric recorded at the end of processing
@ -1114,8 +1369,25 @@ func (e *GlobalConsensusEngine) handleVote(message *pb.Message) {
return
}
e.voteAggregator.AddVote(&vote)
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.voteAggregator.AddVote(&vote)
voteProcessedTotal.WithLabelValues("success").Inc()
}
@ -1160,6 +1432,24 @@ func (e *GlobalConsensusEngine) handleTimeoutState(message *pb.Message) {
timeout.PriorRankTimeoutCertificate = prtc
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutTimeoutVote(txn, timeoutState); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.timeoutAggregator.AddTimeout(timeout)
voteProcessedTotal.WithLabelValues("success").Inc()
@ -1232,6 +1522,134 @@ func (e *GlobalConsensusEngine) handleShardProposal(message *pb.Message) {
shardProposalProcessedTotal.WithLabelValues("success").Inc()
}
func (e *GlobalConsensusEngine) handleShardLivenessCheck(message *pb.Message) {
timer := prometheus.NewTimer(shardLivenessCheckProcessingDuration)
defer timer.ObserveDuration()
livenessCheck := &protobufs.ProverLivenessCheck{}
if err := livenessCheck.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal liveness check", zap.Error(err))
shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
return
}
// Validate the liveness check structure
if err := livenessCheck.Validate(); err != nil {
e.logger.Debug("invalid liveness check", zap.Error(err))
shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
return
}
proverSet, err := e.proverRegistry.GetActiveProvers(livenessCheck.Filter)
if err != nil {
e.logger.Error("could not receive liveness check", zap.Error(err))
shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
return
}
var found []byte = nil
for _, prover := range proverSet {
if bytes.Equal(
prover.Address,
livenessCheck.PublicKeySignatureBls48581.Address,
) {
lcBytes, err := livenessCheck.ConstructSignaturePayload()
if err != nil {
e.logger.Error(
"could not construct signature message for liveness check",
zap.Error(err),
)
shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
break
}
valid, err := e.keyManager.ValidateSignature(
crypto.KeyTypeBLS48581G1,
prover.PublicKey,
lcBytes,
livenessCheck.PublicKeySignatureBls48581.Signature,
livenessCheck.GetSignatureDomain(),
)
if err != nil || !valid {
e.logger.Error(
"could not validate signature for liveness check",
zap.Error(err),
)
shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
break
}
found = prover.PublicKey
break
}
}
if found == nil {
e.logger.Warn(
"invalid liveness check",
zap.String(
"prover",
hex.EncodeToString(
livenessCheck.PublicKeySignatureBls48581.Address,
),
),
)
shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
return
}
if len(livenessCheck.CommitmentHash) > 32 {
e.txLockMu.Lock()
if _, ok := e.txLockMap[livenessCheck.FrameNumber]; !ok {
e.txLockMap[livenessCheck.FrameNumber] = make(
map[string]map[string]*LockedTransaction,
)
}
_, ok := e.txLockMap[livenessCheck.FrameNumber][string(livenessCheck.Filter)]
if !ok {
e.txLockMap[livenessCheck.FrameNumber][string(livenessCheck.Filter)] =
make(map[string]*LockedTransaction)
}
filter := string(livenessCheck.Filter)
commits, err := tries.DeserializeNonLazyTree(
livenessCheck.CommitmentHash[32:],
)
if err != nil {
e.txLockMu.Unlock()
e.logger.Error("could not deserialize commitment trie", zap.Error(err))
shardLivenessCheckProcessedTotal.WithLabelValues("error").Inc()
return
}
leaves := tries.GetAllPreloadedLeaves(commits.Root)
for _, leaf := range leaves {
existing, ok := e.txLockMap[livenessCheck.FrameNumber][filter][string(leaf.Key)]
prover := []byte{}
if ok {
prover = existing.Prover
}
prover = append(
prover,
livenessCheck.PublicKeySignatureBls48581.Address...,
)
e.txLockMap[livenessCheck.FrameNumber][filter][string(leaf.Key)] =
&LockedTransaction{
TransactionHash: leaf.Key,
ShardAddresses: slices.Collect(slices.Chunk(leaf.Value, 64)),
Prover: prover,
Committed: false,
Filled: false,
}
}
e.txLockMu.Unlock()
}
shardLivenessCheckProcessedTotal.WithLabelValues("success").Inc()
}
func (e *GlobalConsensusEngine) handleShardVote(message *pb.Message) {
timer := prometheus.NewTimer(shardVoteProcessingDuration)
defer timer.ObserveDuration()

View File

@ -43,7 +43,14 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
return tp2p.ValidationResultReject
}
if e.forks.FinalizedRank() > proposal.GetRank() {
if err := proposal.Validate(); err != nil {
e.logger.Debug("invalid proposal", zap.Error(err))
proposalValidationTotal.WithLabelValues("reject").Inc()
return tp2p.ValidationResultIgnore
}
if e.currentRank > proposal.GetRank() {
e.logger.Debug("proposal is stale")
proposalValidationTotal.WithLabelValues("reject").Inc()
return tp2p.ValidationResultIgnore
}
@ -76,8 +83,8 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
return tp2p.ValidationResultReject
}
now := uint64(time.Now().UnixMilli())
if vote.Timestamp > now+5000 || vote.Timestamp < now-5000 {
if e.currentRank > vote.Rank {
e.logger.Debug("vote is stale")
return tp2p.ValidationResultIgnore
}
@ -103,9 +110,8 @@ func (e *GlobalConsensusEngine) validateGlobalConsensusMessage(
return tp2p.ValidationResultReject
}
now := uint64(time.Now().UnixMilli())
if timeoutState.Timestamp > now+5000 ||
timeoutState.Timestamp < now-5000 {
if e.currentRank > timeoutState.Vote.Rank {
e.logger.Debug("timeout is stale")
return tp2p.ValidationResultIgnore
}
@ -242,6 +248,18 @@ func (e *GlobalConsensusEngine) validateShardConsensusMessage(
shardTimeoutStateValidationTotal.WithLabelValues("accept").Inc()
case protobufs.ProverLivenessCheckType:
check := &protobufs.ProverLivenessCheck{}
if err := check.FromCanonicalBytes(message.Data); err != nil {
e.logger.Debug("failed to unmarshal liveness check", zap.Error(err))
return tp2p.ValidationResultReject
}
if err := check.Validate(); err != nil {
e.logger.Debug("invalid liveness check", zap.Error(err))
return tp2p.ValidationResultReject
}
default:
return tp2p.ValidationResultReject
}

View File

@ -95,6 +95,27 @@ var (
},
)
// Shard liveness check processing metrics
shardLivenessCheckProcessedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: subsystem,
Name: "shard_liveness_check_processed_total",
Help: "Total number of shard liveness checks processed by the global consensus engine",
},
[]string{"status"}, // status: "success", "error", "invalid"
)
shardLivenessCheckProcessingDuration = promauto.NewHistogram(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: subsystem,
Name: "shard_liveness_check_processing_duration_seconds",
Help: "Time taken to process a shard liveness check",
Buckets: prometheus.DefBuckets,
},
)
// Global vote processing metrics
voteProcessedTotal = promauto.NewCounterVec(
prometheus.CounterOpts{

View File

@ -99,6 +99,15 @@ func (e *GlobalConsensusEngine) GetGlobalProposal(
return &protobufs.GlobalProposalResponse{}, nil
}
vote, err := e.clockStore.GetProposalVote(
nil,
frame.GetRank(),
[]byte(frame.Source()),
)
if err != nil {
return &protobufs.GlobalProposalResponse{}, nil
}
parent, err := e.clockStore.GetGlobalClockFrame(request.FrameNumber - 1)
if err != nil {
e.logger.Debug(
@ -128,6 +137,7 @@ func (e *GlobalConsensusEngine) GetGlobalProposal(
State: frame,
ParentQuorumCertificate: parentQC,
PriorRankTimeoutCertificate: priorRankTC,
Vote: vote,
}
return &protobufs.GlobalProposalResponse{

View File

@ -8,9 +8,11 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"math/big"
"path/filepath"
"slices"
"sort"
"strings"
@ -279,6 +281,11 @@ func decodeValue(key []byte, value []byte) string {
return shortHex(value)
}
return decodeDataProofValue(key[0], key[1], value)
case store.CONSENSUS:
if len(key) < 2 {
return shortHex(value)
}
return decodeConsensusValue(key, value)
case store.INBOX:
if len(key) < 2 {
return shortHex(value)
@ -286,6 +293,8 @@ func decodeValue(key []byte, value []byte) string {
return decodeInboxValue(key[1], value)
case store.HYPERGRAPH_SHARD:
return decodeHypergraphValue(key, value)
case store.MIGRATION:
return decodeMigrationValue(value)
default:
return shortHex(value)
}
@ -327,11 +336,98 @@ func decodeClockValue(key []byte, value []byte) string {
return fmt.Sprintf("frame=%d", frame)
}
return shortHex(value)
case store.CLOCK_GLOBAL_CERTIFIED_STATE,
store.CLOCK_SHARD_CERTIFIED_STATE:
return decodeCertifiedStateValue(value)
case store.CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_EARLIEST,
store.CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_LATEST,
store.CLOCK_SHARD_CERTIFIED_STATE_INDEX_EARLIEST,
store.CLOCK_SHARD_CERTIFIED_STATE_INDEX_LATEST,
store.CLOCK_QUORUM_CERTIFICATE_INDEX_EARLIEST,
store.CLOCK_QUORUM_CERTIFICATE_INDEX_LATEST,
store.CLOCK_TIMEOUT_CERTIFICATE_INDEX_EARLIEST,
store.CLOCK_TIMEOUT_CERTIFICATE_INDEX_LATEST:
if len(value) == 8 {
rank := binary.BigEndian.Uint64(value)
return fmt.Sprintf("rank=%d", rank)
}
return shortHex(value)
case store.CLOCK_QUORUM_CERTIFICATE:
return decodeQuorumCertificateValue(value)
case store.CLOCK_TIMEOUT_CERTIFICATE:
return decodeTimeoutCertificateValue(value)
default:
return shortHex(value)
}
}
func decodeCertifiedStateValue(value []byte) string {
if len(value) != 24 {
return shortHex(value)
}
frameNumber := binary.BigEndian.Uint64(value[:8])
qcRank := binary.BigEndian.Uint64(value[8:16])
tcRank := binary.BigEndian.Uint64(value[16:])
return fmt.Sprintf(
"frame=%d quorum_rank=%d timeout_rank=%d",
frameNumber,
qcRank,
tcRank,
)
}
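decodeCertifiedStateValue above expects a fixed 24-byte record: the frame number, the quorum certificate rank, and the timeout certificate rank, each as a big-endian uint64. An illustrative encode-side counterpart of that layout (the store's actual writer is not shown in this diff and may differ):

package main

import (
	"encoding/binary"
	"fmt"
)

// Encode-side mirror of decodeCertifiedStateValue: three big-endian uint64s
// packed back to back into a 24-byte value.
func encodeCertifiedStateValue(frameNumber, qcRank, tcRank uint64) []byte {
	value := make([]byte, 0, 24)
	value = binary.BigEndian.AppendUint64(value, frameNumber)
	value = binary.BigEndian.AppendUint64(value, qcRank)
	value = binary.BigEndian.AppendUint64(value, tcRank)
	return value
}

func main() {
	v := encodeCertifiedStateValue(42, 7, 0)
	fmt.Printf(
		"%d bytes: frame=%d quorum_rank=%d timeout_rank=%d\n",
		len(v),
		binary.BigEndian.Uint64(v[:8]),
		binary.BigEndian.Uint64(v[8:16]),
		binary.BigEndian.Uint64(v[16:]),
	)
}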
func decodeQuorumCertificateValue(value []byte) string {
qc := &protobufs.QuorumCertificate{}
if err := qc.FromCanonicalBytes(slices.Clone(value)); err != nil {
return fmt.Sprintf(
"quorum_certificate decode_error=%v raw=%s",
err,
shortHex(value),
)
}
if s, err := jsonMarshaler.Marshal(qc); err == nil {
return string(s)
}
return shortHex(value)
}
func decodeTimeoutCertificateValue(value []byte) string {
tc := &protobufs.TimeoutCertificate{}
if err := tc.FromCanonicalBytes(slices.Clone(value)); err != nil {
return fmt.Sprintf(
"timeout_certificate decode_error=%v raw=%s",
err,
shortHex(value),
)
}
if s, err := jsonMarshaler.Marshal(tc); err == nil {
return string(s)
}
return shortHex(value)
}
func decodeTimeoutStateValue(value []byte) string {
state := &protobufs.TimeoutState{}
if err := state.FromCanonicalBytes(slices.Clone(value)); err != nil {
return fmt.Sprintf(
"timeout_state decode_error=%v raw=%s",
err,
shortHex(value),
)
}
if s, err := jsonMarshaler.Marshal(state); err == nil {
return string(s)
}
return shortHex(value)
}
func decodeKeyBundleValue(sub byte, value []byte) string {
switch sub {
case store.KEY_IDENTITY:
@ -409,6 +505,151 @@ func decodeDataProofValue(prefix byte, sub byte, value []byte) string {
return shortHex(value)
}
func decodeConsensusValue(key []byte, value []byte) string {
switch key[1] {
case store.CONSENSUS_STATE:
return decodeConsensusStateValue(value)
case store.CONSENSUS_LIVENESS:
return decodeConsensusLivenessValue(value)
default:
return shortHex(value)
}
}
func decodeConsensusStateValue(value []byte) string {
buf := bytes.NewReader(value)
filter, err := readUint32PrefixedBytes(buf)
if err != nil {
return fmt.Sprintf(
"consensus_state decode_error=%v raw=%s",
err,
shortHex(value),
)
}
var finalizedRank uint64
if err := binary.Read(buf, binary.BigEndian, &finalizedRank); err != nil {
return fmt.Sprintf(
"consensus_state decode_error=%v raw=%s",
err,
shortHex(value),
)
}
var latestAckRank uint64
if err := binary.Read(buf, binary.BigEndian, &latestAckRank); err != nil {
return fmt.Sprintf(
"consensus_state decode_error=%v raw=%s",
err,
shortHex(value),
)
}
latestTimeoutBytes, err := readUint32PrefixedBytes(buf)
if err != nil {
return fmt.Sprintf(
"consensus_state decode_error=%v raw=%s",
err,
shortHex(value),
)
}
var builder strings.Builder
fmt.Fprintf(&builder, "filter=%s\n", shortHex(filter))
fmt.Fprintf(&builder, "finalized_rank=%d\n", finalizedRank)
fmt.Fprintf(&builder, "latest_ack_rank=%d", latestAckRank)
if len(latestTimeoutBytes) > 0 {
builder.WriteString("\nlatest_timeout_state=\n")
builder.WriteString(indent(decodeTimeoutStateValue(latestTimeoutBytes)))
}
return builder.String()
}
func decodeConsensusLivenessValue(value []byte) string {
buf := bytes.NewReader(value)
filter, err := readUint32PrefixedBytes(buf)
if err != nil {
return fmt.Sprintf(
"consensus_liveness decode_error=%v raw=%s",
err,
shortHex(value),
)
}
var currentRank uint64
if err := binary.Read(buf, binary.BigEndian, &currentRank); err != nil {
return fmt.Sprintf(
"consensus_liveness decode_error=%v raw=%s",
err,
shortHex(value),
)
}
latestQCBytes, err := readUint32PrefixedBytes(buf)
if err != nil {
return fmt.Sprintf(
"consensus_liveness decode_error=%v raw=%s",
err,
shortHex(value),
)
}
priorTCBytes, err := readUint32PrefixedBytes(buf)
if err != nil {
return fmt.Sprintf(
"consensus_liveness decode_error=%v raw=%s",
err,
shortHex(value),
)
}
var builder strings.Builder
fmt.Fprintf(&builder, "filter=%s\n", shortHex(filter))
fmt.Fprintf(&builder, "current_rank=%d", currentRank)
if len(latestQCBytes) > 0 {
builder.WriteString("\nlatest_quorum_certificate=\n")
builder.WriteString(indent(decodeQuorumCertificateValue(latestQCBytes)))
}
if len(priorTCBytes) > 0 {
builder.WriteString("\nprior_timeout_certificate=\n")
builder.WriteString(indent(decodeTimeoutCertificateValue(priorTCBytes)))
}
return builder.String()
}
func decodeMigrationValue(value []byte) string {
if len(value) == 8 {
version := binary.BigEndian.Uint64(value)
return fmt.Sprintf("migration_version=%d", version)
}
return shortHex(value)
}
func readUint32PrefixedBytes(r io.Reader) ([]byte, error) {
var length uint32
if err := binary.Read(r, binary.BigEndian, &length); err != nil {
return nil, err
}
if length == 0 {
return nil, nil
}
data := make([]byte, length)
if _, err := io.ReadFull(r, data); err != nil {
return nil, err
}
return data, nil
}
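The consensus state and liveness values are walked as a sequence of uint32-length-prefixed byte fields interleaved with big-endian uint64 ranks. The sketch below is a hypothetical encode-side mirror of readUint32PrefixedBytes, building a consensus_state payload in the same order decodeConsensusStateValue reads it (writeUint32PrefixedBytes is not part of the tool; it is shown only to make the layout concrete):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeUint32PrefixedBytes is a hypothetical mirror of readUint32PrefixedBytes:
// a big-endian uint32 length followed by the raw bytes, with a zero length
// standing in for an absent field.
func writeUint32PrefixedBytes(w io.Writer, data []byte) error {
	if err := binary.Write(w, binary.BigEndian, uint32(len(data))); err != nil {
		return err
	}
	_, err := w.Write(data)
	return err
}

func main() {
	// Fields in the order the decoder reads them: filter, finalized rank,
	// latest ack rank, optional latest timeout state.
	buf := &bytes.Buffer{}
	writeUint32PrefixedBytes(buf, []byte{0xde, 0xad, 0xbe, 0xef}) // filter
	binary.Write(buf, binary.BigEndian, uint64(1234))             // finalized_rank
	binary.Write(buf, binary.BigEndian, uint64(1235))             // latest_ack_rank
	writeUint32PrefixedBytes(buf, nil)                            // no latest timeout state
	fmt.Printf("%d bytes: %x\n", buf.Len(), buf.Bytes())
}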
func decodeInboxValue(sub byte, value []byte) string {
switch sub {
case store.INBOX_MESSAGE:
@ -761,6 +1002,10 @@ func describeKey(key []byte) string {
return describeInboxKey(key)
case store.WORKER:
return describeWorkerKey(key)
case store.CONSENSUS:
return describeConsensusKey(key)
case store.MIGRATION:
return "pebble store migration version"
default:
return fmt.Sprintf("unknown prefix 0x%02x (len=%d)", key[0], len(key))
}
@ -826,6 +1071,37 @@ func describeClockKey(key []byte) string {
"clock shard frame latest index shard=%s",
shortHex(key[2:]),
)
case store.CLOCK_GLOBAL_CERTIFIED_STATE:
if len(key) >= 10 {
rank := binary.BigEndian.Uint64(key[2:10])
return fmt.Sprintf("clock global certified state rank=%d", rank)
}
return "clock global certified state (invalid length)"
case store.CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_EARLIEST:
return "clock global certified state earliest index"
case store.CLOCK_GLOBAL_CERTIFIED_STATE_INDEX_LATEST:
return "clock global certified state latest index"
case store.CLOCK_SHARD_CERTIFIED_STATE:
if len(key) >= 10 {
rank := binary.BigEndian.Uint64(key[2:10])
filter := key[10:]
return fmt.Sprintf(
"clock shard certified state rank=%d shard=%s",
rank,
shortHex(filter),
)
}
return "clock shard certified state (invalid length)"
case store.CLOCK_SHARD_CERTIFIED_STATE_INDEX_EARLIEST:
return fmt.Sprintf(
"clock shard certified state earliest index shard=%s",
shortHex(key[2:]),
)
case store.CLOCK_SHARD_CERTIFIED_STATE_INDEX_LATEST:
return fmt.Sprintf(
"clock shard certified state latest index shard=%s",
shortHex(key[2:]),
)
case store.CLOCK_SHARD_FRAME_INDEX_PARENT:
if len(key) >= 42 {
frame := binary.BigEndian.Uint64(key[2:10])
@ -880,6 +1156,51 @@ func describeClockKey(key []byte) string {
"clock compaction marker shard=%s",
shortHex(key[2:]),
)
case store.CLOCK_QUORUM_CERTIFICATE:
if len(key) >= 10 {
rank := binary.BigEndian.Uint64(key[2:10])
filter := key[10:]
if len(filter) > 0 {
return fmt.Sprintf(
"clock quorum certificate rank=%d filter=%s",
rank,
shortHex(filter),
)
}
return fmt.Sprintf("clock quorum certificate rank=%d", rank)
}
return "clock quorum certificate (invalid length)"
case store.CLOCK_QUORUM_CERTIFICATE_INDEX_EARLIEST:
return fmt.Sprintf(
"clock quorum certificate earliest index filter=%s",
shortHex(key[2:]),
)
case store.CLOCK_QUORUM_CERTIFICATE_INDEX_LATEST:
return fmt.Sprintf(
"clock quorum certificate latest index filter=%s",
shortHex(key[2:]),
)
case store.CLOCK_TIMEOUT_CERTIFICATE:
if len(key) >= 10 {
rank := binary.BigEndian.Uint64(key[2:10])
filter := key[10:]
return fmt.Sprintf(
"clock timeout certificate rank=%d filter=%s",
rank,
shortHex(filter),
)
}
return "clock timeout certificate (invalid length)"
case store.CLOCK_TIMEOUT_CERTIFICATE_INDEX_EARLIEST:
return fmt.Sprintf(
"clock timeout certificate earliest index filter=%s",
shortHex(key[2:]),
)
case store.CLOCK_TIMEOUT_CERTIFICATE_INDEX_LATEST:
return fmt.Sprintf(
"clock timeout certificate latest index filter=%s",
shortHex(key[2:]),
)
case store.CLOCK_SHARD_FRAME_CANDIDATE_SHARD:
return fmt.Sprintf("clock shard candidate frame raw=%s", shortHex(key))
case store.CLOCK_SHARD_FRAME_CANDIDATE_INDEX_LATEST:
@ -1433,6 +1754,27 @@ func describeWorkerKey(key []byte) string {
}
}
func describeConsensusKey(key []byte) string {
if len(key) < 2 {
return "consensus store: invalid key length"
}
sub := key[1]
filter := key[2:]
switch sub {
case store.CONSENSUS_STATE:
return fmt.Sprintf("consensus state filter=%s", shortHex(filter))
case store.CONSENSUS_LIVENESS:
return fmt.Sprintf("consensus liveness filter=%s", shortHex(filter))
default:
return fmt.Sprintf(
"consensus store unknown subtype 0x%02x raw=%s",
sub,
shortHex(filter),
)
}
}
func shortHex(b []byte) string {
if len(b) == 0 {
return "0x"

View File

@ -731,6 +731,22 @@ func clockGlobalFrameRequestKey(
return key
}
func clockProposalVoteKey(rank uint64, filter []byte, identity []byte) []byte {
key := []byte{CLOCK_FRAME, CLOCK_PROPOSAL_VOTE}
key = binary.BigEndian.AppendUint64(key, rank)
key = append(key, filter...)
key = append(key, identity...)
return key
}
func clockTimeoutVoteKey(rank uint64, filter []byte, identity []byte) []byte {
key := []byte{CLOCK_FRAME, CLOCK_TIMEOUT_VOTE}
key = binary.BigEndian.AppendUint64(key, rank)
key = append(key, filter...)
key = append(key, identity...)
return key
}
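clockProposalVoteKey and clockTimeoutVoteKey lay votes out under a two-byte prefix, a big-endian rank, the filter, and the voter identity, which is what lets the range queries added later in this diff scan one whole rank at a time. A standalone sketch of that key layout (clockFrame below is a placeholder byte, not the real CLOCK_FRAME constant; 0x0D matches the CLOCK_PROPOSAL_VOTE value added further down):

package main

import (
	"encoding/binary"
	"fmt"
)

// Placeholder prefix bytes for illustration only.
const (
	clockFrame        byte = 0x00 // NOT the real CLOCK_FRAME constant
	clockProposalVote byte = 0x0D // matches CLOCK_PROPOSAL_VOTE in this diff
)

// Mirrors the clockProposalVoteKey layout: prefix, subtype, big-endian rank,
// then filter and voter identity appended verbatim.
func proposalVoteKey(rank uint64, filter, identity []byte) []byte {
	key := []byte{clockFrame, clockProposalVote}
	key = binary.BigEndian.AppendUint64(key, rank)
	key = append(key, filter...)
	key = append(key, identity...)
	return key
}

func main() {
	key := proposalVoteKey(9, []byte("filter"), make([]byte, 32))
	// GetProposalVotes scans from rank to rank+1 and keeps only keys exactly
	// 32 bytes longer than the identity-less prefix, i.e. one vote per prover.
	fmt.Printf("key length=%d (2 prefix + 8 rank + %d filter + 32 identity)\n",
		len(key), len("filter"))
}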
func (p *PebbleClockStore) NewTransaction(indexed bool) (
store.Transaction,
error,
@ -1661,6 +1677,11 @@ func (p *PebbleClockStore) GetCertifiedGlobalState(rank uint64) (
return nil, errors.Wrap(err, "get certified global state")
}
vote, err := p.GetProposalVote(nil, frame.GetRank(), frame.Header.Prover)
if err != nil && !errors.Is(err, store.ErrNotFound) {
return nil, errors.Wrap(err, "get certified global state")
}
qc, err := p.GetQuorumCertificate(nil, qcRank)
if err != nil && !errors.Is(err, store.ErrNotFound) {
return nil, errors.Wrap(err, "get certified global state")
@ -1675,6 +1696,7 @@ func (p *PebbleClockStore) GetCertifiedGlobalState(rank uint64) (
State: frame,
ParentQuorumCertificate: qc,
PriorRankTimeoutCertificate: tc,
Vote: vote,
}, nil
}
@ -1720,6 +1742,9 @@ func (p *PebbleClockStore) PutCertifiedGlobalState(
if err := p.PutGlobalClockFrame(state.State, txn); err != nil {
return errors.Wrap(err, "put certified global state")
}
if err := p.PutProposalVote(txn, state.Vote); err != nil {
return errors.Wrap(err, "put certified global state")
}
}
if state.ParentQuorumCertificate != nil {
if state.ParentQuorumCertificate.Rank > rank {
@ -1863,6 +1888,11 @@ func (p *PebbleClockStore) GetCertifiedAppShardState(
return nil, errors.Wrap(err, "get certified app shard state")
}
vote, err := p.GetProposalVote(filter, frame.GetRank(), frame.Header.Prover)
if err != nil && !errors.Is(err, store.ErrNotFound) {
return nil, errors.Wrap(err, "get certified app shard state")
}
qc, err := p.GetQuorumCertificate(filter, qcRank)
if err != nil && !errors.Is(err, store.ErrNotFound) {
return nil, errors.Wrap(err, "get certified app shard state")
@ -1877,6 +1907,7 @@ func (p *PebbleClockStore) GetCertifiedAppShardState(
State: frame,
ParentQuorumCertificate: qc,
PriorRankTimeoutCertificate: tc,
Vote: vote,
}, nil
}
@ -1936,6 +1967,9 @@ func (p *PebbleClockStore) PutCertifiedAppShardState(
); err != nil {
return errors.Wrap(err, "put certified app shard state")
}
if err := p.PutProposalVote(txn, state.Vote); err != nil {
return errors.Wrap(err, "put certified app shard state")
}
filter = state.State.Header.Address
}
if state.ParentQuorumCertificate != nil {
@ -2276,3 +2310,183 @@ func (p *PebbleClockStore) PutTimeoutCertificate(
return nil
}
func (p *PebbleClockStore) PutProposalVote(
txn store.Transaction,
vote *protobufs.ProposalVote,
) error {
if vote == nil {
return errors.Wrap(
errors.New("proposal vote is required"),
"put proposal vote",
)
}
rank := vote.Rank
filter := vote.Filter
identity := vote.Identity()
data, err := vote.ToCanonicalBytes()
if err != nil {
return errors.Wrap(
errors.Wrap(err, store.ErrInvalidData.Error()),
"put proposal vote",
)
}
key := clockProposalVoteKey(rank, filter, []byte(identity))
err = txn.Set(key, data)
return errors.Wrap(err, "put proposal vote")
}
func (p *PebbleClockStore) GetProposalVote(
filter []byte,
rank uint64,
identity []byte,
) (
*protobufs.ProposalVote,
error,
) {
key := clockProposalVoteKey(rank, filter, []byte(identity))
value, closer, err := p.db.Get(key)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get proposal vote")
}
defer closer.Close()
vote := &protobufs.ProposalVote{}
if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, store.ErrInvalidData.Error()),
"get proposal vote",
)
}
return vote, nil
}
func (p *PebbleClockStore) GetProposalVotes(filter []byte, rank uint64) (
[]*protobufs.ProposalVote,
error,
) {
results := []*protobufs.ProposalVote{}
startKey := clockProposalVoteKey(rank, filter, nil)
endKey := clockProposalVoteKey(rank+1, filter, nil)
iterator, err := p.db.NewIter(startKey, endKey)
if err != nil {
return nil, errors.Wrap(err, "get proposal votes")
}
defer iterator.Close()
for iterator.First(); iterator.Valid(); iterator.Next() {
key := iterator.Key()
if len(key) != len(startKey)+32 {
continue
}
value := iterator.Value()
vote := &protobufs.ProposalVote{}
if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, store.ErrInvalidData.Error()),
"get proposal votes",
)
}
results = append(results, vote)
}
return results, nil
}
func (p *PebbleClockStore) PutTimeoutVote(
txn store.Transaction,
vote *protobufs.TimeoutState,
) error {
if vote == nil {
return errors.Wrap(
errors.New("timeout vote is required"),
"put timeout vote",
)
}
rank := vote.Vote.Rank
filter := vote.Vote.Filter
identity := vote.Vote.Identity()
data, err := vote.ToCanonicalBytes()
if err != nil {
return errors.Wrap(
errors.Wrap(err, store.ErrInvalidData.Error()),
"put timeout vote",
)
}
key := clockTimeoutVoteKey(rank, filter, []byte(identity))
err = txn.Set(key, data)
return errors.Wrap(err, "put timeout vote")
}
func (p *PebbleClockStore) GetTimeoutVote(
filter []byte,
rank uint64,
identity []byte,
) (
*protobufs.TimeoutState,
error,
) {
key := clockTimeoutVoteKey(rank, filter, []byte(identity))
value, closer, err := p.db.Get(key)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, store.ErrNotFound
}
return nil, errors.Wrap(err, "get timeout vote")
}
defer closer.Close()
vote := &protobufs.TimeoutState{}
if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, store.ErrInvalidData.Error()),
"get proposal vote",
)
}
return vote, nil
}
func (p *PebbleClockStore) GetTimeoutVotes(filter []byte, rank uint64) (
[]*protobufs.TimeoutState,
error,
) {
results := []*protobufs.TimeoutState{}
startKey := clockTimeoutVoteKey(rank, filter, nil)
endKey := clockTimeoutVoteKey(rank+1, filter, nil)
iterator, err := p.db.NewIter(startKey, endKey)
if err != nil {
return nil, errors.Wrap(err, "get timeout votes")
}
defer iterator.Close()
for iterator.First(); iterator.Valid(); iterator.Next() {
key := iterator.Key()
if len(key) != len(startKey)+32 {
continue
}
value := iterator.Value()
vote := &protobufs.TimeoutState{}
if err := vote.FromCanonicalBytes(slices.Clone(value)); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, store.ErrInvalidData.Error()),
"get timeout votes",
)
}
results = append(results, vote)
}
return results, nil
}

View File

@ -34,6 +34,8 @@ const (
CLOCK_SHARD_CERTIFIED_STATE = 0x0A
CLOCK_QUORUM_CERTIFICATE = 0x0B
CLOCK_TIMEOUT_CERTIFICATE = 0x0C
CLOCK_PROPOSAL_VOTE = 0x0D
CLOCK_TIMEOUT_VOTE = 0x0E
CLOCK_GLOBAL_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_GLOBAL_FRAME
CLOCK_GLOBAL_FRAME_INDEX_LATEST = 0x20 | CLOCK_GLOBAL_FRAME

View File

@ -23,6 +23,7 @@ type PebbleDB struct {
// the end.
var pebbleMigrations = []func(*pebble.Batch) error{
migration_2_1_0_4,
migration_2_1_0_5,
}
func NewPebbleDB(
@ -437,3 +438,8 @@ func migration_2_1_0_4(b *pebble.Batch) error {
return nil
}
func migration_2_1_0_5(b *pebble.Batch) error {
// We just re-run the 2.1.0.4 migration
return migration_2_1_0_4(b)
}
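migration_2_1_0_5 is appended to the ordered pebbleMigrations list and simply re-runs the 2.1.0.4 step. A rough sketch of how such an ordered list is typically gated by the stored migration_version (this is an assumption about the driver loop, not NewPebbleDB's actual code; batch is a stand-in for *pebble.Batch):

package main

import "fmt"

// batch is a stand-in for *pebble.Batch in this illustration only.
type batch struct{}

// applyMigrations runs every migration past the stored version once, then
// returns the new version to persist (assumed to be the count of applied
// migrations, matching the 8-byte migration_version decoded above).
func applyMigrations(stored uint64, migrations []func(*batch) error) (uint64, error) {
	for i := stored; i < uint64(len(migrations)); i++ {
		if err := migrations[i](&batch{}); err != nil {
			return i, err
		}
	}
	return uint64(len(migrations)), nil
}

func main() {
	migrations := []func(*batch) error{
		func(*batch) error { fmt.Println("running migration_2_1_0_4"); return nil },
		func(*batch) error { fmt.Println("running migration_2_1_0_5"); return nil },
	}
	// With a stored version of 1, only the newly appended 2.1.0.5 step runs.
	version, _ := applyMigrations(1, migrations)
	fmt.Println("stored migration_version =", version)
}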

View File

@ -142,14 +142,7 @@ func (g *GlobalFrame) Identity() models.Identity {
// Source implements models.Unique.
func (g *GlobalFrame) Source() models.Identity {
id, err := poseidon.HashBytes(
g.Header.PublicKeySignatureBls48581.PublicKey.KeyValue,
)
if err != nil {
return ""
}
return models.Identity(id.FillBytes(make([]byte, 32)))
return models.Identity(g.Header.Prover)
}
func (a *AppShardFrame) Clone() models.Unique {
@ -2080,6 +2073,30 @@ func (g *GlobalFrameHeader) ToCanonicalBytes() ([]byte, error) {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write requests_root
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(g.RequestsRoot)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(g.RequestsRoot); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write prover
if err := binary.Write(
buf,
binary.BigEndian,
uint32(len(g.Prover)),
); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
if _, err := buf.Write(g.Prover); err != nil {
return nil, errors.Wrap(err, "to canonical bytes")
}
// Write public_key_signature_bls48581
if g.PublicKeySignatureBls48581 != nil {
sigBytes, err := g.PublicKeySignatureBls48581.ToCanonicalBytes()
@ -2191,6 +2208,34 @@ func (g *GlobalFrameHeader) FromCanonicalBytes(data []byte) error {
return errors.Wrap(err, "from canonical bytes")
}
// Read requests_root
var requestsRootLen uint32
if err := binary.Read(
buf,
binary.BigEndian,
&requestsRootLen,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
g.RequestsRoot = make([]byte, requestsRootLen)
if _, err := buf.Read(g.RequestsRoot); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read prover
var proverLen uint32
if err := binary.Read(
buf,
binary.BigEndian,
&proverLen,
); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
g.Prover = make([]byte, proverLen)
if _, err := buf.Read(g.Prover); err != nil {
return errors.Wrap(err, "from canonical bytes")
}
// Read public_key_signature_bls48581
var sigLen uint32
if err := binary.Read(buf, binary.BigEndian, &sigLen); err != nil {
@ -4642,6 +4687,22 @@ func (h *GlobalFrameHeader) Validate() error {
)
}
// Requests root commitment should be 64 or 74 bytes
if len(h.RequestsRoot) != 64 && len(h.RequestsRoot) != 74 {
return errors.Wrap(
errors.New("invalid request root commitment length"),
"validate",
)
}
// Prover must be set
if len(h.Prover) != 32 {
return errors.Wrap(
errors.New("invalid prover length"),
"validate",
)
}
// Signature must be present
if h.PublicKeySignatureBls48581 == nil {
return errors.Wrap(errors.New("missing signature"), "validate")

File diff suppressed because it is too large

View File

@ -149,8 +149,12 @@ message GlobalFrameHeader {
repeated bytes global_commitments = 7;
// The prover tree root commitment
bytes prover_tree_commitment = 8;
// The request root commitment
bytes requests_root = 9;
// The prover of the frame
bytes prover = 10;
// The confirmation signatures of the frame
quilibrium.node.keys.pb.BLS48581AggregateSignature public_key_signature_bls48581 = 9;
quilibrium.node.keys.pb.BLS48581AggregateSignature public_key_signature_bls48581 = 11;
}
message FrameHeader {

View File

@ -38,6 +38,7 @@ type FrameProver interface {
previousFrame *protobufs.GlobalFrameHeader,
commitments [][]byte,
proverRoot []byte,
requestRoot []byte,
provingKey Signer,
timestamp int64,
difficulty uint32,

View File

@ -16,6 +16,82 @@ type MockClockStore struct {
mock.Mock
}
// GetProposalVote implements store.ClockStore.
func (m *MockClockStore) GetProposalVote(
filter []byte,
rank uint64,
identity []byte,
) (*protobufs.ProposalVote, error) {
args := m.Called(
filter,
rank,
identity,
)
return args.Get(0).(*protobufs.ProposalVote), args.Error(1)
}
// GetProposalVotes implements store.ClockStore.
func (m *MockClockStore) GetProposalVotes(
filter []byte,
rank uint64,
) ([]*protobufs.ProposalVote, error) {
args := m.Called(
filter,
rank,
)
return args.Get(0).([]*protobufs.ProposalVote), args.Error(1)
}
// GetTimeoutVote implements store.ClockStore.
func (m *MockClockStore) GetTimeoutVote(
filter []byte,
rank uint64,
identity []byte,
) (*protobufs.TimeoutState, error) {
args := m.Called(
filter,
rank,
identity,
)
return args.Get(0).(*protobufs.TimeoutState), args.Error(1)
}
// GetTimeoutVotes implements store.ClockStore.
func (m *MockClockStore) GetTimeoutVotes(
filter []byte,
rank uint64,
) ([]*protobufs.TimeoutState, error) {
args := m.Called(
filter,
rank,
)
return args.Get(0).([]*protobufs.TimeoutState), args.Error(1)
}
// PutProposalVote implements store.ClockStore.
func (m *MockClockStore) PutProposalVote(
txn store.Transaction,
vote *protobufs.ProposalVote,
) error {
args := m.Called(
txn,
vote,
)
return args.Error(0)
}
// PutTimeoutVote implements store.ClockStore.
func (m *MockClockStore) PutTimeoutVote(
txn store.Transaction,
vote *protobufs.TimeoutState,
) error {
args := m.Called(
txn,
vote,
)
return args.Error(0)
}
// GetCertifiedAppShardState implements store.ClockStore.
func (m *MockClockStore) GetCertifiedAppShardState(
filter []byte,

View File

@ -118,6 +118,7 @@ func (m *MockFrameProver) ProveGlobalFrameHeader(
previousFrame *protobufs.GlobalFrameHeader,
commitments [][]byte,
proverRoot []byte,
requestsRoot []byte,
provingKey crypto.Signer,
timestamp int64,
difficulty uint32,
@ -127,6 +128,7 @@ func (m *MockFrameProver) ProveGlobalFrameHeader(
previousFrame,
commitments,
proverRoot,
requestsRoot,
provingKey,
timestamp,
difficulty,

View File

@ -171,4 +171,22 @@ type ClockStore interface {
filter []byte,
tree *tries.VectorCommitmentTree,
) error
PutProposalVote(txn Transaction, vote *protobufs.ProposalVote) error
GetProposalVote(filter []byte, rank uint64, identity []byte) (
*protobufs.ProposalVote,
error,
)
GetProposalVotes(filter []byte, rank uint64) (
[]*protobufs.ProposalVote,
error,
)
PutTimeoutVote(txn Transaction, vote *protobufs.TimeoutState) error
GetTimeoutVote(filter []byte, rank uint64, identity []byte) (
*protobufs.TimeoutState,
error,
)
GetTimeoutVotes(filter []byte, rank uint64) (
[]*protobufs.TimeoutState,
error,
)
}

View File

@ -374,6 +374,7 @@ func (w *WesolowskiFrameProver) ProveGlobalFrameHeader(
previousFrame *protobufs.GlobalFrameHeader,
commitments [][]byte,
proverRoot []byte,
requestRoot []byte,
provingKey qcrypto.Signer,
timestamp int64,
difficulty uint32,
@ -410,6 +411,7 @@ func (w *WesolowskiFrameProver) ProveGlobalFrameHeader(
}
input = append(input, proverRoot...)
input = append(input, requestRoot...)
b := sha3.Sum256(input)
o := WesolowskiSolve(b, difficulty)
@ -433,6 +435,7 @@ func (w *WesolowskiFrameProver) ProveGlobalFrameHeader(
ParentSelector: parent.FillBytes(make([]byte, 32)),
GlobalCommitments: commitments,
ProverTreeCommitment: proverRoot,
RequestsRoot: requestRoot,
}
switch pubkeyType {
@ -489,6 +492,7 @@ func (w *WesolowskiFrameProver) GetGlobalFrameSignaturePayload(
}
input = append(input, frame.ProverTreeCommitment...)
input = append(input, frame.RequestsRoot...)
b := sha3.Sum256(input)
proof := [516]byte{}