This commit is contained in:
Black Swan 2026-01-05 13:31:05 +03:00 committed by GitHub
commit a5f0c27ac6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 372 additions and 138 deletions

View File

@ -12,6 +12,7 @@ import (
"math/big"
"math/bits"
"net"
"runtime/debug"
"slices"
"strings"
@ -26,7 +27,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/discovery/util"
@ -34,7 +35,7 @@ import (
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/gostream"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/mr-tron/base58"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
@ -44,7 +45,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
grpcpeer "google.golang.org/grpc/peer"
"google.golang.org/protobuf/types/known/wrapperspb"
"source.quilibrium.com/quilibrium/monorepo/config"
blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
@ -1306,10 +1307,17 @@ func (b *BlossomSub) invokeConnectivityTest(
}
defer conn.Close()
connMultiaddrs := b.collectConnectivityMultiaddrs()
b.logger.Debug("own multiaddrs", zap.Strings("mas", connMultiaddrs))
if len(connMultiaddrs) == 0 {
return errors.New("connectivity test: no connectivity multiaddrs found")
}
client := protobufs.NewConnectivityServiceClient(conn)
req := &protobufs.ConnectivityTestRequest{
PeerId: []byte(b.h.ID()),
Multiaddrs: b.collectConnectivityMultiaddrs(),
Multiaddrs: connMultiaddrs,
}
resp, err := client.TestConnectivity(dialCtx, req)
@ -1353,139 +1361,6 @@ func (b *BlossomSub) recordManualReachability(success bool) {
b.manualReachability.Store(state)
}
type connectivityService struct {
protobufs.UnimplementedConnectivityServiceServer
logger *zap.Logger
host host.Host
ping *ping.PingService
}
func newConnectivityService(
logger *zap.Logger,
h host.Host,
) *connectivityService {
return &connectivityService{
logger: logger,
host: h,
ping: ping.NewPingService(h),
}
}
func (s *connectivityService) TestConnectivity(
ctx context.Context,
req *protobufs.ConnectivityTestRequest,
) (*protobufs.ConnectivityTestResponse, error) {
resp := &protobufs.ConnectivityTestResponse{}
peerID := peer.ID(req.GetPeerId())
if peerID == "" {
resp.ErrorMessage = "peer id required"
return resp, nil
}
// Get the actual IP address from the gRPC peer context
pr, ok := grpcpeer.FromContext(ctx)
if !ok || pr.Addr == nil {
resp.ErrorMessage = "unable to determine peer address from context"
return resp, nil
}
// Extract the IP from the remote address
remoteAddr := pr.Addr.String()
host, _, err := net.SplitHostPort(remoteAddr)
if err != nil {
resp.ErrorMessage = fmt.Sprintf("invalid remote address: %v", err)
return resp, nil
}
s.logger.Debug(
"connectivity test from peer",
zap.String("peer_id", peerID.String()),
zap.String("remote_ip", host),
)
addrs := make([]ma.Multiaddr, 0, len(req.GetMultiaddrs()))
for _, addrStr := range req.GetMultiaddrs() {
maddr, err := ma.NewMultiaddr(addrStr)
if err != nil {
s.logger.Debug(
"invalid multiaddr in connectivity request",
zap.String("peer_id", peerID.String()),
zap.String("multiaddr", addrStr),
zap.Error(err),
)
continue
}
// Extract the port from the multiaddr but use the actual IP from the
// connection
port, err := maddr.ValueForProtocol(ma.P_TCP)
if err != nil {
// If it's not TCP, try UDP
port, err = maddr.ValueForProtocol(ma.P_UDP)
if err != nil {
continue
}
// Build UDP multiaddr with actual IP
newAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/%s/quic-v1", host, port))
if err != nil {
continue
}
addrs = append(addrs, newAddr)
continue
}
// Build TCP multiaddr with actual IP
newAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", host, port))
if err != nil {
continue
}
addrs = append(addrs, newAddr)
}
if len(addrs) == 0 {
resp.ErrorMessage = "no valid multiaddrs to test"
return resp, nil
}
s.logger.Debug(
"attempting to connect to peer",
zap.String("peer_id", peerID.String()),
zap.Any("addrs", addrs),
)
s.host.Peerstore().AddAddrs(peerID, addrs, peerstore.TempAddrTTL)
connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
err = s.host.Connect(connectCtx, peer.AddrInfo{
ID: peerID,
Addrs: addrs,
})
if err != nil {
resp.ErrorMessage = err.Error()
return resp, nil
}
defer s.host.Network().ClosePeer(peerID)
pingCtx, cancelPing := context.WithTimeout(ctx, 10*time.Second)
defer cancelPing()
select {
case <-pingCtx.Done():
resp.ErrorMessage = pingCtx.Err().Error()
return resp, nil
case result := <-s.ping.Ping(pingCtx, peerID):
if result.Error != nil {
resp.ErrorMessage = result.Error.Error()
return resp, nil
}
}
resp.Success = true
return resp, nil
}
func initDHT(
ctx context.Context,
logger *zap.Logger,
@ -1698,7 +1573,7 @@ func (b *BlossomSub) isLocalOnlyAddr(addr ma.Multiaddr) bool {
if err != nil {
ipComponent, err = addr.ValueForProtocol(ma.P_IP6)
if err != nil {
return false
return addr.String() == "/p2p-circuit"
}
}

View File

@ -0,0 +1,200 @@
package p2p
import (
"context"
"fmt"
"net"
"net/netip"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
mn "github.com/multiformats/go-multiaddr/net"
"github.com/pkg/errors"
"go.uber.org/zap"
grpcpeer "google.golang.org/grpc/peer"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
)
type connectivityService struct {
protobufs.UnimplementedConnectivityServiceServer
logger *zap.Logger
host host.Host
ping *ping.PingService
}
func newConnectivityService(
logger *zap.Logger,
h host.Host,
) *connectivityService {
return &connectivityService{
logger: logger,
host: h,
ping: ping.NewPingService(h),
}
}
func (s *connectivityService) TestConnectivity(
ctx context.Context,
req *protobufs.ConnectivityTestRequest,
) (*protobufs.ConnectivityTestResponse, error) {
resp := &protobufs.ConnectivityTestResponse{}
peerID := peer.ID(req.GetPeerId())
if peerID == "" {
resp.ErrorMessage = "peer id required"
return resp, nil
}
reqMaddrStrs := req.GetMultiaddrs()
reqMaddrs := getValidMultiaddrs(reqMaddrStrs)
// categorize submitted multiaddrs into public and private
publicMaddrs, privateMaddrs := categorizePublicPrivateMultiaddrs(reqMaddrs)
if len(publicMaddrs)+len(privateMaddrs) == 0 {
resp.ErrorMessage = "no valid multiaddrs to test"
return resp, nil
}
// ping public multiaddrs first
if err := s.pingPeer(ctx, peerID, publicMaddrs); err == nil {
resp.Success = true
return resp, nil
}
// get remote IP address from the gRPC peer context
pr, ok := grpcpeer.FromContext(ctx)
if !ok || pr.Addr == nil {
resp.ErrorMessage = "unable to determine peer information from context"
return resp, nil
}
remoteAddr := pr.Addr.String()
remoteHost, _, err := net.SplitHostPort(remoteAddr)
if err != nil {
resp.ErrorMessage = fmt.Sprintf("unable to parse remote host from remote address: %s", remoteAddr)
return resp, nil
}
remoteIP, err := netip.ParseAddr(remoteHost)
if err != nil {
resp.ErrorMessage = fmt.Sprintf("unable to parse remote ip from remote host: %s", remoteHost)
return resp, nil
}
s.logger.Debug(
"connectivity test from peer",
zap.String("peer_id", peerID.String()),
zap.String("remote_ip", remoteIP.String()),
)
// replace private multiaddr IPs with remote IP
guessAddrs := make([]ma.Multiaddr, 0, len(privateMaddrs))
for _, maddr := range privateMaddrs {
guessAddr, err := replaceMultiaddrIP(maddr, remoteIP)
if err == nil {
guessAddrs = append(guessAddrs, guessAddr)
}
}
// ping guessed public multiaddrs
if err := s.pingPeer(ctx, peerID, guessAddrs); err != nil {
resp.ErrorMessage = err.Error()
return resp, nil
}
resp.Success = true
return resp, nil
}
func categorizePublicPrivateMultiaddrs(reqMaddrs []ma.Multiaddr) ([]ma.Multiaddr, []ma.Multiaddr) {
publicMaddrs := make([]ma.Multiaddr, 0, len(reqMaddrs))
privateMaddrs := make([]ma.Multiaddr, 0, len(reqMaddrs))
for _, maddr := range reqMaddrs {
if isPubl, _ := mn.IsPublicAddr(maddr); isPubl {
publicMaddrs = append(publicMaddrs, maddr)
} else if isPriv, _ := mn.IsPrivateAddr(maddr); isPriv {
privateMaddrs = append(privateMaddrs, maddr)
}
}
return publicMaddrs, privateMaddrs
}
func getValidMultiaddrs(reqMaddrStrs []string) []ma.Multiaddr {
reqMaddrs := make([]ma.Multiaddr, 0, len(reqMaddrStrs))
for _, maddrStr := range reqMaddrStrs {
maddr, err := ma.NewMultiaddr(maddrStr)
if err == nil {
reqMaddrs = append(reqMaddrs, maddr)
}
}
return reqMaddrs
}
func replaceMultiaddrIP(maddr ma.Multiaddr, ip netip.Addr) (ma.Multiaddr, error) {
ipComp, rest := ma.SplitFirst(maddr)
if ipComp.Protocol().Code == ma.P_IP6 {
// if remote IP is IPv4, convert to IPv4-in-IPv6
if ip.Is4() {
ip = netip.AddrFrom16(ip.As16())
}
} else if ipComp.Protocol().Code == ma.P_IP4 {
// if remote IP is IPv4-in-IPv6, unmap, skip IPv6
if ip.Is4In6() {
ip = ip.Unmap()
} else if ip.Is6() {
return nil, errors.New("can't put IPv6 remote IP into IPv4 multiaddr")
}
} else {
// skip other protocols
return nil, errors.New("unsupported multiaddr protocol")
}
newIpComp, err := ma.NewComponent(ipComp.Protocol().Name, ip.String())
if err != nil {
return nil, err
}
return ma.Join(newIpComp, rest), nil
}
func (s *connectivityService) pingPeer(ctx context.Context, peerID peer.ID, addrs []ma.Multiaddr) error {
s.logger.Debug(
"attempting to connect to peer",
zap.String("peer_id", peerID.String()),
zap.Any("addrs", addrs),
)
s.host.Peerstore().AddAddrs(peerID, addrs, peerstore.TempAddrTTL)
connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
err := s.host.Connect(connectCtx, peer.AddrInfo{
ID: peerID,
Addrs: addrs,
})
if err != nil {
return err
}
defer s.host.Network().ClosePeer(peerID)
pingCtx, cancelPing := context.WithTimeout(ctx, 10*time.Second)
defer cancelPing()
select {
case <-pingCtx.Done():
return pingCtx.Err()
case result := <-s.ping.Ping(pingCtx, peerID):
if result.Error != nil {
return result.Error
}
}
return nil
}

View File

@ -0,0 +1,159 @@
package p2p
import (
"net/netip"
"testing"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
)
func Test_categorizeMultiaddrs(t *testing.T) {
publicMaddrStrs := []string{
"/ip4/17.3.2.5/tcp/123",
"/ip4/8.6.3.2/udp/765",
"/ip4/36.15.23.85/udp/8654/quic-v1",
"/dns/test.com/udp/1543/quic-v1",
}
privateMaddrStrs := []string{
"/ip4/192.168.0.1/tcp/123",
"/ip4/127.0.0.1/udp/2345",
"/ip4/172.18.0.2/udp/456/quic-v1",
"/dns/localhost/tcp/123",
}
testPublicMaddrs := getValidMultiaddrs(publicMaddrStrs)
testPrivateMaddrs := getValidMultiaddrs(privateMaddrStrs)
testMaddrs := append(testPublicMaddrs, testPrivateMaddrs...)
t.Run("Test categorization of multiaddrs into public and private", func(t *testing.T) {
publicMaddrs, privateMaddrs := categorizePublicPrivateMultiaddrs(testMaddrs)
assert.Equal(t, len(testPublicMaddrs), len(publicMaddrs))
assert.Equal(t, len(testPrivateMaddrs), len(privateMaddrs))
for _, maddr := range publicMaddrs {
assert.Contains(t, publicMaddrStrs, maddr.String())
assert.NotContains(t, privateMaddrStrs, maddr.String())
}
for _, maddr := range privateMaddrs {
assert.Contains(t, privateMaddrStrs, maddr.String())
assert.NotContains(t, publicMaddrStrs, maddr.String())
}
})
}
func Test_getValidMultiaddrs(t *testing.T) {
validMaddrsStrs := []string{
"/ip4/127.0.0.1/tcp/123",
"/ip6/::1/udp/234",
"/dns/localhost/udp/456/quic-v1",
"/ip6/::ffff:15.32.65.95/tcp/123",
"/ip6/2001:db8:85a3::8a2e:370:7334/tcp/123",
}
invalidMaddrStrs := []string{
"/ip/127.0.0.1/tcp/123",
"/ip4/127.0.0.1/123",
"/ip4/127.0.0.1/tcp/",
"/",
"/ip4/127.0.0.1/tcp/123/quicv1",
"/ip4/127.0.0.1/udp/123/qui",
"/ip6/2001:db8:85a3:8a2e:370:7334/tcp/123",
}
testMaddrStrs := append(validMaddrsStrs, invalidMaddrStrs...)
t.Run("Test valid multiaddr extraction from string array", func(t *testing.T) {
maddrs := getValidMultiaddrs(testMaddrStrs)
assert.Equal(t, len(validMaddrsStrs), len(maddrs))
for _, maddr := range maddrs {
assert.Contains(t, validMaddrsStrs, maddr.String())
assert.NotContains(t, invalidMaddrStrs, maddr.String())
}
})
}
func Test_replaceMultiaddrIP(t *testing.T) {
t.Run("IPv4-IPv4", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/ip4/192.168.0.3/tcp/123")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("15.32.65.95")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.NoError(t, err)
assert.Equal(t, maddr.String(), "/ip4/15.32.65.95/tcp/123")
})
t.Run("IPv4-IPv6/4", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/ip4/192.168.0.3/tcp/123")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("::ffff:12.18.0.95")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.NoError(t, err)
assert.Equal(t, maddr.String(), "/ip4/12.18.0.95/tcp/123")
})
t.Run("IPv4-IPv6", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/ip4/192.168.0.3/tcp/123")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.Error(t, err)
assert.Nil(t, maddr)
})
t.Run("IPv6-IPv4", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/ip6/fd12:3456:789a:1:a:b:c:d/tcp/123")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("15.32.65.95")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.NoError(t, err)
assert.Equal(t, maddr.String(), "/ip6/::ffff:15.32.65.95/tcp/123")
})
t.Run("IPv6/4-IPv4", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/ip6/::ffff:172.18.0.95/tcp/123")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("15.32.65.95")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.NoError(t, err)
assert.Equal(t, maddr.String(), "/ip6/::ffff:15.32.65.95/tcp/123")
})
t.Run("IPv6/4-IPv6", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/ip6/::ffff:172.18.0.95/udp/123/quic-v1")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.NoError(t, err)
assert.Equal(t, maddr.String(), "/ip6/2001:db8:85a3::8a2e:370:7334/udp/123/quic-v1")
})
t.Run("IPv6-IPv6", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/ip6/::ffff:172.18.0.95/tcp/123")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.NoError(t, err)
assert.Equal(t, maddr.String(), "/ip6/2001:db8:85a3::8a2e:370:7334/tcp/123")
})
t.Run("DNS-IPv6", func(t *testing.T) {
privateMaddr, err := ma.NewMultiaddr("/dns/test.com/tcp/123")
assert.NoError(t, err)
newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334")
assert.NoError(t, err)
maddr, err := replaceMultiaddrIP(privateMaddr, newIP)
assert.Error(t, err)
assert.Nil(t, maddr)
})
}