diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index f34ea3c84..4a01444e5 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -149,8 +149,6 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro // TODO simplify this test. get to the _essence_! func TestSendToWantingPeer(t *testing.T) { - util.Debug = true - net := tn.VirtualNetwork() rs := mock.VirtualRoutingServer() sg := NewSessionGenerator(net, rs) diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index 2d02199b9..25b3bf9f5 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -239,17 +239,17 @@ func TestFastRepublish(t *testing.T) { } // constantly keep writing to the file - go func() { + go func(timeout time.Duration) { for { select { case <-closed: return - case <-time.After(shortRepublishTimeout * 8 / 10): + case <-time.After(timeout * 8 / 10): writeFileData(t, dataB, fname) } } - }() + }(shortRepublishTimeout) hasPublished := func() bool { res, err := node.Namesys.Resolve(pubkeyHash) diff --git a/net/conn/conn.go b/net/conn/conn.go index 4acafff47..92bf2259b 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -89,13 +89,13 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer, log.Info("newSingleConn: %v to %v", local, remote) // setup the various io goroutines + conn.Children().Add(1) go func() { - conn.Children().Add(1) conn.msgio.outgoing.WriteTo(maconn) conn.Children().Done() }() + conn.Children().Add(1) go func() { - conn.Children().Add(1) conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize) conn.Children().Done() }() diff --git a/net/conn/listen.go b/net/conn/listen.go index 41335ee88..4cbe9a766 100644 --- a/net/conn/listen.go +++ b/net/conn/listen.go @@ -47,7 +47,6 @@ func (l *listener) close() error { } func (l *listener) listen() { - l.Children().Add(1) defer l.Children().Done() // handle at most chansize concurrent handshakes @@ -143,6 +142,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer. ctx2, _ := context.WithCancel(ctx) l.ContextCloser = ctxc.NewContextCloser(ctx2, l.close) + l.Children().Add(1) go l.listen() return l, nil diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index 885ad6008..6c8752308 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -57,6 +57,8 @@ func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (* if conns != nil && len(conns) > 0 { c.Add(conns...) } + + c.Children().Add(1) go c.fanOut() return c, nil } @@ -81,6 +83,8 @@ func (c *MultiConn) Add(conns ...Conn) { } c.conns[c2.ID()] = c2 + c.Children().Add(1) + c2.Children().Add(1) // yep, on the child too. go c.fanInSingle(c2) log.Infof("MultiConn: added %s", c2) } @@ -134,7 +138,6 @@ func CloseConns(conns ...Conn) { // fanOut is the multiplexor out -- it sends outgoing messages over the // underlying single connections. func (c *MultiConn) fanOut() { - c.Children().Add(1) defer c.Children().Done() i := 0 @@ -165,9 +168,6 @@ func (c *MultiConn) fanOut() { // fanInSingle is a multiplexor in -- it receives incoming messages over the // underlying single connections. func (c *MultiConn) fanInSingle(child Conn) { - c.Children().Add(1) - child.Children().Add(1) // yep, on the child too. - // cleanup all data associated with this child Connection. defer func() { log.Infof("closing: %s", child) @@ -297,6 +297,8 @@ func (c *MultiConn) Out() chan<- []byte { } func (c *MultiConn) GetError() error { + c.RLock() + defer c.RUnlock() for _, sub := range c.conns { err := sub.GetError() if err != nil { diff --git a/net/mux/mux_test.go b/net/mux/mux_test.go index 3b0235820..7ce096247 100644 --- a/net/mux/mux_test.go +++ b/net/mux/mux_test.go @@ -3,6 +3,7 @@ package mux import ( "bytes" "fmt" + "sync" "testing" "time" @@ -121,6 +122,7 @@ func TestSimultMuxer(t *testing.T) { total := 10000 speed := time.Microsecond * 1 counts := [2][2][2]int{} + var countsLock sync.Mutex // run producers at every end sending incrementing messages produceOut := func(pid pb.ProtocolID, size int) { @@ -130,7 +132,9 @@ func TestSimultMuxer(t *testing.T) { s := fmt.Sprintf("proto %v out %v", pid, i) m := msg.New(peer1, []byte(s)) mux1.Protocols[pid].GetPipe().Outgoing <- m + countsLock.Lock() counts[pid][0][0]++ + countsLock.Unlock() // log.Debug("sent %v", s) } } @@ -147,7 +151,9 @@ func TestSimultMuxer(t *testing.T) { m := msg.New(peer1, d) mux1.Incoming <- m + countsLock.Lock() counts[pid][1][0]++ + countsLock.Unlock() // log.Debug("sent %v", s) } } @@ -163,7 +169,9 @@ func TestSimultMuxer(t *testing.T) { // log.Debug("got %v", string(data)) _ = data + countsLock.Lock() counts[pid][1][1]++ + countsLock.Unlock() case <-ctx.Done(): return @@ -175,7 +183,9 @@ func TestSimultMuxer(t *testing.T) { for { select { case m := <-mux1.Protocols[pid].GetPipe().Incoming: + countsLock.Lock() counts[pid][0][1]++ + countsLock.Unlock() // log.Debug("got %v", string(m.Data())) _ = m case <-ctx.Done(): @@ -195,10 +205,12 @@ func TestSimultMuxer(t *testing.T) { limiter := time.Tick(speed) for { <-limiter + countsLock.Lock() got := counts[0][0][0] + counts[0][0][1] + counts[0][1][0] + counts[0][1][1] + counts[1][0][0] + counts[1][0][1] + counts[1][1][0] + counts[1][1][1] + countsLock.Unlock() if got == total*8 { cancel() diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 63f6910a4..aa160b924 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -139,6 +139,8 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { s.connsLock.Unlock() // kick off reader goroutine + s.Children().Add(1) + mc.Children().Add(1) // child of Conn as well. go s.fanInSingle(mc) log.Debugf("added new multiconn: %s", mc) } else { @@ -154,7 +156,6 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { // Handles the unwrapping + sending of messages to the right connection. func (s *Swarm) fanOut() { - s.Children().Add(1) defer s.Children().Done() i := 0 @@ -194,9 +195,6 @@ func (s *Swarm) fanOut() { // Handles the receiving + wrapping of messages, per conn. // Consider using reflect.Select with one goroutine instead of n. func (s *Swarm) fanInSingle(c conn.Conn) { - s.Children().Add(1) - c.Children().Add(1) // child of Conn as well. - // cleanup all data associated with this child Connection. defer func() { // remove it from the map. diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 59279ed32..2f022db00 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -83,6 +83,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, // ContextCloser for proper child management. s.ContextCloser = ctxc.NewContextCloser(ctx, s.close) + s.Children().Add(1) go s.fanOut() return s, s.listen(listenAddrs) } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 3748e6519..ffbbef819 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -417,14 +417,14 @@ func TestConnectCollision(t *testing.T) { done := make(chan struct{}) go func() { - _, err = dhtA.Connect(ctx, peerB) + _, err := dhtA.Connect(ctx, peerB) if err != nil { t.Fatal(err) } done <- struct{}{} }() go func() { - _, err = dhtB.Connect(ctx, peerA) + _, err := dhtB.Connect(ctx, peerA) if err != nil { t.Fatal(err) } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 19275338d..a6d1d933d 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -16,6 +16,7 @@ import ( pb "github.com/jbenet/go-ipfs/routing/dht/pb" u "github.com/jbenet/go-ipfs/util" + "sync" "time" ) @@ -28,15 +29,24 @@ type mesHandleFunc func(msg.NetMessage) msg.NetMessage // fauxNet is a standin for a swarm.Network in order to more easily recreate // different testing scenarios type fauxSender struct { + sync.Mutex handlers []mesHandleFunc } func (f *fauxSender) AddHandler(fn func(msg.NetMessage) msg.NetMessage) { + f.Lock() + defer f.Unlock() + f.handlers = append(f.handlers, fn) } func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { - for _, h := range f.handlers { + f.Lock() + handlers := make([]mesHandleFunc, len(f.handlers)) + copy(handlers, f.handlers) + f.Unlock() + + for _, h := range handlers { reply := h(m) if reply != nil { return reply, nil @@ -52,7 +62,12 @@ func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.Net } func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error { - for _, h := range f.handlers { + f.Lock() + handlers := make([]mesHandleFunc, len(f.handlers)) + copy(handlers, f.handlers) + f.Unlock() + + for _, h := range handlers { reply := h(m) if reply != nil { return nil diff --git a/util/ctxcloser/closer.go b/util/ctxcloser/closer.go index ca80368ea..5baf6591d 100644 --- a/util/ctxcloser/closer.go +++ b/util/ctxcloser/closer.go @@ -120,6 +120,7 @@ func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser { closed: make(chan struct{}), } + c.Children().Add(1) // we're a child goroutine, to be waited upon. go c.closeOnContextDone() return c } @@ -176,7 +177,6 @@ func (c *contextCloser) closeLogic() { // we need to go through the Close motions anyway. Hence all the sync // stuff all over the place... func (c *contextCloser) closeOnContextDone() { - c.Children().Add(1) // we're a child goroutine, to be waited upon. <-c.Context().Done() // wait until parent (context) is done. c.internalClose() c.Children().Done()