From bdbb79d30f6f65ab41780239653e0ea048e3a81e Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 27 Apr 2020 19:10:11 -0700 Subject: [PATCH 1/2] fix: non-blocking peerlog logging Avoid ever blocking new connections in the peer logger. Instead: 1. Send all new peers to a highly buffered channel. 2. Emit "dropped event" errors whenever we detect that we're dropping events and falling behind. 3. Don't log protocols, they're too large. 4. Don't log disconnects, we don't need them. --- go.mod | 2 +- plugin/plugins/peerlog/peerlog.go | 127 +++++++++++++++++++++--------- 2 files changed, 90 insertions(+), 39 deletions(-) diff --git a/go.mod b/go.mod index 4d659ee0e..3ebadc7d9 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,6 @@ require ( github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jbenet/go-temp-err-catcher v0.1.0 github.com/jbenet/goprocess v0.1.4 - github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p v0.8.2 github.com/libp2p/go-libp2p-circuit v0.2.2 github.com/libp2p/go-libp2p-connmgr v0.2.1 @@ -101,6 +100,7 @@ require ( github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c go.uber.org/fx v1.12.0 + go.uber.org/zap v1.14.1 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect golang.org/x/sys v0.0.0-20200413165638-669c56c373c4 diff --git a/plugin/plugins/peerlog/peerlog.go b/plugin/plugins/peerlog/peerlog.go index 0b6b7a338..593563171 100644 --- a/plugin/plugins/peerlog/peerlog.go +++ b/plugin/plugins/peerlog/peerlog.go @@ -2,26 +2,44 @@ package peerlog import ( "fmt" + "sync/atomic" core "github.com/ipfs/go-ipfs/core" plugin "github.com/ipfs/go-ipfs/plugin" logging "github.com/ipfs/go-log" - eventbus "github.com/libp2p/go-eventbus" event "github.com/libp2p/go-libp2p-core/event" network "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "go.uber.org/zap" ) var log = logging.Logger("plugin/peerlog") +type eventType int + +const ( + eventConnect eventType = iota + eventIdentify +) + +type plEvent struct { + kind eventType + peer peer.ID +} + // Log all the PeerIDs we see // // Usage: // GOLOG_FILE=~/peer.log IPFS_LOGGING_FMT=json ipfs daemon // Output: // {"level":"info","ts":"2020-02-10T13:54:26.639Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:51","msg":"connected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"} -// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"disconnected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"} +// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"identified","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt","agent":"go-ipfs/0.5.0/"} // -type peerLogPlugin struct{} +type peerLogPlugin struct { + droppedCount uint64 + events chan plEvent +} var _ plugin.PluginDaemonInternal = (*peerLogPlugin)(nil) @@ -41,60 +59,93 @@ func (*peerLogPlugin) Version() string { } // Init initializes plugin -func (*peerLogPlugin) Init(*plugin.Environment) error { +func (pl *peerLogPlugin) Init(*plugin.Environment) error { + pl.events = make(chan plEvent, 64*1024) return nil } -func (*peerLogPlugin) Start(node *core.IpfsNode) error { +func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) { + go func() { + ctx := node.Context() + + dlog := log.Desugar() + for { + dropped := atomic.SwapUint64(&pl.droppedCount, 0) + if dropped > 0 { + dlog.Error("dropped events", zap.Uint64("count", dropped)) + } + + var e plEvent + select { + case <-ctx.Done(): + return + case e = <-pl.events: + } + + peerID := zap.String("peer", e.peer.Pretty()) + + switch e.kind { + case eventConnect: + dlog.Info("connected", peerID) + case eventIdentify: + agent, err := node.Peerstore.Get(e.peer, "AgentVersion") + switch err { + case nil: + case peerstore.ErrNotFound: + continue + default: + dlog.Error("failed to get agent version", zap.Error(err)) + continue + } + + agentS, ok := agent.(string) + if !ok { + continue + } + dlog.Info("identified", peerID, zap.String("agent", agentS)) + } + } + }() + +} + +func (pl *peerLogPlugin) emit(evt eventType, p peer.ID) { + select { + case pl.events <- plEvent{kind: evt, peer: p}: + default: + atomic.AddUint64(&pl.droppedCount, 1) + } +} + +func (pl *peerLogPlugin) Start(node *core.IpfsNode) error { // Ensure logs from this plugin get printed regardless of global IPFS_LOGGING value if err := logging.SetLogLevel("plugin/peerlog", "info"); err != nil { return fmt.Errorf("failed to set log level: %w", err) } - var notifee network.NotifyBundle - notifee.ConnectedF = func(net network.Network, conn network.Conn) { - // TODO: Log transport, country, etc? - log.Infow("connected", - "peer", conn.RemotePeer().Pretty(), - ) - } - notifee.DisconnectedF = func(net network.Network, conn network.Conn) { - log.Infow("disconnected", - "peer", conn.RemotePeer().Pretty(), - ) - } - node.PeerHost.Network().Notify(¬ifee) - sub, err := node.PeerHost.EventBus().Subscribe( - new(event.EvtPeerIdentificationCompleted), - eventbus.BufSize(1024), - ) + sub, err := node.PeerHost.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)) if err != nil { return fmt.Errorf("failed to subscribe to identify notifications") } + + var notifee network.NotifyBundle + notifee.ConnectedF = func(net network.Network, conn network.Conn) { + pl.emit(eventConnect, conn.RemotePeer()) + } + node.PeerHost.Network().Notify(¬ifee) + go func() { defer sub.Close() for e := range sub.Out() { switch e := e.(type) { case event.EvtPeerIdentificationCompleted: - protocols, err := node.Peerstore.GetProtocols(e.Peer) - if err != nil { - log.Errorw("failed to get protocols", "error", err) - continue - } - agent, err := node.Peerstore.Get(e.Peer, "AgentVersion") - if err != nil { - log.Errorw("failed to get agent version", "error", err) - continue - } - log.Infow( - "identified", - "peer", e.Peer.Pretty(), - "agent", agent, - "protocols", protocols, - ) + pl.emit(eventIdentify, e.Peer) } } }() + + go pl.collectEvents(node) + return nil } From a10a14fc0629229f2d8b42438ba79c05209d3a57 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 27 Apr 2020 19:37:03 -0700 Subject: [PATCH 2/2] feat(peerlog): add a bit of backoff logic --- plugin/plugins/peerlog/peerlog.go | 99 +++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/plugin/plugins/peerlog/peerlog.go b/plugin/plugins/peerlog/peerlog.go index 593563171..97f11eee6 100644 --- a/plugin/plugins/peerlog/peerlog.go +++ b/plugin/plugins/peerlog/peerlog.go @@ -3,6 +3,7 @@ package peerlog import ( "fmt" "sync/atomic" + "time" core "github.com/ipfs/go-ipfs/core" plugin "github.com/ipfs/go-ipfs/plugin" @@ -18,6 +19,13 @@ var log = logging.Logger("plugin/peerlog") type eventType int +var ( + // size of the event queue buffer + eventQueueSize = 64 * 1024 + // number of events to drop when busy. + busyDropAmount = eventQueueSize / 8 +) + const ( eventConnect eventType = iota eventIdentify @@ -60,53 +68,80 @@ func (*peerLogPlugin) Version() string { // Init initializes plugin func (pl *peerLogPlugin) Init(*plugin.Environment) error { - pl.events = make(chan plEvent, 64*1024) + pl.events = make(chan plEvent, eventQueueSize) return nil } func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) { - go func() { - ctx := node.Context() + ctx := node.Context() - dlog := log.Desugar() - for { - dropped := atomic.SwapUint64(&pl.droppedCount, 0) - if dropped > 0 { - dlog.Error("dropped events", zap.Uint64("count", dropped)) - } + busyCounter := 0 + dlog := log.Desugar() + for { + // Deal with dropped events. + dropped := atomic.SwapUint64(&pl.droppedCount, 0) + if dropped > 0 { + busyCounter++ - var e plEvent + // sleep a bit to give the system a chance to catch up with logging. select { + case <-time.After(time.Duration(busyCounter) * time.Second): case <-ctx.Done(): return - case e = <-pl.events: } - peerID := zap.String("peer", e.peer.Pretty()) - - switch e.kind { - case eventConnect: - dlog.Info("connected", peerID) - case eventIdentify: - agent, err := node.Peerstore.Get(e.peer, "AgentVersion") - switch err { - case nil: - case peerstore.ErrNotFound: - continue + // drain 1/8th of the backlog backlog so we + // don't immediately run into this situation + // again. + loop: + for i := 0; i < busyDropAmount; i++ { + select { + case <-pl.events: + dropped++ default: - dlog.Error("failed to get agent version", zap.Error(err)) - continue + break loop } - - agentS, ok := agent.(string) - if !ok { - continue - } - dlog.Info("identified", peerID, zap.String("agent", agentS)) } - } - }() + // Add in any events we've dropped in the mean-time. + dropped += atomic.SwapUint64(&pl.droppedCount, 0) + + // Report that we've dropped events. + dlog.Error("dropped events", zap.Uint64("count", dropped)) + } else { + busyCounter = 0 + } + + var e plEvent + select { + case <-ctx.Done(): + return + case e = <-pl.events: + } + + peerID := zap.String("peer", e.peer.Pretty()) + + switch e.kind { + case eventConnect: + dlog.Info("connected", peerID) + case eventIdentify: + agent, err := node.Peerstore.Get(e.peer, "AgentVersion") + switch err { + case nil: + case peerstore.ErrNotFound: + continue + default: + dlog.Error("failed to get agent version", zap.Error(err)) + continue + } + + agentS, ok := agent.(string) + if !ok { + continue + } + dlog.Info("identified", peerID, zap.String("agent", agentS)) + } + } } func (pl *peerLogPlugin) emit(evt eventType, p peer.ID) {