mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-24 20:07:45 +08:00
refac(exch:bitswap) always notify strategy when message sent
This commit is contained in:
parent
335b50f4c6
commit
d82a2517d1
@ -79,6 +79,9 @@ type bitswap struct {
|
||||
// GetBlock attempts to retrieve a particular block from peers, within timeout.
|
||||
func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
|
||||
*blocks.Block, error) {
|
||||
ctx, _ := context.WithTimeout(context.Background(), timeout)
|
||||
|
||||
// TODO replace timeout with ctx in routing interface
|
||||
begin := time.Now()
|
||||
tleft := timeout - time.Now().Sub(begin)
|
||||
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
|
||||
@ -90,7 +93,7 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
|
||||
go func() {
|
||||
for p := range provs_ch {
|
||||
go func(pr *peer.Peer) {
|
||||
blk, err := bs.getBlock(k, pr, tleft)
|
||||
blk, err := bs.getBlock(ctx, k, pr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -111,19 +114,14 @@ func (bs *bitswap) Block(k u.Key, timeout time.Duration) (
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
|
||||
func (bs *bitswap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) (*blocks.Block, error) {
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), timeout)
|
||||
blockChannel := bs.notifications.Subscribe(ctx, k)
|
||||
|
||||
message := bsmsg.New()
|
||||
message.AppendWanted(k)
|
||||
|
||||
// FIXME(brian): register the accountant on the service wrapper to ensure
|
||||
// that accounting is _always_ performed when SendMessage and
|
||||
// ReceiveMessage are called
|
||||
bs.sender.SendMessage(ctx, p, message)
|
||||
bs.strategy.MessageSent(p, message)
|
||||
bs.send(ctx, p, message)
|
||||
|
||||
block, ok := <-blockChannel
|
||||
if !ok {
|
||||
@ -132,11 +130,13 @@ func (bs *bitswap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*bloc
|
||||
return &block, nil
|
||||
}
|
||||
|
||||
func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
|
||||
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
|
||||
for _, p := range bs.strategy.Peers() {
|
||||
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
|
||||
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
||||
go bs.send(p, block)
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(block)
|
||||
go bs.send(ctx, p, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -145,16 +145,17 @@ func (bs *bitswap) sendToPeersThatWant(block blocks.Block) {
|
||||
// HasBlock announces the existance of a block to bitswap, potentially sending
|
||||
// it to peers (Partners) whose WantLists include it.
|
||||
func (bs *bitswap) HasBlock(blk blocks.Block) error {
|
||||
go bs.sendToPeersThatWant(blk)
|
||||
ctx := context.TODO()
|
||||
go bs.sendToPeersThatWant(ctx, blk)
|
||||
return bs.routing.Provide(blk.Key())
|
||||
}
|
||||
|
||||
// TODO(brian): handle errors
|
||||
func (bs *bitswap) ReceiveMessage(
|
||||
ctx context.Context, sender *peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
*peer.Peer, bsmsg.BitSwapMessage, error) {
|
||||
|
||||
bs.strategy.MessageReceived(sender, incoming)
|
||||
bs.strategy.MessageReceived(p, incoming)
|
||||
|
||||
if incoming.Blocks() != nil {
|
||||
for _, block := range incoming.Blocks() {
|
||||
@ -165,26 +166,26 @@ func (bs *bitswap) ReceiveMessage(
|
||||
|
||||
if incoming.Wantlist() != nil {
|
||||
for _, key := range incoming.Wantlist() {
|
||||
if bs.strategy.ShouldSendBlockToPeer(key, sender) {
|
||||
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
||||
block, errBlockNotFound := bs.blockstore.Get(key)
|
||||
if errBlockNotFound != nil {
|
||||
// TODO(brian): log/return the error
|
||||
continue
|
||||
}
|
||||
go bs.send(sender, *block)
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(*block)
|
||||
go bs.send(ctx, p, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil, errors.New("TODO implement")
|
||||
}
|
||||
|
||||
// TODO(brian): get a return value
|
||||
func (bs *bitswap) send(p *peer.Peer, b blocks.Block) {
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(b)
|
||||
// FIXME(brian): pass ctx
|
||||
bs.sender.SendMessage(context.Background(), p, message)
|
||||
bs.strategy.MessageSent(p, message)
|
||||
// send strives to ensure that accounting is always performed when a message is
|
||||
// sent
|
||||
func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessage) {
|
||||
bs.sender.SendMessage(context.Background(), p, m)
|
||||
bs.strategy.MessageSent(p, m)
|
||||
}
|
||||
|
||||
func numBytes(b blocks.Block) int {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user