From f5cb640d252f6e432e1843e870cfced45a6e648f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 6 Sep 2018 11:42:04 +0200 Subject: [PATCH] p2p: simplify listener startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- p2p/listener.go | 36 +++++------------------------------- p2p/local.go | 20 +++++++------------- p2p/p2p.go | 4 ++-- p2p/remote.go | 5 ----- p2p/stream.go | 2 ++ 5 files changed, 16 insertions(+), 51 deletions(-) diff --git a/p2p/listener.go b/p2p/listener.go index 59df5dde5..84db8f62a 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -5,7 +5,6 @@ import ( "sync" net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net" - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host" @@ -17,7 +16,6 @@ type Listener interface { ListenAddress() ma.Multiaddr TargetAddress() ma.Multiaddr - start() error key() string // Close closes the listener. Does not affect child streams @@ -30,20 +28,17 @@ type Listeners struct { sync.RWMutex Listeners map[string]Listener - starting map[string]struct{} } -func newListenersLocal(id peer.ID) *Listeners { +func newListenersLocal() *Listeners { return &Listeners{ Listeners: map[string]Listener{}, - starting: map[string]struct{}{}, } } -func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners { +func newListenersP2P(host p2phost.Host) *Listeners { reg := &Listeners{ Listeners: map[string]Listener{}, - starting: map[string]struct{}{}, } host.SetStreamHandlerMatch("/x/", func(p string) bool { @@ -68,30 +63,13 @@ func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners { // Register registers listenerInfo into this registry and starts it func (r *Listeners) Register(l Listener) error { r.Lock() - k := l.key() + defer r.Unlock() - if _, ok := r.Listeners[k]; ok { - r.Unlock() + if _, ok := r.Listeners[l.key()]; ok { return errors.New("listener already registered") } - r.Listeners[k] = l - r.starting[k] = struct{}{} - - r.Unlock() - - err := l.start() - - r.Lock() - defer r.Unlock() - - delete(r.starting, k) - - if err != nil { - delete(r.Listeners, k) - return err - } - + r.Listeners[l.key()] = l return nil } @@ -100,10 +78,6 @@ func (r *Listeners) Deregister(k string) (bool, error) { r.Lock() defer r.Unlock() - if _, ok := r.starting[k]; ok { - return false, errors.New("listener didn't start yet") - } - _, ok := r.Listeners[k] delete(r.Listeners, k) return ok, nil diff --git a/p2p/local.go b/p2p/local.go index 0ec579a43..010d8099d 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -17,7 +17,6 @@ type localListener struct { ctx context.Context p2p *P2P - id peer.ID proto protocol.ID laddr ma.Multiaddr @@ -32,13 +31,19 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I ctx: ctx, p2p: p2p, - id: p2p.identity, proto: proto, laddr: bindAddr, peer: peer, } + maListener, err := manet.Listen(listener.laddr) + if err != nil { + return nil, err + } + + listener.listener = maListener + if err := p2p.ListenersLocal.Register(listener); err != nil { return nil, err } @@ -91,17 +96,6 @@ func (l *localListener) setupStream(local manet.Conn) { } l.p2p.Streams.Register(stream) - stream.startStreaming() -} - -func (l *localListener) start() error { - maListener, err := manet.Listen(l.laddr) - if err != nil { - return err - } - - l.listener = maListener - return nil } func (l *localListener) Close() error { diff --git a/p2p/p2p.go b/p2p/p2p.go index 031fb076b..b9924ce03 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -27,8 +27,8 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) peerHost: peerHost, peerstore: peerstore, - ListenersLocal: newListenersLocal(identity), - ListenersP2P: newListenersP2P(identity, peerHost), + ListenersLocal: newListenersLocal(), + ListenersP2P: newListenersP2P(peerHost), Streams: &StreamRegistry{ Streams: map[uint64]*Stream{}, diff --git a/p2p/remote.go b/p2p/remote.go index 1b82727b1..13ed044a5 100644 --- a/p2p/remote.go +++ b/p2p/remote.go @@ -38,10 +38,6 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu return listener, nil } -func (l *remoteListener) start() error { - return nil -} - func (l *remoteListener) handleStream(remote net.Stream) { local, err := manet.Dial(l.addr) if err != nil { @@ -71,7 +67,6 @@ func (l *remoteListener) handleStream(remote net.Stream) { } l.p2p.Streams.Register(stream) - stream.startStreaming() } func (l *remoteListener) Protocol() protocol.ID { diff --git a/p2p/stream.go b/p2p/stream.go index 5a496ec39..ba4632ccf 100644 --- a/p2p/stream.go +++ b/p2p/stream.go @@ -84,6 +84,8 @@ func (r *StreamRegistry) Register(streamInfo *Stream) { streamInfo.id = r.nextID r.Streams[r.nextID] = streamInfo r.nextID++ + + streamInfo.startStreaming() } // Deregister deregisters stream from the registry