Merge pull request #583 from jbenet/bootstrap-fix

add periodic bootstrapping
Juan Batiz-Benet 2015-01-23 05:43:44 -08:00
commit 343940dafa
33 changed files with 1422 additions and 521 deletions
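For context, the heart of this PR is a small API: a node starts a periodic bootstrap process and keeps an io.Closer to stop it. A minimal usage sketch (names as they appear in the diffs below; n is a hypothetical *core.IpfsNode):

// start periodic bootstrapping with the defaults introduced below
// (MinPeerThreshold: 4, Period: 30s, ConnectionTimeout: 10s).
if err := n.Bootstrap(core.DefaultBootstrapConfig); err != nil {
	log.Errorf("bootstrap error: %s", err)
}
// calling n.Bootstrap again closes the previous periodic process
// (n.Bootstrapper, an io.Closer) before starting a new one.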

Godeps/Godeps.json (generated, 2 changed lines)
View File

@ -172,7 +172,7 @@
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8"
"Rev": "c37725a4a97d6ad772818b071ceef82789562142"
},
{
"ImportPath": "github.com/kr/binarydist",

View File

@ -3,7 +3,7 @@ package main
import (
"os"
"github.com/op/go-logging"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
)
var log = logging.MustGetLogger("example")

View File

@ -0,0 +1,11 @@
language: go
go:
- 1.2
- 1.3
- 1.4
- release
- tip
script:
- go test -v ./...

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,5 +1,7 @@
# goprocess - lifecycles in go
[![travisbadge](https://travis-ci.org/jbenet/goprocess.svg)](https://travis-ci.org/jbenet/goprocess)
(Based on https://github.com/jbenet/go-ctxgroup)
- Godoc: https://godoc.org/github.com/jbenet/goprocess
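A minimal usage sketch, assuming the WithTeardown/Go/Close API shown in the goprocess.go diff below:

	p := goprocess.WithTeardown(func() error {
		fmt.Println("teardown")
		return nil
	})
	p.Go(func(child goprocess.Process) {
		select {
		case <-time.After(time.Second): // pretend work
			fmt.Println("work done")
		case <-child.Closing(): // parent began closing; stop early
		}
	})
	time.Sleep(1500 * time.Millisecond) // let the work finish (toy example)
	p.Close() // blocks until the child exits and teardown has run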

View File

@ -18,7 +18,7 @@ import (
// More specifically, it fits this:
//
// p := WithTeardown(tf) // new process is created, it is now running.
// p.AddChild(q) // can register children **before** Closing.
// p.AddChild(q) // can register children **before** Closed().
// go p.Close() // blocks until done running teardown func.
// <-p.Closing() // would now return true.
// <-p.childrenDone() // wait on all children to be done

View File

@ -1,114 +0,0 @@
// +build ignore
// WARNING: this implementation is not correct.
// here only for historical purposes.
package goprocess
import (
"sync"
)
// process implements Process
type process struct {
children sync.WaitGroup // wait group for child goroutines
teardown TeardownFunc // called to run the teardown logic.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeOnce sync.Once // ensure close is only called once.
closeErr error // error to return to clients of Close()
}
// newProcess constructs and returns a Process.
// It will call tf TeardownFunc exactly once:
// **after** all children have fully Closed,
// **after** entering <-Closing(), and
// **before** <-Closed().
func newProcess(tf TeardownFunc) *process {
if tf == nil {
tf = nilTeardownFunc
}
return &process{
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
}
}
func (p *process) WaitFor(q Process) {
p.children.Add(1) // p waits on q to be done
go func(p *process, q Process) {
<-q.Closed() // wait until q is closed
p.children.Done() // p done waiting on q
}(p, q)
}
func (p *process) AddChildNoWait(child Process) {
go func(p, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child
}(p, child)
}
func (p *process) AddChild(child Process) {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
p.children.Add(1) // p waits on child to be done
go func(p *process, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child and wait
p.children.Done() // p done waiting on child
}(p, child)
}
func (p *process) Go(f ProcessFunc) Process {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
// this is very similar to AddChild, but also runs the func
// in the child. we replicate it here to save one goroutine.
child := newProcessGoroutines(nil)
child.children.Add(1) // child waits on func to be done
p.AddChild(child)
go func() {
f(child)
child.children.Done() // wait on child's children to be done.
child.Close() // close to tear down.
}()
return child
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.closeOnce.Do(p.doClose)
<-p.Closed() // sync.Once should block, but this checks chan is closed too
return p.closeErr
}
func (p *process) Closing() <-chan struct{} {
return p.closing
}
func (p *process) Closed() <-chan struct{} {
return p.closed
}
// the _actual_ close process.
func (p *process) doClose() {
// this function should only be called once (hence the sync.Once).
// and it will panic (on closing channels) otherwise.
close(p.closing) // signal that we're shutting down (Closing)
p.children.Wait() // wait till all children are done (before teardown)
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
}

View File

@ -92,16 +92,18 @@ func (p *process) Go(f ProcessFunc) Process {
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.Lock()
defer p.Unlock()
// if already closed, get out.
// if already closing, or closed, get out. (but wait!)
select {
case <-p.Closed():
case <-p.Closing():
p.Unlock()
<-p.Closed()
return p.closeErr
default:
}
p.doClose()
p.Unlock()
return p.closeErr
}
@ -120,12 +122,23 @@ func (p *process) doClose() {
close(p.closing) // signal that we're shutting down (Closing)
for _, c := range p.children {
go c.Close() // force all children to shut down
}
for len(p.children) > 0 || len(p.waitfors) > 0 {
for _, c := range p.children {
go c.Close() // force all children to shut down
}
p.children = nil // clear them
for _, w := range p.waitfors {
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
// we must be careful not to iterate over waitfors directly, as it may
// change under our feet.
wf := p.waitfors
p.waitfors = nil // clear them
for _, w := range wf {
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
// adding a child to us can finish. we will immediately close the child.
p.Unlock()
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
p.Lock()
}
}
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
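The unlock-wait-relock dance above is the heart of the fix: blocking on a waitfor while holding the lock could deadlock with a goroutine that is simultaneously trying to register a child against us. A generic hedged sketch of the pattern:

	var mu sync.Mutex
	done := make(chan struct{})
	// ...
	mu.Lock()
	// inspect shared state, decide we must wait...
	mu.Unlock() // release before blocking, so others can make progress
	<-done      // safe to block here; another goroutine may close done
	mu.Lock()   // re-acquire, then re-check state: it may have changed

Note the re-check step: it is why doClose above loops on len(p.children) and len(p.waitfors) rather than assuming they stayed empty while unlocked.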

View File

@ -0,0 +1,4 @@
# goprocess/periodic - periodic process creation
- goprocess: https://github.com/jbenet/goprocess
- Godoc: https://godoc.org/github.com/jbenet/goprocess/periodic
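A minimal sketch of the package in use (Every as defined in periodic.go below; fuller runnable examples follow in the examples file):

	p := periodicproc.Every(time.Second, func(proc goprocess.Process) {
		fmt.Println("tick") // starts at least 1s after the previous call finished
	})
	// ...
	p.Close() // stops the loop and waits for any in-flight call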

View File

@ -0,0 +1,85 @@
package periodicproc_test
import (
"fmt"
"time"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)
func ExampleEvery() {
tock := make(chan struct{})
i := 0
p := periodicproc.Every(time.Second, func(proc goprocess.Process) {
tock <- struct{}{}
fmt.Printf("hello %d\n", i)
i++
})
<-tock
<-tock
<-tock
p.Close()
// Output:
// hello 0
// hello 1
// hello 2
}
func ExampleTick() {
p := periodicproc.Tick(time.Second, func(proc goprocess.Process) {
fmt.Println("tick")
})
<-time.After(3*time.Second + 500*time.Millisecond)
p.Close()
// Output:
// tick
// tick
// tick
}
func ExampleTickGo() {
// with TickGo, execution is not rate limited,
// there can be many in-flight simultaneously
wait := make(chan struct{})
p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) {
fmt.Println("tick")
<-wait
})
<-time.After(3*time.Second + 500*time.Millisecond)
wait <- struct{}{}
wait <- struct{}{}
wait <- struct{}{}
p.Close() // blocks us until all children are closed.
// Output:
// tick
// tick
// tick
}
func ExampleOnSignal() {
sig := make(chan struct{})
p := periodicproc.OnSignal(sig, func(proc goprocess.Process) {
fmt.Println("fire!")
})
sig <- struct{}{}
sig <- struct{}{}
sig <- struct{}{}
p.Close()
// Output:
// fire!
// fire!
// fire!
}

View File

@ -0,0 +1,232 @@
// Package periodic is part of github.com/jbenet/goprocess.
// It provides a simple periodic processor that calls a function
// periodically based on some options.
//
// For example:
//
// // use a time.Duration
// p := periodicproc.Every(time.Second, func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// <-time.After(5*time.Second)
// p.Close()
//
// // use a time.Time channel (like time.Ticker)
// p := periodicproc.Ticker(time.Tick(time.Second), func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// <-time.After(5*time.Second)
// p.Close()
//
// // or arbitrary signals
// signal := make(chan struct{})
// p := periodicproc.OnSignal(signal, func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// signal <- struct{}{}
// signal <- struct{}{}
// <-time.After(5 * time.Second)
// signal <- struct{}{}
// p.Close()
//
package periodicproc
import (
"time"
gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
// Every calls the given ProcessFunc at periodic intervals. Internally, it uses
// <-time.After(interval), so it will have the behavior of waiting _at least_
// interval in between calls. If you'd prefer the time.Ticker behavior, use
// periodicproc.Tick instead.
// This is sequentially rate limited, only one call will be in-flight at a time.
func Every(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-time.After(interval):
select {
case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
})
}
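To make the Every/Tick distinction concrete, a hedged sketch with a job that takes about a second: Every re-arms its timer only after a call finishes, so call starts drift apart by interval plus work time; Tick keeps firing on the ticker's schedule while still allowing only one call in flight.

	slow := func(proc gp.Process) { time.Sleep(time.Second) } // ~1s of work
	pe := Every(time.Second, slow) // calls start roughly 2s apart
	pt := Tick(time.Second, slow)  // calls start roughly 1s apart
	<-time.After(5 * time.Second)
	pe.Close()
	pt.Close()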
// EveryGo calls the given ProcessFunc at periodic intervals. Internally, it uses
// <-time.After(interval)
// This is not rate limited, multiple calls could be in-flight at the same time.
func EveryGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-time.After(interval):
proc.Go(procfunc)
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// Tick constructs a ticker with interval, and calls the given ProcessFunc every
// time the ticker fires.
// This is sequentially rate limited, only one call will be in-flight at a time.
//
// p := periodicproc.Tick(time.Second, func(proc goprocess.Process) {
// fmt.Println("fire!")
// })
//
// <-time.After(3 * time.Second)
// p.Close()
//
// // Output:
// // fire!
// // fire!
// // fire!
func Tick(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
ticker := time.NewTicker(interval)
callOnTicker(ticker.C, procfunc)(proc)
ticker.Stop()
})
}
// TickGo constructs a ticker with interval, and calls the given ProcessFunc every
// time the ticker fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
//
// p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(10 * time.Second) // will not block sequential execution
// })
//
// <-time.After(3 * time.Second)
// p.Close()
//
// // Output:
// // fire!
// // fire!
// // fire!
func TickGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
ticker := time.NewTicker(interval)
goCallOnTicker(ticker.C, procfunc)(proc)
ticker.Stop()
})
}
// Ticker calls the given ProcessFunc every time the ticker fires.
// This is sequentially rate limited, only one call will be in-flight at a time.
func Ticker(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(callOnTicker(ticker, procfunc))
}
// TickerGo calls the given ProcessFunc every time the ticker fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
func TickerGo(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(goCallOnTicker(ticker, procfunc))
}
func callOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc {
return func(proc gp.Process) {
for {
select {
case <-ticker:
select {
case <-proc.Go(pf).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
}
}
func goCallOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc {
return func(proc gp.Process) {
for {
select {
case <-ticker:
proc.Go(pf)
case <-proc.Closing(): // we're told to close
return
}
}
}
}
// OnSignal calls the given ProcessFunc every time the signal fires, and waits for it to exit.
// This is sequentially rate limited, only one call will be in-flight at a time.
//
// sig := make(chan struct{})
// p := periodicproc.OnSignal(sig, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(time.Second) // delays sequential execution by 1 second
// })
//
// sig <- struct{}{}
// sig <- struct{}{}
// sig <- struct{}{}
//
// // Output:
// // fire!
// // fire!
// // fire!
func OnSignal(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-sig:
select {
case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// OnSignalGo calls the given ProcessFunc every time the signal fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
//
// sig := make(chan struct{})
// p := periodicproc.OnSignalGo(sig, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(time.Second) // won't block execution
// })
//
// sig <- struct{}{}
// sig <- struct{}{}
// sig <- struct{}{}
//
// // Output:
// // fire!
// // fire!
// // fire!
func OnSignalGo(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-sig:
proc.Go(procfunc)
case <-proc.Closing(): // we're told to close
return
}
}
})
}

View File

@ -0,0 +1,260 @@
package periodicproc
import (
"testing"
"time"
ci "github.com/jbenet/go-cienv"
gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
var (
grace = time.Millisecond * 5
interval = time.Millisecond * 10
timeout = time.Second * 5
)
func init() {
if ci.IsRunning() {
grace = time.Millisecond * 500
interval = time.Millisecond * 1000
timeout = time.Second * 15
}
}
func between(min, diff, max time.Duration) bool {
return min <= diff && diff <= max
}
func testBetween(t *testing.T, min, diff, max time.Duration) {
if !between(min, diff, max) {
t.Error("time diff incorrect:", min, diff, max)
}
}
type intervalFunc func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process)
func testSeq(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
p := toTest(times, nil)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
last = next
}
go p.Close()
select {
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testSeqWait(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
<-time.After(interval * 2) // make it wait.
last = time.Now() // make it now (sequential)
wait <- struct{}{} // release it.
}
go p.Close()
select {
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testSeqNoWait(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, 0, next.Sub(last), interval+grace) // min of 0
<-time.After(interval * 2) // make it wait.
last = time.Now() // make it now (sequential)
wait <- struct{}{} // release it.
}
go p.Close()
end:
select {
case wait <- struct{}{}: // drain any extras.
goto end
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testParallel(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
last = next
<-time.After(interval * 2) // make it wait.
wait <- struct{}{} // release it.
}
go p.Close()
end:
select {
case wait <- struct{}{}: // drain any extras.
goto end
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func TestEverySeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Every(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestEverySeqWait(t *testing.T) {
testSeqWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Every(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestEveryGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return EveryGo(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestEveryGoSeqParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return EveryGo(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Tick(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickSeqNoWait(t *testing.T) {
testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Tick(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickGo(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickGoSeqParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickGo(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickerSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Ticker(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickerSeqNoWait(t *testing.T) {
testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Ticker(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickerGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickerGo(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickerGoParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickerGo(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}

View File

@ -2,6 +2,9 @@ package core
import (
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"sync"
"time"
@ -16,119 +19,213 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)
const (
period = 30 * time.Second // how often to check connection status
connectiontimeout time.Duration = period / 3 // duration to wait when attempting to connect
recoveryThreshold = 4 // attempt to bootstrap if connection count falls below this value
numDHTBootstrapQueries = 15 // number of DHT queries to execute
)
// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
// peers to bootstrap correctly.
var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")
func superviseConnections(parent context.Context,
h host.Host,
route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes
store peer.Peerstore,
peers []peer.PeerInfo) error {
// BootstrapConfig specifies parameters used in an IpfsNode's network
// bootstrapping process.
type BootstrapConfig struct {
for {
ctx, _ := context.WithTimeout(parent, connectiontimeout)
// TODO get config from disk so |peers| always reflects the latest
// information
if err := bootstrap(ctx, h, route, store, peers); err != nil {
log.Error(err)
}
select {
case <-parent.Done():
return parent.Err()
case <-time.Tick(period):
}
}
return nil
// MinPeerThreshold governs whether to bootstrap more connections. If the
// node has fewer open connections than this number, it will open connections
// to the bootstrap nodes. From there, the routing system should be able
// to use the connections to the bootstrap nodes to connect to even more
// peers. Routing systems like the IpfsDHT do so in their own Bootstrap
// process, which issues random queries to find more peers.
MinPeerThreshold int
// Period governs the periodic interval at which the node will
// attempt to bootstrap. The bootstrap process is not very expensive, so
// this period can afford to be small (<=30s).
Period time.Duration
// ConnectionTimeout determines how long to wait for a bootstrap
// connection attempt before cancelling it.
ConnectionTimeout time.Duration
// BootstrapPeers is a function that returns a set of bootstrap peers
// for the bootstrap process to use. This makes it possible for clients
// to control the peers the process uses at any moment.
BootstrapPeers func() []peer.PeerInfo
}
func bootstrap(ctx context.Context,
h host.Host,
r *dht.IpfsDHT,
ps peer.Peerstore,
bootstrapPeers []peer.PeerInfo) error {
// DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
var DefaultBootstrapConfig = BootstrapConfig{
MinPeerThreshold: 4,
Period: 30 * time.Second,
ConnectionTimeout: (30 * time.Second) / 3, // Period / 3
}
connectedPeers := h.Network().Peers()
if len(connectedPeers) >= recoveryThreshold {
log.Event(ctx, "bootstrapSkip", h.ID())
log.Debugf("%s bootstrap skipped -- connected to %d (> %d) nodes",
h.ID(), len(connectedPeers), recoveryThreshold)
func BootstrapConfigWithPeers(pis []peer.PeerInfo) BootstrapConfig {
cfg := DefaultBootstrapConfig
cfg.BootstrapPeers = func() []peer.PeerInfo {
return pis
}
return cfg
}
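BootstrapPeers being a function rather than a static slice is what lets the peer set track live configuration changes. A hedged sketch of a dynamic config, where loadPeersFromConfig is a hypothetical helper that re-reads the repo config:

	cfg := DefaultBootstrapConfig
	cfg.BootstrapPeers = func() []peer.PeerInfo {
		return loadPeersFromConfig() // hypothetical helper; evaluated fresh each round
	}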
// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically
// check the number of open connections and -- if there are too few -- initiate
// connections to well-known bootstrap peers. It also kicks off subsystem
// bootstrapping (i.e. routing).
func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
// TODO what bootstrapping should happen if there is no DHT? i.e. we could
// continue connecting to our bootstrap peers, but for what purpose? for now
// simply exit without connecting to any of them. When we introduce another
// routing system that uses bootstrap peers we can change this.
thedht, ok := n.Routing.(*dht.IpfsDHT)
if !ok {
return ioutil.NopCloser(nil), nil
}
// the periodic bootstrap function -- the connection supervisor
periodic := func(worker goprocess.Process) {
ctx := procctx.WithProcessClosing(context.Background(), worker)
defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done()
if err := bootstrapRound(ctx, n.PeerHost, thedht, n.Peerstore, cfg); err != nil {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", n.Identity, err)
}
}
// kick off the node's periodic bootstrapping
proc := periodicproc.Tick(cfg.Period, periodic)
proc.Go(periodic) // run one right now.
// kick off dht bootstrapping.
dbproc, err := thedht.Bootstrap(dht.DefaultBootstrapConfig)
if err != nil {
proc.Close()
return nil, err
}
// add dht bootstrap proc as a child, so it is closed automatically when we are.
proc.AddChild(dbproc)
return proc, nil
}
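The Tick + proc.Go pairing above is a small idiom worth naming: the ticker alone would wait a full Period before the first round, so an immediate first round is launched as a child of the same process. A hedged sketch of the pattern in isolation:

	round := func(worker goprocess.Process) {
		// ... one bootstrap round ...
	}
	proc := periodicproc.Tick(cfg.Period, round) // one round every Period
	proc.Go(round)                               // plus one round right now
	// closing proc later stops the ticker and waits for in-flight rounds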
func bootstrapRound(ctx context.Context,
host host.Host,
route *dht.IpfsDHT,
peerstore peer.Peerstore,
cfg BootstrapConfig) error {
ctx, _ = context.WithTimeout(ctx, cfg.ConnectionTimeout)
id := host.ID()
// get bootstrap peers from config. retrieving them here makes
// sure we remain observant of changes to client configuration.
peers := cfg.BootstrapPeers()
// determine how many bootstrap connections to open
connected := host.Network().Peers()
if len(connected) >= cfg.MinPeerThreshold {
log.Event(ctx, "bootstrapSkip", id)
log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
id, len(connected), cfg.MinPeerThreshold)
return nil
}
numCxnsToCreate := recoveryThreshold - len(connectedPeers)
log.Event(ctx, "bootstrapStart", h.ID())
log.Debugf("%s bootstrapping to %d more nodes", h.ID(), numCxnsToCreate)
numToDial := cfg.MinPeerThreshold - len(connected)
// filter out bootstrap nodes we are already connected to
var notConnected []peer.PeerInfo
for _, p := range bootstrapPeers {
if h.Network().Connectedness(p.ID) != inet.Connected {
for _, p := range peers {
if host.Network().Connectedness(p.ID) != inet.Connected {
notConnected = append(notConnected, p)
}
}
// if not connected to all bootstrap peer candidates
if len(notConnected) > 0 {
var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset)
if err := connect(ctx, ps, r, randomSubset); err != nil {
log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", h.ID(), err)
return err
}
// if connected to all bootstrap peer candidates, exit
if len(notConnected) < 1 {
log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial)
return ErrNotEnoughBootstrapPeers
}
// we can try running dht bootstrap even if we're connected to all bootstrap peers.
if len(h.Network().Conns()) > 0 {
if err := r.Bootstrap(ctx, numDHTBootstrapQueries); err != nil {
// log this as Info. later on, discern better between errors.
log.Infof("dht bootstrap err: %s", err)
return nil
}
// connect to a random subset of bootstrap candidates
randSubset := randomSubsetOfPeers(notConnected, numToDial)
defer log.EventBegin(ctx, "bootstrapStart", id).Done()
log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset)
if err := bootstrapConnect(ctx, peerstore, route, randSubset); err != nil {
return err
}
return nil
}
func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error {
func bootstrapConnect(ctx context.Context,
ps peer.Peerstore,
route *dht.IpfsDHT,
peers []peer.PeerInfo) error {
if len(peers) < 1 {
return errors.New("bootstrap set empty")
return ErrNotEnoughBootstrapPeers
}
errs := make(chan error, len(peers))
var wg sync.WaitGroup
for _, p := range peers {
// performed asynchronously because when performed synchronously, if
// one `Connect` call hangs, subsequent calls are more likely to
// fail/abort due to an expiring context.
// Also, performed asynchronously for dial speed.
wg.Add(1)
go func(p peer.PeerInfo) {
defer wg.Done()
log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID)
log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID)
defer log.EventBegin(ctx, "bootstrapDial", route.LocalPeer(), p.ID).Done()
log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID)
ps.AddAddresses(p.ID, p.Addrs)
err := r.Connect(ctx, p.ID)
err := route.Connect(ctx, p.ID)
if err != nil {
log.Event(ctx, "bootstrapFailed", p.ID)
log.Criticalf("failed to bootstrap with %v: %s", p.ID, err)
log.Event(ctx, "bootstrapDialFailed", p.ID)
log.Errorf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
return
}
log.Event(ctx, "bootstrapSuccess", p.ID)
log.Event(ctx, "bootstrapDialSuccess", p.ID)
log.Infof("bootstrapped with %v", p.ID)
}(p)
}
wg.Wait()
// our failure condition is when no connection attempt succeeded.
// So drain the errs channel, counting the results.
close(errs)
count := 0
var err error
for err = range errs {
if err != nil {
count++
}
}
if count == len(peers) {
return fmt.Errorf("failed to bootstrap. %s", err)
}
return nil
}
func toPeer(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) {
func toPeerInfos(bpeers []config.BootstrapPeer) ([]peer.PeerInfo, error) {
var peers []peer.PeerInfo
for _, bootstrap := range bpeers {
p, err := toPeerInfo(bootstrap)
if err != nil {
return nil, err
}
peers = append(peers, p)
}
return peers, nil
}
func toPeerInfo(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) {
id, err := peer.IDB58Decode(bootstrap.PeerID)
if err != nil {
return

View File

@ -36,6 +36,8 @@ type DiagnosticOutput struct {
Peers []DiagnosticPeer
}
var DefaultDiagnosticTimeout = time.Second * 20
var DiagCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Generates diagnostic reports",
@ -57,6 +59,7 @@ connected peers and latencies between them.
},
Options: []cmds.Option{
cmds.StringOption("timeout", "diagnostic timeout duration"),
cmds.StringOption("vis", "output vis. one of: "+strings.Join(visFmts, ", ")),
},
@ -75,7 +78,20 @@ connected peers and latencies between them.
return nil, err
}
info, err := n.Diagnostics.GetDiagnostic(time.Second * 20)
timeoutS, _, err := req.Option("timeout").String()
if err != nil {
return nil, err
}
timeout := DefaultDiagnosticTimeout
if timeoutS != "" {
t, err := time.ParseDuration(timeoutS)
if err != nil {
return nil, cmds.ClientError("error parsing timeout")
}
timeout = t
}
info, err := n.Diagnostics.GetDiagnostic(timeout)
if err != nil {
return nil, err
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"path"
"sort"
cmds "github.com/jbenet/go-ipfs/commands"
peer "github.com/jbenet/go-ipfs/p2p/peer"
@ -64,6 +65,7 @@ ipfs swarm peers lists the set of peers this node is connected to.
addrs[i] = fmt.Sprintf("%s/%s", addr, pid.Pretty())
}
sort.Sort(sort.StringSlice(addrs))
return &stringList{addrs}, nil
},
Marshalers: cmds.MarshalerMap{

View File

@ -11,33 +11,36 @@ import (
datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bserv "github.com/jbenet/go-ipfs/blockservice"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
diag "github.com/jbenet/go-ipfs/diagnostics"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
offline "github.com/jbenet/go-ipfs/exchange/offline"
rp "github.com/jbenet/go-ipfs/exchange/reprovide"
mount "github.com/jbenet/go-ipfs/fuse/mount"
merkledag "github.com/jbenet/go-ipfs/merkledag"
namesys "github.com/jbenet/go-ipfs/namesys"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
p2phost "github.com/jbenet/go-ipfs/p2p/host"
p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic"
swarm "github.com/jbenet/go-ipfs/p2p/net/swarm"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
peer "github.com/jbenet/go-ipfs/p2p/peer"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
offroute "github.com/jbenet/go-ipfs/routing/offline"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
bserv "github.com/jbenet/go-ipfs/blockservice"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
offline "github.com/jbenet/go-ipfs/exchange/offline"
rp "github.com/jbenet/go-ipfs/exchange/reprovide"
mount "github.com/jbenet/go-ipfs/fuse/mount"
merkledag "github.com/jbenet/go-ipfs/merkledag"
namesys "github.com/jbenet/go-ipfs/namesys"
path "github.com/jbenet/go-ipfs/path"
pin "github.com/jbenet/go-ipfs/pin"
repo "github.com/jbenet/go-ipfs/repo"
config "github.com/jbenet/go-ipfs/repo/config"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
offroute "github.com/jbenet/go-ipfs/routing/offline"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
)
const IpnsValidatorTag = "ipns"
@ -75,13 +78,14 @@ type IpfsNode struct {
Resolver *path.Resolver // the path resolution system
// Online
PrivateKey ic.PrivKey // the local node's private Key
PeerHost p2phost.Host // the network host (server+client)
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Diagnostics *diag.Diagnostics // the diagnostics service
Reprovider *rp.Reprovider // the value reprovider system
PrivateKey ic.PrivKey // the local node's private Key
PeerHost p2phost.Host // the network host (server+client)
Bootstrapper io.Closer // the periodic bootstrapper
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Diagnostics *diag.Diagnostics // the diagnostics service
Reprovider *rp.Reprovider // the value reprovider system
ctxgroup.ContextGroup
@ -235,29 +239,10 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error {
// TODO implement an offline namesys that serves only local names.
n.Namesys = namesys.NewNameSystem(n.Routing)
// TODO consider moving connection supervision into the Network. We've
// discussed improvements to this Node constructor. One improvement
// would be to make the node configurable, allowing clients to inject
// an Exchange, Network, or Routing component and have the constructor
// manage the wiring. In that scenario, this dangling function is a bit
// awkward.
var bootstrapPeers []peer.PeerInfo
for _, bootstrap := range n.Repo.Config().Bootstrap {
p, err := toPeer(bootstrap)
if err != nil {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", n.Identity, err)
return err
}
bootstrapPeers = append(bootstrapPeers, p)
}
go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, bootstrapPeers)
n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency)
return nil
return n.Bootstrap(DefaultBootstrapConfig)
}
// teardown closes owned children. If any errors occur, this function returns
@ -266,20 +251,20 @@ func (n *IpfsNode) teardown() error {
// owned objects are closed in this teardown to ensure that they're closed
// regardless of which constructor was used to add them to the node.
var closers []io.Closer
if n.Repo != nil {
closers = append(closers, n.Repo)
}
if n.Blocks != nil {
closers = append(closers, n.Blocks)
}
if n.Routing != nil {
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht)
addCloser := func(c io.Closer) {
if c != nil {
closers = append(closers, c)
}
}
if n.PeerHost != nil {
closers = append(closers, n.PeerHost)
addCloser(n.Bootstrapper)
addCloser(n.Repo)
addCloser(n.Blocks)
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
addCloser(dht)
}
addCloser(n.PeerHost)
var errs []error
for _, closer := range closers {
if err := closer.Close(); err != nil {
@ -305,17 +290,34 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) {
return n.Resolver.ResolvePath(path)
}
// Bootstrap is undefined when node is not in OnlineMode
func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {
func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
// TODO what should return value be when in offlineMode?
if n.Routing == nil {
return nil
}
if n.Routing != nil {
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
return bootstrap(ctx, n.PeerHost, dht, n.Peerstore, peers)
if n.Bootstrapper != nil {
n.Bootstrapper.Close() // stop previous bootstrap process.
}
// if the caller did not specify a bootstrap peer function, get the
// freshest bootstrap peers from config. this responds to live changes.
if cfg.BootstrapPeers == nil {
cfg.BootstrapPeers = func() []peer.PeerInfo {
bpeers := n.Repo.Config().Bootstrap
ps, err := toPeerInfos(bpeers)
if err != nil {
log.Error("failed to parse bootstrap peers from config: %s", bpeers)
return nil
}
return ps
}
}
return nil
var err error
n.Bootstrapper, err = Bootstrap(n, cfg)
return err
}
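Because the method closes any previous Bootstrapper first, callers can re-bootstrap with new parameters at any time. A hedged sketch (pis is a hypothetical []peer.PeerInfo):

	// switch the node to an explicit, fixed bootstrap peer set
	if err := n.Bootstrap(BootstrapConfigWithPeers(pis)); err != nil {
		log.Errorf("re-bootstrap failed: %s", err)
	}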
func (n *IpfsNode) loadID() error {

View File

@ -4,10 +4,9 @@
package diagnostics
import (
"bytes"
"encoding/json"
"errors"
"io"
"fmt"
"sync"
"time"
@ -16,6 +15,7 @@ import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ctxutil "github.com/jbenet/go-ipfs/util/ctx"
host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net"
@ -31,7 +31,10 @@ var log = util.Logger("diagnostics")
// ProtocolDiag is the diagnostics protocol.ID
var ProtocolDiag protocol.ID = "/ipfs/diagnostics"
var ErrAlreadyRunning = errors.New("diagnostic with that ID already running")
const ResponseTimeout = time.Second * 10
const HopTimeoutDecrement = time.Second * 2
// Diagnostics is a net service that manages requesting and responding to diagnostic
// requests
@ -148,62 +151,113 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error)
peers := d.getPeers()
log.Debugf("Sending diagnostic request to %d peers.", len(peers))
var out []*DiagInfo
di := d.getDiagInfo()
out = append(out, di)
pmes := newMessage(diagID)
respdata := make(chan []byte)
sends := 0
for p, _ := range peers {
log.Debugf("Sending getDiagnostic to: %s", p)
sends++
go func(p peer.ID) {
data, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Errorf("GetDiagnostic error: %v", err)
respdata <- nil
return
}
respdata <- data
}(p)
pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) // decrease timeout per hop
dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes)
if err != nil {
return nil, fmt.Errorf("diagnostic from peers err: %s", err)
}
for i := 0; i < sends; i++ {
data := <-respdata
if data == nil {
continue
}
out = appendDiagnostics(data, out)
di := d.getDiagInfo()
out := []*DiagInfo{di}
for dpi := range dpeers {
out = append(out, dpi)
}
return out, nil
}
func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
buf := bytes.NewBuffer(data)
dec := json.NewDecoder(buf)
for {
di := new(DiagInfo)
err := dec.Decode(di)
if err != nil {
if err != io.EOF {
log.Errorf("error decoding DiagInfo: %v", err)
}
break
}
cur = append(cur, di)
}
return cur
}
// TODO: this method no longer needed.
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) {
rpmes, err := d.sendRequest(ctx, p, mes)
func decodeDiagJson(data []byte) (*DiagInfo, error) {
di := new(DiagInfo)
err := json.Unmarshal(data, di)
if err != nil {
return nil, err
}
return rpmes.GetData(), nil
return di, nil
}
func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) (<-chan *DiagInfo, error) {
respdata := make(chan *DiagInfo)
wg := sync.WaitGroup{}
for p, _ := range peers {
wg.Add(1)
log.Debugf("Sending diagnostic request to peer: %s", p)
go func(p peer.ID) {
defer wg.Done()
out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Errorf("Error getting diagnostic from %s: %s", p, err)
return
}
for d := range out {
respdata <- d
}
}(p)
}
go func() {
wg.Wait()
close(respdata)
}()
return respdata, nil
}
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (<-chan *DiagInfo, error) {
s, err := d.host.NewStream(ProtocolDiag, p)
if err != nil {
return nil, err
}
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw)
start := time.Now()
if err := w.WriteMsg(pmes); err != nil {
return nil, err
}
out := make(chan *DiagInfo)
go func() {
defer func() {
close(out)
s.Close()
rtt := time.Since(start)
log.Infof("diagnostic request took: %s", rtt.String())
}()
for {
rpmes := new(pb.Message)
if err := r.ReadMsg(rpmes); err != nil {
log.Errorf("Error reading diagnostic from stream: %s", err)
return
}
if rpmes == nil {
log.Error("Got no response back from diag request.")
return
}
di, err := decodeDiagJson(rpmes.GetData())
if err != nil {
log.Error(err)
return
}
select {
case out <- di:
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func newMessage(diagID string) *pb.Message {
@ -212,89 +266,12 @@ func newMessage(diagID string) *pb.Message {
return pmes
}
func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
s, err := d.host.NewStream(ProtocolDiag, p)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
start := time.Now()
if err := w.WriteMsg(pmes); err != nil {
return nil, err
}
rpmes := new(pb.Message)
if err := r.ReadMsg(rpmes); err != nil {
return nil, err
}
if rpmes == nil {
return nil, errors.New("no response to request")
}
rtt := time.Since(start)
log.Infof("diagnostic request took: %s", rtt.String())
return rpmes, nil
}
func (d *Diagnostics) handleDiagnostic(p peer.ID, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("HandleDiagnostic from %s for id = %s", p, util.Key(pmes.GetDiagID()).B58String())
resp := newMessage(pmes.GetDiagID())
// Make sure we haven't already handled this request to prevent loops
d.diagLock.Lock()
_, found := d.diagMap[pmes.GetDiagID()]
if found {
d.diagLock.Unlock()
return resp, nil
}
d.diagMap[pmes.GetDiagID()] = time.Now()
d.diagLock.Unlock()
buf := new(bytes.Buffer)
di := d.getDiagInfo()
buf.Write(di.Marshal())
ctx, _ := context.WithTimeout(context.TODO(), ResponseTimeout)
respdata := make(chan []byte)
sendcount := 0
for p, _ := range d.getPeers() {
log.Debugf("Sending diagnostic request to peer: %s", p)
sendcount++
go func(p peer.ID) {
out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Errorf("getDiagnostic error: %v", err)
respdata <- nil
return
}
respdata <- out
}(p)
}
for i := 0; i < sendcount; i++ {
out := <-respdata
_, err := buf.Write(out)
if err != nil {
log.Errorf("getDiagnostic write output error: %v", err)
continue
}
}
resp.Data = buf.Bytes()
return resp, nil
}
func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error {
r := ggio.NewDelimitedReader(s, 32768) // maxsize
w := ggio.NewDelimitedWriter(s)
cr := ctxutil.NewReader(ctx, s)
cw := ctxutil.NewWriter(ctx, s)
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize
w := ggio.NewDelimitedWriter(cw)
// deserialize msg
pmes := new(pb.Message)
@ -307,25 +284,51 @@ func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error {
log.Infof("[peer: %s] Got message from [%s]\n",
d.self.Pretty(), s.Conn().RemotePeer())
// dispatch handler.
p := s.Conn().RemotePeer()
rpmes, err := d.handleDiagnostic(p, pmes)
// Make sure we haven't already handled this request to prevent loops
if err := d.startDiag(pmes.GetDiagID()); err != nil {
return nil
}
resp := newMessage(pmes.GetDiagID())
resp.Data = d.getDiagInfo().Marshal()
if err := w.WriteMsg(resp); err != nil {
log.Errorf("Failed to write protobuf message over stream: %s", err)
return err
}
timeout := pmes.GetTimeoutDuration()
if timeout < HopTimeoutDecrement {
return fmt.Errorf("timeout too short: %s", timeout)
}
ctx, _ = context.WithTimeout(ctx, timeout)
pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement)
dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes)
if err != nil {
log.Errorf("handleDiagnostic error: %s", err)
return nil
log.Errorf("diagnostic from peers err: %s", err)
return err
}
for b := range dpeers {
resp := newMessage(pmes.GetDiagID())
resp.Data = b.Marshal()
if err := w.WriteMsg(resp); err != nil {
log.Errorf("Failed to write protobuf message over stream: %s", err)
return err
}
}
// if nil response, return it before serializing
if rpmes == nil {
return nil
}
return nil
}
// serialize + send response msg
if err := w.WriteMsg(rpmes); err != nil {
log.Errorf("Failed to encode protobuf message: %v", err)
return nil
func (d *Diagnostics) startDiag(id string) error {
d.diagLock.Lock()
_, found := d.diagMap[id]
if found {
d.diagLock.Unlock()
return ErrAlreadyRunning
}
d.diagMap[id] = time.Now()
d.diagLock.Unlock()
return nil
}

View File

@ -14,15 +14,18 @@ It has these top-level messages:
package diagnostics_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type Message struct {
DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"`
Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"`
Timeout *int64 `protobuf:"varint,3,opt" json:"Timeout,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -44,5 +47,12 @@ func (m *Message) GetData() []byte {
return nil
}
func (m *Message) GetTimeout() int64 {
if m != nil && m.Timeout != nil {
return *m.Timeout
}
return 0
}
func init() {
}

View File

@ -3,4 +3,5 @@ package diagnostics.pb;
message Message {
required string DiagID = 1;
optional bytes Data = 2;
optional int64 Timeout = 3; // in nanoseconds
}

View File

@ -0,0 +1,14 @@
package diagnostics_pb
import (
"time"
)
func (m *Message) GetTimeoutDuration() time.Duration {
return time.Duration(m.GetTimeout())
}
func (m *Message) SetTimeoutDuration(t time.Duration) {
it := int64(t)
m.Timeout = &it
}
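These helpers carry the per-hop timeout budget used in diag.go above: the client seeds the message with its timeout minus one decrement, each node rejects a request whose remaining budget is below HopTimeoutDecrement, and otherwise forwards it with the budget reduced by another decrement. A worked sketch of the arithmetic (constants as defined in this diff):

	m := new(Message)
	m.SetTimeoutDuration(18 * time.Second) // client's 20s minus one 2s decrement
	remaining := m.GetTimeoutDuration()
	if remaining < 2*time.Second {
		// reject: timeout too short to forward further
	}
	m.SetTimeoutDuration(remaining - 2*time.Second) // next hop receives 16s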

View File

@ -30,7 +30,10 @@ func NewReprovider(rsys routing.IpfsRouting, bstore blocks.Blockstore) *Reprovid
}
func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) {
after := time.After(0)
// don't reprovide immediately.
// the user may have just started the daemon and may shut it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
for {
select {
case <-ctx.Done():

View File

@ -147,6 +147,11 @@ func reuseErrShouldRetry(err error) bool {
return false // hey, it worked! no need to retry.
}
// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return true
}
errno, ok := err.(syscall.Errno)
if !ok { // not an errno? who knows what this is. retry.
return true

View File

@ -118,8 +118,8 @@ func (ids *IDService) ResponseHandler(s inet.Stream) {
r := ggio.NewDelimitedReader(s, 2048)
mes := pb.Identify{}
if err := r.ReadMsg(&mes); err != nil {
log.Errorf("%s error receiving message from %s %s", ID,
c.RemotePeer(), c.RemoteMultiaddr())
log.Errorf("%s error receiving message from %s %s %s", ID,
c.RemotePeer(), c.RemoteMultiaddr(), err)
return
}
ids.consumeMessage(&mes, c)

View File

@ -115,7 +115,7 @@ func (m *Mux) HandleSync(s inet.Stream) {
return
}
log.Infof("muxer handle protocol: %s", name)
log.Infof("muxer handle protocol %s: %s", s.Conn().RemotePeer(), name)
handler(s)
}

View File

@ -32,7 +32,7 @@ func loadIndirPin(d ds.Datastore, k ds.Key) (*indirectPin, error) {
keys = append(keys, k)
refcnt[k] = v
}
log.Debugf("indirPin keys: %#v", keys)
// log.Debugf("indirPin keys: %#v", keys)
return &indirectPin{blockset: set.SimpleSetFromKeys(keys), refCounts: refcnt}, nil
}

View File

@ -370,66 +370,3 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
}
}
}
// Bootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
var merr u.MultiErr
randomID := func() peer.ID {
// 16 random bytes is not a valid peer id. it may be fine because
// the dht will rehash to its own keyspace anyway.
id := make([]byte, 16)
rand.Read(id)
return peer.ID(id)
}
// bootstrap sequentially, as results will compound
runQuery := func(ctx context.Context, id peer.ID) {
p, err := dht.FindPeer(ctx, id)
if err == routing.ErrNotFound {
// this isn't an error. this is precisely what we expect.
} else if err != nil {
merr = append(merr, err)
} else {
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
log.Errorf("%s", err)
merr = append(merr, err)
}
}
sequential := true
if sequential {
// these should be parallel normally. but can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < queries; i++ {
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}
} else {
// note on parallelism here: the context is passed in to the queries, so they
// **should** exit when it expires, making this function exit on ctx cancel.
// normally, we should be selecting on ctx.Done() here too, but this gets
// complicated to do with WaitGroup, and doesn't wait for the children to exit.
var wg sync.WaitGroup
for i := 0; i < queries; i++ {
wg.Add(1)
go func() {
defer wg.Done()
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}()
}
wg.Wait()
}
if len(merr) > 0 {
return merr
}
return nil
}

View File

@ -0,0 +1,158 @@
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
package dht
import (
"crypto/rand"
"fmt"
"sync"
"time"
peer "github.com/jbenet/go-ipfs/p2p/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)
// BootstrapConfig specifies parameters used in bootstrapping the DHT.
//
// Note there is a tradeoff between the bootstrap period and the
// number of queries. We could support a higher period with fewer
// queries.
type BootstrapConfig struct {
Queries int // how many queries to run per period
Period time.Duration // how often to run periodic bootstrap.
Timeout time.Duration // how long to wait for a bootstrap query to run
}
var DefaultBootstrapConfig = BootstrapConfig{
// For now, this is set to 1 query.
// We are currently more interested in ensuring we have a properly formed
// DHT than making sure our dht minimizes traffic. Once we are more certain
// of our implementation's robustness, we should lower this down to 8 or 4.
Queries: 1,
// For now, this is set to 20 seconds, which is an aggressive period. We are
// currently more interested in ensuring we have a properly formed
// DHT than making sure our dht minimizes traffic. Once we are more certain
// of our implementation's robustness, we should lower this down to 30s or 1m.
Period: time.Duration(20 * time.Second),
Timeout: time.Duration(20 * time.Second),
}
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
// It builds up a list of peers by requesting random peer IDs. The Bootstrap
// process runs a number of queries each round, once every config.Period.
// These parameters are configurable.
//
// Bootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) Bootstrap(config BootstrapConfig) (goprocess.Process, error) {
sig := time.Tick(config.Period)
return dht.BootstrapOnSignal(config, sig)
}
// BootstrapOnSignal ensures the dht routing table remains healthy as peers come and go.
// It builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time the signal fires.
// These parameters are configurable.
//
// BootstrapOnSignal returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
if cfg.Queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
if signal == nil {
return nil, fmt.Errorf("invalid signal: %v", signal)
}
proc := periodicproc.Ticker(signal, func(worker goprocess.Process) {
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?
ctx := dht.Context()
if err := dht.runBootstrap(ctx, cfg); err != nil {
log.Error(err)
// A bootstrapping error is important to notice but not fatal.
}
})
return proc, nil
}
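Because the signal is just a <-chan time.Time, callers are not limited to wall-clock ticks. A hedged sketch that drives a single round on demand (d is a hypothetical *IpfsDHT):

	sig := make(chan time.Time)
	proc, err := d.BootstrapOnSignal(DefaultBootstrapConfig, sig)
	if err != nil {
		log.Error(err)
	}
	sig <- time.Now() // trigger exactly one bootstrap round
	proc.Close()      // stop listening and clean up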
// runBootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
bslog := func(msg string) {
log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
}
bslog("start")
defer bslog("end")
defer log.EventBegin(ctx, "dhtRunBootstrap").Done()
var merr u.MultiErr
randomID := func() peer.ID {
// 16 random bytes is not a valid peer id. it may be fine because
// the dht will rehash to its own keyspace anyway.
id := make([]byte, 16)
rand.Read(id)
id = u.Hash(id)
return peer.ID(id)
}
// bootstrap sequentially, as results will compound
ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
defer cancel()
runQuery := func(ctx context.Context, id peer.ID) {
p, err := dht.FindPeer(ctx, id)
if err == routing.ErrNotFound {
// this isn't an error. this is precisely what we expect.
} else if err != nil {
merr = append(merr, err)
} else {
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
log.Errorf("%s", err)
merr = append(merr, err)
}
}
sequential := true
if sequential {
// these should be parallel normally. but can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < cfg.Queries; i++ {
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
runQuery(ctx, id)
}
} else {
// note on parallelism here: the context is passed in to the queries, so they
// **should** exit when it expires, making this function exit on ctx cancel.
// normally, we should be selecting on ctx.Done() here too, but this gets
// complicated to do with WaitGroup, and doesn't wait for the children to exit.
var wg sync.WaitGroup
for i := 0; i < cfg.Queries; i++ {
wg.Add(1)
go func() {
defer wg.Done()
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
runQuery(ctx, id)
}()
}
wg.Wait()
}
if len(merr) > 0 {
return merr
}
return nil
}

View File

@ -75,25 +75,22 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
ctx, cancel := context.WithCancel(ctx)
log.Debugf("bootstrapping dhts...")
rounds := 1
// tried async. sequential fares much better. compare:
// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
// probably because results compound
for i := 0; i < rounds; i++ {
log.Debugf("bootstrapping round %d/%d\n", i, rounds)
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
log.Debugf("bootstrapping round %d/%d -- %s\n", i, rounds, dht.self)
dht.Bootstrap(ctx, 3)
}
}
var cfg BootstrapConfig
cfg = DefaultBootstrapConfig
cfg.Queries = 3
// tried async. sequential fares much better. compare:
// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
// probably because results compound
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.runBootstrap(ctx, cfg)
}
cancel()
}
@ -235,6 +232,53 @@ func TestProvides(t *testing.T) {
}
}
// if minPeers or avgPeers is 0, don't test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
// test "well-formed-ness" (>= minPeers peers in every routing table)
checkTables := func() bool {
totalPeers := 0
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
totalPeers += rtlen
if minPeers > 0 && rtlen < minPeers {
t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
return false
}
}
actualAvgPeers := totalPeers / len(dhts)
t.Logf("avg rt size: %d", actualAvgPeers)
if avgPeers > 0 && actualAvgPeers < avgPeers {
t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
return false
}
return true
}
timeoutA := time.After(timeout)
for {
select {
case <-timeoutA:
log.Error("did not reach well-formed routing tables by %s", timeout)
return false // failed
case <-time.After(5 * time.Millisecond):
if checkTables() {
return true // succeeded
}
}
}
}
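Callers are expected to poll with it, as in this hedged sketch (the thresholds are arbitrary, not taken from the tests below):

	// hypothetical call site in a test:
	if !waitForWellFormedTables(t, dhts, 5, 8, 10*time.Second) {
		t.Fatal("routing tables never became well-formed")
	}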
func printRoutingTables(dhts []*IpfsDHT) {
// the routing tables should be full now. let's inspect them.
fmt.Println("checking routing table of %d", len(dhts))
for _, dht := range dhts {
fmt.Printf("checking routing table of %s\n", dht.self)
dht.routingTable.Print()
fmt.Println("")
}
}
func TestBootstrap(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
@ -258,38 +302,109 @@ func TestBootstrap(t *testing.T) {
}
<-time.After(100 * time.Millisecond)
t.Logf("bootstrapping them so they find each other", nDHTs)
ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
bootstrap(t, ctxT, dhts)
// bootstrap a few times until we get good tables.
stop := make(chan struct{})
go func() {
for {
t.Logf("bootstrapping them so they find each other", nDHTs)
ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
bootstrap(t, ctxT, dhts)
select {
case <-time.After(50 * time.Millisecond):
continue // being explicit
case <-stop:
return
}
}
}()
waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second)
close(stop)
if u.Debug {
// the routing tables should be full now. let's inspect them.
<-time.After(5 * time.Second)
t.Logf("checking routing tables of %d dhts", nDHTs)
for _, dht := range dhts {
fmt.Printf("checking routing table of %s\n", dht.self)
dht.routingTable.Print()
fmt.Println("")
}
printRoutingTables(dhts)
}
}
func TestPeriodicBootstrap(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
nDHTs := 30
_, _, dhts := setupDHTS(ctx, nDHTs, t)
defer func() {
for i := 0; i < nDHTs; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
// signal amplifier
amplify := func(signal chan time.Time, other []chan time.Time) {
for tick := range signal {
for _, s := range other {
s <- tick
}
}
for _, s := range other {
close(s)
}
}
// test "well-formed-ness" (>= 3 peers in every routing table)
avgsize := 0
signal := make(chan time.Time)
allSignals := []chan time.Time{}
var cfg BootstrapConfig
cfg = DefaultBootstrapConfig
cfg.Queries = 5
// kick off periodic bootstrappers with instrumented signals.
for _, dht := range dhts {
s := make(chan time.Time)
allSignals = append(allSignals, s)
dht.BootstrapOnSignal(cfg, s)
}
go amplify(signal, allSignals)
t.Logf("dhts are not connected.", nDHTs)
// test "well-formed-ness" (>= 3 peers in every routing table)
avgsize := 0
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
avgsize += rtlen
t.Logf("routing table for %s has %d peers", dht.self, rtlen)
if rtlen < 4 {
// currently, we don't have good bootstrapping guarantees.
// t.Errorf("routing table for %s only has %d peers", dht.self, rtlen)
}
if rtlen > 0 {
t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
}
}
avgsize = avgsize / len(dhts)
avgsizeExpected := 6
t.Logf("avg rt size: %d", avgsize)
if avgsize < avgsizeExpected {
t.Errorf("avg rt size: %d < %d", avgsize, avgsizeExpected)
}
for i := 0; i < nDHTs; i++ {
connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
}
t.Logf("dhts are now connected to 1-2 others.", nDHTs)
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
if rtlen > 2 {
t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
}
}
if u.Debug {
printRoutingTables(dhts)
}
t.Logf("bootstrapping them so they find each other", nDHTs)
signal <- time.Now()
// this is async, and we don't know when it's finished with one cycle, so keep checking
// until the routing tables look better, or some long timeout for the failure case.
waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second)
if u.Debug {
printRoutingTables(dhts)
}
}
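The amplifier in TestPeriodicBootstrap is a plain channel fan-out; extracted as a generic sketch (hypothetical helper, not part of this change). The blocking sends are deliberate: one tick triggers exactly one bootstrap round per node.

	// fanOut relays every value from in to each out channel, then closes
	// all outs once in is closed. Each send blocks until its receiver is ready.
	func fanOut(in <-chan time.Time, outs []chan time.Time) {
		for v := range in {
			for _, out := range outs {
				out <- v
			}
		}
		for _, out := range outs {
			close(out)
		}
	}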
@ -319,7 +434,6 @@ func TestProvidesMany(t *testing.T) {
if u.Debug {
// the routing tables should be full now. let's inspect them.
<-time.After(5 * time.Second)
t.Logf("checking routing table of %d", nDHTs)
for _, dht := range dhts {
fmt.Printf("checking routing table of %s\n", dht.self)

View File

@ -49,6 +49,10 @@ func TestGetFailures(t *testing.T) {
// u.POut("Timout Test\n")
ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond)
if _, err := d.GetValue(ctx1, u.Key("test")); err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
if err != context.DeadlineExceeded {
t.Fatal("Got different error than we expected", err)
}
@ -86,6 +90,9 @@ func TestGetFailures(t *testing.T) {
ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second)
_, err = d.GetValue(ctx2, u.Key("test"))
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
if err != routing.ErrNotFound {
t.Fatalf("Expected ErrNotFound, got: %s", err)
}
@ -202,6 +209,9 @@ func TestNotFound(t *testing.T) {
v, err := d.GetValue(ctx, u.Key("hello"))
log.Debugf("get value got %v", v)
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
switch err {
case routing.ErrNotFound:
//Success!

View File

@ -62,7 +62,7 @@ type dhtQueryRunner struct {
peersRemaining todoctr.Counter // peersToQuery + currently processing
result *dhtQueryResult // query result
errs []error // result errors. maybe should be a map[peer.ID]error
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error
rateLimit chan struct{} // processing semaphore
log eventlog.EventLogger
@ -122,8 +122,12 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
r.RLock()
defer r.RUnlock()
if len(r.errs) > 0 {
err = r.errs[0] // take the first?
err = routing.ErrNotFound
// if every query to every peer failed, something must be very wrong.
if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
log.Debugf("query errs: %s", r.errs)
err = r.errs[0]
}
case <-r.cg.Closed():
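The policy above is worth stating plainly: a query that simply finds nothing reports routing.ErrNotFound, and the underlying per-peer errors surface only when every peer seen produced one. As a hypothetical free function (names are illustrative, not the runner's API):

	// queryError chooses what a finished query reports: not-found by default,
	// the first real error only if literally every queried peer failed.
	func queryError(errs []error, peersSeen int) error {
		if len(errs) > 0 && len(errs) == peersSeen {
			return errs[0]
		}
		return routing.ErrNotFound
	}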

View File

@ -88,9 +88,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// get closest peers in the routing table
rtp := dht.routingTable.ListPeers()
log.Debugf("peers in rt: %s", len(rtp), rtp)
closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 {
if len(rtp) == 0 {
log.Warning("No peers from routing table!")
return nil, errors.Wrap(kb.ErrLookupFailure)
}
@ -111,7 +109,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
})
// run it!
result, err := query.Run(ctx, closest)
result, err := query.Run(ctx, rtp)
if err != nil {
return nil, err
}
@ -170,7 +168,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn
// to the given key
func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) {
e := log.EventBegin(ctx, "getClosestPeers", &key)
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
tablepeers := dht.routingTable.ListPeers()
if len(tablepeers) == 0 {
return nil, errors.Wrap(kb.ErrLookupFailure)
}
@ -313,7 +311,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
return &dhtQueryResult{closerPeers: clpeers}, nil
})
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
peers := dht.routingTable.ListPeers()
_, err := query.Run(ctx, peers)
if err != nil {
log.Errorf("Query error: %s", err)
@ -329,13 +327,13 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
return pi, nil
}
closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
peers := dht.routingTable.ListPeers()
if len(peers) == 0 {
return peer.PeerInfo{}, errors.Wrap(kb.ErrLookupFailure)
}
// Sanity...
for _, p := range closest {
for _, p := range peers {
if p == id {
log.Error("Found target peer in list of closest peers...")
return dht.peerstore.PeerInfo(p), nil
@ -367,7 +365,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
})
// run it!
result, err := query.Run(ctx, closest)
result, err := query.Run(ctx, peers)
if err != nil {
return peer.PeerInfo{}, err
}
@ -386,8 +384,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
peersSeen := peer.Set{}
closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
peers := dht.routingTable.ListPeers()
if len(peers) == 0 {
return nil, errors.Wrap(kb.ErrLookupFailure)
}
@ -432,7 +430,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
// run it! run it asynchronously to gen peers as results are found.
// this does no error checking
go func() {
if _, err := query.Run(ctx, closest); err != nil {
if _, err := query.Run(ctx, peers); err != nil {
log.Error(err)
}

View File

@ -115,8 +115,15 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
}
defer catter.Close()
catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)})
adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)})
bs1 := []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}
bs2 := []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}
if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil {
return err
}
if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil {
return err
}
keyAdded, err := coreunix.Add(adder, bytes.NewReader(data))
if err != nil {

View File

@ -62,9 +62,15 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
return err
}
defer bootstrap.Close()
boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo})
bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
bcfg := core.BootstrapConfigWithPeers([]peer.PeerInfo{bis})
if err := adder.Bootstrap(bcfg); err != nil {
return err
}
if err := catter.Bootstrap(bcfg); err != nil {
return err
}
keyAdded, err := coreunix.Add(adder, bytes.NewReader(data))
if err != nil {