p2p: deduplicate some listeners logic

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-09-06 11:07:28 +02:00
parent c5090508a8
commit 228a71aef2
6 changed files with 39 additions and 111 deletions

View File

@ -324,7 +324,7 @@ var p2pCloseCmd = &cmds.Command{
return
}
match := func(listener p2p.ListenerLocal) bool {
match := func(listener p2p.Listener) bool {
if closeAll {
return true
}
@ -340,7 +340,7 @@ var p2pCloseCmd = &cmds.Command{
return true
}
todo := make([]p2p.ListenerLocal, 0)
todo := make([]p2p.Listener, 0)
n.P2P.ListenersLocal.Lock()
for _, l := range n.P2P.ListenersLocal.Listeners {
if !match(l) {
@ -389,7 +389,7 @@ var p2pCloseCmd = &cmds.Command{
}
///////
// Listener
// Stream
//
// p2pStreamCmd is the 'ipfs p2p stream' command

View File

@ -12,30 +12,36 @@ import (
)
// Listener listens for connections and proxies them to a target
type ListenerP2P interface {
type Listener interface {
Protocol() protocol.ID
ListenAddress() ma.Multiaddr
TargetAddress() ma.Multiaddr
start() error
handleStream(remote net.Stream)
key() string
// Close closes the listener. Does not affect child streams
Close() error
}
// ListenerRegistry is a collection of local application proto listeners.
type ListenersP2P struct {
type Listeners struct {
sync.RWMutex
Listeners map[protocol.ID]ListenerP2P
starting map[protocol.ID]struct{}
Listeners map[string]Listener
starting map[string]struct{}
}
func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P {
reg := &ListenersP2P{
Listeners: map[protocol.ID]ListenerP2P{},
starting: map[protocol.ID]struct{}{},
func newListenersLocal(id peer.ID) *Listeners {
return &Listeners{
Listeners: map[string]Listener{},
starting: map[string]struct{}{},
}
}
func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {
reg := &Listeners{
Listeners: map[string]Listener{},
starting: map[string]struct{}{},
}
addr, err := ma.NewMultiaddr(maPrefix + id.Pretty())
@ -60,7 +66,7 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P {
for _, l := range reg.Listeners {
if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() {
go l.handleStream(stream)
go l.(*remoteListener).handleStream(stream)
return
}
}
@ -70,10 +76,10 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P {
}
// Register registers listenerInfo into this registry and starts it
func (r *ListenersP2P) Register(l ListenerP2P) error {
func (r *Listeners) Register(l Listener) error {
r.Lock()
k := l.key()
k := l.Protocol()
if _, ok := r.Listeners[k]; ok {
r.Unlock()
return errors.New("listener already registered")
@ -100,7 +106,7 @@ func (r *ListenersP2P) Register(l ListenerP2P) error {
}
// Deregister removes p2p listener from this registry
func (r *ListenersP2P) Deregister(k protocol.ID) (bool, error) {
func (r *Listeners) Deregister(k string) (bool, error) {
r.Lock()
defer r.Unlock()

View File

@ -27,7 +27,7 @@ type localListener struct {
}
// ForwardLocal creates new P2P stream to a remote listener
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (ListenerLocal, error) {
func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) {
listener := &localListener{
ctx: ctx,
@ -130,3 +130,7 @@ func (l *localListener) TargetAddress() ma.Multiaddr {
}
return addr
}
func (l *localListener) key() string {
return l.ListenAddress().String()
}

View File

@ -1,83 +0,0 @@
package p2p
import (
"errors"
"sync"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
)
// ListenerLocal listens for connections and proxies them to a target
type ListenerLocal interface {
Protocol() protocol.ID
ListenAddress() ma.Multiaddr
TargetAddress() ma.Multiaddr
start() error
// Close closes the listener. Does not affect child streams
Close() error
}
// ListenersLocal is a collection of local application proto listeners.
type ListenersLocal struct {
sync.RWMutex
Listeners map[string]ListenerLocal
starting map[string]struct{}
}
func newListenerRegistry(id peer.ID) *ListenersLocal {
reg := &ListenersLocal{
Listeners: map[string]ListenerLocal{},
starting: map[string]struct{}{},
}
return reg
}
// Register registers listenerInfo into this registry and starts it
func (r *ListenersLocal) Register(l ListenerLocal) error {
r.Lock()
k := l.ListenAddress().String()
if _, ok := r.Listeners[k]; ok {
r.Unlock()
return errors.New("listener already registered")
}
r.Listeners[k] = l
r.starting[k] = struct{}{}
r.Unlock()
err := l.start()
r.Lock()
defer r.Unlock()
delete(r.starting, k)
if err != nil {
delete(r.Listeners, k)
return err
}
return nil
}
// Deregister removes p2p listener from this registry
func (r *ListenersLocal) Deregister(k string) (bool, error) {
r.Lock()
defer r.Unlock()
if _, ok := r.starting[k]; ok {
return false, errors.New("listener didn't start yet")
}
_, ok := r.Listeners[k]
delete(r.Listeners, k)
return ok, nil
}

View File

@ -9,10 +9,10 @@ import (
var log = logging.Logger("p2p-mount")
// P2P structure holds information on currently running streams/listeners
// P2P structure holds information on currently running streams/Listeners
type P2P struct {
ListenersLocal *ListenersLocal
ListenersP2P *ListenersP2P
ListenersLocal *Listeners
ListenersP2P *Listeners
Streams *StreamRegistry
identity peer.ID
@ -27,8 +27,8 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
peerHost: peerHost,
peerstore: peerstore,
ListenersLocal: newListenerRegistry(identity),
ListenersP2P: newListenerP2PRegistry(identity, peerHost),
ListenersLocal: newListenersLocal(identity),
ListenersP2P: newListenersP2P(identity, peerHost),
Streams: &StreamRegistry{
Streams: map[uint64]*Stream{},

View File

@ -23,7 +23,7 @@ type remoteListener struct {
}
// ForwardRemote creates new p2p listener
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (ListenerP2P, error) {
func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (Listener, error) {
listener := &remoteListener{
p2p: p2p,
@ -91,12 +91,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr {
}
func (l *remoteListener) Close() error {
ok, err := l.p2p.ListenersP2P.Deregister(l.proto)
_, err := l.p2p.ListenersP2P.Deregister(string(l.proto))
if err != nil {
return err
}
if ok {
l.p2p.peerHost.RemoveStreamHandler(l.proto)
}
return nil
}
func (l *remoteListener) key() string {
return string(l.proto)
}