much simpler net

- removed ctxcloser
- removed multiconn
- focused on netio
This commit is contained in:
Juan Batiz-Benet 2014-12-14 14:14:22 -08:00
parent 55d1e794c7
commit 393842e245
12 changed files with 215 additions and 1066 deletions

View File

@ -13,17 +13,14 @@ import (
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = u.Logger("conn")
var log = eventlog.Logger("conn")
const (
// MaxMessageSize is the size of the largest single message. (4MB)
MaxMessageSize = 1 << 22
// HandshakeTimeout for when nodes first connect
HandshakeTimeout = time.Second * 5
)
// ReleaseBuffer puts the given byte array back into the buffer pool,
@ -39,13 +36,10 @@ type singleConn struct {
remote peer.Peer
maconn manet.Conn
msgrw msgio.ReadWriteCloser
ctxc.ContextCloser
}
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote peer.Peer,
maconn manet.Conn) (Conn, error) {
func newSingleConn(ctx context.Context, local, remote peer.Peer, maconn manet.Conn) (Conn, error) {
conn := &singleConn{
local: local,
@ -53,14 +47,10 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
maconn: maconn,
msgrw: msgio.NewReadWriter(maconn),
}
conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
log.Debugf("newSingleConn %p: %v to %v", conn, local, remote)
// version handshake
ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout)
if err := Handshake1(ctxT, conn); err != nil {
if err := Handshake1(ctx, conn); err != nil {
conn.Close()
return nil, fmt.Errorf("Handshake1 failed: %s", err)
}
@ -70,9 +60,8 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
}
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) close() error {
func (c *singleConn) Close() error {
log.Debugf("%s closing Conn with %s", c.local, c.remote)
// close underlying connection
return c.msgrw.Close()
}

View File

@ -13,73 +13,51 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// testOneSendRecv writes one message on c1 and verifies the identical
// payload is received on c2.
func testOneSendRecv(t *testing.T, c1, c2 Conn) {
	m1 := []byte("hello")
	if err := c1.WriteMsg(m1); err != nil {
		t.Fatal(err)
	}
	m2, err := c2.ReadMsg()
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(m1, m2) {
		// BUG FIX: t.Fatal does not interpret format verbs; the original
		// passed "%s %s" plus args to t.Fatal. Use Fatalf.
		t.Fatalf("failed to send: %s %s", m1, m2)
	}
}
// testNotOneSendRecv verifies that the pair of connections no longer
// carries messages: a write on c1 and a read on c2 must both fail.
func testNotOneSendRecv(t *testing.T, c1, c2 Conn) {
	payload := []byte("hello")

	err := c1.WriteMsg(payload)
	if err == nil {
		t.Fatal("write should have failed", err)
	}

	if _, err := c2.ReadMsg(); err == nil {
		t.Fatal("read should have failed", err)
	}
}
func TestClose(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/5534", "/ip4/127.0.0.1/tcp/5545")
select {
case <-c1.Closed():
t.Fatal("done before close")
case <-c2.Closed():
t.Fatal("done before close")
default:
}
testOneSendRecv(t, c1, c2)
testOneSendRecv(t, c2, c1)
c1.Close()
select {
case <-c1.Closed():
default:
t.Fatal("not done after cancel")
}
time.After(200 * time.Millisecond)
testNotOneSendRecv(t, c1, c2)
testNotOneSendRecv(t, c2, c1)
c2.Close()
select {
case <-c2.Closed():
default:
t.Fatal("not done after cancel")
}
cancel() // close the listener :P
}
func TestCancel(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/5534", "/ip4/127.0.0.1/tcp/5545")
select {
case <-c1.Closed():
t.Fatal("done before close")
case <-c2.Closed():
t.Fatal("done before close")
default:
}
c1.Close()
c2.Close()
cancel() // listener
// wait to ensure other goroutines run and close things.
<-time.After(time.Microsecond * 10)
// test that cancel called Close.
select {
case <-c1.Closed():
default:
t.Fatal("not done after cancel")
}
select {
case <-c2.Closed():
default:
t.Fatal("not done after cancel")
}
time.After(20000 * time.Millisecond)
testNotOneSendRecv(t, c1, c2)
testNotOneSendRecv(t, c2, c1)
}
func TestCloseLeak(t *testing.T) {

View File

@ -4,7 +4,6 @@ import (
"strings"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
@ -67,11 +66,25 @@ func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.P
return nil, err
}
select {
case <-ctx.Done():
maconn.Close()
return nil, err
default:
}
c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
c.Close()
return nil, err
default:
}
// return c, nil
return newSecureConn(ctx, c, d.Peerstore)
}

View File

@ -33,16 +33,19 @@ func setupPeer(addr string) (peer.Peer, error) {
func echoListen(ctx context.Context, listener Listener) {
for {
select {
case <-ctx.Done():
return
case c := <-listener.Accept():
go echo(ctx, c)
c, err := listener.Accept()
if err != nil {
select {
case <-ctx.Done():
return
default:
}
}
go echo(c.(Conn))
}
}
func echo(ctx context.Context, c Conn) {
func echo(c Conn) {
io.Copy(c, c)
}
@ -78,14 +81,24 @@ func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) {
LocalPeer: p2,
}
c2, err := d2.Dial(ctx, "tcp", p1)
var c2 Conn
done := make(chan struct{})
go func() {
c2, err = d2.Dial(ctx, "tcp", p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
done <- struct{}{}
}()
c1, err := l1.Accept()
if err != nil {
t.Fatal("error dialing peer", err)
t.Fatal("failed to accept")
}
<-done
c1 := <-l1.Accept()
return c1, c2
return c1.(Conn), c2
}
func TestDialer(t *testing.T) {

View File

@ -6,8 +6,9 @@ import (
handshake "github.com/jbenet/go-ipfs/net/handshake"
hspb "github.com/jbenet/go-ipfs/net/handshake/pb"
ggprotoio "code.google.com/p/gogoprotobuf/io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ps "github.com/jbenet/go-peerstream"
)
// Handshake1 exchanges local and remote versions and compares them
@ -16,30 +17,31 @@ func Handshake1(ctx context.Context, c Conn) error {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
var remoteH, localH *hspb.Handshake1
localH = handshake.Handshake1Msg()
// setup up protobuf io
maxSize := 4096
r := ggprotoio.NewDelimitedReader(c, maxSize)
w := ggprotoio.NewDelimitedWriter(c)
localH := handshake.Handshake1Msg()
remoteH := new(hspb.Handshake1)
myVerBytes, err := proto.Marshal(localH)
if err != nil {
return err
}
if err := CtxWriteMsg(ctx, c, myVerBytes); err != nil {
// send the outgoing handshake message
if err := w.WriteMsg(localH); err != nil {
return err
}
log.Debugf("%p sent my version (%s) to %s", c, localH, rpeer)
log.Event(ctx, "handshake1Sent", lpeer)
data, err := CtxReadMsg(ctx, c)
if err != nil {
return err
select {
case <-ctx.Done():
return ctx.Err()
default:
}
remoteH = new(hspb.Handshake1)
err = proto.Unmarshal(data, remoteH)
if err != nil {
return fmt.Errorf("could not decode remote version: %q", err)
if err := r.ReadMsg(remoteH); err != nil {
return fmt.Errorf("could not receive remote version: %q", err)
}
log.Debugf("%p received remote version (%s) from %s", c, remoteH, rpeer)
log.Event(ctx, "handshake1Received", lpeer)
if err := handshake.Handshake1Compatible(localH, remoteH); err != nil {
log.Infof("%s (%s) incompatible version with %s (%s)", lpeer, localH, rpeer, remoteH)
@ -51,36 +53,30 @@ func Handshake1(ctx context.Context, c Conn) error {
}
// Handshake3 exchanges local and remote service information
func Handshake3(ctx context.Context, c Conn) (*handshake.Handshake3Result, error) {
func Handshake3(ctx context.Context, stream ps.Stream, c Conn) (*handshake.Handshake3Result, error) {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
// setup + send the message to remote
var remoteH, localH *hspb.Handshake3
localH = handshake.Handshake3Msg(lpeer, c.RemoteMultiaddr())
localB, err := proto.Marshal(localH)
if err != nil {
return nil, err
}
// setup up protobuf io
maxSize := 4096
r := ggprotoio.NewDelimitedReader(stream, maxSize)
w := ggprotoio.NewDelimitedWriter(stream)
localH := handshake.Handshake3Msg(lpeer, c.RemoteMultiaddr())
remoteH := new(hspb.Handshake3)
if err := CtxWriteMsg(ctx, c, localB); err != nil {
// setup + send the message to remote
if err := w.WriteMsg(localH); err != nil {
return nil, err
}
log.Debugf("Handshake1: sent to %s", rpeer)
log.Debugf("Handshake3: sent to %s", rpeer)
log.Event(ctx, "handshake3Sent", lpeer, rpeer)
// wait + listen for response
remoteB, err := CtxReadMsg(ctx, c)
if err != nil {
return nil, err
if err := r.ReadMsg(remoteH); err != nil {
return nil, fmt.Errorf("Handshake3 could not receive remote msg: %q", err)
}
remoteH = new(hspb.Handshake3)
err = proto.Unmarshal(remoteB, remoteH)
if err != nil {
return nil, fmt.Errorf("Handshake3 could not decode remote msg: %q", err)
}
log.Debugf("Handshake3 received from %s", rpeer)
log.Debugf("Handshake3: received from %s", rpeer)
log.Event(ctx, "handshake3Received", lpeer, rpeer)
// actually update our state based on the new knowledge
res, err := handshake.Handshake3Update(lpeer, rpeer, remoteH)

View File

@ -1,15 +1,13 @@
package conn
import (
"errors"
"io"
"net"
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
@ -20,9 +18,6 @@ type Map map[u.Key]Conn
// Conn is a generic message-based Peer-to-Peer connection.
type Conn interface {
// implement ContextCloser too!
ctxc.ContextCloser
// ID is an identifier unique to this connection.
ID() string
@ -47,6 +42,7 @@ type Conn interface {
msgio.Reader
msgio.Writer
io.Closer
}
// Dialer is an object that can open connections. We could have a "convenience"
@ -67,9 +63,12 @@ type Dialer struct {
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() <-chan Conn
Accept() (net.Conn, error)
// Multiaddr is the identity of the local Peer.
// Addr is the local address
Addr() net.Addr
// Multiaddr is the local multiaddr address
Multiaddr() ma.Multiaddr
// LocalPeer is the identity of the local Peer.
@ -84,89 +83,3 @@ type Listener interface {
// Any blocked Accept operations will be unblocked and return errors.
Close() error
}
// CtxRead is a function that Reads from a connection while respecting a
// Context. Though it cannot cancel the read per-se (as not all Connections
// implement SetTimeout, and a CancelFunc can't be predicted), at least it
// doesn't hang. The Read will eventually return and the goroutine will exit.
func CtxRead(ctx context.Context, c Conn, buf []byte) (n int, err error) {
done := make(chan struct{})
go func() {
n, err = c.Read(buf)
close(done)
}()
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-c.Closing():
return 0, errors.New("remote connection closed")
case <-done:
return n, err
}
}
// CtxReadMsg is a function that Reads from a connection while respecting a
// Context. See CtxRead.
func CtxReadMsg(ctx context.Context, c Conn) (msg []byte, err error) {
done := make(chan struct{})
go func() {
msg, err = c.ReadMsg()
close(done)
}()
select {
case <-ctx.Done():
return msg, ctx.Err()
case <-c.Closing():
return msg, errors.New("remote connection closed")
case <-done:
return msg, err
}
}
// CtxWrite is a function that Writes to a connection while respecting a
// Context. See CtxRead. It cannot cancel the in-flight Write itself; it
// only guarantees the caller is unblocked once ctx is done or the
// connection begins closing.
func CtxWrite(ctx context.Context, c Conn, buf []byte) (n int, err error) {
	done := make(chan struct{})
	go func() {
		// BUG FIX: the original called c.Read(buf) here (copy-paste from
		// CtxRead), so CtxWrite never actually wrote anything.
		n, err = c.Write(buf)
		close(done)
	}()

	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-c.Closing():
		return 0, errors.New("remote connection closed")
	case <-done:
		return n, err
	}
}
// CtxWriteMsg is a function that Writes to a connection while respecting a
// Context. See CtxRead.
func CtxWriteMsg(ctx context.Context, c Conn, buf []byte) (err error) {
done := make(chan struct{})
go func() {
err = c.WriteMsg(buf)
close(done)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-c.Closing():
return errors.New("remote connection closed")
case <-done:
return err
}
}

View File

@ -2,25 +2,22 @@ package conn
import (
"fmt"
"net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
peer "github.com/jbenet/go-ipfs/peer"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)
// listener is an object that can accept connections. It implements Listener
type listener struct {
notSecure bool
notSecureIMeanIt bool
manet.Listener
// chansize is the size of the internal channels for concurrency
chansize int
// channel of incoming conections
conns chan Conn
// Local multiaddr to listen on
maddr ma.Multiaddr
@ -29,79 +26,49 @@ type listener struct {
// Peerstore is the set of peers we know about locally
peers peer.Peerstore
// Context for children Conn
ctx context.Context
// embedded ContextCloser
ctxc.ContextCloser
}
// disambiguate
func (l *listener) Close() error {
return l.ContextCloser.Close()
}
// close called by ContextCloser.Close
func (l *listener) close() error {
log.Infof("listener closing: %s %s", l.local, l.maddr)
return l.Listener.Close()
}
func (l *listener) listen() {
defer l.Children().Done()
// handle at most chansize concurrent handshakes
sem := make(chan struct{}, l.chansize)
// handle is a goroutine work function that handles the handshake.
// it's here only so that accepting new connections can happen quickly.
handle := func(maconn manet.Conn) {
defer func() { <-sem }() // release
c, err := newSingleConn(l.ctx, l.local, nil, maconn)
if err != nil {
log.Errorf("Error accepting connection: %v", err)
return
}
// if insecure:
// l.conns <- c
// if secure
sc, err := newSecureConn(l.ctx, c, l.peers)
if err != nil {
log.Errorf("Error securing connection: %v", err)
return
}
l.conns <- sc
}
for {
log.Infof("swarm listening on %s -- %v\n", l.Multiaddr(), l.Listener)
maconn, err := l.Listener.Accept()
if err != nil {
// if closing, we should exit.
select {
case <-l.Closing():
return // done.
default:
}
log.Errorf("Failed to accept connection: %v", err)
continue
}
sem <- struct{}{} // acquire
go handle(maconn)
}
}
// Accept waits for and returns the next connection to the listener.
// Note that unfortunately this
func (l *listener) Accept() <-chan Conn {
return l.conns
func (l *listener) Accept() (net.Conn, error) {
// listeners dont have contexts. given changes dont make sense here anymore
// note that the parent of listener will Close, which will interrupt all io.
// Contexts and io don't mix.
ctx := context.Background()
maconn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
c, err := newSingleConn(ctx, l.local, nil, maconn)
if err != nil {
return nil, fmt.Errorf("Error accepting connection: %v", err)
}
if l.Secure() {
sc, err := newSecureConn(ctx, c, l.peers)
if err != nil {
return nil, fmt.Errorf("Error securing connection: %v", err)
}
return sc, nil
}
return c, nil
}
func (l *listener) Secure() bool {
return !(l.notSecure && l.notSecureIMeanIt)
}
func (l *listener) Addr() net.Addr {
return l.Listener.Addr()
}
// Multiaddr is the identity of the local Peer.
@ -121,6 +88,16 @@ func (l *listener) Peerstore() peer.Peerstore {
return l.peers
}
func (l *listener) Loggable() map[string]interface{} {
return map[string]interface{}{
"listener": map[string]interface{}{
"peer": l.LocalPeer(),
"address": l.Multiaddr(),
"secure": l.Secure(),
},
}
}
// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.Peerstore) (Listener, error) {
@ -129,27 +106,16 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.
return nil, fmt.Errorf("Failed to listen on %s: %s", addr, err)
}
// todo make this a variable
chansize := 10
l := &listener{
Listener: ml,
maddr: addr,
peers: peers,
local: local,
conns: make(chan Conn, chansize),
chansize: chansize,
ctx: ctx,
Listener: ml,
maddr: addr,
peers: peers,
local: local,
notSecure: false,
notSecureIMeanIt: false,
}
// need a separate context to use for the context closer.
// This is because the parent context will be given to all connections too,
// and if we close the listener, the connections shouldn't share the fate.
ctx2, _ := context.WithCancel(ctx)
l.ContextCloser = ctxc.NewContextCloser(ctx2, l.close)
l.Children().Add(1)
go l.listen()
log.Infof("swarm listening on %s\n", l.Multiaddr())
log.Event(ctx, "swarmListen", l)
return l, nil
}

View File

@ -1,351 +0,0 @@
package conn
import (
"errors"
"fmt"
"net"
"sync"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)
// MultiConnMap is for shorthand
type MultiConnMap map[u.Key]*MultiConn
// MultiConn represents a single connection to another Peer (IPFS Node).
type MultiConn struct {
// connections, mapped by a string, which uniquely identifies the connection.
// this string is: /addr1/peer1/addr2/peer2 (peers ordered lexicographically)
conns map[string]Conn
local peer.Peer
remote peer.Peer
// fan-in
fanIn chan []byte
// for adding/removing connections concurrently
sync.RWMutex
ctxc.ContextCloser
}
// NewMultiConn constructs a new connection
func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (*MultiConn, error) {
c := &MultiConn{
local: local,
remote: remote,
conns: map[string]Conn{},
fanIn: make(chan []byte),
}
// must happen before Adds / fanOut
c.ContextCloser = ctxc.NewContextCloser(ctx, c.close)
if conns != nil && len(conns) > 0 {
c.Add(conns...)
}
return c, nil
}
// Add adds given Conn instances to multiconn.
func (c *MultiConn) Add(conns ...Conn) {
c.Lock()
defer c.Unlock()
for _, c2 := range conns {
log.Debugf("MultiConn: adding %s", c2)
if c.LocalPeer() != c2.LocalPeer() || c.RemotePeer() != c2.RemotePeer() {
log.Error(c2)
c.Unlock() // ok to unlock (to log). panicing.
log.Error(c)
// log.Errorf("c.LocalPeer: %s %p", c.LocalPeer(), c.LocalPeer())
// log.Errorf("c2.LocalPeer: %s %p", c2.LocalPeer(), c2.LocalPeer())
// log.Errorf("c.RemotePeer: %s %p", c.RemotePeer(), c.RemotePeer())
// log.Errorf("c2.RemotePeer: %s %p", c2.RemotePeer(), c2.RemotePeer())
c.Lock() // gotta relock to avoid lock panic from deferring.
panic("connection addresses mismatch")
}
c.conns[c2.ID()] = c2
c.Children().Add(1)
c2.Children().Add(1) // yep, on the child too.
go c.fanInSingle(c2)
log.Debugf("MultiConn: added %s", c2)
}
}
// Remove removes given Conn instances from multiconn.
func (c *MultiConn) Remove(conns ...Conn) {
// first remove them to avoid sending any more messages through it.
{
c.Lock()
for _, c1 := range conns {
c2, found := c.conns[c1.ID()]
if !found {
panic("Conn not in MultiConn")
}
if c1 != c2 {
panic("different Conn objects for same id.")
}
delete(c.conns, c2.ID())
}
c.Unlock()
}
// close all in parallel, but wait for all to be done closing.
CloseConns(conns...)
}
// CloseConns closes multiple connections in parallel, and waits for all
// to finish closing.
func CloseConns(conns ...Conn) {
var wg sync.WaitGroup
for _, child := range conns {
select {
case <-child.Closed(): // if already closed, continue
continue
default:
}
wg.Add(1)
go func(child Conn) {
child.Close()
wg.Done()
}(child)
}
wg.Wait()
}
// fanInSingle Reads from a connection, and sends to the fanIn.
// waits for child to close and reclaims resources
func (c *MultiConn) fanInSingle(child Conn) {
// cleanup all data associated with this child Connection.
defer func() {
log.Debugf("closing: %s", child)
// in case it still is in the map, remove it.
c.Lock()
delete(c.conns, child.ID())
connLen := len(c.conns)
c.Unlock()
c.Children().Done()
child.Children().Done()
if connLen == 0 {
c.Close() // close self if all underlying children are gone?
}
}()
for {
msg, err := child.ReadMsg()
if err != nil {
log.Warning(err)
return
}
select {
case <-c.Closing(): // multiconn closing
return
case <-child.Closing(): // child closing
return
case c.fanIn <- msg:
}
}
}
// close is the internal close function, called by ContextCloser.Close
func (c *MultiConn) close() error {
log.Debugf("%s closing Conn with %s", c.local, c.remote)
// get connections
c.RLock()
conns := make([]Conn, 0, len(c.conns))
for _, c := range c.conns {
conns = append(conns, c)
}
c.RUnlock()
// close underlying connections
CloseConns(conns...)
close(c.fanIn)
return nil
}
// BestConn is the best connection in this MultiConn
func (c *MultiConn) BestConn() Conn {
c.RLock()
defer c.RUnlock()
var id1 string
var c1 Conn
for id2, c2 := range c.conns {
if id1 == "" || id1 < id2 {
id1 = id2
c1 = c2
}
}
return c1
}
// ID is an identifier unique to this connection.
// In MultiConn, this is all the children IDs XORed together.
func (c *MultiConn) ID() string {
c.RLock()
defer c.RUnlock()
ids := []byte(nil)
for i := range c.conns {
if ids == nil {
ids = []byte(i)
} else {
ids = u.XOR(ids, []byte(i))
}
}
return string(ids)
}
func (c *MultiConn) getConns() []Conn {
c.RLock()
defer c.RUnlock()
var conns []Conn
for _, c := range c.conns {
conns = append(conns, c)
}
return conns
}
func (c *MultiConn) String() string {
return String(c, "MultiConn")
}
func (c *MultiConn) LocalAddr() net.Addr {
bc := c.BestConn()
if bc == nil {
return nil
}
return bc.LocalAddr()
}
func (c *MultiConn) RemoteAddr() net.Addr {
bc := c.BestConn()
if bc == nil {
return nil
}
return bc.RemoteAddr()
}
func (c *MultiConn) SetDeadline(t time.Time) error {
bc := c.BestConn()
if bc == nil {
return nil
}
return bc.SetDeadline(t)
}
func (c *MultiConn) SetReadDeadline(t time.Time) error {
bc := c.BestConn()
if bc == nil {
return nil
}
return bc.SetReadDeadline(t)
}
func (c *MultiConn) SetWriteDeadline(t time.Time) error {
bc := c.BestConn()
if bc == nil {
return nil
}
return bc.SetWriteDeadline(t)
}
// LocalMultiaddr is the Multiaddr on this side
func (c *MultiConn) LocalMultiaddr() ma.Multiaddr {
bc := c.BestConn()
if bc == nil {
return nil
}
return bc.LocalMultiaddr()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *MultiConn) RemoteMultiaddr() ma.Multiaddr {
bc := c.BestConn()
if bc == nil {
return nil
}
return bc.RemoteMultiaddr()
}
// LocalPeer is the Peer on this side
func (c *MultiConn) LocalPeer() peer.Peer {
return c.local
}
// RemotePeer is the Peer on the remote side
func (c *MultiConn) RemotePeer() peer.Peer {
return c.remote
}
// Read reads data, net.Conn style
func (c *MultiConn) Read(buf []byte) (int, error) {
return 0, errors.New("multiconn does not support Read. use ReadMsg")
}
// Write writes data, net.Conn style
func (c *MultiConn) Write(buf []byte) (int, error) {
bc := c.BestConn()
if bc == nil {
return 0, errors.New("no best connection")
}
return bc.Write(buf)
}
func (c *MultiConn) NextMsgLen() (int, error) {
bc := c.BestConn()
if bc == nil {
return 0, errors.New("no best connection")
}
return bc.NextMsgLen()
}
// ReadMsg reads data, net.Conn style
func (c *MultiConn) ReadMsg() ([]byte, error) {
next, ok := <-c.fanIn
if !ok {
return nil, fmt.Errorf("multiconn closed")
}
return next, nil
}
// WriteMsg writes data, net.Conn style
func (c *MultiConn) WriteMsg(buf []byte) error {
bc := c.BestConn()
if bc == nil {
return errors.New("no best connection")
}
return bc.WriteMsg(buf)
}
// ReleaseMsg releases a buffer
func (c *MultiConn) ReleaseMsg(m []byte) {
// here, we dont know where it came from. hm.
for _, c := range c.getConns() {
c.ReleaseMsg(m)
}
}

View File

@ -1,338 +0,0 @@
package conn
import (
"fmt"
"sync"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func tcpAddr(t *testing.T, port int) ma.Multiaddr {
tcp, err := ma.NewMultiaddr(tcpAddrString(port))
if err != nil {
t.Fatal(err)
}
return tcp
}
func tcpAddrString(port int) string {
return fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)
}
type msg struct {
sent bool
received bool
payload string
}
func (m *msg) Sent(t *testing.T) {
if m.sent {
t.Fatal("sent msg at incorrect state:", m)
}
m.sent = true
}
func (m *msg) Received(t *testing.T) {
if m.received {
t.Fatal("received msg at incorrect state:", m)
}
m.received = true
}
type msgMap struct {
sent int
recv int
msgs map[string]*msg
}
func (mm *msgMap) Sent(t *testing.T, payload string) {
mm.msgs[payload].Sent(t)
mm.sent++
}
func (mm *msgMap) Received(t *testing.T, payload string) {
mm.msgs[payload].Received(t)
mm.recv++
}
// CheckDone asserts that every message in the map was both sent and
// received exactly once.
func (mm *msgMap) CheckDone(t *testing.T) {
	if mm.sent != len(mm.msgs) {
		t.Fatal("failed to send all msgs", mm.sent, len(mm.msgs))
	}
	// BUG FIX: the second check duplicated the mm.sent condition; it must
	// verify the receive counter instead.
	if mm.recv != len(mm.msgs) {
		t.Fatal("failed to receive all msgs", mm.recv, len(mm.msgs))
	}
}
func genMessages(num int, tag string) *msgMap {
msgs := &msgMap{msgs: map[string]*msg{}}
for i := 0; i < num; i++ {
s := fmt.Sprintf("Message #%d -- %s", i, tag)
msgs.msgs[s] = &msg{payload: s}
}
return msgs
}
func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) {
log.Info("Setting up peers")
p1, err := setupPeer(tcpAddrString(11000))
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer(tcpAddrString(12000))
if err != nil {
t.Fatal("error setting up peer", err)
}
// peerstores
p1ps := peer.NewPeerstore()
p2ps := peer.NewPeerstore()
p1ps.Add(p1)
p2ps.Add(p2)
// listeners
listen := func(addr ma.Multiaddr, p peer.Peer, ps peer.Peerstore) Listener {
l, err := Listen(ctx, addr, p, ps)
if err != nil {
t.Fatal(err)
}
return l
}
log.Info("Setting up listeners")
p1l := listen(p1.Addresses()[0], p1, p1ps)
p2l := listen(p2.Addresses()[0], p2, p2ps)
// dialers
p1d := &Dialer{Peerstore: p1ps, LocalPeer: p1}
p2d := &Dialer{Peerstore: p2ps, LocalPeer: p2}
dial := func(d *Dialer, dst peer.Peer) <-chan Conn {
cc := make(chan Conn)
go func() {
c, err := d.Dial(ctx, "tcp", dst)
if err != nil {
t.Fatal("error dialing peer", err)
}
cc <- c
}()
return cc
}
// connect simultaneously
log.Info("Connecting...")
p1dc := dial(p1d, p2)
p2dc := dial(p2d, p1)
c12a := <-p1l.Accept()
c12b := <-p1dc
c21a := <-p2l.Accept()
c21b := <-p2dc
log.Info("Ok, making multiconns")
c1, err := NewMultiConn(ctx, p1, p2, []Conn{c12a, c12b})
if err != nil {
t.Fatal(err)
}
c2, err := NewMultiConn(ctx, p2, p1, []Conn{c21a, c21b})
if err != nil {
t.Fatal(err)
}
p1l.Close()
p2l.Close()
log.Info("did you make multiconns?")
return c1, c2
}
// TestMulticonnSend exercises concurrent message send/recv across a pair
// of MultiConns and verifies every generated message is delivered.
func TestMulticonnSend(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	log.Info("TestMulticonnSend")

	ctx := context.Background()
	ctxC, cancel := context.WithCancel(ctx)

	c1, c2 := setupMultiConns(t, ctx)

	log.Info("gen msgs")
	num := 100
	msgsFrom1 := genMessages(num, "from p1 to p2")
	msgsFrom2 := genMessages(num, "from p2 to p1")

	var wg sync.WaitGroup

	send := func(c *MultiConn, msgs *msgMap) {
		defer wg.Done()
		for _, m := range msgs.msgs {
			// BUG FIX: log.Info does not interpret format verbs; Infof does.
			log.Infof("send: %s", m.payload)
			c.WriteMsg([]byte(m.payload))
			msgs.Sent(t, m.payload)
			<-time.After(time.Microsecond * 10)
		}
	}

	recv := func(ctx context.Context, c *MultiConn, msgs *msgMap) {
		defer wg.Done()
		for {
			// bail out promptly if the test context is cancelled.
			select {
			default:
			case <-ctx.Done():
				return
			}

			payload, err := c.ReadMsg()
			if err != nil {
				panic(err)
			}
			msgs.Received(t, string(payload))
			// BUG FIX: log.Info does not interpret format verbs; Infof does.
			log.Infof("recv: %s", payload)
			if msgs.recv == len(msgs.msgs) {
				return
			}
		}
	}

	log.Info("msg send + recv")
	wg.Add(4)
	go send(c1, msgsFrom1)
	go send(c2, msgsFrom2)
	go recv(ctxC, c1, msgsFrom2)
	go recv(ctxC, c2, msgsFrom1)
	wg.Wait()
	cancel()

	c1.Close()
	c2.Close()

	msgsFrom1.CheckDone(t)
	msgsFrom2.CheckDone(t)
	<-time.After(100 * time.Millisecond)
}
// TestMulticonnSendUnderlying is like TestMulticonnSend, but alternates
// writes between the MultiConn itself and its underlying child Conns,
// verifying fan-in delivers every message regardless of which leg sent it.
func TestMulticonnSendUnderlying(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	log.Info("TestMulticonnSendUnderlying")

	ctx := context.Background()
	ctxC, cancel := context.WithCancel(ctx)

	c1, c2 := setupMultiConns(t, ctx)

	log.Info("gen msgs")
	num := 100
	msgsFrom1 := genMessages(num, "from p1 to p2")
	msgsFrom2 := genMessages(num, "from p2 to p1")

	var wg sync.WaitGroup

	send := func(c *MultiConn, msgs *msgMap) {
		defer wg.Done()

		// snapshot the underlying conns so we can round-robin over them.
		conns := make([]Conn, 0, len(c.conns))
		for _, c1 := range c.conns {
			conns = append(conns, c1)
		}

		i := 0
		for _, m := range msgs.msgs {
			// BUG FIX: log.Info does not interpret format verbs; Infof does.
			log.Infof("send: %s", m.payload)
			switch i % 3 {
			case 0:
				conns[0].WriteMsg([]byte(m.payload))
			case 1:
				conns[1].WriteMsg([]byte(m.payload))
			case 2:
				c.WriteMsg([]byte(m.payload))
			}
			msgs.Sent(t, m.payload)
			<-time.After(time.Microsecond * 10)
			i++
		}
	}

	recv := func(ctx context.Context, c *MultiConn, msgs *msgMap) {
		defer wg.Done()
		for {
			// bail out promptly if the test context is cancelled.
			select {
			default:
			case <-ctx.Done():
				return
			}

			payload, err := c.ReadMsg()
			if err != nil {
				panic(err)
			}
			msgs.Received(t, string(payload))
			// BUG FIX: log.Info does not interpret format verbs; Infof does.
			log.Infof("recv: %s", payload)
			if msgs.recv == len(msgs.msgs) {
				return
			}
		}
	}

	log.Info("msg send + recv")
	wg.Add(4)
	go send(c1, msgsFrom1)
	go send(c2, msgsFrom2)
	go recv(ctxC, c1, msgsFrom2)
	go recv(ctxC, c2, msgsFrom1)
	wg.Wait()
	cancel()

	c1.Close()
	c2.Close()

	msgsFrom1.CheckDone(t)
	msgsFrom2.CheckDone(t)
}
func TestMulticonnClose(t *testing.T) {
// t.Skip("fooo")
log.Info("TestMulticonnSendUnderlying")
ctx := context.Background()
c1, c2 := setupMultiConns(t, ctx)
for _, c := range c1.getConns() {
c.Close()
}
for _, c := range c2.getConns() {
c.Close()
}
timeout := time.After(100 * time.Millisecond)
select {
case <-c1.Closed():
case <-timeout:
t.Fatal("timeout")
}
select {
case <-c2.Closed():
case <-timeout:
t.Fatal("timeout")
}
}

View File

@ -10,7 +10,6 @@ import (
secio "github.com/jbenet/go-ipfs/crypto/secio"
peer "github.com/jbenet/go-ipfs/peer"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)
// secureConn wraps another Conn object with an encrypted channel.
@ -24,8 +23,6 @@ type secureConn struct {
// secure Session
session secio.Session
ctxc.ContextCloser
}
// newConn constructs a new connection
@ -43,13 +40,11 @@ func newSecureConn(ctx context.Context, insecure Conn, peers peer.Peerstore) (Co
session: session,
secure: session.ReadWriter(),
}
conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
log.Debugf("newSecureConn: %v to %v handshake success!", conn.LocalPeer(), conn.RemotePeer())
return conn, nil
}
// close is called by ContextCloser
func (c *secureConn) close() error {
func (c *secureConn) Close() error {
if err := c.secure.Close(); err != nil {
c.insecure.Close()
return err

View File

@ -15,93 +15,65 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func setupSecureConn(t *testing.T, c Conn) Conn {
func setupSecureConn(t *testing.T, ctx context.Context, c Conn) (Conn, error) {
c, ok := c.(*secureConn)
if ok {
return c
return c, nil
}
// shouldn't happen, because dial + listen already return secure conns.
s, err := newSecureConn(c.Context(), c, peer.NewPeerstore())
s, err := newSecureConn(ctx, c, peer.NewPeerstore())
if err != nil {
t.Fatal(err)
return nil, err
}
return s
return s, nil
}
func TestSecureClose(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645")
c1 = setupSecureConn(t, c1)
c2 = setupSecureConn(t, c2)
select {
case <-c1.Closed():
t.Fatal("done before close")
case <-c2.Closed():
t.Fatal("done before close")
default:
c1, err1 := setupSecureConn(t, ctx, c1)
c2, err2 := setupSecureConn(t, ctx, c2)
if err1 != nil {
t.Fatal(err1)
}
if err2 != nil {
t.Fatal(err2)
}
testOneSendRecv(t, c1, c2)
testOneSendRecv(t, c2, c1)
c1.Close()
select {
case <-c1.Closed():
default:
t.Fatal("not done after close")
}
testNotOneSendRecv(t, c1, c2)
testNotOneSendRecv(t, c2, c1)
c2.Close()
select {
case <-c2.Closed():
default:
t.Fatal("not done after close")
}
cancel() // close the listener :P
}
func TestSecureCancel(t *testing.T) {
func TestSecureCancelHandshake(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645")
c1 = setupSecureConn(t, c1)
c2 = setupSecureConn(t, c2)
select {
case <-c1.Closed():
t.Fatal("done before close")
case <-c2.Closed():
t.Fatal("done before close")
default:
}
c1.Close()
c2.Close()
cancel() // listener
// wait to ensure other goroutines run and close things.
<-time.After(time.Microsecond * 10)
// test that cancel called Close.
select {
case <-c1.Closed():
default:
t.Fatal("not done after cancel")
}
select {
case <-c2.Closed():
default:
t.Fatal("not done after cancel")
}
done := make(chan struct{})
go func() {
_, err1 := setupSecureConn(t, ctx, c1)
_, err2 := setupSecureConn(t, ctx, c2)
if err1 == nil {
t.Fatal(err1)
}
if err2 == nil {
t.Fatal(err2)
}
done <- struct{}{}
}()
<-done
}
func TestSecureCloseLeak(t *testing.T) {
@ -122,8 +94,14 @@ func TestSecureCloseLeak(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2)
c1 = setupSecureConn(t, c1)
c2 = setupSecureConn(t, c2)
c1, err1 := setupSecureConn(t, ctx, c1)
c2, err2 := setupSecureConn(t, ctx, c2)
if err1 != nil {
t.Fatal(err1)
}
if err2 != nil {
t.Fatal(err2)
}
for i := 0; i < num; i++ {
b1 := []byte("beep")

View File

@ -23,8 +23,5 @@ func (e *Error) Error() string {
}
func New(errs ...error) *Error {
if len(errs) == 0 {
return nil
}
return &Error{errs}
}