package hypergraph

import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"io"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"go.uber.org/zap"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
)

// HyperStream is the gRPC method that handles synchronization.
//
// The calling peer is identified through the authentication provider and is
// allowed a single sync session at a time. The exchange itself runs against
// a snapshot acquired from the snapshot manager, which is released when the
// handler returns. The peer's sync status is updated after the exchange
// regardless of its outcome, and the error from the exchange (if any) is
// returned.
func (hg *HypergraphCRDT) HyperStream(
	stream protobufs.HypergraphComparisonService_HyperStreamServer,
) error {
	identity, err := hg.authenticationProvider.Identify(stream.Context())
	if err != nil {
		return errors.Wrap(err, "hyper stream")
	}

	sessionKey := identity.String()
	if !hg.syncController.TryEstablishSyncSession(sessionKey) {
		return errors.New("peer already syncing")
	}
	defer hg.syncController.EndSyncSession(sessionKey)

	snapshot := hg.snapshotMgr.acquire()
	if snapshot == nil {
		return errors.New("hypergraph snapshot unavailable")
	}
	defer hg.snapshotMgr.release(snapshot)

	// Hex-encoding an empty root yields the empty string, so one log call
	// covers both the populated and the empty case.
	snapshotRoot := snapshot.Root()
	hg.logger.Debug(
		"acquired snapshot",
		zap.String("root", hex.EncodeToString(snapshotRoot)),
	)

	syncErr := hg.syncTreeServer(stream, snapshot.Store(), snapshotRoot)

	status := &hypergraph.SyncInfo{
		Unreachable: false,
		LastSynced:  time.Now(),
	}
	hg.syncController.SetStatus(sessionKey, status)

	return syncErr
}

// Sync performs the tree diff and synchronization from the client side.
// The caller (e.g. the client) must initiate the diff from its root.
// After that, both sides exchange queries, branch info, and leaf updates until
// their local trees are synchronized.
func (hg *HypergraphCRDT) Sync( stream protobufs.HypergraphComparisonService_HyperStreamClient, shardKey tries.ShardKey, phaseSet protobufs.HypergraphPhaseSet, ) error { const localSyncKey = "local-sync" if !hg.syncController.TryEstablishSyncSession(localSyncKey) { return errors.New("local sync already in progress") } defer func() { hg.syncController.EndSyncSession(localSyncKey) }() hg.mu.Lock() defer hg.mu.Unlock() hg.logger.Info( "sending initialization message", zap.String( "shard_key", hex.EncodeToString(slices.Concat(shardKey.L1[:], shardKey.L2[:])), ), zap.Int("phase_set", int(phaseSet)), ) var set hypergraph.IdSet switch phaseSet { case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS: set = hg.getVertexAddsSet(shardKey) case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES: set = hg.getVertexRemovesSet(shardKey) case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS: set = hg.getHyperedgeAddsSet(shardKey) case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES: set = hg.getHyperedgeRemovesSet(shardKey) default: return errors.New("unsupported phase set") } path := hg.getCoveredPrefix() // Send initial query for path if err := stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ ShardKey: slices.Concat(shardKey.L1[:], shardKey.L2[:]), PhaseSet: phaseSet, Path: toInt32Slice(path), Commitment: set.GetTree().Commit(false), IncludeLeafData: false, }, }, }); err != nil { return err } // hg.logger.Debug("server waiting for initial query") msg, err := stream.Recv() if err != nil { hg.logger.Info("initial recv failed", zap.Error(err)) return err } response := msg.GetResponse() if response == nil { return errors.New( "server did not send valid initialization response message", ) } branchInfo, err := getBranchInfoFromTree( hg.logger, set.GetTree(), toInt32Slice(path), ) if err != nil { return err } resp := 
&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Response{ Response: branchInfo, }, } if err := stream.Send(resp); err != nil { return err } ctx, cancel := context.WithCancel(stream.Context()) incomingQueriesIn, incomingQueriesOut := UnboundedChan[*protobufs.HypergraphComparisonQuery]( cancel, "client incoming", ) incomingResponsesIn, incomingResponsesOut := UnboundedChan[*protobufs.HypergraphComparisonResponse]( cancel, "client incoming", ) incomingLeavesIn, incomingLeavesOut := UnboundedChan[*protobufs.HypergraphComparison]( cancel, "client incoming", ) go func() { for { msg, err := stream.Recv() if err == io.EOF { // hg.logger.Debug("stream closed by sender") cancel() close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) return } if err != nil { // hg.logger.Debug("error from stream", zap.Error(err)) cancel() close(incomingQueriesIn) close(incomingResponsesIn) close(incomingLeavesIn) return } if msg == nil { continue } switch m := msg.Payload.(type) { case *protobufs.HypergraphComparison_LeafData: incomingLeavesIn <- msg case *protobufs.HypergraphComparison_Metadata: incomingLeavesIn <- msg case *protobufs.HypergraphComparison_Query: incomingQueriesIn <- m.Query case *protobufs.HypergraphComparison_Response: incomingResponsesIn <- m.Response } } }() wg := sync.WaitGroup{} wg.Add(1) manager := &streamManager{ ctx: ctx, cancel: cancel, logger: hg.logger, stream: stream, hypergraphStore: hg.store, localTree: set.GetTree(), localSet: set, lastSent: time.Now(), } go func() { defer wg.Done() err := manager.walk( branchInfo.Path, branchInfo, response, incomingLeavesOut, incomingQueriesOut, incomingResponsesOut, true, false, ) if err != nil { hg.logger.Debug("error while syncing", zap.Error(err)) } }() wg.Wait() hg.logger.Info( "hypergraph root commit", zap.String("root", hex.EncodeToString(set.GetTree().Commit(false))), ) return nil } func (hg *HypergraphCRDT) GetChildrenForPath( ctx context.Context, request 
*protobufs.GetChildrenForPathRequest, ) (*protobufs.GetChildrenForPathResponse, error) { if len(request.ShardKey) != 35 { return nil, errors.New("invalid shard key") } shardKey := tries.ShardKey{ L1: [3]byte(request.ShardKey[:3]), L2: [32]byte(request.ShardKey[3:]), } var set hypergraph.IdSet switch request.PhaseSet { case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS: set = hg.getVertexAddsSet(shardKey) case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES: set = hg.getVertexRemovesSet(shardKey) case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS: set = hg.getHyperedgeAddsSet(shardKey) case protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES: set = hg.getHyperedgeRemovesSet(shardKey) } path := []int{} for _, p := range request.Path { path = append(path, int(p)) } segments, err := getSegments( set.GetTree(), path, ) if err != nil { return nil, errors.Wrap(err, "get children for path") } response := &protobufs.GetChildrenForPathResponse{ PathSegments: []*protobufs.TreePathSegments{}, } for _, segment := range segments { pathSegments := &protobufs.TreePathSegments{} for i, seg := range segment { if seg == nil { continue } switch t := seg.(type) { case *tries.LazyVectorCommitmentBranchNode: pathSegments.Segments = append( pathSegments.Segments, &protobufs.TreePathSegment{ Index: uint32(i), Segment: &protobufs.TreePathSegment_Branch{ Branch: &protobufs.TreePathBranch{ Prefix: toUint32Slice(t.Prefix), Commitment: t.Commitment, Size: t.Size.Bytes(), LeafCount: uint64(t.LeafCount), LongestBranch: uint32(t.LongestBranch), FullPrefix: toUint32Slice(t.FullPrefix), }, }, }, ) case *tries.LazyVectorCommitmentLeafNode: var data []byte tree, err := hg.store.LoadVertexTree(t.Key) if err == nil { data, err = tries.SerializeNonLazyTree(tree) if err != nil { return nil, errors.Wrap(err, "get children for path") } } pathSegments.Segments = append( pathSegments.Segments, &protobufs.TreePathSegment{ Index: uint32(i), 
Segment: &protobufs.TreePathSegment_Leaf{ Leaf: &protobufs.TreePathLeaf{ Key: t.Key, Value: data, HashTarget: t.HashTarget, Commitment: t.Commitment, Size: t.Size.Bytes(), }, }, }, ) } } response.PathSegments = append(response.PathSegments, pathSegments) } return response, nil } func toUint32Slice(s []int) []uint32 { o := []uint32{} for _, p := range s { o = append(o, uint32(p)) } return o } func toInt32Slice(s []int) []int32 { o := []int32{} for _, p := range s { o = append(o, int32(p)) } return o } func isPrefix(prefix []int, path []int) bool { if len(prefix) > len(path) { return false } for i := range prefix { if prefix[i] != path[i] { return false } } return true } func getChildSegments( setType string, phaseType string, shardKey tries.ShardKey, node *tries.LazyVectorCommitmentBranchNode, path []int, ) ([64]tries.LazyVectorCommitmentNode, int) { nodes := [64]tries.LazyVectorCommitmentNode{} index := 0 for i, child := range node.Children { if child == nil { var err error prefix := slices.Concat(node.FullPrefix, []int{i}) child, err = node.Store.GetNodeByPath( setType, phaseType, shardKey, prefix, ) if err != nil && !strings.Contains(err.Error(), "item not found") { panic(err) } if isPrefix(prefix, path) { index = i } } if child != nil { nodes[i] = child } } return nodes, index } func getSegments(tree *tries.LazyVectorCommitmentTree, path []int) ( [][64]tries.LazyVectorCommitmentNode, error, ) { segments := [][64]tries.LazyVectorCommitmentNode{ {tree.Root}, } node := tree.Root for node != nil { switch t := node.(type) { case *tries.LazyVectorCommitmentBranchNode: segment, index := getChildSegments( tree.SetType, tree.PhaseType, tree.ShardKey, t, path, ) segments = append(segments, segment) node = segment[index] case *tries.LazyVectorCommitmentLeafNode: node = nil } } return segments, nil } type streamManager struct { ctx context.Context cancel context.CancelFunc logger *zap.Logger stream hypergraph.HyperStream hypergraphStore tries.TreeBackingStore localTree 
*tries.LazyVectorCommitmentTree localSet hypergraph.IdSet lastSent time.Time } // sendLeafData builds a LeafData message (with the full leaf data) for the // node at the given path in the local tree and sends it over the stream. func (s *streamManager) sendLeafData( path []int32, incomingLeaves <-chan *protobufs.HypergraphComparison, ) error { send := func(leaf *tries.LazyVectorCommitmentLeafNode) error { update := &protobufs.LeafData{ Key: leaf.Key, Value: leaf.Value, HashTarget: leaf.HashTarget, Size: leaf.Size.FillBytes(make([]byte, 32)), } tree, err := s.hypergraphStore.LoadVertexTree(leaf.Key) if err == nil { b, err := tries.SerializeNonLazyTree(tree) if err != nil { return errors.Wrap(err, "send leaf data") } update.UnderlyingData = b } msg := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_LeafData{ LeafData: update, }, } // s.logger.Info( // "sending leaf data", // zap.String("key", hex.EncodeToString(leaf.Key)), // ) select { case <-s.ctx.Done(): return s.ctx.Err() default: } err = s.stream.Send(msg) if err != nil { return errors.Wrap(err, "send leaf data") } s.lastSent = time.Now() return nil } select { case <-s.ctx.Done(): return s.ctx.Err() default: } intPath := []int{} for _, i := range path { intPath = append(intPath, int(i)) } node, err := s.localTree.GetByPath(intPath) if err != nil { s.logger.Error("could not get by path", zap.Error(err)) if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: 0}, }, }); err != nil { return err } return nil } if node == nil { // s.logger.Info("no node, sending 0 leaves") if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: 0}, }, }); err != nil { return err } return nil } leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode) count := uint64(0) if !ok { children := tries.GetAllLeaves( 
s.localTree.SetType, s.localTree.PhaseType, s.localTree.ShardKey, node, ) for _, child := range children { if child == nil { continue } count++ } // s.logger.Info("sending set of leaves", zap.Uint64("leaf_count", count)) if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: count}, }, }); err != nil { return err } for _, child := range children { if child == nil { continue } if err := send(child); err != nil { return err } } } else { count = 1 // s.logger.Info("sending one leaf", zap.Uint64("leaf_count", count)) if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: count}, }, }); err != nil { return err } if err := send(leaf); err != nil { return err } } select { case <-s.ctx.Done(): return errors.Wrap( errors.New("context canceled"), "send leaf data", ) case msg, ok := <-incomingLeaves: if !ok { return errors.Wrap( errors.New("channel closed"), "send leaf data", ) } switch msg.Payload.(type) { case *protobufs.HypergraphComparison_Metadata: expectedLeaves := msg.GetMetadata().Leaves if expectedLeaves != count { return errors.Wrap( errors.New("did not match"), "send leaf data", ) } return nil } return errors.Wrap( errors.New("invalid message"), "send leaf data", ) case <-time.After(30 * time.Second): return errors.Wrap( errors.New("timed out"), "send leaf data", ) } } // getNodeAtPath traverses the tree along the provided nibble path. It returns // the node found (or nil if not found). The depth argument is used for internal // recursion. 
func getNodeAtPath( logger *zap.Logger, setType string, phaseType string, shardKey tries.ShardKey, node tries.LazyVectorCommitmentNode, path []int32, depth int, ) tries.LazyVectorCommitmentNode { if node == nil { return nil } if len(path) == 0 { return node } switch n := node.(type) { case *tries.LazyVectorCommitmentLeafNode: return node case *tries.LazyVectorCommitmentBranchNode: // Check that the branch's prefix matches the beginning of the query path. if len(path) < len(n.Prefix) { return nil } for i, nib := range n.Prefix { if int32(nib) != path[i] { return nil } } // Remove the prefix portion from the path. remainder := path[len(n.Prefix):] if len(remainder) == 0 { return node } // The first element of the remainder selects the child. childIndex := remainder[0] if int(childIndex) < 0 || int(childIndex) >= len(n.Children) { return nil } child, err := n.Store.GetNodeByPath( setType, phaseType, shardKey, slices.Concat(n.FullPrefix, []int{int(childIndex)}), ) if err != nil && !strings.Contains(err.Error(), "item not found") { logger.Panic("failed to get node by path", zap.Error(err)) } if child == nil { return nil } return getNodeAtPath( logger, setType, phaseType, shardKey, child, remainder[1:], depth+len(n.Prefix)+1, ) } return nil } // getBranchInfoFromTree looks up the node at the given path in the local tree, // computes its commitment, and (if it is a branch) collects its immediate // children's commitments. 
func getBranchInfoFromTree( logger *zap.Logger, tree *tries.LazyVectorCommitmentTree, path []int32, ) ( *protobufs.HypergraphComparisonResponse, error, ) { node := getNodeAtPath( logger, tree.SetType, tree.PhaseType, tree.ShardKey, tree.Root, path, 0, ) if node == nil { return &protobufs.HypergraphComparisonResponse{ Path: path, Commitment: []byte{}, IsRoot: len(path) == 0, }, nil } intpath := []int{} for _, p := range path { intpath = append(intpath, int(p)) } node = ensureCommittedNode(logger, tree, intpath, node) branchInfo := &protobufs.HypergraphComparisonResponse{ Path: path, IsRoot: len(path) == 0, } if branch, ok := node.(*tries.LazyVectorCommitmentBranchNode); ok { branchInfo.Commitment = branch.Commitment if len(branch.Commitment) == 0 { panic("branch cannot have no commitment") } for _, p := range branch.Prefix { branchInfo.Path = append(branchInfo.Path, int32(p)) } for i := 0; i < len(branch.Children); i++ { child := branch.Children[i] if child == nil { var err error child, err = branch.Store.GetNodeByPath( tree.SetType, tree.PhaseType, tree.ShardKey, slices.Concat(branch.FullPrefix, []int{i}), ) if err != nil && !strings.Contains(err.Error(), "item not found") { logger.Panic("failed to get node by path", zap.Error(err)) } } childPath := slices.Concat(branch.FullPrefix, []int{i}) child = ensureCommittedNode(logger, tree, childPath, child) if child != nil { var childCommit []byte if childB, ok := child.(*tries.LazyVectorCommitmentBranchNode); ok { childCommit = childB.Commitment } else if childL, ok := child.(*tries.LazyVectorCommitmentLeafNode); ok { childCommit = childL.Commitment } if len(childCommit) == 0 { panic("cannot have non-committed child") } branchInfo.Children = append( branchInfo.Children, &protobufs.BranchChild{ Index: int32(i), Commitment: childCommit, }, ) } } } else if leaf, ok := node.(*tries.LazyVectorCommitmentLeafNode); ok { branchInfo.Commitment = leaf.Commitment if len(branchInfo.Commitment) == 0 { panic("leaf cannot have no 
commitment") } } return branchInfo, nil } func ensureCommittedNode( logger *zap.Logger, tree *tries.LazyVectorCommitmentTree, path []int, node tries.LazyVectorCommitmentNode, ) tries.LazyVectorCommitmentNode { if node == nil { return nil } hasCommit := func(commitment []byte) bool { return len(commitment) != 0 } switch n := node.(type) { case *tries.LazyVectorCommitmentBranchNode: if hasCommit(n.Commitment) { return node } case *tries.LazyVectorCommitmentLeafNode: if hasCommit(n.Commitment) { return node } default: return node } reloaded, err := tree.Store.GetNodeByPath( tree.SetType, tree.PhaseType, tree.ShardKey, path, ) if err != nil && !strings.Contains(err.Error(), "item not found") { logger.Panic("failed to reload node by path", zap.Error(err)) } if reloaded != nil { return reloaded } return node } // isLeaf infers whether a HypergraphComparisonResponse message represents a // leaf node. func isLeaf(info *protobufs.HypergraphComparisonResponse) bool { return len(info.Children) == 0 } func queryNext( ctx context.Context, incomingResponses <-chan *protobufs.HypergraphComparisonResponse, stream hypergraph.HyperStream, path []int32, ) ( *protobufs.HypergraphComparisonResponse, error, ) { if err := stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Query{ Query: &protobufs.HypergraphComparisonQuery{ Path: path, IncludeLeafData: true, }, }, }); err != nil { return nil, err } select { case <-ctx.Done(): return nil, errors.Wrap( errors.New("context canceled"), "handle query", ) case resp, ok := <-incomingResponses: if !ok { return nil, errors.Wrap( errors.New("channel closed"), "handle query", ) } return resp, nil case <-time.After(30 * time.Second): return nil, errors.Wrap( errors.New("timed out"), "handle query", ) } } func (s *streamManager) handleLeafData( ctx context.Context, incomingLeaves <-chan *protobufs.HypergraphComparison, ) error { expectedLeaves := uint64(0) select { case <-ctx.Done(): return errors.Wrap( 
errors.New("context canceled"), "handle leaf data", ) case msg, ok := <-incomingLeaves: if !ok { return errors.Wrap( errors.New("channel closed"), "handle leaf data", ) } switch msg.Payload.(type) { case *protobufs.HypergraphComparison_LeafData: return errors.Wrap( errors.New("invalid message"), "handle leaf data", ) case *protobufs.HypergraphComparison_Metadata: expectedLeaves = msg.GetMetadata().Leaves } case <-time.After(30 * time.Second): return errors.Wrap( errors.New("timed out"), "handle leaf data", ) } // s.logger.Info("expecting leaves", zap.Uint64("count", expectedLeaves)) var txn tries.TreeBackingStoreTransaction var err error for i := uint64(0); i < expectedLeaves; i++ { if i%100 == 0 { if txn != nil { if err := txn.Commit(); err != nil { return errors.Wrap( err, "handle leaf data", ) } } txn, err = s.hypergraphStore.NewTransaction(false) if err != nil { return errors.Wrap( err, "handle leaf data", ) } } select { case <-ctx.Done(): return errors.Wrap( errors.New("context canceled"), "handle leaf data", ) case msg, ok := <-incomingLeaves: if !ok { return errors.Wrap( errors.New("channel closed"), "handle leaf data", ) } var remoteUpdate *protobufs.LeafData switch msg.Payload.(type) { case *protobufs.HypergraphComparison_Metadata: return errors.Wrap( errors.New("invalid message"), "handle leaf data", ) case *protobufs.HypergraphComparison_LeafData: remoteUpdate = msg.GetLeafData() } // s.logger.Info( // "received leaf data", // zap.String("key", hex.EncodeToString(remoteUpdate.Key)), // ) theirs := AtomFromBytes(remoteUpdate.Value) if len(remoteUpdate.UnderlyingData) != 0 { tree, err := tries.DeserializeNonLazyTree(remoteUpdate.UnderlyingData) if err != nil { s.logger.Error("server returned invalid tree", zap.Error(err)) txn.Abort() return err } err = s.localSet.ValidateTree( remoteUpdate.Key, remoteUpdate.Value, tree, ) if err != nil { s.logger.Error("server returned invalid tree", zap.Error(err)) txn.Abort() return err } err = 
s.hypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree) if err != nil { txn.Abort() return err } } err := s.localSet.Add(txn, theirs) if err != nil { s.logger.Error("error while saving", zap.Error(err)) return errors.Wrap( err, "handle leaf data", ) } case <-time.After(30 * time.Second): return errors.Wrap( errors.New("timed out"), "handle leaf data", ) } } if txn != nil { if err := txn.Commit(); err != nil { return errors.Wrap( err, "handle leaf data", ) } } if err := s.stream.Send(&protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Metadata{ Metadata: &protobufs.HypersyncMetadata{Leaves: expectedLeaves}, }, }); err != nil { return err } return nil } func handleQueryNext( logger *zap.Logger, ctx context.Context, incomingQueries <-chan *protobufs.HypergraphComparisonQuery, stream hypergraph.HyperStream, localTree *tries.LazyVectorCommitmentTree, path []int32, ) ( *protobufs.HypergraphComparisonResponse, error, ) { select { case <-ctx.Done(): return nil, errors.Wrap( errors.New("context canceled"), "handle query next", ) case query, ok := <-incomingQueries: if !ok { return nil, errors.Wrap( errors.New("channel closed"), "handle query next", ) } if slices.Compare(query.Path, path) != 0 { return nil, errors.Wrap( errors.New("invalid query received"), "handle query next", ) } branchInfo, err := getBranchInfoFromTree(logger, localTree, path) if err != nil { return nil, errors.Wrap(err, "handle query next") } resp := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Response{ Response: branchInfo, }, } if err := stream.Send(resp); err != nil { return nil, errors.Wrap(err, "handle query next") } return branchInfo, nil case <-time.After(30 * time.Second): return nil, errors.Wrap( errors.New("timed out"), "handle query next", ) } } func descendIndex( logger *zap.Logger, ctx context.Context, incomingResponses <-chan *protobufs.HypergraphComparisonResponse, stream hypergraph.HyperStream, localTree 
*tries.LazyVectorCommitmentTree, path []int32, ) ( *protobufs.HypergraphComparisonResponse, *protobufs.HypergraphComparisonResponse, error, ) { branchInfo, err := getBranchInfoFromTree(logger, localTree, path) if err != nil { return nil, nil, errors.Wrap(err, "descend index") } resp := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Response{ Response: branchInfo, }, } if err := stream.Send(resp); err != nil { return nil, nil, errors.Wrap(err, "descend index") } select { case <-ctx.Done(): return nil, nil, errors.Wrap( errors.New("context canceled"), "handle query next", ) case resp, ok := <-incomingResponses: if !ok { return nil, nil, errors.Wrap( errors.New("channel closed"), "descend index", ) } if slices.Compare(branchInfo.Path, resp.Path) != 0 { return nil, nil, errors.Wrap( fmt.Errorf( "invalid path received: %v, expected: %v", resp.Path, branchInfo.Path, ), "descend index", ) } return branchInfo, resp, nil case <-time.After(30 * time.Second): return nil, nil, errors.Wrap( errors.New("timed out"), "descend index", ) } } func packPath(path []int32) []byte { b := []byte{} for _, p := range path { b = append(b, byte(p)) } return b } func (s *streamManager) walk( path []int32, lnode, rnode *protobufs.HypergraphComparisonResponse, incomingLeaves <-chan *protobufs.HypergraphComparison, incomingQueries <-chan *protobufs.HypergraphComparisonQuery, incomingResponses <-chan *protobufs.HypergraphComparisonResponse, init bool, isServer bool, ) error { select { case <-s.ctx.Done(): return s.ctx.Err() default: } pathString := zap.String("path", hex.EncodeToString(packPath(path))) if bytes.Equal(lnode.Commitment, rnode.Commitment) { // s.logger.Debug( // "commitments match", // pathString, // zap.String("commitment", hex.EncodeToString(lnode.Commitment)), // ) return nil } if isLeaf(lnode) && isLeaf(rnode) && !init { return nil } if isLeaf(rnode) || isLeaf(lnode) { // s.logger.Debug("leaf/branch mismatch at path", pathString) if isServer { err := 
s.sendLeafData( path, incomingLeaves, ) return errors.Wrap(err, "walk") } else { err := s.handleLeafData(s.ctx, incomingLeaves) return errors.Wrap(err, "walk") } } lpref := lnode.Path rpref := rnode.Path if len(lpref) != len(rpref) { // s.logger.Debug( // "prefix length mismatch", // zap.Int("local_prefix", len(lpref)), // zap.Int("remote_prefix", len(rpref)), // pathString, // ) if len(lpref) > len(rpref) { // s.logger.Debug("local prefix longer, traversing remote to path", pathString) traverse := lpref[len(rpref)-1:] rtrav := rnode traversePath := append([]int32{}, rpref...) for _, nibble := range traverse { // s.logger.Debug("attempting remote traversal step") for _, child := range rtrav.Children { if child.Index == nibble { // s.logger.Debug("sending query") traversePath = append(traversePath, child.Index) var err error rtrav, err = queryNext( s.ctx, incomingResponses, s.stream, traversePath, ) if err != nil { s.logger.Error("query failed", zap.Error(err)) return errors.Wrap(err, "walk") } break } } if rtrav == nil { // s.logger.Debug("traversal could not reach path") if isServer { err := s.sendLeafData( lpref, incomingLeaves, ) return errors.Wrap(err, "walk") } else { err := s.handleLeafData(s.ctx, incomingLeaves) return errors.Wrap(err, "walk") } } } // s.logger.Debug("traversal completed, performing walk", pathString) return s.walk( path, lnode, rtrav, incomingLeaves, incomingQueries, incomingResponses, false, isServer, ) } else { // s.logger.Debug("remote prefix longer, traversing local to path", pathString) traverse := rpref[len(lpref)-1:] ltrav := lnode traversedPath := append([]int32{}, lnode.Path...) for _, nibble := range traverse { // s.logger.Debug("attempting local traversal step") preTraversal := append([]int32{}, traversedPath...) 
for _, child := range ltrav.Children { if child.Index == nibble { traversedPath = append(traversedPath, nibble) var err error // s.logger.Debug("expecting query") ltrav, err = handleQueryNext( s.logger, s.ctx, incomingQueries, s.stream, s.localTree, traversedPath, ) if err != nil { s.logger.Error("expect failed", zap.Error(err)) return errors.Wrap(err, "walk") } if ltrav == nil { // s.logger.Debug("traversal could not reach path") if isServer { err := s.sendLeafData( preTraversal, incomingLeaves, ) return errors.Wrap(err, "walk") } else { err := s.handleLeafData(s.ctx, incomingLeaves) return errors.Wrap(err, "walk") } } } else { // s.logger.Debug( // "known missing branch", // zap.String( // "path", // hex.EncodeToString( // packPath( // append(append([]int32{}, preTraversal...), child.Index), // ), // ), // ), // ) if isServer { if err := s.sendLeafData( append(append([]int32{}, preTraversal...), child.Index), incomingLeaves, ); err != nil { return errors.Wrap(err, "walk") } } else { err := s.handleLeafData(s.ctx, incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } } } } } // s.logger.Debug("traversal completed, performing walk", pathString) return s.walk( path, ltrav, rnode, incomingLeaves, incomingQueries, incomingResponses, false, isServer, ) } } else { if slices.Compare(lpref, rpref) == 0 { // s.logger.Debug("prefixes match, diffing children") for i := int32(0); i < 64; i++ { // s.logger.Debug("checking branch", zap.Int32("branch", i)) var lchild *protobufs.BranchChild = nil for _, lc := range lnode.Children { if lc.Index == i { // s.logger.Debug("local instance found", zap.Int32("branch", i)) lchild = lc break } } var rchild *protobufs.BranchChild = nil for _, rc := range rnode.Children { if rc.Index == i { // s.logger.Debug("remote instance found", zap.Int32("branch", i)) rchild = rc break } } if (lchild != nil && rchild == nil) || (lchild == nil && rchild != nil) { s.logger.Info("branch divergence", pathString) if lchild != nil { nextPath := 
append( append([]int32{}, lpref...), lchild.Index, ) if isServer { if err := s.sendLeafData( nextPath, incomingLeaves, ); err != nil { return errors.Wrap(err, "walk") } } } if rchild != nil { if !isServer { err := s.handleLeafData(s.ctx, incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } } } } else { if lchild != nil { nextPath := append( append([]int32{}, lpref...), lchild.Index, ) lc, rc, err := descendIndex( s.logger, s.ctx, incomingResponses, s.stream, s.localTree, nextPath, ) if err != nil { // s.logger.Debug("incomplete branch descension", zap.Error(err)) if isServer { if err := s.sendLeafData( nextPath, incomingLeaves, ); err != nil { return errors.Wrap(err, "walk") } } else { err := s.handleLeafData(s.ctx, incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } } continue } if err = s.walk( nextPath, lc, rc, incomingLeaves, incomingQueries, incomingResponses, false, isServer, ); err != nil { return errors.Wrap(err, "walk") } } } } } else { // s.logger.Debug("prefix mismatch on both sides", pathString) if isServer { if err := s.sendLeafData( path, incomingLeaves, ); err != nil { return errors.Wrap(err, "walk") } } else { err := s.handleLeafData(s.ctx, incomingLeaves) if err != nil { return errors.Wrap(err, "walk") } } } } return nil } // syncTreeServer implements the diff and sync logic on the // server side. It sends the local root info, then processes incoming messages, // and queues further queries as differences are detected. 
func (hg *HypergraphCRDT) syncTreeServer( stream protobufs.HypergraphComparisonService_HyperStreamServer, snapshotStore tries.TreeBackingStore, snapshotRoot []byte, ) error { if len(snapshotRoot) != 0 { hg.logger.Info( "syncing with snapshot", zap.String("root", hex.EncodeToString(snapshotRoot)), ) } else { hg.logger.Info("syncing with snapshot", zap.String("root", "")) } msg, err := stream.Recv() if err != nil { return err } query := msg.GetQuery() if query == nil { return errors.New("client did not send valid initialization message") } hg.logger.Info("received initialization message") if len(query.ShardKey) != 35 { return errors.New("invalid shard key") } shardKey := tries.ShardKey{ L1: [3]byte(query.ShardKey[:3]), L2: [32]byte(query.ShardKey[3:]), } idSet := hg.snapshotPhaseSet(shardKey, query.PhaseSet, snapshotStore) if idSet == nil { return errors.New("unsupported phase set") } branchInfo, err := getBranchInfoFromTree( hg.logger, idSet.GetTree(), query.Path, ) if err != nil { return err } // hg.logger.Debug( // "returning branch info", // zap.String("commitment", hex.EncodeToString(branchInfo.Commitment)), // zap.Int("children", len(branchInfo.Children)), // zap.Int("path", len(branchInfo.Path)), // ) resp := &protobufs.HypergraphComparison{ Payload: &protobufs.HypergraphComparison_Response{ Response: branchInfo, }, } if err := stream.Send(resp); err != nil { return err } msg, err = stream.Recv() if err != nil { return err } response := msg.GetResponse() if response == nil { return errors.New( "client did not send valid initialization response message", ) } ctx, cancel := context.WithCancel(stream.Context()) incomingQueriesIn, incomingQueriesOut := UnboundedChan[*protobufs.HypergraphComparisonQuery]( cancel, "server incoming", ) incomingResponsesIn, incomingResponsesOut := UnboundedChan[*protobufs.HypergraphComparisonResponse]( cancel, "server incoming", ) incomingLeavesIn, incomingLeavesOut := UnboundedChan[*protobufs.HypergraphComparison]( cancel, "server 
incoming", ) go func() { for { msg, err := stream.Recv() if err == io.EOF { hg.logger.Info("server stream recv eof") cancel() close(incomingQueriesIn) close(incomingResponsesIn) return } if err != nil { hg.logger.Info("server stream recv error", zap.Error(err)) cancel() close(incomingQueriesIn) close(incomingResponsesIn) return } if msg == nil { continue } switch m := msg.Payload.(type) { case *protobufs.HypergraphComparison_LeafData: hg.logger.Warn("received leaf from client, terminating") cancel() close(incomingQueriesIn) close(incomingResponsesIn) return case *protobufs.HypergraphComparison_Metadata: incomingLeavesIn <- msg case *protobufs.HypergraphComparison_Query: incomingQueriesIn <- m.Query case *protobufs.HypergraphComparison_Response: incomingResponsesIn <- m.Response } } }() wg := sync.WaitGroup{} wg.Add(1) manager := &streamManager{ ctx: ctx, cancel: cancel, logger: hg.logger, stream: stream, hypergraphStore: snapshotStore, localTree: idSet.GetTree(), lastSent: time.Now(), } go func() { defer wg.Done() err := manager.walk( branchInfo.Path, branchInfo, response, incomingLeavesOut, incomingQueriesOut, incomingResponsesOut, true, true, ) if err != nil { hg.logger.Error("error while syncing", zap.Error(err)) } }() wg.Wait() return nil } func UnboundedChan[T any]( cancel context.CancelFunc, purpose string, ) (chan<- T, <-chan T) { in := make(chan T) out := make(chan T) go func() { var queue []T for { var active chan T var next T if len(queue) > 0 { active = out next = queue[0] } select { case msg, ok := <-in: if !ok { cancel() close(out) return } queue = append(queue, msg) case active <- next: queue = queue[1:] } } }() return in, out }