From 24282dd6908b566fb0488505b87d807c88ddf85d Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Fri, 14 Feb 2025 23:28:31 -0600 Subject: [PATCH] additional logging for hypersync --- .../token/token_execution_engine.go | 12 +- node/rpc/hypergraph_sync_rpc_server.go | 204 +++++++++++++++++- node/rpc/hypergraph_sync_rpc_server_test.go | 6 +- 3 files changed, 213 insertions(+), 9 deletions(-) diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index a265bd4..a045858 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -390,6 +390,7 @@ func NewTokenExecutionEngine( ) e.grpcServers = append(e.grpcServers[:0:0], syncServer) hyperSync := rpc.NewHypergraphComparisonServer( + e.logger, e.hypergraphStore, e.hypergraph, ) @@ -575,7 +576,7 @@ func (e *TokenExecutionEngine) addBatchToHypergraph(batchKey [][]byte, batchValu } func (e *TokenExecutionEngine) hyperSync() { - peer, err := e.pubSub.GetRandomPeer( + peerId, err := e.pubSub.GetRandomPeer( append([]byte{0x00}, e.intrinsicFilter...), ) if err != nil { @@ -583,12 +584,16 @@ func (e *TokenExecutionEngine) hyperSync() { return } + e.logger.Info( + "syncing hypergraph with peer", + zap.String("peer", peer.ID(peerId).String()), + ) syncTimeout := e.engineConfig.SyncTimeout dialCtx, cancelDial := context.WithTimeout(e.ctx, syncTimeout) defer cancelDial() - cc, err := e.pubSub.GetDirectChannel(dialCtx, peer, "hypersync") + cc, err := e.pubSub.GetDirectChannel(dialCtx, peerId, "hypersync") if err != nil { - e.logger.Debug( + e.logger.Info( "could not establish direct channel", zap.Error(err), ) @@ -612,6 +617,7 @@ func (e *TokenExecutionEngine) hyperSync() { for key, set := range sets { err := rpc.SyncTreeBidirectionally( stream, + e.logger, append(append([]byte{}, key.L1[:]...), key.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, e.hypergraphStore, diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index 5fbfa50..b9bc278 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -3,12 +3,14 @@ package rpc import ( "bytes" "encoding/gob" + "encoding/hex" "fmt" "io" "math/big" "time" "github.com/pkg/errors" + "go.uber.org/zap" "source.quilibrium.com/quilibrium/monorepo/node/crypto" hypergraph "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" @@ -19,15 +21,18 @@ import ( type hypergraphComparisonServer struct { protobufs.UnimplementedHypergraphComparisonServiceServer + logger *zap.Logger localHypergraphStore store.HypergraphStore localHypergraph *hypergraph.Hypergraph } func NewHypergraphComparisonServer( + logger *zap.Logger, hypergraphStore store.HypergraphStore, hypergraph *hypergraph.Hypergraph, ) *hypergraphComparisonServer { return &hypergraphComparisonServer{ + logger: logger, localHypergraphStore: hypergraphStore, localHypergraph: hypergraph, } @@ -257,11 +262,11 @@ func sendLeafDataServer( // and queues further queries as differences are detected. func syncTreeBidirectionallyServer( stream protobufs.HypergraphComparisonService_HyperStreamServer, + logger *zap.Logger, localHypergraphStore store.HypergraphStore, localHypergraph *hypergraph.Hypergraph, metadataOnly bool, ) error { - // Client initializes by sending a Query. msg, err := stream.Recv() if err != nil { return err @@ -270,6 +275,11 @@ func syncTreeBidirectionallyServer( if query == nil { return errors.New("client did not send valid initialization message") } + logger.Info( + "received initialization message", + zap.String("shard_key", hex.EncodeToString(query.ShardKey)), + zap.Int("phase_set", int(query.PhaseSet)), + ) // Lookup our local phase set. var phaseSet map[hypergraph.ShardKey]*hypergraph.IdSet @@ -333,6 +343,10 @@ func syncTreeBidirectionallyServer( for { select { case path := <-pendingQueries: + logger.Info( + "server sending comparison query", + zap.String("path", hex.EncodeToString(packNibbles(path))), + ) queryMsg := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ @@ -355,11 +369,22 @@ func syncTreeBidirectionallyServer( case *protobufs.HypergraphComparison_Response: remoteInfo := payload.Response + logger.Info( + "server handling response", + zap.String("path", hex.EncodeToString(packNibbles(remoteInfo.Path))), + ) localInfo, err := getBranchInfoFromTree( idSet.GetTree(), remoteInfo.Path, ) if err != nil { + logger.Info( + "server requesting missing node", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + ) missingQuery := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ @@ -378,7 +403,22 @@ func syncTreeBidirectionallyServer( } if !equalBytes(localInfo.Commitment, remoteInfo.Commitment) { + logger.Info( + "server mismatching commitment at path", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + zap.String("commitment", hex.EncodeToString(remoteInfo.Commitment)), + ) if isLeaf(remoteInfo) { + logger.Info( + "server sending leaf info", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + ) if err := sendLeafDataServer( stream, localHypergraphStore, @@ -398,6 +438,22 @@ func syncTreeBidirectionallyServer( } } if !equalBytes(localChildCommit, remoteChild.Commitment) { + logger.Info( + "found mismatching child commitment, enqueueing", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + zap.Int32("child_index", remoteChild.Index), + zap.String( + "local_commitment", + hex.EncodeToString(localChildCommit), + ), + zap.String( + "remote_commitment", + hex.EncodeToString(remoteChild.Commitment), + ), + ) newPath := append( append([]int32(nil), remoteInfo.Path...), remoteChild.Index, @@ -409,6 +465,13 @@ func syncTreeBidirectionallyServer( } case *protobufs.HypergraphComparison_Query: queryPath := payload.Query.Path + logger.Info( + "server received query for leaves", + zap.String( + "path", + hex.EncodeToString(packNibbles(queryPath)), + ), + ) if payload.Query.IncludeLeafData { if err := sendLeafDataServer( stream, @@ -420,6 +483,13 @@ func syncTreeBidirectionallyServer( return err } } else { + logger.Info( + "server received query for branches", + zap.String( + "path", + hex.EncodeToString(packNibbles(queryPath)), + ), + ) branchInfo, err := getBranchInfoFromTree(idSet.GetTree(), queryPath) if err != nil { continue @@ -435,6 +505,13 @@ func syncTreeBidirectionallyServer( } case *protobufs.HypergraphComparison_LeafData: remoteUpdate := payload.LeafData + logger.Info( + "received leaf data", + zap.String( + "key", + hex.EncodeToString(payload.LeafData.Key), + ), + ) if len(remoteUpdate.UnderlyingData) != 0 { txn, err := localHypergraphStore.NewTransaction(false) if err != nil { @@ -446,16 +523,24 @@ func syncTreeBidirectionallyServer( dec := gob.NewDecoder(&b) if err := dec.Decode(tree); err != nil { + txn.Abort() return err } err = localHypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree) if err != nil { + txn.Abort() + return err + } + + if err = txn.Commit(); err != nil { + txn.Abort() return err } } idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value)) } case <-time.After(5 * time.Second): + logger.Info("server timed out") return nil } } @@ -467,6 +552,7 @@ func (s *hypergraphComparisonServer) HyperStream( ) error { return syncTreeBidirectionallyServer( stream, + s.logger, s.localHypergraphStore, s.localHypergraph, false, @@ -479,13 +565,18 @@ func (s *hypergraphComparisonServer) HyperStream( // their local trees are synchronized. func SyncTreeBidirectionally( stream protobufs.HypergraphComparisonService_HyperStreamClient, + logger *zap.Logger, shardKey []byte, phaseSet protobufs.HypergraphPhaseSet, hypergraphStore store.HypergraphStore, localTree *crypto.VectorCommitmentTree, metadataOnly bool, ) error { - // Send initialization. + logger.Info( + "sending initialization message", + zap.String("shard_key", hex.EncodeToString(shardKey)), + zap.Int("phase_set", int(phaseSet)), + ) if err := stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ @@ -535,6 +626,10 @@ func SyncTreeBidirectionally( for { select { case path := <-pendingQueries: + logger.Info( + "sending comparison query", + zap.String("path", hex.EncodeToString(packNibbles(path))), + ) queryMsg := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ @@ -553,9 +648,19 @@ func SyncTreeBidirectionally( switch payload := msg.Payload.(type) { case *protobufs.HypergraphComparison_Response: remoteInfo := payload.Response + logger.Info( + "handling response", + zap.String("path", hex.EncodeToString(packNibbles(remoteInfo.Path))), + ) localInfo, err := getBranchInfoFromTree(localTree, remoteInfo.Path) if err != nil { - // Request missing node. + logger.Info( + "requesting missing node", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + ) missingQuery := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ @@ -570,7 +675,22 @@ func SyncTreeBidirectionally( continue } if !equalBytes(localInfo.Commitment, remoteInfo.Commitment) { + logger.Info( + "mismatching commitment at path", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + zap.String("commitment", hex.EncodeToString(remoteInfo.Commitment)), + ) if isLeaf(remoteInfo) { + logger.Info( + "sending leaf info", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + ) if err := sendLeafData( stream, hypergraphStore, @@ -590,6 +710,22 @@ func SyncTreeBidirectionally( } } if !equalBytes(localChildCommit, remoteChild.Commitment) { + logger.Info( + "found mismatching child commitment, enqueueing", + zap.String( + "path", + hex.EncodeToString(packNibbles(remoteInfo.Path)), + ), + zap.Int32("child_index", remoteChild.Index), + zap.String( + "local_commitment", + hex.EncodeToString(localChildCommit), + ), + zap.String( + "remote_commitment", + hex.EncodeToString(remoteChild.Commitment), + ), + ) newPath := append( append([]int32(nil), remoteInfo.Path...), remoteChild.Index, @@ -602,6 +738,13 @@ func SyncTreeBidirectionally( case *protobufs.HypergraphComparison_Query: queryPath := payload.Query.Path if payload.Query.IncludeLeafData { + logger.Info( + "received query for leaves", + zap.String( + "path", + hex.EncodeToString(packNibbles(queryPath)), + ), + ) if err := sendLeafData( stream, hypergraphStore, @@ -612,6 +755,13 @@ func SyncTreeBidirectionally( return err } } else { + logger.Info( + "received query for branches", + zap.String( + "path", + hex.EncodeToString(packNibbles(queryPath)), + ), + ) branchInfo, err := getBranchInfoFromTree(localTree, queryPath) if err != nil { continue @@ -626,6 +776,13 @@ func SyncTreeBidirectionally( } } case *protobufs.HypergraphComparison_LeafData: + logger.Info( + "received leaf data", + zap.String( + "key", + hex.EncodeToString(payload.LeafData.Key), + ), + ) remoteUpdate := payload.LeafData size := new(big.Int).SetBytes(remoteUpdate.Size) if len(remoteUpdate.UnderlyingData) != 0 { @@ -639,10 +796,17 @@ func SyncTreeBidirectionally( dec := gob.NewDecoder(&b) if err := dec.Decode(tree); err != nil { + txn.Abort() return err } err = hypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree) if err != nil { + txn.Abort() + return err + } + + if err = txn.Commit(); err != nil { + txn.Abort() return err } } @@ -655,7 +819,41 @@ func SyncTreeBidirectionally( ) } case <-time.After(5 * time.Second): + logger.Info("timed out") return nil } } } + +func packNibbles(values []int32) []byte { + totalBits := len(values) * 6 + out := make([]byte, (totalBits+7)/8) + bitOffset := 0 + + for _, v := range values { + bitsRemaining := 6 + for bitsRemaining > 0 { + byteIndex := bitOffset / 8 + bitPos := bitOffset % 8 + bitsAvailable := 8 - bitPos + n := bitsRemaining + if n > bitsAvailable { + n = bitsAvailable + } + + // From the current 6-bit value, take the top n bits that haven't been + // written. + shift := bitsRemaining - n + bitsToWrite := int((v >> shift) & ((1 << n) - 1)) + + // Place these bits in the current byte. Since we fill each byte from the + // most-significant bit down, shift them to the proper position. + shiftPos := bitsAvailable - n + out[byteIndex] |= byte(bitsToWrite << shiftPos) + + bitOffset += n + bitsRemaining -= n + } + } + return out +} diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index cd5d823..2ec8ab9 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -194,7 +194,7 @@ func TestHypergraphSyncServer(t *testing.T) { grpcServer := grpc.NewServer() protobufs.RegisterHypergraphComparisonServiceServer( grpcServer, - rpc.NewHypergraphComparisonServer(serverHypergraphStore, crdts[0]), + rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0]), ) log.Println("Server listening on :50051") go func() { @@ -212,7 +212,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = rpc.SyncTreeBidirectionally(str, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false) if err != nil { log.Fatalf("Client: failed to sync 1: %v", err) } @@ -231,7 +231,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = rpc.SyncTreeBidirectionally(str, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false) if err != nil { log.Fatalf("Client: failed to sync 2: %v", err) }