handshake3 (addrs)

This commit is contained in:
Juan Batiz-Benet 2014-10-22 05:25:31 -07:00
parent 97c66ddc8f
commit 701035d5b0
5 changed files with 129 additions and 6 deletions

View File

@ -79,9 +79,9 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
// version handshake
ctxT, _ := context.WithTimeout(ctx, HandshakeTimeout)
if err := VersionHandshake(ctxT, conn); err != nil {
if err := Handshake1(ctxT, conn); err != nil {
conn.Close()
return nil, fmt.Errorf("Version handshake: %s", err)
return nil, fmt.Errorf("Handshake1 failed: %s", err)
}
return conn, nil

View File

@ -11,9 +11,9 @@ import (
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
)
// VersionHandshake exchanges local and remote versions and compares them
// Handshake1 exchanges local and remote versions and compares them
// closes remote and returns an error in case of major difference
func VersionHandshake(ctx context.Context, c Conn) error {
func Handshake1(ctx context.Context, c Conn) error {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
@ -57,3 +57,47 @@ func VersionHandshake(ctx context.Context, c Conn) error {
log.Debug("%s version handshake compatible %s", lpeer, rpeer)
return nil
}
// Handshake3 exchanges local and remote service information
func Handshake3(ctx context.Context, c Conn) error {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
var remoteH, localH *hspb.Handshake3
localH = handshake.Handshake3Msg(lpeer)
localB, err := proto.Marshal(localH)
if err != nil {
return err
}
c.Out() <- localB
log.Debug("Handshake1: sent to %s", rpeer)
select {
case <-ctx.Done():
return ctx.Err()
case <-c.Closing():
return errors.New("Handshake3: error remote connection closed")
case remoteB, ok := <-c.In():
if !ok {
return fmt.Errorf("Handshake3 error receiving from conn: %v", rpeer)
}
remoteH = new(hspb.Handshake3)
err = proto.Unmarshal(remoteB, remoteH)
if err != nil {
return fmt.Errorf("Handshake3 could not decode remote msg: %q", err)
}
log.Debug("Handshake3 received from %s", rpeer)
}
if err := handshake.Handshake3UpdatePeer(rpeer, remoteH); err != nil {
log.Error("Handshake3 failed to update %s", rpeer)
return err
}
return nil
}

View File

@ -0,0 +1,54 @@
package handshake
import (
"fmt"
pb "github.com/jbenet/go-ipfs/net/handshake/pb"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
var log = u.Logger("handshake")
// Handshake3Msg constructs a Handshake3 msg.
func Handshake3Msg(localPeer peer.Peer) *pb.Handshake3 {
var msg pb.Handshake3
// don't need publicKey after secure channel.
// msg.PublicKey = localPeer.PubKey().Bytes()
// addresses
addrs := localPeer.Addresses()
msg.ListenAddrs = make([][]byte, len(addrs))
for i, a := range addrs {
msg.ListenAddrs[i] = a.Bytes()
}
// services
// srv := localPeer.Services()
// msg.Services = make([]mux.ProtocolID, len(srv))
// for i, pid := range srv {
// msg.Services[i] = pid
// }
return &msg
}
// Handshake3UpdatePeer updates a remote peer with the information in the
// handshake3 msg we received from them.
func Handshake3UpdatePeer(remotePeer peer.Peer, msg *pb.Handshake3) error {
// addresses
for _, a := range msg.GetListenAddrs() {
addr, err := ma.NewMultiaddrBytes(a)
if err != nil {
err = fmt.Errorf("remote peer address not a multiaddr: %s", err)
log.Error("Handshake3: error %s", err)
return err
}
remotePeer.AddAddress(addr)
}
return nil
}

View File

@ -7,6 +7,7 @@ import (
conn "github.com/jbenet/go-ipfs/net/conn"
msg "github.com/jbenet/go-ipfs/net/message"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
@ -99,6 +100,13 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
// addresses should be figured out through the DHT.
// c.Remote.AddAddress(c.Conn.RemoteMultiaddr())
// handshake3
ctxT, _ := context.WithTimeout(c.Context(), conn.HandshakeTimeout)
if err := conn.Handshake3(ctxT, c); err != nil {
c.Close()
return nil, fmt.Errorf("Handshake3 failed: %s", err)
}
// add to conns
s.connsLock.Lock()

View File

@ -1,6 +1,7 @@
package peer
import (
"bytes"
"errors"
"fmt"
"sync"
@ -9,10 +10,9 @@ import (
b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
ic "github.com/jbenet/go-ipfs/crypto"
u "github.com/jbenet/go-ipfs/util"
"bytes"
)
var log = u.Logger("peer")
@ -71,6 +71,10 @@ type Peer interface {
// NetAddress returns the first Multiaddr found for a given network.
NetAddress(n string) ma.Multiaddr
// Services returns the peer's services
// Services() []mux.ProtocolID
// SetServices([]mux.ProtocolID)
// Priv/PubKey returns the peer's Private Key
PrivKey() ic.PrivKey
PubKey() ic.PubKey
@ -92,6 +96,7 @@ type Peer interface {
type peer struct {
id ID
addresses []ma.Multiaddr
// services []mux.ProtocolID
privKey ic.PrivKey
pubKey ic.PubKey
@ -163,6 +168,18 @@ func (p *peer) NetAddress(n string) ma.Multiaddr {
return nil
}
// func (p *peer) Services() []mux.ProtocolID {
// p.RLock()
// defer p.RUnlock()
// return p.services
// }
//
// func (p *peer) SetServices(s []mux.ProtocolID) {
// p.Lock()
// defer p.Unlock()
// p.services = s
// }
// GetLatency retrieves the current latency measurement.
func (p *peer) GetLatency() (out time.Duration) {
p.RLock()