diff --git a/config/engine.go b/config/engine.go index 0e1e923..d60b4c8 100644 --- a/config/engine.go +++ b/config/engine.go @@ -6,8 +6,8 @@ const ( defaultMinimumPeersRequired = 3 priorDefaultDataWorkerBaseListenMultiaddr = "/ip4/127.0.0.1/tcp/%d" defaultDataWorkerBaseListenMultiaddr = "/ip4/0.0.0.0/tcp/%d" - defaultDataWorkerBaseP2PPort = uint16(50000) - defaultDataWorkerBaseStreamPort = uint16(60000) + defaultDataWorkerBaseP2PPort = uint16(25000) + defaultDataWorkerBaseStreamPort = uint16(32500) defaultDataWorkerMemoryLimit = int64(1792 * 1024 * 1024) // 1.75 GiB defaultSyncTimeout = 4 * time.Second defaultSyncCandidates = 8 diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 8ed2850..baee81e 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -613,6 +613,7 @@ func (e *GlobalConsensusEngine) evaluateForProposals( decideDescriptors := snapshot.decideDescriptors worldBytes := snapshot.worldBytes + joinProposedThisCycle := false if len(proposalDescriptors) != 0 && allowProposals { if canPropose { proposals, err := e.proposer.PlanAndAllocate( @@ -626,6 +627,7 @@ func (e *GlobalConsensusEngine) evaluateForProposals( e.logger.Error("could not plan shard allocations", zap.Error(err)) } else { if len(proposals) > 0 { + joinProposedThisCycle = true e.lastJoinAttemptFrame.Store(data.Frame.Header.FrameNumber) } expectedRewardSum := big.NewInt(0) @@ -665,6 +667,50 @@ func (e *GlobalConsensusEngine) evaluateForProposals( zap.Uint64("frame_number", data.Frame.Header.FrameNumber), ) } + + // Standalone seniority merge: when no join was proposed this cycle but the + // prover exists with incorrect seniority, submit a seniority merge to fix + // it. This covers the case where all worker slots are filled and no new + // joins are being proposed. 
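+ // The merge is attempted only when the config-estimated seniority exceeds
+ // the prover's current seniority and both the join and merge cooldowns
+ // (10 frames each) have elapsed; if a cooldown is still active, the merge
+ // is deferred and logged at debug level.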
+ if !joinProposedThisCycle && self != nil { + frameNum := data.Frame.Header.FrameNumber + mergeSeniority := e.estimateSeniorityFromConfig() + + if mergeSeniority > self.Seniority { + lastJoin := e.lastJoinAttemptFrame.Load() + lastMerge := e.lastSeniorityMergeFrame.Load() + joinCooldownOk := lastJoin == 0 || frameNum-lastJoin >= 10 + mergeCooldownOk := lastMerge == 0 || frameNum-lastMerge >= 10 + + if joinCooldownOk && mergeCooldownOk { + frame := e.GetFrame() + if frame != nil { + helpers, peerIds := e.buildMergeHelpers() + err := e.submitSeniorityMerge( + frame, helpers, mergeSeniority, peerIds, + ) + if err != nil { + e.logger.Error( + "could not submit seniority merge", + zap.Error(err), + ) + } else { + e.lastSeniorityMergeFrame.Store(frameNum) + } + } + } else { + e.logger.Debug( + "seniority merge deferred due to cooldown", + zap.Uint64("merge_seniority", mergeSeniority), + zap.Uint64("existing_seniority", self.Seniority), + zap.Uint64("last_join_frame", lastJoin), + zap.Uint64("last_merge_frame", lastMerge), + zap.Uint64("current_frame", frameNum), + ) + } + } + } + if len(pendingFilters) != 0 { if err := e.proposer.DecideJoins( uint64(data.Frame.Header.Difficulty), @@ -1078,7 +1124,8 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot( allocated = false } - if allocation.Status == typesconsensus.ProverStatusJoining { + if allocation.Status == typesconsensus.ProverStatusJoining && + data.Frame.Header.FrameNumber <= allocation.JoinFrameNumber+pendingFilterGraceFrames { shardsPending++ awaitingFrame[allocation.JoinFrameNumber+360] = struct{}{} } diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index b389cd7..42ee511 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -152,8 +152,9 @@ type GlobalConsensusEngine struct { alertPublicKey []byte hasSentKeyBundle bool proverSyncInProgress atomic.Bool - lastJoinAttemptFrame atomic.Uint64 - lastObservedFrame atomic.Uint64 + lastJoinAttemptFrame atomic.Uint64 + lastSeniorityMergeFrame atomic.Uint64 + lastObservedFrame atomic.Uint64 lastRejectFrame atomic.Uint64 proverRootVerifiedFrame atomic.Uint64 proverRootSynced atomic.Bool @@ -3360,28 +3361,15 @@ func (e *GlobalConsensusEngine) ProposeWorkerJoin( mergeSeniority = mergeSeniorityBI.Uint64() } - // If prover already exists, submit a seniority merge if needed, then - // fall through to the join — the filters being proposed are only for - // unallocated shards (already filtered by collectAllocationSnapshot), - // so the join is valid even when the prover has existing allocations. + // Always include merge targets in the join — Materialize handles + // seniority for both new and existing provers. A separate seniority + // merge is not submitted because it would double-count with the join. if proverExists { - if mergeSeniority > info.Seniority { - e.logger.Info( - "existing prover has lower seniority than merge would provide, submitting seniority merge", - zap.Uint64("existing_seniority", info.Seniority), - zap.Uint64("merge_seniority", mergeSeniority), - zap.Strings("peer_ids", peerIds), - ) - if mergeErr := e.submitSeniorityMerge(frame, helpers, mergeSeniority, peerIds); mergeErr != nil { - e.logger.Warn("failed to submit seniority merge", zap.Error(mergeErr)) - } - } - // Clear merge targets for the join — Materialize only applies - // seniority from merge targets when creating a new prover vertex. 
- // Including them here would just consume the spent markers without - // updating seniority, racing with the separate seniority merge. - helpers = nil - peerIds = nil + e.logger.Debug( + "prover already exists, merge targets will be included in join", + zap.Uint64("existing_seniority", info.Seniority), + zap.Uint64("merge_seniority", mergeSeniority), + ) } e.logger.Info( diff --git a/node/consensus/provers/proposer.go b/node/consensus/provers/proposer.go index ed983dc..d30bb91 100644 --- a/node/consensus/provers/proposer.go +++ b/node/consensus/provers/proposer.go @@ -234,15 +234,6 @@ func (m *Manager) PlanAndAllocate( }) } - workerLookup := make(map[uint]*store.WorkerInfo, len(all)) - for _, w := range all { - workerLookup[w.CoreId] = w - } - - if len(proposals) > 0 { - m.persistPlannedFilters(proposals, workerLookup, frameNumber) - } - // Perform allocations workerIds := []uint{} filters := [][]byte{} @@ -258,9 +249,19 @@ func (m *Manager) PlanAndAllocate( m.logger.Warn("allocate worker failed", zap.Error(err), ) + return proposals, errors.Wrap(err, "plan and allocate") } - return proposals, errors.Wrap(err, "plan and allocate") + // Persist filters only after successful publication — if the join + // fails to publish, we don't want workers stuck with filters that + // block them for proposalTimeoutFrames. + workerLookup := make(map[uint]*store.WorkerInfo, len(all)) + for _, w := range all { + workerLookup[w.CoreId] = w + } + m.persistPlannedFilters(proposals, workerLookup, frameNumber) + + return proposals, nil } func (m *Manager) persistPlannedFilters( diff --git a/node/execution/intrinsics/global/global_prover_join.go b/node/execution/intrinsics/global/global_prover_join.go index 58a84f1..5334740 100644 --- a/node/execution/intrinsics/global/global_prover_join.go +++ b/node/execution/intrinsics/global/global_prover_join.go @@ -1,6 +1,7 @@ package global import ( + "bytes" "encoding/binary" "fmt" "math/big" @@ -149,6 +150,66 @@ func (p *ProverJoin) Materialize( } } + // Compute seniority from merge targets before the prover-exists check, + // so it can be applied to both new and existing provers. 
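+ // Targets whose spent marker records a different prover's address are
+ // excluded; targets consumed by this prover, or covered by a legacy empty
+ // marker, still contribute to the aggregated seniority.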
+ var computedSeniority uint64 = 0 + if len(p.MergeTargets) > 0 { + var mergePeerIds []string + for _, target := range p.MergeTargets { + // Check if this merge target was already consumed + spentBI, err := poseidon.HashBytes(slices.Concat( + []byte("PROVER_JOIN_MERGE"), + target.PublicKey, + )) + if err != nil { + return nil, errors.Wrap(err, "materialize") + } + v, vErr := hg.Get( + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + spentBI.FillBytes(make([]byte, 32)), + hgstate.VertexAddsDiscriminator, + ) + if vErr == nil && v != nil { + // Spent marker exists — check who consumed it + spentTree, ok := v.(*tries.VectorCommitmentTree) + if ok && spentTree != nil { + storedAddr, getErr := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "merge:SpentMerge", + "ProverAddress", + spentTree, + ) + if getErr == nil && len(storedAddr) == 32 && + !bytes.Equal(storedAddr, proverAddress) { + continue // consumed by a different prover + } + } + // Same prover or legacy empty marker — count seniority + } + + if target.KeyType == crypto.KeyTypeEd448 { + pk, err := pcrypto.UnmarshalEd448PublicKey(target.PublicKey) + if err != nil { + return nil, errors.Wrap(err, "materialize") + } + + peerId, err := peer.IDFromPublicKey(pk) + if err != nil { + return nil, errors.Wrap(err, "materialize") + } + + mergePeerIds = append(mergePeerIds, peerId.String()) + } + } + + if len(mergePeerIds) > 0 { + seniorityBig := compat.GetAggregatedSeniority(mergePeerIds) + if seniorityBig.IsUint64() { + computedSeniority = seniorityBig.Uint64() + } + } + } + if !proverExists { // Create new prover entry proverTree = &qcrypto.VectorCommitmentTree{} @@ -194,56 +255,9 @@ func (p *ProverJoin) Materialize( return nil, errors.Wrap(err, "materialize") } - // Calculate seniority from MergeTargets, skipping already-consumed ones - var seniority uint64 = 0 - if len(p.MergeTargets) > 0 { - // Convert Ed448 public keys to peer IDs - var peerIds []string - for _, target := range p.MergeTargets { - // Check if this merge target was already consumed - spentBI, err := poseidon.HashBytes(slices.Concat( - []byte("PROVER_JOIN_MERGE"), - target.PublicKey, - )) - if err != nil { - return nil, errors.Wrap(err, "materialize") - } - v, vErr := hg.Get( - intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], - spentBI.FillBytes(make([]byte, 32)), - hgstate.VertexAddsDiscriminator, - ) - if vErr == nil && v != nil { - continue // already consumed, skip - } - - if target.KeyType == crypto.KeyTypeEd448 { - pk, err := pcrypto.UnmarshalEd448PublicKey(target.PublicKey) - if err != nil { - return nil, errors.Wrap(err, "materialize") - } - - peerId, err := peer.IDFromPublicKey(pk) - if err != nil { - return nil, errors.Wrap(err, "materialize") - } - - peerIds = append(peerIds, peerId.String()) - } - } - - // Get aggregated seniority - if len(peerIds) > 0 { - seniorityBig := compat.GetAggregatedSeniority(peerIds) - if seniorityBig.IsUint64() { - seniority = seniorityBig.Uint64() - } - } - } - - // Store seniority + // Store seniority (computed above from merge targets) seniorityBytes := make([]byte, 8) - binary.BigEndian.PutUint64(seniorityBytes, seniority) + binary.BigEndian.PutUint64(seniorityBytes, computedSeniority) err = p.rdfMultiprover.Set( GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], @@ -335,6 +349,54 @@ func (p *ProverJoin) Materialize( if err != nil { return nil, errors.Wrap(err, "materialize") } + } else if computedSeniority > 0 { + // For existing provers, update seniority if merge targets provide a + // higher value than what's currently stored. 
+ existingSeniorityData, err := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "prover:Prover", + "Seniority", + proverTree, + ) + var existingSeniority uint64 = 0 + if err == nil && len(existingSeniorityData) == 8 { + existingSeniority = binary.BigEndian.Uint64(existingSeniorityData) + } + + if computedSeniority > existingSeniority { + seniorityBytes := make([]byte, 8) + binary.BigEndian.PutUint64(seniorityBytes, computedSeniority) + err = p.rdfMultiprover.Set( + GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "prover:Prover", + "Seniority", + seniorityBytes, + proverTree, + ) + if err != nil { + return nil, errors.Wrap(err, "materialize") + } + + updatedVertex := hg.NewVertexAddMaterializedState( + intrinsics.GLOBAL_INTRINSIC_ADDRESS, + [32]byte(proverAddress), + frameNumber, + proverTree, + proverTree, + ) + + err = hg.Set( + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + proverAddress, + hgstate.VertexAddsDiscriminator, + frameNumber, + updatedVertex, + ) + if err != nil { + return nil, errors.Wrap(err, "materialize") + } + } } // Create hyperedge for this prover @@ -474,30 +536,59 @@ func (p *ProverJoin) Materialize( return nil, errors.Wrap(err, "materialize") } - // Skip already-consumed merge targets - spentAddress := [64]byte{} - copy(spentAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) - copy(spentAddress[32:], spentMergeBI.FillBytes(make([]byte, 32))) + spentMergeAddr := spentMergeBI.FillBytes(make([]byte, 32)) + + // Check existing spent marker + var prior *tries.VectorCommitmentTree existing, existErr := hg.Get( intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], - spentMergeBI.FillBytes(make([]byte, 32)), + spentMergeAddr, hgstate.VertexAddsDiscriminator, ) if existErr == nil && existing != nil { - continue + existingTree, ok := existing.(*tries.VectorCommitmentTree) + if ok && existingTree != nil { + storedAddr, getErr := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "merge:SpentMerge", + "ProverAddress", + existingTree, + ) + if getErr == nil && len(storedAddr) == 32 { + // New format marker — already has a prover address. + // Skip regardless of whether it's ours or another's. + continue + } + // Legacy empty marker — overwrite with prover address + prior = existingTree + } + } + + // Write spent marker with prover address + spentTree := &tries.VectorCommitmentTree{} + err = p.rdfMultiprover.Set( + GLOBAL_RDF_SCHEMA, + intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], + "merge:SpentMerge", + "ProverAddress", + proverAddress, + spentTree, + ) + if err != nil { + return nil, errors.Wrap(err, "materialize") } spentMergeVertex := hg.NewVertexAddMaterializedState( intrinsics.GLOBAL_INTRINSIC_ADDRESS, - [32]byte(spentMergeBI.FillBytes(make([]byte, 32))), + [32]byte(spentMergeAddr), frameNumber, - nil, - &tries.VectorCommitmentTree{}, + prior, + spentTree, ) err = hg.Set( intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], - spentMergeBI.FillBytes(make([]byte, 32)), + spentMergeAddr, hgstate.VertexAddsDiscriminator, frameNumber, spentMergeVertex, @@ -695,9 +786,36 @@ func (p *ProverJoin) GetWriteAddresses(frameNumber uint64) ([][]byte, error) { return nil, errors.Wrap(err, "get write addresses") } + spentAddr := spentMergeBI.FillBytes(make([]byte, 32)) + + // Skip merge targets whose spent markers already contain a prover + // address (new format). These won't be written to — either they + // belong to this prover (already recorded) or a different one. + // Legacy empty markers and new markers need a write lock since + // Materialize will write them. 
+ if p.hypergraph != nil { + spentFullAddr := [64]byte{} + copy(spentFullAddr[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(spentFullAddr[32:], spentAddr) + spentData, dataErr := p.hypergraph.GetVertexData(spentFullAddr) + if dataErr == nil && spentData != nil { + storedAddr, getErr := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "merge:SpentMerge", + "ProverAddress", + spentData, + ) + if getErr == nil && len(storedAddr) == 32 { + // New format — won't be written to + continue + } + // Legacy empty — will be overwritten, need write lock + } + } + addresses[string(slices.Concat( intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], - spentMergeBI.FillBytes(make([]byte, 32)), + spentAddr, ))] = struct{}{} } @@ -793,7 +911,6 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) { } for _, mt := range p.MergeTargets { - // Check spent status first – if already consumed, skip entirely spentMergeBI, err := poseidon.HashBytes(slices.Concat( []byte("PROVER_JOIN_MERGE"), mt.PublicKey, @@ -802,15 +919,28 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) { return false, errors.Wrap(err, "verify: invalid prover join") } - spentAddress := [64]byte{} - copy(spentAddress[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) - copy(spentAddress[32:], spentMergeBI.FillBytes(make([]byte, 32))) + spentFullAddr := [64]byte{} + copy(spentFullAddr[:32], intrinsics.GLOBAL_INTRINSIC_ADDRESS[:]) + copy(spentFullAddr[32:], spentMergeBI.FillBytes(make([]byte, 32))) - v, err := p.hypergraph.GetVertex(spentAddress) + v, err := p.hypergraph.GetVertex(spentFullAddr) if err == nil && v != nil { - // merge target already consumed, skip – join proceeds without - // this target's seniority - continue + // Spent marker exists — check if consumed by a different prover + spentData, dataErr := p.hypergraph.GetVertexData(spentFullAddr) + if dataErr == nil && spentData != nil { + storedAddr, getErr := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "merge:SpentMerge", + "ProverAddress", + spentData, + ) + if getErr == nil && len(storedAddr) == 32 && + !bytes.Equal(storedAddr, address) { + // Consumed by a different prover — skip + continue + } + } + // Same prover or legacy empty — validate signature below } valid, err := p.keyManager.ValidateSignature(