From 8aed79cd9794487294a655b1456c8652b6a31f1b Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 17 Oct 2014 01:47:22 -0700 Subject: [PATCH] fixed data races --- crypto/spipe/pipe.go | 42 ++++++++++++++++------------------- net/conn/conn.go | 52 +++++++++++++++++++++++++++++--------------- 2 files changed, 54 insertions(+), 40 deletions(-) diff --git a/crypto/spipe/pipe.go b/crypto/spipe/pipe.go index 7f9ccc30f..b1c56f1c1 100644 --- a/crypto/spipe/pipe.go +++ b/crypto/spipe/pipe.go @@ -34,36 +34,31 @@ type params struct { // NewSecurePipe constructs a pipe with channels of a given buffer size. func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer, - peers peer.Peerstore) (*SecurePipe, error) { + peers peer.Peerstore, insecure Duplex) (*SecurePipe, error) { + + ctx, cancel := context.WithCancel(ctx) sp := &SecurePipe{ Duplex: Duplex{ In: make(chan []byte, bufsize), Out: make(chan []byte, bufsize), }, - local: local, - peers: peers, + local: local, + peers: peers, + insecure: insecure, + + ctx: ctx, + cancel: cancel, } + + if err := sp.handshake(); err != nil { + sp.Close() + return nil, err + } + return sp, nil } -// Wrap creates a secure connection on top of an insecure duplex channel. -func (s *SecurePipe) Wrap(ctx context.Context, insecure Duplex) error { - if s.ctx != nil { - return errors.New("Pipe in use") - } - - s.insecure = insecure - s.ctx, s.cancel = context.WithCancel(ctx) - - if err := s.handshake(); err != nil { - s.cancel() - return err - } - - return nil -} - // LocalPeer retrieves the local peer. func (s *SecurePipe) LocalPeer() *peer.Peer { return s.local @@ -76,11 +71,12 @@ func (s *SecurePipe) RemotePeer() *peer.Peer { // Close closes the secure pipe func (s *SecurePipe) Close() error { - if s.cancel == nil { - return errors.New("pipe already closed") + select { + case <-s.ctx.Done(): + return errors.New("already closed") + default: } s.cancel() - s.cancel = nil return nil } diff --git a/net/conn/conn.go b/net/conn/conn.go index 526f3c837..76c73aba1 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -45,6 +45,7 @@ type singleConn struct { // context + cancel ctx context.Context cancel context.CancelFunc + closed chan struct{} secure *spipe.SecurePipe insecure *msgioPipe @@ -66,6 +67,7 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer, maconn: maconn, ctx: ctx, cancel: cancel, + closed: make(chan struct{}), insecure: newMsgioPipe(10), msgpipe: msg.NewPipe(10), } @@ -92,20 +94,16 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error { return errors.New("Conn is already secured or being secured.") } - var err error - c.secure, err = spipe.NewSecurePipe(c.ctx, 10, c.local, peers) - if err != nil { - return err - } - // setup a Duplex pipe for spipe insecure := spipe.Duplex{ In: c.insecure.incoming.MsgChan, Out: c.insecure.outgoing.MsgChan, } - // Wrap actually performs the secure handshake, which takes multiple RTT - if err := c.secure.Wrap(c.ctx, insecure); err != nil { + // spipe performs the secure handshake, which takes multiple RTT + var err error + c.secure, err = spipe.NewSecurePipe(c.ctx, 10, c.local, peers, insecure) + if err != nil { return err } @@ -166,29 +164,33 @@ func (c *singleConn) waitToClose(ctx context.Context) { // close underlying connection c.maconn.Close() - c.maconn = nil // closing channels c.insecure.outgoing.Close() c.secure.Close() close(c.msgpipe.Incoming) + close(c.closed) } -// IsOpen returns whether this Conn is open or closed. -func (c *singleConn) isOpen() bool { - return c.maconn != nil +// isClosed returns whether this Conn is open or closed. +func (c *singleConn) isClosed() bool { + select { + case <-c.closed: + return true + default: + return false + } } // Close closes the connection, and associated channels. func (c *singleConn) Close() error { log.Debug("%s closing Conn with %s", c.local, c.remote) - if !c.isOpen() { - return fmt.Errorf("Already closed") // already closed + if c.isClosed() { + return fmt.Errorf("connection already closed") } // cancel context. c.cancel() - c.cancel = nil return nil } @@ -278,6 +280,7 @@ type listener struct { // ctx + cancel func ctx context.Context cancel context.CancelFunc + closed chan struct{} } // waitToClose is needed to hand @@ -286,8 +289,17 @@ func (l *listener) waitToClose() { case <-l.ctx.Done(): } - l.cancel = nil l.Listener.Close() + close(l.closed) +} + +func (l *listener) isClosed() bool { + select { + case <-l.closed: + return true + default: + return false + } } func (l *listener) listen() { @@ -312,7 +324,7 @@ func (l *listener) listen() { if err != nil { // if cancel is nil we're closed. - if l.cancel == nil { + if l.isClosed() { return // done. } @@ -351,7 +363,12 @@ func (l *listener) Peerstore() peer.Peerstore { // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors func (l *listener) Close() error { + if l.isClosed() { + return errors.New("listener already closed") + } + l.cancel() + <-l.closed return nil } @@ -371,6 +388,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer l := &listener{ ctx: ctx, cancel: cancel, + closed: make(chan struct{}), Listener: ml, maddr: addr, peers: peers,