diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 67350eb98..ef2f0f200 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -172,7 +172,7 @@ }, { "ImportPath": "github.com/jbenet/go-peerstream", - "Rev": "8d52ed2801410a2af995b4e87660272d11c8a9a4" + "Rev": "62fe5ede12f9d9cd9406750160122525b3d6b694" }, { "ImportPath": "github.com/jbenet/go-random", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml index 3c1c47ab9..415030aa9 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/.travis.yml @@ -7,4 +7,5 @@ go: - tip script: - - go test -race -cpu=5 ./... + - go test ./... + # - go test -race -cpu=5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json new file mode 100644 index 000000000..675d730e1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Godeps.json @@ -0,0 +1,33 @@ +{ + "ImportPath": "github.com/jbenet/go-peerstream", + "GoVersion": "go1.4.2", + "Packages": [ + "./..." + ], + "Deps": [ + { + "ImportPath": "github.com/docker/spdystream", + "Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207" + }, + { + "ImportPath": "github.com/hashicorp/yamux", + "Rev": "b2e55852ddaf823a85c67f798080eb7d08acd71d" + }, + { + "ImportPath": "github.com/inconshreveable/muxado", + "Rev": "f693c7e88ba316d1a0ae3e205e22a01aa3ec2848" + }, + { + "ImportPath": "github.com/jbenet/go-temp-err-catcher", + "Rev": "aac704a3f4f27190b4ccc05f303a4931fd1241ff" + }, + { + "ImportPath": "github.com/whyrusleeping/go-multiplex", + "Rev": "ce5baa716247510379cb7640a14da857afd3b622" + }, + { + "ImportPath": "github.com/whyrusleeping/go-multistream", + "Rev": "08e8f9c9f5665ed0c63ffde4fa5ef1d5fb3d516d" + } + ] +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Readme b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Readme new file mode 100644 index 000000000..4cdaa53d5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Godeps/Readme @@ -0,0 +1,5 @@ +This directory tree is generated automatically by godep. + +Please do not edit. + +See https://github.com/tools/godep for more information. diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Makefile b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Makefile new file mode 100644 index 000000000..f15fcaa68 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/Makefile @@ -0,0 +1,15 @@ + +godep: + go get github.com/tools/godep + +vendor: godep + godep save -r ./... + +build: + go build ./... + +test: + go test ./... + +test_race: + go test -race -cpu 5 ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go index 15821b533..998e849d4 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/conn.go @@ -158,37 +158,7 @@ func ConnInConns(c1 *Conn, conns []*Conn) bool { // addConn is the internal version of AddConn. we need the server bool // as spdystream requires it. func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) { - if netConn == nil { - return nil, errors.New("nil conn") - } - - // this function is so we can defer our lock, which needs to be - // unlocked **before** the Handler is called (which needs to be - // sequential). This was the simplest thing :) - setupConn := func() (*Conn, error) { - s.connLock.Lock() - defer s.connLock.Unlock() - - // first, check if we already have it... - for c := range s.conns { - if c.netConn == netConn { - return c, nil - } - } - - // create a new spdystream connection - ssConn, err := s.transport.NewConn(netConn, isServer) - if err != nil { - return nil, err - } - - // add the connection - c := newConn(netConn, ssConn, s) - s.conns[c] = struct{}{} - return c, nil - } - - c, err := setupConn() + c, err := s.setupConn(netConn, isServer) if err != nil { return nil, err } @@ -208,6 +178,49 @@ func (s *Swarm) addConn(netConn net.Conn, isServer bool) (*Conn, error) { return c, nil } +// setupConn adds the relevant connection to the map, first checking if it +// was already there. +func (s *Swarm) setupConn(netConn net.Conn, isServer bool) (*Conn, error) { + if netConn == nil { + return nil, errors.New("nil conn") + } + + // first, check if we already have it, to avoid constructing it + // if it is already there + s.connLock.Lock() + for c := range s.conns { + if c.netConn == netConn { + s.connLock.Unlock() + return c, nil + } + } + s.connLock.Unlock() + // construct the connection without hanging onto the lock + // (as there could be deadlock if so.) + + // create a new spdystream connection + ssConn, err := s.transport.NewConn(netConn, isServer) + if err != nil { + return nil, err + } + + // take the lock to add it to the map. + s.connLock.Lock() + defer s.connLock.Unlock() + + // check for it again as it may have been added already. (TOCTTOU) + for c := range s.conns { + if c.netConn == netConn { + return c, nil + } + } + + // add the connection + c := newConn(netConn, ssConn, s) + s.conns[c] = struct{}{} + return c, nil +} + // createStream is the internal function that creates a new stream. assumes // all validation has happened. func (s *Swarm) createStream(c *Conn) (*Stream, error) { diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/listener.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/listener.go index 2d2dd8ebc..2fbc7c77c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/listener.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/listener.go @@ -4,12 +4,13 @@ import ( "errors" "fmt" "net" + "sync" tec "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-temp-err-catcher" ) // AcceptConcurrency is how many connections can simultaneously be -// in process of being accepted. Handshakes can sometimes occurr as +// in process of being accepted. Handshakes can sometimes occur as // part of this process, so it may take some time. It is imporant to // rate limit lest a malicious influx of connections would cause our // node to consume all its resources accepting new connections. @@ -73,7 +74,11 @@ func ListenersWithGroup(g Group, ls []*Listener) []*Listener { // run in a goroutine. // TODO: add rate limiting func (l *Listener) accept() { - defer l.teardown() + var wg sync.WaitGroup + defer func() { + wg.Wait() // must happen before teardown + l.teardown() + }() // catching the error here is odd. doing what net/http does: // http://golang.org/src/net/http/server.go?s=51504:51550#L1728 @@ -98,12 +103,15 @@ func (l *Listener) accept() { // do this in a goroutine to avoid blocking the Accept loop. // note that this does not rate limit accepts. limit <- struct{}{} // sema down + wg.Add(1) go func(conn net.Conn) { defer func() { <-limit }() // sema up + defer wg.Done() conn2, err := l.swarm.addConn(conn, true) if err != nil { l.acceptErr <- err + return } conn2.groups.AddSet(&l.groups) // add out groups }(conn) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go new file mode 100644 index 000000000..79721acd5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex.go @@ -0,0 +1,45 @@ +package peerstream_multiplex + +import ( + "net" + + pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + mp "github.com/whyrusleeping/go-multiplex" +) + +type conn struct { + *mp.Multiplex +} + +func ( // Conn is a connection to a remote peer. +c *conn) Close() error { + return c.Multiplex.Close() +} + +func (c *conn) IsClosed() bool { + return c.Multiplex.IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream() (pst.Stream, error) { + return c.Multiplex.NewStream(), nil +} + +// Serve starts listening for incoming requests and handles them +// using given StreamHandler +func (c *conn) Serve(handler pst.StreamHandler) { + c.Multiplex.Serve(func(s *mp.Stream) { + handler(s) + }) +} + +// Transport is a go-peerstream transport that constructs +// multiplex-backed connections. +type Transport struct{} + +// DefaultTransport has default settings for multiplex +var DefaultTransport = &Transport{} + +func (t *Transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { + return &conn{mp.NewMultiplex(nc, isServer)}, nil +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go new file mode 100644 index 000000000..43a87af34 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex/multiplex_test.go @@ -0,0 +1,11 @@ +package peerstream_multiplex + +import ( + "testing" + + psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" +) + +func TestMultiplexTransport(t *testing.T) { + psttest.SubtestAll(t, DefaultTransport) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go new file mode 100644 index 000000000..175f75c52 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream.go @@ -0,0 +1,59 @@ +// package multistream implements a peerstream transport using +// go-multistream to select the underlying stream muxer +package multistream + +import ( + "net" + + pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" + multiplex "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multiplex" + spdy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream" + yamux "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux" + mss "github.com/whyrusleeping/go-multistream" +) + +type transport struct { + mux *mss.MultistreamMuxer + + tpts map[string]pst.Transport +} + +func NewTransport() pst.Transport { + mux := mss.NewMultistreamMuxer() + mux.AddHandler("/multiplex", nil) + mux.AddHandler("/spdystream", nil) + mux.AddHandler("/yamux", nil) + + tpts := map[string]pst.Transport{ + "/multiplex": multiplex.DefaultTransport, + "/spdystream": spdy.Transport, + "/yamux": yamux.DefaultTransport, + } + + return &transport{ + mux: mux, + tpts: tpts, + } +} + +func (t *transport) NewConn(nc net.Conn, isServer bool) (pst.Conn, error) { + var proto string + if isServer { + selected, _, err := t.mux.Negotiate(nc) + if err != nil { + return nil, err + } + proto = selected + } else { + // prefer yamux + selected, err := mss.SelectOneOf([]string{"/yamux", "/spdystream", "/multiplex"}, nc) + if err != nil { + return nil, err + } + proto = selected + } + + tpt := t.tpts[proto] + + return tpt.NewConn(nc, isServer) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go new file mode 100644 index 000000000..ebe46b814 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/multistream/multistream_test.go @@ -0,0 +1,11 @@ +package multistream + +import ( + "testing" + + psttest "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test" +) + +func TestMultiStreamTransport(t *testing.T) { + psttest.SubtestAll(t, NewTransport()) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go index 58ea947c4..90d0ca6b6 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream.go @@ -4,8 +4,8 @@ import ( "net" "net/http" + ss "github.com/docker/spdystream" pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" - ss "github.com/jbenet/spdystream" ) // stream implements pst.Stream using a ss.Stream @@ -55,6 +55,8 @@ func (c *conn) IsClosed() bool { select { case <-c.closed: return true + case <-c.sc.CloseChan(): + return true default: return false } @@ -62,7 +64,10 @@ func (c *conn) IsClosed() bool { // OpenStream creates a new stream. func (c *conn) OpenStream() (pst.Stream, error) { - s, err := c.spdyConn().CreateStream(http.Header{}, nil, false) + s, err := c.spdyConn().CreateStream(http.Header{ + ":method": []string{"GET"}, // this is here for HTTP/SPDY interop + ":path": []string{"/"}, // this is here for HTTP/SPDY interop + }, nil, false) if err != nil { return nil, err } @@ -87,10 +92,14 @@ func (c *conn) Serve(handler pst.StreamHandler) { // -- at this moment -- not the solution. Either spdystream must // change, or we must throttle another way. go-peerstream handles // every new stream in its own goroutine. - go func() { - s.SendReply(http.Header{}, false) - handler((*stream)(s)) - }() + err := s.SendReply(http.Header{}, false) + if err != nil { + // this _could_ error out. not sure how to handle this failure. + // don't return, and let the caller handle a broken stream. + // better than _hiding_ an error. + // return + } + go handler((*stream)(s)) }) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go index 2418071f0..16e754b77 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/spdystream/spdystream_test.go @@ -7,6 +7,5 @@ import ( ) func TestSpdyStreamTransport(t *testing.T) { - t.Skip("spdystream is known to be broken") psttest.SubtestAll(t, Transport) } diff --git a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go index 5d543327a..4d86614fb 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/test/ttest.go @@ -194,7 +194,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { bufs <- buf log("writing %d bytes (message %d/%d #%x)", len(buf), i, msgs, buf[:3]) if _, err := stream.Write(buf); err != nil { - errs <- err + errs <- fmt.Errorf("stream.Write(buf): %s", err) continue } } @@ -212,7 +212,7 @@ func SubtestSimpleWrite100msgs(t *testing.T, tr pst.Transport) { i++ if _, err := io.ReadFull(stream, buf2); err != nil { - errs <- err + errs <- fmt.Errorf("readFull(stream, buf2): %s", err) continue } if !bytes.Equal(buf1, buf2) { @@ -253,7 +253,7 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, bufs <- buf log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, nMsg, buf[:3]) if _, err := s.Write(buf); err != nil { - errs <- err + errs <- fmt.Errorf("s.Write(buf): %s", err) continue } } @@ -265,11 +265,12 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, buf2 := make([]byte, msgsize) i := 0 for buf1 := range bufs { - log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i, nMsg, buf1[:3]) i++ + log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) if _, err := io.ReadFull(s, buf2); err != nil { - errs <- err + errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err) + log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, nMsg, buf1[:3]) continue } if !bytes.Equal(buf1, buf2) { @@ -307,13 +308,13 @@ func SubtestStressNSwarmNConnNStreamNMsg(t *testing.T, tr pst.Transport, nSwarm, nc, err := net.Dial(nla.Network(), nla.String()) if err != nil { - errs <- err + errs <- fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err) return } c, err := a.AddConn(nc) if err != nil { - errs <- err + errs <- fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err) return }