mirror of https://github.com/ipfs/kubo.git (synced 2026-03-05 16:28:06 +08:00)
Merge pull request #273 from jbenet/fix-data-races
Fix data races in #270
commit 9d7e0bbd4b
@@ -149,8 +149,6 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro
 
 // TODO simplify this test. get to the _essence_!
 func TestSendToWantingPeer(t *testing.T) {
-	util.Debug = true
-
 	net := tn.VirtualNetwork()
 	rs := mock.VirtualRoutingServer()
 	sg := NewSessionGenerator(net, rs)
@@ -239,17 +239,17 @@ func TestFastRepublish(t *testing.T) {
 	}
 
 	// constantly keep writing to the file
-	go func() {
+	go func(timeout time.Duration) {
 		for {
 			select {
 			case <-closed:
 				return
 
-			case <-time.After(shortRepublishTimeout * 8 / 10):
+			case <-time.After(timeout * 8 / 10):
 				writeFileData(t, dataB, fname)
 			}
 		}
-	}()
+	}(shortRepublishTimeout)
 
 	hasPublished := func() bool {
 		res, err := node.Namesys.Resolve(pubkeyHash)
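Note on the hunk above: the writer goroutine now receives the republish timeout as an argument instead of reading the shared shortRepublishTimeout variable from inside its loop, so other test code that touches that variable can no longer race with this read. A minimal sketch of the same pattern, with purely illustrative names:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	interval := 50 * time.Millisecond
	done := make(chan struct{})
	var wg sync.WaitGroup

	// Hand the goroutine its own copy of interval; it never reads the
	// outer variable again, so mutating interval below cannot race.
	wg.Add(1)
	go func(d time.Duration) {
		defer wg.Done()
		for {
			select {
			case <-done:
				return
			case <-time.After(d * 8 / 10):
				fmt.Println("tick")
			}
		}
	}(interval)

	interval = time.Millisecond // safe: the goroutine reads only its own copy
	time.Sleep(120 * time.Millisecond)
	close(done)
	wg.Wait()
}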
@@ -89,13 +89,13 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
 	log.Info("newSingleConn: %v to %v", local, remote)
 
 	// setup the various io goroutines
+	conn.Children().Add(1)
 	go func() {
-		conn.Children().Add(1)
 		conn.msgio.outgoing.WriteTo(maconn)
 		conn.Children().Done()
 	}()
+	conn.Children().Add(1)
 	go func() {
-		conn.Children().Add(1)
 		conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
 		conn.Children().Done()
 	}()
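This hunk shows the pattern repeated throughout the commit: the Children().Add(1) call moves out of the spawned goroutine and into the code that launches it. If Add runs inside the goroutine, a concurrent Wait can observe a zero counter and return early, or panic with a WaitGroup misuse error, before the goroutine is ever scheduled. Children() appears to expose a sync.WaitGroup-style counter; with a plain WaitGroup the safe ordering looks like this sketch (illustrative, not the repo's code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var children sync.WaitGroup

	work := func(id int) {
		defer children.Done() // the goroutine only ever decrements
		fmt.Println("child", id, "finished")
	}

	for i := 0; i < 2; i++ {
		// Add happens before the goroutine starts and before any Wait,
		// so Wait can never observe the counter too early.
		children.Add(1)
		go work(i)
	}

	children.Wait() // reliably waits for both children
}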
@@ -47,7 +47,6 @@ func (l *listener) close() error {
 }
 
 func (l *listener) listen() {
-	l.Children().Add(1)
 	defer l.Children().Done()
 
 	// handle at most chansize concurrent handshakes
@@ -143,6 +142,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.
 	ctx2, _ := context.WithCancel(ctx)
 	l.ContextCloser = ctxc.NewContextCloser(ctx2, l.close)
 
+	l.Children().Add(1)
 	go l.listen()
 
 	return l, nil
@@ -57,6 +57,8 @@ func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (*
 	if conns != nil && len(conns) > 0 {
 		c.Add(conns...)
 	}
+
+	c.Children().Add(1)
 	go c.fanOut()
 	return c, nil
 }
@@ -81,6 +83,8 @@ func (c *MultiConn) Add(conns ...Conn) {
 		}
 
 		c.conns[c2.ID()] = c2
+		c.Children().Add(1)
+		c2.Children().Add(1) // yep, on the child too.
 		go c.fanInSingle(c2)
 		log.Infof("MultiConn: added %s", c2)
 	}
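Here the fan-in goroutine for a newly added child connection is registered with both the MultiConn's counter and the child's own counter ("yep, on the child too"), again before the go statement. A tiny sketch of counting one goroutine in two WaitGroups so that either side's shutdown waits for it (names illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var parent, child sync.WaitGroup

	// One goroutine, registered with both counters before it starts, so
	// shutting down either side waits for the reader to exit.
	parent.Add(1)
	child.Add(1)
	go func() {
		defer parent.Done()
		defer child.Done()
		fmt.Println("fan-in reader running")
	}()

	child.Wait()  // child teardown waits for its reader
	parent.Wait() // parent teardown waits for all readers
	fmt.Println("both sides drained")
}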
@@ -134,7 +138,6 @@ func CloseConns(conns ...Conn) {
 // fanOut is the multiplexor out -- it sends outgoing messages over the
 // underlying single connections.
 func (c *MultiConn) fanOut() {
-	c.Children().Add(1)
 	defer c.Children().Done()
 
 	i := 0
@@ -165,9 +168,6 @@ func (c *MultiConn) fanOut() {
 // fanInSingle is a multiplexor in -- it receives incoming messages over the
 // underlying single connections.
 func (c *MultiConn) fanInSingle(child Conn) {
-	c.Children().Add(1)
-	child.Children().Add(1) // yep, on the child too.
-
 	// cleanup all data associated with this child Connection.
 	defer func() {
 		log.Infof("closing: %s", child)
@@ -297,6 +297,8 @@ func (c *MultiConn) Out() chan<- []byte {
 }
 
 func (c *MultiConn) GetError() error {
+	c.RLock()
+	defer c.RUnlock()
 	for _, sub := range c.conns {
 		err := sub.GetError()
 		if err != nil {
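GetError iterates c.conns, the same map that Add mutates, so the scan now holds the connection's read lock for the whole loop. A self-contained sketch of an RWMutex-guarded map scan (hypothetical types, not the repo's):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errMap guards a map that one goroutine mutates while others scan it.
type errMap struct {
	sync.RWMutex
	errs map[string]error
}

func (m *errMap) set(id string, err error) {
	m.Lock()
	defer m.Unlock()
	m.errs[id] = err
}

// firstError holds the read lock across the whole iteration, mirroring the
// RLock/RUnlock added to MultiConn.GetError above.
func (m *errMap) firstError() error {
	m.RLock()
	defer m.RUnlock()
	for _, err := range m.errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	m := &errMap{errs: make(map[string]error)}
	m.set("conn-1", nil)
	m.set("conn-2", errors.New("reset by peer"))
	fmt.Println(m.firstError())
}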
@@ -3,6 +3,7 @@ package mux
 import (
 	"bytes"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -121,6 +122,7 @@ func TestSimultMuxer(t *testing.T) {
 	total := 10000
 	speed := time.Microsecond * 1
 	counts := [2][2][2]int{}
+	var countsLock sync.Mutex
 
 	// run producers at every end sending incrementing messages
 	produceOut := func(pid pb.ProtocolID, size int) {
@@ -130,7 +132,9 @@ func TestSimultMuxer(t *testing.T) {
 			s := fmt.Sprintf("proto %v out %v", pid, i)
 			m := msg.New(peer1, []byte(s))
 			mux1.Protocols[pid].GetPipe().Outgoing <- m
+			countsLock.Lock()
 			counts[pid][0][0]++
+			countsLock.Unlock()
 			// log.Debug("sent %v", s)
 		}
 	}
@@ -147,7 +151,9 @@ func TestSimultMuxer(t *testing.T) {
 
 			m := msg.New(peer1, d)
 			mux1.Incoming <- m
+			countsLock.Lock()
 			counts[pid][1][0]++
+			countsLock.Unlock()
 			// log.Debug("sent %v", s)
 		}
 	}
@@ -163,7 +169,9 @@ func TestSimultMuxer(t *testing.T) {
 
 				// log.Debug("got %v", string(data))
 				_ = data
+				countsLock.Lock()
 				counts[pid][1][1]++
+				countsLock.Unlock()
 
 			case <-ctx.Done():
 				return
@@ -175,7 +183,9 @@ func TestSimultMuxer(t *testing.T) {
 		for {
 			select {
 			case m := <-mux1.Protocols[pid].GetPipe().Incoming:
+				countsLock.Lock()
 				counts[pid][0][1]++
+				countsLock.Unlock()
 				// log.Debug("got %v", string(m.Data()))
 				_ = m
 			case <-ctx.Done():
@@ -195,10 +205,12 @@ func TestSimultMuxer(t *testing.T) {
 	limiter := time.Tick(speed)
 	for {
 		<-limiter
+		countsLock.Lock()
 		got := counts[0][0][0] + counts[0][0][1] +
 			counts[0][1][0] + counts[0][1][1] +
 			counts[1][0][0] + counts[1][0][1] +
 			counts[1][1][0] + counts[1][1][1]
+		countsLock.Unlock()
 
 		if got == total*8 {
 			cancel()
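All of the TestSimultMuxer hunks follow one discipline: the counts array is incremented by several producer and consumer goroutines and summed by the main loop, so every access, including the final read, now happens under the new countsLock mutex. A reduced sketch of the same pattern:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	counts := [2]int{}
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(slot int) {
			defer wg.Done()
			for n := 0; n < 1000; n++ {
				mu.Lock()
				counts[slot]++ // writers take the lock per increment
				mu.Unlock()
			}
		}(i)
	}

	wg.Wait()
	mu.Lock() // readers take the same lock before summing
	fmt.Println("total:", counts[0]+counts[1])
	mu.Unlock()
}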
@@ -139,6 +139,8 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
 		s.connsLock.Unlock()
 
 		// kick off reader goroutine
+		s.Children().Add(1)
+		mc.Children().Add(1) // child of Conn as well.
 		go s.fanInSingle(mc)
 		log.Debugf("added new multiconn: %s", mc)
 	} else {
@@ -154,7 +156,6 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
 
 // Handles the unwrapping + sending of messages to the right connection.
 func (s *Swarm) fanOut() {
-	s.Children().Add(1)
 	defer s.Children().Done()
 
 	i := 0
@@ -194,9 +195,6 @@ func (s *Swarm) fanOut() {
 // Handles the receiving + wrapping of messages, per conn.
 // Consider using reflect.Select with one goroutine instead of n.
 func (s *Swarm) fanInSingle(c conn.Conn) {
-	s.Children().Add(1)
-	c.Children().Add(1) // child of Conn as well.
-
 	// cleanup all data associated with this child Connection.
 	defer func() {
 		// remove it from the map.
@@ -83,6 +83,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer,
 	// ContextCloser for proper child management.
 	s.ContextCloser = ctxc.NewContextCloser(ctx, s.close)
 
+	s.Children().Add(1)
 	go s.fanOut()
 	return s, s.listen(listenAddrs)
 }
@@ -417,14 +417,14 @@ func TestConnectCollision(t *testing.T) {
 
 		done := make(chan struct{})
 		go func() {
-			_, err = dhtA.Connect(ctx, peerB)
+			_, err := dhtA.Connect(ctx, peerB)
 			if err != nil {
 				t.Fatal(err)
 			}
			done <- struct{}{}
 		}()
 		go func() {
-			_, err = dhtB.Connect(ctx, peerA)
+			_, err := dhtB.Connect(ctx, peerA)
 			if err != nil {
 				t.Fatal(err)
 			}
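In the old version both goroutines assigned to the same err variable captured from the enclosing test (_, err = ...), giving two unsynchronized writers; switching to := declares a separate err local to each goroutine. A sketch of the shadowing fix (mightFail is a made-up stand-in):

package main

import (
	"errors"
	"fmt"
	"sync"
)

func mightFail(id int) (int, error) {
	if id == 1 {
		return 0, errors.New("boom")
	}
	return id, nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// ':=' declares a goroutine-local err; writing a shared
			// outer err from two goroutines would be a data race.
			_, err := mightFail(id)
			if err != nil {
				fmt.Println("goroutine", id, "error:", err)
			}
		}(i)
	}
	wg.Wait()
}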
@@ -16,6 +16,7 @@ import (
 	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
 	u "github.com/jbenet/go-ipfs/util"
 
+	"sync"
 	"time"
 )
 
@@ -28,15 +29,24 @@ type mesHandleFunc func(msg.NetMessage) msg.NetMessage
 // fauxNet is a standin for a swarm.Network in order to more easily recreate
 // different testing scenarios
 type fauxSender struct {
+	sync.Mutex
 	handlers []mesHandleFunc
 }
 
 func (f *fauxSender) AddHandler(fn func(msg.NetMessage) msg.NetMessage) {
+	f.Lock()
+	defer f.Unlock()
+
 	f.handlers = append(f.handlers, fn)
 }
 
 func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
-	for _, h := range f.handlers {
+	f.Lock()
+	handlers := make([]mesHandleFunc, len(f.handlers))
+	copy(handlers, f.handlers)
+	f.Unlock()
+
+	for _, h := range handlers {
 		reply := h(m)
 		if reply != nil {
 			return reply, nil
@@ -52,7 +62,12 @@ func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.Net
 }
 
 func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error {
-	for _, h := range f.handlers {
+	f.Lock()
+	handlers := make([]mesHandleFunc, len(f.handlers))
+	copy(handlers, f.handlers)
+	f.Unlock()
+
+	for _, h := range handlers {
 		reply := h(m)
 		if reply != nil {
 			return nil
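SendRequest and SendMessage now snapshot the handler slice under the lock and iterate the copy, so a concurrent AddHandler that grows or reallocates f.handlers cannot race with the loop. A compact sketch of the copy-under-lock pattern (illustrative types, not the repo's):

package main

import (
	"fmt"
	"sync"
)

type handler func(string) string

type sender struct {
	sync.Mutex
	handlers []handler
}

func (s *sender) addHandler(h handler) {
	s.Lock()
	defer s.Unlock()
	s.handlers = append(s.handlers, h)
}

// send iterates over a private snapshot, so a concurrent addHandler can
// grow the slice without racing with this loop.
func (s *sender) send(m string) string {
	s.Lock()
	snapshot := make([]handler, len(s.handlers))
	copy(snapshot, s.handlers)
	s.Unlock()

	for _, h := range snapshot {
		if reply := h(m); reply != "" {
			return reply
		}
	}
	return ""
}

func main() {
	s := &sender{}
	s.addHandler(func(m string) string { return "echo: " + m })
	fmt.Println(s.send("hello"))
}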
@@ -120,6 +120,7 @@ func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser {
 		closed: make(chan struct{}),
 	}
 
+	c.Children().Add(1) // we're a child goroutine, to be waited upon.
 	go c.closeOnContextDone()
 	return c
 }
@@ -176,7 +177,6 @@ func (c *contextCloser) closeLogic() {
 // we need to go through the Close motions anyway. Hence all the sync
 // stuff all over the place...
 func (c *contextCloser) closeOnContextDone() {
-	c.Children().Add(1) // we're a child goroutine, to be waited upon.
 	<-c.Context().Done() // wait until parent (context) is done.
 	c.internalClose()
 	c.Children().Done()
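Every change in this commit targets something the Go race detector can observe: unsynchronized counters, slices and maps shared across goroutines, a shared err variable, and WaitGroup-style Add calls racing with Wait. A deliberately broken toy test (not from the repo) of the kind that running go test -race should flag:

package counter

import (
	"sync"
	"testing"
)

// TestRacyIncrement is intentionally wrong: two goroutines increment a
// shared int without synchronization. Running it under `go test -race`
// should report a data race, the same class of bug this commit fixes with
// locks and with Add-before-go ordering.
func TestRacyIncrement(t *testing.T) {
	n := 0
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n++ // unsynchronized read-modify-write: the detector flags this
		}()
	}
	wg.Wait()
	t.Log("n =", n)
}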