From f81f6ca6a63ff50f23c1c37984d9533f26eb10f0 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 17 Jun 2016 11:01:35 -0700 Subject: [PATCH 1/2] implement some simple dht request read timeouts License: MIT Signed-off-by: Jeromy --- routing/dht/dht.go | 43 +++++++++++++++++++++++++++++++++++++++--- routing/dht/dht_net.go | 9 +++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 3e06e978f..b2164ff1f 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -107,6 +107,16 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) pmes.Record = rec rpmes, err := dht.sendRequest(ctx, p, pmes) + switch err { + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, key) + fallthrough + default: + return err + case nil: + break + } + if err != nil { return err } @@ -164,7 +174,16 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, defer log.EventBegin(ctx, "getValueSingle", p, &key).Done() pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0) - return dht.sendRequest(ctx, p, pmes) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, key) + fallthrough + default: + return nil, err + } } // getLocal attempts to retrieve the value from the datastore @@ -238,14 +257,32 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( defer log.EventBegin(ctx, "findPeerSingle", p, id).Done() pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) - return dht.sendRequest(ctx, p, pmes) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, id) + fallthrough + default: + return nil, err + } } func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) { defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done() pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0) - return dht.sendRequest(ctx, p, pmes) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, key) + fallthrough + default: + return nil, err + } } // nearestPeersToQuery returns the routing tables closest peers. diff --git a/routing/dht/dht_net.go b/routing/dht/dht_net.go index bc5d02d3d..b864d2caa 100644 --- a/routing/dht/dht_net.go +++ b/routing/dht/dht_net.go @@ -1,6 +1,7 @@ package dht import ( + "fmt" "sync" "time" @@ -12,6 +13,9 @@ import ( inet "gx/ipfs/QmdBpVuSYuTGDA8Kn66CbKvEThXqKUh2nTANZEhzSxqrmJ/go-libp2p/p2p/net" ) +var dhtReadMessageTimeout = time.Minute +var ErrReadTimeout = fmt.Errorf("timed out reading response") + // handleNewStream implements the inet.StreamHandler func (dht *IpfsDHT) handleNewStream(s inet.Stream) { go dht.handleNewMessage(s) @@ -232,10 +236,15 @@ func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error errc <- r.ReadMsg(mes) }(ms.r) + t := time.NewTimer(dhtReadMessageTimeout) + defer t.Stop() + select { case err := <-errc: return err case <-ctx.Done(): return ctx.Err() + case <-t.C: + return ErrReadTimeout } } From 62c986d5c28810312a2e5326da415ddcc51a5cc1 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 17 Jun 2016 14:34:48 -0700 Subject: [PATCH 2/2] demote errors to warnings License: MIT Signed-off-by: Jeromy --- routing/dht/dht.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b2164ff1f..f96b67692 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -109,7 +109,7 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rpmes, err := dht.sendRequest(ctx, p, pmes) switch err { case ErrReadTimeout: - log.Errorf("read timeout: %s %s", p, key) + log.Warningf("read timeout: %s %s", p.Pretty(), key) fallthrough default: return err @@ -179,7 +179,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, case nil: return resp, nil case ErrReadTimeout: - log.Errorf("read timeout: %s %s", p, key) + log.Warningf("read timeout: %s %s", p.Pretty(), key) fallthrough default: return nil, err @@ -262,7 +262,7 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( case nil: return resp, nil case ErrReadTimeout: - log.Errorf("read timeout: %s %s", p, id) + log.Warningf("read timeout: %s %s", p.Pretty(), id) fallthrough default: return nil, err @@ -278,7 +278,7 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key. case nil: return resp, nil case ErrReadTimeout: - log.Errorf("read timeout: %s %s", p, key) + log.Warningf("read timeout: %s %s", p.Pretty(), key) fallthrough default: return nil, err