From c43f97d64e8a7431553009ee588ace009a69ab09 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 20 Jan 2015 05:53:56 -0800 Subject: [PATCH] updated goprocess, for periodic --- Godeps/Godeps.json | 2 +- .../github.com/jbenet/goprocess/.travis.yml | 11 + .../src/github.com/jbenet/goprocess/LICENSE | 21 ++ .../src/github.com/jbenet/goprocess/README.md | 2 + .../github.com/jbenet/goprocess/goprocess.go | 2 +- .../jbenet/goprocess/impl-goroutines.go | 114 -------- .../github.com/jbenet/goprocess/impl-mutex.go | 29 +- .../jbenet/goprocess/periodic/README.md | 4 + .../goprocess/periodic/examples_test.go | 85 ++++++ .../jbenet/goprocess/periodic/periodic.go | 232 ++++++++++++++++ .../goprocess/periodic/periodic_test.go | 260 ++++++++++++++++++ 11 files changed, 638 insertions(+), 124 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE delete mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index f7c7ab7cc..5c85e4117 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -172,7 +172,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8" + "Rev": "c37725a4a97d6ad772818b071ceef82789562142" }, { "ImportPath": "github.com/kr/binarydist", diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml new file mode 100644 index 000000000..7669438ed --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml @@ -0,0 +1,11 @@ +language: go + +go: + - 1.2 + - 1.3 + - 1.4 + - release + - tip + +script: + - go test -v ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE b/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE new file mode 100644 index 000000000..c7386b3c9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Juan Batiz-Benet + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md b/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md index a19ed3197..e2f12e16d 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md @@ -1,5 +1,7 @@ # goprocess - lifecycles in go +[![travisbadge](https://travis-ci.org/jbenet/goprocess.svg)](https://travis-ci.org/jbenet/goprocess) + (Based on https://github.com/jbenet/go-ctxgroup) - Godoc: https://godoc.org/github.com/jbenet/goprocess diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go index afe848c61..762cecb20 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go @@ -18,7 +18,7 @@ import ( // More specifically, it fits this: // // p := WithTeardown(tf) // new process is created, it is now running. -// p.AddChild(q) // can register children **before** Closing. +// p.AddChild(q) // can register children **before** Closed(). // go p.Close() // blocks until done running teardown func. // <-p.Closing() // would now return true. // <-p.childrenDone() // wait on all children to be done diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go deleted file mode 100644 index 831dc938f..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go +++ /dev/null @@ -1,114 +0,0 @@ -// +build ignore - -// WARNING: this implementation is not correct. -// here only for historical purposes. - -package goprocess - -import ( - "sync" -) - -// process implements Process -type process struct { - children sync.WaitGroup // wait group for child goroutines - teardown TeardownFunc // called to run the teardown logic. - closing chan struct{} // closed once close starts. - closed chan struct{} // closed once close is done. - closeOnce sync.Once // ensure close is only called once. - closeErr error // error to return to clients of Close() -} - -// newProcess constructs and returns a Process. -// It will call tf TeardownFunc exactly once: -// **after** all children have fully Closed, -// **after** entering <-Closing(), and -// **before** <-Closed(). -func newProcess(tf TeardownFunc) *process { - if tf == nil { - tf = nilTeardownFunc - } - - return &process{ - teardown: tf, - closed: make(chan struct{}), - closing: make(chan struct{}), - } -} - -func (p *process) WaitFor(q Process) { - p.children.Add(1) // p waits on q to be done - go func(p *process, q Process) { - <-q.Closed() // wait until q is closed - p.children.Done() // p done waiting on q - }(p, q) -} - -func (p *process) AddChildNoWait(child Process) { - go func(p, child Process) { - <-p.Closing() // wait until p is closing - child.Close() // close child - }(p, child) -} - -func (p *process) AddChild(child Process) { - select { - case <-p.Closing(): - panic("attempt to add child to closing or closed process") - default: - } - - p.children.Add(1) // p waits on child to be done - go func(p *process, child Process) { - <-p.Closing() // wait until p is closing - child.Close() // close child and wait - p.children.Done() // p done waiting on child - }(p, child) -} - -func (p *process) Go(f ProcessFunc) Process { - select { - case <-p.Closing(): - panic("attempt to add child to closing or closed process") - default: - } - - // this is very similar to AddChild, but also runs the func - // in the child. we replicate it here to save one goroutine. - child := newProcessGoroutines(nil) - child.children.Add(1) // child waits on func to be done - p.AddChild(child) - go func() { - f(child) - child.children.Done() // wait on child's children to be done. - child.Close() // close to tear down. - }() - return child -} - -// Close is the external close function. -// it's a wrapper around internalClose that waits on Closed() -func (p *process) Close() error { - p.closeOnce.Do(p.doClose) - <-p.Closed() // sync.Once should block, but this checks chan is closed too - return p.closeErr -} - -func (p *process) Closing() <-chan struct{} { - return p.closing -} - -func (p *process) Closed() <-chan struct{} { - return p.closed -} - -// the _actual_ close process. -func (p *process) doClose() { - // this function should only be called once (hence the sync.Once). - // and it will panic (on closing channels) otherwise. - - close(p.closing) // signal that we're shutting down (Closing) - p.children.Wait() // wait till all children are done (before teardown) - p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) - close(p.closed) // signal that we're shut down (Closed) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index ed68b9a03..633d5b056 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -92,16 +92,18 @@ func (p *process) Go(f ProcessFunc) Process { // it's a wrapper around internalClose that waits on Closed() func (p *process) Close() error { p.Lock() - defer p.Unlock() - // if already closed, get out. + // if already closing, or closed, get out. (but wait!) select { - case <-p.Closed(): + case <-p.Closing(): + p.Unlock() + <-p.Closed() return p.closeErr default: } p.doClose() + p.Unlock() return p.closeErr } @@ -120,12 +122,23 @@ func (p *process) doClose() { close(p.closing) // signal that we're shutting down (Closing) - for _, c := range p.children { - go c.Close() // force all children to shut down - } + for len(p.children) > 0 || len(p.waitfors) > 0 { + for _, c := range p.children { + go c.Close() // force all children to shut down + } + p.children = nil // clear them - for _, w := range p.waitfors { - <-w.Closed() // wait till all waitfors are fully closed (before teardown) + // we must be careful not to iterate over waitfors directly, as it may + // change under our feet. + wf := p.waitfors + p.waitfors = nil // clear them + for _, w := range wf { + // Here, we wait UNLOCKED, so that waitfors who are in the middle of + // adding a child to us can finish. we will immediately close the child. + p.Unlock() + <-w.Closed() // wait till all waitfors are fully closed (before teardown) + p.Lock() + } } p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md new file mode 100644 index 000000000..7a2c55db1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md @@ -0,0 +1,4 @@ +# goprocess/periodic - periodic process creation + +- goprocess: https://github.com/jbenet/goprocess +- Godoc: https://godoc.org/github.com/jbenet/goprocess/periodic diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go new file mode 100644 index 000000000..782353db1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go @@ -0,0 +1,85 @@ +package periodicproc_test + +import ( + "fmt" + "time" + + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" +) + +func ExampleEvery() { + tock := make(chan struct{}) + + i := 0 + p := periodicproc.Every(time.Second, func(proc goprocess.Process) { + tock <- struct{}{} + fmt.Printf("hello %d\n", i) + i++ + }) + + <-tock + <-tock + <-tock + p.Close() + + // Output: + // hello 0 + // hello 1 + // hello 2 +} + +func ExampleTick() { + p := periodicproc.Tick(time.Second, func(proc goprocess.Process) { + fmt.Println("tick") + }) + + <-time.After(3*time.Second + 500*time.Millisecond) + p.Close() + + // Output: + // tick + // tick + // tick +} + +func ExampleTickGo() { + + // with TickGo, execution is not rate limited, + // there can be many in-flight simultaneously + + wait := make(chan struct{}) + p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) { + fmt.Println("tick") + <-wait + }) + + <-time.After(3*time.Second + 500*time.Millisecond) + + wait <- struct{}{} + wait <- struct{}{} + wait <- struct{}{} + p.Close() // blocks us until all children are closed. + + // Output: + // tick + // tick + // tick +} + +func ExampleOnSignal() { + sig := make(chan struct{}) + p := periodicproc.OnSignal(sig, func(proc goprocess.Process) { + fmt.Println("fire!") + }) + + sig <- struct{}{} + sig <- struct{}{} + sig <- struct{}{} + p.Close() + + // Output: + // fire! + // fire! + // fire! +} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go new file mode 100644 index 000000000..ce1c4611e --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go @@ -0,0 +1,232 @@ +// Package periodic is part of github.com/jbenet/goprocess. +// It provides a simple periodic processor that calls a function +// periodically based on some options. +// +// For example: +// +// // use a time.Duration +// p := periodicproc.Every(time.Second, func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// <-time.After(5*time.Second) +// p.Close() +// +// // use a time.Time channel (like time.Ticker) +// p := periodicproc.Tick(time.Tick(time.Second), func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// <-time.After(5*time.Second) +// p.Close() +// +// // or arbitrary signals +// signal := make(chan struct{}) +// p := periodicproc.OnSignal(signal, func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// signal<- struct{}{} +// signal<- struct{}{} +// <-time.After(5 * time.Second) +// signal<- struct{}{} +// p.Close() +// +package periodicproc + +import ( + "time" + + gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +// Every calls the given ProcessFunc at periodic intervals. Internally, it uses +// <-time.After(interval), so it will have the behavior of waiting _at least_ +// interval in between calls. If you'd prefer the time.Ticker behavior, use +// periodicproc.Tick instead. +// This is sequentially rate limited, only one call will be in-flight at a time. +func Every(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-time.After(interval): + select { + case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// EveryGo calls the given ProcessFunc at periodic intervals. Internally, it uses +// <-time.After(interval) +// This is not rate limited, multiple calls could be in-flight at the same time. +func EveryGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-time.After(interval): + proc.Go(procfunc) + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// Tick constructs a ticker with interval, and calls the given ProcessFunc every +// time the ticker fires. +// This is sequentially rate limited, only one call will be in-flight at a time. +// +// p := periodicproc.Tick(time.Second, func(proc goprocess.Process) { +// fmt.Println("fire!") +// }) +// +// <-time.After(3 * time.Second) +// p.Close() +// +// // Output: +// // fire! +// // fire! +// // fire! +func Tick(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + ticker := time.NewTicker(interval) + callOnTicker(ticker.C, procfunc)(proc) + ticker.Stop() + }) +} + +// TickGo constructs a ticker with interval, and calls the given ProcessFunc every +// time the ticker fires. +// This is not rate limited, multiple calls could be in-flight at the same time. +// +// p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(10 * time.Second) // will not block sequential execution +// }) +// +// <-time.After(3 * time.Second) +// p.Close() +// +// // Output: +// // fire! +// // fire! +// // fire! +func TickGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + ticker := time.NewTicker(interval) + goCallOnTicker(ticker.C, procfunc)(proc) + ticker.Stop() + }) +} + +// Ticker calls the given ProcessFunc every time the ticker fires. +// This is sequentially rate limited, only one call will be in-flight at a time. +func Ticker(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(callOnTicker(ticker, procfunc)) +} + +// TickerGo calls the given ProcessFunc every time the ticker fires. +// This is not rate limited, multiple calls could be in-flight at the same time. +func TickerGo(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(goCallOnTicker(ticker, procfunc)) +} + +func callOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc { + return func(proc gp.Process) { + for { + select { + case <-ticker: + select { + case <-proc.Go(pf).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + } +} + +func goCallOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc { + return func(proc gp.Process) { + for { + select { + case <-ticker: + proc.Go(pf) + case <-proc.Closing(): // we're told to close + return + } + } + } +} + +// OnSignal calls the given ProcessFunc every time the signal fires, and waits for it to exit. +// This is sequentially rate limited, only one call will be in-flight at a time. +// +// sig := make(chan struct{}) +// p := periodicproc.OnSignal(sig, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(time.Second) // delays sequential execution by 1 second +// }) +// +// sig<- struct{} +// sig<- struct{} +// sig<- struct{} +// +// // Output: +// // fire! +// // fire! +// // fire! +func OnSignal(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-sig: + select { + case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// OnSignalGo calls the given ProcessFunc every time the signal fires. +// This is not rate limited, multiple calls could be in-flight at the same time. +// +// sig := make(chan struct{}) +// p := periodicproc.OnSignalGo(sig, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(time.Second) // wont block execution +// }) +// +// sig<- struct{} +// sig<- struct{} +// sig<- struct{} +// +// // Output: +// // fire! +// // fire! +// // fire! +func OnSignalGo(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-sig: + proc.Go(procfunc) + case <-proc.Closing(): // we're told to close + return + } + } + }) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go new file mode 100644 index 000000000..c79ed50c6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go @@ -0,0 +1,260 @@ +package periodicproc + +import ( + "testing" + "time" + + ci "github.com/jbenet/go-cienv" + gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +var ( + grace = time.Millisecond * 5 + interval = time.Millisecond * 10 + timeout = time.Second * 5 +) + +func init() { + if ci.IsRunning() { + grace = time.Millisecond * 500 + interval = time.Millisecond * 1000 + timeout = time.Second * 15 + } +} + +func between(min, diff, max time.Duration) bool { + return min <= diff && diff <= max +} + +func testBetween(t *testing.T, min, diff, max time.Duration) { + if !between(min, diff, max) { + t.Error("time diff incorrect:", min, diff, max) + } +} + +type intervalFunc func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) + +func testSeq(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + p := toTest(times, nil) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + last = next + } + + go p.Close() + select { + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testSeqWait(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + + <-time.After(interval * 2) // make it wait. + last = time.Now() // make it now (sequential) + wait <- struct{}{} // release it. + } + + go p.Close() + + select { + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testSeqNoWait(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, 0, next.Sub(last), interval+grace) // min of 0 + + <-time.After(interval * 2) // make it wait. + last = time.Now() // make it now (sequential) + wait <- struct{}{} // release it. + } + + go p.Close() + +end: + select { + case wait <- struct{}{}: // drain any extras. + goto end + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testParallel(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + last = next + + <-time.After(interval * 2) // make it wait. + wait <- struct{}{} // release it. + } + + go p.Close() + +end: + select { + case wait <- struct{}{}: // drain any extras. + goto end + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func TestEverySeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Every(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestEverySeqWait(t *testing.T) { + testSeqWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Every(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestEveryGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return EveryGo(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestEveryGoSeqParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return EveryGo(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Tick(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickSeqNoWait(t *testing.T) { + testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Tick(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickGo(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickGoSeqParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickGo(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickerSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Ticker(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickerSeqNoWait(t *testing.T) { + testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Ticker(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickerGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickerGo(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickerGoParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickerGo(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +}