package internal import ( "context" "sync" "sync/atomic" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/protocol/identify" "go.uber.org/zap" ) // PeerConnector is a connector to peers. type PeerConnector interface { // Connect connects to peers. Connect(context.Context) error } // TODO(2.1.1+): metrics only, no debug logging unless configurable logging, too // noisy type peerConnector struct { ctx context.Context logger *zap.Logger host host.Host idService identify.IDService connectCh chan (chan<- struct{}) minPeers int parallelism int source PeerSource warned bool } // Connect implements PeerConnector. func (pc *peerConnector) Connect(ctx context.Context) error { if pc.minPeers == 0 && !pc.warned { pc.warned = true pc.logger.Warn( "skipped due to minpeer = 0, make sure this was intentional", ) return nil } done := make(chan struct{}) select { case <-ctx.Done(): return ctx.Err() case <-pc.ctx.Done(): return pc.ctx.Err() case pc.connectCh <- done: } select { case <-ctx.Done(): return ctx.Err() case <-pc.ctx.Done(): return pc.ctx.Err() case <-done: return nil } } func (pc *peerConnector) connectToPeer( ctx context.Context, logger *zap.Logger, p peer.AddrInfo, wg *sync.WaitGroup, duplicate, success, failure *uint32, inflight <-chan struct{}, ) { defer func() { select { case <-ctx.Done(): case <-inflight: } }() defer wg.Done() if p.ID == pc.host.ID() || pc.host.Network().Connectedness(p.ID) == network.Connected || pc.host.Network().Connectedness(p.ID) == network.Limited { atomic.AddUint32(duplicate, 1) return } pc.host.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.AddressTTL) conn, err := pc.host.Network().DialPeer(ctx, p.ID) if err != nil { atomic.AddUint32(failure, 1) return } select { case <-ctx.Done(): return case <-time.After(identify.DefaultTimeout / 2): atomic.AddUint32(failure, 1) _ = conn.Close() case <-pc.idService.IdentifyWait(conn): atomic.AddUint32(success, 1) } } func (pc *peerConnector) connectToPeers( ctx context.Context, ch <-chan peer.AddrInfo, duplicate, success, failure *uint32, ) { var inflight chan struct{} = make(chan struct{}, pc.parallelism) var wg sync.WaitGroup defer wg.Wait() for p := range ch { logger := pc.logger.With(zap.String("peer_id", p.ID.String())) if atomic.LoadUint32(success) >= uint32(pc.minPeers) { return } select { case <-ctx.Done(): return case inflight <- struct{}{}: } wg.Add(1) go pc.connectToPeer( ctx, logger, p, &wg, duplicate, success, failure, inflight, ) } } func (pc *peerConnector) connect() { logger := pc.logger var success, failure, duplicate uint32 ctx, cancel := context.WithCancel(pc.ctx) defer cancel() peerChan, err := pc.source.Peers(ctx) if err != nil { logger.Error("could not find peers", zap.Error(err)) return } pc.connectToPeers( ctx, peerChan, &duplicate, &success, &failure, ) } func (pc *peerConnector) run() { for { select { case <-pc.ctx.Done(): return case done := <-pc.connectCh: pc.connect() close(done) } } } // NewPeerConnector creates a new peer connector. func NewPeerConnector( ctx context.Context, logger *zap.Logger, host host.Host, idService identify.IDService, minPeers, parallelism int, source PeerSource, ) PeerConnector { pc := &peerConnector{ ctx: ctx, logger: logger, host: host, idService: idService, connectCh: make(chan (chan<- struct{})), minPeers: minPeers, parallelism: parallelism, source: source, } go pc.run() return pc } type chainedPeerConnector struct { ctx context.Context connectors []PeerConnector connectCh chan (chan<- struct{}) } // Connect implements PeerConnector. func (cpc *chainedPeerConnector) Connect(ctx context.Context) error { done := make(chan struct{}) select { case <-ctx.Done(): return ctx.Err() case <-cpc.ctx.Done(): return cpc.ctx.Err() case cpc.connectCh <- done: } select { case <-ctx.Done(): return ctx.Err() case <-cpc.ctx.Done(): return cpc.ctx.Err() case <-done: return nil } } func (cpc *chainedPeerConnector) run() { for { select { case <-cpc.ctx.Done(): return case done := <-cpc.connectCh: for _, pc := range cpc.connectors { _ = pc.Connect(cpc.ctx) } close(done) } } } // NewChainedPeerConnector creates a new chained peer connector. func NewChainedPeerConnector(ctx context.Context, connectors ...PeerConnector) PeerConnector { cpc := &chainedPeerConnector{ ctx: ctx, connectors: connectors, // buildutils:allow-slice-alias slice is static connectCh: make(chan (chan<- struct{})), } go cpc.run() return cpc } type conditionalPeerConnector struct { ctx context.Context condition PeerConnectorCondition connector PeerConnector connectCh chan (chan<- struct{}) } func (cpc *conditionalPeerConnector) run() { for { select { case <-cpc.ctx.Done(): return case done := <-cpc.connectCh: if cpc.condition.Should() { _ = cpc.connector.Connect(cpc.ctx) } close(done) } } } // Connect implements PeerConnector. func (cpc *conditionalPeerConnector) Connect(ctx context.Context) error { done := make(chan struct{}) select { case <-ctx.Done(): return ctx.Err() case <-cpc.ctx.Done(): return cpc.ctx.Err() case cpc.connectCh <- done: } select { case <-ctx.Done(): return ctx.Err() case <-cpc.ctx.Done(): return cpc.ctx.Err() case <-done: return nil } } // NewConditionalPeerConnector creates a new conditional peer connector. func NewConditionalPeerConnector( ctx context.Context, condition PeerConnectorCondition, connector PeerConnector, ) PeerConnector { cpc := &conditionalPeerConnector{ ctx: ctx, condition: condition, connector: connector, connectCh: make(chan (chan<- struct{})), } go cpc.run() return cpc }