From 7d02708f4f4fde0dc0a14354c552ea3923f191a2 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Tue, 23 Dec 2025 21:00:22 -0600 Subject: [PATCH] resolve sync issue, remove raw sync --- hypergraph/snapshot_manager.go | 15 +- hypergraph/sync.go | 599 +++--------------- node/consensus/app/app_consensus_engine.go | 75 ++- node/consensus/global/event_distributor.go | 4 + .../global/global_consensus_engine.go | 111 +++- node/consensus/provers/prover_registry.go | 13 + node/store/pebble.go | 5 + types/tries/lazy_proof_tree.go | 17 +- 8 files changed, 288 insertions(+), 551 deletions(-) diff --git a/hypergraph/snapshot_manager.go b/hypergraph/snapshot_manager.go index 4ba90a7..269041d 100644 --- a/hypergraph/snapshot_manager.go +++ b/hypergraph/snapshot_manager.go @@ -277,22 +277,23 @@ func (m *snapshotManager) acquire( handle.acquire() return handle } - // Generation exists but no snapshot for this shard yet - m.logger.Info( - "generation matches expected root but no snapshot exists, using latest", + // Generation exists but no snapshot for this shard yet - reject + m.logger.Warn( + "generation matches expected root but no snapshot exists, rejecting sync request", zap.String("expected_root", hex.EncodeToString(expectedRoot)), ) - break + return nil } } - // No matching generation found + // No matching generation found - reject instead of falling back to latest if m.logger != nil { - m.logger.Info( - "no snapshot generation matches expected root, using latest", + m.logger.Warn( + "no snapshot generation matches expected root, rejecting sync request", zap.String("expected_root", hex.EncodeToString(expectedRoot)), zap.String("latest_root", hex.EncodeToString(m.generations[0].root)), ) } + return nil } // Use the latest generation for new snapshots diff --git a/hypergraph/sync.go b/hypergraph/sync.go index 265b3c0..52ac350 100644 --- a/hypergraph/sync.go +++ b/hypergraph/sync.go @@ -563,14 +563,6 @@ func (s *streamManager) updateActivity() { s.lastSent = time.Now() } -type rawVertexSaver interface { - SaveVertexTreeRaw( - txn tries.TreeBackingStoreTransaction, - id []byte, - data []byte, - ) error -} - type vertexTreeDeleter interface { DeleteVertexTree( txn tries.TreeBackingStoreTransaction, @@ -603,488 +595,6 @@ func leafAckTimeout(count uint64) time.Duration { return timeout } -func shouldUseRawSync(phaseSet protobufs.HypergraphPhaseSet) bool { - return phaseSet == protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS -} - -func keyWithinCoveredPrefix(key []byte, prefix []int) bool { - if len(prefix) == 0 { - return true - } - path := tries.GetFullPath(key) - if len(path) < len(prefix) { - return false - } - for i, nib := range prefix { - if path[i] != nib { - return false - } - } - return true -} - -// rawShardSync performs a full raw sync of all leaves from server to client. -// This iterates directly over the database, bypassing in-memory tree caching -// to ensure all leaves are sent even if the in-memory tree is stale. 
-func (s *streamManager) rawShardSync( - shardKey tries.ShardKey, - phaseSet protobufs.HypergraphPhaseSet, - incomingLeaves <-chan *protobufs.HypergraphComparison, - coveredPrefix []int32, -) error { - shardHex := hex.EncodeToString(shardKey.L2[:]) - s.logger.Info( - "SERVER: starting raw shard sync (direct DB iteration)", - zap.String("shard_key", shardHex), - ) - start := time.Now() - prefix := toIntSlice(coveredPrefix) - - // Determine set and phase type strings - setType := string(hypergraph.VertexAtomType) - phaseType := string(hypergraph.AddsPhaseType) - switch phaseSet { - case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS: - setType = string(hypergraph.VertexAtomType) - phaseType = string(hypergraph.AddsPhaseType) - case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES: - setType = string(hypergraph.VertexAtomType) - phaseType = string(hypergraph.RemovesPhaseType) - case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS: - setType = string(hypergraph.HyperedgeAtomType) - phaseType = string(hypergraph.AddsPhaseType) - case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES: - setType = string(hypergraph.HyperedgeAtomType) - phaseType = string(hypergraph.RemovesPhaseType) - } - - // Get raw leaf iterator from the database - iter, err := s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey) - if err != nil { - s.logger.Error( - "SERVER: failed to create raw leaf iterator", - zap.String("shard_key", shardHex), - zap.Error(err), - ) - return errors.Wrap(err, "raw shard sync") - } - defer iter.Close() - - // First pass: count leaves - var count uint64 - for valid := iter.First(); valid; valid = iter.Next() { - leaf, err := iter.Leaf() - if err != nil { - // Skip non-leaf nodes (branches) - continue - } - if leaf != nil && keyWithinCoveredPrefix(leaf.Key, prefix) { - count++ - } - } - - s.logger.Info( - "SERVER: raw sync sending metadata", - zap.String("shard_key", shardHex), - zap.Uint64("leaf_count", count), - ) - - // Send metadata with leaf count - if err := s.stream.Send(&protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_Metadata{ - Metadata: &protobufs.HypersyncMetadata{Leaves: count}, - }, - }); err != nil { - return errors.Wrap(err, "raw shard sync: send metadata") - } - - // Create new iterator for sending (previous one is exhausted) - iter.Close() - iter, err = s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey) - if err != nil { - return errors.Wrap(err, "raw shard sync: recreate iterator") - } - defer iter.Close() - - // Second pass: send leaves - var sent uint64 - for valid := iter.First(); valid; valid = iter.Next() { - select { - case <-s.ctx.Done(): - return s.ctx.Err() - default: - } - - leaf, err := iter.Leaf() - if err != nil { - // Skip non-leaf nodes - continue - } - if leaf == nil { - continue - } - if !keyWithinCoveredPrefix(leaf.Key, prefix) { - continue - } - - update := &protobufs.LeafData{ - Key: leaf.Key, - Value: leaf.Value, - HashTarget: leaf.HashTarget, - Size: leaf.Size, - UnderlyingData: leaf.UnderlyingData, - } - - msg := &protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_LeafData{ - LeafData: update, - }, - } - - if err := s.stream.Send(msg); err != nil { - return errors.Wrap(err, "raw shard sync: send leaf") - } - - sent++ - // Update activity periodically to prevent idle timeout - if sent%100 == 0 { - s.updateActivity() - } - if sent%1000 == 0 { - s.logger.Debug( - "SERVER: raw sync progress", - zap.Uint64("sent", 
sent), - zap.Uint64("total", count), - ) - } - } - - s.logger.Info( - "SERVER: raw sync sent all leaves, waiting for ack", - zap.String("shard_key", shardHex), - zap.Uint64("sent", sent), - ) - - // Wait for acknowledgment - timeoutTimer := time.NewTimer(leafAckTimeout(count)) - defer timeoutTimer.Stop() - - select { - case <-s.ctx.Done(): - return errors.Wrap(s.ctx.Err(), "raw shard sync: wait ack") - case msg, ok := <-incomingLeaves: - if !ok { - return errors.Wrap(errors.New("channel closed"), "raw shard sync: wait ack") - } - meta := msg.GetMetadata() - if meta == nil { - return errors.Wrap(errors.New("expected metadata ack"), "raw shard sync: wait ack") - } - if meta.Leaves != count { - return errors.Wrap( - fmt.Errorf("ack mismatch: expected %d, got %d", count, meta.Leaves), - "raw shard sync: wait ack", - ) - } - case <-timeoutTimer.C: - return errors.Wrap(errors.New("timeout waiting for ack"), "raw shard sync") - } - - s.logger.Info( - "SERVER: raw shard sync completed", - zap.String("shard_key", shardHex), - zap.Uint64("leaves_sent", sent), - zap.Duration("duration", time.Since(start)), - ) - return nil -} - -// receiveRawShardSync receives a full raw sync of all leaves from server. -// It uses tree insertion to properly build the tree structure on the client. -func (s *streamManager) receiveRawShardSync( - incomingLeaves <-chan *protobufs.HypergraphComparison, -) error { - start := time.Now() - s.logger.Info("CLIENT: starting receiveRawShardSync") - - expectedLeaves, err := s.awaitRawLeafMetadata(incomingLeaves) - if err != nil { - s.logger.Error("CLIENT: failed to receive metadata", zap.Error(err)) - return err - } - - s.logger.Info( - "CLIENT: received metadata", - zap.Uint64("expected_leaves", expectedLeaves), - ) - - var txn tries.TreeBackingStoreTransaction - var processed uint64 - seenKeys := make(map[string]struct{}) - for processed < expectedLeaves { - if processed%100 == 0 { - if txn != nil { - if err := txn.Commit(); err != nil { - return errors.Wrap(err, "receive raw shard sync") - } - } - txn, err = s.hypergraphStore.NewTransaction(false) - if err != nil { - return errors.Wrap(err, "receive raw shard sync") - } - } - - leafMsg, err := s.awaitLeafData(incomingLeaves) - if err != nil { - if txn != nil { - txn.Abort() - } - s.logger.Error( - "CLIENT: failed to receive leaf", - zap.Uint64("processed", processed), - zap.Uint64("expected", expectedLeaves), - zap.Error(err), - ) - return err - } - - // Deserialize the atom from the raw value - theirs := AtomFromBytes(leafMsg.Value) - if theirs == nil { - if txn != nil { - txn.Abort() - } - return errors.Wrap( - errors.New("invalid atom"), - "receive raw shard sync", - ) - } - - // Persist underlying vertex tree data if present - if len(leafMsg.UnderlyingData) > 0 { - if saver, ok := s.hypergraphStore.(rawVertexSaver); ok { - if err := saver.SaveVertexTreeRaw( - txn, - leafMsg.Key, - leafMsg.UnderlyingData, - ); err != nil { - txn.Abort() - return errors.Wrap(err, "receive raw shard sync: save vertex tree") - } - } - } - - // Track key so we can prune anything absent from the authoritative list. 
- seenKeys[string(append([]byte(nil), leafMsg.Key...))] = struct{}{} - - // Use Add to properly build tree structure - if err := s.localSet.Add(txn, theirs); err != nil { - txn.Abort() - return errors.Wrap(err, "receive raw shard sync: add atom") - } - - processed++ - if processed%1000 == 0 { - s.logger.Debug( - "CLIENT: raw sync progress", - zap.Uint64("processed", processed), - zap.Uint64("expected", expectedLeaves), - ) - } - } - - if txn != nil { - if err := txn.Commit(); err != nil { - return errors.Wrap(err, "receive raw shard sync") - } - } - - // Send acknowledgment - if err := s.sendLeafMetadata(expectedLeaves); err != nil { - return errors.Wrap(err, "receive raw shard sync") - } - - if err := s.pruneRawSyncExtras(seenKeys); err != nil { - return errors.Wrap(err, "receive raw shard sync") - } - - s.logger.Info( - "CLIENT: raw shard sync completed", - zap.Uint64("leaves_received", expectedLeaves), - zap.Duration("duration", time.Since(start)), - ) - return nil -} - -func (s *streamManager) pruneRawSyncExtras(seen map[string]struct{}) error { - start := time.Now() - setType := s.localTree.SetType - phaseType := s.localTree.PhaseType - shardKey := s.localTree.ShardKey - - iter, err := s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey) - if err != nil { - return errors.Wrap(err, "prune raw sync extras: iterator") - } - defer iter.Close() - - var txn tries.TreeBackingStoreTransaction - var pruned uint64 - - commitTxn := func() error { - if txn == nil { - return nil - } - if err := txn.Commit(); err != nil { - txn.Abort() - return err - } - txn = nil - return nil - } - - for valid := iter.First(); valid; valid = iter.Next() { - leaf, err := iter.Leaf() - if err != nil || leaf == nil { - continue - } - if _, ok := seen[string(leaf.Key)]; ok { - continue - } - - if txn == nil { - txn, err = s.hypergraphStore.NewTransaction(false) - if err != nil { - return errors.Wrap(err, "prune raw sync extras") - } - } - - atom := AtomFromBytes(leaf.Value) - if atom == nil { - s.logger.Warn("CLIENT: skipping stale leaf with invalid atom", zap.String("key", hex.EncodeToString(leaf.Key))) - continue - } - - if err := s.localSet.Delete(txn, atom); err != nil { - txn.Abort() - return errors.Wrap(err, "prune raw sync extras") - } - if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil { - txn.Abort() - return errors.Wrap(err, "prune raw sync extras") - } - - pruned++ - if pruned%pruneTxnChunk == 0 { - if err := commitTxn(); err != nil { - return errors.Wrap(err, "prune raw sync extras") - } - } - } - - if err := commitTxn(); err != nil { - return errors.Wrap(err, "prune raw sync extras") - } - - if pruned > 0 { - s.logger.Info( - "CLIENT: pruned stale leaves after raw sync", - zap.Uint64("count", pruned), - zap.Duration("duration", time.Since(start)), - ) - } else { - s.logger.Info( - "CLIENT: no stale leaves found after raw sync", - zap.Duration("duration", time.Since(start)), - ) - } - - return nil -} - -func (s *streamManager) awaitRawLeafMetadata( - incomingLeaves <-chan *protobufs.HypergraphComparison, -) (uint64, error) { - s.logger.Debug("CLIENT: awaitRawLeafMetadata waiting...") - select { - case <-s.ctx.Done(): - return 0, errors.Wrap( - errors.New("context canceled"), - "await raw leaf metadata", - ) - case msg, ok := <-incomingLeaves: - if !ok { - s.logger.Error("CLIENT: incomingLeaves channel closed") - return 0, errors.Wrap( - errors.New("channel closed"), - "await raw leaf metadata", - ) - } - meta := msg.GetMetadata() - if meta == nil { - s.logger.Error( - 
"CLIENT: received non-metadata message while waiting for metadata", - zap.String("payload_type", fmt.Sprintf("%T", msg.Payload)), - ) - return 0, errors.Wrap( - errors.New("invalid message: expected metadata"), - "await raw leaf metadata", - ) - } - s.logger.Debug( - "CLIENT: received metadata", - zap.Uint64("leaves", meta.Leaves), - ) - return meta.Leaves, nil - case <-time.After(leafAckTimeout(1)): - s.logger.Error("CLIENT: timeout waiting for metadata") - return 0, errors.Wrap( - errors.New("timed out waiting for metadata"), - "await raw leaf metadata", - ) - } -} - -func (s *streamManager) awaitLeafData( - incomingLeaves <-chan *protobufs.HypergraphComparison, -) (*protobufs.LeafData, error) { - select { - case <-s.ctx.Done(): - return nil, errors.Wrap( - errors.New("context canceled"), - "await leaf data", - ) - case msg, ok := <-incomingLeaves: - if !ok { - return nil, errors.Wrap( - errors.New("channel closed"), - "await leaf data", - ) - } - if leaf := msg.GetLeafData(); leaf != nil { - return leaf, nil - } - return nil, errors.Wrap( - errors.New("invalid message: expected leaf data"), - "await leaf data", - ) - case <-time.After(leafAckTimeout(1)): - return nil, errors.Wrap( - errors.New("timed out waiting for leaf data"), - "await leaf data", - ) - } -} - -func (s *streamManager) sendLeafMetadata(leaves uint64) error { - s.logger.Debug("sending leaf metadata ack", zap.Uint64("leaves", leaves)) - return s.stream.Send(&protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_Metadata{ - Metadata: &protobufs.HypersyncMetadata{Leaves: leaves}, - }, - }) -} - // sendLeafData builds a LeafData message (with the full leaf data) for the // node at the given path in the local tree and sends it over the stream. func (s *streamManager) sendLeafData( @@ -1797,6 +1307,14 @@ func (s *streamManager) pruneLocalSubtree(path []int32) (uint64, error) { node, err := s.localTree.GetByPath(intPath) if err != nil { + // "item not found" means the tree is empty at this path - nothing to prune + if strings.Contains(err.Error(), "item not found") { + s.logger.Debug( + "CLIENT: prune skipped, item not found", + zap.String("path", pathHex), + ) + return 0, nil + } return 0, errors.Wrap(err, "prune local subtree") } @@ -1901,20 +1419,13 @@ func (s *streamManager) persistLeafTree( return nil } - needsValidation := s.requiresTreeValidation() - _, canSaveRaw := s.hypergraphStore.(rawVertexSaver) - - var tree *tries.VectorCommitmentTree - var err error - if needsValidation || !canSaveRaw { - tree, err = tries.DeserializeNonLazyTree(update.UnderlyingData) - if err != nil { - s.logger.Error("server returned invalid tree", zap.Error(err)) - return err - } + tree, err := tries.DeserializeNonLazyTree(update.UnderlyingData) + if err != nil { + s.logger.Error("server returned invalid tree", zap.Error(err)) + return err } - if needsValidation { + if s.requiresTreeValidation() { if err := s.localSet.ValidateTree( update.Key, update.Value, @@ -1925,12 +1436,6 @@ func (s *streamManager) persistLeafTree( } } - if saver, ok := s.hypergraphStore.(rawVertexSaver); ok { - buf := make([]byte, len(update.UnderlyingData)) - copy(buf, update.UnderlyingData) - return saver.SaveVertexTreeRaw(txn, update.Key, buf) - } - return s.hypergraphStore.SaveVertexTree(txn, update.Key, tree) } @@ -2115,21 +1620,23 @@ func (s *streamManager) walk( return nil } - // Check if we should use raw sync mode for this phase set - if init && shouldUseRawSync(phaseSet) { - s.logger.Info( - "walk: using raw sync mode", - 
zap.Bool("is_server", isServer), - zap.Int("phase_set", int(phaseSet)), - ) - if isServer { - return s.rawShardSync(shardKey, phaseSet, incomingLeaves, path) - } - return s.receiveRawShardSync(incomingLeaves) - } - if isLeaf(lnode) && isLeaf(rnode) && !init { - return nil + // Both are leaves with differing commitments - need to sync + // Server sends its leaf, client prunes local and receives server's leaf + if isServer { + err := s.sendLeafData( + path, + incomingLeaves, + ) + return errors.Wrap(err, "walk") + } else { + // Prune local leaf first since it differs from server + if _, err := s.pruneLocalSubtree(path); err != nil { + return errors.Wrap(err, "walk") + } + err := s.handleLeafData(incomingLeaves) + return errors.Wrap(err, "walk") + } } if isLeaf(rnode) || isLeaf(lnode) { @@ -2141,6 +1648,11 @@ func (s *streamManager) walk( ) return errors.Wrap(err, "walk") } else { + // Prune local subtree first - either local is leaf with different data, + // or local is branch with children that server doesn't have + if _, err := s.pruneLocalSubtree(path); err != nil { + return errors.Wrap(err, "walk") + } err := s.handleLeafData(incomingLeaves) return errors.Wrap(err, "walk") } @@ -2157,13 +1669,15 @@ func (s *streamManager) walk( // ) if len(lpref) > len(rpref) { // s.logger.Debug("local prefix longer, traversing remote to path", pathString) - traverse := lpref[len(rpref)-1:] + traverse := lpref[len(rpref):] rtrav := rnode traversePath := append([]int32{}, rpref...) for _, nibble := range traverse { // s.logger.Debug("attempting remote traversal step") + foundMatch := false for _, child := range rtrav.Children { if child.Index == nibble { + foundMatch = true // s.logger.Debug("sending query") traversePath = append(traversePath, child.Index) var err error @@ -2180,7 +1694,9 @@ func (s *streamManager) walk( } } - if rtrav == nil { + // If no child matched or queryNext returned nil, remote doesn't + // have the path that local has + if !foundMatch || rtrav == nil { // s.logger.Debug("traversal could not reach path") if isServer { err := s.sendLeafData( @@ -2209,15 +1725,17 @@ func (s *streamManager) walk( ) } else { // s.logger.Debug("remote prefix longer, traversing local to path", pathString) - traverse := rpref[len(lpref)-1:] + traverse := rpref[len(lpref):] ltrav := lnode traversedPath := append([]int32{}, lnode.Path...) for _, nibble := range traverse { // s.logger.Debug("attempting local traversal step") preTraversal := append([]int32{}, traversedPath...) + foundMatch := false for _, child := range ltrav.Children { if child.Index == nibble { + foundMatch = true traversedPath = append(traversedPath, nibble) var err error // s.logger.Debug("expecting query") @@ -2259,6 +1777,28 @@ func (s *streamManager) walk( } } } + // If no child matched the nibble, the local tree doesn't extend + // to match the remote's deeper prefix. We still need to respond to + // the remote's query with an empty response, then handle the leaf data. 
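The walk() hunks above also correct the traversal slices from lpref[len(rpref)-1:] to lpref[len(rpref):] (and the mirrored rpref[len(lpref):]); the old form started the traversal at the last nibble the two prefixes already share, re-walking a level that had already been matched. A minimal, self-contained sketch of the off-by-one with hypothetical prefix values (an illustration, not code from this patch):

package main

import "fmt"

func main() {
	// Local and remote node prefixes that agree on their common part
	// (hypothetical values; in walk() these are the nodes' nibble paths).
	lpref := []int32{0, 7, 3, 9}
	rpref := []int32{0, 7}

	old := lpref[len(rpref)-1:] // [7 3 9] - starts at the already-shared nibble 7
	fixed := lpref[len(rpref):] // [3 9]   - only the nibbles the remote still needs to descend

	fmt.Println(old, fixed)
}
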
+ if !foundMatch { + // Respond to remote's pending query with our current path info + // (which will have empty commitment since we don't have the path) + traversedPath = append(traversedPath, nibble) + ltrav, _ = s.handleQueryNext(incomingQueries, traversedPath) + + if isServer { + // Server sends its data since client's tree is shallower + if err := s.sendLeafData(preTraversal, incomingLeaves); err != nil { + return errors.Wrap(err, "walk") + } + } else { + // Client receives data from server + if err := s.handleLeafData(incomingLeaves); err != nil { + return errors.Wrap(err, "walk") + } + } + return nil + } } // s.logger.Debug("traversal completed, performing walk", pathString) return s.walk( @@ -2319,6 +1859,11 @@ func (s *streamManager) walk( } } if rchild != nil { + // Remote has a child that local doesn't have + // - If SERVER: remote (client) has extra data, server has nothing to send + // Client will prune this on their side via lchild != nil case + // - If CLIENT: remote (server) has data we need to receive + // Server sends this via their lchild != nil case if !isServer { err := s.handleLeafData(incomingLeaves) if err != nil { @@ -2380,6 +1925,10 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } } else { + // Prune local data first since prefixes differ + if _, err := s.pruneLocalSubtree(path); err != nil { + return errors.Wrap(err, "walk") + } err := s.handleLeafData(incomingLeaves) if err != nil { return errors.Wrap(err, "walk") diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index adef4d4..3d7ebf7 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -997,7 +997,8 @@ func (e *AppConsensusEngine) handleGlobalProverRoot( ) e.globalProverRootSynced.Store(false) e.globalProverRootVerifiedFrame.Store(0) - e.triggerGlobalHypersync(frame.Header.Prover, expectedProverRoot) + // Use blocking hypersync to ensure we're synced before continuing + e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot) return } @@ -1014,7 +1015,8 @@ func (e *AppConsensusEngine) handleGlobalProverRoot( ) e.globalProverRootSynced.Store(false) e.globalProverRootVerifiedFrame.Store(0) - e.triggerGlobalHypersync(frame.Header.Prover, expectedProverRoot) + // Use blocking hypersync to ensure we're synced before continuing + e.performBlockingGlobalHypersync(frame.Header.Prover, expectedProverRoot) return } @@ -1095,6 +1097,75 @@ func (e *AppConsensusEngine) triggerGlobalHypersync(proposer []byte, expectedRoo }() } +// performBlockingGlobalHypersync performs a synchronous hypersync that blocks +// until completion. This is used before materializing frames to ensure we sync +// before applying any transactions when there's a prover root mismatch. 
+func (e *AppConsensusEngine) performBlockingGlobalHypersync(proposer []byte, expectedRoot []byte) { + if e.syncProvider == nil { + e.logger.Debug("blocking hypersync: no sync provider") + return + } + if bytes.Equal(proposer, e.proverAddress) { + e.logger.Debug("blocking hypersync: we are the proposer") + return + } + + // Wait for any existing sync to complete first + for e.globalProverSyncInProgress.Load() { + e.logger.Debug("blocking hypersync: waiting for existing sync to complete") + time.Sleep(100 * time.Millisecond) + } + + // Mark sync as in progress + if !e.globalProverSyncInProgress.CompareAndSwap(false, true) { + // Another sync started, wait for it + for e.globalProverSyncInProgress.Load() { + time.Sleep(100 * time.Millisecond) + } + return + } + defer e.globalProverSyncInProgress.Store(false) + + e.logger.Info( + "performing blocking global hypersync before processing frame", + zap.String("proposer", hex.EncodeToString(proposer)), + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up shutdown handler + done := make(chan struct{}) + go func() { + select { + case <-e.ShutdownSignal(): + cancel() + case <-done: + } + }() + + selfPeerID := peer.ID(e.pubsub.GetPeerID()) + shardKey := tries.ShardKey{ + L1: [3]byte{0x00, 0x00, 0x00}, + L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS, + } + + // Perform sync synchronously (blocking) + e.syncProvider.HyperSyncSelf(ctx, selfPeerID, shardKey, nil, expectedRoot) + close(done) + + if err := e.proverRegistry.Refresh(); err != nil { + e.logger.Warn( + "failed to refresh prover registry after blocking hypersync", + zap.Error(err), + ) + } + + e.globalProverRootSynced.Store(true) + e.logger.Info("blocking global hypersync completed") +} + func (e *AppConsensusEngine) GetFrame() *protobufs.AppShardFrame { frame, _, _ := e.clockStore.GetLatestShardClockFrame(e.appAddress) return frame diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index fef8813..c4703f5 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -118,6 +118,10 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( e.evaluateForProposals(ctx, data, needsProposals) } else { self, effectiveSeniority := e.allocationContext() + // Still reconcile allocations even when all workers appear + // allocated - this clears stale filters that no longer match + // prover allocations on-chain. + e.reconcileWorkerAllocations(data.Frame.Header.FrameNumber, self) e.checkExcessPendingJoins(self, data.Frame.Header.FrameNumber) e.logAllocationStatusOnly(ctx, data, self, effectiveSeniority) } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 88194ad..61de3e4 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -1625,6 +1625,25 @@ func (e *GlobalConsensusEngine) materialize( return errors.Wrap(err, "materialize") } + // Check prover root BEFORE processing transactions. If there's a mismatch, + // we need to sync first, otherwise we'll apply transactions on top of + // divergent state and then sync will delete the newly added records. 
+ if len(expectedProverRoot) > 0 { + localRoot, localErr := e.computeLocalProverRoot(frameNumber) + if localErr == nil && len(localRoot) > 0 { + if !bytes.Equal(localRoot, expectedProverRoot) { + e.logger.Info( + "prover root mismatch detected before processing frame, syncing first", + zap.Uint64("frame_number", frameNumber), + zap.String("expected_root", hex.EncodeToString(expectedProverRoot)), + zap.String("local_root", hex.EncodeToString(localRoot)), + ) + // Perform blocking hypersync before continuing + e.performBlockingProverHypersync(proposer, expectedProverRoot) + } + } + } + var state state.State state = hgstate.NewHypergraphState(e.hypergraph) @@ -1736,23 +1755,13 @@ func (e *GlobalConsensusEngine) materialize( return errors.Wrap(err, "materialize") } - shouldVerifyRoot := !e.config.Engine.ArchiveMode || e.config.P2P.Network == 99 localProverRoot, localRootErr := e.computeLocalProverRoot(frameNumber) if localRootErr != nil { - logMsg := "failed to compute local prover root" - if shouldVerifyRoot { - e.logger.Warn( - logMsg, - zap.Uint64("frame_number", frameNumber), - zap.Error(localRootErr), - ) - } else { - e.logger.Debug( - logMsg, - zap.Uint64("frame_number", frameNumber), - zap.Error(localRootErr), - ) - } + e.logger.Warn( + "failed to compute local prover root", + zap.Uint64("frame_number", frameNumber), + zap.Error(localRootErr), + ) } // Publish the snapshot generation with the new root so clients can sync @@ -1763,7 +1772,7 @@ func (e *GlobalConsensusEngine) materialize( } } - if len(localProverRoot) > 0 && shouldVerifyRoot { + if len(localProverRoot) > 0 { if e.verifyProverRoot( frameNumber, expectedProverRoot, @@ -1985,7 +1994,77 @@ func (e *GlobalConsensusEngine) triggerProverHypersync(proposer []byte, expected }() } +// performBlockingProverHypersync performs a synchronous hypersync that blocks +// until completion. This is used at the start of materialize to ensure we sync +// before applying any transactions when there's a prover root mismatch. 
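This helper and performBlockingGlobalHypersync above share the same gate: wait out any sync already in flight, then claim the in-progress flag with a compare-and-swap, and if another caller wins the race, wait for that sync instead of starting a second one. A stripped-down sketch of that gate with hypothetical names (the real helpers additionally cancel the sync context on shutdown and refresh the prover registry afterwards):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runBlocking mirrors the single-flight gating used by the blocking
// hypersync helpers. The names and the bare sync callback are hypothetical.
func runBlocking(inProgress *atomic.Bool, sync func()) {
	// Wait for any existing sync to complete first.
	for inProgress.Load() {
		time.Sleep(100 * time.Millisecond)
	}
	// Try to claim the flag; if another caller claimed it in the meantime,
	// wait for their sync and return without starting another.
	if !inProgress.CompareAndSwap(false, true) {
		for inProgress.Load() {
			time.Sleep(100 * time.Millisecond)
		}
		return
	}
	defer inProgress.Store(false)
	sync()
}

func main() {
	var flag atomic.Bool
	runBlocking(&flag, func() { fmt.Println("syncing") })
}
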
+func (e *GlobalConsensusEngine) performBlockingProverHypersync(proposer []byte, expectedRoot []byte) { + if e.syncProvider == nil || len(proposer) == 0 { + e.logger.Debug("blocking hypersync: no sync provider or proposer") + return + } + if bytes.Equal(proposer, e.getProverAddress()) { + e.logger.Debug("blocking hypersync: we are the proposer") + return + } + + // Wait for any existing sync to complete first + for e.proverSyncInProgress.Load() { + e.logger.Debug("blocking hypersync: waiting for existing sync to complete") + time.Sleep(100 * time.Millisecond) + } + + // Mark sync as in progress + if !e.proverSyncInProgress.CompareAndSwap(false, true) { + // Another sync started, wait for it + for e.proverSyncInProgress.Load() { + time.Sleep(100 * time.Millisecond) + } + return + } + defer e.proverSyncInProgress.Store(false) + + e.logger.Info( + "performing blocking hypersync before processing frame", + zap.String("proposer", hex.EncodeToString(proposer)), + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up shutdown handler + done := make(chan struct{}) + go func() { + select { + case <-e.ShutdownSignal(): + cancel() + case <-done: + } + }() + + shardKey := tries.ShardKey{ + L1: [3]byte{0x00, 0x00, 0x00}, + L2: intrinsics.GLOBAL_INTRINSIC_ADDRESS, + } + + // Perform sync synchronously (blocking) + e.syncProvider.HyperSync(ctx, proposer, shardKey, nil, expectedRoot) + close(done) + + if err := e.proverRegistry.Refresh(); err != nil { + e.logger.Warn( + "failed to refresh prover registry after blocking hypersync", + zap.Error(err), + ) + } + + e.logger.Info("blocking hypersync completed") +} + func (e *GlobalConsensusEngine) reconcileLocalWorkerAllocations() { + if e.config.Engine.ArchiveMode { + return + } if e.workerManager == nil || e.proverRegistry == nil { return } diff --git a/node/consensus/provers/prover_registry.go b/node/consensus/provers/prover_registry.go index 0b57933..b6180ba 100644 --- a/node/consensus/provers/prover_registry.go +++ b/node/consensus/provers/prover_registry.go @@ -412,6 +412,19 @@ func (r *ProverRegistry) PruneOrphanJoins(frameNumber uint64) error { return nil } + // Reload prover state from hypergraph to ensure deterministic pruning + // across all nodes regardless of in-memory cache state + r.globalTrie = &tries.RollingFrecencyCritbitTrie{} + r.shardTries = make(map[string]*tries.RollingFrecencyCritbitTrie) + r.proverCache = make(map[string]*consensus.ProverInfo) + r.filterCache = make(map[string][]*consensus.ProverInfo) + r.addressToFilters = make(map[string][]string) + + if err := r.extractGlobalState(); err != nil { + r.logger.Error("failed to reload global state before pruning", zap.Error(err)) + return errors.Wrap(err, "prune orphan joins") + } + cutoff := frameNumber - 760 var prunedAllocations int var prunedProvers int diff --git a/node/store/pebble.go b/node/store/pebble.go index 628c882..b19c453 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -67,6 +67,7 @@ var pebbleMigrations = []func(*pebble.Batch) error{ migration_2_1_0_172, migration_2_1_0_173, migration_2_1_0_18, + migration_2_1_0_181, } func NewPebbleDB( @@ -776,6 +777,10 @@ func migration_2_1_0_18(b *pebble.Batch) error { return nil } +func migration_2_1_0_181(b *pebble.Batch) error { + return migration_2_1_0_18(b) +} + type pebbleSnapshotDB struct { snap *pebble.Snapshot } diff --git a/types/tries/lazy_proof_tree.go b/types/tries/lazy_proof_tree.go index b2de5a8..cb54645 
100644 --- a/types/tries/lazy_proof_tree.go +++ b/types/tries/lazy_proof_tree.go @@ -2038,8 +2038,13 @@ func (t *LazyVectorCommitmentTree) Delete( } retNode = nil case 1: + // Identify the child's original path to prevent orphaned storage entries + originalChildPath := slices.Concat(n.FullPrefix, []int{lastChildIndex}) + if childBranch, ok := lastChild.(*LazyVectorCommitmentBranchNode); ok { // Merge this node's prefix with the child's prefix + // Note: We do NOT update FullPrefix because children are stored + // relative to the branch's FullPrefix, and they'd become unreachable mergedPrefix := []int{} mergedPrefix = append(mergedPrefix, n.Prefix...) mergedPrefix = append(mergedPrefix, lastChildIndex) @@ -2048,7 +2053,17 @@ func (t *LazyVectorCommitmentTree) Delete( childBranch.Prefix = mergedPrefix childBranch.Commitment = nil - // Delete this node from storage + // Delete the child from its original path to prevent orphan + _ = t.Store.DeleteNode( + txn, + t.SetType, + t.PhaseType, + t.ShardKey, + generateKeyFromPath(originalChildPath), + originalChildPath, + ) + + // Delete this node (parent) from storage err := t.Store.DeleteNode( txn, t.SetType,
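
The Delete fix in the final hunk comes down to one piece of bookkeeping: node records are keyed by their full path, so when a branch with a single remaining child has that child's prefix merged into it, the record stored under the child's original path no longer corresponds to a reachable node and must be removed explicitly, which is what the added DeleteNode call over originalChildPath does. A hypothetical, map-backed toy illustration of that bookkeeping (not the tree's actual storage API):

package main

import "fmt"

func main() {
	// Toy "store" keyed by node path (hypothetical; the real store keys
	// nodes via generateKeyFromPath under SetType/PhaseType/ShardKey).
	store := map[string][]byte{
		"0/a":   []byte("branch with one remaining child"),
		"0/a/3": []byte("that child"),
	}

	parentPath, originalChildPath := "0/a", "0/a/3"

	// After the prefix merge, the surviving node lives at the parent's
	// position in the tree...
	store[parentPath] = store[originalChildPath]

	// ...so the record at the child's pre-merge path must be deleted too,
	// or it lingers as an unreachable ("orphaned") entry.
	delete(store, originalChildPath)

	fmt.Println(len(store)) // 1 entry left: the merged node
}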