From 393842e24569ed6c3ced3490f2f42433f1ed71a8 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 14 Dec 2014 14:14:22 -0800 Subject: [PATCH] much simpler net - removed ctxcloser - removed multiconn - focused on netio --- net/conn/conn.go | 21 +-- net/conn/conn_test.go | 90 ++++----- net/conn/dial.go | 15 +- net/conn/dial_test.go | 35 ++-- net/conn/handshake.go | 70 ++++--- net/conn/interface.go | 101 +--------- net/conn/listen.go | 146 ++++++--------- net/conn/multiconn.go | 351 ----------------------------------- net/conn/multiconn_test.go | 338 --------------------------------- net/conn/secure_conn.go | 7 +- net/conn/secure_conn_test.go | 104 ++++------- util/multierr/multierr.go | 3 - 12 files changed, 215 insertions(+), 1066 deletions(-) delete mode 100644 net/conn/multiconn.go delete mode 100644 net/conn/multiconn_test.go diff --git a/net/conn/conn.go b/net/conn/conn.go index 0b8c86a19..0d8c8e00e 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -13,17 +13,14 @@ import ( peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" - ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) -var log = u.Logger("conn") +var log = eventlog.Logger("conn") const ( // MaxMessageSize is the size of the largest single message. (4MB) MaxMessageSize = 1 << 22 - - // HandshakeTimeout for when nodes first connect - HandshakeTimeout = time.Second * 5 ) // ReleaseBuffer puts the given byte array back into the buffer pool, @@ -39,13 +36,10 @@ type singleConn struct { remote peer.Peer maconn manet.Conn msgrw msgio.ReadWriteCloser - - ctxc.ContextCloser } // newConn constructs a new connection -func newSingleConn(ctx context.Context, local, remote peer.Peer, - maconn manet.Conn) (Conn, error) { +func newSingleConn(ctx context.Context, local, remote peer.Peer, maconn manet.Conn) (Conn, error) { conn := &singleConn{ local: local, @@ -53,14 +47,10 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer, maconn: maconn, msgrw: msgio.NewReadWriter(maconn), } - - conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close) - log.Debugf("newSingleConn %p: %v to %v", conn, local, remote) // version handshake - ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout) - if err := Handshake1(ctxT, conn); err != nil { + if err := Handshake1(ctx, conn); err != nil { conn.Close() return nil, fmt.Errorf("Handshake1 failed: %s", err) } @@ -70,9 +60,8 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer, } // close is the internal close function, called by ContextCloser.Close -func (c *singleConn) close() error { +func (c *singleConn) Close() error { log.Debugf("%s closing Conn with %s", c.local, c.remote) - // close underlying connection return c.msgrw.Close() } diff --git a/net/conn/conn_test.go b/net/conn/conn_test.go index cca6f7fd7..cdc786c1e 100644 --- a/net/conn/conn_test.go +++ b/net/conn/conn_test.go @@ -13,73 +13,51 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) +func testOneSendRecv(t *testing.T, c1, c2 Conn) { + m1 := []byte("hello") + if err := c1.WriteMsg(m1); err != nil { + t.Fatal(err) + } + m2, err := c2.ReadMsg() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(m1, m2) { + t.Fatal("failed to send: %s %s", m1, m2) + } +} + +func testNotOneSendRecv(t *testing.T, c1, c2 Conn) { + m1 := []byte("hello") + if err := c1.WriteMsg(m1); err == nil { + t.Fatal("write should have failed", err) + } + _, err := c2.ReadMsg() + if err == nil { + t.Fatal("read should have failed", err) + } +} + func TestClose(t *testing.T) { // t.Skip("Skipping in favor of another test") - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/5534", "/ip4/127.0.0.1/tcp/5545") - select { - case <-c1.Closed(): - t.Fatal("done before close") - case <-c2.Closed(): - t.Fatal("done before close") - default: - } + testOneSendRecv(t, c1, c2) + testOneSendRecv(t, c2, c1) c1.Close() - select { - case <-c1.Closed(): - default: - t.Fatal("not done after cancel") - } + time.After(200 * time.Millisecond) + testNotOneSendRecv(t, c1, c2) + testNotOneSendRecv(t, c2, c1) c2.Close() - select { - case <-c2.Closed(): - default: - t.Fatal("not done after cancel") - } - - cancel() // close the listener :P -} - -func TestCancel(t *testing.T) { - // t.Skip("Skipping in favor of another test") - - ctx, cancel := context.WithCancel(context.Background()) - c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/5534", "/ip4/127.0.0.1/tcp/5545") - - select { - case <-c1.Closed(): - t.Fatal("done before close") - case <-c2.Closed(): - t.Fatal("done before close") - default: - } - - c1.Close() - c2.Close() - cancel() // listener - - // wait to ensure other goroutines run and close things. - <-time.After(time.Microsecond * 10) - // test that cancel called Close. - - select { - case <-c1.Closed(): - default: - t.Fatal("not done after cancel") - } - - select { - case <-c2.Closed(): - default: - t.Fatal("not done after cancel") - } - + time.After(20000 * time.Millisecond) + testNotOneSendRecv(t, c1, c2) + testNotOneSendRecv(t, c2, c1) } func TestCloseLeak(t *testing.T) { diff --git a/net/conn/dial.go b/net/conn/dial.go index 96850e376..8fe3b7c85 100644 --- a/net/conn/dial.go +++ b/net/conn/dial.go @@ -4,7 +4,6 @@ import ( "strings" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" @@ -67,11 +66,25 @@ func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.P return nil, err } + select { + case <-ctx.Done(): + maconn.Close() + return nil, err + default: + } + c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn) if err != nil { return nil, err } + select { + case <-ctx.Done(): + c.Close() + return nil, err + default: + } + // return c, nil return newSecureConn(ctx, c, d.Peerstore) } diff --git a/net/conn/dial_test.go b/net/conn/dial_test.go index 06bad7949..b941b2d17 100644 --- a/net/conn/dial_test.go +++ b/net/conn/dial_test.go @@ -33,16 +33,19 @@ func setupPeer(addr string) (peer.Peer, error) { func echoListen(ctx context.Context, listener Listener) { for { - select { - case <-ctx.Done(): - return - case c := <-listener.Accept(): - go echo(ctx, c) + c, err := listener.Accept() + if err != nil { + select { + case <-ctx.Done(): + return + default: + } } + go echo(c.(Conn)) } } -func echo(ctx context.Context, c Conn) { +func echo(c Conn) { io.Copy(c, c) } @@ -78,14 +81,24 @@ func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) { LocalPeer: p2, } - c2, err := d2.Dial(ctx, "tcp", p1) + var c2 Conn + + done := make(chan struct{}) + go func() { + c2, err = d2.Dial(ctx, "tcp", p1) + if err != nil { + t.Fatal("error dialing peer", err) + } + done <- struct{}{} + }() + + c1, err := l1.Accept() if err != nil { - t.Fatal("error dialing peer", err) + t.Fatal("failed to accept") } + <-done - c1 := <-l1.Accept() - - return c1, c2 + return c1.(Conn), c2 } func TestDialer(t *testing.T) { diff --git a/net/conn/handshake.go b/net/conn/handshake.go index ca2164d2b..c188fc864 100644 --- a/net/conn/handshake.go +++ b/net/conn/handshake.go @@ -6,8 +6,9 @@ import ( handshake "github.com/jbenet/go-ipfs/net/handshake" hspb "github.com/jbenet/go-ipfs/net/handshake/pb" + ggprotoio "code.google.com/p/gogoprotobuf/io" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + ps "github.com/jbenet/go-peerstream" ) // Handshake1 exchanges local and remote versions and compares them @@ -16,30 +17,31 @@ func Handshake1(ctx context.Context, c Conn) error { rpeer := c.RemotePeer() lpeer := c.LocalPeer() - var remoteH, localH *hspb.Handshake1 - localH = handshake.Handshake1Msg() + // setup up protobuf io + maxSize := 4096 + r := ggprotoio.NewDelimitedReader(c, maxSize) + w := ggprotoio.NewDelimitedWriter(c) + localH := handshake.Handshake1Msg() + remoteH := new(hspb.Handshake1) - myVerBytes, err := proto.Marshal(localH) - if err != nil { - return err - } - - if err := CtxWriteMsg(ctx, c, myVerBytes); err != nil { + // send the outgoing handshake message + if err := w.WriteMsg(localH); err != nil { return err } log.Debugf("%p sent my version (%s) to %s", c, localH, rpeer) + log.Event(ctx, "handshake1Sent", lpeer) - data, err := CtxReadMsg(ctx, c) - if err != nil { - return err + select { + case <-ctx.Done(): + return ctx.Err() + default: } - remoteH = new(hspb.Handshake1) - err = proto.Unmarshal(data, remoteH) - if err != nil { - return fmt.Errorf("could not decode remote version: %q", err) + if err := r.ReadMsg(remoteH); err != nil { + return fmt.Errorf("could not receive remote version: %q", err) } log.Debugf("%p received remote version (%s) from %s", c, remoteH, rpeer) + log.Event(ctx, "handshake1Received", lpeer) if err := handshake.Handshake1Compatible(localH, remoteH); err != nil { log.Infof("%s (%s) incompatible version with %s (%s)", lpeer, localH, rpeer, remoteH) @@ -51,36 +53,30 @@ func Handshake1(ctx context.Context, c Conn) error { } // Handshake3 exchanges local and remote service information -func Handshake3(ctx context.Context, c Conn) (*handshake.Handshake3Result, error) { +func Handshake3(ctx context.Context, stream ps.Stream, c Conn) (*handshake.Handshake3Result, error) { rpeer := c.RemotePeer() lpeer := c.LocalPeer() - // setup + send the message to remote - var remoteH, localH *hspb.Handshake3 - localH = handshake.Handshake3Msg(lpeer, c.RemoteMultiaddr()) - localB, err := proto.Marshal(localH) - if err != nil { - return nil, err - } + // setup up protobuf io + maxSize := 4096 + r := ggprotoio.NewDelimitedReader(stream, maxSize) + w := ggprotoio.NewDelimitedWriter(stream) + localH := handshake.Handshake3Msg(lpeer, c.RemoteMultiaddr()) + remoteH := new(hspb.Handshake3) - if err := CtxWriteMsg(ctx, c, localB); err != nil { + // setup + send the message to remote + if err := w.WriteMsg(localH); err != nil { return nil, err } - log.Debugf("Handshake1: sent to %s", rpeer) + log.Debugf("Handshake3: sent to %s", rpeer) + log.Event(ctx, "handshake3Sent", lpeer, rpeer) // wait + listen for response - remoteB, err := CtxReadMsg(ctx, c) - if err != nil { - return nil, err + if err := r.ReadMsg(remoteH); err != nil { + return nil, fmt.Errorf("Handshake3 could not receive remote msg: %q", err) } - - remoteH = new(hspb.Handshake3) - err = proto.Unmarshal(remoteB, remoteH) - if err != nil { - return nil, fmt.Errorf("Handshake3 could not decode remote msg: %q", err) - } - - log.Debugf("Handshake3 received from %s", rpeer) + log.Debugf("Handshake3: received from %s", rpeer) + log.Event(ctx, "handshake3Received", lpeer, rpeer) // actually update our state based on the new knowledge res, err := handshake.Handshake3Update(lpeer, rpeer, remoteH) diff --git a/net/conn/interface.go b/net/conn/interface.go index ed6631f71..55f1a0e9b 100644 --- a/net/conn/interface.go +++ b/net/conn/interface.go @@ -1,15 +1,13 @@ package conn import ( - "errors" + "io" "net" "time" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" - ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) @@ -20,9 +18,6 @@ type Map map[u.Key]Conn // Conn is a generic message-based Peer-to-Peer connection. type Conn interface { - // implement ContextCloser too! - ctxc.ContextCloser - // ID is an identifier unique to this connection. ID() string @@ -47,6 +42,7 @@ type Conn interface { msgio.Reader msgio.Writer + io.Closer } // Dialer is an object that can open connections. We could have a "convenience" @@ -67,9 +63,12 @@ type Dialer struct { type Listener interface { // Accept waits for and returns the next connection to the listener. - Accept() <-chan Conn + Accept() (net.Conn, error) - // Multiaddr is the identity of the local Peer. + // Addr is the local address + Addr() net.Addr + + // Multiaddr is the local multiaddr address Multiaddr() ma.Multiaddr // LocalPeer is the identity of the local Peer. @@ -84,89 +83,3 @@ type Listener interface { // Any blocked Accept operations will be unblocked and return errors. Close() error } - -// CtxRead is a function that Reads from a connection while respecting a -// Context. Though it cannot cancel the read per-se (as not all Connections -// implement SetTimeout, and a CancelFunc can't be predicted), at least it -// doesn't hang. The Read will eventually return and the goroutine will exit. -func CtxRead(ctx context.Context, c Conn, buf []byte) (n int, err error) { - done := make(chan struct{}) - go func() { - n, err = c.Read(buf) - close(done) - }() - - select { - case <-ctx.Done(): - return 0, ctx.Err() - - case <-c.Closing(): - return 0, errors.New("remote connection closed") - - case <-done: - return n, err - } -} - -// CtxReadMsg is a function that Reads from a connection while respecting a -// Context. See CtxRead. -func CtxReadMsg(ctx context.Context, c Conn) (msg []byte, err error) { - done := make(chan struct{}) - go func() { - msg, err = c.ReadMsg() - close(done) - }() - - select { - case <-ctx.Done(): - return msg, ctx.Err() - - case <-c.Closing(): - return msg, errors.New("remote connection closed") - - case <-done: - return msg, err - } -} - -// CtxWrite is a function that Writes to a connection while respecting a -// Context. See CtxRead. -func CtxWrite(ctx context.Context, c Conn, buf []byte) (n int, err error) { - done := make(chan struct{}) - go func() { - n, err = c.Read(buf) - close(done) - }() - - select { - case <-ctx.Done(): - return 0, ctx.Err() - - case <-c.Closing(): - return 0, errors.New("remote connection closed") - - case <-done: - return n, err - } -} - -// CtxWriteMsg is a function that Writes to a connection while respecting a -// Context. See CtxRead. -func CtxWriteMsg(ctx context.Context, c Conn, buf []byte) (err error) { - done := make(chan struct{}) - go func() { - err = c.WriteMsg(buf) - close(done) - }() - - select { - case <-ctx.Done(): - return ctx.Err() - - case <-c.Closing(): - return errors.New("remote connection closed") - - case <-done: - return err - } -} diff --git a/net/conn/listen.go b/net/conn/listen.go index fc25db9f0..fec811df8 100644 --- a/net/conn/listen.go +++ b/net/conn/listen.go @@ -2,25 +2,22 @@ package conn import ( "fmt" + "net" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" peer "github.com/jbenet/go-ipfs/peer" - ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" ) // listener is an object that can accept connections. It implements Listener type listener struct { + notSecure bool + notSecureIMeanIt bool + manet.Listener - // chansize is the size of the internal channels for concurrency - chansize int - - // channel of incoming conections - conns chan Conn - // Local multiaddr to listen on maddr ma.Multiaddr @@ -29,79 +26,49 @@ type listener struct { // Peerstore is the set of peers we know about locally peers peer.Peerstore - - // Context for children Conn - ctx context.Context - - // embedded ContextCloser - ctxc.ContextCloser } -// disambiguate func (l *listener) Close() error { - return l.ContextCloser.Close() -} - -// close called by ContextCloser.Close -func (l *listener) close() error { log.Infof("listener closing: %s %s", l.local, l.maddr) return l.Listener.Close() } -func (l *listener) listen() { - defer l.Children().Done() - - // handle at most chansize concurrent handshakes - sem := make(chan struct{}, l.chansize) - - // handle is a goroutine work function that handles the handshake. - // it's here only so that accepting new connections can happen quickly. - handle := func(maconn manet.Conn) { - defer func() { <-sem }() // release - - c, err := newSingleConn(l.ctx, l.local, nil, maconn) - if err != nil { - log.Errorf("Error accepting connection: %v", err) - return - } - - // if insecure: - // l.conns <- c - - // if secure - sc, err := newSecureConn(l.ctx, c, l.peers) - if err != nil { - log.Errorf("Error securing connection: %v", err) - return - } - l.conns <- sc - } - - for { - log.Infof("swarm listening on %s -- %v\n", l.Multiaddr(), l.Listener) - maconn, err := l.Listener.Accept() - if err != nil { - - // if closing, we should exit. - select { - case <-l.Closing(): - return // done. - default: - } - - log.Errorf("Failed to accept connection: %v", err) - continue - } - - sem <- struct{}{} // acquire - go handle(maconn) - } -} - // Accept waits for and returns the next connection to the listener. // Note that unfortunately this -func (l *listener) Accept() <-chan Conn { - return l.conns +func (l *listener) Accept() (net.Conn, error) { + + // listeners dont have contexts. given changes dont make sense here anymore + // note that the parent of listener will Close, which will interrupt all io. + // Contexts and io don't mix. + ctx := context.Background() + + maconn, err := l.Listener.Accept() + if err != nil { + return nil, err + } + + c, err := newSingleConn(ctx, l.local, nil, maconn) + if err != nil { + return nil, fmt.Errorf("Error accepting connection: %v", err) + } + + if l.Secure() { + sc, err := newSecureConn(ctx, c, l.peers) + if err != nil { + return nil, fmt.Errorf("Error securing connection: %v", err) + } + return sc, nil + } + + return c, nil +} + +func (l *listener) Secure() bool { + return !(l.notSecure && l.notSecureIMeanIt) +} + +func (l *listener) Addr() net.Addr { + return l.Listener.Addr() } // Multiaddr is the identity of the local Peer. @@ -121,6 +88,16 @@ func (l *listener) Peerstore() peer.Peerstore { return l.peers } +func (l *listener) Loggable() map[string]interface{} { + return map[string]interface{}{ + "listener": map[string]interface{}{ + "peer": l.LocalPeer(), + "address": l.Multiaddr(), + "secure": l.Secure(), + }, + } +} + // Listen listens on the particular multiaddr, with given peer and peerstore. func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.Peerstore) (Listener, error) { @@ -129,27 +106,16 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer. return nil, fmt.Errorf("Failed to listen on %s: %s", addr, err) } - // todo make this a variable - chansize := 10 - l := &listener{ - Listener: ml, - maddr: addr, - peers: peers, - local: local, - conns: make(chan Conn, chansize), - chansize: chansize, - ctx: ctx, + Listener: ml, + maddr: addr, + peers: peers, + local: local, + notSecure: false, + notSecureIMeanIt: false, } - // need a separate context to use for the context closer. - // This is because the parent context will be given to all connections too, - // and if we close the listener, the connections shouldn't share the fate. - ctx2, _ := context.WithCancel(ctx) - l.ContextCloser = ctxc.NewContextCloser(ctx2, l.close) - - l.Children().Add(1) - go l.listen() - + log.Infof("swarm listening on %s\n", l.Multiaddr()) + log.Event(ctx, "swarmListen", l) return l, nil } diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go deleted file mode 100644 index e085e8e2c..000000000 --- a/net/conn/multiconn.go +++ /dev/null @@ -1,351 +0,0 @@ -package conn - -import ( - "errors" - "fmt" - "net" - "sync" - "time" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" - ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" -) - -// MultiConnMap is for shorthand -type MultiConnMap map[u.Key]*MultiConn - -// MultiConn represents a single connection to another Peer (IPFS Node). -type MultiConn struct { - - // connections, mapped by a string, which uniquely identifies the connection. - // this string is: /addr1/peer1/addr2/peer2 (peers ordered lexicographically) - conns map[string]Conn - - local peer.Peer - remote peer.Peer - - // fan-in - fanIn chan []byte - - // for adding/removing connections concurrently - sync.RWMutex - ctxc.ContextCloser -} - -// NewMultiConn constructs a new connection -func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (*MultiConn, error) { - - c := &MultiConn{ - local: local, - remote: remote, - conns: map[string]Conn{}, - fanIn: make(chan []byte), - } - - // must happen before Adds / fanOut - c.ContextCloser = ctxc.NewContextCloser(ctx, c.close) - - if conns != nil && len(conns) > 0 { - c.Add(conns...) - } - - return c, nil -} - -// Add adds given Conn instances to multiconn. -func (c *MultiConn) Add(conns ...Conn) { - c.Lock() - defer c.Unlock() - - for _, c2 := range conns { - log.Debugf("MultiConn: adding %s", c2) - if c.LocalPeer() != c2.LocalPeer() || c.RemotePeer() != c2.RemotePeer() { - log.Error(c2) - c.Unlock() // ok to unlock (to log). panicing. - log.Error(c) - // log.Errorf("c.LocalPeer: %s %p", c.LocalPeer(), c.LocalPeer()) - // log.Errorf("c2.LocalPeer: %s %p", c2.LocalPeer(), c2.LocalPeer()) - // log.Errorf("c.RemotePeer: %s %p", c.RemotePeer(), c.RemotePeer()) - // log.Errorf("c2.RemotePeer: %s %p", c2.RemotePeer(), c2.RemotePeer()) - c.Lock() // gotta relock to avoid lock panic from deferring. - panic("connection addresses mismatch") - } - - c.conns[c2.ID()] = c2 - c.Children().Add(1) - c2.Children().Add(1) // yep, on the child too. - go c.fanInSingle(c2) - log.Debugf("MultiConn: added %s", c2) - } -} - -// Remove removes given Conn instances from multiconn. -func (c *MultiConn) Remove(conns ...Conn) { - - // first remove them to avoid sending any more messages through it. - { - c.Lock() - for _, c1 := range conns { - c2, found := c.conns[c1.ID()] - if !found { - panic("Conn not in MultiConn") - } - if c1 != c2 { - panic("different Conn objects for same id.") - } - - delete(c.conns, c2.ID()) - } - c.Unlock() - } - - // close all in parallel, but wait for all to be done closing. - CloseConns(conns...) -} - -// CloseConns closes multiple connections in parallel, and waits for all -// to finish closing. -func CloseConns(conns ...Conn) { - var wg sync.WaitGroup - for _, child := range conns { - - select { - case <-child.Closed(): // if already closed, continue - continue - default: - } - - wg.Add(1) - go func(child Conn) { - child.Close() - wg.Done() - }(child) - } - wg.Wait() -} - -// fanInSingle Reads from a connection, and sends to the fanIn. -// waits for child to close and reclaims resources -func (c *MultiConn) fanInSingle(child Conn) { - // cleanup all data associated with this child Connection. - defer func() { - log.Debugf("closing: %s", child) - - // in case it still is in the map, remove it. - c.Lock() - delete(c.conns, child.ID()) - connLen := len(c.conns) - c.Unlock() - - c.Children().Done() - child.Children().Done() - - if connLen == 0 { - c.Close() // close self if all underlying children are gone? - } - }() - - for { - msg, err := child.ReadMsg() - if err != nil { - log.Warning(err) - return - } - - select { - case <-c.Closing(): // multiconn closing - return - - case <-child.Closing(): // child closing - return - - case c.fanIn <- msg: - } - } -} - -// close is the internal close function, called by ContextCloser.Close -func (c *MultiConn) close() error { - log.Debugf("%s closing Conn with %s", c.local, c.remote) - - // get connections - c.RLock() - conns := make([]Conn, 0, len(c.conns)) - for _, c := range c.conns { - conns = append(conns, c) - } - c.RUnlock() - - // close underlying connections - CloseConns(conns...) - close(c.fanIn) - return nil -} - -// BestConn is the best connection in this MultiConn -func (c *MultiConn) BestConn() Conn { - c.RLock() - defer c.RUnlock() - - var id1 string - var c1 Conn - for id2, c2 := range c.conns { - if id1 == "" || id1 < id2 { - id1 = id2 - c1 = c2 - } - } - return c1 -} - -// ID is an identifier unique to this connection. -// In MultiConn, this is all the children IDs XORed together. -func (c *MultiConn) ID() string { - c.RLock() - defer c.RUnlock() - - ids := []byte(nil) - for i := range c.conns { - if ids == nil { - ids = []byte(i) - } else { - ids = u.XOR(ids, []byte(i)) - } - } - - return string(ids) -} - -func (c *MultiConn) getConns() []Conn { - c.RLock() - defer c.RUnlock() - var conns []Conn - for _, c := range c.conns { - conns = append(conns, c) - } - return conns -} - -func (c *MultiConn) String() string { - return String(c, "MultiConn") -} - -func (c *MultiConn) LocalAddr() net.Addr { - bc := c.BestConn() - if bc == nil { - return nil - } - return bc.LocalAddr() -} - -func (c *MultiConn) RemoteAddr() net.Addr { - bc := c.BestConn() - if bc == nil { - return nil - } - return bc.RemoteAddr() -} - -func (c *MultiConn) SetDeadline(t time.Time) error { - bc := c.BestConn() - if bc == nil { - return nil - } - return bc.SetDeadline(t) -} - -func (c *MultiConn) SetReadDeadline(t time.Time) error { - bc := c.BestConn() - if bc == nil { - return nil - } - return bc.SetReadDeadline(t) -} - -func (c *MultiConn) SetWriteDeadline(t time.Time) error { - bc := c.BestConn() - if bc == nil { - return nil - } - return bc.SetWriteDeadline(t) -} - -// LocalMultiaddr is the Multiaddr on this side -func (c *MultiConn) LocalMultiaddr() ma.Multiaddr { - bc := c.BestConn() - if bc == nil { - return nil - } - return bc.LocalMultiaddr() -} - -// RemoteMultiaddr is the Multiaddr on the remote side -func (c *MultiConn) RemoteMultiaddr() ma.Multiaddr { - bc := c.BestConn() - if bc == nil { - return nil - } - return bc.RemoteMultiaddr() -} - -// LocalPeer is the Peer on this side -func (c *MultiConn) LocalPeer() peer.Peer { - return c.local -} - -// RemotePeer is the Peer on the remote side -func (c *MultiConn) RemotePeer() peer.Peer { - return c.remote -} - -// Read reads data, net.Conn style -func (c *MultiConn) Read(buf []byte) (int, error) { - return 0, errors.New("multiconn does not support Read. use ReadMsg") -} - -// Write writes data, net.Conn style -func (c *MultiConn) Write(buf []byte) (int, error) { - bc := c.BestConn() - if bc == nil { - return 0, errors.New("no best connection") - } - return bc.Write(buf) -} - -func (c *MultiConn) NextMsgLen() (int, error) { - bc := c.BestConn() - if bc == nil { - return 0, errors.New("no best connection") - } - return bc.NextMsgLen() -} - -// ReadMsg reads data, net.Conn style -func (c *MultiConn) ReadMsg() ([]byte, error) { - next, ok := <-c.fanIn - if !ok { - return nil, fmt.Errorf("multiconn closed") - } - return next, nil -} - -// WriteMsg writes data, net.Conn style -func (c *MultiConn) WriteMsg(buf []byte) error { - bc := c.BestConn() - if bc == nil { - return errors.New("no best connection") - } - return bc.WriteMsg(buf) -} - -// ReleaseMsg releases a buffer -func (c *MultiConn) ReleaseMsg(m []byte) { - // here, we dont know where it came from. hm. - for _, c := range c.getConns() { - c.ReleaseMsg(m) - } -} diff --git a/net/conn/multiconn_test.go b/net/conn/multiconn_test.go deleted file mode 100644 index 6b28b3d52..000000000 --- a/net/conn/multiconn_test.go +++ /dev/null @@ -1,338 +0,0 @@ -package conn - -import ( - "fmt" - "sync" - "testing" - "time" - - peer "github.com/jbenet/go-ipfs/peer" - - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" -) - -func tcpAddr(t *testing.T, port int) ma.Multiaddr { - tcp, err := ma.NewMultiaddr(tcpAddrString(port)) - if err != nil { - t.Fatal(err) - } - return tcp -} - -func tcpAddrString(port int) string { - return fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port) -} - -type msg struct { - sent bool - received bool - payload string -} - -func (m *msg) Sent(t *testing.T) { - if m.sent { - t.Fatal("sent msg at incorrect state:", m) - } - m.sent = true -} - -func (m *msg) Received(t *testing.T) { - if m.received { - t.Fatal("received msg at incorrect state:", m) - } - m.received = true -} - -type msgMap struct { - sent int - recv int - msgs map[string]*msg -} - -func (mm *msgMap) Sent(t *testing.T, payload string) { - mm.msgs[payload].Sent(t) - mm.sent++ -} - -func (mm *msgMap) Received(t *testing.T, payload string) { - mm.msgs[payload].Received(t) - mm.recv++ -} - -func (mm *msgMap) CheckDone(t *testing.T) { - if mm.sent != len(mm.msgs) { - t.Fatal("failed to send all msgs", mm.sent, len(mm.msgs)) - } - - if mm.sent != len(mm.msgs) { - t.Fatal("failed to send all msgs", mm.sent, len(mm.msgs)) - } -} - -func genMessages(num int, tag string) *msgMap { - msgs := &msgMap{msgs: map[string]*msg{}} - for i := 0; i < num; i++ { - s := fmt.Sprintf("Message #%d -- %s", i, tag) - msgs.msgs[s] = &msg{payload: s} - } - return msgs -} - -func setupMultiConns(t *testing.T, ctx context.Context) (a, b *MultiConn) { - - log.Info("Setting up peers") - p1, err := setupPeer(tcpAddrString(11000)) - if err != nil { - t.Fatal("error setting up peer", err) - } - - p2, err := setupPeer(tcpAddrString(12000)) - if err != nil { - t.Fatal("error setting up peer", err) - } - - // peerstores - p1ps := peer.NewPeerstore() - p2ps := peer.NewPeerstore() - p1ps.Add(p1) - p2ps.Add(p2) - - // listeners - listen := func(addr ma.Multiaddr, p peer.Peer, ps peer.Peerstore) Listener { - l, err := Listen(ctx, addr, p, ps) - if err != nil { - t.Fatal(err) - } - return l - } - - log.Info("Setting up listeners") - p1l := listen(p1.Addresses()[0], p1, p1ps) - p2l := listen(p2.Addresses()[0], p2, p2ps) - - // dialers - p1d := &Dialer{Peerstore: p1ps, LocalPeer: p1} - p2d := &Dialer{Peerstore: p2ps, LocalPeer: p2} - - dial := func(d *Dialer, dst peer.Peer) <-chan Conn { - cc := make(chan Conn) - go func() { - c, err := d.Dial(ctx, "tcp", dst) - if err != nil { - t.Fatal("error dialing peer", err) - } - cc <- c - }() - return cc - } - - // connect simultaneously - log.Info("Connecting...") - p1dc := dial(p1d, p2) - p2dc := dial(p2d, p1) - - c12a := <-p1l.Accept() - c12b := <-p1dc - c21a := <-p2l.Accept() - c21b := <-p2dc - - log.Info("Ok, making multiconns") - c1, err := NewMultiConn(ctx, p1, p2, []Conn{c12a, c12b}) - if err != nil { - t.Fatal(err) - } - - c2, err := NewMultiConn(ctx, p2, p1, []Conn{c21a, c21b}) - if err != nil { - t.Fatal(err) - } - - p1l.Close() - p2l.Close() - - log.Info("did you make multiconns?") - return c1, c2 -} - -func TestMulticonnSend(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - - log.Info("TestMulticonnSend") - ctx := context.Background() - ctxC, cancel := context.WithCancel(ctx) - - c1, c2 := setupMultiConns(t, ctx) - - log.Info("gen msgs") - num := 100 - msgsFrom1 := genMessages(num, "from p1 to p2") - msgsFrom2 := genMessages(num, "from p2 to p1") - - var wg sync.WaitGroup - - send := func(c *MultiConn, msgs *msgMap) { - defer wg.Done() - - for _, m := range msgs.msgs { - log.Info("send: %s", m.payload) - c.WriteMsg([]byte(m.payload)) - msgs.Sent(t, m.payload) - <-time.After(time.Microsecond * 10) - } - } - - recv := func(ctx context.Context, c *MultiConn, msgs *msgMap) { - defer wg.Done() - - for { - select { - default: - case <-ctx.Done(): - return - } - - payload, err := c.ReadMsg() - if err != nil { - panic(err) - } - - msgs.Received(t, string(payload)) - log.Info("recv: %s", payload) - if msgs.recv == len(msgs.msgs) { - return - } - } - - } - - log.Info("msg send + recv") - - wg.Add(4) - go send(c1, msgsFrom1) - go send(c2, msgsFrom2) - go recv(ctxC, c1, msgsFrom2) - go recv(ctxC, c2, msgsFrom1) - wg.Wait() - cancel() - c1.Close() - c2.Close() - - msgsFrom1.CheckDone(t) - msgsFrom2.CheckDone(t) - <-time.After(100 * time.Millisecond) -} - -func TestMulticonnSendUnderlying(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - - log.Info("TestMulticonnSendUnderlying") - ctx := context.Background() - ctxC, cancel := context.WithCancel(ctx) - - c1, c2 := setupMultiConns(t, ctx) - - log.Info("gen msgs") - num := 100 - msgsFrom1 := genMessages(num, "from p1 to p2") - msgsFrom2 := genMessages(num, "from p2 to p1") - - var wg sync.WaitGroup - - send := func(c *MultiConn, msgs *msgMap) { - defer wg.Done() - - conns := make([]Conn, 0, len(c.conns)) - for _, c1 := range c.conns { - conns = append(conns, c1) - } - - i := 0 - for _, m := range msgs.msgs { - log.Info("send: %s", m.payload) - switch i % 3 { - case 0: - conns[0].WriteMsg([]byte(m.payload)) - case 1: - conns[1].WriteMsg([]byte(m.payload)) - case 2: - c.WriteMsg([]byte(m.payload)) - } - msgs.Sent(t, m.payload) - <-time.After(time.Microsecond * 10) - i++ - } - } - - recv := func(ctx context.Context, c *MultiConn, msgs *msgMap) { - defer wg.Done() - - for { - select { - default: - case <-ctx.Done(): - return - } - - payload, err := c.ReadMsg() - if err != nil { - panic(err) - } - - msgs.Received(t, string(payload)) - log.Info("recv: %s", payload) - if msgs.recv == len(msgs.msgs) { - return - } - } - - } - - log.Info("msg send + recv") - - wg.Add(4) - go send(c1, msgsFrom1) - go send(c2, msgsFrom2) - go recv(ctxC, c1, msgsFrom2) - go recv(ctxC, c2, msgsFrom1) - wg.Wait() - cancel() - c1.Close() - c2.Close() - - msgsFrom1.CheckDone(t) - msgsFrom2.CheckDone(t) -} - -func TestMulticonnClose(t *testing.T) { - // t.Skip("fooo") - - log.Info("TestMulticonnSendUnderlying") - ctx := context.Background() - c1, c2 := setupMultiConns(t, ctx) - - for _, c := range c1.getConns() { - c.Close() - } - - for _, c := range c2.getConns() { - c.Close() - } - - timeout := time.After(100 * time.Millisecond) - select { - case <-c1.Closed(): - case <-timeout: - t.Fatal("timeout") - } - - select { - case <-c2.Closed(): - case <-timeout: - t.Fatal("timeout") - } -} diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go index 2e6d194e5..6437ea05f 100644 --- a/net/conn/secure_conn.go +++ b/net/conn/secure_conn.go @@ -10,7 +10,6 @@ import ( secio "github.com/jbenet/go-ipfs/crypto/secio" peer "github.com/jbenet/go-ipfs/peer" - ctxc "github.com/jbenet/go-ipfs/util/ctxcloser" ) // secureConn wraps another Conn object with an encrypted channel. @@ -24,8 +23,6 @@ type secureConn struct { // secure Session session secio.Session - - ctxc.ContextCloser } // newConn constructs a new connection @@ -43,13 +40,11 @@ func newSecureConn(ctx context.Context, insecure Conn, peers peer.Peerstore) (Co session: session, secure: session.ReadWriter(), } - conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close) log.Debugf("newSecureConn: %v to %v handshake success!", conn.LocalPeer(), conn.RemotePeer()) return conn, nil } -// close is called by ContextCloser -func (c *secureConn) close() error { +func (c *secureConn) Close() error { if err := c.secure.Close(); err != nil { c.insecure.Close() return err diff --git a/net/conn/secure_conn_test.go b/net/conn/secure_conn_test.go index 80a80c385..b73c0133c 100644 --- a/net/conn/secure_conn_test.go +++ b/net/conn/secure_conn_test.go @@ -15,93 +15,65 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) -func setupSecureConn(t *testing.T, c Conn) Conn { +func setupSecureConn(t *testing.T, ctx context.Context, c Conn) (Conn, error) { c, ok := c.(*secureConn) if ok { - return c + return c, nil } // shouldn't happen, because dial + listen already return secure conns. - s, err := newSecureConn(c.Context(), c, peer.NewPeerstore()) + s, err := newSecureConn(ctx, c, peer.NewPeerstore()) if err != nil { - t.Fatal(err) + return nil, err } - return s + return s, nil } func TestSecureClose(t *testing.T) { // t.Skip("Skipping in favor of another test") - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645") - c1 = setupSecureConn(t, c1) - c2 = setupSecureConn(t, c2) - - select { - case <-c1.Closed(): - t.Fatal("done before close") - case <-c2.Closed(): - t.Fatal("done before close") - default: + c1, err1 := setupSecureConn(t, ctx, c1) + c2, err2 := setupSecureConn(t, ctx, c2) + if err1 != nil { + t.Fatal(err1) } + if err2 != nil { + t.Fatal(err2) + } + + testOneSendRecv(t, c1, c2) + testOneSendRecv(t, c2, c1) c1.Close() - select { - case <-c1.Closed(): - default: - t.Fatal("not done after close") - } + testNotOneSendRecv(t, c1, c2) + testNotOneSendRecv(t, c2, c1) - c2.Close() - - select { - case <-c2.Closed(): - default: - t.Fatal("not done after close") - } - - cancel() // close the listener :P } -func TestSecureCancel(t *testing.T) { +func TestSecureCancelHandshake(t *testing.T) { // t.Skip("Skipping in favor of another test") - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/6634", "/ip4/127.0.0.1/tcp/6645") - c1 = setupSecureConn(t, c1) - c2 = setupSecureConn(t, c2) - - select { - case <-c1.Closed(): - t.Fatal("done before close") - case <-c2.Closed(): - t.Fatal("done before close") - default: - } - - c1.Close() - c2.Close() - cancel() // listener - - // wait to ensure other goroutines run and close things. - <-time.After(time.Microsecond * 10) - // test that cancel called Close. - - select { - case <-c1.Closed(): - default: - t.Fatal("not done after cancel") - } - - select { - case <-c2.Closed(): - default: - t.Fatal("not done after cancel") - } + done := make(chan struct{}) + go func() { + _, err1 := setupSecureConn(t, ctx, c1) + _, err2 := setupSecureConn(t, ctx, c2) + if err1 == nil { + t.Fatal(err1) + } + if err2 == nil { + t.Fatal(err2) + } + done <- struct{}{} + }() + <-done } func TestSecureCloseLeak(t *testing.T) { @@ -122,8 +94,14 @@ func TestSecureCloseLeak(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2) - c1 = setupSecureConn(t, c1) - c2 = setupSecureConn(t, c2) + c1, err1 := setupSecureConn(t, ctx, c1) + c2, err2 := setupSecureConn(t, ctx, c2) + if err1 != nil { + t.Fatal(err1) + } + if err2 != nil { + t.Fatal(err2) + } for i := 0; i < num; i++ { b1 := []byte("beep") diff --git a/util/multierr/multierr.go b/util/multierr/multierr.go index 53139882e..6893400e2 100644 --- a/util/multierr/multierr.go +++ b/util/multierr/multierr.go @@ -23,8 +23,5 @@ func (e *Error) Error() string { } func New(errs ...error) *Error { - if len(errs) == 0 { - return nil - } return &Error{errs} }