From acc714823b300cfc3e08fcbcb2e69bbc8fb06714 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Tue, 16 Dec 2014 23:07:58 -0800 Subject: [PATCH] rename to peerRequestQueue this opens up the possibility of having multiple queues. And for all outgoing messages to be managed by the decision engine License: MIT Signed-off-by: Brian Tiger Chow --- exchange/bitswap/decision/engine.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 3b81d2582..b8018eef0 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -22,9 +22,10 @@ type Envelope struct { } type Engine struct { - // FIXME taskqueue isn't threadsafe nor is it protected by a mutex. consider - // a way to avoid sharing the taskqueue between the worker and the receiver - taskqueue *taskQueue + // FIXME peerRequestQueue isn't threadsafe nor is it protected by a mutex. + // consider a way to avoid sharing the peerRequestQueue between the worker + // and the receiver + peerRequestQueue *taskQueue workSignal chan struct{} @@ -39,11 +40,11 @@ type Engine struct { func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { e := &Engine{ - ledgerMap: make(map[u.Key]*ledger), - bs: bs, - taskqueue: newTaskQueue(), - outbox: make(chan Envelope, 4), // TODO extract constant - workSignal: make(chan struct{}), + ledgerMap: make(map[u.Key]*ledger), + bs: bs, + peerRequestQueue: newTaskQueue(), + outbox: make(chan Envelope, 4), // TODO extract constant + workSignal: make(chan struct{}), } go e.taskWorker(ctx) return e @@ -51,7 +52,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { func (e *Engine) taskWorker(ctx context.Context) { for { - nextTask := e.taskqueue.Pop() + nextTask := e.peerRequestQueue.Pop() if nextTask == nil { // No tasks in the list? // Wait until there are! @@ -128,11 +129,11 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { l.CancelWant(entry.Key) - e.taskqueue.Remove(entry.Key, p) + e.peerRequestQueue.Remove(entry.Key, p) } else { l.Wants(entry.Key, entry.Priority) newWorkExists = true - e.taskqueue.Push(entry.Key, entry.Priority, p) + e.peerRequestQueue.Push(entry.Key, entry.Priority, p) } } @@ -142,7 +143,7 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { for _, l := range e.ledgerMap { if l.WantListContains(block.Key()) { newWorkExists = true - e.taskqueue.Push(block.Key(), 1, l.Partner) + e.peerRequestQueue.Push(block.Key(), 1, l.Partner) } } } @@ -163,7 +164,7 @@ func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { for _, block := range m.Blocks() { l.SentBytes(len(block.Data)) l.wantList.Remove(block.Key()) - e.taskqueue.Remove(block.Key(), p) + e.peerRequestQueue.Remove(block.Key(), p) } return nil