fix: expired prover join frames, starting port ranges, proposer getting stuck, and seniority on joins

Cassandra Heart 2026-02-19 22:26:48 -06:00
parent 15ed550130
commit 5733047c3b
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
5 changed files with 270 additions and 104 deletions

View File

@@ -6,8 +6,8 @@ const (
defaultMinimumPeersRequired = 3
priorDefaultDataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d"
defaultDataWorkerBaseListenMultiaddr = "/ip4/0.0.0.0/tcp/%d"
defaultDataWorkerBaseP2PPort = uint16(50000)
defaultDataWorkerBaseStreamPort = uint16(60000)
defaultDataWorkerBaseP2PPort = uint16(25000)
defaultDataWorkerBaseStreamPort = uint16(32500)
defaultDataWorkerMemoryLimit = int64(1792 * 1024 * 1024) // 1.75 GiB
defaultSyncTimeout = 4 * time.Second
defaultSyncCandidates = 8
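Aside: a minimal sketch of how per-worker listen addresses could be derived from these base ports. The base-plus-worker-index offset is an assumption, not something this hunk shows; it only illustrates that the lowered defaults (25000 for P2P, 32500 for streams) keep the two ranges disjoint for up to 7500 workers.

package main

import "fmt"

const (
	dataWorkerBaseListenMultiaddr = "/ip4/0.0.0.0/tcp/%d"
	dataWorkerBaseP2PPort         = uint16(25000) // was 50000
	dataWorkerBaseStreamPort      = uint16(32500) // was 60000
)

// workerAddrs assumes worker i listens on basePort+i for both listeners;
// the helper and its offset scheme are illustrative, not from the commit.
func workerAddrs(i uint16) (p2p, stream string) {
	p2p = fmt.Sprintf(dataWorkerBaseListenMultiaddr, dataWorkerBaseP2PPort+i)
	stream = fmt.Sprintf(dataWorkerBaseListenMultiaddr, dataWorkerBaseStreamPort+i)
	return p2p, stream
}

func main() {
	p2p, stream := workerAddrs(3)
	fmt.Println(p2p)    // /ip4/0.0.0.0/tcp/25003
	fmt.Println(stream) // /ip4/0.0.0.0/tcp/32503
}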

View File

@@ -613,6 +613,7 @@ func (e *GlobalConsensusEngine) evaluateForProposals(
decideDescriptors := snapshot.decideDescriptors
worldBytes := snapshot.worldBytes
joinProposedThisCycle := false
if len(proposalDescriptors) != 0 && allowProposals {
if canPropose {
proposals, err := e.proposer.PlanAndAllocate(
@@ -626,6 +627,7 @@ func (e *GlobalConsensusEngine) evaluateForProposals(
e.logger.Error("could not plan shard allocations", zap.Error(err))
} else {
if len(proposals) > 0 {
joinProposedThisCycle = true
e.lastJoinAttemptFrame.Store(data.Frame.Header.FrameNumber)
}
expectedRewardSum := big.NewInt(0)
@@ -665,6 +667,50 @@ func (e *GlobalConsensusEngine) evaluateForProposals(
zap.Uint64("frame_number", data.Frame.Header.FrameNumber),
)
}
// Standalone seniority merge: when no join was proposed this cycle but the
// prover exists with incorrect seniority, submit a seniority merge to fix
// it. This covers the case where all worker slots are filled and no new
// joins are being proposed.
if !joinProposedThisCycle && self != nil {
frameNum := data.Frame.Header.FrameNumber
mergeSeniority := e.estimateSeniorityFromConfig()
if mergeSeniority > self.Seniority {
lastJoin := e.lastJoinAttemptFrame.Load()
lastMerge := e.lastSeniorityMergeFrame.Load()
joinCooldownOk := lastJoin == 0 || frameNum-lastJoin >= 10
mergeCooldownOk := lastMerge == 0 || frameNum-lastMerge >= 10
if joinCooldownOk && mergeCooldownOk {
frame := e.GetFrame()
if frame != nil {
helpers, peerIds := e.buildMergeHelpers()
err := e.submitSeniorityMerge(
frame, helpers, mergeSeniority, peerIds,
)
if err != nil {
e.logger.Error(
"could not submit seniority merge",
zap.Error(err),
)
} else {
e.lastSeniorityMergeFrame.Store(frameNum)
}
}
} else {
e.logger.Debug(
"seniority merge deferred due to cooldown",
zap.Uint64("merge_seniority", mergeSeniority),
zap.Uint64("existing_seniority", self.Seniority),
zap.Uint64("last_join_frame", lastJoin),
zap.Uint64("last_merge_frame", lastMerge),
zap.Uint64("current_frame", frameNum),
)
}
}
}
if len(pendingFilters) != 0 {
if err := e.proposer.DecideJoins(
uint64(data.Frame.Header.Difficulty),
@@ -1078,7 +1124,8 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
allocated = false
}
if allocation.Status == typesconsensus.ProverStatusJoining {
if allocation.Status == typesconsensus.ProverStatusJoining &&
data.Frame.Header.FrameNumber <= allocation.JoinFrameNumber+pendingFilterGraceFrames {
shardsPending++
awaitingFrame[allocation.JoinFrameNumber+360] = struct{}{}
}
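The two checks added in this file reduce to small predicates. A minimal sketch with illustrative names; only the comparisons (the 10-frame cooldown and the pendingFilterGraceFrames window) mirror the diff.

package global

// mergeCooldownSatisfied mirrors the gate above: a standalone seniority merge
// is attempted only when neither a join attempt nor a previous merge happened
// within the last cooldownFrames frames (zero means "never happened").
func mergeCooldownSatisfied(current, lastJoin, lastMerge, cooldownFrames uint64) bool {
	joinOk := lastJoin == 0 || current-lastJoin >= cooldownFrames
	mergeOk := lastMerge == 0 || current-lastMerge >= cooldownFrames
	return joinOk && mergeOk
}

// joinStillPending mirrors the collectAllocationSnapshot change: a joining
// allocation only counts as pending while it is inside its grace window, so
// an expired join no longer blocks new proposals.
func joinStillPending(current, joinFrame, graceFrames uint64) bool {
	return current <= joinFrame+graceFrames
}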

View File

@@ -152,8 +152,9 @@ type GlobalConsensusEngine struct {
alertPublicKey []byte
hasSentKeyBundle bool
proverSyncInProgress atomic.Bool
lastJoinAttemptFrame atomic.Uint64
lastObservedFrame atomic.Uint64
lastJoinAttemptFrame atomic.Uint64
lastSeniorityMergeFrame atomic.Uint64
lastObservedFrame atomic.Uint64
lastRejectFrame atomic.Uint64
proverRootVerifiedFrame atomic.Uint64
proverRootSynced atomic.Bool
@@ -3360,28 +3361,15 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin(
mergeSeniority = mergeSeniorityBI.Uint64()
}
// If prover already exists, submit a seniority merge if needed, then
// fall through to the join — the filters being proposed are only for
// unallocated shards (already filtered by collectAllocationSnapshot),
// so the join is valid even when the prover has existing allocations.
// Always include merge targets in the join — Materialize handles
// seniority for both new and existing provers. A separate seniority
// merge is not submitted because it would double-count with the join.
if proverExists {
if mergeSeniority > info.Seniority {
e.logger.Info(
"existing prover has lower seniority than merge would provide, submitting seniority merge",
zap.Uint64("existing_seniority", info.Seniority),
zap.Uint64("merge_seniority", mergeSeniority),
zap.Strings("peer_ids", peerIds),
)
if mergeErr := e.submitSeniorityMerge(frame, helpers, mergeSeniority, peerIds); mergeErr != nil {
e.logger.Warn("failed to submit seniority merge", zap.Error(mergeErr))
}
}
// Clear merge targets for the join — Materialize only applies
// seniority from merge targets when creating a new prover vertex.
// Including them here would just consume the spent markers without
// updating seniority, racing with the separate seniority merge.
helpers = nil
peerIds = nil
e.logger.Debug(
"prover already exists, merge targets will be included in join",
zap.Uint64("existing_seniority", info.Seniority),
zap.Uint64("merge_seniority", mergeSeniority),
)
}
e.logger.Info(
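The net effect of the ProposeWorkerJoin change above reads more clearly as a sketch: an existing prover no longer triggers a separate seniority merge here, and the helpers/peerIds merge targets stay on the join for Materialize to consume. The types and function below are hypothetical stand-ins, not the engine's real API.

package global

import "log"

type proverInfo struct{ Seniority uint64 }
type mergeHelper struct{}

// attachMergeTargets keeps the merge targets on the join regardless of
// whether the prover already exists; before this commit, the proverExists
// branch submitted a separate seniority merge and cleared helpers/peerIds.
func attachMergeTargets(
	proverExists bool,
	info *proverInfo,
	mergeSeniority uint64,
	helpers []mergeHelper,
	peerIds []string,
) ([]mergeHelper, []string) {
	if proverExists {
		log.Printf(
			"prover already exists, merge targets will be included in join: existing=%d merge=%d",
			info.Seniority, mergeSeniority,
		)
	}
	return helpers, peerIds
}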

View File

@@ -234,15 +234,6 @@ func (m *Manager) PlanAndAllocate(
})
}
workerLookup := make(map[uint]*store.WorkerInfo, len(all))
for _, w := range all {
workerLookup[w.CoreId] = w
}
if len(proposals) > 0 {
m.persistPlannedFilters(proposals, workerLookup, frameNumber)
}
// Perform allocations
workerIds := []uint{}
filters := [][]byte{}
@@ -258,9 +249,19 @@ func (m *Manager) PlanAndAllocate(
m.logger.Warn("allocate worker failed",
zap.Error(err),
)
return proposals, errors.Wrap(err, "plan and allocate")
}
return proposals, errors.Wrap(err, "plan and allocate")
// Persist filters only after successful publication — if the join
// fails to publish, we don't want workers stuck with filters that
// block them for proposalTimeoutFrames.
workerLookup := make(map[uint]*store.WorkerInfo, len(all))
for _, w := range all {
workerLookup[w.CoreId] = w
}
m.persistPlannedFilters(proposals, workerLookup, frameNumber)
return proposals, nil
}
func (m *Manager) persistPlannedFilters(
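The PlanAndAllocate reordering amounts to publish first, persist second. A hedged sketch of that ordering, with hypothetical stand-ins for the manager's publish step and persistPlannedFilters:

package manager

import "fmt"

// Hypothetical stand-ins; the real manager publishes via its proposer and
// persists via persistPlannedFilters.
type proposal struct{ Filter []byte }

func publishJoin(p []proposal) error { return nil }
func persistFilters(p []proposal, frame uint64) {}

// planPublishPersist sketches the reordered flow: planned filters are only
// persisted once the join has been published successfully, so a failed
// publish never leaves workers holding filters that block them for
// proposalTimeoutFrames.
func planPublishPersist(proposals []proposal, frameNumber uint64) ([]proposal, error) {
	if err := publishJoin(proposals); err != nil {
		return proposals, fmt.Errorf("plan and allocate: %w", err)
	}
	persistFilters(proposals, frameNumber)
	return proposals, nil
}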

View File

@@ -1,6 +1,7 @@
package global
import (
"bytes"
"encoding/binary"
"fmt"
"math/big"
@@ -149,6 +150,66 @@ func (p *ProverJoin) Materialize(
}
}
// Compute seniority from merge targets before the prover-exists check,
// so it can be applied to both new and existing provers.
var computedSeniority uint64 = 0
if len(p.MergeTargets) > 0 {
var mergePeerIds []string
for _, target := range p.MergeTargets {
// Check if this merge target was already consumed
spentBI, err := poseidon.HashBytes(slices.Concat(
[]byte("PROVER_JOIN_MERGE"),
target.PublicKey,
))
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
v, vErr := hg.Get(
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
spentBI.FillBytes(make([]byte, 32)),
hgstate.VertexAddsDiscriminator,
)
if vErr == nil && v != nil {
// Spent marker exists — check who consumed it
spentTree, ok := v.(*tries.VectorCommitmentTree)
if ok && spentTree != nil {
storedAddr, getErr := p.rdfMultiprover.Get(
GLOBAL_RDF_SCHEMA,
"merge:SpentMerge",
"ProverAddress",
spentTree,
)
if getErr == nil && len(storedAddr) == 32 &&
!bytes.Equal(storedAddr, proverAddress) {
continue // consumed by a different prover
}
}
// Same prover or legacy empty marker — count seniority
}
if target.KeyType == crypto.KeyTypeEd448 {
pk, err := pcrypto.UnmarshalEd448PublicKey(target.PublicKey)
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
peerId, err := peer.IDFromPublicKey(pk)
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
mergePeerIds = append(mergePeerIds, peerId.String())
}
}
if len(mergePeerIds) > 0 {
seniorityBig := compat.GetAggregatedSeniority(mergePeerIds)
if seniorityBig.IsUint64() {
computedSeniority = seniorityBig.Uint64()
}
}
}
if !proverExists {
// Create new prover entry
proverTree = &qcrypto.VectorCommitmentTree{}
@@ -194,56 +255,9 @@ func (p *ProverJoin) Materialize(
return nil, errors.Wrap(err, "materialize")
}
// Calculate seniority from MergeTargets, skipping already-consumed ones
var seniority uint64 = 0
if len(p.MergeTargets) > 0 {
// Convert Ed448 public keys to peer IDs
var peerIds []string
for _, target := range p.MergeTargets {
// Check if this merge target was already consumed
spentBI, err := poseidon.HashBytes(slices.Concat(
[]byte("PROVER_JOIN_MERGE"),
target.PublicKey,
))
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
v, vErr := hg.Get(
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
spentBI.FillBytes(make([]byte, 32)),
hgstate.VertexAddsDiscriminator,
)
if vErr == nil && v != nil {
continue // already consumed, skip
}
if target.KeyType == crypto.KeyTypeEd448 {
pk, err := pcrypto.UnmarshalEd448PublicKey(target.PublicKey)
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
peerId, err := peer.IDFromPublicKey(pk)
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
peerIds = append(peerIds, peerId.String())
}
}
// Get aggregated seniority
if len(peerIds) > 0 {
seniorityBig := compat.GetAggregatedSeniority(peerIds)
if seniorityBig.IsUint64() {
seniority = seniorityBig.Uint64()
}
}
}
// Store seniority
// Store seniority (computed above from merge targets)
seniorityBytes := make([]byte, 8)
binary.BigEndian.PutUint64(seniorityBytes, seniority)
binary.BigEndian.PutUint64(seniorityBytes, computedSeniority)
err = p.rdfMultiprover.Set(
GLOBAL_RDF_SCHEMA,
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
@@ -335,6 +349,54 @@ func (p *ProverJoin) Materialize(
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
} else if computedSeniority > 0 {
// For existing provers, update seniority if merge targets provide a
// higher value than what's currently stored.
existingSeniorityData, err := p.rdfMultiprover.Get(
GLOBAL_RDF_SCHEMA,
"prover:Prover",
"Seniority",
proverTree,
)
var existingSeniority uint64 = 0
if err == nil && len(existingSeniorityData) == 8 {
existingSeniority = binary.BigEndian.Uint64(existingSeniorityData)
}
if computedSeniority > existingSeniority {
seniorityBytes := make([]byte, 8)
binary.BigEndian.PutUint64(seniorityBytes, computedSeniority)
err = p.rdfMultiprover.Set(
GLOBAL_RDF_SCHEMA,
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
"prover:Prover",
"Seniority",
seniorityBytes,
proverTree,
)
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
updatedVertex := hg.NewVertexAddMaterializedState(
intrinsics.GLOBAL_INTRINSIC_ADDRESS,
[32]byte(proverAddress),
frameNumber,
proverTree,
proverTree,
)
err = hg.Set(
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
proverAddress,
hgstate.VertexAddsDiscriminator,
frameNumber,
updatedVertex,
)
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
}
}
// Create hyperedge for this prover
@@ -474,30 +536,59 @@ func (p *ProverJoin) Materialize(
return nil, errors.Wrap(err, "materialize")
}
// Skip already-consumed merge targets
spentAddress := [64]byte{}
copy(spentAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
copy(spentAddress[32:], spentMergeBI.FillBytes(make([]byte, 32)))
spentMergeAddr := spentMergeBI.FillBytes(make([]byte, 32))
// Check existing spent marker
var prior *tries.VectorCommitmentTree
existing, existErr := hg.Get(
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
spentMergeBI.FillBytes(make([]byte, 32)),
spentMergeAddr,
hgstate.VertexAddsDiscriminator,
)
if existErr == nil && existing != nil {
continue
existingTree, ok := existing.(*tries.VectorCommitmentTree)
if ok && existingTree != nil {
storedAddr, getErr := p.rdfMultiprover.Get(
GLOBAL_RDF_SCHEMA,
"merge:SpentMerge",
"ProverAddress",
existingTree,
)
if getErr == nil && len(storedAddr) == 32 {
// New format marker — already has a prover address.
// Skip regardless of whether it's ours or another's.
continue
}
// Legacy empty marker — overwrite with prover address
prior = existingTree
}
}
// Write spent marker with prover address
spentTree := &tries.VectorCommitmentTree{}
err = p.rdfMultiprover.Set(
GLOBAL_RDF_SCHEMA,
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
"merge:SpentMerge",
"ProverAddress",
proverAddress,
spentTree,
)
if err != nil {
return nil, errors.Wrap(err, "materialize")
}
spentMergeVertex := hg.NewVertexAddMaterializedState(
intrinsics.GLOBAL_INTRINSIC_ADDRESS,
[32]byte(spentMergeBI.FillBytes(make([]byte, 32))),
[32]byte(spentMergeAddr),
frameNumber,
nil,
&tries.VectorCommitmentTree{},
prior,
spentTree,
)
err = hg.Set(
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
spentMergeBI.FillBytes(make([]byte, 32)),
spentMergeAddr,
hgstate.VertexAddsDiscriminator,
frameNumber,
spentMergeVertex,
@@ -695,9 +786,36 @@ func (p *ProverJoin) GetWriteAddresses(frameNumber uint64) ([][]byte, error) {
return nil, errors.Wrap(err, "get write addresses")
}
spentAddr := spentMergeBI.FillBytes(make([]byte, 32))
// Skip merge targets whose spent markers already contain a prover
// address (new format). These won't be written to — either they
// belong to this prover (already recorded) or a different one.
// Legacy empty markers and new markers need a write lock since
// Materialize will write them.
if p.hypergraph != nil {
spentFullAddr := [64]byte{}
copy(spentFullAddr[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
copy(spentFullAddr[32:], spentAddr)
spentData, dataErr := p.hypergraph.GetVertexData(spentFullAddr)
if dataErr == nil && spentData != nil {
storedAddr, getErr := p.rdfMultiprover.Get(
GLOBAL_RDF_SCHEMA,
"merge:SpentMerge",
"ProverAddress",
spentData,
)
if getErr == nil && len(storedAddr) == 32 {
// New format — won't be written to
continue
}
// Legacy empty — will be overwritten, need write lock
}
}
addresses[string(slices.Concat(
intrinsics.GLOBAL_INTRINSIC_ADDRESS[:],
spentMergeBI.FillBytes(make([]byte, 32)),
spentAddr,
))] = struct{}{}
}
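Materialize, GetWriteAddresses, and Verify now all hinge on the same three-way reading of a merge target's spent marker. A minimal sketch of that classification; classifyMarker and its arguments are hypothetical, standing in for the hypergraph lookup plus the merge:SpentMerge / ProverAddress read.

package global

import "bytes"

// markerState classifies a merge target's spent marker as used above.
type markerState int

const (
	markerUnspent      markerState = iota // no marker yet: counts toward seniority, will be written
	markerLegacyEmpty                     // old-format marker without a ProverAddress: counts, gets overwritten
	markerOwnedBySelf                     // new-format marker recorded by this prover: counts, no further write
	markerOwnedByOther                    // new-format marker recorded by another prover: skipped entirely
)

// classifyMarker is a hypothetical helper: markerExists reflects the
// hypergraph Get, and storedAddr is whatever the ProverAddress read returned
// (nil or short when the marker is legacy and has no address recorded).
func classifyMarker(markerExists bool, storedAddr, selfAddr []byte) markerState {
	switch {
	case !markerExists:
		return markerUnspent
	case len(storedAddr) != 32:
		return markerLegacyEmpty
	case bytes.Equal(storedAddr, selfAddr):
		return markerOwnedBySelf
	default:
		return markerOwnedByOther
	}
}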
@@ -793,7 +911,6 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) {
}
for _, mt := range p.MergeTargets {
// Check spent status first if already consumed, skip entirely
spentMergeBI, err := poseidon.HashBytes(slices.Concat(
[]byte("PROVER_JOIN_MERGE"),
mt.PublicKey,
@@ -802,15 +919,28 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) {
return false, errors.Wrap(err, "verify: invalid prover join")
}
spentAddress := [64]byte{}
copy(spentAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
copy(spentAddress[32:], spentMergeBI.FillBytes(make([]byte, 32)))
spentFullAddr := [64]byte{}
copy(spentFullAddr[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:])
copy(spentFullAddr[32:], spentMergeBI.FillBytes(make([]byte, 32)))
v, err := p.hypergraph.GetVertex(spentAddress)
v, err := p.hypergraph.GetVertex(spentFullAddr)
if err == nil && v != nil {
// merge target already consumed, skip join proceeds without
// this target's seniority
continue
// Spent marker exists — check if consumed by a different prover
spentData, dataErr := p.hypergraph.GetVertexData(spentFullAddr)
if dataErr == nil && spentData != nil {
storedAddr, getErr := p.rdfMultiprover.Get(
GLOBAL_RDF_SCHEMA,
"merge:SpentMerge",
"ProverAddress",
spentData,
)
if getErr == nil && len(storedAddr) == 32 &&
!bytes.Equal(storedAddr, address) {
// Consumed by a different prover — skip
continue
}
}
// Same prover or legacy empty — validate signature below
}
valid, err := p.keyManager.ValidateSignature(