diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 4beb986a3..a384dbed9 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -16,9 +16,6 @@ import ( ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" - process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" - processctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" - ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -371,87 +368,83 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel work when we exit func - foundConn := make(chan struct{}) - conns := make(chan conn.Conn, len(remoteAddrs)) + conns := make(chan conn.Conn) errs := make(chan error, len(remoteAddrs)) // dialSingleAddr is used in the rate-limited async thing below. dialSingleAddr := func(addr ma.Multiaddr) { + // rebind chans in scope so we can nil them out easily + connsout := conns + errsout := errs + connC, err := s.dialAddr(ctx, d, p, addr) + if err != nil { + connsout = nil + } else if connC == nil { + // NOTE: this really should never happen + log.Errorf("failed to dial %s %s and got no error!", p, addr) + err = fmt.Errorf("failed to dial %s %s", p, addr) + connsout = nil + } else { + errsout = nil + } // check parent still wants our results select { - case <-foundConn: + case <-ctx.Done(): if connC != nil { connC.Close() } - return - default: - } - - if err != nil { - errs <- err - } else if connC == nil { - errs <- fmt.Errorf("failed to dial %s %s", p, addr) - } else { - conns <- connC + case errsout <- err: + case connsout <- connC: } } // this whole thing is in a goroutine so we can use foundConn // to end early. go func() { - // rate limiting just in case. at most 10 addrs at once. - limiter := ratelimit.NewRateLimiter(process.Background(), 8) - limiter.Go(func(worker process.Process) { - // permute addrs so we try different sets first each time. - for _, i := range rand.Perm(len(remoteAddrs)) { - select { - case <-foundConn: // if one of them succeeded already - break - case <-worker.Closing(): // our context was cancelled - break - default: - } - - workerAddr := remoteAddrs[i] // shadow variable to avoid race - - // we have to do the waiting concurrently because there are addrs - // that SHOULD NOT be rate limited (utp), nor blocked by other - // rate limited addrs (tcp). - // - // (and we need to call `limiter.Go`, instead of `go` as required - // by goproc/limiter semantics. note: limiter.Go is not LimitedGo.) - limiter.Go(func(p process.Process) { - - // returns whatever ratelimiting is acceptable for workerAddr. - // may not rate limit at all. - rl := s.addrDialRateLimit(workerAddr) - rl <- struct{}{} - - limiter.LimitedGo(func(worker process.Process) { - dialSingleAddr(workerAddr) - }) - - <-rl - }) + limiter := make(chan struct{}, 8) + // permute addrs so we try different sets first each time. + for _, i := range rand.Perm(len(remoteAddrs)) { + addr := remoteAddrs[i] + // returns whatever ratelimiting is acceptable for workerAddr. + // may not rate limit at all. + rl := s.addrDialRateLimit(addr) + select { + case <-ctx.Done(): // our context was cancelled + return + case rl <- struct{}{}: + // take the token, move on } - }) - processctx.CloseAfterContext(limiter, ctx) + select { + case <-ctx.Done(): // our context was cancelled + return + case limiter <- struct{}{}: + // take the token, move on + } + + go func(rlc <-chan struct{}, a ma.Multiaddr) { + dialSingleAddr(a) + <-limiter + <-rlc + }(rl, addr) + } }() - // wair fot the results. + // wair for the results. exitErr := fmt.Errorf("failed to dial %s", p) - for i := 0; i < len(remoteAddrs); i++ { + for range remoteAddrs { select { case exitErr = <-errs: // log.Debug("dial error: ", exitErr) case connC := <-conns: // take the first + return asap - close(foundConn) return connC, nil + case <-ctx.Done(): + // break out and return error + break } } return nil, exitErr