package hypergraph

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"io"
	"slices"
	"strings"
	"time"

	"github.com/pkg/errors"
	"go.uber.org/zap"

	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
)

// syncSession holds the state for a PerformSync session.
type syncSession struct {
	// shardKey identifies the shard being synced.
	shardKey tries.ShardKey
	// phaseSet selects which CRDT phase set (vertex/hyperedge adds/removes).
	phaseSet protobufs.HypergraphPhaseSet
	// snapshot pins a consistent view of the store for the session's lifetime.
	snapshot *snapshotHandle
	// idSet is the phase set resolved against the snapshot store.
	idSet hypergraph.IdSet
	// store is the snapshot-backed store used for all reads in this session.
	store tries.TreeBackingStore
}

// PerformSync implements the server side of the client-driven sync protocol.
// The client sends GetBranch and GetLeaves requests, and the server responds
// with the requested data. This is simpler than HyperStream because there's
// no need for both sides to walk in lockstep.
//
// The server uses a snapshot to ensure consistent reads throughout the session.
func (hg *HypergraphCRDT) PerformSync(
	stream protobufs.HypergraphComparisonService_PerformSyncServer,
) error {
	ctx := stream.Context()
	logger := hg.logger.With(zap.String("method", "PerformSync"))
	sessionStart := time.Now()

	// Session state - initialized on first request
	var session *syncSession
	defer func() {
		if session != nil && session.snapshot != nil {
			hg.snapshotMgr.release(session.snapshot)
		}
		logger.Info(
			"PerformSync session finished",
			zap.Duration("duration", time.Since(sessionStart)),
		)
	}()

	// Process requests until stream closes
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return errors.Wrap(err, "receive request")
		}

		var resp *protobufs.HypergraphSyncResponse
		switch r := req.Request.(type) {
		case *protobufs.HypergraphSyncQuery_GetBranch:
			// Initialize session on first request
			if session == nil {
				session, err = hg.initSyncSession(
					r.GetBranch.ShardKey,
					r.GetBranch.PhaseSet,
					r.GetBranch.ExpectedRoot,
					logger,
				)
				if err != nil {
					return errors.Wrap(err, "init sync session")
				}
			}
			resp, err = hg.handleGetBranch(ctx, r.GetBranch, session, logger)
		case *protobufs.HypergraphSyncQuery_GetLeaves:
			// Initialize session on first request
			if session == nil {
				session, err = hg.initSyncSession(
					r.GetLeaves.ShardKey,
					r.GetLeaves.PhaseSet,
					r.GetLeaves.ExpectedRoot,
					logger,
				)
				if err != nil {
					return errors.Wrap(err, "init sync session")
				}
			}
			resp, err = hg.handleGetLeaves(ctx, r.GetLeaves, session, logger)
		default:
			resp = &protobufs.HypergraphSyncResponse{
				Response: &protobufs.HypergraphSyncResponse_Error{
					Error: &protobufs.HypergraphSyncError{
						Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_UNKNOWN,
						Message: "unknown request type",
					},
				},
			}
		}

		// Handler errors are reported to the client in-band rather than
		// terminating the stream, so the client can decide how to proceed.
		if err != nil {
			logger.Error("error handling request", zap.Error(err))
			resp = &protobufs.HypergraphSyncResponse{
				Response: &protobufs.HypergraphSyncResponse_Error{
					Error: &protobufs.HypergraphSyncError{
						Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_INTERNAL,
						Message: err.Error(),
					},
				},
			}
		}

		if err := stream.Send(resp); err != nil {
			return errors.Wrap(err, "send response")
		}
	}
}

// initSyncSession initializes a sync session with a snapshot for consistent reads.
// shardKeyBytes must be exactly 35 bytes (3-byte L1 prefix + 32-byte L2 key).
// If expectedRoot is non-empty, the snapshot manager attempts to match it.
func (hg *HypergraphCRDT) initSyncSession(
	shardKeyBytes []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
	logger *zap.Logger,
) (*syncSession, error) {
	if len(shardKeyBytes) != 35 {
		return nil, errors.New("shard key must be 35 bytes")
	}
	shardKey := tries.ShardKey{
		L1: [3]byte(shardKeyBytes[:3]),
		L2: [32]byte(shardKeyBytes[3:]),
	}

	// Acquire a snapshot for consistent reads throughout the session.
	// If expectedRoot is provided, we try to find a snapshot matching that root.
	snapshot := hg.snapshotMgr.acquire(shardKey, expectedRoot)
	if snapshot == nil {
		return nil, errors.New("failed to acquire snapshot")
	}
	snapshotStore := snapshot.Store()

	idSet := hg.snapshotPhaseSet(shardKey, phaseSet, snapshotStore)
	if idSet == nil {
		hg.snapshotMgr.release(snapshot)
		return nil, errors.New("unsupported phase set")
	}

	tree := idSet.GetTree()
	logger.Debug(
		"sync session initialized",
		zap.String("shard", hex.EncodeToString(shardKeyBytes)),
		zap.Int("phaseSet", int(phaseSet)),
		zap.Bool("tree_nil", tree == nil),
		zap.Bool("root_nil", tree != nil && tree.Root == nil),
		zap.String("snapshot_root", hex.EncodeToString(snapshot.Root())),
	)

	return &syncSession{
		shardKey: shardKey,
		phaseSet: phaseSet,
		snapshot: snapshot,
		idSet:    idSet,
		store:    snapshotStore,
	}, nil
}

// handleGetBranch returns the branch node at the requested path along with the
// commitments of its non-empty children, so the client can diff subtrees.
func (hg *HypergraphCRDT) handleGetBranch(
	ctx context.Context,
	req *protobufs.HypergraphSyncGetBranchRequest,
	session *syncSession,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncResponse, error) {
	tree := session.idSet.GetTree()
	if tree == nil || tree.Root == nil {
		// Empty tree - return empty response
		logger.Debug("handleGetBranch: empty tree",
			zap.Bool("tree_nil", tree == nil),
			zap.Bool("root_nil", tree != nil && tree.Root == nil),
			zap.String("path", hex.EncodeToString(packPath(req.Path))),
		)
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Branch{
				Branch: &protobufs.HypergraphSyncBranchResponse{
					FullPath:   req.Path,
					Commitment: nil,
					Children:   nil,
					IsLeaf:     true,
					LeafCount:  0,
				},
			},
		}, nil
	}

	path := toIntSlice(req.Path)
	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		toInt32Slice(path),
		0,
	)
	if node == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Error{
				Error: &protobufs.HypergraphSyncError{
					Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND,
					Message: "node not found at path",
					Path:    req.Path,
				},
			},
		}, nil
	}

	resp := &protobufs.HypergraphSyncBranchResponse{}

	// Ensure commitment is computed first
	node = ensureCommittedNode(logger, tree, path, node)

	switch n := node.(type) {
	case *tries.LazyVectorCommitmentBranchNode:
		resp.FullPath = toInt32Slice(n.FullPrefix)
		resp.Commitment = n.Commitment
		resp.IsLeaf = false
		resp.LeafCount = uint64(n.LeafCount)

		// Collect children; slots not resident in memory are lazily loaded
		// from the backing store.
		for i := 0; i < 64; i++ {
			child := n.Children[i]
			if child == nil {
				var err error
				child, err = n.Store.GetNodeByPath(
					tree.SetType,
					tree.PhaseType,
					tree.ShardKey,
					slices.Concat(n.FullPrefix, []int{i}),
				)
				// "item not found" simply means the slot is empty; any other
				// error skips the slot rather than failing the whole branch.
				if err != nil && !strings.Contains(err.Error(), "item not found") {
					continue
				}
			}
			if child != nil {
				childPath := slices.Concat(n.FullPrefix, []int{i})
				child = ensureCommittedNode(logger, tree, childPath, child)
				var childCommit []byte
				switch c := child.(type) {
				case *tries.LazyVectorCommitmentBranchNode:
					childCommit = c.Commitment
				case *tries.LazyVectorCommitmentLeafNode:
					childCommit = c.Commitment
				}
				if len(childCommit) > 0 {
					resp.Children = append(resp.Children, &protobufs.HypergraphSyncChildInfo{
						Index:      int32(i),
						Commitment: childCommit,
					})
				}
			}
		}
	case *tries.LazyVectorCommitmentLeafNode:
		resp.FullPath = req.Path // Leaves don't have FullPrefix, use requested path
		resp.Commitment = n.Commitment
		resp.IsLeaf = true
		resp.LeafCount = 1
	}

	return &protobufs.HypergraphSyncResponse{
		Response: &protobufs.HypergraphSyncResponse_Branch{
			Branch: resp,
		},
	}, nil
}

// handleGetLeaves returns a paginated batch of leaves under the requested
// path, using the session's snapshot store for consistent reads.
func (hg *HypergraphCRDT) handleGetLeaves(
	ctx context.Context,
	req *protobufs.HypergraphSyncGetLeavesRequest,
	session *syncSession,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncResponse, error) {
	tree := session.idSet.GetTree()
	if tree == nil || tree.Root == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Leaves{
				Leaves: &protobufs.HypergraphSyncLeavesResponse{
					Path:        req.Path,
					Leaves:      nil,
					TotalLeaves: 0,
				},
			},
		}, nil
	}

	path := toIntSlice(req.Path)
	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		toInt32Slice(path),
		0,
	)
	if node == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Error{
				Error: &protobufs.HypergraphSyncError{
					Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND,
					Message: "node not found at path",
					Path:    req.Path,
				},
			},
		}, nil
	}

	// Debug: log node details to identify snapshot consistency issues
	var nodeCommitment []byte
	var nodeStore string
	switch n := node.(type) {
	case *tries.LazyVectorCommitmentBranchNode:
		nodeCommitment = n.Commitment
		nodeStore = fmt.Sprintf("%p", n.Store)
	case *tries.LazyVectorCommitmentLeafNode:
		nodeCommitment = n.Commitment
		nodeStore = fmt.Sprintf("%p", n.Store)
	}
	logger.Debug("handleGetLeaves node info",
		zap.String("path", hex.EncodeToString(packPath(req.Path))),
		zap.String("nodeCommitment", hex.EncodeToString(nodeCommitment)),
		zap.String("nodeStore", nodeStore),
		zap.String("sessionStore", fmt.Sprintf("%p", session.store)),
		zap.Int("contTokenLen", len(req.ContinuationToken)),
	)

	// Get all leaves under this node
	allLeaves := tries.GetAllLeaves(
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		node,
	)

	// Apply pagination
	maxLeaves := int(req.MaxLeaves)
	if maxLeaves == 0 {
		maxLeaves = 1000 // Default batch size
	}

	startIdx := 0
	if len(req.ContinuationToken) > 0 {
		// Simple continuation: token is the start index as hex
		idx, err := parseContToken(req.ContinuationToken)
		if err == nil {
			startIdx = idx
		}
	}

	var leaves []*protobufs.LeafData
	var totalNonNil uint64
	for _, leaf := range allLeaves {
		if leaf == nil {
			continue
		}
		totalNonNil++
		// Skip leaves before the continuation point, and keep counting (but
		// do no further work) once the page is full so TotalLeaves reports
		// the true number of non-nil leaves under this node.
		if int(totalNonNil) <= startIdx || len(leaves) >= maxLeaves {
			continue
		}

		leafData := &protobufs.LeafData{
			Key:        leaf.Key,
			Value:      leaf.Value,
			HashTarget: leaf.HashTarget,
			Size:       leaf.Size.FillBytes(make([]byte, 32)),
		}

		// Load underlying vertex tree if available (use snapshot store for consistency)
		vtree, err := session.store.LoadVertexTree(leaf.Key)
		if err == nil && vtree != nil {
			data, err := tries.SerializeNonLazyTree(vtree)
			if err == nil {
				leafData.UnderlyingData = data
			}
		}

		leaves = append(leaves, leafData)
	}

	// Debug: log leaf count details
	logger.Debug("handleGetLeaves returning",
		zap.String("path", hex.EncodeToString(packPath(req.Path))),
		zap.Int("allLeavesLen", len(allLeaves)),
		zap.Uint64("totalNonNil", totalNonNil),
		zap.Int("startIdx", startIdx),
		zap.Int("leavesReturned", len(leaves)),
		zap.String("treePtr", fmt.Sprintf("%p", tree)),
		zap.String("treeRootPtr", fmt.Sprintf("%p", tree.Root)),
	)

	resp := &protobufs.HypergraphSyncLeavesResponse{
		Path:        req.Path,
		Leaves:      leaves,
		TotalLeaves: totalNonNil,
	}

	// Set continuation token if more leaves remain
	if startIdx+len(leaves) < int(totalNonNil) {
		resp.ContinuationToken = makeContToken(startIdx + len(leaves))
	}

	return &protobufs.HypergraphSyncResponse{
		Response: &protobufs.HypergraphSyncResponse_Leaves{
			Leaves: resp,
		},
	}, nil
}

// getPhaseSet resolves the requested phase set against the live (non-snapshot)
// store, returning nil for unknown phase sets.
func (hg *HypergraphCRDT) getPhaseSet(
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
) hypergraph.IdSet {
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		return hg.getVertexAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		return hg.getVertexRemovesSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		return hg.getHyperedgeAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		return hg.getHyperedgeRemovesSet(shardKey)
	default:
		return nil
	}
}

// parseContToken decodes a continuation token produced by makeContToken back
// into a start index. An empty token means "start from the beginning".
func parseContToken(token []byte) (int, error) {
	if len(token) == 0 {
		return 0, nil
	}
	// Token is hex-encoded 4 bytes (big-endian int32)
	decoded, err := hex.DecodeString(string(token))
	if err != nil {
		return 0, err
	}
	if len(decoded) != 4 {
		return 0, errors.New("invalid continuation token length")
	}
	return int(binary.BigEndian.Uint32(decoded)), nil
}

// makeContToken encodes a start index as a hex-encoded big-endian 4-byte
// continuation token, the inverse of parseContToken.
func makeContToken(idx int) []byte {
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], uint32(idx))
	return []byte(hex.EncodeToString(buf[:]))
}

// SyncFrom performs a client-driven sync from the given server stream.
// It navigates to the covered prefix (if any), then recursively syncs
// differing subtrees. If expectedRoot is provided, the server will attempt
// to sync from a snapshot matching that root commitment.
// Returns the new root commitment after sync completes.
func (hg *HypergraphCRDT) SyncFrom(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
) ([]byte, error) {
	hg.mu.Lock()
	defer hg.mu.Unlock()

	logger := hg.logger.With(
		zap.String("method", "SyncFrom"),
		zap.String("shard", hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:]))),
	)
	if len(expectedRoot) > 0 {
		logger = logger.With(zap.String("expectedRoot", hex.EncodeToString(expectedRoot)))
	}
	syncStart := time.Now()
	defer func() {
		logger.Info("SyncFrom completed", zap.Duration("duration", time.Since(syncStart)))
	}()

	set := hg.getPhaseSet(shardKey, phaseSet)
	if set == nil {
		return nil, errors.New("unsupported phase set")
	}

	shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:])
	coveredPrefix := hg.getCoveredPrefix()

	// Step 1: Navigate to sync point
	syncPoint, err := hg.navigateToSyncPoint(stream, shardKeyBytes, phaseSet, coveredPrefix, expectedRoot, logger)
	if err != nil {
		return nil, errors.Wrap(err, "navigate to sync point")
	}

	if syncPoint == nil || len(syncPoint.Commitment) == 0 {
		logger.Info("server has no data at sync point")
		// Return current root even if no data was synced
		root := set.GetTree().Commit(false)
		return root, nil
	}

	// Step 2: Sync the subtree
	err = hg.syncSubtree(stream, shardKeyBytes, phaseSet, expectedRoot, syncPoint, set, logger)
	if err != nil {
		return nil, errors.Wrap(err, "sync subtree")
	}

	// Step 3: Recompute commitment so future syncs see updated state
	root := set.GetTree().Commit(false)
	logger.Info(
		"hypergraph root commit after sync",
		zap.String("root", hex.EncodeToString(root)),
	)
	return root, nil
}

// navigateToSyncPoint walks down from the server's root toward coveredPrefix,
// returning the first branch that covers (or diverges from) the prefix.
// Returns (nil, nil) when the server has no data on the required path.
func (hg *HypergraphCRDT) navigateToSyncPoint(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	coveredPrefix []int,
	expectedRoot []byte,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncBranchResponse, error) {
	path := []int32{}

	for {
		// Query server for branch at current path
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetBranch{
				GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
					ShardKey:     shardKey,
					PhaseSet:     phaseSet,
					Path:         path,
					ExpectedRoot: expectedRoot,
				},
			},
		})
		if err != nil {
			return nil, errors.Wrap(err, "send GetBranch request")
		}

		resp, err := stream.Recv()
		if err != nil {
			return nil, errors.Wrap(err, "receive GetBranch response")
		}

		if errResp := resp.GetError(); errResp != nil {
			if errResp.Code == protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND {
				// Server doesn't have this path - nothing to sync
				return nil, nil
			}
			return nil, errors.Errorf("server error: %s", errResp.Message)
		}

		branch := resp.GetBranch()
		if branch == nil {
			return nil, errors.New("unexpected response type")
		}

		logger.Debug("navigating",
			zap.String("path", hex.EncodeToString(packPath(path))),
			zap.String("fullPath", hex.EncodeToString(packPath(branch.FullPath))),
			zap.Int("coveredPrefixLen", len(coveredPrefix)),
		)

		// If no covered prefix, root is the sync point
		if len(coveredPrefix) == 0 {
			return branch, nil
		}

		// Check if server's full path reaches or passes our covered prefix
		serverPath := toIntSlice(branch.FullPath)
		if isPrefixOrEqual(coveredPrefix, serverPath) {
			return branch, nil
		}

		// Need to navigate deeper - find next child to descend into
		if len(serverPath) >= len(coveredPrefix) {
			// Server path is longer but doesn't match our prefix
			// This means server has data outside our coverage
			return branch, nil
		}

		// Server path is shorter - we need to go deeper
		nextNibble := coveredPrefix[len(serverPath)]

		// Check if server has a child at this index
		found := false
		for _, child := range branch.Children {
			if int(child.Index) == nextNibble {
				found = true
				break
			}
		}
		if !found {
			// Server doesn't have the path we need
			logger.Debug("server missing path to covered prefix",
				zap.Int("nextNibble", nextNibble),
			)
			return nil, nil
		}

		// Descend to next level; clone so we never append into the
		// protobuf-owned FullPath backing array.
		path = append(slices.Clone(branch.FullPath), int32(nextNibble))
	}
}

// syncSubtree recursively reconciles the local subtree rooted at
// serverBranch.FullPath with the server's, fetching leaves for any subtree
// whose commitment differs.
func (hg *HypergraphCRDT) syncSubtree(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
	serverBranch *protobufs.HypergraphSyncBranchResponse,
	localSet hypergraph.IdSet,
	logger *zap.Logger,
) error {
	tree := localSet.GetTree()

	// Get local node at same path
	var localCommitment []byte
	var localNode tries.LazyVectorCommitmentNode
	if tree != nil && tree.Root != nil {
		path := toIntSlice(serverBranch.FullPath)
		localNode = getNodeAtPath(
			logger,
			tree.SetType,
			tree.PhaseType,
			tree.ShardKey,
			tree.Root,
			serverBranch.FullPath,
			0,
		)
		if localNode != nil {
			localNode = ensureCommittedNode(logger, tree, path, localNode)
			switch n := localNode.(type) {
			case *tries.LazyVectorCommitmentBranchNode:
				localCommitment = n.Commitment
			case *tries.LazyVectorCommitmentLeafNode:
				localCommitment = n.Commitment
			}
		}
	}

	// If commitments match, subtrees are identical
	if bytes.Equal(localCommitment, serverBranch.Commitment) {
		logger.Debug("subtree matches",
			zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
		)
		return nil
	}

	// If server node is a leaf or has no children, fetch all leaves
	if serverBranch.IsLeaf || len(serverBranch.Children) == 0 {
		return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger)
	}

	// If we have NO local data at this path, fetch all leaves directly.
	// This avoids N round trips for N children when we need all of them anyway.
	if localNode == nil {
		logger.Debug("no local data at path, fetching all leaves directly",
			zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
			zap.Int("serverChildren", len(serverBranch.Children)),
		)
		return hg.fetchAndIntegrateLeaves(stream, shardKey, phaseSet, expectedRoot, serverBranch.FullPath, localSet, logger)
	}

	// Compare children and recurse
	localChildren := make(map[int32][]byte)
	if tree != nil && tree.Root != nil {
		path := toIntSlice(serverBranch.FullPath)
		if branch, ok := localNode.(*tries.LazyVectorCommitmentBranchNode); ok {
			for i := 0; i < 64; i++ {
				child := branch.Children[i]
				if child == nil {
					// Best-effort lazy load; a missing child simply stays nil.
					child, _ = branch.Store.GetNodeByPath(
						tree.SetType,
						tree.PhaseType,
						tree.ShardKey,
						slices.Concat(path, []int{i}),
					)
				}
				if child != nil {
					childPath := slices.Concat(path, []int{i})
					child = ensureCommittedNode(logger, tree, childPath, child)
					switch c := child.(type) {
					case *tries.LazyVectorCommitmentBranchNode:
						localChildren[int32(i)] = c.Commitment
					case *tries.LazyVectorCommitmentLeafNode:
						localChildren[int32(i)] = c.Commitment
					}
				}
			}
		}
	}

	for _, serverChild := range serverBranch.Children {
		localChildCommit := localChildren[serverChild.Index]
		if bytes.Equal(localChildCommit, serverChild.Commitment) {
			// Child matches, skip
			continue
		}

		// Need to sync this child
		childPath := append(slices.Clone(serverBranch.FullPath), serverChild.Index)

		// Query for child branch
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetBranch{
				GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
					ShardKey:     shardKey,
					PhaseSet:     phaseSet,
					Path:         childPath,
					ExpectedRoot: expectedRoot,
				},
			},
		})
		if err != nil {
			return errors.Wrap(err, "send GetBranch for child")
		}

		resp, err := stream.Recv()
		if err != nil {
			return errors.Wrap(err, "receive GetBranch response for child")
		}

		if errResp := resp.GetError(); errResp != nil {
			logger.Warn("error getting child branch",
				zap.String("error", errResp.Message),
				zap.String("path", hex.EncodeToString(packPath(childPath))),
			)
			continue
		}

		childBranch := resp.GetBranch()
		if childBranch == nil {
			continue
		}

		// Recurse
		if err := hg.syncSubtree(stream, shardKey, phaseSet, expectedRoot, childBranch, localSet, logger); err != nil {
			return err
		}
	}

	return nil
}

// fetchAndIntegrateLeaves pulls every leaf under path from the server (in
// paginated batches) and adds them to localSet, persisting any attached
// vertex trees. Each batch is committed in its own transaction.
func (hg *HypergraphCRDT) fetchAndIntegrateLeaves(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
	path []int32,
	localSet hypergraph.IdSet,
	logger *zap.Logger,
) error {
	logger.Debug("fetching leaves",
		zap.String("path", hex.EncodeToString(packPath(path))),
	)

	var continuationToken []byte
	totalFetched := 0

	for {
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetLeaves{
				GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
					ShardKey:          shardKey,
					PhaseSet:          phaseSet,
					Path:              path,
					MaxLeaves:         1000,
					ContinuationToken: continuationToken,
					ExpectedRoot:      expectedRoot,
				},
			},
		})
		if err != nil {
			return errors.Wrap(err, "send GetLeaves request")
		}

		resp, err := stream.Recv()
		if err != nil {
			return errors.Wrap(err, "receive GetLeaves response")
		}

		if errResp := resp.GetError(); errResp != nil {
			return errors.Errorf("server error: %s", errResp.Message)
		}

		leavesResp := resp.GetLeaves()
		if leavesResp == nil {
			return errors.New("unexpected response type")
		}

		// Integrate leaves into local tree
		txn, err := hg.store.NewTransaction(false)
		if err != nil {
			return errors.Wrap(err, "create transaction")
		}

		for _, leaf := range leavesResp.Leaves {
			atom := AtomFromBytes(leaf.Value)

			// Persist underlying tree if present
			if len(leaf.UnderlyingData) > 0 {
				vtree, err := tries.DeserializeNonLazyTree(leaf.UnderlyingData)
				if err == nil {
					if err := hg.store.SaveVertexTree(txn, leaf.Key, vtree); err != nil {
						logger.Warn("failed to save vertex tree", zap.Error(err))
					}
				}
			}

			if err := localSet.Add(txn, atom); err != nil {
				txn.Abort()
				return errors.Wrap(err, "add leaf to local set")
			}
		}

		if err := txn.Commit(); err != nil {
			return errors.Wrap(err, "commit transaction")
		}

		totalFetched += len(leavesResp.Leaves)
		logger.Debug("fetched leaves batch",
			zap.String("path", hex.EncodeToString(packPath(path))),
			zap.Int("count", len(leavesResp.Leaves)),
			zap.Int("totalFetched", totalFetched),
			zap.Uint64("totalAvailable", leavesResp.TotalLeaves),
		)

		// Check if more leaves remain
		if len(leavesResp.ContinuationToken) == 0 {
			break
		}
		continuationToken = leavesResp.ContinuationToken
	}

	return nil
}

// isPrefixOrEqual reports whether prefix is a (possibly equal-length) prefix
// of path.
func isPrefixOrEqual(prefix, path []int) bool {
	if len(prefix) > len(path) {
		return false
	}
	for i, v := range prefix {
		if path[i] != v {
			return false
		}
	}
	return true
}