// ceremonyclient/node/consensus/app/coverage_events.go

package app

import (
	"encoding/hex"
	"fmt"
	"math/big"

	"github.com/pkg/errors"
	"go.uber.org/zap"

	typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
)
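
// coverageStreak tracks how many consecutive frames a shard has spent at or
// below the critical prover threshold.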
type coverageStreak struct {
	StartFrame uint64
	LastFrame  uint64
	Count      uint64
}
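
// shardCoverage is a point-in-time snapshot of a shard's prover population,
// the storage those provers have attested, and the metadata of the shard's
// trees.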
type shardCoverage struct {
	ProverCount     int
	AttestedStorage uint64
	TreeMetadata    []typesconsensus.TreeMetadata
}
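
// ensureCoverageThresholds initializes the coverage thresholds exactly once:
// the minimum prover count comes from minimumProvers(), the halt threshold
// is 3 on network 0 (presumably mainnet), 1 on other networks that require
// more than one prover, and 0 otherwise. The grace period before a halt is
// 360 frames.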
func (e *AppConsensusEngine) ensureCoverageThresholds() {
	e.coverageOnce.Do(func() {
		e.coverageMinProvers = e.minimumProvers()
		if e.config.P2P.Network == 0 {
			e.coverageHaltThreshold = 3
		} else if e.coverageMinProvers > 1 {
			e.coverageHaltThreshold = 1
		} else {
			e.coverageHaltThreshold = 0
		}
		e.coverageHaltGrace = 360
	})
}
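
// checkShardCoverage evaluates prover coverage for this shard at the given
// frame. While the prover count sits at or below the halt threshold and the
// shard still holds data, the low-coverage streak advances and a warning is
// emitted, escalating to a halt event on network 0 once the grace period is
// exhausted. When coverage recovers the streak is cleared, and a shard that
// is merely below the minimum prover count receives a non-escalating
// warning. A hypothetical call site (the real engine wires this into its
// frame processing; frame.FrameNumber is assumed here for illustration):
//
//	if err := e.checkShardCoverage(frame.FrameNumber); err != nil {
//		e.logger.Error("coverage check failed", zap.Error(err))
//	}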
func (e *AppConsensusEngine) checkShardCoverage(frameNumber uint64) error {
	e.ensureCoverageThresholds()

	coverage, ok := e.getShardCoverage()
	if !ok {
		e.clearCoverageStreak(string(e.appAddress))
		return nil
	}

	key := string(e.appAddress)

	// Sum the attested tree sizes; a shard with no stored data cannot be
	// halted for low coverage.
	size := big.NewInt(0)
	for _, metadata := range coverage.TreeMetadata {
		size.Add(size, new(big.Int).SetUint64(metadata.TotalSize))
	}

	if uint64(coverage.ProverCount) <= e.coverageHaltThreshold &&
		size.Cmp(big.NewInt(0)) > 0 {
		streak, err := e.bumpCoverageStreak(key, frameNumber)
		if err != nil {
			return errors.Wrap(err, "check shard coverage")
		}

		remaining := int64(e.coverageHaltGrace) - int64(streak.Count)
		if remaining < 0 {
			remaining = 0
		}

		// On network 0, an exhausted grace period halts the shard outright.
		if e.config.P2P.Network == 0 && remaining == 0 {
			e.emitCoverageEvent(
				typesconsensus.ControlEventCoverageHalt,
				&typesconsensus.CoverageEventData{
					ShardAddress:    e.appAddress,
					ProverCount:     coverage.ProverCount,
					RequiredProvers: int(e.coverageMinProvers),
					AttestedStorage: coverage.AttestedStorage,
					TreeMetadata:    coverage.TreeMetadata,
					Message: fmt.Sprintf(
						"Shard %s has only %d provers, halting operations",
						e.appAddressHex,
						coverage.ProverCount,
					),
				},
			)
			return nil
		}

		e.emitCoverageEvent(
			typesconsensus.ControlEventCoverageWarn,
			&typesconsensus.CoverageEventData{
				ShardAddress:    e.appAddress,
				ProverCount:     coverage.ProverCount,
				RequiredProvers: int(e.coverageMinProvers),
				AttestedStorage: coverage.AttestedStorage,
				TreeMetadata:    coverage.TreeMetadata,
				Message: fmt.Sprintf(
					"Critical coverage (<= %d provers). Grace period: %d/%d frames toward halt.",
					e.coverageHaltThreshold,
					streak.Count,
					e.coverageHaltGrace,
				),
			},
		)
		return nil
	}

	e.clearCoverageStreak(key)

	if uint64(coverage.ProverCount) < e.coverageMinProvers {
		e.emitCoverageEvent(
			typesconsensus.ControlEventCoverageWarn,
			&typesconsensus.CoverageEventData{
				ShardAddress:    e.appAddress,
				ProverCount:     coverage.ProverCount,
				RequiredProvers: int(e.coverageMinProvers),
				AttestedStorage: coverage.AttestedStorage,
				TreeMetadata:    coverage.TreeMetadata,
				Message: fmt.Sprintf(
					"Shard %s below minimum coverage: %d/%d provers.",
					e.appAddressHex,
					coverage.ProverCount,
					e.coverageMinProvers,
				),
			},
		)
	}

	return nil
}
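
// getShardCoverage assembles a coverage snapshot from the prover registry
// and the hypergraph. It returns false, logging rather than propagating the
// error, when the shard has no provers or any lookup fails.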
func (e *AppConsensusEngine) getShardCoverage() (*shardCoverage, bool) {
	proverCount, err := e.proverRegistry.GetProverCount(e.appAddress)
	if err != nil {
		e.logger.Warn(
			"failed to get prover count for shard",
			zap.String("shard_address", e.appAddressHex),
			zap.Error(err),
		)
		return nil, false
	}
	if proverCount == 0 {
		return nil, false
	}

	activeProvers, err := e.proverRegistry.GetActiveProvers(e.appAddress)
	if err != nil {
		e.logger.Warn(
			"failed to get active provers for shard",
			zap.String("shard_address", e.appAddressHex),
			zap.Error(err),
		)
		return nil, false
	}

	attestedStorage := uint64(0)
	for _, prover := range activeProvers {
		attestedStorage += prover.AvailableStorage
	}

	var treeMetadata []typesconsensus.TreeMetadata
	metadata, err := e.hypergraph.GetMetadataAtKey(e.appAddress)
	if err != nil {
		e.logger.Error("could not obtain metadata for shard", zap.Error(err))
		return nil, false
	}
	for _, entry := range metadata {
		treeMetadata = append(
			treeMetadata,
			typesconsensus.TreeMetadata{
				CommitmentRoot: entry.Commitment,
				TotalSize:      entry.Size,
				TotalLeaves:    entry.LeafCount,
			},
		)
	}

	return &shardCoverage{
		ProverCount:     proverCount,
		AttestedStorage: attestedStorage,
		TreeMetadata:    treeMetadata,
	}, true
}
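
// ensureCoverageStreakMap lazily creates the streak map on first use. If the
// shard is already at or below the halt threshold when the map is created,
// the current frame is seeded as the start of a streak so the grace period
// begins counting immediately.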
func (e *AppConsensusEngine) ensureCoverageStreakMap(frameNumber uint64) error {
	if e.lowCoverageStreak != nil {
		return nil
	}

	e.lowCoverageStreak = make(map[string]*coverageStreak)

	coverage, ok := e.getShardCoverage()
	if !ok {
		return nil
	}
	if uint64(coverage.ProverCount) <= e.coverageHaltThreshold {
		e.lowCoverageStreak[string(e.appAddress)] = &coverageStreak{
			StartFrame: frameNumber,
			LastFrame:  frameNumber,
			Count:      1,
		}
	}

	return nil
}
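
// bumpCoverageStreak starts or extends the low-coverage streak for key. The
// count grows by the number of frames elapsed since the last bump, so frames
// skipped while coverage stayed low still count toward the grace period.
// For example, bumps at frames 10, 12, and 15 yield counts of 1, 3, and 6;
// a repeated bump for the same frame is a no-op.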
func (e *AppConsensusEngine) bumpCoverageStreak(
	key string,
	frame uint64,
) (*coverageStreak, error) {
	if err := e.ensureCoverageStreakMap(frame); err != nil {
		return nil, errors.Wrap(err, "bump coverage streak")
	}

	streak := e.lowCoverageStreak[key]
	if streak == nil {
		streak = &coverageStreak{
			StartFrame: frame,
			LastFrame:  frame,
			Count:      1,
		}
		e.lowCoverageStreak[key] = streak
		return streak, nil
	}

	if frame > streak.LastFrame {
		streak.Count += frame - streak.LastFrame
		streak.LastFrame = frame
	}

	return streak, nil
}
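
// clearCoverageStreak drops the streak for key once coverage recovers.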
func (e *AppConsensusEngine) clearCoverageStreak(key string) {
	if e.lowCoverageStreak != nil {
		delete(e.lowCoverageStreak, key)
	}
}
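
// emitCoverageEvent publishes a coverage control event on a separate
// goroutine, so a slow subscriber cannot block the caller, and logs the
// emission.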
func (e *AppConsensusEngine) emitCoverageEvent(
	eventType typesconsensus.ControlEventType,
	data *typesconsensus.CoverageEventData,
) {
	event := typesconsensus.ControlEvent{
		Type: eventType,
		Data: data,
	}
	go e.eventDistributor.Publish(event)

	e.logger.Info(
		"emitted coverage event",
		zap.String("type", fmt.Sprintf("%d", eventType)),
		zap.String("shard_address", hex.EncodeToString(data.ShardAddress)),
		zap.Int("prover_count", data.ProverCount),
		zap.String("message", data.Message),
	)
}