Merge branch 'develop' into v2.0.4-p1

This commit is contained in:
Cassandra Heart 2024-11-22 19:58:04 -06:00
commit 35d4b7a728
No known key found for this signature in database
GPG Key ID: 6352152859385958
23 changed files with 1381 additions and 404 deletions

View File

@ -597,6 +597,919 @@
"x": 0,
"y": 19
},
"id": 12,
"panels": [],
"title": "Gossip",
"type": "row"
},
{
"datasource": {
"default": false,
"type": "prometheus",
"uid": "${datasource}"
},
"description": "The number of message IDs provided in IWANT control messages sent to remote peers.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 20
},
"id": 17,
"options": {
"legend": {
"calcs": [
"lastNotNull",
"min",
"max",
"mean"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Name",
"sortDesc": true
},
"tooltip": {
"mode": "multi",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval]))",
"hide": false,
"instant": false,
"legendFormat": "P99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval]))",
"instant": false,
"legendFormat": "P95",
"range": true,
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.5, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval]))",
"hide": false,
"instant": false,
"legendFormat": "P50",
"range": true,
"refId": "C"
}
],
"title": "Sent IWANT message count histogram",
"type": "timeseries"
},
{
"datasource": {
"default": false,
"type": "prometheus",
"uid": "${datasource}"
},
"description": "The number of message IDs in IHAVE control messages which have been successfully sent to a remote peer.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 20
},
"id": 14,
"options": {
"legend": {
"calcs": [
"lastNotNull",
"min",
"max",
"mean"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Name",
"sortDesc": true
},
"tooltip": {
"mode": "multi",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, sum by(bitmask, le) (rate(blossomsub_ihave_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"send\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{bitmask}} - P95",
"range": true,
"refId": "A"
}
],
"title": "Sent IHAVE message count histogram",
"transformations": [
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Frames Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAA(.*)",
"renamePattern": "Data Frames Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQA(.*)",
"renamePattern": "Data Frames Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Token Requests Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAA(.*)",
"renamePattern": "Data Token Requests Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEA(.*)",
"renamePattern": "Data Token Requests Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Peer Announcements Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAA(.*)",
"renamePattern": "Data Peer Announcements Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA(.*)",
"renamePattern": "Data Peer Announcements Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Master Frames$1"
}
}
],
"type": "timeseries"
},
{
"datasource": {
"default": false,
"type": "prometheus",
"uid": "${datasource}"
},
"description": "The number of message IDs provided in IWANT control messages received from remote peers.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 29
},
"id": 13,
"options": {
"legend": {
"calcs": [
"lastNotNull",
"min",
"max",
"mean"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Name",
"sortDesc": true
},
"tooltip": {
"mode": "multi",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"recv\"}[$__rate_interval]))",
"hide": false,
"instant": false,
"legendFormat": "P99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"recv\"}[$__rate_interval]))",
"instant": false,
"legendFormat": "P95",
"range": true,
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.5, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"recv\"}[$__rate_interval]))",
"hide": false,
"instant": false,
"legendFormat": "P50",
"range": true,
"refId": "C"
}
],
"title": "Received IWANT message count histogram",
"type": "timeseries"
},
{
"datasource": {
"default": false,
"type": "prometheus",
"uid": "${datasource}"
},
"description": "The number of message IDs in IHAVE control messages which have received from a remote peer.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 29
},
"id": 16,
"options": {
"legend": {
"calcs": [
"lastNotNull",
"min",
"max",
"mean"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Name",
"sortDesc": true
},
"tooltip": {
"mode": "multi",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, sum by(bitmask, le) (rate(blossomsub_ihave_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"recv\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{bitmask}} - P95",
"range": true,
"refId": "A"
}
],
"title": "Received IHAVE message count histogram",
"transformations": [
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Frames Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAA(.*)",
"renamePattern": "Data Frames Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQA(.*)",
"renamePattern": "Data Frames Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Token Requests Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAA(.*)",
"renamePattern": "Data Token Requests Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEA(.*)",
"renamePattern": "Data Token Requests Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Peer Announcements Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAA(.*)",
"renamePattern": "Data Peer Announcements Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA(.*)",
"renamePattern": "Data Peer Announcements Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Master Frames$1"
}
}
],
"type": "timeseries"
},
{
"datasource": {
"default": false,
"type": "prometheus",
"uid": "${datasource}"
},
"description": "The number of message IDs provided in IWANT control messages which have not been sent to a remote peer due to an error (usually a full queue).",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 38
},
"id": 18,
"options": {
"legend": {
"calcs": [
"lastNotNull",
"min",
"max",
"mean"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Name",
"sortDesc": true
},
"tooltip": {
"mode": "multi",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval]))",
"hide": false,
"instant": false,
"legendFormat": "P99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval]))",
"instant": false,
"legendFormat": "P95",
"range": true,
"refId": "B"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.5, rate(blossomsub_iwant_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval]))",
"hide": false,
"instant": false,
"legendFormat": "P50",
"range": true,
"refId": "C"
}
],
"title": "Dropped IWANT message count histogram",
"type": "timeseries"
},
{
"datasource": {
"default": false,
"type": "prometheus",
"uid": "${datasource}"
},
"description": "The number of message IDs in IHAVE control messages which have not been sent to a remote peer due to an error (usually a full queue).",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 38
},
"id": 15,
"options": {
"legend": {
"calcs": [
"lastNotNull",
"min",
"max",
"mean"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Name",
"sortDesc": true
},
"tooltip": {
"mode": "multi",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.95, sum by(bitmask, le) (rate(blossomsub_ihave_messages_bucket{job=~\"$job\", instance=~\"$host\", direction=\"drop\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{bitmask}} - P95",
"range": true,
"refId": "A"
}
],
"title": "Dropped IHAVE message count histogram",
"transformations": [
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Frames Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAAAA(.*)",
"renamePattern": "Data Frames Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQA(.*)",
"renamePattern": "Data Frames Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AACAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Token Requests Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAA(.*)",
"renamePattern": "Data Token Requests Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEA(.*)",
"renamePattern": "Data Token Requests Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Data Peer Announcements Shard 1$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAA(.*)",
"renamePattern": "Data Peer Announcements Shard 2$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAA(.*)",
"renamePattern": "Data Peer Announcements Shard 3$1"
}
},
{
"id": "renameByRegex",
"options": {
"regex": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA(.*)",
"renamePattern": "Master Frames$1"
}
}
],
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 47
},
"id": 8,
"panels": [],
"title": "Meshes",
@ -663,7 +1576,7 @@
"h": 9,
"w": 24,
"x": 0,
"y": 20
"y": 48
},
"id": 11,
"options": {
@ -836,7 +1749,7 @@
"h": 9,
"w": 12,
"x": 0,
"y": 29
"y": 57
},
"id": 9,
"options": {
@ -1009,7 +1922,7 @@
"h": 9,
"w": 12,
"x": 12,
"y": 29
"y": 57
},
"id": 10,
"options": {
@ -1126,7 +2039,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 38
"y": 66
},
"id": 6,
"panels": [],
@ -1195,7 +2108,7 @@
"h": 9,
"w": 24,
"x": 0,
"y": 39
"y": 67
},
"id": 7,
"options": {
@ -1338,6 +2251,6 @@
"timezone": "browser",
"title": "BlossomSub",
"uid": "ee47pcfax962ob",
"version": 29,
"version": 39,
"weekStart": ""
}

View File

@ -38,48 +38,34 @@ func TestBlossomSubMatchingFn(t *testing.T) {
}
// build the mesh
var subs []*Subscription
var bitmasks []*Bitmask
var subs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x80, 0x00, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
bitmasks = append(bitmasks, b)
sub, err := ps.Subscribe([]byte{0x00, 0x80, 0x00, 0x00})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub...)
subs = append(subs, sub)
}
time.Sleep(time.Second)
// publish a message
msg := []byte("message")
bitmasks[0].Publish(ctx, bitmasks[0].bitmask, msg)
bitmasks[0][0].Publish(ctx, bitmasks[0][0].bitmask, msg)
assertReceive(t, subs[0], msg)
assertReceive(t, subs[1], msg) // Should match via semver over CustomSub name, ignoring the version
assertReceive(t, subs[2], msg) // Should match via BlossomSubID_v2
// No message should be received because customsubA and customsubB have different names
ctxTimeout, timeoutCancel := context.WithTimeout(context.Background(), 1*time.Second)
defer timeoutCancel()
received := false
for {
msg, err := subs[3].Next(ctxTimeout)
if err != nil {
break
}
if msg != nil {
received = true
}
}
if received {
t.Fatal("Should not have received a message")
}
assertNeverReceives(t, subs[2], 1*time.Second)
}
func protocolNameMatch(base protocol.ID) func(protocol.ID) bool {

View File

@ -38,7 +38,7 @@ func assertPeerLists(t *testing.T, bitmask []byte, hosts []host.Host, ps *PubSub
}
}
func checkMessageRouting(t *testing.T, ctx context.Context, bitmasks []*Bitmask, subs []*Subscription) {
func checkMessageRouting(t *testing.T, ctx context.Context, bitmasks []*Bitmask, subs [][]*Subscription) {
for _, p := range bitmasks {
data := make([]byte, 16)
rand.Read(data)
@ -112,24 +112,48 @@ func connectAll(t *testing.T, hosts []host.Host) {
}
}
func assertReceive(t *testing.T, ch *Subscription, exp []byte) {
select {
case msg := <-ch.ch:
if !bytes.Equal(msg.GetData(), exp) {
t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(msg.GetData()))
}
case <-time.After(time.Second * 5):
t.Logf("%#v\n", ch)
func assertReceive(t *testing.T, ch []*Subscription, exp []byte) {
received := false
var wrong *Message
wg := sync.WaitGroup{}
done, cancel := context.WithCancel(context.TODO())
wg.Add(len(ch))
for _, c := range ch {
c := c
go func() {
defer wg.Done()
select {
case msg := <-c.ch:
if !bytes.Equal(msg.GetData(), exp) {
wrong = msg
} else {
received = true
}
cancel()
case <-done.Done():
case <-time.After(time.Second * 5):
t.Logf("%#v\n", ch)
}
}()
}
wg.Wait()
if !received {
t.Fatal("timed out waiting for message of: ", string(exp))
}
if wrong != nil {
t.Fatalf("got wrong message, expected %s but got %s", string(exp), string(wrong.Data))
}
}
func assertNeverReceives(t *testing.T, ch *Subscription, timeout time.Duration) {
select {
case msg := <-ch.ch:
t.Logf("%#v\n", ch)
t.Fatal("got unexpected message: ", string(msg.GetData()))
case <-time.After(timeout):
func assertNeverReceives(t *testing.T, ch []*Subscription, timeout time.Duration) {
for _, c := range ch {
select {
case msg := <-c.ch:
t.Logf("%#v\n", ch)
t.Fatal("got unexpected message: ", string(msg.GetData()))
case <-time.After(timeout):
}
}
}
@ -492,21 +516,21 @@ func TestBlossomSubGossip(t *testing.T) {
psubs := getBlossomSubs(ctx, hosts)
var msgs []*Subscription
var bitmasks []*Bitmask
var msgs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x01})
b, err := ps.Join([]byte{0x00, 0x81})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
subch, err := ps.Subscribe([]byte{0x00, 0x01})
bitmasks = append(bitmasks, b)
subch, err := ps.Subscribe([]byte{0x00, 0x81})
if err != nil {
t.Fatal(err)
}
msgs = append(msgs, subch...)
msgs = append(msgs, subch)
}
denseConnect(t, hosts)
@ -519,16 +543,10 @@ func TestBlossomSubGossip(t *testing.T) {
owner := rand.Intn(len(psubs))
bitmasks[owner].Publish(ctx, []byte{0x00, 0x01}, msg)
bitmasks[owner][0].Publish(ctx, []byte{0x00, 0x81}, msg)
for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
assertReceive(t, sub, msg)
}
// wait a bit to have some gossip interleaved
@ -1063,13 +1081,13 @@ func TestMixedBlossomSub(t *testing.T) {
var msgs []*Subscription
var bitmasks []*Bitmask
for _, ps := range bsubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
subch, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
subch, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -1087,7 +1105,7 @@ func TestMixedBlossomSub(t *testing.T) {
owner := rand.Intn(len(bsubs))
bitmasks[owner].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[owner].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
for _, sub := range msgs {
got, err := sub.Next(ctx)
@ -1118,13 +1136,13 @@ func TestBlossomSubMultihops(t *testing.T) {
var subs []*Subscription
var bitmasks []*Bitmask
for i := 1; i < 6; i++ {
b, err := psubs[i].Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := psubs[i].Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
ch, err := psubs[i].Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
ch, err := psubs[i].Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -1135,7 +1153,7 @@ func TestBlossomSubMultihops(t *testing.T) {
time.Sleep(time.Second * 2)
msg := []byte("i like cats")
err := bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
err := bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
if err != nil {
t.Fatal(err)
}
@ -1178,31 +1196,31 @@ func TestBlossomSubTreeTopology(t *testing.T) {
[8] -> [9]
*/
var chs []*Subscription
var bitmasks []*Bitmask
var chs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
ch, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
bitmasks = append(bitmasks, b)
ch, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
chs = append(chs, ch...)
chs = append(chs, ch)
}
// wait for heartbeats to build mesh
time.Sleep(time.Second * 2)
assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[0], 1, 5)
assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[1], 0, 2, 4)
assertPeerLists(t, []byte{0x00, 0x00, 0x80, 0x00}, hosts, psubs[2], 1, 3)
assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[0], 1, 5)
assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[1], 0, 2, 4)
assertPeerLists(t, []byte{0x00, 0x00, 0x81, 0x00}, hosts, psubs[2], 1, 3)
checkMessageRouting(t, ctx, []*Bitmask{bitmasks[9], bitmasks[3]}, chs)
checkMessageRouting(t, ctx, []*Bitmask{bitmasks[9][0], bitmasks[3][0]}, chs)
}
// this tests overlay bootstrapping through px in BlossomSub v1.2
@ -1258,20 +1276,20 @@ func TestBlossomSubStarTopology(t *testing.T) {
time.Sleep(time.Second)
// build the mesh
var subs []*Subscription
var subs [][]*Subscription
var bitmasks []*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub...)
subs = append(subs, sub)
}
// wait a bit for the mesh to build
@ -1287,7 +1305,7 @@ func TestBlossomSubStarTopology(t *testing.T) {
// send a message from each peer and assert it was propagated
for i := 0; i < 20; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
for _, sub := range subs {
assertReceive(t, sub, msg)
@ -1364,20 +1382,20 @@ func TestBlossomSubStarTopologyWithSignedPeerRecords(t *testing.T) {
time.Sleep(time.Second)
// build the mesh
var subs []*Subscription
var bitmasks []*Bitmask
var subs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
bitmasks = append(bitmasks, b)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub...)
subs = append(subs, sub)
}
// wait a bit for the mesh to build
@ -1393,7 +1411,7 @@ func TestBlossomSubStarTopologyWithSignedPeerRecords(t *testing.T) {
// send a message from each peer and assert it was propagated
for i := 0; i < 20; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
for _, sub := range subs {
assertReceive(t, sub, msg)
@ -1422,20 +1440,20 @@ func TestBlossomSubDirectPeers(t *testing.T) {
}
// build the mesh
var subs []*Subscription
var bitmasks []*Bitmask
var subs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
bitmasks = append(bitmasks, b)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub...)
subs = append(subs, sub)
}
time.Sleep(time.Second)
@ -1443,7 +1461,7 @@ func TestBlossomSubDirectPeers(t *testing.T) {
// publish some messages
for i := 0; i < 3; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
for _, sub := range subs {
assertReceive(t, sub, msg)
@ -1464,7 +1482,7 @@ func TestBlossomSubDirectPeers(t *testing.T) {
// publish some messages
for i := 0; i < 3; i++ {
msg := []byte(fmt.Sprintf("message %d", i+3))
bitmasks[i].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[i][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
for _, sub := range subs {
assertReceive(t, sub, msg)
@ -1491,34 +1509,34 @@ func TestBlossomSubPeerFilter(t *testing.T) {
connect(t, h[0], h[2])
// Join all peers
var subs []*Subscription
var bitmasks []*Bitmask
var subs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
bitmasks = append(bitmasks, b)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub...)
subs = append(subs, sub)
}
time.Sleep(time.Second)
msg := []byte("message")
bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[0][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
assertReceive(t, subs[0], msg)
assertReceive(t, subs[1], msg)
assertNeverReceives(t, subs[2], time.Second)
msg = []byte("message2")
bitmasks[1].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[1][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
assertReceive(t, subs[0], msg)
assertReceive(t, subs[1], msg)
assertNeverReceives(t, subs[2], time.Second)
@ -1540,25 +1558,25 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) {
connect(t, h[0], h[2])
// Join all peers except h2
var subs []*Subscription
var bitmasks []*Bitmask
var subs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs[:2] {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
bitmasks = append(bitmasks, b)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub...)
subs = append(subs, sub)
}
time.Sleep(time.Second)
b, err := psubs[2].Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := psubs[2].Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -1566,7 +1584,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) {
// h2 publishes some messages to build a fanout
for i := 0; i < 3; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
b[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
b[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
for _, sub := range subs {
assertReceive(t, sub, msg)
@ -1577,7 +1595,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) {
result := make(chan bool, 2)
psubs[2].eval <- func() {
rt := psubs[2].rt.(*BlossomSubRouter)
fanout := rt.fanout[string([]byte{0x00, 0x00, 0x80, 0x00})]
fanout := rt.fanout[string([]byte{0x00, 0x00, 0x81, 0x00})]
_, ok := fanout[h[0].ID()]
result <- ok
_, ok = fanout[h[1].ID()]
@ -1595,7 +1613,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) {
}
// now subscribe h2 too and verify tht h0 is in the mesh but not h1
_, err = psubs[2].Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
_, err = psubs[2].Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -1604,7 +1622,7 @@ func TestBlossomSubDirectPeersFanout(t *testing.T) {
psubs[2].eval <- func() {
rt := psubs[2].rt.(*BlossomSubRouter)
mesh := rt.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})]
mesh := rt.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})]
_, ok := mesh[h[0].ID()]
result <- ok
_, ok = mesh[h[1].ID()]
@ -1637,20 +1655,20 @@ func TestBlossomSubFloodPublish(t *testing.T) {
}
// build the (partial, unstable) mesh
var subs []*Subscription
var bitmasks []*Bitmask
var subs [][]*Subscription
var bitmasks [][]*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
bitmasks = append(bitmasks, b)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub...)
subs = append(subs, sub)
}
time.Sleep(time.Second)
@ -1658,7 +1676,7 @@ func TestBlossomSubFloodPublish(t *testing.T) {
// send a message from the star and assert it was received
for i := 0; i < 20; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
bitmasks[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[0][0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
for _, sub := range subs {
assertReceive(t, sub, msg)
@ -1674,7 +1692,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) {
psubs := getBlossomSubs(ctx, hosts)
for _, ps := range psubs {
_, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
_, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -1683,7 +1701,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) {
// at this point we have no connections and no mesh, so EnoughPeers should return false
res := make(chan bool, 1)
psubs[0].eval <- func() {
res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0)
res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0)
}
enough := <-res
if enough {
@ -1696,7 +1714,7 @@ func TestBlossomSubEnoughPeers(t *testing.T) {
time.Sleep(3 * time.Second)
psubs[0].eval <- func() {
res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x80, 0x00}, 0)
res <- psubs[0].rt.EnoughPeers([]byte{0x00, 0x00, 0x81, 0x00}, 0)
}
enough = <-res
if !enough {
@ -1768,13 +1786,13 @@ func TestBlossomSubNegativeScore(t *testing.T) {
var subs []*Subscription
var bitmasks []*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -1785,7 +1803,7 @@ func TestBlossomSubNegativeScore(t *testing.T) {
for i := 0; i < 20; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
bitmasks[i%20].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[i%20].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
time.Sleep(20 * time.Millisecond)
}
@ -1841,7 +1859,7 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) {
DecayInterval: time.Second,
DecayToZero: 0.01,
Bitmasks: map[string]*BitmaskScoreParams{
string([]byte{0x00, 0x00, 0x80, 0x00}): {
string([]byte{0x00, 0x00, 0x81, 0x00}): {
BitmaskWeight: 1,
TimeInMeshQuantum: time.Second,
InvalidMessageDeliveriesWeight: -1,
@ -1857,7 +1875,7 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) {
connectAll(t, hosts)
err := psubs[0].RegisterBitmaskValidator([]byte{0x00, 0x00, 0x80, 0x00}, func(ctx context.Context, p peer.ID, msg *Message) ValidationResult {
err := psubs[0].RegisterBitmaskValidator([]byte{0x00, 0x00, 0x81, 0x00}, func(ctx context.Context, p peer.ID, msg *Message) ValidationResult {
// we ignore host1 and reject host2
if p == hosts[1].ID() {
return ValidationIgnore
@ -1872,17 +1890,17 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) {
t.Fatal(err)
}
sub, err := psubs[0].Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
sub, err := psubs[0].Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
b1, err := psubs[1].Join([]byte{0x00, 0x00, 0x80, 0x00})
b1, err := psubs[1].Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
b2, err := psubs[2].Join([]byte{0x00, 0x00, 0x80, 0x00})
b2, err := psubs[2].Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -1899,8 +1917,8 @@ func TestBlossomSubScoreValidatorEx(t *testing.T) {
}
}
b1[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, []byte("i am not a walrus"))
b2[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, []byte("i am not a walrus either"))
b1[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, []byte("i am not a walrus"))
b2[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, []byte("i am not a walrus either"))
// assert no messages
expectNoMessage(sub[0])
@ -1939,7 +1957,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) {
res := make(chan *RPC, 1)
ps.eval <- func() {
gs := ps.rt.(*BlossomSubRouter)
test1 := []byte{0x00, 0x80, 0x00, 0x00}
test1 := []byte{0x00, 0x81, 0x00, 0x00}
test2 := []byte{0x00, 0x20, 0x00, 0x00}
test3 := []byte{0x00, 0x00, 0x02, 0x00}
gs.mesh[string(test1)] = make(map[peer.ID]struct{})
@ -1961,7 +1979,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) {
if len(rpc.Control.Graft) != 1 {
t.Fatal("expected 1 GRAFT")
}
if !bytes.Equal(rpc.Control.Graft[0].GetBitmask(), []byte{0x00, 0x80, 0x00, 0x00}) {
if !bytes.Equal(rpc.Control.Graft[0].GetBitmask(), []byte{0x00, 0x81, 0x00, 0x00}) {
t.Fatal("expected test1 as graft bitmask ID")
}
if len(rpc.Control.Prune) != 2 {
@ -1985,7 +2003,7 @@ func TestBlossomSubMultipleGraftBitmasks(t *testing.T) {
time.Sleep(time.Second * 1)
firstBitmask := []byte{0x00, 0x80, 0x00, 0x00}
firstBitmask := []byte{0x00, 0x81, 0x00, 0x00}
secondBitmask := []byte{0x00, 0x20, 0x00, 0x00}
thirdBitmask := []byte{0x00, 0x00, 0x02, 0x00}
@ -2058,7 +2076,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) {
DecayInterval: time.Second,
DecayToZero: 0.01,
Bitmasks: map[string]*BitmaskScoreParams{
string([]byte{0x00, 0x00, 0x80, 0x00}): {
string([]byte{0x00, 0x00, 0x81, 0x00}): {
BitmaskWeight: 1,
TimeInMeshWeight: 0.0002777,
TimeInMeshQuantum: time.Second,
@ -2099,13 +2117,13 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) {
// ask the real pubsus to join the bitmask
var bitmasks []*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -2123,7 +2141,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) {
// publish a bunch of messages from the real hosts
for i := 0; i < 1000; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
bitmasks[i%10].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[i%10].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
time.Sleep(20 * time.Millisecond)
}
@ -2137,7 +2155,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) {
gs := ps.rt.(*BlossomSubRouter)
count := 0
for _, h := range hosts[:10] {
_, ok := gs.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})][h.ID()]
_, ok := gs.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})][h.ID()]
if ok {
count++
}
@ -2167,7 +2185,7 @@ func TestBlossomSubLeaveBitmask(t *testing.T) {
// Join all peers
var subs []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -2181,9 +2199,9 @@ func TestBlossomSubLeaveBitmask(t *testing.T) {
psubs[0].rt.(*BlossomSubRouter).p.eval <- func() {
defer close(done)
psubs[0].rt.Leave([]byte{0x00, 0x00, 0x80, 0x00})
psubs[0].rt.Leave([]byte{0x00, 0x00, 0x81, 0x00})
time.Sleep(time.Second)
peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})]
peerMap := psubs[0].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})]
if len(peerMap) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 0")
}
@ -2205,7 +2223,7 @@ func TestBlossomSubLeaveBitmask(t *testing.T) {
// for peer 0.
psubs[1].rt.(*BlossomSubRouter).p.eval <- func() {
defer close(done)
peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x80, 0x00})]
peerMap2 := psubs[1].rt.(*BlossomSubRouter).backoff[string([]byte{0x00, 0x00, 0x81, 0x00})]
if len(peerMap2) != 1 {
t.Fatalf("No peer is populated in the backoff map for peer 1")
}
@ -2243,12 +2261,12 @@ func TestBlossomSubJoinBitmask(t *testing.T) {
peerMap := make(map[peer.ID]time.Time)
peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff)
router0.backoff[string([]byte{0x00, 0x00, 0x80, 0x00})] = peerMap
router0.backoff[string([]byte{0x00, 0x00, 0x81, 0x00})] = peerMap
// Join all peers
var subs []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
sub, err := ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -2258,7 +2276,7 @@ func TestBlossomSubJoinBitmask(t *testing.T) {
time.Sleep(time.Second)
router0.meshMx.RLock()
meshMap := router0.mesh[string([]byte{0x00, 0x00, 0x80, 0x00})]
meshMap := router0.mesh[string([]byte{0x00, 0x00, 0x81, 0x00})]
router0.meshMx.RUnlock()
if len(meshMap) != 1 {
t.Fatalf("Unexpect peer included in the mesh")
@ -2287,7 +2305,7 @@ func (sq *sybilSquatter) handleStream(s network.Stream) {
r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize)
w := msgio.NewVarintWriter(os)
truth := true
bitmask := []byte{0x00, 0x00, 0x80, 0x00}
bitmask := []byte{0x00, 0x00, 0x81, 0x00}
msg := &pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: truth, Bitmask: bitmask}}}
out, err := proto.Marshal(msg)
if err != nil {
@ -2326,7 +2344,7 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) {
WithPeerScore(
&PeerScoreParams{
Bitmasks: map[string]*BitmaskScoreParams{
string([]byte{0x00, 0x00, 0x80, 0x00}): {
string([]byte{0x00, 0x00, 0x81, 0x00}): {
BitmaskWeight: 1,
TimeInMeshQuantum: time.Second,
FirstMessageDeliveriesWeight: 1,
@ -2352,13 +2370,13 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) {
connect(t, hosts[0], hosts[1])
var bitmasks []*Bitmask
for _, ps := range psubs {
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
bitmasks = append(bitmasks, b...)
_, err = ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
_, err = ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -2368,7 +2386,7 @@ func TestBlossomSubPeerScoreInspect(t *testing.T) {
for i := 0; i < 20; i++ {
msg := []byte(fmt.Sprintf("message %d", i))
bitmasks[i%2].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
bitmasks[i%2].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
time.Sleep(20 * time.Millisecond)
}
@ -2391,7 +2409,7 @@ func TestBlossomSubPeerScoreResetBitmaskParams(t *testing.T) {
WithPeerScore(
&PeerScoreParams{
Bitmasks: map[string]*BitmaskScoreParams{
string([]byte{0x00, 0x00, 0x80, 0x00}): {
string([]byte{0x00, 0x00, 0x81, 0x00}): {
BitmaskWeight: 1,
TimeInMeshQuantum: time.Second,
FirstMessageDeliveriesWeight: 1,
@ -2411,7 +2429,7 @@ func TestBlossomSubPeerScoreResetBitmaskParams(t *testing.T) {
GraylistThreshold: -1000,
}))
bitmask, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
bitmask, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -2462,11 +2480,11 @@ func TestBlossomSubRPCFragmentation(t *testing.T) {
connect(t, hosts[0], hosts[1])
// have the real pubsub join the test bitmask
b, err := ps.Join([]byte{0x00, 0x00, 0x80, 0x00})
b, err := ps.Join([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
_, err = ps.Subscribe([]byte{0x00, 0x00, 0x80, 0x00})
_, err = ps.Subscribe([]byte{0x00, 0x00, 0x81, 0x00})
if err != nil {
t.Fatal(err)
}
@ -2480,7 +2498,7 @@ func TestBlossomSubRPCFragmentation(t *testing.T) {
for i := 0; i < nMessages; i++ {
msg := make([]byte, msgSize)
rand.Read(msg)
b[0].Publish(ctx, []byte{0x00, 0x00, 0x80, 0x00}, msg)
b[0].Publish(ctx, []byte{0x00, 0x00, 0x81, 0x00}, msg)
time.Sleep(20 * time.Millisecond)
}
@ -2535,7 +2553,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
r := msgio.NewVarintReaderSize(s, DefaultMaxMessageSize)
w := msgio.NewVarintWriter(os)
truth := true
bitmask := []byte{0x00, 0x00, 0x80, 0x00}
bitmask := []byte{0x00, 0x00, 0x81, 0x00}
msg := &pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: truth, Bitmask: bitmask}}}
out, err := proto.Marshal(msg)
@ -2610,7 +2628,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
func TestFragmentRPCFunction(t *testing.T) {
p := peer.ID("some-peer")
bitmask := []byte{0x00, 0x00, 0x80, 0x00}
bitmask := []byte{0x00, 0x00, 0x81, 0x00}
rpc := &RPC{RPC: new(pb.RPC), from: p}
limit := 1024

View File

@ -77,6 +77,7 @@ func (p *PubSub) handleNewStream(s network.Stream) {
return
}
if len(msgbytes) == 0 {
r.ReleaseMsg(msgbytes)
continue
}
@ -186,18 +187,18 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
s.Reset()
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
s.Close()
pool.Put(buf)
return
}
_, err = s.Write(buf)
pool.Put(buf)
if err != nil {
s.Reset()
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
s.Close()
return
}
pool.Put(buf)
case <-ctx.Done():
s.Close()
return
@ -237,7 +238,7 @@ func rpcWithControl(msgs []*pb.Message,
func copyRPC(rpc *RPC) *RPC {
res := new(RPC)
copiedRPC := (proto.Clone(rpc.RPC)).(*pb.RPC)
res.RPC = copiedRPC
*res = *rpc
res.RPC = (proto.Clone(rpc.RPC)).(*pb.RPC)
return res
}

View File

@ -56,7 +56,9 @@ type CacheEntry struct {
func (mc *MessageCache) Put(msg *Message) {
mid := mc.msgID(msg)
mc.msgs[string(mid)] = msg
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, bitmask: msg.GetBitmask()})
for _, bitmask := range SliceBitmask(msg.GetBitmask()) {
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, bitmask: bitmask})
}
}
func (mc *MessageCache) Get(mid []byte) (*Message, bool) {
@ -101,5 +103,5 @@ func (mc *MessageCache) Shift() {
for i := len(mc.history) - 2; i >= 0; i-- {
mc.history[i+1] = mc.history[i]
}
mc.history[0] = nil
mc.history[0] = last[:0]
}

View File

@ -154,7 +154,7 @@ func TestBasicSeqnoValidatorReplay(t *testing.T) {
}
for _, sub := range msgs {
assertNeverReceives(t, sub, time.Second)
assertNeverReceives(t, []*Subscription{sub}, time.Second)
}
}

View File

@ -349,13 +349,13 @@ func TestValidateAssortedOptions(t *testing.T) {
bitmasks1[i].Publish(ctx, bitmasks1[i].bitmask, msg)
for _, sub := range subs1 {
assertReceive(t, sub, msg)
assertReceive(t, []*Subscription{sub}, msg)
}
msg = []byte(fmt.Sprintf("message2 %d", i))
bitmasks2[i].Publish(ctx, bitmasks2[i].bitmask, msg)
for _, sub := range subs2 {
assertReceive(t, sub, msg)
assertReceive(t, []*Subscription{sub}, msg)
}
}
}

View File

@ -10,7 +10,7 @@ func GetMinimumVersionCutoff() time.Time {
}
func GetMinimumVersion() []byte {
return []byte{0x02, 0x00, 0x03}
return []byte{0x02, 0x00, 0x04}
}
func GetVersion() []byte {
@ -36,9 +36,9 @@ func FormatVersion(version []byte) string {
}
func GetPatchNumber() byte {
return 0x00
return 0x01
}
func GetRCNumber() byte {
return 0x00
return 0x01
}

View File

@ -17,30 +17,39 @@ import (
func (e *DataClockConsensusEngine) handleFrameMessage(
message *pb.Message,
) error {
go func() {
e.frameMessageProcessorCh <- message
}()
select {
case <-e.ctx.Done():
return e.ctx.Err()
case e.frameMessageProcessorCh <- message:
default:
e.logger.Warn("dropping frame message")
}
return nil
}
func (e *DataClockConsensusEngine) handleTxMessage(
message *pb.Message,
) error {
go func() {
e.txMessageProcessorCh <- message
}()
select {
case <-e.ctx.Done():
return e.ctx.Err()
case e.txMessageProcessorCh <- message:
default:
e.logger.Warn("dropping tx message")
}
return nil
}
func (e *DataClockConsensusEngine) handleInfoMessage(
message *pb.Message,
) error {
go func() {
e.infoMessageProcessorCh <- message
}()
select {
case <-e.ctx.Done():
return e.ctx.Err()
case e.infoMessageProcessorCh <- message:
default:
e.logger.Warn("dropping info message")
}
return nil
}
@ -130,9 +139,13 @@ func (e *DataClockConsensusEngine) insertTxMessage(
Seqno: nil,
}
go func() {
e.txMessageProcessorCh <- m
}()
select {
case <-e.ctx.Done():
return e.ctx.Err()
case e.txMessageProcessorCh <- m:
default:
e.logger.Warn("dropping tx message")
}
return nil
}

View File

@ -7,7 +7,6 @@ import (
"golang.org/x/crypto/sha3"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/consensus"
"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
"source.quilibrium.com/quilibrium/monorepo/node/internal/frametime"
@ -61,7 +60,6 @@ func (e *DataClockConsensusEngine) prove(
if e.lastProven >= previousFrame.FrameNumber && e.lastProven != 0 {
return previousFrame, nil
}
e.stagedTransactionsMx.Lock()
executionOutput := &protobufs.IntrinsicExecutionOutput{}
_, tries, err := e.clockStore.GetDataClockFrame(
e.filter,
@ -78,29 +76,33 @@ func (e *DataClockConsensusEngine) prove(
e.logger,
)
if err != nil {
e.stagedTransactionsMx.Unlock()
return nil, errors.Wrap(err, "prove")
}
if e.stagedTransactions == nil {
e.stagedTransactions = &protobufs.TokenRequests{}
e.stagedTransactionsMx.Lock()
stagedTransactions := e.stagedTransactions
if stagedTransactions == nil {
stagedTransactions = &protobufs.TokenRequests{}
}
e.stagedTransactions = &protobufs.TokenRequests{
Requests: make([]*protobufs.TokenRequest, 0, len(stagedTransactions.Requests)),
}
e.stagedTransactionsSet = make(map[string]struct{}, len(e.stagedTransactionsSet))
e.stagedTransactionsMx.Unlock()
e.logger.Info(
"proving new frame",
zap.Int("transactions", len(e.stagedTransactions.Requests)),
zap.Int("transactions", len(stagedTransactions.Requests)),
)
var validTransactions *protobufs.TokenRequests
var invalidTransactions *protobufs.TokenRequests
app, validTransactions, invalidTransactions, err = app.ApplyTransitions(
previousFrame.FrameNumber+1,
e.stagedTransactions,
stagedTransactions,
true,
)
if err != nil {
e.stagedTransactions = &protobufs.TokenRequests{}
e.stagedTransactionsMx.Unlock()
return nil, errors.Wrap(err, "prove")
}
@ -109,8 +111,6 @@ func (e *DataClockConsensusEngine) prove(
zap.Int("successful", len(validTransactions.Requests)),
zap.Int("failed", len(invalidTransactions.Requests)),
)
e.stagedTransactions = &protobufs.TokenRequests{}
e.stagedTransactionsMx.Unlock()
outputState, err := app.MaterializeStateFromApplication()
if err != nil {
@ -317,7 +317,7 @@ func (e *DataClockConsensusEngine) sync(
syncTimeout = defaultSyncTimeout
}
for e.GetState() < consensus.EngineStateStopping {
for {
ctx, cancel := context.WithTimeout(e.ctx, syncTimeout)
response, err := client.GetDataFrame(
ctx,
@ -363,11 +363,10 @@ func (e *DataClockConsensusEngine) sync(
); err != nil {
return nil, errors.Wrap(err, "sync")
}
e.dataTimeReel.Insert(response.ClockFrame, true)
e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true)
latest = response.ClockFrame
if latest.FrameNumber >= maxFrame {
return latest, nil
}
}
return latest, nil
}

View File

@ -116,6 +116,7 @@ type DataClockConsensusEngine struct {
engineMx sync.Mutex
dependencyMapMx sync.Mutex
stagedTransactions *protobufs.TokenRequests
stagedTransactionsSet map[string]struct{}
stagedTransactionsMx sync.Mutex
peerMapMx sync.RWMutex
peerAnnounceMapMx sync.Mutex
@ -256,9 +257,9 @@ func NewDataClockConsensusEngine(
masterTimeReel: masterTimeReel,
dataTimeReel: dataTimeReel,
peerInfoManager: peerInfoManager,
frameMessageProcessorCh: make(chan *pb.Message),
txMessageProcessorCh: make(chan *pb.Message),
infoMessageProcessorCh: make(chan *pb.Message),
frameMessageProcessorCh: make(chan *pb.Message, 65536),
txMessageProcessorCh: make(chan *pb.Message, 65536),
infoMessageProcessorCh: make(chan *pb.Message, 65536),
config: cfg,
preMidnightMint: map[string]struct{}{},
grpcRateLimiter: NewRateLimiter(
@ -367,16 +368,19 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
panic(err)
}
source := rand.New(rand.NewSource(rand.Int63()))
for e.GetState() < consensus.EngineStateStopping {
for {
// Use exponential backoff with jitter in order to avoid hammering the bootstrappers.
time.Sleep(
backoff.FullJitter(
baseDuration<<currentBackoff,
baseDuration,
baseDuration<<maxBackoff,
source,
),
duration := backoff.FullJitter(
baseDuration<<currentBackoff,
baseDuration,
baseDuration<<maxBackoff,
source,
)
select {
case <-e.ctx.Done():
return
case <-time.After(duration):
}
currentHead, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
@ -405,7 +409,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
if frame.FrameNumber-100 >= nextFrame.FrameNumber ||
nextFrame.FrameNumber == 0 {
time.Sleep(120 * time.Second)
select {
case <-e.ctx.Done():
return
case <-time.After(2 * time.Minute):
}
continue
}
@ -484,7 +492,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
thresholdBeforeConfirming--
}
time.Sleep(120 * time.Second)
select {
case <-e.ctx.Done():
return
case <-time.After(2 * time.Minute):
}
}
}()
@ -493,7 +505,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
go e.runFramePruning()
go func() {
time.Sleep(30 * time.Second)
select {
case <-e.ctx.Done():
return
case <-time.After(30 * time.Second):
}
e.logger.Info("checking for snapshots to play forward")
if err := e.downloadSnapshot(e.config.DB.Path, e.config.P2P.Network); err != nil {
e.logger.Debug("error downloading snapshot", zap.Error(err))
@ -547,6 +563,9 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
})
}
}
if len(actives) < 3 {
return []mt.DataBlock{}
}
output := make([]mt.DataBlock, len(actives))
e.logger.Info(
"creating data shard ring proof",

View File

@ -58,11 +58,13 @@ func (e *DataClockConsensusEngine) runFramePruning() {
return
}
e.logger.Info("frame pruning enabled, waiting for delay timeout expiry")
for {
select {
case <-e.ctx.Done():
return
case <-time.After(1 * time.Hour):
case <-time.After(1 * time.Minute):
head, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
@ -113,14 +115,18 @@ func (e *DataClockConsensusEngine) runSync() {
func (e *DataClockConsensusEngine) runLoop() {
dataFrameCh := e.dataTimeReel.NewFrameCh()
runOnce := true
for e.GetState() < consensus.EngineStateStopping {
for {
peerCount := e.pubSub.GetNetworkPeersCount()
if peerCount < e.minimumPeersRequired {
e.logger.Info(
"waiting for minimum peers",
zap.Int("peer_count", peerCount),
)
time.Sleep(1 * time.Second)
select {
case <-e.ctx.Done():
return
case <-time.After(1 * time.Second):
}
} else {
latestFrame, err := e.dataTimeReel.Head()
if err != nil {
@ -205,7 +211,7 @@ func (e *DataClockConsensusEngine) processFrame(
return dataFrame
}
e.dataTimeReel.Insert(nextFrame, true)
e.dataTimeReel.Insert(e.ctx, nextFrame, true)
return nextFrame
} else {
@ -288,7 +294,7 @@ func (e *DataClockConsensusEngine) processFrame(
outputs := e.PerformTimeProof(latestFrame, latestFrame.Difficulty, ring)
if outputs == nil || len(outputs) < 3 {
e.logger.Error("could not successfully build proof, reattempting")
e.logger.Info("workers not yet available for proving")
return latestFrame
}
modulo := len(outputs)

View File

@ -2,23 +2,27 @@ package data
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"source.quilibrium.com/quilibrium/monorepo/node/config"
"source.quilibrium.com/quilibrium/monorepo/node/consensus"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
func (e *DataClockConsensusEngine) runFrameMessageHandler() {
for e.GetState() < consensus.EngineStateStopping {
for {
select {
case <-e.ctx.Done():
return
case message := <-e.frameMessageProcessorCh:
e.logger.Debug("handling frame message")
msg := &protobufs.Message{}
@ -47,26 +51,26 @@ func (e *DataClockConsensusEngine) runFrameMessageHandler() {
continue
}
go func() {
switch any.TypeUrl {
case protobufs.ClockFrameType:
if err := e.handleClockFrameData(
message.From,
msg.Address,
any,
false,
); err != nil {
return
}
switch any.TypeUrl {
case protobufs.ClockFrameType:
if err := e.handleClockFrameData(
message.From,
msg.Address,
any,
false,
); err != nil {
e.logger.Debug("could not handle clock frame data", zap.Error(err))
}
}()
}
}
}
}
func (e *DataClockConsensusEngine) runTxMessageHandler() {
for e.GetState() < consensus.EngineStateStopping {
for {
select {
case <-e.ctx.Done():
return
case message := <-e.txMessageProcessorCh:
e.logger.Debug("handling tx message")
msg := &protobufs.Message{}
@ -95,9 +99,12 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
}
if e.frameProverTries[0].Contains(e.provingKeyAddress) {
wg := &sync.WaitGroup{}
for name := range e.executionEngines {
name := name
wg.Add(1)
go func() error {
defer wg.Done()
messages, err := e.executionEngines[name].ProcessMessage(
application.TOKEN_ADDRESS,
msg,
@ -123,18 +130,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
continue
}
e.logger.Debug(appMsg.TypeUrl)
switch appMsg.TypeUrl {
case protobufs.TokenRequestType:
t := &protobufs.TokenRequest{}
err := proto.Unmarshal(appMsg.Value, t)
if err != nil {
e.logger.Debug("could not unmarshal token request", zap.Error(err))
continue
}
if err := e.handleTokenRequest(t); err != nil {
continue
e.logger.Debug("could not handle token request", zap.Error(err))
}
}
}
@ -142,14 +148,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
return nil
}()
}
wg.Wait()
}
}
}
}
func (e *DataClockConsensusEngine) runInfoMessageHandler() {
for e.GetState() < consensus.EngineStateStopping {
for {
select {
case <-e.ctx.Done():
return
case message := <-e.infoMessageProcessorCh:
e.logger.Debug("handling info message")
msg := &protobufs.Message{}
@ -178,18 +187,16 @@ func (e *DataClockConsensusEngine) runInfoMessageHandler() {
continue
}
go func() {
switch any.TypeUrl {
case protobufs.DataPeerListAnnounceType:
if err := e.handleDataPeerListAnnounce(
message.From,
msg.Address,
any,
); err != nil {
return
}
switch any.TypeUrl {
case protobufs.DataPeerListAnnounceType:
if err := e.handleDataPeerListAnnounce(
message.From,
msg.Address,
any,
); err != nil {
e.logger.Debug("could not handle data peer list announce", zap.Error(err))
}
}()
}
}
}
}
@ -247,7 +254,7 @@ func (e *DataClockConsensusEngine) handleClockFrame(
}
if frame.FrameNumber > head.FrameNumber {
e.dataTimeReel.Insert(frame, false)
e.dataTimeReel.Insert(e.ctx, frame, false)
}
return nil
@ -349,116 +356,60 @@ func (e *DataClockConsensusEngine) handleDataPeerListAnnounce(
return nil
}
func TokenRequestIdentifiers(transition *protobufs.TokenRequest) []string {
switch t := transition.Request.(type) {
case *protobufs.TokenRequest_Transfer:
return []string{fmt.Sprintf("transfer-%x", t.Transfer.OfCoin.Address)}
case *protobufs.TokenRequest_Split:
return []string{fmt.Sprintf("split-%x", t.Split.OfCoin.Address)}
case *protobufs.TokenRequest_Merge:
identifiers := make([]string, len(t.Merge.Coins))
for i, coin := range t.Merge.Coins {
identifiers[i] = fmt.Sprintf("merge-%x", coin.Address)
}
return identifiers
case *protobufs.TokenRequest_Mint:
if len(t.Mint.Proofs) == 1 {
return []string{fmt.Sprintf("mint-%x", sha3.Sum512(t.Mint.Proofs[0]))}
}
// Large proofs are currently not deduplicated.
return nil
case *protobufs.TokenRequest_Announce:
identifiers := make([]string, len(t.Announce.GetPublicKeySignaturesEd448()))
for i, sig := range t.Announce.GetPublicKeySignaturesEd448() {
identifiers[i] = fmt.Sprintf("announce-%x", sig.PublicKey.KeyValue)
}
return identifiers
case *protobufs.TokenRequest_Join:
return []string{fmt.Sprintf("join-%x", t.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
case *protobufs.TokenRequest_Leave:
return []string{fmt.Sprintf("leave-%x", t.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
case *protobufs.TokenRequest_Pause:
return []string{fmt.Sprintf("pause-%x", t.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
case *protobufs.TokenRequest_Resume:
return []string{fmt.Sprintf("resume-%x", t.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue)}
default:
panic("unhandled transition type")
}
}
func (e *DataClockConsensusEngine) handleTokenRequest(
transition *protobufs.TokenRequest,
) error {
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
identifiers := TokenRequestIdentifiers(transition)
e.stagedTransactionsMx.Lock()
if e.stagedTransactions == nil {
e.stagedTransactions = &protobufs.TokenRequests{}
e.stagedTransactionsSet = make(map[string]struct{})
}
found := false
for _, ti := range e.stagedTransactions.Requests {
switch t := ti.Request.(type) {
case *protobufs.TokenRequest_Transfer:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Transfer:
if bytes.Equal(r.Transfer.OfCoin.Address, t.Transfer.OfCoin.Address) {
found = true
}
}
case *protobufs.TokenRequest_Split:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Split:
if bytes.Equal(r.Split.OfCoin.Address, r.Split.OfCoin.Address) {
found = true
}
}
case *protobufs.TokenRequest_Merge:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Merge:
checkmerge:
for i := range t.Merge.Coins {
for j := range r.Merge.Coins {
if bytes.Equal(t.Merge.Coins[i].Address, r.Merge.Coins[j].Address) {
found = true
break checkmerge
}
}
}
}
case *protobufs.TokenRequest_Mint:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Mint:
checkmint:
for i := range t.Mint.Proofs {
if len(r.Mint.Proofs) < 2 {
for j := range r.Mint.Proofs {
if bytes.Equal(t.Mint.Proofs[i], r.Mint.Proofs[j]) {
found = true
break checkmint
}
}
}
}
}
case *protobufs.TokenRequest_Announce:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Announce:
checkannounce:
for i := range t.Announce.GetPublicKeySignaturesEd448() {
for j := range r.Announce.GetPublicKeySignaturesEd448() {
if bytes.Equal(
t.Announce.GetPublicKeySignaturesEd448()[i].PublicKey.KeyValue,
r.Announce.GetPublicKeySignaturesEd448()[j].PublicKey.KeyValue,
) {
found = true
break checkannounce
}
}
}
}
case *protobufs.TokenRequest_Join:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Join:
if bytes.Equal(
t.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Join.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
case *protobufs.TokenRequest_Leave:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Leave:
if bytes.Equal(
t.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Leave.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
case *protobufs.TokenRequest_Pause:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Pause:
if bytes.Equal(
t.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Pause.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
case *protobufs.TokenRequest_Resume:
switch r := transition.Request.(type) {
case *protobufs.TokenRequest_Resume:
if bytes.Equal(
t.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue,
r.Resume.GetPublicKeySignatureEd448().PublicKey.KeyValue,
) {
found = true
}
}
var found bool
for _, identifier := range identifiers {
if _, ok := e.stagedTransactionsSet[identifier]; ok {
found = true
break
}
}
@ -467,6 +418,9 @@ func (e *DataClockConsensusEngine) handleTokenRequest(
e.stagedTransactions.Requests,
transition,
)
for _, identifier := range identifiers {
e.stagedTransactionsSet[identifier] = struct{}{}
}
}
e.stagedTransactionsMx.Unlock()
}

View File

@ -2,6 +2,7 @@ package master
import (
"bytes"
"context"
"encoding/binary"
"strings"
"time"
@ -154,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof(
zap.Uint64("frame_number", frame.FrameNumber),
)
e.masterTimeReel.Insert(frame, false)
e.masterTimeReel.Insert(context.TODO(), frame, false)
}
e.state = consensus.EngineStateCollecting

View File

@ -2,6 +2,7 @@ package master
import (
"bytes"
"context"
gcrypto "crypto"
"encoding/hex"
"math/big"
@ -207,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
continue
}
e.masterTimeReel.Insert(newFrame, false)
e.masterTimeReel.Insert(context.TODO(), newFrame, false)
}
}
}()

View File

@ -2,6 +2,7 @@ package time
import (
"bytes"
"context"
"encoding/hex"
"math/big"
"os"
@ -32,7 +33,9 @@ type pendingFrame struct {
type DataTimeReel struct {
rwMutex sync.RWMutex
running bool
ctx context.Context
cancel context.CancelFunc
filter []byte
engineConfig *config.EngineConfig
@ -61,7 +64,6 @@ type DataTimeReel struct {
frames chan *pendingFrame
newFrameCh chan *protobufs.ClockFrame
badFrameCh chan *protobufs.ClockFrame
done chan bool
alwaysSend bool
restore func() []*tries.RollingFrecencyCritbitTrie
}
@ -115,8 +117,10 @@ func NewDataTimeReel(
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
return &DataTimeReel{
running: false,
ctx: ctx,
cancel: cancel,
logger: logger,
filter: filter,
engineConfig: engineConfig,
@ -129,10 +133,9 @@ func NewDataTimeReel(
lruFrames: cache,
// pending: make(map[uint64][]*pendingFrame),
incompleteForks: make(map[uint64][]*pendingFrame),
frames: make(chan *pendingFrame),
frames: make(chan *pendingFrame, 65536),
newFrameCh: make(chan *protobufs.ClockFrame),
badFrameCh: make(chan *protobufs.ClockFrame),
done: make(chan bool),
alwaysSend: alwaysSend,
restore: restore,
}
@ -172,17 +175,12 @@ func (d *DataTimeReel) Start() error {
d.headDistance, err = d.GetDistance(frame)
}
d.running = true
go d.runLoop()
return nil
}
func (d *DataTimeReel) SetHead(frame *protobufs.ClockFrame) {
if d.running == true {
panic("internal test function should never be called outside of tests")
}
d.head = frame
}
@ -193,9 +191,9 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
// Insert enqueues a structurally valid frame into the time reel. If the frame
// is the next one in sequence, it advances the reel head forward and emits a
// new frame on the new frame channel.
func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error {
if !d.running {
return nil
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error {
if err := d.ctx.Err(); err != nil {
return err
}
d.logger.Debug(
@ -222,13 +220,17 @@ func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error {
d.storePending(selector, parent, distance, frame)
if d.head.FrameNumber+1 == frame.FrameNumber {
go func() {
d.frames <- &pendingFrame{
selector: selector,
parentSelector: parent,
frameNumber: frame.FrameNumber,
}
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-d.ctx.Done():
return d.ctx.Err()
case d.frames <- &pendingFrame{
selector: selector,
parentSelector: parent,
frameNumber: frame.FrameNumber,
}:
}
}
}
@ -250,7 +252,7 @@ func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame {
}
func (d *DataTimeReel) Stop() {
d.done <- true
d.cancel()
}
func (d *DataTimeReel) createGenesisFrame() (
@ -336,8 +338,10 @@ func (d *DataTimeReel) createGenesisFrame() (
// Main data consensus loop
func (d *DataTimeReel) runLoop() {
for d.running {
for {
select {
case <-d.ctx.Done():
return
case frame := <-d.frames:
rawFrame, err := d.clockStore.GetStagedDataClockFrame(
d.filter,
@ -459,9 +463,6 @@ func (d *DataTimeReel) runLoop() {
// }
// }
}
case <-d.done:
d.running = false
return
}
}
}
@ -563,8 +564,7 @@ func (d *DataTimeReel) processPending(
for {
select {
case <-d.done:
d.running = false
case <-d.ctx.Done():
return
default:
}
@ -686,14 +686,19 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e
d.headDistance = distance
if d.alwaysSend {
d.newFrameCh <- frame
}
go func() {
select {
case <-d.ctx.Done():
return d.ctx.Err()
case d.newFrameCh <- frame:
}
} else {
select {
case <-d.ctx.Done():
return d.ctx.Err()
case d.newFrameCh <- frame:
default:
}
}()
}
return nil
}
@ -992,12 +997,11 @@ func (d *DataTimeReel) forkChoice(
d.totalDistance,
)
go func() {
select {
case d.newFrameCh <- frame:
default:
}
}()
select {
case <-d.ctx.Done():
case d.newFrameCh <- frame:
default:
}
}
func (d *DataTimeReel) GetTotalDistance() *big.Int {

View File

@ -2,6 +2,7 @@ package time_test
import (
"bytes"
"context"
"fmt"
"math/rand"
"strings"
@ -108,6 +109,7 @@ func generateTestProvers() (
}
func TestDataTimeReel(t *testing.T) {
ctx := context.Background()
logger, _ := zap.NewDevelopment()
db := store.NewInMemKVDB()
clockStore := store.NewPebbleClockStore(db, logger)
@ -231,7 +233,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(frame, false)
d.Insert(ctx, frame, false)
prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
}
@ -262,7 +264,7 @@ func TestDataTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := d.Insert(insertFrames[i], false)
err := d.Insert(ctx, insertFrames[i], false)
assert.NoError(t, err)
}
@ -284,7 +286,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(frame, false)
d.Insert(ctx, frame, false)
prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
@ -332,7 +334,7 @@ func TestDataTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := d.Insert(insertFrames[i], false)
err := d.Insert(ctx, insertFrames[i], false)
assert.NoError(t, err)
}
@ -395,7 +397,7 @@ func TestDataTimeReel(t *testing.T) {
// Someone is honest, but running backwards:
for i := 99; i >= 0; i-- {
err := d.Insert(insertFrames[i], false)
err := d.Insert(ctx, insertFrames[i], false)
gotime.Sleep(1 * gotime.Second)
assert.NoError(t, err)
}

View File

@ -1,6 +1,7 @@
package time
import (
"context"
"encoding/hex"
"errors"
"math/big"
@ -120,6 +121,7 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) {
// is the next one in sequence, it advances the reel head forward and emits a
// new frame on the new frame channel.
func (m *MasterTimeReel) Insert(
ctx context.Context,
frame *protobufs.ClockFrame,
isSync bool,
) error {

View File

@ -1,6 +1,7 @@
package time_test
import (
"context"
"strings"
"sync"
"testing"
@ -15,6 +16,7 @@ import (
)
func TestMasterTimeReel(t *testing.T) {
ctx := context.Background()
logger, _ := zap.NewProduction()
db := store.NewInMemKVDB()
clockStore := store.NewPebbleClockStore(db, logger)
@ -59,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) {
)
assert.NoError(t, err)
err := m.Insert(frame, false)
err := m.Insert(ctx, frame, false)
assert.NoError(t, err)
}
@ -79,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) {
}
for i := 99; i >= 0; i-- {
err := m.Insert(insertFrames[i], false)
err := m.Insert(ctx, insertFrames[i], false)
assert.NoError(t, err)
}

View File

@ -1,13 +1,15 @@
package time
import (
"context"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
type TimeReel interface {
Start() error
Stop()
Insert(frame *protobufs.ClockFrame, isSync bool) error
Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error
Head() (*protobufs.ClockFrame, error)
NewFrameCh() <-chan *protobufs.ClockFrame
BadFrameCh() <-chan *protobufs.ClockFrame

View File

@ -2,6 +2,7 @@ package token
import (
"bytes"
"context"
"crypto"
"encoding/binary"
"encoding/hex"
@ -80,6 +81,8 @@ func (p PeerSeniorityItem) Priority() uint64 {
}
type TokenExecutionEngine struct {
ctx context.Context
cancel context.CancelFunc
logger *zap.Logger
clock *data.DataClockConsensusEngine
clockStore store.ClockStore
@ -205,7 +208,10 @@ func NewTokenExecutionEngine(
LoadAggregatedSeniorityMap(uint(cfg.P2P.Network))
}
ctx, cancel := context.WithCancel(context.Background())
e := &TokenExecutionEngine{
ctx: ctx,
cancel: cancel,
logger: logger,
engineConfig: cfg.Engine,
keyManager: keyManager,
@ -364,14 +370,19 @@ func NewTokenExecutionEngine(
}
// need to wait for peering
waitPeers:
for {
gotime.Sleep(30 * gotime.Second)
peerMap := e.pubSub.GetBitmaskPeers()
if peers, ok := peerMap[string(
append([]byte{0x00}, e.intrinsicFilter...),
)]; ok {
if len(peers) >= 3 {
break
select {
case <-e.ctx.Done():
return
case <-gotime.After(30 * gotime.Second):
peerMap := e.pubSub.GetBitmaskPeers()
if peers, ok := peerMap[string(
append([]byte{0x00}, e.intrinsicFilter...),
)]; ok {
if len(peers) >= 3 {
break waitPeers
}
}
}
}
@ -441,6 +452,8 @@ func (e *TokenExecutionEngine) Start() <-chan error {
// Stop implements ExecutionEngine
func (e *TokenExecutionEngine) Stop(force bool) <-chan error {
e.cancel()
errChan := make(chan error)
go func() {

View File

@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/prometheus/client_golang/prometheus"
blossomsub "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
)
const blossomSubNamespace = "blossomsub"
@ -29,6 +30,19 @@ type blossomSubRawTracer struct {
sendRPCTotal prometheus.Counter
dropRPCTotal prometheus.Counter
undeliverableMessageTotal *prometheus.CounterVec
iHaveMessageHistogram *prometheus.HistogramVec
iWantMessageHistogram *prometheus.HistogramVec
}
func (b *blossomSubRawTracer) observeControl(control *pb.ControlMessage, direction string) {
labels := []string{direction}
for _, iHave := range control.GetIhave() {
labels := append(labels, binaryEncoding.EncodeToString(iHave.GetBitmask()))
b.iHaveMessageHistogram.WithLabelValues(labels...).Observe(float64(len(iHave.GetMessageIDs())))
}
for _, iWant := range control.GetIwant() {
b.iWantMessageHistogram.WithLabelValues(labels...).Observe(float64(len(iWant.GetMessageIDs())))
}
}
var _ blossomsub.RawTracer = (*blossomSubRawTracer)(nil)
@ -91,16 +105,19 @@ func (b *blossomSubRawTracer) ThrottlePeer(p peer.ID) {
// RecvRPC implements blossomsub.RawTracer.
func (b *blossomSubRawTracer) RecvRPC(rpc *blossomsub.RPC) {
b.recvRPCTotal.Inc()
b.observeControl(rpc.GetControl(), "recv")
}
// SendRPC implements blossomsub.RawTracer.
func (b *blossomSubRawTracer) SendRPC(rpc *blossomsub.RPC, p peer.ID) {
b.sendRPCTotal.Inc()
b.observeControl(rpc.GetControl(), "send")
}
// DropRPC implements blossomsub.RawTracer.
func (b *blossomSubRawTracer) DropRPC(rpc *blossomsub.RPC, p peer.ID) {
b.dropRPCTotal.Inc()
b.observeControl(rpc.GetControl(), "drop")
}
// UndeliverableMessage implements blossomsub.RawTracer.
@ -127,6 +144,8 @@ func (b *blossomSubRawTracer) Describe(ch chan<- *prometheus.Desc) {
b.sendRPCTotal.Describe(ch)
b.dropRPCTotal.Describe(ch)
b.undeliverableMessageTotal.Describe(ch)
b.iHaveMessageHistogram.Describe(ch)
b.iWantMessageHistogram.Describe(ch)
}
// Collect implements prometheus.Collector.
@ -146,6 +165,8 @@ func (b *blossomSubRawTracer) Collect(ch chan<- prometheus.Metric) {
b.sendRPCTotal.Collect(ch)
b.dropRPCTotal.Collect(ch)
b.undeliverableMessageTotal.Collect(ch)
b.iHaveMessageHistogram.Collect(ch)
b.iWantMessageHistogram.Collect(ch)
}
type BlossomSubRawTracer interface {
@ -270,6 +291,24 @@ func NewBlossomSubRawTracer() BlossomSubRawTracer {
},
[]string{"bitmask"},
),
iHaveMessageHistogram: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: blossomSubNamespace,
Name: "ihave_messages",
Help: "Histogram of the number of messages in an IHave message.",
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
},
[]string{"direction", "bitmask"},
),
iWantMessageHistogram: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: blossomSubNamespace,
Name: "iwant_messages",
Help: "Histogram of the number of messages in an IWant message.",
Buckets: prometheus.ExponentialBuckets(1, 2, 14),
},
[]string{"direction"},
),
}
return b
}

View File

@ -1058,7 +1058,7 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange(
for i := fromFrameNumber; i < toFrameNumber; i++ {
frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i)
if err != nil {
return errors.Wrap(err, "delete data clock frame range")
continue
}
for _, frame := range frames {