From 235a9ec5fcbb2ba6cc9099e3d77ef386ab62398b Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 21 Sep 2015 09:55:25 -0700 Subject: [PATCH] Fix dht queries Queries previously would sometimes only query three (alpha value) peers before halting the operation. This PR changes the number of peers grabbed from the routing table to start a query to K. Dht nodes would also not respond with enough peers, as per the kademlia paper, this has been changed to from 4 to 'K'. The query mechanism itself also was flawed in that it would pull all the peers it had yet to query out of the queue and 'start' the query for them. The concurrency rate limiting was done inside the 'queryPeer' method after the goroutine was spawned. This did not allow for peers receiver from query replies to be properly queried in order of distance. License: MIT Signed-off-by: Jeromy --- routing/dht/dht.go | 6 +----- routing/dht/handlers.go | 2 +- routing/dht/lookup.go | 2 +- routing/dht/query.go | 33 ++++++++++++++++----------------- routing/dht/routing.go | 8 ++++---- 5 files changed, 23 insertions(+), 28 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 64f28c74e..2f9a6ebe6 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -312,11 +312,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [ continue } - // must all be closer than self - key := key.Key(pmes.GetKey()) - if !kb.Closer(dht.self, clp, key) { - filtered = append(filtered, clp) - } + filtered = append(filtered, clp) } // ok seems like closer nodes diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 1c5d2b1c1..137995bed 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -14,7 +14,7 @@ import ( ) // The number of closer peers to send on requests. -var CloserPeerCount = 4 +var CloserPeerCount = KValue // dhthandler specifies the signature of functions that handle DHT messages. type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error) diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go index 76173a615..dc377e8b7 100644 --- a/routing/dht/lookup.go +++ b/routing/dht/lookup.go @@ -23,7 +23,7 @@ func pointerizePeerInfos(pis []peer.PeerInfo) []*peer.PeerInfo { // to the given key func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key key.Key) (<-chan peer.ID, error) { e := log.EventBegin(ctx, "getClosestPeers", &key) - tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) if len(tablepeers) == 0 { return nil, kb.ErrLookupFailure } diff --git a/routing/dht/query.go b/routing/dht/query.go index ab4f28492..b1bec20bb 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -184,29 +184,28 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) { case <-r.proc.Closing(): return - case p, more := <-r.peersToQuery.DeqChan: - if !more { - return // channel closed. - } + case <-r.rateLimit: + select { + case p, more := <-r.peersToQuery.DeqChan: + if !more { + return // channel closed. + } - // do it as a child func to make sure Run exits - // ONLY AFTER spawn workers has exited. - proc.Go(func(proc process.Process) { - r.queryPeer(proc, p) - }) + // do it as a child func to make sure Run exits + // ONLY AFTER spawn workers has exited. + proc.Go(func(proc process.Process) { + r.queryPeer(proc, p) + }) + case <-r.proc.Closing(): + return + case <-r.peersRemaining.Done(): + return + } } } } func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { - // make sure we rate limit concurrency. - select { - case <-r.rateLimit: - case <-proc.Closing(): - r.peersRemaining.Decrement(1) - return - } - // ok let's do this! // create a context from our proc. diff --git a/routing/dht/routing.go b/routing/dht/routing.go index d5854155f..57341e69c 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -145,7 +145,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]ro } // get closest peers in the routing table - rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) log.Debugf("peers in rt: %s", len(rtp), rtp) if len(rtp) == 0 { log.Warning("No peers from routing table!") @@ -322,7 +322,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key, return &dhtQueryResult{closerPeers: clpeers}, nil }) - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) _, err := query.Run(ctx, peers) if err != nil { log.Debugf("Query error: %s", err) @@ -342,7 +342,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er return pi, nil } - peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue) if len(peers) == 0 { return peer.PeerInfo{}, kb.ErrLookupFailure } @@ -409,7 +409,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< peerchan := make(chan peer.PeerInfo, asyncQueryBuffer) peersSeen := peer.Set{} - peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue) if len(peers) == 0 { return nil, kb.ErrLookupFailure }