fix: one-shot sync message size limit, app shard TC signature size, collector/hotstuff race condition, expired joins blocking new joins while pruning is disabled

Cassandra Heart 2026-02-17 04:20:51 -06:00
parent 3ab99fe411
commit af6f2576c8
No known key found for this signature in database
GPG Key ID: 371083BFA6C240AA
10 changed files with 193 additions and 26 deletions

View File

@@ -11,7 +11,7 @@ const (
defaultDataWorkerMemoryLimit = int64(1792 * 1024 * 1024) // 1.75 GiB
defaultSyncTimeout = 4 * time.Second
defaultSyncCandidates = 8
defaultSyncMessageReceiveLimit = 1 * 1024 * 1024
defaultSyncMessageReceiveLimit = 600 * 1024 * 1024
defaultSyncMessageSendLimit = 600 * 1024 * 1024
defaultRewardStrategy = "reward-greedy"
)
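
The receive limit previously capped one-shot sync responses at 1 MiB while the send side already allowed 600 MiB, so a peer could produce a sync message the requester would reject. A minimal sketch of where such limits typically land in a gRPC client (helper and wiring are assumptions, not the node's actual dial path):

package node

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialSyncPeer is hypothetical: it shows the two limits taking effect as
// per-call options. If a response exceeds MaxCallRecvMsgSize, the RPC
// fails with ResourceExhausted, which is how a 1 MiB cap breaks sync.
// Pass defaultSyncMessageReceiveLimit / defaultSyncMessageSendLimit.
func dialSyncPeer(addr string, recvLimit, sendLimit int) (*grpc.ClientConn, error) {
	return grpc.NewClient(addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()), // sketch only
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(recvLimit),
			grpc.MaxCallSendMsgSize(sendLimit),
		),
	)
}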

View File

@@ -86,7 +86,11 @@ func (c *ConsensusSignatureAggregatorWrapper) Aggregate(
}
// TODO: remove direct reference
if len(c.filter) != 0 {
// Only append extra bytes when extProofs were actually present (adj > 0).
// Timeout votes produce 74-byte signatures with no extProof, so appending
// 516*(n-1) zero bytes would bloat the TC aggregate signature beyond the
// deserialization limit (711 bytes) in TimeoutCertificate.FromCanonicalBytes.
if len(c.filter) != 0 && adj > 0 {
output.(*bls48581.BlsAggregateOutput).AggregateSignature =
append(output.(*bls48581.BlsAggregateOutput).AggregateSignature, extra...)
}
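
The comment's numbers pin the failure down. A small self-contained check of the arithmetic (constant names are illustrative; values come from the comment above):

package main

import "fmt"

const (
	timeoutSigLen   = 74  // timeout-vote signature, no extProof attached
	extProofLen     = 516 // padding the old code appended per extra signer
	tcDeserialLimit = 711 // cap in TimeoutCertificate.FromCanonicalBytes
)

func main() {
	for n := 2; n <= 4; n++ {
		size := timeoutSigLen + extProofLen*(n-1)
		// n=2 yields 590 bytes and still parses; n=3 yields 1106 and the
		// TC is rejected on deserialization, which the adj > 0 guard fixes.
		fmt.Printf("n=%d: %d bytes (limit %d, ok=%t)\n",
			n, size, tcDeserialLimit, size <= tcDeserialLimit)
	}
}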

View File

@@ -2783,6 +2783,9 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange(
// OnRankChange implements consensus.Consumer.
func (e *AppConsensusEngine) OnRankChange(oldRank uint64, newRank uint64) {
if e.currentRank == newRank {
return
}
e.currentRank = newRank
err := e.ensureGlobalClient()
if err != nil {

View File

@@ -44,11 +44,17 @@ func (p *AppLivenessProvider) Collect(
var collectorRecords []*sequencedAppMessage
var collector keyedaggregator.Collector[sequencedAppMessage]
alreadyCollected := false
if p.engine.messageCollectors != nil {
var err error
var found bool
collector, found, err = p.engine.getAppMessageCollector(rank)
if err != nil && !errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) {
if err != nil && errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) {
// Collector was already pruned by a prior Collect call for this
// rank. We must not overwrite collectedMessages with an empty
// slice or the previously-collected messages will be lost.
alreadyCollected = true
} else if err != nil {
p.engine.logger.Warn(
"could not fetch collector for rank",
zap.Uint64("rank", rank),
@@ -133,9 +139,14 @@
}
pendingMessagesCount.WithLabelValues(p.engine.appAddressHex).Set(0)
p.engine.collectedMessagesMu.Lock()
p.engine.collectedMessages = finalizedMessages
p.engine.collectedMessagesMu.Unlock()
// If we already collected for this rank (collector was pruned) and found no
// new messages, preserve the previously-collected messages rather than
// overwriting them with an empty slice.
if !alreadyCollected || len(finalizedMessages) > 0 {
p.engine.collectedMessagesMu.Lock()
p.engine.collectedMessages = finalizedMessages
p.engine.collectedMessagesMu.Unlock()
}
return CollectedCommitments{
frameNumber: frameNumber,
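
The identical guard lands in GlobalLivenessProvider.Collect further down. A sketch of the interleaving being defended against, with the keep-or-replace rule factored out (the call sequence and helper are assumptions, not the engine's API):

package liveness

import "sync"

// Assumed sequence: hotstuff drives Collect(rank) for the proposal, and the
// collector for that rank is pruned as a side effect; a second Collect for
// the same rank (e.g. after a timeout) then hits ErrSequenceBelowRetention
// and, before this fix, replaced collectedMessages with an empty slice.
//
// keepOrReplace captures the corrected rule: only overwrite the cache when
// this is the first collection for the rank or the re-run produced messages.
func keepOrReplace(mu *sync.Mutex, cache *[][]byte, fresh [][]byte, alreadyCollected bool) {
	if alreadyCollected && len(fresh) == 0 {
		return // re-collection found nothing new: keep the prior result
	}
	mu.Lock()
	*cache = fresh
	mu.Unlock()
}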

View File

@@ -212,7 +212,7 @@ func (e *AppConsensusEngine) startAppMessageAggregator(
}
func (e *AppConsensusEngine) addAppMessage(message *protobufs.Message) {
if e.messageAggregator == nil || message == nil {
if e.messageCollectors == nil || message == nil {
return
}
if len(message.Hash) == 0 {
@@ -224,7 +224,29 @@ func (e *AppConsensusEngine) addAppMessage(message *protobufs.Message) {
if record == nil {
return
}
e.messageAggregator.Add(record)
// Add directly to the collector synchronously rather than going through
// the aggregator's async worker queue. The async path loses messages
// because OnSequenceChange advances the retention window before workers
// finish processing queued items, causing them to be silently pruned.
collector, _, err := e.messageCollectors.GetOrCreateCollector(rank)
if err != nil {
e.logger.Debug(
"could not get collector for app message",
zap.Uint64("rank", rank),
zap.Error(err),
)
return
}
if err := collector.Add(record); err != nil {
e.logger.Debug(
"could not add app message to collector",
zap.Uint64("rank", rank),
zap.Error(err),
)
return
}
}
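
A toy model of the loss the comment describes; this is not the keyedaggregator API, just the hazard: a queued record slips below the retention floor between Add and the worker's dequeue:

package main

import "fmt"

type record struct{ seq uint64 }

type asyncAggregator struct {
	queue     chan record
	retention uint64 // lowest sequence still accepted
}

func main() {
	a := &asyncAggregator{queue: make(chan record, 1)}
	a.queue <- record{seq: 10} // Add: handed off to the async worker queue
	a.retention = 11           // OnSequenceChange fires before the worker runs
	r := <-a.queue             // the worker finally dequeues the record
	if r.seq < a.retention {
		// Lost message; adding synchronously to the collector closes this window.
		fmt.Println("silently pruned:", r.seq)
	}
}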
func (e *AppConsensusEngine) nextRank() uint64 {
@@ -298,7 +320,7 @@
}
func (e *AppConsensusEngine) flushDeferredAppMessages(targetRank uint64) {
if e == nil || e.messageAggregator == nil || targetRank == 0 {
if e == nil || e.messageCollectors == nil || targetRank == 0 {
return
}
@@ -313,8 +335,36 @@ func (e *AppConsensusEngine) flushDeferredAppMessages(targetRank uint64) {
return
}
collector, _, err := e.messageCollectors.GetOrCreateCollector(targetRank)
if err != nil {
if e.logger != nil {
e.logger.Debug(
"could not get collector for deferred app messages",
zap.String("app_address", e.appAddressHex),
zap.Uint64("target_rank", targetRank),
zap.Error(err),
)
}
return
}
added := 0
for _, msg := range messages {
e.messageAggregator.Add(newSequencedAppMessage(targetRank, msg))
record := newSequencedAppMessage(targetRank, msg)
if record == nil {
continue
}
if err := collector.Add(record); err != nil {
if e.logger != nil {
e.logger.Debug(
"could not add deferred app message to collector",
zap.Uint64("target_rank", targetRank),
zap.Error(err),
)
}
continue
}
added++
}
if e.logger != nil {
@@ -322,7 +372,7 @@ func (e *AppConsensusEngine) flushDeferredAppMessages(targetRank uint64) {
"replayed deferred app messages",
zap.String("app_address", e.appAddressHex),
zap.Uint64("target_rank", targetRank),
zap.Int("count", len(messages)),
zap.Int("count", added),
)
}
}

View File

@@ -40,11 +40,17 @@ func (p *GlobalLivenessProvider) Collect(
var collector keyedaggregator.Collector[sequencedGlobalMessage]
var collectorRecords []*sequencedGlobalMessage
alreadyCollected := false
if p.engine.messageCollectors != nil {
var err error
var found bool
collector, found, err = p.engine.getMessageCollector(rank)
if err != nil && !errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) {
if err != nil && errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) {
// Collector was already pruned by a prior Collect call for this
// rank. We must not overwrite collectedMessages with an empty
// slice or the previously-collected messages will be lost.
alreadyCollected = true
} else if err != nil {
p.engine.logger.Warn(
"could not fetch collector for rank",
zap.Uint64("rank", rank),
@@ -53,6 +59,15 @@
} else if found {
collectorRecords = collector.Records()
}
p.engine.logger.Debug(
"collector lookup for rank",
zap.Uint64("rank", rank),
zap.Uint64("frame_number", frameNumber),
zap.Bool("found", found),
zap.Bool("already_collected", alreadyCollected),
zap.Int("records", len(collectorRecords)),
zap.Uint64("current_rank", p.engine.currentRank),
)
}
acceptedMessages := make(
@@ -62,8 +77,10 @@
)
if collector != nil {
nilMsgCount := 0
for _, record := range collectorRecords {
if record == nil || record.message == nil {
nilMsgCount++
continue
}
if err := p.lockCollectorMessage(
@@ -80,6 +97,13 @@
}
acceptedMessages = append(acceptedMessages, record.message)
}
if nilMsgCount > 0 {
p.engine.logger.Debug(
"collector records with nil message (failed validation)",
zap.Int("nil_msg_count", nilMsgCount),
zap.Int("total_records", len(collectorRecords)),
)
}
}
messages := append([]*protobufs.Message{}, mixnetMessages...)
@@ -115,12 +139,17 @@
return GlobalCollectedCommitments{}, errors.Wrap(err, "collect")
}
// Store the accepted messages as canonical bytes for inclusion in the frame
collectedMsgs := make([][]byte, 0, len(acceptedMessages))
for _, msg := range acceptedMessages {
collectedMsgs = append(collectedMsgs, msg.Payload)
// Store the accepted messages as canonical bytes for inclusion in the frame.
// If we already collected for this rank (collector was pruned) and found no
// new messages, preserve the previously-collected messages rather than
// overwriting them with an empty slice.
if !alreadyCollected || len(acceptedMessages) > 0 {
collectedMsgs := make([][]byte, 0, len(acceptedMessages))
for _, msg := range acceptedMessages {
collectedMsgs = append(collectedMsgs, msg.Payload)
}
p.engine.collectedMessages = collectedMsgs
}
p.engine.collectedMessages = collectedMsgs
return GlobalCollectedCommitments{
frameNumber: frameNumber,

View File

@@ -1052,7 +1052,8 @@ func (e *GlobalConsensusEngine) collectAllocationSnapshot(
data.Frame.Header.FrameNumber > token.FRAME_2_1_EXTENDED_ENROLL_END {
pending = allocation.Status ==
typesconsensus.ProverStatusJoining &&
allocation.JoinFrameNumber+360 <= data.Frame.Header.FrameNumber
allocation.JoinFrameNumber+360 <= data.Frame.Header.FrameNumber &&
data.Frame.Header.FrameNumber <= allocation.JoinFrameNumber+pendingFilterGraceFrames
}
}
}
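
Restated as a predicate: a joining allocation now counts as pending only inside a bounded window. Without the upper edge, a join that never finalized kept the allocation pending indefinitely and blocked a fresh join. pendingFilterGraceFrames is defined elsewhere in the engine and its value is not shown in this diff; the helper below is illustrative:

package global

// pendingJoinAt mirrors the window check in the diff: pending from 360
// frames after the join until the grace cutoff, and nothing outside it.
func pendingJoinAt(joinFrame, frame, graceFrames uint64) bool {
	return joinFrame+360 <= frame && frame <= joinFrame+graceFrames
}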

View File

@@ -215,7 +215,7 @@ func (e *GlobalConsensusEngine) startGlobalMessageAggregator(
}
func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
if e.messageAggregator == nil || len(data) == 0 {
if e.messageCollectors == nil || len(data) == 0 {
return
}
@@ -270,8 +270,39 @@ func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
}
}
record := newSequencedGlobalMessage(e.currentRank+1, payload)
e.messageAggregator.Add(record)
seq := e.currentRank + 1
record := newSequencedGlobalMessage(seq, payload)
// Add directly to the collector synchronously rather than going through
// the aggregator's async worker queue. The async path loses messages
// because OnSequenceChange advances the retention window before workers
// finish processing queued items, causing them to be silently pruned.
collector, _, err := e.messageCollectors.GetOrCreateCollector(seq)
if err != nil {
e.logger.Debug(
"could not get collector for global message",
zap.Uint64("sequence", seq),
zap.Uint64("current_rank", e.currentRank),
zap.Error(err),
)
return
}
if err := collector.Add(record); err != nil {
e.logger.Debug(
"could not add global message to collector",
zap.Uint64("sequence", seq),
zap.Error(err),
)
return
}
e.logger.Debug(
"added global message to collector",
zap.Uint64("sequence", seq),
zap.Uint64("current_rank", e.currentRank),
zap.Int("payload_len", len(payload)),
)
}
// filterProverOnlyRequests filters a list of message requests to only include

View File

@@ -70,9 +70,12 @@ func (e *GlobalConsensusEngine) processProverMessageQueue(
ctx lifecycle.SignalerContext,
) {
if e.config.P2P.Network != 99 && !e.config.Engine.ArchiveMode {
e.logger.Debug("prover message queue processor disabled (not archive mode)")
return
}
e.logger.Info("prover message queue processor started")
for {
select {
case <-e.haltCtx.Done():

View File

@@ -890,11 +890,46 @@ func (p *ProverJoin) Verify(frameNumber uint64) (valid bool, err error) {
if err == nil && len(statusData) > 0 {
status := statusData[0]
if status != 4 {
// Prover is in some other state - cannot join
return false, errors.Wrap(
errors.New("prover already exists in non-left state"),
"verify: invalid prover join",
)
// Check if the previous join/leave has implicitly expired
// (720 frames), making the prover effectively "left"
expired := false
if status == 0 {
// Joining: check if join expired
joinFrameBytes, jErr := p.rdfMultiprover.Get(
GLOBAL_RDF_SCHEMA,
"allocation:ProverAllocation",
"JoinFrameNumber",
tree,
)
if jErr == nil && len(joinFrameBytes) == 8 {
joinFrame := binary.BigEndian.Uint64(joinFrameBytes)
if joinFrame >= token.FRAME_2_1_EXTENDED_ENROLL_END &&
frameNumber > joinFrame+720 {
expired = true
}
}
} else if status == 3 {
// Leaving: check if leave expired
leaveFrameBytes, lErr := p.rdfMultiprover.Get(
GLOBAL_RDF_SCHEMA,
"allocation:ProverAllocation",
"LeaveFrameNumber",
tree,
)
if lErr == nil && len(leaveFrameBytes) == 8 {
leaveFrame := binary.BigEndian.Uint64(leaveFrameBytes)
if frameNumber > leaveFrame+720 {
expired = true
}
}
}
if !expired {
return false, errors.Wrap(
errors.New("prover already exists in non-left state"),
"verify: invalid prover join",
)
}
}
}
}
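
The two expiry rules read straight off the diff: a joining record (status 0) lapses 720 frames after JoinFrameNumber, but only for joins at or after the extended-enroll cutoff, and a leaving record (status 3) lapses 720 frames after LeaveFrameNumber unconditionally. Once either lapses, the prover is treated as having left and a fresh join verifies. A compact restatement (helper names illustrative; the enroll-end constant is passed in rather than imported from the token package):

package global

// joinExpired: a pending join lapses 720 frames after joinFrame, gated on
// the join occurring at or after FRAME_2_1_EXTENDED_ENROLL_END (enrollEnd).
func joinExpired(joinFrame, frame, enrollEnd uint64) bool {
	return joinFrame >= enrollEnd && frame > joinFrame+720
}

// leaveExpired: a pending leave lapses 720 frames after leaveFrame.
func leaveExpired(leaveFrame, frame uint64) bool {
	return frame > leaveFrame+720
}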