adjust sync to handle no leaf edge cases, multi-path segment traversal

This commit is contained in:
Cassandra Heart 2026-01-13 17:30:15 -06:00
parent 910385ec0b
commit c0215d5768
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA

View File

@ -677,6 +677,20 @@ func (s *streamManager) sendLeafData(
}); err != nil {
return err
}
// Wait for ack even when sending 0 leaves
select {
case <-s.ctx.Done():
return s.ctx.Err()
case msg, ok := <-incomingLeaves:
if !ok {
return errors.New("channel closed")
}
if meta := msg.GetMetadata(); meta == nil || meta.Leaves != 0 {
return errors.New("unexpected ack for 0 leaves")
}
case <-time.After(leafAckTimeout(0)):
return errors.New("timeout waiting for ack")
}
return nil
}
@ -694,6 +708,20 @@ func (s *streamManager) sendLeafData(
}); err != nil {
return err
}
// Wait for ack even when sending 0 leaves
select {
case <-s.ctx.Done():
return s.ctx.Err()
case msg, ok := <-incomingLeaves:
if !ok {
return errors.New("channel closed")
}
if meta := msg.GetMetadata(); meta == nil || meta.Leaves != 0 {
return errors.New("unexpected ack for 0 leaves")
}
case <-time.After(leafAckTimeout(0)):
return errors.New("timeout waiting for ack")
}
return nil
}
@ -1536,11 +1564,12 @@ func (s *streamManager) walk(
// )
if len(lpref) > len(rpref) {
// s.logger.Debug("local prefix longer, traversing remote to path", pathString)
traverse := lpref[len(rpref)-1:]
traverse := lpref[len(rpref):]
rtrav := rnode
traversePath := append([]int32{}, rpref...)
for _, nibble := range traverse {
// s.logger.Debug("attempting remote traversal step")
preTraversal := append([]int32{}, traversePath...)
found := false
for _, child := range rtrav.Children {
if child.Index == nibble {
@ -1556,7 +1585,23 @@ func (s *streamManager) walk(
return errors.Wrap(err, "walk")
}
found = true
break
} else {
// Remote has a child that's not on local's traversal path
missingPath := append(append([]int32{}, preTraversal...), child.Index)
if isServer {
// Server has extra data - send it to client
if err := s.sendLeafData(
missingPath,
incomingLeaves,
); err != nil {
return errors.Wrap(err, "walk")
}
} else {
err := s.handleLeafData(incomingLeaves)
if err != nil {
return errors.Wrap(err, "walk")
}
}
}
}
@ -1591,7 +1636,7 @@ func (s *streamManager) walk(
)
} else {
// s.logger.Debug("remote prefix longer, traversing local to path", pathString)
traverse := rpref[len(lpref)-1:]
traverse := rpref[len(lpref):]
ltrav := lnode
traversedPath := append([]int32{}, lnode.Path...)