From 3eb7477ff8b925cc8f9532395ccebc4d56b459d2 Mon Sep 17 00:00:00 2001
From: Cassandra Heart
Date: Thu, 9 Oct 2025 22:44:42 -0500
Subject: [PATCH] fix: support test/devnet defaults for coverage scenarios

---
 node/consensus/global/coverage_events.go | 153 ++++++++++++++++++-----
 1 file changed, 120 insertions(+), 33 deletions(-)

diff --git a/node/consensus/global/coverage_events.go b/node/consensus/global/coverage_events.go
index 4b47c68..5c3ad70 100644
--- a/node/consensus/global/coverage_events.go
+++ b/node/consensus/global/coverage_events.go
@@ -6,23 +6,57 @@ import (
 	"fmt"
 	"math/big"
 
+	"github.com/pkg/errors"
 	"go.uber.org/zap"
 
 	typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
 )
 
+// Define coverage thresholds
+var (
+	minProvers      = uint64(0)
+	maxProvers      = uint64(0)
+	haltThreshold   = uint64(0)
+	haltGraceFrames = uint64(0)
+)
+
+func (e *GlobalConsensusEngine) ensureCoverageThresholds() {
+	if minProvers != 0 {
+		return
+	}
+
+	// Network halt if <= 3 provers for mainnet:
+	haltThreshold = 3
+	if e.config.P2P.Network != 0 {
+		haltThreshold = 0
+		if e.minimumProvers() > 1 {
+			haltThreshold = 1
+		}
+	}
+
+	// Minimum provers for safe operation
+	minProvers = e.minimumProvers()
+
+	// Maximum provers before split consideration
+	maxProvers = 32
+
+	// Require sustained critical state for 360 frames
+	haltGraceFrames = 360
+}
+
 // checkShardCoverage verifies coverage levels for all active shards
 func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
-	// Define coverage thresholds
-	const (
-		minProvers      = 5   // Minimum provers for safe operation
-		maxProvers      = 32  // Maximum provers before split consideration
-		haltThreshold   = 3   // Network halt if <= 3 provers
-		haltGraceFrames = 360 // Require sustained critical state for 360 frames
-	)
+	e.ensureCoverageThresholds()
 
 	// Get shard coverage information from prover registry
 	shardCoverageMap := e.getShardCoverageMap()
 
+	// Set up the streak map so we can quickly establish halt conditions on
+	// restarts
+	err := e.ensureStreakMap(frameNumber)
+	if err != nil {
+		return errors.Wrap(err, "check shard coverage")
+	}
+
 	// Update state summaries metric
 	stateSummariesAggregated.Set(float64(len(shardCoverageMap)))
 
@@ -39,7 +73,7 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
 			continue
 		}
 
-		proverCount := coverage.ProverCount
+		proverCount := uint64(coverage.ProverCount)
 		attestedStorage := coverage.AttestedStorage
 
 		size := big.NewInt(0)
@@ -50,7 +84,7 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
 		e.logger.Debug(
 			"checking shard coverage",
 			zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
-			zap.Int("prover_count", proverCount),
+			zap.Uint64("prover_count", proverCount),
 			zap.Uint64("attested_storage", attestedStorage),
 			zap.Uint64("shard_size", size.Uint64()),
 		)
@@ -62,21 +96,25 @@
 			e.logger.Warn(
 				"Shard has insufficient coverage but is blacklisted - skipping halt",
 				zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
-				zap.Int("prover_count", proverCount),
-				zap.Int("halt_threshold", haltThreshold),
+				zap.Uint64("prover_count", proverCount),
+				zap.Uint64("halt_threshold", haltThreshold),
 			)
 			continue
 		}
 
 		// Bump the streak – only increments once per frame
-		streak := e.bumpStreak(shardAddress, frameNumber)
+		streak, err := e.bumpStreak(shardAddress, frameNumber)
+		if err != nil {
+			return errors.Wrap(err, "check shard coverage")
+		}
+
 		remaining := int(haltGraceFrames - streak.Count)
 		if remaining <= 0 {
 			e.logger.Error(
 				"CRITICAL: Shard has insufficient coverage - triggering network halt",
 				zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
-				zap.Int("prover_count", proverCount),
-				zap.Int("halt_threshold", haltThreshold),
+				zap.Uint64("prover_count", proverCount),
+				zap.Uint64("halt_threshold", haltThreshold),
 			)
 
 			// Emit halt event
@@ -84,8 +122,8 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
 				typesconsensus.ControlEventCoverageHalt,
 				&typesconsensus.CoverageEventData{
 					ShardAddress:    []byte(shardAddress),
-					ProverCount:     proverCount,
-					RequiredProvers: minProvers,
+					ProverCount:     int(proverCount),
+					RequiredProvers: int(minProvers),
 					AttestedStorage: attestedStorage,
 					TreeMetadata:    coverage.TreeMetadata,
 					Message: fmt.Sprintf(
@@ -101,8 +139,8 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
 			e.logger.Warn(
 				"Shard at critical coverage — grace window in effect",
 				zap.String("shard_address", hex.EncodeToString([]byte(shardAddress))),
-				zap.Int("prover_count", proverCount),
-				zap.Int("halt_threshold", haltThreshold),
+				zap.Uint64("prover_count", proverCount),
+				zap.Uint64("halt_threshold", haltThreshold),
 				zap.Uint64("streak_frames", streak.Count),
 				zap.Int("frames_until_halt", remaining),
 			)
@@ -110,8 +148,8 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
 				typesconsensus.ControlEventCoverageWarn,
 				&typesconsensus.CoverageEventData{
 					ShardAddress:    []byte(shardAddress),
-					ProverCount:     proverCount,
-					RequiredProvers: minProvers,
+					ProverCount:     int(proverCount),
+					RequiredProvers: int(minProvers),
 					AttestedStorage: attestedStorage,
 					TreeMetadata:    coverage.TreeMetadata,
 					Message: fmt.Sprintf(
@@ -151,7 +189,7 @@ type ShardCoverage struct {
 func (e *GlobalConsensusEngine) handleLowCoverage(
 	shardAddress []byte,
 	coverage *ShardCoverage,
-	minProvers int,
+	minProvers uint64,
 ) {
 	addressLen := len(shardAddress)
 
@@ -161,7 +199,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage(
 		"shard has low coverage",
 		zap.String("shard_address", hex.EncodeToString(shardAddress)),
 		zap.Int("prover_count", coverage.ProverCount),
-		zap.Int("min_provers", minProvers),
+		zap.Uint64("min_provers", minProvers),
 	)
 
 	// Emit coverage warning event
@@ -170,7 +208,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage(
 		&typesconsensus.CoverageEventData{
 			ShardAddress:    shardAddress,
 			ProverCount:     coverage.ProverCount,
-			RequiredProvers: minProvers,
+			RequiredProvers: int(minProvers),
 			AttestedStorage: coverage.AttestedStorage,
 			TreeMetadata:    coverage.TreeMetadata,
 			Message:         "Application shard has low prover coverage",
@@ -235,7 +273,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage(
 		&typesconsensus.CoverageEventData{
 			ShardAddress:    shardAddress,
 			ProverCount:     coverage.ProverCount,
-			RequiredProvers: minProvers,
+			RequiredProvers: int(minProvers),
 			AttestedStorage: coverage.AttestedStorage,
 			TreeMetadata:    coverage.TreeMetadata,
 			Message:         "shard has low coverage and cannot be merged due to insufficient storage",
@@ -249,7 +287,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage(
 		&typesconsensus.CoverageEventData{
 			ShardAddress:    shardAddress,
 			ProverCount:     coverage.ProverCount,
-			RequiredProvers: minProvers,
+			RequiredProvers: int(minProvers),
 			AttestedStorage: coverage.AttestedStorage,
 			TreeMetadata:    coverage.TreeMetadata,
 			Message:         "Shard has low coverage and no siblings for merge",
@@ -262,7 +300,7 @@
 func (e *GlobalConsensusEngine) handleHighCoverage(
 	shardAddress []byte,
 	coverage *ShardCoverage,
-	maxProvers int,
+	maxProvers uint64,
 ) {
 	addressLen := len(shardAddress)
 
@@ -534,22 +572,71 @@ func (e *GlobalConsensusEngine) proposeShardSplit(
 	return proposedShards
 }
 
-func (e *GlobalConsensusEngine) ensureStreakMap() {
-	if e.lowCoverageStreak == nil {
-		e.lowCoverageStreak = make(map[string]*coverageStreak)
+func (e *GlobalConsensusEngine) ensureStreakMap(frameNumber uint64) error {
+	if e.lowCoverageStreak != nil {
+		return nil
 	}
+
+	e.logger.Debug("ensuring streak map")
+	e.lowCoverageStreak = make(map[string]*coverageStreak)
+
+	info, err := e.proverRegistry.GetAllActiveAppShardProvers()
+	if err != nil {
+		e.logger.Error(
+			"could not retrieve active app shard provers",
+			zap.Error(err),
+		)
+		return errors.Wrap(err, "ensure streak map")
+	}
+
+	effectiveCoverage := map[string]int{}
+	lastFrame := map[string]uint64{}
+
+	for _, i := range info {
+		for _, allocation := range i.Allocations {
+			if _, ok := effectiveCoverage[string(allocation.ConfirmationFilter)]; !ok {
+				effectiveCoverage[string(allocation.ConfirmationFilter)] = 0
+				lastFrame[string(allocation.ConfirmationFilter)] =
+					allocation.LastActiveFrameNumber
+			}
+
+			if allocation.Status == typesconsensus.ProverStatusActive {
+				effectiveCoverage[string(allocation.ConfirmationFilter)]++
+				lastFrame[string(allocation.ConfirmationFilter)] = max(
+					lastFrame[string(allocation.ConfirmationFilter)],
+					allocation.LastActiveFrameNumber,
+				)
+			}
+		}
+	}
+
+	for shardKey, coverage := range effectiveCoverage {
+		if coverage <= int(haltThreshold) {
+			e.lowCoverageStreak[shardKey] = &coverageStreak{
+				StartFrame: lastFrame[shardKey],
+				LastFrame:  frameNumber,
+				Count:      frameNumber - lastFrame[shardKey],
+			}
+		}
+	}
+
+	return nil
 }
 
 func (e *GlobalConsensusEngine) bumpStreak(
 	shardKey string,
 	frame uint64,
-) *coverageStreak {
-	e.ensureStreakMap()
+) (*coverageStreak, error) {
+	err := e.ensureStreakMap(frame)
+	if err != nil {
+		return nil, errors.Wrap(err, "bump streak")
+	}
+
 	s := e.lowCoverageStreak[shardKey]
 	if s == nil {
 		s = &coverageStreak{StartFrame: frame, LastFrame: frame, Count: 1}
		e.lowCoverageStreak[shardKey] = s
-		return s
+		return s, nil
 	}
 
 	// Only increment if we advanced frames, prevents double counting within the
@@ -558,7 +645,7 @@
 		s.Count += (frame - s.LastFrame)
 		s.LastFrame = frame
 	}
-	return s
+	return s, nil
 }
 
 func (e *GlobalConsensusEngine) clearStreak(shardKey string) {
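
Reviewer note (not part of the patch): the grace-window accounting above is
easiest to sanity-check in isolation. The sketch below is a minimal,
self-contained Go program under stated assumptions: the engine, registry, and
logger plumbing are stubbed out, the shard key and frame numbers are
hypothetical, and only coverageStreak, haltGraceFrames, and the bumpStreak
delta logic mirror the diff.

package main

import "fmt"

// coverageStreak mirrors the struct the patch manipulates.
type coverageStreak struct {
	StartFrame uint64
	LastFrame  uint64
	Count      uint64
}

// haltGraceFrames mirrors the mainnet default set in ensureCoverageThresholds.
const haltGraceFrames = uint64(360)

// bumpStreak reproduces the patched helper without the registry lookup or
// error plumbing: it adds only the frame delta when the frame number
// advances, so repeated checks within a single frame are no-ops.
func bumpStreak(
	streaks map[string]*coverageStreak,
	shardKey string,
	frame uint64,
) *coverageStreak {
	s := streaks[shardKey]
	if s == nil {
		s = &coverageStreak{StartFrame: frame, LastFrame: frame, Count: 1}
		streaks[shardKey] = s
		return s
	}
	if frame > s.LastFrame {
		s.Count += frame - s.LastFrame
		s.LastFrame = frame
	}
	return s
}

func main() {
	streaks := map[string]*coverageStreak{}

	// Hypothetical shard that stays at or below the halt threshold from
	// frame 1000 onward; coverage is checked twice per frame to show that
	// the streak is not double counted.
	for frame := uint64(1000); ; frame++ {
		bumpStreak(streaks, "shard-a", frame)
		s := bumpStreak(streaks, "shard-a", frame) // second check, same frame
		remaining := int(haltGraceFrames - s.Count)
		if remaining <= 0 {
			fmt.Printf("halt at frame %d after a %d-frame streak\n", frame, s.Count)
			break
		}
	}
}

This prints a halt at frame 1359, i.e. once the streak reaches 360 frames.
Because bumpStreak adds only the frame delta once the frame number advances,
re-checking coverage several times within one frame cannot inflate the streak.
On restart, ensureStreakMap backfills Count as frameNumber minus the shard's
last active frame, so a shard that went quiet well before the restart does not
receive a fresh 360-frame grace window.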