diff --git a/hypergraph/sync.go b/hypergraph/sync.go index 21f95f4..ce9abf4 100644 --- a/hypergraph/sync.go +++ b/hypergraph/sync.go @@ -677,6 +677,20 @@ func (s *streamManager) sendLeafData( }); err != nil { return err } + // Wait for ack even when sending 0 leaves + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case msg, ok := <-incomingLeaves: + if !ok { + return errors.New("channel closed") + } + if meta := msg.GetMetadata(); meta == nil || meta.Leaves != 0 { + return errors.New("unexpected ack for 0 leaves") + } + case <-time.After(leafAckTimeout(0)): + return errors.New("timeout waiting for ack") + } return nil } @@ -694,6 +708,20 @@ func (s *streamManager) sendLeafData( }); err != nil { return err } + // Wait for ack even when sending 0 leaves + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case msg, ok := <-incomingLeaves: + if !ok { + return errors.New("channel closed") + } + if meta := msg.GetMetadata(); meta == nil || meta.Leaves != 0 { + return errors.New("unexpected ack for 0 leaves") + } + case <-time.After(leafAckTimeout(0)): + return errors.New("timeout waiting for ack") + } return nil } @@ -1536,11 +1564,12 @@ func (s *streamManager) walk( // ) if len(lpref) > len(rpref) { // s.logger.Debug("local prefix longer, traversing remote to path", pathString) - traverse := lpref[len(rpref)-1:] + traverse := lpref[len(rpref):] rtrav := rnode traversePath := append([]int32{}, rpref...) for _, nibble := range traverse { // s.logger.Debug("attempting remote traversal step") + preTraversal := append([]int32{}, traversePath...) found := false for _, child := range rtrav.Children { if child.Index == nibble { @@ -1556,7 +1585,23 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } found = true - break + } else { + // Remote has a child that's not on local's traversal path + missingPath := append(append([]int32{}, preTraversal...), child.Index) + if isServer { + // Server has extra data - send it to client + if err := s.sendLeafData( + missingPath, + incomingLeaves, + ); err != nil { + return errors.Wrap(err, "walk") + } + } else { + err := s.handleLeafData(incomingLeaves) + if err != nil { + return errors.Wrap(err, "walk") + } + } } } @@ -1591,7 +1636,7 @@ func (s *streamManager) walk( ) } else { // s.logger.Debug("remote prefix longer, traversing local to path", pathString) - traverse := rpref[len(lpref)-1:] + traverse := rpref[len(lpref):] ltrav := lnode traversedPath := append([]int32{}, lnode.Path...)