diff --git a/routing/dht/Message.go b/routing/dht/Message.go index e2034d7e0..71a9537f9 100644 --- a/routing/dht/Message.go +++ b/routing/dht/Message.go @@ -4,13 +4,13 @@ import ( peer "github.com/jbenet/go-ipfs/peer" ) -// A helper struct to make working with protbuf types easier -type DHTMessage struct { +// Message is a a helper struct which makes working with protbuf types easier +type Message struct { Type PBDHTMessage_MessageType Key string Value []byte Response bool - Id uint64 + ID uint64 Success bool Peers []*peer.Peer } @@ -28,9 +28,10 @@ func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer { return pbp } +// ToProtobuf takes a Message and produces a protobuf with it. // TODO: building the protobuf message this way is a little wasteful // Unused fields wont be omitted, find a better way to do this -func (m *DHTMessage) ToProtobuf() *PBDHTMessage { +func (m *Message) ToProtobuf() *PBDHTMessage { pmes := new(PBDHTMessage) if m.Value != nil { pmes.Value = m.Value @@ -39,7 +40,7 @@ func (m *DHTMessage) ToProtobuf() *PBDHTMessage { pmes.Type = &m.Type pmes.Key = &m.Key pmes.Response = &m.Response - pmes.Id = &m.Id + pmes.Id = &m.ID pmes.Success = &m.Success for _, p := range m.Peers { pmes.Peers = append(pmes.Peers, peerInfo(p)) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index c28ca0a0f..901f1a861 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -25,7 +25,7 @@ import ( type IpfsDHT struct { // Array of routing tables for differently distanced nodes // NOTE: (currently, only a single table is used) - routes []*kb.RoutingTable + routingTables []*kb.RoutingTable network swarm.Network @@ -49,7 +49,7 @@ type IpfsDHT struct { diaglock sync.Mutex // listener is a server to register to listen for responses to messages - listener *MesListener + listener *mesListener } // NewDHT creates a new DHT object with the given peer as the 'local' host @@ -61,12 +61,11 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT { dht.providers = make(map[u.Key][]*providerInfo) dht.shutdown = make(chan struct{}) - dht.routes = make([]*kb.RoutingTable, 3) - dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30) - dht.routes[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100) - dht.routes[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) - - dht.listener = NewMesListener() + dht.routingTables = make([]*kb.RoutingTable, 3) + dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30) + dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100) + dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) + dht.listener = newMesListener() dht.birth = time.Now() return dht } @@ -175,11 +174,11 @@ func (dht *IpfsDHT) cleanExpiredProviders() { } func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error { - pmes := DHTMessage{ + pmes := Message{ Type: PBDHTMessage_PUT_VALUE, Key: key, Value: value, - Id: GenerateMessageID(), + ID: GenerateMessageID(), } mes := swarm.NewMessage(p, pmes.ToProtobuf()) @@ -190,9 +189,9 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { u.DOut("handleGetValue for key: %s", pmes.GetKey()) dskey := ds.NewKey(pmes.GetKey()) - resp := &DHTMessage{ + resp := &Message{ Response: true, - Id: pmes.GetId(), + ID: pmes.GetId(), Key: pmes.GetKey(), } iVal, err := dht.datastore.Get(dskey) @@ -222,7 +221,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { } u.DOut("handleGetValue searching level %d clusters", level) - closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) if closer.ID.Equal(dht.self.ID) { u.DOut("Attempted to return self! this shouldnt happen...") @@ -259,19 +258,19 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) { } func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) { - resp := DHTMessage{ + resp := Message{ Type: pmes.GetType(), Response: true, - Id: pmes.GetId(), + ID: pmes.GetId(), } dht.network.Send(swarm.NewMessage(p, resp.ToProtobuf())) } func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { - resp := DHTMessage{ + resp := Message{ Type: pmes.GetType(), - Id: pmes.GetId(), + ID: pmes.GetId(), Response: true, } defer func() { @@ -280,7 +279,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { }() level := pmes.GetValue()[0] u.DOut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty()) - closest := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) if closest == nil { u.PErr("handleFindPeer: could not find anything.") return @@ -302,10 +301,10 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { } func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { - resp := DHTMessage{ + resp := Message{ Type: PBDHTMessage_GET_PROVIDERS, Key: pmes.GetKey(), - Id: pmes.GetId(), + ID: pmes.GetId(), Response: true, } @@ -318,7 +317,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { level = int(pmes.GetValue()[0]) } - closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) + closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) { resp.Peers = nil } else { @@ -346,7 +345,7 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) { dht.addProviderEntry(key, p) } -// Stop all communications from this peer and shut down +// Halt stops all communications from this peer and shut down func (dht *IpfsDHT) Halt() { dht.shutdown <- struct{}{} dht.network.Close() @@ -362,7 +361,7 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) { // NOTE: not yet finished, low priority func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { - seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) + seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30) for _, ps := range seq { @@ -382,22 +381,22 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { case <-after: //Timeout, return what we have goto out - case req_resp := <-listenChan: - pmes_out := new(PBDHTMessage) - err := proto.Unmarshal(req_resp.Data, pmes_out) + case reqResp := <-listenChan: + pmesOut := new(PBDHTMessage) + err := proto.Unmarshal(reqResp.Data, pmesOut) if err != nil { // It broke? eh, whatever, keep going continue } - buf.Write(req_resp.Data) + buf.Write(reqResp.Data) count-- } } out: - resp := DHTMessage{ + resp := Message{ Type: PBDHTMessage_DIAGNOSTIC, - Id: pmes.GetId(), + ID: pmes.GetId(), Value: buf.Bytes(), Response: true, } @@ -423,40 +422,40 @@ func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Durati // Success! We were given the value return pmes.GetValue(), nil, nil - } else { - // We were given a closer node - var peers []*peer.Peer - for _, pb := range pmes.GetPeers() { - if peer.ID(pb.GetId()).Equal(dht.self.ID) { - continue - } - addr, err := ma.NewMultiaddr(pb.GetAddr()) - if err != nil { - u.PErr(err.Error()) - continue - } - - np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr) - if err != nil { - u.PErr(err.Error()) - continue - } - - peers = append(peers, np) - } - return nil, peers, nil } + + // We were given a closer node + var peers []*peer.Peer + for _, pb := range pmes.GetPeers() { + if peer.ID(pb.GetId()).Equal(dht.self.ID) { + continue + } + addr, err := ma.NewMultiaddr(pb.GetAddr()) + if err != nil { + u.PErr(err.Error()) + continue + } + + np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr) + if err != nil { + u.PErr(err.Error()) + continue + } + + peers = append(peers, np) + } + return nil, peers, nil } // getValueSingle simply performs the get value RPC with the given parameters func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*PBDHTMessage, error) { - pmes := DHTMessage{ + pmes := Message{ Type: PBDHTMessage_GET_VALUE, Key: string(key), Value: []byte{byte(level)}, - Id: GenerateMessageID(), + ID: GenerateMessageID(), } - response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute) + responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute) mes := swarm.NewMessage(p, pmes.ToProtobuf()) t := time.Now() @@ -466,21 +465,21 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio timeup := time.After(timeout) select { case <-timeup: - dht.listener.Unlisten(pmes.Id) + dht.listener.Unlisten(pmes.ID) return nil, u.ErrTimeout - case resp, ok := <-response_chan: + case resp, ok := <-responseChan: if !ok { u.PErr("response channel closed before timeout, please investigate.") return nil, u.ErrTimeout } roundtrip := time.Since(t) resp.Peer.SetLatency(roundtrip) - pmes_out := new(PBDHTMessage) - err := proto.Unmarshal(resp.Data, pmes_out) + pmesOut := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmesOut) if err != nil { return nil, err } - return pmes_out, nil + return pmesOut, nil } } @@ -520,7 +519,7 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration, return nil, u.ErrNotFound } -func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) { +func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) { v, err := dht.datastore.Get(ds.NewKey(string(key))) if err != nil { return nil, err @@ -528,17 +527,18 @@ func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) { return v.([]byte), nil } -func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error { +func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error { return dht.datastore.Put(ds.NewKey(string(key)), value) } +// Update TODO(chas) Document this function func (dht *IpfsDHT) Update(p *peer.Peer) { - for _, route := range dht.routes { + for _, route := range dht.routingTables { removed := route.Update(p) // Only drop the connection if no tables refer to this peer if removed != nil { found := false - for _, r := range dht.routes { + for _, r := range dht.routingTables { if r.Find(removed.ID) != nil { found = true break @@ -551,9 +551,9 @@ func (dht *IpfsDHT) Update(p *peer.Peer) { } } -// Look for a peer with a given ID connected to this dht +// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) { - for _, table := range dht.routes { + for _, table := range dht.routingTables { p := table.Find(id) if p != nil { return p, table @@ -563,72 +563,72 @@ func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) { } func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*PBDHTMessage, error) { - pmes := DHTMessage{ + pmes := Message{ Type: PBDHTMessage_FIND_NODE, Key: string(id), - Id: GenerateMessageID(), + ID: GenerateMessageID(), Value: []byte{byte(level)}, } mes := swarm.NewMessage(p, pmes.ToProtobuf()) - listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute) + listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute) t := time.Now() dht.network.Send(mes) after := time.After(timeout) select { case <-after: - dht.listener.Unlisten(pmes.Id) + dht.listener.Unlisten(pmes.ID) return nil, u.ErrTimeout case resp := <-listenChan: roundtrip := time.Since(t) resp.Peer.SetLatency(roundtrip) - pmes_out := new(PBDHTMessage) - err := proto.Unmarshal(resp.Data, pmes_out) + pmesOut := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmesOut) if err != nil { return nil, err } - return pmes_out, nil + return pmesOut, nil } } -func (dht *IpfsDHT) PrintTables() { - for _, route := range dht.routes { +func (dht *IpfsDHT) printTables() { + for _, route := range dht.routingTables { route.Print() } } func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) { - pmes := DHTMessage{ + pmes := Message{ Type: PBDHTMessage_GET_PROVIDERS, Key: string(key), - Id: GenerateMessageID(), + ID: GenerateMessageID(), Value: []byte{byte(level)}, } mes := swarm.NewMessage(p, pmes.ToProtobuf()) - listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute) + listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute) dht.network.Send(mes) after := time.After(timeout) select { case <-after: - dht.listener.Unlisten(pmes.Id) + dht.listener.Unlisten(pmes.ID) return nil, u.ErrTimeout case resp := <-listenChan: u.DOut("FindProviders: got response.") - pmes_out := new(PBDHTMessage) - err := proto.Unmarshal(resp.Data, pmes_out) + pmesOut := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmesOut) if err != nil { return nil, err } - return pmes_out, nil + return pmesOut, nil } } func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer { - var prov_arr []*peer.Peer + var provArr []*peer.Peer for _, prov := range peers { // Dont add outselves to the list if peer.ID(prov.GetId()).Equal(dht.self.ID) { @@ -650,7 +650,7 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer } } dht.addProviderEntry(key, p) - prov_arr = append(prov_arr, p) + provArr = append(provArr, p) } - return prov_arr + return provArr } diff --git a/routing/dht/dht_logger.go b/routing/dht/dht_logger.go index c892959f0..4a02fc304 100644 --- a/routing/dht/dht_logger.go +++ b/routing/dht/dht_logger.go @@ -7,28 +7,28 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -type logDhtRpc struct { +type logDhtRPC struct { Type string Start time.Time End time.Time Duration time.Duration - RpcCount int + RPCCount int Success bool } -func startNewRpc(name string) *logDhtRpc { - r := new(logDhtRpc) +func startNewRPC(name string) *logDhtRPC { + r := new(logDhtRPC) r.Type = name r.Start = time.Now() return r } -func (l *logDhtRpc) EndLog() { +func (l *logDhtRPC) EndLog() { l.End = time.Now() l.Duration = l.End.Sub(l.Start) } -func (l *logDhtRpc) Print() { +func (l *logDhtRPC) Print() { b, err := json.Marshal(l) if err != nil { u.DOut(err.Error()) diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index a7e14d703..581e19277 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -47,93 +47,93 @@ func setupDHTS(n int, t *testing.T) ([]*ma.Multiaddr, []*peer.Peer, []*IpfsDHT) func TestPing(t *testing.T) { u.Debug = false - addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222") + addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222") if err != nil { t.Fatal(err) } - addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") + addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") if err != nil { t.Fatal(err) } - peer_a := new(peer.Peer) - peer_a.AddAddress(addr_a) - peer_a.ID = peer.ID([]byte("peer_a")) + peerA := new(peer.Peer) + peerA.AddAddress(addrA) + peerA.ID = peer.ID([]byte("peerA")) - peer_b := new(peer.Peer) - peer_b.AddAddress(addr_b) - peer_b.ID = peer.ID([]byte("peer_b")) + peerB := new(peer.Peer) + peerB.AddAddress(addrB) + peerB.ID = peer.ID([]byte("peerB")) - neta := swarm.NewSwarm(peer_a) + neta := swarm.NewSwarm(peerA) err = neta.Listen() if err != nil { t.Fatal(err) } - dht_a := NewDHT(peer_a, neta) + dhtA := NewDHT(peerA, neta) - netb := swarm.NewSwarm(peer_b) + netb := swarm.NewSwarm(peerB) err = netb.Listen() if err != nil { t.Fatal(err) } - dht_b := NewDHT(peer_b, netb) + dhtB := NewDHT(peerB, netb) - dht_a.Start() - dht_b.Start() + dhtA.Start() + dhtB.Start() - _, err = dht_a.Connect(addr_b) + _, err = dhtA.Connect(addrB) if err != nil { t.Fatal(err) } //Test that we can ping the node - err = dht_a.Ping(peer_b, time.Second*2) + err = dhtA.Ping(peerB, time.Second*2) if err != nil { t.Fatal(err) } - dht_a.Halt() - dht_b.Halt() + dhtA.Halt() + dhtB.Halt() } func TestValueGetSet(t *testing.T) { u.Debug = false - addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235") + addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235") if err != nil { t.Fatal(err) } - addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679") + addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679") if err != nil { t.Fatal(err) } - peer_a := new(peer.Peer) - peer_a.AddAddress(addr_a) - peer_a.ID = peer.ID([]byte("peer_a")) + peerA := new(peer.Peer) + peerA.AddAddress(addrA) + peerA.ID = peer.ID([]byte("peerA")) - peer_b := new(peer.Peer) - peer_b.AddAddress(addr_b) - peer_b.ID = peer.ID([]byte("peer_b")) + peerB := new(peer.Peer) + peerB.AddAddress(addrB) + peerB.ID = peer.ID([]byte("peerB")) - neta := swarm.NewSwarm(peer_a) + neta := swarm.NewSwarm(peerA) err = neta.Listen() if err != nil { t.Fatal(err) } - dht_a := NewDHT(peer_a, neta) + dhtA := NewDHT(peerA, neta) - netb := swarm.NewSwarm(peer_b) + netb := swarm.NewSwarm(peerB) err = netb.Listen() if err != nil { t.Fatal(err) } - dht_b := NewDHT(peer_b, netb) + dhtB := NewDHT(peerB, netb) - dht_a.Start() - dht_b.Start() + dhtA.Start() + dhtB.Start() - errsa := dht_a.network.GetChan().Errors - errsb := dht_b.network.GetChan().Errors + errsa := dhtA.network.GetChan().Errors + errsb := dhtB.network.GetChan().Errors go func() { select { case err := <-errsa: @@ -143,14 +143,14 @@ func TestValueGetSet(t *testing.T) { } }() - _, err = dht_a.Connect(addr_b) + _, err = dhtA.Connect(addrB) if err != nil { t.Fatal(err) } - dht_a.PutValue("hello", []byte("world")) + dhtA.PutValue("hello", []byte("world")) - val, err := dht_a.GetValue("hello", time.Second*2) + val, err := dhtA.GetValue("hello", time.Second*2) if err != nil { t.Fatal(err) } diff --git a/routing/dht/diag.go b/routing/dht/diag.go index 03997c5e7..d6bc6bacf 100644 --- a/routing/dht/diag.go +++ b/routing/dht/diag.go @@ -9,11 +9,11 @@ import ( type connDiagInfo struct { Latency time.Duration - Id peer.ID + ID peer.ID } type diagInfo struct { - Id peer.ID + ID peer.ID Connections []connDiagInfo Keys []string LifeSpan time.Duration @@ -32,11 +32,11 @@ func (di *diagInfo) Marshal() []byte { func (dht *IpfsDHT) getDiagInfo() *diagInfo { di := new(diagInfo) di.CodeVersion = "github.com/jbenet/go-ipfs" - di.Id = dht.self.ID + di.ID = dht.self.ID di.LifeSpan = time.Since(dht.birth) di.Keys = nil // Currently no way to query datastore - for _, p := range dht.routes[0].Listpeers() { + for _, p := range dht.routingTables[0].Listpeers() { di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID}) } return di diff --git a/routing/dht/mes_listener.go b/routing/dht/mes_listener.go index 2fcd99fc1..133be877a 100644 --- a/routing/dht/mes_listener.go +++ b/routing/dht/mes_listener.go @@ -8,7 +8,7 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -type MesListener struct { +type mesListener struct { listeners map[uint64]*listenInfo haltchan chan struct{} unlist chan uint64 @@ -36,8 +36,8 @@ type listenInfo struct { id uint64 } -func NewMesListener() *MesListener { - ml := new(MesListener) +func newMesListener() *mesListener { + ml := new(mesListener) ml.haltchan = make(chan struct{}) ml.listeners = make(map[uint64]*listenInfo) ml.nlist = make(chan *listenInfo, 16) @@ -47,7 +47,7 @@ func NewMesListener() *MesListener { return ml } -func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message { +func (ml *mesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message { li := new(listenInfo) li.count = count li.eol = time.Now().Add(timeout) @@ -57,7 +57,7 @@ func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-cha return li.resp } -func (ml *MesListener) Unlisten(id uint64) { +func (ml *mesListener) Unlisten(id uint64) { ml.unlist <- id } @@ -66,18 +66,18 @@ type respMes struct { mes *swarm.Message } -func (ml *MesListener) Respond(id uint64, mes *swarm.Message) { +func (ml *mesListener) Respond(id uint64, mes *swarm.Message) { ml.send <- &respMes{ id: id, mes: mes, } } -func (ml *MesListener) Halt() { +func (ml *mesListener) Halt() { ml.haltchan <- struct{}{} } -func (ml *MesListener) run() { +func (ml *mesListener) run() { for { select { case <-ml.haltchan: diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index a852c5e1f..7c337d306 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-go. +// Code generated by protoc-gen-gogo. // source: messages.proto // DO NOT EDIT! @@ -13,7 +13,7 @@ It has these top-level messages: */ package dht -import proto "code.google.com/p/goprotobuf/proto" +import proto "code.google.com/p/gogoprotobuf/proto" import math "math" // Reference imports to suppress errors if they are not otherwise used. @@ -69,14 +69,17 @@ func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error { } type PBDHTMessage struct { - Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"` - Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` - Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` - Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` - Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` - Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"` - XXX_unrecognized []byte `json:"-"` + Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"` + Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"` + // Unique ID of this message, used to match queries with responses + Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"` + // Signals whether or not this message is a response to another message + Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` + // Used for returning peers from queries (normally, peers closer to X) + Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 309962a93..4e91e0eb4 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -27,6 +27,7 @@ var KValue = 10 // Its in the paper, i swear var AlphaValue = 3 +// GenerateMessageID creates and returns a new message ID // TODO: determine a way of creating and managing message IDs func GenerateMessageID() uint64 { //return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32()) @@ -39,21 +40,21 @@ func GenerateMessageID() uint64 { // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT -func (s *IpfsDHT) PutValue(key u.Key, value []byte) { +func (dht *IpfsDHT) PutValue(key u.Key, value []byte) { complete := make(chan struct{}) count := 0 - for _, route := range s.routes { + for _, route := range dht.routingTables { peers := route.NearestPeers(kb.ConvertKey(key), KValue) for _, p := range peers { if p == nil { - s.network.Error(kb.ErrLookupFailure) + dht.network.Error(kb.ErrLookupFailure) continue } count++ go func(sp *peer.Peer) { - err := s.putValueToNetwork(sp, string(key), value) + err := dht.putValueToNetwork(sp, string(key), value) if err != nil { - s.network.Error(err) + dht.network.Error(err) } complete <- struct{}{} }(p) @@ -121,8 +122,8 @@ func (ps *peerSet) Size() int { // GetValue searches for the value corresponding to given Key. // If the search does not succeed, a multiaddr string of a closer peer is // returned along with util.ErrSearchIncomplete -func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { - ll := startNewRpc("GET") +func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { + ll := startNewRPC("GET") defer func() { ll.EndLog() ll.Print() @@ -130,29 +131,29 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // If we have it local, dont bother doing an RPC! // NOTE: this might not be what we want to do... - val, err := s.GetLocal(key) + val, err := dht.getLocal(key) if err == nil { ll.Success = true u.DOut("Found local, returning.") return val, nil } - route_level := 0 - closest := s.routes[route_level].NearestPeers(kb.ConvertKey(key), PoolSize) + routeLevel := 0 + closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize) if closest == nil || len(closest) == 0 { return nil, kb.ErrLookupFailure } - val_chan := make(chan []byte) - npeer_chan := make(chan *peer.Peer, 30) - proc_peer := make(chan *peer.Peer, 30) - err_chan := make(chan error) + valChan := make(chan []byte) + npeerChan := make(chan *peer.Peer, 30) + procPeer := make(chan *peer.Peer, 30) + errChan := make(chan error) after := time.After(timeout) pset := newPeerSet() for _, p := range closest { pset.Add(p) - npeer_chan <- p + npeerChan <- p } c := counter{} @@ -161,17 +162,17 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { go func() { for { select { - case p := <-npeer_chan: + case p := <-npeerChan: count++ if count >= KValue { break } c.Increment() - proc_peer <- p + procPeer <- p default: if c.Size() == 0 { - err_chan <- u.ErrNotFound + errChan <- u.ErrNotFound } } } @@ -180,19 +181,19 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { process := func() { for { select { - case p, ok := <-proc_peer: + case p, ok := <-procPeer: if !ok || p == nil { c.Decrement() return } - val, peers, err := s.getValueOrPeers(p, key, timeout/4, route_level) + val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel) if err != nil { u.DErr(err.Error()) c.Decrement() continue } if val != nil { - val_chan <- val + valChan <- val c.Decrement() return } @@ -201,7 +202,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // TODO: filter out peers that arent closer if !pset.Contains(np) && pset.Size() < KValue { pset.Add(np) //This is racey... make a single function to do operation - npeer_chan <- np + npeerChan <- np } } c.Decrement() @@ -214,9 +215,9 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { } select { - case val := <-val_chan: + case val := <-valChan: return val, nil - case err := <-err_chan: + case err := <-errChan: return nil, err case <-after: return nil, u.ErrTimeout @@ -226,14 +227,14 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // Value provider layer of indirection. // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. -// Announce that this node can provide value for given key -func (s *IpfsDHT) Provide(key u.Key) error { - peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize) +// Provide makes this node announce that it can provide a value for the given key +func (dht *IpfsDHT) Provide(key u.Key) error { + peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize) if len(peers) == 0 { return kb.ErrLookupFailure } - pmes := DHTMessage{ + pmes := Message{ Type: PBDHTMessage_ADD_PROVIDER, Key: string(key), } @@ -241,57 +242,57 @@ func (s *IpfsDHT) Provide(key u.Key) error { for _, p := range peers { mes := swarm.NewMessage(p, pbmes) - s.network.Send(mes) + dht.network.Send(mes) } return nil } // FindProviders searches for peers who can provide the value for given key. -func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { - ll := startNewRpc("FindProviders") +func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { + ll := startNewRPC("FindProviders") defer func() { ll.EndLog() ll.Print() }() u.DOut("Find providers for: '%s'", key) - p := s.routes[0].NearestPeer(kb.ConvertKey(key)) + p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key)) if p == nil { return nil, kb.ErrLookupFailure } - for level := 0; level < len(s.routes); { - pmes, err := s.findProvidersSingle(p, key, level, timeout) + for level := 0; level < len(dht.routingTables); { + pmes, err := dht.findProvidersSingle(p, key, level, timeout) if err != nil { return nil, err } if pmes.GetSuccess() { - provs := s.addPeerList(key, pmes.GetPeers()) + provs := dht.addPeerList(key, pmes.GetPeers()) ll.Success = true return provs, nil - } else { - closer := pmes.GetPeers() - if len(closer) == 0 { - level++ - continue - } - if peer.ID(closer[0].GetId()).Equal(s.self.ID) { - u.DOut("Got myself back as a closer peer.") - return nil, u.ErrNotFound - } - maddr, err := ma.NewMultiaddr(closer[0].GetAddr()) - if err != nil { - // ??? Move up route level??? - panic("not yet implemented") - } - - np, err := s.network.GetConnection(peer.ID(closer[0].GetId()), maddr) - if err != nil { - u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closer[0].GetAddr()) - level++ - continue - } - p = np } + + closer := pmes.GetPeers() + if len(closer) == 0 { + level++ + continue + } + if peer.ID(closer[0].GetId()).Equal(dht.self.ID) { + u.DOut("Got myself back as a closer peer.") + return nil, u.ErrNotFound + } + maddr, err := ma.NewMultiaddr(closer[0].GetAddr()) + if err != nil { + // ??? Move up route level??? + panic("not yet implemented") + } + + np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr) + if err != nil { + u.PErr("[%s] Failed to connect to: %s", dht.self.ID.Pretty(), closer[0].GetAddr()) + level++ + continue + } + p = np } return nil, u.ErrNotFound } @@ -299,15 +300,15 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, // Find specific Peer // FindPeer searches for a peer with given ID. -func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { +func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { // Check if were already connected to them - p, _ := s.Find(id) + p, _ := dht.Find(id) if p != nil { return p, nil } - route_level := 0 - p = s.routes[route_level].NearestPeer(kb.ConvertPeerID(id)) + routeLevel := 0 + p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id)) if p == nil { return nil, kb.ErrLookupFailure } @@ -315,11 +316,11 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error return p, nil } - for route_level < len(s.routes) { - pmes, err := s.findPeerSingle(p, id, timeout, route_level) + for routeLevel < len(dht.routingTables) { + pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel) plist := pmes.GetPeers() if len(plist) == 0 { - route_level++ + routeLevel++ } found := plist[0] @@ -328,7 +329,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error return nil, err } - nxtPeer, err := s.network.GetConnection(peer.ID(found.GetId()), addr) + nxtPeer, err := dht.network.GetConnection(peer.ID(found.GetId()), addr) if err != nil { return nil, err } @@ -337,9 +338,8 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error return nil, errors.New("got back invalid peer from 'successful' response") } return nxtPeer, nil - } else { - p = nxtPeer } + p = nxtPeer } return nil, u.ErrNotFound } @@ -349,16 +349,16 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { // Thoughts: maybe this should accept an ID and do a peer lookup? u.DOut("Enter Ping.") - pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING} + pmes := Message{ID: GenerateMessageID(), Type: PBDHTMessage_PING} mes := swarm.NewMessage(p, pmes.ToProtobuf()) before := time.Now() - response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute) + responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute) dht.network.Send(mes) tout := time.After(timeout) select { - case <-response_chan: + case <-responseChan: roundtrip := time.Since(before) p.SetLatency(roundtrip) u.DOut("Ping took %s.", roundtrip.String()) @@ -366,23 +366,23 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { case <-tout: // Timed out, think about removing peer from network u.DOut("Ping peer timed out.") - dht.listener.Unlisten(pmes.Id) + dht.listener.Unlisten(pmes.ID) return u.ErrTimeout } } -func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { +func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) { u.DOut("Begin Diagnostic") //Send to N closest peers - targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) + targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) // TODO: Add timeout to this struct so nodes know when to return - pmes := DHTMessage{ + pmes := Message{ Type: PBDHTMessage_DIAGNOSTIC, - Id: GenerateMessageID(), + ID: GenerateMessageID(), } - listenChan := dht.listener.Listen(pmes.Id, len(targets), time.Minute*2) + listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2) pbmes := pmes.ToProtobuf() for _, p := range targets { @@ -398,15 +398,15 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { u.DOut("Diagnostic request timed out.") return out, u.ErrTimeout case resp := <-listenChan: - pmes_out := new(PBDHTMessage) - err := proto.Unmarshal(resp.Data, pmes_out) + pmesOut := new(PBDHTMessage) + err := proto.Unmarshal(resp.Data, pmesOut) if err != nil { // NOTE: here and elsewhere, need to audit error handling, // some errors should be continued on from return out, err } - dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue())) + dec := json.NewDecoder(bytes.NewBuffer(pmesOut.GetValue())) for { di := new(diagInfo) err := dec.Decode(di) diff --git a/routing/kbucket/bucket.go b/routing/kbucket/bucket.go index 1a55a0f69..77ed596db 100644 --- a/routing/kbucket/bucket.go +++ b/routing/kbucket/bucket.go @@ -13,13 +13,13 @@ type Bucket struct { list *list.List } -func NewBucket() *Bucket { +func newBucket() *Bucket { b := new(Bucket) b.list = list.New() return b } -func (b *Bucket) Find(id peer.ID) *list.Element { +func (b *Bucket) find(id peer.ID) *list.Element { b.lk.RLock() defer b.lk.RUnlock() for e := b.list.Front(); e != nil; e = e.Next() { @@ -30,19 +30,19 @@ func (b *Bucket) Find(id peer.ID) *list.Element { return nil } -func (b *Bucket) MoveToFront(e *list.Element) { +func (b *Bucket) moveToFront(e *list.Element) { b.lk.Lock() b.list.MoveToFront(e) b.lk.Unlock() } -func (b *Bucket) PushFront(p *peer.Peer) { +func (b *Bucket) pushFront(p *peer.Peer) { b.lk.Lock() b.list.PushFront(p) b.lk.Unlock() } -func (b *Bucket) PopBack() *peer.Peer { +func (b *Bucket) popBack() *peer.Peer { b.lk.Lock() defer b.lk.Unlock() last := b.list.Back() @@ -50,13 +50,13 @@ func (b *Bucket) PopBack() *peer.Peer { return last.Value.(*peer.Peer) } -func (b *Bucket) Len() int { +func (b *Bucket) len() int { b.lk.RLock() defer b.lk.RUnlock() return b.list.Len() } -// Splits a buckets peers into two buckets, the methods receiver will have +// Split splits a buckets peers into two buckets, the methods receiver will have // peers with CPL equal to cpl, the returned bucket will have peers with CPL // greater than cpl (returned bucket has closer peers) func (b *Bucket) Split(cpl int, target ID) *Bucket { @@ -64,13 +64,13 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket { defer b.lk.Unlock() out := list.New() - newbuck := NewBucket() + newbuck := newBucket() newbuck.list = out e := b.list.Front() for e != nil { - peer_id := ConvertPeerID(e.Value.(*peer.Peer).ID) - peer_cpl := prefLen(peer_id, target) - if peer_cpl > cpl { + peerID := convertPeerID(e.Value.(*peer.Peer).ID) + peerCPL := prefLen(peerID, target) + if peerCPL > cpl { cur := e out.PushBack(e.Value) e = e.Next() diff --git a/routing/kbucket/table.go b/routing/kbucket/table.go index d0137c6d7..3bbd56d07 100644 --- a/routing/kbucket/table.go +++ b/routing/kbucket/table.go @@ -28,11 +28,11 @@ type RoutingTable struct { bucketsize int } -func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *RoutingTable { +func newRoutingTable(bucketsize int, localID ID, latency time.Duration) *RoutingTable { rt := new(RoutingTable) - rt.Buckets = []*Bucket{NewBucket()} + rt.Buckets = []*Bucket{newBucket()} rt.bucketsize = bucketsize - rt.local = local_id + rt.local = localID rt.maxLatency = latency return rt } @@ -42,51 +42,50 @@ func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *Routin func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { rt.tabLock.Lock() defer rt.tabLock.Unlock() - peer_id := ConvertPeerID(p.ID) - cpl := xor(peer_id, rt.local).commonPrefixLen() + peerID := convertPeerID(p.ID) + cpl := xor(peerID, rt.local).commonPrefixLen() - b_id := cpl - if b_id >= len(rt.Buckets) { - b_id = len(rt.Buckets) - 1 + bucketID := cpl + if bucketID >= len(rt.Buckets) { + bucketID = len(rt.Buckets) - 1 } - bucket := rt.Buckets[b_id] - e := bucket.Find(p.ID) + bucket := rt.Buckets[bucketID] + e := bucket.find(p.ID) if e == nil { // New peer, add to bucket if p.GetLatency() > rt.maxLatency { // Connection doesnt meet requirements, skip! return nil } - bucket.PushFront(p) + bucket.pushFront(p) // Are we past the max bucket size? - if bucket.Len() > rt.bucketsize { - if b_id == len(rt.Buckets)-1 { - new_bucket := bucket.Split(b_id, rt.local) - rt.Buckets = append(rt.Buckets, new_bucket) - if new_bucket.Len() > rt.bucketsize { + if bucket.len() > rt.bucketsize { + if bucketID == len(rt.Buckets)-1 { + newBucket := bucket.Split(bucketID, rt.local) + rt.Buckets = append(rt.Buckets, newBucket) + if newBucket.len() > rt.bucketsize { // TODO: This is a very rare and annoying case panic("Case not handled.") } // If all elements were on left side of split... - if bucket.Len() > rt.bucketsize { - return bucket.PopBack() + if bucket.len() > rt.bucketsize { + return bucket.popBack() } } else { // If the bucket cant split kick out least active node - return bucket.PopBack() + return bucket.popBack() } } return nil - } else { - // If the peer is already in the table, move it to the front. - // This signifies that it it "more active" and the less active nodes - // Will as a result tend towards the back of the list - bucket.MoveToFront(e) - return nil } + // If the peer is already in the table, move it to the front. + // This signifies that it it "more active" and the less active nodes + // Will as a result tend towards the back of the list + bucket.moveToFront(e) + return nil } // A helper struct to sort peers by their distance to the local node @@ -101,7 +100,7 @@ type peerSorterArr []*peerDistance func (p peerSorterArr) Len() int { return len(p) } func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] } func (p peerSorterArr) Less(a, b int) bool { - return p[a].distance.Less(p[b].distance) + return p[a].distance.less(p[b].distance) } // @@ -109,10 +108,10 @@ func (p peerSorterArr) Less(a, b int) bool { func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { for e := peerList.Front(); e != nil; e = e.Next() { p := e.Value.(*peer.Peer) - p_id := ConvertPeerID(p.ID) + pID := convertPeerID(p.ID) pd := peerDistance{ p: p, - distance: xor(target, p_id), + distance: xor(target, pID), } peerArr = append(peerArr, &pd) if e == nil { @@ -125,24 +124,23 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe // Find a specific peer by ID or return nil func (rt *RoutingTable) Find(id peer.ID) *peer.Peer { - srch := rt.NearestPeers(ConvertPeerID(id), 1) + srch := rt.NearestPeers(convertPeerID(id), 1) if len(srch) == 0 || !srch[0].ID.Equal(id) { return nil } return srch[0] } -// Returns a single peer that is nearest to the given ID +// NearestPeer returns a single peer that is nearest to the given ID func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { peers := rt.NearestPeers(id, 1) if len(peers) > 0 { return peers[0] - } else { - return nil } + return nil } -// Returns a list of the 'count' closest peers to the given ID +// NearestPeers returns a list of the 'count' closest peers to the given ID func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { rt.tabLock.RLock() defer rt.tabLock.RUnlock() @@ -156,7 +154,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { bucket = rt.Buckets[cpl] var peerArr peerSorterArr - if bucket.Len() == 0 { + if bucket.len() == 0 { // In the case of an unusual split, one bucket may be empty. // if this happens, search both surrounding buckets for nearest peer if cpl > 0 { @@ -183,17 +181,17 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { return out } -// Returns the total number of peers in the routing table +// Size returns the total number of peers in the routing table func (rt *RoutingTable) Size() int { var tot int for _, buck := range rt.Buckets { - tot += buck.Len() + tot += buck.len() } return tot } // NOTE: This is potentially unsafe... use at your own risk -func (rt *RoutingTable) Listpeers() []*peer.Peer { +func (rt *RoutingTable) listPeers() []*peer.Peer { var peers []*peer.Peer for _, buck := range rt.Buckets { for e := buck.getIter(); e != nil; e = e.Next() { @@ -203,10 +201,10 @@ func (rt *RoutingTable) Listpeers() []*peer.Peer { return peers } -func (rt *RoutingTable) Print() { +func (rt *RoutingTable) print() { fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency) rt.tabLock.RLock() - peers := rt.Listpeers() + peers := rt.listPeers() for i, p := range peers { fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String()) } diff --git a/routing/kbucket/table_test.go b/routing/kbucket/table_test.go index 02d8f5e0e..ba5baee13 100644 --- a/routing/kbucket/table_test.go +++ b/routing/kbucket/table_test.go @@ -27,28 +27,28 @@ func _randID() ID { // Test basic features of the bucket struct func TestBucket(t *testing.T) { - b := NewBucket() + b := newBucket() peers := make([]*peer.Peer, 100) for i := 0; i < 100; i++ { peers[i] = _randPeer() - b.PushFront(peers[i]) + b.pushFront(peers[i]) } local := _randPeer() - local_id := ConvertPeerID(local.ID) + localID := convertPeerID(local.ID) i := rand.Intn(len(peers)) - e := b.Find(peers[i].ID) + e := b.find(peers[i].ID) if e == nil { t.Errorf("Failed to find peer: %v", peers[i]) } - spl := b.Split(0, ConvertPeerID(local.ID)) + spl := b.Split(0, convertPeerID(local.ID)) llist := b.list for e := llist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(*peer.Peer).ID) - cpl := xor(p, local_id).commonPrefixLen() + p := convertPeerID(e.Value.(*peer.Peer).ID) + cpl := xor(p, localID).commonPrefixLen() if cpl > 0 { t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") } @@ -56,8 +56,8 @@ func TestBucket(t *testing.T) { rlist := spl.list for e := rlist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(*peer.Peer).ID) - cpl := xor(p, local_id).commonPrefixLen() + p := convertPeerID(e.Value.(*peer.Peer).ID) + cpl := xor(p, localID).commonPrefixLen() if cpl == 0 { t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket") } @@ -67,7 +67,7 @@ func TestBucket(t *testing.T) { // Right now, this just makes sure that it doesnt hang or crash func TestTableUpdate(t *testing.T) { local := _randPeer() - rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour) + rt := newRoutingTable(10, convertPeerID(local.ID), time.Hour) peers := make([]*peer.Peer, 100) for i := 0; i < 100; i++ { @@ -93,7 +93,7 @@ func TestTableUpdate(t *testing.T) { func TestTableFind(t *testing.T) { local := _randPeer() - rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour) + rt := newRoutingTable(10, convertPeerID(local.ID), time.Hour) peers := make([]*peer.Peer, 100) for i := 0; i < 5; i++ { @@ -102,7 +102,7 @@ func TestTableFind(t *testing.T) { } t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) - found := rt.NearestPeer(ConvertPeerID(peers[2].ID)) + found := rt.NearestPeer(convertPeerID(peers[2].ID)) if !found.ID.Equal(peers[2].ID) { t.Fatalf("Failed to lookup known node...") } @@ -110,7 +110,7 @@ func TestTableFind(t *testing.T) { func TestTableFindMultiple(t *testing.T) { local := _randPeer() - rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour) + rt := newRoutingTable(20, convertPeerID(local.ID), time.Hour) peers := make([]*peer.Peer, 100) for i := 0; i < 18; i++ { @@ -119,7 +119,7 @@ func TestTableFindMultiple(t *testing.T) { } t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) - found := rt.NearestPeers(ConvertPeerID(peers[2].ID), 15) + found := rt.NearestPeers(convertPeerID(peers[2].ID), 15) if len(found) != 15 { t.Fatalf("Got back different number of peers than we expected.") } @@ -130,7 +130,7 @@ func TestTableFindMultiple(t *testing.T) { // and set GOMAXPROCS above 1 func TestTableMultithreaded(t *testing.T) { local := peer.ID("localPeer") - tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour) + tab := newRoutingTable(20, convertPeerID(local), time.Hour) var peers []*peer.Peer for i := 0; i < 500; i++ { peers = append(peers, _randPeer()) @@ -167,8 +167,8 @@ func TestTableMultithreaded(t *testing.T) { func BenchmarkUpdates(b *testing.B) { b.StopTimer() - local := ConvertKey("localKey") - tab := NewRoutingTable(20, local, time.Hour) + local := convertKey("localKey") + tab := newRoutingTable(20, local, time.Hour) var peers []*peer.Peer for i := 0; i < b.N; i++ { @@ -183,8 +183,8 @@ func BenchmarkUpdates(b *testing.B) { func BenchmarkFinds(b *testing.B) { b.StopTimer() - local := ConvertKey("localKey") - tab := NewRoutingTable(20, local, time.Hour) + local := convertKey("localKey") + tab := newRoutingTable(20, local, time.Hour) var peers []*peer.Peer for i := 0; i < b.N; i++ { diff --git a/routing/kbucket/util.go b/routing/kbucket/util.go index 32ff2c269..addd92565 100644 --- a/routing/kbucket/util.go +++ b/routing/kbucket/util.go @@ -20,11 +20,11 @@ var ErrLookupFailure = errors.New("failed to find any peer in table") // peer.ID or a util.Key. This unifies the keyspace type ID []byte -func (id ID) Equal(other ID) bool { +func (id ID) equal(other ID) bool { return bytes.Equal(id, other) } -func (id ID) Less(other ID) bool { +func (id ID) less(other ID) bool { a, b := equalizeSizes(id, other) for i := 0; i < len(a); i++ { if a[i] != b[i] { @@ -76,23 +76,23 @@ func equalizeSizes(a, b ID) (ID, ID) { return a, b } -func ConvertPeerID(id peer.ID) ID { +func convertPeerID(id peer.ID) ID { hash := sha256.Sum256(id) return hash[:] } -func ConvertKey(id u.Key) ID { +func convertKey(id u.Key) ID { hash := sha256.Sum256([]byte(id)) return hash[:] } -// Returns true if a is closer to key than b is +// Closer returns true if a is closer to key than b is func Closer(a, b peer.ID, key u.Key) bool { - aid := ConvertPeerID(a) - bid := ConvertPeerID(b) - tgt := ConvertKey(key) + aid := convertPeerID(a) + bid := convertPeerID(b) + tgt := convertKey(key) adist := xor(aid, tgt) bdist := xor(bid, tgt) - return adist.Less(bdist) + return adist.less(bdist) } diff --git a/routing/routing.go b/routing/routing.go index 3826f13cb..fdf350749 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -1,9 +1,10 @@ package routing import ( + "time" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" - "time" ) // IpfsRouting is the routing module interface