Mirror of https://github.com/QuilibriumNetwork/ceremonyclient.git, synced 2026-02-21 10:27:26 +08:00

Commit 5e135d36de: Merge branch 'develop' into v2.0.4-p1
@@ -111,7 +111,11 @@ var rootCmd = &cobra.Command{
         }
         if publicRPC {
-            fmt.Println("gRPC not enabled, using light node")
+            fmt.Println("Public RPC enabled, using light node")
             LightNode = true
         }
+        if !LightNode && NodeConfig.ListenGRPCMultiaddr == "" {
+            fmt.Println("No ListenGRPCMultiaddr found in config, using light node")
+            LightNode = true
+        }
     },
dashboards/grafana/BlossomSub.json: new file, 1343 lines (diff suppressed because it is too large).
@@ -15,6 +15,9 @@ import (
     "github.com/libp2p/go-libp2p/core/host"
     "github.com/libp2p/go-libp2p/core/network"
     "github.com/libp2p/go-libp2p/core/peer"
+    "github.com/libp2p/go-libp2p/core/peerstore"
+    "github.com/libp2p/go-libp2p/core/protocol"
+    mstream "github.com/multiformats/go-multistream"
 )

 var log = logging.Logger("ping")
@@ -114,14 +117,7 @@ func pingError(err error) chan Result {
     return ch
 }

-// Ping pings the remote peer until the context is canceled, returning a stream
-// of RTTs or errors.
-func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result {
-    s, err := h.NewStream(network.WithAllowLimitedConn(ctx, "ping"), p, ID)
-    if err != nil {
-        return pingError(err)
-    }
-
+func pingStream(ctx context.Context, ps peerstore.Peerstore, s network.Stream) <-chan Result {
     if err := s.Scope().SetService(ServiceName); err != nil {
         log.Debugf("error attaching stream to ping service: %s", err)
         s.Reset()
@@ -153,7 +149,7 @@ func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result {

         // No error, record the RTT.
         if res.Error == nil {
-            h.Peerstore().RecordLatency(p, res.RTT)
+            ps.RecordLatency(s.Conn().RemotePeer(), res.RTT)
         }

         select {
@@ -175,6 +171,54 @@ func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result {
     return out
 }

+// PingConn pings the peer via the connection until the context is canceled, returning a stream
+// of RTTs or errors.
+func PingConn(ctx context.Context, ps peerstore.Peerstore, conn network.Conn) <-chan Result {
+    s, err := conn.NewStream(ctx)
+    if err != nil {
+        return pingError(err)
+    }
+    var selected protocol.ID
+    var errCh chan error = make(chan error, 1)
+    go func() {
+        var err error
+        selected, err = mstream.SelectOneOf([]protocol.ID{ID}, s)
+        select {
+        case <-ctx.Done():
+        case errCh <- err:
+        }
+    }()
+    select {
+    case <-ctx.Done():
+        _ = s.Reset()
+        return pingError(ctx.Err())
+    case err := <-errCh:
+        if err != nil {
+            _ = s.Reset()
+            return pingError(err)
+        }
+    }
+    if err := s.SetProtocol(selected); err != nil {
+        _ = s.Reset()
+        return pingError(err)
+    }
+    if err := ps.AddProtocols(conn.RemotePeer(), selected); err != nil {
+        _ = s.Reset()
+        return pingError(err)
+    }
+    return pingStream(ctx, ps, s)
+}
+
+// Ping pings the remote peer until the context is canceled, returning a stream
+// of RTTs or errors.
+func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result {
+    s, err := h.NewStream(network.WithAllowLimitedConn(ctx, "ping"), p, ID)
+    if err != nil {
+        return pingError(err)
+    }
+    return pingStream(ctx, h.Peerstore(), s)
+}
+
 func ping(s network.Stream, randReader io.Reader) (time.Duration, error) {
     if err := s.Scope().ReserveMemory(2*PingSize, network.ReservationPriorityAlways); err != nil {
         log.Debugf("error reserving memory for ping stream: %s", err)
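For reference, a minimal sketch of how a caller might consume the new PingConn for a single timed ping over a specific connection. The import path assumes the repo's patched go-libp2p ping package (upstream go-libp2p does not export PingConn), and pingConnOnce is an illustrative helper, not code from this commit:

package main

import (
    "context"
    "time"

    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peerstore"
    "github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

// pingConnOnce pings a single connection with a timeout using PingConn and
// returns the first RTT or error it observes.
func pingConnOnce(ctx context.Context, ps peerstore.Peerstore, conn network.Conn, timeout time.Duration) (time.Duration, error) {
    pingCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    select {
    case <-pingCtx.Done():
        return 0, pingCtx.Err()
    case res := <-ping.PingConn(pingCtx, ps, conn):
        return res.RTT, res.Error
    }
}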
@@ -34,4 +34,6 @@ type EngineConfig struct {
     // Values used only for testing – do not override these in production, your
     // node will get kicked out
     Difficulty uint32 `yaml:"difficulty"`
+    // Whether to allow GOMAXPROCS values above the number of physical cores.
+    AllowExcessiveGOMAXPROCS bool `yaml:"allowExcessiveGOMAXPROCS"`
 }
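To illustrate the new knob, a small sketch of decoding it from YAML. The gopkg.in/yaml.v3 loader and the trimmed-down struct here are assumptions for the example; the node's real config loader and the full EngineConfig live elsewhere in the repo:

package main

import (
    "fmt"

    "gopkg.in/yaml.v3"
)

// engineConfig mirrors just the two fields from the hunk above.
type engineConfig struct {
    Difficulty               uint32 `yaml:"difficulty"`
    AllowExcessiveGOMAXPROCS bool   `yaml:"allowExcessiveGOMAXPROCS"`
}

func main() {
    raw := []byte("allowExcessiveGOMAXPROCS: true\n")
    var cfg engineConfig
    if err := yaml.Unmarshal(raw, &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.AllowExcessiveGOMAXPROCS) // true
}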
@@ -20,7 +20,7 @@ import (
     "source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )

-const defaultSyncTimeout = 2 * time.Second
+const defaultSyncTimeout = 4 * time.Second

 func (e *DataClockConsensusEngine) collect(
     enqueuedFrame *protobufs.ClockFrame,
@@ -383,7 +383,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
             }
             if currentHead.FrameNumber == lastHead.FrameNumber {
                 currentBackoff = min(maxBackoff, currentBackoff+1)
-                _ = e.pubSub.DiscoverPeers()
+                _ = e.pubSub.DiscoverPeers(e.ctx)
             } else {
                 currentBackoff = max(0, currentBackoff-1)
                 lastHead = currentHead
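The backoff arithmetic above clamps a counter between 0 and maxBackoff using Go 1.21's built-in min and max. A standalone restatement of that pattern (the helper name is illustrative):

package main

// nextBackoff nudges a retry counter up when the head frame has not advanced
// and back down when it has, clamped to [0, maxBackoff].
func nextBackoff(current, maxBackoff int, stalled bool) int {
    if stalled {
        return min(maxBackoff, current+1)
    }
    return max(0, current-1)
}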
@@ -93,20 +93,19 @@ func (e *DataClockConsensusEngine) runSync() {
        case <-e.ctx.Done():
            return
        case enqueuedFrame := <-e.requestSyncCh:
            if enqueuedFrame == nil {
                var err error
                enqueuedFrame, err = e.dataTimeReel.Head()
                if err != nil {
                    panic(err)
                }
            }
            if err := e.pubSub.Bootstrap(e.ctx); err != nil {
                e.logger.Error("could not bootstrap", zap.Error(err))
            }
            if _, err := e.collect(enqueuedFrame); err != nil {
                e.logger.Error("could not collect", zap.Error(err))
            }
        case <-time.After(20 * time.Second):
            if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
                continue
            }
            head, err := e.dataTimeReel.Head()
            if err != nil {
                panic(err)
            }
            if _, err := e.collect(head); err != nil {
                e.logger.Error("could not collect", zap.Error(err))
            }
        }
    }
}
@@ -339,6 +339,13 @@ func (e *DataClockConsensusEngine) handleDataPeerListAnnounce(
     }
     e.peerMapMx.Unlock()

+    select {
+    case <-e.ctx.Done():
+        return nil
+    case e.requestSyncCh <- nil:
+    default:
+    }
+
     return nil
 }
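The select added above is a non-blocking send: it nudges runSync through requestSyncCh when there is room and otherwise drops the request, letting the periodic timer pick it up later. A minimal sketch of the same pattern, with a hypothetical requestCh of empty structs standing in for e.requestSyncCh:

package main

import "context"

// requestSync attempts to enqueue a sync request without blocking. If the
// channel is full the request is dropped, and an already-cancelled context
// wins over the send.
func requestSync(ctx context.Context, requestCh chan<- struct{}) {
    select {
    case <-ctx.Done():
    case requestCh <- struct{}{}:
    default:
    }
}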
@@ -78,7 +78,8 @@ func (pubsub) GetPeerScore(peerId []byte) int64 { return 0 }
 func (pubsub) SetPeerScore(peerId []byte, score int64) {}
 func (pubsub) AddPeerScore(peerId []byte, scoreDelta int64) {}
 func (pubsub) Reconnect(peerId []byte) error { return nil }
-func (pubsub) DiscoverPeers() error { return nil }
+func (pubsub) Bootstrap(context.Context) error { return nil }
+func (pubsub) DiscoverPeers(context.Context) error { return nil }

 type outputs struct {
     difficulty uint32
@@ -394,6 +394,12 @@ func main() {
         nodeConfig.Engine.DataWorkerMemoryLimit = 1792 * 1024 * 1024 // 1.75GiB
     }
     if len(nodeConfig.Engine.DataWorkerMultiaddrs) == 0 {
+        maxProcs, numCPU := runtime.GOMAXPROCS(0), runtime.NumCPU()
+        if maxProcs > numCPU && !nodeConfig.Engine.AllowExcessiveGOMAXPROCS {
+            fmt.Println("GOMAXPROCS is set higher than the number of available CPUs.")
+            os.Exit(1)
+        }
+
         nodeConfig.Engine.DataWorkerCount = qruntime.WorkerCount(
             nodeConfig.Engine.DataWorkerCount, true,
         )
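A standalone sketch of the new startup guard, assuming the same intent as the hunk above: refuse to start when GOMAXPROCS exceeds the visible CPU count unless the config explicitly allows it (the helper name and message formatting are illustrative):

package main

import (
    "fmt"
    "os"
    "runtime"
)

// checkGOMAXPROCS exits when GOMAXPROCS is raised above the number of
// available CPUs and the operator has not explicitly opted in.
func checkGOMAXPROCS(allowExcessive bool) {
    maxProcs, numCPU := runtime.GOMAXPROCS(0), runtime.NumCPU()
    if maxProcs > numCPU && !allowExcessive {
        fmt.Printf("GOMAXPROCS (%d) is set higher than the number of available CPUs (%d).\n", maxProcs, numCPU)
        os.Exit(1)
    }
}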
@@ -5,15 +5,10 @@ import (
     "context"
     "crypto/rand"
     "encoding/hex"
-    "encoding/json"
     "fmt"
-    "io"
     "math/big"
     "math/bits"
     "net"
-    "net/http"
-    "strconv"
-    "strings"
     "sync"
     "time"
@@ -21,12 +16,14 @@ import (
     dht "github.com/libp2p/go-libp2p-kad-dht"
     libp2pconfig "github.com/libp2p/go-libp2p/config"
     "github.com/libp2p/go-libp2p/core/crypto"
+    "github.com/libp2p/go-libp2p/core/event"
     "github.com/libp2p/go-libp2p/core/host"
     "github.com/libp2p/go-libp2p/core/network"
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/libp2p/go-libp2p/core/protocol"
     "github.com/libp2p/go-libp2p/p2p/discovery/routing"
     "github.com/libp2p/go-libp2p/p2p/discovery/util"
+    "github.com/libp2p/go-libp2p/p2p/host/eventbus"
     rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
     routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
     "github.com/libp2p/go-libp2p/p2p/net/connmgr"
@@ -35,7 +32,6 @@ import (
     "github.com/mr-tron/base58"
     ma "github.com/multiformats/go-multiaddr"
     madns "github.com/multiformats/go-multiaddr-dns"
-    mn "github.com/multiformats/go-multiaddr/net"
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
@@ -77,6 +73,7 @@ type BlossomSub struct {
     peerScore map[string]int64
     peerScoreMx sync.Mutex
     network uint8
+    bootstrap internal.PeerConnector
     discovery internal.PeerConnector
 }
@@ -188,6 +185,8 @@ func NewBlossomSub(

     opts := []libp2pconfig.Option{
         libp2p.ListenAddrStrings(p2pConfig.ListenMultiaddr),
+        libp2p.EnableNATService(),
+        libp2p.NATPortMap(),
     }

     isBootstrapPeer := false
@@ -315,6 +314,35 @@ func NewBlossomSub(

     logger.Info("established peer id", zap.String("peer_id", h.ID().String()))

+    reachabilitySub, err := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}, eventbus.Name("blossomsub"))
+    if err != nil {
+        panic(err)
+    }
+    go func() {
+        defer reachabilitySub.Close()
+        logger := logger.Named("reachability")
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case evt, ok := <-reachabilitySub.Out():
+                if !ok {
+                    return
+                }
+                switch state := evt.(event.EvtLocalReachabilityChanged).Reachability; state {
+                case network.ReachabilityPublic:
+                    logger.Info("node is externally reachable")
+                case network.ReachabilityPrivate:
+                    logger.Info("node is not externally reachable")
+                case network.ReachabilityUnknown:
+                    logger.Info("node reachability is unknown")
+                default:
+                    logger.Debug("unknown reachability state", zap.Any("state", state))
+                }
+            }
+        }
+    }()
+
     kademliaDHT := initDHT(
         ctx,
         logger,
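The block above subscribes to go-libp2p's event bus for local reachability changes. A condensed, self-contained version of the same subscription, with an arbitrary subscriber name and simplified logging:

package main

import (
    "context"

    "github.com/libp2p/go-libp2p/core/event"
    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/p2p/host/eventbus"
    "go.uber.org/zap"
)

// watchReachability logs every local reachability change until ctx is done.
func watchReachability(ctx context.Context, h host.Host, logger *zap.Logger) error {
    sub, err := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}, eventbus.Name("reachability-watcher"))
    if err != nil {
        return err
    }
    go func() {
        defer sub.Close()
        for {
            select {
            case <-ctx.Done():
                return
            case evt, ok := <-sub.Out():
                if !ok {
                    return
                }
                switch evt.(event.EvtLocalReachabilityChanged).Reachability {
                case network.ReachabilityPublic:
                    logger.Info("node is externally reachable")
                case network.ReachabilityPrivate:
                    logger.Info("node is not externally reachable")
                default:
                    logger.Info("node reachability is unknown")
                }
            }
        }
    }()
    return nil
}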
@@ -328,8 +356,6 @@ func NewBlossomSub(
     routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
     util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))

-    verifyReachability(p2pConfig)
-
     minBootstrapPeers := min(len(bootstrappers), p2pConfig.MinBootstrapPeers)
     bootstrap := internal.NewPeerConnector(
         ctx,
@@ -352,6 +378,7 @@ func NewBlossomSub(
         ),
         bootstrap,
     )
+    bs.bootstrap = bootstrap

     discovery := internal.NewPeerConnector(
         ctx,
@@ -731,8 +758,12 @@ func (b *BlossomSub) Reconnect(peerId []byte) error {
     return nil
 }

-func (b *BlossomSub) DiscoverPeers() error {
-    return b.discovery.Connect(b.ctx)
+func (b *BlossomSub) Bootstrap(ctx context.Context) error {
+    return b.bootstrap.Connect(ctx)
+}
+
+func (b *BlossomSub) DiscoverPeers(ctx context.Context) error {
+    return b.discovery.Connect(ctx)
 }

 func (b *BlossomSub) GetPeerScore(peerId []byte) int64 {
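With this change, bootstrap and discovery connections are driven by the caller's context rather than the long-lived b.ctx. A sketch of how a caller might use the split API; the peerConnector interface and the timeouts here are illustrative, not part of the commit:

package main

import (
    "context"
    "time"
)

// peerConnector is the minimal surface this sketch needs; in the node it is
// satisfied by *BlossomSub through the p2p.PubSub interface.
type peerConnector interface {
    Bootstrap(ctx context.Context) error
    DiscoverPeers(ctx context.Context) error
}

// refreshPeers bootstraps first, then runs discovery, each bounded by its own
// timeout so a stuck dial cannot block the caller indefinitely.
func refreshPeers(ctx context.Context, ps peerConnector) error {
    bctx, cancelB := context.WithTimeout(ctx, 30*time.Second)
    defer cancelB()
    if err := ps.Bootstrap(bctx); err != nil {
        return err
    }

    dctx, cancelD := context.WithTimeout(ctx, 30*time.Second)
    defer cancelD()
    return ps.DiscoverPeers(dctx)
}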
@@ -838,53 +869,77 @@ func (b *BlossomSub) StartDirectChannelListener(
     return errors.Wrap(server.Serve(bind), "start direct channel listener")
 }

-func (b *BlossomSub) GetDirectChannel(key []byte, purpose string) (
-    dialCtx *grpc.ClientConn,
-    err error,
+type extraCloseConn struct {
+    net.Conn
+    extraClose func()
+}
+
+func (c *extraCloseConn) Close() error {
+    err := c.Conn.Close()
+    c.extraClose()
+    return err
+}
+
+func (b *BlossomSub) GetDirectChannel(peerID []byte, purpose string) (
+    cc *grpc.ClientConn, err error,
 ) {
     // Kind of a weird hack, but gostream can induce panics if the peer drops at
     // the time of connection, this avoids the problem.
     defer func() {
         if r := recover(); r != nil {
-            dialCtx = nil
+            cc = nil
             err = errors.New("connection failed")
         }
     }()

+    id := peer.ID(peerID)
+
     // Open question: should we prefix this so a node can run both in mainnet and
     // testnet? Feels like a bad idea and would be preferable to discourage.
-    dialCtx, err = qgrpc.DialContext(
+    cc, err = qgrpc.DialContext(
         b.ctx,
-        base58.Encode(key),
-        grpc.WithDialer(
-            func(peerIdStr string, timeout time.Duration) (net.Conn, error) {
-                subCtx, subCtxCancel := context.WithTimeout(b.ctx, timeout)
-                defer subCtxCancel()
-
-                id, err := peer.Decode(peerIdStr)
-                if err != nil {
-                    return nil, errors.Wrap(err, "dial context")
-                }
+        "passthrough:///",
+        grpc.WithContextDialer(
+            func(ctx context.Context, _ string) (net.Conn, error) {
+                // If we are not already connected to the peer, we will manually dial it
+                // before opening the direct channel. We will close the peer connection
+                // when the direct channel is closed.
+                alreadyConnected := false
+                switch connectedness := b.h.Network().Connectedness(id); connectedness {
+                case network.Connected, network.Limited:
+                    alreadyConnected = true
+                default:
+                    if err := b.h.Connect(ctx, peer.AddrInfo{ID: id}); err != nil {
+                        return nil, errors.Wrap(err, "connect")
+                    }
+                }

                 c, err := gostream.Dial(
-                    subCtx,
+                    network.WithNoDial(ctx, "direct-channel"),
                     b.h,
-                    peer.ID(key),
+                    id,
                     protocol.ID(
-                        "/p2p/direct-channel/"+peer.ID(id).String()+purpose,
+                        "/p2p/direct-channel/"+id.String()+purpose,
                     ),
                 )
-
-                return c, errors.Wrap(err, "dial context")
+                if err != nil {
+                    return nil, errors.Wrap(err, "dial direct channel")
+                }
+                if alreadyConnected {
+                    return c, nil
+                }
+                return &extraCloseConn{
+                    Conn:       c,
+                    extraClose: func() { _ = b.h.Network().ClosePeer(id) },
+                }, nil
             },
         ),
         grpc.WithTransportCredentials(insecure.NewCredentials()),
     )
     if err != nil {
-        return nil, errors.Wrap(err, "get direct channel")
+        return nil, errors.Wrap(err, "dial context")
     }

-    return dialCtx, nil
+    return cc, nil
 }

 func (b *BlossomSub) GetPublicKey() []byte {
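The rewrite above replaces the deprecated grpc.WithDialer with grpc.WithContextDialer and wraps the libp2p stream in a net.Conn whose Close also tears down the peer connection when this call was the one that opened it. A reduced sketch of that shape against plain grpc-go, with dialStream standing in for the gostream dial and the repo's qgrpc wrapper omitted:

package main

import (
    "context"
    "net"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

// closerConn runs an extra hook after the underlying connection closes, e.g.
// to drop a peer connection that was opened solely for this channel.
type closerConn struct {
    net.Conn
    extraClose func()
}

func (c *closerConn) Close() error {
    err := c.Conn.Close()
    c.extraClose()
    return err
}

// dialDirect builds a gRPC client over a custom dialer. dialStream is a
// stand-in for the libp2p/gostream dial used in the real code; onClose is the
// teardown hook attached to the returned connection.
func dialDirect(
    ctx context.Context,
    dialStream func(ctx context.Context) (net.Conn, error),
    onClose func(),
) (*grpc.ClientConn, error) {
    return grpc.DialContext(
        ctx,
        "passthrough:///",
        grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
            c, err := dialStream(ctx)
            if err != nil {
                return nil, err
            }
            return &closerConn{Conn: c, extraClose: onClose}, nil
        }),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
}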
@@ -897,101 +952,6 @@ func (b *BlossomSub) SignMessage(msg []byte) ([]byte, error) {
     return sig, errors.Wrap(err, "sign message")
 }

-type ReachabilityRequest struct {
-    Port uint16 `json:"port"`
-    Type string `json:"type"`
-}
-
-type ReachabilityResponse struct {
-    Reachable bool `json:"reachable"`
-    Error string `json:"error"`
-}
-
-func verifyReachability(cfg *config.P2PConfig) bool {
-    a, err := ma.NewMultiaddr(cfg.ListenMultiaddr)
-    if err != nil {
-        return false
-    }
-
-    transport, addr, err := mn.DialArgs(a)
-    if err != nil {
-        return false
-    }
-
-    addrparts := strings.Split(addr, ":")
-    if len(addrparts) != 2 {
-        return false
-    }
-
-    port, err := strconv.ParseUint(addrparts[1], 10, 0)
-    if err != nil {
-        return false
-    }
-
-    if !strings.Contains(transport, "tcp") {
-        transport = "quic"
-    } else {
-        transport = "tcp"
-    }
-
-    req := &ReachabilityRequest{
-        Port: uint16(port),
-        Type: transport,
-    }
-
-    b, err := json.Marshal(req)
-    if err != nil {
-        return false
-    }
-
-    resp, err := http.Post(
-        "https://rpc.quilibrium.com/connectivity-check",
-        "application/json",
-        bytes.NewBuffer(b),
-    )
-    if err != nil {
-        fmt.Println("Reachability check not currently available, skipping test.")
-        return true
-    }
-    defer resp.Body.Close()
-
-    if resp.StatusCode != 200 {
-        fmt.Println("Reachability check not currently available, skipping test.")
-        return true
-    }
-
-    bodyBytes, err := io.ReadAll(resp.Body)
-    if err != nil {
-        fmt.Println("Reachability check not currently available, skipping test.")
-        return true
-    }
-
-    r := &ReachabilityResponse{}
-    err = json.Unmarshal(bodyBytes, r)
-    if err != nil {
-        fmt.Println("Reachability check not currently available, skipping test.")
-        return true
-    }
-
-    if r.Error != "" {
-        fmt.Println("Reachability check failed: " + r.Error)
-        if transport == "quic" {
-            fmt.Println("WARNING!")
-            fmt.Println("WARNING!")
-            fmt.Println("WARNING!")
-            fmt.Println("You failed reachability with QUIC enabled. Consider switching to TCP")
-            fmt.Println("WARNING!")
-            fmt.Println("WARNING!")
-            fmt.Println("WARNING!")
-            time.Sleep(5 * time.Second)
-        }
-        return false
-    }
-
-    fmt.Println("Node passed reachability check.")
-    return true
-}
-
 func withDefaults(p2pConfig *config.P2PConfig) *config.P2PConfig {
     cfg := *p2pConfig
     p2pConfig = &cfg
@@ -7,7 +7,6 @@ import (

     "github.com/libp2p/go-libp2p/core/host"
     "github.com/libp2p/go-libp2p/core/network"
-    "github.com/libp2p/go-libp2p/core/peer"
     "github.com/libp2p/go-libp2p/p2p/protocol/ping"
     "go.uber.org/zap"
 )
@@ -19,7 +18,7 @@ type peerMonitor struct {
     attempts int
 }

-func (pm *peerMonitor) pingOnce(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
+func (pm *peerMonitor) pingOnce(ctx context.Context, logger *zap.Logger, conn network.Conn) bool {
     pingCtx, cancel := context.WithTimeout(ctx, pm.timeout)
     defer cancel()
     select {
@@ -27,7 +26,7 @@ func (pm *peerMonitor) pingOnce(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
     case <-pingCtx.Done():
         logger.Debug("ping timeout")
         return false
-    case res := <-ping.Ping(pingCtx, pm.h, id):
+    case res := <-ping.PingConn(pingCtx, pm.h.Peerstore(), conn):
         if res.Error != nil {
             logger.Debug("ping error", zap.Error(res.Error))
             return false
@@ -37,43 +36,35 @@ func (pm *peerMonitor) pingOnce(ctx context.Context, logger *zap.Logger, id peer.ID) bool {
     return true
 }

-func (pm *peerMonitor) ping(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, id peer.ID) {
+func (pm *peerMonitor) ping(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup, conn network.Conn) {
     defer wg.Done()
-    var conns []network.Conn
     for i := 0; i < pm.attempts; i++ {
-        // There are no fine grained semantics in libp2p that would allow us to 'ping via
-        // a specific connection'. We can only ping a peer, which will attempt to open a stream via a connection.
-        // As such, we save a snapshot of the connections that were potentially in use before
-        // the ping, and close them if the ping fails. If new connections occur between the snapshot
-        // and the ping, they will not be closed, and will be pinged in the next iteration.
-        conns = pm.h.Network().ConnsToPeer(id)
-        if pm.pingOnce(ctx, logger, id) {
+        if pm.pingOnce(ctx, logger, conn) {
             return
         }
+        if conn.IsClosed() {
+            return
+        }
     }
-    for _, conn := range conns {
-        _ = conn.Close()
-    }
+    _ = conn.Close()
 }

 func (pm *peerMonitor) run(ctx context.Context, logger *zap.Logger) {
     // Do not allow the pings to dial new connections. Adding new peers is a separate
     // process and should not be done during the ping process.
     ctx = network.WithNoDial(ctx, "monitor peers")
     for {
         select {
         case <-ctx.Done():
             return
         case <-time.After(pm.period):
             // This is once again a snapshot of the connected peers at the time of the ping. If new peers
             // are added between the snapshot and the ping, they will be pinged in the next iteration.
             peers := pm.h.Network().Peers()
             logger.Debug("pinging connected peers", zap.Int("peer_count", len(peers)))
             wg := &sync.WaitGroup{}
             for _, id := range peers {
                 logger := logger.With(zap.String("peer_id", id.String()))
-                wg.Add(1)
-                go pm.ping(ctx, logger, wg, id)
+                for _, conn := range pm.h.Network().ConnsToPeer(id) {
+                    logger := logger.With(zap.String("connection_id", conn.ID()))
+                    wg.Add(1)
+                    go pm.ping(ctx, logger, wg, conn)
+                }
             }
             wg.Wait()
             logger.Debug("pinged connected peers")
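The monitor now pings each connection individually and closes only the connection that keeps failing, instead of closing a snapshot of all connections to the peer. A compact restatement of that loop; pingAndPrune and its pingOnce parameter are illustrative stand-ins, and conn.IsClosed() is used as in the hunk above:

package main

import (
    "context"

    "github.com/libp2p/go-libp2p/core/network"
)

// pingAndPrune retries a connection-scoped ping up to attempts times and
// closes the connection if it never succeeds.
func pingAndPrune(
    ctx context.Context,
    conn network.Conn,
    attempts int,
    pingOnce func(ctx context.Context, conn network.Conn) bool,
) {
    for i := 0; i < attempts; i++ {
        if pingOnce(ctx, conn) {
            return // connection is healthy
        }
        if conn.IsClosed() {
            return // already gone, nothing to close
        }
    }
    _ = conn.Close()
}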
@@ -52,6 +52,7 @@ type PubSub interface {
     SetPeerScore(peerId []byte, score int64)
     AddPeerScore(peerId []byte, scoreDelta int64)
     Reconnect(peerId []byte) error
-    DiscoverPeers() error
+    Bootstrap(ctx context.Context) error
+    DiscoverPeers(ctx context.Context) error
     GetNetwork() uint
 }