diff --git a/p2p/net2/README.md b/p2p/net2/README.md
new file mode 100644
index 000000000..a1cf6aacf
--- /dev/null
+++ b/p2p/net2/README.md
@@ -0,0 +1,17 @@
+# Network
+
+The IPFS Network package handles all of the peer-to-peer networking. It connects to other hosts, encrypts communications, and multiplexes messages between the network's client services and target hosts. It has multiple subcomponents:
+
+- `Conn` - a connection to a single Peer
+ - `MultiConn` - a set of connections to a single Peer
+ - `SecureConn` - an encrypted (tls-like) connection
+- `Swarm` - holds connections to Peers, multiplexes from/to each `MultiConn`
+- `Muxer` - multiplexes between `Services` and `Swarm`. Handles `Request/Reply`.
+ - `Service` - connects between an outside client service and Network.
+ - `Handler` - the client service part that handles requests
+
+It looks a bit like this:
+
+
+
+
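+As a rough usage sketch, the low-level `conn` package (below) exposes a
+`Listener` and a `Dialer`. The identifiers `ctx`, `laddr`, `localID`,
+`localKey`, `dialerID`, and `dialerKey` are placeholders, and error handling
+is elided:
+
+```go
+// listening side
+l, _ := conn.Listen(ctx, laddr, localID, localKey)
+go func() {
+    c, _ := l.Accept() // secured automatically when a private key is given
+    // ... handle the connection
+}()
+
+// dialing side
+d := &conn.Dialer{LocalPeer: dialerID, PrivateKey: dialerKey}
+c, _ := d.Dial(ctx, laddr, localID)
+c.WriteMsg([]byte("hello"))
+```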
diff --git a/p2p/net2/conn/conn.go b/p2p/net2/conn/conn.go
new file mode 100644
index 000000000..65b12f23e
--- /dev/null
+++ b/p2p/net2/conn/conn.go
@@ -0,0 +1,157 @@
+package conn
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
+ mpool "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+ manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ u "github.com/jbenet/go-ipfs/util"
+ eventlog "github.com/jbenet/go-ipfs/util/eventlog"
+)
+
+var log = eventlog.Logger("conn")
+
+// ReleaseBuffer puts the given byte slice back into the buffer pool,
+// keyed by its capacity.
+func ReleaseBuffer(b []byte) {
+ log.Debugf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
+ mpool.ByteSlicePool.Put(uint32(cap(b)), b)
+}
+
+// singleConn represents a single connection to another Peer (IPFS Node).
+type singleConn struct {
+ local peer.ID
+ remote peer.ID
+ maconn manet.Conn
+ msgrw msgio.ReadWriteCloser
+}
+
+// newSingleConn constructs a new connection over the given manet.Conn.
+func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn) (Conn, error) {
+
+ conn := &singleConn{
+ local: local,
+ remote: remote,
+ maconn: maconn,
+ msgrw: msgio.NewReadWriter(maconn),
+ }
+
+ log.Debugf("newSingleConn %p: %v to %v", conn, local, remote)
+ return conn, nil
+}
+
+// Close closes the connection, and its underlying msgio ReadWriteCloser.
+func (c *singleConn) Close() error {
+ log.Debugf("%s closing Conn with %s", c.local, c.remote)
+ // close underlying connection
+ return c.msgrw.Close()
+}
+
+// ID is an identifier unique to this connection.
+func (c *singleConn) ID() string {
+ return ID(c)
+}
+
+func (c *singleConn) String() string {
+ return String(c, "singleConn")
+}
+
+func (c *singleConn) LocalAddr() net.Addr {
+ return c.maconn.LocalAddr()
+}
+
+func (c *singleConn) RemoteAddr() net.Addr {
+ return c.maconn.RemoteAddr()
+}
+
+func (c *singleConn) LocalPrivateKey() ic.PrivKey {
+ return nil
+}
+
+func (c *singleConn) RemotePublicKey() ic.PubKey {
+ return nil
+}
+
+func (c *singleConn) SetDeadline(t time.Time) error {
+ return c.maconn.SetDeadline(t)
+}
+func (c *singleConn) SetReadDeadline(t time.Time) error {
+ return c.maconn.SetReadDeadline(t)
+}
+
+func (c *singleConn) SetWriteDeadline(t time.Time) error {
+ return c.maconn.SetWriteDeadline(t)
+}
+
+// LocalMultiaddr is the Multiaddr on this side
+func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
+ return c.maconn.LocalMultiaddr()
+}
+
+// RemoteMultiaddr is the Multiaddr on the remote side
+func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
+ return c.maconn.RemoteMultiaddr()
+}
+
+// LocalPeer is the Peer on this side
+func (c *singleConn) LocalPeer() peer.ID {
+ return c.local
+}
+
+// RemotePeer is the Peer on the remote side
+func (c *singleConn) RemotePeer() peer.ID {
+ return c.remote
+}
+
+// Read reads data, net.Conn style
+func (c *singleConn) Read(buf []byte) (int, error) {
+ return c.msgrw.Read(buf)
+}
+
+// Write writes data, net.Conn style
+func (c *singleConn) Write(buf []byte) (int, error) {
+ return c.msgrw.Write(buf)
+}
+
+func (c *singleConn) NextMsgLen() (int, error) {
+ return c.msgrw.NextMsgLen()
+}
+
+// ReadMsg reads the next length-prefixed message, msgio style
+func (c *singleConn) ReadMsg() ([]byte, error) {
+ return c.msgrw.ReadMsg()
+}
+
+// WriteMsg writes a length-prefixed message, msgio style
+func (c *singleConn) WriteMsg(buf []byte) error {
+ return c.msgrw.WriteMsg(buf)
+}
+
+// ReleaseMsg releases a buffer
+func (c *singleConn) ReleaseMsg(m []byte) {
+ c.msgrw.ReleaseMsg(m)
+}
+
+// ID returns the ID of a given Conn.
+func ID(c Conn) string {
+ l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().Pretty())
+ r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().Pretty())
+ lh := u.Hash([]byte(l))
+ rh := u.Hash([]byte(r))
+ ch := u.XOR(lh, rh)
+ return u.Key(ch).Pretty()
+}
+
+// String returns the user-friendly String representation of a conn
+func String(c Conn, typ string) string {
+ return fmt.Sprintf("%s (%s) <-- %s %p --> (%s) %s",
+ c.LocalPeer(), c.LocalMultiaddr(), typ, c, c.RemoteMultiaddr(), c.RemotePeer())
+}
diff --git a/p2p/net2/conn/conn_test.go b/p2p/net2/conn/conn_test.go
new file mode 100644
index 000000000..ccbbade6a
--- /dev/null
+++ b/p2p/net2/conn/conn_test.go
@@ -0,0 +1,122 @@
+package conn
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+func testOneSendRecv(t *testing.T, c1, c2 Conn) {
+ log.Debugf("testOneSendRecv from %s to %s", c1.LocalPeer(), c2.LocalPeer())
+ m1 := []byte("hello")
+ if err := c1.WriteMsg(m1); err != nil {
+ t.Fatal(err)
+ }
+ m2, err := c2.ReadMsg()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(m1, m2) {
+ t.Fatalf("failed to send: %s %s", m1, m2)
+ }
+}
+
+func testNotOneSendRecv(t *testing.T, c1, c2 Conn) {
+ m1 := []byte("hello")
+ if err := c1.WriteMsg(m1); err == nil {
+ t.Fatal("write should have failed", err)
+ }
+ _, err := c2.ReadMsg()
+ if err == nil {
+ t.Fatal("read should have failed", err)
+ }
+}
+
+func TestClose(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ c1, c2, _, _ := setupSingleConn(t, ctx)
+
+ testOneSendRecv(t, c1, c2)
+ testOneSendRecv(t, c2, c1)
+
+ c1.Close()
+ testNotOneSendRecv(t, c1, c2)
+
+ c2.Close()
+ testNotOneSendRecv(t, c2, c1)
+ testNotOneSendRecv(t, c1, c2)
+}
+
+func TestCloseLeak(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+ if testing.Short() {
+ t.SkipNow()
+ }
+
+ if os.Getenv("TRAVIS") == "true" {
+ t.Skip("this doesn't work well on travis")
+ }
+
+ var wg sync.WaitGroup
+
+ runPair := func(num int) {
+ ctx, cancel := context.WithCancel(context.Background())
+ c1, c2, _, _ := setupSingleConn(t, ctx)
+
+ for i := 0; i < num; i++ {
+ b1 := []byte(fmt.Sprintf("beep%d", i))
+ c1.WriteMsg(b1)
+ b2, err := c2.ReadMsg()
+ if err != nil {
+ panic(err)
+ }
+ if !bytes.Equal(b1, b2) {
+ panic(fmt.Errorf("bytes not equal: %s != %s", b1, b2))
+ }
+
+ b2 = []byte(fmt.Sprintf("boop%d", i))
+ c2.WriteMsg(b2)
+ b1, err = c1.ReadMsg()
+ if err != nil {
+ panic(err)
+ }
+ if !bytes.Equal(b1, b2) {
+ panic(fmt.Errorf("bytes not equal: %s != %s", b1, b2))
+ }
+
+ <-time.After(time.Microsecond * 5)
+ }
+
+ c1.Close()
+ c2.Close()
+ cancel() // close the listener
+ wg.Done()
+ }
+
+ var cons = 5
+ var msgs = 50
+ log.Debugf("Running %d connections * %d msgs.\n", cons, msgs)
+ for i := 0; i < cons; i++ {
+ wg.Add(1)
+ go runPair(msgs)
+ }
+
+ log.Debugf("Waiting...\n")
+ wg.Wait()
+ // done!
+
+ <-time.After(time.Millisecond * 150)
+ if runtime.NumGoroutine() > 20 {
+ // panic("uncomment me to debug")
+ t.Fatal("leaking goroutines:", runtime.NumGoroutine())
+ }
+}
diff --git a/p2p/net2/conn/dial.go b/p2p/net2/conn/dial.go
new file mode 100644
index 000000000..1294f2241
--- /dev/null
+++ b/p2p/net2/conn/dial.go
@@ -0,0 +1,131 @@
+package conn
+
+import (
+ "fmt"
+ "strings"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+ manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
+
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ debugerror "github.com/jbenet/go-ipfs/util/debugerror"
+)
+
+// String returns the string rep of d.
+func (d *Dialer) String() string {
+ return fmt.Sprintf("<Dialer %s %s>", d.LocalPeer, d.LocalAddrs[0])
+}
+
+// Dial connects to a peer over a particular address
+// Ensures raddr is part of peer.Addresses()
+// Example: d.DialAddr(ctx, peer.Addresses()[0], peer)
+func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) {
+
+ network, _, err := manet.DialArgs(raddr)
+ if err != nil {
+ return nil, err
+ }
+
+ if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
+ return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr)
+ }
+
+ var laddr ma.Multiaddr
+ if len(d.LocalAddrs) > 0 {
+ // laddr := MultiaddrNetMatch(raddr, d.LocalAddrs)
+ laddr = NetAddress(network, d.LocalAddrs)
+ if laddr == nil {
+ return nil, debugerror.Errorf("No local address for network %s", network)
+ }
+ }
+
+ // TODO: try to get reusing addr/ports to work.
+ // madialer := manet.Dialer{LocalAddr: laddr}
+ madialer := manet.Dialer{}
+
+ log.Debugf("%s dialing %s %s", d.LocalPeer, remote, raddr)
+ maconn, err := madialer.Dial(raddr)
+ if err != nil {
+ return nil, err
+ }
+
+ var connOut Conn
+ var errOut error
+ done := make(chan struct{})
+
+ // do it async to ensure we respect the context's done channel
+ go func() {
+ defer func() { done <- struct{}{} }()
+
+ c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
+ if err != nil {
+ errOut = err
+ return
+ }
+
+ if d.PrivateKey == nil {
+ log.Warningf("dialer %s dialing INSECURELY %s at %s!", d, remote, raddr)
+ connOut = c
+ return
+ }
+ c2, err := newSecureConn(ctx, d.PrivateKey, c)
+ if err != nil {
+ errOut = err
+ c.Close()
+ return
+ }
+
+ connOut = c2
+ }()
+
+ select {
+ case <-ctx.Done():
+ maconn.Close()
+ return nil, ctx.Err()
+ case <-done:
+ // whew, finished.
+ }
+
+ return connOut, errOut
+}
+
+// MultiaddrProtocolsMatch returns whether two multiaddrs match in protocol stacks.
+func MultiaddrProtocolsMatch(a, b ma.Multiaddr) bool {
+ ap := a.Protocols()
+ bp := b.Protocols()
+
+ if len(ap) != len(bp) {
+ return false
+ }
+
+ for i, api := range ap {
+ if api != bp[i] {
+ return false
+ }
+ }
+
+ return true
+}
+
+// MultiaddrNetMatch returns the first Multiaddr found to match network.
+func MultiaddrNetMatch(tgt ma.Multiaddr, srcs []ma.Multiaddr) ma.Multiaddr {
+ for _, a := range srcs {
+ if MultiaddrProtocolsMatch(tgt, a) {
+ return a
+ }
+ }
+ return nil
+}
+
+// NetAddress returns the first Multiaddr found for a given network.
+func NetAddress(n string, addrs []ma.Multiaddr) ma.Multiaddr {
+ for _, a := range addrs {
+ for _, p := range a.Protocols() {
+ if p.Name == n {
+ return a
+ }
+ }
+ }
+ return nil
+}
diff --git a/p2p/net2/conn/dial_test.go b/p2p/net2/conn/dial_test.go
new file mode 100644
index 000000000..bf24ab09a
--- /dev/null
+++ b/p2p/net2/conn/dial_test.go
@@ -0,0 +1,165 @@
+package conn
+
+import (
+ "io"
+ "net"
+ "testing"
+ "time"
+
+ tu "github.com/jbenet/go-ipfs/util/testutil"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+func echoListen(ctx context.Context, listener Listener) {
+ for {
+ c, err := listener.Accept()
+ if err != nil {
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+
+ if ne, ok := err.(net.Error); ok && ne.Temporary() {
+ <-time.After(time.Microsecond * 10)
+ continue
+ }
+
+ log.Debugf("echoListen: listener appears to be closing")
+ return
+ }
+
+ go echo(c.(Conn))
+ }
+}
+
+func echo(c Conn) {
+ io.Copy(c, c)
+}
+
+func setupSecureConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.PeerNetParams) {
+ return setupConn(t, ctx, true)
+}
+
+func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.PeerNetParams) {
+ return setupConn(t, ctx, false)
+}
+
+func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 tu.PeerNetParams) {
+
+ p1 = tu.RandPeerNetParamsOrFatal(t)
+ p2 = tu.RandPeerNetParamsOrFatal(t)
+ laddr := p1.Addr
+
+ key1 := p1.PrivKey
+ key2 := p2.PrivKey
+ if !secure {
+ key1 = nil
+ key2 = nil
+ }
+ l1, err := Listen(ctx, laddr, p1.ID, key1)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ d2 := &Dialer{
+ LocalPeer: p2.ID,
+ PrivateKey: key2,
+ }
+
+ var c2 Conn
+
+ done := make(chan error)
+ go func() {
+ var err error
+ c2, err = d2.Dial(ctx, p1.Addr, p1.ID)
+ if err != nil {
+ done <- err
+ }
+ close(done)
+ }()
+
+ c1, err := l1.Accept()
+ if err != nil {
+ t.Fatal("failed to accept", err)
+ }
+ if err := <-done; err != nil {
+ t.Fatal(err)
+ }
+
+ return c1.(Conn), c2, p1, p2
+}
+
+func testDialer(t *testing.T, secure bool) {
+ // t.Skip("Skipping in favor of another test")
+
+ p1 := tu.RandPeerNetParamsOrFatal(t)
+ p2 := tu.RandPeerNetParamsOrFatal(t)
+
+ key1 := p1.PrivKey
+ key2 := p2.PrivKey
+ if !secure {
+ key1 = nil
+ key2 = nil
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ d2 := &Dialer{
+ LocalPeer: p2.ID,
+ PrivateKey: key2,
+ }
+
+ go echoListen(ctx, l1)
+
+ c, err := d2.Dial(ctx, p1.Addr, p1.ID)
+ if err != nil {
+ t.Fatal("error dialing peer", err)
+ }
+
+ // fmt.Println("sending")
+ c.WriteMsg([]byte("beep"))
+ c.WriteMsg([]byte("boop"))
+
+ out, err := c.ReadMsg()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // fmt.Println("recving", string(out))
+ data := string(out)
+ if data != "beep" {
+ t.Error("unexpected conn output", data)
+ }
+
+ out, err = c.ReadMsg()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ data = string(out)
+ if string(out) != "boop" {
+ t.Error("unexpected conn output", data)
+ }
+
+ // fmt.Println("closing")
+ c.Close()
+ l1.Close()
+ cancel()
+}
+
+func TestDialerInsecure(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+ testDialer(t, false)
+}
+
+func TestDialerSecure(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+ testDialer(t, true)
+}
diff --git a/p2p/net2/conn/interface.go b/p2p/net2/conn/interface.go
new file mode 100644
index 000000000..1601da1aa
--- /dev/null
+++ b/p2p/net2/conn/interface.go
@@ -0,0 +1,84 @@
+package conn
+
+import (
+ "io"
+ "net"
+ "time"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ u "github.com/jbenet/go-ipfs/util"
+
+ msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+// Map maps Keys (Peer.IDs) to Connections.
+type Map map[u.Key]Conn
+
+type PeerConn interface {
+ // LocalPeer (this side) ID, PrivateKey, and Address
+ LocalPeer() peer.ID
+ LocalPrivateKey() ic.PrivKey
+ LocalMultiaddr() ma.Multiaddr
+
+ // RemotePeer ID, PublicKey, and Address
+ RemotePeer() peer.ID
+ RemotePublicKey() ic.PubKey
+ RemoteMultiaddr() ma.Multiaddr
+}
+
+// Conn is a generic message-based Peer-to-Peer connection.
+type Conn interface {
+ PeerConn
+
+ // ID is an identifier unique to this connection.
+ ID() string
+
+ // can't just say "net.Conn" cause we have duplicate methods.
+ LocalAddr() net.Addr
+ RemoteAddr() net.Addr
+ SetDeadline(t time.Time) error
+ SetReadDeadline(t time.Time) error
+ SetWriteDeadline(t time.Time) error
+
+ msgio.Reader
+ msgio.Writer
+ io.Closer
+}
+
+// Dialer is an object that can open connections. We could have a "convenience"
+// Dial function as before, but it would have many arguments, as dialing is
+// no longer simple (need a peerstore, a local peer, a context, a network, etc)
+type Dialer struct {
+
+ // LocalPeer is the identity of the local Peer.
+ LocalPeer peer.ID
+
+ // LocalAddrs is a set of local addresses to use.
+ LocalAddrs []ma.Multiaddr
+
+ // PrivateKey used to initialize a secure connection.
+ // Warning: if PrivateKey is nil, connection will not be secured.
+ PrivateKey ic.PrivKey
+}
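+
+// A minimal dialing sketch (hypothetical: ctx, localID, sk, raddr, and
+// remoteID are placeholder values):
+//
+//    d := &Dialer{LocalPeer: localID, PrivateKey: sk}
+//    c, err := d.Dial(ctx, raddr, remoteID)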
+
+// Listener is an object that can accept connections. It matches net.Listener
+type Listener interface {
+
+ // Accept waits for and returns the next connection to the listener.
+ Accept() (net.Conn, error)
+
+ // Addr is the local address
+ Addr() net.Addr
+
+ // Multiaddr is the local multiaddr address
+ Multiaddr() ma.Multiaddr
+
+ // LocalPeer is the identity of the local Peer.
+ LocalPeer() peer.ID
+
+ // Close closes the listener.
+ // Any blocked Accept operations will be unblocked and return errors.
+ Close() error
+}
diff --git a/p2p/net2/conn/listen.go b/p2p/net2/conn/listen.go
new file mode 100644
index 000000000..dd6af24ba
--- /dev/null
+++ b/p2p/net2/conn/listen.go
@@ -0,0 +1,115 @@
+package conn
+
+import (
+ "fmt"
+ "net"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+ manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+)
+
+// listener is an object that can accept connections. It implements Listener
+type listener struct {
+ manet.Listener
+
+ maddr ma.Multiaddr // Local multiaddr to listen on
+ local peer.ID // LocalPeer is the identity of the local Peer
+ privk ic.PrivKey // private key to use to initialize secure conns
+
+ cg ctxgroup.ContextGroup
+}
+
+func (l *listener) teardown() error {
+ defer log.Debugf("listener closed: %s %s", l.local, l.maddr)
+ return l.Listener.Close()
+}
+
+func (l *listener) Close() error {
+ log.Debugf("listener closing: %s %s", l.local, l.maddr)
+ return l.cg.Close()
+}
+
+func (l *listener) String() string {
+ return fmt.Sprintf("<Listener %s %s>", l.local, l.maddr)
+}
+
+// Accept waits for and returns the next connection to the listener.
+// Note that it returns a net.Conn; callers that need a conn.Conn must type-assert.
+func (l *listener) Accept() (net.Conn, error) {
+
+ // listeners don't have contexts. given the changes, contexts don't make sense here anymore.
+ // note that the parent of listener will Close, which will interrupt all io.
+ // Contexts and io don't mix.
+ ctx := context.Background()
+
+ maconn, err := l.Listener.Accept()
+ if err != nil {
+ return nil, err
+ }
+
+ c, err := newSingleConn(ctx, l.local, "", maconn)
+ if err != nil {
+ return nil, fmt.Errorf("Error accepting connection: %v", err)
+ }
+
+ if l.privk == nil {
+ log.Warningf("listener %s listening INSECURELY!", l)
+ return c, nil
+ }
+ sc, err := newSecureConn(ctx, l.privk, c)
+ if err != nil {
+ return nil, fmt.Errorf("Error securing connection: %v", err)
+ }
+ return sc, nil
+}
+
+func (l *listener) Addr() net.Addr {
+ return l.Listener.Addr()
+}
+
+// Multiaddr is the local multiaddr the listener is listening on.
+func (l *listener) Multiaddr() ma.Multiaddr {
+ return l.maddr
+}
+
+// LocalPeer is the identity of the local Peer.
+func (l *listener) LocalPeer() peer.ID {
+ return l.local
+}
+
+func (l *listener) Loggable() map[string]interface{} {
+ return map[string]interface{}{
+ "listener": map[string]interface{}{
+ "peer": l.LocalPeer(),
+ "address": l.Multiaddr(),
+ "secure": (l.privk != nil),
+ },
+ }
+}
+
+// Listen listens on the particular multiaddr, as the given local peer, securing inbound connections with the given private key (if any).
+func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
+
+ ml, err := manet.Listen(addr)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to listen on %s: %s", addr, err)
+ }
+
+ l := &listener{
+ Listener: ml,
+ maddr: addr,
+ local: local,
+ privk: sk,
+ cg: ctxgroup.WithContext(ctx),
+ }
+ l.cg.SetTeardown(l.teardown)
+
+ log.Infof("swarm listening on %s", l.Multiaddr())
+ log.Event(ctx, "swarmListen", l)
+ return l, nil
+}
diff --git a/p2p/net2/conn/secure_conn.go b/p2p/net2/conn/secure_conn.go
new file mode 100644
index 000000000..6d8cca6d5
--- /dev/null
+++ b/p2p/net2/conn/secure_conn.go
@@ -0,0 +1,154 @@
+package conn
+
+import (
+ "net"
+ "time"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ secio "github.com/jbenet/go-ipfs/p2p/crypto/secio"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ errors "github.com/jbenet/go-ipfs/util/debugerror"
+)
+
+// secureConn wraps another Conn object with an encrypted channel.
+type secureConn struct {
+
+ // the wrapped conn
+ insecure Conn
+
+ // secure io (wrapping insecure)
+ secure msgio.ReadWriteCloser
+
+ // secure Session
+ session secio.Session
+}
+
+// newSecureConn constructs a new secure connection, wrapping an insecure one.
+func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, error) {
+
+ if insecure == nil {
+ return nil, errors.New("insecure is nil")
+ }
+ if insecure.LocalPeer() == "" {
+ return nil, errors.New("insecure.LocalPeer() is nil")
+ }
+ if sk == nil {
+ return nil, errors.New("private key is nil")
+ }
+
+ // NewSession performs the secure handshake, which takes multiple RTT
+ sessgen := secio.SessionGenerator{LocalID: insecure.LocalPeer(), PrivateKey: sk}
+ session, err := sessgen.NewSession(ctx, insecure)
+ if err != nil {
+ return nil, err
+ }
+
+ conn := &secureConn{
+ insecure: insecure,
+ session: session,
+ secure: session.ReadWriter(),
+ }
+ log.Debugf("newSecureConn: %v to %v handshake success!", conn.LocalPeer(), conn.RemotePeer())
+ return conn, nil
+}
+
+func (c *secureConn) Close() error {
+ if err := c.secure.Close(); err != nil {
+ c.insecure.Close()
+ return err
+ }
+ return c.insecure.Close()
+}
+
+// ID is an identifier unique to this connection.
+func (c *secureConn) ID() string {
+ return ID(c)
+}
+
+func (c *secureConn) String() string {
+ return String(c, "secureConn")
+}
+
+func (c *secureConn) LocalAddr() net.Addr {
+ return c.insecure.LocalAddr()
+}
+
+func (c *secureConn) RemoteAddr() net.Addr {
+ return c.insecure.RemoteAddr()
+}
+
+func (c *secureConn) SetDeadline(t time.Time) error {
+ return c.insecure.SetDeadline(t)
+}
+
+func (c *secureConn) SetReadDeadline(t time.Time) error {
+ return c.insecure.SetReadDeadline(t)
+}
+
+func (c *secureConn) SetWriteDeadline(t time.Time) error {
+ return c.insecure.SetWriteDeadline(t)
+}
+
+// LocalMultiaddr is the Multiaddr on this side
+func (c *secureConn) LocalMultiaddr() ma.Multiaddr {
+ return c.insecure.LocalMultiaddr()
+}
+
+// RemoteMultiaddr is the Multiaddr on the remote side
+func (c *secureConn) RemoteMultiaddr() ma.Multiaddr {
+ return c.insecure.RemoteMultiaddr()
+}
+
+// LocalPeer is the Peer on this side
+func (c *secureConn) LocalPeer() peer.ID {
+ return c.session.LocalPeer()
+}
+
+// RemotePeer is the Peer on the remote side
+func (c *secureConn) RemotePeer() peer.ID {
+ return c.session.RemotePeer()
+}
+
+// LocalPrivateKey is the private key of the peer on this side
+func (c *secureConn) LocalPrivateKey() ic.PrivKey {
+ return c.session.LocalPrivateKey()
+}
+
+// RemotePublicKey is the public key of the peer on the remote side
+func (c *secureConn) RemotePublicKey() ic.PubKey {
+ return c.session.RemotePublicKey()
+}
+
+// Read reads data, net.Conn style
+func (c *secureConn) Read(buf []byte) (int, error) {
+ return c.secure.Read(buf)
+}
+
+// Write writes data, net.Conn style
+func (c *secureConn) Write(buf []byte) (int, error) {
+ return c.secure.Write(buf)
+}
+
+func (c *secureConn) NextMsgLen() (int, error) {
+ return c.secure.NextMsgLen()
+}
+
+// ReadMsg reads the next length-prefixed message, msgio style
+func (c *secureConn) ReadMsg() ([]byte, error) {
+ return c.secure.ReadMsg()
+}
+
+// WriteMsg writes a length-prefixed message, msgio style
+func (c *secureConn) WriteMsg(buf []byte) error {
+ return c.secure.WriteMsg(buf)
+}
+
+// ReleaseMsg releases a buffer
+func (c *secureConn) ReleaseMsg(m []byte) {
+ c.secure.ReleaseMsg(m)
+}
diff --git a/p2p/net2/conn/secure_conn_test.go b/p2p/net2/conn/secure_conn_test.go
new file mode 100644
index 000000000..7e364d12b
--- /dev/null
+++ b/p2p/net2/conn/secure_conn_test.go
@@ -0,0 +1,199 @@
+package conn
+
+import (
+ "bytes"
+ "os"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+func upgradeToSecureConn(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn) (Conn, error) {
+ if c, ok := c.(*secureConn); ok {
+ return c, nil
+ }
+
+ // shouldn't happen, because dial + listen already return secure conns.
+ s, err := newSecureConn(ctx, sk, c)
+ if err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func secureHandshake(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn, done chan error) {
+ _, err := upgradeToSecureConn(t, ctx, sk, c)
+ done <- err
+}
+
+func TestSecureSimple(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+
+ numMsgs := 100
+ if testing.Short() {
+ numMsgs = 10
+ }
+
+ ctx := context.Background()
+ c1, c2, p1, p2 := setupSingleConn(t, ctx)
+
+ done := make(chan error)
+ go secureHandshake(t, ctx, p1.PrivKey, c1, done)
+ go secureHandshake(t, ctx, p2.PrivKey, c2, done)
+
+ for i := 0; i < 2; i++ {
+ if err := <-done; err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < numMsgs; i++ {
+ testOneSendRecv(t, c1, c2)
+ testOneSendRecv(t, c2, c1)
+ }
+
+ c1.Close()
+ c2.Close()
+}
+
+func TestSecureClose(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+
+ ctx := context.Background()
+ c1, c2, p1, p2 := setupSingleConn(t, ctx)
+
+ done := make(chan error)
+ go secureHandshake(t, ctx, p1.PrivKey, c1, done)
+ go secureHandshake(t, ctx, p2.PrivKey, c2, done)
+
+ for i := 0; i < 2; i++ {
+ if err := <-done; err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ testOneSendRecv(t, c1, c2)
+
+ c1.Close()
+ testNotOneSendRecv(t, c1, c2)
+
+ c2.Close()
+ testNotOneSendRecv(t, c1, c2)
+ testNotOneSendRecv(t, c2, c1)
+
+}
+
+func TestSecureCancelHandshake(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+
+ ctx, cancel := context.WithCancel(context.Background())
+ c1, c2, p1, p2 := setupSingleConn(t, ctx)
+
+ done := make(chan error)
+ go secureHandshake(t, ctx, p1.PrivKey, c1, done)
+ <-time.After(time.Millisecond)
+ cancel() // cancel ctx
+ go secureHandshake(t, ctx, p2.PrivKey, c2, done)
+
+ for i := 0; i < 2; i++ {
+ if err := <-done; err == nil {
+ t.Error("cancel should've errored out")
+ }
+ }
+}
+
+func TestSecureHandshakeFailsWithWrongKeys(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ c1, c2, p1, p2 := setupSingleConn(t, ctx)
+
+ done := make(chan error)
+ go secureHandshake(t, ctx, p2.PrivKey, c1, done)
+ go secureHandshake(t, ctx, p1.PrivKey, c2, done)
+
+ for i := 0; i < 2; i++ {
+ if err := <-done; err == nil {
+ t.Fatal("wrong keys should've errored out.")
+ }
+ }
+}
+
+func TestSecureCloseLeak(t *testing.T) {
+ // t.Skip("Skipping in favor of another test")
+
+ if testing.Short() {
+ t.SkipNow()
+ }
+ if os.Getenv("TRAVIS") == "true" {
+ t.Skip("this doesn't work well on travis")
+ }
+
+ runPair := func(c1, c2 Conn, num int) {
+ log.Debugf("runPair %d", num)
+
+ for i := 0; i < num; i++ {
+ log.Debugf("runPair iteration %d", i)
+ b1 := []byte("beep")
+ c1.WriteMsg(b1)
+ b2, err := c2.ReadMsg()
+ if err != nil {
+ panic(err)
+ }
+ if !bytes.Equal(b1, b2) {
+ panic("bytes not equal")
+ }
+
+ b2 = []byte("beep")
+ c2.WriteMsg(b2)
+ b1, err = c1.ReadMsg()
+ if err != nil {
+ panic(err)
+ }
+ if !bytes.Equal(b1, b2) {
+ panic("bytes not equal")
+ }
+
+ <-time.After(time.Microsecond * 5)
+ }
+ }
+
+ var cons = 5
+ var msgs = 50
+ log.Debugf("Running %d connections * %d msgs.\n", cons, msgs)
+
+ var wg sync.WaitGroup
+ for i := 0; i < cons; i++ {
+ wg.Add(1)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ c1, c2, _, _ := setupSecureConn(t, ctx)
+ go func(c1, c2 Conn) {
+
+ defer func() {
+ c1.Close()
+ c2.Close()
+ cancel()
+ wg.Done()
+ }()
+
+ runPair(c1, c2, msgs)
+ }(c1, c2)
+ }
+
+ log.Debugf("Waiting...\n")
+ wg.Wait()
+ // done!
+
+ <-time.After(time.Millisecond * 150)
+ if runtime.NumGoroutine() > 20 {
+ // panic("uncomment me to debug")
+ t.Fatal("leaking goroutines:", runtime.NumGoroutine())
+ }
+}
diff --git a/p2p/net2/interface.go b/p2p/net2/interface.go
new file mode 100644
index 000000000..b7f557ec0
--- /dev/null
+++ b/p2p/net2/interface.go
@@ -0,0 +1,133 @@
+package net
+
+import (
+ "io"
+
+ conn "github.com/jbenet/go-ipfs/p2p/net2/conn"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+// MessageSizeMax is a soft (recommended) maximum for network messages.
+// One can write more, as the interface is a stream. But it is useful
+// to break it up into multiple read/writes when the whole message is
+// a single, large serialized object.
+const MessageSizeMax = 1 << 22 // 4MB
+
+// Stream represents a bidirectional channel between two agents in
+// the IPFS network. "agent" is as granular as desired, potentially
+// being a "request -> reply" pair, or whole protocols.
+// Streams are backed by SPDY streams underneath the hood.
+type Stream interface {
+ io.Reader
+ io.Writer
+ io.Closer
+
+ // Conn returns the connection this stream is part of.
+ Conn() Conn
+}
+
+// StreamHandler is the type of function used to listen for
+// streams opened by the remote side.
+type StreamHandler func(Stream)
+
+// Conn is a connection to a remote peer. It multiplexes streams.
+// Usually there is no need to use a Conn directly, but it may
+// be useful to get information about the peer on the other side:
+// stream.Conn().RemotePeer()
+type Conn interface {
+ conn.PeerConn
+
+ // NewStream constructs a new Stream over this conn.
+ NewStream() (Stream, error)
+}
+
+// ConnHandler is the type of function used to listen for
+// connections opened by the remote side.
+type ConnHandler func(Conn)
+
+// Network is the interface used to connect to the outside world.
+// It dials and listens for connections. It uses a Swarm to pool
+// connections (see swarm pkg, and peerstream.Swarm). Connections
+// are encrypted with a TLS-like protocol.
+type Network interface {
+ Dialer
+ io.Closer
+
+ // SetStreamHandler sets the handler for new streams opened by the
+ // remote side. This operation is threadsafe.
+ SetStreamHandler(StreamHandler)
+
+ // SetConnHandler sets the handler for new connections opened by the
+ // remote side. This operation is threadsafe.
+ SetConnHandler(ConnHandler)
+
+ // NewStream returns a new stream to given peer p.
+ // If there is no connection to p, attempts to create one.
+ NewStream(peer.ID) (Stream, error)
+
+ // ListenAddresses returns a list of addresses at which this network listens.
+ ListenAddresses() []ma.Multiaddr
+
+ // InterfaceListenAddresses returns a list of addresses at which this network
+ // listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
+ // use the known local interfaces.
+ InterfaceListenAddresses() ([]ma.Multiaddr, error)
+
+ // CtxGroup returns the network's contextGroup
+ CtxGroup() ctxgroup.ContextGroup
+}
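+
+// A rough usage sketch (hypothetical: "nw" is some Network implementation,
+// "p" is a peer.ID, and errors are ignored for brevity):
+//
+//    nw.SetStreamHandler(func(s Stream) {
+//        defer s.Close()
+//        io.Copy(s, s) // echo whatever the remote side sends
+//    })
+//
+//    s, _ := nw.NewStream(p) // dials p if there is no connection yet
+//    s.Write([]byte("hello"))
+//    s.Close()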
+
+// Dialer represents a service that can dial out to peers
+// (this is usually just a Network, but other services may not need the whole
+// stack, and thus it becomes easier to mock)
+type Dialer interface {
+
+ // Peerstore returns the internal peerstore
+ // This is useful to tell the dialer about a new address for a peer.
+ // Or use one of the public keys found out over the network.
+ Peerstore() peer.Peerstore
+
+ // LocalPeer returns the local peer associated with this network
+ LocalPeer() peer.ID
+
+ // DialPeer establishes a connection to a given peer
+ DialPeer(context.Context, peer.ID) (Conn, error)
+
+ // ClosePeer closes the connection to a given peer
+ ClosePeer(peer.ID) error
+
+ // Connectedness returns a state signaling connection capabilities
+ Connectedness(peer.ID) Connectedness
+
+ // Peers returns the peers connected
+ Peers() []peer.ID
+
+ // Conns returns the connections in this Network
+ Conns() []Conn
+
+ // ConnsToPeer returns the connections in this Network for given peer.
+ ConnsToPeer(p peer.ID) []Conn
+}
+
+// Connectedness signals the capacity for a connection with a given node.
+// It is used to signal to services and other peers whether a node is reachable.
+type Connectedness int
+
+const (
+ // NotConnected means no connection to peer, and no extra information (default)
+ NotConnected Connectedness = iota
+
+ // Connected means has an open, live connection to peer
+ Connected
+
+ // CanConnect means recently connected to peer, terminated gracefully
+ CanConnect
+
+ // CannotConnect means recently attempted connecting but failed to connect.
+ // (should signal "made effort, failed")
+ CannotConnect
+)
diff --git a/p2p/net2/mock/interface.go b/p2p/net2/mock/interface.go
new file mode 100644
index 000000000..64aa4b14f
--- /dev/null
+++ b/p2p/net2/mock/interface.go
@@ -0,0 +1,98 @@
+// Package mocknet provides a mock net.Network to test with.
+//
+// - a Mocknet has many inet.Networks
+// - a Mocknet has many Links
+// - a Link joins two inet.Networks
+// - inet.Conns and inet.Streams are created by inet.Networks
+package mocknet
+
+import (
+ "io"
+ "time"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+type Mocknet interface {
+
+ // GenPeer generates a peer and its inet.Network in the Mocknet
+ GenPeer() (inet.Network, error)
+
+ // AddPeer adds an existing peer. we need both a privkey and addr.
+ // ID is derived from PrivKey
+ AddPeer(ic.PrivKey, ma.Multiaddr) (inet.Network, error)
+
+ // retrieve things (with randomized iteration order)
+ Peers() []peer.ID
+ Net(peer.ID) inet.Network
+ Nets() []inet.Network
+ Links() LinkMap
+ LinksBetweenPeers(a, b peer.ID) []Link
+ LinksBetweenNets(a, b inet.Network) []Link
+
+ // Links are the **ability to connect**.
+ // think of Links as the physical medium.
+ // For p1 and p2 to connect, a link must exist between them.
+ // (this makes it possible to test dial failures, and
+ // things like relaying traffic)
+ LinkPeers(peer.ID, peer.ID) (Link, error)
+ LinkNets(inet.Network, inet.Network) (Link, error)
+ Unlink(Link) error
+ UnlinkPeers(peer.ID, peer.ID) error
+ UnlinkNets(inet.Network, inet.Network) error
+
+ // LinkDefaults are the default options that govern links
+ // if they do not have their own option set.
+ SetLinkDefaults(LinkOptions)
+ LinkDefaults() LinkOptions
+
+ // Connections are the usual. Connecting means Dialing.
+ // **to succeed, peers must be linked beforehand**
+ ConnectPeers(peer.ID, peer.ID) (inet.Conn, error)
+ ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
+ DisconnectPeers(peer.ID, peer.ID) error
+ DisconnectNets(inet.Network, inet.Network) error
+}
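+
+// A rough usage sketch (hypothetical: ctx is some context.Context, and errors
+// are ignored for brevity). Note that peers must be linked before they can
+// connect:
+//
+//    mn := New(ctx)
+//    n1, _ := mn.GenPeer()
+//    n2, _ := mn.GenPeer()
+//    mn.LinkPeers(n1.LocalPeer(), n2.LocalPeer()) // make a connection possible
+//    c, _ := mn.ConnectPeers(n1.LocalPeer(), n2.LocalPeer())
+//    s, _ := c.NewStream()
+//    s.Write([]byte("beep"))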
+
+// LinkOptions are used to change aspects of the links.
+// Sorry, but they don't work yet :(
+type LinkOptions struct {
+ Latency time.Duration
+ Bandwidth int // in bytes-per-second
+ // we can make these values distributions down the road.
+}
+
+// Link represents the **possibility** of a connection between
+// two peers. Think of it like physical network links. Without
+// them, the peers can try and try but they won't be able to
+// connect. This allows constructing topologies where specific
+// nodes cannot talk to each other directly. :)
+type Link interface {
+ Networks() []inet.Network
+ Peers() []peer.ID
+
+ SetOptions(LinkOptions)
+ Options() LinkOptions
+
+ // Metrics() Metrics
+}
+
+// LinkMap is a 3D map to give us an easy way to track links.
+// (wow, much map. so data structure. how compose. ahhh pointer)
+type LinkMap map[string]map[string]map[Link]struct{}
+
+// Printer lets you inspect things :)
+type Printer interface {
+ // MocknetLinks shows the entire Mocknet's link table :)
+ MocknetLinks(mn Mocknet)
+ NetworkConns(ni inet.Network)
+}
+
+// PrinterTo returns a Printer ready to write to w.
+func PrinterTo(w io.Writer) Printer {
+ return &printer{w}
+}
diff --git a/p2p/net2/mock/mock.go b/p2p/net2/mock/mock.go
new file mode 100644
index 000000000..5403358cb
--- /dev/null
+++ b/p2p/net2/mock/mock.go
@@ -0,0 +1,63 @@
+package mocknet
+
+import (
+ eventlog "github.com/jbenet/go-ipfs/util/eventlog"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+var log = eventlog.Logger("mocknet")
+
+// WithNPeers constructs a Mocknet with N peers.
+func WithNPeers(ctx context.Context, n int) (Mocknet, error) {
+ m := New(ctx)
+ for i := 0; i < n; i++ {
+ if _, err := m.GenPeer(); err != nil {
+ return nil, err
+ }
+ }
+ return m, nil
+}
+
+// FullMeshLinked constructs a Mocknet with full mesh of Links.
+// This means that all the peers **can** connect to each other
+// (not that they already are connected. you can use m.ConnectAll())
+func FullMeshLinked(ctx context.Context, n int) (Mocknet, error) {
+ m, err := WithNPeers(ctx, n)
+ if err != nil {
+ return nil, err
+ }
+
+ nets := m.Nets()
+ for _, n1 := range nets {
+ for _, n2 := range nets {
+ // yes, even self.
+ if _, err := m.LinkNets(n1, n2); err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ return m, nil
+}
+
+// FullMeshConnected constructs a Mocknet with full mesh of Connections.
+// This means that all the peers have dialed and are ready to talk to
+// each other.
+func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) {
+ m, err := FullMeshLinked(ctx, n)
+ if err != nil {
+ return nil, err
+ }
+
+ nets := m.Nets()
+ for _, n1 := range nets {
+ for _, n2 := range nets {
+ if _, err := m.ConnectNets(n1, n2); err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ return m, nil
+}
diff --git a/p2p/net2/mock/mock_conn.go b/p2p/net2/mock/mock_conn.go
new file mode 100644
index 000000000..f1535ff91
--- /dev/null
+++ b/p2p/net2/mock/mock_conn.go
@@ -0,0 +1,120 @@
+package mocknet
+
+import (
+ "container/list"
+ "sync"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+// conn represents one side's perspective of a
+// live connection between two peers.
+// it goes over a particular link.
+type conn struct {
+ local peer.ID
+ remote peer.ID
+
+ localAddr ma.Multiaddr
+ remoteAddr ma.Multiaddr
+
+ localPrivKey ic.PrivKey
+ remotePubKey ic.PubKey
+
+ net *peernet
+ link *link
+ rconn *conn // counterpart
+ streams list.List
+
+ sync.RWMutex
+}
+
+func (c *conn) Close() error {
+ for _, s := range c.allStreams() {
+ s.Close()
+ }
+ c.net.removeConn(c)
+ return nil
+}
+
+func (c *conn) addStream(s *stream) {
+ c.Lock()
+ s.conn = c
+ c.streams.PushBack(s)
+ c.Unlock()
+}
+
+func (c *conn) removeStream(s *stream) {
+ c.Lock()
+ defer c.Unlock()
+ for e := c.streams.Front(); e != nil; e = e.Next() {
+ if s == e.Value {
+ c.streams.Remove(e)
+ return
+ }
+ }
+}
+
+func (c *conn) allStreams() []inet.Stream {
+ c.RLock()
+ defer c.RUnlock()
+
+ strs := make([]inet.Stream, 0, c.streams.Len())
+ for e := c.streams.Front(); e != nil; e = e.Next() {
+ s := e.Value.(*stream)
+ strs = append(strs, s)
+ }
+ return strs
+}
+
+func (c *conn) remoteOpenedStream(s *stream) {
+ c.addStream(s)
+ c.net.handleNewStream(s)
+}
+
+func (c *conn) openStream() *stream {
+ sl, sr := c.link.newStreamPair()
+ c.addStream(sl)
+ c.rconn.remoteOpenedStream(sr)
+ return sl
+}
+
+func (c *conn) NewStream() (inet.Stream, error) {
+ log.Debugf("Conn.NewStreamWithProtocol: %s --> %s", c.local, c.remote)
+
+ s := c.openStream()
+ return s, nil
+}
+
+// LocalMultiaddr is the Multiaddr on this side
+func (c *conn) LocalMultiaddr() ma.Multiaddr {
+ return c.localAddr
+}
+
+// LocalPeer is the Peer on our side of the connection
+func (c *conn) LocalPeer() peer.ID {
+ return c.local
+}
+
+// LocalPrivateKey is the private key of the peer on our side.
+func (c *conn) LocalPrivateKey() ic.PrivKey {
+ return c.localPrivKey
+}
+
+// RemoteMultiaddr is the Multiaddr on the remote side
+func (c *conn) RemoteMultiaddr() ma.Multiaddr {
+ return c.remoteAddr
+}
+
+// RemotePeer is the Peer on the remote side
+func (c *conn) RemotePeer() peer.ID {
+ return c.remote
+}
+
+// RemotePublicKey is the private key of the peer on our side.
+func (c *conn) RemotePublicKey() ic.PubKey {
+ return c.remotePubKey
+}
diff --git a/p2p/net2/mock/mock_link.go b/p2p/net2/mock/mock_link.go
new file mode 100644
index 000000000..352c35601
--- /dev/null
+++ b/p2p/net2/mock/mock_link.go
@@ -0,0 +1,93 @@
+package mocknet
+
+import (
+ "io"
+ "sync"
+
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+)
+
+// link implements mocknet.Link
+// and, for simplicity, inet.Conn
+type link struct {
+ mock *mocknet
+ nets []*peernet
+ opts LinkOptions
+
+ // this could have addresses on both sides.
+
+ sync.RWMutex
+}
+
+func newLink(mn *mocknet, opts LinkOptions) *link {
+ return &link{mock: mn, opts: opts}
+}
+
+func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
+ l.RLock()
+ defer l.RUnlock()
+
+ mkconn := func(ln, rn *peernet) *conn {
+ c := &conn{net: ln, link: l}
+ c.local = ln.peer
+ c.remote = rn.peer
+
+ c.localAddr = ln.ps.Addresses(ln.peer)[0]
+ c.remoteAddr = rn.ps.Addresses(rn.peer)[0]
+
+ c.localPrivKey = ln.ps.PrivKey(ln.peer)
+ c.remotePubKey = rn.ps.PubKey(rn.peer)
+
+ return c
+ }
+
+ c1 := mkconn(l.nets[0], l.nets[1])
+ c2 := mkconn(l.nets[1], l.nets[0])
+ c1.rconn = c2
+ c2.rconn = c1
+
+ if dialer == c1.net {
+ return c1, c2
+ }
+ return c2, c1
+}
+
+func (l *link) newStreamPair() (*stream, *stream) {
+ r1, w1 := io.Pipe()
+ r2, w2 := io.Pipe()
+
+ s1 := &stream{Reader: r1, Writer: w2}
+ s2 := &stream{Reader: r2, Writer: w1}
+ return s1, s2
+}
+
+func (l *link) Networks() []inet.Network {
+ l.RLock()
+ defer l.RUnlock()
+
+ cp := make([]inet.Network, len(l.nets))
+ for i, n := range l.nets {
+ cp[i] = n
+ }
+ return cp
+}
+
+func (l *link) Peers() []peer.ID {
+ l.RLock()
+ defer l.RUnlock()
+
+ cp := make([]peer.ID, len(l.nets))
+ for i, n := range l.nets {
+ cp[i] = n.peer
+ }
+ return cp
+}
+
+func (l *link) SetOptions(o LinkOptions) {
+ l.opts = o
+}
+
+func (l *link) Options() LinkOptions {
+ return l.opts
+}
diff --git a/p2p/net2/mock/mock_net.go b/p2p/net2/mock/mock_net.go
new file mode 100644
index 000000000..9e6208ca3
--- /dev/null
+++ b/p2p/net2/mock/mock_net.go
@@ -0,0 +1,322 @@
+package mocknet
+
+import (
+ "fmt"
+ "sync"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ testutil "github.com/jbenet/go-ipfs/util/testutil"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+// mocknet implements mocknet.Mocknet
+type mocknet struct {
+ // must map on peer.ID because
+ // each inet.Network has a different peerstore
+ nets map[peer.ID]*peernet
+
+ // links make it possible to connect two peers.
+ // think of links as the physical medium.
+ // usually only one, but there could be multiple
+ // **links are shared between peers**
+ links map[peer.ID]map[peer.ID]map[*link]struct{}
+
+ linkDefaults LinkOptions
+
+ cg ctxgroup.ContextGroup // for Context closing
+ sync.RWMutex
+}
+
+func New(ctx context.Context) Mocknet {
+ return &mocknet{
+ nets: map[peer.ID]*peernet{},
+ links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
+ cg: ctxgroup.WithContext(ctx),
+ }
+}
+
+func (mn *mocknet) GenPeer() (inet.Network, error) {
+ sk, _, err := testutil.RandKeyPair(512)
+ if err != nil {
+ return nil, err
+ }
+
+ a := testutil.RandLocalTCPAddress()
+
+ n, err := mn.AddPeer(sk, a)
+ if err != nil {
+ return nil, err
+ }
+
+ return n, nil
+}
+
+func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (inet.Network, error) {
+ n, err := newPeernet(mn.cg.Context(), mn, k, a)
+ if err != nil {
+ return nil, err
+ }
+
+ // make sure to add listening address!
+ // this makes debugging things simpler, as forgetting to register
+ // an address may cause unexpected failures.
+ n.Peerstore().AddAddress(n.LocalPeer(), a)
+ log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a)
+
+ mn.cg.AddChildGroup(n.cg)
+
+ mn.Lock()
+ mn.nets[n.peer] = n
+ mn.Unlock()
+ return n, nil
+}
+
+func (mn *mocknet) Peers() []peer.ID {
+ mn.RLock()
+ defer mn.RUnlock()
+
+ cp := make([]peer.ID, 0, len(mn.nets))
+ for _, n := range mn.nets {
+ cp = append(cp, n.peer)
+ }
+ return cp
+}
+
+func (mn *mocknet) Net(pid peer.ID) inet.Network {
+ mn.RLock()
+ defer mn.RUnlock()
+
+ for _, n := range mn.nets {
+ if n.peer == pid {
+ return n
+ }
+ }
+ return nil
+}
+
+func (mn *mocknet) Nets() []inet.Network {
+ mn.RLock()
+ defer mn.RUnlock()
+
+ cp := make([]inet.Network, 0, len(mn.nets))
+ for _, n := range mn.nets {
+ cp = append(cp, n)
+ }
+ return cp
+}
+
+// Links returns a copy of the internal link state map.
+// (wow, much map. so data structure. how compose. ahhh pointer)
+func (mn *mocknet) Links() LinkMap {
+ mn.RLock()
+ defer mn.RUnlock()
+
+ links := map[string]map[string]map[Link]struct{}{}
+ for p1, lm := range mn.links {
+ sp1 := string(p1)
+ links[sp1] = map[string]map[Link]struct{}{}
+ for p2, ls := range lm {
+ sp2 := string(p2)
+ links[sp1][sp2] = map[Link]struct{}{}
+ for l := range ls {
+ links[sp1][sp2][l] = struct{}{}
+ }
+ }
+ }
+ return links
+}
+
+func (mn *mocknet) LinkAll() error {
+ nets := mn.Nets()
+ for _, n1 := range nets {
+ for _, n2 := range nets {
+ if _, err := mn.LinkNets(n1, n2); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (mn *mocknet) LinkPeers(p1, p2 peer.ID) (Link, error) {
+ mn.RLock()
+ n1 := mn.nets[p1]
+ n2 := mn.nets[p2]
+ mn.RUnlock()
+
+ if n1 == nil {
+ return nil, fmt.Errorf("network for p1 not in mocknet")
+ }
+
+ if n2 == nil {
+ return nil, fmt.Errorf("network for p2 not in mocknet")
+ }
+
+ return mn.LinkNets(n1, n2)
+}
+
+func (mn *mocknet) validate(n inet.Network) (*peernet, error) {
+ // WARNING: assumes locks acquired
+
+ nr, ok := n.(*peernet)
+ if !ok {
+ return nil, fmt.Errorf("Network not supported (use mock package nets only)")
+ }
+
+ if _, found := mn.nets[nr.peer]; !found {
+ return nil, fmt.Errorf("Network not on mocknet. is it from another mocknet?")
+ }
+
+ return nr, nil
+}
+
+func (mn *mocknet) LinkNets(n1, n2 inet.Network) (Link, error) {
+ mn.RLock()
+ n1r, err1 := mn.validate(n1)
+ n2r, err2 := mn.validate(n2)
+ ld := mn.linkDefaults
+ mn.RUnlock()
+
+ if err1 != nil {
+ return nil, err1
+ }
+ if err2 != nil {
+ return nil, err2
+ }
+
+ l := newLink(mn, ld)
+ l.nets = append(l.nets, n1r, n2r)
+ mn.addLink(l)
+ return l, nil
+}
+
+func (mn *mocknet) Unlink(l2 Link) error {
+
+ l, ok := l2.(*link)
+ if !ok {
+ return fmt.Errorf("only links from mocknet are supported")
+ }
+
+ mn.removeLink(l)
+ return nil
+}
+
+func (mn *mocknet) UnlinkPeers(p1, p2 peer.ID) error {
+ ls := mn.LinksBetweenPeers(p1, p2)
+ if ls == nil {
+ return fmt.Errorf("no link between p1 and p2")
+ }
+
+ for _, l := range ls {
+ if err := mn.Unlink(l); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (mn *mocknet) UnlinkNets(n1, n2 inet.Network) error {
+ return mn.UnlinkPeers(n1.LocalPeer(), n2.LocalPeer())
+}
+
+// get from the links map, lazily constructing entries as needed.
+func (mn *mocknet) linksMapGet(p1, p2 peer.ID) *map[*link]struct{} {
+
+ l1, found := mn.links[p1]
+ if !found {
+ mn.links[p1] = map[peer.ID]map[*link]struct{}{}
+ l1 = mn.links[p1] // so we make sure it's there.
+ }
+
+ l2, found := l1[p2]
+ if !found {
+ m := map[*link]struct{}{}
+ l1[p2] = m
+ l2 = l1[p2]
+ }
+
+ return &l2
+}
+
+func (mn *mocknet) addLink(l *link) {
+ mn.Lock()
+ defer mn.Unlock()
+
+ n1, n2 := l.nets[0], l.nets[1]
+ (*mn.linksMapGet(n1.peer, n2.peer))[l] = struct{}{}
+ (*mn.linksMapGet(n2.peer, n1.peer))[l] = struct{}{}
+}
+
+func (mn *mocknet) removeLink(l *link) {
+ mn.Lock()
+ defer mn.Unlock()
+
+ n1, n2 := l.nets[0], l.nets[1]
+ delete(*mn.linksMapGet(n1.peer, n2.peer), l)
+ delete(*mn.linksMapGet(n2.peer, n1.peer), l)
+}
+
+func (mn *mocknet) ConnectAll() error {
+ nets := mn.Nets()
+ for _, n1 := range nets {
+ for _, n2 := range nets {
+ if n1 == n2 {
+ continue
+ }
+
+ if _, err := mn.ConnectNets(n1, n2); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (mn *mocknet) ConnectPeers(a, b peer.ID) (inet.Conn, error) {
+ return mn.Net(a).DialPeer(mn.cg.Context(), b)
+}
+
+func (mn *mocknet) ConnectNets(a, b inet.Network) (inet.Conn, error) {
+ return a.DialPeer(mn.cg.Context(), b.LocalPeer())
+}
+
+func (mn *mocknet) DisconnectPeers(p1, p2 peer.ID) error {
+ return mn.Net(p1).ClosePeer(p2)
+}
+
+func (mn *mocknet) DisconnectNets(n1, n2 inet.Network) error {
+ return n1.ClosePeer(n2.LocalPeer())
+}
+
+func (mn *mocknet) LinksBetweenPeers(p1, p2 peer.ID) []Link {
+ mn.RLock()
+ defer mn.RUnlock()
+
+ ls2 := *mn.linksMapGet(p1, p2)
+ cp := make([]Link, 0, len(ls2))
+ for l := range ls2 {
+ cp = append(cp, l)
+ }
+ return cp
+}
+
+func (mn *mocknet) LinksBetweenNets(n1, n2 inet.Network) []Link {
+ return mn.LinksBetweenPeers(n1.LocalPeer(), n2.LocalPeer())
+}
+
+func (mn *mocknet) SetLinkDefaults(o LinkOptions) {
+ mn.Lock()
+ mn.linkDefaults = o
+ mn.Unlock()
+}
+
+func (mn *mocknet) LinkDefaults() LinkOptions {
+ mn.RLock()
+ defer mn.RUnlock()
+ return mn.linkDefaults
+}
diff --git a/p2p/net2/mock/mock_peernet.go b/p2p/net2/mock/mock_peernet.go
new file mode 100644
index 000000000..948b83ea0
--- /dev/null
+++ b/p2p/net2/mock/mock_peernet.go
@@ -0,0 +1,353 @@
+package mocknet
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+// peernet implements inet.Network
+type peernet struct {
+ mocknet *mocknet // parent
+
+ peer peer.ID
+ ps peer.Peerstore
+
+ // conns are actual live connections between peers.
+ // many conns could run over each link.
+ // **conns are NOT shared between peers**
+ connsByPeer map[peer.ID]map[*conn]struct{}
+ connsByLink map[*link]map[*conn]struct{}
+
+ // implement inet.Network
+ streamHandler inet.StreamHandler
+ connHandler inet.ConnHandler
+
+ cg ctxgroup.ContextGroup
+ sync.RWMutex
+}
+
+// newPeernet constructs a new peernet
+func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
+ a ma.Multiaddr) (*peernet, error) {
+
+ p, err := peer.IDFromPublicKey(k.GetPublic())
+ if err != nil {
+ return nil, err
+ }
+
+ // create our own peerstore entirely, so that peers' knowledge doesn't get shared
+ ps := peer.NewPeerstore()
+ ps.AddAddress(p, a)
+ ps.AddPrivKey(p, k)
+ ps.AddPubKey(p, k.GetPublic())
+
+ n := &peernet{
+ mocknet: m,
+ peer: p,
+ ps: ps,
+ cg: ctxgroup.WithContext(ctx),
+
+ connsByPeer: map[peer.ID]map[*conn]struct{}{},
+ connsByLink: map[*link]map[*conn]struct{}{},
+ }
+
+ n.cg.SetTeardown(n.teardown)
+ return n, nil
+}
+
+func (pn *peernet) teardown() error {
+
+ // close the connections
+ for _, c := range pn.allConns() {
+ c.Close()
+ }
+ return nil
+}
+
+// allConns returns all the connections between this peer and others
+func (pn *peernet) allConns() []*conn {
+ pn.RLock()
+ var cs []*conn
+ for _, csl := range pn.connsByPeer {
+ for c := range csl {
+ cs = append(cs, c)
+ }
+ }
+ pn.RUnlock()
+ return cs
+}
+
+// Close calls the ContextCloser func
+func (pn *peernet) Close() error {
+ return pn.cg.Close()
+}
+
+func (pn *peernet) Peerstore() peer.Peerstore {
+ return pn.ps
+}
+
+func (pn *peernet) String() string {
+ return fmt.Sprintf("<mock peernet %s - %d conns>", pn.peer, len(pn.allConns()))
+}
+
+// handleNewStream is an internal function to trigger the client's handler
+func (pn *peernet) handleNewStream(s inet.Stream) {
+ pn.RLock()
+ handler := pn.streamHandler
+ pn.RUnlock()
+ if handler != nil {
+ go handler(s)
+ }
+}
+
+// handleNewConn is an internal function to trigger the client's handler
+func (pn *peernet) handleNewConn(c inet.Conn) {
+ pn.RLock()
+ handler := pn.connHandler
+ pn.RUnlock()
+ if handler != nil {
+ go handler(c)
+ }
+}
+
+// DialPeer attempts to establish a connection to a given peer.
+// Respects the context.
+func (pn *peernet) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
+ return pn.connect(p)
+}
+
+func (pn *peernet) connect(p peer.ID) (*conn, error) {
+ // first, check if we already have live connections
+ pn.RLock()
+ cs, found := pn.connsByPeer[p]
+ pn.RUnlock()
+ if found && len(cs) > 0 {
+ for c := range cs {
+ return c, nil
+ }
+ }
+
+ log.Debugf("%s (newly) dialing %s", pn.peer, p)
+
+ // ok, must create a new connection. we need a link
+ links := pn.mocknet.LinksBetweenPeers(pn.peer, p)
+ if len(links) < 1 {
+ return nil, fmt.Errorf("%s cannot connect to %s", pn.peer, p)
+ }
+
+ // if many links found, how do we select? for now, randomly...
+ // this would be an interesting place to test logic that can measure
+ // links (network interfaces) and select properly
+ l := links[rand.Intn(len(links))]
+
+ log.Debugf("%s dialing %s openingConn", pn.peer, p)
+ // create a new connection with link
+ c := pn.openConn(p, l.(*link))
+ return c, nil
+}
+
+func (pn *peernet) openConn(r peer.ID, l *link) *conn {
+ lc, rc := l.newConnPair(pn)
+ log.Debugf("%s opening connection to %s", pn.LocalPeer(), lc.RemotePeer())
+ pn.addConn(lc)
+ rc.net.remoteOpenedConn(rc)
+ return lc
+}
+
+func (pn *peernet) remoteOpenedConn(c *conn) {
+ log.Debugf("%s accepting connection from %s", pn.LocalPeer(), c.RemotePeer())
+ pn.addConn(c)
+ pn.handleNewConn(c)
+}
+
+// addConn constructs and adds a connection
+// to given remote peer over given link
+func (pn *peernet) addConn(c *conn) {
+ pn.Lock()
+ defer pn.Unlock()
+
+ cs, found := pn.connsByPeer[c.RemotePeer()]
+ if !found {
+ cs = map[*conn]struct{}{}
+ pn.connsByPeer[c.RemotePeer()] = cs
+ }
+ pn.connsByPeer[c.RemotePeer()][c] = struct{}{}
+
+ cs, found = pn.connsByLink[c.link]
+ if !found {
+ cs = map[*conn]struct{}{}
+ pn.connsByLink[c.link] = cs
+ }
+ pn.connsByLink[c.link][c] = struct{}{}
+}
+
+// removeConn removes a given conn
+func (pn *peernet) removeConn(c *conn) {
+ pn.Lock()
+ defer pn.Unlock()
+
+ cs, found := pn.connsByLink[c.link]
+ if !found || len(cs) < 1 {
+ panic("attempting to remove a conn that doesn't exist")
+ }
+ delete(cs, c)
+
+ cs, found = pn.connsByPeer[c.remote]
+ if !found {
+ panic("attempting to remove a conn that doesn't exist")
+ }
+ delete(cs, c)
+}
+
+// CtxGroup returns the network's ContextGroup
+func (pn *peernet) CtxGroup() ctxgroup.ContextGroup {
+ return pn.cg
+}
+
+// LocalPeer returns the network's LocalPeer
+func (pn *peernet) LocalPeer() peer.ID {
+ return pn.peer
+}
+
+// Peers returns the connected peers
+func (pn *peernet) Peers() []peer.ID {
+ pn.RLock()
+ defer pn.RUnlock()
+
+ peers := make([]peer.ID, 0, len(pn.connsByPeer))
+ for _, cs := range pn.connsByPeer {
+ for c := range cs {
+ peers = append(peers, c.remote)
+ break
+ }
+ }
+ return peers
+}
+
+// Conns returns all the connections of this peer
+func (pn *peernet) Conns() []inet.Conn {
+ pn.RLock()
+ defer pn.RUnlock()
+
+ out := make([]inet.Conn, 0, len(pn.connsByPeer))
+ for _, cs := range pn.connsByPeer {
+ for c := range cs {
+ out = append(out, c)
+ }
+ }
+ return out
+}
+
+func (pn *peernet) ConnsToPeer(p peer.ID) []inet.Conn {
+ pn.RLock()
+ defer pn.RUnlock()
+
+ cs, found := pn.connsByPeer[p]
+ if !found || len(cs) == 0 {
+ return nil
+ }
+
+ var cs2 []inet.Conn
+ for c := range cs {
+ cs2 = append(cs2, c)
+ }
+ return cs2
+}
+
+// ClosePeer closes all connections to the given peer
+func (pn *peernet) ClosePeer(p peer.ID) error {
+ pn.RLock()
+ cs, found := pn.connsByPeer[p]
+ pn.RUnlock()
+ if !found {
+ return nil
+ }
+
+ for c := range cs {
+ c.Close()
+ }
+ return nil
+}
+
+// BandwidthTotals returns the total amount of bandwidth transferred
+func (pn *peernet) BandwidthTotals() (in uint64, out uint64) {
+ // need to implement this. probably best to do it in swarm this time.
+ // need a "metrics" object
+ return 0, 0
+}
+
+// ListenAddresses returns a list of addresses at which this network listens.
+func (pn *peernet) ListenAddresses() []ma.Multiaddr {
+ return pn.Peerstore().Addresses(pn.LocalPeer())
+}
+
+// InterfaceListenAddresses returns a list of addresses at which this network
+// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
+// use the known local interfaces.
+func (pn *peernet) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
+ return pn.ListenAddresses(), nil
+}
+
+// Connectedness returns a state signaling connection capabilities
+// For now only returns Connected || NotConnected. Expand into more later.
+func (pn *peernet) Connectedness(p peer.ID) inet.Connectedness {
+ pn.Lock()
+ defer pn.Unlock()
+
+ cs, found := pn.connsByPeer[p]
+ if found && len(cs) > 0 {
+ return inet.Connected
+ }
+ return inet.NotConnected
+}
+
+// NewStream returns a new stream to given peer p.
+// If there is no connection to p, attempts to create one.
+func (pn *peernet) NewStream(p peer.ID) (inet.Stream, error) {
+ pn.Lock()
+ cs, found := pn.connsByPeer[p]
+ if !found || len(cs) < 1 {
+ pn.Unlock()
+ return nil, fmt.Errorf("no connection to peer")
+ }
+ pn.Unlock()
+
+ // if many conns are found, how do we select? for now, randomly...
+ // this would be an interesting place to test logic that can measure
+ // links (network interfaces) and select properly
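+ // (Go maps have no positional indexing, so we walk the map and stop at
+ // the n-th element visited; rand.Intn keeps the choice uniform.)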
+ n := rand.Intn(len(cs))
+ var c *conn
+ for c = range cs {
+ if n == 0 {
+ break
+ }
+ n--
+ }
+
+ return c.NewStream()
+}
+
+// SetStreamHandler sets the new stream handler on the Network.
+// This operation is threadsafe.
+func (pn *peernet) SetStreamHandler(h inet.StreamHandler) {
+ pn.Lock()
+ pn.streamHandler = h
+ pn.Unlock()
+}
+
+// SetConnHandler sets the new conn handler on the Network.
+// This operation is threadsafe.
+func (pn *peernet) SetConnHandler(h inet.ConnHandler) {
+ pn.Lock()
+ pn.connHandler = h
+ pn.Unlock()
+}
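+
+// Putting the pieces together (an illustrative sketch, mirroring the calls
+// exercised in mock_test.go below; ctx is some context.Context and error
+// handling is elided):
+//
+//   mn := New(ctx)
+//   sk1, _, _ := testutil.RandKeyPair(512)
+//   sk2, _, _ := testutil.RandKeyPair(512)
+//   n1, _ := mn.AddPeer(sk1, testutil.RandLocalTCPAddress())
+//   n2, _ := mn.AddPeer(sk2, testutil.RandLocalTCPAddress())
+//   mn.LinkPeers(n1.LocalPeer(), n2.LocalPeer())    // peers can only dial over a link
+//   mn.ConnectPeers(n1.LocalPeer(), n2.LocalPeer()) // establish the mock connection
+//   s, _ := n1.NewStream(n2.LocalPeer())            // stream over that connection
+//   s.Write([]byte("beep"))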
diff --git a/p2p/net2/mock/mock_printer.go b/p2p/net2/mock/mock_printer.go
new file mode 100644
index 000000000..151b8d3d4
--- /dev/null
+++ b/p2p/net2/mock/mock_printer.go
@@ -0,0 +1,36 @@
+package mocknet
+
+import (
+ "fmt"
+ "io"
+
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+)
+
+// separate object so our interfaces are separate :)
+type printer struct {
+ w io.Writer
+}
+
+func (p *printer) MocknetLinks(mn Mocknet) {
+ links := mn.Links()
+
+ fmt.Fprintf(p.w, "Mocknet link map:\n")
+ for p1, lm := range links {
+ fmt.Fprintf(p.w, "\t%s linked to:\n", peer.ID(p1))
+ for p2, l := range lm {
+ fmt.Fprintf(p.w, "\t\t%s (%d links)\n", peer.ID(p2), len(l))
+ }
+ }
+ fmt.Fprintf(p.w, "\n")
+}
+
+func (p *printer) NetworkConns(ni inet.Network) {
+
+ fmt.Fprintf(p.w, "%s connected to:\n", ni.LocalPeer())
+ for _, c := range ni.Conns() {
+ fmt.Fprintf(p.w, "\t%s (addr: %s)\n", c.RemotePeer(), c.RemoteMultiaddr())
+ }
+ fmt.Fprintf(p.w, "\n")
+}
diff --git a/p2p/net2/mock/mock_stream.go b/p2p/net2/mock/mock_stream.go
new file mode 100644
index 000000000..317abbafd
--- /dev/null
+++ b/p2p/net2/mock/mock_stream.go
@@ -0,0 +1,29 @@
+package mocknet
+
+import (
+ "io"
+
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+)
+
+// stream implements inet.Stream
+type stream struct {
+ io.Reader
+ io.Writer
+ conn *conn
+}
+
+func (s *stream) Close() error {
+ s.conn.removeStream(s)
+ if r, ok := (s.Reader).(io.Closer); ok {
+ r.Close()
+ }
+ if w, ok := (s.Writer).(io.Closer); ok {
+ return w.Close()
+ }
+ return nil
+}
+
+func (s *stream) Conn() inet.Conn {
+ return s.conn
+}
diff --git a/p2p/net2/mock/mock_test.go b/p2p/net2/mock/mock_test.go
new file mode 100644
index 000000000..e492a9124
--- /dev/null
+++ b/p2p/net2/mock/mock_test.go
@@ -0,0 +1,460 @@
+package mocknet
+
+import (
+ "bytes"
+ "io"
+ "math/rand"
+ "sync"
+ "testing"
+
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ testutil "github.com/jbenet/go-ipfs/util/testutil"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+func randPeer(t *testing.T) peer.ID {
+ p, err := testutil.RandPeerID()
+ if err != nil {
+ t.Fatal(err)
+ }
+ return p
+}
+
+func TestNetworkSetup(t *testing.T) {
+
+ ctx := context.Background()
+ sk1, _, err := testutil.RandKeyPair(512)
+ if err != nil {
+ t.Fatal(err)
+ }
+ sk2, _, err := testutil.RandKeyPair(512)
+ if err != nil {
+ t.Fatal(err)
+ }
+ sk3, _, err := testutil.RandKeyPair(512)
+ if err != nil {
+ t.Fatal(err)
+ }
+ mn := New(ctx)
+ // peers := []peer.ID{p1, p2, p3}
+
+ // add peers to mock net
+
+ a1 := testutil.RandLocalTCPAddress()
+ a2 := testutil.RandLocalTCPAddress()
+ a3 := testutil.RandLocalTCPAddress()
+
+ n1, err := mn.AddPeer(sk1, a1)
+ if err != nil {
+ t.Fatal(err)
+ }
+ p1 := n1.LocalPeer()
+
+ n2, err := mn.AddPeer(sk2, a2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ p2 := n2.LocalPeer()
+
+ n3, err := mn.AddPeer(sk3, a3)
+ if err != nil {
+ t.Fatal(err)
+ }
+ p3 := n3.LocalPeer()
+
+ // check peers and net
+ if mn.Net(p1) != n1 {
+ t.Error("net for p1.ID != n1")
+ }
+ if mn.Net(p2) != n2 {
+ t.Error("net for p2.ID != n1")
+ }
+ if mn.Net(p3) != n3 {
+ t.Error("net for p3.ID != n1")
+ }
+
+ // link p1<-->p2, p1<-->p1, p2<-->p3, p3<-->p2
+
+ l12, err := mn.LinkPeers(p1, p2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !(l12.Networks()[0] == n1 && l12.Networks()[1] == n2) &&
+ !(l12.Networks()[0] == n2 && l12.Networks()[1] == n1) {
+ t.Error("l12 networks incorrect")
+ }
+
+ l11, err := mn.LinkPeers(p1, p1)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !(l11.Networks()[0] == n1 && l11.Networks()[1] == n1) {
+ t.Error("l11 networks incorrect")
+ }
+
+ l23, err := mn.LinkPeers(p2, p3)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !(l23.Networks()[0] == n2 && l23.Networks()[1] == n3) &&
+ !(l23.Networks()[0] == n3 && l23.Networks()[1] == n2) {
+ t.Error("l23 networks incorrect")
+ }
+
+ l32, err := mn.LinkPeers(p3, p2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !(l32.Networks()[0] == n2 && l32.Networks()[1] == n3) &&
+ !(l32.Networks()[0] == n3 && l32.Networks()[1] == n2) {
+ t.Error("l32 networks incorrect")
+ }
+
+ // check things
+
+ links12 := mn.LinksBetweenPeers(p1, p2)
+ if len(links12) != 1 {
+ t.Errorf("should be 1 link bt. p1 and p2 (found %d)", len(links12))
+ }
+ if links12[0] != l12 {
+ t.Error("links 1-2 should be l12.")
+ }
+
+ links11 := mn.LinksBetweenPeers(p1, p1)
+ if len(links11) != 1 {
+ t.Errorf("should be 1 link bt. p1 and p1 (found %d)", len(links11))
+ }
+ if links11[0] != l11 {
+ t.Error("links 1-1 should be l11.")
+ }
+
+ links23 := mn.LinksBetweenPeers(p2, p3)
+ if len(links23) != 2 {
+ t.Errorf("should be 2 link bt. p2 and p3 (found %d)", len(links23))
+ }
+ if !((links23[0] == l23 && links23[1] == l32) ||
+ (links23[0] == l32 && links23[1] == l23)) {
+ t.Error("links 2-3 should be l23 and l32.")
+ }
+
+ // unlinking
+
+ if err := mn.UnlinkPeers(p2, p1); err != nil {
+ t.Error(err)
+ }
+
+ // check only one link affected:
+
+ links12 = mn.LinksBetweenPeers(p1, p2)
+ if len(links12) != 0 {
+ t.Errorf("should be 0 now (found %d)", len(links12))
+ }
+
+ links11 = mn.LinksBetweenPeers(p1, p1)
+ if len(links11) != 1 {
+ t.Errorf("should be 1 link bt. p1 and p1 (found %d)", len(links11))
+ }
+ if links11[0] != l11 {
+ t.Error("links 1-1 should be l11.")
+ }
+
+ links23 = mn.LinksBetweenPeers(p2, p3)
+ if len(links23) != 2 {
+ t.Errorf("should be 2 link bt. p2 and p3 (found %d)", len(links23))
+ }
+ if !((links23[0] == l23 && links23[1] == l32) ||
+ (links23[0] == l32 && links23[1] == l23)) {
+ t.Error("links 2-3 should be l23 and l32.")
+ }
+
+ // check connecting
+
+ // first, no conns
+ if len(n2.Conns()) > 0 || len(n3.Conns()) > 0 {
+ t.Errorf("should have 0 conns. Got: (%d, %d)", len(n2.Conns()), len(n3.Conns()))
+ }
+
+ // connect p2->p3
+ if _, err := n2.DialPeer(ctx, p3); err != nil {
+ t.Error(err)
+ }
+
+ if len(n2.Conns()) != 1 || len(n3.Conns()) != 1 {
+ t.Errorf("should have (1,1) conn. Got: (%d, %d)", len(n2.Conns()), len(n3.Conns()))
+ }
+
+ // p := PrinterTo(os.Stdout)
+ // p.NetworkConns(n1)
+ // p.NetworkConns(n2)
+ // p.NetworkConns(n3)
+
+ // can create a stream 2->3, 3->2,
+ if _, err := n2.NewStream(p3); err != nil {
+ t.Error(err)
+ }
+ if _, err := n3.NewStream(p2); err != nil {
+ t.Error(err)
+ }
+
+ // but not 1->2 nor 2->2 (not linked), nor 1->1 (not connected)
+ if _, err := n1.NewStream(p2); err == nil {
+ t.Error("should not be able to connect")
+ }
+ if _, err := n2.NewStream(p2); err == nil {
+ t.Error("should not be able to connect")
+ }
+ if _, err := n1.NewStream(p1); err == nil {
+ t.Error("should not be able to connect")
+ }
+
+ // connect p1->p1 (should work)
+ if _, err := n1.DialPeer(ctx, p1); err != nil {
+ t.Error("p1 should be able to dial self.", err)
+ }
+
+ // and a stream too
+ if _, err := n1.NewStream(p1); err != nil {
+ t.Error(err)
+ }
+
+ // connect p1->p2
+ if _, err := n1.DialPeer(ctx, p2); err == nil {
+ t.Error("p1 should not be able to dial p2, not linked...")
+ }
+
+ // connect p3->p1
+ if _, err := n3.DialPeer(ctx, p1); err == nil {
+ t.Error("p3 should not be able to dial p1, not linked...")
+ }
+
+ // relink p1->p2
+
+ l12, err = mn.LinkPeers(p1, p2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !(l12.Networks()[0] == n1 && l12.Networks()[1] == n2) &&
+ !(l12.Networks()[0] == n2 && l12.Networks()[1] == n1) {
+ t.Error("l12 networks incorrect")
+ }
+
+ // should now be able to connect
+
+ // connect p1->p2
+ if _, err := n1.DialPeer(ctx, p2); err != nil {
+ t.Error(err)
+ }
+
+ // and a stream should work now too :)
+ if _, err := n2.NewStream(p3); err != nil {
+ t.Error(err)
+ }
+
+}
+
+func TestStreams(t *testing.T) {
+
+ mn, err := FullMeshConnected(context.Background(), 3)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ handler := func(s inet.Stream) {
+ b := make([]byte, 4)
+ if _, err := io.ReadFull(s, b); err != nil {
+ panic(err)
+ }
+ if !bytes.Equal(b, []byte("beep")) {
+ panic("bytes mismatch")
+ }
+ if _, err := s.Write([]byte("boop")); err != nil {
+ panic(err)
+ }
+ s.Close()
+ }
+
+ nets := mn.Nets()
+ for _, n := range nets {
+ n.SetStreamHandler(handler)
+ }
+
+ s, err := nets[0].NewStream(nets[1].LocalPeer())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := s.Write([]byte("beep")); err != nil {
+ panic(err)
+ }
+ b := make([]byte, 4)
+ if _, err := io.ReadFull(s, b); err != nil {
+ panic(err)
+ }
+ if !bytes.Equal(b, []byte("boop")) {
+ panic("bytes mismatch 2")
+ }
+
+}
+
+func makePinger(st string, n int) func(inet.Stream) {
+ return func(s inet.Stream) {
+ go func() {
+ defer s.Close()
+
+ for i := 0; i < n; i++ {
+ b := make([]byte, 4+len(st))
+ if _, err := s.Write([]byte("ping" + st)); err != nil {
+ panic(err)
+ }
+ if _, err := io.ReadFull(s, b); err != nil {
+ panic(err)
+ }
+ if !bytes.Equal(b, []byte("pong"+st)) {
+ panic("bytes mismatch")
+ }
+ }
+ }()
+ }
+}
+
+func makePonger(st string) func(inet.Stream) {
+ return func(s inet.Stream) {
+ go func() {
+ defer s.Close()
+
+ for {
+ b := make([]byte, 4+len(st))
+ if _, err := io.ReadFull(s, b); err != nil {
+ if err == io.EOF {
+ return
+ }
+ panic(err)
+ }
+ if !bytes.Equal(b, []byte("ping"+st)) {
+ panic("bytes mismatch")
+ }
+ if _, err := s.Write([]byte("pong" + st)); err != nil {
+ panic(err)
+ }
+ }
+ }()
+ }
+}
+
+func TestStreamsStress(t *testing.T) {
+
+ mn, err := FullMeshConnected(context.Background(), 100)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ nets := mn.Nets()
+ for _, n := range nets {
+ n.SetStreamHandler(makePonger("pingpong"))
+ }
+
+ var wg sync.WaitGroup
+ for i := 0; i < 1000; i++ {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ from := rand.Intn(len(nets))
+ to := rand.Intn(len(nets))
+ s, err := nets[from].NewStream(nets[to].LocalPeer())
+ if err != nil {
+ log.Debugf("%d (%s) %d (%s)", from, nets[from], to, nets[to])
+ panic(err)
+ }
+
+ log.Infof("%d start pinging", i)
+ makePinger("pingpong", rand.Intn(100))(s)
+ log.Infof("%d done pinging", i)
+ }(i)
+ }
+
+ wg.Wait()
+}
+
+func TestAdding(t *testing.T) {
+
+ mn := New(context.Background())
+
+ peers := []peer.ID{}
+ for i := 0; i < 3; i++ {
+ sk, _, err := testutil.RandKeyPair(512)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ a := testutil.RandLocalTCPAddress()
+ n, err := mn.AddPeer(sk, a)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ peers = append(peers, n.LocalPeer())
+ }
+
+ p1 := peers[0]
+ p2 := peers[1]
+
+ // link them
+ for _, p1 := range peers {
+ for _, p2 := range peers {
+ if _, err := mn.LinkPeers(p1, p2); err != nil {
+ t.Error(err)
+ }
+ }
+ }
+
+ // set the new stream handler on p2
+ n2 := mn.Net(p2)
+ if n2 == nil {
+ t.Fatalf("no network for %s", p2)
+ }
+ n2.SetStreamHandler(func(s inet.Stream) {
+ defer s.Close()
+
+ b := make([]byte, 4)
+ if _, err := io.ReadFull(s, b); err != nil {
+ panic(err)
+ }
+ if string(b) != "beep" {
+ panic("did not beep!")
+ }
+
+ if _, err := s.Write([]byte("boop")); err != nil {
+ panic(err)
+ }
+ })
+
+ // connect p1 to p2
+ if _, err := mn.ConnectPeers(p1, p2); err != nil {
+ t.Fatal(err)
+ }
+
+ // talk to p2
+ n1 := mn.Net(p1)
+ if n1 == nil {
+ t.Fatalf("no network for %s", p1)
+ }
+
+ s, err := n1.NewStream(p2)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := s.Write([]byte("beep")); err != nil {
+ t.Error(err)
+ }
+ b := make([]byte, 4)
+ if _, err := io.ReadFull(s, b); err != nil {
+ t.Error(err)
+ }
+ if !bytes.Equal(b, []byte("boop")) {
+ t.Error("bytes mismatch 2")
+ }
+
+}
diff --git a/p2p/net2/swarm/addr.go b/p2p/net2/swarm/addr.go
new file mode 100644
index 000000000..01cb39717
--- /dev/null
+++ b/p2p/net2/swarm/addr.go
@@ -0,0 +1,124 @@
+package swarm
+
+import (
+ conn "github.com/jbenet/go-ipfs/p2p/net/conn"
+ eventlog "github.com/jbenet/go-ipfs/util/eventlog"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+ manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
+)
+
+// ListenAddresses returns a list of addresses at which this swarm listens.
+func (s *Swarm) ListenAddresses() []ma.Multiaddr {
+ listeners := s.swarm.Listeners()
+ addrs := make([]ma.Multiaddr, 0, len(listeners))
+ for _, l := range listeners {
+ if l2, ok := l.NetListener().(conn.Listener); ok {
+ addrs = append(addrs, l2.Multiaddr())
+ }
+ }
+ return addrs
+}
+
+// InterfaceListenAddresses returns a list of addresses at which this swarm
+// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
+// use the known local interfaces.
+func InterfaceListenAddresses(s *Swarm) ([]ma.Multiaddr, error) {
+ return resolveUnspecifiedAddresses(s.ListenAddresses())
+}
+
+// resolveUnspecifiedAddresses expands unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to
+// use the known local interfaces.
+func resolveUnspecifiedAddresses(unspecifiedAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
+ var outputAddrs []ma.Multiaddr
+
+ // todo optimize: only fetch these if we have a "any" addr.
+ ifaceAddrs, err := interfaceAddresses()
+ if err != nil {
+ return nil, err
+ }
+
+ for _, a := range unspecifiedAddrs {
+
+ // split address into its components
+ split := ma.Split(a)
+
+ // if first component (ip) is not unspecified, use it as is.
+ if !manet.IsIPUnspecified(split[0]) {
+ outputAddrs = append(outputAddrs, a)
+ continue
+ }
+
+ // unspecified? add one address per interface.
+ for _, ia := range ifaceAddrs {
+ split[0] = ia
+ joined := ma.Join(split...)
+ outputAddrs = append(outputAddrs, joined)
+ }
+ }
+
+ log.Event(context.TODO(), "interfaceListenAddresses", func() eventlog.Loggable {
+ var addrs []string
+ for _, addr := range outputAddrs {
+ addrs = append(addrs, addr.String())
+ }
+ return eventlog.Metadata{"addresses": addrs}
+ }())
+ log.Debug("InterfaceListenAddresses:", outputAddrs)
+ return outputAddrs, nil
+}
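+
+// For example (illustrative; actual interface addresses vary per machine):
+// given the unspecified listen address /ip4/0.0.0.0/tcp/4001 and a local
+// non-loopback interface /ip4/192.168.1.10, the port component is kept and
+// one address is emitted per interface:
+//
+//   in:  /ip4/0.0.0.0/tcp/4001
+//   out: /ip4/192.168.1.10/tcp/4001
+//
+// Loopback interfaces are filtered out by interfaceAddresses below.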
+
+// interfaceAddresses returns a list of addresses associated with local machine
+func interfaceAddresses() ([]ma.Multiaddr, error) {
+ maddrs, err := manet.InterfaceMultiaddrs()
+ if err != nil {
+ return nil, err
+ }
+
+ var nonLoopback []ma.Multiaddr
+ for _, a := range maddrs {
+ if !manet.IsIPLoopback(a) {
+ nonLoopback = append(nonLoopback, a)
+ }
+ }
+
+ return nonLoopback, nil
+}
+
+// addrInList returns whether or not an address is part of a list.
+// this is useful to check if NAT is happening (or other bugs?)
+func addrInList(addr ma.Multiaddr, list []ma.Multiaddr) bool {
+ for _, addr2 := range list {
+ if addr.Equal(addr2) {
+ return true
+ }
+ }
+ return false
+}
+
+// checkNATWarning checks if our observed addresses differ. if so,
+// informs the user that certain things might not work yet
+func checkNATWarning(s *Swarm, observed ma.Multiaddr, expected ma.Multiaddr) {
+ if observed.Equal(expected) {
+ return
+ }
+
+ listen, err := InterfaceListenAddresses(s)
+ if err != nil {
+ log.Errorf("Error retrieving swarm.InterfaceListenAddresses: %s", err)
+ return
+ }
+
+ if !addrInList(observed, listen) { // probably a nat
+ log.Warningf(natWarning, observed, listen)
+ }
+}
+
+const natWarning = `Remote peer observed our address to be: %s
+The local addresses are: %s
+Thus, connection is going through NAT, and other connections may fail.
+
+IPFS NAT traversal is still under development. Please bug us on github or irc to fix this.
+Baby steps: http://jbenet.static.s3.amazonaws.com/271dfcf/baby-steps.gif
+`
diff --git a/p2p/net2/swarm/simul_test.go b/p2p/net2/swarm/simul_test.go
new file mode 100644
index 000000000..b61f3f03c
--- /dev/null
+++ b/p2p/net2/swarm/simul_test.go
@@ -0,0 +1,66 @@
+package swarm
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+func TestSimultOpen(t *testing.T) {
+ // t.Skip("skipping for another test")
+
+ ctx := context.Background()
+ swarms, peers := makeSwarms(ctx, t, 2)
+
+ // connect everyone
+ {
+ var wg sync.WaitGroup
+ connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
+ // copy for other peer
+ s.peers.AddAddress(dst, addr)
+ if _, err := s.Dial(ctx, dst); err != nil {
+ t.Fatal("error swarm dialing to peer", err)
+ }
+ wg.Done()
+ }
+
+ log.Info("Connecting swarms simultaneously.")
+ wg.Add(2)
+ go connect(swarms[0], swarms[1].local, peers[1].Addr)
+ go connect(swarms[1], swarms[0].local, peers[0].Addr)
+ wg.Wait()
+ }
+
+ for _, s := range swarms {
+ s.Close()
+ }
+}
+
+func TestSimultOpenMany(t *testing.T) {
+ // t.Skip("very very slow")
+
+ addrs := 20
+ SubtestSwarm(t, addrs, 10)
+}
+
+func TestSimultOpenFewStress(t *testing.T) {
+ if testing.Short() {
+ t.SkipNow()
+ }
+ // t.Skip("skipping for another test")
+
+ msgs := 40
+ swarms := 2
+ rounds := 10
+ // rounds := 100
+
+ for i := 0; i < rounds; i++ {
+ SubtestSwarm(t, swarms, msgs)
+ <-time.After(10 * time.Millisecond)
+ }
+}
diff --git a/p2p/net2/swarm/swarm.go b/p2p/net2/swarm/swarm.go
new file mode 100644
index 000000000..74201f4f0
--- /dev/null
+++ b/p2p/net2/swarm/swarm.go
@@ -0,0 +1,158 @@
+// package swarm implements a connection muxer with a pair of channels
+// to synchronize all network communication.
+package swarm
+
+import (
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ eventlog "github.com/jbenet/go-ipfs/util/eventlog"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+ ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
+)
+
+var log = eventlog.Logger("swarm2")
+
+// Swarm is a connection muxer, allowing connections to other peers to
+// be opened and closed, while streams to those peers are multiplexed
+// over the open connections.
+//
+// Uses peerstream.Swarm
+type Swarm struct {
+ swarm *ps.Swarm
+ local peer.ID
+ peers peer.Peerstore
+ connh ConnHandler
+
+ cg ctxgroup.ContextGroup
+}
+
+// NewSwarm constructs a Swarm and starts listening on the given addresses.
+func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
+ local peer.ID, peers peer.Peerstore) (*Swarm, error) {
+
+ s := &Swarm{
+ swarm: ps.NewSwarm(),
+ local: local,
+ peers: peers,
+ cg: ctxgroup.WithContext(ctx),
+ }
+
+ // configure Swarm
+ s.cg.SetTeardown(s.teardown)
+ s.SetConnHandler(nil) // make sure to setup our own conn handler.
+
+ return s, s.listen(listenAddrs)
+}
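+
+// A minimal construction sketch (illustrative; it mirrors makeSwarms in
+// swarm_test.go; ctx, local, addr, sk, pk and myHandler are placeholders,
+// and the peerstore must already know the local peer's keys and address):
+//
+//   pstore := peer.NewPeerstore()
+//   pstore.AddAddress(local, addr)
+//   pstore.AddPrivKey(local, sk)
+//   pstore.AddPubKey(local, pk)
+//   s, err := NewSwarm(ctx, pstore.Addresses(local), local, pstore)
+//   if err != nil {
+//       return err
+//   }
+//   s.SetStreamHandler(myHandler) // myHandler is an inet.StreamHandler
+//   defer s.Close()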
+
+func (s *Swarm) teardown() error {
+ return s.swarm.Close()
+}
+
+// CtxGroup returns the Context Group of the swarm
+func (s *Swarm) CtxGroup() ctxgroup.ContextGroup {
+ return s.cg
+}
+
+// Close stops the Swarm.
+func (s *Swarm) Close() error {
+ return s.cg.Close()
+}
+
+// StreamSwarm returns the underlying peerstream.Swarm
+func (s *Swarm) StreamSwarm() *ps.Swarm {
+ return s.swarm
+}
+
+// SetConnHandler assigns the handler for new connections.
+// See peerstream. You will rarely use this. See SetStreamHandler
+func (s *Swarm) SetConnHandler(handler ConnHandler) {
+
+ // handler is nil if user wants to clear the old handler.
+ if handler == nil {
+ s.swarm.SetConnHandler(func(psconn *ps.Conn) {
+ s.connHandler(psconn)
+ })
+ return
+ }
+
+ s.swarm.SetConnHandler(func(psconn *ps.Conn) {
+ // sc is nil if closed in our handler.
+ if sc := s.connHandler(psconn); sc != nil {
+ // call the user's handler. in a goroutine for sync safety.
+ go handler(sc)
+ }
+ })
+}
+
+// SetStreamHandler assigns the handler for new streams.
+// See peerstream.
+func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) {
+ s.swarm.SetStreamHandler(func(s *ps.Stream) {
+ handler(wrapStream(s))
+ })
+}
+
+// NewStreamWithPeer creates a new stream on any available connection to p
+func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) {
+ // if we have no connections, try connecting.
+ if len(s.ConnectionsToPeer(p)) == 0 {
+ log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
+ if _, err := s.Dial(context.Background(), p); err != nil {
+ return nil, err
+ }
+ }
+ log.Debug("Swarm: NewStreamWithPeer...")
+
+ st, err := s.swarm.NewStreamWithGroup(p)
+ return wrapStream(st), err
+}
+
+// StreamsWithPeer returns all the live Streams to p
+func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream {
+ return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams()))
+}
+
+// ConnectionsToPeer returns all the live connections to p
+func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn {
+ return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns()))
+}
+
+// Connections returns a slice of all connections.
+func (s *Swarm) Connections() []*Conn {
+ return wrapConns(s.swarm.Conns())
+}
+
+// CloseConnection removes a given peer from swarm + closes the connection
+func (s *Swarm) CloseConnection(p peer.ID) error {
+ conns := s.swarm.ConnsWithGroup(p) // boom.
+ for _, c := range conns {
+ c.Close()
+ }
+ return nil
+}
+
+// Peers returns a copy of the set of peers swarm is connected to.
+func (s *Swarm) Peers() []peer.ID {
+ conns := s.Connections()
+
+ seen := make(map[peer.ID]struct{})
+ peers := make([]peer.ID, 0, len(conns))
+ for _, c := range conns {
+ p := c.RemotePeer()
+ if _, found := seen[p]; found {
+ continue
+ }
+
+ seen[p] = struct{}{}
+ peers = append(peers, p)
+ }
+ return peers
+}
+
+// LocalPeer returns the local peer swarm is associated to.
+func (s *Swarm) LocalPeer() peer.ID {
+ return s.local
+}
diff --git a/p2p/net2/swarm/swarm_conn.go b/p2p/net2/swarm/swarm_conn.go
new file mode 100644
index 000000000..dd00a89ba
--- /dev/null
+++ b/p2p/net2/swarm/swarm_conn.go
@@ -0,0 +1,141 @@
+package swarm
+
+import (
+ "fmt"
+
+ ic "github.com/jbenet/go-ipfs/p2p/crypto"
+ conn "github.com/jbenet/go-ipfs/p2p/net/conn"
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+ ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
+)
+
+// a Conn is a simple wrapper around a ps.Conn that also exposes
+// some of the methods from the underlying conn.Conn.
+// There's **five** "layers" to each connection:
+// * 0. the net.Conn - underlying net.Conn (TCP/UDP/UTP/etc)
+// * 1. the manet.Conn - provides multiaddr friendly Conn
+// * 2. the conn.Conn - provides Peer friendly Conn (inc Secure channel)
+// * 3. the peerstream.Conn - provides peerstream / spdysptream happiness
+// * 4. the Conn - abstracts everything out, exposing only key parts of underlying layers
+// (I know, this is kinda crazy. it's more historical than a good design. though the
+// layers do build up pieces of functionality. and they're all just io.RW :) )
+type Conn ps.Conn
+
+// ConnHandler is called when new conns are opened from remote peers.
+// See peerstream.ConnHandler
+type ConnHandler func(*Conn)
+
+func (c *Conn) StreamConn() *ps.Conn {
+ return (*ps.Conn)(c)
+}
+
+func (c *Conn) RawConn() conn.Conn {
+ // rightly panic if these things aren't true. it is an expected
+ // invariant that these Conns are all of the type we expect:
+ // ps.Conn wrapping a conn.Conn
+ // if we get something else it is programmer error.
+ return (*ps.Conn)(c).NetConn().(conn.Conn)
+}
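+
+// To make the layering concrete (sketch only; c is assumed to be a *Conn
+// obtained from the swarm):
+//
+//   psc := c.StreamConn()          // layer 3: the peerstream.Conn
+//   raw := c.RawConn()             // layer 2: the peer-aware conn.Conn
+//   addr := raw.RemoteMultiaddr()  // layers 0-1 surface through it
+//
+// Callers normally stay at layer 4 and use only the accessors below.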
+
+func (c *Conn) String() string {
+ return fmt.Sprintf("<SwarmConn %v>", c.RawConn())
+}
+
+// LocalMultiaddr is the Multiaddr on this side
+func (c *Conn) LocalMultiaddr() ma.Multiaddr {
+ return c.RawConn().LocalMultiaddr()
+}
+
+// LocalPeer is the Peer on our side of the connection
+func (c *Conn) LocalPeer() peer.ID {
+ return c.RawConn().LocalPeer()
+}
+
+// RemoteMultiaddr is the Multiaddr on the remote side
+func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
+ return c.RawConn().RemoteMultiaddr()
+}
+
+// RemotePeer is the Peer on the remote side
+func (c *Conn) RemotePeer() peer.ID {
+ return c.RawConn().RemotePeer()
+}
+
+// LocalPrivateKey is the private key of the peer on this side
+func (c *Conn) LocalPrivateKey() ic.PrivKey {
+ return c.RawConn().LocalPrivateKey()
+}
+
+// RemotePublicKey is the public key of the peer on the remote side
+func (c *Conn) RemotePublicKey() ic.PubKey {
+ return c.RawConn().RemotePublicKey()
+}
+
+// NewSwarmStream returns a new Stream from this connection
+func (c *Conn) NewSwarmStream() (*Stream, error) {
+ s, err := c.StreamConn().NewStream()
+ return wrapStream(s), err
+}
+
+// NewStream returns a new Stream from this connection
+func (c *Conn) NewStream() (inet.Stream, error) {
+ s, err := c.NewSwarmStream()
+ return inet.Stream(s), err
+}
+
+func (c *Conn) Close() error {
+ return c.StreamConn().Close()
+}
+
+func wrapConn(psc *ps.Conn) (*Conn, error) {
+ // grab the underlying connection.
+ if _, ok := psc.NetConn().(conn.Conn); !ok {
+ // this should never happen. if we see it occurring it means that we added
+ // a Listener to the ps.Swarm that is NOT one of our net/conn.Listener.
+ return nil, fmt.Errorf("swarm connHandler: invalid conn (not a conn.Conn): %s", psc)
+ }
+ return (*Conn)(psc), nil
+}
+
+// wrapConns returns a *Conn for all these ps.Conns
+func wrapConns(conns1 []*ps.Conn) []*Conn {
+ conns2 := make([]*Conn, len(conns1))
+ for i, c1 := range conns1 {
+ if c2, err := wrapConn(c1); err == nil {
+ conns2[i] = c2
+ }
+ }
+ return conns2
+}
+
+// newConnSetup does the swarm's "setup" for a connection, and returns the
+// wrapped *Conn. This method is used by both swarm.Dial and the ps.Swarm connHandler.
+func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error) {
+
+ // wrap with a Conn
+ sc, err := wrapConn(psConn)
+ if err != nil {
+ return nil, err
+ }
+
+ // if we have a public key, make sure we add it to our peerstore!
+ // This is an important detail. Otherwise we must fetch the public
+ // key from the DHT or some other system.
+ if pk := sc.RemotePublicKey(); pk != nil {
+ s.peers.AddPubKey(sc.RemotePeer(), pk)
+ }
+
+ // ok great! we can use it. add it to our group.
+
+ // set the RemotePeer as a group on the conn. this lets us group
+ // connections in the StreamSwarm by peer, and get streams from
+ // any available connection in the group (better multiconn):
+ // swarm.StreamSwarm().NewStreamWithGroup(remotePeer)
+ psConn.AddGroup(sc.RemotePeer())
+
+ return sc, nil
+}
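+
+// A grouping sketch (illustrative): because every conn is tagged with its
+// RemotePeer as a group here, a new stream to p can ride on any live
+// connection to p, which is exactly what NewStreamWithPeer relies on:
+//
+//   st, err := s.swarm.NewStreamWithGroup(p)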
diff --git a/p2p/net2/swarm/swarm_dial.go b/p2p/net2/swarm/swarm_dial.go
new file mode 100644
index 000000000..dde967fbc
--- /dev/null
+++ b/p2p/net2/swarm/swarm_dial.go
@@ -0,0 +1,104 @@
+package swarm
+
+import (
+ "errors"
+ "fmt"
+
+ conn "github.com/jbenet/go-ipfs/p2p/net/conn"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+)
+
+// Dial connects to a peer.
+//
+// The idea is that the client of Swarm does not need to know what network
+// the connection will happen over. Swarm can use whichever it chooses.
+// This allows us to use various transport protocols, do NAT traversal/relay,
+// etc. to achieve connection.
+func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
+
+ if p == s.local {
+ return nil, errors.New("Attempted connection to self!")
+ }
+
+ // check if we already have an open connection first
+ cs := s.ConnectionsToPeer(p)
+ for _, c := range cs {
+ if c != nil { // dump out the first one we find
+ return c, nil
+ }
+ }
+
+ sk := s.peers.PrivKey(s.local)
+ if sk == nil {
+ // may be fine for sk to be nil, just log a warning.
+ log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
+ }
+
+ remoteAddrs := s.peers.Addresses(p)
+ if len(remoteAddrs) == 0 {
+ return nil, errors.New("peer has no addresses")
+ }
+ localAddrs := s.peers.Addresses(s.local)
+ if len(localAddrs) == 0 {
+ log.Debug("Dialing out with no local addresses.")
+ }
+
+ // open connection to peer
+ d := &conn.Dialer{
+ LocalPeer: s.local,
+ LocalAddrs: localAddrs,
+ PrivateKey: sk,
+ }
+
+ // try to connect to one of the peer's known addresses.
+ // for simplicity, we do this sequentially.
+ // A future commit will do this asynchronously.
+ var connC conn.Conn
+ var err error
+ for _, addr := range remoteAddrs {
+ connC, err = d.Dial(ctx, addr, p)
+ if err == nil {
+ break
+ }
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ // ok try to setup the new connection.
+ swarmC, err := dialConnSetup(ctx, s, connC)
+ if err != nil {
+ log.Error("Dial newConnSetup failed. disconnecting.")
+ log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
+ connC.Close() // close the underlying connection. didn't work out :(
+ return nil, err
+ }
+
+ log.Event(ctx, "dial", p)
+ return swarmC, nil
+}
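+
+// A dialing sketch (illustrative; it mirrors the connect helpers in the
+// swarm tests; s, p, addr and ctx are placeholders). The peerstore must
+// already hold at least one address for the target peer, otherwise Dial
+// fails with "peer has no addresses":
+//
+//   s.peers.AddAddress(p, addr)  // tell the swarm where p can be reached
+//   c, err := s.Dial(ctx, p)     // reuses an existing connection if any
+//   if err != nil {
+//       return err
+//   }
+//   st, err := c.NewStream()     // open a stream over the connection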
+
+// dialConnSetup is the setup logic for a connection from the dial side. it
+// needs to add the Conn to the StreamSwarm, then run newConnSetup
+func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
+
+ psC, err := s.swarm.AddConn(connC)
+ if err != nil {
+ // connC is closed by caller if we fail.
+ return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
+ }
+
+ // ok try to setup the new connection. (newConnSetup will add to group)
+ swarmC, err := s.newConnSetup(ctx, psC)
+ if err != nil {
+ log.Error("Dial newConnSetup failed. disconnecting.")
+ log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
+ psC.Close() // we need to call this to make sure psC is Closed.
+ return nil, err
+ }
+
+ return swarmC, err
+}
diff --git a/p2p/net2/swarm/swarm_listen.go b/p2p/net2/swarm/swarm_listen.go
new file mode 100644
index 000000000..ba0bee2e7
--- /dev/null
+++ b/p2p/net2/swarm/swarm_listen.go
@@ -0,0 +1,86 @@
+package swarm
+
+import (
+ conn "github.com/jbenet/go-ipfs/p2p/net/conn"
+ lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+ ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
+ multierr "github.com/jbenet/go-ipfs/util/multierr"
+)
+
+// Open listeners for each network the swarm should listen on
+func (s *Swarm) listen(addrs []ma.Multiaddr) error {
+ retErr := multierr.New()
+
+ // listen on every address
+ for i, addr := range addrs {
+ err := s.setupListener(addr)
+ if err != nil {
+ if retErr.Errors == nil {
+ retErr.Errors = make([]error, len(addrs))
+ }
+ retErr.Errors[i] = err
+ log.Errorf("Failed to listen on: %s - %s", addr, err)
+ }
+ }
+
+ if retErr.Errors != nil {
+ return retErr
+ }
+ return nil
+}
+
+// Listen for new connections on the given multiaddr
+func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
+
+ // TODO rethink how this has to work. (jbenet)
+ //
+ // resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
+ // if err != nil {
+ // return err
+ // }
+ // for _, a := range resolved {
+ // s.peers.AddAddress(s.local, a)
+ // }
+
+ sk := s.peers.PrivKey(s.local)
+ if sk == nil {
+ // may be fine for sk to be nil, just log a warning.
+ log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
+ }
+ list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
+ if err != nil {
+ return err
+ }
+
+ // Add the listener to the peerstream Swarm. This will begin accepting connections
+ // and streams!
+ _, err = s.swarm.AddListener(list)
+ return err
+}
+
+// connHandler is called by the StreamSwarm whenever a new connection is added
+// here we configure it slightly. Note that this is sequential, so if anything
+// will take a while do it in a goroutine.
+// See https://godoc.org/github.com/jbenet/go-peerstream for more information
+func (s *Swarm) connHandler(c *ps.Conn) *Conn {
+ ctx := context.Background()
+ // this context is for running the handshake, which -- when receiving connections
+ // -- we have no bound on beyond what the transport protocol bounds it at.
+ // note that setup + the handshake are bounded by underlying io.
+ // (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
+ // Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
+ // as long as the conn is live (TCP says its online), it tries its best. we follow suit.)
+
+ sc, err := s.newConnSetup(ctx, c)
+ if err != nil {
+ log.Error(err)
+ log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
+ c.Close() // boom. close it.
+ return nil
+ }
+
+ return sc
+}
diff --git a/p2p/net2/swarm/swarm_net.go b/p2p/net2/swarm/swarm_net.go
new file mode 100644
index 000000000..eab60a36f
--- /dev/null
+++ b/p2p/net2/swarm/swarm_net.go
@@ -0,0 +1,156 @@
+package swarm
+
+import (
+ "fmt"
+
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+// Network implements the inet.Network interface.
+// It is simply a swarm, with a few different functions
+// to implement inet.Network.
+type Network Swarm
+
+// NewNetwork constructs a new network and starts listening on given addresses.
+func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID,
+ peers peer.Peerstore) (*Network, error) {
+
+ s, err := NewSwarm(ctx, listen, local, peers)
+ if err != nil {
+ return nil, err
+ }
+
+ return (*Network)(s), nil
+}
+
+// DialPeer attempts to establish a connection to a given peer.
+// Respects the context.
+func (n *Network) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
+ log.Debugf("[%s] network dialing peer [%s]", n.local, p)
+ sc, err := n.Swarm().Dial(ctx, p)
+ if err != nil {
+ return nil, err
+ }
+
+ log.Debugf("network for %s finished dialing %s", n.local, p)
+ return inet.Conn(sc), nil
+}
+
+// CtxGroup returns the network's ContextGroup
+func (n *Network) CtxGroup() ctxgroup.ContextGroup {
+ return n.cg
+}
+
+// Swarm returns the network's underlying Swarm
+func (n *Network) Swarm() *Swarm {
+ return (*Swarm)(n)
+}
+
+// LocalPeer returns the network's local peer
+func (n *Network) LocalPeer() peer.ID {
+ return n.Swarm().LocalPeer()
+}
+
+// Peers returns the connected peers
+func (n *Network) Peers() []peer.ID {
+ return n.Swarm().Peers()
+}
+
+// Peerstore returns the network's Peerstore
+func (n *Network) Peerstore() peer.Peerstore {
+ return n.Swarm().peers
+}
+
+// Conns returns the connections in this network
+func (n *Network) Conns() []inet.Conn {
+ conns1 := n.Swarm().Connections()
+ out := make([]inet.Conn, len(conns1))
+ for i, c := range conns1 {
+ out[i] = inet.Conn(c)
+ }
+ return out
+}
+
+// ConnsToPeer returns the connections in this Network for given peer.
+func (n *Network) ConnsToPeer(p peer.ID) []inet.Conn {
+ conns1 := n.Swarm().ConnectionsToPeer(p)
+ out := make([]inet.Conn, len(conns1))
+ for i, c := range conns1 {
+ out[i] = inet.Conn(c)
+ }
+ return out
+}
+
+// ClosePeer closes connections to the given peer
+func (n *Network) ClosePeer(p peer.ID) error {
+ return n.Swarm().CloseConnection(p)
+}
+
+// close is the real teardown function
+func (n *Network) close() error {
+ return n.Swarm().Close()
+}
+
+// Close shuts down the network by closing its ContextGroup
+func (n *Network) Close() error {
+ return n.Swarm().cg.Close()
+}
+
+// ListenAddresses returns a list of addresses at which this network listens.
+func (n *Network) ListenAddresses() []ma.Multiaddr {
+ return n.Swarm().ListenAddresses()
+}
+
+// InterfaceListenAddresses returns a list of addresses at which this network
+// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
+// use the known local interfaces.
+func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
+ return InterfaceListenAddresses(n.Swarm())
+}
+
+// Connectedness returns a state signaling connection capabilities
+// For now only returns Connected || NotConnected. Expand into more later.
+func (n *Network) Connectedness(p peer.ID) inet.Connectedness {
+ c := n.Swarm().ConnectionsToPeer(p)
+ if c != nil && len(c) > 0 {
+ return inet.Connected
+ }
+ return inet.NotConnected
+}
+
+// NewStream returns a new stream to given peer p.
+// If there is no connection to p, attempts to create one.
+func (n *Network) NewStream(p peer.ID) (inet.Stream, error) {
+ log.Debugf("[%s] network opening stream to peer [%s]", n.local, p)
+ s, err := n.Swarm().NewStreamWithPeer(p)
+ if err != nil {
+ return nil, err
+ }
+
+ return inet.Stream(s), nil
+}
+
+// SetStreamHandler sets the stream handler on the Network.
+// This operation is threadsafe.
+func (n *Network) SetStreamHandler(h inet.StreamHandler) {
+ n.Swarm().SetStreamHandler(h)
+}
+
+// SetConnHandler sets the conn handler on the Network.
+// This operation is threadsafe.
+func (n *Network) SetConnHandler(h inet.ConnHandler) {
+ n.Swarm().SetConnHandler(func(c *Conn) {
+ h(inet.Conn(c))
+ })
+}
+
+// String returns a string representation of Network.
+func (n *Network) String() string {
+ return fmt.Sprintf("<Network %s>", n.LocalPeer())
+}
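+
+// An end-to-end sketch (illustrative only; ctx, addr, local, pstore, remote
+// and echoHandler are assumed to be set up as in the swarm tests):
+//
+//   n, err := NewNetwork(ctx, []ma.Multiaddr{addr}, local, pstore)
+//   if err != nil {
+//       return err
+//   }
+//   n.SetStreamHandler(echoHandler)
+//   if _, err := n.DialPeer(ctx, remote); err != nil {
+//       return err
+//   }
+//   st, _ := n.NewStream(remote) // dials on demand if not yet connected
+//   st.Write([]byte("ping"))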
diff --git a/p2p/net2/swarm/swarm_net_test.go b/p2p/net2/swarm/swarm_net_test.go
new file mode 100644
index 000000000..a0670b691
--- /dev/null
+++ b/p2p/net2/swarm/swarm_net_test.go
@@ -0,0 +1,78 @@
+package swarm_test
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+
+ inet "github.com/jbenet/go-ipfs/p2p/net"
+ netutil "github.com/jbenet/go-ipfs/p2p/net/swarmnet/util"
+)
+
+// TestConnectednessCorrect starts a few networks, connects a few
+// and tests Connectedness value is correct.
+func TestConnectednessCorrect(t *testing.T) {
+
+ ctx := context.Background()
+
+ nets := make([]inet.Network, 4)
+ for i := 0; i < 4; i++ {
+ nets[i] = netutil.GenNetwork(t, ctx)
+ }
+
+ // connect 0-1, 0-2, 0-3, 1-2, 2-3
+
+ dial := func(a, b inet.Network) {
+ netutil.DivulgeAddresses(b, a)
+ if err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
+ t.Fatalf("Failed to dial: %s", err)
+ }
+ }
+
+ dial(nets[0], nets[1])
+ dial(nets[0], nets[3])
+ dial(nets[1], nets[2])
+ dial(nets[3], nets[2])
+
+ // there's something wrong with dial, i think. it's not finishing
+ // completely. there must be some async stuff.
+ <-time.After(100 * time.Millisecond)
+
+ // test those connected show up correctly
+
+ // test connected
+ expectConnectedness(t, nets[0], nets[1], inet.Connected)
+ expectConnectedness(t, nets[0], nets[3], inet.Connected)
+ expectConnectedness(t, nets[1], nets[2], inet.Connected)
+ expectConnectedness(t, nets[3], nets[2], inet.Connected)
+
+ // test not connected
+ expectConnectedness(t, nets[0], nets[2], inet.NotConnected)
+ expectConnectedness(t, nets[1], nets[3], inet.NotConnected)
+
+ for _, n := range nets {
+ n.Close()
+ }
+}
+
+func expectConnectedness(t *testing.T, a, b inet.Network, expected inet.Connectedness) {
+ es := "%s is connected to %s, but Connectedness incorrect. %s %s"
+ if a.Connectedness(b.LocalPeer()) != expected {
+ t.Errorf(es, a, b, printConns(a), printConns(b))
+ }
+
+ // test symmetric case
+ if b.Connectedness(a.LocalPeer()) != expected {
+ t.Errorf(es, b, a, printConns(b), printConns(a))
+ }
+}
+
+func printConns(n inet.Network) string {
+ s := fmt.Sprintf("Connections in %s:\n", n)
+ for _, c := range n.Conns() {
+ s = s + fmt.Sprintf("- %s\n", c)
+ }
+ return s
+}
diff --git a/p2p/net2/swarm/swarm_stream.go b/p2p/net2/swarm/swarm_stream.go
new file mode 100644
index 000000000..82a66e020
--- /dev/null
+++ b/p2p/net2/swarm/swarm_stream.go
@@ -0,0 +1,59 @@
+package swarm
+
+import (
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+
+ ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
+)
+
+// a Stream is a wrapper around a ps.Stream that exposes a way to get
+// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm)
+type Stream ps.Stream
+
+// Stream returns the underlying peerstream.Stream
+func (s *Stream) Stream() *ps.Stream {
+ return (*ps.Stream)(s)
+}
+
+// Conn returns the Conn associated with this Stream, as an inet.Conn
+func (s *Stream) Conn() inet.Conn {
+ return s.SwarmConn()
+}
+
+// SwarmConn returns the Conn associated with this Stream, as a *Conn
+func (s *Stream) SwarmConn() *Conn {
+ return (*Conn)(s.Stream().Conn())
+}
+
+// Wait waits for the stream to receive a reply.
+func (s *Stream) Wait() error {
+ return s.Stream().Wait()
+}
+
+// Read reads bytes from a stream.
+func (s *Stream) Read(p []byte) (n int, err error) {
+ return s.Stream().Read(p)
+}
+
+// Write writes bytes to a stream, flushing for each call.
+func (s *Stream) Write(p []byte) (n int, err error) {
+ return s.Stream().Write(p)
+}
+
+// Close closes the stream, indicating this side is finished
+// with the stream.
+func (s *Stream) Close() error {
+ return s.Stream().Close()
+}
+
+func wrapStream(pss *ps.Stream) *Stream {
+ return (*Stream)(pss)
+}
+
+func wrapStreams(st []*ps.Stream) []*Stream {
+ out := make([]*Stream, len(st))
+ for i, s := range st {
+ out[i] = wrapStream(s)
+ }
+ return out
+}
diff --git a/p2p/net2/swarm/swarm_test.go b/p2p/net2/swarm/swarm_test.go
new file mode 100644
index 000000000..dfc9ca2d3
--- /dev/null
+++ b/p2p/net2/swarm/swarm_test.go
@@ -0,0 +1,269 @@
+package swarm
+
+import (
+ "bytes"
+ "io"
+ "sync"
+ "testing"
+ "time"
+
+ inet "github.com/jbenet/go-ipfs/p2p/net2"
+ peer "github.com/jbenet/go-ipfs/p2p/peer"
+ errors "github.com/jbenet/go-ipfs/util/debugerror"
+ testutil "github.com/jbenet/go-ipfs/util/testutil"
+
+ context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+ ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+)
+
+func EchoStreamHandler(stream inet.Stream) {
+ go func() {
+ defer stream.Close()
+
+ // pull out the ipfs conn
+ c := stream.Conn()
+ log.Debugf("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
+
+ buf := make([]byte, 4)
+
+ for {
+ if _, err := stream.Read(buf); err != nil {
+ if err != io.EOF {
+ log.Error("ping receive error:", err)
+ }
+ return
+ }
+
+ if !bytes.Equal(buf, []byte("ping")) {
+ log.Errorf("ping receive error: ping != %s %v", buf, buf)
+ return
+ }
+
+ if _, err := stream.Write([]byte("pong")); err != nil {
+ log.Error("pong send error:", err)
+ return
+ }
+ }
+ }()
+}
+
+func makeSwarms(ctx context.Context, t *testing.T, num int) ([]*Swarm, []testutil.PeerNetParams) {
+ swarms := make([]*Swarm, 0, num)
+ peersnp := make([]testutil.PeerNetParams, 0, num)
+
+ for i := 0; i < num; i++ {
+ localnp := testutil.RandPeerNetParamsOrFatal(t)
+ peersnp = append(peersnp, localnp)
+
+ peerstore := peer.NewPeerstore()
+ peerstore.AddAddress(localnp.ID, localnp.Addr)
+ peerstore.AddPubKey(localnp.ID, localnp.PubKey)
+ peerstore.AddPrivKey(localnp.ID, localnp.PrivKey)
+
+ addrs := peerstore.Addresses(localnp.ID)
+ swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ swarm.SetStreamHandler(EchoStreamHandler)
+ swarms = append(swarms, swarm)
+ }
+
+ return swarms, peersnp
+}
+
+func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm, peersnp []testutil.PeerNetParams) {
+
+ var wg sync.WaitGroup
+ connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
+ // TODO: make a DialAddr func.
+ s.peers.AddAddress(dst, addr)
+ if _, err := s.Dial(ctx, dst); err != nil {
+ t.Fatal("error swarm dialing to peer", err)
+ }
+ wg.Done()
+ }
+
+ log.Info("Connecting swarms simultaneously.")
+ for _, s := range swarms {
+ for _, p := range peersnp {
+ if p.ID != s.local { // don't connect to self.
+ wg.Add(1)
+ connect(s, p.ID, p.Addr)
+ }
+ }
+ }
+ wg.Wait()
+
+ for _, s := range swarms {
+ log.Infof("%s swarm routing table: %s", s.local, s.Peers())
+ }
+}
+
+func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
+ // t.Skip("skipping for another test")
+
+ ctx := context.Background()
+ swarms, peersnp := makeSwarms(ctx, t, SwarmNum)
+
+ // connect everyone
+ connectSwarms(t, ctx, swarms, peersnp)
+
+ // ping/pong
+ for _, s1 := range swarms {
+ log.Debugf("-------------------------------------------------------")
+ log.Debugf("%s ping pong round", s1.local)
+ log.Debugf("-------------------------------------------------------")
+
+ _, cancel := context.WithCancel(ctx)
+ got := map[peer.ID]int{}
+ errChan := make(chan error, MsgNum*len(peersnp))
+ streamChan := make(chan *Stream, MsgNum)
+
+ // send out "ping" x MsgNum to every peer
+ go func() {
+ defer close(streamChan)
+
+ var wg sync.WaitGroup
+ send := func(p peer.ID) {
+ defer wg.Done()
+
+ // first, one stream per peer (nice)
+ stream, err := s1.NewStreamWithPeer(p)
+ if err != nil {
+ errChan <- errors.Wrap(err)
+ return
+ }
+
+ // send out ping!
+ for k := 0; k < MsgNum; k++ { // with k messages
+ msg := "ping"
+ log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
+ stream.Write([]byte(msg))
+ }
+
+ // read it later
+ streamChan <- stream
+ }
+
+ for _, p := range peersnp {
+ if p.ID == s1.local {
+ continue // dont send to self...
+ }
+
+ wg.Add(1)
+ go send(p.ID)
+ }
+ wg.Wait()
+ }()
+
+ // receive "pong" x MsgNum from every peer
+ go func() {
+ defer close(errChan)
+ count := 0
+ countShouldBe := MsgNum * (len(peersnp) - 1)
+ for stream := range streamChan { // one per peer
+ defer stream.Close()
+
+ // get peer on the other side
+ p := stream.Conn().RemotePeer()
+
+ // receive pings
+ msgCount := 0
+ msg := make([]byte, 4)
+ for k := 0; k < MsgNum; k++ { // with k messages
+
+ // read from the stream
+ if _, err := stream.Read(msg); err != nil {
+ errChan <- errors.Wrap(err)
+ continue
+ }
+
+ if string(msg) != "pong" {
+ errChan <- errors.Errorf("unexpected message: %s", msg)
+ continue
+ }
+
+ log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
+ msgCount++
+ }
+
+ got[p] = msgCount
+ count += msgCount
+ }
+
+ if count != countShouldBe {
+ errChan <- errors.Errorf("count mismatch: %d != %d", count, countShouldBe)
+ }
+ }()
+
+ // check any errors (blocks till consumer is done)
+ for err := range errChan {
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ }
+
+ log.Debugf("%s got pongs", s1.local)
+ if (len(peersnp) - 1) != len(got) {
+ t.Errorf("got replies from %d peers, expected %d.", len(got), len(peersnp)-1)
+ }
+
+ for p, n := range got {
+ if n != MsgNum {
+ t.Error("peer did not get all msgs", p, n, "/", MsgNum)
+ }
+ }
+
+ cancel()
+ <-time.After(10 * time.Millisecond)
+ }
+
+ for _, s := range swarms {
+ s.Close()
+ }
+}
+
+func TestSwarm(t *testing.T) {
+ // t.Skip("skipping for another test")
+
+ // msgs := 1000
+ msgs := 100
+ swarms := 5
+ SubtestSwarm(t, swarms, msgs)
+}
+
+func TestConnHandler(t *testing.T) {
+ // t.Skip("skipping for another test")
+
+ ctx := context.Background()
+ swarms, peersnp := makeSwarms(ctx, t, 5)
+
+ gotconn := make(chan struct{}, 10)
+ swarms[0].SetConnHandler(func(conn *Conn) {
+ gotconn <- struct{}{}
+ })
+
+ connectSwarms(t, ctx, swarms, peersnp)
+
+ <-time.After(time.Millisecond)
+ // should've gotten 4 by now (one conn per other swarm).
+
+ swarms[0].SetConnHandler(nil)
+
+ expect := 4
+ for i := 0; i < expect; i++ {
+ select {
+ case <-time.After(time.Second):
+ t.Fatal("failed to get connections")
+ case <-gotconn:
+ }
+ }
+
+ select {
+ case <-gotconn:
+ t.Fatalf("should have connected to %d swarms", expect)
+ default:
+ }
+}