diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go
index c32dc93..32d3f31 100644
--- a/node/p2p/blossomsub.go
+++ b/node/p2p/blossomsub.go
@@ -31,6 +31,7 @@ import (
 	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
 	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
 	"github.com/libp2p/go-libp2p/p2p/net/swarm"
+	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
 	"github.com/mr-tron/base58"
 	ma "github.com/multiformats/go-multiaddr"
 	madns "github.com/multiformats/go-multiaddr-dns"
@@ -299,6 +300,8 @@ func NewBlossomSub(
 
 	discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery, true)
 
+	go monitorPeers(ctx, logger, h)
+
 	// TODO: turn into an option flag for console logging, this is too noisy for
 	// default logging behavior
 	var tracer *blossomsub.JSONTracer
@@ -580,6 +583,104 @@ func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
 	return []byte(peers[sel.Int64()]), nil
 }
 
+// monitorPeers periodically looks up the peers connected to the host and pings them
+// up to 3 times to ensure they are still reachable. If the peer is not reachable after
+// 3 attempts, the connections to the peer are closed. It blocks until ctx is cancelled,
+// so it is meant to be run in its own goroutine.
+func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) {
+	const timeout, period, attempts = time.Minute, time.Minute, 3
+	// Do not allow the pings to dial new connections. Adding new peers is a separate
+	// process and should not be done during the ping process.
+	ctx = network.WithNoDial(ctx, "monitor peers")
+
+	// pingOnce reports whether the peer should still be considered reachable after a
+	// single ping bounded by timeout. Cancellation of ctx is reported as true so that
+	// shutting down is never mistaken for an unreachable peer.
+	pingOnce := func(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
+		pingCtx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
+		select {
+		case <-pingCtx.Done():
+		case res, ok := <-ping.Ping(pingCtx, h, id):
+			if ok {
+				if res.Error != nil {
+					logger.Debug("ping error", zap.Error(res.Error))
+					return false
+				}
+				logger.Debug("ping success", zap.Duration("rtt", res.RTT))
+				return true
+			}
+			// A closed channel carries no result; go-libp2p closes it once the ping
+			// context ends. Handle it below like the Done case instead of logging
+			// a zero-RTT success.
+		}
+		// pingCtx is derived from ctx, so both are done on shutdown; only an expired
+		// deadline counts as a failed ping.
+		if ctx.Err() != nil {
+			return true
+		}
+		logger.Debug("ping timeout")
+		return false
+	}
+
+	// pingPeer makes up to attempts pings and closes the peer's connections if all
+	// of them fail. It is not named ping to avoid shadowing the ping package.
+	pingPeer := func(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) {
+		defer wg.Done()
+		var conns []network.Conn
+		for i := 0; i < attempts; i++ {
+			// There are no fine grained semantics in libp2p that would allow us to 'ping via
+			// a specific connection'. We can only ping a peer, which will attempt to open a
+			// stream via a connection. As such, we save a snapshot of the connections that
+			// were potentially in use before the ping, and close them if the ping fails. If
+			// new connections occur between the snapshot and the ping, they will not be
+			// closed, and will be pinged in the next iteration.
+			conns = h.Network().ConnsToPeer(id)
+			if pingOnce(ctx, logger, id) {
+				return
+			}
+		}
+		for _, conn := range conns {
+			if err := conn.Close(); err != nil {
+				logger.Debug("close connection", zap.Error(err))
+			}
+		}
+	}
+
+	// A single reusable timer is re-armed after each round, so the period is measured
+	// from the end of one round to the start of the next.
+	timer := time.NewTimer(period)
+	defer timer.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+		}
+		// This is once again a snapshot of the peers at the time of the ping. If new peers
+		// are added between the snapshot and the ping, they will be pinged in the next iteration.
+		peers := h.Network().Peers()
+		connected := make([]peer.ID, 0, len(peers))
+		for _, p := range peers {
+			// The connection status may change both before and after the check. Still, it is better
+			// to focus on pinging only connections which are potentially connected at the moment of the check.
+			switch h.Network().Connectedness(p) {
+			case network.Connected, network.Limited:
+				connected = append(connected, p)
+			}
+		}
+		logger.Debug("pinging connected peers", zap.Int("peer_count", len(connected)))
+		var wg sync.WaitGroup
+		for _, id := range connected {
+			wg.Add(1)
+			go pingPeer(ctx, logger.With(zap.String("peer_id", id.String())), &wg, id)
+		}
+		wg.Wait()
+		logger.Debug("pinged connected peers")
+		timer.Reset(period)
+	}
+}
+
 func initDHT(
 	ctx context.Context,
 	p2pConfig *config.P2PConfig,