ceremonyclient/node/consensus/app/consensus_leader_provider.go

package app

import (
	"context"
	"encoding/hex"
	"time"

	"github.com/iden3/go-iden3-crypto/poseidon"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"

	"source.quilibrium.com/quilibrium/monorepo/consensus"
	"source.quilibrium.com/quilibrium/monorepo/consensus/models"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
)

// AppLeaderProvider implements the consensus.LeaderProvider interface for app
// shard frames, providing leader ordering and frame proving for the app
// consensus engine.
type AppLeaderProvider struct {
	engine *AppConsensusEngine
}
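
// GetNextLeaders derives the leader ordering for the next frame. The parent
// selector is the Poseidon hash of the prior frame's output (all zeroes when
// no usable prior frame is available), and the prover registry orders the
// provers for this app address against that selector.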
func (p *AppLeaderProvider) GetNextLeaders(
	ctx context.Context,
	prior **protobufs.AppShardFrame,
) ([]PeerID, error) {
	// Get the parent selector for next prover calculation
	var parentSelector []byte
	if prior != nil && (*prior).Header != nil &&
		len((*prior).Header.Output) >= 32 {
		parentSelectorBI, err := poseidon.HashBytes((*prior).Header.Output)
		if err != nil {
			// Previously the error was discarded; surface it rather than risk
			// dereferencing a nil hash result below.
			return nil, errors.Wrap(err, "hash prior frame output")
		}
		parentSelector = parentSelectorBI.FillBytes(make([]byte, 32))
	} else {
		parentSelector = make([]byte, 32)
	}
	// Get ordered provers from registry
	provers, err := p.engine.proverRegistry.GetOrderedProvers(
		[32]byte(parentSelector),
		p.engine.appAddress,
	)
	if err != nil {
		return nil, errors.Wrap(err, "get ordered provers")
	}

	// Convert to PeerIDs
	leaders := make([]PeerID, len(provers))
	for i, prover := range provers {
		leaders[i] = PeerID{ID: prover}
	}

	if len(leaders) > 0 {
		p.engine.logger.Debug(
			"determined next leaders",
			zap.Int("count", len(leaders)),
			zap.String("first", hex.EncodeToString(leaders[0].ID)),
		)
	}

	return leaders, nil
}
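
// ProveNextState builds and proves the next app shard frame on top of the
// latest locally stored frame. It returns a no-vote error instead of proving
// when the local frame does not match the prior state the consensus layer
// asked for, covering both the lagging-behind and forked cases handled below.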
func (p *AppLeaderProvider) ProveNextState(
	ctx context.Context,
	rank uint64,
	filter []byte,
	priorState models.Identity,
) (**protobufs.AppShardFrame, error) {
	prior, _, err := p.engine.clockStore.GetLatestShardClockFrame(
		p.engine.appAddress,
	)
	if err != nil {
		frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
		return nil, models.NewNoVoteErrorf("could not collect: %+w", err)
	}
	if prior == nil {
		frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
		return nil, models.NewNoVoteErrorf("missing prior frame")
	}
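
	// Consult the latest quorum certificate so we can tell apart a node that
	// is merely behind from one whose local chain has diverged onto a fork.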
	latestQC, qcErr := p.engine.clockStore.GetLatestQuorumCertificate(
		p.engine.appAddress,
	)
	if qcErr != nil {
		p.engine.logger.Debug(
			"could not fetch latest quorum certificate",
			zap.Error(qcErr),
		)
	}
	if prior.Identity() != priorState {
		frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
		if latestQC != nil && latestQC.Identity() == priorState {
			switch {
			case prior.Header.Rank < latestQC.GetRank():
				// We should never reach this case: the consensus implementation's
				// safety rules forbid it, and recovery would require an
				// out-of-band sync. Nevertheless, we note it here so it can be
				// found in the logs if it ever does happen.
				return nil, models.NewNoVoteErrorf(
					"needs sync: prior rank %d behind latest qc rank %d",
					prior.Header.Rank,
					latestQC.GetRank(),
				)
			case prior.Header.FrameNumber == latestQC.GetFrameNumber() &&
				latestQC.Identity() != prior.Identity():
				peerID, peerErr := p.engine.getRandomProverPeerId()
				if peerErr != nil {
					p.engine.logger.Warn(
						"could not determine peer for fork sync",
						zap.Error(peerErr),
					)
				} else {
					p.engine.logger.Warn(
						"detected fork, scheduling sync",
						zap.Uint64("frame_number", latestQC.GetFrameNumber()),
						zap.String("peer_id", peerID.String()),
					)
					p.engine.syncProvider.AddState(
						[]byte(peerID),
						latestQC.GetFrameNumber(),
						[]byte(latestQC.Identity()),
					)
				}
				return nil, models.NewNoVoteErrorf(
					"fork detected at rank %d (local: %x, qc: %x)",
					latestQC.GetRank(),
					prior.Identity(),
					latestQC.Identity(),
				)
			}
		}
		return nil, models.NewNoVoteErrorf(
			"building on fork or needs sync: frame %d, rank %d, parent_id: %x, asked: rank %d, id: %x",
			prior.Header.FrameNumber,
			prior.Header.Rank,
			prior.Header.ParentSelector,
			rank,
			priorState,
		)
	}

	timer := prometheus.NewTimer(frameProvingDuration.WithLabelValues(
		p.engine.appAddressHex,
	))
	defer timer.ObserveDuration()

	// Get collected messages to include in frame
	p.engine.provingMessagesMu.Lock()
	messages := make([]*protobufs.Message, len(p.engine.provingMessages))
	copy(messages, p.engine.provingMessages)
	p.engine.provingMessages = []*protobufs.Message{}
	p.engine.provingMessagesMu.Unlock()
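
	// If no proving-specific messages are queued, fall back to draining the
	// general collected-message queue instead.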
	if len(messages) == 0 {
		p.engine.collectedMessagesMu.Lock()
		if len(p.engine.collectedMessages) > 0 {
			messages = make([]*protobufs.Message, len(p.engine.collectedMessages))
			copy(messages, p.engine.collectedMessages)
			p.engine.collectedMessages = []*protobufs.Message{}
		}
		p.engine.collectedMessagesMu.Unlock()
	}

	// Update pending messages metric
	pendingMessagesCount.WithLabelValues(p.engine.appAddressHex).Set(0)

	p.engine.logger.Info(
		"proving next state",
		zap.Int("message_count", len(messages)),
		zap.Uint64("frame_number", prior.Header.FrameNumber+1),
	)

	// Prove the frame
	newFrame, err := p.engine.internalProveFrame(rank, messages, prior)
	if err != nil {
		frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
		return nil, errors.Wrap(err, "prove frame")
	}
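
	// Cache a clone of the newly proven frame in the in-memory frame store,
	// keyed by its frame selector, so it can be looked up by selector later.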
	p.engine.frameStoreMu.Lock()
	p.engine.frameStore[string(
		p.engine.calculateFrameSelector(newFrame.Header),
	)] = newFrame.Clone().(*protobufs.AppShardFrame)
	p.engine.frameStoreMu.Unlock()

	// Update metrics
	frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "success").Inc()
	p.engine.lastProvenFrameTimeMu.Lock()
	p.engine.lastProvenFrameTime = time.Now()
	p.engine.lastProvenFrameTimeMu.Unlock()
	currentFrameNumber.WithLabelValues(p.engine.appAddressHex).Set(
		float64(newFrame.Header.FrameNumber),
	)

	return &newFrame, nil
}
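
// Compile-time assertion that AppLeaderProvider satisfies the generic
// consensus.LeaderProvider interface with the app shard type parameters.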
var _ consensus.LeaderProvider[
	*protobufs.AppShardFrame,
	PeerID,
	CollectedCommitments,
] = (*AppLeaderProvider)(nil)