mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-03-06 00:37:42 +08:00
Monitor peers via pings (#317)
This commit is contained in:
parent
fe964fc535
commit
dca4649281
@ -31,6 +31,7 @@ import (
|
||||
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/mr-tron/base58"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
madns "github.com/multiformats/go-multiaddr-dns"
|
||||
@ -299,6 +300,8 @@ func NewBlossomSub(
|
||||
|
||||
discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery, true)
|
||||
|
||||
go monitorPeers(ctx, logger, h)
|
||||
|
||||
// TODO: turn into an option flag for console logging, this is too noisy for
|
||||
// default logging behavior
|
||||
var tracer *blossomsub.JSONTracer
|
||||
@ -580,6 +583,79 @@ func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
|
||||
return []byte(peers[sel.Int64()]), nil
|
||||
}
|
||||
|
||||
// monitorPeers periodically looks up the peers connected to the host and pings them
|
||||
// up to 3 times to ensure they are still reachable. If the peer is not reachable after
|
||||
// 3 attempts, the connections to the peer are closed.
|
||||
func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) {
|
||||
const timeout, period, attempts = time.Minute, time.Minute, 3
|
||||
// Do not allow the pings to dial new connections. Adding new peers is a separate
|
||||
// process and should not be done during the ping process.
|
||||
ctx = network.WithNoDial(ctx, "monitor peers")
|
||||
pingOnce := func(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
|
||||
pingCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-pingCtx.Done():
|
||||
logger.Debug("ping timeout")
|
||||
return false
|
||||
case res := <-ping.Ping(pingCtx, h, id):
|
||||
if res.Error != nil {
|
||||
logger.Debug("ping error", zap.Error(res.Error))
|
||||
return false
|
||||
}
|
||||
logger.Debug("ping success", zap.Duration("rtt", res.RTT))
|
||||
}
|
||||
return true
|
||||
}
|
||||
ping := func(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) {
|
||||
defer wg.Done()
|
||||
var conns []network.Conn
|
||||
for i := 0; i < attempts; i++ {
|
||||
// There are no fine grained semantics in libp2p that would allow us to 'ping via
|
||||
// a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection.
|
||||
// As such, we save a snapshot of the connections that were potentially in use before
|
||||
// the ping, and close them if the ping fails. If new connections occur between the snapshot
|
||||
// and the ping, they will not be closed, and will be pinged in the next iteration.
|
||||
conns = h.Network().ConnsToPeer(id)
|
||||
if pingOnce(ctx, logger, id) {
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, conn := range conns {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(period):
|
||||
// This is once again a snapshot of the peers at the time of the ping. If new peers
|
||||
// are added between the snapshot and the ping, they will be pinged in the next iteration.
|
||||
peers := h.Network().Peers()
|
||||
connected := make([]peer.ID, 0, len(peers))
|
||||
for _, p := range peers {
|
||||
// The connection status may change both before and after the check. Still, it is better
|
||||
// to focus on pinging only connections which are potentially connected at the moment of the check.
|
||||
switch h.Network().Connectedness(p) {
|
||||
case network.Connected, network.Limited:
|
||||
connected = append(connected, p)
|
||||
}
|
||||
}
|
||||
logger.Debug("pinging connected peers", zap.Int("peer_count", len(connected)))
|
||||
wg := &sync.WaitGroup{}
|
||||
for _, id := range connected {
|
||||
logger := logger.With(zap.String("peer_id", id.String()))
|
||||
wg.Add(1)
|
||||
go ping(ctx, logger, wg, id)
|
||||
}
|
||||
wg.Wait()
|
||||
logger.Debug("pinged connected peers")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func initDHT(
|
||||
ctx context.Context,
|
||||
p2pConfig *config.P2PConfig,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user