mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-28 05:47:51 +08:00
feat(exch:bitswap) simply get method
This commit is contained in:
parent
74e81e06fa
commit
98a6e9fac2
@ -65,63 +65,38 @@ type bitswap struct {
|
||||
// deadline enforced by the context
|
||||
//
|
||||
// TODO ensure only one active request per key
|
||||
func (bs *bitswap) Block(ctx context.Context, k u.Key) (*blocks.Block, error) {
|
||||
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
|
||||
|
||||
const maxProviders = 20
|
||||
provs_ch := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
|
||||
ctx, cancelFunc := context.WithCancel(parent)
|
||||
promise := bs.notifications.Subscribe(ctx, k)
|
||||
|
||||
blockChannel := make(chan blocks.Block)
|
||||
|
||||
// TODO: when the data is received, shut down this for loop ASAP
|
||||
go func() {
|
||||
for p := range provs_ch {
|
||||
go func(pr *peer.Peer) {
|
||||
blk, err := bs.getBlock(ctx, k, pr)
|
||||
const maxProviders = 20
|
||||
peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders)
|
||||
message := bsmsg.New()
|
||||
message.AppendWanted(k)
|
||||
for i := range peersToQuery {
|
||||
go func(p *peer.Peer) {
|
||||
response, err := bs.sender.SendRequest(ctx, p, message)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case blockChannel <- *blk:
|
||||
default:
|
||||
}
|
||||
}(p)
|
||||
// FIXME ensure accounting is handled correctly when
|
||||
// communication fails. May require slightly different API to
|
||||
// get better guarantees. May need shared sequence numbers.
|
||||
bs.strategy.MessageSent(p, message)
|
||||
|
||||
bs.ReceiveMessage(ctx, p, response)
|
||||
}(i)
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case block := <-blockChannel:
|
||||
close(blockChannel)
|
||||
case block := <-promise:
|
||||
cancelFunc()
|
||||
return &block, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bitswap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) (*blocks.Block, error) {
|
||||
|
||||
blockChannel := bs.notifications.Subscribe(ctx, k)
|
||||
|
||||
message := bsmsg.New()
|
||||
message.AppendWanted(k)
|
||||
|
||||
bs.send(ctx, p, message)
|
||||
|
||||
block, ok := <-blockChannel
|
||||
if !ok {
|
||||
return nil, u.ErrTimeout
|
||||
}
|
||||
return &block, nil
|
||||
}
|
||||
|
||||
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
|
||||
for _, p := range bs.strategy.Peers() {
|
||||
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
|
||||
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(block)
|
||||
go bs.send(ctx, p, message)
|
||||
}
|
||||
}
|
||||
case <-parent.Done():
|
||||
return nil, parent.Err()
|
||||
}
|
||||
}
|
||||
|
||||
@ -173,3 +148,15 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag
|
||||
func numBytes(b blocks.Block) int {
|
||||
return len(b.Data)
|
||||
}
|
||||
|
||||
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
|
||||
for _, p := range bs.strategy.Peers() {
|
||||
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
|
||||
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(block)
|
||||
go bs.send(ctx, p, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user