diff --git a/net/conn/closer.go b/net/conn/closer.go index 503b035d3..519b9e513 100644 --- a/net/conn/closer.go +++ b/net/conn/closer.go @@ -1,6 +1,8 @@ package conn import ( + "errors" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) @@ -62,13 +64,14 @@ func (c *contextCloser) Done() Wait { func (c *contextCloser) Close() error { select { case <-c.Done(): - panic("closed twice") + // panic("closed twice") + return errors.New("closed twice") default: } - c.cancel() // release anyone waiting on the context err := c.closeFunc() // actually run the close logic close(c.closed) // relase everyone waiting on Done + c.cancel() // release anyone waiting on the context return err } diff --git a/net/conn/conn.go b/net/conn/conn.go index d75ea1e15..1763776b4 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -1,7 +1,6 @@ package conn import ( - "errors" "fmt" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -9,7 +8,6 @@ import ( ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net" - spipe "github.com/jbenet/go-ipfs/crypto/spipe" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) @@ -40,22 +38,20 @@ type singleConn struct { local *peer.Peer remote *peer.Peer maconn manet.Conn - - secure *spipe.SecurePipe - insecure *msgioPipe + msgio *msgioPipe ContextCloser } // newConn constructs a new connection func newSingleConn(ctx context.Context, local, remote *peer.Peer, - peers peer.Peerstore, maconn manet.Conn) (Conn, error) { + maconn manet.Conn) (Conn, error) { conn := &singleConn{ - local: local, - remote: remote, - maconn: maconn, - insecure: newMsgioPipe(10), + local: local, + remote: remote, + maconn: maconn, + msgio: newMsgioPipe(10), } conn.ContextCloser = NewContextCloser(ctx, conn.close) @@ -63,65 +59,19 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer, log.Info("newSingleConn: %v to %v", local, remote) // setup the various io goroutines - go conn.insecure.outgoing.WriteTo(maconn) - go conn.insecure.incoming.ReadFrom(maconn, MaxMessageSize) - - // perform secure handshake before returning this connection. - if err := conn.secureHandshake(peers); err != nil { - conn.Close() - return nil, err - } + go conn.msgio.outgoing.WriteTo(maconn) + go conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize) return conn, nil } -// secureHandshake performs the spipe secure handshake. -func (c *singleConn) secureHandshake(peers peer.Peerstore) error { - if c.secure != nil { - return errors.New("Conn is already secured or being secured.") - } - - // setup a Duplex pipe for spipe - insecure := spipe.Duplex{ - In: c.insecure.incoming.MsgChan, - Out: c.insecure.outgoing.MsgChan, - } - - // spipe performs the secure handshake, which takes multiple RTT - sp, err := spipe.NewSecurePipe(c.Context(), 10, c.local, peers, insecure) - if err != nil { - return err - } - - // assign it into the conn object - c.secure = sp - - if c.remote == nil { - c.remote = c.secure.RemotePeer() - - } else if c.remote != c.secure.RemotePeer() { - // this panic is here because this would be an insidious programmer error - // that we need to ensure we catch. - log.Error("%v != %v", c.remote, c.secure.RemotePeer()) - panic("peers not being constructed correctly.") - } - - return nil -} - // close is the internal close function, called by ContextCloser.Close func (c *singleConn) close() error { log.Debug("%s closing Conn with %s", c.local, c.remote) // close underlying connection err := c.maconn.Close() - - // closing channels - c.insecure.outgoing.Close() - if c.secure != nil { // may never have gotten here. - c.secure.Close() - } - + c.msgio.outgoing.Close() return err } @@ -137,12 +87,12 @@ func (c *singleConn) RemotePeer() *peer.Peer { // In returns a readable message channel func (c *singleConn) In() <-chan []byte { - return c.secure.In + return c.msgio.incoming.MsgChan } // Out returns a writable message channel func (c *singleConn) Out() chan<- []byte { - return c.secure.Out + return c.msgio.outgoing.MsgChan } // Dialer is an object that can open connections. We could have a "convenience" @@ -186,7 +136,12 @@ func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (C log.Error("Error putting peer into peerstore: %s", remote) } - return newSingleConn(ctx, d.LocalPeer, remote, d.Peerstore, maconn) + c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn) + if err != nil { + return nil, err + } + + return newSecureConn(ctx, c, d.Peerstore) } // listener is an object that can accept connections. It implements Listener @@ -240,13 +195,21 @@ func (l *listener) listen() { // handle is a goroutine work function that handles the handshake. // it's here only so that accepting new connections can happen quickly. handle := func(maconn manet.Conn) { - c, err := newSingleConn(l.Context(), l.local, nil, l.peers, maconn) + defer func() { <-sem }() // release + + c, err := newSingleConn(l.Context(), l.local, nil, maconn) if err != nil { log.Error("Error accepting connection: %v", err) - } else { - l.conns <- c + return } - <-sem // release + + sc, err := newSecureConn(l.Context(), c, l.peers) + if err != nil { + log.Error("Error securing connection: %v", err) + return + } + + l.conns <- sc } for { diff --git a/net/conn/conn_test.go b/net/conn/conn_test.go index 0e758fd69..715c7a900 100644 --- a/net/conn/conn_test.go +++ b/net/conn/conn_test.go @@ -9,154 +9,9 @@ import ( "testing" "time" - ci "github.com/jbenet/go-ipfs/crypto" - peer "github.com/jbenet/go-ipfs/peer" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) -func setupPeer(addr string) (*peer.Peer, error) { - tcp, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) - if err != nil { - return nil, err - } - - id, err := peer.IDFromPubKey(pk) - if err != nil { - return nil, err - } - - p := &peer.Peer{ID: id} - p.PrivKey = sk - p.PubKey = pk - p.AddAddress(tcp) - return p, nil -} - -func echoListen(ctx context.Context, listener Listener) { - for { - select { - case <-ctx.Done(): - return - case c := <-listener.Accept(): - go echo(ctx, c) - } - } -} - -func echo(ctx context.Context, c Conn) { - for { - select { - case <-ctx.Done(): - return - case m := <-c.In(): - c.Out() <- m - } - } -} - -func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) { - - p1, err := setupPeer(a1) - if err != nil { - t.Fatal("error setting up peer", err) - } - - p2, err := setupPeer(a2) - if err != nil { - t.Fatal("error setting up peer", err) - } - - laddr := p1.NetAddress("tcp") - if laddr == nil { - t.Fatal("Listen address is nil.") - } - - l1, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) - if err != nil { - t.Fatal(err) - } - - d2 := &Dialer{ - Peerstore: peer.NewPeerstore(), - LocalPeer: p2, - } - - c2, err := d2.Dial(ctx, "tcp", p1) - if err != nil { - t.Fatal("error dialing peer", err) - } - - c1 := <-l1.Accept() - - return c1, c2 -} - -func TestDialer(t *testing.T) { - - p1, err := setupPeer("/ip4/127.0.0.1/tcp/1234") - if err != nil { - t.Fatal("error setting up peer", err) - } - - p2, err := setupPeer("/ip4/127.0.0.1/tcp/3456") - if err != nil { - t.Fatal("error setting up peer", err) - } - - ctx, cancel := context.WithCancel(context.Background()) - - laddr := p1.NetAddress("tcp") - if laddr == nil { - t.Fatal("Listen address is nil.") - } - - l, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) - if err != nil { - t.Fatal(err) - } - - go echoListen(ctx, l) - - d := &Dialer{ - Peerstore: peer.NewPeerstore(), - LocalPeer: p2, - } - - c, err := d.Dial(ctx, "tcp", p1) - if err != nil { - t.Fatal("error dialing peer", err) - } - - // fmt.Println("sending") - c.Out() <- []byte("beep") - c.Out() <- []byte("boop") - - out := <-c.In() - // fmt.Println("recving", string(out)) - data := string(out) - if data != "beep" { - t.Error("unexpected conn output", data) - } - - out = <-c.In() - data = string(out) - if string(out) != "boop" { - t.Error("unexpected conn output", data) - } - - // fmt.Println("closing") - c.Close() - l.Close() - cancel() -} - func TestClose(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/net/conn/dial_test.go b/net/conn/dial_test.go new file mode 100644 index 000000000..09f99d799 --- /dev/null +++ b/net/conn/dial_test.go @@ -0,0 +1,152 @@ +package conn + +import ( + "testing" + + ci "github.com/jbenet/go-ipfs/crypto" + peer "github.com/jbenet/go-ipfs/peer" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +func setupPeer(addr string) (*peer.Peer, error) { + tcp, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + + sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) + if err != nil { + return nil, err + } + + id, err := peer.IDFromPubKey(pk) + if err != nil { + return nil, err + } + + p := &peer.Peer{ID: id} + p.PrivKey = sk + p.PubKey = pk + p.AddAddress(tcp) + return p, nil +} + +func echoListen(ctx context.Context, listener Listener) { + for { + select { + case <-ctx.Done(): + return + case c := <-listener.Accept(): + go echo(ctx, c) + } + } +} + +func echo(ctx context.Context, c Conn) { + for { + select { + case <-ctx.Done(): + return + case m := <-c.In(): + c.Out() <- m + } + } +} + +func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) { + + p1, err := setupPeer(a1) + if err != nil { + t.Fatal("error setting up peer", err) + } + + p2, err := setupPeer(a2) + if err != nil { + t.Fatal("error setting up peer", err) + } + + laddr := p1.NetAddress("tcp") + if laddr == nil { + t.Fatal("Listen address is nil.") + } + + l1, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) + if err != nil { + t.Fatal(err) + } + + d2 := &Dialer{ + Peerstore: peer.NewPeerstore(), + LocalPeer: p2, + } + + c2, err := d2.Dial(ctx, "tcp", p1) + if err != nil { + t.Fatal("error dialing peer", err) + } + + c1 := <-l1.Accept() + + return c1, c2 +} + +func TestDialer(t *testing.T) { + + p1, err := setupPeer("/ip4/127.0.0.1/tcp/1234") + if err != nil { + t.Fatal("error setting up peer", err) + } + + p2, err := setupPeer("/ip4/127.0.0.1/tcp/3456") + if err != nil { + t.Fatal("error setting up peer", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + laddr := p1.NetAddress("tcp") + if laddr == nil { + t.Fatal("Listen address is nil.") + } + + l, err := Listen(ctx, laddr, p1, peer.NewPeerstore()) + if err != nil { + t.Fatal(err) + } + + go echoListen(ctx, l) + + d := &Dialer{ + Peerstore: peer.NewPeerstore(), + LocalPeer: p2, + } + + c, err := d.Dial(ctx, "tcp", p1) + if err != nil { + t.Fatal("error dialing peer", err) + } + + // fmt.Println("sending") + c.Out() <- []byte("beep") + c.Out() <- []byte("boop") + + out := <-c.In() + // fmt.Println("recving", string(out)) + data := string(out) + if data != "beep" { + t.Error("unexpected conn output", data) + } + + out = <-c.In() + data = string(out) + if string(out) != "boop" { + t.Error("unexpected conn output", data) + } + + // fmt.Println("closing") + c.Close() + l.Close() + cancel() +} diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go new file mode 100644 index 000000000..0a1d89b66 --- /dev/null +++ b/net/conn/secure_conn.go @@ -0,0 +1,113 @@ +package conn + +import ( + "errors" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + spipe "github.com/jbenet/go-ipfs/crypto/spipe" + peer "github.com/jbenet/go-ipfs/peer" +) + +// secureConn wraps another Conn object with an encrypted channel. +type secureConn struct { + + // the wrapped conn + insecure Conn + + // secure pipe, wrapping insecure + secure *spipe.SecurePipe + + ContextCloser +} + +// newConn constructs a new connection +func newSecureConn(ctx context.Context, insecure Conn, peers peer.Peerstore) (Conn, error) { + + conn := &secureConn{ + insecure: insecure, + } + conn.ContextCloser = NewContextCloser(ctx, conn.close) + + log.Debug("newSecureConn: %v to %v", insecure.LocalPeer(), insecure.RemotePeer()) + // perform secure handshake before returning this connection. + if err := conn.secureHandshake(peers); err != nil { + conn.Close() + return nil, err + } + log.Debug("newSecureConn: %v to %v handshake success!", insecure.LocalPeer(), insecure.RemotePeer()) + + return conn, nil +} + +// secureHandshake performs the spipe secure handshake. +func (c *secureConn) secureHandshake(peers peer.Peerstore) error { + if c.secure != nil { + return errors.New("Conn is already secured or being secured.") + } + + // ok to panic here if this type assertion fails. Interface hack. + // when we support wrapping other Conns, we'll need to change + // spipe to do something else. + insecureSC := c.insecure.(*singleConn) + + // setup a Duplex pipe for spipe + insecureD := spipe.Duplex{ + In: insecureSC.msgio.incoming.MsgChan, + Out: insecureSC.msgio.outgoing.MsgChan, + } + + // spipe performs the secure handshake, which takes multiple RTT + sp, err := spipe.NewSecurePipe(c.Context(), 10, c.LocalPeer(), peers, insecureD) + if err != nil { + return err + } + + // assign it into the conn object + c.secure = sp + + // if we do not know RemotePeer, get it from secure chan (who identifies it) + if insecureSC.remote == nil { + insecureSC.remote = c.secure.RemotePeer() + + } else if insecureSC.remote != c.secure.RemotePeer() { + // this panic is here because this would be an insidious programmer error + // that we need to ensure we catch. + // update: this actually might happen under normal operation-- should + // perhaps return an error. TBD. + + log.Error("secureConn peer mismatch. %v != %v", insecureSC.remote, c.secure.RemotePeer()) + panic("secureConn peer mismatch. consructed incorrectly?") + } + + return nil +} + +// close is called by ContextCloser +func (c *secureConn) close() error { + err := c.insecure.Close() + if c.secure != nil { // may never have gotten here. + err = c.secure.Close() + } + return err +} + +// LocalPeer is the Peer on this side +func (c *secureConn) LocalPeer() *peer.Peer { + return c.insecure.LocalPeer() +} + +// RemotePeer is the Peer on the remote side +func (c *secureConn) RemotePeer() *peer.Peer { + return c.insecure.RemotePeer() +} + +// In returns a readable message channel +func (c *secureConn) In() <-chan []byte { + return c.secure.In +} + +// Out returns a writable message channel +func (c *secureConn) Out() chan<- []byte { + return c.secure.Out +} diff --git a/net/conn/secure_conn_test.go b/net/conn/secure_conn_test.go new file mode 100644 index 000000000..9a1dd429a --- /dev/null +++ b/net/conn/secure_conn_test.go @@ -0,0 +1,154 @@ +package conn + +import ( + "bytes" + "fmt" + "runtime" + "strconv" + "sync" + "testing" + "time" + + peer "github.com/jbenet/go-ipfs/peer" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func setupSecureConn(t *testing.T, c Conn) Conn { + c, ok := c.(*secureConn) + if ok { + return c + } + + // shouldn't happen, because dial + listen already return secure conns. + s, err := newSecureConn(c.Context(), c, peer.NewPeerstore()) + if err != nil { + t.Fatal(err) + } + return s +} + +func TestSecureClose(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345") + + c1 = setupSecureConn(t, c1) + c2 = setupSecureConn(t, c2) + + select { + case <-c1.Done(): + t.Fatal("done before close") + case <-c2.Done(): + t.Fatal("done before close") + default: + } + + c1.Close() + + select { + case <-c1.Done(): + default: + t.Fatal("not done after cancel") + } + + c2.Close() + + select { + case <-c2.Done(): + default: + t.Fatal("not done after cancel") + } + + cancel() // close the listener :P +} + +func TestSecureCancel(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345") + + c1 = setupSecureConn(t, c1) + c2 = setupSecureConn(t, c2) + + select { + case <-c1.Done(): + t.Fatal("done before close") + case <-c2.Done(): + t.Fatal("done before close") + default: + } + + cancel() + + // wait to ensure other goroutines run and close things. + <-time.After(time.Microsecond * 10) + // test that cancel called Close. + + select { + case <-c1.Done(): + default: + t.Fatal("not done after cancel") + } + + select { + case <-c2.Done(): + default: + t.Fatal("not done after cancel") + } + +} + +func TestSecureCloseLeak(t *testing.T) { + + var wg sync.WaitGroup + + runPair := func(p1, p2, num int) { + a1 := strconv.Itoa(p1) + a2 := strconv.Itoa(p2) + ctx, cancel := context.WithCancel(context.Background()) + c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2) + + c1 = setupSecureConn(t, c1) + c2 = setupSecureConn(t, c2) + + for i := 0; i < num; i++ { + b1 := []byte("beep") + c1.Out() <- b1 + b2 := <-c2.In() + if !bytes.Equal(b1, b2) { + panic("bytes not equal") + } + + b2 = []byte("boop") + c2.Out() <- b2 + b1 = <-c1.In() + if !bytes.Equal(b1, b2) { + panic("bytes not equal") + } + + <-time.After(time.Microsecond * 5) + } + + cancel() // close the listener + wg.Done() + } + + var cons = 20 + var msgs = 100 + fmt.Printf("Running %d connections * %d msgs.\n", cons, msgs) + for i := 0; i < cons; i++ { + wg.Add(1) + go runPair(2000+i, 2001+i, msgs) + } + + fmt.Printf("Waiting...\n") + wg.Wait() + // done! + + <-time.After(time.Microsecond * 100) + if runtime.NumGoroutine() > 10 { + // panic("uncomment me to debug") + t.Fatal("leaking goroutines:", runtime.NumGoroutine()) + } +}