diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 5ee049441..c7ac6f993 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -15,6 +15,9 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" + ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" ) // Diagram of dial sync: @@ -353,43 +356,60 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote conns := make(chan conn.Conn, len(remoteAddrs)) errs := make(chan error, len(remoteAddrs)) - //TODO: rate limiting just in case? - for _, addr := range remoteAddrs { - go func(addr ma.Multiaddr) { - connC, err := s.dialAddr(ctx, d, p, addr) + // dialSingleAddr is used in the rate-limited async thing below. + dialSingleAddr := func(addr ma.Multiaddr) { + connC, err := s.dialAddr(ctx, d, p, addr) - // check parent still wants our results - select { - case <-foundConn: - if connC != nil { - connC.Close() - } - return - default: + // check parent still wants our results + select { + case <-foundConn: + if connC != nil { + connC.Close() } + return + default: + } - if err != nil { - errs <- err - } else if connC == nil { - errs <- fmt.Errorf("failed to dial %s %s", p, addr) - } else { - conns <- connC - } - }(addr) + if err != nil { + errs <- err + } else if connC == nil { + errs <- fmt.Errorf("failed to dial %s %s", p, addr) + } else { + conns <- connC + } } - err := fmt.Errorf("failed to dial %s", p) + // this whole thing is in a goroutine so we can use foundConn + // to end early. 
+ go func() { + // rate limiting just in case. at most 10 addrs at once. + limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10) + for _, addr := range remoteAddrs { + select { + case <-foundConn: // if one of them succeeded already + break + default: + } + workerAddr := addr // shadow variable to avoid race + limiter.Go(func(worker process.Process) { + dialSingleAddr(workerAddr) + }) + } + }() + + // wait for the results. + exitErr := fmt.Errorf("failed to dial %s", p) for i := 0; i < len(remoteAddrs); i++ { select { - case err = <-errs: - log.Debug(err) + case exitErr = <-errs: // + log.Debug(exitErr) case connC := <-conns: // take the first + return asap close(foundConn) return connC, nil } } - return nil, err + return nil, exitErr } func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {