diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index adf44d88d..40f11a455 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -9,10 +9,6 @@ "ImportPath": "bazil.org/fuse", "Rev": "e4fcc9a2c7567d1c42861deebeb483315d222262" }, - { - "ImportPath": "github.com/briantigerchow/pubsub", - "Rev": "39ce5f556423a4c7223b370fa17a3bbd75b2d197" - }, { "ImportPath": "github.com/camlistore/lock", "Rev": "ae27720f340952636b826119b58130b9c1a847a0" diff --git a/Godeps/_workspace/src/github.com/briantigerchow/pubsub/README.md b/Godeps/_workspace/src/github.com/briantigerchow/pubsub/README.md deleted file mode 100644 index c1aab80b5..000000000 --- a/Godeps/_workspace/src/github.com/briantigerchow/pubsub/README.md +++ /dev/null @@ -1,30 +0,0 @@ -Install pubsub with, - - go get github.com/tuxychandru/pubsub - -View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub). - -## License - -Copyright (c) 2013, Chandra Sekar S -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR -ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub.go b/Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub.go deleted file mode 100644 index f42587d07..000000000 --- a/Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub.go +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright 2013, Chandra Sekar S. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the README.md file. - -// Package pubsub implements a simple multi-topic pub-sub -// library. -// -// Topics must be strings and messages of any type can be -// published. A topic can have any number of subcribers and -// all of them receive messages published on the topic. -package pubsub - -type operation int - -const ( - sub operation = iota - subOnce - subOnceEach - pub - unsub - unsubAll - closeTopic - shutdown -) - -// PubSub is a collection of topics. -type PubSub struct { - cmdChan chan cmd - capacity int -} - -type cmd struct { - op operation - topics []string - ch chan interface{} - msg interface{} -} - -// New creates a new PubSub and starts a goroutine for handling operations. -// The capacity of the channels created by Sub and SubOnce will be as specified. -func New(capacity int) *PubSub { - ps := &PubSub{make(chan cmd), capacity} - go ps.start() - return ps -} - -// Sub returns a channel on which messages published on any of -// the specified topics can be received. -func (ps *PubSub) Sub(topics ...string) chan interface{} { - return ps.sub(sub, topics...) -} - -// SubOnce is similar to Sub, but only the first message published, after subscription, -// on any of the specified topics can be received. -func (ps *PubSub) SubOnce(topics ...string) chan interface{} { - return ps.sub(subOnce, topics...) -} - -// SubOnceEach returns a channel on which callers receive, at most, one message -// for each topic. -func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} { - return ps.sub(subOnceEach, topics...) -} - -func (ps *PubSub) sub(op operation, topics ...string) chan interface{} { - ch := make(chan interface{}, ps.capacity) - ps.cmdChan <- cmd{op: op, topics: topics, ch: ch} - return ch -} - -// AddSub adds subscriptions to an existing channel. -func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) { - ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch} -} - -// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach -// behavior. -func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) { - ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch} -} - -// Pub publishes the given message to all subscribers of -// the specified topics. -func (ps *PubSub) Pub(msg interface{}, topics ...string) { - ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg} -} - -// Unsub unsubscribes the given channel from the specified -// topics. If no topic is specified, it is unsubscribed -// from all topics. -func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) { - if len(topics) == 0 { - ps.cmdChan <- cmd{op: unsubAll, ch: ch} - return - } - - ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch} -} - -// Close closes all channels currently subscribed to the specified topics. -// If a channel is subscribed to multiple topics, some of which is -// not specified, it is not closed. -func (ps *PubSub) Close(topics ...string) { - ps.cmdChan <- cmd{op: closeTopic, topics: topics} -} - -// Shutdown closes all subscribed channels and terminates the goroutine. -func (ps *PubSub) Shutdown() { - ps.cmdChan <- cmd{op: shutdown} -} - -func (ps *PubSub) start() { - reg := registry{ - topics: make(map[string]map[chan interface{}]subtype), - revTopics: make(map[chan interface{}]map[string]bool), - } - -loop: - for cmd := range ps.cmdChan { - if cmd.topics == nil { - switch cmd.op { - case unsubAll: - reg.removeChannel(cmd.ch) - - case shutdown: - break loop - } - - continue loop - } - - for _, topic := range cmd.topics { - switch cmd.op { - case sub: - reg.add(topic, cmd.ch, stNorm) - - case subOnce: - reg.add(topic, cmd.ch, stOnceAny) - - case subOnceEach: - reg.add(topic, cmd.ch, stOnceEach) - - case pub: - reg.send(topic, cmd.msg) - - case unsub: - reg.remove(topic, cmd.ch) - - case closeTopic: - reg.removeTopic(topic) - } - } - } - - for topic, chans := range reg.topics { - for ch, _ := range chans { - reg.remove(topic, ch) - } - } -} - -// registry maintains the current subscription state. It's not -// safe to access a registry from multiple goroutines simultaneously. -type registry struct { - topics map[string]map[chan interface{}]subtype - revTopics map[chan interface{}]map[string]bool -} - -type subtype int - -const ( - stOnceAny = iota - stOnceEach - stNorm -) - -func (reg *registry) add(topic string, ch chan interface{}, st subtype) { - if reg.topics[topic] == nil { - reg.topics[topic] = make(map[chan interface{}]subtype) - } - reg.topics[topic][ch] = st - - if reg.revTopics[ch] == nil { - reg.revTopics[ch] = make(map[string]bool) - } - reg.revTopics[ch][topic] = true -} - -func (reg *registry) send(topic string, msg interface{}) { - for ch, st := range reg.topics[topic] { - ch <- msg - switch st { - case stOnceAny: - for topic := range reg.revTopics[ch] { - reg.remove(topic, ch) - } - case stOnceEach: - reg.remove(topic, ch) - } - } -} - -func (reg *registry) removeTopic(topic string) { - for ch := range reg.topics[topic] { - reg.remove(topic, ch) - } -} - -func (reg *registry) removeChannel(ch chan interface{}) { - for topic := range reg.revTopics[ch] { - reg.remove(topic, ch) - } -} - -func (reg *registry) remove(topic string, ch chan interface{}) { - if _, ok := reg.topics[topic]; !ok { - return - } - - if _, ok := reg.topics[topic][ch]; !ok { - return - } - - delete(reg.topics[topic], ch) - delete(reg.revTopics[ch], topic) - - if len(reg.topics[topic]) == 0 { - delete(reg.topics, topic) - } - - if len(reg.revTopics[ch]) == 0 { - close(ch) - delete(reg.revTopics, ch) - } -} diff --git a/Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub_test.go b/Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub_test.go deleted file mode 100644 index f4a80ab62..000000000 --- a/Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub_test.go +++ /dev/null @@ -1,247 +0,0 @@ -// Copyright 2013, Chandra Sekar S. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the README.md file. - -package pubsub - -import ( - check "gopkg.in/check.v1" - "runtime" - "testing" - "time" -) - -var _ = check.Suite(new(Suite)) - -func Test(t *testing.T) { - check.TestingT(t) -} - -type Suite struct{} - -func (s *Suite) TestSub(c *check.C) { - ps := New(1) - ch1 := ps.Sub("t1") - ch2 := ps.Sub("t1") - ch3 := ps.Sub("t2") - - ps.Pub("hi", "t1") - c.Check(<-ch1, check.Equals, "hi") - c.Check(<-ch2, check.Equals, "hi") - - ps.Pub("hello", "t2") - c.Check(<-ch3, check.Equals, "hello") - - ps.Shutdown() - _, ok := <-ch1 - c.Check(ok, check.Equals, false) - _, ok = <-ch2 - c.Check(ok, check.Equals, false) - _, ok = <-ch3 - c.Check(ok, check.Equals, false) -} - -func (s *Suite) TestSubOnce(c *check.C) { - ps := New(1) - ch := ps.SubOnce("t1") - - ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") - - _, ok := <-ch - c.Check(ok, check.Equals, false) - ps.Shutdown() -} - -func (s *Suite) TestAddSub(c *check.C) { - ps := New(1) - ch1 := ps.Sub("t1") - ch2 := ps.Sub("t2") - - ps.Pub("hi1", "t1") - c.Check(<-ch1, check.Equals, "hi1") - - ps.Pub("hi2", "t2") - c.Check(<-ch2, check.Equals, "hi2") - - ps.AddSub(ch1, "t2", "t3") - ps.Pub("hi3", "t2") - c.Check(<-ch1, check.Equals, "hi3") - c.Check(<-ch2, check.Equals, "hi3") - - ps.Pub("hi4", "t3") - c.Check(<-ch1, check.Equals, "hi4") - - ps.Shutdown() -} - -func (s *Suite) TestUnsub(c *check.C) { - ps := New(1) - ch := ps.Sub("t1") - - ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") - - ps.Unsub(ch, "t1") - _, ok := <-ch - c.Check(ok, check.Equals, false) - ps.Shutdown() -} - -func (s *Suite) TestUnsubAll(c *check.C) { - ps := New(1) - ch1 := ps.Sub("t1", "t2", "t3") - ch2 := ps.Sub("t1", "t3") - - ps.Unsub(ch1) - - m, ok := <-ch1 - c.Check(ok, check.Equals, false) - - ps.Pub("hi", "t1") - m, ok = <-ch2 - c.Check(m, check.Equals, "hi") - - ps.Shutdown() -} - -func (s *Suite) TestClose(c *check.C) { - ps := New(1) - ch1 := ps.Sub("t1") - ch2 := ps.Sub("t1") - ch3 := ps.Sub("t2") - ch4 := ps.Sub("t3") - - ps.Pub("hi", "t1") - ps.Pub("hello", "t2") - c.Check(<-ch1, check.Equals, "hi") - c.Check(<-ch2, check.Equals, "hi") - c.Check(<-ch3, check.Equals, "hello") - - ps.Close("t1", "t2") - _, ok := <-ch1 - c.Check(ok, check.Equals, false) - _, ok = <-ch2 - c.Check(ok, check.Equals, false) - _, ok = <-ch3 - c.Check(ok, check.Equals, false) - - ps.Pub("welcome", "t3") - c.Check(<-ch4, check.Equals, "welcome") - - ps.Shutdown() -} - -func (s *Suite) TestUnsubAfterClose(c *check.C) { - ps := New(1) - ch := ps.Sub("t1") - defer func() { - ps.Unsub(ch, "t1") - ps.Shutdown() - }() - - ps.Close("t1") - _, ok := <-ch - c.Check(ok, check.Equals, false) -} - -func (s *Suite) TestShutdown(c *check.C) { - start := runtime.NumGoroutine() - New(10).Shutdown() - time.Sleep(1) - c.Check(runtime.NumGoroutine()-start, check.Equals, 1) -} - -func (s *Suite) TestMultiSub(c *check.C) { - ps := New(1) - ch := ps.Sub("t1", "t2") - - ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") - - ps.Pub("hello", "t2") - c.Check(<-ch, check.Equals, "hello") - - ps.Shutdown() - _, ok := <-ch - c.Check(ok, check.Equals, false) -} - -func (s *Suite) TestMultiSubOnce(c *check.C) { - ps := New(1) - ch := ps.SubOnce("t1", "t2") - - ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") - - ps.Pub("hello", "t2") - - _, ok := <-ch - c.Check(ok, check.Equals, false) - ps.Shutdown() -} - -func (s *Suite) TestMultiSubOnceEach(c *check.C) { - ps := New(1) - ch := ps.SubOnceEach("t1", "t2") - - ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") - - ps.Pub("hi!", "t1") // ignored - - ps.Pub("hello", "t2") - c.Check(<-ch, check.Equals, "hello") - - _, ok := <-ch - c.Check(ok, check.Equals, false) - ps.Shutdown() -} - -func (s *Suite) TestMultiPub(c *check.C) { - ps := New(1) - ch1 := ps.Sub("t1") - ch2 := ps.Sub("t2") - - ps.Pub("hi", "t1", "t2") - c.Check(<-ch1, check.Equals, "hi") - c.Check(<-ch2, check.Equals, "hi") - - ps.Shutdown() -} - -func (s *Suite) TestMultiUnsub(c *check.C) { - ps := New(1) - ch := ps.Sub("t1", "t2", "t3") - - ps.Unsub(ch, "t1") - - ps.Pub("hi", "t1") - - ps.Pub("hello", "t2") - c.Check(<-ch, check.Equals, "hello") - - ps.Unsub(ch, "t2", "t3") - _, ok := <-ch - c.Check(ok, check.Equals, false) - - ps.Shutdown() -} - -func (s *Suite) TestMultiClose(c *check.C) { - ps := New(1) - ch := ps.Sub("t1", "t2") - - ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") - - ps.Close("t1") - ps.Pub("hello", "t2") - c.Check(<-ch, check.Equals, "hello") - - ps.Close("t2") - _, ok := <-ch - c.Check(ok, check.Equals, false) - - ps.Shutdown() -} diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index 9a6f10b52..be0f11c5a 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -4,10 +4,9 @@ import ( "context" "sync" - blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" - - pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" + pubsub "gx/ipfs/QmdbxjQWogRCHRaxhhGnYdT1oQJzL9GdqSKzCdqWr85AP2/pubsub" + blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" ) const bufferSize = 16 diff --git a/package.json b/package.json index e49388793..da6683909 100644 --- a/package.json +++ b/package.json @@ -563,6 +563,12 @@ "hash": "QmTVDM4LCSUMFNQzbDLL9zQwp8usE6QHymFdh3h8vL9v6b", "name": "go-ipfs-blockstore", "version": "0.0.1" + }, + { + "author": "why", + "hash": "QmdbxjQWogRCHRaxhhGnYdT1oQJzL9GdqSKzCdqWr85AP2", + "name": "pubsub", + "version": "1.0.0" } ], "gxVersion": "0.10.0",