kubo/core/commands/p2p.go
Marcin Rataj 25ebab9dae
feat(p2p): add --foreground flag to listen and forward commands (#11099)
* feat(p2p): add --foreground flag to listen and forward commands

adds `-f/--foreground` option that keeps the command running until
interrupted (SIGTERM/Ctrl+C) or closed via `ipfs p2p close`. the
listener/forwarder is automatically removed when the command exits.

useful for systemd services and scripts that need cleanup on exit.

* docs: add p2p-tunnels.md with systemd examples

- add dedicated docs/p2p-tunnels.md covering:
  - why p2p tunnels (NAT traversal, no public IP needed)
  - quick start with netcat
  - background and foreground modes
  - systemd integration with path-based activation
  - security considerations and troubleshooting
- document Experimental.Libp2pStreamMounting in docs/config.md
- simplify docs/experimental-features.md, link to new doc
- add "Learn more" links to ipfs p2p listen/forward --help
- update changelog entry with doc link
- add cross-reference in misc/README.md

* chore: reference kubo#5460 for p2p config

Ref. https://github.com/ipfs/kubo/issues/5460

* fix(daemon): write api/gateway files only after HTTP server is ready

fixes race condition where $IPFS_PATH/api and $IPFS_PATH/gateway files
were written before the HTTP servers were ready to accept connections.
this caused issues for tools like systemd path units that immediately
try to connect when these files appear.

changes:
- add corehttp.ServeWithReady() that signals when server is ready
- wait for ready signal before writing address files
- use sync.WaitGroup.Go() (Go 1.25) for cleaner goroutine management
- add TestAddressFileReady to verify both api and gateway files

* fix(daemon): buffer errc channel and wait for all listeners

- buffer error channel with len(listeners) to prevent deadlock when
  multiple servers write errors simultaneously
- wait for ALL listeners to be ready before writing api/gateway file,
  not just the first one

Feedback-from: https://github.com/ipfs/kubo/pull/11099#pullrequestreview-3593885839

* docs(changelog): improve p2p tunnel section clarity

reframe to lead with user benefit and add example output

* docs(p2p): remove obsolete race condition caveat

the "First launch fails but restarts work" troubleshooting section
described a race where the api file was written before the daemon was
ready. this was fixed in 80b703a which ensures api/gateway files are
only written after HTTP servers are ready to accept connections.

---------

Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
2026-01-09 19:22:43 +01:00

711 lines
19 KiB
Go

package commands
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"strings"
"text/tabwriter"
"time"
core "github.com/ipfs/kubo/core"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
p2p "github.com/ipfs/kubo/p2p"
cmds "github.com/ipfs/go-ipfs-cmds"
peer "github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
protocol "github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
)
// P2PProtoPrefix is the default required prefix for protocol names
const P2PProtoPrefix = "/x/"
// P2PListenerInfoOutput is output type of ls command
type P2PListenerInfoOutput struct {
Protocol string
ListenAddress string
TargetAddress string
}
// P2PStreamInfoOutput is output type of streams command
type P2PStreamInfoOutput struct {
HandlerID string
Protocol string
OriginAddress string
TargetAddress string
}
// P2PLsOutput is output type of ls command
type P2PLsOutput struct {
Listeners []P2PListenerInfoOutput
}
// P2PStreamsOutput is output type of streams command
type P2PStreamsOutput struct {
Streams []P2PStreamInfoOutput
}
// P2PForegroundOutput is output type for foreground mode status messages
type P2PForegroundOutput struct {
Status string // "active" or "closing"
Protocol string
Address string
}
const (
allowCustomProtocolOptionName = "allow-custom-protocol"
reportPeerIDOptionName = "report-peer-id"
foregroundOptionName = "foreground"
)
var resolveTimeout = 10 * time.Second
// P2PCmd is the 'ipfs p2p' command
var P2PCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Libp2p stream mounting.",
ShortDescription: `
Create and use tunnels to remote peers over libp2p
Note: this command is experimental and subject to change as usecases and APIs
are refined`,
},
Subcommands: map[string]*cmds.Command{
"stream": p2pStreamCmd,
"forward": p2pForwardCmd,
"listen": p2pListenCmd,
"close": p2pCloseCmd,
"ls": p2pLsCmd,
},
}
var p2pForwardCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Forward connections to libp2p service.",
ShortDescription: `
Forward connections made to <listen-address> to <target-address> via libp2p.
Creates a local TCP listener that tunnels connections through libp2p to a
remote peer's p2p listener. Similar to SSH port forwarding (-L flag).
ARGUMENTS:
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<listen-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
<target-address> Remote peer multiaddr (e.g., /p2p/PeerID)
FOREGROUND MODE (--foreground, -f):
By default, the forwarder runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:
- Ctrl+C or SIGTERM: Removes the forwarder and exits
- 'ipfs p2p close': Removes the forwarder and exits
- Daemon shutdown: Forwarder is automatically removed
Useful for systemd services or scripts that need cleanup on exit.
EXAMPLES:
# Persistent forwarder (command returns immediately)
ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
# Temporary forwarder (removed when command exits)
ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("protocol", true, false, "Protocol name."),
cmds.StringArg("listen-address", true, false, "Listening endpoint."),
cmds.StringArg("target-address", true, false, "Target endpoint."),
},
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
protoOpt := req.Arguments[0]
listenOpt := req.Arguments[1]
targetOpt := req.Arguments[2]
proto := protocol.ID(protoOpt)
listen, err := ma.NewMultiaddr(listenOpt)
if err != nil {
return err
}
targets, err := parseIpfsAddr(targetOpt)
if err != nil {
return err
}
allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
if err != nil {
return err
}
foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: listenOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: listenOpt,
})
}
}
return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol)
}
return nil
}),
},
}
// parseIpfsAddr is a function that takes in addr string and return ipfsAddrs
func parseIpfsAddr(addr string) (*peer.AddrInfo, error) {
multiaddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
pi, err := peer.AddrInfoFromP2pAddr(multiaddr)
if err == nil {
return pi, nil
}
// resolve multiaddr whose protocol is not ma.P_IPFS
ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
defer cancel()
addrs, err := madns.Resolve(ctx, multiaddr)
if err != nil {
return nil, err
}
if len(addrs) == 0 {
return nil, errors.New("fail to resolve the multiaddr:" + multiaddr.String())
}
var info peer.AddrInfo
for _, addr := range addrs {
taddr, id := peer.SplitAddr(addr)
if id == "" {
// not an ipfs addr, skipping.
continue
}
switch info.ID {
case "":
info.ID = id
case id:
default:
return nil, fmt.Errorf(
"ambiguous multiaddr %s could refer to %s or %s",
multiaddr,
info.ID,
id,
)
}
info.Addrs = append(info.Addrs, taddr)
}
return &info, nil
}
var p2pListenCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Create libp2p service.",
ShortDescription: `
Create a libp2p protocol handler that forwards incoming connections to
<target-address>.
When a remote peer connects using 'ipfs p2p forward', the connection is
forwarded to your local service. Similar to SSH port forwarding (server side).
ARGUMENTS:
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<target-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
FOREGROUND MODE (--foreground, -f):
By default, the listener runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:
- Ctrl+C or SIGTERM: Removes the listener and exits
- 'ipfs p2p close': Removes the listener and exits
- Daemon shutdown: Listener is automatically removed
Useful for systemd services or scripts that need cleanup on exit.
EXAMPLES:
# Persistent listener (command returns immediately)
ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000
# Temporary listener (removed when command exits)
ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000
# Report connecting peer ID to the target application
ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("protocol", true, false, "Protocol name."),
cmds.StringArg("target-address", true, false, "Target endpoint."),
},
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
protoOpt := req.Arguments[0]
targetOpt := req.Arguments[1]
proto := protocol.ID(protoOpt)
target, err := ma.NewMultiaddr(targetOpt)
if err != nil {
return err
}
// port can't be 0
if err := checkPort(target); err != nil {
return err
}
allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
reportPeerID, _ := req.Options[reportPeerIDOptionName].(bool)
if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
if err != nil {
return err
}
foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: targetOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: targetOpt,
})
}
}
return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol)
}
return nil
}),
},
}
// checkPort checks whether target multiaddr contains tcp or udp protocol
// and whether the port is equal to 0
func checkPort(target ma.Multiaddr) error {
// get tcp or udp port from multiaddr
getPort := func() (string, error) {
sport, _ := target.ValueForProtocol(ma.P_TCP)
if sport != "" {
return sport, nil
}
sport, _ = target.ValueForProtocol(ma.P_UDP)
if sport != "" {
return sport, nil
}
return "", errors.New("address does not contain tcp or udp protocol")
}
sport, err := getPort()
if err != nil {
return err
}
port, err := strconv.Atoi(sport)
if err != nil {
return err
}
if port == 0 {
return errors.New("port can not be 0")
}
return nil
}
// forwardLocal forwards local connections to a libp2p service
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) {
ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL)
return p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
}
const (
p2pHeadersOptionName = "headers"
)
var p2pLsCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "List active p2p listeners.",
},
Options: []cmds.Option{
cmds.BoolOption(p2pHeadersOptionName, "v", "Print table headers (Protocol, Listen, Target)."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
output := &P2PLsOutput{}
n.P2P.ListenersLocal.Lock()
for _, listener := range n.P2P.ListenersLocal.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: string(listener.Protocol()),
ListenAddress: listener.ListenAddress().String(),
TargetAddress: listener.TargetAddress().String(),
})
}
n.P2P.ListenersLocal.Unlock()
n.P2P.ListenersP2P.Lock()
for _, listener := range n.P2P.ListenersP2P.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: string(listener.Protocol()),
ListenAddress: listener.ListenAddress().String(),
TargetAddress: listener.TargetAddress().String(),
})
}
n.P2P.ListenersP2P.Unlock()
return cmds.EmitOnce(res, output)
},
Type: P2PLsOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PLsOutput) error {
headers, _ := req.Options[p2pHeadersOptionName].(bool)
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
for _, listener := range out.Listeners {
if headers {
fmt.Fprintln(tw, "Protocol\tListen Address\tTarget Address")
}
fmt.Fprintf(tw, "%s\t%s\t%s\n", listener.Protocol, listener.ListenAddress, listener.TargetAddress)
}
tw.Flush()
return nil
}),
},
}
const (
p2pAllOptionName = "all"
p2pProtocolOptionName = "protocol"
p2pListenAddressOptionName = "listen-address"
p2pTargetAddressOptionName = "target-address"
)
var p2pCloseCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Stop listening for new connections to forward.",
},
Options: []cmds.Option{
cmds.BoolOption(p2pAllOptionName, "a", "Close all listeners."),
cmds.StringOption(p2pProtocolOptionName, "p", "Match protocol name"),
cmds.StringOption(p2pListenAddressOptionName, "l", "Match listen address"),
cmds.StringOption(p2pTargetAddressOptionName, "t", "Match target address"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
closeAll, _ := req.Options[p2pAllOptionName].(bool)
protoOpt, p := req.Options[p2pProtocolOptionName].(string)
listenOpt, l := req.Options[p2pListenAddressOptionName].(string)
targetOpt, t := req.Options[p2pTargetAddressOptionName].(string)
proto := protocol.ID(protoOpt)
var target, listen ma.Multiaddr
if l {
listen, err = ma.NewMultiaddr(listenOpt)
if err != nil {
return err
}
}
if t {
target, err = ma.NewMultiaddr(targetOpt)
if err != nil {
return err
}
}
if !(closeAll || p || l || t) {
return errors.New("no matching options given")
}
if closeAll && (p || l || t) {
return errors.New("can't combine --all with other matching options")
}
match := func(listener p2p.Listener) bool {
if closeAll {
return true
}
if p && proto != listener.Protocol() {
return false
}
if l && !listen.Equal(listener.ListenAddress()) {
return false
}
if t && !target.Equal(listener.TargetAddress()) {
return false
}
return true
}
done := n.P2P.ListenersLocal.Close(match)
done += n.P2P.ListenersP2P.Close(match)
return cmds.EmitOnce(res, done)
},
Type: int(0),
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out int) error {
fmt.Fprintf(w, "Closed %d stream(s)\n", out)
return nil
}),
},
}
///////
// Stream
//
// p2pStreamCmd is the 'ipfs p2p stream' command
var p2pStreamCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "P2P stream management.",
ShortDescription: "Create and manage p2p streams",
},
Subcommands: map[string]*cmds.Command{
"ls": p2pStreamLsCmd,
"close": p2pStreamCloseCmd,
},
}
var p2pStreamLsCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "List active p2p streams.",
},
Options: []cmds.Option{
cmds.BoolOption(p2pHeadersOptionName, "v", "Print table headers (ID, Protocol, Local, Remote)."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
output := &P2PStreamsOutput{}
n.P2P.Streams.Lock()
for id, s := range n.P2P.Streams.Streams {
output.Streams = append(output.Streams, P2PStreamInfoOutput{
HandlerID: strconv.FormatUint(id, 10),
Protocol: string(s.Protocol),
OriginAddress: s.OriginAddr.String(),
TargetAddress: s.TargetAddr.String(),
})
}
n.P2P.Streams.Unlock()
return cmds.EmitOnce(res, output)
},
Type: P2PStreamsOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PStreamsOutput) error {
headers, _ := req.Options[p2pHeadersOptionName].(bool)
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
for _, stream := range out.Streams {
if headers {
fmt.Fprintln(tw, "ID\tProtocol\tOrigin\tTarget")
}
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress)
}
tw.Flush()
return nil
}),
},
}
var p2pStreamCloseCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Close active p2p stream.",
},
Arguments: []cmds.Argument{
cmds.StringArg("id", false, false, "Stream identifier"),
},
Options: []cmds.Option{
cmds.BoolOption(p2pAllOptionName, "a", "Close all streams."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
closeAll, _ := req.Options[p2pAllOptionName].(bool)
var handlerID uint64
if !closeAll {
if len(req.Arguments) == 0 {
return errors.New("no id specified")
}
handlerID, err = strconv.ParseUint(req.Arguments[0], 10, 64)
if err != nil {
return err
}
}
toClose := make([]*p2p.Stream, 0, 1)
n.P2P.Streams.Lock()
for id, stream := range n.P2P.Streams.Streams {
if !closeAll && handlerID != id {
continue
}
toClose = append(toClose, stream)
if !closeAll {
break
}
}
n.P2P.Streams.Unlock()
for _, s := range toClose {
n.P2P.Streams.Reset(s)
}
return nil
},
}
func p2pGetNode(env cmds.Environment) (*core.IpfsNode, error) {
nd, err := cmdenv.GetNode(env)
if err != nil {
return nil, err
}
config, err := nd.Repo.Config()
if err != nil {
return nil, err
}
if !config.Experimental.Libp2pStreamMounting {
return nil, errors.New("libp2p stream mounting not enabled")
}
if !nd.IsOnline {
return nil, ErrNotOnline
}
return nd, nil
}