resolved: sync skipping, time reel disconnect for consensus nodes, proxy pubsub bugs, worker management bugs

Cassandra Heart 2025-11-16 20:07:25 -06:00
parent 905af45f9d
commit 27ea4268eb
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
15 changed files with 393 additions and 180 deletions

View File

@ -91,8 +91,6 @@ RUN cd emp-tool && sed -i 's/add_library(${NAME} SHARED ${sources})/add_library(
RUN cd emp-ot && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local && cd .. && make && make install && cd ..
# RUN go mod download
## Generate Rust bindings for channel
WORKDIR /opt/ceremonyclient/channel

View File

@ -94,8 +94,6 @@ RUN cd emp-tool && sed -i 's/add_library(${NAME} SHARED ${sources})/add_library(
RUN cd emp-ot && mkdir build && cd build && cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local && cd .. && make && make install && cd ..
RUN go mod download
## Generate Rust bindings for channel
WORKDIR /opt/ceremonyclient/channel

View File

@ -296,6 +296,7 @@ func provideDataWorkerIPC(
proverRegistry consensus.ProverRegistry,
appConsensusEngineFactory *app.AppConsensusEngineFactory,
peerInfoManager tp2p.PeerInfoManager,
pubsub tp2p.PubSub,
frameProver crypto.FrameProver,
logger *zap.Logger,
coreId uint,
@ -307,6 +308,7 @@ func provideDataWorkerIPC(
signerRegistry,
proverRegistry,
peerInfoManager,
pubsub,
frameProver,
appConsensusEngineFactory,
logger,

View File

@ -120,7 +120,7 @@ func NewDataWorkerNodeWithProxyPubsub(logger *zap.Logger, config2 *config.Config
optimizedProofOfMeaningfulWorkRewardIssuance := reward.NewOptRewardIssuance()
doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel()
appConsensusEngineFactory := app.NewAppConsensusEngineFactory(logger, config2, proxyBlossomSub, hypergraph, fileKeyManager, pebbleKeyStore, pebbleClockStore, pebbleInboxStore, pebbleShardsStore, pebbleHypergraphStore, pebbleConsensusStore, frameProver, kzgInclusionProver, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, cachedSignerRegistry, proverRegistry, inMemoryPeerInfoManager, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, bls48581KeyConstructor, doubleRatchetEncryptedChannel)
dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, frameProver, logger, coreId, parentProcess)
dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, proxyBlossomSub, frameProver, logger, coreId, parentProcess)
globalTimeReel, err := provideGlobalTimeReel(appConsensusEngineFactory)
if err != nil {
return nil, err
@ -177,7 +177,7 @@ func NewDataWorkerNodeWithoutProxyPubsub(logger *zap.Logger, config2 *config.Con
optimizedProofOfMeaningfulWorkRewardIssuance := reward.NewOptRewardIssuance()
doubleRatchetEncryptedChannel := channel.NewDoubleRatchetEncryptedChannel()
appConsensusEngineFactory := app.NewAppConsensusEngineFactory(logger, config2, blossomSub, hypergraph, fileKeyManager, pebbleKeyStore, pebbleClockStore, pebbleInboxStore, pebbleShardsStore, pebbleHypergraphStore, pebbleConsensusStore, frameProver, kzgInclusionProver, decaf448BulletproofProver, mpCitHVerifiableEncryptor, decaf448KeyConstructor, bedlamCompiler, cachedSignerRegistry, proverRegistry, inMemoryPeerInfoManager, dynamicFeeManager, blsAppFrameValidator, blsGlobalFrameValidator, asertDifficultyAdjuster, optimizedProofOfMeaningfulWorkRewardIssuance, bls48581KeyConstructor, doubleRatchetEncryptedChannel)
dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, frameProver, logger, coreId, parentProcess)
dataWorkerIPCServer := provideDataWorkerIPC(rpcMultiaddr, config2, cachedSignerRegistry, proverRegistry, appConsensusEngineFactory, inMemoryPeerInfoManager, blossomSub, frameProver, logger, coreId, parentProcess)
globalTimeReel, err := provideGlobalTimeReel(appConsensusEngineFactory)
if err != nil {
return nil, err
@ -361,6 +361,7 @@ func provideDataWorkerIPC(
proverRegistry consensus2.ProverRegistry,
appConsensusEngineFactory *app.AppConsensusEngineFactory,
peerInfoManager p2p2.PeerInfoManager,
pubsub p2p2.PubSub,
frameProver crypto.FrameProver,
logger *zap.Logger,
coreId uint,
@ -370,6 +371,7 @@ func provideDataWorkerIPC(
rpcMultiaddr, config2, signerRegistry,
proverRegistry,
peerInfoManager,
pubsub,
frameProver,
appConsensusEngineFactory,
logger,

View File

@ -1767,6 +1767,35 @@ func (e *AppConsensusEngine) OnOwnProposal(
return
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutProposalVote(txn, *proposal.Vote); err != nil {
e.logger.Error("could not put vote", zap.Error(err))
txn.Abort()
return
}
err = e.clockStore.StageShardClockFrame(
[]byte(proposal.State.Identifier),
*proposal.State.State,
txn,
)
if err != nil {
e.logger.Error("could not put frame candidate", zap.Error(err))
txn.Abort()
return
}
if err := txn.Commit(); err != nil {
e.logger.Error("could not commit transaction", zap.Error(err))
txn.Abort()
return
}
e.voteAggregator.AddState(proposal)
e.consensusParticipant.SubmitProposal(proposal)
@ -1976,6 +2005,29 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange(
frame.Header.PublicKeySignatureBls48581 = aggregateSig
latest, _, err := e.clockStore.GetLatestShardClockFrame(e.appAddress)
if err != nil {
e.logger.Error("could not obtain latest frame", zap.Error(err))
return
}
if latest.Header.FrameNumber+1 != frame.Header.FrameNumber ||
!bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) {
e.logger.Debug(
"not next frame, cannot advance",
zap.Uint64("latest_frame_number", latest.Header.FrameNumber),
zap.Uint64("new_frame_number", frame.Header.FrameNumber),
zap.String(
"latest_frame_selector",
hex.EncodeToString([]byte(latest.Identity())),
),
zap.String(
"new_frame_number",
hex.EncodeToString(frame.Header.ParentSelector),
),
)
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
@ -2568,53 +2620,56 @@ func (e *AppConsensusEngine) getPendingProposals(
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
] {
pendingFrames, err := e.clockStore.RangeShardClockFrames(
e.appAddress,
frameNumber,
0xfffffffffffffffe,
)
root, _, err := e.clockStore.GetShardClockFrame(e.appAddress, frameNumber, false)
if err != nil {
panic(err)
}
defer pendingFrames.Close()
result := []*models.SignedProposal[
*protobufs.AppShardFrame,
*protobufs.ProposalVote,
]{}
pendingFrames.First()
if !pendingFrames.Valid() {
return result
}
value, err := pendingFrames.Value()
if err != nil || value == nil {
return result
}
e.logger.Debug("getting pending proposals", zap.Uint64("start", frameNumber))
previous := value
for pendingFrames.First(); pendingFrames.Valid(); pendingFrames.Next() {
value, err := pendingFrames.Value()
if err != nil || value == nil {
break
startRank := root.Header.Rank
latestQC, err := e.clockStore.GetLatestQuorumCertificate(e.appAddress)
if err != nil {
panic(err)
}
endRank := latestQC.Rank
parent, err := e.clockStore.GetQuorumCertificate(
e.appAddress,
previous.GetRank(),
)
parent, err := e.clockStore.GetQuorumCertificate(e.appAddress, startRank)
if err != nil {
panic(err)
}
priorTC, _ := e.clockStore.GetTimeoutCertificate(
for rank := startRank + 1; rank <= endRank; rank++ {
nextQC, err := e.clockStore.GetQuorumCertificate(e.appAddress, rank)
if err != nil {
e.logger.Debug("no qc for rank", zap.Error(err))
continue
}
value, err := e.clockStore.GetStagedShardClockFrame(
e.appAddress,
value.GetRank()-1,
nextQC.FrameNumber,
[]byte(nextQC.Identity()),
false,
)
if err != nil {
e.logger.Debug("no frame for qc", zap.Error(err))
parent = nextQC
continue
}
var priorTCModel models.TimeoutCertificate = nil
if parent.Rank != rank-1 {
priorTC, _ := e.clockStore.GetTimeoutCertificate(e.appAddress, rank-1)
if priorTC != nil {
priorTCModel = priorTC
}
}
vote := &protobufs.ProposalVote{
Filter: e.appAddress,
@ -2642,7 +2697,7 @@ func (e *AppConsensusEngine) getPendingProposals(
},
Vote: &vote,
})
previous = value
parent = nextQC
}
return result
}
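
The rewritten getPendingProposals above replaces the old frame-range iteration with a rank walk: anchor on the frame at the requested height, read its rank, then step rank by rank up to the latest quorum certificate, pairing each rank's QC with its staged frame and attaching a timeout certificate only when a gap in parent rank shows the prior rank timed out. The global engine's getPendingProposals in the next file gets the same treatment. A minimal standalone sketch of the walk, using hypothetical store and certificate types in place of the monorepo's clock store API:

package sketch

// Hypothetical, simplified stand-ins for quorum certificates, staged frames,
// timeout certificates, and the clock store used by getPendingProposals.
type QC struct {
	Rank        uint64
	FrameNumber uint64
	ID          string
}

type Frame struct{ FrameNumber uint64 }

type TC struct{ Rank uint64 }

type ClockStore interface {
	QuorumCertificate(rank uint64) (*QC, error)
	StagedFrame(frameNumber uint64, id string) (*Frame, error)
	TimeoutCertificate(rank uint64) (*TC, error)
}

type Proposal struct {
	Frame    *Frame
	ParentQC *QC
	PriorTC  *TC // set only when the preceding rank timed out
}

// pendingProposals rebuilds the proposal chain from startRank (the rank of the
// caller's root frame) up to endRank (the rank of the latest QC). Ranks with
// no QC are skipped; ranks whose frame is not held locally still advance the
// parent QC so later proposals chain correctly.
func pendingProposals(s ClockStore, startRank, endRank uint64) ([]Proposal, error) {
	parent, err := s.QuorumCertificate(startRank)
	if err != nil {
		return nil, err
	}
	var out []Proposal
	for rank := startRank + 1; rank <= endRank; rank++ {
		qc, err := s.QuorumCertificate(rank)
		if err != nil {
			continue // no QC for this rank
		}
		frame, err := s.StagedFrame(qc.FrameNumber, qc.ID)
		if err != nil {
			parent = qc // frame missing locally, but keep the chain moving
			continue
		}
		var priorTC *TC
		if parent.Rank != rank-1 {
			// A rank gap means the previous rank produced no QC; carry its
			// timeout certificate if one exists.
			priorTC, _ = s.TimeoutCertificate(rank - 1)
		}
		out = append(out, Proposal{Frame: frame, ParentQC: parent, PriorTC: priorTC})
		parent = qc
	}
	return out, nil
}

Deriving the set from certificates rather than from RangeShardClockFrames means a node missing intermediate staged frames advances the parent QC instead of silently truncating the walk, which appears to be the sync-skipping behaviour the commit message refers to.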

View File

@ -2914,6 +2914,30 @@ func (e *GlobalConsensusEngine) OnQuorumCertificateTriggeredRankChange(
frame.Header.PublicKeySignatureBls48581 = aggregateSig
latest, err := e.clockStore.GetLatestGlobalClockFrame()
if err != nil {
e.logger.Error("could not obtain latest frame", zap.Error(err))
return
}
if latest.Header.FrameNumber+1 != frame.Header.FrameNumber ||
!bytes.Equal([]byte(latest.Identity()), frame.Header.ParentSelector) {
e.logger.Debug(
"not next frame, cannot advance",
zap.Uint64("latest_frame_number", latest.Header.FrameNumber),
zap.Uint64("new_frame_number", frame.Header.FrameNumber),
zap.String(
"latest_frame_selector",
hex.EncodeToString([]byte(latest.Identity())),
),
zap.String(
"new_frame_number",
hex.EncodeToString(frame.Header.ParentSelector),
),
)
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
@ -3249,14 +3273,10 @@ func (e *GlobalConsensusEngine) getPendingProposals(
*protobufs.GlobalFrame,
*protobufs.ProposalVote,
] {
pendingFrames, err := e.clockStore.RangeGlobalClockFrames(
frameNumber,
0xfffffffffffffffe,
)
root, err := e.clockStore.GetGlobalClockFrame(frameNumber)
if err != nil {
panic(err)
}
defer pendingFrames.Close()
result := []*models.SignedProposal[
*protobufs.GlobalFrame,
@ -3264,35 +3284,43 @@ func (e *GlobalConsensusEngine) getPendingProposals(
]{}
e.logger.Debug("getting pending proposals", zap.Uint64("start", frameNumber))
pendingFrames.First()
if !pendingFrames.Valid() {
e.logger.Debug("no valid frame")
return result
}
value, err := pendingFrames.Value()
if err != nil || value == nil {
e.logger.Debug("value was invalid", zap.Error(err))
return result
}
previous := value
for pendingFrames.Next(); pendingFrames.Valid(); pendingFrames.Next() {
value, err := pendingFrames.Value()
if err != nil || value == nil {
e.logger.Debug("iter value was invalid or empty", zap.Error(err))
break
startRank := root.Header.Rank
latestQC, err := e.clockStore.GetLatestQuorumCertificate(nil)
if err != nil {
panic(err)
}
endRank := latestQC.Rank
parent, err := e.clockStore.GetQuorumCertificate(nil, previous.GetRank())
parent, err := e.clockStore.GetQuorumCertificate(nil, startRank)
if err != nil {
panic(err)
}
priorTC, _ := e.clockStore.GetTimeoutCertificate(nil, value.GetRank()-1)
for rank := startRank + 1; rank <= endRank; rank++ {
nextQC, err := e.clockStore.GetQuorumCertificate(nil, rank)
if err != nil {
e.logger.Debug("no qc for rank", zap.Error(err))
continue
}
value, err := e.clockStore.GetGlobalClockFrameCandidate(
nextQC.FrameNumber,
[]byte(nextQC.Identity()),
)
if err != nil {
e.logger.Debug("no frame for qc", zap.Error(err))
parent = nextQC
continue
}
var priorTCModel models.TimeoutCertificate = nil
if parent.Rank != rank-1 {
priorTC, _ := e.clockStore.GetTimeoutCertificate(nil, rank-1)
if priorTC != nil {
priorTCModel = priorTC
}
}
vote := &protobufs.ProposalVote{
Rank: value.GetRank(),
@ -3319,7 +3347,7 @@ func (e *GlobalConsensusEngine) getPendingProposals(
},
Vote: &vote,
})
previous = value
parent = nextQC
}
return result
}
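
Both OnQuorumCertificateTriggeredRankChange hunks above add the same guard before committing a newly certified frame: it must be exactly one frame ahead of the latest stored frame and must name that frame as its parent, otherwise the engine logs "not next frame, cannot advance" and returns without writing. A compact sketch of the check, with hypothetical header fields in place of the protobuf types:

package sketch

import "bytes"

// Hypothetical stand-ins for the frame header fields the guard inspects.
type Header struct {
	FrameNumber    uint64
	ParentSelector []byte
}

type ClockFrame struct {
	Header   Header
	Selector []byte // this frame's identity/selector
}

// canAdvance reports whether next directly extends latest: next must carry the
// next frame number and reference latest by selector. Anything else is stale
// or belongs to a chain segment the node has not synced yet.
func canAdvance(latest, next *ClockFrame) bool {
	if latest.Header.FrameNumber+1 != next.Header.FrameNumber {
		return false
	}
	return bytes.Equal(latest.Selector, next.Header.ParentSelector)
}

When the check fails the engine returns without committing anything, which is what the "not next frame, cannot advance" debug entries record.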

View File

@ -911,6 +911,8 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
e.logger.Debug(
"handling global proposal",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
)
@ -950,7 +952,13 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
// drop proposals if we already processed them
if frameNumber <= finalizedFrameNumber ||
proposal.State.Header.Rank <= finalizedRank {
e.logger.Debug("dropping stale proposal")
e.logger.Debug(
"dropping stale (lower than finalized) proposal",
zap.Uint64("finalized_rank", finalizedRank),
zap.Uint64("finalized_frame_number", finalizedFrameNumber),
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
)
return
}
@ -963,7 +971,11 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
if qcErr == nil && qc != nil &&
qc.GetFrameNumber() == frameNumber &&
qc.Identity() == proposal.State.Identity() {
e.logger.Debug("dropping stale proposal")
e.logger.Debug(
"dropping stale (already committed) proposal",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
)
return
}
}
@ -980,7 +992,9 @@ func (e *GlobalConsensusEngine) handleGlobalProposal(
) {
e.logger.Debug(
"parent frame not stored, requesting sync",
zap.Uint64("frame_number", proposal.State.Header.FrameNumber-1),
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.Uint64("parent_frame_number", proposal.State.Header.FrameNumber-1),
)
e.cacheProposal(proposal)
@ -1041,38 +1055,65 @@ func (e *GlobalConsensusEngine) processProposal(
) bool {
e.logger.Debug(
"processing proposal",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
)
err := e.VerifyQuorumCertificate(proposal.ParentQuorumCertificate)
if err != nil {
e.logger.Debug("proposal has invalid qc", zap.Error(err))
e.logger.Debug(
"proposal has invalid qc",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.Error(err),
)
return false
}
if proposal.PriorRankTimeoutCertificate != nil {
err := e.VerifyTimeoutCertificate(proposal.PriorRankTimeoutCertificate)
if err != nil {
e.logger.Debug("proposal has invalid tc", zap.Error(err))
e.logger.Debug(
"proposal has invalid tc",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.Error(err),
)
return false
}
}
err = e.VerifyVote(&proposal.Vote)
if err != nil {
e.logger.Debug("proposal has invalid vote", zap.Error(err))
e.logger.Debug(
"proposal has invalid vote",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.Error(err),
)
return false
}
err = proposal.State.Validate()
if err != nil {
e.logger.Debug("proposal is not valid", zap.Error(err))
e.logger.Debug(
"proposal is not valid",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.Error(err),
)
return false
}
valid, err := e.frameValidator.Validate(proposal.State)
if !valid || err != nil {
e.logger.Debug("invalid frame in proposal", zap.Error(err))
e.logger.Debug(
"invalid frame in proposal",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", proposal.State.GetFrameNumber()),
zap.Error(err),
)
return false
}
@ -1129,6 +1170,7 @@ func (e *GlobalConsensusEngine) cacheProposal(
e.logger.Debug(
"cached out-of-order proposal",
zap.Uint64("rank", proposal.GetRank()),
zap.Uint64("frame_number", frameNumber),
zap.String("id", hex.EncodeToString([]byte(proposal.State.Identity()))),
)
@ -1165,6 +1207,7 @@ func (e *GlobalConsensusEngine) drainProposalCache(startFrame uint64) {
if !e.processProposal(prop) {
e.logger.Debug(
"cached proposal failed processing, retaining for retry",
zap.Uint64("rank", prop.GetRank()),
zap.Uint64("frame_number", next),
)
e.cacheProposal(prop)
@ -1318,18 +1361,6 @@ func (e *GlobalConsensusEngine) addCertifiedState(
return
}
if err := txn.Commit(); err != nil {
_ = txn.Abort()
e.logger.Error("could not commit transaction", zap.Error(err))
return
}
txn, err = e.clockStore.NewTransaction(false)
if err != nil {
e.logger.Error("could not create transaction", zap.Error(err))
return
}
if err := e.clockStore.PutCertifiedGlobalState(
parent,
txn,
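
The addCertifiedState hunk above also drops an intermediate commit-and-reopen, so the parent's certified global state is written in the same transaction as the writes before it and the pair lands atomically. A small sketch of the pattern, assuming a hypothetical transactional store interface:

package sketch

// Txn and CertifiedStore are hypothetical stand-ins for the clock store's
// transactional API.
type Txn interface {
	Commit() error
	Abort() error
}

type CertifiedStore interface {
	NewTransaction(readOnly bool) (Txn, error)
	PutCertifiedState(txn Txn, state []byte) error
}

// putCertifiedWithParent writes a certified state and its parent in a single
// transaction with a single commit, rather than committing in between.
func putCertifiedWithParent(s CertifiedStore, state, parent []byte) error {
	txn, err := s.NewTransaction(false)
	if err != nil {
		return err
	}
	if err := s.PutCertifiedState(txn, state); err != nil {
		_ = txn.Abort()
		return err
	}
	if err := s.PutCertifiedState(txn, parent); err != nil {
		_ = txn.Abort()
		return err
	}
	if err := txn.Commit(); err != nil {
		_ = txn.Abort()
		return err
	}
	return nil
}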

View File

@ -29,7 +29,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/types/tries"
)
const defaultStateQueueCapacity = 10
const defaultStateQueueCapacity = 1
type syncRequest struct {
frameNumber uint64

View File

@ -38,6 +38,7 @@ type DataWorkerIPCServer struct {
signerRegistry consensus.SignerRegistry
proverRegistry consensus.ProverRegistry
peerInfoManager tp2p.PeerInfoManager
pubsub tp2p.PubSub
authProvider channel.AuthenticationProvider
appConsensusEngineFactory *app.AppConsensusEngineFactory
appConsensusEngine *app.AppConsensusEngine
@ -52,6 +53,7 @@ func NewDataWorkerIPCServer(
signerRegistry consensus.SignerRegistry,
proverRegistry consensus.ProverRegistry,
peerInfoManager tp2p.PeerInfoManager,
pubsub tp2p.PubSub,
frameProver crypto.FrameProver,
appConsensusEngineFactory *app.AppConsensusEngineFactory,
logger *zap.Logger,
@ -89,6 +91,7 @@ func NewDataWorkerIPCServer(
logger: logger,
coreId: coreId,
parentProcessId: parentProcessId,
pubsub: pubsub,
signer: signer,
appConsensusEngineFactory: appConsensusEngineFactory,
signerRegistry: signerRegistry,
@ -108,6 +111,7 @@ func (r *DataWorkerIPCServer) Start() error {
func (r *DataWorkerIPCServer) Stop() error {
r.logger.Info("stopping server gracefully")
r.pubsub.Close()
if r.server != nil {
r.server.GracefulStop()
}
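
Because the wire_gen and NewDataWorkerIPCServer changes above hand the worker its PubSub instance, Stop can now close the pubsub transport before draining the gRPC server; for proxied workers that releases the proxy connection rather than leaving it open. A sketch of the shutdown order with a hypothetical server struct:

package sketch

import "google.golang.org/grpc"

// PubSub is a hypothetical stand-in exposing only what Stop needs.
type PubSub interface {
	Close() error
}

type workerIPCServer struct {
	pubsub PubSub
	server *grpc.Server
}

// Stop tears the worker down gracefully: pubsub first, then the gRPC server.
func (s *workerIPCServer) Stop() error {
	_ = s.pubsub.Close() // mirror the diff, which ignores the close error
	if s.server != nil {
		s.server.GracefulStop()
	}
	return nil
}

Closing the transport first means no new gossip callbacks fire while in-flight RPCs drain.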

View File

@ -1606,3 +1606,8 @@ func getNetworkNamespace(network uint8) string {
return ANNOUNCE_PREFIX + network_name
}
// Close implements p2p.PubSub.
func (b *BlossomSub) Close() error {
return nil
}

View File

@ -26,6 +26,7 @@ type ProxyBlossomSub struct {
client *PubSubProxyClient
conn *grpc.ClientConn
logger *zap.Logger
cancel context.CancelFunc
coreId uint
}
@ -144,10 +145,13 @@ func NewProxyBlossomSub(
return nil, errors.Wrap(err, "new proxy blossom sub")
}
ctx, cancel := context.WithCancel(context.Background())
// Create the proxy client
client := NewPubSubProxyClient(conn, logger)
client := NewPubSubProxyClient(ctx, conn, logger)
return &ProxyBlossomSub{
cancel: cancel,
client: client,
conn: conn,
logger: logger,
@ -157,6 +161,7 @@ func NewProxyBlossomSub(
// Close closes the proxy connection
func (p *ProxyBlossomSub) Close() error {
p.cancel()
if p.conn != nil {
return p.conn.Close()
}

View File

@ -520,6 +520,7 @@ type PubSubProxyClient struct {
client protobufs.PubSubProxyClient
conn *grpc.ClientConn
logger *zap.Logger
ctx context.Context
// Track active subscriptions and validators
subscriptions map[string]context.CancelFunc
@ -530,8 +531,14 @@ type PubSubProxyClient struct {
mu sync.RWMutex
}
// Close implements p2p.PubSub.
func (c *PubSubProxyClient) Close() error {
return nil
}
// NewPubSubProxyClient creates a new proxy client
func NewPubSubProxyClient(
ctx context.Context,
conn *grpc.ClientConn,
logger *zap.Logger,
) *PubSubProxyClient {
@ -539,6 +546,7 @@ func NewPubSubProxyClient(
client: protobufs.NewPubSubProxyClient(conn),
conn: conn,
logger: logger,
ctx: ctx,
subscriptions: make(map[string]context.CancelFunc),
validators: make(map[string]func(
peer.ID,
@ -555,31 +563,40 @@ func NewPubSubProxyClient(
time.Sleep(10 * time.Second)
// Initialize validator stream
if err := client.initValidatorStream(); err != nil {
if err := client.initValidatorStream(ctx); err != nil {
logger.Error("failed to initialize validator stream", zap.Error(err))
}
return client
}
func (c *PubSubProxyClient) initValidatorStream() error {
func (c *PubSubProxyClient) initValidatorStream(ctx context.Context) error {
backoff := time.Second
for {
stream, err := c.client.ValidatorStream(context.Background())
select {
case <-ctx.Done():
return nil
default:
stream, err := c.client.ValidatorStream(ctx)
if err != nil {
c.logger.Error(
"validator stream connect failed, retrying",
zap.Error(err),
zap.Duration("retry_in", backoff),
)
time.Sleep(backoff)
select {
case <-ctx.Done():
return nil
case <-time.After(backoff):
if backoff < 30*time.Second {
backoff *= 2
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
}
}
continue
}
@ -588,14 +605,19 @@ func (c *PubSubProxyClient) initValidatorStream() error {
c.validatorStreamMu.Unlock()
// Start goroutine to handle incoming validation requests
go c.handleValidationRequests()
go c.handleValidationRequests(ctx)
return nil
}
}
}
func (c *PubSubProxyClient) handleValidationRequests() {
func (c *PubSubProxyClient) handleValidationRequests(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
msg, err := c.validatorStream.Recv()
if err != nil {
c.logger.Error("validator stream recv error", zap.Error(err))
@ -604,7 +626,7 @@ func (c *PubSubProxyClient) handleValidationRequests() {
c.validatorStreamMu.Unlock()
// Try to reconnect
time.Sleep(1 * time.Second)
if err := c.initValidatorStream(); err != nil {
if err := c.initValidatorStream(ctx); err != nil {
c.logger.Error(
"failed to reinitialize validator stream",
zap.Error(err),
@ -668,6 +690,7 @@ func (c *PubSubProxyClient) handleValidationRequests() {
}
}
}
}
// Ensure PubSubProxyClient implements p2p.PubSub
var _ p2p.PubSub = (*PubSubProxyClient)(nil)
@ -819,7 +842,7 @@ func (c *PubSubProxyClient) RegisterValidator(
if c.validatorStream == nil {
c.validatorStreamMu.Unlock()
// Try to initialize stream if not already done
if err := c.initValidatorStream(); err != nil {
if err := c.initValidatorStream(c.ctx); err != nil {
c.mu.Lock()
delete(c.validators, validatorID)
delete(c.bitmaskValidators, bitmaskKey)
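
The proxy client changes above thread the ProxyBlossomSub's context into both the validator stream reconnect loop and the request handler, so cancelling it (which Close now does) stops the retry and receive loops instead of leaving them spinning, and the retry delay doubles up to a 30 second cap rather than staying fixed. A standalone sketch of that reconnect pattern, with a hypothetical connect function standing in for the ValidatorStream call:

package sketch

import (
	"context"
	"time"
)

// connectWithBackoff keeps calling connect until it succeeds or ctx is
// cancelled. The delay between attempts doubles each time, capped at maxWait.
// connect is a hypothetical stand-in for opening the validator stream.
func connectWithBackoff(
	ctx context.Context,
	connect func(context.Context) error,
	maxWait time.Duration,
) error {
	backoff := time.Second
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err := connect(ctx); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			backoff *= 2
			if backoff > maxWait {
				backoff = maxWait
			}
		}
	}
}

ProxyBlossomSub.Close cancels the context it created in NewProxyBlossomSub before closing the gRPC connection, so every loop built on that context unwinds as part of Close.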

View File

@ -539,14 +539,70 @@ func (w *WorkerManager) loadWorkersFromStore() error {
return errors.Wrap(err, "load workers from store")
}
if len(workers) != w.config.Engine.DataWorkerCount {
for i := range w.config.Engine.DataWorkerCount {
_, err := w.getIPCOfWorker(uint(i + 1))
if err != nil {
w.logger.Error("could not obtain IPC for worker", zap.Error(err))
if len(workers) != int(w.config.Engine.DataWorkerCount) {
existingWorkers := make(map[uint]*typesStore.WorkerInfo, len(workers))
for _, worker := range workers {
existingWorkers[worker.CoreId] = worker
}
// Ensure all configured workers exist
for i := uint(1); i <= uint(w.config.Engine.DataWorkerCount); i++ {
if _, ok := existingWorkers[i]; ok {
continue
}
if _, err := w.getIPCOfWorker(i); err != nil {
w.logger.Error(
"could not obtain IPC for worker",
zap.Uint("core_id", i),
zap.Error(err),
)
}
}
// Remove workers beyond configured count
for _, worker := range workers {
if worker.CoreId <= uint(w.config.Engine.DataWorkerCount) {
continue
}
txn, err := w.store.NewTransaction(false)
if err != nil {
w.logger.Error(
"could not create txn to delete worker",
zap.Uint("core_id", worker.CoreId),
zap.Error(err),
)
continue
}
if err := w.store.DeleteWorker(txn, worker.CoreId); err != nil {
_ = txn.Abort()
w.logger.Error(
"could not delete worker",
zap.Uint("core_id", worker.CoreId),
zap.Error(err),
)
} else if err := txn.Commit(); err != nil {
_ = txn.Abort()
w.logger.Error(
"could not commit worker delete",
zap.Uint("core_id", worker.CoreId),
zap.Error(err),
)
}
if client, ok := w.serviceClients[worker.CoreId]; ok {
_ = client.Close()
delete(w.serviceClients, worker.CoreId)
}
delete(w.filtersByWorker, worker.CoreId)
delete(w.allocatedWorkers, worker.CoreId)
if len(worker.Filter) > 0 {
delete(w.workersByFilter, string(worker.Filter))
}
}
workers, err = w.store.RangeWorkers()
if err != nil {
return errors.Wrap(err, "load workers from store")
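
loadWorkersFromStore now reconciles the persisted workers against config.Engine.DataWorkerCount in both directions: any configured core id without a stored worker gets an IPC connection established, and any stored worker above the configured count is deleted from the store, has its service client closed, and is removed from the in-memory filter and allocation maps before the list is re-read. A condensed sketch of that reconciliation, using hypothetical manager state:

package sketch

// WorkerInfo and manager are hypothetical, simplified stand-ins for the worker
// manager's persisted records and in-memory bookkeeping.
type WorkerInfo struct {
	CoreId uint
	Filter []byte
}

type manager struct {
	configured uint                  // config.Engine.DataWorkerCount equivalent
	workers    map[uint]*WorkerInfo  // persisted workers keyed by core id
	closers    map[uint]func() error // per-worker client close functions
}

// reconcile makes the worker set match the configured count: core ids
// 1..configured must exist, and anything numbered above that is torn down.
// ensure stands in for getIPCOfWorker; remove stands in for the store delete.
func (m *manager) reconcile(ensure func(coreId uint) error, remove func(coreId uint) error) {
	// Ensure every configured worker exists.
	for id := uint(1); id <= m.configured; id++ {
		if _, ok := m.workers[id]; ok {
			continue
		}
		if err := ensure(id); err != nil {
			continue // the real code logs and moves on
		}
	}
	// Remove workers beyond the configured count.
	for id := range m.workers {
		if id <= m.configured {
			continue
		}
		if err := remove(id); err != nil {
			continue // keep the record if the store delete failed
		}
		if closeFn, ok := m.closers[id]; ok {
			_ = closeFn()
			delete(m.closers, id)
		}
		delete(m.workers, id)
	}
}

After reconciliation the real code re-reads RangeWorkers so the rest of loadWorkersFromStore operates on the corrected set.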

View File

@ -18,6 +18,11 @@ type MockPubSub struct {
mock.Mock
}
// Close implements p2p.PubSub.
func (m *MockPubSub) Close() error {
return nil
}
// GetOwnMultiaddrs implements p2p.PubSub.
func (m *MockPubSub) GetOwnMultiaddrs() []multiaddr.Multiaddr {
args := m.Called()

View File

@ -20,6 +20,7 @@ const (
)
type PubSub interface {
Close() error
PublishToBitmask(bitmask []byte, data []byte) error
Publish(address []byte, data []byte) error
Subscribe(bitmask []byte, handler func(message *pb.Message) error) error