dht: more tests pass

Juan Batiz-Benet 2014-12-21 02:23:06 -08:00
parent 334557f9d5
commit 16c25efbcb
4 changed files with 274 additions and 327 deletions

View File

@ -52,11 +52,11 @@ type IpfsDHT struct {
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, p peer.ID, ps peer.Peerstore, n inet.Network, dstore ds.ThreadSafeDatastore) *IpfsDHT {
func NewDHT(ctx context.Context, p peer.ID, n inet.Network, dstore ds.ThreadSafeDatastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.datastore = dstore
dht.self = p
dht.peerstore = ps
dht.peerstore = n.Peerstore()
dht.ContextGroup = ctxgroup.WithContext(ctx)
dht.network = n
n.SetHandler(inet.ProtocolDHT, dht.handleNewStream)
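
Note on the hunk above: the explicit peer.Peerstore argument is dropped and NewDHT now reads the peerstore off the network it is handed (n.Peerstore()). A minimal sketch of how a caller adapts, using the vendored import paths that appear elsewhere in this commit; the helper name newDHTFromNetwork is hypothetical:

package dht

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	inet "github.com/jbenet/go-ipfs/net"
	peer "github.com/jbenet/go-ipfs/peer"
)

// newDHTFromNetwork (hypothetical helper) builds a DHT without passing a
// peerstore: NewDHT now derives it from n.Peerstore() internally.
func newDHTFromNetwork(ctx context.Context, self peer.ID, n inet.Network) *IpfsDHT {
	dstore := dssync.MutexWrap(ds.NewMapDatastore())
	return NewDHT(ctx, self, n, dstore)
}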

View File

@ -1,8 +1,8 @@
package dht
import (
// "fmt"
"bytes"
"sort"
"testing"
"time"
@ -42,7 +42,7 @@ func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {
}
dss := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, p, peerstore, n, dss)
d := NewDHT(ctx, p, n, dss)
d.Validators["v"] = func(u.Key, []byte) error {
return nil
@ -208,7 +208,6 @@ func TestProvides(t *testing.T) {
}
}
/*
func TestProvidesAsync(t *testing.T) {
if testing.Short() {
t.SkipNow()
@ -216,7 +215,7 @@ func TestProvidesAsync(t *testing.T) {
ctx := context.Background()
_, peers, dhts := setupDHTS(ctx, 4, t)
_, _, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
@ -224,22 +223,11 @@ func TestProvidesAsync(t *testing.T) {
}
}()
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
err = dhts[3].putLocal(u.Key("hello"), []byte("world"))
err := dhts[3].putLocal(u.Key("hello"), []byte("world"))
if err != nil {
t.Fatal(err)
}
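
The connect(t, ctx, a, b) helper these hunks now call is not shown in this diff; a hedged sketch of what it likely amounts to, folding each removed Connect-plus-t.Fatal block into one call (the body below is an assumption, the real helper lives elsewhere in dht_test.go):

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
	// Assumes the test setup has already told a's peerstore where b listens
	// (e.g. via a.peerstore.AddAddress, as TestConnectCollision does below).
	if err := a.Connect(ctx, b.self); err != nil {
		t.Fatal(err)
	}
}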
@ -263,10 +251,10 @@ func TestProvidesAsync(t *testing.T) {
if !ok {
t.Fatal("Provider channel was closed...")
}
if p == nil {
if p.ID == "" {
t.Fatal("Got back nil provider!")
}
if !p.ID().Equal(dhts[3].self.ID()) {
if p.ID != dhts[3].self {
t.Fatalf("got a provider, but not the right one. %s", p)
}
case <-ctxT.Done():
@ -281,7 +269,7 @@ func TestLayeredGet(t *testing.T) {
ctx := context.Background()
_, peers, dhts := setupDHTS(ctx, 4, t)
_, _, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
@ -289,22 +277,11 @@ func TestLayeredGet(t *testing.T) {
}
}()
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
err = dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
err := dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
if err != nil {
t.Fatal(err)
}
@ -343,32 +320,21 @@ func TestFindPeer(t *testing.T) {
}
}()
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
ctxT, _ := context.WithTimeout(ctx, time.Second)
p, err := dhts[0].FindPeer(ctxT, peers[2].ID())
p, err := dhts[0].FindPeer(ctxT, peers[2])
if err != nil {
t.Fatal(err)
}
if p == nil {
if p.ID == "" {
t.Fatal("Failed to find peer.")
}
if !p.ID().Equal(peers[2].ID()) {
if p.ID != peers[2] {
t.Fatal("Didnt find expected peer.")
}
}
@ -392,25 +358,10 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
// topology:
// 0-1, 1-2, 1-3, 2-3
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
err = dhts[2].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
connect(t, ctx, dhts[2], dhts[3])
// fmt.Println("0 is", peers[0])
// fmt.Println("1 is", peers[1])
@ -418,13 +369,13 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
// fmt.Println("3 is", peers[3])
ctxT, _ := context.WithTimeout(ctx, time.Second)
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2].ID())
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2])
if err != nil {
t.Fatal(err)
}
// shouldFind := []peer.ID{peers[1], peers[3]}
found := []peer.ID{}
found := []peer.PeerInfo{}
for nextp := range pchan {
found = append(found, nextp)
}
@ -451,11 +402,11 @@ func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) {
ids2 := make([]string, len(p2))
for i, p := range p1 {
ids1[i] = p.ID().Pretty()
ids1[i] = string(p)
}
for i, p := range p2 {
ids2[i] = p.ID().Pretty()
ids2[i] = string(p)
}
sort.Sort(sort.StringSlice(ids1))
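
The hunks above lean on peer.ID now being a string-based type, so equality is plain == and conversion is string(p), replacing p.ID().Equal(...) and .Pretty(). A tiny illustration (the function name is a placeholder):

func sameAndKey(a, b peer.ID) (bool, string) {
	same := a == b   // direct comparison replaces a.ID().Equal(b.ID())
	key := string(a) // direct conversion replaces p.ID().Pretty() for sort keys
	return same, key
}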
@ -480,39 +431,41 @@ func TestConnectCollision(t *testing.T) {
ctx := context.Background()
addrA := randMultiaddr(t)
addrB := randMultiaddr(t)
addrA := testutil.RandLocalTCPAddress()
addrB := testutil.RandLocalTCPAddress()
peerA := makePeer(addrA)
peerB := makePeer(addrB)
dhtA := setupDHT(ctx, t, addrA)
dhtB := setupDHT(ctx, t, addrB)
dhtA := setupDHT(ctx, t, peerA)
dhtB := setupDHT(ctx, t, peerB)
peerA := dhtA.self
peerB := dhtB.self
done := make(chan struct{})
errs := make(chan error)
go func() {
dhtA.peerstore.AddAddress(peerB, addrB)
err := dhtA.Connect(ctx, peerB)
if err != nil {
t.Fatal(err)
}
done <- struct{}{}
errs <- err
}()
go func() {
dhtB.peerstore.AddAddress(peerA, addrA)
err := dhtB.Connect(ctx, peerA)
if err != nil {
t.Fatal(err)
}
done <- struct{}{}
errs <- err
}()
timeout := time.After(time.Second)
select {
case <-done:
case e := <-errs:
if e != nil {
t.Fatal(e)
}
case <-timeout:
t.Fatal("Timeout received!")
}
select {
case <-done:
case e := <-errs:
if e != nil {
t.Fatal(e)
}
case <-timeout:
t.Fatal("Timeout received!")
}
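
The rewritten hunk above stops calling t.Fatal inside the dialing goroutines and instead sends errors on an errs channel, because t.Fatal (FailNow) is only safe from the goroutine running the test. The same pattern in isolation; waitForBoth and dial are placeholder names for illustration:

func waitForBoth(t *testing.T, dial func() error) {
	errs := make(chan error)
	go func() { errs <- dial() }()
	go func() { errs <- dial() }()

	timeout := time.After(time.Second)
	for i := 0; i < 2; i++ {
		select {
		case e := <-errs:
			if e != nil {
				t.Fatal(e) // fail from the test goroutine, never from the workers
			}
		case <-timeout:
			t.Fatal("Timeout received!")
		}
	}
}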
@ -521,8 +474,5 @@ func TestConnectCollision(t *testing.T) {
dhtB.Close()
dhtA.network.Close()
dhtB.network.Close()
<-time.After(200 * time.Millisecond)
}
}
*/

View File

@ -1,270 +1,264 @@
package dht
// import (
// "math/rand"
// "testing"
import (
"math/rand"
"testing"
// crand "crypto/rand"
inet "github.com/jbenet/go-ipfs/net"
mocknet "github.com/jbenet/go-ipfs/net/mock"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
// inet "github.com/jbenet/go-ipfs/net"
// mocknet "github.com/jbenet/go-ipfs/net/mock"
// peer "github.com/jbenet/go-ipfs/peer"
// routing "github.com/jbenet/go-ipfs/routing"
// pb "github.com/jbenet/go-ipfs/routing/dht/pb"
// u "github.com/jbenet/go-ipfs/util"
// testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
// context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
// ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
// ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"time"
)
// "time"
// )
func TestGetFailures(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// func TestGetFailures(t *testing.T) {
// if testing.Short() {
// t.SkipNow()
// }
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
nets := mn.Nets()
peers := mn.Peers()
// ctx := context.Background()
// mn, err := mocknet.FullMeshConnected(ctx, 2)
// if err != nil {
// t.Fatal(err)
// }
// nets := mn.Nets()
// peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, peers[0], nets[0], tsds)
d.Update(ctx, peers[1])
// ps := peer.NewPeerstore()
// d := NewDHT(ctx, peers[0], ps, nets[0], ds.NewMapDatastore())
// d.Update(ctx, peers[1])
// This one should time out
// u.POut("Timout Test\n")
ctx1, _ := context.WithTimeout(context.Background(), time.Second)
if _, err := d.GetValue(ctx1, u.Key("test")); err != nil {
if err != context.DeadlineExceeded {
t.Fatal("Got different error than we expected", err)
}
} else {
t.Fatal("Did not get expected error!")
}
// // This one should time out
// // u.POut("Timout Test\n")
// ctx1, _ := context.WithTimeout(context.Background(), time.Second)
// if _, err := d.GetValue(ctx1, u.Key("test")); err != nil {
// if err != context.DeadlineExceeded {
// t.Fatal("Got different error than we expected", err)
// }
// } else {
// t.Fatal("Did not get expected error!")
// }
msgs := make(chan *pb.Message, 100)
// msgs := make(chan *pb.Message, 100)
// u.POut("NotFound Test\n")
// Reply with failures to every message
nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
// // u.POut("NotFound Test\n")
// // Reply with failures to every message
// nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
// defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
// pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
// pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
// pmes := new(pb.Message)
// if err := pbr.ReadMsg(pmes); err != nil {
// panic(err)
// }
resp := &pb.Message{
Type: pmes.Type,
}
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
// resp := &pb.Message{
// Type: pmes.Type,
// }
// if err := pbw.WriteMsg(resp); err != nil {
// panic(err)
// }
msgs <- resp
})
// msgs <- resp
// })
// This one should fail with NotFound
ctx2, _ := context.WithTimeout(context.Background(), time.Second)
_, err = d.GetValue(ctx2, u.Key("test"))
if err != nil {
if err != routing.ErrNotFound {
t.Fatalf("Expected ErrNotFound, got: %s", err)
}
} else {
t.Fatal("expected error, got none.")
}
// // This one should fail with NotFound
// ctx2, _ := context.WithTimeout(context.Background(), time.Second)
// _, err = d.GetValue(ctx2, u.Key("test"))
// if err != nil {
// if err != routing.ErrNotFound {
// t.Fatalf("Expected ErrNotFound, got: %s", err)
// }
// } else {
// t.Fatal("expected error, got none.")
// }
// Now we test this DHT's handleGetValue failure
{
typ := pb.Message_GET_VALUE
str := "hello"
rec, err := d.makePutRecord(u.Key(str), []byte("blah"))
if err != nil {
t.Fatal(err)
}
req := pb.Message{
Type: &typ,
Key: &str,
Record: rec,
}
// // Now we test this DHT's handleGetValue failure
// {
// typ := pb.Message_GET_VALUE
// str := "hello"
// rec, err := d.makePutRecord(u.Key(str), []byte("blah"))
// if err != nil {
// t.Fatal(err)
// }
// req := pb.Message{
// Type: &typ,
// Key: &str,
// Record: rec,
// }
// u.POut("handleGetValue Test\n")
s, err := nets[1].NewStream(inet.ProtocolDHT, peers[0])
if err != nil {
t.Fatal(err)
}
defer s.Close()
// // u.POut("handleGetValue Test\n")
// s, err := nets[1].NewStream(inet.ProtocolDHT, peers[0])
// if err != nil {
// t.Fatal(err)
// }
// defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
// pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
// pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(&req); err != nil {
t.Fatal(err)
}
// if err := pbw.WriteMsg(&req); err != nil {
// t.Fatal(err)
// }
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
t.Fatal(err)
}
if pmes.GetRecord() != nil {
t.Fatal("shouldnt have value")
}
if pmes.GetProviderPeers() != nil {
t.Fatal("shouldnt have provider peers")
}
}
}
// pmes := new(pb.Message)
// if err := pbr.ReadMsg(pmes); err != nil {
// t.Fatal(err)
// }
// if pmes.GetRecord() != nil {
// t.Fatal("shouldnt have value")
// }
// if pmes.GetProviderPeers() != nil {
// t.Fatal("shouldnt have provider peers")
// }
// }
// }
func TestNotFound(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// // TODO: Maybe put these in some sort of "ipfs_testutil" package
// func _randPeer() peer.Peer {
// id := make(peer.ID, 16)
// crand.Read(id)
// p := testutil.NewPeerWithID(id)
// return p
// }
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 16)
if err != nil {
t.Fatal(err)
}
nets := mn.Nets()
peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, peers[0], nets[0], tsds)
// func TestNotFound(t *testing.T) {
// if testing.Short() {
// t.SkipNow()
// }
for _, p := range peers {
d.Update(ctx, p)
}
// ctx := context.Background()
// mn, err := mocknet.FullMeshConnected(ctx, 16)
// if err != nil {
// t.Fatal(err)
// }
// nets := mn.Nets()
// peers := mn.Peers()
// peerstore := peer.NewPeerstore()
// Reply with random peers to every message
for _, neti := range nets {
neti := neti // shadow loop var
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
// d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore())
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
// for _, p := range peers {
// d.Update(ctx, p)
// }
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
// // Reply with random peers to every message
// for _, neti := range nets {
// neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
// defer s.Close()
switch pmes.GetType() {
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}
// pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
// pbw := ggio.NewDelimitedWriter(s)
ps := []peer.PeerInfo{}
for i := 0; i < 7; i++ {
p := peers[rand.Intn(len(peers))]
pi := neti.Peerstore().PeerInfo(p)
ps = append(ps, pi)
}
// pmes := new(pb.Message)
// if err := pbr.ReadMsg(pmes); err != nil {
// panic(err)
// }
resp.CloserPeers = pb.PeerInfosToPBPeers(d.network, ps)
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
// switch pmes.GetType() {
// case pb.Message_GET_VALUE:
// resp := &pb.Message{Type: pmes.Type}
default:
panic("Shouldnt recieve this.")
}
})
}
// ps := []peer.Peer{}
// for i := 0; i < 7; i++ {
// ps = append(ps, peers[rand.Intn(len(peers))])
// }
ctx, _ = context.WithTimeout(ctx, time.Second*5)
v, err := d.GetValue(ctx, u.Key("hello"))
log.Debugf("get value got %v", v)
if err != nil {
switch err {
case routing.ErrNotFound:
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
// resp.CloserPeers = pb.PeersToPBPeers(d.network, peers)
// if err := pbw.WriteMsg(resp); err != nil {
// panic(err)
// }
// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
// t.Skip("skipping test because it makes a lot of output")
// default:
// panic("Shouldnt recieve this.")
// }
// })
// }
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 6)
if err != nil {
t.Fatal(err)
}
nets := mn.Nets()
peers := mn.Peers()
// ctx, _ = context.WithTimeout(ctx, time.Second*5)
// v, err := d.GetValue(ctx, u.Key("hello"))
// log.Debugf("get value got %v", v)
// if err != nil {
// switch err {
// case routing.ErrNotFound:
// //Success!
// return
// case u.ErrTimeout:
// t.Fatal("Should not have gotten timeout!")
// default:
// t.Fatalf("Got unexpected error: %s", err)
// }
// }
// t.Fatal("Expected to recieve an error.")
// }
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, peers[0], nets[0], tsds)
// // If less than K nodes are in the entire network, it should fail when we make
// // a GET rpc and nobody has the value
// func TestLessThanKResponses(t *testing.T) {
// // t.Skip("skipping test because it makes a lot of output")
for i := 1; i < 5; i++ {
d.Update(ctx, peers[i])
}
// ctx := context.Background()
// mn, err := mocknet.FullMeshConnected(ctx, 6)
// if err != nil {
// t.Fatal(err)
// }
// nets := mn.Nets()
// peers := mn.Peers()
// peerstore := peer.NewPeerstore()
// Reply with random peers to every message
for _, neti := range nets {
neti := neti // shadow loop var
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
// d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore())
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
// for i := 1; i < 5; i++ {
// d.Update(ctx, peers[i])
// }
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
// // Reply with random peers to every message
// for _, neti := range nets {
// neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
// defer s.Close()
switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := neti.Peerstore().PeerInfo(peers[1])
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: pb.PeerInfosToPBPeers(d.network, []peer.PeerInfo{pi}),
}
// pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
// pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
// pmes := new(pb.Message)
// if err := pbr.ReadMsg(pmes); err != nil {
// panic(err)
// }
})
}
// switch pmes.GetType() {
// case pb.Message_GET_VALUE:
// resp := &pb.Message{
// Type: pmes.Type,
// CloserPeers: pb.PeersToPBPeers(d.network, []peer.Peer{peers[1]}),
// }
// if err := pbw.WriteMsg(resp); err != nil {
// panic(err)
// }
// default:
// panic("Shouldnt recieve this.")
// }
// })
// }
// ctx, _ = context.WithTimeout(ctx, time.Second*30)
// if _, err := d.GetValue(ctx, u.Key("hello")); err != nil {
// switch err {
// case routing.ErrNotFound:
// //Success!
// return
// case u.ErrTimeout:
// t.Fatal("Should not have gotten timeout!")
// default:
// t.Fatalf("Got unexpected error: %s", err)
// }
// }
// t.Fatal("Expected to recieve an error.")
// }
ctx, _ = context.WithTimeout(ctx, time.Second*30)
if _, err := d.GetValue(ctx, u.Key("hello")); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
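
One detail worth calling out in the restored handlers above: the `neti := neti // shadow loop var` lines. A range loop reuses a single loop variable, so without the shadow every closure passed to SetHandler would capture the last network in nets. A standalone sketch of the same fix (registerAll and handle are hypothetical names):

func registerAll(nets []inet.Network, handle func(inet.Network, inet.Stream)) {
	for _, neti := range nets {
		neti := neti // each handler closure gets its own copy of the loop variable
		neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
			handle(neti, s)
		})
	}
}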

View File

@ -99,7 +99,10 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
for _, pi := range closerinfos {
log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
if len(pi.Addrs) < 1 {
log.Critical("no addresses on peer being sent!")
log.Criticalf(`no addresses on peer being sent!
[local:%s]
[sending:%s]
[remote:%s]`, dht.self, pi.ID, p)
}
}