rearrange collect, reduce sync logging noise

Cassandra Heart 2026-01-28 13:48:42 -06:00
parent 5c97e8d619
commit 97069eafa9
GPG Key ID: 371083BFA6C240AA
3 changed files with 124 additions and 75 deletions


@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"slices"
"strings"
@ -26,6 +25,40 @@ type syncSession struct {
store tries.TreeBackingStore
}
// isGlobalProverShard returns true if this is the global prover registry shard
// (L1={0,0,0}, L2=0xff*32). Used to enable detailed logging for prover sync
// without adding noise from other shard syncs.
func isGlobalProverShard(shardKey tries.ShardKey) bool {
if shardKey.L1 != [3]byte{0, 0, 0} {
return false
}
for _, b := range shardKey.L2 {
if b != 0xff {
return false
}
}
return true
}
// isGlobalProverShardBytes checks the same for a concatenated 35-byte shard key slice.
func isGlobalProverShardBytes(shardKeyBytes []byte) bool {
if len(shardKeyBytes) != 35 {
return false
}
for i := 0; i < 3; i++ {
if shardKeyBytes[i] != 0x00 {
return false
}
}
for i := 3; i < 35; i++ {
if shardKeyBytes[i] != 0xff {
return false
}
}
return true
}
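
For reference, a minimal standalone sketch of the key shape these helpers test for. The ShardKey struct below is an illustrative stand-in for tries.ShardKey, assuming only the 3-byte L1 / 32-byte L2 layout visible above; it is not the real type.

```go
package main

import (
	"bytes"
	"fmt"
)

// ShardKey is an illustrative stand-in for tries.ShardKey: a 3-byte L1
// prefix followed by a 32-byte L2 key, as the checks above assume.
type ShardKey struct {
	L1 [3]byte
	L2 [32]byte
}

func main() {
	// The global prover registry shard: L1 = {0,0,0}, L2 = 0xff repeated 32 times.
	var key ShardKey
	copy(key.L2[:], bytes.Repeat([]byte{0xff}, 32))

	// The 35-byte concatenated form that isGlobalProverShardBytes receives.
	flat := append(key.L1[:], key.L2[:]...)

	fmt.Println(len(flat) == 35)                     // true
	fmt.Println(key.L1 == [3]byte{0, 0, 0})          // true
	fmt.Println(flat[3] == 0xff && flat[34] == 0xff) // true
}
```
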
// PerformSync implements the server side of the client-driven sync protocol.
// The client sends GetBranch and GetLeaves requests, and the server responds
// with the requested data. This is simpler than HyperStream because there's
@ -43,13 +76,14 @@ func (hg *HypergraphCRDT) PerformSync(
// Session state - initialized on first request
var session *syncSession
defer func() {
if session != nil && session.snapshot != nil {
hg.snapshotMgr.release(session.snapshot)
if session != nil {
logger.Info("sync session closed",
zap.Duration("duration", time.Since(sessionStart)),
)
if session.snapshot != nil {
hg.snapshotMgr.release(session.snapshot)
}
}
logger.Info(
"PerformSync session finished",
zap.Duration("duration", time.Since(sessionStart)),
)
}()
// Process requests until stream closes
@ -158,14 +192,9 @@ func (hg *HypergraphCRDT) initSyncSession(
return nil, errors.New("unsupported phase set")
}
tree := idSet.GetTree()
logger.Debug(
"sync session initialized",
logger.Info("sync session started",
zap.String("shard", hex.EncodeToString(shardKeyBytes)),
zap.Int("phaseSet", int(phaseSet)),
zap.Bool("tree_nil", tree == nil),
zap.Bool("root_nil", tree != nil && tree.Root == nil),
zap.String("snapshot_root", hex.EncodeToString(snapshot.Root())),
zap.String("phase", phaseSet.String()),
)
return &syncSession{
@ -185,12 +214,6 @@ func (hg *HypergraphCRDT) handleGetBranch(
) (*protobufs.HypergraphSyncResponse, error) {
tree := session.idSet.GetTree()
if tree == nil || tree.Root == nil {
// Empty tree - return empty response
logger.Debug("handleGetBranch: empty tree",
zap.Bool("tree_nil", tree == nil),
zap.Bool("root_nil", tree != nil && tree.Root == nil),
zap.String("path", hex.EncodeToString(packPath(req.Path))),
)
return &protobufs.HypergraphSyncResponse{
Response: &protobufs.HypergraphSyncResponse_Branch{
Branch: &protobufs.HypergraphSyncBranchResponse{
@ -334,25 +357,6 @@ func (hg *HypergraphCRDT) handleGetLeaves(
}, nil
}
// Debug: log node details to identify snapshot consistency issues
var nodeCommitment []byte
var nodeStore string
switch n := node.(type) {
case *tries.LazyVectorCommitmentBranchNode:
nodeCommitment = n.Commitment
nodeStore = fmt.Sprintf("%p", n.Store)
case *tries.LazyVectorCommitmentLeafNode:
nodeCommitment = n.Commitment
nodeStore = fmt.Sprintf("%p", n.Store)
}
logger.Debug("handleGetLeaves node info",
zap.String("path", hex.EncodeToString(packPath(req.Path))),
zap.String("nodeCommitment", hex.EncodeToString(nodeCommitment)),
zap.String("nodeStore", nodeStore),
zap.String("sessionStore", fmt.Sprintf("%p", session.store)),
zap.Int("contTokenLen", len(req.ContinuationToken)),
)
// Get all leaves under this node
allLeaves := tries.GetAllLeaves(
tree.SetType,
@ -413,17 +417,6 @@ func (hg *HypergraphCRDT) handleGetLeaves(
_ = i // suppress unused warning
}
// Debug: log leaf count details
logger.Debug("handleGetLeaves returning",
zap.String("path", hex.EncodeToString(packPath(req.Path))),
zap.Int("allLeavesLen", len(allLeaves)),
zap.Uint64("totalNonNil", totalNonNil),
zap.Int("startIdx", startIdx),
zap.Int("leavesReturned", len(leaves)),
zap.String("treePtr", fmt.Sprintf("%p", tree)),
zap.String("treeRootPtr", fmt.Sprintf("%p", tree.Root)),
)
resp := &protobufs.HypergraphSyncLeavesResponse{
Path: req.Path,
Leaves: leaves,
@ -494,6 +487,8 @@ func (hg *HypergraphCRDT) SyncFrom(
hg.mu.Lock()
defer hg.mu.Unlock()
isGlobalProver := isGlobalProverShard(shardKey)
logger := hg.logger.With(
zap.String("method", "SyncFrom"),
zap.String("shard", hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:]))),
@ -504,7 +499,7 @@ func (hg *HypergraphCRDT) SyncFrom(
syncStart := time.Now()
defer func() {
logger.Info("SyncFrom completed", zap.Duration("duration", time.Since(syncStart)))
logger.Debug("SyncFrom completed", zap.Duration("duration", time.Since(syncStart)))
}()
set := hg.getPhaseSet(shardKey, phaseSet)
@ -512,6 +507,12 @@ func (hg *HypergraphCRDT) SyncFrom(
return nil, errors.New("unsupported phase set")
}
// For global prover sync, capture pre-sync state to detect changes
var preSyncRoot []byte
if isGlobalProver {
preSyncRoot = set.GetTree().Commit(false)
}
shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:])
coveredPrefix := hg.getCoveredPrefix()
@ -522,7 +523,7 @@ func (hg *HypergraphCRDT) SyncFrom(
}
if syncPoint == nil || len(syncPoint.Commitment) == 0 {
logger.Info("server has no data at sync point")
logger.Debug("server has no data at sync point")
// Return current root even if no data was synced
root := set.GetTree().Commit(false)
return root, nil
@ -536,10 +537,18 @@ func (hg *HypergraphCRDT) SyncFrom(
// Step 3: Recompute commitment so future syncs see updated state
root := set.GetTree().Commit(false)
logger.Info(
"hypergraph root commit after sync",
zap.String("root", hex.EncodeToString(root)),
)
// For global prover, only log if sync didn't converge (the interesting case)
if isGlobalProver && !bytes.Equal(root, expectedRoot) {
logger.Warn(
"global prover sync did not converge",
zap.String("phase", phaseSet.String()),
zap.String("pre_sync_root", hex.EncodeToString(preSyncRoot)),
zap.String("post_sync_root", hex.EncodeToString(root)),
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
zap.Bool("root_changed", !bytes.Equal(preSyncRoot, root)),
)
}
return root, nil
}
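
To see the convergence bookkeeping in SyncFrom in isolation, a small self-contained sketch follows; plain byte slices stand in for tree commitments, and fakeSync is a hypothetical stub for the actual branch/leaf exchange with the server.

```go
package main

import (
	"bytes"
	"fmt"
)

// fakeSync is a hypothetical stub for the branch/leaf exchange with the
// server; here it simply adopts the server state so the roots converge.
func fakeSync(local []byte, server []byte) []byte {
	return append([]byte{}, server...)
}

func main() {
	expectedRoot := []byte{0xaa, 0xbb} // root advertised by the server
	preSyncRoot := []byte{0x01, 0x02}  // local root captured before syncing

	postSyncRoot := fakeSync(preSyncRoot, expectedRoot)

	if !bytes.Equal(postSyncRoot, expectedRoot) {
		// The interesting case worth logging: the sync did not converge.
		fmt.Printf("did not converge; root_changed=%v\n", !bytes.Equal(preSyncRoot, postSyncRoot))
	} else {
		fmt.Println("converged")
	}
}
```
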
@ -675,12 +684,24 @@ func (hg *HypergraphCRDT) syncSubtree(
// If commitments match, subtrees are identical
if bytes.Equal(localCommitment, serverBranch.Commitment) {
logger.Debug("subtree matches",
zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
)
return nil
}
// Log divergence for global prover sync
isGlobalProver := isGlobalProverShardBytes(shardKey)
if isGlobalProver {
logger.Info("global prover sync: commitment divergence",
zap.String("phase", phaseSet.String()),
zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
zap.Int("path_depth", len(serverBranch.FullPath)),
zap.String("local_commitment", hex.EncodeToString(localCommitment)),
zap.String("server_commitment", hex.EncodeToString(serverBranch.Commitment)),
zap.Bool("local_has_data", localNode != nil),
zap.Int("server_children", len(serverBranch.Children)),
zap.Bool("server_is_leaf", serverBranch.IsLeaf),
)
}
// If server node is a leaf or has no children, fetch all leaves
if serverBranch.IsLeaf || len(serverBranch.Children) == 0 {
return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger)
@ -689,10 +710,6 @@ func (hg *HypergraphCRDT) syncSubtree(
// If we have NO local data at this path, fetch all leaves directly.
// This avoids N round trips for N children when we need all of them anyway.
if localNode == nil {
logger.Debug("no local data at path, fetching all leaves directly",
zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
zap.Int("serverChildren", len(serverBranch.Children)),
)
return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger)
}
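
As a rough end-to-end illustration of the descent rules in syncSubtree (skip matching subtrees; bulk-fetch leaves at a leaf, a childless node, or when there is no local data; otherwise recurse per child), here is a simplified self-contained walker. The branch type, remote interface, and fakeRemote below are stand-ins for the real protobuf messages and RPC stream, not the actual API.

```go
package main

import (
	"bytes"
	"fmt"
)

// branch is a simplified stand-in for HypergraphSyncBranchResponse.
type branch struct {
	Commitment []byte
	Children   []byte // indices of children present on the server
	IsLeaf     bool
}

// remote is a stand-in for the client side of the sync stream.
type remote interface {
	GetBranch(path []byte) branch
	GetLeaves(path []byte) [][]byte
}

// syncSubtree mirrors the decision structure above: equal commitments mean
// the subtrees are identical; a leaf, a childless node, or missing local
// data triggers one bulk leaf fetch; otherwise descend child by child.
// local returns the local commitment at path, or nil when there is no data.
func syncSubtree(r remote, path []byte, local func([]byte) []byte) [][]byte {
	b := r.GetBranch(path)
	lc := local(path)
	if bytes.Equal(lc, b.Commitment) {
		return nil // identical subtree, nothing to do
	}
	if b.IsLeaf || len(b.Children) == 0 || lc == nil {
		return r.GetLeaves(path) // one bulk fetch instead of N child round trips
	}
	var leaves [][]byte
	for _, child := range b.Children {
		childPath := append(append([]byte{}, path...), child)
		leaves = append(leaves, syncSubtree(r, childPath, local)...)
	}
	return leaves
}

// fakeRemote is a toy server with one root branch and leaf children.
type fakeRemote struct{}

func (fakeRemote) GetBranch(path []byte) branch {
	if len(path) == 0 {
		return branch{Commitment: []byte{1}, Children: []byte{0, 1}}
	}
	return branch{Commitment: []byte{2}, IsLeaf: true}
}

func (fakeRemote) GetLeaves(path []byte) [][]byte {
	return [][]byte{append([]byte("leaf@"), path...)}
}

func main() {
	// With no local data at the root, the walker does a single bulk fetch.
	leaves := syncSubtree(fakeRemote{}, nil, func([]byte) []byte { return nil })
	fmt.Println(len(leaves)) // 1
}
```

The point of the branching is the same as in the diff: round trips stay proportional to the diverging portion of the tree rather than to its total size.
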


@ -165,6 +165,20 @@ func (p *GlobalLeaderProvider) ProveNextState(
)
}
// Collect messages and rebuild shard commitments now that we've acquired
// the proving mutex and validated the prior frame. This prevents race
// conditions where a subsequent OnRankChange would overwrite collectedMessages
// and shardCommitments while we're still proving.
_, err = p.engine.livenessProvider.Collect(
ctx,
prior.Header.FrameNumber+1,
rank,
)
if err != nil {
frameProvingTotal.WithLabelValues("error").Inc()
return nil, models.NewNoVoteErrorf("could not collect: %+v", err)
}
timer := prometheus.NewTimer(frameProvingDuration)
defer timer.ObserveDuration()
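
A small self-contained sketch of the ordering this change enforces: the per-rank collection only happens while the proving mutex is held. The provingState type and its methods are hypothetical; only the lock-then-collect ordering mirrors the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// provingState is a hypothetical holder of the per-rank data the real engine
// keeps in collectedMessages and shardCommitments.
type provingState struct {
	mu        sync.Mutex
	rank      uint64
	collected []string
}

// tryBeginProvingRank acquires the proving lock for the given rank; the real
// method presumably also validates that proving this rank is allowed.
func (p *provingState) tryBeginProvingRank(rank uint64) bool {
	p.mu.Lock()
	p.rank = rank
	return true
}

// collect rebuilds the per-rank messages. Because it runs while the proving
// mutex is held, any other writer that goes through the same mutex must wait
// until endProving, so the data cannot be overwritten mid-proof.
func (p *provingState) collect(frame uint64, rank uint64) {
	p.collected = []string{fmt.Sprintf("frame=%d rank=%d", frame, rank)}
}

func (p *provingState) endProving() { p.mu.Unlock() }

func main() {
	p := &provingState{}
	if p.tryBeginProvingRank(7) {
		p.collect(101, 7) // collect only after the proving mutex is held
		fmt.Println(p.collected)
		p.endProving()
	}
}
```
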


@ -1662,12 +1662,35 @@ func (e *GlobalConsensusEngine) materialize(
zap.String("local_root", hex.EncodeToString(localProverRoot)),
)
// Perform blocking hypersync before continuing
result := e.performBlockingProverHypersync(
_ = e.performBlockingProverHypersync(
proposer,
expectedProverRoot,
)
if result != nil {
updatedProverRoot = result
// Re-compute local prover root after sync to verify convergence
newLocalRoot, newRootErr := e.computeLocalProverRoot(frameNumber)
if newRootErr != nil {
e.logger.Warn(
"failed to compute local prover root after sync",
zap.Uint64("frame_number", frameNumber),
zap.Error(newRootErr),
)
} else {
updatedProverRoot = newLocalRoot
if !bytes.Equal(newLocalRoot, expectedProverRoot) {
e.logger.Warn(
"prover root still mismatched after sync - convergence failed",
zap.Uint64("frame_number", frameNumber),
zap.String("expected_root", hex.EncodeToString(expectedProverRoot)),
zap.String("post_sync_local_root", hex.EncodeToString(newLocalRoot)),
)
} else {
e.logger.Info(
"prover root converged after sync",
zap.Uint64("frame_number", frameNumber),
zap.String("root", hex.EncodeToString(newLocalRoot)),
)
}
}
}
}
@ -4195,7 +4218,7 @@ func (e *GlobalConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) {
frameProvingTotal.WithLabelValues("error").Inc()
return
}
prior, err := e.clockStore.GetGlobalClockFrameCandidate(
_, err = e.clockStore.GetGlobalClockFrameCandidate(
qc.FrameNumber,
[]byte(qc.Identity()),
)
@ -4204,14 +4227,9 @@ func (e *GlobalConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) {
frameProvingTotal.WithLabelValues("error").Inc()
return
}
_, err = e.livenessProvider.Collect(
context.TODO(),
prior.Header.FrameNumber+1,
newRank,
)
if err != nil {
return
}
// Note: Collect is called in ProveNextState after tryBeginProvingRank succeeds
// to avoid race conditions where a subsequent OnRankChange overwrites
// collectedMessages and shardCommitments while ProveNextState is still running
}
func (e *GlobalConsensusEngine) rebuildShardCommitments(