p2p: rework stream/listener registration

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-05-25 15:54:39 +02:00
parent 067154f22f
commit df6540e014
6 changed files with 111 additions and 197 deletions

View File

@ -30,10 +30,8 @@ type P2PListenerInfoOutput struct {
type P2PStreamInfoOutput struct {
HandlerID string
Protocol string
LocalPeer string
LocalAddress string
RemotePeer string
RemoteAddress string
OriginAddress string
TargetAddress string
}
// P2PLsOutput is output type of ls command
@ -61,6 +59,7 @@ are refined`,
"stream": p2pStreamCmd,
"forward": p2pForwardCmd,
"close": p2pCloseCmd,
"ls": p2pLsCmd,
},
}
@ -209,6 +208,65 @@ var p2pLsCmd = &cmds.Command{
},
}
var p2pCloseCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Stop listening for new connections to forward.",
},
Options: []cmdkit.Option{
cmdkit.BoolOption("all", "a", "Close all listeners."),
cmdkit.StringOption("protocol", "p", "Match protocol name"),
cmdkit.StringOption("listen-address", "l", "Match listen address"),
cmdkit.StringOption("target-address", "t", "Match target address"),
},
Run: func(req cmds.Request, res cmds.Response) {
res.SetOutput(nil)
n, err := getNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
closeAll, _, _ := req.Option("all").Bool()
proto, p, _ := req.Option("protocol").String()
listen, l, _ := req.Option("listen-address").String()
target, t, _ := req.Option("target-address").String()
if !(closeAll || p || l || t) {
res.SetError(errors.New("no connection matching options given"), cmdkit.ErrNormal)
return
}
if closeAll && (p || l || t) {
res.SetError(errors.New("can't combine --all with other matching options"), cmdkit.ErrNormal)
return
}
match := func(listener p2p.Listener) bool {
out := true
if p {
out = out && (proto == listener.Protocol())
}
if l {
out = out && (listen == listener.ListenAddress())
}
if t {
out = out && (target == listener.TargetAddress())
}
out = out || closeAll
return out
}
for _, listener := range n.P2P.Listeners.Listeners {
if !match(listener) {
continue
}
listener.Close()
}
},
}
///////
// Listener
//
@ -222,7 +280,6 @@ var p2pStreamCmd = &cmds.Command{
Subcommands: map[string]*cmds.Command{
"ls": p2pStreamLsCmd,
"dial": p2pStreamDialCmd,
"close": p2pStreamCloseCmd,
},
}
@ -249,11 +306,8 @@ var p2pStreamLsCmd = &cmds.Command{
Protocol: s.Protocol,
LocalPeer: s.LocalPeer.Pretty(),
LocalAddress: s.LocalAddr.String(),
RemotePeer: s.RemotePeer.Pretty(),
RemoteAddress: s.RemoteAddr.String(),
OriginAddress: s.OriginAddr.String(),
TargetAddress: s.TargetAddr.String(),
})
}
@ -273,10 +327,10 @@ var p2pStreamLsCmd = &cmds.Command{
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, stream := range list.Streams {
if headers {
fmt.Fprintln(w, "Id\tProtocol\tLocal\tRemote")
fmt.Fprintln(w, "Id\tProtocol\tOrigin\tTarget")
}
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.LocalAddress, stream.RemotePeer)
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress)
}
w.Flush()
@ -285,156 +339,6 @@ var p2pStreamLsCmd = &cmds.Command{
},
}
var p2pListenerListenCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Forward p2p connections to a network multiaddr.",
ShortDescription: `
Register a p2p connection handler and forward the connections to a specified
address.
Note that the connections originate from the ipfs daemon process.
`,
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("Protocol", true, false, "Protocol identifier."),
cmdkit.StringArg("Address", true, false, "Request handling application address."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
proto := "/p2p/" + req.Arguments()[0]
if n.P2P.CheckProtoExists(proto) {
res.SetError(errors.New("protocol handler already registered"), cmdkit.ErrNormal)
return
}
addr, err := ma.NewMultiaddr(req.Arguments()[1])
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
_, err = n.P2P.ForwardRemote(n.Context(), proto, addr)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
// Successful response.
res.SetOutput(&P2PListenerInfoOutput{
Protocol: proto,
TargetAddress: addr.String(),
})
},
}
var p2pStreamDialCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Dial to a p2p listener.",
ShortDescription: `
Establish a new connection to a peer service.
When a connection is made to a peer service the ipfs daemon will setup one
time TCP listener and return it's bind port, this way a dialing application
can transparently connect to a p2p service.
`,
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("Peer", true, false, "Remote peer to connect to"),
cmdkit.StringArg("Protocol", true, false, "Protocol identifier."),
cmdkit.StringArg("BindAddress", false, false, "Address to listen for connection/s (default: /ip4/127.0.0.1/tcp/0)."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := getNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
addr, peer, err := ParsePeerParam(req.Arguments()[0])
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
if addr != nil {
n.Peerstore.AddAddr(peer, addr, pstore.TempAddrTTL)
}
proto := "/p2p/" + req.Arguments()[1]
bindAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
if len(req.Arguments()) == 3 {
bindAddr, err = ma.NewMultiaddr(req.Arguments()[2])
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
}
listenerInfo, err := n.P2P.ForwardLocal(n.Context(), peer, proto, bindAddr)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
output := P2PListenerInfoOutput{
Protocol: listenerInfo.Protocol(),
ListenAddress: listenerInfo.ListenAddress(),
}
res.SetOutput(&output)
},
}
var p2pListenerCloseCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Close active p2p listener.",
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("Protocol", false, false, "P2P listener protocol"),
},
Options: []cmdkit.Option{
cmdkit.BoolOption("all", "a", "Close all listeners."),
},
Run: func(req cmds.Request, res cmds.Response) {
res.SetOutput(nil)
n, err := getNode(req)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
closeAll, _, _ := req.Option("all").Bool()
var proto string
if !closeAll {
if len(req.Arguments()) == 0 {
res.SetError(errors.New("no protocol name specified"), cmdkit.ErrNormal)
return
}
proto = "/p2p/" + req.Arguments()[0]
}
for _, listener := range n.P2P.Listeners.Listeners {
if !closeAll && listener.Protocol() != proto {
continue
}
listener.Close()
if !closeAll {
break
}
}
},
}
var p2pStreamCloseCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Close active p2p stream.",

View File

@ -9,17 +9,31 @@ type Listener interface {
Close() error
}
// ListenerRegistry is a collection of local application proto listeners.
type ListenerRegistry struct {
Listeners map[string]Listener
type listenerKey struct {
proto string
listen string
target string
}
// Register registers listenerInfo2 in this registry
func (c *ListenerRegistry) Register(listenerInfo Listener) {
c.Listeners[listenerInfo.Protocol()] = listenerInfo
// ListenerRegistry is a collection of local application proto listeners.
type ListenerRegistry struct {
Listeners map[listenerKey]Listener
}
// Register registers listenerInfo in this registry
func (c *ListenerRegistry) Register(l Listener) {
c.Listeners[getListenerKey(l)] = l
}
// Deregister removes p2p listener from this registry
func (c *ListenerRegistry) Deregister(proto string) {
delete(c.Listeners, proto)
func (c *ListenerRegistry) Deregister(k listenerKey) {
delete(c.Listeners, k)
}
func getListenerKey(l Listener) listenerKey {
return listenerKey{
proto: l.Protocol(),
listen: l.ListenAddress(),
target: l.TargetAddress(),
}
}

View File

@ -75,29 +75,32 @@ func (l *localListener) acceptConns() {
return
}
stream := Stream{
tgt, err := ma.NewMultiaddr(l.TargetAddress())
if err != nil {
local.Close()
return
}
stream := &Stream{
Protocol: l.proto,
LocalPeer: l.id,
LocalAddr: l.listener.Multiaddr(),
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
OriginAddr: local.RemoteMultiaddr(),
TargetAddr: tgt,
Local: local,
Remote: remote,
Registry: &l.p2p.Streams,
Registry: l.p2p.Streams,
}
l.p2p.Streams.Register(&stream)
l.p2p.Streams.Register(stream)
stream.startStreaming()
}
}
func (l *localListener) Close() error {
l.listener.Close()
l.p2p.Listeners.Deregister(l.proto)
l.p2p.Listeners.Deregister(getListenerKey(l))
return nil
}

View File

@ -8,8 +8,8 @@ import (
// P2P structure holds information on currently running streams/listeners
type P2P struct {
Listeners ListenerRegistry
Streams StreamRegistry
Listeners *ListenerRegistry
Streams *StreamRegistry
identity peer.ID
peerHost p2phost.Host
@ -23,10 +23,10 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
peerHost: peerHost,
peerstore: peerstore,
Listeners: ListenerRegistry{
Listeners: map[string]Listener{},
Listeners: &ListenerRegistry{
Listeners: map[listenerKey]Listener{},
},
Streams: StreamRegistry{
Streams: &StreamRegistry{
Streams: map[uint64]*Stream{},
},
}

View File

@ -41,16 +41,13 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto string, addr ma.Multiad
stream := Stream{
Protocol: proto,
LocalPeer: p2p.identity,
LocalAddr: addr,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
OriginAddr: remote.Conn().RemoteMultiaddr(),
TargetAddr: addr,
Local: local,
Remote: remote,
Registry: &p2p.Streams,
Registry: p2p.Streams,
}
p2p.Streams.Register(&stream)
@ -74,6 +71,6 @@ func (l *remoteListener) TargetAddress() string {
func (l *remoteListener) Close() error {
l.p2p.peerHost.RemoveStreamHandler(protocol.ID(l.proto))
l.p2p.Listeners.Deregister(l.proto)
l.p2p.Listeners.Deregister(getListenerKey(l))
return nil
}

View File

@ -6,7 +6,6 @@ import (
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
net "gx/ipfs/QmYj8wdn5sZEHX2XMDWGBvcXJNdzVbaVpHmXvhHBVZepen/go-libp2p-net"
manet "gx/ipfs/QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX/go-multiaddr-net"
peer "gx/ipfs/QmcJukH2sAFjY3HdBKq35WDzWoL3UUu2gt9wdfqZTUyM74/go-libp2p-peer"
)
// Stream holds information on active incoming and outgoing p2p streams.
@ -15,11 +14,8 @@ type Stream struct {
Protocol string
LocalPeer peer.ID
LocalAddr ma.Multiaddr
RemotePeer peer.ID
RemoteAddr ma.Multiaddr
OriginAddr ma.Multiaddr
TargetAddr ma.Multiaddr
Local manet.Conn
Remote net.Stream