square up the roots

This commit is contained in:
Cassandra Heart 2026-01-18 07:09:38 -06:00
parent f29cc367c5
commit 0adba482e7
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
3 changed files with 1089 additions and 193 deletions

View File

@ -277,23 +277,38 @@ func (m *snapshotManager) acquire(
handle.acquire()
return handle
}
// Generation exists but no snapshot for this shard yet - reject
m.logger.Warn(
"generation matches expected root but no snapshot exists, rejecting sync request",
// Generation exists but no snapshot for this shard yet.
// Only create if this is the latest generation (current DB state matches).
if gen != m.generations[0] {
m.logger.Warn(
"generation matches expected root but is not latest, cannot create snapshot",
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
)
return nil
}
// Fall through to create snapshot for latest generation
m.logger.Debug(
"creating snapshot for expected root (latest generation)",
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
)
return nil
break
}
}
// No matching generation found - reject instead of falling back to latest
if m.logger != nil {
m.logger.Warn(
"no snapshot generation matches expected root, rejecting sync request",
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
zap.String("latest_root", hex.EncodeToString(m.generations[0].root)),
)
// If we didn't find a matching generation at all, reject
if len(m.generations) == 0 || !bytes.Equal(m.generations[0].root, expectedRoot) {
if m.logger != nil {
latestRoot := ""
if len(m.generations) > 0 {
latestRoot = hex.EncodeToString(m.generations[0].root)
}
m.logger.Warn(
"no snapshot generation matches expected root, rejecting sync request",
zap.String("expected_root", hex.EncodeToString(expectedRoot)),
zap.String("latest_root", latestRoot),
)
}
return nil
}
return nil
}
// Use the latest generation for new snapshots

View File

@ -4,8 +4,11 @@ import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"crypto/sha512"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"math/big"
@ -3157,6 +3160,223 @@ waitLoop:
t.Logf("Hypersync complete: client synced %d prover vertices", clientLeafCount)
assert.Greater(t, clientLeafCount, 0, "should have synced at least some prover vertices")
// Verify the sync-based repair approach:
// 1. Create a second in-memory hypergraph
// 2. Sync from clientHG to the second hypergraph
// 3. Wipe the tree data from clientDB
// 4. Sync back from the second hypergraph to clientHG
// 5. Verify the root still matches
t.Log("Verifying sync-based repair approach...")
// Create second in-memory hypergraph
repairDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_repair/store"}, 0)
defer repairDB.Close()
repairStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_repair/store"},
repairDB,
logger,
enc,
inclusionProver,
)
repairHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "repair")),
repairStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Get current root from clientHG before repair
clientRootBeforeRepair := clientHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(false)
t.Logf("Client root before repair: %x", clientRootBeforeRepair)
// Publish snapshot on clientHG
clientHG.PublishSnapshot(clientRootBeforeRepair)
// Set up gRPC server backed by clientHG
const repairBufSize = 1 << 20
clientLis := bufconn.Listen(repairBufSize)
clientGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(clientGRPCServer, clientHG)
go func() { _ = clientGRPCServer.Serve(clientLis) }()
// Dial clientHG
clientDialer := func(context.Context, string) (net.Conn, error) {
return clientLis.Dial()
}
clientRepairConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(clientDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
require.NoError(t, err)
clientRepairClient := protobufs.NewHypergraphComparisonServiceClient(clientRepairConn)
// Sync from clientHG to repairHG for all phases
repairPhases := []protobufs.HypergraphPhaseSet{
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES,
}
t.Log("Syncing client -> repair hypergraph...")
for _, phase := range repairPhases {
stream, err := clientRepairClient.PerformSync(context.Background())
require.NoError(t, err)
_, err = repairHG.SyncFrom(stream, proverShardKey, phase, nil)
if err != nil {
t.Logf("Sync client->repair phase %v: %v", phase, err)
}
_ = stream.CloseSend()
}
// Verify repairHG has the data
repairRoot := repairHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(false)
t.Logf("Repair hypergraph root after sync: %x", repairRoot)
assert.Equal(t, clientRootBeforeRepair, repairRoot, "repair HG should match client root")
// Stop client server before wiping
clientGRPCServer.Stop()
clientRepairConn.Close()
// Wipe tree data from clientDB for the prover shard
t.Log("Wiping tree data from client DB...")
treePrefixes := []byte{
store.VERTEX_ADDS_TREE_NODE,
store.VERTEX_REMOVES_TREE_NODE,
store.HYPEREDGE_ADDS_TREE_NODE,
store.HYPEREDGE_REMOVES_TREE_NODE,
store.VERTEX_ADDS_TREE_NODE_BY_PATH,
store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
store.VERTEX_ADDS_CHANGE_RECORD,
store.VERTEX_REMOVES_CHANGE_RECORD,
store.HYPEREDGE_ADDS_CHANGE_RECORD,
store.HYPEREDGE_REMOVES_CHANGE_RECORD,
store.VERTEX_ADDS_TREE_ROOT,
store.VERTEX_REMOVES_TREE_ROOT,
store.HYPEREDGE_ADDS_TREE_ROOT,
store.HYPEREDGE_REMOVES_TREE_ROOT,
}
shardKeyBytes := make([]byte, 0, len(proverShardKey.L1)+len(proverShardKey.L2))
shardKeyBytes = append(shardKeyBytes, proverShardKey.L1[:]...)
shardKeyBytes = append(shardKeyBytes, proverShardKey.L2[:]...)
for _, prefix := range treePrefixes {
start := append([]byte{store.HYPERGRAPH_SHARD, prefix}, shardKeyBytes...)
// Increment shard key for end bound
endShardKeyBytes := make([]byte, len(shardKeyBytes))
copy(endShardKeyBytes, shardKeyBytes)
// Since all bytes of L2 are 0xff, incrementing would overflow, so use next prefix
end := []byte{store.HYPERGRAPH_SHARD, prefix + 1}
if err := clientDB.DeleteRange(start, end); err != nil {
t.Logf("DeleteRange for prefix 0x%02x: %v", prefix, err)
}
}
// Reload clientHG after wipe
t.Log("Reloading client hypergraph after wipe...")
clientStore2 := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"},
clientDB,
logger,
enc,
inclusionProver,
)
clientHG2 := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "mainnet-client-reloaded")),
clientStore2,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Verify tree is now empty/different
clientRootAfterWipe := clientHG2.GetVertexAddsSet(proverShardKey).GetTree().Commit(false)
t.Logf("Client root after wipe: %x (expected nil or different)", clientRootAfterWipe)
// Publish snapshot on repairHG for reverse sync
repairHG.PublishSnapshot(repairRoot)
// Set up gRPC server backed by repairHG
repairLis := bufconn.Listen(repairBufSize)
repairGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(repairGRPCServer, repairHG)
go func() { _ = repairGRPCServer.Serve(repairLis) }()
defer repairGRPCServer.Stop()
// Dial repairHG
repairDialer := func(context.Context, string) (net.Conn, error) {
return repairLis.Dial()
}
repairConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(repairDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
require.NoError(t, err)
defer repairConn.Close()
repairClient := protobufs.NewHypergraphComparisonServiceClient(repairConn)
// Sync from repairHG to clientHG2 for all phases
t.Log("Syncing repair -> client hypergraph...")
for _, phase := range repairPhases {
stream, err := repairClient.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG2.SyncFrom(stream, proverShardKey, phase, nil)
if err != nil {
t.Logf("Sync repair->client phase %v: %v", phase, err)
}
_ = stream.CloseSend()
}
// Commit and verify root after repair
clientRootAfterRepair := clientHG2.GetVertexAddsSet(proverShardKey).GetTree().Commit(true)
t.Logf("Client root after repair: %x", clientRootAfterRepair)
t.Logf("Expected root from frame: %x", expectedRoot)
// Verify the root matches the original (before repair) - this confirms the round-trip works
assert.Equal(t, clientRootBeforeRepair, clientRootAfterRepair,
"root after sync repair should match root before repair")
// Note: The root may not match the frame's expected root if there was corruption,
// but it should at least match what we synced before the repair.
// The actual fix for the frame mismatch requires fixing the corruption at the source.
t.Logf("Sync-based repair verification complete.")
t.Logf(" Original client root: %x", clientRootBeforeRepair)
t.Logf(" Repaired client root: %x", clientRootAfterRepair)
t.Logf(" Frame expected root: %x", expectedRoot)
if bytes.Equal(clientRootAfterRepair, expectedRoot) {
t.Log("SUCCESS: Repaired root matches frame expected root!")
} else {
t.Log("Note: Repaired root differs from frame expected root - corruption exists at source")
}
}
// TestHypergraphSyncWithPagination tests that syncing a large tree with >1000 leaves
@ -3376,3 +3596,528 @@ func TestHypergraphSyncWithPagination(t *testing.T) {
assert.Equal(t, serverRoot, clientRoot, "client root should match server root after sync")
t.Log("Pagination test passed - client converged to server state")
}
// dumpHypergraphShardKeys logs every database key under the HYPERGRAPH_SHARD
// prefix whose bytes contain the global prover shard key, together with a
// semantic description and a decoded value. Equivalent to running:
//
//	dbscan -prefix 09 -search 000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
//
// Parameters:
//   - t: testing context used for all output
//   - db: the PebbleDB to inspect
//   - label: tag identifying the database in the log (e.g. "client", "server")
func dumpHypergraphShardKeys(t *testing.T, db *store.PebbleDB, label string) {
	// HYPERGRAPH_SHARD (0x09) bounds the iteration window.
	shardPrefix := []byte{store.HYPERGRAPH_SHARD}
	// Global prover shard key: L1=[0x00,0x00,0x00] followed by L2=[0xff*32],
	// 35 bytes total.
	pattern, err := hex.DecodeString("000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	if err != nil {
		t.Logf("[%s] Failed to decode search pattern: %v", label, err)
		return
	}
	iter, err := db.NewIter(shardPrefix, []byte{store.HYPERGRAPH_SHARD + 1})
	if err != nil {
		t.Logf("[%s] Failed to create iterator: %v", label, err)
		return
	}
	defer iter.Close()
	t.Logf("=== Database dump for %s (prefix=09, search=global prover shard) ===", label)
	matched := 0
	for iter.First(); iter.Valid(); iter.Next() {
		k := iter.Key()
		v := iter.Value()
		// Keys must sit under the shard prefix and embed the prover shard
		// key somewhere in their bytes.
		if !bytes.HasPrefix(k, shardPrefix) || !bytes.Contains(k, pattern) {
			continue
		}
		matched++
		// Decode and display the key/value.
		t.Logf("[%s] key: %s", label, hex.EncodeToString(k))
		t.Logf("[%s] semantic: %s", label, describeHypergraphKeyForTest(k))
		t.Logf("[%s] value:\n%s\n", label, indentForTest(decodeHypergraphValueForTest(k, v)))
	}
	t.Logf("=== End dump for %s: %d keys matched ===", label, matched)
}
// describeHypergraphKeyForTest provides semantic description of hypergraph keys.
// Mirrors the logic from dbscan/main.go describeHypergraphKey.
//
// Key layouts handled here (as read by this function):
//   - shard commit:  [prefix][frame uint64 BE, bytes 1..8][commit subtype, byte 9][shard...]
//   - everything else: [prefix][subtype, byte 1][payload...]
//
// Returns a one-line human-readable description; unknown subtypes fall back
// to a hex dump of the payload.
func describeHypergraphKeyForTest(key []byte) string {
	if len(key) < 2 {
		return "hypergraph: invalid key length"
	}
	// Check for shard commit keys (frame-based). These are distinguished by a
	// commit-subtype marker at byte index 9 rather than byte index 1.
	if len(key) >= 10 {
		switch key[9] {
		case store.HYPERGRAPH_VERTEX_ADDS_SHARD_COMMIT,
			store.HYPERGRAPH_VERTEX_REMOVES_SHARD_COMMIT,
			store.HYPERGRAPH_HYPEREDGE_ADDS_SHARD_COMMIT,
			store.HYPERGRAPH_HYPEREDGE_REMOVES_SHARD_COMMIT:
			// Frame number occupies bytes 1..8; the shard id is everything
			// after the commit-subtype byte.
			frame := binary.BigEndian.Uint64(key[1:9])
			shard := key[10:]
			var setPhase string
			switch key[9] {
			case store.HYPERGRAPH_VERTEX_ADDS_SHARD_COMMIT:
				setPhase = "vertex-adds"
			case store.HYPERGRAPH_VERTEX_REMOVES_SHARD_COMMIT:
				setPhase = "vertex-removes"
			case store.HYPERGRAPH_HYPEREDGE_ADDS_SHARD_COMMIT:
				setPhase = "hyperedge-adds"
			case store.HYPERGRAPH_HYPEREDGE_REMOVES_SHARD_COMMIT:
				setPhase = "hyperedge-removes"
			}
			return fmt.Sprintf(
				"hypergraph shard commit %s frame=%d shard=%s",
				setPhase,
				frame,
				shortHexForTest(shard),
			)
		}
	}
	sub := key[1]
	payload := key[2:]
	switch sub {
	case store.VERTEX_DATA:
		return fmt.Sprintf("hypergraph vertex data id=%s", shortHexForTest(payload))
	case store.VERTEX_TOMBSTONE:
		return fmt.Sprintf("hypergraph vertex tombstone id=%s", shortHexForTest(payload))
	case store.VERTEX_ADDS_TREE_NODE,
		store.VERTEX_REMOVES_TREE_NODE,
		store.HYPEREDGE_ADDS_TREE_NODE,
		store.HYPEREDGE_REMOVES_TREE_NODE:
		// Tree-node payload: 3-byte L1 shard || 32-byte L2 shard || node id.
		if len(payload) >= 35 {
			l1 := payload[:3]
			l2 := payload[3:35]
			node := payload[35:]
			return fmt.Sprintf(
				"%s tree node shard=[%s|%s] node=%s",
				describeHypergraphTreeTypeForTest(sub),
				shortHexForTest(l1),
				shortHexForTest(l2),
				shortHexForTest(node),
			)
		}
		return fmt.Sprintf(
			"%s tree node (invalid length)",
			describeHypergraphTreeTypeForTest(sub),
		)
	case store.VERTEX_ADDS_TREE_NODE_BY_PATH,
		store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
		store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
		store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH:
		// By-path payload: 3-byte L1 shard || 32-byte L2 shard || packed
		// big-endian uint64 path components.
		if len(payload) >= 35 {
			l1 := payload[:3]
			l2 := payload[3:35]
			path := parseUint64PathForTest(payload[35:])
			return fmt.Sprintf(
				"%s path shard=[%s|%s] path=%v",
				describeHypergraphTreeTypeForTest(sub),
				shortHexForTest(l1),
				shortHexForTest(l2),
				path,
			)
		}
		return fmt.Sprintf(
			"%s path (invalid length)",
			describeHypergraphTreeTypeForTest(sub),
		)
	case store.VERTEX_ADDS_TREE_ROOT,
		store.VERTEX_REMOVES_TREE_ROOT,
		store.HYPEREDGE_ADDS_TREE_ROOT,
		store.HYPEREDGE_REMOVES_TREE_ROOT:
		// Root payload carries only the shard identity.
		if len(payload) >= 35 {
			l1 := payload[:3]
			l2 := payload[3:35]
			return fmt.Sprintf(
				"%s tree root shard=[%s|%s]",
				describeHypergraphTreeTypeForTest(sub),
				shortHexForTest(l1),
				shortHexForTest(l2),
			)
		}
		return fmt.Sprintf(
			"%s tree root (invalid length)",
			describeHypergraphTreeTypeForTest(sub),
		)
	case store.HYPERGRAPH_COVERED_PREFIX:
		return "hypergraph covered prefix metadata"
	case store.HYPERGRAPH_COMPLETE:
		return "hypergraph completeness flag"
	default:
		return fmt.Sprintf(
			"hypergraph unknown subtype 0x%02x raw=%s",
			sub,
			shortHexForTest(payload),
		)
	}
}
// describeHypergraphTreeTypeForTest maps a tree-related key subtype byte to
// its set/phase label. Node, by-path, and root subtypes of the same phase all
// share one label; anything unrecognized falls back to "hypergraph".
func describeHypergraphTreeTypeForTest(kind byte) string {
	groups := []struct {
		label    string
		subtypes [3]byte
	}{
		{"vertex adds", [3]byte{
			store.VERTEX_ADDS_TREE_NODE,
			store.VERTEX_ADDS_TREE_NODE_BY_PATH,
			store.VERTEX_ADDS_TREE_ROOT,
		}},
		{"vertex removes", [3]byte{
			store.VERTEX_REMOVES_TREE_NODE,
			store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
			store.VERTEX_REMOVES_TREE_ROOT,
		}},
		{"hyperedge adds", [3]byte{
			store.HYPEREDGE_ADDS_TREE_NODE,
			store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
			store.HYPEREDGE_ADDS_TREE_ROOT,
		}},
		{"hyperedge removes", [3]byte{
			store.HYPEREDGE_REMOVES_TREE_NODE,
			store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
			store.HYPEREDGE_REMOVES_TREE_ROOT,
		}},
	}
	for _, g := range groups {
		for _, s := range g.subtypes {
			if s == kind {
				return g.label
			}
		}
	}
	return "hypergraph"
}
// decodeHypergraphValueForTest decodes hypergraph values for display.
// Mirrors the logic from dbscan/main.go decodeHypergraphValue.
//
// The key's subtype byte (key[1]) selects the decoder; values whose subtype
// is unrecognized (or whose key is too short to carry one) are rendered as
// truncated hex.
func decodeHypergraphValueForTest(key []byte, value []byte) string {
	if len(value) == 0 {
		return "<empty>"
	}
	sub := byte(0)
	if len(key) > 1 {
		sub = key[1]
	}
	switch sub {
	case store.VERTEX_DATA:
		return summarizeVectorCommitmentTreeForTest(key, value)
	case store.VERTEX_TOMBSTONE:
		return shortHexForTest(value)
	case store.VERTEX_ADDS_TREE_NODE,
		store.VERTEX_REMOVES_TREE_NODE,
		store.HYPEREDGE_ADDS_TREE_NODE,
		store.HYPEREDGE_REMOVES_TREE_NODE,
		store.VERTEX_ADDS_TREE_NODE_BY_PATH,
		store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
		store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
		store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
		store.VERTEX_ADDS_TREE_ROOT,
		store.VERTEX_REMOVES_TREE_ROOT,
		store.HYPEREDGE_ADDS_TREE_ROOT,
		store.HYPEREDGE_REMOVES_TREE_ROOT:
		return summarizeHypergraphTreeNodeForTest(value)
	case store.HYPERGRAPH_COVERED_PREFIX:
		return decodeCoveredPrefixForTest(value)
	case store.HYPERGRAPH_COMPLETE:
		// value is guaranteed non-empty by the guard at the top of the
		// function, so the earlier `len(value) == 0` re-check in this case
		// was unreachable and has been removed.
		return fmt.Sprintf("complete=%t", value[len(value)-1] != 0)
	default:
		return shortHexForTest(value)
	}
}
// summarizeVectorCommitmentTreeForTest renders a serialized vector commitment
// tree as indented JSON (size and SHA-256 of the raw bytes). For keys whose
// domain is the global intrinsic address (0xff*32), the vertex fields are
// additionally decoded and merged into the summary.
func summarizeVectorCommitmentTreeForTest(key []byte, value []byte) string {
	tree, err := tries.DeserializeNonLazyTree(value)
	if err != nil {
		return fmt.Sprintf(
			"vector_commitment_tree decode_error=%v raw=%s",
			err,
			shortHexForTest(value),
		)
	}
	digest := sha256.Sum256(value)
	info := map[string]any{
		"size_bytes": len(value),
		"sha256":     shortHexForTest(digest[:]),
	}
	// Key layout assumed here: key[2:34]=domain, key[34:66]=vertex address.
	// A domain of all 0xff marks a global intrinsic vertex.
	if len(key) >= 66 {
		domain := key[2:34]
		address := key[34:66]
		if bytes.Equal(domain, bytes.Repeat([]byte{0xff}, 32)) {
			if extra := decodeGlobalIntrinsicVertexForTest(tree, address); extra != nil {
				for field, val := range extra {
					info[field] = val
				}
			}
		}
	}
	out, err := json.MarshalIndent(info, "", " ")
	if err != nil {
		return fmt.Sprintf("vector_commitment_tree size_bytes=%d", len(value))
	}
	return string(out)
}
// decodeGlobalIntrinsicVertexForTest classifies a global intrinsic vertex by
// the size of its order-0 field and decodes the type-specific fields:
//   - 585 bytes: a Prover public key
//   - 32 bytes: either a ProverAllocation (has an 8-byte join frame at 0x10)
//     or a ProverReward delegate address
//
// Returns a map of decoded fields, always including "vertex_address" and
// "type".
func decodeGlobalIntrinsicVertexForTest(tree *tries.VectorCommitmentTree, address []byte) map[string]any {
	out := map[string]any{
		"vertex_address": hex.EncodeToString(address),
	}
	field0, err := tree.Get([]byte{0x00})
	if err != nil || len(field0) == 0 {
		out["type"] = "unknown (no order 0 field)"
		return out
	}
	switch len(field0) {
	case 585:
		// Prover: PublicKey is 585 bytes.
		out["type"] = "prover:Prover"
		out["public_key"] = shortHexForTest(field0)
		decodeProverFieldsForTest(tree, out)
	case 32:
		// Could be Allocation (Prover reference) or Reward (DelegateAddress);
		// presence of an 8-byte join frame at 0x10 disambiguates.
		joinFrame, _ := tree.Get([]byte{0x10})
		if len(joinFrame) == 8 {
			out["type"] = "allocation:ProverAllocation"
			out["prover_reference"] = hex.EncodeToString(field0)
			decodeAllocationFieldsForTest(tree, out)
		} else {
			out["type"] = "reward:ProverReward"
			out["delegate_address"] = hex.EncodeToString(field0)
		}
	default:
		out["type"] = "unknown"
		out["order_0_size"] = len(field0)
	}
	return out
}
// decodeProverFieldsForTest extracts the Prover-specific fields from the
// vertex tree into result. Missing or wrongly-sized fields are skipped.
func decodeProverFieldsForTest(tree *tries.VectorCommitmentTree, result map[string]any) {
	// Status is a single byte at field order 0x04.
	if raw, err := tree.Get([]byte{0x04}); err == nil && len(raw) == 1 {
		result["status"] = decodeProverStatusForTest(raw[0])
		result["status_raw"] = raw[0]
	}
	// The remaining fields are big-endian uint64 values.
	uint64Fields := []struct {
		order byte
		name  string
	}{
		{0x08, "available_storage"},
		{0x0c, "seniority"},
		{0x10, "kick_frame_number"},
	}
	for _, f := range uint64Fields {
		if raw, err := tree.Get([]byte{f.order}); err == nil && len(raw) == 8 {
			result[f.name] = binary.BigEndian.Uint64(raw)
		}
	}
}
// decodeAllocationFieldsForTest extracts ProverAllocation fields from the
// vertex tree into result. A missing/empty confirmation filter, or an
// all-zero one, marks the allocation as a global prover.
func decodeAllocationFieldsForTest(tree *tries.VectorCommitmentTree, result map[string]any) {
	if raw, err := tree.Get([]byte{0x04}); err == nil && len(raw) == 1 {
		result["status"] = decodeProverStatusForTest(raw[0])
		result["status_raw"] = raw[0]
	}
	filter, err := tree.Get([]byte{0x08})
	if err == nil && len(filter) > 0 {
		result["confirmation_filter"] = hex.EncodeToString(filter)
		// An all-zero filter also denotes the global prover.
		if bytes.Equal(filter, make([]byte, len(filter))) {
			result["is_global_prover"] = true
		}
	} else {
		result["is_global_prover"] = true
	}
	// Frame-number fields are big-endian uint64 values.
	frameFields := []struct {
		order byte
		name  string
	}{
		{0x10, "join_frame_number"},
		{0x14, "leave_frame_number"},
		{0x34, "last_active_frame_number"},
	}
	for _, f := range frameFields {
		if raw, err := tree.Get([]byte{f.order}); err == nil && len(raw) == 8 {
			result[f.name] = binary.BigEndian.Uint64(raw)
		}
	}
}
// decodeProverStatusForTest translates a prover status byte into its
// human-readable name; values outside the known range render as "Unknown(n)".
func decodeProverStatusForTest(status byte) string {
	names := [...]string{
		"Joining",
		"Active",
		"Paused",
		"Leaving",
		"Rejected",
		"Kicked",
	}
	if int(status) < len(names) {
		return names[status]
	}
	return fmt.Sprintf("Unknown(%d)", status)
}
// summarizeHypergraphTreeNodeForTest renders a serialized tree node value as
// a human-readable summary. The first byte of the value selects the node type
// (nil/leaf/branch); leaf and branch payloads are deserialized with the tries
// decoders and summarized as indented JSON. Unknown types and decode failures
// fall back to hex plus a SHA-256 of the raw bytes.
func summarizeHypergraphTreeNodeForTest(value []byte) string {
	if len(value) == 0 {
		return "hypergraph_tree_node <empty>"
	}
	// A hash of the raw bytes is included in every summary so two dumps can
	// be compared cheaply.
	hash := sha256.Sum256(value)
	hashStr := shortHexForTest(hash[:])
	reader := bytes.NewReader(value)
	var nodeType byte
	if err := binary.Read(reader, binary.BigEndian, &nodeType); err != nil {
		return fmt.Sprintf("tree_node decode_error=%v sha256=%s", err, hashStr)
	}
	switch nodeType {
	case tries.TypeNil:
		return fmt.Sprintf("tree_nil sha256=%s", hashStr)
	case tries.TypeLeaf:
		// reader is positioned just past the type byte; the deserializer
		// consumes the remainder of the payload.
		leaf, err := tries.DeserializeLeafNode(nil, reader)
		if err != nil {
			return fmt.Sprintf("tree_leaf decode_error=%v sha256=%s", err, hashStr)
		}
		summary := map[string]any{
			"type": "leaf",
			"key": shortHexForTest(leaf.Key),
			"value": shortHexForTest(leaf.Value),
			"hash_target": shortHexForTest(leaf.HashTarget),
			"commitment": shortHexForTest(leaf.Commitment),
			"bytes_sha256": hashStr,
		}
		// Size is an optional big-number field; include it only when set.
		if leaf.Size != nil {
			summary["size"] = leaf.Size.String()
		}
		jsonBytes, err := json.MarshalIndent(summary, "", " ")
		if err != nil {
			// JSON failure should not hide the node — fall back to one line.
			return fmt.Sprintf(
				"tree_leaf key=%s sha256=%s",
				shortHexForTest(leaf.Key),
				hashStr,
			)
		}
		return string(jsonBytes)
	case tries.TypeBranch:
		branch, err := tries.DeserializeBranchNode(nil, reader, true)
		if err != nil {
			return fmt.Sprintf("tree_branch decode_error=%v sha256=%s", err, hashStr)
		}
		// Tally children by concrete type rather than listing each one.
		childSummary := map[string]int{
			"branch": 0,
			"leaf": 0,
			"nil": 0,
		}
		for _, child := range branch.Children {
			switch child.(type) {
			case *tries.LazyVectorCommitmentBranchNode:
				childSummary["branch"]++
			case *tries.LazyVectorCommitmentLeafNode:
				childSummary["leaf"]++
			default:
				childSummary["nil"]++
			}
		}
		summary := map[string]any{
			"type": "branch",
			"prefix": branch.Prefix,
			"leaf_count": branch.LeafCount,
			"longest_branch": branch.LongestBranch,
			"commitment": shortHexForTest(branch.Commitment),
			"children": childSummary,
			"bytes_sha256": hashStr,
		}
		if branch.Size != nil {
			summary["size"] = branch.Size.String()
		}
		jsonBytes, err := json.MarshalIndent(summary, "", " ")
		if err != nil {
			return fmt.Sprintf(
				"tree_branch prefix=%v leafs=%d sha256=%s",
				branch.Prefix,
				branch.LeafCount,
				hashStr,
			)
		}
		return string(jsonBytes)
	default:
		// Unknown node type: show the raw payload after the type byte.
		return fmt.Sprintf(
			"tree_node type=0x%02x payload=%s sha256=%s",
			nodeType,
			shortHexForTest(value[1:]),
			hashStr,
		)
	}
}
// decodeCoveredPrefixForTest renders a covered-prefix record, which is a
// packed list of big-endian int64 values. Misaligned input falls back to a
// hex dump.
func decodeCoveredPrefixForTest(value []byte) string {
	if len(value)%8 != 0 {
		return shortHexForTest(value)
	}
	parts := make([]int64, 0, len(value)/8)
	for off := 0; off < len(value); off += 8 {
		parts = append(parts, int64(binary.BigEndian.Uint64(value[off:off+8])))
	}
	return fmt.Sprintf("covered_prefix=%v", parts)
}
// shortHexForTest formats bytes as 0x-prefixed hex, abbreviating anything
// longer than 16 bytes to its first and last 8 bytes plus a length marker.
func shortHexForTest(b []byte) string {
	switch {
	case len(b) == 0:
		return "0x"
	case len(b) <= 16:
		return "0x" + hex.EncodeToString(b)
	default:
		head := hex.EncodeToString(b[:8])
		tail := hex.EncodeToString(b[len(b)-8:])
		return fmt.Sprintf("0x%s...%s(len=%d)", head, tail, len(b))
	}
}
// parseUint64PathForTest unpacks a byte slice of concatenated big-endian
// uint64 path components. Returns nil when the input length is not a
// multiple of 8.
func parseUint64PathForTest(b []byte) []uint64 {
	if len(b)%8 != 0 {
		return nil
	}
	path := make([]uint64, 0, len(b)/8)
	for off := 0; off < len(b); off += 8 {
		path = append(path, binary.BigEndian.Uint64(b[off:off+8]))
	}
	return path
}
// indentForTest prefixes every line of value with a space for nested display
// in log output. The empty string is returned unchanged.
func indentForTest(value string) string {
	if value == "" {
		return ""
	}
	var out bytes.Buffer
	for i, line := range bytes.Split([]byte(value), []byte("\n")) {
		if i > 0 {
			out.WriteByte('\n')
		}
		out.WriteString(" ")
		out.Write(line)
	}
	return out.String()
}

View File

@ -7,8 +7,8 @@ import (
"encoding/hex"
"fmt"
"io"
"net"
"os"
"slices"
"strings"
pebblev1 "github.com/cockroachdb/pebble"
@ -16,9 +16,13 @@ import (
"github.com/cockroachdb/pebble/v2/vfs"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
"source.quilibrium.com/quilibrium/monorepo/bls48581"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
@ -34,7 +38,7 @@ func (p *PebbleDB) DB() *pebble.DB {
// pebbleMigrations contains ordered migration steps. New migrations append to
// the end.
var pebbleMigrations = []func(*pebble.Batch, *pebble.DB) error{
var pebbleMigrations = []func(*pebble.Batch, *pebble.DB, *config.DBConfig) error{
migration_2_1_0_4,
migration_2_1_0_5,
migration_2_1_0_8,
@ -86,6 +90,8 @@ var pebbleMigrations = []func(*pebble.Batch, *pebble.DB) error{
migration_2_1_0_1814,
migration_2_1_0_1815,
migration_2_1_0_1816,
migration_2_1_0_1817,
migration_2_1_0_1818,
}
func NewPebbleDB(
@ -304,7 +310,7 @@ func (p *PebbleDB) migrate(logger *zap.Logger) error {
zap.Int("from_version", int(storedVersion)),
zap.Int("to_version", int(storedVersion+1)),
)
if err := pebbleMigrations[i](batch, p.db); err != nil {
if err := pebbleMigrations[i](batch, p.db, p.config); err != nil {
batch.Close()
logger.Error("migration failed", zap.Error(err))
return errors.Wrapf(err, "apply migration %d", i+1)
@ -482,7 +488,7 @@ func rightAlign(data []byte, size int) []byte {
// Resolves all the variations of store issues from any series of upgrade steps
// in 2.1.0.1->2.1.0.3
func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
// batches don't use this but for backcompat the parameter is required
wo := &pebble.WriteOptions{}
@ -583,138 +589,138 @@ func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB) error {
return nil
}
func migration_2_1_0_5(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_5(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
// We just re-run it again
return migration_2_1_0_4(b, db)
return migration_2_1_0_4(b, db, config)
}
func migration_2_1_0_8(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_8(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
// these migration entries exist solely to advance migration number so all
// nodes are consistent
return nil
}
func migration_2_1_0_81(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_81(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
// these migration entries exist solely to advance migration number so all
// nodes are consistent
return nil
}
func migration_2_1_0_10(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_10(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
// these migration entries exist solely to advance migration number so all
// nodes are consistent
return nil
}
func migration_2_1_0_11(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_11(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_14(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_14(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_141(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_141(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_142(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_142(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_143(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_143(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_144(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_144(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_145(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_145(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_146(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_146(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_147(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_147(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_148(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_14(b, db)
func migration_2_1_0_148(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_14(b, db, config)
}
func migration_2_1_0_149(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_149(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_1410(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_149(b, db)
func migration_2_1_0_1410(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_149(b, db, config)
}
func migration_2_1_0_1411(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_149(b, db)
func migration_2_1_0_1411(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_149(b, db, config)
}
func migration_2_1_0_15(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_15(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_151(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_151(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_152(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_152(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_153(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_153(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_154(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_154(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_155(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_155(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_156(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_156(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_157(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_157(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_158(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_158(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_159(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_15(b, db)
func migration_2_1_0_159(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_15(b, db, config)
}
func migration_2_1_0_17(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_17(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_171(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_171(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_172(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_172(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_173(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_173(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
// Global shard key: L1={0,0,0}, L2=0xff*32
globalShardKey := tries.ShardKey{
L1: [3]byte{},
@ -795,76 +801,69 @@ func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB) error {
return nil
}
func migration_2_1_0_181(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_18(b, db)
func migration_2_1_0_181(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_18(b, db, config)
}
func migration_2_1_0_182(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_182(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_183(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_183(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_184(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_184(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_185(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_185(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_186(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_186(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_187(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_187(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_188(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_188(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_189(b *pebble.Batch, db *pebble.DB) error {
func migration_2_1_0_189(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return nil
}
func migration_2_1_0_1810(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_189(b, db)
func migration_2_1_0_1810(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_189(b, db, config)
}
func migration_2_1_0_1811(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_189(b, db)
func migration_2_1_0_1811(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_189(b, db, config)
}
func migration_2_1_0_1812(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_189(b, db)
func migration_2_1_0_1812(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_189(b, db, config)
}
func migration_2_1_0_1813(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_189(b, db)
func migration_2_1_0_1813(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_189(b, db, config)
}
func migration_2_1_0_1814(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_189(b, db)
func migration_2_1_0_1814(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_189(b, db, config)
}
func migration_2_1_0_1815(b *pebble.Batch, db *pebble.DB) error {
return migration_2_1_0_189(b, db)
func migration_2_1_0_1815(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
return migration_2_1_0_189(b, db, config)
}
// migration_2_1_0_1816 recalculates commitments for the global prover trees
// to fix potential corruption from earlier versions of sync.
func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB) error {
// Check if already done
doneKey := []byte{HYPERGRAPH_SHARD, HYPERGRAPH_GLOBAL_PROVER_RECALC_DONE}
if _, closer, err := b.Get(doneKey); err == nil {
closer.Close()
return nil // Already done
}
func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
// Global prover shard key: L1={0,0,0}, L2=0xff*32
globalShardKey := tries.ShardKey{
L1: [3]byte{},
@ -879,112 +878,249 @@ func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB) error {
// Initialize prover (logger can be nil for migrations)
prover := bls48581.NewKZGInclusionProver(nil)
// Create hypergraph store using the batch
hgStore := NewPebbleHypergraphStore(nil, &PebbleDB{db: db}, nil, nil, prover)
// Create hypergraph store using the batch wrapper
hgStore := NewPebbleHypergraphStore(nil, &PebbleDB{db: db}, zap.L(), nil, prover)
// Load and recalculate each tree for the global prover shard
treeTypes := []struct {
setType string
phaseType string
rootKey func(tries.ShardKey) []byte
}{
{
string(hypergraph.VertexAtomType),
string(hypergraph.AddsPhaseType),
hypergraphVertexAddsTreeRootKey,
},
{
string(hypergraph.VertexAtomType),
string(hypergraph.RemovesPhaseType),
hypergraphVertexRemovesTreeRootKey,
},
{
string(hypergraph.HyperedgeAtomType),
string(hypergraph.AddsPhaseType),
hypergraphHyperedgeAddsTreeRootKey,
},
{
string(hypergraph.HyperedgeAtomType),
string(hypergraph.RemovesPhaseType),
hypergraphHyperedgeRemovesTreeRootKey,
hg, err := hgStore.LoadHypergraph(nil, 0)
if err != nil {
return err
}
hgc := hg.(*hgcrdt.HypergraphCRDT)
hgc.GetVertexAddsSet(globalShardKey).GetTree().Commit(true)
return nil
}
// migration_2_1_0_1817 delegates to migration_2_1_0_1816.
func migration_2_1_0_1817(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return migration_2_1_0_1816(b, db, config)
}
// migration_2_1_0_1818 repairs corrupted global prover shard tree data by:
// 1. Creating an in-memory hypergraph instance
// 2. Setting up a local gRPC sync server backed by the actual DB hypergraph
// 3. Syncing from the actual DB to the in-memory instance via the sync protocol
// 4. Wiping all tree data for the global prover shard from the actual DB
// 5. Setting up a local gRPC sync server backed by the in-memory hypergraph
// 6. Syncing from the in-memory instance back to the actual DB hypergraph
//
// The batch b is intentionally not forwarded: the repair operates directly
// on db (range deletes and sync writes), not through the migration batch.
func migration_2_1_0_1818(b *pebble.Batch, db *pebble.DB, config *config.DBConfig) error {
	return doMigration1818(db, config)
}
// doMigration1818 performs the actual migration work for migration_2_1_0_1818.
// It uses the sync protocol to repair corrupted tree data by syncing to an
// in-memory instance and back.
func doMigration1818(db *pebble.DB, config *config.DBConfig) error {
logger := zap.L()
// Global prover shard key: L1={0,0,0}, L2=0xff*32
globalShardKey := tries.ShardKey{
L1: [3]byte{},
L2: [32]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
}
for _, tt := range treeTypes {
rootData, closer, err := b.Get(tt.rootKey(globalShardKey))
prover := bls48581.NewKZGInclusionProver(logger)
// Create hypergraph from actual DB
actualDBWrapper := &PebbleDB{db: db}
actualStore := NewPebbleHypergraphStore(config, actualDBWrapper, logger, nil, prover)
actualHG, err := actualStore.LoadHypergraph(nil, 0)
if err != nil {
return errors.Wrap(err, "load actual hypergraph")
}
actualHGCRDT := actualHG.(*hgcrdt.HypergraphCRDT)
// Create in-memory pebble DB directly (bypassing NewPebbleDB to avoid cycle)
memOpts := &pebble.Options{
MemTableSize: 64 << 20,
FormatMajorVersion: pebble.FormatNewest,
FS: vfs.NewMem(),
}
memDB, err := pebble.Open("", memOpts)
if err != nil {
return errors.Wrap(err, "open in-memory pebble")
}
defer memDB.Close()
memDBWrapper := &PebbleDB{db: memDB}
memStore := NewPebbleHypergraphStore(config, memDBWrapper, logger, nil, prover)
memHG, err := memStore.LoadHypergraph(nil, 0)
if err != nil {
return errors.Wrap(err, "load in-memory hypergraph")
}
memHGCRDT := memHG.(*hgcrdt.HypergraphCRDT)
// Phase 1: Sync from actual DB to in-memory
// Get the current root from actual DB
actualRoot := actualHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(false)
if actualRoot == nil {
logger.Info("migration 1818: no data in global prover shard, skipping")
return nil
}
// Publish snapshot on actual hypergraph
actualHGCRDT.PublishSnapshot(actualRoot)
// Set up gRPC server backed by actual hypergraph
const bufSize = 1 << 20
actualLis := bufconn.Listen(bufSize)
actualGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(actualGRPCServer, actualHGCRDT)
go func() { _ = actualGRPCServer.Serve(actualLis) }()
defer actualGRPCServer.Stop()
// Create client connection to actual hypergraph server
actualDialer := func(context.Context, string) (net.Conn, error) {
return actualLis.Dial()
}
actualConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(actualDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
if err != nil {
return errors.Wrap(err, "dial actual hypergraph")
}
defer actualConn.Close()
actualClient := protobufs.NewHypergraphComparisonServiceClient(actualConn)
// Sync from actual to in-memory for all phases
phases := []protobufs.HypergraphPhaseSet{
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES,
}
for _, phase := range phases {
stream, err := actualClient.PerformSync(context.Background())
if err != nil {
// No root for this tree, skip
continue
return errors.Wrapf(err, "create sync stream for phase %v", phase)
}
data := slices.Clone(rootData)
closer.Close()
if len(data) == 0 {
continue
}
var node tries.LazyVectorCommitmentNode
switch data[0] {
case tries.TypeLeaf:
node, err = tries.DeserializeLeafNode(hgStore, bytes.NewReader(data[1:]))
case tries.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
node, err = tries.DeserializeBranchNode(
hgStore,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
if err != nil {
return errors.Wrapf(
err,
"deserialize %s %s branch",
tt.setType,
tt.phaseType,
)
}
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
)
}
branch := node.(*tries.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
continue // Unknown type, skip
}
_, err = memHGCRDT.SyncFrom(stream, globalShardKey, phase, nil)
if err != nil {
return errors.Wrapf(
err,
"deserialize %s %s root",
tt.setType,
tt.phaseType,
)
logger.Warn("sync from actual to memory failed", zap.Error(err), zap.Any("phase", phase))
}
// Create tree and force recalculation
tree := &tries.LazyVectorCommitmentTree{
Root: node,
SetType: tt.setType,
PhaseType: tt.phaseType,
ShardKey: globalShardKey,
Store: hgStore,
CoveredPrefix: nil,
InclusionProver: prover,
}
// Force full recalculation of commitments
tree.Commit(true)
_ = stream.CloseSend()
}
// Mark migration as done
if err := b.Set(doneKey, []byte{0x01}, &pebble.WriteOptions{}); err != nil {
return errors.Wrap(err, "mark global prover recalc done")
// Commit in-memory to get root
memRoot := memHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(false)
logger.Info("migration 1818: synced to in-memory",
zap.String("actual_root", hex.EncodeToString(actualRoot)),
zap.String("mem_root", hex.EncodeToString(memRoot)),
)
// Stop the actual server before wiping data
actualGRPCServer.Stop()
actualConn.Close()
// Phase 2: Wipe tree data for global prover shard from actual DB
treePrefixes := []byte{
VERTEX_ADDS_TREE_NODE,
VERTEX_REMOVES_TREE_NODE,
HYPEREDGE_ADDS_TREE_NODE,
HYPEREDGE_REMOVES_TREE_NODE,
VERTEX_ADDS_TREE_NODE_BY_PATH,
VERTEX_REMOVES_TREE_NODE_BY_PATH,
HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
VERTEX_ADDS_CHANGE_RECORD,
VERTEX_REMOVES_CHANGE_RECORD,
HYPEREDGE_ADDS_CHANGE_RECORD,
HYPEREDGE_REMOVES_CHANGE_RECORD,
VERTEX_ADDS_TREE_ROOT,
VERTEX_REMOVES_TREE_ROOT,
HYPEREDGE_ADDS_TREE_ROOT,
HYPEREDGE_REMOVES_TREE_ROOT,
}
for _, prefix := range treePrefixes {
start, end := shardRangeBounds(prefix, globalShardKey)
if err := db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true}); err != nil {
return errors.Wrapf(err, "delete range for prefix 0x%02x", prefix)
}
}
logger.Info("migration 1818: wiped tree data from actual DB")
// Reload actual hypergraph after wipe
actualStore2 := NewPebbleHypergraphStore(config, actualDBWrapper, logger, nil, prover)
actualHG2, err := actualStore2.LoadHypergraph(nil, 0)
if err != nil {
return errors.Wrap(err, "reload actual hypergraph after wipe")
}
actualHGCRDT2 := actualHG2.(*hgcrdt.HypergraphCRDT)
// Phase 3: Sync from in-memory back to actual DB
// Publish snapshot on in-memory hypergraph
memHGCRDT.PublishSnapshot(memRoot)
// Set up gRPC server backed by in-memory hypergraph
memLis := bufconn.Listen(bufSize)
memGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(memGRPCServer, memHGCRDT)
go func() { _ = memGRPCServer.Serve(memLis) }()
defer memGRPCServer.Stop()
// Create client connection to in-memory hypergraph server
memDialer := func(context.Context, string) (net.Conn, error) {
return memLis.Dial()
}
memConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(memDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
if err != nil {
return errors.Wrap(err, "dial in-memory hypergraph")
}
defer memConn.Close()
memClient := protobufs.NewHypergraphComparisonServiceClient(memConn)
// Sync from in-memory to actual for all phases
for _, phase := range phases {
stream, err := memClient.PerformSync(context.Background())
if err != nil {
return errors.Wrapf(err, "create sync stream for phase %v (reverse)", phase)
}
_, err = actualHGCRDT2.SyncFrom(stream, globalShardKey, phase, nil)
if err != nil {
logger.Warn("sync from memory to actual failed", zap.Error(err), zap.Any("phase", phase))
}
_ = stream.CloseSend()
}
// Final commit
finalRoot := actualHGCRDT2.GetVertexAddsSet(globalShardKey).GetTree().Commit(true)
logger.Info("migration 1818: completed",
zap.String("final_root", hex.EncodeToString(finalRoot)),
)
return nil
}