ceremonyclient/node/consensus/data/broadcast_messaging.go

package data

import (
	"crypto"
	"crypto/rand"
	mrand "math/rand"
	"strings"
	"sync"
	"time"

	"github.com/iden3/go-iden3-crypto/poseidon"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
	"source.quilibrium.com/quilibrium/monorepo/node/config"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/fragmentation"
	qruntime "source.quilibrium.com/quilibrium/monorepo/node/internal/runtime"
	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
// handleFrameMessage forwards a received clock frame message to the frame
// processor, dropping it if the processor's channel is full.
func (e *DataClockConsensusEngine) handleFrameMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.frameMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping frame message")
	}
	return nil
}

// handleFrameFragmentMessage forwards a received clock frame fragment to the
// fragment processor, dropping it if the processor's channel is full.
func (e *DataClockConsensusEngine) handleFrameFragmentMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.frameFragmentMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping frame fragment message")
	}
	return nil
}

// handleTxMessage forwards a received transaction message to the tx
// processor, dropping it if the processor's channel is full.
func (e *DataClockConsensusEngine) handleTxMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.txMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping tx message")
	}
	return nil
}

// handleInfoMessage forwards a received peer info message to the info
// processor, dropping it if the processor's channel is full.
func (e *DataClockConsensusEngine) handleInfoMessage(
	message *pb.Message,
) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.infoMessageProcessorCh <- message:
	default:
		e.logger.Warn("dropping info message")
	}
	return nil
}
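
// The four handlers above share the same non-blocking fan-in shape: forward
// to a processor channel if it has capacity, otherwise drop and log. As a
// sketch only (not part of the original file; the helper name and the
// explicit context/logger parameters are assumptions for illustration), they
// could be collapsed into one function:
//
//	func enqueueOrDrop(
//		ctx context.Context,
//		logger *zap.Logger,
//		ch chan<- *pb.Message,
//		message *pb.Message,
//		kind string,
//	) error {
//		select {
//		case <-ctx.Done():
//			return ctx.Err()
//		case ch <- message:
//		default:
//			logger.Warn("dropping " + kind + " message")
//		}
//		return nil
//	}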

// publishProof publishes the given clock frame (whole, fragmented, or both,
// depending on configuration), updates the local peer map entry, and follows
// up with a peer info announcement.
func (e *DataClockConsensusEngine) publishProof(
	frame *protobufs.ClockFrame,
) error {
	e.logger.Debug(
		"publishing frame and aggregations",
		zap.Uint64("frame_number", frame.FrameNumber),
	)

	timestamp := time.Now().UnixMilli()
	reachability := e.pubSub.Reachability()

	// Record our own view of the head frame in the peer map.
	e.peerMapMx.Lock()
	e.peerMap[string(e.pubSub.GetPeerID())] = &peerInfo{
		peerId:       e.pubSub.GetPeerID(),
		multiaddr:    "",
		maxFrame:     frame.FrameNumber,
		version:      config.GetVersion(),
		patchVersion: config.GetPatchNumber(),
		timestamp:    timestamp,
		totalDistance: e.dataTimeReel.GetTotalDistance().FillBytes(
			make([]byte, 256),
		),
		reachability: reachability,
	}
	e.peerMapMx.Unlock()

	cfg := e.config.Engine.FramePublish.WithDefaults()
	if cfg.BallastSize > 0 {
		// Pad a copy of the frame rather than mutating the caller's frame.
		frame = proto.Clone(frame).(*protobufs.ClockFrame)
		frame.Padding = make([]byte, cfg.BallastSize)
	}

	publishFragmented := func() error {
		var splitter fragmentation.ClockFrameSplitter
		switch cfg := cfg.Fragmentation; cfg.Algorithm {
		case "reed-solomon":
			var err error
			splitter, err = fragmentation.NewReedSolomonClockFrameSplitter(
				cfg.ReedSolomon.DataShards,
				cfg.ReedSolomon.ParityShards,
			)
			if err != nil {
				return errors.Wrap(err, "creating reed-solomon splitter")
			}
		default:
			return errors.Errorf(
				"unsupported fragmentation algorithm: %s",
				cfg.Algorithm,
			)
		}

		fragments, err := splitter.SplitClockFrame(frame)
		if err != nil {
			return errors.Wrap(err, "splitting clock frame")
		}

		// Randomize publication order so peers converge from different
		// subsets of fragments.
		mrand.Shuffle(len(fragments), func(i, j int) {
			fragments[i], fragments[j] = fragments[j], fragments[i]
		})

		sign := func(b []byte) ([]byte, error) {
			return e.provingKey.Sign(rand.Reader, b, crypto.Hash(0))
		}

		// Sign and publish fragments in parallel, bounded by a worker pool;
		// the deferred Wait keeps this closure alive until all sends finish.
		var wg sync.WaitGroup
		defer wg.Wait()
		throttle := make(chan struct{}, qruntime.WorkerCount(0, false))
		for _, fragment := range fragments {
			throttle <- struct{}{}
			wg.Add(1)
			go func(fragment *protobufs.ClockFrameFragment) {
				defer func() { <-throttle }()
				defer wg.Done()
				if err := fragment.SignED448(e.provingKeyBytes, sign); err != nil {
					e.logger.Error(
						"error signing clock frame fragment",
						zap.Error(err),
					)
					return
				}
				if err := e.publishMessage(e.frameFragmentFilter, fragment); err != nil {
					e.logger.Error(
						"error publishing clock frame fragment",
						zap.Error(err),
					)
				}
			}(fragment)
		}
		return nil
	}

	publishFull := func() error {
		// Publication failures are logged rather than propagated so the
		// peer info announcement below still goes out.
		if err := e.publishMessage(e.frameFilter, frame); err != nil {
			e.logger.Error("error publishing clock frame", zap.Error(err))
		}
		return nil
	}

	switch cfg.Mode {
	case "full":
		if err := publishFull(); err != nil {
			return err
		}
	case "fragmented":
		if err := publishFragmented(); err != nil {
			return err
		}
	case "dual":
		if err := publishFragmented(); err != nil {
			return err
		}
		if err := publishFull(); err != nil {
			return err
		}
	case "threshold":
		// Fragment only when the serialized frame meets the size threshold.
		if proto.Size(frame) >= cfg.Threshold {
			if err := publishFragmented(); err != nil {
				return err
			}
		} else {
			if err := publishFull(); err != nil {
				return err
			}
		}
	default:
		return errors.Errorf("unsupported frame publish mode: %s", cfg.Mode)
	}

	// Announce our updated head frame and reachability to peers.
	list := &protobufs.DataPeerListAnnounce{
		Peer: &protobufs.DataPeer{
			PeerId:       nil,
			Multiaddr:    "",
			MaxFrame:     frame.FrameNumber,
			Version:      config.GetVersion(),
			PatchVersion: []byte{config.GetPatchNumber()},
			Timestamp:    timestamp,
			TotalDistance: e.dataTimeReel.GetTotalDistance().FillBytes(
				make([]byte, 256),
			),
			ExternallyReachable: reachability,
		},
	}
	if err := e.publishMessage(e.infoFilter, list); err != nil {
		e.logger.Debug("error publishing data peer list announce", zap.Error(err))
	}
	return nil
}
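
// For reference, publishProof reads the following knobs from
// e.config.Engine.FramePublish. Field names are taken from the accesses
// above; the types are inferred from usage, and the concrete struct and
// on-disk config layout live in node/config and are not shown in this file:
//
//	Mode        string // "full", "fragmented", "dual", or "threshold"
//	Threshold   int    // serialized frame size at which "threshold" mode fragments
//	BallastSize int    // optional padding appended to the frame before publishing
//	Fragmentation.Algorithm                string // only "reed-solomon" is handled here
//	Fragmentation.ReedSolomon.DataShards   int
//	Fragmentation.ReedSolomon.ParityShards int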

// insertTxMessage wraps the given message in a hashed, addressed envelope and
// feeds it directly into the local tx processor channel, bypassing pubsub.
func (e *DataClockConsensusEngine) insertTxMessage(
	filter []byte,
	message proto.Message,
) error {
	a := &anypb.Any{}
	if err := a.MarshalFrom(message); err != nil {
		return errors.Wrap(err, "insert tx message")
	}

	// Rebrand the Any type URL under the Quilibrium domain.
	a.TypeUrl = strings.Replace(
		a.TypeUrl,
		"type.googleapis.com",
		"types.quilibrium.com",
		1,
	)

	payload, err := proto.Marshal(a)
	if err != nil {
		return errors.Wrap(err, "insert tx message")
	}

	h, err := poseidon.HashBytes(payload)
	if err != nil {
		return errors.Wrap(err, "insert tx message")
	}

	msg := &protobufs.Message{
		Hash:    h.Bytes(),
		Address: e.provingKeyAddress,
		Payload: payload,
	}
	data, err := proto.Marshal(msg)
	if err != nil {
		return errors.Wrap(err, "insert tx message")
	}

	m := &pb.Message{
		Data:    data,
		Bitmask: filter,
		From:    e.pubSub.GetPeerID(),
		Seqno:   nil,
	}
	select {
	case <-e.ctx.Done():
		return e.ctx.Err()
	case e.txMessageProcessorCh <- m:
	default:
		e.logger.Warn("dropping tx message")
	}
	return nil
}
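
// insertTxMessage and publishMessage build the same envelope; only the final
// step differs (local enqueue vs. pubsub publish). A shared builder along
// these lines would remove the duplication (a sketch only; this helper is
// not part of the original file):
//
//	func (e *DataClockConsensusEngine) wrapMessage(
//		message proto.Message,
//	) ([]byte, error) {
//		a := &anypb.Any{}
//		if err := a.MarshalFrom(message); err != nil {
//			return nil, errors.Wrap(err, "wrap message")
//		}
//		a.TypeUrl = strings.Replace(
//			a.TypeUrl, "type.googleapis.com", "types.quilibrium.com", 1,
//		)
//		payload, err := proto.Marshal(a)
//		if err != nil {
//			return nil, errors.Wrap(err, "wrap message")
//		}
//		h, err := poseidon.HashBytes(payload)
//		if err != nil {
//			return nil, errors.Wrap(err, "wrap message")
//		}
//		return proto.Marshal(&protobufs.Message{
//			Hash:    h.Bytes(),
//			Address: e.provingKeyAddress,
//			Payload: payload,
//		})
//	}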

// publishMessage wraps the given message in a hashed, addressed envelope and
// publishes it to the given bitmask filter over pubsub.
func (e *DataClockConsensusEngine) publishMessage(
	filter []byte,
	message proto.Message,
) error {
	a := &anypb.Any{}
	if err := a.MarshalFrom(message); err != nil {
		return errors.Wrap(err, "publish message")
	}

	// Rebrand the Any type URL under the Quilibrium domain.
	a.TypeUrl = strings.Replace(
		a.TypeUrl,
		"type.googleapis.com",
		"types.quilibrium.com",
		1,
	)

	payload, err := proto.Marshal(a)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}

	h, err := poseidon.HashBytes(payload)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}

	msg := &protobufs.Message{
		Hash:    h.Bytes(),
		Address: e.provingKeyAddress,
		Payload: payload,
	}
	data, err := proto.Marshal(msg)
	if err != nil {
		return errors.Wrap(err, "publish message")
	}
	return e.pubSub.PublishToBitmask(filter, data)
}
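
// Wire format recap: a payload published through publishMessage is nested as
//
//	proto.Message -> anypb.Any (TypeUrl rebranded to types.quilibrium.com)
//	              -> protobufs.Message{Hash: poseidon(payload), Address, Payload}
//	              -> raw bytes handed to pubSub.PublishToBitmask(filter, data)
//
// Receivers unwrap in the reverse order; the handlers at the top of this file
// only route the raw pb.Message to the appropriate processor channel.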