Add message validators (#346)

This commit is contained in:
petricadaipegsp 2024-11-11 21:10:00 +01:00 committed by GitHub
parent 26cbb2092e
commit 3dbe0723bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 159 additions and 4 deletions

View File

@ -302,6 +302,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
go e.runInfoMessageHandler()
e.logger.Info("subscribing to pubsub messages")
e.pubSub.RegisterValidator(e.frameFilter, e.validateFrameMessage)
e.pubSub.RegisterValidator(e.txFilter, e.validateTxMessage)
e.pubSub.RegisterValidator(e.infoFilter, e.validateInfoMessage)
e.pubSub.Subscribe(e.frameFilter, e.handleFrameMessage)
e.pubSub.Subscribe(e.txFilter, e.handleTxMessage)
e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage)
@ -630,6 +633,13 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
}(name)
}
e.pubSub.Unsubscribe(e.frameFilter, false)
e.pubSub.Unsubscribe(e.txFilter, false)
e.pubSub.Unsubscribe(e.infoFilter, false)
e.pubSub.UnregisterValidator(e.frameFilter)
e.pubSub.UnregisterValidator(e.txFilter)
e.pubSub.UnregisterValidator(e.infoFilter)
e.logger.Info("waiting for execution engines to stop")
wg.Wait()
e.logger.Info("execution engines stopped")

View File

@ -0,0 +1,101 @@
package data
import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (e *DataClockConsensusEngine) validateFrameMessage(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
msg := &protobufs.Message{}
if err := proto.Unmarshal(message.Data, msg); err != nil {
e.logger.Debug("could not unmarshal message", zap.Error(err))
return p2p.ValidationResultReject
}
a := &anypb.Any{}
if err := proto.Unmarshal(msg.Payload, a); err != nil {
e.logger.Debug("could not unmarshal payload", zap.Error(err))
return p2p.ValidationResultReject
}
switch a.TypeUrl {
case protobufs.ClockFrameType:
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(a.Value, frame); err != nil {
e.logger.Debug("could not unmarshal frame", zap.Error(err))
return p2p.ValidationResultReject
}
if ts := time.UnixMilli(frame.Timestamp); time.Since(ts) > time.Hour {
e.logger.Debug("frame is too old", zap.Time("timestamp", ts))
return p2p.ValidationResultIgnore
}
return p2p.ValidationResultAccept
default:
e.logger.Debug("unknown message type", zap.String("type_url", a.TypeUrl))
return p2p.ValidationResultReject
}
}
func (e *DataClockConsensusEngine) validateTxMessage(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
msg := &protobufs.Message{}
if err := proto.Unmarshal(message.Data, msg); err != nil {
e.logger.Debug("could not unmarshal message", zap.Error(err))
return p2p.ValidationResultReject
}
a := &anypb.Any{}
if err := proto.Unmarshal(msg.Payload, a); err != nil {
e.logger.Debug("could not unmarshal payload", zap.Error(err))
return p2p.ValidationResultReject
}
switch a.TypeUrl {
case protobufs.TokenRequestType:
tx := &protobufs.TokenRequest{}
if err := proto.Unmarshal(a.Value, tx); err != nil {
e.logger.Debug("could not unmarshal token request", zap.Error(err))
return p2p.ValidationResultReject
}
// NOTE: There are no timestamps to be validated for token requests.
return p2p.ValidationResultAccept
default:
e.logger.Debug("unknown message type", zap.String("type_url", a.TypeUrl))
return p2p.ValidationResultReject
}
}
func (e *DataClockConsensusEngine) validateInfoMessage(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
msg := &protobufs.Message{}
if err := proto.Unmarshal(message.Data, msg); err != nil {
e.logger.Debug("could not unmarshal message", zap.Error(err))
return p2p.ValidationResultReject
}
a := &anypb.Any{}
if err := proto.Unmarshal(msg.Payload, a); err != nil {
e.logger.Debug("could not unmarshal payload", zap.Error(err))
return p2p.ValidationResultReject
}
switch a.TypeUrl {
case protobufs.DataPeerListAnnounceType:
announce := &protobufs.DataPeerListAnnounce{}
if err := proto.Unmarshal(a.Value, announce); err != nil {
e.logger.Debug("could not unmarshal network info request", zap.Error(err))
return p2p.ValidationResultReject
}
if announce.Peer == nil {
e.logger.Debug("peer list announce is missing peer")
return p2p.ValidationResultIgnore
}
if ts := time.UnixMilli(announce.Peer.Timestamp); time.Since(ts) > 10*time.Minute {
e.logger.Debug("peer list announce is too old", zap.Time("timestamp", ts))
return p2p.ValidationResultIgnore
}
return p2p.ValidationResultAccept
default:
e.logger.Debug("unknown message type", zap.String("type_url", a.TypeUrl))
return p2p.ValidationResultReject
}
}

View File

@ -44,10 +44,14 @@ func (pubsub) Publish(address []byte, data []byte) error
func (pubsub) PublishToBitmask(bitmask []byte, data []byte) error { return nil }
func (pubsub) Subscribe(bitmask []byte, handler func(message *pb.Message) error) error { return nil }
func (pubsub) Unsubscribe(bitmask []byte, raw bool) {}
func (pubsub) GetPeerID() []byte { return nil }
func (pubsub) GetPeerstoreCount() int { return 0 }
func (pubsub) GetNetworkPeersCount() int { return 0 }
func (pubsub) GetRandomPeer(bitmask []byte) ([]byte, error) { return nil, nil }
func (pubsub) RegisterValidator(bitmask []byte, validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult) error {
return nil
}
func (pubsub) UnregisterValidator(bitmask []byte) error { return nil }
func (pubsub) GetPeerID() []byte { return nil }
func (pubsub) GetPeerstoreCount() int { return 0 }
func (pubsub) GetNetworkPeersCount() int { return 0 }
func (pubsub) GetRandomPeer(bitmask []byte) ([]byte, error) { return nil, nil }
func (pubsub) GetMultiaddrOfPeerStream(ctx context.Context, peerId []byte) <-chan multiaddr.Multiaddr {
return nil
}
@ -75,6 +79,7 @@ func (pubsub) SetPeerScore(peerId []byte, score int64) {}
func (pubsub) AddPeerScore(peerId []byte, scoreDelta int64) {}
func (pubsub) Reconnect(peerId []byte) error { return nil }
func (pubsub) DiscoverPeers() error { return nil }
func (pubsub) GetNetwork() uint { return 0 }
type outputs struct {
difficulty uint32

View File

@ -623,6 +623,31 @@ func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
bm.Close()
}
func (b *BlossomSub) RegisterValidator(
bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult,
) error {
validatorEx := func(
ctx context.Context, peerID peer.ID, message *blossomsub.Message,
) blossomsub.ValidationResult {
switch v := validator(peerID, message.Message); v {
case ValidationResultAccept:
return blossomsub.ValidationAccept
case ValidationResultReject:
return blossomsub.ValidationReject
case ValidationResultIgnore:
return blossomsub.ValidationIgnore
default:
panic("unreachable")
}
}
var _ blossomsub.ValidatorEx = validatorEx
return b.ps.RegisterBitmaskValidator(bitmask, validatorEx)
}
func (b *BlossomSub) UnregisterValidator(bitmask []byte) error {
return b.ps.UnregisterBitmaskValidator(bitmask)
}
func (b *BlossomSub) GetPeerID() []byte {
return []byte(b.peerID)
}

View File

@ -3,17 +3,31 @@ package p2p
import (
"context"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"google.golang.org/grpc"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
type ValidationResult int
const (
ValidationResultAccept ValidationResult = iota
ValidationResultReject
ValidationResultIgnore
)
type PubSub interface {
PublishToBitmask(bitmask []byte, data []byte) error
Publish(address []byte, data []byte) error
Subscribe(bitmask []byte, handler func(message *pb.Message) error) error
Unsubscribe(bitmask []byte, raw bool)
RegisterValidator(
bitmask []byte,
validator func(peerID peer.ID, message *pb.Message) ValidationResult,
) error
UnregisterValidator(bitmask []byte) error
GetPeerID() []byte
GetBitmaskPeers() map[string][]string
GetPeerstoreCount() int