From b8a6fbbf7d46c8ef9b26dafea712ac0cc0462541 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 12 Aug 2014 15:37:26 -0700 Subject: [PATCH] modify use of swarm to not make duplicate connections --- identify/identify.go | 2 +- routing/dht/DHTMessage.go | 2 ++ routing/dht/dht.go | 4 +-- routing/dht/routing.go | 26 ++++++++++---- swarm/conn.go | 3 +- swarm/interface.go | 3 +- swarm/swarm.go | 73 +++++++++++++++++++++++++++------------ swarm/swarm_test.go | 8 +++-- 8 files changed, 84 insertions(+), 37 deletions(-) diff --git a/identify/identify.go b/identify/identify.go index b6c67f2c5..20ad21c9a 100644 --- a/identify/identify.go +++ b/identify/identify.go @@ -14,7 +14,7 @@ func Handshake(self, remote *peer.Peer, in, out chan []byte) error { out <- self.ID resp := <-in remote.ID = peer.ID(resp) - u.DOut("identify: Got node id: %s", remote.ID.Pretty()) + u.DOut("[%s] identify: Got node id: %s", self.ID.Pretty(), remote.ID.Pretty()) return nil } diff --git a/routing/dht/DHTMessage.go b/routing/dht/DHTMessage.go index 701f36687..e2034d7e0 100644 --- a/routing/dht/DHTMessage.go +++ b/routing/dht/DHTMessage.go @@ -28,6 +28,8 @@ func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer { return pbp } +// TODO: building the protobuf message this way is a little wasteful +// Unused fields wont be omitted, find a better way to do this func (m *DHTMessage) ToProtobuf() *PBDHTMessage { pmes := new(PBDHTMessage) if m.Value != nil { diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 018c22a01..c9b32f90b 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -93,7 +93,7 @@ func (dht *IpfsDHT) Start() { func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { maddrstr, _ := addr.String() u.DOut("Connect to new peer: %s", maddrstr) - npeer, err := dht.network.Connect(addr) + npeer, err := dht.network.ConnectNew(addr) if err != nil { return nil, err } @@ -527,7 +527,7 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration, continue } - p, err = dht.network.Connect(maddr) + p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr) if err != nil { u.PErr("getValue error: %s", err) continue diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 71e950102..045b17f41 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -3,6 +3,7 @@ package dht import ( "bytes" "encoding/json" + "errors" "math/rand" "time" @@ -89,10 +90,10 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { panic("not yet implemented") } - p, err = s.network.Connect(maddr) + p, err = s.network.GetConnection(peer.ID(closers[0].GetId()), maddr) if err != nil { - // Move up route level - panic("not yet implemented.") + u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closers[0].GetAddr()) + route_level++ } } else { route_level++ @@ -160,12 +161,13 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, for _, prov := range pmes_out.GetPeers() { p := s.network.Find(u.Key(prov.GetId())) if p == nil { + u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty()) maddr, err := ma.NewMultiaddr(prov.GetAddr()) if err != nil { u.PErr("error connecting to new peer: %s", err) continue } - p, err = s.network.Connect(maddr) + p, err = s.network.GetConnection(peer.ID(prov.GetId()), maddr) if err != nil { u.PErr("error connecting to new peer: %s", err) continue @@ -183,11 +185,20 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, // FindPeer searches for a peer with given ID. func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { + // Check if were already connected to them + p, _ := s.Find(id) + if p != nil { + return p, nil + } + route_level := 0 - p := s.routes[route_level].NearestPeer(kb.ConvertPeerID(id)) + p = s.routes[route_level].NearestPeer(kb.ConvertPeerID(id)) if p == nil { return nil, kb.ErrLookupFailure } + if p.ID.Equal(id) { + return p, nil + } for route_level < len(s.routes) { pmes, err := s.findPeerSingle(p, id, timeout, route_level) @@ -202,11 +213,14 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error return nil, u.WrapError(err, "FindPeer received bad info") } - nxtPeer, err := s.network.Connect(addr) + nxtPeer, err := s.network.GetConnection(peer.ID(found.GetId()), addr) if err != nil { return nil, u.WrapError(err, "FindPeer failed to connect to new peer.") } if pmes.GetSuccess() { + if !id.Equal(nxtPeer.ID) { + return nil, errors.New("got back invalid peer from 'successful' response") + } return nxtPeer, nil } else { p = nxtPeer diff --git a/swarm/conn.go b/swarm/conn.go index 56e8eea17..072b53437 100644 --- a/swarm/conn.go +++ b/swarm/conn.go @@ -2,11 +2,12 @@ package swarm import ( "fmt" + "net" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" msgio "github.com/jbenet/go-msgio" ma "github.com/jbenet/go-multiaddr" - "net" ) // ChanBuffer is the size of the buffer in the Conn Chan diff --git a/swarm/interface.go b/swarm/interface.go index 88248837c..9a70890e6 100644 --- a/swarm/interface.go +++ b/swarm/interface.go @@ -12,7 +12,8 @@ type Network interface { Error(error) Find(u.Key) *peer.Peer Listen() error - Connect(*ma.Multiaddr) (*peer.Peer, error) + ConnectNew(*ma.Multiaddr) (*peer.Peer, error) + GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) GetChan() *Chan Close() Drop(*peer.Peer) error diff --git a/swarm/swarm.go b/swarm/swarm.go index 5a75ea723..12e82a6ec 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -203,7 +203,7 @@ func (s *Swarm) Close() { // etc. to achive connection. // // For now, Dial uses only TCP. This will be extended. -func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { +func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) { k := peer.Key() // check if we already have an open connection first @@ -211,17 +211,16 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) { conn, found := s.conns[k] s.connsLock.RUnlock() if found { - return conn, nil + return conn, nil, true } // open connection to peer conn, err := Dial("tcp", peer) if err != nil { - return nil, err + return nil, err, false } - s.StartConn(conn) - return conn, nil + return conn, nil, false } func (s *Swarm) StartConn(conn *Conn) error { @@ -309,7 +308,50 @@ func (s *Swarm) Find(key u.Key) *peer.Peer { return conn.Peer } -func (s *Swarm) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { +// GetConnection will check if we are already connected to the peer in question +// and only open a new connection if we arent already +func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) { + p := &peer.Peer{ + ID: id, + Addresses: []*ma.Multiaddr{addr}, + } + + conn, err, reused := s.Dial(p) + if err != nil { + return nil, err + } + + if reused { + return p, nil + } + + err = s.handleDialedCon(conn) + return conn.Peer, err +} + +func (s *Swarm) handleDialedCon(conn *Conn) error { + err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) + if err != nil { + return err + } + + // Send node an address that you can be reached on + myaddr := s.local.NetAddress("tcp") + mastr, err := myaddr.String() + if err != nil { + errors.New("No local address to send to peer.") + } + + conn.Outgoing.MsgChan <- []byte(mastr) + + s.StartConn(conn) + + return nil +} + +// ConnectNew is for connecting to a peer when you dont know their ID, +// Should only be used when you are sure that you arent already connected to peer in question +func (s *Swarm) ConnectNew(addr *ma.Multiaddr) (*peer.Peer, error) { if addr == nil { return nil, errors.New("nil Multiaddr passed to swarm.Connect()") } @@ -321,23 +363,8 @@ func (s *Swarm) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { return nil, err } - err = ident.Handshake(s.local, npeer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan) - if err != nil { - return nil, err - } - - // Send node an address that you can be reached on - myaddr := s.local.NetAddress("tcp") - mastr, err := myaddr.String() - if err != nil { - return nil, errors.New("No local address to send to peer.") - } - - conn.Outgoing.MsgChan <- []byte(mastr) - - s.StartConn(conn) - - return npeer, nil + err = s.handleDialedCon(conn) + return npeer, err } // Removes a given peer from the swarm and closes connections to it diff --git a/swarm/swarm_test.go b/swarm/swarm_test.go index b23919ecc..609288c38 100644 --- a/swarm/swarm_test.go +++ b/swarm/swarm_test.go @@ -2,11 +2,12 @@ package swarm import ( "fmt" + "net" + "testing" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" msgio "github.com/jbenet/go-msgio" - "net" - "testing" ) func pingListen(listener *net.TCPListener, peer *peer.Peer) { @@ -71,11 +72,12 @@ func TestSwarm(t *testing.T) { } go pingListen(listener.(*net.TCPListener), peer) - _, err = swarm.Dial(peer) + conn, err, _ := swarm.Dial(peer) if err != nil { t.Fatal("error swarm dialing to peer", err) } + swarm.StartConn(conn) // ok done, add it. peers = append(peers, peer) listeners = append(listeners, listener)