From dd48b8237a402c765e8afb7360336ef194b06893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 5 Sep 2018 23:06:17 +0200 Subject: [PATCH] p2p: separate listener types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/commands/p2p.go | 34 ++++++-- p2p/local.go | 6 +- p2p/local_listener.go | 113 +++++++++++++++++++++++++++ p2p/p2p.go | 9 ++- p2p/{listener.go => p2p_listener.go} | 34 +++----- p2p/remote.go | 6 +- 6 files changed, 162 insertions(+), 40 deletions(-) create mode 100644 p2p/local_listener.go rename p2p/{listener.go => p2p_listener.go} (75%) diff --git a/core/commands/p2p.go b/core/commands/p2p.go index 8769af6cd..f8063040c 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -230,15 +230,25 @@ var p2pLsCmd = &cmds.Command{ output := &P2PLsOutput{} - n.P2P.Listeners.Lock() - for _, listener := range n.P2P.Listeners.Listeners { + n.P2P.ListenersLocal.Lock() + for _, listener := range n.P2P.ListenersLocal.Listeners { output.Listeners = append(output.Listeners, P2PListenerInfoOutput{ Protocol: string(listener.Protocol()), ListenAddress: listener.ListenAddress().String(), TargetAddress: listener.TargetAddress().String(), }) } - n.P2P.Listeners.Unlock() + n.P2P.ListenersLocal.Unlock() + + n.P2P.ListenersP2P.Lock() + for _, listener := range n.P2P.ListenersP2P.Listeners { + output.Listeners = append(output.Listeners, P2PListenerInfoOutput{ + Protocol: string(listener.Protocol()), + ListenAddress: listener.ListenAddress().String(), + TargetAddress: listener.TargetAddress().String(), + }) + } + n.P2P.ListenersP2P.Unlock() res.SetOutput(output) }, @@ -314,7 +324,7 @@ var p2pCloseCmd = &cmds.Command{ return } - match := func(listener p2p.Listener) bool { + match := func(listener p2p.ListenerLocal) bool { if closeAll { return true } @@ -330,15 +340,23 @@ var p2pCloseCmd = &cmds.Command{ return true } - todo := make([]p2p.Listener, 0) - n.P2P.Listeners.Lock() - for _, l := range n.P2P.Listeners.Listeners { + todo := make([]p2p.ListenerLocal, 0) + n.P2P.ListenersLocal.Lock() + for _, l := range n.P2P.ListenersLocal.Listeners { if !match(l) { continue } todo = append(todo, l) } - n.P2P.Listeners.Unlock() + n.P2P.ListenersLocal.Unlock() + n.P2P.ListenersP2P.Lock() + for _, l := range n.P2P.ListenersP2P.Listeners { + if !match(l) { + continue + } + todo = append(todo, l) + } + n.P2P.ListenersP2P.Unlock() var errs []string for _, l := range todo { diff --git a/p2p/local.go b/p2p/local.go index c763ebb40..70fd80d12 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -27,7 +27,7 @@ type localListener struct { } // ForwardLocal creates new P2P stream to a remote listener -func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) { +func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (ListenerLocal, error) { listener := &localListener{ ctx: ctx, @@ -39,7 +39,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I peer: peer, } - if err := p2p.Listeners.Register(listener); err != nil { + if err := p2p.ListenersLocal.Register(listener); err != nil { return nil, err } @@ -111,7 +111,7 @@ func (l *localListener) start() error { } func (l *localListener) Close() error { - ok, err := l.p2p.Listeners.Deregister(getListenerKey(l)) + ok, err := l.p2p.ListenersLocal.Deregister(l.laddr.String()) if err != nil { return err } diff --git a/p2p/local_listener.go b/p2p/local_listener.go new file mode 100644 index 000000000..88c76ce06 --- /dev/null +++ b/p2p/local_listener.go @@ -0,0 +1,113 @@ +package p2p + +import ( + "errors" + "sync" + + net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" + "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host" +) + +// ListenerLocal listens for connections and proxies them to a target +type ListenerLocal interface { + Protocol() protocol.ID + ListenAddress() ma.Multiaddr + TargetAddress() ma.Multiaddr + + start() error + + // Close closes the listener. Does not affect child streams + Close() error +} + +// ListenersLocal is a collection of local application proto listeners. +type ListenersLocal struct { + sync.RWMutex + + Listeners map[string]ListenerLocal + starting map[string]struct{} +} + +func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenersLocal { + reg := &ListenersLocal{ + Listeners: map[string]ListenerLocal{}, + starting: map[string]struct{}{}, + } + + addr, err := ma.NewMultiaddr(maPrefix + id.Pretty()) + if err != nil { + panic(err) + } + + host.SetStreamHandlerMatch("/x/", func(p string) bool { + reg.RLock() + defer reg.RUnlock() + + for _, l := range reg.Listeners { + if l.ListenAddress().Equal(addr) && string(l.Protocol()) == p { + return true + } + } + + return false + }, func(stream net.Stream) { + reg.RLock() + defer reg.RUnlock() + + for _, l := range reg.Listeners { + if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() { + go l.(*remoteListener).handleStream(stream) + return + } + } + }) + + return reg +} + +// Register registers listenerInfo into this registry and starts it +func (r *ListenersLocal) Register(l ListenerLocal) error { + r.Lock() + k := l.ListenAddress().String() + + if _, ok := r.Listeners[k]; ok { + r.Unlock() + return errors.New("listener already registered") + } + + r.Listeners[k] = l + r.starting[k] = struct{}{} + + r.Unlock() + + err := l.start() + + r.Lock() + defer r.Unlock() + + delete(r.starting, k) + + if err != nil { + delete(r.Listeners, k) + return err + } + + return nil +} + +// Deregister removes p2p listener from this registry +func (r *ListenersLocal) Deregister(k string) (bool, error) { + r.Lock() + defer r.Unlock() + + if _, ok := r.starting[k]; ok { + return false, errors.New("listener didn't start yet") + } + + _, ok := r.Listeners[k] + delete(r.Listeners, k) + return ok, nil +} diff --git a/p2p/p2p.go b/p2p/p2p.go index 647a65509..7c69d1c26 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -11,8 +11,9 @@ var log = logging.Logger("p2p-mount") // P2P structure holds information on currently running streams/listeners type P2P struct { - Listeners *ListenerRegistry - Streams *StreamRegistry + ListenersLocal *ListenersLocal + ListenersP2P *ListenersP2P + Streams *StreamRegistry identity peer.ID peerHost p2phost.Host @@ -26,7 +27,9 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) peerHost: peerHost, peerstore: peerstore, - Listeners: newListenerRegistry(identity, peerHost), + ListenersLocal: newListenerRegistry(identity, peerHost), + ListenersP2P: newListenerP2PRegistry(identity, peerHost), + Streams: &StreamRegistry{ Streams: map[uint64]*Stream{}, }, diff --git a/p2p/listener.go b/p2p/p2p_listener.go similarity index 75% rename from p2p/listener.go rename to p2p/p2p_listener.go index c33486180..7b1d96b84 100644 --- a/p2p/listener.go +++ b/p2p/p2p_listener.go @@ -12,7 +12,7 @@ import ( ) // Listener listens for connections and proxies them to a target -type Listener interface { +type P2PListener interface { Protocol() protocol.ID ListenAddress() ma.Multiaddr TargetAddress() ma.Multiaddr @@ -23,23 +23,18 @@ type Listener interface { Close() error } -type listenerKey struct { - proto string - listen string -} - // ListenerRegistry is a collection of local application proto listeners. -type ListenerRegistry struct { +type ListenersP2P struct { sync.RWMutex - Listeners map[listenerKey]Listener - starting map[listenerKey]struct{} + Listeners map[protocol.ID]ListenerLocal + starting map[protocol.ID]struct{} } -func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenerRegistry { - reg := &ListenerRegistry{ - Listeners: map[listenerKey]Listener{}, - starting: map[listenerKey]struct{}{}, +func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P { + reg := &ListenersP2P{ + Listeners: map[protocol.ID]ListenerLocal{}, + starting: map[protocol.ID]struct{}{}, } addr, err := ma.NewMultiaddr(maPrefix + id.Pretty()) @@ -74,10 +69,10 @@ func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenerRegistry { } // Register registers listenerInfo into this registry and starts it -func (r *ListenerRegistry) Register(l Listener) error { +func (r *ListenersP2P) Register(l ListenerLocal) error { r.Lock() - k := getListenerKey(l) + k := l.Protocol() if _, ok := r.Listeners[k]; ok { r.Unlock() return errors.New("listener already registered") @@ -104,7 +99,7 @@ func (r *ListenerRegistry) Register(l Listener) error { } // Deregister removes p2p listener from this registry -func (r *ListenerRegistry) Deregister(k listenerKey) (bool, error) { +func (r *ListenersP2P) Deregister(k protocol.ID) (bool, error) { r.Lock() defer r.Unlock() @@ -116,10 +111,3 @@ func (r *ListenerRegistry) Deregister(k listenerKey) (bool, error) { delete(r.Listeners, k) return ok, nil } - -func getListenerKey(l Listener) listenerKey { - return listenerKey{ - proto: string(l.Protocol()), - listen: l.ListenAddress().String(), - } -} diff --git a/p2p/remote.go b/p2p/remote.go index 65f503df7..26b401ba0 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -23,7 +23,7 @@ type remoteListener struct { } // ForwardRemote creates new p2p listener -func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (Listener, error) { +func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (P2PListener, error) { listener := &remoteListener{ p2p: p2p, @@ -31,7 +31,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu addr: addr, } - if err := p2p.Listeners.Register(listener); err != nil { + if err := p2p.ListenersP2P.Register(listener); err != nil { return nil, err } @@ -97,7 +97,7 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr { } func (l *remoteListener) Close() error { - ok, err := l.p2p.Listeners.Deregister(getListenerKey(l)) + ok, err := l.p2p.ListenersP2P.Deregister(l.proto) if err != nil { return err }