mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 10:27:46 +08:00
Point briantigerchow/pubsub GoDep'ed module to the gx'ed version
This removes briantigerchow/pubsub from Godeps and uses our gx'ed version instead. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
476ad38ecc
commit
6950d0688e
4
Godeps/Godeps.json
generated
4
Godeps/Godeps.json
generated
@ -9,10 +9,6 @@
|
||||
"ImportPath": "bazil.org/fuse",
|
||||
"Rev": "e4fcc9a2c7567d1c42861deebeb483315d222262"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/briantigerchow/pubsub",
|
||||
"Rev": "39ce5f556423a4c7223b370fa17a3bbd75b2d197"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/camlistore/lock",
|
||||
"Rev": "ae27720f340952636b826119b58130b9c1a847a0"
|
||||
|
||||
30
Godeps/_workspace/src/github.com/briantigerchow/pubsub/README.md
generated
vendored
30
Godeps/_workspace/src/github.com/briantigerchow/pubsub/README.md
generated
vendored
@ -1,30 +0,0 @@
|
||||
Install pubsub with,
|
||||
|
||||
go get github.com/tuxychandru/pubsub
|
||||
|
||||
View the [API Documentation](http://godoc.org/github.com/tuxychandru/pubsub).
|
||||
|
||||
## License
|
||||
|
||||
Copyright (c) 2013, Chandra Sekar S
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
1. Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
|
||||
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
235
Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub.go
generated
vendored
235
Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub.go
generated
vendored
@ -1,235 +0,0 @@
|
||||
// Copyright 2013, Chandra Sekar S. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the README.md file.
|
||||
|
||||
// Package pubsub implements a simple multi-topic pub-sub
|
||||
// library.
|
||||
//
|
||||
// Topics must be strings and messages of any type can be
|
||||
// published. A topic can have any number of subcribers and
|
||||
// all of them receive messages published on the topic.
|
||||
package pubsub
|
||||
|
||||
type operation int
|
||||
|
||||
const (
|
||||
sub operation = iota
|
||||
subOnce
|
||||
subOnceEach
|
||||
pub
|
||||
unsub
|
||||
unsubAll
|
||||
closeTopic
|
||||
shutdown
|
||||
)
|
||||
|
||||
// PubSub is a collection of topics.
|
||||
type PubSub struct {
|
||||
cmdChan chan cmd
|
||||
capacity int
|
||||
}
|
||||
|
||||
type cmd struct {
|
||||
op operation
|
||||
topics []string
|
||||
ch chan interface{}
|
||||
msg interface{}
|
||||
}
|
||||
|
||||
// New creates a new PubSub and starts a goroutine for handling operations.
|
||||
// The capacity of the channels created by Sub and SubOnce will be as specified.
|
||||
func New(capacity int) *PubSub {
|
||||
ps := &PubSub{make(chan cmd), capacity}
|
||||
go ps.start()
|
||||
return ps
|
||||
}
|
||||
|
||||
// Sub returns a channel on which messages published on any of
|
||||
// the specified topics can be received.
|
||||
func (ps *PubSub) Sub(topics ...string) chan interface{} {
|
||||
return ps.sub(sub, topics...)
|
||||
}
|
||||
|
||||
// SubOnce is similar to Sub, but only the first message published, after subscription,
|
||||
// on any of the specified topics can be received.
|
||||
func (ps *PubSub) SubOnce(topics ...string) chan interface{} {
|
||||
return ps.sub(subOnce, topics...)
|
||||
}
|
||||
|
||||
// SubOnceEach returns a channel on which callers receive, at most, one message
|
||||
// for each topic.
|
||||
func (ps *PubSub) SubOnceEach(topics ...string) chan interface{} {
|
||||
return ps.sub(subOnceEach, topics...)
|
||||
}
|
||||
|
||||
func (ps *PubSub) sub(op operation, topics ...string) chan interface{} {
|
||||
ch := make(chan interface{}, ps.capacity)
|
||||
ps.cmdChan <- cmd{op: op, topics: topics, ch: ch}
|
||||
return ch
|
||||
}
|
||||
|
||||
// AddSub adds subscriptions to an existing channel.
|
||||
func (ps *PubSub) AddSub(ch chan interface{}, topics ...string) {
|
||||
ps.cmdChan <- cmd{op: sub, topics: topics, ch: ch}
|
||||
}
|
||||
|
||||
// AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach
|
||||
// behavior.
|
||||
func (ps *PubSub) AddSubOnceEach(ch chan interface{}, topics ...string) {
|
||||
ps.cmdChan <- cmd{op: subOnceEach, topics: topics, ch: ch}
|
||||
}
|
||||
|
||||
// Pub publishes the given message to all subscribers of
|
||||
// the specified topics.
|
||||
func (ps *PubSub) Pub(msg interface{}, topics ...string) {
|
||||
ps.cmdChan <- cmd{op: pub, topics: topics, msg: msg}
|
||||
}
|
||||
|
||||
// Unsub unsubscribes the given channel from the specified
|
||||
// topics. If no topic is specified, it is unsubscribed
|
||||
// from all topics.
|
||||
func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) {
|
||||
if len(topics) == 0 {
|
||||
ps.cmdChan <- cmd{op: unsubAll, ch: ch}
|
||||
return
|
||||
}
|
||||
|
||||
ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch}
|
||||
}
|
||||
|
||||
// Close closes all channels currently subscribed to the specified topics.
|
||||
// If a channel is subscribed to multiple topics, some of which is
|
||||
// not specified, it is not closed.
|
||||
func (ps *PubSub) Close(topics ...string) {
|
||||
ps.cmdChan <- cmd{op: closeTopic, topics: topics}
|
||||
}
|
||||
|
||||
// Shutdown closes all subscribed channels and terminates the goroutine.
|
||||
func (ps *PubSub) Shutdown() {
|
||||
ps.cmdChan <- cmd{op: shutdown}
|
||||
}
|
||||
|
||||
func (ps *PubSub) start() {
|
||||
reg := registry{
|
||||
topics: make(map[string]map[chan interface{}]subtype),
|
||||
revTopics: make(map[chan interface{}]map[string]bool),
|
||||
}
|
||||
|
||||
loop:
|
||||
for cmd := range ps.cmdChan {
|
||||
if cmd.topics == nil {
|
||||
switch cmd.op {
|
||||
case unsubAll:
|
||||
reg.removeChannel(cmd.ch)
|
||||
|
||||
case shutdown:
|
||||
break loop
|
||||
}
|
||||
|
||||
continue loop
|
||||
}
|
||||
|
||||
for _, topic := range cmd.topics {
|
||||
switch cmd.op {
|
||||
case sub:
|
||||
reg.add(topic, cmd.ch, stNorm)
|
||||
|
||||
case subOnce:
|
||||
reg.add(topic, cmd.ch, stOnceAny)
|
||||
|
||||
case subOnceEach:
|
||||
reg.add(topic, cmd.ch, stOnceEach)
|
||||
|
||||
case pub:
|
||||
reg.send(topic, cmd.msg)
|
||||
|
||||
case unsub:
|
||||
reg.remove(topic, cmd.ch)
|
||||
|
||||
case closeTopic:
|
||||
reg.removeTopic(topic)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for topic, chans := range reg.topics {
|
||||
for ch, _ := range chans {
|
||||
reg.remove(topic, ch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// registry maintains the current subscription state. It's not
|
||||
// safe to access a registry from multiple goroutines simultaneously.
|
||||
type registry struct {
|
||||
topics map[string]map[chan interface{}]subtype
|
||||
revTopics map[chan interface{}]map[string]bool
|
||||
}
|
||||
|
||||
type subtype int
|
||||
|
||||
const (
|
||||
stOnceAny = iota
|
||||
stOnceEach
|
||||
stNorm
|
||||
)
|
||||
|
||||
func (reg *registry) add(topic string, ch chan interface{}, st subtype) {
|
||||
if reg.topics[topic] == nil {
|
||||
reg.topics[topic] = make(map[chan interface{}]subtype)
|
||||
}
|
||||
reg.topics[topic][ch] = st
|
||||
|
||||
if reg.revTopics[ch] == nil {
|
||||
reg.revTopics[ch] = make(map[string]bool)
|
||||
}
|
||||
reg.revTopics[ch][topic] = true
|
||||
}
|
||||
|
||||
func (reg *registry) send(topic string, msg interface{}) {
|
||||
for ch, st := range reg.topics[topic] {
|
||||
ch <- msg
|
||||
switch st {
|
||||
case stOnceAny:
|
||||
for topic := range reg.revTopics[ch] {
|
||||
reg.remove(topic, ch)
|
||||
}
|
||||
case stOnceEach:
|
||||
reg.remove(topic, ch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (reg *registry) removeTopic(topic string) {
|
||||
for ch := range reg.topics[topic] {
|
||||
reg.remove(topic, ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (reg *registry) removeChannel(ch chan interface{}) {
|
||||
for topic := range reg.revTopics[ch] {
|
||||
reg.remove(topic, ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (reg *registry) remove(topic string, ch chan interface{}) {
|
||||
if _, ok := reg.topics[topic]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := reg.topics[topic][ch]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
delete(reg.topics[topic], ch)
|
||||
delete(reg.revTopics[ch], topic)
|
||||
|
||||
if len(reg.topics[topic]) == 0 {
|
||||
delete(reg.topics, topic)
|
||||
}
|
||||
|
||||
if len(reg.revTopics[ch]) == 0 {
|
||||
close(ch)
|
||||
delete(reg.revTopics, ch)
|
||||
}
|
||||
}
|
||||
247
Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub_test.go
generated
vendored
247
Godeps/_workspace/src/github.com/briantigerchow/pubsub/pubsub_test.go
generated
vendored
@ -1,247 +0,0 @@
|
||||
// Copyright 2013, Chandra Sekar S. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the README.md file.
|
||||
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
check "gopkg.in/check.v1"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var _ = check.Suite(new(Suite))
|
||||
|
||||
func Test(t *testing.T) {
|
||||
check.TestingT(t)
|
||||
}
|
||||
|
||||
type Suite struct{}
|
||||
|
||||
func (s *Suite) TestSub(c *check.C) {
|
||||
ps := New(1)
|
||||
ch1 := ps.Sub("t1")
|
||||
ch2 := ps.Sub("t1")
|
||||
ch3 := ps.Sub("t2")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
c.Check(<-ch1, check.Equals, "hi")
|
||||
c.Check(<-ch2, check.Equals, "hi")
|
||||
|
||||
ps.Pub("hello", "t2")
|
||||
c.Check(<-ch3, check.Equals, "hello")
|
||||
|
||||
ps.Shutdown()
|
||||
_, ok := <-ch1
|
||||
c.Check(ok, check.Equals, false)
|
||||
_, ok = <-ch2
|
||||
c.Check(ok, check.Equals, false)
|
||||
_, ok = <-ch3
|
||||
c.Check(ok, check.Equals, false)
|
||||
}
|
||||
|
||||
func (s *Suite) TestSubOnce(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.SubOnce("t1")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
c.Check(<-ch, check.Equals, "hi")
|
||||
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestAddSub(c *check.C) {
|
||||
ps := New(1)
|
||||
ch1 := ps.Sub("t1")
|
||||
ch2 := ps.Sub("t2")
|
||||
|
||||
ps.Pub("hi1", "t1")
|
||||
c.Check(<-ch1, check.Equals, "hi1")
|
||||
|
||||
ps.Pub("hi2", "t2")
|
||||
c.Check(<-ch2, check.Equals, "hi2")
|
||||
|
||||
ps.AddSub(ch1, "t2", "t3")
|
||||
ps.Pub("hi3", "t2")
|
||||
c.Check(<-ch1, check.Equals, "hi3")
|
||||
c.Check(<-ch2, check.Equals, "hi3")
|
||||
|
||||
ps.Pub("hi4", "t3")
|
||||
c.Check(<-ch1, check.Equals, "hi4")
|
||||
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestUnsub(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.Sub("t1")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
c.Check(<-ch, check.Equals, "hi")
|
||||
|
||||
ps.Unsub(ch, "t1")
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestUnsubAll(c *check.C) {
|
||||
ps := New(1)
|
||||
ch1 := ps.Sub("t1", "t2", "t3")
|
||||
ch2 := ps.Sub("t1", "t3")
|
||||
|
||||
ps.Unsub(ch1)
|
||||
|
||||
m, ok := <-ch1
|
||||
c.Check(ok, check.Equals, false)
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
m, ok = <-ch2
|
||||
c.Check(m, check.Equals, "hi")
|
||||
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestClose(c *check.C) {
|
||||
ps := New(1)
|
||||
ch1 := ps.Sub("t1")
|
||||
ch2 := ps.Sub("t1")
|
||||
ch3 := ps.Sub("t2")
|
||||
ch4 := ps.Sub("t3")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
ps.Pub("hello", "t2")
|
||||
c.Check(<-ch1, check.Equals, "hi")
|
||||
c.Check(<-ch2, check.Equals, "hi")
|
||||
c.Check(<-ch3, check.Equals, "hello")
|
||||
|
||||
ps.Close("t1", "t2")
|
||||
_, ok := <-ch1
|
||||
c.Check(ok, check.Equals, false)
|
||||
_, ok = <-ch2
|
||||
c.Check(ok, check.Equals, false)
|
||||
_, ok = <-ch3
|
||||
c.Check(ok, check.Equals, false)
|
||||
|
||||
ps.Pub("welcome", "t3")
|
||||
c.Check(<-ch4, check.Equals, "welcome")
|
||||
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestUnsubAfterClose(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.Sub("t1")
|
||||
defer func() {
|
||||
ps.Unsub(ch, "t1")
|
||||
ps.Shutdown()
|
||||
}()
|
||||
|
||||
ps.Close("t1")
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
}
|
||||
|
||||
func (s *Suite) TestShutdown(c *check.C) {
|
||||
start := runtime.NumGoroutine()
|
||||
New(10).Shutdown()
|
||||
time.Sleep(1)
|
||||
c.Check(runtime.NumGoroutine()-start, check.Equals, 1)
|
||||
}
|
||||
|
||||
func (s *Suite) TestMultiSub(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.Sub("t1", "t2")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
c.Check(<-ch, check.Equals, "hi")
|
||||
|
||||
ps.Pub("hello", "t2")
|
||||
c.Check(<-ch, check.Equals, "hello")
|
||||
|
||||
ps.Shutdown()
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
}
|
||||
|
||||
func (s *Suite) TestMultiSubOnce(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.SubOnce("t1", "t2")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
c.Check(<-ch, check.Equals, "hi")
|
||||
|
||||
ps.Pub("hello", "t2")
|
||||
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestMultiSubOnceEach(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.SubOnceEach("t1", "t2")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
c.Check(<-ch, check.Equals, "hi")
|
||||
|
||||
ps.Pub("hi!", "t1") // ignored
|
||||
|
||||
ps.Pub("hello", "t2")
|
||||
c.Check(<-ch, check.Equals, "hello")
|
||||
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestMultiPub(c *check.C) {
|
||||
ps := New(1)
|
||||
ch1 := ps.Sub("t1")
|
||||
ch2 := ps.Sub("t2")
|
||||
|
||||
ps.Pub("hi", "t1", "t2")
|
||||
c.Check(<-ch1, check.Equals, "hi")
|
||||
c.Check(<-ch2, check.Equals, "hi")
|
||||
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestMultiUnsub(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.Sub("t1", "t2", "t3")
|
||||
|
||||
ps.Unsub(ch, "t1")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
|
||||
ps.Pub("hello", "t2")
|
||||
c.Check(<-ch, check.Equals, "hello")
|
||||
|
||||
ps.Unsub(ch, "t2", "t3")
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
|
||||
ps.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Suite) TestMultiClose(c *check.C) {
|
||||
ps := New(1)
|
||||
ch := ps.Sub("t1", "t2")
|
||||
|
||||
ps.Pub("hi", "t1")
|
||||
c.Check(<-ch, check.Equals, "hi")
|
||||
|
||||
ps.Close("t1")
|
||||
ps.Pub("hello", "t2")
|
||||
c.Check(<-ch, check.Equals, "hello")
|
||||
|
||||
ps.Close("t2")
|
||||
_, ok := <-ch
|
||||
c.Check(ok, check.Equals, false)
|
||||
|
||||
ps.Shutdown()
|
||||
}
|
||||
@ -4,10 +4,9 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
|
||||
|
||||
pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
|
||||
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
|
||||
pubsub "gx/ipfs/QmdbxjQWogRCHRaxhhGnYdT1oQJzL9GdqSKzCdqWr85AP2/pubsub"
|
||||
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
|
||||
)
|
||||
|
||||
const bufferSize = 16
|
||||
|
||||
@ -563,6 +563,12 @@
|
||||
"hash": "QmTVDM4LCSUMFNQzbDLL9zQwp8usE6QHymFdh3h8vL9v6b",
|
||||
"name": "go-ipfs-blockstore",
|
||||
"version": "0.0.1"
|
||||
},
|
||||
{
|
||||
"author": "why",
|
||||
"hash": "QmdbxjQWogRCHRaxhhGnYdT1oQJzL9GdqSKzCdqWr85AP2",
|
||||
"name": "pubsub",
|
||||
"version": "1.0.0"
|
||||
}
|
||||
],
|
||||
"gxVersion": "0.10.0",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user