mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git (synced 2026-02-21 18:37:26 +08:00)
Go, 2563 lines, 59 KiB
package hypergraph

import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"io"
	"net"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"go.uber.org/zap"
	"google.golang.org/grpc/peer"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
// HyperStream is the gRPC method that handles synchronization.
func (hg *HypergraphCRDT) HyperStream(
	stream protobufs.HypergraphComparisonService_HyperStreamServer,
) (err error) {
	requestCtx := stream.Context()
	ctx, shutdownCancel := hg.contextWithShutdown(requestCtx)
	defer shutdownCancel()

	sessionLogger := hg.logger
	sessionStart := time.Now()
	defer func() {
		sessionLogger.Info(
			"hyperstream session finished",
			zap.Duration("session_duration", time.Since(sessionStart)),
			zap.Error(err),
		)
	}()

	identifyStart := time.Now()
	peerId, err := hg.authenticationProvider.Identify(requestCtx)
	if err != nil {
		return errors.Wrap(err, "hyper stream")
	}
	sessionLogger = sessionLogger.With(zap.String("peer_id", peerId.String()))
	sessionLogger.Debug(
		"identified peer",
		zap.Duration("duration", time.Since(identifyStart)),
	)

	peerKey := peerId.String()
	if addr := peerIPFromContext(requestCtx); addr != "" {
		sessionLogger = sessionLogger.With(zap.String("peer_ip", addr))
	}
	if !hg.syncController.TryEstablishSyncSession(peerKey) {
		return errors.New("peer already syncing")
	}
	defer func() {
		hg.syncController.EndSyncSession(peerKey)
	}()

	syncStart := time.Now()
	err = hg.syncTreeServer(ctx, stream, sessionLogger)
	sessionLogger.Info(
		"syncTreeServer completed",
		zap.Duration("sync_duration", time.Since(syncStart)),
		zap.Error(err),
	)

	hg.syncController.SetStatus(peerKey, &hypergraph.SyncInfo{
		Unreachable: false,
		LastSynced:  time.Now(),
	})

	return err
}
// Sync performs the tree diff and synchronization from the client side.
// The caller (e.g. the client) must initiate the diff from its root.
// After that, both sides exchange queries, branch info, and leaf updates until
// their local trees are synchronized.
func (hg *HypergraphCRDT) Sync(
	stream protobufs.HypergraphComparisonService_HyperStreamClient,
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
) (err error) {
	const localSyncKey = "local-sync"
	if !hg.syncController.TryEstablishSyncSession(localSyncKey) {
		return errors.New("local sync already in progress")
	}
	defer func() {
		hg.syncController.EndSyncSession(localSyncKey)
	}()

	hg.mu.Lock()
	defer hg.mu.Unlock()

	syncStart := time.Now()
	shardKeyHex := hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:]))
	hg.logger.Info(
		"sync started",
		zap.String("shard_key", shardKeyHex),
		zap.Int("phase_set", int(phaseSet)),
	)
	defer func() {
		hg.logger.Info(
			"sync finished",
			zap.String("shard_key", shardKeyHex),
			zap.Duration("duration", time.Since(syncStart)),
			zap.Error(err),
		)
	}()

	var set hypergraph.IdSet
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		set = hg.getVertexAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		set = hg.getVertexRemovesSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		set = hg.getHyperedgeAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		set = hg.getHyperedgeRemovesSet(shardKey)
	default:
		return errors.New("unsupported phase set")
	}

	path := hg.getCoveredPrefix()

	// Send initial query for path
	sendStart := time.Now()
	if err := stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Query{
			Query: &protobufs.HypergraphComparisonQuery{
				ShardKey:        slices.Concat(shardKey.L1[:], shardKey.L2[:]),
				PhaseSet:        phaseSet,
				Path:            toInt32Slice(path),
				Commitment:      set.GetTree().Commit(false),
				IncludeLeafData: false,
			},
		},
	}); err != nil {
		return err
	}
	hg.logger.Debug(
		"sent initialization message",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(sendStart)),
	)

	// hg.logger.Debug("server waiting for initial query")
	recvStart := time.Now()
	msg, err := stream.Recv()
	if err != nil {
		hg.logger.Info("initial recv failed", zap.Error(err))
		return err
	}
	hg.logger.Debug(
		"received initialization response",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(recvStart)),
	)
	response := msg.GetResponse()
	if response == nil {
		return errors.New(
			"server did not send valid initialization response message",
		)
	}

	branchInfoStart := time.Now()
	branchInfo, err := getBranchInfoFromTree(
		hg.logger,
		set.GetTree(),
		toInt32Slice(path),
	)
	if err != nil {
		return err
	}
	hg.logger.Debug(
		"constructed branch info",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(branchInfoStart)),
	)

	resp := &protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Response{
			Response: branchInfo,
		},
	}

	responseSendStart := time.Now()
	if err := stream.Send(resp); err != nil {
		return err
	}
	hg.logger.Debug(
		"sent initial branch info",
		zap.String("shard_key", shardKeyHex),
		zap.Duration("duration", time.Since(responseSendStart)),
	)

	ctx, cancel := context.WithCancel(stream.Context())

	incomingQueriesIn, incomingQueriesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonQuery](
			cancel,
			"client incoming",
		)
	incomingResponsesIn, incomingResponsesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonResponse](
			cancel,
			"client incoming",
		)
	incomingLeavesIn, incomingLeavesOut :=
		UnboundedChan[*protobufs.HypergraphComparison](
			cancel,
			"client incoming",
		)

	go func() {
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				// hg.logger.Debug("stream closed by sender")
				cancel()
				close(incomingQueriesIn)
				close(incomingResponsesIn)
				close(incomingLeavesIn)
				return
			}
			if err != nil {
				// hg.logger.Debug("error from stream", zap.Error(err))
				cancel()
				close(incomingQueriesIn)
				close(incomingResponsesIn)
				close(incomingLeavesIn)
				return
			}
			if msg == nil {
				continue
			}
			switch m := msg.Payload.(type) {
			case *protobufs.HypergraphComparison_LeafData:
				incomingLeavesIn <- msg
			case *protobufs.HypergraphComparison_Metadata:
				incomingLeavesIn <- msg
			case *protobufs.HypergraphComparison_Query:
				incomingQueriesIn <- m.Query
			case *protobufs.HypergraphComparison_Response:
				incomingResponsesIn <- m.Response
			}
		}
	}()

	manager := &streamManager{
		ctx:             ctx,
		cancel:          cancel,
		logger:          hg.logger,
		stream:          stream,
		hypergraphStore: hg.store,
		localTree:       set.GetTree(),
		localSet:        set,
		lastSent:        time.Now(),
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := manager.walk(
			branchInfo.Path,
			branchInfo,
			response,
			incomingLeavesOut,
			incomingQueriesOut,
			incomingResponsesOut,
			true,
			false,
			shardKey,
			phaseSet,
		)
		if err != nil {
			hg.logger.Debug("error while syncing", zap.Error(err))
		}
	}()

	wg.Wait()

	hg.logger.Info(
		"hypergraph root commit",
		zap.String("root", hex.EncodeToString(set.GetTree().Commit(false))),
	)

	return nil
}
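// GetChildrenForPath returns the tree path segments (branches and leaves)
// along the requested path for the selected phase set. Leaf segments also
// carry the serialized underlying vertex tree when it can be loaded.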
func (hg *HypergraphCRDT) GetChildrenForPath(
	ctx context.Context,
	request *protobufs.GetChildrenForPathRequest,
) (*protobufs.GetChildrenForPathResponse, error) {
	if len(request.ShardKey) != 35 {
		return nil, errors.New("invalid shard key")
	}

	shardKey := tries.ShardKey{
		L1: [3]byte(request.ShardKey[:3]),
		L2: [32]byte(request.ShardKey[3:]),
	}

	var set hypergraph.IdSet
	switch request.PhaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		set = hg.getVertexAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		set = hg.getVertexRemovesSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		set = hg.getHyperedgeAddsSet(shardKey)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		set = hg.getHyperedgeRemovesSet(shardKey)
	default:
		// Guard against a nil set dereference below when an unknown phase
		// set is requested.
		return nil, errors.New("unsupported phase set")
	}

	path := []int{}
	for _, p := range request.Path {
		path = append(path, int(p))
	}

	segments, err := getSegments(
		set.GetTree(),
		path,
	)
	if err != nil {
		return nil, errors.Wrap(err, "get children for path")
	}

	response := &protobufs.GetChildrenForPathResponse{
		PathSegments: []*protobufs.TreePathSegments{},
	}

	for _, segment := range segments {
		pathSegments := &protobufs.TreePathSegments{}
		for i, seg := range segment {
			if seg == nil {
				continue
			}
			switch t := seg.(type) {
			case *tries.LazyVectorCommitmentBranchNode:
				pathSegments.Segments = append(
					pathSegments.Segments,
					&protobufs.TreePathSegment{
						Index: uint32(i),
						Segment: &protobufs.TreePathSegment_Branch{
							Branch: &protobufs.TreePathBranch{
								Prefix:        toUint32Slice(t.Prefix),
								Commitment:    t.Commitment,
								Size:          t.Size.Bytes(),
								LeafCount:     uint64(t.LeafCount),
								LongestBranch: uint32(t.LongestBranch),
								FullPrefix:    toUint32Slice(t.FullPrefix),
							},
						},
					},
				)
			case *tries.LazyVectorCommitmentLeafNode:
				var data []byte
				tree, err := hg.store.LoadVertexTree(t.Key)
				if err == nil {
					data, err = tries.SerializeNonLazyTree(tree)
					if err != nil {
						return nil, errors.Wrap(err, "get children for path")
					}
				}
				pathSegments.Segments = append(
					pathSegments.Segments,
					&protobufs.TreePathSegment{
						Index: uint32(i),
						Segment: &protobufs.TreePathSegment_Leaf{
							Leaf: &protobufs.TreePathLeaf{
								Key:        t.Key,
								Value:      data,
								HashTarget: t.HashTarget,
								Commitment: t.Commitment,
								Size:       t.Size.Bytes(),
							},
						},
					},
				)
			}
		}

		response.PathSegments = append(response.PathSegments, pathSegments)
	}

	return response, nil
}
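// toUint32Slice, toInt32Slice, and toIntSlice convert between the int nibble
// paths used by the tries package and the int32/uint32 paths used on the wire.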
func toUint32Slice(s []int) []uint32 {
	o := []uint32{}
	for _, p := range s {
		o = append(o, uint32(p))
	}
	return o
}

func toInt32Slice(s []int) []int32 {
	o := []int32{}
	for _, p := range s {
		o = append(o, int32(p))
	}
	return o
}

func toIntSlice(s []int32) []int {
	o := []int{}
	for _, p := range s {
		o = append(o, int(p))
	}
	return o
}
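// isPrefix reports whether prefix is a prefix of path.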
func isPrefix(prefix []int, path []int) bool {
	if len(prefix) > len(path) {
		return false
	}

	for i := range prefix {
		if prefix[i] != path[i] {
			return false
		}
	}

	return true
}
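// getChildSegments materializes all 64 children of a branch node, loading any
// that are not cached in memory from the backing store, and returns them
// together with the index of the child that lies on the given path.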
func getChildSegments(
	setType string,
	phaseType string,
	shardKey tries.ShardKey,
	node *tries.LazyVectorCommitmentBranchNode,
	path []int,
) ([64]tries.LazyVectorCommitmentNode, int) {
	nodes := [64]tries.LazyVectorCommitmentNode{}
	index := 0
	for i, child := range node.Children {
		if child == nil {
			var err error
			prefix := slices.Concat(node.FullPrefix, []int{i})
			child, err = node.Store.GetNodeByPath(
				setType,
				phaseType,
				shardKey,
				prefix,
			)
			if err != nil && !strings.Contains(err.Error(), "item not found") {
				panic(err)
			}

			if isPrefix(prefix, path) {
				index = i
			}
		}

		if child != nil {
			nodes[i] = child
		}
	}

	return nodes, index
}
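// getSegments walks from the root toward the given path, collecting the
// 64-wide child segment at every branch visited along the way.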
func getSegments(tree *tries.LazyVectorCommitmentTree, path []int) (
	[][64]tries.LazyVectorCommitmentNode,
	error,
) {
	segments := [][64]tries.LazyVectorCommitmentNode{
		{tree.Root},
	}

	node := tree.Root
	for node != nil {
		switch t := node.(type) {
		case *tries.LazyVectorCommitmentBranchNode:
			segment, index := getChildSegments(
				tree.SetType,
				tree.PhaseType,
				tree.ShardKey,
				t,
				path,
			)
			segments = append(segments, segment)
			node = segment[index]
		case *tries.LazyVectorCommitmentLeafNode:
			node = nil
		}
	}

	return segments, nil
}
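// streamManager holds the per-session state shared by the client and server
// halves of the tree diff protocol: the stream itself, the local tree and set
// being reconciled, and an optional read snapshot used on the server side.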
type streamManager struct {
	ctx             context.Context
	cancel          context.CancelFunc
	logger          *zap.Logger
	stream          hypergraph.HyperStream
	hypergraphStore tries.TreeBackingStore
	localTree       *tries.LazyVectorCommitmentTree
	localSet        hypergraph.IdSet
	lastSent        time.Time
	snapshot        *snapshotHandle
}
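// rawVertexSaver and vertexTreeDeleter are optional store capabilities,
// discovered via type assertion, for writing and removing serialized vertex
// trees without a full deserialize/reserialize round trip.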
type rawVertexSaver interface {
	SaveVertexTreeRaw(
		txn tries.TreeBackingStoreTransaction,
		id []byte,
		data []byte,
	) error
}

type vertexTreeDeleter interface {
	DeleteVertexTree(
		txn tries.TreeBackingStoreTransaction,
		id []byte,
	) error
}
const (
	leafAckMinTimeout    = 30 * time.Second
	leafAckMaxTimeout    = 10 * time.Minute
	leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead
	pruneTxnChunk        = 100
)
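// leafAckTimeout scales the leaf acknowledgment deadline with the number of
// leaves transferred. For example, leafAckTimeout(1000) = 30s + 1000*20ms =
// 50s; results are clamped to [leafAckMinTimeout, leafAckMaxTimeout].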
func leafAckTimeout(count uint64) time.Duration {
	// Calculate timeout with per-leaf budget plus a base overhead
	baseOverhead := 30 * time.Second
	timeout := baseOverhead + time.Duration(count)*leafAckPerLeafBudget

	if timeout < leafAckMinTimeout {
		return leafAckMinTimeout
	}
	if timeout > leafAckMaxTimeout {
		return leafAckMaxTimeout
	}
	return timeout
}
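// shouldUseRawSync reports whether a phase set is synced by streaming every
// leaf directly from the database rather than by walking the diff tree;
// currently only vertex adds take the raw path.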
func shouldUseRawSync(phaseSet protobufs.HypergraphPhaseSet) bool {
	return phaseSet == protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS
}
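// keyWithinCoveredPrefix reports whether the nibble path of key falls under
// the covered prefix; an empty prefix covers every key.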
func keyWithinCoveredPrefix(key []byte, prefix []int) bool {
	if len(prefix) == 0 {
		return true
	}
	path := tries.GetFullPath(key)
	if len(path) < len(prefix) {
		return false
	}
	for i, nib := range prefix {
		if path[i] != nib {
			return false
		}
	}
	return true
}
// rawShardSync performs a full raw sync of all leaves from server to client.
// This iterates directly over the database, bypassing in-memory tree caching
// to ensure all leaves are sent even if the in-memory tree is stale.
func (s *streamManager) rawShardSync(
	shardKey tries.ShardKey,
	phaseSet protobufs.HypergraphPhaseSet,
	incomingLeaves <-chan *protobufs.HypergraphComparison,
	coveredPrefix []int32,
) error {
	shardHex := hex.EncodeToString(shardKey.L2[:])
	s.logger.Info(
		"SERVER: starting raw shard sync (direct DB iteration)",
		zap.String("shard_key", shardHex),
	)
	start := time.Now()
	prefix := toIntSlice(coveredPrefix)

	// Determine set and phase type strings
	setType := string(hypergraph.VertexAtomType)
	phaseType := string(hypergraph.AddsPhaseType)
	switch phaseSet {
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
		setType = string(hypergraph.VertexAtomType)
		phaseType = string(hypergraph.AddsPhaseType)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
		setType = string(hypergraph.VertexAtomType)
		phaseType = string(hypergraph.RemovesPhaseType)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
		setType = string(hypergraph.HyperedgeAtomType)
		phaseType = string(hypergraph.AddsPhaseType)
	case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
		setType = string(hypergraph.HyperedgeAtomType)
		phaseType = string(hypergraph.RemovesPhaseType)
	}

	// Get raw leaf iterator from the database
	iter, err := s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey)
	if err != nil {
		s.logger.Error(
			"SERVER: failed to create raw leaf iterator",
			zap.String("shard_key", shardHex),
			zap.Error(err),
		)
		return errors.Wrap(err, "raw shard sync")
	}
	defer iter.Close()

	// First pass: count leaves
	var count uint64
	for valid := iter.First(); valid; valid = iter.Next() {
		leaf, err := iter.Leaf()
		if err != nil {
			// Skip non-leaf nodes (branches)
			continue
		}
		if leaf != nil && keyWithinCoveredPrefix(leaf.Key, prefix) {
			count++
		}
	}

	s.logger.Info(
		"SERVER: raw sync sending metadata",
		zap.String("shard_key", shardHex),
		zap.Uint64("leaf_count", count),
	)

	// Send metadata with leaf count
	if err := s.stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Metadata{
			Metadata: &protobufs.HypersyncMetadata{Leaves: count},
		},
	}); err != nil {
		return errors.Wrap(err, "raw shard sync: send metadata")
	}

	// Create new iterator for sending (previous one is exhausted)
	iter.Close()
	iter, err = s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey)
	if err != nil {
		return errors.Wrap(err, "raw shard sync: recreate iterator")
	}
	defer iter.Close()

	// Second pass: send leaves
	var sent uint64
	for valid := iter.First(); valid; valid = iter.Next() {
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		default:
		}

		leaf, err := iter.Leaf()
		if err != nil {
			// Skip non-leaf nodes
			continue
		}
		if leaf == nil {
			continue
		}
		if !keyWithinCoveredPrefix(leaf.Key, prefix) {
			continue
		}

		update := &protobufs.LeafData{
			Key:            leaf.Key,
			Value:          leaf.Value,
			HashTarget:     leaf.HashTarget,
			Size:           leaf.Size,
			UnderlyingData: leaf.UnderlyingData,
		}

		msg := &protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_LeafData{
				LeafData: update,
			},
		}

		if err := s.stream.Send(msg); err != nil {
			return errors.Wrap(err, "raw shard sync: send leaf")
		}

		sent++
		if sent%1000 == 0 {
			s.logger.Debug(
				"SERVER: raw sync progress",
				zap.Uint64("sent", sent),
				zap.Uint64("total", count),
			)
		}
	}

	s.logger.Info(
		"SERVER: raw sync sent all leaves, waiting for ack",
		zap.String("shard_key", shardHex),
		zap.Uint64("sent", sent),
	)

	// Wait for acknowledgment
	timeoutTimer := time.NewTimer(leafAckTimeout(count))
	defer timeoutTimer.Stop()

	select {
	case <-s.ctx.Done():
		return errors.Wrap(s.ctx.Err(), "raw shard sync: wait ack")
	case msg, ok := <-incomingLeaves:
		if !ok {
			return errors.Wrap(errors.New("channel closed"), "raw shard sync: wait ack")
		}
		meta := msg.GetMetadata()
		if meta == nil {
			return errors.Wrap(errors.New("expected metadata ack"), "raw shard sync: wait ack")
		}
		if meta.Leaves != count {
			return errors.Wrap(
				fmt.Errorf("ack mismatch: expected %d, got %d", count, meta.Leaves),
				"raw shard sync: wait ack",
			)
		}
	case <-timeoutTimer.C:
		return errors.Wrap(errors.New("timeout waiting for ack"), "raw shard sync")
	}

	s.logger.Info(
		"SERVER: raw shard sync completed",
		zap.String("shard_key", shardHex),
		zap.Uint64("leaves_sent", sent),
		zap.Duration("duration", time.Since(start)),
	)
	return nil
}
// receiveRawShardSync receives a full raw sync of all leaves from the server.
// It uses tree insertion to properly build the tree structure on the client.
func (s *streamManager) receiveRawShardSync(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) error {
	start := time.Now()
	s.logger.Info("CLIENT: starting receiveRawShardSync")

	expectedLeaves, err := s.awaitRawLeafMetadata(incomingLeaves)
	if err != nil {
		s.logger.Error("CLIENT: failed to receive metadata", zap.Error(err))
		return err
	}

	s.logger.Info(
		"CLIENT: received metadata",
		zap.Uint64("expected_leaves", expectedLeaves),
	)

	var txn tries.TreeBackingStoreTransaction
	var processed uint64
	seenKeys := make(map[string]struct{})
	for processed < expectedLeaves {
		if processed%100 == 0 {
			if txn != nil {
				if err := txn.Commit(); err != nil {
					return errors.Wrap(err, "receive raw shard sync")
				}
			}
			txn, err = s.hypergraphStore.NewTransaction(false)
			if err != nil {
				return errors.Wrap(err, "receive raw shard sync")
			}
		}

		leafMsg, err := s.awaitLeafData(incomingLeaves)
		if err != nil {
			if txn != nil {
				txn.Abort()
			}
			s.logger.Error(
				"CLIENT: failed to receive leaf",
				zap.Uint64("processed", processed),
				zap.Uint64("expected", expectedLeaves),
				zap.Error(err),
			)
			return err
		}

		// Deserialize the atom from the raw value
		theirs := AtomFromBytes(leafMsg.Value)
		if theirs == nil {
			if txn != nil {
				txn.Abort()
			}
			return errors.Wrap(
				errors.New("invalid atom"),
				"receive raw shard sync",
			)
		}

		// Persist underlying vertex tree data if present
		if len(leafMsg.UnderlyingData) > 0 {
			if saver, ok := s.hypergraphStore.(rawVertexSaver); ok {
				if err := saver.SaveVertexTreeRaw(
					txn,
					leafMsg.Key,
					leafMsg.UnderlyingData,
				); err != nil {
					txn.Abort()
					return errors.Wrap(err, "receive raw shard sync: save vertex tree")
				}
			}
		}

		// Track key so we can prune anything absent from the authoritative list.
		seenKeys[string(append([]byte(nil), leafMsg.Key...))] = struct{}{}

		// Use Add to properly build tree structure
		if err := s.localSet.Add(txn, theirs); err != nil {
			txn.Abort()
			return errors.Wrap(err, "receive raw shard sync: add atom")
		}

		processed++
		if processed%1000 == 0 {
			s.logger.Debug(
				"CLIENT: raw sync progress",
				zap.Uint64("processed", processed),
				zap.Uint64("expected", expectedLeaves),
			)
		}
	}

	if txn != nil {
		if err := txn.Commit(); err != nil {
			return errors.Wrap(err, "receive raw shard sync")
		}
	}

	// Send acknowledgment
	if err := s.sendLeafMetadata(expectedLeaves); err != nil {
		return errors.Wrap(err, "receive raw shard sync")
	}

	if err := s.pruneRawSyncExtras(seenKeys); err != nil {
		return errors.Wrap(err, "receive raw shard sync")
	}

	s.logger.Info(
		"CLIENT: raw shard sync completed",
		zap.Uint64("leaves_received", expectedLeaves),
		zap.Duration("duration", time.Since(start)),
	)
	return nil
}
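// pruneRawSyncExtras deletes any local leaves that were not present in the
// authoritative leaf list just received, committing deletions in chunks of
// pruneTxnChunk to bound transaction size.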
func (s *streamManager) pruneRawSyncExtras(seen map[string]struct{}) error {
	start := time.Now()
	setType := s.localTree.SetType
	phaseType := s.localTree.PhaseType
	shardKey := s.localTree.ShardKey

	iter, err := s.hypergraphStore.IterateRawLeaves(setType, phaseType, shardKey)
	if err != nil {
		return errors.Wrap(err, "prune raw sync extras: iterator")
	}
	defer iter.Close()

	var txn tries.TreeBackingStoreTransaction
	var pruned uint64

	commitTxn := func() error {
		if txn == nil {
			return nil
		}
		if err := txn.Commit(); err != nil {
			txn.Abort()
			return err
		}
		txn = nil
		return nil
	}

	for valid := iter.First(); valid; valid = iter.Next() {
		leaf, err := iter.Leaf()
		if err != nil || leaf == nil {
			continue
		}
		if _, ok := seen[string(leaf.Key)]; ok {
			continue
		}

		if txn == nil {
			txn, err = s.hypergraphStore.NewTransaction(false)
			if err != nil {
				return errors.Wrap(err, "prune raw sync extras")
			}
		}

		atom := AtomFromBytes(leaf.Value)
		if atom == nil {
			s.logger.Warn(
				"CLIENT: skipping stale leaf with invalid atom",
				zap.String("key", hex.EncodeToString(leaf.Key)),
			)
			continue
		}

		if err := s.localSet.Delete(txn, atom); err != nil {
			txn.Abort()
			return errors.Wrap(err, "prune raw sync extras")
		}
		if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil {
			txn.Abort()
			return errors.Wrap(err, "prune raw sync extras")
		}

		pruned++
		if pruned%pruneTxnChunk == 0 {
			if err := commitTxn(); err != nil {
				return errors.Wrap(err, "prune raw sync extras")
			}
		}
	}

	if err := commitTxn(); err != nil {
		return errors.Wrap(err, "prune raw sync extras")
	}

	if pruned > 0 {
		s.logger.Info(
			"CLIENT: pruned stale leaves after raw sync",
			zap.Uint64("count", pruned),
			zap.Duration("duration", time.Since(start)),
		)
	} else {
		s.logger.Info(
			"CLIENT: no stale leaves found after raw sync",
			zap.Duration("duration", time.Since(start)),
		)
	}

	return nil
}
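// awaitRawLeafMetadata blocks until the server's metadata message (carrying
// the expected leaf count) arrives, the context is canceled, or the ack
// timeout elapses.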
func (s *streamManager) awaitRawLeafMetadata(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (uint64, error) {
	s.logger.Debug("CLIENT: awaitRawLeafMetadata waiting...")
	select {
	case <-s.ctx.Done():
		return 0, errors.Wrap(
			errors.New("context canceled"),
			"await raw leaf metadata",
		)
	case msg, ok := <-incomingLeaves:
		if !ok {
			s.logger.Error("CLIENT: incomingLeaves channel closed")
			return 0, errors.Wrap(
				errors.New("channel closed"),
				"await raw leaf metadata",
			)
		}
		meta := msg.GetMetadata()
		if meta == nil {
			s.logger.Error(
				"CLIENT: received non-metadata message while waiting for metadata",
				zap.String("payload_type", fmt.Sprintf("%T", msg.Payload)),
			)
			return 0, errors.Wrap(
				errors.New("invalid message: expected metadata"),
				"await raw leaf metadata",
			)
		}
		s.logger.Debug(
			"CLIENT: received metadata",
			zap.Uint64("leaves", meta.Leaves),
		)
		return meta.Leaves, nil
	case <-time.After(leafAckTimeout(1)):
		s.logger.Error("CLIENT: timeout waiting for metadata")
		return 0, errors.Wrap(
			errors.New("timed out waiting for metadata"),
			"await raw leaf metadata",
		)
	}
}
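// awaitLeafData blocks until the next LeafData message arrives, failing on
// cancellation, channel closure, unexpected payloads, or timeout.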
func (s *streamManager) awaitLeafData(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (*protobufs.LeafData, error) {
	select {
	case <-s.ctx.Done():
		return nil, errors.Wrap(
			errors.New("context canceled"),
			"await leaf data",
		)
	case msg, ok := <-incomingLeaves:
		if !ok {
			return nil, errors.Wrap(
				errors.New("channel closed"),
				"await leaf data",
			)
		}
		if leaf := msg.GetLeafData(); leaf != nil {
			return leaf, nil
		}
		return nil, errors.Wrap(
			errors.New("invalid message: expected leaf data"),
			"await leaf data",
		)
	case <-time.After(leafAckTimeout(1)):
		return nil, errors.Wrap(
			errors.New("timed out waiting for leaf data"),
			"await leaf data",
		)
	}
}
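// sendLeafMetadata acknowledges a leaf transfer by echoing the leaf count
// back to the peer.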
func (s *streamManager) sendLeafMetadata(leaves uint64) error {
	s.logger.Debug("sending leaf metadata ack", zap.Uint64("leaves", leaves))
	return s.stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Metadata{
			Metadata: &protobufs.HypersyncMetadata{Leaves: leaves},
		},
	})
}
// sendLeafData builds a LeafData message (with the full leaf data) for the
// node at the given path in the local tree and sends it over the stream.
func (s *streamManager) sendLeafData(
	path []int32,
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (err error) {
	start := time.Now()
	pathHex := hex.EncodeToString(packPath(path))
	var count uint64
	s.logger.Debug("send leaf data start", zap.String("path", pathHex))
	defer func() {
		s.logger.Debug(
			"send leaf data finished",
			zap.String("path", pathHex),
			zap.Uint64("leaves_sent", count),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	send := func(leaf *tries.LazyVectorCommitmentLeafNode) error {
		update := &protobufs.LeafData{
			Key:        leaf.Key,
			Value:      leaf.Value,
			HashTarget: leaf.HashTarget,
			Size:       leaf.Size.FillBytes(make([]byte, 32)),
		}
		data, err := s.getSerializedLeaf(leaf.Key)
		if err != nil {
			return errors.Wrap(err, "send leaf data")
		}
		if len(data) != 0 {
			update.UnderlyingData = data
		}
		msg := &protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_LeafData{
				LeafData: update,
			},
		}

		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		default:
		}

		err = s.stream.Send(msg)
		if err != nil {
			return errors.Wrap(err, "send leaf data")
		}

		s.lastSent = time.Now()
		return nil
	}

	select {
	case <-s.ctx.Done():
		return s.ctx.Err()
	default:
	}

	intPath := []int{}
	for _, i := range path {
		intPath = append(intPath, int(i))
	}

	node, err := s.localTree.GetByPath(intPath)
	if err != nil {
		s.logger.Error("could not get by path", zap.Error(err))
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: 0},
			},
		}); err != nil {
			return err
		}
		return nil
	}

	if node == nil {
		s.logger.Warn(
			"SERVER: node is nil at path, sending 0 leaves",
			zap.String("path", pathHex),
			zap.Bool("tree_nil", s.localTree == nil),
			zap.Bool("root_nil", s.localTree != nil && s.localTree.Root == nil),
		)
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: 0},
			},
		}); err != nil {
			return err
		}
		return nil
	}

	leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode)
	count = uint64(0)

	// Debug: log the node type
	s.logger.Info(
		"SERVER: node type at path",
		zap.String("path", pathHex),
		zap.Bool("is_leaf", ok),
		zap.String("node_type", fmt.Sprintf("%T", node)),
	)

	if !ok {
		children := tries.GetAllLeaves(
			s.localTree.SetType,
			s.localTree.PhaseType,
			s.localTree.ShardKey,
			node,
		)
		for _, child := range children {
			if child == nil {
				continue
			}
			count++
		}
		s.logger.Info(
			"SERVER: sending metadata with leaf count (branch)",
			zap.String("path", pathHex),
			zap.Uint64("leaf_count", count),
			zap.Int("children_slice_len", len(children)),
		)
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: count},
			},
		}); err != nil {
			return err
		}
		for _, child := range children {
			if child == nil {
				continue
			}

			if err := send(child); err != nil {
				return err
			}
		}
	} else {
		count = 1
		s.logger.Info(
			"SERVER: root is a single leaf, sending 1 leaf",
			zap.String("path", pathHex),
			zap.String("leaf_key", hex.EncodeToString(leaf.Key)),
		)
		if err := s.stream.Send(&protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Metadata{
				Metadata: &protobufs.HypersyncMetadata{Leaves: count},
			},
		}); err != nil {
			return err
		}
		if err := send(leaf); err != nil {
			return err
		}
	}

	timeoutTimer := time.NewTimer(leafAckTimeout(count))
	defer timeoutTimer.Stop()

	select {
	case <-s.ctx.Done():
		return errors.Wrap(
			errors.New("context canceled"),
			"send leaf data",
		)
	case msg, ok := <-incomingLeaves:
		if !ok {
			return errors.Wrap(
				errors.New("channel closed"),
				"send leaf data",
			)
		}

		switch msg.Payload.(type) {
		case *protobufs.HypergraphComparison_Metadata:
			expectedLeaves := msg.GetMetadata().Leaves
			if expectedLeaves != count {
				return errors.Wrap(
					errors.New("did not match"),
					"send leaf data",
				)
			}
			return nil
		}

		return errors.Wrap(
			errors.New("invalid message"),
			"send leaf data",
		)
	case <-timeoutTimer.C:
		return errors.Wrap(
			errors.New("timed out"),
			"send leaf data",
		)
	}
}
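// getSerializedLeaf loads the serialized vertex tree backing a leaf key,
// preferring the session snapshot cache, then a raw store read, and finally a
// full load-and-serialize; misses are cached so repeated lookups stay cheap.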
func (s *streamManager) getSerializedLeaf(key []byte) ([]byte, error) {
	if s.snapshot != nil {
		if data, ok := s.snapshot.getLeafData(key); ok {
			return data, nil
		}
		if s.snapshot.isLeafMiss(key) {
			return nil, nil
		}
	}

	type rawVertexLoader interface {
		LoadVertexTreeRaw(id []byte) ([]byte, error)
	}

	if loader, ok := s.hypergraphStore.(rawVertexLoader); ok {
		data, err := loader.LoadVertexTreeRaw(key)
		if err != nil {
			if s.snapshot != nil {
				s.snapshot.markLeafMiss(key)
			}
			return nil, nil
		}
		if len(data) != 0 {
			if s.snapshot != nil {
				s.snapshot.storeLeafData(key, data)
			}
			return data, nil
		}
	}

	tree, err := s.hypergraphStore.LoadVertexTree(key)
	if err != nil {
		if s.snapshot != nil {
			s.snapshot.markLeafMiss(key)
		}
		return nil, nil
	}
	data, err := tries.SerializeNonLazyTree(tree)
	if err != nil {
		return nil, err
	}

	if s.snapshot != nil {
		s.snapshot.storeLeafData(key, data)
	}
	return data, nil
}
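// getBranchInfo returns the comparison response for a path, serving from the
// session snapshot cache when possible and populating it otherwise.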
func (s *streamManager) getBranchInfo(
	path []int32,
) (*protobufs.HypergraphComparisonResponse, error) {
	if s.snapshot != nil {
		if resp, ok := s.snapshot.getBranchInfo(path); ok {
			return resp, nil
		}
	}

	resp, err := getBranchInfoFromTree(s.logger, s.localTree, path)
	if err != nil {
		return nil, err
	}

	if s.snapshot != nil {
		s.snapshot.storeBranchInfo(path, resp)
	}

	return resp, nil
}
// getNodeAtPath traverses the tree along the provided nibble path. It returns
// the node found (or nil if not found). The depth argument is used for internal
// recursion.
func getNodeAtPath(
	logger *zap.Logger,
	setType string,
	phaseType string,
	shardKey tries.ShardKey,
	node tries.LazyVectorCommitmentNode,
	path []int32,
	depth int,
) tries.LazyVectorCommitmentNode {
	if node == nil {
		return nil
	}
	if len(path) == 0 {
		return node
	}

	switch n := node.(type) {
	case *tries.LazyVectorCommitmentLeafNode:
		return node
	case *tries.LazyVectorCommitmentBranchNode:
		// Check that the branch's prefix matches the beginning of the query path.
		if len(path) < len(n.Prefix) {
			return nil
		}

		for i, nib := range n.Prefix {
			if int32(nib) != path[i] {
				return nil
			}
		}

		// Remove the prefix portion from the path.
		remainder := path[len(n.Prefix):]
		if len(remainder) == 0 {
			return node
		}

		// The first element of the remainder selects the child.
		childIndex := remainder[0]
		if int(childIndex) < 0 || int(childIndex) >= len(n.Children) {
			return nil
		}

		child, err := n.Store.GetNodeByPath(
			setType,
			phaseType,
			shardKey,
			slices.Concat(n.FullPrefix, []int{int(childIndex)}),
		)
		if err != nil && !strings.Contains(err.Error(), "item not found") {
			logger.Panic("failed to get node by path", zap.Error(err))
		}

		if child == nil {
			return nil
		}

		return getNodeAtPath(
			logger,
			setType,
			phaseType,
			shardKey,
			child,
			remainder[1:],
			depth+len(n.Prefix)+1,
		)
	}
	return nil
}
// getBranchInfoFromTree looks up the node at the given path in the local tree,
// computes its commitment, and (if it is a branch) collects its immediate
// children's commitments.
func getBranchInfoFromTree(
	logger *zap.Logger,
	tree *tries.LazyVectorCommitmentTree,
	path []int32,
) (
	*protobufs.HypergraphComparisonResponse,
	error,
) {
	node := getNodeAtPath(
		logger,
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		tree.Root,
		path,
		0,
	)
	if node == nil {
		return &protobufs.HypergraphComparisonResponse{
			Path:       path, // buildutils:allow-slice-alias this assignment is ephemeral
			Commitment: []byte{},
			IsRoot:     len(path) == 0,
		}, nil
	}

	intpath := []int{}
	for _, p := range path {
		intpath = append(intpath, int(p))
	}

	node = ensureCommittedNode(logger, tree, intpath, node)

	branchInfo := &protobufs.HypergraphComparisonResponse{
		Path:   path, // buildutils:allow-slice-alias this assignment is ephemeral
		IsRoot: len(path) == 0,
	}

	if branch, ok := node.(*tries.LazyVectorCommitmentBranchNode); ok {
		branchInfo.Commitment = branch.Commitment
		if len(branch.Commitment) == 0 {
			return nil, errors.New("invalid commitment")
		}

		for _, p := range branch.Prefix {
			branchInfo.Path = append(branchInfo.Path, int32(p))
		}
		for i := 0; i < len(branch.Children); i++ {
			child := branch.Children[i]
			if child == nil {
				var err error
				child, err = branch.Store.GetNodeByPath(
					tree.SetType,
					tree.PhaseType,
					tree.ShardKey,
					slices.Concat(branch.FullPrefix, []int{i}),
				)
				if err != nil && !strings.Contains(err.Error(), "item not found") {
					logger.Panic("failed to get node by path", zap.Error(err))
				}
			}

			childPath := slices.Concat(branch.FullPrefix, []int{i})
			child = ensureCommittedNode(logger, tree, childPath, child)

			if child != nil {
				var childCommit []byte
				if childB, ok := child.(*tries.LazyVectorCommitmentBranchNode); ok {
					childCommit = childB.Commitment
				} else if childL, ok := child.(*tries.LazyVectorCommitmentLeafNode); ok {
					childCommit = childL.Commitment
				}

				if len(childCommit) == 0 {
					return nil, errors.New("invalid commitment")
				}
				branchInfo.Children = append(
					branchInfo.Children,
					&protobufs.BranchChild{
						Index:      int32(i),
						Commitment: childCommit,
					},
				)
			}
		}
	} else if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok {
		branchInfo.Commitment = leaf.Commitment
		if len(branchInfo.Commitment) == 0 {
			return nil, errors.New("invalid commitment")
		}
	}
	return branchInfo, nil
}
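// ensureCommittedNode reloads a node from the backing store when its cached
// copy is missing a commitment, falling back to the cached node otherwise.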
func ensureCommittedNode(
	logger *zap.Logger,
	tree *tries.LazyVectorCommitmentTree,
	path []int,
	node tries.LazyVectorCommitmentNode,
) tries.LazyVectorCommitmentNode {
	if node == nil {
		return nil
	}

	hasCommit := func(commitment []byte) bool {
		return len(commitment) != 0
	}

	switch n := node.(type) {
	case *tries.LazyVectorCommitmentBranchNode:
		if hasCommit(n.Commitment) {
			return node
		}
	case *tries.LazyVectorCommitmentLeafNode:
		if hasCommit(n.Commitment) {
			return node
		}
	default:
		return node
	}

	reloaded, err := tree.Store.GetNodeByPath(
		tree.SetType,
		tree.PhaseType,
		tree.ShardKey,
		path,
	)
	if err != nil && !strings.Contains(err.Error(), "item not found") {
		logger.Panic("failed to reload node by path", zap.Error(err))
	}
	if reloaded != nil {
		return reloaded
	}

	return node
}
// isLeaf infers whether a HypergraphComparisonResponse message represents a
// leaf node.
func isLeaf(info *protobufs.HypergraphComparisonResponse) bool {
	return len(info.Children) == 0
}
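// queryNext asks the remote side for branch info at the given path and waits
// for the matching response.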
func (s *streamManager) queryNext(
	incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
	path []int32,
) (
	resp *protobufs.HypergraphComparisonResponse,
	err error,
) {
	start := time.Now()
	pathHex := hex.EncodeToString(packPath(path))
	s.logger.Debug("query next start", zap.String("path", pathHex))
	defer func() {
		s.logger.Debug(
			"query next finished",
			zap.String("path", pathHex),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	if err := s.stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Query{
			Query: &protobufs.HypergraphComparisonQuery{
				Path:            path, // buildutils:allow-slice-alias this assignment is ephemeral
				IncludeLeafData: true,
			},
		},
	}); err != nil {
		return nil, err
	}

	select {
	case <-s.ctx.Done():
		return nil, errors.Wrap(
			errors.New("context canceled"),
			"handle query",
		)
	case r, ok := <-incomingResponses:
		if !ok {
			return nil, errors.Wrap(
				errors.New("channel closed"),
				"handle query",
			)
		}
		resp = r
		return resp, nil
	case <-time.After(30 * time.Second):
		return nil, errors.Wrap(
			errors.New("timed out"),
			"handle query",
		)
	}
}
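// handleLeafData is the client-side receiver for a leaf transfer: it reads
// the metadata leaf count, ingests that many leaves in chunked transactions,
// and acknowledges the transfer by echoing the count back.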
func (s *streamManager) handleLeafData(
	incomingLeaves <-chan *protobufs.HypergraphComparison,
) (err error) {
	start := time.Now()
	var expectedLeaves uint64
	s.logger.Debug("handle leaf data start")
	defer func() {
		s.logger.Debug(
			"handle leaf data finished",
			zap.Uint64("leaves_expected", expectedLeaves),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	select {
	case <-s.ctx.Done():
		return errors.Wrap(
			errors.New("context canceled"),
			"handle leaf data",
		)
	case msg, ok := <-incomingLeaves:
		if !ok {
			return errors.Wrap(
				errors.New("channel closed"),
				"handle leaf data",
			)
		}

		switch msg.Payload.(type) {
		case *protobufs.HypergraphComparison_LeafData:
			return errors.Wrap(
				errors.New("invalid message"),
				"handle leaf data",
			)
		case *protobufs.HypergraphComparison_Metadata:
			expectedLeaves = msg.GetMetadata().Leaves
		}
	case <-time.After(30 * time.Second):
		return errors.Wrap(
			errors.New("timed out"),
			"handle leaf data",
		)
	}

	// s.logger.Info("expecting leaves", zap.Uint64("count", expectedLeaves))

	var txn tries.TreeBackingStoreTransaction
	for i := uint64(0); i < expectedLeaves; i++ {
		if i%100 == 0 {
			if txn != nil {
				if err := txn.Commit(); err != nil {
					return errors.Wrap(
						err,
						"handle leaf data",
					)
				}
			}
			txn, err = s.hypergraphStore.NewTransaction(false)
			if err != nil {
				return errors.Wrap(
					err,
					"handle leaf data",
				)
			}
		}
		select {
		case <-s.ctx.Done():
			return errors.Wrap(
				errors.New("context canceled"),
				"handle leaf data",
			)
		case msg, ok := <-incomingLeaves:
			if !ok {
				return errors.Wrap(
					errors.New("channel closed"),
					"handle leaf data",
				)
			}

			var remoteUpdate *protobufs.LeafData
			switch msg.Payload.(type) {
			case *protobufs.HypergraphComparison_Metadata:
				return errors.Wrap(
					errors.New("invalid message"),
					"handle leaf data",
				)
			case *protobufs.HypergraphComparison_LeafData:
				remoteUpdate = msg.GetLeafData()
			}

			// s.logger.Info(
			// 	"received leaf data",
			// 	zap.String("key", hex.EncodeToString(remoteUpdate.Key)),
			// )

			theirs := AtomFromBytes(remoteUpdate.Value)
			if err := s.persistLeafTree(txn, remoteUpdate); err != nil {
				txn.Abort()
				return err
			}

			err := s.localSet.Add(txn, theirs)
			if err != nil {
				s.logger.Error("error while saving", zap.Error(err))
				// Abort the open transaction so it is not leaked on the
				// error path.
				txn.Abort()
				return errors.Wrap(
					err,
					"handle leaf data",
				)
			}
		case <-time.After(30 * time.Second):
			return errors.Wrap(
				errors.New("timed out"),
				"handle leaf data",
			)
		}
	}

	if txn != nil {
		if err := txn.Commit(); err != nil {
			return errors.Wrap(
				err,
				"handle leaf data",
			)
		}
	}

	if err := s.stream.Send(&protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Metadata{
			Metadata: &protobufs.HypersyncMetadata{Leaves: expectedLeaves},
		},
	}); err != nil {
		return err
	}

	return nil
}
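// deleteVertexTreeIfNeeded removes the stored vertex tree for a vertex atom
// when the backing store supports deletion; other atom types are ignored.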
func (s *streamManager) deleteVertexTreeIfNeeded(
	txn tries.TreeBackingStoreTransaction,
	atom hypergraph.Atom,
	key []byte,
) error {
	if atom == nil || atom.GetAtomType() != hypergraph.VertexAtomType {
		return nil
	}

	deleter, ok := s.hypergraphStore.(vertexTreeDeleter)
	if !ok {
		return nil
	}

	return deleter.DeleteVertexTree(txn, key)
}
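// pruneLocalSubtree deletes every leaf under the given path from the local
// set, committing in chunks of pruneTxnChunk, and returns the number pruned.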
func (s *streamManager) pruneLocalSubtree(path []int32) (uint64, error) {
	start := time.Now()
	pathHex := hex.EncodeToString(packPath(path))
	s.logger.Info(
		"CLIENT: pruning subtree",
		zap.String("path", pathHex),
	)

	intPath := make([]int, len(path))
	for i, nib := range path {
		intPath[i] = int(nib)
	}

	node, err := s.localTree.GetByPath(intPath)
	if err != nil {
		return 0, errors.Wrap(err, "prune local subtree")
	}

	if node == nil {
		s.logger.Debug(
			"CLIENT: prune skipped, node missing",
			zap.String("path", pathHex),
		)
		return 0, nil
	}

	leaves := []*tries.LazyVectorCommitmentLeafNode{}
	if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok {
		leaves = append(leaves, leaf)
	} else {
		gathered := tries.GetAllLeaves(
			s.localTree.SetType,
			s.localTree.PhaseType,
			s.localTree.ShardKey,
			node,
		)
		for _, leaf := range gathered {
			if leaf == nil {
				continue
			}
			leaves = append(leaves, leaf)
		}
	}

	if len(leaves) == 0 {
		s.logger.Debug(
			"CLIENT: prune skipped, no leaves",
			zap.String("path", pathHex),
		)
		return 0, nil
	}

	var txn tries.TreeBackingStoreTransaction
	var pruned uint64

	commitTxn := func() error {
		if txn == nil {
			return nil
		}
		if err := txn.Commit(); err != nil {
			txn.Abort()
			return err
		}
		txn = nil
		return nil
	}

	for idx, leaf := range leaves {
		if idx%pruneTxnChunk == 0 {
			if err := commitTxn(); err != nil {
				return pruned, errors.Wrap(err, "prune local subtree")
			}
			txn, err = s.hypergraphStore.NewTransaction(false)
			if err != nil {
				return pruned, errors.Wrap(err, "prune local subtree")
			}
		}

		atom := AtomFromBytes(leaf.Value)
		if atom == nil {
			txn.Abort()
			return pruned, errors.Wrap(
				errors.New("invalid atom payload"),
				"prune local subtree",
			)
		}

		if err := s.localSet.Delete(txn, atom); err != nil {
			txn.Abort()
			return pruned, errors.Wrap(err, "prune local subtree")
		}

		if err := s.deleteVertexTreeIfNeeded(txn, atom, leaf.Key); err != nil {
			txn.Abort()
			return pruned, errors.Wrap(err, "prune local subtree")
		}

		pruned++
	}

	if err := commitTxn(); err != nil {
		return pruned, errors.Wrap(err, "prune local subtree")
	}

	s.logger.Info(
		"CLIENT: pruned local subtree",
		zap.String("path", pathHex),
		zap.Uint64("leaf_count", pruned),
		zap.Duration("duration", time.Since(start)),
	)

	return pruned, nil
}
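// persistLeafTree stores the underlying vertex tree carried by a leaf update,
// validating it first when the local set has a validator, and using the raw
// save path when the store supports it.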
func (s *streamManager) persistLeafTree(
	txn tries.TreeBackingStoreTransaction,
	update *protobufs.LeafData,
) error {
	if len(update.UnderlyingData) == 0 {
		return nil
	}

	needsValidation := s.requiresTreeValidation()
	_, canSaveRaw := s.hypergraphStore.(rawVertexSaver)

	var tree *tries.VectorCommitmentTree
	var err error
	if needsValidation || !canSaveRaw {
		tree, err = tries.DeserializeNonLazyTree(update.UnderlyingData)
		if err != nil {
			s.logger.Error("server returned invalid tree", zap.Error(err))
			return err
		}
	}

	if needsValidation {
		if err := s.localSet.ValidateTree(
			update.Key,
			update.Value,
			tree,
		); err != nil {
			s.logger.Error("server returned invalid tree", zap.Error(err))
			return err
		}
	}

	if saver, ok := s.hypergraphStore.(rawVertexSaver); ok {
		buf := make([]byte, len(update.UnderlyingData))
		copy(buf, update.UnderlyingData)
		return saver.SaveVertexTreeRaw(txn, update.Key, buf)
	}

	return s.hypergraphStore.SaveVertexTree(txn, update.Key, tree)
}
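// requiresTreeValidation reports whether the local set carries a validator
// that incoming vertex trees must pass before being persisted.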
func (s *streamManager) requiresTreeValidation() bool {
	if typed, ok := s.localSet.(*idSet); ok {
		return typed.validator != nil
	}
	return false
}
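// handleQueryNext is the server-side counterpart of queryNext: it waits for
// the peer's query at the expected path, then answers with local branch info.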
func (s *streamManager) handleQueryNext(
	incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
	path []int32,
) (
	branch *protobufs.HypergraphComparisonResponse,
	err error,
) {
	start := time.Now()
	pathHex := hex.EncodeToString(packPath(path))
	s.logger.Debug("handle query next start", zap.String("path", pathHex))
	defer func() {
		s.logger.Debug(
			"handle query next finished",
			zap.String("path", pathHex),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	select {
	case <-s.ctx.Done():
		return nil, errors.Wrap(
			errors.New("context canceled"),
			"handle query next",
		)
	case query, ok := <-incomingQueries:
		if !ok {
			return nil, errors.Wrap(
				errors.New("channel closed"),
				"handle query next",
			)
		}

		if slices.Compare(query.Path, path) != 0 {
			return nil, errors.Wrap(
				errors.New("invalid query received"),
				"handle query next",
			)
		}

		branchInfo, berr := s.getBranchInfo(path)
		if berr != nil {
			return nil, errors.Wrap(berr, "handle query next")
		}

		resp := &protobufs.HypergraphComparison{
			Payload: &protobufs.HypergraphComparison_Response{
				Response: branchInfo,
			},
		}

		if err := s.stream.Send(resp); err != nil {
			return nil, errors.Wrap(err, "handle query next")
		}

		branch = branchInfo
		return branch, nil
	case <-time.After(30 * time.Second):
		return nil, errors.Wrap(
			errors.New("timed out"),
			"handle query next",
		)
	}
}
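// descendIndex sends local branch info for a path and waits for the remote
// side's branch info at the same path, returning both for comparison.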
func (s *streamManager) descendIndex(
	incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
	path []int32,
) (
	local *protobufs.HypergraphComparisonResponse,
	remote *protobufs.HypergraphComparisonResponse,
	err error,
) {
	start := time.Now()
	pathHex := hex.EncodeToString(packPath(path))
	s.logger.Debug("descend index start", zap.String("path", pathHex))
	defer func() {
		s.logger.Debug(
			"descend index finished",
			zap.String("path", pathHex),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
	}()
	branchInfo, err := s.getBranchInfo(path)
	if err != nil {
		return nil, nil, errors.Wrap(err, "descend index")
	}

	resp := &protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Response{
			Response: branchInfo,
		},
	}

	if err := s.stream.Send(resp); err != nil {
		return nil, nil, errors.Wrap(err, "descend index")
	}

	select {
	case <-s.ctx.Done():
		return nil, nil, errors.Wrap(
			errors.New("context canceled"),
			"handle query next",
		)
	case r, ok := <-incomingResponses:
		if !ok {
			return nil, nil, errors.Wrap(
				errors.New("channel closed"),
				"descend index",
			)
		}

		if slices.Compare(branchInfo.Path, r.Path) != 0 {
			return nil, nil, errors.Wrap(
				fmt.Errorf(
					"invalid path received: %v, expected: %v",
					r.Path,
					branchInfo.Path,
				),
				"descend index",
			)
		}

		local = branchInfo
		remote = r
		return local, remote, nil
	case <-time.After(30 * time.Second):
		return nil, nil, errors.Wrap(
			errors.New("timed out"),
			"descend index",
		)
	}
}
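// packPath flattens a nibble path into bytes for hex-encoded logging.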
func packPath(path []int32) []byte {
	b := []byte{}
	for _, p := range path {
		b = append(b, byte(p))
	}
	return b
}
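// walk recursively reconciles the local and remote trees rooted at the given
// path. Matching commitments terminate a branch early; leaf/branch and
// prefix-length mismatches trigger leaf transfers or local pruning, and
// matching branch prefixes are diffed child by child.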
func (s *streamManager) walk(
|
|
path []int32,
|
|
lnode, rnode *protobufs.HypergraphComparisonResponse,
|
|
incomingLeaves <-chan *protobufs.HypergraphComparison,
|
|
incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
|
|
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
|
|
init bool,
|
|
isServer bool,
|
|
shardKey tries.ShardKey,
|
|
phaseSet protobufs.HypergraphPhaseSet,
|
|
) error {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return s.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
// pathString := zap.String("path", hex.EncodeToString(packPath(path)))
|
|
|
|
if bytes.Equal(lnode.Commitment, rnode.Commitment) {
|
|
// s.logger.Debug(
|
|
// "commitments match",
|
|
// pathString,
|
|
// zap.String("commitment", hex.EncodeToString(lnode.Commitment)),
|
|
// )
|
|
return nil
|
|
}
|
|
|
|
// Check if we should use raw sync mode for this phase set
|
|
if init && shouldUseRawSync(phaseSet) {
|
|
s.logger.Info(
|
|
"walk: using raw sync mode",
|
|
zap.Bool("is_server", isServer),
|
|
zap.Int("phase_set", int(phaseSet)),
|
|
)
|
|
if isServer {
|
|
return s.rawShardSync(shardKey, phaseSet, incomingLeaves, path)
|
|
}
|
|
return s.receiveRawShardSync(incomingLeaves)
|
|
}
|
|
|
|
if isLeaf(lnode) && isLeaf(rnode) && !init {
|
|
return nil
|
|
}
|
|
|
|
if isLeaf(rnode) || isLeaf(lnode) {
|
|
// s.logger.Debug("leaf/branch mismatch at path", pathString)
|
|
if isServer {
|
|
err := s.sendLeafData(
|
|
path,
|
|
incomingLeaves,
|
|
)
|
|
return errors.Wrap(err, "walk")
|
|
} else {
|
|
err := s.handleLeafData(incomingLeaves)
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
|
|
lpref := lnode.Path
|
|
rpref := rnode.Path
|
|
if len(lpref) != len(rpref) {
|
|
// s.logger.Debug(
|
|
// "prefix length mismatch",
|
|
// zap.Int("local_prefix", len(lpref)),
|
|
// zap.Int("remote_prefix", len(rpref)),
|
|
// pathString,
|
|
// )
|
|
if len(lpref) > len(rpref) {
|
|
// s.logger.Debug("local prefix longer, traversing remote to path", pathString)
|
|
traverse := lpref[len(rpref)-1:]
|
|
rtrav := rnode
|
|
traversePath := append([]int32{}, rpref...)
|
|
for _, nibble := range traverse {
|
|
// s.logger.Debug("attempting remote traversal step")
|
|
for _, child := range rtrav.Children {
|
|
if child.Index == nibble {
|
|
// s.logger.Debug("sending query")
|
|
traversePath = append(traversePath, child.Index)
|
|
var err error
|
|
rtrav, err = s.queryNext(
|
|
incomingResponses,
|
|
traversePath,
|
|
)
|
|
if err != nil {
|
|
s.logger.Error("query failed", zap.Error(err))
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if rtrav == nil {
|
|
// s.logger.Debug("traversal could not reach path")
|
|
if isServer {
|
|
err := s.sendLeafData(
|
|
lpref,
|
|
incomingLeaves,
|
|
)
|
|
return errors.Wrap(err, "walk")
|
|
} else {
|
|
_, err := s.pruneLocalSubtree(lpref)
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
}
|
|
// s.logger.Debug("traversal completed, performing walk", pathString)
|
|
return s.walk(
|
|
path,
|
|
lnode,
|
|
rtrav,
|
|
incomingLeaves,
|
|
incomingQueries,
|
|
incomingResponses,
|
|
false,
|
|
isServer,
|
|
shardKey,
|
|
phaseSet,
|
|
)
|
|
} else {
|
|
// s.logger.Debug("remote prefix longer, traversing local to path", pathString)
|
|
traverse := rpref[len(lpref)-1:]
|
|
ltrav := lnode
|
|
traversedPath := append([]int32{}, lnode.Path...)
|
|
|
|
for _, nibble := range traverse {
|
|
// s.logger.Debug("attempting local traversal step")
|
|
preTraversal := append([]int32{}, traversedPath...)
|
|
for _, child := range ltrav.Children {
|
|
if child.Index == nibble {
|
|
traversedPath = append(traversedPath, nibble)
|
|
var err error
|
|
// s.logger.Debug("expecting query")
|
|
ltrav, err = s.handleQueryNext(
|
|
incomingQueries,
|
|
traversedPath,
|
|
)
|
|
if err != nil {
|
|
s.logger.Error("expect failed", zap.Error(err))
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
|
|
if ltrav == nil {
|
|
// s.logger.Debug("traversal could not reach path")
|
|
if isServer {
|
|
err := s.sendLeafData(
|
|
preTraversal,
|
|
incomingLeaves,
|
|
)
|
|
return errors.Wrap(err, "walk")
|
|
} else {
|
|
err := s.handleLeafData(incomingLeaves)
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
} else {
|
|
						missingPath := append(append([]int32{}, preTraversal...), child.Index)
						if isServer {
							if err := s.sendLeafData(
								missingPath,
								incomingLeaves,
							); err != nil {
								return errors.Wrap(err, "walk")
							}
						} else {
							if _, err := s.pruneLocalSubtree(missingPath); err != nil {
								return errors.Wrap(err, "walk")
							}
						}
					}
				}
			}
			// s.logger.Debug("traversal completed, performing walk", pathString)
			return s.walk(
				path,
				ltrav,
				rnode,
				incomingLeaves,
				incomingQueries,
				incomingResponses,
				false,
				isServer,
				shardKey,
				phaseSet,
			)
		}
	} else {
		if slices.Compare(lpref, rpref) == 0 {
			// s.logger.Debug("prefixes match, diffing children")
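			// Identical prefixes: walk all 64 possible child slots and diff
			// the two nodes' children index by index.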
			for i := int32(0); i < 64; i++ {
				// s.logger.Debug("checking branch", zap.Int32("branch", i))
				var lchild *protobufs.BranchChild = nil
				for _, lc := range lnode.Children {
					if lc.Index == i {
						// s.logger.Debug("local instance found", zap.Int32("branch", i))

						lchild = lc
						break
					}
				}
				var rchild *protobufs.BranchChild = nil
				for _, rc := range rnode.Children {
					if rc.Index == i {
						// s.logger.Debug("remote instance found", zap.Int32("branch", i))

						rchild = rc
						break
					}
				}
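				// A child present on exactly one side marks a divergent
				// subtree: the server pushes its leaves for that path; the
				// client prunes its extra subtree or applies incoming leaves.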
				if (lchild != nil && rchild == nil) ||
					(lchild == nil && rchild != nil) {
					// s.logger.Info("branch divergence", pathString)
					if lchild != nil {
						nextPath := append(
							append([]int32{}, lpref...),
							lchild.Index,
						)
						if isServer {
							if err := s.sendLeafData(
								nextPath,
								incomingLeaves,
							); err != nil {
								return errors.Wrap(err, "walk")
							}
						} else {
							if _, err := s.pruneLocalSubtree(nextPath); err != nil {
								return errors.Wrap(err, "walk")
							}
						}
					}
					if rchild != nil {
						if !isServer {
							err := s.handleLeafData(incomingLeaves)
							if err != nil {
								return errors.Wrap(err, "walk")
							}
						}
					}
				} else {
					if lchild != nil {
						nextPath := append(
							append([]int32{}, lpref...),
							lchild.Index,
						)
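						// Both sides have this child: fetch the next pair of
						// nodes from the response stream and recurse into the
						// shared subtree.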
						lc, rc, err := s.descendIndex(
							incomingResponses,
							nextPath,
						)
						if err != nil {
							// s.logger.Debug("incomplete branch descent", zap.Error(err))
							if isServer {
								if err := s.sendLeafData(
									nextPath,
									incomingLeaves,
								); err != nil {
									return errors.Wrap(err, "walk")
								}
							} else {
								if _, err := s.pruneLocalSubtree(nextPath); err != nil {
									return errors.Wrap(err, "walk")
								}
							}
							continue
						}

						if err = s.walk(
							nextPath,
							lc,
							rc,
							incomingLeaves,
							incomingQueries,
							incomingResponses,
							false,
							isServer,
							shardKey,
							phaseSet,
						); err != nil {
							return errors.Wrap(err, "walk")
						}
					}
				}
			}
		} else {
			// s.logger.Debug("prefix mismatch on both sides", pathString)
			if isServer {
				if err := s.sendLeafData(
					path,
					incomingLeaves,
				); err != nil {
					return errors.Wrap(err, "walk")
				}
			} else {
				err := s.handleLeafData(incomingLeaves)
				if err != nil {
					return errors.Wrap(err, "walk")
				}
			}
		}
	}

	return nil
}

// syncTreeServer implements the diff and sync logic on the
// server side. It sends the local root info, then processes incoming
// messages, and queues further queries as differences are detected.
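//
// The handshake, as implemented below: the client opens with a query carrying
// the shard key, phase set, and starting path; the server responds with the
// matching branch info, receives the client's branch info in turn, and both
// sides then walk the trees, exchanging queries, responses, and leaf data.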
func (hg *HypergraphCRDT) syncTreeServer(
	ctx context.Context,
	stream protobufs.HypergraphComparisonService_HyperStreamServer,
	sessionLogger *zap.Logger,
) error {
	logger := sessionLogger
	if logger == nil {
		logger = hg.logger
	}

	msg, err := stream.Recv()
	if err != nil {
		return err
	}
	query := msg.GetQuery()
	if query == nil {
		return errors.New("client did not send valid initialization message")
	}

	logger.Info("received initialization message")

	if len(query.ShardKey) != 35 {
		return errors.New("invalid shard key")
	}

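	// The 35-byte wire format splits into the 3-byte L1 prefix and the
	// 32-byte L2 component.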
	shardKey := tries.ShardKey{
		L1: [3]byte(query.ShardKey[:3]),
		L2: [32]byte(query.ShardKey[3:]),
	}

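	// Pin a snapshot of the shard for the whole session; the diff below
	// reads from the snapshot store rather than the live tree.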
	snapshotStart := time.Now()
	handle := hg.snapshotMgr.acquire(shardKey)
	if handle == nil {
		return errors.New("hypergraph shard snapshot unavailable")
	}
	defer hg.snapshotMgr.release(handle)
	logger.Debug(
		"snapshot acquisition complete",
		zap.Duration("duration", time.Since(snapshotStart)),
	)

	snapshotRoot := handle.Root()
	if len(snapshotRoot) != 0 {
		logger.Info(
			"syncing with snapshot",
			zap.String("root", hex.EncodeToString(snapshotRoot)),
		)
	} else {
		logger.Info("syncing with snapshot", zap.String("root", ""))
	}

	snapshotStore := handle.Store()

	idSet := hg.snapshotPhaseSet(shardKey, query.PhaseSet, snapshotStore)
	if idSet == nil {
		return errors.New("unsupported phase set")
	}

	branchInfo, err := getBranchInfoFromTree(
		logger,
		idSet.GetTree(),
		query.Path,
	)
	if err != nil {
		return err
	}

	// hg.logger.Debug(
	// "returning branch info",
	// zap.String("commitment", hex.EncodeToString(branchInfo.Commitment)),
	// zap.Int("children", len(branchInfo.Children)),
	// zap.Int("path", len(branchInfo.Path)),
	// )

	resp := &protobufs.HypergraphComparison{
		Payload: &protobufs.HypergraphComparison_Response{
			Response: branchInfo,
		},
	}

	if err := stream.Send(resp); err != nil {
		return err
	}

	msg, err = stream.Recv()
	if err != nil {
		return err
	}
	response := msg.GetResponse()
	if response == nil {
		return errors.New(
			"client did not send valid initialization response message",
		)
	}

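	// Demultiplex the remaining stream by payload type onto unbounded
	// queues so the tree walk never blocks the stream reader.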
	ctx, cancel := context.WithCancel(ctx)

	incomingQueriesIn, incomingQueriesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonQuery](
			cancel,
			"server incoming",
		)
	incomingResponsesIn, incomingResponsesOut :=
		UnboundedChan[*protobufs.HypergraphComparisonResponse](
			cancel,
			"server incoming",
		)
	incomingLeavesIn, incomingLeavesOut :=
		UnboundedChan[*protobufs.HypergraphComparison](
			cancel,
			"server incoming",
		)

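	// The server only ever sends leaf data; receiving leaf data from the
	// client is a protocol violation and ends the session.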
	go func() {
		defer func() {
			cancel()
			close(incomingQueriesIn)
			close(incomingResponsesIn)
			close(incomingLeavesIn)
		}()
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				logger.Info("server stream recv eof")
				return
			}
			if err != nil {
				logger.Info("server stream recv error", zap.Error(err))
				return
			}
			if msg == nil {
				continue
			}
			switch m := msg.Payload.(type) {
			case *protobufs.HypergraphComparison_LeafData:
				logger.Warn("received leaf from client, terminating")
				return
			case *protobufs.HypergraphComparison_Metadata:
				incomingLeavesIn <- msg
			case *protobufs.HypergraphComparison_Query:
				incomingQueriesIn <- m.Query
			case *protobufs.HypergraphComparison_Response:
				incomingResponsesIn <- m.Response
			}
		}
	}()

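	// The walk consumes the queues against the snapshot-backed state
	// assembled above.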
manager := &streamManager{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
logger: logger,
|
|
stream: stream,
|
|
hypergraphStore: snapshotStore,
|
|
localTree: idSet.GetTree(),
|
|
localSet: idSet,
|
|
lastSent: time.Now(),
|
|
snapshot: handle,
|
|
}
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
err := manager.walk(
|
|
branchInfo.Path,
|
|
branchInfo,
|
|
response,
|
|
incomingLeavesOut,
|
|
incomingQueriesOut,
|
|
incomingResponsesOut,
|
|
true,
|
|
true,
|
|
shardKey,
|
|
query.PhaseSet,
|
|
)
|
|
if err != nil {
|
|
logger.Error("error while syncing", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
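// UnboundedChan links a send side and a receive side through an internal
// goroutine that buffers messages in a slice, so sends are never blocked by
// a slow consumer. Closing the send side makes the goroutine invoke cancel
// and close the receive side; messages still queued at that point may be
// dropped rather than drained. The purpose argument is not referenced in the
// body below and serves call-site readability.
//
// A minimal usage sketch (illustrative only):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	in, out := UnboundedChan[int](cancel, "example")
//	in <- 1 // handed to the buffering goroutine, not directly to the consumer
//	v := <-out
//	close(in) // cancels ctx and closes out
//	_, _ = ctx, v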
func UnboundedChan[T any](
	cancel context.CancelFunc,
	purpose string,
) (chan<- T, <-chan T) {
	in := make(chan T)
	out := make(chan T)
	go func() {
		var queue []T
		for {
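			// active stays nil while the queue is empty, which disables the
			// send case below: a nil channel never becomes ready in a select.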
			var active chan T
			var next T
			if len(queue) > 0 {
				active = out
				next = queue[0]
			}
			select {
			case msg, ok := <-in:
				if !ok {
					cancel()
					close(out)
					return
				}

				queue = append(queue, msg)
			case active <- next:
				queue = queue[1:]
			}
		}
	}()
	return in, out
}

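// peerIPFromContext returns the remote host portion of the gRPC peer address
// attached to ctx, the full address string when it cannot be split into host
// and port, or "" when no peer is present.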
func peerIPFromContext(ctx context.Context) string {
	if ctx == nil {
		return ""
	}
	p, ok := peer.FromContext(ctx)
	if !ok || p.Addr == nil {
		return ""
	}
	host, _, err := net.SplitHostPort(p.Addr.String())
	if err != nil {
		return p.Addr.String()
	}
	return host
}