mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-07 09:17:49 +08:00
p2p/net/swarm: dial backoff
This commit introduces a backoff when failing to dial peers. It makes everything much faster.
This commit is contained in:
parent
8de8ed0842
commit
70ceaa975d
@ -33,6 +33,7 @@ type Swarm struct {
|
||||
peers peer.Peerstore
|
||||
connh ConnHandler
|
||||
dsync dialsync
|
||||
backf dialbackoff
|
||||
|
||||
cg ctxgroup.ContextGroup
|
||||
}
|
||||
@ -50,10 +51,10 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
|
||||
}
|
||||
|
||||
s := &Swarm{
|
||||
swarm: ps.NewSwarm(PSTransport),
|
||||
local: local,
|
||||
peers: peers,
|
||||
cg: ctxgroup.WithContext(ctx),
|
||||
swarm: ps.NewSwarm(PSTransport),
|
||||
local: local,
|
||||
peers: peers,
|
||||
cg: ctxgroup.WithContext(ctx),
|
||||
}
|
||||
|
||||
// configure Swarm
|
||||
|
||||
@ -96,6 +96,71 @@ func (ds *dialsync) Unlock(dst peer.ID) {
|
||||
ds.lock.Unlock()
|
||||
}
|
||||
|
||||
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
|
||||
// Whenever we totally time out on a peer (all three attempts), we add them
|
||||
// to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they
|
||||
// check dialbackoff. If it's there, they don't wait and exit promptly with
|
||||
// an error. (the single goroutine that is actually dialing continues to
|
||||
// dial). If a dial is successful, the peer is removed from backoff.
|
||||
// Example:
|
||||
//
|
||||
// for {
|
||||
// if ok, wait := dialsync.Lock(p); !ok {
|
||||
// if backoff.Backoff(p) {
|
||||
// return errDialFailed
|
||||
// }
|
||||
// <-wait
|
||||
// continue
|
||||
// }
|
||||
// defer dialsync.Unlock(p)
|
||||
// c, err := actuallyDial(p)
|
||||
// if err != nil {
|
||||
// dialbackoff.AddBackoff(p)
|
||||
// continue
|
||||
// }
|
||||
// dialbackoff.Clear(p)
|
||||
// }
|
||||
//
|
||||
type dialbackoff struct {
|
||||
entries map[peer.ID]struct{}
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func (db *dialbackoff) init() {
|
||||
if db.entries == nil {
|
||||
db.entries = make(map[peer.ID]struct{})
|
||||
}
|
||||
}
|
||||
|
||||
// Backoff returns whether the client should backoff from dialing
|
||||
// peeer p
|
||||
func (db *dialbackoff) Backoff(p peer.ID) bool {
|
||||
db.lock.Lock()
|
||||
db.init()
|
||||
_, found := db.entries[p]
|
||||
db.lock.Unlock()
|
||||
return found
|
||||
}
|
||||
|
||||
// AddBackoff lets other nodes know that we've entered backoff with
|
||||
// peer p, so dialers should not wait unnecessarily. We still will
|
||||
// attempt to dial with one goroutine, in case we get through.
|
||||
func (db *dialbackoff) AddBackoff(p peer.ID) {
|
||||
db.lock.Lock()
|
||||
db.init()
|
||||
db.entries[p] = struct{}{}
|
||||
db.lock.Unlock()
|
||||
}
|
||||
|
||||
// Clear removes a backoff record. Clients should call this after a
|
||||
// successful Dial.
|
||||
func (db *dialbackoff) Clear(p peer.ID) {
|
||||
db.lock.Lock()
|
||||
db.init()
|
||||
delete(db.entries, p)
|
||||
db.lock.Unlock()
|
||||
}
|
||||
|
||||
// Dial connects to a peer.
|
||||
//
|
||||
// The idea is that the client of Swarm does not need to know what network
|
||||
@ -103,6 +168,7 @@ func (ds *dialsync) Unlock(dst peer.ID) {
|
||||
// This allows us to use various transport protocols, do NAT traversal/relay,
|
||||
// etc. to achive connection.
|
||||
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
||||
log := log.Prefix("swarm %s dialing %s", s.local, p)
|
||||
if p == s.local {
|
||||
return nil, errors.New("Attempted connection to self!")
|
||||
}
|
||||
@ -126,7 +192,13 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
||||
|
||||
// check if there's an ongoing dial to this peer
|
||||
if ok, wait := s.dsync.Lock(p); !ok {
|
||||
log.Debugf("swarm %s dialing %s -- waiting for ongoing dial", s.local, p)
|
||||
|
||||
if s.backf.Backoff(p) {
|
||||
log.Debugf("backoff")
|
||||
return nil, fmt.Errorf("%s failed to dial %s, backing off.", s.local, p)
|
||||
}
|
||||
|
||||
log.Debugf("waiting for ongoing dial")
|
||||
select {
|
||||
case <-wait: // wait for that dial to finish.
|
||||
continue // and see if it worked (loop), OR we got an incoming dial.
|
||||
@ -137,14 +209,17 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
|
||||
|
||||
// ok, we have been charged to dial! let's do it.
|
||||
// if it succeeds, dial will add the conn to the swarm itself.
|
||||
log.Debugf("swarm %s dialing %s -- dial start", s.local, p)
|
||||
log.Debugf("dial start")
|
||||
ctxT, _ := context.WithTimeout(ctx, DialTimeout)
|
||||
conn, err = s.dial(ctxT, p)
|
||||
s.dsync.Unlock(p)
|
||||
log.Debugf("swarm %s dialing %s -- dial end %s", s.local, p, conn)
|
||||
log.Debugf("dial end %s", conn)
|
||||
if err != nil {
|
||||
s.backf.AddBackoff(p) // let others know to backoff
|
||||
|
||||
continue // ok, we failed. try again. (if loop is done, our error is output)
|
||||
}
|
||||
s.backf.Clear(p) // okay, no longer need to backoff
|
||||
return conn, nil
|
||||
}
|
||||
if err == nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user