From 57c7ffab44a26c720dfd5c70c175e499b9f0bc0c Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 19 Dec 2014 19:38:25 -0800 Subject: [PATCH] conn: with new peer.ID --- net/conn/conn.go | 14 +- net/conn/conn_test.go | 22 +-- net/conn/dial.go | 91 +++++++----- net/conn/dial_test.go | 260 +++++++++++++++-------------------- net/conn/handshake.go | 38 +---- net/conn/interface.go | 31 ++--- net/conn/listen.go | 63 ++++----- net/conn/secure_conn.go | 21 ++- net/conn/secure_conn_test.go | 105 ++++++++++---- 9 files changed, 320 insertions(+), 325 deletions(-) diff --git a/net/conn/conn.go b/net/conn/conn.go index 0d8c8e00e..66c9f4815 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -32,14 +32,14 @@ func ReleaseBuffer(b []byte) { // singleConn represents a single connection to another Peer (IPFS Node). type singleConn struct { - local peer.Peer - remote peer.Peer + local peer.ID + remote peer.ID maconn manet.Conn msgrw msgio.ReadWriteCloser } // newConn constructs a new connection -func newSingleConn(ctx context.Context, local, remote peer.Peer, maconn manet.Conn) (Conn, error) { +func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn) (Conn, error) { conn := &singleConn{ local: local, @@ -105,12 +105,12 @@ func (c *singleConn) RemoteMultiaddr() ma.Multiaddr { } // LocalPeer is the Peer on this side -func (c *singleConn) LocalPeer() peer.Peer { +func (c *singleConn) LocalPeer() peer.ID { return c.local } // RemotePeer is the Peer on the remote side -func (c *singleConn) RemotePeer() peer.Peer { +func (c *singleConn) RemotePeer() peer.ID { return c.remote } @@ -145,8 +145,8 @@ func (c *singleConn) ReleaseMsg(m []byte) { // ID returns the ID of a given Conn. func ID(c Conn) string { - l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID()) - r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID()) + l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().Pretty()) + r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().Pretty()) lh := u.Hash([]byte(l)) rh := u.Hash([]byte(r)) ch := u.XOR(lh, rh) diff --git a/net/conn/conn_test.go b/net/conn/conn_test.go index cda64088e..5d002b97d 100644 --- a/net/conn/conn_test.go +++ b/net/conn/conn_test.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "runtime" - "strconv" "sync" "testing" "time" @@ -14,6 +13,7 @@ import ( ) func testOneSendRecv(t *testing.T, c1, c2 Conn) { + log.Debugf("testOneSendRecv from %s to %s", c1.LocalPeer(), c2.LocalPeer()) m1 := []byte("hello") if err := c1.WriteMsg(m1); err != nil { t.Fatal(err) @@ -41,8 +41,9 @@ func testNotOneSendRecv(t *testing.T, c1, c2 Conn) { func TestClose(t *testing.T) { // t.Skip("Skipping in favor of another test") - ctx := context.Background() - c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/5534", "/ip4/127.0.0.1/tcp/5545") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c1, c2, _, _ := setupSingleConn(t, ctx) testOneSendRecv(t, c1, c2) testOneSendRecv(t, c2, c1) @@ -56,6 +57,7 @@ func TestClose(t *testing.T) { } func TestCloseLeak(t *testing.T) { + // t.Skip("Skipping in favor of another test") if testing.Short() { t.SkipNow() } @@ -66,11 +68,9 @@ func TestCloseLeak(t *testing.T) { var wg sync.WaitGroup - runPair := func(p1, p2, num int) { - a1 := strconv.Itoa(p1) - a2 := strconv.Itoa(p2) + runPair := func(num int) { ctx, cancel := context.WithCancel(context.Background()) - c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2) + c1, c2, _, _ := setupSingleConn(t, ctx) for i := 0; i < num; i++ { b1 := []byte(fmt.Sprintf("beep%d", i)) @@ -102,15 +102,15 @@ func TestCloseLeak(t *testing.T) { wg.Done() } - var cons = 1 + var cons = 10 var msgs = 100 - fmt.Printf("Running %d connections * %d msgs.\n", cons, msgs) + log.Debugf("Running %d connections * %d msgs.\n", cons, msgs) for i := 0; i < cons; i++ { wg.Add(1) - go runPair(2000+i, 2001+i, msgs) + go runPair(msgs) } - fmt.Printf("Waiting...\n") + log.Debugf("Waiting...\n") wg.Wait() // done! diff --git a/net/conn/dial.go b/net/conn/dial.go index aa41d088a..5eed05d06 100644 --- a/net/conn/dial.go +++ b/net/conn/dial.go @@ -1,6 +1,7 @@ package conn import ( + "fmt" "strings" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -11,49 +12,32 @@ import ( debugerror "github.com/jbenet/go-ipfs/util/debugerror" ) -// Dial connects to a particular peer, over a given network -// Example: d.Dial(ctx, "udp", peer) -func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Conn, error) { - raddr := remote.NetAddress(network) - if raddr == nil { - return nil, debugerror.Errorf("No remote address for network %s", network) - } - return d.DialAddr(ctx, raddr, remote) +// String returns the string rep of d. +func (d *Dialer) String() string { + return fmt.Sprintf("", d.LocalPeer, d.LocalAddrs[0]) } -// DialAddr connects to a peer over a particular address +// Dial connects to a peer over a particular address // Ensures raddr is part of peer.Addresses() // Example: d.DialAddr(ctx, peer.Addresses()[0], peer) -func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.Peer) (Conn, error) { - - found := false - for _, addr := range remote.Addresses() { - if addr.Equal(raddr) { - found = true - } - } - if !found { - return nil, debugerror.Errorf("address %s is not in peer %s", raddr, remote) - } +func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) { network, _, err := manet.DialArgs(raddr) if err != nil { return nil, err } - laddr := d.LocalPeer.NetAddress(network) - if laddr == nil { - return nil, debugerror.Errorf("No local address for network %s", network) - } - if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr) } - remote.SetType(peer.Remote) - remote, err = d.Peerstore.Add(remote) - if err != nil { - log.Errorf("Error putting peer into peerstore: %s", remote) + var laddr ma.Multiaddr + if len(d.LocalAddrs) > 0 { + // laddr := MultiaddrNetMatch(raddr, d.LocalAddrs) + laddr = NetAddress(network, d.LocalAddrs) + if laddr == nil { + return nil, debugerror.Errorf("No local address for network %s", network) + } } // TODO: try to get reusing addr/ports to work. @@ -69,7 +53,7 @@ func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.P select { case <-ctx.Done(): maconn.Close() - return nil, err + return nil, ctx.Err() default: } @@ -78,17 +62,58 @@ func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.P return nil, err } - if d.WithoutSecureTransport { + if d.PrivateKey == nil { + log.Warning("dialer %s dialing INSECURELY %s at %s!", d, remote, raddr) return c, nil } select { case <-ctx.Done(): c.Close() - return nil, err + return nil, ctx.Err() default: } // return c, nil - return newSecureConn(ctx, c, d.Peerstore) + return newSecureConn(ctx, d.PrivateKey, c) +} + +// MultiaddrProtocolsMatch returns whether two multiaddrs match in protocol stacks. +func MultiaddrProtocolsMatch(a, b ma.Multiaddr) bool { + ap := a.Protocols() + bp := b.Protocols() + + if len(ap) != len(bp) { + return false + } + + for i, api := range ap { + if api != bp[i] { + return false + } + } + + return true +} + +// MultiaddrNetMatch returns the first Multiaddr found to match network. +func MultiaddrNetMatch(tgt ma.Multiaddr, srcs []ma.Multiaddr) ma.Multiaddr { + for _, a := range srcs { + if MultiaddrProtocolsMatch(tgt, a) { + return a + } + } + return nil +} + +// NetAddress returns the first Multiaddr found for a given network. +func NetAddress(n string, addrs []ma.Multiaddr) ma.Multiaddr { + for _, a := range addrs { + for _, p := range a.Protocols() { + if p.Name == n { + return a + } + } + } + return nil } diff --git a/net/conn/dial_test.go b/net/conn/dial_test.go index 85377fb7a..52d653c69 100644 --- a/net/conn/dial_test.go +++ b/net/conn/dial_test.go @@ -1,8 +1,13 @@ package conn import ( + "bytes" + "errors" + "fmt" "io" + "net" "testing" + "time" ci "github.com/jbenet/go-ipfs/crypto" peer "github.com/jbenet/go-ipfs/peer" @@ -12,35 +17,81 @@ import ( ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) -func setupPeer(addr string) (peer.Peer, error) { - tcp, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err +type peerParams struct { + ID peer.ID + PrivKey ci.PrivKey + PubKey ci.PubKey + Addr ma.Multiaddr +} + +func (p *peerParams) checkKeys() error { + if !p.ID.MatchesPrivateKey(p.PrivKey) { + return errors.New("p.ID does not match p.PrivKey") } - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) - if err != nil { - return nil, err + if !p.ID.MatchesPublicKey(p.PubKey) { + return errors.New("p.ID does not match p.PubKey") } - p, err := testutil.NewPeerWithKeyPair(sk, pk) + var buf bytes.Buffer + buf.Write([]byte("hello world. this is me, I swear.")) + b := buf.Bytes() + + sig, err := p.PrivKey.Sign(b) if err != nil { - return nil, err + return fmt.Errorf("sig signing failed: %s", err) } - p.AddAddress(tcp) - return p, nil + + sigok, err := p.PubKey.Verify(b, sig) + if err != nil { + return fmt.Errorf("sig verify failed: %s", err) + } + if !sigok { + return fmt.Errorf("sig verify failed: sig invalid!") + } + + return nil // ok. move along. +} + +func randomPeer(t *testing.T) (p peerParams) { + var err error + p.Addr = testutil.RandLocalTCPAddress() + p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512) + if err != nil { + t.Fatal(err) + } + + p.ID, err = peer.IDFromPublicKey(p.PubKey) + if err != nil { + t.Fatal(err) + } + + if err := p.checkKeys(); err != nil { + t.Fatal(err) + } + return p } func echoListen(ctx context.Context, listener Listener) { for { c, err := listener.Accept() if err != nil { + select { case <-ctx.Done(): return default: } + + if ne, ok := err.(net.Error); ok && ne.Temporary() { + <-time.After(time.Microsecond * 10) + continue + } + + log.Debugf("echoListen: listener appears to be closing") + return } + go echo(c.(Conn)) } } @@ -49,106 +100,86 @@ func echo(c Conn) { io.Copy(c, c) } -func setupSecureConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) { - return setupConn(t, ctx, a1, a2, true) +func setupSecureConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 peerParams) { + return setupConn(t, ctx, true) } -func setupSingleConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) { - return setupConn(t, ctx, a1, a2, false) +func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 peerParams) { + return setupConn(t, ctx, false) } -func setupConn(t *testing.T, ctx context.Context, a1, a2 string, secure bool) (a, b Conn) { +func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 peerParams) { - p1, err := setupPeer(a1) - if err != nil { - t.Fatal("error setting up peer", err) + p1 = randomPeer(t) + p2 = randomPeer(t) + laddr := p1.Addr + + key1 := p1.PrivKey + key2 := p2.PrivKey + if !secure { + key1 = nil + key2 = nil } - - p2, err := setupPeer(a2) - if err != nil { - t.Fatal("error setting up peer", err) - } - - laddr := p1.NetAddress("tcp") - if laddr == nil { - t.Fatal("Listen address is nil.") - } - - ps1 := peer.NewPeerstore() - ps2 := peer.NewPeerstore() - ps1.Add(p1) - ps2.Add(p2) - - l1, err := Listen(ctx, laddr, p1, ps1) - l1.SetWithoutSecureTransport(!secure) + l1, err := Listen(ctx, laddr, p1.ID, key1) if err != nil { t.Fatal(err) } d2 := &Dialer{ - Peerstore: ps2, - LocalPeer: p2, - WithoutSecureTransport: !secure, + LocalPeer: p2.ID, + PrivateKey: key2, } var c2 Conn - done := make(chan struct{}) + done := make(chan error) go func() { - c2, err = d2.Dial(ctx, "tcp", p1) + var err error + c2, err = d2.Dial(ctx, p1.Addr, p1.ID) if err != nil { - t.Fatal("error dialing peer", err) + done <- err } - done <- struct{}{} + close(done) }() c1, err := l1.Accept() if err != nil { - t.Fatal("failed to accept") + t.Fatal("failed to accept", err) + } + if err := <-done; err != nil { + t.Fatal(err) } - <-done - return c1.(Conn), c2 + return c1.(Conn), c2, p1, p2 } -func TestDialer(t *testing.T) { +func testDialer(t *testing.T, secure bool) { // t.Skip("Skipping in favor of another test") - p1, err := setupPeer("/ip4/127.0.0.1/tcp/4234") - if err != nil { - t.Fatal("error setting up peer", err) - } + p1 := randomPeer(t) + p2 := randomPeer(t) - p2, err := setupPeer("/ip4/127.0.0.1/tcp/4235") - if err != nil { - t.Fatal("error setting up peer", err) + key1 := p1.PrivKey + key2 := p2.PrivKey + if !secure { + key1 = nil + key2 = nil } ctx, cancel := context.WithCancel(context.Background()) - - laddr := p1.NetAddress("tcp") - if laddr == nil { - t.Fatal("Listen address is nil.") - } - - ps1 := peer.NewPeerstore() - ps2 := peer.NewPeerstore() - ps1.Add(p1) - ps2.Add(p2) - - l, err := Listen(ctx, laddr, p1, ps1) + l1, err := Listen(ctx, p1.Addr, p1.ID, key1) if err != nil { t.Fatal(err) } - go echoListen(ctx, l) - - d := &Dialer{ - Peerstore: ps2, - LocalPeer: p2, + d2 := &Dialer{ + LocalPeer: p2.ID, + PrivateKey: key2, } - c, err := d.Dial(ctx, "tcp", p1) + go echoListen(ctx, l1) + + c, err := d2.Dial(ctx, p1.Addr, p1.ID) if err != nil { t.Fatal("error dialing peer", err) } @@ -180,83 +211,16 @@ func TestDialer(t *testing.T) { // fmt.Println("closing") c.Close() - l.Close() + l1.Close() cancel() } -func TestDialAddr(t *testing.T) { +func TestDialerInsecure(t *testing.T) { // t.Skip("Skipping in favor of another test") - - p1, err := setupPeer("/ip4/127.0.0.1/tcp/4334") - if err != nil { - t.Fatal("error setting up peer", err) - } - - p2, err := setupPeer("/ip4/127.0.0.1/tcp/4335") - if err != nil { - t.Fatal("error setting up peer", err) - } - - ctx, cancel := context.WithCancel(context.Background()) - - laddr := p1.NetAddress("tcp") - if laddr == nil { - t.Fatal("Listen address is nil.") - } - - ps1 := peer.NewPeerstore() - ps2 := peer.NewPeerstore() - ps1.Add(p1) - ps2.Add(p2) - - l, err := Listen(ctx, laddr, p1, ps1) - if err != nil { - t.Fatal(err) - } - - go echoListen(ctx, l) - - d := &Dialer{ - Peerstore: ps2, - LocalPeer: p2, - } - - raddr := p1.NetAddress("tcp") - if raddr == nil { - t.Fatal("Dial address is nil.") - } - - c, err := d.DialAddr(ctx, raddr, p1) - if err != nil { - t.Fatal("error dialing peer", err) - } - - // fmt.Println("sending") - c.WriteMsg([]byte("beep")) - c.WriteMsg([]byte("boop")) - - out, err := c.ReadMsg() - if err != nil { - t.Fatal(err) - } - // fmt.Println("recving", string(out)) - data := string(out) - if data != "beep" { - t.Error("unexpected conn output", data) - } - - out, err = c.ReadMsg() - if err != nil { - t.Fatal(err) - } - - data = string(out) - if string(out) != "boop" { - t.Error("unexpected conn output", data) - } - - // fmt.Println("closing") - c.Close() - l.Close() - cancel() + testDialer(t, false) +} + +func TestDialerSecure(t *testing.T) { + // t.Skip("Skipping in favor of another test") + testDialer(t, true) } diff --git a/net/conn/handshake.go b/net/conn/handshake.go index e815a7e65..3a995bc9a 100644 --- a/net/conn/handshake.go +++ b/net/conn/handshake.go @@ -2,13 +2,12 @@ package conn import ( "fmt" - "io" handshake "github.com/jbenet/go-ipfs/net/handshake" hspb "github.com/jbenet/go-ipfs/net/handshake/pb" - ggprotoio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ggprotoio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" ) // Handshake1 exchanges local and remote versions and compares them @@ -51,38 +50,3 @@ func Handshake1(ctx context.Context, c Conn) error { log.Debugf("%s version handshake compatible %s", lpeer, rpeer) return nil } - -// Handshake3 exchanges local and remote service information -func Handshake3(ctx context.Context, stream io.ReadWriter, c Conn) (*handshake.Handshake3Result, error) { - rpeer := c.RemotePeer() - lpeer := c.LocalPeer() - - // setup up protobuf io - maxSize := 4096 - r := ggprotoio.NewDelimitedReader(stream, maxSize) - w := ggprotoio.NewDelimitedWriter(stream) - localH := handshake.Handshake3Msg(lpeer, c.RemoteMultiaddr()) - remoteH := new(hspb.Handshake3) - - // setup + send the message to remote - if err := w.WriteMsg(localH); err != nil { - return nil, err - } - log.Debugf("Handshake3: sent to %s", rpeer) - log.Event(ctx, "handshake3Sent", lpeer, rpeer) - - // wait + listen for response - if err := r.ReadMsg(remoteH); err != nil { - return nil, fmt.Errorf("Handshake3 could not receive remote msg: %q", err) - } - log.Debugf("Handshake3: received from %s", rpeer) - log.Event(ctx, "handshake3Received", lpeer, rpeer) - - // actually update our state based on the new knowledge - res, err := handshake.Handshake3Update(lpeer, rpeer, remoteH) - if err != nil { - log.Errorf("Handshake3 failed to update %s", rpeer) - } - res.RemoteObservedAddress = c.RemoteMultiaddr() - return res, nil -} diff --git a/net/conn/interface.go b/net/conn/interface.go index c07a9e254..fc95ea103 100644 --- a/net/conn/interface.go +++ b/net/conn/interface.go @@ -5,6 +5,7 @@ import ( "net" "time" + ic "github.com/jbenet/go-ipfs/crypto" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" @@ -20,13 +21,13 @@ type PeerConn interface { LocalMultiaddr() ma.Multiaddr // LocalPeer is the Peer on our side of the connection - LocalPeer() peer.Peer + LocalPeer() peer.ID // RemoteMultiaddr is the Multiaddr on the remote side RemoteMultiaddr() ma.Multiaddr // RemotePeer is the Peer on the remote side - RemotePeer() peer.Peer + RemotePeer() peer.ID } // Conn is a generic message-based Peer-to-Peer connection. @@ -54,16 +55,14 @@ type Conn interface { type Dialer struct { // LocalPeer is the identity of the local Peer. - LocalPeer peer.Peer + LocalPeer peer.ID - // Peerstore is the set of peers we know about locally. The Dialer needs it - // because when an incoming connection is identified, we should reuse the - // same peer objects (otherwise things get inconsistent). - Peerstore peer.Peerstore + // LocalAddrs is a set of local addresses to use. + LocalAddrs []ma.Multiaddr - // WithoutSecureTransport determines whether to initialize an insecure connection. - // Phrased negatively so default is Secure, and verbosely to be very clear. - WithoutSecureTransport bool + // PrivateKey used to initialize a secure connection. + // Warning: if PrivateKey is nil, connection will not be secured. + PrivateKey ic.PrivKey } // Listener is an object that can accept connections. It matches net.Listener @@ -72,11 +71,6 @@ type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (net.Conn, error) - // {Set}WithoutSecureTransport decides whether to start insecure connections. - // Phrased negatively so default is Secure, and verbosely to be very clear. - WithoutSecureTransport() bool - SetWithoutSecureTransport(bool) - // Addr is the local address Addr() net.Addr @@ -84,12 +78,7 @@ type Listener interface { Multiaddr() ma.Multiaddr // LocalPeer is the identity of the local Peer. - LocalPeer() peer.Peer - - // Peerstore is the set of peers we know about locally. The Listener needs it - // because when an incoming connection is identified, we should reuse the - // same peer objects (otherwise things get inconsistent). - Peerstore() peer.Peerstore + LocalPeer() peer.ID // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. diff --git a/net/conn/listen.go b/net/conn/listen.go index bae2168eb..17eb03dbe 100644 --- a/net/conn/listen.go +++ b/net/conn/listen.go @@ -5,31 +5,37 @@ import ( "net" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + ic "github.com/jbenet/go-ipfs/crypto" peer "github.com/jbenet/go-ipfs/peer" ) // listener is an object that can accept connections. It implements Listener type listener struct { - withoutSecureTransport bool - manet.Listener - // Local multiaddr to listen on - maddr ma.Multiaddr + maddr ma.Multiaddr // Local multiaddr to listen on + local peer.ID // LocalPeer is the identity of the local Peer + privk ic.PrivKey // private key to use to initialize secure conns - // LocalPeer is the identity of the local Peer. - local peer.Peer + cg ctxgroup.ContextGroup +} - // Peerstore is the set of peers we know about locally - peers peer.Peerstore +func (l *listener) teardown() error { + defer log.Debugf("listener closed: %s %s", l.local, l.maddr) + return l.Listener.Close() } func (l *listener) Close() error { - log.Infof("listener closing: %s %s", l.local, l.maddr) - return l.Listener.Close() + log.Debugf("listener closing: %s %s", l.local, l.maddr) + return l.cg.Close() +} + +func (l *listener) String() string { + return fmt.Sprintf("", l.local, l.maddr) } // Accept waits for and returns the next connection to the listener. @@ -46,29 +52,22 @@ func (l *listener) Accept() (net.Conn, error) { return nil, err } - c, err := newSingleConn(ctx, l.local, nil, maconn) + c, err := newSingleConn(ctx, l.local, "", maconn) if err != nil { return nil, fmt.Errorf("Error accepting connection: %v", err) } - if l.withoutSecureTransport { + if l.privk == nil { + log.Warning("listener %s listening INSECURELY!", l) return c, nil } - sc, err := newSecureConn(ctx, c, l.peers) + sc, err := newSecureConn(ctx, l.privk, c) if err != nil { return nil, fmt.Errorf("Error securing connection: %v", err) } return sc, nil } -func (l *listener) WithoutSecureTransport() bool { - return l.withoutSecureTransport -} - -func (l *listener) SetWithoutSecureTransport(b bool) { - l.withoutSecureTransport = b -} - func (l *listener) Addr() net.Addr { return l.Listener.Addr() } @@ -79,29 +78,22 @@ func (l *listener) Multiaddr() ma.Multiaddr { } // LocalPeer is the identity of the local Peer. -func (l *listener) LocalPeer() peer.Peer { +func (l *listener) LocalPeer() peer.ID { return l.local } -// Peerstore is the set of peers we know about locally. The Listener needs it -// because when an incoming connection is identified, we should reuse the -// same peer objects (otherwise things get inconsistent). -func (l *listener) Peerstore() peer.Peerstore { - return l.peers -} - func (l *listener) Loggable() map[string]interface{} { return map[string]interface{}{ "listener": map[string]interface{}{ - "peer": l.LocalPeer(), - "address": l.Multiaddr(), - "withoutSecureTransport": l.withoutSecureTransport, + "peer": l.LocalPeer(), + "address": l.Multiaddr(), + "secure": (l.privk != nil), }, } } // Listen listens on the particular multiaddr, with given peer and peerstore. -func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.Peerstore) (Listener, error) { +func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) { ml, err := manet.Listen(addr) if err != nil { @@ -111,10 +103,11 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer. l := &listener{ Listener: ml, maddr: addr, - peers: peers, local: local, - withoutSecureTransport: false, + privk: sk, + cg: ctxgroup.WithContext(ctx), } + l.cg.SetTeardown(l.teardown) log.Infof("swarm listening on %s\n", l.Multiaddr()) log.Event(ctx, "swarmListen", l) diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go index 6437ea05f..2edea78c6 100644 --- a/net/conn/secure_conn.go +++ b/net/conn/secure_conn.go @@ -8,8 +8,10 @@ import ( msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + ic "github.com/jbenet/go-ipfs/crypto" secio "github.com/jbenet/go-ipfs/crypto/secio" peer "github.com/jbenet/go-ipfs/peer" + errors "github.com/jbenet/go-ipfs/util/debugerror" ) // secureConn wraps another Conn object with an encrypted channel. @@ -26,10 +28,21 @@ type secureConn struct { } // newConn constructs a new connection -func newSecureConn(ctx context.Context, insecure Conn, peers peer.Peerstore) (Conn, error) { +func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, error) { + + if insecure == nil { + return nil, errors.New("insecure is nil") + } + if insecure.LocalPeer() == "" { + return nil, errors.New("insecure.LocalPeer() is nil") + } + if sk == nil { + panic("way") + return nil, errors.New("private key is nil") + } // NewSession performs the secure handshake, which takes multiple RTT - sessgen := secio.SessionGenerator{Local: insecure.LocalPeer(), Peerstore: peers} + sessgen := secio.SessionGenerator{LocalID: insecure.LocalPeer(), PrivateKey: sk} session, err := sessgen.NewSession(ctx, insecure) if err != nil { return nil, err @@ -92,12 +105,12 @@ func (c *secureConn) RemoteMultiaddr() ma.Multiaddr { } // LocalPeer is the Peer on this side -func (c *secureConn) LocalPeer() peer.Peer { +func (c *secureConn) LocalPeer() peer.ID { return c.session.LocalPeer() } // RemotePeer is the Peer on the remote side -func (c *secureConn) RemotePeer() peer.Peer { +func (c *secureConn) RemotePeer() peer.ID { return c.session.RemotePeer() } diff --git a/net/conn/secure_conn_test.go b/net/conn/secure_conn_test.go index 9618624c4..0360f17e5 100644 --- a/net/conn/secure_conn_test.go +++ b/net/conn/secure_conn_test.go @@ -2,50 +2,73 @@ package conn import ( "bytes" - "fmt" "os" "runtime" - "strconv" "sync" "testing" "time" - peer "github.com/jbenet/go-ipfs/peer" + ic "github.com/jbenet/go-ipfs/crypto" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) -func upgradeToSecureConn(t *testing.T, ctx context.Context, c Conn) (Conn, error) { +func upgradeToSecureConn(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn) (Conn, error) { if c, ok := c.(*secureConn); ok { return c, nil } // shouldn't happen, because dial + listen already return secure conns. - s, err := newSecureConn(ctx, c, peer.NewPeerstore()) + s, err := newSecureConn(ctx, sk, c) if err != nil { return nil, err } return s, nil } -func secureHandshake(t *testing.T, ctx context.Context, c Conn, done chan error) { - _, err := upgradeToSecureConn(t, ctx, c) +func secureHandshake(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn, done chan error) { + _, err := upgradeToSecureConn(t, ctx, sk, c) done <- err } +func TestSecureSimple(t *testing.T) { + // t.Skip("Skipping in favor of another test") + + ctx := context.Background() + c1, c2, p1, p2 := setupSingleConn(t, ctx) + + done := make(chan error) + go secureHandshake(t, ctx, p1.PrivKey, c1, done) + go secureHandshake(t, ctx, p2.PrivKey, c2, done) + + for i := 0; i < 2; i++ { + if err := <-done; err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 100; i++ { + testOneSendRecv(t, c1, c2) + testOneSendRecv(t, c2, c1) + } + + c1.Close() + c2.Close() +} + func TestSecureClose(t *testing.T) { // t.Skip("Skipping in favor of another test") ctx := context.Background() - c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645") + c1, c2, p1, p2 := setupSingleConn(t, ctx) done := make(chan error) - go secureHandshake(t, ctx, c1, done) - go secureHandshake(t, ctx, c2, done) + go secureHandshake(t, ctx, p1.PrivKey, c1, done) + go secureHandshake(t, ctx, p2.PrivKey, c2, done) for i := 0; i < 2; i++ { if err := <-done; err != nil { - t.Error(err) + t.Fatal(err) } } @@ -64,13 +87,13 @@ func TestSecureCancelHandshake(t *testing.T) { // t.Skip("Skipping in favor of another test") ctx, cancel := context.WithCancel(context.Background()) - c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645") + c1, c2, p1, p2 := setupSingleConn(t, ctx) done := make(chan error) - go secureHandshake(t, ctx, c1, done) + go secureHandshake(t, ctx, p1.PrivKey, c1, done) <-time.After(50 * time.Millisecond) cancel() // cancel ctx - go secureHandshake(t, ctx, c2, done) + go secureHandshake(t, ctx, p2.PrivKey, c2, done) for i := 0; i < 2; i++ { if err := <-done; err == nil { @@ -79,6 +102,24 @@ func TestSecureCancelHandshake(t *testing.T) { } } +func TestSecureHandshakeFailsWithWrongKeys(t *testing.T) { + // t.Skip("Skipping in favor of another test") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c1, c2, p1, p2 := setupSingleConn(t, ctx) + + done := make(chan error) + go secureHandshake(t, ctx, p2.PrivKey, c1, done) + go secureHandshake(t, ctx, p1.PrivKey, c2, done) + + for i := 0; i < 2; i++ { + if err := <-done; err == nil { + t.Error("wrong keys should've errored out.") + } + } +} + func TestSecureCloseLeak(t *testing.T) { // t.Skip("Skipping in favor of another test") @@ -89,15 +130,11 @@ func TestSecureCloseLeak(t *testing.T) { t.Skip("this doesn't work well on travis") } - var wg sync.WaitGroup - - runPair := func(p1, p2, num int) { - a1 := strconv.Itoa(p1) - a2 := strconv.Itoa(p2) - ctx, cancel := context.WithCancel(context.Background()) - c1, c2 := setupSecureConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2) + runPair := func(c1, c2 Conn, num int) { + log.Debugf("runPair %d", num) for i := 0; i < num; i++ { + log.Debugf("runPair iteration %d", i) b1 := []byte("beep") c1.WriteMsg(b1) b2, err := c2.ReadMsg() @@ -120,22 +157,32 @@ func TestSecureCloseLeak(t *testing.T) { <-time.After(time.Microsecond * 5) } - - c1.Close() - c2.Close() - cancel() // close the listener - wg.Done() } var cons = 20 var msgs = 100 - fmt.Printf("Running %d connections * %d msgs.\n", cons, msgs) + log.Debugf("Running %d connections * %d msgs.\n", cons, msgs) + + var wg sync.WaitGroup for i := 0; i < cons; i++ { wg.Add(1) - go runPair(2000+i, 2001+i, msgs) + + ctx, cancel := context.WithCancel(context.Background()) + c1, c2, _, _ := setupSecureConn(t, ctx) + go func(c1, c2 Conn) { + + defer func() { + c1.Close() + c2.Close() + cancel() + wg.Done() + }() + + runPair(c1, c2, msgs) + }(c1, c2) } - fmt.Printf("Waiting...\n") + log.Debugf("Waiting...\n") wg.Wait() // done!