bitswap: add a deadline to sendmsg calls

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
This commit is contained in:
Jeromy 2016-11-29 15:22:05 -08:00
parent f81cccc3fc
commit 02975bde9d
4 changed files with 24 additions and 8 deletions

View File

@ -38,7 +38,7 @@ type BitSwapNetwork interface {
}
type MessageSender interface {
SendMsg(bsmsg.BitSwapMessage) error
SendMsg(context.Context, bsmsg.BitSwapMessage) error
Close() error
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
@ -20,6 +21,8 @@ import (
var log = logging.Logger("bitswap_network")
var sendMessageTimeout = time.Minute * 10
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
bitswapNetwork := impl{
@ -53,11 +56,20 @@ func (s *streamMessageSender) Close() error {
return s.s.Close()
}
func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error {
return msgToStream(s.s, msg)
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return msgToStream(ctx, s.s, msg)
}
func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error {
func msgToStream(ctx context.Context, s inet.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warningf("error setting deadline: %s", err)
}
switch s.Protocol() {
case ProtocolBitswap:
if err := msg.ToNetV1(s); err != nil {
@ -72,6 +84,10 @@ func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error {
default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}
if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warningf("error resetting deadline: %s", err)
}
return nil
}
@ -107,7 +123,7 @@ func (bsnet *impl) SendMessage(
}
defer s.Close()
return msgToStream(s, outgoing)
return msgToStream(ctx, s, outgoing)
}
func (bsnet *impl) SetDelegate(r Receiver) {

View File

@ -119,8 +119,8 @@ type messagePasser struct {
ctx context.Context
}
func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m)
func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(ctx, mp.local, mp.target, m)
}
func (mp *messagePasser) Close() error {

View File

@ -196,7 +196,7 @@ func (mq *msgQueue) doWork(ctx context.Context) {
// send wantlist updates
for { // try to send this message until we fail.
err := mq.sender.SendMsg(wlm)
err := mq.sender.SendMsg(ctx, wlm)
if err == nil {
return
}