From 8cdf5668654ff60a478616bec3ad33bacf01bcd1 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 11 Dec 2014 06:08:53 +0000 Subject: [PATCH] remove multilayered routing table from the DHT (for now) --- routing/dht/dht.go | 63 ++++++++++++++---------------------------- routing/dht/diag.go | 2 +- routing/dht/routing.go | 29 ++++++++----------- 3 files changed, 32 insertions(+), 62 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 6f3a846d3..caf6d8c9d 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -37,7 +37,7 @@ const doPinging = false type IpfsDHT struct { // Array of routing tables for differently distanced nodes // NOTE: (currently, only a single table is used) - routingTables []*kb.RoutingTable + routingTable *kb.RoutingTable // the network services we need dialer inet.Dialer @@ -80,10 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia dht.providers = NewProviderManager(dht.Context(), p.ID()) dht.AddCloserChild(dht.providers) - dht.routingTables = make([]*kb.RoutingTable, 3) - dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) - dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) - dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour) + dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Minute) dht.birth = time.Now() dht.Validators = make(map[string]ValidatorFunc) @@ -243,9 +240,9 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er } func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, - key u.Key, level int) ([]byte, []peer.Peer, error) { + key u.Key) ([]byte, []peer.Peer, error) { - pmes, err := dht.getValueSingle(ctx, p, key, level) + pmes, err := dht.getValueSingle(ctx, p, key) if err != nil { return nil, nil, err } @@ -265,7 +262,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, // TODO decide on providers. This probably shouldn't be happening. if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 { - val, err := dht.getFromPeerList(ctx, key, prv, level) + val, err := dht.getFromPeerList(ctx, key, prv) if err != nil { return nil, nil, err } @@ -292,9 +289,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, // getValueSingle simply performs the get value RPC with the given parameters func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer, - key u.Key, level int) (*pb.Message, error) { + key u.Key) (*pb.Message, error) { - pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level) + pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0) return dht.sendRequest(ctx, p, pmes) } @@ -303,7 +300,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer, // one to get the value from? Or just connect to one at a time until we get a // successful connection and request the value from it? func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key, - peerlist []*pb.Message_Peer, level int) ([]byte, error) { + peerlist []*pb.Message_Peer) ([]byte, error) { for _, pinfo := range peerlist { p, err := dht.ensureConnectedToPeer(ctx, pinfo) @@ -312,7 +309,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key, continue } - pmes, err := dht.getValueSingle(ctx, p, key, level) + pmes, err := dht.getValueSingle(ctx, p, key) if err != nil { log.Errorf("getFromPeers error: %s\n", err) continue @@ -379,47 +376,30 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error { return dht.datastore.Put(key.DsKey(), data) } -// Update signals to all routingTables to Update their last-seen status +// Update signals the routingTable to Update its last-seen status // on the given peer. func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) { log.Event(ctx, "updatePeer", p) - removedCount := 0 - for _, route := range dht.routingTables { - removed := route.Update(p) - // Only close the connection if no tables refer to this peer - if removed != nil { - removedCount++ - } - } - - // Only close the connection if no tables refer to this peer - // if removedCount == len(dht.routingTables) { - // dht.network.ClosePeer(p) - // } - // ACTUALLY, no, let's not just close the connection. it may be connected - // due to other things. it seems that we just need connection timeouts - // after some deadline of inactivity. + dht.routingTable.Update(p) } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) { - for _, table := range dht.routingTables { - p := table.Find(id) - if p != nil { - return p, table - } + p := dht.routingTable.Find(id) + if p != nil { + return p, dht.routingTable } return nil, nil } // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is -func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) { - pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level) +func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID) (*pb.Message, error) { + pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) return dht.sendRequest(ctx, p, pmes) } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) { - pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level) +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key) (*pb.Message, error) { + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0) return dht.sendRequest(ctx, p, pmes) } @@ -446,11 +426,8 @@ func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer // nearestPeersToQuery returns the routing tables closest peers. func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer { - level := pmes.GetClusterLevel() - cluster := dht.routingTables[level] - key := u.Key(pmes.GetKey()) - closer := cluster.NearestPeers(kb.ConvertKey(key), count) + closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count) return closer } @@ -537,7 +514,7 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { case <-tick: id := make([]byte, 16) rand.Read(id) - peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5) for _, p := range peers { ctx, _ := context.WithTimeout(dht.Context(), time.Second*5) err := dht.Ping(ctx, p) diff --git a/routing/dht/diag.go b/routing/dht/diag.go index e91ba9bee..82316e2e3 100644 --- a/routing/dht/diag.go +++ b/routing/dht/diag.go @@ -36,7 +36,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo { di.LifeSpan = time.Since(dht.birth) di.Keys = nil // Currently no way to query datastore - for _, p := range dht.routingTables[0].ListPeers() { + for _, p := range dht.routingTable.ListPeers() { d := connDiagInfo{p.GetLatency(), p.ID()} di.Connections = append(di.Connections, d) } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index fbdc7293a..3148e1589 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -38,11 +38,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - var peers []peer.Peer - for _, route := range dht.routingTables { - npeers := route.NearestPeers(kb.ConvertKey(key), KValue) - peers = append(peers, npeers...) - } + peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { log.Debugf("%s PutValue qry part %v", dht.self, p) @@ -71,9 +67,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { return val, nil } - // get closest peers in the routing tables - routeLevel := 0 - closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize) + // get closest peers in the routing table + closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) if closest == nil || len(closest) == 0 { log.Warning("Got no peers back from routing table!") return nil, kb.ErrLookupFailure @@ -82,7 +77,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // setup the Query query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { - val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel) + val, peers, err := dht.getValueOrPeers(ctx, p, key) if err != nil { return nil, err } @@ -116,7 +111,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { dht.providers.AddProvider(key, dht.self) - peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) if len(peers) == 0 { return nil } @@ -166,7 +161,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // setup the Query query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { - pmes, err := dht.findProvidersSingle(ctx, p, key, 0) + pmes, err := dht.findProvidersSingle(ctx, p, key) if err != nil { return nil, err } @@ -205,7 +200,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co return &dhtQueryResult{closerPeers: clpeers}, nil }) - peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) _, err := query.Run(ctx, peers) if err != nil { log.Errorf("FindProviders Query error: %s", err) @@ -253,8 +248,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) return p, nil } - routeLevel := 0 - closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue) + closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) if closest == nil || len(closest) == 0 { return nil, kb.ErrLookupFailure } @@ -270,7 +264,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) // setup the Query query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { - pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) + pmes, err := dht.findPeerSingle(ctx, p, id) if err != nil { return nil, err } @@ -316,8 +310,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< peerchan := make(chan peer.Peer, asyncQueryBuffer) peersSeen := map[string]peer.Peer{} - routeLevel := 0 - closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue) + closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) if closest == nil || len(closest) == 0 { return nil, kb.ErrLookupFailure } @@ -325,7 +318,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< // setup the Query query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { - pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) + pmes, err := dht.findPeerSingle(ctx, p, id) if err != nil { return nil, err }