diff --git a/hypergraph/sync.go b/hypergraph/sync.go index df58878..21f95f4 100644 --- a/hypergraph/sync.go +++ b/hypergraph/sync.go @@ -570,8 +570,11 @@ const ( leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead // Session-level timeouts - maxSyncSessionDuration = 15 * time.Minute // Maximum total time for a sync session - syncIdleTimeout = 5 * time.Minute // Maximum time without activity before session is killed + maxSyncSessionDuration = 30 * time.Minute // Maximum total time for a sync session + syncIdleTimeout = 10 * time.Minute // Maximum time without activity before session is killed + + // Operation-level timeouts + syncOperationTimeout = 2 * time.Minute // Timeout for individual sync operations (queries, responses) ) func leafAckTimeout(count uint64) time.Duration { @@ -588,6 +591,17 @@ func leafAckTimeout(count uint64) time.Duration { return timeout } +// isTimeoutError checks if an error is a timeout-related error that should abort the sync. +func isTimeoutError(err error) bool { + if err == nil { + return false + } + errMsg := err.Error() + return strings.Contains(errMsg, "timed out") || + strings.Contains(errMsg, "context deadline exceeded") || + strings.Contains(errMsg, "context canceled") +} + // sendLeafData builds a LeafData message (with the full leaf data) for the // node at the given path in the local tree and sends it over the stream. func (s *streamManager) sendLeafData( @@ -1123,7 +1137,7 @@ func (s *streamManager) queryNext( s.updateActivity() resp = r return resp, nil - case <-time.After(30 * time.Second): + case <-time.After(syncOperationTimeout): return nil, errors.Wrap( errors.New("timed out"), "handle query", @@ -1168,7 +1182,7 @@ func (s *streamManager) handleLeafData( case *protobufs.HypergraphComparison_Metadata: expectedLeaves = msg.GetMetadata().Leaves } - case <-time.After(30 * time.Second): + case <-time.After(syncOperationTimeout): return errors.Wrap( errors.New("timed out"), "handle leaf data", @@ -1240,7 +1254,7 @@ func (s *streamManager) handleLeafData( "handle leaf data", ) } - case <-time.After(30 * time.Second): + case <-time.After(syncOperationTimeout): return errors.Wrap( errors.New("timed out"), "handle leaf data", @@ -1360,7 +1374,7 @@ func (s *streamManager) handleQueryNext( s.updateActivity() branch = branchInfo return branch, nil - case <-time.After(30 * time.Second): + case <-time.After(syncOperationTimeout): return nil, errors.Wrap( errors.New("timed out"), "handle query next", @@ -1436,7 +1450,7 @@ func (s *streamManager) descendIndex( local = branchInfo remote = r return local, remote, nil - case <-time.After(30 * time.Second): + case <-time.After(syncOperationTimeout): return nil, nil, errors.Wrap( errors.New("timed out"), "descend index", @@ -1527,6 +1541,7 @@ func (s *streamManager) walk( traversePath := append([]int32{}, rpref...) 
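+	// Walk the remote tree along each nibble of the local path via queryNext;
+	// if the remote is missing a step, fall back to leaf-data exchange below.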
for _, nibble := range traverse { // s.logger.Debug("attempting remote traversal step") + found := false for _, child := range rtrav.Children { if child.Index == nibble { // s.logger.Debug("sending query") @@ -1540,14 +1555,14 @@ func (s *streamManager) walk( s.logger.Error("query failed", zap.Error(err)) return errors.Wrap(err, "walk") } - + found = true break } } // If no child matched or queryNext returned nil, remote doesn't // have the path that local has - if rtrav == nil { + if !found || rtrav == nil { // s.logger.Debug("traversal could not reach path") if isServer { err := s.sendLeafData( @@ -1583,6 +1598,7 @@ func (s *streamManager) walk( for _, nibble := range traverse { // s.logger.Debug("attempting local traversal step") preTraversal := append([]int32{}, traversedPath...) + found := false for _, child := range ltrav.Children { if child.Index == nibble { traversedPath = append(traversedPath, nibble) @@ -1611,6 +1627,7 @@ func (s *streamManager) walk( return errors.Wrap(err, "walk") } } + found = true } else { // Local has a child that's not on remote's traversal path missingPath := append(append([]int32{}, preTraversal...), child.Index) @@ -1630,6 +1647,21 @@ func (s *streamManager) walk( } } } + + // If no child matched the nibble, local doesn't have the path + // that remote expects - receive remote's data + if !found { + if isServer { + err := s.sendLeafData( + preTraversal, + incomingLeaves, + ) + return errors.Wrap(err, "walk") + } else { + err := s.handleLeafData(incomingLeaves) + return errors.Wrap(err, "walk") + } + } } // s.logger.Debug("traversal completed, performing walk", pathString) return s.walk( @@ -1708,6 +1740,16 @@ func (s *streamManager) walk( nextPath, ) if err != nil { + // If this is a timeout or context error, abort the sync entirely + // rather than trying to continue with leaf data exchange + if isTimeoutError(err) { + s.logger.Warn( + "branch descension timeout - aborting sync", + zap.Error(err), + zap.String("path", hex.EncodeToString(packPath(nextPath))), + ) + return errors.Wrap(err, "walk: branch descension timeout") + } s.logger.Info("incomplete branch descension", zap.Error(err)) if isServer { if err := s.sendLeafData( diff --git a/node/consensus/app/consensus_liveness_provider.go b/node/consensus/app/consensus_liveness_provider.go index de44b80..3cc5ca3 100644 --- a/node/consensus/app/consensus_liveness_provider.go +++ b/node/consensus/app/consensus_liveness_provider.go @@ -27,7 +27,7 @@ func (p *AppLivenessProvider) Collect( } mixnetMessages := []*protobufs.Message{} - currentSet, _ := p.engine.proverRegistry.GetActiveProvers(nil) + currentSet, _ := p.engine.proverRegistry.GetActiveProvers(p.engine.appAddress) if len(currentSet) >= 9 { // Prepare mixnet for collecting messages err := p.engine.mixnet.PrepareMixnet() diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index bcf00af..50ea78b 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -2078,6 +2078,10 @@ func (e *GlobalConsensusEngine) performBlockingProverHypersync( } e.logger.Info("blocking hypersync completed") + if len(newRoots) == 0 { + return nil + } + return newRoots[0] } diff --git a/node/consensus/sync/sync_provider.go b/node/consensus/sync/sync_provider.go index c73e678..eec3ef4 100644 --- a/node/consensus/sync/sync_provider.go +++ b/node/consensus/sync/sync_provider.go @@ -714,7 +714,7 @@ func (e *SyncProvider[StateT, ProposalT]) 
getRandomProverPeerId() ( peer.ID, error, ) { - provers, err := e.proverRegistry.GetActiveProvers(nil) + provers, err := e.proverRegistry.GetActiveProvers(e.filter) if err != nil { e.logger.Error( "could not get active provers for sync", diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index a619b55..fa8e52a 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -1401,7 +1401,10 @@ const connectivityCacheValidity = 7 * 24 * time.Hour // 1 week // connectivityCachePath returns the path to the connectivity check cache file // for this core. The file is stored in /connectivity-check- func (b *BlossomSub) connectivityCachePath() string { - return filepath.Join(string(b.configDir), fmt.Sprintf("connectivity-check-%d", b.coreId)) + return filepath.Join( + string(b.configDir), + fmt.Sprintf("connectivity-check-%d", b.coreId), + ) } // isConnectivityCacheValid checks if there's a valid (< 1 week old) connectivity diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index fc9d71d..095afef 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -1606,3 +1606,621 @@ func TestHypergraphSyncWithModifiedEntries(t *testing.T) { t.Logf("Sync completed successfully - %d entries with same keys but different values were updated", numVertices) } + +// TestHypergraphBidirectionalSyncWithDisjointData tests that when node A has 500 +// unique vertices and node B has 500 different unique vertices, syncing in both +// directions results in both nodes having all 1000 vertices. +func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) { + logger, _ := zap.NewDevelopment() + enc := verenc.NewMPCitHVerifiableEncryptor(1) + inclusionProver := bls48581.NewKZGInclusionProver(logger) + + // Create data trees for all 1000 vertices + numVerticesPerNode := 500 + totalVertices := numVerticesPerNode * 2 + dataTrees := make([]*tries.VectorCommitmentTree, totalVertices) + eg := errgroup.Group{} + eg.SetLimit(100) + for i := 0; i < totalVertices; i++ { + eg.Go(func() error { + dataTrees[i] = buildDataTree(t, inclusionProver) + return nil + }) + } + eg.Wait() + t.Log("Generated data trees") + + // Create databases and stores for both nodes + nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"}, 0) + defer nodeADB.Close() + + nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"}, 0) + defer nodeBDB.Close() + + nodeAStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"}, + nodeADB, + logger, + enc, + inclusionProver, + ) + + nodeBStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"}, + nodeBDB, + logger, + enc, + inclusionProver, + ) + + nodeAHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "nodeA")), + nodeAStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + nodeBHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "nodeB")), + nodeBStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + // Create a shared domain for all vertices + domain := randomBytes32(t) + + // Generate vertices for node A (first 500) + nodeAVertices := make([]application.Vertex, numVerticesPerNode) + for i := 0; i < numVerticesPerNode; i++ { + addr := randomBytes32(t) + nodeAVertices[i] = hgcrdt.NewVertex( + domain, + addr, 
+ dataTrees[i].Commit(inclusionProver, false), + dataTrees[i].GetSize(), + ) + } + + // Generate vertices for node B (second 500, completely different) + nodeBVertices := make([]application.Vertex, numVerticesPerNode) + for i := 0; i < numVerticesPerNode; i++ { + addr := randomBytes32(t) + nodeBVertices[i] = hgcrdt.NewVertex( + domain, + addr, + dataTrees[numVerticesPerNode+i].Commit(inclusionProver, false), + dataTrees[numVerticesPerNode+i].GetSize(), + ) + } + + shardKey := application.GetShardKey(nodeAVertices[0]) + + // Add vertices to node A + t.Log("Adding 500 vertices to node A") + nodeATxn, err := nodeAStore.NewTransaction(false) + require.NoError(t, err) + for i, v := range nodeAVertices { + id := v.GetID() + require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i])) + require.NoError(t, nodeAHG.AddVertex(nodeATxn, v)) + } + require.NoError(t, nodeATxn.Commit()) + + // Add vertices to node B + t.Log("Adding 500 different vertices to node B") + nodeBTxn, err := nodeBStore.NewTransaction(false) + require.NoError(t, err) + for i, v := range nodeBVertices { + id := v.GetID() + require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[numVerticesPerNode+i])) + require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v)) + } + require.NoError(t, nodeBTxn.Commit()) + + // Commit both hypergraphs + _, err = nodeAHG.Commit(1) + require.NoError(t, err) + _, err = nodeBHG.Commit(1) + require.NoError(t, err) + + nodeARootBefore := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + nodeBRootBefore := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + t.Logf("Node A root before sync: %x", nodeARootBefore) + t.Logf("Node B root before sync: %x", nodeBRootBefore) + require.NotEqual(t, nodeARootBefore, nodeBRootBefore, "roots should differ before sync") + + // Helper to set up gRPC server for a hypergraph + setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) { + const bufSize = 1 << 20 + lis := bufconn.Listen(bufSize) + + grpcServer := grpc.NewServer( + grpc.ChainStreamInterceptor(func( + srv interface{}, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + _, priv, _ := ed448.GenerateKey(rand.Reader) + privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv) + require.NoError(t, err) + + pub := privKey.GetPublic() + peerID, err := peer.IDFromPublicKey(pub) + require.NoError(t, err) + + return handler(srv, &serverStream{ + ServerStream: ss, + ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID), + }) + }), + ) + protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg) + + go func() { + _ = grpcServer.Serve(lis) + }() + + return lis, grpcServer + } + + dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) { + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + conn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(dialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + return conn, protobufs.NewHypergraphComparisonServiceClient(conn) + } + + // Step 1: Node A syncs from Node B (as server) + // Node A should receive Node B's 500 vertices + t.Log("Step 1: Node A syncs from Node B (B is server)") + nodeBHG.PublishSnapshot(nodeBRootBefore) + lisB, serverB := setupServer(nodeBHG) + defer serverB.Stop() + + connB, clientB := dialClient(lisB) + streamB, err := clientB.HyperStream(context.Background()) + 
require.NoError(t, err) + + _, err = nodeAHG.Sync( + streamB, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, + ) + require.NoError(t, err) + require.NoError(t, streamB.CloseSend()) + connB.Close() + + _, err = nodeAHG.Commit(2) + require.NoError(t, err) + + nodeARootAfterFirstSync := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + t.Logf("Node A root after syncing from B: %x", nodeARootAfterFirstSync) + + // Step 2: Node B syncs from Node A (as server) + // Node B should receive Node A's 500 vertices + t.Log("Step 2: Node B syncs from Node A (A is server)") + nodeAHG.PublishSnapshot(nodeARootAfterFirstSync) + lisA, serverA := setupServer(nodeAHG) + defer serverA.Stop() + + connA, clientA := dialClient(lisA) + streamA, err := clientA.HyperStream(context.Background()) + require.NoError(t, err) + + _, err = nodeBHG.Sync( + streamA, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, + ) + require.NoError(t, err) + require.NoError(t, streamA.CloseSend()) + connA.Close() + + _, err = nodeBHG.Commit(2) + require.NoError(t, err) + + // Verify both nodes have converged + nodeARootFinal := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + nodeBRootFinal := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + t.Logf("Node A final root: %x", nodeARootFinal) + t.Logf("Node B final root: %x", nodeBRootFinal) + + assert.Equal(t, nodeARootFinal, nodeBRootFinal, "both nodes should have identical roots after bidirectional sync") + + // Verify the tree contains all 1000 vertices + nodeATree := nodeAHG.GetVertexAddsSet(shardKey).GetTree() + nodeBTree := nodeBHG.GetVertexAddsSet(shardKey).GetTree() + + nodeALeaves := tries.GetAllLeaves( + nodeATree.SetType, + nodeATree.PhaseType, + nodeATree.ShardKey, + nodeATree.Root, + ) + nodeBLeaves := tries.GetAllLeaves( + nodeBTree.SetType, + nodeBTree.PhaseType, + nodeBTree.ShardKey, + nodeBTree.Root, + ) + + nodeALeafCount := 0 + for _, leaf := range nodeALeaves { + if leaf != nil { + nodeALeafCount++ + } + } + nodeBLeafCount := 0 + for _, leaf := range nodeBLeaves { + if leaf != nil { + nodeBLeafCount++ + } + } + + t.Logf("Node A has %d leaves, Node B has %d leaves", nodeALeafCount, nodeBLeafCount) + assert.Equal(t, totalVertices, nodeALeafCount, "Node A should have all 1000 vertices") + assert.Equal(t, totalVertices, nodeBLeafCount, "Node B should have all 1000 vertices") + + // Verify no differences between the trees + diffLeaves := tries.CompareLeaves(nodeATree, nodeBTree) + assert.Empty(t, diffLeaves, "there should be no differences between the trees") + + t.Log("Bidirectional sync test passed - both nodes have all 1000 vertices") +} + +// TestHypergraphSyncWithPrefixLengthMismatch tests sync behavior when one node +// has a deeper tree structure (longer prefix path) than the other. This tests +// the prefix length mismatch handling in the walk function. +// +// We create two nodes with different tree structures that will cause prefix +// length mismatches during sync. Node A has deeper prefixes at certain branches +// while Node B has shallower but wider structures. 
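+// The sync is exercised in both directions as separate sub-tests: A syncing
+// from B, and B syncing from A, each with fresh stores.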
+func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) { + logger, _ := zap.NewDevelopment() + enc := verenc.NewMPCitHVerifiableEncryptor(1) + inclusionProver := bls48581.NewKZGInclusionProver(logger) + + // Create data trees + numTrees := 20 + dataTrees := make([]*tries.VectorCommitmentTree, numTrees) + for i := 0; i < numTrees; i++ { + dataTrees[i] = buildDataTree(t, inclusionProver) + } + + // Fixed domain (appAddress) - all vertices must share this to be in the same shard + fixedDomain := [32]byte{} + + // Helper to create a vertex with a specific dataAddress path suffix. + // The vertex ID is [appAddress (32 bytes) || dataAddress (32 bytes)]. + // The path is derived from the full 64-byte ID. + // With BranchBits=6, nibbles 0-41 come from appAddress, nibbles 42+ from dataAddress. + // Since all vertices share the same appAddress, their paths share the first 42 nibbles. + // Path differences come from dataAddress (nibbles 42+). + // + // We control the "suffix path" starting at nibble 42 by setting bits in dataAddress. + createVertexWithDataPath := func(suffixPath []int, uniqueSuffix uint64, treeIdx int) application.Vertex { + dataAddr := [32]byte{} + + // Pack the suffix path nibbles into bits of dataAddress + // Nibble 42 starts at bit 0 of dataAddress + bitPos := 0 + for _, nibble := range suffixPath { + byteIdx := bitPos / 8 + bitOffset := bitPos % 8 + + if bitOffset+6 <= 8 { + // Nibble fits in one byte + dataAddr[byteIdx] |= byte(nibble << (8 - bitOffset - 6)) + } else { + // Nibble spans two bytes + bitsInFirstByte := 8 - bitOffset + dataAddr[byteIdx] |= byte(nibble >> (6 - bitsInFirstByte)) + if byteIdx+1 < 32 { + dataAddr[byteIdx+1] |= byte(nibble << (8 - (6 - bitsInFirstByte))) + } + } + bitPos += 6 + } + + // Add unique suffix in the last 8 bytes to make each vertex distinct + binary.BigEndian.PutUint64(dataAddr[24:], uniqueSuffix) + + return hgcrdt.NewVertex( + fixedDomain, + dataAddr, + dataTrees[treeIdx].Commit(inclusionProver, false), + dataTrees[treeIdx].GetSize(), + ) + } + + // Run the test in both directions + runSyncTest := func(direction string) { + t.Run(direction, func(t *testing.T) { + // Create fresh databases for this sub-test + nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeA_%s/store", direction)}, 0) + defer nodeADB.Close() + + nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeB_%s/store", direction)}, 0) + defer nodeBDB.Close() + + nodeAStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeA_%s/store", direction)}, + nodeADB, + logger, + enc, + inclusionProver, + ) + + nodeBStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeB_%s/store", direction)}, + nodeBDB, + logger, + enc, + inclusionProver, + ) + + nodeAHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "nodeA-"+direction)), + nodeAStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + nodeBHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "nodeB-"+direction)), + nodeBStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + // Create vertices with specific path structures to cause prefix mismatches. + // All vertices share the same appAddress (fixedDomain), so they're in the same shard. + // Their paths share the first 42 nibbles (all zeros from fixedDomain). 
+ // Path differences come from dataAddress, starting at nibble 42. + // + // We create vertices with suffix paths (nibbles 42+) that differ: + // Node A: suffix paths 0,1,x and 0,2,x and 1,x + // Node B: suffix paths 0,0,x and 0,1,x and 0,3,x and 1,x + // + // This creates prefix mismatch scenarios in the dataAddress portion of the tree. + + t.Log("Creating Node A structure") + nodeAVertices := []application.Vertex{ + createVertexWithDataPath([]int{0, 1}, 100, 0), // suffix path 0,1,... + createVertexWithDataPath([]int{0, 2}, 101, 1), // suffix path 0,2,... + createVertexWithDataPath([]int{1}, 102, 2), // suffix path 1,... + } + t.Logf("Created Node A vertices with suffix paths: 0,1; 0,2; 1") + + t.Log("Creating Node B structure") + nodeBVertices := []application.Vertex{ + createVertexWithDataPath([]int{0, 0}, 200, 3), // suffix path 0,0,... + createVertexWithDataPath([]int{0, 1}, 201, 4), // suffix path 0,1,... + createVertexWithDataPath([]int{0, 3}, 202, 5), // suffix path 0,3,... + createVertexWithDataPath([]int{1}, 203, 6), // suffix path 1,... + } + t.Logf("Created Node B vertices with suffix paths: 0,0; 0,1; 0,3; 1") + + // Verify the paths - show nibbles 40-50 where the difference should be + t.Log("Node A vertices paths (showing nibbles 40-50 where dataAddress starts):") + for i, v := range nodeAVertices { + id := v.GetID() + path := GetFullPath(id[:]) + // Nibble 42 is where dataAddress bits start (256/6 = 42.67) + start := 40 + end := min(50, len(path)) + if end > start { + t.Logf(" Vertex %d path[%d:%d]: %v", i, start, end, path[start:end]) + } + } + t.Log("Node B vertices paths (showing nibbles 40-50 where dataAddress starts):") + for i, v := range nodeBVertices { + id := v.GetID() + path := GetFullPath(id[:]) + start := 40 + end := min(50, len(path)) + if end > start { + t.Logf(" Vertex %d path[%d:%d]: %v", i, start, end, path[start:end]) + } + } + + shardKey := application.GetShardKey(nodeAVertices[0]) + + // Add vertices to Node A + nodeATxn, err := nodeAStore.NewTransaction(false) + require.NoError(t, err) + for i, v := range nodeAVertices { + id := v.GetID() + require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i])) + require.NoError(t, nodeAHG.AddVertex(nodeATxn, v)) + } + require.NoError(t, nodeATxn.Commit()) + + // Add vertices to Node B + nodeBTxn, err := nodeBStore.NewTransaction(false) + require.NoError(t, err) + for i, v := range nodeBVertices { + id := v.GetID() + require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[3+i])) + require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v)) + } + require.NoError(t, nodeBTxn.Commit()) + + // Commit both + _, err = nodeAHG.Commit(1) + require.NoError(t, err) + _, err = nodeBHG.Commit(1) + require.NoError(t, err) + + nodeARoot := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + nodeBRoot := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + t.Logf("Node A root: %x", nodeARoot) + t.Logf("Node B root: %x", nodeBRoot) + + // Setup gRPC server + const bufSize = 1 << 20 + setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) { + lis := bufconn.Listen(bufSize) + grpcServer := grpc.NewServer( + grpc.ChainStreamInterceptor(func( + srv interface{}, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + _, priv, _ := ed448.GenerateKey(rand.Reader) + privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv) + require.NoError(t, err) + pub := privKey.GetPublic() + peerID, err := peer.IDFromPublicKey(pub) + 
require.NoError(t, err) + return handler(srv, &serverStream{ + ServerStream: ss, + ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID), + }) + }), + ) + protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg) + go func() { _ = grpcServer.Serve(lis) }() + return lis, grpcServer + } + + dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) { + dialer := func(context.Context, string) (net.Conn, error) { return lis.Dial() } + conn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(dialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + return conn, protobufs.NewHypergraphComparisonServiceClient(conn) + } + + var serverHG, clientHG *hgcrdt.HypergraphCRDT + var serverRoot []byte + + if direction == "A_syncs_from_B" { + serverHG = nodeBHG + clientHG = nodeAHG + serverRoot = nodeBRoot + } else { + serverHG = nodeAHG + clientHG = nodeBHG + serverRoot = nodeARoot + } + + serverHG.PublishSnapshot(serverRoot) + lis, grpcServer := setupServer(serverHG) + defer grpcServer.Stop() + + // Count client leaves before sync + clientTreeBefore := clientHG.GetVertexAddsSet(shardKey).GetTree() + clientLeavesBefore := tries.GetAllLeaves( + clientTreeBefore.SetType, + clientTreeBefore.PhaseType, + clientTreeBefore.ShardKey, + clientTreeBefore.Root, + ) + clientLeafCountBefore := 0 + for _, leaf := range clientLeavesBefore { + if leaf != nil { + clientLeafCountBefore++ + } + } + t.Logf("Client has %d leaves before sync", clientLeafCountBefore) + + conn, client := dialClient(lis) + stream, err := client.HyperStream(context.Background()) + require.NoError(t, err) + + _, err = clientHG.Sync( + stream, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, + ) + require.NoError(t, err) + require.NoError(t, stream.CloseSend()) + conn.Close() + + _, err = clientHG.Commit(2) + require.NoError(t, err) + + // In CRDT sync, the client receives data from the server and MERGES it. + // The client should now have BOTH its original vertices AND the server's vertices. + // So the client root should differ from both original roots (it's a superset). 
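+			// Root equality between the two nodes is only asserted in the
+			// bidirectional test above; here only leaf counts are checked,
+			// since each sub-test performs the sync in a single direction.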
+ clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + t.Logf("Client root after sync: %x", clientRoot) + + // Get all leaves from the client tree after sync + clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree() + clientLeaves := tries.GetAllLeaves( + clientTree.SetType, + clientTree.PhaseType, + clientTree.ShardKey, + clientTree.Root, + ) + + clientLeafCount := 0 + for _, leaf := range clientLeaves { + if leaf != nil { + clientLeafCount++ + } + } + + // After sync, client should have received server's vertices (merged with its own) + // The client should have at least as many leaves as it started with + assert.GreaterOrEqual(t, clientLeafCount, clientLeafCountBefore, + "client should not lose leaves during sync") + + // Client should have gained some leaves from the server (unless they already had them all) + t.Logf("Sync %s completed - client went from %d to %d leaves", + direction, clientLeafCountBefore, clientLeafCount) + + // Verify the sync actually transferred data by checking that server's vertices are now in client + serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree() + serverLeaves := tries.GetAllLeaves( + serverTree.SetType, + serverTree.PhaseType, + serverTree.ShardKey, + serverTree.Root, + ) + serverLeafCount := 0 + for _, leaf := range serverLeaves { + if leaf != nil { + serverLeafCount++ + } + } + t.Logf("Server has %d leaves", serverLeafCount) + + // The client should have at least as many leaves as the server + // (since it's merging server data into its own) + assert.GreaterOrEqual(t, clientLeafCount, serverLeafCount, + "client should have at least as many leaves as server after sync") + }) + } + + // Test both directions + runSyncTest("A_syncs_from_B") + runSyncTest("B_syncs_from_A") +}