// ceremonyclient/node/consensus/app/message_collector.go

package app

import (
	"fmt"

	"go.uber.org/zap"
	"golang.org/x/crypto/sha3"
	"google.golang.org/protobuf/proto"

	"source.quilibrium.com/quilibrium/monorepo/consensus/models"
	"source.quilibrium.com/quilibrium/monorepo/lifecycle"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus/tracing"
	keyedaggregator "source.quilibrium.com/quilibrium/monorepo/node/keyedaggregator"
	keyedcollector "source.quilibrium.com/quilibrium/monorepo/node/keyedcollector"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
)
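
// maxAppMessagesPerRank caps how many app messages a single rank's
// collector will accept; overflow is deferred to the next rank (see
// enforceCollectorLimit and deferAppMessage).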
const maxAppMessagesPerRank = 100
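
// sequencedAppMessage is a rank-ordered wrapper around a protobuf
// Message. Its identity is derived from the message hash, which the
// keyed collector uses to de-duplicate records; frameNumber is filled
// in later by the processor.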
type sequencedAppMessage struct {
rank uint64
frameNumber uint64
identity models.Identity
message *protobufs.Message
}
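
// newSequencedAppMessage deep-copies message so later mutation by the
// caller cannot affect the collector, and returns nil for a nil
// message. It assumes message.Hash is already populated (addAppMessage
// ensures this before constructing a record).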
func newSequencedAppMessage(
rank uint64,
message *protobufs.Message,
) *sequencedAppMessage {
if message == nil {
return nil
}
cloned := proto.Clone(message).(*protobufs.Message)
return &sequencedAppMessage{
rank: rank,
identity: models.Identity(string(cloned.Hash)),
message: cloned,
}
}
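
// appMessageTraits tells the keyed collector how to sequence (by
// rank), identify (by hash-derived identity), and compare app message
// records. All accessors tolerate nil records.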
var appMessageTraits = keyedcollector.RecordTraits[sequencedAppMessage]{
Sequence: func(m *sequencedAppMessage) uint64 {
if m == nil {
return 0
}
return m.rank
},
Identity: func(m *sequencedAppMessage) models.Identity {
if m == nil {
return ""
}
return m.identity
},
Equals: func(a, b *sequencedAppMessage) bool {
if a == nil || b == nil {
return a == b
}
		return a.identity == b.identity
},
}
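
// appMessageProcessorFactory builds one appMessageProcessor per
// sequence number on behalf of the keyed collector.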
type appMessageProcessorFactory struct {
engine *AppConsensusEngine
}
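
// Create returns a processor bound to the given sequence number, which
// for app messages is the consensus rank.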
func (f *appMessageProcessorFactory) Create(
sequence uint64,
) (keyedcollector.Processor[sequencedAppMessage], error) {
return &appMessageProcessor{
engine: f.engine,
rank: sequence,
}, nil
}
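
// appMessageProcessor validates the messages collected for a single
// rank.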
type appMessageProcessor struct {
engine *AppConsensusEngine
rank uint64
}
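
// Process enforces the per-rank collector limit, resolves the frame
// number implied by the processor's rank, validates the payload with
// the execution manager, and refreshes the pending-messages gauge.
// Rejections are reported as keyedcollector invalid-record errors.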
func (p *appMessageProcessor) Process(
record *sequencedAppMessage,
) error {
if record == nil || record.message == nil {
return keyedcollector.NewInvalidRecordError(
record,
fmt.Errorf("nil app message"),
)
}
if err := p.enforceCollectorLimit(record); err != nil {
return err
}
frameNumber, err := p.frameNumberForRank()
if err != nil {
return keyedcollector.NewInvalidRecordError(record, err)
}
if err := p.engine.executionManager.ValidateMessage(
frameNumber,
record.message.Address,
record.message.Payload,
); err != nil {
return keyedcollector.NewInvalidRecordError(record, err)
}
record.frameNumber = frameNumber
p.engine.updatePendingMessagesGauge(p.rank)
return nil
}
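
// frameNumberForRank derives the frame a message at this rank would
// apply to: one past the frame certified by the previous rank's quorum
// certificate.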
func (p *appMessageProcessor) frameNumberForRank() (uint64, error) {
rank := p.rank
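	// Rank 0 has no predecessor; treat it as rank 1 so the lookup below
	// targets the rank-0 certificate.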
if rank == 0 {
rank = 1
}
qc, err := p.engine.clockStore.GetQuorumCertificate(
p.engine.appAddress,
rank-1,
)
if err != nil {
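		// The prior rank's certificate is unavailable; fall back to the
		// latest known certificate for this application.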
qc, err = p.engine.clockStore.GetLatestQuorumCertificate(
p.engine.appAddress,
)
if err != nil {
return 0, err
}
}
return qc.GetFrameNumber() + 1, nil
}
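
// enforceCollectorLimit bounds each rank's collector at
// maxAppMessagesPerRank. Collector lookup failures are treated as
// "nothing to enforce" so the limit check never blocks processing.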
func (p *appMessageProcessor) enforceCollectorLimit(
record *sequencedAppMessage,
) error {
collector, found, err := p.engine.getAppMessageCollector(p.rank)
if err != nil || !found {
return nil
}
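	// At capacity: evict this record and retry it at the next rank
	// instead of dropping it outright.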
if len(collector.Records()) >= maxAppMessagesPerRank {
collector.Remove(record)
p.engine.deferAppMessage(p.rank+1, record.message)
return keyedcollector.NewInvalidRecordError(
record,
fmt.Errorf("message limit reached for rank %d", p.rank),
)
}
return nil
}
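
// initAppMessageAggregator wires up the keyed collector/aggregator
// pipeline that sequences inbound app messages by rank.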
func (e *AppConsensusEngine) initAppMessageAggregator() error {
tracer := tracing.NewZapTracer(e.logger.Named("app_message_collector"))
processorFactory := &appMessageProcessorFactory{engine: e}
collectorFactory, err := keyedcollector.NewFactory(
tracer,
appMessageTraits,
nil,
processorFactory,
)
if err != nil {
return err
}
e.messageCollectors = keyedaggregator.NewSequencedCollectors(
tracer,
0,
collectorFactory,
)
aggregator, err := keyedaggregator.NewSequencedAggregator(
tracer,
0,
e.messageCollectors,
func(m *sequencedAppMessage) uint64 {
if m == nil {
return 0
}
return m.rank
},
)
if err != nil {
return err
}
e.messageAggregator = aggregator
return nil
}
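
// startAppMessageAggregator runs the aggregator under the engine
// lifecycle. When no aggregator is configured it signals ready
// immediately and simply blocks until shutdown.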
func (e *AppConsensusEngine) startAppMessageAggregator(
ctx lifecycle.SignalerContext,
ready lifecycle.ReadyFunc,
) {
if e.messageAggregator == nil {
ready()
<-ctx.Done()
return
}
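	// Run the component manager in the background, bridging its readiness
	// and completion into the lifecycle callbacks.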
go func() {
if err := e.messageAggregator.ComponentManager.Start(ctx); err != nil {
ctx.Throw(err)
}
}()
<-e.messageAggregator.ComponentManager.Ready()
ready()
<-e.messageAggregator.ComponentManager.Done()
}
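
// addAppMessage hashes (if needed), clones, and enqueues an inbound
// message at the engine's next rank.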
func (e *AppConsensusEngine) addAppMessage(message *protobufs.Message) {
if e.messageAggregator == nil || message == nil {
return
}
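	// Populate the identity hash if the sender did not provide one.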
if len(message.Hash) == 0 {
hash := sha3.Sum256(message.Payload)
message.Hash = hash[:]
}
rank := e.nextRank()
record := newSequencedAppMessage(rank, message)
if record == nil {
return
}
e.messageAggregator.Add(record)
}
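
// nextRank prefers the last observed proposal rank and falls back to
// the engine's current rank when no proposal has been seen yet.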
func (e *AppConsensusEngine) nextRank() uint64 {
e.lastProposalRankMu.RLock()
last := e.lastProposalRank
e.lastProposalRankMu.RUnlock()
if last > 0 {
return last + 1
}
return e.currentRank + 1
}
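
// getAppMessageCollector returns the collector for the given rank, if
// the collector set has been initialised.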
func (e *AppConsensusEngine) getAppMessageCollector(
rank uint64,
) (keyedaggregator.Collector[sequencedAppMessage], bool, error) {
if e.messageCollectors == nil {
return nil, false, nil
}
return e.messageCollectors.GetCollector(rank)
}
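
// recordProposalRank tracks the highest proposal rank observed so far;
// rank zero is ignored.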
func (e *AppConsensusEngine) recordProposalRank(rank uint64) {
if rank == 0 {
return
}
e.lastProposalRankMu.Lock()
if rank > e.lastProposalRank {
e.lastProposalRank = rank
}
e.lastProposalRankMu.Unlock()
}
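
// updatePendingMessagesGauge publishes the number of records pending
// in the rank's collector to the pendingMessagesCount metric.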
func (e *AppConsensusEngine) updatePendingMessagesGauge(rank uint64) {
if e.messageCollectors == nil {
return
}
collector, found, err := e.getAppMessageCollector(rank)
if err != nil || !found {
return
}
pendingMessagesCount.WithLabelValues(e.appAddressHex).Set(
float64(len(collector.Records())),
)
}
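
// deferAppMessage clones a message that overflowed the current rank's
// collector and queues it for replay at targetRank.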
func (e *AppConsensusEngine) deferAppMessage(
targetRank uint64,
message *protobufs.Message,
) {
if e == nil || message == nil || targetRank == 0 {
return
}
cloned := proto.Clone(message).(*protobufs.Message)
e.appSpilloverMu.Lock()
e.appMessageSpillover[targetRank] = append(
e.appMessageSpillover[targetRank],
cloned,
)
pending := len(e.appMessageSpillover[targetRank])
e.appSpilloverMu.Unlock()
if e.logger != nil {
e.logger.Debug(
"deferred app message due to collector limit",
zap.String("app_address", e.appAddressHex),
zap.Uint64("target_rank", targetRank),
zap.Int("pending", pending),
)
}
}
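
// flushDeferredAppMessages re-submits any messages previously deferred
// to targetRank and clears the spillover entry.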
func (e *AppConsensusEngine) flushDeferredAppMessages(targetRank uint64) {
if e == nil || e.messageAggregator == nil || targetRank == 0 {
return
}
e.appSpilloverMu.Lock()
messages := e.appMessageSpillover[targetRank]
if len(messages) > 0 {
delete(e.appMessageSpillover, targetRank)
}
e.appSpilloverMu.Unlock()
if len(messages) == 0 {
return
}
	for _, msg := range messages {
		// Guard against a nil record, mirroring the check in addAppMessage.
		if record := newSequencedAppMessage(targetRank, msg); record != nil {
			e.messageAggregator.Add(record)
		}
	}
if e.logger != nil {
e.logger.Debug(
"replayed deferred app messages",
zap.String("app_address", e.appAddressHex),
zap.Uint64("target_rank", targetRank),
zap.Int("count", len(messages)),
)
}
}