From 56d900fa1058a357599236b5434ec83d2019b923 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 26 Jan 2015 12:06:39 -0800 Subject: [PATCH 1/5] p2p/peer: addressbook can now clear addrs --- p2p/peer/addr/addrsrcs.go | 70 ++++++++++++++++++++++++++++++ p2p/peer/addr/addrsrcs_test.go | 78 ++++++++++++++++++++++++++++++++++ p2p/peer/peerstore.go | 40 +++++++++-------- p2p/peer/peerstore_test.go | 35 ++++++++++++--- 4 files changed, 199 insertions(+), 24 deletions(-) create mode 100644 p2p/peer/addr/addrsrcs.go create mode 100644 p2p/peer/addr/addrsrcs_test.go diff --git a/p2p/peer/addr/addrsrcs.go b/p2p/peer/addr/addrsrcs.go new file mode 100644 index 000000000..67f0ff2ed --- /dev/null +++ b/p2p/peer/addr/addrsrcs.go @@ -0,0 +1,70 @@ +// Package addr provides utility functions to handle peer addresses. +package addr + +import ( + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +// AddrSource is a source of addresses. It allows clients to retrieve +// a set of addresses at a last possible moment in time. It is used +// to query a set of addresses that may change over time, as a result +// of the network changing interfaces or mappings. +type Source interface { + Addrs() []ma.Multiaddr +} + +// CombineSources returns a new AddrSource which is the +// concatenation of all input AddrSources: +// +// combined := CombinedSources(a, b) +// combined.Addrs() // append(a.Addrs(), b.Addrs()...) +// +func CombineSources(srcs ...Source) Source { + return combinedAS(srcs) +} + +type combinedAS []Source + +func (cas combinedAS) Addrs() []ma.Multiaddr { + var addrs []ma.Multiaddr + for _, s := range cas { + addrs = append(addrs, s.Addrs()...) + } + return addrs +} + +// UniqueSource returns a new AddrSource which omits duplicate +// addresses from the inputs: +// +// unique := UniqueSource(a, b) +// unique.Addrs() // append(a.Addrs(), b.Addrs()...) +// // but only adds each addr once. +// +func UniqueSource(srcs ...Source) Source { + return uniqueAS(srcs) +} + +type uniqueAS []Source + +func (uas uniqueAS) Addrs() []ma.Multiaddr { + seen := make(map[string]struct{}) + var addrs []ma.Multiaddr + for _, s := range uas { + for _, a := range s.Addrs() { + s := a.String() + if _, found := seen[s]; !found { + addrs = append(addrs, a) + seen[s] = struct{}{} + } + } + } + return addrs +} + +// Slice is a simple slice of addresses that implements +// the AddrSource interface. +type Slice []ma.Multiaddr + +func (as Slice) Addrs() []ma.Multiaddr { + return as +} diff --git a/p2p/peer/addr/addrsrcs_test.go b/p2p/peer/addr/addrsrcs_test.go new file mode 100644 index 000000000..fbd99d247 --- /dev/null +++ b/p2p/peer/addr/addrsrcs_test.go @@ -0,0 +1,78 @@ +package addr + +import ( + "fmt" + "testing" + + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +func newAddrOrFatal(t *testing.T, s string) ma.Multiaddr { + a, err := ma.NewMultiaddr(s) + if err != nil { + t.Fatal("error parsing multiaddr", err) + } + return a +} + +func newAddrs(t *testing.T, n int) []ma.Multiaddr { + addrs := make([]ma.Multiaddr, n) + for i := 0; i < n; i++ { + s := fmt.Sprintf("/ip4/1.2.3.4/tcp/%d", i) + addrs[i] = newAddrOrFatal(t, s) + } + return addrs +} + +func addrSetsSame(a, b []ma.Multiaddr) bool { + if len(a) != len(b) { + return false + } + for i, aa := range a { + bb := b[i] + if !aa.Equal(bb) { + return false + } + } + return true +} + +func addrSourcesSame(a, b Source) bool { + return addrSetsSame(a.Addrs(), b.Addrs()) +} + +func TestAddrCombine(t *testing.T) { + addrs := newAddrs(t, 30) + a := Slice(addrs[0:10]) + b := Slice(addrs[10:20]) + c := Slice(addrs[20:30]) + d := CombineSources(a, b, c) + if !addrSetsSame(addrs, d.Addrs()) { + t.Error("addrs differ") + } + if !addrSourcesSame(Slice(addrs), d) { + t.Error("addrs differ") + } +} + +func TestAddrUnique(t *testing.T) { + + addrs := newAddrs(t, 40) + a := Slice(addrs[0:20]) + b := Slice(addrs[10:30]) + c := Slice(addrs[20:40]) + d := CombineSources(a, b, c) + e := UniqueSource(a, b, c) + if addrSetsSame(addrs, d.Addrs()) { + t.Error("addrs same") + } + if addrSourcesSame(Slice(addrs), d) { + t.Error("addrs same") + } + if !addrSetsSame(addrs, e.Addrs()) { + t.Error("addrs differ", addrs, "\n\n", e.Addrs(), "\n\n") + } + if !addrSourcesSame(Slice(addrs), e) { + t.Error("addrs differ", addrs, "\n\n", e.Addrs(), "\n\n") + } +} diff --git a/p2p/peer/peerstore.go b/p2p/peer/peerstore.go index 32d6207ea..2f25ae46a 100644 --- a/p2p/peer/peerstore.go +++ b/p2p/peer/peerstore.go @@ -38,9 +38,10 @@ type Peerstore interface { // AddressBook tracks the addresses of Peers type AddressBook interface { - Addresses(ID) []ma.Multiaddr - AddAddress(ID, ma.Multiaddr) - AddAddresses(ID, []ma.Multiaddr) + Addresses(ID) []ma.Multiaddr // returns addresses for ID + AddAddress(ID, ma.Multiaddr) // Adds given addr for ID + AddAddresses(ID, []ma.Multiaddr) // Adds given addrs for ID + SetAddresses(ID, []ma.Multiaddr) // Sets given addrs for ID (clears previously stored) } type addressMap map[string]ma.Multiaddr @@ -81,27 +82,32 @@ func (ab *addressbook) Addresses(p ID) []ma.Multiaddr { } func (ab *addressbook) AddAddress(p ID, m ma.Multiaddr) { - ab.Lock() - defer ab.Unlock() - - _, found := ab.addrs[p] - if !found { - ab.addrs[p] = addressMap{} - } - ab.addrs[p][m.String()] = m + ab.AddAddresses(p, []ma.Multiaddr{m}) } func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) { ab.Lock() defer ab.Unlock() - for _, m := range ms { - _, found := ab.addrs[p] - if !found { - ab.addrs[p] = addressMap{} - } - ab.addrs[p][m.String()] = m + amap, found := ab.addrs[p] + if !found { + amap = addressMap{} + ab.addrs[p] = amap } + for _, m := range ms { + amap[m.String()] = m + } +} + +func (ab *addressbook) SetAddresses(p ID, ms []ma.Multiaddr) { + ab.Lock() + defer ab.Unlock() + + amap := addressMap{} + for _, m := range ms { + amap[m.String()] = m + } + ab.addrs[p] = amap // clear what was there before } // KeyBook tracks the Public keys of Peers. diff --git a/p2p/peer/peerstore_test.go b/p2p/peer/peerstore_test.go index ae08e5f14..995dfd3f8 100644 --- a/p2p/peer/peerstore_test.go +++ b/p2p/peer/peerstore_test.go @@ -29,20 +29,37 @@ func TestAddresses(t *testing.T) { id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ") id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn") + id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn") + id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km") ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111") - ma21 := MA(t, "/ip4/1.2.3.2/tcp/1111") - ma22 := MA(t, "/ip4/1.2.3.2/tcp/2222") - ma31 := MA(t, "/ip4/1.2.3.3/tcp/1111") - ma32 := MA(t, "/ip4/1.2.3.3/tcp/2222") - ma33 := MA(t, "/ip4/1.2.3.3/tcp/3333") + ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111") + ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111") + ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222") + ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111") + ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222") + ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333") + ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111") + ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222") + ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333") + ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444") + ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555") ps.AddAddress(id1, ma11) - ps.AddAddress(id2, ma21) - ps.AddAddress(id2, ma22) + ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) + ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) // idempotency ps.AddAddress(id3, ma31) ps.AddAddress(id3, ma32) ps.AddAddress(id3, ma33) + ps.AddAddress(id3, ma33) // idempotency + ps.AddAddress(id3, ma33) + ps.AddAddresses(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // multiple + ps.AddAddresses(id5, []ma.Multiaddr{ma21, ma22}) // clearing + ps.AddAddresses(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // clearing + ps.SetAddresses(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}) // clearing test := func(exp, act []ma.Multiaddr) { if len(exp) != len(act) { @@ -69,9 +86,13 @@ func TestAddresses(t *testing.T) { test([]ma.Multiaddr{ma11}, ps.Addresses(id1)) test([]ma.Multiaddr{ma21, ma22}, ps.Addresses(id2)) test([]ma.Multiaddr{ma31, ma32, ma33}, ps.Addresses(id3)) + test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.Addresses(id4)) + test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.Addresses(id5)) // test also the PeerInfo return test([]ma.Multiaddr{ma11}, ps.PeerInfo(id1).Addrs) test([]ma.Multiaddr{ma21, ma22}, ps.PeerInfo(id2).Addrs) test([]ma.Multiaddr{ma31, ma32, ma33}, ps.PeerInfo(id3).Addrs) + test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.PeerInfo(id4).Addrs) + test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.PeerInfo(id5).Addrs) } From 3118777a5eb9d51f2a58540ffafddf8994f6f48e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 26 Jan 2015 12:09:20 -0800 Subject: [PATCH 2/5] p2p/id: SET listen addrs, clearing old ones. --- p2p/protocol/identify/id.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index d45b479c5..187637307 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -176,8 +176,9 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { lmaddrs = append(lmaddrs, maddr) } - // update our peerstore with the addresses. - ids.Host.Peerstore().AddAddresses(p, lmaddrs) + // update our peerstore with the addresses. here, we SET the addresses, clearing old ones. + // We are receiving from the peer itself. this is current address ground truth. + ids.Host.Peerstore().SetAddresses(p, lmaddrs) log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) // get protocol versions From 983822f335e574fe9bf9afe8b6614cea8ca59580 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 26 Jan 2015 12:43:19 -0800 Subject: [PATCH 3/5] p2p/peer: addresses expire after an hour --- p2p/peer/peerstore.go | 52 +++++++++++++++++++---- p2p/peer/peerstore_test.go | 87 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 8 deletions(-) diff --git a/p2p/peer/peerstore.go b/p2p/peer/peerstore.go index 2f25ae46a..3293cc253 100644 --- a/p2p/peer/peerstore.go +++ b/p2p/peer/peerstore.go @@ -3,6 +3,7 @@ package peer import ( "errors" "sync" + "time" ic "github.com/jbenet/go-ipfs/p2p/crypto" @@ -11,6 +12,11 @@ import ( ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) +const ( + // AddressTTL is the expiration time of addresses. + AddressTTL = time.Hour +) + // Peerstore provides a threadsafe store of Peer related // information. type Peerstore interface { @@ -44,15 +50,28 @@ type AddressBook interface { SetAddresses(ID, []ma.Multiaddr) // Sets given addrs for ID (clears previously stored) } -type addressMap map[string]ma.Multiaddr +type expiringAddr struct { + Addr ma.Multiaddr + TTL time.Time +} + +func (e *expiringAddr) Expired() bool { + return time.Now().After(e.TTL) +} + +type addressMap map[string]expiringAddr type addressbook struct { addrs map[ID]addressMap + ttl time.Duration // initial ttl sync.RWMutex } func newAddressbook() *addressbook { - return &addressbook{addrs: map[ID]addressMap{}} + return &addressbook{ + addrs: map[ID]addressMap{}, + ttl: AddressTTL, + } } func (ab *addressbook) Peers() []ID { @@ -74,11 +93,21 @@ func (ab *addressbook) Addresses(p ID) []ma.Multiaddr { return nil } - maddrs2 := make([]ma.Multiaddr, 0, len(maddrs)) - for _, m := range maddrs { - maddrs2 = append(maddrs2, m) + good := make([]ma.Multiaddr, 0, len(maddrs)) + var expired []string + for s, m := range maddrs { + if m.Expired() { + expired = append(expired, s) + } else { + good = append(good, m.Addr) + } } - return maddrs2 + + // clean up the expired ones. + for _, s := range expired { + delete(ab.addrs[p], s) + } + return good } func (ab *addressbook) AddAddress(p ID, m ma.Multiaddr) { @@ -94,8 +123,14 @@ func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) { amap = addressMap{} ab.addrs[p] = amap } + + ttl := time.Now().Add(ab.ttl) for _, m := range ms { - amap[m.String()] = m + // re-set all of them for new ttl. + amap[m.String()] = expiringAddr{ + Addr: m, + TTL: ttl, + } } } @@ -104,8 +139,9 @@ func (ab *addressbook) SetAddresses(p ID, ms []ma.Multiaddr) { defer ab.Unlock() amap := addressMap{} + ttl := time.Now().Add(ab.ttl) for _, m := range ms { - amap[m.String()] = m + amap[m.String()] = expiringAddr{Addr: m, TTL: ttl} } ab.addrs[p] = amap // clear what was there before } diff --git a/p2p/peer/peerstore_test.go b/p2p/peer/peerstore_test.go index 995dfd3f8..1edf6ae70 100644 --- a/p2p/peer/peerstore_test.go +++ b/p2p/peer/peerstore_test.go @@ -2,6 +2,7 @@ package peer import ( "testing" + "time" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) @@ -96,3 +97,89 @@ func TestAddresses(t *testing.T) { test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.PeerInfo(id4).Addrs) test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.PeerInfo(id5).Addrs) } + +func TestAddressTTL(t *testing.T) { + + ps := NewPeerstore() + id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN") + ma1 := MA(t, "/ip4/1.2.3.1/tcp/1111") + ma2 := MA(t, "/ip4/2.2.3.2/tcp/2222") + ma3 := MA(t, "/ip4/3.2.3.3/tcp/3333") + ma4 := MA(t, "/ip4/4.2.3.3/tcp/4444") + ma5 := MA(t, "/ip4/5.2.3.3/tcp/5555") + + ps.AddAddress(id1, ma1) + ps.AddAddress(id1, ma2) + ps.AddAddress(id1, ma3) + ps.AddAddress(id1, ma4) + ps.AddAddress(id1, ma5) + + test := func(exp, act []ma.Multiaddr) { + if len(exp) != len(act) { + t.Fatal("lengths not the same") + } + + for _, a := range exp { + found := false + + for _, b := range act { + if a.Equal(b) { + found = true + break + } + } + + if !found { + t.Fatal("expected address %s not found", a) + } + } + } + + testTTL := func(ttle time.Duration, id ID, addr ma.Multiaddr) { + ab := ps.(*peerstore).addressbook + ttlat := ab.addrs[id][addr.String()].TTL + ttla := ttlat.Sub(time.Now()) + if ttla > ttle { + t.Error("ttl is greater than expected", ttle, ttla) + } + if ttla < (ttle / 2) { + t.Error("ttl is smaller than expected", ttle/2, ttla) + } + } + + // should they are there + ab := ps.(*peerstore).addressbook + if len(ab.addrs[id1]) != 5 { + t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1]) + } + + // test the Addresses return value + test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.Addresses(id1)) + test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.PeerInfo(id1).Addrs) + + // check the addr TTL is a bit smaller than the init TTL + testTTL(AddressTTL, id1, ma1) + testTTL(AddressTTL, id1, ma2) + testTTL(AddressTTL, id1, ma3) + testTTL(AddressTTL, id1, ma4) + testTTL(AddressTTL, id1, ma5) + + // change the TTL + setTTL := func(id ID, addr ma.Multiaddr, ttl time.Time) { + a := ab.addrs[id][addr.String()] + a.TTL = ttl + ab.addrs[id][addr.String()] = a + } + setTTL(id1, ma1, time.Now().Add(-1*time.Second)) + setTTL(id1, ma2, time.Now().Add(-1*time.Hour)) + setTTL(id1, ma3, time.Now().Add(-1*AddressTTL)) + + // should no longer list those + test([]ma.Multiaddr{ma4, ma5}, ps.Addresses(id1)) + test([]ma.Multiaddr{ma4, ma5}, ps.PeerInfo(id1).Addrs) + + // should no longer be there + if len(ab.addrs[id1]) != 2 { + t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1]) + } +} From d8a63be3727ab1b274835210bda8955525aa93d1 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 26 Jan 2015 21:55:26 -0800 Subject: [PATCH 4/5] p2p/peer/addr: addrbook RLock fix --- p2p/peer/peerstore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/peer/peerstore.go b/p2p/peer/peerstore.go index 3293cc253..540987b44 100644 --- a/p2p/peer/peerstore.go +++ b/p2p/peer/peerstore.go @@ -85,8 +85,8 @@ func (ab *addressbook) Peers() []ID { } func (ab *addressbook) Addresses(p ID) []ma.Multiaddr { - ab.RLock() - defer ab.RUnlock() + ab.Lock() + defer ab.Unlock() maddrs, found := ab.addrs[p] if !found { From 71f2c4dee04e6f4eea53a48852b0e8f066b0c4ae Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 26 Jan 2015 22:31:58 -0800 Subject: [PATCH 5/5] p2p/peer/peerstore: mu position + comment --- p2p/peer/peerstore.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/p2p/peer/peerstore.go b/p2p/peer/peerstore.go index 540987b44..8be502ca4 100644 --- a/p2p/peer/peerstore.go +++ b/p2p/peer/peerstore.go @@ -62,9 +62,10 @@ func (e *expiringAddr) Expired() bool { type addressMap map[string]expiringAddr type addressbook struct { + sync.RWMutex // guards all fields + addrs map[ID]addressMap ttl time.Duration // initial ttl - sync.RWMutex } func newAddressbook() *addressbook {