From 0adba482e730af0ee24845252b64d4db7af8e3f7 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Sun, 18 Jan 2026 07:09:38 -0600 Subject: [PATCH] square up the roots --- hypergraph/snapshot_manager.go | 39 +- node/rpc/hypergraph_sync_rpc_server_test.go | 745 ++++++++++++++++++++ node/store/pebble.go | 498 ++++++++----- 3 files changed, 1089 insertions(+), 193 deletions(-) diff --git a/hypergraph/snapshot_manager.go b/hypergraph/snapshot_manager.go index 269041d..6e0cf9d 100644 --- a/hypergraph/snapshot_manager.go +++ b/hypergraph/snapshot_manager.go @@ -277,23 +277,38 @@ func (m *snapshotManager) acquire( handle.acquire() return handle } - // Generation exists but no snapshot for this shard yet - reject - m.logger.Warn( - "generation matches expected root but no snapshot exists, rejecting sync request", + // Generation exists but no snapshot for this shard yet. + // Only create if this is the latest generation (current DB state matches). + if gen != m.generations[0] { + m.logger.Warn( + "generation matches expected root but is not latest, cannot create snapshot", + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + ) + return nil + } + // Fall through to create snapshot for latest generation + m.logger.Debug( + "creating snapshot for expected root (latest generation)", zap.String("expected_root", hex.EncodeToString(expectedRoot)), ) - return nil + break } } - // No matching generation found - reject instead of falling back to latest - if m.logger != nil { - m.logger.Warn( - "no snapshot generation matches expected root, rejecting sync request", - zap.String("expected_root", hex.EncodeToString(expectedRoot)), - zap.String("latest_root", hex.EncodeToString(m.generations[0].root)), - ) + // If we didn't find a matching generation at all, reject + if len(m.generations) == 0 || !bytes.Equal(m.generations[0].root, expectedRoot) { + if m.logger != nil { + latestRoot := "" + if len(m.generations) > 0 { + latestRoot = 
hex.EncodeToString(m.generations[0].root) + } + m.logger.Warn( + "no snapshot generation matches expected root, rejecting sync request", + zap.String("expected_root", hex.EncodeToString(expectedRoot)), + zap.String("latest_root", latestRoot), + ) + } + return nil } - return nil } // Use the latest generation for new snapshots diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 9142a3c..2186b06 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -4,8 +4,11 @@ import ( "bytes" "context" "crypto/rand" + "crypto/sha256" "crypto/sha512" "encoding/binary" + "encoding/hex" + "encoding/json" "fmt" "log" "math/big" @@ -3157,6 +3160,223 @@ waitLoop: t.Logf("Hypersync complete: client synced %d prover vertices", clientLeafCount) assert.Greater(t, clientLeafCount, 0, "should have synced at least some prover vertices") + + // Verify the sync-based repair approach: + // 1. Create a second in-memory hypergraph + // 2. Sync from clientHG to the second hypergraph + // 3. Wipe the tree data from clientDB + // 4. Sync back from the second hypergraph to clientHG + // 5. 
Verify the root still matches + t.Log("Verifying sync-based repair approach...") + + // Create second in-memory hypergraph + repairDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_repair/store"}, 0) + defer repairDB.Close() + + repairStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_repair/store"}, + repairDB, + logger, + enc, + inclusionProver, + ) + + repairHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "repair")), + repairStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + // Get current root from clientHG before repair + clientRootBeforeRepair := clientHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(false) + t.Logf("Client root before repair: %x", clientRootBeforeRepair) + + // Publish snapshot on clientHG + clientHG.PublishSnapshot(clientRootBeforeRepair) + + // Set up gRPC server backed by clientHG + const repairBufSize = 1 << 20 + clientLis := bufconn.Listen(repairBufSize) + clientGRPCServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), + grpc.MaxSendMsgSize(100*1024*1024), + ) + protobufs.RegisterHypergraphComparisonServiceServer(clientGRPCServer, clientHG) + go func() { _ = clientGRPCServer.Serve(clientLis) }() + + // Dial clientHG + clientDialer := func(context.Context, string) (net.Conn, error) { + return clientLis.Dial() + } + clientRepairConn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(clientDialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), + grpc.MaxCallSendMsgSize(100*1024*1024), + ), + ) + require.NoError(t, err) + + clientRepairClient := protobufs.NewHypergraphComparisonServiceClient(clientRepairConn) + + // Sync from clientHG to repairHG for all phases + repairPhases := []protobufs.HypergraphPhaseSet{ + 
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES, + } + + t.Log("Syncing client -> repair hypergraph...") + for _, phase := range repairPhases { + stream, err := clientRepairClient.PerformSync(context.Background()) + require.NoError(t, err) + _, err = repairHG.SyncFrom(stream, proverShardKey, phase, nil) + if err != nil { + t.Logf("Sync client->repair phase %v: %v", phase, err) + } + _ = stream.CloseSend() + } + + // Verify repairHG has the data + repairRoot := repairHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(false) + t.Logf("Repair hypergraph root after sync: %x", repairRoot) + assert.Equal(t, clientRootBeforeRepair, repairRoot, "repair HG should match client root") + + // Stop client server before wiping + clientGRPCServer.Stop() + clientRepairConn.Close() + + // Wipe tree data from clientDB for the prover shard + t.Log("Wiping tree data from client DB...") + treePrefixes := []byte{ + store.VERTEX_ADDS_TREE_NODE, + store.VERTEX_REMOVES_TREE_NODE, + store.HYPEREDGE_ADDS_TREE_NODE, + store.HYPEREDGE_REMOVES_TREE_NODE, + store.VERTEX_ADDS_TREE_NODE_BY_PATH, + store.VERTEX_REMOVES_TREE_NODE_BY_PATH, + store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH, + store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH, + store.VERTEX_ADDS_CHANGE_RECORD, + store.VERTEX_REMOVES_CHANGE_RECORD, + store.HYPEREDGE_ADDS_CHANGE_RECORD, + store.HYPEREDGE_REMOVES_CHANGE_RECORD, + store.VERTEX_ADDS_TREE_ROOT, + store.VERTEX_REMOVES_TREE_ROOT, + store.HYPEREDGE_ADDS_TREE_ROOT, + store.HYPEREDGE_REMOVES_TREE_ROOT, + } + + shardKeyBytes := make([]byte, 0, len(proverShardKey.L1)+len(proverShardKey.L2)) + shardKeyBytes = append(shardKeyBytes, proverShardKey.L1[:]...) + shardKeyBytes = append(shardKeyBytes, proverShardKey.L2[:]...) 
+ + for _, prefix := range treePrefixes { + start := append([]byte{store.HYPERGRAPH_SHARD, prefix}, shardKeyBytes...) + // Increment shard key for end bound + endShardKeyBytes := make([]byte, len(shardKeyBytes)) + copy(endShardKeyBytes, shardKeyBytes) + // Since all bytes of L2 are 0xff, incrementing would overflow, so use next prefix + end := []byte{store.HYPERGRAPH_SHARD, prefix + 1} + if err := clientDB.DeleteRange(start, end); err != nil { + t.Logf("DeleteRange for prefix 0x%02x: %v", prefix, err) + } + } + + // Reload clientHG after wipe + t.Log("Reloading client hypergraph after wipe...") + clientStore2 := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"}, + clientDB, + logger, + enc, + inclusionProver, + ) + clientHG2 := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "mainnet-client-reloaded")), + clientStore2, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + // Verify tree is now empty/different + clientRootAfterWipe := clientHG2.GetVertexAddsSet(proverShardKey).GetTree().Commit(false) + t.Logf("Client root after wipe: %x (expected nil or different)", clientRootAfterWipe) + + // Publish snapshot on repairHG for reverse sync + repairHG.PublishSnapshot(repairRoot) + + // Set up gRPC server backed by repairHG + repairLis := bufconn.Listen(repairBufSize) + repairGRPCServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), + grpc.MaxSendMsgSize(100*1024*1024), + ) + protobufs.RegisterHypergraphComparisonServiceServer(repairGRPCServer, repairHG) + go func() { _ = repairGRPCServer.Serve(repairLis) }() + defer repairGRPCServer.Stop() + + // Dial repairHG + repairDialer := func(context.Context, string) (net.Conn, error) { + return repairLis.Dial() + } + repairConn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(repairDialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + 
grpc.MaxCallRecvMsgSize(100*1024*1024), + grpc.MaxCallSendMsgSize(100*1024*1024), + ), + ) + require.NoError(t, err) + defer repairConn.Close() + + repairClient := protobufs.NewHypergraphComparisonServiceClient(repairConn) + + // Sync from repairHG to clientHG2 for all phases + t.Log("Syncing repair -> client hypergraph...") + for _, phase := range repairPhases { + stream, err := repairClient.PerformSync(context.Background()) + require.NoError(t, err) + _, err = clientHG2.SyncFrom(stream, proverShardKey, phase, nil) + if err != nil { + t.Logf("Sync repair->client phase %v: %v", phase, err) + } + _ = stream.CloseSend() + } + + // Commit and verify root after repair + clientRootAfterRepair := clientHG2.GetVertexAddsSet(proverShardKey).GetTree().Commit(true) + t.Logf("Client root after repair: %x", clientRootAfterRepair) + t.Logf("Expected root from frame: %x", expectedRoot) + + // Verify the root matches the original (before repair) - this confirms the round-trip works + assert.Equal(t, clientRootBeforeRepair, clientRootAfterRepair, + "root after sync repair should match root before repair") + + // Note: The root may not match the frame's expected root if there was corruption, + // but it should at least match what we synced before the repair. + // The actual fix for the frame mismatch requires fixing the corruption at the source. 
+ t.Logf("Sync-based repair verification complete.") + t.Logf(" Original client root: %x", clientRootBeforeRepair) + t.Logf(" Repaired client root: %x", clientRootAfterRepair) + t.Logf(" Frame expected root: %x", expectedRoot) + if bytes.Equal(clientRootAfterRepair, expectedRoot) { + t.Log("SUCCESS: Repaired root matches frame expected root!") + } else { + t.Log("Note: Repaired root differs from frame expected root - corruption exists at source") + } } // TestHypergraphSyncWithPagination tests that syncing a large tree with >1000 leaves @@ -3376,3 +3596,528 @@ func TestHypergraphSyncWithPagination(t *testing.T) { assert.Equal(t, serverRoot, clientRoot, "client root should match server root after sync") t.Log("Pagination test passed - client converged to server state") } + +// dumpHypergraphShardKeys dumps all database keys matching the global prover shard pattern. +// This replicates the behavior of: dbscan -prefix 09 -search 000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff +// Parameters: +// - t: testing context for logging +// - db: the PebbleDB to inspect +// - label: a label to identify the database in output (e.g., "client", "server") +func dumpHypergraphShardKeys(t *testing.T, db *store.PebbleDB, label string) { + // Prefix 0x09 = HYPERGRAPH_SHARD + prefixFilter := []byte{store.HYPERGRAPH_SHARD} + + // Global prover shard key: L1=[0x00,0x00,0x00], L2=[0xff * 32] + // As hex: 000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff (35 bytes) + keySearchPattern, err := hex.DecodeString("000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + if err != nil { + t.Logf("[%s] Failed to decode search pattern: %v", label, err) + return + } + + // Set iteration bounds based on prefix + lowerBound := prefixFilter + upperBound := []byte{store.HYPERGRAPH_SHARD + 1} + + iter, err := db.NewIter(lowerBound, upperBound) + if err != nil { + t.Logf("[%s] Failed to create iterator: %v", label, err) + return + } + defer 
iter.Close() + + t.Logf("=== Database dump for %s (prefix=09, search=global prover shard) ===", label) + + count := 0 + for iter.First(); iter.Valid(); iter.Next() { + key := iter.Key() + value := iter.Value() + + // Apply prefix filter + if !bytes.HasPrefix(key, prefixFilter) { + continue + } + + // Apply key search pattern (must contain the global prover shard key bytes) + if !bytes.Contains(key, keySearchPattern) { + continue + } + + count++ + + // Decode and display the key/value + semantic := describeHypergraphKeyForTest(key) + decoded := decodeHypergraphValueForTest(key, value) + + t.Logf("[%s] key: %s", label, hex.EncodeToString(key)) + t.Logf("[%s] semantic: %s", label, semantic) + t.Logf("[%s] value:\n%s\n", label, indentForTest(decoded)) + } + + t.Logf("=== End dump for %s: %d keys matched ===", label, count) +} + +// describeHypergraphKeyForTest provides semantic description of hypergraph keys. +// Mirrors the logic from dbscan/main.go describeHypergraphKey. +func describeHypergraphKeyForTest(key []byte) string { + if len(key) < 2 { + return "hypergraph: invalid key length" + } + + // Check for shard commit keys (frame-based) + if len(key) >= 10 { + switch key[9] { + case store.HYPERGRAPH_VERTEX_ADDS_SHARD_COMMIT, + store.HYPERGRAPH_VERTEX_REMOVES_SHARD_COMMIT, + store.HYPERGRAPH_HYPEREDGE_ADDS_SHARD_COMMIT, + store.HYPERGRAPH_HYPEREDGE_REMOVES_SHARD_COMMIT: + frame := binary.BigEndian.Uint64(key[1:9]) + shard := key[10:] + var setPhase string + switch key[9] { + case store.HYPERGRAPH_VERTEX_ADDS_SHARD_COMMIT: + setPhase = "vertex-adds" + case store.HYPERGRAPH_VERTEX_REMOVES_SHARD_COMMIT: + setPhase = "vertex-removes" + case store.HYPERGRAPH_HYPEREDGE_ADDS_SHARD_COMMIT: + setPhase = "hyperedge-adds" + case store.HYPERGRAPH_HYPEREDGE_REMOVES_SHARD_COMMIT: + setPhase = "hyperedge-removes" + } + return fmt.Sprintf( + "hypergraph shard commit %s frame=%d shard=%s", + setPhase, + frame, + shortHexForTest(shard), + ) + } + } + + sub := key[1] + payload := 
key[2:] + switch sub { + case store.VERTEX_DATA: + return fmt.Sprintf("hypergraph vertex data id=%s", shortHexForTest(payload)) + case store.VERTEX_TOMBSTONE: + return fmt.Sprintf("hypergraph vertex tombstone id=%s", shortHexForTest(payload)) + case store.VERTEX_ADDS_TREE_NODE, + store.VERTEX_REMOVES_TREE_NODE, + store.HYPEREDGE_ADDS_TREE_NODE, + store.HYPEREDGE_REMOVES_TREE_NODE: + if len(payload) >= 35 { + l1 := payload[:3] + l2 := payload[3:35] + node := payload[35:] + return fmt.Sprintf( + "%s tree node shard=[%s|%s] node=%s", + describeHypergraphTreeTypeForTest(sub), + shortHexForTest(l1), + shortHexForTest(l2), + shortHexForTest(node), + ) + } + return fmt.Sprintf( + "%s tree node (invalid length)", + describeHypergraphTreeTypeForTest(sub), + ) + case store.VERTEX_ADDS_TREE_NODE_BY_PATH, + store.VERTEX_REMOVES_TREE_NODE_BY_PATH, + store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH, + store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH: + if len(payload) >= 35 { + l1 := payload[:3] + l2 := payload[3:35] + path := parseUint64PathForTest(payload[35:]) + return fmt.Sprintf( + "%s path shard=[%s|%s] path=%v", + describeHypergraphTreeTypeForTest(sub), + shortHexForTest(l1), + shortHexForTest(l2), + path, + ) + } + return fmt.Sprintf( + "%s path (invalid length)", + describeHypergraphTreeTypeForTest(sub), + ) + case store.VERTEX_ADDS_TREE_ROOT, + store.VERTEX_REMOVES_TREE_ROOT, + store.HYPEREDGE_ADDS_TREE_ROOT, + store.HYPEREDGE_REMOVES_TREE_ROOT: + if len(payload) >= 35 { + l1 := payload[:3] + l2 := payload[3:35] + return fmt.Sprintf( + "%s tree root shard=[%s|%s]", + describeHypergraphTreeTypeForTest(sub), + shortHexForTest(l1), + shortHexForTest(l2), + ) + } + return fmt.Sprintf( + "%s tree root (invalid length)", + describeHypergraphTreeTypeForTest(sub), + ) + case store.HYPERGRAPH_COVERED_PREFIX: + return "hypergraph covered prefix metadata" + case store.HYPERGRAPH_COMPLETE: + return "hypergraph completeness flag" + default: + return fmt.Sprintf( + "hypergraph unknown subtype 0x%02x 
raw=%s", + sub, + shortHexForTest(payload), + ) + } +} + +func describeHypergraphTreeTypeForTest(kind byte) string { + switch kind { + case store.VERTEX_ADDS_TREE_NODE, + store.VERTEX_ADDS_TREE_NODE_BY_PATH, + store.VERTEX_ADDS_TREE_ROOT: + return "vertex adds" + case store.VERTEX_REMOVES_TREE_NODE, + store.VERTEX_REMOVES_TREE_NODE_BY_PATH, + store.VERTEX_REMOVES_TREE_ROOT: + return "vertex removes" + case store.HYPEREDGE_ADDS_TREE_NODE, + store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH, + store.HYPEREDGE_ADDS_TREE_ROOT: + return "hyperedge adds" + case store.HYPEREDGE_REMOVES_TREE_NODE, + store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH, + store.HYPEREDGE_REMOVES_TREE_ROOT: + return "hyperedge removes" + default: + return "hypergraph" + } +} + +// decodeHypergraphValueForTest decodes hypergraph values for display. +// Mirrors the logic from dbscan/main.go decodeHypergraphValue. +func decodeHypergraphValueForTest(key []byte, value []byte) string { + if len(value) == 0 { + return "" + } + + sub := byte(0) + if len(key) > 1 { + sub = key[1] + } + + switch sub { + case store.VERTEX_DATA: + return summarizeVectorCommitmentTreeForTest(key, value) + case store.VERTEX_TOMBSTONE: + return shortHexForTest(value) + case store.VERTEX_ADDS_TREE_NODE, + store.VERTEX_REMOVES_TREE_NODE, + store.HYPEREDGE_ADDS_TREE_NODE, + store.HYPEREDGE_REMOVES_TREE_NODE, + store.VERTEX_ADDS_TREE_NODE_BY_PATH, + store.VERTEX_REMOVES_TREE_NODE_BY_PATH, + store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH, + store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH, + store.VERTEX_ADDS_TREE_ROOT, + store.VERTEX_REMOVES_TREE_ROOT, + store.HYPEREDGE_ADDS_TREE_ROOT, + store.HYPEREDGE_REMOVES_TREE_ROOT: + return summarizeHypergraphTreeNodeForTest(value) + case store.HYPERGRAPH_COVERED_PREFIX: + return decodeCoveredPrefixForTest(value) + case store.HYPERGRAPH_COMPLETE: + if len(value) == 0 { + return "complete=false" + } + return fmt.Sprintf("complete=%t", value[len(value)-1] != 0) + default: + return shortHexForTest(value) + } +} + +func 
summarizeVectorCommitmentTreeForTest(key []byte, value []byte) string { + tree, err := tries.DeserializeNonLazyTree(value) + if err != nil { + return fmt.Sprintf( + "vector_commitment_tree decode_error=%v raw=%s", + err, + shortHexForTest(value), + ) + } + + sum := sha256.Sum256(value) + summary := map[string]any{ + "size_bytes": len(value), + "sha256": shortHexForTest(sum[:]), + } + + // Check if this is a global intrinsic vertex (domain = 0xff*32) + globalIntrinsicAddress := bytes.Repeat([]byte{0xff}, 32) + if len(key) >= 66 { + domain := key[2:34] + address := key[34:66] + + if bytes.Equal(domain, globalIntrinsicAddress) { + // This is a global intrinsic vertex - decode the fields + globalData := decodeGlobalIntrinsicVertexForTest(tree, address) + if globalData != nil { + for k, v := range globalData { + summary[k] = v + } + } + } + } + + jsonBytes, err := json.MarshalIndent(summary, "", " ") + if err != nil { + return fmt.Sprintf("vector_commitment_tree size_bytes=%d", len(value)) + } + + return string(jsonBytes) +} + +func decodeGlobalIntrinsicVertexForTest(tree *tries.VectorCommitmentTree, address []byte) map[string]any { + result := make(map[string]any) + result["vertex_address"] = hex.EncodeToString(address) + + // Check order 0 field + order0Value, err := tree.Get([]byte{0x00}) + if err != nil || len(order0Value) == 0 { + result["type"] = "unknown (no order 0 field)" + return result + } + + switch len(order0Value) { + case 585: + // Prover: PublicKey is 585 bytes + result["type"] = "prover:Prover" + result["public_key"] = shortHexForTest(order0Value) + decodeProverFieldsForTest(tree, result) + case 32: + // Could be Allocation (Prover reference) or Reward (DelegateAddress) + joinFrame, _ := tree.Get([]byte{0x10}) + if len(joinFrame) == 8 { + result["type"] = "allocation:ProverAllocation" + result["prover_reference"] = hex.EncodeToString(order0Value) + decodeAllocationFieldsForTest(tree, result) + } else { + result["type"] = "reward:ProverReward" + 
result["delegate_address"] = hex.EncodeToString(order0Value) + } + default: + result["type"] = "unknown" + result["order_0_size"] = len(order0Value) + } + + return result +} + +func decodeProverFieldsForTest(tree *tries.VectorCommitmentTree, result map[string]any) { + if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 { + result["status"] = decodeProverStatusForTest(status[0]) + result["status_raw"] = status[0] + } + if storage, err := tree.Get([]byte{0x08}); err == nil && len(storage) == 8 { + result["available_storage"] = binary.BigEndian.Uint64(storage) + } + if seniority, err := tree.Get([]byte{0x0c}); err == nil && len(seniority) == 8 { + result["seniority"] = binary.BigEndian.Uint64(seniority) + } + if kickFrame, err := tree.Get([]byte{0x10}); err == nil && len(kickFrame) == 8 { + result["kick_frame_number"] = binary.BigEndian.Uint64(kickFrame) + } +} + +func decodeAllocationFieldsForTest(tree *tries.VectorCommitmentTree, result map[string]any) { + if status, err := tree.Get([]byte{0x04}); err == nil && len(status) == 1 { + result["status"] = decodeProverStatusForTest(status[0]) + result["status_raw"] = status[0] + } + if confirmFilter, err := tree.Get([]byte{0x08}); err == nil && len(confirmFilter) > 0 { + result["confirmation_filter"] = hex.EncodeToString(confirmFilter) + if bytes.Equal(confirmFilter, make([]byte, len(confirmFilter))) { + result["is_global_prover"] = true + } + } else { + result["is_global_prover"] = true + } + if joinFrame, err := tree.Get([]byte{0x10}); err == nil && len(joinFrame) == 8 { + result["join_frame_number"] = binary.BigEndian.Uint64(joinFrame) + } + if leaveFrame, err := tree.Get([]byte{0x14}); err == nil && len(leaveFrame) == 8 { + result["leave_frame_number"] = binary.BigEndian.Uint64(leaveFrame) + } + if lastActive, err := tree.Get([]byte{0x34}); err == nil && len(lastActive) == 8 { + result["last_active_frame_number"] = binary.BigEndian.Uint64(lastActive) + } +} + +func decodeProverStatusForTest(status 
byte) string { + switch status { + case 0: + return "Joining" + case 1: + return "Active" + case 2: + return "Paused" + case 3: + return "Leaving" + case 4: + return "Rejected" + case 5: + return "Kicked" + default: + return fmt.Sprintf("Unknown(%d)", status) + } +} + +func summarizeHypergraphTreeNodeForTest(value []byte) string { + if len(value) == 0 { + return "hypergraph_tree_node " + } + + hash := sha256.Sum256(value) + hashStr := shortHexForTest(hash[:]) + + reader := bytes.NewReader(value) + var nodeType byte + if err := binary.Read(reader, binary.BigEndian, &nodeType); err != nil { + return fmt.Sprintf("tree_node decode_error=%v sha256=%s", err, hashStr) + } + + switch nodeType { + case tries.TypeNil: + return fmt.Sprintf("tree_nil sha256=%s", hashStr) + case tries.TypeLeaf: + leaf, err := tries.DeserializeLeafNode(nil, reader) + if err != nil { + return fmt.Sprintf("tree_leaf decode_error=%v sha256=%s", err, hashStr) + } + + summary := map[string]any{ + "type": "leaf", + "key": shortHexForTest(leaf.Key), + "value": shortHexForTest(leaf.Value), + "hash_target": shortHexForTest(leaf.HashTarget), + "commitment": shortHexForTest(leaf.Commitment), + "bytes_sha256": hashStr, + } + if leaf.Size != nil { + summary["size"] = leaf.Size.String() + } + + jsonBytes, err := json.MarshalIndent(summary, "", " ") + if err != nil { + return fmt.Sprintf( + "tree_leaf key=%s sha256=%s", + shortHexForTest(leaf.Key), + hashStr, + ) + } + return string(jsonBytes) + case tries.TypeBranch: + branch, err := tries.DeserializeBranchNode(nil, reader, true) + if err != nil { + return fmt.Sprintf("tree_branch decode_error=%v sha256=%s", err, hashStr) + } + + childSummary := map[string]int{ + "branch": 0, + "leaf": 0, + "nil": 0, + } + for _, child := range branch.Children { + switch child.(type) { + case *tries.LazyVectorCommitmentBranchNode: + childSummary["branch"]++ + case *tries.LazyVectorCommitmentLeafNode: + childSummary["leaf"]++ + default: + childSummary["nil"]++ + } + } + + 
summary := map[string]any{ + "type": "branch", + "prefix": branch.Prefix, + "leaf_count": branch.LeafCount, + "longest_branch": branch.LongestBranch, + "commitment": shortHexForTest(branch.Commitment), + "children": childSummary, + "bytes_sha256": hashStr, + } + if branch.Size != nil { + summary["size"] = branch.Size.String() + } + + jsonBytes, err := json.MarshalIndent(summary, "", " ") + if err != nil { + return fmt.Sprintf( + "tree_branch prefix=%v leafs=%d sha256=%s", + branch.Prefix, + branch.LeafCount, + hashStr, + ) + } + return string(jsonBytes) + default: + return fmt.Sprintf( + "tree_node type=0x%02x payload=%s sha256=%s", + nodeType, + shortHexForTest(value[1:]), + hashStr, + ) + } +} + +func decodeCoveredPrefixForTest(value []byte) string { + if len(value)%8 != 0 { + return shortHexForTest(value) + } + + result := make([]int64, len(value)/8) + for i := range result { + result[i] = int64(binary.BigEndian.Uint64(value[i*8 : (i+1)*8])) + } + + return fmt.Sprintf("covered_prefix=%v", result) +} + +func shortHexForTest(b []byte) string { + if len(b) == 0 { + return "0x" + } + if len(b) <= 16 { + return "0x" + hex.EncodeToString(b) + } + return fmt.Sprintf( + "0x%s...%s(len=%d)", + hex.EncodeToString(b[:8]), + hex.EncodeToString(b[len(b)-8:]), + len(b), + ) +} + +func parseUint64PathForTest(b []byte) []uint64 { + if len(b)%8 != 0 { + return nil + } + + out := make([]uint64, len(b)/8) + for i := range out { + out[i] = binary.BigEndian.Uint64(b[i*8 : (i+1)*8]) + } + return out +} + +func indentForTest(value string) string { + if value == "" { + return "" + } + lines := bytes.Split([]byte(value), []byte("\n")) + for i, line := range lines { + lines[i] = append([]byte(" "), line...) 
+ } + return string(bytes.Join(lines, []byte("\n"))) +} diff --git a/node/store/pebble.go b/node/store/pebble.go index 7bbe309..ffff106 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -7,8 +7,8 @@ import ( "encoding/hex" "fmt" "io" + "net" "os" - "slices" "strings" pebblev1 "github.com/cockroachdb/pebble" @@ -16,9 +16,13 @@ import ( "github.com/cockroachdb/pebble/v2/vfs" "github.com/pkg/errors" "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" "source.quilibrium.com/quilibrium/monorepo/bls48581" "source.quilibrium.com/quilibrium/monorepo/config" - "source.quilibrium.com/quilibrium/monorepo/types/hypergraph" + hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph" + "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/store" "source.quilibrium.com/quilibrium/monorepo/types/tries" ) @@ -34,7 +38,7 @@ func (p *PebbleDB) DB() *pebble.DB { // pebbleMigrations contains ordered migration steps. New migrations append to // the end. 
-var pebbleMigrations = []func(*pebble.Batch, *pebble.DB) error{ +var pebbleMigrations = []func(*pebble.Batch, *pebble.DB, *config.DBConfig) error{ migration_2_1_0_4, migration_2_1_0_5, migration_2_1_0_8, @@ -86,6 +90,8 @@ var pebbleMigrations = []func(*pebble.Batch, *pebble.DB) error{ migration_2_1_0_1814, migration_2_1_0_1815, migration_2_1_0_1816, + migration_2_1_0_1817, + migration_2_1_0_1818, } func NewPebbleDB( @@ -304,7 +310,7 @@ func (p *PebbleDB) migrate(logger *zap.Logger) error { zap.Int("from_version", int(storedVersion)), zap.Int("to_version", int(storedVersion+1)), ) - if err := pebbleMigrations[i](batch, p.db); err != nil { + if err := pebbleMigrations[i](batch, p.db, p.config); err != nil { batch.Close() logger.Error("migration failed", zap.Error(err)) return errors.Wrapf(err, "apply migration %d", i+1) @@ -482,7 +488,7 @@ func rightAlign(data []byte, size int) []byte { // Resolves all the variations of store issues from any series of upgrade steps // in 2.1.0.1->2.1.0.3 -func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { // batches don't use this but for backcompat the parameter is required wo := &pebble.WriteOptions{} @@ -583,138 +589,138 @@ func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB) error { return nil } -func migration_2_1_0_5(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_5(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { // We just re-run it again - return migration_2_1_0_4(b, db) + return migration_2_1_0_4(b, db, config) } -func migration_2_1_0_8(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_8(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { // these migration entries exist solely to advance migration number so all // nodes are consistent return nil } -func migration_2_1_0_81(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_81(b *pebble.Batch, db *pebble.DB, 
// The migration_2_1_0_* functions below exist solely to advance the stored
// migration number so all nodes remain consistent; they perform no data
// changes. The numbered variants (e.g. _141.._148) forward to their base
// migration so any future change to the base applies uniformly.

func migration_2_1_0_10(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	// these migration entries exist solely to advance migration number so all
	// nodes are consistent
	return nil
}

func migration_2_1_0_11(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return nil
}

func migration_2_1_0_14(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return nil
}

// migration_2_1_0_141 .. _148 forward to migration_2_1_0_14.
func migration_2_1_0_141(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_142(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_143(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_144(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_145(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_146(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_147(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_148(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_14(b, db, config)
}

func migration_2_1_0_149(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return nil
}

// migration_2_1_0_1410 and _1411 forward to migration_2_1_0_149.
func migration_2_1_0_1410(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_149(b, db, config)
}

func migration_2_1_0_1411(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_149(b, db, config)
}

func migration_2_1_0_15(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return nil
}

// migration_2_1_0_151 .. _159 forward to migration_2_1_0_15.
func migration_2_1_0_151(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_152(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_153(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_154(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_155(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_156(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_157(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_158(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_159(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_15(b, db, config)
}

func migration_2_1_0_17(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return nil
}

func migration_2_1_0_171(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return nil
}
error { +func migration_2_1_0_172(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_173(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_173(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { // Global shard key: L1={0,0,0}, L2=0xff*32 globalShardKey := tries.ShardKey{ L1: [3]byte{}, @@ -795,76 +801,69 @@ func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB) error { return nil } -func migration_2_1_0_181(b *pebble.Batch, db *pebble.DB) error { - return migration_2_1_0_18(b, db) +func migration_2_1_0_181(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_18(b, db, config) } -func migration_2_1_0_182(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_182(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_183(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_183(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_184(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_184(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_185(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_185(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_186(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_186(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_187(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_187(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_188(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_188(b *pebble.Batch, db 
*pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_189(b *pebble.Batch, db *pebble.DB) error { +func migration_2_1_0_189(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { return nil } -func migration_2_1_0_1810(b *pebble.Batch, db *pebble.DB) error { - return migration_2_1_0_189(b, db) +func migration_2_1_0_1810(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_189(b, db, config) } -func migration_2_1_0_1811(b *pebble.Batch, db *pebble.DB) error { - return migration_2_1_0_189(b, db) +func migration_2_1_0_1811(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_189(b, db, config) } -func migration_2_1_0_1812(b *pebble.Batch, db *pebble.DB) error { - return migration_2_1_0_189(b, db) +func migration_2_1_0_1812(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_189(b, db, config) } -func migration_2_1_0_1813(b *pebble.Batch, db *pebble.DB) error { - return migration_2_1_0_189(b, db) +func migration_2_1_0_1813(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_189(b, db, config) } -func migration_2_1_0_1814(b *pebble.Batch, db *pebble.DB) error { - return migration_2_1_0_189(b, db) +func migration_2_1_0_1814(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_189(b, db, config) } -func migration_2_1_0_1815(b *pebble.Batch, db *pebble.DB) error { - return migration_2_1_0_189(b, db) +func migration_2_1_0_1815(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_189(b, db, config) } // migration_2_1_0_1816 recalculates commitments for the global prover trees // to fix potential corruption from earlier versions of sync. 
-func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB) error { - // Check if already done - doneKey := []byte{HYPERGRAPH_SHARD, HYPERGRAPH_GLOBAL_PROVER_RECALC_DONE} - if _, closer, err := b.Get(doneKey); err == nil { - closer.Close() - return nil // Already done - } - +func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { // Global prover shard key: L1={0,0,0}, L2=0xff*32 globalShardKey := tries.ShardKey{ L1: [3]byte{}, @@ -879,112 +878,249 @@ func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB) error { // Initialize prover (logger can be nil for migrations) prover := bls48581.NewKZGInclusionProver(nil) - // Create hypergraph store using the batch - hgStore := NewPebbleHypergraphStore(nil, &PebbleDB{db: db}, nil, nil, prover) + // Create hypergraph store using the batch wrapper + hgStore := NewPebbleHypergraphStore(nil, &PebbleDB{db: db}, zap.L(), nil, prover) - // Load and recalculate each tree for the global prover shard - treeTypes := []struct { - setType string - phaseType string - rootKey func(tries.ShardKey) []byte - }{ - { - string(hypergraph.VertexAtomType), - string(hypergraph.AddsPhaseType), - hypergraphVertexAddsTreeRootKey, - }, - { - string(hypergraph.VertexAtomType), - string(hypergraph.RemovesPhaseType), - hypergraphVertexRemovesTreeRootKey, - }, - { - string(hypergraph.HyperedgeAtomType), - string(hypergraph.AddsPhaseType), - hypergraphHyperedgeAddsTreeRootKey, - }, - { - string(hypergraph.HyperedgeAtomType), - string(hypergraph.RemovesPhaseType), - hypergraphHyperedgeRemovesTreeRootKey, + hg, err := hgStore.LoadHypergraph(nil, 0) + if err != nil { + return err + } + + hgc := hg.(*hgcrdt.HypergraphCRDT) + hgc.GetVertexAddsSet(globalShardKey).GetTree().Commit(true) + + return nil +} + +func migration_2_1_0_1817(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return migration_2_1_0_1816(b, db, config) +} + +// migration_2_1_0_1818 repairs corrupted global prover shard tree data by: +// 
1. Creating an in-memory hypergraph instance +// 2. Setting up a local gRPC sync server backed by the actual DB hypergraph +// 3. Syncing from the actual DB to the in-memory instance via the sync protocol +// 4. Wiping all tree data for the global prover shard from the actual DB +// 5. Setting up a local gRPC sync server backed by the in-memory hypergraph +// 6. Syncing from the in-memory instance back to the actual DB hypergraph +func migration_2_1_0_1818(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error { + return doMigration1818(db, config) +} + +// doMigration1818 performs the actual migration work for migration_2_1_0_1818. +// It uses the sync protocol to repair corrupted tree data by syncing to an +// in-memory instance and back. +func doMigration1818(db *pebble.DB, config *config.DBConfig) error { + logger := zap.L() + + // Global prover shard key: L1={0,0,0}, L2=0xff*32 + globalShardKey := tries.ShardKey{ + L1: [3]byte{}, + L2: [32]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, }, } - for _, tt := range treeTypes { - rootData, closer, err := b.Get(tt.rootKey(globalShardKey)) + prover := bls48581.NewKZGInclusionProver(logger) + + // Create hypergraph from actual DB + actualDBWrapper := &PebbleDB{db: db} + actualStore := NewPebbleHypergraphStore(config, actualDBWrapper, logger, nil, prover) + + actualHG, err := actualStore.LoadHypergraph(nil, 0) + if err != nil { + return errors.Wrap(err, "load actual hypergraph") + } + actualHGCRDT := actualHG.(*hgcrdt.HypergraphCRDT) + + // Create in-memory pebble DB directly (bypassing NewPebbleDB to avoid cycle) + memOpts := &pebble.Options{ + MemTableSize: 64 << 20, + FormatMajorVersion: pebble.FormatNewest, + FS: vfs.NewMem(), + } + memDB, err := pebble.Open("", memOpts) + if err != nil { + return errors.Wrap(err, "open in-memory pebble") + } + defer 
memDB.Close() + + memDBWrapper := &PebbleDB{db: memDB} + memStore := NewPebbleHypergraphStore(config, memDBWrapper, logger, nil, prover) + memHG, err := memStore.LoadHypergraph(nil, 0) + if err != nil { + return errors.Wrap(err, "load in-memory hypergraph") + } + memHGCRDT := memHG.(*hgcrdt.HypergraphCRDT) + + // Phase 1: Sync from actual DB to in-memory + // Get the current root from actual DB + actualRoot := actualHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(false) + if actualRoot == nil { + logger.Info("migration 1818: no data in global prover shard, skipping") + return nil + } + + // Publish snapshot on actual hypergraph + actualHGCRDT.PublishSnapshot(actualRoot) + + // Set up gRPC server backed by actual hypergraph + const bufSize = 1 << 20 + actualLis := bufconn.Listen(bufSize) + actualGRPCServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), + grpc.MaxSendMsgSize(100*1024*1024), + ) + protobufs.RegisterHypergraphComparisonServiceServer(actualGRPCServer, actualHGCRDT) + go func() { _ = actualGRPCServer.Serve(actualLis) }() + defer actualGRPCServer.Stop() + + // Create client connection to actual hypergraph server + actualDialer := func(context.Context, string) (net.Conn, error) { + return actualLis.Dial() + } + actualConn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(actualDialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), + grpc.MaxCallSendMsgSize(100*1024*1024), + ), + ) + if err != nil { + return errors.Wrap(err, "dial actual hypergraph") + } + defer actualConn.Close() + + actualClient := protobufs.NewHypergraphComparisonServiceClient(actualConn) + + // Sync from actual to in-memory for all phases + phases := []protobufs.HypergraphPhaseSet{ + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES, + 
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES, + } + + for _, phase := range phases { + stream, err := actualClient.PerformSync(context.Background()) if err != nil { - // No root for this tree, skip - continue + return errors.Wrapf(err, "create sync stream for phase %v", phase) } - data := slices.Clone(rootData) - closer.Close() - - if len(data) == 0 { - continue - } - - var node tries.LazyVectorCommitmentNode - switch data[0] { - case tries.TypeLeaf: - node, err = tries.DeserializeLeafNode(hgStore, bytes.NewReader(data[1:])) - case tries.TypeBranch: - pathLength := binary.BigEndian.Uint32(data[1:5]) - node, err = tries.DeserializeBranchNode( - hgStore, - bytes.NewReader(data[5+(pathLength*4):]), - false, - ) - if err != nil { - return errors.Wrapf( - err, - "deserialize %s %s branch", - tt.setType, - tt.phaseType, - ) - } - - fullPrefix := []int{} - for i := range pathLength { - fullPrefix = append( - fullPrefix, - int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), - ) - } - branch := node.(*tries.LazyVectorCommitmentBranchNode) - branch.FullPrefix = fullPrefix - default: - continue // Unknown type, skip - } - + _, err = memHGCRDT.SyncFrom(stream, globalShardKey, phase, nil) if err != nil { - return errors.Wrapf( - err, - "deserialize %s %s root", - tt.setType, - tt.phaseType, - ) + logger.Warn("sync from actual to memory failed", zap.Error(err), zap.Any("phase", phase)) } - - // Create tree and force recalculation - tree := &tries.LazyVectorCommitmentTree{ - Root: node, - SetType: tt.setType, - PhaseType: tt.phaseType, - ShardKey: globalShardKey, - Store: hgStore, - CoveredPrefix: nil, - InclusionProver: prover, - } - - // Force full recalculation of commitments - tree.Commit(true) + _ = stream.CloseSend() } - // Mark migration as done - if err := b.Set(doneKey, []byte{0x01}, &pebble.WriteOptions{}); err != nil { - return errors.Wrap(err, "mark global prover recalc 
done") + // Commit in-memory to get root + memRoot := memHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(false) + logger.Info("migration 1818: synced to in-memory", + zap.String("actual_root", hex.EncodeToString(actualRoot)), + zap.String("mem_root", hex.EncodeToString(memRoot)), + ) + + // Stop the actual server before wiping data + actualGRPCServer.Stop() + actualConn.Close() + + // Phase 2: Wipe tree data for global prover shard from actual DB + treePrefixes := []byte{ + VERTEX_ADDS_TREE_NODE, + VERTEX_REMOVES_TREE_NODE, + HYPEREDGE_ADDS_TREE_NODE, + HYPEREDGE_REMOVES_TREE_NODE, + VERTEX_ADDS_TREE_NODE_BY_PATH, + VERTEX_REMOVES_TREE_NODE_BY_PATH, + HYPEREDGE_ADDS_TREE_NODE_BY_PATH, + HYPEREDGE_REMOVES_TREE_NODE_BY_PATH, + VERTEX_ADDS_CHANGE_RECORD, + VERTEX_REMOVES_CHANGE_RECORD, + HYPEREDGE_ADDS_CHANGE_RECORD, + HYPEREDGE_REMOVES_CHANGE_RECORD, + VERTEX_ADDS_TREE_ROOT, + VERTEX_REMOVES_TREE_ROOT, + HYPEREDGE_ADDS_TREE_ROOT, + HYPEREDGE_REMOVES_TREE_ROOT, } + for _, prefix := range treePrefixes { + start, end := shardRangeBounds(prefix, globalShardKey) + if err := db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true}); err != nil { + return errors.Wrapf(err, "delete range for prefix 0x%02x", prefix) + } + } + + logger.Info("migration 1818: wiped tree data from actual DB") + + // Reload actual hypergraph after wipe + actualStore2 := NewPebbleHypergraphStore(config, actualDBWrapper, logger, nil, prover) + actualHG2, err := actualStore2.LoadHypergraph(nil, 0) + if err != nil { + return errors.Wrap(err, "reload actual hypergraph after wipe") + } + actualHGCRDT2 := actualHG2.(*hgcrdt.HypergraphCRDT) + + // Phase 3: Sync from in-memory back to actual DB + // Publish snapshot on in-memory hypergraph + memHGCRDT.PublishSnapshot(memRoot) + + // Set up gRPC server backed by in-memory hypergraph + memLis := bufconn.Listen(bufSize) + memGRPCServer := grpc.NewServer( + grpc.MaxRecvMsgSize(100*1024*1024), + grpc.MaxSendMsgSize(100*1024*1024), + ) + 
protobufs.RegisterHypergraphComparisonServiceServer(memGRPCServer, memHGCRDT) + go func() { _ = memGRPCServer.Serve(memLis) }() + defer memGRPCServer.Stop() + + // Create client connection to in-memory hypergraph server + memDialer := func(context.Context, string) (net.Conn, error) { + return memLis.Dial() + } + memConn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(memDialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(100*1024*1024), + grpc.MaxCallSendMsgSize(100*1024*1024), + ), + ) + if err != nil { + return errors.Wrap(err, "dial in-memory hypergraph") + } + defer memConn.Close() + + memClient := protobufs.NewHypergraphComparisonServiceClient(memConn) + + // Sync from in-memory to actual for all phases + for _, phase := range phases { + stream, err := memClient.PerformSync(context.Background()) + if err != nil { + return errors.Wrapf(err, "create sync stream for phase %v (reverse)", phase) + } + _, err = actualHGCRDT2.SyncFrom(stream, globalShardKey, phase, nil) + if err != nil { + logger.Warn("sync from memory to actual failed", zap.Error(err), zap.Any("phase", phase)) + } + _ = stream.CloseSend() + } + + // Final commit + finalRoot := actualHGCRDT2.GetVertexAddsSet(globalShardKey).GetTree().Commit(true) + logger.Info("migration 1818: completed", + zap.String("final_root", hex.EncodeToString(finalRoot)), + ) + return nil }