remove pruning from sync

Cassandra Heart 2025-12-28 16:40:06 -06:00
parent 96066fee33
commit 4629c50a75
2 changed files with 39 additions and 438 deletions
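The walk no longer prunes local data in place: the pruneLocalSubtree and handleLeafDataWithMerge paths are deleted, and the replacement comments state that pruning happens after sync completes. The sketch below illustrates that deferred decision in isolation — choosing which locally held keys to drop only once the full set of server-sent keys is known. The helper and the main example are illustrative only, not code from this commit.

package main

import "fmt"

// pruneCandidates returns the keys held locally that the server never sent
// during the sync walk. In a deferred-pruning model this runs only after the
// whole session completed successfully, so a failed or partial transfer never
// deletes local data. (Illustrative helper, not from this commit.)
func pruneCandidates(local, received map[string]struct{}) []string {
	var missing []string
	for key := range local {
		if _, ok := received[key]; !ok {
			missing = append(missing, key)
		}
	}
	return missing
}

func main() {
	local := map[string]struct{}{"a": {}, "b": {}, "c": {}}
	received := map[string]struct{}{"a": {}, "c": {}}
	fmt.Println(pruneCandidates(local, received)) // [b]
}

Deferring the decision this way means a failed or partial transfer leaves local data untouched, which matches the new error-handling comment in walk ("Don't try to merge/prune on error").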


@@ -563,18 +563,10 @@ func (s *streamManager) updateActivity() {
s.lastSent = time.Now()
}
type vertexTreeDeleter interface {
DeleteVertexTree(
txn tries.TreeBackingStoreTransaction,
id []byte,
) error
}
const (
leafAckMinTimeout = 30 * time.Second
leafAckMaxTimeout = 10 * time.Minute
leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead
pruneTxnChunk = 100
// Session-level timeouts
maxSyncSessionDuration = 15 * time.Minute // Maximum total time for a sync session
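Judging by the names, the leaf acknowledgment deadline is scaled from the per-leaf budget and clamped to the min/max bounds above; that computation is not part of this hunk, so the helper below is only a sketch under that assumption, reusing the constant names from the block it would sit next to.

// leafAckTimeout is an illustrative helper (not in this diff) showing one way
// the constants above could combine into a deadline for a leaf transfer.
func leafAckTimeout(leaves uint64) time.Duration {
	// Clamp to the maximum first so a huge leaf count cannot overflow the
	// multiplication below.
	if leaves > uint64(leafAckMaxTimeout/leafAckPerLeafBudget) {
		return leafAckMaxTimeout
	}
	d := time.Duration(leaves) * leafAckPerLeafBudget
	if d < leafAckMinTimeout {
		return leafAckMinTimeout
	}
	return d
}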
@@ -1275,399 +1267,6 @@ func (s *streamManager) handleLeafData(
return nil
}
func (s *streamManager) deleteVertexTreeIfNeeded(
txn tries.TreeBackingStoreTransaction,
atom hypergraph.Atom,
key []byte,
) error {
if atom == nil || atom.GetAtomType() != hypergraph.VertexAtomType {
return nil
}
deleter, ok := s.hypergraphStore.(vertexTreeDeleter)
if !ok {
return nil
}
return deleter.DeleteVertexTree(txn, key)
}
// handleLeafDataWithMerge receives leaf data from the server and merges it with
// local data. It:
// 1. Collects all local leaf keys at the path first
// 2. Receives and adds all remote leaves, tracking which keys were received
// 3. Only after successful receipt, prunes local keys that weren't in the
// remote set
//
// This ensures data is never lost due to mid-sync failures or incomplete
// transfers.
func (s *streamManager) handleLeafDataWithMerge(
path []int32,
incomingLeaves <-chan *protobufs.HypergraphComparison,
) (err error) {
start := time.Now()
pathHex := hex.EncodeToString(packPath(path))
var expectedLeaves uint64
s.logger.Debug(
"handle leaf data with merge start",
zap.String("path", pathHex),
)
defer func() {
s.logger.Debug(
"handle leaf data with merge finished",
zap.String("path", pathHex),
zap.Uint64("leaves_expected", expectedLeaves),
zap.Duration("duration", time.Since(start)),
zap.Error(err),
)
}()
// Collect all local leaf keys at this path
intPath := make([]int, len(path))
for i, nib := range path {
intPath[i] = int(nib)
}
localKeys := make(map[string]*tries.LazyVectorCommitmentLeafNode)
node, err := s.localTree.GetByPath(intPath)
if err != nil && !strings.Contains(err.Error(), "item not found") {
return errors.Wrap(err, "handle leaf data with merge")
}
if node != nil {
if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok {
localKeys[string(leaf.Key)] = leaf
} else {
gathered := tries.GetAllLeaves(
s.localTree.SetType,
s.localTree.PhaseType,
s.localTree.ShardKey,
node,
)
for _, leaf := range gathered {
if leaf != nil {
localKeys[string(leaf.Key)] = leaf
}
}
}
}
s.logger.Debug(
"collected local keys for merge",
zap.String("path", pathHex),
zap.Int("local_key_count", len(localKeys)),
)
// Receive metadata with expected leaf count
select {
case <-s.ctx.Done():
return errors.Wrap(
errors.New("context canceled"),
"handle leaf data with merge",
)
case msg, ok := <-incomingLeaves:
if !ok {
return errors.Wrap(
errors.New("channel closed"),
"handle leaf data with merge",
)
}
switch msg.Payload.(type) {
case *protobufs.HypergraphComparison_LeafData:
return errors.Wrap(
errors.New("invalid message"),
"handle leaf data with merge",
)
case *protobufs.HypergraphComparison_Metadata:
expectedLeaves = msg.GetMetadata().Leaves
}
case <-time.After(30 * time.Second):
return errors.Wrap(
errors.New("timed out"),
"handle leaf data with merge",
)
}
// Receive all leaves and add them, tracking received keys
receivedKeys := make(map[string]struct{}, expectedLeaves)
var txn tries.TreeBackingStoreTransaction
for i := uint64(0); i < expectedLeaves; i++ {
if i%100 == 0 {
if txn != nil {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "handle leaf data with merge")
}
}
txn, err = s.hypergraphStore.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "handle leaf data with merge")
}
}
select {
case <-s.ctx.Done():
if txn != nil {
txn.Abort()
}
return errors.Wrap(
errors.New("context canceled"),
"handle leaf data with merge",
)
case msg, ok := <-incomingLeaves:
if !ok {
if txn != nil {
txn.Abort()
}
return errors.Wrap(
errors.New("channel closed"),
"handle leaf data with merge",
)
}
var remoteUpdate *protobufs.LeafData
switch msg.Payload.(type) {
case *protobufs.HypergraphComparison_Metadata:
if txn != nil {
txn.Abort()
}
return errors.Wrap(
errors.New("invalid message"),
"handle leaf data with merge",
)
case *protobufs.HypergraphComparison_LeafData:
remoteUpdate = msg.GetLeafData()
}
// Track this key as received from server
receivedKeys[string(remoteUpdate.Key)] = struct{}{}
theirs := AtomFromBytes(remoteUpdate.Value)
if err := s.persistLeafTree(txn, remoteUpdate); err != nil {
txn.Abort()
return err
}
if err := s.localSet.Add(txn, theirs); err != nil {
s.logger.Error("error while saving", zap.Error(err))
txn.Abort()
return errors.Wrap(err, "handle leaf data with merge")
}
case <-time.After(30 * time.Second):
if txn != nil {
txn.Abort()
}
return errors.Wrap(
errors.New("timed out"),
"handle leaf data with merge",
)
}
}
if txn != nil {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "handle leaf data with merge")
}
txn = nil
}
// Prune local keys that weren't in the received set
// Only do this AFTER successfully receiving all remote data
var prunedCount uint64
for keyStr, leaf := range localKeys {
if _, received := receivedKeys[keyStr]; !received {
// This key exists locally but was not sent by server - prune it
if txn == nil {
txn, err = s.hypergraphStore.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "handle leaf data with merge")
}
} else if prunedCount%pruneTxnChunk == 0 {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "handle leaf data with merge")
}
txn, err = s.hypergraphStore.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "handle leaf data with merge")
}
}
atom := AtomFromBytes(leaf.Value)
if atom == nil {
if txn != nil {
txn.Abort()
}
return errors.Wrap(
errors.New("invalid atom payload"),
"handle leaf data with merge",
)
}
if err := s.localSet.Delete(txn, atom); err != nil {
if txn != nil {
txn.Abort()
}
return errors.Wrap(err, "handle leaf data with merge")
}
if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil {
if txn != nil {
txn.Abort()
}
return errors.Wrap(err, "handle leaf data with merge")
}
prunedCount++
}
}
if txn != nil {
if err := txn.Commit(); err != nil {
return errors.Wrap(err, "handle leaf data with merge")
}
}
s.logger.Info(
"merge complete",
zap.String("path", pathHex),
zap.Uint64("received", expectedLeaves),
zap.Int("local_before", len(localKeys)),
zap.Uint64("pruned", prunedCount),
)
// Send acknowledgment
if err := s.stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Metadata{
Metadata: &protobufs.HypersyncMetadata{Leaves: expectedLeaves},
},
}); err != nil {
return err
}
return nil
}
func (s *streamManager) pruneLocalSubtree(path []int32) (uint64, error) {
start := time.Now()
pathHex := hex.EncodeToString(packPath(path))
s.logger.Info(
"CLIENT: pruning subtree",
zap.String("path", pathHex),
)
intPath := make([]int, len(path))
for i, nib := range path {
intPath[i] = int(nib)
}
node, err := s.localTree.GetByPath(intPath)
if err != nil {
// "item not found" means the tree is empty at this path - nothing to prune
if strings.Contains(err.Error(), "item not found") {
s.logger.Debug(
"CLIENT: prune skipped, item not found",
zap.String("path", pathHex),
)
return 0, nil
}
return 0, errors.Wrap(err, "prune local subtree")
}
if node == nil {
s.logger.Debug(
"CLIENT: prune skipped, node missing",
zap.String("path", pathHex),
)
return 0, nil
}
leaves := []*tries.LazyVectorCommitmentLeafNode{}
if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok {
leaves = append(leaves, leaf)
} else {
gathered := tries.GetAllLeaves(
s.localTree.SetType,
s.localTree.PhaseType,
s.localTree.ShardKey,
node,
)
for _, leaf := range gathered {
if leaf == nil {
continue
}
leaves = append(leaves, leaf)
}
}
if len(leaves) == 0 {
s.logger.Debug(
"CLIENT: prune skipped, no leaves",
zap.String("path", pathHex),
)
return 0, nil
}
var txn tries.TreeBackingStoreTransaction
var pruned uint64
commitTxn := func() error {
if txn == nil {
return nil
}
if err := txn.Commit(); err != nil {
txn.Abort()
return err
}
txn = nil
return nil
}
for idx, leaf := range leaves {
if idx%pruneTxnChunk == 0 {
if err := commitTxn(); err != nil {
return pruned, errors.Wrap(err, "prune local subtree")
}
txn, err = s.hypergraphStore.NewTransaction(false)
if err != nil {
return pruned, errors.Wrap(err, "prune local subtree")
}
}
atom := AtomFromBytes(leaf.Value)
if atom == nil {
txn.Abort()
return pruned, errors.Wrap(
errors.New("invalid atom payload"),
"prune local subtree",
)
}
if err := s.localSet.Delete(txn, atom); err != nil {
txn.Abort()
return pruned, errors.Wrap(err, "prune local subtree")
}
if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil {
txn.Abort()
return pruned, errors.Wrap(err, "prune local subtree")
}
pruned++
}
if err := commitTxn(); err != nil {
return pruned, errors.Wrap(err, "prune local subtree")
}
s.logger.Info(
"CLIENT: pruned local subtree",
zap.String("path", pathHex),
zap.Uint64("leaf_count", pruned),
zap.Duration("duration", time.Since(start)),
)
return pruned, nil
}
func (s *streamManager) persistLeafTree(
txn tries.TreeBackingStoreTransaction,
update *protobufs.LeafData,
@@ -1888,7 +1487,7 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
} else {
// Merge remote data with local, pruning only what server doesn't have
err := s.handleLeafDataWithMerge(path, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
}
@@ -1903,7 +1502,7 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
} else {
// Merge remote data with local, pruning only what server doesn't have
err := s.handleLeafDataWithMerge(path, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
}
@@ -1955,9 +1554,9 @@ func (s *streamManager) walk(
)
return errors.Wrap(err, "walk")
} else {
// Client has data at lpref that server doesn't have - prune it
_, err := s.pruneLocalSubtree(lpref)
return errors.Wrap(err, "walk")
// Client has data at lpref that server doesn't have
// Skip - pruning happens after sync completes
return nil
}
}
}
@@ -2009,7 +1608,7 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
} else {
// Merge server data with local at preTraversal path
err := s.handleLeafDataWithMerge(preTraversal, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
return errors.Wrap(err, "walk")
}
}
@@ -2024,12 +1623,9 @@ func (s *streamManager) walk(
); err != nil {
return errors.Wrap(err, "walk")
}
} else {
// Client has extra data that server doesn't have - prune it
if _, err := s.pruneLocalSubtree(missingPath); err != nil {
return errors.Wrap(err, "walk")
}
}
// Client has extra data that server doesn't have
// Skip - pruning happens after sync completes
}
}
// If no child matched the nibble, the local tree doesn't extend
@@ -2048,7 +1644,7 @@ func (s *streamManager) walk(
}
} else {
// Client receives and merges data from server at preTraversal path
if err := s.handleLeafDataWithMerge(preTraversal, incomingLeaves); err != nil {
if err := s.handleLeafData(incomingLeaves); err != nil {
return errors.Wrap(err, "walk")
}
}
@@ -2097,11 +1693,11 @@ func (s *streamManager) walk(
// s.logger.Info("branch divergence", pathString)
if lchild != nil && rchild == nil {
// Local has a child that remote doesn't have
nextPath := append(
append([]int32{}, lpref...),
lchild.Index,
)
if isServer {
nextPath := append(
append([]int32{}, lpref...),
lchild.Index,
)
// Server has data client doesn't - send it
if err := s.sendLeafData(
nextPath,
@@ -2109,13 +1705,9 @@ func (s *streamManager) walk(
); err != nil {
return errors.Wrap(err, "walk")
}
} else {
// Client has data server doesn't - prune it directly
// (no protocol exchange needed, server has nothing to send)
if _, err := s.pruneLocalSubtree(nextPath); err != nil {
return errors.Wrap(err, "walk")
}
}
// Client has data server doesn't
// Skip - pruning happens after sync completes
}
if rchild != nil && lchild == nil {
// Remote has a child that local doesn't have
@@ -2147,20 +1739,9 @@ func (s *streamManager) walk(
)
if err != nil {
// s.logger.Debug("incomplete branch descent", zap.Error(err))
if isServer {
if err := s.sendLeafData(
nextPath,
incomingLeaves,
); err != nil {
return errors.Wrap(err, "walk")
}
} else {
// Server will send data, merge with local
if err := s.handleLeafDataWithMerge(nextPath, incomingLeaves); err != nil {
return errors.Wrap(err, "walk")
}
}
continue
// Don't try to merge/prune on error - the connection may have failed
// and we don't want to delete local data based on incomplete info
return errors.Wrap(err, "walk")
}
if err = s.walk(
@@ -2191,7 +1772,7 @@ func (s *streamManager) walk(
}
} else {
// Merge server data with local, pruning only what server doesn't have
err := s.handleLeafDataWithMerge(path, incomingLeaves)
err := s.handleLeafData(incomingLeaves)
if err != nil {
return errors.Wrap(err, "walk")
}


@@ -72,6 +72,10 @@ var pebbleMigrations = []func(*pebble.Batch) error{
migration_2_1_0_183,
migration_2_1_0_184,
migration_2_1_0_185,
migration_2_1_0_186,
migration_2_1_0_187,
migration_2_1_0_188,
migration_2_1_0_189,
}
func NewPebbleDB(
@@ -798,6 +802,22 @@ func migration_2_1_0_184(b *pebble.Batch) error {
}
func migration_2_1_0_185(b *pebble.Batch) error {
return nil
}
func migration_2_1_0_186(b *pebble.Batch) error {
return nil
}
func migration_2_1_0_187(b *pebble.Batch) error {
return nil
}
func migration_2_1_0_188(b *pebble.Batch) error {
return nil
}
func migration_2_1_0_189(b *pebble.Batch) error {
return migration_2_1_0_18(b)
}