mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-22 02:47:48 +08:00
modify use of swarm to not make duplicate connections
This commit is contained in:
parent
f09dba772c
commit
b8a6fbbf7d
@ -14,7 +14,7 @@ func Handshake(self, remote *peer.Peer, in, out chan []byte) error {
|
||||
out <- self.ID
|
||||
resp := <-in
|
||||
remote.ID = peer.ID(resp)
|
||||
u.DOut("identify: Got node id: %s", remote.ID.Pretty())
|
||||
u.DOut("[%s] identify: Got node id: %s", self.ID.Pretty(), remote.ID.Pretty())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -28,6 +28,8 @@ func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer {
|
||||
return pbp
|
||||
}
|
||||
|
||||
// TODO: building the protobuf message this way is a little wasteful
|
||||
// Unused fields wont be omitted, find a better way to do this
|
||||
func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
|
||||
pmes := new(PBDHTMessage)
|
||||
if m.Value != nil {
|
||||
|
||||
@ -93,7 +93,7 @@ func (dht *IpfsDHT) Start() {
|
||||
func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
maddrstr, _ := addr.String()
|
||||
u.DOut("Connect to new peer: %s", maddrstr)
|
||||
npeer, err := dht.network.Connect(addr)
|
||||
npeer, err := dht.network.ConnectNew(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -527,7 +527,7 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
|
||||
continue
|
||||
}
|
||||
|
||||
p, err = dht.network.Connect(maddr)
|
||||
p, err = dht.network.GetConnection(peer.ID(pinfo.GetId()), maddr)
|
||||
if err != nil {
|
||||
u.PErr("getValue error: %s", err)
|
||||
continue
|
||||
|
||||
@ -3,6 +3,7 @@ package dht
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
@ -89,10 +90,10 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
|
||||
panic("not yet implemented")
|
||||
}
|
||||
|
||||
p, err = s.network.Connect(maddr)
|
||||
p, err = s.network.GetConnection(peer.ID(closers[0].GetId()), maddr)
|
||||
if err != nil {
|
||||
// Move up route level
|
||||
panic("not yet implemented.")
|
||||
u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closers[0].GetAddr())
|
||||
route_level++
|
||||
}
|
||||
} else {
|
||||
route_level++
|
||||
@ -160,12 +161,13 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
|
||||
for _, prov := range pmes_out.GetPeers() {
|
||||
p := s.network.Find(u.Key(prov.GetId()))
|
||||
if p == nil {
|
||||
u.DOut("given provider %s was not in our network already.", peer.ID(prov.GetId()).Pretty())
|
||||
maddr, err := ma.NewMultiaddr(prov.GetAddr())
|
||||
if err != nil {
|
||||
u.PErr("error connecting to new peer: %s", err)
|
||||
continue
|
||||
}
|
||||
p, err = s.network.Connect(maddr)
|
||||
p, err = s.network.GetConnection(peer.ID(prov.GetId()), maddr)
|
||||
if err != nil {
|
||||
u.PErr("error connecting to new peer: %s", err)
|
||||
continue
|
||||
@ -183,11 +185,20 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
|
||||
|
||||
// FindPeer searches for a peer with given ID.
|
||||
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
|
||||
// Check if were already connected to them
|
||||
p, _ := s.Find(id)
|
||||
if p != nil {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
route_level := 0
|
||||
p := s.routes[route_level].NearestPeer(kb.ConvertPeerID(id))
|
||||
p = s.routes[route_level].NearestPeer(kb.ConvertPeerID(id))
|
||||
if p == nil {
|
||||
return nil, kb.ErrLookupFailure
|
||||
}
|
||||
if p.ID.Equal(id) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
for route_level < len(s.routes) {
|
||||
pmes, err := s.findPeerSingle(p, id, timeout, route_level)
|
||||
@ -202,11 +213,14 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
|
||||
return nil, u.WrapError(err, "FindPeer received bad info")
|
||||
}
|
||||
|
||||
nxtPeer, err := s.network.Connect(addr)
|
||||
nxtPeer, err := s.network.GetConnection(peer.ID(found.GetId()), addr)
|
||||
if err != nil {
|
||||
return nil, u.WrapError(err, "FindPeer failed to connect to new peer.")
|
||||
}
|
||||
if pmes.GetSuccess() {
|
||||
if !id.Equal(nxtPeer.ID) {
|
||||
return nil, errors.New("got back invalid peer from 'successful' response")
|
||||
}
|
||||
return nxtPeer, nil
|
||||
} else {
|
||||
p = nxtPeer
|
||||
|
||||
@ -2,11 +2,12 @@ package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
msgio "github.com/jbenet/go-msgio"
|
||||
ma "github.com/jbenet/go-multiaddr"
|
||||
"net"
|
||||
)
|
||||
|
||||
// ChanBuffer is the size of the buffer in the Conn Chan
|
||||
|
||||
@ -12,7 +12,8 @@ type Network interface {
|
||||
Error(error)
|
||||
Find(u.Key) *peer.Peer
|
||||
Listen() error
|
||||
Connect(*ma.Multiaddr) (*peer.Peer, error)
|
||||
ConnectNew(*ma.Multiaddr) (*peer.Peer, error)
|
||||
GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error)
|
||||
GetChan() *Chan
|
||||
Close()
|
||||
Drop(*peer.Peer) error
|
||||
|
||||
@ -203,7 +203,7 @@ func (s *Swarm) Close() {
|
||||
// etc. to achive connection.
|
||||
//
|
||||
// For now, Dial uses only TCP. This will be extended.
|
||||
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
|
||||
func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error, bool) {
|
||||
k := peer.Key()
|
||||
|
||||
// check if we already have an open connection first
|
||||
@ -211,17 +211,16 @@ func (s *Swarm) Dial(peer *peer.Peer) (*Conn, error) {
|
||||
conn, found := s.conns[k]
|
||||
s.connsLock.RUnlock()
|
||||
if found {
|
||||
return conn, nil
|
||||
return conn, nil, true
|
||||
}
|
||||
|
||||
// open connection to peer
|
||||
conn, err := Dial("tcp", peer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, err, false
|
||||
}
|
||||
|
||||
s.StartConn(conn)
|
||||
return conn, nil
|
||||
return conn, nil, false
|
||||
}
|
||||
|
||||
func (s *Swarm) StartConn(conn *Conn) error {
|
||||
@ -309,7 +308,50 @@ func (s *Swarm) Find(key u.Key) *peer.Peer {
|
||||
return conn.Peer
|
||||
}
|
||||
|
||||
func (s *Swarm) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
// GetConnection will check if we are already connected to the peer in question
|
||||
// and only open a new connection if we arent already
|
||||
func (s *Swarm) GetConnection(id peer.ID, addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
p := &peer.Peer{
|
||||
ID: id,
|
||||
Addresses: []*ma.Multiaddr{addr},
|
||||
}
|
||||
|
||||
conn, err, reused := s.Dial(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if reused {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
err = s.handleDialedCon(conn)
|
||||
return conn.Peer, err
|
||||
}
|
||||
|
||||
func (s *Swarm) handleDialedCon(conn *Conn) error {
|
||||
err := ident.Handshake(s.local, conn.Peer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send node an address that you can be reached on
|
||||
myaddr := s.local.NetAddress("tcp")
|
||||
mastr, err := myaddr.String()
|
||||
if err != nil {
|
||||
errors.New("No local address to send to peer.")
|
||||
}
|
||||
|
||||
conn.Outgoing.MsgChan <- []byte(mastr)
|
||||
|
||||
s.StartConn(conn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConnectNew is for connecting to a peer when you dont know their ID,
|
||||
// Should only be used when you are sure that you arent already connected to peer in question
|
||||
func (s *Swarm) ConnectNew(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
if addr == nil {
|
||||
return nil, errors.New("nil Multiaddr passed to swarm.Connect()")
|
||||
}
|
||||
@ -321,23 +363,8 @@ func (s *Swarm) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = ident.Handshake(s.local, npeer, conn.Incoming.MsgChan, conn.Outgoing.MsgChan)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Send node an address that you can be reached on
|
||||
myaddr := s.local.NetAddress("tcp")
|
||||
mastr, err := myaddr.String()
|
||||
if err != nil {
|
||||
return nil, errors.New("No local address to send to peer.")
|
||||
}
|
||||
|
||||
conn.Outgoing.MsgChan <- []byte(mastr)
|
||||
|
||||
s.StartConn(conn)
|
||||
|
||||
return npeer, nil
|
||||
err = s.handleDialedCon(conn)
|
||||
return npeer, err
|
||||
}
|
||||
|
||||
// Removes a given peer from the swarm and closes connections to it
|
||||
|
||||
@ -2,11 +2,12 @@ package swarm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
msgio "github.com/jbenet/go-msgio"
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func pingListen(listener *net.TCPListener, peer *peer.Peer) {
|
||||
@ -71,11 +72,12 @@ func TestSwarm(t *testing.T) {
|
||||
}
|
||||
go pingListen(listener.(*net.TCPListener), peer)
|
||||
|
||||
_, err = swarm.Dial(peer)
|
||||
conn, err, _ := swarm.Dial(peer)
|
||||
if err != nil {
|
||||
t.Fatal("error swarm dialing to peer", err)
|
||||
}
|
||||
|
||||
swarm.StartConn(conn)
|
||||
// ok done, add it.
|
||||
peers = append(peers, peer)
|
||||
listeners = append(listeners, listener)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user