From 9b35520575090c2c063ea81abd5d06064df0f09d Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Tue, 17 Feb 2026 21:47:56 -0600 Subject: [PATCH] fix: resolve abandoned prover joins --- go-multiaddr-dns/resolve.go | 2 +- node/consensus/global/event_distributor.go | 17 ++- .../global/event_distributor_test.go | 131 ++++++++++++++++++ .../global/global_consensus_engine.go | 7 + .../intrinsics/global/global_prover_join.go | 5 +- 5 files changed, 158 insertions(+), 4 deletions(-) diff --git a/go-multiaddr-dns/resolve.go b/go-multiaddr-dns/resolve.go index 23e6e5e..ae5f8e5 100644 --- a/go-multiaddr-dns/resolve.go +++ b/go-multiaddr-dns/resolve.go @@ -18,7 +18,7 @@ var ( var ( ResolvableProtocols = []ma.Protocol{dnsaddrProtocol, dns4Protocol, dns6Protocol, dnsProtocol} - DefaultResolver = &Resolver{def: net.DefaultResolver} + DefaultResolver = &Resolver{def: &net.Resolver{PreferGo: true}} ) const maxResolvedAddrs = 100 diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 2b308d7..5c9b561 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -1034,7 +1034,20 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot( if self != nil { for _, allocation := range self.Allocations { if bytes.Equal(allocation.ConfirmationFilter, bp) { - allocated = allocation.Status != 4 + allocated = allocation.Status != typesconsensus.ProverStatusLeaving + + // Treat expired joins and leaves as unallocated so the + // proposer will submit a fresh join instead of sitting + // in limbo. + if allocation.Status == typesconsensus.ProverStatusJoining && + data.Frame.Header.FrameNumber > allocation.JoinFrameNumber+pendingFilterGraceFrames { + allocated = false + } + if allocation.Status == typesconsensus.ProverStatusLeaving && + data.Frame.Header.FrameNumber > allocation.LeaveFrameNumber+pendingFilterGraceFrames { + allocated = false + } + if allocation.Status == typesconsensus.ProverStatusJoining { shardsPending++ awaitingFrame[allocation.JoinFrameNumber+360] = struct{}{} @@ -1187,7 +1200,7 @@ func (e *GlobalConsensusEngine) checkExcessPendingJoins( self *typesconsensus.ProverInfo, frameNumber uint64, ) { - excessFilters := e.selectExcessPendingFilters(self) + excessFilters := e.selectExcessPendingFilters(self, frameNumber) if len(excessFilters) != 0 { e.logger.Debug( "identified excess pending joins", diff --git a/node/consensus/global/event_distributor_test.go b/node/consensus/global/event_distributor_test.go index 34329d0..b6bd03f 100644 --- a/node/consensus/global/event_distributor_test.go +++ b/node/consensus/global/event_distributor_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "source.quilibrium.com/quilibrium/monorepo/config" typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/store" "source.quilibrium.com/quilibrium/monorepo/types/worker" @@ -411,3 +412,133 @@ func TestReconcileWorkerAllocations_UnconfirmedProposalWithNilSelf(t *testing.T) require.Len(t, workers, 1) assert.Nil(t, workers[0].Filter, "filter should be cleared after timeout even with nil self") } + +func TestSelectExcessPendingFilters_ExpiredJoinsNotCounted(t *testing.T) { + engine := &GlobalConsensusEngine{ + logger: zap.NewNop(), + config: &config.Config{ + Engine: &config.EngineConfig{ + DataWorkerCount: 2, + }, + }, + } + + filter1 := []byte("shard-filter-1") + filter2 := []byte("shard-filter-2") + filter3 := []byte("shard-filter-3") + + joinFrame := uint64(260000) + + // 3 pending joins: 2 expired, 1 valid. Capacity = 2, active = 0. + // Without the fix, all 3 count as pending, allowedPending = 2, excess = 1, + // and the valid join might be randomly selected for rejection. + // With the fix, only the valid join counts, so excess = 0. + self := &typesconsensus.ProverInfo{ + Address: []byte("prover-address"), + Allocations: []typesconsensus.ProverAllocationInfo{ + { + Status: typesconsensus.ProverStatusJoining, + ConfirmationFilter: filter1, + JoinFrameNumber: joinFrame, + }, + { + Status: typesconsensus.ProverStatusJoining, + ConfirmationFilter: filter2, + JoinFrameNumber: joinFrame, + }, + { + Status: typesconsensus.ProverStatusJoining, + ConfirmationFilter: filter3, + JoinFrameNumber: joinFrame + 500, // recent, not expired + }, + }, + } + + // Frame is past the grace period for filter1 and filter2 but not filter3 + frameNumber := joinFrame + pendingFilterGraceFrames + 1 + + excess := engine.selectExcessPendingFilters(self, frameNumber) + assert.Empty(t, excess, "expired joins should not count toward pending limit") +} + +func TestSelectExcessPendingFilters_ValidJoinsStillLimited(t *testing.T) { + engine := &GlobalConsensusEngine{ + logger: zap.NewNop(), + config: &config.Config{ + Engine: &config.EngineConfig{ + DataWorkerCount: 1, + }, + }, + } + + filter1 := []byte("shard-filter-1") + filter2 := []byte("shard-filter-2") + + joinFrame := uint64(260000) + + // 2 valid pending joins, capacity = 1, active = 0 → excess = 1 + self := &typesconsensus.ProverInfo{ + Address: []byte("prover-address"), + Allocations: []typesconsensus.ProverAllocationInfo{ + { + Status: typesconsensus.ProverStatusJoining, + ConfirmationFilter: filter1, + JoinFrameNumber: joinFrame, + }, + { + Status: typesconsensus.ProverStatusJoining, + ConfirmationFilter: filter2, + JoinFrameNumber: joinFrame, + }, + }, + } + + frameNumber := joinFrame + 100 // well within grace period + + excess := engine.selectExcessPendingFilters(self, frameNumber) + assert.Len(t, excess, 1, "should identify 1 excess pending join") +} + +func TestSelectExcessPendingFilters_MixedActiveAndExpired(t *testing.T) { + engine := &GlobalConsensusEngine{ + logger: zap.NewNop(), + config: &config.Config{ + Engine: &config.EngineConfig{ + DataWorkerCount: 2, + }, + }, + } + + filter1 := []byte("shard-filter-1") + filter2 := []byte("shard-filter-2") + filter3 := []byte("shard-filter-3") + + // 1 active + 1 expired joining + 1 valid joining. Capacity = 2. + // Active uses 1 slot, so allowedPending = 1. + // Expired join should not count, leaving 1 valid pending → no excess. + self := &typesconsensus.ProverInfo{ + Address: []byte("prover-address"), + Allocations: []typesconsensus.ProverAllocationInfo{ + { + Status: typesconsensus.ProverStatusActive, + ConfirmationFilter: filter1, + JoinFrameNumber: 200000, + }, + { + Status: typesconsensus.ProverStatusJoining, + ConfirmationFilter: filter2, + JoinFrameNumber: 250000, // expired + }, + { + Status: typesconsensus.ProverStatusJoining, + ConfirmationFilter: filter3, + JoinFrameNumber: 260000, // valid + }, + }, + } + + frameNumber := uint64(260500) // past 250000+720 but not 260000+720 + + excess := engine.selectExcessPendingFilters(self, frameNumber) + assert.Empty(t, excess, "expired joins should be excluded; 1 active + 1 valid pending fits capacity 2") +} diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 5acd02b..03a38e7 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -2241,6 +2241,7 @@ func (e *GlobalConsensusEngine) joinProposalReady( func (e *GlobalConsensusEngine) selectExcessPendingFilters( self *typesconsensus.ProverInfo, + frameNumber uint64, ) [][]byte { if self == nil || e.config == nil || e.config.Engine == nil { e.logger.Debug("excess pending evaluation skipped: missing config or prover info") @@ -2264,6 +2265,12 @@ func (e *GlobalConsensusEngine) selectExcessPendingFilters( case typesconsensus.ProverStatusActive: active++ case typesconsensus.ProverStatusJoining: + // Skip expired joins — they are implicitly rejected and should + // not count toward the pending limit or be candidates for + // explicit rejection. + if frameNumber > allocation.JoinFrameNumber+pendingFilterGraceFrames { + continue + } filterCopy := make([]byte, len(allocation.ConfirmationFilter)) copy(filterCopy, allocation.ConfirmationFilter) pending = append(pending, filterCopy) diff --git a/node/execution/intrinsics/global/global_prover_join.go b/node/execution/intrinsics/global/global_prover_join.go index 46d7598..58a84f1 100644 --- a/node/execution/intrinsics/global/global_prover_join.go +++ b/node/execution/intrinsics/global/global_prover_join.go @@ -926,7 +926,10 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) { if !expired { return false, errors.Wrap( - errors.New("prover already exists in non-left state"), + fmt.Errorf( + "prover already exists in non-left state (status=%d, frame=%d)", + status, frameNumber, + ), "verify: invalid prover join", ) }