feat: Pubsub.SeenMessagesStrategy (#9543)

* feat: expire messages from the cache based on last seen time
* docs: Pubsub.SeenMessagesStrategy

Ref. https://github.com/libp2p/go-libp2p-pubsub/pull/513

Co-authored-by: Marcin Rataj <lidel@lidel.org>
This commit is contained in:
Mohsin Zaidi 2023-01-26 18:24:35 -05:00 committed by galargh
parent abd6343e41
commit 2c623b663e
9 changed files with 191 additions and 53 deletions

View File

@ -1,5 +1,24 @@
package config
const (
// LastSeenMessagesStrategy is a strategy that calculates the TTL countdown
// based on the last time a Pubsub message is seen. This means that if a message
// is received and then seen again within the specified TTL window, it
// won't be emitted until the TTL countdown expires from the last time the
// message was seen.
LastSeenMessagesStrategy = "last-seen"
// FirstSeenMessagesStrategy is a strategy that calculates the TTL
// countdown based on the first time a Pubsub message is seen. This means that if
// a message is received and then seen again within the specified TTL
// window, it won't be emitted.
FirstSeenMessagesStrategy = "first-seen"
// DefaultSeenMessagesStrategy is the strategy that is used by default if
// no Pubsub.SeenMessagesStrategy is specified.
DefaultSeenMessagesStrategy = LastSeenMessagesStrategy
)
type PubsubConfig struct {
// Router can be either floodsub (legacy) or gossipsub (new and
// backwards compatible).
@ -12,7 +31,11 @@ type PubsubConfig struct {
// Enable pubsub (--enable-pubsub-experiment)
Enabled Flag `json:",omitempty"`
// SeenMessagesTTL configures the duration after which a previously seen
// message ID can be forgotten about.
// SeenMessagesTTL is a value that controls the time window within which
// duplicate messages will be identified and won't be emitted.
SeenMessagesTTL *OptionalDuration `json:",omitempty"`
// SeenMessagesStrategy is a setting that determines how the time-to-live
// (TTL) countdown for deduplicating messages is calculated.
SeenMessagesStrategy *OptionalString `json:",omitempty"`
}

View File

@ -11,6 +11,7 @@ import (
"github.com/ipfs/go-log"
"github.com/ipfs/kubo/config"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ipfs/kubo/core/node/libp2p"
@ -66,6 +67,18 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
pubsub.WithSeenMessagesTTL(cfg.Pubsub.SeenMessagesTTL.WithDefault(pubsub.TimeCacheDuration)),
)
var seenMessagesStrategy timecache.Strategy
configSeenMessagesStrategy := cfg.Pubsub.SeenMessagesStrategy.WithDefault(config.DefaultSeenMessagesStrategy)
switch configSeenMessagesStrategy {
case config.LastSeenMessagesStrategy:
seenMessagesStrategy = timecache.Strategy_LastSeen
case config.FirstSeenMessagesStrategy:
seenMessagesStrategy = timecache.Strategy_FirstSeen
default:
return fx.Error(fmt.Errorf("unsupported Pubsub.SeenMessagesStrategy %q", configSeenMessagesStrategy))
}
pubsubOptions = append(pubsubOptions, pubsub.WithSeenMessagesStrategy(seenMessagesStrategy))
switch cfg.Pubsub.Router {
case "":
fallthrough

View File

@ -1,5 +1,43 @@
# Kubo changelog v0.18
## v0.18.1
This release includes improvements around Pubsub message deduplication, and more.
<!-- TOC depthfrom:3 -->
- [Overview](#overview)
- [🔦 Highlights](#-highlights)
- [New default Pubsub.SeenMessagesStrategy](#new-default-pubsubseenmessagesstrategy)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)
<!-- /TOC -->
### 🔦 Highlights
#### New default `Pubsub.SeenMessagesStrategy`
A new optional [`Pubsub.SeenMessagesStrategy`](../config.md#pubsubseenmessagesstrategy) configuration option has been added.
This option allows you to choose between two different strategies for
deduplicating messages: `first-seen` and `last-seen`.
When unset, the default strategy is `last-seen`, which calculates the
time-to-live (TTL) countdown based on the last time a message is seen. This
means that if a message is received and then seen again within the specified
TTL window based on the last time it was seen, it won't be emitted.
If you prefer the old behavior, which calculates the TTL countdown based on the
first time a message is seen, you can set `Pubsub.SeenMessagesStrategy` to
`first-seen`.
### 📝 Changelog
### 👨‍👩‍👧‍👦 Contributors
## v0.18.0
### Overview

View File

@ -100,6 +100,7 @@ config file at runtime.
- [`Pubsub.Router`](#pubsubrouter)
- [`Pubsub.DisableSigning`](#pubsubdisablesigning)
- [`Pubsub.SeenMessagesTTL`](#pubsubseenmessagesttl)
- [`Pubsub.SeenMessagesStrategy`](#pubsubseenmessagesstrategy)
- [`Peering`](#peering)
- [`Peering.Peers`](#peeringpeers)
- [`Reprovider`](#reprovider)
@ -1206,8 +1207,8 @@ Type: `bool`
### `Pubsub.SeenMessagesTTL`
Configures the duration after which a previously seen Pubsub Message ID can be
forgotten about.
Controls the time window within which duplicate messages, identified by Message
ID, will be identified and won't be emitted again.
A smaller value for this parameter means that Pubsub messages in the cache will
be garbage collected sooner, which can result in a smaller cache. At the same
@ -1223,6 +1224,29 @@ Default: see `TimeCacheDuration` from [go-libp2p-pubsub](https://github.com/libp
Type: `optionalDuration`
### `Pubsub.SeenMessagesStrategy`
Determines how the time-to-live (TTL) countdown for deduplicating Pubsub
messages is calculated.
The Pubsub seen messages cache is a LRU cache that keeps messages for up to a
specified time duration. After this duration has elapsed, expired messages will
be purged from the cache.
The `last-seen` cache is a sliding-window cache. Every time a message is seen
again with the SeenMessagesTTL duration, its timestamp slides forward. This
keeps frequently occurring messages cached and prevents them from being
continually propagated, especially because of issues that might increase the
number of duplicate messages in the network.
The `first-seen` cache will store new messages and purge them after the
SeenMessagesTTL duration, even if they are seen multiple times within this
duration.
Default: `last-seen` (see [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub))
Type: `optionalString`
## `Peering`
Configures the peering subsystem. The peering subsystem configures Kubo to

View File

@ -38,6 +38,7 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
@ -122,7 +123,7 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.20.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.8.2 // indirect
github.com/libp2p/go-libp2p-pubsub v0.8.3 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.6.0 // indirect
@ -180,7 +181,6 @@ require (
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.7.0 // indirect

View File

@ -198,6 +198,8 @@ github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaB
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@ -773,8 +775,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I=
github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw=
github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww=
github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
@ -1284,8 +1286,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=

4
go.mod
View File

@ -75,7 +75,7 @@ require (
github.com/libp2p/go-libp2p-http v0.4.0
github.com/libp2p/go-libp2p-kad-dht v0.20.0
github.com/libp2p/go-libp2p-kbucket v0.5.0
github.com/libp2p/go-libp2p-pubsub v0.8.2
github.com/libp2p/go-libp2p-pubsub v0.8.3
github.com/libp2p/go-libp2p-pubsub-router v0.6.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-libp2p-routing-helpers v0.6.0
@ -133,6 +133,7 @@ require (
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
@ -223,7 +224,6 @@ require (
github.com/whyrusleeping/cbor-gen v0.0.0-20221220214510-0333c149dec0 // indirect
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect

8
go.sum
View File

@ -206,6 +206,8 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302 h1:QV0ZrfBLpFc2KDk+a4LJefDczXnonRwrYrQJY/9L4dA=
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302/go.mod h1:qBlWZqWeVx9BjvqBsnC/8RUlAYpIFmPvgROcw0n1scE=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@ -807,8 +809,8 @@ github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRj
github.com/libp2p/go-libp2p-peerstore v0.2.6/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-peerstore v0.2.7/go.mod h1:ss/TWTgHZTMpsU/oKVVPQCGuDHItOpf2W8RxAi50P2s=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.8.2 h1:QLGUmkgKmwEVxVDYGsqc5t9CykOMY2Y21cXQHjR462I=
github.com/libp2p/go-libp2p-pubsub v0.8.2/go.mod h1:e4kT+DYjzPUYGZeWk4I+oxCSYTXizzXii5LDRRhjKSw=
github.com/libp2p/go-libp2p-pubsub v0.8.3 h1:T4+pcfcFm1K2v5oFyk68peSjVroaoM8zFygf6Y5WOww=
github.com/libp2p/go-libp2p-pubsub v0.8.3/go.mod h1:eje970FXxjhtFbVEoiae+VUw24ZoSlk67BsiZPLRzlw=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 h1:D30iKdlqDt5ZmLEYhHELCMRj8b4sFAqrUcshIUvVP/s=
github.com/libp2p/go-libp2p-pubsub-router v0.6.0/go.mod h1:FY/q0/RBTKsLA7l4vqC2cbRbOvyDotg8PJQ7j8FDudE=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
@ -1343,8 +1345,6 @@ github.com/whyrusleeping/mdns v0.0.0-20180901202407-ef14215e6b30/go.mod h1:j4l84
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1:E9S12nwJwEOXe2d6gT6qxdvqMnNq+VnSsKPgm2ZZNds=
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=

View File

@ -22,6 +22,7 @@ import (
"github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/ipfs/kubo/core/mock"
@ -76,7 +77,6 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
var bootstrapNode, consumerNode, producerNode *core.IpfsNode
var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID
sendDupMsg := false
mn := mocknet.New()
bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration
@ -98,6 +98,12 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
t.Fatal(err)
}
// Used for logging the timeline
startTime := time.Time{}
// Used for overriding the message ID
sendMsgID := ""
// Set up the pubsub message ID generation override for the producer
core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) {
var pubsubOptions []pubsub.Option
@ -105,19 +111,23 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
pubsubOptions,
pubsub.WithSeenMessagesTTL(ttl),
pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string {
now := time.Now().Format(time.StampMilli)
now := time.Now()
if startTime.Second() == 0 {
startTime = now
}
timeElapsed := now.Sub(startTime).Seconds()
msg := string(pmsg.Data)
var msgID string
from, _ := peer.IDFromBytes(pmsg.From)
if (from == producerPeerID) && sendDupMsg {
msgID = "DupMsg"
t.Logf("sending [%s] with duplicate message ID at [%s]", msg, now)
var msgID string
if from == producerPeerID {
msgID = sendMsgID
t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgID, timeElapsed)
} else {
msgID = pubsub.DefaultMsgIdFn(pmsg)
t.Logf("sending [%s] with unique message ID at [%s]", msg, now)
}
return msgID
}),
pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen),
)
return append(
info.FXOptions,
@ -165,8 +175,8 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
t.Fatal(err)
}
// Utility functions defined inline to include context in closure
now := func() string {
return time.Now().Format(time.StampMilli)
now := func() float64 {
return time.Since(startTime).Seconds()
}
ctr := 0
msgGen := func() string {
@ -188,57 +198,87 @@ func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error
msg, err := consumerSubscription.Next(rxCtx)
if shouldFind {
if err != nil {
t.Logf("did not receive [%s] by [%s]", msgTxt, now())
t.Logf("expected but did not receive [%s] at T%fs", msgTxt, now())
t.Fatal(err)
}
t.Logf("received [%s] at [%s]", string(msg.Data()), now())
t.Logf("received [%s] at T%fs", string(msg.Data()), now())
if !bytes.Equal(msg.Data(), []byte(msgTxt)) {
t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt)
}
} else {
if err == nil {
t.Logf("received [%s] at [%s]", string(msg.Data()), now())
t.Logf("not expected but received [%s] at T%fs", string(msg.Data()), now())
t.Fail()
}
t.Logf("did not receive [%s] by [%s]", msgTxt, now())
t.Logf("did not receive [%s] at T%fs", msgTxt, now())
}
}
// Send message 1 with the message ID we're going to duplicate later
sendDupMsg = true
const MsgID1 = "MsgID1"
const MsgID2 = "MsgID2"
const MsgID3 = "MsgID3"
// Send message 1 with the message ID we're going to duplicate
sentMsg1 := time.Now()
sendMsgID = MsgID1
msgTxt := produceMessage()
consumeMessage(msgTxt, true) // should find message
// Should find the message because it's new
consumeMessage(msgTxt, true)
// Send message 2 with the same message ID as before
sendDupMsg = true
// Send message 2 with a duplicate message ID
sendMsgID = MsgID1
msgTxt = produceMessage()
consumeMessage(msgTxt, false) // should NOT find message, because it got deduplicated (sent twice within the SeenMessagesTTL window)
// Wait for seen cache TTL time to let seen cache entries time out
time.Sleep(ttl)
// Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window).
consumeMessage(msgTxt, false)
// Send message 3 with a new message ID
//
// This extra step is necessary for testing the cache TTL because the PubSub code only garbage collects when a
// message ID was not already present in the cache. This means that message 2's cache entry, even though it has
// technically timed out, will still cause the message to be considered duplicate. When a message with a different
// ID passes through, it will be added to the cache and garbage collection will clean up message 2's entry. This is
// another bug in the pubsub/cache implementation that will be fixed once the code is refactored for this issue:
// https://github.com/libp2p/go-libp2p-pubsub/issues/502
sendDupMsg = false
sendMsgID = MsgID2
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message
// Should find the message because it's new
consumeMessage(msgTxt, true)
// Send message 4 with the same message ID as before
sendDupMsg = true
// Wait till just before the SeenMessagesTTL window has passed since message 1 was sent
time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond)))
// Send message 4 with a duplicate message ID
sendMsgID = MsgID1
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message again (time since the last read > SeenMessagesTTL, so it looks like a new message).
// Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This
// time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since
// the default time cache now implements a sliding window algorithm.
consumeMessage(msgTxt, false)
// Send message 5 with a new message ID
// Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding
// a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window
// starting at message 1 has expired.
sentMsg5 := time.Now()
sendMsgID = MsgID1
msgTxt = produceMessage()
// Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window
// started). This time again, the expiration should get pushed out for another SeenMessagesTTL window.
consumeMessage(msgTxt, false)
// Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window
sendMsgID = MsgID2
msgTxt = produceMessage()
// Should find the message since last read > SeenMessagesTTL, so it looks like a new message.
consumeMessage(msgTxt, true)
// Sleep for a full SeenMessagesTTL window to let cache entries time out
time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond)))
// Send message 7 with a duplicate message ID
sendMsgID = MsgID1
msgTxt = produceMessage()
// Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message.
consumeMessage(msgTxt, true)
// Send message 8 with a brand new message ID
//
// This step is not strictly necessary, but has been added for good measure.
sendDupMsg = false
sendMsgID = MsgID3
msgTxt = produceMessage()
consumeMessage(msgTxt, true) // should find message
// Should find the message because it's new
consumeMessage(msgTxt, true)
return nil
}