Merge remote-tracking branch 'origin/master' into fix/pubsub-validator

# Conflicts:
#	docs/changelogs/v0.40.md
This commit is contained in:
Marcin Rataj 2026-01-09 19:31:42 +01:00
commit fada2b6dc4
46 changed files with 1483 additions and 160 deletions

View File

@ -887,23 +887,38 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error
return nil, fmt.Errorf("serveHTTPApi: ConstructNode() failed: %s", err)
}
// Buffer channel to prevent deadlock when multiple servers write errors simultaneously
errc := make(chan error, len(listeners))
var wg sync.WaitGroup
// Start all servers and wait for them to be ready before writing api file.
// This prevents race conditions where external tools (like systemd path units)
// see the file and try to connect before servers can accept connections.
if len(listeners) > 0 {
// Only add an api file if the API is running.
readyChannels := make([]chan struct{}, len(listeners))
for i, lis := range listeners {
readyChannels[i] = make(chan struct{})
ready := readyChannels[i]
wg.Go(func() {
errc <- corehttp.ServeWithReady(node, manet.NetListener(lis), ready, opts...)
})
}
// Wait for all listeners to be ready or any to fail
for _, ready := range readyChannels {
select {
case <-ready:
// This listener is ready
case err := <-errc:
return nil, fmt.Errorf("serveHTTPApi: %w", err)
}
}
if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil {
return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err)
}
}
errc := make(chan error)
var wg sync.WaitGroup
for _, apiLis := range listeners {
wg.Add(1)
go func(lis manet.Listener) {
defer wg.Done()
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
}(apiLis)
}
go func() {
wg.Wait()
close(errc)
@ -1062,26 +1077,42 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e
return nil, fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err)
}
// Buffer channel to prevent deadlock when multiple servers write errors simultaneously
errc := make(chan error, len(listeners))
var wg sync.WaitGroup
// Start all servers and wait for them to be ready before writing gateway file.
// This prevents race conditions where external tools (like systemd path units)
// see the file and try to connect before servers can accept connections.
if len(listeners) > 0 {
readyChannels := make([]chan struct{}, len(listeners))
for i, lis := range listeners {
readyChannels[i] = make(chan struct{})
ready := readyChannels[i]
wg.Go(func() {
errc <- corehttp.ServeWithReady(node, manet.NetListener(lis), ready, opts...)
})
}
// Wait for all listeners to be ready or any to fail
for _, ready := range readyChannels {
select {
case <-ready:
// This listener is ready
case err := <-errc:
return nil, fmt.Errorf("serveHTTPGateway: %w", err)
}
}
addr, err := manet.ToNetAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr()))
if err != nil {
return nil, fmt.Errorf("serveHTTPGateway: manet.ToIP() failed: %w", err)
return nil, fmt.Errorf("serveHTTPGateway: manet.ToNetAddr() failed: %w", err)
}
if err := node.Repo.SetGatewayAddr(addr); err != nil {
return nil, fmt.Errorf("serveHTTPGateway: SetGatewayAddr() failed: %w", err)
}
}
errc := make(chan error)
var wg sync.WaitGroup
for _, lis := range listeners {
wg.Add(1)
go func(lis manet.Listener) {
defer wg.Done()
errc <- corehttp.Serve(node, manet.NetListener(lis), opts...)
}(lis)
}
go func() {
wg.Wait()
close(errc)

View File

@ -7,6 +7,7 @@ import (
"io"
"path"
"github.com/dustin/go-humanize"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"
@ -349,7 +350,11 @@ type DagStatSummary struct {
}
func (s *DagStatSummary) String() string {
return fmt.Sprintf("Total Size: %d\nUnique Blocks: %d\nShared Size: %d\nRatio: %f", s.TotalSize, s.UniqueBlocks, s.SharedSize, s.Ratio)
return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nShared Size: %d (%s)\nRatio: %f",
s.TotalSize, humanize.Bytes(s.TotalSize),
s.UniqueBlocks,
s.SharedSize, humanize.Bytes(s.SharedSize),
s.Ratio)
}
func (s *DagStatSummary) incrementTotalSize(size uint64) {
@ -384,7 +389,7 @@ Note: This command skips duplicate blocks in reporting both size and the number
cmds.StringArg("root", true, true, "CID of a DAG root to get statistics for").EnableStdin(),
},
Options: []cmds.Option{
cmds.BoolOption(progressOptionName, "p", "Return progressive data while reading through the DAG").WithDefault(true),
cmds.BoolOption(progressOptionName, "p", "Show progress on stderr. Auto-detected if stderr is a terminal."),
},
Run: dagStat,
Type: DagStatSummary{},

View File

@ -5,6 +5,7 @@ import (
"io"
"os"
"github.com/dustin/go-humanize"
mdag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/traverse"
cid "github.com/ipfs/go-cid"
@ -19,7 +20,11 @@ import (
// to compute the new state
func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
progressive := req.Options[progressOptionName].(bool)
// Default to true (emit intermediate states) for HTTP/RPC clients that want progress
progressive := true
if val, specified := req.Options[progressOptionName].(bool); specified {
progressive = val
}
api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
@ -84,6 +89,18 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
}
func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
// Determine whether to show progress based on TTY detection or explicit flag
var showProgress bool
val, specified := res.Request().Options[progressOptionName]
if !specified {
// Auto-detect: show progress only if stderr is a TTY
if errStat, err := os.Stderr.Stat(); err == nil {
showProgress = (errStat.Mode() & os.ModeCharDevice) != 0
}
} else {
showProgress = val.(bool)
}
var dagStats *DagStatSummary
for {
v, err := res.Next()
@ -96,17 +113,26 @@ func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
switch out := v.(type) {
case *DagStatSummary:
dagStats = out
if dagStats.Ratio == 0 {
length := len(dagStats.DagStatsArray)
if length > 0 {
currentStat := dagStats.DagStatsArray[length-1]
fmt.Fprintf(os.Stderr, "CID: %s, Size: %d, NumBlocks: %d\n", currentStat.Cid, currentStat.Size, currentStat.NumBlocks)
// Ratio == 0 means this is a progress update (not final result)
if showProgress && dagStats.Ratio == 0 {
// Sum up total progress across all DAGs being scanned
var totalBlocks int64
var totalSize uint64
for _, stat := range dagStats.DagStatsArray {
totalBlocks += stat.NumBlocks
totalSize += stat.Size
}
fmt.Fprintf(os.Stderr, "Fetched/Processed %d blocks, %d bytes (%s)\r", totalBlocks, totalSize, humanize.Bytes(totalSize))
}
default:
return e.TypeErr(out, v)
}
}
// Clear the progress line before final output
if showProgress {
fmt.Fprint(os.Stderr, "\033[2K\r")
}
return re.Emit(dagStats)
}

View File

@ -50,9 +50,17 @@ type P2PStreamsOutput struct {
Streams []P2PStreamInfoOutput
}
// P2PForegroundOutput is output type for foreground mode status messages
type P2PForegroundOutput struct {
Status string // "active" or "closing"
Protocol string
Address string
}
const (
allowCustomProtocolOptionName = "allow-custom-protocol"
reportPeerIDOptionName = "report-peer-id"
foregroundOptionName = "foreground"
)
var resolveTimeout = 10 * time.Second
@ -83,15 +91,37 @@ var p2pForwardCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Forward connections to libp2p service.",
ShortDescription: `
Forward connections made to <listen-address> to <target-address>.
Forward connections made to <listen-address> to <target-address> via libp2p.
<protocol> specifies the libp2p protocol name to use for libp2p
connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'.
Creates a local TCP listener that tunnels connections through libp2p to a
remote peer's p2p listener. Similar to SSH port forwarding (-L flag).
Example:
ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /p2p/QmPeer
- Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /p2p/QmPeer
ARGUMENTS:
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<listen-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
<target-address> Remote peer multiaddr (e.g., /p2p/PeerID)
FOREGROUND MODE (--foreground, -f):
By default, the forwarder runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:
- Ctrl+C or SIGTERM: Removes the forwarder and exits
- 'ipfs p2p close': Removes the forwarder and exits
- Daemon shutdown: Forwarder is automatically removed
Useful for systemd services or scripts that need cleanup on exit.
EXAMPLES:
# Persistent forwarder (command returns immediately)
ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
# Temporary forwarder (removed when command exits)
ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
`,
},
Arguments: []cmds.Argument{
@ -101,6 +131,7 @@ Example:
},
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
@ -130,7 +161,51 @@ Example:
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
if err != nil {
return err
}
foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: listenOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: listenOpt,
})
}
}
return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol)
}
return nil
}),
},
}
@ -185,14 +260,40 @@ var p2pListenCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Create libp2p service.",
ShortDescription: `
Create libp2p service and forward connections made to <target-address>.
Create a libp2p protocol handler that forwards incoming connections to
<target-address>.
<protocol> specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'.
When a remote peer connects using 'ipfs p2p forward', the connection is
forwarded to your local service. Similar to SSH port forwarding (server side).
Example:
ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234
- Forward connections to 'myproto' libp2p service to 127.0.0.1:1234
ARGUMENTS:
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
<target-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
FOREGROUND MODE (--foreground, -f):
By default, the listener runs in the daemon and the command returns
immediately. Use --foreground to block until interrupted:
- Ctrl+C or SIGTERM: Removes the listener and exits
- 'ipfs p2p close': Removes the listener and exits
- Daemon shutdown: Listener is automatically removed
Useful for systemd services or scripts that need cleanup on exit.
EXAMPLES:
# Persistent listener (command returns immediately)
ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000
# Temporary listener (removed when command exits)
ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000
# Report connecting peer ID to the target application
ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
`,
},
Arguments: []cmds.Argument{
@ -202,6 +303,7 @@ Example:
Options: []cmds.Option{
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
@ -231,8 +333,51 @@ Example:
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
_, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
return err
listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
if err != nil {
return err
}
foreground, _ := req.Options[foregroundOptionName].(bool)
if foreground {
if err := res.Emit(&P2PForegroundOutput{
Status: "active",
Protocol: protoOpt,
Address: targetOpt,
}); err != nil {
return err
}
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
// or listener removal (ipfs p2p close)
select {
case <-req.Context.Done():
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool {
return l == listener
})
return nil
case <-listener.Done():
// Closed via "ipfs p2p close" - emit closing message
return res.Emit(&P2PForegroundOutput{
Status: "closing",
Protocol: protoOpt,
Address: targetOpt,
})
}
}
return nil
},
Type: P2PForegroundOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
if out.Status == "active" {
fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address)
} else if out.Status == "closing" {
fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol)
}
return nil
}),
},
}
@ -271,11 +416,9 @@ func checkPort(target ma.Multiaddr) error {
}
// forwardLocal forwards local connections to a libp2p service
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) error {
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) {
ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL)
// TODO: return some info
_, err := p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
return err
return p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
}
const (

View File

@ -78,9 +78,23 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv
return Serve(n, manet.NetListener(list), options...)
}
// Serve accepts incoming HTTP connections on the listener and pass them
// Serve accepts incoming HTTP connections on the listener and passes them
// to ServeOption handlers.
func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {
return ServeWithReady(node, lis, nil, options...)
}
// ServeWithReady is like Serve but signals on the ready channel when the
// server is about to accept connections. The channel is closed right before
// server.Serve() is called.
//
// This is useful for callers that need to perform actions (like writing
// address files) only after the server is guaranteed to be accepting
// connections, avoiding race conditions where clients see the file before
// the server is ready.
//
// Passing nil for ready is equivalent to calling Serve().
func ServeWithReady(node *core.IpfsNode, lis net.Listener, ready chan<- struct{}, options ...ServeOption) error {
// make sure we close this no matter what.
defer lis.Close()
@ -107,6 +121,9 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
var serverError error
serverClosed := make(chan struct{})
go func() {
if ready != nil {
close(ready)
}
serverError = server.Serve(lis)
close(serverClosed)
}()

View File

@ -692,6 +692,48 @@ See docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#providedhtmaxw
// ONLINE/OFFLINE
// hasDHTRouting checks if the routing configuration includes a DHT component.
// Returns false for HTTP-only custom routing configurations (e.g., Routing.Type="custom"
// with only HTTP routers). This is used to determine whether SweepingProviderOpt
// can be used, since it requires a DHT client.
func hasDHTRouting(cfg *config.Config) bool {
routingType := cfg.Routing.Type.WithDefault(config.DefaultRoutingType)
switch routingType {
case "auto", "autoclient", "dht", "dhtclient", "dhtserver":
return true
case "custom":
// Check if any router in custom config is DHT-based
for _, router := range cfg.Routing.Routers {
if routerIncludesDHT(router, cfg) {
return true
}
}
return false
default: // "none", "delegated"
return false
}
}
// routerIncludesDHT recursively checks if a router configuration includes DHT.
// Handles parallel and sequential composite routers by checking their children.
func routerIncludesDHT(rp config.RouterParser, cfg *config.Config) bool {
switch rp.Type {
case config.RouterTypeDHT:
return true
case config.RouterTypeParallel, config.RouterTypeSequential:
if children, ok := rp.Parameters.(*config.ComposableRouterParams); ok {
for _, child := range children.Routers {
if childRouter, exists := cfg.Routing.Routers[child.RouterName]; exists {
if routerIncludesDHT(childRouter, cfg) {
return true
}
}
}
}
}
return false
}
// OnlineProviders groups units managing provide routing records online
func OnlineProviders(provide bool, cfg *config.Config) fx.Option {
if !provide {
@ -708,7 +750,15 @@ func OnlineProviders(provide bool, cfg *config.Config) fx.Option {
opts := []fx.Option{
fx.Provide(setReproviderKeyProvider(providerStrategy)),
}
if cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled) {
sweepEnabled := cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled)
dhtAvailable := hasDHTRouting(cfg)
// Use SweepingProvider only when both sweep is enabled AND DHT is available.
// For HTTP-only routing (e.g., Routing.Type="custom" with only HTTP routers),
// fall back to LegacyProvider which works with ProvideManyRouter.
// See https://github.com/ipfs/kubo/issues/11089
if sweepEnabled && dhtAvailable {
opts = append(opts, SweepingProviderOpt(cfg))
} else {
reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)

View File

@ -14,6 +14,8 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team.
- [Track total size when adding pins](#track-total-size-when-adding-pins)
- [Improved IPNS over PubSub validation](#improved-ipns-over-pubsub-validation)
- [New `ipfs diag datastore` commands](#new-ipfs-diag-datastore-commands)
- [🚇 Improved `ipfs p2p` tunnels with foreground mode](#-improved-ipfs-p2p-tunnels-with-foreground-mode)
- [Improved `ipfs dag stat` output](#improved-ipfs-dag-stat-output)
- [Skip bad keys when listing](#skip_bad_keys_when_listing)
- [📦️ Dependency updates](#-dependency-updates)
- [📝 Changelog](#-changelog)
@ -61,6 +63,48 @@ Key: /pubsub/seqno/12D3KooW...
Hex Dump:
00000000 18 81 81 c8 91 c0 ea f6 |........|
```
#### 🚇 Improved `ipfs p2p` tunnels with foreground mode
P2P tunnels can now run like SSH port forwarding: start a tunnel, use it, and it cleans up automatically when you're done.
The new `--foreground` (`-f`) flag for `ipfs p2p listen` and `ipfs p2p forward` keeps the command running until interrupted. When you Ctrl+C, send SIGTERM, or stop the service, the tunnel is removed automatically:
```console
$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 --foreground
Listening on /x/ssh, forwarding to /ip4/127.0.0.1/tcp/22, waiting for interrupt...
^C
Received interrupt, removing listener for /x/ssh
```
Without `--foreground`, commands return immediately and tunnels persist until explicitly closed (existing behavior).
See [docs/p2p-tunnels.md](https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md) for usage examples.
#### Improved `ipfs dag stat` output
The `ipfs dag stat` command has been improved for better terminal UX:
- Progress output now uses a single line with carriage return, avoiding terminal flooding
- Progress is auto-detected: shown only in interactive terminals by default
- Human-readable sizes are now displayed alongside raw byte counts
Example progress (interactive terminal):
```
Fetched/Processed 84 blocks, 2097152 bytes (2.1 MB)
```
Example summary output:
```
Summary
Total Size: 2097152 (2.1 MB)
Unique Blocks: 42
Shared Size: 1048576 (1.0 MB)
Ratio: 1.500000
```
Use `--progress=true` to force progress even when piped, or `--progress=false` to disable it.
#### Skip bad keys when listing
Change the `ipfs key list` behavior to log an error and continue listing keys when a key cannot be read from the keystore or decoded.

View File

@ -59,6 +59,7 @@ config file at runtime.
- [`Discovery.MDNS.Enabled`](#discoverymdnsenabled)
- [`Discovery.MDNS.Interval`](#discoverymdnsinterval)
- [`Experimental`](#experimental)
- [`Experimental.Libp2pStreamMounting`](#experimentallibp2pstreammounting)
- [`Gateway`](#gateway)
- [`Gateway.NoFetch`](#gatewaynofetch)
- [`Gateway.NoDNSLink`](#gatewaynodnslink)
@ -1071,11 +1072,26 @@ in the [new mDNS implementation](https://github.com/libp2p/zeroconf#readme).
Toggle and configure experimental features of Kubo. Experimental features are listed [here](./experimental-features.md).
### `Experimental.Libp2pStreamMounting`
Enables the `ipfs p2p` commands for tunneling TCP connections through libp2p
streams, similar to SSH port forwarding.
See [docs/p2p-tunnels.md](p2p-tunnels.md) for usage examples.
Default: `false`
Type: `bool`
## `Gateway`
Options for the HTTP gateway.
**NOTE:** support for `/api/v0` under the gateway path is now deprecated. It will be removed in future versions: <https://github.com/ipfs/kubo/issues/10312>.
> [!IMPORTANT]
> By default, Kubo's gateway is configured for local use at `127.0.0.1` and `localhost`.
> To run a public gateway, configure your domain names in [`Gateway.PublicGateways`](#gatewaypublicgateways).
> For production deployment considerations (reverse proxy, timeouts, rate limiting, CDN),
> see [Running in Production](gateway.md#running-in-production).
### `Gateway.NoFetch`
@ -1270,6 +1286,11 @@ Examples:
- `*.example.com` will match requests to `http://foo.example.com/ipfs/*` or `http://{cid}.ipfs.bar.example.com/*`.
- `foo-*.example.com` will match requests to `http://foo-bar.example.com/ipfs/*` or `http://{cid}.ipfs.foo-xyz.example.com/*`.
> [!IMPORTANT]
> **Reverse Proxy:** If running behind nginx or another reverse proxy, ensure
> `Host` and `X-Forwarded-*` headers are forwarded correctly.
> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) in gateway documentation.
#### `Gateway.PublicGateways: Paths`
An array of paths that should be exposed on the hostname.
@ -1336,6 +1357,9 @@ Default: `false`
Type: `bool`
> [!IMPORTANT]
> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) if running behind nginx or another reverse proxy.
#### `Gateway.PublicGateways: NoDNSLink`
A boolean to configure whether DNSLink for hostname present in `Host`
@ -1346,6 +1370,9 @@ Default: `false` (DNSLink lookup enabled by default for every defined hostname)
Type: `bool`
> [!IMPORTANT]
> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) if running behind nginx or another reverse proxy.
#### `Gateway.PublicGateways: InlineDNSLink`
An optional flag to explicitly configure whether subdomain gateway's redirects
@ -1413,6 +1440,9 @@ ipfs config --json Gateway.PublicGateways '{"localhost": null }'
Below is a list of the most common gateway setups.
> [!IMPORTANT]
> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) if running behind nginx or another reverse proxy.
- Public [subdomain gateway](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#subdomain-gateway) at `http://{cid}.ipfs.dweb.link` (each content root gets its own Origin)
```console
@ -2197,6 +2227,9 @@ You can compare the effectiveness of sweep mode vs legacy mode by monitoring the
> [!NOTE]
> This is the default provider system as of Kubo v0.39. To use the legacy provider instead, set `Provide.DHT.SweepEnabled=false`.
> [!NOTE]
> When DHT routing is unavailable (e.g., `Routing.Type=custom` with only HTTP routers), the provider automatically falls back to the legacy provider regardless of this setting.
Default: `true`
Type: `flag`
@ -2363,8 +2396,8 @@ Replaced with [`Provide.DHT.MaxWorkers`](#providedhtmaxworkers).
## `Pubsub`
Pubsub configures the `ipfs pubsub` subsystem. To enable, set `Pubsub.Enabled`
to `true`.
Pubsub configures Kubo's opt-in, opinionated [libp2p pubsub](https://docs.libp2p.io/concepts/pubsub/overview/) instance.
To enable, set `Pubsub.Enabled` to `true`.
**EXPERIMENTAL:** This is an opt-in feature. Its primary use case is
[IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/), which

View File

@ -199,9 +199,8 @@ configured, the daemon will fail to start.
## ipfs p2p
Allows tunneling of TCP connections through Libp2p streams. If you've ever used
port forwarding with SSH (the `-L` option in OpenSSH), this feature is quite
similar.
Allows tunneling of TCP connections through libp2p streams, similar to SSH port
forwarding (`ssh -L`).
### State
@ -220,98 +219,20 @@ Experimental, will be stabilized in 0.6.0
> If you enable this and plan to expose CLI or HTTP RPC to other users or machines,
> secure RPC API using [`API.Authorizations`](https://github.com/ipfs/kubo/blob/master/docs/config.md#apiauthorizations) or custom auth middleware.
The `p2p` command needs to be enabled in the config:
```sh
> ipfs config --json Experimental.Libp2pStreamMounting true
```
### How to use
**Netcat example:**
First, pick a protocol name for your application. Think of the protocol name as
a port number, just significantly more user-friendly. In this example, we're
going to use `/x/kickass/1.0`.
***Setup:***
1. A "server" node with peer ID `$SERVER_ID`
2. A "client" node.
***On the "server" node:***
First, start your application and have it listen for TCP connections on
port `$APP_PORT`.
Then, configure the p2p listener by running:
```sh
> ipfs p2p listen /x/kickass/1.0 /ip4/127.0.0.1/tcp/$APP_PORT
```
This will configure IPFS to forward all incoming `/x/kickass/1.0` streams to
`127.0.0.1:$APP_PORT` (opening a new connection to `127.0.0.1:$APP_PORT` per
incoming stream.
***On the "client" node:***
First, configure the client p2p dialer, so that it forwards all inbound
connections on `127.0.0.1:SOME_PORT` to the server node listening
on `/x/kickass/1.0`.
```sh
> ipfs p2p forward /x/kickass/1.0 /ip4/127.0.0.1/tcp/$SOME_PORT /p2p/$SERVER_ID
```
Next, have your application open a connection to `127.0.0.1:$SOME_PORT`. This
connection will be forwarded to the service running on `127.0.0.1:$APP_PORT` on
the remote machine. You can test it with netcat:
***On "server" node:***
```sh
> nc -v -l -p $APP_PORT
```
***On "client" node:***
```sh
> nc -v 127.0.0.1 $SOME_PORT
```
You should now see that a connection has been established and be able to
exchange messages between netcat instances.
(note that depending on your netcat version you may need to drop the `-v` flag)
**SSH example**
**Setup:**
1. A "server" node with peer ID `$SERVER_ID` and running ssh server on the
default port.
2. A "client" node.
_you can get `$SERVER_ID` by running `ipfs id -f "<id>\n"`_
***First, on the "server" node:***
```sh
ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22
```
***Then, on "client" node:***
```sh
ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID
```
You should now be able to connect to your ssh server through a libp2p connection
with `ssh [user]@127.0.0.1 -p 2222`.
See [docs/p2p-tunnels.md](p2p-tunnels.md) for usage examples, foreground mode,
and systemd integration.
### Road to being a real feature
- [ ] More documentation
- [x] More documentation
- [x] `ipfs p2p forward` mode
- [ ] Ability to define tunnels via JSON config, similar to [`Peering.Peers`](https://github.com/ipfs/kubo/blob/master/docs/config.md#peeringpeers), see [kubo#5460](https://github.com/ipfs/kubo/issues/5460)
## p2p http proxy

View File

@ -6,7 +6,7 @@ they were stored in a traditional web server.
[More about Gateways](https://docs.ipfs.tech/concepts/ipfs-gateway/) and [addressing IPFS on the web](https://docs.ipfs.tech/how-to/address-ipfs-on-web/).
Kubo's Gateway implementation follows [ipfs/specs: Specification for HTTP Gateways](https://github.com/ipfs/specs/tree/main/http-gateways#readme).
Kubo's Gateway implementation follows [IPFS Gateway Specifications](https://specs.ipfs.tech/http-gateways/) and is tested with [Gateway Conformance Test Suite](https://github.com/ipfs/gateway-conformance).
### Local gateway
@ -14,14 +14,21 @@ By default, Kubo nodes run
a [path gateway](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#path-gateway) at `http://127.0.0.1:8080/`
and a [subdomain gateway](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#subdomain-gateway) at `http://localhost:8080/`.
The path one also implements [trustless gateway spec](https://specs.ipfs.tech/http-gateways/trustless-gateway/)
and supports [trustless responses](https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval) as opt-in via `Accept` header.
> [!CAUTION]
> **For browsing websites, web apps, and dapps in a browser, use the subdomain
> gateway** (`localhost`). Each content root gets its own
> [web origin](https://developer.mozilla.org/en-US/docs/Web/Security/Same-origin_policy),
> isolating localStorage, cookies, and session data between sites.
>
> **For file retrieval, use the path gateway** (`127.0.0.1`). Path gateways are
> suited for downloading files or fetching [verifiable](https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval)
> content, but lack origin isolation (all content shares the same origin).
Additional listening addresses and gateway behaviors can be set in the [config](#configuration) file.
### Public gateways
Protocol Labs provides a public gateway at
IPFS Foundation [provides public gateways](https://docs.ipfs.tech/concepts/public-utilities/) at
`https://ipfs.io` ([path](https://specs.ipfs.tech/http-gateways/path-gateway/)),
`https://dweb.link` ([subdomain](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#subdomain-gateway)),
and `https://trustless-gateway.link` ([trustless](https://specs.ipfs.tech/http-gateways/trustless-gateway/) only).
@ -41,6 +48,80 @@ The gateway's log level can be changed with this command:
> ipfs log level core/server debug
```
## Running in Production
When deploying Kubo's gateway in production, be aware of these important considerations:
<a id="reverse-proxy"></a>
> [!IMPORTANT]
> **Reverse Proxy:** When running Kubo behind a reverse proxy (such as nginx),
> the original `Host` header **must** be forwarded to Kubo for
> [`Gateway.PublicGateways`](config.md#gatewaypublicgateways) to work.
> Kubo uses the `Host` header to match configured hostnames and detect
> subdomain gateway patterns like `{cid}.ipfs.example.org` or DNSLink hostnames.
>
> If the `Host` header is not forwarded correctly, Kubo will not recognize
> the configured gateway hostnames and requests may be handled incorrectly.
>
> If `X-Forwarded-Proto` is not set, redirects over HTTPS will use wrong protocol
> and DNSLink names will not be inlined for subdomain gateways.
>
> Example: minimal nginx configuration for `example.org`
>
> ```nginx
> server {
> listen 80;
> listen [::]:80;
>
> # IMPORTANT: Include wildcard to match subdomain gateway requests.
> # The dot prefix matches both apex domain and all subdomains.
> server_name .example.org;
>
> location / {
> proxy_pass http://127.0.0.1:8080;
>
> # IMPORTANT: Forward the original Host header to Kubo.
> # Without this, PublicGateways configuration will not work.
> proxy_set_header Host $host;
>
> # IMPORTANT: X-Forwarded-Proto is required for correct behavior:
> # - Redirects will use https:// URLs when set to "https"
> # - DNSLink names will be inlined for subdomain gateways
> # (e.g., /ipns/en.wikipedia-on-ipfs.org → en-wikipedia--on--ipfs-org.ipns.example.org)
> proxy_set_header X-Forwarded-Proto $scheme;
> proxy_set_header X-Forwarded-Host $host;
> }
> }
> ```
>
> Common mistakes to avoid:
>
> - **Missing wildcard in `server_name`:** Using only `server_name example.org;`
> will not match subdomain requests like `{cid}.ipfs.example.org`. Always
> include `*.example.org` or use the dot prefix `.example.org`.
>
> - **Wrong `Host` header value:** Using `proxy_set_header Host $proxy_host;`
> sends the backend's hostname (e.g., `127.0.0.1:8080`) instead of the
> original `Host` header. Always use `$host` or `$http_host`.
>
> - **Missing `Host` header entirely:** If `proxy_set_header Host` is not
> specified, nginx defaults to `$proxy_host`, which breaks gateway routing.
> [!IMPORTANT]
> **Timeouts:** Configure [`Gateway.RetrievalTimeout`](config.md#gatewayretrievaltimeout)
> based on your expected content retrieval times.
> [!IMPORTANT]
> **Rate Limiting:** Use [`Gateway.MaxConcurrentRequests`](config.md#gatewaymaxconcurrentrequests)
> to protect against traffic spikes.
> [!IMPORTANT]
> **CDN/Cloudflare:** If using Cloudflare or other CDNs with
> [deserialized responses](config.md#gatewaydeserializedresponses) enabled, review
> [`Gateway.MaxRangeRequestFileSize`](config.md#gatewaymaxrangerequestfilesize) to avoid
> excess bandwidth billing from range request bugs. Cloudflare users may need additional
> protection via [Cloudflare Snippets](https://github.com/ipfs/boxo/issues/856#issuecomment-3523944976).
## Directories
For convenience, the gateway (mostly) acts like a normal web-server when serving
@ -53,7 +134,7 @@ a directory:
2. Dynamically build and serve a listing of the contents of the directory.
<sub><sup>&dagger;</sup>This redirect is skipped if the query string contains a
`go-get=1` parameter. See [PR#3964](https://github.com/ipfs/kubo/pull/3963)
`go-get=1` parameter. See [PR#3963](https://github.com/ipfs/kubo/pull/3963)
for details</sub>
## Static Websites
@ -107,10 +188,12 @@ This is equivalent of `ipfs block get`.
### `application/vnd.ipld.car`
Returns a [CAR](https://ipld.io/specs/transport/car/) stream for specific DAG and selector.
Returns a [CAR](https://ipld.io/specs/transport/car/) stream for a DAG or a subset of it.
Right now only 'full DAG' implicit selector is implemented.
Support for user-provided IPLD selectors is tracked in https://github.com/ipfs/kubo/issues/8769.
The `dag-scope` parameter controls which blocks are included: `all` (default, entire DAG),
`entity` (logical unit like a file), or `block` (single block). For [UnixFS](https://specs.ipfs.tech/unixfs/) files,
`entity-bytes` enables byte range requests. See [IPIP-402](https://specs.ipfs.tech/ipips/ipip-0402/)
for details.
This is a rough equivalent of `ipfs dag export`.

214
docs/p2p-tunnels.md Normal file
View File

@ -0,0 +1,214 @@
# P2P Tunnels
Kubo supports tunneling TCP connections through libp2p streams, similar to SSH
port forwarding (`ssh -L`). This allows exposing local services to remote peers
and forwarding remote services to local ports.
- [Why P2P Tunnels?](#why-p2p-tunnels)
- [Quick Start](#quick-start)
- [Background Mode](#background-mode)
- [Foreground Mode](#foreground-mode)
- [systemd Integration](#systemd-integration)
- [Security Considerations](#security-considerations)
- [Troubleshooting](#troubleshooting)
## Why P2P Tunnels?
Unlike traditional SSH tunnels, libp2p-based tunnels do not require:
- **No public IP or open ports**: The server does not need a static IP address
or port forwarding configured on the router. Connectivity to peers behind NAT
is facilitated by [Direct Connection Upgrade through Relay (DCUtR)](https://github.com/libp2p/specs/blob/master/relay/DCUtR.md),
which enables NAT hole-punching.
- **No DNS or IP address management**: All you need is the server's PeerID and
an agreed-upon protocol name (e.g., `/x/ssh`). Kubo handles peer discovery
and routing via the [Amino DHT](https://specs.ipfs.tech/routing/kad-dht/).
- **Simplified firewall rules**: Since connections are established through
libp2p's existing swarm connections, no additional firewall configuration is
needed beyond what Kubo already requires.
This makes p2p tunnels useful for connecting to machines on home networks,
behind corporate firewalls, or in environments where traditional port forwarding
is not available.
## Quick Start
Enable the experimental feature:
```console
$ ipfs config --json Experimental.Libp2pStreamMounting true
```
Test with netcat (`nc`) - no services required:
**On the server:**
```console
$ ipfs p2p listen /x/test /ip4/127.0.0.1/tcp/9999
$ nc -l -p 9999
```
**On the client:**
Replace `$SERVER_ID` with the server's peer ID (get it with `ipfs id -f "<id>\n"`
on the server).
```console
$ ipfs p2p forward /x/test /ip4/127.0.0.1/tcp/9998 /p2p/$SERVER_ID
$ nc 127.0.0.1 9998
```
Type in either terminal and the text appears in the other. Use Ctrl+C to exit.
## Background Mode
By default, `ipfs p2p listen` and `ipfs p2p forward` register the tunnel with
the daemon and return immediately. The tunnel persists until explicitly closed
with `ipfs p2p close` or the daemon shuts down.
This example exposes a local SSH server (listening on `localhost:22`) to a
remote peer. The same pattern works for any TCP service.
**On the server** (the machine running SSH):
Register a p2p listener that forwards incoming connections to the local SSH
server. The protocol name `/x/ssh` is an arbitrary identifier that both peers
must agree on (the `/x/` prefix is required for custom protocols).
```console
$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22
```
**On the client:**
Create a local port (`2222`) that tunnels through libp2p to the server's SSH
service.
```console
$ ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID
```
Now connect to SSH through the tunnel:
```console
$ ssh user@127.0.0.1 -p 2222
```
**Other services:** To tunnel a different service, change the port and protocol
name. For example, to expose a web server on port 8080, use `/x/mywebapp` and
`/ip4/127.0.0.1/tcp/8080`.
## Foreground Mode
Use `--foreground` (`-f`) to block until interrupted. The tunnel is
automatically removed when the command exits:
```console
$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 --foreground
Listening on /x/ssh, forwarding to /ip4/127.0.0.1/tcp/22, waiting for interrupt...
^C
Received interrupt, removing listener for /x/ssh
```
The listener/forwarder is automatically removed when:
- The command receives Ctrl+C or SIGTERM
- `ipfs p2p close` is called
- The daemon shuts down
This mode is useful for systemd services and scripts that need cleanup on exit.
### systemd Integration
The `--foreground` flag enables clean integration with systemd. The examples
below show how to run `ipfs p2p listen` as a user service that starts
automatically when the IPFS daemon is ready.
Ensure IPFS daemon runs as a systemd user service. See
[misc/README.md](https://github.com/ipfs/kubo/blob/master/misc/README.md#systemd)
for setup instructions and where to place unit files.
#### P2P listener with path-based activation
Use a `.path` unit to wait for the daemon's RPC API to be ready before starting
the p2p listener.
**`ipfs-p2p-tunnel.path`**:
```systemd
[Unit]
Description=Monitor for IPFS daemon startup
After=ipfs.service
Requires=ipfs.service
[Path]
PathExists=%h/.ipfs/api
Unit=ipfs-p2p-tunnel.service
[Install]
WantedBy=default.target
```
The `%h` specifier expands to the user's home directory. If you use a custom
`IPFS_PATH`, adjust accordingly.
**`ipfs-p2p-tunnel.service`**:
```systemd
[Unit]
Description=IPFS p2p tunnel
Requires=ipfs.service
[Service]
ExecStart=ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 -f
Restart=on-failure
RestartSec=10
[Install]
WantedBy=default.target
```
#### Enabling the services
```console
$ systemctl --user enable ipfs.service
$ systemctl --user enable ipfs-p2p-tunnel.path
$ systemctl --user start ipfs.service
```
The path unit monitors `~/.ipfs/api` and starts `ipfs-p2p-tunnel.service`
once the file exists.
## Security Considerations
> [!WARNING]
> This feature provides CLI and HTTP RPC users with the ability to set up port
> forwarding for localhost and LAN ports. If you enable this and plan to expose
> CLI or HTTP RPC to other users or machines, secure the RPC API using
> [`API.Authorizations`](https://github.com/ipfs/kubo/blob/master/docs/config.md#apiauthorizations)
> or custom auth middleware.
## Troubleshooting
### Foreground listener stops when terminal closes
When using `--foreground`, the listener stops if the terminal closes. For
persistent foreground listeners, use a systemd service, `nohup`, `tmux`, or
`screen`. Without `--foreground`, the listener persists in the daemon regardless
of terminal state.
### Connection refused errors
Verify:
1. The experimental feature is enabled: `ipfs config Experimental.Libp2pStreamMounting`
2. The listener is active: `ipfs p2p ls`
3. Both peers can connect: `ipfs swarm connect /p2p/$PEER_ID`
### Persistent tunnel configuration
There is currently no way to define tunnels in the Kubo JSON config file. Use
`--foreground` mode with a systemd service for persistent tunnels. Support for
configuring tunnels via JSON config may be added in the future (see [kubo#5460](https://github.com/ipfs/kubo/issues/5460) - PRs welcome!).

View File

@ -39,6 +39,12 @@ To run this in your user session, save it as `~/.config/systemd/user/ipfs.servic
```
Read more about `--user` services here: [wiki.archlinux.org:Systemd ](https://wiki.archlinux.org/index.php/Systemd/User#Automatic_start-up_of_systemd_user_instances)
#### P2P tunnel services
For running `ipfs p2p listen` or `ipfs p2p forward` as systemd services,
see [docs/p2p-tunnels.md](../docs/p2p-tunnels.md) for examples using the
`--foreground` flag and path-based activation.
### initd
- Here is a full-featured sample service file: https://github.com/dylanPowers/ipfs-linux-service/blob/master/init.d/ipfs

View File

@ -20,6 +20,10 @@ type Listener interface {
// close closes the listener. Does not affect child streams
close()
// Done returns a channel that is closed when the listener is closed.
// This allows callers to detect when a listener has been removed.
Done() <-chan struct{}
}
// Listeners manages a group of Listener implementations,
@ -73,15 +77,13 @@ func (r *Listeners) Register(l Listener) error {
return nil
}
// Close removes and closes all listeners for which matchFunc returns true.
// Returns the number of listeners closed.
func (r *Listeners) Close(matchFunc func(listener Listener) bool) int {
todo := make([]Listener, 0)
var todo []Listener
r.Lock()
for _, l := range r.Listeners {
if !matchFunc(l) {
continue
}
if _, ok := r.Listeners[l.key()]; ok {
if matchFunc(l) {
delete(r.Listeners, l.key())
todo = append(todo, l)
}

View File

@ -23,6 +23,7 @@ type localListener struct {
peer peer.ID
listener manet.Listener
done chan struct{}
}
// ForwardLocal creates new P2P stream to a remote listener.
@ -32,6 +33,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I
p2p: p2p,
proto: proto,
peer: peer,
done: make(chan struct{}),
}
maListener, err := manet.Listen(bindAddr)
@ -98,6 +100,11 @@ func (l *localListener) setupStream(local manet.Conn) {
func (l *localListener) close() {
l.listener.Close()
close(l.done)
}
func (l *localListener) Done() <-chan struct{} {
return l.done
}
func (l *localListener) Protocol() protocol.ID {

View File

@ -25,6 +25,8 @@ type remoteListener struct {
// reportRemote if set to true makes the handler send '<base58 remote peerid>\n'
// to target before any data is forwarded
reportRemote bool
done chan struct{}
}
// ForwardRemote creates new p2p listener.
@ -36,6 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
addr: addr,
reportRemote: reportRemote,
done: make(chan struct{}),
}
if err := p2p.ListenersP2P.Register(listener); err != nil {
@ -99,7 +102,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr {
return l.addr
}
func (l *remoteListener) close() {}
func (l *remoteListener) close() {
close(l.done)
}
func (l *remoteListener) Done() <-chan struct{} {
return l.done
}
func (l *remoteListener) key() protocol.ID {
return l.proto

104
test/cli/api_file_test.go Normal file
View File

@ -0,0 +1,104 @@
package cli
import (
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/require"
)
// TestAddressFileReady verifies that when address files ($IPFS_PATH/api and
// $IPFS_PATH/gateway) are created, the corresponding HTTP servers are ready
// to accept connections immediately. This prevents race conditions for tools
// like systemd path units that start services when these files appear.
func TestAddressFileReady(t *testing.T) {
t.Parallel()
t.Run("api file", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
node := h.NewNode().Init()
// Start daemon in background (don't use StartDaemon which waits for API)
res := node.Runner.MustRun(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"daemon"},
RunFunc: (*exec.Cmd).Start,
})
node.Daemon = res
defer node.StopDaemon()
// Poll for api file to appear
apiFile := filepath.Join(node.Dir, "api")
var fileExists bool
for i := 0; i < 100; i++ {
if _, err := os.Stat(apiFile); err == nil {
fileExists = true
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, fileExists, "api file should be created")
// Read the api file to get the address
apiAddr, err := node.TryAPIAddr()
require.NoError(t, err)
// Extract IP and port from multiaddr
ip, err := apiAddr.ValueForProtocol(4) // P_IP4
require.NoError(t, err)
port, err := apiAddr.ValueForProtocol(6) // P_TCP
require.NoError(t, err)
// Immediately try to use the API - should work on first attempt
url := "http://" + ip + ":" + port + "/api/v0/id"
resp, err := http.Post(url, "", nil)
require.NoError(t, err, "RPC API should be ready immediately when api file exists")
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
})
t.Run("gateway file", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
node := h.NewNode().Init()
// Start daemon in background
res := node.Runner.MustRun(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"daemon"},
RunFunc: (*exec.Cmd).Start,
})
node.Daemon = res
defer node.StopDaemon()
// Poll for gateway file to appear
gatewayFile := filepath.Join(node.Dir, "gateway")
var fileExists bool
for i := 0; i < 100; i++ {
if _, err := os.Stat(gatewayFile); err == nil {
fileExists = true
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, fileExists, "gateway file should be created")
// Read the gateway file to get the URL (already includes http:// prefix)
gatewayURL, err := os.ReadFile(gatewayFile)
require.NoError(t, err)
// Immediately try to use the Gateway - should work on first attempt
url := strings.TrimSpace(string(gatewayURL)) + "/ipfs/bafkqaaa" // empty file CID
resp, err := http.Get(url)
require.NoError(t, err, "Gateway should be ready immediately when gateway file exists")
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
})
}

View File

@ -39,7 +39,9 @@ func TestBackupBootstrapPeers(t *testing.T) {
// Start 1 and 2. 2 does not know anyone yet.
nodes[1].StartDaemon()
defer nodes[1].StopDaemon()
nodes[2].StartDaemon()
defer nodes[2].StopDaemon()
assert.Len(t, nodes[1].Peers(), 0)
assert.Len(t, nodes[2].Peers(), 0)
@ -51,6 +53,7 @@ func TestBackupBootstrapPeers(t *testing.T) {
// Start 0, wait a bit. Should connect to 1, and then discover 2 via the
// backup bootstrap peers.
nodes[0].StartDaemon()
defer nodes[0].StopDaemon()
time.Sleep(time.Millisecond * 500)
// Check if they're all connected.

View File

@ -22,7 +22,9 @@ func TestBitswapConfig(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
provider := h.NewNode().Init().StartDaemon()
defer provider.StopDaemon()
requester := h.NewNode().Init().StartDaemon()
defer requester.StopDaemon()
hash := provider.IPFSAddStr(string(testData))
requester.Connect(provider)
@ -38,8 +40,10 @@ func TestBitswapConfig(t *testing.T) {
provider := h.NewNode().Init()
provider.SetIPFSConfig("Bitswap.ServerEnabled", false)
provider = provider.StartDaemon()
defer provider.StopDaemon()
requester := h.NewNode().Init().StartDaemon()
defer requester.StopDaemon()
hash := provider.IPFSAddStr(string(testData))
requester.Connect(provider)
@ -70,8 +74,10 @@ func TestBitswapConfig(t *testing.T) {
requester := h.NewNode().Init()
requester.SetIPFSConfig("Bitswap.ServerEnabled", false)
requester.StartDaemon()
defer requester.StopDaemon()
provider := h.NewNode().Init().StartDaemon()
defer provider.StopDaemon()
hash := provider.IPFSAddStr(string(testData))
requester.Connect(provider)
@ -91,8 +97,10 @@ func TestBitswapConfig(t *testing.T) {
cfg.HTTPRetrieval.Enabled = config.True
})
requester.StartDaemon()
defer requester.StopDaemon()
provider := h.NewNode().Init().StartDaemon()
defer provider.StopDaemon()
hash := provider.IPFSAddStr(string(testData))
requester.Connect(provider)
@ -126,7 +134,9 @@ func TestBitswapConfig(t *testing.T) {
cfg.HTTPRetrieval.Enabled = config.True
})
provider = provider.StartDaemon()
defer provider.StopDaemon()
requester := h.NewNode().Init().StartDaemon()
defer requester.StopDaemon()
requester.Connect(provider)
// read libp2p identify from remote peer, and print protocols

View File

@ -76,6 +76,7 @@ func TestContentBlocking(t *testing.T) {
// Start daemon, it should pick up denylist from $IPFS_PATH/denylists/test.deny
node.StartDaemon() // we need online mode for GatewayOverLibp2p tests
t.Cleanup(func() { node.StopDaemon() })
client := node.GatewayClient()
// First, confirm gateway works

View File

@ -47,6 +47,8 @@ func TestDag(t *testing.T) {
t.Run("ipfs dag stat --enc=json", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Import fixture
r, err := os.Open(fixtureFile)
assert.Nil(t, err)
@ -91,6 +93,7 @@ func TestDag(t *testing.T) {
t.Run("ipfs dag stat", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
r, err := os.Open(fixtureFile)
assert.NoError(t, err)
defer r.Close()

View File

@ -60,6 +60,10 @@ func TestRoutingV1Proxy(t *testing.T) {
})
nodes[2].StartDaemon()
t.Cleanup(func() {
nodes.StopDaemons()
})
// Connect them.
nodes.Connect()

View File

@ -32,6 +32,7 @@ func TestRoutingV1Server(t *testing.T) {
})
})
nodes.StartDaemons().Connect()
t.Cleanup(func() { nodes.StopDaemons() })
return nodes
}
@ -133,6 +134,7 @@ func TestRoutingV1Server(t *testing.T) {
cfg.Routing.Type = config.NewOptionalString("dht")
})
node.StartDaemon()
defer node.StopDaemon()
// Put IPNS record in lonely node. It should be accepted as it is a valid record.
c, err = client.New(node.GatewayURL())
@ -196,6 +198,7 @@ func TestRoutingV1Server(t *testing.T) {
}
})
node.StartDaemon()
defer node.StopDaemon()
c, err := client.New(node.GatewayURL())
require.NoError(t, err)
@ -238,6 +241,7 @@ func TestRoutingV1Server(t *testing.T) {
cfg.Bootstrap = autoconf.FallbackBootstrapPeers
})
node.StartDaemon()
defer node.StopDaemon()
c, err := client.New(node.GatewayURL())
require.NoError(t, err)

View File

@ -16,6 +16,7 @@ func TestDHTAutoclient(t *testing.T) {
node.IPFS("config", "Routing.Type", "autoclient")
})
nodes.StartDaemons().Connect()
t.Cleanup(func() { nodes.StopDaemons() })
t.Run("file added on node in client mode is retrievable from node in client mode", func(t *testing.T) {
t.Parallel()

View File

@ -22,6 +22,7 @@ func TestDHTOptimisticProvide(t *testing.T) {
})
nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
hash := nodes[0].IPFSAddStr(string(random.Bytes(100)))
nodes[0].IPFS("routing", "provide", hash)

View File

@ -19,6 +19,7 @@ func TestFilesCp(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Create simple text file
data := "testing files cp command"
@ -36,6 +37,7 @@ func TestFilesCp(t *testing.T) {
t.Run("files cp with unsupported DAG node type fails", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// MFS UnixFS is limited to dag-pb or raw, so we create a dag-cbor node to test this
jsonData := `{"data": "not a UnixFS node"}`
@ -53,6 +55,7 @@ func TestFilesCp(t *testing.T) {
t.Run("files cp with invalid UnixFS data structure fails", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Create an invalid proto file
data := []byte{0xDE, 0xAD, 0xBE, 0xEF} // Invalid protobuf data
@ -75,6 +78,7 @@ func TestFilesCp(t *testing.T) {
t.Run("files cp with raw node succeeds", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Create a raw node
data := "raw data"
@ -98,6 +102,7 @@ func TestFilesCp(t *testing.T) {
t.Run("files cp creates intermediate directories with -p", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Create a simple text file and add it to IPFS
data := "hello parent directories"
@ -130,6 +135,7 @@ func TestFilesRm(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Create a file to remove
node.IPFS("files", "mkdir", "/test-dir")
@ -149,6 +155,7 @@ func TestFilesRm(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Create a file to remove
node.IPFS("files", "mkdir", "/test-dir")
@ -166,6 +173,7 @@ func TestFilesRm(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Create a file to remove
node.IPFS("files", "mkdir", "/test-dir")
@ -186,6 +194,7 @@ func TestFilesNoFlushLimit(t *testing.T) {
t.Run("reaches default limit of 256 operations", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Perform 256 operations with --flush=false (should succeed)
for i := 0; i < 256; i++ {
@ -214,6 +223,7 @@ func TestFilesNoFlushLimit(t *testing.T) {
})
node.StartDaemon()
defer node.StopDaemon()
// Perform 5 operations (should succeed)
for i := 0; i < 5; i++ {
@ -239,6 +249,7 @@ func TestFilesNoFlushLimit(t *testing.T) {
})
node.StartDaemon()
defer node.StopDaemon()
// Do 2 operations with --flush=false
node.IPFS("files", "mkdir", "--flush=false", "/dir1")
@ -271,6 +282,7 @@ func TestFilesNoFlushLimit(t *testing.T) {
})
node.StartDaemon()
defer node.StopDaemon()
// Do 2 operations with --flush=false
node.IPFS("files", "mkdir", "--flush=false", "/dir1")
@ -303,6 +315,7 @@ func TestFilesNoFlushLimit(t *testing.T) {
})
node.StartDaemon()
defer node.StopDaemon()
// Should be able to do many operations without error
for i := 0; i < 300; i++ {
@ -322,6 +335,7 @@ func TestFilesNoFlushLimit(t *testing.T) {
})
node.StartDaemon()
defer node.StopDaemon()
// Mix of different MFS operations (5 operations to hit the limit)
node.IPFS("files", "mkdir", "--flush=false", "/testdir")

View File

@ -4,9 +4,9 @@ bafyreibmdfd7c5db4kls4ty57zljfhqv36gi43l6txl44pi423wwmeskwy 2 53
bafyreie3njilzdi4ixumru4nzgecsnjtu7fzfcwhg7e6s4s5i7cnbslvn4 2 53
Summary
Total Size: 99
Total Size: 99 (99 B)
Unique Blocks: 3
Shared Size: 7
Shared Size: 7 (7 B)
Ratio: 1.070707

View File

@ -28,6 +28,7 @@ func TestGatewayLimits(t *testing.T) {
cfg.Gateway.RetrievalTimeout = config.NewOptionalDuration(1 * time.Second)
})
node.StartDaemon()
defer node.StopDaemon()
// Add content that can be retrieved quickly
cid := node.IPFSAddStr("test content")
@ -69,6 +70,7 @@ func TestGatewayLimits(t *testing.T) {
cfg.Gateway.RetrievalTimeout = config.NewOptionalDuration(2 * time.Second)
})
node.StartDaemon()
defer node.StopDaemon()
// Add some content - use a non-existent CID that will block during retrieval
// to ensure we can control timing

View File

@ -27,6 +27,7 @@ func TestGatewayHAMTDirectory(t *testing.T) {
// Start node
h := harness.NewT(t)
node := h.NewNode().Init("--empty-repo", "--profile=test").StartDaemon("--offline")
defer node.StopDaemon()
client := node.GatewayClient()
// Import fixtures
@ -56,6 +57,7 @@ func TestGatewayHAMTRanges(t *testing.T) {
// Start node
h := harness.NewT(t)
node := h.NewNode().Init("--empty-repo", "--profile=test").StartDaemon("--offline")
t.Cleanup(func() { node.StopDaemon() })
client := node.GatewayClient()
// Import fixtures

View File

@ -28,6 +28,7 @@ func TestGateway(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon("--offline")
t.Cleanup(func() { node.StopDaemon() })
cid := node.IPFSAddStr("Hello Worlds!")
peerID, err := peer.ToCid(node.PeerID()).StringOfBase(multibase.Base36)
@ -234,6 +235,7 @@ func TestGateway(t *testing.T) {
cfg.API.HTTPHeaders = map[string][]string{header: values}
})
node.StartDaemon()
defer node.StopDaemon()
resp := node.APIClient().DisableRedirects().Get("/webui/")
assert.Equal(t, resp.Headers.Values(header), values)
@ -257,6 +259,7 @@ func TestGateway(t *testing.T) {
t.Run("pprof", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
t.Cleanup(func() { node.StopDaemon() })
apiClient := node.APIClient()
t.Run("mutex", func(t *testing.T) {
t.Parallel()
@ -300,6 +303,7 @@ func TestGateway(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
node := h.NewNode().Init().StartDaemon()
t.Cleanup(func() { node.StopDaemon() })
h.WriteFile("index/index.html", "<p></p>")
cid := node.IPFS("add", "-Q", "-r", filepath.Join(h.Dir, "index")).Stderr.Trimmed()
@ -367,6 +371,7 @@ func TestGateway(t *testing.T) {
cfg.Addresses.Gateway = config.Strings{"/ip4/127.0.0.1/tcp/32563"}
})
node.StartDaemon()
defer node.StopDaemon()
b, err := os.ReadFile(filepath.Join(node.Dir, "gateway"))
require.NoError(t, err)
@ -388,6 +393,7 @@ func TestGateway(t *testing.T) {
assert.NoError(t, err)
nodes.StartDaemons().Connect()
t.Cleanup(func() { nodes.StopDaemons() })
t.Run("not present", func(t *testing.T) {
cidFoo := node2.IPFSAddStr("foo")
@ -460,6 +466,7 @@ func TestGateway(t *testing.T) {
}
})
node.StartDaemon()
defer node.StopDaemon()
cidFoo := node.IPFSAddStr("foo")
client := node.GatewayClient()
@ -509,6 +516,7 @@ func TestGateway(t *testing.T) {
node := harness.NewT(t).NewNode().Init()
node.StartDaemon()
defer node.StopDaemon()
client := node.GatewayClient()
res := client.Get("/ipfs/invalid-thing", func(r *http.Request) {
@ -526,6 +534,7 @@ func TestGateway(t *testing.T) {
cfg.Gateway.DisableHTMLErrors = config.True
})
node.StartDaemon()
defer node.StopDaemon()
client := node.GatewayClient()
res := client.Get("/ipfs/invalid-thing", func(r *http.Request) {
@ -546,6 +555,7 @@ func TestLogs(t *testing.T) {
t.Setenv("GOLOG_LOG_LEVEL", "info")
node := h.NewNode().Init().StartDaemon("--offline")
defer node.StopDaemon()
cid := node.IPFSAddStr("Hello Worlds!")
peerID, err := peer.ToCid(node.PeerID()).StringOfBase(multibase.Base36)

View File

@ -32,6 +32,7 @@ func TestGatewayOverLibp2p(t *testing.T) {
p2pProxyNode := nodes[1]
nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
// Add data to the gateway node
cidDataOnGatewayNode := cid.MustParse(gwNode.IPFSAddStr("Hello Worlds2!"))
@ -65,6 +66,7 @@ func TestGatewayOverLibp2p(t *testing.T) {
// Enable the experimental feature and reconnect the nodes
gwNode.IPFS("config", "--json", "Experimental.GatewayOverLibp2p", "true")
gwNode.StopDaemon().StartDaemon()
t.Cleanup(func() { gwNode.StopDaemon() })
nodes.Connect()
// Note: the bare HTTP requests here assume that the gateway is mounted at `/`

View File

@ -75,6 +75,7 @@ func TestHTTPRetrievalClient(t *testing.T) {
// Start Kubo
node.StartDaemon()
defer node.StopDaemon()
if debug {
fmt.Printf("delegatedRoutingServer.URL: %s\n", delegatedRoutingServer.URL)

View File

@ -155,6 +155,7 @@ func TestInit(t *testing.T) {
t.Run("ipfs init should not run while daemon is running", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("init")
assert.NotEqual(t, 0, res.ExitErr.ExitCode())
assert.Contains(t, res.Stderr.String(), "Error: ipfs daemon is running. please stop it to run this command")

View File

@ -103,6 +103,7 @@ func TestName(t *testing.T) {
})
node.StartDaemon()
defer node.StopDaemon()
t.Run("Resolving self offline succeeds (daemon on)", func(t *testing.T) {
res = node.IPFS("name", "resolve", "--offline", "/ipns/"+name.String())
@ -147,6 +148,7 @@ func TestName(t *testing.T) {
t.Run("Fails to publish in offline mode", func(t *testing.T) {
t.Parallel()
node := makeDaemon(t, nil).StartDaemon("--offline")
defer node.StopDaemon()
res := node.RunIPFS("name", "publish", "/ipfs/"+fixtureCid)
require.Error(t, res.Err)
require.Equal(t, 1, res.ExitCode())
@ -157,6 +159,7 @@ func TestName(t *testing.T) {
t.Parallel()
node := makeDaemon(t, nil).StartDaemon()
defer node.StopDaemon()
ipnsName := ipns.NameFromPeer(node.PeerID()).String()
ipnsPath := ipns.NamespacePrefix + ipnsName
publishPath := "/ipfs/" + fixtureCid
@ -187,6 +190,7 @@ func TestName(t *testing.T) {
t.Parallel()
node := makeDaemon(t, nil).StartDaemon()
t.Cleanup(func() { node.StopDaemon() })
ipnsPath := ipns.NamespacePrefix + ipns.NameFromPeer(node.PeerID()).String()
publishPath := "/ipfs/" + fixtureCid
@ -227,6 +231,7 @@ func TestName(t *testing.T) {
t.Run("Inspect with verification using wrong RSA key errors", func(t *testing.T) {
t.Parallel()
node := makeDaemon(t, nil).StartDaemon()
defer node.StopDaemon()
// Prepare RSA Key 1
res := node.IPFS("key", "gen", "--type=rsa", "--size=4096", "key1")
@ -299,6 +304,7 @@ func TestName(t *testing.T) {
t.Parallel()
node := makeDaemon(t, nil).StartDaemon()
defer node.StopDaemon()
publishPath1 := "/ipfs/" + fixtureCid
publishPath2 := "/ipfs/" + dagCid // Different content
name := ipns.NameFromPeer(node.PeerID())

430
test/cli/p2p_test.go Normal file
View File

@ -0,0 +1,430 @@
package cli
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os/exec"
"slices"
"syscall"
"testing"
"time"
"github.com/ipfs/kubo/core/commands"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/require"
)
// waitForListenerCount waits until the node has exactly the expected number of listeners.
func waitForListenerCount(t *testing.T, node *harness.Node, expectedCount int) {
t.Helper()
require.Eventually(t, func() bool {
lsOut := node.IPFS("p2p", "ls", "--enc=json")
var lsResult commands.P2PLsOutput
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
return false
}
return len(lsResult.Listeners) == expectedCount
}, 5*time.Second, 100*time.Millisecond, "expected %d listeners", expectedCount)
}
// waitForListenerProtocol waits until the node has a listener with the given protocol.
func waitForListenerProtocol(t *testing.T, node *harness.Node, protocol string) {
t.Helper()
require.Eventually(t, func() bool {
lsOut := node.IPFS("p2p", "ls", "--enc=json")
var lsResult commands.P2PLsOutput
if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil {
return false
}
return slices.ContainsFunc(lsResult.Listeners, func(l commands.P2PListenerInfoOutput) bool {
return l.Protocol == protocol
})
}, 5*time.Second, 100*time.Millisecond, "expected listener with protocol %s", protocol)
}
func TestP2PForeground(t *testing.T) {
t.Parallel()
t.Run("listen foreground creates listener and removes on interrupt", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Start foreground listener asynchronously
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/fgtest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/fgtest")
// Send SIGTERM
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Wait for listener to be removed
waitForListenerCount(t, node, 0)
})
t.Run("listen foreground text output on SIGTERM", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/sigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
waitForListenerProtocol(t, node, "/x/sigterm")
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Verify stdout shows "waiting for interrupt" message
stdout := res.Stdout.String()
require.Contains(t, stdout, "waiting for interrupt")
// Note: "Received interrupt, removing listener" message is NOT visible to CLI on SIGTERM
// because the command runs in the daemon via RPC and the response stream closes before
// the message can be emitted. The important behavior is verified in the first test:
// the listener IS removed when SIGTERM is sent.
})
t.Run("forward foreground creates forwarder and removes on interrupt", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Start foreground forwarder asynchronously on node 0
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fgfwd", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[0], 1)
// Send SIGTERM
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Wait for forwarder to be removed
waitForListenerCount(t, nodes[0], 0)
})
t.Run("forward foreground text output on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fwdsigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
waitForListenerCount(t, nodes[0], 1)
_ = res.Cmd.Process.Signal(syscall.SIGTERM)
_ = res.Cmd.Wait()
// Verify stdout shows "waiting for interrupt" message
stdout := res.Stdout.String()
require.Contains(t, stdout, "waiting for interrupt")
// Note: "Received interrupt, removing forwarder" message is NOT visible to CLI on SIGTERM
// because the response stream closes before the message can be emitted.
})
t.Run("listen without foreground returns immediately and persists", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// This should return immediately (not block)
node.IPFS("p2p", "listen", "/x/nofg", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort))
// Listener should still exist
waitForListenerProtocol(t, node, "/x/nofg")
// Clean up
node.IPFS("p2p", "close", "-p", "/x/nofg")
})
t.Run("listen foreground text output on p2p close", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/closetest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/closetest")
// Close the listener via ipfs p2p close command
node.IPFS("p2p", "close", "-p", "/x/closetest")
// Wait for foreground command to exit (it should exit quickly after close)
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - command exited
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit after listener was closed via ipfs p2p close")
}
// Wait for listener to be removed
waitForListenerCount(t, node, 0)
// Verify text output shows BOTH messages when closed via p2p close
// (unlike SIGTERM, the stream is still open so "Received interrupt" is emitted)
out := res.Stdout.String()
require.Contains(t, out, "waiting for interrupt")
require.Contains(t, out, "Received interrupt, removing listener")
})
t.Run("forward foreground text output on p2p close", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
forwardPort := harness.NewRandPort()
// Run without --enc=json to test actual text output users see
res := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/fwdclose", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[0], 1)
// Close the forwarder via ipfs p2p close command
nodes[0].IPFS("p2p", "close", "-a")
// Wait for foreground command to exit
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - command exited
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit after forwarder was closed via ipfs p2p close")
}
// Wait for forwarder to be removed
waitForListenerCount(t, nodes[0], 0)
// Verify text output shows BOTH messages when closed via p2p close
out := res.Stdout.String()
require.Contains(t, out, "waiting for interrupt")
require.Contains(t, out, "Received interrupt, removing forwarder")
})
t.Run("listen foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
httpServerPort := harness.NewRandPort()
forwardPort := harness.NewRandPort()
// Start HTTP server
expectedBody := "Hello from p2p tunnel!"
httpServer := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(expectedBody))
}),
}
listener, err := net.Listen("tcp", httpServer.Addr)
require.NoError(t, err)
go func() { _ = httpServer.Serve(listener) }()
defer httpServer.Close()
// Node 0: listen --foreground
listenRes := nodes[0].Runner.Run(harness.RunRequest{
Path: nodes[0].IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, listenRes.Err)
// Wait for listener to be created
waitForListenerProtocol(t, nodes[0], "/x/httptest")
// Node 1: forward (non-foreground)
nodes[1].IPFS("p2p", "forward", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/"+nodes[0].PeerID().String())
// Verify data flows through tunnel
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
require.Equal(t, expectedBody, string(body))
// Clean up forwarder on node 1
nodes[1].IPFS("p2p", "close", "-a")
// SIGTERM the listen --foreground command
_ = listenRes.Cmd.Process.Signal(syscall.SIGTERM)
_ = listenRes.Cmd.Wait()
// Wait for listener to be removed on node 0
waitForListenerCount(t, nodes[0], 0)
})
t.Run("forward foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init()
nodes.ForEachPar(func(n *harness.Node) {
n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
})
nodes.StartDaemons().Connect()
httpServerPort := harness.NewRandPort()
forwardPort := harness.NewRandPort()
// Start HTTP server
expectedBody := "Hello from forward foreground tunnel!"
httpServer := &http.Server{
Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort),
Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(expectedBody))
}),
}
listener, err := net.Listen("tcp", httpServer.Addr)
require.NoError(t, err)
go func() { _ = httpServer.Serve(listener) }()
defer httpServer.Close()
// Node 0: listen (non-foreground)
nodes[0].IPFS("p2p", "listen", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort))
// Node 1: forward --foreground
forwardRes := nodes[1].Runner.Run(harness.RunRequest{
Path: nodes[1].IPFSBin,
Args: []string{"p2p", "forward", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[0].PeerID().String()},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, forwardRes.Err)
// Wait for forwarder to be created
waitForListenerCount(t, nodes[1], 1)
// Verify data flows through tunnel
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort))
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
require.Equal(t, expectedBody, string(body))
// SIGTERM the forward --foreground command
_ = forwardRes.Cmd.Process.Signal(syscall.SIGTERM)
_ = forwardRes.Cmd.Wait()
// Wait for forwarder to be removed on node 1
waitForListenerCount(t, nodes[1], 0)
// Clean up listener on node 0
nodes[0].IPFS("p2p", "close", "-a")
})
t.Run("foreground command exits when daemon shuts down", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true")
node.StartDaemon()
listenPort := harness.NewRandPort()
// Start foreground listener
res := node.Runner.Run(harness.RunRequest{
Path: node.IPFSBin,
Args: []string{"p2p", "listen", "--foreground", "/x/daemontest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)},
RunFunc: (*exec.Cmd).Start,
})
require.NoError(t, res.Err)
// Wait for listener to be created
waitForListenerProtocol(t, node, "/x/daemontest")
// Stop the daemon
node.StopDaemon()
// Wait for foreground command to exit
done := make(chan error, 1)
go func() {
done <- res.Cmd.Wait()
}()
select {
case <-done:
// Good - foreground command exited when daemon stopped
case <-time.After(5 * time.Second):
_ = res.Cmd.Process.Kill()
t.Fatal("foreground command did not exit when daemon was stopped")
}
})
}

View File

@ -62,6 +62,7 @@ func TestPeering(t *testing.T) {
h, nodes := harness.CreatePeerNodes(t, 3, peerings)
nodes.StartDaemons()
defer nodes.StopDaemons()
assertPeerings(h, nodes, peerings)
nodes[0].Disconnect(nodes[1])
@ -74,6 +75,7 @@ func TestPeering(t *testing.T) {
h, nodes := harness.CreatePeerNodes(t, 3, peerings)
nodes.StartDaemons()
defer nodes.StopDaemons()
assertPeerings(h, nodes, peerings)
nodes[2].Disconnect(nodes[1])
@ -85,6 +87,7 @@ func TestPeering(t *testing.T) {
peerings := []harness.Peering{{From: 0, To: 1}, {From: 1, To: 0}, {From: 1, To: 2}}
h, nodes := harness.CreatePeerNodes(t, 3, peerings)
defer nodes.StopDaemons()
nodes[0].StartDaemon()
nodes[1].StartDaemon()
assertPeerings(h, nodes, []harness.Peering{{From: 0, To: 1}, {From: 1, To: 0}})
@ -99,6 +102,7 @@ func TestPeering(t *testing.T) {
h, nodes := harness.CreatePeerNodes(t, 3, peerings)
nodes.StartDaemons()
defer nodes.StopDaemons()
assertPeerings(h, nodes, peerings)
nodes[2].StopDaemon()

View File

@ -28,6 +28,9 @@ func setupTestNode(t *testing.T) *harness.Node {
t.Helper()
node := harness.NewT(t).NewNode().Init()
node.StartDaemon("--offline")
t.Cleanup(func() {
node.StopDaemon()
})
return node
}
@ -498,7 +501,6 @@ func TestPinLsEdgeCases(t *testing.T) {
t.Run("invalid pin type returns error", func(t *testing.T) {
t.Parallel()
node := setupTestNode(t)
defer node.StopDaemon()
// Try to list pins with invalid type
res := node.RunIPFS("pin", "ls", "--type=invalid")
@ -510,7 +512,6 @@ func TestPinLsEdgeCases(t *testing.T) {
t.Run("non-existent path returns proper error", func(t *testing.T) {
t.Parallel()
node := setupTestNode(t)
defer node.StopDaemon()
// Try to list a non-existent CID
fakeCID := "QmNonExistent123456789"
@ -521,7 +522,6 @@ func TestPinLsEdgeCases(t *testing.T) {
t.Run("unpinned CID returns not pinned error", func(t *testing.T) {
t.Parallel()
node := setupTestNode(t)
defer node.StopDaemon()
// Add content but don't pin it explicitly (it's just in blockstore)
unpinnedCID := node.IPFSAddStr("unpinned content", "--pin=false")

View File

@ -15,6 +15,7 @@ func TestPing(t *testing.T) {
t.Run("other", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect()
defer nodes.StopDaemons()
node1 := nodes[0]
node2 := nodes[1]
@ -25,6 +26,7 @@ func TestPing(t *testing.T) {
t.Run("ping unreachable peer", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect()
defer nodes.StopDaemons()
node1 := nodes[0]
badPeer := "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJx"
@ -37,6 +39,7 @@ func TestPing(t *testing.T) {
t.Run("self", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons()
defer nodes.StopDaemons()
node1 := nodes[0]
node2 := nodes[1]
@ -52,6 +55,7 @@ func TestPing(t *testing.T) {
t.Run("0", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect()
defer nodes.StopDaemons()
node1 := nodes[0]
node2 := nodes[1]
@ -63,6 +67,7 @@ func TestPing(t *testing.T) {
t.Run("offline", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect()
defer nodes.StopDaemons()
node1 := nodes[0]
node2 := nodes[1]

View File

@ -51,6 +51,7 @@ func TestRemotePinning(t *testing.T) {
node.IPFS("config", "--json", "Pinning.RemoteServices.svc.Policies.MFS.Enable", "true")
node.StartDaemon()
t.Cleanup(func() { node.StopDaemon() })
node.IPFS("files", "cp", "/ipfs/bafkqaaa", "/mfs-pinning-test-"+uuid.NewString())
node.IPFS("files", "flush")
@ -133,6 +134,8 @@ func TestRemotePinning(t *testing.T) {
t.Run("pin remote service ls --stat", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
_, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)
@ -155,6 +158,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("adding service with invalid URL fails", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("pin", "remote", "service", "add", "svc", "invalid-service.example.com", "key")
assert.Equal(t, 1, res.ExitCode())
@ -168,6 +172,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("unauthorized pinning service calls fail", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
_, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, "othertoken")
@ -180,6 +185,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("pinning service calls fail when there is a wrong path", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
_, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL+"/invalid-path", authToken)
@ -191,6 +197,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("pinning service calls fail when DNS resolution fails", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
node.IPFS("pin", "remote", "service", "add", "svc", "https://invalid-service.example.com", authToken)
res := node.RunIPFS("pin", "remote", "ls", "--service=svc")
@ -201,6 +208,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("pin remote service rm", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
node.IPFS("pin", "remote", "service", "add", "svc", "https://example.com", authToken)
node.IPFS("pin", "remote", "service", "rm", "svc")
res := node.IPFS("pin", "remote", "service", "ls")
@ -225,6 +233,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("'ipfs pin remote add --background=true'", func(t *testing.T) {
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
svc, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)
@ -266,6 +275,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("'ipfs pin remote add --background=false'", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
svc, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)
@ -287,6 +297,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("'ipfs pin remote ls' with multiple statuses", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
svc, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)
@ -340,6 +351,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("'ipfs pin remote ls' by CID", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
svc, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)
@ -360,6 +372,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("'ipfs pin remote rm --name' without --force when multiple pins match", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
svc, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)
@ -388,6 +401,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("'ipfs pin remote rm --name --force' remove multiple pins", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
svc, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)
@ -408,6 +422,7 @@ func TestRemotePinning(t *testing.T) {
t.Run("'ipfs pin remote rm --force' removes all pins", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
svc, svcURL := runPinningService(t, authToken)
node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken)

View File

@ -26,6 +26,7 @@ func testPins(t *testing.T, args testPinsArgs) {
node := harness.NewT(t).NewNode().Init()
if args.runDaemon {
node.StartDaemon("--offline")
defer node.StopDaemon()
}
strs := []string{"a", "b", "c", "d", "e", "f", "g"}
@ -127,6 +128,7 @@ func testPinsErrorReporting(t *testing.T, args testPinsArgs) {
node := harness.NewT(t).NewNode().Init()
if args.runDaemon {
node.StartDaemon("--offline")
defer node.StopDaemon()
}
randomCID := "Qme8uX5n9hn15pw9p6WcVKoziyyC9LXv4LEgvsmKMULjnV"
res := node.RunIPFS(StrCat("pin", "add", args.pinArg, randomCID)...)
@ -142,6 +144,7 @@ func testPinDAG(t *testing.T, args testPinsArgs) {
node := h.NewNode().Init()
if args.runDaemon {
node.StartDaemon("--offline")
defer node.StopDaemon()
}
bytes := random.Bytes(1 << 20) // 1 MiB
tmpFile := h.WriteToTemp(string(bytes))
@ -168,6 +171,7 @@ func testPinProgress(t *testing.T, args testPinsArgs) {
if args.runDaemon {
node.StartDaemon("--offline")
defer node.StopDaemon()
}
bytes := random.Bytes(1 << 20) // 1 MiB

View File

@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
@ -764,3 +765,81 @@ func TestProvider(t *testing.T) {
})
}
}
// TestHTTPOnlyProviderWithSweepEnabled tests that provider records are correctly
// sent to HTTP routers when Routing.Type="custom" with only HTTP routers configured,
// even when Provide.DHT.SweepEnabled=true (the default since v0.39).
//
// This is a regression test for https://github.com/ipfs/kubo/issues/11089
func TestHTTPOnlyProviderWithSweepEnabled(t *testing.T) {
t.Parallel()
// Track provide requests received by the mock HTTP router
var provideRequests atomic.Int32
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if (r.Method == http.MethodPut || r.Method == http.MethodPost) &&
strings.HasPrefix(r.URL.Path, "/routing/v1/providers") {
provideRequests.Add(1)
w.WriteHeader(http.StatusOK)
} else if strings.HasPrefix(r.URL.Path, "/routing/v1/providers") && r.Method == http.MethodGet {
// Return empty providers for findprovs
w.Header().Set("Content-Type", "application/x-ndjson")
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer mockServer.Close()
h := harness.NewT(t)
node := h.NewNode().Init()
// Explicitly set SweepEnabled=true (the default since v0.39, but be explicit for test clarity)
node.SetIPFSConfig("Provide.DHT.SweepEnabled", true)
node.SetIPFSConfig("Provide.Enabled", true)
// Configure HTTP-only custom routing (no DHT) with explicit Routing.Type=custom
routingConf := map[string]any{
"Type": "custom", // Explicitly set Routing.Type=custom
"Methods": map[string]any{
"provide": map[string]any{"RouterName": "HTTPRouter"},
"get-ipns": map[string]any{"RouterName": "HTTPRouter"},
"put-ipns": map[string]any{"RouterName": "HTTPRouter"},
"find-peers": map[string]any{"RouterName": "HTTPRouter"},
"find-providers": map[string]any{"RouterName": "HTTPRouter"},
},
"Routers": map[string]any{
"HTTPRouter": map[string]any{
"Type": "http",
"Parameters": map[string]any{
"Endpoint": mockServer.URL,
},
},
},
}
node.SetIPFSConfig("Routing", routingConf)
node.StartDaemon()
defer node.StopDaemon()
// Add content and manually provide it
cid := node.IPFSAddStr(time.Now().String())
// Manual provide should succeed even without libp2p peers
res := node.RunIPFS("routing", "provide", cid)
// Check that the command succeeded (exit code 0) and no provide-related errors
assert.Equal(t, 0, res.ExitCode(), "routing provide should succeed with HTTP-only routing and SweepEnabled=true")
assert.NotContains(t, res.Stderr.String(), "cannot provide", "should not have provide errors")
// Verify HTTP router received at least one provide request
assert.Greater(t, provideRequests.Load(), int32(0),
"HTTP router should have received provide requests")
// Verify 'provide stat' works with HTTP-only routing (regression test for stats)
statRes := node.RunIPFS("provide", "stat")
assert.Equal(t, 0, statRes.ExitCode(), "provide stat should succeed with HTTP-only routing")
assert.NotContains(t, statRes.Stderr.String(), "stats not available",
"should not report stats unavailable")
// LegacyProvider outputs "TotalReprovides:" in its stats
assert.Contains(t, statRes.Stdout.String(), "TotalReprovides:",
"should show legacy provider stats")
}

View File

@ -26,6 +26,7 @@ func TestRcmgr(t *testing.T) {
})
node.StartDaemon()
defer node.StopDaemon()
t.Run("swarm resources should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "resources")
@ -41,6 +42,7 @@ func TestRcmgr(t *testing.T) {
cfg.Swarm.ResourceMgr.Enabled = config.False
})
node.StartDaemon()
defer node.StopDaemon()
t.Run("swarm resources should fail", func(t *testing.T) {
res := node.RunIPFS("swarm", "resources")
@ -56,6 +58,7 @@ func TestRcmgr(t *testing.T) {
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(1000)
})
node.StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("swarm", "resources", "--enc=json")
require.Equal(t, 0, res.ExitCode())
@ -73,7 +76,9 @@ func TestRcmgr(t *testing.T) {
node.UpdateConfig(func(cfg *config.Config) {
cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(1000)
})
node.StartDaemon()
t.Cleanup(func() { node.StopDaemon() })
t.Run("conns and streams are above 800 for default connmgr settings", func(t *testing.T) {
t.Parallel()
@ -135,6 +140,7 @@ func TestRcmgr(t *testing.T) {
overrides.System.ConnsInbound = rcmgr.Unlimited
})
node.StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
@ -150,6 +156,7 @@ func TestRcmgr(t *testing.T) {
overrides.Transient.Memory = 88888
})
node.StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
@ -163,6 +170,7 @@ func TestRcmgr(t *testing.T) {
overrides.Service = map[string]rcmgr.ResourceLimits{"foo": {Memory: 77777}}
})
node.StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
@ -176,6 +184,7 @@ func TestRcmgr(t *testing.T) {
overrides.Protocol = map[protocol.ID]rcmgr.ResourceLimits{"foo": {Memory: 66666}}
})
node.StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
@ -191,6 +200,7 @@ func TestRcmgr(t *testing.T) {
overrides.Peer = map[peer.ID]rcmgr.ResourceLimits{validPeerID: {Memory: 55555}}
})
node.StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("swarm", "resources", "--enc=json")
limits := unmarshalLimits(t, res.Stdout.Bytes())
@ -218,6 +228,7 @@ func TestRcmgr(t *testing.T) {
})
nodes.StartDaemons()
t.Cleanup(func() { nodes.StopDaemons() })
t.Run("node 0 should fail to connect to and ping node 1", func(t *testing.T) {
t.Parallel()

View File

@ -57,6 +57,7 @@ func testRoutingDHT(t *testing.T, enablePubsub bool) {
}
nodes.StartDaemons(daemonArgs...).Connect()
t.Cleanup(func() { nodes.StopDaemons() })
t.Run("ipfs routing findpeer", func(t *testing.T) {
t.Parallel()
@ -157,6 +158,7 @@ func testSelfFindDHT(t *testing.T) {
})
nodes.StartDaemons()
defer nodes.StopDaemons()
res := nodes[0].RunIPFS("dht", "findpeer", nodes[0].PeerID().String())
assert.Equal(t, 1, res.ExitCode())

View File

@ -14,6 +14,7 @@ func TestStats(t *testing.T) {
t.Run("stats dht", func(t *testing.T) {
t.Parallel()
nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect()
defer nodes.StopDaemons()
node1 := nodes[0]
res := node1.IPFS("stats", "dht")

View File

@ -31,6 +31,7 @@ func TestSwarm(t *testing.T) {
t.Run("ipfs swarm peers returns empty peers when a node is not connected to any peers", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
res := node.RunIPFS("swarm", "peers", "--enc=json", "--identify")
var output expectedOutputType
err := json.Unmarshal(res.Stdout.Bytes(), &output)
@ -40,7 +41,9 @@ func TestSwarm(t *testing.T) {
t.Run("ipfs swarm peers with flag identify outputs expected identify information about connected peers", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
otherNode := harness.NewT(t).NewNode().Init().StartDaemon()
defer otherNode.StopDaemon()
node.Connect(otherNode)
res := node.RunIPFS("swarm", "peers", "--enc=json", "--identify")
@ -67,7 +70,9 @@ func TestSwarm(t *testing.T) {
t.Run("ipfs swarm peers with flag identify outputs Identify field with data that matches calling ipfs id on a peer", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
otherNode := harness.NewT(t).NewNode().Init().StartDaemon()
defer otherNode.StopDaemon()
node.Connect(otherNode)
otherNodeIDResponse := otherNode.RunIPFS("id", "--enc=json")

View File

@ -76,6 +76,7 @@ func TestTracing(t *testing.T) {
node.Runner.Env["OTEL_EXPORTER_OTLP_PROTOCOL"] = "grpc"
node.Runner.Env["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317"
node.StartDaemon()
defer node.StopDaemon()
assert.Eventually(t,
func() bool {

View File

@ -74,6 +74,7 @@ func TestTransports(t *testing.T) {
t.Parallel()
nodes := tcpNodes(t).StartDaemons().Connect()
runTests(nodes)
nodes.StopDaemons()
})
t.Run("tcp with NOISE", func(t *testing.T) {
@ -86,6 +87,7 @@ func TestTransports(t *testing.T) {
})
nodes.StartDaemons().Connect()
runTests(nodes)
nodes.StopDaemons()
})
t.Run("QUIC", func(t *testing.T) {
@ -104,6 +106,7 @@ func TestTransports(t *testing.T) {
disableRouting(nodes)
nodes.StartDaemons().Connect()
runTests(nodes)
nodes.StopDaemons()
})
t.Run("QUIC+Webtransport", func(t *testing.T) {
@ -122,6 +125,7 @@ func TestTransports(t *testing.T) {
disableRouting(nodes)
nodes.StartDaemons().Connect()
runTests(nodes)
nodes.StopDaemons()
})
t.Run("QUIC connects with non-dialable transports", func(t *testing.T) {
@ -144,6 +148,7 @@ func TestTransports(t *testing.T) {
disableRouting(nodes)
nodes.StartDaemons().Connect()
runTests(nodes)
nodes.StopDaemons()
})
t.Run("WebRTC Direct", func(t *testing.T) {
@ -162,5 +167,6 @@ func TestTransports(t *testing.T) {
disableRouting(nodes)
nodes.StartDaemons().Connect()
runTests(nodes)
nodes.StopDaemons()
})
}