mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-26 04:47:45 +08:00
Merge pull request #1837 from ipfs/dial-smarter
refactor dialing to not panic, and to be smart about ordering
This commit is contained in:
commit
4de5eaad5f
@ -16,9 +16,6 @@ import (
|
||||
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
processctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
||||
ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
)
|
||||
|
||||
@ -371,87 +368,83 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel() // cancel work when we exit func
|
||||
|
||||
foundConn := make(chan struct{})
|
||||
conns := make(chan conn.Conn, len(remoteAddrs))
|
||||
conns := make(chan conn.Conn)
|
||||
errs := make(chan error, len(remoteAddrs))
|
||||
|
||||
// dialSingleAddr is used in the rate-limited async thing below.
|
||||
dialSingleAddr := func(addr ma.Multiaddr) {
|
||||
// rebind chans in scope so we can nil them out easily
|
||||
connsout := conns
|
||||
errsout := errs
|
||||
|
||||
connC, err := s.dialAddr(ctx, d, p, addr)
|
||||
if err != nil {
|
||||
connsout = nil
|
||||
} else if connC == nil {
|
||||
// NOTE: this really should never happen
|
||||
log.Errorf("failed to dial %s %s and got no error!", p, addr)
|
||||
err = fmt.Errorf("failed to dial %s %s", p, addr)
|
||||
connsout = nil
|
||||
} else {
|
||||
errsout = nil
|
||||
}
|
||||
|
||||
// check parent still wants our results
|
||||
select {
|
||||
case <-foundConn:
|
||||
case <-ctx.Done():
|
||||
if connC != nil {
|
||||
connC.Close()
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
errs <- err
|
||||
} else if connC == nil {
|
||||
errs <- fmt.Errorf("failed to dial %s %s", p, addr)
|
||||
} else {
|
||||
conns <- connC
|
||||
case errsout <- err:
|
||||
case connsout <- connC:
|
||||
}
|
||||
}
|
||||
|
||||
// this whole thing is in a goroutine so we can use foundConn
|
||||
// to end early.
|
||||
go func() {
|
||||
// rate limiting just in case. at most 8 addrs at once.
|
||||
limiter := ratelimit.NewRateLimiter(process.Background(), 8)
|
||||
limiter.Go(func(worker process.Process) {
|
||||
// permute addrs so we try different sets first each time.
|
||||
for _, i := range rand.Perm(len(remoteAddrs)) {
|
||||
select {
|
||||
case <-foundConn: // if one of them succeeded already
|
||||
break
|
||||
case <-worker.Closing(): // our context was cancelled
|
||||
break
|
||||
default:
|
||||
}
|
||||
|
||||
workerAddr := remoteAddrs[i] // shadow variable to avoid race
|
||||
|
||||
// we have to do the waiting concurrently because there are addrs
|
||||
// that SHOULD NOT be rate limited (utp), nor blocked by other
|
||||
// rate limited addrs (tcp).
|
||||
//
|
||||
// (and we need to call `limiter.Go`, instead of `go` as required
|
||||
// by goproc/limiter semantics. note: limiter.Go is not LimitedGo.)
|
||||
limiter.Go(func(p process.Process) {
|
||||
|
||||
// returns whatever ratelimiting is acceptable for workerAddr.
|
||||
// may not rate limit at all.
|
||||
rl := s.addrDialRateLimit(workerAddr)
|
||||
rl <- struct{}{}
|
||||
|
||||
limiter.LimitedGo(func(worker process.Process) {
|
||||
dialSingleAddr(workerAddr)
|
||||
})
|
||||
|
||||
<-rl
|
||||
})
|
||||
limiter := make(chan struct{}, 8)
|
||||
// permute addrs so we try different sets first each time.
|
||||
for _, i := range rand.Perm(len(remoteAddrs)) {
|
||||
|
||||
addr := remoteAddrs[i]
|
||||
// returns whatever ratelimiting is acceptable for workerAddr.
|
||||
// may not rate limit at all.
|
||||
rl := s.addrDialRateLimit(addr)
|
||||
select {
|
||||
case <-ctx.Done(): // our context was cancelled
|
||||
return
|
||||
case rl <- struct{}{}:
|
||||
// take the token, move on
|
||||
}
|
||||
})
|
||||
|
||||
processctx.CloseAfterContext(limiter, ctx)
|
||||
select {
|
||||
case <-ctx.Done(): // our context was cancelled
|
||||
return
|
||||
case limiter <- struct{}{}:
|
||||
// take the token, move on
|
||||
}
|
||||
|
||||
go func(rlc <-chan struct{}, a ma.Multiaddr) {
|
||||
dialSingleAddr(a)
|
||||
<-limiter
|
||||
<-rlc
|
||||
}(rl, addr)
|
||||
}
|
||||
}()
|
||||
|
||||
// wair fot the results.
|
||||
// wait for the results.
|
||||
exitErr := fmt.Errorf("failed to dial %s", p)
|
||||
for i := 0; i < len(remoteAddrs); i++ {
|
||||
for range remoteAddrs {
|
||||
select {
|
||||
case exitErr = <-errs: //
|
||||
log.Debug("dial error: ", exitErr)
|
||||
case connC := <-conns:
|
||||
// take the first + return asap
|
||||
close(foundConn)
|
||||
return connC, nil
|
||||
case <-ctx.Done():
|
||||
// break out and return error
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil, exitErr
|
||||
|
||||
Loading…
Reference in New Issue
Block a user