ceremonyclient/node/consensus/global/consensus_liveness_provider.go
Cassandra Heart 0425b38fa2
v2.1.0.15 (#485)
* v2.1.0.15

* add release notes
2025-12-09 21:55:18 -06:00

193 lines
4.3 KiB
Go

package global
import (
"context"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
keyedaggregator "source.quilibrium.com/quilibrium/monorepo/node/keyedaggregator"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
// GlobalLivenessProvider implements LivenessProvider
type GlobalLivenessProvider struct {
engine *GlobalConsensusEngine
}
func (p *GlobalLivenessProvider) Collect(
ctx context.Context,
frameNumber uint64,
rank uint64,
) (GlobalCollectedCommitments, error) {
timer := prometheus.NewTimer(shardCommitmentCollectionDuration)
defer timer.ObserveDuration()
mixnetMessages := []*protobufs.Message{}
currentSet, _ := p.engine.proverRegistry.GetActiveProvers(nil)
if len(currentSet) >= 9 {
err := p.engine.mixnet.PrepareMixnet()
if err != nil {
p.engine.logger.Error(
"error preparing mixnet",
zap.Error(err),
)
}
// Get messages from mixnet
mixnetMessages = p.engine.mixnet.GetMessages()
}
var collector keyedaggregator.Collector[sequencedGlobalMessage]
var collectorRecords []*sequencedGlobalMessage
if p.engine.messageCollectors != nil {
var err error
var found bool
collector, found, err = p.engine.getMessageCollector(rank)
if err != nil && !errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) {
p.engine.logger.Warn(
"could not fetch collector for rank",
zap.Uint64("rank", rank),
zap.Error(err),
)
} else if found {
collectorRecords = collector.Records()
}
}
acceptedMessages := make(
[]*protobufs.Message,
0,
len(collectorRecords)+len(mixnetMessages),
)
if collector != nil {
for _, record := range collectorRecords {
if record == nil || record.message == nil {
continue
}
if err := p.lockCollectorMessage(
frameNumber,
record.message,
); err != nil {
p.engine.logger.Debug(
"message failed lock",
zap.Uint64("frame_number", frameNumber),
zap.Error(err),
)
collector.Remove(record)
continue
}
acceptedMessages = append(acceptedMessages, record.message)
}
}
messages := append([]*protobufs.Message{}, mixnetMessages...)
p.engine.logger.Debug(
"collected messages, validating",
zap.Int("message_count", len(messages)+len(collectorRecords)),
)
for i, message := range messages {
err := p.validateAndLockMessage(frameNumber, i, message)
if err != nil {
continue
}
acceptedMessages = append(acceptedMessages, message)
}
if p.engine.messageAggregator != nil {
p.engine.messageAggregator.OnSequenceChange(rank, rank+1)
}
err := p.engine.executionManager.Unlock()
if err != nil {
p.engine.logger.Error(
"unable to unlock",
zap.Error(err),
)
}
commitmentHash, err := p.engine.rebuildShardCommitments(frameNumber, rank)
if err != nil {
return GlobalCollectedCommitments{}, errors.Wrap(err, "collect")
}
// Store the accepted messages as canonical bytes for inclusion in the frame
collectedMsgs := make([][]byte, 0, len(acceptedMessages))
for _, msg := range acceptedMessages {
collectedMsgs = append(collectedMsgs, msg.Payload)
}
p.engine.collectedMessages = collectedMsgs
return GlobalCollectedCommitments{
frameNumber: frameNumber,
commitmentHash: commitmentHash,
prover: p.engine.getProverAddress(),
}, nil
}
func (p *GlobalLivenessProvider) validateAndLockMessage(
frameNumber uint64,
i int,
message *protobufs.Message,
) (err error) {
defer func() {
if r := recover(); r != nil {
p.engine.logger.Error(
"panic recovered from message",
zap.Any("panic", r),
zap.Stack("stacktrace"),
)
err = errors.New("panicked processing message")
}
}()
err = p.engine.executionManager.ValidateMessage(
frameNumber,
message.Address,
message.Payload,
)
if err != nil {
p.engine.logger.Debug(
"invalid message",
zap.Int("message_index", i),
zap.Error(err),
)
return err
}
_, err = p.engine.executionManager.Lock(
frameNumber,
message.Address,
message.Payload,
)
if err != nil {
p.engine.logger.Debug(
"message failed lock",
zap.Int("message_index", i),
zap.Error(err),
)
return err
}
return nil
}
func (p *GlobalLivenessProvider) lockCollectorMessage(
frameNumber uint64,
message *protobufs.Message,
) error {
if message == nil {
return errors.New("nil message")
}
_, err := p.engine.executionManager.Lock(
frameNumber,
message.Address,
message.Payload,
)
return err
}