Mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git (synced 2026-02-21 10:27:26 +08:00)
* Add clock frame fragment message
* Add clock frame fragment validation
* Add clock frame fragmentation utilities
* Add clock frame fragmentation message handling
* Report publication errors
* Publish info list after frame
* Add frame publish configuration
* Publish clock frame fragments
* Update BlossomSub dashboard
* Publish clock frame fragments in parallel
300 lines
7.0 KiB
Go
package data

import (
	"crypto"
	"crypto/rand"
	mrand "math/rand"
	"strings"
	"sync"
	"time"

	"github.com/iden3/go-iden3-crypto/poseidon"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"
	"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
	"source.quilibrium.com/quilibrium/monorepo/node/config"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/fragmentation"
	qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)

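// handleFrameMessage enqueues a received clock frame message onto the frame
// message processor channel without blocking; if the channel cannot accept
// the message immediately, it is dropped and a warning is logged.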
func (e *DataClockConsensusEngine) handleFrameMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.frameMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping frame message")
	}
	return nil
}

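// handleFrameFragmentMessage enqueues a received clock frame fragment message
// onto the frame fragment processor channel, dropping it if the channel is
// full.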
func (e *DataClockConsensusEngine) handleFrameFragmentMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.frameFragmentMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping frame fragment message")
	}
	return nil
}

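// handleTxMessage enqueues a received transaction message onto the tx message
// processor channel, dropping it if the channel is full.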
func (e *DataClockConsensusEngine) handleTxMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.txMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping tx message")
	}
	return nil
}

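// handleInfoMessage enqueues a received peer info message onto the info
// message processor channel, dropping it if the channel is full.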
func (e *DataClockConsensusEngine) handleInfoMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.infoMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping info message")
	}
	return nil
}

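// publishProof publishes the given clock frame whole, fragmented, or both,
// depending on the configured frame publish mode, and then announces this
// node's peer info for the frame.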
func (e *DataClockConsensusEngine) publishProof(
	frame *protobufs.ClockFrame,
) error {
	e.logger.Debug(
		"publishing frame and aggregations",
		zap.Uint64("frame_number", frame.FrameNumber),
	)

	timestamp := time.Now().UnixMilli()
	reachability := e.pubSub.Reachability()

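	// Record this node's own latest frame info in the local peer map.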
	e.peerMapMx.Lock()
	e.peerMap[string(e.pubSub.GetPeerID())] = &peerInfo{
		peerId:       e.pubSub.GetPeerID(),
		multiaddr:    "",
		maxFrame:     frame.FrameNumber,
		version:      config.GetVersion(),
		patchVersion: config.GetPatchNumber(),
		timestamp:    timestamp,
		totalDistance: e.dataTimeReel.GetTotalDistance().FillBytes(
			make([]byte, 256),
		),
		reachability: reachability,
	}
	e.peerMapMx.Unlock()

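	// Apply the frame publish configuration; if a ballast size is configured,
	// publish a padded clone of the frame so the original is left unmodified.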
	cfg := e.config.Engine.FramePublish.WithDefaults()
	if cfg.BallastSize > 0 {
		frame = proto.Clone(frame).(*protobufs.ClockFrame)
		frame.Padding = make([]byte, cfg.BallastSize)
	}

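	// publishFragmented splits the frame with the configured fragmentation
	// algorithm (only reed-solomon is supported), shuffles the fragments,
	// signs each one with the node's proving key, and publishes them in
	// parallel with a bounded number of workers.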
	publishFragmented := func() error {
		var splitter fragmentation.ClockFrameSplitter
		switch cfg := cfg.Fragmentation; cfg.Algorithm {
		case "reed-solomon":
			var err error
			splitter, err = fragmentation.NewReedSolomonClockFrameSplitter(
				cfg.ReedSolomon.DataShards,
				cfg.ReedSolomon.ParityShards,
			)
			if err != nil {
				return errors.Wrap(err, "creating reed-solomon splitter")
			}
		default:
			return errors.Errorf("unsupported fragmentation algorithm: %s", cfg.Algorithm)
		}
		fragments, err := splitter.SplitClockFrame(frame)
		if err != nil {
			return errors.Wrap(err, "splitting clock frame")
		}
		mrand.Shuffle(len(fragments), func(i, j int) {
			fragments[i], fragments[j] = fragments[j], fragments[i]
		})
		sign := func(b []byte) ([]byte, error) {
			return e.provingKey.Sign(rand.Reader, b, crypto.Hash(0))
		}
		var wg sync.WaitGroup
		defer wg.Wait()
		throttle := make(chan struct{}, qruntime.WorkerCount(0, false))
		for _, fragment := range fragments {
			throttle <- struct{}{}
			wg.Add(1)
			go func(fragment *protobufs.ClockFrameFragment) {
				defer func() { <-throttle }()
				defer wg.Done()
				if err := fragment.SignED448(e.provingKeyBytes, sign); err != nil {
					e.logger.Error("error signing clock frame fragment", zap.Error(err))
					return
				}
				if err := e.publishMessage(e.frameFragmentFilter, fragment); err != nil {
					e.logger.Error("error publishing clock frame fragment", zap.Error(err))
				}
			}(fragment)
		}
		return nil
	}
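	// publishFull publishes the whole frame as a single message; a publish
	// error is logged but not returned.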
	publishFull := func() error {
		if err := e.publishMessage(e.frameFilter, frame); err != nil {
			e.logger.Error("error publishing clock frame", zap.Error(err))
		}
		return nil
	}
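	// Publish according to the configured mode: full frame only, fragments
	// only, both ("dual"), or fragments when the serialized frame meets the
	// size threshold ("threshold").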
	switch cfg.Mode {
	case "full":
		if err := publishFull(); err != nil {
			return err
		}
	case "fragmented":
		if err := publishFragmented(); err != nil {
			return err
		}
	case "dual":
		if err := publishFragmented(); err != nil {
			return err
		}
		if err := publishFull(); err != nil {
			return err
		}
	case "threshold":
		if proto.Size(frame) >= cfg.Threshold {
			if err := publishFragmented(); err != nil {
				return err
			}
		} else {
			if err := publishFull(); err != nil {
				return err
			}
		}
	default:
		return errors.Errorf("unsupported frame publish mode: %s", cfg.Mode)
	}

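	// Announce this node's peer info (max frame, version, total distance,
	// reachability) on the info bitmask after the frame has been published.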
	list := &protobufs.DataPeerListAnnounce{
		Peer: &protobufs.DataPeer{
			PeerId:       nil,
			Multiaddr:    "",
			MaxFrame:     frame.FrameNumber,
			Version:      config.GetVersion(),
			PatchVersion: []byte{config.GetPatchNumber()},
			Timestamp:    timestamp,
			TotalDistance: e.dataTimeReel.GetTotalDistance().FillBytes(
				make([]byte, 256),
			),
			ExternallyReachable: reachability,
		},
	}
	if err := e.publishMessage(e.infoFilter, list); err != nil {
		e.logger.Debug("error publishing data peer list announce", zap.Error(err))
	}

	return nil
}

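// insertTxMessage wraps the given message in a protobufs.Message envelope
// (poseidon hash of the payload, proving key address) and enqueues it on the
// local tx message processor channel rather than publishing it over pubsub;
// the message is dropped if the channel is full.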
func (e *DataClockConsensusEngine) insertTxMessage(
	filter []byte,
	message proto.Message,
) error {
	a := &anypb.Any{}
	if err := a.MarshalFrom(message); err != nil {
		return errors.Wrap(err, "publish message")
	}

	a.TypeUrl = strings.Replace(
		a.TypeUrl,
		"type.googleapis.com",
		"types.quilibrium.com",
		1,
	)

	payload, err := proto.Marshal(a)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}

	h, err := poseidon.HashBytes(payload)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}

	msg := &protobufs.Message{
		Hash:    h.Bytes(),
		Address: e.provingKeyAddress,
		Payload: payload,
	}
	data, err := proto.Marshal(msg)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}

	m := &pb.Message{
		Data:    data,
		Bitmask: filter,
		From:    e.pubSub.GetPeerID(),
		Seqno:   nil,
	}

	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.txMessageProcessorCh <- m:
	default:
		e.logger.Warn("dropping tx message")
	}

	return nil
}

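// publishMessage wraps the given message in a protobufs.Message envelope
// (poseidon hash of the payload, proving key address) and publishes it to the
// given bitmask filter via pubsub.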
func (e *DataClockConsensusEngine) publishMessage(
	filter []byte,
	message proto.Message,
) error {
	a := &anypb.Any{}
	if err := a.MarshalFrom(message); err != nil {
		return errors.Wrap(err, "publish message")
	}

	a.TypeUrl = strings.Replace(
		a.TypeUrl,
		"type.googleapis.com",
		"types.quilibrium.com",
		1,
	)

	payload, err := proto.Marshal(a)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}

	h, err := poseidon.HashBytes(payload)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}

	msg := &protobufs.Message{
		Hash:    h.Bytes(),
		Address: e.provingKeyAddress,
		Payload: payload,
	}
	data, err := proto.Marshal(msg)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}
	return e.pubSub.PublishToBitmask(filter, data)
}