From d5331e7dc761d52e17a2cb66306b37607fb9066a Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Wed, 28 Jan 2015 08:06:53 -0800 Subject: [PATCH] feat(gcr/s) add eventlogs --- routing/grandcentral/client.go | 6 ++++++ routing/grandcentral/proxy/standard.go | 30 +++++++++++++++++++------- routing/grandcentral/server.go | 4 ++++ 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/routing/grandcentral/client.go b/routing/grandcentral/client.go index 8b67276b7..405a7750d 100644 --- a/routing/grandcentral/client.go +++ b/routing/grandcentral/client.go @@ -36,6 +36,7 @@ func NewClient(px proxy.Proxy, h host.Host, ps peer.Peerstore, local peer.ID) (* } func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan peer.PeerInfo { + defer log.EventBegin(ctx, "findProviders", &k).Done() ch := make(chan peer.PeerInfo) go func() { defer close(ch) @@ -58,6 +59,7 @@ func (c *Client) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-cha } func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error { + defer log.EventBegin(ctx, "putValue", &k).Done() r, err := makeRecord(c.peerstore, c.local, k, v) if err != nil { return err @@ -68,6 +70,7 @@ func (c *Client) PutValue(ctx context.Context, k u.Key, v []byte) error { } func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) { + defer log.EventBegin(ctx, "getValue", &k).Done() msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0) response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote if err != nil { @@ -77,6 +80,7 @@ func (c *Client) GetValue(ctx context.Context, k u.Key) ([]byte, error) { } func (c *Client) Provide(ctx context.Context, k u.Key) error { + defer log.EventBegin(ctx, "provide", &k).Done() msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0) // FIXME how is connectedness defined for the local node pri := []pb.PeerRoutingInfo{ @@ -92,6 +96,7 @@ func (c *Client) Provide(ctx context.Context, k u.Key) error { } func (c *Client) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { + defer log.EventBegin(ctx, "findPeer", id).Done() request := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) response, err := c.proxy.SendRequest(ctx, request) // hide remote if err != nil { @@ -121,6 +126,7 @@ func makeRecord(ps peer.Peerstore, p peer.ID, k u.Key, v []byte) (*pb.Record, er } func (c *Client) Ping(ctx context.Context, id peer.ID) (time.Duration, error) { + defer log.EventBegin(ctx, "ping", id).Done() return time.Nanosecond, errors.New("grandcentral routing does not support the ping method") } diff --git a/routing/grandcentral/proxy/standard.go b/routing/grandcentral/proxy/standard.go index 373501ea1..36da88cbf 100644 --- a/routing/grandcentral/proxy/standard.go +++ b/routing/grandcentral/proxy/standard.go @@ -13,10 +13,10 @@ import ( errors "github.com/jbenet/go-ipfs/util/debugerror" ) -var log = eventlog.Logger("proxy") - const ProtocolGCR = "/ipfs/grandcentral" +var log = eventlog.Logger("grandcentral/proxy") + type Proxy interface { HandleStream(inet.Stream) SendMessage(ctx context.Context, m *dhtpb.Message) error @@ -48,8 +48,15 @@ func (px *standard) SendMessage(ctx context.Context, m *dhtpb.Message) error { return err // NB: returns the last error } -func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) error { - if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil { +func (px *standard) sendMessage(ctx context.Context, m *dhtpb.Message, remote peer.ID) (err error) { + e := log.EventBegin(ctx, "sendRoutingMessage", px.Host.ID(), remote, m) + defer func() { + if err != nil { + e.SetError(err) + } + e.Done() + }() + if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil { return err } s, err := px.Host.NewStream(ProtocolGCR, remote) @@ -78,8 +85,15 @@ func (px *standard) SendRequest(ctx context.Context, m *dhtpb.Message) (*dhtpb.M return nil, err // NB: returns the last error } -func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (*dhtpb.Message, error) { - if err := px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil { +func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote peer.ID) (_ *dhtpb.Message, err error) { + e := log.EventBegin(ctx, "sendRoutingRequest", px.Host.ID(), remote, m) + defer func() { + if err != nil { + e.SetError(err) + } + e.Done() + }() + if err = px.Host.Connect(ctx, peer.PeerInfo{ID: remote}); err != nil { return nil, err } s, err := px.Host.NewStream(ProtocolGCR, remote) @@ -89,12 +103,12 @@ func (px *standard) sendRequest(ctx context.Context, m *dhtpb.Message, remote pe defer s.Close() r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) w := ggio.NewDelimitedWriter(s) - if err := w.WriteMsg(m); err != nil { + if err = w.WriteMsg(m); err != nil { return nil, err } var reply dhtpb.Message - if err := r.ReadMsg(&reply); err != nil { + if err = r.ReadMsg(&reply); err != nil { return nil, err } // need ctx expiration? diff --git a/routing/grandcentral/server.go b/routing/grandcentral/server.go index 309433eff..62198f8cb 100644 --- a/routing/grandcentral/server.go +++ b/routing/grandcentral/server.go @@ -42,6 +42,8 @@ func (s *Server) HandleRequest(ctx context.Context, p peer.ID, req *dhtpb.Messag func (s *Server) handleMessage( ctx context.Context, p peer.ID, req *dhtpb.Message) (peer.ID, *dhtpb.Message) { + log.EventBegin(ctx, "routingMessageReceived", req, p, s.local).Done() // TODO may need to differentiate between local and remote + // FIXME threw everything into this switch statement to get things going. // Once each operation is well-defined, extract pluggable backend so any // database may be used. @@ -131,6 +133,7 @@ func putRoutingRecord(ds datastore.Datastore, k util.Key, value *dhtpb.Record) e } func putRoutingProviders(ds datastore.Datastore, k util.Key, providers []*dhtpb.Message_Peer) error { + log.Event(context.Background(), "putRoutingProviders", &k) pkey := datastore.KeyWithNamespaces([]string{"routing", "providers", k.String()}) if v, err := ds.Get(pkey); err == nil { if msg, ok := v.([]byte); ok { @@ -166,6 +169,7 @@ func storeProvidersToPeerstore(ps peer.Peerstore, p peer.ID, providers []*dhtpb. } func getRoutingProviders(local peer.ID, ds datastore.Datastore, k util.Key) ([]*dhtpb.Message_Peer, error) { + log.Event(context.Background(), "getProviders", local, &k) var providers []*dhtpb.Message_Peer exists, err := ds.Has(k.DsKey()) // TODO store values in a local datastore? if err == nil && exists {