From 60cd0f1cf00fa1526c079e21b77ddcc57905fede Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 15 Oct 2014 12:30:52 -0700 Subject: [PATCH] some dht cleanup, and make DHTs take a master context --- core/core.go | 2 +- routing/dht/dht.go | 31 +++++++++++++++++++--- routing/dht/dht_logger.go | 7 ++++- routing/dht/handlers.go | 54 -------------------------------------- routing/dht/messages.pb.go | 17 +++++------- routing/dht/messages.proto | 7 +++-- routing/dht/routing.go | 33 +---------------------- 7 files changed, 45 insertions(+), 106 deletions(-) diff --git a/core/core.go b/core/core.go index 3b9cc1fb2..331299fec 100644 --- a/core/core.go +++ b/core/core.go @@ -141,7 +141,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { diagnostics = diag.NewDiagnostics(local, net, diagService) diagService.SetHandler(diagnostics) - route = dht.NewDHT(local, peerstore, net, dhtService, d) + route = dht.NewDHT(ctx, local, peerstore, net, dhtService, d) // TODO(brian): perform this inside NewDHT factory method dhtService.SetHandler(route) // wire the handler to the service. diff --git a/routing/dht/dht.go b/routing/dht/dht.go index e00b82bf2..22523f0bb 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -53,16 +53,19 @@ type IpfsDHT struct { //lock to make diagnostics work better diaglock sync.Mutex + + ctx context.Context } // NewDHT creates a new DHT object with the given peer as the 'local' host -func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { +func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { dht := new(IpfsDHT) dht.network = net dht.sender = sender dht.datastore = dstore dht.self = p dht.peerstore = ps + dht.ctx = ctx dht.providers = NewProviderManager(p.ID) @@ -71,6 +74,8 @@ func NewDHT(p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sende dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000) dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) dht.birth = time.Now() + + go dht.PingRoutine(time.Second * 10) return dht } @@ -137,7 +142,6 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N // get handler for this msg type. handler := dht.handlerForMsgType(pmes.GetType()) if handler == nil { - // TODO handle/log err log.Error("got back nil handler from handlerForMsgType") return nil } @@ -350,7 +354,7 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) { byt, ok := v.([]byte) if !ok { - return byt, errors.New("value stored in datastore not []byte") + return nil, errors.New("value stored in datastore not []byte") } return byt, nil } @@ -533,6 +537,27 @@ func (dht *IpfsDHT) loadProvidableKeys() error { return nil } +func (dht *IpfsDHT) PingRoutine(t time.Duration) { + tick := time.Tick(t) + for { + select { + case <-tick: + id := make([]byte, 16) + rand.Read(id) + peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5) + for _, p := range peers { + ctx, _ := context.WithTimeout(dht.ctx, time.Second*5) + err := dht.Ping(ctx, p) + if err != nil { + log.Error("Ping error: %s", err) + } + } + case <-dht.ctx.Done(): + return + } + } +} + // Bootstrap builds up list of peers by requesting random peer IDs func (dht *IpfsDHT) Bootstrap(ctx context.Context) { id := make([]byte, 16) diff --git a/routing/dht/dht_logger.go b/routing/dht/dht_logger.go index 1a0878bf7..0ff012956 100644 --- a/routing/dht/dht_logger.go +++ b/routing/dht/dht_logger.go @@ -2,6 +2,7 @@ package dht import ( "encoding/json" + "fmt" "time" ) @@ -29,12 +30,16 @@ func (l *logDhtRPC) EndLog() { func (l *logDhtRPC) Print() { b, err := json.Marshal(l) if err != nil { - log.Debug(err.Error()) + log.Debug("Error marshaling logDhtRPC object: %s", err) } else { log.Debug(string(b)) } } +func (l *logDhtRPC) String() string { + return fmt.Sprintf("DHT RPC: %s took %s, success = %s", l.Type, l.Duration, l.Success) +} + func (l *logDhtRPC) EndAndPrint() { l.EndLog() l.Print() diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 4a9de160e..1046516b6 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -5,9 +5,7 @@ import ( "fmt" "time" - msg "github.com/jbenet/go-ipfs/net/message" peer "github.com/jbenet/go-ipfs/peer" - kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" @@ -32,8 +30,6 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { return dht.handleGetProviders case Message_PING: return dht.handlePing - case Message_DIAGNOSTIC: - return dht.handleDiagnostic default: return nil } @@ -211,53 +207,3 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er func (dht *IpfsDHT) Halt() { dht.providers.Halt() } - -// NOTE: not yet finished, low priority -func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) { - seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) - - for _, ps := range seq { - _, err := msg.FromObject(ps, pmes) - if err != nil { - log.Error("handleDiagnostics error creating message: %v\n", err) - continue - } - // dht.sender.SendRequest(context.TODO(), mes) - } - return nil, errors.New("not yet ported back") - - // buf := new(bytes.Buffer) - // di := dht.getDiagInfo() - // buf.Write(di.Marshal()) - // - // // NOTE: this shouldnt be a hardcoded value - // after := time.After(time.Second * 20) - // count := len(seq) - // for count > 0 { - // select { - // case <-after: - // //Timeout, return what we have - // goto out - // case reqResp := <-listenChan: - // pmesOut := new(Message) - // err := proto.Unmarshal(reqResp.Data, pmesOut) - // if err != nil { - // // It broke? eh, whatever, keep going - // continue - // } - // buf.Write(reqResp.Data) - // count-- - // } - // } - // - // out: - // resp := Message{ - // Type: Message_DIAGNOSTIC, - // ID: pmes.GetId(), - // Value: buf.Bytes(), - // Response: true, - // } - // - // mes := swarm.NewMessage(p, resp.ToProtobuf()) - // dht.netChan.Outgoing <- mes -} diff --git a/routing/dht/messages.pb.go b/routing/dht/messages.pb.go index b6e9fa4f2..f204544ea 100644 --- a/routing/dht/messages.pb.go +++ b/routing/dht/messages.pb.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-go. // source: messages.proto // DO NOT EDIT! @@ -13,13 +13,11 @@ It has these top-level messages: */ package dht -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" -import json "encoding/json" +import proto "code.google.com/p/goprotobuf/proto" import math "math" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type Message_MessageType int32 @@ -31,7 +29,6 @@ const ( Message_GET_PROVIDERS Message_MessageType = 3 Message_FIND_NODE Message_MessageType = 4 Message_PING Message_MessageType = 5 - Message_DIAGNOSTIC Message_MessageType = 6 ) var Message_MessageType_name = map[int32]string{ @@ -41,7 +38,6 @@ var Message_MessageType_name = map[int32]string{ 3: "GET_PROVIDERS", 4: "FIND_NODE", 5: "PING", - 6: "DIAGNOSTIC", } var Message_MessageType_value = map[string]int32{ "PUT_VALUE": 0, @@ -50,7 +46,6 @@ var Message_MessageType_value = map[string]int32{ "GET_PROVIDERS": 3, "FIND_NODE": 4, "PING": 5, - "DIAGNOSTIC": 6, } func (x Message_MessageType) Enum() *Message_MessageType { @@ -72,7 +67,7 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error { type Message struct { // defines what type of message it is. - Type *Message_MessageType `protobuf:"varint,1,req,name=type,enum=dht.Message_MessageType" json:"type,omitempty"` + Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.Message_MessageType" json:"type,omitempty"` // defines what coral cluster level this query/response belongs to. ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"` // Used to specify the key associated with this message. @@ -137,8 +132,8 @@ func (m *Message) GetProviderPeers() []*Message_Peer { } type Message_Peer struct { - Id *string `protobuf:"bytes,1,req,name=id" json:"id,omitempty"` - Addr *string `protobuf:"bytes,2,req,name=addr" json:"addr,omitempty"` + Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Addr *string `protobuf:"bytes,2,opt,name=addr" json:"addr,omitempty"` XXX_unrecognized []byte `json:"-"` } diff --git a/routing/dht/messages.proto b/routing/dht/messages.proto index 3c33f9382..067690150 100644 --- a/routing/dht/messages.proto +++ b/routing/dht/messages.proto @@ -10,16 +10,15 @@ message Message { GET_PROVIDERS = 3; FIND_NODE = 4; PING = 5; - DIAGNOSTIC = 6; } message Peer { - required string id = 1; - required string addr = 2; + optional string id = 1; + optional string addr = 2; } // defines what type of message it is. - required MessageType type = 1; + optional MessageType type = 1; // defines what coral cluster level this query/response belongs to. optional int32 clusterLevelRaw = 10; diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 03d94d118..55ef265cb 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,8 +1,6 @@ package dht import ( - "bytes" - "encoding/json" "sync" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -62,6 +60,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { routeLevel := 0 closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize) if closest == nil || len(closest) == 0 { + log.Warning("Got no peers back from routing table!") return nil, nil } @@ -282,33 +281,3 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error { log.Info("ping %s end (err = %s)", p, err) return err } - -func (dht *IpfsDHT) getDiagnostic(ctx context.Context) ([]*diagInfo, error) { - - log.Info("Begin Diagnostic") - peers := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) - var out []*diagInfo - - query := newQuery(dht.self.Key(), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { - pmes := newMessage(Message_DIAGNOSTIC, "", 0) - rpmes, err := dht.sendRequest(ctx, p, pmes) - if err != nil { - return nil, err - } - - dec := json.NewDecoder(bytes.NewBuffer(rpmes.GetValue())) - for { - di := new(diagInfo) - err := dec.Decode(di) - if err != nil { - break - } - - out = append(out, di) - } - return &dhtQueryResult{success: true}, nil - }) - - _, err := query.Run(ctx, peers) - return out, err -}