fix(bs/notifications) use SubOnceEach to provide uniqueness guarantee

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>

vendor forked pubsub to get SubOnceEach

License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
Brian Tiger Chow 2014-11-23 22:46:11 -08:00 committed by Jeromy
parent 19de3041fb
commit fb5779661b
8 changed files with 85 additions and 37 deletions

8
Godeps/Godeps.json generated
View File

@ -136,6 +136,10 @@
"Comment": "v0.6.0-5-gf92b795",
"Rev": "f92b7950b372b1db80bd3527e4d40e42555fe6c2"
},
{
"ImportPath": "github.com/maybebtc/pubsub",
"Rev": "39ce5f556423a4c7223b370fa17a3bbd75b2d197"
},
{
"ImportPath": "github.com/mitchellh/go-homedir",
"Rev": "7d2d8c8a4e078ce3c58736ab521a40b37a504c52"
@ -144,10 +148,6 @@
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "99056d50e56252fbe0021d5c893defca5a76baf8"
},
{
"ImportPath": "github.com/tuxychandru/pubsub",
"Rev": "02de8aa2db3d570c5ab1be5ba67b456fd0fb7c4e"
},
{
"ImportPath": "gopkg.in/natefinch/lumberjack.v2",
"Comment": "v1.0-12-gd28785c",

View File

@ -0,0 +1,13 @@
{
"ImportPath": "github.com/maybebtc/pubsub",
"GoVersion": "go1.3.3",
"Packages": [
"./..."
],
"Deps": [
{
"ImportPath": "gopkg.in/check.v1",
"Rev": "64131543e7896d5bcc6bd5a76287eb75ea96c673"
}
]
}

View File

@ -0,0 +1,5 @@
This directory tree is generated automatically by godep.
Please do not edit.
See https://github.com/tools/godep for more information.

View File

@ -0,0 +1,2 @@
vendor:
godep save -r ./...

View File

@ -6,7 +6,7 @@ View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub).
## License
Copyright (c) 2013, Chandra Sekar S
Copyright (c) 2013, Chandra Sekar S
All rights reserved.
Redistribution and use in source and binary forms, with or without

View File

@ -15,6 +15,7 @@ type operation int
const (
sub operation = iota
subOnce
subOnceEach
pub
unsub
unsubAll
@ -55,6 +56,12 @@ func (ps *PubSub) SubOnce(topics ...string) chan interface{} {
return ps.sub(subOnce, topics...)
}
// SubOnceEach returns a channel on which callers receive, at most, one message
// for each topic.
func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} {
return ps.sub(subOnceEach, topics...)
}
func (ps *PubSub) sub(op operation, topics ...string) chan interface{} {
ch := make(chan interface{}, ps.capacity)
ps.cmdChan <- cmd{op: op, topics: topics, ch: ch}
@ -66,6 +73,12 @@ func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) {
ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch}
}
// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach
// behavior.
func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) {
ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch}
}
// Pub publishes the given message to all subscribers of
// the specified topics.
func (ps *PubSub) Pub(msg interface{}, topics ...string) {
@ -98,7 +111,7 @@ func (ps *PubSub) Shutdown() {
func (ps *PubSub) start() {
reg := registry{
topics: make(map[string]map[chan interface{}]bool),
topics: make(map[string]map[chan interface{}]subtype),
revTopics: make(map[chan interface{}]map[string]bool),
}
@ -119,10 +132,13 @@ loop:
for _, topic := range cmd.topics {
switch cmd.op {
case sub:
reg.add(topic, cmd.ch, false)
reg.add(topic, cmd.ch, stNorm)
case subOnce:
reg.add(topic, cmd.ch, true)
reg.add(topic, cmd.ch, stOnceAny)
case subOnceEach:
reg.add(topic, cmd.ch, stOnceEach)
case pub:
reg.send(topic, cmd.msg)
@ -146,15 +162,23 @@ loop:
// registry maintains the current subscription state. It's not
// safe to access a registry from multiple goroutines simultaneously.
type registry struct {
topics map[string]map[chan interface{}]bool
topics map[string]map[chan interface{}]subtype
revTopics map[chan interface{}]map[string]bool
}
func (reg *registry) add(topic string, ch chan interface{}, once bool) {
type subtype int
const (
stOnceAny = iota
stOnceEach
stNorm
)
func (reg *registry) add(topic string, ch chan interface{}, st subtype) {
if reg.topics[topic] == nil {
reg.topics[topic] = make(map[chan interface{}]bool)
reg.topics[topic] = make(map[chan interface{}]subtype)
}
reg.topics[topic][ch] = once
reg.topics[topic][ch] = st
if reg.revTopics[ch] == nil {
reg.revTopics[ch] = make(map[string]bool)
@ -163,12 +187,15 @@ func (reg *registry) add(topic string, ch chan interface{}, once bool) {
}
func (reg *registry) send(topic string, msg interface{}) {
for ch, once := range reg.topics[topic] {
for ch, st := range reg.topics[topic] {
ch <- msg
if once {
switch st {
case stOnceAny:
for topic := range reg.revTopics[ch] {
reg.remove(topic, ch)
}
case stOnceEach:
reg.remove(topic, ch)
}
}
}

View File

@ -5,7 +5,7 @@
package pubsub
import (
check "launchpad.net/gocheck"
check "gopkg.in/check.v1"
"runtime"
"testing"
"time"
@ -181,6 +181,23 @@ func (s *Suite) TestMultiSubOnce(c *check.C) {
ps.Shutdown()
}
func (s *Suite) TestMultiSubOnceEach(c *check.C) {
ps := New(1)
ch := ps.SubOnceEach("t1", "t2")
ps.Pub("hi", "t1")
c.Check(<-ch, check.Equals, "hi")
ps.Pub("hi!", "t1") // ignored
ps.Pub("hello", "t2")
c.Check(<-ch, check.Equals, "hello")
_, ok := <-ch
c.Check(ok, check.Equals, false)
ps.Shutdown()
}
func (s *Suite) TestMultiPub(c *check.C) {
ps := New(1)
ch1 := ps.Sub("t1")

View File

@ -2,7 +2,7 @@ package notifications
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub"
pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/maybebtc/pubsub"
blocks "github.com/jbenet/go-ipfs/blocks"
u "github.com/jbenet/go-ipfs/util"
@ -39,20 +39,16 @@ func (ps *impl) Shutdown() {
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
blocksCh := make(chan *blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys))
ps.wrapped.AddSub(valuesCh, toStrings(keys)...)
if len(keys) == 0 {
close(blocksCh)
return blocksCh
}
valuesCh := ps.wrapped.SubOnceEach(toStrings(keys)...)
go func() {
defer func() {
ps.wrapped.Unsub(valuesCh, toStrings(keys)...)
close(blocksCh)
}()
seen := make(map[u.Key]struct{})
i := 0 // req'd because it only counts unique block sends
for {
if i >= len(keys) {
return
}
select {
case <-ctx.Done():
return
@ -64,22 +60,10 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Blo
if !ok {
return
}
if _, ok := seen[block.Key()]; ok {
continue
}
select {
case <-ctx.Done():
return
case blocksCh <- block: // continue
// Unsub alone is insufficient for keeping out duplicates.
// It's a race to unsubscribe before pubsub handles the
// next Publish call. Therefore, must also check for
// duplicates manually. Unsub is a performance
// consideration to avoid lots of unnecessary channel
// chatter.
ps.wrapped.Unsub(valuesCh, string(block.Key()))
i++
seen[block.Key()] = struct{}{}
}
}
}