mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-03 23:38:07 +08:00
This commit finally makes use of the sent observed addrs. If the connection's local address is from one of our listen addrs, then the remote's observed addr is its natted mapping, which is useful to us. For now, we add it directly to our address book. (a future commit should make addressbook addresses expire)
254 lines
6.9 KiB
Go
254 lines
6.9 KiB
Go
package identify
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
|
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
|
|
semver "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
|
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
|
|
|
host "github.com/jbenet/go-ipfs/p2p/host"
|
|
inet "github.com/jbenet/go-ipfs/p2p/net"
|
|
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
|
|
pb "github.com/jbenet/go-ipfs/p2p/protocol/identify/pb"
|
|
config "github.com/jbenet/go-ipfs/repo/config"
|
|
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
|
|
)
|
|
|
|
var log = eventlog.Logger("net/identify")
|
|
|
|
// ID is the protocol.ID of the Identify Service.
|
|
const ID protocol.ID = "/ipfs/identify"
|
|
|
|
// IpfsVersion holds the current protocol version for a client running this code
|
|
var IpfsVersion *semver.Version
|
|
var ClientVersion = "go-ipfs/" + config.CurrentVersionNumber
|
|
|
|
func init() {
|
|
var err error
|
|
IpfsVersion, err = semver.NewVersion("0.0.1")
|
|
if err != nil {
|
|
panic(fmt.Errorf("invalid protocol version: %v", err))
|
|
}
|
|
}
|
|
|
|
// IDService is a structure that implements ProtocolIdentify.
|
|
// It is a trivial service that gives the other peer some
|
|
// useful information about the local peer. A sort of hello.
|
|
//
|
|
// The IDService sends:
|
|
// * Our IPFS Protocol Version
|
|
// * Our IPFS Agent Version
|
|
// * Our public Listen Addresses
|
|
type IDService struct {
|
|
Host host.Host
|
|
|
|
// connections undergoing identification
|
|
// for wait purposes
|
|
currid map[inet.Conn]chan struct{}
|
|
currmu sync.RWMutex
|
|
}
|
|
|
|
func NewIDService(h host.Host) *IDService {
|
|
s := &IDService{
|
|
Host: h,
|
|
currid: make(map[inet.Conn]chan struct{}),
|
|
}
|
|
h.SetStreamHandler(ID, s.RequestHandler)
|
|
return s
|
|
}
|
|
|
|
func (ids *IDService) IdentifyConn(c inet.Conn) {
|
|
ids.currmu.Lock()
|
|
if wait, found := ids.currid[c]; found {
|
|
ids.currmu.Unlock()
|
|
log.Debugf("IdentifyConn called twice on: %s", c)
|
|
<-wait // already identifying it. wait for it.
|
|
return
|
|
}
|
|
ids.currid[c] = make(chan struct{})
|
|
ids.currmu.Unlock()
|
|
|
|
s, err := c.NewStream()
|
|
if err != nil {
|
|
log.Error("error opening initial stream for %s", ID)
|
|
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer())
|
|
} else {
|
|
|
|
// ok give the response to our handler.
|
|
if err := protocol.WriteHeader(s, ID); err != nil {
|
|
log.Error("error writing stream header for %s", ID)
|
|
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer())
|
|
}
|
|
ids.ResponseHandler(s)
|
|
}
|
|
|
|
ids.currmu.Lock()
|
|
ch, found := ids.currid[c]
|
|
delete(ids.currid, c)
|
|
ids.currmu.Unlock()
|
|
|
|
if !found {
|
|
log.Errorf("IdentifyConn failed to find channel (programmer error) for %s", c)
|
|
return
|
|
}
|
|
|
|
close(ch) // release everyone waiting.
|
|
}
|
|
|
|
func (ids *IDService) RequestHandler(s inet.Stream) {
|
|
defer s.Close()
|
|
c := s.Conn()
|
|
|
|
w := ggio.NewDelimitedWriter(s)
|
|
mes := pb.Identify{}
|
|
ids.populateMessage(&mes, s.Conn())
|
|
w.WriteMsg(&mes)
|
|
|
|
log.Debugf("%s sent message to %s %s", ID,
|
|
c.RemotePeer(), c.RemoteMultiaddr())
|
|
}
|
|
|
|
func (ids *IDService) ResponseHandler(s inet.Stream) {
|
|
defer s.Close()
|
|
c := s.Conn()
|
|
|
|
r := ggio.NewDelimitedReader(s, 2048)
|
|
mes := pb.Identify{}
|
|
if err := r.ReadMsg(&mes); err != nil {
|
|
log.Errorf("%s error receiving message from %s %s", ID,
|
|
c.RemotePeer(), c.RemoteMultiaddr())
|
|
return
|
|
}
|
|
ids.consumeMessage(&mes, c)
|
|
|
|
log.Debugf("%s received message from %s %s", ID,
|
|
c.RemotePeer(), c.RemoteMultiaddr())
|
|
}
|
|
|
|
func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
|
|
|
|
// set protocols this node is currently handling
|
|
protos := ids.Host.Mux().Protocols()
|
|
mes.Protocols = make([]string, len(protos))
|
|
for i, p := range protos {
|
|
mes.Protocols[i] = string(p)
|
|
}
|
|
|
|
// observed address so other side is informed of their
|
|
// "public" address, at least in relation to us.
|
|
mes.ObservedAddr = c.RemoteMultiaddr().Bytes()
|
|
|
|
// set listen addrs
|
|
laddrs, err := ids.Host.Network().InterfaceListenAddresses()
|
|
if err != nil {
|
|
log.Error(err)
|
|
} else {
|
|
mes.ListenAddrs = make([][]byte, len(laddrs))
|
|
for i, addr := range laddrs {
|
|
mes.ListenAddrs[i] = addr.Bytes()
|
|
}
|
|
log.Debugf("%s sent listen addrs to %s: %s", c.LocalPeer(), c.RemotePeer(), laddrs)
|
|
}
|
|
|
|
// set protocol versions
|
|
s := IpfsVersion.String()
|
|
mes.ProtocolVersion = &s
|
|
mes.AgentVersion = &ClientVersion
|
|
}
|
|
|
|
func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) {
|
|
p := c.RemotePeer()
|
|
|
|
// mes.Protocols
|
|
|
|
// mes.ObservedAddr
|
|
ids.consumeObservedAddress(mes.GetObservedAddr(), c)
|
|
|
|
// mes.ListenAddrs
|
|
laddrs := mes.GetListenAddrs()
|
|
lmaddrs := make([]ma.Multiaddr, 0, len(laddrs))
|
|
for _, addr := range laddrs {
|
|
maddr, err := ma.NewMultiaddrBytes(addr)
|
|
if err != nil {
|
|
log.Errorf("%s failed to parse multiaddr from %s %s", ID,
|
|
p, c.RemoteMultiaddr())
|
|
continue
|
|
}
|
|
lmaddrs = append(lmaddrs, maddr)
|
|
}
|
|
|
|
// update our peerstore with the addresses.
|
|
ids.Host.Peerstore().AddAddresses(p, lmaddrs)
|
|
log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)
|
|
|
|
// get protocol versions
|
|
pv := *mes.ProtocolVersion
|
|
av := *mes.AgentVersion
|
|
ids.Host.Peerstore().Put(p, "ProtocolVersion", pv)
|
|
ids.Host.Peerstore().Put(p, "AgentVersion", av)
|
|
}
|
|
|
|
// IdentifyWait returns a channel which will be closed once
|
|
// "ProtocolIdentify" (handshake3) finishes on given conn.
|
|
// This happens async so the connection can start to be used
|
|
// even if handshake3 knowledge is not necesary.
|
|
// Users **MUST** call IdentifyWait _after_ IdentifyConn
|
|
func (ids *IDService) IdentifyWait(c inet.Conn) <-chan struct{} {
|
|
ids.currmu.Lock()
|
|
ch, found := ids.currid[c]
|
|
ids.currmu.Unlock()
|
|
if found {
|
|
return ch
|
|
}
|
|
|
|
// if not found, it means we are already done identifying it, or
|
|
// haven't even started. either way, return a new channel closed.
|
|
ch = make(chan struct{})
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) {
|
|
if observed == nil {
|
|
return
|
|
}
|
|
|
|
maddr, err := ma.NewMultiaddrBytes(observed)
|
|
if err != nil {
|
|
log.Debugf("error parsing received observed addr for %s: %s", c, err)
|
|
return
|
|
}
|
|
|
|
// we should only use ObservedAddr when our connection's LocalAddr is one
|
|
// of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that
|
|
// address's external mapping is not very useful because the port will not be
|
|
// the same as the listen addr.
|
|
ifaceaddrs, err := ids.Host.Network().InterfaceListenAddresses()
|
|
if err != nil {
|
|
log.Infof("failed to get interface listen addrs", err)
|
|
return
|
|
}
|
|
|
|
log.Debugf("identify identifying observed multiaddr: %s %s", c.LocalMultiaddr(), ifaceaddrs)
|
|
if !addrInAddrs(c.LocalMultiaddr(), ifaceaddrs) {
|
|
// not in our list
|
|
return
|
|
}
|
|
|
|
// ok! we have the observed version of one of our ListenAddresses!
|
|
log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr)
|
|
ids.Host.Peerstore().AddAddress(ids.Host.ID(), maddr)
|
|
}
|
|
|
|
func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
|
|
for _, b := range as {
|
|
if a.Equal(b) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|