kubo/test/cli/p2p_test.go
Marcin Rataj e7bb8129d3 feat(p2p): add --foreground flag to listen and forward commands
adds `-f/--foreground` option that keeps the command running until
interrupted (SIGTERM/Ctrl+C) or closed via `ipfs p2p close`. the
listener/forwarder is automatically removed when the command exits.

useful for systemd services and scripts that need cleanup on exit.
2025-12-10 03:02:24 +01:00

431 lines
14 KiB
Go

package cli
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os/exec"
"slices"
"syscall"
"testing"
"time"
"github.com/ipfs/kubo/core/commands"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/require"
)
// waitForListenerCount waits until the node has exactly the expected number of listeners.
func waitForListenerCount(t *testing.T, node *harness.Node, expectedCount int) {
t.Helper()
require.Eventually(t, func() bool {
lsOut := node.IPFS("p2p", "ls", "--enc=json")
var lsResult commands.P2PLsOutput
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
return false
}
return len(lsResult.Listeners) == expectedCount
}, 5*time.Second, 100*time.Millisecond, "expected %d listeners", expectedCount)
}
// waitForListenerProtocol waits until the node has a listener with the given protocol.
func waitForListenerProtocol(t *testing.T, node *harness.Node, protocol string) {
t.Helper()
require.Eventually(t, func() bool {
lsOut := node.IPFS("p2p", "ls", "--enc=json")
var lsResult commands.P2PLsOutput
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
return false
}
return slices.ContainsFunc(lsResult.Listeners, func(l commands.P2PListenerInfoOutput) bool {
return l.Protocol == protocol
})
}, 5*time.Second, 100*time.Millisecond, "expected listener with protocol %s", protocol)
}
func TestP2PForeground(t *testing.T) {
t.Parallel()
t.Run("listen foreground creates listener and removes on interrupt", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Start foreground listener asynchronously
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/fgtest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/fgtest")
// Send SIGTERM
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Wait for listener to be removed
waitForListenerCount(t, node, 0)
})
t.Run("listen foreground text output on SIGTERM", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/sigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
waitForListenerProtocol(t, node, "/x/sigterm")
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Verify stdout shows "waiting for interrupt" message
stdout := res.Stdout.String()
require.Contains(t, stdout, "waiting for interrupt")
// Note: "Received interrupt, removing listener" message is NOT visible to CLI on SIGTERM
// because the command runs in the daemon via RPC and the response stream closes before
// the message can be emitted. The important behavior is verified in the first test:
// the listener IS removed when SIGTERM is sent.
})
t.Run("forward foreground creates forwarder and removes on interrupt", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Start foreground forwarder asynchronously on node 0
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fgfwd", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[0], 1)
// Send SIGTERM
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Wait for forwarder to be removed
waitForListenerCount(t, nodes[0], 0)
})
t.Run("forward foreground text output on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fwdsigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
waitForListenerCount(t, nodes[0], 1)
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Verify stdout shows "waiting for interrupt" message
stdout := res.Stdout.String()
require.Contains(t, stdout, "waiting for interrupt")
// Note: "Received interrupt, removing forwarder" message is NOT visible to CLI on SIGTERM
// because the response stream closes before the message can be emitted.
})
t.Run("listen without foreground returns immediately and persists", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// This should return immediately (not block)
node.IPFS("p2p", "listen", "/x/nofg", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort))
// Listener should still exist
waitForListenerProtocol(t, node, "/x/nofg")
// Clean up
node.IPFS("p2p", "close", "-p", "/x/nofg")
})
t.Run("listen foreground text output on p2p close", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/closetest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/closetest")
// Close the listener via ipfs p2p close command
node.IPFS("p2p", "close", "-p", "/x/closetest")
// Wait for foreground command to exit (it should exit quickly after close)
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - command exited
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit after listener was closed via ipfs p2p close")
}
// Wait for listener to be removed
waitForListenerCount(t, node, 0)
// Verify text output shows BOTH messages when closed via p2p close
// (unlike SIGTERM, the stream is still open so "Received interrupt" is emitted)
out := res.Stdout.String()
require.Contains(t, out, "waiting for interrupt")
require.Contains(t, out, "Received interrupt, removing listener")
})
t.Run("forward foreground text output on p2p close", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fwdclose", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[0], 1)
// Close the forwarder via ipfs p2p close command
nodes[0].IPFS("p2p", "close", "-a")
// Wait for foreground command to exit
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - command exited
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit after forwarder was closed via ipfs p2p close")
}
// Wait for forwarder to be removed
waitForListenerCount(t, nodes[0], 0)
// Verify text output shows BOTH messages when closed via p2p close
out := res.Stdout.String()
require.Contains(t, out, "waiting for interrupt")
require.Contains(t, out, "Received interrupt, removing forwarder")
})
t.Run("listen foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
httpServerPort := harness.NewRandPort()
forwardPort := harness.NewRandPort()
// Start HTTP server
expectedBody := "Hello from p2p tunnel!"
httpServer := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(expectedBody))
}),
}
listener, err := net.Listen("tcp", httpServer.Addr)
require.NoError(t, err)
go func() { _ = httpServer.Serve(listener) }()
defer httpServer.Close()
// Node 0: listen --foreground
listenRes := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, listenRes.Err)
// Wait for listener to be created
waitForListenerProtocol(t, nodes[0], "/x/httptest")
// Node 1: forward (non-foreground)
nodes[1].IPFS("p2p", "forward", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/"+nodes[0].PeerID().String())
// Verify data flows through tunnel
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
require.Equal(t, expectedBody, string(body))
// Clean up forwarder on node 1
nodes[1].IPFS("p2p", "close", "-a")
// SIGTERM the listen --foreground command
_ = listenRes.Cmd.Process.Signal(syscall.SIGTERM)
_ = listenRes.Cmd.Wait()
// Wait for listener to be removed on node 0
waitForListenerCount(t, nodes[0], 0)
})
t.Run("forward foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
httpServerPort := harness.NewRandPort()
forwardPort := harness.NewRandPort()
// Start HTTP server
expectedBody := "Hello from forward foreground tunnel!"
httpServer := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(expectedBody))
}),
}
listener, err := net.Listen("tcp", httpServer.Addr)
require.NoError(t, err)
go func() { _ = httpServer.Serve(listener) }()
defer httpServer.Close()
// Node 0: listen (non-foreground)
nodes[0].IPFS("p2p", "listen", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort))
// Node 1: forward --foreground
forwardRes := nodes[1].Runner.Run(harness.RunRequest{
Path: nodes[1].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[0].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, forwardRes.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[1], 1)
// Verify data flows through tunnel
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
require.Equal(t, expectedBody, string(body))
// SIGTERM the forward --foreground command
_ = forwardRes.Cmd.Process.Signal(syscall.SIGTERM)
_ = forwardRes.Cmd.Wait()
// Wait for forwarder to be removed on node 1
waitForListenerCount(t, nodes[1], 0)
// Clean up listener on node 0
nodes[0].IPFS("p2p", "close", "-a")
})
t.Run("foreground command exits when daemon shuts down", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Start foreground listener
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/daemontest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/daemontest")
// Stop the daemon
node.StopDaemon()
// Wait for foreground command to exit
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - foreground command exited when daemon stopped
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit when daemon was stopped")
}
})
}