mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-22 02:47:26 +08:00
1660 lines
37 KiB
Go
1660 lines
37 KiB
Go
package hypergraph
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
"source.quilibrium.com/quilibrium/monorepo/protobufs"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
|
|
"source.quilibrium.com/quilibrium/monorepo/types/tries"
|
|
)
|
|
|
|
// HyperStream is the gRPC method that handles synchronization.
|
|
func (hg *HypergraphCRDT) HyperStream(
|
|
stream protobufs.HypergraphComparisonService_HyperStreamServer,
|
|
) error {
|
|
peerId, err := hg.authenticationProvider.Identify(stream.Context())
|
|
if err != nil {
|
|
return errors.Wrap(err, "hyper stream")
|
|
}
|
|
|
|
peerKey := peerId.String()
|
|
if !hg.syncController.TryEstablishSyncSession(peerKey) {
|
|
return errors.New("peer already syncing")
|
|
}
|
|
defer func() {
|
|
hg.syncController.EndSyncSession(peerKey)
|
|
}()
|
|
|
|
handle := hg.snapshotMgr.acquire()
|
|
if handle == nil {
|
|
return errors.New("hypergraph snapshot unavailable")
|
|
}
|
|
defer hg.snapshotMgr.release(handle)
|
|
|
|
root := handle.Root()
|
|
if len(root) != 0 {
|
|
hg.logger.Debug(
|
|
"acquired snapshot",
|
|
zap.String("root", hex.EncodeToString(root)),
|
|
)
|
|
} else {
|
|
hg.logger.Debug("acquired snapshot", zap.String("root", ""))
|
|
}
|
|
|
|
snapshotStore := handle.Store()
|
|
|
|
err = hg.syncTreeServer(stream, snapshotStore, root)
|
|
|
|
hg.syncController.SetStatus(peerKey, &hypergraph.SyncInfo{
|
|
Unreachable: false,
|
|
LastSynced: time.Now(),
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// Sync performs the tree diff and synchronization from the client side.
|
|
// The caller (e.g. the client) must initiate the diff from its root.
|
|
// After that, both sides exchange queries, branch info, and leaf updates until
|
|
// their local trees are synchronized.
|
|
func (hg *HypergraphCRDT) Sync(
|
|
stream protobufs.HypergraphComparisonService_HyperStreamClient,
|
|
shardKey tries.ShardKey,
|
|
phaseSet protobufs.HypergraphPhaseSet,
|
|
) error {
|
|
const localSyncKey = "local-sync"
|
|
if !hg.syncController.TryEstablishSyncSession(localSyncKey) {
|
|
return errors.New("local sync already in progress")
|
|
}
|
|
defer func() {
|
|
hg.syncController.EndSyncSession(localSyncKey)
|
|
}()
|
|
|
|
hg.mu.Lock()
|
|
defer hg.mu.Unlock()
|
|
|
|
hg.logger.Info(
|
|
"sending initialization message",
|
|
zap.String(
|
|
"shard_key",
|
|
hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:])),
|
|
),
|
|
zap.Int("phase_set", int(phaseSet)),
|
|
)
|
|
|
|
var set hypergraph.IdSet
|
|
switch phaseSet {
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
|
|
set = hg.getVertexAddsSet(shardKey)
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
|
|
set = hg.getVertexRemovesSet(shardKey)
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
|
|
set = hg.getHyperedgeAddsSet(shardKey)
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
|
|
set = hg.getHyperedgeRemovesSet(shardKey)
|
|
default:
|
|
return errors.New("unsupported phase set")
|
|
}
|
|
|
|
path := hg.getCoveredPrefix()
|
|
|
|
// Send initial query for path
|
|
if err := stream.Send(&protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Query{
|
|
Query: &protobufs.HypergraphComparisonQuery{
|
|
ShardKey: slices.Concat(shardKey.L1[:], shardKey.L2[:]),
|
|
PhaseSet: phaseSet,
|
|
Path: toInt32Slice(path),
|
|
Commitment: set.GetTree().Commit(false),
|
|
IncludeLeafData: false,
|
|
},
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// hg.logger.Debug("server waiting for initial query")
|
|
msg, err := stream.Recv()
|
|
if err != nil {
|
|
hg.logger.Info("initial recv failed", zap.Error(err))
|
|
return err
|
|
}
|
|
response := msg.GetResponse()
|
|
if response == nil {
|
|
return errors.New(
|
|
"server did not send valid initialization response message",
|
|
)
|
|
}
|
|
|
|
branchInfo, err := getBranchInfoFromTree(
|
|
hg.logger,
|
|
set.GetTree(),
|
|
toInt32Slice(path),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp := &protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Response{
|
|
Response: branchInfo,
|
|
},
|
|
}
|
|
|
|
if err := stream.Send(resp); err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(stream.Context())
|
|
|
|
incomingQueriesIn, incomingQueriesOut :=
|
|
UnboundedChan[*protobufs.HypergraphComparisonQuery](
|
|
cancel,
|
|
"client incoming",
|
|
)
|
|
incomingResponsesIn, incomingResponsesOut :=
|
|
UnboundedChan[*protobufs.HypergraphComparisonResponse](
|
|
cancel,
|
|
"client incoming",
|
|
)
|
|
incomingLeavesIn, incomingLeavesOut :=
|
|
UnboundedChan[*protobufs.HypergraphComparison](
|
|
cancel,
|
|
"client incoming",
|
|
)
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := stream.Recv()
|
|
if err == io.EOF {
|
|
// hg.logger.Debug("stream closed by sender")
|
|
cancel()
|
|
close(incomingQueriesIn)
|
|
close(incomingResponsesIn)
|
|
close(incomingLeavesIn)
|
|
return
|
|
}
|
|
if err != nil {
|
|
// hg.logger.Debug("error from stream", zap.Error(err))
|
|
cancel()
|
|
close(incomingQueriesIn)
|
|
close(incomingResponsesIn)
|
|
close(incomingLeavesIn)
|
|
return
|
|
}
|
|
if msg == nil {
|
|
continue
|
|
}
|
|
switch m := msg.Payload.(type) {
|
|
case *protobufs.HypergraphComparison_LeafData:
|
|
incomingLeavesIn <- msg
|
|
case *protobufs.HypergraphComparison_Metadata:
|
|
incomingLeavesIn <- msg
|
|
case *protobufs.HypergraphComparison_Query:
|
|
incomingQueriesIn <- m.Query
|
|
case *protobufs.HypergraphComparison_Response:
|
|
incomingResponsesIn <- m.Response
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
|
|
manager := &streamManager{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
logger: hg.logger,
|
|
stream: stream,
|
|
hypergraphStore: hg.store,
|
|
localTree: set.GetTree(),
|
|
localSet: set,
|
|
lastSent: time.Now(),
|
|
}
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
err := manager.walk(
|
|
branchInfo.Path,
|
|
branchInfo,
|
|
response,
|
|
incomingLeavesOut,
|
|
incomingQueriesOut,
|
|
incomingResponsesOut,
|
|
true,
|
|
false,
|
|
)
|
|
if err != nil {
|
|
hg.logger.Debug("error while syncing", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
hg.logger.Info(
|
|
"hypergraph root commit",
|
|
zap.String("root", hex.EncodeToString(set.GetTree().Commit(false))),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (hg *HypergraphCRDT) GetChildrenForPath(
|
|
ctx context.Context,
|
|
request *protobufs.GetChildrenForPathRequest,
|
|
) (*protobufs.GetChildrenForPathResponse, error) {
|
|
if len(request.ShardKey) != 35 {
|
|
return nil, errors.New("invalid shard key")
|
|
}
|
|
|
|
shardKey := tries.ShardKey{
|
|
L1: [3]byte(request.ShardKey[:3]),
|
|
L2: [32]byte(request.ShardKey[3:]),
|
|
}
|
|
|
|
var set hypergraph.IdSet
|
|
switch request.PhaseSet {
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS:
|
|
set = hg.getVertexAddsSet(shardKey)
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES:
|
|
set = hg.getVertexRemovesSet(shardKey)
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS:
|
|
set = hg.getHyperedgeAddsSet(shardKey)
|
|
case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES:
|
|
set = hg.getHyperedgeRemovesSet(shardKey)
|
|
}
|
|
|
|
path := []int{}
|
|
for _, p := range request.Path {
|
|
path = append(path, int(p))
|
|
}
|
|
|
|
segments, err := getSegments(
|
|
set.GetTree(),
|
|
path,
|
|
)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "get children for path")
|
|
}
|
|
|
|
response := &protobufs.GetChildrenForPathResponse{
|
|
PathSegments: []*protobufs.TreePathSegments{},
|
|
}
|
|
|
|
for _, segment := range segments {
|
|
pathSegments := &protobufs.TreePathSegments{}
|
|
for i, seg := range segment {
|
|
if seg == nil {
|
|
continue
|
|
}
|
|
switch t := seg.(type) {
|
|
case *tries.LazyVectorCommitmentBranchNode:
|
|
pathSegments.Segments = append(
|
|
pathSegments.Segments,
|
|
&protobufs.TreePathSegment{
|
|
Index: uint32(i),
|
|
Segment: &protobufs.TreePathSegment_Branch{
|
|
Branch: &protobufs.TreePathBranch{
|
|
Prefix: toUint32Slice(t.Prefix),
|
|
Commitment: t.Commitment,
|
|
Size: t.Size.Bytes(),
|
|
LeafCount: uint64(t.LeafCount),
|
|
LongestBranch: uint32(t.LongestBranch),
|
|
FullPrefix: toUint32Slice(t.FullPrefix),
|
|
},
|
|
},
|
|
},
|
|
)
|
|
case *tries.LazyVectorCommitmentLeafNode:
|
|
var data []byte
|
|
tree, err := hg.store.LoadVertexTree(t.Key)
|
|
if err == nil {
|
|
data, err = tries.SerializeNonLazyTree(tree)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "get children for path")
|
|
}
|
|
}
|
|
pathSegments.Segments = append(
|
|
pathSegments.Segments,
|
|
&protobufs.TreePathSegment{
|
|
Index: uint32(i),
|
|
Segment: &protobufs.TreePathSegment_Leaf{
|
|
Leaf: &protobufs.TreePathLeaf{
|
|
Key: t.Key,
|
|
Value: data,
|
|
HashTarget: t.HashTarget,
|
|
Commitment: t.Commitment,
|
|
Size: t.Size.Bytes(),
|
|
},
|
|
},
|
|
},
|
|
)
|
|
}
|
|
}
|
|
|
|
response.PathSegments = append(response.PathSegments, pathSegments)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
func toUint32Slice(s []int) []uint32 {
|
|
o := []uint32{}
|
|
for _, p := range s {
|
|
o = append(o, uint32(p))
|
|
}
|
|
return o
|
|
}
|
|
|
|
func toInt32Slice(s []int) []int32 {
|
|
o := []int32{}
|
|
for _, p := range s {
|
|
o = append(o, int32(p))
|
|
}
|
|
return o
|
|
}
|
|
|
|
func isPrefix(prefix []int, path []int) bool {
|
|
if len(prefix) > len(path) {
|
|
return false
|
|
}
|
|
|
|
for i := range prefix {
|
|
if prefix[i] != path[i] {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func getChildSegments(
|
|
setType string,
|
|
phaseType string,
|
|
shardKey tries.ShardKey,
|
|
node *tries.LazyVectorCommitmentBranchNode,
|
|
path []int,
|
|
) ([64]tries.LazyVectorCommitmentNode, int) {
|
|
nodes := [64]tries.LazyVectorCommitmentNode{}
|
|
index := 0
|
|
for i, child := range node.Children {
|
|
if child == nil {
|
|
var err error
|
|
prefix := slices.Concat(node.FullPrefix, []int{i})
|
|
child, err = node.Store.GetNodeByPath(
|
|
setType,
|
|
phaseType,
|
|
shardKey,
|
|
prefix,
|
|
)
|
|
if err != nil && !strings.Contains(err.Error(), "item not found") {
|
|
panic(err)
|
|
}
|
|
|
|
if isPrefix(prefix, path) {
|
|
index = i
|
|
}
|
|
}
|
|
|
|
if child != nil {
|
|
nodes[i] = child
|
|
}
|
|
}
|
|
|
|
return nodes, index
|
|
}
|
|
|
|
func getSegments(tree *tries.LazyVectorCommitmentTree, path []int) (
|
|
[][64]tries.LazyVectorCommitmentNode,
|
|
error,
|
|
) {
|
|
segments := [][64]tries.LazyVectorCommitmentNode{
|
|
{tree.Root},
|
|
}
|
|
|
|
node := tree.Root
|
|
for node != nil {
|
|
switch t := node.(type) {
|
|
case *tries.LazyVectorCommitmentBranchNode:
|
|
segment, index := getChildSegments(
|
|
tree.SetType,
|
|
tree.PhaseType,
|
|
tree.ShardKey,
|
|
t,
|
|
path,
|
|
)
|
|
segments = append(segments, segment)
|
|
node = segment[index]
|
|
case *tries.LazyVectorCommitmentLeafNode:
|
|
node = nil
|
|
}
|
|
}
|
|
|
|
return segments, nil
|
|
}
|
|
|
|
type streamManager struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
logger *zap.Logger
|
|
stream hypergraph.HyperStream
|
|
hypergraphStore tries.TreeBackingStore
|
|
localTree *tries.LazyVectorCommitmentTree
|
|
localSet hypergraph.IdSet
|
|
lastSent time.Time
|
|
}
|
|
|
|
// sendLeafData builds a LeafData message (with the full leaf data) for the
|
|
// node at the given path in the local tree and sends it over the stream.
|
|
func (s *streamManager) sendLeafData(
|
|
path []int32,
|
|
incomingLeaves <-chan *protobufs.HypergraphComparison,
|
|
) error {
|
|
send := func(leaf *tries.LazyVectorCommitmentLeafNode) error {
|
|
update := &protobufs.LeafData{
|
|
Key: leaf.Key,
|
|
Value: leaf.Value,
|
|
HashTarget: leaf.HashTarget,
|
|
Size: leaf.Size.FillBytes(make([]byte, 32)),
|
|
}
|
|
tree, err := s.hypergraphStore.LoadVertexTree(leaf.Key)
|
|
if err == nil {
|
|
b, err := tries.SerializeNonLazyTree(tree)
|
|
if err != nil {
|
|
return errors.Wrap(err, "send leaf data")
|
|
}
|
|
update.UnderlyingData = b
|
|
}
|
|
msg := &protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_LeafData{
|
|
LeafData: update,
|
|
},
|
|
}
|
|
|
|
// s.logger.Info(
|
|
// "sending leaf data",
|
|
// zap.String("key", hex.EncodeToString(leaf.Key)),
|
|
// )
|
|
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return s.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
err = s.stream.Send(msg)
|
|
if err != nil {
|
|
return errors.Wrap(err, "send leaf data")
|
|
}
|
|
|
|
s.lastSent = time.Now()
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return s.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
intPath := []int{}
|
|
for _, i := range path {
|
|
intPath = append(intPath, int(i))
|
|
}
|
|
|
|
node, err := s.localTree.GetByPath(intPath)
|
|
if err != nil {
|
|
s.logger.Error("could not get by path", zap.Error(err))
|
|
if err := s.stream.Send(&protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Metadata{
|
|
Metadata: &protobufs.HypersyncMetadata{Leaves: 0},
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if node == nil {
|
|
// s.logger.Info("no node, sending 0 leaves")
|
|
if err := s.stream.Send(&protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Metadata{
|
|
Metadata: &protobufs.HypersyncMetadata{Leaves: 0},
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode)
|
|
count := uint64(0)
|
|
if !ok {
|
|
children := tries.GetAllLeaves(
|
|
s.localTree.SetType,
|
|
s.localTree.PhaseType,
|
|
s.localTree.ShardKey,
|
|
node,
|
|
)
|
|
for _, child := range children {
|
|
if child == nil {
|
|
continue
|
|
}
|
|
count++
|
|
}
|
|
// s.logger.Info("sending set of leaves", zap.Uint64("leaf_count", count))
|
|
if err := s.stream.Send(&protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Metadata{
|
|
Metadata: &protobufs.HypersyncMetadata{Leaves: count},
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
for _, child := range children {
|
|
if child == nil {
|
|
continue
|
|
}
|
|
|
|
if err := send(child); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
} else {
|
|
count = 1
|
|
// s.logger.Info("sending one leaf", zap.Uint64("leaf_count", count))
|
|
if err := s.stream.Send(&protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Metadata{
|
|
Metadata: &protobufs.HypersyncMetadata{Leaves: count},
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
if err := send(leaf); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return errors.Wrap(
|
|
errors.New("context canceled"),
|
|
"send leaf data",
|
|
)
|
|
case msg, ok := <-incomingLeaves:
|
|
if !ok {
|
|
return errors.Wrap(
|
|
errors.New("channel closed"),
|
|
"send leaf data",
|
|
)
|
|
}
|
|
|
|
switch msg.Payload.(type) {
|
|
case *protobufs.HypergraphComparison_Metadata:
|
|
expectedLeaves := msg.GetMetadata().Leaves
|
|
if expectedLeaves != count {
|
|
return errors.Wrap(
|
|
errors.New("did not match"),
|
|
"send leaf data",
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return errors.Wrap(
|
|
errors.New("invalid message"),
|
|
"send leaf data",
|
|
)
|
|
case <-time.After(30 * time.Second):
|
|
return errors.Wrap(
|
|
errors.New("timed out"),
|
|
"send leaf data",
|
|
)
|
|
}
|
|
}
|
|
|
|
// getNodeAtPath traverses the tree along the provided nibble path. It returns
|
|
// the node found (or nil if not found). The depth argument is used for internal
|
|
// recursion.
|
|
func getNodeAtPath(
|
|
logger *zap.Logger,
|
|
setType string,
|
|
phaseType string,
|
|
shardKey tries.ShardKey,
|
|
node tries.LazyVectorCommitmentNode,
|
|
path []int32,
|
|
depth int,
|
|
) tries.LazyVectorCommitmentNode {
|
|
if node == nil {
|
|
return nil
|
|
}
|
|
if len(path) == 0 {
|
|
return node
|
|
}
|
|
|
|
switch n := node.(type) {
|
|
case *tries.LazyVectorCommitmentLeafNode:
|
|
return node
|
|
case *tries.LazyVectorCommitmentBranchNode:
|
|
// Check that the branch's prefix matches the beginning of the query path.
|
|
if len(path) < len(n.Prefix) {
|
|
return nil
|
|
}
|
|
|
|
for i, nib := range n.Prefix {
|
|
if int32(nib) != path[i] {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Remove the prefix portion from the path.
|
|
remainder := path[len(n.Prefix):]
|
|
if len(remainder) == 0 {
|
|
return node
|
|
}
|
|
|
|
// The first element of the remainder selects the child.
|
|
childIndex := remainder[0]
|
|
if int(childIndex) < 0 || int(childIndex) >= len(n.Children) {
|
|
return nil
|
|
}
|
|
|
|
child, err := n.Store.GetNodeByPath(
|
|
setType,
|
|
phaseType,
|
|
shardKey,
|
|
slices.Concat(n.FullPrefix, []int{int(childIndex)}),
|
|
)
|
|
if err != nil && !strings.Contains(err.Error(), "item not found") {
|
|
logger.Panic("failed to get node by path", zap.Error(err))
|
|
}
|
|
|
|
if child == nil {
|
|
return nil
|
|
}
|
|
|
|
return getNodeAtPath(
|
|
logger,
|
|
setType,
|
|
phaseType,
|
|
shardKey,
|
|
child,
|
|
remainder[1:],
|
|
depth+len(n.Prefix)+1,
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getBranchInfoFromTree looks up the node at the given path in the local tree,
|
|
// computes its commitment, and (if it is a branch) collects its immediate
|
|
// children's commitments.
|
|
func getBranchInfoFromTree(
|
|
logger *zap.Logger,
|
|
tree *tries.LazyVectorCommitmentTree,
|
|
path []int32,
|
|
) (
|
|
*protobufs.HypergraphComparisonResponse,
|
|
error,
|
|
) {
|
|
node := getNodeAtPath(
|
|
logger,
|
|
tree.SetType,
|
|
tree.PhaseType,
|
|
tree.ShardKey,
|
|
tree.Root,
|
|
path,
|
|
0,
|
|
)
|
|
if node == nil {
|
|
return &protobufs.HypergraphComparisonResponse{
|
|
Path: path,
|
|
Commitment: []byte{},
|
|
IsRoot: len(path) == 0,
|
|
}, nil
|
|
}
|
|
|
|
intpath := []int{}
|
|
for _, p := range path {
|
|
intpath = append(intpath, int(p))
|
|
}
|
|
|
|
node = ensureCommittedNode(logger, tree, intpath, node)
|
|
|
|
branchInfo := &protobufs.HypergraphComparisonResponse{
|
|
Path: path,
|
|
IsRoot: len(path) == 0,
|
|
}
|
|
|
|
if branch, ok := node.(*tries.LazyVectorCommitmentBranchNode); ok {
|
|
branchInfo.Commitment = branch.Commitment
|
|
if len(branch.Commitment) == 0 {
|
|
panic("branch cannot have no commitment")
|
|
}
|
|
|
|
for _, p := range branch.Prefix {
|
|
branchInfo.Path = append(branchInfo.Path, int32(p))
|
|
}
|
|
for i := 0; i < len(branch.Children); i++ {
|
|
child := branch.Children[i]
|
|
if child == nil {
|
|
var err error
|
|
child, err = branch.Store.GetNodeByPath(
|
|
tree.SetType,
|
|
tree.PhaseType,
|
|
tree.ShardKey,
|
|
slices.Concat(branch.FullPrefix, []int{i}),
|
|
)
|
|
if err != nil && !strings.Contains(err.Error(), "item not found") {
|
|
logger.Panic("failed to get node by path", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
childPath := slices.Concat(branch.FullPrefix, []int{i})
|
|
child = ensureCommittedNode(logger, tree, childPath, child)
|
|
|
|
if child != nil {
|
|
var childCommit []byte
|
|
if childB, ok := child.(*tries.LazyVectorCommitmentBranchNode); ok {
|
|
childCommit = childB.Commitment
|
|
} else if childL, ok := child.(*tries.LazyVectorCommitmentLeafNode); ok {
|
|
childCommit = childL.Commitment
|
|
}
|
|
|
|
if len(childCommit) == 0 {
|
|
panic("cannot have non-committed child")
|
|
}
|
|
branchInfo.Children = append(
|
|
branchInfo.Children,
|
|
&protobufs.BranchChild{
|
|
Index: int32(i),
|
|
Commitment: childCommit,
|
|
},
|
|
)
|
|
}
|
|
}
|
|
} else if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok {
|
|
branchInfo.Commitment = leaf.Commitment
|
|
if len(branchInfo.Commitment) == 0 {
|
|
panic("leaf cannot have no commitment")
|
|
}
|
|
}
|
|
return branchInfo, nil
|
|
}
|
|
|
|
func ensureCommittedNode(
|
|
logger *zap.Logger,
|
|
tree *tries.LazyVectorCommitmentTree,
|
|
path []int,
|
|
node tries.LazyVectorCommitmentNode,
|
|
) tries.LazyVectorCommitmentNode {
|
|
if node == nil {
|
|
return nil
|
|
}
|
|
|
|
hasCommit := func(commitment []byte) bool {
|
|
return len(commitment) != 0
|
|
}
|
|
|
|
switch n := node.(type) {
|
|
case *tries.LazyVectorCommitmentBranchNode:
|
|
if hasCommit(n.Commitment) {
|
|
return node
|
|
}
|
|
case *tries.LazyVectorCommitmentLeafNode:
|
|
if hasCommit(n.Commitment) {
|
|
return node
|
|
}
|
|
default:
|
|
return node
|
|
}
|
|
|
|
reloaded, err := tree.Store.GetNodeByPath(
|
|
tree.SetType,
|
|
tree.PhaseType,
|
|
tree.ShardKey,
|
|
path,
|
|
)
|
|
if err != nil && !strings.Contains(err.Error(), "item not found") {
|
|
logger.Panic("failed to reload node by path", zap.Error(err))
|
|
}
|
|
if reloaded != nil {
|
|
return reloaded
|
|
}
|
|
|
|
return node
|
|
}
|
|
|
|
// isLeaf infers whether a HypergraphComparisonResponse message represents a
|
|
// leaf node.
|
|
func isLeaf(info *protobufs.HypergraphComparisonResponse) bool {
|
|
return len(info.Children) == 0
|
|
}
|
|
|
|
func queryNext(
|
|
ctx context.Context,
|
|
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
|
|
stream hypergraph.HyperStream,
|
|
path []int32,
|
|
) (
|
|
*protobufs.HypergraphComparisonResponse,
|
|
error,
|
|
) {
|
|
if err := stream.Send(&protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Query{
|
|
Query: &protobufs.HypergraphComparisonQuery{
|
|
Path: path,
|
|
IncludeLeafData: true,
|
|
},
|
|
},
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, errors.Wrap(
|
|
errors.New("context canceled"),
|
|
"handle query",
|
|
)
|
|
case resp, ok := <-incomingResponses:
|
|
if !ok {
|
|
return nil, errors.Wrap(
|
|
errors.New("channel closed"),
|
|
"handle query",
|
|
)
|
|
}
|
|
return resp, nil
|
|
case <-time.After(30 * time.Second):
|
|
return nil, errors.Wrap(
|
|
errors.New("timed out"),
|
|
"handle query",
|
|
)
|
|
}
|
|
}
|
|
|
|
func (s *streamManager) handleLeafData(
|
|
ctx context.Context,
|
|
incomingLeaves <-chan *protobufs.HypergraphComparison,
|
|
) error {
|
|
expectedLeaves := uint64(0)
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.Wrap(
|
|
errors.New("context canceled"),
|
|
"handle leaf data",
|
|
)
|
|
case msg, ok := <-incomingLeaves:
|
|
if !ok {
|
|
return errors.Wrap(
|
|
errors.New("channel closed"),
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
|
|
switch msg.Payload.(type) {
|
|
case *protobufs.HypergraphComparison_LeafData:
|
|
return errors.Wrap(
|
|
errors.New("invalid message"),
|
|
"handle leaf data",
|
|
)
|
|
case *protobufs.HypergraphComparison_Metadata:
|
|
expectedLeaves = msg.GetMetadata().Leaves
|
|
}
|
|
case <-time.After(30 * time.Second):
|
|
return errors.Wrap(
|
|
errors.New("timed out"),
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
|
|
// s.logger.Info("expecting leaves", zap.Uint64("count", expectedLeaves))
|
|
|
|
var txn tries.TreeBackingStoreTransaction
|
|
var err error
|
|
for i := uint64(0); i < expectedLeaves; i++ {
|
|
if i%100 == 0 {
|
|
if txn != nil {
|
|
if err := txn.Commit(); err != nil {
|
|
return errors.Wrap(
|
|
err,
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
}
|
|
txn, err = s.hypergraphStore.NewTransaction(false)
|
|
if err != nil {
|
|
return errors.Wrap(
|
|
err,
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.Wrap(
|
|
errors.New("context canceled"),
|
|
"handle leaf data",
|
|
)
|
|
case msg, ok := <-incomingLeaves:
|
|
if !ok {
|
|
return errors.Wrap(
|
|
errors.New("channel closed"),
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
|
|
var remoteUpdate *protobufs.LeafData
|
|
switch msg.Payload.(type) {
|
|
case *protobufs.HypergraphComparison_Metadata:
|
|
return errors.Wrap(
|
|
errors.New("invalid message"),
|
|
"handle leaf data",
|
|
)
|
|
case *protobufs.HypergraphComparison_LeafData:
|
|
remoteUpdate = msg.GetLeafData()
|
|
}
|
|
|
|
// s.logger.Info(
|
|
// "received leaf data",
|
|
// zap.String("key", hex.EncodeToString(remoteUpdate.Key)),
|
|
// )
|
|
|
|
theirs := AtomFromBytes(remoteUpdate.Value)
|
|
if len(remoteUpdate.UnderlyingData) != 0 {
|
|
tree, err := tries.DeserializeNonLazyTree(remoteUpdate.UnderlyingData)
|
|
if err != nil {
|
|
s.logger.Error("server returned invalid tree", zap.Error(err))
|
|
txn.Abort()
|
|
return err
|
|
}
|
|
|
|
err = s.localSet.ValidateTree(
|
|
remoteUpdate.Key,
|
|
remoteUpdate.Value,
|
|
tree,
|
|
)
|
|
if err != nil {
|
|
s.logger.Error("server returned invalid tree", zap.Error(err))
|
|
txn.Abort()
|
|
return err
|
|
}
|
|
|
|
err = s.hypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree)
|
|
if err != nil {
|
|
txn.Abort()
|
|
return err
|
|
}
|
|
}
|
|
|
|
err := s.localSet.Add(txn, theirs)
|
|
if err != nil {
|
|
s.logger.Error("error while saving", zap.Error(err))
|
|
return errors.Wrap(
|
|
err,
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
case <-time.After(30 * time.Second):
|
|
return errors.Wrap(
|
|
errors.New("timed out"),
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
}
|
|
|
|
if txn != nil {
|
|
if err := txn.Commit(); err != nil {
|
|
return errors.Wrap(
|
|
err,
|
|
"handle leaf data",
|
|
)
|
|
}
|
|
}
|
|
|
|
if err := s.stream.Send(&protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Metadata{
|
|
Metadata: &protobufs.HypersyncMetadata{Leaves: expectedLeaves},
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func handleQueryNext(
|
|
logger *zap.Logger,
|
|
ctx context.Context,
|
|
incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
|
|
stream hypergraph.HyperStream,
|
|
localTree *tries.LazyVectorCommitmentTree,
|
|
path []int32,
|
|
) (
|
|
*protobufs.HypergraphComparisonResponse,
|
|
error,
|
|
) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, errors.Wrap(
|
|
errors.New("context canceled"),
|
|
"handle query next",
|
|
)
|
|
case query, ok := <-incomingQueries:
|
|
if !ok {
|
|
return nil, errors.Wrap(
|
|
errors.New("channel closed"),
|
|
"handle query next",
|
|
)
|
|
}
|
|
|
|
if slices.Compare(query.Path, path) != 0 {
|
|
return nil, errors.Wrap(
|
|
errors.New("invalid query received"),
|
|
"handle query next",
|
|
)
|
|
}
|
|
|
|
branchInfo, err := getBranchInfoFromTree(logger, localTree, path)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "handle query next")
|
|
}
|
|
|
|
resp := &protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Response{
|
|
Response: branchInfo,
|
|
},
|
|
}
|
|
|
|
if err := stream.Send(resp); err != nil {
|
|
return nil, errors.Wrap(err, "handle query next")
|
|
}
|
|
|
|
return branchInfo, nil
|
|
case <-time.After(30 * time.Second):
|
|
return nil, errors.Wrap(
|
|
errors.New("timed out"),
|
|
"handle query next",
|
|
)
|
|
}
|
|
}
|
|
|
|
func descendIndex(
|
|
logger *zap.Logger,
|
|
ctx context.Context,
|
|
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
|
|
stream hypergraph.HyperStream,
|
|
localTree *tries.LazyVectorCommitmentTree,
|
|
path []int32,
|
|
) (
|
|
*protobufs.HypergraphComparisonResponse,
|
|
*protobufs.HypergraphComparisonResponse,
|
|
error,
|
|
) {
|
|
branchInfo, err := getBranchInfoFromTree(logger, localTree, path)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrap(err, "descend index")
|
|
}
|
|
|
|
resp := &protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Response{
|
|
Response: branchInfo,
|
|
},
|
|
}
|
|
|
|
if err := stream.Send(resp); err != nil {
|
|
return nil, nil, errors.Wrap(err, "descend index")
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, nil, errors.Wrap(
|
|
errors.New("context canceled"),
|
|
"handle query next",
|
|
)
|
|
case resp, ok := <-incomingResponses:
|
|
if !ok {
|
|
return nil, nil, errors.Wrap(
|
|
errors.New("channel closed"),
|
|
"descend index",
|
|
)
|
|
}
|
|
|
|
if slices.Compare(branchInfo.Path, resp.Path) != 0 {
|
|
return nil, nil, errors.Wrap(
|
|
fmt.Errorf(
|
|
"invalid path received: %v, expected: %v",
|
|
resp.Path,
|
|
branchInfo.Path,
|
|
),
|
|
"descend index",
|
|
)
|
|
}
|
|
|
|
return branchInfo, resp, nil
|
|
case <-time.After(30 * time.Second):
|
|
return nil, nil, errors.Wrap(
|
|
errors.New("timed out"),
|
|
"descend index",
|
|
)
|
|
}
|
|
}
|
|
|
|
func packPath(path []int32) []byte {
|
|
b := []byte{}
|
|
for _, p := range path {
|
|
b = append(b, byte(p))
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (s *streamManager) walk(
|
|
path []int32,
|
|
lnode, rnode *protobufs.HypergraphComparisonResponse,
|
|
incomingLeaves <-chan *protobufs.HypergraphComparison,
|
|
incomingQueries <-chan *protobufs.HypergraphComparisonQuery,
|
|
incomingResponses <-chan *protobufs.HypergraphComparisonResponse,
|
|
init bool,
|
|
isServer bool,
|
|
) error {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return s.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
pathString := zap.String("path", hex.EncodeToString(packPath(path)))
|
|
|
|
if bytes.Equal(lnode.Commitment, rnode.Commitment) {
|
|
// s.logger.Debug(
|
|
// "commitments match",
|
|
// pathString,
|
|
// zap.String("commitment", hex.EncodeToString(lnode.Commitment)),
|
|
// )
|
|
return nil
|
|
}
|
|
|
|
if isLeaf(lnode) && isLeaf(rnode) && !init {
|
|
return nil
|
|
}
|
|
|
|
if isLeaf(rnode) || isLeaf(lnode) {
|
|
// s.logger.Debug("leaf/branch mismatch at path", pathString)
|
|
if isServer {
|
|
err := s.sendLeafData(
|
|
path,
|
|
incomingLeaves,
|
|
)
|
|
return errors.Wrap(err, "walk")
|
|
} else {
|
|
err := s.handleLeafData(s.ctx, incomingLeaves)
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
|
|
lpref := lnode.Path
|
|
rpref := rnode.Path
|
|
if len(lpref) != len(rpref) {
|
|
// s.logger.Debug(
|
|
// "prefix length mismatch",
|
|
// zap.Int("local_prefix", len(lpref)),
|
|
// zap.Int("remote_prefix", len(rpref)),
|
|
// pathString,
|
|
// )
|
|
if len(lpref) > len(rpref) {
|
|
// s.logger.Debug("local prefix longer, traversing remote to path", pathString)
|
|
traverse := lpref[len(rpref)-1:]
|
|
rtrav := rnode
|
|
traversePath := append([]int32{}, rpref...)
|
|
for _, nibble := range traverse {
|
|
// s.logger.Debug("attempting remote traversal step")
|
|
for _, child := range rtrav.Children {
|
|
if child.Index == nibble {
|
|
// s.logger.Debug("sending query")
|
|
traversePath = append(traversePath, child.Index)
|
|
var err error
|
|
rtrav, err = queryNext(
|
|
s.ctx,
|
|
incomingResponses,
|
|
s.stream,
|
|
traversePath,
|
|
)
|
|
if err != nil {
|
|
s.logger.Error("query failed", zap.Error(err))
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if rtrav == nil {
|
|
// s.logger.Debug("traversal could not reach path")
|
|
if isServer {
|
|
err := s.sendLeafData(
|
|
lpref,
|
|
incomingLeaves,
|
|
)
|
|
return errors.Wrap(err, "walk")
|
|
} else {
|
|
err := s.handleLeafData(s.ctx, incomingLeaves)
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
}
|
|
// s.logger.Debug("traversal completed, performing walk", pathString)
|
|
return s.walk(
|
|
path,
|
|
lnode,
|
|
rtrav,
|
|
incomingLeaves,
|
|
incomingQueries,
|
|
incomingResponses,
|
|
false,
|
|
isServer,
|
|
)
|
|
} else {
|
|
// s.logger.Debug("remote prefix longer, traversing local to path", pathString)
|
|
traverse := rpref[len(lpref)-1:]
|
|
ltrav := lnode
|
|
traversedPath := append([]int32{}, lnode.Path...)
|
|
|
|
for _, nibble := range traverse {
|
|
// s.logger.Debug("attempting local traversal step")
|
|
preTraversal := append([]int32{}, traversedPath...)
|
|
for _, child := range ltrav.Children {
|
|
if child.Index == nibble {
|
|
traversedPath = append(traversedPath, nibble)
|
|
var err error
|
|
// s.logger.Debug("expecting query")
|
|
ltrav, err = handleQueryNext(
|
|
s.logger,
|
|
s.ctx,
|
|
incomingQueries,
|
|
s.stream,
|
|
s.localTree,
|
|
traversedPath,
|
|
)
|
|
if err != nil {
|
|
s.logger.Error("expect failed", zap.Error(err))
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
|
|
if ltrav == nil {
|
|
// s.logger.Debug("traversal could not reach path")
|
|
if isServer {
|
|
err := s.sendLeafData(
|
|
preTraversal,
|
|
incomingLeaves,
|
|
)
|
|
return errors.Wrap(err, "walk")
|
|
} else {
|
|
err := s.handleLeafData(s.ctx, incomingLeaves)
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
} else {
|
|
// s.logger.Debug(
|
|
// "known missing branch",
|
|
// zap.String(
|
|
// "path",
|
|
// hex.EncodeToString(
|
|
// packPath(
|
|
// append(append([]int32{}, preTraversal...), child.Index),
|
|
// ),
|
|
// ),
|
|
// ),
|
|
// )
|
|
if isServer {
|
|
if err := s.sendLeafData(
|
|
append(append([]int32{}, preTraversal...), child.Index),
|
|
incomingLeaves,
|
|
); err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
} else {
|
|
err := s.handleLeafData(s.ctx, incomingLeaves)
|
|
if err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// s.logger.Debug("traversal completed, performing walk", pathString)
|
|
return s.walk(
|
|
path,
|
|
ltrav,
|
|
rnode,
|
|
incomingLeaves,
|
|
incomingQueries,
|
|
incomingResponses,
|
|
false,
|
|
isServer,
|
|
)
|
|
}
|
|
} else {
|
|
if slices.Compare(lpref, rpref) == 0 {
|
|
// s.logger.Debug("prefixes match, diffing children")
|
|
for i := int32(0); i < 64; i++ {
|
|
// s.logger.Debug("checking branch", zap.Int32("branch", i))
|
|
var lchild *protobufs.BranchChild = nil
|
|
for _, lc := range lnode.Children {
|
|
if lc.Index == i {
|
|
// s.logger.Debug("local instance found", zap.Int32("branch", i))
|
|
|
|
lchild = lc
|
|
break
|
|
}
|
|
}
|
|
var rchild *protobufs.BranchChild = nil
|
|
for _, rc := range rnode.Children {
|
|
if rc.Index == i {
|
|
// s.logger.Debug("remote instance found", zap.Int32("branch", i))
|
|
|
|
rchild = rc
|
|
break
|
|
}
|
|
}
|
|
if (lchild != nil && rchild == nil) ||
|
|
(lchild == nil && rchild != nil) {
|
|
s.logger.Info("branch divergence", pathString)
|
|
if lchild != nil {
|
|
nextPath := append(
|
|
append([]int32{}, lpref...),
|
|
lchild.Index,
|
|
)
|
|
if isServer {
|
|
if err := s.sendLeafData(
|
|
nextPath,
|
|
incomingLeaves,
|
|
); err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
}
|
|
if rchild != nil {
|
|
if !isServer {
|
|
err := s.handleLeafData(s.ctx, incomingLeaves)
|
|
if err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
if lchild != nil {
|
|
nextPath := append(
|
|
append([]int32{}, lpref...),
|
|
lchild.Index,
|
|
)
|
|
lc, rc, err := descendIndex(
|
|
s.logger,
|
|
s.ctx,
|
|
incomingResponses,
|
|
s.stream,
|
|
s.localTree,
|
|
nextPath,
|
|
)
|
|
if err != nil {
|
|
// s.logger.Debug("incomplete branch descension", zap.Error(err))
|
|
if isServer {
|
|
if err := s.sendLeafData(
|
|
nextPath,
|
|
incomingLeaves,
|
|
); err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
} else {
|
|
err := s.handleLeafData(s.ctx, incomingLeaves)
|
|
if err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
if err = s.walk(
|
|
nextPath,
|
|
lc,
|
|
rc,
|
|
incomingLeaves,
|
|
incomingQueries,
|
|
incomingResponses,
|
|
false,
|
|
isServer,
|
|
); err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// s.logger.Debug("prefix mismatch on both sides", pathString)
|
|
if isServer {
|
|
if err := s.sendLeafData(
|
|
path,
|
|
incomingLeaves,
|
|
); err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
} else {
|
|
err := s.handleLeafData(s.ctx, incomingLeaves)
|
|
if err != nil {
|
|
return errors.Wrap(err, "walk")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// syncTreeServer implements the diff and sync logic on the
|
|
// server side. It sends the local root info, then processes incoming messages,
|
|
// and queues further queries as differences are detected.
|
|
func (hg *HypergraphCRDT) syncTreeServer(
|
|
stream protobufs.HypergraphComparisonService_HyperStreamServer,
|
|
snapshotStore tries.TreeBackingStore,
|
|
snapshotRoot []byte,
|
|
) error {
|
|
if len(snapshotRoot) != 0 {
|
|
hg.logger.Info(
|
|
"syncing with snapshot",
|
|
zap.String("root", hex.EncodeToString(snapshotRoot)),
|
|
)
|
|
} else {
|
|
hg.logger.Info("syncing with snapshot", zap.String("root", ""))
|
|
}
|
|
|
|
msg, err := stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
query := msg.GetQuery()
|
|
if query == nil {
|
|
return errors.New("client did not send valid initialization message")
|
|
}
|
|
|
|
hg.logger.Info("received initialization message")
|
|
|
|
if len(query.ShardKey) != 35 {
|
|
return errors.New("invalid shard key")
|
|
}
|
|
|
|
shardKey := tries.ShardKey{
|
|
L1: [3]byte(query.ShardKey[:3]),
|
|
L2: [32]byte(query.ShardKey[3:]),
|
|
}
|
|
|
|
idSet := hg.snapshotPhaseSet(shardKey, query.PhaseSet, snapshotStore)
|
|
if idSet == nil {
|
|
return errors.New("unsupported phase set")
|
|
}
|
|
|
|
branchInfo, err := getBranchInfoFromTree(
|
|
hg.logger,
|
|
idSet.GetTree(),
|
|
query.Path,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// hg.logger.Debug(
|
|
// "returning branch info",
|
|
// zap.String("commitment", hex.EncodeToString(branchInfo.Commitment)),
|
|
// zap.Int("children", len(branchInfo.Children)),
|
|
// zap.Int("path", len(branchInfo.Path)),
|
|
// )
|
|
|
|
resp := &protobufs.HypergraphComparison{
|
|
Payload: &protobufs.HypergraphComparison_Response{
|
|
Response: branchInfo,
|
|
},
|
|
}
|
|
|
|
if err := stream.Send(resp); err != nil {
|
|
return err
|
|
}
|
|
|
|
msg, err = stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
response := msg.GetResponse()
|
|
if response == nil {
|
|
return errors.New(
|
|
"client did not send valid initialization response message",
|
|
)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(stream.Context())
|
|
|
|
incomingQueriesIn, incomingQueriesOut :=
|
|
UnboundedChan[*protobufs.HypergraphComparisonQuery](
|
|
cancel,
|
|
"server incoming",
|
|
)
|
|
incomingResponsesIn, incomingResponsesOut :=
|
|
UnboundedChan[*protobufs.HypergraphComparisonResponse](
|
|
cancel,
|
|
"server incoming",
|
|
)
|
|
incomingLeavesIn, incomingLeavesOut :=
|
|
UnboundedChan[*protobufs.HypergraphComparison](
|
|
cancel,
|
|
"server incoming",
|
|
)
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := stream.Recv()
|
|
if err == io.EOF {
|
|
hg.logger.Info("server stream recv eof")
|
|
cancel()
|
|
close(incomingQueriesIn)
|
|
close(incomingResponsesIn)
|
|
return
|
|
}
|
|
if err != nil {
|
|
hg.logger.Info("server stream recv error", zap.Error(err))
|
|
cancel()
|
|
close(incomingQueriesIn)
|
|
close(incomingResponsesIn)
|
|
return
|
|
}
|
|
if msg == nil {
|
|
continue
|
|
}
|
|
switch m := msg.Payload.(type) {
|
|
case *protobufs.HypergraphComparison_LeafData:
|
|
hg.logger.Warn("received leaf from client, terminating")
|
|
cancel()
|
|
close(incomingQueriesIn)
|
|
close(incomingResponsesIn)
|
|
return
|
|
case *protobufs.HypergraphComparison_Metadata:
|
|
incomingLeavesIn <- msg
|
|
case *protobufs.HypergraphComparison_Query:
|
|
incomingQueriesIn <- m.Query
|
|
case *protobufs.HypergraphComparison_Response:
|
|
incomingResponsesIn <- m.Response
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
|
|
manager := &streamManager{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
logger: hg.logger,
|
|
stream: stream,
|
|
hypergraphStore: snapshotStore,
|
|
localTree: idSet.GetTree(),
|
|
lastSent: time.Now(),
|
|
}
|
|
go func() {
|
|
defer wg.Done()
|
|
err := manager.walk(
|
|
branchInfo.Path,
|
|
branchInfo,
|
|
response,
|
|
incomingLeavesOut,
|
|
incomingQueriesOut,
|
|
incomingResponsesOut,
|
|
true,
|
|
true,
|
|
)
|
|
if err != nil {
|
|
hg.logger.Error("error while syncing", zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func UnboundedChan[T any](
|
|
cancel context.CancelFunc,
|
|
purpose string,
|
|
) (chan<- T, <-chan T) {
|
|
in := make(chan T)
|
|
out := make(chan T)
|
|
go func() {
|
|
var queue []T
|
|
for {
|
|
var active chan T
|
|
var next T
|
|
if len(queue) > 0 {
|
|
active = out
|
|
next = queue[0]
|
|
}
|
|
select {
|
|
case msg, ok := <-in:
|
|
if !ok {
|
|
cancel()
|
|
close(out)
|
|
return
|
|
}
|
|
|
|
queue = append(queue, msg)
|
|
case active <- next:
|
|
queue = queue[1:]
|
|
}
|
|
}
|
|
}()
|
|
return in, out
|
|
}
|