diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index 73e00eb..7c6aeac 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -69,11 +69,13 @@ func NewHypergraphComparisonServer( // node at the given path in the local tree and sends it over the stream. func sendLeafData( stream HyperStream, + backpressure chan struct{}, hypergraphStore store.HypergraphStore, localTree *crypto.VectorCommitmentTree, path []int32, metadataOnly bool, ) error { + backpressure <- struct{}{} send := func(leaf *crypto.VectorCommitmentLeafNode) error { update := &protobufs.LeafData{ Key: leaf.Key, @@ -392,6 +394,7 @@ func walk( logger *zap.Logger, path []int32, lnode, rnode *protobufs.HypergraphComparisonResponse, + backpressure chan struct{}, incomingQueries <-chan *protobufs.HypergraphComparisonQuery, incomingResponses <-chan *protobufs.HypergraphComparisonResponse, stream HyperStream, @@ -417,6 +420,7 @@ func walk( logger.Info("leaves mismatch commitments, sending", pathString) sendLeafData( stream, + backpressure, hypergraphStore, localTree, path, @@ -430,6 +434,7 @@ func walk( logger.Info("leaf/branch mismatch at path", pathString) sendLeafData( stream, + backpressure, hypergraphStore, localTree, path, @@ -478,6 +483,7 @@ func walk( logger.Info("traversal could not reach path, sending leaf data") sendLeafData( stream, + backpressure, hypergraphStore, localTree, path, @@ -493,6 +499,7 @@ func walk( path, lnode, rtrav, + backpressure, incomingQueries, incomingResponses, stream, @@ -530,6 +537,7 @@ func walk( logger.Info("traversal could not reach path, sending leaf data") sendLeafData( stream, + backpressure, hypergraphStore, localTree, path, @@ -551,6 +559,7 @@ func walk( ) sendLeafData( stream, + backpressure, hypergraphStore, localTree, append(append([]int32{}, preTraversal...), child.Index), @@ -566,6 +575,7 @@ func walk( path, ltrav, rnode, + backpressure, incomingQueries, incomingResponses, stream, @@ -600,7 +610,14 @@ func walk( if (lchild != nil && rchild == nil) || (lchild == nil && rchild != nil) { logger.Info("branch divergence", pathString) - sendLeafData(stream, hypergraphStore, localTree, path, metadataOnly) + sendLeafData( + stream, + backpressure, + hypergraphStore, + localTree, + path, + metadataOnly, + ) } else { if lchild != nil { nextPath := append( @@ -618,12 +635,13 @@ func walk( logger.Info("incomplete branch descension, sending leaves") sendLeafData( stream, + backpressure, hypergraphStore, localTree, nextPath, metadataOnly, ) - return nil + continue } if err = walk( @@ -632,6 +650,7 @@ func walk( nextPath, lc, rc, + backpressure, incomingQueries, incomingResponses, stream, @@ -646,7 +665,14 @@ func walk( } } else { logger.Info("prefix mismatch on both sides", pathString) - sendLeafData(stream, hypergraphStore, localTree, path, metadataOnly) + sendLeafData( + stream, + backpressure, + hypergraphStore, + localTree, + path, + metadataOnly, + ) } } @@ -728,6 +754,7 @@ func syncTreeBidirectionallyServer( ) } + backpressure := make(chan struct{}, 1000) incomingQueriesIn, incomingQueriesOut := UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming") incomingResponsesIn, incomingResponsesOut := @@ -740,6 +767,7 @@ func syncTreeBidirectionallyServer( msg, err := stream.Recv() if err == io.EOF { logger.Info("received disconnect") + close(backpressure) close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) @@ -747,6 +775,7 @@ func syncTreeBidirectionallyServer( } if err != nil { logger.Info("received error", zap.Error(err)) + close(backpressure) close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) @@ -757,6 +786,7 @@ func syncTreeBidirectionallyServer( } switch m := msg.Payload.(type) { case *protobufs.HypergraphComparison_LeafData: + go func() { <-backpressure }() incomingLeavesIn <- m.LeafData case *protobufs.HypergraphComparison_Query: incomingQueriesIn <- m.Query @@ -777,6 +807,7 @@ func syncTreeBidirectionallyServer( []int32{}, branchInfo, response, + backpressure, incomingQueriesOut, incomingResponsesOut, stream, @@ -933,6 +964,7 @@ func SyncTreeBidirectionally( return err } + backpressure := make(chan struct{}, 1000) incomingQueriesIn, incomingQueriesOut := UnboundedChan[*protobufs.HypergraphComparisonQuery]("server incoming") incomingResponsesIn, incomingResponsesOut := @@ -944,12 +976,14 @@ func SyncTreeBidirectionally( for { msg, err := stream.Recv() if err == io.EOF { + close(backpressure) close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) return } if err != nil { + close(backpressure) close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) @@ -960,6 +994,7 @@ func SyncTreeBidirectionally( } switch m := msg.Payload.(type) { case *protobufs.HypergraphComparison_LeafData: + go func() { <-backpressure }() incomingLeavesIn <- m.LeafData case *protobufs.HypergraphComparison_Query: incomingQueriesIn <- m.Query @@ -979,6 +1014,7 @@ func SyncTreeBidirectionally( []int32{}, branchInfo, response, + backpressure, incomingQueriesOut, incomingResponsesOut, stream,