diff --git a/thirdparty/notifier/notifier.go b/thirdparty/notifier/notifier.go new file mode 100644 index 000000000..5549c90ad --- /dev/null +++ b/thirdparty/notifier/notifier.go @@ -0,0 +1,113 @@ +// Package notifier provides a simple notification dispatcher +// meant to be embedded in larger structres who wish to allow +// clients to sign up for event notifications. +package notifier + +import ( + "sync" +) + +// Notifiee is a generic interface. Clients implement +// their own Notifiee interfaces to ensure type-safety +// of notifications: +// +// type RocketNotifiee interface{ +// Countdown(r Rocket, countdown time.Duration) +// LiftedOff(Rocket) +// ReachedOrbit(Rocket) +// Detached(Rocket, Capsule) +// Landed(Rocket) +// } +// +type Notifiee interface{} + +// Notifier is a notification dispatcher. It's meant +// to be composed, and its zero-value is ready to be used. +// +// type Rocket struct { +// notifier notifier.Notifier +// } +// +type Notifier struct { + mu sync.RWMutex // guards notifiees + nots map[Notifiee]struct{} +} + +// Notify signs up Notifiee e for notifications. This function +// is meant to be called behind your own type-safe function(s): +// +// // generic function for pattern-following +// func (r *Rocket) Notify(n Notifiee) { +// r.notifier.Notify(n) +// } +// +// // or as part of other functions +// func (r *Rocket) Onboard(a Astronaut) { +// r.astronauts = append(r.austronauts, a) +// r.notifier.Notify(a) +// } +// +func (n *Notifier) Notify(e Notifiee) { + n.mu.Lock() + if n.nots == nil { // so that zero-value is ready to be used. + n.nots = make(map[Notifiee]struct{}) + } + n.nots[e] = struct{}{} + n.mu.Unlock() +} + +// StopNotifying stops notifying Notifiee e. This function +// is meant to be called behind your own type-safe function(s): +// +// // generic function for pattern-following +// func (r *Rocket) StopNotify(n Notifiee) { +// r.notifier.StopNotify(n) +// } +// +// // or as part of other functions +// func (r *Rocket) Detach(c Capsule) { +// r.notifier.StopNotify(c) +// r.capsule = nil +// } +// +func (n *Notifier) StopNotify(e Notifiee) { + n.mu.Lock() + if n.nots != nil { // so that zero-value is ready to be used. + delete(n.nots, e) + } + n.mu.Unlock() +} + +// NotifyAll messages the notifier's notifiees with a given notification. +// This is done by calling the given function with each notifiee. It is +// meant to be called with your own type-safe notification functions: +// +// func (r *Rocket) Launch() { +// r.notifyAll(func(n Notifiee) { +// n.Launched(r) +// }) +// } +// +// // make it private so only you can use it. This function is necessary +// // to make sure you only up-cast in one place. You control who you added +// // to be a notifiee. If Go adds generics, maybe we can get rid of this +// // method but for now it is like wrapping a type-less container with +// // a type safe interface. +// func (r *Rocket) notifyAll(notify func(Notifiee)) { +// r.notifier.NotifyAll(func(n notifier.Notifiee) { +// notify(n.(Notifiee)) +// }) +// } +// +func (n *Notifier) NotifyAll(notify func(Notifiee)) { + n.mu.Lock() + if n.nots != nil { // so that zero-value is ready to be used. + for notifiee := range n.nots { + // we spin out a goroutine so that whatever the notification does + // it _never_ blocks out the client. This is so that consumers + // _cannot_ add hooks into your object that block you accidentally. + go notify(notifiee) + } + } + n.mu.Unlock() +} diff --git a/thirdparty/notifier/notifier_test.go b/thirdparty/notifier/notifier_test.go new file mode 100644 index 000000000..15741bf38 --- /dev/null +++ b/thirdparty/notifier/notifier_test.go @@ -0,0 +1,207 @@ +package notifier + +import ( + "fmt" + "sync" + "testing" +) + +// test data structures +type Router struct { + queue chan Packet + notifier Notifier +} + +type Packet struct{} + +type RouterNotifiee interface { + Enqueued(*Router, Packet) + Forwarded(*Router, Packet) + Dropped(*Router, Packet) +} + +func (r *Router) Notify(n RouterNotifiee) { + r.notifier.Notify(n) +} + +func (r *Router) StopNotify(n RouterNotifiee) { + r.notifier.StopNotify(n) +} + +func (r *Router) notifyAll(notify func(n RouterNotifiee)) { + r.notifier.NotifyAll(func(n Notifiee) { + notify(n.(RouterNotifiee)) + }) +} + +func (r *Router) Receive(p Packet) { + + select { + case r.queue <- p: // enqueued + r.notifyAll(func(n RouterNotifiee) { + n.Enqueued(r, p) + }) + + default: // drop + r.notifyAll(func(n RouterNotifiee) { + n.Dropped(r, p) + }) + } +} + +func (r *Router) Forward() { + p := <-r.queue + r.notifyAll(func(n RouterNotifiee) { + n.Forwarded(r, p) + }) +} + +type Metrics struct { + enqueued int + forwarded int + dropped int + received chan struct{} + sync.Mutex +} + +func (m *Metrics) Enqueued(*Router, Packet) { + m.Lock() + m.enqueued++ + m.Unlock() + if m.received != nil { + m.received <- struct{}{} + } +} + +func (m *Metrics) Forwarded(*Router, Packet) { + m.Lock() + m.forwarded++ + m.Unlock() + if m.received != nil { + m.received <- struct{}{} + } +} + +func (m *Metrics) Dropped(*Router, Packet) { + m.Lock() + m.dropped++ + m.Unlock() + if m.received != nil { + m.received <- struct{}{} + } +} + +func (m *Metrics) String() string { + m.Lock() + defer m.Unlock() + return fmt.Sprintf("%d enqueued, %d forwarded, %d in queue, %d dropped", + m.enqueued, m.forwarded, m.enqueued-m.forwarded, m.dropped) +} + +func TestNotifies(t *testing.T) { + + m := Metrics{received: make(chan struct{})} + r := Router{queue: make(chan Packet, 10)} + r.Notify(&m) + + for i := 0; i < 10; i++ { + r.Receive(Packet{}) + <-m.received + if m.enqueued != (1 + i) { + t.Error("not notifying correctly", m.enqueued, 1+i) + } + + } + + for i := 0; i < 10; i++ { + r.Receive(Packet{}) + <-m.received + if m.enqueued != 10 { + t.Error("not notifying correctly", m.enqueued, 10) + } + if m.dropped != (1 + i) { + t.Error("not notifying correctly", m.dropped, 1+i) + } + } +} + +func TestStopsNotifying(t *testing.T) { + m := Metrics{received: make(chan struct{})} + r := Router{queue: make(chan Packet, 10)} + r.Notify(&m) + + for i := 0; i < 5; i++ { + r.Receive(Packet{}) + <-m.received + if m.enqueued != (1 + i) { + t.Error("not notifying correctly") + } + } + + r.StopNotify(&m) + + for i := 0; i < 5; i++ { + r.Receive(Packet{}) + select { + case <-m.received: + t.Error("did not stop notifying") + default: + } + if m.enqueued != 5 { + t.Error("did not stop notifying") + } + } +} + +func TestThreadsafe(t *testing.T) { + N := 1000 + r := Router{queue: make(chan Packet, 10)} + m1 := Metrics{received: make(chan struct{})} + m2 := Metrics{received: make(chan struct{})} + m3 := Metrics{received: make(chan struct{})} + r.Notify(&m1) + r.Notify(&m2) + r.Notify(&m3) + + var n int + var wg sync.WaitGroup + for i := 0; i < N; i++ { + n++ + wg.Add(1) + go func() { + defer wg.Done() + r.Receive(Packet{}) + }() + + if i%3 == 0 { + n++ + wg.Add(1) + go func() { + defer wg.Done() + r.Forward() + }() + } + } + + // drain queues + for i := 0; i < (n * 3); i++ { + select { + case <-m1.received: + case <-m2.received: + case <-m3.received: + } + } + + wg.Wait() + + // counts should be correct and all agree. and this should + // run fine under `go test -race -cpu=5` + + t.Log("m1", m1.String()) + t.Log("m2", m2.String()) + t.Log("m3", m3.String()) + + if m1.String() != m2.String() || m2.String() != m3.String() { + t.Error("counts disagree") + } +}