mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-05 08:18:03 +08:00
Merge pull request #3189 from ipfs/feat/metrics/interface
metrics: introduce go-metrics-interface
This commit is contained in:
commit
f23cd5c84c
@ -1,9 +1,11 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-ipfs/blocks"
|
||||
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
|
||||
|
||||
"github.com/ipfs/go-ipfs/blocks"
|
||||
|
||||
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
||||
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
|
||||
@ -12,15 +14,21 @@ import (
|
||||
type arccache struct {
|
||||
arc *lru.ARCCache
|
||||
blockstore Blockstore
|
||||
|
||||
hits metrics.Counter
|
||||
total metrics.Counter
|
||||
}
|
||||
|
||||
func arcCached(bs Blockstore, lruSize int) (*arccache, error) {
|
||||
func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) {
|
||||
arc, err := lru.NewARC(lruSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c := &arccache{arc: arc, blockstore: bs}
|
||||
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
|
||||
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
|
||||
|
||||
return &arccache{arc: arc, blockstore: bs}, nil
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (b *arccache) DeleteBlock(k key.Key) error {
|
||||
@ -42,6 +50,7 @@ func (b *arccache) DeleteBlock(k key.Key) error {
|
||||
// if ok == false has is inconclusive
|
||||
// if ok == true then has respons to question: is it contained
|
||||
func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
|
||||
b.total.Inc()
|
||||
if k == "" {
|
||||
// Return cache invalid so the call to blockstore happens
|
||||
// in case of invalid key and correct error is created.
|
||||
@ -50,6 +59,7 @@ func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
|
||||
|
||||
h, ok := b.arc.Get(k)
|
||||
if ok {
|
||||
b.hits.Inc()
|
||||
return h.(bool), true
|
||||
}
|
||||
return false, false
|
||||
|
||||
@ -140,7 +140,7 @@ func TestGetAndDeleteFalseShortCircuit(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestArcCreationFailure(t *testing.T) {
|
||||
if arc, err := arcCached(nil, -1); arc != nil || err == nil {
|
||||
if arc, err := newARCCachedBS(context.TODO(), nil, -1); arc != nil || err == nil {
|
||||
t.Fatal("expected error and no cache")
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,12 +2,14 @@ package blockstore
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-ipfs/blocks"
|
||||
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
|
||||
|
||||
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
|
||||
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||
bloom "gx/ipfs/QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM/bbloom"
|
||||
)
|
||||
|
||||
// bloomCached returns Blockstore that caches Has requests using Bloom filter
|
||||
@ -18,9 +20,31 @@ func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (
|
||||
return nil, err
|
||||
}
|
||||
bc := &bloomcache{blockstore: bs, bloom: bl}
|
||||
bc.hits = metrics.NewCtx(ctx, "bloom.hits_total",
|
||||
"Number of cache hits in bloom cache").Counter()
|
||||
bc.total = metrics.NewCtx(ctx, "bloom_total",
|
||||
"Total number of requests to bloom cache").Counter()
|
||||
|
||||
bc.Invalidate()
|
||||
go bc.Rebuild(ctx)
|
||||
if metrics.Active() {
|
||||
go func() {
|
||||
fill := metrics.NewCtx(ctx, "bloom_fill_ratio",
|
||||
"Ratio of bloom filter fullnes, (updated once a minute)").Gauge()
|
||||
|
||||
<-bc.rebuildChan
|
||||
t := time.NewTicker(1 * time.Minute)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Stop()
|
||||
return
|
||||
case <-t.C:
|
||||
fill.Set(bc.bloom.FillRatio())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
return bc, nil
|
||||
}
|
||||
|
||||
@ -33,8 +57,8 @@ type bloomcache struct {
|
||||
blockstore Blockstore
|
||||
|
||||
// Statistics
|
||||
hits uint64
|
||||
misses uint64
|
||||
hits metrics.Counter
|
||||
total metrics.Counter
|
||||
}
|
||||
|
||||
func (b *bloomcache) Invalidate() {
|
||||
@ -84,6 +108,7 @@ func (b *bloomcache) DeleteBlock(k key.Key) error {
|
||||
// if ok == false has is inconclusive
|
||||
// if ok == true then has respons to question: is it contained
|
||||
func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
|
||||
b.total.Inc()
|
||||
if k == "" {
|
||||
// Return cache invalid so call to blockstore
|
||||
// in case of invalid key is forwarded deeper
|
||||
@ -92,6 +117,7 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
|
||||
if b.BloomActive() {
|
||||
blr := b.bloom.HasTS([]byte(k))
|
||||
if blr == false { // not contained in bloom is only conclusive answer bloom gives
|
||||
b.hits.Inc()
|
||||
return false, true
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package blockstore
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||
)
|
||||
|
||||
@ -33,12 +34,15 @@ func CachedBlockstore(bs GCBlockstore,
|
||||
if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 {
|
||||
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
|
||||
}
|
||||
|
||||
ctx = metrics.CtxSubScope(ctx, "bs.cache")
|
||||
|
||||
if opts.HasARCCacheSize > 0 {
|
||||
cbs, err = newARCCachedBS(ctx, cbs, opts.HasARCCacheSize)
|
||||
}
|
||||
if opts.HasBloomFilterSize != 0 {
|
||||
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes)
|
||||
}
|
||||
if opts.HasARCCacheSize > 0 {
|
||||
cbs, err = arcCached(cbs, opts.HasARCCacheSize)
|
||||
}
|
||||
|
||||
return cbs, err
|
||||
}
|
||||
|
||||
@ -11,11 +11,6 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
|
||||
_ "gx/ipfs/QmV3NSS3A1kX5s28r7yLczhDsXzkgo65cqRgKFXYunWZmD/metrics/runtime"
|
||||
|
||||
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
|
||||
|
||||
cmds "github.com/ipfs/go-ipfs/commands"
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
commands "github.com/ipfs/go-ipfs/core/commands"
|
||||
@ -25,10 +20,17 @@ import (
|
||||
nodeMount "github.com/ipfs/go-ipfs/fuse/node"
|
||||
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
|
||||
migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
|
||||
|
||||
"gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
|
||||
"gx/ipfs/QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa/client_golang/prometheus"
|
||||
|
||||
conn "gx/ipfs/QmUuwQUJmtvC6ReYcu7xaYKEUM3pD46H18dFn3LBhVt2Di/go-libp2p/p2p/net/conn"
|
||||
mprome "gx/ipfs/QmXWro6iddJRbGWUoZDpTu6tjo5EXX4xJHHR9VczeoGZbw/go-metrics-prometheus"
|
||||
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
|
||||
util "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
|
||||
pstore "gx/ipfs/QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2/go-libp2p-peerstore"
|
||||
prometheus "gx/ipfs/QmdhsRK1EK2fvAz2i2SH5DEfkL6seDuyMYEsxKa9Braim3/client_golang/prometheus"
|
||||
|
||||
_ "gx/ipfs/QmV3NSS3A1kX5s28r7yLczhDsXzkgo65cqRgKFXYunWZmD/metrics/runtime"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -359,8 +361,11 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
}
|
||||
|
||||
// initialize metrics collector
|
||||
prometheus.MustRegisterOrGet(&corehttp.IpfsNodeCollector{Node: node})
|
||||
prometheus.EnableCollectChecks(true)
|
||||
err = mprome.Inject()
|
||||
if err != nil {
|
||||
log.Warningf("Injecting prometheus handler for metrics failed with message: %s\n", err.Error())
|
||||
}
|
||||
prometheus.MustRegister(&corehttp.IpfsNodeCollector{Node: node})
|
||||
|
||||
fmt.Printf("Daemon is ready\n")
|
||||
// collect long-running errors and block for shutdown
|
||||
|
||||
@ -16,14 +16,15 @@ import (
|
||||
pin "github.com/ipfs/go-ipfs/pin"
|
||||
repo "github.com/ipfs/go-ipfs/repo"
|
||||
cfg "github.com/ipfs/go-ipfs/repo/config"
|
||||
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
|
||||
|
||||
retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore"
|
||||
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
|
||||
goprocessctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
|
||||
ci "gx/ipfs/QmVoi5es8D5fNHZDqoW6DgDAEPEV5hQp8GBz161vZXiwpQ/go-libp2p-crypto"
|
||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
|
||||
dsync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
|
||||
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
|
||||
pstore "gx/ipfs/QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2/go-libp2p-peerstore"
|
||||
)
|
||||
|
||||
@ -109,6 +110,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx = metrics.CtxScope(ctx, "ipfs")
|
||||
|
||||
n := &IpfsNode{
|
||||
mode: offlineMode,
|
||||
|
||||
@ -4,9 +4,9 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
prometheus "gx/ipfs/QmdhsRK1EK2fvAz2i2SH5DEfkL6seDuyMYEsxKa9Braim3/client_golang/prometheus"
|
||||
|
||||
core "github.com/ipfs/go-ipfs/core"
|
||||
|
||||
prometheus "gx/ipfs/QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// This adds the scraping endpoint which Prometheus uses to fetch metrics.
|
||||
@ -53,6 +53,9 @@ func (c IpfsNodeCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
|
||||
func (c IpfsNodeCollector) PeersTotalValues() map[string]float64 {
|
||||
vals := make(map[string]float64)
|
||||
if c.Node.PeerHost == nil {
|
||||
return vals
|
||||
}
|
||||
for _, conn := range c.Node.PeerHost.Network().Conns() {
|
||||
tr := ""
|
||||
for _, proto := range conn.RemoteMultiaddr().Protocols() {
|
||||
|
||||
20
package.json
20
package.json
@ -46,9 +46,9 @@
|
||||
"version": "0.0.0"
|
||||
},
|
||||
{
|
||||
"hash": "QmdhsRK1EK2fvAz2i2SH5DEfkL6seDuyMYEsxKa9Braim3",
|
||||
"hash": "QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa",
|
||||
"name": "client_golang",
|
||||
"version": "0.0.0"
|
||||
"version": "0.1.0"
|
||||
},
|
||||
{
|
||||
"hash": "Qma1FrGRasghpuETfCtsKdFtXKQffpNnakv3wG3QaMwCVi",
|
||||
@ -170,9 +170,9 @@
|
||||
},
|
||||
{
|
||||
"author": "kubuxu",
|
||||
"hash": "QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5",
|
||||
"hash": "QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM",
|
||||
"name": "bbloom",
|
||||
"version": "0.0.2"
|
||||
"version": "0.1.0"
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
@ -239,6 +239,18 @@
|
||||
"hash": "QmYrv4LgCC8FhG2Ab4bwuq5DqBdwMtx3hMb3KKJDZcr2d7",
|
||||
"name": "go-libp2p-loggables",
|
||||
"version": "1.0.11"
|
||||
},
|
||||
{
|
||||
"author": "ipfs",
|
||||
"hash": "QmXWro6iddJRbGWUoZDpTu6tjo5EXX4xJHHR9VczeoGZbw",
|
||||
"name": "go-metrics-prometheus",
|
||||
"version": "0.3.0"
|
||||
},
|
||||
{
|
||||
"author": "ipfs",
|
||||
"hash": "QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5",
|
||||
"name": "go-metrics-interface",
|
||||
"version": "0.1.2"
|
||||
}
|
||||
],
|
||||
"gxVersion": "0.4.0",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user