diff --git a/core/bootstrap.go b/core/bootstrap.go index b1d1f2587..116226028 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -190,7 +190,7 @@ func bootstrapConnect(ctx context.Context, defer log.EventBegin(ctx, "bootstrapDial", route.LocalPeer(), p.ID).Done() log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID) - ps.AddAddresses(p.ID, p.Addrs) + ps.AddAddrs(p.ID, p.Addrs, peer.PermanentAddrTTL) err := route.Connect(ctx, p.ID) if err != nil { log.Event(ctx, "bootstrapDialFailed", p.ID) diff --git a/core/commands/id.go b/core/commands/id.go index 11ef40575..232f4c556 100644 --- a/core/commands/id.go +++ b/core/commands/id.go @@ -151,7 +151,7 @@ func printPeer(ps peer.Peerstore, p peer.ID) (interface{}, error) { info.PublicKey = base64.StdEncoding.EncodeToString(pkb) } - for _, a := range ps.Addresses(p) { + for _, a := range ps.Addrs(p) { info.Addresses = append(info.Addresses, a.String()) } diff --git a/core/commands/ping.go b/core/commands/ping.go index 73636db92..3242f6ea8 100644 --- a/core/commands/ping.go +++ b/core/commands/ping.go @@ -95,7 +95,7 @@ trip latency information. } if addr != nil { - n.Peerstore.AddAddress(peerID, addr) + n.Peerstore.AddAddr(peerID, addr, peer.TempAddrTTL) // temporary } // Set up number of pings @@ -120,7 +120,7 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int) go func() { defer close(outChan) - if len(n.Peerstore.Addresses(pid)) == 0 { + if len(n.Peerstore.Addrs(pid)) == 0 { // Make sure we can find the node in question outChan <- &PingResult{ Text: fmt.Sprintf("Looking up peer %s", pid.Pretty()), @@ -132,7 +132,7 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int) outChan <- &PingResult{Text: fmt.Sprintf("Peer lookup error: %s", err)} return } - n.Peerstore.AddPeerInfo(p) + n.Peerstore.AddAddrs(p.ID, p.Addrs, peer.TempAddrTTL) } outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())} diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 1db08aa02..241b49084 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -236,7 +236,7 @@ func peersWithAddresses(ps peer.Peerstore, addrs []string) (pids []peer.ID, err for _, iaddr := range iaddrs { pids = append(pids, iaddr.ID()) - ps.AddAddress(iaddr.ID(), iaddr.Multiaddr()) + ps.AddAddr(iaddr.ID(), iaddr.Multiaddr(), peer.TempAddrTTL) } return pids, nil } diff --git a/core/core.go b/core/core.go index a065ac905..f8c42a223 100644 --- a/core/core.go +++ b/core/core.go @@ -476,16 +476,12 @@ func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) return err } - // explicitly set these as our listen addrs. - // (why not do it inside inet.NewNetwork? because this way we can - // listen on addresses without necessarily advertising those publicly.) + // list out our addresses addrs, err := host.Network().InterfaceListenAddresses() if err != nil { return debugerror.Wrap(err) } log.Infof("Swarm listening at: %s", addrs) - - host.Peerstore().AddAddresses(host.ID(), addrs) return nil } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 2ea6705d0..22ead701c 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -38,18 +38,24 @@ type impl struct { receiver Receiver } +func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) { + + // first, make sure we're connected. + // if this fails, we cannot connect to given peer. + //TODO(jbenet) move this into host.NewStream? + if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { + return nil, err + } + + return bsnet.host.NewStream(ProtocolBitswap, p) +} + func (bsnet *impl) SendMessage( ctx context.Context, p peer.ID, outgoing bsmsg.BitSwapMessage) error { - // ensure we're connected - //TODO(jbenet) move this into host.NewStream? - if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { - return err - } - - s, err := bsnet.host.NewStream(ProtocolBitswap, p) + s, err := bsnet.newStreamToPeer(ctx, p) if err != nil { return err } @@ -68,13 +74,7 @@ func (bsnet *impl) SendRequest( p peer.ID, outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { - // ensure we're connected - //TODO(jbenet) move this into host.NewStream? - if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { - return nil, err - } - - s, err := bsnet.host.NewStream(ProtocolBitswap, p) + s, err := bsnet.newStreamToPeer(ctx, p) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) if info.ID == bsnet.host.ID() { continue // ignore self as provider } - bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs) + bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peer.TempAddrTTL) select { case <-ctx.Done(): return diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index b9314c399..f6a05ee0c 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -144,7 +144,7 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) { func (h *BasicHost) Connect(ctx context.Context, pi peer.PeerInfo) error { // absorb addresses into peerstore - h.Peerstore().AddPeerInfo(pi) + h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peer.TempAddrTTL) cs := h.Network().ConnsToPeer(pi.ID) if len(cs) > 0 { @@ -189,6 +189,10 @@ func (h *BasicHost) Addrs() []ma.Multiaddr { log.Debug("error retrieving network interface addrs") } + if h.ids != nil { // add external observed addresses + addrs = append(addrs, h.ids.OwnObservedAddrs()...) + } + if h.natmgr != nil { // natmgr is nil if we do not use nat option. nat := h.natmgr.NAT() if nat != nil { // nat is nil if not ready, or no nat is available. diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index cfd4be142..3881e62c2 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -33,8 +33,8 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { c.local = ln.peer c.remote = rn.peer - c.localAddr = ln.ps.Addresses(ln.peer)[0] - c.remoteAddr = rn.ps.Addresses(rn.peer)[0] + c.localAddr = ln.ps.Addrs(ln.peer)[0] + c.remoteAddr = rn.ps.Addrs(rn.peer)[0] c.localPrivKey = ln.ps.PrivKey(ln.peer) c.remotePubKey = rn.ps.PubKey(rn.peer) diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index 87e51faf6..79c4a6aa0 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -49,7 +49,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, // create our own entirely, so that peers knowledge doesn't get shared ps := peer.NewPeerstore() - ps.AddAddress(p, a) + ps.AddAddr(p, a, peer.PermanentAddrTTL) ps.AddPrivKey(p, k) ps.AddPubKey(p, k.GetPublic()) @@ -307,13 +307,13 @@ func (pn *peernet) BandwidthTotals() (in uint64, out uint64) { // Listen tells the network to start listening on given multiaddrs. func (pn *peernet) Listen(addrs ...ma.Multiaddr) error { - pn.Peerstore().AddAddresses(pn.LocalPeer(), addrs) + pn.Peerstore().AddAddrs(pn.LocalPeer(), addrs, peer.PermanentAddrTTL) return nil } // ListenAddresses returns a list of addresses at which this network listens. func (pn *peernet) ListenAddresses() []ma.Multiaddr { - return pn.Peerstore().Addresses(pn.LocalPeer()) + return pn.Peerstore().Addrs(pn.LocalPeer()) } // InterfaceListenAddresses returns a list of addresses at which this network diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 92e54c719..252859df0 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -48,7 +48,7 @@ func TestSimultDials(t *testing.T) { connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // copy for other peer log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr) - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.TempAddrTTL) if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } @@ -125,7 +125,7 @@ func TestDialWait(t *testing.T) { s2p, s2addr, s2l := newSilentPeer(t) go acceptAndHang(s2l) defer s2l.Close() - s1.peers.AddAddress(s2p, s2addr) + s1.peers.AddAddr(s2p, s2addr, peer.PermanentAddrTTL) before := time.Now() if c, err := s1.Dial(ctx, s2p); err == nil { @@ -171,13 +171,13 @@ func TestDialBackoff(t *testing.T) { if err != nil { t.Fatal(err) } - s1.peers.AddAddresses(s2.local, s2addrs) + s1.peers.AddAddrs(s2.local, s2addrs, peer.PermanentAddrTTL) // dial to a non-existent peer. s3p, s3addr, s3l := newSilentPeer(t) go acceptAndHang(s3l) defer s3l.Close() - s1.peers.AddAddress(s3p, s3addr) + s1.peers.AddAddr(s3p, s3addr, peer.PermanentAddrTTL) // in this test we will: // 1) dial 10x to each node. @@ -389,7 +389,7 @@ func TestDialBackoffClears(t *testing.T) { defer s2l.Close() // phase 1 -- dial to non-operational addresses - s1.peers.AddAddress(s2.local, s2bad) + s1.peers.AddAddr(s2.local, s2bad, peer.PermanentAddrTTL) before := time.Now() if c, err := s1.Dial(ctx, s2.local); err == nil { @@ -419,7 +419,7 @@ func TestDialBackoffClears(t *testing.T) { if err != nil { t.Fatal(err) } - s1.peers.AddAddresses(s2.local, ifaceAddrs1) + s1.peers.AddAddrs(s2.local, ifaceAddrs1, peer.PermanentAddrTTL) before = time.Now() if c, err := s1.Dial(ctx, s2.local); err != nil { diff --git a/p2p/net/swarm/peers_test.go b/p2p/net/swarm/peers_test.go index 7a35c70d4..583e218d4 100644 --- a/p2p/net/swarm/peers_test.go +++ b/p2p/net/swarm/peers_test.go @@ -19,7 +19,7 @@ func TestPeers(t *testing.T) { connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // TODO: make a DialAddr func. - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL) // t.Logf("connections from %s", s.LocalPeer()) // for _, c := range s.ConnectionsToPeer(dst) { // t.Logf("connection from %s to %s: %v", s.LocalPeer(), dst, c) diff --git a/p2p/net/swarm/simul_test.go b/p2p/net/swarm/simul_test.go index 8a3df0d2e..446a011d7 100644 --- a/p2p/net/swarm/simul_test.go +++ b/p2p/net/swarm/simul_test.go @@ -25,7 +25,7 @@ func TestSimultOpen(t *testing.T) { connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // copy for other peer log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr) - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go index 850ba6eb4..e822a4b01 100644 --- a/p2p/net/swarm/swarm_addr_test.go +++ b/p2p/net/swarm/swarm_addr_test.go @@ -110,7 +110,7 @@ func TestDialBadAddrs(t *testing.T) { test := func(a ma.Multiaddr) { p := testutil.RandPeerIDFatal(t) - s.peers.AddAddress(p, a) + s.peers.AddAddr(p, a, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, p); err == nil { t.Error("swarm should not dial: %s", m) } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index bad1422cc..5ee049441 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -289,14 +289,14 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { } // get remote peer addrs - remoteAddrs := s.peers.Addresses(p) + remoteAddrs := s.peers.Addrs(p) // make sure we can use the addresses. remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs) // drop out any addrs that would just dial ourselves. use ListenAddresses // as that is a more authoritative view than localAddrs. ila, _ := s.InterfaceListenAddresses() remoteAddrs = addrutil.Subtract(remoteAddrs, ila) - remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local)) + remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local)) log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses()) if len(remoteAddrs) == 0 { err := errors.New("peer has no addresses") diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 734726fac..8c7a7b5a7 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -53,7 +53,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { // return err // } // for _, a := range resolved { - // s.peers.AddAddress(s.local, a) + // s.peers.AddAddr(s.local, a) // } sk := s.peers.PrivKey(s.local) diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 07c583eab..1c2ae1d03 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -75,7 +75,7 @@ func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm) { var wg sync.WaitGroup connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { // TODO: make a DialAddr func. - s.peers.AddAddress(dst, addr) + s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL) if _, err := s.Dial(ctx, dst); err != nil { t.Fatal("error swarm dialing to peer", err) } diff --git a/p2p/peer/addr/addr_manager.go b/p2p/peer/addr_manager.go similarity index 55% rename from p2p/peer/addr/addr_manager.go rename to p2p/peer/addr_manager.go index b35b02eac..14a8010b0 100644 --- a/p2p/peer/addr/addr_manager.go +++ b/p2p/peer/addr_manager.go @@ -1,16 +1,38 @@ -// package addr provides useful address utilities for p2p -// applications. It buys into the multi-transport addressing -// scheme Multiaddr, and uses it to build its own p2p addressing. -// All Addrs must have an associated peer.ID. -package addr +package peer import ( "sync" "time" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) - peer "github.com/jbenet/go-ipfs/p2p/peer" +const ( + + // TempAddrTTL is the ttl used for a short lived address + TempAddrTTL = time.Second * 10 + + // ProviderAddrTTL is the TTL of an address we've received from a provider. + // This is also a temporary address, but lasts longer. After this expires, + // the records we return will require an extra lookup. + ProviderAddrTTL = time.Minute * 10 + + // RecentlyConnectedAddrTTL is used when we recently connected to a peer. + // It means that we are reasonably certain of the peer's address. + RecentlyConnectedAddrTTL = time.Minute * 10 + + // OwnObservedAddrTTL is used for our own external addresses observed by peers. + OwnObservedAddrTTL = time.Minute * 20 + + // PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes) + // if we haven't shipped you an update to ipfs in 356 days + // we probably arent running the same bootstrap nodes... + PermanentAddrTTL = time.Hour * 24 * 356 + + // ConnectedAddrTTL is the ttl used for the addresses of a peer to whom + // we're connected directly. This is basically permanent, as we will + // clear them + re-add under a TempAddrTTL after disconnecting. + ConnectedAddrTTL = PermanentAddrTTL ) type expiringAddr struct { @@ -24,30 +46,44 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool { type addrSet map[string]expiringAddr -// Manager manages addresses. +// AddrManager manages addresses. // The zero-value is ready to be used. -type Manager struct { +type AddrManager struct { addrmu sync.Mutex // guards addrs - addrs map[peer.ID]addrSet + addrs map[ID]addrSet } -// ensures the Manager is initialized. +// ensures the AddrManager is initialized. // So we can use the zero value. -func (mgr *Manager) init() { +func (mgr *AddrManager) init() { if mgr.addrs == nil { - mgr.addrs = make(map[peer.ID]addrSet) + mgr.addrs = make(map[ID]addrSet) } } +func (mgr *AddrManager) Peers() []ID { + mgr.addrmu.Lock() + defer mgr.addrmu.Unlock() + if mgr.addrs == nil { + return nil + } + + pids := make([]ID, 0, len(mgr.addrs)) + for pid := range mgr.addrs { + pids = append(pids, pid) + } + return pids +} + // AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl) -func (mgr *Manager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { +func (mgr *AddrManager) AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration) { mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) } -// AddAddrs gives Manager addresses to use, with a given ttl +// AddAddrs gives AddrManager addresses to use, with a given ttl // (time-to-live), after which the address is no longer valid. // If the manager has a longer TTL, the operation is a no-op for that address -func (mgr *Manager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { +func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) { mgr.addrmu.Lock() defer mgr.addrmu.Unlock() @@ -77,13 +113,13 @@ func (mgr *Manager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) } // SetAddr calls mgr.SetAddrs(p, addr, ttl) -func (mgr *Manager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { +func (mgr *AddrManager) SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration) { mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl) } // SetAddrs sets the ttl on addresses. This clears any TTL there previously. // This is used when we receive the best estimate of the validity of an address. -func (mgr *Manager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { +func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) { mgr.addrmu.Lock() defer mgr.addrmu.Unlock() @@ -109,8 +145,8 @@ func (mgr *Manager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) } } -// Addresses returns all known (and valid) addresses for a given peer. -func (mgr *Manager) Addrs(p peer.ID) []ma.Multiaddr { +// Addresses returns all known (and valid) addresses for a given +func (mgr *AddrManager) Addrs(p ID) []ma.Multiaddr { mgr.addrmu.Lock() defer mgr.addrmu.Unlock() @@ -143,7 +179,7 @@ func (mgr *Manager) Addrs(p peer.ID) []ma.Multiaddr { } // ClearAddresses removes all previously stored addresses -func (mgr *Manager) ClearAddrs(p peer.ID) { +func (mgr *AddrManager) ClearAddrs(p ID) { mgr.addrmu.Lock() defer mgr.addrmu.Unlock() mgr.init() diff --git a/p2p/peer/addr/addr_manager_test.go b/p2p/peer/addr_manager_test.go similarity index 96% rename from p2p/peer/addr/addr_manager_test.go rename to p2p/peer/addr_manager_test.go index be24ff12b..1c488abda 100644 --- a/p2p/peer/addr/addr_manager_test.go +++ b/p2p/peer/addr_manager_test.go @@ -1,16 +1,14 @@ -package addr +package peer import ( "testing" "time" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" - - peer "github.com/jbenet/go-ipfs/p2p/peer" ) -func IDS(t *testing.T, ids string) peer.ID { - id, err := peer.IDB58Decode(ids) +func IDS(t *testing.T, ids string) ID { + id, err := IDB58Decode(ids) if err != nil { t.Fatal(err) } @@ -71,7 +69,7 @@ func TestAddresses(t *testing.T) { ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555") ttl := time.Hour - m := Manager{} + m := AddrManager{} m.AddAddr(id1, ma11, ttl) m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) @@ -109,7 +107,7 @@ func TestAddressesExpire(t *testing.T) { ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444") ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555") - m := Manager{} + m := AddrManager{} m.AddAddr(id1, ma11, time.Hour) m.AddAddr(id1, ma12, time.Hour) m.AddAddr(id1, ma13, time.Hour) @@ -164,7 +162,7 @@ func TestClearWorks(t *testing.T) { ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444") ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555") - m := Manager{} + m := AddrManager{} m.AddAddr(id1, ma11, time.Hour) m.AddAddr(id1, ma12, time.Hour) m.AddAddr(id1, ma13, time.Hour) diff --git a/p2p/peer/peerstore.go b/p2p/peer/peerstore.go index 8be502ca4..f1281e86e 100644 --- a/p2p/peer/peerstore.go +++ b/p2p/peer/peerstore.go @@ -20,8 +20,8 @@ const ( // Peerstore provides a threadsafe store of Peer related // information. type Peerstore interface { + AddrBook KeyBook - AddressBook Metrics // Peers returns a list of all peer.IDs in this Peerstore @@ -32,9 +32,6 @@ type Peerstore interface { // that peer, useful to other services. PeerInfo(ID) PeerInfo - // AddPeerInfo absorbs the information listed in given PeerInfo. - AddPeerInfo(PeerInfo) - // Get/Put is a simple registry for other peer-related key/value pairs. // if we find something we use often, it should become its own set of // methods. this is a last resort. @@ -42,109 +39,30 @@ type Peerstore interface { Put(id ID, key string, val interface{}) error } -// AddressBook tracks the addresses of Peers -type AddressBook interface { - Addresses(ID) []ma.Multiaddr // returns addresses for ID - AddAddress(ID, ma.Multiaddr) // Adds given addr for ID - AddAddresses(ID, []ma.Multiaddr) // Adds given addrs for ID - SetAddresses(ID, []ma.Multiaddr) // Sets given addrs for ID (clears previously stored) -} +// AddrBook is an interface that fits the new AddrManager. I'm patching +// it up in here to avoid changing a ton of the codebase. +type AddrBook interface { -type expiringAddr struct { - Addr ma.Multiaddr - TTL time.Time -} + // AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl) + AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration) -func (e *expiringAddr) Expired() bool { - return time.Now().After(e.TTL) -} + // AddAddrs gives AddrManager addresses to use, with a given ttl + // (time-to-live), after which the address is no longer valid. + // If the manager has a longer TTL, the operation is a no-op for that address + AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) -type addressMap map[string]expiringAddr + // SetAddr calls mgr.SetAddrs(p, addr, ttl) + SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration) -type addressbook struct { - sync.RWMutex // guards all fields + // SetAddrs sets the ttl on addresses. This clears any TTL there previously. + // This is used when we receive the best estimate of the validity of an address. + SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) - addrs map[ID]addressMap - ttl time.Duration // initial ttl -} + // Addresses returns all known (and valid) addresses for a given + Addrs(p ID) []ma.Multiaddr -func newAddressbook() *addressbook { - return &addressbook{ - addrs: map[ID]addressMap{}, - ttl: AddressTTL, - } -} - -func (ab *addressbook) Peers() []ID { - ab.RLock() - ps := make([]ID, 0, len(ab.addrs)) - for p := range ab.addrs { - ps = append(ps, p) - } - ab.RUnlock() - return ps -} - -func (ab *addressbook) Addresses(p ID) []ma.Multiaddr { - ab.Lock() - defer ab.Unlock() - - maddrs, found := ab.addrs[p] - if !found { - return nil - } - - good := make([]ma.Multiaddr, 0, len(maddrs)) - var expired []string - for s, m := range maddrs { - if m.Expired() { - expired = append(expired, s) - } else { - good = append(good, m.Addr) - } - } - - // clean up the expired ones. - for _, s := range expired { - delete(ab.addrs[p], s) - } - return good -} - -func (ab *addressbook) AddAddress(p ID, m ma.Multiaddr) { - ab.AddAddresses(p, []ma.Multiaddr{m}) -} - -func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) { - ab.Lock() - defer ab.Unlock() - - amap, found := ab.addrs[p] - if !found { - amap = addressMap{} - ab.addrs[p] = amap - } - - ttl := time.Now().Add(ab.ttl) - for _, m := range ms { - // re-set all of them for new ttl. - amap[m.String()] = expiringAddr{ - Addr: m, - TTL: ttl, - } - } -} - -func (ab *addressbook) SetAddresses(p ID, ms []ma.Multiaddr) { - ab.Lock() - defer ab.Unlock() - - amap := addressMap{} - ttl := time.Now().Add(ab.ttl) - for _, m := range ms { - amap[m.String()] = expiringAddr{Addr: m, TTL: ttl} - } - ab.addrs[p] = amap // clear what was there before + // ClearAddresses removes all previously stored addresses + ClearAddrs(p ID) } // KeyBook tracks the Public keys of Peers. @@ -231,8 +149,8 @@ func (kb *keybook) AddPrivKey(p ID, sk ic.PrivKey) error { type peerstore struct { keybook - addressbook metrics + AddrManager // store other data, like versions ds ds.ThreadSafeDatastore @@ -242,8 +160,8 @@ type peerstore struct { func NewPeerstore() Peerstore { return &peerstore{ keybook: *newKeybook(), - addressbook: *newAddressbook(), metrics: *(NewMetrics()).(*metrics), + AddrManager: AddrManager{}, ds: dssync.MutexWrap(ds.NewMapDatastore()), } } @@ -263,7 +181,7 @@ func (ps *peerstore) Peers() []ID { for _, p := range ps.keybook.Peers() { set[p] = struct{}{} } - for _, p := range ps.addressbook.Peers() { + for _, p := range ps.AddrManager.Peers() { set[p] = struct{}{} } @@ -277,14 +195,10 @@ func (ps *peerstore) Peers() []ID { func (ps *peerstore) PeerInfo(p ID) PeerInfo { return PeerInfo{ ID: p, - Addrs: ps.addressbook.Addresses(p), + Addrs: ps.AddrManager.Addrs(p), } } -func (ps *peerstore) AddPeerInfo(pi PeerInfo) { - ps.AddAddresses(pi.ID, pi.Addrs) -} - func PeerInfos(ps Peerstore, peers []ID) []PeerInfo { pi := make([]PeerInfo, len(peers)) for i, p := range peers { diff --git a/p2p/peer/peerstore_test.go b/p2p/peer/peerstore_test.go deleted file mode 100644 index 1edf6ae70..000000000 --- a/p2p/peer/peerstore_test.go +++ /dev/null @@ -1,185 +0,0 @@ -package peer - -import ( - "testing" - "time" - - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" -) - -func IDS(t *testing.T, ids string) ID { - id, err := IDB58Decode(ids) - if err != nil { - t.Fatal(err) - } - return id -} - -func MA(t *testing.T, m string) ma.Multiaddr { - maddr, err := ma.NewMultiaddr(m) - if err != nil { - t.Fatal(err) - } - return maddr -} - -func TestAddresses(t *testing.T) { - - ps := NewPeerstore() - - id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") - id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ") - id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn") - id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn") - id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km") - - ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") - ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111") - ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222") - ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111") - ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222") - ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333") - ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111") - ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222") - ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333") - ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444") - ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111") - ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222") - ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333") - ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444") - ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555") - - ps.AddAddress(id1, ma11) - ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) - ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) // idempotency - ps.AddAddress(id3, ma31) - ps.AddAddress(id3, ma32) - ps.AddAddress(id3, ma33) - ps.AddAddress(id3, ma33) // idempotency - ps.AddAddress(id3, ma33) - ps.AddAddresses(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // multiple - ps.AddAddresses(id5, []ma.Multiaddr{ma21, ma22}) // clearing - ps.AddAddresses(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // clearing - ps.SetAddresses(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}) // clearing - - test := func(exp, act []ma.Multiaddr) { - if len(exp) != len(act) { - t.Fatal("lengths not the same") - } - - for _, a := range exp { - found := false - - for _, b := range act { - if a.Equal(b) { - found = true - break - } - } - - if !found { - t.Fatal("expected address %s not found", a) - } - } - } - - // test the Addresses return value - test([]ma.Multiaddr{ma11}, ps.Addresses(id1)) - test([]ma.Multiaddr{ma21, ma22}, ps.Addresses(id2)) - test([]ma.Multiaddr{ma31, ma32, ma33}, ps.Addresses(id3)) - test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.Addresses(id4)) - test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.Addresses(id5)) - - // test also the PeerInfo return - test([]ma.Multiaddr{ma11}, ps.PeerInfo(id1).Addrs) - test([]ma.Multiaddr{ma21, ma22}, ps.PeerInfo(id2).Addrs) - test([]ma.Multiaddr{ma31, ma32, ma33}, ps.PeerInfo(id3).Addrs) - test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.PeerInfo(id4).Addrs) - test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.PeerInfo(id5).Addrs) -} - -func TestAddressTTL(t *testing.T) { - - ps := NewPeerstore() - id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") - ma1 := MA(t, "/ip4/1.2.3.1/tcp/1111") - ma2 := MA(t, "/ip4/2.2.3.2/tcp/2222") - ma3 := MA(t, "/ip4/3.2.3.3/tcp/3333") - ma4 := MA(t, "/ip4/4.2.3.3/tcp/4444") - ma5 := MA(t, "/ip4/5.2.3.3/tcp/5555") - - ps.AddAddress(id1, ma1) - ps.AddAddress(id1, ma2) - ps.AddAddress(id1, ma3) - ps.AddAddress(id1, ma4) - ps.AddAddress(id1, ma5) - - test := func(exp, act []ma.Multiaddr) { - if len(exp) != len(act) { - t.Fatal("lengths not the same") - } - - for _, a := range exp { - found := false - - for _, b := range act { - if a.Equal(b) { - found = true - break - } - } - - if !found { - t.Fatal("expected address %s not found", a) - } - } - } - - testTTL := func(ttle time.Duration, id ID, addr ma.Multiaddr) { - ab := ps.(*peerstore).addressbook - ttlat := ab.addrs[id][addr.String()].TTL - ttla := ttlat.Sub(time.Now()) - if ttla > ttle { - t.Error("ttl is greater than expected", ttle, ttla) - } - if ttla < (ttle / 2) { - t.Error("ttl is smaller than expected", ttle/2, ttla) - } - } - - // should they are there - ab := ps.(*peerstore).addressbook - if len(ab.addrs[id1]) != 5 { - t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1]) - } - - // test the Addresses return value - test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.Addresses(id1)) - test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.PeerInfo(id1).Addrs) - - // check the addr TTL is a bit smaller than the init TTL - testTTL(AddressTTL, id1, ma1) - testTTL(AddressTTL, id1, ma2) - testTTL(AddressTTL, id1, ma3) - testTTL(AddressTTL, id1, ma4) - testTTL(AddressTTL, id1, ma5) - - // change the TTL - setTTL := func(id ID, addr ma.Multiaddr, ttl time.Time) { - a := ab.addrs[id][addr.String()] - a.TTL = ttl - ab.addrs[id][addr.String()] = a - } - setTTL(id1, ma1, time.Now().Add(-1*time.Second)) - setTTL(id1, ma2, time.Now().Add(-1*time.Hour)) - setTTL(id1, ma3, time.Now().Add(-1*AddressTTL)) - - // should no longer list those - test([]ma.Multiaddr{ma4, ma5}, ps.Addresses(id1)) - test([]ma.Multiaddr{ma4, ma5}, ps.PeerInfo(id1).Addrs) - - // should no longer be there - if len(ab.addrs[id1]) != 2 { - t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1]) - } -} diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 107c2538d..1e7089ecf 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -11,6 +11,7 @@ import ( host "github.com/jbenet/go-ipfs/p2p/host" inet "github.com/jbenet/go-ipfs/p2p/net" + peer "github.com/jbenet/go-ipfs/p2p/peer" protocol "github.com/jbenet/go-ipfs/p2p/protocol" pb "github.com/jbenet/go-ipfs/p2p/protocol/identify/pb" config "github.com/jbenet/go-ipfs/repo/config" @@ -49,6 +50,10 @@ type IDService struct { // for wait purposes currid map[inet.Conn]chan struct{} currmu sync.RWMutex + + // our own observed addresses. + // TODO: instead of expiring, remove these when we disconnect + addrs peer.AddrManager } func NewIDService(h host.Host) *IDService { @@ -60,6 +65,11 @@ func NewIDService(h host.Host) *IDService { return s } +// OwnObservedAddrs returns the addresses peers have reported we've dialed from +func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr { + return ids.addrs.Addrs(ids.Host.ID()) +} + func (ids *IDService) IdentifyConn(c inet.Conn) { ids.currmu.Lock() if wait, found := ids.currid[c]; found { @@ -176,7 +186,7 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { // update our peerstore with the addresses. here, we SET the addresses, clearing old ones. // We are receiving from the peer itself. this is current address ground truth. - ids.Host.Peerstore().SetAddresses(p, lmaddrs) + ids.Host.Peerstore().SetAddrs(p, lmaddrs, peer.ConnectedAddrTTL) log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) // get protocol versions @@ -235,7 +245,7 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) { // ok! we have the observed version of one of our ListenAddresses! log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr) - ids.Host.Peerstore().AddAddress(ids.Host.ID(), maddr) + ids.addrs.AddAddr(ids.Host.ID(), maddr, peer.OwnObservedAddrTTL) } func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { @@ -246,3 +256,28 @@ func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { } return false } + +// netNotifiee defines methods to be used with the IpfsDHT +type netNotifiee IDService + +func (nn *netNotifiee) IDService() *IDService { + return (*IDService)(nn) +} + +func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { + // TODO: deprecate the setConnHandler hook, and kick off + // identification here. +} + +func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { + // undo the setting of addresses to peer.ConnectedAddrTTL we did + ids := nn.IDService() + ps := ids.Host.Peerstore() + addrs := ps.Addrs(v.RemotePeer()) + ps.SetAddrs(v.RemotePeer(), addrs, peer.RecentlyConnectedAddrTTL) +} + +func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {} +func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {} +func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {} diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index c6707b0ac..6423a14f7 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -38,7 +38,7 @@ func subtestIDService(t *testing.T, postDialWait time.Duration) { // the IDService should be opened automatically, by the network. // what we should see now is that both peers know about each others listen addresses. - testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addresses(h2p)) // has them + testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them testHasProtocolVersions(t, h1, h2p) // now, this wait we do have to do. it's the wait for the Listening side @@ -50,12 +50,12 @@ func subtestIDService(t *testing.T, postDialWait time.Duration) { <-h2.IDService().IdentifyWait(c[0]) // and the protocol versions. - testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addresses(h1p)) // has them + testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p)) // has them testHasProtocolVersions(t, h2, h1p) } func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) { - actual := h.Peerstore().Addresses(p) + actual := h.Peerstore().Addrs(p) if len(actual) != len(expected) { t.Error("dont have the same addresses") diff --git a/p2p/test/util/util.go b/p2p/test/util/util.go index 70b286429..a680d4f3d 100644 --- a/p2p/test/util/util.go +++ b/p2p/test/util/util.go @@ -22,14 +22,14 @@ func GenSwarmNetwork(t *testing.T, ctx context.Context) *swarm.Network { if err != nil { t.Fatal(err) } - ps.AddAddresses(p.ID, n.ListenAddresses()) + ps.AddAddrs(p.ID, n.ListenAddresses(), peer.PermanentAddrTTL) return n } func DivulgeAddresses(a, b inet.Network) { id := a.LocalPeer() - addrs := a.Peerstore().Addresses(id) - b.Peerstore().AddAddresses(id, addrs) + addrs := a.Peerstore().Addrs(id) + b.Peerstore().AddAddrs(id, addrs, peer.PermanentAddrTTL) } func GenHostSwarm(t *testing.T, ctx context.Context) *bhost.BasicHost { diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 00597b016..9d65a6fac 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -55,7 +55,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer for i := 0; i < n; i++ { dhts[i] = setupDHT(ctx, t) peers[i] = dhts[i].self - addrs[i] = dhts[i].peerstore.Addresses(dhts[i].self)[0] + addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0] } return addrs, peers, dhts @@ -64,12 +64,12 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { idB := b.self - addrB := b.peerstore.Addresses(idB) + addrB := b.peerstore.Addrs(idB) if len(addrB) == 0 { t.Fatal("peers setup incorrectly: no local address") } - a.peerstore.AddAddresses(idB, addrB) + a.peerstore.AddAddrs(idB, addrB, peer.TempAddrTTL) if err := a.Connect(ctx, idB); err != nil { t.Fatal(err) } @@ -754,20 +754,20 @@ func TestConnectCollision(t *testing.T) { dhtA := setupDHT(ctx, t) dhtB := setupDHT(ctx, t) - addrA := dhtA.peerstore.Addresses(dhtA.self)[0] - addrB := dhtB.peerstore.Addresses(dhtB.self)[0] + addrA := dhtA.peerstore.Addrs(dhtA.self)[0] + addrB := dhtB.peerstore.Addrs(dhtB.self)[0] peerA := dhtA.self peerB := dhtB.self errs := make(chan error) go func() { - dhtA.peerstore.AddAddress(peerB, addrB) + dhtA.peerstore.AddAddr(peerB, addrB, peer.TempAddrTTL) err := dhtA.Connect(ctx, peerB) errs <- err }() go func() { - dhtB.peerstore.AddAddress(peerA, addrA) + dhtB.peerstore.AddAddr(peerA, addrA, peer.TempAddrTTL) err := dhtB.Connect(ctx, peerA) errs <- err }() diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 6376dbcba..6974ad08d 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -238,7 +238,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs) if pi.ID != dht.self { // dont add own addrs. // add the received addresses to our peerstore. - dht.peerstore.AddPeerInfo(pi) + dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peer.ProviderAddrTTL) } dht.providers.AddProvider(key, p) } diff --git a/routing/dht/lookup.go b/routing/dht/lookup.go index 6e0acd9fa..a713e553d 100644 --- a/routing/dht/lookup.go +++ b/routing/dht/lookup.go @@ -100,7 +100,7 @@ func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key u.Key, p peer.ID) for _, pbp := range pmes.GetCloserPeers() { pid := peer.ID(pbp.GetId()) if pid != dht.self { // dont add self - dht.peerstore.AddAddresses(pid, pbp.Addresses()) + dht.peerstore.AddAddrs(pid, pbp.Addresses(), peer.TempAddrTTL) out = append(out, pid) } } diff --git a/routing/dht/query.go b/routing/dht/query.go index 8d6505b88..293c0ddd9 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -253,7 +253,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { } // add their addresses to the dialer's peerstore - r.query.dht.peerstore.AddPeerInfo(next) + r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, peer.TempAddrTTL) r.addPeerToQuery(cg.Context(), next.ID) log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs) } diff --git a/routing/grandcentral/server.go b/routing/grandcentral/server.go index f51b71917..179b15b3e 100644 --- a/routing/grandcentral/server.go +++ b/routing/grandcentral/server.go @@ -96,7 +96,7 @@ func (s *Server) handleMessage( } for _, maddr := range provider.Addresses() { // FIXME do we actually want to store to peerstore - s.peerstore.AddAddress(p, maddr) + s.peerstore.AddAddr(p, maddr, peer.TempAddrTTL) } } var providers []dhtpb.Message_Peer