package global

import (
	"context"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"

	keyedaggregator "source.quilibrium.com/quilibrium/monorepo/node/keyedaggregator"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
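
// Note: shardCommitmentCollectionDuration (used by Collect below) is
// defined elsewhere in this package. A minimal sketch of how such a
// histogram might be declared, assuming default prometheus buckets
// (this is an illustration, not the actual definition):
//
//	var shardCommitmentCollectionDuration = prometheus.NewHistogram(
//		prometheus.HistogramOpts{
//			Name: "shard_commitment_collection_duration_seconds",
//			Help: "Time spent collecting shard commitments per frame.",
//		},
//	)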

// GlobalLivenessProvider implements LivenessProvider for the global
// consensus engine: it collects pending messages for a frame and
// produces the shard commitments that go into that frame.
type GlobalLivenessProvider struct {
	engine *GlobalConsensusEngine
}

// Collect gathers candidate messages for the given frame number and
// rank, locks the ones that pass validation, and returns the rebuilt
// shard commitment hash for this prover.
func (p *GlobalLivenessProvider) Collect(
	ctx context.Context,
	frameNumber uint64,
	rank uint64,
) (GlobalCollectedCommitments, error) {
	timer := prometheus.NewTimer(shardCommitmentCollectionDuration)
	defer timer.ObserveDuration()

	mixnetMessages := []*protobufs.Message{}
	currentSet, _ := p.engine.proverRegistry.GetActiveProvers(nil)

	// The mixnet only runs once there are enough active provers; with a
	// smaller set, mixnet messages are skipped for this frame.
	if len(currentSet) >= 9 {
		err := p.engine.mixnet.PrepareMixnet()
		if err != nil {
			p.engine.logger.Error(
				"error preparing mixnet",
				zap.Error(err),
			)
		}

		// Get messages from mixnet
		mixnetMessages = p.engine.mixnet.GetMessages()
	}

	// Fetch the keyed message collector for this rank, if one exists; a
	// sequence-below-retention error is expected and simply means there
	// is nothing left to collect for this rank.
	var collector keyedaggregator.Collector[sequencedGlobalMessage]
	var collectorRecords []*sequencedGlobalMessage
	if p.engine.messageCollectors != nil {
		var err error
		var found bool
		collector, found, err = p.engine.getMessageCollector(rank)
		if err != nil && !errors.Is(err, keyedaggregator.ErrSequenceBelowRetention) {
			p.engine.logger.Warn(
				"could not fetch collector for rank",
				zap.Uint64("rank", rank),
				zap.Error(err),
			)
		} else if found {
			collectorRecords = collector.Records()
		}
	}

	acceptedMessages := make(
		[]*protobufs.Message,
		0,
		len(collectorRecords)+len(mixnetMessages),
	)

	// Lock collector-sourced messages; records that fail to lock are
	// removed from the collector and skipped.
	if collector != nil {
		for _, record := range collectorRecords {
			if record == nil || record.message == nil {
				continue
			}
			if err := p.lockCollectorMessage(
				frameNumber,
				record.message,
			); err != nil {
				p.engine.logger.Debug(
					"message failed lock",
					zap.Uint64("frame_number", frameNumber),
					zap.Error(err),
				)
				collector.Remove(record)
				continue
			}
			acceptedMessages = append(acceptedMessages, record.message)
		}
	}

	messages := append([]*protobufs.Message{}, mixnetMessages...)

	p.engine.logger.Debug(
		"collected messages, validating",
		zap.Int("message_count", len(messages)+len(collectorRecords)),
	)

	// Validate and lock the mixnet messages; failures are skipped here
	// and logged inside the helper.
	for i, message := range messages {
		err := p.validateAndLockMessage(frameNumber, i, message)
		if err != nil {
			continue
		}

		acceptedMessages = append(acceptedMessages, message)
	}

	if p.engine.messageAggregator != nil {
		p.engine.messageAggregator.OnSequenceChange(rank, rank+1)
	}

	// Release the execution manager's locks before rebuilding the
	// commitments; an unlock failure is logged but not fatal.
	err := p.engine.executionManager.Unlock()
	if err != nil {
		p.engine.logger.Error(
			"unable to unlock",
			zap.Error(err),
		)
	}

	commitmentHash, err := p.engine.rebuildShardCommitments(frameNumber, rank)
	if err != nil {
		return GlobalCollectedCommitments{}, errors.Wrap(err, "collect")
	}

	// Store the accepted messages as canonical bytes for inclusion in the frame
	collectedMsgs := make([][]byte, 0, len(acceptedMessages))
	for _, msg := range acceptedMessages {
		collectedMsgs = append(collectedMsgs, msg.Payload)
	}
	p.engine.collectedMessages = collectedMsgs

	return GlobalCollectedCommitments{
		frameNumber:    frameNumber,
		commitmentHash: commitmentHash,
		prover:         p.engine.getProverAddress(),
	}, nil
}

// validateAndLockMessage validates a single message against the
// execution manager for the given frame and, if valid, locks it for
// inclusion. A panic while processing is recovered and converted to an
// error so one malformed message cannot abort collection.
func (p *GlobalLivenessProvider) validateAndLockMessage(
	frameNumber uint64,
	i int,
	message *protobufs.Message,
) (err error) {
	defer func() {
		if r := recover(); r != nil {
			p.engine.logger.Error(
				"panic recovered from message",
				zap.Any("panic", r),
				zap.Stack("stacktrace"),
			)
			err = errors.New("panicked processing message")
		}
	}()

	err = p.engine.executionManager.ValidateMessage(
		frameNumber,
		message.Address,
		message.Payload,
	)
	if err != nil {
		p.engine.logger.Debug(
			"invalid message",
			zap.Int("message_index", i),
			zap.Error(err),
		)
		return err
	}

	_, err = p.engine.executionManager.Lock(
		frameNumber,
		message.Address,
		message.Payload,
	)
	if err != nil {
		p.engine.logger.Debug(
			"message failed lock",
			zap.Int("message_index", i),
			zap.Error(err),
		)
		return err
	}

	return nil
}

// lockCollectorMessage locks a collector-sourced message for the given
// frame. Unlike validateAndLockMessage, no validation step is run here.
func (p *GlobalLivenessProvider) lockCollectorMessage(
	frameNumber uint64,
	message *protobufs.Message,
) error {
	if message == nil {
		return errors.New("nil message")
	}
	_, err := p.engine.executionManager.Lock(
		frameNumber,
		message.Address,
		message.Payload,
	)
	return err
}
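
// collectOnce is a hypothetical usage sketch, not part of the original
// engine: it shows how a consensus loop might drive a single round of
// collection. The engine value and the frameNumber/rank inputs are
// assumed to come from the surrounding consensus machinery.
func collectOnce(
	ctx context.Context,
	engine *GlobalConsensusEngine,
	frameNumber uint64,
	rank uint64,
) (GlobalCollectedCommitments, error) {
	provider := &GlobalLivenessProvider{engine: engine}
	return provider.Collect(ctx, frameNumber, rank)
}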