From 8849193de0f0d11c792f01b3d309401f488fd5d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Jul 2018 16:40:11 +0200 Subject: [PATCH] p2p: more locks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/commands/p2p.go | 13 ++++++++++++- p2p/stream.go | 14 +++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core/commands/p2p.go b/core/commands/p2p.go index e316639ce..8d01b2607 100644 --- a/core/commands/p2p.go +++ b/core/commands/p2p.go @@ -231,6 +231,7 @@ var p2pLsCmd = &cmds.Command{ output := &P2PLsOutput{} + n.P2P.Listeners.Lock() for _, listener := range n.P2P.Listeners.Listeners { output.Listeners = append(output.Listeners, P2PListenerInfoOutput{ Protocol: string(listener.Protocol()), @@ -238,6 +239,7 @@ var p2pLsCmd = &cmds.Command{ TargetAddress: listener.TargetAddress().String(), }) } + n.P2P.Listeners.Unlock() res.SetOutput(output) }, @@ -402,6 +404,7 @@ var p2pStreamLsCmd = &cmds.Command{ output := &P2PStreamsOutput{} + n.P2P.Streams.Lock() for id, s := range n.P2P.Streams.Streams { output.Streams = append(output.Streams, P2PStreamInfoOutput{ HandlerID: strconv.FormatUint(id, 10), @@ -412,6 +415,7 @@ var p2pStreamLsCmd = &cmds.Command{ TargetAddress: s.TargetAddr.String(), }) } + n.P2P.Streams.Unlock() res.SetOutput(output) }, @@ -476,15 +480,22 @@ var p2pStreamCloseCmd = &cmds.Command{ } } + toClose := make([]*p2p.Stream, 0, 1) + n.P2P.Streams.Lock() for id, stream := range n.P2P.Streams.Streams { if !closeAll && handlerID != id { continue } - stream.Reset() + toClose = append(toClose, stream) if !closeAll { break } } + n.P2P.Streams.Unlock() + + for _, s := range toClose { + s.Reset() + } }, } diff --git a/p2p/stream.go b/p2p/stream.go index 626c842f9..26e70ad76 100644 --- a/p2p/stream.go +++ b/p2p/stream.go @@ -63,16 +63,16 @@ func (s *Stream) startStreaming() { // StreamRegistry is a collection of active incoming and outgoing proto app streams. type StreamRegistry struct { - Streams map[uint64]*Stream - lk sync.Mutex + sync.Mutex - nextID uint64 + Streams map[uint64]*Stream + nextID uint64 } // Register registers a stream to the registry func (r *StreamRegistry) Register(streamInfo *Stream) { - r.lk.Lock() - defer r.lk.Unlock() + r.Lock() + defer r.Unlock() streamInfo.id = r.nextID r.Streams[r.nextID] = streamInfo @@ -81,8 +81,8 @@ func (r *StreamRegistry) Register(streamInfo *Stream) { // Deregister deregisters stream from the registry func (r *StreamRegistry) Deregister(streamID uint64) { - r.lk.Lock() - defer r.lk.Unlock() + r.Lock() + defer r.Unlock() delete(r.Streams, streamID) }