mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
* feat(p2p): add --foreground flag to listen and forward commands
adds `-f/--foreground` option that keeps the command running until
interrupted (SIGTERM/Ctrl+C) or closed via `ipfs p2p close`. the
listener/forwarder is automatically removed when the command exits.
useful for systemd services and scripts that need cleanup on exit.
* docs: add p2p-tunnels.md with systemd examples
- add dedicated docs/p2p-tunnels.md covering:
- why p2p tunnels (NAT traversal, no public IP needed)
- quick start with netcat
- background and foreground modes
- systemd integration with path-based activation
- security considerations and troubleshooting
- document Experimental.Libp2pStreamMounting in docs/config.md
- simplify docs/experimental-features.md, link to new doc
- add "Learn more" links to ipfs p2p listen/forward --help
- update changelog entry with doc link
- add cross-reference in misc/README.md
* chore: reference kubo#5460 for p2p config
Ref. https://github.com/ipfs/kubo/issues/5460
* fix(daemon): write api/gateway files only after HTTP server is ready
fixes race condition where $IPFS_PATH/api and $IPFS_PATH/gateway files
were written before the HTTP servers were ready to accept connections.
this caused issues for tools like systemd path units that immediately
try to connect when these files appear.
changes:
- add corehttp.ServeWithReady() that signals when server is ready
- wait for ready signal before writing address files
- use sync.WaitGroup.Go() (Go 1.25) for cleaner goroutine management
- add TestAddressFileReady to verify both api and gateway files
* fix(daemon): buffer errc channel and wait for all listeners
- buffer error channel with len(listeners) to prevent deadlock when
multiple servers write errors simultaneously
- wait for ALL listeners to be ready before writing api/gateway file,
not just the first one
Feedback-from: https://github.com/ipfs/kubo/pull/11099#pullrequestreview-3593885839
* docs(changelog): improve p2p tunnel section clarity
reframe to lead with user benefit and add example output
* docs(p2p): remove obsolete race condition caveat
the "First launch fails but restarts work" troubleshooting section
described a race where the api file was written before the daemon was
ready. this was fixed in 80b703a which ensures api/gateway files are
only written after HTTP servers are ready to accept connections.
---------
Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
431 lines
14 KiB
Go
431 lines
14 KiB
Go
package cli
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"os/exec"
|
|
"slices"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ipfs/kubo/core/commands"
|
|
"github.com/ipfs/kubo/test/cli/harness"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// waitForListenerCount waits until the node has exactly the expected number of listeners.
|
|
func waitForListenerCount(t *testing.T, node *harness.Node, expectedCount int) {
|
|
t.Helper()
|
|
require.Eventually(t, func() bool {
|
|
lsOut := node.IPFS("p2p", "ls", "--enc=json")
|
|
var lsResult commands.P2PLsOutput
|
|
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
|
|
return false
|
|
}
|
|
return len(lsResult.Listeners) == expectedCount
|
|
}, 5*time.Second, 100*time.Millisecond, "expected %d listeners", expectedCount)
|
|
}
|
|
|
|
// waitForListenerProtocol waits until the node has a listener with the given protocol.
|
|
func waitForListenerProtocol(t *testing.T, node *harness.Node, protocol string) {
|
|
t.Helper()
|
|
require.Eventually(t, func() bool {
|
|
lsOut := node.IPFS("p2p", "ls", "--enc=json")
|
|
var lsResult commands.P2PLsOutput
|
|
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
|
|
return false
|
|
}
|
|
return slices.ContainsFunc(lsResult.Listeners, func(l commands.P2PListenerInfoOutput) bool {
|
|
return l.Protocol == protocol
|
|
})
|
|
}, 5*time.Second, 100*time.Millisecond, "expected listener with protocol %s", protocol)
|
|
}
|
|
|
|
func TestP2PForeground(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("listen foreground creates listener and removes on interrupt", func(t *testing.T) {
|
|
t.Parallel()
|
|
node := harness.NewT(t).NewNode().Init()
|
|
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
node.StartDaemon()
|
|
|
|
listenPort := harness.NewRandPort()
|
|
|
|
// Start foreground listener asynchronously
|
|
res := node.Runner.Run(harness.RunRequest{
|
|
Path: node.IPFSBin,
|
|
Args: []string{"p2p", "listen", "--foreground", "/x/fgtest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, res.Err)
|
|
|
|
// Wait for listener to be created
|
|
waitForListenerProtocol(t, node, "/x/fgtest")
|
|
|
|
// Send SIGTERM
|
|
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
|
_ = res.Cmd.Wait()
|
|
|
|
// Wait for listener to be removed
|
|
waitForListenerCount(t, node, 0)
|
|
})
|
|
|
|
t.Run("listen foreground text output on SIGTERM", func(t *testing.T) {
|
|
t.Parallel()
|
|
node := harness.NewT(t).NewNode().Init()
|
|
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
node.StartDaemon()
|
|
|
|
listenPort := harness.NewRandPort()
|
|
|
|
// Run without --enc=json to test actual text output users see
|
|
res := node.Runner.Run(harness.RunRequest{
|
|
Path: node.IPFSBin,
|
|
Args: []string{"p2p", "listen", "--foreground", "/x/sigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, res.Err)
|
|
|
|
waitForListenerProtocol(t, node, "/x/sigterm")
|
|
|
|
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
|
_ = res.Cmd.Wait()
|
|
|
|
// Verify stdout shows "waiting for interrupt" message
|
|
stdout := res.Stdout.String()
|
|
require.Contains(t, stdout, "waiting for interrupt")
|
|
|
|
// Note: "Received interrupt, removing listener" message is NOT visible to CLI on SIGTERM
|
|
// because the command runs in the daemon via RPC and the response stream closes before
|
|
// the message can be emitted. The important behavior is verified in the first test:
|
|
// the listener IS removed when SIGTERM is sent.
|
|
})
|
|
|
|
t.Run("forward foreground creates forwarder and removes on interrupt", func(t *testing.T) {
|
|
t.Parallel()
|
|
nodes := harness.NewT(t).NewNodes(2).Init()
|
|
nodes.ForEachPar(func(n *harness.Node) {
|
|
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
})
|
|
nodes.StartDaemons().Connect()
|
|
|
|
forwardPort := harness.NewRandPort()
|
|
|
|
// Start foreground forwarder asynchronously on node 0
|
|
res := nodes[0].Runner.Run(harness.RunRequest{
|
|
Path: nodes[0].IPFSBin,
|
|
Args: []string{"p2p", "forward", "--foreground", "/x/fgfwd", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, res.Err)
|
|
|
|
// Wait for forwarder to be created
|
|
waitForListenerCount(t, nodes[0], 1)
|
|
|
|
// Send SIGTERM
|
|
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
|
_ = res.Cmd.Wait()
|
|
|
|
// Wait for forwarder to be removed
|
|
waitForListenerCount(t, nodes[0], 0)
|
|
})
|
|
|
|
t.Run("forward foreground text output on SIGTERM", func(t *testing.T) {
|
|
t.Parallel()
|
|
nodes := harness.NewT(t).NewNodes(2).Init()
|
|
nodes.ForEachPar(func(n *harness.Node) {
|
|
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
})
|
|
nodes.StartDaemons().Connect()
|
|
|
|
forwardPort := harness.NewRandPort()
|
|
|
|
// Run without --enc=json to test actual text output users see
|
|
res := nodes[0].Runner.Run(harness.RunRequest{
|
|
Path: nodes[0].IPFSBin,
|
|
Args: []string{"p2p", "forward", "--foreground", "/x/fwdsigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, res.Err)
|
|
|
|
waitForListenerCount(t, nodes[0], 1)
|
|
|
|
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
|
|
_ = res.Cmd.Wait()
|
|
|
|
// Verify stdout shows "waiting for interrupt" message
|
|
stdout := res.Stdout.String()
|
|
require.Contains(t, stdout, "waiting for interrupt")
|
|
|
|
// Note: "Received interrupt, removing forwarder" message is NOT visible to CLI on SIGTERM
|
|
// because the response stream closes before the message can be emitted.
|
|
})
|
|
|
|
t.Run("listen without foreground returns immediately and persists", func(t *testing.T) {
|
|
t.Parallel()
|
|
node := harness.NewT(t).NewNode().Init()
|
|
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
node.StartDaemon()
|
|
|
|
listenPort := harness.NewRandPort()
|
|
|
|
// This should return immediately (not block)
|
|
node.IPFS("p2p", "listen", "/x/nofg", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort))
|
|
|
|
// Listener should still exist
|
|
waitForListenerProtocol(t, node, "/x/nofg")
|
|
|
|
// Clean up
|
|
node.IPFS("p2p", "close", "-p", "/x/nofg")
|
|
})
|
|
|
|
t.Run("listen foreground text output on p2p close", func(t *testing.T) {
|
|
t.Parallel()
|
|
node := harness.NewT(t).NewNode().Init()
|
|
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
node.StartDaemon()
|
|
|
|
listenPort := harness.NewRandPort()
|
|
|
|
// Run without --enc=json to test actual text output users see
|
|
res := node.Runner.Run(harness.RunRequest{
|
|
Path: node.IPFSBin,
|
|
Args: []string{"p2p", "listen", "--foreground", "/x/closetest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, res.Err)
|
|
|
|
// Wait for listener to be created
|
|
waitForListenerProtocol(t, node, "/x/closetest")
|
|
|
|
// Close the listener via ipfs p2p close command
|
|
node.IPFS("p2p", "close", "-p", "/x/closetest")
|
|
|
|
// Wait for foreground command to exit (it should exit quickly after close)
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- res.Cmd.Wait()
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
// Good - command exited
|
|
case <-time.After(5 * time.Second):
|
|
_ = res.Cmd.Process.Kill()
|
|
t.Fatal("foreground command did not exit after listener was closed via ipfs p2p close")
|
|
}
|
|
|
|
// Wait for listener to be removed
|
|
waitForListenerCount(t, node, 0)
|
|
|
|
// Verify text output shows BOTH messages when closed via p2p close
|
|
// (unlike SIGTERM, the stream is still open so "Received interrupt" is emitted)
|
|
out := res.Stdout.String()
|
|
require.Contains(t, out, "waiting for interrupt")
|
|
require.Contains(t, out, "Received interrupt, removing listener")
|
|
})
|
|
|
|
t.Run("forward foreground text output on p2p close", func(t *testing.T) {
|
|
t.Parallel()
|
|
nodes := harness.NewT(t).NewNodes(2).Init()
|
|
nodes.ForEachPar(func(n *harness.Node) {
|
|
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
})
|
|
nodes.StartDaemons().Connect()
|
|
|
|
forwardPort := harness.NewRandPort()
|
|
|
|
// Run without --enc=json to test actual text output users see
|
|
res := nodes[0].Runner.Run(harness.RunRequest{
|
|
Path: nodes[0].IPFSBin,
|
|
Args: []string{"p2p", "forward", "--foreground", "/x/fwdclose", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, res.Err)
|
|
|
|
// Wait for forwarder to be created
|
|
waitForListenerCount(t, nodes[0], 1)
|
|
|
|
// Close the forwarder via ipfs p2p close command
|
|
nodes[0].IPFS("p2p", "close", "-a")
|
|
|
|
// Wait for foreground command to exit
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- res.Cmd.Wait()
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
// Good - command exited
|
|
case <-time.After(5 * time.Second):
|
|
_ = res.Cmd.Process.Kill()
|
|
t.Fatal("foreground command did not exit after forwarder was closed via ipfs p2p close")
|
|
}
|
|
|
|
// Wait for forwarder to be removed
|
|
waitForListenerCount(t, nodes[0], 0)
|
|
|
|
// Verify text output shows BOTH messages when closed via p2p close
|
|
out := res.Stdout.String()
|
|
require.Contains(t, out, "waiting for interrupt")
|
|
require.Contains(t, out, "Received interrupt, removing forwarder")
|
|
})
|
|
|
|
t.Run("listen foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
|
|
t.Parallel()
|
|
nodes := harness.NewT(t).NewNodes(2).Init()
|
|
nodes.ForEachPar(func(n *harness.Node) {
|
|
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
})
|
|
nodes.StartDaemons().Connect()
|
|
|
|
httpServerPort := harness.NewRandPort()
|
|
forwardPort := harness.NewRandPort()
|
|
|
|
// Start HTTP server
|
|
expectedBody := "Hello from p2p tunnel!"
|
|
httpServer := &http.Server{
|
|
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
|
|
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
_, _ = w.Write([]byte(expectedBody))
|
|
}),
|
|
}
|
|
listener, err := net.Listen("tcp", httpServer.Addr)
|
|
require.NoError(t, err)
|
|
go func() { _ = httpServer.Serve(listener) }()
|
|
defer httpServer.Close()
|
|
|
|
// Node 0: listen --foreground
|
|
listenRes := nodes[0].Runner.Run(harness.RunRequest{
|
|
Path: nodes[0].IPFSBin,
|
|
Args: []string{"p2p", "listen", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, listenRes.Err)
|
|
|
|
// Wait for listener to be created
|
|
waitForListenerProtocol(t, nodes[0], "/x/httptest")
|
|
|
|
// Node 1: forward (non-foreground)
|
|
nodes[1].IPFS("p2p", "forward", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/"+nodes[0].PeerID().String())
|
|
|
|
// Verify data flows through tunnel
|
|
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
|
|
require.NoError(t, err)
|
|
body, err := io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedBody, string(body))
|
|
|
|
// Clean up forwarder on node 1
|
|
nodes[1].IPFS("p2p", "close", "-a")
|
|
|
|
// SIGTERM the listen --foreground command
|
|
_ = listenRes.Cmd.Process.Signal(syscall.SIGTERM)
|
|
_ = listenRes.Cmd.Wait()
|
|
|
|
// Wait for listener to be removed on node 0
|
|
waitForListenerCount(t, nodes[0], 0)
|
|
})
|
|
|
|
t.Run("forward foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
|
|
t.Parallel()
|
|
nodes := harness.NewT(t).NewNodes(2).Init()
|
|
nodes.ForEachPar(func(n *harness.Node) {
|
|
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
})
|
|
nodes.StartDaemons().Connect()
|
|
|
|
httpServerPort := harness.NewRandPort()
|
|
forwardPort := harness.NewRandPort()
|
|
|
|
// Start HTTP server
|
|
expectedBody := "Hello from forward foreground tunnel!"
|
|
httpServer := &http.Server{
|
|
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
|
|
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
_, _ = w.Write([]byte(expectedBody))
|
|
}),
|
|
}
|
|
listener, err := net.Listen("tcp", httpServer.Addr)
|
|
require.NoError(t, err)
|
|
go func() { _ = httpServer.Serve(listener) }()
|
|
defer httpServer.Close()
|
|
|
|
// Node 0: listen (non-foreground)
|
|
nodes[0].IPFS("p2p", "listen", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort))
|
|
|
|
// Node 1: forward --foreground
|
|
forwardRes := nodes[1].Runner.Run(harness.RunRequest{
|
|
Path: nodes[1].IPFSBin,
|
|
Args: []string{"p2p", "forward", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[0].PeerID().String()},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, forwardRes.Err)
|
|
|
|
// Wait for forwarder to be created
|
|
waitForListenerCount(t, nodes[1], 1)
|
|
|
|
// Verify data flows through tunnel
|
|
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
|
|
require.NoError(t, err)
|
|
body, err := io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, expectedBody, string(body))
|
|
|
|
// SIGTERM the forward --foreground command
|
|
_ = forwardRes.Cmd.Process.Signal(syscall.SIGTERM)
|
|
_ = forwardRes.Cmd.Wait()
|
|
|
|
// Wait for forwarder to be removed on node 1
|
|
waitForListenerCount(t, nodes[1], 0)
|
|
|
|
// Clean up listener on node 0
|
|
nodes[0].IPFS("p2p", "close", "-a")
|
|
})
|
|
|
|
t.Run("foreground command exits when daemon shuts down", func(t *testing.T) {
|
|
t.Parallel()
|
|
node := harness.NewT(t).NewNode().Init()
|
|
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
|
|
node.StartDaemon()
|
|
|
|
listenPort := harness.NewRandPort()
|
|
|
|
// Start foreground listener
|
|
res := node.Runner.Run(harness.RunRequest{
|
|
Path: node.IPFSBin,
|
|
Args: []string{"p2p", "listen", "--foreground", "/x/daemontest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
|
|
RunFunc: (*exec.Cmd).Start,
|
|
})
|
|
require.NoError(t, res.Err)
|
|
|
|
// Wait for listener to be created
|
|
waitForListenerProtocol(t, node, "/x/daemontest")
|
|
|
|
// Stop the daemon
|
|
node.StopDaemon()
|
|
|
|
// Wait for foreground command to exit
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- res.Cmd.Wait()
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
// Good - foreground command exited when daemon stopped
|
|
case <-time.After(5 * time.Second):
|
|
_ = res.Cmd.Process.Kill()
|
|
t.Fatal("foreground command did not exit when daemon was stopped")
|
|
}
|
|
})
|
|
}
|