From 7a785ded741de648ea50455afe32be0fe7da2470 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Thu, 18 Sep 2014 19:42:53 -0700 Subject: [PATCH] sync counter for processing things --- util/todocounter/counter.go | 118 ++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 util/todocounter/counter.go diff --git a/util/todocounter/counter.go b/util/todocounter/counter.go new file mode 100644 index 000000000..0df638204 --- /dev/null +++ b/util/todocounter/counter.go @@ -0,0 +1,118 @@ +package todocounter + +import ( + "sync" +) + +// Counter records things remaining to process. It is needed for complicated +// cases where multiple goroutines are spawned to process items, and they may +// generate more items to process. For example, say a query over a set of nodes +// may yield either a result value, or more nodes to query. Signaling is subtly +// complicated, because the queue may be empty while items are being processed, +// that will end up adding more items to the queue. +// +// Use Counter like this: +// +// todos := make(chan int, 10) +// ctr := todoctr.NewCounter() +// +// process := func(item int) { +// fmt.Println("processing %d\n...", item) +// +// // this task may randomly generate more tasks +// if rand.Intn(5) == 0 { +// todos<- item + 1 +// ctr.Increment(1) // increment counter for new task. +// } +// +// ctr.Decrement(1) // decrement one to signal the task being done. +// } +// +// // add some tasks. +// todos<- 1 +// todos<- 2 +// todos<- 3 +// todos<- 4 +// ctr.Increment(4) +// +// for { +// select { +// case item := <- todos: +// go process(item) +// case <-ctr.Done(): +// fmt.Println("done processing everything.") +// close(todos) +// } +// } +type Counter interface { + // Incrememnt adds a number of todos to track. + // If the counter is **below** zero, it panics. + Increment(i uint32) + + // Decrement removes a number of todos to track. + // If the count drops to zero, signals done and destroys the counter. + // If the count drops **below** zero, panics. It means you have tried to remove + // more things than you added, i.e. sync issues. + Decrement(i uint32) + + // Done returns a channel to wait upon. Use it in selects: + // + // select { + // case <-ctr.Done(): + // // done processing all items + // } + // + Done() <-chan struct{} +} + +type todoCounter struct { + count int32 + done chan struct{} + sync.RWMutex +} + +// NewSyncCounter constructs a new counter +func NewSyncCounter() Counter { + return &todoCounter{ + done: make(chan struct{}), + } +} + +func (c *todoCounter) Increment(i uint32) { + c.Lock() + defer c.Unlock() + + if c.count < 0 { + panic("counter already signaled done. use a new counter.") + } + + // increment count + c.count += int32(i) +} + +// Decrement removes a number of todos to track. +// If the count drops to zero, signals done and destroys the counter. +// If the count drops **below** zero, panics. It means you have tried to remove +// more things than you added, i.e. sync issues. +func (c *todoCounter) Decrement(i uint32) { + c.Lock() + defer c.Unlock() + + if c.count < 0 { + panic("counter already signaled done. probably have sync issues.") + } + + if int32(i) > c.count { + panic("decrement amount creater than counter. sync issues.") + } + + c.count -= int32(i) + if c.count == 0 { // done! signal it. + c.count-- // set it to -1 to prevent reuse + close(c.done) // a closed channel will always return nil + } +} + +func (c *todoCounter) Done() <-chan struct{} { + return c.done +}