mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-02 23:08:07 +08:00
refactor peer distance search + handleGetProviders
This commit is contained in:
parent
9eb41e7237
commit
69ed45c555
@ -245,24 +245,8 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
|
||||
}
|
||||
|
||||
// Find closest peer on given cluster to desired key and reply with that info
|
||||
// TODO: this should probably be decomposed.
|
||||
|
||||
// stored levels are > 1, to distinguish missing levels.
|
||||
level := pmes.GetClusterLevel()
|
||||
u.DOut("handleGetValue searching level %d clusters\n", level)
|
||||
|
||||
ck := kb.ConvertKey(u.Key(pmes.GetKey()))
|
||||
closer := dht.routingTables[level].NearestPeer(ck)
|
||||
|
||||
// if closer peer is self, return nil
|
||||
if closer.ID.Equal(dht.self.ID) {
|
||||
u.DOut("Attempted to return self! this shouldnt happen...\n")
|
||||
resp.CloserPeers = nil
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// if self is closer than the one from the table, return nil
|
||||
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
|
||||
closer := dht.betterPeerToQuery(pmes)
|
||||
if closer == nil {
|
||||
u.DOut("handleGetValue could not find a closer node than myself.\n")
|
||||
resp.CloserPeers = nil
|
||||
return resp, nil
|
||||
@ -293,12 +277,15 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
|
||||
|
||||
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
|
||||
resp := &Message{Type: pmes.Type}
|
||||
var closest *peer.Peer
|
||||
|
||||
level := pmes.GetClusterLevel()
|
||||
u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
|
||||
// if looking for self... special case where we send it on CloserPeers.
|
||||
if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
|
||||
closest = dht.self
|
||||
} else {
|
||||
closest = dht.betterPeerToQuery(pmes)
|
||||
}
|
||||
|
||||
ck := kb.ConvertKey(u.Key(pmes.GetKey()))
|
||||
closest := dht.routingTables[level].NearestPeer(ck)
|
||||
if closest == nil {
|
||||
u.PErr("handleFindPeer: could not find anything.\n")
|
||||
return resp, nil
|
||||
@ -309,52 +296,42 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// If the found peer further away than this peer...
|
||||
if kb.Closer(dht.self.ID, closest.ID, u.Key(pmes.GetKey())) {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
|
||||
resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) {
|
||||
resp := Message{
|
||||
Type: Message_GET_PROVIDERS,
|
||||
Key: pmes.GetKey(),
|
||||
ID: pmes.GetId(),
|
||||
Response: true,
|
||||
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
|
||||
resp := &Message{
|
||||
Type: pmes.Type,
|
||||
Key: pmes.Key,
|
||||
}
|
||||
|
||||
// check if we have this value, to add ourselves as provider.
|
||||
has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
|
||||
if err != nil {
|
||||
dht.netChan.Errors <- err
|
||||
if err != nil && err != ds.ErrNotFound {
|
||||
u.PErr("unexpected datastore error: %v\n", err)
|
||||
has = false
|
||||
}
|
||||
|
||||
// setup providers
|
||||
providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
|
||||
if has {
|
||||
providers = append(providers, dht.self)
|
||||
}
|
||||
if providers == nil || len(providers) == 0 {
|
||||
level := 0
|
||||
if len(pmes.GetValue()) > 0 {
|
||||
level = int(pmes.GetValue()[0])
|
||||
}
|
||||
|
||||
closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
|
||||
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
|
||||
resp.Peers = nil
|
||||
} else {
|
||||
resp.Peers = []*peer.Peer{closer}
|
||||
}
|
||||
} else {
|
||||
resp.Peers = providers
|
||||
resp.Success = true
|
||||
// if we've got providers, send thos those.
|
||||
if providers != nil && len(providers) > 0 {
|
||||
resp.ProviderPeers = peersToPBPeers(providers)
|
||||
}
|
||||
|
||||
mes := swarm.NewMessage(p, resp.ToProtobuf())
|
||||
dht.netChan.Outgoing <- mes
|
||||
// Also send closer peers.
|
||||
closer := dht.betterPeerToQuery(pmes)
|
||||
if closer != nil {
|
||||
resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
type providerInfo struct {
|
||||
@ -671,6 +648,40 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*Message_PBPeer) []*peer.Peer
|
||||
return provArr
|
||||
}
|
||||
|
||||
// nearestPeerToQuery returns the routing tables closest peers.
|
||||
func (dht *IpfsDHT) nearestPeerToQuery(pmes *Message) *peer.Peer {
|
||||
level := pmes.GetClusterLevel()
|
||||
cluster := dht.routingTables[level]
|
||||
|
||||
key := u.Key(pmes.GetKey())
|
||||
closer := cluster.NearestPeer(kb.ConvertKey(key))
|
||||
return closer
|
||||
}
|
||||
|
||||
// betterPeerToQuery returns nearestPeerToQuery, but iff closer than self.
|
||||
func (dht *IpfsDHT) betterPeerToQuery(pmes *Message) *peer.Peer {
|
||||
closer := dht.nearestPeerToQuery(pmes)
|
||||
|
||||
// no node? nil
|
||||
if closer == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// == to self? nil
|
||||
if closer.ID.Equal(dht.self.ID) {
|
||||
u.DOut("Attempted to return self! this shouldnt happen...\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
// self is closer? nil
|
||||
if kb.Closer(dht.self.ID, closer.ID, key) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ok seems like a closer node.
|
||||
return closer
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) peerFromInfo(pbp *Message_PBPeer) (*peer.Peer, error) {
|
||||
maddr, err := ma.NewMultiaddr(pbp.GetAddr())
|
||||
if err != nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user