mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-02 06:47:51 +08:00
feat(bitswap:network) propagate errors up the stack
Rather than pushing errors back down to lower layers, propagate the
errors upward.
This commit adds a `ReceiveError` method to BitSwap's network receiver.
Still TODO: rm the error return value from:
net.service.handler.HandleMessage
This is inspired by delegation patterns in found in the wild.
This commit is contained in:
parent
bc883bd0cf
commit
0e494690b3
@ -1,8 +1,6 @@
|
||||
package bitswap
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
|
||||
|
||||
@ -120,14 +118,16 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
||||
|
||||
// TODO(brian): handle errors
|
||||
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage, error) {
|
||||
*peer.Peer, bsmsg.BitSwapMessage) {
|
||||
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())
|
||||
|
||||
if p == nil {
|
||||
return nil, nil, errors.New("Received nil Peer")
|
||||
// TODO propagate the error upward
|
||||
return nil, nil
|
||||
}
|
||||
if incoming == nil {
|
||||
return nil, nil, errors.New("Received nil Message")
|
||||
// TODO propagate the error upward
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
bs.strategy.MessageReceived(p, incoming) // FIRST
|
||||
@ -157,7 +157,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
|
||||
}
|
||||
}
|
||||
defer bs.strategy.MessageSent(p, message)
|
||||
return p, message, nil
|
||||
return p, message
|
||||
}
|
||||
|
||||
func (bs *bitswap) ReceiveError(err error) {
|
||||
// TODO log the network error
|
||||
// TODO bubble the network error up to the parent context/error logger
|
||||
}
|
||||
|
||||
// send strives to ensure that accounting is always performed when a message is
|
||||
|
||||
@ -33,7 +33,9 @@ type Adapter interface {
|
||||
type Receiver interface {
|
||||
ReceiveMessage(
|
||||
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
destination *peer.Peer, outgoing bsmsg.BitSwapMessage, err error)
|
||||
destination *peer.Peer, outgoing bsmsg.BitSwapMessage)
|
||||
|
||||
ReceiveError(error)
|
||||
}
|
||||
|
||||
// TODO(brian): move this to go-ipfs/net package
|
||||
|
||||
@ -1,8 +1,6 @@
|
||||
package network
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
@ -34,18 +32,16 @@ func (adapter *impl) HandleMessage(
|
||||
ctx context.Context, incoming netmsg.NetMessage) (netmsg.NetMessage, error) {
|
||||
|
||||
if adapter.receiver == nil {
|
||||
return nil, errors.New("No receiver. NetMessage dropped")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
received, err := bsmsg.FromNet(incoming)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
adapter.receiver.ReceiveError(err)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
p, bsmsg, err := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p, bsmsg := adapter.receiver.ReceiveMessage(ctx, incoming.Peer(), received)
|
||||
|
||||
// TODO(brian): put this in a helper function
|
||||
if bsmsg == nil || p == nil {
|
||||
@ -54,7 +50,8 @@ func (adapter *impl) HandleMessage(
|
||||
|
||||
outgoing, err := bsmsg.ToNet(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
adapter.receiver.ReceiveError(err)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return outgoing, nil
|
||||
|
||||
@ -76,18 +76,7 @@ func (n *network) deliver(
|
||||
return errors.New("Invalid input")
|
||||
}
|
||||
|
||||
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
|
||||
if err != nil {
|
||||
|
||||
// TODO should this error be returned across network boundary?
|
||||
|
||||
// TODO this raises an interesting question about network contract. How
|
||||
// can the network be expected to behave under different failure
|
||||
// conditions? What if peer is unreachable? Will we know if messages
|
||||
// aren't delivered?
|
||||
|
||||
return err
|
||||
}
|
||||
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
|
||||
|
||||
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
|
||||
return errors.New("Malformed client request")
|
||||
@ -119,15 +108,12 @@ func (n *network) SendRequest(
|
||||
if !ok {
|
||||
return nil, errors.New("Cannot locate peer on network")
|
||||
}
|
||||
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// TODO return nil, NoResponse
|
||||
}
|
||||
nextPeer, nextMsg := r.ReceiveMessage(context.TODO(), from, message)
|
||||
|
||||
// TODO dedupe code
|
||||
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
|
||||
return nil, errors.New("Malformed client request")
|
||||
r.ReceiveError(errors.New("Malformed client request"))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// TODO dedupe code
|
||||
@ -144,7 +130,7 @@ func (n *network) SendRequest(
|
||||
}
|
||||
n.deliver(nextReceiver, nextPeer, nextMsg)
|
||||
}()
|
||||
return nil, NoResponse
|
||||
return nil, nil
|
||||
}
|
||||
return nextMsg, nil
|
||||
}
|
||||
|
||||
@ -26,7 +26,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
|
||||
ctx context.Context,
|
||||
from *peer.Peer,
|
||||
incoming bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage, error) {
|
||||
*peer.Peer, bsmsg.BitSwapMessage) {
|
||||
|
||||
t.Log("Recipient received a message from the network")
|
||||
|
||||
@ -35,7 +35,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
|
||||
m := bsmsg.New()
|
||||
m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))
|
||||
|
||||
return from, m, nil
|
||||
return from, m
|
||||
}))
|
||||
|
||||
t.Log("Build a message and send a synchronous request to recipient")
|
||||
@ -74,19 +74,19 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
||||
ctx context.Context,
|
||||
fromWaiter *peer.Peer,
|
||||
msgFromWaiter bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage, error) {
|
||||
*peer.Peer, bsmsg.BitSwapMessage) {
|
||||
|
||||
msgToWaiter := bsmsg.New()
|
||||
msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))
|
||||
|
||||
return fromWaiter, msgToWaiter, nil
|
||||
return fromWaiter, msgToWaiter
|
||||
}))
|
||||
|
||||
waiter.SetDelegate(lambda(func(
|
||||
ctx context.Context,
|
||||
fromResponder *peer.Peer,
|
||||
msgFromResponder bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage, error) {
|
||||
*peer.Peer, bsmsg.BitSwapMessage) {
|
||||
|
||||
// TODO assert that this came from the correct peer and that the message contents are as expected
|
||||
ok := false
|
||||
@ -101,7 +101,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
||||
t.Fatal("Message not received from the responder")
|
||||
|
||||
}
|
||||
return nil, nil, nil
|
||||
return nil, nil
|
||||
}))
|
||||
|
||||
messageSentAsync := bsmsg.New()
|
||||
@ -116,7 +116,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
|
||||
}
|
||||
|
||||
type receiverFunc func(ctx context.Context, p *peer.Peer,
|
||||
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error)
|
||||
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage)
|
||||
|
||||
// lambda returns a Receiver instance given a receiver function
|
||||
func lambda(f receiverFunc) bsnet.Receiver {
|
||||
@ -126,13 +126,16 @@ func lambda(f receiverFunc) bsnet.Receiver {
|
||||
}
|
||||
|
||||
type lambdaImpl struct {
|
||||
f func(ctx context.Context, p *peer.Peer,
|
||||
incoming bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage, error)
|
||||
f func(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage)
|
||||
}
|
||||
|
||||
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
|
||||
p *peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage, error) {
|
||||
*peer.Peer, bsmsg.BitSwapMessage) {
|
||||
return lam.f(ctx, p, incoming)
|
||||
}
|
||||
|
||||
func (lam *lambdaImpl) ReceiveError(err error) {
|
||||
// TODO log error
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user