From 2c4eb60961d752e1a02ee12ea94320ab9cfbf7a3 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 9 Jul 2015 14:42:41 -0700 Subject: [PATCH] allow multistream to have zero rtt stream opening License: MIT Signed-off-by: Jeromy --- Godeps/Godeps.json | 2 +- .../whyrusleeping/go-multistream/lazy.go | 129 ++++++++++++++++++ .../go-multistream/multistream.go | 30 ++-- .../go-multistream/multistream_test.go | 106 ++++++++++++++ p2p/host/basic/basic_host.go | 24 +++- p2p/test/backpressure/backpressure_test.go | 6 + 6 files changed, 279 insertions(+), 18 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/lazy.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 2200bd2f0..e4cab5487 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -344,7 +344,7 @@ }, { "ImportPath": "github.com/whyrusleeping/go-multistream", - "Rev": "c9eea2e3be705b7cfd730351b510cfa12ca038f4" + "Rev": "30c7a81b6c568654147bf6e106870c5d64ccebc8" }, { "ImportPath": "github.com/whyrusleeping/multiaddr-filter", diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/lazy.go b/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/lazy.go new file mode 100644 index 000000000..eed4cfbdb --- /dev/null +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/lazy.go @@ -0,0 +1,129 @@ +package multistream + +import ( + "fmt" + "io" + "sync" +) + +func NewLazyHandshakeConn(c io.ReadWriteCloser, proto string) io.ReadWriteCloser { + return &lazyConn{ + proto: proto, + con: c, + } +} + +type lazyConn struct { + rhandshake bool // only accessed by 'Read' should not call read async + + rhlock sync.Mutex + rhsync bool //protected by mutex + rerr error + + whandshake bool + + whlock sync.Mutex + whsync bool + werr error + + proto string + con io.ReadWriteCloser +} + +func (l *lazyConn) Read(b []byte) (int, error) { + if !l.rhandshake { + go l.writeHandshake() + err := l.readHandshake() + if err != nil { + return 0, err + } + + l.rhandshake = true + } + + if len(b) == 0 { + return 0, nil + } + + return l.con.Read(b) +} + +func (l *lazyConn) readHandshake() error { + l.rhlock.Lock() + defer l.rhlock.Unlock() + + // if we've already done this, exit + if l.rhsync { + return l.rerr + } + l.rhsync = true + + // read multistream version + tok, err := ReadNextToken(l.con) + if err != nil { + l.rerr = err + return err + } + + if tok != ProtocolID { + l.rerr = fmt.Errorf("multistream protocol mismatch ( %s != %s )", tok, ProtocolID) + return l.rerr + } + + // read protocol + tok, err = ReadNextToken(l.con) + if err != nil { + l.rerr = err + return err + } + + if tok != l.proto { + l.rerr = fmt.Errorf("protocol mismatch in lazy handshake ( %s != %s )", tok, l.proto) + return l.rerr + } + + return nil +} + +func (l *lazyConn) writeHandshake() error { + l.whlock.Lock() + defer l.whlock.Unlock() + + if l.whsync { + return l.werr + } + + l.whsync = true + + err := delimWrite(l.con, []byte(ProtocolID)) + if err != nil { + l.werr = err + return err + } + + err = delimWrite(l.con, []byte(l.proto)) + if err != nil { + l.werr = err + return err + } + + return nil +} + +func (l *lazyConn) Write(b []byte) (int, error) { + if !l.whandshake { + go l.readHandshake() + err := l.writeHandshake() + if err != nil { + return 0, err + } + + l.whandshake = true + } + + return l.con.Write(b) +} + +func (l *lazyConn) Close() error { + return l.con.Close() +} diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream.go b/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream.go index 8f18785cc..ecec8df73 100644 --- a/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream.go +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream.go @@ -100,17 +100,7 @@ loop: switch tok { case "ls": - buf := new(bytes.Buffer) - msm.handlerlock.Lock() - for proto, _ := range msm.handlers { - err := delimWrite(buf, []byte(proto)) - if err != nil { - msm.handlerlock.Unlock() - return "", nil, err - } - } - msm.handlerlock.Unlock() - err := delimWrite(rwc, buf.Bytes()) + err := msm.Ls(rwc) if err != nil { return "", nil, err } @@ -138,6 +128,24 @@ loop: } +func (msm *MultistreamMuxer) Ls(rwc io.Writer) error { + buf := new(bytes.Buffer) + msm.handlerlock.Lock() + for proto, _ := range msm.handlers { + err := delimWrite(buf, []byte(proto)) + if err != nil { + msm.handlerlock.Unlock() + return err + } + } + msm.handlerlock.Unlock() + err := delimWrite(rwc, buf.Bytes()) + if err != nil { + return err + } + return nil +} + func (msm *MultistreamMuxer) Handle(rwc io.ReadWriteCloser) error { _, h, err := msm.Negotiate(rwc) if err != nil { diff --git a/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream_test.go b/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream_test.go index 85e096877..be15259f5 100644 --- a/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream_test.go +++ b/Godeps/_workspace/src/github.com/whyrusleeping/go-multistream/multistream_test.go @@ -118,6 +118,112 @@ func TestSelectOneAndWrite(t *testing.T) { verifyPipe(t, a, b) } +func TestLazyConns(t *testing.T) { + a, b := net.Pipe() + + mux := NewMultistreamMuxer() + mux.AddHandler("/a", nil) + mux.AddHandler("/b", nil) + mux.AddHandler("/c", nil) + + la := NewLazyHandshakeConn(a, "/c") + lb := NewLazyHandshakeConn(b, "/c") + + verifyPipe(t, la, lb) +} + +func TestLazyAndMux(t *testing.T) { + a, b := net.Pipe() + + mux := NewMultistreamMuxer() + mux.AddHandler("/a", nil) + mux.AddHandler("/b", nil) + mux.AddHandler("/c", nil) + + done := make(chan struct{}) + go func() { + selected, _, err := mux.Negotiate(a) + if err != nil { + t.Fatal(err) + } + if selected != "/c" { + t.Fatal("incorrect protocol selected") + } + + msg := make([]byte, 5) + _, err = a.Read(msg) + if err != nil { + t.Fatal(err) + } + + close(done) + }() + + lb := NewLazyHandshakeConn(b, "/c") + + // do a write to push the handshake through + _, err := lb.Write([]byte("hello")) + if err != nil { + t.Fatal(err) + } + + select { + case <-time.After(time.Second): + t.Fatal("failed to complete in time") + case <-done: + } + + verifyPipe(t, a, lb) +} + +func TestLazyAndMuxWrite(t *testing.T) { + a, b := net.Pipe() + + mux := NewMultistreamMuxer() + mux.AddHandler("/a", nil) + mux.AddHandler("/b", nil) + mux.AddHandler("/c", nil) + + done := make(chan struct{}) + go func() { + selected, _, err := mux.Negotiate(a) + if err != nil { + t.Fatal(err) + } + if selected != "/c" { + t.Fatal("incorrect protocol selected") + } + + _, err = a.Write([]byte("hello")) + if err != nil { + t.Fatal(err) + } + + close(done) + }() + + lb := NewLazyHandshakeConn(b, "/c") + + // do a write to push the handshake through + msg := make([]byte, 5) + _, err := lb.Read(msg) + if err != nil { + t.Fatal(err) + } + + if string(msg) != "hello" { + t.Fatal("wrong!") + } + + select { + case <-time.After(time.Second): + t.Fatal("failed to complete in time") + case <-done: + } + + verifyPipe(t, a, lb) +} + func verifyPipe(t *testing.T, a, b io.ReadWriter) { mes := make([]byte, 1024) rand.Read(mes) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 963668744..92e7792a1 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -170,12 +170,11 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { logStream := mstream.WrapStream(s, pid, h.bwc) - if err := msmux.SelectProtoOrFail(string(pid), logStream); err != nil { - logStream.Close() - return nil, err - } - - return logStream, nil + lzcon := msmux.NewLazyHandshakeConn(logStream, string(pid)) + return &streamWrapper{ + Stream: logStream, + rw: lzcon, + }, nil } // Connect ensures there is a connection between this host and the peer with @@ -254,3 +253,16 @@ func (h *BasicHost) Close() error { func (h *BasicHost) GetBandwidthReporter() metrics.Reporter { return h.bwc } + +type streamWrapper struct { + inet.Stream + rw io.ReadWriter +} + +func (s *streamWrapper) Read(b []byte) (int, error) { + return s.rw.Read(b) +} + +func (s *streamWrapper) Write(b []byte) (int, error) { + return s.rw.Write(b) +} diff --git a/p2p/test/backpressure/backpressure_test.go b/p2p/test/backpressure/backpressure_test.go index bacdcec3d..b13d77246 100644 --- a/p2p/test/backpressure/backpressure_test.go +++ b/p2p/test/backpressure/backpressure_test.go @@ -299,6 +299,12 @@ func TestStBackpressureStreamWrite(t *testing.T) { } } + // trigger lazy connection handshaking + _, err = s.Read(nil) + if err != nil { + t.Fatal(err) + } + // 500ms rounds of lockstep write + drain roundsStart := time.Now() roundsTotal := 0