swarm now uses peer.ID

This commit is contained in:
Juan Batiz-Benet 2014-12-19 21:05:18 -08:00
parent 35f1d3c236
commit e535b0e520
6 changed files with 101 additions and 200 deletions

View File

@ -1,36 +1,29 @@
package swarm
import (
"fmt"
"sync"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func TestSimultOpen(t *testing.T) {
// t.Skip("skipping for another test")
addrs := []string{
"/ip4/127.0.0.1/tcp/1244",
"/ip4/127.0.0.1/tcp/1245",
}
ctx := context.Background()
swarms, _ := makeSwarms(ctx, t, addrs)
swarms, peers := makeSwarms(ctx, t, 2)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID) {
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
cp := testutil.NewPeerWithID(dst.ID())
cp.AddAddress(dst.Addresses()[0])
if _, err := s.Dial(ctx, cp); err != nil {
s.peers.AddAddress(dst, addr)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
@ -38,8 +31,8 @@ func TestSimultOpen(t *testing.T) {
log.Info("Connecting swarms simultaneously.")
wg.Add(2)
go connect(swarms[0], swarms[1].local)
go connect(swarms[1], swarms[0].local)
go connect(swarms[0], swarms[1].local, peers[1].Addr)
go connect(swarms[1], swarms[0].local, peers[0].Addr)
wg.Wait()
}
@ -51,13 +44,7 @@ func TestSimultOpen(t *testing.T) {
func TestSimultOpenMany(t *testing.T) {
// t.Skip("very very slow")
many := 10
addrs := []string{}
for i := 2200; i < (2200 + many); i++ {
s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i)
addrs = append(addrs, s)
}
addrs := 20
SubtestSwarm(t, addrs, 10)
}
@ -67,14 +54,13 @@ func TestSimultOpenFewStress(t *testing.T) {
}
// t.Skip("skipping for another test")
num := 10
// num := 100
for i := 0; i < num; i++ {
addrs := []string{
fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 1900+i),
fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2900+i),
}
msgs := 40
swarms := 2
rounds := 10
// rounds := 100
SubtestSwarm(t, addrs, 10)
for i := 0; i < rounds; i++ {
SubtestSwarm(t, swarms, msgs)
<-time.After(10 * time.Millisecond)
}
}

View File

@ -22,7 +22,7 @@ var log = eventlog.Logger("swarm2")
// Uses peerstream.Swarm
type Swarm struct {
swarm *ps.Swarm
local peer.Peer
local peer.ID
peers peer.Peerstore
cg ctxgroup.ContextGroup
@ -30,13 +30,7 @@ type Swarm struct {
// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local peer.Peer, peers peer.Peerstore) (*Swarm, error) {
// make sure our own peer is in our peerstore...
local, err := peers.Add(local)
if err != nil {
return nil, err
}
local peer.ID, peers peer.Peerstore) (*Swarm, error) {
s := &Swarm{
swarm: ps.NewSwarm(),
@ -80,13 +74,7 @@ func (s *Swarm) SetStreamHandler(handler StreamHandler) {
}
// NewStreamWithPeer creates a new stream on any available connection to p
func (s *Swarm) NewStreamWithPeer(p peer.Peer) (*Stream, error) {
// make sure we use OUR peers. (the tests mess with you...)
p, err := s.peers.Add(p)
if err != nil {
return nil, err
}
func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) {
// if we have no connections, try connecting.
if len(s.ConnectionsToPeer(p)) == 0 {
log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
@ -101,21 +89,12 @@ func (s *Swarm) NewStreamWithPeer(p peer.Peer) (*Stream, error) {
}
// StreamsWithPeer returns all the live Streams to p
func (s *Swarm) StreamsWithPeer(p peer.Peer) []*Stream {
// make sure we use OUR peers. (the tests mess with you...)
if p2, err := s.peers.Add(p); err == nil {
p = p2
}
func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream {
return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams()))
}
// ConnectionsToPeer returns all the live connections to p
func (s *Swarm) ConnectionsToPeer(p peer.Peer) []*Conn {
// make sure we use OUR peers. (the tests mess with you...)
if p2, err := s.peers.Add(p); err == nil {
p = p2
}
func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn {
return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns()))
}
@ -125,13 +104,7 @@ func (s *Swarm) Connections() []*Conn {
}
// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p peer.Peer) error {
// make sure we use OUR peers. (the tests mess with you...)
p, err := s.peers.Add(p)
if err != nil {
return err
}
func (s *Swarm) CloseConnection(p peer.ID) error {
conns := s.swarm.ConnsWithGroup(p) // boom.
for _, c := range conns {
c.Close()
@ -140,11 +113,11 @@ func (s *Swarm) CloseConnection(p peer.Peer) error {
}
// Peers returns a copy of the set of peers swarm is connected to.
func (s *Swarm) Peers() []peer.Peer {
func (s *Swarm) Peers() []peer.ID {
conns := s.Connections()
seen := make(map[peer.Peer]struct{})
peers := make([]peer.Peer, 0, len(conns))
seen := make(map[peer.ID]struct{})
peers := make([]peer.ID, 0, len(conns))
for _, c := range conns {
p := c.RemotePeer()
if _, found := seen[p]; found {
@ -157,6 +130,6 @@ func (s *Swarm) Peers() []peer.Peer {
}
// LocalPeer returns the local peer swarm is associated to.
func (s *Swarm) LocalPeer() peer.Peer {
func (s *Swarm) LocalPeer() peer.ID {
return s.local
}

View File

@ -41,7 +41,7 @@ func (c *Conn) LocalMultiaddr() ma.Multiaddr {
}
// LocalPeer is the Peer on our side of the connection
func (c *Conn) LocalPeer() peer.Peer {
func (c *Conn) LocalPeer() peer.ID {
return c.RawConn().LocalPeer()
}
@ -51,7 +51,7 @@ func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
}
// RemotePeer is the Peer on the remote side
func (c *Conn) RemotePeer() peer.Peer {
func (c *Conn) RemotePeer() peer.ID {
return c.RawConn().RemotePeer()
}
@ -96,13 +96,6 @@ func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error
return nil, err
}
// removing this for now, as it has to change. we can put this in a different
// sub-protocol anyway.
// // run Handshake3
// if err := runHandshake3(ctx, s, sc); err != nil {
// return nil, err
// }
// ok great! we can use it. add it to our group.
// set the RemotePeer as a group on the conn. this lets us group
@ -113,29 +106,3 @@ func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error
return sc, nil
}
// func runHandshake3(ctx context.Context, s *Swarm, c *Conn) error {
// log.Event(ctx, "newConnection", c.LocalPeer(), c.RemotePeer())
// stream, err := c.NewStream()
// if err != nil {
// return err
// }
// // handshake3 (this whole thing is ugly. maybe let's get rid of it...)
// h3result, err := conn.Handshake3(ctx, stream, c.RawConn())
// if err != nil {
// return fmt.Errorf("Handshake3 failed: %s", err)
// }
// // check for nats. you know, just in case.
// if h3result.LocalObservedAddress != nil {
// checkNATWarning(s, h3result.LocalObservedAddress, c.LocalMultiaddr())
// } else {
// log.Warningf("Received nil observed address from %s", c.RemotePeer())
// }
// stream.Close()
// log.Event(ctx, "handshake3Succeeded", c.LocalPeer(), c.RemotePeer())
// return nil
// }

View File

@ -17,9 +17,9 @@ import (
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achieve connection.
func (s *Swarm) Dial(ctx context.Context, p peer.Peer) (*Conn, error) {
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
if p.ID().Equal(s.local.ID()) {
if p == s.local {
return nil, errors.New("Attempted connection to self!")
}
@ -31,28 +31,35 @@ func (s *Swarm) Dial(ctx context.Context, p peer.Peer) (*Conn, error) {
}
}
// check if we don't have the peer in Peerstore
p, err := s.peers.Add(p)
if err != nil {
return nil, err
sk := s.peers.PrivKey(s.local)
if sk == nil {
// may be fine for sk to be nil, just log a warning.
log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
}
remoteAddrs := s.peers.Addresses(p)
if len(remoteAddrs) == 0 {
return nil, errors.New("peer has no addresses")
}
localAddrs := s.peers.Addresses(s.local)
if len(localAddrs) == 0 {
log.Debug("Dialing out with no local addresses.")
}
// open connection to peer
d := &conn.Dialer{
LocalPeer: s.local,
Peerstore: s.peers,
}
if len(p.Addresses()) == 0 {
return nil, errors.New("peer has no addresses")
LocalPeer: s.local,
LocalAddrs: localAddrs,
PrivateKey: sk,
}
// try to connect to one of the peer's known addresses.
// for simplicity, we do this sequentially.
// A future commit will do this asynchronously.
var connC conn.Conn
for _, addr := range p.Addresses() {
connC, err = d.DialAddr(ctx, addr, p)
var err error
for _, addr := range remoteAddrs {
connC, err = d.Dial(ctx, addr, p)
if err == nil {
break
}

View File

@ -6,8 +6,8 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
multierr "github.com/jbenet/go-ipfs/util/multierr"
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
multierr "github.com/jbenet/go-ipfs/util/multierr"
)
// Open listeners for each network the swarm should listen on
@ -35,21 +35,26 @@ func (s *Swarm) listen(addrs []ma.Multiaddr) error {
// Listen for new connections on the given multiaddr
func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
// TODO rethink how this has to work. (jbenet)
//
// resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
// if err != nil {
// return err
// }
// for _, a := range resolved {
// s.peers.AddAddress(s.local, a)
// }
sk := s.peers.PrivKey(s.local)
if sk == nil {
// may be fine for sk to be nil, just log a warning.
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
if err != nil {
return err
}
list, err := conn.Listen(s.cg.Context(), maddr, s.local, s.peers)
if err != nil {
return err
}
// add resolved local addresses to peer
for _, addr := range resolved {
s.local.AddAddress(addr)
}
// AddListener to the peerstream Listener. this will begin accepting connections
// and streams!
_, err = s.swarm.AddListener(list)

View File

@ -7,9 +7,7 @@ import (
"testing"
"time"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
errors "github.com/jbenet/go-ipfs/util/debugerror"
testutil "github.com/jbenet/go-ipfs/util/testutil"
@ -48,79 +46,56 @@ func EchoStreamHandler(stream *Stream) {
}()
}
func setupPeer(t *testing.T, addr string) peer.Peer {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
}
func makeSwarms(ctx context.Context, t *testing.T, num int) ([]*Swarm, []testutil.PeerNetParams) {
swarms := make([]*Swarm, 0, num)
peersnp := make([]testutil.PeerNetParams, 0, num)
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
t.Fatal(err)
}
for i := 0; i < num; i++ {
localnp := testutil.RandPeerNetParams(t)
peersnp = append(peersnp, localnp)
p, err := testutil.NewPeerWithKeyPair(sk, pk)
if err != nil {
t.Fatal(err)
}
p.AddAddress(tcp)
return p
}
func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []peer.Peer) {
swarms := []*Swarm{}
for _, addr := range addrs {
local := setupPeer(t, addr)
peerstore := peer.NewPeerstore()
swarm, err := NewSwarm(ctx, local.Addresses(), local, peerstore)
peerstore.AddAddress(localnp.ID, localnp.Addr)
peerstore.AddPubKey(localnp.ID, localnp.PubKey)
peerstore.AddPrivKey(localnp.ID, localnp.PrivKey)
addrs := peerstore.Addresses(localnp.ID)
swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore)
if err != nil {
t.Fatal(err)
}
swarm.SetStreamHandler(EchoStreamHandler)
swarms = append(swarms, swarm)
}
peers := make([]peer.Peer, len(swarms))
for i, s := range swarms {
peers[i] = s.local
}
return swarms, peers
return swarms, peersnp
}
func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms, peers := makeSwarms(ctx, t, addrs)
swarms, peersnp := makeSwarms(ctx, t, SwarmNum)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.Peer) {
// copy for other peer
cp, err := s.peers.FindOrCreate(dst.ID())
if err != nil {
t.Fatal(err)
}
cp.AddAddress(dst.Addresses()[0])
log.Infof("SWARM TEST: %s dialing %s", s.local, dst)
if _, err := s.Dial(ctx, cp); err != nil {
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.peers.AddAddress(dst, addr)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
log.Infof("SWARM TEST: %s connected to %s", s.local, dst)
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
for _, s := range swarms {
for _, p := range peers {
if p != s.local { // don't connect to self.
for _, p := range peersnp {
if p.ID != s.local { // don't connect to self.
wg.Add(1)
connect(s, p)
connect(s, p.ID, p.Addr)
}
}
}
@ -138,13 +113,8 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
log.Debugf("-------------------------------------------------------")
_, cancel := context.WithCancel(ctx)
peers, err := s1.peers.All()
if err != nil {
t.Fatal(err)
}
got := map[u.Key]int{}
errChan := make(chan error, MsgNum*len(*peers))
got := map[peer.ID]int{}
errChan := make(chan error, MsgNum*len(peersnp))
streamChan := make(chan *Stream, MsgNum)
// send out "ping" x MsgNum to every peer
@ -152,7 +122,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
defer close(streamChan)
var wg sync.WaitGroup
send := func(p peer.Peer) {
send := func(p peer.ID) {
defer wg.Done()
// first, one stream per peer (nice)
@ -173,13 +143,13 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
streamChan <- stream
}
for _, p := range *peers {
if p == s1.local {
for _, p := range peersnp {
if p.ID == s1.local {
continue // don't send to self...
}
wg.Add(1)
go send(p)
go send(p.ID)
}
wg.Wait()
}()
@ -188,7 +158,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
go func() {
defer close(errChan)
count := 0
countShouldBe := MsgNum * (len(*peers) - 1)
countShouldBe := MsgNum * (len(peersnp) - 1)
for stream := range streamChan { // one per peer
defer stream.Close()
@ -215,7 +185,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
msgCount++
}
got[p.Key()] = msgCount
got[p] = msgCount
count += msgCount
}
@ -232,8 +202,8 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
}
log.Debugf("%s got pongs", s1.local)
if (len(*peers) - 1) != len(got) {
t.Error("got less messages than sent")
if (len(peersnp) - 1) != len(got) {
t.Errorf("got (%d) less messages than sent (%d).", len(got), len(peersnp))
}
for p, n := range got {
@ -254,15 +224,8 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
func TestSwarm(t *testing.T) {
// t.Skip("skipping for another test")
addrs := []string{
"/ip4/127.0.0.1/tcp/10234",
"/ip4/127.0.0.1/tcp/10235",
"/ip4/127.0.0.1/tcp/10236",
"/ip4/127.0.0.1/tcp/10237",
"/ip4/127.0.0.1/tcp/10238",
}
// msgs := 1000
msgs := 100
SubtestSwarm(t, addrs, msgs)
swarms := 5
SubtestSwarm(t, swarms, msgs)
}