feat(pubsub): persistent validation and diagnostic commands (#11110)

* feat(pubsub): persistent seqno validation and diagnostic commands

- upgrade go-libp2p-pubsub to v0.15.0
- add persistent seqno validator using BasicSeqnoValidator
  stores the max seen seqno per peer at /pubsub/seqno/<peerid>
  survives daemon restarts; addresses message cycling in large networks (#9665)
  (the stored value format is illustrated in the sketch after this list)
- add `ipfs pubsub reset` command to clear validator state
- add `ipfs diag datastore get/count` commands for datastore inspection
  requires daemon to be stopped, useful for debugging
- change pubsub status from Deprecated to Experimental
- add CLI tests for pubsub and diag datastore commands
- remove flaky pubsub_msg_seen_cache_test.go (replaced by CLI tests)
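
A hypothetical decoding helper (not part of this change) to illustrate the on-disk format: per the seqno store tests added in this diff, the value kept under `/pubsub/seqno/<peerid>` is the maximum seen sequence number encoded as an 8-byte big-endian uint64.

```go
// Hypothetical helper, shown only to illustrate the stored format; the
// 8-byte big-endian uint64 assumption comes from the seqno store tests
// added in this change.
package main

import (
	"encoding/binary"
	"fmt"
)

func decodeSeqno(val []byte) (uint64, error) {
	if len(val) != 8 {
		return 0, fmt.Errorf("expected 8-byte big-endian uint64, got %d bytes", len(val))
	}
	return binary.BigEndian.Uint64(val), nil
}

func main() {
	// Example bytes, as they might be read via `ipfs diag datastore get`.
	raw := []byte{0, 0, 0, 0, 0, 0, 0x30, 0x39}
	seqno, err := decodeSeqno(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(seqno) // 12345
}
```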

* fix(pubsub): improve reset command and add deprecation warnings

- use batched delete for efficient bulk reset
- check key existence before reporting deleted count
- sync datastore after deletions to ensure persistence
- show "no validator state found" when resetting non-existent peer
- log deprecation warnings when using --enable-pubsub-experiment
  or --enable-namesys-pubsub CLI flags

* refactor(test): add datastore helpers to test harness

---------

Co-authored-by: Andrew Gillis <11790789+gammazero@users.noreply.github.com>
Marcin Rataj 2026-01-16 00:27:09 +01:00 committed by GitHub
parent edb7056747
commit 824a47ae11
13 changed files with 1219 additions and 375 deletions


@ -181,8 +181,8 @@ Headers.
cmds.BoolOption(enableGCKwd, "Enable automatic periodic repo garbage collection"),
cmds.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").WithDefault(true),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enablePubSubKwd, "DEPRECATED"),
cmds.BoolOption(enableIPNSPubSubKwd, "Enable IPNS over pubsub. Implicitly enables pubsub, overrides Ipns.UsePubsub config."),
cmds.BoolOption(enablePubSubKwd, "DEPRECATED CLI flag. Use Pubsub.Enabled config instead."),
cmds.BoolOption(enableIPNSPubSubKwd, "DEPRECATED CLI flag. Use Ipns.UsePubsub config instead."),
cmds.BoolOption(enableMultiplexKwd, "DEPRECATED"),
cmds.StringOption(agentVersionSuffix, "Optional suffix to the AgentVersion presented by `ipfs id` and exposed via libp2p identify protocol."),
@ -397,10 +397,14 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
fmt.Printf("PeerID: %s\n", cfg.Identity.PeerID)
if !psSet {
if psSet {
log.Error("The --enable-pubsub-experiment flag is deprecated. Use Pubsub.Enabled config option instead.")
} else {
pubsub = cfg.Pubsub.Enabled.WithDefault(false)
}
if !ipnsPsSet {
if ipnsPsSet {
log.Error("The --enable-namesys-pubsub flag is deprecated. Use Ipns.UsePubsub config option instead.")
} else {
ipnsps = cfg.Ipns.UsePubsub.WithDefault(false)
}


@ -76,6 +76,9 @@ func TestCommands(t *testing.T) {
"/diag/cmds",
"/diag/cmds/clear",
"/diag/cmds/set-time",
"/diag/datastore",
"/diag/datastore/count",
"/diag/datastore/get",
"/diag/profile",
"/diag/sys",
"/files",
@ -170,6 +173,7 @@ func TestCommands(t *testing.T) {
"/pubsub/ls",
"/pubsub/peers",
"/pubsub/pub",
"/pubsub/reset",
"/pubsub/sub",
"/refs",
"/refs/local",


@ -1,7 +1,16 @@
package commands
import (
"encoding/hex"
"errors"
"fmt"
"io"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cmds "github.com/ipfs/go-ipfs-cmds"
oldcmds "github.com/ipfs/kubo/commands"
fsrepo "github.com/ipfs/kubo/repo/fsrepo"
)
var DiagCmd = &cmds.Command{
@ -10,8 +19,182 @@ var DiagCmd = &cmds.Command{
},
Subcommands: map[string]*cmds.Command{
"sys": sysDiagCmd,
"cmds": ActiveReqsCmd,
"profile": sysProfileCmd,
"sys": sysDiagCmd,
"cmds": ActiveReqsCmd,
"profile": sysProfileCmd,
"datastore": diagDatastoreCmd,
},
}
var diagDatastoreCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Low-level datastore inspection for debugging and testing.",
ShortDescription: `
'ipfs diag datastore' provides low-level access to the datastore for debugging
and testing purposes.
WARNING: FOR DEBUGGING/TESTING ONLY
These commands expose internal datastore details and should not be used
in production workflows. The datastore format may change between versions.
The daemon must not be running when calling these commands.
EXAMPLE
Inspecting pubsub seqno validator state:
$ ipfs diag datastore count /pubsub/seqno/
2
$ ipfs diag datastore get --hex /pubsub/seqno/12D3KooW...
Key: /pubsub/seqno/12D3KooW...
Hex Dump:
00000000 18 81 81 c8 91 c0 ea f6 |........|
`,
},
Subcommands: map[string]*cmds.Command{
"get": diagDatastoreGetCmd,
"count": diagDatastoreCountCmd,
},
}
const diagDatastoreHexOptionName = "hex"
type diagDatastoreGetResult struct {
Key string `json:"key"`
Value []byte `json:"value"`
HexDump string `json:"hex_dump,omitempty"`
}
var diagDatastoreGetCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Read a raw key from the datastore.",
ShortDescription: `
Returns the value stored at the given datastore key.
Default output is raw bytes. Use --hex for human-readable hex dump.
The daemon must not be running when using this command.
WARNING: FOR DEBUGGING/TESTING ONLY
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("key", true, false, "Datastore key to read (e.g., /pubsub/seqno/<peerid>)"),
},
Options: []cmds.Option{
cmds.BoolOption(diagDatastoreHexOptionName, "Output hex dump instead of raw bytes"),
},
NoRemote: true,
PreRun: DaemonNotRunning,
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
cctx := env.(*oldcmds.Context)
repo, err := fsrepo.Open(cctx.ConfigRoot)
if err != nil {
return fmt.Errorf("failed to open repo: %w", err)
}
defer repo.Close()
keyStr := req.Arguments[0]
key := datastore.NewKey(keyStr)
ds := repo.Datastore()
val, err := ds.Get(req.Context, key)
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
return fmt.Errorf("key not found: %s", keyStr)
}
return fmt.Errorf("failed to read key: %w", err)
}
result := &diagDatastoreGetResult{
Key: keyStr,
Value: val,
}
if hexDump, _ := req.Options[diagDatastoreHexOptionName].(bool); hexDump {
result.HexDump = hex.Dump(val)
}
return cmds.EmitOnce(res, result)
},
Type: diagDatastoreGetResult{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, result *diagDatastoreGetResult) error {
if result.HexDump != "" {
fmt.Fprintf(w, "Key: %s\nHex Dump:\n%s", result.Key, result.HexDump)
return nil
}
// Raw bytes output
_, err := w.Write(result.Value)
return err
}),
},
}
type diagDatastoreCountResult struct {
Prefix string `json:"prefix"`
Count int64 `json:"count"`
}
var diagDatastoreCountCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Count entries matching a datastore prefix.",
ShortDescription: `
Counts the number of datastore entries whose keys start with the given prefix.
The daemon must not be running when using this command.
WARNING: FOR DEBUGGING/TESTING ONLY
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("prefix", true, false, "Datastore key prefix (e.g., /pubsub/seqno/)"),
},
NoRemote: true,
PreRun: DaemonNotRunning,
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
cctx := env.(*oldcmds.Context)
repo, err := fsrepo.Open(cctx.ConfigRoot)
if err != nil {
return fmt.Errorf("failed to open repo: %w", err)
}
defer repo.Close()
prefix := req.Arguments[0]
ds := repo.Datastore()
q := query.Query{
Prefix: prefix,
KeysOnly: true,
}
results, err := ds.Query(req.Context, q)
if err != nil {
return fmt.Errorf("failed to query datastore: %w", err)
}
defer results.Close()
var count int64
for result := range results.Next() {
if result.Error != nil {
return fmt.Errorf("query error: %w", result.Error)
}
count++
}
return cmds.EmitOnce(res, &diagDatastoreCountResult{
Prefix: prefix,
Count: count,
})
},
Type: diagDatastoreCountResult{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, result *diagDatastoreCountResult) error {
_, err := fmt.Fprintf(w, "%d\n", result.Count)
return err
}),
},
}


@ -8,26 +8,35 @@ import (
"net/http"
"slices"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
mbase "github.com/multiformats/go-multibase"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cmds "github.com/ipfs/go-ipfs-cmds"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
options "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/libp2p/go-libp2p/core/peer"
mbase "github.com/multiformats/go-multibase"
)
var PubsubCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "An experimental publish-subscribe system on ipfs.",
ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
The default message validator is designed for the IPNS record protocol.
For custom pubsub applications requiring different validation logic,
use go-libp2p-pubsub (https://github.com/libp2p/go-libp2p-pubsub)
directly in a dedicated binary.
To enable, set 'Pubsub.Enabled' config to true.
`,
},
Subcommands: map[string]*cmds.Command{
@ -35,6 +44,7 @@ DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
"sub": PubsubSubCmd,
"ls": PubsubLsCmd,
"peers": PubsubPeersCmd,
"reset": PubsubResetCmd,
},
}
@ -46,17 +56,18 @@ type pubsubMessage struct {
}
var PubsubSubCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Subscribe to messages on a given topic.",
ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
PEER ENCODING
@ -145,18 +156,19 @@ TOPIC AND DATA ENCODING
}
var PubsubPubCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Publish data to a given pubsub topic.",
ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.
It reads binary data from stdin or a file.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
HTTP RPC ENCODING
@ -201,17 +213,18 @@ HTTP RPC ENCODING
}
var PubsubLsCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "List subscribed topics by name.",
ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
TOPIC ENCODING
@ -273,7 +286,7 @@ func safeTextListEncoder(req *cmds.Request, w io.Writer, list *stringList) error
}
var PubsubPeersCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "List peers we are currently pubsubbing with.",
ShortDescription: `
@ -281,11 +294,12 @@ ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected peers who are
subscribed to the named topic.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
TOPIC AND DATA ENCODING
@ -367,3 +381,122 @@ func urlArgsDecoder(req *cmds.Request, env cmds.Environment) error {
}
return nil
}
type pubsubResetResult struct {
Deleted int64 `json:"deleted"`
}
var PubsubResetCmd = &cmds.Command{
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Reset pubsub validator state.",
ShortDescription: `
Clears persistent sequence number state used by the pubsub validator.
WARNING: FOR TESTING ONLY - DO NOT USE IN PRODUCTION
Resets validator state that protects against replay attacks. After reset,
previously seen messages may be accepted again until their sequence numbers
are re-learned.
Use cases:
- Testing pubsub functionality
- Recovery from a peer sending artificially high sequence numbers
(which would cause subsequent messages from that peer to be rejected)
The --peer flag limits the reset to a specific peer's state.
Without --peer, all validator state is cleared.
NOTE: This only resets the persistent seqno validator state. The in-memory
seen messages cache (Pubsub.SeenMessagesTTL) auto-expires and can only be
fully cleared by restarting the daemon.
`,
},
Options: []cmds.Option{
cmds.StringOption(peerOptionName, "p", "Only reset state for this peer ID"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
if err != nil {
return err
}
ds := n.Repo.Datastore()
ctx := req.Context
peerOpt, _ := req.Options[peerOptionName].(string)
var deleted int64
if peerOpt != "" {
// Reset specific peer
pid, err := peer.Decode(peerOpt)
if err != nil {
return fmt.Errorf("invalid peer ID: %w", err)
}
key := datastore.NewKey(libp2p.SeqnoStorePrefix + pid.String())
exists, err := ds.Has(ctx, key)
if err != nil {
return fmt.Errorf("failed to check seqno state: %w", err)
}
if exists {
if err := ds.Delete(ctx, key); err != nil {
return fmt.Errorf("failed to delete seqno state: %w", err)
}
deleted = 1
}
} else {
// Reset all peers using batched delete for efficiency
q := query.Query{
Prefix: libp2p.SeqnoStorePrefix,
KeysOnly: true,
}
results, err := ds.Query(ctx, q)
if err != nil {
return fmt.Errorf("failed to query seqno state: %w", err)
}
defer results.Close()
batch, err := ds.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create batch: %w", err)
}
for result := range results.Next() {
if result.Error != nil {
return fmt.Errorf("query error: %w", result.Error)
}
if err := batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil {
return fmt.Errorf("failed to batch delete key %s: %w", result.Key, err)
}
deleted++
}
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch delete: %w", err)
}
}
// Sync to ensure deletions are persisted
if err := ds.Sync(ctx, datastore.NewKey(libp2p.SeqnoStorePrefix)); err != nil {
return fmt.Errorf("failed to sync datastore: %w", err)
}
return cmds.EmitOnce(res, &pubsubResetResult{Deleted: deleted})
},
Type: pubsubResetResult{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, result *pubsubResetResult) error {
peerOpt, _ := req.Options[peerOptionName].(string)
if peerOpt != "" {
if result.Deleted == 0 {
_, err := fmt.Fprintf(w, "No validator state found for peer %s\n", peerOpt)
return err
}
_, err := fmt.Fprintf(w, "Reset validator state for peer %s\n", peerOpt)
return err
}
_, err := fmt.Fprintf(w, "Reset validator state for %d peer(s)\n", result.Deleted)
return err
}),
},
}


@ -1,26 +1,85 @@
package libp2p
import (
"context"
"errors"
"log/slog"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)
type pubsubParams struct {
fx.In
Repo repo.Repo
Host host.Host
Discovery discovery.Discovery
}
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params pubsubParams) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(
helpers.LifecycleCtx(mctx, lc),
params.Host,
append(pubsubOptions,
pubsub.WithDiscovery(params.Discovery),
pubsub.WithDefaultValidator(newSeqnoValidator(params.Repo.Datastore())))...,
)
}
}
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(
pubsubOptions,
pubsub.WithDiscovery(disc),
pubsub.WithFloodPublish(true))...,
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params pubsubParams) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(
helpers.LifecycleCtx(mctx, lc),
params.Host,
append(pubsubOptions,
pubsub.WithDiscovery(params.Discovery),
pubsub.WithFloodPublish(true), // flood own publications to all peers for reliable IPNS delivery
pubsub.WithDefaultValidator(newSeqnoValidator(params.Repo.Datastore())))...,
)
}
}
func newSeqnoValidator(ds datastore.Datastore) pubsub.ValidatorEx {
return pubsub.NewBasicSeqnoValidator(&seqnoStore{ds: ds}, slog.New(logging.SlogHandler()).With("logger", "pubsub"))
}
// SeqnoStorePrefix is the datastore prefix for pubsub seqno validator state.
const SeqnoStorePrefix = "/pubsub/seqno/"
// seqnoStore implements pubsub.PeerMetadataStore using the repo datastore.
// It stores the maximum seen sequence number per peer to prevent message
// cycles when network diameter exceeds the timecache span.
type seqnoStore struct {
ds datastore.Datastore
}
var _ pubsub.PeerMetadataStore = (*seqnoStore)(nil)
// Get returns the stored seqno for a peer, or (nil, nil) if the peer is unknown.
// Returning (nil, nil) for unknown peers allows BasicSeqnoValidator to accept
// the first message from any peer.
func (s *seqnoStore) Get(ctx context.Context, p peer.ID) ([]byte, error) {
key := datastore.NewKey(SeqnoStorePrefix + p.String())
val, err := s.ds.Get(ctx, key)
if errors.Is(err, datastore.ErrNotFound) {
return nil, nil
}
return val, err
}
// Put stores the seqno for a peer.
func (s *seqnoStore) Put(ctx context.Context, p peer.ID, val []byte) error {
key := datastore.NewKey(SeqnoStorePrefix + p.String())
return s.ds.Put(ctx, key, val)
}


@ -0,0 +1,130 @@
package libp2p
import (
"encoding/binary"
"testing"
"github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
// TestSeqnoStore tests the seqnoStore implementation which backs the
// BasicSeqnoValidator. The validator prevents message cycles when network
// diameter exceeds the timecache span by tracking the maximum sequence number
// seen from each peer.
func TestSeqnoStore(t *testing.T) {
ctx := t.Context()
ds := syncds.MutexWrap(datastore.NewMapDatastore())
store := &seqnoStore{ds: ds}
peerA, err := peer.Decode("12D3KooWGC6TvWhfapngX6wvJHMYvKpDMXPb3ZnCZ6dMoaMtimQ5")
require.NoError(t, err)
peerB, err := peer.Decode("12D3KooWJRqDKTRjvXeGdUEgwkHNsoghYMBUagNYgLPdA4mqdTeo")
require.NoError(t, err)
// BasicSeqnoValidator expects Get to return (nil, nil) for unknown peers,
// not an error. This allows the validator to accept the first message from
// any peer without special-casing.
t.Run("unknown peer returns nil without error", func(t *testing.T) {
val, err := store.Get(ctx, peerA)
require.NoError(t, err)
require.Nil(t, val, "unknown peer should return nil, not empty slice")
})
// Verify basic store/retrieve functionality with a sequence number encoded
// as big-endian uint64, matching the format used by BasicSeqnoValidator.
t.Run("stores and retrieves seqno", func(t *testing.T) {
seqno := uint64(12345)
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, seqno)
err := store.Put(ctx, peerA, data)
require.NoError(t, err)
val, err := store.Get(ctx, peerA)
require.NoError(t, err)
require.Equal(t, seqno, binary.BigEndian.Uint64(val))
})
// Each peer must have isolated storage. If peer data leaked between peers,
// the validator would incorrectly reject valid messages or accept replays.
t.Run("isolates seqno per peer", func(t *testing.T) {
seqnoA := uint64(100)
seqnoB := uint64(200)
dataA := make([]byte, 8)
dataB := make([]byte, 8)
binary.BigEndian.PutUint64(dataA, seqnoA)
binary.BigEndian.PutUint64(dataB, seqnoB)
err := store.Put(ctx, peerA, dataA)
require.NoError(t, err)
err = store.Put(ctx, peerB, dataB)
require.NoError(t, err)
valA, err := store.Get(ctx, peerA)
require.NoError(t, err)
require.Equal(t, seqnoA, binary.BigEndian.Uint64(valA))
valB, err := store.Get(ctx, peerB)
require.NoError(t, err)
require.Equal(t, seqnoB, binary.BigEndian.Uint64(valB))
})
// The validator updates the stored seqno when accepting messages with
// higher seqnos. This test verifies that updates work correctly.
t.Run("updates seqno to higher value", func(t *testing.T) {
seqno1 := uint64(1000)
seqno2 := uint64(2000)
data1 := make([]byte, 8)
data2 := make([]byte, 8)
binary.BigEndian.PutUint64(data1, seqno1)
binary.BigEndian.PutUint64(data2, seqno2)
err := store.Put(ctx, peerA, data1)
require.NoError(t, err)
err = store.Put(ctx, peerA, data2)
require.NoError(t, err)
val, err := store.Get(ctx, peerA)
require.NoError(t, err)
require.Equal(t, seqno2, binary.BigEndian.Uint64(val))
})
// Verify the datastore key format. This is important for:
// 1. Debugging: operators can inspect/clear pubsub state
// 2. Migrations: future changes need to know the key format
t.Run("uses expected datastore key format", func(t *testing.T) {
seqno := uint64(42)
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, seqno)
err := store.Put(ctx, peerA, data)
require.NoError(t, err)
// Verify we can read directly from datastore with expected key
expectedKey := datastore.NewKey("/pubsub/seqno/" + peerA.String())
val, err := ds.Get(ctx, expectedKey)
require.NoError(t, err)
require.Equal(t, seqno, binary.BigEndian.Uint64(val))
})
// Verify data persists when creating a new store instance with the same
// underlying datastore. This simulates node restart.
t.Run("persists across store instances", func(t *testing.T) {
seqno := uint64(99999)
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, seqno)
err := store.Put(ctx, peerB, data)
require.NoError(t, err)
// Create new store instance with same datastore
store2 := &seqnoStore{ds: ds}
val, err := store2.Get(ctx, peerB)
require.NoError(t, err)
require.Equal(t, seqno, binary.BigEndian.Uint64(val))
})
}


@ -13,6 +13,8 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team.
- [🧹 Automatic cleanup of interrupted imports](#-automatic-cleanup-of-interrupted-imports)
- [Routing V1 HTTP API now exposed by default](#routing-v1-http-api-now-exposed-by-default)
- [Track total size when adding pins](#track-total-size-when-adding-pins)
- [Improved IPNS over PubSub validation](#improved-ipns-over-pubsub-validation)
- [New `ipfs diag datastore` commands](#new-ipfs-diag-datastore-commands)
- [🚇 Improved `ipfs p2p` tunnels with foreground mode](#-improved-ipfs-p2p-tunnels-with-foreground-mode)
- [Improved `ipfs dag stat` output](#improved-ipfs-dag-stat-output)
- [Skip bad keys when listing](#skip_bad_keys_when_listing)
@ -49,6 +51,23 @@ Example output:
Fetched/Processed 336 nodes (83 MB)
```
#### Improved IPNS over PubSub validation
The [IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/) implementation in Kubo is now more reliable: duplicate messages are rejected even in large networks where messages may cycle back after the in-memory cache expires.
Kubo now persists the maximum seen sequence number per peer to the datastore ([go-libp2p-pubsub#BasicSeqnoValidator](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#BasicSeqnoValidator)), providing stronger duplicate detection that survives node restarts. This addresses message flooding issues reported in [#9665](https://github.com/ipfs/kubo/issues/9665).
Kubo's pubsub is optimized for the IPNS use case. For custom pubsub applications requiring different validation logic, use [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub) directly in a dedicated binary.
#### New `ipfs diag datastore` commands
New experimental commands for low-level datastore inspection:
- `ipfs diag datastore get <key>` - Read raw value at a datastore key (use `--hex` for hex dump)
- `ipfs diag datastore count <prefix>` - Count entries matching a datastore prefix
The daemon must not be running when using these commands. Run `ipfs diag datastore --help` for usage examples.
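
As a rough sketch (not part of this change), the count output can also be consumed programmatically. This assumes the `ipfs` binary is on PATH and, as noted above, that the daemon is stopped; the plain-text encoder added in this diff prints a single integer followed by a newline.

```go
// Sketch only: shells out to the new `ipfs diag datastore count` command
// and parses its plain-text output (one integer plus newline). Assumes
// `ipfs` is on PATH and the daemon is not running.
package main

import (
	"fmt"
	"os/exec"
	"strconv"
	"strings"
)

func datastoreCount(prefix string) (int64, error) {
	out, err := exec.Command("ipfs", "diag", "datastore", "count", prefix).Output()
	if err != nil {
		return 0, fmt.Errorf("ipfs diag datastore count: %w", err)
	}
	return strconv.ParseInt(strings.TrimSpace(string(out)), 10, 64)
}

func main() {
	n, err := datastoreCount("/pubsub/seqno/")
	if err != nil {
		panic(err)
	}
	fmt.Printf("persisted seqno entries: %d\n", n)
}
```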
#### 🚇 Improved `ipfs p2p` tunnels with foreground mode
P2P tunnels can now run like SSH port forwarding: start a tunnel, use it, and it cleans up automatically when you're done.


@ -146,6 +146,8 @@ config file at runtime.
- [`Provider.Strategy`](#providerstrategy)
- [`Provider.WorkerCount`](#providerworkercount)
- [`Pubsub`](#pubsub)
- [When to use a dedicated pubsub node](#when-to-use-a-dedicated-pubsub-node)
- [Message deduplication](#message-deduplication)
- [`Pubsub.Enabled`](#pubsubenabled)
- [`Pubsub.Router`](#pubsubrouter)
- [`Pubsub.DisableSigning`](#pubsubdisablesigning)
@ -1787,7 +1789,7 @@ Type: `optionalDuration`
### `Ipns.UsePubsub`
Enables IPFS over pubsub experiment for publishing IPNS records in real time.
Enables [IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/) for publishing and resolving IPNS records in real time.
**EXPERIMENTAL:** read about current limitations at [experimental-features.md#ipns-pubsub](./experimental-features.md#ipns-pubsub).
@ -2405,16 +2407,56 @@ Replaced with [`Provide.DHT.MaxWorkers`](#providedhtmaxworkers).
## `Pubsub`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Pubsub configures Kubo's opt-in, opinionated [libp2p pubsub](https://docs.libp2p.io/concepts/pubsub/overview/) instance.
To enable, set `Pubsub.Enabled` to `true`.
Pubsub configures the `ipfs pubsub` subsystem. To use, it must be enabled by
passing the `--enable-pubsub-experiment` flag to the daemon
or via the `Pubsub.Enabled` flag below.
**EXPERIMENTAL:** This is an opt-in feature. Its primary use case is
[IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/), which
enables real-time IPNS record propagation. See [`Ipns.UsePubsub`](#ipnsusepubsub)
for details.
The `ipfs pubsub` commands can also be used for basic publish/subscribe
operations, but only if Kubo's built-in message validation (described below) is
acceptable for your use case.
### When to use a dedicated pubsub node
Kubo's pubsub is optimized for IPNS. It uses opinionated message validation
that may not fit all applications. If you need custom Message ID computation,
different deduplication logic, or validation rules beyond what Kubo provides,
consider building a dedicated pubsub node using
[go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub) directly.
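
A rough sketch of what such a dedicated node might look like is shown below. It wires GossipSub with the same `BasicSeqnoValidator` pattern Kubo adopts in this change, but with an in-memory metadata store standing in for a persistent, datastore-backed one; the API calls mirror the go-libp2p-pubsub v0.15.0 usage elsewhere in this diff, and the topic name and logger choice are placeholders.

```go
// Rough sketch of a standalone pubsub binary; API usage mirrors the
// go-libp2p-pubsub v0.15.0 calls made elsewhere in this change. The
// in-memory PeerMetadataStore is a stand-in for a persistent one.
package main

import (
	"context"
	"log/slog"
	"sync"

	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/peer"
)

// memSeqnoStore keeps the max seen seqno per peer in memory
// (Kubo persists it to the repo datastore instead).
type memSeqnoStore struct {
	mu sync.Mutex
	m  map[peer.ID][]byte
}

func (s *memSeqnoStore) Get(ctx context.Context, p peer.ID) ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.m[p], nil // nil for unknown peers, as the validator expects
}

func (s *memSeqnoStore) Put(ctx context.Context, p peer.ID, val []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[p] = val
	return nil
}

func main() {
	ctx := context.Background()

	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	store := &memSeqnoStore{m: make(map[peer.ID][]byte)}
	validator := pubsub.NewBasicSeqnoValidator(store, slog.Default())

	ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithDefaultValidator(validator))
	if err != nil {
		panic(err)
	}

	topic, err := ps.Join("my-app-topic") // placeholder topic name
	if err != nil {
		panic(err)
	}
	if err := topic.Publish(ctx, []byte("hello")); err != nil {
		panic(err)
	}
}
```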
### Message deduplication
Kubo uses two layers of message deduplication to handle duplicate messages that
may arrive via different network paths:
**Layer 1: In-memory TimeCache (Message ID)**
When a message arrives, Kubo computes its Message ID (hash of the message
content) and checks an in-memory cache. If the ID was seen recently, the
message is dropped. This cache is controlled by:
- [`Pubsub.SeenMessagesTTL`](#pubsubseenmessagesttl) - how long Message IDs are remembered (default: 120s)
- [`Pubsub.SeenMessagesStrategy`](#pubsubseenmessagesstrategy) - whether TTL resets on each sighting
This cache is fast but limited: it only works within the TTL window and is
cleared on node restart.
**Layer 2: Persistent Seqno Validator (per-peer)**
For stronger deduplication, Kubo tracks the maximum sequence number seen from
each peer and persists it to the datastore. Messages with sequence numbers
lower than the recorded maximum are rejected. This prevents replay attacks and
handles message cycles in large networks where messages may take longer than
the TimeCache TTL to propagate.
This layer survives node restarts. The state can be inspected or cleared using
`ipfs pubsub reset` (for testing/recovery only).
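
A simplified illustration of the Layer 2 rule follows. It is not the actual go-libp2p-pubsub `BasicSeqnoValidator` code; it only shows the accept/reject decision against the stored per-peer maximum, using the 8-byte big-endian encoding exercised by Kubo's tests.

```go
// Simplified illustration of the per-peer seqno rule, not the actual
// go-libp2p-pubsub BasicSeqnoValidator: accept a message only if its
// sequence number is higher than the stored maximum, then persist the
// new maximum as an 8-byte big-endian uint64.
package main

import (
	"encoding/binary"
	"fmt"
)

// acceptAndUpdate applies the rule; stored == nil means the peer is unknown.
func acceptAndUpdate(stored []byte, msgSeqno uint64) (accept bool, newStored []byte) {
	if stored != nil && msgSeqno <= binary.BigEndian.Uint64(stored) {
		return false, stored // duplicate or replayed message
	}
	newStored = make([]byte, 8)
	binary.BigEndian.PutUint64(newStored, msgSeqno)
	return true, newStored
}

func main() {
	var state []byte // what Kubo keeps under /pubsub/seqno/<peerid>
	for _, seqno := range []uint64{5, 7, 6, 8} {
		ok, next := acceptAndUpdate(state, seqno)
		state = next
		fmt.Printf("seqno %d accepted: %v\n", seqno, ok)
	}
	// Output: 5 true, 7 true, 6 false, 8 true
}
```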
### `Pubsub.Enabled`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Enables the pubsub system.
Default: `false`
@ -2423,8 +2465,6 @@ Type: `flag`
### `Pubsub.Router`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Sets the default router used by pubsub to route messages to peers. This can be one of:
- `"floodsub"` - floodsub is a basic router that simply _floods_ messages to all
@ -2440,10 +2480,9 @@ Type: `string` (one of `"floodsub"`, `"gossipsub"`, or `""` (apply default))
### `Pubsub.DisableSigning`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Disables message signing and signature verification.
Disables message signing and signature verification. Enable this option if
you're operating in a completely trusted network.
**FOR TESTING ONLY - DO NOT USE IN PRODUCTION**
It is _not_ safe to disable signing even if you don't care _who_ sent the
message because spoofed messages can be used to silence real messages by
@ -2455,20 +2494,12 @@ Type: `bool`
### `Pubsub.SeenMessagesTTL`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Controls the time window for the in-memory Message ID cache (Layer 1
deduplication). Messages with the same ID seen within this window are dropped.
Controls the time window within which duplicate messages, identified by Message
ID, will be identified and won't be emitted again.
A smaller value for this parameter means that Pubsub messages in the cache will
be garbage collected sooner, which can result in a smaller cache. At the same
time, if there are slower nodes in the network that forward older messages,
this can cause more duplicates to be propagated through the network.
Conversely, a larger value for this parameter means that Pubsub messages in the
cache will be garbage collected later, which can result in a larger cache for
the same traffic pattern. However, it is less likely that duplicates will be
propagated through the network.
A smaller value reduces memory usage but may cause more duplicates in networks
with slow nodes. A larger value uses more memory but provides better duplicate
detection within the time window.
Default: see `TimeCacheDuration` from [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub)
@ -2476,24 +2507,12 @@ Type: `optionalDuration`
### `Pubsub.SeenMessagesStrategy`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Determines how the TTL countdown for the Message ID cache works.
Determines how the time-to-live (TTL) countdown for deduplicating Pubsub
messages is calculated.
The Pubsub seen messages cache is a LRU cache that keeps messages for up to a
specified time duration. After this duration has elapsed, expired messages will
be purged from the cache.
The `last-seen` cache is a sliding-window cache. Every time a message is seen
again with the SeenMessagesTTL duration, its timestamp slides forward. This
keeps frequently occurring messages cached and prevents them from being
continually propagated, especially because of issues that might increase the
number of duplicate messages in the network.
The `first-seen` cache will store new messages and purge them after the
SeenMessagesTTL duration, even if they are seen multiple times within this
duration.
- `last-seen` - Sliding window: TTL resets each time the message is seen again.
Keeps frequently-seen messages in cache longer, preventing continued propagation.
- `first-seen` - Fixed window: TTL counts from first sighting only. Messages are
purged after the TTL regardless of how many times they're seen.
Default: `last-seen` (see [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub))
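
A minimal, self-contained simulation of the difference between the two strategies (not go-libp2p-pubsub code; the TTL, message ID, and timings are illustrative):

```go
// Illustrative simulation only: with "last-seen" every sighting extends the
// expiry by the TTL; with "first-seen" the expiry is fixed at the first
// sighting.
package main

import (
	"fmt"
	"time"
)

type seenCache struct {
	ttl     time.Duration
	sliding bool // true = last-seen, false = first-seen
	expiry  map[string]time.Time
}

// seen reports whether id is currently cached, and records the sighting.
func (c *seenCache) seen(id string, now time.Time) bool {
	exp, ok := c.expiry[id]
	alive := ok && now.Before(exp)
	if !alive || c.sliding {
		// first sighting, expired entry, or sliding window: (re)set expiry
		c.expiry[id] = now.Add(c.ttl)
	}
	return alive
}

func main() {
	start := time.Now()
	for _, sliding := range []bool{true, false} {
		c := &seenCache{ttl: 120 * time.Second, sliding: sliding, expiry: map[string]time.Time{}}
		c.seen("msg", start)                      // first sighting
		c.seen("msg", start.Add(100*time.Second)) // re-seen inside the TTL
		// 150s after the first sighting: still deduplicated only under last-seen
		dup := c.seen("msg", start.Add(150*time.Second))
		fmt.Printf("sliding=%v duplicate at t=150s: %v\n", sliding, dup)
	}
}
```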


@ -375,6 +375,8 @@ kubo now automatically shards when directory block is bigger than 256KB, ensurin
## IPNS pubsub
Specification: [IPNS PubSub Router](https://specs.ipfs.tech/ipns/ipns-pubsub-router/)
### In Version
0.4.14 :
@ -389,13 +391,18 @@ kubo now automatically shards when directory block is bigger than 256KB, ensurin
0.11.0 :
- Can be enabled via `Ipns.UsePubsub` flag in config
0.40.0 :
- Persistent message sequence number validation to prevent message cycles
in large networks
### State
Experimental, default-disabled.
Utilizes pubsub for publishing ipns records in real time.
Utilizes pubsub for publishing IPNS records in real time.
When it is enabled:
- IPNS publishers push records to a name-specific pubsub topic,
in addition to publishing to the DHT.
- IPNS resolvers subscribe to the name-specific topic on first
@ -404,9 +411,6 @@ When it is enabled:
Both the publisher and the resolver nodes need to have the feature enabled for it to work effectively.
Note: While IPNS pubsub has been available since 0.4.14, it received major changes in 0.5.0.
Users interested in this feature should upgrade to at least 0.5.0
### How to enable
Run your daemon with the `--enable-namesys-pubsub` flag
@ -416,13 +420,12 @@ ipfs config --json Ipns.UsePubsub true
```
NOTE:
- This feature implicitly enables [ipfs pubsub](#ipfs-pubsub).
- This feature implicitly enables pubsub.
- Passing `--enable-namesys-pubsub` CLI flag overrides `Ipns.UsePubsub` config.
### Road to being a real feature
- [ ] Needs more people to use and report on how well it works
- [ ] Pubsub enabled as a real feature
## AutoRelay


@ -0,0 +1,147 @@
package cli
import (
"encoding/json"
"testing"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDiagDatastore(t *testing.T) {
t.Parallel()
t.Run("diag datastore get returns error for non-existent key", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Don't start daemon - these commands require daemon to be stopped
res := node.RunIPFS("diag", "datastore", "get", "/nonexistent/key")
assert.Error(t, res.Err)
assert.Contains(t, res.Stderr.String(), "key not found")
})
t.Run("diag datastore get returns raw bytes by default", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Add some data to create a known datastore key
// We need daemon for add, then stop it
node.StartDaemon()
cid := node.IPFSAddStr("test data for diag datastore")
node.IPFS("pin", "add", cid)
node.StopDaemon()
// Test count to verify we have entries
count := node.DatastoreCount("/")
t.Logf("total datastore entries: %d", count)
assert.NotEqual(t, int64(0), count, "should have datastore entries after pinning")
})
t.Run("diag datastore get --hex returns hex dump", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Add and pin some data
node.StartDaemon()
cid := node.IPFSAddStr("test data for hex dump")
node.IPFS("pin", "add", cid)
node.StopDaemon()
// Test with existing keys in pins namespace
count := node.DatastoreCount("/pins/")
t.Logf("pins datastore entries: %d", count)
if count != 0 {
t.Log("pins datastore has entries, hex dump format tested implicitly")
}
})
t.Run("diag datastore count returns 0 for empty prefix", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
count := node.DatastoreCount("/definitely/nonexistent/prefix/")
assert.Equal(t, int64(0), count)
})
t.Run("diag datastore count returns JSON with --enc=json", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
res := node.IPFS("diag", "datastore", "count", "/pubsub/seqno/", "--enc=json")
assert.NoError(t, res.Err)
var result struct {
Prefix string `json:"prefix"`
Count int64 `json:"count"`
}
err := json.Unmarshal(res.Stdout.Bytes(), &result)
require.NoError(t, err)
assert.Equal(t, "/pubsub/seqno/", result.Prefix)
assert.Equal(t, int64(0), result.Count)
})
t.Run("diag datastore get returns JSON with --enc=json", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Test error case with JSON encoding
res := node.RunIPFS("diag", "datastore", "get", "/nonexistent", "--enc=json")
assert.Error(t, res.Err)
})
t.Run("diag datastore count counts entries correctly", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Add multiple pins to create multiple entries
node.StartDaemon()
cid1 := node.IPFSAddStr("data 1")
cid2 := node.IPFSAddStr("data 2")
cid3 := node.IPFSAddStr("data 3")
node.IPFS("pin", "add", cid1)
node.IPFS("pin", "add", cid2)
node.IPFS("pin", "add", cid3)
node.StopDaemon()
// Count should reflect the pins (plus any system entries)
count := node.DatastoreCount("/")
t.Logf("total entries after adding 3 pins: %d", count)
// Should have more than 0 entries
assert.NotEqual(t, int64(0), count)
})
t.Run("diag datastore commands work offline", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init()
// Don't start daemon - these commands require daemon to be stopped
// Count should work offline
count := node.DatastoreCount("/pubsub/seqno/")
assert.Equal(t, int64(0), count)
// Get should return error for missing key (but command should work)
res := node.RunIPFS("diag", "datastore", "get", "/nonexistent/key")
assert.Error(t, res.Err)
assert.Contains(t, res.Stderr.String(), "key not found")
})
t.Run("diag datastore commands require daemon to be stopped", func(t *testing.T) {
t.Parallel()
node := harness.NewT(t).NewNode().Init().StartDaemon()
defer node.StopDaemon()
// Both get and count require repo lock, which is held by the running daemon
res := node.RunIPFS("diag", "datastore", "get", "/test")
assert.Error(t, res.Err, "get should fail when daemon is running")
assert.Contains(t, res.Stderr.String(), "ipfs daemon is running")
res = node.RunIPFS("diag", "datastore", "count", "/pubsub/seqno/")
assert.Error(t, res.Err, "count should fail when daemon is running")
assert.Contains(t, res.Stderr.String(), "ipfs daemon is running")
})
}


@ -730,3 +730,28 @@ func (n *Node) APIClient() *HTTPClient {
BaseURL: n.APIURL(),
}
}
// DatastoreCount returns the count of entries matching the given prefix.
// Requires the daemon to be stopped.
func (n *Node) DatastoreCount(prefix string) int64 {
res := n.IPFS("diag", "datastore", "count", prefix)
count, _ := strconv.ParseInt(strings.TrimSpace(res.Stdout.String()), 10, 64)
return count
}
// DatastoreGet retrieves the value at the given key.
// Requires the daemon to be stopped. Returns nil if key not found.
func (n *Node) DatastoreGet(key string) []byte {
res := n.RunIPFS("diag", "datastore", "get", key)
if res.Err != nil {
return nil
}
return res.Stdout.Bytes()
}
// DatastoreHasKey checks if a key exists in the datastore.
// Requires the daemon to be stopped.
func (n *Node) DatastoreHasKey(key string) bool {
res := n.RunIPFS("diag", "datastore", "get", key)
return res.Err == nil
}

test/cli/pubsub_test.go (new file, 403 lines)

@ -0,0 +1,403 @@
package cli
import (
"context"
"encoding/json"
"slices"
"testing"
"time"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// waitForSubscription waits until the node has a subscription to the given topic.
func waitForSubscription(t *testing.T, node *harness.Node, topic string) {
t.Helper()
require.Eventually(t, func() bool {
res := node.RunIPFS("pubsub", "ls")
if res.Err != nil {
return false
}
return slices.Contains(res.Stdout.Lines(), topic)
}, 5*time.Second, 100*time.Millisecond, "expected subscription to topic %s", topic)
}
// waitForMessagePropagation waits for pubsub messages to propagate through the network
// and for seqno state to be persisted to the datastore.
func waitForMessagePropagation(t *testing.T) {
t.Helper()
time.Sleep(1 * time.Second)
}
// publishMessages publishes n messages from publisher to the given topic with
// a small delay between each to allow for ordered delivery.
func publishMessages(t *testing.T, publisher *harness.Node, topic string, n int) {
t.Helper()
for i := 0; i < n; i++ {
publisher.PipeStrToIPFS("msg", "pubsub", "pub", topic)
time.Sleep(50 * time.Millisecond)
}
}
// TestPubsub tests pubsub functionality and the persistent seqno validator.
//
// Pubsub has two deduplication layers:
//
// Layer 1: MessageID-based TimeCache (in-memory)
// - Controlled by Pubsub.SeenMessagesTTL config (default 120s)
// - Tested in go-libp2p-pubsub (see timecache in github.com/libp2p/go-libp2p-pubsub)
// - Only tested implicitly here via message delivery (timing-sensitive, not practical for CLI tests)
//
// Layer 2: Per-peer seqno validator (persistent in datastore)
// - Stores max seen seqno per peer at /pubsub/seqno/<peerid>
// - Tested directly below: persistence, updates, reset, survives restart
// - Validator: go-libp2p-pubsub BasicSeqnoValidator
func TestPubsub(t *testing.T) {
t.Parallel()
// enablePubsub configures a node with pubsub enabled
enablePubsub := func(n *harness.Node) {
n.SetIPFSConfig("Pubsub.Enabled", true)
n.SetIPFSConfig("Routing.Type", "none") // simplify test setup
}
t.Run("basic pub/sub message delivery", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
// Create two connected nodes with pubsub enabled
nodes := h.NewNodes(2).Init()
nodes.ForEachPar(enablePubsub)
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
subscriber := nodes[0]
publisher := nodes[1]
const topic = "test-topic"
const message = "hello pubsub"
// Start subscriber in background
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Use a channel to receive the message
msgChan := make(chan string, 1)
go func() {
// Subscribe and wait for one message
res := subscriber.RunIPFS("pubsub", "sub", "--enc=json", topic)
if res.Err == nil {
// Parse JSON output to get message data
lines := res.Stdout.Lines()
if len(lines) > 0 {
var msg struct {
Data []byte `json:"data"`
}
if json.Unmarshal([]byte(lines[0]), &msg) == nil {
msgChan <- string(msg.Data)
}
}
}
}()
// Wait for subscriber to be ready
waitForSubscription(t, subscriber, topic)
// Publish message
publisher.PipeStrToIPFS(message, "pubsub", "pub", topic)
// Wait for message or timeout
select {
case received := <-msgChan:
assert.Equal(t, message, received)
case <-ctx.Done():
// Subscriber may not receive in time due to test timing - that's OK
// The main goal is to test the seqno validator state persistence
t.Log("subscriber did not receive message in time (this is acceptable)")
}
})
t.Run("seqno validator state is persisted", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
// Create two connected nodes with pubsub
nodes := h.NewNodes(2).Init()
nodes.ForEachPar(enablePubsub)
nodes = nodes.StartDaemons().Connect()
node1 := nodes[0]
node2 := nodes[1]
node2PeerID := node2.PeerID().String()
const topic = "seqno-test"
// Start subscriber on node1
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
waitForSubscription(t, node1, topic)
// Publish multiple messages from node2 to trigger seqno validation
publishMessages(t, node2, topic, 3)
// Wait for messages to propagate and seqno to be stored
waitForMessagePropagation(t)
// Stop daemons to check datastore (diag datastore requires daemon to be stopped)
nodes.StopDaemons()
// Check that seqno state exists
count := node1.DatastoreCount("/pubsub/seqno/")
t.Logf("seqno entries count: %d", count)
// There should be at least one seqno entry (from node2)
assert.NotEqual(t, int64(0), count, "expected seqno state to be persisted")
// Verify the specific peer's key exists and test --hex output format
key := "/pubsub/seqno/" + node2PeerID
res := node1.RunIPFS("diag", "datastore", "get", "--hex", key)
if res.Err == nil {
t.Logf("seqno for peer %s:\n%s", node2PeerID, res.Stdout.String())
assert.Contains(t, res.Stdout.String(), "Hex Dump:")
} else {
// Key might not exist if messages didn't propagate - log but don't fail
t.Logf("seqno key not found for peer %s (messages may not have propagated)", node2PeerID)
}
})
t.Run("seqno updates when receiving multiple messages", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
// Create two connected nodes with pubsub
nodes := h.NewNodes(2).Init()
nodes.ForEachPar(enablePubsub)
nodes = nodes.StartDaemons().Connect()
node1 := nodes[0]
node2 := nodes[1]
node2PeerID := node2.PeerID().String()
const topic = "seqno-update-test"
seqnoKey := "/pubsub/seqno/" + node2PeerID
// Start subscriber on node1
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
waitForSubscription(t, node1, topic)
// Send first message
node2.PipeStrToIPFS("msg1", "pubsub", "pub", topic)
time.Sleep(500 * time.Millisecond)
// Stop daemons to check seqno (diag datastore requires daemon to be stopped)
nodes.StopDaemons()
// Get seqno after first message
res1 := node1.RunIPFS("diag", "datastore", "get", seqnoKey)
var seqno1 []byte
if res1.Err == nil {
seqno1 = res1.Stdout.Bytes()
t.Logf("seqno after first message: %d bytes", len(seqno1))
} else {
t.Logf("seqno not found after first message (message may not have propagated)")
}
// Restart daemons for second message
nodes = nodes.StartDaemons().Connect()
// Resubscribe
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
waitForSubscription(t, node1, topic)
// Send second message
node2.PipeStrToIPFS("msg2", "pubsub", "pub", topic)
time.Sleep(500 * time.Millisecond)
// Stop daemons to check seqno
nodes.StopDaemons()
// Get seqno after second message
res2 := node1.RunIPFS("diag", "datastore", "get", seqnoKey)
var seqno2 []byte
if res2.Err == nil {
seqno2 = res2.Stdout.Bytes()
t.Logf("seqno after second message: %d bytes", len(seqno2))
} else {
t.Logf("seqno not found after second message")
}
// If both messages were received, seqno should have been updated
// The seqno is a uint64 that should increase with each message
if len(seqno1) > 0 && len(seqno2) > 0 {
// seqno2 should be >= seqno1 (it's the max seen seqno)
// We just verify they're both non-empty and potentially different
t.Logf("seqno1: %x", seqno1)
t.Logf("seqno2: %x", seqno2)
// The seqno validator stores the max seqno seen, so seqno2 >= seqno1
// We can't do a simple byte comparison due to potential endianness
// but both should be valid uint64 values (8 bytes)
assert.Equal(t, 8, len(seqno2), "seqno should be 8 bytes (uint64)")
}
})
t.Run("pubsub reset clears seqno state", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
// Create two connected nodes
nodes := h.NewNodes(2).Init()
nodes.ForEachPar(enablePubsub)
nodes = nodes.StartDaemons().Connect()
node1 := nodes[0]
node2 := nodes[1]
const topic = "reset-test"
// Start subscriber and exchange messages
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
waitForSubscription(t, node1, topic)
publishMessages(t, node2, topic, 3)
waitForMessagePropagation(t)
// Stop daemons to check initial count
nodes.StopDaemons()
// Verify there is state before resetting
initialCount := node1.DatastoreCount("/pubsub/seqno/")
t.Logf("initial seqno count: %d", initialCount)
// Restart node1 to run pubsub reset
node1.StartDaemon()
// Reset all seqno state (while daemon is running)
res := node1.IPFS("pubsub", "reset")
assert.NoError(t, res.Err)
t.Logf("reset output: %s", res.Stdout.String())
// Stop daemon to verify state was cleared
node1.StopDaemon()
// Verify state was cleared
finalCount := node1.DatastoreCount("/pubsub/seqno/")
t.Logf("final seqno count: %d", finalCount)
assert.Equal(t, int64(0), finalCount, "seqno state should be cleared after reset")
})
t.Run("pubsub reset with peer flag", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
// Create three connected nodes
nodes := h.NewNodes(3).Init()
nodes.ForEachPar(enablePubsub)
nodes = nodes.StartDaemons().Connect()
node1 := nodes[0]
node2 := nodes[1]
node3 := nodes[2]
node2PeerID := node2.PeerID().String()
node3PeerID := node3.PeerID().String()
const topic = "peer-reset-test"
// Start subscriber on node1
go func() {
node1.RunIPFS("pubsub", "sub", topic)
}()
waitForSubscription(t, node1, topic)
// Publish from both node2 and node3
for range 3 {
node2.PipeStrToIPFS("msg2", "pubsub", "pub", topic)
node3.PipeStrToIPFS("msg3", "pubsub", "pub", topic)
time.Sleep(50 * time.Millisecond)
}
waitForMessagePropagation(t)
// Stop node2 and node3
node2.StopDaemon()
node3.StopDaemon()
// Reset only node2's state (while node1 daemon is running)
res := node1.IPFS("pubsub", "reset", "--peer", node2PeerID)
require.NoError(t, res.Err)
t.Logf("reset output: %s", res.Stdout.String())
// Stop node1 daemon to check datastore
node1.StopDaemon()
// Check that node2's key is gone
res = node1.RunIPFS("diag", "datastore", "get", "/pubsub/seqno/"+node2PeerID)
assert.Error(t, res.Err, "node2's seqno key should be deleted")
// Check that node3's key still exists (if it was created)
res = node1.RunIPFS("diag", "datastore", "get", "/pubsub/seqno/"+node3PeerID)
// Note: node3's key might not exist if messages didn't propagate
// So we just log the result without asserting
if res.Err == nil {
t.Logf("node3's seqno key still exists (as expected)")
} else {
t.Logf("node3's seqno key not found (messages may not have propagated)")
}
})
t.Run("seqno state survives daemon restart", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
// Create and start single node
node := h.NewNode().Init()
enablePubsub(node)
node.StartDaemon()
// We need another node to publish messages
node2 := h.NewNode().Init()
enablePubsub(node2)
node2.StartDaemon()
node.Connect(node2)
const topic = "restart-test"
// Start subscriber and exchange messages
go func() {
node.RunIPFS("pubsub", "sub", topic)
}()
waitForSubscription(t, node, topic)
publishMessages(t, node2, topic, 3)
waitForMessagePropagation(t)
// Stop daemons to check datastore
node.StopDaemon()
node2.StopDaemon()
// Get count before restart
beforeCount := node.DatastoreCount("/pubsub/seqno/")
t.Logf("seqno count before restart: %d", beforeCount)
// Restart node (simulate restart scenario)
node.StartDaemon()
time.Sleep(500 * time.Millisecond)
// Stop daemon to check datastore again
node.StopDaemon()
// Get count after restart
afterCount := node.DatastoreCount("/pubsub/seqno/")
t.Logf("seqno count after restart: %d", afterCount)
// Count should be the same (state persisted)
assert.Equal(t, beforeCount, afterCount, "seqno state should survive daemon restart")
})
}


@ -1,285 +0,0 @@
package integrationtest
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
"go.uber.org/fx"
"github.com/ipfs/boxo/bootstrap"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core"
"github.com/ipfs/kubo/core/coreapi"
libp2p2 "github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/ipfs/kubo/core/mock"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)
func TestMessageSeenCacheTTL(t *testing.T) {
t.Skip("skipping PubSub seen cache TTL test due to flakiness")
if err := RunMessageSeenCacheTTLTest(t, "10s"); err != nil {
t.Fatal(err)
}
}
func mockNode(ctx context.Context, mn mocknet.Mocknet, pubsubEnabled bool, seenMessagesCacheTTL string) (*core.IpfsNode, error) {
ds := syncds.MutexWrap(datastore.NewMapDatastore())
cfg, err := config.Init(io.Discard, 2048)
if err != nil {
return nil, err
}
count := len(mn.Peers())
cfg.Addresses.Swarm = []string{
fmt.Sprintf("/ip4/18.0.%d.%d/tcp/4001", count>>16, count&0xFF),
}
cfg.Datastore = config.Datastore{}
if pubsubEnabled {
cfg.Pubsub.Enabled = config.True
var ttl *config.OptionalDuration
if len(seenMessagesCacheTTL) > 0 {
ttl = &config.OptionalDuration{}
if err = ttl.UnmarshalJSON([]byte(seenMessagesCacheTTL)); err != nil {
return nil, err
}
}
cfg.Pubsub.SeenMessagesTTL = ttl
}
return core.NewNode(ctx, &core.BuildCfg{
Online: true,
Routing: libp2p2.DHTServerOption,
Repo: &repo.Mock{
C: *cfg,
D: ds,
},
Host: mock.MockHostOption(mn),
ExtraOpts: map[string]bool{
"pubsub": pubsubEnabled,
},
})
}
func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var bootstrapNode, consumerNode, producerNode *core.IpfsNode
var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID
mn := mocknet.New()
bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration
if err != nil {
t.Fatal(err)
}
bootstrapPeerID = bootstrapNode.PeerHost.ID()
defer bootstrapNode.Close()
consumerNode, err = mockNode(ctx, mn, true, seenMessagesCacheTTL) // use passed seen cache TTL
if err != nil {
t.Fatal(err)
}
consumerPeerID = consumerNode.PeerHost.ID()
defer consumerNode.Close()
ttl, err := time.ParseDuration(seenMessagesCacheTTL)
if err != nil {
t.Fatal(err)
}
// Used for logging the timeline
startTime := time.Time{}
// Used for overriding the message ID
sendMsgID := ""
// Set up the pubsub message ID generation override for the producer
core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) {
var pubsubOptions []pubsub.Option
pubsubOptions = append(
pubsubOptions,
pubsub.WithSeenMessagesTTL(ttl),
pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string {
now := time.Now()
if startTime.Second() == 0 {
startTime = now
}
timeElapsed := now.Sub(startTime).Seconds()
msg := string(pmsg.Data)
from, _ := peer.IDFromBytes(pmsg.From)
var msgID string
if from == producerPeerID {
msgID = sendMsgID
t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgID, timeElapsed)
} else {
msgID = pubsub.DefaultMsgIdFn(pmsg)
}
return msgID
}),
pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen),
)
return append(
info.FXOptions,
fx.Provide(libp2p2.TopicDiscovery()),
fx.Decorate(libp2p2.GossipSub(pubsubOptions...)),
), nil
})
producerNode, err = mockNode(ctx, mn, false, "") // PubSub configuration comes from overrides above
if err != nil {
t.Fatal(err)
}
producerPeerID = producerNode.PeerHost.ID()
defer producerNode.Close()
t.Logf("bootstrap peer=%s, consumer peer=%s, producer peer=%s", bootstrapPeerID, consumerPeerID, producerPeerID)
producerAPI, err := coreapi.NewCoreAPI(producerNode)
if err != nil {
t.Fatal(err)
}
consumerAPI, err := coreapi.NewCoreAPI(consumerNode)
if err != nil {
t.Fatal(err)
}
err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}
bis := bootstrapNode.Peerstore.PeerInfo(bootstrapNode.PeerHost.ID())
bcfg := bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{bis})
if err = producerNode.Bootstrap(bcfg); err != nil {
t.Fatal(err)
}
if err = consumerNode.Bootstrap(bcfg); err != nil {
t.Fatal(err)
}
// Set up the consumer subscription
const TopicName = "topic"
consumerSubscription, err := consumerAPI.PubSub().Subscribe(ctx, TopicName)
if err != nil {
t.Fatal(err)
}
// Utility functions defined inline to include context in closure
now := func() float64 {
return time.Since(startTime).Seconds()
}
ctr := 0
msgGen := func() string {
ctr++
return fmt.Sprintf("msg_%d", ctr)
}
produceMessage := func() string {
msgTxt := msgGen()
err = producerAPI.PubSub().Publish(ctx, TopicName, []byte(msgTxt))
if err != nil {
t.Fatal(err)
}
return msgTxt
}
consumeMessage := func(msgTxt string, shouldFind bool) {
// Set up a separate timed context for receiving messages
rxCtx, rxCancel := context.WithTimeout(context.Background(), time.Second)
defer rxCancel()
msg, err := consumerSubscription.Next(rxCtx)
if shouldFind {
if err != nil {
t.Logf("expected but did not receive [%s] at T%fs", msgTxt, now())
t.Fatal(err)
}
t.Logf("received [%s] at T%fs", string(msg.Data()), now())
if !bytes.Equal(msg.Data(), []byte(msgTxt)) {
t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt)
}
} else {
if err == nil {
t.Logf("not expected but received [%s] at T%fs", string(msg.Data()), now())
t.Fail()
}
t.Logf("did not receive [%s] at T%fs", msgTxt, now())
}
}
const MsgID1 = "MsgID1"
const MsgID2 = "MsgID2"
const MsgID3 = "MsgID3"
// Send message 1 with the message ID we're going to duplicate
sentMsg1 := time.Now()
sendMsgID = MsgID1
msgTxt := produceMessage()
// Should find the message because it's new
consumeMessage(msgTxt, true)
// Send message 2 with a duplicate message ID
sendMsgID = MsgID1
msgTxt = produceMessage()
// Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window).
consumeMessage(msgTxt, false)
// Send message 3 with a new message ID
sendMsgID = MsgID2
msgTxt = produceMessage()
// Should find the message because it's new
consumeMessage(msgTxt, true)
// Wait till just before the SeenMessagesTTL window has passed since message 1 was sent
time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond)))
// Send message 4 with a duplicate message ID
sendMsgID = MsgID1
msgTxt = produceMessage()
// Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This
// time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since
// the default time cache now implements a sliding window algorithm.
consumeMessage(msgTxt, false)
// Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding
// a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window
// starting at message 1 has expired.
sentMsg5 := time.Now()
sendMsgID = MsgID1
msgTxt = produceMessage()
// Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window
// started). This time again, the expiration should get pushed out for another SeenMessagesTTL window.
consumeMessage(msgTxt, false)
// Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window
sendMsgID = MsgID2
msgTxt = produceMessage()
// Should find the message since last read > SeenMessagesTTL, so it looks like a new message.
consumeMessage(msgTxt, true)
// Sleep for a full SeenMessagesTTL window to let cache entries time out
time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond)))
// Send message 7 with a duplicate message ID
sendMsgID = MsgID1
msgTxt = produceMessage()
// Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message.
consumeMessage(msgTxt, true)
// Send message 8 with a brand new message ID
//
// This step is not strictly necessary, but has been added for good measure.
sendMsgID = MsgID3
msgTxt = produceMessage()
// Should find the message because it's new
consumeMessage(msgTxt, true)
return nil
}