ceremonyclient/node/consensus/app/message_subscription.go
Cassandra Heart 1b2660b7df
v2.1.0.20 (#516)
* .20 testing

* Read in the debug by env variable (#514)

* v2.1.0.19

* enhanced error logging, fix seniority marker join blocker, fix sync message size limit defaults

* resolve signature failure

* additional error logging for merge-related signatures

* fix: one-shot sync message size, app shard TC signature size, collector/hotstuff race condition, expired joins blocking new joins due to pruning being disabled

* remove compat with old 2.0.0 blossomsub

* fix: resolve abandoned prover joins

* reload prover registry

* fix stale worker proposal edge

* add full sanity check on join before submitting to identify bug

* resolve non-fallthrough condition that should be fallthrough

* fix: resolve rare SIGFPE, fix orphan expired joins blocking workers from reallocating

* add reconnect fallback if no peers are found with variable reconnect time (#511)

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* update base peer count to 1 (#513)

* fix: expired prover join frames, starting port ranges, proposer getting stuck, and seniority on joins

* fix: panic on shutdown, libp2p discovery picking inaccessible peers, coverage event check not in shutdown logic, amend app shard worker behavior to mirror global for prover root reconciliation

* fix: shutdown scenario quirks, reload hanging

* fix: do not bail out early on shutdown of coverage check

* fix: force registry refresh on worker waiting for registration

* add more logging to wait for prover

* fix: worker manager refreshes the filter on allocation, snapshots blocking close on shutdown

* tweak: force shutdown after five seconds for app worker

* fix: don't loop when shutting down

* fix: slight reordering, also added named workers to trace hanging shutdowns

* use deterministic key for peer id of workers to stop flagging workers as sybil attacks

* fix: remove pubsub stop from app consensus engine as it shouldn't manage pubsub lifecycle, integrate shutdown context to PerformSync to prevent stuck syncs from halting respawn

* fix: blossomsub pubsub interface does not properly track subscription status

* fix: subscribe order to avoid nil panic

* switch from dnsaddr to dns4

* add missing quic-v1

* additional logging to isolate respawn quirks

* fix: dnsaddr -> dns4 for blossomsub

* allow debug env var to be read

---------

Co-authored-by: Cassandra Heart <cassandra@quilibrium.com>
Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com>

* fix newPebbleDB constructor config param (#517)

* fix: high CPU overhead in initial worker behaviors/ongoing sync

* faster docker builds with better caching

* qol: add extra data to node info, and query metrics from command line

* leave proposals for overcrowded shards

* hub-and-spoke global message broadcasts

* small tweaks to cli output for join frames

---------

Co-authored-by: winged-pegasus <55340199+winged-pegasus@users.noreply.github.com>
Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Black Swan <3999712+blacks1ne@users.noreply.github.com>
2026-03-04 01:37:04 -06:00

package app

import (
	"bytes"
	"context"
	"io"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/pkg/errors"
	"go.uber.org/zap"

	"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus/global"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/rpm"
	"source.quilibrium.com/quilibrium/monorepo/types/p2p"
)
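
// subscribeToConsensusMessages wires up the RPM mixnet for this app shard,
// subscribes to the consensus message bitmask (queueing inbound messages and
// dropping them when the queue is full), and registers the consensus message
// validator.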
func (e *AppConsensusEngine) subscribeToConsensusMessages() error {
	proverKey, _, _, _ := e.GetProvingKey(e.config.Engine)
	e.mixnet = rpm.NewRPMMixnet(
		e.logger,
		proverKey,
		e.proverRegistry,
		e.appAddress,
	)

	if err := e.pubsub.Subscribe(
		e.getConsensusMessageBitmask(),
		func(message *pb.Message) error {
			select {
			case <-e.haltCtx.Done():
				return nil
			case e.consensusMessageQueue <- message:
				return nil
			case <-e.ShutdownSignal():
				return errors.New("context cancelled")
			default:
				e.logger.Warn("consensus message queue full, dropping message")
				return nil
			}
		},
	); err != nil {
		return errors.Wrap(err, "subscribe to consensus messages")
	}

	// Register consensus message validator
	if err := e.pubsub.RegisterValidator(
		e.getConsensusMessageBitmask(),
		func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
			return e.validateConsensusMessage(peerID, message)
		},
		true,
	); err != nil {
		return errors.Wrap(err, "register consensus message validator")
	}

	return nil
}
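
// subscribeToProverMessages subscribes to the prover message bitmask,
// queueing inbound messages (and dropping them when the queue is full), and
// registers the prover message validator.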
func (e *AppConsensusEngine) subscribeToProverMessages() error {
	if err := e.pubsub.Subscribe(
		e.getProverMessageBitmask(),
		func(message *pb.Message) error {
			select {
			case <-e.haltCtx.Done():
				return nil
			case e.proverMessageQueue <- message:
				e.logger.Debug("got prover message")
				return nil
			case <-e.ShutdownSignal():
				return errors.New("context cancelled")
			default:
				e.logger.Warn("prover message queue full, dropping message")
				return nil
			}
		},
	); err != nil {
		return errors.Wrap(err, "subscribe to prover messages")
	}

	// Register prover message validator
	if err := e.pubsub.RegisterValidator(
		e.getProverMessageBitmask(),
		func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
			return e.validateProverMessage(peerID, message)
		},
		true,
	); err != nil {
		return errors.Wrap(err, "register prover message validator")
	}

	return nil
}
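
// subscribeToFrameMessages subscribes to the frame message bitmask and
// registers the frame message validator. Frame broadcasts are only queued
// when this node's prover address is not already in the prover trie;
// otherwise they are ignored.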
func (e *AppConsensusEngine) subscribeToFrameMessages() error {
	if err := e.pubsub.Subscribe(
		e.getFrameMessageBitmask(),
		func(message *pb.Message) error {
			if e.IsInProverTrie(e.getProverAddress()) {
				return nil
			}
			select {
			case <-e.haltCtx.Done():
				return nil
			case e.frameMessageQueue <- message:
				return nil
			case <-e.ShutdownSignal():
				return errors.New("context cancelled")
			default:
				e.logger.Warn("app message queue full, dropping message")
				return nil
			}
		},
	); err != nil {
		return errors.Wrap(err, "subscribe to frame messages")
	}

	// Register frame validator
	if err := e.pubsub.RegisterValidator(
		e.getFrameMessageBitmask(),
		func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
			return e.validateFrameMessage(peerID, message)
		},
		true,
	); err != nil {
		return errors.Wrap(err, "register frame message validator")
	}

	return nil
}
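
// subscribeToDispatchMessages subscribes to the dispatch message bitmask,
// queueing inbound messages (and dropping them when the queue is full), and
// registers the dispatch message validator.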
func (e *AppConsensusEngine) subscribeToDispatchMessages() error {
	if err := e.pubsub.Subscribe(
		e.getDispatchMessageBitmask(),
		func(message *pb.Message) error {
			select {
			case e.dispatchMessageQueue <- message:
				return nil
			case <-e.ShutdownSignal():
				return errors.New("context cancelled")
			default:
				e.logger.Warn("dispatch queue full, dropping message")
				return nil
			}
		},
	); err != nil {
		return errors.Wrap(err, "subscribe to dispatch messages")
	}

	// Register dispatch validator
	if err := e.pubsub.RegisterValidator(
		e.getDispatchMessageBitmask(),
		func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
			return e.validateDispatchMessage(peerID, message)
		},
		true,
	); err != nil {
		return errors.Wrap(err, "register dispatch message validator")
	}

	return nil
}
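
// streamGlobalMessagesFromMaster maintains the hub-and-spoke connection to
// the master's global message stream, retrying with a short backoff when the
// master cannot be reached or the stream cannot be opened, and reconnecting
// after the stream drops.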
func (e *AppConsensusEngine) streamGlobalMessagesFromMaster(
	ctx lifecycle.SignalerContext,
) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if err := e.ensureGlobalClient(); err != nil {
			e.logger.Warn("global message stream: failed to connect to master",
				zap.Error(err),
			)
			select {
			case <-ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}
			continue
		}

		stream, err := e.globalClient.StreamGlobalMessages(
			ctx,
			&protobufs.StreamGlobalMessagesRequest{},
		)
		if err != nil {
			e.logger.Warn("global message stream: failed to open stream",
				zap.Error(err),
			)
			select {
			case <-ctx.Done():
				return
			case <-time.After(5 * time.Second):
			}
			continue
		}

		e.logger.Info("connected to master global message stream")
		e.receiveGlobalMessages(ctx, stream)
		e.logger.Warn("global message stream disconnected, reconnecting")

		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}
}
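
// receiveGlobalMessages reads from the master's global message stream until
// the context is cancelled or the stream errors, fanning each message out by
// bitmask to the matching queue (frame, alert, or peer info). Enqueues are
// non-blocking: messages are dropped when a queue is full.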
func (e *AppConsensusEngine) receiveGlobalMessages(
	ctx lifecycle.SignalerContext,
	stream protobufs.GlobalService_StreamGlobalMessagesClient,
) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		msg, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
				return
			}
			e.logger.Warn("global message stream: recv error",
				zap.Error(err),
			)
			return
		}

		pbMsg := &pb.Message{
			Data:    msg.Data,
			Bitmask: msg.Bitmask,
		}

		switch {
		case bytes.Equal(msg.Bitmask, global.GLOBAL_FRAME_BITMASK):
			select {
			case e.globalFrameMessageQueue <- pbMsg:
			default:
			}
		case bytes.Equal(msg.Bitmask, global.GLOBAL_ALERT_BITMASK):
			select {
			case e.globalAlertMessageQueue <- pbMsg:
			default:
			}
		case bytes.Equal(msg.Bitmask, global.GLOBAL_PEER_INFO_BITMASK):
			select {
			case e.globalPeerInfoMessageQueue <- pbMsg:
			default:
			}
		// GLOBAL_PROVER_BITMASK: intentionally not dispatched (workers discard)
		}
	}
}