address issues from previous commit test

Cassandra Heart 2026-01-26 19:45:12 -06:00
parent c091c25b7f
commit 1e7d5331d2
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
5 changed files with 251 additions and 26 deletions

View File

@@ -84,7 +84,9 @@ func NewConsensusEngineFactory(
peerInfoManager tp2p.PeerInfoManager,
) *ConsensusEngineFactory {
// Initialize peer seniority data
compat.RebuildPeerSeniority(uint(config.P2P.Network))
if err := compat.RebuildPeerSeniority(uint(config.P2P.Network)); err != nil {
panic(errors.Wrap(err, "failed to load peer seniority data"))
}
return &ConsensusEngineFactory{
logger: logger,

View File

@@ -157,6 +157,8 @@ type GlobalConsensusEngine struct {
lastRejectFrame atomic.Uint64
proverRootVerifiedFrame atomic.Uint64
proverRootSynced atomic.Bool
lastMaterializedFrame atomic.Uint64
hasFrameGapSinceSync atomic.Bool
lastProposalFrameNumber atomic.Uint64
lastFrameMessageFrameNumber atomic.Uint64
@@ -1630,6 +1632,19 @@ func (e *GlobalConsensusEngine) materialize(
var appliedCount atomic.Int64
var skippedCount atomic.Int64
// Check for frame gaps: if this frame is not contiguous with the last
// materialized frame, mark that we've had gaps since sync
lastMaterialized := e.lastMaterializedFrame.Load()
if lastMaterialized > 0 && frameNumber > lastMaterialized+1 {
e.logger.Info(
"frame gap detected",
zap.Uint64("last_materialized", lastMaterialized),
zap.Uint64("current_frame", frameNumber),
zap.Uint64("gap_size", frameNumber-lastMaterialized-1),
)
e.hasFrameGapSinceSync.Store(true)
}
_, err := e.hypergraph.Commit(frameNumber)
if err != nil {
e.logger.Error("error committing hypergraph", zap.Error(err))
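The gap check in this hunk treats a frame as contiguous only when its number is exactly lastMaterialized+1, and a lastMaterialized of zero means nothing has been materialized yet, so it never counts as a gap. A minimal sketch of that condition as a standalone helper (the name frameGap is hypothetical, not part of this change):

// frameGap returns the number of frames missing between the last
// materialized frame and the current one; zero means contiguous.
// lastMaterialized == 0 is treated as "nothing materialized yet",
// matching the lastMaterialized > 0 guard above.
func frameGap(lastMaterialized, current uint64) uint64 {
	if lastMaterialized == 0 || current <= lastMaterialized+1 {
		return 0
	}
	return current - lastMaterialized - 1
}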
@@ -1642,6 +1657,8 @@ func (e *GlobalConsensusEngine) materialize(
// Check prover root BEFORE processing transactions. If there's a mismatch,
// we need to sync first, otherwise we'll apply transactions on top of
// divergent state and then sync will delete the newly added records.
// However, only sync if we've also had frame gaps since last successful
// sync - contiguous frames should not trigger sync even if root mismatches.
if len(expectedProverRoot) > 0 {
localProverRoot, localRootErr := e.computeLocalProverRoot(frameNumber)
if localRootErr != nil {
@@ -1654,20 +1671,34 @@
updatedProverRoot := localProverRoot
if localRootErr == nil && len(localProverRoot) > 0 {
if !bytes.Equal(localProverRoot, expectedProverRoot) {
e.logger.Info(
"prover root mismatch detected before processing frame, syncing first",
zap.Uint64("frame_number", frameNumber),
zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
zap.String("local_root", hex.EncodeToString(localProverRoot)),
)
// Perform blocking hypersync before continuing
result := e.performBlockingProverHypersync(
proposer,
expectedProverRoot,
)
if result != nil {
updatedProverRoot = result
hasGap := e.hasFrameGapSinceSync.Load()
rootMismatch := !bytes.Equal(localProverRoot, expectedProverRoot)
if rootMismatch {
if hasGap {
e.logger.Info(
"prover root mismatch with frame gap detected, syncing",
zap.Uint64("frame_number", frameNumber),
zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
zap.String("local_root", hex.EncodeToString(localProverRoot)),
)
// Perform blocking hypersync before continuing
result := e.performBlockingProverHypersync(
proposer,
expectedProverRoot,
)
if result != nil {
updatedProverRoot = result
}
// Reset gap tracking after sync
e.hasFrameGapSinceSync.Store(false)
} else {
e.logger.Debug(
"prover root mismatch but no frame gap, skipping sync",
zap.Uint64("frame_number", frameNumber),
zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
zap.String("local_root", hex.EncodeToString(localProverRoot)),
)
}
}
}
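The effect of this hunk is that a prover root mismatch alone no longer forces a blocking hypersync: the engine must also have observed at least one frame gap since the last sync, and the gap flag is cleared once the sync runs. A minimal sketch of that gate as a pure predicate (shouldHypersync is a hypothetical name, not part of this change):

// shouldHypersync mirrors the gate above: sync only when the local prover
// root disagrees with the proposer's AND frames were skipped since the
// last successful sync. A mismatch without a gap is logged and ignored;
// a gap without a mismatch needs no sync at all.
func shouldHypersync(rootMismatch, hasGapSinceSync bool) bool {
	return rootMismatch && hasGapSinceSync
}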
@@ -1693,6 +1724,9 @@ func (e *GlobalConsensusEngine) materialize(
}
}
// Update last materialized frame
e.lastMaterializedFrame.Store(frameNumber)
var state state.State
state = hgstate.NewHypergraphState(e.hypergraph)
@@ -1944,16 +1978,28 @@ func (e *GlobalConsensusEngine) verifyProverRoot(
}
if !bytes.Equal(localRoot, expected) {
hasGap := e.hasFrameGapSinceSync.Load()
e.logger.Warn(
"prover root mismatch",
zap.Uint64("frame_number", frameNumber),
zap.String("expected_root", hex.EncodeToString(expected)),
zap.String("local_root", hex.EncodeToString(localRoot)),
zap.String("proposer", hex.EncodeToString(proposer)),
zap.Bool("has_frame_gap", hasGap),
)
e.proverRootSynced.Store(false)
e.proverRootVerifiedFrame.Store(0)
e.triggerProverHypersync(proposer, expected)
// Only trigger sync if we've had frame gaps since last successful sync.
// Contiguous frames should not trigger sync even if root mismatches.
if hasGap {
e.triggerProverHypersync(proposer, expected)
} else {
e.logger.Debug(
"skipping sync trigger - no frame gap since last sync",
zap.Uint64("frame_number", frameNumber),
)
}
return false
}
@@ -1992,6 +2038,8 @@ func (e *GlobalConsensusEngine) triggerProverHypersync(proposer []byte, expected
L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
}
e.syncProvider.HyperSync(ctx, proposer, shardKey, nil, expectedRoot)
// Reset gap tracking after sync completes
e.hasFrameGapSinceSync.Store(false)
if err := e.proverRegistry.Refresh(); err != nil {
e.logger.Warn(
"failed to refresh prover registry after hypersync",
@@ -3324,8 +3372,9 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
"existing prover has lower seniority than merge would provide, submitting seniority merge",
zap.Uint64("existing_seniority", info.Seniority),
zap.Uint64("merge_seniority", mergeSeniority),
zap.Strings("peer_ids", peerIds),
)
return e.submitSeniorityMerge(frame, helpers)
return e.submitSeniorityMerge(frame, helpers, mergeSeniority, peerIds)
}
e.logger.Debug(
"prover already exists with sufficient seniority, skipping join",
@@ -3336,8 +3385,9 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
}
e.logger.Info(
"existing seniority detected for proposed join",
zap.String("seniority", mergeSeniorityBI.String()),
"proposing worker join with seniority",
zap.Uint64("seniority", mergeSeniority),
zap.Strings("peer_ids", peerIds),
)
var delegate []byte
@@ -3468,7 +3518,11 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
return errors.Wrap(err, "propose worker join")
}
e.logger.Debug("submitted join request")
e.logger.Info(
"submitted join request",
zap.Uint64("seniority", mergeSeniority),
zap.Strings("peer_ids", peerIds),
)
return nil
}
@@ -3562,6 +3616,8 @@ func (e *GlobalConsensusEngine) buildMergeHelpers() ([]*global.SeniorityMerge, [
func (e *GlobalConsensusEngine) submitSeniorityMerge(
frame *protobufs.GlobalFrame,
helpers []*global.SeniorityMerge,
seniority uint64,
peerIds []string,
) error {
if len(helpers) == 0 {
return errors.New("no merge helpers available")
@@ -3611,7 +3667,11 @@ func (e *GlobalConsensusEngine) submitSeniorityMerge(
return errors.Wrap(err, "submit seniority merge")
}
e.logger.Info("submitted seniority merge request")
e.logger.Info(
"submitted seniority merge request",
zap.Uint64("seniority", seniority),
zap.Strings("peer_ids", peerIds),
)
return nil
}

View File

@@ -914,6 +914,11 @@ func (r *ProverRegistry) extractGlobalState() error {
continue
}
// Skip vertices with nil roots (e.g., spent merge markers)
if data.Root == nil {
continue
}
// Get the key which is always 64 bytes (domain + data address)
key := make([]byte, 64)
copy(key, iter.Key())

View File

@@ -4,11 +4,13 @@ import (
_ "embed"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"strconv"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/mr-tron/base58"
"go.uber.org/zap"
)
type FirstRetroJson struct {
@@ -73,27 +75,27 @@ func RebuildPeerSeniority(network uint) error {
err := json.Unmarshal(firstRetroJsonBinary, &firstRetro)
if err != nil {
return err
return fmt.Errorf("failed to unmarshal first_retro.json: %w", err)
}
err = json.Unmarshal(secondRetroJsonBinary, &secondRetro)
if err != nil {
return err
return fmt.Errorf("failed to unmarshal second_retro.json: %w", err)
}
err = json.Unmarshal(thirdRetroJsonBinary, &thirdRetro)
if err != nil {
return err
return fmt.Errorf("failed to unmarshal third_retro.json: %w", err)
}
err = json.Unmarshal(fourthRetroJsonBinary, &fourthRetro)
if err != nil {
return err
return fmt.Errorf("failed to unmarshal fourth_retro.json: %w", err)
}
err = json.Unmarshal(mainnetSeniorityJsonBinary, &mainnetSeniority)
if err != nil {
return err
return fmt.Errorf("failed to unmarshal mainnet_244200_seniority.json: %w", err)
}
}
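Wrapping each unmarshal failure with fmt.Errorf and %w names the offending embedded file while keeping the underlying JSON error reachable for callers. A small self-contained sketch of why that matters (the parse helper and sample input are illustrative only):

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

func parse(raw []byte) error {
	var v map[string]uint64
	if err := json.Unmarshal(raw, &v); err != nil {
		// Same pattern as above: add the file name, keep the cause via %w.
		return fmt.Errorf("failed to unmarshal first_retro.json: %w", err)
	}
	return nil
}

func main() {
	err := parse([]byte("{not json"))
	var syntaxErr *json.SyntaxError
	fmt.Println(errors.As(err, &syntaxErr)) // true: the wrapped cause is still inspectable
	fmt.Println(err)                        // message now names the embedded file
}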
@@ -121,6 +123,13 @@ func OverrideSeniority(
}
func GetAggregatedSeniority(peerIds []string) *big.Int {
logger := zap.L()
logger.Debug(
"GetAggregatedSeniority called",
zap.Strings("peer_ids", peerIds),
zap.Int("mainnet_seniority_map_size", len(mainnetSeniority)),
)
highestFirst := uint64(0)
highestSecond := uint64(0)
highestThird := uint64(0)
@@ -227,17 +236,36 @@ func GetAggregatedSeniority(peerIds []string) *big.Int {
// Calculate current aggregated value
currentAggregated := highestFirst + highestSecond + highestThird + highestFourth
logger.Debug(
"retro seniority calculation complete",
zap.Uint64("highest_first", highestFirst),
zap.Uint64("highest_second", highestSecond),
zap.Uint64("highest_third", highestThird),
zap.Uint64("highest_fourth", highestFourth),
zap.Uint64("current_aggregated", currentAggregated),
)
highestMainnetSeniority := uint64(0)
for _, peerId := range peerIds {
// Decode base58
decoded, err := base58.Decode(peerId)
if err != nil {
logger.Warn(
"failed to decode peer ID from base58",
zap.String("peer_id", peerId),
zap.Error(err),
)
continue
}
// Hash with poseidon
hashBI, err := poseidon.HashBytes(decoded)
if err != nil {
logger.Warn(
"failed to hash peer ID with poseidon",
zap.String("peer_id", peerId),
zap.Error(err),
)
continue
}
@@ -249,13 +277,32 @@ func GetAggregatedSeniority(peerIds []string) *big.Int {
// Look up in mainnetSeniority
if seniority, exists := mainnetSeniority[addressHex]; exists {
logger.Debug(
"found mainnet seniority for peer",
zap.String("peer_id", peerId),
zap.String("address_hex", addressHex),
zap.Uint64("seniority", seniority),
)
if seniority > highestMainnetSeniority {
highestMainnetSeniority = seniority
}
} else {
logger.Debug(
"no mainnet seniority found for peer",
zap.String("peer_id", peerId),
zap.String("address_hex", addressHex),
)
}
}
// Return the higher value between current aggregated and mainnetSeniority
logger.Info(
"GetAggregatedSeniority result",
zap.Uint64("retro_aggregated", currentAggregated),
zap.Uint64("highest_mainnet_seniority", highestMainnetSeniority),
zap.Bool("using_mainnet", highestMainnetSeniority > currentAggregated),
)
if highestMainnetSeniority > currentAggregated {
return new(big.Int).SetUint64(highestMainnetSeniority)
}
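The new logging traces the mainnet lookup path in GetAggregatedSeniority: each peer ID is base58-decoded, hashed with poseidon, and the hex-encoded address is used as the key into the embedded mainnet seniority map. A minimal sketch of that path in isolation (lookupMainnetSeniority is a hypothetical helper, and the 32-byte left-padding of the hash is an assumption; the exact address derivation is not shown in this diff):

import (
	"encoding/hex"

	"github.com/iden3/go-iden3-crypto/poseidon"
	"github.com/mr-tron/base58"
)

// lookupMainnetSeniority sketches the per-peer lookup: decode the peer ID,
// hash it with poseidon, and look the hex-encoded address up in the map.
// Errors return a zero result, just as the loop above logs and continues.
func lookupMainnetSeniority(peerId string, byAddress map[string]uint64) (uint64, bool) {
	decoded, err := base58.Decode(peerId)
	if err != nil {
		return 0, false
	}
	hashBI, err := poseidon.HashBytes(decoded)
	if err != nil {
		return 0, false
	}
	// Assumption: the address key is the poseidon hash left-padded to 32 bytes.
	addressHex := hex.EncodeToString(hashBI.FillBytes(make([]byte, 32)))
	seniority, ok := byAddress[addressHex]
	return seniority, ok
}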

View File

@@ -25,6 +25,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
type PebbleDB struct {
@@ -94,6 +95,7 @@ var pebbleMigrations = []func(*pebble.Batch, *pebble.DB, *config.Config) error{
migration_2_1_0_1818,
migration_2_1_0_1819,
migration_2_1_0_1820,
migration_2_1_0_1821,
}
func NewPebbleDB(
@@ -1141,6 +1143,115 @@ func migration_2_1_0_1820(b *pebble.Batch, db *pebble.DB, cfg *config.Config) er
return doMigration1818(db, cfg)
}
// migration_2_1_0_1821 removes spent merge markers from the global intrinsic domain.
// These markers were created with incorrect seniority values and need to be removed
// to allow provers to re-merge with correct seniority values.
//
// Spent merge markers are identified by:
// 1. Being in the global intrinsic domain (GLOBAL_INTRINSIC_ADDRESS = 0xff * 32)
// 2. Having an empty VectorCommitmentTree (no actual data, just a marker)
func migration_2_1_0_1821(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
return doMigration1821(db, cfg)
}
// doMigration1821 performs the actual work for migration_2_1_0_1821.
func doMigration1821(db *pebble.DB, cfg *config.Config) error {
logger := zap.L()
// Global intrinsic address: 32 bytes of 0xff
globalIntrinsicAddress := [32]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
prover := bls48581.NewKZGInclusionProver(logger)
// Create hypergraph from actual DB
dbWrapper := &PebbleDB{db: db}
hgStore := NewPebbleHypergraphStore(cfg.DB, dbWrapper, logger, nil, prover)
hg, err := hgStore.LoadHypergraph(nil, 0)
if err != nil {
return errors.Wrap(err, "load hypergraph")
}
hgCRDT := hg.(*hgcrdt.HypergraphCRDT)
// Get shard key for the global intrinsic domain
// L1 is computed from bloom filter indices of the domain
globalShardKey := tries.ShardKey{
L1: [3]byte(up2p.GetBloomFilterIndices(globalIntrinsicAddress[:], 256, 3)),
L2: globalIntrinsicAddress,
}
// Create a transaction for the deletions
txn, err := hgStore.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "create transaction")
}
// Get the vertex data iterator for the global intrinsic domain
iter := hgCRDT.GetVertexDataIterator(globalIntrinsicAddress)
defer iter.Close()
deletedCount := 0
totalCount := 0
for valid := iter.First(); valid; valid = iter.Next() {
totalCount++
tree := iter.Value()
if tree == nil {
continue
}
// Check if this is an empty tree (spent merge marker)
// Spent markers have Root == nil or GetSize() == 0
if tree.Root == nil || tree.GetSize().Sign() == 0 {
// This is a spent marker - delete it
// The Key() returns the full 64-byte vertex ID (domain + address)
key := iter.Key()
if len(key) < 64 {
continue
}
var vertexID [64]byte
copy(vertexID[:], key[:64])
if err := hgCRDT.DeleteVertexAdd(txn, globalShardKey, vertexID); err != nil {
logger.Warn("failed to delete spent marker",
zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
zap.Error(err),
)
continue
}
deletedCount++
// Log progress every 1000 deletions
if deletedCount%1000 == 0 {
logger.Info("migration 1821: progress",
zap.Int("deleted", deletedCount),
zap.Int("examined", totalCount),
)
}
}
}
// Commit the transaction
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "commit transaction")
}
logger.Info("migration 1821: completed",
zap.Int("deleted_spent_markers", deletedCount),
zap.Int("total_examined", totalCount),
)
return nil
}
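The deletion criterion above matches the nil-root skip added to the prover registry earlier in this commit: a vertex whose tree has no root (or zero size) carries no data and is only a spent merge marker. A minimal sketch of that check as one predicate (isSpentMergeMarker and the stand-in struct are illustrative, not part of the change):

// vertexData is a stand-in for the actual VectorCommitmentTree type.
type vertexData struct {
	Root []byte
	Size *big.Int
}

// isSpentMergeMarker mirrors the condition in doMigration1821: no root,
// or a zero size, means the vertex is only a marker and can be removed.
// Size is assumed non-nil whenever Root is set.
func isSpentMergeMarker(t *vertexData) bool {
	return t == nil || t.Root == nil || t.Size.Sign() == 0
}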
// pebbleBatchDB wraps a *pebble.Batch to implement store.KVDB for use in migrations
type pebbleBatchDB struct {
b *pebble.Batch