From 3dbe0723bd00e592bdb3432fedc84ef574b19e60 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Mon, 11 Nov 2024 21:10:00 +0100 Subject: [PATCH] Add message validators (#346) --- .../data/data_clock_consensus_engine.go | 10 ++ node/consensus/data/message_validators.go | 101 ++++++++++++++++++ node/consensus/data/token_handle_mint_test.go | 13 ++- node/p2p/blossomsub.go | 25 +++++ node/p2p/pubsub.go | 14 +++ 5 files changed, 159 insertions(+), 4 deletions(-) create mode 100644 node/consensus/data/message_validators.go diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index b4c65b0..8f5cdcc 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -302,6 +302,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error { go e.runInfoMessageHandler() e.logger.Info("subscribing to pubsub messages") + e.pubSub.RegisterValidator(e.frameFilter, e.validateFrameMessage) + e.pubSub.RegisterValidator(e.txFilter, e.validateTxMessage) + e.pubSub.RegisterValidator(e.infoFilter, e.validateInfoMessage) e.pubSub.Subscribe(e.frameFilter, e.handleFrameMessage) e.pubSub.Subscribe(e.txFilter, e.handleTxMessage) e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage) @@ -630,6 +633,13 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error { }(name) } + e.pubSub.Unsubscribe(e.frameFilter, false) + e.pubSub.Unsubscribe(e.txFilter, false) + e.pubSub.Unsubscribe(e.infoFilter, false) + e.pubSub.UnregisterValidator(e.frameFilter) + e.pubSub.UnregisterValidator(e.txFilter) + e.pubSub.UnregisterValidator(e.infoFilter) + e.logger.Info("waiting for execution engines to stop") wg.Wait() e.logger.Info("execution engines stopped") diff --git a/node/consensus/data/message_validators.go b/node/consensus/data/message_validators.go new file mode 100644 index 0000000..7b1eac6 --- /dev/null +++ b/node/consensus/data/message_validators.go @@ -0,0 +1,101 @@ +package data + +import ( + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" + "source.quilibrium.com/quilibrium/monorepo/node/p2p" + "source.quilibrium.com/quilibrium/monorepo/node/protobufs" +) + +func (e *DataClockConsensusEngine) validateFrameMessage(peerID peer.ID, message *pb.Message) p2p.ValidationResult { + msg := &protobufs.Message{} + if err := proto.Unmarshal(message.Data, msg); err != nil { + e.logger.Debug("could not unmarshal message", zap.Error(err)) + return p2p.ValidationResultReject + } + a := &anypb.Any{} + if err := proto.Unmarshal(msg.Payload, a); err != nil { + e.logger.Debug("could not unmarshal payload", zap.Error(err)) + return p2p.ValidationResultReject + } + switch a.TypeUrl { + case protobufs.ClockFrameType: + frame := &protobufs.ClockFrame{} + if err := proto.Unmarshal(a.Value, frame); err != nil { + e.logger.Debug("could not unmarshal frame", zap.Error(err)) + return p2p.ValidationResultReject + } + if ts := time.UnixMilli(frame.Timestamp); time.Since(ts) > time.Hour { + e.logger.Debug("frame is too old", zap.Time("timestamp", ts)) + return p2p.ValidationResultIgnore + } + return p2p.ValidationResultAccept + default: + e.logger.Debug("unknown message type", zap.String("type_url", a.TypeUrl)) + return p2p.ValidationResultReject + } +} + +func (e *DataClockConsensusEngine) validateTxMessage(peerID peer.ID, message *pb.Message) p2p.ValidationResult { + msg := &protobufs.Message{} + if err := proto.Unmarshal(message.Data, msg); err != nil { + e.logger.Debug("could not unmarshal message", zap.Error(err)) + return p2p.ValidationResultReject + } + a := &anypb.Any{} + if err := proto.Unmarshal(msg.Payload, a); err != nil { + e.logger.Debug("could not unmarshal payload", zap.Error(err)) + return p2p.ValidationResultReject + } + switch a.TypeUrl { + case protobufs.TokenRequestType: + tx := &protobufs.TokenRequest{} + if err := proto.Unmarshal(a.Value, tx); err != nil { + e.logger.Debug("could not unmarshal token request", zap.Error(err)) + return p2p.ValidationResultReject + } + // NOTE: There are no timestamps to be validated for token requests. + return p2p.ValidationResultAccept + default: + e.logger.Debug("unknown message type", zap.String("type_url", a.TypeUrl)) + return p2p.ValidationResultReject + } +} + +func (e *DataClockConsensusEngine) validateInfoMessage(peerID peer.ID, message *pb.Message) p2p.ValidationResult { + msg := &protobufs.Message{} + if err := proto.Unmarshal(message.Data, msg); err != nil { + e.logger.Debug("could not unmarshal message", zap.Error(err)) + return p2p.ValidationResultReject + } + a := &anypb.Any{} + if err := proto.Unmarshal(msg.Payload, a); err != nil { + e.logger.Debug("could not unmarshal payload", zap.Error(err)) + return p2p.ValidationResultReject + } + switch a.TypeUrl { + case protobufs.DataPeerListAnnounceType: + announce := &protobufs.DataPeerListAnnounce{} + if err := proto.Unmarshal(a.Value, announce); err != nil { + e.logger.Debug("could not unmarshal network info request", zap.Error(err)) + return p2p.ValidationResultReject + } + if announce.Peer == nil { + e.logger.Debug("peer list announce is missing peer") + return p2p.ValidationResultIgnore + } + if ts := time.UnixMilli(announce.Peer.Timestamp); time.Since(ts) > 10*time.Minute { + e.logger.Debug("peer list announce is too old", zap.Time("timestamp", ts)) + return p2p.ValidationResultIgnore + } + return p2p.ValidationResultAccept + default: + e.logger.Debug("unknown message type", zap.String("type_url", a.TypeUrl)) + return p2p.ValidationResultReject + } +} diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index ac2d39f..033221f 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -44,10 +44,14 @@ func (pubsub) Publish(address []byte, data []byte) error func (pubsub) PublishToBitmask(bitmask []byte, data []byte) error { return nil } func (pubsub) Subscribe(bitmask []byte, handler func(message *pb.Message) error) error { return nil } func (pubsub) Unsubscribe(bitmask []byte, raw bool) {} -func (pubsub) GetPeerID() []byte { return nil } -func (pubsub) GetPeerstoreCount() int { return 0 } -func (pubsub) GetNetworkPeersCount() int { return 0 } -func (pubsub) GetRandomPeer(bitmask []byte) ([]byte, error) { return nil, nil } +func (pubsub) RegisterValidator(bitmask []byte, validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult) error { + return nil +} +func (pubsub) UnregisterValidator(bitmask []byte) error { return nil } +func (pubsub) GetPeerID() []byte { return nil } +func (pubsub) GetPeerstoreCount() int { return 0 } +func (pubsub) GetNetworkPeersCount() int { return 0 } +func (pubsub) GetRandomPeer(bitmask []byte) ([]byte, error) { return nil, nil } func (pubsub) GetMultiaddrOfPeerStream(ctx context.Context, peerId []byte) <-chan multiaddr.Multiaddr { return nil } @@ -75,6 +79,7 @@ func (pubsub) SetPeerScore(peerId []byte, score int64) {} func (pubsub) AddPeerScore(peerId []byte, scoreDelta int64) {} func (pubsub) Reconnect(peerId []byte) error { return nil } func (pubsub) DiscoverPeers() error { return nil } +func (pubsub) GetNetwork() uint { return 0 } type outputs struct { difficulty uint32 diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 7a3a708..c4c74f8 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -623,6 +623,31 @@ func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) { bm.Close() } +func (b *BlossomSub) RegisterValidator( + bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult, +) error { + validatorEx := func( + ctx context.Context, peerID peer.ID, message *blossomsub.Message, + ) blossomsub.ValidationResult { + switch v := validator(peerID, message.Message); v { + case ValidationResultAccept: + return blossomsub.ValidationAccept + case ValidationResultReject: + return blossomsub.ValidationReject + case ValidationResultIgnore: + return blossomsub.ValidationIgnore + default: + panic("unreachable") + } + } + var _ blossomsub.ValidatorEx = validatorEx + return b.ps.RegisterBitmaskValidator(bitmask, validatorEx) +} + +func (b *BlossomSub) UnregisterValidator(bitmask []byte) error { + return b.ps.UnregisterBitmaskValidator(bitmask) +} + func (b *BlossomSub) GetPeerID() []byte { return []byte(b.peerID) } diff --git a/node/p2p/pubsub.go b/node/p2p/pubsub.go index fe70d34..939153d 100644 --- a/node/p2p/pubsub.go +++ b/node/p2p/pubsub.go @@ -3,17 +3,31 @@ package p2p import ( "context" + "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "google.golang.org/grpc" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) +type ValidationResult int + +const ( + ValidationResultAccept ValidationResult = iota + ValidationResultReject + ValidationResultIgnore +) + type PubSub interface { PublishToBitmask(bitmask []byte, data []byte) error Publish(address []byte, data []byte) error Subscribe(bitmask []byte, handler func(message *pb.Message) error) error Unsubscribe(bitmask []byte, raw bool) + RegisterValidator( + bitmask []byte, + validator func(peerID peer.ID, message *pb.Message) ValidationResult, + ) error + UnregisterValidator(bitmask []byte) error GetPeerID() []byte GetBitmaskPeers() map[string][]string GetPeerstoreCount() int