mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-03 07:18:12 +08:00
feat(core) supervise bootstrap connections
License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
parent
80bc9251a6
commit
175da4f584
110
core/bootstrap.go
Normal file
110
core/bootstrap.go
Normal file
@ -0,0 +1,110 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
"github.com/jbenet/go-ipfs/config"
|
||||
inet "github.com/jbenet/go-ipfs/net"
|
||||
"github.com/jbenet/go-ipfs/peer"
|
||||
"github.com/jbenet/go-ipfs/routing/dht"
|
||||
)
|
||||
|
||||
const period time.Duration = 30 * time.Second
|
||||
const timeout time.Duration = period / 3
|
||||
|
||||
func superviseConnections(parent context.Context,
|
||||
n *IpfsNode,
|
||||
route *dht.IpfsDHT,
|
||||
store peer.Peerstore,
|
||||
peers []*config.BootstrapPeer) error {
|
||||
|
||||
for {
|
||||
ctx, _ := context.WithTimeout(parent, timeout)
|
||||
// TODO get config from disk so |peers| always reflects the latest
|
||||
// information
|
||||
if err := bootstrap(ctx, n.Network, route, store, peers); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
select {
|
||||
case <-parent.Done():
|
||||
return parent.Err()
|
||||
case <-time.Tick(period):
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func bootstrap(ctx context.Context,
|
||||
n inet.Network,
|
||||
r *dht.IpfsDHT,
|
||||
ps peer.Peerstore,
|
||||
boots []*config.BootstrapPeer) error {
|
||||
|
||||
var peers []peer.Peer
|
||||
for _, bootstrap := range boots {
|
||||
p, err := toPeer(ps, bootstrap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
peers = append(peers, p)
|
||||
}
|
||||
|
||||
var notConnected []peer.Peer
|
||||
for _, p := range peers {
|
||||
if !n.IsConnected(p) {
|
||||
notConnected = append(notConnected, p)
|
||||
}
|
||||
}
|
||||
for _, p := range notConnected {
|
||||
log.Infof("not connected to %v", p)
|
||||
}
|
||||
if err := connect(ctx, r, notConnected); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func connect(ctx context.Context, r *dht.IpfsDHT, peers []peer.Peer) error {
|
||||
var wg sync.WaitGroup
|
||||
for _, p := range peers {
|
||||
|
||||
// performed asynchronously because when performed synchronously, if
|
||||
// one `Connect` call hangs, subsequent calls are more likely to
|
||||
// fail/abort due to an expiring context.
|
||||
|
||||
wg.Add(1)
|
||||
go func(p peer.Peer) {
|
||||
defer wg.Done()
|
||||
err := r.Connect(ctx, p)
|
||||
if err != nil {
|
||||
log.Event(ctx, "bootstrapFailed", p)
|
||||
log.Criticalf("failed to bootstrap with %v", p)
|
||||
return
|
||||
}
|
||||
log.Event(ctx, "bootstrapSuccess", p)
|
||||
log.Infof("bootstrapped with %v", p)
|
||||
}(p)
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func toPeer(ps peer.Peerstore, bootstrap *config.BootstrapPeer) (peer.Peer, error) {
|
||||
id, err := peer.DecodePrettyID(bootstrap.PeerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p, err := ps.FindOrCreate(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
maddr, err := ma.NewMultiaddr(bootstrap.Address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.AddAddress(maddr)
|
||||
return p, nil
|
||||
}
|
||||
39
core/core.go
39
core/core.go
@ -179,13 +179,13 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
|
||||
|
||||
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, blockstore, alwaysSendToPeer)
|
||||
|
||||
// TODO consider connection supervision into the Network. We've
|
||||
// TODO consider moving connection supervision into the Network. We've
|
||||
// discussed improvements to this Node constructor. One improvement
|
||||
// would be to make the node configurable, allowing clients to inject
|
||||
// an Exchange, Network, or Routing component and have the constructor
|
||||
// manage the wiring. In that scenario, this dangling function is a bit
|
||||
// awkward.
|
||||
go initConnections(ctx, n.Config.Bootstrap, n.Peerstore, dhtRouting)
|
||||
go superviseConnections(ctx, n, dhtRouting, n.Peerstore, n.Config.Bootstrap)
|
||||
}
|
||||
|
||||
// TODO(brian): when offline instantiate the BlockService with a bitswap
|
||||
@ -256,41 +256,6 @@ func initIdentity(cfg *config.Identity, peers peer.Peerstore, online bool) (peer
|
||||
return self, nil
|
||||
}
|
||||
|
||||
func initConnections(ctx context.Context, bootstrap []*config.BootstrapPeer, pstore peer.Peerstore, route *dht.IpfsDHT) {
|
||||
// TODO consider stricter error handling
|
||||
// TODO consider Criticalf error logging
|
||||
for _, p := range bootstrap {
|
||||
if p.PeerID == "" {
|
||||
log.Criticalf("error: peer does not include PeerID. %v", p)
|
||||
}
|
||||
|
||||
maddr, err := ma.NewMultiaddr(p.Address)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// setup peer
|
||||
id, err := peer.DecodePrettyID(p.PeerID)
|
||||
if err != nil {
|
||||
log.Criticalf("Bootstrapping error: %v", err)
|
||||
continue
|
||||
}
|
||||
npeer, err := pstore.FindOrCreate(id)
|
||||
if err != nil {
|
||||
log.Criticalf("Bootstrapping error: %v", err)
|
||||
continue
|
||||
}
|
||||
npeer.AddAddress(maddr)
|
||||
|
||||
if err := route.Connect(ctx, npeer); err != nil {
|
||||
log.Criticalf("Bootstrapping error: %v", err)
|
||||
continue
|
||||
}
|
||||
log.Event(ctx, "bootstrap", npeer)
|
||||
}
|
||||
}
|
||||
|
||||
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
|
||||
|
||||
var err error
|
||||
|
||||
Loading…
Reference in New Issue
Block a user