From 175da4f584a1790afdaa9d0d237643ac6c3c675d Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Mon, 8 Dec 2014 05:11:04 -0800 Subject: [PATCH] feat(core) supervise bootstrap connections License: MIT Signed-off-by: Brian Tiger Chow --- core/bootstrap.go | 110 ++++++++++++++++++++++++++++++++++++++++++++++ core/core.go | 39 +--------------- 2 files changed, 112 insertions(+), 37 deletions(-) create mode 100644 core/bootstrap.go diff --git a/core/bootstrap.go b/core/bootstrap.go new file mode 100644 index 000000000..afd00f4a3 --- /dev/null +++ b/core/bootstrap.go @@ -0,0 +1,110 @@ +package core + +import ( + "sync" + "time" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + "github.com/jbenet/go-ipfs/config" + inet "github.com/jbenet/go-ipfs/net" + "github.com/jbenet/go-ipfs/peer" + "github.com/jbenet/go-ipfs/routing/dht" +) + +const period time.Duration = 30 * time.Second +const timeout time.Duration = period / 3 + +func superviseConnections(parent context.Context, + n *IpfsNode, + route *dht.IpfsDHT, + store peer.Peerstore, + peers []*config.BootstrapPeer) error { + + for { + ctx, _ := context.WithTimeout(parent, timeout) + // TODO get config from disk so |peers| always reflects the latest + // information + if err := bootstrap(ctx, n.Network, route, store, peers); err != nil { + log.Error(err) + } + select { + case <-parent.Done(): + return parent.Err() + case <-time.Tick(period): + } + } + return nil +} + +func bootstrap(ctx context.Context, + n inet.Network, + r *dht.IpfsDHT, + ps peer.Peerstore, + boots []*config.BootstrapPeer) error { + + var peers []peer.Peer + for _, bootstrap := range boots { + p, err := toPeer(ps, bootstrap) + if err != nil { + return err + } + peers = append(peers, p) + } + + var notConnected []peer.Peer + for _, p := range peers { + if !n.IsConnected(p) { + notConnected = append(notConnected, p) + } + } + for _, p := range notConnected { + log.Infof("not connected to %v", p) + } + if err := connect(ctx, r, notConnected); err != nil { + return err + } + return nil +} + +func connect(ctx context.Context, r *dht.IpfsDHT, peers []peer.Peer) error { + var wg sync.WaitGroup + for _, p := range peers { + + // performed asynchronously because when performed synchronously, if + // one `Connect` call hangs, subsequent calls are more likely to + // fail/abort due to an expiring context. + + wg.Add(1) + go func(p peer.Peer) { + defer wg.Done() + err := r.Connect(ctx, p) + if err != nil { + log.Event(ctx, "bootstrapFailed", p) + log.Criticalf("failed to bootstrap with %v", p) + return + } + log.Event(ctx, "bootstrapSuccess", p) + log.Infof("bootstrapped with %v", p) + }(p) + } + wg.Wait() + return nil +} + +func toPeer(ps peer.Peerstore, bootstrap *config.BootstrapPeer) (peer.Peer, error) { + id, err := peer.DecodePrettyID(bootstrap.PeerID) + if err != nil { + return nil, err + } + p, err := ps.FindOrCreate(id) + if err != nil { + return nil, err + } + maddr, err := ma.NewMultiaddr(bootstrap.Address) + if err != nil { + return nil, err + } + p.AddAddress(maddr) + return p, nil +} diff --git a/core/core.go b/core/core.go index aadade420..4136d8f0a 100644 --- a/core/core.go +++ b/core/core.go @@ -179,13 +179,13 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) { n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, blockstore, alwaysSendToPeer) - // TODO consider connection supervision into the Network. We've + // TODO consider moving connection supervision into the Network. We've // discussed improvements to this Node constructor. One improvement // would be to make the node configurable, allowing clients to inject // an Exchange, Network, or Routing component and have the constructor // manage the wiring. In that scenario, this dangling function is a bit // awkward. - go initConnections(ctx, n.Config.Bootstrap, n.Peerstore, dhtRouting) + go superviseConnections(ctx, n, dhtRouting, n.Peerstore, n.Config.Bootstrap) } // TODO(brian): when offline instantiate the BlockService with a bitswap @@ -256,41 +256,6 @@ func initIdentity(cfg *config.Identity, peers peer.Peerstore, online bool) (peer return self, nil } -func initConnections(ctx context.Context, bootstrap []*config.BootstrapPeer, pstore peer.Peerstore, route *dht.IpfsDHT) { - // TODO consider stricter error handling - // TODO consider Criticalf error logging - for _, p := range bootstrap { - if p.PeerID == "" { - log.Criticalf("error: peer does not include PeerID. %v", p) - } - - maddr, err := ma.NewMultiaddr(p.Address) - if err != nil { - log.Error(err) - continue - } - - // setup peer - id, err := peer.DecodePrettyID(p.PeerID) - if err != nil { - log.Criticalf("Bootstrapping error: %v", err) - continue - } - npeer, err := pstore.FindOrCreate(id) - if err != nil { - log.Criticalf("Bootstrapping error: %v", err) - continue - } - npeer.AddAddress(maddr) - - if err := route.Connect(ctx, npeer); err != nil { - log.Criticalf("Bootstrapping error: %v", err) - continue - } - log.Event(ctx, "bootstrap", npeer) - } -} - func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { var err error