ceremonyclient/node/consensus/global/message_collector.go
Cassandra Heart 12996487c3
v2.1.0.18 (#508)
* experiment: reject bad peer info messages

* v2.1.0.18 preview

* add tagged sync

* Add missing hypergraph changes

* small tweaks to sync

* allow local sync, use it for provers with workers

* missing file

* resolve build error

* resolve sync issue, remove raw sync

* resolve deletion promotion bug

* resolve sync abstraction leak from tree deletion changes

* rearrange prover sync

* remove pruning from sync

* restore removed sync flag

* fix: sync, event stream deadlock, heuristic scoring of better shards

* resolve hanging shutdown + pubsub proxy issue

* further bugfixes: sync (restore old leaf sync), pubsub shutdown, merge events

* fix: clean up rust ffi, background coverage events, and sync tweaks

* fix: linking issue for channel, connectivity test aggression, sync regression, join tests

* fix: disjoint sync, improper application of filter

* resolve sync/reel/validation deadlock

* adjust sync to handle no leaf edge cases, multi-path segment traversal

* use simpler sync

* faster, simpler sync with some debug extras

* migration to recalculate

* don't use batch

* square up the roots

* fix nil pointer

* fix: seniority calculation, sync race condition, migration

* make sync dumber

* fix: tree deletion issue

* fix: missing seniority merge request canonical serialization

* address issues from previous commit test

* stale workers should be cleared

* remove missing gap check

* rearrange collect, reduce sync logging noise

* fix: the disjoint leaf/branch sync case

* nuclear option on sync failures

* v2.1.0.18, finalized
2026-02-08 23:51:51 -06:00


package global

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"slices"

	"go.uber.org/zap"
	"golang.org/x/crypto/sha3"

	"source.quilibrium.com/quilibrium/monorepo/consensus/models"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing"
	keyedaggregator "source.quilibrium.com/quilibrium/monorepo/node/keyedaggregator"
	keyedcollector "source.quilibrium.com/quilibrium/monorepo/node/keyedcollector"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
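
// maxGlobalMessagesPerFrame caps how many messages a single frame's collector
// will accept; additional records are rejected and bundles are truncated to
// this limit before collection.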
const maxGlobalMessagesPerFrame = 100
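
// globalMessageAddress is the all-0xff 32-byte address attached to messages
// validated by the global message collector.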
var globalMessageAddress = bytes.Repeat([]byte{0xff}, 32)
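
// sequencedGlobalMessage is a single collected message, keyed by the frame
// sequence it targets and identified by the SHA3-256 hash of its payload.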
type sequencedGlobalMessage struct {
sequence uint64
identity models.Identity
payload []byte
message *protobufs.Message
}
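
// newSequencedGlobalMessage clones the payload and derives the record's
// identity from the payload's SHA3-256 digest.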
func newSequencedGlobalMessage(
sequence uint64,
payload []byte,
) *sequencedGlobalMessage {
copyPayload := slices.Clone(payload)
hash := sha3.Sum256(copyPayload)
return &sequencedGlobalMessage{
sequence: sequence,
identity: models.Identity(string(hash[:])),
payload: copyPayload,
}
}
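
// globalMessageTraits tells the keyed collector how to extract a record's
// sequence and identity and how to compare two records for equality.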
var globalMessageTraits = keyedcollector.RecordTraits[sequencedGlobalMessage]{
Sequence: func(m *sequencedGlobalMessage) uint64 {
if m == nil {
return 0
}
return m.sequence
},
Identity: func(m *sequencedGlobalMessage) models.Identity {
if m == nil {
return ""
}
return m.identity
},
Equals: func(a, b *sequencedGlobalMessage) bool {
if a == nil || b == nil {
return a == b
}
return slices.Equal(a.payload, b.payload)
},
}
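
// globalMessageProcessorFactory builds a validation processor for each frame
// sequence the collector opens.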
type globalMessageProcessorFactory struct {
engine *GlobalConsensusEngine
}
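
// Create returns a processor bound to the given sequence.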
func (f *globalMessageProcessorFactory) Create(
sequence uint64,
) (keyedcollector.Processor[sequencedGlobalMessage], error) {
return &globalMessageProcessor{
engine: f.engine,
sequence: sequence,
}, nil
}
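
// globalMessageProcessor validates collected records for a single sequence.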
type globalMessageProcessor struct {
engine *GlobalConsensusEngine
sequence uint64
}
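
// Process validates a collected record: it checks the payload length and type
// prefix, enforces the per-frame collector limit, resolves the quorum
// certificate preceding the record's sequence (falling back to the latest),
// and runs the execution manager's message validation before attaching the
// constructed protobufs.Message to the record.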
func (p *globalMessageProcessor) Process(
record *sequencedGlobalMessage,
) error {
if record == nil {
return keyedcollector.NewInvalidRecordError(
record,
errors.New("nil global message"),
)
}
if len(record.payload) < 4 {
return keyedcollector.NewInvalidRecordError(
record,
errors.New("global message payload too short"),
)
}
typePrefix := binary.BigEndian.Uint32(record.payload[:4])
if typePrefix != protobufs.MessageBundleType {
return keyedcollector.NewInvalidRecordError(
record,
fmt.Errorf("unexpected message type: %d", typePrefix),
)
}
message := &protobufs.Message{
Address: globalMessageAddress,
Payload: record.payload,
}
if err := p.enforceCollectorLimit(record); err != nil {
return err
}
qc, err := p.engine.clockStore.GetQuorumCertificate(nil, record.sequence-1)
if err != nil {
qc, err = p.engine.clockStore.GetLatestQuorumCertificate(nil)
}
if err != nil {
return keyedcollector.NewInvalidRecordError(record, err)
}
if err := p.engine.executionManager.ValidateMessage(
qc.FrameNumber+1,
message.Address,
message.Payload,
); err != nil {
return keyedcollector.NewInvalidRecordError(record, err)
}
record.message = message
return nil
}
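
// enforceCollectorLimit rejects the record when the collector for this
// sequence already holds maxGlobalMessagesPerFrame records, removing it from
// the collector so it does not count against the frame.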
func (p *globalMessageProcessor) enforceCollectorLimit(
record *sequencedGlobalMessage,
) error {
collector, found, err := p.engine.getMessageCollector(p.sequence)
if err != nil || !found {
return nil
}
if len(collector.Records()) >= maxGlobalMessagesPerFrame {
collector.Remove(record)
// p.engine.deferGlobalMessage(record.sequence+1, record.payload)
return keyedcollector.NewInvalidRecordError(
record,
fmt.Errorf("message limit reached for frame %d", p.sequence),
)
}
return nil
}
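
// initGlobalMessageAggregator wires up the keyed collector factory and the
// sequenced aggregator used to gather global messages per frame rank.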
func (e *GlobalConsensusEngine) initGlobalMessageAggregator() error {
tracer := tracing.NewZapTracer(e.logger.Named("global_message_collector"))
processorFactory := &globalMessageProcessorFactory{engine: e}
collectorFactory, err := keyedcollector.NewFactory(
tracer,
globalMessageTraits,
nil,
processorFactory,
)
if err != nil {
return fmt.Errorf("global message collector factory: %w", err)
}
e.messageCollectors = keyedaggregator.NewSequencedCollectors(
tracer,
0,
collectorFactory,
)
aggregator, err := keyedaggregator.NewSequencedAggregator(
tracer,
0,
e.messageCollectors,
func(m *sequencedGlobalMessage) uint64 {
if m == nil {
return 0
}
return m.sequence
},
)
if err != nil {
return fmt.Errorf("global message aggregator: %w", err)
}
e.messageAggregator = aggregator
return nil
}
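
// startGlobalMessageAggregator runs the aggregator's component manager under
// the lifecycle context, signalling ready once the aggregator is ready (or
// immediately if no aggregator was configured).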
func (e *GlobalConsensusEngine) startGlobalMessageAggregator(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
if e.messageAggregator == nil {
ready()
<-ctx.Done()
return
}
go func() {
if err := e.messageAggregator.ComponentManager.Start(ctx); err != nil {
ctx.Throw(err)
}
}()
<-e.messageAggregator.ComponentManager.Ready()
ready()
<-e.messageAggregator.ComponentManager.Done()
}
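
// addGlobalMessage queues raw pubsub data for collection at the next rank.
// Message bundles are decoded so that, in prover-only mode, non-prover
// requests can be filtered out and oversized bundles truncated before the
// bundle is re-encoded and handed to the aggregator.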
func (e *GlobalConsensusEngine) addGlobalMessage(data []byte) {
if e.messageAggregator == nil || len(data) == 0 {
return
}
payload := data // buildutils:allow-slice-alias slice is static
if len(data) >= 4 {
typePrefix := binary.BigEndian.Uint32(data[:4])
if typePrefix == protobufs.MessageBundleType {
bundle := &protobufs.MessageBundle{}
if err := bundle.FromCanonicalBytes(data); err != nil {
if e.logger != nil {
e.logger.Debug(
"failed to decode message bundle for collector",
zap.Error(err),
)
}
return
}
// In prover-only mode, filter out non-prover messages
if e.proverOnlyMode.Load() {
bundle.Requests = e.filterProverOnlyRequests(bundle.Requests)
if len(bundle.Requests) == 0 {
// All requests were filtered out
return
}
}
if len(bundle.Requests) > maxGlobalMessagesPerFrame {
if e.logger != nil {
e.logger.Debug(
"truncating message bundle requests for collector",
zap.Int("original", len(bundle.Requests)),
zap.Int("limit", maxGlobalMessagesPerFrame),
)
}
bundle.Requests = bundle.Requests[:maxGlobalMessagesPerFrame]
}
e.logBundleRequestTypes(bundle)
encoded, err := bundle.ToCanonicalBytes()
if err != nil {
if e.logger != nil {
e.logger.Debug(
"failed to re-encode message bundle for collector",
zap.Error(err),
)
}
return
}
payload = encoded
}
}
record := newSequencedGlobalMessage(e.currentRank+1, payload)
e.messageAggregator.Add(record)
}

// filterProverOnlyRequests filters a list of message requests to only include
// prover-related messages. This is used when in prover-only mode due to
// insufficient coverage.
func (e *GlobalConsensusEngine) filterProverOnlyRequests(
requests []*protobufs.MessageRequest,
) []*protobufs.MessageRequest {
filtered := make([]*protobufs.MessageRequest, 0, len(requests))
droppedCount := 0
for _, req := range requests {
if req == nil || req.GetRequest() == nil {
continue
}
// Only allow prover-related message types
switch req.GetRequest().(type) {
case *protobufs.MessageRequest_Join,
*protobufs.MessageRequest_Leave,
*protobufs.MessageRequest_Pause,
*protobufs.MessageRequest_Resume,
*protobufs.MessageRequest_Confirm,
*protobufs.MessageRequest_Reject,
*protobufs.MessageRequest_Kick,
*protobufs.MessageRequest_Update,
*protobufs.MessageRequest_SeniorityMerge:
// Prover messages are allowed
filtered = append(filtered, req)
default:
// All other messages are dropped in prover-only mode
droppedCount++
}
}
if droppedCount > 0 && e.logger != nil {
e.logger.Debug(
"dropped non-prover messages in prover-only mode",
zap.Int("dropped_count", droppedCount),
zap.Int("allowed_count", len(filtered)),
)
}
return filtered
}
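
// logBundleRequestTypes emits a debug log summarizing the request types (and
// selected request details) contained in a collected bundle.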
func (e *GlobalConsensusEngine) logBundleRequestTypes(
bundle *protobufs.MessageBundle,
) {
requestTypes := make([]string, 0, len(bundle.Requests))
detailFields := make([]zap.Field, 0)
for idx, request := range bundle.Requests {
typeName, detailField, hasDetail := requestTypeNameAndDetail(idx, request)
requestTypes = append(requestTypes, typeName)
if hasDetail {
detailFields = append(detailFields, detailField)
}
}
fields := []zap.Field{
zap.Int("request_count", len(bundle.Requests)),
zap.Strings("request_types", requestTypes),
zap.Int64("bundle_timestamp", bundle.Timestamp),
}
fields = append(fields, detailFields...)
e.logger.Debug("collected global request bundle", fields...)
}
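
// requestTypeNameAndDetail maps a request to a human-readable type name and,
// for request types with useful payloads, a zap field carrying the request
// body for debug logging.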
func requestTypeNameAndDetail(
idx int,
req *protobufs.MessageRequest,
) (string, zap.Field, bool) {
if req == nil || req.GetRequest() == nil {
return "nil_request", zap.Field{}, false
}
switch actual := req.GetRequest().(type) {
case *protobufs.MessageRequest_Join:
return "ProverJoin", zap.Field{}, false
case *protobufs.MessageRequest_Leave:
return "ProverLeave", zap.Field{}, false
case *protobufs.MessageRequest_Pause:
return "ProverPause", zap.Field{}, false
case *protobufs.MessageRequest_Resume:
return "ProverResume", zap.Field{}, false
case *protobufs.MessageRequest_Confirm:
return "ProverConfirm", zap.Field{}, false
case *protobufs.MessageRequest_Reject:
return "ProverReject", zap.Field{}, false
case *protobufs.MessageRequest_Kick:
return "ProverKick", zap.Field{}, false
case *protobufs.MessageRequest_Update:
return "ProverUpdate",
zap.Any(fmt.Sprintf("request_%d_prover_update", idx), actual.Update),
true
case *protobufs.MessageRequest_SeniorityMerge:
return "ProverSeniorityMerge",
zap.Any(fmt.Sprintf("request_%d_seniority_merge", idx), actual.SeniorityMerge),
true
case *protobufs.MessageRequest_TokenDeploy:
return "TokenDeploy",
zap.Any(fmt.Sprintf("request_%d_token_deploy", idx), actual.TokenDeploy),
true
case *protobufs.MessageRequest_TokenUpdate:
return "TokenUpdate",
zap.Any(fmt.Sprintf("request_%d_token_update", idx), actual.TokenUpdate),
true
case *protobufs.MessageRequest_Transaction:
return "Transaction",
zap.Any(fmt.Sprintf("request_%d_transaction", idx), actual.Transaction),
true
case *protobufs.MessageRequest_PendingTransaction:
return "PendingTransaction",
zap.Any(
fmt.Sprintf("request_%d_pending_transaction", idx),
actual.PendingTransaction,
),
true
case *protobufs.MessageRequest_MintTransaction:
return "MintTransaction",
zap.Any(fmt.Sprintf("request_%d_mint_transaction", idx), actual.MintTransaction),
true
case *protobufs.MessageRequest_HypergraphDeploy:
return "HypergraphDeploy",
zap.Any(fmt.Sprintf("request_%d_hypergraph_deploy", idx), actual.HypergraphDeploy),
true
case *protobufs.MessageRequest_HypergraphUpdate:
return "HypergraphUpdate",
zap.Any(fmt.Sprintf("request_%d_hypergraph_update", idx), actual.HypergraphUpdate),
true
case *protobufs.MessageRequest_VertexAdd:
return "VertexAdd",
zap.Any(fmt.Sprintf("request_%d_vertex_add", idx), actual.VertexAdd),
true
case *protobufs.MessageRequest_VertexRemove:
return "VertexRemove",
zap.Any(fmt.Sprintf("request_%d_vertex_remove", idx), actual.VertexRemove),
true
case *protobufs.MessageRequest_HyperedgeAdd:
return "HyperedgeAdd",
zap.Any(fmt.Sprintf("request_%d_hyperedge_add", idx), actual.HyperedgeAdd),
true
case *protobufs.MessageRequest_HyperedgeRemove:
return "HyperedgeRemove",
zap.Any(fmt.Sprintf("request_%d_hyperedge_remove", idx), actual.HyperedgeRemove),
true
case *protobufs.MessageRequest_ComputeDeploy:
return "ComputeDeploy",
zap.Any(fmt.Sprintf("request_%d_compute_deploy", idx), actual.ComputeDeploy),
true
case *protobufs.MessageRequest_ComputeUpdate:
return "ComputeUpdate",
zap.Any(fmt.Sprintf("request_%d_compute_update", idx), actual.ComputeUpdate),
true
case *protobufs.MessageRequest_CodeDeploy:
return "CodeDeploy",
zap.Any(fmt.Sprintf("request_%d_code_deploy", idx), actual.CodeDeploy),
true
case *protobufs.MessageRequest_CodeExecute:
return "CodeExecute",
zap.Any(fmt.Sprintf("request_%d_code_execute", idx), actual.CodeExecute),
true
case *protobufs.MessageRequest_CodeFinalize:
return "CodeFinalize",
zap.Any(fmt.Sprintf("request_%d_code_finalize", idx), actual.CodeFinalize),
true
case *protobufs.MessageRequest_Shard:
return "ShardFrame",
zap.Any(fmt.Sprintf("request_%d_shard_frame", idx), actual.Shard),
true
default:
return "unknown_request", zap.Field{}, false
}
}
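
// getMessageCollector returns the collector for the given rank, if the
// sequenced collectors have been initialized and one exists.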
func (e *GlobalConsensusEngine) getMessageCollector(
rank uint64,
) (keyedaggregator.Collector[sequencedGlobalMessage], bool, error) {
if e.messageCollectors == nil {
return nil, false, nil
}
return e.messageCollectors.GetCollector(rank)
}
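
// deferGlobalMessage stores a payload in the spillover map so it can be
// replayed once the target rank's collector is available.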
func (e *GlobalConsensusEngine) deferGlobalMessage(
targetRank uint64,
payload []byte,
) {
if e == nil || len(payload) == 0 || targetRank == 0 {
return
}
cloned := slices.Clone(payload)
e.globalSpilloverMu.Lock()
e.globalMessageSpillover[targetRank] = append(
e.globalMessageSpillover[targetRank],
cloned,
)
pending := len(e.globalMessageSpillover[targetRank])
e.globalSpilloverMu.Unlock()
if e.logger != nil {
e.logger.Debug(
"deferred global message due to collector limit",
zap.Uint64("target_rank", targetRank),
zap.Int("pending", pending),
)
}
}
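
// flushDeferredGlobalMessages re-adds any spilled-over payloads for the
// target rank to the aggregator and clears them from the spillover map.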
func (e *GlobalConsensusEngine) flushDeferredGlobalMessages(targetRank uint64) {
if e == nil || e.messageAggregator == nil || targetRank == 0 {
return
}
e.globalSpilloverMu.Lock()
payloads := e.globalMessageSpillover[targetRank]
if len(payloads) > 0 {
delete(e.globalMessageSpillover, targetRank)
}
e.globalSpilloverMu.Unlock()
if len(payloads) == 0 {
return
}
for _, payload := range payloads {
e.messageAggregator.Add(
newSequencedGlobalMessage(targetRank, payload),
)
}
if e.logger != nil {
e.logger.Debug(
"replayed deferred global messages",
zap.Uint64("target_rank", targetRank),
zap.Int("count", len(payloads)),
)
}
}