From 63f72a5155de4f9dbb38abd2ea87029035be73b7 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 2 Sep 2015 14:44:04 -0700 Subject: [PATCH] remove context from HasBlock, use bitswap process instead License: MIT Signed-off-by: Jeromy --- blockservice/blockservice.go | 4 +- blockservice/worker/bench/main.go | 91 ----------- blockservice/worker/bench_worker_test.go | 42 ------ blockservice/worker/worker.go | 184 ----------------------- blockservice/worker/worker_test.go | 63 -------- exchange/bitswap/bitswap.go | 10 +- exchange/bitswap/bitswap_test.go | 11 +- exchange/interface.go | 2 +- exchange/offline/offline.go | 2 +- exchange/offline/offline_test.go | 4 +- 10 files changed, 14 insertions(+), 399 deletions(-) delete mode 100644 blockservice/worker/bench/main.go delete mode 100644 blockservice/worker/bench_worker_test.go delete mode 100644 blockservice/worker/worker.go delete mode 100644 blockservice/worker/worker_test.go diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 7cd8f2875..f13c090e4 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -47,7 +47,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (key.Key, error) { if err != nil { return k, err } - if err := s.Exchange.HasBlock(context.TODO(), b); err != nil { + if err := s.Exchange.HasBlock(b); err != nil { return "", errors.New("blockservice is closed") } return k, nil @@ -61,7 +61,7 @@ func (s *BlockService) AddBlocks(bs []*blocks.Block) ([]key.Key, error) { var ks []key.Key for _, b := range bs { - if err := s.Exchange.HasBlock(context.TODO(), b); err != nil { + if err := s.Exchange.HasBlock(b); err != nil { return nil, errors.New("blockservice is closed") } ks = append(ks, b.Key()) diff --git a/blockservice/worker/bench/main.go b/blockservice/worker/bench/main.go deleted file mode 100644 index c23770f78..000000000 --- a/blockservice/worker/bench/main.go +++ /dev/null @@ -1,91 +0,0 @@ -/* -Benchmark github.com/ipfs/go-ipfs/blockservice/worker. - -Loop over a range of workers and buffer sizes and measure the time it -per block-transfer operation for each value. Run with: - - $ go run "${GOPATH}/src/github.com/ipfs/go-ipfs/blockservice/worker/bench/main.go" -*/ - -package main - -import ( - "log" - "math" - "testing" - "time" - - ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - ds_sync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - blocks "github.com/ipfs/go-ipfs/blocks" - blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" - worker "github.com/ipfs/go-ipfs/blockservice/worker" - "github.com/ipfs/go-ipfs/exchange/offline" - "github.com/ipfs/go-ipfs/thirdparty/delay" - "github.com/ipfs/go-ipfs/util/datastore2" -) - -const kEstRoutingDelay = time.Second - -const kBlocksPerOp = 100 - -func main() { - var bestConfig worker.Config - var quickestNsPerOp int64 = math.MaxInt64 - for NumWorkers := 1; NumWorkers < 10; NumWorkers++ { - for ClientBufferSize := 0; ClientBufferSize < 10; ClientBufferSize++ { - for WorkerBufferSize := 0; WorkerBufferSize < 10; WorkerBufferSize++ { - c := worker.Config{ - NumWorkers: NumWorkers, - ClientBufferSize: ClientBufferSize, - WorkerBufferSize: WorkerBufferSize, - } - result := testing.Benchmark(BenchmarkWithConfig(c)) - if result.NsPerOp() < quickestNsPerOp { - bestConfig = c - quickestNsPerOp = result.NsPerOp() - } - log.Printf("benched %+v \t result: %+v", c, result) - } - } - } - log.Println(bestConfig) -} - -func BenchmarkWithConfig(c worker.Config) func(b *testing.B) { - return func(b *testing.B) { - - routingDelay := delay.Fixed(0) // during setup - - dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), routingDelay)) - bstore := blockstore.NewBlockstore(dstore) - var testdata []*blocks.Block - var i int64 - for i = 0; i < kBlocksPerOp; i++ { - testdata = append(testdata, blocks.NewBlock([]byte(string(i)))) - } - b.ResetTimer() - b.SetBytes(kBlocksPerOp) - for i := 0; i < b.N; i++ { - - b.StopTimer() - w := worker.NewWorker(offline.Exchange(bstore), c) - b.StartTimer() - - prev := routingDelay.Set(kEstRoutingDelay) // during measured section - - for _, block := range testdata { - if err := w.HasBlock(block); err != nil { - b.Fatal(err) - } - } - - routingDelay.Set(prev) // to hasten the unmeasured close period - - b.StopTimer() - w.Close() - b.StartTimer() - - } - } -} diff --git a/blockservice/worker/bench_worker_test.go b/blockservice/worker/bench_worker_test.go deleted file mode 100644 index a5e34a107..000000000 --- a/blockservice/worker/bench_worker_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package worker - -import ( - "testing" - - ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - blocks "github.com/ipfs/go-ipfs/blocks" - blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" - "github.com/ipfs/go-ipfs/exchange/offline" -) - -func BenchmarkHandle10KBlocks(b *testing.B) { - bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - var testdata []*blocks.Block - for i := 0; i < 10000; i++ { - testdata = append(testdata, blocks.NewBlock([]byte(string(i)))) - } - b.ResetTimer() - b.SetBytes(10000) - for i := 0; i < b.N; i++ { - - b.StopTimer() - w := NewWorker(offline.Exchange(bstore), Config{ - NumWorkers: 1, - ClientBufferSize: 0, - WorkerBufferSize: 0, - }) - b.StartTimer() - - for _, block := range testdata { - if err := w.HasBlock(block); err != nil { - b.Fatal(err) - } - } - - b.StopTimer() - w.Close() - b.StartTimer() - - } -} diff --git a/blockservice/worker/worker.go b/blockservice/worker/worker.go deleted file mode 100644 index 3b4df2d73..000000000 --- a/blockservice/worker/worker.go +++ /dev/null @@ -1,184 +0,0 @@ -// TODO FIXME name me -package worker - -import ( - "container/list" - "errors" - "time" - - process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" - procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" - ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" - blocks "github.com/ipfs/go-ipfs/blocks" - key "github.com/ipfs/go-ipfs/blocks/key" - exchange "github.com/ipfs/go-ipfs/exchange" - - logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0" -) - -var log = logging.Logger("blockservice") - -var DefaultConfig = Config{ - NumWorkers: 1, - ClientBufferSize: 0, - WorkerBufferSize: 0, -} - -type Config struct { - // NumWorkers sets the number of background workers that provide blocks to - // the exchange. - NumWorkers int - - // ClientBufferSize allows clients of HasBlock to send up to - // |ClientBufferSize| blocks without blocking. - ClientBufferSize int - - // WorkerBufferSize can be used in conjunction with NumWorkers to reduce - // communication-coordination within the worker. - WorkerBufferSize int -} - -// TODO FIXME name me -type Worker struct { - // added accepts blocks from client - added chan *blocks.Block - exchange exchange.Interface - - // workQueue is owned by the client worker - // process manages life-cycle - process process.Process -} - -func NewWorker(e exchange.Interface, c Config) *Worker { - if c.NumWorkers < 1 { - c.NumWorkers = 1 // provide a sane default - } - w := &Worker{ - exchange: e, - added: make(chan *blocks.Block, c.ClientBufferSize), - process: process.WithParent(process.Background()), // internal management - } - w.start(c) - return w -} - -func (w *Worker) HasBlock(b *blocks.Block) error { - select { - case <-w.process.Closed(): - return errors.New("blockservice worker is closed") - case w.added <- b: - return nil - } -} - -func (w *Worker) Close() error { - log.Debug("blockservice provide worker is shutting down...") - return w.process.Close() -} - -func (w *Worker) start(c Config) { - - workerChan := make(chan *blocks.Block, c.WorkerBufferSize) - - // clientWorker handles incoming blocks from |w.added| and sends to - // |workerChan|. This will never block the client. - w.process.Go(func(proc process.Process) { - defer close(workerChan) - - var workQueue BlockList - debugInfo := time.NewTicker(5 * time.Second) - defer debugInfo.Stop() - for { - - // take advantage of the fact that sending on nil channel always - // blocks so that a message is only sent if a block exists - sendToWorker := workerChan - nextBlock := workQueue.Pop() - if nextBlock == nil { - sendToWorker = nil - } - - select { - - // if worker is ready and there's a block to process, send the - // block - case sendToWorker <- nextBlock: - case <-debugInfo.C: - if workQueue.Len() > 0 { - log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len()) - } - case block := <-w.added: - if nextBlock != nil { - workQueue.Push(nextBlock) // missed the chance to send it - } - // if the client sends another block, add it to the queue. - workQueue.Push(block) - case <-proc.Closing(): - return - } - } - }) - - // reads from |workerChan| until w.process closes - limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers) - limiter.Go(func(proc process.Process) { - ctx := procctx.OnClosingContext(proc) // shut down in-progress HasBlock when time to die - for { - select { - case <-proc.Closing(): - return - case block, ok := <-workerChan: - if !ok { - return - } - limiter.LimitedGo(func(proc process.Process) { - if err := w.exchange.HasBlock(ctx, block); err != nil { - log.Infof("blockservice worker error: %s", err) - } - }) - } - } - }) -} - -type BlockList struct { - list list.List - uniques map[key.Key]*list.Element -} - -func (s *BlockList) PushFront(b *blocks.Block) { - if s.uniques == nil { - s.uniques = make(map[key.Key]*list.Element) - } - _, ok := s.uniques[b.Key()] - if !ok { - e := s.list.PushFront(b) - s.uniques[b.Key()] = e - } -} - -func (s *BlockList) Push(b *blocks.Block) { - if s.uniques == nil { - s.uniques = make(map[key.Key]*list.Element) - } - _, ok := s.uniques[b.Key()] - if !ok { - e := s.list.PushBack(b) - s.uniques[b.Key()] = e - } -} - -func (s *BlockList) Pop() *blocks.Block { - if s.list.Len() == 0 { - return nil - } - e := s.list.Front() - s.list.Remove(e) - b := e.Value.(*blocks.Block) - delete(s.uniques, b.Key()) - return b -} - -func (s *BlockList) Len() int { - return s.list.Len() -} diff --git a/blockservice/worker/worker_test.go b/blockservice/worker/worker_test.go deleted file mode 100644 index 2b6a2d16f..000000000 --- a/blockservice/worker/worker_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package worker - -import ( - blocks "github.com/ipfs/go-ipfs/blocks" - "testing" -) - -func TestStartClose(t *testing.T) { - numRuns := 50 - if testing.Short() { - numRuns = 5 - } - for i := 0; i < numRuns; i++ { - w := NewWorker(nil, DefaultConfig) - w.Close() - } -} - -func TestQueueDeduplication(t *testing.T) { - numUniqBlocks := 5 // arbitrary - - var firstBatch []*blocks.Block - for i := 0; i < numUniqBlocks; i++ { - firstBatch = append(firstBatch, blockFromInt(i)) - } - - // to get different pointer values and prevent the implementation from - // cheating. The impl must check equality using Key. - var secondBatch []*blocks.Block - for i := 0; i < numUniqBlocks; i++ { - secondBatch = append(secondBatch, blockFromInt(i)) - } - var workQueue BlockList - - for _, b := range append(firstBatch, secondBatch...) { - workQueue.Push(b) - } - for i := 0; i < numUniqBlocks; i++ { - b := workQueue.Pop() - if b.Key() != firstBatch[i].Key() { - t.Fatal("list is not FIFO") - } - } - if b := workQueue.Pop(); b != nil { - t.Fatal("the workQueue did not de-duplicate the blocks") - } -} - -func TestPushPopPushPop(t *testing.T) { - var workQueue BlockList - orig := blockFromInt(1) - dup := blockFromInt(1) - workQueue.PushFront(orig) - workQueue.Pop() - workQueue.Push(dup) - if workQueue.Len() != 1 { - t.Fatal("the block list's internal state is corrupt") - } -} - -func blockFromInt(i int) *blocks.Block { - return blocks.NewBlock([]byte(string(i))) -} diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 059e23414..2f2e88ea4 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -228,7 +228,7 @@ func (bs *Bitswap) CancelWants(ks []key.Key) { // HasBlock announces the existance of a block to this bitswap service. The // service will potentially notify its peers. -func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { +func (bs *Bitswap) HasBlock(blk *blocks.Block) error { select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -246,8 +246,8 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { select { case bs.newBlocks <- blk: // send block off to be reprovided - case <-ctx.Done(): - return ctx.Err() + case <-bs.process.Closing(): + return bs.process.Close() } return nil } @@ -328,9 +328,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) log.Debugf("got block %s from %s", b, p) - hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) - defer cancel() - if err := bs.HasBlock(hasBlockCtx, b); err != nil { + if err := bs.HasBlock(b); err != nil { log.Warningf("ReceiveMessage HasBlock error: %s", err) } }(block) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 8f4b6f61f..c6de90d78 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -70,7 +70,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { hasBlock := peers[0] defer hasBlock.Exchange.Close() - if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { + if err := hasBlock.Exchange.HasBlock(block); err != nil { t.Fatal(err) } @@ -162,7 +162,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { first := instances[0] for _, b := range blocks { blkeys = append(blkeys, b.Key()) - first.Exchange.HasBlock(ctx, b) + first.Exchange.HasBlock(b) } t.Log("Distribute!") @@ -224,7 +224,6 @@ func TestSendToWantingPeer(t *testing.T) { t.Logf("Session %v\n", peerA.Peer) t.Logf("Session %v\n", peerB.Peer) - timeout := time.Second waitTime := time.Second * 5 alpha := bg.Next() @@ -237,9 +236,7 @@ func TestSendToWantingPeer(t *testing.T) { } // peerB announces to the network that he has block alpha - ctx, cancel = context.WithTimeout(context.Background(), timeout) - defer cancel() - err = peerB.Exchange.HasBlock(ctx, alpha) + err = peerB.Exchange.HasBlock(alpha) if err != nil { t.Fatal(err) } @@ -266,7 +263,7 @@ func TestBasicBitswap(t *testing.T) { instances := sg.Instances(2) blocks := bg.Blocks(1) - err := instances[0].Exchange.HasBlock(context.Background(), blocks[0]) + err := instances[0].Exchange.HasBlock(blocks[0]) if err != nil { t.Fatal(err) } diff --git a/exchange/interface.go b/exchange/interface.go index 81ae3483a..1a149ed9d 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -19,7 +19,7 @@ type Interface interface { // TODO Should callers be concerned with whether the block was made // available on the network? - HasBlock(context.Context, *blocks.Block) error + HasBlock(*blocks.Block) error io.Closer } diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index 9cf125ce0..9a448906e 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -28,7 +28,7 @@ func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (*blocks.Block, } // HasBlock always returns nil. -func (e *offlineExchange) HasBlock(_ context.Context, b *blocks.Block) error { +func (e *offlineExchange) HasBlock(b *blocks.Block) error { return e.bs.Put(b) } diff --git a/exchange/offline/offline_test.go b/exchange/offline/offline_test.go index 41e8bb216..dc0071606 100644 --- a/exchange/offline/offline_test.go +++ b/exchange/offline/offline_test.go @@ -26,7 +26,7 @@ func TestHasBlockReturnsNil(t *testing.T) { ex := Exchange(store) block := blocks.NewBlock([]byte("data")) - err := ex.HasBlock(context.Background(), block) + err := ex.HasBlock(block) if err != nil { t.Fail() } @@ -44,7 +44,7 @@ func TestGetBlocks(t *testing.T) { expected := g.Blocks(2) for _, b := range expected { - if err := ex.HasBlock(context.Background(), b); err != nil { + if err := ex.HasBlock(b); err != nil { t.Fail() } }