diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a51a179f6..7880400d5 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -180,7 +180,7 @@ }, { "ImportPath": "github.com/jbenet/go-peerstream", - "Rev": "62fe5ede12f9d9cd9406750160122525b3d6b694" + "Rev": "214e6057e1c0742c0b5f9bb3060e88dea32a4380" }, { "ImportPath": "github.com/jbenet/go-random", @@ -194,6 +194,10 @@ "ImportPath": "github.com/jbenet/go-sockaddr/net", "Rev": "da304f94eea1af8ba8d1faf184623e1f9d9777dc" }, + { + "ImportPath": "github.com/jbenet/go-stream-muxer", + "Rev": "4a97500beeb081571128d41d539787e137f18404" + }, { "ImportPath": "github.com/jbenet/go-temp-err-catcher", "Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff" diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml index 415030aa9..5828fd674 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml @@ -7,5 +7,5 @@ go: - tip script: - - go test ./... + - go test -v ./... # - go test -race -cpu=5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json index 675d730e1..11479d2d9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json @@ -17,13 +17,17 @@ "ImportPath": "github.com/inconshreveable/muxado", "Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848" }, + { + "ImportPath": "github.com/jbenet/go-stream-muxer", + "Rev": "e2e261765847234749629e0190fef193a4548303" + }, { "ImportPath": "github.com/jbenet/go-temp-err-catcher", "Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff" }, { "ImportPath": "github.com/whyrusleeping/go-multiplex", - "Rev": "ce5baa716247510379cb7640a14da857afd3b622" + "Rev": "474b9aebeb391746f304ddf7c764a5da12319857" }, { "ImportPath": "github.com/whyrusleeping/go-multistream", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go index 998e849d4..181ae4687 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go @@ -6,7 +6,7 @@ import ( "net" "sync" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) // ConnHandler is a function which receives a Conn. It allows @@ -36,8 +36,8 @@ var ErrNoConnections = errors.New("no connections") // Conn is a Swarm-associated connection. type Conn struct { - pstConn pst.Conn - netConn net.Conn // underlying connection + smuxConn smux.Conn + netConn net.Conn // underlying connection swarm *Swarm groups groupSet @@ -46,13 +46,13 @@ type Conn struct { streamLock sync.RWMutex } -func newConn(nconn net.Conn, tconn pst.Conn, s *Swarm) *Conn { +func newConn(nconn net.Conn, tconn smux.Conn, s *Swarm) *Conn { return &Conn{ - netConn: nconn, - pstConn: tconn, - swarm: s, - groups: groupSet{m: make(map[Group]struct{})}, - streams: make(map[*Stream]struct{}), + netConn: nconn, + smuxConn: tconn, + swarm: s, + groups: groupSet{m: make(map[Group]struct{})}, + streams: make(map[*Stream]struct{}), } } @@ -77,8 +77,8 @@ func (c *Conn) NetConn() net.Conn { // Conn returns the underlying transport Connection we use // Warning: modifying this object is undefined. -func (c *Conn) Conn() pst.Conn { - return c.pstConn +func (c *Conn) Conn() smux.Conn { + return c.smuxConn } // Groups returns the Groups this Conn belongs to @@ -122,7 +122,7 @@ func (c *Conn) Close() error { // close underlying connection c.swarm.removeConn(c) - err := c.pstConn.Close() + err := c.smuxConn.Close() c.swarm.notifyAll(func(n Notifiee) { n.Disconnected(c) }) @@ -166,7 +166,7 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) { s.ConnHandler()(c) // go listen for incoming streams on this connection - go c.pstConn.Serve(func(ss pst.Stream) { + go c.smuxConn.Serve(func(ss smux.Stream) { // log.Printf("accepted stream %d from %s\n", ssS.Identifier(), netConn.RemoteAddr()) stream := s.setupStream(ss, c) s.StreamHandler()(stream) // call our handler @@ -225,21 +225,21 @@ func (s *Swarm) setupConn(netConn net.Conn, isServer bool) (*Conn, error) { // all validation has happened. func (s *Swarm) createStream(c *Conn) (*Stream, error) { - // Create a new pst.Stream - pstStream, err := c.pstConn.OpenStream() + // Create a new smux.Stream + smuxStream, err := c.smuxConn.OpenStream() if err != nil { return nil, err } - return s.setupStream(pstStream, c), nil + return s.setupStream(smuxStream, c), nil } // newStream is the internal function that creates a new stream. assumes // all validation has happened. -func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream { +func (s *Swarm) setupStream(smuxStream smux.Stream, c *Conn) *Stream { // create a new stream - stream := newStream(pstStream, c) + stream := newStream(smuxStream, c) // add it to our streams maps s.streamLock.Lock() @@ -265,7 +265,7 @@ func (s *Swarm) removeStream(stream *Stream) error { s.streamLock.Unlock() stream.conn.streamLock.Unlock() - err := stream.pstStream.Close() + err := stream.smuxStream.Close() s.notifyAll(func(n Notifiee) { n.ClosedStream(stream) }) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go index f383f8497..4a7036583 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/blockhandler/blockhandler.go @@ -8,7 +8,7 @@ import ( "time" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pstss "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" ) func die(err error) { @@ -18,7 +18,7 @@ func die(err error) { func main() { // create a new Swarm - swarm := ps.NewSwarm(pstss.Transport) + swarm := ps.NewSwarm(spdy.Transport) defer swarm.Close() // tell swarm what to do with a new incoming streams. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go index 0dc14e179..9149f5a9b 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/example/example.go @@ -7,13 +7,13 @@ import ( "os" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pstss "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" ) func main() { log("creating a new swarm with spdystream transport") // create a new Swarm - swarm := ps.NewSwarm(pstss.Transport) + swarm := ps.NewSwarm(spdy.Transport) defer swarm.Close() // tell swarm what to do with a new incoming streams. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/hack.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/hack.go new file mode 100644 index 000000000..c1af4e81b --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/hack.go @@ -0,0 +1,15 @@ +package muxtest + +import ( + multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex" + multistream "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream" + muxado "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" + yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" +) + +var _ = multiplex.DefaultTransport +var _ = multistream.NewTransport +var _ = muxado.Transport +var _ = spdy.Transport +var _ = yamux.DefaultTransport diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/mux_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/mux_test.go new file mode 100644 index 000000000..811c89d29 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/mux_test.go @@ -0,0 +1,31 @@ +package muxtest + +import ( + "testing" + + multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex" + multistream "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream" + muxado "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" + yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" +) + +func TestYamuxTransport(t *testing.T) { + SubtestAll(t, yamux.DefaultTransport) +} + +func TestSpdyStreamTransport(t *testing.T) { + SubtestAll(t, spdy.Transport) +} + +func TestMultiplexTransport(t *testing.T) { + SubtestAll(t, multiplex.DefaultTransport) +} + +func TestMuxadoTransport(t *testing.T) { + SubtestAll(t, muxado.Transport) +} + +func TestMultistreamTransport(t *testing.T) { + SubtestAll(t, multistream.NewTransport()) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/muxt.go similarity index 78% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go rename to Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/muxt.go index 4d86614fb..4bfef14e6 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/muxtest/muxt.go @@ -1,4 +1,4 @@ -package peerstream_transport_test +package muxtest import ( "bytes" @@ -14,11 +14,12 @@ import ( "testing" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) var randomness []byte var nextPort = 20000 +var verbose = false func init() { // read 1MB of randomness @@ -45,7 +46,7 @@ func checkErr(t *testing.T, err error) { } func log(s string, v ...interface{}) { - if testing.Verbose() { + if verbose { fmt.Fprintf(os.Stderr, "> "+s+"\n", v...) } } @@ -55,7 +56,7 @@ type echoSetup struct { conns []*ps.Conn } -func singleConn(t *testing.T, tr pst.Transport) echoSetup { +func singleConn(t *testing.T, tr smux.Transport) echoSetup { swarm := ps.NewSwarm(tr) swarm.SetStreamHandler(func(s *ps.Stream) { defer s.Close() @@ -84,7 +85,7 @@ func singleConn(t *testing.T, tr pst.Transport) echoSetup { } } -func makeSwarm(t *testing.T, tr pst.Transport, nListeners int) *ps.Swarm { +func makeSwarm(t *testing.T, tr smux.Transport, nListeners int) *ps.Swarm { swarm := ps.NewSwarm(tr) swarm.SetStreamHandler(func(s *ps.Stream) { defer s.Close() @@ -104,7 +105,7 @@ func makeSwarm(t *testing.T, tr pst.Transport, nListeners int) *ps.Swarm { return swarm } -func makeSwarms(t *testing.T, tr pst.Transport, nSwarms, nListeners int) []*ps.Swarm { +func makeSwarms(t *testing.T, tr smux.Transport, nSwarms, nListeners int) []*ps.Swarm { swarms := make([]*ps.Swarm, nSwarms) for i := 0; i < nSwarms; i++ { swarms[i] = makeSwarm(t, tr, nListeners) @@ -112,11 +113,11 @@ func makeSwarms(t *testing.T, tr pst.Transport, nSwarms, nListeners int) []*ps.S return swarms } -func SubtestConstructSwarm(t *testing.T, tr pst.Transport) { +func SubtestConstructSwarm(t *testing.T, tr smux.Transport) { ps.NewSwarm(tr) } -func SubtestSimpleWrite(t *testing.T, tr pst.Transport) { +func SubtestSimpleWrite(t *testing.T, tr smux.Transport) { swarm := ps.NewSwarm(tr) defer swarm.Close() @@ -171,18 +172,18 @@ func SubtestSimpleWrite(t *testing.T, tr pst.Transport) { } } -func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { +func SubtestSimpleWrite100msgs(t *testing.T, tr smux.Transport) { msgs := 100 msgsize := 1 << 19 es := singleConn(t, tr) + defer es.swarm.Close() log("creating stream") stream, err := es.conns[0].NewStream() checkErr(t, err) bufs := make(chan []byte, msgs) - errs := make(chan error, msgs*100) var wg sync.WaitGroup wg.Add(1) @@ -194,7 +195,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { bufs <- buf log("writing %d bytes (message %d/%d #%x)", len(buf), i, msgs, buf[:3]) if _, err := stream.Write(buf); err != nil { - errs <- fmt.Errorf("stream.Write(buf): %s", err) + t.Error(fmt.Errorf("stream.Write(buf): %s", err)) continue } } @@ -212,26 +213,21 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { i++ if _, err := io.ReadFull(stream, buf2); err != nil { - errs <- fmt.Errorf("readFull(stream, buf2): %s", err) + t.Error(fmt.Errorf("readFull(stream, buf2): %s", err)) continue } if !bytes.Equal(buf1, buf2) { - errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + t.Error(fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])) } } }() wg.Wait() - close(errs) - for err := range errs { - t.Error(err) - } } -func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, nConn, nStream, nMsg int) { +func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm, nConn, nStream, nMsg int) { msgsize := 1 << 11 - errs := make(chan error, nSwarm*nConn*nStream*nMsg*100) // dont block anything. rateLimitN := 5000 rateLimitChan := make(chan struct{}, rateLimitN) // max of 5k funcs. @@ -253,7 +249,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, bufs <- buf log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3]) if _, err := s.Write(buf); err != nil { - errs <- fmt.Errorf("s.Write(buf): %s", err) + t.Error(fmt.Errorf("s.Write(buf): %s", err)) continue } } @@ -269,12 +265,12 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { - errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) + t.Error(fmt.Errorf("io.ReadFull(s, buf2): %s", err)) continue } if !bytes.Equal(buf1, buf2) { - errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + t.Error(fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])) } } } @@ -284,7 +280,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, s, err := c.NewStream() if err != nil { - errs <- fmt.Errorf("Failed to create NewStream: %s", err) + t.Error(fmt.Errorf("Failed to create NewStream: %s", err)) return } @@ -308,13 +304,13 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, nc, err := net.Dial(nla.Network(), nla.String()) if err != nil { - errs <- fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err) + t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)) return } c, err := a.AddConn(nc) if err != nil { - errs <- fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err) + t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)) return } @@ -363,47 +359,38 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, } swarms := makeSwarms(t, tr, nSwarm, 3) // 3 listeners per swarm. - - go func() { - connectSwarmsAndRW(swarms) - close(errs) // done - }() - - for err := range errs { - t.Error(err) - } - + connectSwarmsAndRW(swarms) for _, s := range swarms { s.Close() } } -func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 1) } -func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 100) } -func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 100, 100) } -func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr pst.Transport) { +func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 10, 50, 50) } -func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr pst.Transport) { +func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 5, 2, 20, 20) } -func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr pst.Transport) { +func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr smux.Transport) { SubtestStressNSwarmNConnNStreamNMsg(t, tr, 10, 2, 100, 100) } -func SubtestAll(t *testing.T, tr pst.Transport) { +func SubtestAll(t *testing.T, tr smux.Transport) { tests := []TransportTest{ SubtestConstructSwarm, @@ -425,7 +412,7 @@ func SubtestAll(t *testing.T, tr pst.Transport) { } } -type TransportTest func(t *testing.T, tr pst.Transport) +type TransportTest func(t *testing.T, tr smux.Transport) func TestNoOp(t *testing.T) {} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go index f1eb175d8..c9033fce3 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/stream.go @@ -3,7 +3,7 @@ package peerstream import ( "fmt" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) // StreamHandler is a function which receives a Stream. It @@ -17,17 +17,17 @@ type StreamHandler func(s *Stream) // Stream is an io.{Read,Write,Close}r to a remote counterpart. // It wraps a spdystream.Stream, and links it to a Conn and groups type Stream struct { - pstStream pst.Stream + smuxStream smux.Stream conn *Conn groups groupSet } -func newStream(ss pst.Stream, c *Conn) *Stream { +func newStream(ss smux.Stream, c *Conn) *Stream { s := &Stream{ - conn: c, - pstStream: ss, - groups: groupSet{m: make(map[Group]struct{})}, + conn: c, + smuxStream: ss, + groups: groupSet{m: make(map[Group]struct{})}, } s.groups.AddSet(&c.groups) // inherit groups return s @@ -40,8 +40,8 @@ func (s *Stream) String() string { } // SPDYStream returns the underlying *spdystream.Stream -func (s *Stream) Stream() pst.Stream { - return s.pstStream +func (s *Stream) Stream() smux.Stream { + return s.smuxStream } // Conn returns the Conn associated with this Stream @@ -70,11 +70,11 @@ func (s *Stream) AddGroup(g Group) { } func (s *Stream) Read(p []byte) (n int, err error) { - return s.pstStream.Read(p) + return s.smuxStream.Read(p) } func (s *Stream) Write(p []byte) (n int, err error) { - return s.pstStream.Write(p) + return s.smuxStream.Write(p) } func (s *Stream) Close() error { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go index 82eb28487..92d4f65d7 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/swarm.go @@ -7,7 +7,7 @@ import ( "sync" "time" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) // fd is a (file) descriptor, unix style @@ -18,7 +18,7 @@ var GarbageCollectTimeout = 5 * time.Second type Swarm struct { // the transport we'll use. - transport pst.Transport + transport smux.Transport // active streams. streams map[*Stream]struct{} @@ -46,7 +46,7 @@ type Swarm struct { closed chan struct{} } -func NewSwarm(t pst.Transport) *Swarm { +func NewSwarm(t smux.Transport) *Swarm { s := &Swarm{ transport: t, streams: make(map[*Stream]struct{}), @@ -183,7 +183,7 @@ func (s *Swarm) Conns() []*Conn { open := make([]*Conn, 0, len(conns)) for _, c := range conns { - if c.pstConn.IsClosed() { + if c.smuxConn.IsClosed() { c.Close() } else { open = append(open, c) @@ -292,7 +292,7 @@ func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error) { return nil, errors.New("connection not associated with swarm") } - if conn.pstConn.IsClosed() { + if conn.smuxConn.IsClosed() { go conn.Close() return nil, errors.New("conn is closed") } @@ -360,7 +360,7 @@ func (s *Swarm) connGarbageCollect() { } for _, c := range s.Conns() { - if c.pstConn.IsClosed() { + if c.smuxConn.IsClosed() { go c.Close() } } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go deleted file mode 100644 index 43a87af34..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_multiplex - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestMultiplexTransport(t *testing.T) { - psttest.SubtestAll(t, DefaultTransport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go deleted file mode 100644 index ebe46b814..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package multistream - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestMultiStreamTransport(t *testing.T) { - psttest.SubtestAll(t, NewTransport()) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado_test.go deleted file mode 100644 index 7f4ff7591..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_muxado - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestMuxadoTransport(t *testing.T) { - psttest.SubtestAll(t, Transport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go deleted file mode 100644 index 16e754b77..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_spdystream - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestSpdyStreamTransport(t *testing.T) { - psttest.SubtestAll(t, Transport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux_test.go deleted file mode 100644 index 84e3b8656..000000000 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package peerstream_yamux - -import ( - "testing" - - psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" -) - -func TestYamuxTransport(t *testing.T) { - psttest.SubtestAll(t, DefaultTransport) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/.travis.yml new file mode 100644 index 000000000..415030aa9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/.travis.yml @@ -0,0 +1,11 @@ +language: go + +go: + - 1.3 + - 1.4 + - release + - tip + +script: + - go test ./... + # - go test -race -cpu=5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Godeps.json new file mode 100644 index 000000000..47481401c --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Godeps.json @@ -0,0 +1,29 @@ +{ + "ImportPath": "github.com/jbenet/go-stream-muxer", + "GoVersion": "go1.4.2", + "Packages": [ + "./..." + ], + "Deps": [ + { + "ImportPath": "github.com/docker/spdystream", + "Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207" + }, + { + "ImportPath": "github.com/hashicorp/yamux", + "Rev": "b2e55852ddaf823a85c67f798080eb7d08acd71d" + }, + { + "ImportPath": "github.com/inconshreveable/muxado", + "Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848" + }, + { + "ImportPath": "github.com/whyrusleeping/go-multiplex", + "Rev": "ce5baa716247510379cb7640a14da857afd3b622" + }, + { + "ImportPath": "github.com/whyrusleeping/go-multistream", + "Rev": "08e8f9c9f5665ed0c63ffde4fa5ef1d5fb3d516d" + } + ] +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Readme b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Readme new file mode 100644 index 000000000..4cdaa53d5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/LICENSE b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/LICENSE new file mode 100644 index 000000000..c7386b3c9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Juan Batiz-Benet + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Makefile b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Makefile new file mode 100644 index 000000000..f15fcaa68 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/Makefile @@ -0,0 +1,15 @@ + +godep: + go get github.com/tools/godep + +vendor: godep + godep save -r ./... + +build: + go build ./... + +test: + go test ./... + +test_race: + go test -race -cpu 5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/README.md b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/README.md new file mode 100644 index 000000000..a0a08869a --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/README.md @@ -0,0 +1,98 @@ +# go-stream-muxer - generalized stream multiplexing + + +go-stream-muxer is a common interface for stream muxers, with common tests. It wraps other stream muxers (like [muxado](https://github.com/inconshreveable/muxado), [spdystream](https://github.com/docker/spdystream) and [yamux](https://github.com/hashicorp/yamux)). + +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) + +> A test suite and interface you can use to implement a stream muxer. + +### Godoc: https://godoc.org/github.com/jbenet/go-stream-muxer + +## Implementations + +* [yamux](yamux) +* [muxado](muxado) +* [multiplex](multiplex) +* [spdystream](spdystream) + +## Badge + +Include this badge in your readme if you make a new module that uses abstract-stream-muxer API. + +![](img/badge.png) + +## Client example + +```go +import ( + "net" + "fmt" + "io" + ymux "github.com/jbenet/go-stream-muxer/yamux" + smux "github.com/jbenet/go-stream-muxer" +) + +func dial() { + nconn, _ := net.Dial("tcp", "localhost:1234") + sconn, _ := ymux.DefaultTransport.NewConn(nconn, false) // false == client + + go sconn.Serve(func(smux.Stream) {}) // no-op + + s1, _ := sconn.OpenStream() + s1.Write([]byte("hello")) + + s2, _ := sconn.OpenStream() + s2.Write([]byte("world")) + + length := 20 + buf2 := make([]byte, length) + fmt.Printf("reading %d bytes from stream (echoed)\n", length) + + s1.Read(buf2) + + fmt.Printf("received %s as a response\n", string(buf2)) + + s3, _ := sconn.OpenStream() + io.Copy(s3, os.Stdin) +} +``` + +## Server example + +```go +import ( + "net" + "fmt" + "io" + ymux "github.com/jbenet/go-stream-muxer/yamux" + smux "github.com/jbenet/go-stream-muxer" +) + +func listen() { + tr := ymux.DefaultTransport + l, _ := net.Listen("tcp", "localhost:1234") + + go func() { + for { + c, _ := l.Accept() + + fmt.Println("accepted connection") + sc, _ := tr.NewConn(c, true) + + go sc.Serve(func(s smux.Stream) { + fmt.Println("serving connection") + echoStream(s) + }) + } + }() +} + +func echoStream(s smux.Stream) { + defer s.Close() + + fmt.Println("accepted stream") + io.Copy(s, s) // echo everything + fmt.Println("closing stream") +} +``` diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/img/badge.png b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/img/badge.png new file mode 100644 index 000000000..738bcf4a2 Binary files /dev/null and b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/img/badge.png differ diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex.go similarity index 53% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex.go index 79721acd5..e3257d7af 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex.go @@ -1,18 +1,20 @@ package peerstream_multiplex import ( + "errors" "net" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" - mp "github.com/whyrusleeping/go-multiplex" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" + mp "github.com/whyrusleeping/go-multiplex" // Conn is a connection to a remote peer. ) +var ErrUseServe = errors.New("not implemented, use Serve") + type conn struct { *mp.Multiplex } -func ( // Conn is a connection to a remote peer. -c *conn) Close() error { +func (c *conn) Close() error { return c.Multiplex.Close() } @@ -21,13 +23,18 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { return c.Multiplex.NewStream(), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + return nil, ErrUseServe +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { c.Multiplex.Serve(func(s *mp.Stream) { handler(s) }) @@ -40,6 +47,6 @@ type Transport struct{} // DefaultTransport has default settings for multiplex var DefaultTransport = &Transport{} -func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { return &conn{mp.NewMultiplex(nc, isServer)}, nil } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex_test.go new file mode 100644 index 000000000..de4b691d7 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex/multiplex_test.go @@ -0,0 +1,11 @@ +package peerstream_multiplex + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestMultiplexTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream.go similarity index 73% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream.go index 175f75c52..fe04c4d19 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream.go @@ -5,26 +5,27 @@ package multistream import ( "net" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" - multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex" - spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream" - yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux" mss "github.com/whyrusleeping/go-multistream" + + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" + multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream" + yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" ) type transport struct { mux *mss.MultistreamMuxer - tpts map[string]pst.Transport + tpts map[string]smux.Transport } -func NewTransport() pst.Transport { +func NewTransport() smux.Transport { mux := mss.NewMultistreamMuxer() mux.AddHandler("/multiplex", nil) mux.AddHandler("/spdystream", nil) mux.AddHandler("/yamux", nil) - tpts := map[string]pst.Transport{ + tpts := map[string]smux.Transport{ "/multiplex": multiplex.DefaultTransport, "/spdystream": spdy.Transport, "/yamux": yamux.DefaultTransport, @@ -36,7 +37,7 @@ func NewTransport() pst.Transport { } } -func (t *transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t *transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { var proto string if isServer { selected, _, err := t.mux.Negotiate(nc) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream_test.go new file mode 100644 index 000000000..bfacf2ed4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream/multistream_test.go @@ -0,0 +1,11 @@ +package multistream + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestMultiStreamTransport(t *testing.T) { + test.SubtestAll(t, NewTransport()) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado.go similarity index 71% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado.go index 9c103ff69..74edbf48c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/muxado/muxado.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado.go @@ -4,10 +4,10 @@ import ( "net" muxado "github.com/inconshreveable/muxado" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) -// stream implements pst.Stream using a ss.Stream +// stream implements smux.Stream using a ss.Stream type stream struct { ms muxado.Stream } @@ -53,7 +53,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { s, err := c.ms.Open() if err != nil { return nil, err @@ -62,15 +62,24 @@ func (c *conn) OpenStream() (pst.Stream, error) { return &stream{ms: s}, nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + s, err := c.ms.Accept() + if err != nil { + return nil, err + } + return &stream{ms: s}, nil +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { for { // accept loop - s, err := c.ms.Accept() + s, err := c.AcceptStream() if err != nil { return // err always means closed. } - go handler(&stream{ms: s}) + go handler(s) } } @@ -80,7 +89,7 @@ type transport struct{} // spdystream-backed connections. var Transport = transport{} -func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { var s muxado.Session if isServer { s = muxado.Server(nc) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado_test.go new file mode 100644 index 000000000..91726a9ed --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado/muxado_test.go @@ -0,0 +1,11 @@ +package peerstream_muxado + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestMuxadoTransport(t *testing.T) { + test.SubtestAll(t, Transport) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxer.go similarity index 62% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxer.go index 6bace1855..e9531750a 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/transport.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxer.go @@ -1,4 +1,4 @@ -package peerstream_transport +package streammux import ( "io" @@ -16,6 +16,9 @@ type Stream interface { // (usually those opened by the remote side) type StreamHandler func(Stream) +// NoOpHandler do nothing. close streams as soon as they are opened. +var NoOpHandler = func(s Stream) { s.Close() } + // Conn is a stream-multiplexing connection to a remote peer. type Conn interface { io.Closer @@ -27,12 +30,15 @@ type Conn interface { // OpenStream creates a new stream. OpenStream() (Stream, error) - // Serve starts listening for incoming requests and handles them - // using given StreamHandler + // AcceptStream accepts a stream opened by the other side. + AcceptStream() (Stream, error) + + // Serve starts a loop, accepting incoming requests and calling + // `StreamHandler with them. (Use _instead of_ accept. not both.) Serve(StreamHandler) } -// Transport constructs go-peerstream compatible connections. +// Transport constructs go-stream-muxer compatible connections. type Transport interface { // NewConn constructs a new connection diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream.go similarity index 84% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream.go index 90d0ca6b6..17baf08fa 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream.go @@ -1,14 +1,17 @@ package peerstream_spdystream import ( + "errors" "net" "net/http" ss "github.com/docker/spdystream" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) -// stream implements pst.Stream using a ss.Stream +var ErrUseServe = errors.New("not implemented, use Serve") + +// stream implements smux.Stream using a ss.Stream type stream ss.Stream func (s *stream) spdyStream() *ss.Stream { @@ -63,7 +66,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { s, err := c.spdyConn().CreateStream(http.Header{ ":method": []string{"GET"}, // this is here for HTTP/SPDY interop ":path": []string{"/"}, // this is here for HTTP/SPDY interop @@ -78,9 +81,14 @@ func (c *conn) OpenStream() (pst.Stream, error) { return (*stream)(s), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + return nil, ErrUseServe +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { c.spdyConn().Serve(func(s *ss.Stream) { // Flow control and backpressure of Opening streams is broken. @@ -109,7 +117,7 @@ type transport struct{} // spdystream-backed connections. var Transport = transport{} -func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { sc, err := ss.NewConnection(nc, isServer) return &conn{sc: sc, closed: make(chan struct{})}, err } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream_test.go new file mode 100644 index 000000000..97f3d17e4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream/spdystream_test.go @@ -0,0 +1,11 @@ +package peerstream_spdystream + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestSpdyStreamTransport(t *testing.T) { + test.SubtestAll(t, Transport) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test/ttest.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test/ttest.go new file mode 100644 index 000000000..371f6de00 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test/ttest.go @@ -0,0 +1,378 @@ +package sm_test + +import ( + "bytes" + crand "crypto/rand" + "fmt" + "io" + mrand "math/rand" + "net" + "os" + "reflect" + "runtime" + "runtime/debug" + "sync" + "testing" + + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" +) + +var randomness []byte + +func init() { + // read 1MB of randomness + randomness = make([]byte, 1<<20) + if _, err := crand.Read(randomness); err != nil { + panic(err) + } +} + +type Options struct { + tr smux.Transport + connNum int + streamNum int + msgNum int + msgMin int + msgMax int +} + +func randBuf(size int) []byte { + n := len(randomness) - size + if size < 1 { + panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness))) + } + + start := mrand.Intn(n) + return randomness[start : start+size] +} + +func checkErr(t *testing.T, err error) { + if err != nil { + debug.PrintStack() + t.Fatal(err) + } +} + +func log(s string, v ...interface{}) { + if testing.Verbose() { + fmt.Fprintf(os.Stderr, "> "+s+"\n", v...) + } +} + +func echoStream(s smux.Stream) { + defer s.Close() + log("accepted stream") + io.Copy(&LogWriter{s}, s) // echo everything + log("closing stream") +} + +type LogWriter struct { + W io.Writer +} + +func (lw *LogWriter) Write(buf []byte) (int, error) { + if testing.Verbose() { + log("logwriter: writing %d bytes", len(buf)) + } + return lw.W.Write(buf) +} + +func GoServe(t *testing.T, tr smux.Transport, l net.Listener) (done func()) { + closed := make(chan struct{}, 1) + + go func() { + for { + c1, err := l.Accept() + if err != nil { + select { + case <-closed: + return // closed naturally. + default: + checkErr(t, err) + } + } + + log("accepted connection") + sc1, err := tr.NewConn(c1, true) + checkErr(t, err) + go sc1.Serve(echoStream) + } + }() + + return func() { + closed <- struct{}{} + } +} + +func SubtestSimpleWrite(t *testing.T, tr smux.Transport) { + l, err := net.Listen("tcp", "localhost:0") + checkErr(t, err) + log("listening at %s", l.Addr().String()) + done := GoServe(t, tr, l) + defer done() + + log("dialing to %s", l.Addr().String()) + nc1, err := net.Dial("tcp", l.Addr().String()) + checkErr(t, err) + defer nc1.Close() + + log("wrapping conn") + c1, err := tr.NewConn(nc1, false) + checkErr(t, err) + defer c1.Close() + + // serve the outgoing conn, because some muxers assume + // that we _always_ call serve. (this is an error?) + go c1.Serve(smux.NoOpHandler) + + log("creating stream") + s1, err := c1.OpenStream() + checkErr(t, err) + defer s1.Close() + + buf1 := randBuf(4096) + log("writing %d bytes to stream", len(buf1)) + _, err = s1.Write(buf1) + checkErr(t, err) + + buf2 := make([]byte, len(buf1)) + log("reading %d bytes from stream (echoed)", len(buf2)) + _, err = s1.Read(buf2) + checkErr(t, err) + + if string(buf2) != string(buf1) { + t.Error("buf1 and buf2 not equal: %s != %s", string(buf1), string(buf2)) + } + log("done") +} + +func SubtestStress(t *testing.T, opt Options) { + + msgsize := 1 << 11 + errs := make(chan error, 0) // dont block anything. + + rateLimitN := 5000 // max of 5k funcs, because -race has 8k max. + rateLimitChan := make(chan struct{}, rateLimitN) + for i := 0; i < rateLimitN; i++ { + rateLimitChan <- struct{}{} + } + + rateLimit := func(f func()) { + <-rateLimitChan + f() + rateLimitChan <- struct{}{} + } + + writeStream := func(s smux.Stream, bufs chan<- []byte) { + log("writeStream %p, %d msgNum", s, opt.msgNum) + + for i := 0; i < opt.msgNum; i++ { + buf := randBuf(msgsize) + bufs <- buf + log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3]) + if _, err := s.Write(buf); err != nil { + errs <- fmt.Errorf("s.Write(buf): %s", err) + continue + } + } + } + + readStream := func(s smux.Stream, bufs <-chan []byte) { + log("readStream %p, %d msgNum", s, opt.msgNum) + + buf2 := make([]byte, msgsize) + i := 0 + for buf1 := range bufs { + i++ + log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + + if _, err := io.ReadFull(s, buf2); err != nil { + errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) + log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3]) + continue + } + if !bytes.Equal(buf1, buf2) { + errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]) + } + } + } + + openStreamAndRW := func(c smux.Conn) { + log("openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum) + + s, err := c.OpenStream() + if err != nil { + errs <- fmt.Errorf("Failed to create NewStream: %s", err) + return + } + + bufs := make(chan []byte, opt.msgNum) + go func() { + writeStream(s, bufs) + close(bufs) + }() + + readStream(s, bufs) + s.Close() + } + + openConnAndRW := func() { + log("openConnAndRW") + + l, err := net.Listen("tcp", "localhost:0") + checkErr(t, err) + done := GoServe(t, opt.tr, l) + defer done() + + nla := l.Addr() + nc, err := net.Dial(nla.Network(), nla.String()) + checkErr(t, err) + if err != nil { + t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)) + return + } + + c, err := opt.tr.NewConn(nc, false) + if err != nil { + t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)) + return + } + + // serve the outgoing conn, because some muxers assume + // that we _always_ call serve. (this is an error?) + go c.Serve(func(s smux.Stream) { + log("serving connection") + echoStream(s) + s.Close() + }) + + var wg sync.WaitGroup + for i := 0; i < opt.streamNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openStreamAndRW(c) + }) + } + wg.Wait() + c.Close() + } + + openConnsAndRW := func() { + log("openConnsAndRW, %d conns", opt.connNum) + + var wg sync.WaitGroup + for i := 0; i < opt.connNum; i++ { + wg.Add(1) + go rateLimit(func() { + defer wg.Done() + openConnAndRW() + }) + } + wg.Wait() + } + + go func() { + openConnsAndRW() + close(errs) // done + }() + + for err := range errs { + t.Error(err) + } + +} + +func SubtestStress1Conn1Stream1Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 1, + msgNum: 1, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn1Stream100Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 1, + msgNum: 100, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn100Stream100Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 100, + msgNum: 100, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress50Conn10Stream50Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 50, + streamNum: 10, + msgNum: 50, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn1000Stream10Msg(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 1000, + msgNum: 10, + msgMax: 100, + msgMin: 100, + }) +} + +func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, tr smux.Transport) { + SubtestStress(t, Options{ + tr: tr, + connNum: 1, + streamNum: 100, + msgNum: 100, + msgMax: 10000, + msgMin: 1000, + }) +} + +func SubtestAll(t *testing.T, tr smux.Transport) { + + tests := []TransportTest{ + SubtestSimpleWrite, + SubtestStress1Conn1Stream1Msg, + SubtestStress1Conn1Stream100Msg, + SubtestStress1Conn100Stream100Msg, + SubtestStress50Conn10Stream50Msg, + SubtestStress1Conn1000Stream10Msg, + SubtestStress1Conn100Stream100Msg10MB, + } + + for _, f := range tests { + if testing.Verbose() { + fmt.Fprintf(os.Stderr, "==== RUN %s\n", GetFunctionName(f)) + } + f(t, tr) + } +} + +type TransportTest func(t *testing.T, tr smux.Transport) + +func TestNoOp(t *testing.T) {} + +func GetFunctionName(i interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux.go similarity index 76% rename from Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go rename to Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux.go index 210cd521d..29884e2e2 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux/yamux.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux.go @@ -1,4 +1,4 @@ -package peerstream_yamux +package sm_yamux import ( "io/ioutil" @@ -6,10 +6,10 @@ import ( "time" yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/yamux" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" ) -// stream implements pst.Stream using a ss.Stream +// stream implements smux.Stream using a ss.Stream type stream yamux.Stream func (s *stream) yamuxStream() *yamux.Stream { @@ -44,7 +44,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (pst.Stream, error) { +func (c *conn) OpenStream() (smux.Stream, error) { s, err := c.yamuxSession().OpenStream() if err != nil { return nil, err @@ -53,15 +53,21 @@ func (c *conn) OpenStream() (pst.Stream, error) { return (*stream)(s), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + s, err := c.yamuxSession().AcceptStream() + return (*stream)(s), err +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler -func (c *conn) Serve(handler pst.StreamHandler) { +func (c *conn) Serve(handler smux.StreamHandler) { for { // accept loop - s, err := c.yamuxSession().AcceptStream() + s, err := c.AcceptStream() if err != nil { return // err always means closed. } - go handler((*stream)(s)) + go handler(s) } } @@ -78,7 +84,7 @@ var DefaultTransport = (*Transport)(&yamux.Config{ LogOutput: ioutil.Discard, }) -func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { var s *yamux.Session var err error if isServer { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux_test.go b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux_test.go new file mode 100644 index 000000000..9e699bf4a --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux/yamux_test.go @@ -0,0 +1,11 @@ +package sm_yamux + +import ( + "testing" + + test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test" +) + +func TestYamuxTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gitignore b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gitignore new file mode 100644 index 000000000..1377554eb --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gitignore @@ -0,0 +1 @@ +*.swp diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gxlastpubver b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gxlastpubver new file mode 100644 index 000000000..b91cdc721 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/.gxlastpubver @@ -0,0 +1 @@ +QmYmW76Y7NxvWwW3GPUmpbHd4mm1iW14f5UimeJ9UoofEs \ No newline at end of file diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/README.md b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/README.md new file mode 100644 index 000000000..8b9cd9504 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/README.md @@ -0,0 +1,18 @@ +# go-multiplex + +A super simple stream muxing library compatible with [multiplex](http://github.com/maxogden/multiplex) + +## Usage + +```go +mplex := multiplex.NewMultiplex(mysocket) + +s := mplex.NewStream() +s.Write([]byte("Hello World!") +s.Close() + +mplex.Serve(func(s *multiplex.Stream) { + // echo back everything received + io.Copy(s, s) +}) +``` diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex.go b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex.go new file mode 100644 index 000000000..01b2dc1c5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex.go @@ -0,0 +1,393 @@ +package multiplex + +import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "io" + "io/ioutil" + "sync" +) + +const ( + NewStream = iota + Receiver + Initiator + Unknown + Close +) + +var _ = ioutil.ReadAll +var _ = bufio.NewReadWriter +var _ = binary.MaxVarintLen16 + +type msg struct { + header uint64 + data []byte + err chan<- error +} + +type Stream struct { + id uint64 + name string + header uint64 + closed chan struct{} + data_in chan []byte + data_out chan<- msg + extra []byte +} + +func newStream(id uint64, name string, initiator bool, send chan<- msg) *Stream { + var hfn uint64 + if initiator { + hfn = 2 + } else { + hfn = 1 + } + return &Stream{ + id: id, + name: name, + header: (id << 3) | hfn, + data_in: make(chan []byte, 8), + data_out: send, + closed: make(chan struct{}), + } +} + +func (s *Stream) Name() string { + return s.name +} + +func (s *Stream) receive(b []byte) { + select { + case s.data_in <- b: + case <-s.closed: + } +} + +func (m *Multiplex) Accept() (*Stream, error) { + select { + case s, ok := <-m.nstreams: + if !ok { + return nil, errors.New("multiplex closed") + } + return s, nil + case err := <-m.errs: + return nil, err + case <-m.closed: + return nil, errors.New("multiplex closed") + } +} + +func (s *Stream) Read(b []byte) (int, error) { + if s.extra == nil { + select { + case <-s.closed: + return 0, io.EOF + case read, ok := <-s.data_in: + if !ok { + return 0, io.EOF + } + s.extra = read + } + } + n := copy(b, s.extra) + if n < len(s.extra) { + s.extra = s.extra[n:] + } else { + s.extra = nil + } + return n, nil +} + +func (s *Stream) Write(b []byte) (int, error) { + errs := make(chan error, 1) + select { + case s.data_out <- msg{header: s.header, data: b, err: errs}: + select { + case err := <-errs: + return len(b), err + case <-s.closed: + return 0, errors.New("stream closed") + } + + case <-s.closed: + return 0, errors.New("stream closed") + } +} + +func (s *Stream) Close() error { + select { + case <-s.closed: + return nil + default: + close(s.closed) + select { + case s.data_out <- msg{ + header: (s.id << 3) | Close, + err: make(chan error, 1), //throw away error, whatever + }: + default: + } + close(s.data_in) + return nil + } +} + +type Multiplex struct { + con io.ReadWriteCloser + buf *bufio.Reader + nextID uint64 + outchan chan msg + closed chan struct{} + initiator bool + + nstreams chan *Stream + errs chan error + + channels map[uint64]*Stream + ch_lock sync.Mutex +} + +func NewMultiplex(con io.ReadWriteCloser, initiator bool) *Multiplex { + mp := &Multiplex{ + con: con, + initiator: initiator, + buf: bufio.NewReader(con), + channels: make(map[uint64]*Stream), + outchan: make(chan msg), + closed: make(chan struct{}), + nstreams: make(chan *Stream, 16), + errs: make(chan error), + } + + go mp.handleOutgoing() + go mp.handleIncoming() + + return mp +} + +func (mp *Multiplex) Close() error { + if mp.IsClosed() { + return nil + } + close(mp.closed) + mp.ch_lock.Lock() + defer mp.ch_lock.Unlock() + for _, s := range mp.channels { + err := s.Close() + if err != nil { + return err + } + } + return nil +} + +func (mp *Multiplex) IsClosed() bool { + select { + case <-mp.closed: + return true + default: + return false + } +} + +func (mp *Multiplex) handleOutgoing() { + for { + select { + case msg, ok := <-mp.outchan: + if !ok { + return + } + + buf := EncodeVarint(msg.header) + _, err := mp.con.Write(buf) + if err != nil { + msg.err <- err + continue + } + + buf = EncodeVarint(uint64(len(msg.data))) + _, err = mp.con.Write(buf) + if err != nil { + msg.err <- err + continue + } + + _, err = mp.con.Write(msg.data) + if err != nil { + msg.err <- err + continue + } + + msg.err <- nil + case <-mp.closed: + return + } + } +} + +func (mp *Multiplex) nextChanID() (out uint64) { + if mp.initiator { + out = mp.nextID + 1 + } else { + out = mp.nextID + } + mp.nextID += 2 + return +} + +func (mp *Multiplex) NewStream() *Stream { + return mp.NewNamedStream("") +} + +func (mp *Multiplex) NewNamedStream(name string) *Stream { + mp.ch_lock.Lock() + sid := mp.nextChanID() + header := (sid << 3) | NewStream + + if name == "" { + name = fmt.Sprint(sid) + } + s := newStream(sid, name, true, mp.outchan) + mp.channels[sid] = s + mp.ch_lock.Unlock() + + mp.outchan <- msg{ + header: header, + data: []byte(name), + err: make(chan error, 1), //throw away error + } + + return s +} + +func (mp *Multiplex) sendErr(err error) { + select { + case mp.errs <- err: + case <-mp.closed: + } +} + +func (mp *Multiplex) handleIncoming() { + defer mp.shutdown() + for { + ch, tag, err := mp.readNextHeader() + if err != nil { + mp.sendErr(err) + return + } + + b, err := mp.readNext() + if err != nil { + mp.sendErr(err) + return + } + + mp.ch_lock.Lock() + msch, ok := mp.channels[ch] + if !ok { + var name string + if tag == NewStream { + name = string(b) + } + msch = newStream(ch, name, false, mp.outchan) + mp.channels[ch] = msch + select { + case mp.nstreams <- msch: + case <-mp.closed: + return + } + if tag == NewStream { + mp.ch_lock.Unlock() + continue + } + } + mp.ch_lock.Unlock() + + if tag == Close { + msch.Close() + mp.ch_lock.Lock() + delete(mp.channels, ch) + mp.ch_lock.Unlock() + continue + } + + msch.receive(b) + } +} + +func (mp *Multiplex) shutdown() { + mp.ch_lock.Lock() + defer mp.ch_lock.Unlock() + for _, s := range mp.channels { + s.Close() + } +} + +func (mp *Multiplex) readNextHeader() (uint64, uint64, error) { + h, _, err := DecodeVarint(mp.buf) + if err != nil { + return 0, 0, err + } + + // get channel ID + ch := h >> 3 + + rem := h & 7 + + return ch, rem, nil +} + +func (mp *Multiplex) readNext() ([]byte, error) { + // get length + l, _, err := DecodeVarint(mp.buf) + if err != nil { + return nil, err + } + + buf := make([]byte, l) + n, err := io.ReadFull(mp.buf, buf) + if err != nil { + return nil, err + } + + if n != int(l) { + panic("NOT THE SAME") + } + + return buf, nil +} + +func EncodeVarint(x uint64) []byte { + var buf [10]byte + var n int + for n = 0; x > 127; n++ { + buf[n] = 0x80 | uint8(x&0x7F) + x >>= 7 + } + buf[n] = uint8(x) + n++ + return buf[0:n] +} + +func DecodeVarint(r *bufio.Reader) (x uint64, n int, err error) { + // x, n already 0 + for shift := uint(0); shift < 64; shift += 7 { + val, err := r.ReadByte() + if err != nil { + return 0, 0, err + } + + b := uint64(val) + n++ + x |= (b & 0x7F) << shift + if (b & 0x80) == 0 { + return x, n, nil + } + } + + // The number is too large to represent in a 64-bit value. + return 0, 0, errors.New("Too large of a number!") +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex_test.go b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex_test.go new file mode 100644 index 000000000..5d713effc --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/multiplex_test.go @@ -0,0 +1,118 @@ +package multiplex + +import ( + "fmt" + "io" + "net" + "testing" + + rand "github.com/dustin/randbo" +) + +func TestBasicStreams(t *testing.T) { + a, b := net.Pipe() + + mpa := NewMultiplex(a, false) + mpb := NewMultiplex(b, true) + + mes := []byte("Hello world") + go func() { + s, err := mpb.Accept() + if err != nil { + t.Fatal(err) + } + + _, err = s.Write(mes) + if err != nil { + t.Fatal(err) + } + + err = s.Close() + if err != nil { + t.Fatal(err) + } + }() + + s := mpa.NewStream() + + buf := make([]byte, len(mes)) + n, err := s.Read(buf) + if err != nil { + t.Fatal(err) + } + + if n != len(mes) { + t.Fatal("read wrong amount") + } + + if string(buf) != string(mes) { + t.Fatal("got bad data") + } + + s.Close() + + mpa.Close() + mpb.Close() +} + +func TestEcho(t *testing.T) { + a, b := net.Pipe() + + mpa := NewMultiplex(a, false) + mpb := NewMultiplex(b, true) + + mes := make([]byte, 40960) + rand.New().Read(mes) + go func() { + s, err := mpb.Accept() + if err != nil { + t.Fatal(err) + } + + defer s.Close() + io.Copy(s, s) + }() + + s := mpa.NewStream() + + _, err := s.Write(mes) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, len(mes)) + n, err := io.ReadFull(s, buf) + if err != nil { + t.Fatal(err) + } + + if n != len(mes) { + t.Fatal("read wrong amount") + } + + if err := arrComp(buf, mes); err != nil { + t.Fatal(err) + } + s.Close() + + mpa.Close() + mpb.Close() +} + +func arrComp(a, b []byte) error { + msg := "" + if len(a) != len(b) { + msg += fmt.Sprintf("arrays differ in length: %d %d\n", len(a), len(b)) + } + + for i := 0; i < len(a) && i < len(b); i++ { + if a[i] != b[i] { + msg += fmt.Sprintf("content differs at index %d [%d != %d]", i, a[i], b[i]) + return fmt.Errorf(msg) + } + } + if len(msg) > 0 { + return fmt.Errorf(msg) + } + return nil +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/package.json b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/package.json new file mode 100644 index 000000000..9b0e7000a --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multiplex/package.json @@ -0,0 +1,5 @@ +{ + "name": "go-multiplex", + "version": "1.0.0", + "language": "go" +} \ No newline at end of file diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 9fe112c7c..e919591ff 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -16,8 +16,8 @@ import ( ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" - pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" - psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux" + pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer" + psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" prom "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"