implement timeouts on listeners for the dht and add diagnostic stuff

This commit is contained in:
Jeromy 2014-08-07 18:06:50 -07:00
parent 01ca93b4f5
commit 24bfbfe372
4 changed files with 130 additions and 25 deletions

View File

@ -23,6 +23,8 @@ import (
// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
type IpfsDHT struct {
// Array of routing tables for differently distanced nodes
// NOTE: (currently, only a single table is used)
routes []*RoutingTable
network *swarm.Swarm
@ -55,6 +57,7 @@ type IpfsDHT struct {
// listenInfo tracks one registration made via ListenFor: where to deliver
// matching response messages, how many are still expected, and when the
// registration expires.
type listenInfo struct {
	// resp receives messages whose ID matches the registration.
	resp chan *swarm.Message

	// count is the number of responses still expected; the registration is
	// removed once it reaches zero.
	count int

	// eol ("end of life") is the deadline after which the registration is
	// considered expired and may be garbage collected.
	eol time.Time
}
// Create a new DHT object with the given peer as the 'local' host
@ -161,14 +164,19 @@ func (dht *IpfsDHT) handleMessages() {
if pmes.GetResponse() {
dht.listenLock.RLock()
list, ok := dht.listeners[pmes.GetId()]
dht.listenLock.RUnlock()
if time.Now().After(list.eol) {
dht.Unlisten(pmes.GetId())
ok = false
}
if list.count > 1 {
list.count--
} else if list.count == 1 {
delete(dht.listeners, pmes.GetId())
}
dht.listenLock.RUnlock()
if ok {
list.resp <- mes
if list.count == 1 {
dht.Unlisten(pmes.GetId())
}
} else {
// this is expected behaviour during a timeout
u.DOut("Received response with nobody listening...")
@ -217,10 +225,35 @@ func (dht *IpfsDHT) handleMessages() {
dht.providers[k] = cleaned
}
dht.providerLock.Unlock()
dht.listenLock.Lock()
var remove []uint64
now := time.Now()
for k,v := range dht.listeners {
if now.After(v.eol) {
remove = append(remove, k)
}
}
for _,k := range remove {
delete(dht.listeners, k)
}
dht.listenLock.Unlock()
}
}
}
// putValueToPeer sends a PUT_VALUE message carrying key/value to the given
// peer. The send is fire-and-forget: the message is queued on the outgoing
// channel, no response is awaited, and the returned error is always nil.
func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error {
	req := pDHTMessage{
		Type:  DHTMessage_PUT_VALUE,
		Id:    GenerateMessageID(),
		Key:   key,
		Value: value,
	}
	dht.network.Chan.Outgoing <- swarm.NewMessage(p, req.ToProtobuf())
	return nil
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
dskey := ds.NewKey(pmes.GetKey())
var resp *pDHTMessage
@ -351,10 +384,10 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {
// Register a handler for a specific message ID, used for getting replies
// to certain messages (i.e. response to a GET_VALUE message)
func (dht *IpfsDHT) ListenFor(mesid uint64, count int) <-chan *swarm.Message {
func (dht *IpfsDHT) ListenFor(mesid uint64, count int, timeout time.Duration) <-chan *swarm.Message {
lchan := make(chan *swarm.Message)
dht.listenLock.Lock()
dht.listeners[mesid] = &listenInfo{lchan, count}
dht.listeners[mesid] = &listenInfo{lchan, count, time.Now().Add(timeout)}
dht.listenLock.Unlock()
return lchan
}
@ -372,8 +405,14 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) {
// IsListening reports whether a listener is still registered for mesid.
// An expired registration is lazily deleted here and reported as absent.
func (dht *IpfsDHT) IsListening(mesid uint64) bool {
	dht.listenLock.RLock()
	li, ok := dht.listeners[mesid]
	dht.listenLock.RUnlock()

	// BUG FIX: check ok before touching li. When the lookup misses, li is a
	// nil *listenInfo and dereferencing li.eol would panic.
	if !ok {
		return false
	}
	if time.Now().After(li.eol) {
		dht.listenLock.Lock()
		delete(dht.listeners, mesid)
		dht.listenLock.Unlock()
		return false
	}
	return true
}
@ -401,7 +440,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
dht.diaglock.Unlock()
seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
listen_chan := dht.ListenFor(pmes.GetId(), len(seq))
listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second * 30)
for _,ps := range seq {
mes := swarm.NewMessage(ps, pmes)
@ -411,6 +450,9 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
buf := new(bytes.Buffer)
di := dht.getDiagInfo()
buf.Write(di.Marshal())
// NOTE: this shouldn't be a hard-coded value
after := time.After(time.Second * 20)
count := len(seq)
@ -420,14 +462,18 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
//Timeout, return what we have
goto out
case req_resp := <-listen_chan:
pmes_out := new(DHTMessage)
err := proto.Unmarshal(req_resp.Data, pmes_out)
if err != nil {
// It broke? eh, whatever, keep going
continue
}
buf.Write(req_resp.Data)
count--
}
}
out:
di := dht.getDiagInfo()
buf.Write(di.Marshal())
resp := pDHTMessage{
Type: DHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),

44
routing/dht/diag.go Normal file
View File

@ -0,0 +1,44 @@
package dht
import (
"encoding/json"
"time"
peer "github.com/jbenet/go-ipfs/peer"
)
// connDiagInfo describes a single connection in a diagnostic report.
type connDiagInfo struct {
	// Latency of the connection to the peer.
	// NOTE(review): getDiagInfo fills this from p.GetDistance() — confirm
	// that distance is actually a latency value here.
	Latency time.Duration
	Id      peer.ID
}
// diagInfo is the per-node payload returned in response to a DIAGNOSTIC
// message; it is serialized with Marshal.
type diagInfo struct {
	Id          peer.ID        // identity of the reporting node
	Connections []connDiagInfo // connections taken from the primary routing table
	Keys        []string       // locally held keys (currently always nil)
	LifeSpan    time.Duration  // time since this DHT instance was created
	CodeVersion string         // identifier of the code this node runs
}
// Marshal encodes the diagnostic info as JSON. A marshalling failure is
// treated as a programmer error and panics.
func (di *diagInfo) Marshal() []byte {
	data, err := json.Marshal(di)
	if err != nil {
		panic(err)
	}
	//TODO: also consider compressing this. There will be a lot of these
	return data
}
// getDiagInfo snapshots this node's diagnostic state: identity, uptime,
// code version, and one entry per peer in the primary routing table.
func (dht *IpfsDHT) getDiagInfo() *diagInfo {
	di := new(diagInfo)
	di.CodeVersion = "github.com/jbenet/go-ipfs"
	di.Id = dht.self.ID
	di.LifeSpan = time.Since(dht.birth)
	di.Keys = nil // Currently no way to query datastore

	// NOTE(review): p.GetDistance() is stored in the Latency field — the
	// names suggest different units; confirm this is intentional.
	for _, p := range dht.routes[0].listpeers() {
		di.Connections = append(di.Connections, connDiagInfo{p.GetDistance(), p.ID})
	}
	return di
}

View File

@ -29,6 +29,7 @@ func GenerateMessageID() uint64 {
// Basic Put/Get
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
var p *peer.Peer
p = s.routes[0].NearestPeer(convertKey(key))
@ -36,16 +37,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
panic("Table returned nil peer!")
}
pmes := pDHTMessage{
Type: DHTMessage_PUT_VALUE,
Key: string(key),
Value: value,
Id: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
s.network.Chan.Outgoing <- mes
return nil
return s.putValueToPeer(p, string(key), value)
}
// GetValue searches for the value corresponding to given Key.
@ -63,7 +55,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
Key: string(key),
Id: GenerateMessageID(),
}
response_chan := s.ListenFor(pmes.Id, 1)
response_chan := s.ListenFor(pmes.Id, 1, time.Minute)
mes := swarm.NewMessage(p, pmes.ToProtobuf())
s.network.Chan.Outgoing <- mes
@ -74,7 +66,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
case <-timeup:
s.Unlisten(pmes.Id)
return nil, u.ErrTimeout
case resp := <-response_chan:
case resp, ok := <-response_chan:
if !ok {
panic("Channel was closed...")
}
if resp == nil {
panic("Why the hell is this response nil?")
}
pmes_out := new(DHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
if err != nil {
@ -123,7 +121,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1)
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
u.DOut("Find providers for: '%s'", key)
s.network.Chan.Outgoing <-mes
after := time.After(timeout)
@ -181,7 +179,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listen_chan := s.ListenFor(pmes.Id, 1)
listen_chan := s.ListenFor(pmes.Id, 1, time.Minute)
s.network.Chan.Outgoing <-mes
after := time.After(timeout)
select {
@ -224,7 +222,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
response_chan := dht.ListenFor(pmes.Id, 1)
response_chan := dht.ListenFor(pmes.Id, 1, time.Minute)
dht.network.Chan.Outgoing <- mes
tout := time.After(timeout)
@ -253,7 +251,7 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
Id: GenerateMessageID(),
}
listen_chan := dht.ListenFor(pmes.Id, len(targets))
listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute * 2)
pbmes := pmes.ToProtobuf()
for _,p := range targets {

View File

@ -107,3 +107,20 @@ func TestTableFind(t *testing.T) {
t.Fatalf("Failed to lookup known node...")
}
}
// TestTableFindMultiple verifies that NearestPeers returns exactly the
// requested number of peers when the table holds more than that many.
func TestTableFindMultiple(t *testing.T) {
	local := _randPeer()
	rt := NewRoutingTable(20, convertPeerID(local.ID))

	// Fill the table with 18 random peers. The slice was previously
	// allocated with length 100 but only 18 entries were ever populated;
	// size it to what is actually used.
	peers := make([]*peer.Peer, 18)
	for i := range peers {
		peers[i] = _randPeer()
		rt.Update(peers[i])
	}

	t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
	found := rt.NearestPeers(convertPeerID(peers[2].ID), 15)
	if len(found) != 15 {
		t.Fatalf("Got back different number of peers than we expected.")
	}
}