p2p: simplify listener startup

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-09-06 11:42:04 +02:00
parent 80b89405ed
commit f5cb640d25
5 changed files with 16 additions and 51 deletions

View File

@ -5,7 +5,6 @@ import (
"sync"
net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
p2phost "gx/ipfs/QmfH9FKYv3Jp1xiyL8sPchGBUBg6JA6XviwajAo3qgnT3B/go-libp2p-host"
@ -17,7 +16,6 @@ type Listener interface {
ListenAddress() ma.Multiaddr
TargetAddress() ma.Multiaddr
start() error
key() string
// Close closes the listener. Does not affect child streams
@ -30,20 +28,17 @@ type Listeners struct {
sync.RWMutex
Listeners map[string]Listener
starting map[string]struct{}
}
func newListenersLocal(id peer.ID) *Listeners {
func newListenersLocal() *Listeners {
return &Listeners{
Listeners: map[string]Listener{},
starting: map[string]struct{}{},
}
}
func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {
func newListenersP2P(host p2phost.Host) *Listeners {
reg := &Listeners{
Listeners: map[string]Listener{},
starting: map[string]struct{}{},
}
host.SetStreamHandlerMatch("/x/", func(p string) bool {
@ -68,30 +63,13 @@ func newListenersP2P(id peer.ID, host p2phost.Host) *Listeners {
// Register registers listenerInfo into this registry and starts it
func (r *Listeners) Register(l Listener) error {
r.Lock()
k := l.key()
defer r.Unlock()
if _, ok := r.Listeners[k]; ok {
r.Unlock()
if _, ok := r.Listeners[l.key()]; ok {
return errors.New("listener already registered")
}
r.Listeners[k] = l
r.starting[k] = struct{}{}
r.Unlock()
err := l.start()
r.Lock()
defer r.Unlock()
delete(r.starting, k)
if err != nil {
delete(r.Listeners, k)
return err
}
r.Listeners[l.key()] = l
return nil
}
@ -100,10 +78,6 @@ func (r *Listeners) Deregister(k string) (bool, error) {
r.Lock()
defer r.Unlock()
if _, ok := r.starting[k]; ok {
return false, errors.New("listener didn't start yet")
}
_, ok := r.Listeners[k]
delete(r.Listeners, k)
return ok, nil

View File

@ -17,7 +17,6 @@ type localListener struct {
ctx context.Context
p2p *P2P
id peer.ID
proto protocol.ID
laddr ma.Multiaddr
@ -32,13 +31,19 @@ func (p2p *P2P) ForwardLocal(ctx context.Context, peer peer.ID, proto protocol.I
ctx: ctx,
p2p: p2p,
id: p2p.identity,
proto: proto,
laddr: bindAddr,
peer: peer,
}
maListener, err := manet.Listen(listener.laddr)
if err != nil {
return nil, err
}
listener.listener = maListener
if err := p2p.ListenersLocal.Register(listener); err != nil {
return nil, err
}
@ -91,17 +96,6 @@ func (l *localListener) setupStream(local manet.Conn) {
}
l.p2p.Streams.Register(stream)
stream.startStreaming()
}
func (l *localListener) start() error {
maListener, err := manet.Listen(l.laddr)
if err != nil {
return err
}
l.listener = maListener
return nil
}
func (l *localListener) Close() error {

View File

@ -27,8 +27,8 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore)
peerHost: peerHost,
peerstore: peerstore,
ListenersLocal: newListenersLocal(identity),
ListenersP2P: newListenersP2P(identity, peerHost),
ListenersLocal: newListenersLocal(),
ListenersP2P: newListenersP2P(peerHost),
Streams: &StreamRegistry{
Streams: map[uint64]*Stream{},

View File

@ -38,10 +38,6 @@ func (p2p *P2P) ForwardRemote(ctx context.Context, proto protocol.ID, addr ma.Mu
return listener, nil
}
func (l *remoteListener) start() error {
return nil
}
func (l *remoteListener) handleStream(remote net.Stream) {
local, err := manet.Dial(l.addr)
if err != nil {
@ -71,7 +67,6 @@ func (l *remoteListener) handleStream(remote net.Stream) {
}
l.p2p.Streams.Register(stream)
stream.startStreaming()
}
func (l *remoteListener) Protocol() protocol.ID {

View File

@ -84,6 +84,8 @@ func (r *StreamRegistry) Register(streamInfo *Stream) {
streamInfo.id = r.nextID
r.Streams[r.nextID] = streamInfo
r.nextID++
streamInfo.startStreaming()
}
// Deregister deregisters stream from the registry