p2p: use host.SetStreamHandlerMatch for now

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-09-03 11:41:14 +02:00
parent 4badcdc340
commit 0f9d284421
3 changed files with 77 additions and 44 deletions

View File

@ -4,8 +4,11 @@ import (
"errors"
"sync"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)
// Listener listens for connections and proxies them to a target
@ -28,12 +31,46 @@ type listenerKey struct {
// ListenerRegistry is a collection of local application proto listeners.
type ListenerRegistry struct {
sync.Mutex
sync.RWMutex
Listeners map[listenerKey]Listener
starting map[listenerKey]struct{}
}
func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenerRegistry {
reg := &ListenerRegistry{
Listeners: map[listenerKey]Listener{},
starting: map[listenerKey]struct{}{},
}
addr, err := ma.NewMultiaddr(maPrefix + id.Pretty())
if err != nil {
panic(err)
}
host.SetStreamHandlerMatch("/x/", func(p string) bool {
reg.RLock()
defer reg.RUnlock()
for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && string(l.Protocol()) == p {
return true
}
}
return false
}, func(stream net.Stream) {
for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() {
l.(*remoteListener).handleStream(stream)
}
}
// panic?
})
return reg
}
// Register registers listenerInfo into this registry and starts it
func (r *ListenerRegistry) Register(l Listener) error {
r.Lock()

View File

@ -26,10 +26,7 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
peerHost: peerHost,
peerstore: peerstore,
Listeners: &ListenerRegistry{
Listeners: map[listenerKey]Listener{},
starting: map[listenerKey]struct{}{},
},
Listeners: newListenerRegistry(identity, peerHost),
Streams: &StreamRegistry{
Streams: map[uint64]*Stream{},
},

View File

@ -39,48 +39,47 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
}
func (l *remoteListener) start() error {
// TODO: handle errors when https://github.com/libp2p/go-libp2p-host/issues/16 will be done
l.p2p.peerHost.SetStreamHandler(l.proto, func(remote net.Stream) {
local, err := manet.Dial(l.addr)
if err != nil {
remote.Reset()
return
}
peer := remote.Conn().RemotePeer()
peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty())
if err != nil {
remote.Reset()
return
}
cmgr := l.p2p.peerHost.ConnManager()
cmgr.TagPeer(peer, CMGR_TAG, 20)
stream := &Stream{
Protocol: l.proto,
OriginAddr: peerMa,
TargetAddr: l.addr,
Local: local,
Remote: remote,
Registry: l.p2p.Streams,
cleanup: func() {
cmgr.UntagPeer(peer, CMGR_TAG)
},
}
l.p2p.Streams.Register(stream)
stream.startStreaming()
})
return nil
}
func (l *remoteListener) handleStream(remote net.Stream) {
local, err := manet.Dial(l.addr)
if err != nil {
remote.Reset()
return
}
peer := remote.Conn().RemotePeer()
peerMa, err := ma.NewMultiaddr(maPrefix + peer.Pretty())
if err != nil {
remote.Reset()
return
}
cmgr := l.p2p.peerHost.ConnManager()
cmgr.TagPeer(peer, CMGR_TAG, 20)
stream := &Stream{
Protocol: l.proto,
OriginAddr: peerMa,
TargetAddr: l.addr,
Local: local,
Remote: remote,
Registry: l.p2p.Streams,
cleanup: func() {
cmgr.UntagPeer(peer, CMGR_TAG)
},
}
l.p2p.Streams.Register(stream)
stream.startStreaming()
}
func (l *remoteListener) Protocol() protocol.ID {
return l.proto
}