conn: with new peer.ID

This commit is contained in:
Juan Batiz-Benet 2014-12-19 19:38:25 -08:00
parent fbee577d3b
commit 57c7ffab44
9 changed files with 320 additions and 325 deletions

View File

@ -32,14 +32,14 @@ func ReleaseBuffer(b []byte) {
// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
local peer.Peer
remote peer.Peer
local peer.ID
remote peer.ID
maconn manet.Conn
msgrw msgio.ReadWriteCloser
}
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote peer.Peer, maconn manet.Conn) (Conn, error) {
func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn) (Conn, error) {
conn := &singleConn{
local: local,
@ -105,12 +105,12 @@ func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
}
// LocalPeer is the Peer on this side
func (c *singleConn) LocalPeer() peer.Peer {
func (c *singleConn) LocalPeer() peer.ID {
return c.local
}
// RemotePeer is the Peer on the remote side
func (c *singleConn) RemotePeer() peer.Peer {
func (c *singleConn) RemotePeer() peer.ID {
return c.remote
}
@ -145,8 +145,8 @@ func (c *singleConn) ReleaseMsg(m []byte) {
// ID returns the ID of a given Conn.
func ID(c Conn) string {
l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID())
r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID())
l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().Pretty())
r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().Pretty())
lh := u.Hash([]byte(l))
rh := u.Hash([]byte(r))
ch := u.XOR(lh, rh)

View File

@ -5,7 +5,6 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"sync"
"testing"
"time"
@ -14,6 +13,7 @@ import (
)
func testOneSendRecv(t *testing.T, c1, c2 Conn) {
log.Debugf("testOneSendRecv from %s to %s", c1.LocalPeer(), c2.LocalPeer())
m1 := []byte("hello")
if err := c1.WriteMsg(m1); err != nil {
t.Fatal(err)
@ -41,8 +41,9 @@ func testNotOneSendRecv(t *testing.T, c1, c2 Conn) {
func TestClose(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx := context.Background()
c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/5534", "/ip4/127.0.0.1/tcp/5545")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c1, c2, _, _ := setupSingleConn(t, ctx)
testOneSendRecv(t, c1, c2)
testOneSendRecv(t, c2, c1)
@ -56,6 +57,7 @@ func TestClose(t *testing.T) {
}
func TestCloseLeak(t *testing.T) {
// t.Skip("Skipping in favor of another test")
if testing.Short() {
t.SkipNow()
}
@ -66,11 +68,9 @@ func TestCloseLeak(t *testing.T) {
var wg sync.WaitGroup
runPair := func(p1, p2, num int) {
a1 := strconv.Itoa(p1)
a2 := strconv.Itoa(p2)
runPair := func(num int) {
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2)
c1, c2, _, _ := setupSingleConn(t, ctx)
for i := 0; i < num; i++ {
b1 := []byte(fmt.Sprintf("beep%d", i))
@ -102,15 +102,15 @@ func TestCloseLeak(t *testing.T) {
wg.Done()
}
var cons = 1
var cons = 10
var msgs = 100
fmt.Printf("Running %d connections * %d msgs.\n", cons, msgs)
log.Debugf("Running %d connections * %d msgs.\n", cons, msgs)
for i := 0; i < cons; i++ {
wg.Add(1)
go runPair(2000+i, 2001+i, msgs)
go runPair(msgs)
}
fmt.Printf("Waiting...\n")
log.Debugf("Waiting...\n")
wg.Wait()
// done!

View File

@ -1,6 +1,7 @@
package conn
import (
"fmt"
"strings"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@ -11,49 +12,32 @@ import (
debugerror "github.com/jbenet/go-ipfs/util/debugerror"
)
// Dial connects to a particular peer, over a given network
// Example: d.Dial(ctx, "udp", peer)
func (d *Dialer) Dial(ctx context.Context, network string, remote peer.Peer) (Conn, error) {
raddr := remote.NetAddress(network)
if raddr == nil {
return nil, debugerror.Errorf("No remote address for network %s", network)
}
return d.DialAddr(ctx, raddr, remote)
// String returns the string rep of d.
func (d *Dialer) String() string {
return fmt.Sprintf("<Dialer %s %s ...>", d.LocalPeer, d.LocalAddrs[0])
}
// DialAddr connects to a peer over a particular address
// Dial connects to a peer over a particular address
// Ensures raddr is part of peer.Addresses()
// Example: d.DialAddr(ctx, peer.Addresses()[0], peer)
func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.Peer) (Conn, error) {
found := false
for _, addr := range remote.Addresses() {
if addr.Equal(raddr) {
found = true
}
}
if !found {
return nil, debugerror.Errorf("address %s is not in peer %s", raddr, remote)
}
func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) {
network, _, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
laddr := d.LocalPeer.NetAddress(network)
if laddr == nil {
return nil, debugerror.Errorf("No local address for network %s", network)
}
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr)
}
remote.SetType(peer.Remote)
remote, err = d.Peerstore.Add(remote)
if err != nil {
log.Errorf("Error putting peer into peerstore: %s", remote)
var laddr ma.Multiaddr
if len(d.LocalAddrs) > 0 {
// laddr := MultiaddrNetMatch(raddr, d.LocalAddrs)
laddr = NetAddress(network, d.LocalAddrs)
if laddr == nil {
return nil, debugerror.Errorf("No local address for network %s", network)
}
}
// TODO: try to get reusing addr/ports to work.
@ -69,7 +53,7 @@ func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.P
select {
case <-ctx.Done():
maconn.Close()
return nil, err
return nil, ctx.Err()
default:
}
@ -78,17 +62,58 @@ func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.P
return nil, err
}
if d.WithoutSecureTransport {
if d.PrivateKey == nil {
log.Warning("dialer %s dialing INSECURELY %s at %s!", d, remote, raddr)
return c, nil
}
select {
case <-ctx.Done():
c.Close()
return nil, err
return nil, ctx.Err()
default:
}
// return c, nil
return newSecureConn(ctx, c, d.Peerstore)
return newSecureConn(ctx, d.PrivateKey, c)
}
// MultiaddrProtocolsMatch returns whether two multiaddrs match in protocol stacks.
func MultiaddrProtocolsMatch(a, b ma.Multiaddr) bool {
ap := a.Protocols()
bp := b.Protocols()
if len(ap) != len(bp) {
return false
}
for i, api := range ap {
if api != bp[i] {
return false
}
}
return true
}
// MultiaddrNetMatch returns the first Multiaddr found to match network.
func MultiaddrNetMatch(tgt ma.Multiaddr, srcs []ma.Multiaddr) ma.Multiaddr {
for _, a := range srcs {
if MultiaddrProtocolsMatch(tgt, a) {
return a
}
}
return nil
}
// NetAddress returns the first Multiaddr found for a given network.
func NetAddress(n string, addrs []ma.Multiaddr) ma.Multiaddr {
for _, a := range addrs {
for _, p := range a.Protocols() {
if p.Name == n {
return a
}
}
}
return nil
}

View File

@ -1,8 +1,13 @@
package conn
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"testing"
"time"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
@ -12,35 +17,81 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func setupPeer(addr string) (peer.Peer, error) {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
type peerParams struct {
ID peer.ID
PrivKey ci.PrivKey
PubKey ci.PubKey
Addr ma.Multiaddr
}
func (p *peerParams) checkKeys() error {
if !p.ID.MatchesPrivateKey(p.PrivKey) {
return errors.New("p.ID does not match p.PrivKey")
}
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
return nil, err
if !p.ID.MatchesPublicKey(p.PubKey) {
return errors.New("p.ID does not match p.PubKey")
}
p, err := testutil.NewPeerWithKeyPair(sk, pk)
var buf bytes.Buffer
buf.Write([]byte("hello world. this is me, I swear."))
b := buf.Bytes()
sig, err := p.PrivKey.Sign(b)
if err != nil {
return nil, err
return fmt.Errorf("sig signing failed: %s", err)
}
p.AddAddress(tcp)
return p, nil
sigok, err := p.PubKey.Verify(b, sig)
if err != nil {
return fmt.Errorf("sig verify failed: %s", err)
}
if !sigok {
return fmt.Errorf("sig verify failed: sig invalid!")
}
return nil // ok. move along.
}
func randomPeer(t *testing.T) (p peerParams) {
var err error
p.Addr = testutil.RandLocalTCPAddress()
p.PrivKey, p.PubKey, err = ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
t.Fatal(err)
}
p.ID, err = peer.IDFromPublicKey(p.PubKey)
if err != nil {
t.Fatal(err)
}
if err := p.checkKeys(); err != nil {
t.Fatal(err)
}
return p
}
func echoListen(ctx context.Context, listener Listener) {
for {
c, err := listener.Accept()
if err != nil {
select {
case <-ctx.Done():
return
default:
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
<-time.After(time.Microsecond * 10)
continue
}
log.Debugf("echoListen: listener appears to be closing")
return
}
go echo(c.(Conn))
}
}
@ -49,106 +100,86 @@ func echo(c Conn) {
io.Copy(c, c)
}
func setupSecureConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) {
return setupConn(t, ctx, a1, a2, true)
func setupSecureConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 peerParams) {
return setupConn(t, ctx, true)
}
func setupSingleConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) {
return setupConn(t, ctx, a1, a2, false)
func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 peerParams) {
return setupConn(t, ctx, false)
}
func setupConn(t *testing.T, ctx context.Context, a1, a2 string, secure bool) (a, b Conn) {
func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 peerParams) {
p1, err := setupPeer(a1)
if err != nil {
t.Fatal("error setting up peer", err)
p1 = randomPeer(t)
p2 = randomPeer(t)
laddr := p1.Addr
key1 := p1.PrivKey
key2 := p2.PrivKey
if !secure {
key1 = nil
key2 = nil
}
p2, err := setupPeer(a2)
if err != nil {
t.Fatal("error setting up peer", err)
}
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
ps1 := peer.NewPeerstore()
ps2 := peer.NewPeerstore()
ps1.Add(p1)
ps2.Add(p2)
l1, err := Listen(ctx, laddr, p1, ps1)
l1.SetWithoutSecureTransport(!secure)
l1, err := Listen(ctx, laddr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
d2 := &Dialer{
Peerstore: ps2,
LocalPeer: p2,
WithoutSecureTransport: !secure,
LocalPeer: p2.ID,
PrivateKey: key2,
}
var c2 Conn
done := make(chan struct{})
done := make(chan error)
go func() {
c2, err = d2.Dial(ctx, "tcp", p1)
var err error
c2, err = d2.Dial(ctx, p1.Addr, p1.ID)
if err != nil {
t.Fatal("error dialing peer", err)
done <- err
}
done <- struct{}{}
close(done)
}()
c1, err := l1.Accept()
if err != nil {
t.Fatal("failed to accept")
t.Fatal("failed to accept", err)
}
if err := <-done; err != nil {
t.Fatal(err)
}
<-done
return c1.(Conn), c2
return c1.(Conn), c2, p1, p2
}
func TestDialer(t *testing.T) {
func testDialer(t *testing.T, secure bool) {
// t.Skip("Skipping in favor of another test")
p1, err := setupPeer("/ip4/127.0.0.1/tcp/4234")
if err != nil {
t.Fatal("error setting up peer", err)
}
p1 := randomPeer(t)
p2 := randomPeer(t)
p2, err := setupPeer("/ip4/127.0.0.1/tcp/4235")
if err != nil {
t.Fatal("error setting up peer", err)
key1 := p1.PrivKey
key2 := p2.PrivKey
if !secure {
key1 = nil
key2 = nil
}
ctx, cancel := context.WithCancel(context.Background())
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
ps1 := peer.NewPeerstore()
ps2 := peer.NewPeerstore()
ps1.Add(p1)
ps2.Add(p2)
l, err := Listen(ctx, laddr, p1, ps1)
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
go echoListen(ctx, l)
d := &Dialer{
Peerstore: ps2,
LocalPeer: p2,
d2 := &Dialer{
LocalPeer: p2.ID,
PrivateKey: key2,
}
c, err := d.Dial(ctx, "tcp", p1)
go echoListen(ctx, l1)
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
if err != nil {
t.Fatal("error dialing peer", err)
}
@ -180,83 +211,16 @@ func TestDialer(t *testing.T) {
// fmt.Println("closing")
c.Close()
l.Close()
l1.Close()
cancel()
}
func TestDialAddr(t *testing.T) {
func TestDialerInsecure(t *testing.T) {
// t.Skip("Skipping in favor of another test")
p1, err := setupPeer("/ip4/127.0.0.1/tcp/4334")
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer("/ip4/127.0.0.1/tcp/4335")
if err != nil {
t.Fatal("error setting up peer", err)
}
ctx, cancel := context.WithCancel(context.Background())
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
ps1 := peer.NewPeerstore()
ps2 := peer.NewPeerstore()
ps1.Add(p1)
ps2.Add(p2)
l, err := Listen(ctx, laddr, p1, ps1)
if err != nil {
t.Fatal(err)
}
go echoListen(ctx, l)
d := &Dialer{
Peerstore: ps2,
LocalPeer: p2,
}
raddr := p1.NetAddress("tcp")
if raddr == nil {
t.Fatal("Dial address is nil.")
}
c, err := d.DialAddr(ctx, raddr, p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
// fmt.Println("sending")
c.WriteMsg([]byte("beep"))
c.WriteMsg([]byte("boop"))
out, err := c.ReadMsg()
if err != nil {
t.Fatal(err)
}
// fmt.Println("recving", string(out))
data := string(out)
if data != "beep" {
t.Error("unexpected conn output", data)
}
out, err = c.ReadMsg()
if err != nil {
t.Fatal(err)
}
data = string(out)
if string(out) != "boop" {
t.Error("unexpected conn output", data)
}
// fmt.Println("closing")
c.Close()
l.Close()
cancel()
testDialer(t, false)
}
func TestDialerSecure(t *testing.T) {
// t.Skip("Skipping in favor of another test")
testDialer(t, true)
}

View File

@ -2,13 +2,12 @@ package conn
import (
"fmt"
"io"
handshake "github.com/jbenet/go-ipfs/net/handshake"
hspb "github.com/jbenet/go-ipfs/net/handshake/pb"
ggprotoio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ggprotoio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
)
// Handshake1 exchanges local and remote versions and compares them
@ -51,38 +50,3 @@ func Handshake1(ctx context.Context, c Conn) error {
log.Debugf("%s version handshake compatible %s", lpeer, rpeer)
return nil
}
// Handshake3 exchanges local and remote service information
func Handshake3(ctx context.Context, stream io.ReadWriter, c Conn) (*handshake.Handshake3Result, error) {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
// setup up protobuf io
maxSize := 4096
r := ggprotoio.NewDelimitedReader(stream, maxSize)
w := ggprotoio.NewDelimitedWriter(stream)
localH := handshake.Handshake3Msg(lpeer, c.RemoteMultiaddr())
remoteH := new(hspb.Handshake3)
// setup + send the message to remote
if err := w.WriteMsg(localH); err != nil {
return nil, err
}
log.Debugf("Handshake3: sent to %s", rpeer)
log.Event(ctx, "handshake3Sent", lpeer, rpeer)
// wait + listen for response
if err := r.ReadMsg(remoteH); err != nil {
return nil, fmt.Errorf("Handshake3 could not receive remote msg: %q", err)
}
log.Debugf("Handshake3: received from %s", rpeer)
log.Event(ctx, "handshake3Received", lpeer, rpeer)
// actually update our state based on the new knowledge
res, err := handshake.Handshake3Update(lpeer, rpeer, remoteH)
if err != nil {
log.Errorf("Handshake3 failed to update %s", rpeer)
}
res.RemoteObservedAddress = c.RemoteMultiaddr()
return res, nil
}

View File

@ -5,6 +5,7 @@ import (
"net"
"time"
ic "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
@ -20,13 +21,13 @@ type PeerConn interface {
LocalMultiaddr() ma.Multiaddr
// LocalPeer is the Peer on our side of the connection
LocalPeer() peer.Peer
LocalPeer() peer.ID
// RemoteMultiaddr is the Multiaddr on the remote side
RemoteMultiaddr() ma.Multiaddr
// RemotePeer is the Peer on the remote side
RemotePeer() peer.Peer
RemotePeer() peer.ID
}
// Conn is a generic message-based Peer-to-Peer connection.
@ -54,16 +55,14 @@ type Conn interface {
type Dialer struct {
// LocalPeer is the identity of the local Peer.
LocalPeer peer.Peer
LocalPeer peer.ID
// Peerstore is the set of peers we know about locally. The Dialer needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
Peerstore peer.Peerstore
// LocalAddrs is a set of local addresses to use.
LocalAddrs []ma.Multiaddr
// WithoutSecureTransport determines whether to initialize an insecure connection.
// Phrased negatively so default is Secure, and verbosely to be very clear.
WithoutSecureTransport bool
// PrivateKey used to initialize a secure connection.
// Warning: if PrivateKey is nil, connection will not be secured.
PrivateKey ic.PrivKey
}
// Listener is an object that can accept connections. It matches net.Listener
@ -72,11 +71,6 @@ type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (net.Conn, error)
// {Set}WithoutSecureTransport decides whether to start insecure connections.
// Phrased negatively so default is Secure, and verbosely to be very clear.
WithoutSecureTransport() bool
SetWithoutSecureTransport(bool)
// Addr is the local address
Addr() net.Addr
@ -84,12 +78,7 @@ type Listener interface {
Multiaddr() ma.Multiaddr
// LocalPeer is the identity of the local Peer.
LocalPeer() peer.Peer
// Peerstore is the set of peers we know about locally. The Listener needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
Peerstore() peer.Peerstore
LocalPeer() peer.ID
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.

View File

@ -5,31 +5,37 @@ import (
"net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
ic "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
)
// listener is an object that can accept connections. It implements Listener
type listener struct {
withoutSecureTransport bool
manet.Listener
// Local multiaddr to listen on
maddr ma.Multiaddr
maddr ma.Multiaddr // Local multiaddr to listen on
local peer.ID // LocalPeer is the identity of the local Peer
privk ic.PrivKey // private key to use to initialize secure conns
// LocalPeer is the identity of the local Peer.
local peer.Peer
cg ctxgroup.ContextGroup
}
// Peerstore is the set of peers we know about locally
peers peer.Peerstore
func (l *listener) teardown() error {
defer log.Debugf("listener closed: %s %s", l.local, l.maddr)
return l.Listener.Close()
}
func (l *listener) Close() error {
log.Infof("listener closing: %s %s", l.local, l.maddr)
return l.Listener.Close()
log.Debugf("listener closing: %s %s", l.local, l.maddr)
return l.cg.Close()
}
func (l *listener) String() string {
return fmt.Sprintf("<Listener %s %s>", l.local, l.maddr)
}
// Accept waits for and returns the next connection to the listener.
@ -46,29 +52,22 @@ func (l *listener) Accept() (net.Conn, error) {
return nil, err
}
c, err := newSingleConn(ctx, l.local, nil, maconn)
c, err := newSingleConn(ctx, l.local, "", maconn)
if err != nil {
return nil, fmt.Errorf("Error accepting connection: %v", err)
}
if l.withoutSecureTransport {
if l.privk == nil {
log.Warning("listener %s listening INSECURELY!", l)
return c, nil
}
sc, err := newSecureConn(ctx, c, l.peers)
sc, err := newSecureConn(ctx, l.privk, c)
if err != nil {
return nil, fmt.Errorf("Error securing connection: %v", err)
}
return sc, nil
}
func (l *listener) WithoutSecureTransport() bool {
return l.withoutSecureTransport
}
func (l *listener) SetWithoutSecureTransport(b bool) {
l.withoutSecureTransport = b
}
func (l *listener) Addr() net.Addr {
return l.Listener.Addr()
}
@ -79,29 +78,22 @@ func (l *listener) Multiaddr() ma.Multiaddr {
}
// LocalPeer is the identity of the local Peer.
func (l *listener) LocalPeer() peer.Peer {
func (l *listener) LocalPeer() peer.ID {
return l.local
}
// Peerstore is the set of peers we know about locally. The Listener needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
func (l *listener) Peerstore() peer.Peerstore {
return l.peers
}
func (l *listener) Loggable() map[string]interface{} {
return map[string]interface{}{
"listener": map[string]interface{}{
"peer": l.LocalPeer(),
"address": l.Multiaddr(),
"withoutSecureTransport": l.withoutSecureTransport,
"peer": l.LocalPeer(),
"address": l.Multiaddr(),
"secure": (l.privk != nil),
},
}
}
// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.Peerstore) (Listener, error) {
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
ml, err := manet.Listen(addr)
if err != nil {
@ -111,10 +103,11 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.
l := &listener{
Listener: ml,
maddr: addr,
peers: peers,
local: local,
withoutSecureTransport: false,
privk: sk,
cg: ctxgroup.WithContext(ctx),
}
l.cg.SetTeardown(l.teardown)
log.Infof("swarm listening on %s\n", l.Multiaddr())
log.Event(ctx, "swarmListen", l)

View File

@ -8,8 +8,10 @@ import (
msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ic "github.com/jbenet/go-ipfs/crypto"
secio "github.com/jbenet/go-ipfs/crypto/secio"
peer "github.com/jbenet/go-ipfs/peer"
errors "github.com/jbenet/go-ipfs/util/debugerror"
)
// secureConn wraps another Conn object with an encrypted channel.
@ -26,10 +28,21 @@ type secureConn struct {
}
// newConn constructs a new connection
func newSecureConn(ctx context.Context, insecure Conn, peers peer.Peerstore) (Conn, error) {
func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, error) {
if insecure == nil {
return nil, errors.New("insecure is nil")
}
if insecure.LocalPeer() == "" {
return nil, errors.New("insecure.LocalPeer() is nil")
}
if sk == nil {
panic("way")
return nil, errors.New("private key is nil")
}
// NewSession performs the secure handshake, which takes multiple RTT
sessgen := secio.SessionGenerator{Local: insecure.LocalPeer(), Peerstore: peers}
sessgen := secio.SessionGenerator{LocalID: insecure.LocalPeer(), PrivateKey: sk}
session, err := sessgen.NewSession(ctx, insecure)
if err != nil {
return nil, err
@ -92,12 +105,12 @@ func (c *secureConn) RemoteMultiaddr() ma.Multiaddr {
}
// LocalPeer is the Peer on this side
func (c *secureConn) LocalPeer() peer.Peer {
func (c *secureConn) LocalPeer() peer.ID {
return c.session.LocalPeer()
}
// RemotePeer is the Peer on the remote side
func (c *secureConn) RemotePeer() peer.Peer {
func (c *secureConn) RemotePeer() peer.ID {
return c.session.RemotePeer()
}

View File

@ -2,50 +2,73 @@ package conn
import (
"bytes"
"fmt"
"os"
"runtime"
"strconv"
"sync"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/peer"
ic "github.com/jbenet/go-ipfs/crypto"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func upgradeToSecureConn(t *testing.T, ctx context.Context, c Conn) (Conn, error) {
func upgradeToSecureConn(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn) (Conn, error) {
if c, ok := c.(*secureConn); ok {
return c, nil
}
// shouldn't happen, because dial + listen already return secure conns.
s, err := newSecureConn(ctx, c, peer.NewPeerstore())
s, err := newSecureConn(ctx, sk, c)
if err != nil {
return nil, err
}
return s, nil
}
func secureHandshake(t *testing.T, ctx context.Context, c Conn, done chan error) {
_, err := upgradeToSecureConn(t, ctx, c)
func secureHandshake(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn, done chan error) {
_, err := upgradeToSecureConn(t, ctx, sk, c)
done <- err
}
func TestSecureSimple(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx := context.Background()
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, p1.PrivKey, c1, done)
go secureHandshake(t, ctx, p2.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err != nil {
t.Fatal(err)
}
}
for i := 0; i < 100; i++ {
testOneSendRecv(t, c1, c2)
testOneSendRecv(t, c2, c1)
}
c1.Close()
c2.Close()
}
func TestSecureClose(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx := context.Background()
c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645")
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, c1, done)
go secureHandshake(t, ctx, c2, done)
go secureHandshake(t, ctx, p1.PrivKey, c1, done)
go secureHandshake(t, ctx, p2.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err != nil {
t.Error(err)
t.Fatal(err)
}
}
@ -64,13 +87,13 @@ func TestSecureCancelHandshake(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupSingleConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645")
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, c1, done)
go secureHandshake(t, ctx, p1.PrivKey, c1, done)
<-time.After(50 * time.Millisecond)
cancel() // cancel ctx
go secureHandshake(t, ctx, c2, done)
go secureHandshake(t, ctx, p2.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err == nil {
@ -79,6 +102,24 @@ func TestSecureCancelHandshake(t *testing.T) {
}
}
func TestSecureHandshakeFailsWithWrongKeys(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, p2.PrivKey, c1, done)
go secureHandshake(t, ctx, p1.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err == nil {
t.Error("wrong keys should've errored out.")
}
}
}
func TestSecureCloseLeak(t *testing.T) {
// t.Skip("Skipping in favor of another test")
@ -89,15 +130,11 @@ func TestSecureCloseLeak(t *testing.T) {
t.Skip("this doesn't work well on travis")
}
var wg sync.WaitGroup
runPair := func(p1, p2, num int) {
a1 := strconv.Itoa(p1)
a2 := strconv.Itoa(p2)
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupSecureConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2)
runPair := func(c1, c2 Conn, num int) {
log.Debugf("runPair %d", num)
for i := 0; i < num; i++ {
log.Debugf("runPair iteration %d", i)
b1 := []byte("beep")
c1.WriteMsg(b1)
b2, err := c2.ReadMsg()
@ -120,22 +157,32 @@ func TestSecureCloseLeak(t *testing.T) {
<-time.After(time.Microsecond * 5)
}
c1.Close()
c2.Close()
cancel() // close the listener
wg.Done()
}
var cons = 20
var msgs = 100
fmt.Printf("Running %d connections * %d msgs.\n", cons, msgs)
log.Debugf("Running %d connections * %d msgs.\n", cons, msgs)
var wg sync.WaitGroup
for i := 0; i < cons; i++ {
wg.Add(1)
go runPair(2000+i, 2001+i, msgs)
ctx, cancel := context.WithCancel(context.Background())
c1, c2, _, _ := setupSecureConn(t, ctx)
go func(c1, c2 Conn) {
defer func() {
c1.Close()
c2.Close()
cancel()
wg.Done()
}()
runPair(c1, c2, msgs)
}(c1, c2)
}
fmt.Printf("Waiting...\n")
log.Debugf("Waiting...\n")
wg.Wait()
// done!