additional logging for hypersync

This commit is contained in:
Cassandra Heart 2025-02-14 23:28:31 -06:00
parent 7b150e5e0c
commit 24282dd690
No known key found for this signature in database
GPG Key ID: 6352152859385958
3 changed files with 213 additions and 9 deletions

View File

@ -390,6 +390,7 @@ func NewTokenExecutionEngine(
)
e.grpcServers = append(e.grpcServers[:0:0], syncServer)
hyperSync := rpc.NewHypergraphComparisonServer(
e.logger,
e.hypergraphStore,
e.hypergraph,
)
@ -575,7 +576,7 @@ func (e *TokenExecutionEngine) addBatchToHypergraph(batchKey [][]byte, batchValu
}
func (e *TokenExecutionEngine) hyperSync() {
peer, err := e.pubSub.GetRandomPeer(
peerId, err := e.pubSub.GetRandomPeer(
append([]byte{0x00}, e.intrinsicFilter...),
)
if err != nil {
@ -583,12 +584,16 @@ func (e *TokenExecutionEngine) hyperSync() {
return
}
e.logger.Info(
"syncing hypergraph with peer",
zap.String("peer", peer.ID(peerId).String()),
)
syncTimeout := e.engineConfig.SyncTimeout
dialCtx, cancelDial := context.WithTimeout(e.ctx, syncTimeout)
defer cancelDial()
cc, err := e.pubSub.GetDirectChannel(dialCtx, peer, "hypersync")
cc, err := e.pubSub.GetDirectChannel(dialCtx, peerId, "hypersync")
if err != nil {
e.logger.Debug(
e.logger.Info(
"could not establish direct channel",
zap.Error(err),
)
@ -612,6 +617,7 @@ func (e *TokenExecutionEngine) hyperSync() {
for key, set := range sets {
err := rpc.SyncTreeBidirectionally(
stream,
e.logger,
append(append([]byte{}, key.L1[:]...), key.L2[:]...),
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
e.hypergraphStore,

View File

@ -3,12 +3,14 @@ package rpc
import (
"bytes"
"encoding/gob"
"encoding/hex"
"fmt"
"io"
"math/big"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/node/crypto"
hypergraph "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
@ -19,15 +21,18 @@ import (
type hypergraphComparisonServer struct {
protobufs.UnimplementedHypergraphComparisonServiceServer
logger *zap.Logger
localHypergraphStore store.HypergraphStore
localHypergraph *hypergraph.Hypergraph
}
func NewHypergraphComparisonServer(
logger *zap.Logger,
hypergraphStore store.HypergraphStore,
hypergraph *hypergraph.Hypergraph,
) *hypergraphComparisonServer {
return &hypergraphComparisonServer{
logger: logger,
localHypergraphStore: hypergraphStore,
localHypergraph: hypergraph,
}
@ -257,11 +262,11 @@ func sendLeafDataServer(
// and queues further queries as differences are detected.
func syncTreeBidirectionallyServer(
stream protobufs.HypergraphComparisonService_HyperStreamServer,
logger *zap.Logger,
localHypergraphStore store.HypergraphStore,
localHypergraph *hypergraph.Hypergraph,
metadataOnly bool,
) error {
// Client initializes by sending a Query.
msg, err := stream.Recv()
if err != nil {
return err
@ -270,6 +275,11 @@ func syncTreeBidirectionallyServer(
if query == nil {
return errors.New("client did not send valid initialization message")
}
logger.Info(
"received initialization message",
zap.String("shard_key", hex.EncodeToString(query.ShardKey)),
zap.Int("phase_set", int(query.PhaseSet)),
)
// Lookup our local phase set.
var phaseSet map[hypergraph.ShardKey]*hypergraph.IdSet
@ -333,6 +343,10 @@ func syncTreeBidirectionallyServer(
for {
select {
case path := <-pendingQueries:
logger.Info(
"server sending comparison query",
zap.String("path", hex.EncodeToString(packNibbles(path))),
)
queryMsg := &protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Query{
Query: &protobufs.HypergraphComparisonQuery{
@ -355,11 +369,22 @@ func syncTreeBidirectionallyServer(
case *protobufs.HypergraphComparison_Response:
remoteInfo := payload.Response
logger.Info(
"server handling response",
zap.String("path", hex.EncodeToString(packNibbles(remoteInfo.Path))),
)
localInfo, err := getBranchInfoFromTree(
idSet.GetTree(),
remoteInfo.Path,
)
if err != nil {
logger.Info(
"server requesting missing node",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
)
missingQuery := &protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Query{
Query: &protobufs.HypergraphComparisonQuery{
@ -378,7 +403,22 @@ func syncTreeBidirectionallyServer(
}
if !equalBytes(localInfo.Commitment, remoteInfo.Commitment) {
logger.Info(
"server mismatching commitment at path",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
zap.String("commitment", hex.EncodeToString(remoteInfo.Commitment)),
)
if isLeaf(remoteInfo) {
logger.Info(
"server sending leaf info",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
)
if err := sendLeafDataServer(
stream,
localHypergraphStore,
@ -398,6 +438,22 @@ func syncTreeBidirectionallyServer(
}
}
if !equalBytes(localChildCommit, remoteChild.Commitment) {
logger.Info(
"found mismatching child commitment, enqueueing",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
zap.Int32("child_index", remoteChild.Index),
zap.String(
"local_commitment",
hex.EncodeToString(localChildCommit),
),
zap.String(
"remote_commitment",
hex.EncodeToString(remoteChild.Commitment),
),
)
newPath := append(
append([]int32(nil), remoteInfo.Path...),
remoteChild.Index,
@ -409,6 +465,13 @@ func syncTreeBidirectionallyServer(
}
case *protobufs.HypergraphComparison_Query:
queryPath := payload.Query.Path
logger.Info(
"server received query for leaves",
zap.String(
"path",
hex.EncodeToString(packNibbles(queryPath)),
),
)
if payload.Query.IncludeLeafData {
if err := sendLeafDataServer(
stream,
@ -420,6 +483,13 @@ func syncTreeBidirectionallyServer(
return err
}
} else {
logger.Info(
"server received query for branches",
zap.String(
"path",
hex.EncodeToString(packNibbles(queryPath)),
),
)
branchInfo, err := getBranchInfoFromTree(idSet.GetTree(), queryPath)
if err != nil {
continue
@ -435,6 +505,13 @@ func syncTreeBidirectionallyServer(
}
case *protobufs.HypergraphComparison_LeafData:
remoteUpdate := payload.LeafData
logger.Info(
"received leaf data",
zap.String(
"key",
hex.EncodeToString(payload.LeafData.Key),
),
)
if len(remoteUpdate.UnderlyingData) != 0 {
txn, err := localHypergraphStore.NewTransaction(false)
if err != nil {
@ -446,16 +523,24 @@ func syncTreeBidirectionallyServer(
dec := gob.NewDecoder(&b)
if err := dec.Decode(tree); err != nil {
txn.Abort()
return err
}
err = localHypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree)
if err != nil {
txn.Abort()
return err
}
if err = txn.Commit(); err != nil {
txn.Abort()
return err
}
}
idSet.Add(hypergraph.AtomFromBytes(remoteUpdate.Value))
}
case <-time.After(5 * time.Second):
logger.Info("server timed out")
return nil
}
}
@ -467,6 +552,7 @@ func (s *hypergraphComparisonServer) HyperStream(
) error {
return syncTreeBidirectionallyServer(
stream,
s.logger,
s.localHypergraphStore,
s.localHypergraph,
false,
@ -479,13 +565,18 @@ func (s *hypergraphComparisonServer) HyperStream(
// their local trees are synchronized.
func SyncTreeBidirectionally(
stream protobufs.HypergraphComparisonService_HyperStreamClient,
logger *zap.Logger,
shardKey []byte,
phaseSet protobufs.HypergraphPhaseSet,
hypergraphStore store.HypergraphStore,
localTree *crypto.VectorCommitmentTree,
metadataOnly bool,
) error {
// Send initialization.
logger.Info(
"sending initialization message",
zap.String("shard_key", hex.EncodeToString(shardKey)),
zap.Int("phase_set", int(phaseSet)),
)
if err := stream.Send(&protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Query{
Query: &protobufs.HypergraphComparisonQuery{
@ -535,6 +626,10 @@ func SyncTreeBidirectionally(
for {
select {
case path := <-pendingQueries:
logger.Info(
"sending comparison query",
zap.String("path", hex.EncodeToString(packNibbles(path))),
)
queryMsg := &protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Query{
Query: &protobufs.HypergraphComparisonQuery{
@ -553,9 +648,19 @@ func SyncTreeBidirectionally(
switch payload := msg.Payload.(type) {
case *protobufs.HypergraphComparison_Response:
remoteInfo := payload.Response
logger.Info(
"handling response",
zap.String("path", hex.EncodeToString(packNibbles(remoteInfo.Path))),
)
localInfo, err := getBranchInfoFromTree(localTree, remoteInfo.Path)
if err != nil {
// Request missing node.
logger.Info(
"requesting missing node",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
)
missingQuery := &protobufs.HypergraphComparison{
Payload: &protobufs.HypergraphComparison_Query{
Query: &protobufs.HypergraphComparisonQuery{
@ -570,7 +675,22 @@ func SyncTreeBidirectionally(
continue
}
if !equalBytes(localInfo.Commitment, remoteInfo.Commitment) {
logger.Info(
"mismatching commitment at path",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
zap.String("commitment", hex.EncodeToString(remoteInfo.Commitment)),
)
if isLeaf(remoteInfo) {
logger.Info(
"sending leaf info",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
)
if err := sendLeafData(
stream,
hypergraphStore,
@ -590,6 +710,22 @@ func SyncTreeBidirectionally(
}
}
if !equalBytes(localChildCommit, remoteChild.Commitment) {
logger.Info(
"found mismatching child commitment, enqueueing",
zap.String(
"path",
hex.EncodeToString(packNibbles(remoteInfo.Path)),
),
zap.Int32("child_index", remoteChild.Index),
zap.String(
"local_commitment",
hex.EncodeToString(localChildCommit),
),
zap.String(
"remote_commitment",
hex.EncodeToString(remoteChild.Commitment),
),
)
newPath := append(
append([]int32(nil), remoteInfo.Path...),
remoteChild.Index,
@ -602,6 +738,13 @@ func SyncTreeBidirectionally(
case *protobufs.HypergraphComparison_Query:
queryPath := payload.Query.Path
if payload.Query.IncludeLeafData {
logger.Info(
"received query for leaves",
zap.String(
"path",
hex.EncodeToString(packNibbles(queryPath)),
),
)
if err := sendLeafData(
stream,
hypergraphStore,
@ -612,6 +755,13 @@ func SyncTreeBidirectionally(
return err
}
} else {
logger.Info(
"received query for branches",
zap.String(
"path",
hex.EncodeToString(packNibbles(queryPath)),
),
)
branchInfo, err := getBranchInfoFromTree(localTree, queryPath)
if err != nil {
continue
@ -626,6 +776,13 @@ func SyncTreeBidirectionally(
}
}
case *protobufs.HypergraphComparison_LeafData:
logger.Info(
"received leaf data",
zap.String(
"key",
hex.EncodeToString(payload.LeafData.Key),
),
)
remoteUpdate := payload.LeafData
size := new(big.Int).SetBytes(remoteUpdate.Size)
if len(remoteUpdate.UnderlyingData) != 0 {
@ -639,10 +796,17 @@ func SyncTreeBidirectionally(
dec := gob.NewDecoder(&b)
if err := dec.Decode(tree); err != nil {
txn.Abort()
return err
}
err = hypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree)
if err != nil {
txn.Abort()
return err
}
if err = txn.Commit(); err != nil {
txn.Abort()
return err
}
}
@ -655,7 +819,41 @@ func SyncTreeBidirectionally(
)
}
case <-time.After(5 * time.Second):
logger.Info("timed out")
return nil
}
}
}
func packNibbles(values []int32) []byte {
totalBits := len(values) * 6
out := make([]byte, (totalBits+7)/8)
bitOffset := 0
for _, v := range values {
bitsRemaining := 6
for bitsRemaining > 0 {
byteIndex := bitOffset / 8
bitPos := bitOffset % 8
bitsAvailable := 8 - bitPos
n := bitsRemaining
if n > bitsAvailable {
n = bitsAvailable
}
// From the current 6-bit value, take the top n bits that haven't been
// written.
shift := bitsRemaining - n
bitsToWrite := int((v >> shift) & ((1 << n) - 1))
// Place these bits in the current byte. Since we fill each byte from the
// most-significant bit down, shift them to the proper position.
shiftPos := bitsAvailable - n
out[byteIndex] |= byte(bitsToWrite << shiftPos)
bitOffset += n
bitsRemaining -= n
}
}
return out
}

View File

@ -194,7 +194,7 @@ func TestHypergraphSyncServer(t *testing.T) {
grpcServer := grpc.NewServer()
protobufs.RegisterHypergraphComparisonServiceServer(
grpcServer,
rpc.NewHypergraphComparisonServer(serverHypergraphStore, crdts[0]),
rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0]),
)
log.Println("Server listening on :50051")
go func() {
@ -212,7 +212,7 @@ func TestHypergraphSyncServer(t *testing.T) {
log.Fatalf("Client: failed to stream: %v", err)
}
err = rpc.SyncTreeBidirectionally(str, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false)
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false)
if err != nil {
log.Fatalf("Client: failed to sync 1: %v", err)
}
@ -231,7 +231,7 @@ func TestHypergraphSyncServer(t *testing.T) {
log.Fatalf("Client: failed to stream: %v", err)
}
err = rpc.SyncTreeBidirectionally(str, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false)
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey].GetTree(), false)
if err != nil {
log.Fatalf("Client: failed to sync 2: %v", err)
}