From 98a6e9fac27f22be200583e650c9fc27f0fd720f Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Thu, 18 Sep 2014 20:30:04 -0700 Subject: [PATCH] feat(exch:bitswap) simply get method --- exchange/bitswap/bitswap.go | 79 ++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 46 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 418d5046e..aab1c6f1e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -65,63 +65,38 @@ type bitswap struct { // deadline enforced by the context // // TODO ensure only one active request per key -func (bs *bitswap) Block(ctx context.Context, k u.Key) (*blocks.Block, error) { +func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) { - const maxProviders = 20 - provs_ch := bs.routing.FindProvidersAsync(ctx, k, maxProviders) + ctx, cancelFunc := context.WithCancel(parent) + promise := bs.notifications.Subscribe(ctx, k) - blockChannel := make(chan blocks.Block) - - // TODO: when the data is received, shut down this for loop ASAP go func() { - for p := range provs_ch { - go func(pr *peer.Peer) { - blk, err := bs.getBlock(ctx, k, pr) + const maxProviders = 20 + peersToQuery := bs.routing.FindProvidersAsync(ctx, k, maxProviders) + message := bsmsg.New() + message.AppendWanted(k) + for i := range peersToQuery { + go func(p *peer.Peer) { + response, err := bs.sender.SendRequest(ctx, p, message) if err != nil { return } - select { - case blockChannel <- *blk: - default: - } - }(p) + // FIXME ensure accounting is handled correctly when + // communication fails. May require slightly different API to + // get better guarantees. May need shared sequence numbers. + bs.strategy.MessageSent(p, message) + + bs.ReceiveMessage(ctx, p, response) + }(i) } }() select { - case block := <-blockChannel: - close(blockChannel) + case block := <-promise: + cancelFunc() return &block, nil - case <-ctx.Done(): - return nil, ctx.Err() - } -} - -func (bs *bitswap) getBlock(ctx context.Context, k u.Key, p *peer.Peer) (*blocks.Block, error) { - - blockChannel := bs.notifications.Subscribe(ctx, k) - - message := bsmsg.New() - message.AppendWanted(k) - - bs.send(ctx, p, message) - - block, ok := <-blockChannel - if !ok { - return nil, u.ErrTimeout - } - return &block, nil -} - -func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { - for _, p := range bs.strategy.Peers() { - if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { - if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { - message := bsmsg.New() - message.AppendBlock(block) - go bs.send(ctx, p, message) - } - } + case <-parent.Done(): + return nil, parent.Err() } } @@ -173,3 +148,15 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag func numBytes(b blocks.Block) int { return len(b.Data) } + +func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { + for _, p := range bs.strategy.Peers() { + if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { + if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { + message := bsmsg.New() + message.AppendBlock(block) + go bs.send(ctx, p, message) + } + } + } +}