mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
207 lines
5.8 KiB
Go
207 lines
5.8 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"time"
|
|
|
|
"github.com/iden3/go-iden3-crypto/poseidon"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"go.uber.org/zap"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus"
|
|
"source.quilibrium.com/quilibrium/monorepo/consensus/models"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
)
|
|
|
|
// AppLeaderProvider implements LeaderProvider
|
|
type AppLeaderProvider struct {
|
|
engine *AppConsensusEngine
|
|
}
|
|
|
|
func (p *AppLeaderProvider) GetNextLeaders(
|
|
ctx context.Context,
|
|
prior **protobufs.AppShardFrame,
|
|
) ([]PeerID, error) {
|
|
// Get the parent selector for next prover calculation
|
|
var parentSelector []byte
|
|
if prior != nil && (*prior).Header != nil &&
|
|
len((*prior).Header.Output) >= 32 {
|
|
parentSelectorBI, _ := poseidon.HashBytes((*prior).Header.Output)
|
|
parentSelector = parentSelectorBI.FillBytes(make([]byte, 32))
|
|
} else {
|
|
parentSelector = make([]byte, 32)
|
|
}
|
|
|
|
// Get ordered provers from registry
|
|
provers, err := p.engine.proverRegistry.GetOrderedProvers(
|
|
[32]byte(parentSelector),
|
|
p.engine.appAddress,
|
|
)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "get ordered provers")
|
|
}
|
|
|
|
// Convert to PeerIDs
|
|
leaders := make([]PeerID, len(provers))
|
|
for i, prover := range provers {
|
|
leaders[i] = PeerID{ID: prover}
|
|
}
|
|
|
|
if len(leaders) > 0 {
|
|
p.engine.logger.Debug(
|
|
"determined next leaders",
|
|
zap.Int("count", len(leaders)),
|
|
zap.String("first", hex.EncodeToString(leaders[0].ID)),
|
|
)
|
|
}
|
|
|
|
return leaders, nil
|
|
}
|
|
|
|
func (p *AppLeaderProvider) ProveNextState(
|
|
ctx context.Context,
|
|
rank uint64,
|
|
filter []byte,
|
|
priorState models.Identity,
|
|
) (**protobufs.AppShardFrame, error) {
|
|
prior, _, err := p.engine.clockStore.GetLatestShardClockFrame(
|
|
p.engine.appAddress,
|
|
)
|
|
if err != nil {
|
|
frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
|
|
return nil, models.NewNoVoteErrorf("could not collect: %+w", err)
|
|
}
|
|
|
|
if prior == nil {
|
|
frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
|
|
return nil, models.NewNoVoteErrorf("missing prior frame")
|
|
}
|
|
|
|
latestQC, qcErr := p.engine.clockStore.GetLatestQuorumCertificate(
|
|
p.engine.appAddress,
|
|
)
|
|
if qcErr != nil {
|
|
p.engine.logger.Debug(
|
|
"could not fetch latest quorum certificate",
|
|
zap.Error(qcErr),
|
|
)
|
|
}
|
|
|
|
if prior.Identity() != priorState {
|
|
frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
|
|
|
|
if latestQC != nil && latestQC.Identity() == priorState {
|
|
switch {
|
|
case prior.Header.Rank < latestQC.GetRank():
|
|
// We should never be in this scenario because the consensus
|
|
// implementation's safety rules should forbid it, it'll demand sync
|
|
// happen out of band. Nevertheless, we note it so we can find it in
|
|
// logs if it _did_ happen.
|
|
return nil, models.NewNoVoteErrorf(
|
|
"needs sync: prior rank %d behind latest qc rank %d",
|
|
prior.Header.Rank,
|
|
latestQC.GetRank(),
|
|
)
|
|
case prior.Header.FrameNumber == latestQC.GetFrameNumber() &&
|
|
latestQC.Identity() != prior.Identity():
|
|
peerID, peerErr := p.engine.getRandomProverPeerId()
|
|
if peerErr != nil {
|
|
p.engine.logger.Warn(
|
|
"could not determine peer for fork sync",
|
|
zap.Error(peerErr),
|
|
)
|
|
} else {
|
|
p.engine.logger.Warn(
|
|
"detected fork, scheduling sync",
|
|
zap.Uint64("frame_number", latestQC.GetFrameNumber()),
|
|
zap.String("peer_id", peerID.String()),
|
|
)
|
|
p.engine.syncProvider.AddState(
|
|
[]byte(peerID),
|
|
latestQC.GetFrameNumber(),
|
|
[]byte(latestQC.Identity()),
|
|
)
|
|
}
|
|
|
|
return nil, models.NewNoVoteErrorf(
|
|
"fork detected at rank %d (local: %x, qc: %x)",
|
|
latestQC.GetRank(),
|
|
prior.Identity(),
|
|
latestQC.Identity(),
|
|
)
|
|
}
|
|
}
|
|
|
|
return nil, models.NewNoVoteErrorf(
|
|
"building on fork or needs sync: frame %d, rank %d, parent_id: %x, asked: rank %d, id: %x",
|
|
prior.Header.FrameNumber,
|
|
prior.Header.Rank,
|
|
prior.Header.ParentSelector,
|
|
rank,
|
|
priorState,
|
|
)
|
|
}
|
|
|
|
timer := prometheus.NewTimer(frameProvingDuration.WithLabelValues(
|
|
p.engine.appAddressHex,
|
|
))
|
|
defer timer.ObserveDuration()
|
|
|
|
// Get collected messages to include in frame
|
|
p.engine.provingMessagesMu.Lock()
|
|
messages := make([]*protobufs.Message, len(p.engine.provingMessages))
|
|
copy(messages, p.engine.provingMessages)
|
|
p.engine.provingMessages = []*protobufs.Message{}
|
|
p.engine.provingMessagesMu.Unlock()
|
|
|
|
if len(messages) == 0 {
|
|
p.engine.collectedMessagesMu.Lock()
|
|
if len(p.engine.collectedMessages) > 0 {
|
|
messages = make([]*protobufs.Message, len(p.engine.collectedMessages))
|
|
copy(messages, p.engine.collectedMessages)
|
|
p.engine.collectedMessages = []*protobufs.Message{}
|
|
}
|
|
p.engine.collectedMessagesMu.Unlock()
|
|
}
|
|
|
|
// Update pending messages metric
|
|
pendingMessagesCount.WithLabelValues(p.engine.appAddressHex).Set(0)
|
|
|
|
p.engine.logger.Info(
|
|
"proving next state",
|
|
zap.Int("message_count", len(messages)),
|
|
zap.Uint64("frame_number", (*prior).Header.FrameNumber+1),
|
|
)
|
|
|
|
// Prove the frame
|
|
newFrame, err := p.engine.internalProveFrame(rank, messages, prior)
|
|
if err != nil {
|
|
frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "error").Inc()
|
|
return nil, errors.Wrap(err, "prove frame")
|
|
}
|
|
|
|
p.engine.frameStoreMu.Lock()
|
|
p.engine.frameStore[string(
|
|
p.engine.calculateFrameSelector(newFrame.Header),
|
|
)] = newFrame.Clone().(*protobufs.AppShardFrame)
|
|
p.engine.frameStoreMu.Unlock()
|
|
|
|
// Update metrics
|
|
frameProvingTotal.WithLabelValues(p.engine.appAddressHex, "success").Inc()
|
|
p.engine.lastProvenFrameTimeMu.Lock()
|
|
p.engine.lastProvenFrameTime = time.Now()
|
|
p.engine.lastProvenFrameTimeMu.Unlock()
|
|
currentFrameNumber.WithLabelValues(p.engine.appAddressHex).Set(
|
|
float64(newFrame.Header.FrameNumber),
|
|
)
|
|
|
|
return &newFrame, nil
|
|
}
|
|
|
|
var _ consensus.LeaderProvider[
|
|
*protobufs.AppShardFrame,
|
|
PeerID,
|
|
CollectedCommitments,
|
|
] = (*AppLeaderProvider)(nil)
|