ceremonyclient/hypergraph/sync.go
Cassandra Heart 0425b38fa2
v2.1.0.15 (#485)
* v2.1.0.15

* add release notes
2025-12-09 21:55:18 -06:00

2323 lines
54 KiB
Go

package hypergraph
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"net"
"slices"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc/peer"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
// HyperStream is the gRPC method that handles synchronization.
// HyperStream is the gRPC method that handles synchronization.
//
// Server-side session flow: authenticate the caller, enforce a single
// concurrent sync session per peer, pin a snapshot of the hypergraph for
// the whole session, then drive syncTreeServer over the stream. The named
// return err is included in the deferred session-summary log.
func (hg *HypergraphCRDT) HyperStream(
	stream protobufs.HypergraphComparisonService_HyperStreamServer,
) (err error) {
	requestCtx := stream.Context()
	// Tie the session lifetime to both the request and node shutdown.
	ctx, shutdownCancel := hg.contextWithShutdown(requestCtx)
	defer shutdownCancel()
	sessionLogger := hg.logger
	sessionStart := time.Now()
	defer func() {
		sessionLogger.Info(
			"hyperstream session finished",
			zap.Duration("session_duration", time.Since(sessionStart)),
			zap.Error(err),
		)
	}()
	identifyStart := time.Now()
	// Authenticate the peer; no identity, no session.
	peerId, err := hg.authenticationProvider.Identify(requestCtx)
	if err != nil {
		return errors.Wrap(err, "hyper stream")
	}
	sessionLogger = sessionLogger.With(zap.String("peer_id", peerId.String()))
	sessionLogger.Debug(
		"identified peer",
		zap.Duration("duration", time.Since(identifyStart)),
	)
	peerKey := peerId.String()
	if addr := peerIPFromContext(requestCtx); addr != "" {
		sessionLogger = sessionLogger.With(zap.String("peer_ip", addr))
	}
	// Reject concurrent sessions from the same peer.
	if !hg.syncController.TryEstablishSyncSession(peerKey) {
		return errors.New("peer already syncing")
	}
	defer func() {
		hg.syncController.EndSyncSession(peerKey)
	}()
	// Pin a consistent snapshot for the duration of the session.
	snapshotStart := time.Now()
	handle := hg.snapshotMgr.acquire()
	if handle == nil {
		return errors.New("hypergraph snapshot unavailable")
	}
	defer hg.snapshotMgr.release(handle)
	sessionLogger.Debug(
		"snapshot acquisition complete",
		zap.Duration("duration", time.Since(snapshotStart)),
	)
	root := handle.Root()
	if len(root) != 0 {
		sessionLogger.Debug(
			"acquired snapshot",
			zap.String("root", hex.EncodeToString(root)),
		)
	} else {
		sessionLogger.Debug("acquired snapshot", zap.String("root", ""))
	}
	snapshotStore := handle.Store()
	syncStart := time.Now()
	err = hg.syncTreeServer(ctx, stream, snapshotStore, root, sessionLogger, handle)
	sessionLogger.Info(
		"syncTreeServer completed",
		zap.Duration("sync_duration", time.Since(syncStart)),
		zap.Error(err),
	)
	// NOTE(review): the peer is marked reachable/synced even when
	// syncTreeServer returned an error — confirm this is intentional.
	hg.syncController.SetStatus(peerKey, &hypergraph.SyncInfo{
		Unreachable: false,
		LastSynced:  time.Now(),
	})
	return err
}
// Sync performs the tree diff and synchronization from the client side.
// The caller (e.g. the client) must initiate the diff from its root.
// After that, both sides exchange queries, branch info, and leaf updates until
// their local trees are synchronized.
//
// Returns an error if the session cannot be established, the handshake
// fails, the phase set is unsupported, or the tree walk itself fails.
func (hg *HypergraphCRDT) Sync(
	stream protobufs.HypergraphComparisonService_HyperStreamClient,
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
) (err error) {
	// Only one local sync may run at any time.
	const localSyncKey = "local-sync"
	if !hg.syncController.TryEstablishSyncSession(localSyncKey) {
		return errors.New("local sync already in progress")
	}
	defer func() {
		hg.syncController.EndSyncSession(localSyncKey)
	}()
	hg.mu.Lock()
	defer hg.mu.Unlock()
	syncStart := time.Now()
	shardKeyHex := hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:]))
	hg.logger.Info(
		"sync started",
		zap.String("shard_key", shardKeyHex),
		zap.Int("phase_set", int(phaseSet)),
	)
	defer func() {
		hg.logger.Info(
			"sync finished",
			zap.String("shard_key", shardKeyHex),
			zap.Duration("duration", time.Since(syncStart)),
			zap.Error(err),
		)
	}()
	// Select the CRDT phase set to synchronize.
	var set hypergraph.IdSet
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		set = hg.getVertexAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		set = hg.getVertexRemovesSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		set = hg.getHyperedgeAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		set = hg.getHyperedgeRemovesSet(shardKey)
	default:
		return errors.New("unsupported phase set")
	}
	path := hg.getCoveredPrefix()
	// Send initial query for path
	sendStart := time.Now()
	if err := stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Query{
			Query: &protobufs.HypergraphComparisonQuery{
				ShardKey:        slices.Concat(shardKey.L1[:], shardKey.L2[:]),
				PhaseSet:        phaseSet,
				Path:            toInt32Slice(path),
				Commitment:      set.GetTree().Commit(false),
				IncludeLeafData: false,
			},
		},
	}); err != nil {
		return err
	}
	hg.logger.Debug(
		"sent initialization message",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(sendStart)),
	)
	// Wait for the server's branch info for the same path.
	recvStart := time.Now()
	msg, err := stream.Recv()
	if err != nil {
		hg.logger.Info("initial recv failed", zap.Error(err))
		return err
	}
	hg.logger.Debug(
		"received initialization response",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(recvStart)),
	)
	response := msg.GetResponse()
	if response == nil {
		return errors.New(
			"server did not send valid initialization response message",
		)
	}
	// Build our own branch info for the same path so both sides can diff.
	branchInfoStart := time.Now()
	branchInfo, err := getBranchInfoFromTree(
		hg.logger,
		set.GetTree(),
		toInt32Slice(path),
	)
	if err != nil {
		return err
	}
	hg.logger.Debug(
		"constructed branch info",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(branchInfoStart)),
	)
	resp := &protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Response{
			Response: branchInfo,
		},
	}
	responseSendStart := time.Now()
	if err := stream.Send(resp); err != nil {
		return err
	}
	hg.logger.Debug(
		"sent initial branch info",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(responseSendStart)),
	)
	// Demultiplex incoming messages into unbounded channels so a slow
	// consumer cannot stall the gRPC receive loop.
	ctx, cancel := context.WithCancel(stream.Context())
	incomingQueriesIn, incomingQueriesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonQuery](
			cancel,
			"client incoming",
		)
	incomingResponsesIn, incomingResponsesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonResponse](
			cancel,
			"client incoming",
		)
	incomingLeavesIn, incomingLeavesOut :=
		UnboundedChan[*protobufs.HypergraphComparison](
			cancel,
			"client incoming",
		)
	// Receive pump: routes each message type to its channel; any stream
	// error (including EOF) cancels the session and closes all channels.
	go func() {
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				// Stream closed by sender.
				cancel()
				close(incomingQueriesIn)
				close(incomingResponsesIn)
				close(incomingLeavesIn)
				return
			}
			if err != nil {
				cancel()
				close(incomingQueriesIn)
				close(incomingResponsesIn)
				close(incomingLeavesIn)
				return
			}
			if msg == nil {
				continue
			}
			switch m := msg.Payload.(type) {
			case *protobufs.HypergraphComparison_LeafData:
				incomingLeavesIn <- msg
			case *protobufs.HypergraphComparison_Metadata:
				incomingLeavesIn <- msg
			case *protobufs.HypergraphComparison_Query:
				incomingQueriesIn <- m.Query
			case *protobufs.HypergraphComparison_Response:
				incomingResponsesIn <- m.Response
			}
		}
	}()
	manager := &streamManager{
		ctx:             ctx,
		cancel:          cancel,
		logger:          hg.logger,
		stream:          stream,
		hypergraphStore: hg.store,
		localTree:       set.GetTree(),
		localSet:        set,
		lastSent:        time.Now(),
	}
	var wg sync.WaitGroup
	// walkErr is written before wg.Done and read after wg.Wait, so the
	// WaitGroup provides the necessary happens-before edge.
	var walkErr error
	wg.Add(1)
	go func() {
		defer wg.Done()
		walkErr = manager.walk(
			branchInfo.Path,
			branchInfo,
			response,
			incomingLeavesOut,
			incomingQueriesOut,
			incomingResponsesOut,
			true,
			false,
			shardKey,
			phaseSet,
		)
		if walkErr != nil {
			hg.logger.Debug("error while syncing", zap.Error(walkErr))
		}
	}()
	wg.Wait()
	hg.logger.Info(
		"hypergraph root commit",
		zap.String("root", hex.EncodeToString(set.GetTree().Commit(false))),
	)
	// BUG FIX: the walk error was previously only logged and Sync always
	// returned nil; propagate it so callers can observe sync failures.
	return walkErr
}
// GetChildrenForPath returns, for every level along the requested path, the
// 64-wide segment of sibling nodes for the requested shard and phase set.
// Leaf segments include the serialized underlying vertex tree when it can
// be loaded.
func (hg *HypergraphCRDT) GetChildrenForPath(
	ctx context.Context,
	request *protobufs.GetChildrenForPathRequest,
) (*protobufs.GetChildrenForPathResponse, error) {
	// Shard keys are a 3-byte L1 prefix followed by a 32-byte L2 key.
	if len(request.ShardKey) != 35 {
		return nil, errors.New("invalid shard key")
	}
	shardKey := tries.ShardKey{
		L1: [3]byte(request.ShardKey[:3]),
		L2: [32]byte(request.ShardKey[3:]),
	}
	var set hypergraph.IdSet
	switch request.PhaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		set = hg.getVertexAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		set = hg.getVertexRemovesSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		set = hg.getHyperedgeAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		set = hg.getHyperedgeRemovesSet(shardKey)
	default:
		// BUG FIX: previously an unrecognized phase set left set nil and
		// caused a nil pointer dereference at set.GetTree() below. Mirrors
		// the default case in Sync.
		return nil, errors.New("unsupported phase set")
	}
	path := make([]int, 0, len(request.Path))
	for _, p := range request.Path {
		path = append(path, int(p))
	}
	segments, err := getSegments(
		set.GetTree(),
		path,
	)
	if err != nil {
		return nil, errors.Wrap(err, "get children for path")
	}
	response := &protobufs.GetChildrenForPathResponse{
		PathSegments: []*protobufs.TreePathSegments{},
	}
	for _, segment := range segments {
		pathSegments := &protobufs.TreePathSegments{}
		for i, seg := range segment {
			if seg == nil {
				continue
			}
			switch t := seg.(type) {
			case *tries.LazyVectorCommitmentBranchNode:
				pathSegments.Segments = append(
					pathSegments.Segments,
					&protobufs.TreePathSegment{
						Index: uint32(i),
						Segment: &protobufs.TreePathSegment_Branch{
							Branch: &protobufs.TreePathBranch{
								Prefix:        toUint32Slice(t.Prefix),
								Commitment:    t.Commitment,
								Size:          t.Size.Bytes(),
								LeafCount:     uint64(t.LeafCount),
								LongestBranch: uint32(t.LongestBranch),
								FullPrefix:    toUint32Slice(t.FullPrefix),
							},
						},
					},
				)
			case *tries.LazyVectorCommitmentLeafNode:
				// Best-effort: include the serialized vertex tree if it
				// loads; a load failure simply omits the value, but a
				// serialization failure is a hard error.
				var data []byte
				tree, err := hg.store.LoadVertexTree(t.Key)
				if err == nil {
					data, err = tries.SerializeNonLazyTree(tree)
					if err != nil {
						return nil, errors.Wrap(err, "get children for path")
					}
				}
				pathSegments.Segments = append(
					pathSegments.Segments,
					&protobufs.TreePathSegment{
						Index: uint32(i),
						Segment: &protobufs.TreePathSegment_Leaf{
							Leaf: &protobufs.TreePathLeaf{
								Key:        t.Key,
								Value:      data,
								HashTarget: t.HashTarget,
								Commitment: t.Commitment,
								Size:       t.Size.Bytes(),
							},
						},
					},
				)
			}
		}
		response.PathSegments = append(response.PathSegments, pathSegments)
	}
	return response, nil
}
// toUint32Slice converts a slice of ints to uint32s, preserving order.
// It always returns a non-nil slice (an empty or nil input yields an empty
// slice). The output is pre-sized to avoid repeated growth copies.
func toUint32Slice(s []int) []uint32 {
	o := make([]uint32, 0, len(s))
	for _, p := range s {
		o = append(o, uint32(p))
	}
	return o
}
// toInt32Slice converts a slice of ints to int32s, preserving order.
// It always returns a non-nil slice (an empty or nil input yields an empty
// slice). The output is pre-sized to avoid repeated growth copies.
func toInt32Slice(s []int) []int32 {
	o := make([]int32, 0, len(s))
	for _, p := range s {
		o = append(o, int32(p))
	}
	return o
}
// isPrefix reports whether prefix matches the leading elements of path.
// An empty prefix is a prefix of every path.
func isPrefix(prefix []int, path []int) bool {
	if len(path) < len(prefix) {
		return false
	}
	for i, p := range prefix {
		if path[i] != p {
			return false
		}
	}
	return true
}
// getChildSegments materializes the 64-wide child segment of a branch node,
// loading any children that are not resident in memory from the backing
// store. It also returns the index of the child whose full prefix lies on
// the provided target path; getSegments uses that index to pick the next
// node to descend into.
func getChildSegments(
	setType string,
	phaseType string,
	shardKey tries.ShardKey,
	node *tries.LazyVectorCommitmentBranchNode,
	path []int,
) ([64]tries.LazyVectorCommitmentNode, int) {
	nodes := [64]tries.LazyVectorCommitmentNode{}
	index := 0
	for i, child := range node.Children {
		if child == nil {
			var err error
			// Lazily load the child by its full path in the tree.
			prefix := slices.Concat(node.FullPrefix, []int{i})
			child, err = node.Store.GetNodeByPath(
				setType,
				phaseType,
				shardKey,
				prefix,
			)
			// "item not found" simply means there is no child at slot i;
			// any other store error is fatal.
			if err != nil && !strings.Contains(err.Error(), "item not found") {
				panic(err)
			}
			// NOTE(review): index is only updated for children that had to
			// be loaded from the store; if the on-path child is already in
			// memory this check is skipped and index stays 0 — confirm this
			// is intentional.
			if isPrefix(prefix, path) {
				index = i
			}
		}
		if child != nil {
			nodes[i] = child
		}
	}
	return nodes, index
}
// getSegments walks the tree from the root toward the given path, collecting
// the 64-wide child segment at every branch visited. The first segment holds
// only the root node; traversal stops at the first leaf (or absent child).
func getSegments(tree *tries.LazyVectorCommitmentTree, path []int) (
	[][64]tries.LazyVectorCommitmentNode,
	error,
) {
	collected := [][64]tries.LazyVectorCommitmentNode{
		{tree.Root},
	}
	current := tree.Root
	for current != nil {
		switch branch := current.(type) {
		case *tries.LazyVectorCommitmentBranchNode:
			children, next := getChildSegments(
				tree.SetType,
				tree.PhaseType,
				tree.ShardKey,
				branch,
				path,
			)
			collected = append(collected, children)
			// Descend into the on-path child; a nil slot ends the walk.
			current = children[next]
		case *tries.LazyVectorCommitmentLeafNode:
			// Reached a leaf: traversal is complete.
			current = nil
		}
	}
	return collected, nil
}
// streamManager holds the per-session state used while walking and diffing
// trees over a hypersync stream. It is used on both the client and server
// sides of the protocol.
type streamManager struct {
	ctx             context.Context                 // session-scoped context
	cancel          context.CancelFunc              // cancels the session
	logger          *zap.Logger                     // session logger
	stream          hypergraph.HyperStream          // underlying bidirectional stream
	hypergraphStore tries.TreeBackingStore          // persistent tree storage
	localTree       *tries.LazyVectorCommitmentTree // tree being synchronized
	localSet        hypergraph.IdSet                // CRDT set backing localTree
	lastSent        time.Time                       // time of the last outbound send
	snapshot        *snapshotHandle                 // optional snapshot cache; may be nil
}
// rawVertexSaver is implemented by stores that can persist a pre-serialized
// vertex tree directly, without deserializing it first. It is checked via
// type assertion in receiveRawShardSync.
type rawVertexSaver interface {
	// SaveVertexTreeRaw writes serialized vertex tree data for id within
	// the given transaction.
	SaveVertexTreeRaw(
		txn tries.TreeBackingStoreTransaction,
		id []byte,
		data []byte,
	) error
}
const (
leafAckMinTimeout = 30 * time.Second
leafAckMaxTimeout = 10 * time.Minute
leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead
)
func leafAckTimeout(count uint64) time.Duration {
// Calculate timeout with per-leaf budget plus a base overhead
baseOverhead := 30 * time.Second
timeout := baseOverhead + time.Duration(count)*leafAckPerLeafBudget
if timeout < leafAckMinTimeout {
return leafAckMinTimeout
}
if timeout > leafAckMaxTimeout {
return leafAckMaxTimeout
}
return timeout
}
// shouldUseRawSync reports whether the given phase set is synchronized via
// the raw full-shard path instead of the tree-diff protocol. Only vertex
// adds use raw sync.
func shouldUseRawSync(phaseSet protobufs.HypergraphPhaseSet) bool {
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		return true
	default:
		return false
	}
}
// rawShardSync performs a full raw sync of all leaves from server to client.
// This iterates directly over the database, bypassing in-memory tree caching
// to ensure all leaves are sent even if the in-memory tree is stale.
//
// Protocol: count all leaves, send a metadata message carrying the count,
// stream every leaf, then wait for the client to acknowledge with a
// metadata message echoing the same count.
func (s *streamManager) rawShardSync(
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) error {
	shardHex := hex.EncodeToString(shardKey.L2[:])
	s.logger.Info(
		"SERVER: starting raw shard sync (direct DB iteration)",
		zap.String("shard_key", shardHex),
	)
	start := time.Now()
	// Determine set and phase type strings
	setType := string(hypergraph.VertexAtomType)
	phaseType := string(hypergraph.AddsPhaseType)
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		setType = string(hypergraph.VertexAtomType)
		phaseType = string(hypergraph.AddsPhaseType)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		setType = string(hypergraph.VertexAtomType)
		phaseType = string(hypergraph.RemovesPhaseType)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		setType = string(hypergraph.HyperedgeAtomType)
		phaseType = string(hypergraph.AddsPhaseType)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		setType = string(hypergraph.HyperedgeAtomType)
		phaseType = string(hypergraph.RemovesPhaseType)
	}
	// First pass: count leaves. The counting iterator is scoped to this
	// closure so it is closed exactly once. (BUG FIX: the original code
	// deferred a Close on the first iterator and then also closed it
	// explicitly before recreating it, double-closing it at function exit.)
	countLeaves := func() (uint64, error) {
		iter, err := s.hypergraphStore.IterateRawLeaves(
			setType,
			phaseType,
			shardKey,
		)
		if err != nil {
			s.logger.Error(
				"SERVER: failed to create raw leaf iterator",
				zap.String("shard_key", shardHex),
				zap.Error(err),
			)
			return 0, errors.Wrap(err, "raw shard sync")
		}
		defer iter.Close()
		var n uint64
		for valid := iter.First(); valid; valid = iter.Next() {
			leaf, err := iter.Leaf()
			if err != nil {
				// Skip non-leaf nodes (branches)
				continue
			}
			if leaf != nil {
				n++
			}
		}
		return n, nil
	}
	count, err := countLeaves()
	if err != nil {
		return err
	}
	s.logger.Info(
		"SERVER: raw sync sending metadata",
		zap.String("shard_key", shardHex),
		zap.Uint64("leaf_count", count),
	)
	// Send metadata with leaf count
	if err := s.stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Metadata{
			Metadata: &protobufs.HypersyncMetadata{Leaves: count},
		},
	}); err != nil {
		return errors.Wrap(err, "raw shard sync: send metadata")
	}
	// Second pass: a fresh iterator (the counting one is exhausted and
	// already closed).
	iter, err := s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey)
	if err != nil {
		return errors.Wrap(err, "raw shard sync: recreate iterator")
	}
	defer iter.Close()
	var sent uint64
	for valid := iter.First(); valid; valid = iter.Next() {
		// Bail out promptly if the session context is gone.
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		default:
		}
		leaf, err := iter.Leaf()
		if err != nil {
			// Skip non-leaf nodes
			continue
		}
		if leaf == nil {
			continue
		}
		update := &protobufs.LeafData{
			Key:            leaf.Key,
			Value:          leaf.Value,
			HashTarget:     leaf.HashTarget,
			Size:           leaf.Size,
			UnderlyingData: leaf.UnderlyingData,
		}
		msg := &protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_LeafData{
				LeafData: update,
			},
		}
		if err := s.stream.Send(msg); err != nil {
			return errors.Wrap(err, "raw shard sync: send leaf")
		}
		sent++
		if sent%1000 == 0 {
			s.logger.Debug(
				"SERVER: raw sync progress",
				zap.Uint64("sent", sent),
				zap.Uint64("total", count),
			)
		}
	}
	s.logger.Info(
		"SERVER: raw sync sent all leaves, waiting for ack",
		zap.String("shard_key", shardHex),
		zap.Uint64("sent", sent),
	)
	// Wait for acknowledgment: the client must echo the announced count.
	timeoutTimer := time.NewTimer(leafAckTimeout(count))
	defer timeoutTimer.Stop()
	select {
	case <-s.ctx.Done():
		return errors.Wrap(s.ctx.Err(), "raw shard sync: wait ack")
	case msg, ok := <-incomingLeaves:
		if !ok {
			return errors.Wrap(errors.New("channel closed"), "raw shard sync: wait ack")
		}
		meta := msg.GetMetadata()
		if meta == nil {
			return errors.Wrap(errors.New("expected metadata ack"), "raw shard sync: wait ack")
		}
		if meta.Leaves != count {
			return errors.Wrap(
				fmt.Errorf("ack mismatch: expected %d, got %d", count, meta.Leaves),
				"raw shard sync: wait ack",
			)
		}
	case <-timeoutTimer.C:
		return errors.Wrap(errors.New("timeout waiting for ack"), "raw shard sync")
	}
	s.logger.Info(
		"SERVER: raw shard sync completed",
		zap.String("shard_key", shardHex),
		zap.Uint64("leaves_sent", sent),
		zap.Duration("duration", time.Since(start)),
	)
	return nil
}
// receiveRawShardSync receives a full raw sync of all leaves from server.
// It uses tree insertion to properly build the tree structure on the client.
//
// Mirrors rawShardSync: first a metadata message with the expected leaf
// count, then that many leaf messages, and finally an acknowledgment
// echoing the count back to the server. Writes are committed in batches.
func (s *streamManager) receiveRawShardSync(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) error {
	start := time.Now()
	s.logger.Info("CLIENT: starting receiveRawShardSync")
	expectedLeaves, err := s.awaitRawLeafMetadata(incomingLeaves)
	if err != nil {
		s.logger.Error("CLIENT: failed to receive metadata", zap.Error(err))
		return err
	}
	s.logger.Info(
		"CLIENT: received metadata",
		zap.Uint64("expected_leaves", expectedLeaves),
	)
	var txn tries.TreeBackingStoreTransaction
	var processed uint64
	for processed < expectedLeaves {
		// Commit in batches of 100 leaves; on the first iteration txn is
		// still nil, so only the initial transaction is opened.
		if processed%100 == 0 {
			if txn != nil {
				if err := txn.Commit(); err != nil {
					return errors.Wrap(err, "receive raw shard sync")
				}
			}
			txn, err = s.hypergraphStore.NewTransaction(false)
			if err != nil {
				return errors.Wrap(err, "receive raw shard sync")
			}
		}
		leafMsg, err := s.awaitLeafData(incomingLeaves)
		if err != nil {
			// Abort the open batch before surfacing the receive error.
			if txn != nil {
				txn.Abort()
			}
			s.logger.Error(
				"CLIENT: failed to receive leaf",
				zap.Uint64("processed", processed),
				zap.Uint64("expected", expectedLeaves),
				zap.Error(err),
			)
			return err
		}
		// Deserialize the atom from the raw value
		theirs := AtomFromBytes(leafMsg.Value)
		if theirs == nil {
			if txn != nil {
				txn.Abort()
			}
			return errors.Wrap(
				errors.New("invalid atom"),
				"receive raw shard sync",
			)
		}
		// Persist underlying vertex tree data if present
		if len(leafMsg.UnderlyingData) > 0 {
			// Fast path only for stores that accept pre-serialized trees.
			if saver, ok := s.hypergraphStore.(rawVertexSaver); ok {
				if err := saver.SaveVertexTreeRaw(
					txn,
					leafMsg.Key,
					leafMsg.UnderlyingData,
				); err != nil {
					txn.Abort()
					return errors.Wrap(err, "receive raw shard sync: save vertex tree")
				}
			}
		}
		// Use Add to properly build tree structure
		if err := s.localSet.Add(txn, theirs); err != nil {
			txn.Abort()
			return errors.Wrap(err, "receive raw shard sync: add atom")
		}
		processed++
		if processed%1000 == 0 {
			s.logger.Debug(
				"CLIENT: raw sync progress",
				zap.Uint64("processed", processed),
				zap.Uint64("expected", expectedLeaves),
			)
		}
	}
	// Commit the final partial batch, if any.
	if txn != nil {
		if err := txn.Commit(); err != nil {
			return errors.Wrap(err, "receive raw shard sync")
		}
	}
	// Send acknowledgment
	if err := s.sendLeafMetadata(expectedLeaves); err != nil {
		return errors.Wrap(err, "receive raw shard sync")
	}
	s.logger.Info(
		"CLIENT: raw shard sync completed",
		zap.Uint64("leaves_received", expectedLeaves),
		zap.Duration("duration", time.Since(start)),
	)
	return nil
}
// awaitRawLeafMetadata blocks until a HypersyncMetadata message arrives on
// incomingLeaves and returns its leaf count. It fails on context
// cancellation, channel closure, an unexpected payload type, or timeout.
func (s *streamManager) awaitRawLeafMetadata(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (uint64, error) {
	const wrapMsg = "await raw leaf metadata"
	s.logger.Debug("CLIENT: awaitRawLeafMetadata waiting...")
	deadline := time.After(leafAckTimeout(1))
	select {
	case <-s.ctx.Done():
		return 0, errors.Wrap(errors.New("context canceled"), wrapMsg)
	case msg, ok := <-incomingLeaves:
		if !ok {
			s.logger.Error("CLIENT: incomingLeaves channel closed")
			return 0, errors.Wrap(errors.New("channel closed"), wrapMsg)
		}
		meta := msg.GetMetadata()
		if meta == nil {
			s.logger.Error(
				"CLIENT: received non-metadata message while waiting for metadata",
				zap.String("payload_type", fmt.Sprintf("%T", msg.Payload)),
			)
			return 0, errors.Wrap(
				errors.New("invalid message: expected metadata"),
				wrapMsg,
			)
		}
		s.logger.Debug(
			"CLIENT: received metadata",
			zap.Uint64("leaves", meta.Leaves),
		)
		return meta.Leaves, nil
	case <-deadline:
		s.logger.Error("CLIENT: timeout waiting for metadata")
		return 0, errors.Wrap(
			errors.New("timed out waiting for metadata"),
			wrapMsg,
		)
	}
}
// awaitLeafData blocks until a LeafData message arrives on incomingLeaves.
// It fails on context cancellation, channel closure, an unexpected payload
// type, or timeout.
func (s *streamManager) awaitLeafData(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (*protobufs.LeafData, error) {
	const wrapMsg = "await leaf data"
	select {
	case <-s.ctx.Done():
		return nil, errors.Wrap(errors.New("context canceled"), wrapMsg)
	case msg, ok := <-incomingLeaves:
		if !ok {
			return nil, errors.Wrap(errors.New("channel closed"), wrapMsg)
		}
		leaf := msg.GetLeafData()
		if leaf == nil {
			return nil, errors.Wrap(
				errors.New("invalid message: expected leaf data"),
				wrapMsg,
			)
		}
		return leaf, nil
	case <-time.After(leafAckTimeout(1)):
		return nil, errors.Wrap(
			errors.New("timed out waiting for leaf data"),
			wrapMsg,
		)
	}
}
// sendLeafMetadata sends a metadata acknowledgment carrying the number of
// leaves processed.
func (s *streamManager) sendLeafMetadata(leaves uint64) error {
	s.logger.Debug("sending leaf metadata ack", zap.Uint64("leaves", leaves))
	ack := &protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Metadata{
			Metadata: &protobufs.HypersyncMetadata{Leaves: leaves},
		},
	}
	return s.stream.Send(ack)
}
// sendLeafData builds a LeafData message (with the full leaf data) for the
// node at the given path in the local tree and sends it over the stream.
//
// Wire protocol: first a metadata message announcing how many leaves will
// follow, then one LeafData message per leaf, then a wait for the receiver
// to acknowledge with a metadata message echoing the same count. A failed
// or empty path lookup announces zero leaves so the peer does not block.
func (s *streamManager) sendLeafData(
	path []int32,
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (err error) {
	start := time.Now()
	pathHex := hex.EncodeToString(packPath(path))
	var count uint64
	s.logger.Debug("send leaf data start", zap.String("path", pathHex))
	defer func() {
		s.logger.Debug(
			"send leaf data finished",
			zap.String("path", pathHex),
			zap.Uint64("leaves_sent", count),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	// send serializes a single leaf (plus its underlying vertex tree data,
	// if any) and writes it to the stream, updating lastSent on success.
	send := func(leaf *tries.LazyVectorCommitmentLeafNode) error {
		update := &protobufs.LeafData{
			Key:        leaf.Key,
			Value:      leaf.Value,
			HashTarget: leaf.HashTarget,
			Size:       leaf.Size.FillBytes(make([]byte, 32)),
		}
		data, err := s.getSerializedLeaf(leaf.Key)
		if err != nil {
			return errors.Wrap(err, "send leaf data")
		}
		if len(data) != 0 {
			update.UnderlyingData = data
		}
		msg := &protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_LeafData{
				LeafData: update,
			},
		}
		// Abort promptly if the session context was canceled.
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		default:
		}
		err = s.stream.Send(msg)
		if err != nil {
			return errors.Wrap(err, "send leaf data")
		}
		s.lastSent = time.Now()
		return nil
	}
	select {
	case <-s.ctx.Done():
		return s.ctx.Err()
	default:
	}
	intPath := []int{}
	for _, i := range path {
		intPath = append(intPath, int(i))
	}
	node, err := s.localTree.GetByPath(intPath)
	if err != nil {
		// Lookup failed: announce zero leaves so the peer doesn't wait.
		s.logger.Error("could not get by path", zap.Error(err))
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: 0},
			},
		}); err != nil {
			return err
		}
		return nil
	}
	if node == nil {
		// Nothing stored at this path: likewise announce zero leaves.
		s.logger.Warn(
			"SERVER: node is nil at path, sending 0 leaves",
			zap.String("path", pathHex),
			zap.Bool("tree_nil", s.localTree == nil),
			zap.Bool("root_nil", s.localTree != nil && s.localTree.Root == nil),
		)
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: 0},
			},
		}); err != nil {
			return err
		}
		return nil
	}
	leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode)
	count = uint64(0)
	// Debug: log the node type
	s.logger.Info(
		"SERVER: node type at path",
		zap.String("path", pathHex),
		zap.Bool("is_leaf", ok),
		zap.String("node_type", fmt.Sprintf("%T", node)),
	)
	if !ok {
		// Branch node: enumerate every leaf underneath it, announce the
		// count, then stream each leaf.
		children := tries.GetAllLeaves(
			s.localTree.SetType,
			s.localTree.PhaseType,
			s.localTree.ShardKey,
			node,
		)
		for _, child := range children {
			if child == nil {
				continue
			}
			count++
		}
		s.logger.Info(
			"SERVER: sending metadata with leaf count (branch)",
			zap.String("path", pathHex),
			zap.Uint64("leaf_count", count),
			zap.Int("children_slice_len", len(children)),
		)
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: count},
			},
		}); err != nil {
			return err
		}
		for _, child := range children {
			if child == nil {
				continue
			}
			if err := send(child); err != nil {
				return err
			}
		}
	} else {
		// The node itself is a leaf: announce and send exactly one.
		count = 1
		s.logger.Info(
			"SERVER: root is a single leaf, sending 1 leaf",
			zap.String("path", pathHex),
			zap.String("leaf_key", hex.EncodeToString(leaf.Key)),
		)
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: count},
			},
		}); err != nil {
			return err
		}
		if err := send(leaf); err != nil {
			return err
		}
	}
	// Wait for the receiver to acknowledge the announced leaf count; the
	// timeout scales with the number of leaves sent.
	timeoutTimer := time.NewTimer(leafAckTimeout(count))
	defer timeoutTimer.Stop()
	select {
	case <-s.ctx.Done():
		return errors.Wrap(
			errors.New("context canceled"),
			"send leaf data",
		)
	case msg, ok := <-incomingLeaves:
		if !ok {
			return errors.Wrap(
				errors.New("channel closed"),
				"send leaf data",
			)
		}
		switch msg.Payload.(type) {
		case *protobufs.HypergraphComparison_Metadata:
			// The ack must echo the exact count we announced.
			expectedLeaves := msg.GetMetadata().Leaves
			if expectedLeaves != count {
				return errors.Wrap(
					errors.New("did not match"),
					"send leaf data",
				)
			}
			return nil
		}
		return errors.Wrap(
			errors.New("invalid message"),
			"send leaf data",
		)
	case <-timeoutTimer.C:
		return errors.Wrap(
			errors.New("timed out"),
			"send leaf data",
		)
	}
}
// getSerializedLeaf returns the serialized vertex tree for key, consulting
// the snapshot cache (both hits and recorded misses) before falling back to
// the backing store. Load failures are deliberately treated as "no data"
// (nil, nil) rather than errors; only serialization failures are returned
// as errors.
func (s *streamManager) getSerializedLeaf(key []byte) ([]byte, error) {
	if s.snapshot != nil {
		if data, ok := s.snapshot.getLeafData(key); ok {
			return data, nil
		}
		// A recorded miss avoids re-hitting the store for known-absent keys.
		if s.snapshot.isLeafMiss(key) {
			return nil, nil
		}
	}
	// Optional fast path for stores that can return raw serialized bytes.
	type rawVertexLoader interface {
		LoadVertexTreeRaw(id []byte) ([]byte, error)
	}
	if loader, ok := s.hypergraphStore.(rawVertexLoader); ok {
		data, err := loader.LoadVertexTreeRaw(key)
		if err != nil {
			if s.snapshot != nil {
				s.snapshot.markLeafMiss(key)
			}
			return nil, nil
		}
		if len(data) != 0 {
			if s.snapshot != nil {
				s.snapshot.storeLeafData(key, data)
			}
			return data, nil
		}
		// Empty raw data: fall through to the load-and-serialize path.
	}
	tree, err := s.hypergraphStore.LoadVertexTree(key)
	if err != nil {
		if s.snapshot != nil {
			s.snapshot.markLeafMiss(key)
		}
		return nil, nil
	}
	data, err := tries.SerializeNonLazyTree(tree)
	if err != nil {
		return nil, err
	}
	if s.snapshot != nil {
		s.snapshot.storeLeafData(key, data)
	}
	return data, nil
}
// getBranchInfo returns the comparison response for the given path,
// consulting the snapshot cache first and populating it on a miss.
func (s *streamManager) getBranchInfo(
	path []int32,
) (*protobufs.HypergraphComparisonResponse, error) {
	if s.snapshot != nil {
		if cached, hit := s.snapshot.getBranchInfo(path); hit {
			return cached, nil
		}
	}
	info, err := getBranchInfoFromTree(s.logger, s.localTree, path)
	if err != nil {
		return nil, err
	}
	if s.snapshot != nil {
		s.snapshot.storeBranchInfo(path, info)
	}
	return info, nil
}
// getNodeAtPath traverses the tree along the provided nibble path. It returns
// the node found (or nil if not found). The depth argument is used for internal
// recursion.
func getNodeAtPath(
	logger *zap.Logger,
	setType string,
	phaseType string,
	shardKey tries.ShardKey,
	node tries.LazyVectorCommitmentNode,
	path []int32,
	depth int,
) tries.LazyVectorCommitmentNode {
	if node == nil {
		return nil
	}
	if len(path) == 0 {
		// Query path exhausted: this is the requested node.
		return node
	}
	switch n := node.(type) {
	case *tries.LazyVectorCommitmentLeafNode:
		// A leaf terminates traversal even if path elements remain.
		return node
	case *tries.LazyVectorCommitmentBranchNode:
		// Check that the branch's prefix matches the beginning of the query path.
		if len(path) < len(n.Prefix) {
			return nil
		}
		for i, nib := range n.Prefix {
			if int32(nib) != path[i] {
				return nil
			}
		}
		// Remove the prefix portion from the path.
		remainder := path[len(n.Prefix):]
		if len(remainder) == 0 {
			return node
		}
		// The first element of the remainder selects the child.
		childIndex := remainder[0]
		if int(childIndex) < 0 || int(childIndex) >= len(n.Children) {
			return nil
		}
		// Children are loaded lazily from the store by their full path.
		child, err := n.Store.GetNodeByPath(
			setType,
			phaseType,
			shardKey,
			slices.Concat(n.FullPrefix, []int{int(childIndex)}),
		)
		// "item not found" means no child exists at that slot; any other
		// store error is fatal.
		if err != nil && !strings.Contains(err.Error(), "item not found") {
			logger.Panic("failed to get node by path", zap.Error(err))
		}
		if child == nil {
			return nil
		}
		return getNodeAtPath(
			logger,
			setType,
			phaseType,
			shardKey,
			child,
			remainder[1:],
			depth+len(n.Prefix)+1,
		)
	}
	return nil
}
// getBranchInfoFromTree looks up the node at the given path in the local tree,
// computes its commitment, and (if it is a branch) collects its immediate
// children's commitments.
//
// When no node exists at the path, it returns an empty-commitment response
// rather than an error. Empty commitments on a found node (or any of its
// children) are reported as errors after a reload attempt.
func getBranchInfoFromTree(
	logger *zap.Logger,
	tree *tries.LazyVectorCommitmentTree,
	path []int32,
) (
	*protobufs.HypergraphComparisonResponse,
	error,
) {
	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		path,
		0,
	)
	if node == nil {
		// Absent node: report an empty commitment for this path.
		return &protobufs.HypergraphComparisonResponse{
			Path:       path,
			Commitment: []byte{},
			IsRoot:     len(path) == 0,
		}, nil
	}
	intpath := []int{}
	for _, p := range path {
		intpath = append(intpath, int(p))
	}
	// Reload the node from the store if its commitment is missing.
	node = ensureCommittedNode(logger, tree, intpath, node)
	branchInfo := &protobufs.HypergraphComparisonResponse{
		Path:   path,
		IsRoot: len(path) == 0,
	}
	if branch, ok := node.(*tries.LazyVectorCommitmentBranchNode); ok {
		branchInfo.Commitment = branch.Commitment
		if len(branch.Commitment) == 0 {
			return nil, errors.New("invalid commitment")
		}
		// Extend the reported path with the branch's own prefix nibbles.
		for _, p := range branch.Prefix {
			branchInfo.Path = append(branchInfo.Path, int32(p))
		}
		// Collect each immediate child's commitment, lazily loading
		// children that are not resident in memory.
		for i := 0; i < len(branch.Children); i++ {
			child := branch.Children[i]
			if child == nil {
				var err error
				child, err = branch.Store.GetNodeByPath(
					tree.SetType,
					tree.PhaseType,
					tree.ShardKey,
					slices.Concat(branch.FullPrefix, []int{i}),
				)
				// "item not found" simply means slot i is empty.
				if err != nil && !strings.Contains(err.Error(), "item not found") {
					logger.Panic("failed to get node by path", zap.Error(err))
				}
			}
			childPath := slices.Concat(branch.FullPrefix, []int{i})
			child = ensureCommittedNode(logger, tree, childPath, child)
			if child != nil {
				var childCommit []byte
				if childB, ok := child.(*tries.LazyVectorCommitmentBranchNode); ok {
					childCommit = childB.Commitment
				} else if childL, ok := child.(*tries.LazyVectorCommitmentLeafNode); ok {
					childCommit = childL.Commitment
				}
				if len(childCommit) == 0 {
					return nil, errors.New("invalid commitment")
				}
				branchInfo.Children = append(
					branchInfo.Children,
					&protobufs.BranchChild{
						Index:      int32(i),
						Commitment: childCommit,
					},
				)
			}
		}
	} else if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok {
		branchInfo.Commitment = leaf.Commitment
		if len(branchInfo.Commitment) == 0 {
			return nil, errors.New("invalid commitment")
		}
	}
	return branchInfo, nil
}
// ensureCommittedNode returns node unchanged when it already carries a
// commitment (or is nil / of an unknown type). When the commitment is
// missing it attempts to reload the node from the backing store by path,
// falling back to the original node if the reload yields nothing.
func ensureCommittedNode(
	logger *zap.Logger,
	tree *tries.LazyVectorCommitmentTree,
	path []int,
	node tries.LazyVectorCommitmentNode,
) tries.LazyVectorCommitmentNode {
	if node == nil {
		return nil
	}
	switch typed := node.(type) {
	case *tries.LazyVectorCommitmentBranchNode:
		if len(typed.Commitment) != 0 {
			return node
		}
	case *tries.LazyVectorCommitmentLeafNode:
		if len(typed.Commitment) != 0 {
			return node
		}
	default:
		return node
	}
	// Commitment missing: try to reload a committed copy from the store.
	reloaded, err := tree.Store.GetNodeByPath(
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		path,
	)
	if err != nil && !strings.Contains(err.Error(), "item not found") {
		logger.Panic("failed to reload node by path", zap.Error(err))
	}
	if reloaded == nil {
		return node
	}
	return reloaded
}
// isLeaf infers whether a HypergraphComparisonResponse message represents a
// leaf node: a response reporting no children is treated as a leaf.
func isLeaf(info *protobufs.HypergraphComparisonResponse) bool {
	hasChildren := len(info.Children) > 0
	return !hasChildren
}
// queryNext asks the remote peer for the branch info at path and waits for
// the matching response on incomingResponses. It fails on session
// cancellation, a closed response channel, or after a 30s timeout.
func (s *streamManager) queryNext(
	incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
	path []int32,
) (
	resp *protobufs.HypergraphComparisonResponse,
	err error,
) {
	began := time.Now()
	hexPath := hex.EncodeToString(packPath(path))
	s.logger.Debug("query next start", zap.String("path", hexPath))
	defer func() {
		s.logger.Debug(
			"query next finished",
			zap.String("path", hexPath),
			zap.Duration("duration", time.Since(began)),
			zap.Error(err),
		)
	}()
	outbound := &protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Query{
			Query: &protobufs.HypergraphComparisonQuery{
				Path:            path,
				IncludeLeafData: true,
			},
		},
	}
	if sendErr := s.stream.Send(outbound); sendErr != nil {
		return nil, sendErr
	}
	select {
	case <-s.ctx.Done():
		return nil, errors.Wrap(
			errors.New("context canceled"),
			"handle query",
		)
	case reply, open := <-incomingResponses:
		if !open {
			return nil, errors.Wrap(
				errors.New("channel closed"),
				"handle query",
			)
		}
		resp = reply
		return resp, nil
	case <-time.After(30 * time.Second):
		return nil, errors.Wrap(
			errors.New("timed out"),
			"handle query",
		)
	}
}
// handleLeafData receives a stream of leaf updates from the remote peer and
// applies them to the local set. The peer first announces the number of
// leaves via a metadata message, then sends exactly that many leaf-data
// messages; completion is acknowledged by echoing the metadata back.
//
// Writes are batched into transactions of 100 leaves each. Any error path
// aborts the in-flight transaction (previously the context-canceled,
// timeout, channel-closed, and Add-failure paths returned without cleanup,
// leaking the open transaction).
func (s *streamManager) handleLeafData(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (err error) {
	start := time.Now()
	var expectedLeaves uint64
	s.logger.Debug("handle leaf data start")
	defer func() {
		s.logger.Debug(
			"handle leaf data finished",
			zap.Uint64("leaves_expected", expectedLeaves),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	// Wait for the metadata message announcing the leaf count.
	select {
	case <-s.ctx.Done():
		return errors.Wrap(
			errors.New("context canceled"),
			"handle leaf data",
		)
	case msg, ok := <-incomingLeaves:
		if !ok {
			return errors.Wrap(
				errors.New("channel closed"),
				"handle leaf data",
			)
		}
		switch msg.Payload.(type) {
		case *protobufs.HypergraphComparison_LeafData:
			// Leaf data before metadata violates the protocol.
			return errors.Wrap(
				errors.New("invalid message"),
				"handle leaf data",
			)
		case *protobufs.HypergraphComparison_Metadata:
			expectedLeaves = msg.GetMetadata().Leaves
		}
	case <-time.After(30 * time.Second):
		return errors.Wrap(
			errors.New("timed out"),
			"handle leaf data",
		)
	}
	var txn tries.TreeBackingStoreTransaction
	// Ensure the in-flight transaction is never leaked on an error return.
	// NOTE(review): this also aborts after a failed Commit — assumed safe for
	// the backing store (discard-after-failed-commit); confirm.
	defer func() {
		if err != nil && txn != nil {
			txn.Abort()
		}
	}()
	for i := uint64(0); i < expectedLeaves; i++ {
		// Rotate the write transaction every 100 leaves to bound batch size.
		if i%100 == 0 {
			if txn != nil {
				if err = txn.Commit(); err != nil {
					return errors.Wrap(
						err,
						"handle leaf data",
					)
				}
				// Clear so the deferred abort never touches a committed txn.
				txn = nil
			}
			txn, err = s.hypergraphStore.NewTransaction(false)
			if err != nil {
				return errors.Wrap(
					err,
					"handle leaf data",
				)
			}
		}
		select {
		case <-s.ctx.Done():
			return errors.Wrap(
				errors.New("context canceled"),
				"handle leaf data",
			)
		case msg, ok := <-incomingLeaves:
			if !ok {
				return errors.Wrap(
					errors.New("channel closed"),
					"handle leaf data",
				)
			}
			var remoteUpdate *protobufs.LeafData
			switch msg.Payload.(type) {
			case *protobufs.HypergraphComparison_Metadata:
				// A second metadata message mid-stream is a protocol error.
				return errors.Wrap(
					errors.New("invalid message"),
					"handle leaf data",
				)
			case *protobufs.HypergraphComparison_LeafData:
				remoteUpdate = msg.GetLeafData()
			}
			if remoteUpdate == nil {
				// Guard against payload kinds that leave remoteUpdate unset;
				// previously this dereferenced a nil pointer below.
				return errors.Wrap(
					errors.New("invalid message"),
					"handle leaf data",
				)
			}
			theirs := AtomFromBytes(remoteUpdate.Value)
			if err = s.persistLeafTree(txn, remoteUpdate); err != nil {
				return err
			}
			if err = s.localSet.Add(txn, theirs); err != nil {
				s.logger.Error("error while saving", zap.Error(err))
				return errors.Wrap(
					err,
					"handle leaf data",
				)
			}
		case <-time.After(30 * time.Second):
			return errors.Wrap(
				errors.New("timed out"),
				"handle leaf data",
			)
		}
	}
	if txn != nil {
		if err = txn.Commit(); err != nil {
			return errors.Wrap(
				err,
				"handle leaf data",
			)
		}
		txn = nil
	}
	// Acknowledge completion by echoing the leaf count back to the sender.
	if err = s.stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Metadata{
			Metadata: &protobufs.HypersyncMetadata{Leaves: expectedLeaves},
		},
	}); err != nil {
		return err
	}
	return nil
}
// persistLeafTree stores the serialized vertex tree carried by a leaf update.
// The tree is deserialized when validation is required or when the store
// cannot accept raw bytes; raw-capable stores receive a defensive copy of the
// wire bytes instead.
func (s *streamManager) persistLeafTree(
	txn tries.TreeBackingStoreTransaction,
	update *protobufs.LeafData,
) error {
	if len(update.UnderlyingData) == 0 {
		return nil
	}
	mustValidate := s.requiresTreeValidation()
	saver, canSaveRaw := s.hypergraphStore.(rawVertexSaver)
	var deserialized *tries.VectorCommitmentTree
	if mustValidate || !canSaveRaw {
		var err error
		deserialized, err = tries.DeserializeNonLazyTree(update.UnderlyingData)
		if err != nil {
			s.logger.Error("server returned invalid tree", zap.Error(err))
			return err
		}
	}
	if mustValidate {
		if err := s.localSet.ValidateTree(
			update.Key,
			update.Value,
			deserialized,
		); err != nil {
			s.logger.Error("server returned invalid tree", zap.Error(err))
			return err
		}
	}
	if canSaveRaw {
		// Copy so the store never aliases the protobuf-owned buffer.
		raw := append([]byte(nil), update.UnderlyingData...)
		return saver.SaveVertexTreeRaw(txn, update.Key, raw)
	}
	return s.hypergraphStore.SaveVertexTree(txn, update.Key, deserialized)
}
// requiresTreeValidation reports whether incoming leaf trees must be
// validated before persistence; only *idSet-backed sets with a configured
// validator require it.
func (s *streamManager) requiresTreeValidation() bool {
	typed, ok := s.localSet.(*idSet)
	return ok && typed.validator != nil
}
// handleQueryNext services the peer's next branch query. It waits for a
// query on incomingQueries, verifies it targets the expected path, then
// computes and sends the local branch info, returning it to the caller. It
// fails on session cancellation, a closed channel, a mismatched query path,
// or a 30s timeout.
func (s *streamManager) handleQueryNext(
	incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
	path []int32,
) (
	branch *protobufs.HypergraphComparisonResponse,
	err error,
) {
	began := time.Now()
	hexPath := hex.EncodeToString(packPath(path))
	s.logger.Debug("handle query next start", zap.String("path", hexPath))
	defer func() {
		s.logger.Debug(
			"handle query next finished",
			zap.String("path", hexPath),
			zap.Duration("duration", time.Since(began)),
			zap.Error(err),
		)
	}()
	select {
	case <-s.ctx.Done():
		return nil, errors.Wrap(
			errors.New("context canceled"),
			"handle query next",
		)
	case query, open := <-incomingQueries:
		if !open {
			return nil, errors.Wrap(
				errors.New("channel closed"),
				"handle query next",
			)
		}
		if slices.Compare(query.Path, path) != 0 {
			return nil, errors.Wrap(
				errors.New("invalid query received"),
				"handle query next",
			)
		}
		info, infoErr := s.getBranchInfo(path)
		if infoErr != nil {
			return nil, errors.Wrap(infoErr, "handle query next")
		}
		envelope := &protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Response{
				Response: info,
			},
		}
		if sendErr := s.stream.Send(envelope); sendErr != nil {
			return nil, errors.Wrap(sendErr, "handle query next")
		}
		branch = info
		return branch, nil
	case <-time.After(30 * time.Second):
		return nil, errors.Wrap(
			errors.New("timed out"),
			"handle query next",
		)
	}
}
// descendIndex advances the lock-step walk one level down the tree. It
// computes the local branch info at path, sends it to the peer, and waits
// for the peer's corresponding response for the same path; both sides call
// this with the same path so the exchanged responses line up.
//
// Returns the local and remote branch infos, or an error on cancellation,
// stream failure, channel closure, path mismatch, or a 30s timeout.
func (s *streamManager) descendIndex(
	incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
	path []int32,
) (
	local *protobufs.HypergraphComparisonResponse,
	remote *protobufs.HypergraphComparisonResponse,
	err error,
) {
	start := time.Now()
	pathHex := hex.EncodeToString(packPath(path))
	s.logger.Debug("descend index start", zap.String("path", pathHex))
	defer func() {
		s.logger.Debug(
			"descend index finished",
			zap.String("path", pathHex),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	branchInfo, err := s.getBranchInfo(path)
	if err != nil {
		return nil, nil, errors.Wrap(err, "descend index")
	}
	resp := &protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Response{
			Response: branchInfo,
		},
	}
	if err := s.stream.Send(resp); err != nil {
		return nil, nil, errors.Wrap(err, "descend index")
	}
	select {
	case <-s.ctx.Done():
		// Wrap context corrected: this previously said "handle query next"
		// (copy-paste), mislabeling the failing operation in logs.
		return nil, nil, errors.Wrap(
			errors.New("context canceled"),
			"descend index",
		)
	case r, ok := <-incomingResponses:
		if !ok {
			return nil, nil, errors.Wrap(
				errors.New("channel closed"),
				"descend index",
			)
		}
		if slices.Compare(branchInfo.Path, r.Path) != 0 {
			return nil, nil, errors.Wrap(
				fmt.Errorf(
					"invalid path received: %v, expected: %v",
					r.Path,
					branchInfo.Path,
				),
				"descend index",
			)
		}
		local = branchInfo
		remote = r
		return local, remote, nil
	case <-time.After(30 * time.Second):
		return nil, nil, errors.Wrap(
			errors.New("timed out"),
			"descend index",
		)
	}
}
// packPath converts a nibble path (each element expected to fit in a byte)
// into a compact byte slice, primarily for hex-encoded logging. The result
// is pre-sized to avoid repeated append growth.
func packPath(path []int32) []byte {
	b := make([]byte, len(path))
	for i, p := range path {
		b[i] = byte(p)
	}
	return b
}
// walk recursively reconciles the local subtree (lnode) against the remote
// subtree (rnode) over the comparison stream. Both peers run walk in
// lockstep: the server side (isServer == true) answers path queries and
// pushes leaf data via sendLeafData, while the client side issues queries
// and ingests leaves via handleLeafData. The incoming* channels are fed by
// the stream-reader goroutine with the demultiplexed message kinds. init is
// true only for the top-level (root) invocation.
func (s *streamManager) walk(
	path []int32,
	lnode, rnode *protobufs.HypergraphComparisonResponse,
	incomingLeaves <-chan *protobufs.HypergraphComparison,
	incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
	incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
	init bool,
	isServer bool,
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
) error {
	// Check if we should use raw sync mode for this phase set
	if init && shouldUseRawSync(phaseSet) {
		s.logger.Info(
			"walk: using raw sync mode",
			zap.Bool("is_server", isServer),
			zap.Int("phase_set", int(phaseSet)),
		)
		if isServer {
			return s.rawShardSync(shardKey, phaseSet, incomingLeaves)
		}
		return s.receiveRawShardSync(incomingLeaves)
	}
	// Bail out promptly if the session context has been canceled.
	select {
	case <-s.ctx.Done():
		return s.ctx.Err()
	default:
	}
	// pathString := zap.String("path", hex.EncodeToString(packPath(path)))
	// Identical commitments mean the subtrees are already in sync.
	if bytes.Equal(lnode.Commitment, rnode.Commitment) {
		// s.logger.Debug(
		// 	"commitments match",
		// 	pathString,
		// 	zap.String("commitment", hex.EncodeToString(lnode.Commitment)),
		// )
		return nil
	}
	if isLeaf(lnode) && isLeaf(rnode) && !init {
		return nil
	}
	// Leaf on one side vs. branch on the other: transfer the whole subtree.
	if isLeaf(rnode) || isLeaf(lnode) {
		// s.logger.Debug("leaf/branch mismatch at path", pathString)
		if isServer {
			err := s.sendLeafData(
				path,
				incomingLeaves,
			)
			return errors.Wrap(err, "walk")
		} else {
			err := s.handleLeafData(incomingLeaves)
			return errors.Wrap(err, "walk")
		}
	}
	lpref := lnode.Path
	rpref := rnode.Path
	// The two nodes sit at different depths; walk the shallower side down to
	// the deeper side's path before comparing children.
	if len(lpref) != len(rpref) {
		// s.logger.Debug(
		// 	"prefix length mismatch",
		// 	zap.Int("local_prefix", len(lpref)),
		// 	zap.Int("remote_prefix", len(rpref)),
		// 	pathString,
		// )
		if len(lpref) > len(rpref) {
			// s.logger.Debug("local prefix longer, traversing remote to path", pathString)
			// NOTE(review): this slice starts at len(rpref)-1, so the first
			// traversal nibble overlaps the last element of rpref — and it
			// panics if rpref is empty; presumably the initial root exchange
			// guarantees non-empty prefixes. Verify against callers.
			traverse := lpref[len(rpref)-1:]
			rtrav := rnode
			traversePath := append([]int32{}, rpref...)
			for _, nibble := range traverse {
				// s.logger.Debug("attempting remote traversal step")
				for _, child := range rtrav.Children {
					if child.Index == nibble {
						// s.logger.Debug("sending query")
						traversePath = append(traversePath, child.Index)
						var err error
						rtrav, err = s.queryNext(
							incomingResponses,
							traversePath,
						)
						if err != nil {
							s.logger.Error("query failed", zap.Error(err))
							return errors.Wrap(err, "walk")
						}
						break
					}
				}
				// NOTE(review): rtrav is only ever replaced by queryNext
				// results; when no child matches the nibble it keeps its
				// prior non-nil value, so this nil check may be unreachable —
				// confirm whether queryNext can yield a nil response.
				if rtrav == nil {
					// s.logger.Debug("traversal could not reach path")
					if isServer {
						err := s.sendLeafData(
							lpref,
							incomingLeaves,
						)
						return errors.Wrap(err, "walk")
					} else {
						err := s.handleLeafData(incomingLeaves)
						return errors.Wrap(err, "walk")
					}
				}
			}
			// s.logger.Debug("traversal completed, performing walk", pathString)
			// Retry the comparison with the remote node now at local depth.
			return s.walk(
				path,
				lnode,
				rtrav,
				incomingLeaves,
				incomingQueries,
				incomingResponses,
				false,
				isServer,
				shardKey,
				phaseSet,
			)
		} else {
			// s.logger.Debug("remote prefix longer, traversing local to path", pathString)
			traverse := rpref[len(lpref)-1:]
			ltrav := lnode
			traversedPath := append([]int32{}, lnode.Path...)
			for _, nibble := range traverse {
				// s.logger.Debug("attempting local traversal step")
				preTraversal := append([]int32{}, traversedPath...)
				for _, child := range ltrav.Children {
					if child.Index == nibble {
						traversedPath = append(traversedPath, nibble)
						var err error
						// s.logger.Debug("expecting query")
						ltrav, err = s.handleQueryNext(
							incomingQueries,
							traversedPath,
						)
						if err != nil {
							s.logger.Error("expect failed", zap.Error(err))
							return errors.Wrap(err, "walk")
						}
						if ltrav == nil {
							// s.logger.Debug("traversal could not reach path")
							if isServer {
								err := s.sendLeafData(
									preTraversal,
									incomingLeaves,
								)
								return errors.Wrap(err, "walk")
							} else {
								err := s.handleLeafData(incomingLeaves)
								return errors.Wrap(err, "walk")
							}
						}
					} else {
						// Sibling branches off the traversal path: transfer
						// (server) or ingest (client) their leaves directly.
						// s.logger.Debug(
						// 	"known missing branch",
						// 	zap.String(
						// 		"path",
						// 		hex.EncodeToString(
						// 			packPath(
						// 				append(append([]int32{}, preTraversal...), child.Index),
						// 			),
						// 		),
						// 	),
						// )
						if isServer {
							if err := s.sendLeafData(
								append(append([]int32{}, preTraversal...), child.Index),
								incomingLeaves,
							); err != nil {
								return errors.Wrap(err, "walk")
							}
						} else {
							err := s.handleLeafData(incomingLeaves)
							if err != nil {
								return errors.Wrap(err, "walk")
							}
						}
					}
				}
			}
			// s.logger.Debug("traversal completed, performing walk", pathString)
			return s.walk(
				path,
				ltrav,
				rnode,
				incomingLeaves,
				incomingQueries,
				incomingResponses,
				false,
				isServer,
				shardKey,
				phaseSet,
			)
		}
	} else {
		if slices.Compare(lpref, rpref) == 0 {
			// Same depth, same prefix: diff all 64 child slots pairwise.
			// s.logger.Debug("prefixes match, diffing children")
			for i := int32(0); i < 64; i++ {
				// s.logger.Debug("checking branch", zap.Int32("branch", i))
				var lchild *protobufs.BranchChild = nil
				for _, lc := range lnode.Children {
					if lc.Index == i {
						// s.logger.Debug("local instance found", zap.Int32("branch", i))
						lchild = lc
						break
					}
				}
				var rchild *protobufs.BranchChild = nil
				for _, rc := range rnode.Children {
					if rc.Index == i {
						// s.logger.Debug("remote instance found", zap.Int32("branch", i))
						rchild = rc
						break
					}
				}
				// Child present on exactly one side: transfer its leaves.
				if (lchild != nil && rchild == nil) ||
					(lchild == nil && rchild != nil) {
					// s.logger.Info("branch divergence", pathString)
					if lchild != nil {
						nextPath := append(
							append([]int32{}, lpref...),
							lchild.Index,
						)
						if isServer {
							if err := s.sendLeafData(
								nextPath,
								incomingLeaves,
							); err != nil {
								return errors.Wrap(err, "walk")
							}
						}
					}
					if rchild != nil {
						if !isServer {
							err := s.handleLeafData(incomingLeaves)
							if err != nil {
								return errors.Wrap(err, "walk")
							}
						}
					}
				} else {
					// Child present on both sides: descend one level and
					// recurse; a failed descent falls back to transferring
					// the whole child subtree.
					if lchild != nil {
						nextPath := append(
							append([]int32{}, lpref...),
							lchild.Index,
						)
						lc, rc, err := s.descendIndex(
							incomingResponses,
							nextPath,
						)
						if err != nil {
							// s.logger.Debug("incomplete branch descension", zap.Error(err))
							if isServer {
								if err := s.sendLeafData(
									nextPath,
									incomingLeaves,
								); err != nil {
									return errors.Wrap(err, "walk")
								}
							} else {
								err := s.handleLeafData(incomingLeaves)
								if err != nil {
									return errors.Wrap(err, "walk")
								}
							}
							continue
						}
						if err = s.walk(
							nextPath,
							lc,
							rc,
							incomingLeaves,
							incomingQueries,
							incomingResponses,
							false,
							isServer,
							shardKey,
							phaseSet,
						); err != nil {
							return errors.Wrap(err, "walk")
						}
					}
				}
			}
		} else {
			// Same depth but different prefixes: the subtrees diverge
			// entirely; transfer everything under path.
			// s.logger.Debug("prefix mismatch on both sides", pathString)
			if isServer {
				if err := s.sendLeafData(
					path,
					incomingLeaves,
				); err != nil {
					return errors.Wrap(err, "walk")
				}
			} else {
				err := s.handleLeafData(incomingLeaves)
				if err != nil {
					return errors.Wrap(err, "walk")
				}
			}
		}
	}
	return nil
}
// syncTreeServer implements the diff and sync logic on the server side. It
// exchanges root branch info with the client, then spawns a reader goroutine
// that demultiplexes incoming messages onto unbounded channels while walk
// drives the lock-step reconciliation.
func (hg *HypergraphCRDT) syncTreeServer(
	ctx context.Context,
	stream protobufs.HypergraphComparisonService_HyperStreamServer,
	snapshotStore tries.TreeBackingStore,
	snapshotRoot []byte,
	sessionLogger *zap.Logger,
	handle *snapshotHandle,
) error {
	logger := sessionLogger
	if logger == nil {
		logger = hg.logger
	}
	rootHex := ""
	if len(snapshotRoot) != 0 {
		rootHex = hex.EncodeToString(snapshotRoot)
	}
	logger.Info("syncing with snapshot", zap.String("root", rootHex))
	// The client opens the session with a query naming shard and phase set.
	msg, err := stream.Recv()
	if err != nil {
		return err
	}
	query := msg.GetQuery()
	if query == nil {
		return errors.New("client did not send valid initialization message")
	}
	logger.Info("received initialization message")
	if len(query.ShardKey) != 35 {
		return errors.New("invalid shard key")
	}
	shardKey := tries.ShardKey{
		L1: [3]byte(query.ShardKey[:3]),
		L2: [32]byte(query.ShardKey[3:]),
	}
	set := hg.snapshotPhaseSet(shardKey, query.PhaseSet, snapshotStore)
	if set == nil {
		return errors.New("unsupported phase set")
	}
	branchInfo, err := getBranchInfoFromTree(
		logger,
		set.GetTree(),
		query.Path,
	)
	if err != nil {
		return err
	}
	// Send our branch info and expect the client's in return.
	if err := stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Response{
			Response: branchInfo,
		},
	}); err != nil {
		return err
	}
	msg, err = stream.Recv()
	if err != nil {
		return err
	}
	response := msg.GetResponse()
	if response == nil {
		return errors.New(
			"client did not send valid initialization response message",
		)
	}
	ctx, cancel := context.WithCancel(ctx)
	queriesIn, queriesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonQuery](
			cancel,
			"server incoming",
		)
	responsesIn, responsesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonResponse](
			cancel,
			"server incoming",
		)
	leavesIn, leavesOut :=
		UnboundedChan[*protobufs.HypergraphComparison](
			cancel,
			"server incoming",
		)
	// shutdown tears the pump down exactly once per exit path: cancel the
	// session context and close every inbound channel.
	shutdown := func() {
		cancel()
		close(queriesIn)
		close(responsesIn)
		close(leavesIn)
	}
	go func() {
		for {
			incoming, recvErr := stream.Recv()
			if recvErr == io.EOF {
				logger.Info("server stream recv eof")
				shutdown()
				return
			}
			if recvErr != nil {
				logger.Info("server stream recv error", zap.Error(recvErr))
				shutdown()
				return
			}
			if incoming == nil {
				continue
			}
			switch payload := incoming.Payload.(type) {
			case *protobufs.HypergraphComparison_LeafData:
				// The server only ever sends leaves; receiving one here
				// means the client is misbehaving.
				logger.Warn("received leaf from client, terminating")
				shutdown()
				return
			case *protobufs.HypergraphComparison_Metadata:
				leavesIn <- incoming
			case *protobufs.HypergraphComparison_Query:
				queriesIn <- payload.Query
			case *protobufs.HypergraphComparison_Response:
				responsesIn <- payload.Response
			}
		}
	}()
	manager := &streamManager{
		ctx:             ctx,
		cancel:          cancel,
		logger:          logger,
		stream:          stream,
		hypergraphStore: snapshotStore,
		localTree:       set.GetTree(),
		localSet:        set,
		lastSent:        time.Now(),
		snapshot:        handle,
	}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if walkErr := manager.walk(
			branchInfo.Path,
			branchInfo,
			response,
			leavesOut,
			queriesOut,
			responsesOut,
			true,
			true,
			shardKey,
			query.PhaseSet,
		); walkErr != nil {
			logger.Error("error while syncing", zap.Error(walkErr))
		}
	}()
	wg.Wait()
	return nil
}
// UnboundedChan returns a linked (in, out) channel pair with an unbounded
// FIFO buffer between them: sends on in never block. When in is closed the
// pump calls cancel, closes out, and exits; any items still queued at that
// point are discarded. purpose identifies the channel at call sites.
func UnboundedChan[T any](
	cancel context.CancelFunc,
	purpose string,
) (chan<- T, <-chan T) {
	input := make(chan T)
	output := make(chan T)
	go func() {
		var pending []T
		for {
			// Only arm the send case while something is queued; a nil
			// channel makes that select branch inert.
			var sendCh chan T
			var head T
			if len(pending) != 0 {
				sendCh = output
				head = pending[0]
			}
			select {
			case item, open := <-input:
				if !open {
					cancel()
					close(output)
					return
				}
				pending = append(pending, item)
			case sendCh <- head:
				pending = pending[1:]
			}
		}
	}()
	return input, output
}
// peerIPFromContext extracts the remote peer's host/IP from a gRPC request
// context. It returns the bare host when the address parses as host:port,
// the full address string when it does not, and "" when no peer info is
// attached.
func peerIPFromContext(ctx context.Context) string {
	if ctx == nil {
		return ""
	}
	info, ok := peer.FromContext(ctx)
	if !ok || info.Addr == nil {
		return ""
	}
	addr := info.Addr.String()
	if host, _, err := net.SplitHostPort(addr); err == nil {
		return host
	}
	return addr
}