diff --git a/node/consensus/global/factory.go b/node/consensus/global/factory.go
index 1533695..650cecd 100644
--- a/node/consensus/global/factory.go
+++ b/node/consensus/global/factory.go
@@ -84,7 +84,9 @@ func NewConsensusEngineFactory(
 	peerInfoManager tp2p.PeerInfoManager,
 ) *ConsensusEngineFactory {
 	// Initialize peer seniority data
-	compat.RebuildPeerSeniority(uint(config.P2P.Network))
+	if err := compat.RebuildPeerSeniority(uint(config.P2P.Network)); err != nil {
+		panic(errors.Wrap(err, "failed to load peer seniority data"))
+	}

 	return &ConsensusEngineFactory{
 		logger: logger,
diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go
index ef84dec..5e8de45 100644
--- a/node/consensus/global/global_consensus_engine.go
+++ b/node/consensus/global/global_consensus_engine.go
@@ -157,6 +157,8 @@ type GlobalConsensusEngine struct {
 	lastRejectFrame         atomic.Uint64
 	proverRootVerifiedFrame atomic.Uint64
 	proverRootSynced        atomic.Bool
+	lastMaterializedFrame   atomic.Uint64
+	hasFrameGapSinceSync    atomic.Bool

 	lastProposalFrameNumber     atomic.Uint64
 	lastFrameMessageFrameNumber atomic.Uint64
@@ -1630,6 +1632,19 @@ func (e *GlobalConsensusEngine) materialize(
 	var appliedCount atomic.Int64
 	var skippedCount atomic.Int64

+	// Check for frame gaps: if this frame is not contiguous with the last
+	// materialized frame, mark that we've had gaps since sync
+	lastMaterialized := e.lastMaterializedFrame.Load()
+	if lastMaterialized > 0 && frameNumber > lastMaterialized+1 {
+		e.logger.Info(
+			"frame gap detected",
+			zap.Uint64("last_materialized", lastMaterialized),
+			zap.Uint64("current_frame", frameNumber),
+			zap.Uint64("gap_size", frameNumber-lastMaterialized-1),
+		)
+		e.hasFrameGapSinceSync.Store(true)
+	}
+
 	_, err := e.hypergraph.Commit(frameNumber)
 	if err != nil {
 		e.logger.Error("error committing hypergraph", zap.Error(err))
@@ -1642,6 +1657,8 @@ func (e *GlobalConsensusEngine) materialize(
 	// Check prover root BEFORE processing transactions. If there's a mismatch,
 	// we need to sync first, otherwise we'll apply transactions on top of
 	// divergent state and then sync will delete the newly added records.
+	// However, only sync if we've also had frame gaps since last successful
+	// sync - contiguous frames should not trigger sync even if root mismatches.
 	if len(expectedProverRoot) > 0 {
 		localProverRoot, localRootErr := e.computeLocalProverRoot(frameNumber)
 		if localRootErr != nil {
@@ -1654,20 +1671,34 @@
 		updatedProverRoot := localProverRoot
 		if localRootErr == nil && len(localProverRoot) > 0 {
-			if !bytes.Equal(localProverRoot, expectedProverRoot) {
-				e.logger.Info(
-					"prover root mismatch detected before processing frame, syncing first",
-					zap.Uint64("frame_number", frameNumber),
-					zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
-					zap.String("local_root", hex.EncodeToString(localProverRoot)),
-				)
-				// Perform blocking hypersync before continuing
-				result := e.performBlockingProverHypersync(
-					proposer,
-					expectedProverRoot,
-				)
-				if result != nil {
-					updatedProverRoot = result
+			hasGap := e.hasFrameGapSinceSync.Load()
+			rootMismatch := !bytes.Equal(localProverRoot, expectedProverRoot)
+
+			if rootMismatch {
+				if hasGap {
+					e.logger.Info(
+						"prover root mismatch with frame gap detected, syncing",
+						zap.Uint64("frame_number", frameNumber),
+						zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
+						zap.String("local_root", hex.EncodeToString(localProverRoot)),
+					)
+					// Perform blocking hypersync before continuing
+					result := e.performBlockingProverHypersync(
+						proposer,
+						expectedProverRoot,
+					)
+					if result != nil {
+						updatedProverRoot = result
+					}
+					// Reset gap tracking after sync
+					e.hasFrameGapSinceSync.Store(false)
+				} else {
+					e.logger.Debug(
+						"prover root mismatch but no frame gap, skipping sync",
+						zap.Uint64("frame_number", frameNumber),
+						zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
+						zap.String("local_root", hex.EncodeToString(localProverRoot)),
+					)
 				}
 			}
 		}
@@ -1693,6 +1724,9 @@
 		}
 	}

+	// Update last materialized frame
+	e.lastMaterializedFrame.Store(frameNumber)
+
 	var state state.State
 	state = hgstate.NewHypergraphState(e.hypergraph)

@@ -1944,16 +1978,28 @@
 	}

 	if !bytes.Equal(localRoot, expected) {
+		hasGap := e.hasFrameGapSinceSync.Load()
 		e.logger.Warn(
 			"prover root mismatch",
 			zap.Uint64("frame_number", frameNumber),
 			zap.String("expected_root", hex.EncodeToString(expected)),
 			zap.String("local_root", hex.EncodeToString(localRoot)),
 			zap.String("proposer", hex.EncodeToString(proposer)),
+			zap.Bool("has_frame_gap", hasGap),
 		)
 		e.proverRootSynced.Store(false)
 		e.proverRootVerifiedFrame.Store(0)
-		e.triggerProverHypersync(proposer, expected)
+
+		// Only trigger sync if we've had frame gaps since last successful sync.
+		// Contiguous frames should not trigger sync even if root mismatches.
+		if hasGap {
+			e.triggerProverHypersync(proposer, expected)
+		} else {
+			e.logger.Debug(
+				"skipping sync trigger - no frame gap since last sync",
+				zap.Uint64("frame_number", frameNumber),
+			)
+		}
 		return false
 	}

@@ -1992,6 +2038,8 @@ func (e *GlobalConsensusEngine) triggerProverHypersync(proposer []byte, expected
 		L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS,
 	}
 	e.syncProvider.HyperSync(ctx, proposer, shardKey, nil, expectedRoot)
+	// Reset gap tracking after sync completes
+	e.hasFrameGapSinceSync.Store(false)
 	if err := e.proverRegistry.Refresh(); err != nil {
 		e.logger.Warn(
 			"failed to refresh prover registry after hypersync",
@@ -3324,8 +3372,9 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
 			"existing prover has lower seniority than merge would provide, submitting seniority merge",
 			zap.Uint64("existing_seniority", info.Seniority),
 			zap.Uint64("merge_seniority", mergeSeniority),
+			zap.Strings("peer_ids", peerIds),
 		)
-		return e.submitSeniorityMerge(frame, helpers)
+		return e.submitSeniorityMerge(frame, helpers, mergeSeniority, peerIds)
 	}
 	e.logger.Debug(
 		"prover already exists with sufficient seniority, skipping join",
@@ -3336,8 +3385,9 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
 	}

 	e.logger.Info(
-		"existing seniority detected for proposed join",
-		zap.String("seniority", mergeSeniorityBI.String()),
+		"proposing worker join with seniority",
+		zap.Uint64("seniority", mergeSeniority),
+		zap.Strings("peer_ids", peerIds),
 	)

 	var delegate []byte
@@ -3468,7 +3518,11 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
 		return errors.Wrap(err, "propose worker join")
 	}

-	e.logger.Debug("submitted join request")
+	e.logger.Info(
+		"submitted join request",
+		zap.Uint64("seniority", mergeSeniority),
+		zap.Strings("peer_ids", peerIds),
+	)

 	return nil
 }
@@ -3562,6 +3616,8 @@ func (e *GlobalConsensusEngine) buildMergeHelpers() ([]*global.SeniorityMerge, [
 func (e *GlobalConsensusEngine) submitSeniorityMerge(
 	frame *protobufs.GlobalFrame,
 	helpers []*global.SeniorityMerge,
+	seniority uint64,
+	peerIds []string,
 ) error {
 	if len(helpers) == 0 {
 		return errors.New("no merge helpers available")
 	}
@@ -3611,7 +3667,11 @@
 		return errors.Wrap(err, "submit seniority merge")
 	}

-	e.logger.Info("submitted seniority merge request")
+	e.logger.Info(
+		"submitted seniority merge request",
+		zap.Uint64("seniority", seniority),
+		zap.Strings("peer_ids", peerIds),
+	)

 	return nil
 }
diff --git a/node/consensus/provers/prover_registry.go b/node/consensus/provers/prover_registry.go
index 0b57933..5317fd3 100644
--- a/node/consensus/provers/prover_registry.go
+++ b/node/consensus/provers/prover_registry.go
@@ -914,6 +914,11 @@ func (r *ProverRegistry) extractGlobalState() error {
 			continue
 		}

+		// Skip vertices with nil roots (e.g., spent merge markers)
+		if data.Root == nil {
+			continue
+		}
+
 		// Get the key which is always 64 bytes (domain + data address)
 		key := make([]byte, 64)
 		copy(key, iter.Key())
diff --git a/node/execution/intrinsics/global/compat/seniority.go b/node/execution/intrinsics/global/compat/seniority.go
index 0111688..a69db40 100644
--- a/node/execution/intrinsics/global/compat/seniority.go
+++ b/node/execution/intrinsics/global/compat/seniority.go
@@ -4,11 +4,13 @@ import (
 	_ "embed"
 	"encoding/hex"
 	"encoding/json"
+	"fmt"
 	"math/big"
 	"strconv"

 	"github.com/iden3/go-iden3-crypto/poseidon"
 	"github.com/mr-tron/base58"
+	"go.uber.org/zap"
 )

 type FirstRetroJson struct {
@@ -73,27 +75,27 @@ func RebuildPeerSeniority(network uint) error {
 		err := json.Unmarshal(firstRetroJsonBinary, &firstRetro)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to unmarshal first_retro.json: %w", err)
 		}

 		err = json.Unmarshal(secondRetroJsonBinary, &secondRetro)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to unmarshal second_retro.json: %w", err)
 		}

 		err = json.Unmarshal(thirdRetroJsonBinary, &thirdRetro)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to unmarshal third_retro.json: %w", err)
 		}

 		err = json.Unmarshal(fourthRetroJsonBinary, &fourthRetro)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to unmarshal fourth_retro.json: %w", err)
 		}

 		err = json.Unmarshal(mainnetSeniorityJsonBinary, &mainnetSeniority)
 		if err != nil {
-			return err
+			return fmt.Errorf("failed to unmarshal mainnet_244200_seniority.json: %w", err)
 		}
 	}
@@ -121,6 +123,13 @@ func OverrideSeniority(
 }

 func GetAggregatedSeniority(peerIds []string) *big.Int {
+	logger := zap.L()
+	logger.Debug(
+		"GetAggregatedSeniority called",
+		zap.Strings("peer_ids", peerIds),
+		zap.Int("mainnet_seniority_map_size", len(mainnetSeniority)),
+	)
+
 	highestFirst := uint64(0)
 	highestSecond := uint64(0)
 	highestThird := uint64(0)
@@ -227,17 +236,36 @@
 	// Calculate current aggregated value
 	currentAggregated := highestFirst + highestSecond + highestThird + highestFourth
+
+	logger.Debug(
+		"retro seniority calculation complete",
+		zap.Uint64("highest_first", highestFirst),
+		zap.Uint64("highest_second", highestSecond),
+		zap.Uint64("highest_third", highestThird),
+		zap.Uint64("highest_fourth", highestFourth),
+		zap.Uint64("current_aggregated", currentAggregated),
+	)

 	highestMainnetSeniority := uint64(0)
 	for _, peerId := range peerIds {
 		// Decode base58
 		decoded, err := base58.Decode(peerId)
 		if err != nil {
+			logger.Warn(
+				"failed to decode peer ID from base58",
+				zap.String("peer_id", peerId),
+				zap.Error(err),
+			)
 			continue
 		}

 		// Hash with poseidon
 		hashBI, err := poseidon.HashBytes(decoded)
 		if err != nil {
+			logger.Warn(
+				"failed to hash peer ID with poseidon",
+				zap.String("peer_id", peerId),
+				zap.Error(err),
+			)
 			continue
 		}
@@ -249,13 +277,32 @@
 		// Look up in mainnetSeniority
 		if seniority, exists := mainnetSeniority[addressHex]; exists {
+			logger.Debug(
+				"found mainnet seniority for peer",
+				zap.String("peer_id", peerId),
+				zap.String("address_hex", addressHex),
+				zap.Uint64("seniority", seniority),
+			)
 			if seniority > highestMainnetSeniority {
 				highestMainnetSeniority = seniority
 			}
+		} else {
+			logger.Debug(
+				"no mainnet seniority found for peer",
+				zap.String("peer_id", peerId),
+				zap.String("address_hex", addressHex),
+			)
 		}
 	}

 	// Return the higher value between current aggregated and mainnetSeniority
+	logger.Info(
+		"GetAggregatedSeniority result",
+		zap.Uint64("retro_aggregated", currentAggregated),
+		zap.Uint64("highest_mainnet_seniority", highestMainnetSeniority),
+		zap.Bool("using_mainnet", highestMainnetSeniority > currentAggregated),
+	)
+
 	if highestMainnetSeniority > currentAggregated {
 		return new(big.Int).SetUint64(highestMainnetSeniority)
 	}
diff --git a/node/store/pebble.go b/node/store/pebble.go
index cedc86a..f1f0b72 100644
--- a/node/store/pebble.go
+++ b/node/store/pebble.go
@@ -25,6 +25,7 @@ import (
 	"source.quilibrium.com/quilibrium/monorepo/protobufs"
 	"source.quilibrium.com/quilibrium/monorepo/types/store"
 	"source.quilibrium.com/quilibrium/monorepo/types/tries"
+	up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
 )

 type PebbleDB struct {
@@ -94,6 +95,7 @@ var pebbleMigrations = []func(*pebble.Batch, *pebble.DB, *config.Config) error{
 	migration_2_1_0_1818,
 	migration_2_1_0_1819,
 	migration_2_1_0_1820,
+	migration_2_1_0_1821,
 }

 func NewPebbleDB(
@@ -1141,6 +1143,115 @@ func migration_2_1_0_1820(b *pebble.Batch, db *pebble.DB, cfg *config.Config) er
 	return doMigration1818(db, cfg)
 }

+// migration_2_1_0_1821 removes spent merge markers from the global intrinsic domain.
+// These markers were created with incorrect seniority values and need to be removed
+// to allow provers to re-merge with correct seniority values.
+//
+// Spent merge markers are identified by:
+// 1. Being in the global intrinsic domain (GLOBAL_INTRINSIC_ADDRESS = 0xff * 32)
+// 2. Having an empty VectorCommitmentTree (no actual data, just a marker)
+func migration_2_1_0_1821(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
+	return doMigration1821(db, cfg)
+}
+
+// doMigration1821 performs the actual work for migration_2_1_0_1821.
+func doMigration1821(db *pebble.DB, cfg *config.Config) error {
+	logger := zap.L()
+
+	// Global intrinsic address: 32 bytes of 0xff
+	globalIntrinsicAddress := [32]byte{
+		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+	}
+
+	prover := bls48581.NewKZGInclusionProver(logger)
+
+	// Create hypergraph from actual DB
+	dbWrapper := &PebbleDB{db: db}
+	hgStore := NewPebbleHypergraphStore(cfg.DB, dbWrapper, logger, nil, prover)
+
+	hg, err := hgStore.LoadHypergraph(nil, 0)
+	if err != nil {
+		return errors.Wrap(err, "load hypergraph")
+	}
+	hgCRDT := hg.(*hgcrdt.HypergraphCRDT)
+
+	// Get shard key for the global intrinsic domain
+	// L1 is computed from bloom filter indices of the domain
+	globalShardKey := tries.ShardKey{
+		L1: [3]byte(up2p.GetBloomFilterIndices(globalIntrinsicAddress[:], 256, 3)),
+		L2: globalIntrinsicAddress,
+	}
+
+	// Create a transaction for the deletions
+	txn, err := hgStore.NewTransaction(false)
+	if err != nil {
+		return errors.Wrap(err, "create transaction")
+	}
+
+	// Get the vertex data iterator for the global intrinsic domain
+	iter := hgCRDT.GetVertexDataIterator(globalIntrinsicAddress)
+	defer iter.Close()
+
+	deletedCount := 0
+	totalCount := 0
+
+	for valid := iter.First(); valid; valid = iter.Next() {
+		totalCount++
+
+		tree := iter.Value()
+		if tree == nil {
+			continue
+		}
+
+		// Check if this is an empty tree (spent merge marker)
+		// Spent markers have Root == nil or GetSize() == 0
+		if tree.Root == nil || tree.GetSize().Sign() == 0 {
+			// This is a spent marker - delete it
+			// The Key() returns the full 64-byte vertex ID (domain + address)
+			key := iter.Key()
+			if len(key) < 64 {
+				continue
+			}
+
+			var vertexID [64]byte
+			copy(vertexID[:], key[:64])
+
+			if err := hgCRDT.DeleteVertexAdd(txn, globalShardKey, vertexID); err != nil {
+				logger.Warn("failed to delete spent marker",
+					zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
+					zap.Error(err),
+				)
+				continue
+			}
+
+			deletedCount++
+
+			// Log progress every 1000 deletions
+			if deletedCount%1000 == 0 {
+				logger.Info("migration 1821: progress",
+					zap.Int("deleted", deletedCount),
+					zap.Int("examined", totalCount),
+				)
+			}
+		}
+	}
+
+	// Commit the transaction
+	if err := txn.Commit(); err != nil {
+		return errors.Wrap(err, "commit transaction")
+	}
+
+	logger.Info("migration 1821: completed",
+		zap.Int("deleted_spent_markers", deletedCount),
+		zap.Int("total_examined", totalCount),
+	)
+
+	return nil
+}
+
 // pebbleBatchDB wraps a *pebble.Batch to implement store.KVDB for use in migrations
 type pebbleBatchDB struct {
 	b *pebble.Batch