diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 64e2470..cb21579 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -116,7 +116,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( self, effectiveSeniority := e.allocationContext() // Still reconcile allocations even when all workers appear // allocated - this clears stale filters that no longer match - // prover allocations on-chain. + // prover allocations in the registry. e.reconcileWorkerAllocations(data.Frame.Header.FrameNumber, self) e.checkExcessPendingJoins(self, data.Frame.Header.FrameNumber) e.logAllocationStatusOnly(ctx, data, self, effectiveSeniority) @@ -279,7 +279,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( const pendingFilterGraceFrames = 720 // proposalTimeoutFrames is the number of frames to wait for a join proposal -// to appear on-chain before clearing the worker's filter. If a proposal is +// to appear in the registry before clearing the worker's filter. If a proposal is // submitted but never lands (e.g., network issues, not included in frame), // we should reset the filter so the worker can try again. 
const proposalTimeoutFrames = 10 @@ -480,7 +480,8 @@ func (e *GlobalConsensusEngine) evaluateForProposals( } else if len(proposalDescriptors) != 0 && !allowProposals { e.logger.Info( "skipping join proposals", - zap.String("reason", "all workers already assigned filters"), + zap.String("reason", "all workers have local filters but some may not be allocated in registry"), + zap.Int("unallocated_shards", len(proposalDescriptors)), zap.Uint64("frame_number", data.Frame.Header.FrameNumber), ) } @@ -636,6 +637,11 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations( // Immediately clear workers whose allocations were rejected // (no grace period needed - the rejection is definitive) if _, rejected := rejectedFilters[string(worker.Filter)]; rejected { + e.logger.Info( + "clearing rejected worker filter", + zap.Uint("core_id", worker.CoreId), + zap.String("filter", hex.EncodeToString(worker.Filter)), + ) worker.Filter = nil worker.Allocated = false worker.PendingFilterFrame = 0 @@ -653,7 +659,7 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations( if frameNumber <= worker.PendingFilterFrame { continue } - // Worker has a filter set from a proposal, but no on-chain allocation + // Worker has a filter set from a proposal, but no registry allocation // exists for this filter. Use shorter timeout since the proposal // likely didn't land at all. 
if frameNumber-worker.PendingFilterFrame < proposalTimeoutFrames { @@ -661,10 +667,27 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations( } } + // If we can't get prover info (self == nil) and the worker has a filter + // with PendingFilterFrame == 0 (not from a recent proposal), log a warning + // but still clear it below (no grace-period check is applied in this branch) to avoid stuck state if worker.PendingFilterFrame == 0 && self == nil { - continue + e.logger.Warn( + "worker has orphaned filter with no prover info available", + zap.Uint("core_id", worker.CoreId), + zap.String("filter", hex.EncodeToString(worker.Filter)), + zap.Bool("allocated", worker.Allocated), + ) + // Still clear it - if we can't verify the allocation, assume it's stale } + e.logger.Info( + "clearing stale worker filter", + zap.Uint("core_id", worker.CoreId), + zap.String("filter", hex.EncodeToString(worker.Filter)), + zap.Bool("was_allocated", worker.Allocated), + zap.Uint64("pending_frame", worker.PendingFilterFrame), + zap.Bool("self_nil", self == nil), + ) worker.Filter = nil worker.Allocated = false worker.PendingFilterFrame = 0 diff --git a/node/consensus/global/event_distributor_test.go b/node/consensus/global/event_distributor_test.go index 143e59a..34329d0 100644 --- a/node/consensus/global/event_distributor_test.go +++ b/node/consensus/global/event_distributor_test.go @@ -347,7 +347,7 @@ func TestReconcileWorkerAllocations_UnconfirmedProposalClearsAfterTimeout(t *tes workerManager: wm, } - // Prover has no allocations at all - the proposal never landed on-chain + // Prover has no allocations at all - the proposal never landed in registry self := &typesconsensus.ProverInfo{ Address: []byte("prover-address"), Allocations: []typesconsensus.ProverAllocationInfo{},