mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
feat(p2p): add --foreground flag to listen and forward commands (#11099)
* feat(p2p): add --foreground flag to listen and forward commands
adds `-f/--foreground` option that keeps the command running until
interrupted (SIGTERM/Ctrl+C) or closed via `ipfs p2p close`. the
listener/forwarder is automatically removed when the command exits.
useful for systemd services and scripts that need cleanup on exit.
* docs: add p2p-tunnels.md with systemd examples
- add dedicated docs/p2p-tunnels.md covering:
- why p2p tunnels (NAT traversal, no public IP needed)
- quick start with netcat
- background and foreground modes
- systemd integration with path-based activation
- security considerations and troubleshooting
- document Experimental.Libp2pStreamMounting in docs/config.md
- simplify docs/experimental-features.md, link to new doc
- add "Learn more" links to ipfs p2p listen/forward --help
- update changelog entry with doc link
- add cross-reference in misc/README.md
* chore: reference kubo#5460 for p2p config
Ref. https://github.com/ipfs/kubo/issues/5460
* fix(daemon): write api/gateway files only after HTTP server is ready
fixes race condition where $IPFS_PATH/api and $IPFS_PATH/gateway files
were written before the HTTP servers were ready to accept connections.
this caused issues for tools like systemd path units that immediately
try to connect when these files appear.
changes:
- add corehttp.ServeWithReady() that signals when server is ready
- wait for ready signal before writing address files
- use sync.WaitGroup.Go() (Go 1.25) for cleaner goroutine management
- add TestAddressFileReady to verify both api and gateway files
* fix(daemon): buffer errc channel and wait for all listeners
- buffer error channel with len(listeners) to prevent deadlock when
multiple servers write errors simultaneously
- wait for ALL listeners to be ready before writing api/gateway file,
not just the first one
Feedback-from: https://github.com/ipfs/kubo/pull/11099#pullrequestreview-3593885839
* docs(changelog): improve p2p tunnel section clarity
reframe to lead with user benefit and add example output
* docs(p2p): remove obsolete race condition caveat
the "First launch fails but restarts work" troubleshooting section
described a race where the api file was written before the daemon was
ready. this was fixed in 80b703a which ensures api/gateway files are
only written after HTTP servers are ready to accept connections.
---------
Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
This commit is contained in:
parent
5288946fd1
commit
25ebab9dae
@ -883,21 +883,36 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error
|
||||
return nil, fmt.Errorf("serveHTTPApi: ConstructNode() failed: %s", err)
|
||||
}
|
||||
|
||||
// Buffer channel to prevent deadlock when multiple servers write errors simultaneously
|
||||
errc := make(chan error, len(listeners))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start all servers and wait for them to be ready before writing api file.
|
||||
// This prevents race conditions where external tools (like systemd path units)
|
||||
// see the file and try to connect before servers can accept connections.
|
||||
if len(listeners) > 0 {
|
||||
// Only add an api file if the API is running.
|
||||
if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil {
|
||||
return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err)
|
||||
readyChannels := make([]chan struct{}, len(listeners))
|
||||
for i, lis := range listeners {
|
||||
readyChannels[i] = make(chan struct{})
|
||||
ready := readyChannels[i]
|
||||
wg.Go(func() {
|
||||
errc <- corehttp.ServeWithReady(node, manet.NetListener(lis), ready, opts...)
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for all listeners to be ready or any to fail
|
||||
for _, ready := range readyChannels {
|
||||
select {
|
||||
case <-ready:
|
||||
// This listener is ready
|
||||
case err := <-errc:
|
||||
return nil, fmt.Errorf("serveHTTPApi: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
errc := make(chan error)
|
||||
var wg sync.WaitGroup
|
||||
for _, apiLis := range listeners {
|
||||
wg.Add(1)
|
||||
go func(lis manet.Listener) {
|
||||
defer wg.Done()
|
||||
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
|
||||
}(apiLis)
|
||||
if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil {
|
||||
return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
@ -1058,26 +1073,42 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e
|
||||
return nil, fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err)
|
||||
}
|
||||
|
||||
// Buffer channel to prevent deadlock when multiple servers write errors simultaneously
|
||||
errc := make(chan error, len(listeners))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start all servers and wait for them to be ready before writing gateway file.
|
||||
// This prevents race conditions where external tools (like systemd path units)
|
||||
// see the file and try to connect before servers can accept connections.
|
||||
if len(listeners) > 0 {
|
||||
readyChannels := make([]chan struct{}, len(listeners))
|
||||
for i, lis := range listeners {
|
||||
readyChannels[i] = make(chan struct{})
|
||||
ready := readyChannels[i]
|
||||
wg.Go(func() {
|
||||
errc <- corehttp.ServeWithReady(node, manet.NetListener(lis), ready, opts...)
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for all listeners to be ready or any to fail
|
||||
for _, ready := range readyChannels {
|
||||
select {
|
||||
case <-ready:
|
||||
// This listener is ready
|
||||
case err := <-errc:
|
||||
return nil, fmt.Errorf("serveHTTPGateway: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
addr, err := manet.ToNetAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr()))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("serveHTTPGateway: manet.ToIP() failed: %w", err)
|
||||
return nil, fmt.Errorf("serveHTTPGateway: manet.ToNetAddr() failed: %w", err)
|
||||
}
|
||||
if err := node.Repo.SetGatewayAddr(addr); err != nil {
|
||||
return nil, fmt.Errorf("serveHTTPGateway: SetGatewayAddr() failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
errc := make(chan error)
|
||||
var wg sync.WaitGroup
|
||||
for _, lis := range listeners {
|
||||
wg.Add(1)
|
||||
go func(lis manet.Listener) {
|
||||
defer wg.Done()
|
||||
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
|
||||
}(lis)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errc)
|
||||
|
||||
@ -50,9 +50,17 @@ type P2PStreamsOutput struct {
|
||||
Streams []P2PStreamInfoOutput
|
||||
}
|
||||
|
||||
// P2PForegroundOutput is output type for foreground mode status messages
|
||||
type P2PForegroundOutput struct {
|
||||
Status string // "active" or "closing"
|
||||
Protocol string
|
||||
Address string
|
||||
}
|
||||
|
||||
const (
|
||||
allowCustomProtocolOptionName = "allow-custom-protocol"
|
||||
reportPeerIDOptionName = "report-peer-id"
|
||||
foregroundOptionName = "foreground"
|
||||
)
|
||||
|
||||
var resolveTimeout = 10 * time.Second
|
||||
@ -83,15 +91,37 @@ var p2pForwardCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Forward connections to libp2p service.",
|
||||
ShortDescription: `
|
||||
Forward connections made to <listen-address> to <target-address>.
|
||||
Forward connections made to <listen-address> to <target-address> via libp2p.
|
||||
|
||||
<protocol> specifies the libp2p protocol name to use for libp2p
|
||||
connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'.
|
||||
Creates a local TCP listener that tunnels connections through libp2p to a
|
||||
remote peer's p2p listener. Similar to SSH port forwarding (-L flag).
|
||||
|
||||
Example:
|
||||
ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /p2p/QmPeer
|
||||
- Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /p2p/QmPeer
|
||||
ARGUMENTS:
|
||||
|
||||
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
|
||||
<listen-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
|
||||
<target-address> Remote peer multiaddr (e.g., /p2p/PeerID)
|
||||
|
||||
FOREGROUND MODE (--foreground, -f):
|
||||
|
||||
By default, the forwarder runs in the daemon and the command returns
|
||||
immediately. Use --foreground to block until interrupted:
|
||||
|
||||
- Ctrl+C or SIGTERM: Removes the forwarder and exits
|
||||
- 'ipfs p2p close': Removes the forwarder and exits
|
||||
- Daemon shutdown: Forwarder is automatically removed
|
||||
|
||||
Useful for systemd services or scripts that need cleanup on exit.
|
||||
|
||||
EXAMPLES:
|
||||
|
||||
# Persistent forwarder (command returns immediately)
|
||||
ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
|
||||
|
||||
# Temporary forwarder (removed when command exits)
|
||||
ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
|
||||
|
||||
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
|
||||
`,
|
||||
},
|
||||
Arguments: []cmds.Argument{
|
||||
@ -101,6 +131,7 @@ Example:
|
||||
},
|
||||
Options: []cmds.Option{
|
||||
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
|
||||
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"),
|
||||
},
|
||||
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
n, err := p2pGetNode(env)
|
||||
@ -130,7 +161,51 @@ Example:
|
||||
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
|
||||
}
|
||||
|
||||
return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
|
||||
listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
foreground, _ := req.Options[foregroundOptionName].(bool)
|
||||
if foreground {
|
||||
if err := res.Emit(&P2PForegroundOutput{
|
||||
Status: "active",
|
||||
Protocol: protoOpt,
|
||||
Address: listenOpt,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
|
||||
// or listener removal (ipfs p2p close)
|
||||
select {
|
||||
case <-req.Context.Done():
|
||||
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
|
||||
n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool {
|
||||
return l == listener
|
||||
})
|
||||
return nil
|
||||
case <-listener.Done():
|
||||
// Closed via "ipfs p2p close" - emit closing message
|
||||
return res.Emit(&P2PForegroundOutput{
|
||||
Status: "closing",
|
||||
Protocol: protoOpt,
|
||||
Address: listenOpt,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Type: P2PForegroundOutput{},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
|
||||
if out.Status == "active" {
|
||||
fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address)
|
||||
} else if out.Status == "closing" {
|
||||
fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol)
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
}
|
||||
|
||||
@ -185,14 +260,40 @@ var p2pListenCmd = &cmds.Command{
|
||||
Helptext: cmds.HelpText{
|
||||
Tagline: "Create libp2p service.",
|
||||
ShortDescription: `
|
||||
Create libp2p service and forward connections made to <target-address>.
|
||||
Create a libp2p protocol handler that forwards incoming connections to
|
||||
<target-address>.
|
||||
|
||||
<protocol> specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'.
|
||||
When a remote peer connects using 'ipfs p2p forward', the connection is
|
||||
forwarded to your local service. Similar to SSH port forwarding (server side).
|
||||
|
||||
Example:
|
||||
ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234
|
||||
- Forward connections to 'myproto' libp2p service to 127.0.0.1:1234
|
||||
ARGUMENTS:
|
||||
|
||||
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
|
||||
<target-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
|
||||
|
||||
FOREGROUND MODE (--foreground, -f):
|
||||
|
||||
By default, the listener runs in the daemon and the command returns
|
||||
immediately. Use --foreground to block until interrupted:
|
||||
|
||||
- Ctrl+C or SIGTERM: Removes the listener and exits
|
||||
- 'ipfs p2p close': Removes the listener and exits
|
||||
- Daemon shutdown: Listener is automatically removed
|
||||
|
||||
Useful for systemd services or scripts that need cleanup on exit.
|
||||
|
||||
EXAMPLES:
|
||||
|
||||
# Persistent listener (command returns immediately)
|
||||
ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000
|
||||
|
||||
# Temporary listener (removed when command exits)
|
||||
ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000
|
||||
|
||||
# Report connecting peer ID to the target application
|
||||
ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000
|
||||
|
||||
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
|
||||
`,
|
||||
},
|
||||
Arguments: []cmds.Argument{
|
||||
@ -202,6 +303,7 @@ Example:
|
||||
Options: []cmds.Option{
|
||||
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
|
||||
cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
|
||||
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"),
|
||||
},
|
||||
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
||||
n, err := p2pGetNode(env)
|
||||
@ -231,8 +333,51 @@ Example:
|
||||
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
|
||||
}
|
||||
|
||||
_, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
|
||||
listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
foreground, _ := req.Options[foregroundOptionName].(bool)
|
||||
if foreground {
|
||||
if err := res.Emit(&P2PForegroundOutput{
|
||||
Status: "active",
|
||||
Protocol: protoOpt,
|
||||
Address: targetOpt,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
|
||||
// or listener removal (ipfs p2p close)
|
||||
select {
|
||||
case <-req.Context.Done():
|
||||
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
|
||||
n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool {
|
||||
return l == listener
|
||||
})
|
||||
return nil
|
||||
case <-listener.Done():
|
||||
// Closed via "ipfs p2p close" - emit closing message
|
||||
return res.Emit(&P2PForegroundOutput{
|
||||
Status: "closing",
|
||||
Protocol: protoOpt,
|
||||
Address: targetOpt,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
Type: P2PForegroundOutput{},
|
||||
Encoders: cmds.EncoderMap{
|
||||
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
|
||||
if out.Status == "active" {
|
||||
fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address)
|
||||
} else if out.Status == "closing" {
|
||||
fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol)
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
}
|
||||
|
||||
@ -271,11 +416,9 @@ func checkPort(target ma.Multiaddr) error {
|
||||
}
|
||||
|
||||
// forwardLocal forwards local connections to a libp2p service
|
||||
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) error {
|
||||
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) {
|
||||
ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL)
|
||||
// TODO: return some info
|
||||
_, err := p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
|
||||
return err
|
||||
return p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
|
||||
}
|
||||
|
||||
const (
|
||||
|
||||
@ -78,9 +78,23 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv
|
||||
return Serve(n, manet.NetListener(list), options...)
|
||||
}
|
||||
|
||||
// Serve accepts incoming HTTP connections on the listener and pass them
|
||||
// Serve accepts incoming HTTP connections on the listener and passes them
|
||||
// to ServeOption handlers.
|
||||
func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {
|
||||
return ServeWithReady(node, lis, nil, options...)
|
||||
}
|
||||
|
||||
// ServeWithReady is like Serve but signals on the ready channel when the
|
||||
// server is about to accept connections. The channel is closed right before
|
||||
// server.Serve() is called.
|
||||
//
|
||||
// This is useful for callers that need to perform actions (like writing
|
||||
// address files) only after the server is guaranteed to be accepting
|
||||
// connections, avoiding race conditions where clients see the file before
|
||||
// the server is ready.
|
||||
//
|
||||
// Passing nil for ready is equivalent to calling Serve().
|
||||
func ServeWithReady(node *core.IpfsNode, lis net.Listener, ready chan<- struct{}, options ...ServeOption) error {
|
||||
// make sure we close this no matter what.
|
||||
defer lis.Close()
|
||||
|
||||
@ -107,6 +121,9 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
|
||||
var serverError error
|
||||
serverClosed := make(chan struct{})
|
||||
go func() {
|
||||
if ready != nil {
|
||||
close(ready)
|
||||
}
|
||||
serverError = server.Serve(lis)
|
||||
close(serverClosed)
|
||||
}()
|
||||
|
||||
@ -12,6 +12,7 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team.
|
||||
- [🔦 Highlights](#-highlights)
|
||||
- [Routing V1 HTTP API now exposed by default](#routing-v1-http-api-now-exposed-by-default)
|
||||
- [Track total size when adding pins](#track-total-size-when-adding-pins)
|
||||
- [🚇 Improved `ipfs p2p` tunnels with foreground mode](#-improved-ipfs-p2p-tunnels-with-foreground-mode)
|
||||
- [Improved `ipfs dag stat` output](#improved-ipfs-dag-stat-output)
|
||||
- [Skip bad keys when listing](#skip_bad_keys_when_listing)
|
||||
- [📦️ Dependency updates](#-dependency-updates)
|
||||
@ -35,6 +36,23 @@ Example output:
|
||||
Fetched/Processed 336 nodes (83 MB)
|
||||
```
|
||||
|
||||
#### 🚇 Improved `ipfs p2p` tunnels with foreground mode
|
||||
|
||||
P2P tunnels can now run like SSH port forwarding: start a tunnel, use it, and it cleans up automatically when you're done.
|
||||
|
||||
The new `--foreground` (`-f`) flag for `ipfs p2p listen` and `ipfs p2p forward` keeps the command running until interrupted. When you Ctrl+C, send SIGTERM, or stop the service, the tunnel is removed automatically:
|
||||
|
||||
```console
|
||||
$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 --foreground
|
||||
Listening on /x/ssh, forwarding to /ip4/127.0.0.1/tcp/22, waiting for interrupt...
|
||||
^C
|
||||
Received interrupt, removing listener for /x/ssh
|
||||
```
|
||||
|
||||
Without `--foreground`, commands return immediately and tunnels persist until explicitly closed (existing behavior).
|
||||
|
||||
See [docs/p2p-tunnels.md](https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md) for usage examples.
|
||||
|
||||
#### Improved `ipfs dag stat` output
|
||||
|
||||
The `ipfs dag stat` command has been improved for better terminal UX:
|
||||
|
||||
@ -59,6 +59,7 @@ config file at runtime.
|
||||
- [`Discovery.MDNS.Enabled`](#discoverymdnsenabled)
|
||||
- [`Discovery.MDNS.Interval`](#discoverymdnsinterval)
|
||||
- [`Experimental`](#experimental)
|
||||
- [`Experimental.Libp2pStreamMounting`](#experimentallibp2pstreammounting)
|
||||
- [`Gateway`](#gateway)
|
||||
- [`Gateway.NoFetch`](#gatewaynofetch)
|
||||
- [`Gateway.NoDNSLink`](#gatewaynodnslink)
|
||||
@ -1069,6 +1070,17 @@ in the [new mDNS implementation](https://github.com/libp2p/zeroconf#readme).
|
||||
|
||||
Toggle and configure experimental features of Kubo. Experimental features are listed [here](./experimental-features.md).
|
||||
|
||||
### `Experimental.Libp2pStreamMounting`
|
||||
|
||||
Enables the `ipfs p2p` commands for tunneling TCP connections through libp2p
|
||||
streams, similar to SSH port forwarding.
|
||||
|
||||
See [docs/p2p-tunnels.md](p2p-tunnels.md) for usage examples.
|
||||
|
||||
Default: `false`
|
||||
|
||||
Type: `bool`
|
||||
|
||||
## `Gateway`
|
||||
|
||||
Options for the HTTP gateway.
|
||||
|
||||
@ -199,9 +199,8 @@ configured, the daemon will fail to start.
|
||||
|
||||
## ipfs p2p
|
||||
|
||||
Allows tunneling of TCP connections through Libp2p streams. If you've ever used
|
||||
port forwarding with SSH (the `-L` option in OpenSSH), this feature is quite
|
||||
similar.
|
||||
Allows tunneling of TCP connections through libp2p streams, similar to SSH port
|
||||
forwarding (`ssh -L`).
|
||||
|
||||
### State
|
||||
|
||||
@ -220,98 +219,20 @@ Experimental, will be stabilized in 0.6.0
|
||||
> If you enable this and plan to expose CLI or HTTP RPC to other users or machines,
|
||||
> secure RPC API using [`API.Authorizations`](https://github.com/ipfs/kubo/blob/master/docs/config.md#apiauthorizations) or custom auth middleware.
|
||||
|
||||
The `p2p` command needs to be enabled in the config:
|
||||
|
||||
```sh
|
||||
> ipfs config --json Experimental.Libp2pStreamMounting true
|
||||
```
|
||||
|
||||
### How to use
|
||||
|
||||
**Netcat example:**
|
||||
|
||||
First, pick a protocol name for your application. Think of the protocol name as
|
||||
a port number, just significantly more user-friendly. In this example, we're
|
||||
going to use `/x/kickass/1.0`.
|
||||
|
||||
***Setup:***
|
||||
|
||||
1. A "server" node with peer ID `$SERVER_ID`
|
||||
2. A "client" node.
|
||||
|
||||
***On the "server" node:***
|
||||
|
||||
First, start your application and have it listen for TCP connections on
|
||||
port `$APP_PORT`.
|
||||
|
||||
Then, configure the p2p listener by running:
|
||||
|
||||
```sh
|
||||
> ipfs p2p listen /x/kickass/1.0 /ip4/127.0.0.1/tcp/$APP_PORT
|
||||
```
|
||||
|
||||
This will configure IPFS to forward all incoming `/x/kickass/1.0` streams to
|
||||
`127.0.0.1:$APP_PORT` (opening a new connection to `127.0.0.1:$APP_PORT` per
|
||||
incoming stream.
|
||||
|
||||
***On the "client" node:***
|
||||
|
||||
First, configure the client p2p dialer, so that it forwards all inbound
|
||||
connections on `127.0.0.1:SOME_PORT` to the server node listening
|
||||
on `/x/kickass/1.0`.
|
||||
|
||||
```sh
|
||||
> ipfs p2p forward /x/kickass/1.0 /ip4/127.0.0.1/tcp/$SOME_PORT /p2p/$SERVER_ID
|
||||
```
|
||||
|
||||
Next, have your application open a connection to `127.0.0.1:$SOME_PORT`. This
|
||||
connection will be forwarded to the service running on `127.0.0.1:$APP_PORT` on
|
||||
the remote machine. You can test it with netcat:
|
||||
|
||||
***On "server" node:***
|
||||
```sh
|
||||
> nc -v -l -p $APP_PORT
|
||||
```
|
||||
|
||||
***On "client" node:***
|
||||
```sh
|
||||
> nc -v 127.0.0.1 $SOME_PORT
|
||||
```
|
||||
|
||||
You should now see that a connection has been established and be able to
|
||||
exchange messages between netcat instances.
|
||||
|
||||
(note that depending on your netcat version you may need to drop the `-v` flag)
|
||||
|
||||
**SSH example**
|
||||
|
||||
**Setup:**
|
||||
|
||||
1. A "server" node with peer ID `$SERVER_ID` and running ssh server on the
|
||||
default port.
|
||||
2. A "client" node.
|
||||
|
||||
_you can get `$SERVER_ID` by running `ipfs id -f "<id>\n"`_
|
||||
|
||||
***First, on the "server" node:***
|
||||
|
||||
```sh
|
||||
ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22
|
||||
```
|
||||
|
||||
***Then, on "client" node:***
|
||||
|
||||
```sh
|
||||
ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID
|
||||
```
|
||||
|
||||
You should now be able to connect to your ssh server through a libp2p connection
|
||||
with `ssh [user]@127.0.0.1 -p 2222`.
|
||||
|
||||
See [docs/p2p-tunnels.md](p2p-tunnels.md) for usage examples, foreground mode,
|
||||
and systemd integration.
|
||||
|
||||
### Road to being a real feature
|
||||
|
||||
- [ ] More documentation
|
||||
- [x] More documentation
|
||||
- [x] `ipfs p2p forward` mode
|
||||
- [ ] Ability to define tunnels via JSON config, similar to [`Peering.Peers`](https://github.com/ipfs/kubo/blob/master/docs/config.md#peeringpeers), see [kubo#5460](https://github.com/ipfs/kubo/issues/5460)
|
||||
|
||||
## p2p http proxy
|
||||
|
||||
|
||||
214
docs/p2p-tunnels.md
Normal file
214
docs/p2p-tunnels.md
Normal file
@ -0,0 +1,214 @@
|
||||
# P2P Tunnels
|
||||
|
||||
Kubo supports tunneling TCP connections through libp2p streams, similar to SSH
|
||||
port forwarding (`ssh -L`). This allows exposing local services to remote peers
|
||||
and forwarding remote services to local ports.
|
||||
|
||||
- [Why P2P Tunnels?](#why-p2p-tunnels)
|
||||
- [Quick Start](#quick-start)
|
||||
- [Background Mode](#background-mode)
|
||||
- [Foreground Mode](#foreground-mode)
|
||||
- [systemd Integration](#systemd-integration)
|
||||
- [Security Considerations](#security-considerations)
|
||||
- [Troubleshooting](#troubleshooting)
|
||||
|
||||
## Why P2P Tunnels?
|
||||
|
||||
Unlike traditional SSH tunnels, libp2p-based tunnels do not require:
|
||||
|
||||
- **No public IP or open ports**: The server does not need a static IP address
|
||||
or port forwarding configured on the router. Connectivity to peers behind NAT
|
||||
is facilitated by [Direct Connection Upgrade through Relay (DCUtR)](https://github.com/libp2p/specs/blob/master/relay/DCUtR.md),
|
||||
which enables NAT hole-punching.
|
||||
|
||||
- **No DNS or IP address management**: All you need is the server's PeerID and
|
||||
an agreed-upon protocol name (e.g., `/x/ssh`). Kubo handles peer discovery
|
||||
and routing via the [Amino DHT](https://specs.ipfs.tech/routing/kad-dht/).
|
||||
|
||||
- **Simplified firewall rules**: Since connections are established through
|
||||
libp2p's existing swarm connections, no additional firewall configuration is
|
||||
needed beyond what Kubo already requires.
|
||||
|
||||
This makes p2p tunnels useful for connecting to machines on home networks,
|
||||
behind corporate firewalls, or in environments where traditional port forwarding
|
||||
is not available.
|
||||
|
||||
## Quick Start
|
||||
|
||||
Enable the experimental feature:
|
||||
|
||||
```console
|
||||
$ ipfs config --json Experimental.Libp2pStreamMounting true
|
||||
```
|
||||
|
||||
Test with netcat (`nc`) - no services required:
|
||||
|
||||
**On the server:**
|
||||
|
||||
```console
|
||||
$ ipfs p2p listen /x/test /ip4/127.0.0.1/tcp/9999
|
||||
$ nc -l -p 9999
|
||||
```
|
||||
|
||||
**On the client:**
|
||||
|
||||
Replace `$SERVER_ID` with the server's peer ID (get it with `ipfs id -f "<id>\n"`
|
||||
on the server).
|
||||
|
||||
```console
|
||||
$ ipfs p2p forward /x/test /ip4/127.0.0.1/tcp/9998 /p2p/$SERVER_ID
|
||||
$ nc 127.0.0.1 9998
|
||||
```
|
||||
|
||||
Type in either terminal and the text appears in the other. Use Ctrl+C to exit.
|
||||
|
||||
## Background Mode
|
||||
|
||||
By default, `ipfs p2p listen` and `ipfs p2p forward` register the tunnel with
|
||||
the daemon and return immediately. The tunnel persists until explicitly closed
|
||||
with `ipfs p2p close` or the daemon shuts down.
|
||||
|
||||
This example exposes a local SSH server (listening on `localhost:22`) to a
|
||||
remote peer. The same pattern works for any TCP service.
|
||||
|
||||
**On the server** (the machine running SSH):
|
||||
|
||||
Register a p2p listener that forwards incoming connections to the local SSH
|
||||
server. The protocol name `/x/ssh` is an arbitrary identifier that both peers
|
||||
must agree on (the `/x/` prefix is required for custom protocols).
|
||||
|
||||
```console
|
||||
$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22
|
||||
```
|
||||
|
||||
**On the client:**
|
||||
|
||||
Create a local port (`2222`) that tunnels through libp2p to the server's SSH
|
||||
service.
|
||||
|
||||
```console
|
||||
$ ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID
|
||||
```
|
||||
|
||||
Now connect to SSH through the tunnel:
|
||||
|
||||
```console
|
||||
$ ssh user@127.0.0.1 -p 2222
|
||||
```
|
||||
|
||||
**Other services:** To tunnel a different service, change the port and protocol
|
||||
name. For example, to expose a web server on port 8080, use `/x/mywebapp` and
|
||||
`/ip4/127.0.0.1/tcp/8080`.
|
||||
|
||||
## Foreground Mode
|
||||
|
||||
Use `--foreground` (`-f`) to block until interrupted. The tunnel is
|
||||
automatically removed when the command exits:
|
||||
|
||||
```console
|
||||
$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 --foreground
|
||||
Listening on /x/ssh, forwarding to /ip4/127.0.0.1/tcp/22, waiting for interrupt...
|
||||
^C
|
||||
Received interrupt, removing listener for /x/ssh
|
||||
```
|
||||
|
||||
The listener/forwarder is automatically removed when:
|
||||
|
||||
- The command receives Ctrl+C or SIGTERM
|
||||
- `ipfs p2p close` is called
|
||||
- The daemon shuts down
|
||||
|
||||
This mode is useful for systemd services and scripts that need cleanup on exit.
|
||||
|
||||
### systemd Integration
|
||||
|
||||
The `--foreground` flag enables clean integration with systemd. The examples
|
||||
below show how to run `ipfs p2p listen` as a user service that starts
|
||||
automatically when the IPFS daemon is ready.
|
||||
|
||||
Ensure IPFS daemon runs as a systemd user service. See
|
||||
[misc/README.md](https://github.com/ipfs/kubo/blob/master/misc/README.md#systemd)
|
||||
for setup instructions and where to place unit files.
|
||||
|
||||
#### P2P listener with path-based activation
|
||||
|
||||
Use a `.path` unit to wait for the daemon's RPC API to be ready before starting
|
||||
the p2p listener.
|
||||
|
||||
**`ipfs-p2p-tunnel.path`**:
|
||||
|
||||
```systemd
|
||||
[Unit]
|
||||
Description=Monitor for IPFS daemon startup
|
||||
After=ipfs.service
|
||||
Requires=ipfs.service
|
||||
|
||||
[Path]
|
||||
PathExists=%h/.ipfs/api
|
||||
Unit=ipfs-p2p-tunnel.service
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
```
|
||||
|
||||
The `%h` specifier expands to the user's home directory. If you use a custom
|
||||
`IPFS_PATH`, adjust accordingly.
|
||||
|
||||
**`ipfs-p2p-tunnel.service`**:
|
||||
|
||||
```systemd
|
||||
[Unit]
|
||||
Description=IPFS p2p tunnel
|
||||
Requires=ipfs.service
|
||||
|
||||
[Service]
|
||||
ExecStart=ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 -f
|
||||
Restart=on-failure
|
||||
RestartSec=10
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
```
|
||||
|
||||
#### Enabling the services
|
||||
|
||||
```console
|
||||
$ systemctl --user enable ipfs.service
|
||||
$ systemctl --user enable ipfs-p2p-tunnel.path
|
||||
$ systemctl --user start ipfs.service
|
||||
```
|
||||
|
||||
The path unit monitors `~/.ipfs/api` and starts `ipfs-p2p-tunnel.service`
|
||||
once the file exists.
|
||||
|
||||
## Security Considerations
|
||||
|
||||
> [!WARNING]
|
||||
> This feature provides CLI and HTTP RPC users with the ability to set up port
|
||||
> forwarding for localhost and LAN ports. If you enable this and plan to expose
|
||||
> CLI or HTTP RPC to other users or machines, secure the RPC API using
|
||||
> [`API.Authorizations`](https://github.com/ipfs/kubo/blob/master/docs/config.md#apiauthorizations)
|
||||
> or custom auth middleware.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Foreground listener stops when terminal closes
|
||||
|
||||
When using `--foreground`, the listener stops if the terminal closes. For
|
||||
persistent foreground listeners, use a systemd service, `nohup`, `tmux`, or
|
||||
`screen`. Without `--foreground`, the listener persists in the daemon regardless
|
||||
of terminal state.
|
||||
|
||||
### Connection refused errors
|
||||
|
||||
Verify:
|
||||
|
||||
1. The experimental feature is enabled: `ipfs config Experimental.Libp2pStreamMounting`
|
||||
2. The listener is active: `ipfs p2p ls`
|
||||
3. Both peers can connect: `ipfs swarm connect /p2p/$PEER_ID`
|
||||
|
||||
### Persistent tunnel configuration
|
||||
|
||||
There is currently no way to define tunnels in the Kubo JSON config file. Use
|
||||
`--foreground` mode with a systemd service for persistent tunnels. Support for
|
||||
configuring tunnels via JSON config may be added in the future (see [kubo#5460](https://github.com/ipfs/kubo/issues/5460) - PRs welcome!).
|
||||
@ -39,6 +39,12 @@ To run this in your user session, save it as `~/.config/systemd/user/ipfs.servic
|
||||
```
|
||||
Read more about `--user` services here: [wiki.archlinux.org:Systemd ](https://wiki.archlinux.org/index.php/Systemd/User#Automatic_start-up_of_systemd_user_instances)
|
||||
|
||||
#### P2P tunnel services
|
||||
|
||||
For running `ipfs p2p listen` or `ipfs p2p forward` as systemd services,
|
||||
see [docs/p2p-tunnels.md](../docs/p2p-tunnels.md) for examples using the
|
||||
`--foreground` flag and path-based activation.
|
||||
|
||||
### initd
|
||||
|
||||
- Here is a full-featured sample service file: https://github.com/dylanPowers/ipfs-linux-service/blob/master/init.d/ipfs
|
||||
|
||||
@ -20,6 +20,10 @@ type Listener interface {
|
||||
|
||||
// close closes the listener. Does not affect child streams
|
||||
close()
|
||||
|
||||
// Done returns a channel that is closed when the listener is closed.
|
||||
// This allows callers to detect when a listener has been removed.
|
||||
Done() <-chan struct{}
|
||||
}
|
||||
|
||||
// Listeners manages a group of Listener implementations,
|
||||
@ -73,15 +77,13 @@ func (r *Listeners) Register(l Listener) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close removes and closes all listeners for which matchFunc returns true.
|
||||
// Returns the number of listeners closed.
|
||||
func (r *Listeners) Close(matchFunc func(listener Listener) bool) int {
|
||||
todo := make([]Listener, 0)
|
||||
var todo []Listener
|
||||
r.Lock()
|
||||
for _, l := range r.Listeners {
|
||||
if !matchFunc(l) {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := r.Listeners[l.key()]; ok {
|
||||
if matchFunc(l) {
|
||||
delete(r.Listeners, l.key())
|
||||
todo = append(todo, l)
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ type localListener struct {
|
||||
peer peer.ID
|
||||
|
||||
listener manet.Listener
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// ForwardLocal creates new P2P stream to a remote listener.
|
||||
@ -32,6 +33,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I
|
||||
p2p: p2p,
|
||||
proto: proto,
|
||||
peer: peer,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
maListener, err := manet.Listen(bindAddr)
|
||||
@ -98,6 +100,11 @@ func (l *localListener) setupStream(local manet.Conn) {
|
||||
|
||||
func (l *localListener) close() {
|
||||
l.listener.Close()
|
||||
close(l.done)
|
||||
}
|
||||
|
||||
func (l *localListener) Done() <-chan struct{} {
|
||||
return l.done
|
||||
}
|
||||
|
||||
func (l *localListener) Protocol() protocol.ID {
|
||||
|
||||
@ -25,6 +25,8 @@ type remoteListener struct {
|
||||
// reportRemote if set to true makes the handler send '<base58 remote peerid>\n'
|
||||
// to target before any data is forwarded
|
||||
reportRemote bool
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// ForwardRemote creates new p2p listener.
|
||||
@ -36,6 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
|
||||
addr: addr,
|
||||
|
||||
reportRemote: reportRemote,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
if err := p2p.ListenersP2P.Register(listener); err != nil {
|
||||
@ -99,7 +102,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr {
|
||||
return l.addr
|
||||
}
|
||||
|
||||
func (l *remoteListener) close() {}
|
||||
func (l *remoteListener) close() {
|
||||
close(l.done)
|
||||
}
|
||||
|
||||
func (l *remoteListener) Done() <-chan struct{} {
|
||||
return l.done
|
||||
}
|
||||
|
||||
func (l *remoteListener) key() protocol.ID {
|
||||
return l.proto
|
||||
|
||||
104
test/cli/api_file_test.go
Normal file
104
test/cli/api_file_test.go
Normal file
@ -0,0 +1,104 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/kubo/test/cli/harness"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestAddressFileReady verifies that when address files ($IPFS_PATH/api and
|
||||
// $IPFS_PATH/gateway) are created, the corresponding HTTP servers are ready
|
||||
// to accept connections immediately. This prevents race conditions for tools
|
||||
// like systemd path units that start services when these files appear.
|
||||
func TestAddressFileReady(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("api file", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
h := harness.NewT(t)
|
||||
node := h.NewNode().Init()
|
||||
|
||||
// Start daemon in background (don't use StartDaemon which waits for API)
|
||||
res := node.Runner.MustRun(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"daemon"},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
node.Daemon = res
|
||||
defer node.StopDaemon()
|
||||
|
||||
// Poll for api file to appear
|
||||
apiFile := filepath.Join(node.Dir, "api")
|
||||
var fileExists bool
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := os.Stat(apiFile); err == nil {
|
||||
fileExists = true
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
require.True(t, fileExists, "api file should be created")
|
||||
|
||||
// Read the api file to get the address
|
||||
apiAddr, err := node.TryAPIAddr()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Extract IP and port from multiaddr
|
||||
ip, err := apiAddr.ValueForProtocol(4) // P_IP4
|
||||
require.NoError(t, err)
|
||||
port, err := apiAddr.ValueForProtocol(6) // P_TCP
|
||||
require.NoError(t, err)
|
||||
|
||||
// Immediately try to use the API - should work on first attempt
|
||||
url := "http://" + ip + ":" + port + "/api/v0/id"
|
||||
resp, err := http.Post(url, "", nil)
|
||||
require.NoError(t, err, "RPC API should be ready immediately when api file exists")
|
||||
defer resp.Body.Close()
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
})
|
||||
|
||||
t.Run("gateway file", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
h := harness.NewT(t)
|
||||
node := h.NewNode().Init()
|
||||
|
||||
// Start daemon in background
|
||||
res := node.Runner.MustRun(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"daemon"},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
node.Daemon = res
|
||||
defer node.StopDaemon()
|
||||
|
||||
// Poll for gateway file to appear
|
||||
gatewayFile := filepath.Join(node.Dir, "gateway")
|
||||
var fileExists bool
|
||||
for i := 0; i < 100; i++ {
|
||||
if _, err := os.Stat(gatewayFile); err == nil {
|
||||
fileExists = true
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
require.True(t, fileExists, "gateway file should be created")
|
||||
|
||||
// Read the gateway file to get the URL (already includes http:// prefix)
|
||||
gatewayURL, err := os.ReadFile(gatewayFile)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Immediately try to use the Gateway - should work on first attempt
|
||||
url := strings.TrimSpace(string(gatewayURL)) + "/ipfs/bafkqaaa" // empty file CID
|
||||
resp, err := http.Get(url)
|
||||
require.NoError(t, err, "Gateway should be ready immediately when gateway file exists")
|
||||
defer resp.Body.Close()
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
})
|
||||
}
|
||||
430
test/cli/p2p_test.go
Normal file
430
test/cli/p2p_test.go
Normal file
@ -0,0 +1,430 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os/exec"
|
||||
"slices"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/kubo/core/commands"
|
||||
"github.com/ipfs/kubo/test/cli/harness"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// waitForListenerCount waits until the node has exactly the expected number of listeners.
|
||||
func waitForListenerCount(t *testing.T, node *harness.Node, expectedCount int) {
|
||||
t.Helper()
|
||||
require.Eventually(t, func() bool {
|
||||
lsOut := node.IPFS("p2p", "ls", "--enc=json")
|
||||
var lsResult commands.P2PLsOutput
|
||||
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
|
||||
return false
|
||||
}
|
||||
return len(lsResult.Listeners) == expectedCount
|
||||
}, 5*time.Second, 100*time.Millisecond, "expected %d listeners", expectedCount)
|
||||
}
|
||||
|
||||
// waitForListenerProtocol waits until the node has a listener with the given protocol.
|
||||
func waitForListenerProtocol(t *testing.T, node *harness.Node, protocol string) {
|
||||
t.Helper()
|
||||
require.Eventually(t, func() bool {
|
||||
lsOut := node.IPFS("p2p", "ls", "--enc=json")
|
||||
var lsResult commands.P2PLsOutput
|
||||
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
|
||||
return false
|
||||
}
|
||||
return slices.ContainsFunc(lsResult.Listeners, func(l commands.P2PListenerInfoOutput) bool {
|
||||
return l.Protocol == protocol
|
||||
})
|
||||
}, 5*time.Second, 100*time.Millisecond, "expected listener with protocol %s", protocol)
|
||||
}
|
||||
|
||||
func TestP2PForeground(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("listen foreground creates listener and removes on interrupt", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
node := harness.NewT(t).NewNode().Init()
|
||||
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
node.StartDaemon()
|
||||
|
||||
listenPort := harness.NewRandPort()
|
||||
|
||||
// Start foreground listener asynchronously
|
||||
res := node.Runner.Run(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"p2p", "listen", "--foreground", "/x/fgtest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
// Wait for listener to be created
|
||||
waitForListenerProtocol(t, node, "/x/fgtest")
|
||||
|
||||
// Send SIGTERM
|
||||
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
||||
_ = res.Cmd.Wait()
|
||||
|
||||
// Wait for listener to be removed
|
||||
waitForListenerCount(t, node, 0)
|
||||
})
|
||||
|
||||
t.Run("listen foreground text output on SIGTERM", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
node := harness.NewT(t).NewNode().Init()
|
||||
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
node.StartDaemon()
|
||||
|
||||
listenPort := harness.NewRandPort()
|
||||
|
||||
// Run without --enc=json to test actual text output users see
|
||||
res := node.Runner.Run(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"p2p", "listen", "--foreground", "/x/sigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
waitForListenerProtocol(t, node, "/x/sigterm")
|
||||
|
||||
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
||||
_ = res.Cmd.Wait()
|
||||
|
||||
// Verify stdout shows "waiting for interrupt" message
|
||||
stdout := res.Stdout.String()
|
||||
require.Contains(t, stdout, "waiting for interrupt")
|
||||
|
||||
// Note: "Received interrupt, removing listener" message is NOT visible to CLI on SIGTERM
|
||||
// because the command runs in the daemon via RPC and the response stream closes before
|
||||
// the message can be emitted. The important behavior is verified in the first test:
|
||||
// the listener IS removed when SIGTERM is sent.
|
||||
})
|
||||
|
||||
t.Run("forward foreground creates forwarder and removes on interrupt", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
nodes := harness.NewT(t).NewNodes(2).Init()
|
||||
nodes.ForEachPar(func(n *harness.Node) {
|
||||
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
})
|
||||
nodes.StartDaemons().Connect()
|
||||
|
||||
forwardPort := harness.NewRandPort()
|
||||
|
||||
// Start foreground forwarder asynchronously on node 0
|
||||
res := nodes[0].Runner.Run(harness.RunRequest{
|
||||
Path: nodes[0].IPFSBin,
|
||||
Args: []string{"p2p", "forward", "--foreground", "/x/fgfwd", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
// Wait for forwarder to be created
|
||||
waitForListenerCount(t, nodes[0], 1)
|
||||
|
||||
// Send SIGTERM
|
||||
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
||||
_ = res.Cmd.Wait()
|
||||
|
||||
// Wait for forwarder to be removed
|
||||
waitForListenerCount(t, nodes[0], 0)
|
||||
})
|
||||
|
||||
t.Run("forward foreground text output on SIGTERM", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
nodes := harness.NewT(t).NewNodes(2).Init()
|
||||
nodes.ForEachPar(func(n *harness.Node) {
|
||||
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
})
|
||||
nodes.StartDaemons().Connect()
|
||||
|
||||
forwardPort := harness.NewRandPort()
|
||||
|
||||
// Run without --enc=json to test actual text output users see
|
||||
res := nodes[0].Runner.Run(harness.RunRequest{
|
||||
Path: nodes[0].IPFSBin,
|
||||
Args: []string{"p2p", "forward", "--foreground", "/x/fwdsigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
waitForListenerCount(t, nodes[0], 1)
|
||||
|
||||
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
||||
_ = res.Cmd.Wait()
|
||||
|
||||
// Verify stdout shows "waiting for interrupt" message
|
||||
stdout := res.Stdout.String()
|
||||
require.Contains(t, stdout, "waiting for interrupt")
|
||||
|
||||
// Note: "Received interrupt, removing forwarder" message is NOT visible to CLI on SIGTERM
|
||||
// because the response stream closes before the message can be emitted.
|
||||
})
|
||||
|
||||
t.Run("listen without foreground returns immediately and persists", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
node := harness.NewT(t).NewNode().Init()
|
||||
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
node.StartDaemon()
|
||||
|
||||
listenPort := harness.NewRandPort()
|
||||
|
||||
// This should return immediately (not block)
|
||||
node.IPFS("p2p", "listen", "/x/nofg", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort))
|
||||
|
||||
// Listener should still exist
|
||||
waitForListenerProtocol(t, node, "/x/nofg")
|
||||
|
||||
// Clean up
|
||||
node.IPFS("p2p", "close", "-p", "/x/nofg")
|
||||
})
|
||||
|
||||
t.Run("listen foreground text output on p2p close", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
node := harness.NewT(t).NewNode().Init()
|
||||
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
node.StartDaemon()
|
||||
|
||||
listenPort := harness.NewRandPort()
|
||||
|
||||
// Run without --enc=json to test actual text output users see
|
||||
res := node.Runner.Run(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"p2p", "listen", "--foreground", "/x/closetest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
// Wait for listener to be created
|
||||
waitForListenerProtocol(t, node, "/x/closetest")
|
||||
|
||||
// Close the listener via ipfs p2p close command
|
||||
node.IPFS("p2p", "close", "-p", "/x/closetest")
|
||||
|
||||
// Wait for foreground command to exit (it should exit quickly after close)
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- res.Cmd.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Good - command exited
|
||||
case <-time.After(5 * time.Second):
|
||||
_ = res.Cmd.Process.Kill()
|
||||
t.Fatal("foreground command did not exit after listener was closed via ipfs p2p close")
|
||||
}
|
||||
|
||||
// Wait for listener to be removed
|
||||
waitForListenerCount(t, node, 0)
|
||||
|
||||
// Verify text output shows BOTH messages when closed via p2p close
|
||||
// (unlike SIGTERM, the stream is still open so "Received interrupt" is emitted)
|
||||
out := res.Stdout.String()
|
||||
require.Contains(t, out, "waiting for interrupt")
|
||||
require.Contains(t, out, "Received interrupt, removing listener")
|
||||
})
|
||||
|
||||
t.Run("forward foreground text output on p2p close", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
nodes := harness.NewT(t).NewNodes(2).Init()
|
||||
nodes.ForEachPar(func(n *harness.Node) {
|
||||
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
})
|
||||
nodes.StartDaemons().Connect()
|
||||
|
||||
forwardPort := harness.NewRandPort()
|
||||
|
||||
// Run without --enc=json to test actual text output users see
|
||||
res := nodes[0].Runner.Run(harness.RunRequest{
|
||||
Path: nodes[0].IPFSBin,
|
||||
Args: []string{"p2p", "forward", "--foreground", "/x/fwdclose", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
// Wait for forwarder to be created
|
||||
waitForListenerCount(t, nodes[0], 1)
|
||||
|
||||
// Close the forwarder via ipfs p2p close command
|
||||
nodes[0].IPFS("p2p", "close", "-a")
|
||||
|
||||
// Wait for foreground command to exit
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- res.Cmd.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Good - command exited
|
||||
case <-time.After(5 * time.Second):
|
||||
_ = res.Cmd.Process.Kill()
|
||||
t.Fatal("foreground command did not exit after forwarder was closed via ipfs p2p close")
|
||||
}
|
||||
|
||||
// Wait for forwarder to be removed
|
||||
waitForListenerCount(t, nodes[0], 0)
|
||||
|
||||
// Verify text output shows BOTH messages when closed via p2p close
|
||||
out := res.Stdout.String()
|
||||
require.Contains(t, out, "waiting for interrupt")
|
||||
require.Contains(t, out, "Received interrupt, removing forwarder")
|
||||
})
|
||||
|
||||
t.Run("listen foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
nodes := harness.NewT(t).NewNodes(2).Init()
|
||||
nodes.ForEachPar(func(n *harness.Node) {
|
||||
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
})
|
||||
nodes.StartDaemons().Connect()
|
||||
|
||||
httpServerPort := harness.NewRandPort()
|
||||
forwardPort := harness.NewRandPort()
|
||||
|
||||
// Start HTTP server
|
||||
expectedBody := "Hello from p2p tunnel!"
|
||||
httpServer := &http.Server{
|
||||
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
_, _ = w.Write([]byte(expectedBody))
|
||||
}),
|
||||
}
|
||||
listener, err := net.Listen("tcp", httpServer.Addr)
|
||||
require.NoError(t, err)
|
||||
go func() { _ = httpServer.Serve(listener) }()
|
||||
defer httpServer.Close()
|
||||
|
||||
// Node 0: listen --foreground
|
||||
listenRes := nodes[0].Runner.Run(harness.RunRequest{
|
||||
Path: nodes[0].IPFSBin,
|
||||
Args: []string{"p2p", "listen", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, listenRes.Err)
|
||||
|
||||
// Wait for listener to be created
|
||||
waitForListenerProtocol(t, nodes[0], "/x/httptest")
|
||||
|
||||
// Node 1: forward (non-foreground)
|
||||
nodes[1].IPFS("p2p", "forward", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/"+nodes[0].PeerID().String())
|
||||
|
||||
// Verify data flows through tunnel
|
||||
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
|
||||
require.NoError(t, err)
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedBody, string(body))
|
||||
|
||||
// Clean up forwarder on node 1
|
||||
nodes[1].IPFS("p2p", "close", "-a")
|
||||
|
||||
// SIGTERM the listen --foreground command
|
||||
_ = listenRes.Cmd.Process.Signal(syscall.SIGTERM)
|
||||
_ = listenRes.Cmd.Wait()
|
||||
|
||||
// Wait for listener to be removed on node 0
|
||||
waitForListenerCount(t, nodes[0], 0)
|
||||
})
|
||||
|
||||
t.Run("forward foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
nodes := harness.NewT(t).NewNodes(2).Init()
|
||||
nodes.ForEachPar(func(n *harness.Node) {
|
||||
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
})
|
||||
nodes.StartDaemons().Connect()
|
||||
|
||||
httpServerPort := harness.NewRandPort()
|
||||
forwardPort := harness.NewRandPort()
|
||||
|
||||
// Start HTTP server
|
||||
expectedBody := "Hello from forward foreground tunnel!"
|
||||
httpServer := &http.Server{
|
||||
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
_, _ = w.Write([]byte(expectedBody))
|
||||
}),
|
||||
}
|
||||
listener, err := net.Listen("tcp", httpServer.Addr)
|
||||
require.NoError(t, err)
|
||||
go func() { _ = httpServer.Serve(listener) }()
|
||||
defer httpServer.Close()
|
||||
|
||||
// Node 0: listen (non-foreground)
|
||||
nodes[0].IPFS("p2p", "listen", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort))
|
||||
|
||||
// Node 1: forward --foreground
|
||||
forwardRes := nodes[1].Runner.Run(harness.RunRequest{
|
||||
Path: nodes[1].IPFSBin,
|
||||
Args: []string{"p2p", "forward", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[0].PeerID().String()},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, forwardRes.Err)
|
||||
|
||||
// Wait for forwarder to be created
|
||||
waitForListenerCount(t, nodes[1], 1)
|
||||
|
||||
// Verify data flows through tunnel
|
||||
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
|
||||
require.NoError(t, err)
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedBody, string(body))
|
||||
|
||||
// SIGTERM the forward --foreground command
|
||||
_ = forwardRes.Cmd.Process.Signal(syscall.SIGTERM)
|
||||
_ = forwardRes.Cmd.Wait()
|
||||
|
||||
// Wait for forwarder to be removed on node 1
|
||||
waitForListenerCount(t, nodes[1], 0)
|
||||
|
||||
// Clean up listener on node 0
|
||||
nodes[0].IPFS("p2p", "close", "-a")
|
||||
})
|
||||
|
||||
t.Run("foreground command exits when daemon shuts down", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
node := harness.NewT(t).NewNode().Init()
|
||||
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
||||
node.StartDaemon()
|
||||
|
||||
listenPort := harness.NewRandPort()
|
||||
|
||||
// Start foreground listener
|
||||
res := node.Runner.Run(harness.RunRequest{
|
||||
Path: node.IPFSBin,
|
||||
Args: []string{"p2p", "listen", "--foreground", "/x/daemontest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
||||
RunFunc: (*exec.Cmd).Start,
|
||||
})
|
||||
require.NoError(t, res.Err)
|
||||
|
||||
// Wait for listener to be created
|
||||
waitForListenerProtocol(t, node, "/x/daemontest")
|
||||
|
||||
// Stop the daemon
|
||||
node.StopDaemon()
|
||||
|
||||
// Wait for foreground command to exit
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- res.Cmd.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Good - foreground command exited when daemon stopped
|
||||
case <-time.After(5 * time.Second):
|
||||
_ = res.Cmd.Process.Kill()
|
||||
t.Fatal("foreground command did not exit when daemon was stopped")
|
||||
}
|
||||
})
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user