diff --git a/cmd/ipfs/kubo/daemon.go b/cmd/ipfs/kubo/daemon.go index 133fcb435..6d5a209cb 100644 --- a/cmd/ipfs/kubo/daemon.go +++ b/cmd/ipfs/kubo/daemon.go @@ -887,23 +887,38 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error return nil, fmt.Errorf("serveHTTPApi: ConstructNode() failed: %s", err) } + // Buffer channel to prevent deadlock when multiple servers write errors simultaneously + errc := make(chan error, len(listeners)) + var wg sync.WaitGroup + + // Start all servers and wait for them to be ready before writing api file. + // This prevents race conditions where external tools (like systemd path units) + // see the file and try to connect before servers can accept connections. if len(listeners) > 0 { - // Only add an api file if the API is running. + readyChannels := make([]chan struct{}, len(listeners)) + for i, lis := range listeners { + readyChannels[i] = make(chan struct{}) + ready := readyChannels[i] + wg.Go(func() { + errc <- corehttp.ServeWithReady(node, manet.NetListener(lis), ready, opts...) + }) + } + + // Wait for all listeners to be ready or any to fail + for _, ready := range readyChannels { + select { + case <-ready: + // This listener is ready + case err := <-errc: + return nil, fmt.Errorf("serveHTTPApi: %w", err) + } + } + if err := node.Repo.SetAPIAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())); err != nil { return nil, fmt.Errorf("serveHTTPApi: SetAPIAddr() failed: %w", err) } } - errc := make(chan error) - var wg sync.WaitGroup - for _, apiLis := range listeners { - wg.Add(1) - go func(lis manet.Listener) { - defer wg.Done() - errc <- corehttp.Serve(node, manet.NetListener(lis), opts...) - }(apiLis) - } - go func() { wg.Wait() close(errc) @@ -1062,26 +1077,42 @@ func serveHTTPGateway(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, e return nil, fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err) } + // Buffer channel to prevent deadlock when multiple servers write errors simultaneously + errc := make(chan error, len(listeners)) + var wg sync.WaitGroup + + // Start all servers and wait for them to be ready before writing gateway file. + // This prevents race conditions where external tools (like systemd path units) + // see the file and try to connect before servers can accept connections. if len(listeners) > 0 { + readyChannels := make([]chan struct{}, len(listeners)) + for i, lis := range listeners { + readyChannels[i] = make(chan struct{}) + ready := readyChannels[i] + wg.Go(func() { + errc <- corehttp.ServeWithReady(node, manet.NetListener(lis), ready, opts...) + }) + } + + // Wait for all listeners to be ready or any to fail + for _, ready := range readyChannels { + select { + case <-ready: + // This listener is ready + case err := <-errc: + return nil, fmt.Errorf("serveHTTPGateway: %w", err) + } + } + addr, err := manet.ToNetAddr(rewriteMaddrToUseLocalhostIfItsAny(listeners[0].Multiaddr())) if err != nil { - return nil, fmt.Errorf("serveHTTPGateway: manet.ToIP() failed: %w", err) + return nil, fmt.Errorf("serveHTTPGateway: manet.ToNetAddr() failed: %w", err) } if err := node.Repo.SetGatewayAddr(addr); err != nil { return nil, fmt.Errorf("serveHTTPGateway: SetGatewayAddr() failed: %w", err) } } - errc := make(chan error) - var wg sync.WaitGroup - for _, lis := range listeners { - wg.Add(1) - go func(lis manet.Listener) { - defer wg.Done() - errc <- corehttp.Serve(node, manet.NetListener(lis), opts...) - }(lis) - } - go func() { wg.Wait() close(errc) diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index 6827e46fa..caf7a5474 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -7,6 +7,7 @@ import ( "io" "path" + "github.com/dustin/go-humanize" "github.com/ipfs/kubo/core/commands/cmdenv" "github.com/ipfs/kubo/core/commands/cmdutils" @@ -349,7 +350,11 @@ type DagStatSummary struct { } func (s *DagStatSummary) String() string { - return fmt.Sprintf("Total Size: %d\nUnique Blocks: %d\nShared Size: %d\nRatio: %f", s.TotalSize, s.UniqueBlocks, s.SharedSize, s.Ratio) + return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nShared Size: %d (%s)\nRatio: %f", + s.TotalSize, humanize.Bytes(s.TotalSize), + s.UniqueBlocks, + s.SharedSize, humanize.Bytes(s.SharedSize), + s.Ratio) } func (s *DagStatSummary) incrementTotalSize(size uint64) { @@ -384,7 +389,7 @@ Note: This command skips duplicate blocks in reporting both size and the number cmds.StringArg("root", true, true, "CID of a DAG root to get statistics for").EnableStdin(), }, Options: []cmds.Option{ - cmds.BoolOption(progressOptionName, "p", "Return progressive data while reading through the DAG").WithDefault(true), + cmds.BoolOption(progressOptionName, "p", "Show progress on stderr. Auto-detected if stderr is a terminal."), }, Run: dagStat, Type: DagStatSummary{}, diff --git a/core/commands/dag/stat.go b/core/commands/dag/stat.go index bb9be7e0d..916aae71a 100644 --- a/core/commands/dag/stat.go +++ b/core/commands/dag/stat.go @@ -5,6 +5,7 @@ import ( "io" "os" + "github.com/dustin/go-humanize" mdag "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/boxo/ipld/merkledag/traverse" cid "github.com/ipfs/go-cid" @@ -19,7 +20,11 @@ import ( // to compute the new state func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - progressive := req.Options[progressOptionName].(bool) + // Default to true (emit intermediate states) for HTTP/RPC clients that want progress + progressive := true + if val, specified := req.Options[progressOptionName].(bool); specified { + progressive = val + } api, err := cmdenv.GetApi(env, req) if err != nil { return err @@ -84,6 +89,18 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) } func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error { + // Determine whether to show progress based on TTY detection or explicit flag + var showProgress bool + val, specified := res.Request().Options[progressOptionName] + if !specified { + // Auto-detect: show progress only if stderr is a TTY + if errStat, err := os.Stderr.Stat(); err == nil { + showProgress = (errStat.Mode() & os.ModeCharDevice) != 0 + } + } else { + showProgress = val.(bool) + } + var dagStats *DagStatSummary for { v, err := res.Next() @@ -96,17 +113,26 @@ func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error { switch out := v.(type) { case *DagStatSummary: dagStats = out - if dagStats.Ratio == 0 { - length := len(dagStats.DagStatsArray) - if length > 0 { - currentStat := dagStats.DagStatsArray[length-1] - fmt.Fprintf(os.Stderr, "CID: %s, Size: %d, NumBlocks: %d\n", currentStat.Cid, currentStat.Size, currentStat.NumBlocks) + // Ratio == 0 means this is a progress update (not final result) + if showProgress && dagStats.Ratio == 0 { + // Sum up total progress across all DAGs being scanned + var totalBlocks int64 + var totalSize uint64 + for _, stat := range dagStats.DagStatsArray { + totalBlocks += stat.NumBlocks + totalSize += stat.Size } + fmt.Fprintf(os.Stderr, "Fetched/Processed %d blocks, %d bytes (%s)\r", totalBlocks, totalSize, humanize.Bytes(totalSize)) } default: return e.TypeErr(out, v) - } } + + // Clear the progress line before final output + if showProgress { + fmt.Fprint(os.Stderr, "\033[2K\r") + } + return re.Emit(dagStats) } diff --git a/core/commands/p2p.go b/core/commands/p2p.go index 1fbdc8a28..1de0bfca3 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -50,9 +50,17 @@ type P2PStreamsOutput struct { Streams []P2PStreamInfoOutput } +// P2PForegroundOutput is output type for foreground mode status messages +type P2PForegroundOutput struct { + Status string // "active" or "closing" + Protocol string + Address string +} + const ( allowCustomProtocolOptionName = "allow-custom-protocol" reportPeerIDOptionName = "report-peer-id" + foregroundOptionName = "foreground" ) var resolveTimeout = 10 * time.Second @@ -83,15 +91,37 @@ var p2pForwardCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Forward connections to libp2p service.", ShortDescription: ` -Forward connections made to to . +Forward connections made to to via libp2p. - specifies the libp2p protocol name to use for libp2p -connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'. +Creates a local TCP listener that tunnels connections through libp2p to a +remote peer's p2p listener. Similar to SSH port forwarding (-L flag). -Example: - ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /p2p/QmPeer - - Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /p2p/QmPeer +ARGUMENTS: + Protocol name (must start with '` + P2PProtoPrefix + `') + Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000) + Remote peer multiaddr (e.g., /p2p/PeerID) + +FOREGROUND MODE (--foreground, -f): + + By default, the forwarder runs in the daemon and the command returns + immediately. Use --foreground to block until interrupted: + + - Ctrl+C or SIGTERM: Removes the forwarder and exits + - 'ipfs p2p close': Removes the forwarder and exits + - Daemon shutdown: Forwarder is automatically removed + + Useful for systemd services or scripts that need cleanup on exit. + +EXAMPLES: + + # Persistent forwarder (command returns immediately) + ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID + + # Temporary forwarder (removed when command exits) + ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID + +Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md `, }, Arguments: []cmds.Argument{ @@ -101,6 +131,7 @@ Example: }, Options: []cmds.Option{ cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"), + cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { n, err := p2pGetNode(env) @@ -130,7 +161,51 @@ Example: return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace") } - return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets) + listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets) + if err != nil { + return err + } + + foreground, _ := req.Options[foregroundOptionName].(bool) + if foreground { + if err := res.Emit(&P2PForegroundOutput{ + Status: "active", + Protocol: protoOpt, + Address: listenOpt, + }); err != nil { + return err + } + // Wait for either context cancellation (Ctrl+C/daemon shutdown) + // or listener removal (ipfs p2p close) + select { + case <-req.Context.Done(): + // SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing) + n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool { + return l == listener + }) + return nil + case <-listener.Done(): + // Closed via "ipfs p2p close" - emit closing message + return res.Emit(&P2PForegroundOutput{ + Status: "closing", + Protocol: protoOpt, + Address: listenOpt, + }) + } + } + + return nil + }, + Type: P2PForegroundOutput{}, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error { + if out.Status == "active" { + fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address) + } else if out.Status == "closing" { + fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol) + } + return nil + }), }, } @@ -185,14 +260,40 @@ var p2pListenCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Create libp2p service.", ShortDescription: ` -Create libp2p service and forward connections made to . +Create a libp2p protocol handler that forwards incoming connections to +. - specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'. +When a remote peer connects using 'ipfs p2p forward', the connection is +forwarded to your local service. Similar to SSH port forwarding (server side). -Example: - ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234 - - Forward connections to 'myproto' libp2p service to 127.0.0.1:1234 +ARGUMENTS: + Protocol name (must start with '` + P2PProtoPrefix + `') + Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000) + +FOREGROUND MODE (--foreground, -f): + + By default, the listener runs in the daemon and the command returns + immediately. Use --foreground to block until interrupted: + + - Ctrl+C or SIGTERM: Removes the listener and exits + - 'ipfs p2p close': Removes the listener and exits + - Daemon shutdown: Listener is automatically removed + + Useful for systemd services or scripts that need cleanup on exit. + +EXAMPLES: + + # Persistent listener (command returns immediately) + ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000 + + # Temporary listener (removed when command exits) + ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000 + + # Report connecting peer ID to the target application + ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000 + +Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md `, }, Arguments: []cmds.Argument{ @@ -202,6 +303,7 @@ Example: Options: []cmds.Option{ cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"), cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"), + cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { n, err := p2pGetNode(env) @@ -231,8 +333,51 @@ Example: return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace") } - _, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID) - return err + listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID) + if err != nil { + return err + } + + foreground, _ := req.Options[foregroundOptionName].(bool) + if foreground { + if err := res.Emit(&P2PForegroundOutput{ + Status: "active", + Protocol: protoOpt, + Address: targetOpt, + }); err != nil { + return err + } + // Wait for either context cancellation (Ctrl+C/daemon shutdown) + // or listener removal (ipfs p2p close) + select { + case <-req.Context.Done(): + // SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing) + n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool { + return l == listener + }) + return nil + case <-listener.Done(): + // Closed via "ipfs p2p close" - emit closing message + return res.Emit(&P2PForegroundOutput{ + Status: "closing", + Protocol: protoOpt, + Address: targetOpt, + }) + } + } + + return nil + }, + Type: P2PForegroundOutput{}, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error { + if out.Status == "active" { + fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address) + } else if out.Status == "closing" { + fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol) + } + return nil + }), }, } @@ -271,11 +416,9 @@ func checkPort(target ma.Multiaddr) error { } // forwardLocal forwards local connections to a libp2p service -func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) error { +func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) { ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL) - // TODO: return some info - _, err := p.ForwardLocal(ctx, addr.ID, proto, bindAddr) - return err + return p.ForwardLocal(ctx, addr.ID, proto, bindAddr) } const ( diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 344991923..6749c738b 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -78,9 +78,23 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv return Serve(n, manet.NetListener(list), options...) } -// Serve accepts incoming HTTP connections on the listener and pass them +// Serve accepts incoming HTTP connections on the listener and passes them // to ServeOption handlers. func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error { + return ServeWithReady(node, lis, nil, options...) +} + +// ServeWithReady is like Serve but signals on the ready channel when the +// server is about to accept connections. The channel is closed right before +// server.Serve() is called. +// +// This is useful for callers that need to perform actions (like writing +// address files) only after the server is guaranteed to be accepting +// connections, avoiding race conditions where clients see the file before +// the server is ready. +// +// Passing nil for ready is equivalent to calling Serve(). +func ServeWithReady(node *core.IpfsNode, lis net.Listener, ready chan<- struct{}, options ...ServeOption) error { // make sure we close this no matter what. defer lis.Close() @@ -107,6 +121,9 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error var serverError error serverClosed := make(chan struct{}) go func() { + if ready != nil { + close(ready) + } serverError = server.Serve(lis) close(serverClosed) }() diff --git a/core/node/provider.go b/core/node/provider.go index a780da3d7..1470112bb 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -692,6 +692,48 @@ See docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#providedhtmaxw // ONLINE/OFFLINE +// hasDHTRouting checks if the routing configuration includes a DHT component. +// Returns false for HTTP-only custom routing configurations (e.g., Routing.Type="custom" +// with only HTTP routers). This is used to determine whether SweepingProviderOpt +// can be used, since it requires a DHT client. +func hasDHTRouting(cfg *config.Config) bool { + routingType := cfg.Routing.Type.WithDefault(config.DefaultRoutingType) + switch routingType { + case "auto", "autoclient", "dht", "dhtclient", "dhtserver": + return true + case "custom": + // Check if any router in custom config is DHT-based + for _, router := range cfg.Routing.Routers { + if routerIncludesDHT(router, cfg) { + return true + } + } + return false + default: // "none", "delegated" + return false + } +} + +// routerIncludesDHT recursively checks if a router configuration includes DHT. +// Handles parallel and sequential composite routers by checking their children. +func routerIncludesDHT(rp config.RouterParser, cfg *config.Config) bool { + switch rp.Type { + case config.RouterTypeDHT: + return true + case config.RouterTypeParallel, config.RouterTypeSequential: + if children, ok := rp.Parameters.(*config.ComposableRouterParams); ok { + for _, child := range children.Routers { + if childRouter, exists := cfg.Routing.Routers[child.RouterName]; exists { + if routerIncludesDHT(childRouter, cfg) { + return true + } + } + } + } + } + return false +} + // OnlineProviders groups units managing provide routing records online func OnlineProviders(provide bool, cfg *config.Config) fx.Option { if !provide { @@ -708,7 +750,15 @@ func OnlineProviders(provide bool, cfg *config.Config) fx.Option { opts := []fx.Option{ fx.Provide(setReproviderKeyProvider(providerStrategy)), } - if cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled) { + + sweepEnabled := cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled) + dhtAvailable := hasDHTRouting(cfg) + + // Use SweepingProvider only when both sweep is enabled AND DHT is available. + // For HTTP-only routing (e.g., Routing.Type="custom" with only HTTP routers), + // fall back to LegacyProvider which works with ProvideManyRouter. + // See https://github.com/ipfs/kubo/issues/11089 + if sweepEnabled && dhtAvailable { opts = append(opts, SweepingProviderOpt(cfg)) } else { reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) diff --git a/docs/changelogs/v0.40.md b/docs/changelogs/v0.40.md index 204a01bf6..9836bc243 100644 --- a/docs/changelogs/v0.40.md +++ b/docs/changelogs/v0.40.md @@ -14,6 +14,8 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team. - [Track total size when adding pins](#track-total-size-when-adding-pins) - [Improved IPNS over PubSub validation](#improved-ipns-over-pubsub-validation) - [New `ipfs diag datastore` commands](#new-ipfs-diag-datastore-commands) + - [🚇 Improved `ipfs p2p` tunnels with foreground mode](#-improved-ipfs-p2p-tunnels-with-foreground-mode) + - [Improved `ipfs dag stat` output](#improved-ipfs-dag-stat-output) - [Skip bad keys when listing](#skip_bad_keys_when_listing) - [📦️ Dependency updates](#-dependency-updates) - [📝 Changelog](#-changelog) @@ -61,6 +63,48 @@ Key: /pubsub/seqno/12D3KooW... Hex Dump: 00000000 18 81 81 c8 91 c0 ea f6 |........| ``` + +#### 🚇 Improved `ipfs p2p` tunnels with foreground mode + +P2P tunnels can now run like SSH port forwarding: start a tunnel, use it, and it cleans up automatically when you're done. + +The new `--foreground` (`-f`) flag for `ipfs p2p listen` and `ipfs p2p forward` keeps the command running until interrupted. When you Ctrl+C, send SIGTERM, or stop the service, the tunnel is removed automatically: + +```console +$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 --foreground +Listening on /x/ssh, forwarding to /ip4/127.0.0.1/tcp/22, waiting for interrupt... +^C +Received interrupt, removing listener for /x/ssh +``` + +Without `--foreground`, commands return immediately and tunnels persist until explicitly closed (existing behavior). + +See [docs/p2p-tunnels.md](https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md) for usage examples. + +#### Improved `ipfs dag stat` output + +The `ipfs dag stat` command has been improved for better terminal UX: + +- Progress output now uses a single line with carriage return, avoiding terminal flooding +- Progress is auto-detected: shown only in interactive terminals by default +- Human-readable sizes are now displayed alongside raw byte counts + +Example progress (interactive terminal): +``` +Fetched/Processed 84 blocks, 2097152 bytes (2.1 MB) +``` + +Example summary output: +``` +Summary +Total Size: 2097152 (2.1 MB) +Unique Blocks: 42 +Shared Size: 1048576 (1.0 MB) +Ratio: 1.500000 +``` + +Use `--progress=true` to force progress even when piped, or `--progress=false` to disable it. + #### Skip bad keys when listing Change the `ipfs key list` behavior to log an error and continue listing keys when a key cannot be read from the keystore or decoded. diff --git a/docs/config.md b/docs/config.md index 77cea0829..23a8261c5 100644 --- a/docs/config.md +++ b/docs/config.md @@ -59,6 +59,7 @@ config file at runtime. - [`Discovery.MDNS.Enabled`](#discoverymdnsenabled) - [`Discovery.MDNS.Interval`](#discoverymdnsinterval) - [`Experimental`](#experimental) + - [`Experimental.Libp2pStreamMounting`](#experimentallibp2pstreammounting) - [`Gateway`](#gateway) - [`Gateway.NoFetch`](#gatewaynofetch) - [`Gateway.NoDNSLink`](#gatewaynodnslink) @@ -1071,11 +1072,26 @@ in the [new mDNS implementation](https://github.com/libp2p/zeroconf#readme). Toggle and configure experimental features of Kubo. Experimental features are listed [here](./experimental-features.md). +### `Experimental.Libp2pStreamMounting` + +Enables the `ipfs p2p` commands for tunneling TCP connections through libp2p +streams, similar to SSH port forwarding. + +See [docs/p2p-tunnels.md](p2p-tunnels.md) for usage examples. + +Default: `false` + +Type: `bool` + ## `Gateway` Options for the HTTP gateway. -**NOTE:** support for `/api/v0` under the gateway path is now deprecated. It will be removed in future versions: . +> [!IMPORTANT] +> By default, Kubo's gateway is configured for local use at `127.0.0.1` and `localhost`. +> To run a public gateway, configure your domain names in [`Gateway.PublicGateways`](#gatewaypublicgateways). +> For production deployment considerations (reverse proxy, timeouts, rate limiting, CDN), +> see [Running in Production](gateway.md#running-in-production). ### `Gateway.NoFetch` @@ -1270,6 +1286,11 @@ Examples: - `*.example.com` will match requests to `http://foo.example.com/ipfs/*` or `http://{cid}.ipfs.bar.example.com/*`. - `foo-*.example.com` will match requests to `http://foo-bar.example.com/ipfs/*` or `http://{cid}.ipfs.foo-xyz.example.com/*`. +> [!IMPORTANT] +> **Reverse Proxy:** If running behind nginx or another reverse proxy, ensure +> `Host` and `X-Forwarded-*` headers are forwarded correctly. +> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) in gateway documentation. + #### `Gateway.PublicGateways: Paths` An array of paths that should be exposed on the hostname. @@ -1336,6 +1357,9 @@ Default: `false` Type: `bool` +> [!IMPORTANT] +> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) if running behind nginx or another reverse proxy. + #### `Gateway.PublicGateways: NoDNSLink` A boolean to configure whether DNSLink for hostname present in `Host` @@ -1346,6 +1370,9 @@ Default: `false` (DNSLink lookup enabled by default for every defined hostname) Type: `bool` +> [!IMPORTANT] +> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) if running behind nginx or another reverse proxy. + #### `Gateway.PublicGateways: InlineDNSLink` An optional flag to explicitly configure whether subdomain gateway's redirects @@ -1413,6 +1440,9 @@ ipfs config --json Gateway.PublicGateways '{"localhost": null }' Below is a list of the most common gateway setups. +> [!IMPORTANT] +> See [Reverse Proxy Caveats](gateway.md#reverse-proxy) if running behind nginx or another reverse proxy. + - Public [subdomain gateway](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#subdomain-gateway) at `http://{cid}.ipfs.dweb.link` (each content root gets its own Origin) ```console @@ -2197,6 +2227,9 @@ You can compare the effectiveness of sweep mode vs legacy mode by monitoring the > [!NOTE] > This is the default provider system as of Kubo v0.39. To use the legacy provider instead, set `Provide.DHT.SweepEnabled=false`. +> [!NOTE] +> When DHT routing is unavailable (e.g., `Routing.Type=custom` with only HTTP routers), the provider automatically falls back to the legacy provider regardless of this setting. + Default: `true` Type: `flag` @@ -2363,8 +2396,8 @@ Replaced with [`Provide.DHT.MaxWorkers`](#providedhtmaxworkers). ## `Pubsub` -Pubsub configures the `ipfs pubsub` subsystem. To enable, set `Pubsub.Enabled` -to `true`. +Pubsub configures Kubo's opt-in, opinionated [libp2p pubsub](https://docs.libp2p.io/concepts/pubsub/overview/) instance. +To enable, set `Pubsub.Enabled` to `true`. **EXPERIMENTAL:** This is an opt-in feature. Its primary use case is [IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/), which diff --git a/docs/experimental-features.md b/docs/experimental-features.md index 358dc58ab..2b490e44a 100644 --- a/docs/experimental-features.md +++ b/docs/experimental-features.md @@ -199,9 +199,8 @@ configured, the daemon will fail to start. ## ipfs p2p -Allows tunneling of TCP connections through Libp2p streams. If you've ever used -port forwarding with SSH (the `-L` option in OpenSSH), this feature is quite -similar. +Allows tunneling of TCP connections through libp2p streams, similar to SSH port +forwarding (`ssh -L`). ### State @@ -220,98 +219,20 @@ Experimental, will be stabilized in 0.6.0 > If you enable this and plan to expose CLI or HTTP RPC to other users or machines, > secure RPC API using [`API.Authorizations`](https://github.com/ipfs/kubo/blob/master/docs/config.md#apiauthorizations) or custom auth middleware. -The `p2p` command needs to be enabled in the config: - ```sh > ipfs config --json Experimental.Libp2pStreamMounting true ``` ### How to use -**Netcat example:** - -First, pick a protocol name for your application. Think of the protocol name as -a port number, just significantly more user-friendly. In this example, we're -going to use `/x/kickass/1.0`. - -***Setup:*** - -1. A "server" node with peer ID `$SERVER_ID` -2. A "client" node. - -***On the "server" node:*** - -First, start your application and have it listen for TCP connections on -port `$APP_PORT`. - -Then, configure the p2p listener by running: - -```sh -> ipfs p2p listen /x/kickass/1.0 /ip4/127.0.0.1/tcp/$APP_PORT -``` - -This will configure IPFS to forward all incoming `/x/kickass/1.0` streams to -`127.0.0.1:$APP_PORT` (opening a new connection to `127.0.0.1:$APP_PORT` per -incoming stream. - -***On the "client" node:*** - -First, configure the client p2p dialer, so that it forwards all inbound -connections on `127.0.0.1:SOME_PORT` to the server node listening -on `/x/kickass/1.0`. - -```sh -> ipfs p2p forward /x/kickass/1.0 /ip4/127.0.0.1/tcp/$SOME_PORT /p2p/$SERVER_ID -``` - -Next, have your application open a connection to `127.0.0.1:$SOME_PORT`. This -connection will be forwarded to the service running on `127.0.0.1:$APP_PORT` on -the remote machine. You can test it with netcat: - -***On "server" node:*** -```sh -> nc -v -l -p $APP_PORT -``` - -***On "client" node:*** -```sh -> nc -v 127.0.0.1 $SOME_PORT -``` - -You should now see that a connection has been established and be able to -exchange messages between netcat instances. - -(note that depending on your netcat version you may need to drop the `-v` flag) - -**SSH example** - -**Setup:** - -1. A "server" node with peer ID `$SERVER_ID` and running ssh server on the - default port. -2. A "client" node. - -_you can get `$SERVER_ID` by running `ipfs id -f "\n"`_ - -***First, on the "server" node:*** - -```sh -ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 -``` - -***Then, on "client" node:*** - -```sh -ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID -``` - -You should now be able to connect to your ssh server through a libp2p connection -with `ssh [user]@127.0.0.1 -p 2222`. - +See [docs/p2p-tunnels.md](p2p-tunnels.md) for usage examples, foreground mode, +and systemd integration. ### Road to being a real feature -- [ ] More documentation +- [x] More documentation +- [x] `ipfs p2p forward` mode +- [ ] Ability to define tunnels via JSON config, similar to [`Peering.Peers`](https://github.com/ipfs/kubo/blob/master/docs/config.md#peeringpeers), see [kubo#5460](https://github.com/ipfs/kubo/issues/5460) ## p2p http proxy diff --git a/docs/gateway.md b/docs/gateway.md index 3a616a158..d51eab4cc 100644 --- a/docs/gateway.md +++ b/docs/gateway.md @@ -6,7 +6,7 @@ they were stored in a traditional web server. [More about Gateways](https://docs.ipfs.tech/concepts/ipfs-gateway/) and [addressing IPFS on the web](https://docs.ipfs.tech/how-to/address-ipfs-on-web/). -Kubo's Gateway implementation follows [ipfs/specs: Specification for HTTP Gateways](https://github.com/ipfs/specs/tree/main/http-gateways#readme). +Kubo's Gateway implementation follows [IPFS Gateway Specifications](https://specs.ipfs.tech/http-gateways/) and is tested with [Gateway Conformance Test Suite](https://github.com/ipfs/gateway-conformance). ### Local gateway @@ -14,14 +14,21 @@ By default, Kubo nodes run a [path gateway](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#path-gateway) at `http://127.0.0.1:8080/` and a [subdomain gateway](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#subdomain-gateway) at `http://localhost:8080/`. -The path one also implements [trustless gateway spec](https://specs.ipfs.tech/http-gateways/trustless-gateway/) -and supports [trustless responses](https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval) as opt-in via `Accept` header. +> [!CAUTION] +> **For browsing websites, web apps, and dapps in a browser, use the subdomain +> gateway** (`localhost`). Each content root gets its own +> [web origin](https://developer.mozilla.org/en-US/docs/Web/Security/Same-origin_policy), +> isolating localStorage, cookies, and session data between sites. +> +> **For file retrieval, use the path gateway** (`127.0.0.1`). Path gateways are +> suited for downloading files or fetching [verifiable](https://docs.ipfs.tech/reference/http/gateway/#trustless-verifiable-retrieval) +> content, but lack origin isolation (all content shares the same origin). Additional listening addresses and gateway behaviors can be set in the [config](#configuration) file. ### Public gateways -Protocol Labs provides a public gateway at +IPFS Foundation [provides public gateways](https://docs.ipfs.tech/concepts/public-utilities/) at `https://ipfs.io` ([path](https://specs.ipfs.tech/http-gateways/path-gateway/)), `https://dweb.link` ([subdomain](https://docs.ipfs.tech/how-to/address-ipfs-on-web/#subdomain-gateway)), and `https://trustless-gateway.link` ([trustless](https://specs.ipfs.tech/http-gateways/trustless-gateway/) only). @@ -41,6 +48,80 @@ The gateway's log level can be changed with this command: > ipfs log level core/server debug ``` +## Running in Production + +When deploying Kubo's gateway in production, be aware of these important considerations: + + +> [!IMPORTANT] +> **Reverse Proxy:** When running Kubo behind a reverse proxy (such as nginx), +> the original `Host` header **must** be forwarded to Kubo for +> [`Gateway.PublicGateways`](config.md#gatewaypublicgateways) to work. +> Kubo uses the `Host` header to match configured hostnames and detect +> subdomain gateway patterns like `{cid}.ipfs.example.org` or DNSLink hostnames. +> +> If the `Host` header is not forwarded correctly, Kubo will not recognize +> the configured gateway hostnames and requests may be handled incorrectly. +> +> If `X-Forwarded-Proto` is not set, redirects over HTTPS will use wrong protocol +> and DNSLink names will not be inlined for subdomain gateways. +> +> Example: minimal nginx configuration for `example.org` +> +> ```nginx +> server { +> listen 80; +> listen [::]:80; +> +> # IMPORTANT: Include wildcard to match subdomain gateway requests. +> # The dot prefix matches both apex domain and all subdomains. +> server_name .example.org; +> +> location / { +> proxy_pass http://127.0.0.1:8080; +> +> # IMPORTANT: Forward the original Host header to Kubo. +> # Without this, PublicGateways configuration will not work. +> proxy_set_header Host $host; +> +> # IMPORTANT: X-Forwarded-Proto is required for correct behavior: +> # - Redirects will use https:// URLs when set to "https" +> # - DNSLink names will be inlined for subdomain gateways +> # (e.g., /ipns/en.wikipedia-on-ipfs.org → en-wikipedia--on--ipfs-org.ipns.example.org) +> proxy_set_header X-Forwarded-Proto $scheme; +> proxy_set_header X-Forwarded-Host $host; +> } +> } +> ``` +> +> Common mistakes to avoid: +> +> - **Missing wildcard in `server_name`:** Using only `server_name example.org;` +> will not match subdomain requests like `{cid}.ipfs.example.org`. Always +> include `*.example.org` or use the dot prefix `.example.org`. +> +> - **Wrong `Host` header value:** Using `proxy_set_header Host $proxy_host;` +> sends the backend's hostname (e.g., `127.0.0.1:8080`) instead of the +> original `Host` header. Always use `$host` or `$http_host`. +> +> - **Missing `Host` header entirely:** If `proxy_set_header Host` is not +> specified, nginx defaults to `$proxy_host`, which breaks gateway routing. + +> [!IMPORTANT] +> **Timeouts:** Configure [`Gateway.RetrievalTimeout`](config.md#gatewayretrievaltimeout) +> based on your expected content retrieval times. + +> [!IMPORTANT] +> **Rate Limiting:** Use [`Gateway.MaxConcurrentRequests`](config.md#gatewaymaxconcurrentrequests) +> to protect against traffic spikes. + +> [!IMPORTANT] +> **CDN/Cloudflare:** If using Cloudflare or other CDNs with +> [deserialized responses](config.md#gatewaydeserializedresponses) enabled, review +> [`Gateway.MaxRangeRequestFileSize`](config.md#gatewaymaxrangerequestfilesize) to avoid +> excess bandwidth billing from range request bugs. Cloudflare users may need additional +> protection via [Cloudflare Snippets](https://github.com/ipfs/boxo/issues/856#issuecomment-3523944976). + ## Directories For convenience, the gateway (mostly) acts like a normal web-server when serving @@ -53,7 +134,7 @@ a directory: 2. Dynamically build and serve a listing of the contents of the directory. This redirect is skipped if the query string contains a -`go-get=1` parameter. See [PR#3964](https://github.com/ipfs/kubo/pull/3963) +`go-get=1` parameter. See [PR#3963](https://github.com/ipfs/kubo/pull/3963) for details ## Static Websites @@ -107,10 +188,12 @@ This is equivalent of `ipfs block get`. ### `application/vnd.ipld.car` -Returns a [CAR](https://ipld.io/specs/transport/car/) stream for specific DAG and selector. +Returns a [CAR](https://ipld.io/specs/transport/car/) stream for a DAG or a subset of it. -Right now only 'full DAG' implicit selector is implemented. -Support for user-provided IPLD selectors is tracked in https://github.com/ipfs/kubo/issues/8769. +The `dag-scope` parameter controls which blocks are included: `all` (default, entire DAG), +`entity` (logical unit like a file), or `block` (single block). For [UnixFS](https://specs.ipfs.tech/unixfs/) files, +`entity-bytes` enables byte range requests. See [IPIP-402](https://specs.ipfs.tech/ipips/ipip-0402/) +for details. This is a rough equivalent of `ipfs dag export`. diff --git a/docs/p2p-tunnels.md b/docs/p2p-tunnels.md new file mode 100644 index 000000000..9f3c310d8 --- /dev/null +++ b/docs/p2p-tunnels.md @@ -0,0 +1,214 @@ +# P2P Tunnels + +Kubo supports tunneling TCP connections through libp2p streams, similar to SSH +port forwarding (`ssh -L`). This allows exposing local services to remote peers +and forwarding remote services to local ports. + +- [Why P2P Tunnels?](#why-p2p-tunnels) +- [Quick Start](#quick-start) +- [Background Mode](#background-mode) +- [Foreground Mode](#foreground-mode) + - [systemd Integration](#systemd-integration) +- [Security Considerations](#security-considerations) +- [Troubleshooting](#troubleshooting) + +## Why P2P Tunnels? + +Unlike traditional SSH tunnels, libp2p-based tunnels do not require: + +- **No public IP or open ports**: The server does not need a static IP address + or port forwarding configured on the router. Connectivity to peers behind NAT + is facilitated by [Direct Connection Upgrade through Relay (DCUtR)](https://github.com/libp2p/specs/blob/master/relay/DCUtR.md), + which enables NAT hole-punching. + +- **No DNS or IP address management**: All you need is the server's PeerID and + an agreed-upon protocol name (e.g., `/x/ssh`). Kubo handles peer discovery + and routing via the [Amino DHT](https://specs.ipfs.tech/routing/kad-dht/). + +- **Simplified firewall rules**: Since connections are established through + libp2p's existing swarm connections, no additional firewall configuration is + needed beyond what Kubo already requires. + +This makes p2p tunnels useful for connecting to machines on home networks, +behind corporate firewalls, or in environments where traditional port forwarding +is not available. + +## Quick Start + +Enable the experimental feature: + +```console +$ ipfs config --json Experimental.Libp2pStreamMounting true +``` + +Test with netcat (`nc`) - no services required: + +**On the server:** + +```console +$ ipfs p2p listen /x/test /ip4/127.0.0.1/tcp/9999 +$ nc -l -p 9999 +``` + +**On the client:** + +Replace `$SERVER_ID` with the server's peer ID (get it with `ipfs id -f "\n"` +on the server). + +```console +$ ipfs p2p forward /x/test /ip4/127.0.0.1/tcp/9998 /p2p/$SERVER_ID +$ nc 127.0.0.1 9998 +``` + +Type in either terminal and the text appears in the other. Use Ctrl+C to exit. + +## Background Mode + +By default, `ipfs p2p listen` and `ipfs p2p forward` register the tunnel with +the daemon and return immediately. The tunnel persists until explicitly closed +with `ipfs p2p close` or the daemon shuts down. + +This example exposes a local SSH server (listening on `localhost:22`) to a +remote peer. The same pattern works for any TCP service. + +**On the server** (the machine running SSH): + +Register a p2p listener that forwards incoming connections to the local SSH +server. The protocol name `/x/ssh` is an arbitrary identifier that both peers +must agree on (the `/x/` prefix is required for custom protocols). + +```console +$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 +``` + +**On the client:** + +Create a local port (`2222`) that tunnels through libp2p to the server's SSH +service. + +```console +$ ipfs p2p forward /x/ssh /ip4/127.0.0.1/tcp/2222 /p2p/$SERVER_ID +``` + +Now connect to SSH through the tunnel: + +```console +$ ssh user@127.0.0.1 -p 2222 +``` + +**Other services:** To tunnel a different service, change the port and protocol +name. For example, to expose a web server on port 8080, use `/x/mywebapp` and +`/ip4/127.0.0.1/tcp/8080`. + +## Foreground Mode + +Use `--foreground` (`-f`) to block until interrupted. The tunnel is +automatically removed when the command exits: + +```console +$ ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 --foreground +Listening on /x/ssh, forwarding to /ip4/127.0.0.1/tcp/22, waiting for interrupt... +^C +Received interrupt, removing listener for /x/ssh +``` + +The listener/forwarder is automatically removed when: + +- The command receives Ctrl+C or SIGTERM +- `ipfs p2p close` is called +- The daemon shuts down + +This mode is useful for systemd services and scripts that need cleanup on exit. + +### systemd Integration + +The `--foreground` flag enables clean integration with systemd. The examples +below show how to run `ipfs p2p listen` as a user service that starts +automatically when the IPFS daemon is ready. + +Ensure IPFS daemon runs as a systemd user service. See +[misc/README.md](https://github.com/ipfs/kubo/blob/master/misc/README.md#systemd) +for setup instructions and where to place unit files. + +#### P2P listener with path-based activation + +Use a `.path` unit to wait for the daemon's RPC API to be ready before starting +the p2p listener. + +**`ipfs-p2p-tunnel.path`**: + +```systemd +[Unit] +Description=Monitor for IPFS daemon startup +After=ipfs.service +Requires=ipfs.service + +[Path] +PathExists=%h/.ipfs/api +Unit=ipfs-p2p-tunnel.service + +[Install] +WantedBy=default.target +``` + +The `%h` specifier expands to the user's home directory. If you use a custom +`IPFS_PATH`, adjust accordingly. + +**`ipfs-p2p-tunnel.service`**: + +```systemd +[Unit] +Description=IPFS p2p tunnel +Requires=ipfs.service + +[Service] +ExecStart=ipfs p2p listen /x/ssh /ip4/127.0.0.1/tcp/22 -f +Restart=on-failure +RestartSec=10 + +[Install] +WantedBy=default.target +``` + +#### Enabling the services + +```console +$ systemctl --user enable ipfs.service +$ systemctl --user enable ipfs-p2p-tunnel.path +$ systemctl --user start ipfs.service +``` + +The path unit monitors `~/.ipfs/api` and starts `ipfs-p2p-tunnel.service` +once the file exists. + +## Security Considerations + +> [!WARNING] +> This feature provides CLI and HTTP RPC users with the ability to set up port +> forwarding for localhost and LAN ports. If you enable this and plan to expose +> CLI or HTTP RPC to other users or machines, secure the RPC API using +> [`API.Authorizations`](https://github.com/ipfs/kubo/blob/master/docs/config.md#apiauthorizations) +> or custom auth middleware. + +## Troubleshooting + +### Foreground listener stops when terminal closes + +When using `--foreground`, the listener stops if the terminal closes. For +persistent foreground listeners, use a systemd service, `nohup`, `tmux`, or +`screen`. Without `--foreground`, the listener persists in the daemon regardless +of terminal state. + +### Connection refused errors + +Verify: + +1. The experimental feature is enabled: `ipfs config Experimental.Libp2pStreamMounting` +2. The listener is active: `ipfs p2p ls` +3. Both peers can connect: `ipfs swarm connect /p2p/$PEER_ID` + +### Persistent tunnel configuration + +There is currently no way to define tunnels in the Kubo JSON config file. Use +`--foreground` mode with a systemd service for persistent tunnels. Support for +configuring tunnels via JSON config may be added in the future (see [kubo#5460](https://github.com/ipfs/kubo/issues/5460) - PRs welcome!). diff --git a/misc/README.md b/misc/README.md index 28511d3fc..ea683519b 100644 --- a/misc/README.md +++ b/misc/README.md @@ -39,6 +39,12 @@ To run this in your user session, save it as `~/.config/systemd/user/ipfs.servic ``` Read more about `--user` services here: [wiki.archlinux.org:Systemd ](https://wiki.archlinux.org/index.php/Systemd/User#Automatic_start-up_of_systemd_user_instances) +#### P2P tunnel services + +For running `ipfs p2p listen` or `ipfs p2p forward` as systemd services, +see [docs/p2p-tunnels.md](../docs/p2p-tunnels.md) for examples using the +`--foreground` flag and path-based activation. + ### initd - Here is a full-featured sample service file: https://github.com/dylanPowers/ipfs-linux-service/blob/master/init.d/ipfs diff --git a/p2p/listener.go b/p2p/listener.go index f5942ffa0..823f68e81 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -20,6 +20,10 @@ type Listener interface { // close closes the listener. Does not affect child streams close() + + // Done returns a channel that is closed when the listener is closed. + // This allows callers to detect when a listener has been removed. + Done() <-chan struct{} } // Listeners manages a group of Listener implementations, @@ -73,15 +77,13 @@ func (r *Listeners) Register(l Listener) error { return nil } +// Close removes and closes all listeners for which matchFunc returns true. +// Returns the number of listeners closed. func (r *Listeners) Close(matchFunc func(listener Listener) bool) int { - todo := make([]Listener, 0) + var todo []Listener r.Lock() for _, l := range r.Listeners { - if !matchFunc(l) { - continue - } - - if _, ok := r.Listeners[l.key()]; ok { + if matchFunc(l) { delete(r.Listeners, l.key()) todo = append(todo, l) } diff --git a/p2p/local.go b/p2p/local.go index 98028c5d4..31f70e5fc 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -23,6 +23,7 @@ type localListener struct { peer peer.ID listener manet.Listener + done chan struct{} } // ForwardLocal creates new P2P stream to a remote listener. @@ -32,6 +33,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I p2p: p2p, proto: proto, peer: peer, + done: make(chan struct{}), } maListener, err := manet.Listen(bindAddr) @@ -98,6 +100,11 @@ func (l *localListener) setupStream(local manet.Conn) { func (l *localListener) close() { l.listener.Close() + close(l.done) +} + +func (l *localListener) Done() <-chan struct{} { + return l.done } func (l *localListener) Protocol() protocol.ID { diff --git a/p2p/remote.go b/p2p/remote.go index b867cb313..fb7b7ccba 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -25,6 +25,8 @@ type remoteListener struct { // reportRemote if set to true makes the handler send '\n' // to target before any data is forwarded reportRemote bool + + done chan struct{} } // ForwardRemote creates new p2p listener. @@ -36,6 +38,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu addr: addr, reportRemote: reportRemote, + done: make(chan struct{}), } if err := p2p.ListenersP2P.Register(listener); err != nil { @@ -99,7 +102,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr { return l.addr } -func (l *remoteListener) close() {} +func (l *remoteListener) close() { + close(l.done) +} + +func (l *remoteListener) Done() <-chan struct{} { + return l.done +} func (l *remoteListener) key() protocol.ID { return l.proto diff --git a/test/cli/api_file_test.go b/test/cli/api_file_test.go new file mode 100644 index 000000000..a0ba30fd2 --- /dev/null +++ b/test/cli/api_file_test.go @@ -0,0 +1,104 @@ +package cli + +import ( + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/ipfs/kubo/test/cli/harness" + "github.com/stretchr/testify/require" +) + +// TestAddressFileReady verifies that when address files ($IPFS_PATH/api and +// $IPFS_PATH/gateway) are created, the corresponding HTTP servers are ready +// to accept connections immediately. This prevents race conditions for tools +// like systemd path units that start services when these files appear. +func TestAddressFileReady(t *testing.T) { + t.Parallel() + + t.Run("api file", func(t *testing.T) { + t.Parallel() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Start daemon in background (don't use StartDaemon which waits for API) + res := node.Runner.MustRun(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"daemon"}, + RunFunc: (*exec.Cmd).Start, + }) + node.Daemon = res + defer node.StopDaemon() + + // Poll for api file to appear + apiFile := filepath.Join(node.Dir, "api") + var fileExists bool + for i := 0; i < 100; i++ { + if _, err := os.Stat(apiFile); err == nil { + fileExists = true + break + } + time.Sleep(100 * time.Millisecond) + } + require.True(t, fileExists, "api file should be created") + + // Read the api file to get the address + apiAddr, err := node.TryAPIAddr() + require.NoError(t, err) + + // Extract IP and port from multiaddr + ip, err := apiAddr.ValueForProtocol(4) // P_IP4 + require.NoError(t, err) + port, err := apiAddr.ValueForProtocol(6) // P_TCP + require.NoError(t, err) + + // Immediately try to use the API - should work on first attempt + url := "http://" + ip + ":" + port + "/api/v0/id" + resp, err := http.Post(url, "", nil) + require.NoError(t, err, "RPC API should be ready immediately when api file exists") + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + }) + + t.Run("gateway file", func(t *testing.T) { + t.Parallel() + h := harness.NewT(t) + node := h.NewNode().Init() + + // Start daemon in background + res := node.Runner.MustRun(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"daemon"}, + RunFunc: (*exec.Cmd).Start, + }) + node.Daemon = res + defer node.StopDaemon() + + // Poll for gateway file to appear + gatewayFile := filepath.Join(node.Dir, "gateway") + var fileExists bool + for i := 0; i < 100; i++ { + if _, err := os.Stat(gatewayFile); err == nil { + fileExists = true + break + } + time.Sleep(100 * time.Millisecond) + } + require.True(t, fileExists, "gateway file should be created") + + // Read the gateway file to get the URL (already includes http:// prefix) + gatewayURL, err := os.ReadFile(gatewayFile) + require.NoError(t, err) + + // Immediately try to use the Gateway - should work on first attempt + url := strings.TrimSpace(string(gatewayURL)) + "/ipfs/bafkqaaa" // empty file CID + resp, err := http.Get(url) + require.NoError(t, err, "Gateway should be ready immediately when gateway file exists") + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + }) +} diff --git a/test/cli/backup_bootstrap_test.go b/test/cli/backup_bootstrap_test.go index 017499f3d..eff00048a 100644 --- a/test/cli/backup_bootstrap_test.go +++ b/test/cli/backup_bootstrap_test.go @@ -39,7 +39,9 @@ func TestBackupBootstrapPeers(t *testing.T) { // Start 1 and 2. 2 does not know anyone yet. nodes[1].StartDaemon() + defer nodes[1].StopDaemon() nodes[2].StartDaemon() + defer nodes[2].StopDaemon() assert.Len(t, nodes[1].Peers(), 0) assert.Len(t, nodes[2].Peers(), 0) @@ -51,6 +53,7 @@ func TestBackupBootstrapPeers(t *testing.T) { // Start 0, wait a bit. Should connect to 1, and then discover 2 via the // backup bootstrap peers. nodes[0].StartDaemon() + defer nodes[0].StopDaemon() time.Sleep(time.Millisecond * 500) // Check if they're all connected. diff --git a/test/cli/bitswap_config_test.go b/test/cli/bitswap_config_test.go index 52e9ea541..5ee59ea56 100644 --- a/test/cli/bitswap_config_test.go +++ b/test/cli/bitswap_config_test.go @@ -22,7 +22,9 @@ func TestBitswapConfig(t *testing.T) { t.Parallel() h := harness.NewT(t) provider := h.NewNode().Init().StartDaemon() + defer provider.StopDaemon() requester := h.NewNode().Init().StartDaemon() + defer requester.StopDaemon() hash := provider.IPFSAddStr(string(testData)) requester.Connect(provider) @@ -38,8 +40,10 @@ func TestBitswapConfig(t *testing.T) { provider := h.NewNode().Init() provider.SetIPFSConfig("Bitswap.ServerEnabled", false) provider = provider.StartDaemon() + defer provider.StopDaemon() requester := h.NewNode().Init().StartDaemon() + defer requester.StopDaemon() hash := provider.IPFSAddStr(string(testData)) requester.Connect(provider) @@ -70,8 +74,10 @@ func TestBitswapConfig(t *testing.T) { requester := h.NewNode().Init() requester.SetIPFSConfig("Bitswap.ServerEnabled", false) requester.StartDaemon() + defer requester.StopDaemon() provider := h.NewNode().Init().StartDaemon() + defer provider.StopDaemon() hash := provider.IPFSAddStr(string(testData)) requester.Connect(provider) @@ -91,8 +97,10 @@ func TestBitswapConfig(t *testing.T) { cfg.HTTPRetrieval.Enabled = config.True }) requester.StartDaemon() + defer requester.StopDaemon() provider := h.NewNode().Init().StartDaemon() + defer provider.StopDaemon() hash := provider.IPFSAddStr(string(testData)) requester.Connect(provider) @@ -126,7 +134,9 @@ func TestBitswapConfig(t *testing.T) { cfg.HTTPRetrieval.Enabled = config.True }) provider = provider.StartDaemon() + defer provider.StopDaemon() requester := h.NewNode().Init().StartDaemon() + defer requester.StopDaemon() requester.Connect(provider) // read libp2p identify from remote peer, and print protocols diff --git a/test/cli/content_blocking_test.go b/test/cli/content_blocking_test.go index 8c50aee2b..513de5e59 100644 --- a/test/cli/content_blocking_test.go +++ b/test/cli/content_blocking_test.go @@ -76,6 +76,7 @@ func TestContentBlocking(t *testing.T) { // Start daemon, it should pick up denylist from $IPFS_PATH/denylists/test.deny node.StartDaemon() // we need online mode for GatewayOverLibp2p tests + t.Cleanup(func() { node.StopDaemon() }) client := node.GatewayClient() // First, confirm gateway works diff --git a/test/cli/dag_test.go b/test/cli/dag_test.go index f6758a710..38457318a 100644 --- a/test/cli/dag_test.go +++ b/test/cli/dag_test.go @@ -47,6 +47,8 @@ func TestDag(t *testing.T) { t.Run("ipfs dag stat --enc=json", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() + // Import fixture r, err := os.Open(fixtureFile) assert.Nil(t, err) @@ -91,6 +93,7 @@ func TestDag(t *testing.T) { t.Run("ipfs dag stat", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() r, err := os.Open(fixtureFile) assert.NoError(t, err) defer r.Close() diff --git a/test/cli/delegated_routing_v1_http_proxy_test.go b/test/cli/delegated_routing_v1_http_proxy_test.go index 562532cb9..2b82a2714 100644 --- a/test/cli/delegated_routing_v1_http_proxy_test.go +++ b/test/cli/delegated_routing_v1_http_proxy_test.go @@ -60,6 +60,10 @@ func TestRoutingV1Proxy(t *testing.T) { }) nodes[2].StartDaemon() + t.Cleanup(func() { + nodes.StopDaemons() + }) + // Connect them. nodes.Connect() diff --git a/test/cli/delegated_routing_v1_http_server_test.go b/test/cli/delegated_routing_v1_http_server_test.go index ffcc571b7..503dba39b 100644 --- a/test/cli/delegated_routing_v1_http_server_test.go +++ b/test/cli/delegated_routing_v1_http_server_test.go @@ -32,6 +32,7 @@ func TestRoutingV1Server(t *testing.T) { }) }) nodes.StartDaemons().Connect() + t.Cleanup(func() { nodes.StopDaemons() }) return nodes } @@ -133,6 +134,7 @@ func TestRoutingV1Server(t *testing.T) { cfg.Routing.Type = config.NewOptionalString("dht") }) node.StartDaemon() + defer node.StopDaemon() // Put IPNS record in lonely node. It should be accepted as it is a valid record. c, err = client.New(node.GatewayURL()) @@ -196,6 +198,7 @@ func TestRoutingV1Server(t *testing.T) { } }) node.StartDaemon() + defer node.StopDaemon() c, err := client.New(node.GatewayURL()) require.NoError(t, err) @@ -238,6 +241,7 @@ func TestRoutingV1Server(t *testing.T) { cfg.Bootstrap = autoconf.FallbackBootstrapPeers }) node.StartDaemon() + defer node.StopDaemon() c, err := client.New(node.GatewayURL()) require.NoError(t, err) diff --git a/test/cli/dht_autoclient_test.go b/test/cli/dht_autoclient_test.go index adb200509..75e1cc241 100644 --- a/test/cli/dht_autoclient_test.go +++ b/test/cli/dht_autoclient_test.go @@ -16,6 +16,7 @@ func TestDHTAutoclient(t *testing.T) { node.IPFS("config", "Routing.Type", "autoclient") }) nodes.StartDaemons().Connect() + t.Cleanup(func() { nodes.StopDaemons() }) t.Run("file added on node in client mode is retrievable from node in client mode", func(t *testing.T) { t.Parallel() diff --git a/test/cli/dht_opt_prov_test.go b/test/cli/dht_opt_prov_test.go index 17b846dc7..291d48c54 100644 --- a/test/cli/dht_opt_prov_test.go +++ b/test/cli/dht_opt_prov_test.go @@ -22,6 +22,7 @@ func TestDHTOptimisticProvide(t *testing.T) { }) nodes.StartDaemons().Connect() + defer nodes.StopDaemons() hash := nodes[0].IPFSAddStr(string(random.Bytes(100))) nodes[0].IPFS("routing", "provide", hash) diff --git a/test/cli/files_test.go b/test/cli/files_test.go index ece87850e..4760c23aa 100644 --- a/test/cli/files_test.go +++ b/test/cli/files_test.go @@ -19,6 +19,7 @@ func TestFilesCp(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Create simple text file data := "testing files cp command" @@ -36,6 +37,7 @@ func TestFilesCp(t *testing.T) { t.Run("files cp with unsupported DAG node type fails", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // MFS UnixFS is limited to dag-pb or raw, so we create a dag-cbor node to test this jsonData := `{"data": "not a UnixFS node"}` @@ -53,6 +55,7 @@ func TestFilesCp(t *testing.T) { t.Run("files cp with invalid UnixFS data structure fails", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Create an invalid proto file data := []byte{0xDE, 0xAD, 0xBE, 0xEF} // Invalid protobuf data @@ -75,6 +78,7 @@ func TestFilesCp(t *testing.T) { t.Run("files cp with raw node succeeds", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Create a raw node data := "raw data" @@ -98,6 +102,7 @@ func TestFilesCp(t *testing.T) { t.Run("files cp creates intermediate directories with -p", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Create a simple text file and add it to IPFS data := "hello parent directories" @@ -130,6 +135,7 @@ func TestFilesRm(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Create a file to remove node.IPFS("files", "mkdir", "/test-dir") @@ -149,6 +155,7 @@ func TestFilesRm(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Create a file to remove node.IPFS("files", "mkdir", "/test-dir") @@ -166,6 +173,7 @@ func TestFilesRm(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Create a file to remove node.IPFS("files", "mkdir", "/test-dir") @@ -186,6 +194,7 @@ func TestFilesNoFlushLimit(t *testing.T) { t.Run("reaches default limit of 256 operations", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() // Perform 256 operations with --flush=false (should succeed) for i := 0; i < 256; i++ { @@ -214,6 +223,7 @@ func TestFilesNoFlushLimit(t *testing.T) { }) node.StartDaemon() + defer node.StopDaemon() // Perform 5 operations (should succeed) for i := 0; i < 5; i++ { @@ -239,6 +249,7 @@ func TestFilesNoFlushLimit(t *testing.T) { }) node.StartDaemon() + defer node.StopDaemon() // Do 2 operations with --flush=false node.IPFS("files", "mkdir", "--flush=false", "/dir1") @@ -271,6 +282,7 @@ func TestFilesNoFlushLimit(t *testing.T) { }) node.StartDaemon() + defer node.StopDaemon() // Do 2 operations with --flush=false node.IPFS("files", "mkdir", "--flush=false", "/dir1") @@ -303,6 +315,7 @@ func TestFilesNoFlushLimit(t *testing.T) { }) node.StartDaemon() + defer node.StopDaemon() // Should be able to do many operations without error for i := 0; i < 300; i++ { @@ -322,6 +335,7 @@ func TestFilesNoFlushLimit(t *testing.T) { }) node.StartDaemon() + defer node.StopDaemon() // Mix of different MFS operations (5 operations to hit the limit) node.IPFS("files", "mkdir", "--flush=false", "/testdir") diff --git a/test/cli/fixtures/TestDagStatExpectedOutput.txt b/test/cli/fixtures/TestDagStatExpectedOutput.txt index 9e709f4a2..87bc405a1 100644 --- a/test/cli/fixtures/TestDagStatExpectedOutput.txt +++ b/test/cli/fixtures/TestDagStatExpectedOutput.txt @@ -4,9 +4,9 @@ bafyreibmdfd7c5db4kls4ty57zljfhqv36gi43l6txl44pi423wwmeskwy 2 53 bafyreie3njilzdi4ixumru4nzgecsnjtu7fzfcwhg7e6s4s5i7cnbslvn4 2 53 Summary -Total Size: 99 +Total Size: 99 (99 B) Unique Blocks: 3 -Shared Size: 7 +Shared Size: 7 (7 B) Ratio: 1.070707 diff --git a/test/cli/gateway_limits_test.go b/test/cli/gateway_limits_test.go index 2c5554cf3..990eabb1a 100644 --- a/test/cli/gateway_limits_test.go +++ b/test/cli/gateway_limits_test.go @@ -28,6 +28,7 @@ func TestGatewayLimits(t *testing.T) { cfg.Gateway.RetrievalTimeout = config.NewOptionalDuration(1 * time.Second) }) node.StartDaemon() + defer node.StopDaemon() // Add content that can be retrieved quickly cid := node.IPFSAddStr("test content") @@ -69,6 +70,7 @@ func TestGatewayLimits(t *testing.T) { cfg.Gateway.RetrievalTimeout = config.NewOptionalDuration(2 * time.Second) }) node.StartDaemon() + defer node.StopDaemon() // Add some content - use a non-existent CID that will block during retrieval // to ensure we can control timing diff --git a/test/cli/gateway_range_test.go b/test/cli/gateway_range_test.go index 2d8ce1a3e..9efe08710 100644 --- a/test/cli/gateway_range_test.go +++ b/test/cli/gateway_range_test.go @@ -27,6 +27,7 @@ func TestGatewayHAMTDirectory(t *testing.T) { // Start node h := harness.NewT(t) node := h.NewNode().Init("--empty-repo", "--profile=test").StartDaemon("--offline") + defer node.StopDaemon() client := node.GatewayClient() // Import fixtures @@ -56,6 +57,7 @@ func TestGatewayHAMTRanges(t *testing.T) { // Start node h := harness.NewT(t) node := h.NewNode().Init("--empty-repo", "--profile=test").StartDaemon("--offline") + t.Cleanup(func() { node.StopDaemon() }) client := node.GatewayClient() // Import fixtures diff --git a/test/cli/gateway_test.go b/test/cli/gateway_test.go index 2d500c655..b80d2d700 100644 --- a/test/cli/gateway_test.go +++ b/test/cli/gateway_test.go @@ -28,6 +28,7 @@ func TestGateway(t *testing.T) { t.Parallel() h := harness.NewT(t) node := h.NewNode().Init().StartDaemon("--offline") + t.Cleanup(func() { node.StopDaemon() }) cid := node.IPFSAddStr("Hello Worlds!") peerID, err := peer.ToCid(node.PeerID()).StringOfBase(multibase.Base36) @@ -234,6 +235,7 @@ func TestGateway(t *testing.T) { cfg.API.HTTPHeaders = map[string][]string{header: values} }) node.StartDaemon() + defer node.StopDaemon() resp := node.APIClient().DisableRedirects().Get("/webui/") assert.Equal(t, resp.Headers.Values(header), values) @@ -257,6 +259,7 @@ func TestGateway(t *testing.T) { t.Run("pprof", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + t.Cleanup(func() { node.StopDaemon() }) apiClient := node.APIClient() t.Run("mutex", func(t *testing.T) { t.Parallel() @@ -300,6 +303,7 @@ func TestGateway(t *testing.T) { t.Parallel() h := harness.NewT(t) node := h.NewNode().Init().StartDaemon() + t.Cleanup(func() { node.StopDaemon() }) h.WriteFile("index/index.html", "

") cid := node.IPFS("add", "-Q", "-r", filepath.Join(h.Dir, "index")).Stderr.Trimmed() @@ -367,6 +371,7 @@ func TestGateway(t *testing.T) { cfg.Addresses.Gateway = config.Strings{"/ip4/127.0.0.1/tcp/32563"} }) node.StartDaemon() + defer node.StopDaemon() b, err := os.ReadFile(filepath.Join(node.Dir, "gateway")) require.NoError(t, err) @@ -388,6 +393,7 @@ func TestGateway(t *testing.T) { assert.NoError(t, err) nodes.StartDaemons().Connect() + t.Cleanup(func() { nodes.StopDaemons() }) t.Run("not present", func(t *testing.T) { cidFoo := node2.IPFSAddStr("foo") @@ -460,6 +466,7 @@ func TestGateway(t *testing.T) { } }) node.StartDaemon() + defer node.StopDaemon() cidFoo := node.IPFSAddStr("foo") client := node.GatewayClient() @@ -509,6 +516,7 @@ func TestGateway(t *testing.T) { node := harness.NewT(t).NewNode().Init() node.StartDaemon() + defer node.StopDaemon() client := node.GatewayClient() res := client.Get("/ipfs/invalid-thing", func(r *http.Request) { @@ -526,6 +534,7 @@ func TestGateway(t *testing.T) { cfg.Gateway.DisableHTMLErrors = config.True }) node.StartDaemon() + defer node.StopDaemon() client := node.GatewayClient() res := client.Get("/ipfs/invalid-thing", func(r *http.Request) { @@ -546,6 +555,7 @@ func TestLogs(t *testing.T) { t.Setenv("GOLOG_LOG_LEVEL", "info") node := h.NewNode().Init().StartDaemon("--offline") + defer node.StopDaemon() cid := node.IPFSAddStr("Hello Worlds!") peerID, err := peer.ToCid(node.PeerID()).StringOfBase(multibase.Base36) diff --git a/test/cli/http_gateway_over_libp2p_test.go b/test/cli/http_gateway_over_libp2p_test.go index f8cfe0071..58ab0217b 100644 --- a/test/cli/http_gateway_over_libp2p_test.go +++ b/test/cli/http_gateway_over_libp2p_test.go @@ -32,6 +32,7 @@ func TestGatewayOverLibp2p(t *testing.T) { p2pProxyNode := nodes[1] nodes.StartDaemons().Connect() + defer nodes.StopDaemons() // Add data to the gateway node cidDataOnGatewayNode := cid.MustParse(gwNode.IPFSAddStr("Hello Worlds2!")) @@ -65,6 +66,7 @@ func TestGatewayOverLibp2p(t *testing.T) { // Enable the experimental feature and reconnect the nodes gwNode.IPFS("config", "--json", "Experimental.GatewayOverLibp2p", "true") gwNode.StopDaemon().StartDaemon() + t.Cleanup(func() { gwNode.StopDaemon() }) nodes.Connect() // Note: the bare HTTP requests here assume that the gateway is mounted at `/` diff --git a/test/cli/http_retrieval_client_test.go b/test/cli/http_retrieval_client_test.go index e2934fc99..32628bfce 100644 --- a/test/cli/http_retrieval_client_test.go +++ b/test/cli/http_retrieval_client_test.go @@ -75,6 +75,7 @@ func TestHTTPRetrievalClient(t *testing.T) { // Start Kubo node.StartDaemon() + defer node.StopDaemon() if debug { fmt.Printf("delegatedRoutingServer.URL: %s\n", delegatedRoutingServer.URL) diff --git a/test/cli/init_test.go b/test/cli/init_test.go index 217ec64c3..dee844608 100644 --- a/test/cli/init_test.go +++ b/test/cli/init_test.go @@ -155,6 +155,7 @@ func TestInit(t *testing.T) { t.Run("ipfs init should not run while daemon is running", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("init") assert.NotEqual(t, 0, res.ExitErr.ExitCode()) assert.Contains(t, res.Stderr.String(), "Error: ipfs daemon is running. please stop it to run this command") diff --git a/test/cli/name_test.go b/test/cli/name_test.go index a0931bfa0..cf5df2bb0 100644 --- a/test/cli/name_test.go +++ b/test/cli/name_test.go @@ -103,6 +103,7 @@ func TestName(t *testing.T) { }) node.StartDaemon() + defer node.StopDaemon() t.Run("Resolving self offline succeeds (daemon on)", func(t *testing.T) { res = node.IPFS("name", "resolve", "--offline", "/ipns/"+name.String()) @@ -147,6 +148,7 @@ func TestName(t *testing.T) { t.Run("Fails to publish in offline mode", func(t *testing.T) { t.Parallel() node := makeDaemon(t, nil).StartDaemon("--offline") + defer node.StopDaemon() res := node.RunIPFS("name", "publish", "/ipfs/"+fixtureCid) require.Error(t, res.Err) require.Equal(t, 1, res.ExitCode()) @@ -157,6 +159,7 @@ func TestName(t *testing.T) { t.Parallel() node := makeDaemon(t, nil).StartDaemon() + defer node.StopDaemon() ipnsName := ipns.NameFromPeer(node.PeerID()).String() ipnsPath := ipns.NamespacePrefix + ipnsName publishPath := "/ipfs/" + fixtureCid @@ -187,6 +190,7 @@ func TestName(t *testing.T) { t.Parallel() node := makeDaemon(t, nil).StartDaemon() + t.Cleanup(func() { node.StopDaemon() }) ipnsPath := ipns.NamespacePrefix + ipns.NameFromPeer(node.PeerID()).String() publishPath := "/ipfs/" + fixtureCid @@ -227,6 +231,7 @@ func TestName(t *testing.T) { t.Run("Inspect with verification using wrong RSA key errors", func(t *testing.T) { t.Parallel() node := makeDaemon(t, nil).StartDaemon() + defer node.StopDaemon() // Prepare RSA Key 1 res := node.IPFS("key", "gen", "--type=rsa", "--size=4096", "key1") @@ -299,6 +304,7 @@ func TestName(t *testing.T) { t.Parallel() node := makeDaemon(t, nil).StartDaemon() + defer node.StopDaemon() publishPath1 := "/ipfs/" + fixtureCid publishPath2 := "/ipfs/" + dagCid // Different content name := ipns.NameFromPeer(node.PeerID()) diff --git a/test/cli/p2p_test.go b/test/cli/p2p_test.go new file mode 100644 index 000000000..2400d7d8b --- /dev/null +++ b/test/cli/p2p_test.go @@ -0,0 +1,430 @@ +package cli + +import ( + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os/exec" + "slices" + "syscall" + "testing" + "time" + + "github.com/ipfs/kubo/core/commands" + "github.com/ipfs/kubo/test/cli/harness" + "github.com/stretchr/testify/require" +) + +// waitForListenerCount waits until the node has exactly the expected number of listeners. +func waitForListenerCount(t *testing.T, node *harness.Node, expectedCount int) { + t.Helper() + require.Eventually(t, func() bool { + lsOut := node.IPFS("p2p", "ls", "--enc=json") + var lsResult commands.P2PLsOutput + if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil { + return false + } + return len(lsResult.Listeners) == expectedCount + }, 5*time.Second, 100*time.Millisecond, "expected %d listeners", expectedCount) +} + +// waitForListenerProtocol waits until the node has a listener with the given protocol. +func waitForListenerProtocol(t *testing.T, node *harness.Node, protocol string) { + t.Helper() + require.Eventually(t, func() bool { + lsOut := node.IPFS("p2p", "ls", "--enc=json") + var lsResult commands.P2PLsOutput + if err := json.Unmarshal(lsOut.Stdout.Bytes(), &lsResult); err != nil { + return false + } + return slices.ContainsFunc(lsResult.Listeners, func(l commands.P2PListenerInfoOutput) bool { + return l.Protocol == protocol + }) + }, 5*time.Second, 100*time.Millisecond, "expected listener with protocol %s", protocol) +} + +func TestP2PForeground(t *testing.T) { + t.Parallel() + + t.Run("listen foreground creates listener and removes on interrupt", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Start foreground listener asynchronously + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/fgtest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, node, "/x/fgtest") + + // Send SIGTERM + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Wait for listener to be removed + waitForListenerCount(t, node, 0) + }) + + t.Run("listen foreground text output on SIGTERM", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/sigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + waitForListenerProtocol(t, node, "/x/sigterm") + + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Verify stdout shows "waiting for interrupt" message + stdout := res.Stdout.String() + require.Contains(t, stdout, "waiting for interrupt") + + // Note: "Received interrupt, removing listener" message is NOT visible to CLI on SIGTERM + // because the command runs in the daemon via RPC and the response stream closes before + // the message can be emitted. The important behavior is verified in the first test: + // the listener IS removed when SIGTERM is sent. + }) + + t.Run("forward foreground creates forwarder and removes on interrupt", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + forwardPort := harness.NewRandPort() + + // Start foreground forwarder asynchronously on node 0 + res := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/fgfwd", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for forwarder to be created + waitForListenerCount(t, nodes[0], 1) + + // Send SIGTERM + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Wait for forwarder to be removed + waitForListenerCount(t, nodes[0], 0) + }) + + t.Run("forward foreground text output on SIGTERM", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + forwardPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/fwdsigterm", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + waitForListenerCount(t, nodes[0], 1) + + _ = res.Cmd.Process.Signal(syscall.SIGTERM) + _ = res.Cmd.Wait() + + // Verify stdout shows "waiting for interrupt" message + stdout := res.Stdout.String() + require.Contains(t, stdout, "waiting for interrupt") + + // Note: "Received interrupt, removing forwarder" message is NOT visible to CLI on SIGTERM + // because the response stream closes before the message can be emitted. + }) + + t.Run("listen without foreground returns immediately and persists", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // This should return immediately (not block) + node.IPFS("p2p", "listen", "/x/nofg", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)) + + // Listener should still exist + waitForListenerProtocol(t, node, "/x/nofg") + + // Clean up + node.IPFS("p2p", "close", "-p", "/x/nofg") + }) + + t.Run("listen foreground text output on p2p close", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/closetest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, node, "/x/closetest") + + // Close the listener via ipfs p2p close command + node.IPFS("p2p", "close", "-p", "/x/closetest") + + // Wait for foreground command to exit (it should exit quickly after close) + done := make(chan error, 1) + go func() { + done <- res.Cmd.Wait() + }() + + select { + case <-done: + // Good - command exited + case <-time.After(5 * time.Second): + _ = res.Cmd.Process.Kill() + t.Fatal("foreground command did not exit after listener was closed via ipfs p2p close") + } + + // Wait for listener to be removed + waitForListenerCount(t, node, 0) + + // Verify text output shows BOTH messages when closed via p2p close + // (unlike SIGTERM, the stream is still open so "Received interrupt" is emitted) + out := res.Stdout.String() + require.Contains(t, out, "waiting for interrupt") + require.Contains(t, out, "Received interrupt, removing listener") + }) + + t.Run("forward foreground text output on p2p close", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + forwardPort := harness.NewRandPort() + + // Run without --enc=json to test actual text output users see + res := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/fwdclose", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[1].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for forwarder to be created + waitForListenerCount(t, nodes[0], 1) + + // Close the forwarder via ipfs p2p close command + nodes[0].IPFS("p2p", "close", "-a") + + // Wait for foreground command to exit + done := make(chan error, 1) + go func() { + done <- res.Cmd.Wait() + }() + + select { + case <-done: + // Good - command exited + case <-time.After(5 * time.Second): + _ = res.Cmd.Process.Kill() + t.Fatal("foreground command did not exit after forwarder was closed via ipfs p2p close") + } + + // Wait for forwarder to be removed + waitForListenerCount(t, nodes[0], 0) + + // Verify text output shows BOTH messages when closed via p2p close + out := res.Stdout.String() + require.Contains(t, out, "waiting for interrupt") + require.Contains(t, out, "Received interrupt, removing forwarder") + }) + + t.Run("listen foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + httpServerPort := harness.NewRandPort() + forwardPort := harness.NewRandPort() + + // Start HTTP server + expectedBody := "Hello from p2p tunnel!" + httpServer := &http.Server{ + Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort), + Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(expectedBody)) + }), + } + listener, err := net.Listen("tcp", httpServer.Addr) + require.NoError(t, err) + go func() { _ = httpServer.Serve(listener) }() + defer httpServer.Close() + + // Node 0: listen --foreground + listenRes := nodes[0].Runner.Run(harness.RunRequest{ + Path: nodes[0].IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, listenRes.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, nodes[0], "/x/httptest") + + // Node 1: forward (non-foreground) + nodes[1].IPFS("p2p", "forward", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/"+nodes[0].PeerID().String()) + + // Verify data flows through tunnel + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort)) + require.NoError(t, err) + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + require.Equal(t, expectedBody, string(body)) + + // Clean up forwarder on node 1 + nodes[1].IPFS("p2p", "close", "-a") + + // SIGTERM the listen --foreground command + _ = listenRes.Cmd.Process.Signal(syscall.SIGTERM) + _ = listenRes.Cmd.Wait() + + // Wait for listener to be removed on node 0 + waitForListenerCount(t, nodes[0], 0) + }) + + t.Run("forward foreground tunnel transfers data and cleans up on SIGTERM", func(t *testing.T) { + t.Parallel() + nodes := harness.NewT(t).NewNodes(2).Init() + nodes.ForEachPar(func(n *harness.Node) { + n.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + }) + nodes.StartDaemons().Connect() + + httpServerPort := harness.NewRandPort() + forwardPort := harness.NewRandPort() + + // Start HTTP server + expectedBody := "Hello from forward foreground tunnel!" + httpServer := &http.Server{ + Addr: fmt.Sprintf("127.0.0.1:%d", httpServerPort), + Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(expectedBody)) + }), + } + listener, err := net.Listen("tcp", httpServer.Addr) + require.NoError(t, err) + go func() { _ = httpServer.Serve(listener) }() + defer httpServer.Close() + + // Node 0: listen (non-foreground) + nodes[0].IPFS("p2p", "listen", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", httpServerPort)) + + // Node 1: forward --foreground + forwardRes := nodes[1].Runner.Run(harness.RunRequest{ + Path: nodes[1].IPFSBin, + Args: []string{"p2p", "forward", "--foreground", "/x/httptest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", forwardPort), "/p2p/" + nodes[0].PeerID().String()}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, forwardRes.Err) + + // Wait for forwarder to be created + waitForListenerCount(t, nodes[1], 1) + + // Verify data flows through tunnel + resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", forwardPort)) + require.NoError(t, err) + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + require.Equal(t, expectedBody, string(body)) + + // SIGTERM the forward --foreground command + _ = forwardRes.Cmd.Process.Signal(syscall.SIGTERM) + _ = forwardRes.Cmd.Wait() + + // Wait for forwarder to be removed on node 1 + waitForListenerCount(t, nodes[1], 0) + + // Clean up listener on node 0 + nodes[0].IPFS("p2p", "close", "-a") + }) + + t.Run("foreground command exits when daemon shuts down", func(t *testing.T) { + t.Parallel() + node := harness.NewT(t).NewNode().Init() + node.IPFS("config", "--json", "Experimental.Libp2pStreamMounting", "true") + node.StartDaemon() + + listenPort := harness.NewRandPort() + + // Start foreground listener + res := node.Runner.Run(harness.RunRequest{ + Path: node.IPFSBin, + Args: []string{"p2p", "listen", "--foreground", "/x/daemontest", fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)}, + RunFunc: (*exec.Cmd).Start, + }) + require.NoError(t, res.Err) + + // Wait for listener to be created + waitForListenerProtocol(t, node, "/x/daemontest") + + // Stop the daemon + node.StopDaemon() + + // Wait for foreground command to exit + done := make(chan error, 1) + go func() { + done <- res.Cmd.Wait() + }() + + select { + case <-done: + // Good - foreground command exited when daemon stopped + case <-time.After(5 * time.Second): + _ = res.Cmd.Process.Kill() + t.Fatal("foreground command did not exit when daemon was stopped") + } + }) +} diff --git a/test/cli/peering_test.go b/test/cli/peering_test.go index 9c6ab975d..227e83f18 100644 --- a/test/cli/peering_test.go +++ b/test/cli/peering_test.go @@ -62,6 +62,7 @@ func TestPeering(t *testing.T) { h, nodes := harness.CreatePeerNodes(t, 3, peerings) nodes.StartDaemons() + defer nodes.StopDaemons() assertPeerings(h, nodes, peerings) nodes[0].Disconnect(nodes[1]) @@ -74,6 +75,7 @@ func TestPeering(t *testing.T) { h, nodes := harness.CreatePeerNodes(t, 3, peerings) nodes.StartDaemons() + defer nodes.StopDaemons() assertPeerings(h, nodes, peerings) nodes[2].Disconnect(nodes[1]) @@ -85,6 +87,7 @@ func TestPeering(t *testing.T) { peerings := []harness.Peering{{From: 0, To: 1}, {From: 1, To: 0}, {From: 1, To: 2}} h, nodes := harness.CreatePeerNodes(t, 3, peerings) + defer nodes.StopDaemons() nodes[0].StartDaemon() nodes[1].StartDaemon() assertPeerings(h, nodes, []harness.Peering{{From: 0, To: 1}, {From: 1, To: 0}}) @@ -99,6 +102,7 @@ func TestPeering(t *testing.T) { h, nodes := harness.CreatePeerNodes(t, 3, peerings) nodes.StartDaemons() + defer nodes.StopDaemons() assertPeerings(h, nodes, peerings) nodes[2].StopDaemon() diff --git a/test/cli/pin_ls_names_test.go b/test/cli/pin_ls_names_test.go index 54532b6b2..f8ae76885 100644 --- a/test/cli/pin_ls_names_test.go +++ b/test/cli/pin_ls_names_test.go @@ -28,6 +28,9 @@ func setupTestNode(t *testing.T) *harness.Node { t.Helper() node := harness.NewT(t).NewNode().Init() node.StartDaemon("--offline") + t.Cleanup(func() { + node.StopDaemon() + }) return node } @@ -498,7 +501,6 @@ func TestPinLsEdgeCases(t *testing.T) { t.Run("invalid pin type returns error", func(t *testing.T) { t.Parallel() node := setupTestNode(t) - defer node.StopDaemon() // Try to list pins with invalid type res := node.RunIPFS("pin", "ls", "--type=invalid") @@ -510,7 +512,6 @@ func TestPinLsEdgeCases(t *testing.T) { t.Run("non-existent path returns proper error", func(t *testing.T) { t.Parallel() node := setupTestNode(t) - defer node.StopDaemon() // Try to list a non-existent CID fakeCID := "QmNonExistent123456789" @@ -521,7 +522,6 @@ func TestPinLsEdgeCases(t *testing.T) { t.Run("unpinned CID returns not pinned error", func(t *testing.T) { t.Parallel() node := setupTestNode(t) - defer node.StopDaemon() // Add content but don't pin it explicitly (it's just in blockstore) unpinnedCID := node.IPFSAddStr("unpinned content", "--pin=false") diff --git a/test/cli/ping_test.go b/test/cli/ping_test.go index 9470e67d8..85de29cf9 100644 --- a/test/cli/ping_test.go +++ b/test/cli/ping_test.go @@ -15,6 +15,7 @@ func TestPing(t *testing.T) { t.Run("other", func(t *testing.T) { t.Parallel() nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect() + defer nodes.StopDaemons() node1 := nodes[0] node2 := nodes[1] @@ -25,6 +26,7 @@ func TestPing(t *testing.T) { t.Run("ping unreachable peer", func(t *testing.T) { t.Parallel() nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect() + defer nodes.StopDaemons() node1 := nodes[0] badPeer := "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJx" @@ -37,6 +39,7 @@ func TestPing(t *testing.T) { t.Run("self", func(t *testing.T) { t.Parallel() nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons() + defer nodes.StopDaemons() node1 := nodes[0] node2 := nodes[1] @@ -52,6 +55,7 @@ func TestPing(t *testing.T) { t.Run("0", func(t *testing.T) { t.Parallel() nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect() + defer nodes.StopDaemons() node1 := nodes[0] node2 := nodes[1] @@ -63,6 +67,7 @@ func TestPing(t *testing.T) { t.Run("offline", func(t *testing.T) { t.Parallel() nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect() + defer nodes.StopDaemons() node1 := nodes[0] node2 := nodes[1] diff --git a/test/cli/pinning_remote_test.go b/test/cli/pinning_remote_test.go index fd9ae8e94..6c802aaa0 100644 --- a/test/cli/pinning_remote_test.go +++ b/test/cli/pinning_remote_test.go @@ -51,6 +51,7 @@ func TestRemotePinning(t *testing.T) { node.IPFS("config", "--json", "Pinning.RemoteServices.svc.Policies.MFS.Enable", "true") node.StartDaemon() + t.Cleanup(func() { node.StopDaemon() }) node.IPFS("files", "cp", "/ipfs/bafkqaaa", "/mfs-pinning-test-"+uuid.NewString()) node.IPFS("files", "flush") @@ -133,6 +134,8 @@ func TestRemotePinning(t *testing.T) { t.Run("pin remote service ls --stat", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() + _, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) @@ -155,6 +158,7 @@ func TestRemotePinning(t *testing.T) { t.Run("adding service with invalid URL fails", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("pin", "remote", "service", "add", "svc", "invalid-service.example.com", "key") assert.Equal(t, 1, res.ExitCode()) @@ -168,6 +172,7 @@ func TestRemotePinning(t *testing.T) { t.Run("unauthorized pinning service calls fail", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() _, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, "othertoken") @@ -180,6 +185,7 @@ func TestRemotePinning(t *testing.T) { t.Run("pinning service calls fail when there is a wrong path", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() _, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL+"/invalid-path", authToken) @@ -191,6 +197,7 @@ func TestRemotePinning(t *testing.T) { t.Run("pinning service calls fail when DNS resolution fails", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() node.IPFS("pin", "remote", "service", "add", "svc", "https://invalid-service.example.com", authToken) res := node.RunIPFS("pin", "remote", "ls", "--service=svc") @@ -201,6 +208,7 @@ func TestRemotePinning(t *testing.T) { t.Run("pin remote service rm", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() node.IPFS("pin", "remote", "service", "add", "svc", "https://example.com", authToken) node.IPFS("pin", "remote", "service", "rm", "svc") res := node.IPFS("pin", "remote", "service", "ls") @@ -225,6 +233,7 @@ func TestRemotePinning(t *testing.T) { t.Run("'ipfs pin remote add --background=true'", func(t *testing.T) { node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() svc, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) @@ -266,6 +275,7 @@ func TestRemotePinning(t *testing.T) { t.Run("'ipfs pin remote add --background=false'", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() svc, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) @@ -287,6 +297,7 @@ func TestRemotePinning(t *testing.T) { t.Run("'ipfs pin remote ls' with multiple statuses", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() svc, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) @@ -340,6 +351,7 @@ func TestRemotePinning(t *testing.T) { t.Run("'ipfs pin remote ls' by CID", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() svc, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) @@ -360,6 +372,7 @@ func TestRemotePinning(t *testing.T) { t.Run("'ipfs pin remote rm --name' without --force when multiple pins match", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() svc, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) @@ -388,6 +401,7 @@ func TestRemotePinning(t *testing.T) { t.Run("'ipfs pin remote rm --name --force' remove multiple pins", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() svc, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) @@ -408,6 +422,7 @@ func TestRemotePinning(t *testing.T) { t.Run("'ipfs pin remote rm --force' removes all pins", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() svc, svcURL := runPinningService(t, authToken) node.IPFS("pin", "remote", "service", "add", "svc", svcURL, authToken) diff --git a/test/cli/pins_test.go b/test/cli/pins_test.go index 1425a90b2..8e98aa7fe 100644 --- a/test/cli/pins_test.go +++ b/test/cli/pins_test.go @@ -26,6 +26,7 @@ func testPins(t *testing.T, args testPinsArgs) { node := harness.NewT(t).NewNode().Init() if args.runDaemon { node.StartDaemon("--offline") + defer node.StopDaemon() } strs := []string{"a", "b", "c", "d", "e", "f", "g"} @@ -127,6 +128,7 @@ func testPinsErrorReporting(t *testing.T, args testPinsArgs) { node := harness.NewT(t).NewNode().Init() if args.runDaemon { node.StartDaemon("--offline") + defer node.StopDaemon() } randomCID := "Qme8uX5n9hn15pw9p6WcVKoziyyC9LXv4LEgvsmKMULjnV" res := node.RunIPFS(StrCat("pin", "add", args.pinArg, randomCID)...) @@ -142,6 +144,7 @@ func testPinDAG(t *testing.T, args testPinsArgs) { node := h.NewNode().Init() if args.runDaemon { node.StartDaemon("--offline") + defer node.StopDaemon() } bytes := random.Bytes(1 << 20) // 1 MiB tmpFile := h.WriteToTemp(string(bytes)) @@ -168,6 +171,7 @@ func testPinProgress(t *testing.T, args testPinsArgs) { if args.runDaemon { node.StartDaemon("--offline") + defer node.StopDaemon() } bytes := random.Bytes(1 << 20) // 1 MiB diff --git a/test/cli/provider_test.go b/test/cli/provider_test.go index ccd164860..9d5e0d175 100644 --- a/test/cli/provider_test.go +++ b/test/cli/provider_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "strings" + "sync/atomic" "testing" "time" @@ -764,3 +765,81 @@ func TestProvider(t *testing.T) { }) } } + +// TestHTTPOnlyProviderWithSweepEnabled tests that provider records are correctly +// sent to HTTP routers when Routing.Type="custom" with only HTTP routers configured, +// even when Provide.DHT.SweepEnabled=true (the default since v0.39). +// +// This is a regression test for https://github.com/ipfs/kubo/issues/11089 +func TestHTTPOnlyProviderWithSweepEnabled(t *testing.T) { + t.Parallel() + + // Track provide requests received by the mock HTTP router + var provideRequests atomic.Int32 + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if (r.Method == http.MethodPut || r.Method == http.MethodPost) && + strings.HasPrefix(r.URL.Path, "/routing/v1/providers") { + provideRequests.Add(1) + w.WriteHeader(http.StatusOK) + } else if strings.HasPrefix(r.URL.Path, "/routing/v1/providers") && r.Method == http.MethodGet { + // Return empty providers for findprovs + w.Header().Set("Content-Type", "application/x-ndjson") + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer mockServer.Close() + + h := harness.NewT(t) + node := h.NewNode().Init() + + // Explicitly set SweepEnabled=true (the default since v0.39, but be explicit for test clarity) + node.SetIPFSConfig("Provide.DHT.SweepEnabled", true) + node.SetIPFSConfig("Provide.Enabled", true) + + // Configure HTTP-only custom routing (no DHT) with explicit Routing.Type=custom + routingConf := map[string]any{ + "Type": "custom", // Explicitly set Routing.Type=custom + "Methods": map[string]any{ + "provide": map[string]any{"RouterName": "HTTPRouter"}, + "get-ipns": map[string]any{"RouterName": "HTTPRouter"}, + "put-ipns": map[string]any{"RouterName": "HTTPRouter"}, + "find-peers": map[string]any{"RouterName": "HTTPRouter"}, + "find-providers": map[string]any{"RouterName": "HTTPRouter"}, + }, + "Routers": map[string]any{ + "HTTPRouter": map[string]any{ + "Type": "http", + "Parameters": map[string]any{ + "Endpoint": mockServer.URL, + }, + }, + }, + } + node.SetIPFSConfig("Routing", routingConf) + node.StartDaemon() + defer node.StopDaemon() + + // Add content and manually provide it + cid := node.IPFSAddStr(time.Now().String()) + + // Manual provide should succeed even without libp2p peers + res := node.RunIPFS("routing", "provide", cid) + // Check that the command succeeded (exit code 0) and no provide-related errors + assert.Equal(t, 0, res.ExitCode(), "routing provide should succeed with HTTP-only routing and SweepEnabled=true") + assert.NotContains(t, res.Stderr.String(), "cannot provide", "should not have provide errors") + + // Verify HTTP router received at least one provide request + assert.Greater(t, provideRequests.Load(), int32(0), + "HTTP router should have received provide requests") + + // Verify 'provide stat' works with HTTP-only routing (regression test for stats) + statRes := node.RunIPFS("provide", "stat") + assert.Equal(t, 0, statRes.ExitCode(), "provide stat should succeed with HTTP-only routing") + assert.NotContains(t, statRes.Stderr.String(), "stats not available", + "should not report stats unavailable") + // LegacyProvider outputs "TotalReprovides:" in its stats + assert.Contains(t, statRes.Stdout.String(), "TotalReprovides:", + "should show legacy provider stats") +} diff --git a/test/cli/rcmgr_test.go b/test/cli/rcmgr_test.go index 50ea26979..66e6eb6ac 100644 --- a/test/cli/rcmgr_test.go +++ b/test/cli/rcmgr_test.go @@ -26,6 +26,7 @@ func TestRcmgr(t *testing.T) { }) node.StartDaemon() + defer node.StopDaemon() t.Run("swarm resources should fail", func(t *testing.T) { res := node.RunIPFS("swarm", "resources") @@ -41,6 +42,7 @@ func TestRcmgr(t *testing.T) { cfg.Swarm.ResourceMgr.Enabled = config.False }) node.StartDaemon() + defer node.StopDaemon() t.Run("swarm resources should fail", func(t *testing.T) { res := node.RunIPFS("swarm", "resources") @@ -56,6 +58,7 @@ func TestRcmgr(t *testing.T) { cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(1000) }) node.StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("swarm", "resources", "--enc=json") require.Equal(t, 0, res.ExitCode()) @@ -73,7 +76,9 @@ func TestRcmgr(t *testing.T) { node.UpdateConfig(func(cfg *config.Config) { cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(1000) }) + node.StartDaemon() + t.Cleanup(func() { node.StopDaemon() }) t.Run("conns and streams are above 800 for default connmgr settings", func(t *testing.T) { t.Parallel() @@ -135,6 +140,7 @@ func TestRcmgr(t *testing.T) { overrides.System.ConnsInbound = rcmgr.Unlimited }) node.StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("swarm", "resources", "--enc=json") limits := unmarshalLimits(t, res.Stdout.Bytes()) @@ -150,6 +156,7 @@ func TestRcmgr(t *testing.T) { overrides.Transient.Memory = 88888 }) node.StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("swarm", "resources", "--enc=json") limits := unmarshalLimits(t, res.Stdout.Bytes()) @@ -163,6 +170,7 @@ func TestRcmgr(t *testing.T) { overrides.Service = map[string]rcmgr.ResourceLimits{"foo": {Memory: 77777}} }) node.StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("swarm", "resources", "--enc=json") limits := unmarshalLimits(t, res.Stdout.Bytes()) @@ -176,6 +184,7 @@ func TestRcmgr(t *testing.T) { overrides.Protocol = map[protocol.ID]rcmgr.ResourceLimits{"foo": {Memory: 66666}} }) node.StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("swarm", "resources", "--enc=json") limits := unmarshalLimits(t, res.Stdout.Bytes()) @@ -191,6 +200,7 @@ func TestRcmgr(t *testing.T) { overrides.Peer = map[peer.ID]rcmgr.ResourceLimits{validPeerID: {Memory: 55555}} }) node.StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("swarm", "resources", "--enc=json") limits := unmarshalLimits(t, res.Stdout.Bytes()) @@ -218,6 +228,7 @@ func TestRcmgr(t *testing.T) { }) nodes.StartDaemons() + t.Cleanup(func() { nodes.StopDaemons() }) t.Run("node 0 should fail to connect to and ping node 1", func(t *testing.T) { t.Parallel() diff --git a/test/cli/routing_dht_test.go b/test/cli/routing_dht_test.go index 27ef2b19a..b1f3907b6 100644 --- a/test/cli/routing_dht_test.go +++ b/test/cli/routing_dht_test.go @@ -57,6 +57,7 @@ func testRoutingDHT(t *testing.T, enablePubsub bool) { } nodes.StartDaemons(daemonArgs...).Connect() + t.Cleanup(func() { nodes.StopDaemons() }) t.Run("ipfs routing findpeer", func(t *testing.T) { t.Parallel() @@ -157,6 +158,7 @@ func testSelfFindDHT(t *testing.T) { }) nodes.StartDaemons() + defer nodes.StopDaemons() res := nodes[0].RunIPFS("dht", "findpeer", nodes[0].PeerID().String()) assert.Equal(t, 1, res.ExitCode()) diff --git a/test/cli/stats_test.go b/test/cli/stats_test.go index 05c1702b4..f835381e0 100644 --- a/test/cli/stats_test.go +++ b/test/cli/stats_test.go @@ -14,6 +14,7 @@ func TestStats(t *testing.T) { t.Run("stats dht", func(t *testing.T) { t.Parallel() nodes := harness.NewT(t).NewNodes(2).Init().StartDaemons().Connect() + defer nodes.StopDaemons() node1 := nodes[0] res := node1.IPFS("stats", "dht") diff --git a/test/cli/swarm_test.go b/test/cli/swarm_test.go index 88f5f403b..56c484ae1 100644 --- a/test/cli/swarm_test.go +++ b/test/cli/swarm_test.go @@ -31,6 +31,7 @@ func TestSwarm(t *testing.T) { t.Run("ipfs swarm peers returns empty peers when a node is not connected to any peers", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() res := node.RunIPFS("swarm", "peers", "--enc=json", "--identify") var output expectedOutputType err := json.Unmarshal(res.Stdout.Bytes(), &output) @@ -40,7 +41,9 @@ func TestSwarm(t *testing.T) { t.Run("ipfs swarm peers with flag identify outputs expected identify information about connected peers", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() otherNode := harness.NewT(t).NewNode().Init().StartDaemon() + defer otherNode.StopDaemon() node.Connect(otherNode) res := node.RunIPFS("swarm", "peers", "--enc=json", "--identify") @@ -67,7 +70,9 @@ func TestSwarm(t *testing.T) { t.Run("ipfs swarm peers with flag identify outputs Identify field with data that matches calling ipfs id on a peer", func(t *testing.T) { t.Parallel() node := harness.NewT(t).NewNode().Init().StartDaemon() + defer node.StopDaemon() otherNode := harness.NewT(t).NewNode().Init().StartDaemon() + defer otherNode.StopDaemon() node.Connect(otherNode) otherNodeIDResponse := otherNode.RunIPFS("id", "--enc=json") diff --git a/test/cli/tracing_test.go b/test/cli/tracing_test.go index 6f19759be..7be60fea0 100644 --- a/test/cli/tracing_test.go +++ b/test/cli/tracing_test.go @@ -76,6 +76,7 @@ func TestTracing(t *testing.T) { node.Runner.Env["OTEL_EXPORTER_OTLP_PROTOCOL"] = "grpc" node.Runner.Env["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317" node.StartDaemon() + defer node.StopDaemon() assert.Eventually(t, func() bool { diff --git a/test/cli/transports_test.go b/test/cli/transports_test.go index 43daa8ed4..e36d27287 100644 --- a/test/cli/transports_test.go +++ b/test/cli/transports_test.go @@ -74,6 +74,7 @@ func TestTransports(t *testing.T) { t.Parallel() nodes := tcpNodes(t).StartDaemons().Connect() runTests(nodes) + nodes.StopDaemons() }) t.Run("tcp with NOISE", func(t *testing.T) { @@ -86,6 +87,7 @@ func TestTransports(t *testing.T) { }) nodes.StartDaemons().Connect() runTests(nodes) + nodes.StopDaemons() }) t.Run("QUIC", func(t *testing.T) { @@ -104,6 +106,7 @@ func TestTransports(t *testing.T) { disableRouting(nodes) nodes.StartDaemons().Connect() runTests(nodes) + nodes.StopDaemons() }) t.Run("QUIC+Webtransport", func(t *testing.T) { @@ -122,6 +125,7 @@ func TestTransports(t *testing.T) { disableRouting(nodes) nodes.StartDaemons().Connect() runTests(nodes) + nodes.StopDaemons() }) t.Run("QUIC connects with non-dialable transports", func(t *testing.T) { @@ -144,6 +148,7 @@ func TestTransports(t *testing.T) { disableRouting(nodes) nodes.StartDaemons().Connect() runTests(nodes) + nodes.StopDaemons() }) t.Run("WebRTC Direct", func(t *testing.T) { @@ -162,5 +167,6 @@ func TestTransports(t *testing.T) { disableRouting(nodes) nodes.StartDaemons().Connect() runTests(nodes) + nodes.StopDaemons() }) }