From a9c971fdbb305944756427e01d025c3c983b6103 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 14 Jul 2015 08:59:42 -0700 Subject: [PATCH] update go-peerstream to newest version License: MIT Signed-off-by: Jeromy --- Godeps/Godeps.json | 6 +- .../jbenet/go-peerstream/.travis.yml | 2 +- .../jbenet/go-peerstream/Godeps/Godeps.json | 6 +- .../github.com/jbenet/go-peerstream/conn.go | 38 +- .../example/blockhandler/blockhandler.go | 4 +- .../jbenet/go-peerstream/example/example.go | 4 +- .../jbenet/go-peerstream/muxtest/hack.go | 15 + .../jbenet/go-peerstream/muxtest/mux_test.go | 31 ++ .../test/ttest.go => muxtest/muxt.go} | 73 ++-- .../github.com/jbenet/go-peerstream/stream.go | 20 +- .../github.com/jbenet/go-peerstream/swarm.go | 12 +- .../transport/multiplex/multiplex_test.go | 11 - .../transport/multistream/multistream_test.go | 11 - .../transport/muxado/muxado_test.go | 11 - .../transport/spdystream/spdystream_test.go | 11 - .../transport/yamux/yamux_test.go | 11 - .../jbenet/go-stream-muxer/.travis.yml | 11 + .../jbenet/go-stream-muxer/Godeps/Godeps.json | 29 ++ .../jbenet/go-stream-muxer/Godeps/Readme | 5 + .../github.com/jbenet/go-stream-muxer/LICENSE | 21 + .../jbenet/go-stream-muxer/Makefile | 15 + .../jbenet/go-stream-muxer/README.md | 98 +++++ .../jbenet/go-stream-muxer/img/badge.png | Bin 0 -> 7694 bytes .../multiplex/multiplex.go | 21 +- .../multiplex/multiplex_test.go | 11 + .../multistream/multistream.go | 17 +- .../multistream/multistream_test.go | 11 + .../muxado/muxado.go | 23 +- .../go-stream-muxer/muxado/muxado_test.go | 11 + .../transport.go => go-stream-muxer/muxer.go} | 14 +- .../spdystream/spdystream.go | 18 +- .../spdystream/spdystream_test.go | 11 + .../jbenet/go-stream-muxer/test/ttest.go | 378 +++++++++++++++++ .../yamux/yamux.go | 22 +- .../go-stream-muxer/yamux/yamux_test.go | 11 + .../whyrusleeping/go-multiplex/.gitignore | 1 + .../whyrusleeping/go-multiplex/.gxlastpubver | 1 + .../whyrusleeping/go-multiplex/README.md | 18 + .../whyrusleeping/go-multiplex/multiplex.go | 393 ++++++++++++++++++ .../go-multiplex/multiplex_test.go | 118 ++++++ .../whyrusleeping/go-multiplex/package.json | 5 + p2p/net/swarm/swarm.go | 4 +- 42 files changed, 1352 insertions(+), 181 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/hack.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/mux_test.go rename Godeps/_workspace/src/github.com/jbenet/go-peerstream/{transport/test/ttest.go => muxtest/muxt.go} (78%) delete mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go delete mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go delete mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado_test.go delete mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go delete mode 100644 Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux_test.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/.travis.yml create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Godeps.json create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Readme create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/LICENSE create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Makefile create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/README.md create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/img/badge.png rename Godeps/_workspace/src/github.com/jbenet/{go-peerstream/transport => go-stream-muxer}/multiplex/multiplex.go (53%) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex_test.go rename Godeps/_workspace/src/github.com/jbenet/{go-peerstream/transport => go-stream-muxer}/multistream/multistream.go (73%) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream_test.go rename Godeps/_workspace/src/github.com/jbenet/{go-peerstream/transport => go-stream-muxer}/muxado/muxado.go (71%) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado_test.go rename Godeps/_workspace/src/github.com/jbenet/{go-peerstream/transport/transport.go => go-stream-muxer/muxer.go} (62%) rename Godeps/_workspace/src/github.com/jbenet/{go-peerstream/transport => go-stream-muxer}/spdystream/spdystream.go (84%) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream_test.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test/ttest.go rename Godeps/_workspace/src/github.com/jbenet/{go-peerstream/transport => go-stream-muxer}/yamux/yamux.go (76%) create mode 100644 Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux_test.go create mode 100644 Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gitignore create mode 100644 Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gxlastpubver create mode 100644 Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/README.md create mode 100644 Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex.go create mode 100644 Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex_test.go create mode 100644 Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/package.json diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a613a529d..755c718c7 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -180,7 +180,7 @@ }, { "ImportPath": "github.com/jbenet/go-peerstream", - "Rev": "62fe5ede12f9d9cd9406750160122525b3d6b694" + "Rev": "214e6057e1c0742c0b5f9bb3060e88dea32a4380" }, { "ImportPath": "github.com/jbenet/go-random", @@ -194,6 +194,10 @@ "ImportPath": "github.com/jbenet/go-sockaddr/net", "Rev": "da304f94eea1af8ba8d1faf184623e1f9d9777dc" }, + { + "ImportPath": "github.com/jbenet/go-stream-muxer", + "Rev": "4a97500beeb081571128d41d539787e137f18404" + }, { "ImportPath": "github.com/jbenet/go-temp-err-catcher", "Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff" diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml index 415030aa9..5828fd674 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml @@ -7,5 +7,5 @@ go: - tip script: - - go test ./... + - go test -v ./... # - go test -race -cpu=5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json index 675d730e1..11479d2d9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json @@ -17,13 +17,17 @@ "ImportPath": "github.com/inconshreveable/muxado", "Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848" }, + { + "ImportPath": "github.com/jbenet/go-stream-muxer", + "Rev": "e2e261765847234749629e0190fef193a4548303" + }, { "ImportPath": "github.com/jbenet/go-temp-err-catcher", "Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff" }, { "ImportPath": "github.com/whyrusleeping/go-multiplex", - "Rev": "ce5baa716247510379cb7640a14da857afd3b622" + "Rev": "474b9aebeb391746f304ddf7c764a5da12319857" }, { "ImportPath": "github.com/whyrusleeping/go-multistream", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go index 998e849d4..181ae4687 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go @@ -6,7 +6,7 @@ import ( "net" "sync" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) // ConnHandler is a function which receives a Conn. It allows @@ -36,8 +36,8 @@ var ErrNoConnections = errors.New("no connections") // Conn is a Swarm-associated connection. type Conn struct { - pstConn pst.Conn - netConn net.Conn // underlying connection + smuxConn smux.Conn + netConn net.Conn // underlying connection swarm *Swarm groups groupSet @@ -46,13 +46,13 @@ type Conn struct { streamLock sync.RWMutex } -func newConn(nconn net.Conn, tconn pst.Conn, s *Swarm) *Conn { +func newConn(nconn net.Conn, tconn smux.Conn, s *Swarm) *Conn { return &Conn{ - netConn: nconn, - pstConn: tconn, - swarm: s, - groups: groupSet{m: make(map[Group]struct{})}, - streams: make(map[*Stream]struct{}), + netConn: nconn, + smuxConn: tconn, + swarm: s, + groups: groupSet{m: make(map[Group]struct{})}, + streams: make(map[*Stream]struct{}), } } @@ -77,8 +77,8 @@ func (c *Conn) NetConn() net.Conn { // Conn returns the underlying transport Connection we use // Warning: modifying this object is undefined. -func (c *Conn) Conn() pst.Conn { - return c.pstConn +func (c *Conn) Conn() smux.Conn { + return c.smuxConn } // Groups returns the Groups this Conn belongs to @@ -122,7 +122,7 @@ func (c *Conn) Close() error { // close underlying connection c.swarm.removeConn(c) - err := c.pstConn.Close() + err := c.smuxConn.Close() c.swarm.notifyAll(func(n Notifiee) { n.Disconnected(c) }) @@ -166,7 +166,7 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) { s.ConnHandler()(c) // go listen for incoming streams on this connection - go c.pstConn.Serve(func(ss pst.Stream) { + go c.smuxConn.Serve(func(ss smux.Stream) { // log.Printf("accepted stream %d from %s\n", ssS.Identifier(), netConn.RemoteAddr()) stream := s.setupStream(ss, c) s.StreamHandler()(stream) // call our handler @@ -225,21 +225,21 @@ func (s *Swarm) setupConn(netConn net.Conn, isServer bool) (*Conn, error) { // all validation has happened. func (s *Swarm) createStream(c *Conn) (*Stream, error) { - // Create a new pst.Stream - pstStream, err := c.pstConn.OpenStream() + // Create a new smux.Stream + smuxStream, err := c.smuxConn.OpenStream() if err != nil { return nil, err } - return s.setupStream(pstStream, c), nil + return s.setupStream(smuxStream, c), nil } // newStream is the internal function that creates a new stream. assumes // all validation has happened. -func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream { +func (s *Swarm) setupStream(smuxStream smux.Stream, c *Conn) *Stream { // create a new stream - stream := newStream(pstStream, c) + stream := newStream(smuxStream, c) // add it to our streams maps s.streamLock.Lock() @@ -265,7 +265,7 @@ func (s *Swarm) removeStream(stream *Stream) error { s.streamLock.Unlock() stream.conn.streamLock.Unlock() - err := stream.pstStream.Close() + err := stream.smuxStream.Close() s.notifyAll(func(n Notifiee) { n.ClosedStream(stream) }) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go index f383f8497..4a7036583 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go @@ -8,7 +8,7 @@ import ( "time" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pstss "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" ) func die(err error) { @@ -18,7 +18,7 @@ func die(err error) { func main() { // create a new Swarm - swarm := ps.NewSwarm(pstss.Transport) + swarm := ps.NewSwarm(spdy.Transport) defer swarm.Close() // tell swarm what to do with a new incoming streams. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go index 0dc14e179..9149f5a9b 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go @@ -7,13 +7,13 @@ import ( "os" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pstss "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" ) func main() { log("creating a new swarm with spdystream transport") // create a new Swarm - swarm := ps.NewSwarm(pstss.Transport) + swarm := ps.NewSwarm(spdy.Transport) defer swarm.Close() // tell swarm what to do with a new incoming streams. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/hack.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/hack.go new file mode 100644 index 000000000..c1af4e81b --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/hack.go @@ -0,0 +1,15 @@ +package muxtest + +import ( + multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex" + multistream "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream" + muxado "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" + yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" +) + +var _ = multiplex.DefaultTransport +var _ = multistream.NewTransport +var _ = muxado.Transport +var _ = spdy.Transport +var _ = yamux.DefaultTransport diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/mux_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/mux_test.go new file mode 100644 index 000000000..811c89d29 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/mux_test.go @@ -0,0 +1,31 @@ +package muxtest + +import ( + "testing" + + multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex" + multistream "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream" + muxado "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" + yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" +) + +func TestYamuxTransport(t *testing.T) { + SubtestAll(t, yamux.DefaultTransport) +} + +func TestSpdyStreamTransport(t *testing.T) { + SubtestAll(t, spdy.Transport) +} + +func TestMultiplexTransport(t *testing.T) { + SubtestAll(t, multiplex.DefaultTransport) +} + +func TestMuxadoTransport(t *testing.T) { + SubtestAll(t, muxado.Transport) +} + +func TestMultistreamTransport(t *testing.T) { + SubtestAll(t, multistream.NewTransport()) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/muxt.go similarity index 78% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go rename to Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/muxt.go index 4d86614fb..4bfef14e6 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/muxt.go @@ -1,4 +1,4 @@ -package peerstream_transport_test +package muxtest import ( "bytes" @@ -14,11 +14,12 @@ import ( "testing" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) var randomness []byte var nextPort = 20000 +var verbose = false func init() { // read 1MB of randomness @@ -45,7 +46,7 @@ func checkErr(t *testing.T, err error) { } func log(s string, v ...interface{}) { - if testing.Verbose() { + if verbose { fmt.Fprintf(os.Stderr, "> "+s+"\n", v...) } } @@ -55,7 +56,7 @@ type echoSetup struct { conns []*ps.Conn } -func singleConn(t *testing.T, tr pst.Transport) echoSetup { +func singleConn(t *testing.T, tr smux.Transport) echoSetup { swarm := ps.NewSwarm(tr) swarm.SetStreamHandler(func(s *ps.Stream) { defer s.Close() @@ -84,7 +85,7 @@ func singleConn(t *testing.T, tr pst.Transport) echoSetup { } } -func makeSwarm(t *testing.T, tr pst.Transport, nListeners int) *ps.Swarm { +func makeSwarm(t *testing.T, tr smux.Transport, nListeners int) *ps.Swarm { swarm := ps.NewSwarm(tr) swarm.SetStreamHandler(func(s *ps.Stream) { defer s.Close() @@ -104,7 +105,7 @@ func makeSwarm(t *testing.T, tr pst.Transport, nListeners int) *ps.Swarm { return swarm } -func makeSwarms(t *testing.T, tr pst.Transport, nSwarms, nListeners int) []*ps.Swarm { +func makeSwarms(t *testing.T, tr smux.Transport, nSwarms, nListeners int) []*ps.Swarm { swarms := make([]*ps.Swarm, nSwarms) for i := 0; i < nSwarms; i++ { swarms[i] = makeSwarm(t, tr, nListeners) @@ -112,11 +113,11 @@ func makeSwarms(t *testing.T, tr pst.Transport, nSwarms, nListeners int) []*ps.S return swarms } -func SubtestConstructSwarm(t *testing.T, tr pst.Transport) { +func SubtestConstructSwarm(t *testing.T, tr smux.Transport) { ps.NewSwarm(tr) } -func SubtestSimpleWrite(t *testing.T, tr pst.Transport) { +func SubtestSimpleWrite(t *testing.T, tr smux.Transport) { swarm := ps.NewSwarm(tr) defer swarm.Close() @@ -171,18 +172,18 @@ func SubtestSimpleWrite(t *testing.T, tr pst.Transport) { } } -func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { +func SubtestSimpleWrite100msgs(t *testing.T, tr smux.Transport) { msgs := 100 msgsize := 1 << 19 es := singleConn(t, tr) + defer es.swarm.Close() log("creating stream") stream, err := es.conns[0].NewStream() checkErr(t, err) bufs := make(chan []byte, msgs) - errs := make(chan error, msgs*100) var wg sync.WaitGroup wg.Add(1) @@ -194,7 +195,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { bufs <- buf log("writing %d bytes (message %d/%d #%x)", len(buf), i, msgs, buf[:3]) if _, err := stream.Write(buf); err != nil { - errs <- fmt.Errorf("stream.Write(buf): %s", err) + t.Error(fmt.Errorf("stream.Write(buf): %s", err)) continue } } @@ -212,26 +213,21 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { i++ if _, err := io.ReadFull(stream, buf2); err != nil { - errs <- fmt.Errorf("readFull(stream, buf2): %s", err) + t.Error(fmt.Errorf("readFull(stream, buf2): %s", err)) continue } if !bytes.Equal(buf1, buf2) { - errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + t.Error(fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])) } } }() wg.Wait() - close(errs) - for err := range errs { - t.Error(err) - } } -func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, nConn, nStream, nMsg int) { +func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm, nConn, nStream, nMsg int) { msgsize := 1 << 11 - errs := make(chan error, nSwarm*nConn*nStream*nMsg*100) // dont block anything. rateLimitN := 5000 rateLimitChan := make(chan struct{}, rateLimitN) // max of 5k funcs. @@ -253,7 +249,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, bufs <- buf log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3]) if _, err := s.Write(buf); err != nil { - errs <- fmt.Errorf("s.Write(buf): %s", err) + t.Error(fmt.Errorf("s.Write(buf): %s", err)) continue } } @@ -269,12 +265,12 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { - errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) + t.Error(fmt.Errorf("io.ReadFull(s, buf2): %s", err)) continue } if !bytes.Equal(buf1, buf2) { - errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + t.Error(fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])) } } } @@ -284,7 +280,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, s, err := c.NewStream() if err != nil { - errs <- fmt.Errorf("Failed to create NewStream: %s", err) + t.Error(fmt.Errorf("Failed to create NewStream: %s", err)) return } @@ -308,13 +304,13 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, nc, err := net.Dial(nla.Network(), nla.String()) if err != nil { - errs <- fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err) + t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)) return } c, err := a.AddConn(nc) if err != nil { - errs <- fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err) + t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)) return } @@ -363,47 +359,38 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, } swarms := makeSwarms(t, tr, nSwarm, 3) // 3 listeners per swarm. - - go func() { - connectSwarmsAndRW(swarms) - close(errs) // done - }() - - for err := range errs { - t.Error(err) - } - + connectSwarmsAndRW(swarms) for _, s := range swarms { s.Close() } } -func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 1) } -func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 100) } -func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 100, 100) } -func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 10, 50, 50) } -func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr pst.Transport) { +func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 5, 2, 20, 20) } -func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr pst.Transport) { +func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 10, 2, 100, 100) } -func SubtestAll(t *testing.T, tr pst.Transport) { +func SubtestAll(t *testing.T, tr smux.Transport) { tests := []TransportTest{ SubtestConstructSwarm, @@ -425,7 +412,7 @@ func SubtestAll(t *testing.T, tr pst.Transport) { } } -type TransportTest func(t *testing.T, tr pst.Transport) +type TransportTest func(t *testing.T, tr smux.Transport) func TestNoOp(t *testing.T) {} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go index f1eb175d8..c9033fce3 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go @@ -3,7 +3,7 @@ package peerstream import ( "fmt" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) // StreamHandler is a function which receives a Stream. It @@ -17,17 +17,17 @@ type StreamHandler func(s *Stream) // Stream is an io.{Read,Write,Close}r to a remote counterpart. // It wraps a spdystream.Stream, and links it to a Conn and groups type Stream struct { - pstStream pst.Stream + smuxStream smux.Stream conn *Conn groups groupSet } -func newStream(ss pst.Stream, c *Conn) *Stream { +func newStream(ss smux.Stream, c *Conn) *Stream { s := &Stream{ - conn: c, - pstStream: ss, - groups: groupSet{m: make(map[Group]struct{})}, + conn: c, + smuxStream: ss, + groups: groupSet{m: make(map[Group]struct{})}, } s.groups.AddSet(&c.groups) // inherit groups return s @@ -40,8 +40,8 @@ func (s *Stream) String() string { } // SPDYStream returns the underlying *spdystream.Stream -func (s *Stream) Stream() pst.Stream { - return s.pstStream +func (s *Stream) Stream() smux.Stream { + return s.smuxStream } // Conn returns the Conn associated with this Stream @@ -70,11 +70,11 @@ func (s *Stream) AddGroup(g Group) { } func (s *Stream) Read(p []byte) (n int, err error) { - return s.pstStream.Read(p) + return s.smuxStream.Read(p) } func (s *Stream) Write(p []byte) (n int, err error) { - return s.pstStream.Write(p) + return s.smuxStream.Write(p) } func (s *Stream) Close() error { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go index 82eb28487..92d4f65d7 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go @@ -7,7 +7,7 @@ import ( "sync" "time" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) // fd is a (file) descriptor, unix style @@ -18,7 +18,7 @@ var GarbageCollectTimeout = 5 * time.Second type Swarm struct { // the transport we'll use. - transport pst.Transport + transport smux.Transport // active streams. streams map[*Stream]struct{} @@ -46,7 +46,7 @@ type Swarm struct { closed chan struct{} } -func NewSwarm(t pst.Transport) *Swarm { +func NewSwarm(t smux.Transport) *Swarm { s := &Swarm{ transport: t, streams: make(map[*Stream]struct{}), @@ -183,7 +183,7 @@ func (s *Swarm) Conns() []*Conn { open := make([]*Conn, 0, len(conns)) for _, c := range conns { - if c.pstConn.IsClosed() { + if c.smuxConn.IsClosed() { c.Close() } else { open = append(open, c) @@ -292,7 +292,7 @@ func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error) { return nil, errors.New("connection not associated with swarm") } - if conn.pstConn.IsClosed() { + if conn.smuxConn.IsClosed() { go conn.Close() return nil, errors.New("conn is closed") } @@ -360,7 +360,7 @@ func (s *Swarm) connGarbageCollect() { } for _, c := range s.Conns() { - if c.pstConn.IsClosed() { + if c.smuxConn.IsClosed() { go c.Close() } } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go deleted file mode 100644 index 43a87af34..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_multiplex - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestMultiplexTransport(t *testing.T) { - psttest.SubtestAll(t, DefaultTransport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go deleted file mode 100644 index ebe46b814..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package multistream - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestMultiStreamTransport(t *testing.T) { - psttest.SubtestAll(t, NewTransport()) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado_test.go deleted file mode 100644 index 7f4ff7591..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_muxado - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestMuxadoTransport(t *testing.T) { - psttest.SubtestAll(t, Transport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go deleted file mode 100644 index 16e754b77..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_spdystream - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestSpdyStreamTransport(t *testing.T) { - psttest.SubtestAll(t, Transport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux_test.go deleted file mode 100644 index 84e3b8656..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_yamux - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestYamuxTransport(t *testing.T) { - psttest.SubtestAll(t, DefaultTransport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/.travis.yml new file mode 100644 index 000000000..415030aa9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/.travis.yml @@ -0,0 +1,11 @@ +language: go + +go: + - 1.3 + - 1.4 + - release + - tip + +script: + - go test ./... + # - go test -race -cpu=5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Godeps.json new file mode 100644 index 000000000..47481401c --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Godeps.json @@ -0,0 +1,29 @@ +{ + "ImportPath": "github.com/jbenet/go-stream-muxer", + "GoVersion": "go1.4.2", + "Packages": [ + "./..." + ], + "Deps": [ + { + "ImportPath": "github.com/docker/spdystream", + "Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207" + }, + { + "ImportPath": "github.com/hashicorp/yamux", + "Rev": "b2e55852ddaf823a85c67f798080eb7d08acd71d" + }, + { + "ImportPath": "github.com/inconshreveable/muxado", + "Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848" + }, + { + "ImportPath": "github.com/whyrusleeping/go-multiplex", + "Rev": "ce5baa716247510379cb7640a14da857afd3b622" + }, + { + "ImportPath": "github.com/whyrusleeping/go-multistream", + "Rev": "08e8f9c9f5665ed0c63ffde4fa5ef1d5fb3d516d" + } + ] +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Readme b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Readme new file mode 100644 index 000000000..4cdaa53d5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/LICENSE b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/LICENSE new file mode 100644 index 000000000..c7386b3c9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Juan Batiz-Benet + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Makefile b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Makefile new file mode 100644 index 000000000..f15fcaa68 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Makefile @@ -0,0 +1,15 @@ + +godep: + go get github.com/tools/godep + +vendor: godep + godep save -r ./... + +build: + go build ./... + +test: + go test ./... + +test_race: + go test -race -cpu 5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/README.md b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/README.md new file mode 100644 index 000000000..a0a08869a --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/README.md @@ -0,0 +1,98 @@ +# go-stream-muxer - generalized stream multiplexing + + +go-stream-muxer is a common interface for stream muxers, with common tests. It wraps other stream muxers (like [muxado](https://github.com/inconshreveable/muxado), [spdystream](https://github.com/docker/spdystream) and [yamux](https://github.com/hashicorp/yamux)). + +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) + +> A test suite and interface you can use to implement a stream muxer. + +### Godoc: https://godoc.org/github.com/jbenet/go-stream-muxer + +## Implementations + +* [yamux](yamux) +* [muxado](muxado) +* [multiplex](multiplex) +* [spdystream](spdystream) + +## Badge + +Include this badge in your readme if you make a new module that uses abstract-stream-muxer API. + +![](img/badge.png) + +## Client example + +```go +import ( + "net" + "fmt" + "io" + ymux "github.com/jbenet/go-stream-muxer/yamux" + smux "github.com/jbenet/go-stream-muxer" +) + +func dial() { + nconn, _ := net.Dial("tcp", "localhost:1234") + sconn, _ := ymux.DefaultTransport.NewConn(nconn, false) // false == client + + go sconn.Serve(func(smux.Stream) {}) // no-op + + s1, _ := sconn.OpenStream() + s1.Write([]byte("hello")) + + s2, _ := sconn.OpenStream() + s2.Write([]byte("world")) + + length := 20 + buf2 := make([]byte, length) + fmt.Printf("reading %d bytes from stream (echoed)\n", length) + + s1.Read(buf2) + + fmt.Printf("received %s as a response\n", string(buf2)) + + s3, _ := sconn.OpenStream() + io.Copy(s3, os.Stdin) +} +``` + +## Server example + +```go +import ( + "net" + "fmt" + "io" + ymux "github.com/jbenet/go-stream-muxer/yamux" + smux "github.com/jbenet/go-stream-muxer" +) + +func listen() { + tr := ymux.DefaultTransport + l, _ := net.Listen("tcp", "localhost:1234") + + go func() { + for { + c, _ := l.Accept() + + fmt.Println("accepted connection") + sc, _ := tr.NewConn(c, true) + + go sc.Serve(func(s smux.Stream) { + fmt.Println("serving connection") + echoStream(s) + }) + } + }() +} + +func echoStream(s smux.Stream) { + defer s.Close() + + fmt.Println("accepted stream") + io.Copy(s, s) // echo everything + fmt.Println("closing stream") +} +``` diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/img/badge.png b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/img/badge.png new file mode 100644 index 0000000000000000000000000000000000000000..738bcf4a2f5d9383b844ebc9d0f2ce6d2aa1f642 GIT binary patch literal 7694 zcmV+p9`WIcP)Py7$Vo&&RCodHT@O@L*O~vJpn!scf(A^$K#V3jB)AsZZd5SoO4H-etl8{Q=+B1K z_TAg_;W`4ojt#e3y#m*9JiOPp&j|SV=%fJfO4{4!y?9&V;Rtv^>)}9ov)w);;05jN z^DewC@o)sZp!IN|yxDG_5pYt(`fb)Oxgj6(&xch)z={CWKDFz4zYRZ&P9C z>;32PrVQ&5&|k5FWxv*gzYu7)zz8^44{l#Otq$0cBSs=9C`bX84UdeB#Q5>!5i~Xk!9l^W zT(`h%Hs1lS#oGl>N5C_B;Aw5QC?5e^Syc(&k-nH1HW58NJ+N3T@b&dYkEI6z0Rigp z8|5c#zz58K{rYu`9656E7Ff*a=x9ghRUj}l6k$RBgO~Bw!~ z4J#&0n1IovMk^^xN^t4YC6MEi4*2@{D!^J{#Yo?g0>lov8i>cVp=f;ueEb3s8Xk%0 z=tzVI2OutW9rialN@^3DYHLwj-v-NV^{NJGjJX!8WhUZm#3TQh$+dv_7&3bEq@&7B z7S}?zU(9Ui9z|YrD(E&;gN>`>+>XI5TPEiH{ z>y;~4(B9sT3+FFj#E5>nV>*F%Y;SAr&Kg*cmf_v9qv)(Fg;5PhG5%wFwYm!Mvnh2` zgAw2~RJZw!9}wiw#Oje||MQ-j?!&^u)YMP@ zN^p`c=zt9h4pKm-lqH9y0!B$Z`L4+b2?;@6U0pv7mp7_{0r7KUd-DXvJc<=-lhl3O z#6Z-StjFT#U(wQwm*Ly#t5I~UL&c}3r(=E5e(cU)t=g5EUnTi0I99R|smXCF&L=Jz z*<1EI$*{Jn60A;7MO>UVZO0|2V`E7bbUg8ytUEiu2-}3d#Hr~!4jCJFqphPB8#9s- zmn`KaK0SLIT0G_~n^AJKqqnaxS7YY}?T2ZP4McrLVcZ1R1I6SeL&a zr*^MLs-$Pgc$D=vpauD>87JeiyUAANAmbqvX1$EXx^0J0e`p(0)tKBoP2s#MSm-C* zka&mVvgxw*;K75|0|yRRFI~Lkl;)Vvlk3jTPV2U9+pKSIe_Pe}@y8!qKmFuWYhz=h z_2kKu*5k*ITN@i1trss|v|hPekw6&+AyuxT(^)(fVCirM-^^TUlv`-o@5ES=zG=2CG(|(OTKb!kyMlNm~BA z%67drY5OKEe^Q>c-P&l)((9Ca7OTkB$I58k*Qz~bu<@{VR$66d>m38FpGIpn2Wybv zFeXEHnECkV$9R72^Z5DCe~w@L;upxt$w5P7gVZ_nK;^2WxI}6Hl(3`(>Do35I9j5bQzEDB0I|(~$+NGWzl-i`f6vUp! zE-5E*S=x+HyyGw&q3$uo3z!?%f zIR5A3$j!|~TWcGFgM-!a`Pt9$v!DJ9OdCqLa`g&+`@7$Q%2#-JxKg@k1z=zt0XcTe zSPig@m*3}&8#mC_)`qKBuBwFv0=%QW1Lw}3Lv?jE{Qdjyeaxbl(bX(dc9{lT|3Dk34IT}DF$fAvu?IsIO7k-i9}|U3mH(~9CoROR37t50>=@bu z?nkPK5{A<9lZgM9Vzf3lqVA<{p{uDDhssLu+g%k}hCY7{YyTaKr-w+aFAQ@vfWo){ z--u$Ld@(=uDXoM6%&|#*s*&4j1mB;8zmus2D8#PA0&&X;6e>=?9I5l}cP-*UjBmt3 z5ir^hzq(ihtTC?w&nB?VK+StQwOR%k4=scCcWd42Se!5&5n=j|)-iS9YmY%obJK5r z{TrnSg-!_7KqjD}Vnoe;{rdIz;DZkaewrFO=(Q)LHD(8{Q1P6708p4Y%jY{E6g1&sIRXVP|d*NjOXyy zyGPN}*^25tTSb;>A@ijoX9?EXrxLj}KK(c%^yqHeMfn;SqGMq&7!Vt6z)A{V2@U2R zn43SwD?eN$@sWs(o`J*#My%Pay>Km;{qNMM@x3>pk>Tpvm~4RlGwjh*-Ly+j($C#~ zlknJ54PcqS-;er(?}N=Uta=Kw1Y^34*YmqO`@vzA(Uu`<(bK)-#i+E!oP}(`gYnR_ z0?QGvevI~*p_0QJ`h zDphY4z6DBXMG0hqfPm%HPddOw1g>~dvEt$a!wu^VoIQIM9Tz*G78FMKs5mNV!hbPB z7WU@OovYg5?ztykPeE*KEF$N>)~h7>28Cf(@)8tOZ_+4*>?8GJ&3MTV|R1 zYZ~iNQ&XcIBJnFLDzPK&9=yGBE^>;iVK9n>uB=95YbTm@`q8(c`b(GiUV!$^J7Bk3dV4Qr*cq+OEiE{I{=5PuSHNhGC15}K zT6n>VGgD`&QzV<11>I1O-RZy1lqm(_9_RJP+9h*Qlt)XR3UJwD3Km8Py$nWHn%h*_b0hZS*xh1 zP>R#3hEv#_zZp%9O`3!i=NBJJOrZpnu!2oYoG=lgA)&qYSpt>-r<5h%B_*ID#{9Li zYDLhK5>SB~C`2rL2BQ6{kSTmNz$Xr=sbWU*i^Q_cni**Mnr}g7PJ56S5-KLF{QVl& zwI^s-Aqk6-tLog0#Lzgb&dx@9axj*v`qJ>|*B(Z^jjgOOBp_qmI!O z%OPD(^{sv2d$(3E!Kqz>GKVkbrRDZkl$84fV%$o%Ak0bD8k%E_M*6yJWF&`TnS)5R zh^Z>muuRouSX+u0GM>fHcCJ$Q#VclGizuCgj>kh`y>0oT7aoDOtgK7{jgBko5UEPZ zN{L8c0D_h)RRkbCuLy1en930;!;3Gzhz+l7!09umm4xLY0jG2&F{%BF&6Pg@a%BZh z0W(2-`m@tGbNY)ANvJ8=AtLOl&SU%4}(4`;kHTX!G;fpbD-+Vdg@0 z6|*=K1*-4Bvb0|bO&DxE>`(rpSAq@|1R{bHnV#K+AN=44Sg>G$;;+oZgCL@of28t= z0uw0$l_qNRx>O{{R9=ygbSos_Nht#S15{n)s;q}jF*6^WsnV2!CH1#_-lE(PX%S$4 zE+TNb_SC6Uitna;JHFM!9Ab)rvvS8sRuSFbdEEZq+akxu$ zE?_owpxtCafb=iK#R10xllVnQGJ-wX39^ql(_|A8}qvG^l zIGua@<`nC_8RjD}WYAI(qJ5Tpl{*JW2R=LfnR3QffR(A9PAmjyuf(O)wH*YV0q6%i4ES%z7dF=GbCj~kDZbtkc7#}0%|48xeQW0b{#lzjygMnc5v0$P1h+!?~A=p9eOgCUg@+#kdl`&PitE%{`ki~D)SZh zcF{^e3j(KqO5AsLyrY@R#P+I!&=6&}y?h0A^>vszb0*T34gs)wW4z}(dIW9?wC+ZP z(>V{?XBj^F=p!6Hd{{ZX&~8h+F1N{00+Rw9K5`h%&CP0~BI5{FE*#KNMqCnMh%;c> z_m}l=C2-dWZ`@a61a28<_0bX^SG5Cy762|5Z~;OAmV3QKu_9ne9}fTNu%^6;MS-?l zN@sFjS}I5(mjBc8F9)!u799HUe{ty3Q_|#cOiOqK^Alov9aHpveAUiPRfk}iIvX(| z&XDM+ufnI8ii8-a^}>FKXQ=u6t!RJ*k5=+)Us{B48B{XhO|$-m9j*~a4v zJjm$$v+|&TyDanrpanqx0t_8zNE5h<#Xw+kp@60{x)#!zhIUzcaxsp8W#B@^!w)~K z79do_E&S6`mW|kyOyupU!?VdTy){4gqt#d!Fas+K7vbISYfHJkiFeS&QE`9?O384$ z`Ul@w6o{8v=ycr?Qtx;hj>!6LUX6o?mY(GH+<}AQXOmSi&28$fiu^*7mJ;)wo$si1 zd@5qPbfv2z&0L)J$yaF&p!UaQUs@eV59r`csY}}}Ed>NF^ONV^@*t9A&4Ns1j28-l!rVnzz3o_^96-zd*RWylRR?bbTNcwP?3T+*@BL?Qo02m&qTcSA<6M`y zLGl~$&1r6^gm~0#%TBkAgAbBdZ-LX!edn46yt^sTazi1vdcX0;8_KCAOxDo3#7hq? z+H46%N>ILNQUKBeI=Y-`KBG1xF8rH?Djz|r6fmI&qzL4@vL{UW91Pg%Attu$dsF;C zm*bg5QEp34|Ngt^dGBeLmv(GRI%cG-K+2wK+40+oimeY|#e!&TbLVDfX-0X@W~_Yj zNlbpW2JMaekXX0^(LzTG$NvD$Qhwj2V62$C!`a!~)*?&uab#}J!O|zou&T07c0AN! zYg`%TMrOgCpV|KG`7o|Lk8I{`ZA9t9qez+Yx=Y>ukqhvz{}?CkIR+HuzKZmODK2AB zU%C=ymt3O7iM!gDJT`@7 zQM_nTAm62e#v^56it^|(cbk=$*Ny8p)FJ{wOXnEoeS62-s*YRr=j`!az}lx^Ck3uG zlXE+Tj{QhmR)|fNo#LuDOQ=j35|(6Pt9V1@?sD}NwzQs9o=~~_T1A>qM@ZB|c#sZY z#sa+c+#*Cs`8nVChD7+gouXT7s+bx3+TX>Z_!xvm#9&EIfk15+PP<4pGtRY7l_|YI z%NrJjM^jRrn%{flAA&ivrpoQp_}hh#$X21gQc#=Hm(F7z=g?TSM(F zA|B7Zwo^oQF>2gQ0L@hgWK%+-sJG6czUW1)sm?{~QK=^;3}I0Tc)6rN_RE5S_{#ogL4&TTVl_vOHlSBXa)F(ot# znBXSAKT@;uaIpLxp>+0BAz2M36cLjSdx=n*^O8{=BykY4d*E=SOM6{?|53<~00nOFp{Axn8zX0%rb5PvVs6 z1^G0UTn-_4ENL%wUc#~C$CO2YRsh;mxtP#-sZ%Lc*SfC>?zDgOHK1QD{bjlIOULhN z_f5lt4?d`xM!y0|Rr1(gdcgYRC9xbW4!`{6FV%|J5c)De)(fcab_JR$AQCm%$93=h zRg-Y*#~vF7`Guh=?rw$egLcawavXE~U6p5UiKhuuFQy)mo(m8$VWK|tF zA_~mngg(;Fd_~}K!R1-?yR+dJ9F6}dugB<#_u|nfo-rt4GUO=#yacujBj5S=-Kt<+GG~Fm3hnAA+aM z!`9-TB5sI)^%S2YzS5q0DM5uIK|ymM}Ho zf93SRwcg_DqH6m~;#B@aJmAze6LyIeWO->b@)t(f3e=-Ou2LR!2AE0|uD!Pycr4Eq zOiWi(dYaK?N=HXM$|QY`)20v$KKRW_p#Y_r-WNIYBh3*RTp8`uSk;8?a``>~^jUmA z-Y%M}wyeYZqrT}ZtX1y8|JczoNM7aKEubXec+Eht5v1I5LqPLd#fkDokkcQ5pDUsR zIm_vR`RHT7Je0Khq3?d+J>4L=FYKk4UQ(Yu(#zh=Icm`=VP=Iee0v=ZHFe03%q))C zO(@Ddi z4~yQt>aXmQ*L)H(N#mRM+9j|4M6+yd7hEByuoT)S#Vn{T$;RxJ<=Fb#Ke&<1GO;$D z)ryqggkR0xtyZ}s{`$EQc+G8I!@nh=0#t3;XmH?cY>9xQ&*W$Q6}Sq`Sph}4S@ z>FhCRx#NIRmVhNapmR2rFMazBr}S~-$En>11RcL?aQgIVJoL~*h`7rc5|qN+9!}6w z(oz{yXHnu9M`v&z+~i0u{FB_DsC{Nb>X}pKD7@TWjoipxNQ|b1BLKmn(Rea52sR+X zZmt!Hz80c~xP)dZJZX8@6qt)}`GAOzj~`|j1+bN^SmI(k4Z`9*c_^2?S;`C-5RSc5 z)(R|(k(gQHs*MLslnssLm>V8|?EP(hp528{D}ZFt?E8ks17$Gg6=Un&Z`#@vfW&1d zRht$?>HV~qI&(;jU0Dgv`|MP*FBWFWEQB?P4bb`)yI>9SO6%aR9(#@bU-{UQtUT+= zluLG%3cUaR`^s#^FLRTx>QYtXDcZ;3SoZAaXO+E`^kM1JrE2-MZ=J7fcm>v-uL}aKl5@E!GDvN)XUfr%pxY3z_Oi(zh~?{S+3YOkbqh+SVsUbY>Ds1KnGlHDnp*suj3_kGlp`<%I2 z#Vs!cDZj)`V3Eg?pK|8{D<+t^9h{#oq7Q6;J7uYuDd}pX2b9#Ti>qRk)a1$pJp-Rd zQ(NsPwV3l)bNaMuXt%xszM*KV1gqftOAKP)-7I|c8I5y#{5;JD>ld(WI`_KW zd>1@!(;$xs5z2Qfg2(iL594$_YvwG-2jP?iBsB;e6R1p8l$5lwUc7WsEe;rF8PqnK z{u;^e1kgp1UR{*7{FsK?_$Vu71Sw5s{86x8-6xk^_>Kz}Kg`M9gh%gl0N@?h$y?i@ zAxT<3Iw2uJZMot08v>X>A`d072tahnA z!6l#L#P|N;drDdk)F*XoeY3i~=LhK4JikUsY+4iKySGa6GM?Z1 z;{Se$`@=HjLm2l@yWedcE|GeVToSwy!+8W86*2u4-xO$X#UOkjOSbgEjgF31<}!Y| zv$3gBN$upMWc8J7a$S#u-Xo6-cry-%5xA{D%K_tOIP>!Ilw&w82mBbvcfb2xJEUn=iU+Uj(~RrzWfpRKkk=Q@VaJAMgRZ+07*qo IM6N<$f@W_bMF0Q* literal 0 HcmV?d00001 diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex.go similarity index 53% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex.go index 79721acd5..e3257d7af 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex.go @@ -1,18 +1,20 @@ package peerstream_multiplex import ( + "errors" "net" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" - mp "github.com/whyrusleeping/go-multiplex" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" + mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) +var ErrUseServe = errors.New("not implemented, use Serve") + type conn struct { *mp.Multiplex } -func ( // Conn is a connection to a remote peer. -c *conn) Close() error { +func (c *conn) Close() error { return c.Multiplex.Close() } @@ -21,13 +23,18 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { return c.Multiplex.NewStream(), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + return nil, ErrUseServe +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { c.Multiplex.Serve(func(s *mp.Stream) { handler(s) }) @@ -40,6 +47,6 @@ type Transport struct{} // DefaultTransport has default settings for multiplex var DefaultTransport = &Transport{} -func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { return &conn{mp.NewMultiplex(nc, isServer)}, nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex_test.go new file mode 100644 index 000000000..de4b691d7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex_test.go @@ -0,0 +1,11 @@ +package peerstream_multiplex + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestMultiplexTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream.go similarity index 73% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream.go index 175f75c52..fe04c4d19 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream.go @@ -5,26 +5,27 @@ package multistream import ( "net" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" - multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex" - spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream" - yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux" mss "github.com/whyrusleeping/go-multistream" + + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" + multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" + yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" ) type transport struct { mux *mss.MultistreamMuxer - tpts map[string]pst.Transport + tpts map[string]smux.Transport } -func NewTransport() pst.Transport { +func NewTransport() smux.Transport { mux := mss.NewMultistreamMuxer() mux.AddHandler("/multiplex", nil) mux.AddHandler("/spdystream", nil) mux.AddHandler("/yamux", nil) - tpts := map[string]pst.Transport{ + tpts := map[string]smux.Transport{ "/multiplex": multiplex.DefaultTransport, "/spdystream": spdy.Transport, "/yamux": yamux.DefaultTransport, @@ -36,7 +37,7 @@ func NewTransport() pst.Transport { } } -func (t *transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t *transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { var proto string if isServer { selected, _, err := t.mux.Negotiate(nc) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream_test.go new file mode 100644 index 000000000..bfacf2ed4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream_test.go @@ -0,0 +1,11 @@ +package multistream + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestMultiStreamTransport(t *testing.T) { + test.SubtestAll(t, NewTransport()) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado.go similarity index 71% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado.go index 9c103ff69..74edbf48c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado.go @@ -4,10 +4,10 @@ import ( "net" muxado "github.com/inconshreveable/muxado" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) -// stream implements pst.Stream using a ss.Stream +// stream implements smux.Stream using a ss.Stream type stream struct { ms muxado.Stream } @@ -53,7 +53,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { s, err := c.ms.Open() if err != nil { return nil, err @@ -62,15 +62,24 @@ func (c *conn) OpenStream() (pst.Stream, error) { return &stream{ms: s}, nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + s, err := c.ms.Accept() + if err != nil { + return nil, err + } + return &stream{ms: s}, nil +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { for { // accept loop - s, err := c.ms.Accept() + s, err := c.AcceptStream() if err != nil { return // err always means closed. } - go handler(&stream{ms: s}) + go handler(s) } } @@ -80,7 +89,7 @@ type transport struct{} // spdystream-backed connections. var Transport = transport{} -func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { var s muxado.Session if isServer { s = muxado.Server(nc) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado_test.go new file mode 100644 index 000000000..91726a9ed --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado_test.go @@ -0,0 +1,11 @@ +package peerstream_muxado + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestMuxadoTransport(t *testing.T) { + test.SubtestAll(t, Transport) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxer.go similarity index 62% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxer.go index 6bace1855..e9531750a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxer.go @@ -1,4 +1,4 @@ -package peerstream_transport +package streammux import ( "io" @@ -16,6 +16,9 @@ type Stream interface { // (usually those opened by the remote side) type StreamHandler func(Stream) +// NoOpHandler do nothing. close streams as soon as they are opened. +var NoOpHandler = func(s Stream) { s.Close() } + // Conn is a stream-multiplexing connection to a remote peer. type Conn interface { io.Closer @@ -27,12 +30,15 @@ type Conn interface { // OpenStream creates a new stream. OpenStream() (Stream, error) - // Serve starts listening for incoming requests and handles them - // using given StreamHandler + // AcceptStream accepts a stream opened by the other side. + AcceptStream() (Stream, error) + + // Serve starts a loop, accepting incoming requests and calling + // `StreamHandler with them. (Use _instead of_ accept. not both.) Serve(StreamHandler) } -// Transport constructs go-peerstream compatible connections. +// Transport constructs go-stream-muxer compatible connections. type Transport interface { // NewConn constructs a new connection diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream.go similarity index 84% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream.go index 90d0ca6b6..17baf08fa 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream.go @@ -1,14 +1,17 @@ package peerstream_spdystream import ( + "errors" "net" "net/http" ss "github.com/docker/spdystream" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) -// stream implements pst.Stream using a ss.Stream +var ErrUseServe = errors.New("not implemented, use Serve") + +// stream implements smux.Stream using a ss.Stream type stream ss.Stream func (s *stream) spdyStream() *ss.Stream { @@ -63,7 +66,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { s, err := c.spdyConn().CreateStream(http.Header{ ":method": []string{"GET"}, // this is here for HTTP/SPDY interop ":path": []string{"/"}, // this is here for HTTP/SPDY interop @@ -78,9 +81,14 @@ func (c *conn) OpenStream() (pst.Stream, error) { return (*stream)(s), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + return nil, ErrUseServe +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { c.spdyConn().Serve(func(s *ss.Stream) { // Flow control and backpressure of Opening streams is broken. @@ -109,7 +117,7 @@ type transport struct{} // spdystream-backed connections. var Transport = transport{} -func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { sc, err := ss.NewConnection(nc, isServer) return &conn{sc: sc, closed: make(chan struct{})}, err } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream_test.go new file mode 100644 index 000000000..97f3d17e4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream_test.go @@ -0,0 +1,11 @@ +package peerstream_spdystream + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestSpdyStreamTransport(t *testing.T) { + test.SubtestAll(t, Transport) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test/ttest.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test/ttest.go new file mode 100644 index 000000000..371f6de00 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test/ttest.go @@ -0,0 +1,378 @@ +package sm_test + +import ( + "bytes" + crand "crypto/rand" + "fmt" + "io" + mrand "math/rand" + "net" + "os" + "reflect" + "runtime" + "runtime/debug" + "sync" + "testing" + + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" +) + +var randomness []byte + +func init() { + // read 1MB of randomness + randomness = make([]byte, 1<<20) + if _, err := crand.Read(randomness); err != nil { + panic(err) + } +} + +type Options struct { + tr smux.Transport + connNum int + streamNum int + msgNum int + msgMin int + msgMax int +} + +func randBuf(size int) []byte { + n := len(randomness) - size + if size < 1 { + panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness))) + } + + start := mrand.Intn(n) + return randomness[start : start+size] +} + +func checkErr(t *testing.T, err error) { + if err != nil { + debug.PrintStack() + t.Fatal(err) + } +} + +func log(s string, v ...interface{}) { + if testing.Verbose() { + fmt.Fprintf(os.Stderr, "> "+s+"\n", v...) + } +} + +func echoStream(s smux.Stream) { + defer s.Close() + log("accepted stream") + io.Copy(&LogWriter{s}, s) // echo everything + log("closing stream") +} + +type LogWriter struct { + W io.Writer +} + +func (lw *LogWriter) Write(buf []byte) (int, error) { + if testing.Verbose() { + log("logwriter: writing %d bytes", len(buf)) + } + return lw.W.Write(buf) +} + +func GoServe(t *testing.T, tr smux.Transport, l net.Listener) (done func()) { + closed := make(chan struct{}, 1) + + go func() { + for { + c1, err := l.Accept() + if err != nil { + select { + case <-closed: + return // closed naturally. + default: + checkErr(t, err) + } + } + + log("accepted connection") + sc1, err := tr.NewConn(c1, true) + checkErr(t, err) + go sc1.Serve(echoStream) + } + }() + + return func() { + closed <- struct{}{} + } +} + +func SubtestSimpleWrite(t *testing.T, tr smux.Transport) { + l, err := net.Listen("tcp", "localhost:0") + checkErr(t, err) + log("listening at %s", l.Addr().String()) + done := GoServe(t, tr, l) + defer done() + + log("dialing to %s", l.Addr().String()) + nc1, err := net.Dial("tcp", l.Addr().String()) + checkErr(t, err) + defer nc1.Close() + + log("wrapping conn") + c1, err := tr.NewConn(nc1, false) + checkErr(t, err) + defer c1.Close() + + // serve the outgoing conn, because some muxers assume + // that we _always_ call serve. (this is an error?) + go c1.Serve(smux.NoOpHandler) + + log("creating stream") + s1, err := c1.OpenStream() + checkErr(t, err) + defer s1.Close() + + buf1 := randBuf(4096) + log("writing %d bytes to stream", len(buf1)) + _, err = s1.Write(buf1) + checkErr(t, err) + + buf2 := make([]byte, len(buf1)) + log("reading %d bytes from stream (echoed)", len(buf2)) + _, err = s1.Read(buf2) + checkErr(t, err) + + if string(buf2) != string(buf1) { + t.Error("buf1 and buf2 not equal: %s != %s", string(buf1), string(buf2)) + } + log("done") +} + +func SubtestStress(t *testing.T, opt Options) { + + msgsize := 1 << 11 + errs := make(chan error, 0) // dont block anything. + + rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. + rateLimitChan := make(chan struct{}, rateLimitN) + for i := 0; i < rateLimitN; i++ { + rateLimitChan <- struct{}{} + } + + rateLimit := func(f func()) { + <-rateLimitChan + f() + rateLimitChan <- struct{}{} + } + + writeStream := func(s smux.Stream, bufs chan<- []byte) { + log("writeStream %p, %d msgNum", s, opt.msgNum) + + for i := 0; i < opt.msgNum; i++ { + buf := randBuf(msgsize) + bufs <- buf + log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3]) + if _, err := s.Write(buf); err != nil { + errs <- fmt.Errorf("s.Write(buf): %s", err) + continue + } + } + } + + readStream := func(s smux.Stream, bufs <-chan []byte) { + log("readStream %p, %d msgNum", s, opt.msgNum) + + buf2 := make([]byte, msgsize) + i := 0 + for buf1 := range bufs { + i++ + log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + + if _, err := io.ReadFull(s, buf2); err != nil { + errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) + log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + continue + } + if !bytes.Equal(buf1, buf2) { + errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + } + } + } + + openStreamAndRW := func(c smux.Conn) { + log("openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum) + + s, err := c.OpenStream() + if err != nil { + errs <- fmt.Errorf("Failed to create NewStream: %s", err) + return + } + + bufs := make(chan []byte, opt.msgNum) + go func() { + writeStream(s, bufs) + close(bufs) + }() + + readStream(s, bufs) + s.Close() + } + + openConnAndRW := func() { + log("openConnAndRW") + + l, err := net.Listen("tcp", "localhost:0") + checkErr(t, err) + done := GoServe(t, opt.tr, l) + defer done() + + nla := l.Addr() + nc, err := net.Dial(nla.Network(), nla.String()) + checkErr(t, err) + if err != nil { + t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)) + return + } + + c, err := opt.tr.NewConn(nc, false) + if err != nil { + t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)) + return + } + + // serve the outgoing conn, because some muxers assume + // that we _always_ call serve. (this is an error?) + go c.Serve(func(s smux.Stream) { + log("serving connection") + echoStream(s) + s.Close() + }) + + var wg sync.WaitGroup + for i := 0; i < opt.streamNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openStreamAndRW(c) + }) + } + wg.Wait() + c.Close() + } + + openConnsAndRW := func() { + log("openConnsAndRW, %d conns", opt.connNum) + + var wg sync.WaitGroup + for i := 0; i < opt.connNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openConnAndRW() + }) + } + wg.Wait() + } + + go func() { + openConnsAndRW() + close(errs) // done + }() + + for err := range errs { + t.Error(err) + } + +} + +func SubtestStress1Conn1Stream1Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 1, + msgNum: 1, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn1Stream100Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 1, + msgNum: 100, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn100Stream100Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 100, + msgNum: 100, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress50Conn10Stream50Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 50, + streamNum: 10, + msgNum: 50, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn1000Stream10Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 1000, + msgNum: 10, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 100, + msgNum: 100, + msgMax: 10000, + msgMin: 1000, + }) +} + +func SubtestAll(t *testing.T, tr smux.Transport) { + + tests := []TransportTest{ + SubtestSimpleWrite, + SubtestStress1Conn1Stream1Msg, + SubtestStress1Conn1Stream100Msg, + SubtestStress1Conn100Stream100Msg, + SubtestStress50Conn10Stream50Msg, + SubtestStress1Conn1000Stream10Msg, + SubtestStress1Conn100Stream100Msg10MB, + } + + for _, f := range tests { + if testing.Verbose() { + fmt.Fprintf(os.Stderr, "==== RUN %s\n", GetFunctionName(f)) + } + f(t, tr) + } +} + +type TransportTest func(t *testing.T, tr smux.Transport) + +func TestNoOp(t *testing.T) {} + +func GetFunctionName(i interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux.go similarity index 76% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux.go index 210cd521d..29884e2e2 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux.go @@ -1,4 +1,4 @@ -package peerstream_yamux +package sm_yamux import ( "io/ioutil" @@ -6,10 +6,10 @@ import ( "time" yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/yamux" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) -// stream implements pst.Stream using a ss.Stream +// stream implements smux.Stream using a ss.Stream type stream yamux.Stream func (s *stream) yamuxStream() *yamux.Stream { @@ -44,7 +44,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { s, err := c.yamuxSession().OpenStream() if err != nil { return nil, err @@ -53,15 +53,21 @@ func (c *conn) OpenStream() (pst.Stream, error) { return (*stream)(s), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + s, err := c.yamuxSession().AcceptStream() + return (*stream)(s), err +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { for { // accept loop - s, err := c.yamuxSession().AcceptStream() + s, err := c.AcceptStream() if err != nil { return // err always means closed. } - go handler((*stream)(s)) + go handler(s) } } @@ -78,7 +84,7 @@ var DefaultTransport = (*Transport)(&yamux.Config{ LogOutput: ioutil.Discard, }) -func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { var s *yamux.Session var err error if isServer { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux_test.go new file mode 100644 index 000000000..9e699bf4a --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux_test.go @@ -0,0 +1,11 @@ +package sm_yamux + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestYamuxTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gitignore b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gitignore new file mode 100644 index 000000000..1377554eb --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gitignore @@ -0,0 +1 @@ +*.swp diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gxlastpubver b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gxlastpubver new file mode 100644 index 000000000..b91cdc721 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gxlastpubver @@ -0,0 +1 @@ +QmYmW76Y7NxvWwW3GPUmpbHd4mm1iW14f5UimeJ9UoofEs \ No newline at end of file diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/README.md b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/README.md new file mode 100644 index 000000000..8b9cd9504 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/README.md @@ -0,0 +1,18 @@ +# go-multiplex + +A super simple stream muxing library compatible with [multiplex](http://github.com/maxogden/multiplex) + +## Usage + +```go +mplex := multiplex.NewMultiplex(mysocket) + +s := mplex.NewStream() +s.Write([]byte("Hello World!") +s.Close() + +mplex.Serve(func(s *multiplex.Stream) { + // echo back everything received + io.Copy(s, s) +}) +``` diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex.go b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex.go new file mode 100644 index 000000000..01b2dc1c5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex.go @@ -0,0 +1,393 @@ +package multiplex + +import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "io" + "io/ioutil" + "sync" +) + +const ( + NewStream = iota + Receiver + Initiator + Unknown + Close +) + +var _ = ioutil.ReadAll +var _ = bufio.NewReadWriter +var _ = binary.MaxVarintLen16 + +type msg struct { + header uint64 + data []byte + err chan<- error +} + +type Stream struct { + id uint64 + name string + header uint64 + closed chan struct{} + data_in chan []byte + data_out chan<- msg + extra []byte +} + +func newStream(id uint64, name string, initiator bool, send chan<- msg) *Stream { + var hfn uint64 + if initiator { + hfn = 2 + } else { + hfn = 1 + } + return &Stream{ + id: id, + name: name, + header: (id << 3) | hfn, + data_in: make(chan []byte, 8), + data_out: send, + closed: make(chan struct{}), + } +} + +func (s *Stream) Name() string { + return s.name +} + +func (s *Stream) receive(b []byte) { + select { + case s.data_in <- b: + case <-s.closed: + } +} + +func (m *Multiplex) Accept() (*Stream, error) { + select { + case s, ok := <-m.nstreams: + if !ok { + return nil, errors.New("multiplex closed") + } + return s, nil + case err := <-m.errs: + return nil, err + case <-m.closed: + return nil, errors.New("multiplex closed") + } +} + +func (s *Stream) Read(b []byte) (int, error) { + if s.extra == nil { + select { + case <-s.closed: + return 0, io.EOF + case read, ok := <-s.data_in: + if !ok { + return 0, io.EOF + } + s.extra = read + } + } + n := copy(b, s.extra) + if n < len(s.extra) { + s.extra = s.extra[n:] + } else { + s.extra = nil + } + return n, nil +} + +func (s *Stream) Write(b []byte) (int, error) { + errs := make(chan error, 1) + select { + case s.data_out <- msg{header: s.header, data: b, err: errs}: + select { + case err := <-errs: + return len(b), err + case <-s.closed: + return 0, errors.New("stream closed") + } + + case <-s.closed: + return 0, errors.New("stream closed") + } +} + +func (s *Stream) Close() error { + select { + case <-s.closed: + return nil + default: + close(s.closed) + select { + case s.data_out <- msg{ + header: (s.id << 3) | Close, + err: make(chan error, 1), //throw away error, whatever + }: + default: + } + close(s.data_in) + return nil + } +} + +type Multiplex struct { + con io.ReadWriteCloser + buf *bufio.Reader + nextID uint64 + outchan chan msg + closed chan struct{} + initiator bool + + nstreams chan *Stream + errs chan error + + channels map[uint64]*Stream + ch_lock sync.Mutex +} + +func NewMultiplex(con io.ReadWriteCloser, initiator bool) *Multiplex { + mp := &Multiplex{ + con: con, + initiator: initiator, + buf: bufio.NewReader(con), + channels: make(map[uint64]*Stream), + outchan: make(chan msg), + closed: make(chan struct{}), + nstreams: make(chan *Stream, 16), + errs: make(chan error), + } + + go mp.handleOutgoing() + go mp.handleIncoming() + + return mp +} + +func (mp *Multiplex) Close() error { + if mp.IsClosed() { + return nil + } + close(mp.closed) + mp.ch_lock.Lock() + defer mp.ch_lock.Unlock() + for _, s := range mp.channels { + err := s.Close() + if err != nil { + return err + } + } + return nil +} + +func (mp *Multiplex) IsClosed() bool { + select { + case <-mp.closed: + return true + default: + return false + } +} + +func (mp *Multiplex) handleOutgoing() { + for { + select { + case msg, ok := <-mp.outchan: + if !ok { + return + } + + buf := EncodeVarint(msg.header) + _, err := mp.con.Write(buf) + if err != nil { + msg.err <- err + continue + } + + buf = EncodeVarint(uint64(len(msg.data))) + _, err = mp.con.Write(buf) + if err != nil { + msg.err <- err + continue + } + + _, err = mp.con.Write(msg.data) + if err != nil { + msg.err <- err + continue + } + + msg.err <- nil + case <-mp.closed: + return + } + } +} + +func (mp *Multiplex) nextChanID() (out uint64) { + if mp.initiator { + out = mp.nextID + 1 + } else { + out = mp.nextID + } + mp.nextID += 2 + return +} + +func (mp *Multiplex) NewStream() *Stream { + return mp.NewNamedStream("") +} + +func (mp *Multiplex) NewNamedStream(name string) *Stream { + mp.ch_lock.Lock() + sid := mp.nextChanID() + header := (sid << 3) | NewStream + + if name == "" { + name = fmt.Sprint(sid) + } + s := newStream(sid, name, true, mp.outchan) + mp.channels[sid] = s + mp.ch_lock.Unlock() + + mp.outchan <- msg{ + header: header, + data: []byte(name), + err: make(chan error, 1), //throw away error + } + + return s +} + +func (mp *Multiplex) sendErr(err error) { + select { + case mp.errs <- err: + case <-mp.closed: + } +} + +func (mp *Multiplex) handleIncoming() { + defer mp.shutdown() + for { + ch, tag, err := mp.readNextHeader() + if err != nil { + mp.sendErr(err) + return + } + + b, err := mp.readNext() + if err != nil { + mp.sendErr(err) + return + } + + mp.ch_lock.Lock() + msch, ok := mp.channels[ch] + if !ok { + var name string + if tag == NewStream { + name = string(b) + } + msch = newStream(ch, name, false, mp.outchan) + mp.channels[ch] = msch + select { + case mp.nstreams <- msch: + case <-mp.closed: + return + } + if tag == NewStream { + mp.ch_lock.Unlock() + continue + } + } + mp.ch_lock.Unlock() + + if tag == Close { + msch.Close() + mp.ch_lock.Lock() + delete(mp.channels, ch) + mp.ch_lock.Unlock() + continue + } + + msch.receive(b) + } +} + +func (mp *Multiplex) shutdown() { + mp.ch_lock.Lock() + defer mp.ch_lock.Unlock() + for _, s := range mp.channels { + s.Close() + } +} + +func (mp *Multiplex) readNextHeader() (uint64, uint64, error) { + h, _, err := DecodeVarint(mp.buf) + if err != nil { + return 0, 0, err + } + + // get channel ID + ch := h >> 3 + + rem := h & 7 + + return ch, rem, nil +} + +func (mp *Multiplex) readNext() ([]byte, error) { + // get length + l, _, err := DecodeVarint(mp.buf) + if err != nil { + return nil, err + } + + buf := make([]byte, l) + n, err := io.ReadFull(mp.buf, buf) + if err != nil { + return nil, err + } + + if n != int(l) { + panic("NOT THE SAME") + } + + return buf, nil +} + +func EncodeVarint(x uint64) []byte { + var buf [10]byte + var n int + for n = 0; x > 127; n++ { + buf[n] = 0x80 | uint8(x&0x7F) + x >>= 7 + } + buf[n] = uint8(x) + n++ + return buf[0:n] +} + +func DecodeVarint(r *bufio.Reader) (x uint64, n int, err error) { + // x, n already 0 + for shift := uint(0); shift < 64; shift += 7 { + val, err := r.ReadByte() + if err != nil { + return 0, 0, err + } + + b := uint64(val) + n++ + x |= (b & 0x7F) << shift + if (b & 0x80) == 0 { + return x, n, nil + } + } + + // The number is too large to represent in a 64-bit value. + return 0, 0, errors.New("Too large of a number!") +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex_test.go b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex_test.go new file mode 100644 index 000000000..5d713effc --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex_test.go @@ -0,0 +1,118 @@ +package multiplex + +import ( + "fmt" + "io" + "net" + "testing" + + rand "github.com/dustin/randbo" +) + +func TestBasicStreams(t *testing.T) { + a, b := net.Pipe() + + mpa := NewMultiplex(a, false) + mpb := NewMultiplex(b, true) + + mes := []byte("Hello world") + go func() { + s, err := mpb.Accept() + if err != nil { + t.Fatal(err) + } + + _, err = s.Write(mes) + if err != nil { + t.Fatal(err) + } + + err = s.Close() + if err != nil { + t.Fatal(err) + } + }() + + s := mpa.NewStream() + + buf := make([]byte, len(mes)) + n, err := s.Read(buf) + if err != nil { + t.Fatal(err) + } + + if n != len(mes) { + t.Fatal("read wrong amount") + } + + if string(buf) != string(mes) { + t.Fatal("got bad data") + } + + s.Close() + + mpa.Close() + mpb.Close() +} + +func TestEcho(t *testing.T) { + a, b := net.Pipe() + + mpa := NewMultiplex(a, false) + mpb := NewMultiplex(b, true) + + mes := make([]byte, 40960) + rand.New().Read(mes) + go func() { + s, err := mpb.Accept() + if err != nil { + t.Fatal(err) + } + + defer s.Close() + io.Copy(s, s) + }() + + s := mpa.NewStream() + + _, err := s.Write(mes) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, len(mes)) + n, err := io.ReadFull(s, buf) + if err != nil { + t.Fatal(err) + } + + if n != len(mes) { + t.Fatal("read wrong amount") + } + + if err := arrComp(buf, mes); err != nil { + t.Fatal(err) + } + s.Close() + + mpa.Close() + mpb.Close() +} + +func arrComp(a, b []byte) error { + msg := "" + if len(a) != len(b) { + msg += fmt.Sprintf("arrays differ in length: %d %d\n", len(a), len(b)) + } + + for i := 0; i < len(a) && i < len(b); i++ { + if a[i] != b[i] { + msg += fmt.Sprintf("content differs at index %d [%d != %d]", i, a[i], b[i]) + return fmt.Errorf(msg) + } + } + if len(msg) > 0 { + return fmt.Errorf(msg) + } + return nil +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/package.json b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/package.json new file mode 100644 index 000000000..9b0e7000a --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/package.json @@ -0,0 +1,5 @@ +{ + "name": "go-multiplex", + "version": "1.0.0", + "language": "go" +} \ No newline at end of file diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index bf3d0395f..b994a8276 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -16,8 +16,8 @@ import ( ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" - psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux" + pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" + psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" prom "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"