From 5d6138fe0cd16a2b7cdd85ddf432418440cb4bd6 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Sat, 1 Mar 2025 07:09:43 -0600 Subject: [PATCH] switch to non-gob serializer --- node/crypto/proof_tree.go | 282 +++++++++++++++++++- node/hypergraph/application/hypergraph.go | 23 +- node/rpc/hypergraph_sync_rpc_server.go | 34 ++- node/rpc/hypergraph_sync_rpc_server_test.go | 10 +- node/store/hypergraph.go | 18 +- 5 files changed, 320 insertions(+), 47 deletions(-) diff --git a/node/crypto/proof_tree.go b/node/crypto/proof_tree.go index 7140f96..a8d48d2 100644 --- a/node/crypto/proof_tree.go +++ b/node/crypto/proof_tree.go @@ -7,6 +7,7 @@ import ( "encoding/gob" "errors" "fmt" + "io" "math/big" "sync" @@ -20,9 +21,12 @@ func init() { } const ( - BranchNodes = 64 - BranchBits = 6 // log2(64) - BranchMask = BranchNodes - 1 + BranchNodes = 64 + BranchBits = 6 // log2(64) + BranchMask = BranchNodes - 1 + TypeNil byte = 0 + TypeLeaf byte = 1 + TypeBranch byte = 2 ) type VectorCommitmentNode interface { @@ -628,3 +632,275 @@ func DebugNode(node VectorCommitmentNode, depth int, prefix string) { } } } + +func SerializeTree(tree *VectorCommitmentTree) ([]byte, error) { + var buf bytes.Buffer + if err := serializeNode(&buf, tree.Root); err != nil { + return nil, fmt.Errorf("failed to serialize tree: %w", err) + } + return buf.Bytes(), nil +} + +func DeserializeTree(data []byte) (*VectorCommitmentTree, error) { + buf := bytes.NewReader(data) + node, err := deserializeNode(buf) + if err != nil { + return nil, fmt.Errorf("failed to deserialize tree: %w", err) + } + return &VectorCommitmentTree{Root: node}, nil +} + +func serializeNode(w io.Writer, node VectorCommitmentNode) error { + if node == nil { + if err := binary.Write(w, binary.BigEndian, TypeNil); err != nil { + return err + } + return nil + } + + switch n := node.(type) { + case *VectorCommitmentLeafNode: + if err := binary.Write(w, binary.BigEndian, TypeLeaf); err != nil { + return err + } + return serializeLeafNode(w, n) + case *VectorCommitmentBranchNode: + if err := binary.Write(w, binary.BigEndian, TypeBranch); err != nil { + return err + } + return serializeBranchNode(w, n) + default: + return fmt.Errorf("unknown node type: %T", node) + } +} + +func serializeLeafNode(w io.Writer, node *VectorCommitmentLeafNode) error { + if err := serializeBytes(w, node.Key); err != nil { + return err + } + + if err := serializeBytes(w, node.Value); err != nil { + return err + } + + if err := serializeBytes(w, node.HashTarget); err != nil { + return err + } + + if err := serializeBytes(w, node.Commitment); err != nil { + return err + } + + return serializeBigInt(w, node.Size) +} + +func serializeBranchNode(w io.Writer, node *VectorCommitmentBranchNode) error { + if err := serializeIntSlice(w, node.Prefix); err != nil { + return err + } + + for i := 0; i < BranchNodes; i++ { + child := node.Children[i] + if err := serializeNode(w, child); err != nil { + return err + } + } + + if err := serializeBytes(w, node.Commitment); err != nil { + return err + } + + if err := serializeBigInt(w, node.Size); err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, int64(node.LeafCount)); err != nil { + return err + } + + return binary.Write(w, binary.BigEndian, int32(node.LongestBranch)) +} + +func deserializeNode(r io.Reader) (VectorCommitmentNode, error) { + var nodeType byte + if err := binary.Read(r, binary.BigEndian, &nodeType); err != nil { + return nil, err + } + + switch nodeType { + case TypeNil: + return nil, nil + case TypeLeaf: + return deserializeLeafNode(r) + case TypeBranch: + return deserializeBranchNode(r) + default: + return nil, fmt.Errorf("unknown node type marker: %d", nodeType) + } +} + +func deserializeLeafNode(r io.Reader) (*VectorCommitmentLeafNode, error) { + node := &VectorCommitmentLeafNode{} + + key, err := deserializeBytes(r) + if err != nil { + return nil, err + } + node.Key = key + + value, err := deserializeBytes(r) + if err != nil { + return nil, err + } + node.Value = value + + hashTarget, err := deserializeBytes(r) + if err != nil { + return nil, err + } + node.HashTarget = hashTarget + + commitment, err := deserializeBytes(r) + if err != nil { + return nil, err + } + node.Commitment = commitment + + size, err := deserializeBigInt(r) + if err != nil { + return nil, err + } + node.Size = size + + return node, nil +} + +func deserializeBranchNode(r io.Reader) (*VectorCommitmentBranchNode, error) { + node := &VectorCommitmentBranchNode{} + + prefix, err := deserializeIntSlice(r) + if err != nil { + return nil, err + } + node.Prefix = prefix + + node.Children = [BranchNodes]VectorCommitmentNode{} + for i := 0; i < BranchNodes; i++ { + child, err := deserializeNode(r) + if err != nil { + return nil, err + } + node.Children[i] = child + } + + commitment, err := deserializeBytes(r) + if err != nil { + return nil, err + } + node.Commitment = commitment + + size, err := deserializeBigInt(r) + if err != nil { + return nil, err + } + node.Size = size + + var leafCount int64 + if err := binary.Read(r, binary.BigEndian, &leafCount); err != nil { + return nil, err + } + node.LeafCount = int(leafCount) + + var longestBranch int32 + if err := binary.Read(r, binary.BigEndian, &longestBranch); err != nil { + return nil, err + } + node.LongestBranch = int(longestBranch) + + return node, nil +} + +func serializeBytes(w io.Writer, data []byte) error { + length := uint64(len(data)) + if err := binary.Write(w, binary.BigEndian, length); err != nil { + return err + } + + if length > 0 { + if _, err := w.Write(data); err != nil { + return err + } + } + return nil +} + +func deserializeBytes(r io.Reader) ([]byte, error) { + var length uint64 + if err := binary.Read(r, binary.BigEndian, &length); err != nil { + return nil, err + } + + if length > 0 { + data := make([]byte, length) + if _, err := io.ReadFull(r, data); err != nil { + return nil, err + } + return data, nil + } + return []byte{}, nil +} + +func serializeIntSlice(w io.Writer, ints []int) error { + length := uint32(len(ints)) + if err := binary.Write(w, binary.BigEndian, length); err != nil { + return err + } + + for _, v := range ints { + if err := binary.Write(w, binary.BigEndian, int32(v)); err != nil { + return err + } + } + return nil +} + +func deserializeIntSlice(r io.Reader) ([]int, error) { + var length uint32 + if err := binary.Read(r, binary.BigEndian, &length); err != nil { + return nil, err + } + + ints := make([]int, length) + for i := range ints { + var v int32 + if err := binary.Read(r, binary.BigEndian, &v); err != nil { + return nil, err + } + ints[i] = int(v) + } + return ints, nil +} + +func serializeBigInt(w io.Writer, n *big.Int) error { + if n == nil { + return binary.Write(w, binary.BigEndian, uint32(0)) + } + + bytes := n.Bytes() + + return serializeBytes(w, bytes) +} + +func deserializeBigInt(r io.Reader) (*big.Int, error) { + bytes, err := deserializeBytes(r) + if err != nil { + return nil, err + } + + if len(bytes) == 0 { + return new(big.Int), nil + } + + n := new(big.Int).SetBytes(bytes) + return n, nil +} diff --git a/node/hypergraph/application/hypergraph.go b/node/hypergraph/application/hypergraph.go index c65b39f..0a379f2 100644 --- a/node/hypergraph/application/hypergraph.go +++ b/node/hypergraph/application/hypergraph.go @@ -347,19 +347,10 @@ func NewIdSet(atomType AtomType) *IdSet { } func (set *IdSet) FromBytes(treeData []byte) error { - set.tree = &crypto.VectorCommitmentTree{} - var b bytes.Buffer - b.Write(treeData) - dec := gob.NewDecoder(&b) - if err := dec.Decode(set.tree); err != nil { - return errors.Wrap(err, "load set") - } + var err error + set.tree, err = crypto.DeserializeTree(treeData) - for _, leaf := range crypto.GetAllLeaves(set.tree.Root) { - set.atoms[[64]byte(leaf.Key)] = AtomFromBytes(leaf.Value) - } - - return nil + return errors.Wrap(err, "from bytes") } func (set *IdSet) IsDirty() bool { @@ -367,13 +358,7 @@ func (set *IdSet) IsDirty() bool { } func (set *IdSet) ToBytes() ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(set.tree); err != nil { - return nil, errors.Wrap(err, "to bytes") - } - - return buf.Bytes(), nil + return crypto.SerializeTree(set.tree) } func (set *IdSet) Add(atom Atom) error { diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index 2f5631f..73e00eb 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -470,19 +470,21 @@ func walk( return errors.Wrap(err, "walk") } - if rtrav == nil { - logger.Info("traversal could not reach path, sending leaf data") - sendLeafData( - stream, - hypergraphStore, - localTree, - path, - metadataOnly, - ) - return nil - } + break } } + + if rtrav == nil { + logger.Info("traversal could not reach path, sending leaf data") + sendLeafData( + stream, + hypergraphStore, + localTree, + path, + metadataOnly, + ) + return nil + } } logger.Info("traversal completed, performing walk", pathString) return walk( @@ -613,7 +615,15 @@ func walk( nextPath, ) if err != nil { - return errors.Wrap(err, "walk") + logger.Info("incomplete branch descension, sending leaves") + sendLeafData( + stream, + hypergraphStore, + localTree, + nextPath, + metadataOnly, + ) + return nil } if err = walk( diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index a5757a8..7cab45d 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -108,7 +108,7 @@ func TestHypergraphSyncServer(t *testing.T) { } txn, _ := serverHypergraphStore.NewTransaction(false) - for _, op := range operations1[:250] { + for _, op := range operations1[:numOperations/2] { switch op.Type { case "AddVertex": id := op.Vertex.GetID() @@ -137,7 +137,7 @@ func TestHypergraphSyncServer(t *testing.T) { } txn, _ = clientHypergraphStore.NewTransaction(false) - for _, op := range operations1[250:] { + for _, op := range operations1[numOperations/2:] { switch op.Type { case "AddVertex": id := op.Vertex.GetID() @@ -219,7 +219,7 @@ func TestHypergraphSyncServer(t *testing.T) { grpcServer := grpc.NewServer() protobufs.RegisterHypergraphComparisonServiceServer( grpcServer, - rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), 10000), + rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations), ) log.Println("Server listening on :50051") go func() { @@ -239,7 +239,7 @@ func TestHypergraphSyncServer(t *testing.T) { syncController := rpc.NewSyncController() - err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false) if err != nil { log.Fatalf("Client: failed to sync 1: %v", err) } @@ -258,7 +258,7 @@ func TestHypergraphSyncServer(t *testing.T) { log.Fatalf("Client: failed to stream: %v", err) } - err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false) + err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false) if err != nil { log.Fatalf("Client: failed to sync 2: %v", err) } diff --git a/node/store/hypergraph.go b/node/store/hypergraph.go index 5642b34..4a2e615 100644 --- a/node/store/hypergraph.go +++ b/node/store/hypergraph.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/gob" "encoding/hex" - "fmt" "io/fs" "os" "path" @@ -238,7 +237,12 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( return nil } - shardSet, err := hex.DecodeString(d.Name()) + if len(strings.Split(d.Name(), ".")) != 2 || + strings.Split(d.Name(), ".")[1] != "vct" { + return nil + } + + shardSet, err := hex.DecodeString(strings.Split(d.Name(), ".")[0]) if err != nil { return err } @@ -300,8 +304,6 @@ func (p *PebbleHypergraphStore) SaveHypergraph( return errors.Wrap(err, "save hypergraph") } - fmt.Println(len(data)) - err = os.WriteFile( path.Join( hypergraphDir, @@ -321,7 +323,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph( ), path.Join( hypergraphDir, - hex.EncodeToString(hypergraphVertexAddsKey(shardKey)), + hex.EncodeToString(hypergraphVertexAddsKey(shardKey))+".vct", ), ); err != nil { return errors.Wrap(err, "save hypergraph") @@ -355,7 +357,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph( ), path.Join( hypergraphDir, - hex.EncodeToString(hypergraphVertexRemovesKey(shardKey)), + hex.EncodeToString(hypergraphVertexRemovesKey(shardKey))+".vct", ), ); err != nil { return errors.Wrap(err, "save hypergraph") @@ -389,7 +391,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph( ), path.Join( hypergraphDir, - hex.EncodeToString(hypergraphHyperedgeAddsKey(shardKey)), + hex.EncodeToString(hypergraphHyperedgeAddsKey(shardKey))+".vct", ), ); err != nil { return errors.Wrap(err, "save hypergraph") @@ -423,7 +425,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph( ), path.Join( hypergraphDir, - hex.EncodeToString(hypergraphHyperedgeRemovesKey(shardKey)), + hex.EncodeToString(hypergraphHyperedgeRemovesKey(shardKey))+".vct", ), ); err != nil { return errors.Wrap(err, "save hypergraph")