diff --git a/peer/queue/sync.go b/peer/queue/sync.go index 886efba2d..f369493a7 100644 --- a/peer/queue/sync.go +++ b/peer/queue/sync.go @@ -9,50 +9,66 @@ import ( // ChanQueue makes any PeerQueue synchronizable through channels. type ChanQueue struct { Queue PeerQueue - EnqChan chan *peer.Peer - DeqChan chan *peer.Peer + EnqChan chan<- *peer.Peer + DeqChan <-chan *peer.Peer } // NewChanQueue creates a ChanQueue by wrapping pq. func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue { - cq := &ChanQueue{ - Queue: pq, - EnqChan: make(chan *peer.Peer, 10), - DeqChan: make(chan *peer.Peer, 10), - } - go cq.process(ctx) + cq := &ChanQueue{Queue: pq} + cq.process(ctx) return cq } func (cq *ChanQueue) process(ctx context.Context) { - var next *peer.Peer - for { + // construct the channels here to be able to use them bidirectionally + enqChan := make(chan *peer.Peer, 10) + deqChan := make(chan *peer.Peer, 10) - if cq.Queue.Len() == 0 { - select { - case next = <-cq.EnqChan: - case <-ctx.Done(): - close(cq.DeqChan) - return + cq.EnqChan = enqChan + cq.DeqChan = deqChan + + go func() { + defer close(deqChan) + + var next *peer.Peer + var item *peer.Peer + var more bool + + for { + if cq.Queue.Len() == 0 { + select { + case next, more = <-enqChan: + if !more { + return + } + + case <-ctx.Done(): + return + } + + } else { + next = cq.Queue.Dequeue() } - } else { - next = cq.Queue.Dequeue() + select { + case item, more = <-enqChan: + if !more { + return + } + + cq.Queue.Enqueue(item) + cq.Queue.Enqueue(next) + next = nil + + case deqChan <- next: + next = nil + + case <-ctx.Done(): + return + } } - select { - case item := <-cq.EnqChan: - cq.Queue.Enqueue(item) - cq.Queue.Enqueue(next) - next = nil - - case cq.DeqChan <- next: - next = nil - - case <-ctx.Done(): - close(cq.DeqChan) - return - } - } + }() }