From 49566c2280ade4c4ec2b7e9e9715fc62055eba90 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 17 Nov 2024 00:54:34 +0100 Subject: [PATCH] Add additional P2P configuration (#352) * Add peer discovery configuration * Add peer monitor configuration * Add message validation configuration --------- Co-authored-by: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com> --- go-libp2p-blossomsub/validation.go | 15 +- node/config/p2p.go | 14 +- .../data/data_clock_consensus_engine.go | 6 +- node/consensus/data/token_handle_mint_test.go | 2 +- node/p2p/blossomsub.go | 186 ++++++++---------- node/p2p/internal/peer_monitor.go | 97 +++++++++ node/p2p/pubsub.go | 1 + 7 files changed, 205 insertions(+), 116 deletions(-) create mode 100644 node/p2p/internal/peer_monitor.go diff --git a/go-libp2p-blossomsub/validation.go b/go-libp2p-blossomsub/validation.go index bfcd49e..4602df1 100644 --- a/go-libp2p-blossomsub/validation.go +++ b/go-libp2p-blossomsub/validation.go @@ -11,9 +11,12 @@ import ( ) const ( - defaultValidateQueueSize = 32 - defaultValidateConcurrency = 1024 - defaultValidateThrottle = 8192 + // DefaultValidateQueueSize is the default size of the validation queue. + DefaultValidateQueueSize = 16384 + // DefaultValidateConcurrency is the default number of concurrent instances of a validator per bitmask. + DefaultValidateConcurrency = 1024 + // DefaultValidateThrottle is the default number of concurrent instances of all validators. + DefaultValidateThrottle = 8192 ) // ValidationError is an error that may be signalled from message publication when the message @@ -121,8 +124,8 @@ type rmValReq struct { func newValidation() *validation { return &validation{ bitmaskVals: make(map[string]*validatorImpl), - validateQ: make(chan *validateReq, defaultValidateQueueSize), - validateThrottle: make(chan struct{}, defaultValidateThrottle), + validateQ: make(chan *validateReq, DefaultValidateQueueSize), + validateThrottle: make(chan struct{}, DefaultValidateThrottle), validateWorkers: runtime.NumCPU(), } } @@ -196,7 +199,7 @@ func (v *validation) makeValidator(req *addValReq) (*validatorImpl, error) { bitmask: req.bitmask, validate: validator, validateTimeout: 0, - validateThrottle: make(chan struct{}, defaultValidateConcurrency), + validateThrottle: make(chan struct{}, DefaultValidateConcurrency), validateInline: req.inline, } diff --git a/node/config/p2p.go b/node/config/p2p.go index 95384ec..efddceb 100644 --- a/node/config/p2p.go +++ b/node/config/p2p.go @@ -35,10 +35,18 @@ type P2PConfig struct { ListenMultiaddr string `yaml:"listenMultiaddr"` PeerPrivKey string `yaml:"peerPrivKey"` TraceLogFile string `yaml:"traceLogFile"` - MinPeers int `yaml:"minPeers"` Network uint8 `yaml:"network"` - LowWatermarkConnections uint `yaml:"lowWatermarkConnections"` - HighWatermarkConnections uint `yaml:"highWatermarkConnections"` + LowWatermarkConnections int `yaml:"lowWatermarkConnections"` + HighWatermarkConnections int `yaml:"highWatermarkConnections"` DirectPeers []string `yaml:"directPeers"` GrpcServerRateLimit int `yaml:"grpcServerRateLimit"` + MinBootstrapPeers int `yaml:"minBootstrapPeers"` + BootstrapParallelism int `yaml:"bootstrapParallelism"` + DiscoveryParallelism int `yaml:"discoveryParallelism"` + DiscoveryPeerLookupLimit int `yaml:"discoveryPeerLookupLimit"` + PingTimeout time.Duration `yaml:"pingTimeout"` + PingPeriod time.Duration `yaml:"pingPeriod"` + PingAttempts int `yaml:"pingAttempts"` + ValidateQueueSize int 
`yaml:"validateQueueSize"` + ValidateWorkers int `yaml:"validateWorkers"` } diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 8e2b360..31c7874 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -315,9 +315,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error { go e.runInfoMessageHandler() e.logger.Info("subscribing to pubsub messages") - e.pubSub.RegisterValidator(e.frameFilter, e.validateFrameMessage) - e.pubSub.RegisterValidator(e.txFilter, e.validateTxMessage) - e.pubSub.RegisterValidator(e.infoFilter, e.validateInfoMessage) + e.pubSub.RegisterValidator(e.frameFilter, e.validateFrameMessage, true) + e.pubSub.RegisterValidator(e.txFilter, e.validateTxMessage, true) + e.pubSub.RegisterValidator(e.infoFilter, e.validateInfoMessage, true) e.pubSub.Subscribe(e.frameFilter, e.handleFrameMessage) e.pubSub.Subscribe(e.txFilter, e.handleTxMessage) e.pubSub.Subscribe(e.infoFilter, e.handleInfoMessage) diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index 8a12ee4..26a073e 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -44,7 +44,7 @@ func (pubsub) Publish(address []byte, data []byte) error func (pubsub) PublishToBitmask(bitmask []byte, data []byte) error { return nil } func (pubsub) Subscribe(bitmask []byte, handler func(message *pb.Message) error) error { return nil } func (pubsub) Unsubscribe(bitmask []byte, raw bool) {} -func (pubsub) RegisterValidator(bitmask []byte, validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult) error { +func (pubsub) RegisterValidator(bitmask []byte, validator func(peerID peer.ID, message *pb.Message) p2p.ValidationResult, sync bool) error { return nil } func (pubsub) UnregisterValidator(bitmask []byte) error { return nil } diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 078d1dc..aa637fd 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -12,6 +12,7 @@ import ( "math/bits" "net" "net/http" + "runtime" "strconv" "strings" "sync" @@ -32,7 +33,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/net/gostream" "github.com/libp2p/go-libp2p/p2p/net/swarm" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/mr-tron/base58" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" @@ -51,12 +51,19 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) +// The default watermarks are the defaults used by libp2p.DefaultConnectionManager. +// We explicitly set them here in order to force internal consistency between the +// connection manager and the resource manager. 
const ( - minPeersPerBitmask = 4 - minBootstrapPeers = 3 - discoveryParallelism = 10 - discoveryPeerLimit = 1000 - bootstrapParallelism = 10 + defaultLowWatermarkConnections = 160 + defaultHighWatermarkConnections = 192 + defaultMinBootstrapPeers = 3 + defaultBootstrapParallelism = 10 + defaultDiscoveryParallelism = 50 + defaultDiscoveryPeerLookupLimit = 1000 + defaultPingTimeout = 5 * time.Second + defaultPingPeriod = 30 * time.Second + defaultPingAttempts = 3 ) type BlossomSub struct { @@ -177,6 +184,7 @@ func NewBlossomSub( logger *zap.Logger, ) *BlossomSub { ctx := context.Background() + p2pConfig = withDefaults(p2pConfig) opts := []libp2pconfig.Option{ libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr), @@ -260,11 +268,11 @@ func NewBlossomSub( } allowedPeers = append(allowedPeers, directPeers...) - if p2pConfig.LowWatermarkConnections != 0 && - p2pConfig.HighWatermarkConnections != 0 { + if p2pConfig.LowWatermarkConnections != -1 && + p2pConfig.HighWatermarkConnections != -1 { cm, err := connmgr.NewConnManager( - int(p2pConfig.LowWatermarkConnections), - int(p2pConfig.HighWatermarkConnections), + p2pConfig.LowWatermarkConnections, + p2pConfig.HighWatermarkConnections, connmgr.WithEmergencyTrim(true), ) if err != nil { @@ -321,17 +329,15 @@ func NewBlossomSub( util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network)) verifyReachability(p2pConfig) - minBootstraps := minBootstrapPeers - if p2pConfig.Network != 0 { - minBootstraps = 1 - } + + minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers) bootstrap := internal.NewPeerConnector( ctx, logger.Named("bootstrap"), h, idService, - minBootstraps, - bootstrapParallelism, + minBootstrapPeers, + p2pConfig.BootstrapParallelism, internal.NewStaticPeerSource(bootstrappers, true), ) if err := bootstrap.Connect(ctx); err != nil { @@ -341,7 +347,7 @@ func NewBlossomSub( ctx, internal.NewNotEnoughPeersCondition( h, - minBootstraps, + minBootstrapPeers, internal.PeerAddrInfosToPeerIDMap(bootstrappers), ), bootstrap, @@ -352,12 +358,12 @@ func NewBlossomSub( logger.Named("discovery"), h, idService, - minPeersPerBitmask, - discoveryParallelism, + p2pConfig.D, + p2pConfig.DiscoveryParallelism, internal.NewRoutingDiscoveryPeerSource( routingDiscovery, getNetworkNamespace(p2pConfig.Network), - discoveryPeerLimit, + p2pConfig.DiscoveryPeerLookupLimit, ), ) if err := discovery.Connect(ctx); err != nil { @@ -366,7 +372,14 @@ func NewBlossomSub( discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery) bs.discovery = discovery - go monitorPeers(ctx, logger, h) + internal.MonitorPeers( + ctx, + logger.Named("peer-monitor"), + h, + p2pConfig.PingTimeout, + p2pConfig.PingPeriod, + p2pConfig.PingAttempts, + ) // TODO: turn into an option flag for console logging, this is too noisy for // default logging behavior @@ -420,6 +433,10 @@ func NewBlossomSub( OpportunisticGraftThreshold: 2, }, )) + blossomOpts = append(blossomOpts, + blossomsub.WithValidateQueueSize(p2pConfig.ValidateQueueSize), + blossomsub.WithValidateWorkers(p2pConfig.ValidateWorkers), + ) blossomOpts = append(blossomOpts, observability.WithPrometheusRawTracer()) blossomOpts = append(blossomOpts, blossomsub.WithPeerFilter(internal.NewStaticPeerFilter( // We filter out the bootstrap peers explicitly from BlossomSub @@ -432,7 +449,7 @@ func NewBlossomSub( true, ))) - params := mergeDefaults(p2pConfig) + params := toBlossomSubParams(p2pConfig) rt := blossomsub.NewBlossomSubRouter(h, params, bs.network) blossomOpts = append(blossomOpts, 
rt.WithDefaultTagTracer()) pubsub, err := blossomsub.NewBlossomSubWithRouter(ctx, h, rt, blossomOpts...) @@ -446,22 +463,11 @@ func NewBlossomSub( bs.h = h bs.signKey = privKey - allowedPeerIDs := make(map[peer.ID]struct{}, len(allowedPeers)) - for _, peerInfo := range allowedPeers { - allowedPeerIDs[peerInfo.ID] = struct{}{} - } go func() { for { time.Sleep(30 * time.Second) - for _, b := range bs.bitmaskMap { - bitmaskPeers := b.ListPeers() - peerCount := len(bitmaskPeers) - for _, p := range bitmaskPeers { - if _, ok := allowedPeerIDs[p]; ok { - peerCount-- - } - } - if peerCount < minPeersPerBitmask { + for _, mask := range pubsub.GetBitmasks() { + if !rt.EnoughPeers([]byte(mask), 0) { _ = discovery.Connect(ctx) break } @@ -474,7 +480,7 @@ func NewBlossomSub( // adjusted from Lotus' reference implementation, addressing // https://github.com/libp2p/go-libp2p/issues/1640 -func resourceManager(highWatermark uint, allowed []peer.AddrInfo) ( +func resourceManager(highWatermark int, allowed []peer.AddrInfo) ( network.ResourceManager, error, ) { @@ -639,7 +645,7 @@ func (b *BlossomSub) Unsubscribe(bitmask []byte, raw bool) { } func (b *BlossomSub) RegisterValidator( - bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult, + bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult, sync bool, ) error { validatorEx := func( ctx context.Context, peerID peer.ID, message *blossomsub.Message, @@ -656,7 +662,7 @@ func (b *BlossomSub) RegisterValidator( } } var _ blossomsub.ValidatorEx = validatorEx - return b.ps.RegisterBitmaskValidator(bitmask, validatorEx) + return b.ps.RegisterBitmaskValidator(bitmask, validatorEx, blossomsub.WithValidatorInline(sync)) } func (b *BlossomSub) UnregisterValidator(bitmask []byte) error { @@ -685,70 +691,6 @@ func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) { return []byte(peers[sel.Int64()]), nil } -// monitorPeers periodically looks up the peers connected to the host and pings them -// up to 3 times to ensure they are still reachable. If the peer is not reachable after -// 3 attempts, the connections to the peer are closed. -func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) { - const timeout, period, attempts = 20 * time.Second, time.Minute, 3 - // Do not allow the pings to dial new connections. Adding new peers is a separate - // process and should not be done during the ping process. - ctx = network.WithNoDial(ctx, "monitor peers") - pingOnce := func(ctx context.Context, logger *zap.Logger, id peer.ID) bool { - pingCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - select { - case <-ctx.Done(): - case <-pingCtx.Done(): - logger.Debug("ping timeout") - return false - case res := <-ping.Ping(pingCtx, h, id): - if res.Error != nil { - logger.Debug("ping error", zap.Error(res.Error)) - return false - } - logger.Debug("ping success", zap.Duration("rtt", res.RTT)) - } - return true - } - ping := func(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) { - defer wg.Done() - var conns []network.Conn - for i := 0; i < attempts; i++ { - // There are no fine grained semantics in libp2p that would allow us to 'ping via - // a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection. - // As such, we save a snapshot of the connections that were potentially in use before - // the ping, and close them if the ping fails. 
If new connections occur between the snapshot - // and the ping, they will not be closed, and will be pinged in the next iteration. - conns = h.Network().ConnsToPeer(id) - if pingOnce(ctx, logger, id) { - return - } - } - for _, conn := range conns { - _ = conn.Close() - } - } - for { - select { - case <-ctx.Done(): - return - case <-time.After(period): - // This is once again a snapshot of the connected peers at the time of the ping. If new peers - // are added between the snapshot and the ping, they will be pinged in the next iteration. - peers := h.Network().Peers() - logger.Debug("pinging connected peers", zap.Int("peer_count", len(peers))) - wg := &sync.WaitGroup{} - for _, id := range peers { - logger := logger.With(zap.String("peer_id", id.String())) - wg.Add(1) - go ping(ctx, logger, wg, id) - } - wg.Wait() - logger.Debug("pinged connected peers") - } - } -} - func initDHT( ctx context.Context, logger *zap.Logger, @@ -1059,7 +1001,9 @@ func verifyReachability(cfg *config.P2PConfig) bool { return true } -func mergeDefaults(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams { +func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig { + cfg := *p2pConfig + p2pConfig = &cfg if p2pConfig.D == 0 { p2pConfig.D = blossomsub.BlossomSubD } @@ -1141,7 +1085,43 @@ func mergeDefaults(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams { if p2pConfig.IWantFollowupTime == 0 { p2pConfig.IWantFollowupTime = blossomsub.BlossomSubIWantFollowupTime } + if p2pConfig.LowWatermarkConnections == 0 { + p2pConfig.LowWatermarkConnections = defaultLowWatermarkConnections + } + if p2pConfig.HighWatermarkConnections == 0 { + p2pConfig.HighWatermarkConnections = defaultHighWatermarkConnections + } + if p2pConfig.MinBootstrapPeers == 0 { + p2pConfig.MinBootstrapPeers = defaultMinBootstrapPeers + } + if p2pConfig.BootstrapParallelism == 0 { + p2pConfig.BootstrapParallelism = defaultBootstrapParallelism + } + if p2pConfig.DiscoveryParallelism == 0 { + p2pConfig.DiscoveryParallelism = defaultDiscoveryParallelism + } + if p2pConfig.DiscoveryPeerLookupLimit == 0 { + p2pConfig.DiscoveryPeerLookupLimit = defaultDiscoveryPeerLookupLimit + } + if p2pConfig.PingTimeout == 0 { + p2pConfig.PingTimeout = defaultPingTimeout + } + if p2pConfig.PingPeriod == 0 { + p2pConfig.PingPeriod = defaultPingPeriod + } + if p2pConfig.PingAttempts == 0 { + p2pConfig.PingAttempts = defaultPingAttempts + } + if p2pConfig.ValidateQueueSize == 0 { + p2pConfig.ValidateQueueSize = blossomsub.DefaultValidateQueueSize + } + if p2pConfig.ValidateWorkers == 0 { + p2pConfig.ValidateWorkers = runtime.NumCPU() + } + return p2pConfig +} +func toBlossomSubParams(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams { return blossomsub.BlossomSubParams{ D: p2pConfig.D, Dlo: p2pConfig.DLo, diff --git a/node/p2p/internal/peer_monitor.go b/node/p2p/internal/peer_monitor.go new file mode 100644 index 0000000..c40c775 --- /dev/null +++ b/node/p2p/internal/peer_monitor.go @@ -0,0 +1,97 @@ +package internal + +import ( + "context" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "go.uber.org/zap" +) + +type peerMonitor struct { + h host.Host + timeout time.Duration + period time.Duration + attempts int +} + +func (pm *peerMonitor) pingOnce(ctx context.Context, logger *zap.Logger, id peer.ID) bool { + pingCtx, cancel := context.WithTimeout(ctx, pm.timeout) + defer cancel() + select { + case 
<-ctx.Done(): + case <-pingCtx.Done(): + logger.Debug("ping timeout") + return false + case res := <-ping.Ping(pingCtx, pm.h, id): + if res.Error != nil { + logger.Debug("ping error", zap.Error(res.Error)) + return false + } + logger.Debug("ping success", zap.Duration("rtt", res.RTT)) + } + return true +} + +func (pm *peerMonitor) ping(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) { + defer wg.Done() + var conns []network.Conn + for i := 0; i < pm.attempts; i++ { + // There are no fine grained semantics in libp2p that would allow us to 'ping via + // a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection. + // As such, we save a snapshot of the connections that were potentially in use before + // the ping, and close them if the ping fails. If new connections occur between the snapshot + // and the ping, they will not be closed, and will be pinged in the next iteration. + conns = pm.h.Network().ConnsToPeer(id) + if pm.pingOnce(ctx, logger, id) { + return + } + } + for _, conn := range conns { + _ = conn.Close() + } +} + +func (pm *peerMonitor) run(ctx context.Context, logger *zap.Logger) { + // Do not allow the pings to dial new connections. Adding new peers is a separate + // process and should not be done during the ping process. + ctx = network.WithNoDial(ctx, "monitor peers") + for { + select { + case <-ctx.Done(): + return + case <-time.After(pm.period): + // This is once again a snapshot of the connected peers at the time of the ping. If new peers + // are added between the snapshot and the ping, they will be pinged in the next iteration. + peers := pm.h.Network().Peers() + logger.Debug("pinging connected peers", zap.Int("peer_count", len(peers))) + wg := &sync.WaitGroup{} + for _, id := range peers { + logger := logger.With(zap.String("peer_id", id.String())) + wg.Add(1) + go pm.ping(ctx, logger, wg, id) + } + wg.Wait() + logger.Debug("pinged connected peers") + } + } +} + +// MonitorPeers periodically looks up the peers connected to the host and pings them +// repeatedly to ensure they are still reachable. If the peer is not reachable after +// the attempts, the connections to the peer are closed. +func MonitorPeers( + ctx context.Context, logger *zap.Logger, h host.Host, timeout, period time.Duration, attempts int, +) { + pm := &peerMonitor{ + h: h, + timeout: timeout, + period: period, + attempts: attempts, + } + go pm.run(ctx, logger) +} diff --git a/node/p2p/pubsub.go b/node/p2p/pubsub.go index 939153d..10278cc 100644 --- a/node/p2p/pubsub.go +++ b/node/p2p/pubsub.go @@ -26,6 +26,7 @@ type PubSub interface { RegisterValidator( bitmask []byte, validator func(peerID peer.ID, message *pb.Message) ValidationResult, + sync bool, ) error UnregisterValidator(bitmask []byte) error GetPeerID() []byte
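
Usage note (illustrative, not part of the diff): every new P2P knob is optional. Fields left at their zero value are filled in by withDefaults at startup, and the old "set the watermarks to 0 to disable the connection manager" behaviour now uses -1 instead, since 0 maps to the libp2p defaults of 160/192. A minimal sketch of a populated config, assuming the node/config import path; the multiaddr and the chosen values are examples only:

package main

import (
	"fmt"
	"time"

	"source.quilibrium.com/quilibrium/monorepo/node/config"
)

func main() {
	// Zero values are replaced by withDefaults: watermarks 160/192, three
	// bootstrap peers, bootstrap/discovery parallelism 10/50, a discovery
	// lookup limit of 1000, ping timeout/period/attempts of 5s/30s/3, a
	// validate queue of 16384 and runtime.NumCPU() validate workers.
	cfg := &config.P2PConfig{
		ListenMultiaddr:          "/ip4/0.0.0.0/udp/8336/quic-v1", // example only
		LowWatermarkConnections:  -1, // -1 on either watermark disables the connection manager
		HighWatermarkConnections: -1,
		DiscoveryParallelism:     20,
		PingPeriod:               time.Minute,
	}
	fmt.Printf("discovery parallelism: %d\n", cfg.DiscoveryParallelism)
}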
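
PubSub.RegisterValidator gains a sync flag that BlossomSub forwards as blossomsub.WithValidatorInline(sync); the data clock engine passes true so its frame, tx and info validators run inline in the validation pipeline rather than on the per-validator asynchronous path. A hedged sketch of a caller against the updated interface; the bitmask, the pb import path and the helper name are assumptions, and the concrete ValidationResult constants are not shown in this diff:

package example

import (
	"github.com/libp2p/go-libp2p/core/peer"

	// Assumed import path for the pubsub message protobufs.
	pb "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
	"source.quilibrium.com/quilibrium/monorepo/node/p2p"
)

func registerExampleValidator(ps p2p.PubSub) error {
	validate := func(peerID peer.ID, message *pb.Message) p2p.ValidationResult {
		// A real validator inspects message.Data and returns the package's
		// accept/reject/ignore value; those constant names are not part of
		// this diff, so the sketch returns the zero value.
		var result p2p.ValidationResult
		return result
	}
	// sync=true registers the validator with WithValidatorInline(true), so it
	// runs inline on the validation worker instead of its own throttled goroutine.
	return ps.RegisterValidator([]byte{0x01}, validate, true)
}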
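
The peer monitor previously embedded in blossomsub.go now lives in node/p2p/internal and is driven by the pingTimeout, pingPeriod and pingAttempts settings. MonitorPeers starts its own goroutine, which is why the call site no longer uses a go statement. A sketch of wiring it up from inside the p2p package (the internal package cannot be imported from outside node/p2p); the helper name and the literal durations are illustrative:

package p2p

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"go.uber.org/zap"

	"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
)

func startPeerMonitor(ctx context.Context, logger *zap.Logger, h host.Host) {
	internal.MonitorPeers(
		ctx,
		logger.Named("peer-monitor"),
		h,
		5*time.Second,  // pingTimeout: per-attempt ping deadline
		30*time.Second, // pingPeriod: delay between sweeps of connected peers
		3,              // pingAttempts: connections are closed after this many failures
	)
}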