implement a worker to consolidate HasBlock provide calls into one to alieviate memory pressure

This commit is contained in:
Jeromy 2015-03-05 15:18:57 -08:00
parent 4b7493e9fd
commit 8937f5fbda
2 changed files with 57 additions and 4 deletions

View File

@ -89,6 +89,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key),
}
network.SetDelegate(bs)
@ -124,6 +125,8 @@ type Bitswap struct {
process process.Process
newBlocks chan *blocks.Block
provideKeys chan u.Key
}
type blockRequest struct {

View File

@ -6,6 +6,7 @@ import (
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/chuckpreslar/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
u "github.com/jbenet/go-ipfs/util"
)
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
@ -24,6 +25,10 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
bs.rebroadcastWorker(ctx)
})
px.Go(func(px process.Process) {
bs.provideCollector(ctx)
})
// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
@ -56,6 +61,40 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
}
func (bs *Bitswap) provideWorker(ctx context.Context) {
for {
select {
case k, ok := <-bs.provideKeys:
if !ok {
log.Debug("provideKeys channel closed")
return
}
ctx, _ := context.WithTimeout(ctx, provideTimeout)
err := bs.network.Provide(ctx, k)
if err != nil {
log.Error(err)
}
case <-ctx.Done():
return
}
}
}
func (bs *Bitswap) provideCollector(ctx context.Context) {
defer close(bs.provideKeys)
var toprovide []u.Key
var nextKey u.Key
select {
case blk, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}
nextKey = blk.Key()
case <-ctx.Done():
return
}
for {
select {
case blk, ok := <-bs.newBlocks:
@ -63,10 +102,21 @@ func (bs *Bitswap) provideWorker(ctx context.Context) {
log.Debug("newBlocks channel closed")
return
}
ctx, _ := context.WithTimeout(ctx, provideTimeout)
err := bs.network.Provide(ctx, blk.Key())
if err != nil {
log.Error(err)
toprovide = append(toprovide, blk.Key())
case bs.provideKeys <- nextKey:
if len(toprovide) > 0 {
nextKey = toprovide[0]
toprovide = toprovide[1:]
} else {
select {
case blk, ok := <-bs.newBlocks:
if !ok {
return
}
nextKey = blk.Key()
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return