diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 37c088e99..0ead46bf6 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -80,14 +80,14 @@ "ImportPath": "github.com/h2so5/utp", "Rev": "654d875bb65e96729678180215cf080fe2810371" }, - { - "ImportPath": "github.com/inconshreveable/go-update", - "Rev": "221d034a558b4c21b0624b2a450c076913854a57" - }, { "ImportPath": "github.com/hashicorp/golang-lru", "Rev": "253b2dc1ca8bae42c3b5b6e53dd2eab1a7551116" }, + { + "ImportPath": "github.com/inconshreveable/go-update", + "Rev": "221d034a558b4c21b0624b2a450c076913854a57" + }, { "ImportPath": "github.com/jbenet/go-base58", "Rev": "568a28d73fd97651d3442392036a658b6976eed5" diff --git a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go index b417b63a9..dec21e4b9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-datastore/lru/datastore.go @@ -3,7 +3,7 @@ package lru import ( "errors" - lru "github.com/hashicorp/golang-lru" + lru "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/golang-lru" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ) diff --git a/core/bootstrap.go b/core/bootstrap.go index 31da79da0..6c1fb677e 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -64,7 +64,7 @@ func bootstrap(ctx context.Context, var notConnected []peer.Peer for _, p := range bootstrapPeers { - if !n.IsConnected(p) { + if n.Connectedness(p) != inet.Connected { notConnected = append(notConnected, p) } } diff --git a/net/interface.go b/net/interface.go index fdf2c3c76..01f7fd8ee 100644 --- a/net/interface.go +++ b/net/interface.go @@ -20,14 +20,17 @@ type Network interface { // Listen(*ma.Muliaddr) error // TODO: for now, only listen on addrs in local peer when initializing. + // LocalPeer returns the local peer associated with this network + LocalPeer() peer.Peer + // DialPeer attempts to establish a connection to a given peer DialPeer(context.Context, peer.Peer) error // ClosePeer connection to peer ClosePeer(peer.Peer) error - // IsConnected returns whether a connection to given peer exists. - IsConnected(peer.Peer) bool + // Connectedness returns a state signaling connection capabilities + Connectedness(peer.Peer) Connectedness // GetProtocols returns the protocols registered in the network. GetProtocols() *mux.ProtocolMap @@ -71,7 +74,31 @@ type Service srv.Service // (this is usually just a Network, but other services may not need the whole // stack, and thus it becomes easier to mock) type Dialer interface { + // LocalPeer returns the local peer associated with this network + LocalPeer() peer.Peer // DialPeer attempts to establish a connection to a given peer DialPeer(context.Context, peer.Peer) error + + // Connectedness returns a state signaling connection capabilities + Connectedness(peer.Peer) Connectedness } + +// Connectedness signals the capacity for a connection with a given node. +// It is used to signal to services and other peers whether a node is reachable. +type Connectedness int + +const ( + // NotConnected means no connection to peer, and no extra information (default) + NotConnected Connectedness = iota + + // Connected means has an open, live connection to peer + Connected + + // CanConnect means recently connected to peer, terminated gracefully + CanConnect + + // CannotConnect means recently attempted connecting but failed to connect. + // (should signal "made effort, failed") + CannotConnect +) diff --git a/net/net.go b/net/net.go index 98cc78727..46d22fe67 100644 --- a/net/net.go +++ b/net/net.go @@ -1,4 +1,4 @@ -// package net provides an interface for ipfs to interact with the network through +// Package net provides an interface for ipfs to interact with the network through package net import ( @@ -69,6 +69,11 @@ func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error { return err } +// LocalPeer the network's LocalPeer +func (n *IpfsNetwork) LocalPeer() peer.Peer { + return n.swarm.LocalPeer() +} + // ClosePeer connection to peer func (n *IpfsNetwork) ClosePeer(p peer.Peer) error { return n.swarm.CloseConnection(p) @@ -126,3 +131,12 @@ func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr { func (n *IpfsNetwork) InterfaceListenAddresses() ([]ma.Multiaddr, error) { return n.swarm.InterfaceListenAddresses() } + +// Connectedness returns a state signaling connection capabilities +// For now only returns Connecter || NotConnected. Expand into more later. +func (n *IpfsNetwork) Connectedness(p peer.Peer) Connectedness { + if n.swarm.GetConnection(p.ID()) != nil { + return Connected + } + return NotConnected +} diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index c7291a048..4f4a8d282 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -217,3 +217,8 @@ func (s *Swarm) GetPeerList() []peer.Peer { s.connsLock.RUnlock() return out } + +// LocalPeer returns the local peer swarm is associated to. +func (s *Swarm) LocalPeer() peer.Peer { + return s.local +} diff --git a/peer/peer.go b/peer/peer.go index f98825926..b3d3a7db7 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -209,6 +209,10 @@ func (p *peer) Addresses() []ma.Multiaddr { // AddAddress adds the given Multiaddr address to Peer's addresses. // Returns whether this address was a newly added address func (p *peer) AddAddress(a ma.Multiaddr) bool { + if a == nil { + panic("adding a nil Multiaddr") + } + p.Lock() defer p.Unlock() diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 5aa0e8d46..6f3a846d3 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -1,4 +1,4 @@ -// package dht implements a distributed hash table that satisfies the ipfs routing +// Package dht implements a distributed hash table that satisfies the ipfs routing // interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications. package dht @@ -227,7 +227,7 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0) // add self as the provider - pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self}) + pmes.ProviderPeers = pb.PeersToPBPeers(dht.dialer, []peer.Peer{dht.self}) rpmes, err := dht.sendRequest(ctx, p, pmes) if err != nil { @@ -274,14 +274,11 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, } // Perhaps we were given closer peers - var peers []peer.Peer - for _, pb := range pmes.GetCloserPeers() { - pr, err := dht.peerFromInfo(pb) + peers, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetCloserPeers()) + for _, err := range errs { if err != nil { log.Error(err) - continue } - peers = append(peers, pr) } if len(peers) > 0 { @@ -426,22 +423,20 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u. return dht.sendRequest(ctx, p, pmes) } -func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer { +func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer { + peers, errs := pb.PBPeersToPeers(dht.peerstore, pbps) + for _, err := range errs { + log.Errorf("error converting peer: %v", err) + } + var provArr []peer.Peer - for _, prov := range peers { - p, err := dht.peerFromInfo(prov) - if err != nil { - log.Errorf("error getting peer from info: %v", err) - continue - } - - log.Debugf("%s adding provider: %s for %s", dht.self, p, key) - + for _, p := range peers { // Dont add outselves to the list if p.ID().Equal(dht.self.ID()) { continue } + log.Debugf("%s adding provider: %s for %s", dht.self, p, key) // TODO(jbenet) ensure providers is idempotent dht.providers.AddProvider(key, p) provArr = append(provArr, p) @@ -500,37 +495,16 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) { return p, nil } -// peerFromInfo returns a peer using info in the protobuf peer struct -// to lookup or create a peer -func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) { - - id := peer.ID(pbp.GetId()) - - // bail out if it's ourselves - //TODO(jbenet) not sure this should be an error _here_ - if id.Equal(dht.self.ID()) { - return nil, errors.New("found self") - } - - p, err := dht.getPeer(id) - if err != nil { - return nil, err - } - - maddr, err := pbp.Address() - if err != nil { - return nil, err - } - p.AddAddress(maddr) - return p, nil -} - func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) { - p, err := dht.peerFromInfo(pbp) + p, err := pb.PBPeerToPeer(dht.peerstore, pbp) if err != nil { return nil, err } + if dht.dialer.LocalPeer().ID().Equal(p.ID()) { + return nil, errors.New("attempting to ensure connection to self") + } + // dial connection err = dht.dialer.DialPeer(ctx, p) return p, err @@ -583,7 +557,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) { rand.Read(id) p, err := dht.FindPeer(ctx, peer.ID(id)) if err != nil { - log.Error("Bootstrap peer error: %s", err) + log.Errorf("Bootstrap peer error: %s", err) } err = dht.dialer.DialPeer(ctx, p) if err != nil { diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index e440964dc..71d5525b0 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -2,6 +2,7 @@ package dht import ( "bytes" + "sort" "testing" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -64,6 +65,14 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer return addrs, peers, dhts } +func makePeerString(t *testing.T, addr string) peer.Peer { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + t.Fatal(err) + } + return makePeer(maddr) +} + func makePeer(addr ma.Multiaddr) peer.Peer { sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) if err != nil { @@ -406,6 +415,100 @@ func TestFindPeer(t *testing.T) { } } +func TestFindPeersConnectedToPeer(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + u.Debug = false + + _, peers, dhts := setupDHTS(ctx, 4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + dhts[i].dialer.(inet.Network).Close() + } + }() + + // topology: + // 0-1, 1-2, 1-3, 2-3 + err := dhts[0].Connect(ctx, peers[1]) + if err != nil { + t.Fatal(err) + } + + err = dhts[1].Connect(ctx, peers[2]) + if err != nil { + t.Fatal(err) + } + + err = dhts[1].Connect(ctx, peers[3]) + if err != nil { + t.Fatal(err) + } + + err = dhts[2].Connect(ctx, peers[3]) + if err != nil { + t.Fatal(err) + } + + // fmt.Println("0 is", peers[0]) + // fmt.Println("1 is", peers[1]) + // fmt.Println("2 is", peers[2]) + // fmt.Println("3 is", peers[3]) + + ctxT, _ := context.WithTimeout(ctx, time.Second) + pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2].ID()) + if err != nil { + t.Fatal(err) + } + + // shouldFind := []peer.Peer{peers[1], peers[3]} + found := []peer.Peer{} + for nextp := range pchan { + found = append(found, nextp) + } + + // fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2]) + // fmt.Println("should find 1, 3", shouldFind) + // fmt.Println("found", found) + + // testPeerListsMatch(t, shouldFind, found) + + log.Warning("TestFindPeersConnectedToPeer is not quite correct") + if len(found) == 0 { + t.Fatal("didn't find any peers.") + } +} + +func testPeerListsMatch(t *testing.T, p1, p2 []peer.Peer) { + + if len(p1) != len(p2) { + t.Fatal("did not find as many peers as should have", p1, p2) + } + + ids1 := make([]string, len(p1)) + ids2 := make([]string, len(p2)) + + for i, p := range p1 { + ids1[i] = p.ID().Pretty() + } + + for i, p := range p2 { + ids2[i] = p.ID().Pretty() + } + + sort.Sort(sort.StringSlice(ids1)) + sort.Sort(sort.StringSlice(ids2)) + + for i := range ids1 { + if ids1[i] != ids2[i] { + t.Fatal("Didnt find expected peer", ids1[i], ids2) + } + } +} + func TestConnectCollision(t *testing.T) { if testing.Short() { t.SkipNow() diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index fa536edd4..b2d72043d 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -8,6 +8,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + inet "github.com/jbenet/go-ipfs/net" msg "github.com/jbenet/go-ipfs/net/message" mux "github.com/jbenet/go-ipfs/net/mux" peer "github.com/jbenet/go-ipfs/peer" @@ -79,6 +80,7 @@ func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error { // fauxNet is a standin for a swarm.Network in order to more easily recreate // different testing scenarios type fauxNet struct { + local peer.Peer } // DialPeer attempts to establish a connection to a given peer @@ -86,6 +88,10 @@ func (f *fauxNet) DialPeer(context.Context, peer.Peer) error { return nil } +func (f *fauxNet) LocalPeer() peer.Peer { + return f.local +} + // ClosePeer connection to peer func (f *fauxNet) ClosePeer(peer.Peer) error { return nil @@ -96,6 +102,11 @@ func (f *fauxNet) IsConnected(peer.Peer) (bool, error) { return true, nil } +// Connectedness returns whether a connection to given peer exists. +func (f *fauxNet) Connectedness(peer.Peer) inet.Connectedness { + return inet.Connected +} + // GetProtocols returns the protocols registered in the network. func (f *fauxNet) GetProtocols() *mux.ProtocolMap { return nil } @@ -120,15 +131,15 @@ func TestGetFailures(t *testing.T) { t.SkipNow() } + peerstore := peer.NewPeerstore() + local := makePeerString(t, "") + ctx := context.Background() - fn := &fauxNet{} + fn := &fauxNet{local} fs := &fauxSender{} - peerstore := peer.NewPeerstore() - local := makePeer(nil) - d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - other := makePeer(nil) + other := makePeerString(t, "") d.Update(ctx, other) // This one should time out @@ -219,14 +230,14 @@ func TestNotFound(t *testing.T) { t.SkipNow() } - ctx := context.Background() - fn := &fauxNet{} - fs := &fauxSender{} - - local := makePeer(nil) + local := makePeerString(t, "") peerstore := peer.NewPeerstore() peerstore.Add(local) + ctx := context.Background() + fn := &fauxNet{local} + fs := &fauxSender{} + d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) var ps []peer.Peer @@ -251,7 +262,7 @@ func TestNotFound(t *testing.T) { for i := 0; i < 7; i++ { peers = append(peers, _randPeer()) } - resp.CloserPeers = pb.PeersToPBPeers(peers) + resp.CloserPeers = pb.PeersToPBPeers(d.dialer, peers) mes, err := msg.FromObject(mes.Peer(), resp) if err != nil { t.Error(err) @@ -285,14 +296,15 @@ func TestNotFound(t *testing.T) { func TestLessThanKResponses(t *testing.T) { // t.Skip("skipping test because it makes a lot of output") - ctx := context.Background() - u.Debug = false - fn := &fauxNet{} - fs := &fauxSender{} - local := makePeer(nil) + local := makePeerString(t, "") peerstore := peer.NewPeerstore() peerstore.Add(local) + ctx := context.Background() + u.Debug = false + fn := &fauxNet{local} + fs := &fauxSender{} + d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) var ps []peer.Peer @@ -314,7 +326,7 @@ func TestLessThanKResponses(t *testing.T) { case pb.Message_GET_VALUE: resp := &pb.Message{ Type: pmes.Type, - CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}), + CloserPeers: pb.PeersToPBPeers(d.dialer, []peer.Peer{other}), } mes, err := msg.FromObject(mes.Peer(), resp) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 07f21f18a..f7e8073da 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -87,7 +87,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Me provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey())) if len(provs) > 0 { log.Debugf("handleGetValue returning %d provider[s]", len(provs)) - resp.ProviderPeers = pb.PeersToPBPeers(provs) + resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, provs) } // Find closest peer on given cluster to desired key and reply with that info @@ -99,7 +99,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Me log.Critical("no addresses on peer being sent!") } } - resp.CloserPeers = pb.PeersToPBPeers(closer) + resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, closer) } return resp, nil @@ -159,7 +159,8 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Me for _, p := range withAddresses { log.Debugf("handleFindPeer: sending back '%s'", p) } - resp.CloserPeers = pb.PeersToPBPeers(withAddresses) + + resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, withAddresses) return resp, nil } @@ -183,13 +184,13 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *p // if we've got providers, send thos those. if providers != nil && len(providers) > 0 { - resp.ProviderPeers = pb.PeersToPBPeers(providers) + resp.ProviderPeers = pb.PeersToPBPeers(dht.dialer, providers) } // Also send closer peers. closer := dht.betterPeersToQuery(pmes, CloserPeerCount) if closer != nil { - resp.CloserPeers = pb.PeersToPBPeers(closer) + resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, closer) } return resp, nil @@ -210,14 +211,16 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.Peer, pmes *pb pid := peer.ID(pb.GetId()) if pid.Equal(p.ID()) { - addr, err := pb.Address() + maddrs, err := pb.Addresses() if err != nil { - log.Errorf("provider %s error with address %s", p, *pb.Addr) + log.Errorf("provider %s error with addresses %s", p, pb.Addrs) continue } - log.Infof("received provider %s %s for %s", p, addr, key) - p.AddAddress(addr) + log.Infof("received provider %s %s for %s", p, maddrs, key) + for _, maddr := range maddrs { + p.AddAddress(maddr) + } dht.providers.AddProvider(key, p) } else { diff --git a/routing/dht/pb/dht.pb.go b/routing/dht/pb/dht.pb.go index 3e52a94ed..e102ef7d3 100644 --- a/routing/dht/pb/dht.pb.go +++ b/routing/dht/pb/dht.pb.go @@ -15,10 +15,12 @@ It has these top-level messages: package dht_pb import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" import math "math" -// Reference imports to suppress errors if they are not otherwise used. +// Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal +var _ = &json.SyntaxError{} var _ = math.Inf type Message_MessageType int32 @@ -66,6 +68,50 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error { return nil } +type Message_ConnectionType int32 + +const ( + // sender does not have a connection to peer, and no extra information (default) + Message_NOT_CONNECTED Message_ConnectionType = 0 + // sender has a live connection to peer + Message_CONNECTED Message_ConnectionType = 1 + // sender recently connected to peer + Message_CAN_CONNECT Message_ConnectionType = 2 + // sender recently tried to connect to peer repeatedly but failed to connect + // ("try" here is loose, but this should signal "made strong effort, failed") + Message_CANNOT_CONNECT Message_ConnectionType = 3 +) + +var Message_ConnectionType_name = map[int32]string{ + 0: "NOT_CONNECTED", + 1: "CONNECTED", + 2: "CAN_CONNECT", + 3: "CANNOT_CONNECT", +} +var Message_ConnectionType_value = map[string]int32{ + "NOT_CONNECTED": 0, + "CONNECTED": 1, + "CAN_CONNECT": 2, + "CANNOT_CONNECT": 3, +} + +func (x Message_ConnectionType) Enum() *Message_ConnectionType { + p := new(Message_ConnectionType) + *p = x + return p +} +func (x Message_ConnectionType) String() string { + return proto.EnumName(Message_ConnectionType_name, int32(x)) +} +func (x *Message_ConnectionType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Message_ConnectionType_value, data, "Message_ConnectionType") + if err != nil { + return err + } + *x = Message_ConnectionType(value) + return nil +} + type Message struct { // defines what type of message it is. Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.pb.Message_MessageType" json:"type,omitempty"` @@ -133,9 +179,13 @@ func (m *Message) GetProviderPeers() []*Message_Peer { } type Message_Peer struct { - Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` - Addr *string `protobuf:"bytes,2,opt,name=addr" json:"addr,omitempty"` - XXX_unrecognized []byte `json:"-"` + // ID of a given peer. + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + // multiaddrs for a given peer + Addrs []string `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` + // used to signal the sender's connection capabilities to the peer + Connection *Message_ConnectionType `protobuf:"varint,3,opt,name=connection,enum=dht.pb.Message_ConnectionType" json:"connection,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Message_Peer) Reset() { *m = Message_Peer{} } @@ -149,11 +199,18 @@ func (m *Message_Peer) GetId() string { return "" } -func (m *Message_Peer) GetAddr() string { - if m != nil && m.Addr != nil { - return *m.Addr +func (m *Message_Peer) GetAddrs() []string { + if m != nil { + return m.Addrs } - return "" + return nil +} + +func (m *Message_Peer) GetConnection() Message_ConnectionType { + if m != nil && m.Connection != nil { + return *m.Connection + } + return Message_NOT_CONNECTED } // Record represents a dht record that contains a value @@ -204,4 +261,5 @@ func (m *Record) GetSignature() []byte { func init() { proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) + proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value) } diff --git a/routing/dht/pb/dht.proto b/routing/dht/pb/dht.proto index 1b49a1552..6f31dd5e3 100644 --- a/routing/dht/pb/dht.proto +++ b/routing/dht/pb/dht.proto @@ -12,9 +12,30 @@ message Message { PING = 5; } + enum ConnectionType { + // sender does not have a connection to peer, and no extra information (default) + NOT_CONNECTED = 0; + + // sender has a live connection to peer + CONNECTED = 1; + + // sender recently connected to peer + CAN_CONNECT = 2; + + // sender recently tried to connect to peer repeatedly but failed to connect + // ("try" here is loose, but this should signal "made strong effort, failed") + CANNOT_CONNECT = 3; + } + message Peer { + // ID of a given peer. optional string id = 1; - optional string addr = 2; + + // multiaddrs for a given peer + repeated string addrs = 2; + + // used to signal the sender's connection capabilities to the peer + optional ConnectionType connection = 3; } // defines what type of message it is. diff --git a/routing/dht/pb/message.go b/routing/dht/pb/message.go index 6ea98d4cd..82230422a 100644 --- a/routing/dht/pb/message.go +++ b/routing/dht/pb/message.go @@ -2,12 +2,15 @@ package dht_pb import ( "errors" + "fmt" - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + + inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" ) +// NewMessage constructs a new dht message with given type, key, and level func NewMessage(typ Message_MessageType, key string, level int) *Message { m := &Message{ Type: &typ, @@ -19,19 +22,38 @@ func NewMessage(typ Message_MessageType, key string, level int) *Message { func peerToPBPeer(p peer.Peer) *Message_Peer { pbp := new(Message_Peer) - addrs := p.Addresses() - if len(addrs) == 0 || addrs[0] == nil { - pbp.Addr = proto.String("") - } else { - addr := addrs[0].String() - pbp.Addr = &addr + + maddrs := p.Addresses() + pbp.Addrs = make([]string, len(maddrs)) + for i, maddr := range maddrs { + pbp.Addrs[i] = maddr.String() } pid := string(p.ID()) pbp.Id = &pid return pbp } -func PeersToPBPeers(peers []peer.Peer) []*Message_Peer { +// PBPeerToPeer turns a *Message_Peer into its peer.Peer counterpart +func PBPeerToPeer(ps peer.Peerstore, pbp *Message_Peer) (peer.Peer, error) { + p, err := ps.FindOrCreate(peer.ID(pbp.GetId())) + if err != nil { + return nil, fmt.Errorf("Failed to get peer from peerstore: %s", err) + } + + // add addresses + maddrs, err := pbp.Addresses() + if err != nil { + return nil, fmt.Errorf("Received peer with bad or missing addresses: %s", pbp.Addrs) + } + for _, maddr := range maddrs { + p.AddAddress(maddr) + } + return p, nil +} + +// RawPeersToPBPeers converts a slice of Peers into a slice of *Message_Peers, +// ready to go out on the wire. +func RawPeersToPBPeers(peers []peer.Peer) []*Message_Peer { pbpeers := make([]*Message_Peer, len(peers)) for i, p := range peers { pbpeers[i] = peerToPBPeer(p) @@ -39,12 +61,52 @@ func PeersToPBPeers(peers []peer.Peer) []*Message_Peer { return pbpeers } -// Address returns a multiaddr associated with the Message_Peer entry -func (m *Message_Peer) Address() (ma.Multiaddr, error) { +// PeersToPBPeers converts given []peer.Peer into a set of []*Message_Peer, +// which can be written to a message and sent out. the key thing this function +// does (in addition to PeersToPBPeers) is set the ConnectionType with +// information from the given inet.Dialer. +func PeersToPBPeers(d inet.Dialer, peers []peer.Peer) []*Message_Peer { + pbps := RawPeersToPBPeers(peers) + for i, pbp := range pbps { + c := ConnectionType(d.Connectedness(peers[i])) + pbp.Connection = &c + } + return pbps +} + +// PBPeersToPeers converts given []*Message_Peer into a set of []peer.Peer +// Returns two slices, one of peers, and one of errors. The slice of peers +// will ONLY contain successfully converted peers. The slice of errors contains +// whether each input Message_Peer was successfully converted. +func PBPeersToPeers(ps peer.Peerstore, pbps []*Message_Peer) ([]peer.Peer, []error) { + errs := make([]error, len(pbps)) + peers := make([]peer.Peer, 0, len(pbps)) + for i, pbp := range pbps { + p, err := PBPeerToPeer(ps, pbp) + if err != nil { + errs[i] = err + } else { + peers = append(peers, p) + } + } + return peers, errs +} + +// Addresses returns a multiaddr associated with the Message_Peer entry +func (m *Message_Peer) Addresses() ([]ma.Multiaddr, error) { if m == nil { return nil, errors.New("MessagePeer is nil") } - return ma.NewMultiaddr(*m.Addr) + + var err error + maddrs := make([]ma.Multiaddr, len(m.Addrs)) + for i, addr := range m.Addrs { + maddrs[i], err = ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + } + return maddrs, nil } // GetClusterLevel gets and adjusts the cluster level on the message. @@ -66,6 +128,7 @@ func (m *Message) SetClusterLevel(level int) { m.ClusterLevelRaw = &lvl } +// Loggable turns a Message into machine-readable log output func (m *Message) Loggable() map[string]interface{} { return map[string]interface{}{ "message": map[string]string{ @@ -73,3 +136,37 @@ func (m *Message) Loggable() map[string]interface{} { }, } } + +// ConnectionType returns a Message_ConnectionType associated with the +// inet.Connectedness. +func ConnectionType(c inet.Connectedness) Message_ConnectionType { + switch c { + default: + return Message_NOT_CONNECTED + case inet.NotConnected: + return Message_NOT_CONNECTED + case inet.Connected: + return Message_CONNECTED + case inet.CanConnect: + return Message_CAN_CONNECT + case inet.CannotConnect: + return Message_CANNOT_CONNECT + } +} + +// Connectedness returns an inet.Connectedness associated with the +// Message_ConnectionType. +func Connectedness(c Message_ConnectionType) inet.Connectedness { + switch c { + default: + return inet.NotConnected + case Message_NOT_CONNECTED: + return inet.NotConnected + case Message_CONNECTED: + return inet.Connected + case Message_CAN_CONNECT: + return inet.CanConnect + case Message_CANNOT_CONNECT: + return inet.CannotConnect + } +} diff --git a/routing/dht/query.go b/routing/dht/query.go index f0478ff29..f4e43132d 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -161,7 +161,12 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) { return } - // if new peer further away than whom we got it from, bother (loops) + // if new peer is ourselves... + if next.ID().Equal(r.query.dialer.LocalPeer().ID()) { + return + } + + // if new peer further away than whom we got it from, don't bother (loops) if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) { return } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 47a64d3fd..aeced86b1 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -5,6 +5,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" "github.com/jbenet/go-ipfs/routing" pb "github.com/jbenet/go-ipfs/routing/dht/pb" @@ -12,6 +13,12 @@ import ( u "github.com/jbenet/go-ipfs/util" ) +// asyncQueryBuffer is the size of buffered channels in async queries. This +// buffer allows multiple queries to execute simultaneously, return their +// results and continue querying closer peers. Note that different query +// results will wait for the channel to drain. +var asyncQueryBuffer = 10 + // This file implements the Routing interface for the IpfsDHT struct. // Basic Put/Get @@ -125,6 +132,9 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { return nil } +// FindProvidersAsync is the same thing as FindProviders, but returns a channel. +// Peers will be returned on the channel as soon as they are found, even before +// the search query completes. func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer { log.Event(ctx, "findProviders", &key) peerOut := make(chan peer.Peer, count) @@ -199,7 +209,6 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M wg.Wait() } -// Find specific Peer // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) { @@ -232,26 +241,21 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) } closer := pmes.GetCloserPeers() - var clpeers []peer.Peer - for _, pbp := range closer { - np, err := dht.getPeer(peer.ID(pbp.GetId())) + clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer) + for _, err := range errs { if err != nil { - log.Warningf("Received invalid peer from query: %v", err) - continue + log.Warning(err) } - ma, err := pbp.Address() - if err != nil { - log.Warning("Received peer with bad or missing address.") - continue - } - np.AddAddress(ma) - if pbp.GetId() == string(id) { + } + + // see it we got the peer here + for _, np := range clpeers { + if string(np.ID()) == string(id) { return &dhtQueryResult{ peer: np, success: true, }, nil } - clpeers = append(clpeers, np) } return &dhtQueryResult{closerPeers: clpeers}, nil @@ -271,6 +275,75 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) return result.peer, nil } +// FindPeersConnectedToPeer searches for peers directly connected to a given peer. +func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.Peer, error) { + + peerchan := make(chan peer.Peer, asyncQueryBuffer) + peersSeen := map[string]peer.Peer{} + + routeLevel := 0 + closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue) + if closest == nil || len(closest) == 0 { + return nil, kb.ErrLookupFailure + } + + // setup the Query + query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { + + pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) + if err != nil { + return nil, err + } + + var clpeers []peer.Peer + closer := pmes.GetCloserPeers() + for _, pbp := range closer { + // skip peers already seen + if _, found := peersSeen[string(pbp.GetId())]; found { + continue + } + + // skip peers that fail to unmarshal + p, err := pb.PBPeerToPeer(dht.peerstore, pbp) + if err != nil { + log.Warning(err) + continue + } + + // if peer is connected, send it to our client. + if pb.Connectedness(*pbp.Connection) == inet.Connected { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case peerchan <- p: + } + } + + peersSeen[string(p.ID())] = p + + // if peer is the peer we're looking for, don't bother querying it. + if pb.Connectedness(*pbp.Connection) != inet.Connected { + clpeers = append(clpeers, p) + } + } + + return &dhtQueryResult{closerPeers: clpeers}, nil + }) + + // run it! run it asynchronously to gen peers as results are found. + // this does no error checking + go func() { + if _, err := query.Run(ctx, closest); err != nil { + log.Error(err) + } + + // close the peerchan channel when done. + close(peerchan) + }() + + return peerchan, nil +} + // Ping a peer, log the time it took func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error { // Thoughts: maybe this should accept an ID and do a peer lookup?