From 97069eafa9dbee65665e970c546930917dfb4f07 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Wed, 28 Jan 2026 13:48:42 -0600 Subject: [PATCH] rearrange collect, reduce sync logging noise --- hypergraph/sync_client_driven.go | 143 ++++++++++-------- .../global/consensus_leader_provider.go | 14 ++ .../global/global_consensus_engine.go | 42 +++-- 3 files changed, 124 insertions(+), 75 deletions(-) diff --git a/hypergraph/sync_client_driven.go b/hypergraph/sync_client_driven.go index 1147acd..ca723fb 100644 --- a/hypergraph/sync_client_driven.go +++ b/hypergraph/sync_client_driven.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/hex" - "fmt" "io" "slices" "strings" @@ -26,6 +25,40 @@ type syncSession struct { store tries.TreeBackingStore } +// isGlobalProverShard returns true if this is the global prover registry shard +// (L1={0,0,0}, L2=0xff*32). Used to enable detailed logging for prover sync +// without adding noise from other shard syncs. +func isGlobalProverShard(shardKey tries.ShardKey) bool { + if shardKey.L1 != [3]byte{0, 0, 0} { + return false + } + for _, b := range shardKey.L2 { + if b != 0xff { + return false + } + } + return true +} + +// isGlobalProverShardBytes checks the same for concatenated byte slice (35 bytes). +func isGlobalProverShardBytes(shardKeyBytes []byte) bool { + if len(shardKeyBytes) != 35 { + return false + } + for i := 0; i < 3; i++ { + if shardKeyBytes[i] != 0x00 { + return false + } + } + for i := 3; i < 35; i++ { + if shardKeyBytes[i] != 0xff { + return false + } + } + return true +} + + // PerformSync implements the server side of the client-driven sync protocol. // The client sends GetBranch and GetLeaves requests, and the server responds // with the requested data. This is simpler than HyperStream because there's @@ -43,13 +76,14 @@ func (hg *HypergraphCRDT) PerformSync( // Session state - initialized on first request var session *syncSession defer func() { - if session != nil && session.snapshot != nil { - hg.snapshotMgr.release(session.snapshot) + if session != nil { + logger.Info("sync session closed", + zap.Duration("duration", time.Since(sessionStart)), + ) + if session.snapshot != nil { + hg.snapshotMgr.release(session.snapshot) + } } - logger.Info( - "PerformSync session finished", - zap.Duration("duration", time.Since(sessionStart)), - ) }() // Process requests until stream closes @@ -158,14 +192,9 @@ func (hg *HypergraphCRDT) initSyncSession( return nil, errors.New("unsupported phase set") } - tree := idSet.GetTree() - logger.Debug( - "sync session initialized", + logger.Info("sync session started", zap.String("shard", hex.EncodeToString(shardKeyBytes)), - zap.Int("phaseSet", int(phaseSet)), - zap.Bool("tree_nil", tree == nil), - zap.Bool("root_nil", tree != nil && tree.Root == nil), - zap.String("snapshot_root", hex.EncodeToString(snapshot.Root())), + zap.String("phase", phaseSet.String()), ) return &syncSession{ @@ -185,12 +214,6 @@ func (hg *HypergraphCRDT) handleGetBranch( ) (*protobufs.HypergraphSyncResponse, error) { tree := session.idSet.GetTree() if tree == nil || tree.Root == nil { - // Empty tree - return empty response - logger.Debug("handleGetBranch: empty tree", - zap.Bool("tree_nil", tree == nil), - zap.Bool("root_nil", tree != nil && tree.Root == nil), - zap.String("path", hex.EncodeToString(packPath(req.Path))), - ) return &protobufs.HypergraphSyncResponse{ Response: &protobufs.HypergraphSyncResponse_Branch{ Branch: &protobufs.HypergraphSyncBranchResponse{ @@ -334,25 +357,6 @@ func (hg *HypergraphCRDT) 
handleGetLeaves( }, nil } - // Debug: log node details to identify snapshot consistency issues - var nodeCommitment []byte - var nodeStore string - switch n := node.(type) { - case *tries.LazyVectorCommitmentBranchNode: - nodeCommitment = n.Commitment - nodeStore = fmt.Sprintf("%p", n.Store) - case *tries.LazyVectorCommitmentLeafNode: - nodeCommitment = n.Commitment - nodeStore = fmt.Sprintf("%p", n.Store) - } - logger.Debug("handleGetLeaves node info", - zap.String("path", hex.EncodeToString(packPath(req.Path))), - zap.String("nodeCommitment", hex.EncodeToString(nodeCommitment)), - zap.String("nodeStore", nodeStore), - zap.String("sessionStore", fmt.Sprintf("%p", session.store)), - zap.Int("contTokenLen", len(req.ContinuationToken)), - ) - // Get all leaves under this node allLeaves := tries.GetAllLeaves( tree.SetType, @@ -413,17 +417,6 @@ func (hg *HypergraphCRDT) handleGetLeaves( _ = i // suppress unused warning } - // Debug: log leaf count details - logger.Debug("handleGetLeaves returning", - zap.String("path", hex.EncodeToString(packPath(req.Path))), - zap.Int("allLeavesLen", len(allLeaves)), - zap.Uint64("totalNonNil", totalNonNil), - zap.Int("startIdx", startIdx), - zap.Int("leavesReturned", len(leaves)), - zap.String("treePtr", fmt.Sprintf("%p", tree)), - zap.String("treeRootPtr", fmt.Sprintf("%p", tree.Root)), - ) - resp := &protobufs.HypergraphSyncLeavesResponse{ Path: req.Path, Leaves: leaves, @@ -494,6 +487,8 @@ func (hg *HypergraphCRDT) SyncFrom( hg.mu.Lock() defer hg.mu.Unlock() + isGlobalProver := isGlobalProverShard(shardKey) + logger := hg.logger.With( zap.String("method", "SyncFrom"), zap.String("shard", hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:]))), @@ -504,7 +499,7 @@ func (hg *HypergraphCRDT) SyncFrom( syncStart := time.Now() defer func() { - logger.Info("SyncFrom completed", zap.Duration("duration", time.Since(syncStart))) + logger.Debug("SyncFrom completed", zap.Duration("duration", time.Since(syncStart))) }() set := hg.getPhaseSet(shardKey, phaseSet) @@ -512,6 +507,12 @@ func (hg *HypergraphCRDT) SyncFrom( return nil, errors.New("unsupported phase set") } + // For global prover sync, capture pre-sync state to detect changes + var preSyncRoot []byte + if isGlobalProver { + preSyncRoot = set.GetTree().Commit(false) + } + shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:]) coveredPrefix := hg.getCoveredPrefix() @@ -522,7 +523,7 @@ func (hg *HypergraphCRDT) SyncFrom( } if syncPoint == nil || len(syncPoint.Commitment) == 0 { - logger.Info("server has no data at sync point") + logger.Debug("server has no data at sync point") // Return current root even if no data was synced root := set.GetTree().Commit(false) return root, nil @@ -536,10 +537,18 @@ func (hg *HypergraphCRDT) SyncFrom( // Step 3: Recompute commitment so future syncs see updated state root := set.GetTree().Commit(false) - logger.Info( - "hypergraph root commit after sync", - zap.String("root", hex.EncodeToString(root)), - ) + + // For global prover, only log if sync didn't converge (the interesting case) + if isGlobalProver && !bytes.Equal(root, expectedRoot) { + logger.Warn( + "global prover sync did not converge", + zap.String("phase", phaseSet.String()), + zap.String("pre_sync_root", hex.EncodeToString(preSyncRoot)), + zap.String("post_sync_root", hex.EncodeToString(root)), + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + zap.Bool("root_changed", !bytes.Equal(preSyncRoot, root)), + ) + } return root, nil } @@ -675,12 +684,24 @@ func (hg *HypergraphCRDT) 
syncSubtree( // If commitments match, subtrees are identical if bytes.Equal(localCommitment, serverBranch.Commitment) { - logger.Debug("subtree matches", - zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))), - ) return nil } + // Log divergence for global prover sync + isGlobalProver := isGlobalProverShardBytes(shardKey) + if isGlobalProver { + logger.Info("global prover sync: commitment divergence", + zap.String("phase", phaseSet.String()), + zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))), + zap.Int("path_depth", len(serverBranch.FullPath)), + zap.String("local_commitment", hex.EncodeToString(localCommitment)), + zap.String("server_commitment", hex.EncodeToString(serverBranch.Commitment)), + zap.Bool("local_has_data", localNode != nil), + zap.Int("server_children", len(serverBranch.Children)), + zap.Bool("server_is_leaf", serverBranch.IsLeaf), + ) + } + // If server node is a leaf or has no children, fetch all leaves if serverBranch.IsLeaf || len(serverBranch.Children) == 0 { return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger) @@ -689,10 +710,6 @@ func (hg *HypergraphCRDT) syncSubtree( // If we have NO local data at this path, fetch all leaves directly. // This avoids N round trips for N children when we need all of them anyway. if localNode == nil { - logger.Debug("no local data at path, fetching all leaves directly", - zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))), - zap.Int("serverChildren", len(serverBranch.Children)), - ) return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger) } diff --git a/node/consensus/global/consensus_leader_provider.go b/node/consensus/global/consensus_leader_provider.go index 8179500..ca7f585 100644 --- a/node/consensus/global/consensus_leader_provider.go +++ b/node/consensus/global/consensus_leader_provider.go @@ -165,6 +165,20 @@ func (p *GlobalLeaderProvider) ProveNextState( ) } + // Collect messages and rebuild shard commitments now that we've acquired + // the proving mutex and validated the prior frame. This prevents race + // conditions where a subsequent OnRankChange would overwrite collectedMessages + // and shardCommitments while we're still proving. 
+ _, err = p.engine.livenessProvider.Collect( + ctx, + prior.Header.FrameNumber+1, + rank, + ) + if err != nil { + frameProvingTotal.WithLabelValues("error").Inc() + return nil, models.NewNoVoteErrorf("could not collect: %+v", err) + } + timer := prometheus.NewTimer(frameProvingDuration) defer timer.ObserveDuration() diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index e45655a..8389a80 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -1662,12 +1662,35 @@ func (e *GlobalConsensusEngine) materialize( zap.String("local_root", hex.EncodeToString(localProverRoot)), ) // Perform blocking hypersync before continuing - result := e.performBlockingProverHypersync( + _ = e.performBlockingProverHypersync( proposer, expectedProverRoot, ) - if result != nil { - updatedProverRoot = result + + // Re-compute local prover root after sync to verify convergence + newLocalRoot, newRootErr := e.computeLocalProverRoot(frameNumber) + if newRootErr != nil { + e.logger.Warn( + "failed to compute local prover root after sync", + zap.Uint64("frame_number", frameNumber), + zap.Error(newRootErr), + ) + } else { + updatedProverRoot = newLocalRoot + if !bytes.Equal(newLocalRoot, expectedProverRoot) { + e.logger.Warn( + "prover root still mismatched after sync - convergence failed", + zap.Uint64("frame_number", frameNumber), + zap.String("expected_root", hex.EncodeToString(expectedProverRoot)), + zap.String("post_sync_local_root", hex.EncodeToString(newLocalRoot)), + ) + } else { + e.logger.Info( + "prover root converged after sync", + zap.Uint64("frame_number", frameNumber), + zap.String("root", hex.EncodeToString(newLocalRoot)), + ) + } } } } @@ -4195,7 +4218,7 @@ func (e *GlobalConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) { frameProvingTotal.WithLabelValues("error").Inc() return } - prior, err := e.clockStore.GetGlobalClockFrameCandidate( + _, err = e.clockStore.GetGlobalClockFrameCandidate( qc.FrameNumber, []byte(qc.Identity()), ) @@ -4204,14 +4227,9 @@ func (e *GlobalConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) { frameProvingTotal.WithLabelValues("error").Inc() return } - _, err = e.livenessProvider.Collect( - context.TODO(), - prior.Header.FrameNumber+1, - newRank, - ) - if err != nil { - return - } + // Note: Collect is called in ProveNextState after tryBeginProvingRank succeeds + // to avoid race conditions where a subsequent OnRankChange overwrites + // collectedMessages and shardCommitments while ProveNextState is still running } func (e *GlobalConsensusEngine) rebuildShardCommitments(
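
A minimal sketch of the ordering this patch moves to, for illustration only: the names below (rankProver, proveNextState, onRankChange, and TryLock standing in for tryBeginProvingRank) are hypothetical stand-ins, not the engine's real types. The point is that the shared proving state is collected only after the proving lock is held, so a later rank change cannot overwrite it mid-proof.

    // Illustrative sketch, not code from this patch. All identifiers here are
    // simplified stand-ins for the pattern described in the commit message.
    package main

    import (
        "fmt"
        "sync"
    )

    type rankProver struct {
        mu        sync.Mutex // stands in for the proving mutex
        collected []string   // stands in for collectedMessages / shardCommitments
    }

    // The rank-change callback no longer collects, so it cannot clobber state
    // that a concurrent prover is still reading.
    func (p *rankProver) onRankChange(newRank uint64) {
        fmt.Println("rank changed to", newRank)
    }

    // The prover acquires the lock first, then collects, then proves.
    func (p *rankProver) proveNextState(rank uint64) error {
        if !p.mu.TryLock() { // stands in for tryBeginProvingRank
            return fmt.Errorf("already proving rank %d", rank)
        }
        defer p.mu.Unlock()

        // Collect only once the lock is held; stands in for livenessProvider.Collect.
        p.collected = []string{"msg-a", "msg-b"}
        fmt.Println("proving rank", rank, "with", len(p.collected), "messages")
        return nil
    }

    func main() {
        p := &rankProver{}
        p.onRankChange(7)
        if err := p.proveNextState(7); err != nil {
            fmt.Println(err)
        }
    }

The patch itself realizes this ordering by calling livenessProvider.Collect inside ProveNextState after the prior frame is validated, and by removing the corresponding Collect call from OnRankChange.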