p2p: Close on Listeners

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2018-09-12 23:27:15 +02:00
parent 0b575bc0c5
commit b53936a706
5 changed files with 30 additions and 55 deletions

View File

@ -340,36 +340,10 @@ var p2pCloseCmd = &cmds.Command{
return true
}
todo := make([]p2p.Listener, 0)
n.P2P.ListenersLocal.Lock()
for _, l := range n.P2P.ListenersLocal.Listeners {
if !match(l) {
continue
}
todo = append(todo, l)
}
n.P2P.ListenersLocal.Unlock()
n.P2P.ListenersP2P.Lock()
for _, l := range n.P2P.ListenersP2P.Listeners {
if !match(l) {
continue
}
todo = append(todo, l)
}
n.P2P.ListenersP2P.Unlock()
done := n.P2P.ListenersLocal.Close(match)
done += n.P2P.ListenersP2P.Close(match)
var errs []string
for _, l := range todo {
if err := l.Close(); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) != 0 {
res.SetError(fmt.Errorf("errors when closing streams: %s", strings.Join(errs, "; ")), cmdkit.ErrNormal)
return
}
res.SetOutput(len(todo))
res.SetOutput(done)
},
Type: int(0),
Marshalers: cmds.MarshalerMap{

View File

@ -18,8 +18,8 @@ type Listener interface {
key() string
// Close closes the listener. Does not affect child streams
Close() error
// close closes the listener. Does not affect child streams
close()
}
// Listeners manages a group of Listener implementations,
@ -73,12 +73,24 @@ func (r *Listeners) Register(l Listener) error {
return nil
}
// Deregister removes p2p listener from this registry
func (r *Listeners) Deregister(k string) (bool, error) {
func (r *Listeners) Close(matchFunc func(listener Listener) bool) int {
todo := make([]Listener, 0)
r.Lock()
defer r.Unlock()
for _, l := range r.Listeners {
if !matchFunc(l) {
continue
}
_, ok := r.Listeners[k]
delete(r.Listeners, k)
return ok, nil
if _, ok := r.Listeners[l.key()]; ok {
delete(r.Listeners, l.key())
todo = append(todo, l)
}
}
r.Unlock()
for _, l := range todo {
l.close()
}
return len(todo)
}

View File

@ -98,15 +98,8 @@ func (l *localListener) setupStream(local manet.Conn) {
l.p2p.Streams.Register(stream)
}
func (l *localListener) Close() error {
ok, err := l.p2p.ListenersLocal.Deregister(l.laddr.String())
if err != nil {
return err
}
if ok {
return l.listener.Close()
}
return nil
func (l *localListener) close() {
l.listener.Close()
}
func (l *localListener) Protocol() protocol.ID {

View File

@ -85,13 +85,7 @@ func (l *remoteListener) TargetAddress() ma.Multiaddr {
return l.addr
}
func (l *remoteListener) Close() error {
_, err := l.p2p.ListenersP2P.Deregister(string(l.proto))
if err != nil {
return err
}
return nil
}
func (l *remoteListener) close() {}
func (l *remoteListener) key() string {
return string(l.proto)

View File

@ -285,11 +285,13 @@ test_expect_success 'S->C Setup client side (custom proto)' '
test_server_to_client
test_expect_success 'C->S Close local listener' '
ipfsi 0 p2p close -p /p2p-test
ipfsi 0 p2p ls > actual &&
ipfsi 1 p2p close -p /p2p-test
ipfsi 1 p2p ls > actual &&
test_must_be_empty actual
'
check_test_ports
test_expect_success 'stop iptb' '
iptb stop
'