diff --git a/core/commands/p2p.go b/core/commands/p2p.go index 8b0b39023..6046ce453 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -30,10 +30,8 @@ type P2PListenerInfoOutput struct { type P2PStreamInfoOutput struct { HandlerID string Protocol string - LocalPeer string - LocalAddress string - RemotePeer string - RemoteAddress string + OriginAddress string + TargetAddress string } // P2PLsOutput is output type of ls command @@ -61,6 +59,7 @@ are refined`, "stream": p2pStreamCmd, "forward": p2pForwardCmd, + "close": p2pCloseCmd, "ls": p2pLsCmd, }, } @@ -209,6 +208,65 @@ var p2pLsCmd = &cmds.Command{ }, } +var p2pCloseCmd = &cmds.Command{ + Helptext: cmdkit.HelpText{ + Tagline: "Stop listening for new connections to forward.", + }, + Options: []cmdkit.Option{ + cmdkit.BoolOption("all", "a", "Close all listeners."), + cmdkit.StringOption("protocol", "p", "Match protocol name"), + cmdkit.StringOption("listen-address", "l", "Match listen address"), + cmdkit.StringOption("target-address", "t", "Match target address"), + }, + Run: func(req cmds.Request, res cmds.Response) { + res.SetOutput(nil) + + n, err := getNode(req) + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + closeAll, _, _ := req.Option("all").Bool() + proto, p, _ := req.Option("protocol").String() + listen, l, _ := req.Option("listen-address").String() + target, t, _ := req.Option("target-address").String() + + if !(closeAll || p || l || t) { + res.SetError(errors.New("no connection matching options given"), cmdkit.ErrNormal) + return + } + + if closeAll && (p || l || t) { + res.SetError(errors.New("can't combine --all with other matching options"), cmdkit.ErrNormal) + return + } + + match := func(listener p2p.Listener) bool { + out := true + if p { + out = out && (proto == listener.Protocol()) + } + if l { + out = out && (listen == listener.ListenAddress()) + } + if t { + out = out && (target == listener.TargetAddress()) + } + + out = out || closeAll + return out + } + + for _, listener := range n.P2P.Listeners.Listeners { + if !match(listener) { + continue + } + listener.Close() + } + }, +} + /////// // Listener // @@ -222,7 +280,6 @@ var p2pStreamCmd = &cmds.Command{ Subcommands: map[string]*cmds.Command{ "ls": p2pStreamLsCmd, - "dial": p2pStreamDialCmd, "close": p2pStreamCloseCmd, }, } @@ -249,11 +306,8 @@ var p2pStreamLsCmd = &cmds.Command{ Protocol: s.Protocol, - LocalPeer: s.LocalPeer.Pretty(), - LocalAddress: s.LocalAddr.String(), - - RemotePeer: s.RemotePeer.Pretty(), - RemoteAddress: s.RemoteAddr.String(), + OriginAddress: s.OriginAddr.String(), + TargetAddress: s.TargetAddr.String(), }) } @@ -273,10 +327,10 @@ var p2pStreamLsCmd = &cmds.Command{ w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0) for _, stream := range list.Streams { if headers { - fmt.Fprintln(w, "Id\tProtocol\tLocal\tRemote") + fmt.Fprintln(w, "Id\tProtocol\tOrigin\tTarget") } - fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.LocalAddress, stream.RemotePeer) + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress) } w.Flush() @@ -285,156 +339,6 @@ var p2pStreamLsCmd = &cmds.Command{ }, } -var p2pListenerListenCmd = &cmds.Command{ - Helptext: cmdkit.HelpText{ - Tagline: "Forward p2p connections to a network multiaddr.", - ShortDescription: ` -Register a p2p connection handler and forward the connections to a specified -address. - -Note that the connections originate from the ipfs daemon process. - `, - }, - Arguments: []cmdkit.Argument{ - cmdkit.StringArg("Protocol", true, false, "Protocol identifier."), - cmdkit.StringArg("Address", true, false, "Request handling application address."), - }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - proto := "/p2p/" + req.Arguments()[0] - if n.P2P.CheckProtoExists(proto) { - res.SetError(errors.New("protocol handler already registered"), cmdkit.ErrNormal) - return - } - - addr, err := ma.NewMultiaddr(req.Arguments()[1]) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - _, err = n.P2P.ForwardRemote(n.Context(), proto, addr) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - // Successful response. - res.SetOutput(&P2PListenerInfoOutput{ - Protocol: proto, - TargetAddress: addr.String(), - }) - }, -} - -var p2pStreamDialCmd = &cmds.Command{ - Helptext: cmdkit.HelpText{ - Tagline: "Dial to a p2p listener.", - - ShortDescription: ` -Establish a new connection to a peer service. - -When a connection is made to a peer service the ipfs daemon will setup one -time TCP listener and return it's bind port, this way a dialing application -can transparently connect to a p2p service. - `, - }, - Arguments: []cmdkit.Argument{ - cmdkit.StringArg("Peer", true, false, "Remote peer to connect to"), - cmdkit.StringArg("Protocol", true, false, "Protocol identifier."), - cmdkit.StringArg("BindAddress", false, false, "Address to listen for connection/s (default: /ip4/127.0.0.1/tcp/0)."), - }, - Run: func(req cmds.Request, res cmds.Response) { - n, err := getNode(req) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - addr, peer, err := ParsePeerParam(req.Arguments()[0]) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - if addr != nil { - n.Peerstore.AddAddr(peer, addr, pstore.TempAddrTTL) - } - - proto := "/p2p/" + req.Arguments()[1] - - bindAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - if len(req.Arguments()) == 3 { - bindAddr, err = ma.NewMultiaddr(req.Arguments()[2]) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - } - - listenerInfo, err := n.P2P.ForwardLocal(n.Context(), peer, proto, bindAddr) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - output := P2PListenerInfoOutput{ - Protocol: listenerInfo.Protocol(), - ListenAddress: listenerInfo.ListenAddress(), - } - - res.SetOutput(&output) - }, -} - -var p2pListenerCloseCmd = &cmds.Command{ - Helptext: cmdkit.HelpText{ - Tagline: "Close active p2p listener.", - }, - Arguments: []cmdkit.Argument{ - cmdkit.StringArg("Protocol", false, false, "P2P listener protocol"), - }, - Options: []cmdkit.Option{ - cmdkit.BoolOption("all", "a", "Close all listeners."), - }, - Run: func(req cmds.Request, res cmds.Response) { - res.SetOutput(nil) - - n, err := getNode(req) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return - } - - closeAll, _, _ := req.Option("all").Bool() - var proto string - - if !closeAll { - if len(req.Arguments()) == 0 { - res.SetError(errors.New("no protocol name specified"), cmdkit.ErrNormal) - return - } - - proto = "/p2p/" + req.Arguments()[0] - } - - for _, listener := range n.P2P.Listeners.Listeners { - if !closeAll && listener.Protocol() != proto { - continue - } - listener.Close() - if !closeAll { - break - } - } - }, -} - var p2pStreamCloseCmd = &cmds.Command{ Helptext: cmdkit.HelpText{ Tagline: "Close active p2p stream.", diff --git a/p2p/listener.go b/p2p/listener.go index b408b8b74..d51c7a317 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -9,17 +9,31 @@ type Listener interface { Close() error } -// ListenerRegistry is a collection of local application proto listeners. -type ListenerRegistry struct { - Listeners map[string]Listener +type listenerKey struct { + proto string + listen string + target string } -// Register registers listenerInfo2 in this registry -func (c *ListenerRegistry) Register(listenerInfo Listener) { - c.Listeners[listenerInfo.Protocol()] = listenerInfo +// ListenerRegistry is a collection of local application proto listeners. +type ListenerRegistry struct { + Listeners map[listenerKey]Listener +} + +// Register registers listenerInfo in this registry +func (c *ListenerRegistry) Register(l Listener) { + c.Listeners[getListenerKey(l)] = l } // Deregister removes p2p listener from this registry -func (c *ListenerRegistry) Deregister(proto string) { - delete(c.Listeners, proto) +func (c *ListenerRegistry) Deregister(k listenerKey) { + delete(c.Listeners, k) +} + +func getListenerKey(l Listener) listenerKey { + return listenerKey{ + proto: l.Protocol(), + listen: l.ListenAddress(), + target: l.TargetAddress(), + } } diff --git a/p2p/local.go b/p2p/local.go index bc899850a..acb23ba08 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -75,29 +75,32 @@ func (l *localListener) acceptConns() { return } - stream := Stream{ + tgt, err := ma.NewMultiaddr(l.TargetAddress()) + if err != nil { + local.Close() + return + } + + stream := &Stream{ Protocol: l.proto, - LocalPeer: l.id, - LocalAddr: l.listener.Multiaddr(), - - RemotePeer: remote.Conn().RemotePeer(), - RemoteAddr: remote.Conn().RemoteMultiaddr(), + OriginAddr: local.RemoteMultiaddr(), + TargetAddr: tgt, Local: local, Remote: remote, - Registry: &l.p2p.Streams, + Registry: l.p2p.Streams, } - l.p2p.Streams.Register(&stream) + l.p2p.Streams.Register(stream) stream.startStreaming() } } func (l *localListener) Close() error { l.listener.Close() - l.p2p.Listeners.Deregister(l.proto) + l.p2p.Listeners.Deregister(getListenerKey(l)) return nil } diff --git a/p2p/p2p.go b/p2p/p2p.go index 3998dd3b6..407e65864 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -8,8 +8,8 @@ import ( // P2P structure holds information on currently running streams/listeners type P2P struct { - Listeners ListenerRegistry - Streams StreamRegistry + Listeners *ListenerRegistry + Streams *StreamRegistry identity peer.ID peerHost p2phost.Host @@ -23,10 +23,10 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) peerHost: peerHost, peerstore: peerstore, - Listeners: ListenerRegistry{ - Listeners: map[string]Listener{}, + Listeners: &ListenerRegistry{ + Listeners: map[listenerKey]Listener{}, }, - Streams: StreamRegistry{ + Streams: &StreamRegistry{ Streams: map[uint64]*Stream{}, }, } diff --git a/p2p/remote.go b/p2p/remote.go index 391ebbb0d..d466690c5 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -41,16 +41,13 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad stream := Stream{ Protocol: proto, - LocalPeer: p2p.identity, - LocalAddr: addr, - - RemotePeer: remote.Conn().RemotePeer(), - RemoteAddr: remote.Conn().RemoteMultiaddr(), + OriginAddr: remote.Conn().RemoteMultiaddr(), + TargetAddr: addr, Local: local, Remote: remote, - Registry: &p2p.Streams, + Registry: p2p.Streams, } p2p.Streams.Register(&stream) @@ -74,6 +71,6 @@ func (l *remoteListener) TargetAddress() string { func (l *remoteListener) Close() error { l.p2p.peerHost.RemoveStreamHandler(protocol.ID(l.proto)) - l.p2p.Listeners.Deregister(l.proto) + l.p2p.Listeners.Deregister(getListenerKey(l)) return nil } diff --git a/p2p/stream.go b/p2p/stream.go index ed2bf5f0c..f0f0608e1 100644 --- a/p2p/stream.go +++ b/p2p/stream.go @@ -6,7 +6,6 @@ import ( ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" net "gx/ipfs/QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen/go-libp2p-net" manet "gx/ipfs/QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX/go-multiaddr-net" - peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer" ) // Stream holds information on active incoming and outgoing p2p streams. @@ -15,11 +14,8 @@ type Stream struct { Protocol string - LocalPeer peer.ID - LocalAddr ma.Multiaddr - - RemotePeer peer.ID - RemoteAddr ma.Multiaddr + OriginAddr ma.Multiaddr + TargetAddr ma.Multiaddr Local manet.Conn Remote net.Stream