Made the DHT module pass golint

Chas Leichner 2014-08-16 23:03:36 -07:00 committed by Juan Batiz-Benet
parent 2c74093bc7
commit 87bfdbc599
13 changed files with 316 additions and 313 deletions
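
Almost all of the churn in this commit follows from a handful of mechanical golint rules. A minimal, self-contained sketch (not taken from the diff itself) of the naming patterns it flags and their fixes:

```go
// Package lintdemo illustrates the golint rules behind most renames below.
package lintdemo

// Before (golint complaints):
//   type DHTMessage struct{ Id uint64 } // initialism: Id should be ID
//   func startNewRpc(name string)       // initialism: Rpc should be RPC
//   var route_level int                 // don't use underscores in Go names
//   type MesListener struct{}           // exported name needs a doc comment,
//                                       // or should be unexported if internal

// After:
type message struct {
	ID uint64 // initialisms (ID, RPC, URL) keep one consistent case
}

var routeLevel int

func startNewRPC(name string) string { return name }
```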

View File

@ -4,13 +4,13 @@ import (
peer "github.com/jbenet/go-ipfs/peer"
)
// A helper struct to make working with protbuf types easier
type DHTMessage struct {
// Message is a helper struct which makes working with protobuf types easier
type Message struct {
Type PBDHTMessage_MessageType
Key string
Value []byte
Response bool
Id uint64
ID uint64
Success bool
Peers []*peer.Peer
}
@ -28,9 +28,10 @@ func peerInfo(p *peer.Peer) *PBDHTMessage_PBPeer {
return pbp
}
// ToProtobuf takes a Message and produces a protobuf with it.
// TODO: building the protobuf message this way is a little wasteful
// Unused fields won't be omitted, find a better way to do this
func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
func (m *Message) ToProtobuf() *PBDHTMessage {
pmes := new(PBDHTMessage)
if m.Value != nil {
pmes.Value = m.Value
@ -39,7 +40,7 @@ func (m *DHTMessage) ToProtobuf() *PBDHTMessage {
pmes.Type = &m.Type
pmes.Key = &m.Key
pmes.Response = &m.Response
pmes.Id = &m.Id
pmes.Id = &m.ID
pmes.Success = &m.Success
for _, p := range m.Peers {
pmes.Peers = append(pmes.Peers, peerInfo(p))
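
As a usage sketch of the renamed helper: callers build a Message, tag it with a fresh ID, and convert it to the wire type at the send boundary. The stand-in types below only approximate the dht package in this commit (the generated PBDHTMessage and peer types are not reproduced), so treat this as illustrative:

```go
package main

import "fmt"

type messageType int32

const ping messageType = 2 // stand-in; real values come from messages.proto

// Message mirrors the helper struct above.
type Message struct {
	Type     messageType
	Key      string
	Response bool
	ID       uint64
}

// pbMessage stands in for the generated PBDHTMessage: every field is a
// pointer, which is why encoding unused fields is wasteful.
type pbMessage struct {
	Type     *messageType
	Key      *string
	Response *bool
	ID       *uint64
}

// ToProtobuf copies the plain fields into the pointer-field wire struct.
func (m *Message) ToProtobuf() *pbMessage {
	return &pbMessage{Type: &m.Type, Key: &m.Key, Response: &m.Response, ID: &m.ID}
}

func main() {
	m := Message{Type: ping, Key: "hello", ID: 42}
	fmt.Println(*m.ToProtobuf().ID) // 42
}
```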

View File

@ -25,7 +25,7 @@ import (
type IpfsDHT struct {
// Array of routing tables for differently distanced nodes
// NOTE: (currently, only a single table is used)
routes []*kb.RoutingTable
routingTables []*kb.RoutingTable
network swarm.Network
@ -49,7 +49,7 @@ type IpfsDHT struct {
diaglock sync.Mutex
// listener is used to register handlers that wait for responses to messages
listener *MesListener
listener *mesListener
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
@ -61,12 +61,11 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
dht.providers = make(map[u.Key][]*providerInfo)
dht.shutdown = make(chan struct{})
dht.routes = make([]*kb.RoutingTable, 3)
dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
dht.routes[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
dht.routes[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.listener = NewMesListener()
dht.routingTables = make([]*kb.RoutingTable, 3)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.listener = newMesListener()
dht.birth = time.Now()
return dht
}
@ -175,11 +174,11 @@ func (dht *IpfsDHT) cleanExpiredProviders() {
}
func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
pmes := DHTMessage{
pmes := Message{
Type: PBDHTMessage_PUT_VALUE,
Key: key,
Value: value,
Id: GenerateMessageID(),
ID: GenerateMessageID(),
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
@ -190,9 +189,9 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
u.DOut("handleGetValue for key: %s", pmes.GetKey())
dskey := ds.NewKey(pmes.GetKey())
resp := &DHTMessage{
resp := &Message{
Response: true,
Id: pmes.GetId(),
ID: pmes.GetId(),
Key: pmes.GetKey(),
}
iVal, err := dht.datastore.Get(dskey)
@ -222,7 +221,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
}
u.DOut("handleGetValue searching level %d clusters", level)
closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if closer.ID.Equal(dht.self.ID) {
u.DOut("Attempted to return self! this shouldnt happen...")
@ -259,19 +258,19 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *PBDHTMessage) {
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
resp := Message{
Type: pmes.GetType(),
Response: true,
Id: pmes.GetId(),
ID: pmes.GetId(),
}
dht.network.Send(swarm.NewMessage(p, resp.ToProtobuf()))
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
resp := Message{
Type: pmes.GetType(),
Id: pmes.GetId(),
ID: pmes.GetId(),
Response: true,
}
defer func() {
@ -280,7 +279,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
}()
level := pmes.GetValue()[0]
u.DOut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
closest := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
closest := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if closest == nil {
u.PErr("handleFindPeer: could not find anything.")
return
@ -302,10 +301,10 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
resp := DHTMessage{
resp := Message{
Type: PBDHTMessage_GET_PROVIDERS,
Key: pmes.GetKey(),
Id: pmes.GetId(),
ID: pmes.GetId(),
Response: true,
}
@ -318,7 +317,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
level = int(pmes.GetValue()[0])
}
closer := dht.routes[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
closer := dht.routingTables[level].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey())))
if kb.Closer(dht.self.ID, closer.ID, u.Key(pmes.GetKey())) {
resp.Peers = nil
} else {
@ -346,7 +345,7 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
dht.addProviderEntry(key, p)
}
// Stop all communications from this peer and shut down
// Halt stops all communications from this peer and shuts down
func (dht *IpfsDHT) Halt() {
dht.shutdown <- struct{}{}
dht.network.Close()
@ -362,7 +361,7 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
listenChan := dht.listener.Listen(pmes.GetId(), len(seq), time.Second*30)
for _, ps := range seq {
@ -382,22 +381,22 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
case <-after:
//Timeout, return what we have
goto out
case req_resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(req_resp.Data, pmes_out)
case reqResp := <-listenChan:
pmesOut := new(PBDHTMessage)
err := proto.Unmarshal(reqResp.Data, pmesOut)
if err != nil {
// It broke? eh, whatever, keep going
continue
}
buf.Write(req_resp.Data)
buf.Write(reqResp.Data)
count--
}
}
out:
resp := DHTMessage{
resp := Message{
Type: PBDHTMessage_DIAGNOSTIC,
Id: pmes.GetId(),
ID: pmes.GetId(),
Value: buf.Bytes(),
Response: true,
}
@ -423,40 +422,40 @@ func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Durati
// Success! We were given the value
return pmes.GetValue(), nil, nil
} else {
// We were given a closer node
var peers []*peer.Peer
for _, pb := range pmes.GetPeers() {
if peer.ID(pb.GetId()).Equal(dht.self.ID) {
continue
}
addr, err := ma.NewMultiaddr(pb.GetAddr())
if err != nil {
u.PErr(err.Error())
continue
}
np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
if err != nil {
u.PErr(err.Error())
continue
}
peers = append(peers, np)
}
return nil, peers, nil
}
// We were given a closer node
var peers []*peer.Peer
for _, pb := range pmes.GetPeers() {
if peer.ID(pb.GetId()).Equal(dht.self.ID) {
continue
}
addr, err := ma.NewMultiaddr(pb.GetAddr())
if err != nil {
u.PErr(err.Error())
continue
}
np, err := dht.network.GetConnection(peer.ID(pb.GetId()), addr)
if err != nil {
u.PErr(err.Error())
continue
}
peers = append(peers, np)
}
return nil, peers, nil
}
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*PBDHTMessage, error) {
pmes := DHTMessage{
pmes := Message{
Type: PBDHTMessage_GET_VALUE,
Key: string(key),
Value: []byte{byte(level)},
Id: GenerateMessageID(),
ID: GenerateMessageID(),
}
response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute)
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
mes := swarm.NewMessage(p, pmes.ToProtobuf())
t := time.Now()
@ -466,21 +465,21 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
timeup := time.After(timeout)
select {
case <-timeup:
dht.listener.Unlisten(pmes.Id)
dht.listener.Unlisten(pmes.ID)
return nil, u.ErrTimeout
case resp, ok := <-response_chan:
case resp, ok := <-responseChan:
if !ok {
u.PErr("response channel closed before timeout, please investigate.")
return nil, u.ErrTimeout
}
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
pmesOut := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
return nil, err
}
return pmes_out, nil
return pmesOut, nil
}
}
@ -520,7 +519,7 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
return nil, u.ErrNotFound
}
func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
v, err := dht.datastore.Get(ds.NewKey(string(key)))
if err != nil {
return nil, err
@ -528,17 +527,18 @@ func (dht *IpfsDHT) GetLocal(key u.Key) ([]byte, error) {
return v.([]byte), nil
}
func (dht *IpfsDHT) PutLocal(key u.Key, value []byte) error {
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
return dht.datastore.Put(ds.NewKey(string(key)), value)
}
// Update TODO(chas) Document this function
func (dht *IpfsDHT) Update(p *peer.Peer) {
for _, route := range dht.routes {
for _, route := range dht.routingTables {
removed := route.Update(p)
// Only drop the connection if no tables refer to this peer
if removed != nil {
found := false
for _, r := range dht.routes {
for _, r := range dht.routingTables {
if r.Find(removed.ID) != nil {
found = true
break
@ -551,9 +551,9 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
}
}
// Look for a peer with a given ID connected to this dht
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routes {
for _, table := range dht.routingTables {
p := table.Find(id)
if p != nil {
return p, table
@ -563,72 +563,72 @@ func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
}
func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Duration, level int) (*PBDHTMessage, error) {
pmes := DHTMessage{
pmes := Message{
Type: PBDHTMessage_FIND_NODE,
Key: string(id),
Id: GenerateMessageID(),
ID: GenerateMessageID(),
Value: []byte{byte(level)},
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute)
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
t := time.Now()
dht.network.Send(mes)
after := time.After(timeout)
select {
case <-after:
dht.listener.Unlisten(pmes.Id)
dht.listener.Unlisten(pmes.ID)
return nil, u.ErrTimeout
case resp := <-listenChan:
roundtrip := time.Since(t)
resp.Peer.SetLatency(roundtrip)
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
pmesOut := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
return nil, err
}
return pmes_out, nil
return pmesOut, nil
}
}
func (dht *IpfsDHT) PrintTables() {
for _, route := range dht.routes {
func (dht *IpfsDHT) printTables() {
for _, route := range dht.routingTables {
route.Print()
}
}
func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, timeout time.Duration) (*PBDHTMessage, error) {
pmes := DHTMessage{
pmes := Message{
Type: PBDHTMessage_GET_PROVIDERS,
Key: string(key),
Id: GenerateMessageID(),
ID: GenerateMessageID(),
Value: []byte{byte(level)},
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.Id, 1, time.Minute)
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.network.Send(mes)
after := time.After(timeout)
select {
case <-after:
dht.listener.Unlisten(pmes.Id)
dht.listener.Unlisten(pmes.ID)
return nil, u.ErrTimeout
case resp := <-listenChan:
u.DOut("FindProviders: got response.")
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
pmesOut := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
return nil, err
}
return pmes_out, nil
return pmesOut, nil
}
}
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
var prov_arr []*peer.Peer
var provArr []*peer.Peer
for _, prov := range peers {
// Don't add ourselves to the list
if peer.ID(prov.GetId()).Equal(dht.self.ID) {
@ -650,7 +650,7 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer
}
}
dht.addProviderEntry(key, p)
prov_arr = append(prov_arr, p)
provArr = append(provArr, p)
}
return prov_arr
return provArr
}

View File

@ -7,28 +7,28 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
type logDhtRpc struct {
type logDhtRPC struct {
Type string
Start time.Time
End time.Time
Duration time.Duration
RpcCount int
RPCCount int
Success bool
}
func startNewRpc(name string) *logDhtRpc {
r := new(logDhtRpc)
func startNewRPC(name string) *logDhtRPC {
r := new(logDhtRPC)
r.Type = name
r.Start = time.Now()
return r
}
func (l *logDhtRpc) EndLog() {
func (l *logDhtRPC) EndLog() {
l.End = time.Now()
l.Duration = l.End.Sub(l.Start)
}
func (l *logDhtRpc) Print() {
func (l *logDhtRPC) Print() {
b, err := json.Marshal(l)
if err != nil {
u.DOut(err.Error())

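This small type is used later in the diff as a defer-based timing wrapper around GetValue and FindProviders. A self-contained sketch of that pattern; doLookup is a hypothetical caller standing in for those methods:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type logDhtRPC struct {
	Type     string
	Start    time.Time
	End      time.Time
	Duration time.Duration
	RPCCount int
	Success  bool
}

func startNewRPC(name string) *logDhtRPC {
	return &logDhtRPC{Type: name, Start: time.Now()}
}

func (l *logDhtRPC) EndLog() {
	l.End = time.Now()
	l.Duration = l.End.Sub(l.Start)
}

// Print emits the record as JSON, as the real method does via u.DOut.
func (l *logDhtRPC) Print() {
	if b, err := json.Marshal(l); err == nil {
		fmt.Println(string(b))
	}
}

// doLookup mimics the shape of GetValue: log start, defer end/print,
// and flip Success only on the happy path.
func doLookup() error {
	ll := startNewRPC("GET")
	defer func() {
		ll.EndLog()
		ll.Print()
	}()
	ll.Success = true
	return nil
}

func main() { doLookup() }
```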
View File

@ -47,93 +47,93 @@ func setupDHTS(n int, t *testing.T) ([]*ma.Multiaddr, []*peer.Peer, []*IpfsDHT)
func TestPing(t *testing.T) {
u.Debug = false
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222")
addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222")
if err != nil {
t.Fatal(err)
}
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
if err != nil {
t.Fatal(err)
}
peer_a := new(peer.Peer)
peer_a.AddAddress(addr_a)
peer_a.ID = peer.ID([]byte("peer_a"))
peerA := new(peer.Peer)
peerA.AddAddress(addrA)
peerA.ID = peer.ID([]byte("peerA"))
peer_b := new(peer.Peer)
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
peerB := new(peer.Peer)
peerB.AddAddress(addrB)
peerB.ID = peer.ID([]byte("peerB"))
neta := swarm.NewSwarm(peer_a)
neta := swarm.NewSwarm(peerA)
err = neta.Listen()
if err != nil {
t.Fatal(err)
}
dht_a := NewDHT(peer_a, neta)
dhtA := NewDHT(peerA, neta)
netb := swarm.NewSwarm(peer_b)
netb := swarm.NewSwarm(peerB)
err = netb.Listen()
if err != nil {
t.Fatal(err)
}
dht_b := NewDHT(peer_b, netb)
dhtB := NewDHT(peerB, netb)
dht_a.Start()
dht_b.Start()
dhtA.Start()
dhtB.Start()
_, err = dht_a.Connect(addr_b)
_, err = dhtA.Connect(addrB)
if err != nil {
t.Fatal(err)
}
//Test that we can ping the node
err = dht_a.Ping(peer_b, time.Second*2)
err = dhtA.Ping(peerB, time.Second*2)
if err != nil {
t.Fatal(err)
}
dht_a.Halt()
dht_b.Halt()
dhtA.Halt()
dhtB.Halt()
}
func TestValueGetSet(t *testing.T) {
u.Debug = false
addr_a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235")
if err != nil {
t.Fatal(err)
}
addr_b, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5679")
if err != nil {
t.Fatal(err)
}
peer_a := new(peer.Peer)
peer_a.AddAddress(addr_a)
peer_a.ID = peer.ID([]byte("peer_a"))
peerA := new(peer.Peer)
peerA.AddAddress(addrA)
peerA.ID = peer.ID([]byte("peerA"))
peer_b := new(peer.Peer)
peer_b.AddAddress(addr_b)
peer_b.ID = peer.ID([]byte("peer_b"))
peerB := new(peer.Peer)
peerB.AddAddress(addrB)
peerB.ID = peer.ID([]byte("peerB"))
neta := swarm.NewSwarm(peer_a)
neta := swarm.NewSwarm(peerA)
err = neta.Listen()
if err != nil {
t.Fatal(err)
}
dht_a := NewDHT(peer_a, neta)
dhtA := NewDHT(peerA, neta)
netb := swarm.NewSwarm(peer_b)
netb := swarm.NewSwarm(peerB)
err = netb.Listen()
if err != nil {
t.Fatal(err)
}
dht_b := NewDHT(peer_b, netb)
dhtB := NewDHT(peerB, netb)
dht_a.Start()
dht_b.Start()
dhtA.Start()
dhtB.Start()
errsa := dht_a.network.GetChan().Errors
errsb := dht_b.network.GetChan().Errors
errsa := dhtA.network.GetChan().Errors
errsb := dhtB.network.GetChan().Errors
go func() {
select {
case err := <-errsa:
@ -143,14 +143,14 @@ func TestValueGetSet(t *testing.T) {
}
}()
_, err = dht_a.Connect(addr_b)
_, err = dhtA.Connect(addrB)
if err != nil {
t.Fatal(err)
}
dht_a.PutValue("hello", []byte("world"))
dhtA.PutValue("hello", []byte("world"))
val, err := dht_a.GetValue("hello", time.Second*2)
val, err := dhtA.GetValue("hello", time.Second*2)
if err != nil {
t.Fatal(err)
}

View File

@ -9,11 +9,11 @@ import (
type connDiagInfo struct {
Latency time.Duration
Id peer.ID
ID peer.ID
}
type diagInfo struct {
Id peer.ID
ID peer.ID
Connections []connDiagInfo
Keys []string
LifeSpan time.Duration
@ -32,11 +32,11 @@ func (di *diagInfo) Marshal() []byte {
func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di := new(diagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
di.Id = dht.self.ID
di.ID = dht.self.ID
di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore
for _, p := range dht.routes[0].Listpeers() {
for _, p := range dht.routingTables[0].Listpeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
}
return di

View File

@ -8,7 +8,7 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
type MesListener struct {
type mesListener struct {
listeners map[uint64]*listenInfo
haltchan chan struct{}
unlist chan uint64
@ -36,8 +36,8 @@ type listenInfo struct {
id uint64
}
func NewMesListener() *MesListener {
ml := new(MesListener)
func newMesListener() *mesListener {
ml := new(mesListener)
ml.haltchan = make(chan struct{})
ml.listeners = make(map[uint64]*listenInfo)
ml.nlist = make(chan *listenInfo, 16)
@ -47,7 +47,7 @@ func NewMesListener() *MesListener {
return ml
}
func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message {
func (ml *mesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message {
li := new(listenInfo)
li.count = count
li.eol = time.Now().Add(timeout)
@ -57,7 +57,7 @@ func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-cha
return li.resp
}
func (ml *MesListener) Unlisten(id uint64) {
func (ml *mesListener) Unlisten(id uint64) {
ml.unlist <- id
}
@ -66,18 +66,18 @@ type respMes struct {
mes *swarm.Message
}
func (ml *MesListener) Respond(id uint64, mes *swarm.Message) {
func (ml *mesListener) Respond(id uint64, mes *swarm.Message) {
ml.send <- &respMes{
id: id,
mes: mes,
}
}
func (ml *MesListener) Halt() {
func (ml *mesListener) Halt() {
ml.haltchan <- struct{}{}
}
func (ml *MesListener) run() {
func (ml *mesListener) run() {
for {
select {
case <-ml.haltchan:

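For readers outside the package: mesListener is the hub that matches incoming responses to outstanding message IDs, which is why every RPC in this commit calls Listen before sending and Unlisten on timeout. A self-contained toy with the same Listen/Respond/Unlisten surface, using a mutex instead of the real type's run() goroutine and plain strings instead of *swarm.Message:

```go
package main

import (
	"fmt"
	"sync"
)

type toyListener struct {
	mu        sync.Mutex
	listeners map[uint64]chan string
}

func newToyListener() *toyListener {
	return &toyListener{listeners: make(map[uint64]chan string)}
}

// Listen registers interest in responses carrying the given message ID.
func (l *toyListener) Listen(id uint64) <-chan string {
	l.mu.Lock()
	defer l.mu.Unlock()
	ch := make(chan string, 1)
	l.listeners[id] = ch
	return ch
}

// Respond routes an incoming message to whoever is listening on its ID.
func (l *toyListener) Respond(id uint64, mes string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if ch, ok := l.listeners[id]; ok {
		ch <- mes
	}
}

// Unlisten drops a registration, e.g. after the caller times out.
func (l *toyListener) Unlisten(id uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.listeners, id)
}

func main() {
	ml := newToyListener()
	ch := ml.Listen(42)
	ml.Respond(42, "pong")
	fmt.Println(<-ch) // pong
}
```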
View File

@ -1,4 +1,4 @@
// Code generated by protoc-gen-go.
// Code generated by protoc-gen-gogo.
// source: messages.proto
// DO NOT EDIT!
@ -13,7 +13,7 @@ It has these top-level messages:
*/
package dht
import proto "code.google.com/p/goprotobuf/proto"
import proto "code.google.com/p/gogoprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
@ -69,14 +69,17 @@ func (x *PBDHTMessage_MessageType) UnmarshalJSON(data []byte) error {
}
type PBDHTMessage struct {
Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"`
XXX_unrecognized []byte `json:"-"`
Type *PBDHTMessage_MessageType `protobuf:"varint,1,req,name=type,enum=dht.PBDHTMessage_MessageType" json:"type,omitempty"`
Key *string `protobuf:"bytes,2,opt,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
// Unique ID of this message, used to match queries with responses
Id *uint64 `protobuf:"varint,4,req,name=id" json:"id,omitempty"`
// Signals whether or not this message is a response to another message
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
// Used for returning peers from queries (normally, peers closer to X)
Peers []*PBDHTMessage_PBPeer `protobuf:"bytes,7,rep,name=peers" json:"peers,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBDHTMessage) Reset() { *m = PBDHTMessage{} }

View File

@ -27,6 +27,7 @@ var KValue = 10
// It's in the paper, I swear
var AlphaValue = 3
// GenerateMessageID creates and returns a new message ID
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
@ -39,21 +40,21 @@ func GenerateMessageID() uint64 {
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) {
complete := make(chan struct{})
count := 0
for _, route := range s.routes {
for _, route := range dht.routingTables {
peers := route.NearestPeers(kb.ConvertKey(key), KValue)
for _, p := range peers {
if p == nil {
s.network.Error(kb.ErrLookupFailure)
dht.network.Error(kb.ErrLookupFailure)
continue
}
count++
go func(sp *peer.Peer) {
err := s.putValueToNetwork(sp, string(key), value)
err := dht.putValueToNetwork(sp, string(key), value)
if err != nil {
s.network.Error(err)
dht.network.Error(err)
}
complete <- struct{}{}
}(p)
@ -121,8 +122,8 @@ func (ps *peerSet) Size() int {
// GetValue searches for the value corresponding to given Key.
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
ll := startNewRpc("GET")
func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
ll := startNewRPC("GET")
defer func() {
ll.EndLog()
ll.Print()
@ -130,29 +131,29 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// If we have it local, don't bother doing an RPC!
// NOTE: this might not be what we want to do...
val, err := s.GetLocal(key)
val, err := dht.getLocal(key)
if err == nil {
ll.Success = true
u.DOut("Found local, returning.")
return val, nil
}
route_level := 0
closest := s.routes[route_level].NearestPeers(kb.ConvertKey(key), PoolSize)
routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
}
val_chan := make(chan []byte)
npeer_chan := make(chan *peer.Peer, 30)
proc_peer := make(chan *peer.Peer, 30)
err_chan := make(chan error)
valChan := make(chan []byte)
npeerChan := make(chan *peer.Peer, 30)
procPeer := make(chan *peer.Peer, 30)
errChan := make(chan error)
after := time.After(timeout)
pset := newPeerSet()
for _, p := range closest {
pset.Add(p)
npeer_chan <- p
npeerChan <- p
}
c := counter{}
@ -161,17 +162,17 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
go func() {
for {
select {
case p := <-npeer_chan:
case p := <-npeerChan:
count++
if count >= KValue {
break
}
c.Increment()
proc_peer <- p
procPeer <- p
default:
if c.Size() == 0 {
err_chan <- u.ErrNotFound
errChan <- u.ErrNotFound
}
}
}
@ -180,19 +181,19 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
process := func() {
for {
select {
case p, ok := <-proc_peer:
case p, ok := <-procPeer:
if !ok || p == nil {
c.Decrement()
return
}
val, peers, err := s.getValueOrPeers(p, key, timeout/4, route_level)
val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
if err != nil {
u.DErr(err.Error())
c.Decrement()
continue
}
if val != nil {
val_chan <- val
valChan <- val
c.Decrement()
return
}
@ -201,7 +202,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// TODO: filter out peers that aren't closer
if !pset.Contains(np) && pset.Size() < KValue {
pset.Add(np) //This is racey... make a single function to do operation
npeer_chan <- np
npeerChan <- np
}
}
c.Decrement()
@ -214,9 +215,9 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
}
select {
case val := <-val_chan:
case val := <-valChan:
return val, nil
case err := <-err_chan:
case err := <-errChan:
return nil, err
case <-after:
return nil, u.ErrTimeout
@ -226,14 +227,14 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
// Announce that this node can provide value for given key
func (s *IpfsDHT) Provide(key u.Key) error {
peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize)
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(key u.Key) error {
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
if len(peers) == 0 {
return kb.ErrLookupFailure
}
pmes := DHTMessage{
pmes := Message{
Type: PBDHTMessage_ADD_PROVIDER,
Key: string(key),
}
@ -241,57 +242,57 @@ func (s *IpfsDHT) Provide(key u.Key) error {
for _, p := range peers {
mes := swarm.NewMessage(p, pbmes)
s.network.Send(mes)
dht.network.Send(mes)
}
return nil
}
// FindProviders searches for peers who can provide the value for given key.
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
ll := startNewRpc("FindProviders")
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
ll := startNewRPC("FindProviders")
defer func() {
ll.EndLog()
ll.Print()
}()
u.DOut("Find providers for: '%s'", key)
p := s.routes[0].NearestPeer(kb.ConvertKey(key))
p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
return nil, kb.ErrLookupFailure
}
for level := 0; level < len(s.routes); {
pmes, err := s.findProvidersSingle(p, key, level, timeout)
for level := 0; level < len(dht.routingTables); {
pmes, err := dht.findProvidersSingle(p, key, level, timeout)
if err != nil {
return nil, err
}
if pmes.GetSuccess() {
provs := s.addPeerList(key, pmes.GetPeers())
provs := dht.addPeerList(key, pmes.GetPeers())
ll.Success = true
return provs, nil
} else {
closer := pmes.GetPeers()
if len(closer) == 0 {
level++
continue
}
if peer.ID(closer[0].GetId()).Equal(s.self.ID) {
u.DOut("Got myself back as a closer peer.")
return nil, u.ErrNotFound
}
maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
if err != nil {
// ??? Move up route level???
panic("not yet implemented")
}
np, err := s.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
if err != nil {
u.PErr("[%s] Failed to connect to: %s", s.self.ID.Pretty(), closer[0].GetAddr())
level++
continue
}
p = np
}
closer := pmes.GetPeers()
if len(closer) == 0 {
level++
continue
}
if peer.ID(closer[0].GetId()).Equal(dht.self.ID) {
u.DOut("Got myself back as a closer peer.")
return nil, u.ErrNotFound
}
maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
if err != nil {
// ??? Move up route level???
panic("not yet implemented")
}
np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
if err != nil {
u.PErr("[%s] Failed to connect to: %s", dht.self.ID.Pretty(), closer[0].GetAddr())
level++
continue
}
p = np
}
return nil, u.ErrNotFound
}
@ -299,15 +300,15 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
// Find specific Peer
// FindPeer searches for a peer with given ID.
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
// Check if we're already connected to them
p, _ := s.Find(id)
p, _ := dht.Find(id)
if p != nil {
return p, nil
}
route_level := 0
p = s.routes[route_level].NearestPeer(kb.ConvertPeerID(id))
routeLevel := 0
p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
if p == nil {
return nil, kb.ErrLookupFailure
}
@ -315,11 +316,11 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
return p, nil
}
for route_level < len(s.routes) {
pmes, err := s.findPeerSingle(p, id, timeout, route_level)
for routeLevel < len(dht.routingTables) {
pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
plist := pmes.GetPeers()
if len(plist) == 0 {
route_level++
routeLevel++
}
found := plist[0]
@ -328,7 +329,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
return nil, err
}
nxtPeer, err := s.network.GetConnection(peer.ID(found.GetId()), addr)
nxtPeer, err := dht.network.GetConnection(peer.ID(found.GetId()), addr)
if err != nil {
return nil, err
}
@ -337,9 +338,8 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error
return nil, errors.New("got back invalid peer from 'successful' response")
}
return nxtPeer, nil
} else {
p = nxtPeer
}
p = nxtPeer
}
return nil, u.ErrNotFound
}
@ -349,16 +349,16 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
u.DOut("Enter Ping.")
pmes := DHTMessage{Id: GenerateMessageID(), Type: PBDHTMessage_PING}
pmes := Message{ID: GenerateMessageID(), Type: PBDHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
response_chan := dht.listener.Listen(pmes.Id, 1, time.Minute)
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.network.Send(mes)
tout := time.After(timeout)
select {
case <-response_chan:
case <-responseChan:
roundtrip := time.Since(before)
p.SetLatency(roundtrip)
u.DOut("Ping took %s.", roundtrip.String())
@ -366,23 +366,23 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
case <-tout:
// Timed out, think about removing peer from network
u.DOut("Ping peer timed out.")
dht.listener.Unlisten(pmes.Id)
dht.listener.Unlisten(pmes.ID)
return u.ErrTimeout
}
}
func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Begin Diagnostic")
//Send to N closest peers
targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
// TODO: Add timeout to this struct so nodes know when to return
pmes := DHTMessage{
pmes := Message{
Type: PBDHTMessage_DIAGNOSTIC,
Id: GenerateMessageID(),
ID: GenerateMessageID(),
}
listenChan := dht.listener.Listen(pmes.Id, len(targets), time.Minute*2)
listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2)
pbmes := pmes.ToProtobuf()
for _, p := range targets {
@ -398,15 +398,15 @@ func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Diagnostic request timed out.")
return out, u.ErrTimeout
case resp := <-listenChan:
pmes_out := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmes_out)
pmesOut := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
// NOTE: here and elsewhere, need to audit error handling,
// some errors should be continued on from
return out, err
}
dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
dec := json.NewDecoder(bytes.NewBuffer(pmesOut.GetValue()))
for {
di := new(diagInfo)
err := dec.Decode(di)

View File

@ -13,13 +13,13 @@ type Bucket struct {
list *list.List
}
func NewBucket() *Bucket {
func newBucket() *Bucket {
b := new(Bucket)
b.list = list.New()
return b
}
func (b *Bucket) Find(id peer.ID) *list.Element {
func (b *Bucket) find(id peer.ID) *list.Element {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
@ -30,19 +30,19 @@ func (b *Bucket) Find(id peer.ID) *list.Element {
return nil
}
func (b *Bucket) MoveToFront(e *list.Element) {
func (b *Bucket) moveToFront(e *list.Element) {
b.lk.Lock()
b.list.MoveToFront(e)
b.lk.Unlock()
}
func (b *Bucket) PushFront(p *peer.Peer) {
func (b *Bucket) pushFront(p *peer.Peer) {
b.lk.Lock()
b.list.PushFront(p)
b.lk.Unlock()
}
func (b *Bucket) PopBack() *peer.Peer {
func (b *Bucket) popBack() *peer.Peer {
b.lk.Lock()
defer b.lk.Unlock()
last := b.list.Back()
@ -50,13 +50,13 @@ func (b *Bucket) PopBack() *peer.Peer {
return last.Value.(*peer.Peer)
}
func (b *Bucket) Len() int {
func (b *Bucket) len() int {
b.lk.RLock()
defer b.lk.RUnlock()
return b.list.Len()
}
// Splits a buckets peers into two buckets, the methods receiver will have
// Split splits a bucket's peers into two buckets; the method's receiver will have
// peers with CPL equal to cpl, the returned bucket will have peers with CPL
// greater than cpl (returned bucket has closer peers)
func (b *Bucket) Split(cpl int, target ID) *Bucket {
@ -64,13 +64,13 @@ func (b *Bucket) Split(cpl int, target ID) *Bucket {
defer b.lk.Unlock()
out := list.New()
newbuck := NewBucket()
newbuck := newBucket()
newbuck.list = out
e := b.list.Front()
for e != nil {
peer_id := ConvertPeerID(e.Value.(*peer.Peer).ID)
peer_cpl := prefLen(peer_id, target)
if peer_cpl > cpl {
peerID := convertPeerID(e.Value.(*peer.Peer).ID)
peerCPL := prefLen(peerID, target)
if peerCPL > cpl {
cur := e
out.PushBack(e.Value)
e = e.Next()

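For reference, the CPL test Split applies to each peer: XOR the peer's hashed ID against the local ID and count leading zero bits. A self-contained sketch using math/bits in place of the package's prefLen helper:

```go
package main

import (
	"fmt"
	"math/bits"
)

// commonPrefixLen counts the leading zero bits of a XOR b.
func commonPrefixLen(a, b []byte) int {
	cpl := 0
	for i := range a {
		x := a[i] ^ b[i]
		cpl += bits.LeadingZeros8(x)
		if x != 0 {
			break
		}
	}
	return cpl
}

func main() {
	local := []byte{0b10110000}
	remote := []byte{0b10100000}
	// The first three bits match and the fourth differs, so CPL is 3;
	// with cpl == 3 this peer would stay in the receiver after Split,
	// since only peers with CPL strictly greater than cpl move out.
	fmt.Println(commonPrefixLen(remote, local)) // 3
}
```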
View File

@ -28,11 +28,11 @@ type RoutingTable struct {
bucketsize int
}
func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *RoutingTable {
func newRoutingTable(bucketsize int, localID ID, latency time.Duration) *RoutingTable {
rt := new(RoutingTable)
rt.Buckets = []*Bucket{NewBucket()}
rt.Buckets = []*Bucket{newBucket()}
rt.bucketsize = bucketsize
rt.local = local_id
rt.local = localID
rt.maxLatency = latency
return rt
}
@ -42,51 +42,50 @@ func NewRoutingTable(bucketsize int, local_id ID, latency time.Duration) *Routin
func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
peer_id := ConvertPeerID(p.ID)
cpl := xor(peer_id, rt.local).commonPrefixLen()
peerID := convertPeerID(p.ID)
cpl := xor(peerID, rt.local).commonPrefixLen()
b_id := cpl
if b_id >= len(rt.Buckets) {
b_id = len(rt.Buckets) - 1
bucketID := cpl
if bucketID >= len(rt.Buckets) {
bucketID = len(rt.Buckets) - 1
}
bucket := rt.Buckets[b_id]
e := bucket.Find(p.ID)
bucket := rt.Buckets[bucketID]
e := bucket.find(p.ID)
if e == nil {
// New peer, add to bucket
if p.GetLatency() > rt.maxLatency {
// Connection doesn't meet requirements, skip!
return nil
}
bucket.PushFront(p)
bucket.pushFront(p)
// Are we past the max bucket size?
if bucket.Len() > rt.bucketsize {
if b_id == len(rt.Buckets)-1 {
new_bucket := bucket.Split(b_id, rt.local)
rt.Buckets = append(rt.Buckets, new_bucket)
if new_bucket.Len() > rt.bucketsize {
if bucket.len() > rt.bucketsize {
if bucketID == len(rt.Buckets)-1 {
newBucket := bucket.Split(bucketID, rt.local)
rt.Buckets = append(rt.Buckets, newBucket)
if newBucket.len() > rt.bucketsize {
// TODO: This is a very rare and annoying case
panic("Case not handled.")
}
// If all elements were on left side of split...
if bucket.Len() > rt.bucketsize {
return bucket.PopBack()
if bucket.len() > rt.bucketsize {
return bucket.popBack()
}
} else {
// If the bucket can't split, kick out the least active node
return bucket.PopBack()
return bucket.popBack()
}
}
return nil
} else {
// If the peer is already in the table, move it to the front.
// This signifies that it it "more active" and the less active nodes
// Will as a result tend towards the back of the list
bucket.MoveToFront(e)
return nil
}
// If the peer is already in the table, move it to the front.
// This signifies that it is "more active", and the less active nodes
// will as a result tend towards the back of the list
bucket.moveToFront(e)
return nil
}
// A helper struct to sort peers by their distance to the local node
@ -101,7 +100,7 @@ type peerSorterArr []*peerDistance
func (p peerSorterArr) Len() int { return len(p) }
func (p peerSorterArr) Swap(a, b int) { p[a], p[b] = p[b], p[a] }
func (p peerSorterArr) Less(a, b int) bool {
return p[a].distance.Less(p[b].distance)
return p[a].distance.less(p[b].distance)
}
//
@ -109,10 +108,10 @@ func (p peerSorterArr) Less(a, b int) bool {
func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
for e := peerList.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer.Peer)
p_id := ConvertPeerID(p.ID)
pID := convertPeerID(p.ID)
pd := peerDistance{
p: p,
distance: xor(target, p_id),
distance: xor(target, pID),
}
peerArr = append(peerArr, &pd)
if e == nil {
@ -125,24 +124,23 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe
// Find a specific peer by ID or return nil
func (rt *RoutingTable) Find(id peer.ID) *peer.Peer {
srch := rt.NearestPeers(ConvertPeerID(id), 1)
srch := rt.NearestPeers(convertPeerID(id), 1)
if len(srch) == 0 || !srch[0].ID.Equal(id) {
return nil
}
return srch[0]
}
// Returns a single peer that is nearest to the given ID
// NearestPeer returns a single peer that is nearest to the given ID
func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer {
peers := rt.NearestPeers(id, 1)
if len(peers) > 0 {
return peers[0]
} else {
return nil
}
return nil
}
// Returns a list of the 'count' closest peers to the given ID
// NearestPeers returns a list of the 'count' closest peers to the given ID
func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
@ -156,7 +154,7 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
bucket = rt.Buckets[cpl]
var peerArr peerSorterArr
if bucket.Len() == 0 {
if bucket.len() == 0 {
// In the case of an unusual split, one bucket may be empty.
// if this happens, search both surrounding buckets for nearest peer
if cpl > 0 {
@ -183,17 +181,17 @@ func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer {
return out
}
// Returns the total number of peers in the routing table
// Size returns the total number of peers in the routing table
func (rt *RoutingTable) Size() int {
var tot int
for _, buck := range rt.Buckets {
tot += buck.Len()
tot += buck.len()
}
return tot
}
// NOTE: This is potentially unsafe... use at your own risk
func (rt *RoutingTable) Listpeers() []*peer.Peer {
func (rt *RoutingTable) listPeers() []*peer.Peer {
var peers []*peer.Peer
for _, buck := range rt.Buckets {
for e := buck.getIter(); e != nil; e = e.Next() {
@ -203,10 +201,10 @@ func (rt *RoutingTable) Listpeers() []*peer.Peer {
return peers
}
func (rt *RoutingTable) Print() {
func (rt *RoutingTable) print() {
fmt.Printf("Routing Table, bs = %d, Max latency = %d\n", rt.bucketsize, rt.maxLatency)
rt.tabLock.RLock()
peers := rt.Listpeers()
peers := rt.listPeers()
for i, p := range peers {
fmt.Printf("%d) %s %s\n", i, p.ID.Pretty(), p.GetLatency().String())
}

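Update above implements the usual Kademlia k-bucket policy: peers already in a bucket are refreshed to the front, new peers are prepended, and when a bucket that cannot split overflows, the least recently seen peer is evicted and handed back to the caller. A self-contained sketch of just that eviction policy (the real code also splits the last bucket before evicting, omitted here):

```go
package main

import (
	"container/list"
	"fmt"
)

const bucketSize = 2 // stand-in for rt.bucketsize

type bucket struct{ l *list.List }

// update returns the evicted peer ID, or "" if nothing was evicted.
func (b *bucket) update(id string) string {
	for e := b.l.Front(); e != nil; e = e.Next() {
		if e.Value.(string) == id {
			b.l.MoveToFront(e) // seen again: mark as most active
			return ""
		}
	}
	b.l.PushFront(id)
	if b.l.Len() > bucketSize {
		back := b.l.Back()
		b.l.Remove(back)
		return back.Value.(string) // least active peer is kicked out
	}
	return ""
}

func main() {
	b := &bucket{l: list.New()}
	b.update("a")
	b.update("b")
	b.update("a")              // refresh "a"
	fmt.Println(b.update("c")) // evicts "b", the least recently seen
}
```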
View File

@ -27,28 +27,28 @@ func _randID() ID {
// Test basic features of the bucket struct
func TestBucket(t *testing.T) {
b := NewBucket()
b := newBucket()
peers := make([]*peer.Peer, 100)
for i := 0; i < 100; i++ {
peers[i] = _randPeer()
b.PushFront(peers[i])
b.pushFront(peers[i])
}
local := _randPeer()
local_id := ConvertPeerID(local.ID)
localID := convertPeerID(local.ID)
i := rand.Intn(len(peers))
e := b.Find(peers[i].ID)
e := b.find(peers[i].ID)
if e == nil {
t.Errorf("Failed to find peer: %v", peers[i])
}
spl := b.Split(0, ConvertPeerID(local.ID))
spl := b.Split(0, convertPeerID(local.ID))
llist := b.list
for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen()
p := convertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, localID).commonPrefixLen()
if cpl > 0 {
t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket")
}
@ -56,8 +56,8 @@ func TestBucket(t *testing.T) {
rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, local_id).commonPrefixLen()
p := convertPeerID(e.Value.(*peer.Peer).ID)
cpl := xor(p, localID).commonPrefixLen()
if cpl == 0 {
t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket")
}
@ -67,7 +67,7 @@ func TestBucket(t *testing.T) {
// Right now, this just makes sure that it doesn't hang or crash
func TestTableUpdate(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
rt := newRoutingTable(10, convertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 100; i++ {
@ -93,7 +93,7 @@ func TestTableUpdate(t *testing.T) {
func TestTableFind(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(10, ConvertPeerID(local.ID), time.Hour)
rt := newRoutingTable(10, convertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 5; i++ {
@ -102,7 +102,7 @@ func TestTableFind(t *testing.T) {
}
t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
found := rt.NearestPeer(ConvertPeerID(peers[2].ID))
found := rt.NearestPeer(convertPeerID(peers[2].ID))
if !found.ID.Equal(peers[2].ID) {
t.Fatalf("Failed to lookup known node...")
}
@ -110,7 +110,7 @@ func TestTableFind(t *testing.T) {
func TestTableFindMultiple(t *testing.T) {
local := _randPeer()
rt := NewRoutingTable(20, ConvertPeerID(local.ID), time.Hour)
rt := newRoutingTable(20, convertPeerID(local.ID), time.Hour)
peers := make([]*peer.Peer, 100)
for i := 0; i < 18; i++ {
@ -119,7 +119,7 @@ func TestTableFindMultiple(t *testing.T) {
}
t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty())
found := rt.NearestPeers(ConvertPeerID(peers[2].ID), 15)
found := rt.NearestPeers(convertPeerID(peers[2].ID), 15)
if len(found) != 15 {
t.Fatalf("Got back different number of peers than we expected.")
}
@ -130,7 +130,7 @@ func TestTableFindMultiple(t *testing.T) {
// and set GOMAXPROCS above 1
func TestTableMultithreaded(t *testing.T) {
local := peer.ID("localPeer")
tab := NewRoutingTable(20, ConvertPeerID(local), time.Hour)
tab := newRoutingTable(20, convertPeerID(local), time.Hour)
var peers []*peer.Peer
for i := 0; i < 500; i++ {
peers = append(peers, _randPeer())
@ -167,8 +167,8 @@ func TestTableMultithreaded(t *testing.T) {
func BenchmarkUpdates(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
local := convertKey("localKey")
tab := newRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
for i := 0; i < b.N; i++ {
@ -183,8 +183,8 @@ func BenchmarkUpdates(b *testing.B) {
func BenchmarkFinds(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
tab := NewRoutingTable(20, local, time.Hour)
local := convertKey("localKey")
tab := newRoutingTable(20, local, time.Hour)
var peers []*peer.Peer
for i := 0; i < b.N; i++ {

View File

@ -20,11 +20,11 @@ var ErrLookupFailure = errors.New("failed to find any peer in table")
// peer.ID or a util.Key. This unifies the keyspace
type ID []byte
func (id ID) Equal(other ID) bool {
func (id ID) equal(other ID) bool {
return bytes.Equal(id, other)
}
func (id ID) Less(other ID) bool {
func (id ID) less(other ID) bool {
a, b := equalizeSizes(id, other)
for i := 0; i < len(a); i++ {
if a[i] != b[i] {
@ -76,23 +76,23 @@ func equalizeSizes(a, b ID) (ID, ID) {
return a, b
}
func ConvertPeerID(id peer.ID) ID {
func convertPeerID(id peer.ID) ID {
hash := sha256.Sum256(id)
return hash[:]
}
func ConvertKey(id u.Key) ID {
func convertKey(id u.Key) ID {
hash := sha256.Sum256([]byte(id))
return hash[:]
}
// Returns true if a is closer to key than b is
// Closer returns true if a is closer to key than b is
func Closer(a, b peer.ID, key u.Key) bool {
aid := ConvertPeerID(a)
bid := ConvertPeerID(b)
tgt := ConvertKey(key)
aid := convertPeerID(a)
bid := convertPeerID(b)
tgt := convertKey(key)
adist := xor(aid, tgt)
bdist := xor(bid, tgt)
return adist.Less(bdist)
return adist.less(bdist)
}

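The exported Closer above reduces to one XOR-metric comparison in the hashed keyspace. A self-contained sketch, with sha256.Sum256 standing in for convertPeerID/convertKey and bytes.Compare for the less method:

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// xor computes the bytewise XOR distance of two equal-length IDs.
func xor(a, b []byte) []byte {
	out := make([]byte, len(a))
	for i := range a {
		out[i] = a[i] ^ b[i]
	}
	return out
}

// closer reports whether peer a is nearer to key than peer b is,
// comparing XOR distances as big-endian integers.
func closer(a, b, key []byte) bool {
	ha, hb, hk := sha256.Sum256(a), sha256.Sum256(b), sha256.Sum256(key)
	return bytes.Compare(xor(ha[:], hk[:]), xor(hb[:], hk[:])) < 0
}

func main() {
	fmt.Println(closer([]byte("peerA"), []byte("peerB"), []byte("hello")))
}
```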
View File

@ -1,9 +1,10 @@
package routing
import (
"time"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
"time"
)
// IpfsRouting is the routing module interface