diff --git a/core/commands/p2p.go b/core/commands/p2p.go index ca2b7d73a..6695709dc 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -324,7 +324,7 @@ var p2pCloseCmd = &cmds.Command{ return } - match := func(listener p2p.ListenerLocal) bool { + match := func(listener p2p.Listener) bool { if closeAll { return true } @@ -340,7 +340,7 @@ var p2pCloseCmd = &cmds.Command{ return true } - todo := make([]p2p.ListenerLocal, 0) + todo := make([]p2p.Listener, 0) n.P2P.ListenersLocal.Lock() for _, l := range n.P2P.ListenersLocal.Listeners { if !match(l) { @@ -389,7 +389,7 @@ var p2pCloseCmd = &cmds.Command{ } /////// -// Listener +// Stream // // p2pStreamCmd is the 'ipfs p2p stream' command diff --git a/p2p/p2p_listener.go b/p2p/listener.go similarity index 75% rename from p2p/p2p_listener.go rename to p2p/listener.go index 7f84c2635..12d9697d3 100644 --- a/p2p/p2p_listener.go +++ b/p2p/listener.go @@ -12,30 +12,36 @@ import ( ) // Listener listens for connections and proxies them to a target -type ListenerP2P interface { +type Listener interface { Protocol() protocol.ID ListenAddress() ma.Multiaddr TargetAddress() ma.Multiaddr start() error - handleStream(remote net.Stream) + key() string // Close closes the listener. Does not affect child streams Close() error } -// ListenerRegistry is a collection of local application proto listeners. -type ListenersP2P struct { +type Listeners struct { sync.RWMutex - Listeners map[protocol.ID]ListenerP2P - starting map[protocol.ID]struct{} + Listeners map[string]Listener + starting map[string]struct{} } -func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P { - reg := &ListenersP2P{ - Listeners: map[protocol.ID]ListenerP2P{}, - starting: map[protocol.ID]struct{}{}, +func newListenersLocal(id peer.ID) *Listeners { + return &Listeners{ + Listeners: map[string]Listener{}, + starting: map[string]struct{}{}, + } +} + +func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners { + reg := &Listeners{ + Listeners: map[string]Listener{}, + starting: map[string]struct{}{}, } addr, err := ma.NewMultiaddr(maPrefix + id.Pretty()) @@ -60,7 +66,7 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P { for _, l := range reg.Listeners { if l.ListenAddress().Equal(addr) && l.Protocol() == stream.Protocol() { - go l.handleStream(stream) + go l.(*remoteListener).handleStream(stream) return } } @@ -70,10 +76,10 @@ func newListenerP2PRegistry(id peer.ID, host p2phost.Host) *ListenersP2P { } // Register registers listenerInfo into this registry and starts it -func (r *ListenersP2P) Register(l ListenerP2P) error { +func (r *Listeners) Register(l Listener) error { r.Lock() + k := l.key() - k := l.Protocol() if _, ok := r.Listeners[k]; ok { r.Unlock() return errors.New("listener already registered") @@ -100,7 +106,7 @@ func (r *ListenersP2P) Register(l ListenerP2P) error { } // Deregister removes p2p listener from this registry -func (r *ListenersP2P) Deregister(k protocol.ID) (bool, error) { +func (r *Listeners) Deregister(k string) (bool, error) { r.Lock() defer r.Unlock() diff --git a/p2p/local.go b/p2p/local.go index 42740d8ef..0ec579a43 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -27,7 +27,7 @@ type localListener struct { } // ForwardLocal creates new P2P stream to a remote listener -func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (ListenerLocal, error) { +func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.ID, bindAddr ma.Multiaddr) (Listener, error) { listener := &localListener{ ctx: ctx, @@ -130,3 +130,7 @@ func (l *localListener) TargetAddress() ma.Multiaddr { } return addr } + +func (l *localListener) key() string { + return l.ListenAddress().String() +} diff --git a/p2p/local_listener.go b/p2p/local_listener.go deleted file mode 100644 index 64d75f477..000000000 --- a/p2p/local_listener.go +++ /dev/null @@ -1,83 +0,0 @@ -package p2p - -import ( - "errors" - "sync" - - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" - ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" - "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" -) - -// ListenerLocal listens for connections and proxies them to a target -type ListenerLocal interface { - Protocol() protocol.ID - ListenAddress() ma.Multiaddr - TargetAddress() ma.Multiaddr - - start() error - - // Close closes the listener. Does not affect child streams - Close() error -} - -// ListenersLocal is a collection of local application proto listeners. -type ListenersLocal struct { - sync.RWMutex - - Listeners map[string]ListenerLocal - starting map[string]struct{} -} - -func newListenerRegistry(id peer.ID) *ListenersLocal { - reg := &ListenersLocal{ - Listeners: map[string]ListenerLocal{}, - starting: map[string]struct{}{}, - } - - return reg -} - -// Register registers listenerInfo into this registry and starts it -func (r *ListenersLocal) Register(l ListenerLocal) error { - r.Lock() - k := l.ListenAddress().String() - - if _, ok := r.Listeners[k]; ok { - r.Unlock() - return errors.New("listener already registered") - } - - r.Listeners[k] = l - r.starting[k] = struct{}{} - - r.Unlock() - - err := l.start() - - r.Lock() - defer r.Unlock() - - delete(r.starting, k) - - if err != nil { - delete(r.Listeners, k) - return err - } - - return nil -} - -// Deregister removes p2p listener from this registry -func (r *ListenersLocal) Deregister(k string) (bool, error) { - r.Lock() - defer r.Unlock() - - if _, ok := r.starting[k]; ok { - return false, errors.New("listener didn't start yet") - } - - _, ok := r.Listeners[k] - delete(r.Listeners, k) - return ok, nil -} diff --git a/p2p/p2p.go b/p2p/p2p.go index eb773303e..031fb076b 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -9,10 +9,10 @@ import ( var log = logging.Logger("p2p-mount") -// P2P structure holds information on currently running streams/listeners +// P2P structure holds information on currently running streams/Listeners type P2P struct { - ListenersLocal *ListenersLocal - ListenersP2P *ListenersP2P + ListenersLocal *Listeners + ListenersP2P *Listeners Streams *StreamRegistry identity peer.ID @@ -27,8 +27,8 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) peerHost: peerHost, peerstore: peerstore, - ListenersLocal: newListenerRegistry(identity), - ListenersP2P: newListenerP2PRegistry(identity, peerHost), + ListenersLocal: newListenersLocal(identity), + ListenersP2P: newListenersP2P(identity, peerHost), Streams: &StreamRegistry{ Streams: map[uint64]*Stream{}, diff --git a/p2p/remote.go b/p2p/remote.go index 633497a0c..1b82727b1 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -23,7 +23,7 @@ type remoteListener struct { } // ForwardRemote creates new p2p listener -func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (ListenerP2P, error) { +func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Multiaddr) (Listener, error) { listener := &remoteListener{ p2p: p2p, @@ -91,12 +91,13 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr { } func (l *remoteListener) Close() error { - ok, err := l.p2p.ListenersP2P.Deregister(l.proto) + _, err := l.p2p.ListenersP2P.Deregister(string(l.proto)) if err != nil { return err } - if ok { - l.p2p.peerHost.RemoveStreamHandler(l.proto) - } return nil } + +func (l *remoteListener) key() string { + return string(l.proto) +}