From 0f9d28442159e4d4b6645708d1a0fc36f47e7a0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 3 Sep 2018 11:41:14 +0200 Subject: [PATCH] p2p: use host.SetStreamHandlerMatch for now MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- p2p/listener.go | 39 ++++++++++++++++++++++++- p2p/p2p.go | 5 +--- p2p/remote.go | 77 ++++++++++++++++++++++++------------------------- 3 files changed, 77 insertions(+), 44 deletions(-) diff --git a/p2p/listener.go b/p2p/listener.go index c78a6d615..13e70207a 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -4,8 +4,11 @@ import ( "errors" "sync" + net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host" + peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer" ) // Listener listens for connections and proxies them to a target @@ -28,12 +31,46 @@ type listenerKey struct { // ListenerRegistry is a collection of local application proto listeners. type ListenerRegistry struct { - sync.Mutex + sync.RWMutex Listeners map[listenerKey]Listener starting map[listenerKey]struct{} } +func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenerRegistry { + reg := &ListenerRegistry{ + Listeners: map[listenerKey]Listener{}, + starting: map[listenerKey]struct{}{}, + } + + addr, err := ma.NewMultiaddr(maPrefix + id.Pretty()) + if err != nil { + panic(err) + } + + host.SetStreamHandlerMatch("/x/", func(p string) bool { + reg.RLock() + defer reg.RUnlock() + for _, l := range reg.Listeners { + if l.ListenAddress().Equal(addr) && string(l.Protocol()) == p { + return true + } + } + + return false + }, func(stream net.Stream) { + for _, l := range reg.Listeners { + if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() { + l.(*remoteListener).handleStream(stream) + } + } + + // panic? + }) + + return reg +} + // Register registers listenerInfo into this registry and starts it func (r *ListenerRegistry) Register(l Listener) error { r.Lock() diff --git a/p2p/p2p.go b/p2p/p2p.go index 737bf1434..a7c0b294e 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -26,10 +26,7 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) peerHost: peerHost, peerstore: peerstore, - Listeners: &ListenerRegistry{ - Listeners: map[listenerKey]Listener{}, - starting: map[listenerKey]struct{}{}, - }, + Listeners: newListenerRegistry(identity, peerHost), Streams: &StreamRegistry{ Streams: map[uint64]*Stream{}, }, diff --git a/p2p/remote.go b/p2p/remote.go index 53c25ddf4..d713b22eb 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -39,48 +39,47 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu } func (l *remoteListener) start() error { - // TODO: handle errors when https://github.com/libp2p/go-libp2p-host/issues/16 will be done - l.p2p.peerHost.SetStreamHandler(l.proto, func(remote net.Stream) { - local, err := manet.Dial(l.addr) - if err != nil { - remote.Reset() - return - } - - peer := remote.Conn().RemotePeer() - - peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty()) - if err != nil { - remote.Reset() - return - } - - cmgr := l.p2p.peerHost.ConnManager() - cmgr.TagPeer(peer, CMGR_TAG, 20) - - stream := &Stream{ - Protocol: l.proto, - - OriginAddr: peerMa, - TargetAddr: l.addr, - - Local: local, - Remote: remote, - - Registry: l.p2p.Streams, - - cleanup: func() { - cmgr.UntagPeer(peer, CMGR_TAG) - }, - } - - l.p2p.Streams.Register(stream) - stream.startStreaming() - }) - return nil } +func (l *remoteListener) handleStream(remote net.Stream) { + local, err := manet.Dial(l.addr) + if err != nil { + remote.Reset() + return + } + + peer := remote.Conn().RemotePeer() + + peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty()) + if err != nil { + remote.Reset() + return + } + + cmgr := l.p2p.peerHost.ConnManager() + cmgr.TagPeer(peer, CMGR_TAG, 20) + + stream := &Stream{ + Protocol: l.proto, + + OriginAddr: peerMa, + TargetAddr: l.addr, + + Local: local, + Remote: remote, + + Registry: l.p2p.Streams, + + cleanup: func() { + cmgr.UntagPeer(peer, CMGR_TAG) + }, + } + + l.p2p.Streams.Register(stream) + stream.startStreaming() +} + func (l *remoteListener) Protocol() protocol.ID { return l.proto }