Merge pull request #1473 from ipfs/godep-peerstream

update go-peerstream to newest version
Juan Batiz-Benet 2015-07-14 21:29:38 -07:00
commit cdb18d2833
42 changed files with 1352 additions and 181 deletions
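In short, the vendored go-peerstream now consumes multiplexer transports through the jbenet/go-stream-muxer interfaces (`smux.Transport`, `smux.Conn`, `smux.Stream`) instead of its own `transport` subpackage. A minimal sketch of the new wiring from a consumer's point of view, using the vendored yamux transport as the diffs below do (error handling elided; this is an illustration, not code from the PR):

```go
package main

import (
	ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
	smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
	yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux"
)

func main() {
	// NewSwarm now takes a smux.Transport (previously a pst.Transport
	// from go-peerstream/transport).
	swarm := ps.NewSwarm(yamux.DefaultTransport)
	defer swarm.Close()

	swarm.SetStreamHandler(func(s *ps.Stream) {
		defer s.Close()
		// Stream() now exposes the underlying smux.Stream.
		var _ smux.Stream = s.Stream()
	})
}
```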

Godeps/Godeps.json (generated)

@ -180,7 +180,7 @@
},
{
"ImportPath": "github.com/jbenet/go-peerstream",
"Rev": "62fe5ede12f9d9cd9406750160122525b3d6b694"
"Rev": "214e6057e1c0742c0b5f9bb3060e88dea32a4380"
},
{
"ImportPath": "github.com/jbenet/go-random",
@ -194,6 +194,10 @@
"ImportPath": "github.com/jbenet/go-sockaddr/net",
"Rev": "da304f94eea1af8ba8d1faf184623e1f9d9777dc"
},
{
"ImportPath": "github.com/jbenet/go-stream-muxer",
"Rev": "4a97500beeb081571128d41d539787e137f18404"
},
{
"ImportPath": "github.com/jbenet/go-temp-err-catcher",
"Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff"


@ -7,5 +7,5 @@ go:
- tip
script:
- go test ./...
- go test -v ./...
# - go test -race -cpu=5 ./...


@ -17,13 +17,17 @@
"ImportPath": "github.com/inconshreveable/muxado",
"Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"ImportPath": "github.com/jbenet/go-stream-muxer",
"Rev": "e2e261765847234749629e0190fef193a4548303"
},
{
"ImportPath": "github.com/jbenet/go-temp-err-catcher",
"Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff"
},
{
"ImportPath": "github.com/whyrusleeping/go-multiplex",
"Rev": "ce5baa716247510379cb7640a14da857afd3b622"
"Rev": "474b9aebeb391746f304ddf7c764a5da12319857"
},
{
"ImportPath": "github.com/whyrusleeping/go-multistream",


@ -6,7 +6,7 @@ import (
"net"
"sync"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
// ConnHandler is a function which receives a Conn. It allows
@ -36,8 +36,8 @@ var ErrNoConnections = errors.New("no connections")
// Conn is a Swarm-associated connection.
type Conn struct {
pstConn pst.Conn
netConn net.Conn // underlying connection
smuxConn smux.Conn
netConn net.Conn // underlying connection
swarm *Swarm
groups groupSet
@ -46,13 +46,13 @@ type Conn struct {
streamLock sync.RWMutex
}
func newConn(nconn net.Conn, tconn pst.Conn, s *Swarm) *Conn {
func newConn(nconn net.Conn, tconn smux.Conn, s *Swarm) *Conn {
return &Conn{
netConn: nconn,
pstConn: tconn,
swarm: s,
groups: groupSet{m: make(map[Group]struct{})},
streams: make(map[*Stream]struct{}),
netConn: nconn,
smuxConn: tconn,
swarm: s,
groups: groupSet{m: make(map[Group]struct{})},
streams: make(map[*Stream]struct{}),
}
}
@ -77,8 +77,8 @@ func (c *Conn) NetConn() net.Conn {
// Conn returns the underlying transport Connection we use
// Warning: modifying this object is undefined.
func (c *Conn) Conn() pst.Conn {
return c.pstConn
func (c *Conn) Conn() smux.Conn {
return c.smuxConn
}
// Groups returns the Groups this Conn belongs to
@ -122,7 +122,7 @@ func (c *Conn) Close() error {
// close underlying connection
c.swarm.removeConn(c)
err := c.pstConn.Close()
err := c.smuxConn.Close()
c.swarm.notifyAll(func(n Notifiee) {
n.Disconnected(c)
})
@ -166,7 +166,7 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) {
s.ConnHandler()(c)
// go listen for incoming streams on this connection
go c.pstConn.Serve(func(ss pst.Stream) {
go c.smuxConn.Serve(func(ss smux.Stream) {
// log.Printf("accepted stream %d from %s\n", ssS.Identifier(), netConn.RemoteAddr())
stream := s.setupStream(ss, c)
s.StreamHandler()(stream) // call our handler
@ -225,21 +225,21 @@ func (s *Swarm) setupConn(netConn net.Conn, isServer bool) (*Conn, error) {
// all validation has happened.
func (s *Swarm) createStream(c *Conn) (*Stream, error) {
// Create a new pst.Stream
pstStream, err := c.pstConn.OpenStream()
// Create a new smux.Stream
smuxStream, err := c.smuxConn.OpenStream()
if err != nil {
return nil, err
}
return s.setupStream(pstStream, c), nil
return s.setupStream(smuxStream, c), nil
}
// newStream is the internal function that creates a new stream. assumes
// all validation has happened.
func (s *Swarm) setupStream(pstStream pst.Stream, c *Conn) *Stream {
func (s *Swarm) setupStream(smuxStream smux.Stream, c *Conn) *Stream {
// create a new stream
stream := newStream(pstStream, c)
stream := newStream(smuxStream, c)
// add it to our streams maps
s.streamLock.Lock()
@ -265,7 +265,7 @@ func (s *Swarm) removeStream(stream *Stream) error {
s.streamLock.Unlock()
stream.conn.streamLock.Unlock()
err := stream.pstStream.Close()
err := stream.smuxStream.Close()
s.notifyAll(func(n Notifiee) {
n.ClosedStream(stream)
})


@ -8,7 +8,7 @@ import (
"time"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
pstss "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream"
spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream"
)
func die(err error) {
@ -18,7 +18,7 @@ func die(err error) {
func main() {
// create a new Swarm
swarm := ps.NewSwarm(pstss.Transport)
swarm := ps.NewSwarm(spdy.Transport)
defer swarm.Close()
// tell swarm what to do with a new incoming streams.


@ -7,13 +7,13 @@ import (
"os"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
pstss "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream"
spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream"
)
func main() {
log("creating a new swarm with spdystream transport") // create a new Swarm
swarm := ps.NewSwarm(pstss.Transport)
swarm := ps.NewSwarm(spdy.Transport)
defer swarm.Close()
// tell swarm what to do with a new incoming streams.


@ -0,0 +1,15 @@
package muxtest
import (
multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex"
multistream "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream"
muxado "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado"
spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream"
yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux"
)
var _ = multiplex.DefaultTransport
var _ = multistream.NewTransport
var _ = muxado.Transport
var _ = spdy.Transport
var _ = yamux.DefaultTransport


@ -0,0 +1,31 @@
package muxtest
import (
"testing"
multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex"
multistream "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multistream"
muxado "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/muxado"
spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream"
yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux"
)
func TestYamuxTransport(t *testing.T) {
SubtestAll(t, yamux.DefaultTransport)
}
func TestSpdyStreamTransport(t *testing.T) {
SubtestAll(t, spdy.Transport)
}
func TestMultiplexTransport(t *testing.T) {
SubtestAll(t, multiplex.DefaultTransport)
}
func TestMuxadoTransport(t *testing.T) {
SubtestAll(t, muxado.Transport)
}
func TestMultistreamTransport(t *testing.T) {
SubtestAll(t, multistream.NewTransport())
}


@ -1,4 +1,4 @@
package peerstream_transport_test
package muxtest
import (
"bytes"
@ -14,11 +14,12 @@ import (
"testing"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
var randomness []byte
var nextPort = 20000
var verbose = false
func init() {
// read 1MB of randomness
@ -45,7 +46,7 @@ func checkErr(t *testing.T, err error) {
}
func log(s string, v ...interface{}) {
if testing.Verbose() {
if verbose {
fmt.Fprintf(os.Stderr, "> "+s+"\n", v...)
}
}
@ -55,7 +56,7 @@ type echoSetup struct {
conns []*ps.Conn
}
func singleConn(t *testing.T, tr pst.Transport) echoSetup {
func singleConn(t *testing.T, tr smux.Transport) echoSetup {
swarm := ps.NewSwarm(tr)
swarm.SetStreamHandler(func(s *ps.Stream) {
defer s.Close()
@ -84,7 +85,7 @@ func singleConn(t *testing.T, tr pst.Transport) echoSetup {
}
}
func makeSwarm(t *testing.T, tr pst.Transport, nListeners int) *ps.Swarm {
func makeSwarm(t *testing.T, tr smux.Transport, nListeners int) *ps.Swarm {
swarm := ps.NewSwarm(tr)
swarm.SetStreamHandler(func(s *ps.Stream) {
defer s.Close()
@ -104,7 +105,7 @@ func makeSwarm(t *testing.T, tr pst.Transport, nListeners int) *ps.Swarm {
return swarm
}
func makeSwarms(t *testing.T, tr pst.Transport, nSwarms, nListeners int) []*ps.Swarm {
func makeSwarms(t *testing.T, tr smux.Transport, nSwarms, nListeners int) []*ps.Swarm {
swarms := make([]*ps.Swarm, nSwarms)
for i := 0; i < nSwarms; i++ {
swarms[i] = makeSwarm(t, tr, nListeners)
@ -112,11 +113,11 @@ func makeSwarms(t *testing.T, tr pst.Transport, nSwarms, nListeners int) []*ps.S
return swarms
}
func SubtestConstructSwarm(t *testing.T, tr pst.Transport) {
func SubtestConstructSwarm(t *testing.T, tr smux.Transport) {
ps.NewSwarm(tr)
}
func SubtestSimpleWrite(t *testing.T, tr pst.Transport) {
func SubtestSimpleWrite(t *testing.T, tr smux.Transport) {
swarm := ps.NewSwarm(tr)
defer swarm.Close()
@ -171,18 +172,18 @@ func SubtestSimpleWrite(t *testing.T, tr pst.Transport) {
}
}
func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) {
func SubtestSimpleWrite100msgs(t *testing.T, tr smux.Transport) {
msgs := 100
msgsize := 1 << 19
es := singleConn(t, tr)
defer es.swarm.Close()
log("creating stream")
stream, err := es.conns[0].NewStream()
checkErr(t, err)
bufs := make(chan []byte, msgs)
errs := make(chan error, msgs*100)
var wg sync.WaitGroup
wg.Add(1)
@ -194,7 +195,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) {
bufs <- buf
log("writing %d bytes (message %d/%d #%x)", len(buf), i, msgs, buf[:3])
if _, err := stream.Write(buf); err != nil {
errs <- fmt.Errorf("stream.Write(buf): %s", err)
t.Error(fmt.Errorf("stream.Write(buf): %s", err))
continue
}
}
@ -212,26 +213,21 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) {
i++
if _, err := io.ReadFull(stream, buf2); err != nil {
errs <- fmt.Errorf("readFull(stream, buf2): %s", err)
t.Error(fmt.Errorf("readFull(stream, buf2): %s", err))
continue
}
if !bytes.Equal(buf1, buf2) {
errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
t.Error(fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]))
}
}
}()
wg.Wait()
close(errs)
for err := range errs {
t.Error(err)
}
}
func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, nConn, nStream, nMsg int) {
func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr smux.Transport, nSwarm, nConn, nStream, nMsg int) {
msgsize := 1 << 11
errs := make(chan error, nSwarm*nConn*nStream*nMsg*100) // dont block anything.
rateLimitN := 5000
rateLimitChan := make(chan struct{}, rateLimitN) // max of 5k funcs.
@ -253,7 +249,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
bufs <- buf
log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3])
if _, err := s.Write(buf); err != nil {
errs <- fmt.Errorf("s.Write(buf): %s", err)
t.Error(fmt.Errorf("s.Write(buf): %s", err))
continue
}
}
@ -269,12 +265,12 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
if _, err := io.ReadFull(s, buf2); err != nil {
errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3])
t.Error(fmt.Errorf("io.ReadFull(s, buf2): %s", err))
continue
}
if !bytes.Equal(buf1, buf2) {
errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
t.Error(fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3]))
}
}
}
@ -284,7 +280,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
s, err := c.NewStream()
if err != nil {
errs <- fmt.Errorf("Failed to create NewStream: %s", err)
t.Error(fmt.Errorf("Failed to create NewStream: %s", err))
return
}
@ -308,13 +304,13 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
nc, err := net.Dial(nla.Network(), nla.String())
if err != nil {
errs <- fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err)
t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err))
return
}
c, err := a.AddConn(nc)
if err != nil {
errs <- fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err)
t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err))
return
}
@ -363,47 +359,38 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm,
}
swarms := makeSwarms(t, tr, nSwarm, 3) // 3 listeners per swarm.
go func() {
connectSwarmsAndRW(swarms)
close(errs) // done
}()
for err := range errs {
t.Error(err)
}
connectSwarmsAndRW(swarms)
for _, s := range swarms {
s.Close()
}
}
func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr pst.Transport) {
func SubtestStress1Swarm1Conn1Stream1Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 1)
}
func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr pst.Transport) {
func SubtestStress1Swarm1Conn1Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 1, 100)
}
func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr pst.Transport) {
func SubtestStress1Swarm1Conn100Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 1, 100, 100)
}
func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr pst.Transport) {
func SubtestStress1Swarm10Conn50Stream50Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 1, 10, 50, 50)
}
func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr pst.Transport) {
func SubtestStress5Swarm2Conn20Stream20Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 5, 2, 20, 20)
}
func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr pst.Transport) {
func SubtestStress10Swarm2Conn100Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStressNSwarmNConnNStreamNMsg(t, tr, 10, 2, 100, 100)
}
func SubtestAll(t *testing.T, tr pst.Transport) {
func SubtestAll(t *testing.T, tr smux.Transport) {
tests := []TransportTest{
SubtestConstructSwarm,
@ -425,7 +412,7 @@ func SubtestAll(t *testing.T, tr pst.Transport) {
}
}
type TransportTest func(t *testing.T, tr pst.Transport)
type TransportTest func(t *testing.T, tr smux.Transport)
func TestNoOp(t *testing.T) {}


@ -3,7 +3,7 @@ package peerstream
import (
"fmt"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
// StreamHandler is a function which receives a Stream. It
@ -17,17 +17,17 @@ type StreamHandler func(s *Stream)
// Stream is an io.{Read,Write,Close}r to a remote counterpart.
// It wraps a spdystream.Stream, and links it to a Conn and groups
type Stream struct {
pstStream pst.Stream
smuxStream smux.Stream
conn *Conn
groups groupSet
}
func newStream(ss pst.Stream, c *Conn) *Stream {
func newStream(ss smux.Stream, c *Conn) *Stream {
s := &Stream{
conn: c,
pstStream: ss,
groups: groupSet{m: make(map[Group]struct{})},
conn: c,
smuxStream: ss,
groups: groupSet{m: make(map[Group]struct{})},
}
s.groups.AddSet(&c.groups) // inherit groups
return s
@ -40,8 +40,8 @@ func (s *Stream) String() string {
}
// Stream returns the underlying smux.Stream
func (s *Stream) Stream() pst.Stream {
return s.pstStream
func (s *Stream) Stream() smux.Stream {
return s.smuxStream
}
// Conn returns the Conn associated with this Stream
@ -70,11 +70,11 @@ func (s *Stream) AddGroup(g Group) {
}
func (s *Stream) Read(p []byte) (n int, err error) {
return s.pstStream.Read(p)
return s.smuxStream.Read(p)
}
func (s *Stream) Write(p []byte) (n int, err error) {
return s.pstStream.Write(p)
return s.smuxStream.Write(p)
}
func (s *Stream) Close() error {


@ -7,7 +7,7 @@ import (
"sync"
"time"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
// fd is a (file) descriptor, unix style
@ -18,7 +18,7 @@ var GarbageCollectTimeout = 5 * time.Second
type Swarm struct {
// the transport we'll use.
transport pst.Transport
transport smux.Transport
// active streams.
streams map[*Stream]struct{}
@ -46,7 +46,7 @@ type Swarm struct {
closed chan struct{}
}
func NewSwarm(t pst.Transport) *Swarm {
func NewSwarm(t smux.Transport) *Swarm {
s := &Swarm{
transport: t,
streams: make(map[*Stream]struct{}),
@ -183,7 +183,7 @@ func (s *Swarm) Conns() []*Conn {
open := make([]*Conn, 0, len(conns))
for _, c := range conns {
if c.pstConn.IsClosed() {
if c.smuxConn.IsClosed() {
c.Close()
} else {
open = append(open, c)
@ -292,7 +292,7 @@ func (s *Swarm) NewStreamWithConn(conn *Conn) (*Stream, error) {
return nil, errors.New("connection not associated with swarm")
}
if conn.pstConn.IsClosed() {
if conn.smuxConn.IsClosed() {
go conn.Close()
return nil, errors.New("conn is closed")
}
@ -360,7 +360,7 @@ func (s *Swarm) connGarbageCollect() {
}
for _, c := range s.Conns() {
if c.pstConn.IsClosed() {
if c.smuxConn.IsClosed() {
go c.Close()
}
}


@ -1,11 +0,0 @@
package peerstream_multiplex
import (
"testing"
psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)
func TestMultiplexTransport(t *testing.T) {
psttest.SubtestAll(t, DefaultTransport)
}


@ -1,11 +0,0 @@
package multistream
import (
"testing"
psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)
func TestMultiStreamTransport(t *testing.T) {
psttest.SubtestAll(t, NewTransport())
}


@ -1,11 +0,0 @@
package peerstream_muxado
import (
"testing"
psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)
func TestMuxadoTransport(t *testing.T) {
psttest.SubtestAll(t, Transport)
}


@ -1,11 +0,0 @@
package peerstream_spdystream
import (
"testing"
psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)
func TestSpdyStreamTransport(t *testing.T) {
psttest.SubtestAll(t, Transport)
}


@ -1,11 +0,0 @@
package peerstream_yamux
import (
"testing"
psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test"
)
func TestYamuxTransport(t *testing.T) {
psttest.SubtestAll(t, DefaultTransport)
}


@ -0,0 +1,11 @@
language: go
go:
- 1.3
- 1.4
- release
- tip
script:
- go test ./...
# - go test -race -cpu=5 ./...


@ -0,0 +1,29 @@
{
"ImportPath": "github.com/jbenet/go-stream-muxer",
"GoVersion": "go1.4.2",
"Packages": [
"./..."
],
"Deps": [
{
"ImportPath": "github.com/docker/spdystream",
"Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207"
},
{
"ImportPath": "github.com/hashicorp/yamux",
"Rev": "b2e55852ddaf823a85c67f798080eb7d08acd71d"
},
{
"ImportPath": "github.com/inconshreveable/muxado",
"Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848"
},
{
"ImportPath": "github.com/whyrusleeping/go-multiplex",
"Rev": "ce5baa716247510379cb7640a14da857afd3b622"
},
{
"ImportPath": "github.com/whyrusleeping/go-multistream",
"Rev": "08e8f9c9f5665ed0c63ffde4fa5ef1d5fb3d516d"
}
]
}


@ -0,0 +1,5 @@
This directory tree is generated automatically by godep.
Please do not edit.
See https://github.com/tools/godep for more information.


@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.


@ -0,0 +1,15 @@
godep:
go get github.com/tools/godep
vendor: godep
godep save -r ./...
build:
go build ./...
test:
go test ./...
test_race:
go test -race -cpu 5 ./...


@ -0,0 +1,98 @@
# go-stream-muxer - generalized stream multiplexing
go-stream-muxer is a common interface for stream muxers, with common tests. It wraps other stream muxers (like [muxado](https://github.com/inconshreveable/muxado), [spdystream](https://github.com/docker/spdystream) and [yamux](https://github.com/hashicorp/yamux)).
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
> A test suite and interface you can use to implement a stream muxer.
### Godoc: https://godoc.org/github.com/jbenet/go-stream-muxer
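The package also ships the conformance tests it mentions; a sketch of how an implementation can run them, mirroring the small `*_test.go` files vendored later in this PR (package and test names are illustrative, import paths are the upstream ones):

```go
package muxtest

import (
	"testing"

	muxado "github.com/jbenet/go-stream-muxer/muxado"
	test "github.com/jbenet/go-stream-muxer/test"
)

// Run the shared stream-muxer test suite against a concrete transport.
func TestMuxadoConformance(t *testing.T) {
	test.SubtestAll(t, muxado.Transport)
}
```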
## Implementations
* [yamux](yamux)
* [muxado](muxado)
* [multiplex](multiplex)
* [spdystream](spdystream)
## Badge
Include this badge in your README if you make a new module that uses the abstract-stream-muxer API.
![](img/badge.png)
## Client example
```go
import (
"net"
"fmt"
"io"
"os"
ymux "github.com/jbenet/go-stream-muxer/yamux"
smux "github.com/jbenet/go-stream-muxer"
)
func dial() {
nconn, _ := net.Dial("tcp", "localhost:1234")
sconn, _ := ymux.DefaultTransport.NewConn(nconn, false) // false == client
go sconn.Serve(func(smux.Stream) {}) // no-op
s1, _ := sconn.OpenStream()
s1.Write([]byte("hello"))
s2, _ := sconn.OpenStream()
s2.Write([]byte("world"))
length := 20
buf2 := make([]byte, length)
fmt.Printf("reading %d bytes from stream (echoed)\n", length)
s1.Read(buf2)
fmt.Printf("received %s as a response\n", string(buf2))
s3, _ := sconn.OpenStream()
io.Copy(s3, os.Stdin)
}
```
## Server example
```go
import (
"net"
"fmt"
"io"
ymux "github.com/jbenet/go-stream-muxer/yamux"
smux "github.com/jbenet/go-stream-muxer"
)
func listen() {
tr := ymux.DefaultTransport
l, _ := net.Listen("tcp", "localhost:1234")
go func() {
for {
c, _ := l.Accept()
fmt.Println("accepted connection")
sc, _ := tr.NewConn(c, true)
go sc.Serve(func(s smux.Stream) {
fmt.Println("serving connection")
echoStream(s)
})
}
}()
}
func echoStream(s smux.Stream) {
defer s.Close()
fmt.Println("accepted stream")
io.Copy(s, s) // echo everything
fmt.Println("closing stream")
}
```

Binary file not shown.


@ -1,18 +1,20 @@
package peerstream_multiplex
import (
"errors"
"net"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
mp "github.com/whyrusleeping/go-multiplex"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
mp "github.com/whyrusleeping/go-multiplex"
)
var ErrUseServe = errors.New("not implemented, use Serve")
// Conn is a connection to a remote peer.
type conn struct {
*mp.Multiplex
}
func (c *conn) Close() error {
return c.Multiplex.Close()
}
@ -21,13 +23,18 @@ func (c *conn) IsClosed() bool {
}
// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
func (c *conn) OpenStream() (smux.Stream, error) {
return c.Multiplex.NewStream(), nil
}
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (smux.Stream, error) {
return nil, ErrUseServe
}
// Serve starts listening for incoming requests and handles them
// using given StreamHandler
func (c *conn) Serve(handler pst.StreamHandler) {
func (c *conn) Serve(handler smux.StreamHandler) {
c.Multiplex.Serve(func(s *mp.Stream) {
handler(s)
})
@ -40,6 +47,6 @@ type Transport struct{}
// DefaultTransport has default settings for multiplex
var DefaultTransport = &Transport{}
func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) {
return &conn{mp.NewMultiplex(nc, isServer)}, nil
}


@ -0,0 +1,11 @@
package peerstream_multiplex
import (
"testing"
test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test"
)
func TestMultiplexTransport(t *testing.T) {
test.SubtestAll(t, DefaultTransport)
}


@ -5,26 +5,27 @@ package multistream
import (
"net"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex"
spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream"
yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"
mss "github.com/whyrusleeping/go-multistream"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/multiplex"
spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/spdystream"
yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux"
)
type transport struct {
mux *mss.MultistreamMuxer
tpts map[string]pst.Transport
tpts map[string]smux.Transport
}
func NewTransport() pst.Transport {
func NewTransport() smux.Transport {
mux := mss.NewMultistreamMuxer()
mux.AddHandler("/multiplex", nil)
mux.AddHandler("/spdystream", nil)
mux.AddHandler("/yamux", nil)
tpts := map[string]pst.Transport{
tpts := map[string]smux.Transport{
"/multiplex": multiplex.DefaultTransport,
"/spdystream": spdy.Transport,
"/yamux": yamux.DefaultTransport,
@ -36,7 +37,7 @@ func NewTransport() pst.Transport {
}
}
func (t *transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
func (t *transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) {
var proto string
if isServer {
selected, _, err := t.mux.Negotiate(nc)


@ -0,0 +1,11 @@
package multistream
import (
"testing"
test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test"
)
func TestMultiStreamTransport(t *testing.T) {
test.SubtestAll(t, NewTransport())
}


@ -4,10 +4,10 @@ import (
"net"
muxado "github.com/inconshreveable/muxado"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
// stream implements pst.Stream using a ss.Stream
// stream implements smux.Stream using a ss.Stream
type stream struct {
ms muxado.Stream
}
@ -53,7 +53,7 @@ func (c *conn) IsClosed() bool {
}
// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
func (c *conn) OpenStream() (smux.Stream, error) {
s, err := c.ms.Open()
if err != nil {
return nil, err
@ -62,15 +62,24 @@ func (c *conn) OpenStream() (pst.Stream, error) {
return &stream{ms: s}, nil
}
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (smux.Stream, error) {
s, err := c.ms.Accept()
if err != nil {
return nil, err
}
return &stream{ms: s}, nil
}
// Serve starts listening for incoming requests and handles them
// using given StreamHandler
func (c *conn) Serve(handler pst.StreamHandler) {
func (c *conn) Serve(handler smux.StreamHandler) {
for { // accept loop
s, err := c.ms.Accept()
s, err := c.AcceptStream()
if err != nil {
return // err always means closed.
}
go handler(&stream{ms: s})
go handler(s)
}
}
@ -80,7 +89,7 @@ type transport struct{}
// spdystream-backed connections.
var Transport = transport{}
func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
func (t transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) {
var s muxado.Session
if isServer {
s = muxado.Server(nc)


@ -0,0 +1,11 @@
package peerstream_muxado
import (
"testing"
test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test"
)
func TestMuxadoTransport(t *testing.T) {
test.SubtestAll(t, Transport)
}


@ -1,4 +1,4 @@
package peerstream_transport
package streammux
import (
"io"
@ -16,6 +16,9 @@ type Stream interface {
// (usually those opened by the remote side)
type StreamHandler func(Stream)
// NoOpHandler does nothing but close streams as soon as they are opened.
var NoOpHandler = func(s Stream) { s.Close() }
// Conn is a stream-multiplexing connection to a remote peer.
type Conn interface {
io.Closer
@ -27,12 +30,15 @@ type Conn interface {
// OpenStream creates a new stream.
OpenStream() (Stream, error)
// Serve starts listening for incoming requests and handles them
// using given StreamHandler
// AcceptStream accepts a stream opened by the other side.
AcceptStream() (Stream, error)
// Serve starts a loop, accepting incoming requests and calling the given
// StreamHandler with them. (Use this _instead of_ AcceptStream, not both.)
Serve(StreamHandler)
}
// Transport constructs go-peerstream compatible connections.
// Transport constructs go-stream-muxer compatible connections.
type Transport interface {
// NewConn constructs a new connection
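This interface file is the crux of the change: `Conn` now exposes `AcceptStream` next to `Serve`, and `NoOpHandler` covers the side that never expects inbound streams. A minimal client-side sketch against the new interface, assuming the yamux transport (which implements `AcceptStream` directly; the spdystream and multiplex wrappers in this PR still return `ErrUseServe` and must be driven with `Serve`):

```go
package main

import (
	"fmt"
	"net"

	smux "github.com/jbenet/go-stream-muxer"
	yamux "github.com/jbenet/go-stream-muxer/yamux"
)

func main() {
	nc, err := net.Dial("tcp", "localhost:1234")
	if err != nil {
		panic(err)
	}

	c, err := yamux.DefaultTransport.NewConn(nc, false) // false == client side
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Accept inbound streams explicitly (use this *or* Serve, not both).
	go func() {
		for {
			s, err := c.AcceptStream()
			if err != nil {
				return // connection closed
			}
			go func(s smux.Stream) {
				defer s.Close()
				fmt.Println("accepted inbound stream")
			}(s)
		}
	}()
	// If no inbound streams are expected, `go c.Serve(smux.NoOpHandler)`
	// would be the alternative.

	out, err := c.OpenStream()
	if err != nil {
		panic(err)
	}
	out.Write([]byte("hello"))
	out.Close()
}
```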


@ -1,14 +1,17 @@
package peerstream_spdystream
import (
"errors"
"net"
"net/http"
ss "github.com/docker/spdystream"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
// stream implements pst.Stream using a ss.Stream
var ErrUseServe = errors.New("not implemented, use Serve")
// stream implements smux.Stream using a ss.Stream
type stream ss.Stream
func (s *stream) spdyStream() *ss.Stream {
@ -63,7 +66,7 @@ func (c *conn) IsClosed() bool {
}
// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
func (c *conn) OpenStream() (smux.Stream, error) {
s, err := c.spdyConn().CreateStream(http.Header{
":method": []string{"GET"}, // this is here for HTTP/SPDY interop
":path": []string{"/"}, // this is here for HTTP/SPDY interop
@ -78,9 +81,14 @@ func (c *conn) OpenStream() (pst.Stream, error) {
return (*stream)(s), nil
}
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (smux.Stream, error) {
return nil, ErrUseServe
}
// Serve starts listening for incoming requests and handles them
// using given StreamHandler
func (c *conn) Serve(handler pst.StreamHandler) {
func (c *conn) Serve(handler smux.StreamHandler) {
c.spdyConn().Serve(func(s *ss.Stream) {
// Flow control and backpressure of Opening streams is broken.
@ -109,7 +117,7 @@ type transport struct{}
// spdystream-backed connections.
var Transport = transport{}
func (t transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
func (t transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) {
sc, err := ss.NewConnection(nc, isServer)
return &conn{sc: sc, closed: make(chan struct{})}, err
}


@ -0,0 +1,11 @@
package peerstream_spdystream
import (
"testing"
test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test"
)
func TestSpdyStreamTransport(t *testing.T) {
test.SubtestAll(t, Transport)
}


@ -0,0 +1,378 @@
package sm_test
import (
"bytes"
crand "crypto/rand"
"fmt"
"io"
mrand "math/rand"
"net"
"os"
"reflect"
"runtime"
"runtime/debug"
"sync"
"testing"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
var randomness []byte
func init() {
// read 1MB of randomness
randomness = make([]byte, 1<<20)
if _, err := crand.Read(randomness); err != nil {
panic(err)
}
}
type Options struct {
tr smux.Transport
connNum int
streamNum int
msgNum int
msgMin int
msgMax int
}
func randBuf(size int) []byte {
n := len(randomness) - size
if size < 1 {
panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness)))
}
start := mrand.Intn(n)
return randomness[start : start+size]
}
func checkErr(t *testing.T, err error) {
if err != nil {
debug.PrintStack()
t.Fatal(err)
}
}
func log(s string, v ...interface{}) {
if testing.Verbose() {
fmt.Fprintf(os.Stderr, "> "+s+"\n", v...)
}
}
func echoStream(s smux.Stream) {
defer s.Close()
log("accepted stream")
io.Copy(&LogWriter{s}, s) // echo everything
log("closing stream")
}
type LogWriter struct {
W io.Writer
}
func (lw *LogWriter) Write(buf []byte) (int, error) {
if testing.Verbose() {
log("logwriter: writing %d bytes", len(buf))
}
return lw.W.Write(buf)
}
func GoServe(t *testing.T, tr smux.Transport, l net.Listener) (done func()) {
closed := make(chan struct{}, 1)
go func() {
for {
c1, err := l.Accept()
if err != nil {
select {
case <-closed:
return // closed naturally.
default:
checkErr(t, err)
}
}
log("accepted connection")
sc1, err := tr.NewConn(c1, true)
checkErr(t, err)
go sc1.Serve(echoStream)
}
}()
return func() {
closed <- struct{}{}
}
}
func SubtestSimpleWrite(t *testing.T, tr smux.Transport) {
l, err := net.Listen("tcp", "localhost:0")
checkErr(t, err)
log("listening at %s", l.Addr().String())
done := GoServe(t, tr, l)
defer done()
log("dialing to %s", l.Addr().String())
nc1, err := net.Dial("tcp", l.Addr().String())
checkErr(t, err)
defer nc1.Close()
log("wrapping conn")
c1, err := tr.NewConn(nc1, false)
checkErr(t, err)
defer c1.Close()
// serve the outgoing conn, because some muxers assume
// that we _always_ call serve. (this is an error?)
go c1.Serve(smux.NoOpHandler)
log("creating stream")
s1, err := c1.OpenStream()
checkErr(t, err)
defer s1.Close()
buf1 := randBuf(4096)
log("writing %d bytes to stream", len(buf1))
_, err = s1.Write(buf1)
checkErr(t, err)
buf2 := make([]byte, len(buf1))
log("reading %d bytes from stream (echoed)", len(buf2))
_, err = s1.Read(buf2)
checkErr(t, err)
if string(buf2) != string(buf1) {
t.Errorf("buf1 and buf2 not equal: %s != %s", string(buf1), string(buf2))
}
log("done")
}
func SubtestStress(t *testing.T, opt Options) {
msgsize := 1 << 11
errs := make(chan error, 0) // dont block anything.
rateLimitN := 5000 // max of 5k funcs, because -race has 8k max.
rateLimitChan := make(chan struct{}, rateLimitN)
for i := 0; i < rateLimitN; i++ {
rateLimitChan <- struct{}{}
}
rateLimit := func(f func()) {
<-rateLimitChan
f()
rateLimitChan <- struct{}{}
}
writeStream := func(s smux.Stream, bufs chan<- []byte) {
log("writeStream %p, %d msgNum", s, opt.msgNum)
for i := 0; i < opt.msgNum; i++ {
buf := randBuf(msgsize)
bufs <- buf
log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3])
if _, err := s.Write(buf); err != nil {
errs <- fmt.Errorf("s.Write(buf): %s", err)
continue
}
}
}
readStream := func(s smux.Stream, bufs <-chan []byte) {
log("readStream %p, %d msgNum", s, opt.msgNum)
buf2 := make([]byte, msgsize)
i := 0
for buf1 := range bufs {
i++
log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
if _, err := io.ReadFull(s, buf2); err != nil {
errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
continue
}
if !bytes.Equal(buf1, buf2) {
errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
}
}
}
openStreamAndRW := func(c smux.Conn) {
log("openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum)
s, err := c.OpenStream()
if err != nil {
errs <- fmt.Errorf("Failed to create NewStream: %s", err)
return
}
bufs := make(chan []byte, opt.msgNum)
go func() {
writeStream(s, bufs)
close(bufs)
}()
readStream(s, bufs)
s.Close()
}
openConnAndRW := func() {
log("openConnAndRW")
l, err := net.Listen("tcp", "localhost:0")
checkErr(t, err)
done := GoServe(t, opt.tr, l)
defer done()
nla := l.Addr()
nc, err := net.Dial(nla.Network(), nla.String())
checkErr(t, err)
if err != nil {
t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err))
return
}
c, err := opt.tr.NewConn(nc, false)
if err != nil {
t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err))
return
}
// serve the outgoing conn, because some muxers assume
// that we _always_ call serve. (this is an error?)
go c.Serve(func(s smux.Stream) {
log("serving connection")
echoStream(s)
s.Close()
})
var wg sync.WaitGroup
for i := 0; i < opt.streamNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
openStreamAndRW(c)
})
}
wg.Wait()
c.Close()
}
openConnsAndRW := func() {
log("openConnsAndRW, %d conns", opt.connNum)
var wg sync.WaitGroup
for i := 0; i < opt.connNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
openConnAndRW()
})
}
wg.Wait()
}
go func() {
openConnsAndRW()
close(errs) // done
}()
for err := range errs {
t.Error(err)
}
}
func SubtestStress1Conn1Stream1Msg(t *testing.T, tr smux.Transport) {
SubtestStress(t, Options{
tr: tr,
connNum: 1,
streamNum: 1,
msgNum: 1,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn1Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStress(t, Options{
tr: tr,
connNum: 1,
streamNum: 1,
msgNum: 100,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn100Stream100Msg(t *testing.T, tr smux.Transport) {
SubtestStress(t, Options{
tr: tr,
connNum: 1,
streamNum: 100,
msgNum: 100,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress50Conn10Stream50Msg(t *testing.T, tr smux.Transport) {
SubtestStress(t, Options{
tr: tr,
connNum: 50,
streamNum: 10,
msgNum: 50,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn1000Stream10Msg(t *testing.T, tr smux.Transport) {
SubtestStress(t, Options{
tr: tr,
connNum: 1,
streamNum: 1000,
msgNum: 10,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, tr smux.Transport) {
SubtestStress(t, Options{
tr: tr,
connNum: 1,
streamNum: 100,
msgNum: 100,
msgMax: 10000,
msgMin: 1000,
})
}
func SubtestAll(t *testing.T, tr smux.Transport) {
tests := []TransportTest{
SubtestSimpleWrite,
SubtestStress1Conn1Stream1Msg,
SubtestStress1Conn1Stream100Msg,
SubtestStress1Conn100Stream100Msg,
SubtestStress50Conn10Stream50Msg,
SubtestStress1Conn1000Stream10Msg,
SubtestStress1Conn100Stream100Msg10MB,
}
for _, f := range tests {
if testing.Verbose() {
fmt.Fprintf(os.Stderr, "==== RUN %s\n", GetFunctionName(f))
}
f(t, tr)
}
}
type TransportTest func(t *testing.T, tr smux.Transport)
func TestNoOp(t *testing.T) {}
func GetFunctionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}


@ -1,4 +1,4 @@
package peerstream_yamux
package sm_yamux
import (
"io/ioutil"
@ -6,10 +6,10 @@ import (
"time"
yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/hashicorp/yamux"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
smux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
)
// stream implements pst.Stream using a ss.Stream
// stream implements smux.Stream using a ss.Stream
type stream yamux.Stream
func (s *stream) yamuxStream() *yamux.Stream {
@ -44,7 +44,7 @@ func (c *conn) IsClosed() bool {
}
// OpenStream creates a new stream.
func (c *conn) OpenStream() (pst.Stream, error) {
func (c *conn) OpenStream() (smux.Stream, error) {
s, err := c.yamuxSession().OpenStream()
if err != nil {
return nil, err
@ -53,15 +53,21 @@ func (c *conn) OpenStream() (pst.Stream, error) {
return (*stream)(s), nil
}
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (smux.Stream, error) {
s, err := c.yamuxSession().AcceptStream()
return (*stream)(s), err
}
// Serve starts listening for incoming requests and handles them
// using given StreamHandler
func (c *conn) Serve(handler pst.StreamHandler) {
func (c *conn) Serve(handler smux.StreamHandler) {
for { // accept loop
s, err := c.yamuxSession().AcceptStream()
s, err := c.AcceptStream()
if err != nil {
return // err always means closed.
}
go handler((*stream)(s))
go handler(s)
}
}
@ -78,7 +84,7 @@ var DefaultTransport = (*Transport)(&yamux.Config{
LogOutput: ioutil.Discard,
})
func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) {
func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) {
var s *yamux.Session
var err error
if isServer {


@ -0,0 +1,11 @@
package sm_yamux
import (
"testing"
test "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/test"
)
func TestYamuxTransport(t *testing.T) {
test.SubtestAll(t, DefaultTransport)
}


@ -0,0 +1 @@
*.swp


@ -0,0 +1 @@
QmYmW76Y7NxvWwW3GPUmpbHd4mm1iW14f5UimeJ9UoofEs


@ -0,0 +1,18 @@
# go-multiplex
A super simple stream muxing library compatible with [multiplex](http://github.com/maxogden/multiplex)
## Usage
```go
mplex := multiplex.NewMultiplex(mysocket, true) // true marks the initiating side
s := mplex.NewStream()
s.Write([]byte("Hello World!"))
s.Close()
mplex.Serve(func(s *multiplex.Stream) {
// echo back everything received
io.Copy(s, s)
})
```
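The receiving end of the same connection can pull inbound streams either through `Serve` (as above) or explicitly with `Accept`, which the tests added in this PR use. A small sketch in the same fragment style, assuming `theirsocket` is the peer's side of the connection:

```go
other := multiplex.NewMultiplex(theirsocket, false) // false == non-initiating side

// Accept blocks until the peer opens a stream.
s, err := other.Accept()
if err != nil {
	panic(err)
}
defer s.Close()

buf := make([]byte, 12)
io.ReadFull(s, buf) // reads the peer's "Hello World!"
```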


@ -0,0 +1,393 @@
package multiplex
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
)
const (
NewStream = iota
Receiver
Initiator
Unknown
Close
)
var _ = ioutil.ReadAll
var _ = bufio.NewReadWriter
var _ = binary.MaxVarintLen16
type msg struct {
header uint64
data []byte
err chan<- error
}
type Stream struct {
id uint64
name string
header uint64
closed chan struct{}
data_in chan []byte
data_out chan<- msg
extra []byte
}
func newStream(id uint64, name string, initiator bool, send chan<- msg) *Stream {
var hfn uint64
if initiator {
hfn = 2
} else {
hfn = 1
}
return &Stream{
id: id,
name: name,
header: (id << 3) | hfn,
data_in: make(chan []byte, 8),
data_out: send,
closed: make(chan struct{}),
}
}
func (s *Stream) Name() string {
return s.name
}
func (s *Stream) receive(b []byte) {
select {
case s.data_in <- b:
case <-s.closed:
}
}
func (m *Multiplex) Accept() (*Stream, error) {
select {
case s, ok := <-m.nstreams:
if !ok {
return nil, errors.New("multiplex closed")
}
return s, nil
case err := <-m.errs:
return nil, err
case <-m.closed:
return nil, errors.New("multiplex closed")
}
}
func (s *Stream) Read(b []byte) (int, error) {
if s.extra == nil {
select {
case <-s.closed:
return 0, io.EOF
case read, ok := <-s.data_in:
if !ok {
return 0, io.EOF
}
s.extra = read
}
}
n := copy(b, s.extra)
if n < len(s.extra) {
s.extra = s.extra[n:]
} else {
s.extra = nil
}
return n, nil
}
func (s *Stream) Write(b []byte) (int, error) {
errs := make(chan error, 1)
select {
case s.data_out <- msg{header: s.header, data: b, err: errs}:
select {
case err := <-errs:
return len(b), err
case <-s.closed:
return 0, errors.New("stream closed")
}
case <-s.closed:
return 0, errors.New("stream closed")
}
}
func (s *Stream) Close() error {
select {
case <-s.closed:
return nil
default:
close(s.closed)
select {
case s.data_out <- msg{
header: (s.id << 3) | Close,
err: make(chan error, 1), //throw away error, whatever
}:
default:
}
close(s.data_in)
return nil
}
}
type Multiplex struct {
con io.ReadWriteCloser
buf *bufio.Reader
nextID uint64
outchan chan msg
closed chan struct{}
initiator bool
nstreams chan *Stream
errs chan error
channels map[uint64]*Stream
ch_lock sync.Mutex
}
func NewMultiplex(con io.ReadWriteCloser, initiator bool) *Multiplex {
mp := &Multiplex{
con: con,
initiator: initiator,
buf: bufio.NewReader(con),
channels: make(map[uint64]*Stream),
outchan: make(chan msg),
closed: make(chan struct{}),
nstreams: make(chan *Stream, 16),
errs: make(chan error),
}
go mp.handleOutgoing()
go mp.handleIncoming()
return mp
}
func (mp *Multiplex) Close() error {
if mp.IsClosed() {
return nil
}
close(mp.closed)
mp.ch_lock.Lock()
defer mp.ch_lock.Unlock()
for _, s := range mp.channels {
err := s.Close()
if err != nil {
return err
}
}
return nil
}
func (mp *Multiplex) IsClosed() bool {
select {
case <-mp.closed:
return true
default:
return false
}
}
func (mp *Multiplex) handleOutgoing() {
for {
select {
case msg, ok := <-mp.outchan:
if !ok {
return
}
buf := EncodeVarint(msg.header)
_, err := mp.con.Write(buf)
if err != nil {
msg.err <- err
continue
}
buf = EncodeVarint(uint64(len(msg.data)))
_, err = mp.con.Write(buf)
if err != nil {
msg.err <- err
continue
}
_, err = mp.con.Write(msg.data)
if err != nil {
msg.err <- err
continue
}
msg.err <- nil
case <-mp.closed:
return
}
}
}
func (mp *Multiplex) nextChanID() (out uint64) {
if mp.initiator {
out = mp.nextID + 1
} else {
out = mp.nextID
}
mp.nextID += 2
return
}
func (mp *Multiplex) NewStream() *Stream {
return mp.NewNamedStream("")
}
func (mp *Multiplex) NewNamedStream(name string) *Stream {
mp.ch_lock.Lock()
sid := mp.nextChanID()
header := (sid << 3) | NewStream
if name == "" {
name = fmt.Sprint(sid)
}
s := newStream(sid, name, true, mp.outchan)
mp.channels[sid] = s
mp.ch_lock.Unlock()
mp.outchan <- msg{
header: header,
data: []byte(name),
err: make(chan error, 1), //throw away error
}
return s
}
func (mp *Multiplex) sendErr(err error) {
select {
case mp.errs <- err:
case <-mp.closed:
}
}
func (mp *Multiplex) handleIncoming() {
defer mp.shutdown()
for {
ch, tag, err := mp.readNextHeader()
if err != nil {
mp.sendErr(err)
return
}
b, err := mp.readNext()
if err != nil {
mp.sendErr(err)
return
}
mp.ch_lock.Lock()
msch, ok := mp.channels[ch]
if !ok {
var name string
if tag == NewStream {
name = string(b)
}
msch = newStream(ch, name, false, mp.outchan)
mp.channels[ch] = msch
select {
case mp.nstreams <- msch:
case <-mp.closed:
return
}
if tag == NewStream {
mp.ch_lock.Unlock()
continue
}
}
mp.ch_lock.Unlock()
if tag == Close {
msch.Close()
mp.ch_lock.Lock()
delete(mp.channels, ch)
mp.ch_lock.Unlock()
continue
}
msch.receive(b)
}
}
func (mp *Multiplex) shutdown() {
mp.ch_lock.Lock()
defer mp.ch_lock.Unlock()
for _, s := range mp.channels {
s.Close()
}
}
func (mp *Multiplex) readNextHeader() (uint64, uint64, error) {
h, _, err := DecodeVarint(mp.buf)
if err != nil {
return 0, 0, err
}
// get channel ID
ch := h >> 3
rem := h & 7
return ch, rem, nil
}
func (mp *Multiplex) readNext() ([]byte, error) {
// get length
l, _, err := DecodeVarint(mp.buf)
if err != nil {
return nil, err
}
buf := make([]byte, l)
n, err := io.ReadFull(mp.buf, buf)
if err != nil {
return nil, err
}
if n != int(l) {
panic("NOT THE SAME")
}
return buf, nil
}
func EncodeVarint(x uint64) []byte {
var buf [10]byte
var n int
for n = 0; x > 127; n++ {
buf[n] = 0x80 | uint8(x&0x7F)
x >>= 7
}
buf[n] = uint8(x)
n++
return buf[0:n]
}
func DecodeVarint(r *bufio.Reader) (x uint64, n int, err error) {
// x, n already 0
for shift := uint(0); shift < 64; shift += 7 {
val, err := r.ReadByte()
if err != nil {
return 0, 0, err
}
b := uint64(val)
n++
x |= (b & 0x7F) << shift
if (b & 0x80) == 0 {
return x, n, nil
}
}
// The number is too large to represent in a 64-bit value.
return 0, 0, errors.New("Too large of a number!")
}
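For reference, each frame on the wire is just two varints followed by the payload: a header packing the stream ID with a 3-bit tag (`NewStream`, `Receiver`, `Initiator`, `Close`), then the data length. A small worked example of the header math using the exported helpers above (the import path is the upstream one; values are chosen for illustration):

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"

	multiplex "github.com/whyrusleeping/go-multiplex"
)

func main() {
	// A data frame sent by the initiating side of stream 5:
	// header = (streamID << 3) | Initiator = (5 << 3) | 2 = 42.
	header := uint64(5<<3 | multiplex.Initiator)
	payload := []byte("hi")

	var frame bytes.Buffer
	frame.Write(multiplex.EncodeVarint(header))               // one byte: 0x2a
	frame.Write(multiplex.EncodeVarint(uint64(len(payload)))) // one byte: 0x02
	frame.Write(payload)

	// The reader splits the header back into stream ID and tag.
	r := bufio.NewReader(&frame)
	h, _, _ := multiplex.DecodeVarint(r)
	fmt.Println(h>>3, h&7) // prints: 5 2
}
```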


@ -0,0 +1,118 @@
package multiplex
import (
"fmt"
"io"
"net"
"testing"
rand "github.com/dustin/randbo"
)
func TestBasicStreams(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
mes := []byte("Hello world")
go func() {
s, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
_, err = s.Write(mes)
if err != nil {
t.Fatal(err)
}
err = s.Close()
if err != nil {
t.Fatal(err)
}
}()
s := mpa.NewStream()
buf := make([]byte, len(mes))
n, err := s.Read(buf)
if err != nil {
t.Fatal(err)
}
if n != len(mes) {
t.Fatal("read wrong amount")
}
if string(buf) != string(mes) {
t.Fatal("got bad data")
}
s.Close()
mpa.Close()
mpb.Close()
}
func TestEcho(t *testing.T) {
a, b := net.Pipe()
mpa := NewMultiplex(a, false)
mpb := NewMultiplex(b, true)
mes := make([]byte, 40960)
rand.New().Read(mes)
go func() {
s, err := mpb.Accept()
if err != nil {
t.Fatal(err)
}
defer s.Close()
io.Copy(s, s)
}()
s := mpa.NewStream()
_, err := s.Write(mes)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, len(mes))
n, err := io.ReadFull(s, buf)
if err != nil {
t.Fatal(err)
}
if n != len(mes) {
t.Fatal("read wrong amount")
}
if err := arrComp(buf, mes); err != nil {
t.Fatal(err)
}
s.Close()
mpa.Close()
mpb.Close()
}
func arrComp(a, b []byte) error {
msg := ""
if len(a) != len(b) {
msg += fmt.Sprintf("arrays differ in length: %d %d\n", len(a), len(b))
}
for i := 0; i < len(a) && i < len(b); i++ {
if a[i] != b[i] {
msg += fmt.Sprintf("content differs at index %d [%d != %d]", i, a[i], b[i])
return fmt.Errorf(msg)
}
}
if len(msg) > 0 {
return fmt.Errorf(msg)
}
return nil
}


@ -0,0 +1,5 @@
{
"name": "go-multiplex",
"version": "1.0.0",
"language": "go"
}


@ -16,8 +16,8 @@ import (
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer"
psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-stream-muxer/yamux"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
prom "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"