// ceremonyclient/hypergraph/sync_client_driven.go

package hypergraph

import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"io"
	"slices"
	"strings"
	"time"

	"github.com/pkg/errors"
	"go.uber.org/zap"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
)

// syncSession holds the state for a PerformSync session.
type syncSession struct {
	shardKey tries.ShardKey
	phaseSet protobufs.HypergraphPhaseSet
	snapshot *snapshotHandle
	idSet    hypergraph.IdSet
	store    tries.TreeBackingStore
}

// PerformSync implements the server side of the client-driven sync protocol.
// The client sends GetBranch and GetLeaves requests, and the server responds
// with the requested data. This is simpler than HyperStream because there's
// no need for both sides to walk in lockstep.
//
// The server uses a snapshot to ensure consistent reads throughout the session.
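//
// A typical exchange, as driven by SyncFrom in this file (the ordering is
// illustrative, not mandated by the protocol):
//
//	client                         server
//	GetBranch(path=[])    ----->
//	                      <-----   Branch{fullPath, commitment, children}
//	GetBranch(path=p+[i]) ----->   (for children whose commitments differ)
//	                      <-----   Branch{...}
//	GetLeaves(path=q)     ----->   (at leaves or locally-missing subtrees)
//	                      <-----   Leaves{leaves, totalLeaves, continuationToken?}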
func (hg *HypergraphCRDT) PerformSync(
	stream protobufs.HypergraphComparisonService_PerformSyncServer,
) error {
	ctx := stream.Context()
	logger := hg.logger.With(zap.String("method", "PerformSync"))
	sessionStart := time.Now()

	// Session state - initialized on first request
	var session *syncSession
	defer func() {
		if session != nil && session.snapshot != nil {
			hg.snapshotMgr.release(session.snapshot)
		}
		logger.Info(
			"PerformSync session finished",
			zap.Duration("duration", time.Since(sessionStart)),
		)
	}()

	// Process requests until stream closes
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return errors.Wrap(err, "receive request")
		}

		var resp *protobufs.HypergraphSyncResponse
		switch r := req.Request.(type) {
		case *protobufs.HypergraphSyncQuery_GetBranch:
			// Initialize session on first request
			if session == nil {
				session, err = hg.initSyncSession(
					r.GetBranch.ShardKey,
					r.GetBranch.PhaseSet,
					r.GetBranch.ExpectedRoot,
					logger,
				)
				if err != nil {
					return errors.Wrap(err, "init sync session")
				}
			}
			resp, err = hg.handleGetBranch(ctx, r.GetBranch, session, logger)
		case *protobufs.HypergraphSyncQuery_GetLeaves:
			// Initialize session on first request
			if session == nil {
				session, err = hg.initSyncSession(
					r.GetLeaves.ShardKey,
					r.GetLeaves.PhaseSet,
					r.GetLeaves.ExpectedRoot,
					logger,
				)
				if err != nil {
					return errors.Wrap(err, "init sync session")
				}
			}
			resp, err = hg.handleGetLeaves(ctx, r.GetLeaves, session, logger)
		default:
			resp = &protobufs.HypergraphSyncResponse{
				Response: &protobufs.HypergraphSyncResponse_Error{
					Error: &protobufs.HypergraphSyncError{
						Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_UNKNOWN,
						Message: "unknown request type",
					},
				},
			}
		}
		if err != nil {
			logger.Error("error handling request", zap.Error(err))
			resp = &protobufs.HypergraphSyncResponse{
				Response: &protobufs.HypergraphSyncResponse_Error{
					Error: &protobufs.HypergraphSyncError{
						Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_INTERNAL,
						Message: err.Error(),
					},
				},
			}
		}
		if err := stream.Send(resp); err != nil {
			return errors.Wrap(err, "send response")
		}
	}
}

// initSyncSession initializes a sync session with a snapshot for consistent reads.
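// The 35-byte shard key decodes as a 3-byte L1 prefix followed by a 32-byte
// L2 key, matching tries.ShardKey below.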
func (hg *HypergraphCRDT) initSyncSession(
	shardKeyBytes []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
	logger *zap.Logger,
) (*syncSession, error) {
	if len(shardKeyBytes) != 35 {
		return nil, errors.New("shard key must be 35 bytes")
	}
	shardKey := tries.ShardKey{
		L1: [3]byte(shardKeyBytes[:3]),
		L2: [32]byte(shardKeyBytes[3:]),
	}

	// Acquire a snapshot for consistent reads throughout the session.
	// If expectedRoot is provided, we try to find a snapshot matching that root.
	snapshot := hg.snapshotMgr.acquire(shardKey, expectedRoot)
	if snapshot == nil {
		return nil, errors.New("failed to acquire snapshot")
	}
	snapshotStore := snapshot.Store()
	idSet := hg.snapshotPhaseSet(shardKey, phaseSet, snapshotStore)
	if idSet == nil {
		hg.snapshotMgr.release(snapshot)
		return nil, errors.New("unsupported phase set")
	}
	tree := idSet.GetTree()
	logger.Debug(
		"sync session initialized",
		zap.String("shard", hex.EncodeToString(shardKeyBytes)),
		zap.Int("phaseSet", int(phaseSet)),
		zap.Bool("tree_nil", tree == nil),
		zap.Bool("root_nil", tree != nil && tree.Root == nil),
		zap.String("snapshot_root", hex.EncodeToString(snapshot.Root())),
	)
	return &syncSession{
		shardKey: shardKey,
		phaseSet: phaseSet,
		snapshot: snapshot,
		idSet:    idSet,
		store:    snapshotStore,
	}, nil
}

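// handleGetBranch serves a GetBranch query: it locates the node at the
// requested path in the session snapshot and returns its commitment, leaf
// count, and the commitments of any children.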
func (hg *HypergraphCRDT) handleGetBranch(
	ctx context.Context,
	req *protobufs.HypergraphSyncGetBranchRequest,
	session *syncSession,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncResponse, error) {
	tree := session.idSet.GetTree()
	if tree == nil || tree.Root == nil {
		// Empty tree - return empty response
		logger.Debug("handleGetBranch: empty tree",
			zap.Bool("tree_nil", tree == nil),
			zap.Bool("root_nil", tree != nil && tree.Root == nil),
			zap.String("path", hex.EncodeToString(packPath(req.Path))),
		)
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Branch{
				Branch: &protobufs.HypergraphSyncBranchResponse{
					FullPath:   req.Path,
					Commitment: nil,
					Children:   nil,
					IsLeaf:     true,
					LeafCount:  0,
				},
			},
		}, nil
	}

	path := toIntSlice(req.Path)
	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		toInt32Slice(path),
		0,
	)
	if node == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Error{
				Error: &protobufs.HypergraphSyncError{
					Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND,
					Message: "node not found at path",
					Path:    req.Path,
				},
			},
		}, nil
	}

	resp := &protobufs.HypergraphSyncBranchResponse{}

	// Ensure commitment is computed first
	node = ensureCommittedNode(logger, tree, path, node)

	switch n := node.(type) {
	case *tries.LazyVectorCommitmentBranchNode:
		resp.FullPath = toInt32Slice(n.FullPrefix)
		resp.Commitment = n.Commitment
		resp.IsLeaf = false
		resp.LeafCount = uint64(n.LeafCount)

		// Collect children
		for i := 0; i < 64; i++ {
			child := n.Children[i]
			if child == nil {
				var err error
				child, err = n.Store.GetNodeByPath(
					tree.SetType,
					tree.PhaseType,
					tree.ShardKey,
					slices.Concat(n.FullPrefix, []int{i}),
				)
				if err != nil && !strings.Contains(err.Error(), "item not found") {
					continue
				}
			}
			if child != nil {
				childPath := slices.Concat(n.FullPrefix, []int{i})
				child = ensureCommittedNode(logger, tree, childPath, child)
				var childCommit []byte
				switch c := child.(type) {
				case *tries.LazyVectorCommitmentBranchNode:
					childCommit = c.Commitment
				case *tries.LazyVectorCommitmentLeafNode:
					childCommit = c.Commitment
				}
				if len(childCommit) > 0 {
					resp.Children = append(resp.Children, &protobufs.HypergraphSyncChildInfo{
						Index:      int32(i),
						Commitment: childCommit,
					})
				}
			}
		}
	case *tries.LazyVectorCommitmentLeafNode:
		resp.FullPath = req.Path // Leaves don't have FullPrefix, use requested path
		resp.Commitment = n.Commitment
		resp.IsLeaf = true
		resp.LeafCount = 1
	}

	return &protobufs.HypergraphSyncResponse{
		Response: &protobufs.HypergraphSyncResponse_Branch{
			Branch: resp,
		},
	}, nil
}

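// handleGetLeaves serves a GetLeaves query: it collects the leaves under the
// node at the requested path and returns them in paginated batches, attaching
// each leaf's serialized vertex tree when one is available.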
func (hg *HypergraphCRDT) handleGetLeaves(
	ctx context.Context,
	req *protobufs.HypergraphSyncGetLeavesRequest,
	session *syncSession,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncResponse, error) {
	tree := session.idSet.GetTree()
	if tree == nil || tree.Root == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Leaves{
				Leaves: &protobufs.HypergraphSyncLeavesResponse{
					Path:        req.Path,
					Leaves:      nil,
					TotalLeaves: 0,
				},
			},
		}, nil
	}

	path := toIntSlice(req.Path)
	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		toInt32Slice(path),
		0,
	)
	if node == nil {
		return &protobufs.HypergraphSyncResponse{
			Response: &protobufs.HypergraphSyncResponse_Error{
				Error: &protobufs.HypergraphSyncError{
					Code:    protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND,
					Message: "node not found at path",
					Path:    req.Path,
				},
			},
		}, nil
	}

	// Debug: log node details to identify snapshot consistency issues
	var nodeCommitment []byte
	var nodeStore string
	switch n := node.(type) {
	case *tries.LazyVectorCommitmentBranchNode:
		nodeCommitment = n.Commitment
		nodeStore = fmt.Sprintf("%p", n.Store)
	case *tries.LazyVectorCommitmentLeafNode:
		nodeCommitment = n.Commitment
		nodeStore = fmt.Sprintf("%p", n.Store)
	}
	logger.Debug("handleGetLeaves node info",
		zap.String("path", hex.EncodeToString(packPath(req.Path))),
		zap.String("nodeCommitment", hex.EncodeToString(nodeCommitment)),
		zap.String("nodeStore", nodeStore),
		zap.String("sessionStore", fmt.Sprintf("%p", session.store)),
		zap.Int("contTokenLen", len(req.ContinuationToken)),
	)

	// Get all leaves under this node
	allLeaves := tries.GetAllLeaves(
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		node,
	)

	// Apply pagination
	maxLeaves := int(req.MaxLeaves)
	if maxLeaves == 0 {
		maxLeaves = 1000 // Default batch size
	}
	startIdx := 0
	if len(req.ContinuationToken) > 0 {
		// Simple continuation: token is the start index as hex
		idx, err := parseContToken(req.ContinuationToken)
		if err == nil {
			startIdx = idx
		}
	}
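
	// Because the session reads from a fixed snapshot (see initSyncSession),
	// leaf enumeration is stable across requests, so an index-based
	// continuation token stays valid for the lifetime of the session.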
	var leaves []*protobufs.LeafData
	var totalNonNil uint64
	for _, leaf := range allLeaves {
		if leaf == nil {
			continue
		}
		totalNonNil++
		if int(totalNonNil) <= startIdx {
			continue
		}
		if len(leaves) >= maxLeaves {
			// Keep counting once the page is full so TotalLeaves reflects
			// every non-nil leaf, not just the leaves scanned so far.
			continue
		}
		leafData := &protobufs.LeafData{
			Key:        leaf.Key,
			Value:      leaf.Value,
			HashTarget: leaf.HashTarget,
			Size:       leaf.Size.FillBytes(make([]byte, 32)),
		}
		// Load underlying vertex tree if available (use snapshot store for consistency)
		vtree, err := session.store.LoadVertexTree(leaf.Key)
		if err == nil && vtree != nil {
			data, err := tries.SerializeNonLazyTree(vtree)
			if err == nil {
				leafData.UnderlyingData = data
			}
		}
		leaves = append(leaves, leafData)
	}

	// Debug: log leaf count details
	logger.Debug("handleGetLeaves returning",
		zap.String("path", hex.EncodeToString(packPath(req.Path))),
		zap.Int("allLeavesLen", len(allLeaves)),
		zap.Uint64("totalNonNil", totalNonNil),
		zap.Int("startIdx", startIdx),
		zap.Int("leavesReturned", len(leaves)),
		zap.String("treePtr", fmt.Sprintf("%p", tree)),
		zap.String("treeRootPtr", fmt.Sprintf("%p", tree.Root)),
	)

	resp := &protobufs.HypergraphSyncLeavesResponse{
		Path:        req.Path,
		Leaves:      leaves,
		TotalLeaves: totalNonNil,
	}
	// Set continuation token if more leaves remain
	if startIdx+len(leaves) < int(totalNonNil) {
		resp.ContinuationToken = makeContToken(startIdx + len(leaves))
	}
	return &protobufs.HypergraphSyncResponse{
		Response: &protobufs.HypergraphSyncResponse_Leaves{
			Leaves: resp,
		},
	}, nil
}

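// getPhaseSet maps the protobuf phase-set selector onto the corresponding
// local CRDT id-set for the shard, or nil for an unknown selector.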
func (hg *HypergraphCRDT) getPhaseSet(
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
) hypergraph.IdSet {
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		return hg.getVertexAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		return hg.getVertexRemovesSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		return hg.getHyperedgeAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		return hg.getHyperedgeRemovesSet(shardKey)
	default:
		return nil
	}
}

// parseContToken decodes a continuation token produced by makeContToken back
// into a start index. An empty token means "start from the beginning".
func parseContToken(token []byte) (int, error) {
	if len(token) == 0 {
		return 0, nil
	}
	// Token is hex-encoded 4 bytes (big-endian int32)
	decoded, err := hex.DecodeString(string(token))
	if err != nil {
		return 0, err
	}
	if len(decoded) != 4 {
		return 0, errors.New("invalid continuation token length")
	}
	idx := int(decoded[0])<<24 | int(decoded[1])<<16 | int(decoded[2])<<8 |
		int(decoded[3])
	return idx, nil
}

// makeContToken encodes a start index as the hex string of its 4 big-endian
// bytes.
func makeContToken(idx int) []byte {
	return []byte(hex.EncodeToString([]byte{
		byte(idx >> 24), byte(idx >> 16), byte(idx >> 8), byte(idx),
	}))
}

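// For example, makeContToken(4096) yields the eight ASCII bytes "00001000",
// and parseContToken of those bytes returns 4096.
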
// SyncFrom performs a client-driven sync from the given server stream.
// It navigates to the covered prefix (if any), then recursively syncs
// differing subtrees. If expectedRoot is provided, the server will attempt
// to sync from a snapshot matching that root commitment.
// Returns the new root commitment after sync completes.
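//
// A caller sketch (the stream setup shown is hypothetical; any client stream
// for the comparison service's PerformSync method works):
//
//	stream, _ := client.PerformSync(ctx)
//	root, err := hg.SyncFrom(stream, shardKey,
//		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
//
// Passing a nil expectedRoot lets the server choose its current snapshot.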
func (hg *HypergraphCRDT) SyncFrom(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
) ([]byte, error) {
	hg.mu.Lock()
	defer hg.mu.Unlock()

	logger := hg.logger.With(
		zap.String("method", "SyncFrom"),
		zap.String("shard", hex.EncodeToString(
			slices.Concat(shardKey.L1[:], shardKey.L2[:]),
		)),
	)
	if len(expectedRoot) > 0 {
		logger = logger.With(
			zap.String("expectedRoot", hex.EncodeToString(expectedRoot)),
		)
	}
	syncStart := time.Now()
	defer func() {
		logger.Info(
			"SyncFrom completed",
			zap.Duration("duration", time.Since(syncStart)),
		)
	}()

	set := hg.getPhaseSet(shardKey, phaseSet)
	if set == nil {
		return nil, errors.New("unsupported phase set")
	}
	shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:])
	coveredPrefix := hg.getCoveredPrefix()

	// Step 1: Navigate to sync point
	syncPoint, err := hg.navigateToSyncPoint(
		stream, shardKeyBytes, phaseSet, coveredPrefix, expectedRoot, logger,
	)
	if err != nil {
		return nil, errors.Wrap(err, "navigate to sync point")
	}
	if syncPoint == nil || len(syncPoint.Commitment) == 0 {
		logger.Info("server has no data at sync point")
		// Return current root even if no data was synced
		root := set.GetTree().Commit(false)
		return root, nil
	}

	// Step 2: Sync the subtree
	err = hg.syncSubtree(
		stream, shardKeyBytes, phaseSet, expectedRoot, syncPoint, set, logger,
	)
	if err != nil {
		return nil, errors.Wrap(err, "sync subtree")
	}

	// Step 3: Recompute commitment so future syncs see updated state
	root := set.GetTree().Commit(false)
	logger.Info(
		"hypergraph root commit after sync",
		zap.String("root", hex.EncodeToString(root)),
	)
	return root, nil
}

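// navigateToSyncPoint walks down from the server's root toward this node's
// covered prefix, returning the first branch whose full path reaches or
// passes that prefix, or nil when the server does not hold the path at all.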
func (hg *HypergraphCRDT) navigateToSyncPoint(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	coveredPrefix []int,
	expectedRoot []byte,
	logger *zap.Logger,
) (*protobufs.HypergraphSyncBranchResponse, error) {
	path := []int32{}
	for {
		// Query server for branch at current path
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetBranch{
				GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
					ShardKey:     shardKey,
					PhaseSet:     phaseSet,
					Path:         path,
					ExpectedRoot: expectedRoot,
				},
			},
		})
		if err != nil {
			return nil, errors.Wrap(err, "send GetBranch request")
		}
		resp, err := stream.Recv()
		if err != nil {
			return nil, errors.Wrap(err, "receive GetBranch response")
		}
		if errResp := resp.GetError(); errResp != nil {
			if errResp.Code == protobufs.HypergraphSyncErrorCode_HYPERGRAPH_SYNC_ERROR_NODE_NOT_FOUND {
				// Server doesn't have this path - nothing to sync
				return nil, nil
			}
			return nil, errors.Errorf("server error: %s", errResp.Message)
		}
		branch := resp.GetBranch()
		if branch == nil {
			return nil, errors.New("unexpected response type")
		}

		logger.Debug("navigating",
			zap.String("path", hex.EncodeToString(packPath(path))),
			zap.String("fullPath", hex.EncodeToString(packPath(branch.FullPath))),
			zap.Int("coveredPrefixLen", len(coveredPrefix)),
		)

		// If no covered prefix, root is the sync point
		if len(coveredPrefix) == 0 {
			return branch, nil
		}

		// Check if server's full path reaches or passes our covered prefix
		serverPath := toIntSlice(branch.FullPath)
		if isPrefixOrEqual(coveredPrefix, serverPath) {
			return branch, nil
		}

		// Need to navigate deeper - find next child to descend into
		if len(serverPath) >= len(coveredPrefix) {
			// Server path is longer but doesn't match our prefix.
			// This means the server has data outside our coverage.
			return branch, nil
		}

		// Server path is shorter - we need to go deeper
		nextNibble := coveredPrefix[len(serverPath)]

		// Check if server has a child at this index
		found := false
		for _, child := range branch.Children {
			if int(child.Index) == nextNibble {
				found = true
				break
			}
		}
		if !found {
			// Server doesn't have the path we need
			logger.Debug("server missing path to covered prefix",
				zap.Int("nextNibble", nextNibble),
			)
			return nil, nil
		}

		// Descend to next level
		path = append(branch.FullPath, int32(nextNibble))
	}
}

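// syncSubtree recursively reconciles the local tree against the subtree the
// server described in serverBranch, descending only into children whose
// commitments differ and fetching leaves wholesale where no local data
// exists at the path.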
func (hg *HypergraphCRDT) syncSubtree(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
	serverBranch *protobufs.HypergraphSyncBranchResponse,
	localSet hypergraph.IdSet,
	logger *zap.Logger,
) error {
	tree := localSet.GetTree()

	// Get local node at same path
	var localCommitment []byte
	var localNode tries.LazyVectorCommitmentNode
	if tree != nil && tree.Root != nil {
		path := toIntSlice(serverBranch.FullPath)
		localNode = getNodeAtPath(
			logger,
			tree.SetType,
			tree.PhaseType,
			tree.ShardKey,
			tree.Root,
			serverBranch.FullPath,
			0,
		)
		if localNode != nil {
			localNode = ensureCommittedNode(logger, tree, path, localNode)
			switch n := localNode.(type) {
			case *tries.LazyVectorCommitmentBranchNode:
				localCommitment = n.Commitment
			case *tries.LazyVectorCommitmentLeafNode:
				localCommitment = n.Commitment
			}
		}
	}

	// If commitments match, subtrees are identical
	if bytes.Equal(localCommitment, serverBranch.Commitment) {
		logger.Debug("subtree matches",
			zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
		)
		return nil
	}

	// If server node is a leaf or has no children, fetch all leaves
	if serverBranch.IsLeaf || len(serverBranch.Children) == 0 {
		return hg.fetchAndIntegrateLeaves(
			stream, shardKey, phaseSet, expectedRoot,
			serverBranch.FullPath, localSet, logger,
		)
	}

	// OPTIMIZATION: If we have NO local data at this path, skip the
	// branch-by-branch traversal and just fetch all leaves directly. This
	// avoids N round trips for N children when we know we need all of them
	// anyway.
	if localNode == nil {
		logger.Debug("no local data at path, fetching all leaves directly",
			zap.String("path", hex.EncodeToString(packPath(serverBranch.FullPath))),
			zap.Int("serverChildren", len(serverBranch.Children)),
		)
		return hg.fetchAndIntegrateLeaves(
			stream, shardKey, phaseSet, expectedRoot,
			serverBranch.FullPath, localSet, logger,
		)
	}

	// Compare children and recurse
	localChildren := make(map[int32][]byte)
	if tree != nil && tree.Root != nil {
		path := toIntSlice(serverBranch.FullPath)
		if branch, ok := localNode.(*tries.LazyVectorCommitmentBranchNode); ok {
			for i := 0; i < 64; i++ {
				child := branch.Children[i]
				if child == nil {
					child, _ = branch.Store.GetNodeByPath(
						tree.SetType,
						tree.PhaseType,
						tree.ShardKey,
						slices.Concat(path, []int{i}),
					)
				}
				if child != nil {
					childPath := slices.Concat(path, []int{i})
					child = ensureCommittedNode(logger, tree, childPath, child)
					switch c := child.(type) {
					case *tries.LazyVectorCommitmentBranchNode:
						localChildren[int32(i)] = c.Commitment
					case *tries.LazyVectorCommitmentLeafNode:
						localChildren[int32(i)] = c.Commitment
					}
				}
			}
		}
	}

	for _, serverChild := range serverBranch.Children {
		localChildCommit := localChildren[serverChild.Index]
		if bytes.Equal(localChildCommit, serverChild.Commitment) {
			// Child matches, skip
			continue
		}

		// Need to sync this child
		childPath := append(slices.Clone(serverBranch.FullPath), serverChild.Index)

		// Query for child branch
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetBranch{
				GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
					ShardKey:     shardKey,
					PhaseSet:     phaseSet,
					Path:         childPath,
					ExpectedRoot: expectedRoot,
				},
			},
		})
		if err != nil {
			return errors.Wrap(err, "send GetBranch for child")
		}
		resp, err := stream.Recv()
		if err != nil {
			return errors.Wrap(err, "receive GetBranch response for child")
		}
		if errResp := resp.GetError(); errResp != nil {
			logger.Warn("error getting child branch",
				zap.String("error", errResp.Message),
				zap.String("path", hex.EncodeToString(packPath(childPath))),
			)
			continue
		}
		childBranch := resp.GetBranch()
		if childBranch == nil {
			continue
		}

		// Recurse
		if err := hg.syncSubtree(
			stream, shardKey, phaseSet, expectedRoot,
			childBranch, localSet, logger,
		); err != nil {
			return err
		}
	}
	return nil
}

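// fetchAndIntegrateLeaves pages through GetLeaves responses for the subtree
// at path, persisting any shipped vertex trees and adding each leaf atom to
// localSet, one transaction per batch.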
func (hg *HypergraphCRDT) fetchAndIntegrateLeaves(
	stream protobufs.HypergraphComparisonService_PerformSyncClient,
	shardKey []byte,
	phaseSet protobufs.HypergraphPhaseSet,
	expectedRoot []byte,
	path []int32,
	localSet hypergraph.IdSet,
	logger *zap.Logger,
) error {
	logger.Debug("fetching leaves",
		zap.String("path", hex.EncodeToString(packPath(path))),
	)

	var continuationToken []byte
	totalFetched := 0
	for {
		err := stream.Send(&protobufs.HypergraphSyncQuery{
			Request: &protobufs.HypergraphSyncQuery_GetLeaves{
				GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
					ShardKey:          shardKey,
					PhaseSet:          phaseSet,
					Path:              path,
					MaxLeaves:         1000,
					ContinuationToken: continuationToken,
					ExpectedRoot:      expectedRoot,
				},
			},
		})
		if err != nil {
			return errors.Wrap(err, "send GetLeaves request")
		}
		resp, err := stream.Recv()
		if err != nil {
			return errors.Wrap(err, "receive GetLeaves response")
		}
		if errResp := resp.GetError(); errResp != nil {
			return errors.Errorf("server error: %s", errResp.Message)
		}
		leavesResp := resp.GetLeaves()
		if leavesResp == nil {
			return errors.New("unexpected response type")
		}

		// Integrate leaves into local tree
		txn, err := hg.store.NewTransaction(false)
		if err != nil {
			return errors.Wrap(err, "create transaction")
		}
		for _, leaf := range leavesResp.Leaves {
			atom := AtomFromBytes(leaf.Value)
			// Persist underlying tree if present
			if len(leaf.UnderlyingData) > 0 {
				vtree, err := tries.DeserializeNonLazyTree(leaf.UnderlyingData)
				if err == nil {
					if err := hg.store.SaveVertexTree(txn, leaf.Key, vtree); err != nil {
						logger.Warn("failed to save vertex tree", zap.Error(err))
					}
				}
			}
			if err := localSet.Add(txn, atom); err != nil {
				txn.Abort()
				return errors.Wrap(err, "add leaf to local set")
			}
		}
		if err := txn.Commit(); err != nil {
			return errors.Wrap(err, "commit transaction")
		}

		totalFetched += len(leavesResp.Leaves)
		logger.Debug("fetched leaves batch",
			zap.String("path", hex.EncodeToString(packPath(path))),
			zap.Int("count", len(leavesResp.Leaves)),
			zap.Int("totalFetched", totalFetched),
			zap.Uint64("totalAvailable", leavesResp.TotalLeaves),
		)

		// Check if more leaves remain
		if len(leavesResp.ContinuationToken) == 0 {
			break
		}
		continuationToken = leavesResp.ContinuationToken
	}
	return nil
}

// isPrefixOrEqual reports whether prefix is a (possibly equal) prefix of
// path; e.g. prefix [1 2] matches paths [1 2] and [1 2 7], but not [1].
func isPrefixOrEqual(prefix, path []int) bool {
	if len(prefix) > len(path) {
		return false
	}
	for i, v := range prefix {
		if path[i] != v {
			return false
		}
	}
	return true
}