From a804e4570357d7c3029e8de759fb0c2b80c90855 Mon Sep 17 00:00:00 2001 From: Black Swan Date: Thu, 18 Dec 2025 04:06:10 +0200 Subject: [PATCH] add ipv6 and dns support --- node/p2p/blossomsub.go | 140 +----------------- node/p2p/connectivity_service.go | 200 ++++++++++++++++++++++++++ node/p2p/connectivity_service_test.go | 159 ++++++++++++++++++++ 3 files changed, 363 insertions(+), 136 deletions(-) create mode 100644 node/p2p/connectivity_service.go create mode 100644 node/p2p/connectivity_service_test.go diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 9936e99..dd74961 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -12,6 +12,7 @@ import ( "math/big" "math/bits" "net" + "runtime/debug" "slices" "strings" @@ -26,7 +27,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/discovery/util" @@ -34,7 +35,7 @@ import ( routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/net/gostream" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/mr-tron/base58" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" @@ -44,7 +45,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - grpcpeer "google.golang.org/grpc/peer" + "google.golang.org/protobuf/types/known/wrapperspb" "source.quilibrium.com/quilibrium/monorepo/config" blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub" @@ -1360,139 +1361,6 @@ func (b *BlossomSub) recordManualReachability(success bool) { b.manualReachability.Store(state) } -type connectivityService struct { - protobufs.UnimplementedConnectivityServiceServer - logger *zap.Logger - host host.Host - ping *ping.PingService -} - -func newConnectivityService( - logger *zap.Logger, - h host.Host, -) *connectivityService { - return &connectivityService{ - logger: logger, - host: h, - ping: ping.NewPingService(h), - } -} - -func (s *connectivityService) TestConnectivity( - ctx context.Context, - req *protobufs.ConnectivityTestRequest, -) (*protobufs.ConnectivityTestResponse, error) { - resp := &protobufs.ConnectivityTestResponse{} - peerID := peer.ID(req.GetPeerId()) - if peerID == "" { - resp.ErrorMessage = "peer id required" - return resp, nil - } - - // Get the actual IP address from the gRPC peer context - pr, ok := grpcpeer.FromContext(ctx) - if !ok || pr.Addr == nil { - resp.ErrorMessage = "unable to determine peer address from context" - return resp, nil - } - - // Extract the IP from the remote address - remoteAddr := pr.Addr.String() - host, _, err := net.SplitHostPort(remoteAddr) - if err != nil { - resp.ErrorMessage = fmt.Sprintf("invalid remote address: %v", err) - return resp, nil - } - - s.logger.Debug( - "connectivity test from peer", - zap.String("peer_id", peerID.String()), - zap.String("remote_ip", host), - ) - - addrs := make([]ma.Multiaddr, 0, len(req.GetMultiaddrs())) - for _, addrStr := range req.GetMultiaddrs() { - maddr, err := ma.NewMultiaddr(addrStr) - if err != nil { - s.logger.Debug( - "invalid multiaddr in connectivity request", - zap.String("peer_id", peerID.String()), - zap.String("multiaddr", addrStr), - zap.Error(err), - ) - continue - } - - // Extract the port from the multiaddr but use the actual IP from the - // connection - port, err := maddr.ValueForProtocol(ma.P_TCP) - if err != nil { - // If it's not TCP, try UDP - port, err = maddr.ValueForProtocol(ma.P_UDP) - if err != nil { - continue - } - // Build UDP multiaddr with actual IP - newAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/%s/quic-v1", host, port)) - if err != nil { - continue - } - addrs = append(addrs, newAddr) - continue - } - - // Build TCP multiaddr with actual IP - newAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", host, port)) - if err != nil { - continue - } - addrs = append(addrs, newAddr) - } - - if len(addrs) == 0 { - resp.ErrorMessage = "no valid multiaddrs to test" - return resp, nil - } - - s.logger.Debug( - "attempting to connect to peer", - zap.String("peer_id", peerID.String()), - zap.Any("addrs", addrs), - ) - - s.host.Peerstore().AddAddrs(peerID, addrs, peerstore.TempAddrTTL) - - connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - err = s.host.Connect(connectCtx, peer.AddrInfo{ - ID: peerID, - Addrs: addrs, - }) - if err != nil { - resp.ErrorMessage = err.Error() - return resp, nil - } - - defer s.host.Network().ClosePeer(peerID) - - pingCtx, cancelPing := context.WithTimeout(ctx, 10*time.Second) - defer cancelPing() - - select { - case <-pingCtx.Done(): - resp.ErrorMessage = pingCtx.Err().Error() - return resp, nil - case result := <-s.ping.Ping(pingCtx, peerID): - if result.Error != nil { - resp.ErrorMessage = result.Error.Error() - return resp, nil - } - } - - resp.Success = true - return resp, nil -} - func initDHT( ctx context.Context, logger *zap.Logger, diff --git a/node/p2p/connectivity_service.go b/node/p2p/connectivity_service.go new file mode 100644 index 0000000..3330efa --- /dev/null +++ b/node/p2p/connectivity_service.go @@ -0,0 +1,200 @@ +package p2p + +import ( + "context" + "fmt" + "net" + "net/netip" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + ma "github.com/multiformats/go-multiaddr" + mn "github.com/multiformats/go-multiaddr/net" + "github.com/pkg/errors" + "go.uber.org/zap" + grpcpeer "google.golang.org/grpc/peer" + "source.quilibrium.com/quilibrium/monorepo/protobufs" +) + +type connectivityService struct { + protobufs.UnimplementedConnectivityServiceServer + logger *zap.Logger + host host.Host + ping *ping.PingService +} + +func newConnectivityService( + logger *zap.Logger, + h host.Host, +) *connectivityService { + return &connectivityService{ + logger: logger, + host: h, + ping: ping.NewPingService(h), + } +} + +func (s *connectivityService) TestConnectivity( + ctx context.Context, + req *protobufs.ConnectivityTestRequest, +) (*protobufs.ConnectivityTestResponse, error) { + resp := &protobufs.ConnectivityTestResponse{} + peerID := peer.ID(req.GetPeerId()) + if peerID == "" { + resp.ErrorMessage = "peer id required" + return resp, nil + } + + reqMaddrStrs := req.GetMultiaddrs() + reqMaddrs := getValidMultiaddrs(reqMaddrStrs) + + // categorize submitted multiaddrs into public and private + publicMaddrs, privateMaddrs := categorizePublicPrivateMultiaddrs(reqMaddrs) + + if len(publicMaddrs)+len(privateMaddrs) == 0 { + resp.ErrorMessage = "no valid multiaddrs to test" + return resp, nil + } + + // ping public multiaddrs first + if err := s.pingPeer(ctx, peerID, publicMaddrs); err == nil { + resp.Success = true + return resp, nil + } + + // get remote IP address from the gRPC peer context + pr, ok := grpcpeer.FromContext(ctx) + if !ok || pr.Addr == nil { + resp.ErrorMessage = "unable to determine peer information from context" + return resp, nil + } + + remoteAddr := pr.Addr.String() + remoteHost, _, err := net.SplitHostPort(remoteAddr) + if err != nil { + resp.ErrorMessage = fmt.Sprintf("unable to parse remote host from remote address: %s", remoteAddr) + return resp, nil + } + + remoteIP, err := netip.ParseAddr(remoteHost) + + if err != nil { + resp.ErrorMessage = fmt.Sprintf("unable to parse remote ip from remote host: %s", remoteHost) + return resp, nil + } + + s.logger.Debug( + "connectivity test from peer", + zap.String("peer_id", peerID.String()), + zap.String("remote_ip", remoteIP.String()), + ) + + // replace private multiaddr IPs with remote IP + guessAddrs := make([]ma.Multiaddr, 0, len(privateMaddrs)) + for _, maddr := range privateMaddrs { + guessAddr, err := replaceMultiaddrIP(maddr, remoteIP) + if err == nil { + guessAddrs = append(guessAddrs, guessAddr) + } + } + + // ping guessed public multiaddrs + if err := s.pingPeer(ctx, peerID, guessAddrs); err != nil { + resp.ErrorMessage = err.Error() + return resp, nil + } + + resp.Success = true + return resp, nil +} + +func categorizePublicPrivateMultiaddrs(reqMaddrs []ma.Multiaddr) ([]ma.Multiaddr, []ma.Multiaddr) { + publicMaddrs := make([]ma.Multiaddr, 0, len(reqMaddrs)) + privateMaddrs := make([]ma.Multiaddr, 0, len(reqMaddrs)) + + for _, maddr := range reqMaddrs { + if isPubl, _ := mn.IsPublicAddr(maddr); isPubl { + publicMaddrs = append(publicMaddrs, maddr) + } else if isPriv, _ := mn.IsPrivateAddr(maddr); isPriv { + privateMaddrs = append(privateMaddrs, maddr) + } + } + return publicMaddrs, privateMaddrs +} + +func getValidMultiaddrs(reqMaddrStrs []string) []ma.Multiaddr { + reqMaddrs := make([]ma.Multiaddr, 0, len(reqMaddrStrs)) + + for _, maddrStr := range reqMaddrStrs { + maddr, err := ma.NewMultiaddr(maddrStr) + if err == nil { + reqMaddrs = append(reqMaddrs, maddr) + } + } + return reqMaddrs +} + +func replaceMultiaddrIP(maddr ma.Multiaddr, ip netip.Addr) (ma.Multiaddr, error) { + ipComp, rest := ma.SplitFirst(maddr) + + if ipComp.Protocol().Code == ma.P_IP6 { + // if remote IP is IPv4, convert to IPv4-in-IPv6 + if ip.Is4() { + ip = netip.AddrFrom16(ip.As16()) + } + } else if ipComp.Protocol().Code == ma.P_IP4 { + // if remote IP is IPv4-in-IPv6, unmap, skip IPv6 + if ip.Is4In6() { + ip = ip.Unmap() + } else if ip.Is6() { + return nil, errors.New("can't put IPv6 remote IP into IPv4 multiaddr") + } + } else { + // skip other protocols + return nil, errors.New("unsupported multiaddr protocol") + } + + newIpComp, err := ma.NewComponent(ipComp.Protocol().Name, ip.String()) + if err != nil { + return nil, err + } + return ma.Join(newIpComp, rest), nil +} + +func (s *connectivityService) pingPeer(ctx context.Context, peerID peer.ID, addrs []ma.Multiaddr) error { + s.logger.Debug( + "attempting to connect to peer", + zap.String("peer_id", peerID.String()), + zap.Any("addrs", addrs), + ) + + s.host.Peerstore().AddAddrs(peerID, addrs, peerstore.TempAddrTTL) + + connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + err := s.host.Connect(connectCtx, peer.AddrInfo{ + ID: peerID, + Addrs: addrs, + }) + if err != nil { + return err + } + + defer s.host.Network().ClosePeer(peerID) + + pingCtx, cancelPing := context.WithTimeout(ctx, 10*time.Second) + defer cancelPing() + + select { + case <-pingCtx.Done(): + return pingCtx.Err() + case result := <-s.ping.Ping(pingCtx, peerID): + if result.Error != nil { + return result.Error + } + } + return nil +} diff --git a/node/p2p/connectivity_service_test.go b/node/p2p/connectivity_service_test.go new file mode 100644 index 0000000..d231998 --- /dev/null +++ b/node/p2p/connectivity_service_test.go @@ -0,0 +1,159 @@ +package p2p + +import ( + "net/netip" + "testing" + + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/assert" +) + +func Test_categorizeMultiaddrs(t *testing.T) { + publicMaddrStrs := []string{ + "/ip4/17.3.2.5/tcp/123", + "/ip4/8.6.3.2/udp/765", + "/ip4/36.15.23.85/udp/8654/quic-v1", + "/dns/test.com/udp/1543/quic-v1", + } + + privateMaddrStrs := []string{ + "/ip4/192.168.0.1/tcp/123", + "/ip4/127.0.0.1/udp/2345", + "/ip4/172.18.0.2/udp/456/quic-v1", + "/dns/localhost/tcp/123", + } + + testPublicMaddrs := getValidMultiaddrs(publicMaddrStrs) + testPrivateMaddrs := getValidMultiaddrs(privateMaddrStrs) + + testMaddrs := append(testPublicMaddrs, testPrivateMaddrs...) + + t.Run("Test categorization of multiaddrs into public and private", func(t *testing.T) { + publicMaddrs, privateMaddrs := categorizePublicPrivateMultiaddrs(testMaddrs) + assert.Equal(t, len(testPublicMaddrs), len(publicMaddrs)) + assert.Equal(t, len(testPrivateMaddrs), len(privateMaddrs)) + for _, maddr := range publicMaddrs { + assert.Contains(t, publicMaddrStrs, maddr.String()) + assert.NotContains(t, privateMaddrStrs, maddr.String()) + } + for _, maddr := range privateMaddrs { + assert.Contains(t, privateMaddrStrs, maddr.String()) + assert.NotContains(t, publicMaddrStrs, maddr.String()) + } + }) +} + +func Test_getValidMultiaddrs(t *testing.T) { + validMaddrsStrs := []string{ + "/ip4/127.0.0.1/tcp/123", + "/ip6/::1/udp/234", + "/dns/localhost/udp/456/quic-v1", + "/ip6/::ffff:15.32.65.95/tcp/123", + "/ip6/2001:db8:85a3::8a2e:370:7334/tcp/123", + } + + invalidMaddrStrs := []string{ + "/ip/127.0.0.1/tcp/123", + "/ip4/127.0.0.1/123", + "/ip4/127.0.0.1/tcp/", + "/", + "/ip4/127.0.0.1/tcp/123/quicv1", + "/ip4/127.0.0.1/udp/123/qui", + "/ip6/2001:db8:85a3:8a2e:370:7334/tcp/123", + } + + testMaddrStrs := append(validMaddrsStrs, invalidMaddrStrs...) + + t.Run("Test valid multiaddr extraction from string array", func(t *testing.T) { + maddrs := getValidMultiaddrs(testMaddrStrs) + assert.Equal(t, len(validMaddrsStrs), len(maddrs)) + for _, maddr := range maddrs { + assert.Contains(t, validMaddrsStrs, maddr.String()) + assert.NotContains(t, invalidMaddrStrs, maddr.String()) + } + }) +} + +func Test_replaceMultiaddrIP(t *testing.T) { + + t.Run("IPv4-IPv4", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/ip4/192.168.0.3/tcp/123") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("15.32.65.95") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.NoError(t, err) + assert.Equal(t, maddr.String(), "/ip4/15.32.65.95/tcp/123") + }) + + t.Run("IPv4-IPv6/4", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/ip4/192.168.0.3/tcp/123") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("::ffff:12.18.0.95") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.NoError(t, err) + assert.Equal(t, maddr.String(), "/ip4/12.18.0.95/tcp/123") + }) + + t.Run("IPv4-IPv6", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/ip4/192.168.0.3/tcp/123") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.Error(t, err) + assert.Nil(t, maddr) + }) + + t.Run("IPv6-IPv4", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/ip6/fd12:3456:789a:1:a:b:c:d/tcp/123") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("15.32.65.95") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.NoError(t, err) + assert.Equal(t, maddr.String(), "/ip6/::ffff:15.32.65.95/tcp/123") + }) + + t.Run("IPv6/4-IPv4", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/ip6/::ffff:172.18.0.95/tcp/123") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("15.32.65.95") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.NoError(t, err) + assert.Equal(t, maddr.String(), "/ip6/::ffff:15.32.65.95/tcp/123") + }) + + t.Run("IPv6/4-IPv6", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/ip6/::ffff:172.18.0.95/udp/123/quic-v1") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.NoError(t, err) + assert.Equal(t, maddr.String(), "/ip6/2001:db8:85a3::8a2e:370:7334/udp/123/quic-v1") + }) + + t.Run("IPv6-IPv6", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/ip6/::ffff:172.18.0.95/tcp/123") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.NoError(t, err) + assert.Equal(t, maddr.String(), "/ip6/2001:db8:85a3::8a2e:370:7334/tcp/123") + }) + + t.Run("DNS-IPv6", func(t *testing.T) { + privateMaddr, err := ma.NewMultiaddr("/dns/test.com/tcp/123") + assert.NoError(t, err) + newIP, err := netip.ParseAddr("2001:db8:85a3::8a2e:370:7334") + assert.NoError(t, err) + maddr, err := replaceMultiaddrIP(privateMaddr, newIP) + assert.Error(t, err) + assert.Nil(t, maddr) + }) + +}