diff --git a/hypergraph/hypergraph.go b/hypergraph/hypergraph.go index ea2ca93..fc2bf9b 100644 --- a/hypergraph/hypergraph.go +++ b/hypergraph/hypergraph.go @@ -171,15 +171,27 @@ func (hg *HypergraphCRDT) snapshotSet( hg.setsMu.RUnlock() if set == nil { + // Try to load root from snapshot store since set doesn't exist in memory + var root tries.LazyVectorCommitmentNode + if targetStore != nil { + root, _ = targetStore.GetNodeByPath( + string(atomType), + string(phaseType), + shardKey, + []int{}, // empty path = root + ) + } set = NewIdSet( atomType, phaseType, shardKey, - hg.store, + targetStore, // Use target store directly since set is new hg.prover, - nil, + root, hg.getCoveredPrefix(), ) + // Return directly - no need to clone since we already used targetStore + return set } return hg.cloneSetWithStore(set, targetStore) diff --git a/hypergraph/sync_client_driven.go b/hypergraph/sync_client_driven.go index 6bcf660..54a0930 100644 --- a/hypergraph/sync_client_driven.go +++ b/hypergraph/sync_client_driven.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "fmt" "io" "slices" "strings" @@ -73,7 +74,12 @@ func (hg *HypergraphCRDT) PerformSync( case *protobufs.HypergraphSyncQuery_GetBranch: // Initialize session on first request if session == nil { - session, err = hg.initSyncSession(r.GetBranch.ShardKey, r.GetBranch.PhaseSet, logger) + session, err = hg.initSyncSession( + r.GetBranch.ShardKey, + r.GetBranch.PhaseSet, + r.GetBranch.ExpectedRoot, + logger, + ) if err != nil { return errors.Wrap(err, "init sync session") } @@ -82,7 +88,12 @@ func (hg *HypergraphCRDT) PerformSync( case *protobufs.HypergraphSyncQuery_GetLeaves: // Initialize session on first request if session == nil { - session, err = hg.initSyncSession(r.GetLeaves.ShardKey, r.GetLeaves.PhaseSet, logger) + session, err = hg.initSyncSession( + r.GetLeaves.ShardKey, + r.GetLeaves.PhaseSet, + r.GetLeaves.ExpectedRoot, + logger, + ) if err != nil { return errors.Wrap(err, "init sync session") } @@ -121,6 +132,7 @@ func (hg *HypergraphCRDT) PerformSync( func (hg *HypergraphCRDT) initSyncSession( shardKeyBytes []byte, phaseSet protobufs.HypergraphPhaseSet, + expectedRoot []byte, logger *zap.Logger, ) (*syncSession, error) { if len(shardKeyBytes) != 35 { @@ -132,8 +144,9 @@ func (hg *HypergraphCRDT) initSyncSession( L2: [32]byte(shardKeyBytes[3:]), } - // Acquire a snapshot for consistent reads throughout the session - snapshot := hg.snapshotMgr.acquire(shardKey, nil) + // Acquire a snapshot for consistent reads throughout the session. + // If expectedRoot is provided, we try to find a snapshot matching that root. 
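For reference, a minimal sketch of the root-matched acquisition described in the comment above, assuming the manager retains a list of recent snapshots per shard and reference-counts them; the type, the field names, and the fall-back-to-latest behavior on a miss are illustrative assumptions, not the actual snapshotMgr implementation:

package main

import "bytes"

// snapshotSketch stands in for the real snapshot type; only Root() and a
// pin count are assumed here.
type snapshotSketch struct {
	root []byte
	refs int
}

func (s *snapshotSketch) Root() []byte { return s.root }

// acquireSketch returns the snapshot whose root matches expectedRoot, or
// the newest retained snapshot when expectedRoot is empty or unmatched.
func acquireSketch(shots []*snapshotSketch, expectedRoot []byte) *snapshotSketch {
	if len(expectedRoot) > 0 {
		for _, s := range shots {
			if bytes.Equal(s.Root(), expectedRoot) {
				s.refs++ // pin for the lifetime of the sync session
				return s
			}
		}
	}
	if len(shots) == 0 {
		return nil // caller reports "failed to acquire snapshot"
	}
	s := shots[len(shots)-1] // assumed ordered oldest to newest
	s.refs++
	return s
}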
+ snapshot := hg.snapshotMgr.acquire(shardKey, expectedRoot) if snapshot == nil { return nil, errors.New("failed to acquire snapshot") } @@ -145,10 +158,14 @@ func (hg *HypergraphCRDT) initSyncSession( return nil, errors.New("unsupported phase set") } + tree := idSet.GetTree() logger.Debug( "sync session initialized", zap.String("shard", hex.EncodeToString(shardKeyBytes)), zap.Int("phaseSet", int(phaseSet)), + zap.Bool("tree_nil", tree == nil), + zap.Bool("root_nil", tree != nil && tree.Root == nil), + zap.String("snapshot_root", hex.EncodeToString(snapshot.Root())), ) return &syncSession{ @@ -169,6 +186,11 @@ func (hg *HypergraphCRDT) handleGetBranch( tree := session.idSet.GetTree() if tree == nil || tree.Root == nil { // Empty tree - return empty response + logger.Debug("handleGetBranch: empty tree", + zap.Bool("tree_nil", tree == nil), + zap.Bool("root_nil", tree != nil && tree.Root == nil), + zap.String("path", hex.EncodeToString(packPath(req.Path))), + ) return &protobufs.HypergraphSyncResponse{ Response: &protobufs.HypergraphSyncResponse_Branch{ Branch: &protobufs.HypergraphSyncBranchResponse{ @@ -312,6 +334,25 @@ func (hg *HypergraphCRDT) handleGetLeaves( }, nil } + // Debug: log node details to identify snapshot consistency issues + var nodeCommitment []byte + var nodeStore string + switch n := node.(type) { + case *tries.LazyVectorCommitmentBranchNode: + nodeCommitment = n.Commitment + nodeStore = fmt.Sprintf("%p", n.Store) + case *tries.LazyVectorCommitmentLeafNode: + nodeCommitment = n.Commitment + nodeStore = fmt.Sprintf("%p", n.Store) + } + logger.Debug("handleGetLeaves node info", + zap.String("path", hex.EncodeToString(packPath(req.Path))), + zap.String("nodeCommitment", hex.EncodeToString(nodeCommitment)), + zap.String("nodeStore", nodeStore), + zap.String("sessionStore", fmt.Sprintf("%p", session.store)), + zap.Int("contTokenLen", len(req.ContinuationToken)), + ) + // Get all leaves under this node allLeaves := tries.GetAllLeaves( tree.SetType, @@ -372,6 +413,17 @@ func (hg *HypergraphCRDT) handleGetLeaves( _ = i // suppress unused warning } + // Debug: log leaf count details + logger.Debug("handleGetLeaves returning", + zap.String("path", hex.EncodeToString(packPath(req.Path))), + zap.Int("allLeavesLen", len(allLeaves)), + zap.Uint64("totalNonNil", totalNonNil), + zap.Int("startIdx", startIdx), + zap.Int("leavesReturned", len(leaves)), + zap.String("treePtr", fmt.Sprintf("%p", tree)), + zap.String("treeRootPtr", fmt.Sprintf("%p", tree.Root)), + ) + resp := &protobufs.HypergraphSyncLeavesResponse{ Path: req.Path, Leaves: leaves, @@ -412,17 +464,15 @@ func parseContToken(token []byte) (int, error) { if len(token) == 0 { return 0, nil } - var idx int - _, err := hex.Decode(make([]byte, len(token)/2), token) + // Token is hex-encoded 4 bytes (big-endian int32) + decoded, err := hex.DecodeString(string(token)) if err != nil { return 0, err } - // Simple: just parse as decimal string - for _, b := range token { - if b >= '0' && b <= '9' { - idx = idx*10 + int(b-'0') - } + if len(decoded) != 4 { + return 0, errors.New("invalid continuation token length") } + idx := int(decoded[0])<<24 | int(decoded[1])<<16 | int(decoded[2])<<8 | int(decoded[3]) return idx, nil } @@ -432,12 +482,15 @@ func makeContToken(idx int) []byte { // SyncFrom performs a client-driven sync from the given server stream. // It navigates to the covered prefix (if any), then recursively syncs -// differing subtrees. +// differing subtrees. 
If expectedRoot is provided, the server will attempt +// to sync from a snapshot matching that root commitment. +// Returns the new root commitment after sync completes. func (hg *HypergraphCRDT) SyncFrom( stream protobufs.HypergraphComparisonService_PerformSyncClient, shardKey tries.ShardKey, phaseSet protobufs.HypergraphPhaseSet, -) error { + expectedRoot []byte, +) ([]byte, error) { hg.mu.Lock() defer hg.mu.Unlock() @@ -445,6 +498,9 @@ func (hg *HypergraphCRDT) SyncFrom( zap.String("method", "SyncFrom"), zap.String("shard", hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:]))), ) + if len(expectedRoot) > 0 { + logger = logger.With(zap.String("expectedRoot", hex.EncodeToString(expectedRoot))) + } syncStart := time.Now() defer func() { @@ -453,30 +509,39 @@ func (hg *HypergraphCRDT) SyncFrom( set := hg.getPhaseSet(shardKey, phaseSet) if set == nil { - return errors.New("unsupported phase set") + return nil, errors.New("unsupported phase set") } shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:]) coveredPrefix := hg.getCoveredPrefix() // Step 1: Navigate to sync point - syncPoint, err := hg.navigateToSyncPoint(stream, shardKeyBytes, phaseSet, coveredPrefix, logger) + syncPoint, err := hg.navigateToSyncPoint(stream, shardKeyBytes, phaseSet, coveredPrefix, expectedRoot, logger) if err != nil { - return errors.Wrap(err, "navigate to sync point") + return nil, errors.Wrap(err, "navigate to sync point") } if syncPoint == nil || len(syncPoint.Commitment) == 0 { logger.Info("server has no data at sync point") - return nil + // Return current root even if no data was synced + root := set.GetTree().Commit(false) + return root, nil } // Step 2: Sync the subtree - err = hg.syncSubtree(stream, shardKeyBytes, phaseSet, syncPoint, set, logger) + err = hg.syncSubtree(stream, shardKeyBytes, phaseSet, expectedRoot, syncPoint, set, logger) if err != nil { - return errors.Wrap(err, "sync subtree") + return nil, errors.Wrap(err, "sync subtree") } - return nil + // Step 3: Recompute commitment so future syncs see updated state + root := set.GetTree().Commit(false) + logger.Info( + "hypergraph root commit after sync", + zap.String("root", hex.EncodeToString(root)), + ) + + return root, nil } func (hg *HypergraphCRDT) navigateToSyncPoint( @@ -484,6 +549,7 @@ func (hg *HypergraphCRDT) navigateToSyncPoint( shardKey []byte, phaseSet protobufs.HypergraphPhaseSet, coveredPrefix []int, + expectedRoot []byte, logger *zap.Logger, ) (*protobufs.HypergraphSyncBranchResponse, error) { path := []int32{} @@ -493,9 +559,10 @@ func (hg *HypergraphCRDT) navigateToSyncPoint( err := stream.Send(&protobufs.HypergraphSyncQuery{ Request: &protobufs.HypergraphSyncQuery_GetBranch{ GetBranch: &protobufs.HypergraphSyncGetBranchRequest{ - ShardKey: shardKey, - PhaseSet: phaseSet, - Path: path, + ShardKey: shardKey, + PhaseSet: phaseSet, + Path: path, + ExpectedRoot: expectedRoot, }, }, }) @@ -574,6 +641,7 @@ func (hg *HypergraphCRDT) syncSubtree( stream protobufs.HypergraphComparisonService_PerformSyncClient, shardKey []byte, phaseSet protobufs.HypergraphPhaseSet, + expectedRoot []byte, serverBranch *protobufs.HypergraphSyncBranchResponse, localSet hypergraph.IdSet, logger *zap.Logger, @@ -582,9 +650,10 @@ func (hg *HypergraphCRDT) syncSubtree( // Get local node at same path var localCommitment []byte + var localNode tries.LazyVectorCommitmentNode if tree != nil && tree.Root != nil { path := toIntSlice(serverBranch.FullPath) - localNode := getNodeAtPath( + localNode = getNodeAtPath( logger, tree.SetType, 
tree.PhaseType, @@ -614,22 +683,24 @@ func (hg *HypergraphCRDT) syncSubtree( // If server node is a leaf or has no children, fetch all leaves if serverBranch.IsLeaf || len(serverBranch.Children) == 0 { - return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, serverBranch.FullPath, localSet, logger) + return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger) + } + + // OPTIMIZATION: If we have NO local data at this path, skip the branch-by-branch + // traversal and just fetch all leaves directly. This avoids N round trips for N + // children when we know we need all of them anyway. + if localNode == nil { + logger.Debug("no local data at path, fetching all leaves directly", + zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))), + zap.Int("serverChildren", len(serverBranch.Children)), + ) + return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger) } // Compare children and recurse localChildren := make(map[int32][]byte) if tree != nil && tree.Root != nil { path := toIntSlice(serverBranch.FullPath) - localNode := getNodeAtPath( - logger, - tree.SetType, - tree.PhaseType, - tree.ShardKey, - tree.Root, - serverBranch.FullPath, - 0, - ) if branch, ok := localNode.(*tries.LazyVectorCommitmentBranchNode); ok { for i := 0; i < 64; i++ { child := branch.Children[i] @@ -670,9 +741,10 @@ func (hg *HypergraphCRDT) syncSubtree( err := stream.Send(&protobufs.HypergraphSyncQuery{ Request: &protobufs.HypergraphSyncQuery_GetBranch{ GetBranch: &protobufs.HypergraphSyncGetBranchRequest{ - ShardKey: shardKey, - PhaseSet: phaseSet, - Path: childPath, + ShardKey: shardKey, + PhaseSet: phaseSet, + Path: childPath, + ExpectedRoot: expectedRoot, }, }, }) @@ -699,7 +771,7 @@ func (hg *HypergraphCRDT) syncSubtree( } // Recurse - if err := hg.syncSubtree(stream, shardKey, phaseSet, childBranch, localSet, logger); err != nil { + if err := hg.syncSubtree(stream, shardKey, phaseSet, expectedRoot, childBranch, localSet, logger); err != nil { return err } } @@ -711,6 +783,7 @@ func (hg *HypergraphCRDT) fetchAndIntegrateLeaves( stream protobufs.HypergraphComparisonService_PerformSyncClient, shardKey []byte, phaseSet protobufs.HypergraphPhaseSet, + expectedRoot []byte, path []int32, localSet hypergraph.IdSet, logger *zap.Logger, @@ -731,6 +804,7 @@ func (hg *HypergraphCRDT) fetchAndIntegrateLeaves( Path: path, MaxLeaves: 1000, ContinuationToken: continuationToken, + ExpectedRoot: expectedRoot, }, }, }) @@ -784,6 +858,7 @@ func (hg *HypergraphCRDT) fetchAndIntegrateLeaves( totalFetched += len(leavesResp.Leaves) logger.Debug("fetched leaves batch", + zap.String("path", hex.EncodeToString(packPath(path))), zap.Int("count", len(leavesResp.Leaves)), zap.Int("totalFetched", totalFetched), zap.Uint64("totalAvailable", leavesResp.TotalLeaves), diff --git a/node/consensus/app/message_validation.go b/node/consensus/app/message_validation.go index 35c8e53..1ad2bb6 100644 --- a/node/consensus/app/message_validation.go +++ b/node/consensus/app/message_validation.go @@ -479,6 +479,13 @@ func (e *AppConsensusEngine) validatePeerInfoMessage( return p2p.ValidationResultReject } + if peerInfo.Timestamp < now-1000 { + e.logger.Debug("peer info timestamp too old, ignoring", + zap.Int64("peer_timestamp", peerInfo.Timestamp), + ) + return p2p.ValidationResultIgnore + } + if peerInfo.Timestamp > fiveMinutesLater { e.logger.Debug("peer info timestamp too far in future", zap.Int64("peer_timestamp", 
peerInfo.Timestamp), @@ -499,10 +506,17 @@ func (e *AppConsensusEngine) validatePeerInfoMessage( } now := time.Now().UnixMilli() - if int64(keyRegistry.LastUpdated) < now-1000 { - e.logger.Debug("key registry timestamp too old") + + if int64(keyRegistry.LastUpdated) < now-60000 { + e.logger.Debug("key registry timestamp too old, rejecting") return p2p.ValidationResultReject } + + if int64(keyRegistry.LastUpdated) < now-1000 { + e.logger.Debug("key registry timestamp too old") + return p2p.ValidationResultIgnore + } + if int64(keyRegistry.LastUpdated) > now+5000 { e.logger.Debug("key registry timestamp too far in future") return p2p.ValidationResultIgnore diff --git a/node/consensus/global/message_validation.go b/node/consensus/global/message_validation.go index 4728b33..366db5f 100644 --- a/node/consensus/global/message_validation.go +++ b/node/consensus/global/message_validation.go @@ -504,13 +504,20 @@ func (e *GlobalConsensusEngine) validatePeerInfoMessage( now := time.Now().UnixMilli() - if peerInfo.Timestamp < now-1000 { - e.logger.Debug("peer info timestamp too old", + if peerInfo.Timestamp < now-60000 { + e.logger.Debug("peer info timestamp too old, rejecting", zap.Int64("peer_timestamp", peerInfo.Timestamp), ) return tp2p.ValidationResultReject } + if peerInfo.Timestamp < now-1000 { + e.logger.Debug("peer info timestamp too old, ignoring", + zap.Int64("peer_timestamp", peerInfo.Timestamp), + ) + return tp2p.ValidationResultIgnore + } + if peerInfo.Timestamp > now+5000 { e.logger.Debug("peer info timestamp too far in future", zap.Int64("peer_timestamp", peerInfo.Timestamp), @@ -532,9 +539,14 @@ func (e *GlobalConsensusEngine) validatePeerInfoMessage( now := time.Now().UnixMilli() + if int64(keyRegistry.LastUpdated) < now-60000 { + e.logger.Debug("key registry timestamp too old, rejecting") + return tp2p.ValidationResultReject + } + if int64(keyRegistry.LastUpdated) < now-1000 { e.logger.Debug("key registry timestamp too old") - return tp2p.ValidationResultReject + return tp2p.ValidationResultIgnore } if int64(keyRegistry.LastUpdated) > now+5000 { diff --git a/node/consensus/sync/sync_provider.go b/node/consensus/sync/sync_provider.go index f2d9264..85deb14 100644 --- a/node/consensus/sync/sync_provider.go +++ b/node/consensus/sync/sync_provider.go @@ -406,6 +406,7 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSync( phaseSyncs := [](func( protobufs.HypergraphComparisonService_PerformSyncClient, tries.ShardKey, + []byte, ) []byte){ p.hyperSyncVertexAdds, p.hyperSyncVertexRemoves, @@ -438,7 +439,7 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSync( return nil } - root := syncPhase(str, shardKey) + root := syncPhase(str, shardKey, expectedRoot) if cerr := ch.Close(); cerr != nil { p.logger.Error("error while closing connection", zap.Error(cerr)) } @@ -480,6 +481,7 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf( phaseSyncs := [](func( protobufs.HypergraphComparisonService_PerformSyncClient, tries.ShardKey, + []byte, ) []byte){ p.hyperSyncVertexAdds, p.hyperSyncVertexRemoves, @@ -511,7 +513,7 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf( return } - syncPhase(str, shardKey) + syncPhase(str, shardKey, expectedRoot) if cerr := ch.Close(); cerr != nil { p.logger.Error("error while closing connection", zap.Error(cerr)) } @@ -524,65 +526,73 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf( func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexAdds( str protobufs.HypergraphComparisonService_PerformSyncClient, shardKey tries.ShardKey, + expectedRoot 
[]byte, ) []byte { - err := p.hypergraph.SyncFrom( + root, err := p.hypergraph.SyncFrom( str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + expectedRoot, ) if err != nil { p.logger.Error("error from sync", zap.Error(err)) } str.CloseSend() - return nil + return root } func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexRemoves( str protobufs.HypergraphComparisonService_PerformSyncClient, shardKey tries.ShardKey, + expectedRoot []byte, ) []byte { - err := p.hypergraph.SyncFrom( + root, err := p.hypergraph.SyncFrom( str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES, + expectedRoot, ) if err != nil { p.logger.Error("error from sync", zap.Error(err)) } str.CloseSend() - return nil + return root } func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeAdds( str protobufs.HypergraphComparisonService_PerformSyncClient, shardKey tries.ShardKey, + expectedRoot []byte, ) []byte { - err := p.hypergraph.SyncFrom( + root, err := p.hypergraph.SyncFrom( str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS, + expectedRoot, ) if err != nil { p.logger.Error("error from sync", zap.Error(err)) } str.CloseSend() - return nil + return root } func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeRemoves( str protobufs.HypergraphComparisonService_PerformSyncClient, shardKey tries.ShardKey, + expectedRoot []byte, ) []byte { - err := p.hypergraph.SyncFrom( + root, err := p.hypergraph.SyncFrom( str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES, + expectedRoot, ) if err != nil { p.logger.Error("error from sync", zap.Error(err)) } str.CloseSend() - return nil + return root } func (p *SyncProvider[StateT, ProposalT]) AddState( diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go index 3a104f8..14463fa 100644 --- a/node/datarpc/data_worker_ipc_server.go +++ b/node/datarpc/data_worker_ipc_server.go @@ -186,8 +186,8 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error { "quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer, }, map[string]channel.AllowedPeerPolicyType{ - "/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer, - "/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync": channel.OnlyShardProverPeer, + "/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer, + "/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync": channel.OnlyShardProverPeer, "/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer, "/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer, "/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer, diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index fe45978..7f94755 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -10,6 +10,7 @@ import ( "log" "math/big" "net" + "os" "slices" "sync" "testing" @@ -19,6 +20,8 @@ import ( "github.com/iden3/go-iden3-crypto/poseidon" pcrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + mn "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -28,12 +31,16 @@ import ( "google.golang.org/grpc/test/bufconn" 
"source.quilibrium.com/quilibrium/monorepo/bls48581" "source.quilibrium.com/quilibrium/monorepo/config" + "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph" + "source.quilibrium.com/quilibrium/monorepo/types/channel" internal_grpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc" + "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/store" "source.quilibrium.com/quilibrium/monorepo/node/tests" "source.quilibrium.com/quilibrium/monorepo/protobufs" application "source.quilibrium.com/quilibrium/monorepo/types/hypergraph" + tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p" "source.quilibrium.com/quilibrium/monorepo/types/tries" crypto "source.quilibrium.com/quilibrium/monorepo/types/tries" "source.quilibrium.com/quilibrium/monorepo/verenc" @@ -266,6 +273,8 @@ func TestHypergraphSyncServer(t *testing.T) { } grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { _, priv, _ := ed448.GenerateKey(rand.Reader) privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv) @@ -300,7 +309,13 @@ func TestHypergraphSyncServer(t *testing.T) { } }() - conn, err := grpc.DialContext(context.TODO(), "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.DialContext(context.TODO(), "localhost:50051", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB + ), + ) if err != nil { log.Fatalf("Client: failed to listen: %v", err) } @@ -310,7 +325,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS) + _, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil) if err != nil { log.Fatalf("Client: failed to sync 1: %v", err) } @@ -359,7 +374,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS) + _, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil) if err != nil { log.Fatalf("Client: failed to sync 2: %v", err) } @@ -570,6 +585,8 @@ func TestHypergraphPartialSync(t *testing.T) { } grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { _, priv, _ := ed448.GenerateKey(rand.Reader) privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv) @@ -603,7 +620,13 @@ func TestHypergraphPartialSync(t *testing.T) { } }() - conn, err := grpc.DialContext(context.TODO(), "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.DialContext(context.TODO(), "localhost:50051", + grpc.WithTransportCredentials(insecure.NewCredentials()), + 
grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB + ), + ) if err != nil { log.Fatalf("Client: failed to listen: %v", err) } @@ -613,7 +636,7 @@ func TestHypergraphPartialSync(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS) + _, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil) if err != nil { log.Fatalf("Client: failed to sync 1: %v", err) } @@ -633,7 +656,7 @@ func TestHypergraphPartialSync(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS) + _, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil) if err != nil { log.Fatalf("Client: failed to sync 2: %v", err) } @@ -783,6 +806,8 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) { lis := bufconn.Listen(bufSize) grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func( srv interface{}, ss grpc.ServerStream, @@ -823,6 +848,10 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) { "bufnet", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB + ), ) require.NoError(t, err) return conn, protobufs.NewHypergraphComparisonServiceClient(conn) @@ -868,10 +897,11 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) { ) stream, err := client.PerformSync(streamCtx) require.NoError(t, err) - clientHG.SyncFrom( + _, _ = clientHG.SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, stream.CloseSend()) cancelStream() @@ -930,10 +960,11 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) { conn, client := dialClient() stream, err := client.PerformSync(context.Background()) require.NoError(t, err) - _ = clientHGs[0].SyncFrom( + _, _ = clientHGs[0].SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) _ = stream.CloseSend() conn.Close() @@ -954,10 +985,11 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) { ) stream, err := client.PerformSync(streamCtx) require.NoError(t, err) - err = clientHGs[idx].SyncFrom( + _, err = clientHGs[idx].SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, stream.CloseSend()) @@ -1188,6 +1220,8 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) { lis := bufconn.Listen(bufSize) grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func( srv interface{}, ss grpc.ServerStream, @@ -1224,6 +1258,10 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) { "bufnet", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB 
+ ), ) require.NoError(t, err) return conn, protobufs.NewHypergraphComparisonServiceClient(conn) @@ -1262,10 +1300,11 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) { stream, err := client.PerformSync(context.Background()) require.NoError(t, err) - err = clientHG.SyncFrom( + _, err = clientHG.SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, stream.CloseSend()) @@ -1315,10 +1354,11 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) { stream, err := client.PerformSync(context.Background()) require.NoError(t, err) - err = clientHG.SyncFrom( + _, err = clientHG.SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, stream.CloseSend()) @@ -1343,10 +1383,11 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) { stream, err := client.PerformSync(context.Background()) require.NoError(t, err) - err = clientHG.SyncFrom( + _, err = clientHG.SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, stream.CloseSend()) @@ -1497,6 +1538,8 @@ func TestHypergraphSyncWithModifiedEntries(t *testing.T) { lis := bufconn.Listen(bufSize) grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func( srv interface{}, ss grpc.ServerStream, @@ -1533,6 +1576,10 @@ func TestHypergraphSyncWithModifiedEntries(t *testing.T) { "bufnet", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB + ), ) require.NoError(t, err) return conn, protobufs.NewHypergraphComparisonServiceClient(conn) @@ -1546,10 +1593,11 @@ func TestHypergraphSyncWithModifiedEntries(t *testing.T) { stream, err := client.PerformSync(context.Background()) require.NoError(t, err) - err = clientHG.SyncFrom( + _, err = clientHG.SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, stream.CloseSend()) @@ -1707,6 +1755,8 @@ func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) { lis := bufconn.Listen(bufSize) grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func( srv interface{}, ss grpc.ServerStream, @@ -1745,6 +1795,10 @@ func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) { "bufnet", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB + ), ) require.NoError(t, err) return conn, protobufs.NewHypergraphComparisonServiceClient(conn) @@ -1761,10 +1815,11 @@ func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) { streamB, err := clientB.PerformSync(context.Background()) require.NoError(t, err) - err = nodeAHG.SyncFrom( + _, err = nodeAHG.SyncFrom( streamB, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, streamB.CloseSend()) @@ -1787,10 +1842,11 @@ func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) { streamA, err := 
clientA.PerformSync(context.Background()) require.NoError(t, err) - err = nodeBHG.SyncFrom( + _, err = nodeBHG.SyncFrom( streamA, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, streamA.CloseSend()) @@ -1982,6 +2038,8 @@ func TestHypergraphBidirectionalSyncClientDriven(t *testing.T) { lis := bufconn.Listen(bufSize) grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func( srv interface{}, ss grpc.ServerStream, @@ -2020,6 +2078,10 @@ func TestHypergraphBidirectionalSyncClientDriven(t *testing.T) { "bufnet", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB + ), ) require.NoError(t, err) return conn, protobufs.NewHypergraphComparisonServiceClient(conn) @@ -2039,10 +2101,11 @@ func TestHypergraphBidirectionalSyncClientDriven(t *testing.T) { streamB, err := clientB.PerformSync(context.Background()) require.NoError(t, err) - err = nodeAHG.SyncFrom( + _, err = nodeAHG.SyncFrom( streamB, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, streamB.CloseSend()) @@ -2064,10 +2127,11 @@ func TestHypergraphBidirectionalSyncClientDriven(t *testing.T) { streamA, err := clientA.PerformSync(context.Background()) require.NoError(t, err) - err = nodeBHG.SyncFrom( + _, err = nodeBHG.SyncFrom( streamA, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, streamA.CloseSend()) @@ -2323,6 +2387,8 @@ func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) { setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) { lis := bufconn.Listen(bufSize) grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxSendMsgSize(100*1024*1024), // 100 MB grpc.ChainStreamInterceptor(func( srv interface{}, ss grpc.ServerStream, @@ -2353,6 +2419,10 @@ func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) { "bufnet", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB + grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB + ), ) require.NoError(t, err) return conn, protobufs.NewHypergraphComparisonServiceClient(conn) @@ -2395,10 +2465,11 @@ func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) { stream, err := client.PerformSync(context.Background()) require.NoError(t, err) - err = clientHG.SyncFrom( + _, err = clientHG.SyncFrom( stream, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, ) require.NoError(t, err) require.NoError(t, stream.CloseSend()) @@ -2465,3 +2536,843 @@ func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) { runSyncTest("A_syncs_from_B") runSyncTest("B_syncs_from_A") } + +// TestMainnetBlossomsubFrameReceptionAndHypersync is an integration test that: +// 1. Connects to mainnet blossomsub using real bootstrap peers +// 2. Subscribes to the global frame bitmask (0x0000) as done in global_consensus_engine.go +// 3. Receives a real frame from a global prover on mainnet +// 4. Performs hypersync on the prover shard (000000ffffffff...ffffffff) +// 5. 
Confirms the synced data matches the prover root commitment from the frame
+//
+// This test requires network access and may take up to 20 minutes to receive a frame.
+// Run with: go test -v -timeout 40m -run TestMainnetBlossomsubFrameReceptionAndHypersync
+func TestMainnetBlossomsubFrameReceptionAndHypersync(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping mainnet integration test in short mode")
+ }
+
+ logger, _ := zap.NewDevelopment()
+ enc := verenc.NewMPCitHVerifiableEncryptor(1)
+ inclusionProver := bls48581.NewKZGInclusionProver(logger)
+
+ // The prover shard key from global consensus:
+ // L1 = [0x00, 0x00, 0x00], L2 = bytes.Repeat([]byte{0xff}, 32)
+ proverShardKey := tries.ShardKey{
+ L1: [3]byte{0x00, 0x00, 0x00},
+ L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
+ }
+
+ // Frame bitmask from global consensus: []byte{0x00, 0x00}
+ globalFrameBitmask := []byte{0x00, 0x00}
+
+ // Create in-memory hypergraph store for the client
+ clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"}, 0)
+ defer clientDB.Close()
+
+ clientStore := store.NewPebbleHypergraphStore(
+ &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"},
+ clientDB,
+ logger,
+ enc,
+ inclusionProver,
+ )
+
+ clientHG := hgcrdt.NewHypergraph(
+ logger.With(zap.String("side", "mainnet-client")),
+ clientStore,
+ inclusionProver,
+ []int{},
+ &tests.Nopthenticator{},
+ 200,
+ )
+
+ // Generate a random peer key for this test node
+ peerPrivKey, _, err := pcrypto.GenerateEd448Key(rand.Reader)
+ require.NoError(t, err)
+ peerPrivKeyBytes, err := peerPrivKey.Raw()
+ require.NoError(t, err)
+
+ // Create P2P config with mainnet bootstrap peers
+ p2pConfig := &config.P2PConfig{
+ ListenMultiaddr: "/ip4/0.0.0.0/udp/0/quic-v1", // Use random port
+ BootstrapPeers: config.BootstrapPeers,
+ PeerPrivKey: fmt.Sprintf("%x", peerPrivKeyBytes),
+ Network: 0, // Mainnet
+ D: 8,
+ DLo: 6,
+ DHi: 12,
+ DScore: 4,
+ DOut: 2,
+ HistoryLength: 5,
+ HistoryGossip: 3,
+ DLazy: 6,
+ GossipFactor: 0.25,
+ GossipRetransmission: 3,
+ HeartbeatInitialDelay: 100 * time.Millisecond,
+ HeartbeatInterval: 1 * time.Second,
+ FanoutTTL: 60 * time.Second,
+ PrunePeers: 16,
+ PruneBackoff: time.Minute,
+ UnsubscribeBackoff: 10 * time.Second,
+ Connectors: 8,
+ MaxPendingConnections: 128,
+ ConnectionTimeout: 30 * time.Second,
+ DirectConnectTicks: 300,
+ DirectConnectInitialDelay: 1 * time.Second,
+ OpportunisticGraftTicks: 60,
+ OpportunisticGraftPeers: 2,
+ GraftFloodThreshold: 10 * time.Second,
+ MaxIHaveLength: 5000,
+ MaxIHaveMessages: 10,
+ MaxIDontWantMessages: 10,
+ IWantFollowupTime: 3 * time.Second,
+ IDontWantMessageThreshold: 10000,
+ IDontWantMessageTTL: 3,
+ MinBootstrapPeers: 1,
+ BootstrapParallelism: 4,
+ DiscoveryParallelism: 4,
+ DiscoveryPeerLookupLimit: 100,
+ PingTimeout: 30 * time.Second,
+ PingPeriod: time.Minute,
+ PingAttempts: 3,
+ LowWatermarkConnections: -1,
+ HighWatermarkConnections: -1,
+ SubscriptionQueueSize: 128,
+ ValidateQueueSize: 128,
+ ValidateWorkers: 4,
+ PeerOutboundQueueSize: 128,
+ }
+
+ engineConfig := &config.EngineConfig{}
+
+ // Create a temporary config directory
+ configDir, err := os.MkdirTemp("", "quil-test-*")
+ require.NoError(t, err)
+ defer os.RemoveAll(configDir)
+
+ // Create connectivity cache file to bypass the connectivity test
+ // The cache file must be named "connectivity-check-<network>" and exist in configDir
+ connectivityCachePath := fmt.Sprintf("%s/connectivity-check-0", configDir)
+ err = 
os.WriteFile(connectivityCachePath, []byte(time.Now().Format(time.RFC3339)), 0644) + require.NoError(t, err) + + t.Log("Connecting to mainnet blossomsub...") + + // Create the real blossomsub instance + pubsub := p2p.NewBlossomSub( + p2pConfig, + engineConfig, + logger.Named("blossomsub"), + 0, + p2p.ConfigDir(configDir), + ) + defer pubsub.Close() + + t.Logf("Connected to mainnet with peer ID: %x", pubsub.GetPeerID()) + t.Logf("Bootstrap peers: %d", len(config.BootstrapPeers)) + + // Create a channel to receive frames + frameReceived := make(chan *protobufs.GlobalFrame, 1) + + // Create a peer info manager to store peer reachability info + // We use a simple in-memory map to store peer info from the peer info bitmask + peerInfoMap := make(map[string]*tp2p.PeerInfo) + var peerInfoMu sync.RWMutex + + // Create a key registry map to map prover addresses to identity peer IDs + // Key: prover address ([]byte as string), Value: identity peer ID + keyRegistryMap := make(map[string]peer.ID) + var keyRegistryMu sync.RWMutex + + // Peer info bitmask from global consensus: []byte{0x00, 0x00, 0x00, 0x00} + globalPeerInfoBitmask := []byte{0x00, 0x00, 0x00, 0x00} + + // Subscribe to peer info bitmask - this handles both PeerInfo and KeyRegistry messages + t.Log("Subscribing to global peer info bitmask...") + err = pubsub.Subscribe(globalPeerInfoBitmask, func(message *pb.Message) error { + if len(message.Data) < 4 { + return nil + } + + // Check type prefix + typePrefix := binary.BigEndian.Uint32(message.Data[:4]) + + switch typePrefix { + case protobufs.PeerInfoType: + peerInfoMsg := &protobufs.PeerInfo{} + if err := peerInfoMsg.FromCanonicalBytes(message.Data); err != nil { + t.Logf("Failed to unmarshal peer info: %v", err) + return nil + } + + // Validate signature using Ed448 + if len(peerInfoMsg.Signature) == 0 || len(peerInfoMsg.PublicKey) == 0 { + return nil + } + + // Create a copy without signature for validation + infoCopy := &protobufs.PeerInfo{ + PeerId: peerInfoMsg.PeerId, + Reachability: peerInfoMsg.Reachability, + Timestamp: peerInfoMsg.Timestamp, + Version: peerInfoMsg.Version, + PatchNumber: peerInfoMsg.PatchNumber, + Capabilities: peerInfoMsg.Capabilities, + PublicKey: peerInfoMsg.PublicKey, + LastReceivedFrame: peerInfoMsg.LastReceivedFrame, + LastGlobalHeadFrame: peerInfoMsg.LastGlobalHeadFrame, + } + + msg, err := infoCopy.ToCanonicalBytes() + if err != nil { + return nil + } + + // Validate Ed448 signature + if !ed448.Verify(ed448.PublicKey(peerInfoMsg.PublicKey), msg, peerInfoMsg.Signature, "") { + return nil + } + + // Convert and store peer info + reachability := []tp2p.Reachability{} + for _, r := range peerInfoMsg.Reachability { + reachability = append(reachability, tp2p.Reachability{ + Filter: r.Filter, + PubsubMultiaddrs: r.PubsubMultiaddrs, + StreamMultiaddrs: r.StreamMultiaddrs, + }) + } + + peerInfoMu.Lock() + peerInfoMap[string(peerInfoMsg.PeerId)] = &tp2p.PeerInfo{ + PeerId: peerInfoMsg.PeerId, + Reachability: reachability, + Cores: uint32(len(reachability)), + LastSeen: time.Now().UnixMilli(), + Version: peerInfoMsg.Version, + PatchNumber: peerInfoMsg.PatchNumber, + LastReceivedFrame: peerInfoMsg.LastReceivedFrame, + LastGlobalHeadFrame: peerInfoMsg.LastGlobalHeadFrame, + } + peerInfoMu.Unlock() + + peerIdStr := peer.ID(peerInfoMsg.PeerId).String() + t.Logf("Received peer info for %s with %d reachability entries", + peerIdStr, len(reachability)) + + case protobufs.KeyRegistryType: + keyRegistry := &protobufs.KeyRegistry{} + if err := 
keyRegistry.FromCanonicalBytes(message.Data); err != nil { + t.Logf("Failed to unmarshal key registry: %v", err) + return nil + } + + // We need identity key and prover key to establish the mapping + if keyRegistry.IdentityKey == nil || len(keyRegistry.IdentityKey.KeyValue) == 0 { + return nil + } + if keyRegistry.ProverKey == nil || len(keyRegistry.ProverKey.KeyValue) == 0 { + return nil + } + + // Derive peer ID from identity key + pk, err := pcrypto.UnmarshalEd448PublicKey(keyRegistry.IdentityKey.KeyValue) + if err != nil { + t.Logf("Failed to unmarshal identity key: %v", err) + return nil + } + identityPeerID, err := peer.IDFromPublicKey(pk) + if err != nil { + t.Logf("Failed to derive peer ID from identity key: %v", err) + return nil + } + + // Derive prover address from prover key (Poseidon hash) + proverAddrBI, err := poseidon.HashBytes(keyRegistry.ProverKey.KeyValue) + if err != nil { + t.Logf("Failed to derive prover address: %v", err) + return nil + } + proverAddress := proverAddrBI.FillBytes(make([]byte, 32)) + + // Store the mapping: prover address -> identity peer ID + keyRegistryMu.Lock() + keyRegistryMap[string(proverAddress)] = identityPeerID + keyRegistryMu.Unlock() + + t.Logf("Received key registry: prover %x -> peer %s", + proverAddress, identityPeerID.String()) + } + + return nil + }) + require.NoError(t, err) + + // Register a validator for peer info messages with age checks + err = pubsub.RegisterValidator(globalPeerInfoBitmask, func(peerID peer.ID, message *pb.Message) tp2p.ValidationResult { + if len(message.Data) < 4 { + return tp2p.ValidationResultReject + } + + typePrefix := binary.BigEndian.Uint32(message.Data[:4]) + now := time.Now().UnixMilli() + + switch typePrefix { + case protobufs.PeerInfoType: + peerInfo := &protobufs.PeerInfo{} + if err := peerInfo.FromCanonicalBytes(message.Data); err != nil { + return tp2p.ValidationResultReject + } + + // Age checks: timestamp must be within 1 second in the past, 5 seconds in the future + if peerInfo.Timestamp < now-1000 { + t.Logf("Rejecting peer info: timestamp too old (%d < %d)", peerInfo.Timestamp, now-1000) + return tp2p.ValidationResultReject + } + if peerInfo.Timestamp > now+5000 { + t.Logf("Ignoring peer info: timestamp too far in future (%d > %d)", peerInfo.Timestamp, now+5000) + return tp2p.ValidationResultIgnore + } + + case protobufs.KeyRegistryType: + keyRegistry := &protobufs.KeyRegistry{} + if err := keyRegistry.FromCanonicalBytes(message.Data); err != nil { + return tp2p.ValidationResultReject + } + + // Age checks: LastUpdated must be within 1 second in the past, 5 seconds in the future + if int64(keyRegistry.LastUpdated) < now-1000 { + t.Logf("Rejecting key registry: timestamp too old (%d < %d)", keyRegistry.LastUpdated, now-1000) + return tp2p.ValidationResultReject + } + if int64(keyRegistry.LastUpdated) > now+5000 { + t.Logf("Ignoring key registry: timestamp too far in future (%d > %d)", keyRegistry.LastUpdated, now+5000) + return tp2p.ValidationResultIgnore + } + + default: + return tp2p.ValidationResultIgnore + } + + return tp2p.ValidationResultAccept + }, true) + require.NoError(t, err) + + // Subscribe to frame messages + t.Log("Subscribing to global frame bitmask...") + err = pubsub.Subscribe(globalFrameBitmask, func(message *pb.Message) error { + t.Logf("Received message on frame bitmask, data length: %d", len(message.Data)) + + if len(message.Data) < 4 { + return nil + } + + // Check type prefix + typePrefix := binary.BigEndian.Uint32(message.Data[:4]) + t.Logf("Message type prefix: %d 
(GlobalFrameType=%d)", typePrefix, protobufs.GlobalFrameType) + if typePrefix != protobufs.GlobalFrameType { + return nil + } + + frame := &protobufs.GlobalFrame{} + if err := frame.FromCanonicalBytes(message.Data); err != nil { + t.Logf("Failed to unmarshal frame: %v", err) + return nil + } + + t.Logf("Received frame %d from prover %x with root %x", + frame.Header.FrameNumber, + frame.Header.Prover, + frame.Header.ProverTreeCommitment) + + select { + case frameReceived <- frame: + default: + } + return nil + }) + require.NoError(t, err) + + // Register a validator for frame messages with age checks + err = pubsub.RegisterValidator(globalFrameBitmask, func(peerID peer.ID, message *pb.Message) tp2p.ValidationResult { + if len(message.Data) < 4 { + return tp2p.ValidationResultReject + } + + typePrefix := binary.BigEndian.Uint32(message.Data[:4]) + if typePrefix != protobufs.GlobalFrameType { + return tp2p.ValidationResultIgnore + } + + frame := &protobufs.GlobalFrame{} + if err := frame.FromCanonicalBytes(message.Data); err != nil { + t.Logf("Frame validation: failed to unmarshal: %v", err) + return tp2p.ValidationResultReject + } + + // Check signature is present + if frame.Header.PublicKeySignatureBls48581 == nil || + frame.Header.PublicKeySignatureBls48581.PublicKey == nil || + frame.Header.PublicKeySignatureBls48581.PublicKey.KeyValue == nil { + t.Logf("Frame validation: missing signature") + return tp2p.ValidationResultReject + } + + // Age check: frame must be within 120 seconds + frameAge := time.Since(time.UnixMilli(frame.Header.Timestamp)) + if frameAge > 120*time.Second { + t.Logf("Frame validation: too old (age=%v)", frameAge) + return tp2p.ValidationResultIgnore + } + + t.Logf("Frame validation: accepting frame %d (age=%v)", frame.Header.FrameNumber, frameAge) + return tp2p.ValidationResultAccept + }, true) + require.NoError(t, err) + + t.Log("Waiting for a global frame from mainnet (this may take up to 20 minutes)...") + + // Wait for a frame with a longer timeout for mainnet - frames can take a while + var receivedFrame *protobufs.GlobalFrame + select { + case receivedFrame = <-frameReceived: + t.Logf("Successfully received frame %d!", receivedFrame.Header.FrameNumber) + case <-time.After(20 * time.Minute): + t.Fatal("timeout waiting for frame from mainnet - ensure network connectivity") + } + + // Verify frame has required fields + require.NotNil(t, receivedFrame.Header, "frame must have header") + require.NotEmpty(t, receivedFrame.Header.Prover, "frame must have prover") + require.NotEmpty(t, receivedFrame.Header.ProverTreeCommitment, "frame must have prover tree commitment") + + expectedRoot := receivedFrame.Header.ProverTreeCommitment + proverAddress := receivedFrame.Header.Prover // This is the prover ADDRESS (hash of BLS key), not a peer ID + + t.Logf("Frame details:") + t.Logf(" Frame number: %d", receivedFrame.Header.FrameNumber) + t.Logf(" Prover address: %x", proverAddress) + t.Logf(" Prover root commitment: %x", expectedRoot) + + // Now we need to find the prover's peer info to connect and sync + // The prover address (in frame) needs to be mapped to a peer ID via key registry + t.Log("Looking up prover peer info...") + + // Helper function to get prover's identity peer ID from key registry + getProverPeerID := func() (peer.ID, bool) { + keyRegistryMu.RLock() + defer keyRegistryMu.RUnlock() + + peerID, ok := keyRegistryMap[string(proverAddress)] + return peerID, ok + } + + // Helper function to get multiaddr from peer info map using peer ID + getMultiaddrForPeer := 
func(peerID peer.ID) string {
+ peerInfoMu.RLock()
+ defer peerInfoMu.RUnlock()
+
+ info, ok := peerInfoMap[string([]byte(peerID))]
+ if !ok || len(info.Reachability) == 0 {
+ return ""
+ }
+
+ // Try stream multiaddrs first (for direct gRPC connection)
+ for _, r := range info.Reachability {
+ if len(r.StreamMultiaddrs) > 0 {
+ return r.StreamMultiaddrs[0]
+ }
+ }
+ // Fall back to pubsub multiaddrs
+ for _, r := range info.Reachability {
+ if len(r.PubsubMultiaddrs) > 0 {
+ return r.PubsubMultiaddrs[0]
+ }
+ }
+ return ""
+ }
+
+ // Wait for key registry and peer info to arrive (provers broadcast every 5 minutes)
+ t.Log("Waiting for prover key registry and peer info (up to 10 minutes)...")
+
+ var proverPeerID peer.ID
+ var proverMultiaddr string
+ timeout := time.After(10 * time.Minute)
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+
+waitLoop:
+ for {
+ select {
+ case <-timeout:
+ t.Log("Timeout waiting for prover info")
+ break waitLoop
+ case <-ticker.C:
+ // First try to get the peer ID from key registry
+ if proverPeerID == "" {
+ if pID, ok := getProverPeerID(); ok {
+ proverPeerID = pID
+ t.Logf("Found key registry: prover address %x -> peer ID %s", proverAddress, proverPeerID.String())
+ }
+ }
+
+ // If we have the peer ID, try to get the multiaddr from peer info
+ if proverPeerID != "" {
+ proverMultiaddr = getMultiaddrForPeer(proverPeerID)
+ if proverMultiaddr != "" {
+ t.Logf("Found prover peer info from peer info bitmask!")
+ break waitLoop
+ }
+ }
+
+ // Log progress
+ keyRegistryMu.RLock()
+ peerInfoMu.RLock()
+ t.Logf("Still waiting... key registries: %d, peer infos: %d, have prover peer ID: %v",
+ len(keyRegistryMap), len(peerInfoMap), proverPeerID != "")
+ peerInfoMu.RUnlock()
+ keyRegistryMu.RUnlock()
+ }
+ }
+
+ // If we have peer ID but no multiaddr, try connected peers
+ if proverPeerID != "" && proverMultiaddr == "" {
+ t.Log("Checking connected peers for prover...")
+ networkInfo := pubsub.GetNetworkInfo()
+ for _, info := range networkInfo.NetworkInfo {
+ if bytes.Equal(info.PeerId, []byte(proverPeerID)) && len(info.Multiaddrs) > 0 {
+ proverMultiaddr = info.Multiaddrs[0]
+ t.Logf("Found prover in connected peers")
+ break
+ }
+ }
+ }
+
+ // Final fallback - direct lookup using peer ID
+ if proverPeerID != "" && proverMultiaddr == "" {
+ t.Logf("Attempting direct peer lookup...")
+ proverMultiaddr = pubsub.GetMultiaddrOfPeer([]byte(proverPeerID))
+ }
+
+ if proverPeerID == "" {
+ t.Skip("Could not find prover key registry - prover may not have broadcast key info yet")
+ }
+
+ if proverMultiaddr == "" {
+ t.Skip("Could not find prover multiaddr - prover may not have broadcast peer info yet")
+ }
+
+ t.Logf("Prover multiaddr: %s", proverMultiaddr)
+
+ // Connect to the prover using direct gRPC connection via multiaddr
+ t.Log("Connecting to prover for hypersync...")
+
+ // Create TLS credentials for the connection
+ creds, err := p2p.NewPeerAuthenticator(
+ logger,
+ p2pConfig,
+ nil,
+ nil,
+ nil,
+ nil,
+ [][]byte{[]byte(proverPeerID)},
+ map[string]channel.AllowedPeerPolicyType{},
+ map[string]channel.AllowedPeerPolicyType{},
+ ).CreateClientTLSCredentials([]byte(proverPeerID))
+ if err != nil {
+ t.Skipf("Could not create TLS credentials: %v", err)
+ }
+
+ // Parse the multiaddr and convert to network address
+ ma, err := multiaddr.NewMultiaddr(proverMultiaddr)
+ if err != nil {
+ t.Skipf("Could not parse multiaddr %s: %v", proverMultiaddr, err)
+ }
+
+ mga, err := mn.ToNetAddr(ma)
+ if err != nil {
+ t.Skipf("Could not convert multiaddr to 
net addr: %v", err) + } + + // Create gRPC client connection + conn, err := grpc.NewClient( + mga.String(), + grpc.WithTransportCredentials(creds), + ) + if err != nil { + t.Skipf("Could not establish connection to prover: %v", err) + } + defer conn.Close() + + client := protobufs.NewHypergraphComparisonServiceClient(conn) + + // Perform hypersync on all phases + t.Log("Performing hypersync on prover shard...") + + phases := []protobufs.HypergraphPhaseSet{ + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + } + + for _, phase := range phases { + stream, err := client.PerformSync(context.Background()) + if err != nil { + t.Logf("PerformSync error: %v", err) + continue + } + + _, err = clientHG.SyncFrom(stream, proverShardKey, phase, expectedRoot) + if err != nil { + t.Logf("SyncFrom error for phase %v: %v", phase, err) + } + _ = stream.CloseSend() + } + + // Commit client to compute root + _, err = clientHG.Commit(uint64(receivedFrame.Header.FrameNumber)) + require.NoError(t, err) + + // Verify client now has the expected prover root + clientProverRoot := clientHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(false) + t.Logf("Client prover root after sync: %x", clientProverRoot) + t.Logf("Expected prover root from frame: %x", expectedRoot) + + assert.Equal(t, expectedRoot, clientProverRoot, + "client prover root should match frame's prover tree commitment after hypersync") + + // Count vertices synced + clientTree := clientHG.GetVertexAddsSet(proverShardKey).GetTree() + clientLeaves := tries.GetAllLeaves( + clientTree.SetType, + clientTree.PhaseType, + clientTree.ShardKey, + clientTree.Root, + ) + + clientLeafCount := 0 + for _, leaf := range clientLeaves { + if leaf != nil { + clientLeafCount++ + } + } + + t.Logf("Hypersync complete: client synced %d prover vertices", clientLeafCount) + assert.Greater(t, clientLeafCount, 0, "should have synced at least some prover vertices") +} + +// TestHypergraphSyncWithPagination tests that syncing a large tree with >1000 leaves +// correctly handles pagination through multiple GetLeaves requests. 
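Since this diff fixes parseContToken but elides makeContToken's body, here is an illustrative counterpart showing the token format the pagination path round-trips: a 4-byte big-endian index, hex-encoded. The function name is hypothetical:

package main

import "encoding/hex"

// makeContTokenSketch mirrors what the fixed parseContToken expects:
// hex-encoded 4 bytes, big-endian. Illustrative only; the real
// makeContToken body is not shown in this diff.
func makeContTokenSketch(idx int) []byte {
	raw := []byte{byte(idx >> 24), byte(idx >> 16), byte(idx >> 8), byte(idx)}
	token := make([]byte, hex.EncodedLen(len(raw)))
	hex.Encode(token, raw)
	return token
}

With MaxLeaves set to 1000, the 1500-vertex test below should exercise exactly one continuation: the first response carries a token that resumes at index 1000, and the second request picks up the remaining 500 leaves.

+// TestHypergraphSyncWithPagination tests that syncing a large tree with >1000 leaves
+// correctly handles pagination through multiple GetLeaves requests.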
+func TestHypergraphSyncWithPagination(t *testing.T) { + logger, _ := zap.NewDevelopment() + enc := verenc.NewMPCitHVerifiableEncryptor(1) + inclusionProver := bls48581.NewKZGInclusionProver(logger) + + // Create 1500 data trees to exceed the 1000 leaf batch size + numVertices := 1500 + dataTrees := make([]*tries.VectorCommitmentTree, numVertices) + eg := errgroup.Group{} + eg.SetLimit(100) + for i := 0; i < numVertices; i++ { + eg.Go(func() error { + dataTrees[i] = buildDataTree(t, inclusionProver) + return nil + }) + } + eg.Wait() + t.Log("Generated data trees") + + // Create server DB and store + serverDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_server/store"}, 0) + defer serverDB.Close() + + serverStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_server/store"}, + serverDB, + logger, + enc, + inclusionProver, + ) + + serverHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "server")), + serverStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + // Create client DB and store + clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_client/store"}, 0) + defer clientDB.Close() + + clientStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_client/store"}, + clientDB, + logger, + enc, + inclusionProver, + ) + + clientHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "client")), + clientStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + // Create all vertices in a single domain + domain := randomBytes32(t) + vertices := make([]application.Vertex, numVertices) + for i := 0; i < numVertices; i++ { + vertices[i] = hgcrdt.NewVertex( + domain, + randomBytes32(t), + dataTrees[i].Commit(inclusionProver, false), + dataTrees[i].GetSize(), + ) + } + shardKey := application.GetShardKey(vertices[0]) + + // Add all vertices to server + t.Logf("Adding %d vertices to server", numVertices) + serverTxn, err := serverStore.NewTransaction(false) + require.NoError(t, err) + for i, v := range vertices { + id := v.GetID() + require.NoError(t, serverStore.SaveVertexTree(serverTxn, id[:], dataTrees[i])) + require.NoError(t, serverHG.AddVertex(serverTxn, v)) + } + require.NoError(t, serverTxn.Commit()) + + // Add initial vertex to client (to establish same shard key) + clientTxn, err := clientStore.NewTransaction(false) + require.NoError(t, err) + id := vertices[0].GetID() + require.NoError(t, clientStore.SaveVertexTree(clientTxn, id[:], dataTrees[0])) + require.NoError(t, clientHG.AddVertex(clientTxn, vertices[0])) + require.NoError(t, clientTxn.Commit()) + + // Commit both + _, err = serverHG.Commit(1) + require.NoError(t, err) + _, err = clientHG.Commit(1) + require.NoError(t, err) + + serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + serverHG.PublishSnapshot(serverRoot) + + t.Logf("Server root: %x", serverRoot) + + // Verify server has 1500 vertices + serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree() + serverLeaves := tries.GetAllLeaves( + serverTree.SetType, + serverTree.PhaseType, + serverTree.ShardKey, + serverTree.Root, + ) + serverLeafCount := 0 + for _, leaf := range serverLeaves { + if leaf != nil { + serverLeafCount++ + } + } + assert.Equal(t, numVertices, serverLeafCount, "server should have %d leaves", numVertices) + t.Logf("Server has %d leaves", 
+
+	// Setup gRPC server
+	const bufSize = 1 << 20
+	lis := bufconn.Listen(bufSize)
+
+	grpcServer := grpc.NewServer(
+		grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
+		grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
+		grpc.ChainStreamInterceptor(func(
+			srv interface{},
+			ss grpc.ServerStream,
+			info *grpc.StreamServerInfo,
+			handler grpc.StreamHandler,
+		) error {
+			_, priv, _ := ed448.GenerateKey(rand.Reader)
+			privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
+			require.NoError(t, err)
+
+			pub := privKey.GetPublic()
+			peerID, err := peer.IDFromPublicKey(pub)
+			require.NoError(t, err)
+
+			return handler(srv, &serverStream{
+				ServerStream: ss,
+				ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
+			})
+		}),
+	)
+	protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
+	defer grpcServer.Stop()
+
+	go func() {
+		_ = grpcServer.Serve(lis)
+	}()
+
+	dialer := func(context.Context, string) (net.Conn, error) {
+		return lis.Dial()
+	}
+	conn, err := grpc.DialContext(
+		context.Background(),
+		"bufnet",
+		grpc.WithContextDialer(dialer),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithDefaultCallOptions(
+			grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
+			grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
+		),
+	)
+	require.NoError(t, err)
+	defer conn.Close()
+
+	client := protobufs.NewHypergraphComparisonServiceClient(conn)
+
+	// Perform sync
+	t.Log("Starting sync with pagination...")
+	stream, err := client.PerformSync(context.Background())
+	require.NoError(t, err)
+
+	_, err = clientHG.SyncFrom(
+		stream,
+		shardKey,
+		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
+		nil,
+	)
+	require.NoError(t, err)
+	require.NoError(t, stream.CloseSend())
+
+	// Commit client and verify
+	_, err = clientHG.Commit(2)
+	require.NoError(t, err)
+
+	clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	t.Logf("Client root after sync: %x", clientRoot)
+
+	// Verify client now has all 1500 vertices
+	clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
+	clientLeaves := tries.GetAllLeaves(
+		clientTree.SetType,
+		clientTree.PhaseType,
+		clientTree.ShardKey,
+		clientTree.Root,
+	)
+	clientLeafCount := 0
+	for _, leaf := range clientLeaves {
+		if leaf != nil {
+			clientLeafCount++
+		}
+	}
+	assert.Equal(t, numVertices, clientLeafCount, "client should have %d leaves after sync", numVertices)
+	t.Logf("Client has %d leaves after sync", clientLeafCount)
+
+	// Verify roots match
+	assert.Equal(t, serverRoot, clientRoot, "client root should match server root after sync")
+	t.Log("Pagination test passed - client converged to server state")
+}
diff --git a/protobufs/application.pb.go b/protobufs/application.pb.go
index 9dcbd34..0c99f5d 100644
--- a/protobufs/application.pb.go
+++ b/protobufs/application.pb.go
@@ -1531,6 +1531,10 @@ type HypergraphSyncGetBranchRequest struct {
 	PhaseSet HypergraphPhaseSet `protobuf:"varint,2,opt,name=phase_set,json=phaseSet,proto3,enum=quilibrium.node.application.pb.HypergraphPhaseSet" json:"phase_set,omitempty"`
 	// The path to query. Empty path queries the root.
 	Path []int32 `protobuf:"varint,3,rep,packed,name=path,proto3" json:"path,omitempty"`
+	// The expected root commitment the client wants to sync against. When set,
+	// the server will attempt to find a snapshot with a matching root. If empty,
+	// the server uses the latest available snapshot.
+ ExpectedRoot []byte `protobuf:"bytes,4,opt,name=expected_root,json=expectedRoot,proto3" json:"expected_root,omitempty"` } func (x *HypergraphSyncGetBranchRequest) Reset() { @@ -1586,6 +1590,13 @@ func (x *HypergraphSyncGetBranchRequest) GetPath() []int32 { return nil } +func (x *HypergraphSyncGetBranchRequest) GetExpectedRoot() []byte { + if x != nil { + return x.ExpectedRoot + } + return nil +} + // HypergraphSyncBranchResponse contains branch information at the queried path. type HypergraphSyncBranchResponse struct { state protoimpl.MessageState @@ -1746,6 +1757,10 @@ type HypergraphSyncGetLeavesRequest struct { MaxLeaves uint32 `protobuf:"varint,4,opt,name=max_leaves,json=maxLeaves,proto3" json:"max_leaves,omitempty"` // Continuation token for pagination. Empty for first request. ContinuationToken []byte `protobuf:"bytes,5,opt,name=continuation_token,json=continuationToken,proto3" json:"continuation_token,omitempty"` + // The expected root commitment the client wants to sync against. When set, + // the server will attempt to find a snapshot with a matching root. If empty, + // the server uses the latest available snapshot. + ExpectedRoot []byte `protobuf:"bytes,6,opt,name=expected_root,json=expectedRoot,proto3" json:"expected_root,omitempty"` } func (x *HypergraphSyncGetLeavesRequest) Reset() { @@ -1815,6 +1830,13 @@ func (x *HypergraphSyncGetLeavesRequest) GetContinuationToken() []byte { return nil } +func (x *HypergraphSyncGetLeavesRequest) GetExpectedRoot() []byte { + if x != nil { + return x.ExpectedRoot + } + return nil +} + // HypergraphSyncLeavesResponse contains leaves from the requested subtree. type HypergraphSyncLeavesResponse struct { state protoimpl.MessageState @@ -2159,7 +2181,7 @@ var file_application_proto_rawDesc = []byte{ 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x0a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa2, 0x01, 0x0a, 0x1e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xc7, 0x01, 0x0a, 0x1e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x47, 0x65, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x73, 0x68, 0x61, 0x72, @@ -2169,124 +2191,128 @@ var file_application_proto_rawDesc = []byte{ 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x52, 0x08, 0x70, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, - 0x03, 0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x22, 0xe8, 0x01, 0x0a, 0x1c, 0x48, 0x79, - 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x42, 0x72, 0x61, 0x6e, - 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x75, - 0x6c, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x03, 0x28, 0x05, 0x52, 0x08, 0x66, - 0x75, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, - 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x63, 0x6f, 0x6d, - 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x53, 
0x0a, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, - 0x72, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x37, 0x2e, 0x71, 0x75, 0x69, 0x6c, - 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, - 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, - 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x49, 0x6e, - 0x66, 0x6f, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x12, 0x17, 0x0a, 0x07, - 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x66, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, - 0x73, 0x4c, 0x65, 0x61, 0x66, 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x65, 0x61, 0x66, 0x5f, 0x63, 0x6f, - 0x75, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6c, 0x65, 0x61, 0x66, 0x43, - 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x4f, 0x0a, 0x17, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, - 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, - 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, - 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, - 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xf0, 0x01, 0x0a, 0x1e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, - 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x47, 0x65, 0x74, 0x4c, 0x65, 0x61, 0x76, 0x65, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, - 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x73, 0x68, 0x61, - 0x72, 0x64, 0x4b, 0x65, 0x79, 0x12, 0x4f, 0x0a, 0x09, 0x70, 0x68, 0x61, 0x73, 0x65, 0x5f, 0x73, - 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x32, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, - 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, - 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, - 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x52, 0x08, 0x70, 0x68, - 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, - 0x20, 0x03, 0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x61, - 0x78, 0x5f, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, - 0x6d, 0x61, 0x78, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x6f, 0x6e, - 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, - 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, - 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0xc6, 0x01, 0x0a, 0x1c, 0x48, 0x79, 0x70, - 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x65, 0x61, 0x76, 0x65, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, - 0x68, 0x18, 0x01, 0x20, 0x03, 0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x40, 0x0a, - 0x06, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x28, 0x2e, - 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, - 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x4c, - 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, 0x61, 0x52, 0x06, 0x6c, 0x65, 0x61, 0x76, 
0x65, 0x73, 0x12, - 0x2d, 0x0a, 0x12, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, - 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x11, 0x63, 0x6f, 0x6e, - 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x21, - 0x0a, 0x0c, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x4c, 0x65, 0x61, 0x76, 0x65, - 0x73, 0x22, 0x90, 0x01, 0x0a, 0x13, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, - 0x53, 0x79, 0x6e, 0x63, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x4b, 0x0a, 0x04, 0x63, 0x6f, 0x64, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x37, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, - 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, - 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, - 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x03, 0x28, 0x05, 0x52, 0x04, - 0x70, 0x61, 0x74, 0x68, 0x2a, 0xb8, 0x01, 0x0a, 0x12, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, - 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x12, 0x24, 0x0a, 0x20, 0x48, - 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, - 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, 0x45, 0x58, 0x5f, 0x41, 0x44, 0x44, 0x53, 0x10, - 0x00, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, - 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, 0x45, 0x58, - 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, 0x01, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, - 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, - 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x41, 0x44, 0x44, - 0x53, 0x10, 0x02, 0x12, 0x2a, 0x0a, 0x26, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, - 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, - 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, 0x03, 0x2a, - 0x8f, 0x02, 0x0a, 0x17, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, - 0x6e, 0x63, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x21, 0x0a, 0x1d, 0x48, - 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, - 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x2b, - 0x0a, 0x27, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, - 0x43, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, - 0x53, 0x48, 0x41, 0x52, 0x44, 0x5f, 0x4b, 0x45, 0x59, 0x10, 0x01, 0x12, 0x26, 0x0a, 0x22, 0x48, - 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, - 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x50, 0x41, 0x54, - 0x48, 0x10, 0x02, 0x12, 0x28, 0x0a, 0x24, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, - 0x48, 
0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x4e, 0x4f, 0x44, - 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x03, 0x12, 0x2e, 0x0a, - 0x2a, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, - 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x53, 0x4e, 0x41, 0x50, 0x53, 0x48, 0x4f, 0x54, 0x5f, - 0x55, 0x4e, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, 0x10, 0x04, 0x12, 0x22, 0x0a, - 0x1e, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, - 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, - 0x05, 0x32, 0xaa, 0x03, 0x0a, 0x1b, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, - 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x12, 0x7d, 0x0a, 0x0b, 0x48, 0x79, 0x70, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x12, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, - 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, - 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, - 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x1a, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, - 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, - 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x28, 0x01, 0x30, 0x01, - 0x12, 0x8b, 0x01, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, - 0x46, 0x6f, 0x72, 0x50, 0x61, 0x74, 0x68, 0x12, 0x39, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, - 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x69, 0x6c, - 0x64, 0x72, 0x65, 0x6e, 0x46, 0x6f, 0x72, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x3a, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, - 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x46, - 0x6f, 0x72, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x7e, - 0x0a, 0x0b, 0x50, 0x65, 0x72, 0x66, 0x6f, 0x72, 0x6d, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x33, 0x2e, + 0x03, 0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x78, 0x70, + 0x65, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x0c, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x6f, 0x6f, 0x74, 0x22, 0xe8, + 0x01, 0x0a, 0x1c, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, + 0x63, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1b, 0x0a, 0x09, 0x66, 0x75, 0x6c, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x05, 0x52, 0x08, 0x66, 0x75, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1e, 0x0a, 0x0a, + 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x53, 0x0a, 0x08, + 0x63, 0x68, 0x69, 0x6c, 0x64, 
0x72, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x37, + 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, + 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, + 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x68, + 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x63, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, + 0x6e, 0x12, 0x17, 0x0a, 0x07, 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x66, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x66, 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x65, + 0x61, 0x66, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, + 0x6c, 0x65, 0x61, 0x66, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x4f, 0x0a, 0x17, 0x48, 0x79, 0x70, + 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x68, 0x69, 0x6c, 0x64, + 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, + 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, + 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x95, 0x02, 0x0a, 0x1e, 0x48, + 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x47, 0x65, 0x74, + 0x4c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, + 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x4b, 0x65, 0x79, 0x12, 0x4f, 0x0a, 0x09, 0x70, 0x68, + 0x61, 0x73, 0x65, 0x5f, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x32, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, - 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x51, 0x75, 0x65, - 0x72, 0x79, 0x1a, 0x36, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, - 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, - 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3a, - 0x5a, 0x38, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, - 0x69, 0x75, 0x6d, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, - 0x75, 0x6d, 0x2f, 0x6d, 0x6f, 0x6e, 0x6f, 0x72, 0x65, 0x70, 0x6f, 0x2f, 0x6e, 0x6f, 0x64, 0x65, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, + 0x74, 0x52, 0x08, 0x70, 0x68, 0x61, 0x73, 0x65, 0x53, 0x65, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70, + 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x03, 0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, + 0x1d, 0x0a, 0x0a, 0x6d, 0x61, 0x78, 0x5f, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x09, 0x6d, 0x61, 0x78, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x12, 0x2d, + 0x0a, 0x12, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, + 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x05, 0x20, 
0x01, 0x28, 0x0c, 0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, + 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x23, 0x0a, + 0x0d, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x6f, + 0x6f, 0x74, 0x22, 0xc6, 0x01, 0x0a, 0x1c, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, + 0x68, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x40, 0x0a, 0x06, 0x6c, 0x65, 0x61, 0x76, 0x65, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, + 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x66, 0x44, 0x61, 0x74, + 0x61, 0x52, 0x06, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x6f, 0x6e, + 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x11, 0x63, 0x6f, 0x6e, 0x74, 0x69, 0x6e, 0x75, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x6f, 0x74, 0x61, + 0x6c, 0x5f, 0x6c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, + 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x73, 0x22, 0x90, 0x01, 0x0a, 0x13, + 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x45, 0x72, + 0x72, 0x6f, 0x72, 0x12, 0x4b, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x37, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, + 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, + 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, + 0x63, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, + 0x74, 0x68, 0x18, 0x03, 0x20, 0x03, 0x28, 0x05, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x2a, 0xb8, + 0x01, 0x0a, 0x12, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x50, 0x68, 0x61, + 0x73, 0x65, 0x53, 0x65, 0x74, 0x12, 0x24, 0x0a, 0x20, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, + 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, + 0x52, 0x54, 0x45, 0x58, 0x5f, 0x41, 0x44, 0x44, 0x53, 0x10, 0x00, 0x12, 0x27, 0x0a, 0x23, 0x48, + 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, + 0x53, 0x45, 0x54, 0x5f, 0x56, 0x45, 0x52, 0x54, 0x45, 0x58, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, + 0x45, 0x53, 0x10, 0x01, 0x12, 0x27, 0x0a, 0x23, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, + 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, + 0x45, 0x52, 0x45, 0x44, 0x47, 0x45, 0x5f, 0x41, 0x44, 0x44, 0x53, 0x10, 0x02, 0x12, 0x2a, 0x0a, + 0x26, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x50, 0x48, 0x41, 0x53, + 0x45, 0x5f, 0x53, 0x45, 0x54, 0x5f, 0x48, 0x59, 0x50, 0x45, 0x52, 
0x45, 0x44, 0x47, 0x45, 0x5f, + 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x53, 0x10, 0x03, 0x2a, 0x8f, 0x02, 0x0a, 0x17, 0x48, 0x79, + 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x21, 0x0a, 0x1d, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, + 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x55, + 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x2b, 0x0a, 0x27, 0x48, 0x59, 0x50, 0x45, + 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, 0x52, 0x52, 0x4f, + 0x52, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x53, 0x48, 0x41, 0x52, 0x44, 0x5f, + 0x4b, 0x45, 0x59, 0x10, 0x01, 0x12, 0x26, 0x0a, 0x22, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, + 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x49, + 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x50, 0x41, 0x54, 0x48, 0x10, 0x02, 0x12, 0x28, 0x0a, + 0x24, 0x48, 0x59, 0x50, 0x45, 0x52, 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, + 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, + 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x03, 0x12, 0x2e, 0x0a, 0x2a, 0x48, 0x59, 0x50, 0x45, 0x52, + 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, + 0x5f, 0x53, 0x4e, 0x41, 0x50, 0x53, 0x48, 0x4f, 0x54, 0x5f, 0x55, 0x4e, 0x41, 0x56, 0x41, 0x49, + 0x4c, 0x41, 0x42, 0x4c, 0x45, 0x10, 0x04, 0x12, 0x22, 0x0a, 0x1e, 0x48, 0x59, 0x50, 0x45, 0x52, + 0x47, 0x52, 0x41, 0x50, 0x48, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, + 0x5f, 0x49, 0x4e, 0x54, 0x45, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x05, 0x32, 0xaa, 0x03, 0x0a, 0x1b, + 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, + 0x69, 0x73, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, 0x0b, 0x48, + 0x79, 0x70, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x34, 0x2e, 0x71, 0x75, 0x69, + 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, + 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, + 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, + 0x1a, 0x34, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, + 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x43, 0x6f, 0x6d, 0x70, + 0x61, 0x72, 0x69, 0x73, 0x6f, 0x6e, 0x28, 0x01, 0x30, 0x01, 0x12, 0x8b, 0x01, 0x0a, 0x12, 0x47, + 0x65, 0x74, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x46, 0x6f, 0x72, 0x50, 0x61, 0x74, + 0x68, 0x12, 0x39, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, + 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, + 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x46, 0x6f, + 0x72, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x3a, 0x2e, 0x71, + 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, + 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x47, 0x65, + 0x74, 0x43, 0x68, 0x69, 0x6c, 0x64, 0x72, 0x65, 0x6e, 0x46, 0x6f, 0x72, 0x50, 0x61, 0x74, 
0x68, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x7e, 0x0a, 0x0b, 0x50, 0x65, 0x72, 0x66, + 0x6f, 0x72, 0x6d, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x33, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, + 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, 0x70, 0x65, 0x72, 0x67, 0x72, + 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x51, 0x75, 0x65, 0x72, 0x79, 0x1a, 0x36, 0x2e, 0x71, + 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x61, + 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x62, 0x2e, 0x48, 0x79, + 0x70, 0x65, 0x72, 0x67, 0x72, 0x61, 0x70, 0x68, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3a, 0x5a, 0x38, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x2e, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x71, 0x75, 0x69, 0x6c, 0x69, 0x62, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x6d, 0x6f, 0x6e, + 0x6f, 0x72, 0x65, 0x70, 0x6f, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/protobufs/application.proto b/protobufs/application.proto index f8f2ed7..2f69c90 100644 --- a/protobufs/application.proto +++ b/protobufs/application.proto @@ -213,6 +213,10 @@ message HypergraphSyncGetBranchRequest { HypergraphPhaseSet phase_set = 2; // The path to query. Empty path queries the root. repeated int32 path = 3; + // The expected root commitment the client wants to sync against. When set, + // the server will attempt to find a snapshot with a matching root. If empty, + // the server uses the latest available snapshot. + bytes expected_root = 4; } // HypergraphSyncBranchResponse contains branch information at the queried path. @@ -250,6 +254,10 @@ message HypergraphSyncGetLeavesRequest { uint32 max_leaves = 4; // Continuation token for pagination. Empty for first request. bytes continuation_token = 5; + // The expected root commitment the client wants to sync against. When set, + // the server will attempt to find a snapshot with a matching root. If empty, + // the server uses the latest available snapshot. + bytes expected_root = 6; } // HypergraphSyncLeavesResponse contains leaves from the requested subtree. diff --git a/types/hypergraph/hypergraph.go b/types/hypergraph/hypergraph.go index 57837bf..0e1ad93 100644 --- a/types/hypergraph/hypergraph.go +++ b/types/hypergraph/hypergraph.go @@ -297,12 +297,15 @@ type Hypergraph interface { // SyncFrom is the client-side initiator for synchronization using the // client-driven protocol. The client navigates the server's tree and - // fetches differing data. + // fetches differing data. If expectedRoot is provided, the server will + // attempt to sync from a snapshot matching that root commitment. + // Returns the new root commitment after sync completes. 
 	SyncFrom(
 		stream protobufs.HypergraphComparisonService_PerformSyncClient,
 		shardKey tries.ShardKey,
 		phaseSet protobufs.HypergraphPhaseSet,
-	) error
+		expectedRoot []byte,
+	) ([]byte, error)
 
 	// Transaction and utility operations
 
diff --git a/types/mocks/hypergraph.go b/types/mocks/hypergraph.go
index 0f42be7..da25d58 100644
--- a/types/mocks/hypergraph.go
+++ b/types/mocks/hypergraph.go
@@ -212,9 +212,13 @@ func (h *MockHypergraph) SyncFrom(
 	stream protobufs.HypergraphComparisonService_PerformSyncClient,
 	shardKey tries.ShardKey,
 	phaseSet protobufs.HypergraphPhaseSet,
-) error {
-	args := h.Called(stream, shardKey, phaseSet)
-	return args.Error(0)
+	expectedRoot []byte,
+) ([]byte, error) {
+	args := h.Called(stream, shardKey, phaseSet, expectedRoot)
+	if args.Get(0) == nil {
+		return nil, args.Error(1)
+	}
+	return args.Get(0).([]byte), args.Error(1)
 }
 
 // RunDataPruning implements hypergraph.Hypergraph.
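
For callers updating test expectations against the new mock signature, a minimal sketch of the testify usage, assuming the import paths below (only node/protobufs is confirmed by the go_package option; the types/mocks and types/tries paths and the fakeRoot value are illustrative):

package mocks_test

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/mocks" // assumed path
	"source.quilibrium.com/quilibrium/monorepo/types/tries" // assumed path
)

func TestSyncFromMock(t *testing.T) {
	h := &mocks.MockHypergraph{}

	// SyncFrom now takes a fourth expectedRoot argument and returns
	// ([]byte, error), so the expectation must match all four arguments
	// and supply both return values.
	fakeRoot := []byte{0x01, 0x02, 0x03} // illustrative only
	h.On("SyncFrom", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(fakeRoot, nil)

	root, err := h.SyncFrom(
		nil, // stream is unused by the mock
		tries.ShardKey{},
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		fakeRoot,
	)
	require.NoError(t, err)
	require.Equal(t, fakeRoot, root)
	h.AssertExpectations(t)
}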