From 24bfbfe37252b0f312f90428ecdebd4e6855384a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 7 Aug 2014 18:06:50 -0700 Subject: [PATCH] implement timeouts on listeners for the dht and add diagnostic stuff --- routing/dht/dht.go | 64 +++++++++++++++++++++++++++++++++------ routing/dht/diag.go | 44 +++++++++++++++++++++++++++ routing/dht/routing.go | 30 +++++++++--------- routing/dht/table_test.go | 17 +++++++++++ 4 files changed, 130 insertions(+), 25 deletions(-) create mode 100644 routing/dht/diag.go diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 5791c3fed..63a368b32 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -23,6 +23,8 @@ import ( // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. // It is used to implement the base IpfsRouting module. type IpfsDHT struct { + // Array of routing tables for differently distanced nodes + // NOTE: (currently, only a single table is used) routes []*RoutingTable network *swarm.Swarm @@ -55,6 +57,7 @@ type IpfsDHT struct { type listenInfo struct { resp chan *swarm.Message count int + eol time.Time } // Create a new DHT object with the given peer as the 'local' host @@ -161,14 +164,19 @@ func (dht *IpfsDHT) handleMessages() { if pmes.GetResponse() { dht.listenLock.RLock() list, ok := dht.listeners[pmes.GetId()] + dht.listenLock.RUnlock() + if time.Now().After(list.eol) { + dht.Unlisten(pmes.GetId()) + ok = false + } if list.count > 1 { list.count-- - } else if list.count == 1 { - delete(dht.listeners, pmes.GetId()) } - dht.listenLock.RUnlock() if ok { list.resp <- mes + if list.count == 1 { + dht.Unlisten(pmes.GetId()) + } } else { // this is expected behaviour during a timeout u.DOut("Received response with nobody listening...") @@ -217,10 +225,35 @@ func (dht *IpfsDHT) handleMessages() { dht.providers[k] = cleaned } dht.providerLock.Unlock() + dht.listenLock.Lock() + var remove []uint64 + now := time.Now() + for k,v := range dht.listeners { + if now.After(v.eol) { + remove = append(remove, k) + } + } + for _,k := range remove { + delete(dht.listeners, k) + } + dht.listenLock.Unlock() } } } +func (dht *IpfsDHT) putValueToPeer(p *peer.Peer, key string, value []byte) error { + pmes := pDHTMessage{ + Type: DHTMessage_PUT_VALUE, + Key: key, + Value: value, + Id: GenerateMessageID(), + } + + mes := swarm.NewMessage(p, pmes.ToProtobuf()) + dht.network.Chan.Outgoing <- mes + return nil +} + func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) { dskey := ds.NewKey(pmes.GetKey()) var resp *pDHTMessage @@ -351,10 +384,10 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) { // Register a handler for a specific message ID, used for getting replies // to certain messages (i.e. response to a GET_VALUE message) -func (dht *IpfsDHT) ListenFor(mesid uint64, count int) <-chan *swarm.Message { +func (dht *IpfsDHT) ListenFor(mesid uint64, count int, timeout time.Duration) <-chan *swarm.Message { lchan := make(chan *swarm.Message) dht.listenLock.Lock() - dht.listeners[mesid] = &listenInfo{lchan, count} + dht.listeners[mesid] = &listenInfo{lchan, count, time.Now().Add(timeout)} dht.listenLock.Unlock() return lchan } @@ -372,8 +405,14 @@ func (dht *IpfsDHT) Unlisten(mesid uint64) { func (dht *IpfsDHT) IsListening(mesid uint64) bool { dht.listenLock.RLock() - _,ok := dht.listeners[mesid] + li,ok := dht.listeners[mesid] dht.listenLock.RUnlock() + if time.Now().After(li.eol) { + dht.listenLock.Lock() + delete(dht.listeners, mesid) + dht.listenLock.Unlock() + return false + } return ok } @@ -401,7 +440,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) { dht.diaglock.Unlock() seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10) - listen_chan := dht.ListenFor(pmes.GetId(), len(seq)) + listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second * 30) for _,ps := range seq { mes := swarm.NewMessage(ps, pmes) @@ -411,6 +450,9 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) { buf := new(bytes.Buffer) + di := dht.getDiagInfo() + buf.Write(di.Marshal()) + // NOTE: this shouldnt be a hardcoded value after := time.After(time.Second * 20) count := len(seq) @@ -420,14 +462,18 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) { //Timeout, return what we have goto out case req_resp := <-listen_chan: + pmes_out := new(DHTMessage) + err := proto.Unmarshal(req_resp.Data, pmes_out) + if err != nil { + // It broke? eh, whatever, keep going + continue + } buf.Write(req_resp.Data) count-- } } out: - di := dht.getDiagInfo() - buf.Write(di.Marshal()) resp := pDHTMessage{ Type: DHTMessage_DIAGNOSTIC, Id: pmes.GetId(), diff --git a/routing/dht/diag.go b/routing/dht/diag.go new file mode 100644 index 000000000..b8f211e40 --- /dev/null +++ b/routing/dht/diag.go @@ -0,0 +1,44 @@ +package dht + +import ( + "encoding/json" + "time" + + peer "github.com/jbenet/go-ipfs/peer" +) + +type connDiagInfo struct { + Latency time.Duration + Id peer.ID +} + +type diagInfo struct { + Id peer.ID + Connections []connDiagInfo + Keys []string + LifeSpan time.Duration + CodeVersion string +} + +func (di *diagInfo) Marshal() []byte { + b, err := json.Marshal(di) + if err != nil { + panic(err) + } + //TODO: also consider compressing this. There will be a lot of these + return b +} + + +func (dht *IpfsDHT) getDiagInfo() *diagInfo { + di := new(diagInfo) + di.CodeVersion = "github.com/jbenet/go-ipfs" + di.Id = dht.self.ID + di.LifeSpan = time.Since(dht.birth) + di.Keys = nil // Currently no way to query datastore + + for _,p := range dht.routes[0].listpeers() { + di.Connections = append(di.Connections, connDiagInfo{p.GetDistance(), p.ID}) + } + return di +} diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 8d45a4a43..1a90ce76b 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -29,6 +29,7 @@ func GenerateMessageID() uint64 { // Basic Put/Get // PutValue adds value corresponding to given Key. +// This is the top level "Store" operation of the DHT func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { var p *peer.Peer p = s.routes[0].NearestPeer(convertKey(key)) @@ -36,16 +37,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { panic("Table returned nil peer!") } - pmes := pDHTMessage{ - Type: DHTMessage_PUT_VALUE, - Key: string(key), - Value: value, - Id: GenerateMessageID(), - } - - mes := swarm.NewMessage(p, pmes.ToProtobuf()) - s.network.Chan.Outgoing <- mes - return nil + return s.putValueToPeer(p, string(key), value) } // GetValue searches for the value corresponding to given Key. @@ -63,7 +55,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { Key: string(key), Id: GenerateMessageID(), } - response_chan := s.ListenFor(pmes.Id, 1) + response_chan := s.ListenFor(pmes.Id, 1, time.Minute) mes := swarm.NewMessage(p, pmes.ToProtobuf()) s.network.Chan.Outgoing <- mes @@ -74,7 +66,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { case <-timeup: s.Unlisten(pmes.Id) return nil, u.ErrTimeout - case resp := <-response_chan: + case resp, ok := <-response_chan: + if !ok { + panic("Channel was closed...") + } + if resp == nil { + panic("Why the hell is this response nil?") + } pmes_out := new(DHTMessage) err := proto.Unmarshal(resp.Data, pmes_out) if err != nil { @@ -123,7 +121,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, mes := swarm.NewMessage(p, pmes.ToProtobuf()) - listen_chan := s.ListenFor(pmes.Id, 1) + listen_chan := s.ListenFor(pmes.Id, 1, time.Minute) u.DOut("Find providers for: '%s'", key) s.network.Chan.Outgoing <-mes after := time.After(timeout) @@ -181,7 +179,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error mes := swarm.NewMessage(p, pmes.ToProtobuf()) - listen_chan := s.ListenFor(pmes.Id, 1) + listen_chan := s.ListenFor(pmes.Id, 1, time.Minute) s.network.Chan.Outgoing <-mes after := time.After(timeout) select { @@ -224,7 +222,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { mes := swarm.NewMessage(p, pmes.ToProtobuf()) before := time.Now() - response_chan := dht.ListenFor(pmes.Id, 1) + response_chan := dht.ListenFor(pmes.Id, 1, time.Minute) dht.network.Chan.Outgoing <- mes tout := time.After(timeout) @@ -253,7 +251,7 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { Id: GenerateMessageID(), } - listen_chan := dht.ListenFor(pmes.Id, len(targets)) + listen_chan := dht.ListenFor(pmes.Id, len(targets), time.Minute * 2) pbmes := pmes.ToProtobuf() for _,p := range targets { diff --git a/routing/dht/table_test.go b/routing/dht/table_test.go index debec5e16..393a1c585 100644 --- a/routing/dht/table_test.go +++ b/routing/dht/table_test.go @@ -107,3 +107,20 @@ func TestTableFind(t *testing.T) { t.Fatalf("Failed to lookup known node...") } } + +func TestTableFindMultiple(t *testing.T) { + local := _randPeer() + rt := NewRoutingTable(20, convertPeerID(local.ID)) + + peers := make([]*peer.Peer, 100) + for i := 0; i < 18; i++ { + peers[i] = _randPeer() + rt.Update(peers[i]) + } + + t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) + found := rt.NearestPeers(convertPeerID(peers[2].ID), 15) + if len(found) != 15 { + t.Fatalf("Got back different number of peers than we expected.") + } +}