mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
152 lines
3.2 KiB
Go
152 lines
3.2 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/peerstore"
|
|
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type peerMonitor struct {
|
|
ps *ping.PingService
|
|
h host.Host
|
|
timeout time.Duration
|
|
period time.Duration
|
|
attempts int
|
|
direct []peer.AddrInfo
|
|
directPeriod time.Duration
|
|
}
|
|
|
|
func (pm *peerMonitor) pingOnce(
|
|
ctx context.Context,
|
|
logger *zap.Logger,
|
|
peer peer.ID,
|
|
) bool {
|
|
pingCtx, cancel := context.WithTimeout(ctx, pm.timeout)
|
|
defer cancel()
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-pingCtx.Done():
|
|
return false
|
|
case res := <-pm.ps.Ping(pingCtx, peer):
|
|
if res.Error != nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (pm *peerMonitor) ping(
|
|
ctx context.Context,
|
|
logger *zap.Logger,
|
|
wg *sync.WaitGroup,
|
|
peer peer.ID,
|
|
) {
|
|
defer wg.Done()
|
|
for i := 0; i < pm.attempts; i++ {
|
|
pm.pingOnce(ctx, logger, peer)
|
|
}
|
|
}
|
|
|
|
func (pm *peerMonitor) run(ctx context.Context, logger *zap.Logger) {
|
|
var (
|
|
pingTicker = time.NewTicker(pm.period)
|
|
directTicker *time.Ticker
|
|
directChannel <-chan time.Time
|
|
)
|
|
defer pingTicker.Stop()
|
|
|
|
if len(pm.direct) > 0 && pm.directPeriod > 0 {
|
|
directTicker = time.NewTicker(pm.directPeriod)
|
|
directChannel = directTicker.C
|
|
defer directTicker.Stop()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-pingTicker.C:
|
|
pm.pingConnectedPeers(ctx, logger)
|
|
case <-directChannel:
|
|
pm.ensureDirectPeers(ctx, logger)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pm *peerMonitor) pingConnectedPeers(
|
|
ctx context.Context,
|
|
logger *zap.Logger,
|
|
) {
|
|
peers := pm.h.Network().Peers()
|
|
wg := &sync.WaitGroup{}
|
|
for _, id := range peers {
|
|
slogger := logger.With(zap.String("peer_id", id.String()))
|
|
wg.Add(1)
|
|
go pm.ping(ctx, slogger, wg, id)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (pm *peerMonitor) ensureDirectPeers(
|
|
ctx context.Context,
|
|
logger *zap.Logger,
|
|
) {
|
|
for _, info := range pm.direct {
|
|
if info.ID == pm.h.ID() {
|
|
continue
|
|
}
|
|
|
|
slogger := logger.With(zap.String("peer_id", info.ID.String()))
|
|
pm.h.Peerstore().AddAddrs(
|
|
info.ID,
|
|
info.Addrs,
|
|
peerstore.PermanentAddrTTL,
|
|
)
|
|
|
|
if pm.h.Network().Connectedness(info.ID) == network.Connected {
|
|
continue
|
|
}
|
|
|
|
connectCtx, cancel := context.WithTimeout(ctx, pm.timeout)
|
|
err := pm.h.Connect(connectCtx, info)
|
|
cancel()
|
|
if err != nil {
|
|
slogger.Debug("failed to connect to direct peer", zap.Error(err))
|
|
continue
|
|
}
|
|
slogger.Info("connected to direct peer")
|
|
}
|
|
}
|
|
|
|
// MonitorPeers periodically looks up the peers connected to the host and pings
|
|
// them repeatedly to ensure they are still reachable. If the peer is not
|
|
// reachable after the attempts, the connections to the peer are closed.
|
|
func MonitorPeers(
|
|
ctx context.Context,
|
|
logger *zap.Logger,
|
|
h host.Host,
|
|
timeout, period time.Duration,
|
|
attempts int,
|
|
directPeers []peer.AddrInfo,
|
|
) {
|
|
ps := ping.NewPingService(h)
|
|
pm := &peerMonitor{
|
|
ps: ps,
|
|
h: h,
|
|
timeout: timeout,
|
|
period: period,
|
|
attempts: attempts,
|
|
direct: directPeers, // buildutils:allow-slice-alias slice is static
|
|
directPeriod: 10 * time.Second,
|
|
}
|
|
|
|
go pm.run(ctx, logger)
|
|
}
|