From b63cbf5331495e9ab9683d98b7efde34bc7d955a Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 24 Jan 2015 05:36:03 -0800 Subject: [PATCH] updated peerstream --- Godeps/Godeps.json | 2 +- .../jbenet/go-peerstream/.gitignore | 1 + .../jbenet/go-peerstream/.travis.yml | 3 +- .../github.com/jbenet/go-peerstream/conn.go | 19 ++++++++- .../github.com/jbenet/go-peerstream/swarm.go | 39 +++++++++++++++++++ .../go-peerstream/transport/test/ttest.go | 6 +++ 6 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 4cb9a36a9..1e8a98ebc 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -165,7 +165,7 @@ }, { "ImportPath": "github.com/jbenet/go-peerstream", - "Rev": "530b09b2300da11cc19f479289be5d014c146581" + "Rev": "bbe2a6461aa80ee25fd87eccf35bd54bac7f788d" }, { "ImportPath": "github.com/jbenet/go-random", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore new file mode 100644 index 000000000..00a069958 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.gitignore @@ -0,0 +1 @@ +example/closer diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml index 66d247e53..3c1c47ab9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml @@ -1,11 +1,10 @@ language: go go: - - 1.2 - 1.3 - 1.4 - release - tip script: - - go test -race -cpu=5 -v ./... + - go test -race -cpu=5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go index 378218e84..c26136e29 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go @@ -122,7 +122,11 @@ func (c *Conn) Close() error { // close underlying connection c.swarm.removeConn(c) - return c.pstConn.Close() + err := c.pstConn.Close() + c.swarm.notifyAll(func(n Notifiee) { + n.Disconnected(c) + }) + return err } // ConnsWithGroup narrows down a set of connections to those in a given group. @@ -198,6 +202,9 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) { s.StreamHandler()(stream) // call our handler }) + s.notifyAll(func(n Notifiee) { + n.Connected(c) + }) return c, nil } @@ -228,6 +235,10 @@ func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream { c.streams[stream] = struct{}{} s.streamLock.Unlock() c.streamLock.Unlock() + + s.notifyAll(func(n Notifiee) { + n.OpenedStream(stream) + }) return stream } @@ -241,7 +252,11 @@ func (s *Swarm) removeStream(stream *Stream) error { s.streamLock.Unlock() stream.conn.streamLock.Unlock() - return stream.pstStream.Close() + err := stream.pstStream.Close() + s.notifyAll(func(n Notifiee) { + n.ClosedStream(stream) + }) + return err } func (s *Swarm) removeConn(conn *Conn) { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go index e25b44d82..923fc4324 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go @@ -39,6 +39,10 @@ type Swarm struct { streamHandler StreamHandler // receives Streams initiated remotely selectConn SelectConn // default SelectConn function + // notification listeners + notifiees map[Notifiee]struct{} + notifieeLock sync.RWMutex + closed chan struct{} } @@ -48,6 +52,7 @@ func NewSwarm(t pst.Transport) *Swarm { streams: make(map[*Stream]struct{}), conns: make(map[*Conn]struct{}), listeners: make(map[*Listener]struct{}), + notifiees: make(map[Notifiee]struct{}), selectConn: SelectRandomConn, streamHandler: NoOpStreamHandler, connHandler: NoOpConnHandler, @@ -361,3 +366,37 @@ func (s *Swarm) connGarbageCollect() { } } } + +// Notify signs up Notifiee to receive signals when events happen +func (s *Swarm) Notify(n Notifiee) { + s.notifieeLock.Lock() + s.notifiees[n] = struct{}{} + s.notifieeLock.Unlock() +} + +// StopNotify unregisters Notifiee fromr receiving signals +func (s *Swarm) StopNotify(n Notifiee) { + s.notifieeLock.Lock() + delete(s.notifiees, n) + s.notifieeLock.Unlock() +} + +// notifyAll runs the notification function on all Notifiees +func (s *Swarm) notifyAll(notification func(n Notifiee)) { + s.notifieeLock.RLock() + for n := range s.notifiees { + // make sure we dont block + // and they dont block each other. + go notification(n) + } + s.notifieeLock.RUnlock() +} + +// Notifiee is an interface for an object wishing to receive +// notifications from a Swarm +type Notifiee interface { + Connected(*Conn) // called when a connection opened + Disconnected(*Conn) // called when a connection closed + OpenedStream(*Stream) // called when a stream opened + ClosedStream(*Stream) // called when a stream closed +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go index d9aaff969..cd0047c83 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go @@ -350,6 +350,8 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, for _, a := range swarms { for _, b := range swarms { wg.Add(1) + a := a // race + b := b // race go rateLimit(func() { defer wg.Done() openConnsAndRW(a, b) @@ -370,6 +372,10 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, t.Error(err) } + for _, s := range swarms { + s.Close() + } + } func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) {