mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-06 00:38:08 +08:00
fix bug in routing table lookups
This commit is contained in:
parent
dc451fba2d
commit
bd9fc2b782
@ -106,6 +106,14 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
if removed != nil {
|
||||
panic("need to remove this peer.")
|
||||
}
|
||||
|
||||
// Ping new peer to register in their routing table
|
||||
// NOTE: this should be done better...
|
||||
err = dht.Ping(peer, time.Second * 2)
|
||||
if err != nil {
|
||||
panic("Failed to ping new peer.")
|
||||
}
|
||||
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
@ -149,7 +157,7 @@ func (dht *IpfsDHT) handleMessages() {
|
||||
}
|
||||
//
|
||||
|
||||
u.DOut("Got message type: '%s' [id = %x]", mesNames[pmes.GetType()], pmes.GetId())
|
||||
u.DOut("Got message type: '%s' [id = %x]", DHTMessage_MessageType_name[int32(pmes.GetType())], pmes.GetId())
|
||||
switch pmes.GetType() {
|
||||
case DHTMessage_GET_VALUE:
|
||||
dht.handleGetValue(mes.Peer, pmes)
|
||||
@ -215,14 +223,18 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
|
||||
u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
|
||||
closest := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
|
||||
if closest == nil {
|
||||
panic("could not find anything.")
|
||||
}
|
||||
|
||||
if len(closest.Addresses) == 0 {
|
||||
panic("no addresses for connected peer...")
|
||||
}
|
||||
|
||||
u.POut("handleFindPeer: sending back '%s'", closest.ID.Pretty())
|
||||
|
||||
addr,err := closest.Addresses[0].String()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@ -45,7 +45,7 @@ func TestPing(t *testing.T) {
|
||||
dht_a.Start()
|
||||
dht_b.Start()
|
||||
|
||||
err = dht_a.Connect(addr_b)
|
||||
_,err = dht_a.Connect(addr_b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -92,7 +92,7 @@ func TestValueGetSet(t *testing.T) {
|
||||
dht_a.Start()
|
||||
dht_b.Start()
|
||||
|
||||
err = dht_a.Connect(addr_b)
|
||||
_,err = dht_a.Connect(addr_b)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@ const (
|
||||
DHTMessage_GET_PROVIDERS DHTMessage_MessageType = 3
|
||||
DHTMessage_FIND_NODE DHTMessage_MessageType = 4
|
||||
DHTMessage_PING DHTMessage_MessageType = 5
|
||||
DHTMessage_DIAGNOSTIC DHTMessage_MessageType = 6
|
||||
)
|
||||
|
||||
var DHTMessage_MessageType_name = map[int32]string{
|
||||
@ -38,6 +39,7 @@ var DHTMessage_MessageType_name = map[int32]string{
|
||||
3: "GET_PROVIDERS",
|
||||
4: "FIND_NODE",
|
||||
5: "PING",
|
||||
6: "DIAGNOSTIC",
|
||||
}
|
||||
var DHTMessage_MessageType_value = map[string]int32{
|
||||
"PUT_VALUE": 0,
|
||||
@ -46,6 +48,7 @@ var DHTMessage_MessageType_value = map[string]int32{
|
||||
"GET_PROVIDERS": 3,
|
||||
"FIND_NODE": 4,
|
||||
"PING": 5,
|
||||
"DIAGNOSTIC": 6,
|
||||
}
|
||||
|
||||
func (x DHTMessage_MessageType) Enum() *DHTMessage_MessageType {
|
||||
@ -66,14 +69,12 @@ func (x *DHTMessage_MessageType) UnmarshalJSON(data []byte) error {
|
||||
}
|
||||
|
||||
type DHTMessage struct {
|
||||
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
|
||||
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
|
||||
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
|
||||
// Unique ID of this message, used to match queries with responses
|
||||
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
|
||||
// Signals whether or not this message is a response to another message
|
||||
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
Type *DHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.DHTMessage_MessageType" json:"type,omitempty"`
|
||||
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
|
||||
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
|
||||
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
|
||||
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *DHTMessage) Reset() { *m = DHTMessage{} }
|
||||
|
||||
@ -10,6 +10,7 @@ message DHTMessage {
|
||||
GET_PROVIDERS = 3;
|
||||
FIND_NODE = 4;
|
||||
PING = 5;
|
||||
DIAGNOSTIC = 6;
|
||||
}
|
||||
|
||||
required MessageType type = 1;
|
||||
|
||||
@ -9,17 +9,6 @@ type pDHTMessage struct {
|
||||
Id uint64
|
||||
}
|
||||
|
||||
var mesNames [10]string
|
||||
|
||||
func init() {
|
||||
mesNames[DHTMessage_ADD_PROVIDER] = "add provider"
|
||||
mesNames[DHTMessage_FIND_NODE] = "find node"
|
||||
mesNames[DHTMessage_GET_PROVIDERS] = "get providers"
|
||||
mesNames[DHTMessage_GET_VALUE] = "get value"
|
||||
mesNames[DHTMessage_PUT_VALUE] = "put value"
|
||||
mesNames[DHTMessage_PING] = "ping"
|
||||
}
|
||||
|
||||
func (m *pDHTMessage) ToProtobuf() *DHTMessage {
|
||||
pmes := new(DHTMessage)
|
||||
if m.Value != nil {
|
||||
|
||||
@ -194,6 +194,17 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.Connect(maddr)
|
||||
found_peer, err := s.Connect(maddr)
|
||||
if err != nil {
|
||||
u.POut("Found peer but couldnt connect.")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !found_peer.ID.Equal(id) {
|
||||
u.POut("FindPeer: searching for '%s' but found '%s'", id.Pretty(), found_peer.ID.Pretty())
|
||||
return found_peer, u.ErrSearchIncomplete
|
||||
}
|
||||
|
||||
return found_peer, nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
package dht
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"container/list"
|
||||
"sort"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
// RoutingTable defines the routing table.
|
||||
@ -87,13 +89,13 @@ func (p peerSorterArr) Less(a, b int) bool {
|
||||
}
|
||||
//
|
||||
|
||||
func (rt *RoutingTable) copyPeersFromList(peerArr peerSorterArr, peerList *list.List) peerSorterArr {
|
||||
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
|
||||
for e := peerList.Front(); e != nil; e = e.Next() {
|
||||
p := e.Value.(*peer.Peer)
|
||||
p_id := convertPeerID(p.ID)
|
||||
pd := peerDistance{
|
||||
p: p,
|
||||
distance: xor(rt.local, p_id),
|
||||
distance: xor(target, p_id),
|
||||
}
|
||||
peerArr = append(peerArr, &pd)
|
||||
}
|
||||
@ -112,6 +114,7 @@ func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
|
||||
|
||||
// Returns a list of the 'count' closest peers to the given ID
|
||||
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
|
||||
u.POut("Searching table, size = %d", rt.Size())
|
||||
cpl := xor(id, rt.local).commonPrefixLen()
|
||||
|
||||
// Get bucket at cpl index or last bucket
|
||||
@ -127,16 +130,16 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
|
||||
// if this happens, search both surrounding buckets for nearest peer
|
||||
if cpl > 0 {
|
||||
plist := (*list.List)(rt.Buckets[cpl - 1])
|
||||
peerArr = rt.copyPeersFromList(peerArr, plist)
|
||||
peerArr = copyPeersFromList(id, peerArr, plist)
|
||||
}
|
||||
|
||||
if cpl < len(rt.Buckets) - 1 {
|
||||
plist := (*list.List)(rt.Buckets[cpl + 1])
|
||||
peerArr = rt.copyPeersFromList(peerArr, plist)
|
||||
peerArr = copyPeersFromList(id, peerArr, plist)
|
||||
}
|
||||
} else {
|
||||
plist := (*list.List)(bucket)
|
||||
peerArr = rt.copyPeersFromList(peerArr, plist)
|
||||
peerArr = copyPeersFromList(id, peerArr, plist)
|
||||
}
|
||||
|
||||
// Sort by distance to local peer
|
||||
@ -145,7 +148,18 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
|
||||
var out []*peer.Peer
|
||||
for i := 0; i < count && i < peerArr.Len(); i++ {
|
||||
out = append(out, peerArr[i].p)
|
||||
u.POut("peer out: %s - %s", peerArr[i].p.ID.Pretty(),
|
||||
hex.EncodeToString(xor(id, convertPeerID(peerArr[i].p.ID))))
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
// Returns the total number of peers in the routing table
|
||||
func (rt *RoutingTable) Size() int {
|
||||
var tot int
|
||||
for _,buck := range rt.Buckets {
|
||||
tot += buck.Len()
|
||||
}
|
||||
return tot
|
||||
}
|
||||
|
||||
@ -90,3 +90,20 @@ func TestTableUpdate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTableFind(t *testing.T) {
|
||||
local := _randPeer()
|
||||
rt := NewRoutingTable(10, convertPeerID(local.ID))
|
||||
|
||||
peers := make([]*peer.Peer, 100)
|
||||
for i := 0; i < 5; i++ {
|
||||
peers[i] = _randPeer()
|
||||
rt.Update(peers[i])
|
||||
}
|
||||
|
||||
t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
|
||||
found := rt.NearestPeer(convertPeerID(peers[2].ID))
|
||||
if !found.ID.Equal(peers[2].ID) {
|
||||
t.Fatalf("Failed to lookup known node...")
|
||||
}
|
||||
}
|
||||
|
||||
@ -245,7 +245,7 @@ func (s *Swarm) fanOut() {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key().Pretty())
|
||||
//u.DOut("fanOut: outgoing message for: '%s'", msg.Peer.Key().Pretty())
|
||||
|
||||
s.connsLock.RLock()
|
||||
conn, found := s.conns[msg.Peer.Key()]
|
||||
|
||||
@ -2,6 +2,7 @@ package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"errors"
|
||||
mh "github.com/jbenet/go-multihash"
|
||||
"os"
|
||||
"os/user"
|
||||
@ -13,10 +14,14 @@ import (
|
||||
var Debug bool
|
||||
|
||||
// ErrNotImplemented signifies a function has not been implemented yet.
|
||||
var ErrNotImplemented = fmt.Errorf("Error: not implemented yet.")
|
||||
var ErrNotImplemented = errors.New("Error: not implemented yet.")
|
||||
|
||||
// ErrTimeout implies that a timeout has been triggered
|
||||
var ErrTimeout = fmt.Errorf("Error: Call timed out.")
|
||||
var ErrTimeout = errors.New("Error: Call timed out.")
|
||||
|
||||
// ErrSeErrSearchIncomplete implies that a search type operation didnt
|
||||
// find the expected node, but did find 'a' node.
|
||||
var ErrSearchIncomplete = errors.New("Error: Search Incomplete.")
|
||||
|
||||
// Key is a string representation of multihash for use with maps.
|
||||
type Key string
|
||||
|
||||
Loading…
Reference in New Issue
Block a user