mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
* blossomsub: Remove unused mutex * blossomsub: Add RPC queue * blossomsub: Use RPC queue * blossomsub: Add IDONTWANT control message to protos * blossomsub: Add IDONTWANT tracing support * blossomsub: Add pre-validation * blossomsub: Add IDONTWANT feature flag * blossomsub: Add IDONTWANT parameters * blossomsub: Add IDONTWANT observability * blossomsub: Send IDONTWANT control messages * blossomsub: Handle IDONTWANT control messages * blossomsub: Clear maps efficiently * blossomsub: Increase IDONTWANT parameter defaults * blossomsub: Do not send IDONTWANT to original sender * blossomsub: Add IDONTWANT unit tests
340 lines
11 KiB
Go
340 lines
11 KiB
Go
package observability
|
|
|
|
import (
|
|
"encoding/base64"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
|
|
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
|
|
)
|
|
|
|
const blossomSubNamespace = "blossomsub"
|
|
|
|
var binaryEncoding = base64.RawStdEncoding
|
|
|
|
type blossomSubRawTracer struct {
|
|
addPeerTotal *prometheus.CounterVec
|
|
removePeerTotal prometheus.Counter
|
|
joinTotal *prometheus.CounterVec
|
|
leaveTotal *prometheus.CounterVec
|
|
graftTotal *prometheus.CounterVec
|
|
pruneTotal *prometheus.CounterVec
|
|
validateMessageTotal *prometheus.CounterVec
|
|
deliverMessageTotal *prometheus.CounterVec
|
|
rejectMessageTotal *prometheus.CounterVec
|
|
duplicateMessageTotal *prometheus.CounterVec
|
|
throttlePeerTotal prometheus.Counter
|
|
recvRPCTotal prometheus.Counter
|
|
sendRPCTotal prometheus.Counter
|
|
dropRPCTotal prometheus.Counter
|
|
undeliverableMessageTotal *prometheus.CounterVec
|
|
iHaveMessageHistogram *prometheus.HistogramVec
|
|
iWantMessageHistogram *prometheus.HistogramVec
|
|
iDontWantMessageHistogram *prometheus.HistogramVec
|
|
}
|
|
|
|
func (b *blossomSubRawTracer) observeControl(control *pb.ControlMessage, direction string) {
|
|
labels := []string{direction}
|
|
for _, iHave := range control.GetIhave() {
|
|
labels := append(labels, binaryEncoding.EncodeToString(iHave.GetBitmask()))
|
|
b.iHaveMessageHistogram.WithLabelValues(labels...).Observe(float64(len(iHave.GetMessageIDs())))
|
|
}
|
|
for _, iWant := range control.GetIwant() {
|
|
b.iWantMessageHistogram.WithLabelValues(labels...).Observe(float64(len(iWant.GetMessageIDs())))
|
|
}
|
|
for _, iDontWant := range control.GetIdontwant() {
|
|
b.iDontWantMessageHistogram.WithLabelValues(labels...).Observe(float64(len(iDontWant.GetMessageIDs())))
|
|
}
|
|
}
|
|
|
|
var _ blossomsub.RawTracer = (*blossomSubRawTracer)(nil)
|
|
|
|
// AddPeer implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) AddPeer(p peer.ID, proto protocol.ID) {
|
|
b.addPeerTotal.WithLabelValues(string(proto)).Inc()
|
|
}
|
|
|
|
// RemovePeer implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) RemovePeer(p peer.ID) {
|
|
b.removePeerTotal.Inc()
|
|
}
|
|
|
|
// Join implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) Join(bitmask []byte) {
|
|
b.joinTotal.WithLabelValues(binaryEncoding.EncodeToString(bitmask)).Inc()
|
|
}
|
|
|
|
// Leave implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) Leave(bitmask []byte) {
|
|
b.leaveTotal.WithLabelValues(binaryEncoding.EncodeToString(bitmask)).Inc()
|
|
}
|
|
|
|
// Graft implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) Graft(p peer.ID, bitmask []byte) {
|
|
b.graftTotal.WithLabelValues(binaryEncoding.EncodeToString(bitmask)).Inc()
|
|
}
|
|
|
|
// Prune implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) Prune(p peer.ID, bitmask []byte) {
|
|
b.pruneTotal.WithLabelValues(binaryEncoding.EncodeToString(bitmask)).Inc()
|
|
}
|
|
|
|
// ValidateMessage implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) ValidateMessage(msg *blossomsub.Message) {
|
|
b.validateMessageTotal.WithLabelValues(binaryEncoding.EncodeToString(msg.GetBitmask())).Inc()
|
|
}
|
|
|
|
// SignMessage implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) DeliverMessage(msg *blossomsub.Message) {
|
|
b.deliverMessageTotal.WithLabelValues(binaryEncoding.EncodeToString(msg.GetBitmask())).Inc()
|
|
}
|
|
|
|
// RejectMessage implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) RejectMessage(msg *blossomsub.Message, reason string) {
|
|
b.rejectMessageTotal.WithLabelValues(binaryEncoding.EncodeToString(msg.GetBitmask()), reason).Inc()
|
|
}
|
|
|
|
// DuplicateMessage implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) DuplicateMessage(msg *blossomsub.Message) {
|
|
b.duplicateMessageTotal.WithLabelValues(binaryEncoding.EncodeToString(msg.GetBitmask())).Inc()
|
|
}
|
|
|
|
// ThrottlePeer implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) ThrottlePeer(p peer.ID) {
|
|
b.throttlePeerTotal.Inc()
|
|
}
|
|
|
|
// RecvRPC implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) RecvRPC(rpc *blossomsub.RPC) {
|
|
b.recvRPCTotal.Inc()
|
|
b.observeControl(rpc.GetControl(), "recv")
|
|
}
|
|
|
|
// SendRPC implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) SendRPC(rpc *blossomsub.RPC, p peer.ID) {
|
|
b.sendRPCTotal.Inc()
|
|
b.observeControl(rpc.GetControl(), "send")
|
|
}
|
|
|
|
// DropRPC implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) DropRPC(rpc *blossomsub.RPC, p peer.ID) {
|
|
b.dropRPCTotal.Inc()
|
|
b.observeControl(rpc.GetControl(), "drop")
|
|
}
|
|
|
|
// UndeliverableMessage implements blossomsub.RawTracer.
|
|
func (b *blossomSubRawTracer) UndeliverableMessage(msg *blossomsub.Message) {
|
|
b.undeliverableMessageTotal.WithLabelValues(binaryEncoding.EncodeToString(msg.GetBitmask())).Inc()
|
|
}
|
|
|
|
var _ prometheus.Collector = (*blossomSubRawTracer)(nil)
|
|
|
|
// Describe implements prometheus.Collector.
|
|
func (b *blossomSubRawTracer) Describe(ch chan<- *prometheus.Desc) {
|
|
b.addPeerTotal.Describe(ch)
|
|
b.removePeerTotal.Describe(ch)
|
|
b.joinTotal.Describe(ch)
|
|
b.leaveTotal.Describe(ch)
|
|
b.graftTotal.Describe(ch)
|
|
b.pruneTotal.Describe(ch)
|
|
b.validateMessageTotal.Describe(ch)
|
|
b.deliverMessageTotal.Describe(ch)
|
|
b.rejectMessageTotal.Describe(ch)
|
|
b.duplicateMessageTotal.Describe(ch)
|
|
b.throttlePeerTotal.Describe(ch)
|
|
b.recvRPCTotal.Describe(ch)
|
|
b.sendRPCTotal.Describe(ch)
|
|
b.dropRPCTotal.Describe(ch)
|
|
b.undeliverableMessageTotal.Describe(ch)
|
|
b.iHaveMessageHistogram.Describe(ch)
|
|
b.iWantMessageHistogram.Describe(ch)
|
|
b.iDontWantMessageHistogram.Describe(ch)
|
|
}
|
|
|
|
// Collect implements prometheus.Collector.
|
|
func (b *blossomSubRawTracer) Collect(ch chan<- prometheus.Metric) {
|
|
b.addPeerTotal.Collect(ch)
|
|
b.removePeerTotal.Collect(ch)
|
|
b.joinTotal.Collect(ch)
|
|
b.leaveTotal.Collect(ch)
|
|
b.graftTotal.Collect(ch)
|
|
b.pruneTotal.Collect(ch)
|
|
b.validateMessageTotal.Collect(ch)
|
|
b.deliverMessageTotal.Collect(ch)
|
|
b.rejectMessageTotal.Collect(ch)
|
|
b.duplicateMessageTotal.Collect(ch)
|
|
b.throttlePeerTotal.Collect(ch)
|
|
b.recvRPCTotal.Collect(ch)
|
|
b.sendRPCTotal.Collect(ch)
|
|
b.dropRPCTotal.Collect(ch)
|
|
b.undeliverableMessageTotal.Collect(ch)
|
|
b.iHaveMessageHistogram.Collect(ch)
|
|
b.iWantMessageHistogram.Collect(ch)
|
|
b.iDontWantMessageHistogram.Collect(ch)
|
|
}
|
|
|
|
type BlossomSubRawTracer interface {
|
|
blossomsub.RawTracer
|
|
prometheus.Collector
|
|
}
|
|
|
|
func NewBlossomSubRawTracer() BlossomSubRawTracer {
|
|
b := &blossomSubRawTracer{
|
|
addPeerTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "add_peer_total",
|
|
Help: "Total number of peers added to the mesh.",
|
|
},
|
|
[]string{"protocol"},
|
|
),
|
|
removePeerTotal: prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "remove_peer_total",
|
|
Help: "Total number of peers removed from the mesh.",
|
|
},
|
|
),
|
|
joinTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "join_total",
|
|
Help: "Total number of joins to the mesh.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
leaveTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "leave_total",
|
|
Help: "Total number of leaves from the mesh.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
graftTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "graft_total",
|
|
Help: "Total number of grafts.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
pruneTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "prune_total",
|
|
Help: "Total number of prunes.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
validateMessageTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "validate_message_total",
|
|
Help: "Total number of messages validated.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
deliverMessageTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "deliver_message_total",
|
|
Help: "Total number of messages delivered.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
rejectMessageTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "reject_message_total",
|
|
Help: "Total number of messages rejected.",
|
|
},
|
|
[]string{"bitmask", "reason"},
|
|
),
|
|
duplicateMessageTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "duplicate_message_total",
|
|
Help: "Total number of messages duplicated.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
throttlePeerTotal: prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "throttle_peer_total",
|
|
Help: "Total number of peers throttled.",
|
|
},
|
|
),
|
|
recvRPCTotal: prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "recv_rpc_total",
|
|
Help: "Total number of RPCs received.",
|
|
},
|
|
),
|
|
sendRPCTotal: prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "send_rpc_total",
|
|
Help: "Total number of RPCs sent.",
|
|
},
|
|
),
|
|
dropRPCTotal: prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "drop_rpc_total",
|
|
Help: "Total number of RPCs dropped.",
|
|
},
|
|
),
|
|
undeliverableMessageTotal: prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "undeliverable_message_total",
|
|
Help: "Total number of messages undeliverable.",
|
|
},
|
|
[]string{"bitmask"},
|
|
),
|
|
iHaveMessageHistogram: prometheus.NewHistogramVec(
|
|
prometheus.HistogramOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "ihave_messages",
|
|
Help: "Histogram of the number of messages in an IHave message.",
|
|
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
|
|
},
|
|
[]string{"direction", "bitmask"},
|
|
),
|
|
iWantMessageHistogram: prometheus.NewHistogramVec(
|
|
prometheus.HistogramOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "iwant_messages",
|
|
Help: "Histogram of the number of messages in an IWant message.",
|
|
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
|
|
},
|
|
[]string{"direction"},
|
|
),
|
|
iDontWantMessageHistogram: prometheus.NewHistogramVec(
|
|
prometheus.HistogramOpts{
|
|
Namespace: blossomSubNamespace,
|
|
Name: "idontwant_messages",
|
|
Help: "Histogram of the number of messages in an IDontWant message.",
|
|
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
|
|
},
|
|
[]string{"direction"},
|
|
),
|
|
}
|
|
return b
|
|
}
|
|
|
|
var globalBlossomSubRawTracer = NewBlossomSubRawTracer()
|
|
|
|
func init() {
|
|
prometheus.MustRegister(globalBlossomSubRawTracer)
|
|
}
|
|
|
|
func WithPrometheusRawTracer() blossomsub.Option {
|
|
return blossomsub.WithRawTracer(globalBlossomSubRawTracer)
|
|
}
|