switch to non-gob serializer

This commit is contained in:
Cassandra Heart 2025-03-01 07:09:43 -06:00
parent 7331618d3c
commit 5d6138fe0c
No known key found for this signature in database
GPG Key ID: 6352152859385958
5 changed files with 320 additions and 47 deletions

View File

@ -7,6 +7,7 @@ import (
"encoding/gob"
"errors"
"fmt"
"io"
"math/big"
"sync"
@ -20,9 +21,12 @@ func init() {
}
// Branch geometry plus the one-byte markers that tag each node in the
// binary tree encoding.
const (
BranchNodes = 64
BranchBits = 6 // log2(64)
BranchMask = BranchNodes - 1
// BranchNodes is the number of child slots serialized for every branch node.
BranchNodes = 64
BranchBits = 6 // log2(64)
BranchMask = BranchNodes - 1
// TypeNil is written in place of an absent (nil) node.
TypeNil byte = 0
// TypeLeaf precedes the encoding of a leaf node.
TypeLeaf byte = 1
// TypeBranch precedes the encoding of a branch node.
TypeBranch byte = 2
)
type VectorCommitmentNode interface {
@ -628,3 +632,275 @@ func DebugNode(node VectorCommitmentNode, depth int, prefix string) {
}
}
}
// SerializeTree encodes the tree, starting at tree.Root, into the binary node
// format and returns the resulting bytes. Any failure from the node encoder is
// wrapped and returned with a nil slice.
func SerializeTree(tree *VectorCommitmentTree) ([]byte, error) {
	out := new(bytes.Buffer)

	err := serializeNode(out, tree.Root)
	if err != nil {
		return nil, fmt.Errorf("failed to serialize tree: %w", err)
	}

	return out.Bytes(), nil
}
// DeserializeTree decodes a tree from the binary node format. The first node
// read from data becomes the root of the returned tree; any decoding failure
// is wrapped and returned with a nil tree.
func DeserializeTree(data []byte) (*VectorCommitmentTree, error) {
	root, err := deserializeNode(bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("failed to deserialize tree: %w", err)
	}

	tree := &VectorCommitmentTree{Root: root}
	return tree, nil
}
// serializeNode writes one node to w: a single type-marker byte followed by
// the node body.
//
// A nil node is encoded as the TypeNil marker alone. A nil pointer stored
// inside the interface (a "typed nil") is treated the same way; without that
// check it would pass the interface nil test and be dereferenced by the
// leaf/branch serializer. Node types other than leaf and branch yield an
// error.
func serializeNode(w io.Writer, node VectorCommitmentNode) error {
	switch n := node.(type) {
	case nil:
		return binary.Write(w, binary.BigEndian, TypeNil)
	case *VectorCommitmentLeafNode:
		if n == nil {
			// Typed nil: the interface is non-nil but there is no leaf.
			return binary.Write(w, binary.BigEndian, TypeNil)
		}
		if err := binary.Write(w, binary.BigEndian, TypeLeaf); err != nil {
			return err
		}
		return serializeLeafNode(w, n)
	case *VectorCommitmentBranchNode:
		if n == nil {
			// Typed nil: the interface is non-nil but there is no branch.
			return binary.Write(w, binary.BigEndian, TypeNil)
		}
		if err := binary.Write(w, binary.BigEndian, TypeBranch); err != nil {
			return err
		}
		return serializeBranchNode(w, n)
	default:
		return fmt.Errorf("unknown node type: %T", node)
	}
}
// serializeLeafNode writes the body of a leaf node to w. The four byte fields
// are emitted in the order Key, Value, HashTarget, Commitment, followed by
// Size; the first write error aborts the encoding.
func serializeLeafNode(w io.Writer, node *VectorCommitmentLeafNode) error {
	fields := [...][]byte{
		node.Key,
		node.Value,
		node.HashTarget,
		node.Commitment,
	}
	for _, field := range fields {
		if err := serializeBytes(w, field); err != nil {
			return err
		}
	}

	return serializeBigInt(w, node.Size)
}
// serializeBranchNode writes the body of a branch node to w in this order:
// Prefix, each of the BranchNodes children (recursively, by index),
// Commitment, Size, LeafCount as a big-endian int64 and LongestBranch as a
// big-endian int32. The first write error aborts the encoding.
func serializeBranchNode(w io.Writer, node *VectorCommitmentBranchNode) error {
	err := serializeIntSlice(w, node.Prefix)
	if err != nil {
		return err
	}

	for slot := 0; slot < BranchNodes; slot++ {
		if err = serializeNode(w, node.Children[slot]); err != nil {
			return err
		}
	}

	if err = serializeBytes(w, node.Commitment); err != nil {
		return err
	}
	if err = serializeBigInt(w, node.Size); err != nil {
		return err
	}

	leafCount := int64(node.LeafCount)
	if err = binary.Write(w, binary.BigEndian, leafCount); err != nil {
		return err
	}

	longest := int32(node.LongestBranch)
	return binary.Write(w, binary.BigEndian, longest)
}
// deserializeNode reads one node from r: a type-marker byte followed by the
// matching node body.
//
// It returns (nil, nil) for the TypeNil marker and an error for an unknown
// marker. On any failure the returned node is a true nil interface; the
// concrete helpers return nil pointers on error, and handing those back
// directly would produce a non-nil interface wrapping a nil pointer.
func deserializeNode(r io.Reader) (VectorCommitmentNode, error) {
	var nodeType byte
	if err := binary.Read(r, binary.BigEndian, &nodeType); err != nil {
		return nil, err
	}

	switch nodeType {
	case TypeNil:
		return nil, nil
	case TypeLeaf:
		leaf, err := deserializeLeafNode(r)
		if err != nil {
			return nil, err
		}
		return leaf, nil
	case TypeBranch:
		branch, err := deserializeBranchNode(r)
		if err != nil {
			return nil, err
		}
		return branch, nil
	default:
		return nil, fmt.Errorf("unknown node type marker: %d", nodeType)
	}
}
// deserializeLeafNode reads the body of a leaf node from r, in the order
// Key, Value, HashTarget, Commitment, Size. It returns a nil node together
// with the first read error encountered.
func deserializeLeafNode(r io.Reader) (*VectorCommitmentLeafNode, error) {
	var (
		leaf VectorCommitmentLeafNode
		err  error
	)

	if leaf.Key, err = deserializeBytes(r); err != nil {
		return nil, err
	}
	if leaf.Value, err = deserializeBytes(r); err != nil {
		return nil, err
	}
	if leaf.HashTarget, err = deserializeBytes(r); err != nil {
		return nil, err
	}
	if leaf.Commitment, err = deserializeBytes(r); err != nil {
		return nil, err
	}
	if leaf.Size, err = deserializeBigInt(r); err != nil {
		return nil, err
	}

	return &leaf, nil
}
// deserializeBranchNode reads the body of a branch node from r in this order:
// Prefix, BranchNodes children (recursively, by index), Commitment, Size,
// LeafCount (big-endian int64) and LongestBranch (big-endian int32). It
// returns a nil node together with the first read error encountered.
func deserializeBranchNode(r io.Reader) (*VectorCommitmentBranchNode, error) {
	var (
		branch VectorCommitmentBranchNode
		err    error
	)

	if branch.Prefix, err = deserializeIntSlice(r); err != nil {
		return nil, err
	}

	// Children starts out zero-valued, so every slot is nil until filled here.
	for slot := 0; slot < BranchNodes; slot++ {
		child, childErr := deserializeNode(r)
		if childErr != nil {
			return nil, childErr
		}
		branch.Children[slot] = child
	}

	if branch.Commitment, err = deserializeBytes(r); err != nil {
		return nil, err
	}
	if branch.Size, err = deserializeBigInt(r); err != nil {
		return nil, err
	}

	var leafCount int64
	if err = binary.Read(r, binary.BigEndian, &leafCount); err != nil {
		return nil, err
	}
	var longest int32
	if err = binary.Read(r, binary.BigEndian, &longest); err != nil {
		return nil, err
	}
	branch.LeafCount = int(leafCount)
	branch.LongestBranch = int(longest)

	return &branch, nil
}
// serializeBytes writes data to w as a length-prefixed field: an 8-byte
// big-endian length followed by the raw bytes. Nothing follows the length
// when data is empty or nil.
func serializeBytes(w io.Writer, data []byte) error {
	var header [8]byte
	binary.BigEndian.PutUint64(header[:], uint64(len(data)))
	if _, err := w.Write(header[:]); err != nil {
		return err
	}

	if len(data) == 0 {
		return nil
	}
	_, err := w.Write(data)
	return err
}
// deserializeBytes reads a length-prefixed byte field from r: an 8-byte
// big-endian length followed by that many raw bytes. A zero length yields an
// empty, non-nil slice.
//
// The length comes straight from the input, so it is validated before any
// allocation: a value that does not fit in an int is rejected, and when r is
// a *bytes.Reader (as it is when called via DeserializeTree) a length larger
// than the unread remainder is rejected with an error wrapping
// io.ErrUnexpectedEOF. This turns corrupt input into an error instead of a
// makeslice panic or a huge allocation.
func deserializeBytes(r io.Reader) ([]byte, error) {
	var length uint64
	if err := binary.Read(r, binary.BigEndian, &length); err != nil {
		return nil, err
	}
	if length == 0 {
		return []byte{}, nil
	}

	// Largest value representable by int on this platform.
	const maxInt = uint64(^uint(0) >> 1)
	if length > maxInt {
		return nil, fmt.Errorf("byte slice length %d overflows int", length)
	}
	if br, ok := r.(*bytes.Reader); ok && length > uint64(br.Len()) {
		return nil, fmt.Errorf(
			"byte slice length %d exceeds remaining %d bytes: %w",
			length, br.Len(), io.ErrUnexpectedEOF,
		)
	}

	data := make([]byte, length)
	if _, err := io.ReadFull(r, data); err != nil {
		return nil, err
	}
	return data, nil
}
// serializeIntSlice writes ints to w as a 4-byte big-endian element count
// followed by each element converted to int32 and written big-endian.
func serializeIntSlice(w io.Writer, ints []int) error {
	var word [4]byte

	binary.BigEndian.PutUint32(word[:], uint32(len(ints)))
	if _, err := w.Write(word[:]); err != nil {
		return err
	}

	for i := range ints {
		// Same narrowing as writing int32(v): only the low 32 bits are kept.
		binary.BigEndian.PutUint32(word[:], uint32(int32(ints[i])))
		if _, err := w.Write(word[:]); err != nil {
			return err
		}
	}
	return nil
}
// deserializeIntSlice reads an int slice from r: a 4-byte big-endian element
// count followed by that many big-endian int32 values. A zero count yields an
// empty, non-nil slice.
//
// The count comes straight from the input. When r is a *bytes.Reader (as it
// is when called via DeserializeTree) a count that needs more bytes than
// remain unread is rejected with an error wrapping io.ErrUnexpectedEOF before
// the slice is allocated, so corrupt input cannot force a multi-gigabyte
// allocation.
func deserializeIntSlice(r io.Reader) ([]int, error) {
	var length uint32
	if err := binary.Read(r, binary.BigEndian, &length); err != nil {
		return nil, err
	}

	// Each element occupies 4 bytes on the wire.
	if br, ok := r.(*bytes.Reader); ok && uint64(length)*4 > uint64(br.Len()) {
		return nil, fmt.Errorf(
			"int slice length %d exceeds remaining %d bytes: %w",
			length, br.Len(), io.ErrUnexpectedEOF,
		)
	}

	ints := make([]int, length)
	for i := range ints {
		var v int32
		if err := binary.Read(r, binary.BigEndian, &v); err != nil {
			return nil, err
		}
		ints[i] = int(v)
	}
	return ints, nil
}
func serializeBigInt(w io.Writer, n *big.Int) error {
if n == nil {
return binary.Write(w, binary.BigEndian, uint32(0))
}
bytes := n.Bytes()
return serializeBytes(w, bytes)
}
// deserializeBigInt reads a length-prefixed byte field from r and interprets
// it as a big-endian unsigned integer. An empty field decodes to zero.
func deserializeBigInt(r io.Reader) (*big.Int, error) {
	magnitude, err := deserializeBytes(r)
	if err != nil {
		return nil, err
	}

	// SetBytes on an empty slice leaves the value at zero, so the empty
	// field needs no separate branch.
	return new(big.Int).SetBytes(magnitude), nil
}

View File

@ -347,19 +347,10 @@ func NewIdSet(atomType AtomType) *IdSet {
}
// FromBytes replaces the set's tree with the one decoded from treeData and
// records an atom in set.atoms for every leaf of the decoded tree, keyed by
// the leaf key converted to a [64]byte.
//
// The decode error is checked before the tree is touched: on failure the
// existing set.tree is left in place and the wrapped error is returned,
// rather than dereferencing the nil tree that a failed decode produces.
//
// NOTE(review): the [64]byte conversion panics if a leaf key is shorter than
// 64 bytes — confirm decoded keys are always at least that long.
func (set *IdSet) FromBytes(treeData []byte) error {
	tree, err := crypto.DeserializeTree(treeData)
	if err != nil {
		return errors.Wrap(err, "from bytes")
	}
	set.tree = tree

	for _, leaf := range crypto.GetAllLeaves(set.tree.Root) {
		set.atoms[[64]byte(leaf.Key)] = AtomFromBytes(leaf.Value)
	}
	return nil
}
func (set *IdSet) IsDirty() bool {
@ -367,13 +358,7 @@ func (set *IdSet) IsDirty() bool {
}
// ToBytes returns the set's tree encoded with crypto.SerializeTree. An
// encoding failure is wrapped with "to bytes", mirroring how FromBytes wraps
// decode failures, and is returned with a nil slice.
func (set *IdSet) ToBytes() ([]byte, error) {
	data, err := crypto.SerializeTree(set.tree)
	if err != nil {
		return nil, errors.Wrap(err, "to bytes")
	}
	return data, nil
}
func (set *IdSet) Add(atom Atom) error {

View File

@ -470,19 +470,21 @@ func walk(
return errors.Wrap(err, "walk")
}
if rtrav == nil {
logger.Info("traversal could not reach path, sending leaf data")
sendLeafData(
stream,
hypergraphStore,
localTree,
path,
metadataOnly,
)
return nil
}
break
}
}
if rtrav == nil {
logger.Info("traversal could not reach path, sending leaf data")
sendLeafData(
stream,
hypergraphStore,
localTree,
path,
metadataOnly,
)
return nil
}
}
logger.Info("traversal completed, performing walk", pathString)
return walk(
@ -613,7 +615,15 @@ func walk(
nextPath,
)
if err != nil {
return errors.Wrap(err, "walk")
logger.Info("incomplete branch descension, sending leaves")
sendLeafData(
stream,
hypergraphStore,
localTree,
nextPath,
metadataOnly,
)
return nil
}
if err = walk(

View File

@ -108,7 +108,7 @@ func TestHypergraphSyncServer(t *testing.T) {
}
txn, _ := serverHypergraphStore.NewTransaction(false)
for _, op := range operations1[:250] {
for _, op := range operations1[:numOperations/2] {
switch op.Type {
case "AddVertex":
id := op.Vertex.GetID()
@ -137,7 +137,7 @@ func TestHypergraphSyncServer(t *testing.T) {
}
txn, _ = clientHypergraphStore.NewTransaction(false)
for _, op := range operations1[250:] {
for _, op := range operations1[numOperations/2:] {
switch op.Type {
case "AddVertex":
id := op.Vertex.GetID()
@ -219,7 +219,7 @@ func TestHypergraphSyncServer(t *testing.T) {
grpcServer := grpc.NewServer()
protobufs.RegisterHypergraphComparisonServiceServer(
grpcServer,
rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), 10000),
rpc.NewHypergraphComparisonServer(logger, serverHypergraphStore, crdts[0], rpc.NewSyncController(), numOperations),
)
log.Println("Server listening on :50051")
go func() {
@ -239,7 +239,7 @@ func TestHypergraphSyncServer(t *testing.T) {
syncController := rpc.NewSyncController()
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false)
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false)
if err != nil {
log.Fatalf("Client: failed to sync 1: %v", err)
}
@ -258,7 +258,7 @@ func TestHypergraphSyncServer(t *testing.T) {
log.Fatalf("Client: failed to stream: %v", err)
}
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, 10000, false)
err = rpc.SyncTreeBidirectionally(str, logger, append(append([]byte{}, shardKey.L1[:]...), shardKey.L2[:]...), protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, clientHypergraphStore, crdts[1].GetVertexAdds()[shardKey], syncController, numOperations, false)
if err != nil {
log.Fatalf("Client: failed to sync 2: %v", err)
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"encoding/gob"
"encoding/hex"
"fmt"
"io/fs"
"os"
"path"
@ -238,7 +237,12 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
return nil
}
shardSet, err := hex.DecodeString(d.Name())
if len(strings.Split(d.Name(), ".")) != 2 ||
strings.Split(d.Name(), ".")[1] != "vct" {
return nil
}
shardSet, err := hex.DecodeString(strings.Split(d.Name(), ".")[0])
if err != nil {
return err
}
@ -300,8 +304,6 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
return errors.Wrap(err, "save hypergraph")
}
fmt.Println(len(data))
err = os.WriteFile(
path.Join(
hypergraphDir,
@ -321,7 +323,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
),
path.Join(
hypergraphDir,
hex.EncodeToString(hypergraphVertexAddsKey(shardKey)),
hex.EncodeToString(hypergraphVertexAddsKey(shardKey))+".vct",
),
); err != nil {
return errors.Wrap(err, "save hypergraph")
@ -355,7 +357,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
),
path.Join(
hypergraphDir,
hex.EncodeToString(hypergraphVertexRemovesKey(shardKey)),
hex.EncodeToString(hypergraphVertexRemovesKey(shardKey))+".vct",
),
); err != nil {
return errors.Wrap(err, "save hypergraph")
@ -389,7 +391,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
),
path.Join(
hypergraphDir,
hex.EncodeToString(hypergraphHyperedgeAddsKey(shardKey)),
hex.EncodeToString(hypergraphHyperedgeAddsKey(shardKey))+".vct",
),
); err != nil {
return errors.Wrap(err, "save hypergraph")
@ -423,7 +425,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
),
path.Join(
hypergraphDir,
hex.EncodeToString(hypergraphHyperedgeRemovesKey(shardKey)),
hex.EncodeToString(hypergraphHyperedgeRemovesKey(shardKey))+".vct",
),
); err != nil {
return errors.Wrap(err, "save hypergraph")