From e535b0e5203dce8470d3fdfbfe1ef81f48815bde Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 19 Dec 2014 21:05:18 -0800 Subject: [PATCH] swarm now uses peer.ID --- net/swarm/simul_test.go | 46 ++++++---------- net/swarm/swarm.go | 47 ++++------------- net/swarm/swarm_conn.go | 37 +------------ net/swarm/swarm_dial.go | 35 ++++++++----- net/swarm/swarm_listen.go | 29 ++++++----- net/swarm/swarm_test.go | 107 +++++++++++++------------------------- 6 files changed, 101 insertions(+), 200 deletions(-) diff --git a/net/swarm/simul_test.go b/net/swarm/simul_test.go index 661805ec7..4e2c3feaa 100644 --- a/net/swarm/simul_test.go +++ b/net/swarm/simul_test.go @@ -1,36 +1,29 @@ package swarm import ( - "fmt" "sync" "testing" + "time" peer "github.com/jbenet/go-ipfs/peer" - "github.com/jbenet/go-ipfs/util/testutil" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) func TestSimultOpen(t *testing.T) { // t.Skip("skipping for another test") - addrs := []string{ - "/ip4/127.0.0.1/tcp/1244", - "/ip4/127.0.0.1/tcp/1245", - } - ctx := context.Background() - swarms, _ := makeSwarms(ctx, t, addrs) + swarms, peers := makeSwarms(ctx, t, 2) // connect everyone { var wg sync.WaitGroup - connect := func(s *Swarm, dst peer.ID) { + connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // copy for other peer - cp := testutil.NewPeerWithID(dst.ID()) - cp.AddAddress(dst.Addresses()[0]) - - if _, err := s.Dial(ctx, cp); err != nil { + s.peers.AddAddress(dst, addr) + if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } wg.Done() @@ -38,8 +31,8 @@ func TestSimultOpen(t *testing.T) { log.Info("Connecting swarms simultaneously.") wg.Add(2) - go connect(swarms[0], swarms[1].local) - go connect(swarms[1], swarms[0].local) + go connect(swarms[0], swarms[1].local, peers[1].Addr) + go connect(swarms[1], swarms[0].local, peers[0].Addr) wg.Wait() } @@ -51,13 +44,7 @@ func TestSimultOpen(t *testing.T) { func TestSimultOpenMany(t *testing.T) { // t.Skip("very very slow") - many := 10 - addrs := []string{} - for i := 2200; i < (2200 + many); i++ { - s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i) - addrs = append(addrs, s) - } - + addrs := 20 SubtestSwarm(t, addrs, 10) } @@ -67,14 +54,13 @@ func TestSimultOpenFewStress(t *testing.T) { } // t.Skip("skipping for another test") - num := 10 - // num := 100 - for i := 0; i < num; i++ { - addrs := []string{ - fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 1900+i), - fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2900+i), - } + msgs := 40 + swarms := 2 + rounds := 10 + // rounds := 100 - SubtestSwarm(t, addrs, 10) + for i := 0; i < rounds; i++ { + SubtestSwarm(t, swarms, msgs) + <-time.After(10 * time.Millisecond) } } diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 2109790a9..baffa5423 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -22,7 +22,7 @@ var log = eventlog.Logger("swarm2") // Uses peerstream.Swarm type Swarm struct { swarm *ps.Swarm - local peer.Peer + local peer.ID peers peer.Peerstore cg ctxgroup.ContextGroup @@ -30,13 +30,7 @@ type Swarm struct { // NewSwarm constructs a Swarm, with a Chan. func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, - local peer.Peer, peers peer.Peerstore) (*Swarm, error) { - - // make sure our own peer is in our peerstore... - local, err := peers.Add(local) - if err != nil { - return nil, err - } + local peer.ID, peers peer.Peerstore) (*Swarm, error) { s := &Swarm{ swarm: ps.NewSwarm(), @@ -80,13 +74,7 @@ func (s *Swarm) SetStreamHandler(handler StreamHandler) { } // NewStreamWithPeer creates a new stream on any available connection to p -func (s *Swarm) NewStreamWithPeer(p peer.Peer) (*Stream, error) { - // make sure we use OUR peers. (the tests mess with you...) - p, err := s.peers.Add(p) - if err != nil { - return nil, err - } - +func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) { // if we have no connections, try connecting. if len(s.ConnectionsToPeer(p)) == 0 { log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...") @@ -101,21 +89,12 @@ func (s *Swarm) NewStreamWithPeer(p peer.Peer) (*Stream, error) { } // StreamsWithPeer returns all the live Streams to p -func (s *Swarm) StreamsWithPeer(p peer.Peer) []*Stream { - // make sure we use OUR peers. (the tests mess with you...) - if p2, err := s.peers.Add(p); err == nil { - p = p2 - } - +func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream { return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams())) } // ConnectionsToPeer returns all the live connections to p -func (s *Swarm) ConnectionsToPeer(p peer.Peer) []*Conn { - // make sure we use OUR peers. (the tests mess with you...) - if p2, err := s.peers.Add(p); err == nil { - p = p2 - } +func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn { return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns())) } @@ -125,13 +104,7 @@ func (s *Swarm) Connections() []*Conn { } // CloseConnection removes a given peer from swarm + closes the connection -func (s *Swarm) CloseConnection(p peer.Peer) error { - // make sure we use OUR peers. (the tests mess with you...) - p, err := s.peers.Add(p) - if err != nil { - return err - } - +func (s *Swarm) CloseConnection(p peer.ID) error { conns := s.swarm.ConnsWithGroup(p) // boom. for _, c := range conns { c.Close() @@ -140,11 +113,11 @@ func (s *Swarm) CloseConnection(p peer.Peer) error { } // Peers returns a copy of the set of peers swarm is connected to. -func (s *Swarm) Peers() []peer.Peer { +func (s *Swarm) Peers() []peer.ID { conns := s.Connections() - seen := make(map[peer.Peer]struct{}) - peers := make([]peer.Peer, 0, len(conns)) + seen := make(map[peer.ID]struct{}) + peers := make([]peer.ID, 0, len(conns)) for _, c := range conns { p := c.RemotePeer() if _, found := seen[p]; found { @@ -157,6 +130,6 @@ func (s *Swarm) Peers() []peer.Peer { } // LocalPeer returns the local peer swarm is associated to. -func (s *Swarm) LocalPeer() peer.Peer { +func (s *Swarm) LocalPeer() peer.ID { return s.local } diff --git a/net/swarm/swarm_conn.go b/net/swarm/swarm_conn.go index 9fdd06b4a..6cc93312e 100644 --- a/net/swarm/swarm_conn.go +++ b/net/swarm/swarm_conn.go @@ -41,7 +41,7 @@ func (c *Conn) LocalMultiaddr() ma.Multiaddr { } // LocalPeer is the Peer on our side of the connection -func (c *Conn) LocalPeer() peer.Peer { +func (c *Conn) LocalPeer() peer.ID { return c.RawConn().LocalPeer() } @@ -51,7 +51,7 @@ func (c *Conn) RemoteMultiaddr() ma.Multiaddr { } // RemotePeer is the Peer on the remote side -func (c *Conn) RemotePeer() peer.Peer { +func (c *Conn) RemotePeer() peer.ID { return c.RawConn().RemotePeer() } @@ -96,13 +96,6 @@ func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error return nil, err } - // removing this for now, as it has to change. we can put this in a different - // sub-protocol anyway. - // // run Handshake3 - // if err := runHandshake3(ctx, s, sc); err != nil { - // return nil, err - // } - // ok great! we can use it. add it to our group. // set the RemotePeer as a group on the conn. this lets us group @@ -113,29 +106,3 @@ func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error return sc, nil } - -// func runHandshake3(ctx context.Context, s *Swarm, c *Conn) error { -// log.Event(ctx, "newConnection", c.LocalPeer(), c.RemotePeer()) - -// stream, err := c.NewStream() -// if err != nil { -// return err -// } - -// // handshake3 (this whole thing is ugly. maybe lets get rid of it...) -// h3result, err := conn.Handshake3(ctx, stream, c.RawConn()) -// if err != nil { -// return fmt.Errorf("Handshake3 failed: %s", err) -// } - -// // check for nats. you know, just in case. -// if h3result.LocalObservedAddress != nil { -// checkNATWarning(s, h3result.LocalObservedAddress, c.LocalMultiaddr()) -// } else { -// log.Warningf("Received nil observed address from %s", c.RemotePeer()) -// } - -// stream.Close() -// log.Event(ctx, "handshake3Succeeded", c.LocalPeer(), c.RemotePeer()) -// return nil -// } diff --git a/net/swarm/swarm_dial.go b/net/swarm/swarm_dial.go index e71d63b57..03e596e9e 100644 --- a/net/swarm/swarm_dial.go +++ b/net/swarm/swarm_dial.go @@ -17,9 +17,9 @@ import ( // the connection will happen over. Swarm can use whichever it choses. // This allows us to use various transport protocols, do NAT traversal/relay, // etc. to achive connection. -func (s *Swarm) Dial(ctx context.Context, p peer.Peer) (*Conn, error) { +func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) { - if p.ID().Equal(s.local.ID()) { + if p == s.local { return nil, errors.New("Attempted connection to self!") } @@ -31,28 +31,35 @@ func (s *Swarm) Dial(ctx context.Context, p peer.Peer) (*Conn, error) { } } - // check if we don't have the peer in Peerstore - p, err := s.peers.Add(p) - if err != nil { - return nil, err + sk := s.peers.PrivKey(s.local) + if sk == nil { + // may be fine for sk to be nil, just log a warning. + log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.") + } + + remoteAddrs := s.peers.Addresses(p) + if len(remoteAddrs) == 0 { + return nil, errors.New("peer has no addresses") + } + localAddrs := s.peers.Addresses(s.local) + if len(localAddrs) == 0 { + log.Debug("Dialing out with no local addresses.") } // open connection to peer d := &conn.Dialer{ - LocalPeer: s.local, - Peerstore: s.peers, - } - - if len(p.Addresses()) == 0 { - return nil, errors.New("peer has no addresses") + LocalPeer: s.local, + LocalAddrs: localAddrs, + PrivateKey: sk, } // try to connect to one of the peer's known addresses. // for simplicity, we do this sequentially. // A future commit will do this asynchronously. var connC conn.Conn - for _, addr := range p.Addresses() { - connC, err = d.DialAddr(ctx, addr, p) + var err error + for _, addr := range remoteAddrs { + connC, err = d.Dial(ctx, addr, p) if err == nil { break } diff --git a/net/swarm/swarm_listen.go b/net/swarm/swarm_listen.go index 94fd17aa8..c984a9276 100644 --- a/net/swarm/swarm_listen.go +++ b/net/swarm/swarm_listen.go @@ -6,8 +6,8 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - multierr "github.com/jbenet/go-ipfs/util/multierr" ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" + multierr "github.com/jbenet/go-ipfs/util/multierr" ) // Open listeners for each network the swarm should listen on @@ -35,21 +35,26 @@ func (s *Swarm) listen(addrs []ma.Multiaddr) error { // Listen for new connections on the given multiaddr func (s *Swarm) setupListener(maddr ma.Multiaddr) error { - resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr}) + // TODO rethink how this has to work. (jbenet) + // + // resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr}) + // if err != nil { + // return err + // } + // for _, a := range resolved { + // s.peers.AddAddress(s.local, a) + // } + + sk := s.peers.PrivKey(s.local) + if sk == nil { + // may be fine for sk to be nil, just log a warning. + log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") + } + list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk) if err != nil { return err } - list, err := conn.Listen(s.cg.Context(), maddr, s.local, s.peers) - if err != nil { - return err - } - - // add resolved local addresses to peer - for _, addr := range resolved { - s.local.AddAddress(addr) - } - // AddListener to the peerstream Listener. this will begin accepting connections // and streams! _, err = s.swarm.AddListener(list) diff --git a/net/swarm/swarm_test.go b/net/swarm/swarm_test.go index a8e3aa4c4..f13b80c23 100644 --- a/net/swarm/swarm_test.go +++ b/net/swarm/swarm_test.go @@ -7,9 +7,7 @@ import ( "testing" "time" - ci "github.com/jbenet/go-ipfs/crypto" peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" errors "github.com/jbenet/go-ipfs/util/debugerror" testutil "github.com/jbenet/go-ipfs/util/testutil" @@ -48,79 +46,56 @@ func EchoStreamHandler(stream *Stream) { }() } -func setupPeer(t *testing.T, addr string) peer.Peer { - tcp, err := ma.NewMultiaddr(addr) - if err != nil { - t.Fatal(err) - } +func makeSwarms(ctx context.Context, t *testing.T, num int) ([]*Swarm, []testutil.PeerNetParams) { + swarms := make([]*Swarm, 0, num) + peersnp := make([]testutil.PeerNetParams, 0, num) - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) - if err != nil { - t.Fatal(err) - } + for i := 0; i < num; i++ { + localnp := testutil.RandPeerNetParams(t) + peersnp = append(peersnp, localnp) - p, err := testutil.NewPeerWithKeyPair(sk, pk) - if err != nil { - t.Fatal(err) - } - p.AddAddress(tcp) - return p -} - -func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []peer.Peer) { - swarms := []*Swarm{} - - for _, addr := range addrs { - local := setupPeer(t, addr) peerstore := peer.NewPeerstore() - swarm, err := NewSwarm(ctx, local.Addresses(), local, peerstore) + peerstore.AddAddress(localnp.ID, localnp.Addr) + peerstore.AddPubKey(localnp.ID, localnp.PubKey) + peerstore.AddPrivKey(localnp.ID, localnp.PrivKey) + + addrs := peerstore.Addresses(localnp.ID) + swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore) if err != nil { t.Fatal(err) } + swarm.SetStreamHandler(EchoStreamHandler) swarms = append(swarms, swarm) } - peers := make([]peer.Peer, len(swarms)) - for i, s := range swarms { - peers[i] = s.local - } - - return swarms, peers + return swarms, peersnp } -func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { +func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { // t.Skip("skipping for another test") ctx := context.Background() - swarms, peers := makeSwarms(ctx, t, addrs) + swarms, peersnp := makeSwarms(ctx, t, SwarmNum) // connect everyone { var wg sync.WaitGroup - connect := func(s *Swarm, dst peer.Peer) { - // copy for other peer - - cp, err := s.peers.FindOrCreate(dst.ID()) - if err != nil { - t.Fatal(err) - } - cp.AddAddress(dst.Addresses()[0]) - - log.Infof("SWARM TEST: %s dialing %s", s.local, dst) - if _, err := s.Dial(ctx, cp); err != nil { + connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { + // TODO: make a DialAddr func. + s.peers.AddAddress(dst, addr) + if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } - log.Infof("SWARM TEST: %s connected to %s", s.local, dst) wg.Done() } log.Info("Connecting swarms simultaneously.") for _, s := range swarms { - for _, p := range peers { - if p != s.local { // don't connect to self. + for _, p := range peersnp { + if p.ID != s.local { // don't connect to self. wg.Add(1) - connect(s, p) + connect(s, p.ID, p.Addr) } } } @@ -138,13 +113,8 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { log.Debugf("-------------------------------------------------------") _, cancel := context.WithCancel(ctx) - peers, err := s1.peers.All() - if err != nil { - t.Fatal(err) - } - - got := map[u.Key]int{} - errChan := make(chan error, MsgNum*len(*peers)) + got := map[peer.ID]int{} + errChan := make(chan error, MsgNum*len(peersnp)) streamChan := make(chan *Stream, MsgNum) // send out "ping" x MsgNum to every peer @@ -152,7 +122,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { defer close(streamChan) var wg sync.WaitGroup - send := func(p peer.Peer) { + send := func(p peer.ID) { defer wg.Done() // first, one stream per peer (nice) @@ -173,13 +143,13 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { streamChan <- stream } - for _, p := range *peers { - if p == s1.local { + for _, p := range peersnp { + if p.ID == s1.local { continue // dont send to self... } wg.Add(1) - go send(p) + go send(p.ID) } wg.Wait() }() @@ -188,7 +158,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { go func() { defer close(errChan) count := 0 - countShouldBe := MsgNum * (len(*peers) - 1) + countShouldBe := MsgNum * (len(peersnp) - 1) for stream := range streamChan { // one per peer defer stream.Close() @@ -215,7 +185,7 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { msgCount++ } - got[p.Key()] = msgCount + got[p] = msgCount count += msgCount } @@ -232,8 +202,8 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { } log.Debugf("%s got pongs", s1.local) - if (len(*peers) - 1) != len(got) { - t.Error("got less messages than sent") + if (len(peersnp) - 1) != len(got) { + t.Errorf("got (%d) less messages than sent (%d).", len(got), len(peersnp)) } for p, n := range got { @@ -254,15 +224,8 @@ func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) { func TestSwarm(t *testing.T) { // t.Skip("skipping for another test") - addrs := []string{ - "/ip4/127.0.0.1/tcp/10234", - "/ip4/127.0.0.1/tcp/10235", - "/ip4/127.0.0.1/tcp/10236", - "/ip4/127.0.0.1/tcp/10237", - "/ip4/127.0.0.1/tcp/10238", - } - // msgs := 1000 msgs := 100 - SubtestSwarm(t, addrs, msgs) + swarms := 5 + SubtestSwarm(t, swarms, msgs) }