retain descension fix, remove backpressure

This commit is contained in:
Cassandra Heart 2025-03-05 17:20:38 -06:00
parent f8a9616f76
commit 12aafe55e8
No known key found for this signature in database
GPG Key ID: 6352152859385958

View File

@ -69,13 +69,11 @@ func NewHypergraphComparisonServer(
// node at the given path in the local tree and sends it over the stream.
func sendLeafData(
stream HyperStream,
backpressure chan struct{},
hypergraphStore store.HypergraphStore,
localTree *crypto.VectorCommitmentTree,
path []int32,
metadataOnly bool,
) error {
backpressure <- struct{}{}
send := func(leaf *crypto.VectorCommitmentLeafNode) error {
update := &protobufs.LeafData{
Key: leaf.Key,
@ -394,7 +392,6 @@ func walk(
logger *zap.Logger,
path []int32,
lnode, rnode *protobufs.HypergraphComparisonResponse,
backpressure chan struct{},
incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
stream HyperStream,
@ -420,7 +417,6 @@ func walk(
logger.Info("leaves mismatch commitments, sending", pathString)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -434,7 +430,6 @@ func walk(
logger.Info("leaf/branch mismatch at path", pathString)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -483,7 +478,6 @@ func walk(
logger.Info("traversal could not reach path, sending leaf data")
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -499,7 +493,6 @@ func walk(
path,
lnode,
rtrav,
backpressure,
incomingQueries,
incomingResponses,
stream,
@ -537,7 +530,6 @@ func walk(
logger.Info("traversal could not reach path, sending leaf data")
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -559,7 +551,6 @@ func walk(
)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
append(append([]int32{}, preTraversal...), child.Index),
@ -575,7 +566,6 @@ func walk(
path,
ltrav,
rnode,
backpressure,
incomingQueries,
incomingResponses,
stream,
@ -612,7 +602,6 @@ func walk(
logger.Info("branch divergence", pathString)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -635,7 +624,6 @@ func walk(
logger.Info("incomplete branch descension, sending leaves")
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
nextPath,
@ -650,7 +638,6 @@ func walk(
nextPath,
lc,
rc,
backpressure,
incomingQueries,
incomingResponses,
stream,
@ -667,7 +654,6 @@ func walk(
logger.Info("prefix mismatch on both sides", pathString)
sendLeafData(
stream,
backpressure,
hypergraphStore,
localTree,
path,
@ -754,7 +740,6 @@ func syncTreeBidirectionallyServer(
)
}
backpressure := make(chan struct{}, 1000)
incomingQueriesIn, incomingQueriesOut :=
UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming")
incomingResponsesIn, incomingResponsesOut :=
@ -784,7 +769,6 @@ func syncTreeBidirectionallyServer(
}
switch m := msg.Payload.(type) {
case *protobufs.HypergraphComparison_LeafData:
go func() { <-backpressure }()
incomingLeavesIn <- m.LeafData
case *protobufs.HypergraphComparison_Query:
incomingQueriesIn <- m.Query
@ -805,7 +789,6 @@ func syncTreeBidirectionallyServer(
[]int32{},
branchInfo,
response,
backpressure,
incomingQueriesOut,
incomingResponsesOut,
stream,
@ -962,7 +945,6 @@ func SyncTreeBidirectionally(
return err
}
backpressure := make(chan struct{}, 1000)
incomingQueriesIn, incomingQueriesOut :=
UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming")
incomingResponsesIn, incomingResponsesOut :=
@ -990,7 +972,6 @@ func SyncTreeBidirectionally(
}
switch m := msg.Payload.(type) {
case *protobufs.HypergraphComparison_LeafData:
go func() { <-backpressure }()
incomingLeavesIn <- m.LeafData
case *protobufs.HypergraphComparison_Query:
incomingQueriesIn <- m.Query
@ -1010,7 +991,6 @@ func SyncTreeBidirectionally(
[]int32{},
branchInfo,
response,
backpressure,
incomingQueriesOut,
incomingResponsesOut,
stream,