fix: resolve abandoned prover joins

This commit is contained in:
Cassandra Heart 2026-02-17 21:47:56 -06:00
parent 1a838d097b
commit 9b35520575
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
5 changed files with 158 additions and 4 deletions

View File

@ -18,7 +18,7 @@ var (
var (
ResolvableProtocols = []ma.Protocol{dnsaddrProtocol, dns4Protocol, dns6Protocol, dnsProtocol}
DefaultResolver = &Resolver{def: net.DefaultResolver}
DefaultResolver = &Resolver{def: &net.Resolver{PreferGo: true}}
)
const maxResolvedAddrs = 100

View File

@ -1034,7 +1034,20 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
if self != nil {
for _, allocation := range self.Allocations {
if bytes.Equal(allocation.ConfirmationFilter, bp) {
allocated = allocation.Status != 4
allocated = allocation.Status != typesconsensus.ProverStatusLeaving
// Treat expired joins and leaves as unallocated so the
// proposer will submit a fresh join instead of sitting
// in limbo.
if allocation.Status == typesconsensus.ProverStatusJoining &&
data.Frame.Header.FrameNumber > allocation.JoinFrameNumber+pendingFilterGraceFrames {
allocated = false
}
if allocation.Status == typesconsensus.ProverStatusLeaving &&
data.Frame.Header.FrameNumber > allocation.LeaveFrameNumber+pendingFilterGraceFrames {
allocated = false
}
if allocation.Status == typesconsensus.ProverStatusJoining {
shardsPending++
awaitingFrame[allocation.JoinFrameNumber+360] = struct{}{}
@ -1187,7 +1200,7 @@ func (e *GlobalConsensusEngine) checkExcessPendingJoins(
self *typesconsensus.ProverInfo,
frameNumber uint64,
) {
excessFilters := e.selectExcessPendingFilters(self)
excessFilters := e.selectExcessPendingFilters(self, frameNumber)
if len(excessFilters) != 0 {
e.logger.Debug(
"identified excess pending joins",

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/config"
typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/worker"
@ -411,3 +412,133 @@ func TestReconcileWorkerAllocations_UnconfirmedProposalWithNilSelf(t *testing.T)
require.Len(t, workers, 1)
assert.Nil(t, workers[0].Filter, "filter should be cleared after timeout even with nil self")
}
func TestSelectExcessPendingFilters_ExpiredJoinsNotCounted(t *testing.T) {
engine := &GlobalConsensusEngine{
logger: zap.NewNop(),
config: &config.Config{
Engine: &config.EngineConfig{
DataWorkerCount: 2,
},
},
}
filter1 := []byte("shard-filter-1")
filter2 := []byte("shard-filter-2")
filter3 := []byte("shard-filter-3")
joinFrame := uint64(260000)
// 3 pending joins: 2 expired, 1 valid. Capacity = 2, active = 0.
// Without the fix, all 3 count as pending, allowedPending = 2, excess = 1,
// and the valid join might be randomly selected for rejection.
// With the fix, only the valid join counts, so excess = 0.
self := &typesconsensus.ProverInfo{
Address: []byte("prover-address"),
Allocations: []typesconsensus.ProverAllocationInfo{
{
Status: typesconsensus.ProverStatusJoining,
ConfirmationFilter: filter1,
JoinFrameNumber: joinFrame,
},
{
Status: typesconsensus.ProverStatusJoining,
ConfirmationFilter: filter2,
JoinFrameNumber: joinFrame,
},
{
Status: typesconsensus.ProverStatusJoining,
ConfirmationFilter: filter3,
JoinFrameNumber: joinFrame + 500, // recent, not expired
},
},
}
// Frame is past the grace period for filter1 and filter2 but not filter3
frameNumber := joinFrame + pendingFilterGraceFrames + 1
excess := engine.selectExcessPendingFilters(self, frameNumber)
assert.Empty(t, excess, "expired joins should not count toward pending limit")
}
func TestSelectExcessPendingFilters_ValidJoinsStillLimited(t *testing.T) {
engine := &GlobalConsensusEngine{
logger: zap.NewNop(),
config: &config.Config{
Engine: &config.EngineConfig{
DataWorkerCount: 1,
},
},
}
filter1 := []byte("shard-filter-1")
filter2 := []byte("shard-filter-2")
joinFrame := uint64(260000)
// 2 valid pending joins, capacity = 1, active = 0 → excess = 1
self := &typesconsensus.ProverInfo{
Address: []byte("prover-address"),
Allocations: []typesconsensus.ProverAllocationInfo{
{
Status: typesconsensus.ProverStatusJoining,
ConfirmationFilter: filter1,
JoinFrameNumber: joinFrame,
},
{
Status: typesconsensus.ProverStatusJoining,
ConfirmationFilter: filter2,
JoinFrameNumber: joinFrame,
},
},
}
frameNumber := joinFrame + 100 // well within grace period
excess := engine.selectExcessPendingFilters(self, frameNumber)
assert.Len(t, excess, 1, "should identify 1 excess pending join")
}
func TestSelectExcessPendingFilters_MixedActiveAndExpired(t *testing.T) {
engine := &GlobalConsensusEngine{
logger: zap.NewNop(),
config: &config.Config{
Engine: &config.EngineConfig{
DataWorkerCount: 2,
},
},
}
filter1 := []byte("shard-filter-1")
filter2 := []byte("shard-filter-2")
filter3 := []byte("shard-filter-3")
// 1 active + 1 expired joining + 1 valid joining. Capacity = 2.
// Active uses 1 slot, so allowedPending = 1.
// Expired join should not count, leaving 1 valid pending → no excess.
self := &typesconsensus.ProverInfo{
Address: []byte("prover-address"),
Allocations: []typesconsensus.ProverAllocationInfo{
{
Status: typesconsensus.ProverStatusActive,
ConfirmationFilter: filter1,
JoinFrameNumber: 200000,
},
{
Status: typesconsensus.ProverStatusJoining,
ConfirmationFilter: filter2,
JoinFrameNumber: 250000, // expired
},
{
Status: typesconsensus.ProverStatusJoining,
ConfirmationFilter: filter3,
JoinFrameNumber: 260000, // valid
},
},
}
frameNumber := uint64(260500) // past 250000+720 but not 260000+720
excess := engine.selectExcessPendingFilters(self, frameNumber)
assert.Empty(t, excess, "expired joins should be excluded; 1 active + 1 valid pending fits capacity 2")
}

View File

@ -2241,6 +2241,7 @@ func (e *GlobalConsensusEngine) joinProposalReady(
func (e *GlobalConsensusEngine) selectExcessPendingFilters(
self *typesconsensus.ProverInfo,
frameNumber uint64,
) [][]byte {
if self == nil || e.config == nil || e.config.Engine == nil {
e.logger.Debug("excess pending evaluation skipped: missing config or prover info")
@ -2264,6 +2265,12 @@ func (e *GlobalConsensusEngine) selectExcessPendingFilters(
case typesconsensus.ProverStatusActive:
active++
case typesconsensus.ProverStatusJoining:
// Skip expired joins — they are implicitly rejected and should
// not count toward the pending limit or be candidates for
// explicit rejection.
if frameNumber > allocation.JoinFrameNumber+pendingFilterGraceFrames {
continue
}
filterCopy := make([]byte, len(allocation.ConfirmationFilter))
copy(filterCopy, allocation.ConfirmationFilter)
pending = append(pending, filterCopy)

View File

@ -926,7 +926,10 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) {
if !expired {
return false, errors.Wrap(
errors.New("prover already exists in non-left state"),
fmt.Errorf(
"prover already exists in non-left state (status=%d, frame=%d)",
status, frameNumber,
),
"verify: invalid prover join",
)
}