resolve issues with conversion, fix streaming

This commit is contained in:
Cassandra Heart 2025-03-25 00:43:06 -05:00
parent 405f087587
commit 6d3a301b7c
No known key found for this signature in database
GPG Key ID: 6352152859385958
12 changed files with 403 additions and 282 deletions

1
bedlam/apps/garbled/data/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
pre*

View File

@ -156,11 +156,12 @@ func (c *Compiler) stream(conn *p2p.Conn, oti ot.OT, source string,
out, bits, err := program.Stream(conn, oti, c.params, input, timing)
if err != nil {
fmt.Println(err)
return nil, nil, err
}
if false {
program.StreamDebug()
}
// if false {
program.StreamDebug()
// }
return out, bits, err
}

View File

@ -719,6 +719,9 @@ func (prog *Program) garble(conn *p2p.Conn, streaming *circuit.Streaming,
if err := conn.SendUint32(int(maxID + 1)); err != nil {
return err
}
if err := conn.Flush(); err != nil {
return err
}
tInit, tGarble, err := streaming.Garble(circ, in, out)
if err != nil {
return err

View File

@ -162,7 +162,7 @@ func (n *LazyVectorCommitmentBranchNode) Commit(
phaseType,
shardKey,
generateKeyFromPath(n.FullPrefix),
n.FullPrefix,
path,
n,
); err != nil {
panic(err)
@ -444,7 +444,7 @@ func (t *LazyVectorCommitmentTree) Insert(
t.PhaseType,
t.ShardKey,
generateKeyFromPath(slices.Concat(path, sharedNibbles)),
slices.Concat(path, sharedNibbles),
path,
branch,
)
if err != nil {
@ -499,13 +499,15 @@ func (t *LazyVectorCommitmentTree) Insert(
path,
newBranch.Prefix,
[]int{expectedNibble},
n.Prefix,
)
err = t.Store.InsertNode(
txn,
t.SetType,
t.PhaseType,
t.ShardKey,
generateKeyFromPath(slices.Concat(path, newBranch.Prefix, []int{expectedNibble})),
generateKeyFromPath(slices.Concat(path, newBranch.Prefix, []int{expectedNibble}, n.Prefix)),
slices.Concat(path, newBranch.Prefix, []int{expectedNibble}),
newBranch.Children[expectedNibble],
)
@ -520,7 +522,7 @@ func (t *LazyVectorCommitmentTree) Insert(
t.PhaseType,
t.ShardKey,
generateKeyFromPath(slices.Concat(path, newBranch.Prefix)),
slices.Concat(path, newBranch.Prefix),
path,
newBranch,
)
if err != nil {
@ -534,11 +536,8 @@ func (t *LazyVectorCommitmentTree) Insert(
// Key matches prefix, continue with final nibble
finalNibble := getNextNibble(key, depth+len(n.Prefix)*BranchBits)
maybeBranch, ok := n.Children[finalNibble].(*LazyVectorCommitmentBranchNode)
newPath := slices.Concat(path, n.Prefix, []int{finalNibble})
if ok {
newPath = append(newPath, maybeBranch.Prefix...)
}
delta, inserted := insert(
n.Children[finalNibble],
depth+len(n.Prefix)*BranchBits+BranchBits,
@ -577,11 +576,8 @@ func (t *LazyVectorCommitmentTree) Insert(
} else {
// Simple branch without prefix
nibble := getNextNibble(key, depth)
maybeBranch, ok := n.Children[nibble].(*LazyVectorCommitmentBranchNode)
newPath := slices.Concat(path, n.Prefix, []int{nibble})
if ok {
newPath = append(newPath, maybeBranch.Prefix...)
}
delta, inserted := insert(n.Children[nibble], depth+BranchBits, newPath)
n.Children[nibble] = inserted
n.Commitment = nil
@ -619,13 +615,7 @@ func (t *LazyVectorCommitmentTree) Insert(
return 0, nil
}
prefix := []int{}
branch, ok := t.Root.(*LazyVectorCommitmentBranchNode)
if ok {
prefix = append(prefix, branch.Prefix...)
}
_, t.Root = insert(t.Root, 0, prefix)
_, t.Root = insert(t.Root, 0, []int{})
return errors.Wrap(t.Store.SaveRoot(
t.SetType,
t.PhaseType,

View File

@ -8,6 +8,8 @@ import (
"errors"
"fmt"
"math/big"
"slices"
"strings"
"sync"
rbls48581 "source.quilibrium.com/quilibrium/monorepo/bls48581"
@ -200,7 +202,7 @@ type VectorCommitmentTree struct {
func getNextNibble(key []byte, pos int) int {
startByte := pos / 8
if startByte >= len(key) {
return 0
return -1
}
// Calculate how many bits we need from the current byte
@ -232,7 +234,7 @@ func getNibblesUntilDiverge(key1, key2 []byte, startDepth int) ([]int, int) {
for {
n1 := getNextNibble(key1, depth)
n2 := getNextNibble(key2, depth)
if n1 != n2 {
if n1 == -1 || n2 == -1 || n1 != n2 {
return nibbles, depth
}
nibbles = append(nibbles, n1)
@ -613,20 +615,32 @@ func (t *VectorCommitmentTree) GetSize() *big.Int {
return t.Root.GetSize()
}
func DebugNode(node VectorCommitmentNode, depth int, prefix string) {
func DebugNode(setType, phaseType string, shardKey ShardKey, node LazyVectorCommitmentNode, depth int, prefix string) {
if node == nil {
return
}
switch n := node.(type) {
case *VectorCommitmentLeafNode:
case *LazyVectorCommitmentLeafNode:
fmt.Printf("%sLeaf: key=%x value=%x\n", prefix, n.Key, n.Value)
case *VectorCommitmentBranchNode:
case *LazyVectorCommitmentBranchNode:
fmt.Printf("%sBranch %v:\n", prefix, n.Prefix)
for i, child := range n.Children {
if child == nil {
var err error
child, err = n.Store.GetNodeByPath(
setType,
phaseType,
shardKey,
slices.Concat(n.FullPrefix, []int{i}),
)
if err != nil && !strings.Contains(err.Error(), "not found") {
panic(err)
}
}
if child != nil {
fmt.Printf("%s [%d]:\n", prefix, i)
DebugNode(child, depth+1, prefix+" ")
DebugNode(setType, phaseType, shardKey, child, depth+1, prefix+" ")
}
}
}

View File

@ -379,5 +379,5 @@ func TestVectorCommitmentTrees(t *testing.T) {
t.Errorf("incorrect longest branch count, %d, %d,", 4, longestBranch)
}
DebugNode(tree.Root, 0, "")
// DebugNode(tree.Root, 0, "")
}

View File

@ -185,8 +185,6 @@ func CompareLeaves(tree1, tree2 *LazyVectorCommitmentTree) []LeafDifference {
tree2.ShardKey,
tree2.Root,
)
fmt.Println(len(leaves1))
fmt.Println(len(leaves2))
differences := make([]LeafDifference, 0)
@ -323,6 +321,8 @@ func GetAllLeaves(
leaves = append(leaves, n)
case *LazyVectorCommitmentBranchNode:
for i, child := range n.Children {
child := child
i := i
var err error
if child == nil {
child, err = n.Store.GetNodeByPath(

View File

@ -972,6 +972,7 @@ func (e *TokenExecutionEngine) rebuildMissingSetForHypergraph(set [][]byte) {
txn.Abort()
panic(err)
}
e.hypergraphStore.MarkHypergraphAsComplete()
}
func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) {
@ -1057,6 +1058,7 @@ func (e *TokenExecutionEngine) rebuildHypergraph(totalRange int) {
txn.Abort()
panic(err)
}
e.hypergraphStore.MarkHypergraphAsComplete()
}
// GetName implements ExecutionEngine

View File

@ -365,6 +365,7 @@ func (s *streamManager) sendLeafData(
default:
}
fmt.Printf("send leaf data %v\n", path)
node := getNodeAtPath(
s.localTree.SetType,
s.localTree.PhaseType,
@ -381,6 +382,7 @@ func (s *streamManager) sendLeafData(
s.localTree.ShardKey,
node,
)
s.logger.Info("sending set of leaves", zap.Int("leaf_count", len(children)))
for _, child := range children {
if child == nil {
continue
@ -412,6 +414,7 @@ func getNodeAtPath(
return nil
}
if len(path) == 0 {
fmt.Println("path is zero, returning")
return node
}
@ -419,6 +422,7 @@ func getNodeAtPath(
case *crypto.LazyVectorCommitmentLeafNode:
return node
case *crypto.LazyVectorCommitmentBranchNode:
fmt.Printf("path %v\nprefix %v\n", path, n.Prefix)
// Check that the branch's prefix matches the beginning of the query path.
if len(path) < len(n.Prefix) {
return nil
@ -426,6 +430,7 @@ func getNodeAtPath(
for i, nib := range n.Prefix {
if int32(nib) != path[i] {
fmt.Println("no viable prefix")
return nil
}
}
@ -433,15 +438,18 @@ func getNodeAtPath(
// Remove the prefix portion from the path.
remainder := path[len(n.Prefix):]
if len(remainder) == 0 {
fmt.Println("remainder 0, return node")
return node
}
// The first element of the remainder selects the child.
childIndex := remainder[0]
if int(childIndex) < 0 || int(childIndex) >= len(n.Children) {
fmt.Println("invalid child index")
return nil
}
fmt.Printf("get node at path %v\n", slices.Concat(n.FullPrefix, []int{int(childIndex)}))
child, err := n.Store.GetNodeByPath(
setType,
phaseType,
@ -456,6 +464,7 @@ func getNodeAtPath(
return nil
}
fmt.Println("recurse")
return getNodeAtPath(
setType,
phaseType,
@ -740,7 +749,11 @@ func (s *streamManager) walk(
pathString := zap.String("path", hex.EncodeToString(packPath(path)))
if bytes.Equal(lnode.Commitment, rnode.Commitment) {
s.logger.Info("commitments match", pathString)
s.logger.Info(
"commitments match",
pathString,
zap.String("commitment", hex.EncodeToString(lnode.Commitment)),
)
return nil
}

View File

@ -10,7 +10,6 @@ import (
"math/big"
"net"
"testing"
"time"
"github.com/cloudflare/circl/sign/ed448"
pcrypto "github.com/libp2p/go-libp2p/core/crypto"
@ -158,17 +157,16 @@ func TestHypergraphSyncServer(t *testing.T) {
switch op.Type {
case "AddVertex":
id := op.Vertex.GetID()
fmt.Printf("server add vertex %x %v\n", id, time.Now())
serverHypergraphStore.SaveVertexTree(txn, id[:], dataTree)
crdts[0].AddVertex(nil, op.Vertex)
case "RemoveVertex":
crdts[0].RemoveVertex(nil, op.Vertex)
case "AddHyperedge":
fmt.Printf("server add hyperedge %v\n", time.Now())
crdts[0].AddHyperedge(nil, op.Hyperedge)
case "RemoveHyperedge":
fmt.Printf("server remove hyperedge %v\n", time.Now())
crdts[0].RemoveHyperedge(nil, op.Hyperedge)
// case "AddHyperedge":
// fmt.Printf("server add hyperedge %v\n", time.Now())
// crdts[0].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// fmt.Printf("server remove hyperedge %v\n", time.Now())
// crdts[0].RemoveHyperedge(nil, op.Hyperedge)
}
}
txn.Commit()
@ -178,12 +176,12 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[0].AddVertex(nil, op.Vertex)
case "RemoveVertex":
crdts[0].RemoveVertex(nil, op.Vertex)
case "AddHyperedge":
fmt.Printf("server add hyperedge %v\n", time.Now())
crdts[0].AddHyperedge(nil, op.Hyperedge)
case "RemoveHyperedge":
fmt.Printf("server remove hyperedge %v\n", time.Now())
crdts[0].RemoveHyperedge(nil, op.Hyperedge)
// case "AddHyperedge":
// fmt.Printf("server add hyperedge %v\n", time.Now())
// crdts[0].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// fmt.Printf("server remove hyperedge %v\n", time.Now())
// crdts[0].RemoveHyperedge(nil, op.Hyperedge)
}
}
@ -192,17 +190,16 @@ func TestHypergraphSyncServer(t *testing.T) {
switch op.Type {
case "AddVertex":
id := op.Vertex.GetID()
fmt.Printf("client add vertex %x %v\n", id, time.Now())
clientHypergraphStore.SaveVertexTree(txn, id[:], dataTree)
crdts[1].AddVertex(nil, op.Vertex)
case "RemoveVertex":
crdts[1].RemoveVertex(nil, op.Vertex)
case "AddHyperedge":
fmt.Printf("client add hyperedge %v\n", time.Now())
crdts[1].AddHyperedge(nil, op.Hyperedge)
case "RemoveHyperedge":
fmt.Printf("client remove hyperedge %v\n", time.Now())
crdts[1].RemoveHyperedge(nil, op.Hyperedge)
// case "AddHyperedge":
// fmt.Printf("client add hyperedge %v\n", time.Now())
// crdts[1].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// fmt.Printf("client remove hyperedge %v\n", time.Now())
// crdts[1].RemoveHyperedge(nil, op.Hyperedge)
}
}
txn.Commit()
@ -212,12 +209,12 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[1].AddVertex(nil, op.Vertex)
case "RemoveVertex":
crdts[1].RemoveVertex(nil, op.Vertex)
case "AddHyperedge":
fmt.Printf("client add hyperedge %v\n", time.Now())
crdts[1].AddHyperedge(nil, op.Hyperedge)
case "RemoveHyperedge":
fmt.Printf("client remove hyperedge %v\n", time.Now())
crdts[1].RemoveHyperedge(nil, op.Hyperedge)
// case "AddHyperedge":
// fmt.Printf("client add hyperedge %v\n", time.Now())
// crdts[1].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// fmt.Printf("client remove hyperedge %v\n", time.Now())
// crdts[1].RemoveHyperedge(nil, op.Hyperedge)
}
}
@ -227,10 +224,10 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[2].AddVertex(nil, op.Vertex)
case "RemoveVertex":
crdts[2].RemoveVertex(nil, op.Vertex)
case "AddHyperedge":
crdts[2].AddHyperedge(nil, op.Hyperedge)
case "RemoveHyperedge":
crdts[2].RemoveHyperedge(nil, op.Hyperedge)
// case "AddHyperedge":
// crdts[2].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// crdts[2].RemoveHyperedge(nil, op.Hyperedge)
}
}
for _, op := range operations2 {
@ -239,32 +236,35 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[2].AddVertex(nil, op.Vertex)
case "RemoveVertex":
crdts[2].RemoveVertex(nil, op.Vertex)
case "AddHyperedge":
crdts[2].AddHyperedge(nil, op.Hyperedge)
case "RemoveHyperedge":
crdts[2].RemoveHyperedge(nil, op.Hyperedge)
// case "AddHyperedge":
// crdts[2].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// crdts[2].RemoveHyperedge(nil, op.Hyperedge)
}
}
crdts[0].Commit()
crdts[1].Commit()
crdts[2].Commit()
// crdts[0].Commit()
// crdts[1].Commit()
// crdts[2].Commit()
// err := serverHypergraphStore.SaveHypergraph(crdts[0])
// assert.NoError(t, err)
// err = clientHypergraphStore.SaveHypergraph(crdts[1])
// assert.NoError(t, err)
// serverLoad, err := serverHypergraphStore.LoadHypergraph()
// assert.NoError(t, err)
// clientLoad, err := clientHypergraphStore.LoadHypergraph()
// assert.NoError(t, err)
// assert.Len(t, crypto.CompareLeaves(
// crdts[0].GetVertexAdds()[shardKey].GetTree(),
// serverLoad.GetVertexAdds()[shardKey].GetTree(),
// ), 0)
// assert.Len(t, crypto.CompareLeaves(
// crdts[1].GetVertexAdds()[shardKey].GetTree(),
// clientLoad.GetVertexAdds()[shardKey].GetTree(),
// ), 0)
serverHypergraphStore.MarkHypergraphAsComplete()
clientHypergraphStore.MarkHypergraphAsComplete()
serverLoad, err := serverHypergraphStore.LoadHypergraph()
assert.NoError(t, err)
clientLoad, err := clientHypergraphStore.LoadHypergraph()
assert.NoError(t, err)
assert.Len(t, crypto.CompareLeaves(
crdts[0].GetVertexAdds()[shardKey].GetTree(),
serverLoad.GetVertexAdds()[shardKey].GetTree(),
), 0)
assert.Len(t, crypto.CompareLeaves(
crdts[1].GetVertexAdds()[shardKey].GetTree(),
clientLoad.GetVertexAdds()[shardKey].GetTree(),
), 0)
crypto.DebugNode(string(application.VertexAtomType), string(application.AddsPhaseType), shardKey, serverLoad.GetVertexAdds()[shardKey].GetTree().Root, 0, "")
log.Printf("Generated data")
lis, err := net.Listen("tcp", ":50051")

View File

@ -76,6 +76,7 @@ type HypergraphStore interface {
shardKey crypto.ShardKey,
path []int,
) error
MarkHypergraphAsComplete()
}
var _ HypergraphStore = (*PebbleHypergraphStore)(nil)
@ -113,6 +114,7 @@ const (
VERTEX_REMOVES_TREE_NODE_BY_PATH = 0x32
HYPEREDGE_ADDS_TREE_NODE_BY_PATH = 0x23
HYPEREDGE_REMOVES_TREE_NODE_BY_PATH = 0x33
HYPERGRAPH_COMPLETE = 0xFB
VERTEX_ADDS_TREE_ROOT = 0xFC
VERTEX_REMOVES_TREE_ROOT = 0xFD
HYPEREDGE_ADDS_TREE_ROOT = 0xFE
@ -391,236 +393,255 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
hg := application.NewHypergraph(p)
hypergraphDir := path.Join(p.config.Path, "hypergraph")
vertexAddsIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_ROOT},
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT},
flag, flagCloser, err := p.db.Get(
[]byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
complete := false
inProgress := false
if err == nil {
if flag[0] == 0x01 {
inProgress = true
} else {
complete = true
}
flagCloser.Close()
}
defer vertexAddsIter.Close()
loadedFromDB := false
for vertexAddsIter.First(); vertexAddsIter.Valid(); vertexAddsIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(vertexAddsIter.Key())
data := vertexAddsIter.Value()
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:]))
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
)
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.VertexAtomType,
application.AddsPhaseType,
shardKey,
node,
p,
if complete || inProgress {
vertexAddsIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_ROOT},
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
}
defer vertexAddsIter.Close()
vertexRemovesIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
defer vertexRemovesIter.Close()
loadedFromDB := false
for vertexAddsIter.First(); vertexAddsIter.Valid(); vertexAddsIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(vertexAddsIter.Key())
data := vertexAddsIter.Value()
for vertexRemovesIter.First(); vertexRemovesIter.Valid(); vertexRemovesIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(vertexRemovesIter.Key())
data := vertexRemovesIter.Value()
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:]))
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:]))
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
)
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.VertexAtomType,
application.AddsPhaseType,
shardKey,
node,
p,
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.VertexAtomType,
application.RemovesPhaseType,
shardKey,
node,
p,
vertexRemovesIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_ROOT},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
}
defer vertexRemovesIter.Close()
hyperedgeAddsIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
defer hyperedgeAddsIter.Close()
for vertexRemovesIter.First(); vertexRemovesIter.Valid(); vertexRemovesIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(vertexRemovesIter.Key())
data := vertexRemovesIter.Value()
for hyperedgeAddsIter.First(); hyperedgeAddsIter.Valid(); hyperedgeAddsIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(hyperedgeAddsIter.Key())
data := hyperedgeAddsIter.Value()
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:]))
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(
p,
bytes.NewReader(data[1:]),
)
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
)
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.VertexAtomType,
application.RemovesPhaseType,
shardKey,
node,
p,
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.HyperedgeAtomType,
application.AddsPhaseType,
shardKey,
node,
p,
hyperedgeAddsIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_ROOT},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
}
defer hyperedgeAddsIter.Close()
hyperedgeRemovesIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT},
[]byte{(HYPERGRAPH_SHARD + 1), 0x00},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
defer hyperedgeRemovesIter.Close()
for hyperedgeAddsIter.First(); hyperedgeAddsIter.Valid(); hyperedgeAddsIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(hyperedgeAddsIter.Key())
data := hyperedgeAddsIter.Value()
for hyperedgeRemovesIter.First(); hyperedgeRemovesIter.Valid(); hyperedgeRemovesIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(hyperedgeRemovesIter.Key())
data := hyperedgeRemovesIter.Value()
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:]))
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(
p,
bytes.NewReader(data[1:]),
)
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
)
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.HyperedgeAtomType,
application.AddsPhaseType,
shardKey,
node,
p,
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.HyperedgeAtomType,
application.RemovesPhaseType,
shardKey,
node,
p,
hyperedgeRemovesIter, err := p.db.NewIter(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT},
[]byte{(HYPERGRAPH_SHARD + 1), 0x00},
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
}
defer hyperedgeRemovesIter.Close()
if loadedFromDB {
return hg, nil
for hyperedgeRemovesIter.First(); hyperedgeRemovesIter.Valid(); hyperedgeRemovesIter.Next() {
loadedFromDB = true
shardKey := shardKeyFromKey(hyperedgeRemovesIter.Key())
data := hyperedgeRemovesIter.Value()
var node crypto.LazyVectorCommitmentNode
switch data[0] {
case crypto.TypeLeaf:
node, err = crypto.DeserializeLeafNode(p, bytes.NewReader(data[1:]))
case crypto.TypeBranch:
pathLength := binary.BigEndian.Uint32(data[1:5])
node, err = crypto.DeserializeBranchNode(
p,
bytes.NewReader(data[5+(pathLength*4):]),
false,
)
fullPrefix := []int{}
for i := range pathLength {
fullPrefix = append(
fullPrefix,
int(binary.BigEndian.Uint32(data[5+(i*4):5+((i+1)*4)])),
)
}
branch := node.(*crypto.LazyVectorCommitmentBranchNode)
branch.FullPrefix = fullPrefix
default:
err = ErrInvalidData
}
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
err = hg.ImportTree(
application.HyperedgeAtomType,
application.RemovesPhaseType,
shardKey,
node,
p,
)
if err != nil {
return nil, errors.Wrap(err, "load hypergraph")
}
}
if loadedFromDB && complete {
return hg, nil
}
}
vertexAddsPrefix := hex.EncodeToString(
@ -637,6 +658,46 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
)
p.logger.Info("converting hypergraph, this may take a moment")
if !inProgress {
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_ROOT},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_ROOT},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE},
[]byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE + 1},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE},
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE + 1},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE + 1},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE + 1},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE_BY_PATH},
[]byte{HYPERGRAPH_SHARD, VERTEX_ADDS_TREE_NODE_BY_PATH + 1},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE_BY_PATH},
[]byte{HYPERGRAPH_SHARD, VERTEX_REMOVES_TREE_NODE_BY_PATH + 1},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE_BY_PATH},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_ADDS_TREE_NODE_BY_PATH + 1},
)
p.db.DeleteRange(
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE_BY_PATH},
[]byte{HYPERGRAPH_SHARD, HYPEREDGE_REMOVES_TREE_NODE_BY_PATH + 1},
)
p.db.Set([]byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE}, []byte{0x01})
}
err = errors.Wrap(
filepath.WalkDir(
hypergraphDir,
@ -698,12 +759,31 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
return err
}
var existingTree *crypto.LazyVectorCommitmentTree
if inProgress {
existing, ok := hg.GetVertexAdds()[shardKeyFromKey(shardSet)]
if ok {
existingTree = existing.GetTree()
}
}
txn, err := p.NewTransaction(false)
if err != nil {
return err
}
size := len(atoms)
for i, atom := range atoms {
hg.AddVertex(txn, atom.(application.Vertex))
vert, ok := atom.(application.Vertex)
if !ok {
continue
}
id := vert.GetID()
if existingTree != nil {
if v, _ := existingTree.Get(id[:]); v == nil {
continue
}
}
hg.AddVertex(txn, vert)
if i%100 == 99 {
if err := txn.Commit(); err != nil {
@ -714,8 +794,17 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
if err != nil {
return err
}
p.logger.Info(
"converted batch",
zap.Float32("percentage", float32(i*100)/float32(size)),
)
}
}
p.logger.Info(
"converted batch",
zap.Float32("percentage", float32(100)),
)
if txn != nil {
if err := txn.Commit(); err != nil {
txn.Abort()
@ -723,6 +812,7 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
}
}
p.db.Set([]byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE}, []byte{0x02})
return nil
},
),
@ -735,6 +825,10 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
return hg, nil
}
func (p *PebbleHypergraphStore) MarkHypergraphAsComplete() {
p.db.Set([]byte{HYPERGRAPH_SHARD, HYPERGRAPH_COMPLETE}, []byte{0x02})
}
func (p *PebbleHypergraphStore) SaveHypergraph(
hg *application.Hypergraph,
) error {
@ -1023,13 +1117,12 @@ func (p *PebbleHypergraphStore) GetNodeByPath(
func generateSlices(fullpref, pref []int) [][]int {
result := [][]int{}
if len(pref) >= len(fullpref) {
return [][]int{fullpref}
if len(pref) > len(fullpref) {
panic("invalid prefix length")
}
for i := 0; i <= len(pref); i++ {
newLen := len(fullpref) - i
newSlice := make([]int, newLen)
copy(newSlice, fullpref[:newLen])
for i := len(pref); i <= len(fullpref); i++ {
newSlice := make([]int, i)
copy(newSlice, fullpref[:i])
result = append(result, newSlice)
}
@ -1077,11 +1170,11 @@ func (p *PebbleHypergraphStore) InsertNode(
nodeKey := keyFn(shardKey, key)
switch n := node.(type) {
case *crypto.LazyVectorCommitmentBranchNode:
length := uint32(len(path))
length := uint32(len(n.FullPrefix))
pathBytes := []byte{}
pathBytes = binary.BigEndian.AppendUint32(pathBytes, length)
for i := range int(length) {
pathBytes = binary.BigEndian.AppendUint32(pathBytes, uint32(path[i]))
pathBytes = binary.BigEndian.AppendUint32(pathBytes, uint32(n.FullPrefix[i]))
}
err := crypto.SerializeBranchNode(&b, n, false)
if err != nil {
@ -1169,7 +1262,7 @@ func (p *PebbleHypergraphStore) SaveRoot(
if err != nil {
return errors.Wrap(err, "insert node")
}
data := append([]byte{crypto.TypeBranch}, b.Bytes()...)
data := append([]byte{crypto.TypeLeaf}, b.Bytes()...)
err = p.db.Set(nodeKey, data)
return errors.Wrap(err, "insert node")
}

View File

@ -305,6 +305,10 @@ func NewInMemKVDB() *InMemKVDB {
}
}
func (d *InMemKVDB) GetRawStore() map[string][]byte {
return d.store
}
func (d *InMemKVDB) Get(key []byte) ([]byte, io.Closer, error) {
if !d.open {
return nil, nil, errors.New("inmem db closed")