mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 10:27:26 +08:00

commit 69519ad0cb: use simpler sync
parent c0215d5768
hypergraph/sync_client_driven.go: 812 lines added (new file)

@@ -0,0 +1,812 @@
package hypergraph

import (
	"bytes"
	"context"
	"encoding/hex"
	"io"
	"slices"
	"strings"
	"time"

	"github.com/pkg/errors"
	"go.uber.org/zap"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
)

// syncSession holds the state for a PerformSync session.
type syncSession struct {
	shardKey tries.ShardKey
	phaseSet protobufs.HypergraphPhaseSet
	snapshot *snapshotHandle
	idSet    hypergraph.IdSet
	store    tries.TreeBackingStore
}

// PerformSync implements the server side of the client-driven sync protocol.
// The client sends GetBranch and GetLeaves requests, and the server responds
// with the requested data. This is simpler than HyperStream because there's
// no need for both sides to walk in lockstep.
//
// The server uses a snapshot to ensure consistent reads throughout the session.
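//
// A typical exchange, from the client's point of view (an illustrative
// sketch; the exact message shapes are defined in protobufs):
//
//	-> GetBranch{path: []}                      root commitment + children
//	<- Branch{fullPath, commitment, children}
//	-> GetBranch{path: [i]}                     recurse into differing children
//	<- Branch{...}
//	-> GetLeaves{path: [i, j], maxLeaves: 1000}
//	<- Leaves{leaves, continuationToken}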
func (hg *HypergraphCRDT) PerformSync(
	stream protobufs.HypergraphComparisonService_PerformSyncServer,
) error {
	ctx := stream.Context()

	logger := hg.logger.With(zap.String("method", "PerformSync"))
	sessionStart := time.Now()

	// Session state - initialized on first request
	var session *syncSession
	defer func() {
		if session != nil && session.snapshot != nil {
			hg.snapshotMgr.release(session.snapshot)
		}
		logger.Info(
			"PerformSync session finished",
			zap.Duration("duration", time.Since(sessionStart)),
		)
	}()

	// Process requests until stream closes
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return errors.Wrap(err, "receive request")
		}

		var resp *protobufs.HypergraphSyncResponse

		switch r := req.Request.(type) {
		case *protobufs.HypergraphSyncQuery_GetBranch:
			// Initialize session on first request
			if session == nil {
				session, err = hg.initSyncSession(r.GetBranch.ShardKey, r.GetBranch.PhaseSet, logger)
				if err != nil {
					return errors.Wrap(err, "init sync session")
				}
			}
			resp, err = hg.handleGetBranch(ctx, r.GetBranch, session, logger)
		case *protobufs.HypergraphSyncQuery_GetLeaves:
			// Initialize session on first request
			if session == nil {
				session, err = hg.initSyncSession(r.GetLeaves.ShardKey, r.GetLeaves.PhaseSet, logger)
				if err != nil {
					return errors.Wrap(err, "init sync session")
				}
			}
			resp, err = hg.handleGetLeaves(ctx, r.GetLeaves, session, logger)
		default:
			resp = &protobufs.HypergraphSyncResponse{
				Response: &protobufs.HypergraphSyncResponse_Error{
					Error: &protobufs.HypergraphSyncError{
						Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_UNKNOWN,
						Message: "unknown request type",
					},
				},
			}
		}

		if err != nil {
			logger.Error("error handling request", zap.Error(err))
			resp = &protobufs.HypergraphSyncResponse{
				Response: &protobufs.HypergraphSyncResponse_Error{
					Error: &protobufs.HypergraphSyncError{
						Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_INTERNAL,
						Message: err.Error(),
					},
				},
			}
		}

		if err := stream.Send(resp); err != nil {
			return errors.Wrap(err, "send response")
		}
	}
}

// initSyncSession initializes a sync session with a snapshot for consistent reads.
func (hg *HypergraphCRDT) initSyncSession(
	shardKeyBytes []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	logger *zap.Logger,
) (*syncSession, error) {
	if len(shardKeyBytes) != 35 {
		return nil, errors.New("shard key must be 35 bytes")
	}

	shardKey := tries.ShardKey{
		L1: [3]byte(shardKeyBytes[:3]),
		L2: [32]byte(shardKeyBytes[3:]),
	}

	// Acquire a snapshot for consistent reads throughout the session
	snapshot := hg.snapshotMgr.acquire(shardKey, nil)
	if snapshot == nil {
		return nil, errors.New("failed to acquire snapshot")
	}

	snapshotStore := snapshot.Store()
	idSet := hg.snapshotPhaseSet(shardKey, phaseSet, snapshotStore)
	if idSet == nil {
		hg.snapshotMgr.release(snapshot)
		return nil, errors.New("unsupported phase set")
	}

	logger.Debug(
		"sync session initialized",
		zap.String("shard", hex.EncodeToString(shardKeyBytes)),
		zap.Int("phaseSet", int(phaseSet)),
	)

	return &syncSession{
		shardKey: shardKey,
		phaseSet: phaseSet,
		snapshot: snapshot,
		idSet:    idSet,
		store:    snapshotStore,
	}, nil
}
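
// Note: the 35-byte shard key wire format is the 3-byte L1 prefix
// followed by the 32-byte L2 hash. For example, a key beginning
// 0xAA 0xBB 0xCC splits into L1 = [3]byte{0xAA, 0xBB, 0xCC}, with the
// remaining 32 bytes forming L2.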

func (hg *HypergraphCRDT) handleGetBranch(
	ctx context.Context,
	req *protobufs.HypergraphSyncGetBranchRequest,
	session *syncSession,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncResponse, error) {
	tree := session.idSet.GetTree()
	if tree == nil || tree.Root == nil {
		// Empty tree - return empty response
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Branch{
				Branch: &protobufs.HypergraphSyncBranchResponse{
					FullPath:   req.Path,
					Commitment: nil,
					Children:   nil,
					IsLeaf:     true,
					LeafCount:  0,
				},
			},
		}, nil
	}

	path := toIntSlice(req.Path)

	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		toInt32Slice(path),
		0,
	)

	if node == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Error{
				Error: &protobufs.HypergraphSyncError{
					Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND,
					Message: "node not found at path",
					Path:    req.Path,
				},
			},
		}, nil
	}

	resp := &protobufs.HypergraphSyncBranchResponse{}

	// Ensure commitment is computed first
	node = ensureCommittedNode(logger, tree, path, node)

	switch n := node.(type) {
	case *tries.LazyVectorCommitmentBranchNode:
		resp.FullPath = toInt32Slice(n.FullPrefix)
		resp.Commitment = n.Commitment
		resp.IsLeaf = false
		resp.LeafCount = uint64(n.LeafCount)

		// Collect children
		for i := 0; i < 64; i++ {
			child := n.Children[i]
			if child == nil {
				var err error
				child, err = n.Store.GetNodeByPath(
					tree.SetType,
					tree.PhaseType,
					tree.ShardKey,
					slices.Concat(n.FullPrefix, []int{i}),
				)
				if err != nil && !strings.Contains(err.Error(), "item not found") {
					continue
				}
			}

			if child != nil {
				childPath := slices.Concat(n.FullPrefix, []int{i})
				child = ensureCommittedNode(logger, tree, childPath, child)

				var childCommit []byte
				switch c := child.(type) {
				case *tries.LazyVectorCommitmentBranchNode:
					childCommit = c.Commitment
				case *tries.LazyVectorCommitmentLeafNode:
					childCommit = c.Commitment
				}

				if len(childCommit) > 0 {
					resp.Children = append(resp.Children, &protobufs.HypergraphSyncChildInfo{
						Index:      int32(i),
						Commitment: childCommit,
					})
				}
			}
		}

	case *tries.LazyVectorCommitmentLeafNode:
		resp.FullPath = req.Path // Leaves don't have FullPrefix, use requested path
		resp.Commitment = n.Commitment
		resp.IsLeaf = true
		resp.LeafCount = 1
	}

	return &protobufs.HypergraphSyncResponse{
		Response: &protobufs.HypergraphSyncResponse_Branch{
			Branch: resp,
		},
	}, nil
}
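
// Branch nodes fan out over 64 child slots (64 = 2^6), so child
// indices reported in HypergraphSyncChildInfo are always in [0, 63].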

func (hg *HypergraphCRDT) handleGetLeaves(
	ctx context.Context,
	req *protobufs.HypergraphSyncGetLeavesRequest,
	session *syncSession,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncResponse, error) {
	tree := session.idSet.GetTree()
	if tree == nil || tree.Root == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Leaves{
				Leaves: &protobufs.HypergraphSyncLeavesResponse{
					Path:        req.Path,
					Leaves:      nil,
					TotalLeaves: 0,
				},
			},
		}, nil
	}

	path := toIntSlice(req.Path)

	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		toInt32Slice(path),
		0,
	)

	if node == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Error{
				Error: &protobufs.HypergraphSyncError{
					Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND,
					Message: "node not found at path",
					Path:    req.Path,
				},
			},
		}, nil
	}

	// Get all leaves under this node
	allLeaves := tries.GetAllLeaves(
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		node,
	)

	// Apply pagination
	maxLeaves := int(req.MaxLeaves)
	if maxLeaves == 0 {
		maxLeaves = 1000 // Default batch size
	}

	startIdx := 0
	if len(req.ContinuationToken) > 0 {
		// Simple continuation: token is the start index as hex
		idx, err := parseContToken(req.ContinuationToken)
		if err == nil {
			startIdx = idx
		}
	}

	var leaves []*protobufs.LeafData
	var totalNonNil uint64

	for _, leaf := range allLeaves {
		if leaf == nil {
			continue
		}
		totalNonNil++

		if int(totalNonNil) <= startIdx {
			continue
		}

		if len(leaves) >= maxLeaves {
			break
		}

		leafData := &protobufs.LeafData{
			Key:        leaf.Key,
			Value:      leaf.Value,
			HashTarget: leaf.HashTarget,
			Size:       leaf.Size.FillBytes(make([]byte, 32)),
		}

		// Load underlying vertex tree if available (use snapshot store for consistency)
		vtree, err := session.store.LoadVertexTree(leaf.Key)
		if err == nil && vtree != nil {
			data, err := tries.SerializeNonLazyTree(vtree)
			if err == nil {
				leafData.UnderlyingData = data
			}
		}

		leaves = append(leaves, leafData)
	}

	resp := &protobufs.HypergraphSyncLeavesResponse{
		Path:        req.Path,
		Leaves:      leaves,
		TotalLeaves: totalNonNil,
	}

	// Set continuation token if more leaves remain
	if startIdx+len(leaves) < int(totalNonNil) {
		resp.ContinuationToken = makeContToken(startIdx + len(leaves))
	}

	return &protobufs.HypergraphSyncResponse{
		Response: &protobufs.HypergraphSyncResponse_Leaves{
			Leaves: resp,
		},
	}, nil
}
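
// Pagination walk-through (sketch): with 2,500 non-nil leaves under a
// path and the default maxLeaves of 1000, a client issues three
// GetLeaves requests. The first two responses carry continuation
// tokens makeContToken(1000) and makeContToken(2000) ("000003e8" and
// "000007d0"); the third response carries none, signalling completion.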

func (hg *HypergraphCRDT) getPhaseSet(
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
) hypergraph.IdSet {
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		return hg.getVertexAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		return hg.getVertexRemovesSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		return hg.getHyperedgeAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		return hg.getHyperedgeRemovesSet(shardKey)
	default:
		return nil
	}
}

// parseContToken decodes a continuation token produced by makeContToken:
// the hex encoding of a 4-byte big-endian start index. (The original
// decoded the token as decimal digits, which mis-parses any token
// containing hex letters; this version inverts makeContToken exactly.)
func parseContToken(token []byte) (int, error) {
	if len(token) == 0 {
		return 0, nil
	}
	decoded := make([]byte, hex.DecodedLen(len(token)))
	if _, err := hex.Decode(decoded, token); err != nil {
		return 0, err
	}
	idx := 0
	for _, b := range decoded {
		idx = idx<<8 | int(b)
	}
	return idx, nil
}

func makeContToken(idx int) []byte {
	return []byte(hex.EncodeToString([]byte{
		byte(idx >> 24), byte(idx >> 16), byte(idx >> 8), byte(idx),
	}))
}
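
// Round trip example: makeContToken(1000) yields []byte("000003e8"),
// and parseContToken([]byte("000003e8")) returns 1000.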

// SyncFrom performs a client-driven sync from the given server stream.
// It navigates to the covered prefix (if any), then recursively syncs
// differing subtrees.
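//
// Typical usage, as exercised by the tests later in this change:
//
//	stream, err := client.PerformSync(ctx)
//	if err != nil {
//		return err
//	}
//	err = hg.SyncFrom(stream, shardKey,
//		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS)
//	_ = stream.CloseSend()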
func (hg *HypergraphCRDT) SyncFrom(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
) error {
	hg.mu.Lock()
	defer hg.mu.Unlock()

	logger := hg.logger.With(
		zap.String("method", "SyncFrom"),
		zap.String("shard", hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:]))),
	)

	syncStart := time.Now()
	defer func() {
		logger.Info("SyncFrom completed", zap.Duration("duration", time.Since(syncStart)))
	}()

	set := hg.getPhaseSet(shardKey, phaseSet)
	if set == nil {
		return errors.New("unsupported phase set")
	}

	shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:])
	coveredPrefix := hg.getCoveredPrefix()

	// Step 1: Navigate to sync point
	syncPoint, err := hg.navigateToSyncPoint(stream, shardKeyBytes, phaseSet, coveredPrefix, logger)
	if err != nil {
		return errors.Wrap(err, "navigate to sync point")
	}

	if syncPoint == nil || len(syncPoint.Commitment) == 0 {
		logger.Info("server has no data at sync point")
		return nil
	}

	// Step 2: Sync the subtree
	err = hg.syncSubtree(stream, shardKeyBytes, phaseSet, syncPoint, set, logger)
	if err != nil {
		return errors.Wrap(err, "sync subtree")
	}

	return nil
}

func (hg *HypergraphCRDT) navigateToSyncPoint(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	coveredPrefix []int,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncBranchResponse, error) {
	path := []int32{}

	for {
		// Query server for branch at current path
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetBranch{
				GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
					ShardKey: shardKey,
					PhaseSet: phaseSet,
					Path:     path,
				},
			},
		})
		if err != nil {
			return nil, errors.Wrap(err, "send GetBranch request")
		}

		resp, err := stream.Recv()
		if err != nil {
			return nil, errors.Wrap(err, "receive GetBranch response")
		}

		if errResp := resp.GetError(); errResp != nil {
			if errResp.Code == protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND {
				// Server doesn't have this path - nothing to sync
				return nil, nil
			}
			return nil, errors.Errorf("server error: %s", errResp.Message)
		}

		branch := resp.GetBranch()
		if branch == nil {
			return nil, errors.New("unexpected response type")
		}

		logger.Debug("navigating",
			zap.String("path", hex.EncodeToString(packPath(path))),
			zap.String("fullPath", hex.EncodeToString(packPath(branch.FullPath))),
			zap.Int("coveredPrefixLen", len(coveredPrefix)),
		)

		// If no covered prefix, root is the sync point
		if len(coveredPrefix) == 0 {
			return branch, nil
		}

		// Check if server's full path reaches or passes our covered prefix
		serverPath := toIntSlice(branch.FullPath)
		if isPrefixOrEqual(coveredPrefix, serverPath) {
			return branch, nil
		}

		// Need to navigate deeper - find next child to descend into
		if len(serverPath) >= len(coveredPrefix) {
			// Server path is longer but doesn't match our prefix
			// This means server has data outside our coverage
			return branch, nil
		}

		// Server path is shorter - we need to go deeper
		nextNibble := coveredPrefix[len(serverPath)]

		// Check if server has a child at this index
		found := false
		for _, child := range branch.Children {
			if int(child.Index) == nextNibble {
				found = true
				break
			}
		}

		if !found {
			// Server doesn't have the path we need
			logger.Debug("server missing path to covered prefix",
				zap.Int("nextNibble", nextNibble),
			)
			return nil, nil
		}

		// Descend to next level
		path = append(branch.FullPath, int32(nextNibble))
	}
}
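
// For example, with coveredPrefix = [5, 12] and a server whose root
// branch reports FullPath = [], navigateToSyncPoint descends to [5]
// and then to [5, 12], returning the first branch whose FullPath
// reaches or passes the covered prefix (a sketch; compressed paths
// may skip levels).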

func (hg *HypergraphCRDT) syncSubtree(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	serverBranch *protobufs.HypergraphSyncBranchResponse,
	localSet hypergraph.IdSet,
	logger *zap.Logger,
) error {
	tree := localSet.GetTree()

	// Get local node at same path
	var localCommitment []byte
	if tree != nil && tree.Root != nil {
		path := toIntSlice(serverBranch.FullPath)
		localNode := getNodeAtPath(
			logger,
			tree.SetType,
			tree.PhaseType,
			tree.ShardKey,
			tree.Root,
			serverBranch.FullPath,
			0,
		)
		if localNode != nil {
			localNode = ensureCommittedNode(logger, tree, path, localNode)
			switch n := localNode.(type) {
			case *tries.LazyVectorCommitmentBranchNode:
				localCommitment = n.Commitment
			case *tries.LazyVectorCommitmentLeafNode:
				localCommitment = n.Commitment
			}
		}
	}

	// If commitments match, subtrees are identical
	if bytes.Equal(localCommitment, serverBranch.Commitment) {
		logger.Debug("subtree matches",
			zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
		)
		return nil
	}

	// If server node is a leaf or has no children, fetch all leaves
	if serverBranch.IsLeaf || len(serverBranch.Children) == 0 {
		return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, serverBranch.FullPath, localSet, logger)
	}

	// Compare children and recurse
	localChildren := make(map[int32][]byte)
	if tree != nil && tree.Root != nil {
		path := toIntSlice(serverBranch.FullPath)
		localNode := getNodeAtPath(
			logger,
			tree.SetType,
			tree.PhaseType,
			tree.ShardKey,
			tree.Root,
			serverBranch.FullPath,
			0,
		)
		if branch, ok := localNode.(*tries.LazyVectorCommitmentBranchNode); ok {
			for i := 0; i < 64; i++ {
				child := branch.Children[i]
				if child == nil {
					child, _ = branch.Store.GetNodeByPath(
						tree.SetType,
						tree.PhaseType,
						tree.ShardKey,
						slices.Concat(path, []int{i}),
					)
				}
				if child != nil {
					childPath := slices.Concat(path, []int{i})
					child = ensureCommittedNode(logger, tree, childPath, child)
					switch c := child.(type) {
					case *tries.LazyVectorCommitmentBranchNode:
						localChildren[int32(i)] = c.Commitment
					case *tries.LazyVectorCommitmentLeafNode:
						localChildren[int32(i)] = c.Commitment
					}
				}
			}
		}
	}

	for _, serverChild := range serverBranch.Children {
		localChildCommit := localChildren[serverChild.Index]

		if bytes.Equal(localChildCommit, serverChild.Commitment) {
			// Child matches, skip
			continue
		}

		// Need to sync this child
		childPath := append(slices.Clone(serverBranch.FullPath), serverChild.Index)

		// Query for child branch
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetBranch{
				GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
					ShardKey: shardKey,
					PhaseSet: phaseSet,
					Path:     childPath,
				},
			},
		})
		if err != nil {
			return errors.Wrap(err, "send GetBranch for child")
		}

		resp, err := stream.Recv()
		if err != nil {
			return errors.Wrap(err, "receive GetBranch response for child")
		}

		if errResp := resp.GetError(); errResp != nil {
			logger.Warn("error getting child branch",
				zap.String("error", errResp.Message),
				zap.String("path", hex.EncodeToString(packPath(childPath))),
			)
			continue
		}

		childBranch := resp.GetBranch()
		if childBranch == nil {
			continue
		}

		// Recurse
		if err := hg.syncSubtree(stream, shardKey, phaseSet, childBranch, localSet, logger); err != nil {
			return err
		}
	}

	return nil
}
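
// The recursion above bottoms out in one of two ways: matching
// commitments (identical subtrees, nothing to transfer) or a leaf or
// childless branch, at which point fetchAndIntegrateLeaves pulls the
// actual leaf data.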

func (hg *HypergraphCRDT) fetchAndIntegrateLeaves(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	path []int32,
	localSet hypergraph.IdSet,
	logger *zap.Logger,
) error {
	logger.Debug("fetching leaves",
		zap.String("path", hex.EncodeToString(packPath(path))),
	)

	var continuationToken []byte
	totalFetched := 0

	for {
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetLeaves{
				GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
					ShardKey:          shardKey,
					PhaseSet:          phaseSet,
					Path:              path,
					MaxLeaves:         1000,
					ContinuationToken: continuationToken,
				},
			},
		})
		if err != nil {
			return errors.Wrap(err, "send GetLeaves request")
		}

		resp, err := stream.Recv()
		if err != nil {
			return errors.Wrap(err, "receive GetLeaves response")
		}

		if errResp := resp.GetError(); errResp != nil {
			return errors.Errorf("server error: %s", errResp.Message)
		}

		leavesResp := resp.GetLeaves()
		if leavesResp == nil {
			return errors.New("unexpected response type")
		}

		// Integrate leaves into local tree
		txn, err := hg.store.NewTransaction(false)
		if err != nil {
			return errors.Wrap(err, "create transaction")
		}

		for _, leaf := range leavesResp.Leaves {
			atom := AtomFromBytes(leaf.Value)

			// Persist underlying tree if present
			if len(leaf.UnderlyingData) > 0 {
				vtree, err := tries.DeserializeNonLazyTree(leaf.UnderlyingData)
				if err == nil {
					if err := hg.store.SaveVertexTree(txn, leaf.Key, vtree); err != nil {
						logger.Warn("failed to save vertex tree", zap.Error(err))
					}
				}
			}

			if err := localSet.Add(txn, atom); err != nil {
				txn.Abort()
				return errors.Wrap(err, "add leaf to local set")
			}
		}

		if err := txn.Commit(); err != nil {
			return errors.Wrap(err, "commit transaction")
		}

		totalFetched += len(leavesResp.Leaves)

		logger.Debug("fetched leaves batch",
			zap.Int("count", len(leavesResp.Leaves)),
			zap.Int("totalFetched", totalFetched),
			zap.Uint64("totalAvailable", leavesResp.TotalLeaves),
		)

		// Check if more leaves remain
		if len(leavesResp.ContinuationToken) == 0 {
			break
		}
		continuationToken = leavesResp.ContinuationToken
	}

	return nil
}

func isPrefixOrEqual(prefix, path []int) bool {
	if len(prefix) > len(path) {
		return false
	}
	for i, v := range prefix {
		if path[i] != v {
			return false
		}
	}
	return true
}
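
// For example:
//
//	isPrefixOrEqual([]int{1, 2}, []int{1, 2, 3}) // true
//	isPrefixOrEqual([]int{1, 2}, []int{1, 2})    // true
//	isPrefixOrEqual([]int{1, 3}, []int{1, 2, 3}) // false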

@@ -404,9 +404,8 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSync(
	}

	phaseSyncs := [](func(
-		protobufs.HypergraphComparisonService_HyperStreamClient,
+		protobufs.HypergraphComparisonService_PerformSyncClient,
		tries.ShardKey,
-		[]byte,
	) []byte){
		p.hyperSyncVertexAdds,
		p.hyperSyncVertexRemoves,
@@ -433,13 +432,13 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSync(
	}

	client := protobufs.NewHypergraphComparisonServiceClient(ch)
-	str, err := client.HyperStream(ctx)
+	str, err := client.PerformSync(ctx)
	if err != nil {
		p.logger.Error("error from sync", zap.Error(err))
		return nil
	}

-	root := syncPhase(str, shardKey, expectedRoot)
+	root := syncPhase(str, shardKey)
	if cerr := ch.Close(); cerr != nil {
		p.logger.Error("error while closing connection", zap.Error(cerr))
	}
@@ -479,9 +478,8 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf(
	}

	phaseSyncs := [](func(
-		protobufs.HypergraphComparisonService_HyperStreamClient,
+		protobufs.HypergraphComparisonService_PerformSyncClient,
		tries.ShardKey,
-		[]byte,
	) []byte){
		p.hyperSyncVertexAdds,
		p.hyperSyncVertexRemoves,
@@ -507,13 +505,13 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf(
	}

	client := protobufs.NewHypergraphComparisonServiceClient(ch)
-	str, err := client.HyperStream(ctx)
+	str, err := client.PerformSync(ctx)
	if err != nil {
		p.logger.Error("error from self-sync", zap.Error(err))
		return
	}

-	syncPhase(str, shardKey, expectedRoot)
+	syncPhase(str, shardKey)
	if cerr := ch.Close(); cerr != nil {
		p.logger.Error("error while closing connection", zap.Error(cerr))
	}
@@ -524,75 +522,67 @@ func (p *SyncProvider[StateT, ProposalT]) HyperSyncSelf(
}

func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexAdds(
-	str protobufs.HypergraphComparisonService_HyperStreamClient,
+	str protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
-	expectedRoot []byte,
) []byte {
-	root, err := p.hypergraph.Sync(
+	err := p.hypergraph.SyncFrom(
		str,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		expectedRoot,
	)
	if err != nil {
		p.logger.Error("error from sync", zap.Error(err))
	}
	str.CloseSend()
-	return root
+	return nil
}

func (p *SyncProvider[StateT, ProposalT]) hyperSyncVertexRemoves(
-	str protobufs.HypergraphComparisonService_HyperStreamClient,
+	str protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
-	expectedRoot []byte,
) []byte {
-	root, err := p.hypergraph.Sync(
+	err := p.hypergraph.SyncFrom(
		str,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES,
-		expectedRoot,
	)
	if err != nil {
		p.logger.Error("error from sync", zap.Error(err))
	}
	str.CloseSend()
-	return root
+	return nil
}

func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeAdds(
-	str protobufs.HypergraphComparisonService_HyperStreamClient,
+	str protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
-	expectedRoot []byte,
) []byte {
-	root, err := p.hypergraph.Sync(
+	err := p.hypergraph.SyncFrom(
		str,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
-		expectedRoot,
	)
	if err != nil {
		p.logger.Error("error from sync", zap.Error(err))
	}
	str.CloseSend()
-	return root
+	return nil
}

func (p *SyncProvider[StateT, ProposalT]) hyperSyncHyperedgeRemoves(
-	str protobufs.HypergraphComparisonService_HyperStreamClient,
+	str protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
-	expectedRoot []byte,
) []byte {
-	root, err := p.hypergraph.Sync(
+	err := p.hypergraph.SyncFrom(
		str,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES,
-		expectedRoot,
	)
	if err != nil {
		p.logger.Error("error from sync", zap.Error(err))
	}
	str.CloseSend()
-	return root
+	return nil
}

func (p *SyncProvider[StateT, ProposalT]) AddState(

@@ -186,7 +186,8 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
		"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
	},
	map[string]channel.AllowedPeerPolicyType{
-		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync": channel.OnlyShardProverPeer,
		"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,

@@ -305,12 +305,12 @@ func TestHypergraphSyncServer(t *testing.T) {
		log.Fatalf("Client: failed to listen: %v", err)
	}
	client := protobufs.NewHypergraphComparisonServiceClient(conn)
-	str, err := client.HyperStream(context.TODO())
+	str, err := client.PerformSync(context.TODO())
	if err != nil {
		log.Fatalf("Client: failed to stream: %v", err)
	}

-	_, err = crdts[1].Sync(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
+	err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS)
	if err != nil {
		log.Fatalf("Client: failed to sync 1: %v", err)
	}
@@ -354,12 +354,12 @@ func TestHypergraphSyncServer(t *testing.T) {
	crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
	crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)

-	str, err = client.HyperStream(context.TODO())
+	str, err = client.PerformSync(context.TODO())
	if err != nil {
		log.Fatalf("Client: failed to stream: %v", err)
	}

-	_, err = crdts[1].Sync(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
+	err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS)
	if err != nil {
		log.Fatalf("Client: failed to sync 2: %v", err)
	}
@@ -608,12 +608,12 @@ func TestHypergraphPartialSync(t *testing.T) {
		log.Fatalf("Client: failed to listen: %v", err)
	}
	client := protobufs.NewHypergraphComparisonServiceClient(conn)
-	str, err := client.HyperStream(context.TODO())
+	str, err := client.PerformSync(context.TODO())
	if err != nil {
		log.Fatalf("Client: failed to stream: %v", err)
	}

-	_, err = crdts[1].Sync(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
+	err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS)
	if err != nil {
		log.Fatalf("Client: failed to sync 1: %v", err)
	}
@@ -627,13 +627,13 @@ func TestHypergraphPartialSync(t *testing.T) {
	crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
	crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)

-	str, err = client.HyperStream(context.TODO())
+	str, err = client.PerformSync(context.TODO())

	if err != nil {
		log.Fatalf("Client: failed to stream: %v", err)
	}

-	_, err = crdts[1].Sync(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
+	err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS)
	if err != nil {
		log.Fatalf("Client: failed to sync 2: %v", err)
	}
@@ -866,13 +866,12 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
		context.Background(),
		100*time.Second,
	)
-	stream, err := client.HyperStream(streamCtx)
+	stream, err := client.PerformSync(streamCtx)
	require.NoError(t, err)
-	clientHG.Sync(
+	clientHG.SyncFrom(
		stream,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		nil,
	)
	require.NoError(t, stream.CloseSend())
	cancelStream()
@@ -924,18 +923,17 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
	// Publish the server's snapshot so clients can sync against this exact state
	serverHG.PublishSnapshot(serverRoot)

-	// Create a snapshot handle for this shard by doing a sync with nil expectedRoot.
+	// Create a snapshot handle for this shard by doing a sync.
	// This is needed because the snapshot manager only creates handles when acquire
-	// is called, and subsequent syncs with expectedRoot require the handle to exist.
+	// is called.
	{
		conn, client := dialClient()
-		stream, err := client.HyperStream(context.Background())
+		stream, err := client.PerformSync(context.Background())
		require.NoError(t, err)
-		_, _ = clientHGs[0].Sync(
+		_ = clientHGs[0].SyncFrom(
			stream,
			shardKey,
			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-			nil, // nil to create the snapshot handle
		)
		_ = stream.CloseSend()
		conn.Close()
@@ -954,13 +952,12 @@ func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
		context.Background(),
		100*time.Second,
	)
-	stream, err := client.HyperStream(streamCtx)
+	stream, err := client.PerformSync(streamCtx)
	require.NoError(t, err)
-	_, err = clientHGs[idx].Sync(
+	err = clientHGs[idx].SyncFrom(
		stream,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		serverRoot,
	)
	require.NoError(t, err)
	require.NoError(t, stream.CloseSend())
@@ -1262,14 +1259,13 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) {
	clientDB, clientHG := createClient("client-snapshot-root1")
	conn, client := dialClient()

-	stream, err := client.HyperStream(context.Background())
+	stream, err := client.PerformSync(context.Background())
	require.NoError(t, err)

-	_, err = clientHG.Sync(
+	err = clientHG.SyncFrom(
		stream,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		nil,
	)
	require.NoError(t, err)
	require.NoError(t, stream.CloseSend())
@@ -1308,22 +1304,21 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) {
	// Verify roots are different
	require.NotEqual(t, root1, root2, "roots should be different after adding more data")

-	// Test 1: Sync with nil expectedRoot (should get latest = root2)
-	t.Run("sync with nil expectedRoot gets latest", func(t *testing.T) {
+	// Test 1: Sync gets latest state
+	t.Run("sync gets latest", func(t *testing.T) {
		clientDB, clientHG := createClient("client1")
		defer clientDB.Close()

		conn, client := dialClient()
		defer conn.Close()

-		stream, err := client.HyperStream(context.Background())
+		stream, err := client.PerformSync(context.Background())
		require.NoError(t, err)

-		_, err = clientHG.Sync(
+		err = clientHG.SyncFrom(
			stream,
			shardKey,
			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-			nil, // nil expectedRoot should use latest snapshot
		)
		require.NoError(t, err)
		require.NoError(t, stream.CloseSend())
@@ -1334,25 +1329,24 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) {
		clientRoot := clientCommit[shardKey][0]

		// Client should have synced to the latest (root2)
-		assert.Equal(t, root2, clientRoot, "client should sync to latest root when expectedRoot is nil")
+		assert.Equal(t, root2, clientRoot, "client should sync to latest root")
	})

-	// Test 2: Sync with expectedRoot = root1 (should get historical snapshot)
-	t.Run("sync with expectedRoot gets matching generation", func(t *testing.T) {
+	// Test 2: Multiple syncs converge to same state
+	t.Run("multiple syncs converge", func(t *testing.T) {
		clientDB, clientHG := createClient("client2")
		defer clientDB.Close()

		conn, client := dialClient()
		defer conn.Close()

-		stream, err := client.HyperStream(context.Background())
+		stream, err := client.PerformSync(context.Background())
		require.NoError(t, err)

-		_, err = clientHG.Sync(
+		err = clientHG.SyncFrom(
			stream,
			shardKey,
			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-			root1, // Request sync against root1
		)
		require.NoError(t, err)
		require.NoError(t, stream.CloseSend())
@@ -1362,34 +1356,8 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) {
		require.NoError(t, err)
		clientRoot := clientCommit[shardKey][0]

-		// Client should have synced to root1 (the historical snapshot)
-		assert.Equal(t, root1, clientRoot, "client should sync to historical root when expectedRoot matches")
-	})
-
-	// Test 3: Sync with unknown expectedRoot (should fail - no matching snapshot)
-	t.Run("sync with unknown expectedRoot fails", func(t *testing.T) {
-		clientDB, clientHG := createClient("client3")
-		defer clientDB.Close()
-
-		conn, client := dialClient()
-		defer conn.Close()
-
-		stream, err := client.HyperStream(context.Background())
-		require.NoError(t, err)
-
-		// Use a fake root that doesn't exist
-		unknownRoot := make([]byte, 48)
-		rand.Read(unknownRoot)
-
-		_, err = clientHG.Sync(
-			stream,
-			shardKey,
-			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-			unknownRoot, // Unknown root has no matching snapshot
-		)
-		// Sync should fail because no snapshot exists for the unknown root
-		require.Error(t, err, "sync with unknown expectedRoot should fail")
-		_ = stream.CloseSend()
+		// Client should have synced to the latest (root2)
+		assert.Equal(t, root2, clientRoot, "client should sync to latest root")
	})
}

@@ -1575,14 +1543,13 @@ func TestHypergraphSyncWithModifiedEntries(t *testing.T) {
	conn, client := dialClient()
	defer conn.Close()

-	stream, err := client.HyperStream(context.Background())
+	stream, err := client.PerformSync(context.Background())
	require.NoError(t, err)

-	_, err = clientHG.Sync(
+	err = clientHG.SyncFrom(
		stream,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		nil,
	)
	require.NoError(t, err)
	require.NoError(t, stream.CloseSend())
@@ -1791,14 +1758,13 @@ func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) {
	defer serverB.Stop()

	connB, clientB := dialClient(lisB)
-	streamB, err := clientB.HyperStream(context.Background())
+	streamB, err := clientB.PerformSync(context.Background())
	require.NoError(t, err)

-	_, err = nodeAHG.Sync(
+	err = nodeAHG.SyncFrom(
		streamB,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		nil,
	)
	require.NoError(t, err)
	require.NoError(t, streamB.CloseSend())
@@ -1818,14 +1784,13 @@ func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) {
	defer serverA.Stop()

	connA, clientA := dialClient(lisA)
-	streamA, err := clientA.HyperStream(context.Background())
+	streamA, err := clientA.PerformSync(context.Background())
	require.NoError(t, err)

-	_, err = nodeBHG.Sync(
+	err = nodeBHG.SyncFrom(
		streamA,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		nil,
	)
	require.NoError(t, err)
	require.NoError(t, streamA.CloseSend())

@@ -1883,6 +1848,283 @@ func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) {
	t.Log("Bidirectional sync test passed - both nodes have all 1000 vertices")
}

+// TestHypergraphBidirectionalSyncClientDriven tests the new client-driven sync
+// protocol (PerformSync/SyncFrom) with two nodes having disjoint data sets.
+// Node A has 500 unique vertices and node B has 500 different unique vertices.
+// After syncing in both directions, both nodes should have all 1000 vertices.
+func TestHypergraphBidirectionalSyncClientDriven(t *testing.T) {
+	logger, _ := zap.NewDevelopment()
+	enc := verenc.NewMPCitHVerifiableEncryptor(1)
+	inclusionProver := bls48581.NewKZGInclusionProver(logger)
+
+	// Create data trees for all 1000 vertices
+	numVerticesPerNode := 500
+	totalVertices := numVerticesPerNode * 2
+	dataTrees := make([]*tries.VectorCommitmentTree, totalVertices)
+	eg := errgroup.Group{}
+	eg.SetLimit(100)
+	for i := 0; i < totalVertices; i++ {
+		eg.Go(func() error {
+			dataTrees[i] = buildDataTree(t, inclusionProver)
+			return nil
+		})
+	}
+	eg.Wait()
+	t.Log("Generated data trees")
+
+	// Create databases and stores for both nodes
+	nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA_cd/store"}, 0)
+	defer nodeADB.Close()
+
+	nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB_cd/store"}, 0)
+	defer nodeBDB.Close()
+
+	nodeAStore := store.NewPebbleHypergraphStore(
+		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA_cd/store"},
+		nodeADB,
+		logger,
+		enc,
+		inclusionProver,
+	)
+
+	nodeBStore := store.NewPebbleHypergraphStore(
+		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB_cd/store"},
+		nodeBDB,
+		logger,
+		enc,
+		inclusionProver,
+	)
+
+	nodeAHG := hgcrdt.NewHypergraph(
+		logger.With(zap.String("side", "nodeA-cd")),
+		nodeAStore,
+		inclusionProver,
+		[]int{},
+		&tests.Nopthenticator{},
+		200,
+	)
+
+	nodeBHG := hgcrdt.NewHypergraph(
+		logger.With(zap.String("side", "nodeB-cd")),
+		nodeBStore,
+		inclusionProver,
+		[]int{},
+		&tests.Nopthenticator{},
+		200,
+	)
+
+	// Create a shared domain for all vertices
+	domain := randomBytes32(t)
+
+	// Generate vertices for node A (first 500)
+	nodeAVertices := make([]application.Vertex, numVerticesPerNode)
+	for i := 0; i < numVerticesPerNode; i++ {
+		addr := randomBytes32(t)
+		nodeAVertices[i] = hgcrdt.NewVertex(
+			domain,
+			addr,
+			dataTrees[i].Commit(inclusionProver, false),
+			dataTrees[i].GetSize(),
+		)
+	}
+
+	// Generate vertices for node B (second 500, completely different)
+	nodeBVertices := make([]application.Vertex, numVerticesPerNode)
+	for i := 0; i < numVerticesPerNode; i++ {
+		addr := randomBytes32(t)
+		nodeBVertices[i] = hgcrdt.NewVertex(
+			domain,
+			addr,
+			dataTrees[numVerticesPerNode+i].Commit(inclusionProver, false),
+			dataTrees[numVerticesPerNode+i].GetSize(),
+		)
+	}
+
+	shardKey := application.GetShardKey(nodeAVertices[0])
+
+	// Add vertices to node A
+	t.Log("Adding 500 vertices to node A")
+	nodeATxn, err := nodeAStore.NewTransaction(false)
+	require.NoError(t, err)
+	for i, v := range nodeAVertices {
+		id := v.GetID()
+		require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
+		require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
+	}
+	require.NoError(t, nodeATxn.Commit())
+
+	// Add vertices to node B
+	t.Log("Adding 500 different vertices to node B")
+	nodeBTxn, err := nodeBStore.NewTransaction(false)
+	require.NoError(t, err)
+	for i, v := range nodeBVertices {
+		id := v.GetID()
+		require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[numVerticesPerNode+i]))
+		require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
+	}
+	require.NoError(t, nodeBTxn.Commit())
+
+	// Commit both hypergraphs
+	_, err = nodeAHG.Commit(1)
+	require.NoError(t, err)
+	_, err = nodeBHG.Commit(1)
+	require.NoError(t, err)
+
+	nodeARootBefore := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	nodeBRootBefore := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	t.Logf("Node A root before sync: %x", nodeARootBefore)
+	t.Logf("Node B root before sync: %x", nodeBRootBefore)
+	require.NotEqual(t, nodeARootBefore, nodeBRootBefore, "roots should differ before sync")
+
+	// Helper to set up gRPC server for a hypergraph
+	setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
+		const bufSize = 1 << 20
+		lis := bufconn.Listen(bufSize)
+
+		grpcServer := grpc.NewServer(
+			grpc.ChainStreamInterceptor(func(
+				srv interface{},
+				ss grpc.ServerStream,
+				info *grpc.StreamServerInfo,
+				handler grpc.StreamHandler,
+			) error {
+				_, priv, _ := ed448.GenerateKey(rand.Reader)
+				privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
+				require.NoError(t, err)
+
+				pub := privKey.GetPublic()
+				peerID, err := peer.IDFromPublicKey(pub)
+				require.NoError(t, err)
+
+				return handler(srv, &serverStream{
+					ServerStream: ss,
+					ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
+				})
+			}),
+		)
+		protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
+
+		go func() {
+			_ = grpcServer.Serve(lis)
+		}()
+
+		return lis, grpcServer
+	}
+
+	dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
+		dialer := func(context.Context, string) (net.Conn, error) {
+			return lis.Dial()
+		}
+		conn, err := grpc.DialContext(
+			context.Background(),
+			"bufnet",
+			grpc.WithContextDialer(dialer),
+			grpc.WithTransportCredentials(insecure.NewCredentials()),
+		)
+		require.NoError(t, err)
+		return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
+	}
+
+	// Convert tries.ShardKey to bytes for reference
+	shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:])
+	_ = shardKeyBytes // SyncFrom takes tries.ShardKey directly; bytes form kept for reference
+
+	// Step 1: Node A syncs from Node B (as server) using client-driven sync
+	// Node A should receive Node B's 500 vertices
+	t.Log("Step 1: Node A syncs from Node B using PerformSync (B is server)")
+	lisB, serverB := setupServer(nodeBHG)
+	defer serverB.Stop()
+
+	connB, clientB := dialClient(lisB)
+	streamB, err := clientB.PerformSync(context.Background())
+	require.NoError(t, err)
+
+	err = nodeAHG.SyncFrom(
+		streamB,
+		shardKey,
+		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
+	)
+	require.NoError(t, err)
+	require.NoError(t, streamB.CloseSend())
+	connB.Close()
+
+	_, err = nodeAHG.Commit(2)
+	require.NoError(t, err)
+
+	nodeARootAfterFirstSync := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	t.Logf("Node A root after syncing from B: %x", nodeARootAfterFirstSync)
+
+	// Step 2: Node B syncs from Node A (as server) using client-driven sync
+	// Node B should receive Node A's 500 vertices
+	t.Log("Step 2: Node B syncs from Node A using PerformSync (A is server)")
+	lisA, serverA := setupServer(nodeAHG)
+	defer serverA.Stop()
+
+	connA, clientA := dialClient(lisA)
+	streamA, err := clientA.PerformSync(context.Background())
+	require.NoError(t, err)
+
+	err = nodeBHG.SyncFrom(
+		streamA,
+		shardKey,
+		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
+	)
+	require.NoError(t, err)
+	require.NoError(t, streamA.CloseSend())
+	connA.Close()
+
+	_, err = nodeBHG.Commit(2)
+	require.NoError(t, err)
+
+	// Verify both nodes have converged
+	nodeARootFinal := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	nodeBRootFinal := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	t.Logf("Node A final root: %x", nodeARootFinal)
+	t.Logf("Node B final root: %x", nodeBRootFinal)
+
+	assert.Equal(t, nodeARootFinal, nodeBRootFinal, "both nodes should have identical roots after bidirectional sync")
+
+	// Verify the tree contains all 1000 vertices
+	nodeATree := nodeAHG.GetVertexAddsSet(shardKey).GetTree()
+	nodeBTree := nodeBHG.GetVertexAddsSet(shardKey).GetTree()
+
+	nodeALeaves := tries.GetAllLeaves(
+		nodeATree.SetType,
+		nodeATree.PhaseType,
+		nodeATree.ShardKey,
+		nodeATree.Root,
+	)
+	nodeBLeaves := tries.GetAllLeaves(
+		nodeBTree.SetType,
+		nodeBTree.PhaseType,
+		nodeBTree.ShardKey,
+		nodeBTree.Root,
+	)
+
+	nodeALeafCount := 0
+	for _, leaf := range nodeALeaves {
+		if leaf != nil {
+			nodeALeafCount++
+		}
+	}
+	nodeBLeafCount := 0
+	for _, leaf := range nodeBLeaves {
+		if leaf != nil {
+			nodeBLeafCount++
+		}
+	}
+
+	t.Logf("Node A has %d leaves, Node B has %d leaves", nodeALeafCount, nodeBLeafCount)
+	assert.Equal(t, totalVertices, nodeALeafCount, "Node A should have all 1000 vertices")
+	assert.Equal(t, totalVertices, nodeBLeafCount, "Node B should have all 1000 vertices")
+
+	// Verify no differences between the trees
+	diffLeaves := tries.CompareLeaves(nodeATree, nodeBTree)
+	assert.Empty(t, diffLeaves, "there should be no differences between the trees")
+
+	t.Log("Client-driven bidirectional sync test passed - both nodes have all 1000 vertices")
+}
+
// TestHypergraphSyncWithPrefixLengthMismatch tests sync behavior when one node
// has a deeper tree structure (longer prefix path) than the other. This tests
// the prefix length mismatch handling in the walk function.

@@ -2150,14 +2392,13 @@ func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) {
	t.Logf("Client has %d leaves before sync", clientLeafCountBefore)

	conn, client := dialClient(lis)
-	stream, err := client.HyperStream(context.Background())
+	stream, err := client.PerformSync(context.Background())
	require.NoError(t, err)

-	_, err = clientHG.Sync(
+	err = clientHG.SyncFrom(
		stream,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
-		nil,
	)
	require.NoError(t, err)
	require.NoError(t, stream.CloseSend())

@@ -264,7 +264,8 @@ func TestWorkerManager_AllocateDeallocateWorker(t *testing.T) {
		"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
	},
	map[string]channel.AllowedPeerPolicyType{
-		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync": channel.OnlyShardProverPeer,
		"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,
@@ -561,7 +562,8 @@ func TestWorkerManager_EmptyFilter(t *testing.T) {
		"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
	},
	map[string]channel.AllowedPeerPolicyType{
-		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync": channel.OnlyShardProverPeer,
		"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,
@@ -687,7 +689,8 @@ func TestWorkerManager_FilterUpdate(t *testing.T) {
		"quilibrium.node.global.pb.KeyRegistryService": channel.OnlySelfPeer,
	},
	map[string]channel.AllowedPeerPolicyType{
-		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream": channel.OnlyShardProverPeer,
+		"/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync": channel.OnlyShardProverPeer,
		"/quilibrium.node.global.pb.MixnetService/GetTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutTag": channel.AnyPeer,
		"/quilibrium.node.global.pb.MixnetService/PutMessage": channel.AnyPeer,

File diff suppressed because it is too large
@ -108,6 +108,49 @@ func local_request_HypergraphComparisonService_GetChildrenForPath_0(ctx context.

}

func request_HypergraphComparisonService_PerformSync_0(ctx context.Context, marshaler runtime.Marshaler, client HypergraphComparisonServiceClient, req *http.Request, pathParams map[string]string) (HypergraphComparisonService_PerformSyncClient, runtime.ServerMetadata, error) {
	var metadata runtime.ServerMetadata
	stream, err := client.PerformSync(ctx)
	if err != nil {
		grpclog.Infof("Failed to start streaming: %v", err)
		return nil, metadata, err
	}
	dec := marshaler.NewDecoder(req.Body)
	handleSend := func() error {
		var protoReq HypergraphSyncQuery
		err := dec.Decode(&protoReq)
		if err == io.EOF {
			return err
		}
		if err != nil {
			grpclog.Infof("Failed to decode request: %v", err)
			return err
		}
		if err := stream.Send(&protoReq); err != nil {
			grpclog.Infof("Failed to send request: %v", err)
			return err
		}
		return nil
	}
	go func() {
		for {
			if err := handleSend(); err != nil {
				break
			}
		}
		if err := stream.CloseSend(); err != nil {
			grpclog.Infof("Failed to terminate client stream: %v", err)
		}
	}()
	header, err := stream.Header()
	if err != nil {
		grpclog.Infof("Failed to get header from client: %v", err)
		return nil, metadata, err
	}
	metadata.HeaderMD = header
	return stream, metadata, nil
}

// RegisterHypergraphComparisonServiceHandlerServer registers the http handlers for service HypergraphComparisonService to "mux".
// UnaryRPC :call HypergraphComparisonServiceServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
@ -146,6 +189,13 @@ func RegisterHypergraphComparisonServiceHandlerServer(ctx context.Context, mux *

	})

	mux.Handle("POST", pattern_HypergraphComparisonService_PerformSync_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
		err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport")
		_, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
		runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
		return
	})

	return nil
}

@ -231,6 +281,28 @@ func RegisterHypergraphComparisonServiceHandlerClient(ctx context.Context, mux *

	})

	mux.Handle("POST", pattern_HypergraphComparisonService_PerformSync_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
		ctx, cancel := context.WithCancel(req.Context())
		defer cancel()
		inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
		var err error
		var annotatedContext context.Context
		annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync", runtime.WithHTTPPathPattern("/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync"))
		if err != nil {
			runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
			return
		}
		resp, md, err := request_HypergraphComparisonService_PerformSync_0(annotatedContext, inboundMarshaler, client, req, pathParams)
		annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
		if err != nil {
			runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
			return
		}

		forward_HypergraphComparisonService_PerformSync_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...)

	})

	return nil
}

@ -238,10 +310,14 @@ var (
	pattern_HypergraphComparisonService_HyperStream_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.application.pb.HypergraphComparisonService", "HyperStream"}, ""))

	pattern_HypergraphComparisonService_GetChildrenForPath_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.application.pb.HypergraphComparisonService", "GetChildrenForPath"}, ""))

	pattern_HypergraphComparisonService_PerformSync_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"quilibrium.node.application.pb.HypergraphComparisonService", "PerformSync"}, ""))
)

var (
	forward_HypergraphComparisonService_HyperStream_0 = runtime.ForwardResponseStream

	forward_HypergraphComparisonService_GetChildrenForPath_0 = runtime.ForwardResponseMessage

	forward_HypergraphComparisonService_PerformSync_0 = runtime.ForwardResponseStream
)
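For completeness, the new gateway route can be exercised without a gRPC client at all. The sketch below is illustrative only: it assumes a gateway reachable at gateway.example, that queries are posted as a stream of JSON objects in the request body (which is what the decoder loop above consumes), and that responses come back one JSON object per line via runtime.ForwardResponseStream; the JSON field names and the numeric phase-set value are guesses, not taken from this change.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

func main() {
	pr, pw := io.Pipe()
	// The request body carries a stream of JSON-encoded HypergraphSyncQuery
	// messages; the decoder loop in the handler above forwards each one to
	// the underlying gRPC stream. Closing the pipe triggers CloseSend.
	go func() {
		defer pw.Close()
		// Field names and the phase-set value are illustrative guesses.
		_ = json.NewEncoder(pw).Encode(map[string]any{
			"getBranch": map[string]any{
				"shardKey": "...base64 shard key...",
				"phaseSet": 0,
				"path":     []int32{},
			},
		})
	}()
	resp, err := http.Post(
		"http://gateway.example/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync",
		"application/json",
		pr,
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// runtime.ForwardResponseStream emits one JSON object per message, so a
	// line scanner is enough to demarcate responses here.
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		fmt.Println(sc.Text())
	}
}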
@ -180,4 +180,106 @@ message GetChildrenForPathResponse {
service HypergraphComparisonService {
  rpc HyperStream(stream HypergraphComparison) returns (stream HypergraphComparison);
  rpc GetChildrenForPath(GetChildrenForPathRequest) returns (GetChildrenForPathResponse);

  // PerformSync provides a client-driven sync interface. Unlike HyperStream,
  // which requires both sides to walk in lockstep, PerformSync uses a simple
  // request/response pattern where the client navigates the server's tree
  // and fetches data as needed.
  rpc PerformSync(stream HypergraphSyncQuery) returns (stream HypergraphSyncResponse);
}

// HypergraphSyncQuery wraps request types for the client-driven sync RPC.
message HypergraphSyncQuery {
  oneof request {
    HypergraphSyncGetBranchRequest get_branch = 1;
    HypergraphSyncGetLeavesRequest get_leaves = 2;
  }
}

// HypergraphSyncResponse wraps response types for the client-driven sync RPC.
message HypergraphSyncResponse {
  oneof response {
    HypergraphSyncBranchResponse branch = 1;
    HypergraphSyncLeavesResponse leaves = 2;
    HypergraphSyncError error = 3;
  }
}

// HypergraphSyncGetBranchRequest queries for branch information at a path.
message HypergraphSyncGetBranchRequest {
  // The shard key (35 bytes: L1 bloom filter (3) + L2 app address (32)).
  bytes shard_key = 1;
  // The phase set to query.
  HypergraphPhaseSet phase_set = 2;
  // The path to query. An empty path queries the root.
  repeated int32 path = 3;
}

// HypergraphSyncBranchResponse contains branch information at the queried path.
message HypergraphSyncBranchResponse {
  // The full path to this node, including any compressed prefix.
  // This may be longer than the requested path due to path compression.
  repeated int32 full_path = 1;
  // The commitment (hash) for this node.
  bytes commitment = 2;
  // Child information. Empty if this is a leaf node.
  repeated HypergraphSyncChildInfo children = 3;
  // True if this node is a leaf (has no children).
  bool is_leaf = 4;
  // The number of leaves under this node (for progress estimation).
  uint64 leaf_count = 5;
}

// HypergraphSyncChildInfo contains summary information about a child node.
message HypergraphSyncChildInfo {
  // The child index (0-63 for a 64-ary tree).
  int32 index = 1;
  // The commitment (hash) for this child.
  bytes commitment = 2;
}

// HypergraphSyncGetLeavesRequest requests all leaves under a subtree.
message HypergraphSyncGetLeavesRequest {
  // The shard key.
  bytes shard_key = 1;
  // The phase set to query.
  HypergraphPhaseSet phase_set = 2;
  // The path to the subtree root.
  repeated int32 path = 3;
  // Maximum number of leaves to return (0 = server default).
  uint32 max_leaves = 4;
  // Continuation token for pagination. Empty for the first request.
  bytes continuation_token = 5;
}

// HypergraphSyncLeavesResponse contains leaves from the requested subtree.
message HypergraphSyncLeavesResponse {
  // Echoed path from the request.
  repeated int32 path = 1;
  // The leaves under this path (reuses the existing LeafData message).
  repeated LeafData leaves = 2;
  // Continuation token if more leaves remain. Empty if this is the last batch.
  bytes continuation_token = 3;
  // Total number of leaves under this path (for progress tracking).
  uint64 total_leaves = 4;
}

// HypergraphSyncError reports an error during sync.
message HypergraphSyncError {
  // Error code for programmatic handling.
  HypergraphSyncErrorCode code = 1;
  // Human-readable error message.
  string message = 2;
  // The path where the error occurred, if applicable.
  repeated int32 path = 3;
}

// HypergraphSyncErrorCode enumerates possible sync errors.
enum HypergraphSyncErrorCode {
  HYPERGRAPH_SYNC_ERROR_UNKNOWN = 0;
  HYPERGRAPH_SYNC_ERROR_INVALID_SHARD_KEY = 1;
  HYPERGRAPH_SYNC_ERROR_INVALID_PATH = 2;
  HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND = 3;
  HYPERGRAPH_SYNC_ERROR_SNAPSHOT_UNAVAILABLE = 4;
  HYPERGRAPH_SYNC_ERROR_INTERNAL = 5;
}
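Taken together, these messages support a straightforward depth-first walk: ask for a branch, compare its commitment against the local tree, descend into children whose commitments differ, and page leaves out of differing subtrees via the continuation token. The sketch below shows the leaf-paging half, which exercises most of the message surface. The package name, the apply callback, and the error formatting are illustrative assumptions; the message and getter names follow the usual protoc-gen-go conventions for the schema above.

package hypergraphsync // hypothetical helper package, for illustration only

import (
	"fmt"

	"source.quilibrium.com/quilibrium/monorepo/protobufs"
)

// fetchSubtree pages every leaf under path out of the server, following the
// continuation token until the final batch arrives. MaxLeaves is left at
// zero, which the schema defines as "server default".
func fetchSubtree(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	path []int32,
	apply func(*protobufs.LeafData) error, // hypothetical leaf sink
) error {
	var token []byte
	for {
		if err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetLeaves{
				GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
					ShardKey:          shardKey,
					PhaseSet:          phaseSet,
					Path:              path,
					ContinuationToken: token,
				},
			},
		}); err != nil {
			return err
		}
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		leaves := resp.GetLeaves() // nil when the server answered with an error
		if leaves == nil {
			return fmt.Errorf("sync failed: %v", resp.GetError())
		}
		for _, leaf := range leaves.Leaves {
			if err := apply(leaf); err != nil {
				return err
			}
		}
		if len(leaves.ContinuationToken) == 0 {
			return nil
		}
		token = leaves.ContinuationToken
	}
}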
@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7
const (
	HypergraphComparisonService_HyperStream_FullMethodName        = "/quilibrium.node.application.pb.HypergraphComparisonService/HyperStream"
	HypergraphComparisonService_GetChildrenForPath_FullMethodName = "/quilibrium.node.application.pb.HypergraphComparisonService/GetChildrenForPath"
	HypergraphComparisonService_PerformSync_FullMethodName        = "/quilibrium.node.application.pb.HypergraphComparisonService/PerformSync"
)

// HypergraphComparisonServiceClient is the client API for HypergraphComparisonService service.
@ -29,6 +30,11 @@ const (
type HypergraphComparisonServiceClient interface {
	HyperStream(ctx context.Context, opts ...grpc.CallOption) (HypergraphComparisonService_HyperStreamClient, error)
	GetChildrenForPath(ctx context.Context, in *GetChildrenForPathRequest, opts ...grpc.CallOption) (*GetChildrenForPathResponse, error)
	// PerformSync provides a client-driven sync interface. Unlike HyperStream,
	// which requires both sides to walk in lockstep, PerformSync uses a simple
	// request/response pattern where the client navigates the server's tree
	// and fetches data as needed.
	PerformSync(ctx context.Context, opts ...grpc.CallOption) (HypergraphComparisonService_PerformSyncClient, error)
}

type hypergraphComparisonServiceClient struct {
@ -79,12 +85,48 @@ func (c *hypergraphComparisonServiceClient) GetChildrenForPath(ctx context.Conte
	return out, nil
}

func (c *hypergraphComparisonServiceClient) PerformSync(ctx context.Context, opts ...grpc.CallOption) (HypergraphComparisonService_PerformSyncClient, error) {
	stream, err := c.cc.NewStream(ctx, &HypergraphComparisonService_ServiceDesc.Streams[1], HypergraphComparisonService_PerformSync_FullMethodName, opts...)
	if err != nil {
		return nil, err
	}
	x := &hypergraphComparisonServicePerformSyncClient{stream}
	return x, nil
}

type HypergraphComparisonService_PerformSyncClient interface {
	Send(*HypergraphSyncQuery) error
	Recv() (*HypergraphSyncResponse, error)
	grpc.ClientStream
}

type hypergraphComparisonServicePerformSyncClient struct {
	grpc.ClientStream
}

func (x *hypergraphComparisonServicePerformSyncClient) Send(m *HypergraphSyncQuery) error {
	return x.ClientStream.SendMsg(m)
}

func (x *hypergraphComparisonServicePerformSyncClient) Recv() (*HypergraphSyncResponse, error) {
	m := new(HypergraphSyncResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// HypergraphComparisonServiceServer is the server API for HypergraphComparisonService service.
// All implementations must embed UnimplementedHypergraphComparisonServiceServer
// for forward compatibility
type HypergraphComparisonServiceServer interface {
	HyperStream(HypergraphComparisonService_HyperStreamServer) error
	GetChildrenForPath(context.Context, *GetChildrenForPathRequest) (*GetChildrenForPathResponse, error)
	// PerformSync provides a client-driven sync interface. Unlike HyperStream,
	// which requires both sides to walk in lockstep, PerformSync uses a simple
	// request/response pattern where the client navigates the server's tree
	// and fetches data as needed.
	PerformSync(HypergraphComparisonService_PerformSyncServer) error
	mustEmbedUnimplementedHypergraphComparisonServiceServer()
}

@ -98,6 +140,9 @@ func (UnimplementedHypergraphComparisonServiceServer) HyperStream(HypergraphComp
func (UnimplementedHypergraphComparisonServiceServer) GetChildrenForPath(context.Context, *GetChildrenForPathRequest) (*GetChildrenForPathResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method GetChildrenForPath not implemented")
}
func (UnimplementedHypergraphComparisonServiceServer) PerformSync(HypergraphComparisonService_PerformSyncServer) error {
	return status.Errorf(codes.Unimplemented, "method PerformSync not implemented")
}
func (UnimplementedHypergraphComparisonServiceServer) mustEmbedUnimplementedHypergraphComparisonServiceServer() {
}

@ -156,6 +201,32 @@ func _HypergraphComparisonService_GetChildrenForPath_Handler(srv interface{}, ct
	return interceptor(ctx, in, info, handler)
}

func _HypergraphComparisonService_PerformSync_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(HypergraphComparisonServiceServer).PerformSync(&hypergraphComparisonServicePerformSyncServer{stream})
}

type HypergraphComparisonService_PerformSyncServer interface {
	Send(*HypergraphSyncResponse) error
	Recv() (*HypergraphSyncQuery, error)
	grpc.ServerStream
}

type hypergraphComparisonServicePerformSyncServer struct {
	grpc.ServerStream
}

func (x *hypergraphComparisonServicePerformSyncServer) Send(m *HypergraphSyncResponse) error {
	return x.ServerStream.SendMsg(m)
}

func (x *hypergraphComparisonServicePerformSyncServer) Recv() (*HypergraphSyncQuery, error) {
	m := new(HypergraphSyncQuery)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// HypergraphComparisonService_ServiceDesc is the grpc.ServiceDesc for HypergraphComparisonService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -175,6 +246,12 @@ var HypergraphComparisonService_ServiceDesc = grpc.ServiceDesc{
			ServerStreams: true,
			ClientStreams: true,
		},
		{
			StreamName:    "PerformSync",
			Handler:       _HypergraphComparisonService_PerformSync_Handler,
			ServerStreams: true,
			ClientStreams: true,
		},
	},
	Metadata: "application.proto",
}
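The ServiceDesc entry above is what grpc.RegisterService consumes; in practice registration goes through the generated helper. A minimal server-side sketch, assuming hg implements HypergraphComparisonServiceServer, lis is a net.Listener already in scope, and log.Fatal stands in for real error handling:

grpcServer := grpc.NewServer()
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
if err := grpcServer.Serve(lis); err != nil {
	log.Fatal(err)
}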
@ -295,15 +295,14 @@ type Hypergraph interface {
	// Embeds the comparison service
	protobufs.HypergraphComparisonServiceServer

-	// Sync is the client-side initiator for synchronization. If expectedRoot is
-	// provided, the server will attempt to use a snapshot with a matching commit
-	// root. This allows the client to sync against a specific known state.
-	Sync(
-		stream protobufs.HypergraphComparisonService_HyperStreamClient,
+	// SyncFrom is the client-side initiator for synchronization using the
+	// client-driven protocol. The client navigates the server's tree and
+	// fetches differing data.
+	SyncFrom(
+		stream protobufs.HypergraphComparisonService_PerformSyncClient,
		shardKey tries.ShardKey,
		phaseSet protobufs.HypergraphPhaseSet,
-		expectedRoot []byte,
-	) ([]byte, error)
+	) error
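On the caller's side, the new signature drops expectedRoot and the returned root bytes; the client simply opens a PerformSync stream and hands it over. A minimal sketch, assuming ctx, serverAddr, hg, shardKey, and phaseSet are in scope, and using insecure transport only for brevity:

conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
	return err
}
defer conn.Close()

stream, err := protobufs.NewHypergraphComparisonServiceClient(conn).PerformSync(ctx)
if err != nil {
	return err
}
// SyncFrom drives the whole walk: it compares commitments, descends into
// differing branches, and fetches only the leaves it is missing.
return hg.SyncFrom(stream, shardKey, phaseSet)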

	// Transaction and utility operations
@ -207,15 +207,14 @@ func (h *MockHypergraph) HyperStream(
	return args.Error(0)
}

-// Sync implements hypergraph.Hypergraph.
-func (h *MockHypergraph) Sync(
-	stream protobufs.HypergraphComparisonService_HyperStreamClient,
+// SyncFrom implements hypergraph.Hypergraph.
+func (h *MockHypergraph) SyncFrom(
+	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
-	expectedRoot []byte,
-) ([]byte, error) {
-	args := h.Called(stream, shardKey, phaseSet, expectedRoot)
-	return args.Get(0).([]byte), args.Error(1)
+) error {
+	args := h.Called(stream, shardKey, phaseSet)
+	return args.Error(0)
}
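Because the mock routes its arguments through testify's mock.Called, expectations for the new method follow the usual pattern. A small sketch, with stream, shardKey, phaseSet, and t assumed in scope:

h := &MockHypergraph{}
h.On("SyncFrom", stream, shardKey, phaseSet).Return(nil)

require.NoError(t, h.SyncFrom(stream, shardKey, phaseSet))
h.AssertExpectations(t)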

// RunDataPruning implements hypergraph.Hypergraph.