From af6f2576c8f6c514532e5113d5372effd46a6149 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Tue, 17 Feb 2026 04:20:51 -0600 Subject: [PATCH] fix: one-shot sync message size, app shard TC signature size, collector/hotstuff race condition, expired joins blocking new joins due to pruning disable --- config/engine.go | 2 +- .../consensus_signature_aggregator_wrapper.go | 6 +- node/consensus/app/app_consensus_engine.go | 3 + .../app/consensus_liveness_provider.go | 19 ++++-- node/consensus/app/message_collector.go | 60 +++++++++++++++++-- .../global/consensus_liveness_provider.go | 41 +++++++++++-- node/consensus/global/event_distributor.go | 3 +- node/consensus/global/message_collector.go | 37 +++++++++++- node/consensus/global/message_processors.go | 3 + .../intrinsics/global/global_prover_join.go | 45 ++++++++++++-- 10 files changed, 193 insertions(+), 26 deletions(-) diff --git a/config/engine.go b/config/engine.go index 04fc1b9..0e1e923 100644 --- a/config/engine.go +++ b/config/engine.go @@ -11,7 +11,7 @@ const ( defaultDataWorkerMemoryLimit = int64(1792 * 1024 * 1024) // 1.75 GiB defaultSyncTimeout = 4 * time.Second defaultSyncCandidates = 8 - defaultSyncMessageReceiveLimit = 1 * 1024 * 1024 + defaultSyncMessageReceiveLimit = 600 * 1024 * 1024 defaultSyncMessageSendLimit = 600 * 1024 * 1024 defaultRewardStrategy = "reward-greedy" ) diff --git a/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go b/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go index 12bd189..371d0c1 100644 --- a/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go +++ b/node/consensus/aggregator/consensus_signature_aggregator_wrapper.go @@ -86,7 +86,11 @@ func (c *ConsensusSignatureAggregatorWrapper) Aggregate( } // TODO: remove direct reference - if len(c.filter) != 0 { + // Only append extra bytes when extProofs were actually present (adj > 0). + // Timeout votes produce 74-byte signatures with no extProof, so appending + // 516*(n-1) zero bytes would bloat the TC aggregate signature beyond the + // deserialization limit (711 bytes) in TimeoutCertificate.FromCanonicalBytes. + if len(c.filter) != 0 && adj > 0 { output.(*bls48581.BlsAggregateOutput).AggregateSignature = append(output.(*bls48581.BlsAggregateOutput).AggregateSignature, extra...) } diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 638d0f0..bb17944 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -2783,6 +2783,9 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange( // OnRankChange implements consensus.Consumer. 
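As a quick sanity check on the aggregator change above: the sizes quoted in its comment already show why the old unconditional append breaks timeout certificates. The sketch below is not project code; it only plugs the 74-byte timeout signature, the 516-byte per-signer padding, and the 711-byte deserialization cap from that comment into the old formula.

package main

import "fmt"

// Sizes taken from the aggregator comment above: a timeout vote carries a
// 74-byte signature and no extProof, the old path appended 516 bytes per
// additional signer, and TimeoutCertificate.FromCanonicalBytes rejects
// aggregates larger than 711 bytes.
const (
	timeoutSigSize   = 74
	extProofPadSize  = 516
	tcDeserializeCap = 711
)

// oldAggregateSize models the pre-fix behavior, where the padding was
// appended even though adj == 0 for timeout votes.
func oldAggregateSize(signers int) int {
	if signers < 1 {
		return 0
	}
	return timeoutSigSize + extProofPadSize*(signers-1)
}

func main() {
	for _, n := range []int{1, 2, 3, 8} {
		size := oldAggregateSize(n)
		fmt.Printf("signers=%d aggregate=%d bytes fits=%v\n", n, size, size <= tcDeserializeCap)
	}
	// From three signers onward the padded aggregate exceeds the 711-byte
	// cap, which is why the fix gates the append on adj > 0.
}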
func (e *AppConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) { + if e.currentRank == newRank { + return + } e.currentRank = newRank err := e.ensureGlobalClient() if err != nil { diff --git a/node/consensus/app/consensus_liveness_provider.go b/node/consensus/app/consensus_liveness_provider.go index 3cc5ca3..9dc6b74 100644 --- a/node/consensus/app/consensus_liveness_provider.go +++ b/node/consensus/app/consensus_liveness_provider.go @@ -44,11 +44,17 @@ func (p *AppLivenessProvider) Collect( var collectorRecords []*sequencedAppMessage var collector keyedaggregator.Collector[sequencedAppMessage] + alreadyCollected := false if p.engine.messageCollectors != nil { var err error var found bool collector, found, err = p.engine.getAppMessageCollector(rank) - if err != nil && !errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) { + if err != nil && errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) { + // Collector was already pruned by a prior Collect call for this + // rank. We must not overwrite collectedMessages with an empty + // slice or the previously-collected messages will be lost. + alreadyCollected = true + } else if err != nil { p.engine.logger.Warn( "could not fetch collector for rank", zap.Uint64("rank", rank), @@ -133,9 +139,14 @@ func (p *AppLivenessProvider) Collect( } pendingMessagesCount.WithLabelValues(p.engine.appAddressHex).Set(0) - p.engine.collectedMessagesMu.Lock() - p.engine.collectedMessages = finalizedMessages - p.engine.collectedMessagesMu.Unlock() + // If we already collected for this rank (collector was pruned) and found no + // new messages, preserve the previously-collected messages rather than + // overwriting them with an empty slice. + if !alreadyCollected || len(finalizedMessages) > 0 { + p.engine.collectedMessagesMu.Lock() + p.engine.collectedMessages = finalizedMessages + p.engine.collectedMessagesMu.Unlock() + } return CollectedCommitments{ frameNumber: frameNumber, diff --git a/node/consensus/app/message_collector.go b/node/consensus/app/message_collector.go index 567d982..e9d3ff3 100644 --- a/node/consensus/app/message_collector.go +++ b/node/consensus/app/message_collector.go @@ -212,7 +212,7 @@ func (e *AppConsensusEngine) startAppMessageAggregator( } func (e *AppConsensusEngine) addAppMessage(message *protobufs.Message) { - if e.messageAggregator == nil || message == nil { + if e.messageCollectors == nil || message == nil { return } if len(message.Hash) == 0 { @@ -224,7 +224,29 @@ func (e *AppConsensusEngine) addAppMessage(message *protobufs.Message) { if record == nil { return } - e.messageAggregator.Add(record) + + // Add directly to the collector synchronously rather than going through + // the aggregator's async worker queue. The async path loses messages + // because OnSequenceChange advances the retention window before workers + // finish processing queued items, causing them to be silently pruned. 
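A standalone illustration of that window, with invented types rather than the real keyedaggregator API: a record queued for an async worker is lost as soon as the retention floor advances past its sequence, which is exactly what the synchronous add below avoids.

package main

import "fmt"

// record stands in for a sequenced consensus message; only its sequence
// number matters for this illustration.
type record struct{ seq uint64 }

// asyncAggregator models the old path: Add only enqueues, and a separate
// OnSequenceChange advances the retention floor independently of the
// worker that drains the queue.
type asyncAggregator struct {
	queue          []record
	retentionFloor uint64
}

func (a *asyncAggregator) add(r record)              { a.queue = append(a.queue, r) }
func (a *asyncAggregator) onSequenceChange(s uint64) { a.retentionFloor = s }

// drain plays the role of the worker goroutine catching up later: anything
// whose sequence has already fallen below the retention floor is pruned.
func (a *asyncAggregator) drain() (kept, dropped int) {
	for _, r := range a.queue {
		if r.seq < a.retentionFloor {
			dropped++
			continue
		}
		kept++
	}
	return kept, dropped
}

func main() {
	a := &asyncAggregator{}
	a.add(record{seq: 10}) // message accepted for rank 10
	a.onSequenceChange(11) // consensus advances before the worker runs
	kept, dropped := a.drain()
	fmt.Println("kept:", kept, "dropped:", dropped) // kept: 0 dropped: 1
	// Adding to the rank's collector synchronously at accept time, as the
	// code below does, closes this window.
}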
+ collector, _, err := e.messageCollectors.GetOrCreateCollector(rank) + if err != nil { + e.logger.Debug( + "could not get collector for app message", + zap.Uint64("rank", rank), + zap.Error(err), + ) + return + } + + if err := collector.Add(record); err != nil { + e.logger.Debug( + "could not add app message to collector", + zap.Uint64("rank", rank), + zap.Error(err), + ) + return + } } func (e *AppConsensusEngine) nextRank() uint64 { @@ -298,7 +320,7 @@ func (e *AppConsensusEngine) deferAppMessage( } func (e *AppConsensusEngine) flushDeferredAppMessages(targetRank uint64) { - if e == nil || e.messageAggregator == nil || targetRank == 0 { + if e == nil || e.messageCollectors == nil || targetRank == 0 { return } @@ -313,8 +335,36 @@ func (e *AppConsensusEngine) flushDeferredAppMessages(targetRank uint64) { return } + collector, _, err := e.messageCollectors.GetOrCreateCollector(targetRank) + if err != nil { + if e.logger != nil { + e.logger.Debug( + "could not get collector for deferred app messages", + zap.String("app_address", e.appAddressHex), + zap.Uint64("target_rank", targetRank), + zap.Error(err), + ) + } + return + } + + added := 0 for _, msg := range messages { - e.messageAggregator.Add(newSequencedAppMessage(targetRank, msg)) + record := newSequencedAppMessage(targetRank, msg) + if record == nil { + continue + } + if err := collector.Add(record); err != nil { + if e.logger != nil { + e.logger.Debug( + "could not add deferred app message to collector", + zap.Uint64("target_rank", targetRank), + zap.Error(err), + ) + } + continue + } + added++ } if e.logger != nil { @@ -322,7 +372,7 @@ func (e *AppConsensusEngine) flushDeferredAppMessages(targetRank uint64) { "replayed deferred app messages", zap.String("app_address", e.appAddressHex), zap.Uint64("target_rank", targetRank), - zap.Int("count", len(messages)), + zap.Int("count", added), ) } } diff --git a/node/consensus/global/consensus_liveness_provider.go b/node/consensus/global/consensus_liveness_provider.go index 75a9258..4ab7d04 100644 --- a/node/consensus/global/consensus_liveness_provider.go +++ b/node/consensus/global/consensus_liveness_provider.go @@ -40,11 +40,17 @@ func (p *GlobalLivenessProvider) Collect( var collector keyedaggregator.Collector[sequencedGlobalMessage] var collectorRecords []*sequencedGlobalMessage + alreadyCollected := false if p.engine.messageCollectors != nil { var err error var found bool collector, found, err = p.engine.getMessageCollector(rank) - if err != nil && !errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) { + if err != nil && errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) { + // Collector was already pruned by a prior Collect call for this + // rank. We must not overwrite collectedMessages with an empty + // slice or the previously-collected messages will be lost. 
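Condensed into a runnable form, with a stand-in sentinel error instead of the real keyedaggregator.ErrSequenceBelowRetention, the guard behaves like this: a second Collect pass for an already-pruned rank keeps the earlier result unless it actually finds something new.

package main

import (
	"errors"
	"fmt"
)

// errSequenceBelowRetention is a stand-in for
// keyedaggregator.ErrSequenceBelowRetention.
var errSequenceBelowRetention = errors.New("sequence below retention")

// collectOnce models one Collect pass for a rank: fetchErr is the collector
// lookup result, fresh is whatever this pass gathered, stored is the result
// of any earlier pass. It returns what should be kept as collectedMessages.
func collectOnce(fetchErr error, fresh, stored [][]byte) [][]byte {
	alreadyCollected := errors.Is(fetchErr, errSequenceBelowRetention)
	// The guard from the patch: only overwrite when this is the first pass
	// for the rank, or when the pass actually found new messages.
	if !alreadyCollected || len(fresh) > 0 {
		return fresh
	}
	return stored
}

func main() {
	// First pass collects two messages; the collector is then pruned.
	stored := collectOnce(nil, [][]byte{[]byte("m1"), []byte("m2")}, nil)
	fmt.Println("after first pass:", len(stored)) // 2

	// Second pass for the same rank: collector is below retention and no
	// new messages were found, so the earlier result is preserved.
	stored = collectOnce(errSequenceBelowRetention, nil, stored)
	fmt.Println("after second pass:", len(stored)) // still 2
}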
+ alreadyCollected = true + } else if err != nil { p.engine.logger.Warn( "could not fetch collector for rank", zap.Uint64("rank", rank), @@ -53,6 +59,15 @@ func (p *GlobalLivenessProvider) Collect( } else if found { collectorRecords = collector.Records() } + p.engine.logger.Debug( + "collector lookup for rank", + zap.Uint64("rank", rank), + zap.Uint64("frame_number", frameNumber), + zap.Bool("found", found), + zap.Bool("already_collected", alreadyCollected), + zap.Int("records", len(collectorRecords)), + zap.Uint64("current_rank", p.engine.currentRank), + ) } acceptedMessages := make( @@ -62,8 +77,10 @@ func (p *GlobalLivenessProvider) Collect( ) if collector != nil { + nilMsgCount := 0 for _, record := range collectorRecords { if record == nil || record.message == nil { + nilMsgCount++ continue } if err := p.lockCollectorMessage( @@ -80,6 +97,13 @@ func (p *GlobalLivenessProvider) Collect( } acceptedMessages = append(acceptedMessages, record.message) } + if nilMsgCount > 0 { + p.engine.logger.Debug( + "collector records with nil message (failed validation)", + zap.Int("nil_msg_count", nilMsgCount), + zap.Int("total_records", len(collectorRecords)), + ) + } } messages := append([]*protobufs.Message{}, mixnetMessages...) @@ -115,12 +139,17 @@ func (p *GlobalLivenessProvider) Collect( return GlobalCollectedCommitments{}, errors.Wrap(err, "collect") } - // Store the accepted messages as canonical bytes for inclusion in the frame - collectedMsgs := make([][]byte, 0, len(acceptedMessages)) - for _, msg := range acceptedMessages { - collectedMsgs = append(collectedMsgs, msg.Payload) + // Store the accepted messages as canonical bytes for inclusion in the frame. + // If we already collected for this rank (collector was pruned) and found no + // new messages, preserve the previously-collected messages rather than + // overwriting them with an empty slice. 
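The nil-record tally added a few lines above relies on the convention that a record whose message failed validation keeps its slot with a nil message. A minimal sketch of that skip-and-count pass, using invented stand-ins for *protobufs.Message and the collector record type:

package main

import "fmt"

// message stands in for *protobufs.Message; only the payload matters here.
type message struct{ payload []byte }

// collectorRecord mirrors the convention the debug log above relies on: a
// record whose message failed validation keeps its slot with a nil message.
type collectorRecord struct{ msg *message }

// acceptPayloads skips nil-message records but tallies them so the caller
// can report how many records failed validation for the rank.
func acceptPayloads(records []*collectorRecord) (payloads [][]byte, nilCount int) {
	for _, r := range records {
		if r == nil || r.msg == nil {
			nilCount++
			continue
		}
		payloads = append(payloads, r.msg.payload)
	}
	return payloads, nilCount
}

func main() {
	records := []*collectorRecord{
		{msg: &message{payload: []byte("ok-1")}},
		{msg: nil}, // rejected upstream, slot retained
		{msg: &message{payload: []byte("ok-2")}},
	}
	payloads, nilCount := acceptPayloads(records)
	fmt.Println("accepted:", len(payloads), "failed validation:", nilCount) // 2 and 1
}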
+ if !alreadyCollected || len(acceptedMessages) > 0 { + collectedMsgs := make([][]byte, 0, len(acceptedMessages)) + for _, msg := range acceptedMessages { + collectedMsgs = append(collectedMsgs, msg.Payload) + } + p.engine.collectedMessages = collectedMsgs } - p.engine.collectedMessages = collectedMsgs return GlobalCollectedCommitments{ frameNumber: frameNumber, diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 36e857e..2b308d7 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -1052,7 +1052,8 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot( data.Frame.Header.FrameNumber > token.FRAME_2_1_EXTENDED_ENROLL_END { pending = allocation.Status == typesconsensus.ProverStatusJoining && - allocation.JoinFrameNumber+360 <= data.Frame.Header.FrameNumber + allocation.JoinFrameNumber+360 <= data.Frame.Header.FrameNumber && + data.Frame.Header.FrameNumber <= allocation.JoinFrameNumber+pendingFilterGraceFrames } } } diff --git a/node/consensus/global/message_collector.go b/node/consensus/global/message_collector.go index 2973048..6b78645 100644 --- a/node/consensus/global/message_collector.go +++ b/node/consensus/global/message_collector.go @@ -215,7 +215,7 @@ func (e *GlobalConsensusEngine) startGlobalMessageAggregator( } func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) { - if e.messageAggregator == nil || len(data) == 0 { + if e.messageCollectors == nil || len(data) == 0 { return } @@ -270,8 +270,39 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) { } } - record := newSequencedGlobalMessage(e.currentRank+1, payload) - e.messageAggregator.Add(record) + seq := e.currentRank + 1 + record := newSequencedGlobalMessage(seq, payload) + + // Add directly to the collector synchronously rather than going through + // the aggregator's async worker queue. The async path loses messages + // because OnSequenceChange advances the retention window before workers + // finish processing queued items, causing them to be silently pruned. 
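Stepping back briefly to the collectAllocationSnapshot change earlier in this patch: the new upper bound turns the open-ended "joined at least 360 frames ago" test into a bounded window. A minimal sketch, assuming the joining status is encoded as 0 and using an illustrative grace value, since the actual value of pendingFilterGraceFrames does not appear in this diff:

package main

import "fmt"

// statusJoining assumes the joining state is encoded as 0, matching the
// status byte used in global_prover_join.go later in this patch.
const statusJoining = 0

// isPendingJoin mirrors the bounded window: the join must be at least 360
// frames old, but no older than the grace bound, to count as pending.
func isPendingJoin(status int, joinFrame, frame, graceFrames uint64) bool {
	return status == statusJoining &&
		joinFrame+360 <= frame &&
		frame <= joinFrame+graceFrames
}

func main() {
	const grace = 720 // illustrative stand-in for pendingFilterGraceFrames
	fmt.Println(isPendingJoin(statusJoining, 1000, 1200, grace)) // false: too recent
	fmt.Println(isPendingJoin(statusJoining, 1000, 1400, grace)) // true: inside the window
	fmt.Println(isPendingJoin(statusJoining, 1000, 2000, grace)) // false: past the grace bound
}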
+ collector, _, err := e.messageCollectors.GetOrCreateCollector(seq) + if err != nil { + e.logger.Debug( + "could not get collector for global message", + zap.Uint64("sequence", seq), + zap.Uint64("current_rank", e.currentRank), + zap.Error(err), + ) + return + } + + if err := collector.Add(record); err != nil { + e.logger.Debug( + "could not add global message to collector", + zap.Uint64("sequence", seq), + zap.Error(err), + ) + return + } + + e.logger.Debug( + "added global message to collector", + zap.Uint64("sequence", seq), + zap.Uint64("current_rank", e.currentRank), + zap.Int("payload_len", len(payload)), + ) } // filterProverOnlyRequests filters a list of message requests to only include diff --git a/node/consensus/global/message_processors.go b/node/consensus/global/message_processors.go index a5a54b0..8dbc31b 100644 --- a/node/consensus/global/message_processors.go +++ b/node/consensus/global/message_processors.go @@ -70,9 +70,12 @@ func (e *GlobalConsensusEngine) processProverMessageQueue( ctx lifecycle.SignalerContext, ) { if e.config.P2P.Network != 99 && !e.config.Engine.ArchiveMode { + e.logger.Debug("prover message queue processor disabled (not archive mode)") return } + e.logger.Info("prover message queue processor started") + for { select { case <-e.haltCtx.Done(): diff --git a/node/execution/intrinsics/global/global_prover_join.go b/node/execution/intrinsics/global/global_prover_join.go index bc8769e..46d7598 100644 --- a/node/execution/intrinsics/global/global_prover_join.go +++ b/node/execution/intrinsics/global/global_prover_join.go @@ -890,11 +890,46 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) { if err == nil && len(statusData) > 0 { status := statusData[0] if status != 4 { - // Prover is in some other state - cannot join - return false, errors.Wrap( - errors.New("prover already exists in non-left state"), - "verify: invalid prover join", - ) + // Check if the previous join/leave has implicitly expired + // (720 frames), making the prover effectively "left" + expired := false + if status == 0 { + // Joining: check if join expired + joinFrameBytes, jErr := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "JoinFrameNumber", + tree, + ) + if jErr == nil && len(joinFrameBytes) == 8 { + joinFrame := binary.BigEndian.Uint64(joinFrameBytes) + if joinFrame >= token.FRAME_2_1_EXTENDED_ENROLL_END && + frameNumber > joinFrame+720 { + expired = true + } + } + } else if status == 3 { + // Leaving: check if leave expired + leaveFrameBytes, lErr := p.rdfMultiprover.Get( + GLOBAL_RDF_SCHEMA, + "allocation:ProverAllocation", + "LeaveFrameNumber", + tree, + ) + if lErr == nil && len(leaveFrameBytes) == 8 { + leaveFrame := binary.BigEndian.Uint64(leaveFrameBytes) + if frameNumber > leaveFrame+720 { + expired = true + } + } + } + + if !expired { + return false, errors.Wrap( + errors.New("prover already exists in non-left state"), + "verify: invalid prover join", + ) + } } } }
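To make the new expiry rule in Verify concrete, here is a compact model of the decision it adds. This is a sketch, not the intrinsic's code: the status bytes follow the comments in the hunk above (0 joining, 3 leaving), the 720-frame horizon comes from the same hunk, and the enroll-end frame is an illustrative placeholder for token.FRAME_2_1_EXTENDED_ENROLL_END.

package main

import "fmt"

// expiryFrames is the 720-frame horizon used by the new checks in Verify.
const expiryFrames = 720

// priorStateExpired models the added decision: a lingering join (status 0)
// or leave (status 3) stops blocking a fresh ProverJoin once it has aged
// out. extendedEnrollEnd stands in for token.FRAME_2_1_EXTENDED_ENROLL_END.
func priorStateExpired(
	status byte,
	joinFrame, leaveFrame, frameNumber, extendedEnrollEnd uint64,
) bool {
	switch status {
	case 0: // joining: only joins made after the extended enroll window expire
		return joinFrame >= extendedEnrollEnd && frameNumber > joinFrame+expiryFrames
	case 3: // leaving: a leave expires unconditionally after 720 frames
		return frameNumber > leaveFrame+expiryFrames
	default: // any other non-left state still blocks the join
		return false
	}
}

func main() {
	const enrollEnd = 100000 // illustrative frame number only

	// Join recorded at frame 150000, evaluated at 150721: expired, so a new
	// join for the same prover is allowed to verify.
	fmt.Println(priorStateExpired(0, 150000, 0, 150721, enrollEnd)) // true

	// The same join evaluated at 150500 is still live, so it keeps blocking.
	fmt.Println(priorStateExpired(0, 150000, 0, 150500, enrollEnd)) // false

	// A stale leave recorded at frame 150000, evaluated at 151000: expired.
	fmt.Println(priorStateExpired(3, 0, 150000, 151000, enrollEnd)) // true
}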