mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-26 12:57:44 +08:00
Merge pull request #7232 from ipfs/fix/less-verbose
fix: non-blocking peerlog logging
This commit is contained in:
commit
512050f1c7
2
go.mod
2
go.mod
@ -59,7 +59,6 @@ require (
|
||||
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
|
||||
github.com/jbenet/go-temp-err-catcher v0.1.0
|
||||
github.com/jbenet/goprocess v0.1.4
|
||||
github.com/libp2p/go-eventbus v0.1.0
|
||||
github.com/libp2p/go-libp2p v0.8.2
|
||||
github.com/libp2p/go-libp2p-circuit v0.2.2
|
||||
github.com/libp2p/go-libp2p-connmgr v0.2.1
|
||||
@ -101,6 +100,7 @@ require (
|
||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
|
||||
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c
|
||||
go.uber.org/fx v1.12.0
|
||||
go.uber.org/zap v1.14.1
|
||||
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
|
||||
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
|
||||
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4
|
||||
|
||||
@ -2,26 +2,52 @@ package peerlog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
core "github.com/ipfs/go-ipfs/core"
|
||||
plugin "github.com/ipfs/go-ipfs/plugin"
|
||||
logging "github.com/ipfs/go-log"
|
||||
eventbus "github.com/libp2p/go-eventbus"
|
||||
event "github.com/libp2p/go-libp2p-core/event"
|
||||
network "github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var log = logging.Logger("plugin/peerlog")
|
||||
|
||||
type eventType int
|
||||
|
||||
var (
|
||||
// size of the event queue buffer
|
||||
eventQueueSize = 64 * 1024
|
||||
// number of events to drop when busy.
|
||||
busyDropAmount = eventQueueSize / 8
|
||||
)
|
||||
|
||||
const (
|
||||
eventConnect eventType = iota
|
||||
eventIdentify
|
||||
)
|
||||
|
||||
type plEvent struct {
|
||||
kind eventType
|
||||
peer peer.ID
|
||||
}
|
||||
|
||||
// Log all the PeerIDs we see
|
||||
//
|
||||
// Usage:
|
||||
// GOLOG_FILE=~/peer.log IPFS_LOGGING_FMT=json ipfs daemon
|
||||
// Output:
|
||||
// {"level":"info","ts":"2020-02-10T13:54:26.639Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:51","msg":"connected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"}
|
||||
// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"disconnected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"}
|
||||
// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"identified","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt","agent":"go-ipfs/0.5.0/"}
|
||||
//
|
||||
type peerLogPlugin struct{}
|
||||
type peerLogPlugin struct {
|
||||
droppedCount uint64
|
||||
events chan plEvent
|
||||
}
|
||||
|
||||
var _ plugin.PluginDaemonInternal = (*peerLogPlugin)(nil)
|
||||
|
||||
@ -41,60 +67,120 @@ func (*peerLogPlugin) Version() string {
|
||||
}
|
||||
|
||||
// Init initializes plugin
|
||||
func (*peerLogPlugin) Init(*plugin.Environment) error {
|
||||
func (pl *peerLogPlugin) Init(*plugin.Environment) error {
|
||||
pl.events = make(chan plEvent, eventQueueSize)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*peerLogPlugin) Start(node *core.IpfsNode) error {
|
||||
func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) {
|
||||
ctx := node.Context()
|
||||
|
||||
busyCounter := 0
|
||||
dlog := log.Desugar()
|
||||
for {
|
||||
// Deal with dropped events.
|
||||
dropped := atomic.SwapUint64(&pl.droppedCount, 0)
|
||||
if dropped > 0 {
|
||||
busyCounter++
|
||||
|
||||
// sleep a bit to give the system a chance to catch up with logging.
|
||||
select {
|
||||
case <-time.After(time.Duration(busyCounter) * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// drain 1/8th of the backlog backlog so we
|
||||
// don't immediately run into this situation
|
||||
// again.
|
||||
loop:
|
||||
for i := 0; i < busyDropAmount; i++ {
|
||||
select {
|
||||
case <-pl.events:
|
||||
dropped++
|
||||
default:
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
// Add in any events we've dropped in the mean-time.
|
||||
dropped += atomic.SwapUint64(&pl.droppedCount, 0)
|
||||
|
||||
// Report that we've dropped events.
|
||||
dlog.Error("dropped events", zap.Uint64("count", dropped))
|
||||
} else {
|
||||
busyCounter = 0
|
||||
}
|
||||
|
||||
var e plEvent
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e = <-pl.events:
|
||||
}
|
||||
|
||||
peerID := zap.String("peer", e.peer.Pretty())
|
||||
|
||||
switch e.kind {
|
||||
case eventConnect:
|
||||
dlog.Info("connected", peerID)
|
||||
case eventIdentify:
|
||||
agent, err := node.Peerstore.Get(e.peer, "AgentVersion")
|
||||
switch err {
|
||||
case nil:
|
||||
case peerstore.ErrNotFound:
|
||||
continue
|
||||
default:
|
||||
dlog.Error("failed to get agent version", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
agentS, ok := agent.(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
dlog.Info("identified", peerID, zap.String("agent", agentS))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pl *peerLogPlugin) emit(evt eventType, p peer.ID) {
|
||||
select {
|
||||
case pl.events <- plEvent{kind: evt, peer: p}:
|
||||
default:
|
||||
atomic.AddUint64(&pl.droppedCount, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (pl *peerLogPlugin) Start(node *core.IpfsNode) error {
|
||||
// Ensure logs from this plugin get printed regardless of global IPFS_LOGGING value
|
||||
if err := logging.SetLogLevel("plugin/peerlog", "info"); err != nil {
|
||||
return fmt.Errorf("failed to set log level: %w", err)
|
||||
}
|
||||
var notifee network.NotifyBundle
|
||||
notifee.ConnectedF = func(net network.Network, conn network.Conn) {
|
||||
// TODO: Log transport, country, etc?
|
||||
log.Infow("connected",
|
||||
"peer", conn.RemotePeer().Pretty(),
|
||||
)
|
||||
}
|
||||
notifee.DisconnectedF = func(net network.Network, conn network.Conn) {
|
||||
log.Infow("disconnected",
|
||||
"peer", conn.RemotePeer().Pretty(),
|
||||
)
|
||||
}
|
||||
node.PeerHost.Network().Notify(¬ifee)
|
||||
|
||||
sub, err := node.PeerHost.EventBus().Subscribe(
|
||||
new(event.EvtPeerIdentificationCompleted),
|
||||
eventbus.BufSize(1024),
|
||||
)
|
||||
sub, err := node.PeerHost.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to subscribe to identify notifications")
|
||||
}
|
||||
|
||||
var notifee network.NotifyBundle
|
||||
notifee.ConnectedF = func(net network.Network, conn network.Conn) {
|
||||
pl.emit(eventConnect, conn.RemotePeer())
|
||||
}
|
||||
node.PeerHost.Network().Notify(¬ifee)
|
||||
|
||||
go func() {
|
||||
defer sub.Close()
|
||||
for e := range sub.Out() {
|
||||
switch e := e.(type) {
|
||||
case event.EvtPeerIdentificationCompleted:
|
||||
protocols, err := node.Peerstore.GetProtocols(e.Peer)
|
||||
if err != nil {
|
||||
log.Errorw("failed to get protocols", "error", err)
|
||||
continue
|
||||
}
|
||||
agent, err := node.Peerstore.Get(e.Peer, "AgentVersion")
|
||||
if err != nil {
|
||||
log.Errorw("failed to get agent version", "error", err)
|
||||
continue
|
||||
}
|
||||
log.Infow(
|
||||
"identified",
|
||||
"peer", e.Peer.Pretty(),
|
||||
"agent", agent,
|
||||
"protocols", protocols,
|
||||
)
|
||||
pl.emit(eventIdentify, e.Peer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go pl.collectEvents(node)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user