mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-10 18:57:57 +08:00
p2p/net/swarm: rate limit dials. max of 10 addrs at a time.
This will mitigate the fd explosion, but it will slow down dials significantly, because any peer with more addresses than the rate limit will have to wait a whole dial timeout (~15s).
This commit is contained in:
parent
e908effb4b
commit
793048d310
@ -15,6 +15,9 @@ import (
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
||||
ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
|
||||
)
|
||||
|
||||
// Diagram of dial sync:
|
||||
@ -353,43 +356,60 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
|
||||
conns := make(chan conn.Conn, len(remoteAddrs))
|
||||
errs := make(chan error, len(remoteAddrs))
|
||||
|
||||
//TODO: rate limiting just in case?
|
||||
for _, addr := range remoteAddrs {
|
||||
go func(addr ma.Multiaddr) {
|
||||
connC, err := s.dialAddr(ctx, d, p, addr)
|
||||
// dialSingleAddr is used in the rate-limited async thing below.
|
||||
dialSingleAddr := func(addr ma.Multiaddr) {
|
||||
connC, err := s.dialAddr(ctx, d, p, addr)
|
||||
|
||||
// check parent still wants our results
|
||||
select {
|
||||
case <-foundConn:
|
||||
if connC != nil {
|
||||
connC.Close()
|
||||
}
|
||||
return
|
||||
default:
|
||||
// check parent still wants our results
|
||||
select {
|
||||
case <-foundConn:
|
||||
if connC != nil {
|
||||
connC.Close()
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
errs <- err
|
||||
} else if connC == nil {
|
||||
errs <- fmt.Errorf("failed to dial %s %s", p, addr)
|
||||
} else {
|
||||
conns <- connC
|
||||
}
|
||||
}(addr)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
} else if connC == nil {
|
||||
errs <- fmt.Errorf("failed to dial %s %s", p, addr)
|
||||
} else {
|
||||
conns <- connC
|
||||
}
|
||||
}
|
||||
|
||||
err := fmt.Errorf("failed to dial %s", p)
|
||||
// this whole thing is in a goroutine so we can use foundConn
|
||||
// to end early.
|
||||
go func() {
|
||||
// rate limiting just in case. at most 10 addrs at once.
|
||||
limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10)
|
||||
for _, addr := range remoteAddrs {
|
||||
select {
|
||||
case <-foundConn: // if one of them succeeded already
|
||||
break
|
||||
default:
|
||||
}
|
||||
workerAddr := addr // shadow variable to avoid race
|
||||
limiter.Go(func(worker process.Process) {
|
||||
dialSingleAddr(workerAddr)
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
// wait for the results.
|
||||
exitErr := fmt.Errorf("failed to dial %s", p)
|
||||
for i := 0; i < len(remoteAddrs); i++ {
|
||||
select {
|
||||
case err = <-errs:
|
||||
log.Debug(err)
|
||||
case exitErr = <-errs: //
|
||||
log.Debug(exitErr)
|
||||
case connC := <-conns:
|
||||
// take the first + return asap
|
||||
close(foundConn)
|
||||
return connC, nil
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
return nil, exitErr
|
||||
}
|
||||
|
||||
func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user