mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 10:27:26 +08:00

fix: disjoint sync, improper application of filter

parent f3f3d625ce
commit 8c8fca2ab7
@@ -570,8 +571,11 @@ const (
 	leafAckPerLeafBudget = 20 * time.Millisecond // Generous budget for tree building overhead
 
 	// Session-level timeouts
-	maxSyncSessionDuration = 15 * time.Minute // Maximum total time for a sync session
-	syncIdleTimeout        = 5 * time.Minute  // Maximum time without activity before session is killed
+	maxSyncSessionDuration = 30 * time.Minute // Maximum total time for a sync session
+	syncIdleTimeout        = 10 * time.Minute // Maximum time without activity before session is killed
+
+	// Operation-level timeouts
+	syncOperationTimeout = 2 * time.Minute // Timeout for individual sync operations (queries, responses)
 )
 
 func leafAckTimeout(count uint64) time.Duration {
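
The hunk below continues inside leafAckTimeout, whose body is elided from this diff. For orientation, a minimal sketch of how a per-leaf budget typically becomes a deadline (a hypothetical reconstruction, not the repository's actual body):

// Hypothetical sketch only: scale the per-leaf budget by the leaf count
// and clamp to the session ceiling so a huge tree cannot stall a session.
func exampleLeafAckTimeout(count uint64) time.Duration {
	timeout := time.Duration(count) * leafAckPerLeafBudget
	if timeout > maxSyncSessionDuration {
		timeout = maxSyncSessionDuration
	}
	return timeout
}
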
@@ -588,6 +591,17 @@ func leafAckTimeout(count uint64) time.Duration {
 	return timeout
 }
 
+// isTimeoutError checks if an error is a timeout-related error that should abort the sync.
+func isTimeoutError(err error) bool {
+	if err == nil {
+		return false
+	}
+	errMsg := err.Error()
+	return strings.Contains(errMsg, "timed out") ||
+		strings.Contains(errMsg, "context deadline exceeded") ||
+		strings.Contains(errMsg, "context canceled")
+}
+
 // sendLeafData builds a LeafData message (with the full leaf data) for the
 // node at the given path in the local tree and sends it over the stream.
 func (s *streamManager) sendLeafData(
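
isTimeoutError matches on error strings, which is brittle if message text ever changes. Where the underlying context errors are wrapped rather than flattened to text, a typed check is possible; a sketch under that assumption (isTimeoutErrorTyped is a hypothetical name, not part of this commit):

import (
	"context"
	"errors"
)

// Hypothetical alternative: rely on errors.Is against the standard
// sentinels instead of substring matching. Only valid if callers wrap
// the original error (e.g. with %w or pkg/errors) rather than
// reconstructing it from a string.
func isTimeoutErrorTyped(err error) bool {
	return errors.Is(err, context.DeadlineExceeded) ||
		errors.Is(err, context.Canceled)
}
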
@@ -1123,7 +1137,7 @@ func (s *streamManager) queryNext(
 		s.updateActivity()
 		resp = r
 		return resp, nil
-	case <-time.After(30 * time.Second):
+	case <-time.After(syncOperationTimeout):
 		return nil, errors.Wrap(
 			errors.New("timed out"),
 			"handle query",
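
The same select-or-timeout shape recurs in the next several hunks. A generic helper could factor it; a sketch, assuming Go 1.18+ generics and the pkg/errors wrapper already used above (recvWithTimeout is hypothetical):

// Hypothetical helper: wait for one value on ch, or fail with a wrapped
// "timed out" error after syncOperationTimeout elapses.
func recvWithTimeout[T any](ch <-chan T, op string) (T, error) {
	var zero T
	select {
	case v := <-ch:
		return v, nil
	case <-time.After(syncOperationTimeout):
		return zero, errors.Wrap(errors.New("timed out"), op)
	}
}
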
@@ -1168,7 +1182,7 @@ func (s *streamManager) handleLeafData(
 		case *protobufs.HypergraphComparison_Metadata:
 			expectedLeaves = msg.GetMetadata().Leaves
 		}
-	case <-time.After(30 * time.Second):
+	case <-time.After(syncOperationTimeout):
 		return errors.Wrap(
 			errors.New("timed out"),
 			"handle leaf data",
@@ -1240,7 +1254,7 @@ func (s *streamManager) handleLeafData(
 				"handle leaf data",
 			)
 		}
-	case <-time.After(30 * time.Second):
+	case <-time.After(syncOperationTimeout):
 		return errors.Wrap(
 			errors.New("timed out"),
 			"handle leaf data",
@@ -1360,7 +1374,7 @@ func (s *streamManager) handleQueryNext(
 		s.updateActivity()
 		branch = branchInfo
 		return branch, nil
-	case <-time.After(30 * time.Second):
+	case <-time.After(syncOperationTimeout):
 		return nil, errors.Wrap(
 			errors.New("timed out"),
 			"handle query next",
@@ -1436,7 +1450,7 @@ func (s *streamManager) descendIndex(
 		local = branchInfo
 		remote = r
 		return local, remote, nil
-	case <-time.After(30 * time.Second):
+	case <-time.After(syncOperationTimeout):
 		return nil, nil, errors.Wrap(
 			errors.New("timed out"),
 			"descend index",
@@ -1527,6 +1541,7 @@ func (s *streamManager) walk(
 	traversePath := append([]int32{}, rpref...)
 	for _, nibble := range traverse {
 		// s.logger.Debug("attempting remote traversal step")
+		found := false
 		for _, child := range rtrav.Children {
 			if child.Index == nibble {
 				// s.logger.Debug("sending query")
@@ -1540,14 +1555,14 @@ func (s *streamManager) walk(
 					s.logger.Error("query failed", zap.Error(err))
 					return errors.Wrap(err, "walk")
 				}
-
+				found = true
 				break
 			}
 		}
 
 		// If no child matched or queryNext returned nil, remote doesn't
 		// have the path that local has
-		if rtrav == nil {
+		if !found || rtrav == nil {
 			// s.logger.Debug("traversal could not reach path")
 			if isServer {
 				err := s.sendLeafData(
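
This is the heart of the disjoint-sync fix: before the found flag, exhausting the children loop without a match was indistinguishable from a successful match, so fully disjoint subtrees were silently skipped instead of exchanged. A stripped-down, self-contained illustration of the bug class (hypothetical names, not repository code):

// Without a flag, "matched and broke" and "never matched" exit the loop
// the same way; the caller cannot tell the two cases apart.
func childMatches(children []int32, nibble int32) bool {
	found := false
	for _, c := range children {
		if c == nibble {
			found = true
			break
		}
	}
	return found // the pre-fix walk never consulted this condition
}
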
@@ -1583,6 +1598,7 @@ func (s *streamManager) walk(
 	for _, nibble := range traverse {
 		// s.logger.Debug("attempting local traversal step")
 		preTraversal := append([]int32{}, traversedPath...)
+		found := false
 		for _, child := range ltrav.Children {
 			if child.Index == nibble {
 				traversedPath = append(traversedPath, nibble)
@@ -1611,6 +1627,7 @@ func (s *streamManager) walk(
 						return errors.Wrap(err, "walk")
 					}
 				}
+				found = true
 			} else {
 				// Local has a child that's not on remote's traversal path
 				missingPath := append(append([]int32{}, preTraversal...), child.Index)
@@ -1630,6 +1647,21 @@ func (s *streamManager) walk(
 				}
 			}
 		}
+
+		// If no child matched the nibble, local doesn't have the path
+		// that remote expects - receive remote's data
+		if !found {
+			if isServer {
+				err := s.sendLeafData(
+					preTraversal,
+					incomingLeaves,
+				)
+				return errors.Wrap(err, "walk")
+			} else {
+				err := s.handleLeafData(incomingLeaves)
+				return errors.Wrap(err, "walk")
+			}
+		}
 	}
 	// s.logger.Debug("traversal completed, performing walk", pathString)
 	return s.walk(
@@ -1708,6 +1740,16 @@ func (s *streamManager) walk(
 		nextPath,
 	)
 	if err != nil {
+		// If this is a timeout or context error, abort the sync entirely
+		// rather than trying to continue with leaf data exchange
+		if isTimeoutError(err) {
+			s.logger.Warn(
+				"branch descension timeout - aborting sync",
+				zap.Error(err),
+				zap.String("path", hex.EncodeToString(packPath(nextPath))),
+			)
+			return errors.Wrap(err, "walk: branch descension timeout")
+		}
 		s.logger.Info("incomplete branch descension", zap.Error(err))
 		if isServer {
 			if err := s.sendLeafData(
@@ -27,7 +27,7 @@ func (p *AppLivenessProvider) Collect(
 	}
 
 	mixnetMessages := []*protobufs.Message{}
-	currentSet, _ := p.engine.proverRegistry.GetActiveProvers(nil)
+	currentSet, _ := p.engine.proverRegistry.GetActiveProvers(p.engine.appAddress)
 	if len(currentSet) >= 9 {
 		// Prepare mixnet for collecting messages
 		err := p.engine.mixnet.PrepareMixnet()
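
This is the "improper application of filter" half of the fix: a nil filter counts active provers across every application, so the >= 9 gate could pass even when fewer than nine provers actually serve this app. A sketch of the corrected gating logic (the registry shape here is hypothetical, for illustration only):

// Hypothetical registry shape; nil is assumed to mean "no filter",
// matching the behavior of the pre-fix call.
type proverRegistry interface {
	GetActiveProvers(filter []byte) ([][]byte, error)
}

// Count only provers registered under this app's filter before enabling
// the mixnet; an unfiltered count would overstate availability.
func enoughProversForMixnet(reg proverRegistry, appAddress []byte) (bool, error) {
	provers, err := reg.GetActiveProvers(appAddress)
	if err != nil {
		return false, err
	}
	return len(provers) >= 9, nil
}
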
@@ -2078,6 +2078,10 @@ func (e *GlobalConsensusEngine) performBlockingProverHypersync(
 	}
 
 	e.logger.Info("blocking hypersync completed")
+	if len(newRoots) == 0 {
+		return nil
+	}
+
 	return newRoots[0]
 }
 
@@ -714,7 +714,7 @@ func (e *SyncProvider[StateT, ProposalT]) getRandomProverPeerId() (
 	peer.ID,
 	error,
 ) {
-	provers, err := e.proverRegistry.GetActiveProvers(nil)
+	provers, err := e.proverRegistry.GetActiveProvers(e.filter)
 	if err != nil {
 		e.logger.Error(
 			"could not get active provers for sync",
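
The same fix applied to the sync path: random peer selection must draw from the filter-scoped prover set, otherwise a node can pick a prover that does not serve its shard at all. A sketch of the selection step that would follow (hypothetical helper; assumes math/rand and libp2p's peer package):

import (
	"errors"
	"math/rand"

	"github.com/libp2p/go-libp2p/core/peer"
)

// Hypothetical: pick one prover uniformly from an already filter-scoped
// set; an empty set is an error rather than a silent fallback.
func randomProverPeer(provers []peer.ID) (peer.ID, error) {
	if len(provers) == 0 {
		return "", errors.New("no active provers for filter")
	}
	return provers[rand.Intn(len(provers))], nil
}
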
@@ -1401,7 +1401,10 @@ const connectivityCacheValidity = 7 * 24 * time.Hour // 1 week
 // connectivityCachePath returns the path to the connectivity check cache file
 // for this core. The file is stored in <configDir>/connectivity-check-<coreId>
 func (b *BlossomSub) connectivityCachePath() string {
-	return filepath.Join(string(b.configDir), fmt.Sprintf("connectivity-check-%d", b.coreId))
+	return filepath.Join(
+		string(b.configDir),
+		fmt.Sprintf("connectivity-check-%d", b.coreId),
+	)
 }
 
 // isConnectivityCacheValid checks if there's a valid (< 1 week old) connectivity
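
The context above ends in the doc comment for isConnectivityCacheValid, whose body is not shown in this diff. A minimal sketch of such a freshness check, assuming validity is judged by the cache file's modification time (hypothetical body, not the repository's code):

import (
	"os"
	"time"
)

// Hypothetical: the cached connectivity result is fresh if the file at
// path was modified within connectivityCacheValidity (one week).
func isCacheFresh(path string) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return time.Since(info.ModTime()) < connectivityCacheValidity
}
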
@@ -1606,3 +1606,621 @@ func TestHypergraphSyncWithModifiedEntries(t *testing.T) {
 
 	t.Logf("Sync completed successfully - %d entries with same keys but different values were updated", numVertices)
 }
+
+// TestHypergraphBidirectionalSyncWithDisjointData tests that when node A has 500
+// unique vertices and node B has 500 different unique vertices, syncing in both
+// directions results in both nodes having all 1000 vertices.
+func TestHypergraphBidirectionalSyncWithDisjointData(t *testing.T) {
+	logger, _ := zap.NewDevelopment()
+	enc := verenc.NewMPCitHVerifiableEncryptor(1)
+	inclusionProver := bls48581.NewKZGInclusionProver(logger)
+
+	// Create data trees for all 1000 vertices
+	numVerticesPerNode := 500
+	totalVertices := numVerticesPerNode * 2
+	dataTrees := make([]*tries.VectorCommitmentTree, totalVertices)
+	eg := errgroup.Group{}
+	eg.SetLimit(100)
+	for i := 0; i < totalVertices; i++ {
+		eg.Go(func() error {
+			dataTrees[i] = buildDataTree(t, inclusionProver)
+			return nil
+		})
+	}
+	eg.Wait()
+	t.Log("Generated data trees")
+
+	// Create databases and stores for both nodes
+	nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"}, 0)
+	defer nodeADB.Close()
+
+	nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"}, 0)
+	defer nodeBDB.Close()
+
+	nodeAStore := store.NewPebbleHypergraphStore(
+		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeA/store"},
+		nodeADB,
+		logger,
+		enc,
+		inclusionProver,
+	)
+
+	nodeBStore := store.NewPebbleHypergraphStore(
+		&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestnodeB/store"},
+		nodeBDB,
+		logger,
+		enc,
+		inclusionProver,
+	)
+
+	nodeAHG := hgcrdt.NewHypergraph(
+		logger.With(zap.String("side", "nodeA")),
+		nodeAStore,
+		inclusionProver,
+		[]int{},
+		&tests.Nopthenticator{},
+		200,
+	)
+
+	nodeBHG := hgcrdt.NewHypergraph(
+		logger.With(zap.String("side", "nodeB")),
+		nodeBStore,
+		inclusionProver,
+		[]int{},
+		&tests.Nopthenticator{},
+		200,
+	)
+
+	// Create a shared domain for all vertices
+	domain := randomBytes32(t)
+
+	// Generate vertices for node A (first 500)
+	nodeAVertices := make([]application.Vertex, numVerticesPerNode)
+	for i := 0; i < numVerticesPerNode; i++ {
+		addr := randomBytes32(t)
+		nodeAVertices[i] = hgcrdt.NewVertex(
+			domain,
+			addr,
+			dataTrees[i].Commit(inclusionProver, false),
+			dataTrees[i].GetSize(),
+		)
+	}
+
+	// Generate vertices for node B (second 500, completely different)
+	nodeBVertices := make([]application.Vertex, numVerticesPerNode)
+	for i := 0; i < numVerticesPerNode; i++ {
+		addr := randomBytes32(t)
+		nodeBVertices[i] = hgcrdt.NewVertex(
+			domain,
+			addr,
+			dataTrees[numVerticesPerNode+i].Commit(inclusionProver, false),
+			dataTrees[numVerticesPerNode+i].GetSize(),
+		)
+	}
+
+	shardKey := application.GetShardKey(nodeAVertices[0])
+
+	// Add vertices to node A
+	t.Log("Adding 500 vertices to node A")
+	nodeATxn, err := nodeAStore.NewTransaction(false)
+	require.NoError(t, err)
+	for i, v := range nodeAVertices {
+		id := v.GetID()
+		require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
+		require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
+	}
+	require.NoError(t, nodeATxn.Commit())
+
+	// Add vertices to node B
+	t.Log("Adding 500 different vertices to node B")
+	nodeBTxn, err := nodeBStore.NewTransaction(false)
+	require.NoError(t, err)
+	for i, v := range nodeBVertices {
+		id := v.GetID()
+		require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[numVerticesPerNode+i]))
+		require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
+	}
+	require.NoError(t, nodeBTxn.Commit())
+
+	// Commit both hypergraphs
+	_, err = nodeAHG.Commit(1)
+	require.NoError(t, err)
+	_, err = nodeBHG.Commit(1)
+	require.NoError(t, err)
+
+	nodeARootBefore := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	nodeBRootBefore := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	t.Logf("Node A root before sync: %x", nodeARootBefore)
+	t.Logf("Node B root before sync: %x", nodeBRootBefore)
+	require.NotEqual(t, nodeARootBefore, nodeBRootBefore, "roots should differ before sync")
+
+	// Helper to set up gRPC server for a hypergraph
+	setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
+		const bufSize = 1 << 20
+		lis := bufconn.Listen(bufSize)
+
+		grpcServer := grpc.NewServer(
+			grpc.ChainStreamInterceptor(func(
+				srv interface{},
+				ss grpc.ServerStream,
+				info *grpc.StreamServerInfo,
+				handler grpc.StreamHandler,
+			) error {
+				_, priv, _ := ed448.GenerateKey(rand.Reader)
+				privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
+				require.NoError(t, err)
+
+				pub := privKey.GetPublic()
+				peerID, err := peer.IDFromPublicKey(pub)
+				require.NoError(t, err)
+
+				return handler(srv, &serverStream{
+					ServerStream: ss,
+					ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
+				})
+			}),
+		)
+		protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
+
+		go func() {
+			_ = grpcServer.Serve(lis)
+		}()
+
+		return lis, grpcServer
+	}
+
+	dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
+		dialer := func(context.Context, string) (net.Conn, error) {
+			return lis.Dial()
+		}
+		conn, err := grpc.DialContext(
+			context.Background(),
+			"bufnet",
+			grpc.WithContextDialer(dialer),
+			grpc.WithTransportCredentials(insecure.NewCredentials()),
+		)
+		require.NoError(t, err)
+		return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
+	}
+
+	// Step 1: Node A syncs from Node B (as server)
+	// Node A should receive Node B's 500 vertices
+	t.Log("Step 1: Node A syncs from Node B (B is server)")
+	nodeBHG.PublishSnapshot(nodeBRootBefore)
+	lisB, serverB := setupServer(nodeBHG)
+	defer serverB.Stop()
+
+	connB, clientB := dialClient(lisB)
+	streamB, err := clientB.HyperStream(context.Background())
+	require.NoError(t, err)
+
+	_, err = nodeAHG.Sync(
+		streamB,
+		shardKey,
+		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
+		nil,
+	)
+	require.NoError(t, err)
+	require.NoError(t, streamB.CloseSend())
+	connB.Close()
+
+	_, err = nodeAHG.Commit(2)
+	require.NoError(t, err)
+
+	nodeARootAfterFirstSync := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	t.Logf("Node A root after syncing from B: %x", nodeARootAfterFirstSync)
+
+	// Step 2: Node B syncs from Node A (as server)
+	// Node B should receive Node A's 500 vertices
+	t.Log("Step 2: Node B syncs from Node A (A is server)")
+	nodeAHG.PublishSnapshot(nodeARootAfterFirstSync)
+	lisA, serverA := setupServer(nodeAHG)
+	defer serverA.Stop()
+
+	connA, clientA := dialClient(lisA)
+	streamA, err := clientA.HyperStream(context.Background())
+	require.NoError(t, err)
+
+	_, err = nodeBHG.Sync(
+		streamA,
+		shardKey,
+		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
+		nil,
+	)
+	require.NoError(t, err)
+	require.NoError(t, streamA.CloseSend())
+	connA.Close()
+
+	_, err = nodeBHG.Commit(2)
+	require.NoError(t, err)
+
+	// Verify both nodes have converged
+	nodeARootFinal := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	nodeBRootFinal := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+	t.Logf("Node A final root: %x", nodeARootFinal)
+	t.Logf("Node B final root: %x", nodeBRootFinal)
+
+	assert.Equal(t, nodeARootFinal, nodeBRootFinal, "both nodes should have identical roots after bidirectional sync")
+
+	// Verify the tree contains all 1000 vertices
+	nodeATree := nodeAHG.GetVertexAddsSet(shardKey).GetTree()
+	nodeBTree := nodeBHG.GetVertexAddsSet(shardKey).GetTree()
+
+	nodeALeaves := tries.GetAllLeaves(
+		nodeATree.SetType,
+		nodeATree.PhaseType,
+		nodeATree.ShardKey,
+		nodeATree.Root,
+	)
+	nodeBLeaves := tries.GetAllLeaves(
+		nodeBTree.SetType,
+		nodeBTree.PhaseType,
+		nodeBTree.ShardKey,
+		nodeBTree.Root,
+	)
+
+	nodeALeafCount := 0
+	for _, leaf := range nodeALeaves {
+		if leaf != nil {
+			nodeALeafCount++
+		}
+	}
+	nodeBLeafCount := 0
+	for _, leaf := range nodeBLeaves {
+		if leaf != nil {
+			nodeBLeafCount++
+		}
+	}
+
+	t.Logf("Node A has %d leaves, Node B has %d leaves", nodeALeafCount, nodeBLeafCount)
+	assert.Equal(t, totalVertices, nodeALeafCount, "Node A should have all 1000 vertices")
+	assert.Equal(t, totalVertices, nodeBLeafCount, "Node B should have all 1000 vertices")
+
+	// Verify no differences between the trees
+	diffLeaves := tries.CompareLeaves(nodeATree, nodeBTree)
+	assert.Empty(t, diffLeaves, "there should be no differences between the trees")
+
+	t.Log("Bidirectional sync test passed - both nodes have all 1000 vertices")
+}
+
+// TestHypergraphSyncWithPrefixLengthMismatch tests sync behavior when one node
+// has a deeper tree structure (longer prefix path) than the other. This tests
+// the prefix length mismatch handling in the walk function.
+//
+// We create two nodes with different tree structures that will cause prefix
+// length mismatches during sync. Node A has deeper prefixes at certain branches
+// while Node B has shallower but wider structures.
+func TestHypergraphSyncWithPrefixLengthMismatch(t *testing.T) {
+	logger, _ := zap.NewDevelopment()
+	enc := verenc.NewMPCitHVerifiableEncryptor(1)
+	inclusionProver := bls48581.NewKZGInclusionProver(logger)
+
+	// Create data trees
+	numTrees := 20
+	dataTrees := make([]*tries.VectorCommitmentTree, numTrees)
+	for i := 0; i < numTrees; i++ {
+		dataTrees[i] = buildDataTree(t, inclusionProver)
+	}
+
+	// Fixed domain (appAddress) - all vertices must share this to be in the same shard
+	fixedDomain := [32]byte{}
+
+	// Helper to create a vertex with a specific dataAddress path suffix.
+	// The vertex ID is [appAddress (32 bytes) || dataAddress (32 bytes)].
+	// The path is derived from the full 64-byte ID.
+	// With BranchBits=6, nibbles 0-41 come from appAddress, nibbles 42+ from dataAddress.
+	// Since all vertices share the same appAddress, their paths share the first 42 nibbles.
+	// Path differences come from dataAddress (nibbles 42+).
+	//
+	// We control the "suffix path" starting at nibble 42 by setting bits in dataAddress.
+	createVertexWithDataPath := func(suffixPath []int, uniqueSuffix uint64, treeIdx int) application.Vertex {
+		dataAddr := [32]byte{}
+
+		// Pack the suffix path nibbles into bits of dataAddress
+		// Nibble 42 starts at bit 0 of dataAddress
+		bitPos := 0
+		for _, nibble := range suffixPath {
+			byteIdx := bitPos / 8
+			bitOffset := bitPos % 8
+
+			if bitOffset+6 <= 8 {
+				// Nibble fits in one byte
+				dataAddr[byteIdx] |= byte(nibble << (8 - bitOffset - 6))
+			} else {
+				// Nibble spans two bytes
+				bitsInFirstByte := 8 - bitOffset
+				dataAddr[byteIdx] |= byte(nibble >> (6 - bitsInFirstByte))
+				if byteIdx+1 < 32 {
+					dataAddr[byteIdx+1] |= byte(nibble << (8 - (6 - bitsInFirstByte)))
+				}
+			}
+			bitPos += 6
+		}
+
+		// Add unique suffix in the last 8 bytes to make each vertex distinct
+		binary.BigEndian.PutUint64(dataAddr[24:], uniqueSuffix)
+
+		return hgcrdt.NewVertex(
+			fixedDomain,
+			dataAddr,
+			dataTrees[treeIdx].Commit(inclusionProver, false),
+			dataTrees[treeIdx].GetSize(),
+		)
+	}
+
+	// Run the test in both directions
+	runSyncTest := func(direction string) {
+		t.Run(direction, func(t *testing.T) {
+			// Create fresh databases for this sub-test
+			nodeADB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeA_%s/store", direction)}, 0)
+			defer nodeADB.Close()
+
+			nodeBDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeB_%s/store", direction)}, 0)
+			defer nodeBDB.Close()
+
+			nodeAStore := store.NewPebbleHypergraphStore(
+				&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeA_%s/store", direction)},
+				nodeADB,
+				logger,
+				enc,
+				inclusionProver,
+			)
+
+			nodeBStore := store.NewPebbleHypergraphStore(
+				&config.DBConfig{InMemoryDONOTUSE: true, Path: fmt.Sprintf(".configtestnodeB_%s/store", direction)},
+				nodeBDB,
+				logger,
+				enc,
+				inclusionProver,
+			)
+
+			nodeAHG := hgcrdt.NewHypergraph(
+				logger.With(zap.String("side", "nodeA-"+direction)),
+				nodeAStore,
+				inclusionProver,
+				[]int{},
+				&tests.Nopthenticator{},
+				200,
+			)
+
+			nodeBHG := hgcrdt.NewHypergraph(
+				logger.With(zap.String("side", "nodeB-"+direction)),
+				nodeBStore,
+				inclusionProver,
+				[]int{},
+				&tests.Nopthenticator{},
+				200,
+			)
+
+			// Create vertices with specific path structures to cause prefix mismatches.
+			// All vertices share the same appAddress (fixedDomain), so they're in the same shard.
+			// Their paths share the first 42 nibbles (all zeros from fixedDomain).
+			// Path differences come from dataAddress, starting at nibble 42.
+			//
+			// We create vertices with suffix paths (nibbles 42+) that differ:
+			// Node A: suffix paths 0,1,x and 0,2,x and 1,x
+			// Node B: suffix paths 0,0,x and 0,1,x and 0,3,x and 1,x
+			//
+			// This creates prefix mismatch scenarios in the dataAddress portion of the tree.
+
+			t.Log("Creating Node A structure")
+			nodeAVertices := []application.Vertex{
+				createVertexWithDataPath([]int{0, 1}, 100, 0), // suffix path 0,1,...
+				createVertexWithDataPath([]int{0, 2}, 101, 1), // suffix path 0,2,...
+				createVertexWithDataPath([]int{1}, 102, 2),    // suffix path 1,...
+			}
+			t.Logf("Created Node A vertices with suffix paths: 0,1; 0,2; 1")
+
+			t.Log("Creating Node B structure")
+			nodeBVertices := []application.Vertex{
+				createVertexWithDataPath([]int{0, 0}, 200, 3), // suffix path 0,0,...
+				createVertexWithDataPath([]int{0, 1}, 201, 4), // suffix path 0,1,...
+				createVertexWithDataPath([]int{0, 3}, 202, 5), // suffix path 0,3,...
+				createVertexWithDataPath([]int{1}, 203, 6),    // suffix path 1,...
+			}
+			t.Logf("Created Node B vertices with suffix paths: 0,0; 0,1; 0,3; 1")
+
+			// Verify the paths - show nibbles 40-50 where the difference should be
+			t.Log("Node A vertices paths (showing nibbles 40-50 where dataAddress starts):")
+			for i, v := range nodeAVertices {
+				id := v.GetID()
+				path := GetFullPath(id[:])
+				// Nibble 42 is where dataAddress bits start (256/6 = 42.67)
+				start := 40
+				end := min(50, len(path))
+				if end > start {
+					t.Logf("  Vertex %d path[%d:%d]: %v", i, start, end, path[start:end])
+				}
+			}
+			t.Log("Node B vertices paths (showing nibbles 40-50 where dataAddress starts):")
+			for i, v := range nodeBVertices {
+				id := v.GetID()
+				path := GetFullPath(id[:])
+				start := 40
+				end := min(50, len(path))
+				if end > start {
+					t.Logf("  Vertex %d path[%d:%d]: %v", i, start, end, path[start:end])
+				}
+			}
+
+			shardKey := application.GetShardKey(nodeAVertices[0])
+
+			// Add vertices to Node A
+			nodeATxn, err := nodeAStore.NewTransaction(false)
+			require.NoError(t, err)
+			for i, v := range nodeAVertices {
+				id := v.GetID()
+				require.NoError(t, nodeAStore.SaveVertexTree(nodeATxn, id[:], dataTrees[i]))
+				require.NoError(t, nodeAHG.AddVertex(nodeATxn, v))
+			}
+			require.NoError(t, nodeATxn.Commit())
+
+			// Add vertices to Node B
+			nodeBTxn, err := nodeBStore.NewTransaction(false)
+			require.NoError(t, err)
+			for i, v := range nodeBVertices {
+				id := v.GetID()
+				require.NoError(t, nodeBStore.SaveVertexTree(nodeBTxn, id[:], dataTrees[3+i]))
+				require.NoError(t, nodeBHG.AddVertex(nodeBTxn, v))
+			}
+			require.NoError(t, nodeBTxn.Commit())
+
+			// Commit both
+			_, err = nodeAHG.Commit(1)
+			require.NoError(t, err)
+			_, err = nodeBHG.Commit(1)
+			require.NoError(t, err)
+
+			nodeARoot := nodeAHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+			nodeBRoot := nodeBHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+			t.Logf("Node A root: %x", nodeARoot)
+			t.Logf("Node B root: %x", nodeBRoot)
+
+			// Setup gRPC server
+			const bufSize = 1 << 20
+			setupServer := func(hg *hgcrdt.HypergraphCRDT) (*bufconn.Listener, *grpc.Server) {
+				lis := bufconn.Listen(bufSize)
+				grpcServer := grpc.NewServer(
+					grpc.ChainStreamInterceptor(func(
+						srv interface{},
+						ss grpc.ServerStream,
+						info *grpc.StreamServerInfo,
+						handler grpc.StreamHandler,
+					) error {
+						_, priv, _ := ed448.GenerateKey(rand.Reader)
+						privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
+						require.NoError(t, err)
+						pub := privKey.GetPublic()
+						peerID, err := peer.IDFromPublicKey(pub)
+						require.NoError(t, err)
+						return handler(srv, &serverStream{
+							ServerStream: ss,
+							ctx:          internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
+						})
+					}),
+				)
+				protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, hg)
+				go func() { _ = grpcServer.Serve(lis) }()
+				return lis, grpcServer
+			}
+
+			dialClient := func(lis *bufconn.Listener) (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
+				dialer := func(context.Context, string) (net.Conn, error) { return lis.Dial() }
+				conn, err := grpc.DialContext(
+					context.Background(),
+					"bufnet",
+					grpc.WithContextDialer(dialer),
+					grpc.WithTransportCredentials(insecure.NewCredentials()),
+				)
+				require.NoError(t, err)
+				return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
+			}
+
+			var serverHG, clientHG *hgcrdt.HypergraphCRDT
+			var serverRoot []byte
+
+			if direction == "A_syncs_from_B" {
+				serverHG = nodeBHG
+				clientHG = nodeAHG
+				serverRoot = nodeBRoot
+			} else {
+				serverHG = nodeAHG
+				clientHG = nodeBHG
+				serverRoot = nodeARoot
+			}
+
+			serverHG.PublishSnapshot(serverRoot)
+			lis, grpcServer := setupServer(serverHG)
+			defer grpcServer.Stop()
+
+			// Count client leaves before sync
+			clientTreeBefore := clientHG.GetVertexAddsSet(shardKey).GetTree()
+			clientLeavesBefore := tries.GetAllLeaves(
+				clientTreeBefore.SetType,
+				clientTreeBefore.PhaseType,
+				clientTreeBefore.ShardKey,
+				clientTreeBefore.Root,
+			)
+			clientLeafCountBefore := 0
+			for _, leaf := range clientLeavesBefore {
+				if leaf != nil {
+					clientLeafCountBefore++
+				}
+			}
+			t.Logf("Client has %d leaves before sync", clientLeafCountBefore)
+
+			conn, client := dialClient(lis)
+			stream, err := client.HyperStream(context.Background())
+			require.NoError(t, err)
+
+			_, err = clientHG.Sync(
+				stream,
+				shardKey,
+				protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
+				nil,
+			)
+			require.NoError(t, err)
+			require.NoError(t, stream.CloseSend())
+			conn.Close()
+
+			_, err = clientHG.Commit(2)
+			require.NoError(t, err)
+
+			// In CRDT sync, the client receives data from the server and MERGES it.
+			// The client should now have BOTH its original vertices AND the server's vertices.
+			// So the client root should differ from both original roots (it's a superset).
+			clientRoot := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
+			t.Logf("Client root after sync: %x", clientRoot)
+
+			// Get all leaves from the client tree after sync
+			clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
+			clientLeaves := tries.GetAllLeaves(
+				clientTree.SetType,
+				clientTree.PhaseType,
+				clientTree.ShardKey,
+				clientTree.Root,
+			)
+
+			clientLeafCount := 0
+			for _, leaf := range clientLeaves {
+				if leaf != nil {
+					clientLeafCount++
+				}
+			}
+
+			// After sync, client should have received server's vertices (merged with its own)
+			// The client should have at least as many leaves as it started with
+			assert.GreaterOrEqual(t, clientLeafCount, clientLeafCountBefore,
+				"client should not lose leaves during sync")
+
+			// Client should have gained some leaves from the server (unless they already had them all)
+			t.Logf("Sync %s completed - client went from %d to %d leaves",
+				direction, clientLeafCountBefore, clientLeafCount)
+
+			// Verify the sync actually transferred data by checking that server's vertices are now in client
+			serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree()
+			serverLeaves := tries.GetAllLeaves(
+				serverTree.SetType,
+				serverTree.PhaseType,
+				serverTree.ShardKey,
+				serverTree.Root,
+			)
+			serverLeafCount := 0
+			for _, leaf := range serverLeaves {
+				if leaf != nil {
+					serverLeafCount++
+				}
+			}
+			t.Logf("Server has %d leaves", serverLeafCount)
+
+			// The client should have at least as many leaves as the server
+			// (since it's merging server data into its own)
+			assert.GreaterOrEqual(t, clientLeafCount, serverLeafCount,
+				"client should have at least as many leaves as server after sync")
+		})
+	}
+
+	// Test both directions
+	runSyncTest("A_syncs_from_B")
+	runSyncTest("B_syncs_from_A")
+}