From 4629c50a75e4ed9413b74d8f41cf5d3dedef7452 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Sun, 28 Dec 2025 16:40:06 -0600 Subject: [PATCH] remove pruning from sync --- hypergraph/sync.go | 457 ++----------------------------------------- node/store/pebble.go | 20 ++ 2 files changed, 39 insertions(+), 438 deletions(-) diff --git a/hypergraph/sync.go b/hypergraph/sync.go index b09818c..dea9226 100644 --- a/hypergraph/sync.go +++ b/hypergraph/sync.go @@ -563,18 +563,10 @@ func (s *streamManager) updateActivity() { s.lastSent = time.Now() } -type vertexTreeDeleter interface { - DeleteVertexTree( - txn tries.TreeBackingStoreTransaction, - id []byte, - ) error -} - const ( leafAckMinTimeout = 30 * time.Second leafAckMaxTimeout = 10 * time.Minute leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead - pruneTxnChunk = 100 // Session-level timeouts maxSyncSessionDuration = 15 * time.Minute // Maximum total time for a sync session @@ -1275,399 +1267,6 @@ func (s *streamManager) handleLeafData( return nil } -func (s *streamManager) deleteVertexTreeIfNeeded( - txn tries.TreeBackingStoreTransaction, - atom hypergraph.Atom, - key []byte, -) error { - if atom == nil || atom.GetAtomType() != hypergraph.VertexAtomType { - return nil - } - - deleter, ok := s.hypergraphStore.(vertexTreeDeleter) - if !ok { - return nil - } - - return deleter.DeleteVertexTree(txn, key) -} - -// handleLeafDataWithMerge receives leaf data from the server and merges it with -// local data. -// 1. Collects all local leaf keys at the path first -// 2. Receives and adds all remote leaves, tracking which keys were received -// 3. Only after successful receipt, prunes local keys that weren't in the -// remote set -// -// This ensures data is never lost due to mid-sync failures or incomplete -// transfers. -func (s *streamManager) handleLeafDataWithMerge( - path []int32, - incomingLeaves <-chan *protobufs.HypergraphComparison, -) (err error) { - start := time.Now() - pathHex := hex.EncodeToString(packPath(path)) - var expectedLeaves uint64 - s.logger.Debug( - "handle leaf data with merge start", - zap.String("path", pathHex), - ) - defer func() { - s.logger.Debug( - "handle leaf data with merge finished", - zap.String("path", pathHex), - zap.Uint64("leaves_expected", expectedLeaves), - zap.Duration("duration", time.Since(start)), - zap.Error(err), - ) - }() - - // Collect all local leaf keys at this path - intPath := make([]int, len(path)) - for i, nib := range path { - intPath[i] = int(nib) - } - - localKeys := make(map[string]*tries.LazyVectorCommitmentLeafNode) - node, err := s.localTree.GetByPath(intPath) - if err != nil && !strings.Contains(err.Error(), "item not found") { - return errors.Wrap(err, "handle leaf data with merge") - } - if node != nil { - if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok { - localKeys[string(leaf.Key)] = leaf - } else { - gathered := tries.GetAllLeaves( - s.localTree.SetType, - s.localTree.PhaseType, - s.localTree.ShardKey, - node, - ) - for _, leaf := range gathered { - if leaf != nil { - localKeys[string(leaf.Key)] = leaf - } - } - } - } - - s.logger.Debug( - "collected local keys for merge", - zap.String("path", pathHex), - zap.Int("local_key_count", len(localKeys)), - ) - - // Receive metadata with expected leaf count - select { - case <-s.ctx.Done(): - return errors.Wrap( - errors.New("context canceled"), - "handle leaf data with merge", - ) - case msg, ok := <-incomingLeaves: - if !ok { - return errors.Wrap( - errors.New("channel closed"), - "handle leaf data with merge", - ) - } - - switch msg.Payload.(type) { - case *protobufs.HypergraphComparison_LeafData: - return errors.Wrap( - errors.New("invalid message"), - "handle leaf data with merge", - ) - case *protobufs.HypergraphComparison_Metadata: - expectedLeaves = msg.GetMetadata().Leaves - } - case <-time.After(30 * time.Second): - return errors.Wrap( - errors.New("timed out"), - "handle leaf data with merge", - ) - } - - // Receive all leaves and add them, tracking received keys - receivedKeys := make(map[string]struct{}, expectedLeaves) - var txn tries.TreeBackingStoreTransaction - - for i := uint64(0); i < expectedLeaves; i++ { - if i%100 == 0 { - if txn != nil { - if err := txn.Commit(); err != nil { - return errors.Wrap(err, "handle leaf data with merge") - } - } - txn, err = s.hypergraphStore.NewTransaction(false) - if err != nil { - return errors.Wrap(err, "handle leaf data with merge") - } - } - select { - case <-s.ctx.Done(): - if txn != nil { - txn.Abort() - } - return errors.Wrap( - errors.New("context canceled"), - "handle leaf data with merge", - ) - case msg, ok := <-incomingLeaves: - if !ok { - if txn != nil { - txn.Abort() - } - return errors.Wrap( - errors.New("channel closed"), - "handle leaf data with merge", - ) - } - - var remoteUpdate *protobufs.LeafData - switch msg.Payload.(type) { - case *protobufs.HypergraphComparison_Metadata: - if txn != nil { - txn.Abort() - } - return errors.Wrap( - errors.New("invalid message"), - "handle leaf data with merge", - ) - case *protobufs.HypergraphComparison_LeafData: - remoteUpdate = msg.GetLeafData() - } - - // Track this key as received from server - receivedKeys[string(remoteUpdate.Key)] = struct{}{} - - theirs := AtomFromBytes(remoteUpdate.Value) - if err := s.persistLeafTree(txn, remoteUpdate); err != nil { - txn.Abort() - return err - } - - if err := s.localSet.Add(txn, theirs); err != nil { - s.logger.Error("error while saving", zap.Error(err)) - txn.Abort() - return errors.Wrap(err, "handle leaf data with merge") - } - case <-time.After(30 * time.Second): - if txn != nil { - txn.Abort() - } - return errors.Wrap( - errors.New("timed out"), - "handle leaf data with merge", - ) - } - } - - if txn != nil { - if err := txn.Commit(); err != nil { - return errors.Wrap(err, "handle leaf data with merge") - } - txn = nil - } - - // Prune local keys that weren't in the received set - // Only do this AFTER successfully receiving all remote data - var prunedCount uint64 - for keyStr, leaf := range localKeys { - if _, received := receivedKeys[keyStr]; !received { - // This key exists locally but was not sent by server - prune it - if txn == nil { - txn, err = s.hypergraphStore.NewTransaction(false) - if err != nil { - return errors.Wrap(err, "handle leaf data with merge") - } - } else if prunedCount%pruneTxnChunk == 0 { - if err := txn.Commit(); err != nil { - return errors.Wrap(err, "handle leaf data with merge") - } - txn, err = s.hypergraphStore.NewTransaction(false) - if err != nil { - return errors.Wrap(err, "handle leaf data with merge") - } - } - - atom := AtomFromBytes(leaf.Value) - if atom == nil { - if txn != nil { - txn.Abort() - } - return errors.Wrap( - errors.New("invalid atom payload"), - "handle leaf data with merge", - ) - } - - if err := s.localSet.Delete(txn, atom); err != nil { - if txn != nil { - txn.Abort() - } - return errors.Wrap(err, "handle leaf data with merge") - } - - if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil { - if txn != nil { - txn.Abort() - } - return errors.Wrap(err, "handle leaf data with merge") - } - - prunedCount++ - } - } - - if txn != nil { - if err := txn.Commit(); err != nil { - return errors.Wrap(err, "handle leaf data with merge") - } - } - - s.logger.Info( - "merge complete", - zap.String("path", pathHex), - zap.Uint64("received", expectedLeaves), - zap.Int("local_before", len(localKeys)), - zap.Uint64("pruned", prunedCount), - ) - - // Send acknowledgment - if err := s.stream.Send(&protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_Metadata{ - Metadata: &protobufs.HypersyncMetadata{Leaves: expectedLeaves}, - }, - }); err != nil { - return err - } - - return nil -} - -func (s *streamManager) pruneLocalSubtree(path []int32) (uint64, error) { - start := time.Now() - pathHex := hex.EncodeToString(packPath(path)) - s.logger.Info( - "CLIENT: pruning subtree", - zap.String("path", pathHex), - ) - - intPath := make([]int, len(path)) - for i, nib := range path { - intPath[i] = int(nib) - } - - node, err := s.localTree.GetByPath(intPath) - if err != nil { - // "item not found" means the tree is empty at this path - nothing to prune - if strings.Contains(err.Error(), "item not found") { - s.logger.Debug( - "CLIENT: prune skipped, item not found", - zap.String("path", pathHex), - ) - return 0, nil - } - return 0, errors.Wrap(err, "prune local subtree") - } - - if node == nil { - s.logger.Debug( - "CLIENT: prune skipped, node missing", - zap.String("path", pathHex), - ) - return 0, nil - } - - leaves := []*tries.LazyVectorCommitmentLeafNode{} - if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok { - leaves = append(leaves, leaf) - } else { - gathered := tries.GetAllLeaves( - s.localTree.SetType, - s.localTree.PhaseType, - s.localTree.ShardKey, - node, - ) - for _, leaf := range gathered { - if leaf == nil { - continue - } - leaves = append(leaves, leaf) - } - } - - if len(leaves) == 0 { - s.logger.Debug( - "CLIENT: prune skipped, no leaves", - zap.String("path", pathHex), - ) - return 0, nil - } - - var txn tries.TreeBackingStoreTransaction - var pruned uint64 - - commitTxn := func() error { - if txn == nil { - return nil - } - if err := txn.Commit(); err != nil { - txn.Abort() - return err - } - txn = nil - return nil - } - - for idx, leaf := range leaves { - if idx%pruneTxnChunk == 0 { - if err := commitTxn(); err != nil { - return pruned, errors.Wrap(err, "prune local subtree") - } - txn, err = s.hypergraphStore.NewTransaction(false) - if err != nil { - return pruned, errors.Wrap(err, "prune local subtree") - } - } - - atom := AtomFromBytes(leaf.Value) - if atom == nil { - txn.Abort() - return pruned, errors.Wrap( - errors.New("invalid atom payload"), - "prune local subtree", - ) - } - - if err := s.localSet.Delete(txn, atom); err != nil { - txn.Abort() - return pruned, errors.Wrap(err, "prune local subtree") - } - - if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil { - txn.Abort() - return pruned, errors.Wrap(err, "prune local subtree") - } - - pruned++ - } - - if err := commitTxn(); err != nil { - return pruned, errors.Wrap(err, "prune local subtree") - } - - s.logger.Info( - "CLIENT: pruned local subtree", - zap.String("path", pathHex), - zap.Uint64("leaf_count", pruned), - zap.Duration("duration", time.Since(start)), - ) - - return pruned, nil -} - func (s *streamManager) persistLeafTree( txn tries.TreeBackingStoreTransaction, update *protobufs.LeafData, @@ -1888,7 +1487,7 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } else { // Merge remote data with local, pruning only what server doesn't have - err := s.handleLeafDataWithMerge(path, incomingLeaves) + err := s.handleLeafData(incomingLeaves) return errors.Wrap(err, "walk") } } @@ -1903,7 +1502,7 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } else { // Merge remote data with local, pruning only what server doesn't have - err := s.handleLeafDataWithMerge(path, incomingLeaves) + err := s.handleLeafData(incomingLeaves) return errors.Wrap(err, "walk") } } @@ -1955,9 +1554,9 @@ func (s *streamManager) walk( ) return errors.Wrap(err, "walk") } else { - // Client has data at lpref that server doesn't have - prune it - _, err := s.pruneLocalSubtree(lpref) - return errors.Wrap(err, "walk") + // Client has data at lpref that server doesn't have + // Skip - pruning happens after sync completes + return nil } } } @@ -2009,7 +1608,7 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } else { // Merge server data with local at preTraversal path - err := s.handleLeafDataWithMerge(preTraversal, incomingLeaves) + err := s.handleLeafData(incomingLeaves) return errors.Wrap(err, "walk") } } @@ -2024,12 +1623,9 @@ func (s *streamManager) walk( ); err != nil { return errors.Wrap(err, "walk") } - } else { - // Client has extra data that server doesn't have - prune it - if _, err := s.pruneLocalSubtree(missingPath); err != nil { - return errors.Wrap(err, "walk") - } } + // Client has extra data that server doesn't have + // Skip - pruning happens after sync completes } } // If no child matched the nibble, the local tree doesn't extend @@ -2048,7 +1644,7 @@ func (s *streamManager) walk( } } else { // Client receives and merges data from server at preTraversal path - if err := s.handleLeafDataWithMerge(preTraversal, incomingLeaves); err != nil { + if err := s.handleLeafData(incomingLeaves); err != nil { return errors.Wrap(err, "walk") } } @@ -2097,11 +1693,11 @@ func (s *streamManager) walk( // s.logger.Info("branch divergence", pathString) if lchild != nil && rchild == nil { // Local has a child that remote doesn't have - nextPath := append( - append([]int32{}, lpref...), - lchild.Index, - ) if isServer { + nextPath := append( + append([]int32{}, lpref...), + lchild.Index, + ) // Server has data client doesn't - send it if err := s.sendLeafData( nextPath, @@ -2109,13 +1705,9 @@ func (s *streamManager) walk( ); err != nil { return errors.Wrap(err, "walk") } - } else { - // Client has data server doesn't - prune it directly - // (no protocol exchange needed, server has nothing to send) - if _, err := s.pruneLocalSubtree(nextPath); err != nil { - return errors.Wrap(err, "walk") - } } + // Client has data server doesn't + // Skip - pruning happens after sync completes } if rchild != nil && lchild == nil { // Remote has a child that local doesn't have @@ -2147,20 +1739,9 @@ func (s *streamManager) walk( ) if err != nil { // s.logger.Debug("incomplete branch descension", zap.Error(err)) - if isServer { - if err := s.sendLeafData( - nextPath, - incomingLeaves, - ); err != nil { - return errors.Wrap(err, "walk") - } - } else { - // Server will send data, merge with local - if err := s.handleLeafDataWithMerge(nextPath, incomingLeaves); err != nil { - return errors.Wrap(err, "walk") - } - } - continue + // Don't try to merge/prune on error - the connection may have failed + // and we don't want to delete local data based on incomplete info + return errors.Wrap(err, "walk") } if err = s.walk( @@ -2191,7 +1772,7 @@ func (s *streamManager) walk( } } else { // Merge server data with local, pruning only what server doesn't have - err := s.handleLeafDataWithMerge(path, incomingLeaves) + err := s.handleLeafData(incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } diff --git a/node/store/pebble.go b/node/store/pebble.go index e78cb76..594e451 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -72,6 +72,10 @@ var pebbleMigrations = []func(*pebble.Batch) error{ migration_2_1_0_183, migration_2_1_0_184, migration_2_1_0_185, + migration_2_1_0_186, + migration_2_1_0_187, + migration_2_1_0_188, + migration_2_1_0_189, } func NewPebbleDB( @@ -798,6 +802,22 @@ func migration_2_1_0_184(b *pebble.Batch) error { } func migration_2_1_0_185(b *pebble.Batch) error { + return nil +} + +func migration_2_1_0_186(b *pebble.Batch) error { + return nil +} + +func migration_2_1_0_187(b *pebble.Batch) error { + return nil +} + +func migration_2_1_0_188(b *pebble.Batch) error { + return nil +} + +func migration_2_1_0_189(b *pebble.Batch) error { return migration_2_1_0_18(b) }