ceremonyclient/node/rpc/hypergraph_sync_rpc_server_test.go
Cassandra Heart 12996487c3
v2.1.0.18 (#508)
* experiment: reject bad peer info messages

* v2.1.0.18 preview

* add tagged sync

* Add missing hypergraph changes

* small tweaks to sync

* allow local sync, use it for provers with workers

* missing file

* resolve build error

* resolve sync issue, remove raw sync

* resolve deletion promotion bug

* resolve sync abstraction leak from tree deletion changes

* rearrange prover sync

* remove pruning from sync

* restore removed sync flag

* fix: sync, event stream deadlock, heuristic scoring of better shards

* resolve hanging shutdown + pubsub proxy issue

* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events

* fix: clean up rust ffi, background coverage events, and sync tweaks

* fix: linking issue for channel, connectivity test aggression, sync regression, join tests

* fix: disjoint sync, improper application of filter

* resolve sync/reel/validation deadlock

* adjust sync to handle no leaf edge cases, multi-path segment traversal

* use simpler sync

* faster, simpler sync with some debug extras

* migration to recalculate

* don't use batch

* square up the roots

* fix nil pointer

* fix: seniority calculation, sync race condition, migration

* make sync dumber

* fix: tree deletion issue

* fix: missing seniority merge request canonical serialization

* address issues from previous commit test

* stale workers should be cleared

* remove missing gap check

* rearrange collect, reduce sync logging noise

* fix: the disjoint leaf/branch sync case

* nuclear option on sync failures

* v2.1.0.18, finalized
2026-02-08 23:51:51 -06:00

4703 lines
148 KiB
Go

package rpc_test
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"crypto/sha512"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"math/big"
"net"
"os"
"slices"
"strings"
"sync"
"testing"
"time"
"github.com/cloudflare/circl/sign/ed448"
"github.com/iden3/go-iden3-crypto/poseidon"
pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
"source.quilibrium.com/quilibrium/monorepo/bls48581"
"source.quilibrium.com/quilibrium/monorepo/config"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
internal_grpc "source.quilibrium.com/quilibrium/monorepo/node/internal/grpc"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/store"
"source.quilibrium.com/quilibrium/monorepo/node/tests"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/channel"
application "source.quilibrium.com/quilibrium/monorepo/types/hypergraph"
tp2p "source.quilibrium.com/quilibrium/monorepo/types/p2p"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
crypto "source.quilibrium.com/quilibrium/monorepo/types/tries"
"source.quilibrium.com/quilibrium/monorepo/verenc"
)
// serverStream wraps a grpc.ServerStream and overrides the context it reports,
// so a test interceptor can hand the handler a context of its own choosing.
type serverStream struct {
	grpc.ServerStream
	// ctx is the context returned by Context in place of the embedded stream's.
	ctx context.Context
}
// Context returns the context stored on the wrapper rather than the one
// carried by the embedded grpc.ServerStream.
func (s *serverStream) Context() context.Context {
	return s.ctx
}
// Operation describes a single hypergraph mutation used to drive the tests.
// Type selects the mutation; the tests in this file populate Vertex for vertex
// operations and Hyperedge for hyperedge operations.
type Operation struct {
	Type      string // "AddVertex", "RemoveVertex", "AddHyperedge", "RemoveHyperedge"
	Vertex    application.Vertex
	Hyperedge application.Hyperedge
}
// TestHypergraphSyncServer runs a vertex-adds sync between a server hypergraph
// and a client hypergraph over a TCP gRPC connection.
//
// Setup: the server receives every generated vertex, the client receives every
// third vertex plus 200 client-only "orphan" vertices in the same shard, and a
// control hypergraph receives every vertex without taking part in the sync.
// The client then syncs from the server twice and the test checks that
//   - every orphan vertex is gone from the client's vertex-adds set,
//   - every client leaf lies under the client tree's covered prefix (if set),
//   - the client root equals the server root, and
//   - the server root equals the control root.
//
// Failures are reported through t/require instead of log.Fatalf so that the
// deferred server/connection cleanup runs and only this test fails.
func TestHypergraphSyncServer(t *testing.T) {
	numParties := 3
	numOperations := 1000
	log.Printf("Generating data")
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	pub, _, _ := ed448.GenerateKey(rand.Reader)
	data1 := enc.Encrypt(make([]byte, 20), pub)
	verenc1 := data1[0].Compress()
	vertices1 := make([]application.Vertex, numOperations)
	dataTree1 := &crypto.VectorCommitmentTree{}
	logger, _ := zap.NewDevelopment()
	inclusionProver := bls48581.NewKZGInclusionProver(logger)
	for _, d := range []application.Encrypted{verenc1} {
		dataBytes := d.ToBytes()
		id := sha512.Sum512(dataBytes)
		dataTree1.Insert(id[:], dataBytes, d.GetStatement(), big.NewInt(int64(len(data1)*55)))
	}
	dataTree1.Commit(inclusionProver, false)
	for i := 0; i < numOperations; i++ {
		b := make([]byte, 32)
		rand.Read(b)
		vertices1[i] = hgcrdt.NewVertex(
			[32]byte{},
			[32]byte(b),
			dataTree1.Commit(inclusionProver, false),
			dataTree1.GetSize(),
		)
	}
	hyperedges := make([]application.Hyperedge, numOperations/10)
	for i := 0; i < numOperations/10; i++ {
		hyperedges[i] = hgcrdt.NewHyperedge(
			[32]byte{},
			[32]byte{0, 0, byte((i >> 8) / 256), byte(i / 256)},
		)
		for j := 0; j < 3; j++ {
			n, _ := rand.Int(rand.Reader, big.NewInt(int64(len(vertices1))))
			v := vertices1[n.Int64()]
			hyperedges[i].AddExtrinsic(v)
		}
	}
	shardKey := application.GetShardKey(vertices1[0])
	operations1 := make([]Operation, numOperations)
	operations2 := make([]Operation, numOperations)
	for i := 0; i < numOperations; i++ {
		operations1[i] = Operation{Type: "AddVertex", Vertex: vertices1[i]}
	}
	for i := 0; i < numOperations; i++ {
		op, _ := rand.Int(rand.Reader, big.NewInt(2))
		switch op.Int64() {
		case 0:
			e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
			operations2[i] = Operation{Type: "AddHyperedge", Hyperedge: hyperedges[e.Int64()]}
		case 1:
			e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
			operations2[i] = Operation{Type: "RemoveHyperedge", Hyperedge: hyperedges[e.Int64()]}
		}
	}
	clientKvdb := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}}, 0)
	serverKvdb := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}}, 0)
	controlKvdb := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestcontrol/store"}}, 0)
	clientHypergraphStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{Path: ".configtestclient/store"},
		clientKvdb,
		logger,
		enc,
		inclusionProver,
	)
	serverHypergraphStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{Path: ".configtestserver/store"},
		serverKvdb,
		logger,
		enc,
		inclusionProver,
	)
	controlHypergraphStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{Path: ".configtestcontrol/store"},
		controlKvdb,
		logger,
		enc,
		inclusionProver,
	)
	crdts := make([]application.Hypergraph, numParties)
	crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
	crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
	crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
	// Concrete handles for the calls that are not part of the Hypergraph
	// interface used above (GetVertexAddsSet, SyncFrom).
	serverHG := crdts[0].(*hgcrdt.HypergraphCRDT)
	clientHG := crdts[1].(*hgcrdt.HypergraphCRDT)
	controlHG := crdts[2].(*hgcrdt.HypergraphCRDT)
	servertxn, err := serverHypergraphStore.NewTransaction(false)
	require.NoError(t, err)
	clienttxn, err := clientHypergraphStore.NewTransaction(false)
	require.NoError(t, err)
	controltxn, err := controlHypergraphStore.NewTransaction(false)
	require.NoError(t, err)
	for i, op := range operations1 {
		switch op.Type {
		case "AddVertex":
			// The server gets every vertex.
			id := op.Vertex.GetID()
			serverHypergraphStore.SaveVertexTree(servertxn, id[:], dataTree1)
			crdts[0].AddVertex(servertxn, op.Vertex)
			// The client only starts with every third vertex.
			if i%3 == 0 {
				clientHypergraphStore.SaveVertexTree(clienttxn, id[:], dataTree1)
				crdts[1].AddVertex(clienttxn, op.Vertex)
			}
		case "RemoveVertex":
			crdts[0].RemoveVertex(nil, op.Vertex)
			// The AddHyperedge/RemoveHyperedge cases are disabled.
		}
	}
	servertxn.Commit()
	clienttxn.Commit()
	// Seed many orphan vertices that only exist on the client so pruning can
	// remove them. We create enough orphans with varied addresses to trigger
	// tree restructuring (node merges) when they get deleted during sync.
	// This tests the fix for the FullPrefix bug in lazy_proof_tree.go Delete().
	numOrphans := 200
	orphanVertices := make([]application.Vertex, numOrphans)
	orphanIDs := make([][64]byte, numOrphans)
	orphanTxn, err := clientHypergraphStore.NewTransaction(false)
	require.NoError(t, err)
	for i := 0; i < numOrphans; i++ {
		orphanData := make([]byte, 32)
		_, _ = rand.Read(orphanData)
		// Mix in the index to ensure varied distribution across tree branches
		binary.BigEndian.PutUint32(orphanData[28:], uint32(i))
		var orphanAddr [32]byte
		copy(orphanAddr[:], orphanData)
		orphanVertices[i] = hgcrdt.NewVertex(
			vertices1[0].GetAppAddress(),
			orphanAddr,
			dataTree1.Commit(inclusionProver, false),
			dataTree1.GetSize(),
		)
		orphanShard := application.GetShardKey(orphanVertices[i])
		require.Equal(t, shardKey, orphanShard, "orphan vertex %d must share shard", i)
		orphanIDs[i] = orphanVertices[i].GetID()
		require.NoError(t, clientHypergraphStore.SaveVertexTree(orphanTxn, orphanIDs[i][:], dataTree1))
		require.NoError(t, crdts[1].AddVertex(orphanTxn, orphanVertices[i]))
	}
	require.NoError(t, orphanTxn.Commit())
	clientSet := clientHG.GetVertexAddsSet(shardKey)
	for i := 0; i < numOrphans; i++ {
		require.True(t, clientSet.Has(orphanIDs[i]), "client must start with orphan leaf %d", i)
	}
	logger.Info("saved")
	for _, op := range operations1 {
		switch op.Type {
		case "AddVertex":
			crdts[2].AddVertex(controltxn, op.Vertex)
		case "RemoveVertex":
			crdts[2].RemoveVertex(controltxn, op.Vertex)
		}
	}
	// operations2 only holds hyperedge operations, whose cases are disabled, so
	// this loop currently applies nothing to the control hypergraph.
	for _, op := range operations2 {
		switch op.Type {
		case "AddVertex":
			crdts[2].AddVertex(controltxn, op.Vertex)
		case "RemoveVertex":
			crdts[2].RemoveVertex(controltxn, op.Vertex)
		}
	}
	controltxn.Commit()
	logger.Info("run commit server")
	crdts[0].Commit(0)
	logger.Info("run commit client")
	crdts[1].Commit(0)
	logger.Info("mark as complete")
	serverHypergraphStore.MarkHypergraphAsComplete()
	clientHypergraphStore.MarkHypergraphAsComplete()
	logger.Info("load server")
	log.Printf("Generated data")
	// Bind an ephemeral loopback port so this test cannot collide with another
	// test (or process) that is holding a fixed port.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err, "Server: failed to listen")
	grpcServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
		grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
		grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
			// Attach a freshly generated peer ID to every incoming stream.
			// Errors are returned to the RPC rather than calling t.FailNow,
			// which must only be called from the test goroutine.
			_, priv, _ := ed448.GenerateKey(rand.Reader)
			privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
			if err != nil {
				return err
			}
			pub := privKey.GetPublic()
			peerId, err := peer.IDFromPublicKey(pub)
			if err != nil {
				return err
			}
			return handler(srv, &serverStream{
				ServerStream: ss,
				ctx: internal_grpc.NewContextWithPeerID(
					ss.Context(),
					peerId,
				),
			})
		}),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(
		grpcServer,
		crdts[0],
	)
	defer grpcServer.Stop()
	log.Printf("Server listening on %s", lis.Addr())
	go func() {
		// A serve failure surfaces as a failed sync in the test goroutine, so
		// it is only logged here instead of killing the whole test binary.
		if err := grpcServer.Serve(lis); err != nil {
			log.Printf("Server: failed to serve: %v", err)
		}
	}()
	conn, err := grpc.DialContext(context.TODO(), lis.Addr().String(),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
			grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
		),
	)
	require.NoError(t, err, "Client: failed to listen")
	defer conn.Close()
	client := protobufs.NewHypergraphComparisonServiceClient(conn)
	str, err := client.PerformSync(context.TODO())
	require.NoError(t, err, "Client: failed to stream")
	_, err = clientHG.SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
	require.NoError(t, err, "Client: failed to sync 1")
	str.CloseSend()
	// Verify all orphan vertices were pruned after sync
	for i := 0; i < numOrphans; i++ {
		require.False(t, clientSet.Has(orphanIDs[i]), "orphan vertex %d should be pruned after sync", i)
	}
	leaves := crypto.CompareLeaves(
		serverHG.GetVertexAddsSet(shardKey).GetTree(),
		clientHG.GetVertexAddsSet(shardKey).GetTree(),
	)
	fmt.Println("pass completed, orphans:", len(leaves))
	// Ensure every leaf received during raw sync lies within the covered prefix path.
	clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
	coveredPrefixPath := clientTree.CoveredPrefix
	if len(coveredPrefixPath) == 0 {
		coveredPrefixPath = tries.GetFullPath(orphanIDs[0][:])[:0]
	}
	allLeaves := tries.GetAllLeaves(
		clientTree.SetType,
		clientTree.PhaseType,
		clientTree.ShardKey,
		clientTree.Root,
	)
	for _, leaf := range allLeaves {
		if leaf == nil {
			continue
		}
		if len(coveredPrefixPath) > 0 {
			require.True(
				t,
				isPrefix(coveredPrefixPath, tries.GetFullPath(leaf.Key)),
				"raw sync leaf outside covered prefix",
			)
		}
	}
	serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	str, err = client.PerformSync(context.TODO())
	require.NoError(t, err, "Client: failed to stream")
	_, err = clientHG.SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
	require.NoError(t, err, "Client: failed to sync 2")
	str.CloseSend()
	// Client must now match the server.
	serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	if !bytes.Equal(serverRoot, clientRoot) {
		leaves := crypto.CompareLeaves(
			serverHG.GetVertexAddsSet(shardKey).GetTree(),
			clientHG.GetVertexAddsSet(shardKey).GetTree(),
		)
		fmt.Println("remaining orphans", len(leaves))
		t.Fatalf("trees mismatch: %v %v", serverRoot, clientRoot)
	}
	// Server must match the control hypergraph that never took part in sync.
	controlRoot := controlHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	if !bytes.Equal(serverRoot, controlRoot) {
		t.Fatalf("trees did not converge to correct state: %v %v", serverRoot, controlRoot)
	}
}
// TestHypergraphPartialSync syncs a client that was constructed with a
// coverage prefix against a server holding all generated vertices.
//
// The server receives every vertex; the prefix is taken from the first 44 path
// elements of the ID of the vertex at index 500 and handed to the client
// hypergraph at construction. After two sync passes the test requires that the
// server tree can resolve that prefix and asserts that the client store holds
// fewer than 40 vertex data entries.
//
// Failures are reported through t/require instead of log.Fatalf/panic, and the
// gRPC server and client connection are released when the test returns.
func TestHypergraphPartialSync(t *testing.T) {
	numParties := 3
	numOperations := 1000
	log.Printf("Generating data")
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	pub, _, _ := ed448.GenerateKey(rand.Reader)
	data1 := enc.Encrypt(make([]byte, 20), pub)
	verenc1 := data1[0].Compress()
	vertices1 := make([]application.Vertex, numOperations)
	dataTree1 := &crypto.VectorCommitmentTree{}
	logger, _ := zap.NewDevelopment()
	inclusionProver := bls48581.NewKZGInclusionProver(logger)
	domain := make([]byte, 32)
	rand.Read(domain)
	domainbi, _ := poseidon.HashBytes(domain)
	domain = domainbi.FillBytes(make([]byte, 32))
	for _, d := range []application.Encrypted{verenc1} {
		dataBytes := d.ToBytes()
		id := sha512.Sum512(dataBytes)
		dataTree1.Insert(id[:], dataBytes, d.GetStatement(), big.NewInt(int64(len(data1)*55)))
	}
	dataTree1.Commit(inclusionProver, false)
	for i := 0; i < numOperations; i++ {
		b := make([]byte, 32)
		rand.Read(b)
		addr, _ := poseidon.HashBytes(b)
		vertices1[i] = hgcrdt.NewVertex(
			[32]byte(domain),
			[32]byte(addr.FillBytes(make([]byte, 32))),
			dataTree1.Commit(inclusionProver, false),
			dataTree1.GetSize(),
		)
	}
	hyperedges := make([]application.Hyperedge, numOperations/10)
	for i := 0; i < numOperations/10; i++ {
		hyperedges[i] = hgcrdt.NewHyperedge(
			[32]byte(domain),
			[32]byte{0, 0, byte((i >> 8) / 256), byte(i / 256)},
		)
		for j := 0; j < 3; j++ {
			n, _ := rand.Int(rand.Reader, big.NewInt(int64(len(vertices1))))
			v := vertices1[n.Int64()]
			hyperedges[i].AddExtrinsic(v)
		}
	}
	shardKey := application.GetShardKey(vertices1[0])
	operations1 := make([]Operation, numOperations)
	operations2 := make([]Operation, numOperations)
	for i := 0; i < numOperations; i++ {
		operations1[i] = Operation{Type: "AddVertex", Vertex: vertices1[i]}
	}
	for i := 0; i < numOperations; i++ {
		op, _ := rand.Int(rand.Reader, big.NewInt(2))
		switch op.Int64() {
		case 0:
			e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
			operations2[i] = Operation{Type: "AddHyperedge", Hyperedge: hyperedges[e.Int64()]}
		case 1:
			e, _ := rand.Int(rand.Reader, big.NewInt(int64(len(hyperedges))))
			operations2[i] = Operation{Type: "RemoveHyperedge", Hyperedge: hyperedges[e.Int64()]}
		}
	}
	clientKvdb := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}}, 0)
	serverKvdb := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}}, 0)
	controlKvdb := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestcontrol/store"}}, 0)
	clientHypergraphStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{Path: ".configtestclient/store"},
		clientKvdb,
		logger,
		enc,
		inclusionProver,
	)
	serverHypergraphStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{Path: ".configtestserver/store"},
		serverKvdb,
		logger,
		enc,
		inclusionProver,
	)
	controlHypergraphStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{Path: ".configtestcontrol/store"},
		controlKvdb,
		logger,
		enc,
		inclusionProver,
	)
	crdts := make([]application.Hypergraph, numParties)
	crdts[0] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "server")), serverHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
	crdts[2] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "control")), controlHypergraphStore, inclusionProver, []int{}, &tests.Nopthenticator{}, 200)
	servertxn, err := serverHypergraphStore.NewTransaction(false)
	require.NoError(t, err)
	controltxn, err := controlHypergraphStore.NewTransaction(false)
	require.NoError(t, err)
	branchfork := []int32{}
	for i, op := range operations1 {
		switch op.Type {
		case "AddVertex":
			id := op.Vertex.GetID()
			serverHypergraphStore.SaveVertexTree(servertxn, id[:], dataTree1)
			crdts[0].AddVertex(servertxn, op.Vertex)
			if i == 500 {
				// Grab the first path of the data address, should get 1/64th ish
				branchfork = GetFullPath(id[:])[:44]
			}
		case "RemoveVertex":
			crdts[0].RemoveVertex(nil, op.Vertex)
			// The AddHyperedge/RemoveHyperedge cases are disabled.
		}
	}
	servertxn.Commit()
	// The client is created only now because its constructor takes the prefix
	// chosen while seeding the server.
	crdts[1] = hgcrdt.NewHypergraph(logger.With(zap.String("side", "client")), clientHypergraphStore, inclusionProver, toIntSlice(toUint32Slice(branchfork)), &tests.Nopthenticator{}, 200)
	serverHG := crdts[0].(*hgcrdt.HypergraphCRDT)
	clientHG := crdts[1].(*hgcrdt.HypergraphCRDT)
	logger.Info("saved")
	for _, op := range operations1 {
		switch op.Type {
		case "AddVertex":
			crdts[2].AddVertex(controltxn, op.Vertex)
		case "RemoveVertex":
			crdts[2].RemoveVertex(controltxn, op.Vertex)
		}
	}
	// operations2 only holds hyperedge operations, whose cases are disabled, so
	// this loop currently applies nothing to the control hypergraph.
	for _, op := range operations2 {
		switch op.Type {
		case "AddVertex":
			crdts[2].AddVertex(controltxn, op.Vertex)
		case "RemoveVertex":
			crdts[2].RemoveVertex(controltxn, op.Vertex)
		}
	}
	controltxn.Commit()
	logger.Info("run commit server")
	crdts[0].Commit(1)
	logger.Info("run commit client")
	crdts[1].Commit(1)
	logger.Info("load server")
	log.Printf("Generated data")
	// Bind an ephemeral loopback port so this test cannot collide with another
	// test (or process) that is holding a fixed port.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err, "Server: failed to listen")
	grpcServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
		grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
		grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
			// Attach a freshly generated peer ID to every incoming stream.
			// Errors are returned to the RPC rather than calling t.FailNow,
			// which must only be called from the test goroutine.
			_, priv, _ := ed448.GenerateKey(rand.Reader)
			privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
			if err != nil {
				return err
			}
			pub := privKey.GetPublic()
			peerId, err := peer.IDFromPublicKey(pub)
			if err != nil {
				return err
			}
			return handler(srv, &serverStream{
				ServerStream: ss,
				ctx: internal_grpc.NewContextWithPeerID(
					ss.Context(),
					peerId,
				),
			})
		}),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(
		grpcServer,
		crdts[0],
	)
	// Stop the server (and close its listener) when the test returns.
	defer grpcServer.Stop()
	log.Printf("Server listening on %s", lis.Addr())
	go func() {
		// A serve failure surfaces as a failed sync in the test goroutine, so
		// it is only logged here instead of killing the whole test binary.
		if err := grpcServer.Serve(lis); err != nil {
			log.Printf("Server: failed to serve: %v", err)
		}
	}()
	conn, err := grpc.DialContext(context.TODO(), lis.Addr().String(),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
			grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
		),
	)
	require.NoError(t, err, "Client: failed to listen")
	defer conn.Close()
	client := protobufs.NewHypergraphComparisonServiceClient(conn)
	str, err := client.PerformSync(context.TODO())
	require.NoError(t, err, "Client: failed to stream")
	_, err = clientHG.SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
	require.NoError(t, err, "Client: failed to sync 1")
	str.CloseSend()
	leaves := crypto.CompareLeaves(
		serverHG.GetVertexAddsSet(shardKey).GetTree(),
		clientHG.GetVertexAddsSet(shardKey).GetTree(),
	)
	fmt.Println("pass completed, orphans:", len(leaves))
	serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	str, err = client.PerformSync(context.TODO())
	require.NoError(t, err, "Client: failed to stream")
	_, err = clientHG.SyncFrom(str, shardKey, protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, nil)
	require.NoError(t, err, "Client: failed to sync 2")
	str.CloseSend()
	serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	desc, err := serverHG.GetVertexAddsSet(shardKey).GetTree().GetByPath(toIntSlice(toUint32Slice(branchfork)))
	require.NoError(t, err)
	// A differing commitment is only reported, not treated as a failure.
	if !bytes.Equal(
		desc.(*crypto.LazyVectorCommitmentBranchNode).Commitment,
		clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false),
	) {
		leaves := crypto.CompareLeaves(
			serverHG.GetVertexAddsSet(shardKey).GetTree(),
			clientHG.GetVertexAddsSet(shardKey).GetTree(),
		)
		fmt.Println("remaining orphans", len(leaves))
	}
	clientHas := 0
	iter, err := clientHypergraphStore.GetVertexDataIterator(shardKey)
	require.NoError(t, err)
	for iter.First(); iter.Valid(); iter.Next() {
		clientHas++
	}
	// Assume variable distribution, but roughly triple is a safe guess. If it fails, just bump it.
	assert.Greater(t, 40, clientHas, "mismatching vertex data entries")
}
// TestHypergraphSyncWithConcurrentCommits syncs eight clients against a server
// while the server is simultaneously adding vertices and committing.
//
// Flow:
//  1. Build 1000 data trees in parallel and seed the server and every client
//     with one shared baseline vertex.
//  2. For three rounds, start one sync goroutine per client and, while they
//     run, add 15 new vertices to the server and commit it. Sync errors in
//     these rounds are ignored.
//  3. Add the remaining vertices to the server, commit, publish the snapshot,
//     and require that client 0 converges to the server root after a final
//     sync.
//
// Anything that can call FailNow (require.*, addVertices) runs on the test
// goroutine; spawned goroutines and the gRPC interceptor report problems with
// assert or by returning an error.
func TestHypergraphSyncWithConcurrentCommits(t *testing.T) {
	logger, _ := zap.NewDevelopment()
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	inclusionProver := bls48581.NewKZGInclusionProver(logger)
	logDuration := func(step string, start time.Time) {
		t.Logf("%s took %s", step, time.Since(start))
	}
	start := time.Now()
	dataTrees := make([]*tries.VectorCommitmentTree, 1000)
	eg := errgroup.Group{}
	eg.SetLimit(1000)
	for i := 0; i < 1000; i++ {
		eg.Go(func() error {
			dataTrees[i] = buildDataTree(t, inclusionProver)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	logDuration("generated data trees", start)
	setupStart := time.Now()
	serverDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}}, 0)
	defer serverDB.Close()
	serverStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"},
		serverDB,
		logger,
		enc,
		inclusionProver,
	)
	logDuration("server DB/store initialization", setupStart)
	const clientCount = 8
	clientDBs := make([]*store.PebbleDB, clientCount)
	clientStores := make([]*store.PebbleHypergraphStore, clientCount)
	clientHGs := make([]*hgcrdt.HypergraphCRDT, clientCount)
	serverHypergraphStart := time.Now()
	serverHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "server")),
		serverStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)
	logDuration("server hypergraph initialization", serverHypergraphStart)
	clientSetupStart := time.Now()
	for i := 0; i < clientCount; i++ {
		clientDBs[i] = store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", i)}}, 0)
		clientStores[i] = store.NewPebbleHypergraphStore(
			&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", i)},
			clientDBs[i],
			logger,
			enc,
			inclusionProver,
		)
		clientHGs[i] = hgcrdt.NewHypergraph(
			logger.With(zap.String("side", fmt.Sprintf("client-%d", i))),
			clientStores[i],
			inclusionProver,
			[]int{},
			&tests.Nopthenticator{},
			200,
		)
	}
	logDuration("client hypergraph initialization", clientSetupStart)
	defer func() {
		for _, db := range clientDBs {
			if db != nil {
				db.Close()
			}
		}
	}()
	// Seed both hypergraphs with a baseline vertex so they share the shard key.
	domain := randomBytes32(t)
	initialVertex := hgcrdt.NewVertex(
		domain,
		randomBytes32(t),
		dataTrees[0].Commit(inclusionProver, false),
		dataTrees[0].GetSize(),
	)
	seedStart := time.Now()
	addVertices(
		t,
		serverStore,
		serverHG,
		dataTrees[:1],
		initialVertex,
	)
	logDuration("seed server baseline vertex", seedStart)
	for i := 0; i < clientCount; i++ {
		start := time.Now()
		addVertices(
			t,
			clientStores[i],
			clientHGs[i],
			dataTrees[:1],
			initialVertex,
		)
		logDuration(fmt.Sprintf("seed client-%d baseline vertex", i), start)
	}
	shardKey := application.GetShardKey(initialVertex)
	// Start gRPC server backed by the server hypergraph.
	const bufSize = 1 << 20
	lis := bufconn.Listen(bufSize)
	grpcServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
		grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
		grpc.ChainStreamInterceptor(func(
			srv interface{},
			ss grpc.ServerStream,
			info *grpc.StreamServerInfo,
			handler grpc.StreamHandler,
		) error {
			// Attach a freshly generated peer ID to every incoming stream.
			// This runs on a gRPC goroutine, so errors are returned to the
			// RPC instead of going through require (FailNow).
			_, priv, _ := ed448.GenerateKey(rand.Reader)
			privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
			if err != nil {
				return err
			}
			pub := privKey.GetPublic()
			peerID, err := peer.IDFromPublicKey(pub)
			if err != nil {
				return err
			}
			return handler(srv, &serverStream{
				ServerStream: ss,
				ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
			})
		}),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(
		grpcServer,
		serverHG,
	)
	defer grpcServer.Stop()
	go func() {
		_ = grpcServer.Serve(lis)
	}()
	// dialClient opens a new in-memory connection to the server. It returns
	// the error instead of asserting so it is safe to call from any goroutine.
	dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient, error) {
		dialer := func(context.Context, string) (net.Conn, error) {
			return lis.Dial()
		}
		conn, err := grpc.DialContext(
			context.Background(),
			"bufnet",
			grpc.WithContextDialer(dialer),
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
				grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
			),
		)
		if err != nil {
			return nil, nil, err
		}
		return conn, protobufs.NewHypergraphComparisonServiceClient(conn), nil
	}
	// Publish initial snapshot so clients can sync during the rounds
	initialRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	serverHG.PublishSnapshot(initialRoot)
	const rounds = 3
	for round := 0; round < rounds; round++ {
		currentRound := round
		roundStart := time.Now()
		c, _ := serverHG.Commit(uint64(currentRound))
		fmt.Printf("svr commitment: %x\n", c[shardKey][0])
		genStart := time.Now()
		updates := generateVertices(
			t,
			domain,
			dataTrees,
			inclusionProver,
			15,
			1+(15*currentRound),
		)
		logDuration(fmt.Sprintf("round %d vertex generation", currentRound), genStart)
		var syncWG sync.WaitGroup
		syncWG.Add(clientCount)
		for clientIdx := 0; clientIdx < clientCount; clientIdx++ {
			go func(idx int, round int) {
				defer syncWG.Done()
				clientSyncStart := time.Now()
				clientHG := clientHGs[idx]
				conn, client, err := dialClient()
				if !assert.NoError(t, err) {
					return
				}
				streamCtx, cancelStream := context.WithTimeout(
					context.Background(),
					100*time.Second,
				)
				stream, err := client.PerformSync(streamCtx)
				if !assert.NoError(t, err) {
					cancelStream()
					conn.Close()
					return
				}
				// The sync races with the server's updates, so its result is
				// deliberately not checked in these rounds.
				_, _ = clientHG.SyncFrom(
					stream,
					shardKey,
					protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
					nil,
				)
				assert.NoError(t, stream.CloseSend())
				cancelStream()
				conn.Close()
				c, _ := clientHGs[idx].Commit(uint64(round))
				fmt.Printf("cli commitment: %x\n", c[shardKey][0])
				logDuration(fmt.Sprintf("round %d client-%d sync", round, idx), clientSyncStart)
			}(clientIdx, currentRound)
		}
		// Apply the server-side updates on the test goroutine while the client
		// goroutines are syncing, so addVertices/require may call FailNow.
		func(round int) {
			// Wait for the clients even if a require below aborts the test, so
			// no client goroutine logs after the test has finished.
			defer syncWG.Wait()
			serverRoundStart := time.Now()
			logger.Info("server applying concurrent updates", zap.Int("round", round))
			addVertices(t, serverStore, serverHG, dataTrees[1+(15*round):1+(15*(round+1))], updates...)
			logger.Info(
				"server applied concurrent updates",
				zap.Int("round", round),
				zap.Duration("duration", time.Since(serverRoundStart)),
			)
			logger.Info("server commit starting", zap.Int("round", round))
			_, err := serverHG.Commit(uint64(round + 1))
			require.NoError(t, err)
			logger.Info("server commit finished", zap.Int("round", round))
		}(round)
		logDuration(fmt.Sprintf("round %d total sync", currentRound), roundStart)
	}
	// Add additional server-only updates after the concurrent sync rounds.
	extraStart := time.Now()
	extraUpdates := generateVertices(t, domain, dataTrees, inclusionProver, len(dataTrees)-(1+(15*rounds))-1, 1+(15*rounds))
	addVertices(t, serverStore, serverHG, dataTrees[1+(15*rounds):], extraUpdates...)
	logDuration("server extra updates application", extraStart)
	commitStart := time.Now()
	_, err := serverHG.Commit(100)
	require.NoError(t, err)
	_, err = serverHG.Commit(101)
	require.NoError(t, err)
	logDuration("server final commits", commitStart)
	serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	// Publish the server's snapshot so clients can sync against this exact state
	serverHG.PublishSnapshot(serverRoot)
	// Create a snapshot handle for this shard by doing a sync.
	// This is needed because the snapshot manager only creates handles when acquire
	// is called.
	{
		conn, client, err := dialClient()
		require.NoError(t, err)
		stream, err := client.PerformSync(context.Background())
		require.NoError(t, err)
		_, _ = clientHGs[0].SyncFrom(
			stream,
			shardKey,
			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
			nil,
		)
		_ = stream.CloseSend()
		conn.Close()
	}
	// Final catch-up for client 0 only. It runs on the test goroutine; the
	// original ran it in a single goroutine that was waited on immediately.
	for i := 0; i < 1; i++ {
		func(idx int) {
			catchUpStart := time.Now()
			_, err := clientHGs[idx].Commit(100)
			require.NoError(t, err)
			// Final sync to catch up.
			conn, client, err := dialClient()
			require.NoError(t, err)
			// Safety net for the failure paths; the explicit calls below keep
			// the original release order on success.
			defer conn.Close()
			streamCtx, cancelStream := context.WithTimeout(
				context.Background(),
				100*time.Second,
			)
			defer cancelStream()
			stream, err := client.PerformSync(streamCtx)
			require.NoError(t, err)
			_, err = clientHGs[idx].SyncFrom(
				stream,
				shardKey,
				protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
				nil,
			)
			require.NoError(t, err)
			require.NoError(t, stream.CloseSend())
			cancelStream()
			conn.Close()
			_, err = clientHGs[idx].Commit(101)
			require.NoError(t, err)
			clientRoot := clientHGs[idx].GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
			assert.Equal(t, serverRoot, clientRoot, "client should converge to server state")
			logDuration(fmt.Sprintf("client-%d final catch-up", idx), catchUpStart)
		}(i)
	}
}
// buildDataTree builds a vector commitment tree from 20000 random bytes.
//
// The random payload is split into 64-byte chunks (the last chunk is shorter
// because 20000 is not a multiple of 64). Each chunk is inserted under the
// SHA-512 digest of its contents, with the chunk length as its size, and the
// tree is committed with prover before being returned.
func buildDataTree(
	t *testing.T,
	prover *bls48581.KZGInclusionProver,
) *crypto.VectorCommitmentTree {
	t.Helper()

	const (
		payloadSize = 20000
		chunkSize   = 64
	)

	payload := make([]byte, payloadSize)
	// The result of rand.Read is not inspected here.
	rand.Read(payload)

	result := &crypto.VectorCommitmentTree{}
	for chunk := range slices.Chunk(payload, chunkSize) {
		key := sha512.Sum512(chunk)
		size := big.NewInt(int64(len(chunk)))
		result.Insert(key[:], chunk, nil, size)
	}

	// Commit so the tree's commitment is computed before callers use it.
	result.Commit(prover, false)
	return result
}
// addVertices saves each vertex's data tree and adds the vertex to hg inside a
// single store transaction, which is committed once all vertices are written.
//
// dataTrees is indexed in parallel with vertices: dataTrees[i] is stored under
// the ID of vertices[i], so it must contain at least len(vertices) entries.
// Any error stops the test through require.
func addVertices(
	t *testing.T,
	hStore *store.PebbleHypergraphStore,
	hg *hgcrdt.HypergraphCRDT,
	dataTrees []*crypto.VectorCommitmentTree,
	vertices ...application.Vertex,
) {
	t.Helper()

	txn, err := hStore.NewTransaction(false)
	require.NoError(t, err)

	for idx := range vertices {
		vertex := vertices[idx]
		vertexID := vertex.GetID()

		err = hStore.SaveVertexTree(txn, vertexID[:], dataTrees[idx])
		require.NoError(t, err)

		err = hg.AddVertex(txn, vertex)
		require.NoError(t, err)
	}

	err = txn.Commit()
	require.NoError(t, err)
}
// generateVertices creates count vertices under appAddress.
//
// Vertex i gets a random 32-byte address whose first eight bytes are then
// overwritten with i in big-endian form, and takes its commitment and size
// from dataTrees[startingIndex+i]. dataTrees must therefore hold at least
// startingIndex+count entries.
func generateVertices(
	t *testing.T,
	appAddress [32]byte,
	dataTrees []*crypto.VectorCommitmentTree,
	prover *bls48581.KZGInclusionProver,
	count int,
	startingIndex int,
) []application.Vertex {
	t.Helper()

	generated := make([]application.Vertex, count)
	for n := range generated {
		address := randomBytes32(t)
		binary.BigEndian.PutUint64(address[:], uint64(n))

		tree := dataTrees[startingIndex+n]
		// Commit before reading the size, matching the order the values are
		// passed to NewVertex.
		commitment := tree.Commit(prover, false)
		size := tree.GetSize()

		generated[n] = hgcrdt.NewVertex(appAddress, address, commitment, size)
	}
	return generated
}
// randomBytes32 returns 32 bytes read from crypto/rand, failing the test if
// the read reports an error.
func randomBytes32(t *testing.T) [32]byte {
	t.Helper()

	buf := [32]byte{}
	_, readErr := rand.Read(buf[:])
	require.NoError(t, readErr)
	return buf
}
// toUint32Slice converts each int32 in s to uint32 with a plain Go conversion
// (negative values wrap). The result is always non-nil, even for empty input.
func toUint32Slice(s []int32) []uint32 {
	converted := make([]uint32, len(s))
	for i, v := range s {
		converted[i] = uint32(v)
	}
	return converted
}
// toIntSlice converts each uint32 in s to int. The result is always non-nil,
// even for empty input.
func toIntSlice(s []uint32) []int {
	converted := make([]int, len(s))
	for i, v := range s {
		converted[i] = int(v)
	}
	return converted
}
// isPrefix reports whether path begins with every element of prefix, in
// order. An empty prefix is a prefix of any path.
func isPrefix(prefix []int, path []int) bool {
	// A prefix longer than the path can never match.
	if len(path) < len(prefix) {
		return false
	}
	for idx, want := range prefix {
		if path[idx] != want {
			return false
		}
	}
	return true
}
func GetFullPath(key []byte) []int32 {
var nibbles []int32
depth := 0
for {
n1 := getNextNibble(key, depth)
if n1 == -1 {
break
}
nibbles = append(nibbles, n1)
depth += tries.BranchBits
}
return nibbles
}
// getNextNibble returns the tries.BranchBits-wide value that starts at bit
// offset pos in key, reading bits most-significant first.
//
// It returns -1 when pos lies at or beyond the end of key. When the value
// straddles a byte boundary and there is no following byte, the missing low
// bits are left as zero.
func getNextNibble(key []byte, pos int) int32 {
	byteIdx, bitOffset := pos/8, pos%8
	if byteIdx >= len(key) {
		return -1
	}

	// Keep only the bits of the current byte at or after bitOffset.
	available := 8 - bitOffset
	chunk := int(key[byteIdx] & ((1 << available) - 1))

	if available >= tries.BranchBits {
		// Everything we need is in this byte: drop the surplus low bits.
		surplus := available - tries.BranchBits
		return int32((chunk >> surplus) & tries.BranchMask)
	}

	// Not enough bits here: make room for the ones taken from the next byte.
	missing := tries.BranchBits - available
	chunk <<= missing
	if byteIdx+1 < len(key) {
		following := int(key[byteIdx+1])
		chunk |= following >> (8 - missing)
	}
	return int32(chunk & tries.BranchMask)
}
// TestHypergraphSyncWithExpectedRoot exercises syncing against a server whose
// published snapshot advances from root1 to root2.
//
// A throwaway client first syncs while root1 is the published snapshot and is
// asserted to reach root1. The server then adds more vertices, commits, and
// publishes root2; fresh clients syncing afterwards are asserted to reach
// root2.
//
// NOTE(review): every SyncFrom call below passes nil as its final argument, so
// no explicit expected root appears to be supplied by the client, and the
// "multiple syncs converge" subtest performs a single sync. Confirm whether
// that matches the intent suggested by the test and subtest names.
func TestHypergraphSyncWithExpectedRoot(t *testing.T) {
	logger, err := zap.NewDevelopment()
	require.NoError(t, err)
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	inclusionProver := bls48581.NewKZGInclusionProver(logger)

	// Create data trees for vertices
	dataTrees := make([]*tries.VectorCommitmentTree, 100)
	for i := 0; i < 100; i++ {
		dataTrees[i] = buildDataTree(t, inclusionProver)
	}

	serverDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}}, 0)
	defer serverDB.Close()

	serverStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"},
		serverDB,
		logger,
		enc,
		inclusionProver,
	)
	serverHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "server")),
		serverStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)

	// Create initial vertex to establish shard key
	domain := randomBytes32(t)
	initialVertex := hgcrdt.NewVertex(
		domain,
		randomBytes32(t),
		dataTrees[0].Commit(inclusionProver, false),
		dataTrees[0].GetSize(),
	)
	shardKey := application.GetShardKey(initialVertex)

	// Phase 1: Add initial vertices to server and commit
	phase1Vertices := make([]application.Vertex, 20)
	phase1Vertices[0] = initialVertex
	for i := 1; i < 20; i++ {
		phase1Vertices[i] = hgcrdt.NewVertex(
			domain,
			randomBytes32(t),
			dataTrees[i].Commit(inclusionProver, false),
			dataTrees[i].GetSize(),
		)
	}
	addVertices(t, serverStore, serverHG, dataTrees[:20], phase1Vertices...)

	// Commit to get root1
	commitResult1, err := serverHG.Commit(1)
	require.NoError(t, err)
	root1 := commitResult1[shardKey][0]
	t.Logf("Root after phase 1: %x", root1)

	// Publish root1 as the current snapshot generation
	serverHG.PublishSnapshot(root1)

	// Start gRPC server early so we can create a snapshot while root1 is current
	const bufSize = 1 << 20
	lis := bufconn.Listen(bufSize)
	grpcServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
		grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
		grpc.ChainStreamInterceptor(func(
			srv interface{},
			ss grpc.ServerStream,
			info *grpc.StreamServerInfo,
			handler grpc.StreamHandler,
		) error {
			// This runs on a gRPC server goroutine, where require/FailNow must
			// not be called. Errors are returned so the stream fails and the
			// client-side assertions report the problem from the test
			// goroutine.
			_, priv, err := ed448.GenerateKey(rand.Reader)
			if err != nil {
				return err
			}
			privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
			if err != nil {
				return err
			}
			peerID, err := peer.IDFromPublicKey(privKey.GetPublic())
			if err != nil {
				return err
			}

			// Attach a freshly generated peer ID to the stream context.
			return handler(srv, &serverStream{
				ServerStream: ss,
				ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
			})
		}),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
	defer grpcServer.Stop()

	go func() {
		_ = grpcServer.Serve(lis)
	}()

	dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
		dialer := func(context.Context, string) (net.Conn, error) {
			return lis.Dial()
		}
		conn, err := grpc.DialContext(
			context.Background(),
			"bufnet",
			grpc.WithContextDialer(dialer),
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
				grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
			),
		)
		require.NoError(t, err)
		return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
	}

	// Helper to create a fresh client hypergraph
	clientCounter := 0
	createClient := func(name string) (*store.PebbleDB, *hgcrdt.HypergraphCRDT) {
		clientCounter++
		clientDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", clientCounter)}}, 0)
		clientStore := store.NewPebbleHypergraphStore(
			&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestclient%d/store", clientCounter)},
			clientDB,
			logger,
			enc,
			inclusionProver,
		)
		clientHG := hgcrdt.NewHypergraph(
			logger.With(zap.String("side", name)),
			clientStore,
			inclusionProver,
			[]int{},
			&tests.Nopthenticator{},
			200,
		)
		return clientDB, clientHG
	}

	// IMPORTANT: Create a snapshot while root1 is current by doing a sync now.
	// This snapshot will be preserved when we later publish root2.
	t.Log("Creating snapshot for root1 by syncing a client while root1 is current")
	{
		clientDB, clientHG := createClient("client-snapshot-root1")
		conn, client := dialClient()
		stream, err := client.PerformSync(context.Background())
		require.NoError(t, err)
		_, err = clientHG.SyncFrom(
			stream,
			shardKey,
			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
			nil,
		)
		require.NoError(t, err)
		require.NoError(t, stream.CloseSend())

		// Verify this client got root1
		clientCommit, err := clientHG.Commit(1)
		require.NoError(t, err)
		require.Equal(t, root1, clientCommit[shardKey][0], "snapshot client should have root1")

		conn.Close()
		clientDB.Close()
	}

	// Phase 2: Add more vertices to server and commit
	phase2Vertices := make([]application.Vertex, 30)
	for i := 0; i < 30; i++ {
		phase2Vertices[i] = hgcrdt.NewVertex(
			domain,
			randomBytes32(t),
			dataTrees[20+i].Commit(inclusionProver, false),
			dataTrees[20+i].GetSize(),
		)
	}
	addVertices(t, serverStore, serverHG, dataTrees[20:50], phase2Vertices...)

	// Commit to get root2
	commitResult2, err := serverHG.Commit(2)
	require.NoError(t, err)
	root2 := commitResult2[shardKey][0]
	t.Logf("Root after phase 2: %x", root2)

	// Publish root2 as the new current snapshot generation
	// This preserves the root1 generation (with its snapshot) as a historical generation
	serverHG.PublishSnapshot(root2)

	// Verify roots are different
	require.NotEqual(t, root1, root2, "roots should be different after adding more data")

	// Test 1: Sync gets latest state
	t.Run("sync gets latest", func(t *testing.T) {
		clientDB, clientHG := createClient("client1")
		defer clientDB.Close()

		conn, client := dialClient()
		defer conn.Close()

		stream, err := client.PerformSync(context.Background())
		require.NoError(t, err)
		_, err = clientHG.SyncFrom(
			stream,
			shardKey,
			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
			nil,
		)
		require.NoError(t, err)
		require.NoError(t, stream.CloseSend())

		// Commit client to get comparable root
		clientCommit, err := clientHG.Commit(1)
		require.NoError(t, err)
		clientRoot := clientCommit[shardKey][0]

		// Client should have synced to the latest (root2)
		assert.Equal(t, root2, clientRoot, "client should sync to latest root")
	})

	// Test 2: A second, independent fresh client also reaches root2.
	// NOTE(review): this subtest currently performs one sync, identical to
	// "sync gets latest"; confirm whether repeated syncs were intended.
	t.Run("multiple syncs converge", func(t *testing.T) {
		clientDB, clientHG := createClient("client2")
		defer clientDB.Close()

		conn, client := dialClient()
		defer conn.Close()

		stream, err := client.PerformSync(context.Background())
		require.NoError(t, err)
		_, err = clientHG.SyncFrom(
			stream,
			shardKey,
			protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
			nil,
		)
		require.NoError(t, err)
		require.NoError(t, stream.CloseSend())

		// Commit client to get comparable root
		clientCommit, err := clientHG.Commit(1)
		require.NoError(t, err)
		clientRoot := clientCommit[shardKey][0]

		// Client should have synced to the latest (root2)
		assert.Equal(t, root2, clientRoot, "client should sync to latest root")
	})
}
// TestHypergraphSyncWithModifiedEntries tests sync behavior when both client
// and server have the same keys but with different values (modified entries).
// This verifies that sync correctly updates entries rather than just adding
// new ones or deleting orphans.
//
// The client and server are each loaded with 50 vertices that share the same
// domain and addresses but are built from different data trees. After the
// client syncs from the server, the client's vertex-adds root must equal the
// server's and a leaf comparison of the two trees must be empty.
func TestHypergraphSyncWithModifiedEntries(t *testing.T) {
	logger, err := zap.NewDevelopment()
	require.NoError(t, err)
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	inclusionProver := bls48581.NewKZGInclusionProver(logger)

	// Create enough data trees for all vertices we'll need
	numVertices := 50
	dataTrees := make([]*tries.VectorCommitmentTree, numVertices*2) // Extra for modified versions
	for i := 0; i < len(dataTrees); i++ {
		dataTrees[i] = buildDataTree(t, inclusionProver)
	}

	// Create server and client databases
	serverDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}}, 0)
	defer serverDB.Close()
	clientDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}}, 0)
	defer clientDB.Close()

	serverStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"},
		serverDB,
		logger,
		enc,
		inclusionProver,
	)
	clientStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"},
		clientDB,
		logger,
		enc,
		inclusionProver,
	)

	serverHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "server")),
		serverStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)
	clientHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "client")),
		clientStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)

	// Create a shared domain for all vertices
	domain := randomBytes32(t)

	// Generate fixed addresses that will be used by both client and server
	// This ensures they share the same keys
	addresses := make([][32]byte, numVertices)
	for i := 0; i < numVertices; i++ {
		addresses[i] = randomBytes32(t)
	}

	// Create "original" vertices for the client (using first set of data trees)
	clientVertices := make([]application.Vertex, numVertices)
	for i := 0; i < numVertices; i++ {
		clientVertices[i] = hgcrdt.NewVertex(
			domain,
			addresses[i], // Same address
			dataTrees[i].Commit(inclusionProver, false),
			dataTrees[i].GetSize(),
		)
	}

	// Create "modified" vertices for the server (using second set of data trees)
	// These have the SAME addresses but DIFFERENT data commitments
	serverVertices := make([]application.Vertex, numVertices)
	for i := 0; i < numVertices; i++ {
		serverVertices[i] = hgcrdt.NewVertex(
			domain,
			addresses[i], // Same address as client
			dataTrees[numVertices+i].Commit(inclusionProver, false), // Different data
			dataTrees[numVertices+i].GetSize(),
		)
	}

	shardKey := application.GetShardKey(clientVertices[0])

	// Add original vertices to client
	t.Log("Adding original vertices to client")
	clientTxn, err := clientStore.NewTransaction(false)
	require.NoError(t, err)
	for i, v := range clientVertices {
		id := v.GetID()
		require.NoError(t, clientStore.SaveVertexTree(clientTxn, id[:], dataTrees[i]))
		require.NoError(t, clientHG.AddVertex(clientTxn, v))
	}
	require.NoError(t, clientTxn.Commit())

	// Add modified vertices to server
	t.Log("Adding modified vertices to server")
	serverTxn, err := serverStore.NewTransaction(false)
	require.NoError(t, err)
	for i, v := range serverVertices {
		id := v.GetID()
		require.NoError(t, serverStore.SaveVertexTree(serverTxn, id[:], dataTrees[numVertices+i]))
		require.NoError(t, serverHG.AddVertex(serverTxn, v))
	}
	require.NoError(t, serverTxn.Commit())

	// Commit both hypergraphs
	_, err = clientHG.Commit(1)
	require.NoError(t, err)
	_, err = serverHG.Commit(1)
	require.NoError(t, err)

	// Verify roots are different before sync (modified entries should cause different roots)
	clientRootBefore := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	require.NotEqual(t, clientRootBefore, serverRoot, "roots should differ before sync due to modified entries")

	t.Logf("Client root before sync: %x", clientRootBefore)
	t.Logf("Server root: %x", serverRoot)

	// Publish server snapshot
	serverHG.PublishSnapshot(serverRoot)

	// Start gRPC server
	const bufSize = 1 << 20
	lis := bufconn.Listen(bufSize)
	grpcServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
		grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
		grpc.ChainStreamInterceptor(func(
			srv interface{},
			ss grpc.ServerStream,
			info *grpc.StreamServerInfo,
			handler grpc.StreamHandler,
		) error {
			// This runs on a gRPC server goroutine, where require/FailNow must
			// not be called. Errors are returned so the stream fails and the
			// client-side assertions report the problem from the test
			// goroutine.
			_, priv, err := ed448.GenerateKey(rand.Reader)
			if err != nil {
				return err
			}
			privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
			if err != nil {
				return err
			}
			peerID, err := peer.IDFromPublicKey(privKey.GetPublic())
			if err != nil {
				return err
			}

			// Attach a freshly generated peer ID to the stream context.
			return handler(srv, &serverStream{
				ServerStream: ss,
				ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
			})
		}),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
	defer grpcServer.Stop()

	go func() {
		_ = grpcServer.Serve(lis)
	}()

	dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
		dialer := func(context.Context, string) (net.Conn, error) {
			return lis.Dial()
		}
		conn, err := grpc.DialContext(
			context.Background(),
			"bufnet",
			grpc.WithContextDialer(dialer),
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
				grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
			),
		)
		require.NoError(t, err)
		return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
	}

	// Perform sync
	t.Log("Performing sync to update modified entries")
	conn, client := dialClient()
	defer conn.Close()

	stream, err := client.PerformSync(context.Background())
	require.NoError(t, err)
	_, err = clientHG.SyncFrom(
		stream,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		nil,
	)
	require.NoError(t, err)
	require.NoError(t, stream.CloseSend())

	// Commit client after sync
	_, err = clientHG.Commit(2)
	require.NoError(t, err)

	// Verify client now matches server
	clientRootAfter := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Client root after sync: %x", clientRootAfter)
	assert.Equal(t, serverRoot, clientRootAfter, "client should converge to server state after sync with modified entries")

	// Verify all entries were updated by comparing the leaves
	serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree()
	clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
	diffLeaves := tries.CompareLeaves(serverTree, clientTree)
	assert.Empty(t, diffLeaves, "there should be no difference in leaves after sync")

	t.Logf("Sync completed successfully - %d entries with same keys but different values were updated", numVertices)
}
// TestHypergraphBidirectionalSyncWithDisjointData tests that when node A has 500
// unique vertices and node B has 500 different unique vertices, syncing in both
// directions results in both nodes having all 1000 vertices.
//
// Node B publishes its pre-sync root and serves while node A syncs from it;
// node A then publishes its post-sync root and serves while node B syncs from
// it. Afterwards the two vertex-adds roots must be equal, each tree must hold
// 1000 non-nil leaves, and a leaf comparison of the trees must be empty.
func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) {
	logger, err := zap.NewDevelopment()
	require.NoError(t, err)
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	inclusionProver := bls48581.NewKZGInclusionProver(logger)

	// Create data trees for all 1000 vertices
	numVerticesPerNode := 500
	totalVertices := numVerticesPerNode * 2
	dataTrees := make([]*tries.VectorCommitmentTree, totalVertices)
	eg := errgroup.Group{}
	eg.SetLimit(100)
	for i := 0; i < totalVertices; i++ {
		// Each goroutine writes only its own slot of dataTrees.
		eg.Go(func() error {
			dataTrees[i] = buildDataTree(t, inclusionProver)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	t.Log("Generated data trees")

	// Create databases and stores for both nodes
	nodeADB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"}}, 0)
	defer nodeADB.Close()
	nodeBDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"}}, 0)
	defer nodeBDB.Close()

	nodeAStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"},
		nodeADB,
		logger,
		enc,
		inclusionProver,
	)
	nodeBStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"},
		nodeBDB,
		logger,
		enc,
		inclusionProver,
	)

	nodeAHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "nodeA")),
		nodeAStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)
	nodeBHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "nodeB")),
		nodeBStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)

	// Create a shared domain for all vertices
	domain := randomBytes32(t)

	// Generate vertices for node A (first 500)
	nodeAVertices := make([]application.Vertex, numVerticesPerNode)
	for i := 0; i < numVerticesPerNode; i++ {
		addr := randomBytes32(t)
		nodeAVertices[i] = hgcrdt.NewVertex(
			domain,
			addr,
			dataTrees[i].Commit(inclusionProver, false),
			dataTrees[i].GetSize(),
		)
	}

	// Generate vertices for node B (second 500, completely different)
	nodeBVertices := make([]application.Vertex, numVerticesPerNode)
	for i := 0; i < numVerticesPerNode; i++ {
		addr := randomBytes32(t)
		nodeBVertices[i] = hgcrdt.NewVertex(
			domain,
			addr,
			dataTrees[numVerticesPerNode+i].Commit(inclusionProver, false),
			dataTrees[numVerticesPerNode+i].GetSize(),
		)
	}

	shardKey := application.GetShardKey(nodeAVertices[0])

	// Add vertices to node A
	t.Log("Adding 500 vertices to node A")
	nodeATxn, err := nodeAStore.NewTransaction(false)
	require.NoError(t, err)
	for i, v := range nodeAVertices {
		id := v.GetID()
		require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
		require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
	}
	require.NoError(t, nodeATxn.Commit())

	// Add vertices to node B
	t.Log("Adding 500 different vertices to node B")
	nodeBTxn, err := nodeBStore.NewTransaction(false)
	require.NoError(t, err)
	for i, v := range nodeBVertices {
		id := v.GetID()
		require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[numVerticesPerNode+i]))
		require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
	}
	require.NoError(t, nodeBTxn.Commit())

	// Commit both hypergraphs
	_, err = nodeAHG.Commit(1)
	require.NoError(t, err)
	_, err = nodeBHG.Commit(1)
	require.NoError(t, err)

	nodeARootBefore := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	nodeBRootBefore := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Node A root before sync: %x", nodeARootBefore)
	t.Logf("Node B root before sync: %x", nodeBRootBefore)
	require.NotEqual(t, nodeARootBefore, nodeBRootBefore, "roots should differ before sync")

	// Helper to set up gRPC server for a hypergraph
	setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
		const bufSize = 1 << 20
		lis := bufconn.Listen(bufSize)

		grpcServer := grpc.NewServer(
			grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
			grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
			grpc.ChainStreamInterceptor(func(
				srv interface{},
				ss grpc.ServerStream,
				info *grpc.StreamServerInfo,
				handler grpc.StreamHandler,
			) error {
				// This runs on a gRPC server goroutine, where require/FailNow
				// must not be called. Errors are returned so the stream fails
				// and the client-side assertions report the problem from the
				// test goroutine.
				_, priv, err := ed448.GenerateKey(rand.Reader)
				if err != nil {
					return err
				}
				privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
				if err != nil {
					return err
				}
				peerID, err := peer.IDFromPublicKey(privKey.GetPublic())
				if err != nil {
					return err
				}

				// Attach a freshly generated peer ID to the stream context.
				return handler(srv, &serverStream{
					ServerStream: ss,
					ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
				})
			}),
		)
		protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)

		go func() {
			_ = grpcServer.Serve(lis)
		}()

		return lis, grpcServer
	}

	dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
		dialer := func(context.Context, string) (net.Conn, error) {
			return lis.Dial()
		}
		conn, err := grpc.DialContext(
			context.Background(),
			"bufnet",
			grpc.WithContextDialer(dialer),
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
				grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
			),
		)
		require.NoError(t, err)
		return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
	}

	// Step 1: Node A syncs from Node B (as server)
	// Node A should receive Node B's 500 vertices
	t.Log("Step 1: Node A syncs from Node B (B is server)")
	nodeBHG.PublishSnapshot(nodeBRootBefore)
	lisB, serverB := setupServer(nodeBHG)
	defer serverB.Stop()

	connB, clientB := dialClient(lisB)
	streamB, err := clientB.PerformSync(context.Background())
	require.NoError(t, err)

	_, err = nodeAHG.SyncFrom(
		streamB,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		nil,
	)
	require.NoError(t, err)
	require.NoError(t, streamB.CloseSend())
	connB.Close()

	_, err = nodeAHG.Commit(2)
	require.NoError(t, err)

	nodeARootAfterFirstSync := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Node A root after syncing from B: %x", nodeARootAfterFirstSync)

	// Step 2: Node B syncs from Node A (as server)
	// Node B should receive Node A's 500 vertices
	t.Log("Step 2: Node B syncs from Node A (A is server)")
	nodeAHG.PublishSnapshot(nodeARootAfterFirstSync)
	lisA, serverA := setupServer(nodeAHG)
	defer serverA.Stop()

	connA, clientA := dialClient(lisA)
	streamA, err := clientA.PerformSync(context.Background())
	require.NoError(t, err)

	_, err = nodeBHG.SyncFrom(
		streamA,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		nil,
	)
	require.NoError(t, err)
	require.NoError(t, streamA.CloseSend())
	connA.Close()

	_, err = nodeBHG.Commit(2)
	require.NoError(t, err)

	// Verify both nodes have converged
	nodeARootFinal := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	nodeBRootFinal := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Node A final root: %x", nodeARootFinal)
	t.Logf("Node B final root: %x", nodeBRootFinal)

	assert.Equal(t, nodeARootFinal, nodeBRootFinal, "both nodes should have identical roots after bidirectional sync")

	// Verify the tree contains all 1000 vertices
	nodeATree := nodeAHG.GetVertexAddsSet(shardKey).GetTree()
	nodeBTree := nodeBHG.GetVertexAddsSet(shardKey).GetTree()

	nodeALeaves := tries.GetAllLeaves(
		nodeATree.SetType,
		nodeATree.PhaseType,
		nodeATree.ShardKey,
		nodeATree.Root,
	)
	nodeBLeaves := tries.GetAllLeaves(
		nodeBTree.SetType,
		nodeBTree.PhaseType,
		nodeBTree.ShardKey,
		nodeBTree.Root,
	)

	// Only non-nil entries are counted as leaves.
	nodeALeafCount := 0
	for _, leaf := range nodeALeaves {
		if leaf != nil {
			nodeALeafCount++
		}
	}
	nodeBLeafCount := 0
	for _, leaf := range nodeBLeaves {
		if leaf != nil {
			nodeBLeafCount++
		}
	}

	t.Logf("Node A has %d leaves, Node B has %d leaves", nodeALeafCount, nodeBLeafCount)
	assert.Equal(t, totalVertices, nodeALeafCount, "Node A should have all 1000 vertices")
	assert.Equal(t, totalVertices, nodeBLeafCount, "Node B should have all 1000 vertices")

	// Verify no differences between the trees
	diffLeaves := tries.CompareLeaves(nodeATree, nodeBTree)
	assert.Empty(t, diffLeaves, "there should be no differences between the trees")

	t.Log("Bidirectional sync test passed - both nodes have all 1000 vertices")
}
// TestHypergraphBidirectionalSyncClientDriven tests the new client-driven sync
// protocol (PerformSync/SyncFrom) with two nodes having disjoint data sets.
// Node A has 500 unique vertices and node B has 500 different unique vertices.
// After syncing in both directions, both nodes should have all 1000 vertices.
//
// Unlike TestHypergraphBidirectionalSyncWithDisjointData, neither node calls
// PublishSnapshot before it starts serving. Afterwards the two vertex-adds
// roots must be equal, each tree must hold 1000 non-nil leaves, and a leaf
// comparison of the trees must be empty.
func TestHypergraphBidirectionalSyncClientDriven(t *testing.T) {
	logger, err := zap.NewDevelopment()
	require.NoError(t, err)
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	inclusionProver := bls48581.NewKZGInclusionProver(logger)

	// Create data trees for all 1000 vertices
	numVerticesPerNode := 500
	totalVertices := numVerticesPerNode * 2
	dataTrees := make([]*tries.VectorCommitmentTree, totalVertices)
	eg := errgroup.Group{}
	eg.SetLimit(100)
	for i := 0; i < totalVertices; i++ {
		// Each goroutine writes only its own slot of dataTrees.
		eg.Go(func() error {
			dataTrees[i] = buildDataTree(t, inclusionProver)
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	t.Log("Generated data trees")

	// Create databases and stores for both nodes
	nodeADB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA_cd/store"}}, 0)
	defer nodeADB.Close()
	nodeBDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB_cd/store"}}, 0)
	defer nodeBDB.Close()

	nodeAStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA_cd/store"},
		nodeADB,
		logger,
		enc,
		inclusionProver,
	)
	nodeBStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB_cd/store"},
		nodeBDB,
		logger,
		enc,
		inclusionProver,
	)

	nodeAHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "nodeA-cd")),
		nodeAStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)
	nodeBHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "nodeB-cd")),
		nodeBStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)

	// Create a shared domain for all vertices
	domain := randomBytes32(t)

	// Generate vertices for node A (first 500)
	nodeAVertices := make([]application.Vertex, numVerticesPerNode)
	for i := 0; i < numVerticesPerNode; i++ {
		addr := randomBytes32(t)
		nodeAVertices[i] = hgcrdt.NewVertex(
			domain,
			addr,
			dataTrees[i].Commit(inclusionProver, false),
			dataTrees[i].GetSize(),
		)
	}

	// Generate vertices for node B (second 500, completely different)
	nodeBVertices := make([]application.Vertex, numVerticesPerNode)
	for i := 0; i < numVerticesPerNode; i++ {
		addr := randomBytes32(t)
		nodeBVertices[i] = hgcrdt.NewVertex(
			domain,
			addr,
			dataTrees[numVerticesPerNode+i].Commit(inclusionProver, false),
			dataTrees[numVerticesPerNode+i].GetSize(),
		)
	}

	shardKey := application.GetShardKey(nodeAVertices[0])

	// Add vertices to node A
	t.Log("Adding 500 vertices to node A")
	nodeATxn, err := nodeAStore.NewTransaction(false)
	require.NoError(t, err)
	for i, v := range nodeAVertices {
		id := v.GetID()
		require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
		require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
	}
	require.NoError(t, nodeATxn.Commit())

	// Add vertices to node B
	t.Log("Adding 500 different vertices to node B")
	nodeBTxn, err := nodeBStore.NewTransaction(false)
	require.NoError(t, err)
	for i, v := range nodeBVertices {
		id := v.GetID()
		require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[numVerticesPerNode+i]))
		require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
	}
	require.NoError(t, nodeBTxn.Commit())

	// Commit both hypergraphs
	_, err = nodeAHG.Commit(1)
	require.NoError(t, err)
	_, err = nodeBHG.Commit(1)
	require.NoError(t, err)

	nodeARootBefore := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	nodeBRootBefore := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Node A root before sync: %x", nodeARootBefore)
	t.Logf("Node B root before sync: %x", nodeBRootBefore)
	require.NotEqual(t, nodeARootBefore, nodeBRootBefore, "roots should differ before sync")

	// Helper to set up gRPC server for a hypergraph
	setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
		const bufSize = 1 << 20
		lis := bufconn.Listen(bufSize)

		grpcServer := grpc.NewServer(
			grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
			grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
			grpc.ChainStreamInterceptor(func(
				srv interface{},
				ss grpc.ServerStream,
				info *grpc.StreamServerInfo,
				handler grpc.StreamHandler,
			) error {
				// This runs on a gRPC server goroutine, where require/FailNow
				// must not be called. Errors are returned so the stream fails
				// and the client-side assertions report the problem from the
				// test goroutine.
				_, priv, err := ed448.GenerateKey(rand.Reader)
				if err != nil {
					return err
				}
				privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
				if err != nil {
					return err
				}
				peerID, err := peer.IDFromPublicKey(privKey.GetPublic())
				if err != nil {
					return err
				}

				// Attach a freshly generated peer ID to the stream context.
				return handler(srv, &serverStream{
					ServerStream: ss,
					ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
				})
			}),
		)
		protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)

		go func() {
			_ = grpcServer.Serve(lis)
		}()

		return lis, grpcServer
	}

	dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
		dialer := func(context.Context, string) (net.Conn, error) {
			return lis.Dial()
		}
		conn, err := grpc.DialContext(
			context.Background(),
			"bufnet",
			grpc.WithContextDialer(dialer),
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
				grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
			),
		)
		require.NoError(t, err)
		return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
	}

	// Step 1: Node A syncs from Node B (as server) using client-driven sync
	// Node A should receive Node B's 500 vertices
	t.Log("Step 1: Node A syncs from Node B using PerformSync (B is server)")
	lisB, serverB := setupServer(nodeBHG)
	defer serverB.Stop()

	connB, clientB := dialClient(lisB)
	streamB, err := clientB.PerformSync(context.Background())
	require.NoError(t, err)

	_, err = nodeAHG.SyncFrom(
		streamB,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		nil,
	)
	require.NoError(t, err)
	require.NoError(t, streamB.CloseSend())
	connB.Close()

	_, err = nodeAHG.Commit(2)
	require.NoError(t, err)

	nodeARootAfterFirstSync := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Node A root after syncing from B: %x", nodeARootAfterFirstSync)

	// Step 2: Node B syncs from Node A (as server) using client-driven sync
	// Node B should receive Node A's 500 vertices
	t.Log("Step 2: Node B syncs from Node A using PerformSync (A is server)")
	lisA, serverA := setupServer(nodeAHG)
	defer serverA.Stop()

	connA, clientA := dialClient(lisA)
	streamA, err := clientA.PerformSync(context.Background())
	require.NoError(t, err)

	_, err = nodeBHG.SyncFrom(
		streamA,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		nil,
	)
	require.NoError(t, err)
	require.NoError(t, streamA.CloseSend())
	connA.Close()

	_, err = nodeBHG.Commit(2)
	require.NoError(t, err)

	// Verify both nodes have converged
	nodeARootFinal := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	nodeBRootFinal := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Node A final root: %x", nodeARootFinal)
	t.Logf("Node B final root: %x", nodeBRootFinal)

	assert.Equal(t, nodeARootFinal, nodeBRootFinal, "both nodes should have identical roots after bidirectional sync")

	// Verify the tree contains all 1000 vertices
	nodeATree := nodeAHG.GetVertexAddsSet(shardKey).GetTree()
	nodeBTree := nodeBHG.GetVertexAddsSet(shardKey).GetTree()

	nodeALeaves := tries.GetAllLeaves(
		nodeATree.SetType,
		nodeATree.PhaseType,
		nodeATree.ShardKey,
		nodeATree.Root,
	)
	nodeBLeaves := tries.GetAllLeaves(
		nodeBTree.SetType,
		nodeBTree.PhaseType,
		nodeBTree.ShardKey,
		nodeBTree.Root,
	)

	// Only non-nil entries are counted as leaves.
	nodeALeafCount := 0
	for _, leaf := range nodeALeaves {
		if leaf != nil {
			nodeALeafCount++
		}
	}
	nodeBLeafCount := 0
	for _, leaf := range nodeBLeaves {
		if leaf != nil {
			nodeBLeafCount++
		}
	}

	t.Logf("Node A has %d leaves, Node B has %d leaves", nodeALeafCount, nodeBLeafCount)
	assert.Equal(t, totalVertices, nodeALeafCount, "Node A should have all 1000 vertices")
	assert.Equal(t, totalVertices, nodeBLeafCount, "Node B should have all 1000 vertices")

	// Verify no differences between the trees
	diffLeaves := tries.CompareLeaves(nodeATree, nodeBTree)
	assert.Empty(t, diffLeaves, "there should be no differences between the trees")

	t.Log("Client-driven bidirectional sync test passed - both nodes have all 1000 vertices")
}
// TestHypergraphSyncWithPrefixLengthMismatch tests sync behavior when one node
// has a deeper tree structure (longer prefix path) than the other. This tests
// the prefix length mismatch handling in the walk function.
//
// We create two nodes with different tree structures that will cause prefix
// length mismatches during sync. Node A has deeper prefixes at certain branches
// while Node B has shallower but wider structures.
//
// The scenario runs twice as sub-tests ("A_syncs_from_B" and "B_syncs_from_A"),
// each on freshly created in-memory stores. In every run one node is served
// over an in-process bufconn gRPC server and the other node calls SyncFrom
// against it for the vertex-adds phase set. The assertions are count based:
// the client must not lose leaves and must end with at least as many leaves as
// the server.
func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) {
	logger, _ := zap.NewDevelopment()
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	inclusionProver := bls48581.NewKZGInclusionProver(logger)

	// Create data trees
	numTrees := 20
	dataTrees := make([]*tries.VectorCommitmentTree, numTrees)
	for i := 0; i < numTrees; i++ {
		dataTrees[i] = buildDataTree(t, inclusionProver)
	}

	// Fixed domain (appAddress) - all vertices must share this to be in the same shard
	fixedDomain := [32]byte{}

	// createVertexWithDataPath builds a vertex whose dataAddress encodes
	// suffixPath. The vertex is created from fixedDomain (appAddress) and the
	// generated dataAddress, so every vertex produced here shares the same
	// appAddress and differs only in dataAddress.
	//
	// Each entry of suffixPath is written as a 6-bit group, most significant
	// bit first, starting at the top bit of dataAddr[0]. uniqueSuffix is then
	// written big-endian into the last 8 bytes so that every vertex is
	// distinct. treeIdx selects which pre-built data tree supplies the
	// commitment and size.
	//
	// NOTE(review): 32 bytes is 256 bits, which is not a multiple of 6, so the
	// 6-bit chunks of the full 64-byte ID presumably do not line up exactly
	// with the groups written here (nibble 42 would straddle the
	// appAddress/dataAddress boundary). The sub-tests log path[40:50] for each
	// vertex so the real layout can be confirmed.
	createVertexWithDataPath := func(suffixPath []int, uniqueSuffix uint64, treeIdx int) application.Vertex {
		dataAddr := [32]byte{}

		// Pack the suffix path nibbles into consecutive 6-bit groups of dataAddress.
		bitPos := 0
		for _, nibble := range suffixPath {
			byteIdx := bitPos / 8
			bitOffset := bitPos % 8

			if bitOffset+6 <= 8 {
				// Nibble fits in one byte
				dataAddr[byteIdx] |= byte(nibble << (8 - bitOffset - 6))
			} else {
				// Nibble spans two bytes
				bitsInFirstByte := 8 - bitOffset
				dataAddr[byteIdx] |= byte(nibble >> (6 - bitsInFirstByte))
				if byteIdx+1 < 32 {
					dataAddr[byteIdx+1] |= byte(nibble << (8 - (6 - bitsInFirstByte)))
				}
			}
			bitPos += 6
		}

		// Add unique suffix in the last 8 bytes to make each vertex distinct
		binary.BigEndian.PutUint64(dataAddr[24:], uniqueSuffix)

		return hgcrdt.NewVertex(
			fixedDomain,
			dataAddr,
			dataTrees[treeIdx].Commit(inclusionProver, false),
			dataTrees[treeIdx].GetSize(),
		)
	}

	// runSyncTest runs one sync direction as a sub-test. direction must be
	// "A_syncs_from_B" for node A to act as the client; any other value makes
	// node B the client and node A the server.
	runSyncTest := func(direction string) {
		t.Run(direction, func(t *testing.T) {
			// Create fresh databases for this sub-test
			nodeAPath := fmt.Sprintf(".configtestnodeA_%s/store", direction)
			nodeBPath := fmt.Sprintf(".configtestnodeB_%s/store", direction)

			nodeADB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: nodeAPath}}, 0)
			defer nodeADB.Close()

			nodeBDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: nodeBPath}}, 0)
			defer nodeBDB.Close()

			nodeAStore := store.NewPebbleHypergraphStore(
				&config.DBConfig{InMemoryDONOTUSE: true, Path: nodeAPath},
				nodeADB,
				logger,
				enc,
				inclusionProver,
			)

			nodeBStore := store.NewPebbleHypergraphStore(
				&config.DBConfig{InMemoryDONOTUSE: true, Path: nodeBPath},
				nodeBDB,
				logger,
				enc,
				inclusionProver,
			)

			nodeAHG := hgcrdt.NewHypergraph(
				logger.With(zap.String("side", "nodeA-"+direction)),
				nodeAStore,
				inclusionProver,
				[]int{},
				&tests.Nopthenticator{},
				200,
			)

			nodeBHG := hgcrdt.NewHypergraph(
				logger.With(zap.String("side", "nodeB-"+direction)),
				nodeBStore,
				inclusionProver,
				[]int{},
				&tests.Nopthenticator{},
				200,
			)

			// Create vertices with specific path structures to cause prefix mismatches.
			// All vertices share the same appAddress (fixedDomain), so they're in the same shard.
			// Path differences come from dataAddress.
			//
			// We create vertices with suffix paths that differ:
			//   Node A: suffix paths 0,1,x and 0,2,x and 1,x
			//   Node B: suffix paths 0,0,x and 0,1,x and 0,3,x and 1,x
			//
			// This creates prefix mismatch scenarios in the dataAddress portion of the tree.
			t.Log("Creating Node A structure")
			nodeAVertices := []application.Vertex{
				createVertexWithDataPath([]int{0, 1}, 100, 0), // suffix path 0,1,...
				createVertexWithDataPath([]int{0, 2}, 101, 1), // suffix path 0,2,...
				createVertexWithDataPath([]int{1}, 102, 2),    // suffix path 1,...
			}
			t.Logf("Created Node A vertices with suffix paths: 0,1; 0,2; 1")

			t.Log("Creating Node B structure")
			nodeBVertices := []application.Vertex{
				createVertexWithDataPath([]int{0, 0}, 200, 3), // suffix path 0,0,...
				createVertexWithDataPath([]int{0, 1}, 201, 4), // suffix path 0,1,...
				createVertexWithDataPath([]int{0, 3}, 202, 5), // suffix path 0,3,...
				createVertexWithDataPath([]int{1}, 203, 6),    // suffix path 1,...
			}
			t.Logf("Created Node B vertices with suffix paths: 0,0; 0,1; 0,3; 1")

			// logVertexPaths logs header and then entries 40-50 of each
			// vertex's full path, the window in which the dataAddress bits
			// are expected to begin (256/6 = 42.67).
			logVertexPaths := func(header string, vertices []application.Vertex) {
				t.Log(header)
				for i, v := range vertices {
					id := v.GetID()
					path := GetFullPath(id[:])
					start := 40
					end := min(50, len(path))
					if end > start {
						t.Logf(" Vertex %d path[%d:%d]: %v", i, start, end, path[start:end])
					}
				}
			}

			// Verify the paths - show nibbles 40-50 where the difference should be
			logVertexPaths("Node A vertices paths (showing nibbles 40-50 where dataAddress starts):", nodeAVertices)
			logVertexPaths("Node B vertices paths (showing nibbles 40-50 where dataAddress starts):", nodeBVertices)

			shardKey := application.GetShardKey(nodeAVertices[0])

			// countLeaves returns the number of non-nil leaves currently in
			// hg's vertex-adds tree for shardKey.
			countLeaves := func(hg *hgcrdt.HypergraphCRDT) int {
				tree := hg.GetVertexAddsSet(shardKey).GetTree()
				leaves := tries.GetAllLeaves(
					tree.SetType,
					tree.PhaseType,
					tree.ShardKey,
					tree.Root,
				)
				count := 0
				for _, leaf := range leaves {
					if leaf != nil {
						count++
					}
				}
				return count
			}

			// Add vertices to Node A. dataTrees[i] matches the treeIdx (0..2)
			// used when the Node A vertices were created.
			nodeATxn, err := nodeAStore.NewTransaction(false)
			require.NoError(t, err)
			for i, v := range nodeAVertices {
				id := v.GetID()
				require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
				require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
			}
			require.NoError(t, nodeATxn.Commit())

			// Add vertices to Node B. dataTrees[3+i] matches the treeIdx (3..6)
			// used when the Node B vertices were created.
			nodeBTxn, err := nodeBStore.NewTransaction(false)
			require.NoError(t, err)
			for i, v := range nodeBVertices {
				id := v.GetID()
				require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[3+i]))
				require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
			}
			require.NoError(t, nodeBTxn.Commit())

			// Commit both
			_, err = nodeAHG.Commit(1)
			require.NoError(t, err)
			_, err = nodeBHG.Commit(1)
			require.NoError(t, err)

			nodeARoot := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
			nodeBRoot := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
			t.Logf("Node A root: %x", nodeARoot)
			t.Logf("Node B root: %x", nodeBRoot)

			// Setup gRPC server
			const bufSize = 1 << 20
			setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
				lis := bufconn.Listen(bufSize)
				grpcServer := grpc.NewServer(
					grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
					grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
					grpc.ChainStreamInterceptor(func(
						srv any,
						ss grpc.ServerStream,
						info *grpc.StreamServerInfo,
						handler grpc.StreamHandler,
					) error {
						// This callback runs on a gRPC server goroutine, not
						// the test goroutine, so failures are returned as
						// stream errors instead of being raised through
						// require (which ends in t.FailNow). The client side
						// then surfaces them via its own require checks.
						_, priv, err := ed448.GenerateKey(rand.Reader)
						if err != nil {
							return fmt.Errorf("generate ed448 key: %w", err)
						}
						privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
						if err != nil {
							return fmt.Errorf("unmarshal ed448 private key: %w", err)
						}

						// Attach a freshly generated peer ID to the stream context.
						pub := privKey.GetPublic()
						peerID, err := peer.IDFromPublicKey(pub)
						if err != nil {
							return fmt.Errorf("derive peer id: %w", err)
						}

						return handler(srv, &serverStream{
							ServerStream: ss,
							ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
						})
					}),
				)
				protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
				go func() { _ = grpcServer.Serve(lis) }()
				return lis, grpcServer
			}

			dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
				dialer := func(context.Context, string) (net.Conn, error) { return lis.Dial() }
				conn, err := grpc.DialContext(
					context.Background(),
					"bufnet",
					grpc.WithContextDialer(dialer),
					grpc.WithTransportCredentials(insecure.NewCredentials()),
					grpc.WithDefaultCallOptions(
						grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
						grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
					),
				)
				require.NoError(t, err)
				return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
			}

			// Pick which node serves and which node pulls for this direction.
			var serverHG, clientHG *hgcrdt.HypergraphCRDT
			var serverRoot []byte

			if direction == "A_syncs_from_B" {
				serverHG = nodeBHG
				clientHG = nodeAHG
				serverRoot = nodeBRoot
			} else {
				serverHG = nodeAHG
				clientHG = nodeBHG
				serverRoot = nodeARoot
			}

			serverHG.PublishSnapshot(serverRoot)
			lis, grpcServer := setupServer(serverHG)
			defer grpcServer.Stop()

			// Count client leaves before sync
			clientLeafCountBefore := countLeaves(clientHG)
			t.Logf("Client has %d leaves before sync", clientLeafCountBefore)

			conn, client := dialClient(lis)
			stream, err := client.PerformSync(context.Background())
			require.NoError(t, err)

			_, err = clientHG.SyncFrom(
				stream,
				shardKey,
				protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
				nil,
			)
			require.NoError(t, err)
			require.NoError(t, stream.CloseSend())
			conn.Close()

			_, err = clientHG.Commit(2)
			require.NoError(t, err)

			// In CRDT sync, the client receives data from the server and MERGES it.
			// The client should now have BOTH its original vertices AND the server's vertices.
			// So the client root should differ from both original roots (it's a superset).
			clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
			t.Logf("Client root after sync: %x", clientRoot)

			// Count the leaves in the client tree after sync
			clientLeafCount := countLeaves(clientHG)

			// After sync, client should have received server's vertices (merged with its own)
			// The client should have at least as many leaves as it started with
			assert.GreaterOrEqual(t, clientLeafCount, clientLeafCountBefore,
				"client should not lose leaves during sync")

			// Client should have gained some leaves from the server (unless they already had them all)
			t.Logf("Sync %s completed - client went from %d to %d leaves",
				direction, clientLeafCountBefore, clientLeafCount)

			// Compare against the server's leaf count. This checks counts only;
			// it does not check that each individual server leaf is present in
			// the client tree.
			serverLeafCount := countLeaves(serverHG)
			t.Logf("Server has %d leaves", serverLeafCount)

			// The client should have at least as many leaves as the server
			// (since it's merging server data into its own)
			assert.GreaterOrEqual(t, clientLeafCount, serverLeafCount,
				"client should have at least as many leaves as server after sync")
		})
	}

	// Test both directions
	runSyncTest("A_syncs_from_B")
	runSyncTest("B_syncs_from_A")
}
// TestMainnetBlossomsubFrameReceptionAndHypersync is an integration test that:
// 1. Connects to mainnet blossomsub using real bootstrap peers
// 2. Subscribes to the global frame bitmask (0x0000) as done in global_consensus_engine.go
// 3. Receives a real frame from a global prover on mainnet
// 4. Performs hypersync on the prover shard (000000ffffffff...ffffffff)
// 5. Confirms the synced data matches the prover root commitment from the frame
//
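// This test requires network access. It waits up to 20 minutes to receive a
// frame and up to a further 10 minutes for the prover's key registry and peer
// info, so the test timeout must comfortably exceed 30 minutes.
// Run with: go test -v -timeout 45m -run TestMainnetBlossomsubFrameReceptionAndHypersync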
func TestMainnetBlossomsubFrameReceptionAndHypersync(t *testing.T) {
if testing.Short() {
t.Skip("skipping mainnet integration test in short mode")
}
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// The prover shard key from global consensus:
// L1 = [0x00, 0x00, 0x00], L2 = bytes.Repeat([]byte{0xff}, 32)
proverShardKey := tries.ShardKey{
L1: [3]byte{0x00, 0x00, 0x00},
L2: [32]byte(bytes.Repeat([]byte{0xff}, 32)),
}
// Frame bitmask from global consensus: []byte{0x00, 0x00}
globalFrameBitmask := []byte{0x00, 0x00}
// Create in-memory hypergraph store for the client
clientDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"}}, 0)
defer clientDB.Close()
clientStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"},
clientDB,
logger,
enc,
inclusionProver,
)
clientHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "mainnet-client")),
clientStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Generate a random peer key for this test node
peerPrivKey, _, err := pcrypto.GenerateEd448Key(rand.Reader)
require.NoError(t, err)
peerPrivKeyBytes, err := peerPrivKey.Raw()
require.NoError(t, err)
// Create P2P config with mainnet bootstrap peers
p2pConfig := &config.P2PConfig{
ListenMultiaddr: "/ip4/0.0.0.0/udp/0/quic-v1", // Use random port
BootstrapPeers: config.BootstrapPeers,
PeerPrivKey: fmt.Sprintf("%x", peerPrivKeyBytes),
Network: 0, // Mainnet
D: 8,
DLo: 6,
DHi: 12,
DScore: 4,
DOut: 2,
HistoryLength: 5,
HistoryGossip: 3,
DLazy: 6,
GossipFactor: 0.25,
GossipRetransmission: 3,
HeartbeatInitialDelay: 100 * time.Millisecond,
HeartbeatInterval: 1 * time.Second,
FanoutTTL: 60 * time.Second,
PrunePeers: 16,
PruneBackoff: time.Minute,
UnsubscribeBackoff: 10 * time.Second,
Connectors: 8,
MaxPendingConnections: 128,
ConnectionTimeout: 30 * time.Second,
DirectConnectTicks: 300,
DirectConnectInitialDelay: 1 * time.Second,
OpportunisticGraftTicks: 60,
OpportunisticGraftPeers: 2,
GraftFloodThreshold: 10 * time.Second,
MaxIHaveLength: 5000,
MaxIHaveMessages: 10,
MaxIDontWantMessages: 10,
IWantFollowupTime: 3 * time.Second,
IDontWantMessageThreshold: 10000,
IDontWantMessageTTL: 3,
MinBootstrapPeers: 1,
BootstrapParallelism: 4,
DiscoveryParallelism: 4,
DiscoveryPeerLookupLimit: 100,
PingTimeout: 30 * time.Second,
PingPeriod: time.Minute,
PingAttempts: 3,
LowWatermarkConnections: -1,
HighWatermarkConnections: -1,
SubscriptionQueueSize: 128,
ValidateQueueSize: 128,
ValidateWorkers: 4,
PeerOutboundQueueSize: 128,
}
engineConfig := &config.EngineConfig{}
// Create a temporary config directory
configDir, err := os.MkdirTemp("", "quil-test-*")
require.NoError(t, err)
defer os.RemoveAll(configDir)
// Create connectivity cache file to bypass the connectivity test
// The cache file must be named "connectivity-check-<coreId>" and exist in configDir
connectivityCachePath := fmt.Sprintf("%s/connectivity-check-0", configDir)
err = os.WriteFile(connectivityCachePath, []byte(time.Now().Format(time.RFC3339)), 0644)
require.NoError(t, err)
t.Log("Connecting to mainnet blossomsub...")
// Create the real blossomsub instance
pubsub := p2p.NewBlossomSub(
p2pConfig,
engineConfig,
logger.Named("blossomsub"),
0,
p2p.ConfigDir(configDir),
)
defer pubsub.Close()
t.Logf("Connected to mainnet with peer ID: %x", pubsub.GetPeerID())
t.Logf("Bootstrap peers: %d", len(config.BootstrapPeers))
// Create a channel to receive frames
frameReceived := make(chan *protobufs.GlobalFrame, 1)
// Create a peer info manager to store peer reachability info
// We use a simple in-memory map to store peer info from the peer info bitmask
peerInfoMap := make(map[string]*tp2p.PeerInfo)
var peerInfoMu sync.RWMutex
// Create a key registry map to map prover addresses to identity peer IDs
// Key: prover address ([]byte as string), Value: identity peer ID
keyRegistryMap := make(map[string]peer.ID)
var keyRegistryMu sync.RWMutex
// Peer info bitmask from global consensus: []byte{0x00, 0x00, 0x00, 0x00}
globalPeerInfoBitmask := []byte{0x00, 0x00, 0x00, 0x00}
// Subscribe to peer info bitmask - this handles both PeerInfo and KeyRegistry messages
t.Log("Subscribing to global peer info bitmask...")
err = pubsub.Subscribe(globalPeerInfoBitmask, func(message *pb.Message) error {
if len(message.Data) < 4 {
return nil
}
// Check type prefix
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
switch typePrefix {
case protobufs.PeerInfoType:
peerInfoMsg := &protobufs.PeerInfo{}
if err := peerInfoMsg.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Failed to unmarshal peer info: %v", err)
return nil
}
// Validate signature using Ed448
if len(peerInfoMsg.Signature) == 0 || len(peerInfoMsg.PublicKey) == 0 {
return nil
}
// Create a copy without signature for validation
infoCopy := &protobufs.PeerInfo{
PeerId: peerInfoMsg.PeerId,
Reachability: peerInfoMsg.Reachability,
Timestamp: peerInfoMsg.Timestamp,
Version: peerInfoMsg.Version,
PatchNumber: peerInfoMsg.PatchNumber,
Capabilities: peerInfoMsg.Capabilities,
PublicKey: peerInfoMsg.PublicKey,
LastReceivedFrame: peerInfoMsg.LastReceivedFrame,
LastGlobalHeadFrame: peerInfoMsg.LastGlobalHeadFrame,
}
msg, err := infoCopy.ToCanonicalBytes()
if err != nil {
return nil
}
// Validate Ed448 signature
if !ed448.Verify(ed448.PublicKey(peerInfoMsg.PublicKey), msg, peerInfoMsg.Signature, "") {
return nil
}
// Convert and store peer info
reachability := []tp2p.Reachability{}
for _, r := range peerInfoMsg.Reachability {
reachability = append(reachability, tp2p.Reachability{
Filter: r.Filter,
PubsubMultiaddrs: r.PubsubMultiaddrs,
StreamMultiaddrs: r.StreamMultiaddrs,
})
}
peerInfoMu.Lock()
peerInfoMap[string(peerInfoMsg.PeerId)] = &tp2p.PeerInfo{
PeerId: peerInfoMsg.PeerId,
Reachability: reachability,
Cores: uint32(len(reachability)),
LastSeen: time.Now().UnixMilli(),
Version: peerInfoMsg.Version,
PatchNumber: peerInfoMsg.PatchNumber,
LastReceivedFrame: peerInfoMsg.LastReceivedFrame,
LastGlobalHeadFrame: peerInfoMsg.LastGlobalHeadFrame,
}
peerInfoMu.Unlock()
// peerIdStr := peer.ID(peerInfoMsg.PeerId).String()
// t.Logf("Received peer info for %s with %d reachability entries",
// peerIdStr, len(reachability))
case protobufs.KeyRegistryType:
keyRegistry := &protobufs.KeyRegistry{}
if err := keyRegistry.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Failed to unmarshal key registry: %v", err)
return nil
}
// We need identity key and prover key to establish the mapping
if keyRegistry.IdentityKey == nil || len(keyRegistry.IdentityKey.KeyValue) == 0 {
return nil
}
if keyRegistry.ProverKey == nil || len(keyRegistry.ProverKey.KeyValue) == 0 {
return nil
}
// Derive peer ID from identity key
pk, err := pcrypto.UnmarshalEd448PublicKey(keyRegistry.IdentityKey.KeyValue)
if err != nil {
t.Logf("Failed to unmarshal identity key: %v", err)
return nil
}
identityPeerID, err := peer.IDFromPublicKey(pk)
if err != nil {
t.Logf("Failed to derive peer ID from identity key: %v", err)
return nil
}
// Derive prover address from prover key (Poseidon hash)
proverAddrBI, err := poseidon.HashBytes(keyRegistry.ProverKey.KeyValue)
if err != nil {
t.Logf("Failed to derive prover address: %v", err)
return nil
}
proverAddress := proverAddrBI.FillBytes(make([]byte, 32))
// Store the mapping: prover address -> identity peer ID
keyRegistryMu.Lock()
keyRegistryMap[string(proverAddress)] = identityPeerID
keyRegistryMu.Unlock()
// t.Logf("Received key registry: prover %x -> peer %s",
// proverAddress, identityPeerID.String())
}
return nil
})
require.NoError(t, err)
// Register a validator for peer info messages with age checks
err = pubsub.RegisterValidator(globalPeerInfoBitmask, func(peerID peer.ID, message *pb.Message) tp2p.ValidationResult {
if len(message.Data) < 4 {
return tp2p.ValidationResultReject
}
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
now := time.Now().UnixMilli()
switch typePrefix {
case protobufs.PeerInfoType:
peerInfo := &protobufs.PeerInfo{}
if err := peerInfo.FromCanonicalBytes(message.Data); err != nil {
return tp2p.ValidationResultReject
}
// Age checks: timestamp must be within 1 second in the past, 5 seconds in the future
if peerInfo.Timestamp < now-1000 {
t.Logf("Rejecting peer info: timestamp too old (%d < %d)", peerInfo.Timestamp, now-1000)
return tp2p.ValidationResultReject
}
if peerInfo.Timestamp > now+5000 {
t.Logf("Ignoring peer info: timestamp too far in future (%d > %d)", peerInfo.Timestamp, now+5000)
return tp2p.ValidationResultIgnore
}
case protobufs.KeyRegistryType:
keyRegistry := &protobufs.KeyRegistry{}
if err := keyRegistry.FromCanonicalBytes(message.Data); err != nil {
return tp2p.ValidationResultReject
}
// Age checks: LastUpdated must be within 1 second in the past, 5 seconds in the future
if int64(keyRegistry.LastUpdated) < now-1000 {
t.Logf("Rejecting key registry: timestamp too old (%d < %d)", keyRegistry.LastUpdated, now-1000)
return tp2p.ValidationResultReject
}
if int64(keyRegistry.LastUpdated) > now+5000 {
t.Logf("Ignoring key registry: timestamp too far in future (%d > %d)", keyRegistry.LastUpdated, now+5000)
return tp2p.ValidationResultIgnore
}
default:
return tp2p.ValidationResultIgnore
}
return tp2p.ValidationResultAccept
}, true)
require.NoError(t, err)
// Subscribe to frame messages
t.Log("Subscribing to global frame bitmask...")
err = pubsub.Subscribe(globalFrameBitmask, func(message *pb.Message) error {
t.Logf("Received message on frame bitmask, data length: %d", len(message.Data))
if len(message.Data) < 4 {
return nil
}
// Check type prefix
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
t.Logf("Message type prefix: %d (GlobalFrameType=%d)", typePrefix, protobufs.GlobalFrameType)
if typePrefix != protobufs.GlobalFrameType {
return nil
}
frame := &protobufs.GlobalFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Failed to unmarshal frame: %v", err)
return nil
}
t.Logf("Received frame %d from prover %x with root %x",
frame.Header.FrameNumber,
frame.Header.Prover,
frame.Header.ProverTreeCommitment)
select {
case frameReceived <- frame:
default:
}
return nil
})
require.NoError(t, err)
// Register a validator for frame messages with age checks
err = pubsub.RegisterValidator(globalFrameBitmask, func(peerID peer.ID, message *pb.Message) tp2p.ValidationResult {
if len(message.Data) < 4 {
return tp2p.ValidationResultReject
}
typePrefix := binary.BigEndian.Uint32(message.Data[:4])
if typePrefix != protobufs.GlobalFrameType {
return tp2p.ValidationResultIgnore
}
frame := &protobufs.GlobalFrame{}
if err := frame.FromCanonicalBytes(message.Data); err != nil {
t.Logf("Frame validation: failed to unmarshal: %v", err)
return tp2p.ValidationResultReject
}
// Check signature is present
if frame.Header.PublicKeySignatureBls48581 == nil ||
frame.Header.PublicKeySignatureBls48581.PublicKey == nil ||
frame.Header.PublicKeySignatureBls48581.PublicKey.KeyValue == nil {
t.Logf("Frame validation: missing signature")
return tp2p.ValidationResultReject
}
// Age check: frame must be within 120 seconds
frameAge := time.Since(time.UnixMilli(frame.Header.Timestamp))
if frameAge > 120*time.Second {
t.Logf("Frame validation: too old (age=%v)", frameAge)
return tp2p.ValidationResultIgnore
}
t.Logf("Frame validation: accepting frame %d (age=%v)", frame.Header.FrameNumber, frameAge)
return tp2p.ValidationResultAccept
}, true)
require.NoError(t, err)
t.Log("Waiting for a global frame from mainnet (this may take up to 20 minutes)...")
// Wait for a frame with a longer timeout for mainnet - frames can take a while
var receivedFrame *protobufs.GlobalFrame
select {
case receivedFrame = <-frameReceived:
t.Logf("Successfully received frame %d!", receivedFrame.Header.FrameNumber)
case <-time.After(20 * time.Minute):
t.Fatal("timeout waiting for frame from mainnet - ensure network connectivity")
}
// Verify frame has required fields
require.NotNil(t, receivedFrame.Header, "frame must have header")
require.NotEmpty(t, receivedFrame.Header.Prover, "frame must have prover")
require.NotEmpty(t, receivedFrame.Header.ProverTreeCommitment, "frame must have prover tree commitment")
expectedRoot := receivedFrame.Header.ProverTreeCommitment
proverAddress := receivedFrame.Header.Prover // This is the prover ADDRESS (hash of BLS key), not a peer ID
t.Logf("Frame details:")
t.Logf(" Frame number: %d", receivedFrame.Header.FrameNumber)
t.Logf(" Prover address: %x", proverAddress)
t.Logf(" Prover root commitment: %x", expectedRoot)
// Now we need to find the prover's peer info to connect and sync
// The prover address (in frame) needs to be mapped to a peer ID via key registry
t.Log("Looking up prover peer info...")
// Helper function to get prover's identity peer ID from key registry
getProverPeerID := func() (peer.ID, bool) {
keyRegistryMu.RLock()
defer keyRegistryMu.RUnlock()
peerID, ok := keyRegistryMap[string(proverAddress)]
return peerID, ok
}
// Helper function to get multiaddr from peer info map using peer ID
getMultiaddrForPeer := func(peerID peer.ID) string {
peerInfoMu.RLock()
defer peerInfoMu.RUnlock()
info, ok := peerInfoMap[string([]byte(peerID))]
if !ok || len(info.Reachability) == 0 {
return ""
}
// Try stream multiaddrs first (for direct gRPC connection)
for _, r := range info.Reachability {
if len(r.StreamMultiaddrs) > 0 {
return r.StreamMultiaddrs[0]
}
}
// Fall back to pubsub multiaddrs
for _, r := range info.Reachability {
if len(r.PubsubMultiaddrs) > 0 {
return r.PubsubMultiaddrs[0]
}
}
return ""
}
// Wait for key registry and peer info to arrive (provers broadcast every 5 minutes)
t.Log("Waiting for prover key registry and peer info (up to 10 minutes)...")
var proverPeerID peer.ID
var proverMultiaddr string
timeout := time.After(10 * time.Minute)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
waitLoop:
for {
select {
case <-timeout:
t.Log("Timeout waiting for prover info")
break waitLoop
case <-ticker.C:
// First try to get the peer ID from key registry
if proverPeerID == "" {
if pID, ok := getProverPeerID(); ok {
proverPeerID = pID
t.Logf("Found key registry: prover address %x -> peer ID %s", proverAddress, proverPeerID.String())
}
}
// If we have the peer ID, try to get the multiaddr from peer info
if proverPeerID != "" {
proverMultiaddr = getMultiaddrForPeer(proverPeerID)
if proverMultiaddr != "" {
t.Logf("Found prover peer info from peer info bitmask!")
break waitLoop
}
}
// Log progress
keyRegistryMu.RLock()
peerInfoMu.RLock()
t.Logf("Still waiting... key registries: %d, peer infos: %d, have prover peer ID: %v",
len(keyRegistryMap), len(peerInfoMap), proverPeerID != "")
peerInfoMu.RUnlock()
keyRegistryMu.RUnlock()
}
}
// If we have peer ID but no multiaddr, try connected peers
if proverPeerID != "" && proverMultiaddr == "" {
t.Log("Checking connected peers for prover...")
networkInfo := pubsub.GetNetworkInfo()
for _, info := range networkInfo.NetworkInfo {
if bytes.Equal(info.PeerId, []byte(proverPeerID)) && len(info.Multiaddrs) > 0 {
proverMultiaddr = info.Multiaddrs[0]
t.Logf("Found prover in connected peers")
break
}
}
}
// Final fallback - direct lookup using peer ID
if proverPeerID != "" && proverMultiaddr == "" {
t.Logf("Attempting direct peer lookup...")
proverMultiaddr = pubsub.GetMultiaddrOfPeer([]byte(proverPeerID))
}
if proverPeerID == "" {
t.Skip("Could not find prover key registry - prover may not have broadcast key info yet")
}
if proverMultiaddr == "" {
t.Skip("Could not find prover multiaddr - prover may not have broadcast peer info yet")
}
t.Logf("Prover multiaddr: %s", proverMultiaddr)
// Connect to the prover using direct gRPC connection via multiaddr
t.Log("Connecting to prover for hypersync...")
// Create TLS credentials for the connection
creds, err := p2p.NewPeerAuthenticator(
logger,
p2pConfig,
nil,
nil,
nil,
nil,
[][]byte{[]byte(proverPeerID)},
map[string]channel.AllowedPeerPolicyType{},
map[string]channel.AllowedPeerPolicyType{},
).CreateClientTLSCredentials([]byte(proverPeerID))
if err != nil {
t.Skipf("Could not create TLS credentials: %v", err)
}
// Parse the multiaddr and convert to network address
ma, err := multiaddr.StringCast(proverMultiaddr)
if err != nil {
t.Skipf("Could not parse multiaddr %s: %v", proverMultiaddr, err)
}
mga, err := mn.ToNetAddr(ma)
if err != nil {
t.Skipf("Could not convert multiaddr to net addr: %v", err)
}
// Create gRPC client connection
conn, err := grpc.NewClient(
mga.String(),
grpc.WithTransportCredentials(creds),
)
if err != nil {
t.Skipf("Could not establish connection to prover: %v", err)
}
defer conn.Close()
client := protobufs.NewHypergraphComparisonServiceClient(conn)
// First, query the server's root commitment to verify what it claims to have
t.Log("Querying server's root commitment before sync...")
{
diagStream, err := client.PerformSync(context.Background())
require.NoError(t, err)
shardKeyBytes := slices.Concat(proverShardKey.L1[:], proverShardKey.L2[:])
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetBranch{
GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: []int32{},
},
},
})
require.NoError(t, err)
resp, err := diagStream.Recv()
require.NoError(t, err)
if errResp := resp.GetError(); errResp != nil {
t.Logf("Server error on root query: %s", errResp.Message)
} else if branch := resp.GetBranch(); branch != nil {
t.Logf("Server root commitment: %x", branch.Commitment)
t.Logf("Server root path: %v", branch.FullPath)
t.Logf("Server root isLeaf: %v", branch.IsLeaf)
t.Logf("Server root children count: %d", len(branch.Children))
t.Logf("Server root leafCount: %d", branch.LeafCount)
t.Logf("Frame expected root: %x", expectedRoot)
if !bytes.Equal(branch.Commitment, expectedRoot) {
t.Logf("WARNING: Server root commitment does NOT match frame expected root!")
} else {
t.Logf("OK: Server root commitment matches frame expected root")
}
// Log each child's commitment
for _, child := range branch.Children {
t.Logf(" Server child[%d]: commitment=%x", child.Index, child.Commitment)
}
// Drill into child[37] specifically to compare
child37Path := append(slices.Clone(branch.FullPath), 37)
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetBranch{
GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: child37Path,
},
},
})
if err == nil {
resp37, err := diagStream.Recv()
if err == nil {
if b37 := resp37.GetBranch(); b37 != nil {
t.Logf("Server child[37] details: path=%v, leafCount=%d, isLeaf=%v, childrenCount=%d",
b37.FullPath, b37.LeafCount, b37.IsLeaf, len(b37.Children))
}
}
}
}
_ = diagStream.CloseSend()
}
// Perform hypersync on all phases
t.Log("Performing hypersync on prover shard...")
phases := []protobufs.HypergraphPhaseSet{
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
}
for _, phase := range phases {
stream, err := client.PerformSync(context.Background())
if err != nil {
t.Logf("PerformSync error: %v", err)
continue
}
_, err = clientHG.SyncFrom(stream, proverShardKey, phase, nil)
if err != nil {
t.Logf("SyncFrom error for phase %v: %v", phase, err)
}
_ = stream.CloseSend()
}
// Commit client to compute root
_, err = clientHG.Commit(uint64(receivedFrame.Header.FrameNumber))
require.NoError(t, err)
// Verify client now has the expected prover root
clientProverRoot := clientHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(nil, false)
t.Logf("Client prover root after sync: %x", clientProverRoot)
t.Logf("Expected prover root from frame: %x", expectedRoot)
// Diagnostic: show client tree structure
clientTreeForDiag := clientHG.GetVertexAddsSet(proverShardKey).GetTree()
if clientTreeForDiag != nil && clientTreeForDiag.Root != nil {
switch n := clientTreeForDiag.Root.(type) {
case *tries.LazyVectorCommitmentBranchNode:
t.Logf("Client root is BRANCH: path=%v, commitment=%x, leafCount=%d", n.FullPrefix, n.Commitment, n.LeafCount)
childCount := 0
for i := 0; i < 64; i++ {
if n.Children[i] != nil {
childCount++
child := n.Children[i]
switch c := child.(type) {
case *tries.LazyVectorCommitmentBranchNode:
t.Logf(" Client child[%d]: BRANCH commitment=%x, leafCount=%d", i, c.Commitment, c.LeafCount)
case *tries.LazyVectorCommitmentLeafNode:
t.Logf(" Client child[%d]: LEAF commitment=%x", i, c.Commitment)
}
}
}
t.Logf("Client root in-memory children: %d", childCount)
case *tries.LazyVectorCommitmentLeafNode:
t.Logf("Client root is LEAF: key=%x, commitment=%x", n.Key, n.Commitment)
}
} else {
t.Logf("Client tree root is nil")
}
// Deep dive into child[37] - get server leaves to compare
t.Log("=== Deep dive into child[37] ===")
var serverChild37Leaves []*protobufs.LeafData
{
diagStream, err := client.PerformSync(context.Background())
if err != nil {
t.Logf("Failed to create diag stream: %v", err)
} else {
shardKeyBytes := slices.Concat(proverShardKey.L1[:], proverShardKey.L2[:])
// Correct path: root is at [...60], child[37] is at [...60, 37]
child37Path := []int32{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60, 37}
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetLeaves{
GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: child37Path,
MaxLeaves: 1000,
},
},
})
if err != nil {
t.Logf("Failed to send GetLeaves request: %v", err)
} else {
resp, err := diagStream.Recv()
if err != nil {
t.Logf("Failed to receive GetLeaves response: %v", err)
} else if errResp := resp.GetError(); errResp != nil {
t.Logf("Server returned error: %s", errResp.Message)
} else if leaves := resp.GetLeaves(); leaves != nil {
serverChild37Leaves = leaves.Leaves
t.Logf("Server child[37] leaves: count=%d, total=%d", len(leaves.Leaves), leaves.TotalLeaves)
// Show first few leaf keys
for i, leaf := range leaves.Leaves {
if i < 5 {
t.Logf(" Server leaf[%d]: key=%x (len=%d)", i, leaf.Key[:min(32, len(leaf.Key))], len(leaf.Key))
}
}
if len(leaves.Leaves) > 5 {
t.Logf(" ... and %d more leaves", len(leaves.Leaves)-5)
}
} else {
t.Logf("Server returned unexpected response type")
}
}
_ = diagStream.CloseSend()
}
}
// Get all client leaves and compare with server child[37] leaves
clientTree := clientHG.GetVertexAddsSet(proverShardKey).GetTree()
allClientLeaves := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientTree.Root,
)
t.Logf("Total client leaves: %d", len(allClientLeaves))
// Build map of client leaf keys -> values
clientLeafMap := make(map[string][]byte)
for _, leaf := range allClientLeaves {
if leaf != nil {
clientLeafMap[string(leaf.Key)] = leaf.Value
}
}
// Check which server child[37] leaves are in client and compare values
if len(serverChild37Leaves) > 0 {
found := 0
missing := 0
valueMismatch := 0
for _, serverLeaf := range serverChild37Leaves {
clientValue, exists := clientLeafMap[string(serverLeaf.Key)]
if !exists {
if missing < 3 {
t.Logf(" Missing server leaf: key=%x", serverLeaf.Key[:min(32, len(serverLeaf.Key))])
}
missing++
} else {
found++
if !bytes.Equal(clientValue, serverLeaf.Value) {
if valueMismatch < 5 {
t.Logf(" VALUE MISMATCH for key=%x: serverLen=%d, clientLen=%d",
serverLeaf.Key[:min(32, len(serverLeaf.Key))],
len(serverLeaf.Value), len(clientValue))
t.Logf(" Server value prefix: %x", serverLeaf.Value[:min(64, len(serverLeaf.Value))])
t.Logf(" Client value prefix: %x", clientValue[:min(64, len(clientValue))])
}
valueMismatch++
}
}
}
t.Logf("Server child[37] leaves in client: found=%d, missing=%d, valueMismatch=%d", found, missing, valueMismatch)
}
// Compare branch structure for child[37]
t.Log("=== Comparing branch structure for child[37] ===")
{
// Query server's child[37] branch info
diagStream, err := client.PerformSync(context.Background())
if err == nil {
shardKeyBytes := slices.Concat(proverShardKey.L1[:], proverShardKey.L2[:])
child37Path := []int32{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60, 37}
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetBranch{
GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: child37Path,
},
},
})
if err == nil {
resp, err := diagStream.Recv()
if err == nil {
if branch := resp.GetBranch(); branch != nil {
t.Logf("Server child[37] branch: path=%v, commitment=%x, children=%d",
branch.FullPath, branch.Commitment[:min(32, len(branch.Commitment))], len(branch.Children))
// Show first few children with their commitments
for i, child := range branch.Children {
if i < 10 {
t.Logf(" Server sub-child[%d]: commitment=%x", child.Index, child.Commitment[:min(32, len(child.Commitment))])
}
}
// Now check client's child[37] branch structure
if clientTree != nil && clientTree.Root != nil {
if rootBranch, ok := clientTree.Root.(*tries.LazyVectorCommitmentBranchNode); ok {
if child37 := rootBranch.Children[37]; child37 != nil {
if clientChild37Branch, ok := child37.(*tries.LazyVectorCommitmentBranchNode); ok {
t.Logf("Client child[37] branch: path=%v, commitment=%x, leafCount=%d",
clientChild37Branch.FullPrefix, clientChild37Branch.Commitment[:min(32, len(clientChild37Branch.Commitment))], clientChild37Branch.LeafCount)
// Count and show client's children
clientChildCount := 0
for i := 0; i < 64; i++ {
if clientChild37Branch.Children[i] != nil {
if clientChildCount < 10 {
switch c := clientChild37Branch.Children[i].(type) {
case *tries.LazyVectorCommitmentBranchNode:
t.Logf(" Client sub-child[%d]: BRANCH commitment=%x", i, c.Commitment[:min(32, len(c.Commitment))])
case *tries.LazyVectorCommitmentLeafNode:
t.Logf(" Client sub-child[%d]: LEAF commitment=%x", i, c.Commitment[:min(32, len(c.Commitment))])
}
}
clientChildCount++
}
}
t.Logf("Client child[37] has %d in-memory children, server has %d", clientChildCount, len(branch.Children))
} else if clientChild37Leaf, ok := child37.(*tries.LazyVectorCommitmentLeafNode); ok {
t.Logf("Client child[37] is LEAF: key=%x, commitment=%x",
clientChild37Leaf.Key[:min(32, len(clientChild37Leaf.Key))], clientChild37Leaf.Commitment[:min(32, len(clientChild37Leaf.Commitment))])
}
} else {
t.Logf("Client has NO child at index 37")
}
}
}
}
}
}
_ = diagStream.CloseSend()
}
}
// Recursive comparison function to drill into mismatches
var recursiveCompare func(path []int32, depth int)
recursiveCompare = func(path []int32, depth int) {
if depth > 10 {
t.Logf("DEPTH LIMIT REACHED at path=%v", path)
return
}
indent := strings.Repeat(" ", depth)
// Get server branch at path
diagStream, err := client.PerformSync(context.Background())
if err != nil {
t.Logf("%sERROR creating stream: %v", indent, err)
return
}
defer diagStream.CloseSend()
shardKeyBytes := slices.Concat(proverShardKey.L1[:], proverShardKey.L2[:])
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetBranch{
GetBranch: &protobufs.HypergraphSyncGetBranchRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: path,
},
},
})
if err != nil {
t.Logf("%sERROR sending request: %v", indent, err)
return
}
resp, err := diagStream.Recv()
if err != nil {
t.Logf("%sERROR receiving response: %v", indent, err)
return
}
if errResp := resp.GetError(); errResp != nil {
t.Logf("%sSERVER ERROR: %s", indent, errResp.Message)
return
}
serverBranch := resp.GetBranch()
if serverBranch == nil {
t.Logf("%sNO BRANCH in response", indent)
return
}
t.Logf("%sSERVER: path=%v, fullPath=%v, leafCount=%d, children=%d, isLeaf=%v",
indent, path, serverBranch.FullPath, serverBranch.LeafCount,
len(serverBranch.Children), serverBranch.IsLeaf)
t.Logf("%sSERVER commitment: %x", indent, serverBranch.Commitment[:min(48, len(serverBranch.Commitment))])
// Get corresponding client node - convert []int32 to []int
pathInt := make([]int, len(path))
for i, p := range path {
pathInt[i] = int(p)
}
clientNode, err := clientTree.GetByPath(pathInt)
if err != nil {
t.Logf("%sERROR getting client node: %v", indent, err)
return
}
if clientNode == nil {
t.Logf("%sCLIENT: NO NODE at path=%v", indent, path)
return
}
switch cn := clientNode.(type) {
case *tries.LazyVectorCommitmentBranchNode:
t.Logf("%sCLIENT: path=%v, fullPrefix=%v, leafCount=%d, commitment=%x",
indent, path, cn.FullPrefix, cn.LeafCount, cn.Commitment[:min(48, len(cn.Commitment))])
// Check if server is leaf but client is branch
if serverBranch.IsLeaf {
t.Logf("%s*** TYPE MISMATCH: server is LEAF, client is BRANCH ***", indent)
t.Logf("%s SERVER: fullPath=%v, isLeaf=%v, commitment=%x",
indent, serverBranch.FullPath, serverBranch.IsLeaf, serverBranch.Commitment[:min(48, len(serverBranch.Commitment))])
return
}
// Check if FullPath differs from FullPrefix
serverPathStr := fmt.Sprintf("%v", serverBranch.FullPath)
clientPathStr := fmt.Sprintf("%v", cn.FullPrefix)
if serverPathStr != clientPathStr {
t.Logf("%s*** PATH MISMATCH: server fullPath=%v, client fullPrefix=%v ***",
indent, serverBranch.FullPath, cn.FullPrefix)
}
// Check commitment match
if !bytes.Equal(serverBranch.Commitment, cn.Commitment) {
t.Logf("%s*** COMMITMENT MISMATCH ***", indent)
// Compare children
serverChildren := make(map[int32][]byte)
for _, sc := range serverBranch.Children {
serverChildren[sc.Index] = sc.Commitment
}
for i := int32(0); i < 64; i++ {
serverCommit := serverChildren[i]
var clientCommit []byte
clientChild := cn.Children[i]
// Lazy-load client child from store if needed
if clientChild == nil && len(serverCommit) > 0 {
childPathInt := make([]int, len(cn.FullPrefix)+1)
for j, p := range cn.FullPrefix {
childPathInt[j] = p
}
childPathInt[len(cn.FullPrefix)] = int(i)
clientChild, _ = clientTree.Store.GetNodeByPath(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
childPathInt,
)
}
if clientChild != nil {
switch cc := clientChild.(type) {
case *tries.LazyVectorCommitmentBranchNode:
clientCommit = cc.Commitment
case *tries.LazyVectorCommitmentLeafNode:
clientCommit = cc.Commitment
}
}
if len(serverCommit) > 0 || len(clientCommit) > 0 {
if !bytes.Equal(serverCommit, clientCommit) {
t.Logf("%s CHILD[%d] MISMATCH: server=%x, client=%x",
indent, i,
serverCommit[:min(24, len(serverCommit))],
clientCommit[:min(24, len(clientCommit))])
// Recurse into mismatched child
childPath := append(slices.Clone(serverBranch.FullPath), i)
recursiveCompare(childPath, depth+1)
}
}
}
}
case *tries.LazyVectorCommitmentLeafNode:
t.Logf("%sCLIENT: LEAF key=%x, commitment=%x",
indent, cn.Key[:min(32, len(cn.Key))], cn.Commitment[:min(48, len(cn.Commitment))])
t.Logf("%sCLIENT LEAF DETAIL: fullKey=%x, value len=%d",
indent, cn.Key, len(cn.Value))
// Compare with server commitment
if serverBranch.IsLeaf {
if !bytes.Equal(serverBranch.Commitment, cn.Commitment) {
t.Logf("%s*** LEAF COMMITMENT MISMATCH ***", indent)
t.Logf("%s SERVER commitment: %x", indent, serverBranch.Commitment)
t.Logf("%s CLIENT commitment: %x", indent, cn.Commitment)
t.Logf("%s SERVER fullPath: %v", indent, serverBranch.FullPath)
// The key in LazyVectorCommitmentLeafNode doesn't have a "fullPrefix" directly -
// the path is determined by the key bytes
}
} else {
t.Logf("%s*** TYPE MISMATCH: server is branch, client is leaf ***", indent)
}
}
}
// Start recursive comparison at root
t.Log("=== RECURSIVE MISMATCH ANALYSIS ===")
rootPath := []int32{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60}
recursiveCompare(rootPath, 0)
// Now let's drill into the specific mismatched subtree to see the leaves
t.Log("=== LEAF-LEVEL ANALYSIS for [...60 37 1 50] ===")
{
// Get server leaves under this subtree
diagStream, err := client.PerformSync(context.Background())
if err == nil {
shardKeyBytes := slices.Concat(proverShardKey.L1[:], proverShardKey.L2[:])
mismatchPath := []int32{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60, 37, 1, 50}
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetLeaves{
GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: mismatchPath,
MaxLeaves: 100,
},
},
})
if err == nil {
resp, err := diagStream.Recv()
if err == nil {
if leaves := resp.GetLeaves(); leaves != nil {
t.Logf("SERVER leaves under [...60 37 1 50]: count=%d, total=%d",
len(leaves.Leaves), leaves.TotalLeaves)
for i, leaf := range leaves.Leaves {
t.Logf(" SERVER leaf[%d]: key=%x", i, leaf.Key)
}
}
}
}
_ = diagStream.CloseSend()
}
// Get client leaves under this subtree
clientTree = clientHG.GetVertexAddsSet(proverShardKey).GetTree()
mismatchPathInt := []int{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60, 37, 1, 50}
clientSubtreeNode, err := clientTree.GetByPath(mismatchPathInt)
if err != nil {
t.Logf("CLIENT error getting node at [...60 37 1 50]: %v", err)
} else if clientSubtreeNode != nil {
clientSubtreeLeaves := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientSubtreeNode,
)
t.Logf("CLIENT leaves under [...60 37 1 50]: count=%d", len(clientSubtreeLeaves))
for i, leaf := range clientSubtreeLeaves {
if leaf != nil {
t.Logf(" CLIENT leaf[%d]: key=%x", i, leaf.Key)
}
}
}
}
// Check the deeper path [...60 37 1 50 50] which server claims has leafCount=2
t.Log("=== LEAF-LEVEL ANALYSIS for [...60 37 1 50 50] ===")
{
diagStream, err := client.PerformSync(context.Background())
if err == nil {
shardKeyBytes := slices.Concat(proverShardKey.L1[:], proverShardKey.L2[:])
deepPath := []int32{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60, 37, 1, 50, 50}
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetLeaves{
GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: deepPath,
MaxLeaves: 100,
},
},
})
if err == nil {
resp, err := diagStream.Recv()
if err == nil {
if leaves := resp.GetLeaves(); leaves != nil {
t.Logf("SERVER leaves under [...60 37 1 50 50]: count=%d, total=%d",
len(leaves.Leaves), leaves.TotalLeaves)
for i, leaf := range leaves.Leaves {
t.Logf(" SERVER leaf[%d]: key=%x", i, leaf.Key)
}
} else if errResp := resp.GetError(); errResp != nil {
t.Logf("SERVER error for [...60 37 1 50 50]: %s", errResp.Message)
}
}
}
_ = diagStream.CloseSend()
}
}
// Also check path [...60 37 1] to see the 3 vs 3 children issue
t.Log("=== LEAF-LEVEL ANALYSIS for [...60 37 1] ===")
{
diagStream, err := client.PerformSync(context.Background())
if err == nil {
shardKeyBytes := slices.Concat(proverShardKey.L1[:], proverShardKey.L2[:])
path371 := []int32{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60, 37, 1}
err = diagStream.Send(&protobufs.HypergraphSyncQuery{
Request: &protobufs.HypergraphSyncQuery_GetLeaves{
GetLeaves: &protobufs.HypergraphSyncGetLeavesRequest{
ShardKey: shardKeyBytes,
PhaseSet: protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
Path: path371,
MaxLeaves: 100,
},
},
})
if err == nil {
resp, err := diagStream.Recv()
if err == nil {
if leaves := resp.GetLeaves(); leaves != nil {
t.Logf("SERVER leaves under [...60 37 1]: count=%d, total=%d",
len(leaves.Leaves), leaves.TotalLeaves)
for i, leaf := range leaves.Leaves {
t.Logf(" SERVER leaf[%d]: key=%x", i, leaf.Key)
}
}
}
}
_ = diagStream.CloseSend()
}
// Client leaves under [...60 37 1]
clientTree = clientHG.GetVertexAddsSet(proverShardKey).GetTree()
path371Int := []int{63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 60, 37, 1}
clientNode371, err := clientTree.GetByPath(path371Int)
if err != nil {
t.Logf("CLIENT error getting node at [...60 37 1]: %v", err)
} else if clientNode371 != nil {
clientLeaves371 := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientNode371,
)
t.Logf("CLIENT leaves under [...60 37 1]: count=%d", len(clientLeaves371))
for i, leaf := range clientLeaves371 {
if leaf != nil {
t.Logf(" CLIENT leaf[%d]: key=%x", i, leaf.Key)
}
}
}
}
assert.Equal(t, expectedRoot, clientProverRoot,
"client prover root should match frame's prover tree commitment after hypersync")
// Count vertices synced
clientTree = clientHG.GetVertexAddsSet(proverShardKey).GetTree()
clientLeaves2 := tries.GetAllLeaves(
clientTree.SetType,
clientTree.PhaseType,
clientTree.ShardKey,
clientTree.Root,
)
clientLeafCount2 := 0
for _, leaf := range clientLeaves2 {
if leaf != nil {
clientLeafCount2++
}
}
t.Logf("Hypersync complete: client synced %d prover vertices", clientLeafCount2)
assert.Greater(t, clientLeafCount2, 0, "should have synced at least some prover vertices")
// Verify the sync-based repair approach:
// 1. Create a second in-memory hypergraph
// 2. Sync from clientHG to the second hypergraph
// 3. Wipe the tree data from clientDB
// 4. Sync back from the second hypergraph to clientHG
// 5. Verify the root still matches
t.Log("Verifying sync-based repair approach...")
// Create second in-memory hypergraph
repairDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_repair/store"}}, 0)
defer repairDB.Close()
repairStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_repair/store"},
repairDB,
logger,
enc,
inclusionProver,
)
repairHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "repair")),
repairStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Get current root from clientHG before repair
clientRootBeforeRepair := clientHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(nil, false)
t.Logf("Client root before repair: %x", clientRootBeforeRepair)
// Publish snapshot on clientHG
clientHG.PublishSnapshot(clientRootBeforeRepair)
// Set up gRPC server backed by clientHG
const repairBufSize = 1 << 20
clientLis := bufconn.Listen(repairBufSize)
clientGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(clientGRPCServer, clientHG)
go func() { _ = clientGRPCServer.Serve(clientLis) }()
// Dial clientHG
clientDialer := func(context.Context, string) (net.Conn, error) {
return clientLis.Dial()
}
clientRepairConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(clientDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
require.NoError(t, err)
clientRepairClient := protobufs.NewHypergraphComparisonServiceClient(clientRepairConn)
// Sync from clientHG to repairHG for all phases
repairPhases := []protobufs.HypergraphPhaseSet{
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES,
}
t.Log("Syncing client -> repair hypergraph...")
for _, phase := range repairPhases {
stream, err := clientRepairClient.PerformSync(context.Background())
require.NoError(t, err)
_, err = repairHG.SyncFrom(stream, proverShardKey, phase, nil)
if err != nil {
t.Logf("Sync client->repair phase %v: %v", phase, err)
}
_ = stream.CloseSend()
}
// Verify repairHG has the data
repairRoot := repairHG.GetVertexAddsSet(proverShardKey).GetTree().Commit(nil, false)
t.Logf("Repair hypergraph root after sync: %x", repairRoot)
assert.Equal(t, clientRootBeforeRepair, repairRoot, "repair HG should match client root")
// Stop client server before wiping
clientGRPCServer.Stop()
clientRepairConn.Close()
// Wipe tree data from clientDB for the prover shard
t.Log("Wiping tree data from client DB...")
treePrefixes := []byte{
store.VERTEX_ADDS_TREE_NODE,
store.VERTEX_REMOVES_TREE_NODE,
store.HYPEREDGE_ADDS_TREE_NODE,
store.HYPEREDGE_REMOVES_TREE_NODE,
store.VERTEX_ADDS_TREE_NODE_BY_PATH,
store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
store.VERTEX_ADDS_CHANGE_RECORD,
store.VERTEX_REMOVES_CHANGE_RECORD,
store.HYPEREDGE_ADDS_CHANGE_RECORD,
store.HYPEREDGE_REMOVES_CHANGE_RECORD,
store.VERTEX_ADDS_TREE_ROOT,
store.VERTEX_REMOVES_TREE_ROOT,
store.HYPEREDGE_ADDS_TREE_ROOT,
store.HYPEREDGE_REMOVES_TREE_ROOT,
}
shardKeyBytes := make([]byte, 0, len(proverShardKey.L1)+len(proverShardKey.L2))
shardKeyBytes = append(shardKeyBytes, proverShardKey.L1[:]...)
shardKeyBytes = append(shardKeyBytes, proverShardKey.L2[:]...)
for _, prefix := range treePrefixes {
start := append([]byte{store.HYPERGRAPH_SHARD, prefix}, shardKeyBytes...)
// Increment shard key for end bound
endShardKeyBytes := make([]byte, len(shardKeyBytes))
copy(endShardKeyBytes, shardKeyBytes)
// Since all bytes of L2 are 0xff, incrementing would overflow, so use next prefix
end := []byte{store.HYPERGRAPH_SHARD, prefix + 1}
if err := clientDB.DeleteRange(start, end); err != nil {
t.Logf("DeleteRange for prefix 0x%02x: %v", prefix, err)
}
}
// Reload clientHG after wipe
t.Log("Reloading client hypergraph after wipe...")
clientStore2 := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_mainnet_client/store"},
clientDB,
logger,
enc,
inclusionProver,
)
clientHG2 := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "mainnet-client-reloaded")),
clientStore2,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Verify tree is now empty/different
clientRootAfterWipe := clientHG2.GetVertexAddsSet(proverShardKey).GetTree().Commit(nil, false)
t.Logf("Client root after wipe: %x (expected nil or different)", clientRootAfterWipe)
// Publish snapshot on repairHG for reverse sync
repairHG.PublishSnapshot(repairRoot)
// Set up gRPC server backed by repairHG
repairLis := bufconn.Listen(repairBufSize)
repairGRPCServer := grpc.NewServer(
grpc.MaxRecvMsgSize(100*1024*1024),
grpc.MaxSendMsgSize(100*1024*1024),
)
protobufs.RegisterHypergraphComparisonServiceServer(repairGRPCServer, repairHG)
go func() { _ = repairGRPCServer.Serve(repairLis) }()
defer repairGRPCServer.Stop()
// Dial repairHG
repairDialer := func(context.Context, string) (net.Conn, error) {
return repairLis.Dial()
}
repairConn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(repairDialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(100*1024*1024),
grpc.MaxCallSendMsgSize(100*1024*1024),
),
)
require.NoError(t, err)
defer repairConn.Close()
repairClient := protobufs.NewHypergraphComparisonServiceClient(repairConn)
// Sync from repairHG to clientHG2 for all phases
t.Log("Syncing repair -> client hypergraph...")
for _, phase := range repairPhases {
stream, err := repairClient.PerformSync(context.Background())
require.NoError(t, err)
_, err = clientHG2.SyncFrom(stream, proverShardKey, phase, nil)
if err != nil {
t.Logf("Sync repair->client phase %v: %v", phase, err)
}
_ = stream.CloseSend()
}
// Commit and verify root after repair
clientRootAfterRepair := clientHG2.GetVertexAddsSet(proverShardKey).GetTree().Commit(nil, true)
t.Logf("Client root after repair: %x", clientRootAfterRepair)
t.Logf("Expected root from frame: %x", expectedRoot)
// Verify the root matches the original (before repair) - this confirms the round-trip works
assert.Equal(t, clientRootBeforeRepair, clientRootAfterRepair,
"root after sync repair should match root before repair")
// Note: The root may not match the frame's expected root if there was corruption,
// but it should at least match what we synced before the repair.
// The actual fix for the frame mismatch requires fixing the corruption at the source.
t.Logf("Sync-based repair verification complete.")
t.Logf(" Original client root: %x", clientRootBeforeRepair)
t.Logf(" Repaired client root: %x", clientRootAfterRepair)
t.Logf(" Frame expected root: %x", expectedRoot)
if bytes.Equal(clientRootAfterRepair, expectedRoot) {
t.Log("SUCCESS: Repaired root matches frame expected root!")
} else {
t.Log("Note: Repaired root differs from frame expected root - corruption exists at source")
}
}
// TestHypergraphSyncWithPagination tests that syncing a large tree with >1000 leaves
// correctly handles pagination through multiple GetLeaves requests.
//
// The server hypergraph is populated with 1500 vertices in a single domain,
// while the client starts with only the first of them. After one SyncFrom call
// over an in-memory bufconn gRPC connection, the client is expected to hold the
// same number of leaves and the same vertex-adds root as the server.
func TestHypergraphSyncWithPagination(t *testing.T) {
	logger, _ := zap.NewDevelopment()
	enc := verenc.NewMPCitHVerifiableEncryptor(1)
	inclusionProver := bls48581.NewKZGInclusionProver(logger)

	// Create 1500 data trees to exceed the 1000 leaf batch size
	numVertices := 1500
	dataTrees := make([]*tries.VectorCommitmentTree, numVertices)
	eg := errgroup.Group{}
	eg.SetLimit(100)
	for i := 0; i < numVertices; i++ {
		eg.Go(func() error {
			dataTrees[i] = buildDataTree(t, inclusionProver)
			return nil
		})
	}
	// Wait for every tree to be built and surface any worker error instead of
	// silently discarding it.
	require.NoError(t, eg.Wait())
	t.Log("Generated data trees")

	// Create server DB and store
	serverDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_server/store"}}, 0)
	defer serverDB.Close()

	serverStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_server/store"},
		serverDB,
		logger,
		enc,
		inclusionProver,
	)
	serverHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "server")),
		serverStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)

	// Create client DB and store
	clientDB := store.NewPebbleDB(logger, &config.Config{DB: &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_client/store"}}, 0)
	defer clientDB.Close()

	clientStore := store.NewPebbleHypergraphStore(
		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtest_pagination_client/store"},
		clientDB,
		logger,
		enc,
		inclusionProver,
	)
	clientHG := hgcrdt.NewHypergraph(
		logger.With(zap.String("side", "client")),
		clientStore,
		inclusionProver,
		[]int{},
		&tests.Nopthenticator{},
		200,
	)

	// Create all vertices in a single domain
	domain := randomBytes32(t)
	vertices := make([]application.Vertex, numVertices)
	for i := 0; i < numVertices; i++ {
		vertices[i] = hgcrdt.NewVertex(
			domain,
			randomBytes32(t),
			dataTrees[i].Commit(inclusionProver, false),
			dataTrees[i].GetSize(),
		)
	}
	shardKey := application.GetShardKey(vertices[0])

	// Add all vertices to server
	t.Logf("Adding %d vertices to server", numVertices)
	serverTxn, err := serverStore.NewTransaction(false)
	require.NoError(t, err)
	for i, v := range vertices {
		id := v.GetID()
		require.NoError(t, serverStore.SaveVertexTree(serverTxn, id[:], dataTrees[i]))
		require.NoError(t, serverHG.AddVertex(serverTxn, v))
	}
	require.NoError(t, serverTxn.Commit())

	// Add initial vertex to client (to establish same shard key)
	clientTxn, err := clientStore.NewTransaction(false)
	require.NoError(t, err)
	id := vertices[0].GetID()
	require.NoError(t, clientStore.SaveVertexTree(clientTxn, id[:], dataTrees[0]))
	require.NoError(t, clientHG.AddVertex(clientTxn, vertices[0]))
	require.NoError(t, clientTxn.Commit())

	// Commit both
	_, err = serverHG.Commit(1)
	require.NoError(t, err)
	_, err = clientHG.Commit(1)
	require.NoError(t, err)

	serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	serverHG.PublishSnapshot(serverRoot)
	t.Logf("Server root: %x", serverRoot)

	// Verify server has 1500 vertices
	serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree()
	serverLeaves := tries.GetAllLeaves(
		serverTree.SetType,
		serverTree.PhaseType,
		serverTree.ShardKey,
		serverTree.Root,
	)
	serverLeafCount := 0
	for _, leaf := range serverLeaves {
		if leaf != nil {
			serverLeafCount++
		}
	}
	assert.Equal(t, numVertices, serverLeafCount, "server should have %d leaves", numVertices)
	t.Logf("Server has %d leaves", serverLeafCount)

	// Setup gRPC server
	const bufSize = 1 << 20
	lis := bufconn.Listen(bufSize)

	grpcServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024), // 100 MB
		grpc.MaxSendMsgSize(100*1024*1024), // 100 MB
		grpc.ChainStreamInterceptor(func(
			srv interface{},
			ss grpc.ServerStream,
			info *grpc.StreamServerInfo,
			handler grpc.StreamHandler,
		) error {
			// Wrap each stream's context with a freshly generated peer ID.
			// This closure runs on a gRPC server goroutine, so failures are
			// returned as RPC errors rather than asserted with require:
			// FailNow may only be called from the goroutine running the test.
			// The returned error fails the stream, which the test goroutine
			// observes through SyncFrom.
			_, priv, err := ed448.GenerateKey(rand.Reader)
			if err != nil {
				return fmt.Errorf("generate ed448 key: %w", err)
			}
			privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
			if err != nil {
				return fmt.Errorf("unmarshal ed448 private key: %w", err)
			}
			peerID, err := peer.IDFromPublicKey(privKey.GetPublic())
			if err != nil {
				return fmt.Errorf("derive peer ID from public key: %w", err)
			}

			return handler(srv, &serverStream{
				ServerStream: ss,
				ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
			})
		}),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
	defer grpcServer.Stop()

	go func() {
		_ = grpcServer.Serve(lis)
	}()

	// Dial the in-memory listener instead of a real network address.
	dialer := func(context.Context, string) (net.Conn, error) {
		return lis.Dial()
	}
	conn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(dialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
			grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
		),
	)
	require.NoError(t, err)
	defer conn.Close()

	client := protobufs.NewHypergraphComparisonServiceClient(conn)

	// Perform sync
	t.Log("Starting sync with pagination...")
	stream, err := client.PerformSync(context.Background())
	require.NoError(t, err)

	_, err = clientHG.SyncFrom(
		stream,
		shardKey,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		nil,
	)
	require.NoError(t, err)
	require.NoError(t, stream.CloseSend())

	// Commit client and verify
	_, err = clientHG.Commit(2)
	require.NoError(t, err)

	clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(nil, false)
	t.Logf("Client root after sync: %x", clientRoot)

	// Verify client now has all 1500 vertices
	clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
	clientLeaves := tries.GetAllLeaves(
		clientTree.SetType,
		clientTree.PhaseType,
		clientTree.ShardKey,
		clientTree.Root,
	)
	clientLeafCount := 0
	for _, leaf := range clientLeaves {
		if leaf != nil {
			clientLeafCount++
		}
	}
	assert.Equal(t, numVertices, clientLeafCount, "client should have %d leaves after sync", numVertices)
	t.Logf("Client has %d leaves after sync", clientLeafCount)

	// Verify roots match
	assert.Equal(t, serverRoot, clientRoot, "client root should match server root after sync")

	t.Log("Pagination test passed - client converged to server state")
}
// dumpHypergraphShardKeys logs every key under the HYPERGRAPH_SHARD prefix whose
// bytes contain the global prover shard key, along with a semantic description
// of the key and a decoded rendering of its value.
// This replicates the behavior of: dbscan -prefix 09 -search 000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
// Parameters:
//   - t: testing context; all output goes through t.Logf
//   - db: the PebbleDB to inspect
//   - label: tag prepended to each log line (e.g., "client", "server")
//
// A pattern-decode or iterator-creation failure is logged and ends the dump; it
// does not fail the test.
func dumpHypergraphShardKeys(t *testing.T, db *store.PebbleDB, label string) {
	// Global prover shard key: L1=[0x00,0x00,0x00], L2=[0xff * 32] (35 bytes).
	const globalProverShardHex = "000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"

	// Prefix 0x09 = HYPERGRAPH_SHARD
	shardPrefix := []byte{store.HYPERGRAPH_SHARD}

	needle, err := hex.DecodeString(globalProverShardHex)
	if err != nil {
		t.Logf("[%s] Failed to decode search pattern: %v", label, err)
		return
	}

	// Bound the scan by the prefix byte and the next byte value.
	it, err := db.NewIter(shardPrefix, []byte{store.HYPERGRAPH_SHARD + 1})
	if err != nil {
		t.Logf("[%s] Failed to create iterator: %v", label, err)
		return
	}
	defer it.Close()

	t.Logf("=== Database dump for %s (prefix=09, search=global prover shard) ===", label)

	matched := 0
	for it.First(); it.Valid(); it.Next() {
		k, v := it.Key(), it.Value()

		// Keep only keys that carry the prefix and contain the shard key bytes.
		if !bytes.HasPrefix(k, shardPrefix) || !bytes.Contains(k, needle) {
			continue
		}
		matched++

		// Decode first, then emit the three lines for this entry.
		meaning := describeHypergraphKeyForTest(k)
		rendered := decodeHypergraphValueForTest(k, v)
		t.Logf("[%s] key: %s", label, hex.EncodeToString(k))
		t.Logf("[%s] semantic: %s", label, meaning)
		t.Logf("[%s] value:\n%s\n", label, indentForTest(rendered))
	}

	t.Logf("=== End dump for %s: %d keys matched ===", label, matched)
}
// describeHypergraphKeyForTest returns a human-readable description of a
// hypergraph database key.
// Mirrors the logic from dbscan/main.go describeHypergraphKey.
//
// Keys shorter than two bytes are reported as invalid. Keys of at least ten
// bytes are first checked for the frame-based shard commit layout (big-endian
// frame number in key[1:9], set/phase marker in key[9], shard bytes after).
// Otherwise key[1] selects the sub-type and key[2:] is its payload; key[0] is
// not inspected here.
func describeHypergraphKeyForTest(key []byte) string {
	if len(key) < 2 {
		return "hypergraph: invalid key length"
	}

	// Check for shard commit keys (frame-based) before sub-type decoding.
	if len(key) >= 10 {
		var phase string
		switch key[9] {
		case store.HYPERGRAPH_VERTEX_ADDS_SHARD_COMMIT:
			phase = "vertex-adds"
		case store.HYPERGRAPH_VERTEX_REMOVES_SHARD_COMMIT:
			phase = "vertex-removes"
		case store.HYPERGRAPH_HYPEREDGE_ADDS_SHARD_COMMIT:
			phase = "hyperedge-adds"
		case store.HYPERGRAPH_HYPEREDGE_REMOVES_SHARD_COMMIT:
			phase = "hyperedge-removes"
		}
		// A non-empty phase means key[9] matched one of the commit markers.
		if phase != "" {
			return fmt.Sprintf(
				"hypergraph shard commit %s frame=%d shard=%s",
				phase,
				binary.BigEndian.Uint64(key[1:9]),
				shortHexForTest(key[10:]),
			)
		}
	}

	// Tree payloads lead with a 35-byte shard key: 3 bytes of L1, 32 of L2.
	const shardKeyLen = 35
	kind, body := key[1], key[2:]
	hasShardKey := len(body) >= shardKeyLen

	switch kind {
	case store.VERTEX_DATA:
		return fmt.Sprintf("hypergraph vertex data id=%s", shortHexForTest(body))

	case store.VERTEX_TOMBSTONE:
		return fmt.Sprintf("hypergraph vertex tombstone id=%s", shortHexForTest(body))

	case store.VERTEX_ADDS_TREE_NODE,
		store.VERTEX_REMOVES_TREE_NODE,
		store.HYPEREDGE_ADDS_TREE_NODE,
		store.HYPEREDGE_REMOVES_TREE_NODE:
		treeType := describeHypergraphTreeTypeForTest(kind)
		if !hasShardKey {
			return fmt.Sprintf("%s tree node (invalid length)", treeType)
		}
		return fmt.Sprintf(
			"%s tree node shard=[%s|%s] node=%s",
			treeType,
			shortHexForTest(body[:3]),
			shortHexForTest(body[3:shardKeyLen]),
			shortHexForTest(body[shardKeyLen:]),
		)

	case store.VERTEX_ADDS_TREE_NODE_BY_PATH,
		store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
		store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
		store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH:
		treeType := describeHypergraphTreeTypeForTest(kind)
		if !hasShardKey {
			return fmt.Sprintf("%s path (invalid length)", treeType)
		}
		// Everything after the shard key is read as big-endian uint64 path steps.
		steps := parseUint64PathForTest(body[shardKeyLen:])
		return fmt.Sprintf(
			"%s path shard=[%s|%s] path=%v",
			treeType,
			shortHexForTest(body[:3]),
			shortHexForTest(body[3:shardKeyLen]),
			steps,
		)

	case store.VERTEX_ADDS_TREE_ROOT,
		store.VERTEX_REMOVES_TREE_ROOT,
		store.HYPEREDGE_ADDS_TREE_ROOT,
		store.HYPEREDGE_REMOVES_TREE_ROOT:
		treeType := describeHypergraphTreeTypeForTest(kind)
		if !hasShardKey {
			return fmt.Sprintf("%s tree root (invalid length)", treeType)
		}
		return fmt.Sprintf(
			"%s tree root shard=[%s|%s]",
			treeType,
			shortHexForTest(body[:3]),
			shortHexForTest(body[3:shardKeyLen]),
		)

	case store.HYPERGRAPH_COVERED_PREFIX:
		return "hypergraph covered prefix metadata"

	case store.HYPERGRAPH_COMPLETE:
		return "hypergraph completeness flag"
	}

	return fmt.Sprintf(
		"hypergraph unknown subtype 0x%02x raw=%s",
		kind,
		shortHexForTest(body),
	)
}
// describeHypergraphTreeTypeForTest maps a tree key sub-type byte (node,
// node-by-path or root) to the name of the set/phase it belongs to. Any other
// byte yields the generic label "hypergraph".
func describeHypergraphTreeTypeForTest(kind byte) string {
	label := "hypergraph"

	switch kind {
	case store.VERTEX_ADDS_TREE_NODE,
		store.VERTEX_ADDS_TREE_NODE_BY_PATH,
		store.VERTEX_ADDS_TREE_ROOT:
		label = "vertex adds"

	case store.VERTEX_REMOVES_TREE_NODE,
		store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
		store.VERTEX_REMOVES_TREE_ROOT:
		label = "vertex removes"

	case store.HYPEREDGE_ADDS_TREE_NODE,
		store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
		store.HYPEREDGE_ADDS_TREE_ROOT:
		label = "hyperedge adds"

	case store.HYPEREDGE_REMOVES_TREE_NODE,
		store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
		store.HYPEREDGE_REMOVES_TREE_ROOT:
		label = "hyperedge removes"
	}

	return label
}
// decodeHypergraphValueForTest decodes hypergraph values for display, picking
// the decoder from the key's sub-type byte (key[1]).
// Mirrors the logic from dbscan/main.go decodeHypergraphValue.
//
// An empty value is always rendered as "<empty>", whatever the key type. A key
// shorter than two bytes is treated as sub-type 0.
func decodeHypergraphValueForTest(key []byte, value []byte) string {
	if len(value) == 0 {
		return "<empty>"
	}

	sub := byte(0)
	if len(key) > 1 {
		sub = key[1]
	}

	switch sub {
	case store.VERTEX_DATA:
		return summarizeVectorCommitmentTreeForTest(key, value)
	case store.VERTEX_TOMBSTONE:
		return shortHexForTest(value)
	case store.VERTEX_ADDS_TREE_NODE,
		store.VERTEX_REMOVES_TREE_NODE,
		store.HYPEREDGE_ADDS_TREE_NODE,
		store.HYPEREDGE_REMOVES_TREE_NODE,
		store.VERTEX_ADDS_TREE_NODE_BY_PATH,
		store.VERTEX_REMOVES_TREE_NODE_BY_PATH,
		store.HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
		store.HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
		store.VERTEX_ADDS_TREE_ROOT,
		store.VERTEX_REMOVES_TREE_ROOT,
		store.HYPEREDGE_ADDS_TREE_ROOT,
		store.HYPEREDGE_REMOVES_TREE_ROOT:
		// Node, node-by-path and root entries are all summarized as tree nodes.
		return summarizeHypergraphTreeNodeForTest(value)
	case store.HYPERGRAPH_COVERED_PREFIX:
		return decodeCoveredPrefixForTest(value)
	case store.HYPERGRAPH_COMPLETE:
		// value is non-empty here (empty values returned above), so the flag is
		// read from its last byte: non-zero means complete.
		return fmt.Sprintf("complete=%t", value[len(value)-1] != 0)
	default:
		return shortHexForTest(value)
	}
}
// summarizeVectorCommitmentTreeForTest renders a vertex-data value as indented
// JSON containing its byte size and a shortened SHA-256. When the key is at
// least 66 bytes long and key[2:34] (the domain) is 32 bytes of 0xff, the fields
// decoded from the tree for address key[34:66] are merged into the summary.
//
// A value that does not deserialize as a tree is reported with the decode error
// and a shortened hex dump instead.
func summarizeVectorCommitmentTreeForTest(key []byte, value []byte) string {
	tree, err := tries.DeserializeNonLazyTree(value)
	if err != nil {
		return fmt.Sprintf(
			"vector_commitment_tree decode_error=%v raw=%s",
			err,
			shortHexForTest(value),
		)
	}

	digest := sha256.Sum256(value)
	out := map[string]any{
		"size_bytes": len(value),
		"sha256":     shortHexForTest(digest[:]),
	}

	// Global intrinsic vertices live under the all-0xff domain.
	allOnes := bytes.Repeat([]byte{0xff}, 32)
	if len(key) >= 66 && bytes.Equal(key[2:34], allOnes) {
		extra := decodeGlobalIntrinsicVertexForTest(tree, key[34:66])
		for field, v := range extra {
			out[field] = v
		}
	}

	encoded, err := json.MarshalIndent(out, "", " ")
	if err == nil {
		return string(encoded)
	}
	return fmt.Sprintf("vector_commitment_tree size_bytes=%d", len(value))
}
// decodeGlobalIntrinsicVertexForTest classifies a global intrinsic vertex by the
// length of its order-0 field (tree key 0x00) and returns the decoded fields.
//
//   - 585 bytes: treated as a prover public key; prover fields are decoded.
//   - 32 bytes: an allocation when tree key 0x10 holds 8 bytes (a join frame),
//     otherwise a reward whose order-0 field is the delegate address.
//   - anything else: type "unknown" with the observed size.
//
// The result always carries "vertex_address" as the hex of address.
func decodeGlobalIntrinsicVertexForTest(tree *tries.VectorCommitmentTree, address []byte) map[string]any {
	out := map[string]any{
		"vertex_address": hex.EncodeToString(address),
	}

	// Check order 0 field
	first, err := tree.Get([]byte{0x00})
	if err != nil || len(first) == 0 {
		out["type"] = "unknown (no order 0 field)"
		return out
	}

	size := len(first)
	if size == 585 {
		// Prover: PublicKey is 585 bytes
		out["type"] = "prover:Prover"
		out["public_key"] = shortHexForTest(first)
		decodeProverFieldsForTest(tree, out)
		return out
	}

	if size == 32 {
		// Could be Allocation (Prover reference) or Reward (DelegateAddress);
		// a lookup error is ignored and simply falls through to the reward case.
		joinFrame, _ := tree.Get([]byte{0x10})
		if len(joinFrame) != 8 {
			out["type"] = "reward:ProverReward"
			out["delegate_address"] = hex.EncodeToString(first)
			return out
		}
		out["type"] = "allocation:ProverAllocation"
		out["prover_reference"] = hex.EncodeToString(first)
		decodeAllocationFieldsForTest(tree, out)
		return out
	}

	out["type"] = "unknown"
	out["order_0_size"] = size
	return out
}
// decodeProverFieldsForTest copies the prover fields found in tree into result.
// The 1-byte status at tree key 0x04 is stored both decoded ("status") and raw
// ("status_raw"); the 8-byte big-endian values at 0x08, 0x0c and 0x10 become
// "available_storage", "seniority" and "kick_frame_number". A field that is
// missing, errors on lookup, or has an unexpected length is left out.
func decodeProverFieldsForTest(tree *tries.VectorCommitmentTree, result map[string]any) {
	raw, err := tree.Get([]byte{0x04})
	if err == nil && len(raw) == 1 {
		result["status"] = decodeProverStatusForTest(raw[0])
		result["status_raw"] = raw[0]
	}

	uint64Fields := []struct {
		order byte
		name  string
	}{
		{0x08, "available_storage"},
		{0x0c, "seniority"},
		{0x10, "kick_frame_number"},
	}
	for _, f := range uint64Fields {
		v, err := tree.Get([]byte{f.order})
		if err != nil || len(v) != 8 {
			continue
		}
		result[f.name] = binary.BigEndian.Uint64(v)
	}
}
// decodeAllocationFieldsForTest copies the allocation fields found in tree into
// result.
//
// The 1-byte status at tree key 0x04 is stored decoded and raw. The confirmation
// filter at 0x08 is hex-encoded when present; "is_global_prover" is set to true
// when that filter is absent, empty, unreadable, or consists only of zero bytes.
// The 8-byte big-endian values at 0x10, 0x14 and 0x34 become
// "join_frame_number", "leave_frame_number" and "last_active_frame_number".
func decodeAllocationFieldsForTest(tree *tries.VectorCommitmentTree, result map[string]any) {
	raw, err := tree.Get([]byte{0x04})
	if err == nil && len(raw) == 1 {
		result["status"] = decodeProverStatusForTest(raw[0])
		result["status_raw"] = raw[0]
	}

	filter, err := tree.Get([]byte{0x08})
	switch {
	case err != nil || len(filter) == 0:
		result["is_global_prover"] = true
	default:
		result["confirmation_filter"] = hex.EncodeToString(filter)
		// Compare against an equally long zero buffer to detect an all-zero filter.
		zeros := make([]byte, len(filter))
		if bytes.Equal(filter, zeros) {
			result["is_global_prover"] = true
		}
	}

	frameFields := []struct {
		order byte
		name  string
	}{
		{0x10, "join_frame_number"},
		{0x14, "leave_frame_number"},
		{0x34, "last_active_frame_number"},
	}
	for _, f := range frameFields {
		v, err := tree.Get([]byte{f.order})
		if err != nil || len(v) != 8 {
			continue
		}
		result[f.name] = binary.BigEndian.Uint64(v)
	}
}
// decodeProverStatusForTest returns the display name for a prover status byte:
// 0 through 5 map to Joining, Active, Paused, Leaving, Rejected and Kicked; any
// other value is rendered as "Unknown(<n>)".
func decodeProverStatusForTest(status byte) string {
	// Indexed by status value.
	names := [...]string{
		"Joining",
		"Active",
		"Paused",
		"Leaving",
		"Rejected",
		"Kicked",
	}
	if int(status) >= len(names) {
		return fmt.Sprintf("Unknown(%d)", status)
	}
	return names[status]
}
// summarizeHypergraphTreeNodeForTest renders a serialized tree node for display.
//
// The first byte of value is the node type tag. Nil nodes are reported with the
// SHA-256 of the raw bytes only; leaf and branch nodes are deserialized from the
// remaining bytes and summarized as indented JSON (with a one-line fallback if
// JSON encoding fails); any other tag is shown with its shortened raw payload.
// Decode failures are reported inline together with the hash.
func summarizeHypergraphTreeNodeForTest(value []byte) string {
	if len(value) == 0 {
		return "hypergraph_tree_node <empty>"
	}

	digest := sha256.Sum256(value)
	sum := shortHexForTest(digest[:])

	// Consume the leading type tag; the deserializers read the rest from r.
	r := bytes.NewReader(value)
	var tag byte
	if err := binary.Read(r, binary.BigEndian, &tag); err != nil {
		return fmt.Sprintf("tree_node decode_error=%v sha256=%s", err, sum)
	}

	if tag == tries.TypeNil {
		return fmt.Sprintf("tree_nil sha256=%s", sum)
	}

	if tag == tries.TypeLeaf {
		leaf, err := tries.DeserializeLeafNode(nil, r)
		if err != nil {
			return fmt.Sprintf("tree_leaf decode_error=%v sha256=%s", err, sum)
		}

		fields := map[string]any{
			"type":         "leaf",
			"key":          shortHexForTest(leaf.Key),
			"value":        shortHexForTest(leaf.Value),
			"hash_target":  shortHexForTest(leaf.HashTarget),
			"commitment":   shortHexForTest(leaf.Commitment),
			"bytes_sha256": sum,
		}
		if leaf.Size != nil {
			fields["size"] = leaf.Size.String()
		}

		if encoded, err := json.MarshalIndent(fields, "", " "); err == nil {
			return string(encoded)
		}
		return fmt.Sprintf(
			"tree_leaf key=%s sha256=%s",
			shortHexForTest(leaf.Key),
			sum,
		)
	}

	if tag == tries.TypeBranch {
		branch, err := tries.DeserializeBranchNode(nil, r, true)
		if err != nil {
			return fmt.Sprintf("tree_branch decode_error=%v sha256=%s", err, sum)
		}

		// Tally children by concrete type; anything else counts as nil.
		var branches, leaves, empties int
		for _, child := range branch.Children {
			switch child.(type) {
			case *tries.LazyVectorCommitmentBranchNode:
				branches++
			case *tries.LazyVectorCommitmentLeafNode:
				leaves++
			default:
				empties++
			}
		}

		fields := map[string]any{
			"type":           "branch",
			"prefix":         branch.Prefix,
			"leaf_count":     branch.LeafCount,
			"longest_branch": branch.LongestBranch,
			"commitment":     shortHexForTest(branch.Commitment),
			"children": map[string]int{
				"branch": branches,
				"leaf":   leaves,
				"nil":    empties,
			},
			"bytes_sha256": sum,
		}
		if branch.Size != nil {
			fields["size"] = branch.Size.String()
		}

		if encoded, err := json.MarshalIndent(fields, "", " "); err == nil {
			return string(encoded)
		}
		return fmt.Sprintf(
			"tree_branch prefix=%v leafs=%d sha256=%s",
			branch.Prefix,
			branch.LeafCount,
			sum,
		)
	}

	// Unrecognized tag: show it with whatever follows the tag byte.
	return fmt.Sprintf(
		"tree_node type=0x%02x payload=%s sha256=%s",
		tag,
		shortHexForTest(value[1:]),
		sum,
	)
}
// decodeCoveredPrefixForTest renders a covered-prefix value as a list of signed
// 64-bit integers, reading value as consecutive big-endian 8-byte words. A value
// whose length is not a multiple of 8 is shown as shortened hex instead.
func decodeCoveredPrefixForTest(value []byte) string {
	const wordSize = 8
	if len(value)%wordSize != 0 {
		return shortHexForTest(value)
	}

	words := make([]int64, 0, len(value)/wordSize)
	for off := 0; off < len(value); off += wordSize {
		word := binary.BigEndian.Uint64(value[off : off+wordSize])
		words = append(words, int64(word))
	}
	return fmt.Sprintf("covered_prefix=%v", words)
}
// shortHexForTest formats b as 0x-prefixed hex. Slices of up to 16 bytes are
// printed in full; longer ones show the first and last 8 bytes separated by
// "..." and followed by the total length. An empty slice yields "0x".
func shortHexForTest(b []byte) string {
	const edge = 8 // bytes kept at each end of an abbreviated slice

	n := len(b)
	switch {
	case n == 0:
		return "0x"
	case n <= 2*edge:
		return "0x" + hex.EncodeToString(b)
	}

	head := hex.EncodeToString(b[:edge])
	tail := hex.EncodeToString(b[n-edge:])
	return fmt.Sprintf("0x%s...%s(len=%d)", head, tail, n)
}
// parseUint64PathForTest decodes b as a sequence of big-endian uint64 values.
// It returns nil when len(b) is not a multiple of 8, and an empty non-nil slice
// when b is empty.
func parseUint64PathForTest(b []byte) []uint64 {
	const wordSize = 8
	if len(b)%wordSize != 0 {
		return nil
	}

	path := make([]uint64, 0, len(b)/wordSize)
	// Peel one word off the front per iteration until nothing is left.
	for rest := b; len(rest) > 0; rest = rest[wordSize:] {
		path = append(path, binary.BigEndian.Uint64(rest[:wordSize]))
	}
	return path
}
// indentForTest prefixes every line of value with a single space. Lines are
// delimited by "\n", so a trailing newline produces a final line holding only
// the prefix. An empty string is returned unchanged.
func indentForTest(value string) string {
	if len(value) == 0 {
		return ""
	}

	const pad = " "
	parts := bytes.Split([]byte(value), []byte("\n"))
	// Joining on newline+pad indents every line but the first; pad that one here.
	return pad + string(bytes.Join(parts, []byte("\n"+pad)))
}