diff --git a/p2p/listener.go b/p2p/listener.go index aad3aab53..c78a6d615 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -31,26 +31,33 @@ type ListenerRegistry struct { sync.Mutex Listeners map[listenerKey]Listener + starting map[listenerKey]struct{} } // Register registers listenerInfo into this registry and starts it func (r *ListenerRegistry) Register(l Listener) error { r.Lock() + k := getListenerKey(l) - if _, ok := r.Listeners[getListenerKey(l)]; ok { + if _, ok := r.Listeners[k]; ok { r.Unlock() return errors.New("listener already registered") } - r.Listeners[getListenerKey(l)] = l + r.Listeners[k] = l + r.starting[k] = struct{}{} r.Unlock() - if err := l.start(); err != nil { - r.Lock() - defer r.Lock() + err := l.start() - delete(r.Listeners, getListenerKey(l)) + r.Lock() + defer r.Unlock() + + delete(r.starting, k) + + if err != nil { + delete(r.Listeners, k) return err } @@ -58,13 +65,17 @@ func (r *ListenerRegistry) Register(l Listener) error { } // Deregister removes p2p listener from this registry -func (r *ListenerRegistry) Deregister(k listenerKey) bool { +func (r *ListenerRegistry) Deregister(k listenerKey) (bool, error) { r.Lock() defer r.Unlock() + if _, ok := r.starting[k]; ok { + return false, errors.New("listener didn't start yet") + } + _, ok := r.Listeners[k] delete(r.Listeners, k) - return ok + return ok, nil } func getListenerKey(l Listener) listenerKey { diff --git a/p2p/local.go b/p2p/local.go index 9ab34a265..4d6c8a894 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "errors" "time" "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" @@ -105,11 +104,11 @@ func (l *localListener) start() error { } func (l *localListener) Close() error { - if l.listener == nil { - return errors.New("uninitialized") + ok, err := l.p2p.Listeners.Deregister(getListenerKey(l)) + if err != nil { + return err } - - if l.p2p.Listeners.Deregister(getListenerKey(l)) { + if ok { l.listener.Close() l.listener = nil } diff --git a/p2p/p2p.go b/p2p/p2p.go index 40d82c375..8228e97c8 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -28,6 +28,7 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) Listeners: &ListenerRegistry{ Listeners: map[listenerKey]Listener{}, + starting: map[listenerKey]struct{}{}, }, Streams: &StreamRegistry{ Streams: map[uint64]*Stream{}, diff --git a/p2p/remote.go b/p2p/remote.go index cd6bc1f0c..23161bedc 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "errors" manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" @@ -21,8 +20,6 @@ type remoteListener struct { // Address to proxy the incoming connections to addr ma.Multiaddr - - initialized bool } // ForwardRemote creates new p2p listener @@ -72,7 +69,6 @@ func (l *remoteListener) start() error { stream.startStreaming() }) - l.initialized = true return nil } @@ -93,13 +89,12 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr { } func (l *remoteListener) Close() error { - if !l.initialized { - return errors.New("uninitialized") + ok, err := l.p2p.Listeners.Deregister(getListenerKey(l)) + if err != nil { + return err } - - if l.p2p.Listeners.Deregister(getListenerKey(l)) { + if ok { l.p2p.peerHost.RemoveStreamHandler(l.proto) - l.initialized = false } return nil }