p2p: separate listener types

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-09-05 23:06:17 +02:00
parent a5e5b5be66
commit dd48b8237a
6 changed files with 162 additions and 40 deletions

View File

@ -230,15 +230,25 @@ var p2pLsCmd = &cmds.Command{
output := &P2PLsOutput{}
n.P2P.Listeners.Lock()
for _, listener := range n.P2P.Listeners.Listeners {
n.P2P.ListenersLocal.Lock()
for _, listener := range n.P2P.ListenersLocal.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: string(listener.Protocol()),
ListenAddress: listener.ListenAddress().String(),
TargetAddress: listener.TargetAddress().String(),
})
}
n.P2P.Listeners.Unlock()
n.P2P.ListenersLocal.Unlock()
n.P2P.ListenersP2P.Lock()
for _, listener := range n.P2P.ListenersP2P.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: string(listener.Protocol()),
ListenAddress: listener.ListenAddress().String(),
TargetAddress: listener.TargetAddress().String(),
})
}
n.P2P.ListenersP2P.Unlock()
res.SetOutput(output)
},
@ -314,7 +324,7 @@ var p2pCloseCmd = &cmds.Command{
return
}
match := func(listener p2p.Listener) bool {
match := func(listener p2p.ListenerLocal) bool {
if closeAll {
return true
}
@ -330,15 +340,23 @@ var p2pCloseCmd = &cmds.Command{
return true
}
todo := make([]p2p.Listener, 0)
n.P2P.Listeners.Lock()
for _, l := range n.P2P.Listeners.Listeners {
todo := make([]p2p.ListenerLocal, 0)
n.P2P.ListenersLocal.Lock()
for _, l := range n.P2P.ListenersLocal.Listeners {
if !match(l) {
continue
}
todo = append(todo, l)
}
n.P2P.Listeners.Unlock()
n.P2P.ListenersLocal.Unlock()
n.P2P.ListenersP2P.Lock()
for _, l := range n.P2P.ListenersP2P.Listeners {
if !match(l) {
continue
}
todo = append(todo, l)
}
n.P2P.ListenersP2P.Unlock()
var errs []string
for _, l := range todo {

View File

@ -27,7 +27,7 @@ type localListener struct {
}
// ForwardLocal creates new P2P stream to a remote listener
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) {
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (ListenerLocal, error) {
listener := &localListener{
ctx: ctx,
@ -39,7 +39,7 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I
peer: peer,
}
if err := p2p.Listeners.Register(listener); err != nil {
if err := p2p.ListenersLocal.Register(listener); err != nil {
return nil, err
}
@ -111,7 +111,7 @@ func (l *localListener) start() error {
}
func (l *localListener) Close() error {
ok, err := l.p2p.Listeners.Deregister(getListenerKey(l))
ok, err := l.p2p.ListenersLocal.Deregister(l.laddr.String())
if err != nil {
return err
}

113
p2p/local_listener.go Normal file
View File

@ -0,0 +1,113 @@
package p2p
import (
"errors"
"sync"
net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host"
)
// ListenerLocal listens for connections and proxies them to a target
type ListenerLocal interface {
Protocol() protocol.ID
ListenAddress() ma.Multiaddr
TargetAddress() ma.Multiaddr
start() error
// Close closes the listener. Does not affect child streams
Close() error
}
// ListenersLocal is a collection of local application proto listeners.
type ListenersLocal struct {
sync.RWMutex
Listeners map[string]ListenerLocal
starting map[string]struct{}
}
func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenersLocal {
reg := &ListenersLocal{
Listeners: map[string]ListenerLocal{},
starting: map[string]struct{}{},
}
addr, err := ma.NewMultiaddr(maPrefix + id.Pretty())
if err != nil {
panic(err)
}
host.SetStreamHandlerMatch("/x/", func(p string) bool {
reg.RLock()
defer reg.RUnlock()
for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && string(l.Protocol()) == p {
return true
}
}
return false
}, func(stream net.Stream) {
reg.RLock()
defer reg.RUnlock()
for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() {
go l.(*remoteListener).handleStream(stream)
return
}
}
})
return reg
}
// Register registers listenerInfo into this registry and starts it
func (r *ListenersLocal) Register(l ListenerLocal) error {
r.Lock()
k := l.ListenAddress().String()
if _, ok := r.Listeners[k]; ok {
r.Unlock()
return errors.New("listener already registered")
}
r.Listeners[k] = l
r.starting[k] = struct{}{}
r.Unlock()
err := l.start()
r.Lock()
defer r.Unlock()
delete(r.starting, k)
if err != nil {
delete(r.Listeners, k)
return err
}
return nil
}
// Deregister removes p2p listener from this registry
func (r *ListenersLocal) Deregister(k string) (bool, error) {
r.Lock()
defer r.Unlock()
if _, ok := r.starting[k]; ok {
return false, errors.New("listener didn't start yet")
}
_, ok := r.Listeners[k]
delete(r.Listeners, k)
return ok, nil
}

View File

@ -11,8 +11,9 @@ var log = logging.Logger("p2p-mount")
// P2P structure holds information on currently running streams/listeners
type P2P struct {
Listeners *ListenerRegistry
Streams *StreamRegistry
ListenersLocal *ListenersLocal
ListenersP2P *ListenersP2P
Streams *StreamRegistry
identity peer.ID
peerHost p2phost.Host
@ -26,7 +27,9 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
peerHost: peerHost,
peerstore: peerstore,
Listeners: newListenerRegistry(identity, peerHost),
ListenersLocal: newListenerRegistry(identity, peerHost),
ListenersP2P: newListenerP2PRegistry(identity, peerHost),
Streams: &StreamRegistry{
Streams: map[uint64]*Stream{},
},

View File

@ -12,7 +12,7 @@ import (
)
// Listener listens for connections and proxies them to a target
type Listener interface {
type P2PListener interface {
Protocol() protocol.ID
ListenAddress() ma.Multiaddr
TargetAddress() ma.Multiaddr
@ -23,23 +23,18 @@ type Listener interface {
Close() error
}
type listenerKey struct {
proto string
listen string
}
// ListenerRegistry is a collection of local application proto listeners.
type ListenerRegistry struct {
type ListenersP2P struct {
sync.RWMutex
Listeners map[listenerKey]Listener
starting map[listenerKey]struct{}
Listeners map[protocol.ID]ListenerLocal
starting map[protocol.ID]struct{}
}
func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenerRegistry {
reg := &ListenerRegistry{
Listeners: map[listenerKey]Listener{},
starting: map[listenerKey]struct{}{},
func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P {
reg := &ListenersP2P{
Listeners: map[protocol.ID]ListenerLocal{},
starting: map[protocol.ID]struct{}{},
}
addr, err := ma.NewMultiaddr(maPrefix + id.Pretty())
@ -74,10 +69,10 @@ func newListenerRegistry(id peer.ID, host p2phost.Host) *ListenerRegistry {
}
// Register registers listenerInfo into this registry and starts it
func (r *ListenerRegistry) Register(l Listener) error {
func (r *ListenersP2P) Register(l ListenerLocal) error {
r.Lock()
k := getListenerKey(l)
k := l.Protocol()
if _, ok := r.Listeners[k]; ok {
r.Unlock()
return errors.New("listener already registered")
@ -104,7 +99,7 @@ func (r *ListenerRegistry) Register(l Listener) error {
}
// Deregister removes p2p listener from this registry
func (r *ListenerRegistry) Deregister(k listenerKey) (bool, error) {
func (r *ListenersP2P) Deregister(k protocol.ID) (bool, error) {
r.Lock()
defer r.Unlock()
@ -116,10 +111,3 @@ func (r *ListenerRegistry) Deregister(k listenerKey) (bool, error) {
delete(r.Listeners, k)
return ok, nil
}
func getListenerKey(l Listener) listenerKey {
return listenerKey{
proto: string(l.Protocol()),
listen: l.ListenAddress().String(),
}
}

View File

@ -23,7 +23,7 @@ type remoteListener struct {
}
// ForwardRemote creates new p2p listener
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (Listener, error) {
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (P2PListener, error) {
listener := &remoteListener{
p2p: p2p,
@ -31,7 +31,7 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
addr: addr,
}
if err := p2p.Listeners.Register(listener); err != nil {
if err := p2p.ListenersP2P.Register(listener); err != nil {
return nil, err
}
@ -97,7 +97,7 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr {
}
func (l *remoteListener) Close() error {
ok, err := l.p2p.Listeners.Deregister(getListenerKey(l))
ok, err := l.p2p.ListenersP2P.Deregister(l.proto)
if err != nil {
return err
}