From 8937f5fbda7eb780dd41e42938d31ad0729cedde Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 5 Mar 2015 15:18:57 -0800 Subject: [PATCH] implement a worker to consolidate HasBlock provide calls into one to alieviate memory pressure --- exchange/bitswap/bitswap.go | 3 ++ exchange/bitswap/workers.go | 58 ++++++++++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 3a81015be..60672d0c3 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -89,6 +89,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, batchRequests: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), + provideKeys: make(chan u.Key), } network.SetDelegate(bs) @@ -124,6 +125,8 @@ type Bitswap struct { process process.Process newBlocks chan *blocks.Block + + provideKeys chan u.Key } type blockRequest struct { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index da521ef46..a14b30092 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -6,6 +6,7 @@ import ( inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/chuckpreslar/inflect" process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + u "github.com/jbenet/go-ipfs/util" ) func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { @@ -24,6 +25,10 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { bs.rebroadcastWorker(ctx) }) + px.Go(func(px process.Process) { + bs.provideCollector(ctx) + }) + // Spawn up multiple workers to handle incoming blocks // consider increasing number if providing blocks bottlenecks // file transfers @@ -56,6 +61,40 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { } func (bs *Bitswap) provideWorker(ctx context.Context) { + for { + select { + case k, ok := <-bs.provideKeys: + if !ok { + log.Debug("provideKeys channel closed") + return + } + ctx, _ := context.WithTimeout(ctx, provideTimeout) + err := bs.network.Provide(ctx, k) + if err != nil { + log.Error(err) + } + case <-ctx.Done(): + return + } + } +} + +func (bs *Bitswap) provideCollector(ctx context.Context) { + defer close(bs.provideKeys) + var toprovide []u.Key + var nextKey u.Key + + select { + case blk, ok := <-bs.newBlocks: + if !ok { + log.Debug("newBlocks channel closed") + return + } + nextKey = blk.Key() + case <-ctx.Done(): + return + } + for { select { case blk, ok := <-bs.newBlocks: @@ -63,10 +102,21 @@ func (bs *Bitswap) provideWorker(ctx context.Context) { log.Debug("newBlocks channel closed") return } - ctx, _ := context.WithTimeout(ctx, provideTimeout) - err := bs.network.Provide(ctx, blk.Key()) - if err != nil { - log.Error(err) + toprovide = append(toprovide, blk.Key()) + case bs.provideKeys <- nextKey: + if len(toprovide) > 0 { + nextKey = toprovide[0] + toprovide = toprovide[1:] + } else { + select { + case blk, ok := <-bs.newBlocks: + if !ok { + return + } + nextKey = blk.Key() + case <-ctx.Done(): + return + } } case <-ctx.Done(): return