mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-11 11:19:05 +08:00
* feat(p2p): add --foreground flag to listen and forward commands
adds `-f/--foreground` option that keeps the command running until
interrupted (SIGTERM/Ctrl+C) or closed via `ipfs p2p close`. the
listener/forwarder is automatically removed when the command exits.
useful for systemd services and scripts that need cleanup on exit.
* docs: add p2p-tunnels.md with systemd examples
- add dedicated docs/p2p-tunnels.md covering:
- why p2p tunnels (NAT traversal, no public IP needed)
- quick start with netcat
- background and foreground modes
- systemd integration with path-based activation
- security considerations and troubleshooting
- document Experimental.Libp2pStreamMounting in docs/config.md
- simplify docs/experimental-features.md, link to new doc
- add "Learn more" links to ipfs p2p listen/forward --help
- update changelog entry with doc link
- add cross-reference in misc/README.md
* chore: reference kubo#5460 for p2p config
Ref. https://github.com/ipfs/kubo/issues/5460
* fix(daemon): write api/gateway files only after HTTP server is ready
fixes race condition where $IPFS_PATH/api and $IPFS_PATH/gateway files
were written before the HTTP servers were ready to accept connections.
this caused issues for tools like systemd path units that immediately
try to connect when these files appear.
changes:
- add corehttp.ServeWithReady() that signals when server is ready
- wait for ready signal before writing address files
- use sync.WaitGroup.Go() (Go 1.25) for cleaner goroutine management
- add TestAddressFileReady to verify both api and gateway files
* fix(daemon): buffer errc channel and wait for all listeners
- buffer error channel with len(listeners) to prevent deadlock when
multiple servers write errors simultaneously
- wait for ALL listeners to be ready before writing api/gateway file,
not just the first one
Feedback-from: https://github.com/ipfs/kubo/pull/11099#pullrequestreview-3593885839
* docs(changelog): improve p2p tunnel section clarity
reframe to lead with user benefit and add example output
* docs(p2p): remove obsolete race condition caveat
the "First launch fails but restarts work" troubleshooting section
described a race where the api file was written before the daemon was
ready. this was fixed in 80b703a which ensures api/gateway files are
only written after HTTP servers are ready to accept connections.
---------
Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
711 lines
19 KiB
Go
711 lines
19 KiB
Go
package commands
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
core "github.com/ipfs/kubo/core"
|
|
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
|
|
p2p "github.com/ipfs/kubo/p2p"
|
|
|
|
cmds "github.com/ipfs/go-ipfs-cmds"
|
|
peer "github.com/libp2p/go-libp2p/core/peer"
|
|
pstore "github.com/libp2p/go-libp2p/core/peerstore"
|
|
protocol "github.com/libp2p/go-libp2p/core/protocol"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
madns "github.com/multiformats/go-multiaddr-dns"
|
|
)
|
|
|
|
// P2PProtoPrefix is the namespace that protocol names given to the forward
// and listen commands must start with, unless the allow-custom-protocol
// option is set.
const P2PProtoPrefix = "/x/"
|
|
|
|
// P2PListenerInfoOutput describes one listener entry in the output of the
// ls command.
type P2PListenerInfoOutput struct {
	// Protocol is the protocol name the listener is registered under.
	Protocol string
	// ListenAddress is the string form of the listener's listen address.
	ListenAddress string
	// TargetAddress is the string form of the listener's target address.
	TargetAddress string
}
|
|
|
|
// P2PStreamInfoOutput describes one stream entry in the output of the
// stream ls command.
type P2PStreamInfoOutput struct {
	// HandlerID is the stream's registry id rendered as a decimal string.
	HandlerID string
	// Protocol is the protocol name of the stream.
	Protocol string
	// OriginAddress is the string form of the stream's origin address.
	OriginAddress string
	// TargetAddress is the string form of the stream's target address.
	TargetAddress string
}
|
|
|
|
// P2PLsOutput is output type of ls command
|
|
type P2PLsOutput struct {
|
|
Listeners []P2PListenerInfoOutput
|
|
}
|
|
|
|
// P2PStreamsOutput is output type of streams command
|
|
type P2PStreamsOutput struct {
|
|
Streams []P2PStreamInfoOutput
|
|
}
|
|
|
|
// P2PForegroundOutput is the status message emitted by the forward and
// listen commands when they run in foreground mode.
type P2PForegroundOutput struct {
	// Status is either "active" or "closing".
	Status string
	// Protocol is the protocol argument exactly as given on the command line.
	Protocol string
	// Address is the listen address (forward) or target address (listen)
	// exactly as given on the command line.
	Address string
}
|
|
|
|
// Option names used by the forward and listen commands.
const (
	// allowCustomProtocolOptionName lifts the P2PProtoPrefix requirement.
	allowCustomProtocolOptionName = "allow-custom-protocol"
	// reportPeerIDOptionName is only offered by the listen command.
	reportPeerIDOptionName = "report-peer-id"
	// foregroundOptionName keeps the command blocked until it is interrupted
	// or its listener is closed.
	foregroundOptionName = "foreground"
)
|
|
|
|
// resolveTimeout bounds the multiaddr resolution performed by parseIpfsAddr.
var resolveTimeout = 10 * time.Second
|
|
|
|
// P2PCmd is the 'ipfs p2p' command
|
|
var P2PCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Libp2p stream mounting.",
|
|
ShortDescription: `
|
|
Create and use tunnels to remote peers over libp2p
|
|
|
|
Note: this command is experimental and subject to change as usecases and APIs
|
|
are refined`,
|
|
},
|
|
|
|
Subcommands: map[string]*cmds.Command{
|
|
"stream": p2pStreamCmd,
|
|
"forward": p2pForwardCmd,
|
|
"listen": p2pListenCmd,
|
|
"close": p2pCloseCmd,
|
|
"ls": p2pLsCmd,
|
|
},
|
|
}
|
|
|
|
var p2pForwardCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Forward connections to libp2p service.",
|
|
ShortDescription: `
|
|
Forward connections made to <listen-address> to <target-address> via libp2p.
|
|
|
|
Creates a local TCP listener that tunnels connections through libp2p to a
|
|
remote peer's p2p listener. Similar to SSH port forwarding (-L flag).
|
|
|
|
ARGUMENTS:
|
|
|
|
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
|
|
<listen-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
|
|
<target-address> Remote peer multiaddr (e.g., /p2p/PeerID)
|
|
|
|
FOREGROUND MODE (--foreground, -f):
|
|
|
|
By default, the forwarder runs in the daemon and the command returns
|
|
immediately. Use --foreground to block until interrupted:
|
|
|
|
- Ctrl+C or SIGTERM: Removes the forwarder and exits
|
|
- 'ipfs p2p close': Removes the forwarder and exits
|
|
- Daemon shutdown: Forwarder is automatically removed
|
|
|
|
Useful for systemd services or scripts that need cleanup on exit.
|
|
|
|
EXAMPLES:
|
|
|
|
# Persistent forwarder (command returns immediately)
|
|
ipfs p2p forward /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
|
|
|
|
# Temporary forwarder (removed when command exits)
|
|
ipfs p2p forward -f /x/myapp /ip4/127.0.0.1/tcp/3000 /p2p/PeerID
|
|
|
|
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
|
|
`,
|
|
},
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("protocol", true, false, "Protocol name."),
|
|
cmds.StringArg("listen-address", true, false, "Listening endpoint."),
|
|
cmds.StringArg("target-address", true, false, "Target endpoint."),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
|
|
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; forwarder is removed when command exits"),
|
|
},
|
|
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
|
n, err := p2pGetNode(env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
protoOpt := req.Arguments[0]
|
|
listenOpt := req.Arguments[1]
|
|
targetOpt := req.Arguments[2]
|
|
|
|
proto := protocol.ID(protoOpt)
|
|
|
|
listen, err := ma.NewMultiaddr(listenOpt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
targets, err := parseIpfsAddr(targetOpt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
|
|
|
|
if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
|
|
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
|
|
}
|
|
|
|
listener, err := forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
foreground, _ := req.Options[foregroundOptionName].(bool)
|
|
if foreground {
|
|
if err := res.Emit(&P2PForegroundOutput{
|
|
Status: "active",
|
|
Protocol: protoOpt,
|
|
Address: listenOpt,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
|
|
// or listener removal (ipfs p2p close)
|
|
select {
|
|
case <-req.Context.Done():
|
|
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
|
|
n.P2P.ListenersLocal.Close(func(l p2p.Listener) bool {
|
|
return l == listener
|
|
})
|
|
return nil
|
|
case <-listener.Done():
|
|
// Closed via "ipfs p2p close" - emit closing message
|
|
return res.Emit(&P2PForegroundOutput{
|
|
Status: "closing",
|
|
Protocol: protoOpt,
|
|
Address: listenOpt,
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
},
|
|
Type: P2PForegroundOutput{},
|
|
Encoders: cmds.EncoderMap{
|
|
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
|
|
if out.Status == "active" {
|
|
fmt.Fprintf(w, "Forwarding %s to %s, waiting for interrupt...\n", out.Protocol, out.Address)
|
|
} else if out.Status == "closing" {
|
|
fmt.Fprintf(w, "Received interrupt, removing forwarder for %s\n", out.Protocol)
|
|
}
|
|
return nil
|
|
}),
|
|
},
|
|
}
|
|
|
|
// parseIpfsAddr is a function that takes in addr string and return ipfsAddrs
|
|
func parseIpfsAddr(addr string) (*peer.AddrInfo, error) {
|
|
multiaddr, err := ma.NewMultiaddr(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pi, err := peer.AddrInfoFromP2pAddr(multiaddr)
|
|
if err == nil {
|
|
return pi, nil
|
|
}
|
|
|
|
// resolve multiaddr whose protocol is not ma.P_IPFS
|
|
ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
|
|
defer cancel()
|
|
addrs, err := madns.Resolve(ctx, multiaddr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(addrs) == 0 {
|
|
return nil, errors.New("fail to resolve the multiaddr:" + multiaddr.String())
|
|
}
|
|
var info peer.AddrInfo
|
|
for _, addr := range addrs {
|
|
taddr, id := peer.SplitAddr(addr)
|
|
if id == "" {
|
|
// not an ipfs addr, skipping.
|
|
continue
|
|
}
|
|
switch info.ID {
|
|
case "":
|
|
info.ID = id
|
|
case id:
|
|
default:
|
|
return nil, fmt.Errorf(
|
|
"ambiguous multiaddr %s could refer to %s or %s",
|
|
multiaddr,
|
|
info.ID,
|
|
id,
|
|
)
|
|
}
|
|
info.Addrs = append(info.Addrs, taddr)
|
|
}
|
|
return &info, nil
|
|
}
|
|
|
|
var p2pListenCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Create libp2p service.",
|
|
ShortDescription: `
|
|
Create a libp2p protocol handler that forwards incoming connections to
|
|
<target-address>.
|
|
|
|
When a remote peer connects using 'ipfs p2p forward', the connection is
|
|
forwarded to your local service. Similar to SSH port forwarding (server side).
|
|
|
|
ARGUMENTS:
|
|
|
|
<protocol> Protocol name (must start with '` + P2PProtoPrefix + `')
|
|
<target-address> Local multiaddr (e.g., /ip4/127.0.0.1/tcp/3000)
|
|
|
|
FOREGROUND MODE (--foreground, -f):
|
|
|
|
By default, the listener runs in the daemon and the command returns
|
|
immediately. Use --foreground to block until interrupted:
|
|
|
|
- Ctrl+C or SIGTERM: Removes the listener and exits
|
|
- 'ipfs p2p close': Removes the listener and exits
|
|
- Daemon shutdown: Listener is automatically removed
|
|
|
|
Useful for systemd services or scripts that need cleanup on exit.
|
|
|
|
EXAMPLES:
|
|
|
|
# Persistent listener (command returns immediately)
|
|
ipfs p2p listen /x/myapp /ip4/127.0.0.1/tcp/3000
|
|
|
|
# Temporary listener (removed when command exits)
|
|
ipfs p2p listen -f /x/myapp /ip4/127.0.0.1/tcp/3000
|
|
|
|
# Report connecting peer ID to the target application
|
|
ipfs p2p listen -r /x/myapp /ip4/127.0.0.1/tcp/3000
|
|
|
|
Learn more: https://github.com/ipfs/kubo/blob/master/docs/p2p-tunnels.md
|
|
`,
|
|
},
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("protocol", true, false, "Protocol name."),
|
|
cmds.StringArg("target-address", true, false, "Target endpoint."),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
|
|
cmds.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
|
|
cmds.BoolOption(foregroundOptionName, "f", "Run in foreground; listener is removed when command exits"),
|
|
},
|
|
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
|
n, err := p2pGetNode(env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
protoOpt := req.Arguments[0]
|
|
targetOpt := req.Arguments[1]
|
|
|
|
proto := protocol.ID(protoOpt)
|
|
|
|
target, err := ma.NewMultiaddr(targetOpt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// port can't be 0
|
|
if err := checkPort(target); err != nil {
|
|
return err
|
|
}
|
|
|
|
allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
|
|
reportPeerID, _ := req.Options[reportPeerIDOptionName].(bool)
|
|
|
|
if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
|
|
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
|
|
}
|
|
|
|
listener, err := n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
foreground, _ := req.Options[foregroundOptionName].(bool)
|
|
if foreground {
|
|
if err := res.Emit(&P2PForegroundOutput{
|
|
Status: "active",
|
|
Protocol: protoOpt,
|
|
Address: targetOpt,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
// Wait for either context cancellation (Ctrl+C/daemon shutdown)
|
|
// or listener removal (ipfs p2p close)
|
|
select {
|
|
case <-req.Context.Done():
|
|
// SIGTERM/Ctrl+C - cleanup silently (CLI stream already closing)
|
|
n.P2P.ListenersP2P.Close(func(l p2p.Listener) bool {
|
|
return l == listener
|
|
})
|
|
return nil
|
|
case <-listener.Done():
|
|
// Closed via "ipfs p2p close" - emit closing message
|
|
return res.Emit(&P2PForegroundOutput{
|
|
Status: "closing",
|
|
Protocol: protoOpt,
|
|
Address: targetOpt,
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
},
|
|
Type: P2PForegroundOutput{},
|
|
Encoders: cmds.EncoderMap{
|
|
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PForegroundOutput) error {
|
|
if out.Status == "active" {
|
|
fmt.Fprintf(w, "Listening on %s, forwarding to %s, waiting for interrupt...\n", out.Protocol, out.Address)
|
|
} else if out.Status == "closing" {
|
|
fmt.Fprintf(w, "Received interrupt, removing listener for %s\n", out.Protocol)
|
|
}
|
|
return nil
|
|
}),
|
|
},
|
|
}
|
|
|
|
// checkPort checks whether target multiaddr contains tcp or udp protocol
|
|
// and whether the port is equal to 0
|
|
func checkPort(target ma.Multiaddr) error {
|
|
// get tcp or udp port from multiaddr
|
|
getPort := func() (string, error) {
|
|
sport, _ := target.ValueForProtocol(ma.P_TCP)
|
|
if sport != "" {
|
|
return sport, nil
|
|
}
|
|
|
|
sport, _ = target.ValueForProtocol(ma.P_UDP)
|
|
if sport != "" {
|
|
return sport, nil
|
|
}
|
|
return "", errors.New("address does not contain tcp or udp protocol")
|
|
}
|
|
|
|
sport, err := getPort()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
port, err := strconv.Atoi(sport)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if port == 0 {
|
|
return errors.New("port can not be 0")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// forwardLocal forwards local connections to a libp2p service
|
|
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addr *peer.AddrInfo) (p2p.Listener, error) {
|
|
ps.AddAddrs(addr.ID, addr.Addrs, pstore.TempAddrTTL)
|
|
return p.ForwardLocal(ctx, addr.ID, proto, bindAddr)
|
|
}
|
|
|
|
// p2pHeadersOptionName is the option that turns on table headers in the
// text output of the ls and stream ls commands.
const p2pHeadersOptionName = "headers"
|
|
|
|
var p2pLsCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "List active p2p listeners.",
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption(p2pHeadersOptionName, "v", "Print table headers (Protocol, Listen, Target)."),
|
|
},
|
|
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
|
n, err := p2pGetNode(env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
output := &P2PLsOutput{}
|
|
|
|
n.P2P.ListenersLocal.Lock()
|
|
for _, listener := range n.P2P.ListenersLocal.Listeners {
|
|
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
|
|
Protocol: string(listener.Protocol()),
|
|
ListenAddress: listener.ListenAddress().String(),
|
|
TargetAddress: listener.TargetAddress().String(),
|
|
})
|
|
}
|
|
n.P2P.ListenersLocal.Unlock()
|
|
|
|
n.P2P.ListenersP2P.Lock()
|
|
for _, listener := range n.P2P.ListenersP2P.Listeners {
|
|
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
|
|
Protocol: string(listener.Protocol()),
|
|
ListenAddress: listener.ListenAddress().String(),
|
|
TargetAddress: listener.TargetAddress().String(),
|
|
})
|
|
}
|
|
n.P2P.ListenersP2P.Unlock()
|
|
|
|
return cmds.EmitOnce(res, output)
|
|
},
|
|
Type: P2PLsOutput{},
|
|
Encoders: cmds.EncoderMap{
|
|
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PLsOutput) error {
|
|
headers, _ := req.Options[p2pHeadersOptionName].(bool)
|
|
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
|
|
for _, listener := range out.Listeners {
|
|
if headers {
|
|
fmt.Fprintln(tw, "Protocol\tListen Address\tTarget Address")
|
|
}
|
|
|
|
fmt.Fprintf(tw, "%s\t%s\t%s\n", listener.Protocol, listener.ListenAddress, listener.TargetAddress)
|
|
}
|
|
tw.Flush()
|
|
|
|
return nil
|
|
}),
|
|
},
|
|
}
|
|
|
|
// Option names used by the close command. p2pAllOptionName is also used by
// the stream close command.
const (
	// p2pAllOptionName selects every entry.
	p2pAllOptionName = "all"
	// The remaining options each narrow the match to one listener property.
	p2pProtocolOptionName      = "protocol"
	p2pListenAddressOptionName = "listen-address"
	p2pTargetAddressOptionName = "target-address"
)
|
|
|
|
var p2pCloseCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Stop listening for new connections to forward.",
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption(p2pAllOptionName, "a", "Close all listeners."),
|
|
cmds.StringOption(p2pProtocolOptionName, "p", "Match protocol name"),
|
|
cmds.StringOption(p2pListenAddressOptionName, "l", "Match listen address"),
|
|
cmds.StringOption(p2pTargetAddressOptionName, "t", "Match target address"),
|
|
},
|
|
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
|
n, err := p2pGetNode(env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
closeAll, _ := req.Options[p2pAllOptionName].(bool)
|
|
protoOpt, p := req.Options[p2pProtocolOptionName].(string)
|
|
listenOpt, l := req.Options[p2pListenAddressOptionName].(string)
|
|
targetOpt, t := req.Options[p2pTargetAddressOptionName].(string)
|
|
|
|
proto := protocol.ID(protoOpt)
|
|
|
|
var target, listen ma.Multiaddr
|
|
|
|
if l {
|
|
listen, err = ma.NewMultiaddr(listenOpt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if t {
|
|
target, err = ma.NewMultiaddr(targetOpt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !(closeAll || p || l || t) {
|
|
return errors.New("no matching options given")
|
|
}
|
|
|
|
if closeAll && (p || l || t) {
|
|
return errors.New("can't combine --all with other matching options")
|
|
}
|
|
|
|
match := func(listener p2p.Listener) bool {
|
|
if closeAll {
|
|
return true
|
|
}
|
|
if p && proto != listener.Protocol() {
|
|
return false
|
|
}
|
|
if l && !listen.Equal(listener.ListenAddress()) {
|
|
return false
|
|
}
|
|
if t && !target.Equal(listener.TargetAddress()) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
done := n.P2P.ListenersLocal.Close(match)
|
|
done += n.P2P.ListenersP2P.Close(match)
|
|
|
|
return cmds.EmitOnce(res, done)
|
|
},
|
|
Type: int(0),
|
|
Encoders: cmds.EncoderMap{
|
|
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out int) error {
|
|
fmt.Fprintf(w, "Closed %d stream(s)\n", out)
|
|
return nil
|
|
}),
|
|
},
|
|
}
|
|
|
|
///////
|
|
// Stream
|
|
//
|
|
|
|
// p2pStreamCmd is the 'ipfs p2p stream' command
|
|
var p2pStreamCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "P2P stream management.",
|
|
ShortDescription: "Create and manage p2p streams",
|
|
},
|
|
|
|
Subcommands: map[string]*cmds.Command{
|
|
"ls": p2pStreamLsCmd,
|
|
"close": p2pStreamCloseCmd,
|
|
},
|
|
}
|
|
|
|
var p2pStreamLsCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "List active p2p streams.",
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption(p2pHeadersOptionName, "v", "Print table headers (ID, Protocol, Local, Remote)."),
|
|
},
|
|
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
|
n, err := p2pGetNode(env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
output := &P2PStreamsOutput{}
|
|
|
|
n.P2P.Streams.Lock()
|
|
for id, s := range n.P2P.Streams.Streams {
|
|
output.Streams = append(output.Streams, P2PStreamInfoOutput{
|
|
HandlerID: strconv.FormatUint(id, 10),
|
|
|
|
Protocol: string(s.Protocol),
|
|
|
|
OriginAddress: s.OriginAddr.String(),
|
|
TargetAddress: s.TargetAddr.String(),
|
|
})
|
|
}
|
|
n.P2P.Streams.Unlock()
|
|
|
|
return cmds.EmitOnce(res, output)
|
|
},
|
|
Type: P2PStreamsOutput{},
|
|
Encoders: cmds.EncoderMap{
|
|
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PStreamsOutput) error {
|
|
headers, _ := req.Options[p2pHeadersOptionName].(bool)
|
|
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
|
|
for _, stream := range out.Streams {
|
|
if headers {
|
|
fmt.Fprintln(tw, "ID\tProtocol\tOrigin\tTarget")
|
|
}
|
|
|
|
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress)
|
|
}
|
|
tw.Flush()
|
|
|
|
return nil
|
|
}),
|
|
},
|
|
}
|
|
|
|
var p2pStreamCloseCmd = &cmds.Command{
|
|
Status: cmds.Experimental,
|
|
Helptext: cmds.HelpText{
|
|
Tagline: "Close active p2p stream.",
|
|
},
|
|
Arguments: []cmds.Argument{
|
|
cmds.StringArg("id", false, false, "Stream identifier"),
|
|
},
|
|
Options: []cmds.Option{
|
|
cmds.BoolOption(p2pAllOptionName, "a", "Close all streams."),
|
|
},
|
|
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
|
|
n, err := p2pGetNode(env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
closeAll, _ := req.Options[p2pAllOptionName].(bool)
|
|
var handlerID uint64
|
|
|
|
if !closeAll {
|
|
if len(req.Arguments) == 0 {
|
|
return errors.New("no id specified")
|
|
}
|
|
|
|
handlerID, err = strconv.ParseUint(req.Arguments[0], 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
toClose := make([]*p2p.Stream, 0, 1)
|
|
n.P2P.Streams.Lock()
|
|
for id, stream := range n.P2P.Streams.Streams {
|
|
if !closeAll && handlerID != id {
|
|
continue
|
|
}
|
|
toClose = append(toClose, stream)
|
|
if !closeAll {
|
|
break
|
|
}
|
|
}
|
|
n.P2P.Streams.Unlock()
|
|
|
|
for _, s := range toClose {
|
|
n.P2P.Streams.Reset(s)
|
|
}
|
|
|
|
return nil
|
|
},
|
|
}
|
|
|
|
func p2pGetNode(env cmds.Environment) (*core.IpfsNode, error) {
|
|
nd, err := cmdenv.GetNode(env)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
config, err := nd.Repo.Config()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !config.Experimental.Libp2pStreamMounting {
|
|
return nil, errors.New("libp2p stream mounting not enabled")
|
|
}
|
|
|
|
if !nd.IsOnline {
|
|
return nil, ErrNotOnline
|
|
}
|
|
|
|
return nd, nil
|
|
}
|