From fb5779661b40d2d2ac66e65feb36c7f606dce899 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Sun, 23 Nov 2014 22:46:11 -0800 Subject: [PATCH] fix(bs/notifications) use SubOnceEach to provide uniqueness guarantee License: MIT Signed-off-by: Brian Tiger Chow vendor forked pubsub to get SubOnceEach License: MIT Signed-off-by: Brian Tiger Chow --- Godeps/Godeps.json | 8 ++-- .../maybebtc/pubsub/Godeps/Godeps.json | 13 ++++++ .../github.com/maybebtc/pubsub/Godeps/Readme | 5 +++ .../src/github.com/maybebtc/pubsub/Makefile | 2 + .../pubsub/README.md | 2 +- .../pubsub/pubsub.go | 45 +++++++++++++++---- .../pubsub/pubsub_test.go | 19 +++++++- .../bitswap/notifications/notifications.go | 28 +++--------- 8 files changed, 85 insertions(+), 37 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Godeps.json create mode 100644 Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Readme create mode 100644 Godeps/_workspace/src/github.com/maybebtc/pubsub/Makefile rename Godeps/_workspace/src/github.com/{tuxychandru => maybebtc}/pubsub/README.md (97%) rename Godeps/_workspace/src/github.com/{tuxychandru => maybebtc}/pubsub/pubsub.go (81%) rename Godeps/_workspace/src/github.com/{tuxychandru => maybebtc}/pubsub/pubsub_test.go (92%) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index b37ae027b..f487713e7 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -136,6 +136,10 @@ "Comment": "v0.6.0-5-gf92b795", "Rev": "f92b7950b372b1db80bd3527e4d40e42555fe6c2" }, + { + "ImportPath": "github.com/maybebtc/pubsub", + "Rev": "39ce5f556423a4c7223b370fa17a3bbd75b2d197" + }, { "ImportPath": "github.com/mitchellh/go-homedir", "Rev": "7d2d8c8a4e078ce3c58736ab521a40b37a504c52" @@ -144,10 +148,6 @@ "ImportPath": "github.com/syndtr/goleveldb/leveldb", "Rev": "99056d50e56252fbe0021d5c893defca5a76baf8" }, - { - "ImportPath": "github.com/tuxychandru/pubsub", - "Rev": "02de8aa2db3d570c5ab1be5ba67b456fd0fb7c4e" - }, { "ImportPath": "gopkg.in/natefinch/lumberjack.v2", "Comment": "v1.0-12-gd28785c", diff --git a/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Godeps.json new file mode 100644 index 000000000..3a0925c00 --- /dev/null +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Godeps.json @@ -0,0 +1,13 @@ +{ + "ImportPath": "github.com/maybebtc/pubsub", + "GoVersion": "go1.3.3", + "Packages": [ + "./..." + ], + "Deps": [ + { + "ImportPath": "gopkg.in/check.v1", + "Rev": "64131543e7896d5bcc6bd5a76287eb75ea96c673" + } + ] +} diff --git a/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Readme b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Readme new file mode 100644 index 000000000..4cdaa53d5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. diff --git a/Godeps/_workspace/src/github.com/maybebtc/pubsub/Makefile b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Makefile new file mode 100644 index 000000000..ed3992384 --- /dev/null +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/Makefile @@ -0,0 +1,2 @@ +vendor: + godep save -r ./... diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md b/Godeps/_workspace/src/github.com/maybebtc/pubsub/README.md similarity index 97% rename from Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md rename to Godeps/_workspace/src/github.com/maybebtc/pubsub/README.md index c1aab80b5..e176dab68 100644 --- a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/README.md +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/README.md @@ -6,7 +6,7 @@ View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub). ## License -Copyright (c) 2013, Chandra Sekar S +Copyright (c) 2013, Chandra Sekar S All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub.go similarity index 81% rename from Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go rename to Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub.go index 9cbf9cffa..f42587d07 100644 --- a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub.go +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub.go @@ -15,6 +15,7 @@ type operation int const ( sub operation = iota subOnce + subOnceEach pub unsub unsubAll @@ -55,6 +56,12 @@ func (ps *PubSub) SubOnce(topics ...string) chan interface{} { return ps.sub(subOnce, topics...) } +// SubOnceEach returns a channel on which callers receive, at most, one message +// for each topic. +func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} { + return ps.sub(subOnceEach, topics...) +} + func (ps *PubSub) sub(op operation, topics ...string) chan interface{} { ch := make(chan interface{}, ps.capacity) ps.cmdChan <- cmd{op: op, topics: topics, ch: ch} @@ -66,6 +73,12 @@ func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) { ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch} } +// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach +// behavior. +func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) { + ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch} +} + // Pub publishes the given message to all subscribers of // the specified topics. func (ps *PubSub) Pub(msg interface{}, topics ...string) { @@ -98,7 +111,7 @@ func (ps *PubSub) Shutdown() { func (ps *PubSub) start() { reg := registry{ - topics: make(map[string]map[chan interface{}]bool), + topics: make(map[string]map[chan interface{}]subtype), revTopics: make(map[chan interface{}]map[string]bool), } @@ -119,10 +132,13 @@ loop: for _, topic := range cmd.topics { switch cmd.op { case sub: - reg.add(topic, cmd.ch, false) + reg.add(topic, cmd.ch, stNorm) case subOnce: - reg.add(topic, cmd.ch, true) + reg.add(topic, cmd.ch, stOnceAny) + + case subOnceEach: + reg.add(topic, cmd.ch, stOnceEach) case pub: reg.send(topic, cmd.msg) @@ -146,15 +162,23 @@ loop: // registry maintains the current subscription state. It's not // safe to access a registry from multiple goroutines simultaneously. type registry struct { - topics map[string]map[chan interface{}]bool + topics map[string]map[chan interface{}]subtype revTopics map[chan interface{}]map[string]bool } -func (reg *registry) add(topic string, ch chan interface{}, once bool) { +type subtype int + +const ( + stOnceAny = iota + stOnceEach + stNorm +) + +func (reg *registry) add(topic string, ch chan interface{}, st subtype) { if reg.topics[topic] == nil { - reg.topics[topic] = make(map[chan interface{}]bool) + reg.topics[topic] = make(map[chan interface{}]subtype) } - reg.topics[topic][ch] = once + reg.topics[topic][ch] = st if reg.revTopics[ch] == nil { reg.revTopics[ch] = make(map[string]bool) @@ -163,12 +187,15 @@ func (reg *registry) add(topic string, ch chan interface{}, once bool) { } func (reg *registry) send(topic string, msg interface{}) { - for ch, once := range reg.topics[topic] { + for ch, st := range reg.topics[topic] { ch <- msg - if once { + switch st { + case stOnceAny: for topic := range reg.revTopics[ch] { reg.remove(topic, ch) } + case stOnceEach: + reg.remove(topic, ch) } } } diff --git a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub_test.go similarity index 92% rename from Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go rename to Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub_test.go index 16392d33b..f4a80ab62 100644 --- a/Godeps/_workspace/src/github.com/tuxychandru/pubsub/pubsub_test.go +++ b/Godeps/_workspace/src/github.com/maybebtc/pubsub/pubsub_test.go @@ -5,7 +5,7 @@ package pubsub import ( - check "launchpad.net/gocheck" + check "gopkg.in/check.v1" "runtime" "testing" "time" @@ -181,6 +181,23 @@ func (s *Suite) TestMultiSubOnce(c *check.C) { ps.Shutdown() } +func (s *Suite) TestMultiSubOnceEach(c *check.C) { + ps := New(1) + ch := ps.SubOnceEach("t1", "t2") + + ps.Pub("hi", "t1") + c.Check(<-ch, check.Equals, "hi") + + ps.Pub("hi!", "t1") // ignored + + ps.Pub("hello", "t2") + c.Check(<-ch, check.Equals, "hello") + + _, ok := <-ch + c.Check(ok, check.Equals, false) + ps.Shutdown() +} + func (s *Suite) TestMultiPub(c *check.C) { ps := New(1) ch1 := ps.Sub("t1") diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index 20a0f623d..e9aac629c 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -2,7 +2,7 @@ package notifications import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub" + pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/maybebtc/pubsub" blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" @@ -39,20 +39,16 @@ func (ps *impl) Shutdown() { func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { blocksCh := make(chan *blocks.Block, len(keys)) - valuesCh := make(chan interface{}, len(keys)) - ps.wrapped.AddSub(valuesCh, toStrings(keys)...) - + if len(keys) == 0 { + close(blocksCh) + return blocksCh + } + valuesCh := ps.wrapped.SubOnceEach(toStrings(keys)...) go func() { defer func() { - ps.wrapped.Unsub(valuesCh, toStrings(keys)...) close(blocksCh) }() - seen := make(map[u.Key]struct{}) - i := 0 // req'd because it only counts unique block sends for { - if i >= len(keys) { - return - } select { case <-ctx.Done(): return @@ -64,22 +60,10 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Blo if !ok { return } - if _, ok := seen[block.Key()]; ok { - continue - } select { case <-ctx.Done(): return case blocksCh <- block: // continue - // Unsub alone is insufficient for keeping out duplicates. - // It's a race to unsubscribe before pubsub handles the - // next Publish call. Therefore, must also check for - // duplicates manually. Unsub is a performance - // consideration to avoid lots of unnecessary channel - // chatter. - ps.wrapped.Unsub(valuesCh, string(block.Key())) - i++ - seen[block.Key()] = struct{}{} } } }