stale workers should be cleared

Cassandra Heart 2026-01-26 23:38:44 -06:00
parent 1e7d5331d2
commit 4b1adde2e6
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
2 changed files with 29 additions and 6 deletions


@@ -116,7 +116,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop(
self, effectiveSeniority := e.allocationContext()
// Still reconcile allocations even when all workers appear
// allocated - this clears stale filters that no longer match
-// prover allocations on-chain.
+// prover allocations in the registry.
e.reconcileWorkerAllocations(data.Frame.Header.FrameNumber, self)
e.checkExcessPendingJoins(self, data.Frame.Header.FrameNumber)
e.logAllocationStatusOnly(ctx, data, self, effectiveSeniority)
@@ -279,7 +279,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop(
const pendingFilterGraceFrames = 720
// proposalTimeoutFrames is the number of frames to wait for a join proposal
-// to appear on-chain before clearing the worker's filter. If a proposal is
+// to appear in the registry before clearing the worker's filter. If a proposal is
// submitted but never lands (e.g., network issues, not included in frame),
// we should reset the filter so the worker can try again.
const proposalTimeoutFrames = 10
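
Taken together, the two constants define a two-tier staleness policy: a long grace window for filters that may yet be matched by registry state, and a short timeout for proposals that apparently never landed. A minimal sketch of the short-timeout rule, under stated assumptions (shouldClearUnconfirmed is an illustrative name, not from this codebase, and how the longer grace window is applied is not shown in this hunk):

package main

import "fmt"

const (
	pendingFilterGraceFrames = 720 // longer window, applied elsewhere in the engine
	proposalTimeoutFrames    = 10  // short window for proposals that never landed
)

// shouldClearUnconfirmed sketches the rule from this diff: a filter proposed
// at proposedAt with no matching registry allocation is cleared once
// proposalTimeoutFrames have elapsed, so the worker can try again.
func shouldClearUnconfirmed(frame, proposedAt uint64) bool {
	if proposedAt == 0 || frame <= proposedAt {
		return false // no recorded proposal, or too recent to judge
	}
	return frame-proposedAt >= proposalTimeoutFrames
}

func main() {
	fmt.Println(shouldClearUnconfirmed(105, 100)) // false: may still land
	fmt.Println(shouldClearUnconfirmed(120, 100)) // true: clear and retry
}
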
@@ -480,7 +480,8 @@ func (e *GlobalConsensusEngine) evaluateForProposals(
} else if len(proposalDescriptors) != 0 && !allowProposals {
e.logger.Info(
"skipping join proposals",
zap.String("reason", "all workers already assigned filters"),
zap.String("reason", "all workers have local filters but some may not be allocated in registry"),
zap.Int("unallocated_shards", len(proposalDescriptors)),
zap.Uint64("frame_number", data.Frame.Header.FrameNumber),
)
}
@@ -636,6 +637,11 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations(
// Immediately clear workers whose allocations were rejected
// (no grace period needed - the rejection is definitive)
if _, rejected := rejectedFilters[string(worker.Filter)]; rejected {
+e.logger.Info(
+"clearing rejected worker filter",
+zap.Uint("core_id", worker.CoreId),
+zap.String("filter", hex.EncodeToString(worker.Filter)),
+)
worker.Filter = nil
worker.Allocated = false
worker.PendingFilterFrame = 0
@@ -653,7 +659,7 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations(
if frameNumber <= worker.PendingFilterFrame {
continue
}
-// Worker has a filter set from a proposal, but no on-chain allocation
+// Worker has a filter set from a proposal, but no registry allocation
// exists for this filter. Use shorter timeout since the proposal
// likely didn't land at all.
if frameNumber-worker.PendingFilterFrame < proposalTimeoutFrames {
@@ -661,10 +667,27 @@ func (e *GlobalConsensusEngine) reconcileWorkerAllocations(
}
}
+// If we can't get prover info (self == nil) and the worker has a filter
+// with PendingFilterFrame == 0 (not from a recent proposal), log a warning
+// but still clear it after a grace period to avoid stuck state
if worker.PendingFilterFrame == 0 && self == nil {
-continue
+e.logger.Warn(
+"worker has orphaned filter with no prover info available",
+zap.Uint("core_id", worker.CoreId),
+zap.String("filter", hex.EncodeToString(worker.Filter)),
+zap.Bool("allocated", worker.Allocated),
+)
+// Still clear it - if we can't verify the allocation, assume it's stale
}
+e.logger.Info(
+"clearing stale worker filter",
+zap.Uint("core_id", worker.CoreId),
+zap.String("filter", hex.EncodeToString(worker.Filter)),
+zap.Bool("was_allocated", worker.Allocated),
+zap.Uint64("pending_frame", worker.PendingFilterFrame),
+zap.Bool("self_nil", self == nil),
+)
worker.Filter = nil
worker.Allocated = false
worker.PendingFilterFrame = 0
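
Pieced together, the hunks above give the reconciliation pass a clear-by-default shape: a filter now survives only if the registry holds a live allocation for it, or its proposal is still inside the timeout window. A condensed sketch of the per-worker decision order under those assumptions (simplified types; reconcileOne is an illustrative name, and the real method does considerably more, including logging and the grace-period path):

package main

import "fmt"

const proposalTimeoutFrames = 10

// worker mirrors only the fields touched by the diff above.
type worker struct {
	Filter             []byte
	Allocated          bool
	PendingFilterFrame uint64
}

// reconcileOne condenses the decision order for a single worker. "Clear"
// means resetting Filter, Allocated, and PendingFilterFrame, as the diff does.
func reconcileOne(
	w *worker,
	frame uint64,
	rejected, allocated map[string]bool,
	haveProverInfo bool, // stands in for self != nil
) {
	if w.Filter == nil {
		return
	}
	switch {
	case rejected[string(w.Filter)]:
		// Rejection is definitive: fall through and clear immediately.
	case allocated[string(w.Filter)]:
		return // a live registry allocation keeps the filter
	case w.PendingFilterFrame != 0 && frame <= w.PendingFilterFrame:
		return // proposal is too recent to judge
	case w.PendingFilterFrame != 0 && frame-w.PendingFilterFrame < proposalTimeoutFrames:
		return // proposal may still land; wait a few more frames
	case w.PendingFilterFrame == 0 && !haveProverInfo:
		// Previously this case hit `continue` and the worker stayed stuck;
		// now it only warns and falls through to the clear below.
	}
	w.Filter = nil
	w.Allocated = false
	w.PendingFilterFrame = 0
}

func main() {
	w := &worker{Filter: []byte{0xaa}, PendingFilterFrame: 100}
	reconcileOne(w, 120, nil, nil, true)     // 20 frames with no allocation
	fmt.Println("cleared:", w.Filter == nil) // true
}
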


@@ -347,7 +347,7 @@ func TestReconcileWorkerAllocations_UnconfirmedProposalClearsAfterTimeout(t *tes
workerManager: wm,
}
-// Prover has no allocations at all - the proposal never landed on-chain
+// Prover has no allocations at all - the proposal never landed in registry
self := &typesconsensus.ProverInfo{
Address: []byte("prover-address"),
Allocations: []typesconsensus.ProverAllocationInfo{},
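
The hunk is truncated here; presumably the test then runs the reconciliation past proposalTimeoutFrames and asserts the unconfirmed filter is cleared. A hypothetical continuation in that spirit, written against the fixture above (the proposal frame and the worker accessor are guesses, not code from the repository):

// Hypothetical continuation: assume the fixture recorded the proposal at
// frame 100; run reconciliation once the short timeout has elapsed.
e.reconcileWorkerAllocations(100+proposalTimeoutFrames, self)

w := wm.workers[0] // illustrative accessor; the real field may differ
if w.Filter != nil {
	t.Fatalf("expected unconfirmed filter to be cleared, got %x", w.Filter)
}
if w.Allocated || w.PendingFilterFrame != 0 {
	t.Fatal("expected worker allocation state to be fully reset")
}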