mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-24 20:07:45 +08:00
p2p/net/swarm: extensive dial tests
This commit is contained in:
parent
7f3a651f5d
commit
baf2541237
427
p2p/net/swarm/dial_test.go
Normal file
427
p2p/net/swarm/dial_test.go
Normal file
@ -0,0 +1,427 @@
|
||||
package swarm
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
|
||||
peer "github.com/jbenet/go-ipfs/p2p/peer"
|
||||
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
|
||||
)
|
||||
|
||||
func acceptAndHang(l net.Listener) {
|
||||
conns := make([]net.Conn, 0, 10)
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if c != nil {
|
||||
conns = append(conns, c)
|
||||
}
|
||||
}
|
||||
for _, c := range conns {
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSimultDials(t *testing.T) {
|
||||
// t.Skip("skipping for another test")
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
swarms := makeSwarms(ctx, t, 2)
|
||||
|
||||
// connect everyone
|
||||
{
|
||||
var wg sync.WaitGroup
|
||||
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
|
||||
// copy for other peer
|
||||
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
|
||||
s.peers.AddAddress(dst, addr)
|
||||
if _, err := s.Dial(ctx, dst); err != nil {
|
||||
t.Fatal("error swarm dialing to peer", err)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
log.Info("Connecting swarms simultaneously.")
|
||||
for i := 0; i < 10; i++ { // connect 10x for each.
|
||||
wg.Add(2)
|
||||
go connect(swarms[0], swarms[1].local, ifaceAddrs1[0])
|
||||
go connect(swarms[1], swarms[0].local, ifaceAddrs0[0])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// should still just have 1, at most 2 connections :)
|
||||
c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
|
||||
if c01l > 2 {
|
||||
t.Error("0->1 has", c01l)
|
||||
}
|
||||
c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
|
||||
if c10l > 2 {
|
||||
t.Error("1->0 has", c10l)
|
||||
}
|
||||
|
||||
for _, s := range swarms {
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
|
||||
dst := testutil.RandPeerIDFatal(t)
|
||||
lst, err := net.Listen("tcp", ":0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addr, err := manet.FromNetAddr(lst.Addr())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addrs := []ma.Multiaddr{addr}
|
||||
addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("new silent peer:", dst, addrs[0])
|
||||
return dst, addrs[0], lst
|
||||
}
|
||||
|
||||
func TestDialWait(t *testing.T) {
|
||||
// t.Skip("skipping for another test")
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
swarms := makeSwarms(ctx, t, 1)
|
||||
s1 := swarms[0]
|
||||
defer s1.Close()
|
||||
|
||||
s1.dialT = time.Millisecond * 300 // lower timeout for tests.
|
||||
|
||||
// dial to a non-existent peer.
|
||||
s2p, s2addr, s2l := newSilentPeer(t)
|
||||
go acceptAndHang(s2l)
|
||||
defer s2l.Close()
|
||||
s1.peers.AddAddress(s2p, s2addr)
|
||||
|
||||
before := time.Now()
|
||||
if c, err := s1.Dial(ctx, s2p); err == nil {
|
||||
defer c.Close()
|
||||
t.Fatal("error swarm dialing to unknown peer worked...", err)
|
||||
} else {
|
||||
t.Log("correctly got error:", err)
|
||||
}
|
||||
duration := time.Now().Sub(before)
|
||||
|
||||
dt := s1.dialT
|
||||
if duration < dt*dialAttempts {
|
||||
t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
|
||||
}
|
||||
if duration > 2*dt*dialAttempts {
|
||||
t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
|
||||
}
|
||||
|
||||
if !s1.backf.Backoff(s2p) {
|
||||
t.Error("s2 should now be on backoff")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDialBackoff(t *testing.T) {
|
||||
// t.Skip("skipping for another test")
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
swarms := makeSwarms(ctx, t, 2)
|
||||
s1 := swarms[0]
|
||||
s2 := swarms[1]
|
||||
defer s1.Close()
|
||||
defer s2.Close()
|
||||
|
||||
s1.dialT = time.Millisecond * 500 // lower timeout for tests.
|
||||
s2.dialT = time.Millisecond * 500 // lower timeout for tests.
|
||||
|
||||
s2addrs, err := s2.InterfaceListenAddresses()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s1.peers.AddAddresses(s2.local, s2addrs)
|
||||
|
||||
// dial to a non-existent peer.
|
||||
s3p, s3addr, s3l := newSilentPeer(t)
|
||||
go acceptAndHang(s3l)
|
||||
defer s3l.Close()
|
||||
s1.peers.AddAddress(s3p, s3addr)
|
||||
|
||||
// in this test we will:
|
||||
// 1) dial 10x to each node.
|
||||
// 2) all dials should hang
|
||||
// 3) s1->s2 should succeed.
|
||||
// 4) s1->s3 should not (and should place s3 on backoff)
|
||||
// 5) disconnect entirely
|
||||
// 6) dial 10x to each node again
|
||||
// 7) s3 dials should all return immediately (except 1)
|
||||
// 8) s2 dials should all hang, and succeed
|
||||
// 9) last s3 dial ends, unsuccessful
|
||||
|
||||
dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
|
||||
ch := make(chan bool)
|
||||
for i := 0; i < times; i++ {
|
||||
go func() {
|
||||
if _, err := s1.Dial(ctx, dst); err != nil {
|
||||
t.Error("error dialing", dst, err)
|
||||
ch <- false
|
||||
} else {
|
||||
ch <- true
|
||||
}
|
||||
}()
|
||||
}
|
||||
return ch
|
||||
}
|
||||
|
||||
dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
|
||||
ch := make(chan bool)
|
||||
for i := 0; i < times; i++ {
|
||||
go func() {
|
||||
if c, err := s1.Dial(ctx, dst); err != nil {
|
||||
ch <- false
|
||||
} else {
|
||||
t.Error("succeeded in dialing", dst)
|
||||
ch <- true
|
||||
c.Close()
|
||||
}
|
||||
}()
|
||||
}
|
||||
return ch
|
||||
}
|
||||
|
||||
{
|
||||
// 1) dial 10x to each node.
|
||||
N := 10
|
||||
s2done := dialOnlineNode(s2.local, N)
|
||||
s3done := dialOfflineNode(s3p, N)
|
||||
|
||||
// when all dials should be done by:
|
||||
dialTimeout1x := time.After(s1.dialT)
|
||||
dialTimeout1Ax := time.After(s1.dialT * dialAttempts)
|
||||
dialTimeout10Ax := time.After(s1.dialT * dialAttempts * 10)
|
||||
|
||||
// 2) all dials should hang
|
||||
select {
|
||||
case <-s2done:
|
||||
t.Error("s2 should not happen immediately")
|
||||
case <-s3done:
|
||||
t.Error("s3 should not happen yet")
|
||||
case <-time.After(time.Millisecond):
|
||||
// s2 may finish very quickly, so let's get out.
|
||||
}
|
||||
|
||||
// 3) s1->s2 should succeed.
|
||||
for i := 0; i < N; i++ {
|
||||
select {
|
||||
case r := <-s2done:
|
||||
if !r {
|
||||
t.Error("s2 should not fail")
|
||||
}
|
||||
case <-s3done:
|
||||
t.Error("s3 should not happen yet")
|
||||
case <-dialTimeout1x:
|
||||
t.Error("s2 took too long")
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-s2done:
|
||||
t.Error("s2 should have no more")
|
||||
case <-s3done:
|
||||
t.Error("s3 should not happen yet")
|
||||
case <-dialTimeout1x: // let it pass
|
||||
}
|
||||
|
||||
// 4) s1->s3 should not (and should place s3 on backoff)
|
||||
// N-1 should finish before dialTimeout1Ax
|
||||
for i := 0; i < N; i++ {
|
||||
select {
|
||||
case <-s2done:
|
||||
t.Error("s2 should have no more")
|
||||
case r := <-s3done:
|
||||
if r {
|
||||
t.Error("s3 should not succeed")
|
||||
}
|
||||
case <-dialTimeout1Ax:
|
||||
if i < (N - 1) {
|
||||
t.Fatal("s3 took too long")
|
||||
}
|
||||
t.Log("dialTimeout1Ax hit for last peer")
|
||||
case <-dialTimeout10Ax:
|
||||
t.Fatal("s3 took too long")
|
||||
}
|
||||
}
|
||||
|
||||
// check backoff state
|
||||
if s1.backf.Backoff(s2.local) {
|
||||
t.Error("s2 should not be on backoff")
|
||||
}
|
||||
if !s1.backf.Backoff(s3p) {
|
||||
t.Error("s3 should be on backoff")
|
||||
}
|
||||
|
||||
// 5) disconnect entirely
|
||||
|
||||
for _, c := range s1.Connections() {
|
||||
c.Close()
|
||||
}
|
||||
for i := 0; i < 100 && len(s1.Connections()) > 0; i++ {
|
||||
<-time.After(time.Millisecond)
|
||||
}
|
||||
if len(s1.Connections()) > 0 {
|
||||
t.Fatal("s1 conns must exit")
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// 6) dial 10x to each node again
|
||||
N := 10
|
||||
s2done := dialOnlineNode(s2.local, N)
|
||||
s3done := dialOfflineNode(s3p, N)
|
||||
|
||||
// when all dials should be done by:
|
||||
dialTimeout1x := time.After(s1.dialT)
|
||||
dialTimeout1Ax := time.After(s1.dialT * dialAttempts)
|
||||
dialTimeout10Ax := time.After(s1.dialT * dialAttempts * 10)
|
||||
|
||||
// 7) s3 dials should all return immediately (except 1)
|
||||
for i := 0; i < N-1; i++ {
|
||||
select {
|
||||
case <-s2done:
|
||||
t.Error("s2 should not succeed yet")
|
||||
case r := <-s3done:
|
||||
if r {
|
||||
t.Error("s3 should not succeed")
|
||||
}
|
||||
case <-dialTimeout1x:
|
||||
t.Fatal("s3 took too long")
|
||||
}
|
||||
}
|
||||
|
||||
// 8) s2 dials should all hang, and succeed
|
||||
for i := 0; i < N; i++ {
|
||||
select {
|
||||
case r := <-s2done:
|
||||
if !r {
|
||||
t.Error("s2 should succeed")
|
||||
}
|
||||
// case <-s3done:
|
||||
case <-dialTimeout1Ax:
|
||||
t.Fatal("s3 took too long")
|
||||
}
|
||||
}
|
||||
|
||||
// 9) the last s3 should return, failed.
|
||||
select {
|
||||
case <-s2done:
|
||||
t.Error("s2 should have no more")
|
||||
case r := <-s3done:
|
||||
if r {
|
||||
t.Error("s3 should not succeed")
|
||||
}
|
||||
case <-dialTimeout10Ax:
|
||||
t.Fatal("s3 took too long")
|
||||
}
|
||||
|
||||
// check backoff state (the same)
|
||||
if s1.backf.Backoff(s2.local) {
|
||||
t.Error("s2 should not be on backoff")
|
||||
}
|
||||
if !s1.backf.Backoff(s3p) {
|
||||
t.Error("s3 should be on backoff")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func TestDialBackoffClears(t *testing.T) {
|
||||
// t.Skip("skipping for another test")
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
swarms := makeSwarms(ctx, t, 2)
|
||||
s1 := swarms[0]
|
||||
s2 := swarms[1]
|
||||
defer s1.Close()
|
||||
defer s2.Close()
|
||||
s1.dialT = time.Millisecond * 300 // lower timeout for tests.
|
||||
s2.dialT = time.Millisecond * 300 // lower timeout for tests.
|
||||
|
||||
// use another address first, that accept and hang on conns
|
||||
_, s2bad, s2l := newSilentPeer(t)
|
||||
go acceptAndHang(s2l)
|
||||
defer s2l.Close()
|
||||
|
||||
// phase 1 -- dial to non-operational addresses
|
||||
s1.peers.AddAddress(s2.local, s2bad)
|
||||
|
||||
before := time.Now()
|
||||
if c, err := s1.Dial(ctx, s2.local); err == nil {
|
||||
t.Fatal("dialing to broken addr worked...", err)
|
||||
defer c.Close()
|
||||
} else {
|
||||
t.Log("correctly got error:", err)
|
||||
}
|
||||
duration := time.Now().Sub(before)
|
||||
|
||||
dt := s1.dialT
|
||||
if duration < dt*dialAttempts {
|
||||
t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
|
||||
}
|
||||
if duration > 2*dt*dialAttempts {
|
||||
t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
|
||||
}
|
||||
|
||||
if !s1.backf.Backoff(s2.local) {
|
||||
t.Error("s2 should now be on backoff")
|
||||
} else {
|
||||
t.Log("correctly added to backoff")
|
||||
}
|
||||
|
||||
// phase 2 -- add the working address. dial should succeed.
|
||||
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s1.peers.AddAddresses(s2.local, ifaceAddrs1)
|
||||
|
||||
before = time.Now()
|
||||
if c, err := s1.Dial(ctx, s2.local); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
c.Close()
|
||||
t.Log("correctly connected")
|
||||
}
|
||||
duration = time.Now().Sub(before)
|
||||
|
||||
if duration >= dt {
|
||||
// t.Error("took too long", duration, dt)
|
||||
}
|
||||
|
||||
if s1.backf.Backoff(s2.local) {
|
||||
t.Error("s2 should no longer be on backoff")
|
||||
} else {
|
||||
t.Log("correctly cleared backoff")
|
||||
}
|
||||
}
|
||||
@ -11,49 +11,6 @@ import (
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
||||
func TestSimultDials(t *testing.T) {
|
||||
// t.Skip("skipping for another test")
|
||||
|
||||
ctx := context.Background()
|
||||
swarms := makeSwarms(ctx, t, 2)
|
||||
|
||||
// connect everyone
|
||||
{
|
||||
var wg sync.WaitGroup
|
||||
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
|
||||
// copy for other peer
|
||||
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
|
||||
s.peers.AddAddress(dst, addr)
|
||||
if _, err := s.Dial(ctx, dst); err != nil {
|
||||
t.Fatal("error swarm dialing to peer", err)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
log.Info("Connecting swarms simultaneously.")
|
||||
for i := 0; i < 10; i++ { // connect 10x for each.
|
||||
wg.Add(2)
|
||||
go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0])
|
||||
go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// should still just have 1, at most 2 connections :)
|
||||
c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
|
||||
if c01l > 2 {
|
||||
t.Error("0->1 has", c01l)
|
||||
}
|
||||
c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
|
||||
if c10l > 2 {
|
||||
t.Error("1->0 has", c10l)
|
||||
}
|
||||
|
||||
for _, s := range swarms {
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSimultOpen(t *testing.T) {
|
||||
// t.Skip("skipping for another test")
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user