From 6d3a301b7cab960eb06830b752615f87eb2eedd9 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Tue, 25 Mar 2025 00:43:06 -0500 Subject: [PATCH] resolve issues with conversion, fix streaming --- bedlam/apps/garbled/data/.gitignore | 1 + bedlam/compiler/compiler.go | 7 +- bedlam/compiler/ssa/streamer.go | 3 + node/crypto/lazy_proof_tree.go | 28 +- node/crypto/proof_tree.go | 26 +- node/crypto/proof_tree_test.go | 2 +- node/crypto/tree_compare.go | 4 +- .../token/token_execution_engine.go | 2 + node/rpc/hypergraph_sync_rpc_server.go | 15 +- node/rpc/hypergraph_sync_rpc_server_test.go | 100 ++-- node/store/hypergraph.go | 493 +++++++++++------- node/store/inmem.go | 4 + 12 files changed, 403 insertions(+), 282 deletions(-) create mode 100644 bedlam/apps/garbled/data/.gitignore diff --git a/bedlam/apps/garbled/data/.gitignore b/bedlam/apps/garbled/data/.gitignore new file mode 100644 index 0000000..89e120f --- /dev/null +++ b/bedlam/apps/garbled/data/.gitignore @@ -0,0 +1 @@ +pre* diff --git a/bedlam/compiler/compiler.go b/bedlam/compiler/compiler.go index 752d22d..ca37a12 100644 --- a/bedlam/compiler/compiler.go +++ b/bedlam/compiler/compiler.go @@ -156,11 +156,12 @@ func (c *Compiler) stream(conn *p2p.Conn, oti ot.OT, source string, out, bits, err := program.Stream(conn, oti, c.params, input, timing) if err != nil { + fmt.Println(err) return nil, nil, err } - if false { - program.StreamDebug() - } + // if false { + program.StreamDebug() + // } return out, bits, err } diff --git a/bedlam/compiler/ssa/streamer.go b/bedlam/compiler/ssa/streamer.go index 76778e8..c3c584b 100644 --- a/bedlam/compiler/ssa/streamer.go +++ b/bedlam/compiler/ssa/streamer.go @@ -719,6 +719,9 @@ func (prog *Program) garble(conn *p2p.Conn, streaming *circuit.Streaming, if err := conn.SendUint32(int(maxID + 1)); err != nil { return err } + if err := conn.Flush(); err != nil { + return err + } tInit, tGarble, err := streaming.Garble(circ, in, out) if err != nil { return err diff --git a/node/crypto/lazy_proof_tree.go b/node/crypto/lazy_proof_tree.go index 7746cdc..390f80f 100644 --- a/node/crypto/lazy_proof_tree.go +++ b/node/crypto/lazy_proof_tree.go @@ -162,7 +162,7 @@ func (n *LazyVectorCommitmentBranchNode) Commit( phaseType, shardKey, generateKeyFromPath(n.FullPrefix), - n.FullPrefix, + path, n, ); err != nil { panic(err) @@ -444,7 +444,7 @@ func (t *LazyVectorCommitmentTree) Insert( t.PhaseType, t.ShardKey, generateKeyFromPath(slices.Concat(path, sharedNibbles)), - slices.Concat(path, sharedNibbles), + path, branch, ) if err != nil { @@ -499,13 +499,15 @@ func (t *LazyVectorCommitmentTree) Insert( path, newBranch.Prefix, []int{expectedNibble}, + n.Prefix, ) + err = t.Store.InsertNode( txn, t.SetType, t.PhaseType, t.ShardKey, - generateKeyFromPath(slices.Concat(path, newBranch.Prefix, []int{expectedNibble})), + generateKeyFromPath(slices.Concat(path, newBranch.Prefix, []int{expectedNibble}, n.Prefix)), slices.Concat(path, newBranch.Prefix, []int{expectedNibble}), newBranch.Children[expectedNibble], ) @@ -520,7 +522,7 @@ func (t *LazyVectorCommitmentTree) Insert( t.PhaseType, t.ShardKey, generateKeyFromPath(slices.Concat(path, newBranch.Prefix)), - slices.Concat(path, newBranch.Prefix), + path, newBranch, ) if err != nil { @@ -534,11 +536,8 @@ func (t *LazyVectorCommitmentTree) Insert( // Key matches prefix, continue with final nibble finalNibble := getNextNibble(key, depth+len(n.Prefix)*BranchBits) - maybeBranch, ok := n.Children[finalNibble].(*LazyVectorCommitmentBranchNode) newPath := slices.Concat(path, n.Prefix, []int{finalNibble}) - if ok { - newPath = append(newPath, maybeBranch.Prefix...) - } + delta, inserted := insert( n.Children[finalNibble], depth+len(n.Prefix)*BranchBits+BranchBits, @@ -577,11 +576,8 @@ func (t *LazyVectorCommitmentTree) Insert( } else { // Simple branch without prefix nibble := getNextNibble(key, depth) - maybeBranch, ok := n.Children[nibble].(*LazyVectorCommitmentBranchNode) newPath := slices.Concat(path, n.Prefix, []int{nibble}) - if ok { - newPath = append(newPath, maybeBranch.Prefix...) - } + delta, inserted := insert(n.Children[nibble], depth+BranchBits, newPath) n.Children[nibble] = inserted n.Commitment = nil @@ -619,13 +615,7 @@ func (t *LazyVectorCommitmentTree) Insert( return 0, nil } - prefix := []int{} - branch, ok := t.Root.(*LazyVectorCommitmentBranchNode) - if ok { - prefix = append(prefix, branch.Prefix...) - } - - _, t.Root = insert(t.Root, 0, prefix) + _, t.Root = insert(t.Root, 0, []int{}) return errors.Wrap(t.Store.SaveRoot( t.SetType, t.PhaseType, diff --git a/node/crypto/proof_tree.go b/node/crypto/proof_tree.go index 7e25e8d..9048b2c 100644 --- a/node/crypto/proof_tree.go +++ b/node/crypto/proof_tree.go @@ -8,6 +8,8 @@ import ( "errors" "fmt" "math/big" + "slices" + "strings" "sync" rbls48581 "source.quilibrium.com/quilibrium/monorepo/bls48581" @@ -200,7 +202,7 @@ type VectorCommitmentTree struct { func getNextNibble(key []byte, pos int) int { startByte := pos / 8 if startByte >= len(key) { - return 0 + return -1 } // Calculate how many bits we need from the current byte @@ -232,7 +234,7 @@ func getNibblesUntilDiverge(key1, key2 []byte, startDepth int) ([]int, int) { for { n1 := getNextNibble(key1, depth) n2 := getNextNibble(key2, depth) - if n1 != n2 { + if n1 == -1 || n2 == -1 || n1 != n2 { return nibbles, depth } nibbles = append(nibbles, n1) @@ -613,20 +615,32 @@ func (t *VectorCommitmentTree) GetSize() *big.Int { return t.Root.GetSize() } -func DebugNode(node VectorCommitmentNode, depth int, prefix string) { +func DebugNode(setType, phaseType string, shardKey ShardKey, node LazyVectorCommitmentNode, depth int, prefix string) { if node == nil { return } switch n := node.(type) { - case *VectorCommitmentLeafNode: + case *LazyVectorCommitmentLeafNode: fmt.Printf("%sLeaf: key=%x value=%x\n", prefix, n.Key, n.Value) - case *VectorCommitmentBranchNode: + case *LazyVectorCommitmentBranchNode: fmt.Printf("%sBranch %v:\n", prefix, n.Prefix) for i, child := range n.Children { + if child == nil { + var err error + child, err = n.Store.GetNodeByPath( + setType, + phaseType, + shardKey, + slices.Concat(n.FullPrefix, []int{i}), + ) + if err != nil && !strings.Contains(err.Error(), "not found") { + panic(err) + } + } if child != nil { fmt.Printf("%s [%d]:\n", prefix, i) - DebugNode(child, depth+1, prefix+" ") + DebugNode(setType, phaseType, shardKey, child, depth+1, prefix+" ") } } } diff --git a/node/crypto/proof_tree_test.go b/node/crypto/proof_tree_test.go index 7f12769..fd58ef9 100644 --- a/node/crypto/proof_tree_test.go +++ b/node/crypto/proof_tree_test.go @@ -379,5 +379,5 @@ func TestVectorCommitmentTrees(t *testing.T) { t.Errorf("incorrect longest branch count, %d, %d,", 4, longestBranch) } - DebugNode(tree.Root, 0, "") + // DebugNode(tree.Root, 0, "") } diff --git a/node/crypto/tree_compare.go b/node/crypto/tree_compare.go index 5798202..a248fb2 100644 --- a/node/crypto/tree_compare.go +++ b/node/crypto/tree_compare.go @@ -185,8 +185,6 @@ func CompareLeaves(tree1, tree2 *LazyVectorCommitmentTree) []LeafDifference { tree2.ShardKey, tree2.Root, ) - fmt.Println(len(leaves1)) - fmt.Println(len(leaves2)) differences := make([]LeafDifference, 0) @@ -323,6 +321,8 @@ func GetAllLeaves( leaves = append(leaves, n) case *LazyVectorCommitmentBranchNode: for i, child := range n.Children { + child := child + i := i var err error if child == nil { child, err = n.Store.GetNodeByPath( diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index 47c4b9d..289f02d 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -972,6 +972,7 @@ func (e *TokenExecutionEngine) rebuildMissingSetForHypergraph(set [][]byte) { txn.Abort() panic(err) } + e.hypergraphStore.MarkHypergraphAsComplete() } func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) { @@ -1057,6 +1058,7 @@ func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) { txn.Abort() panic(err) } + e.hypergraphStore.MarkHypergraphAsComplete() } // GetName implements ExecutionEngine diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index 88cec75..f37245a 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -365,6 +365,7 @@ func (s *streamManager) sendLeafData( default: } + fmt.Printf("send leaf data %v\n", path) node := getNodeAtPath( s.localTree.SetType, s.localTree.PhaseType, @@ -381,6 +382,7 @@ func (s *streamManager) sendLeafData( s.localTree.ShardKey, node, ) + s.logger.Info("sending set of leaves", zap.Int("leaf_count", len(children))) for _, child := range children { if child == nil { continue @@ -412,6 +414,7 @@ func getNodeAtPath( return nil } if len(path) == 0 { + fmt.Println("path is zero, returning") return node } @@ -419,6 +422,7 @@ func getNodeAtPath( case *crypto.LazyVectorCommitmentLeafNode: return node case *crypto.LazyVectorCommitmentBranchNode: + fmt.Printf("path %v\nprefix %v\n", path, n.Prefix) // Check that the branch's prefix matches the beginning of the query path. if len(path) < len(n.Prefix) { return nil @@ -426,6 +430,7 @@ func getNodeAtPath( for i, nib := range n.Prefix { if int32(nib) != path[i] { + fmt.Println("no viable prefix") return nil } } @@ -433,15 +438,18 @@ func getNodeAtPath( // Remove the prefix portion from the path. remainder := path[len(n.Prefix):] if len(remainder) == 0 { + fmt.Println("remainder 0, return node") return node } // The first element of the remainder selects the child. childIndex := remainder[0] if int(childIndex) < 0 || int(childIndex) >= len(n.Children) { + fmt.Println("invalid child index") return nil } + fmt.Printf("get node at path %v\n", slices.Concat(n.FullPrefix, []int{int(childIndex)})) child, err := n.Store.GetNodeByPath( setType, phaseType, @@ -456,6 +464,7 @@ func getNodeAtPath( return nil } + fmt.Println("recurse") return getNodeAtPath( setType, phaseType, @@ -740,7 +749,11 @@ func (s *streamManager) walk( pathString := zap.String("path", hex.EncodeToString(packPath(path))) if bytes.Equal(lnode.Commitment, rnode.Commitment) { - s.logger.Info("commitments match", pathString) + s.logger.Info( + "commitments match", + pathString, + zap.String("commitment", hex.EncodeToString(lnode.Commitment)), + ) return nil } diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 64ae1de..dddee1f 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -10,7 +10,6 @@ import ( "math/big" "net" "testing" - "time" "github.com/cloudflare/circl/sign/ed448" pcrypto "github.com/libp2p/go-libp2p/core/crypto" @@ -158,17 +157,16 @@ func TestHypergraphSyncServer(t *testing.T) { switch op.Type { case "AddVertex": id := op.Vertex.GetID() - fmt.Printf("server add vertex %x %v\n", id, time.Now()) serverHypergraphStore.SaveVertexTree(txn, id[:], dataTree) crdts[0].AddVertex(nil, op.Vertex) case "RemoveVertex": crdts[0].RemoveVertex(nil, op.Vertex) - case "AddHyperedge": - fmt.Printf("server add hyperedge %v\n", time.Now()) - crdts[0].AddHyperedge(nil, op.Hyperedge) - case "RemoveHyperedge": - fmt.Printf("server remove hyperedge %v\n", time.Now()) - crdts[0].RemoveHyperedge(nil, op.Hyperedge) + // case "AddHyperedge": + // fmt.Printf("server add hyperedge %v\n", time.Now()) + // crdts[0].AddHyperedge(nil, op.Hyperedge) + // case "RemoveHyperedge": + // fmt.Printf("server remove hyperedge %v\n", time.Now()) + // crdts[0].RemoveHyperedge(nil, op.Hyperedge) } } txn.Commit() @@ -178,12 +176,12 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[0].AddVertex(nil, op.Vertex) case "RemoveVertex": crdts[0].RemoveVertex(nil, op.Vertex) - case "AddHyperedge": - fmt.Printf("server add hyperedge %v\n", time.Now()) - crdts[0].AddHyperedge(nil, op.Hyperedge) - case "RemoveHyperedge": - fmt.Printf("server remove hyperedge %v\n", time.Now()) - crdts[0].RemoveHyperedge(nil, op.Hyperedge) + // case "AddHyperedge": + // fmt.Printf("server add hyperedge %v\n", time.Now()) + // crdts[0].AddHyperedge(nil, op.Hyperedge) + // case "RemoveHyperedge": + // fmt.Printf("server remove hyperedge %v\n", time.Now()) + // crdts[0].RemoveHyperedge(nil, op.Hyperedge) } } @@ -192,17 +190,16 @@ func TestHypergraphSyncServer(t *testing.T) { switch op.Type { case "AddVertex": id := op.Vertex.GetID() - fmt.Printf("client add vertex %x %v\n", id, time.Now()) clientHypergraphStore.SaveVertexTree(txn, id[:], dataTree) crdts[1].AddVertex(nil, op.Vertex) case "RemoveVertex": crdts[1].RemoveVertex(nil, op.Vertex) - case "AddHyperedge": - fmt.Printf("client add hyperedge %v\n", time.Now()) - crdts[1].AddHyperedge(nil, op.Hyperedge) - case "RemoveHyperedge": - fmt.Printf("client remove hyperedge %v\n", time.Now()) - crdts[1].RemoveHyperedge(nil, op.Hyperedge) + // case "AddHyperedge": + // fmt.Printf("client add hyperedge %v\n", time.Now()) + // crdts[1].AddHyperedge(nil, op.Hyperedge) + // case "RemoveHyperedge": + // fmt.Printf("client remove hyperedge %v\n", time.Now()) + // crdts[1].RemoveHyperedge(nil, op.Hyperedge) } } txn.Commit() @@ -212,12 +209,12 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[1].AddVertex(nil, op.Vertex) case "RemoveVertex": crdts[1].RemoveVertex(nil, op.Vertex) - case "AddHyperedge": - fmt.Printf("client add hyperedge %v\n", time.Now()) - crdts[1].AddHyperedge(nil, op.Hyperedge) - case "RemoveHyperedge": - fmt.Printf("client remove hyperedge %v\n", time.Now()) - crdts[1].RemoveHyperedge(nil, op.Hyperedge) + // case "AddHyperedge": + // fmt.Printf("client add hyperedge %v\n", time.Now()) + // crdts[1].AddHyperedge(nil, op.Hyperedge) + // case "RemoveHyperedge": + // fmt.Printf("client remove hyperedge %v\n", time.Now()) + // crdts[1].RemoveHyperedge(nil, op.Hyperedge) } } @@ -227,10 +224,10 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[2].AddVertex(nil, op.Vertex) case "RemoveVertex": crdts[2].RemoveVertex(nil, op.Vertex) - case "AddHyperedge": - crdts[2].AddHyperedge(nil, op.Hyperedge) - case "RemoveHyperedge": - crdts[2].RemoveHyperedge(nil, op.Hyperedge) + // case "AddHyperedge": + // crdts[2].AddHyperedge(nil, op.Hyperedge) + // case "RemoveHyperedge": + // crdts[2].RemoveHyperedge(nil, op.Hyperedge) } } for _, op := range operations2 { @@ -239,32 +236,35 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[2].AddVertex(nil, op.Vertex) case "RemoveVertex": crdts[2].RemoveVertex(nil, op.Vertex) - case "AddHyperedge": - crdts[2].AddHyperedge(nil, op.Hyperedge) - case "RemoveHyperedge": - crdts[2].RemoveHyperedge(nil, op.Hyperedge) + // case "AddHyperedge": + // crdts[2].AddHyperedge(nil, op.Hyperedge) + // case "RemoveHyperedge": + // crdts[2].RemoveHyperedge(nil, op.Hyperedge) } } - crdts[0].Commit() - crdts[1].Commit() - crdts[2].Commit() + // crdts[0].Commit() + // crdts[1].Commit() + // crdts[2].Commit() // err := serverHypergraphStore.SaveHypergraph(crdts[0]) // assert.NoError(t, err) // err = clientHypergraphStore.SaveHypergraph(crdts[1]) // assert.NoError(t, err) - // serverLoad, err := serverHypergraphStore.LoadHypergraph() - // assert.NoError(t, err) - // clientLoad, err := clientHypergraphStore.LoadHypergraph() - // assert.NoError(t, err) - // assert.Len(t, crypto.CompareLeaves( - // crdts[0].GetVertexAdds()[shardKey].GetTree(), - // serverLoad.GetVertexAdds()[shardKey].GetTree(), - // ), 0) - // assert.Len(t, crypto.CompareLeaves( - // crdts[1].GetVertexAdds()[shardKey].GetTree(), - // clientLoad.GetVertexAdds()[shardKey].GetTree(), - // ), 0) + serverHypergraphStore.MarkHypergraphAsComplete() + clientHypergraphStore.MarkHypergraphAsComplete() + serverLoad, err := serverHypergraphStore.LoadHypergraph() + assert.NoError(t, err) + clientLoad, err := clientHypergraphStore.LoadHypergraph() + assert.NoError(t, err) + assert.Len(t, crypto.CompareLeaves( + crdts[0].GetVertexAdds()[shardKey].GetTree(), + serverLoad.GetVertexAdds()[shardKey].GetTree(), + ), 0) + assert.Len(t, crypto.CompareLeaves( + crdts[1].GetVertexAdds()[shardKey].GetTree(), + clientLoad.GetVertexAdds()[shardKey].GetTree(), + ), 0) + crypto.DebugNode(string(application.VertexAtomType), string(application.AddsPhaseType), shardKey, serverLoad.GetVertexAdds()[shardKey].GetTree().Root, 0, "") log.Printf("Generated data") lis, err := net.Listen("tcp", ":50051") diff --git a/node/store/hypergraph.go b/node/store/hypergraph.go index 5804566..02a9f05 100644 --- a/node/store/hypergraph.go +++ b/node/store/hypergraph.go @@ -76,6 +76,7 @@ type HypergraphStore interface { shardKey crypto.ShardKey, path []int, ) error + MarkHypergraphAsComplete() } var _ HypergraphStore = (*PebbleHypergraphStore)(nil) @@ -113,6 +114,7 @@ const ( VERTEX_REMOVES_TREE_NODE_BY_PATH = 0x32 HYPEREDGE_ADDS_TREE_NODE_BY_PATH = 0x23 HYPEREDGE_REMOVES_TREE_NODE_BY_PATH = 0x33 + HYPERGRAPH_COMPLETE = 0xFB VERTEX_ADDS_TREE_ROOT = 0xFC VERTEX_REMOVES_TREE_ROOT = 0xFD HYPEREDGE_ADDS_TREE_ROOT = 0xFE @@ -391,236 +393,255 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( hg := application.NewHypergraph(p) hypergraphDir := path.Join(p.config.Path, "hypergraph") - vertexAddsIter, err := p.db.NewIter( - []byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_ROOT}, - []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT}, + flag, flagCloser, err := p.db.Get( + []byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE}, ) - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") + complete := false + inProgress := false + if err == nil { + if flag[0] == 0x01 { + inProgress = true + } else { + complete = true + } + flagCloser.Close() } - defer vertexAddsIter.Close() - loadedFromDB := false - for vertexAddsIter.First(); vertexAddsIter.Valid(); vertexAddsIter.Next() { - loadedFromDB = true - shardKey := shardKeyFromKey(vertexAddsIter.Key()) - data := vertexAddsIter.Value() - - var node crypto.LazyVectorCommitmentNode - switch data[0] { - case crypto.TypeLeaf: - node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:])) - case crypto.TypeBranch: - pathLength := binary.BigEndian.Uint32(data[1:5]) - - node, err = crypto.DeserializeBranchNode( - p, - bytes.NewReader(data[5+(pathLength*4):]), - false, - ) - - fullPrefix := []int{} - for i := range pathLength { - fullPrefix = append( - fullPrefix, - int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), - ) - } - branch := node.(*crypto.LazyVectorCommitmentBranchNode) - branch.FullPrefix = fullPrefix - default: - err = ErrInvalidData - } - - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") - } - - err = hg.ImportTree( - application.VertexAtomType, - application.AddsPhaseType, - shardKey, - node, - p, + if complete || inProgress { + vertexAddsIter, err := p.db.NewIter( + []byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_ROOT}, + []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT}, ) if err != nil { return nil, errors.Wrap(err, "load hypergraph") } - } + defer vertexAddsIter.Close() - vertexRemovesIter, err := p.db.NewIter( - []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT}, - []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT}, - ) - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") - } - defer vertexRemovesIter.Close() + loadedFromDB := false + for vertexAddsIter.First(); vertexAddsIter.Valid(); vertexAddsIter.Next() { + loadedFromDB = true + shardKey := shardKeyFromKey(vertexAddsIter.Key()) + data := vertexAddsIter.Value() - for vertexRemovesIter.First(); vertexRemovesIter.Valid(); vertexRemovesIter.Next() { - loadedFromDB = true - shardKey := shardKeyFromKey(vertexRemovesIter.Key()) - data := vertexRemovesIter.Value() + var node crypto.LazyVectorCommitmentNode + switch data[0] { + case crypto.TypeLeaf: + node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:])) + case crypto.TypeBranch: + pathLength := binary.BigEndian.Uint32(data[1:5]) - var node crypto.LazyVectorCommitmentNode - switch data[0] { - case crypto.TypeLeaf: - node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:])) - case crypto.TypeBranch: - pathLength := binary.BigEndian.Uint32(data[1:5]) - - node, err = crypto.DeserializeBranchNode( - p, - bytes.NewReader(data[5+(pathLength*4):]), - false, - ) - - fullPrefix := []int{} - for i := range pathLength { - fullPrefix = append( - fullPrefix, - int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), + node, err = crypto.DeserializeBranchNode( + p, + bytes.NewReader(data[5+(pathLength*4):]), + false, ) + + fullPrefix := []int{} + for i := range pathLength { + fullPrefix = append( + fullPrefix, + int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), + ) + } + branch := node.(*crypto.LazyVectorCommitmentBranchNode) + branch.FullPrefix = fullPrefix + default: + err = ErrInvalidData + } + + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") + } + + err = hg.ImportTree( + application.VertexAtomType, + application.AddsPhaseType, + shardKey, + node, + p, + ) + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") } - branch := node.(*crypto.LazyVectorCommitmentBranchNode) - branch.FullPrefix = fullPrefix - default: - err = ErrInvalidData } - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") - } - - err = hg.ImportTree( - application.VertexAtomType, - application.RemovesPhaseType, - shardKey, - node, - p, + vertexRemovesIter, err := p.db.NewIter( + []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT}, + []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT}, ) if err != nil { return nil, errors.Wrap(err, "load hypergraph") } - } + defer vertexRemovesIter.Close() - hyperedgeAddsIter, err := p.db.NewIter( - []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT}, - []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT}, - ) - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") - } - defer hyperedgeAddsIter.Close() + for vertexRemovesIter.First(); vertexRemovesIter.Valid(); vertexRemovesIter.Next() { + loadedFromDB = true + shardKey := shardKeyFromKey(vertexRemovesIter.Key()) + data := vertexRemovesIter.Value() - for hyperedgeAddsIter.First(); hyperedgeAddsIter.Valid(); hyperedgeAddsIter.Next() { - loadedFromDB = true - shardKey := shardKeyFromKey(hyperedgeAddsIter.Key()) - data := hyperedgeAddsIter.Value() + var node crypto.LazyVectorCommitmentNode + switch data[0] { + case crypto.TypeLeaf: + node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:])) + case crypto.TypeBranch: + pathLength := binary.BigEndian.Uint32(data[1:5]) - var node crypto.LazyVectorCommitmentNode - switch data[0] { - case crypto.TypeLeaf: - node, err = crypto.DeserializeLeafNode( - p, - bytes.NewReader(data[1:]), - ) - case crypto.TypeBranch: - pathLength := binary.BigEndian.Uint32(data[1:5]) - - node, err = crypto.DeserializeBranchNode( - p, - bytes.NewReader(data[5+(pathLength*4):]), - false, - ) - - fullPrefix := []int{} - for i := range pathLength { - fullPrefix = append( - fullPrefix, - int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), + node, err = crypto.DeserializeBranchNode( + p, + bytes.NewReader(data[5+(pathLength*4):]), + false, ) + + fullPrefix := []int{} + for i := range pathLength { + fullPrefix = append( + fullPrefix, + int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), + ) + } + branch := node.(*crypto.LazyVectorCommitmentBranchNode) + branch.FullPrefix = fullPrefix + default: + err = ErrInvalidData + } + + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") + } + + err = hg.ImportTree( + application.VertexAtomType, + application.RemovesPhaseType, + shardKey, + node, + p, + ) + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") } - branch := node.(*crypto.LazyVectorCommitmentBranchNode) - branch.FullPrefix = fullPrefix - default: - err = ErrInvalidData } - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") - } - - err = hg.ImportTree( - application.HyperedgeAtomType, - application.AddsPhaseType, - shardKey, - node, - p, + hyperedgeAddsIter, err := p.db.NewIter( + []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT}, + []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT}, ) if err != nil { return nil, errors.Wrap(err, "load hypergraph") } - } + defer hyperedgeAddsIter.Close() - hyperedgeRemovesIter, err := p.db.NewIter( - []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT}, - []byte{(HYPERGRAPH_SHARD + 1), 0x00}, - ) - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") - } - defer hyperedgeRemovesIter.Close() + for hyperedgeAddsIter.First(); hyperedgeAddsIter.Valid(); hyperedgeAddsIter.Next() { + loadedFromDB = true + shardKey := shardKeyFromKey(hyperedgeAddsIter.Key()) + data := hyperedgeAddsIter.Value() - for hyperedgeRemovesIter.First(); hyperedgeRemovesIter.Valid(); hyperedgeRemovesIter.Next() { - loadedFromDB = true - shardKey := shardKeyFromKey(hyperedgeRemovesIter.Key()) - data := hyperedgeRemovesIter.Value() - - var node crypto.LazyVectorCommitmentNode - switch data[0] { - case crypto.TypeLeaf: - node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:])) - case crypto.TypeBranch: - pathLength := binary.BigEndian.Uint32(data[1:5]) - - node, err = crypto.DeserializeBranchNode( - p, - bytes.NewReader(data[5+(pathLength*4):]), - false, - ) - - fullPrefix := []int{} - for i := range pathLength { - fullPrefix = append( - fullPrefix, - int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), + var node crypto.LazyVectorCommitmentNode + switch data[0] { + case crypto.TypeLeaf: + node, err = crypto.DeserializeLeafNode( + p, + bytes.NewReader(data[1:]), ) + case crypto.TypeBranch: + pathLength := binary.BigEndian.Uint32(data[1:5]) + node, err = crypto.DeserializeBranchNode( + p, + bytes.NewReader(data[5+(pathLength*4):]), + false, + ) + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") + } + + fullPrefix := []int{} + for i := range pathLength { + fullPrefix = append( + fullPrefix, + int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), + ) + } + + branch := node.(*crypto.LazyVectorCommitmentBranchNode) + branch.FullPrefix = fullPrefix + default: + err = ErrInvalidData + } + + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") + } + + err = hg.ImportTree( + application.HyperedgeAtomType, + application.AddsPhaseType, + shardKey, + node, + p, + ) + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") } - branch := node.(*crypto.LazyVectorCommitmentBranchNode) - branch.FullPrefix = fullPrefix - default: - err = ErrInvalidData } - if err != nil { - return nil, errors.Wrap(err, "load hypergraph") - } - - err = hg.ImportTree( - application.HyperedgeAtomType, - application.RemovesPhaseType, - shardKey, - node, - p, + hyperedgeRemovesIter, err := p.db.NewIter( + []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT}, + []byte{(HYPERGRAPH_SHARD + 1), 0x00}, ) if err != nil { return nil, errors.Wrap(err, "load hypergraph") } - } + defer hyperedgeRemovesIter.Close() - if loadedFromDB { - return hg, nil + for hyperedgeRemovesIter.First(); hyperedgeRemovesIter.Valid(); hyperedgeRemovesIter.Next() { + loadedFromDB = true + shardKey := shardKeyFromKey(hyperedgeRemovesIter.Key()) + data := hyperedgeRemovesIter.Value() + + var node crypto.LazyVectorCommitmentNode + switch data[0] { + case crypto.TypeLeaf: + node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:])) + case crypto.TypeBranch: + pathLength := binary.BigEndian.Uint32(data[1:5]) + + node, err = crypto.DeserializeBranchNode( + p, + bytes.NewReader(data[5+(pathLength*4):]), + false, + ) + + fullPrefix := []int{} + for i := range pathLength { + fullPrefix = append( + fullPrefix, + int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])), + ) + } + branch := node.(*crypto.LazyVectorCommitmentBranchNode) + branch.FullPrefix = fullPrefix + default: + err = ErrInvalidData + } + + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") + } + + err = hg.ImportTree( + application.HyperedgeAtomType, + application.RemovesPhaseType, + shardKey, + node, + p, + ) + if err != nil { + return nil, errors.Wrap(err, "load hypergraph") + } + } + + if loadedFromDB && complete { + return hg, nil + } } vertexAddsPrefix := hex.EncodeToString( @@ -637,6 +658,46 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( ) p.logger.Info("converting hypergraph, this may take a moment") + if !inProgress { + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_ROOT}, + []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE}, + []byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE + 1}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE}, + []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE + 1}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE}, + []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE + 1}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE}, + []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE + 1}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE_BY_PATH}, + []byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE_BY_PATH + 1}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE_BY_PATH}, + []byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE_BY_PATH + 1}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE_BY_PATH}, + []byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE_BY_PATH + 1}, + ) + p.db.DeleteRange( + []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE_BY_PATH}, + []byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE_BY_PATH + 1}, + ) + p.db.Set([]byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE}, []byte{0x01}) + } + err = errors.Wrap( filepath.WalkDir( hypergraphDir, @@ -698,12 +759,31 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( return err } + var existingTree *crypto.LazyVectorCommitmentTree + if inProgress { + existing, ok := hg.GetVertexAdds()[shardKeyFromKey(shardSet)] + if ok { + existingTree = existing.GetTree() + } + } + txn, err := p.NewTransaction(false) if err != nil { return err } + size := len(atoms) for i, atom := range atoms { - hg.AddVertex(txn, atom.(application.Vertex)) + vert, ok := atom.(application.Vertex) + if !ok { + continue + } + id := vert.GetID() + if existingTree != nil { + if v, _ := existingTree.Get(id[:]); v == nil { + continue + } + } + hg.AddVertex(txn, vert) if i%100 == 99 { if err := txn.Commit(); err != nil { @@ -714,8 +794,17 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( if err != nil { return err } + p.logger.Info( + "converted batch", + zap.Float32("percentage", float32(i*100)/float32(size)), + ) } } + + p.logger.Info( + "converted batch", + zap.Float32("percentage", float32(100)), + ) if txn != nil { if err := txn.Commit(); err != nil { txn.Abort() @@ -723,6 +812,7 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( } } + p.db.Set([]byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE}, []byte{0x02}) return nil }, ), @@ -735,6 +825,10 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( return hg, nil } +func (p *PebbleHypergraphStore) MarkHypergraphAsComplete() { + p.db.Set([]byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE}, []byte{0x02}) +} + func (p *PebbleHypergraphStore) SaveHypergraph( hg *application.Hypergraph, ) error { @@ -1023,13 +1117,12 @@ func (p *PebbleHypergraphStore) GetNodeByPath( func generateSlices(fullpref, pref []int) [][]int { result := [][]int{} - if len(pref) >= len(fullpref) { - return [][]int{fullpref} + if len(pref) > len(fullpref) { + panic("invalid prefix length") } - for i := 0; i <= len(pref); i++ { - newLen := len(fullpref) - i - newSlice := make([]int, newLen) - copy(newSlice, fullpref[:newLen]) + for i := len(pref); i <= len(fullpref); i++ { + newSlice := make([]int, i) + copy(newSlice, fullpref[:i]) result = append(result, newSlice) } @@ -1077,11 +1170,11 @@ func (p *PebbleHypergraphStore) InsertNode( nodeKey := keyFn(shardKey, key) switch n := node.(type) { case *crypto.LazyVectorCommitmentBranchNode: - length := uint32(len(path)) + length := uint32(len(n.FullPrefix)) pathBytes := []byte{} pathBytes = binary.BigEndian.AppendUint32(pathBytes, length) for i := range int(length) { - pathBytes = binary.BigEndian.AppendUint32(pathBytes, uint32(path[i])) + pathBytes = binary.BigEndian.AppendUint32(pathBytes, uint32(n.FullPrefix[i])) } err := crypto.SerializeBranchNode(&b, n, false) if err != nil { @@ -1169,7 +1262,7 @@ func (p *PebbleHypergraphStore) SaveRoot( if err != nil { return errors.Wrap(err, "insert node") } - data := append([]byte{crypto.TypeBranch}, b.Bytes()...) + data := append([]byte{crypto.TypeLeaf}, b.Bytes()...) err = p.db.Set(nodeKey, data) return errors.Wrap(err, "insert node") } diff --git a/node/store/inmem.go b/node/store/inmem.go index 54110a9..e103826 100644 --- a/node/store/inmem.go +++ b/node/store/inmem.go @@ -305,6 +305,10 @@ func NewInMemKVDB() *InMemKVDB { } } +func (d *InMemKVDB) GetRawStore() map[string][]byte { + return d.store +} + func (d *InMemKVDB) Get(key []byte) ([]byte, io.Closer, error) { if !d.open { return nil, nil, errors.New("inmem db closed")