From 280c7e7e06fe69942d9b0b08e0bc752ce2e1715b Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 9 Oct 2014 22:51:47 +0000 Subject: [PATCH] implement diagnostics service --- Godeps/Godeps.json | 2 +- cmd/ipfs/ipfs.go | 1 + core/core.go | 35 +++-- daemon/daemon.go | 10 ++ diagnostics/diag.go | 263 ++++++++++++++++++++++++++++++++++++++ diagnostics/message.pb.go | 48 +++++++ diagnostics/message.proto | 6 + net/mux/mux.pb.go | 27 ++-- net/mux/mux.proto | 1 + net/net.go | 5 + net/swarm/conn.go | 2 + net/swarm/swarm.go | 10 ++ util/util.go | 7 + 13 files changed, 391 insertions(+), 26 deletions(-) create mode 100644 diagnostics/diag.go create mode 100644 diagnostics/message.pb.go create mode 100644 diagnostics/message.proto diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 3cce3f0d3..71ee4c0e1 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/jbenet/go-ipfs", - "GoVersion": "go1.3", + "GoVersion": "go1.3.3", "Packages": [ "./..." ], diff --git a/cmd/ipfs/ipfs.go b/cmd/ipfs/ipfs.go index 933ea5bd7..f5e645bcf 100644 --- a/cmd/ipfs/ipfs.go +++ b/cmd/ipfs/ipfs.go @@ -81,6 +81,7 @@ func ipfsCmd(c *commander.Command, args []string) error { } func main() { + u.AllLoggersOn() u.Debug = false // setup logging diff --git a/core/core.go b/core/core.go index 06dd08c11..2dc5b5309 100644 --- a/core/core.go +++ b/core/core.go @@ -13,6 +13,7 @@ import ( bserv "github.com/jbenet/go-ipfs/blockservice" config "github.com/jbenet/go-ipfs/config" ci "github.com/jbenet/go-ipfs/crypto" + diag "github.com/jbenet/go-ipfs/diagnostics" exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" merkledag "github.com/jbenet/go-ipfs/merkledag" @@ -64,6 +65,9 @@ type IpfsNode struct { // the name system, resolves paths to hashes Namesys namesys.NameSystem + + // the diagnostics service + Diagnostics *diag.Diagnostics } // NewIpfsNode constructs a new IpfsNode based on the given config. @@ -103,12 +107,14 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { // TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific route *dht.IpfsDHT exchangeSession exchange.Interface + diagnostics *diag.Diagnostics ) if online { dhtService := netservice.NewService(nil) // nil handler for now, need to patch it exchangeService := netservice.NewService(nil) // nil handler for now, need to patch it + diagService := netservice.NewService(nil) if err := dhtService.Start(ctx); err != nil { return nil, err @@ -118,14 +124,18 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { } net, err = inet.NewIpfsNetwork(context.TODO(), local, peerstore, &mux.ProtocolMap{ - mux.ProtocolID_Routing: dhtService, - mux.ProtocolID_Exchange: exchangeService, + mux.ProtocolID_Routing: dhtService, + mux.ProtocolID_Exchange: exchangeService, + mux.ProtocolID_Diagnostic: diagService, // add protocol services here. }) if err != nil { return nil, err } + diagnostics = diag.NewDiagnostics(local, net, diagService) + diagService.Handler = diagnostics + route = dht.NewDHT(local, peerstore, net, dhtService, d) // TODO(brian): perform this inside NewDHT factory method dhtService.Handler = route // wire the handler to the service. @@ -149,16 +159,17 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { success = true return &IpfsNode{ - Config: cfg, - Peerstore: peerstore, - Datastore: d, - Blocks: bs, - DAG: dag, - Resolver: &path.Resolver{DAG: dag}, - Exchange: exchangeSession, - Identity: local, - Routing: route, - Namesys: ns, + Config: cfg, + Peerstore: peerstore, + Datastore: d, + Blocks: bs, + DAG: dag, + Resolver: &path.Resolver{DAG: dag}, + Exchange: exchangeSession, + Identity: local, + Routing: route, + Namesys: ns, + Diagnostics: diagnostics, }, nil } diff --git a/daemon/daemon.go b/daemon/daemon.go index 1a3555f93..ab5c66728 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -8,6 +8,7 @@ import ( "os" "path" "sync" + "time" core "github.com/jbenet/go-ipfs/core" "github.com/jbenet/go-ipfs/core/commands" @@ -136,6 +137,15 @@ func (dl *DaemonListener) handleConnection(conn net.Conn) { err = commands.Publish(dl.node, command.Args, command.Opts, conn) case "resolve": err = commands.Resolve(dl.node, command.Args, command.Opts, conn) + case "diag": + log.Debug("DIAGNOSTIC!") + info, err := dl.node.Diagnostics.GetDiagnostic(time.Second * 20) + if err != nil { + fmt.Fprintln(conn, err) + return + } + enc := json.NewEncoder(conn) + err = enc.Encode(info) default: err = fmt.Errorf("Invalid Command: '%s'", command.Command) } diff --git a/diagnostics/diag.go b/diagnostics/diag.go new file mode 100644 index 000000000..111e54016 --- /dev/null +++ b/diagnostics/diag.go @@ -0,0 +1,263 @@ +package diagnostic + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "sync" + "time" + + "crypto/rand" + + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + + net "github.com/jbenet/go-ipfs/net" + msg "github.com/jbenet/go-ipfs/net/message" + peer "github.com/jbenet/go-ipfs/peer" + util "github.com/jbenet/go-ipfs/util" +) + +var log = util.Logger("diagnostics") + +type Diagnostics struct { + network net.Network + sender net.Sender + self *peer.Peer + + diagLock sync.Mutex + diagMap map[string]time.Time + birth time.Time +} + +func NewDiagnostics(self *peer.Peer, inet net.Network, sender net.Sender) *Diagnostics { + return &Diagnostics{ + network: inet, + sender: sender, + self: self, + diagMap: make(map[string]time.Time), + birth: time.Now(), + } +} + +type connDiagInfo struct { + Latency time.Duration + ID string +} + +type diagInfo struct { + ID string + Connections []connDiagInfo + Keys []string + LifeSpan time.Duration + CodeVersion string +} + +func (di *diagInfo) Marshal() []byte { + b, err := json.Marshal(di) + if err != nil { + panic(err) + } + //TODO: also consider compressing this. There will be a lot of these + return b +} + +func (d *Diagnostics) getPeers() []*peer.Peer { + // + n, ok := d.network.(*net.IpfsNetwork) + if !ok { + return nil + } + s := n.GetSwarm() + return s.GetPeerList() + // +} + +func (d *Diagnostics) getDiagInfo() *diagInfo { + di := new(diagInfo) + di.CodeVersion = "github.com/jbenet/go-ipfs" + di.ID = d.self.ID.Pretty() + di.LifeSpan = time.Since(d.birth) + di.Keys = nil // Currently no way to query datastore + + for _, p := range d.getPeers() { + di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID.Pretty()}) + } + return di +} + +func newID() string { + id := make([]byte, 4) + rand.Read(id) + return string(id) +} + +func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { + log.Debug("Getting diagnostic.") + ctx, _ := context.WithTimeout(context.TODO(), timeout) + + diagID := newID() + d.diagLock.Lock() + d.diagMap[diagID] = time.Now() + d.diagLock.Unlock() + + log.Debug("Begin Diagnostic") + + peers := d.getPeers() + log.Debug("Sending diagnostic request to %d peers.", len(peers)) + + var out []*diagInfo + di := d.getDiagInfo() + out = append(out, di) + + pmes := newMessage(diagID) + for _, p := range peers { + log.Debug("Sending getDiagnostic to: %s", p) + data, err := d.getDiagnosticFromPeer(ctx, p, pmes) + if err != nil { + log.Error("GetDiagnostic error: %v", err) + continue + } + buf := bytes.NewBuffer(data) + dec := json.NewDecoder(buf) + for { + di := new(diagInfo) + err := dec.Decode(di) + if err != nil { + if err != io.EOF { + log.Error("error decoding diagInfo: %v", err) + } + break + } + out = append(out, di) + } + } + return out, nil +} + +// TODO: this method no longer needed. +func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p *peer.Peer, mes *Message) ([]byte, error) { + rpmes, err := d.sendRequest(ctx, p, mes) + if err != nil { + return nil, err + } + return rpmes.GetData(), nil +} + +func newMessage(diagID string) *Message { + pmes := new(Message) + pmes.DiagID = proto.String(diagID) + return pmes +} + +func (d *Diagnostics) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) { + + mes, err := msg.FromObject(p, pmes) + if err != nil { + return nil, err + } + + start := time.Now() + + rmes, err := d.sender.SendRequest(ctx, mes) + if err != nil { + return nil, err + } + if rmes == nil { + return nil, errors.New("no response to request") + } + + rtt := time.Since(start) + log.Info("diagnostic request took: %s", rtt.String()) + + rpmes := new(Message) + if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil { + return nil, err + } + + return rpmes, nil +} + +// NOTE: not yet finished, low priority +func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) { + resp := newMessage(pmes.GetDiagID()) + d.diagLock.Lock() + _, found := d.diagMap[pmes.GetDiagID()] + if found { + d.diagLock.Unlock() + return resp, nil + } + d.diagMap[pmes.GetDiagID()] = time.Now() + d.diagLock.Unlock() + + buf := new(bytes.Buffer) + di := d.getDiagInfo() + buf.Write(di.Marshal()) + + ctx, _ := context.WithTimeout(context.TODO(), time.Second*10) + + for _, p := range d.getPeers() { + log.Debug("Sending diagnostic request to peer: %s", p) + out, err := d.getDiagnosticFromPeer(ctx, p, pmes) + if err != nil { + log.Error("getDiagnostic error: %v", err) + continue + } + _, err = buf.Write(out) + if err != nil { + log.Error("getDiagnostic write output error: %v", err) + continue + } + } + + resp.Data = buf.Bytes() + return resp, nil +} + +func (d *Diagnostics) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage { + mData := mes.Data() + if mData == nil { + log.Error("message did not include Data") + return nil + } + + mPeer := mes.Peer() + if mPeer == nil { + log.Error("message did not include a Peer") + return nil + } + + // deserialize msg + pmes := new(Message) + err := proto.Unmarshal(mData, pmes) + if err != nil { + log.Error("Failed to decode protobuf message: %v", err) + return nil + } + + // Print out diagnostic + log.Info("[peer: %s] Got message from [%s]\n", + d.self.ID.Pretty(), mPeer.ID.Pretty()) + + // dispatch handler. + rpmes, err := d.handleDiagnostic(mPeer, pmes) + if err != nil { + log.Error("handleDiagnostic error: %s", err) + return nil + } + + // if nil response, return it before serializing + if rpmes == nil { + return nil + } + + // serialize response msg + rmes, err := msg.FromObject(mPeer, rpmes) + if err != nil { + log.Error("Failed to encode protobuf message: %v", err) + return nil + } + + return rmes +} diff --git a/diagnostics/message.pb.go b/diagnostics/message.pb.go new file mode 100644 index 000000000..d05d11b44 --- /dev/null +++ b/diagnostics/message.pb.go @@ -0,0 +1,48 @@ +// Code generated by protoc-gen-go. +// source: message.proto +// DO NOT EDIT! + +/* +Package diagnostic is a generated protocol buffer package. + +It is generated from these files: + message.proto + +It has these top-level messages: + Message +*/ +package diagnostic + +import proto "code.google.com/p/goprotobuf/proto" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = math.Inf + +type Message struct { + DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"` + Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} + +func (m *Message) GetDiagID() string { + if m != nil && m.DiagID != nil { + return *m.DiagID + } + return "" +} + +func (m *Message) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func init() { +} diff --git a/diagnostics/message.proto b/diagnostics/message.proto new file mode 100644 index 000000000..349afba25 --- /dev/null +++ b/diagnostics/message.proto @@ -0,0 +1,6 @@ +package diagnostic; + +message Message { + required string DiagID = 1; + optional bytes Data = 2; +} diff --git a/net/mux/mux.pb.go b/net/mux/mux.pb.go index f4e951915..482846de8 100644 --- a/net/mux/mux.pb.go +++ b/net/mux/mux.pb.go @@ -1,4 +1,4 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-go. // source: mux.proto // DO NOT EDIT! @@ -13,22 +13,21 @@ It has these top-level messages: */ package mux -import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" -import json "encoding/json" +import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" import math "math" -// Reference proto, json, and math imports to suppress error if they are not otherwise used. +// Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal -var _ = &json.SyntaxError{} var _ = math.Inf type ProtocolID int32 const ( - ProtocolID_Test ProtocolID = 0 - ProtocolID_Identify ProtocolID = 1 - ProtocolID_Routing ProtocolID = 2 - ProtocolID_Exchange ProtocolID = 3 + ProtocolID_Test ProtocolID = 0 + ProtocolID_Identify ProtocolID = 1 + ProtocolID_Routing ProtocolID = 2 + ProtocolID_Exchange ProtocolID = 3 + ProtocolID_Diagnostic ProtocolID = 4 ) var ProtocolID_name = map[int32]string{ @@ -36,12 +35,14 @@ var ProtocolID_name = map[int32]string{ 1: "Identify", 2: "Routing", 3: "Exchange", + 4: "Diagnostic", } var ProtocolID_value = map[string]int32{ - "Test": 0, - "Identify": 1, - "Routing": 2, - "Exchange": 3, + "Test": 0, + "Identify": 1, + "Routing": 2, + "Exchange": 3, + "Diagnostic": 4, } func (x ProtocolID) Enum() *ProtocolID { diff --git a/net/mux/mux.proto b/net/mux/mux.proto index 0883cb655..7c65339a8 100644 --- a/net/mux/mux.proto +++ b/net/mux/mux.proto @@ -5,6 +5,7 @@ enum ProtocolID { Identify = 1; // setup Routing = 2; // dht Exchange = 3; // bitswap + Diagnostic = 4; } message PBProtocolMessage { diff --git a/net/net.go b/net/net.go index fc341fd7d..70a4830e3 100644 --- a/net/net.go +++ b/net/net.go @@ -107,3 +107,8 @@ func (n *IpfsNetwork) Close() error { n.cancel = nil return nil } + +// XXX +func (n *IpfsNetwork) GetSwarm() *swarm.Swarm { + return n.swarm +} diff --git a/net/swarm/conn.go b/net/swarm/conn.go index f91d6219f..a2577f186 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -122,10 +122,12 @@ func (s *Swarm) connSetup(c *conn.Conn) error { // add to conns s.connsLock.Lock() if _, ok := s.conns[c.Peer.Key()]; ok { + log.Debug("Conn already open!") s.connsLock.Unlock() return ErrAlreadyOpen } s.conns[c.Peer.Key()] = c + log.Debug("Added conn to map!") s.connsLock.Unlock() // kick off reader goroutine diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 1f42b85af..b066e79a6 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -201,5 +201,15 @@ func (s *Swarm) GetErrChan() chan error { return s.errChan } +func (s *Swarm) GetPeerList() []*peer.Peer { + var out []*peer.Peer + s.connsLock.Lock() + for _, p := range s.conns { + out = append(out, p.Peer) + } + s.connsLock.Unlock() + return out +} + // Temporary to ensure that the Swarm always matches the Network interface as we are changing it // var _ Network = &Swarm{} diff --git a/util/util.go b/util/util.go index a7058d41d..3040e7e4b 100644 --- a/util/util.go +++ b/util/util.go @@ -153,6 +153,13 @@ func SetupLogging() { } } +func AllLoggersOn() { + for n, log := range loggers { + logging.SetLevel(logging.DEBUG, n) + log.Error("setting logger: %s to %v\n", n, logging.DEBUG) + } +} + // Logger retrieves a particular logger + initializes it at a particular level func Logger(name string) *logging.Logger { log := logging.MustGetLogger(name)