ceremonyclient/node/rpc/hypergraph_sync_rpc_server_test.go
package rpc_test
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha512"
"encoding/binary"
"fmt"
"log"
"math/big"
"net"
"os"
"slices"
"sync"
"testing"
"time"
"github.com/cloudflare/circl/sign/ed448"
"github.com/iden3/go-iden3-crypto/poseidon"
pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
"source.quilibrium.com/quilibrium/monorepo/bls48581"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
internal_grpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tests"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
application "source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
crypto "source.quilibrium.com/quilibrium/monorepo/types/tries"
"source.quilibrium.com/quilibrium/monorepo/verenc"
)
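// serverStream wraps a grpc.ServerStream so the stream interceptor can swap in
// a context carrying a synthetic peer ID.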
type serverStream struct {
grpc.ServerStream
ctx context.Context
}
func (s *serverStream) Context() context.Context {
return s.ctx
}
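// Operation describes a single mutation applied to a hypergraph during test
// data generation.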
type Operation struct {
Type string // "AddVertex", "RemoveVertex", "AddHyperedge", "RemoveHyperedge"
Vertex application.Vertex
Hyperedge application.Hyperedge
}
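// TestHypergraphSyncServer builds server, client, and control hypergraphs,
// seeds the client with orphan vertices that only it holds, then syncs the
// client from the server over gRPC and verifies that the orphans are pruned
// and that the vertex-add trees converge to the control state.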
func TestHypergraphSyncServer(t *testing.T) {
numParties := 3
numOperations := 1000
log.Printf("Generating data")
enc := verenc.NewMPCitHVerifiableEncryptor(1)
pub, _, _ := ed448.GenerateKey(rand.Reader)
data1 := enc.Encrypt(make([]byte, 20), pub)
verenc1 := data1[0].Compress()
vertices1 := make([]application.Vertex, numOperations)
dataTree1 := &crypto.VectorCommitmentTree{}
logger, _ := zap.NewDevelopment()
inclusionProver := bls48581.NewKZGInclusionProver(logger)
for _, d := range []application.Encrypted{verenc1} {
dataBytes := d.ToBytes()
id := sha512.Sum512(dataBytes)
dataTree1.Insert(id[:], dataBytes, d.GetStatement(), big.NewInt(int64(len(data1)*55)))
}
dataTree1.Commit(inclusionProver, false)
for i := 0; i < numOperations; i++ {
b := make([]byte, 32)
rand.Read(b)
vertices1[i] = hgcrdt.NewVertex(
[32]byte{},
[32]byte(b),
dataTree1.Commit(inclusionProver, false),
dataTree1.GetSize(),
)
}
hyperedges := make([]application.Hyperedge, numOperations/10)
for i := 0; i < numOperations/10; i++ {
hyperedges[i] = hgcrdt.NewHyperedge(
[32]byte{},
[32]byte{0, 0, byte((i >> 8) / 256), byte(i / 256)},
)
for j := 0; j < 3; j++ {
n, _ := rand.Int(rand.Reader, big.NewInt(int64(len(vertices1))))
v := vertices1[n.Int64()]
hyperedges[i].AddExtrinsic(v)
}
}
shardKey := application.GetShardKey(vertices1[0])
operations1 := make([]Operation, numOperations)
operations2 := make([]Operation, numOperations)
for i := 0; i < numOperations; i++ {
operations1[i] = Operation{Type: "AddVertex", Vertex: vertices1[i]}
}
for i := 0; i < numOperations; i++ {
op, _ := rand.Int(rand.Reader, big.NewInt(2))
switch op.Int64() {
case 0:
e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
operations2[i] = Operation{Type: "AddHyperedge", Hyperedge: hyperedges[e.Int64()]}
case 1:
e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
operations2[i] = Operation{Type: "RemoveHyperedge", Hyperedge: hyperedges[e.Int64()]}
}
}
clientKvdb := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}, 0)
serverKvdb := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, 0)
controlKvdb := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestcontrol/store"}, 0)
clientHypergraphStore := store.NewPebbleHypergraphStore(
&config.DBConfig{Path: ".configtestclient/store"},
clientKvdb,
logger,
enc,
inclusionProver,
)
serverHypergraphStore := store.NewPebbleHypergraphStore(
&config.DBConfig{Path: ".configtestserver/store"},
serverKvdb,
logger,
enc,
inclusionProver,
)
controlHypergraphStore := store.NewPebbleHypergraphStore(
&config.DBConfig{Path: ".configtestcontrol/store"},
controlKvdb,
logger,
enc,
inclusionProver,
)
crdts := make([]application.Hypergraph, numParties)
crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
servertxn, _ := serverHypergraphStore.NewTransaction(false)
clienttxn, _ := clientHypergraphStore.NewTransaction(false)
controltxn, _ := controlHypergraphStore.NewTransaction(false)
for i, op := range operations1 {
switch op.Type {
case "AddVertex":
{
id := op.Vertex.GetID()
serverHypergraphStore.SaveVertexTree(servertxn, id[:], dataTree1)
crdts[0].AddVertex(servertxn, op.Vertex)
}
{
if i%3 == 0 {
id := op.Vertex.GetID()
clientHypergraphStore.SaveVertexTree(clienttxn, id[:], dataTree1)
crdts[1].AddVertex(clienttxn, op.Vertex)
}
}
case "RemoveVertex":
crdts[0].RemoveVertex(nil, op.Vertex)
// case "AddHyperedge":
// fmt.Printf("server add hyperedge %v\n", time.Now())
// crdts[0].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// fmt.Printf("server remove hyperedge %v\n", time.Now())
// crdts[0].RemoveHyperedge(nil, op.Hyperedge)
}
}
servertxn.Commit()
clienttxn.Commit()
// Seed many orphan vertices that only exist on the client so pruning can
// remove them. We create enough orphans with varied addresses to trigger
// tree restructuring (node merges) when they get deleted during sync.
// This tests the fix for the FullPrefix bug in lazy_proof_tree.go Delete().
numOrphans := 200
orphanVertices := make([]application.Vertex, numOrphans)
orphanIDs := make([][64]byte, numOrphans)
orphanTxn, err := clientHypergraphStore.NewTransaction(false)
require.NoError(t, err)
for i := 0; i < numOrphans; i++ {
orphanData := make([]byte, 32)
_, _ = rand.Read(orphanData)
// Mix in the index to ensure varied distribution across tree branches
binary.BigEndian.PutUint32(orphanData[28:], uint32(i))
var orphanAddr [32]byte
copy(orphanAddr[:], orphanData)
orphanVertices[i] = hgcrdt.NewVertex(
vertices1[0].GetAppAddress(),
orphanAddr,
dataTree1.Commit(inclusionProver, false),
dataTree1.GetSize(),
)
orphanShard := application.GetShardKey(orphanVertices[i])
require.Equal(t, shardKey, orphanShard, "orphan vertex %d must share shard", i)
orphanIDs[i] = orphanVertices[i].GetID()
require.NoError(t, clientHypergraphStore.SaveVertexTree(orphanTxn, orphanIDs[i][:], dataTree1))
require.NoError(t, crdts[1].AddVertex(orphanTxn, orphanVertices[i]))
}
require.NoError(t, orphanTxn.Commit())
clientSet := crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey)
for i := 0; i < numOrphans; i++ {
require.True(t, clientSet.Has(orphanIDs[i]), "client must start with orphan leaf %d", i)
}
logger.Info("saved")
for _, op := range operations1 {
switch op.Type {
case "AddVertex":
crdts[2].AddVertex(controltxn, op.Vertex)
case "RemoveVertex":
crdts[2].RemoveVertex(controltxn, op.Vertex)
// case "AddHyperedge":
// crdts[2].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// crdts[2].RemoveHyperedge(nil, op.Hyperedge)
}
}
for _, op := range operations2 {
switch op.Type {
case "AddVertex":
crdts[2].AddVertex(controltxn, op.Vertex)
case "RemoveVertex":
crdts[2].RemoveVertex(controltxn, op.Vertex)
// case "AddHyperedge":
// crdts[2].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// crdts[2].RemoveHyperedge(nil, op.Hyperedge)
}
}
controltxn.Commit()
logger.Info("run commit server")
crdts[0].Commit(0)
logger.Info("run commit client")
crdts[1].Commit(0)
// crdts[2].Commit()
// err := serverHypergraphStore.SaveHypergraph(crdts[0])
// assert.NoError(t, err)
// err = clientHypergraphStore.SaveHypergraph(crdts[1])
// assert.NoError(t, err)
logger.Info("mark as complete")
serverHypergraphStore.MarkHypergraphAsComplete()
clientHypergraphStore.MarkHypergraphAsComplete()
logger.Info("load server")
log.Printf("Generated data")
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Server: failed to listen: %v", err)
}
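// The stream interceptor stamps each incoming stream with a freshly generated
// peer ID so the sync handler sees a peer-identified context.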
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
if err != nil {
t.FailNow()
}
pub := privKey.GetPublic()
peerId, err := peer.IDFromPublicKey(pub)
if err != nil {
t.FailNow()
}
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(
ss.Context(),
peerId,
),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(
grpcServer,
crdts[0],
)
defer grpcServer.Stop()
log.Println("Server listening on :50051")
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Server: failed to serve: %v", err)
}
}()
conn, err := grpc.DialContext(context.TODO(), "localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
if err != nil {
log.Fatalf("Client: failed to listen: %v", err)
}
client := protobufs.NewHypergraphComparisonServiceClient(conn)
str, err := client.PerformSync(context.TODO())
if err != nil {
log.Fatalf("Client: failed to stream: %v", err)
}
_, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
if err != nil {
log.Fatalf("Client: failed to sync 1: %v", err)
}
str.CloseSend()
// Verify all orphan vertices were pruned after sync
for i := 0; i < numOrphans; i++ {
require.False(t, clientSet.Has(orphanIDs[i]), "orphan vertex %d should be pruned after sync", i)
}
leaves := crypto.CompareLeaves(
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
)
fmt.Println("pass completed, orphans:", len(leaves))
// Ensure every leaf received during raw sync lies within the covered prefix path.
clientTree := crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree()
coveredPrefixPath := clientTree.CoveredPrefix
if len(coveredPrefixPath) == 0 {
coveredPrefixPath = tries.GetFullPath(orphanIDs[0][:])[:0]
}
allLeaves := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientTree.Root,
)
for _, leaf := range allLeaves {
if leaf == nil {
continue
}
if len(coveredPrefixPath) > 0 {
require.True(
t,
isPrefix(coveredPrefixPath, tries.GetFullPath(leaf.Key)),
"raw sync leaf outside covered prefix",
)
}
}
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
str, err = client.PerformSync(context.TODO())
if err != nil {
log.Fatalf("Client: failed to stream: %v", err)
}
_, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
if err != nil {
log.Fatalf("Client: failed to sync 2: %v", err)
}
str.CloseSend()
if !bytes.Equal(
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
) {
leaves := crypto.CompareLeaves(
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
)
fmt.Println("remaining orphans", len(leaves))
log.Fatalf(
"trees mismatch: %v %v",
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
)
}
if !bytes.Equal(
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
crdts[2].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
) {
log.Fatalf(
"trees did not converge to correct state: %v %v",
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
crdts[2].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
)
}
}
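// TestHypergraphPartialSync restricts the client hypergraph to a single branch
// prefix (derived from one vertex's path) and verifies that syncing only pulls
// leaves under that covered prefix, leaving the client with a small subset of
// the server's vertex data.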
func TestHypergraphPartialSync(t *testing.T) {
numParties := 3
numOperations := 1000
log.Printf("Generating data")
enc := verenc.NewMPCitHVerifiableEncryptor(1)
pub, _, _ := ed448.GenerateKey(rand.Reader)
data1 := enc.Encrypt(make([]byte, 20), pub)
verenc1 := data1[0].Compress()
vertices1 := make([]application.Vertex, numOperations)
dataTree1 := &crypto.VectorCommitmentTree{}
logger, _ := zap.NewDevelopment()
inclusionProver := bls48581.NewKZGInclusionProver(logger)
domain := make([]byte, 32)
rand.Read(domain)
domainbi, _ := poseidon.HashBytes(domain)
domain = domainbi.FillBytes(make([]byte, 32))
for _, d := range []application.Encrypted{verenc1} {
dataBytes := d.ToBytes()
id := sha512.Sum512(dataBytes)
dataTree1.Insert(id[:], dataBytes, d.GetStatement(), big.NewInt(int64(len(data1)*55)))
}
dataTree1.Commit(inclusionProver, false)
for i := 0; i < numOperations; i++ {
b := make([]byte, 32)
rand.Read(b)
addr, _ := poseidon.HashBytes(b)
vertices1[i] = hgcrdt.NewVertex(
[32]byte(domain),
[32]byte(addr.FillBytes(make([]byte, 32))),
dataTree1.Commit(inclusionProver, false),
dataTree1.GetSize(),
)
}
hyperedges := make([]application.Hyperedge, numOperations/10)
for i := 0; i < numOperations/10; i++ {
hyperedges[i] = hgcrdt.NewHyperedge(
[32]byte(domain),
[32]byte{0, 0, byte((i >> 8) / 256), byte(i / 256)},
)
for j := 0; j < 3; j++ {
n, _ := rand.Int(rand.Reader, big.NewInt(int64(len(vertices1))))
v := vertices1[n.Int64()]
hyperedges[i].AddExtrinsic(v)
}
}
shardKey := application.GetShardKey(vertices1[0])
operations1 := make([]Operation, numOperations)
operations2 := make([]Operation, numOperations)
for i := 0; i < numOperations; i++ {
operations1[i] = Operation{Type: "AddVertex", Vertex: vertices1[i]}
}
for i := 0; i < numOperations; i++ {
op, _ := rand.Int(rand.Reader, big.NewInt(2))
switch op.Int64() {
case 0:
e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
operations2[i] = Operation{Type: "AddHyperedge", Hyperedge: hyperedges[e.Int64()]}
case 1:
e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
operations2[i] = Operation{Type: "RemoveHyperedge", Hyperedge: hyperedges[e.Int64()]}
}
}
clientKvdb := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}, 0)
serverKvdb := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, 0)
controlKvdb := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestcontrol/store"}, 0)
clientHypergraphStore := store.NewPebbleHypergraphStore(
&config.DBConfig{Path: ".configtestclient/store"},
clientKvdb,
logger,
enc,
inclusionProver,
)
serverHypergraphStore := store.NewPebbleHypergraphStore(
&config.DBConfig{Path: ".configtestserver/store"},
serverKvdb,
logger,
enc,
inclusionProver,
)
controlHypergraphStore := store.NewPebbleHypergraphStore(
&config.DBConfig{Path: ".configtestcontrol/store"},
controlKvdb,
logger,
enc,
inclusionProver,
)
crdts := make([]application.Hypergraph, numParties)
crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
servertxn, _ := serverHypergraphStore.NewTransaction(false)
controltxn, _ := controlHypergraphStore.NewTransaction(false)
branchfork := []int32{}
for i, op := range operations1 {
switch op.Type {
case "AddVertex":
{
id := op.Vertex.GetID()
serverHypergraphStore.SaveVertexTree(servertxn, id[:], dataTree1)
crdts[0].AddVertex(servertxn, op.Vertex)
}
{
if i == 500 {
id := op.Vertex.GetID()
// Take a prefix of this vertex's full path as the client's covered branch, covering roughly 1/64th of the shard.
branchfork = GetFullPath(id[:])[:44]
}
}
case "RemoveVertex":
crdts[0].RemoveVertex(nil, op.Vertex)
// case "AddHyperedge":
// fmt.Printf("server add hyperedge %v\n", time.Now())
// crdts[0].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// fmt.Printf("server remove hyperedge %v\n", time.Now())
// crdts[0].RemoveHyperedge(nil, op.Hyperedge)
}
}
servertxn.Commit()
crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, toIntSlice(toUint32Slice(branchfork)), &tests.Nopthenticator{}, 200)
logger.Info("saved")
for _, op := range operations1 {
switch op.Type {
case "AddVertex":
crdts[2].AddVertex(controltxn, op.Vertex)
case "RemoveVertex":
crdts[2].RemoveVertex(controltxn, op.Vertex)
// case "AddHyperedge":
// crdts[2].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// crdts[2].RemoveHyperedge(nil, op.Hyperedge)
}
}
for _, op := range operations2 {
switch op.Type {
case "AddVertex":
crdts[2].AddVertex(controltxn, op.Vertex)
case "RemoveVertex":
crdts[2].RemoveVertex(controltxn, op.Vertex)
// case "AddHyperedge":
// crdts[2].AddHyperedge(nil, op.Hyperedge)
// case "RemoveHyperedge":
// crdts[2].RemoveHyperedge(nil, op.Hyperedge)
}
}
controltxn.Commit()
logger.Info("run commit server")
crdts[0].Commit(1)
logger.Info("run commit client")
crdts[1].Commit(1)
// crdts[2].Commit()
// err := serverHypergraphStore.SaveHypergraph(crdts[0])
// assert.NoError(t, err)
// err = clientHypergraphStore.SaveHypergraph(crdts[1])
// assert.NoError(t, err)
logger.Info("load server")
log.Printf("Generated data")
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("Server: failed to listen: %v", err)
}
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
if err != nil {
t.FailNow()
}
pub := privKey.GetPublic()
peerId, err := peer.IDFromPublicKey(pub)
if err != nil {
t.FailNow()
}
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(
ss.Context(),
peerId,
),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(
grpcServer,
crdts[0],
)
defer grpcServer.Stop()
log.Println("Server listening on :50051")
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Server: failed to serve: %v", err)
}
}()
conn, err := grpc.DialContext(context.TODO(), "localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
if err != nil {
log.Fatalf("Client: failed to listen: %v", err)
}
client := protobufs.NewHypergraphComparisonServiceClient(conn)
str, err := client.PerformSync(context.TODO())
if err != nil {
log.Fatalf("Client: failed to stream: %v", err)
}
_, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
if err != nil {
log.Fatalf("Client: failed to sync 1: %v", err)
}
str.CloseSend()
leaves := crypto.CompareLeaves(
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
)
fmt.Println("pass completed, orphans:", len(leaves))
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
str, err = client.PerformSync(context.TODO())
if err != nil {
log.Fatalf("Client: failed to stream: %v", err)
}
_, err = crdts[1].(*hgcrdt.HypergraphCRDT).SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
if err != nil {
log.Fatalf("Client: failed to sync 2: %v", err)
}
str.CloseSend()
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false)
desc, err := crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().GetByPath(toIntSlice(toUint32Slice(branchfork)))
require.NoError(t, err)
if !bytes.Equal(
desc.(*crypto.LazyVectorCommitmentBranchNode).Commitment,
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree().Commit(false),
) {
leaves := crypto.CompareLeaves(
crdts[0].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
crdts[1].(*hgcrdt.HypergraphCRDT).GetVertexAddsSet(shardKey).GetTree(),
)
fmt.Println("remaining orphans", len(leaves))
}
clientHas := 0
iter, err := clientHypergraphStore.GetVertexDataIterator(shardKey)
if err != nil {
panic(err)
}
for iter.First(); iter.Valid(); iter.Next() {
clientHas++
}
// The distribution varies run to run; roughly triple the expected count is a
// safe upper bound. If this assertion fails spuriously, bump the limit.
assert.Greater(t, 40, clientHas, "client should only hold vertex data for its covered branch")
// assert.Greater(t, clientHas, 1, "mismatching vertex data entries")
}
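// TestHypergraphSyncWithConcurrentCommits runs several rounds in which eight
// clients sync from the server while the server concurrently applies new
// vertices and commits, then performs a final catch-up sync and checks that a
// client converges to the server's published root.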
func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
logDuration := func(step string, start time.Time) {
t.Logf("%s took %s", step, time.Since(start))
}
start := time.Now()
dataTrees := make([]*tries.VectorCommitmentTree, 1000)
eg := errgroup.Group{}
eg.SetLimit(1000)
for i := 0; i < 1000; i++ {
eg.Go(func() error {
dataTrees[i] = buildDataTree(t, inclusionProver)
return nil
})
}
eg.Wait()
logDuration("generated data trees", start)
setupStart := time.Now()
serverDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, 0)
defer serverDB.Close()
serverStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"},
serverDB,
logger,
enc,
inclusionProver,
)
logDuration("server DB/store initialization", setupStart)
const clientCount = 8
clientDBs := make([]*store.PebbleDB, clientCount)
clientStores := make([]*store.PebbleHypergraphStore, clientCount)
clientHGs := make([]*hgcrdt.HypergraphCRDT, clientCount)
serverHypergraphStart := time.Now()
serverHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "server")),
serverStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
logDuration("server hypergraph initialization", serverHypergraphStart)
clientSetupStart := time.Now()
for i := 0; i < clientCount; i++ {
clientDBs[i] = store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", i)}, 0)
clientStores[i] = store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", i)},
clientDBs[i],
logger,
enc,
inclusionProver,
)
clientHGs[i] = hgcrdt.NewHypergraph(
logger.With(zap.String("side", fmt.Sprintf("client-%d", i))),
clientStores[i],
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
}
logDuration("client hypergraph initialization", clientSetupStart)
defer func() {
for _, db := range clientDBs {
if db != nil {
db.Close()
}
}
}()
// Seed both hypergraphs with a baseline vertex so they share the shard key.
domain := randomBytes32(t)
initialVertex := hgcrdt.NewVertex(
domain,
randomBytes32(t),
dataTrees[0].Commit(inclusionProver, false),
dataTrees[0].GetSize(),
)
seedStart := time.Now()
addVertices(
t,
serverStore,
serverHG,
dataTrees[:1],
initialVertex,
)
logDuration("seed server baseline vertex", seedStart)
for i := 0; i < clientCount; i++ {
start := time.Now()
addVertices(
t,
clientStores[i],
clientHGs[i],
dataTrees[:1],
initialVertex,
)
logDuration(fmt.Sprintf("seed client-%d baseline vertex", i), start)
}
shardKey := application.GetShardKey(initialVertex)
// Start gRPC server backed by the server hypergraph.
const bufSize = 1 << 20
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(
grpcServer,
serverHG,
)
defer grpcServer.Stop()
go func() {
_ = grpcServer.Serve(lis)
}()
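// dialClient opens a fresh in-memory (bufconn) connection to the test server
// and returns both the connection and a comparison-service client.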
dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
require.NoError(t, err)
return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
}
// Publish initial snapshot so clients can sync during the rounds
initialRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
serverHG.PublishSnapshot(initialRoot)
const rounds = 3
for round := 0; round < rounds; round++ {
currentRound := round
roundStart := time.Now()
c, _ := serverHG.Commit(uint64(currentRound))
fmt.Printf("svr commitment: %x\n", c[shardKey][0])
genStart := time.Now()
updates := generateVertices(
t,
domain,
dataTrees,
inclusionProver,
15,
1+(15*currentRound),
)
logDuration(fmt.Sprintf("round %d vertex generation", currentRound), genStart)
var syncWG sync.WaitGroup
var serverWG sync.WaitGroup
syncWG.Add(clientCount)
serverWG.Add(1)
for clientIdx := 0; clientIdx < clientCount; clientIdx++ {
go func(idx int, round int) {
defer syncWG.Done()
clientSyncStart := time.Now()
clientHG := clientHGs[idx]
conn, client := dialClient()
streamCtx, cancelStream := context.WithTimeout(
context.Background(),
100*time.Second,
)
stream, err := client.PerformSync(streamCtx)
require.NoError(t, err)
_, _ = clientHG.SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, stream.CloseSend())
cancelStream()
conn.Close()
c, _ := clientHGs[idx].Commit(uint64(round))
fmt.Printf("cli commitment: %x\n", c[shardKey][0])
logDuration(fmt.Sprintf("round %d client-%d sync", round, idx), clientSyncStart)
}(clientIdx, currentRound)
}
go func(round int) {
defer serverWG.Done()
serverRoundStart := time.Now()
logger.Info("server applying concurrent updates", zap.Int("round", round))
addVertices(t, serverStore, serverHG, dataTrees[1+(15*round):1+(15*(round+1))], updates...)
logger.Info(
"server applied concurrent updates",
zap.Int("round", round),
zap.Duration("duration", time.Since(serverRoundStart)),
)
logger.Info("server commit starting", zap.Int("round", round))
_, err := serverHG.Commit(uint64(round + 1))
require.NoError(t, err)
logger.Info("server commit finished", zap.Int("round", round))
}(round)
syncWG.Wait()
serverWG.Wait()
logDuration(fmt.Sprintf("round %d total sync", currentRound), roundStart)
}
// Add additional server-only updates after the concurrent sync rounds.
extraStart := time.Now()
extraUpdates := generateVertices(t, domain, dataTrees, inclusionProver, len(dataTrees)-(1+(15*rounds))-1, 1+(15*rounds))
addVertices(t, serverStore, serverHG, dataTrees[1+(15*rounds):], extraUpdates...)
logDuration("server extra updates application", extraStart)
commitStart := time.Now()
_, err := serverHG.Commit(100)
require.NoError(t, err)
_, err = serverHG.Commit(101)
require.NoError(t, err)
logDuration("server final commits", commitStart)
wg := sync.WaitGroup{}
wg.Add(1)
serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
// Publish the server's snapshot so clients can sync against this exact state
serverHG.PublishSnapshot(serverRoot)
// Create a snapshot handle for this shard by doing a sync.
// This is needed because the snapshot manager only creates handles when acquire
// is called.
{
conn, client := dialClient()
stream, err := client.PerformSync(context.Background())
require.NoError(t, err)
_, _ = clientHGs[0].SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
_ = stream.CloseSend()
conn.Close()
}
for i := 0; i < 1; i++ {
go func(idx int) {
defer wg.Done()
catchUpStart := time.Now()
_, err = clientHGs[idx].Commit(100)
require.NoError(t, err)
// Final sync to catch up.
conn, client := dialClient()
streamCtx, cancelStream := context.WithTimeout(
context.Background(),
100*time.Second,
)
stream, err := client.PerformSync(streamCtx)
require.NoError(t, err)
_, err = clientHGs[idx].SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
cancelStream()
conn.Close()
_, err = clientHGs[idx].Commit(101)
require.NoError(t, err)
clientRoot := clientHGs[idx].GetVertexAddsSet(shardKey).GetTree().Commit(false)
assert.Equal(t, serverRoot, clientRoot, "client should converge to server state")
logDuration(fmt.Sprintf("client-%d final catch-up", idx), catchUpStart)
}(i)
}
wg.Wait()
}
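// buildDataTree fills a vector commitment tree with random 64-byte chunks and
// commits it, producing data payloads for test vertices.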
func buildDataTree(
t *testing.T,
prover *bls48581.KZGInclusionProver,
) *crypto.VectorCommitmentTree {
t.Helper()
tree := &crypto.VectorCommitmentTree{}
b := make([]byte, 20000)
rand.Read(b)
for chunk := range slices.Chunk(b, 64) {
id := sha512.Sum512(chunk)
tree.Insert(id[:], chunk, nil, big.NewInt(int64(len(chunk))))
}
tree.Commit(prover, false)
return tree
}
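// addVertices stores each vertex's data tree and adds the vertex to the
// hypergraph inside a single transaction.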
func addVertices(
t *testing.T,
hStore *store.PebbleHypergraphStore,
hg *hgcrdt.HypergraphCRDT,
dataTrees []*crypto.VectorCommitmentTree,
vertices ...application.Vertex,
) {
t.Helper()
txn, err := hStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range vertices {
id := v.GetID()
require.NoError(t, hStore.SaveVertexTree(txn, id[:], dataTrees[i]))
require.NoError(t, hg.AddVertex(txn, v))
}
require.NoError(t, txn.Commit())
}
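// generateVertices creates count vertices under appAddress, each committing to
// the data tree at startingIndex+i. The first eight bytes of each otherwise
// random address encode the loop index so addresses vary within a batch.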
func generateVertices(
t *testing.T,
appAddress [32]byte,
dataTrees []*crypto.VectorCommitmentTree,
prover *bls48581.KZGInclusionProver,
count int,
startingIndex int,
) []application.Vertex {
t.Helper()
verts := make([]application.Vertex, count)
for i := 0; i < count; i++ {
addr := randomBytes32(t)
binary.BigEndian.PutUint64(addr[:], uint64(i))
verts[i] = hgcrdt.NewVertex(
appAddress,
addr,
dataTrees[startingIndex+i].Commit(prover, false),
dataTrees[startingIndex+i].GetSize(),
)
}
return verts
}
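// randomBytes32 returns 32 cryptographically random bytes, failing the test on
// error.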
func randomBytes32(t *testing.T) [32]byte {
t.Helper()
var out [32]byte
_, err := rand.Read(out[:])
require.NoError(t, err)
return out
}
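// toUint32Slice converts a path of int32 nibbles to uint32 values.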
func toUint32Slice(s []int32) []uint32 {
o := []uint32{}
for _, p := range s {
o = append(o, uint32(p))
}
return o
}
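// toIntSlice converts a path of uint32 nibbles to ints, as expected by the
// hypergraph constructor and GetByPath.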
func toIntSlice(s []uint32) []int {
o := []int{}
for _, p := range s {
o = append(o, int(p))
}
return o
}
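// isPrefix reports whether prefix is an exact leading subsequence of path.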
func isPrefix(prefix []int, path []int) bool {
if len(prefix) > len(path) {
return false
}
for i := range prefix {
if prefix[i] != path[i] {
return false
}
}
return true
}
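// GetFullPath decomposes key into successive BranchBits-wide nibbles until the
// key is exhausted, mirroring the trie's own path derivation for test use.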
func GetFullPath(key []byte) []int32 {
var nibbles []int32
depth := 0
for {
n1 := getNextNibble(key, depth)
if n1 == -1 {
break
}
nibbles = append(nibbles, n1)
depth += tries.BranchBits
}
return nibbles
}
// getNextNibble returns the next BranchBits bits from the key starting at pos
func getNextNibble(key []byte, pos int) int32 {
startByte := pos / 8
if startByte >= len(key) {
return -1
}
// Calculate how many bits we need from the current byte
startBit := pos % 8
bitsFromCurrentByte := 8 - startBit
result := int(key[startByte] & ((1 << bitsFromCurrentByte) - 1))
if bitsFromCurrentByte >= tries.BranchBits {
// We have enough bits in the current byte
return int32((result >> (bitsFromCurrentByte - tries.BranchBits)) &
tries.BranchMask)
}
// We need bits from the next byte
result = result << (tries.BranchBits - bitsFromCurrentByte)
if startByte+1 < len(key) {
remainingBits := tries.BranchBits - bitsFromCurrentByte
nextByte := int(key[startByte+1])
result |= (nextByte >> (8 - remainingBits))
}
return int32(result & tries.BranchMask)
}
// TestHypergraphSyncWithExpectedRoot tests that clients can request sync
// against a specific snapshot generation by providing an expected root.
// The server should use a matching historical snapshot if available.
func TestHypergraphSyncWithExpectedRoot(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// Create data trees for vertices
dataTrees := make([]*tries.VectorCommitmentTree, 100)
for i := 0; i < 100; i++ {
dataTrees[i] = buildDataTree(t, inclusionProver)
}
serverDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, 0)
defer serverDB.Close()
serverStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"},
serverDB,
logger,
enc,
inclusionProver,
)
serverHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "server")),
serverStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create initial vertex to establish shard key
domain := randomBytes32(t)
initialVertex := hgcrdt.NewVertex(
domain,
randomBytes32(t),
dataTrees[0].Commit(inclusionProver, false),
dataTrees[0].GetSize(),
)
shardKey := application.GetShardKey(initialVertex)
// Phase 1: Add initial vertices to server and commit
phase1Vertices := make([]application.Vertex, 20)
phase1Vertices[0] = initialVertex
for i := 1; i < 20; i++ {
phase1Vertices[i] = hgcrdt.NewVertex(
domain,
randomBytes32(t),
dataTrees[i].Commit(inclusionProver, false),
dataTrees[i].GetSize(),
)
}
addVertices(t, serverStore, serverHG, dataTrees[:20], phase1Vertices...)
// Commit to get root1
commitResult1, err := serverHG.Commit(1)
require.NoError(t, err)
root1 := commitResult1[shardKey][0]
t.Logf("Root after phase 1: %x", root1)
// Publish root1 as the current snapshot generation
serverHG.PublishSnapshot(root1)
// Start gRPC server early so we can create a snapshot while root1 is current
const bufSize = 1 << 20
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
defer grpcServer.Stop()
go func() {
_ = grpcServer.Serve(lis)
}()
dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
require.NoError(t, err)
return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
}
// Helper to create a fresh client hypergraph
clientCounter := 0
createClient := func(name string) (*store.PebbleDB, *hgcrdt.HypergraphCRDT) {
clientCounter++
clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", clientCounter)}, 0)
clientStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", clientCounter)},
clientDB,
logger,
enc,
inclusionProver,
)
clientHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", name)),
clientStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
return clientDB, clientHG
}
// IMPORTANT: Create a snapshot while root1 is current by doing a sync now.
// This snapshot will be preserved when we later publish root2.
t.Log("Creating snapshot for root1 by syncing a client while root1 is current")
{
clientDB, clientHG := createClient("client-snapshot-root1")
conn, client := dialClient()
stream, err := client.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG.SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
// Verify this client got root1
clientCommit, err := clientHG.Commit(1)
require.NoError(t, err)
require.Equal(t, root1, clientCommit[shardKey][0], "snapshot client should have root1")
conn.Close()
clientDB.Close()
}
// Phase 2: Add more vertices to server and commit
phase2Vertices := make([]application.Vertex, 30)
for i := 0; i < 30; i++ {
phase2Vertices[i] = hgcrdt.NewVertex(
domain,
randomBytes32(t),
dataTrees[20+i].Commit(inclusionProver, false),
dataTrees[20+i].GetSize(),
)
}
addVertices(t, serverStore, serverHG, dataTrees[20:50], phase2Vertices...)
// Commit to get root2
commitResult2, err := serverHG.Commit(2)
require.NoError(t, err)
root2 := commitResult2[shardKey][0]
t.Logf("Root after phase 2: %x", root2)
// Publish root2 as the new current snapshot generation
// This preserves the root1 generation (with its snapshot) as a historical generation
serverHG.PublishSnapshot(root2)
// Verify roots are different
require.NotEqual(t, root1, root2, "roots should be different after adding more data")
// Test 1: Sync gets latest state
t.Run("sync gets latest", func(t *testing.T) {
clientDB, clientHG := createClient("client1")
defer clientDB.Close()
conn, client := dialClient()
defer conn.Close()
stream, err := client.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG.SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
// Commit client to get comparable root
clientCommit, err := clientHG.Commit(1)
require.NoError(t, err)
clientRoot := clientCommit[shardKey][0]
// Client should have synced to the latest (root2)
assert.Equal(t, root2, clientRoot, "client should sync to latest root")
})
// Test 2: Multiple syncs converge to same state
t.Run("multiple syncs converge", func(t *testing.T) {
clientDB, clientHG := createClient("client2")
defer clientDB.Close()
conn, client := dialClient()
defer conn.Close()
stream, err := client.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG.SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
// Commit client to get comparable root
clientCommit, err := clientHG.Commit(1)
require.NoError(t, err)
clientRoot := clientCommit[shardKey][0]
// Client should have synced to the latest (root2)
assert.Equal(t, root2, clientRoot, "client should sync to latest root")
})
}
// TestHypergraphSyncWithModifiedEntries tests sync behavior when both client
// and server have the same keys but with different values (modified entries).
// This verifies that sync correctly updates entries rather than just adding
// new ones or deleting orphans.
func TestHypergraphSyncWithModifiedEntries(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// Create enough data trees for all vertices we'll need
numVertices := 50
dataTrees := make([]*tries.VectorCommitmentTree, numVertices*2) // Extra for modified versions
for i := 0; i < len(dataTrees); i++ {
dataTrees[i] = buildDataTree(t, inclusionProver)
}
// Create server and client databases
serverDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, 0)
defer serverDB.Close()
clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}, 0)
defer clientDB.Close()
serverStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"},
serverDB,
logger,
enc,
inclusionProver,
)
clientStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"},
clientDB,
logger,
enc,
inclusionProver,
)
serverHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "server")),
serverStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
clientHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "client")),
clientStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create a shared domain for all vertices
domain := randomBytes32(t)
// Generate fixed addresses that will be used by both client and server
// This ensures they share the same keys
addresses := make([][32]byte, numVertices)
for i := 0; i < numVertices; i++ {
addresses[i] = randomBytes32(t)
}
// Create "original" vertices for the client (using first set of data trees)
clientVertices := make([]application.Vertex, numVertices)
for i := 0; i < numVertices; i++ {
clientVertices[i] = hgcrdt.NewVertex(
domain,
addresses[i], // Same address
dataTrees[i].Commit(inclusionProver, false),
dataTrees[i].GetSize(),
)
}
// Create "modified" vertices for the server (using second set of data trees)
// These have the SAME addresses but DIFFERENT data commitments
serverVertices := make([]application.Vertex, numVertices)
for i := 0; i < numVertices; i++ {
serverVertices[i] = hgcrdt.NewVertex(
domain,
addresses[i], // Same address as client
dataTrees[numVertices+i].Commit(inclusionProver, false), // Different data
dataTrees[numVertices+i].GetSize(),
)
}
shardKey := application.GetShardKey(clientVertices[0])
// Add original vertices to client
t.Log("Adding original vertices to client")
clientTxn, err := clientStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range clientVertices {
id := v.GetID()
require.NoError(t, clientStore.SaveVertexTree(clientTxn, id[:], dataTrees[i]))
require.NoError(t, clientHG.AddVertex(clientTxn, v))
}
require.NoError(t, clientTxn.Commit())
// Add modified vertices to server
t.Log("Adding modified vertices to server")
serverTxn, err := serverStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range serverVertices {
id := v.GetID()
require.NoError(t, serverStore.SaveVertexTree(serverTxn, id[:], dataTrees[numVertices+i]))
require.NoError(t, serverHG.AddVertex(serverTxn, v))
}
require.NoError(t, serverTxn.Commit())
// Commit both hypergraphs
_, err = clientHG.Commit(1)
require.NoError(t, err)
_, err = serverHG.Commit(1)
require.NoError(t, err)
// Verify roots are different before sync (modified entries should cause different roots)
clientRootBefore := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
require.NotEqual(t, clientRootBefore, serverRoot, "roots should differ before sync due to modified entries")
t.Logf("Client root before sync: %x", clientRootBefore)
t.Logf("Server root: %x", serverRoot)
// Publish server snapshot
serverHG.PublishSnapshot(serverRoot)
// Start gRPC server
const bufSize = 1 << 20
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
defer grpcServer.Stop()
go func() {
_ = grpcServer.Serve(lis)
}()
dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
require.NoError(t, err)
return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
}
// Perform sync
t.Log("Performing sync to update modified entries")
conn, client := dialClient()
defer conn.Close()
stream, err := client.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG.SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
// Commit client after sync
_, err = clientHG.Commit(2)
require.NoError(t, err)
// Verify client now matches server
clientRootAfter := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Client root after sync: %x", clientRootAfter)
assert.Equal(t, serverRoot, clientRootAfter, "client should converge to server state after sync with modified entries")
// Verify all entries were updated by comparing the leaves
serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree()
clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
diffLeaves := tries.CompareLeaves(serverTree, clientTree)
assert.Empty(t, diffLeaves, "there should be no difference in leaves after sync")
t.Logf("Sync completed successfully - %d entries with same keys but different values were updated", numVertices)
}
// TestHypergraphBidirectionalSyncWithDisjointData tests that when node A has 500
// unique vertices and node B has 500 different unique vertices, syncing in both
// directions results in both nodes having all 1000 vertices.
func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// Create data trees for all 1000 vertices
numVerticesPerNode := 500
totalVertices := numVerticesPerNode * 2
dataTrees := make([]*tries.VectorCommitmentTree, totalVertices)
eg := errgroup.Group{}
eg.SetLimit(100)
for i := 0; i < totalVertices; i++ {
eg.Go(func() error {
dataTrees[i] = buildDataTree(t, inclusionProver)
return nil
})
}
eg.Wait()
t.Log("Generated data trees")
// Create databases and stores for both nodes
nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"}, 0)
defer nodeADB.Close()
nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"}, 0)
defer nodeBDB.Close()
nodeAStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"},
nodeADB,
logger,
enc,
inclusionProver,
)
nodeBStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"},
nodeBDB,
logger,
enc,
inclusionProver,
)
nodeAHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "nodeA")),
nodeAStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
nodeBHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "nodeB")),
nodeBStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create a shared domain for all vertices
domain := randomBytes32(t)
// Generate vertices for node A (first 500)
nodeAVertices := make([]application.Vertex, numVerticesPerNode)
for i := 0; i < numVerticesPerNode; i++ {
addr := randomBytes32(t)
nodeAVertices[i] = hgcrdt.NewVertex(
domain,
addr,
dataTrees[i].Commit(inclusionProver, false),
dataTrees[i].GetSize(),
)
}
// Generate vertices for node B (second 500, completely different)
nodeBVertices := make([]application.Vertex, numVerticesPerNode)
for i := 0; i < numVerticesPerNode; i++ {
addr := randomBytes32(t)
nodeBVertices[i] = hgcrdt.NewVertex(
domain,
addr,
dataTrees[numVerticesPerNode+i].Commit(inclusionProver, false),
dataTrees[numVerticesPerNode+i].GetSize(),
)
}
shardKey := application.GetShardKey(nodeAVertices[0])
// Add vertices to node A
t.Log("Adding 500 vertices to node A")
nodeATxn, err := nodeAStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range nodeAVertices {
id := v.GetID()
require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
}
require.NoError(t, nodeATxn.Commit())
// Add vertices to node B
t.Log("Adding 500 different vertices to node B")
nodeBTxn, err := nodeBStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range nodeBVertices {
id := v.GetID()
require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[numVerticesPerNode+i]))
require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
}
require.NoError(t, nodeBTxn.Commit())
// Commit both hypergraphs
_, err = nodeAHG.Commit(1)
require.NoError(t, err)
_, err = nodeBHG.Commit(1)
require.NoError(t, err)
nodeARootBefore := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
nodeBRootBefore := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Node A root before sync: %x", nodeARootBefore)
t.Logf("Node B root before sync: %x", nodeBRootBefore)
require.NotEqual(t, nodeARootBefore, nodeBRootBefore, "roots should differ before sync")
// Helper to set up gRPC server for a hypergraph
setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
const bufSize = 1 << 20
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
go func() {
_ = grpcServer.Serve(lis)
}()
return lis, grpcServer
}
dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
require.NoError(t, err)
return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
}
// Step 1: Node A syncs from Node B (as server)
// Node A should receive Node B's 500 vertices
t.Log("Step 1: Node A syncs from Node B (B is server)")
nodeBHG.PublishSnapshot(nodeBRootBefore)
lisB, serverB := setupServer(nodeBHG)
defer serverB.Stop()
connB, clientB := dialClient(lisB)
streamB, err := clientB.PerformSync(context.Background())
require.NoError(t, err)
_, err = nodeAHG.SyncFrom(
streamB,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, streamB.CloseSend())
connB.Close()
_, err = nodeAHG.Commit(2)
require.NoError(t, err)
nodeARootAfterFirstSync := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Node A root after syncing from B: %x", nodeARootAfterFirstSync)
// Step 2: Node B syncs from Node A (as server)
// Node B should receive Node A's 500 vertices
t.Log("Step 2: Node B syncs from Node A (A is server)")
nodeAHG.PublishSnapshot(nodeARootAfterFirstSync)
lisA, serverA := setupServer(nodeAHG)
defer serverA.Stop()
connA, clientA := dialClient(lisA)
streamA, err := clientA.PerformSync(context.Background())
require.NoError(t, err)
_, err = nodeBHG.SyncFrom(
streamA,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, streamA.CloseSend())
connA.Close()
_, err = nodeBHG.Commit(2)
require.NoError(t, err)
// Verify both nodes have converged
nodeARootFinal := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
nodeBRootFinal := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Node A final root: %x", nodeARootFinal)
t.Logf("Node B final root: %x", nodeBRootFinal)
assert.Equal(t, nodeARootFinal, nodeBRootFinal, "both nodes should have identical roots after bidirectional sync")
// Verify the tree contains all 1000 vertices
nodeATree := nodeAHG.GetVertexAddsSet(shardKey).GetTree()
nodeBTree := nodeBHG.GetVertexAddsSet(shardKey).GetTree()
nodeALeaves := tries.GetAllLeaves(
nodeATree.SetType,
nodeATree.PhaseType,
nodeATree.ShardKey,
nodeATree.Root,
)
nodeBLeaves := tries.GetAllLeaves(
nodeBTree.SetType,
nodeBTree.PhaseType,
nodeBTree.ShardKey,
nodeBTree.Root,
)
nodeALeafCount := 0
for _, leaf := range nodeALeaves {
if leaf != nil {
nodeALeafCount++
}
}
nodeBLeafCount := 0
for _, leaf := range nodeBLeaves {
if leaf != nil {
nodeBLeafCount++
}
}
t.Logf("Node A has %d leaves, Node B has %d leaves", nodeALeafCount, nodeBLeafCount)
assert.Equal(t, totalVertices, nodeALeafCount, "Node A should have all 1000 vertices")
assert.Equal(t, totalVertices, nodeBLeafCount, "Node B should have all 1000 vertices")
// Verify no differences between the trees
diffLeaves := tries.CompareLeaves(nodeATree, nodeBTree)
assert.Empty(t, diffLeaves, "there should be no differences between the trees")
t.Log("Bidirectional sync test passed - both nodes have all 1000 vertices")
}
// TestHypergraphBidirectionalSyncClientDriven tests the new client-driven sync
// protocol (PerformSync/SyncFrom) with two nodes having disjoint data sets.
// Node A has 500 unique vertices and node B has 500 different unique vertices.
// After syncing in both directions, both nodes should have all 1000 vertices.
func TestHypergraphBidirectionalSyncClientDriven(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// Create data trees for all 1000 vertices
numVerticesPerNode := 500
totalVertices := numVerticesPerNode * 2
dataTrees := make([]*tries.VectorCommitmentTree, totalVertices)
eg := errgroup.Group{}
eg.SetLimit(100)
for i := 0; i < totalVertices; i++ {
eg.Go(func() error {
dataTrees[i] = buildDataTree(t, inclusionProver)
return nil
})
}
eg.Wait()
t.Log("Generated data trees")
// Create databases and stores for both nodes
nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA_cd/store"}, 0)
defer nodeADB.Close()
nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB_cd/store"}, 0)
defer nodeBDB.Close()
nodeAStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA_cd/store"},
nodeADB,
logger,
enc,
inclusionProver,
)
nodeBStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB_cd/store"},
nodeBDB,
logger,
enc,
inclusionProver,
)
nodeAHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "nodeA-cd")),
nodeAStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
nodeBHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "nodeB-cd")),
nodeBStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create a shared domain for all vertices
domain := randomBytes32(t)
// Generate vertices for node A (first 500)
nodeAVertices := make([]application.Vertex, numVerticesPerNode)
for i := 0; i < numVerticesPerNode; i++ {
addr := randomBytes32(t)
nodeAVertices[i] = hgcrdt.NewVertex(
domain,
addr,
dataTrees[i].Commit(inclusionProver, false),
dataTrees[i].GetSize(),
)
}
// Generate vertices for node B (second 500, completely different)
nodeBVertices := make([]application.Vertex, numVerticesPerNode)
for i := 0; i < numVerticesPerNode; i++ {
addr := randomBytes32(t)
nodeBVertices[i] = hgcrdt.NewVertex(
domain,
addr,
dataTrees[numVerticesPerNode+i].Commit(inclusionProver, false),
dataTrees[numVerticesPerNode+i].GetSize(),
)
}
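// Every vertex on both nodes shares the same domain (appAddress), so they all
// map to the same shard key; deriving it from any one vertex is sufficient.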
shardKey := application.GetShardKey(nodeAVertices[0])
// Add vertices to node A
t.Log("Adding 500 vertices to node A")
nodeATxn, err := nodeAStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range nodeAVertices {
id := v.GetID()
require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
}
require.NoError(t, nodeATxn.Commit())
// Add vertices to node B
t.Log("Adding 500 different vertices to node B")
nodeBTxn, err := nodeBStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range nodeBVertices {
id := v.GetID()
require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[numVerticesPerNode+i]))
require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
}
require.NoError(t, nodeBTxn.Commit())
// Commit both hypergraphs
_, err = nodeAHG.Commit(1)
require.NoError(t, err)
_, err = nodeBHG.Commit(1)
require.NoError(t, err)
nodeARootBefore := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
nodeBRootBefore := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Node A root before sync: %x", nodeARootBefore)
t.Logf("Node B root before sync: %x", nodeBRootBefore)
require.NotEqual(t, nodeARootBefore, nodeBRootBefore, "roots should differ before sync")
// Helper to set up gRPC server for a hypergraph
setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
const bufSize = 1 << 20
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
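// Generate a throwaway ed448 identity and stamp its peer ID into the stream
// context so server-side handlers that read the caller's peer ID still see a
// valid identity over bufconn.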
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
go func() {
_ = grpcServer.Serve(lis)
}()
return lis, grpcServer
}
dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
require.NoError(t, err)
return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
}
// Byte form of the shard key (L1 || L2). It is unused here because SyncFrom
// below takes the tries.ShardKey value directly.
shardKeyBytes := slices.Concat(shardKey.L1[:], shardKey.L2[:])
_ = shardKeyBytes
// Step 1: Node A syncs from Node B (as server) using client-driven sync
// Node A should receive Node B's 500 vertices
t.Log("Step 1: Node A syncs from Node B using PerformSync (B is server)")
lisB, serverB := setupServer(nodeBHG)
defer serverB.Stop()
connB, clientB := dialClient(lisB)
streamB, err := clientB.PerformSync(context.Background())
require.NoError(t, err)
_, err = nodeAHG.SyncFrom(
streamB,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, streamB.CloseSend())
connB.Close()
_, err = nodeAHG.Commit(2)
require.NoError(t, err)
nodeARootAfterFirstSync := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Node A root after syncing from B: %x", nodeARootAfterFirstSync)
// Step 2: Node B syncs from Node A (as server) using client-driven sync
// Node B should receive Node A's 500 vertices
t.Log("Step 2: Node B syncs from Node A using PerformSync (A is server)")
lisA, serverA := setupServer(nodeAHG)
defer serverA.Stop()
connA, clientA := dialClient(lisA)
streamA, err := clientA.PerformSync(context.Background())
require.NoError(t, err)
_, err = nodeBHG.SyncFrom(
streamA,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, streamA.CloseSend())
connA.Close()
_, err = nodeBHG.Commit(2)
require.NoError(t, err)
// Verify both nodes have converged
nodeARootFinal := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
nodeBRootFinal := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Node A final root: %x", nodeARootFinal)
t.Logf("Node B final root: %x", nodeBRootFinal)
assert.Equal(t, nodeARootFinal, nodeBRootFinal, "both nodes should have identical roots after bidirectional sync")
// Verify the tree contains all 1000 vertices
nodeATree := nodeAHG.GetVertexAddsSet(shardKey).GetTree()
nodeBTree := nodeBHG.GetVertexAddsSet(shardKey).GetTree()
nodeALeaves := tries.GetAllLeaves(
nodeATree.SetType,
nodeATree.PhaseType,
nodeATree.ShardKey,
nodeATree.Root,
)
nodeBLeaves := tries.GetAllLeaves(
nodeBTree.SetType,
nodeBTree.PhaseType,
nodeBTree.ShardKey,
nodeBTree.Root,
)
nodeALeafCount := 0
for _, leaf := range nodeALeaves {
if leaf != nil {
nodeALeafCount++
}
}
nodeBLeafCount := 0
for _, leaf := range nodeBLeaves {
if leaf != nil {
nodeBLeafCount++
}
}
t.Logf("Node A has %d leaves, Node B has %d leaves", nodeALeafCount, nodeBLeafCount)
assert.Equal(t, totalVertices, nodeALeafCount, "Node A should have all 1000 vertices")
assert.Equal(t, totalVertices, nodeBLeafCount, "Node B should have all 1000 vertices")
// Verify no differences between the trees
diffLeaves := tries.CompareLeaves(nodeATree, nodeBTree)
assert.Empty(t, diffLeaves, "there should be no differences between the trees")
t.Log("Client-driven bidirectional sync test passed - both nodes have all 1000 vertices")
}
// TestHypergraphSyncWithPrefixLengthMismatch tests sync behavior when one node
// has a deeper tree structure (longer prefix path) than the other. This tests
// the prefix length mismatch handling in the walk function.
//
// We create two nodes with different tree structures that will cause prefix
// length mismatches during sync. Node A has deeper prefixes at certain branches
// while Node B has shallower but wider structures.
func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// Create data trees
numTrees := 20
dataTrees := make([]*tries.VectorCommitmentTree, numTrees)
for i := 0; i < numTrees; i++ {
dataTrees[i] = buildDataTree(t, inclusionProver)
}
// Fixed domain (appAddress) - all vertices must share this to be in the same shard
fixedDomain := [32]byte{}
// Helper to create a vertex with a specific dataAddress path suffix.
// The vertex ID is [appAddress (32 bytes) || dataAddress (32 bytes)].
// The path is derived from the full 64-byte ID.
// With BranchBits=6, nibbles 0-41 come from appAddress, nibbles 42+ from dataAddress.
// Since all vertices share the same appAddress, their paths share the first 42 nibbles.
// Path differences come from dataAddress (nibbles 42+).
//
// We control the "suffix path" starting at nibble 42 by setting bits in dataAddress.
createVertexWithDataPath := func(suffixPath []int, uniqueSuffix uint64, treeIdx int) application.Vertex {
dataAddr := [32]byte{}
// Pack the suffix path nibbles into the leading bits of dataAddress.
// (Strictly, nibble 42 straddles the appAddress/dataAddress boundary since
// 256/6 = 42.67, but with the fixed all-zero appAddress the shared prefix
// still ends where these packed bits begin to differ.)
bitPos := 0
for _, nibble := range suffixPath {
byteIdx := bitPos / 8
bitOffset := bitPos % 8
if bitOffset+6 <= 8 {
// Nibble fits in one byte
dataAddr[byteIdx] |= byte(nibble << (8 - bitOffset - 6))
} else {
// Nibble spans two bytes
bitsInFirstByte := 8 - bitOffset
dataAddr[byteIdx] |= byte(nibble >> (6 - bitsInFirstByte))
if byteIdx+1 < 32 {
dataAddr[byteIdx+1] |= byte(nibble << (8 - (6 - bitsInFirstByte)))
}
}
bitPos += 6
}
// Add unique suffix in the last 8 bytes to make each vertex distinct
binary.BigEndian.PutUint64(dataAddr[24:], uniqueSuffix)
return hgcrdt.NewVertex(
fixedDomain,
dataAddr,
dataTrees[treeIdx].Commit(inclusionProver, false),
dataTrees[treeIdx].GetSize(),
)
}
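// Worked example of the packing above: suffixPath {0, 1} places nibble 0 in
// the high six bits of dataAddr[0]; nibble 1's top two bits fill the low two
// bits of dataAddr[0] and its low four bits fill the high four bits of
// dataAddr[1], giving dataAddr[0] = 0x00 and dataAddr[1] = 0x10 before the
// unique suffix is written into bytes 24-31.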
// Run the test in both directions
runSyncTest := func(direction string) {
t.Run(direction, func(t *testing.T) {
// Create fresh databases for this sub-test
nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeA_%s/store", direction)}, 0)
defer nodeADB.Close()
nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeB_%s/store", direction)}, 0)
defer nodeBDB.Close()
nodeAStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeA_%s/store", direction)},
nodeADB,
logger,
enc,
inclusionProver,
)
nodeBStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeB_%s/store", direction)},
nodeBDB,
logger,
enc,
inclusionProver,
)
nodeAHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "nodeA-"+direction)),
nodeAStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
nodeBHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "nodeB-"+direction)),
nodeBStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create vertices with specific path structures to cause prefix mismatches.
// All vertices share the same appAddress (fixedDomain), so they're in the same shard.
// Their paths share the first 42 nibbles (all zeros from fixedDomain).
// Path differences come from dataAddress, starting at nibble 42.
//
// We create vertices with suffix paths (nibbles 42+) that differ:
// Node A: suffix paths 0,1,x and 0,2,x and 1,x
// Node B: suffix paths 0,0,x and 0,1,x and 0,3,x and 1,x
//
// This creates prefix mismatch scenarios in the dataAddress portion of the tree.
t.Log("Creating Node A structure")
nodeAVertices := []application.Vertex{
createVertexWithDataPath([]int{0, 1}, 100, 0), // suffix path 0,1,...
createVertexWithDataPath([]int{0, 2}, 101, 1), // suffix path 0,2,...
createVertexWithDataPath([]int{1}, 102, 2), // suffix path 1,...
}
t.Logf("Created Node A vertices with suffix paths: 0,1; 0,2; 1")
t.Log("Creating Node B structure")
nodeBVertices := []application.Vertex{
createVertexWithDataPath([]int{0, 0}, 200, 3), // suffix path 0,0,...
createVertexWithDataPath([]int{0, 1}, 201, 4), // suffix path 0,1,...
createVertexWithDataPath([]int{0, 3}, 202, 5), // suffix path 0,3,...
createVertexWithDataPath([]int{1}, 203, 6), // suffix path 1,...
}
t.Logf("Created Node B vertices with suffix paths: 0,0; 0,1; 0,3; 1")
// Verify the paths - show nibbles 40-50 where the difference should be
t.Log("Node A vertices paths (showing nibbles 40-50 where dataAddress starts):")
for i, v := range nodeAVertices {
id := v.GetID()
path := GetFullPath(id[:])
// Nibble 42 is where dataAddress bits start (256/6 = 42.67)
start := 40
end := min(50, len(path))
if end > start {
t.Logf(" Vertex %d path[%d:%d]: %v", i, start, end, path[start:end])
}
}
t.Log("Node B vertices paths (showing nibbles 40-50 where dataAddress starts):")
for i, v := range nodeBVertices {
id := v.GetID()
path := GetFullPath(id[:])
start := 40
end := min(50, len(path))
if end > start {
t.Logf(" Vertex %d path[%d:%d]: %v", i, start, end, path[start:end])
}
}
shardKey := application.GetShardKey(nodeAVertices[0])
// Add vertices to Node A
nodeATxn, err := nodeAStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range nodeAVertices {
id := v.GetID()
require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
}
require.NoError(t, nodeATxn.Commit())
// Add vertices to Node B
nodeBTxn, err := nodeBStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range nodeBVertices {
id := v.GetID()
require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[3+i]))
require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
}
require.NoError(t, nodeBTxn.Commit())
// Commit both
_, err = nodeAHG.Commit(1)
require.NoError(t, err)
_, err = nodeBHG.Commit(1)
require.NoError(t, err)
nodeARoot := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
nodeBRoot := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Node A root: %x", nodeARoot)
t.Logf("Node B root: %x", nodeBRoot)
// Setup gRPC server
const bufSize = 1 << 20
setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
go func() { _ = grpcServer.Serve(lis) }()
return lis, grpcServer
}
dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
dialer := func(context.Context, string) (net.Conn, error) { return lis.Dial() }
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
require.NoError(t, err)
return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
}
var serverHG, clientHG *hgcrdt.HypergraphCRDT
var serverRoot []byte
if direction == "A_syncs_from_B" {
serverHG = nodeBHG
clientHG = nodeAHG
serverRoot = nodeBRoot
} else {
serverHG = nodeAHG
clientHG = nodeBHG
serverRoot = nodeARoot
}
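// Publish the serving node's committed root as a snapshot; the client-driven
// sync below reads against this published state.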
serverHG.PublishSnapshot(serverRoot)
lis, grpcServer := setupServer(serverHG)
defer grpcServer.Stop()
// Count client leaves before sync
clientTreeBefore := clientHG.GetVertexAddsSet(shardKey).GetTree()
clientLeavesBefore := tries.GetAllLeaves(
clientTreeBefore.SetType,
clientTreeBefore.PhaseType,
clientTreeBefore.ShardKey,
clientTreeBefore.Root,
)
clientLeafCountBefore := 0
for _, leaf := range clientLeavesBefore {
if leaf != nil {
clientLeafCountBefore++
}
}
t.Logf("Client has %d leaves before sync", clientLeafCountBefore)
conn, client := dialClient(lis)
stream, err := client.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG.SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
conn.Close()
_, err = clientHG.Commit(2)
require.NoError(t, err)
// In CRDT sync, the client receives data from the server and MERGES it.
// The client should now have BOTH its original vertices AND the server's vertices.
// So the client root should differ from both original roots (it's a superset).
clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Client root after sync: %x", clientRoot)
// Get all leaves from the client tree after sync
clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
clientLeaves := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientTree.Root,
)
clientLeafCount := 0
for _, leaf := range clientLeaves {
if leaf != nil {
clientLeafCount++
}
}
// After sync, client should have received server's vertices (merged with its own)
// The client should have at least as many leaves as it started with
assert.GreaterOrEqual(t, clientLeafCount, clientLeafCountBefore,
"client should not lose leaves during sync")
// Client should have gained some leaves from the server (unless they already had them all)
t.Logf("Sync %s completed - client went from %d to %d leaves",
direction, clientLeafCountBefore, clientLeafCount)
// Verify the sync actually transferred data by checking that server's vertices are now in client
serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree()
serverLeaves := tries.GetAllLeaves(
serverTree.SetType,
serverTree.PhaseType,
serverTree.ShardKey,
serverTree.Root,
)
serverLeafCount := 0
for _, leaf := range serverLeaves {
if leaf != nil {
serverLeafCount++
}
}
t.Logf("Server has %d leaves", serverLeafCount)
// The client should have at least as many leaves as the server
// (since it's merging server data into its own)
assert.GreaterOrEqual(t, clientLeafCount, serverLeafCount,
"client should have at least as many leaves as server after sync")
})
}
// Test both directions
runSyncTest("A_syncs_from_B")
runSyncTest("B_syncs_from_A")
}
// TestMainnetBlossomsubFrameReceptionAndHypersync is an integration test that:
// 1. Connects to mainnet blossomsub using real bootstrap peers
// 2. Subscribes to the global frame bitmask (0x0000) as done in global_consensus_engine.go
// 3. Receives a real frame from a global prover on mainnet
// 4. Performs hypersync on the prover shard (000000ffffffff...ffffffff)
// 5. Confirms the synced data matches the prover root commitment from the frame
//
// This test requires network access: it waits up to 20 minutes for a frame and
// up to a further 10 minutes for the prover's key registry and peer info.
// Run with: go test -v -timeout 40m -run TestMainnetBlossomsubFrameReceptionAndHypersync
func TestMainnetBlossomsubFrameReceptionAndHypersync(t *testing.T) {
if testing.Short() {
t.Skip("skipping mainnet integration test in short mode")
}
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// The prover shard key from global consensus:
// L1 = [0x00, 0x00, 0x00], L2 = bytes.Repeat([]byte{0xff}, 32)
proverShardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
}
// Frame bitmask from global consensus: []byte{0x00, 0x00}
globalFrameBitmask := []byte{0x00, 0x00}
// Create in-memory hypergraph store for the client
clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"}, 0)
defer clientDB.Close()
clientStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"},
clientDB,
logger,
enc,
inclusionProver,
)
clientHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "mainnet-client")),
clientStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Generate a random peer key for this test node
peerPrivKey, _, err := pcrypto.GenerateEd448Key(rand.Reader)
require.NoError(t, err)
peerPrivKeyBytes, err := peerPrivKey.Raw()
require.NoError(t, err)
// Create P2P config with mainnet bootstrap peers
p2pConfig := &config.P2PConfig{
ListenMultiaddr: "/ip4/0.0.0.0/udp/0/quic-v1", // Use random port
BootstrapPeers: config.BootstrapPeers,
PeerPrivKey: fmt.Sprintf("%x", peerPrivKeyBytes),
Network: 0, // Mainnet
D: 8,
DLo: 6,
DHi: 12,
DScore: 4,
DOut: 2,
HistoryLength: 5,
HistoryGossip: 3,
DLazy: 6,
GossipFactor: 0.25,
GossipRetransmission: 3,
HeartbeatInitialDelay: 100 * time.Millisecond,
HeartbeatInterval: 1 * time.Second,
FanoutTTL: 60 * time.Second,
PrunePeers: 16,
PruneBackoff: time.Minute,
UnsubscribeBackoff: 10 * time.Second,
Connectors: 8,
MaxPendingConnections: 128,
ConnectionTimeout: 30 * time.Second,
DirectConnectTicks: 300,
DirectConnectInitialDelay: 1 * time.Second,
OpportunisticGraftTicks: 60,
OpportunisticGraftPeers: 2,
GraftFloodThreshold: 10 * time.Second,
MaxIHaveLength: 5000,
MaxIHaveMessages: 10,
MaxIDontWantMessages: 10,
IWantFollowupTime: 3 * time.Second,
IDontWantMessageThreshold: 10000,
IDontWantMessageTTL: 3,
MinBootstrapPeers: 1,
BootstrapParallelism: 4,
DiscoveryParallelism: 4,
DiscoveryPeerLookupLimit: 100,
PingTimeout: 30 * time.Second,
PingPeriod: time.Minute,
PingAttempts: 3,
LowWatermarkConnections: -1,
HighWatermarkConnections: -1,
SubscriptionQueueSize: 128,
ValidateQueueSize: 128,
ValidateWorkers: 4,
PeerOutboundQueueSize: 128,
}
engineConfig := &config.EngineConfig{}
// Create a temporary config directory
configDir, err := os.MkdirTemp("", "quil-test-*")
require.NoError(t, err)
defer os.RemoveAll(configDir)
// Create connectivity cache file to bypass the connectivity test
// The cache file must be named "connectivity-check-<coreId>" and exist in configDir
connectivityCachePath := fmt.Sprintf("%s/connectivity-check-0", configDir)
err = os.WriteFile(connectivityCachePath, []byte(time.Now().Format(time.RFC3339)), 0644)
require.NoError(t, err)
t.Log("Connecting to mainnet blossomsub...")
// Create the real blossomsub instance
pubsub := p2p.NewBlossomSub(
p2pConfig,
engineConfig,
logger.Named("blossomsub"),
0,
p2p.ConfigDir(configDir),
)
defer pubsub.Close()
t.Logf("Connected to mainnet with peer ID: %x", pubsub.GetPeerID())
t.Logf("Bootstrap peers: %d", len(config.BootstrapPeers))
// Create a channel to receive frames
frameReceived := make(chan *protobufs.GlobalFrame, 1)
// Create a peer info manager to store peer reachability info
// We use a simple in-memory map to store peer info from the peer info bitmask
peerInfoMap := make(map[string]*tp2p.PeerInfo)
var peerInfoMu sync.RWMutex
// Create a key registry map to map prover addresses to identity peer IDs
// Key: prover address ([]byte as string), Value: identity peer ID
keyRegistryMap := make(map[string]peer.ID)
var keyRegistryMu sync.RWMutex
// Peer info bitmask from global consensus: []byte{0x00, 0x00, 0x00, 0x00}
globalPeerInfoBitmask := []byte{0x00, 0x00, 0x00, 0x00}
// Subscribe to peer info bitmask - this handles both PeerInfo and KeyRegistry messages
t.Log("Subscribing to global peer info bitmask...")
err = pubsub.Subscribe(globalPeerInfoBitmask, func(message *pb.Message) error {
if len(message.Data) < 4 {
return nil
}
// Check type prefix
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
switch typePrefix {
case protobufs.PeerInfoType:
peerInfoMsg := &protobufs.PeerInfo{}
if err := peerInfoMsg.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Failed to unmarshal peer info: %v", err)
return nil
}
// Validate signature using Ed448
if len(peerInfoMsg.Signature) == 0 || len(peerInfoMsg.PublicKey) == 0 {
return nil
}
// Create a copy without signature for validation
infoCopy := &protobufs.PeerInfo{
PeerId: peerInfoMsg.PeerId,
Reachability: peerInfoMsg.Reachability,
Timestamp: peerInfoMsg.Timestamp,
Version: peerInfoMsg.Version,
PatchNumber: peerInfoMsg.PatchNumber,
Capabilities: peerInfoMsg.Capabilities,
PublicKey: peerInfoMsg.PublicKey,
LastReceivedFrame: peerInfoMsg.LastReceivedFrame,
LastGlobalHeadFrame: peerInfoMsg.LastGlobalHeadFrame,
}
msg, err := infoCopy.ToCanonicalBytes()
if err != nil {
return nil
}
// Validate Ed448 signature
if !ed448.Verify(ed448.PublicKey(peerInfoMsg.PublicKey), msg, peerInfoMsg.Signature, "") {
return nil
}
// Convert and store peer info
reachability := []tp2p.Reachability{}
for _, r := range peerInfoMsg.Reachability {
reachability = append(reachability, tp2p.Reachability{
Filter: r.Filter,
PubsubMultiaddrs: r.PubsubMultiaddrs,
StreamMultiaddrs: r.StreamMultiaddrs,
})
}
peerInfoMu.Lock()
peerInfoMap[string(peerInfoMsg.PeerId)] = &tp2p.PeerInfo{
PeerId: peerInfoMsg.PeerId,
Reachability: reachability,
Cores: uint32(len(reachability)),
LastSeen: time.Now().UnixMilli(),
Version: peerInfoMsg.Version,
PatchNumber: peerInfoMsg.PatchNumber,
LastReceivedFrame: peerInfoMsg.LastReceivedFrame,
LastGlobalHeadFrame: peerInfoMsg.LastGlobalHeadFrame,
}
peerInfoMu.Unlock()
peerIdStr := peer.ID(peerInfoMsg.PeerId).String()
t.Logf("Received peer info for %s with %d reachability entries",
peerIdStr, len(reachability))
case protobufs.KeyRegistryType:
keyRegistry := &protobufs.KeyRegistry{}
if err := keyRegistry.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Failed to unmarshal key registry: %v", err)
return nil
}
// We need identity key and prover key to establish the mapping
if keyRegistry.IdentityKey == nil || len(keyRegistry.IdentityKey.KeyValue) == 0 {
return nil
}
if keyRegistry.ProverKey == nil || len(keyRegistry.ProverKey.KeyValue) == 0 {
return nil
}
// Derive peer ID from identity key
pk, err := pcrypto.UnmarshalEd448PublicKey(keyRegistry.IdentityKey.KeyValue)
if err != nil {
t.Logf("Failed to unmarshal identity key: %v", err)
return nil
}
identityPeerID, err := peer.IDFromPublicKey(pk)
if err != nil {
t.Logf("Failed to derive peer ID from identity key: %v", err)
return nil
}
// Derive prover address from prover key (Poseidon hash)
proverAddrBI, err := poseidon.HashBytes(keyRegistry.ProverKey.KeyValue)
if err != nil {
t.Logf("Failed to derive prover address: %v", err)
return nil
}
proverAddress := proverAddrBI.FillBytes(make([]byte, 32))
// Store the mapping: prover address -> identity peer ID
keyRegistryMu.Lock()
keyRegistryMap[string(proverAddress)] = identityPeerID
keyRegistryMu.Unlock()
t.Logf("Received key registry: prover %x -> peer %s",
proverAddress, identityPeerID.String())
}
return nil
})
require.NoError(t, err)
// Register a validator for peer info messages with age checks
err = pubsub.RegisterValidator(globalPeerInfoBitmask, func(peerID peer.ID, message *pb.Message) tp2p.ValidationResult {
if len(message.Data) < 4 {
return tp2p.ValidationResultReject
}
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
now := time.Now().UnixMilli()
switch typePrefix {
case protobufs.PeerInfoType:
peerInfo := &protobufs.PeerInfo{}
if err := peerInfo.FromCanonicalBytes(message.Data); err != nil {
return tp2p.ValidationResultReject
}
// Age checks: the timestamp may be at most 1 second old and at most 5 seconds in the future
if peerInfo.Timestamp < now-1000 {
t.Logf("Rejecting peer info: timestamp too old (%d < %d)", peerInfo.Timestamp, now-1000)
return tp2p.ValidationResultReject
}
if peerInfo.Timestamp > now+5000 {
t.Logf("Ignoring peer info: timestamp too far in future (%d > %d)", peerInfo.Timestamp, now+5000)
return tp2p.ValidationResultIgnore
}
case protobufs.KeyRegistryType:
keyRegistry := &protobufs.KeyRegistry{}
if err := keyRegistry.FromCanonicalBytes(message.Data); err != nil {
return tp2p.ValidationResultReject
}
// Age checks: LastUpdated may be at most 1 second old and at most 5 seconds in the future
if int64(keyRegistry.LastUpdated) < now-1000 {
t.Logf("Rejecting key registry: timestamp too old (%d < %d)", keyRegistry.LastUpdated, now-1000)
return tp2p.ValidationResultReject
}
if int64(keyRegistry.LastUpdated) > now+5000 {
t.Logf("Ignoring key registry: timestamp too far in future (%d > %d)", keyRegistry.LastUpdated, now+5000)
return tp2p.ValidationResultIgnore
}
default:
return tp2p.ValidationResultIgnore
}
return tp2p.ValidationResultAccept
}, true)
require.NoError(t, err)
// Subscribe to frame messages
t.Log("Subscribing to global frame bitmask...")
err = pubsub.Subscribe(globalFrameBitmask, func(message *pb.Message) error {
t.Logf("Received message on frame bitmask, data length: %d", len(message.Data))
if len(message.Data) < 4 {
return nil
}
// Check type prefix
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
t.Logf("Message type prefix: %d (GlobalFrameType=%d)", typePrefix, protobufs.GlobalFrameType)
if typePrefix != protobufs.GlobalFrameType {
return nil
}
frame := &protobufs.GlobalFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Failed to unmarshal frame: %v", err)
return nil
}
t.Logf("Received frame %d from prover %x with root %x",
frame.Header.FrameNumber,
frame.Header.Prover,
frame.Header.ProverTreeCommitment)
select {
case frameReceived <- frame:
default:
}
return nil
})
require.NoError(t, err)
// Register a validator for frame messages with age checks
err = pubsub.RegisterValidator(globalFrameBitmask, func(peerID peer.ID, message *pb.Message) tp2p.ValidationResult {
if len(message.Data) < 4 {
return tp2p.ValidationResultReject
}
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
if typePrefix != protobufs.GlobalFrameType {
return tp2p.ValidationResultIgnore
}
frame := &protobufs.GlobalFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Frame validation: failed to unmarshal: %v", err)
return tp2p.ValidationResultReject
}
// Check signature is present
if frame.Header.PublicKeySignatureBls48581 == nil ||
frame.Header.PublicKeySignatureBls48581.PublicKey == nil ||
frame.Header.PublicKeySignatureBls48581.PublicKey.KeyValue == nil {
t.Logf("Frame validation: missing signature")
return tp2p.ValidationResultReject
}
// Age check: frame must be within 120 seconds
frameAge := time.Since(time.UnixMilli(frame.Header.Timestamp))
if frameAge > 120*time.Second {
t.Logf("Frame validation: too old (age=%v)", frameAge)
return tp2p.ValidationResultIgnore
}
t.Logf("Frame validation: accepting frame %d (age=%v)", frame.Header.FrameNumber, frameAge)
return tp2p.ValidationResultAccept
}, true)
require.NoError(t, err)
t.Log("Waiting for a global frame from mainnet (this may take up to 20 minutes)...")
// Wait for a frame with a longer timeout for mainnet - frames can take a while
var receivedFrame *protobufs.GlobalFrame
select {
case receivedFrame = <-frameReceived:
t.Logf("Successfully received frame %d!", receivedFrame.Header.FrameNumber)
case <-time.After(20 * time.Minute):
t.Fatal("timeout waiting for frame from mainnet - ensure network connectivity")
}
// Verify frame has required fields
require.NotNil(t, receivedFrame.Header, "frame must have header")
require.NotEmpty(t, receivedFrame.Header.Prover, "frame must have prover")
require.NotEmpty(t, receivedFrame.Header.ProverTreeCommitment, "frame must have prover tree commitment")
expectedRoot := receivedFrame.Header.ProverTreeCommitment
proverAddress := receivedFrame.Header.Prover // This is the prover ADDRESS (hash of BLS key), not a peer ID
t.Logf("Frame details:")
t.Logf(" Frame number: %d", receivedFrame.Header.FrameNumber)
t.Logf(" Prover address: %x", proverAddress)
t.Logf(" Prover root commitment: %x", expectedRoot)
// Now we need to find the prover's peer info to connect and sync
// The prover address (in frame) needs to be mapped to a peer ID via key registry
t.Log("Looking up prover peer info...")
// Helper function to get prover's identity peer ID from key registry
getProverPeerID := func() (peer.ID, bool) {
keyRegistryMu.RLock()
defer keyRegistryMu.RUnlock()
peerID, ok := keyRegistryMap[string(proverAddress)]
return peerID, ok
}
// Helper function to get multiaddr from peer info map using peer ID
getMultiaddrForPeer := func(peerID peer.ID) string {
peerInfoMu.RLock()
defer peerInfoMu.RUnlock()
info, ok := peerInfoMap[string([]byte(peerID))]
if !ok || len(info.Reachability) == 0 {
return ""
}
// Try stream multiaddrs first (for direct gRPC connection)
for _, r := range info.Reachability {
if len(r.StreamMultiaddrs) > 0 {
return r.StreamMultiaddrs[0]
}
}
// Fall back to pubsub multiaddrs
for _, r := range info.Reachability {
if len(r.PubsubMultiaddrs) > 0 {
return r.PubsubMultiaddrs[0]
}
}
return ""
}
// Wait for key registry and peer info to arrive (provers broadcast every 5 minutes)
t.Log("Waiting for prover key registry and peer info (up to 10 minutes)...")
var proverPeerID peer.ID
var proverMultiaddr string
timeout := time.After(10 * time.Minute)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
waitLoop:
for {
select {
case <-timeout:
t.Log("Timeout waiting for prover info")
break waitLoop
case <-ticker.C:
// First try to get the peer ID from key registry
if proverPeerID == "" {
if pID, ok := getProverPeerID(); ok {
proverPeerID = pID
t.Logf("Found key registry: prover address %x -> peer ID %s", proverAddress, proverPeerID.String())
}
}
// If we have the peer ID, try to get the multiaddr from peer info
if proverPeerID != "" {
proverMultiaddr = getMultiaddrForPeer(proverPeerID)
if proverMultiaddr != "" {
t.Logf("Found prover peer info from peer info bitmask!")
break waitLoop
}
}
// Log progress
keyRegistryMu.RLock()
peerInfoMu.RLock()
t.Logf("Still waiting... key registries: %d, peer infos: %d, have prover peer ID: %v",
len(keyRegistryMap), len(peerInfoMap), proverPeerID != "")
peerInfoMu.RUnlock()
keyRegistryMu.RUnlock()
}
}
// If we have peer ID but no multiaddr, try connected peers
if proverPeerID != "" && proverMultiaddr == "" {
t.Log("Checking connected peers for prover...")
networkInfo := pubsub.GetNetworkInfo()
for _, info := range networkInfo.NetworkInfo {
if bytes.Equal(info.PeerId, []byte(proverPeerID)) && len(info.Multiaddrs) > 0 {
proverMultiaddr = info.Multiaddrs[0]
t.Logf("Found prover in connected peers")
break
}
}
}
// Final fallback - direct lookup using peer ID
if proverPeerID != "" && proverMultiaddr == "" {
t.Logf("Attempting direct peer lookup...")
proverMultiaddr = pubsub.GetMultiaddrOfPeer([]byte(proverPeerID))
}
if proverPeerID == "" {
t.Skip("Could not find prover key registry - prover may not have broadcast key info yet")
}
if proverMultiaddr == "" {
t.Skip("Could not find prover multiaddr - prover may not have broadcast peer info yet")
}
t.Logf("Prover multiaddr: %s", proverMultiaddr)
// Connect to the prover using direct gRPC connection via multiaddr
t.Log("Connecting to prover for hypersync...")
// Create TLS credentials for the connection
creds, err := p2p.NewPeerAuthenticator(
logger,
p2pConfig,
nil,
nil,
nil,
nil,
[][]byte{[]byte(proverPeerID)},
map[string]channel.AllowedPeerPolicyType{},
map[string]channel.AllowedPeerPolicyType{},
).CreateClientTLSCredentials([]byte(proverPeerID))
if err != nil {
t.Skipf("Could not create TLS credentials: %v", err)
}
// Parse the multiaddr and convert to network address
ma, err := multiaddr.NewMultiaddr(proverMultiaddr)
if err != nil {
t.Skipf("Could not parse multiaddr %s: %v", proverMultiaddr, err)
}
mga, err := mn.ToNetAddr(ma)
if err != nil {
t.Skipf("Could not convert multiaddr to net addr: %v", err)
}
// Create gRPC client connection
conn, err := grpc.NewClient(
mga.String(),
grpc.WithTransportCredentials(creds),
)
if err != nil {
t.Skipf("Could not establish connection to prover: %v", err)
}
defer conn.Close()
client := protobufs.NewHypergraphComparisonServiceClient(conn)
// Perform hypersync on all phases
t.Log("Performing hypersync on prover shard...")
phases := []protobufs.HypergraphPhaseSet{
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
}
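// Only the vertex-adds phase set is exercised here; the other phase sets
// would follow the same PerformSync/SyncFrom pattern.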
for _, phase := range phases {
stream, err := client.PerformSync(context.Background())
if err != nil {
t.Logf("PerformSync error: %v", err)
continue
}
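// Unlike the earlier tests, the frame's prover tree commitment is supplied as
// the expected root, presumably so the sync is checked against the exact
// state that produced the frame rather than whatever the server currently
// publishes.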
_, err = clientHG.SyncFrom(stream, proverShardKey, phase, expectedRoot)
if err != nil {
t.Logf("SyncFrom error for phase %v: %v", phase, err)
}
_ = stream.CloseSend()
}
// Commit client to compute root
_, err = clientHG.Commit(uint64(receivedFrame.Header.FrameNumber))
require.NoError(t, err)
// Verify client now has the expected prover root
clientProverRoot := clientHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(false)
t.Logf("Client prover root after sync: %x", clientProverRoot)
t.Logf("Expected prover root from frame: %x", expectedRoot)
assert.Equal(t, expectedRoot, clientProverRoot,
"client prover root should match frame's prover tree commitment after hypersync")
// Count vertices synced
clientTree := clientHG.GetVertexAddsSet(proverShardKey).GetTree()
clientLeaves := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientTree.Root,
)
clientLeafCount := 0
for _, leaf := range clientLeaves {
if leaf != nil {
clientLeafCount++
}
}
t.Logf("Hypersync complete: client synced %d prover vertices", clientLeafCount)
assert.Greater(t, clientLeafCount, 0, "should have synced at least some prover vertices")
}
// TestHypergraphSyncWithPagination tests that syncing a large tree with >1000 leaves
// correctly handles pagination through multiple GetLeaves requests.
func TestHypergraphSyncWithPagination(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// Create 1500 data trees to exceed the 1000 leaf batch size
numVertices := 1500
dataTrees := make([]*tries.VectorCommitmentTree, numVertices)
eg := errgroup.Group{}
eg.SetLimit(100)
for i := 0; i < numVertices; i++ {
eg.Go(func() error {
dataTrees[i] = buildDataTree(t, inclusionProver)
return nil
})
}
require.NoError(t, eg.Wait())
t.Log("Generated data trees")
// Create server DB and store
serverDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_server/store"}, 0)
defer serverDB.Close()
serverStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_server/store"},
serverDB,
logger,
enc,
inclusionProver,
)
serverHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "server")),
serverStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create client DB and store
clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_client/store"}, 0)
defer clientDB.Close()
clientStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_client/store"},
clientDB,
logger,
enc,
inclusionProver,
)
clientHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "client")),
clientStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create all vertices in a single domain
domain := randomBytes32(t)
vertices := make([]application.Vertex, numVertices)
for i := 0; i < numVertices; i++ {
vertices[i] = hgcrdt.NewVertex(
domain,
randomBytes32(t),
dataTrees[i].Commit(inclusionProver, false),
dataTrees[i].GetSize(),
)
}
shardKey := application.GetShardKey(vertices[0])
// Add all vertices to server
t.Logf("Adding %d vertices to server", numVertices)
serverTxn, err := serverStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range vertices {
id := v.GetID()
require.NoError(t, serverStore.SaveVertexTree(serverTxn, id[:], dataTrees[i]))
require.NoError(t, serverHG.AddVertex(serverTxn, v))
}
require.NoError(t, serverTxn.Commit())
// Add initial vertex to client (to establish same shard key)
clientTxn, err := clientStore.NewTransaction(false)
require.NoError(t, err)
id := vertices[0].GetID()
require.NoError(t, clientStore.SaveVertexTree(clientTxn, id[:], dataTrees[0]))
require.NoError(t, clientHG.AddVertex(clientTxn, vertices[0]))
require.NoError(t, clientTxn.Commit())
// Commit both
_, err = serverHG.Commit(1)
require.NoError(t, err)
_, err = clientHG.Commit(1)
require.NoError(t, err)
serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
serverHG.PublishSnapshot(serverRoot)
t.Logf("Server root: %x", serverRoot)
// Verify server has 1500 vertices
serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree()
serverLeaves := tries.GetAllLeaves(
serverTree.SetType,
serverTree.PhaseType,
serverTree.ShardKey,
serverTree.Root,
)
serverLeafCount := 0
for _, leaf := range serverLeaves {
if leaf != nil {
serverLeafCount++
}
}
assert.Equal(t, numVertices, serverLeafCount, "server should have %d leaves", numVertices)
t.Logf("Server has %d leaves", serverLeafCount)
// Setup gRPC server
const bufSize = 1 << 20
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
defer grpcServer.Stop()
go func() {
_ = grpcServer.Serve(lis)
}()
dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
),
)
require.NoError(t, err)
defer conn.Close()
client := protobufs.NewHypergraphComparisonServiceClient(conn)
// Perform sync
t.Log("Starting sync with pagination...")
stream, err := client.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG.SyncFrom(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
// Commit client and verify
_, err = clientHG.Commit(2)
require.NoError(t, err)
clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Client root after sync: %x", clientRoot)
// Verify client now has all 1500 vertices
clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
clientLeaves := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientTree.Root,
)
clientLeafCount := 0
for _, leaf := range clientLeaves {
if leaf != nil {
clientLeafCount++
}
}
assert.Equal(t, numVertices, clientLeafCount, "client should have %d leaves after sync", numVertices)
t.Logf("Client has %d leaves after sync", clientLeafCount)
// Verify roots match
assert.Equal(t, serverRoot, clientRoot, "client root should match server root after sync")
t.Log("Pagination test passed - client converged to server state")
}