Add additional P2P configuration (#352)

* Add peer discovery configuration

* Add peer monitor configuration

* Add message validation configuration

---------

Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com>
petricadaipegsp 2024-11-17 00:54:34 +01:00 committed by GitHub
parent 80c7ec2889
commit 49566c2280
7 changed files with 205 additions and 116 deletions

View File

@@ -11,9 +11,12 @@ import (
)
const (
defaultValidateQueueSize = 32
defaultValidateConcurrency = 1024
defaultValidateThrottle = 8192
// DefaultValidateQueueSize is the default size of the validation queue.
DefaultValidateQueueSize = 16384
// DefaultValidateConcurrency is the default number of concurrent instances of a validator per bitmask.
DefaultValidateConcurrency = 1024
// DefaultValidateThrottle is the default number of concurrent instances of all validators.
DefaultValidateThrottle = 8192
)
// ValidationError is an error that may be signalled from message publication when the message
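These limits size buffered channels that act as counting semaphores: validateQ queues incoming validation requests, while the throttle channels bound how many validations may run at once (per validator via DefaultValidateConcurrency, across all validators via DefaultValidateThrottle). A minimal sketch of that pattern, with illustrative names and a deliberately tiny capacity, not the library's actual internals:

package main

import "fmt"

func main() {
	// A buffered channel as a counting semaphore: a successful send
	// acquires a slot, a receive releases one.
	throttle := make(chan struct{}, 2) // tiny capacity for demonstration

	tryAcquire := func() bool {
		select {
		case throttle <- struct{}{}:
			return true
		default:
			return false // semaphore full: the message would be throttled
		}
	}

	for i := 0; i < 4; i++ {
		if tryAcquire() {
			fmt.Println("validation slot acquired for message", i)
		} else {
			fmt.Println("message", i, "throttled")
		}
	}
	// After a validation completes, its slot is released with: <-throttle
}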
@@ -121,8 +124,8 @@ type rmValReq struct {
func newValidation() *validation {
return &validation{
bitmaskVals: make(map[string]*validatorImpl),
validateQ: make(chan *validateReq, defaultValidateQueueSize),
validateThrottle: make(chan struct{}, defaultValidateThrottle),
validateQ: make(chan *validateReq, DefaultValidateQueueSize),
validateThrottle: make(chan struct{}, DefaultValidateThrottle),
validateWorkers: runtime.NumCPU(),
}
}
@@ -196,7 +199,7 @@ func (v *validation) makeValidator(req *addValReq) (*validatorImpl, error) {
bitmask: req.bitmask,
validate: validator,
validateTimeout: 0,
validateThrottle: make(chan struct{}, defaultValidateConcurrency),
validateThrottle: make(chan struct{}, DefaultValidateConcurrency),
validateInline: req.inline,
}

View File

@@ -35,10 +35,18 @@ type P2PConfig struct {
ListenMultiaddr string `yaml:"listenMultiaddr"`
PeerPrivKey string `yaml:"peerPrivKey"`
TraceLogFile string `yaml:"traceLogFile"`
MinPeers int `yaml:"minPeers"`
Network uint8 `yaml:"network"`
LowWatermarkConnections uint `yaml:"lowWatermarkConnections"`
HighWatermarkConnections uint `yaml:"highWatermarkConnections"`
LowWatermarkConnections int `yaml:"lowWatermarkConnections"`
HighWatermarkConnections int `yaml:"highWatermarkConnections"`
DirectPeers []string `yaml:"directPeers"`
GrpcServerRateLimit int `yaml:"grpcServerRateLimit"`
MinBootstrapPeers int `yaml:"minBootstrapPeers"`
BootstrapParallelism int `yaml:"bootstrapParallelism"`
DiscoveryParallelism int `yaml:"discoveryParallelism"`
DiscoveryPeerLookupLimit int `yaml:"discoveryPeerLookupLimit"`
PingTimeout time.Duration `yaml:"pingTimeout"`
PingPeriod time.Duration `yaml:"pingPeriod"`
PingAttempts int `yaml:"pingAttempts"`
ValidateQueueSize int `yaml:"validateQueueSize"`
ValidateWorkers int `yaml:"validateWorkers"`
}
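All of the new fields are optional: zero values are filled in by withDefaults further down. A hedged sketch of setting the new knobs programmatically, where every value is hypothetical and simply mirrors the defaults:

cfg := &config.P2PConfig{
	ListenMultiaddr:          "/ip4/0.0.0.0/tcp/8336", // hypothetical address
	MinBootstrapPeers:        3,
	BootstrapParallelism:     10,
	DiscoveryParallelism:     50,
	DiscoveryPeerLookupLimit: 1000,
	PingTimeout:              5 * time.Second,
	PingPeriod:               30 * time.Second,
	PingAttempts:             3,
	ValidateQueueSize:        16384,
	ValidateWorkers:          0, // zero falls back to runtime.NumCPU()
}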

View File

@@ -315,9 +315,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
go e.runInfoMessageHandler()
e.logger.Info("subscribing to pubsub messages")
e.pubSub.RegisterValidator(e.frameFilter, e.validateFrameMessage)
e.pubSub.RegisterValidator(e.txFilter, e.validateTxMessage)
e.pubSub.RegisterValidator(e.infoFilter, e.validateInfoMessage)
e.pubSub.RegisterValidator(e.frameFilter, e.validateFrameMessage, true)
e.pubSub.RegisterValidator(e.txFilter, e.validateTxMessage, true)
e.pubSub.RegisterValidator(e.infoFilter, e.validateInfoMessage, true)
e.pubSub.Subscribe(e.frameFilter, e.handleFrameMessage)
e.pubSub.Subscribe(e.txFilter, e.handleTxMessage)
e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage)

View File

@@ -44,7 +44,7 @@ func (pubsub) Publish(address []byte, data []byte) error
func (pubsub) PublishToBitmask(bitmask []byte, data []byte) error { return nil }
func (pubsub) Subscribe(bitmask []byte, handler func(message *pb.Message) error) error { return nil }
func (pubsub) Unsubscribe(bitmask []byte, raw bool) {}
func (pubsub) RegisterValidator(bitmask []byte, validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult) error {
func (pubsub) RegisterValidator(bitmask []byte, validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult, sync bool) error {
return nil
}
func (pubsub) UnregisterValidator(bitmask []byte) error { return nil }

View File

@@ -12,6 +12,7 @@ import (
"math/bits"
"net"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
@@ -32,7 +33,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/mr-tron/base58"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
@@ -51,12 +51,19 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
// The default watermarks are the defaults used by libp2p.DefaultConnectionManager.
// We explicitly set them here in order to force internal consistency between the
// connection manager and the resource manager.
const (
minPeersPerBitmask = 4
minBootstrapPeers = 3
discoveryParallelism = 10
discoveryPeerLimit = 1000
bootstrapParallelism = 10
defaultLowWatermarkConnections = 160
defaultHighWatermarkConnections = 192
defaultMinBootstrapPeers = 3
defaultBootstrapParallelism = 10
defaultDiscoveryParallelism = 50
defaultDiscoveryPeerLookupLimit = 1000
defaultPingTimeout = 5 * time.Second
defaultPingPeriod = 30 * time.Second
defaultPingAttempts = 3
)
type BlossomSub struct {
@@ -177,6 +184,7 @@ func NewBlossomSub(
logger *zap.Logger,
) *BlossomSub {
ctx := context.Background()
p2pConfig = withDefaults(p2pConfig)
opts := []libp2pconfig.Option{
libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr),
@@ -260,11 +268,11 @@
}
allowedPeers = append(allowedPeers, directPeers...)
if p2pConfig.LowWatermarkConnections != 0 &&
p2pConfig.HighWatermarkConnections != 0 {
if p2pConfig.LowWatermarkConnections != -1 &&
p2pConfig.HighWatermarkConnections != -1 {
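// Note: withDefaults has already replaced zero watermarks with the libp2p
// defaults, so -1 is the explicit opt-out that disables the connection manager.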
cm, err := connmgr.NewConnManager(
int(p2pConfig.LowWatermarkConnections),
int(p2pConfig.HighWatermarkConnections),
p2pConfig.LowWatermarkConnections,
p2pConfig.HighWatermarkConnections,
connmgr.WithEmergencyTrim(true),
)
if err != nil {
@@ -321,17 +329,15 @@
util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
verifyReachability(p2pConfig)
minBootstraps := minBootstrapPeers
if p2pConfig.Network != 0 {
minBootstraps = 1
}
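// Clamp so we never wait on more bootstrap peers than are actually configured.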
minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
bootstrap := internal.NewPeerConnector(
ctx,
logger.Named("bootstrap"),
h,
idService,
minBootstraps,
bootstrapParallelism,
minBootstrapPeers,
p2pConfig.BootstrapParallelism,
internal.NewStaticPeerSource(bootstrappers, true),
)
if err := bootstrap.Connect(ctx); err != nil {
@@ -341,7 +347,7 @@
ctx,
internal.NewNotEnoughPeersCondition(
h,
minBootstraps,
minBootstrapPeers,
internal.PeerAddrInfosToPeerIDMap(bootstrappers),
),
bootstrap,
@@ -352,12 +358,12 @@
logger.Named("discovery"),
h,
idService,
minPeersPerBitmask,
discoveryParallelism,
p2pConfig.D,
p2pConfig.DiscoveryParallelism,
internal.NewRoutingDiscoveryPeerSource(
routingDiscovery,
getNetworkNamespace(p2pConfig.Network),
discoveryPeerLimit,
p2pConfig.DiscoveryPeerLookupLimit,
),
)
if err := discovery.Connect(ctx); err != nil {
@@ -366,7 +372,14 @@
discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
bs.discovery = discovery
go monitorPeers(ctx, logger, h)
internal.MonitorPeers(
ctx,
logger.Named("peer-monitor"),
h,
p2pConfig.PingTimeout,
p2pConfig.PingPeriod,
p2pConfig.PingAttempts,
)
// TODO: turn into an option flag for console logging, this is too noisy for
// default logging behavior
@@ -420,6 +433,10 @@
OpportunisticGraftThreshold: 2,
},
))
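// Plumb the validation pipeline sizing from the node config into BlossomSub.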
blossomOpts = append(blossomOpts,
blossomsub.WithValidateQueueSize(p2pConfig.ValidateQueueSize),
blossomsub.WithValidateWorkers(p2pConfig.ValidateWorkers),
)
blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer())
blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(internal.NewStaticPeerFilter(
// We filter out the bootstrap peers explicitly from BlossomSub
@@ -432,7 +449,7 @@
true,
)))
params := mergeDefaults(p2pConfig)
params := toBlossomSubParams(p2pConfig)
rt := blossomsub.NewBlossomSubRouter(h, params, bs.network)
blossomOpts = append(blossomOpts, rt.WithDefaultTagTracer())
pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...)
@@ -446,22 +463,11 @@
bs.h = h
bs.signKey = privKey
allowedPeerIDs := make(map[peer.ID]struct{}, len(allowedPeers))
for _, peerInfo := range allowedPeers {
allowedPeerIDs[peerInfo.ID] = struct{}{}
}
go func() {
for {
time.Sleep(30 * time.Second)
for _, b := range bs.bitmaskMap {
bitmaskPeers := b.ListPeers()
peerCount := len(bitmaskPeers)
for _, p := range bitmaskPeers {
if _, ok := allowedPeerIDs[p]; ok {
peerCount--
}
}
if peerCount < minPeersPerBitmask {
for _, mask := range pubsub.GetBitmasks() {
if !rt.EnoughPeers([]byte(mask), 0) {
_ = discovery.Connect(ctx)
break
}
@@ -474,7 +480,7 @@
// adjusted from Lotus' reference implementation, addressing
// https://github.com/libp2p/go-libp2p/issues/1640
func resourceManager(highWatermark uint, allowed []peer.AddrInfo) (
func resourceManager(highWatermark int, allowed []peer.AddrInfo) (
network.ResourceManager,
error,
) {
@@ -639,7 +645,7 @@ func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) {
}
func (b *BlossomSub) RegisterValidator(
bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult,
bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult, sync bool,
) error {
validatorEx := func(
ctx context.Context, peerID peer.ID, message *blossomsub.Message,
@@ -656,7 +662,7 @@ func (b *BlossomSub) RegisterValidator(
}
}
var _ blossomsub.ValidatorEx = validatorEx
return b.ps.RegisterBitmaskValidator(bitmask, validatorEx)
return b.ps.RegisterBitmaskValidator(bitmask, validatorEx, blossomsub.WithValidatorInline(sync))
}
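A hedged usage sketch of the extended signature; the bitmask variable, validator body, and result constant names are assumptions for illustration only:

// Hypothetical caller, mirroring how the consensus engine registers its
// validators with sync=true above.
err := bs.RegisterValidator(
	frameFilter, // a bitmask this node subscribes to
	func(peerID peer.ID, message *pb.Message) ValidationResult {
		if len(message.Data) == 0 {
			return ValidationResultReject // assumed constant name
		}
		return ValidationResultAccept // assumed constant name
	},
	true, // run the validator inline rather than on the async validation queue
)
if err != nil {
	// handle registration failure
}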
func (b *BlossomSub) UnregisterValidator(bitmask []byte) error {
@@ -685,70 +691,6 @@ func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
return []byte(peers[sel.Int64()]), nil
}
// monitorPeers periodically looks up the peers connected to the host and pings them
// up to 3 times to ensure they are still reachable. If the peer is not reachable after
// 3 attempts, the connections to the peer are closed.
func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) {
const timeout, period, attempts = 20 * time.Second, time.Minute, 3
// Do not allow the pings to dial new connections. Adding new peers is a separate
// process and should not be done during the ping process.
ctx = network.WithNoDial(ctx, "monitor peers")
pingOnce := func(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
pingCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
select {
case <-ctx.Done():
case <-pingCtx.Done():
logger.Debug("ping timeout")
return false
case res := <-ping.Ping(pingCtx, h, id):
if res.Error != nil {
logger.Debug("ping error", zap.Error(res.Error))
return false
}
logger.Debug("ping success", zap.Duration("rtt", res.RTT))
}
return true
}
ping := func(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) {
defer wg.Done()
var conns []network.Conn
for i := 0; i < attempts; i++ {
// There are no fine grained semantics in libp2p that would allow us to 'ping via
// a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection.
// As such, we save a snapshot of the connections that were potentially in use before
// the ping, and close them if the ping fails. If new connections occur between the snapshot
// and the ping, they will not be closed, and will be pinged in the next iteration.
conns = h.Network().ConnsToPeer(id)
if pingOnce(ctx, logger, id) {
return
}
}
for _, conn := range conns {
_ = conn.Close()
}
}
for {
select {
case <-ctx.Done():
return
case <-time.After(period):
// This is once again a snapshot of the connected peers at the time of the ping. If new peers
// are added between the snapshot and the ping, they will be pinged in the next iteration.
peers := h.Network().Peers()
logger.Debug("pinging connected peers", zap.Int("peer_count", len(peers)))
wg := &sync.WaitGroup{}
for _, id := range peers {
logger := logger.With(zap.String("peer_id", id.String()))
wg.Add(1)
go ping(ctx, logger, wg, id)
}
wg.Wait()
logger.Debug("pinged connected peers")
}
}
}
func initDHT(
ctx context.Context,
logger *zap.Logger,
@@ -1059,7 +1001,9 @@ func verifyReachability(cfg *config.P2PConfig) bool {
return true
}
func mergeDefaults(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams {
func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig {
cfg := *p2pConfig
p2pConfig = &cfg
if p2pConfig.D == 0 {
p2pConfig.D = blossomsub.BlossomSubD
}
@@ -1141,7 +1085,43 @@ func mergeDefaults(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams {
if p2pConfig.IWantFollowupTime == 0 {
p2pConfig.IWantFollowupTime = blossomsub.BlossomSubIWantFollowupTime
}
if p2pConfig.LowWatermarkConnections == 0 {
p2pConfig.LowWatermarkConnections = defaultLowWatermarkConnections
}
if p2pConfig.HighWatermarkConnections == 0 {
p2pConfig.HighWatermarkConnections = defaultHighWatermarkConnections
}
if p2pConfig.MinBootstrapPeers == 0 {
p2pConfig.MinBootstrapPeers = defaultMinBootstrapPeers
}
if p2pConfig.BootstrapParallelism == 0 {
p2pConfig.BootstrapParallelism = defaultBootstrapParallelism
}
if p2pConfig.DiscoveryParallelism == 0 {
p2pConfig.DiscoveryParallelism = defaultDiscoveryParallelism
}
if p2pConfig.DiscoveryPeerLookupLimit == 0 {
p2pConfig.DiscoveryPeerLookupLimit = defaultDiscoveryPeerLookupLimit
}
if p2pConfig.PingTimeout == 0 {
p2pConfig.PingTimeout = defaultPingTimeout
}
if p2pConfig.PingPeriod == 0 {
p2pConfig.PingPeriod = defaultPingPeriod
}
if p2pConfig.PingAttempts == 0 {
p2pConfig.PingAttempts = defaultPingAttempts
}
if p2pConfig.ValidateQueueSize == 0 {
p2pConfig.ValidateQueueSize = blossomsub.DefaultValidateQueueSize
}
if p2pConfig.ValidateWorkers == 0 {
p2pConfig.ValidateWorkers = runtime.NumCPU()
}
return p2pConfig
}
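Because withDefaults mutates a copy, the caller's struct is left untouched and only the returned config is populated. A small illustration, with values taken from the defaults above:

base := &config.P2PConfig{}    // zero-valued config
cfg := withDefaults(base)      // returns a populated copy
fmt.Println(base.PingAttempts) // 0: the caller's struct is unchanged
fmt.Println(cfg.PingAttempts)  // 3: defaultPingAttempts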
func toBlossomSubParams(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams {
return blossomsub.BlossomSubParams{
D: p2pConfig.D,
Dlo: p2pConfig.DLo,

View File

@@ -0,0 +1,97 @@
package internal
import (
"context"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"go.uber.org/zap"
)
type peerMonitor struct {
h host.Host
timeout time.Duration
period time.Duration
attempts int
}
func (pm *peerMonitor) pingOnce(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
pingCtx, cancel := context.WithTimeout(ctx, pm.timeout)
defer cancel()
select {
case <-ctx.Done():
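// Parent context cancelled: treat it as success so that shutdown does
// not cause the caller to close the peer's connections.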
case <-pingCtx.Done():
logger.Debug("ping timeout")
return false
case res := <-ping.Ping(pingCtx, pm.h, id):
if res.Error != nil {
logger.Debug("ping error", zap.Error(res.Error))
return false
}
logger.Debug("ping success", zap.Duration("rtt", res.RTT))
}
return true
}
func (pm *peerMonitor) ping(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) {
defer wg.Done()
var conns []network.Conn
for i := 0; i < pm.attempts; i++ {
// There are no fine-grained semantics in libp2p that would allow us to 'ping via
// a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection.
// As such, we save a snapshot of the connections that were potentially in use before
// the ping, and close them if the ping fails. Connections opened between the snapshot
// and the ping will not be closed here; they will be covered by the next iteration.
conns = pm.h.Network().ConnsToPeer(id)
if pm.pingOnce(ctx, logger, id) {
return
}
}
for _, conn := range conns {
_ = conn.Close()
}
}
func (pm *peerMonitor) run(ctx context.Context, logger *zap.Logger) {
// Do not allow the pings to dial new connections. Adding new peers is a separate
// process and should not be done during the ping process.
ctx = network.WithNoDial(ctx, "monitor peers")
for {
select {
case <-ctx.Done():
return
case <-time.After(pm.period):
// This is once again a snapshot of the connected peers at the time of the ping. If new peers
// are added between the snapshot and the ping, they will be pinged in the next iteration.
peers := pm.h.Network().Peers()
logger.Debug("pinging connected peers", zap.Int("peer_count", len(peers)))
wg := &sync.WaitGroup{}
for _, id := range peers {
logger := logger.With(zap.String("peer_id", id.String()))
wg.Add(1)
go pm.ping(ctx, logger, wg, id)
}
wg.Wait()
logger.Debug("pinged connected peers")
}
}
}
// MonitorPeers periodically looks up the peers connected to the host and pings them
// to ensure they are still reachable. If a peer remains unreachable after the
// configured number of attempts, the connections to that peer are closed.
func MonitorPeers(
ctx context.Context, logger *zap.Logger, h host.Host, timeout, period time.Duration, attempts int,
) {
pm := &peerMonitor{
h: h,
timeout: timeout,
period: period,
attempts: attempts,
}
go pm.run(ctx, logger)
}

View File

@@ -26,6 +26,7 @@ type PubSub interface {
RegisterValidator(
bitmask []byte,
validator func(peerID peer.ID, message *pb.Message) ValidationResult,
sync bool,
) error
UnregisterValidator(bitmask []byte) error
GetPeerID() []byte