From 27ea4268eb3acc36fc46b88b582202ba1d9015bc Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Sun, 16 Nov 2025 20:07:25 -0600 Subject: [PATCH] resolved: sync skipping, time reel disconnect for consensus nodes, proxy pubsub bugs, worker management bugs --- Dockerfile.source | 2 - Dockerfile.sourceavx512 | 2 - node/app/wire.go | 2 + node/app/wire_gen.go | 6 +- node/consensus/app/app_consensus_engine.go | 109 ++++++--- .../global/global_consensus_engine.go | 82 ++++--- node/consensus/global/message_processors.go | 71 ++++-- node/consensus/sync/sync_provider.go | 2 +- node/datarpc/data_worker_ipc_server.go | 4 + node/p2p/blossomsub.go | 5 + node/rpc/proxy_blossomsub.go | 7 +- node/rpc/pubsub_proxy.go | 209 ++++++++++-------- node/worker/manager.go | 66 +++++- types/mocks/pubsub.go | 5 + types/p2p/pubsub.go | 1 + 15 files changed, 393 insertions(+), 180 deletions(-) diff --git a/Dockerfile.source b/Dockerfile.source index 298e43e..9f5644c 100644 --- a/Dockerfile.source +++ b/Dockerfile.source @@ -91,8 +91,6 @@ RUN cd emp-tool && sed -i 's/add_library(${NAME} SHARED ${sources})/add_library( RUN cd emp-ot && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local && cd .. && make && make install && cd .. -# RUN go mod download - ## Generate Rust bindings for channel WORKDIR /opt/ceremonyclient/channel diff --git a/Dockerfile.sourceavx512 b/Dockerfile.sourceavx512 index 6b61429..963cf62 100644 --- a/Dockerfile.sourceavx512 +++ b/Dockerfile.sourceavx512 @@ -94,8 +94,6 @@ RUN cd emp-tool && sed -i 's/add_library(${NAME} SHARED ${sources})/add_library( RUN cd emp-ot && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local && cd .. && make && make install && cd .. -RUN go mod download - ## Generate Rust bindings for channel WORKDIR /opt/ceremonyclient/channel diff --git a/node/app/wire.go b/node/app/wire.go index 286642a..3d6a84d 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -296,6 +296,7 @@ func provideDataWorkerIPC( proverRegistry consensus.ProverRegistry, appConsensusEngineFactory *app.AppConsensusEngineFactory, peerInfoManager tp2p.PeerInfoManager, + pubsub tp2p.PubSub, frameProver crypto.FrameProver, logger *zap.Logger, coreId uint, @@ -307,6 +308,7 @@ func provideDataWorkerIPC( signerRegistry, proverRegistry, peerInfoManager, + pubsub, frameProver, appConsensusEngineFactory, logger, diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index 825210a..2c3bca6 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -120,7 +120,7 @@ func NewDataWorkerNodeWithProxyPubsub(logger *zap.Logger, config2 *config.Config optimizedProofOfMeaningfulWorkRewardIssuance := reward.NewOptRewardIssuance() doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel() appConsensusEngineFactory := app.NewAppConsensusEngineFactory(logger, config2, proxyBlossomSub, hypergraph, fileKeyManager, pebbleKeyStore, pebbleClockStore, pebbleInboxStore, pebbleShardsStore, pebbleHypergraphStore, pebbleConsensusStore, frameProver, kzgInclusionProver, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, cachedSignerRegistry, proverRegistry, inMemoryPeerInfoManager, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, bls48581KeyConstructor, doubleRatchetEncryptedChannel) - dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, frameProver, 
logger, coreId, parentProcess) + dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, proxyBlossomSub, frameProver, logger, coreId, parentProcess) globalTimeReel, err := provideGlobalTimeReel(appConsensusEngineFactory) if err != nil { return nil, err @@ -177,7 +177,7 @@ func NewDataWorkerNodeWithoutProxyPubsub(logger *zap.Logger, config2 *config.Con optimizedProofOfMeaningfulWorkRewardIssuance := reward.NewOptRewardIssuance() doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel() appConsensusEngineFactory := app.NewAppConsensusEngineFactory(logger, config2, blossomSub, hypergraph, fileKeyManager, pebbleKeyStore, pebbleClockStore, pebbleInboxStore, pebbleShardsStore, pebbleHypergraphStore, pebbleConsensusStore, frameProver, kzgInclusionProver, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, cachedSignerRegistry, proverRegistry, inMemoryPeerInfoManager, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, bls48581KeyConstructor, doubleRatchetEncryptedChannel) - dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, frameProver, logger, coreId, parentProcess) + dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, blossomSub, frameProver, logger, coreId, parentProcess) globalTimeReel, err := provideGlobalTimeReel(appConsensusEngineFactory) if err != nil { return nil, err @@ -361,6 +361,7 @@ func provideDataWorkerIPC( proverRegistry consensus2.ProverRegistry, appConsensusEngineFactory *app.AppConsensusEngineFactory, peerInfoManager p2p2.PeerInfoManager, + pubsub p2p2.PubSub, frameProver crypto.FrameProver, logger *zap.Logger, coreId uint, @@ -370,6 +371,7 @@ func provideDataWorkerIPC( rpcMultiaddr, config2, signerRegistry, proverRegistry, peerInfoManager, + pubsub, frameProver, appConsensusEngineFactory, logger, diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 46d314a..bc301de 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -1767,6 +1767,35 @@ func (e *AppConsensusEngine) OnOwnProposal( return } + txn, err := e.clockStore.NewTransaction(false) + if err != nil { + e.logger.Error("could not create transaction", zap.Error(err)) + return + } + + if err := e.clockStore.PutProposalVote(txn, *proposal.Vote); err != nil { + e.logger.Error("could not put vote", zap.Error(err)) + txn.Abort() + return + } + + err = e.clockStore.StageShardClockFrame( + []byte(proposal.State.Identifier), + *proposal.State.State, + txn, + ) + if err != nil { + e.logger.Error("could not put frame candidate", zap.Error(err)) + txn.Abort() + return + } + + if err := txn.Commit(); err != nil { + e.logger.Error("could not commit transaction", zap.Error(err)) + txn.Abort() + return + } + e.voteAggregator.AddState(proposal) e.consensusParticipant.SubmitProposal(proposal) @@ -1976,6 +2005,29 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange( frame.Header.PublicKeySignatureBls48581 = aggregateSig + latest, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress) + if err != nil { + e.logger.Error("could not obtain latest frame", 
zap.Error(err)) + return + } + if latest.Header.FrameNumber+1 != frame.Header.FrameNumber || + !bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) { + e.logger.Debug( + "not next frame, cannot advance", + zap.Uint64("latest_frame_number", latest.Header.FrameNumber), + zap.Uint64("new_frame_number", frame.Header.FrameNumber), + zap.String( + "latest_frame_selector", + hex.EncodeToString([]byte(latest.Identity())), + ), + zap.String( + "new_frame_number", + hex.EncodeToString(frame.Header.ParentSelector), + ), + ) + return + } + txn, err = e.clockStore.NewTransaction(false) if err != nil { e.logger.Error("could not create transaction", zap.Error(err)) @@ -2568,52 +2620,55 @@ func (e *AppConsensusEngine) getPendingProposals( *protobufs.AppShardFrame, *protobufs.ProposalVote, ] { - pendingFrames, err := e.clockStore.RangeShardClockFrames( - e.appAddress, - frameNumber, - 0xfffffffffffffffe, - ) + root, _, err := e.clockStore.GetShardClockFrame(e.appAddress, frameNumber, false) if err != nil { panic(err) } - defer pendingFrames.Close() result := []*models.SignedProposal[ *protobufs.AppShardFrame, *protobufs.ProposalVote, ]{} - pendingFrames.First() - if !pendingFrames.Valid() { - return result + e.logger.Debug("getting pending proposals", zap.Uint64("start", frameNumber)) + + startRank := root.Header.Rank + latestQC, err := e.clockStore.GetLatestQuorumCertificate(e.appAddress) + if err != nil { + panic(err) } - value, err := pendingFrames.Value() - if err != nil || value == nil { - return result + endRank := latestQC.Rank + + parent, err := e.clockStore.GetQuorumCertificate(e.appAddress, startRank) + if err != nil { + panic(err) } - previous := value - for pendingFrames.First(); pendingFrames.Valid(); pendingFrames.Next() { - value, err := pendingFrames.Value() - if err != nil || value == nil { - break + for rank := startRank + 1; rank <= endRank; rank++ { + nextQC, err := e.clockStore.GetQuorumCertificate(e.appAddress, rank) + if err != nil { + e.logger.Debug("no qc for rank", zap.Error(err)) + continue } - parent, err := e.clockStore.GetQuorumCertificate( + value, err := e.clockStore.GetStagedShardClockFrame( e.appAddress, - previous.GetRank(), + nextQC.FrameNumber, + []byte(nextQC.Identity()), + false, ) if err != nil { - panic(err) + e.logger.Debug("no frame for qc", zap.Error(err)) + parent = nextQC + continue } - priorTC, _ := e.clockStore.GetTimeoutCertificate( - e.appAddress, - value.GetRank()-1, - ) var priorTCModel models.TimeoutCertificate = nil - if priorTC != nil { - priorTCModel = priorTC + if parent.Rank != rank-1 { + priorTC, _ := e.clockStore.GetTimeoutCertificate(e.appAddress, rank-1) + if priorTC != nil { + priorTCModel = priorTC + } } vote := &protobufs.ProposalVote{ @@ -2642,7 +2697,7 @@ func (e *AppConsensusEngine) getPendingProposals( }, Vote: &vote, }) - previous = value + parent = nextQC } return result } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index f1f8f2a..f2d267f 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -2914,6 +2914,30 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange( frame.Header.PublicKeySignatureBls48581 = aggregateSig + latest, err := e.clockStore.GetLatestGlobalClockFrame() + if err != nil { + e.logger.Error("could not obtain latest frame", zap.Error(err)) + return + } + + if latest.Header.FrameNumber+1 != frame.Header.FrameNumber || + 
!bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) { + e.logger.Debug( + "not next frame, cannot advance", + zap.Uint64("latest_frame_number", latest.Header.FrameNumber), + zap.Uint64("new_frame_number", frame.Header.FrameNumber), + zap.String( + "latest_frame_selector", + hex.EncodeToString([]byte(latest.Identity())), + ), + zap.String( + "new_frame_number", + hex.EncodeToString(frame.Header.ParentSelector), + ), + ) + return + } + txn, err = e.clockStore.NewTransaction(false) if err != nil { e.logger.Error("could not create transaction", zap.Error(err)) @@ -3249,14 +3273,10 @@ func (e *GlobalConsensusEngine) getPendingProposals( *protobufs.GlobalFrame, *protobufs.ProposalVote, ] { - pendingFrames, err := e.clockStore.RangeGlobalClockFrames( - frameNumber, - 0xfffffffffffffffe, - ) + root, err := e.clockStore.GetGlobalClockFrame(frameNumber) if err != nil { panic(err) } - defer pendingFrames.Close() result := []*models.SignedProposal[ *protobufs.GlobalFrame, @@ -3264,34 +3284,42 @@ func (e *GlobalConsensusEngine) getPendingProposals( ]{} e.logger.Debug("getting pending proposals", zap.Uint64("start", frameNumber)) - pendingFrames.First() - if !pendingFrames.Valid() { - e.logger.Debug("no valid frame") - return result + + startRank := root.Header.Rank + latestQC, err := e.clockStore.GetLatestQuorumCertificate(nil) + if err != nil { + panic(err) } - value, err := pendingFrames.Value() - if err != nil || value == nil { - e.logger.Debug("value was invalid", zap.Error(err)) - return result + endRank := latestQC.Rank + + parent, err := e.clockStore.GetQuorumCertificate(nil, startRank) + if err != nil { + panic(err) } - previous := value - for pendingFrames.Next(); pendingFrames.Valid(); pendingFrames.Next() { - value, err := pendingFrames.Value() - if err != nil || value == nil { - e.logger.Debug("iter value was invalid or empty", zap.Error(err)) - break - } - - parent, err := e.clockStore.GetQuorumCertificate(nil, previous.GetRank()) + for rank := startRank + 1; rank <= endRank; rank++ { + nextQC, err := e.clockStore.GetQuorumCertificate(nil, rank) if err != nil { - panic(err) + e.logger.Debug("no qc for rank", zap.Error(err)) + continue + } + + value, err := e.clockStore.GetGlobalClockFrameCandidate( + nextQC.FrameNumber, + []byte(nextQC.Identity()), + ) + if err != nil { + e.logger.Debug("no frame for qc", zap.Error(err)) + parent = nextQC + continue } - priorTC, _ := e.clockStore.GetTimeoutCertificate(nil, value.GetRank()-1) var priorTCModel models.TimeoutCertificate = nil - if priorTC != nil { - priorTCModel = priorTC + if parent.Rank != rank-1 { + priorTC, _ := e.clockStore.GetTimeoutCertificate(nil, rank-1) + if priorTC != nil { + priorTCModel = priorTC + } } vote := &protobufs.ProposalVote{ @@ -3319,7 +3347,7 @@ func (e *GlobalConsensusEngine) getPendingProposals( }, Vote: &vote, }) - previous = value + parent = nextQC } return result } diff --git a/node/consensus/global/message_processors.go b/node/consensus/global/message_processors.go index 56bff89..95ef00f 100644 --- a/node/consensus/global/message_processors.go +++ b/node/consensus/global/message_processors.go @@ -911,6 +911,8 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( e.logger.Debug( "handling global proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))), ) @@ -950,7 +952,13 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( // drop proposals if we already 
processed them if frameNumber <= finalizedFrameNumber || proposal.State.Header.Rank <= finalizedRank { - e.logger.Debug("dropping stale proposal") + e.logger.Debug( + "dropping stale (lower than finalized) proposal", + zap.Uint64("finalized_rank", finalizedRank), + zap.Uint64("finalized_frame_number", finalizedFrameNumber), + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + ) return } @@ -963,7 +971,11 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( if qcErr == nil && qc != nil && qc.GetFrameNumber() == frameNumber && qc.Identity() == proposal.State.Identity() { - e.logger.Debug("dropping stale proposal") + e.logger.Debug( + "dropping stale (already committed) proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + ) return } } @@ -980,7 +992,9 @@ func (e *GlobalConsensusEngine) handleGlobalProposal( ) { e.logger.Debug( "parent frame not stored, requesting sync", - zap.Uint64("frame_number", proposal.State.Header.FrameNumber-1), + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Uint64("parent_frame_number", proposal.State.Header.FrameNumber-1), ) e.cacheProposal(proposal) @@ -1041,38 +1055,65 @@ func (e *GlobalConsensusEngine) processProposal( ) bool { e.logger.Debug( "processing proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))), ) err := e.VerifyQuorumCertificate(proposal.ParentQuorumCertificate) if err != nil { - e.logger.Debug("proposal has invalid qc", zap.Error(err)) + e.logger.Debug( + "proposal has invalid qc", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } if proposal.PriorRankTimeoutCertificate != nil { err := e.VerifyTimeoutCertificate(proposal.PriorRankTimeoutCertificate) if err != nil { - e.logger.Debug("proposal has invalid tc", zap.Error(err)) + e.logger.Debug( + "proposal has invalid tc", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } } err = e.VerifyVote(&proposal.Vote) if err != nil { - e.logger.Debug("proposal has invalid vote", zap.Error(err)) + e.logger.Debug( + "proposal has invalid vote", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } err = proposal.State.Validate() if err != nil { - e.logger.Debug("proposal is not valid", zap.Error(err)) + e.logger.Debug( + "proposal is not valid", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } valid, err := e.frameValidator.Validate(proposal.State) if !valid || err != nil { - e.logger.Debug("invalid frame in proposal", zap.Error(err)) + e.logger.Debug( + "invalid frame in proposal", + zap.Uint64("rank", proposal.GetRank()), + zap.Uint64("frame_number", proposal.State.GetFrameNumber()), + zap.Error(err), + ) return false } @@ -1129,6 +1170,7 @@ func (e *GlobalConsensusEngine) cacheProposal( e.logger.Debug( "cached out-of-order proposal", + zap.Uint64("rank", proposal.GetRank()), zap.Uint64("frame_number", frameNumber), zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))), ) @@ -1165,6 +1207,7 @@ func (e 
*GlobalConsensusEngine) drainProposalCache(startFrame uint64) { if !e.processProposal(prop) { e.logger.Debug( "cached proposal failed processing, retaining for retry", + zap.Uint64("rank", prop.GetRank()), zap.Uint64("frame_number", next), ) e.cacheProposal(prop) @@ -1318,18 +1361,6 @@ func (e *GlobalConsensusEngine) addCertifiedState( return } - if err := txn.Commit(); err != nil { - _ = txn.Abort() - e.logger.Error("could not commit transaction", zap.Error(err)) - return - } - - txn, err = e.clockStore.NewTransaction(false) - if err != nil { - e.logger.Error("could not create transaction", zap.Error(err)) - return - } - if err := e.clockStore.PutCertifiedGlobalState( parent, txn, diff --git a/node/consensus/sync/sync_provider.go b/node/consensus/sync/sync_provider.go index ce2e339..c2da4b1 100644 --- a/node/consensus/sync/sync_provider.go +++ b/node/consensus/sync/sync_provider.go @@ -29,7 +29,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/types/tries" ) -const defaultStateQueueCapacity = 10 +const defaultStateQueueCapacity = 1 type syncRequest struct { frameNumber uint64 diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go index ab1d22f..a5c1afa 100644 --- a/node/datarpc/data_worker_ipc_server.go +++ b/node/datarpc/data_worker_ipc_server.go @@ -38,6 +38,7 @@ type DataWorkerIPCServer struct { signerRegistry consensus.SignerRegistry proverRegistry consensus.ProverRegistry peerInfoManager tp2p.PeerInfoManager + pubsub tp2p.PubSub authProvider channel.AuthenticationProvider appConsensusEngineFactory *app.AppConsensusEngineFactory appConsensusEngine *app.AppConsensusEngine @@ -52,6 +53,7 @@ func NewDataWorkerIPCServer( signerRegistry consensus.SignerRegistry, proverRegistry consensus.ProverRegistry, peerInfoManager tp2p.PeerInfoManager, + pubsub tp2p.PubSub, frameProver crypto.FrameProver, appConsensusEngineFactory *app.AppConsensusEngineFactory, logger *zap.Logger, @@ -89,6 +91,7 @@ func NewDataWorkerIPCServer( logger: logger, coreId: coreId, parentProcessId: parentProcessId, + pubsub: pubsub, signer: signer, appConsensusEngineFactory: appConsensusEngineFactory, signerRegistry: signerRegistry, @@ -108,6 +111,7 @@ func (r *DataWorkerIPCServer) Start() error { func (r *DataWorkerIPCServer) Stop() error { r.logger.Info("stopping server gracefully") + r.pubsub.Close() if r.server != nil { r.server.GracefulStop() } diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index d6f282b..385b5c1 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -1606,3 +1606,8 @@ func getNetworkNamespace(network uint8) string { return ANNOUNCE_PREFIX + network_name } + +// Close implements p2p.PubSub. 
+func (b *BlossomSub) Close() error { + return nil +} diff --git a/node/rpc/proxy_blossomsub.go b/node/rpc/proxy_blossomsub.go index 138f640..0ca28e0 100644 --- a/node/rpc/proxy_blossomsub.go +++ b/node/rpc/proxy_blossomsub.go @@ -26,6 +26,7 @@ type ProxyBlossomSub struct { client *PubSubProxyClient conn *grpc.ClientConn logger *zap.Logger + cancel context.CancelFunc coreId uint } @@ -144,10 +145,13 @@ func NewProxyBlossomSub( return nil, errors.Wrap(err, "new proxy blossom sub") } + ctx, cancel := context.WithCancel(context.Background()) + // Create the proxy client - client := NewPubSubProxyClient(conn, logger) + client := NewPubSubProxyClient(ctx, conn, logger) return &ProxyBlossomSub{ + cancel: cancel, client: client, conn: conn, logger: logger, @@ -157,6 +161,7 @@ func NewProxyBlossomSub( // Close closes the proxy connection func (p *ProxyBlossomSub) Close() error { + p.cancel() if p.conn != nil { return p.conn.Close() } diff --git a/node/rpc/pubsub_proxy.go b/node/rpc/pubsub_proxy.go index 95664a9..48bba53 100644 --- a/node/rpc/pubsub_proxy.go +++ b/node/rpc/pubsub_proxy.go @@ -520,6 +520,7 @@ type PubSubProxyClient struct { client protobufs.PubSubProxyClient conn *grpc.ClientConn logger *zap.Logger + ctx context.Context // Track active subscriptions and validators subscriptions map[string]context.CancelFunc @@ -530,8 +531,14 @@ type PubSubProxyClient struct { mu sync.RWMutex } +// Close implements p2p.PubSub. +func (c *PubSubProxyClient) Close() error { + return nil +} + // NewPubSubProxyClient creates a new proxy client func NewPubSubProxyClient( + ctx context.Context, conn *grpc.ClientConn, logger *zap.Logger, ) *PubSubProxyClient { @@ -539,6 +546,7 @@ func NewPubSubProxyClient( client: protobufs.NewPubSubProxyClient(conn), conn: conn, logger: logger, + ctx: ctx, subscriptions: make(map[string]context.CancelFunc), validators: make(map[string]func( peer.ID, @@ -555,116 +563,131 @@ func NewPubSubProxyClient( time.Sleep(10 * time.Second) // Initialize validator stream - if err := client.initValidatorStream(); err != nil { + if err := client.initValidatorStream(ctx); err != nil { logger.Error("failed to initialize validator stream", zap.Error(err)) } return client } -func (c *PubSubProxyClient) initValidatorStream() error { +func (c *PubSubProxyClient) initValidatorStream(ctx context.Context) error { backoff := time.Second for { - stream, err := c.client.ValidatorStream(context.Background()) - if err != nil { - c.logger.Error( - "validator stream connect failed, retrying", - zap.Error(err), - zap.Duration("retry_in", backoff), - ) - time.Sleep(backoff) - if backoff < 30*time.Second { - backoff *= 2 - if backoff > 30*time.Second { - backoff = 30 * time.Second - } - } - continue - } - - c.validatorStreamMu.Lock() - c.validatorStream = stream - c.validatorStreamMu.Unlock() - - // Start goroutine to handle incoming validation requests - go c.handleValidationRequests() - - return nil - } -} - -func (c *PubSubProxyClient) handleValidationRequests() { - for { - msg, err := c.validatorStream.Recv() - if err != nil { - c.logger.Error("validator stream recv error", zap.Error(err)) - c.validatorStreamMu.Lock() - c.validatorStream = nil - c.validatorStreamMu.Unlock() - // Try to reconnect - time.Sleep(1 * time.Second) - if err := c.initValidatorStream(); err != nil { + select { + case <-ctx.Done(): + return nil + default: + stream, err := c.client.ValidatorStream(ctx) + if err != nil { c.logger.Error( - "failed to reinitialize validator stream", + "validator stream connect failed, retrying", 
zap.Error(err), + zap.Duration("retry_in", backoff), ) - } - return - } + select { + case <-ctx.Done(): + return nil + case <-time.After(backoff): + if backoff < 30*time.Second { + backoff *= 2 + if backoff > 30*time.Second { + backoff = 30 * time.Second + } + } + } - switch m := msg.Message.(type) { - case *protobufs.ValidationStreamMessage_ValidationRequest: - req := m.ValidationRequest - - // Look up the validator function - c.mu.RLock() - validator, exists := c.validators[req.ValidatorId] - c.mu.RUnlock() - - if !exists { - c.logger.Warn("received validation request for unknown validator", - zap.String("validator_id", req.ValidatorId)) continue } - // Convert message and call validator - pbMsg := &pb.Message{ - Data: req.Message.Data, - From: req.Message.From, - Seqno: req.Message.Seqno, - Bitmask: req.Message.Bitmask, - Signature: req.Message.Signature, - Key: req.Message.Key, - } - - result := validator(peer.ID(req.PeerId), pbMsg) - - // Send response - var protoResult protobufs.ValidationResponse_ValidationResult - switch result { - case p2p.ValidationResultAccept: - protoResult = protobufs.ValidationResponse_ACCEPT - case p2p.ValidationResultReject: - protoResult = protobufs.ValidationResponse_REJECT - default: - protoResult = protobufs.ValidationResponse_IGNORE - } - - resp := &protobufs.ValidationStreamMessage{ - Message: &protobufs.ValidationStreamMessage_ValidationResponse{ - ValidationResponse: &protobufs.ValidationResponse{ - ValidatorId: req.ValidatorId, - Result: protoResult, - }, - }, - } - c.validatorStreamMu.Lock() - if err := c.validatorStream.Send(resp); err != nil { - c.logger.Error("failed to send validation response", zap.Error(err)) - } + c.validatorStream = stream c.validatorStreamMu.Unlock() + + // Start goroutine to handle incoming validation requests + go c.handleValidationRequests(ctx) + + return nil + } + } +} + +func (c *PubSubProxyClient) handleValidationRequests(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + msg, err := c.validatorStream.Recv() + if err != nil { + c.logger.Error("validator stream recv error", zap.Error(err)) + c.validatorStreamMu.Lock() + c.validatorStream = nil + c.validatorStreamMu.Unlock() + // Try to reconnect + time.Sleep(1 * time.Second) + if err := c.initValidatorStream(ctx); err != nil { + c.logger.Error( + "failed to reinitialize validator stream", + zap.Error(err), + ) + } + return + } + + switch m := msg.Message.(type) { + case *protobufs.ValidationStreamMessage_ValidationRequest: + req := m.ValidationRequest + + // Look up the validator function + c.mu.RLock() + validator, exists := c.validators[req.ValidatorId] + c.mu.RUnlock() + + if !exists { + c.logger.Warn("received validation request for unknown validator", + zap.String("validator_id", req.ValidatorId)) + continue + } + + // Convert message and call validator + pbMsg := &pb.Message{ + Data: req.Message.Data, + From: req.Message.From, + Seqno: req.Message.Seqno, + Bitmask: req.Message.Bitmask, + Signature: req.Message.Signature, + Key: req.Message.Key, + } + + result := validator(peer.ID(req.PeerId), pbMsg) + + // Send response + var protoResult protobufs.ValidationResponse_ValidationResult + switch result { + case p2p.ValidationResultAccept: + protoResult = protobufs.ValidationResponse_ACCEPT + case p2p.ValidationResultReject: + protoResult = protobufs.ValidationResponse_REJECT + default: + protoResult = protobufs.ValidationResponse_IGNORE + } + + resp := &protobufs.ValidationStreamMessage{ + Message: 
&protobufs.ValidationStreamMessage_ValidationResponse{ + ValidationResponse: &protobufs.ValidationResponse{ + ValidatorId: req.ValidatorId, + Result: protoResult, + }, + }, + } + + c.validatorStreamMu.Lock() + if err := c.validatorStream.Send(resp); err != nil { + c.logger.Error("failed to send validation response", zap.Error(err)) + } + c.validatorStreamMu.Unlock() + } } } } @@ -819,7 +842,7 @@ func (c *PubSubProxyClient) RegisterValidator( if c.validatorStream == nil { c.validatorStreamMu.Unlock() // Try to initialize stream if not already done - if err := c.initValidatorStream(); err != nil { + if err := c.initValidatorStream(c.ctx); err != nil { c.mu.Lock() delete(c.validators, validatorID) delete(c.bitmaskValidators, bitmaskKey) diff --git a/node/worker/manager.go b/node/worker/manager.go index 24240b7..1ed0bf0 100644 --- a/node/worker/manager.go +++ b/node/worker/manager.go @@ -539,14 +539,70 @@ func (w *WorkerManager) loadWorkersFromStore() error { return errors.Wrap(err, "load workers from store") } - if len(workers) != w.config.Engine.DataWorkerCount { - for i := range w.config.Engine.DataWorkerCount { - _, err := w.getIPCOfWorker(uint(i + 1)) - if err != nil { - w.logger.Error("could not obtain IPC for worker", zap.Error(err)) + if len(workers) != int(w.config.Engine.DataWorkerCount) { + existingWorkers := make(map[uint]*typesStore.WorkerInfo, len(workers)) + for _, worker := range workers { + existingWorkers[worker.CoreId] = worker + } + + // Ensure all configured workers exist + for i := uint(1); i <= uint(w.config.Engine.DataWorkerCount); i++ { + if _, ok := existingWorkers[i]; ok { continue } + if _, err := w.getIPCOfWorker(i); err != nil { + w.logger.Error( + "could not obtain IPC for worker", + zap.Uint("core_id", i), + zap.Error(err), + ) + } } + + // Remove workers beyond configured count + for _, worker := range workers { + if worker.CoreId <= uint(w.config.Engine.DataWorkerCount) { + continue + } + + txn, err := w.store.NewTransaction(false) + if err != nil { + w.logger.Error( + "could not create txn to delete worker", + zap.Uint("core_id", worker.CoreId), + zap.Error(err), + ) + continue + } + + if err := w.store.DeleteWorker(txn, worker.CoreId); err != nil { + _ = txn.Abort() + w.logger.Error( + "could not delete worker", + zap.Uint("core_id", worker.CoreId), + zap.Error(err), + ) + } + if err := txn.Commit(); err != nil { + _ = txn.Abort() + w.logger.Error( + "could not commit worker delete", + zap.Uint("core_id", worker.CoreId), + zap.Error(err), + ) + } + + if client, ok := w.serviceClients[worker.CoreId]; ok { + _ = client.Close() + delete(w.serviceClients, worker.CoreId) + } + delete(w.filtersByWorker, worker.CoreId) + delete(w.allocatedWorkers, worker.CoreId) + if len(worker.Filter) > 0 { + delete(w.workersByFilter, string(worker.Filter)) + } + } + workers, err = w.store.RangeWorkers() if err != nil { return errors.Wrap(err, "load workers from store") diff --git a/types/mocks/pubsub.go b/types/mocks/pubsub.go index e3b9d7f..c5ee5fe 100644 --- a/types/mocks/pubsub.go +++ b/types/mocks/pubsub.go @@ -18,6 +18,11 @@ type MockPubSub struct { mock.Mock } +// Close implements p2p.PubSub. +func (m *MockPubSub) Close() error { + return nil +} + // GetOwnMultiaddrs implements p2p.PubSub. 
func (m *MockPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr { args := m.Called() diff --git a/types/p2p/pubsub.go b/types/p2p/pubsub.go index 50768b4..e17ee70 100644 --- a/types/p2p/pubsub.go +++ b/types/p2p/pubsub.go @@ -20,6 +20,7 @@ const ( ) type PubSub interface { + Close() error PublishToBitmask(bitmask []byte, data []byte) error Publish(address []byte, data []byte) error Subscribe(bitmask []byte, handler func(message *pb.Message) error) error
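
Not part of the patch: a minimal, self-contained sketch of the context-aware reconnect pattern that initValidatorStream in node/rpc/pubsub_proxy.go is reworked to follow — capped exponential backoff that stops as soon as the proxy's context is cancelled (which ProxyBlossomSub.Close now triggers via cancel()). All names below (connectFn, connectWithBackoff, maxBackoff) are hypothetical illustrations, not identifiers from this repository.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectFn stands in for a call such as client.ValidatorStream(ctx);
// it is a placeholder for illustration only.
type connectFn func(ctx context.Context) error

// connectWithBackoff retries connect with capped exponential backoff and
// returns immediately once ctx is cancelled, instead of retrying forever
// in the background.
func connectWithBackoff(ctx context.Context, connect connectFn) error {
	backoff := time.Second
	const maxBackoff = 30 * time.Second

	for {
		select {
		case <-ctx.Done():
			// Close()/cancel was called; stop retrying.
			return ctx.Err()
		default:
		}

		if err := connect(ctx); err == nil {
			return nil
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			if backoff < maxBackoff {
				backoff *= 2
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	attempts := 0
	err := connectWithBackoff(ctx, func(ctx context.Context) error {
		attempts++
		return errors.New("dial failed") // always fails; cancellation ends the loop
	})
	fmt.Println(attempts, err) // e.g. "3 context deadline exceeded"
}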