feat(p2p): add --foreground flag to listen and forward commands

adds `-f/--foreground` option that keeps the command running until
interrupted (SIGTERM/Ctrl+C) or closed via `ipfs p2p close`. the
listener/forwarder is automatically removed when the command exits.

useful for systemd services and scripts that need cleanup on exit.
This commit is contained in:
Marcin Rataj 2025-12-10 02:51:42 +01:00
parent d29c0b9c01
commit e7bb8129d3
7 changed files with 624 additions and 26 deletions

View File

@ -50,9 +50,17 @@ type P2PStreamsOutput struct {
Streams []P2PStreamInfoOutput
}
// P2PForegroundOutput is output type for foreground mode status messages
type P2PForegroundOutput struct {
Status string // "active" or "closing"
Protocol string
Address string
}
const (
allowCustomProtocolOptionName = "allow-custom-protocol"
reportPeerIDOptionName = "report-peer-id"
foregroundOptionName = "foreground"
)
var resolveTimeout = 10 * time.Second
@ -83,15 +91,35 @@ var p2pForwardCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Forward connections to libp2p service.",
ShortDescription: `
Forward connections made to <listen-address> to <target-address>.
Forward connections made to <listen-address> to <target-address> via libp2p.
<protocol> specifies the libp2p protocol name to use for libp2p
connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'.
Creates a local TCP listener that tunnels connections through libp2p to a
remote peer's p2p listener. Similar to SSH port forwarding (-L flag).
Example:
ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /p2p/QmPeer
- Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /p2p/QmPeer
ARGUMENTS:
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<listen-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
<target-address> Remote peer multiaddr (e.g., /p2p/PeerID)
FOREGROUND MODE (--foreground, -f):
By default, the forwarder runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:
- Ctrl+C or SIGTERM: Removes the forwarder and exits
- 'ipfs p2p close': Removes the forwarder and exits
- Daemon shutdown: Forwarder is automatically removed
Useful for systemd services or scripts that need cleanup on exit.
EXAMPLES:
# Persistent forwarder (command returns immediately)
ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
# Temporary forwarder (removed when command exits)
ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
`,
},
Arguments: []cmds.Argument{
@ -101,6 +129,7 @@ Example:
},
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
@ -130,7 +159,51 @@ Example:
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
if err != nil {
return err
}
foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: listenOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: listenOpt,
})
}
}
return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol)
}
return nil
}),
},
}
@ -185,14 +258,38 @@ var p2pListenCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Create libp2p service.",
ShortDescription: `
Create libp2p service and forward connections made to <target-address>.
Create a libp2p protocol handler that forwards incoming connections to
<target-address>.
<protocol> specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'.
When a remote peer connects using 'ipfs p2p forward', the connection is
forwarded to your local service. Similar to SSH port forwarding (server side).
Example:
ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234
- Forward connections to 'myproto' libp2p service to 127.0.0.1:1234
ARGUMENTS:
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<target-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
FOREGROUND MODE (--foreground, -f):
By default, the listener runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:
- Ctrl+C or SIGTERM: Removes the listener and exits
- 'ipfs p2p close': Removes the listener and exits
- Daemon shutdown: Listener is automatically removed
Useful for systemd services or scripts that need cleanup on exit.
EXAMPLES:
# Persistent listener (command returns immediately)
ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000
# Temporary listener (removed when command exits)
ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000
# Report connecting peer ID to the target application
ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000
`,
},
Arguments: []cmds.Argument{
@ -202,6 +299,7 @@ Example:
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
@ -231,8 +329,51 @@ Example:
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
_, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
return err
listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
if err != nil {
return err
}
foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: targetOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: targetOpt,
})
}
}
return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol)
}
return nil
}),
},
}
@ -271,11 +412,9 @@ func checkPort(target ma.Multiaddr) error {
}
// forwardLocal forwards local connections to a libp2p service
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) error {
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) {
ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL)
// TODO: return some info
_, err := p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
return err
return p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
}
const (

View File

@ -11,7 +11,8 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team.
- [Overview](#overview)
- [🔦 Highlights](#-highlights)
- [Routing V1 HTTP API now exposed by default](#routing-v1-http-api-now-exposed-by-default)
- [Track total size when adding pins](#track-total-size-when-adding-pins]
- [Track total size when adding pins](#track-total-size-when-adding-pins)
- [`ipfs p2p` foreground mode](#ipfs-p2p-foreground-mode)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)
@ -32,6 +33,14 @@ Example output:
Fetched/Processed 336 nodes (83 MB)
```
#### `ipfs p2p` foreground mode
The `ipfs p2p listen` and `ipfs p2p forward` commands now support a `--foreground` (`-f`) flag that keeps the command running until interrupted. When the command exits (via Ctrl+C, SIGTERM, or daemon shutdown), the listener/forwarder is automatically removed.
Without `--foreground`, the commands return immediately and the listener persists in the daemon (existing behavior).
Run `ipfs p2p listen --help` or `ipfs p2p forward --help` for details and examples.
### 📝 Changelog
### 👨‍👩‍👧‍👦 Contributors

View File

@ -308,6 +308,8 @@ ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID
You should now be able to connect to your ssh server through a libp2p connection
with `ssh [user]@127.0.0.1 -p 2222`.
> **Tip:** Both commands support `--foreground` (`-f`) flag for blocking behavior.
> Run `ipfs p2p listen --help` or `ipfs p2p forward --help` for details.
### Road to being a real feature

View File

@ -20,6 +20,10 @@ type Listener interface {
// close closes the listener. Does not affect child streams
close()
// Done returns a channel that is closed when the listener is closed.
// This allows callers to detect when a listener has been removed.
Done() <-chan struct{}
}
// Listeners manages a group of Listener implementations,
@ -73,15 +77,13 @@ func (r *Listeners) Register(l Listener) error {
return nil
}
// Close removes and closes all listeners for which matchFunc returns true.
// Returns the number of listeners closed.
func (r *Listeners) Close(matchFunc func(listener Listener) bool) int {
todo := make([]Listener, 0)
var todo []Listener
r.Lock()
for _, l := range r.Listeners {
if !matchFunc(l) {
continue
}
if _, ok := r.Listeners[l.key()]; ok {
if matchFunc(l) {
delete(r.Listeners, l.key())
todo = append(todo, l)
}

View File

@ -23,6 +23,7 @@ type localListener struct {
peer peer.ID
listener manet.Listener
done chan struct{}
}
// ForwardLocal creates new P2P stream to a remote listener.
@ -32,6 +33,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I
p2p: p2p,
proto: proto,
peer: peer,
done: make(chan struct{}),
}
maListener, err := manet.Listen(bindAddr)
@ -98,6 +100,11 @@ func (l *localListener) setupStream(local manet.Conn) {
func (l *localListener) close() {
l.listener.Close()
close(l.done)
}
func (l *localListener) Done() <-chan struct{} {
return l.done
}
func (l *localListener) Protocol() protocol.ID {

View File

@ -25,6 +25,8 @@ type remoteListener struct {
// reportRemote if set to true makes the handler send '<base58 remote peerid>\n'
// to target before any data is forwarded
reportRemote bool
done chan struct{}
}
// ForwardRemote creates new p2p listener.
@ -36,6 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
addr: addr,
reportRemote: reportRemote,
done: make(chan struct{}),
}
if err := p2p.ListenersP2P.Register(listener); err != nil {
@ -99,7 +102,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr {
return l.addr
}
func (l *remoteListener) close() {}
func (l *remoteListener) close() {
close(l.done)
}
func (l *remoteListener) Done() <-chan struct{} {
return l.done
}
func (l *remoteListener) key() protocol.ID {
return l.proto

430
test/cli/p2p_test.go Normal file
View File

@ -0,0 +1,430 @@
package cli
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os/exec"
"slices"
"syscall"
"testing"
"time"
"github.com/ipfs/kubo/core/commands"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/require"
)
// waitForListenerCount waits until the node has exactly the expected number of listeners.
func waitForListenerCount(t *testing.T, node *harness.Node, expectedCount int) {
t.Helper()
require.Eventually(t, func() bool {
lsOut := node.IPFS("p2p", "ls", "--enc=json")
var lsResult commands.P2PLsOutput
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
return false
}
return len(lsResult.Listeners) == expectedCount
}, 5*time.Second, 100*time.Millisecond, "expected %d listeners", expectedCount)
}
// waitForListenerProtocol waits until the node has a listener with the given protocol.
func waitForListenerProtocol(t *testing.T, node *harness.Node, protocol string) {
t.Helper()
require.Eventually(t, func() bool {
lsOut := node.IPFS("p2p", "ls", "--enc=json")
var lsResult commands.P2PLsOutput
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
return false
}
return slices.ContainsFunc(lsResult.Listeners, func(l commands.P2PListenerInfoOutput) bool {
return l.Protocol == protocol
})
}, 5*time.Second, 100*time.Millisecond, "expected listener with protocol %s", protocol)
}
func TestP2PForeground(t *testing.T) {
t.Parallel()
t.Run("listen foreground creates listener and removes on interrupt", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Start foreground listener asynchronously
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/fgtest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/fgtest")
// Send SIGTERM
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Wait for listener to be removed
waitForListenerCount(t, node, 0)
})
t.Run("listen foreground text output on SIGTERM", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/sigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
waitForListenerProtocol(t, node, "/x/sigterm")
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Verify stdout shows "waiting for interrupt" message
stdout := res.Stdout.String()
require.Contains(t, stdout, "waiting for interrupt")
// Note: "Received interrupt, removing listener" message is NOT visible to CLI on SIGTERM
// because the command runs in the daemon via RPC and the response stream closes before
// the message can be emitted. The important behavior is verified in the first test:
// the listener IS removed when SIGTERM is sent.
})
t.Run("forward foreground creates forwarder and removes on interrupt", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Start foreground forwarder asynchronously on node 0
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fgfwd", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[0], 1)
// Send SIGTERM
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Wait for forwarder to be removed
waitForListenerCount(t, nodes[0], 0)
})
t.Run("forward foreground text output on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fwdsigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
waitForListenerCount(t, nodes[0], 1)
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Verify stdout shows "waiting for interrupt" message
stdout := res.Stdout.String()
require.Contains(t, stdout, "waiting for interrupt")
// Note: "Received interrupt, removing forwarder" message is NOT visible to CLI on SIGTERM
// because the response stream closes before the message can be emitted.
})
t.Run("listen without foreground returns immediately and persists", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// This should return immediately (not block)
node.IPFS("p2p", "listen", "/x/nofg", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort))
// Listener should still exist
waitForListenerProtocol(t, node, "/x/nofg")
// Clean up
node.IPFS("p2p", "close", "-p", "/x/nofg")
})
t.Run("listen foreground text output on p2p close", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/closetest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/closetest")
// Close the listener via ipfs p2p close command
node.IPFS("p2p", "close", "-p", "/x/closetest")
// Wait for foreground command to exit (it should exit quickly after close)
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - command exited
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit after listener was closed via ipfs p2p close")
}
// Wait for listener to be removed
waitForListenerCount(t, node, 0)
// Verify text output shows BOTH messages when closed via p2p close
// (unlike SIGTERM, the stream is still open so "Received interrupt" is emitted)
out := res.Stdout.String()
require.Contains(t, out, "waiting for interrupt")
require.Contains(t, out, "Received interrupt, removing listener")
})
t.Run("forward foreground text output on p2p close", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fwdclose", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[0], 1)
// Close the forwarder via ipfs p2p close command
nodes[0].IPFS("p2p", "close", "-a")
// Wait for foreground command to exit
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - command exited
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit after forwarder was closed via ipfs p2p close")
}
// Wait for forwarder to be removed
waitForListenerCount(t, nodes[0], 0)
// Verify text output shows BOTH messages when closed via p2p close
out := res.Stdout.String()
require.Contains(t, out, "waiting for interrupt")
require.Contains(t, out, "Received interrupt, removing forwarder")
})
t.Run("listen foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
httpServerPort := harness.NewRandPort()
forwardPort := harness.NewRandPort()
// Start HTTP server
expectedBody := "Hello from p2p tunnel!"
httpServer := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(expectedBody))
}),
}
listener, err := net.Listen("tcp", httpServer.Addr)
require.NoError(t, err)
go func() { _ = httpServer.Serve(listener) }()
defer httpServer.Close()
// Node 0: listen --foreground
listenRes := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, listenRes.Err)
// Wait for listener to be created
waitForListenerProtocol(t, nodes[0], "/x/httptest")
// Node 1: forward (non-foreground)
nodes[1].IPFS("p2p", "forward", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/"+nodes[0].PeerID().String())
// Verify data flows through tunnel
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
require.Equal(t, expectedBody, string(body))
// Clean up forwarder on node 1
nodes[1].IPFS("p2p", "close", "-a")
// SIGTERM the listen --foreground command
_ = listenRes.Cmd.Process.Signal(syscall.SIGTERM)
_ = listenRes.Cmd.Wait()
// Wait for listener to be removed on node 0
waitForListenerCount(t, nodes[0], 0)
})
t.Run("forward foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
httpServerPort := harness.NewRandPort()
forwardPort := harness.NewRandPort()
// Start HTTP server
expectedBody := "Hello from forward foreground tunnel!"
httpServer := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(expectedBody))
}),
}
listener, err := net.Listen("tcp", httpServer.Addr)
require.NoError(t, err)
go func() { _ = httpServer.Serve(listener) }()
defer httpServer.Close()
// Node 0: listen (non-foreground)
nodes[0].IPFS("p2p", "listen", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort))
// Node 1: forward --foreground
forwardRes := nodes[1].Runner.Run(harness.RunRequest{
Path: nodes[1].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[0].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, forwardRes.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[1], 1)
// Verify data flows through tunnel
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
require.Equal(t, expectedBody, string(body))
// SIGTERM the forward --foreground command
_ = forwardRes.Cmd.Process.Signal(syscall.SIGTERM)
_ = forwardRes.Cmd.Wait()
// Wait for forwarder to be removed on node 1
waitForListenerCount(t, nodes[1], 0)
// Clean up listener on node 0
nodes[0].IPFS("p2p", "close", "-a")
})
t.Run("foreground command exits when daemon shuts down", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Start foreground listener
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/daemontest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/daemontest")
// Stop the daemon
node.StopDaemon()
// Wait for foreground command to exit
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - foreground command exited when daemon stopped
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit when daemon was stopped")
}
})
}