From bae8b9f4c052aa8c556e89c1b476f299e07b1091 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 15 Jan 2015 04:17:17 +0000 Subject: [PATCH 1/4] starting to move important events over to EventBegin/Done --- exchange/bitswap/bitswap.go | 4 ++-- p2p/crypto/secio/protocol.go | 4 +++- routing/dht/dht.go | 8 ++++++++ routing/dht/routing.go | 15 ++++++++++----- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f0063a9d9..0ccf0cffa 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -120,12 +120,12 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ctx, cancelFunc := context.WithCancel(parent) ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) - log.Event(ctx, "GetBlockRequestBegin", &k) + e := log.EventBegin(ctx, "GetBlockRequest", &k) log.Debugf("GetBlockRequestBegin") defer func() { cancelFunc() - log.Event(ctx, "GetBlockRequestEnd", &k) + e.Done() log.Debugf("GetBlockRequestEnd") }() diff --git a/p2p/crypto/secio/protocol.go b/p2p/crypto/secio/protocol.go index 503c09149..e1960dc14 100644 --- a/p2p/crypto/secio/protocol.go +++ b/p2p/crypto/secio/protocol.go @@ -81,7 +81,9 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e } log.Debugf("handshake: %s <--start--> %s", s.localPeer, s.remotePeer) - log.Event(ctx, "secureHandshakeStart", s.localPeer) + e := log.EventBegin(ctx, "secureHandshake", s.localPeer) + defer e.Done() + s.local.permanentPubKey = s.localKey.GetPublic() myPubKeyBytes, err := s.local.permanentPubKey.Bytes() if err != nil { diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 9b1279f10..c0b27ea90 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -196,6 +196,8 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, // getValueSingle simply performs the get value RPC with the given parameters func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) { + e := log.EventBegin(ctx, "getValueSingle", p, &key) + defer e.Done() pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0) return dht.sendRequest(ctx, p, pmes) @@ -265,11 +267,17 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo { // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) { + e := log.EventBegin(ctx, "findPeerSingle", p, id) + defer e.Done() + pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) return dht.sendRequest(ctx, p, pmes) } func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) { + e := log.EventBegin(ctx, "findProvidersSingle", p, &key) + defer e.Done() + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0) return dht.sendRequest(ctx, p, pmes) } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index cd929c255..3ce5d5ec6 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -122,10 +122,12 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { log := dht.log().Prefix("Provide(%s)", key) + log.Debugf("start", key) - log.Event(ctx, "provideBegin", &key) defer log.Debugf("end", key) - defer log.Event(ctx, "provideEnd", &key) + + e := log.EventBegin(ctx, "provide", &key) + defer e.Done() // add self locally dht.providers.AddProvider(key, dht.self) @@ -163,6 +165,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn // Kademlia 'node lookup' operation. Returns a channel of the K closest peers // to the given key func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { + e := log.EventBegin(ctx, "getClosestPeers", &key) tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) if len(tablepeers) == 0 { return nil, errors.Wrap(kb.ErrLookupFailure) @@ -204,6 +207,7 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer go func() { defer close(out) + defer e.Done() // run it! _, err := query.Run(ctx, tablepeers) if err != nil { @@ -242,10 +246,9 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { log := dht.log().Prefix("FindProviders(%s)", key) + e := log.EventBegin(ctx, "findProvidersAsync", &key) + defer e.Done() defer close(peerOut) - defer log.Event(ctx, "findProviders end", &key) - log.Debug("begin") - defer log.Debug("begin") ps := pset.NewLimited(count) provs := dht.providers.GetProviders(ctx, key) @@ -314,6 +317,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { + e := log.EventBegin(ctx, "FindPeer", id) + defer e.Done() // Check if were already connected to them if pi := dht.FindLocal(id); pi.ID != "" { From 3c55902811fdb25455ded17981f40932922f8c4a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 15 Jan 2015 04:45:34 +0000 Subject: [PATCH 2/4] rewrite as single line defer logs --- exchange/bitswap/bitswap.go | 3 +-- p2p/crypto/secio/protocol.go | 3 +-- routing/dht/dht.go | 9 +++------ routing/dht/routing.go | 9 +++------ 4 files changed, 8 insertions(+), 16 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 0ccf0cffa..25025bb8e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -120,12 +120,11 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ctx, cancelFunc := context.WithCancel(parent) ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) - e := log.EventBegin(ctx, "GetBlockRequest", &k) + defer log.EventBegin(ctx, "GetBlockRequest", &k).Done() log.Debugf("GetBlockRequestBegin") defer func() { cancelFunc() - e.Done() log.Debugf("GetBlockRequestEnd") }() diff --git a/p2p/crypto/secio/protocol.go b/p2p/crypto/secio/protocol.go index e1960dc14..998e55913 100644 --- a/p2p/crypto/secio/protocol.go +++ b/p2p/crypto/secio/protocol.go @@ -81,8 +81,7 @@ func (s *secureSession) handshake(ctx context.Context, insecure io.ReadWriter) e } log.Debugf("handshake: %s <--start--> %s", s.localPeer, s.remotePeer) - e := log.EventBegin(ctx, "secureHandshake", s.localPeer) - defer e.Done() + defer log.EventBegin(ctx, "secureHandshake", s.localPeer).Done() s.local.permanentPubKey = s.localKey.GetPublic() myPubKeyBytes, err := s.local.permanentPubKey.Bytes() diff --git a/routing/dht/dht.go b/routing/dht/dht.go index c0b27ea90..7befb6597 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -196,8 +196,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, // getValueSingle simply performs the get value RPC with the given parameters func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) { - e := log.EventBegin(ctx, "getValueSingle", p, &key) - defer e.Done() + defer log.EventBegin(ctx, "getValueSingle", p, &key).Done() pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0) return dht.sendRequest(ctx, p, pmes) @@ -267,16 +266,14 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.PeerInfo { // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) { - e := log.EventBegin(ctx, "findPeerSingle", p, id) - defer e.Done() + defer log.EventBegin(ctx, "findPeerSingle", p, id).Done() pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) return dht.sendRequest(ctx, p, pmes) } func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) { - e := log.EventBegin(ctx, "findProvidersSingle", p, &key) - defer e.Done() + defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done() pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0) return dht.sendRequest(ctx, p, pmes) diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 3ce5d5ec6..05fa028a8 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -126,8 +126,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { log.Debugf("start", key) defer log.Debugf("end", key) - e := log.EventBegin(ctx, "provide", &key) - defer e.Done() + defer log.EventBegin(ctx, "provide", &key).Done() // add self locally dht.providers.AddProvider(key, dht.self) @@ -246,8 +245,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { log := dht.log().Prefix("FindProviders(%s)", key) - e := log.EventBegin(ctx, "findProvidersAsync", &key) - defer e.Done() + defer log.EventBegin(ctx, "findProvidersAsync", &key).Done() defer close(peerOut) ps := pset.NewLimited(count) @@ -317,8 +315,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) { - e := log.EventBegin(ctx, "FindPeer", id) - defer e.Done() + defer log.EventBegin(ctx, "FindPeer", id).Done() // Check if were already connected to them if pi := dht.FindLocal(id); pi.ID != "" { From 7c9431ba23b6c919f256d97a116d2b2cb10fb367 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 15 Jan 2015 17:47:36 +0000 Subject: [PATCH 3/4] add events for handlers --- routing/dht/handlers.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 8f66afbf6..59e30d398 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -39,6 +39,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { } func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handleGetValue", p).Done() log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey()) // setup response @@ -114,6 +115,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess // Store a value in this peer local storage func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handlePutValue", p).Done() dskey := u.Key(pmes.GetKey()).DsKey() if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil { @@ -137,6 +139,7 @@ func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) ( } func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handleFindPeer", p).Done() resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel()) var closest []peer.ID @@ -166,6 +169,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess } func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handleGetProviders", p).Done() resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) key := u.Key(pmes.GetKey()) @@ -211,6 +215,7 @@ type providerInfo struct { } func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { + defer log.EventBegin(ctx, "handleAddProvider", p).Done() key := u.Key(pmes.GetKey()) log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key) From 6bfb9d838c1e5bbacdd9e5aa0211a0678f2eacaf Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 15 Jan 2015 22:01:33 +0000 Subject: [PATCH 4/4] remove low signal log messages --- routing/dht/routing.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 05fa028a8..4a2cc3518 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -123,9 +123,6 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { log := dht.log().Prefix("Provide(%s)", key) - log.Debugf("start", key) - defer log.Debugf("end", key) - defer log.EventBegin(ctx, "provide", &key).Done() // add self locally