resolve edge case on incomplete descension, add backpressure

This commit is contained in:
Cassandra Heart 2025-03-05 02:11:27 -06:00
parent 5d6138fe0c
commit 83073423c0
No known key found for this signature in database
GPG Key ID: 6352152859385958

View File

@ -69,11 +69,13 @@ func NewHypergraphComparisonServer(
// node at the given path in the local tree and sends it over the stream.
func sendLeafData(
stream HyperStream,
backpressure chan struct{},
hypergraphStore store.HypergraphStore,
localTree *crypto.VectorCommitmentTree,
path []int32,
metadataOnly bool,
) error {
backpressure <- struct{}{}
send := func(leaf *crypto.VectorCommitmentLeafNode) error {
update := &protobufs.LeafData{
Key: leaf.Key,
@ -392,6 +394,7 @@ func walk(
logger *zap.Logger,
path []int32,
lnode, rnode *protobufs.HypergraphComparisonResponse,
backpressure chan struct{},
incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
stream HyperStream,
@ -417,6 +420,7 @@ func walk(
logger.Info("leaves mismatch commitments, sending", pathString)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -430,6 +434,7 @@ func walk(
logger.Info("leaf/branch mismatch at path", pathString)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -478,6 +483,7 @@ func walk(
logger.Info("traversal could not reach path, sending leaf data")
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -493,6 +499,7 @@ func walk(
path,
lnode,
rtrav,
backpressure,
incomingQueries,
incomingResponses,
stream,
@ -530,6 +537,7 @@ func walk(
logger.Info("traversal could not reach path, sending leaf data")
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -551,6 +559,7 @@ func walk(
)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
append(append([]int32{}, preTraversal...), child.Index),
@ -566,6 +575,7 @@ func walk(
path,
ltrav,
rnode,
backpressure,
incomingQueries,
incomingResponses,
stream,
@ -600,7 +610,14 @@ func walk(
if (lchild != nil && rchild == nil) ||
(lchild == nil && rchild != nil) {
logger.Info("branch divergence", pathString)
sendLeafData(stream, hypergraphStore, localTree, path, metadataOnly)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
metadataOnly,
)
} else {
if lchild != nil {
nextPath := append(
@ -618,12 +635,13 @@ func walk(
logger.Info("incomplete branch descension, sending leaves")
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
nextPath,
metadataOnly,
)
return nil
continue
}
if err = walk(
@ -632,6 +650,7 @@ func walk(
nextPath,
lc,
rc,
backpressure,
incomingQueries,
incomingResponses,
stream,
@ -646,7 +665,14 @@ func walk(
}
} else {
logger.Info("prefix mismatch on both sides", pathString)
sendLeafData(stream, hypergraphStore, localTree, path, metadataOnly)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
metadataOnly,
)
}
}
@ -728,6 +754,7 @@ func syncTreeBidirectionallyServer(
)
}
backpressure := make(chan struct{}, 1000)
incomingQueriesIn, incomingQueriesOut :=
UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming")
incomingResponsesIn, incomingResponsesOut :=
@ -740,6 +767,7 @@ func syncTreeBidirectionallyServer(
msg, err := stream.Recv()
if err == io.EOF {
logger.Info("received disconnect")
close(backpressure)
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
@ -747,6 +775,7 @@ func syncTreeBidirectionallyServer(
}
if err != nil {
logger.Info("received error", zap.Error(err))
close(backpressure)
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
@ -757,6 +786,7 @@ func syncTreeBidirectionallyServer(
}
switch m := msg.Payload.(type) {
case *protobufs.HypergraphComparison_LeafData:
go func() { <-backpressure }()
incomingLeavesIn <- m.LeafData
case *protobufs.HypergraphComparison_Query:
incomingQueriesIn <- m.Query
@ -777,6 +807,7 @@ func syncTreeBidirectionallyServer(
[]int32{},
branchInfo,
response,
backpressure,
incomingQueriesOut,
incomingResponsesOut,
stream,
@ -933,6 +964,7 @@ func SyncTreeBidirectionally(
return err
}
backpressure := make(chan struct{}, 1000)
incomingQueriesIn, incomingQueriesOut :=
UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming")
incomingResponsesIn, incomingResponsesOut :=
@ -944,12 +976,14 @@ func SyncTreeBidirectionally(
for {
msg, err := stream.Recv()
if err == io.EOF {
close(backpressure)
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
return
}
if err != nil {
close(backpressure)
close(incomingQueriesIn)
close(incomingResponsesIn)
close(incomingLeavesIn)
@ -960,6 +994,7 @@ func SyncTreeBidirectionally(
}
switch m := msg.Payload.(type) {
case *protobufs.HypergraphComparison_LeafData:
go func() { <-backpressure }()
incomingLeavesIn <- m.LeafData
case *protobufs.HypergraphComparison_Query:
incomingQueriesIn <- m.Query
@ -979,6 +1014,7 @@ func SyncTreeBidirectionally(
[]int32{},
branchInfo,
response,
backpressure,
incomingQueriesOut,
incomingResponsesOut,
stream,