diff --git a/node/consensus/app/integration_helper_test.go b/node/consensus/app/integration_helper_test.go index 5a70ca5..2d82319 100644 --- a/node/consensus/app/integration_helper_test.go +++ b/node/consensus/app/integration_helper_test.go @@ -70,6 +70,14 @@ func (m *mockAppIntegrationPubSub) Close() error { panic("unimplemented") } +// SetShutdownContext implements p2p.PubSub. +func (m *mockAppIntegrationPubSub) SetShutdownContext(ctx context.Context) { + // Forward to underlying blossomsub if available + if m.underlyingBlossomSub != nil { + m.underlyingBlossomSub.SetShutdownContext(ctx) + } +} + // GetOwnMultiaddrs implements p2p.PubSub. func (m *mockAppIntegrationPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr { panic("unimplemented") diff --git a/node/consensus/global/coverage_events.go b/node/consensus/global/coverage_events.go index 3d565df..5adb0c9 100644 --- a/node/consensus/global/coverage_events.go +++ b/node/consensus/global/coverage_events.go @@ -61,6 +61,9 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error { // Update state summaries metric stateSummariesAggregated.Set(float64(len(shardCoverageMap))) + // Collect all merge-eligible shard groups to emit as a single bulk event + var allMergeGroups []typesconsensus.ShardMergeEventData + for shardAddress, coverage := range shardCoverageMap { addressLen := len(shardAddress) @@ -187,7 +190,9 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error { // Check for low coverage if proverCount < minProvers { - e.handleLowCoverage([]byte(shardAddress), coverage, minProvers) + if mergeData := e.handleLowCoverage([]byte(shardAddress), coverage, minProvers); mergeData != nil { + allMergeGroups = append(allMergeGroups, *mergeData) + } } // Check for high coverage (potential split) @@ -196,6 +201,11 @@ func (e *GlobalConsensusEngine) checkShardCoverage(frameNumber uint64) error { } } + // Emit a single bulk merge event if there are any merge-eligible shards + if len(allMergeGroups) > 0 { + e.emitBulkMergeEvent(allMergeGroups) + } + return nil } @@ -206,12 +216,13 @@ type ShardCoverage struct { TreeMetadata []typesconsensus.TreeMetadata } -// handleLowCoverage handles shards with insufficient provers +// handleLowCoverage handles shards with insufficient provers. +// Returns merge event data if merge is possible, nil otherwise. 
func (e *GlobalConsensusEngine) handleLowCoverage( shardAddress []byte, coverage *ShardCoverage, minProvers uint64, -) { +) *typesconsensus.ShardMergeEventData { addressLen := len(shardAddress) // Case 2.a: Full application address (32 bytes) @@ -235,7 +246,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage( Message: "Application shard has low prover coverage", }, ) - return + return nil } // Case 2.b: Longer than application address (> 32 bytes) @@ -260,24 +271,13 @@ func (e *GlobalConsensusEngine) handleLowCoverage( requiredStorage := e.calculateRequiredStorage(allShards) if totalStorage >= requiredStorage { - // Case 2.b.i: Merge is possible - e.logger.Info( - "shards eligible for merge", - zap.String("shard_address", hex.EncodeToString(shardAddress)), - zap.Int("sibling_count", len(siblingShards)), - zap.Uint64("total_storage", totalStorage), - zap.Uint64("required_storage", requiredStorage), - ) - - // Emit merge eligible event - e.emitMergeEvent( - &typesconsensus.ShardMergeEventData{ - ShardAddresses: allShards, - TotalProvers: totalProvers, - AttestedStorage: totalStorage, - RequiredStorage: requiredStorage, - }, - ) + // Case 2.b.i: Merge is possible - return the data for bulk emission + return &typesconsensus.ShardMergeEventData{ + ShardAddresses: allShards, + TotalProvers: totalProvers, + AttestedStorage: totalStorage, + RequiredStorage: requiredStorage, + } } else { // Case 2.b.ii: Insufficient storage for merge e.logger.Warn( @@ -315,6 +315,7 @@ func (e *GlobalConsensusEngine) handleLowCoverage( }, ) } + return nil } // handleHighCoverage handles shards with too many provers diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index c4703f5..3b49945 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -308,9 +308,18 @@ func (e *GlobalConsensusEngine) emitCoverageEvent( ) } -func (e *GlobalConsensusEngine) emitMergeEvent( - data *typesconsensus.ShardMergeEventData, +func (e *GlobalConsensusEngine) emitBulkMergeEvent( + mergeGroups []typesconsensus.ShardMergeEventData, ) { + if len(mergeGroups) == 0 { + return + } + + // Combine all merge groups into a single bulk event + data := &typesconsensus.BulkShardMergeEventData{ + MergeGroups: mergeGroups, + } + event := typesconsensus.ControlEvent{ Type: typesconsensus.ControlEventShardMergeEligible, Data: data, @@ -318,12 +327,18 @@ func (e *GlobalConsensusEngine) emitMergeEvent( go e.eventDistributor.Publish(event) + totalShards := 0 + totalProvers := 0 + for _, group := range mergeGroups { + totalShards += len(group.ShardAddresses) + totalProvers += group.TotalProvers + } + e.logger.Info( - "emitted merge eligible event", - zap.Int("shard_count", len(data.ShardAddresses)), - zap.Int("total_provers", data.TotalProvers), - zap.Uint64("attested_storage", data.AttestedStorage), - zap.Uint64("required_storage", data.RequiredStorage), + "emitted bulk merge eligible event", + zap.Int("merge_groups", len(mergeGroups)), + zap.Int("total_shards", totalShards), + zap.Int("total_provers", totalProvers), ) } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 1f774c1..2827c17 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -1028,6 +1028,11 @@ func NewGlobalConsensusEngine( ) } + // Wire up pubsub shutdown to the component's shutdown signal + engine.pubsub.SetShutdownContext( + 
contextFromShutdownSignal(engine.ShutdownSignal()), + ) + // Set self peer ID on hypergraph to allow unlimited self-sync sessions if hgWithSelfPeer, ok := engine.hyperSync.(interface { SetSelfPeerID(string) diff --git a/node/consensus/global/global_consensus_engine_integration_test.go b/node/consensus/global/global_consensus_engine_integration_test.go index 5b437a4..47382da 100644 --- a/node/consensus/global/global_consensus_engine_integration_test.go +++ b/node/consensus/global/global_consensus_engine_integration_test.go @@ -83,6 +83,14 @@ func (m *mockIntegrationPubSub) Close() error { panic("unimplemented") } +// SetShutdownContext implements p2p.PubSub. +func (m *mockIntegrationPubSub) SetShutdownContext(ctx context.Context) { + // Forward to underlying blossomsub if available + if m.underlyingBlossomSub != nil { + m.underlyingBlossomSub.SetShutdownContext(ctx) + } +} + // GetOwnMultiaddrs implements p2p.PubSub. func (m *mockIntegrationPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr { ma, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/8336") diff --git a/node/execution/engines/compute_execution_engine_test.go b/node/execution/engines/compute_execution_engine_test.go index 8f979da..47a6d13 100644 --- a/node/execution/engines/compute_execution_engine_test.go +++ b/node/execution/engines/compute_execution_engine_test.go @@ -831,6 +831,8 @@ func (m *mockPubSub) Close() error { return nil } +func (m *mockPubSub) SetShutdownContext(ctx context.Context) {} + type mockTransaction struct{} // Abort implements store.Transaction. diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index d152544..814df5c 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -1957,6 +1957,21 @@ func (b *BlossomSub) Close() error { return nil } +// SetShutdownContext implements p2p.PubSub. When the provided context is +// cancelled, the internal BlossomSub context will also be cancelled, allowing +// subscription loops to exit gracefully. +func (b *BlossomSub) SetShutdownContext(ctx context.Context) { + go func() { + select { + case <-ctx.Done(): + b.logger.Debug("shutdown context cancelled, closing pubsub") + b.Close() + case <-b.ctx.Done(): + // Already closed + } + }() +} + // MultiaddrToIPNet converts a multiaddr containing /ip4 or /ip6 // into a *net.IPNet with a host mask (/32 or /128). func MultiaddrToIPNet(m ma.Multiaddr) (*net.IPNet, error) { diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 0737ede..adba9f4 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -1392,3 +1392,217 @@ func TestHypergraphSyncWithExpectedRoot(t *testing.T) { _ = stream.CloseSend() }) } + +// TestHypergraphSyncWithModifiedEntries tests sync behavior when both client +// and server have the same keys but with different values (modified entries). +// This verifies that sync correctly updates entries rather than just adding +// new ones or deleting orphans. 
+func TestHypergraphSyncWithModifiedEntries(t *testing.T) { + logger, _ := zap.NewDevelopment() + enc := verenc.NewMPCitHVerifiableEncryptor(1) + inclusionProver := bls48581.NewKZGInclusionProver(logger) + + // Create enough data trees for all vertices we'll need + numVertices := 50 + dataTrees := make([]*tries.VectorCommitmentTree, numVertices*2) // Extra for modified versions + for i := 0; i < len(dataTrees); i++ { + dataTrees[i] = buildDataTree(t, inclusionProver) + } + + // Create server and client databases + serverDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, 0) + defer serverDB.Close() + + clientDB := store.NewPebbleDB(logger, &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}, 0) + defer clientDB.Close() + + serverStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestserver/store"}, + serverDB, + logger, + enc, + inclusionProver, + ) + + clientStore := store.NewPebbleHypergraphStore( + &config.DBConfig{InMemoryDONOTUSE: true, Path: ".configtestclient/store"}, + clientDB, + logger, + enc, + inclusionProver, + ) + + serverHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "server")), + serverStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + clientHG := hgcrdt.NewHypergraph( + logger.With(zap.String("side", "client")), + clientStore, + inclusionProver, + []int{}, + &tests.Nopthenticator{}, + 200, + ) + + // Create a shared domain for all vertices + domain := randomBytes32(t) + + // Generate fixed addresses that will be used by both client and server + // This ensures they share the same keys + addresses := make([][32]byte, numVertices) + for i := 0; i < numVertices; i++ { + addresses[i] = randomBytes32(t) + } + + // Create "original" vertices for the client (using first set of data trees) + clientVertices := make([]application.Vertex, numVertices) + for i := 0; i < numVertices; i++ { + clientVertices[i] = hgcrdt.NewVertex( + domain, + addresses[i], // Same address + dataTrees[i].Commit(inclusionProver, false), + dataTrees[i].GetSize(), + ) + } + + // Create "modified" vertices for the server (using second set of data trees) + // These have the SAME addresses but DIFFERENT data commitments + serverVertices := make([]application.Vertex, numVertices) + for i := 0; i < numVertices; i++ { + serverVertices[i] = hgcrdt.NewVertex( + domain, + addresses[i], // Same address as client + dataTrees[numVertices+i].Commit(inclusionProver, false), // Different data + dataTrees[numVertices+i].GetSize(), + ) + } + + shardKey := application.GetShardKey(clientVertices[0]) + + // Add original vertices to client + t.Log("Adding original vertices to client") + clientTxn, err := clientStore.NewTransaction(false) + require.NoError(t, err) + for i, v := range clientVertices { + id := v.GetID() + require.NoError(t, clientStore.SaveVertexTree(clientTxn, id[:], dataTrees[i])) + require.NoError(t, clientHG.AddVertex(clientTxn, v)) + } + require.NoError(t, clientTxn.Commit()) + + // Add modified vertices to server + t.Log("Adding modified vertices to server") + serverTxn, err := serverStore.NewTransaction(false) + require.NoError(t, err) + for i, v := range serverVertices { + id := v.GetID() + require.NoError(t, serverStore.SaveVertexTree(serverTxn, id[:], dataTrees[numVertices+i])) + require.NoError(t, serverHG.AddVertex(serverTxn, v)) + } + require.NoError(t, serverTxn.Commit()) + + // Commit both hypergraphs + _, err = clientHG.Commit(1) + 
require.NoError(t, err) + _, err = serverHG.Commit(1) + require.NoError(t, err) + + // Verify roots are different before sync (modified entries should cause different roots) + clientRootBefore := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + serverRoot := serverHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + require.NotEqual(t, clientRootBefore, serverRoot, "roots should differ before sync due to modified entries") + + t.Logf("Client root before sync: %x", clientRootBefore) + t.Logf("Server root: %x", serverRoot) + + // Publish server snapshot + serverHG.PublishSnapshot(serverRoot) + + // Start gRPC server + const bufSize = 1 << 20 + lis := bufconn.Listen(bufSize) + + grpcServer := grpc.NewServer( + grpc.ChainStreamInterceptor(func( + srv interface{}, + ss grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + _, priv, _ := ed448.GenerateKey(rand.Reader) + privKey, err := pcrypto.UnmarshalEd448PrivateKey(priv) + require.NoError(t, err) + + pub := privKey.GetPublic() + peerID, err := peer.IDFromPublicKey(pub) + require.NoError(t, err) + + return handler(srv, &serverStream{ + ServerStream: ss, + ctx: internal_grpc.NewContextWithPeerID(ss.Context(), peerID), + }) + }), + ) + protobufs.RegisterHypergraphComparisonServiceServer(grpcServer, serverHG) + defer grpcServer.Stop() + + go func() { + _ = grpcServer.Serve(lis) + }() + + dialClient := func() (*grpc.ClientConn, protobufs.HypergraphComparisonServiceClient) { + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + conn, err := grpc.DialContext( + context.Background(), + "bufnet", + grpc.WithContextDialer(dialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + return conn, protobufs.NewHypergraphComparisonServiceClient(conn) + } + + // Perform sync + t.Log("Performing sync to update modified entries") + conn, client := dialClient() + defer conn.Close() + + stream, err := client.HyperStream(context.Background()) + require.NoError(t, err) + + err = clientHG.Sync( + stream, + shardKey, + protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS, + nil, + ) + require.NoError(t, err) + require.NoError(t, stream.CloseSend()) + + // Commit client after sync + _, err = clientHG.Commit(2) + require.NoError(t, err) + + // Verify client now matches server + clientRootAfter := clientHG.GetVertexAddsSet(shardKey).GetTree().Commit(false) + t.Logf("Client root after sync: %x", clientRootAfter) + + assert.Equal(t, serverRoot, clientRootAfter, "client should converge to server state after sync with modified entries") + + // Verify all entries were updated by comparing the leaves + serverTree := serverHG.GetVertexAddsSet(shardKey).GetTree() + clientTree := clientHG.GetVertexAddsSet(shardKey).GetTree() + + diffLeaves := tries.CompareLeaves(serverTree, clientTree) + assert.Empty(t, diffLeaves, "there should be no difference in leaves after sync") + + t.Logf("Sync completed successfully - %d entries with same keys but different values were updated", numVertices) +} diff --git a/node/rpc/proxy_blossomsub.go b/node/rpc/proxy_blossomsub.go index 0ca28e0..1e180c7 100644 --- a/node/rpc/proxy_blossomsub.go +++ b/node/rpc/proxy_blossomsub.go @@ -168,6 +168,12 @@ func (p *ProxyBlossomSub) Close() error { return nil } +// SetShutdownContext implements p2p.PubSub. 
+func (p *ProxyBlossomSub) SetShutdownContext(ctx context.Context) { + // Forward to underlying client + p.client.SetShutdownContext(ctx) +} + // PublishToBitmask publishes data to a specific bitmask func (p *ProxyBlossomSub) PublishToBitmask(bitmask []byte, data []byte) error { return p.client.PublishToBitmask(bitmask, data) diff --git a/node/rpc/proxy_blossomsub_test.go b/node/rpc/proxy_blossomsub_test.go index 7e7ece6..548b34c 100644 --- a/node/rpc/proxy_blossomsub_test.go +++ b/node/rpc/proxy_blossomsub_test.go @@ -178,6 +178,7 @@ func (m *mockPubSub) GetNetwork() uint { return 0 } func (m *mockPubSub) IsPeerConnected(peerId []byte) bool { return true } func (m *mockPubSub) Reachability() *wrapperspb.BoolValue { return wrapperspb.Bool(true) } func (m *mockPubSub) Close() error { return nil } +func (m *mockPubSub) SetShutdownContext(ctx context.Context) {} // Test helper functions func createTestConfigs() (*config.P2PConfig, *config.EngineConfig, error) { diff --git a/node/rpc/pubsub_proxy.go b/node/rpc/pubsub_proxy.go index a05f216..94f87e3 100644 --- a/node/rpc/pubsub_proxy.go +++ b/node/rpc/pubsub_proxy.go @@ -564,6 +564,11 @@ func (c *PubSubProxyClient) Close() error { return nil } +// SetShutdownContext implements p2p.PubSub. +func (c *PubSubProxyClient) SetShutdownContext(ctx context.Context) { + // No-op for proxy client - shutdown is handled by the proxied pubsub +} + // NewPubSubProxyClient creates a new proxy client func NewPubSubProxyClient( ctx context.Context, diff --git a/types/consensus/distributor.go b/types/consensus/distributor.go index a5e51c5..3d0915c 100644 --- a/types/consensus/distributor.go +++ b/types/consensus/distributor.go @@ -103,7 +103,7 @@ type TreeMetadata struct { TotalLeaves uint64 } -// ShardMergeEventData contains data for shard merge eligibility +// ShardMergeEventData contains data for a single shard merge group type ShardMergeEventData struct { ShardAddresses [][]byte TotalProvers int @@ -113,6 +113,13 @@ type ShardMergeEventData struct { func (s *ShardMergeEventData) ControlEventData() {} +// BulkShardMergeEventData contains all merge-eligible shard groups in a single event +type BulkShardMergeEventData struct { + MergeGroups []ShardMergeEventData +} + +func (b *BulkShardMergeEventData) ControlEventData() {} + // ShardSplitEventData contains data for shard split eligibility type ShardSplitEventData struct { ShardAddress []byte diff --git a/types/mocks/pubsub.go b/types/mocks/pubsub.go index c5ee5fe..14454a0 100644 --- a/types/mocks/pubsub.go +++ b/types/mocks/pubsub.go @@ -23,6 +23,9 @@ func (m *MockPubSub) Close() error { return nil } +// SetShutdownContext implements p2p.PubSub. +func (m *MockPubSub) SetShutdownContext(ctx context.Context) {} + // GetOwnMultiaddrs implements p2p.PubSub. func (m *MockPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr { args := m.Called() diff --git a/types/p2p/pubsub.go b/types/p2p/pubsub.go index e17ee70..6616b47 100644 --- a/types/p2p/pubsub.go +++ b/types/p2p/pubsub.go @@ -20,6 +20,10 @@ const ( ) type PubSub interface { + // SetShutdownContext allows the caller to provide a context that, when + // cancelled, will trigger graceful shutdown of the pubsub subscription + // loops. This should be called before subscribing to any bitmasks. + SetShutdownContext(ctx context.Context) Close() error PublishToBitmask(bitmask []byte, data []byte) error Publish(address []byte, data []byte) error
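Because ControlEventShardMergeEligible now carries a *BulkShardMergeEventData payload instead of a single *ShardMergeEventData, any subscriber that type-asserts the event payload needs a matching update. The following is a minimal sketch of such a subscriber; the handleControlEvent name and the import path are assumptions, while ControlEvent, BulkShardMergeEventData, and ShardMergeEventData are taken from this diff.

package example

import (
	"go.uber.org/zap"

	// NOTE: import path is an assumption; substitute the repository's actual
	// module path for the types/consensus package.
	typesconsensus "example.com/monorepo/types/consensus"
)

// handleControlEvent is a hypothetical subscriber illustrating how merge
// eligibility is consumed after this change: a single event now carries
// every merge-eligible shard group rather than one event per group.
func handleControlEvent(logger *zap.Logger, ev typesconsensus.ControlEvent) {
	switch data := ev.Data.(type) {
	case *typesconsensus.BulkShardMergeEventData:
		for _, group := range data.MergeGroups {
			logger.Info(
				"merge-eligible shard group",
				zap.Int("shard_count", len(group.ShardAddresses)),
				zap.Int("total_provers", group.TotalProvers),
				zap.Uint64("attested_storage", group.AttestedStorage),
				zap.Uint64("required_storage", group.RequiredStorage),
			)
		}
	default:
		// Other control event payloads are unchanged by this diff.
	}
}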