diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 21b4d9ead..dfc1b3f02 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -38,7 +38,7 @@ type BitSwapNetwork interface { } type MessageSender interface { - SendMsg(bsmsg.BitSwapMessage) error + SendMsg(context.Context, bsmsg.BitSwapMessage) error Close() error } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 3d992769b..c854f853e 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "time" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" @@ -20,6 +21,8 @@ import ( var log = logging.Logger("bitswap_network") +var sendMessageTimeout = time.Minute * 10 + // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork { bitswapNetwork := impl{ @@ -53,11 +56,20 @@ func (s *streamMessageSender) Close() error { return s.s.Close() } -func (s *streamMessageSender) SendMsg(msg bsmsg.BitSwapMessage) error { - return msgToStream(s.s, msg) +func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error { + return msgToStream(ctx, s.s, msg) } -func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error { +func msgToStream(ctx context.Context, s inet.Stream, msg bsmsg.BitSwapMessage) error { + deadline := time.Now().Add(sendMessageTimeout) + if dl, ok := ctx.Deadline(); ok { + deadline = dl + } + + if err := s.SetWriteDeadline(deadline); err != nil { + log.Warningf("error setting deadline: %s", err) + } + switch s.Protocol() { case ProtocolBitswap: if err := msg.ToNetV1(s); err != nil { @@ -72,6 +84,10 @@ func msgToStream(s inet.Stream, msg bsmsg.BitSwapMessage) error { default: return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol()) } + + if err := s.SetWriteDeadline(time.Time{}); err != nil { + log.Warningf("error resetting deadline: %s", err) + } return nil } @@ -107,7 +123,7 @@ func (bsnet *impl) SendMessage( } defer s.Close() - return msgToStream(s, outgoing) + return msgToStream(ctx, s, outgoing) } func (bsnet *impl) SetDelegate(r Receiver) { diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index ab3535c1f..4d8769e5b 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -119,8 +119,8 @@ type messagePasser struct { ctx context.Context } -func (mp *messagePasser) SendMsg(m bsmsg.BitSwapMessage) error { - return mp.net.SendMessage(mp.ctx, mp.local, mp.target, m) +func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error { + return mp.net.SendMessage(ctx, mp.local, mp.target, m) } func (mp *messagePasser) Close() error { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 388db20b5..f5869d82e 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -196,7 +196,7 @@ func (mq *msgQueue) doWork(ctx context.Context) { // send wantlist updates for { // try to send this message until we fail. - err := mq.sender.SendMsg(wlm) + err := mq.sender.SendMsg(ctx, wlm) if err == nil { return }