mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-01 22:37:51 +08:00
allow peers to realize that they are actually a provider for a value
This commit is contained in:
parent
9338caa9d8
commit
85273daaa5
@ -2,6 +2,7 @@ package dht
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@ -59,7 +60,7 @@ func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT {
|
||||
dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
|
||||
dht.datastore = dstore
|
||||
dht.self = p
|
||||
dht.providers = NewProviderManager()
|
||||
dht.providers = NewProviderManager(p.ID)
|
||||
dht.shutdown = make(chan struct{})
|
||||
|
||||
dht.routingTables = make([]*kb.RoutingTable, 3)
|
||||
@ -293,7 +294,15 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
|
||||
Response: true,
|
||||
}
|
||||
|
||||
has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
|
||||
if err != nil {
|
||||
dht.netChan.Errors <- err
|
||||
}
|
||||
|
||||
providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
|
||||
if has {
|
||||
providers = append(providers, dht.self)
|
||||
}
|
||||
if providers == nil || len(providers) == 0 {
|
||||
level := 0
|
||||
if len(pmes.GetValue()) > 0 {
|
||||
@ -637,3 +646,18 @@ func (dht *IpfsDHT) peerFromInfo(pbp *PBDHTMessage_PBPeer) (*peer.Peer, error) {
|
||||
|
||||
return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) loadProvidableKeys() error {
|
||||
kl := dht.datastore.KeyList()
|
||||
for _, k := range kl {
|
||||
dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Builds up list of peers by requesting random peer IDs
|
||||
func (dht *IpfsDHT) Bootstrap() {
|
||||
id := make([]byte, 16)
|
||||
rand.Read(id)
|
||||
dht.FindPeer(peer.ID(id), time.Second*10)
|
||||
}
|
||||
|
||||
@ -9,9 +9,13 @@ import (
|
||||
|
||||
type ProviderManager struct {
|
||||
providers map[u.Key][]*providerInfo
|
||||
local map[u.Key]struct{}
|
||||
lpeer peer.ID
|
||||
getlocal chan chan []u.Key
|
||||
newprovs chan *addProv
|
||||
getprovs chan *getProv
|
||||
halt chan struct{}
|
||||
period time.Duration
|
||||
}
|
||||
|
||||
type addProv struct {
|
||||
@ -24,11 +28,13 @@ type getProv struct {
|
||||
resp chan []*peer.Peer
|
||||
}
|
||||
|
||||
func NewProviderManager() *ProviderManager {
|
||||
func NewProviderManager(local peer.ID) *ProviderManager {
|
||||
pm := new(ProviderManager)
|
||||
pm.getprovs = make(chan *getProv)
|
||||
pm.newprovs = make(chan *addProv)
|
||||
pm.providers = make(map[u.Key][]*providerInfo)
|
||||
pm.getlocal = make(chan chan []u.Key)
|
||||
pm.local = make(map[u.Key]struct{})
|
||||
pm.halt = make(chan struct{})
|
||||
go pm.run()
|
||||
return pm
|
||||
@ -39,6 +45,9 @@ func (pm *ProviderManager) run() {
|
||||
for {
|
||||
select {
|
||||
case np := <-pm.newprovs:
|
||||
if np.val.ID.Equal(pm.lpeer) {
|
||||
pm.local[np.k] = struct{}{}
|
||||
}
|
||||
pi := new(providerInfo)
|
||||
pi.Creation = time.Now()
|
||||
pi.Value = np.val
|
||||
@ -51,6 +60,12 @@ func (pm *ProviderManager) run() {
|
||||
parr = append(parr, p.Value)
|
||||
}
|
||||
gp.resp <- parr
|
||||
case lc := <-pm.getlocal:
|
||||
var keys []u.Key
|
||||
for k, _ := range pm.local {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
lc <- keys
|
||||
case <-tick.C:
|
||||
for k, provs := range pm.providers {
|
||||
var filtered []*providerInfo
|
||||
@ -82,6 +97,12 @@ func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer {
|
||||
return <-gp.resp
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) GetLocal() []u.Key {
|
||||
resp := make(chan []u.Key)
|
||||
pm.getlocal <- resp
|
||||
return <-resp
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) Halt() {
|
||||
pm.halt <- struct{}{}
|
||||
}
|
||||
|
||||
20
routing/dht/providers_test.go
Normal file
20
routing/dht/providers_test.go
Normal file
@ -0,0 +1,20 @@
|
||||
package dht
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
|
||||
func TestProviderManager(t *testing.T) {
|
||||
mid := peer.ID("testing")
|
||||
p := NewProviderManager(mid)
|
||||
a := u.Key("test")
|
||||
p.AddProvider(a, &peer.Peer{})
|
||||
resp := p.GetProviders(a)
|
||||
if len(resp) != 1 {
|
||||
t.Fatal("Could not retrieve provider.")
|
||||
}
|
||||
p.Halt()
|
||||
}
|
||||
@ -166,6 +166,7 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
|
||||
|
||||
// Provide makes this node announce that it can provide a value for the given key
|
||||
func (dht *IpfsDHT) Provide(key u.Key) error {
|
||||
dht.providers.AddProvider(key, dht.self)
|
||||
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
|
||||
if len(peers) == 0 {
|
||||
return kb.ErrLookupFailure
|
||||
|
||||
Loading…
Reference in New Issue
Block a user