From 690eb654b3cb72da32be0583bd3cd4777502936d Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 20 Feb 2025 00:24:53 -0600 Subject: [PATCH] tighten up sending behaviors --- node/rpc/hypergraph_sync_rpc_server.go | 144 +++++++++++++++++-------- 1 file changed, 99 insertions(+), 45 deletions(-) diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index 5afd543..d8e209b 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -43,11 +43,17 @@ func NewHypergraphComparisonServer( func sendLeafData( stream protobufs.HypergraphComparisonService_HyperStreamClient, hypergraphStore store.HypergraphStore, + sent map[string]struct{}, localTree *crypto.VectorCommitmentTree, path []int32, metadataOnly bool, ) error { send := func(leaf *crypto.VectorCommitmentLeafNode) error { + if _, ok := sent[string(leaf.Key)]; ok { + return nil + } + sent[string(leaf.Key)] = struct{}{} + update := &protobufs.LeafData{ Key: leaf.Key, Value: leaf.Value, @@ -207,11 +213,16 @@ func isLeaf(info *protobufs.HypergraphComparisonResponse) bool { func sendLeafDataServer( stream protobufs.HypergraphComparisonService_HyperStreamServer, hypergraphStore store.HypergraphStore, + sent map[string]struct{}, localTree *crypto.VectorCommitmentTree, path []int32, metadataOnly bool, ) error { send := func(leaf *crypto.VectorCommitmentLeafNode) error { + if _, ok := sent[string(leaf.Key)]; ok { + return nil + } + sent[string(leaf.Key)] = struct{}{} update := &protobufs.LeafData{ Key: leaf.Key, Value: leaf.Value, @@ -321,10 +332,13 @@ func syncTreeBidirectionallyServer( return err } - pendingIn, pendingOut := UnboundedChan[[]int32]() + requested := map[string]struct{}{} + sent := map[string]struct{}{} + inserts := []*protobufs.LeafData{} + pendingIn, pendingOut := UnboundedChan[[]int32]("server pending") pendingIn <- rootPath - incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]() + incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]("server incoming") go func() { for { msg, err := stream.Recv() @@ -336,10 +350,15 @@ func syncTreeBidirectionallyServer( close(incomingIn) return } + if msg == nil { + continue + } + incomingIn <- *msg } }() +outer: for { select { case path := <-pendingOut: @@ -363,7 +382,7 @@ func syncTreeBidirectionallyServer( case msg, ok := <-incomingOut: if !ok { - return nil + break outer } switch payload := msg.Payload.(type) { @@ -385,18 +404,22 @@ func syncTreeBidirectionallyServer( hex.EncodeToString(packNibbles(remoteInfo.Path)), ), ) - missingQuery := &protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_Query{ - Query: &protobufs.HypergraphComparisonQuery{ - ShardKey: query.ShardKey, - PhaseSet: query.PhaseSet, - Path: remoteInfo.Path, - IncludeLeafData: true, + + if _, ok := requested[string(packNibbles(remoteInfo.Path))]; !ok { + requested[string(packNibbles(remoteInfo.Path))] = struct{}{} + missingQuery := &protobufs.HypergraphComparison{ + Payload: &protobufs.HypergraphComparison_Query{ + Query: &protobufs.HypergraphComparisonQuery{ + ShardKey: query.ShardKey, + PhaseSet: query.PhaseSet, + Path: remoteInfo.Path, + IncludeLeafData: true, + }, }, - }, - } - if err := stream.Send(missingQuery); err != nil { - return err + } + if err := stream.Send(missingQuery); err != nil { + break outer + } } // Do not queue children for a missing node. continue @@ -422,11 +445,12 @@ func syncTreeBidirectionallyServer( if err := sendLeafDataServer( stream, localHypergraphStore, + sent, idSet.GetTree(), remoteInfo.Path, metadataOnly, ); err != nil { - return err + break outer } } else { for _, remoteChild := range remoteInfo.Children { @@ -477,11 +501,12 @@ func syncTreeBidirectionallyServer( if err := sendLeafDataServer( stream, localHypergraphStore, + sent, idSet.GetTree(), queryPath, metadataOnly, ); err != nil { - return err + break outer } } else { logger.Info( @@ -501,7 +526,7 @@ func syncTreeBidirectionallyServer( }, } if err := stream.Send(resp); err != nil { - return err + break outer } } case *protobufs.HypergraphComparison_LeafData: @@ -538,13 +563,20 @@ func syncTreeBidirectionallyServer( return err } } - idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) + + inserts = append(inserts, remoteUpdate) } case <-time.After(5 * time.Second): logger.Info("server timed out") - return nil + break outer } } + + for _, remoteUpdate := range inserts { + idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) + } + + return nil } // HyperStream is the gRPC method that handles bidirectional synchronization. @@ -605,10 +637,13 @@ func SyncTreeBidirectionally( return err } - pendingIn, pendingOut := UnboundedChan[[]int32]() + requested := map[string]struct{}{} + sent := map[string]struct{}{} + inserts := []*protobufs.LeafData{} + pendingIn, pendingOut := UnboundedChan[[]int32]("client pending") pendingIn <- []int32{} - incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]() + incomingIn, incomingOut := UnboundedChan[protobufs.HypergraphComparison]("client incoming") go func() { for { msg, err := stream.Recv() @@ -620,10 +655,14 @@ func SyncTreeBidirectionally( close(incomingIn) return } + if msg == nil { + continue + } incomingIn <- *msg } }() +outer: for { select { case path := <-pendingOut: @@ -640,11 +679,11 @@ func SyncTreeBidirectionally( }, } if err := stream.Send(queryMsg); err != nil { - return err + break outer } case msg, ok := <-incomingOut: if !ok { - return nil + break outer } switch payload := msg.Payload.(type) { case *protobufs.HypergraphComparison_Response: @@ -662,16 +701,19 @@ func SyncTreeBidirectionally( hex.EncodeToString(packNibbles(remoteInfo.Path)), ), ) - missingQuery := &protobufs.HypergraphComparison{ - Payload: &protobufs.HypergraphComparison_Query{ - Query: &protobufs.HypergraphComparisonQuery{ - Path: remoteInfo.Path, - IncludeLeafData: true, + if _, ok := requested[string(packNibbles(remoteInfo.Path))]; !ok { + requested[string(packNibbles(remoteInfo.Path))] = struct{}{} + missingQuery := &protobufs.HypergraphComparison{ + Payload: &protobufs.HypergraphComparison_Query{ + Query: &protobufs.HypergraphComparisonQuery{ + Path: remoteInfo.Path, + IncludeLeafData: true, + }, }, - }, - } - if err := stream.Send(missingQuery); err != nil { - return err + } + if err := stream.Send(missingQuery); err != nil { + break outer + } } continue } @@ -695,11 +737,12 @@ func SyncTreeBidirectionally( if err := sendLeafData( stream, hypergraphStore, + sent, localTree, remoteInfo.Path, metadataOnly, ); err != nil { - return err + break outer } } else { for _, remoteChild := range remoteInfo.Children { @@ -731,7 +774,6 @@ func SyncTreeBidirectionally( append([]int32(nil), remoteInfo.Path...), remoteChild.Index, ) - pendingIn <- newPath } } @@ -750,6 +792,7 @@ func SyncTreeBidirectionally( if err := sendLeafData( stream, hypergraphStore, + sent, localTree, queryPath, metadataOnly, @@ -774,7 +817,7 @@ func SyncTreeBidirectionally( }, } if err := stream.Send(resp); err != nil { - return err + break outer } } case *protobufs.HypergraphComparison_LeafData: @@ -786,7 +829,6 @@ func SyncTreeBidirectionally( ), ) remoteUpdate := payload.LeafData - size := new(big.Int).SetBytes(remoteUpdate.Size) if len(remoteUpdate.UnderlyingData) != 0 { txn, err := hypergraphStore.NewTransaction(false) if err != nil { @@ -813,21 +855,29 @@ func SyncTreeBidirectionally( } } - localTree.Insert( - remoteUpdate.Key, - remoteUpdate.Value, - remoteUpdate.HashTarget, - size, - ) + inserts = append(inserts, remoteUpdate) } case <-time.After(5 * time.Second): logger.Info("timed out") - return nil + break outer } } + + for _, remoteUpdate := range inserts { + size := new(big.Int).SetBytes(remoteUpdate.Size) + + localTree.Insert( + remoteUpdate.Key, + remoteUpdate.Value, + remoteUpdate.HashTarget, + size, + ) + } + + return nil } -func UnboundedChan[T any]() (chan<- T, <-chan T) { +func UnboundedChan[T any](purpose string) (chan<- T, <-chan T) { in := make(chan T) out := make(chan T) go func() { @@ -840,7 +890,11 @@ func UnboundedChan[T any]() (chan<- T, <-chan T) { next = queue[0] } select { - case msg := <-in: + case msg, ok := <-in: + if !ok { + return + } + queue = append(queue, msg) case active <- next: queue = queue[1:]