diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go
index 4431281..d57dddb 100644
--- a/node/p2p/blossomsub.go
+++ b/node/p2p/blossomsub.go
@@ -44,21 +44,29 @@ import (
 	blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
 	"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
+	"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )
 
+const (
+	minPeersPerBitmask   = 4
+	minBootstrapPeers    = 3
+	discoveryParallelism = 10
+	discoveryPeerLimit   = 1000
+	bootstrapParallelism = 10
+)
+
 type BlossomSub struct {
-	ps              *blossomsub.PubSub
-	ctx             context.Context
-	logger          *zap.Logger
-	peerID          peer.ID
-	bitmaskMap      map[string]*blossomsub.Bitmask
-	h               host.Host
-	signKey         crypto.PrivKey
-	peerScore       map[string]int64
-	peerScoreMx     sync.Mutex
-	isBootstrapPeer bool
-	network         uint8
+	ps          *blossomsub.PubSub
+	ctx         context.Context
+	logger      *zap.Logger
+	peerID      peer.ID
+	bitmaskMap  map[string]*blossomsub.Bitmask
+	h           host.Host
+	signKey     crypto.PrivKey
+	peerScore   map[string]int64
+	peerScoreMx sync.Mutex
+	network     uint8
 }
 
 var _ PubSub = (*BlossomSub)(nil)
@@ -128,13 +136,12 @@ func NewBlossomSubStreamer(
 	}
 
 	bs := &BlossomSub{
-		ctx:             ctx,
-		logger:          logger,
-		bitmaskMap:      make(map[string]*blossomsub.Bitmask),
-		signKey:         privKey,
-		peerScore:       make(map[string]int64),
-		isBootstrapPeer: false,
-		network:         p2pConfig.Network,
+		ctx:        ctx,
+		logger:     logger,
+		bitmaskMap: make(map[string]*blossomsub.Bitmask),
+		signKey:    privKey,
+		peerScore:  make(map[string]int64),
+		network:    p2pConfig.Network,
 	}
 
 	h, err := libp2p.New(opts...)
@@ -144,20 +151,13 @@ func NewBlossomSubStreamer(
 	logger.Info("established peer id", zap.String("peer_id", h.ID().String()))
 
-	kademliaDHT := initDHT(
+	_ = initDHT(
 		ctx,
-		p2pConfig,
 		logger,
 		h,
 		false,
 		bootstrappers,
 	)
-	routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
-	util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
-
-	if err != nil {
-		panic(err)
-	}
 
 	peerID := h.ID()
 	bs.peerID = peerID
@@ -286,36 +286,74 @@ func NewBlossomSub(
 	}
 
 	bs := &BlossomSub{
-		ctx:             ctx,
-		logger:          logger,
-		bitmaskMap:      make(map[string]*blossomsub.Bitmask),
-		signKey:         privKey,
-		peerScore:       make(map[string]int64),
-		isBootstrapPeer: isBootstrapPeer,
-		network:         p2pConfig.Network,
+		ctx:        ctx,
+		logger:     logger,
+		bitmaskMap: make(map[string]*blossomsub.Bitmask),
+		signKey:    privKey,
+		peerScore:  make(map[string]int64),
+		network:    p2pConfig.Network,
 	}
 
 	h, err := libp2p.New(opts...)
 	if err != nil {
 		panic(errors.Wrap(err, "error constructing p2p"))
 	}
+	idService := internal.IDServiceFromHost(h)
 
 	logger.Info("established peer id", zap.String("peer_id", h.ID().String()))
 
 	kademliaDHT := initDHT(
 		ctx,
-		p2pConfig,
 		logger,
 		h,
 		isBootstrapPeer,
 		bootstrappers,
 	)
+
 	routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
 	util.Advertise(ctx, routingDiscovery, getNetworkNamespace(p2pConfig.Network))
 
 	verifyReachability(p2pConfig)
 
-	discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery, true)
+	bootstrap := internal.NewPeerConnector(
+		ctx,
+		logger.Named("bootstrap"),
+		h,
+		idService,
+		minBootstrapPeers,
+		bootstrapParallelism,
+		internal.NewStaticPeerSource(bootstrappers, true),
+	)
+	if err := bootstrap.Connect(ctx); err != nil {
+		panic(err)
+	}
+	bootstrap = internal.NewConditionalPeerConnector(
+		ctx,
+		internal.NewNotEnoughPeersCondition(
+			h,
+			minBootstrapPeers,
+			internal.PeerAddrInfosToPeerIDMap(bootstrappers),
+		),
+		bootstrap,
+	)
+
+	discovery := internal.NewPeerConnector(
+		ctx,
+		logger.Named("discovery"),
+		h,
+		idService,
+		minPeersPerBitmask,
+		discoveryParallelism,
+		internal.NewRoutingDiscoveryPeerSource(
+			routingDiscovery,
+			getNetworkNamespace(p2pConfig.Network),
+			discoveryPeerLimit,
+		),
+	)
+	if err := discovery.Connect(ctx); err != nil {
+		panic(err)
+	}
+	discovery = internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
 
 	go monitorPeers(ctx, logger, h)
@@ -399,8 +437,8 @@ func NewBlossomSub(
 					peerCount--
 				}
 			}
-			if peerCount < 4 {
-				discoverPeers(p2pConfig, bs.ctx, logger, bs.h, routingDiscovery, false)
+			if peerCount < minPeersPerBitmask {
+				_ = discovery.Connect(ctx)
 				break
 			}
 		}
@@ -602,7 +640,7 @@ func (b *BlossomSub) GetRandomPeer(bitmask []byte) ([]byte, error) {
 // up to 3 times to ensure they are still reachable. If the peer is not reachable after
 // 3 attempts, the connections to the peer are closed.
 func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) {
-	const timeout, period, attempts = time.Minute, time.Minute, 3
+	const timeout, period, attempts = time.Minute, 20 * time.Second, 3
 	// Do not allow the pings to dial new connections. Adding new peers is a separate
 	// process and should not be done during the ping process.
 	ctx = network.WithNoDial(ctx, "monitor peers")
@@ -646,21 +684,12 @@ func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host) {
 		case <-ctx.Done():
 			return
 		case <-time.After(period):
-			// This is once again a snapshot of the peers at the time of the ping. If new peers
+			// This is once again a snapshot of the connected peers at the time of the ping. If new peers
 			// are added between the snapshot and the ping, they will be pinged in the next iteration.
 			peers := h.Network().Peers()
-			connected := make([]peer.ID, 0, len(peers))
-			for _, p := range peers {
-				// The connection status may change both before and after the check. Still, it is better
-				// to focus on pinging only connections which are potentially connected at the moment of the check.
-				switch h.Network().Connectedness(p) {
-				case network.Connected, network.Limited:
-					connected = append(connected, p)
-				}
-			}
-			logger.Debug("pinging connected peers", zap.Int("peer_count", len(connected)))
+			logger.Debug("pinging connected peers", zap.Int("peer_count", len(peers)))
 			wg := &sync.WaitGroup{}
-			for _, id := range connected {
+			for _, id := range peers {
 				logger := logger.With(zap.String("peer_id", id.String()))
 				wg.Add(1)
 				go ping(ctx, logger, wg, id)
@@ -673,86 +702,30 @@ func monitorPeers(ctx context.Context, logger *zap.Logger, h host.Host)
 
 func initDHT(
 	ctx context.Context,
-	p2pConfig *config.P2PConfig,
 	logger *zap.Logger,
 	h host.Host,
 	isBootstrapPeer bool,
 	bootstrappers []peer.AddrInfo,
 ) *dht.IpfsDHT {
 	logger.Info("establishing dht")
 
-	var kademliaDHT *dht.IpfsDHT
-	var err error
+	var mode dht.ModeOpt
 	if isBootstrapPeer {
-		kademliaDHT, err = dht.New(
-			ctx,
-			h,
-			dht.Mode(dht.ModeServer),
-			dht.BootstrapPeers(bootstrappers...),
-		)
+		mode = dht.ModeServer
 	} else {
-		kademliaDHT, err = dht.New(
-			ctx,
-			h,
-			dht.Mode(dht.ModeClient),
-			dht.BootstrapPeers(bootstrappers...),
-		)
+		mode = dht.ModeClient
 	}
+	kademliaDHT, err := dht.New(
+		ctx,
+		h,
+		dht.Mode(mode),
+		dht.BootstrapPeers(bootstrappers...),
+	)
 	if err != nil {
 		panic(err)
 	}
-	if err = kademliaDHT.Bootstrap(ctx); err != nil {
+	if err := kademliaDHT.Bootstrap(ctx); err != nil {
 		panic(err)
 	}
-
-	reconnect := func() {
-		wg := &sync.WaitGroup{}
-		defer wg.Wait()
-		for _, peerinfo := range bootstrappers {
-			peerinfo := peerinfo
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				if peerinfo.ID == h.ID() ||
-					h.Network().Connectedness(peerinfo.ID) == network.Connected ||
-					h.Network().Connectedness(peerinfo.ID) == network.Limited {
-					return
-				}
-
-				if err := h.Connect(ctx, peerinfo); err != nil {
-					logger.Debug("error while connecting to dht peer", zap.Error(err))
-				} else {
-					h.ConnManager().Protect(peerinfo.ID, "bootstrap")
-					logger.Debug(
-						"connected to peer",
-						zap.String("peer_id", peerinfo.ID.String()),
-					)
-				}
-			}()
-		}
-	}
-
-	reconnect()
-
-	bootstrapPeerIDs := make(map[peer.ID]struct{}, len(bootstrappers))
-	for _, peerinfo := range bootstrappers {
-		bootstrapPeerIDs[peerinfo.ID] = struct{}{}
-	}
-	go func() {
-		for {
-			time.Sleep(30 * time.Second)
-			found := false
-			for _, p := range h.Network().Peers() {
-				if _, ok := bootstrapPeerIDs[p]; ok {
-					found = true
-					break
-				}
-			}
-			if !found {
-				reconnect()
-			}
-		}
-	}()
-
 	return kademliaDHT
 }
 
@@ -1022,65 +995,6 @@ func verifyReachability(cfg *config.P2PConfig) bool {
 	return true
 }
 
-func discoverPeers(
-	p2pConfig *config.P2PConfig,
-	ctx context.Context,
-	logger *zap.Logger,
-	h host.Host,
-	routingDiscovery *routing.RoutingDiscovery,
-	init bool,
-) {
-	discover := func() {
-		logger.Info("initiating peer discovery")
-		defer logger.Info("completed peer discovery")
-
-		peerChan, err := routingDiscovery.FindPeers(
-			ctx,
-			getNetworkNamespace(p2pConfig.Network),
-		)
-		if err != nil {
-			logger.Error("could not find peers", zap.Error(err))
-			return
-		}
-
-		wg := &sync.WaitGroup{}
-		defer wg.Wait()
-		for peer := range peerChan {
-			peer := peer
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				if peer.ID == h.ID() ||
-					h.Network().Connectedness(peer.ID) == network.Connected ||
-					h.Network().Connectedness(peer.ID) == network.Limited {
-					return
-				}
-
-				logger.Debug("found peer", zap.String("peer_id", peer.ID.String()))
-				err := h.Connect(ctx, peer)
-				if err != nil {
-					logger.Debug(
-						"error while connecting to blossomsub peer",
-						zap.String("peer_id", peer.ID.String()),
-						zap.Error(err),
-					)
-				} else {
-					logger.Debug(
-						"connected to peer",
-						zap.String("peer_id", peer.ID.String()),
-					)
-				}
-			}()
-		}
-	}
-
-	if init {
-		go discover()
-	} else {
-		discover()
-	}
-}
-
 func mergeDefaults(p2pConfig *config.P2PConfig) blossomsub.BlossomSubParams {
 	if p2pConfig.D == 0 {
 		p2pConfig.D = blossomsub.BlossomSubD
diff --git a/node/p2p/internal/peer_connector.go b/node/p2p/internal/peer_connector.go
new file mode 100644
index 0000000..092846a
--- /dev/null
+++ b/node/p2p/internal/peer_connector.go
@@ -0,0 +1,307 @@
+package internal
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/peerstore"
+	"github.com/libp2p/go-libp2p/p2p/protocol/identify"
+	"go.uber.org/zap"
+)
+
+// PeerConnector is a connector to peers.
+type PeerConnector interface {
+	// Connect connects to peers.
+	Connect(context.Context) error
+}
+
+type peerConnector struct {
+	ctx         context.Context
+	logger      *zap.Logger
+	host        host.Host
+	idService   identify.IDService
+	connectCh   chan (chan<- struct{})
+	minPeers    int
+	parallelism int
+	source      PeerSource
+}
+
+// Connect implements PeerConnector. It hands a completion signal to the run
+// loop, which serializes concurrent callers, and then waits for it.
+func (pc *peerConnector) Connect(ctx context.Context) error {
+	done := make(chan struct{})
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-pc.ctx.Done():
+		return pc.ctx.Err()
+	case pc.connectCh <- done:
+	}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-pc.ctx.Done():
+		return pc.ctx.Err()
+	case <-done:
+		return nil
+	}
+}
+
+func (pc *peerConnector) connectToPeer(
+	ctx context.Context,
+	logger *zap.Logger,
+	p peer.AddrInfo,
+	wg *sync.WaitGroup,
+	duplicate, success, failure *uint32,
+	inflight <-chan struct{},
+) {
+	// Release the parallelism slot once this attempt finishes, unless the
+	// run has already been canceled.
+	defer func() {
+		select {
+		case <-ctx.Done():
+		case <-inflight:
+		}
+	}()
+	defer wg.Done()
+
+	if p.ID == pc.host.ID() ||
+		pc.host.Network().Connectedness(p.ID) == network.Connected ||
+		pc.host.Network().Connectedness(p.ID) == network.Limited {
+		logger.Debug("peer already connected")
+		atomic.AddUint32(duplicate, 1)
+		return
+	}
+
+	pc.host.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.AddressTTL)
+
+	conn, err := pc.host.Network().DialPeer(ctx, p.ID)
+	if err != nil {
+		logger.Debug("error while connecting to peer", zap.Error(err))
+		atomic.AddUint32(failure, 1)
+		return
+	}
+
+	// Count the dial as successful only once identify completes; give up
+	// after half the identify timeout and drop the connection.
+	select {
+	case <-ctx.Done():
+		return
+	case <-time.After(identify.Timeout / 2):
+		logger.Debug("identifying peer timed out")
+		atomic.AddUint32(failure, 1)
+		_ = conn.Close()
+	case <-pc.idService.IdentifyWait(conn):
+		logger.Debug("connected to peer")
+		atomic.AddUint32(success, 1)
+	}
+}
+
+func (pc *peerConnector) connectToPeers(
+	ctx context.Context,
+	ch <-chan peer.AddrInfo,
+	duplicate, success, failure *uint32,
+) {
+	inflight := make(chan struct{}, pc.parallelism)
+	var wg sync.WaitGroup
+	// Wait for in-flight attempts before returning so the counters are final.
+	defer wg.Wait()
+	for p := range ch {
+		logger := pc.logger.With(zap.String("peer_id", p.ID.String()))
+		logger.Debug("received peer")
+
+		if atomic.LoadUint32(success) >= uint32(pc.minPeers) {
+			logger.Debug("reached target peer count")
+			return
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case inflight <- struct{}{}:
+		}
+		wg.Add(1)
+		go pc.connectToPeer(
+			ctx,
+			logger,
+			p,
+			&wg,
+			duplicate,
+			success,
+			failure,
+			inflight,
+		)
+	}
+}
+
+func (pc *peerConnector) connect() {
+	logger := pc.logger
+
+	logger.Info("initiating peer connections")
+	var success, failure, duplicate uint32
+	defer func() {
+		logger.Info(
+			"completed peer connections",
+			zap.Uint32("success", success),
+			zap.Uint32("failure", failure),
+			zap.Uint32("duplicate", duplicate),
+		)
+	}()
+	ctx, cancel := context.WithCancel(pc.ctx)
+	defer cancel()
+
+	peerChan, err := pc.source.Peers(ctx)
+	if err != nil {
+		logger.Error("could not find peers", zap.Error(err))
+		return
+	}
+
+	pc.connectToPeers(
+		ctx,
+		peerChan,
+		&duplicate,
+		&success,
+		&failure,
+	)
+}
+
+func (pc *peerConnector) run() {
+	for {
+		select {
+		case <-pc.ctx.Done():
+			return
+		case done := <-pc.connectCh:
+			pc.connect()
+			close(done)
+		}
+	}
+}
+
+// NewPeerConnector creates a new peer connector.
+func NewPeerConnector(
+	ctx context.Context,
+	logger *zap.Logger,
+	host host.Host,
+	idService identify.IDService,
+	minPeers, parallelism int,
+	source PeerSource,
+) PeerConnector {
+	pc := &peerConnector{
+		ctx:         ctx,
+		logger:      logger,
+		host:        host,
+		idService:   idService,
+		connectCh:   make(chan (chan<- struct{})),
+		minPeers:    minPeers,
+		parallelism: parallelism,
+		source:      source,
+	}
+	go pc.run()
+	return pc
+}
+
+type chainedPeerConnector struct {
+	ctx        context.Context
+	connectors []PeerConnector
+	connectCh  chan (chan<- struct{})
+}
+
+// Connect implements PeerConnector.
+func (cpc *chainedPeerConnector) Connect(ctx context.Context) error {
+	done := make(chan struct{})
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-cpc.ctx.Done():
+		return cpc.ctx.Err()
+	case cpc.connectCh <- done:
+	}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-cpc.ctx.Done():
+		return cpc.ctx.Err()
+	case <-done:
+		return nil
+	}
+}
+
+func (cpc *chainedPeerConnector) run() {
+	for {
+		select {
+		case <-cpc.ctx.Done():
+			return
+		case done := <-cpc.connectCh:
+			for _, pc := range cpc.connectors {
+				_ = pc.Connect(cpc.ctx)
+			}
+			close(done)
+		}
+	}
+}
+
+// NewChainedPeerConnector creates a new chained peer connector.
+func NewChainedPeerConnector(
+	ctx context.Context,
+	connectors ...PeerConnector,
+) PeerConnector {
+	cpc := &chainedPeerConnector{
+		ctx:        ctx,
+		connectors: connectors,
+		connectCh:  make(chan (chan<- struct{})),
+	}
+	go cpc.run()
+	return cpc
+}
+
+type conditionalPeerConnector struct {
+	ctx       context.Context
+	condition PeerConnectorCondition
+	connector PeerConnector
+	connectCh chan (chan<- struct{})
+}
+
+func (cpc *conditionalPeerConnector) run() {
+	for {
+		select {
+		case <-cpc.ctx.Done():
+			return
+		case done := <-cpc.connectCh:
+			if cpc.condition.Should() {
+				_ = cpc.connector.Connect(cpc.ctx)
+			}
+			close(done)
+		}
+	}
+}
+
+// Connect implements PeerConnector.
+func (cpc *conditionalPeerConnector) Connect(ctx context.Context) error {
+	done := make(chan struct{})
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-cpc.ctx.Done():
+		return cpc.ctx.Err()
+	case cpc.connectCh <- done:
+	}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-cpc.ctx.Done():
+		return cpc.ctx.Err()
+	case <-done:
+		return nil
+	}
+}
+
+// NewConditionalPeerConnector creates a new conditional peer connector.
+func NewConditionalPeerConnector(
+	ctx context.Context,
+	condition PeerConnectorCondition,
+	connector PeerConnector,
+) PeerConnector {
+	cpc := &conditionalPeerConnector{
+		ctx:       ctx,
+		condition: condition,
+		connector: connector,
+		connectCh: make(chan (chan<- struct{})),
+	}
+	go cpc.run()
+	return cpc
+}
diff --git a/node/p2p/internal/peer_connector_condition.go b/node/p2p/internal/peer_connector_condition.go
new file mode 100644
index 0000000..036a623
--- /dev/null
+++ b/node/p2p/internal/peer_connector_condition.go
@@ -0,0 +1,38 @@
+package internal
+
+import (
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/peer"
+)
+
+// PeerConnectorCondition is a condition that determines whether a peer
+// connector should connect.
+type PeerConnectorCondition interface {
+	// Should returns true if the peer connector should connect.
+	Should() bool
+}
+
+type notEnoughPeersCondition struct {
+	host     host.Host
+	minPeers int
+	peers    map[peer.ID]struct{}
+}
+
+// Should implements PeerConnectorCondition.
+func (c *notEnoughPeersCondition) Should() bool {
+	count := 0
+	for _, p := range c.host.Network().Peers() {
+		if _, ok := c.peers[p]; ok {
+			count++
+		}
+	}
+	return count < c.minPeers
+}
+
+// NewNotEnoughPeersCondition creates a condition that holds while fewer than
+// minPeers of the given peers are connected.
+func NewNotEnoughPeersCondition(
+	host host.Host,
+	minPeers int,
+	peers map[peer.ID]struct{},
+) PeerConnectorCondition {
+	return &notEnoughPeersCondition{
+		host:     host,
+		minPeers: minPeers,
+		peers:    peers,
+	}
+}
diff --git a/node/p2p/internal/peer_source.go b/node/p2p/internal/peer_source.go
new file mode 100644
index 0000000..1c32e93
--- /dev/null
+++ b/node/p2p/internal/peer_source.go
@@ -0,0 +1,59 @@
+package internal
+
+import (
+	"context"
+
+	"github.com/libp2p/go-libp2p/core/discovery"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/discovery/routing"
+)
+
+// PeerSource is a source of peers.
+type PeerSource interface {
+	// Peers returns a channel of peers.
+	Peers(context.Context) (<-chan peer.AddrInfo, error)
+}
+
+type staticPeerSource struct {
+	peers   []peer.AddrInfo
+	permute bool
+}
+
+// Peers implements PeerSource.
+func (s *staticPeerSource) Peers(context.Context) (<-chan peer.AddrInfo, error) {
+	peers := s.peers
+	if s.permute {
+		peers = Permuted(s.peers)
+	}
+	ch := make(chan peer.AddrInfo, len(peers))
+	for _, p := range peers {
+		ch <- p
+	}
+	close(ch)
+	return ch, nil
+}
+
+// NewStaticPeerSource creates a new static peer source.
+func NewStaticPeerSource(peers []peer.AddrInfo, permute bool) PeerSource {
+	return &staticPeerSource{peers: peers, permute: permute}
+}
+
+type routingDiscoveryPeerSource struct {
+	discovery *routing.RoutingDiscovery
+	namespace string
+	limit     int
+}
+
+// Peers implements PeerSource.
+func (d *routingDiscoveryPeerSource) Peers(
+	ctx context.Context,
+) (<-chan peer.AddrInfo, error) {
+	return d.discovery.FindPeers(ctx, d.namespace, discovery.Limit(d.limit))
+}
+
+// NewRoutingDiscoveryPeerSource creates a new routing discovery peer source.
+func NewRoutingDiscoveryPeerSource(
+	discovery *routing.RoutingDiscovery,
+	namespace string,
+	limit int,
+) PeerSource {
+	return &routingDiscoveryPeerSource{
+		discovery: discovery,
+		namespace: namespace,
+		limit:     limit,
+	}
+}
diff --git a/node/p2p/internal/utils.go b/node/p2p/internal/utils.go
new file mode 100644
index 0000000..65a8695
--- /dev/null
+++ b/node/p2p/internal/utils.go
@@ -0,0 +1,41 @@
+package internal
+
+import (
+	"math/rand"
+
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/protocol/identify"
+)
+
+// PeerAddrInfosToPeerIDSlice converts a slice of peer.AddrInfo to a slice of
+// peer.ID.
+func PeerAddrInfosToPeerIDSlice(p []peer.AddrInfo) []peer.ID {
+	ids := make([]peer.ID, len(p))
+	for i, pi := range p {
+		ids[i] = pi.ID
+	}
+	return ids
+}
+
+// PeerAddrInfosToPeerIDMap converts a slice of peer.AddrInfo to a set of
+// peer.ID.
+func PeerAddrInfosToPeerIDMap(p []peer.AddrInfo) map[peer.ID]struct{} {
+	m := make(map[peer.ID]struct{}, len(p))
+	for _, pi := range p {
+		m[pi.ID] = struct{}{}
+	}
+	return m
+}
+
+// IDServiceFromHost returns the identify.IDService from a host.Host.
+func IDServiceFromHost(h host.Host) identify.IDService {
+	return h.(interface{ IDService() identify.IDService }).IDService()
+}
+
+// Permuted returns a randomly permuted copy of a slice.
+func Permuted[T any](slice []T) []T {
+	permuted := make([]T, len(slice))
+	for i, j := range rand.Perm(len(slice)) {
+		permuted[i] = slice[j]
+	}
+	return permuted
+}
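
Reviewer sketch: the following is a minimal, hypothetical example (not part of the patch) of how the pieces introduced in node/p2p/internal compose; it mirrors the wiring in NewBlossomSub above. The helper name topUpPeers, its parameters, and the timer-driven retry loop are illustrative assumptions; the constants are the ones added at the top of blossomsub.go, and the sketch sits in package p2p because the internal package is import-restricted to the node/p2p subtree.

package p2p

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/discovery/routing"
	"go.uber.org/zap"

	"source.quilibrium.com/quilibrium/monorepo/node/p2p/internal"
)

// topUpPeers is a hypothetical helper showing the intended composition:
// the bootstrap connector dials the static list, the conditional wrapper
// re-runs it only while too few bootstrappers are connected, and the
// chained connector makes every discovery pass top up bootstrap first.
func topUpPeers(
	ctx context.Context,
	logger *zap.Logger,
	h host.Host,
	bootstrappers []peer.AddrInfo,
	routingDiscovery *routing.RoutingDiscovery,
	namespace string,
) internal.PeerConnector {
	idService := internal.IDServiceFromHost(h)

	bootstrap := internal.NewConditionalPeerConnector(
		ctx,
		internal.NewNotEnoughPeersCondition(
			h,
			minBootstrapPeers,
			internal.PeerAddrInfosToPeerIDMap(bootstrappers),
		),
		internal.NewPeerConnector(
			ctx,
			logger.Named("bootstrap"),
			h,
			idService,
			minBootstrapPeers,
			bootstrapParallelism,
			internal.NewStaticPeerSource(bootstrappers, true),
		),
	)

	discovery := internal.NewPeerConnector(
		ctx,
		logger.Named("discovery"),
		h,
		idService,
		minPeersPerBitmask,
		discoveryParallelism,
		internal.NewRoutingDiscoveryPeerSource(
			routingDiscovery,
			namespace,
			discoveryPeerLimit,
		),
	)

	// Connect serializes concurrent callers through each connector's run
	// loop, so the chained connector is safe to share across goroutines.
	connector := internal.NewChainedPeerConnector(ctx, bootstrap, discovery)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Minute):
				// Illustrative timer; in the patch itself the retry is
				// driven by the per-bitmask peer-count check in NewBlossomSub.
				_ = connector.Connect(ctx)
			}
		}
	}()
	return connector
}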