further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events

Cassandra Heart 2026-01-02 04:39:33 -06:00
parent 62ad785d5f
commit 9f6174e39b
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
14 changed files with 324 additions and 30 deletions

View File

@@ -70,6 +70,14 @@ func (m *mockAppIntegrationPubSub) Close() error {
panic("unimplemented")
}
// SetShutdownContext implements p2p.PubSub.
func (m *mockAppIntegrationPubSub) SetShutdownContext(ctx context.Context) {
// Forward to underlying blossomsub if available
if m.underlyingBlossomSub != nil {
m.underlyingBlossomSub.SetShutdownContext(ctx)
}
}
// GetOwnMultiaddrs implements p2p.PubSub.
func (m *mockAppIntegrationPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr {
panic("unimplemented")

View File

@@ -61,6 +61,9 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
// Update state summaries metric
stateSummariesAggregated.Set(float64(len(shardCoverageMap)))
// Collect all merge-eligible shard groups to emit as a single bulk event
var allMergeGroups []typesconsensus.ShardMergeEventData
for shardAddress, coverage := range shardCoverageMap {
addressLen := len(shardAddress)
@@ -187,7 +190,9 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
// Check for low coverage
if proverCount < minProvers {
e.handleLowCoverage([]byte(shardAddress), coverage, minProvers)
if mergeData := e.handleLowCoverage([]byte(shardAddress), coverage, minProvers); mergeData != nil {
allMergeGroups = append(allMergeGroups, *mergeData)
}
}
// Check for high coverage (potential split)
@@ -196,6 +201,11 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error {
}
}
// Emit a single bulk merge event if there are any merge-eligible shards
if len(allMergeGroups) > 0 {
e.emitBulkMergeEvent(allMergeGroups)
}
return nil
}
@@ -206,12 +216,13 @@ type ShardCoverage struct {
TreeMetadata []typesconsensus.TreeMetadata
}
// handleLowCoverage handles shards with insufficient provers
// handleLowCoverage handles shards with insufficient provers.
// Returns merge event data if merge is possible, nil otherwise.
func (e *GlobalConsensusEngine) handleLowCoverage(
shardAddress []byte,
coverage *ShardCoverage,
minProvers uint64,
) {
) *typesconsensus.ShardMergeEventData {
addressLen := len(shardAddress)
// Case 2.a: Full application address (32 bytes)
@@ -235,7 +246,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage(
Message: "Application shard has low prover coverage",
},
)
return
return nil
}
// Case 2.b: Longer than application address (> 32 bytes)
@@ -260,24 +271,13 @@ func (e *GlobalConsensusEngine) handleLowCoverage(
requiredStorage := e.calculateRequiredStorage(allShards)
if totalStorage >= requiredStorage {
// Case 2.b.i: Merge is possible
e.logger.Info(
"shards eligible for merge",
zap.String("shard_address", hex.EncodeToString(shardAddress)),
zap.Int("sibling_count", len(siblingShards)),
zap.Uint64("total_storage", totalStorage),
zap.Uint64("required_storage", requiredStorage),
)
// Emit merge eligible event
e.emitMergeEvent(
&typesconsensus.ShardMergeEventData{
ShardAddresses: allShards,
TotalProvers: totalProvers,
AttestedStorage: totalStorage,
RequiredStorage: requiredStorage,
},
)
// Case 2.b.i: Merge is possible - return the data for bulk emission
return &typesconsensus.ShardMergeEventData{
ShardAddresses: allShards,
TotalProvers: totalProvers,
AttestedStorage: totalStorage,
RequiredStorage: requiredStorage,
}
} else {
// Case 2.b.ii: Insufficient storage for merge
e.logger.Warn(
@@ -315,6 +315,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage(
},
)
}
return nil
}
// handleHighCoverage handles shards with too many provers

View File

@@ -308,9 +308,18 @@ func (e *GlobalConsensusEngine) emitCoverageEvent(
)
}
func (e *GlobalConsensusEngine) emitMergeEvent(
data *typesconsensus.ShardMergeEventData,
func (e *GlobalConsensusEngine) emitBulkMergeEvent(
mergeGroups []typesconsensus.ShardMergeEventData,
) {
if len(mergeGroups) == 0 {
return
}
// Combine all merge groups into a single bulk event
data := &typesconsensus.BulkShardMergeEventData{
MergeGroups: mergeGroups,
}
event := typesconsensus.ControlEvent{
Type: typesconsensus.ControlEventShardMergeEligible,
Data: data,
@@ -318,12 +327,18 @@ func (e *GlobalConsensusEngine) emitMergeEvent(
go e.eventDistributor.Publish(event)
totalShards := 0
totalProvers := 0
for _, group := range mergeGroups {
totalShards += len(group.ShardAddresses)
totalProvers += group.TotalProvers
}
e.logger.Info(
"emitted merge eligible event",
zap.Int("shard_count", len(data.ShardAddresses)),
zap.Int("total_provers", data.TotalProvers),
zap.Uint64("attested_storage", data.AttestedStorage),
zap.Uint64("required_storage", data.RequiredStorage),
"emitted bulk merge eligible event",
zap.Int("merge_groups", len(mergeGroups)),
zap.Int("total_shards", totalShards),
zap.Int("total_provers", totalProvers),
)
}
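Since the control event type is unchanged but its payload switches from a single ShardMergeEventData to the bulk form, downstream subscribers have to assert the new data type. A minimal consumer sketch, assuming the event's Data field is carried as an interface value; only the type and field names come from this commit:

func handleControlEvent(ev typesconsensus.ControlEvent) {
	if ev.Type != typesconsensus.ControlEventShardMergeEligible {
		return
	}
	// The payload is now one bulk event carrying every eligible group.
	bulk, ok := ev.Data.(*typesconsensus.BulkShardMergeEventData)
	if !ok {
		return
	}
	for _, group := range bulk.MergeGroups {
		// Each group holds the same per-shard fields that used to be
		// emitted as an individual ShardMergeEventData event.
		_ = group.ShardAddresses
		_ = group.TotalProvers
	}
}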

View File

@@ -1028,6 +1028,11 @@ func NewGlobalConsensusEngine(
)
}
// Wire up pubsub shutdown to the component's shutdown signal
engine.pubsub.SetShutdownContext(
contextFromShutdownSignal(engine.ShutdownSignal()),
)
// Set self peer ID on hypergraph to allow unlimited self-sync sessions
if hgWithSelfPeer, ok := engine.hyperSync.(interface {
SetSelfPeerID(string)
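The SetShutdownContext wiring above relies on a contextFromShutdownSignal helper that is not shown in this diff. A minimal sketch of what such a helper could look like, assuming the shutdown signal is a channel that is closed when the engine stops (the signal type is an assumption, not taken from this commit):

// contextFromShutdownSignal (illustrative sketch only; the real helper in
// this codebase may differ) bridges a close-on-shutdown channel into a
// cancellable context.
func contextFromShutdownSignal(signal <-chan struct{}) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		// Wait for the component's shutdown signal, then cancel the
		// context so the pubsub's SetShutdownContext goroutine fires.
		<-signal
		cancel()
	}()
	return ctx
}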

View File

@@ -83,6 +83,14 @@ func (m *mockIntegrationPubSub) Close() error {
panic("unimplemented")
}
// SetShutdownContext implements p2p.PubSub.
func (m *mockIntegrationPubSub) SetShutdownContext(ctx context.Context) {
// Forward to underlying blossomsub if available
if m.underlyingBlossomSub != nil {
m.underlyingBlossomSub.SetShutdownContext(ctx)
}
}
// GetOwnMultiaddrs implements p2p.PubSub.
func (m *mockIntegrationPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr {
ma, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/8336")

View File

@@ -831,6 +831,8 @@ func (m *mockPubSub) Close() error {
return nil
}
func (m *mockPubSub) SetShutdownContext(ctx context.Context) {}
type mockTransaction struct{}
// Abort implements store.Transaction.

View File

@@ -1957,6 +1957,21 @@ func (b *BlossomSub) Close() error {
return nil
}
// SetShutdownContext implements p2p.PubSub. When the provided context is
// cancelled, the internal BlossomSub context will also be cancelled, allowing
// subscription loops to exit gracefully.
func (b *BlossomSub) SetShutdownContext(ctx context.Context) {
go func() {
select {
case <-ctx.Done():
b.logger.Debug("shutdown context cancelled, closing pubsub")
b.Close()
case <-b.ctx.Done():
// Already closed
}
}()
}
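For callers, the intended order is to set the shutdown context before subscribing, then cancel that context (or let the component's shutdown signal do so) to tear the pubsub down. A small usage sketch in the same package, assuming b is an already-constructed *BlossomSub:

// runWithShutdown is illustrative only and not part of this commit. It ties
// the lifetime of the pubsub to a caller-owned context: when the deferred
// cancel runs, the goroutine started by SetShutdownContext calls b.Close().
func runWithShutdown(b *BlossomSub) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	b.SetShutdownContext(ctx)
	// ... subscribe to bitmasks and run until work is done ...
}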
// MultiaddrToIPNet converts a multiaddr containing /ip4 or /ip6
// into a *net.IPNet with a host mask (/32 or /128).
func MultiaddrToIPNet(m ma.Multiaddr) (*net.IPNet, error) {

View File

@@ -1392,3 +1392,217 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) {
_ = stream.CloseSend()
})
}
// TestHypergraphSyncWithModifiedEntries tests sync behavior when both client
// and server have the same keys but with different values (modified entries).
// This verifies that sync correctly updates entries rather than just adding
// new ones or deleting orphans.
func TestHypergraphSyncWithModifiedEntries(t *testing.T) {
logger, _ := zap.NewDevelopment()
enc := verenc.NewMPCitHVerifiableEncryptor(1)
inclusionProver := bls48581.NewKZGInclusionProver(logger)
// Create enough data trees for all vertices we'll need
numVertices := 50
dataTrees := make([]*tries.VectorCommitmentTree, numVertices*2) // Extra for modified versions
for i := 0; i < len(dataTrees); i++ {
dataTrees[i] = buildDataTree(t, inclusionProver)
}
// Create server and client databases
serverDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, 0)
defer serverDB.Close()
clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}, 0)
defer clientDB.Close()
serverStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"},
serverDB,
logger,
enc,
inclusionProver,
)
clientStore := store.NewPebbleHypergraphStore(
&config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"},
clientDB,
logger,
enc,
inclusionProver,
)
serverHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "server")),
serverStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
clientHG := hgcrdt.NewHypergraph(
logger.With(zap.String("side", "client")),
clientStore,
inclusionProver,
[]int{},
&tests.Nopthenticator{},
200,
)
// Create a shared domain for all vertices
domain := randomBytes32(t)
// Generate fixed addresses that will be used by both client and server
// This ensures they share the same keys
addresses := make([][32]byte, numVertices)
for i := 0; i < numVertices; i++ {
addresses[i] = randomBytes32(t)
}
// Create "original" vertices for the client (using first set of data trees)
clientVertices := make([]application.Vertex, numVertices)
for i := 0; i < numVertices; i++ {
clientVertices[i] = hgcrdt.NewVertex(
domain,
addresses[i], // Same address
dataTrees[i].Commit(inclusionProver, false),
dataTrees[i].GetSize(),
)
}
// Create "modified" vertices for the server (using second set of data trees)
// These have the SAME addresses but DIFFERENT data commitments
serverVertices := make([]application.Vertex, numVertices)
for i := 0; i < numVertices; i++ {
serverVertices[i] = hgcrdt.NewVertex(
domain,
addresses[i], // Same address as client
dataTrees[numVertices+i].Commit(inclusionProver, false), // Different data
dataTrees[numVertices+i].GetSize(),
)
}
shardKey := application.GetShardKey(clientVertices[0])
// Add original vertices to client
t.Log("Adding original vertices to client")
clientTxn, err := clientStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range clientVertices {
id := v.GetID()
require.NoError(t, clientStore.SaveVertexTree(clientTxn, id[:], dataTrees[i]))
require.NoError(t, clientHG.AddVertex(clientTxn, v))
}
require.NoError(t, clientTxn.Commit())
// Add modified vertices to server
t.Log("Adding modified vertices to server")
serverTxn, err := serverStore.NewTransaction(false)
require.NoError(t, err)
for i, v := range serverVertices {
id := v.GetID()
require.NoError(t, serverStore.SaveVertexTree(serverTxn, id[:], dataTrees[numVertices+i]))
require.NoError(t, serverHG.AddVertex(serverTxn, v))
}
require.NoError(t, serverTxn.Commit())
// Commit both hypergraphs
_, err = clientHG.Commit(1)
require.NoError(t, err)
_, err = serverHG.Commit(1)
require.NoError(t, err)
// Verify roots are different before sync (modified entries should cause different roots)
clientRootBefore := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
require.NotEqual(t, clientRootBefore, serverRoot, "roots should differ before sync due to modified entries")
t.Logf("Client root before sync: %x", clientRootBefore)
t.Logf("Server root: %x", serverRoot)
// Publish server snapshot
serverHG.PublishSnapshot(serverRoot)
// Start gRPC server
const bufSize = 1 << 20
lis := bufconn.Listen(bufSize)
grpcServer := grpc.NewServer(
grpc.ChainStreamInterceptor(func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
_, priv, _ := ed448.GenerateKey(rand.Reader)
privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv)
require.NoError(t, err)
pub := privKey.GetPublic()
peerID, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)
return handler(srv, &serverStream{
ServerStream: ss,
ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID),
})
}),
)
protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG)
defer grpcServer.Stop()
go func() {
_ = grpcServer.Serve(lis)
}()
dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) {
dialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
conn, err := grpc.DialContext(
context.Background(),
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
return conn, protobufs.NewHypergraphComparisonServiceClient(conn)
}
// Perform sync
t.Log("Performing sync to update modified entries")
conn, client := dialClient()
defer conn.Close()
stream, err := client.HyperStream(context.Background())
require.NoError(t, err)
err = clientHG.Sync(
stream,
shardKey,
protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
nil,
)
require.NoError(t, err)
require.NoError(t, stream.CloseSend())
// Commit client after sync
_, err = clientHG.Commit(2)
require.NoError(t, err)
// Verify client now matches server
clientRootAfter := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false)
t.Logf("Client root after sync: %x", clientRootAfter)
assert.Equal(t, serverRoot, clientRootAfter, "client should converge to server state after sync with modified entries")
// Verify all entries were updated by comparing the leaves
serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree()
clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree()
diffLeaves := tries.CompareLeaves(serverTree, clientTree)
assert.Empty(t, diffLeaves, "there should be no difference in leaves after sync")
t.Logf("Sync completed successfully - %d entries with same keys but different values were updated", numVertices)
}

View File

@@ -168,6 +168,12 @@ func (p *ProxyBlossomSub) Close() error {
return nil
}
// SetShutdownContext implements p2p.PubSub.
func (p *ProxyBlossomSub) SetShutdownContext(ctx context.Context) {
// Forward to underlying client
p.client.SetShutdownContext(ctx)
}
// PublishToBitmask publishes data to a specific bitmask
func (p *ProxyBlossomSub) PublishToBitmask(bitmask []byte, data []byte) error {
return p.client.PublishToBitmask(bitmask, data)

View File

@@ -178,6 +178,7 @@ func (m *mockPubSub) GetNetwork() uint { return 0 }
func (m *mockPubSub) IsPeerConnected(peerId []byte) bool { return true }
func (m *mockPubSub) Reachability() *wrapperspb.BoolValue { return wrapperspb.Bool(true) }
func (m *mockPubSub) Close() error { return nil }
func (m *mockPubSub) SetShutdownContext(ctx context.Context) {}
// Test helper functions
func createTestConfigs() (*config.P2PConfig, *config.EngineConfig, error) {

View File

@@ -564,6 +564,11 @@ func (c *PubSubProxyClient) Close() error {
return nil
}
// SetShutdownContext implements p2p.PubSub.
func (c *PubSubProxyClient) SetShutdownContext(ctx context.Context) {
// No-op for proxy client - shutdown is handled by the proxied pubsub
}
// NewPubSubProxyClient creates a new proxy client
func NewPubSubProxyClient(
ctx context.Context,

View File

@@ -103,7 +103,7 @@ type TreeMetadata struct {
TotalLeaves uint64
}
// ShardMergeEventData contains data for shard merge eligibility
// ShardMergeEventData contains data for a single shard merge group
type ShardMergeEventData struct {
ShardAddresses [][]byte
TotalProvers int
@@ -113,6 +113,13 @@ type ShardMergeEventData struct {
func (s *ShardMergeEventData) ControlEventData() {}
// BulkShardMergeEventData contains all merge-eligible shard groups in a single event
type BulkShardMergeEventData struct {
MergeGroups []ShardMergeEventData
}
func (b *BulkShardMergeEventData) ControlEventData() {}
// ShardSplitEventData contains data for shard split eligibility
type ShardSplitEventData struct {
ShardAddress []byte

View File

@@ -23,6 +23,9 @@ func (m *MockPubSub) Close() error {
return nil
}
// SetShutdownContext implements p2p.PubSub.
func (m *MockPubSub) SetShutdownContext(ctx context.Context) {}
// GetOwnMultiaddrs implements p2p.PubSub.
func (m *MockPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr {
args := m.Called()

View File

@@ -20,6 +20,10 @@ const (
)
type PubSub interface {
// SetShutdownContext allows the caller to provide a context that, when
// cancelled, will trigger graceful shutdown of the pubsub subscription
// loops. This should be called before subscribing to any bitmasks.
SetShutdownContext(ctx context.Context)
Close() error
PublishToBitmask(bitmask []byte, data []byte) error
Publish(address []byte, data []byte) error