From baf254123786b7b91a1f5aa65f460bcbaf88bdd0 Mon Sep 17 00:00:00 2001
From: Juan Batiz-Benet
Date: Tue, 13 Jan 2015 08:15:26 -0800
Subject: [PATCH] p2p/net/swarm: extensive dial tests

---
 p2p/net/swarm/dial_test.go  | 447 ++++++++++++++++++++++++++++++++++++
 p2p/net/swarm/simul_test.go |  43 ----
 2 files changed, 447 insertions(+), 43 deletions(-)
 create mode 100644 p2p/net/swarm/dial_test.go

diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go
new file mode 100644
index 000000000..bc5ec0dde
--- /dev/null
+++ b/p2p/net/swarm/dial_test.go
@@ -0,0 +1,447 @@
+package swarm
+
+import (
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
+	peer "github.com/jbenet/go-ipfs/p2p/peer"
+	testutil "github.com/jbenet/go-ipfs/util/testutil"
+
+	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+	manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
+)
+
+// acceptAndHang accepts connections on l and holds them open without reading
+// or writing, so dials to l connect but then hang. Once Accept returns an
+// error (e.g. l was closed) it closes every held connection and returns.
+func acceptAndHang(l net.Listener) {
+	conns := make([]net.Conn, 0, 10)
+	for {
+		c, err := l.Accept()
+		if err != nil {
+			break
+		}
+		if c != nil {
+			conns = append(conns, c)
+		}
+	}
+	for _, c := range conns {
+		c.Close()
+	}
+}
+
+// TestSimultDials dials between two swarms 10x in each direction at once and
+// checks that each side ends up with at most 2 connections to the other.
+func TestSimultDials(t *testing.T) {
+	// t.Skip("skipping for another test")
+	t.Parallel()
+
+	ctx := context.Background()
+	swarms := makeSwarms(ctx, t, 2)
+
+	// connect everyone
+	{
+		var wg sync.WaitGroup
+		connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
+			// release the waitgroup on every path, so wg.Wait cannot hang.
+			defer wg.Done()
+
+			// copy for other peer
+			log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
+			s.peers.AddAddress(dst, addr)
+			if _, err := s.Dial(ctx, dst); err != nil {
+				// t.Fatal may only be called from the test goroutine.
+				t.Error("error swarm dialing to peer", err)
+			}
+		}
+
+		ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
+		if err != nil {
+			t.Fatal(err)
+		}
+		ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		log.Info("Connecting swarms simultaneously.")
+		for i := 0; i < 10; i++ { // connect 10x for each.
+			wg.Add(2)
+			go connect(swarms[0], swarms[1].local, ifaceAddrs1[0])
+			go connect(swarms[1], swarms[0].local, ifaceAddrs0[0])
+		}
+		wg.Wait()
+	}
+
+	// should still just have 1, at most 2 connections :)
+	c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
+	if c01l > 2 {
+		t.Error("0->1 has", c01l)
+	}
+	c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
+	if c10l > 2 {
+		t.Error("1->0 has", c10l)
+	}
+
+	for _, s := range swarms {
+		s.Close()
+	}
+}
+
+// newSilentPeer returns a random peer ID together with a fresh TCP listener
+// (on a system-chosen port) and that listener's resolved multiaddr. Nothing
+// serves the listener; the caller must accept on it and close it.
+func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
+	dst := testutil.RandPeerIDFatal(t)
+	lst, err := net.Listen("tcp", ":0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	addr, err := manet.FromNetAddr(lst.Addr())
+	if err != nil {
+		t.Fatal(err)
+	}
+	addrs := []ma.Multiaddr{addr}
+	addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Log("new silent peer:", dst, addrs[0])
+	return dst, addrs[0], lst
+}
+
+// TestDialWait dials a peer whose listener accepts but never responds, and
+// checks that the failing dial takes between 1x and 2x dialT*dialAttempts
+// and leaves the peer on backoff.
+func TestDialWait(t *testing.T) {
+	// t.Skip("skipping for another test")
+	t.Parallel()
+
+	ctx := context.Background()
+	swarms := makeSwarms(ctx, t, 1)
+	s1 := swarms[0]
+	defer s1.Close()
+
+	s1.dialT = time.Millisecond * 300 // lower timeout for tests.
+
+	// dial to a non-existent peer.
+	s2p, s2addr, s2l := newSilentPeer(t)
+	go acceptAndHang(s2l)
+	defer s2l.Close()
+	s1.peers.AddAddress(s2p, s2addr)
+
+	before := time.Now()
+	if c, err := s1.Dial(ctx, s2p); err == nil {
+		defer c.Close()
+		t.Fatal("error swarm dialing to unknown peer worked...", err)
+	} else {
+		t.Log("correctly got error:", err)
+	}
+	duration := time.Since(before)
+
+	dt := s1.dialT
+	if duration < dt*dialAttempts {
+		t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
+	}
+	if duration > 2*dt*dialAttempts {
+		t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
+	}
+
+	if !s1.backf.Backoff(s2p) {
+		t.Error("s2 should now be on backoff")
+	}
+}
+
+// TestDialBackoff dials an online peer (s2) and a silent peer (s3) 10x each,
+// twice over, checking that s2 dials succeed while s3 dials fail, and that
+// only s3 is placed on backoff. The steps are listed in the body.
+func TestDialBackoff(t *testing.T) {
+	// t.Skip("skipping for another test")
+	t.Parallel()
+
+	ctx := context.Background()
+	swarms := makeSwarms(ctx, t, 2)
+	s1 := swarms[0]
+	s2 := swarms[1]
+	defer s1.Close()
+	defer s2.Close()
+
+	s1.dialT = time.Millisecond * 500 // lower timeout for tests.
+	s2.dialT = time.Millisecond * 500 // lower timeout for tests.
+
+	s2addrs, err := s2.InterfaceListenAddresses()
+	if err != nil {
+		t.Fatal(err)
+	}
+	s1.peers.AddAddresses(s2.local, s2addrs)
+
+	// dial to a non-existent peer.
+	s3p, s3addr, s3l := newSilentPeer(t)
+	go acceptAndHang(s3l)
+	defer s3l.Close()
+	s1.peers.AddAddress(s3p, s3addr)
+
+	// in this test we will:
+	// 1) dial 10x to each node.
+	// 2) all dials should hang
+	// 3) s1->s2 should succeed.
+	// 4) s1->s3 should not (and should place s3 on backoff)
+	// 5) disconnect entirely
+	// 6) dial 10x to each node again
+	// 7) s3 dials should all return immediately (except 1)
+	// 8) s2 dials should all hang, and succeed
+	// 9) last s3 dial ends, unsuccessful
+
+	dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
+		ch := make(chan bool)
+		for i := 0; i < times; i++ {
+			go func() {
+				if _, err := s1.Dial(ctx, dst); err != nil {
+					t.Error("error dialing", dst, err)
+					ch <- false
+				} else {
+					ch <- true
+				}
+			}()
+		}
+		return ch
+	}
+
+	dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
+		ch := make(chan bool)
+		for i := 0; i < times; i++ {
+			go func() {
+				if c, err := s1.Dial(ctx, dst); err != nil {
+					ch <- false
+				} else {
+					t.Error("succeeded in dialing", dst)
+					ch <- true
+					c.Close()
+				}
+			}()
+		}
+		return ch
+	}
+
+	{
+		// 1) dial 10x to each node.
+		N := 10
+		s2done := dialOnlineNode(s2.local, N)
+		s3done := dialOfflineNode(s3p, N)
+
+		// when all dials should be done by:
+		dialTimeout1x := time.After(s1.dialT)
+		dialTimeout1Ax := time.After(s1.dialT * dialAttempts)
+		dialTimeout10Ax := time.After(s1.dialT * dialAttempts * 10)
+
+		// 2) all dials should hang
+		select {
+		case <-s2done:
+			t.Error("s2 should not happen immediately")
+		case <-s3done:
+			t.Error("s3 should not happen yet")
+		case <-time.After(time.Millisecond):
+			// s2 may finish very quickly, so let's get out.
+		}
+
+		// 3) s1->s2 should succeed.
+		for i := 0; i < N; i++ {
+			select {
+			case r := <-s2done:
+				if !r {
+					t.Error("s2 should not fail")
+				}
+			case <-s3done:
+				t.Error("s3 should not happen yet")
+			case <-dialTimeout1x:
+				t.Error("s2 took too long")
+			}
+		}
+
+		select {
+		case <-s2done:
+			t.Error("s2 should have no more")
+		case <-s3done:
+			t.Error("s3 should not happen yet")
+		case <-dialTimeout1x: // let it pass
+		}
+
+		// 4) s1->s3 should not (and should place s3 on backoff)
+		// N-1 should finish before dialTimeout1Ax
+		for i := 0; i < N; i++ {
+			select {
+			case <-s2done:
+				t.Error("s2 should have no more")
+			case r := <-s3done:
+				if r {
+					t.Error("s3 should not succeed")
+				}
+			case <-dialTimeout1Ax:
+				if i < (N - 1) {
+					t.Fatal("s3 took too long")
+				}
+				t.Log("dialTimeout1Ax hit for last peer")
+			case <-dialTimeout10Ax:
+				t.Fatal("s3 took too long")
+			}
+		}
+
+		// check backoff state
+		if s1.backf.Backoff(s2.local) {
+			t.Error("s2 should not be on backoff")
+		}
+		if !s1.backf.Backoff(s3p) {
+			t.Error("s3 should be on backoff")
+		}
+
+		// 5) disconnect entirely
+
+		for _, c := range s1.Connections() {
+			c.Close()
+		}
+		for i := 0; i < 100 && len(s1.Connections()) > 0; i++ {
+			<-time.After(time.Millisecond)
+		}
+		if len(s1.Connections()) > 0 {
+			t.Fatal("s1 conns must exit")
+		}
+	}
+
+	{
+		// 6) dial 10x to each node again
+		N := 10
+		s2done := dialOnlineNode(s2.local, N)
+		s3done := dialOfflineNode(s3p, N)
+
+		// when all dials should be done by:
+		dialTimeout1x := time.After(s1.dialT)
+		dialTimeout1Ax := time.After(s1.dialT * dialAttempts)
+		dialTimeout10Ax := time.After(s1.dialT * dialAttempts * 10)
+
+		// 7) s3 dials should all return immediately (except 1)
+		for i := 0; i < N-1; i++ {
+			select {
+			case <-s2done:
+				t.Error("s2 should not succeed yet")
+			case r := <-s3done:
+				if r {
+					t.Error("s3 should not succeed")
+				}
+			case <-dialTimeout1x:
+				t.Fatal("s3 took too long")
+			}
+		}
+
+		// 8) s2 dials should all hang, and succeed
+		for i := 0; i < N; i++ {
+			select {
+			case r := <-s2done:
+				if !r {
+					t.Error("s2 should succeed")
+				}
+			// case <-s3done:
+			case <-dialTimeout1Ax:
+				t.Fatal("s2 took too long")
+			}
+		}
+
+		// 9) the last s3 should return, failed.
+		select {
+		case <-s2done:
+			t.Error("s2 should have no more")
+		case r := <-s3done:
+			if r {
+				t.Error("s3 should not succeed")
+			}
+		case <-dialTimeout10Ax:
+			t.Fatal("s3 took too long")
+		}
+
+		// check backoff state (the same)
+		if s1.backf.Backoff(s2.local) {
+			t.Error("s2 should not be on backoff")
+		}
+		if !s1.backf.Backoff(s3p) {
+			t.Error("s3 should be on backoff")
+		}
+
+	}
+}
+
+// TestDialBackoffClears first dials s2 through a silent address, which must
+// fail and put s2 on backoff, then adds s2's real addresses and checks that
+// a successful dial clears the backoff.
+func TestDialBackoffClears(t *testing.T) {
+	// t.Skip("skipping for another test")
+	t.Parallel()
+
+	ctx := context.Background()
+	swarms := makeSwarms(ctx, t, 2)
+	s1 := swarms[0]
+	s2 := swarms[1]
+	defer s1.Close()
+	defer s2.Close()
+	s1.dialT = time.Millisecond * 300 // lower timeout for tests.
+	s2.dialT = time.Millisecond * 300 // lower timeout for tests.
+
+	// use another address first, that accept and hang on conns
+	_, s2bad, s2l := newSilentPeer(t)
+	go acceptAndHang(s2l)
+	defer s2l.Close()
+
+	// phase 1 -- dial to non-operational addresses
+	s1.peers.AddAddress(s2.local, s2bad)
+
+	before := time.Now()
+	if c, err := s1.Dial(ctx, s2.local); err == nil {
+		defer c.Close()
+		t.Fatal("dialing to broken addr worked...", err)
+	} else {
+		t.Log("correctly got error:", err)
+	}
+	duration := time.Since(before)
+
+	dt := s1.dialT
+	if duration < dt*dialAttempts {
+		t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
+	}
+	if duration > 2*dt*dialAttempts {
+		t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
+	}
+
+	if !s1.backf.Backoff(s2.local) {
+		t.Error("s2 should now be on backoff")
+	} else {
+		t.Log("correctly added to backoff")
+	}
+
+	// phase 2 -- add the working address. dial should succeed.
+	ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
+	if err != nil {
+		t.Fatal(err)
+	}
+	s1.peers.AddAddresses(s2.local, ifaceAddrs1)
+
+	before = time.Now()
+	if c, err := s1.Dial(ctx, s2.local); err != nil {
+		t.Fatal(err)
+	} else {
+		c.Close()
+		t.Log("correctly connected")
+	}
+	duration = time.Since(before)
+
+	if duration >= dt {
+		// t.Error("took too long", duration, dt)
+	}
+
+	if s1.backf.Backoff(s2.local) {
+		t.Error("s2 should no longer be on backoff")
+	} else {
+		t.Log("correctly cleared backoff")
+	}
+}
diff --git a/p2p/net/swarm/simul_test.go b/p2p/net/swarm/simul_test.go
index 9382cb645..c87df91c3 100644
--- a/p2p/net/swarm/simul_test.go
+++ b/p2p/net/swarm/simul_test.go
@@ -11,49 +11,6 @@ import (
 	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 )
 
-func TestSimultDials(t *testing.T) {
-	// t.Skip("skipping for another test")
-
-	ctx := context.Background()
-	swarms := makeSwarms(ctx, t, 2)
-
-	// connect everyone
-	{
-		var wg sync.WaitGroup
-		connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
-			// copy for other peer
-			log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
-			s.peers.AddAddress(dst, addr)
-			if _, err := s.Dial(ctx, dst); err != nil {
-				t.Fatal("error swarm dialing to peer", err)
-			}
-			wg.Done()
-		}
-
-		log.Info("Connecting swarms simultaneously.")
-		for i := 0; i < 10; i++ { // connect 10x for each.
-			wg.Add(2)
-			go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0])
-			go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0])
-		}
-		wg.Wait()
-	}
-
-	// should still just have 1, at most 2 connections :)
-	c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
-	if c01l > 2 {
-		t.Error("0->1 has", c01l)
-	}
-	c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
-	if c10l > 2 {
-		t.Error("1->0 has", c10l)
-	}
-
-	for _, s := range swarms {
-		s.Close()
-	}
-}
-
 func TestSimultOpen(t *testing.T) {
 	// t.Skip("skipping for another test")
 