Merge branch 'master' into feat/key-ls-alias

This commit is contained in:
Vedant Madane 2026-01-26 13:38:28 +05:30 committed by GitHub
commit 973e2b56e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 2126 additions and 445 deletions

View File

@ -1,6 +1,45 @@
# Dependabot PRs are auto-tidied by .github/workflows/dependabot-tidy.yml
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "monthly"
open-pull-requests-limit: 10
labels:
- "dependencies"
ignore:
# Updated via go-ds-* wrappers in ipfs-ecosystem group
- dependency-name: "github.com/cockroachdb/pebble*"
- dependency-name: "github.com/syndtr/goleveldb"
- dependency-name: "github.com/dgraph-io/badger*"
groups:
ipfs-ecosystem:
patterns:
- "github.com/ipfs/*"
- "github.com/ipfs-shipyard/*"
- "github.com/ipshipyard/*"
- "github.com/multiformats/*"
- "github.com/ipld/*"
libp2p-ecosystem:
patterns:
- "github.com/libp2p/*"
golang-x:
patterns:
- "golang.org/x/*"
opentelemetry:
patterns:
- "go.opentelemetry.io/*"
prometheus:
patterns:
- "github.com/prometheus/*"
- "contrib.go.opencensus.io/*"
- "go.opencensus.io"
uber:
patterns:
- "go.uber.org/*"

61
.github/workflows/dependabot-tidy.yml vendored Normal file
View File

@ -0,0 +1,61 @@
# Dependabot only updates go.mod/go.sum in the root module, but this repo has
# multiple Go modules (see docs/examples/). This workflow runs `make mod_tidy`
# on Dependabot PRs to keep all go.sum files in sync, preventing go-check CI
# failures.
name: Dependabot Tidy
on:
pull_request_target:
types: [opened, synchronize]
workflow_dispatch:
inputs:
pr_number:
description: 'PR number to run mod_tidy on'
required: true
type: number
permissions:
contents: write
pull-requests: write
jobs:
tidy:
if: github.actor == 'dependabot[bot]' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
steps:
- name: Get PR info
id: pr
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
if [[ "${{ github.event_name }}" == "workflow_dispatch" ]]; then
pr_number="${{ inputs.pr_number }}"
else
pr_number="${{ github.event.pull_request.number }}"
fi
echo "number=$pr_number" >> $GITHUB_OUTPUT
branch=$(gh pr view "$pr_number" --repo "${{ github.repository }}" --json headRefName -q '.headRefName')
echo "branch=$branch" >> $GITHUB_OUTPUT
- uses: actions/checkout@v6
with:
ref: ${{ steps.pr.outputs.branch }}
token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/setup-go@v6
with:
go-version-file: go.mod
- name: Run make mod_tidy
run: make mod_tidy
- name: Check for changes
id: git-check
run: |
if [[ -n $(git status --porcelain) ]]; then
echo "modified=true" >> $GITHUB_OUTPUT
fi
- name: Commit changes
if: steps.git-check.outputs.modified == 'true'
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add -A
git commit -m "chore: run make mod_tidy"
git push

View File

@ -181,8 +181,8 @@ Headers.
cmds.BoolOption(enableGCKwd, "Enable automatic periodic repo garbage collection"),
cmds.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").WithDefault(true),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enablePubSubKwd, "DEPRECATED"),
cmds.BoolOption(enableIPNSPubSubKwd, "Enable IPNS over pubsub. Implicitly enables pubsub, overrides Ipns.UsePubsub config."),
cmds.BoolOption(enablePubSubKwd, "DEPRECATED CLI flag. Use Pubsub.Enabled config instead."),
cmds.BoolOption(enableIPNSPubSubKwd, "DEPRECATED CLI flag. Use Ipns.UsePubsub config instead."),
cmds.BoolOption(enableMultiplexKwd, "DEPRECATED"),
cmds.StringOption(agentVersionSuffix, "Optional suffix to the AgentVersion presented by `ipfs id` and exposed via libp2p identify protocol."),
@ -397,10 +397,14 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
fmt.Printf("PeerID: %s\n", cfg.Identity.PeerID)
if !psSet {
if psSet {
log.Error("The --enable-pubsub-experiment flag is deprecated. Use Pubsub.Enabled config option instead.")
} else {
pubsub = cfg.Pubsub.Enabled.WithDefault(false)
}
if !ipnsPsSet {
if ipnsPsSet {
log.Error("The --enable-namesys-pubsub flag is deprecated. Use Ipns.UsePubsub config option instead.")
} else {
ipnsps = cfg.Ipns.UsePubsub.WithDefault(false)
}

3
cmd/ipfswatch/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
ipfswatch
ipfswatch-test-cover
ipfswatch.exe

View File

@ -13,6 +13,7 @@ const (
// Gateway limit defaults from boxo
DefaultRetrievalTimeout = gateway.DefaultRetrievalTimeout
DefaultMaxRequestDuration = gateway.DefaultMaxRequestDuration
DefaultMaxConcurrentRequests = gateway.DefaultMaxConcurrentRequests
DefaultMaxRangeRequestFileSize = 0 // 0 means no limit
)
@ -96,6 +97,14 @@ type Gateway struct {
// A value of 0 disables this timeout.
RetrievalTimeout *OptionalDuration `json:",omitempty"`
// MaxRequestDuration is an absolute deadline for the entire request.
// Unlike RetrievalTimeout (which resets on each data write and catches
// stalled transfers), this is a hard limit on the total time a request
// can take. Returns 504 Gateway Timeout when exceeded.
// This protects the gateway from edge cases and slow client attacks.
// A value of 0 uses the default (1 hour).
MaxRequestDuration *OptionalDuration `json:",omitempty"`
// MaxConcurrentRequests limits concurrent HTTP requests handled by the gateway.
// Requests beyond this limit receive 429 Too Many Requests with Retry-After header.
// A value of 0 disables the limit.

View File

@ -76,6 +76,9 @@ func TestCommands(t *testing.T) {
"/diag/cmds",
"/diag/cmds/clear",
"/diag/cmds/set-time",
"/diag/datastore",
"/diag/datastore/count",
"/diag/datastore/get",
"/diag/profile",
"/diag/sys",
"/files",
@ -170,6 +173,7 @@ func TestCommands(t *testing.T) {
"/pubsub/ls",
"/pubsub/peers",
"/pubsub/pub",
"/pubsub/reset",
"/pubsub/sub",
"/refs",
"/refs/local",

View File

@ -1,7 +1,16 @@
package commands
import (
"encoding/hex"
"errors"
"fmt"
"io"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cmds "github.com/ipfs/go-ipfs-cmds"
oldcmds "github.com/ipfs/kubo/commands"
fsrepo "github.com/ipfs/kubo/repo/fsrepo"
)
var DiagCmd = &cmds.Command{
@ -10,8 +19,182 @@ var DiagCmd = &cmds.Command{
},
Subcommands: map[string]*cmds.Command{
"sys": sysDiagCmd,
"cmds": ActiveReqsCmd,
"profile": sysProfileCmd,
"sys": sysDiagCmd,
"cmds": ActiveReqsCmd,
"profile": sysProfileCmd,
"datastore": diagDatastoreCmd,
},
}
// diagDatastoreCmd groups the low-level datastore inspection subcommands
// ("get" and "count"). It is marked Experimental; both subcommands open the
// repo directly and therefore require the daemon to be stopped (enforced by
// their PreRun hooks).
var diagDatastoreCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Low-level datastore inspection for debugging and testing.",
		ShortDescription: `
'ipfs diag datastore' provides low-level access to the datastore for debugging
and testing purposes.
WARNING: FOR DEBUGGING/TESTING ONLY
These commands expose internal datastore details and should not be used
in production workflows. The datastore format may change between versions.
The daemon must not be running when calling these commands.
EXAMPLE
Inspecting pubsub seqno validator state:
$ ipfs diag datastore count /pubsub/seqno/
2
$ ipfs diag datastore get --hex /pubsub/seqno/12D3KooW...
Key: /pubsub/seqno/12D3KooW...
Hex Dump:
00000000 18 81 81 c8 91 c0 ea f6 |........|
`,
	},
	Subcommands: map[string]*cmds.Command{
		"get":   diagDatastoreGetCmd,
		"count": diagDatastoreCountCmd,
	},
}
// diagDatastoreHexOptionName is the flag name for hex-dump output in
// 'ipfs diag datastore get'.
const diagDatastoreHexOptionName = "hex"

// diagDatastoreGetResult is the JSON/text payload emitted by
// 'ipfs diag datastore get'.
type diagDatastoreGetResult struct {
	Key     string `json:"key"`                 // datastore key that was read
	Value   []byte `json:"value"`               // raw bytes stored at Key
	HexDump string `json:"hex_dump,omitempty"`  // populated only when --hex is set
}
// diagDatastoreGetCmd implements 'ipfs diag datastore get'. It opens the
// repo directly (the daemon must be stopped) and emits the raw value stored
// at a single datastore key, optionally as a hex dump.
var diagDatastoreGetCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Read a raw key from the datastore.",
		ShortDescription: `
Returns the value stored at the given datastore key.
Default output is raw bytes. Use --hex for human-readable hex dump.
The daemon must not be running when using this command.
WARNING: FOR DEBUGGING/TESTING ONLY
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("key", true, false, "Datastore key to read (e.g., /pubsub/seqno/<peerid>)"),
	},
	Options: []cmds.Option{
		cmds.BoolOption(diagDatastoreHexOptionName, "Output hex dump instead of raw bytes"),
	},
	// NoRemote + DaemonNotRunning: the command needs exclusive access to the
	// repo, so it cannot run over HTTP RPC or while the daemon holds the
	// repo lock.
	NoRemote: true,
	PreRun:   DaemonNotRunning,
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		cctx := env.(*oldcmds.Context)
		// Open the repo directly; this fails if another process holds the lock.
		repo, err := fsrepo.Open(cctx.ConfigRoot)
		if err != nil {
			return fmt.Errorf("failed to open repo: %w", err)
		}
		defer repo.Close()

		keyStr := req.Arguments[0]
		key := datastore.NewKey(keyStr)
		ds := repo.Datastore()

		val, err := ds.Get(req.Context, key)
		if err != nil {
			// Translate the datastore sentinel into a friendlier message.
			if errors.Is(err, datastore.ErrNotFound) {
				return fmt.Errorf("key not found: %s", keyStr)
			}
			return fmt.Errorf("failed to read key: %w", err)
		}

		result := &diagDatastoreGetResult{
			Key:   keyStr,
			Value: val,
		}
		// HexDump is only attached when --hex was given; the text encoder
		// uses its presence to pick the output format.
		if hexDump, _ := req.Options[diagDatastoreHexOptionName].(bool); hexDump {
			result.HexDump = hex.Dump(val)
		}
		return cmds.EmitOnce(res, result)
	},
	Type: diagDatastoreGetResult{},
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, result *diagDatastoreGetResult) error {
			// Hex mode: human-readable dump with the key echoed back.
			if result.HexDump != "" {
				fmt.Fprintf(w, "Key: %s\nHex Dump:\n%s", result.Key, result.HexDump)
				return nil
			}
			// Raw bytes output
			_, err := w.Write(result.Value)
			return err
		}),
	},
}
// diagDatastoreCountResult is the JSON/text payload emitted by
// 'ipfs diag datastore count'.
type diagDatastoreCountResult struct {
	Prefix string `json:"prefix"` // prefix that was queried
	Count  int64  `json:"count"`  // number of keys matching Prefix
}
// diagDatastoreCountCmd implements 'ipfs diag datastore count'. It runs a
// keys-only query for the given prefix against the repo's datastore (daemon
// must be stopped) and reports how many entries matched.
var diagDatastoreCountCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Count entries matching a datastore prefix.",
		ShortDescription: `
Counts the number of datastore entries whose keys start with the given prefix.
The daemon must not be running when using this command.
WARNING: FOR DEBUGGING/TESTING ONLY
`,
	},
	Arguments: []cmds.Argument{
		cmds.StringArg("prefix", true, false, "Datastore key prefix (e.g., /pubsub/seqno/)"),
	},
	// NoRemote + DaemonNotRunning: requires exclusive access to the repo.
	NoRemote: true,
	PreRun:   DaemonNotRunning,
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		cctx := env.(*oldcmds.Context)
		repo, err := fsrepo.Open(cctx.ConfigRoot)
		if err != nil {
			return fmt.Errorf("failed to open repo: %w", err)
		}
		defer repo.Close()

		prefix := req.Arguments[0]
		ds := repo.Datastore()

		// KeysOnly avoids loading values; only the number of matches is needed.
		q := query.Query{
			Prefix:   prefix,
			KeysOnly: true,
		}
		results, err := ds.Query(req.Context, q)
		if err != nil {
			return fmt.Errorf("failed to query datastore: %w", err)
		}
		defer results.Close()

		var count int64
		// Drain the result channel, aborting on the first per-entry error.
		for result := range results.Next() {
			if result.Error != nil {
				return fmt.Errorf("query error: %w", result.Error)
			}
			count++
		}

		return cmds.EmitOnce(res, &diagDatastoreCountResult{
			Prefix: prefix,
			Count:  count,
		})
	},
	Type: diagDatastoreCountResult{},
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, result *diagDatastoreCountResult) error {
			// Text output is just the bare count, convenient for scripts.
			_, err := fmt.Fprintf(w, "%d\n", result.Count)
			return err
		}),
	},
}

View File

@ -48,6 +48,7 @@ const (
lsResolveTypeOptionName = "resolve-type"
lsSizeOptionName = "size"
lsStreamOptionName = "stream"
lsLongOptionName = "long"
)
var LsCmd = &cmds.Command{
@ -57,7 +58,26 @@ var LsCmd = &cmds.Command{
Displays the contents of an IPFS or IPNS object(s) at the given path, with
the following format:
<link base58 hash> <link size in bytes> <link name>
<cid> <size> <name>
With the --long (-l) option, display optional file mode (permissions) and
modification time in a format similar to Unix 'ls -l':
<mode> <cid> <size> <mtime> <name>
Mode and mtime are optional UnixFS metadata. They are only present if the
content was imported with 'ipfs add --preserve-mode' and '--preserve-mtime'.
Without preserved metadata, both mode and mtime display '-'. Times are in UTC.
Example with --long and preserved metadata:
-rw-r--r-- QmZULkCELmmk5XNf... 1234 Jan 15 10:30 document.txt
-rwxr-xr-x QmaRGe7bVmVaLmxb... 5678 Dec 01 2023 script.sh
drwxr-xr-x QmWWEQhcLufF3qPm... - Nov 20 2023 subdir/
Example with --long without preserved metadata:
- QmZULkCELmmk5XNf... 1234 - document.txt
The JSON output contains type information.
`,
@ -71,6 +91,7 @@ The JSON output contains type information.
cmds.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmds.BoolOption(lsSizeOptionName, "Resolve linked objects to find out their file size.").WithDefault(true),
cmds.BoolOption(lsStreamOptionName, "s", "Enable experimental streaming of directory entries as they are traversed."),
cmds.BoolOption(lsLongOptionName, "l", "Use a long listing format, showing file mode and modification time."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
@ -215,10 +236,121 @@ The JSON output contains type information.
Type: LsOutput{},
}
// formatMode converts os.FileMode to a 10-character Unix ls-style string.
//
// Format: [type][owner rwx][group rwx][other rwx]
//
// Type indicators: - (regular), d (directory), l (symlink), p (named pipe),
// s (socket), c (char device), b (block device).
//
// Special bits replace the execute position: setuid on owner (s/S),
// setgid on group (s/S), sticky on other (t/T). Lowercase when the
// underlying execute bit is also set, uppercase when not.
func formatMode(mode os.FileMode) string {
	out := [10]byte{'-', '-', '-', '-', '-', '-', '-', '-', '-', '-'}

	// Position 0: file type indicator, mirroring ls. Device files are
	// distinguished into character vs block only once ModeDevice is set.
	switch {
	case mode&os.ModeDir != 0:
		out[0] = 'd'
	case mode&os.ModeSymlink != 0:
		out[0] = 'l'
	case mode&os.ModeNamedPipe != 0:
		out[0] = 'p'
	case mode&os.ModeSocket != 0:
		out[0] = 's'
	case mode&os.ModeDevice != 0:
		if mode&os.ModeCharDevice != 0 {
			out[0] = 'c'
		} else {
			out[0] = 'b'
		}
	}

	// One triad per class (owner, group, other). Each triad's execute slot
	// may be overridden by its special bit: setuid, setgid, or sticky.
	triads := [3]struct {
		read, write, exec os.FileMode // permission bits for this class
		special           os.FileMode // special bit tied to the exec slot
		specialChar       byte        // lowercase char shown when special is set
	}{
		{0400, 0200, 0100, os.ModeSetuid, 's'},
		{0040, 0020, 0010, os.ModeSetgid, 's'},
		{0004, 0002, 0001, os.ModeSticky, 't'},
	}

	for i, tr := range triads {
		base := 1 + 3*i
		out[base] = permBit(mode, tr.read, 'r')
		out[base+1] = permBit(mode, tr.write, 'w')
		switch {
		case mode&tr.special == 0:
			// No special bit: plain execute indicator.
			out[base+2] = permBit(mode, tr.exec, 'x')
		case mode&tr.exec != 0:
			// Special bit with execute: lowercase (s/t).
			out[base+2] = tr.specialChar
		default:
			// Special bit without execute: uppercase (S/T).
			out[base+2] = tr.specialChar - ('a' - 'A')
		}
	}

	return string(out[:])
}

// permBit reports char when the given permission bit is present in mode,
// and '-' otherwise.
func permBit(mode os.FileMode, bit os.FileMode, char byte) byte {
	if mode&bit == 0 {
		return '-'
	}
	return char
}
// formatModTime formats time.Time for display, following Unix ls conventions.
//
// Returns "-" for zero time (i.e. no mtime metadata preserved). Otherwise:
// recent files (within roughly the last 6 months, and up to 24h in the
// future to tolerate clock skew) show the clock form "Jan 02 15:04";
// older or further-future files show the year form instead.
//
// NOTE(review): the doc below claims both forms are 12 characters; the year
// layout literal shown here renders as 11 — it likely contains a double
// space (ls-style "Jan 02  2006") that display collapsed. Confirm against
// the companion length test before touching the literal.
//
// The output uses the timezone embedded in t (UTC for IPFS metadata).
func formatModTime(t time.Time) string {
	if t.IsZero() {
		// Zero time means the UnixFS entry carried no mtime metadata.
		return "-"
	}
	// Format: "Jan 02 15:04" for times within the last 6 months
	// Format: "Jan 02 2006" for older times (similar to ls)
	now := time.Now()
	sixMonthsAgo := now.AddDate(0, -6, 0)
	if t.After(sixMonthsAgo) && t.Before(now.Add(24*time.Hour)) {
		return t.Format("Jan 02 15:04")
	}
	return t.Format("Jan 02 2006")
}
func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash string, ignoreBreaks bool) string {
headers, _ := req.Options[lsHeadersOptionNameTime].(bool)
stream, _ := req.Options[lsStreamOptionName].(bool)
size, _ := req.Options[lsSizeOptionName].(bool)
long, _ := req.Options[lsLongOptionName].(bool)
// in streaming mode we can't automatically align the tabs
// so we take a best guess
var minTabWidth int
@ -242,9 +374,21 @@ func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash
fmt.Fprintf(tw, "%s:\n", object.Hash)
}
if headers {
s := "Hash\tName"
if size {
s = "Hash\tSize\tName"
var s string
if long {
// Long format: Mode Hash [Size] ModTime Name
if size {
s = "Mode\tHash\tSize\tModTime\tName"
} else {
s = "Mode\tHash\tModTime\tName"
}
} else {
// Standard format: Hash [Size] Name
if size {
s = "Hash\tSize\tName"
} else {
s = "Hash\tName"
}
}
fmt.Fprintln(tw, s)
}
@ -253,23 +397,54 @@ func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash
for _, link := range object.Links {
var s string
switch link.Type {
case unixfs.TDirectory, unixfs.THAMTShard, unixfs.TMetadata:
if size {
s = "%[1]s\t-\t%[3]s/\n"
} else {
s = "%[1]s\t%[3]s/\n"
}
default:
if size {
s = "%s\t%v\t%s\n"
} else {
s = "%[1]s\t%[3]s\n"
}
}
isDir := link.Type == unixfs.TDirectory || link.Type == unixfs.THAMTShard || link.Type == unixfs.TMetadata
// TODO: Print link.Mode and link.ModTime?
fmt.Fprintf(tw, s, link.Hash, link.Size, cmdenv.EscNonPrint(link.Name))
if long {
// Long format: Mode Hash Size ModTime Name
var mode string
if link.Mode == 0 {
// No mode metadata preserved. Show "-" to indicate
// "not available" rather than "----------" (mode 0000).
mode = "-"
} else {
mode = formatMode(link.Mode)
}
modTime := formatModTime(link.ModTime)
if isDir {
if size {
s = "%s\t%s\t-\t%s\t%s/\n"
} else {
s = "%s\t%s\t%s\t%s/\n"
}
fmt.Fprintf(tw, s, mode, link.Hash, modTime, cmdenv.EscNonPrint(link.Name))
} else {
if size {
s = "%s\t%s\t%v\t%s\t%s\n"
fmt.Fprintf(tw, s, mode, link.Hash, link.Size, modTime, cmdenv.EscNonPrint(link.Name))
} else {
s = "%s\t%s\t%s\t%s\n"
fmt.Fprintf(tw, s, mode, link.Hash, modTime, cmdenv.EscNonPrint(link.Name))
}
}
} else {
// Standard format: Hash [Size] Name
switch {
case isDir:
if size {
s = "%[1]s\t-\t%[3]s/\n"
} else {
s = "%[1]s\t%[3]s/\n"
}
default:
if size {
s = "%s\t%v\t%s\n"
} else {
s = "%[1]s\t%[3]s\n"
}
}
fmt.Fprintf(tw, s, link.Hash, link.Size, cmdenv.EscNonPrint(link.Name))
}
}
}
tw.Flush()

189
core/commands/ls_test.go Normal file
View File

@ -0,0 +1,189 @@
package commands
import (
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestFormatMode(t *testing.T) {
t.Parallel()
tests := []struct {
name string
mode os.FileMode
expected string
}{
// File types
{
name: "regular file with rw-r--r--",
mode: 0644,
expected: "-rw-r--r--",
},
{
name: "regular file with rwxr-xr-x",
mode: 0755,
expected: "-rwxr-xr-x",
},
{
name: "regular file with no permissions",
mode: 0,
expected: "----------",
},
{
name: "regular file with full permissions",
mode: 0777,
expected: "-rwxrwxrwx",
},
{
name: "directory with rwxr-xr-x",
mode: os.ModeDir | 0755,
expected: "drwxr-xr-x",
},
{
name: "directory with rwx------",
mode: os.ModeDir | 0700,
expected: "drwx------",
},
{
name: "symlink with rwxrwxrwx",
mode: os.ModeSymlink | 0777,
expected: "lrwxrwxrwx",
},
{
name: "named pipe with rw-r--r--",
mode: os.ModeNamedPipe | 0644,
expected: "prw-r--r--",
},
{
name: "socket with rw-rw-rw-",
mode: os.ModeSocket | 0666,
expected: "srw-rw-rw-",
},
{
name: "block device with rw-rw----",
mode: os.ModeDevice | 0660,
expected: "brw-rw----",
},
{
name: "character device with rw-rw-rw-",
mode: os.ModeDevice | os.ModeCharDevice | 0666,
expected: "crw-rw-rw-",
},
// Special permission bits - setuid
{
name: "setuid with execute",
mode: os.ModeSetuid | 0755,
expected: "-rwsr-xr-x",
},
{
name: "setuid without execute",
mode: os.ModeSetuid | 0644,
expected: "-rwSr--r--",
},
// Special permission bits - setgid
{
name: "setgid with execute",
mode: os.ModeSetgid | 0755,
expected: "-rwxr-sr-x",
},
{
name: "setgid without execute",
mode: os.ModeSetgid | 0745,
expected: "-rwxr-Sr-x",
},
// Special permission bits - sticky
{
name: "sticky with execute",
mode: os.ModeSticky | 0755,
expected: "-rwxr-xr-t",
},
{
name: "sticky without execute",
mode: os.ModeSticky | 0754,
expected: "-rwxr-xr-T",
},
// Combined special bits
{
name: "setuid + setgid + sticky all with execute",
mode: os.ModeSetuid | os.ModeSetgid | os.ModeSticky | 0777,
expected: "-rwsrwsrwt",
},
{
name: "setuid + setgid + sticky none with execute",
mode: os.ModeSetuid | os.ModeSetgid | os.ModeSticky | 0666,
expected: "-rwSrwSrwT",
},
// Directory with special bits
{
name: "directory with sticky bit",
mode: os.ModeDir | os.ModeSticky | 0755,
expected: "drwxr-xr-t",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
result := formatMode(tc.mode)
assert.Equal(t, tc.expected, result)
})
}
}
// TestFormatModTime exercises the three output shapes of formatModTime:
// "-" for the zero time, the clock form for recent times, and the year
// form for old or far-future times, plus the fixed-width invariant used
// for column alignment.
func TestFormatModTime(t *testing.T) {
	t.Parallel()

	t.Run("zero time returns dash", func(t *testing.T) {
		t.Parallel()
		result := formatModTime(time.Time{})
		assert.Equal(t, "-", result)
	})

	t.Run("old time shows year format", func(t *testing.T) {
		t.Parallel()
		// Use a time clearly in the past (more than 6 months ago)
		oldTime := time.Date(2020, time.March, 15, 10, 30, 0, 0, time.UTC)
		result := formatModTime(oldTime)
		// Format: "Jan 02 2006" (note: two spaces before year)
		assert.Equal(t, "Mar 15 2020", result)
	})

	t.Run("very old time shows year format", func(t *testing.T) {
		t.Parallel()
		veryOldTime := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
		result := formatModTime(veryOldTime)
		assert.Equal(t, "Jan 01 2000", result)
	})

	t.Run("future time shows year format", func(t *testing.T) {
		t.Parallel()
		// Times more than 24h in the future should show year format
		futureTime := time.Now().AddDate(1, 0, 0)
		result := formatModTime(futureTime)
		// Should contain the future year
		assert.Contains(t, result, " ") // two spaces before year
		assert.Regexp(t, `^[A-Z][a-z]{2} \d{2} \d{4}$`, result) // matches "Mon DD YYYY"
		assert.Contains(t, result, futureTime.Format("2006")) // contains the year
	})

	t.Run("format lengths are consistent", func(t *testing.T) {
		t.Parallel()
		// Both formats should produce 12-character strings for alignment
		oldTime := time.Date(2020, time.March, 15, 10, 30, 0, 0, time.UTC)
		oldResult := formatModTime(oldTime)
		assert.Len(t, oldResult, 12, "old time format should be 12 chars")

		// Recent time: use 1 month ago to ensure it's always within the 6-month window
		recentTime := time.Now().AddDate(0, -1, 0)
		recentResult := formatModTime(recentTime)
		assert.Len(t, recentResult, 12, "recent time format should be 12 chars")
	})
}

View File

@ -8,26 +8,35 @@ import (
"net/http"
"slices"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
mbase "github.com/multiformats/go-multibase"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cmds "github.com/ipfs/go-ipfs-cmds"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
options "github.com/ipfs/kubo/core/coreiface/options"
"github.com/ipfs/kubo/core/node/libp2p"
"github.com/libp2p/go-libp2p/core/peer"
mbase "github.com/multiformats/go-multibase"
)
var PubsubCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "An experimental publish-subscribe system on ipfs.",
ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
The default message validator is designed for IPNS record protocol.
For custom pubsub applications requiring different validation logic,
use go-libp2p-pubsub (https://github.com/libp2p/go-libp2p-pubsub)
directly in a dedicated binary.
To enable, set 'Pubsub.Enabled' config to true.
`,
},
Subcommands: map[string]*cmds.Command{
@ -35,6 +44,7 @@ DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
"sub": PubsubSubCmd,
"ls": PubsubLsCmd,
"peers": PubsubPeersCmd,
"reset": PubsubResetCmd,
},
}
@ -46,17 +56,18 @@ type pubsubMessage struct {
}
var PubsubSubCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Subscribe to messages on a given topic.",
ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
PEER ENCODING
@ -145,18 +156,19 @@ TOPIC AND DATA ENCODING
}
var PubsubPubCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "Publish data to a given pubsub topic.",
ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.
It reads binary data from stdin or a file.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
HTTP RPC ENCODING
@ -201,17 +213,18 @@ HTTP RPC ENCODING
}
var PubsubLsCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "List subscribed topics by name.",
ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
TOPIC ENCODING
@ -273,7 +286,7 @@ func safeTextListEncoder(req *cmds.Request, w io.Writer, list *stringList) error
}
var PubsubPeersCmd = &cmds.Command{
Status: cmds.Deprecated,
Status: cmds.Experimental,
Helptext: cmds.HelpText{
Tagline: "List peers we are currently pubsubbing with.",
ShortDescription: `
@ -281,11 +294,12 @@ ipfs pubsub peers with no arguments lists out the pubsub peers you are
currently connected to. If given a topic, it will list connected peers who are
subscribed to the named topic.
DEPRECATED FEATURE (see https://github.com/ipfs/kubo/issues/9717)
EXPERIMENTAL FEATURE
It is not intended in its current state to be used in a production
environment. To use, the daemon must be run with
'--enable-pubsub-experiment'.
This is an opt-in feature optimized for IPNS over PubSub
(https://specs.ipfs.tech/ipns/ipns-pubsub-router/).
To enable, set 'Pubsub.Enabled' config to true.
TOPIC AND DATA ENCODING
@ -367,3 +381,122 @@ func urlArgsDecoder(req *cmds.Request, env cmds.Environment) error {
}
return nil
}
// pubsubResetResult is the JSON/text payload emitted by 'ipfs pubsub reset'.
type pubsubResetResult struct {
	Deleted int64 `json:"deleted"` // number of per-peer seqno entries removed
}
// PubsubResetCmd implements 'ipfs pubsub reset': it deletes the persistent
// seqno validator state stored under libp2p.SeqnoStorePrefix, either for a
// single peer (--peer) or for all peers. Intended for testing and for
// recovering from a peer that published artificially high sequence numbers.
var PubsubResetCmd = &cmds.Command{
	Status: cmds.Experimental,
	Helptext: cmds.HelpText{
		Tagline: "Reset pubsub validator state.",
		ShortDescription: `
Clears persistent sequence number state used by the pubsub validator.
WARNING: FOR TESTING ONLY - DO NOT USE IN PRODUCTION
Resets validator state that protects against replay attacks. After reset,
previously seen messages may be accepted again until their sequence numbers
are re-learned.
Use cases:
- Testing pubsub functionality
- Recovery from a peer sending artificially high sequence numbers
(which would cause subsequent messages from that peer to be rejected)
The --peer flag limits the reset to a specific peer's state.
Without --peer, all validator state is cleared.
NOTE: This only resets the persistent seqno validator state. The in-memory
seen messages cache (Pubsub.SeenMessagesTTL) auto-expires and can only be
fully cleared by restarting the daemon.
`,
	},
	Options: []cmds.Option{
		cmds.StringOption(peerOptionName, "p", "Only reset state for this peer ID"),
	},
	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
		n, err := cmdenv.GetNode(env)
		if err != nil {
			return err
		}
		ds := n.Repo.Datastore()
		ctx := req.Context

		peerOpt, _ := req.Options[peerOptionName].(string)

		var deleted int64
		if peerOpt != "" {
			// Reset specific peer
			pid, err := peer.Decode(peerOpt)
			if err != nil {
				return fmt.Errorf("invalid peer ID: %w", err)
			}
			// Per-peer state lives at SeqnoStorePrefix + canonical peer ID.
			key := datastore.NewKey(libp2p.SeqnoStorePrefix + pid.String())
			// Has-then-Delete lets us report whether anything existed;
			// Deleted stays 0 when there was no state for this peer.
			exists, err := ds.Has(ctx, key)
			if err != nil {
				return fmt.Errorf("failed to check seqno state: %w", err)
			}
			if exists {
				if err := ds.Delete(ctx, key); err != nil {
					return fmt.Errorf("failed to delete seqno state: %w", err)
				}
				deleted = 1
			}
		} else {
			// Reset all peers using batched delete for efficiency
			q := query.Query{
				Prefix:   libp2p.SeqnoStorePrefix,
				KeysOnly: true,
			}
			results, err := ds.Query(ctx, q)
			if err != nil {
				return fmt.Errorf("failed to query seqno state: %w", err)
			}
			defer results.Close()

			batch, err := ds.Batch(ctx)
			if err != nil {
				return fmt.Errorf("failed to create batch: %w", err)
			}
			// Queue every matching key into the batch, then commit once.
			for result := range results.Next() {
				if result.Error != nil {
					return fmt.Errorf("query error: %w", result.Error)
				}
				if err := batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil {
					return fmt.Errorf("failed to batch delete key %s: %w", result.Key, err)
				}
				deleted++
			}
			if err := batch.Commit(ctx); err != nil {
				return fmt.Errorf("failed to commit batch delete: %w", err)
			}
		}

		// Sync to ensure deletions are persisted
		if err := ds.Sync(ctx, datastore.NewKey(libp2p.SeqnoStorePrefix)); err != nil {
			return fmt.Errorf("failed to sync datastore: %w", err)
		}

		return cmds.EmitOnce(res, &pubsubResetResult{Deleted: deleted})
	},
	Type: pubsubResetResult{},
	Encoders: cmds.EncoderMap{
		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, result *pubsubResetResult) error {
			// Tailor the message to whether a single peer was targeted.
			peerOpt, _ := req.Options[peerOptionName].(string)
			if peerOpt != "" {
				if result.Deleted == 0 {
					_, err := fmt.Fprintf(w, "No validator state found for peer %s\n", peerOpt)
					return err
				}
				_, err := fmt.Fprintf(w, "Reset validator state for peer %s\n", peerOpt)
				return err
			}
			_, err := fmt.Fprintf(w, "Reset validator state for %d peer(s)\n", result.Deleted)
			return err
		}),
	},
}

View File

@ -17,6 +17,7 @@ import (
dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/provider"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
ipld "github.com/ipfs/go-ipld-format"
@ -296,9 +297,9 @@ Trigger reprovider to announce our data to network.
if cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0 {
return errors.New("invalid configuration: Provide.DHT.Interval is set to '0'")
}
provideSys, ok := nd.Provider.(*node.LegacyProvider)
provideSys, ok := nd.Provider.(provider.Reprovider)
if !ok {
return errors.New("manual reprovide not available with experimental sweeping provider (Provide.DHT.SweepEnabled=true)")
return errors.New("manual reprovide only available with legacy provider (Provide.DHT.SweepEnabled=false)")
}
err = provideSys.Reprovide(req.Context)

View File

@ -112,6 +112,7 @@ func Libp2pGatewayOption() ServeOption {
Menu: nil,
// Apply timeout and concurrency limits from user config
RetrievalTimeout: cfg.Gateway.RetrievalTimeout.WithDefault(config.DefaultRetrievalTimeout),
MaxRequestDuration: cfg.Gateway.MaxRequestDuration.WithDefault(config.DefaultMaxRequestDuration),
MaxConcurrentRequests: int(cfg.Gateway.MaxConcurrentRequests.WithDefault(int64(config.DefaultMaxConcurrentRequests))),
MaxRangeRequestFileSize: int64(cfg.Gateway.MaxRangeRequestFileSize.WithDefault(uint64(config.DefaultMaxRangeRequestFileSize))),
DiagnosticServiceURL: "", // Not used since DisableHTMLErrors=true
@ -272,6 +273,7 @@ func getGatewayConfig(n *core.IpfsNode) (gateway.Config, map[string][]string, er
NoDNSLink: cfg.Gateway.NoDNSLink,
PublicGateways: map[string]*gateway.PublicGateway{},
RetrievalTimeout: cfg.Gateway.RetrievalTimeout.WithDefault(config.DefaultRetrievalTimeout),
MaxRequestDuration: cfg.Gateway.MaxRequestDuration.WithDefault(config.DefaultMaxRequestDuration),
MaxConcurrentRequests: int(cfg.Gateway.MaxConcurrentRequests.WithDefault(int64(config.DefaultMaxConcurrentRequests))),
MaxRangeRequestFileSize: int64(cfg.Gateway.MaxRangeRequestFileSize.WithDefault(uint64(config.DefaultMaxRangeRequestFileSize))),
DiagnosticServiceURL: cfg.Gateway.DiagnosticServiceURL.WithDefault(config.DefaultDiagnosticServiceURL),

View File

@ -1,26 +1,85 @@
package libp2p
import (
"context"
"errors"
"log/slog"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)
// pubsubParams groups the dependencies the FloodSub/GossipSub constructors
// receive from the fx dependency-injection container (fx.In marks this as an
// fx parameter struct).
type pubsubParams struct {
	fx.In

	Repo      repo.Repo           // node repo; its datastore backs the seqno validator
	Host      host.Host           // libp2p host the pubsub instance attaches to
	Discovery discovery.Discovery // peer discovery wired in via pubsub.WithDiscovery
}
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params pubsubParams) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(
helpers.LifecycleCtx(mctx, lc),
params.Host,
append(pubsubOptions,
pubsub.WithDiscovery(params.Discovery),
pubsub.WithDefaultValidator(newSeqnoValidator(params.Repo.Datastore())))...,
)
}
}
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(
pubsubOptions,
pubsub.WithDiscovery(disc),
pubsub.WithFloodPublish(true))...,
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params pubsubParams) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(
helpers.LifecycleCtx(mctx, lc),
params.Host,
append(pubsubOptions,
pubsub.WithDiscovery(params.Discovery),
pubsub.WithFloodPublish(true), // flood own publications to all peers for reliable IPNS delivery
pubsub.WithDefaultValidator(newSeqnoValidator(params.Repo.Datastore())))...,
)
}
}
// newSeqnoValidator returns a pubsub validator that persists the maximum
// sequence number seen per peer in ds, rejecting messages that do not advance
// it. This provides duplicate detection that survives node restarts.
func newSeqnoValidator(ds datastore.Datastore) pubsub.ValidatorEx {
	logger := slog.New(logging.SlogHandler()).With("logger", "pubsub")
	store := &seqnoStore{ds: ds}
	return pubsub.NewBasicSeqnoValidator(store, logger)
}
// SeqnoStorePrefix is the datastore prefix for pubsub seqno validator state.
// Full keys have the form SeqnoStorePrefix + <peer ID string>, as written by
// seqnoStore.Put.
const SeqnoStorePrefix = "/pubsub/seqno/"

// seqnoStore implements pubsub.PeerMetadataStore using the repo datastore.
// It stores the maximum seen sequence number per peer to prevent message
// cycles when network diameter exceeds the timecache span.
type seqnoStore struct {
	ds datastore.Datastore // backing datastore; values are opaque seqno bytes
}

// Compile-time check that seqnoStore satisfies pubsub.PeerMetadataStore.
var _ pubsub.PeerMetadataStore = (*seqnoStore)(nil)
// Get returns the stored seqno bytes for peer p, or (nil, nil) when no state
// exists for that peer. Mapping "not found" to (nil, nil) lets
// BasicSeqnoValidator accept the first message from any peer without
// special-casing.
func (s *seqnoStore) Get(ctx context.Context, p peer.ID) ([]byte, error) {
	data, err := s.ds.Get(ctx, datastore.NewKey(SeqnoStorePrefix+p.String()))
	switch {
	case err == nil:
		return data, nil
	case errors.Is(err, datastore.ErrNotFound):
		// Unknown peer is not an error for the validator.
		return nil, nil
	default:
		return data, err
	}
}
// Put records val as the seqno state for peer p under the SeqnoStorePrefix
// keyspace, overwriting any previous value.
func (s *seqnoStore) Put(ctx context.Context, p peer.ID, val []byte) error {
	return s.ds.Put(ctx, datastore.NewKey(SeqnoStorePrefix+p.String()), val)
}

View File

@ -0,0 +1,130 @@
package libp2p
import (
"encoding/binary"
"testing"
"github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
// TestSeqnoStore tests the seqnoStore implementation which backs the
// BasicSeqnoValidator. The validator prevents message cycles when network
// diameter exceeds the timecache span by tracking the maximum sequence number
// seen from each peer.
// TestSeqnoStore tests the seqnoStore implementation which backs the
// BasicSeqnoValidator. The validator prevents message cycles when network
// diameter exceeds the timecache span by tracking the maximum sequence number
// seen from each peer.
func TestSeqnoStore(t *testing.T) {
	ctx := t.Context()
	ds := syncds.MutexWrap(datastore.NewMapDatastore())
	store := &seqnoStore{ds: ds}

	// encode produces a sequence number in the big-endian uint64 wire format
	// used by BasicSeqnoValidator. Extracted to avoid repeating the same
	// three-line encoding boilerplate in every subtest.
	encode := func(seqno uint64) []byte {
		buf := make([]byte, 8)
		binary.BigEndian.PutUint64(buf, seqno)
		return buf
	}

	peerA, err := peer.Decode("12D3KooWGC6TvWhfapngX6wvJHMYvKpDMXPb3ZnCZ6dMoaMtimQ5")
	require.NoError(t, err)
	peerB, err := peer.Decode("12D3KooWJRqDKTRjvXeGdUEgwkHNsoghYMBUagNYgLPdA4mqdTeo")
	require.NoError(t, err)

	// BasicSeqnoValidator expects Get to return (nil, nil) for unknown peers,
	// not an error. This allows the validator to accept the first message from
	// any peer without special-casing.
	t.Run("unknown peer returns nil without error", func(t *testing.T) {
		val, err := store.Get(ctx, peerA)
		require.NoError(t, err)
		require.Nil(t, val, "unknown peer should return nil, not empty slice")
	})

	// Verify basic store/retrieve functionality with a sequence number encoded
	// as big-endian uint64, matching the format used by BasicSeqnoValidator.
	t.Run("stores and retrieves seqno", func(t *testing.T) {
		seqno := uint64(12345)
		require.NoError(t, store.Put(ctx, peerA, encode(seqno)))

		val, err := store.Get(ctx, peerA)
		require.NoError(t, err)
		require.Equal(t, seqno, binary.BigEndian.Uint64(val))
	})

	// Each peer must have isolated storage. If peer data leaked between peers,
	// the validator would incorrectly reject valid messages or accept replays.
	t.Run("isolates seqno per peer", func(t *testing.T) {
		seqnoA := uint64(100)
		seqnoB := uint64(200)
		require.NoError(t, store.Put(ctx, peerA, encode(seqnoA)))
		require.NoError(t, store.Put(ctx, peerB, encode(seqnoB)))

		valA, err := store.Get(ctx, peerA)
		require.NoError(t, err)
		require.Equal(t, seqnoA, binary.BigEndian.Uint64(valA))

		valB, err := store.Get(ctx, peerB)
		require.NoError(t, err)
		require.Equal(t, seqnoB, binary.BigEndian.Uint64(valB))
	})

	// The validator updates the stored seqno when accepting messages with
	// higher seqnos. This test verifies that updates work correctly.
	t.Run("updates seqno to higher value", func(t *testing.T) {
		require.NoError(t, store.Put(ctx, peerA, encode(1000)))
		require.NoError(t, store.Put(ctx, peerA, encode(2000)))

		val, err := store.Get(ctx, peerA)
		require.NoError(t, err)
		require.Equal(t, uint64(2000), binary.BigEndian.Uint64(val))
	})

	// Verify the datastore key format. This is important for:
	// 1. Debugging: operators can inspect/clear pubsub state
	// 2. Migrations: future changes need to know the key format
	t.Run("uses expected datastore key format", func(t *testing.T) {
		seqno := uint64(42)
		require.NoError(t, store.Put(ctx, peerA, encode(seqno)))

		// Read directly from the datastore with the literal key to pin the
		// on-disk layout, independent of the SeqnoStorePrefix constant.
		expectedKey := datastore.NewKey("/pubsub/seqno/" + peerA.String())
		val, err := ds.Get(ctx, expectedKey)
		require.NoError(t, err)
		require.Equal(t, seqno, binary.BigEndian.Uint64(val))
	})

	// Verify data persists when creating a new store instance with the same
	// underlying datastore. This simulates node restart.
	t.Run("persists across store instances", func(t *testing.T) {
		seqno := uint64(99999)
		require.NoError(t, store.Put(ctx, peerB, encode(seqno)))

		// Create new store instance with same datastore.
		store2 := &seqnoStore{ds: ds}
		val, err := store2.Get(ctx, peerB)
		require.NoError(t, err)
		require.Equal(t, seqno, binary.BigEndian.Uint64(val))
	})
}

View File

@ -16,6 +16,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
log "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
@ -53,6 +54,8 @@ const (
keystoreDatastorePath = "keystore"
)
var errAcceleratedDHTNotReady = errors.New("AcceleratedDHTClient: routing table not ready")
// Interval between reprovide queue monitoring checks for slow reprovide alerts.
// Used when Provide.DHT.SweepEnabled=true
const reprovideAlertPollInterval = 15 * time.Minute
@ -325,13 +328,34 @@ type dhtImpl interface {
// fullrtRouter wraps fullrt.FullRT so that GetClosestPeers can refuse work
// (and log state transitions) while the accelerated DHT client's initial
// network crawl has not completed.
type fullrtRouter struct {
	*fullrt.FullRT
	// ready caches the last observed readiness so transitions are logged once.
	// NOTE(review): read and written without synchronization in
	// GetClosestPeers — confirm callers never invoke it concurrently.
	ready bool
	// logger is the named go-log logger used for readiness messages.
	logger *log.ZapEventLogger
}
// newFullRTRouter wraps fr in a fullrtRouter that logs readiness transitions
// through the go-log logger named loggerName. The router starts in the ready
// state.
func newFullRTRouter(fr *fullrt.FullRT, loggerName string) *fullrtRouter {
	router := &fullrtRouter{
		FullRT: fr,
		logger: log.Logger(loggerName),
		ready:  true,
	}
	return router
}
// GetClosestPeers overrides fullrt.FullRT's GetClosestPeers and returns an
// error if the fullrt's initial network crawl isn't complete yet.
func (fr *fullrtRouter) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
if !fr.Ready() {
return nil, errors.New("fullrt: initial network crawl still running")
if fr.ready {
if !fr.Ready() {
fr.ready = false
fr.logger.Info("AcceleratedDHTClient: waiting for routing table initialization (5-10 min, depends on DHT size and network) to complete before providing")
return nil, errAcceleratedDHTNotReady
}
} else {
if fr.Ready() {
fr.ready = true
fr.logger.Info("AcceleratedDHTClient: routing table ready, providing can begin")
} else {
return nil, errAcceleratedDHTNotReady
}
}
return fr.FullRT.GetClosestPeers(ctx, key)
}
@ -382,6 +406,9 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
// provides happen as fast as possible via a dedicated worker that continuously
// processes the queue regardless of this timing.
bufferedIdleWriteTime = time.Minute
// loggerName is the name of the go-log logger used by the provider.
loggerName = dhtprovider.DefaultLoggerName
)
bufferedProviderOpts := []buffered.Option{
@ -411,6 +438,8 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
ddhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
ddhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),
ddhtprovider.WithLoggerName(loggerName),
)
if err != nil {
return nil, nil, err
@ -419,7 +448,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
}
case *fullrt.FullRT:
if inDht != nil {
impl = &fullrtRouter{inDht}
impl = newFullRTRouter(inDht, loggerName)
}
}
if impl == nil {
@ -454,6 +483,8 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
dhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
dhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
dhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),
dhtprovider.WithLoggerName(loggerName),
}
prov, err := dhtprovider.New(opts...)

View File

@ -13,11 +13,15 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team.
- [🧹 Automatic cleanup of interrupted imports](#-automatic-cleanup-of-interrupted-imports)
- [Routing V1 HTTP API now exposed by default](#routing-v1-http-api-now-exposed-by-default)
- [Track total size when adding pins](#track-total-size-when-adding-pins)
- [Improved IPNS over PubSub validation](#improved-ipns-over-pubsub-validation)
- [New `ipfs diag datastore` commands](#new-ipfs-diag-datastore-commands)
- [🚇 Improved `ipfs p2p` tunnels with foreground mode](#-improved-ipfs-p2p-tunnels-with-foreground-mode)
- [Improved `ipfs dag stat` output](#improved-ipfs-dag-stat-output)
- [Skip bad keys when listing](#skip-bad-keys-when-listing)
- [Accelerated DHT Client and Provide Sweep now work together](#accelerated-dht-client-and-provide-sweep-now-work-together)
- [⏱️ Configurable gateway request duration limit](#-configurable-gateway-request-duration-limit)
- [🔧 Recovery from corrupted MFS root](#-recovery-from-corrupted-mfs-root)
- [📋 Long listing format for `ipfs ls`](#-long-listing-format-for-ipfs-ls)
- [📦️ Dependency updates](#-dependency-updates)
- [📝 Changelog](#-changelog)
- [👨‍👩‍👧‍👦 Contributors](#-contributors)
@ -48,6 +52,23 @@ Example output:
Fetched/Processed 336 nodes (83 MB)
```
#### Improved IPNS over PubSub validation
[IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/) implementation in Kubo is now more reliable. Duplicate messages are rejected even in large networks where messages may cycle back after the in-memory cache expires.
Kubo now persists the maximum seen sequence number per peer to the datastore ([go-libp2p-pubsub#BasicSeqnoValidator](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#BasicSeqnoValidator)), providing stronger duplicate detection that survives node restarts. This addresses message flooding issues reported in [#9665](https://github.com/ipfs/kubo/issues/9665).
Kubo's pubsub is optimized for the IPNS use case. For custom pubsub applications requiring different validation logic, use [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub) directly in a dedicated binary.
#### New `ipfs diag datastore` commands
New experimental commands for low-level datastore inspection:
- `ipfs diag datastore get <key>` - Read raw value at a datastore key (use `--hex` for hex dump)
- `ipfs diag datastore count <prefix>` - Count entries matching a datastore prefix
The daemon must not be running when using these commands. Run `ipfs diag datastore --help` for usage examples.
#### 🚇 Improved `ipfs p2p` tunnels with foreground mode
P2P tunnels can now run like SSH port forwarding: start a tunnel, use it, and it cleans up automatically when you're done.
@ -97,6 +118,12 @@ Change the `ipfs key list` behavior to log an error and continue listing keys wh
Previously, provide operations could start before the Accelerated DHT Client discovered enough peers, causing sweep mode to lose its efficiency benefits. Now, providing waits for the initial network crawl (about 10 minutes). Your content will be properly distributed across DHT regions after initial DHT map is created. Check `ipfs provide stat` to see when providing begins.
#### ⏱️ Configurable gateway request duration limit
[`Gateway.MaxRequestDuration`](https://github.com/ipfs/kubo/blob/master/docs/config.md#gatewaymaxrequestduration) sets an absolute deadline for gateway requests. Unlike `RetrievalTimeout` (which resets on each data write and catches stalled transfers), this is a hard limit on the total time a request can take.
The default 1 hour limit (previously hardcoded) can now be adjusted to fit your deployment needs. This is a fallback that prevents requests from hanging indefinitely when subsystem timeouts are misconfigured or fail to trigger. Returns 504 Gateway Timeout when exceeded.
#### 🔧 Recovery from corrupted MFS root
If your daemon fails to start because the MFS root is not a directory (due to misconfiguration, operational error, or disk corruption), you can now recover without deleting and recreating your repository in a new `IPFS_PATH`.
@ -113,6 +140,10 @@ $ ipfs files chroot --confirm QmYourBackupCID
See `ipfs files chroot --help` for details.
#### 📋 Long listing format for `ipfs ls`
The `ipfs ls` command now supports `--long` (`-l`) flag for displaying Unix-style file permissions and modification times. This works with files added using `--preserve-mode` and `--preserve-mtime`. See `ipfs ls --help` for format details and examples.
#### 📦️ Dependency updates
- update `go-libp2p` to [v0.46.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.46.0)
@ -120,8 +151,8 @@ See `ipfs files chroot --help` for details.
- Fixed mDNS discovery on Windows and macOS by filtering addresses to reduce packet size ([go-libp2p#3434](https://github.com/libp2p/go-libp2p/pull/3434)).
- update `quic-go` to [v0.57.1](https://github.com/quic-go/quic-go/releases/tag/v0.57.1) (incl. [v0.56.0](https://github.com/quic-go/quic-go/releases/tag/v0.56.0) + [v0.57.0](https://github.com/quic-go/quic-go/releases/tag/v0.57.0))
- update `p2p-forge` to [v0.7.0](https://github.com/ipshipyard/p2p-forge/releases/tag/v0.7.0)
- update `go-ds-pebble` to [v0.5.8](https://github.com/ipfs/go-ds-pebble/releases/tag/v0.5.8)
- updates `github.com/cockroachdb/pebble` to [v2.1.3](https://github.com/cockroachdb/pebble/releases/tag/v2.1.3) to enable Go 1.26 support
- update `go-ds-pebble` to [v0.5.9](https://github.com/ipfs/go-ds-pebble/releases/tag/v0.5.9)
- updates `github.com/cockroachdb/pebble` to [v2.1.4](https://github.com/cockroachdb/pebble/releases/tag/v2.1.4) to enable Go 1.26 support
- update `go-libp2p-pubsub` to [v0.15.0](https://github.com/libp2p/go-libp2p-pubsub/releases/tag/v0.15.0)
### 📝 Changelog

View File

@ -67,6 +67,7 @@ config file at runtime.
- [`Gateway.DisableHTMLErrors`](#gatewaydisablehtmlerrors)
- [`Gateway.ExposeRoutingAPI`](#gatewayexposeroutingapi)
- [`Gateway.RetrievalTimeout`](#gatewayretrievaltimeout)
- [`Gateway.MaxRequestDuration`](#gatewaymaxrequestduration)
- [`Gateway.MaxRangeRequestFileSize`](#gatewaymaxrangerequestfilesize)
- [`Gateway.MaxConcurrentRequests`](#gatewaymaxconcurrentrequests)
- [`Gateway.HTTPHeaders`](#gatewayhttpheaders)
@ -145,6 +146,8 @@ config file at runtime.
- [`Provider.Strategy`](#providerstrategy)
- [`Provider.WorkerCount`](#providerworkercount)
- [`Pubsub`](#pubsub)
- [When to use a dedicated pubsub node](#when-to-use-a-dedicated-pubsub-node)
- [Message deduplication](#message-deduplication)
- [`Pubsub.Enabled`](#pubsubenabled)
- [`Pubsub.Router`](#pubsubrouter)
- [`Pubsub.DisableSigning`](#pubsubdisablesigning)
@ -1178,6 +1181,16 @@ Default: `30s`
Type: `optionalDuration`
### `Gateway.MaxRequestDuration`
An absolute deadline for the entire gateway request. Unlike [`RetrievalTimeout`](#gatewayretrievaltimeout) (which resets on each data write and catches stalled transfers), this is a hard limit on the total time a request can take.
Returns 504 Gateway Timeout when exceeded. This protects the gateway from edge cases and slow client attacks.
Default: `1h`
Type: `optionalDuration`
### `Gateway.MaxRangeRequestFileSize`
Maximum file size for HTTP range requests on deserialized responses. Range requests for files larger than this limit return 501 Not Implemented.
@ -1776,7 +1789,7 @@ Type: `optionalDuration`
### `Ipns.UsePubsub`
Enables IPFS over pubsub experiment for publishing IPNS records in real time.
Enables [IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/) for publishing and resolving IPNS records in real time.
**EXPERIMENTAL:** read about current limitations at [experimental-features.md#ipns-pubsub](./experimental-features.md#ipns-pubsub).
@ -2394,16 +2407,56 @@ Replaced with [`Provide.DHT.MaxWorkers`](#providedhtmaxworkers).
## `Pubsub`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Pubsub configures Kubo's opt-in, opinionated [libp2p pubsub](https://docs.libp2p.io/concepts/pubsub/overview/) instance.
To enable, set `Pubsub.Enabled` to `true`.
Pubsub configures the `ipfs pubsub` subsystem. To use it, enable it either by
passing the `--enable-pubsub-experiment` flag to the daemon
or by setting the `Pubsub.Enabled` flag below.
**EXPERIMENTAL:** This is an opt-in feature. Its primary use case is
[IPNS over PubSub](https://specs.ipfs.tech/ipns/ipns-pubsub-router/), which
enables real-time IPNS record propagation. See [`Ipns.UsePubsub`](#ipnsusepubsub)
for details.
The `ipfs pubsub` commands can also be used for basic publish/subscribe
operations, but only if Kubo's built-in message validation (described below) is
acceptable for your use case.
### When to use a dedicated pubsub node
Kubo's pubsub is optimized for IPNS. It uses opinionated message validation
that may not fit all applications. If you need custom Message ID computation,
different deduplication logic, or validation rules beyond what Kubo provides,
consider building a dedicated pubsub node using
[go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub) directly.
### Message deduplication
Kubo uses two layers of message deduplication to handle duplicate messages that
may arrive via different network paths:
**Layer 1: In-memory TimeCache (Message ID)**
When a message arrives, Kubo computes its Message ID (hash of the message
content) and checks an in-memory cache. If the ID was seen recently, the
message is dropped. This cache is controlled by:
- [`Pubsub.SeenMessagesTTL`](#pubsubseenmessagesttl) - how long Message IDs are remembered (default: 120s)
- [`Pubsub.SeenMessagesStrategy`](#pubsubseenmessagesstrategy) - whether TTL resets on each sighting
This cache is fast but limited: it only works within the TTL window and is
cleared on node restart.
**Layer 2: Persistent Seqno Validator (per-peer)**
For stronger deduplication, Kubo tracks the maximum sequence number seen from
each peer and persists it to the datastore. Messages with sequence numbers
lower than the recorded maximum are rejected. This prevents replay attacks and
handles message cycles in large networks where messages may take longer than
the TimeCache TTL to propagate.
This layer survives node restarts. The state can be inspected or cleared using
`ipfs pubsub reset` (for testing/recovery only).
### `Pubsub.Enabled`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Enables the pubsub system.
Default: `false`
@ -2412,8 +2465,6 @@ Type: `flag`
### `Pubsub.Router`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Sets the default router used by pubsub to route messages to peers. This can be one of:
- `"floodsub"` - floodsub is a basic router that simply _floods_ messages to all
@ -2429,10 +2480,9 @@ Type: `string` (one of `"floodsub"`, `"gossipsub"`, or `""` (apply default))
### `Pubsub.DisableSigning`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Disables message signing and signature verification.
Disables message signing and signature verification. Enable this option if
you're operating in a completely trusted network.
**FOR TESTING ONLY - DO NOT USE IN PRODUCTION**
It is _not_ safe to disable signing even if you don't care _who_ sent the
message because spoofed messages can be used to silence real messages by
@ -2444,20 +2494,12 @@ Type: `bool`
### `Pubsub.SeenMessagesTTL`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Controls the time window for the in-memory Message ID cache (Layer 1
deduplication). Messages with the same ID seen within this window are dropped.
Controls the time window within which duplicate messages, identified by Message
ID, will be identified and won't be emitted again.
A smaller value for this parameter means that Pubsub messages in the cache will
be garbage collected sooner, which can result in a smaller cache. At the same
time, if there are slower nodes in the network that forward older messages,
this can cause more duplicates to be propagated through the network.
Conversely, a larger value for this parameter means that Pubsub messages in the
cache will be garbage collected later, which can result in a larger cache for
the same traffic pattern. However, it is less likely that duplicates will be
propagated through the network.
A smaller value reduces memory usage but may cause more duplicates in networks
with slow nodes. A larger value uses more memory but provides better duplicate
detection within the time window.
Default: see `TimeCacheDuration` from [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub)
@ -2465,24 +2507,12 @@ Type: `optionalDuration`
### `Pubsub.SeenMessagesStrategy`
**DEPRECATED**: See [#9717](https://github.com/ipfs/kubo/issues/9717)
Determines how the TTL countdown for the Message ID cache works.
Determines how the time-to-live (TTL) countdown for deduplicating Pubsub
messages is calculated.
The Pubsub seen messages cache is a LRU cache that keeps messages for up to a
specified time duration. After this duration has elapsed, expired messages will
be purged from the cache.
The `last-seen` cache is a sliding-window cache. Every time a message is seen
again with the SeenMessagesTTL duration, its timestamp slides forward. This
keeps frequently occurring messages cached and prevents them from being
continually propagated, especially because of issues that might increase the
number of duplicate messages in the network.
The `first-seen` cache will store new messages and purge them after the
SeenMessagesTTL duration, even if they are seen multiple times within this
duration.
- `last-seen` - Sliding window: TTL resets each time the message is seen again.
Keeps frequently-seen messages in cache longer, preventing continued propagation.
- `first-seen` - Fixed window: TTL counts from first sighting only. Messages are
purged after the TTL regardless of how many times they're seen.
Default: `last-seen` (see [go-libp2p-pubsub](https://github.com/libp2p/go-libp2p-pubsub))

View File

@ -7,7 +7,7 @@ go 1.25
replace github.com/ipfs/kubo => ./../../..
require (
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2
github.com/ipfs/kubo v0.0.0-00010101000000-000000000000
github.com/libp2p/go-libp2p v0.46.0
github.com/multiformats/go-multiaddr v0.16.1
@ -18,7 +18,7 @@ require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/DataDog/zstd v1.5.7 // indirect
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 // indirect
github.com/RaduBerinde/axisds v0.1.0 // indirect
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 // indirect
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
@ -34,7 +34,7 @@ require (
github.com/cockroachdb/crlib v0.0.0-20241112164430-1264a2edc35b // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble/v2 v2.1.3 // indirect
github.com/cockroachdb/pebble/v2 v2.1.4 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
@ -81,7 +81,7 @@ require (
github.com/ipfs/go-ds-flatfs v0.6.0 // indirect
github.com/ipfs/go-ds-leveldb v0.5.2 // indirect
github.com/ipfs/go-ds-measure v0.2.2 // indirect
github.com/ipfs/go-ds-pebble v0.5.8 // indirect
github.com/ipfs/go-ds-pebble v0.5.9 // indirect
github.com/ipfs/go-dsqueue v0.1.1 // indirect
github.com/ipfs/go-fs-lock v0.1.1 // indirect
github.com/ipfs/go-ipfs-cmds v0.15.0 // indirect
@ -114,7 +114,7 @@ require (
github.com/libp2p/go-doh-resolver v0.5.0 // indirect
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.36.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.37.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.15.0 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect

View File

@ -27,8 +27,8 @@ github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS
github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU=
github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 h1:8XBWWQD+vFF+JqOsm16t0Kab1a7YWV8+GISVEP8AuZ8=
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y=
github.com/RaduBerinde/axisds v0.1.0 h1:YItk/RmU5nvlsv/awo2Fjx97Mfpt4JfgtEVAGPrLdz8=
github.com/RaduBerinde/axisds v0.1.0/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y=
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 h1:bsU8Tzxr/PNz75ayvCnxKZWEYdLMPDkUgticP4a4Bvk=
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54/go.mod h1:0tr7FllbE9gJkHq7CVeeDDFAFKQVy5RnCSSNBOvdqbc=
github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo=
@ -84,8 +84,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA=
github.com/cockroachdb/pebble/v2 v2.1.3 h1:irU503OnjRoJBrkZQIJvwv9c4WvpUeOJxhRApojB8D8=
github.com/cockroachdb/pebble/v2 v2.1.3/go.mod h1:B1UgWsyR+L+UvZXNgpxw+WqsUKA8VQ/bb//FXOHghB8=
github.com/cockroachdb/pebble/v2 v2.1.4 h1:j9wPgMDbkErFdAKYFGhsoCcvzcjR+6zrJ4jhKtJ6bOk=
github.com/cockroachdb/pebble/v2 v2.1.4/go.mod h1:Reo1RTniv1UjVTAu/Fv74y5i3kJ5gmVrPhO9UtFiKn8=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b h1:VXvSNzmr8hMj8XTuY0PT9Ane9qZGul/p67vGYwl9BFI=
@ -265,8 +265,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.25.0 h1:OqNqsGZPX8zh3eFMO8Lf8EHRRnSGBMqcd
github.com/ipfs-shipyard/nopfs/ipfs v0.25.0/go.mod h1:BxhUdtBgOXg1B+gAPEplkg/GpyTZY+kCMSfsJvvydqU=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c h1:mczpALnNzNhmggehO5Ehr9+Q8+NiJyKJfT4EPwi01d0=
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c/go.mod h1:Abmp1if6bMQG87/0SQPIB9fkxJnZMLCt2nQw3yUZHH0=
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2 h1:pRQYSSGnGQa921d8v0uhXg2BGzoSf9ndTWTlR7ImVoo=
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2/go.mod h1:Abmp1if6bMQG87/0SQPIB9fkxJnZMLCt2nQw3yUZHH0=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk=
@ -295,8 +295,8 @@ github.com/ipfs/go-ds-leveldb v0.5.2 h1:6nmxlQ2zbp4LCNdJVsmHfs9GP0eylfBNxpmY1csp
github.com/ipfs/go-ds-leveldb v0.5.2/go.mod h1:2fAwmcvD3WoRT72PzEekHBkQmBDhc39DJGoREiuGmYo=
github.com/ipfs/go-ds-measure v0.2.2 h1:4kwvBGbbSXNYe4ANlg7qTIYoZU6mNlqzQHdVqICkqGI=
github.com/ipfs/go-ds-measure v0.2.2/go.mod h1:b/87ak0jMgH9Ylt7oH0+XGy4P8jHx9KG09Qz+pOeTIs=
github.com/ipfs/go-ds-pebble v0.5.8 h1:NbAfKQo+m39Nka6gt8PARAyH+VoHtRInB6CFCmT+wqo=
github.com/ipfs/go-ds-pebble v0.5.8/go.mod h1:AJjJTHgads/Fn5+tuWmaDGjGEbks7Wgx82NQ/pwmEhc=
github.com/ipfs/go-ds-pebble v0.5.9 h1:D1FEuMxjbEmDADNqsyT74n9QHVAn12nv9i9Qa15AFYc=
github.com/ipfs/go-ds-pebble v0.5.9/go.mod h1:XmUBN05l6B+tMg7mpMS75ZcKW/CX01uZMhhWw85imQA=
github.com/ipfs/go-dsqueue v0.1.1 h1:6PQlHDyf9PSTN69NmwUir5+0is3tU0vRJj8zLlgK8Mc=
github.com/ipfs/go-dsqueue v0.1.1/go.mod h1:Xxg353WSwwzYn3FGSzZ+taSQII3pIZ+EJC8/oWRDM10=
github.com/ipfs/go-fs-lock v0.1.1 h1:TecsP/Uc7WqYYatasreZQiP9EGRy4ZnKoG4yXxR33nw=
@ -401,8 +401,8 @@ github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl9
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-kad-dht v0.36.0 h1:7QuXhV36+Vyj+L6A7mrYkn2sYLrbRcbjvsYDu/gXhn8=
github.com/libp2p/go-libp2p-kad-dht v0.36.0/go.mod h1:O24LxTH9Rt3I5XU8nmiA9VynS4TrTwAyj+zBJKB05vQ=
github.com/libp2p/go-libp2p-kad-dht v0.37.0 h1:V1IkFzK9taNS1UNAx260foulcBPH+watAUFjNo2qMUY=
github.com/libp2p/go-libp2p-kad-dht v0.37.0/go.mod h1:o4FPa1ea++UVAMJ1c+kyjUmj3CKm9+ZCyzQb4uutCFM=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=

View File

@ -375,6 +375,8 @@ kubo now automatically shards when directory block is bigger than 256KB, ensurin
## IPNS pubsub
Specification: [IPNS PubSub Router](https://specs.ipfs.tech/ipns/ipns-pubsub-router/)
### In Version
0.4.14 :
@ -389,13 +391,18 @@ kubo now automatically shards when directory block is bigger than 256KB, ensurin
0.11.0 :
- Can be enabled via `Ipns.UsePubsub` flag in config
0.40.0 :
- Persistent message sequence number validation to prevent message cycles
in large networks
### State
Experimental, default-disabled.
Utilizes pubsub for publishing ipns records in real time.
Utilizes pubsub for publishing IPNS records in real time.
When it is enabled:
- IPNS publishers push records to a name-specific pubsub topic,
in addition to publishing to the DHT.
- IPNS resolvers subscribe to the name-specific topic on first
@ -404,9 +411,6 @@ When it is enabled:
Both the publisher and the resolver nodes need to have the feature enabled for it to work effectively.
Note: While IPNS pubsub has been available since 0.4.14, it received major changes in 0.5.0.
Users interested in this feature should upgrade to at least 0.5.0
### How to enable
Run your daemon with the `--enable-namesys-pubsub` flag
@ -416,13 +420,12 @@ ipfs config --json Ipns.UsePubsub true
```
NOTE:
- This feature implicitly enables [ipfs pubsub](#ipfs-pubsub).
- This feature implicitly enables pubsub.
- Passing `--enable-namesys-pubsub` CLI flag overrides `Ipns.UsePubsub` config.
### Road to being a real feature
- [ ] Needs more people to use and report on how well it works
- [ ] Pubsub enabled as a real feature
## AutoRelay

View File

@ -109,7 +109,9 @@ When deploying Kubo's gateway in production, be aware of these important conside
> [!IMPORTANT]
> **Timeouts:** Configure [`Gateway.RetrievalTimeout`](config.md#gatewayretrievaltimeout)
> based on your expected content retrieval times
> to terminate stalled transfers (resets on each data write, catches unresponsive operations),
> and [`Gateway.MaxRequestDuration`](config.md#gatewaymaxrequestduration) as a fallback
> deadline (default: 1 hour, catches cases when other timeouts are misconfigured or fail to fire).
> [!IMPORTANT]
> **Rate Limiting:** Use [`Gateway.MaxConcurrentRequests`](config.md#gatewaymaxconcurrentrequests)

17
go.mod
View File

@ -11,7 +11,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/ceramicnetwork/go-dag-jose v0.1.1
github.com/cheggaaa/pb v1.0.29
github.com/cockroachdb/pebble/v2 v2.1.3
github.com/cockroachdb/pebble/v2 v2.1.4
github.com/coreos/go-systemd/v22 v22.5.0
github.com/dustin/go-humanize v1.0.1
github.com/elgris/jsondiff v0.0.0-20160530203242-765b5c24c302
@ -21,7 +21,7 @@ require (
github.com/hashicorp/go-version v1.7.0
github.com/ipfs-shipyard/nopfs v0.0.14
github.com/ipfs-shipyard/nopfs/ipfs v0.25.0
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2
github.com/ipfs/go-block-format v0.2.3
github.com/ipfs/go-cid v0.6.0
github.com/ipfs/go-cidutil v0.1.0
@ -31,7 +31,7 @@ require (
github.com/ipfs/go-ds-flatfs v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.2
github.com/ipfs/go-ds-measure v0.2.2
github.com/ipfs/go-ds-pebble v0.5.8
github.com/ipfs/go-ds-pebble v0.5.9
github.com/ipfs/go-fs-lock v0.1.1
github.com/ipfs/go-ipfs-cmds v0.15.0
github.com/ipfs/go-ipld-cbor v0.2.1
@ -52,7 +52,7 @@ require (
github.com/libp2p/go-doh-resolver v0.5.0
github.com/libp2p/go-libp2p v0.46.0
github.com/libp2p/go-libp2p-http v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.36.0
github.com/libp2p/go-libp2p-kad-dht v0.37.0
github.com/libp2p/go-libp2p-kbucket v0.8.0
github.com/libp2p/go-libp2p-pubsub v0.15.0
github.com/libp2p/go-libp2p-pubsub-router v0.6.0
@ -99,7 +99,7 @@ require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/DataDog/zstd v1.5.7 // indirect
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 // indirect
github.com/RaduBerinde/axisds v0.1.0 // indirect
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 // indirect
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
@ -271,3 +271,10 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.4.1 // indirect
)
// Exclude ancient +incompatible versions that confuse Dependabot.
// These pre-Go-modules versions reference packages that no longer exist.
exclude (
github.com/ipfs/go-ipfs-cmds v2.0.1+incompatible
github.com/libp2p/go-libp2p v6.0.23+incompatible
)

20
go.sum
View File

@ -46,8 +46,8 @@ github.com/Jorropo/jsync v1.0.1 h1:6HgRolFZnsdfzRUj+ImB9og1JYOxQoReSywkHOGSaUU=
github.com/Jorropo/jsync v1.0.1/go.mod h1:jCOZj3vrBCri3bSU3ErUYvevKlnbssrXeCivybS5ABQ=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 h1:8XBWWQD+vFF+JqOsm16t0Kab1a7YWV8+GISVEP8AuZ8=
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y=
github.com/RaduBerinde/axisds v0.1.0 h1:YItk/RmU5nvlsv/awo2Fjx97Mfpt4JfgtEVAGPrLdz8=
github.com/RaduBerinde/axisds v0.1.0/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y=
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 h1:bsU8Tzxr/PNz75ayvCnxKZWEYdLMPDkUgticP4a4Bvk=
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54/go.mod h1:0tr7FllbE9gJkHq7CVeeDDFAFKQVy5RnCSSNBOvdqbc=
github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo=
@ -117,8 +117,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA=
github.com/cockroachdb/pebble/v2 v2.1.3 h1:irU503OnjRoJBrkZQIJvwv9c4WvpUeOJxhRApojB8D8=
github.com/cockroachdb/pebble/v2 v2.1.3/go.mod h1:B1UgWsyR+L+UvZXNgpxw+WqsUKA8VQ/bb//FXOHghB8=
github.com/cockroachdb/pebble/v2 v2.1.4 h1:j9wPgMDbkErFdAKYFGhsoCcvzcjR+6zrJ4jhKtJ6bOk=
github.com/cockroachdb/pebble/v2 v2.1.4/go.mod h1:Reo1RTniv1UjVTAu/Fv74y5i3kJ5gmVrPhO9UtFiKn8=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b h1:VXvSNzmr8hMj8XTuY0PT9Ane9qZGul/p67vGYwl9BFI=
@ -336,8 +336,8 @@ github.com/ipfs-shipyard/nopfs/ipfs v0.25.0 h1:OqNqsGZPX8zh3eFMO8Lf8EHRRnSGBMqcd
github.com/ipfs-shipyard/nopfs/ipfs v0.25.0/go.mod h1:BxhUdtBgOXg1B+gAPEplkg/GpyTZY+kCMSfsJvvydqU=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c h1:mczpALnNzNhmggehO5Ehr9+Q8+NiJyKJfT4EPwi01d0=
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c/go.mod h1:Abmp1if6bMQG87/0SQPIB9fkxJnZMLCt2nQw3yUZHH0=
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2 h1:pRQYSSGnGQa921d8v0uhXg2BGzoSf9ndTWTlR7ImVoo=
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2/go.mod h1:Abmp1if6bMQG87/0SQPIB9fkxJnZMLCt2nQw3yUZHH0=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk=
@ -366,8 +366,8 @@ github.com/ipfs/go-ds-leveldb v0.5.2 h1:6nmxlQ2zbp4LCNdJVsmHfs9GP0eylfBNxpmY1csp
github.com/ipfs/go-ds-leveldb v0.5.2/go.mod h1:2fAwmcvD3WoRT72PzEekHBkQmBDhc39DJGoREiuGmYo=
github.com/ipfs/go-ds-measure v0.2.2 h1:4kwvBGbbSXNYe4ANlg7qTIYoZU6mNlqzQHdVqICkqGI=
github.com/ipfs/go-ds-measure v0.2.2/go.mod h1:b/87ak0jMgH9Ylt7oH0+XGy4P8jHx9KG09Qz+pOeTIs=
github.com/ipfs/go-ds-pebble v0.5.8 h1:NbAfKQo+m39Nka6gt8PARAyH+VoHtRInB6CFCmT+wqo=
github.com/ipfs/go-ds-pebble v0.5.8/go.mod h1:AJjJTHgads/Fn5+tuWmaDGjGEbks7Wgx82NQ/pwmEhc=
github.com/ipfs/go-ds-pebble v0.5.9 h1:D1FEuMxjbEmDADNqsyT74n9QHVAn12nv9i9Qa15AFYc=
github.com/ipfs/go-ds-pebble v0.5.9/go.mod h1:XmUBN05l6B+tMg7mpMS75ZcKW/CX01uZMhhWw85imQA=
github.com/ipfs/go-dsqueue v0.1.1 h1:6PQlHDyf9PSTN69NmwUir5+0is3tU0vRJj8zLlgK8Mc=
github.com/ipfs/go-dsqueue v0.1.1/go.mod h1:Xxg353WSwwzYn3FGSzZ+taSQII3pIZ+EJC8/oWRDM10=
github.com/ipfs/go-fs-lock v0.1.1 h1:TecsP/Uc7WqYYatasreZQiP9EGRy4ZnKoG4yXxR33nw=
@ -490,8 +490,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk
github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA=
github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc=
github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg=
github.com/libp2p/go-libp2p-kad-dht v0.36.0 h1:7QuXhV36+Vyj+L6A7mrYkn2sYLrbRcbjvsYDu/gXhn8=
github.com/libp2p/go-libp2p-kad-dht v0.36.0/go.mod h1:O24LxTH9Rt3I5XU8nmiA9VynS4TrTwAyj+zBJKB05vQ=
github.com/libp2p/go-libp2p-kad-dht v0.37.0 h1:V1IkFzK9taNS1UNAx260foulcBPH+watAUFjNo2qMUY=
github.com/libp2p/go-libp2p-kad-dht v0.37.0/go.mod h1:o4FPa1ea++UVAMJ1c+kyjUmj3CKm9+ZCyzQb4uutCFM=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=

View File

@ -0,0 +1,147 @@
package cli
import (
"encoding/json"
"testing"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestDiagDatastore exercises the `ipfs diag datastore get` and
// `ipfs diag datastore count` commands. These commands operate directly on
// the repo's datastore and therefore require the daemon to be stopped
// (they take the repo lock, which a running daemon holds).
func TestDiagDatastore(t *testing.T) {
	t.Parallel()
	t.Run("diag datastore get returns error for non-existent key", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		// Don't start daemon - these commands require daemon to be stopped
		res := node.RunIPFS("diag", "datastore", "get", "/nonexistent/key")
		assert.Error(t, res.Err)
		assert.Contains(t, res.Stderr.String(), "key not found")
	})
	t.Run("diag datastore get returns raw bytes by default", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		// Add some data to create a known datastore key
		// We need daemon for add, then stop it
		node.StartDaemon()
		cid := node.IPFSAddStr("test data for diag datastore")
		node.IPFS("pin", "add", cid)
		node.StopDaemon()
		// Test count to verify we have entries
		count := node.DatastoreCount("/")
		t.Logf("total datastore entries: %d", count)
		assert.NotEqual(t, int64(0), count, "should have datastore entries after pinning")
	})
	t.Run("diag datastore get --hex returns hex dump", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		// Add and pin some data
		node.StartDaemon()
		cid := node.IPFSAddStr("test data for hex dump")
		node.IPFS("pin", "add", cid)
		node.StopDaemon()
		// Test with existing keys in pins namespace
		count := node.DatastoreCount("/pins/")
		t.Logf("pins datastore entries: %d", count)
		if count != 0 {
			t.Log("pins datastore has entries, hex dump format tested implicitly")
		}
	})
	t.Run("diag datastore count returns 0 for empty prefix", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		// A freshly initialized repo has nothing under this made-up prefix.
		count := node.DatastoreCount("/definitely/nonexistent/prefix/")
		assert.Equal(t, int64(0), count)
	})
	t.Run("diag datastore count returns JSON with --enc=json", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		res := node.IPFS("diag", "datastore", "count", "/pubsub/seqno/", "--enc=json")
		assert.NoError(t, res.Err)
		// The JSON output echoes the queried prefix alongside the count.
		var result struct {
			Prefix string `json:"prefix"`
			Count  int64  `json:"count"`
		}
		err := json.Unmarshal(res.Stdout.Bytes(), &result)
		require.NoError(t, err)
		assert.Equal(t, "/pubsub/seqno/", result.Prefix)
		assert.Equal(t, int64(0), result.Count)
	})
	t.Run("diag datastore get returns JSON with --enc=json", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		// Test error case with JSON encoding
		res := node.RunIPFS("diag", "datastore", "get", "/nonexistent", "--enc=json")
		assert.Error(t, res.Err)
	})
	t.Run("diag datastore count counts entries correctly", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		// Add multiple pins to create multiple entries
		node.StartDaemon()
		cid1 := node.IPFSAddStr("data 1")
		cid2 := node.IPFSAddStr("data 2")
		cid3 := node.IPFSAddStr("data 3")
		node.IPFS("pin", "add", cid1)
		node.IPFS("pin", "add", cid2)
		node.IPFS("pin", "add", cid3)
		node.StopDaemon()
		// Count should reflect the pins (plus any system entries)
		count := node.DatastoreCount("/")
		t.Logf("total entries after adding 3 pins: %d", count)
		// Should have more than 0 entries
		assert.NotEqual(t, int64(0), count)
	})
	t.Run("diag datastore commands work offline", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init()
		// Don't start daemon - these commands require daemon to be stopped
		// Count should work offline
		count := node.DatastoreCount("/pubsub/seqno/")
		assert.Equal(t, int64(0), count)
		// Get should return error for missing key (but command should work)
		res := node.RunIPFS("diag", "datastore", "get", "/nonexistent/key")
		assert.Error(t, res.Err)
		assert.Contains(t, res.Stderr.String(), "key not found")
	})
	t.Run("diag datastore commands require daemon to be stopped", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		// Both get and count require repo lock, which is held by the running daemon
		res := node.RunIPFS("diag", "datastore", "get", "/test")
		assert.Error(t, res.Err, "get should fail when daemon is running")
		assert.Contains(t, res.Stderr.String(), "ipfs daemon is running")
		res = node.RunIPFS("diag", "datastore", "count", "/pubsub/seqno/")
		assert.Error(t, res.Err, "count should fail when daemon is running")
		assert.Contains(t, res.Stderr.String(), "ipfs daemon is running")
	})
}

View File

@ -58,6 +58,47 @@ func TestGatewayLimits(t *testing.T) {
assert.Contains(t, resp.Body, "Unable to retrieve content within timeout period")
})
t.Run("MaxRequestDuration", func(t *testing.T) {
t.Parallel()
// Create a node with a short max request duration
node := harness.NewT(t).NewNode().Init()
node.UpdateConfig(func(cfg *config.Config) {
// Set a short absolute deadline (500ms) for the entire request
cfg.Gateway.MaxRequestDuration = config.NewOptionalDuration(500 * time.Millisecond)
// Set retrieval timeout much longer so MaxRequestDuration fires first
cfg.Gateway.RetrievalTimeout = config.NewOptionalDuration(30 * time.Second)
})
node.StartDaemon()
defer node.StopDaemon()
// Add content that can be retrieved quickly
cid := node.IPFSAddStr("test content for max request duration")
client := node.GatewayClient()
// Fast request for local content should succeed (well within 500ms)
resp := client.Get("/ipfs/" + cid)
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "test content for max request duration", resp.Body)
// Request for non-existent content should timeout due to MaxRequestDuration
// This CID has no providers and will block during content routing
nonExistentCID := "bafkreif6lrhgz3fpiwypdk65qrqiey7svgpggruhbylrgv32l3izkqpsc4"
// Create a client with a longer timeout than MaxRequestDuration
// to ensure we receive the gateway's 504 response
clientWithTimeout := &harness.HTTPClient{
Client: &http.Client{
Timeout: 5 * time.Second,
},
BaseURL: client.BaseURL,
}
resp = clientWithTimeout.Get("/ipfs/" + nonExistentCID)
assert.Equal(t, http.StatusGatewayTimeout, resp.StatusCode, "Expected 504 when request exceeds MaxRequestDuration")
})
t.Run("MaxConcurrentRequests", func(t *testing.T) {
t.Parallel()

View File

@ -730,3 +730,28 @@ func (n *Node) APIClient() *HTTPClient {
BaseURL: n.APIURL(),
}
}
// DatastoreCount returns the count of entries matching the given prefix.
// Requires the daemon to be stopped (the command takes the repo lock).
// Panics if the command's stdout cannot be parsed as a base-10 integer,
// which would indicate a change in the command's output format rather
// than an empty datastore.
func (n *Node) DatastoreCount(prefix string) int64 {
	res := n.IPFS("diag", "datastore", "count", prefix)
	out := strings.TrimSpace(res.Stdout.String())
	count, err := strconv.ParseInt(out, 10, 64)
	if err != nil {
		// Fail loudly instead of silently returning 0: a parse failure here
		// means unexpected CLI output, not a zero count, and returning 0
		// would mask that from callers asserting on counts.
		panic("DatastoreCount: cannot parse output " + strconv.Quote(out) + ": " + err.Error())
	}
	return count
}
// DatastoreGet retrieves the value at the given key.
// Requires the daemon to be stopped. Returns nil if key not found.
func (n *Node) DatastoreGet(key string) []byte {
	if res := n.RunIPFS("diag", "datastore", "get", key); res.Err == nil {
		return res.Stdout.Bytes()
	}
	// The command failed (e.g. the key does not exist); report absence as nil.
	return nil
}
// DatastoreHasKey checks if a key exists in the datastore.
// Requires the daemon to be stopped.
func (n *Node) DatastoreHasKey(key string) bool {
	// A successful "get" doubles as the existence check; any error is
	// treated as "not present".
	return n.RunIPFS("diag", "datastore", "get", key).Err == nil
}

254
test/cli/ls_test.go Normal file
View File

@ -0,0 +1,254 @@
package cli
import (
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestLsLongFormat exercises `ipfs ls --long`, which prints a Unix-ls-like
// listing with columns: Mode, Hash, Size (optional via --size), ModTime, Name.
// Mode and ModTime are only populated when content was added with
// --preserve-mode / --preserve-mtime; otherwise both columns show "-".
func TestLsLongFormat(t *testing.T) {
	t.Parallel()
	t.Run("long format shows mode and mtime when preserved", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		// Create a test directory structure with known permissions
		testDir := filepath.Join(node.Dir, "testdata")
		require.NoError(t, os.MkdirAll(testDir, 0755))
		// Create files with specific permissions
		file1 := filepath.Join(testDir, "readable.txt")
		require.NoError(t, os.WriteFile(file1, []byte("hello"), 0644))
		file2 := filepath.Join(testDir, "executable.sh")
		require.NoError(t, os.WriteFile(file2, []byte("#!/bin/sh\necho hi"), 0755))
		// Set a known mtime in the past (to get year format, avoiding flaky time-based tests)
		oldTime := time.Date(2020, time.June, 15, 10, 30, 0, 0, time.UTC)
		require.NoError(t, os.Chtimes(file1, oldTime, oldTime))
		require.NoError(t, os.Chtimes(file2, oldTime, oldTime))
		// Add with preserved mode and mtime
		addRes := node.IPFS("add", "-r", "--preserve-mode", "--preserve-mtime", "-Q", testDir)
		dirCid := addRes.Stdout.Trimmed()
		// Run ls with --long flag
		lsRes := node.IPFS("ls", "--long", dirCid)
		output := lsRes.Stdout.String()
		// Verify format: Mode Hash Size ModTime Name
		lines := strings.Split(strings.TrimSpace(output), "\n")
		require.Len(t, lines, 2, "expected 2 files in output")
		// Check executable.sh line (should be first alphabetically)
		assert.Contains(t, lines[0], "-rwxr-xr-x", "executable should have 755 permissions")
		assert.Contains(t, lines[0], "Jun 15 2020", "should show mtime with year format")
		assert.Contains(t, lines[0], "executable.sh", "should show filename")
		// Check readable.txt line
		assert.Contains(t, lines[1], "-rw-r--r--", "readable file should have 644 permissions")
		assert.Contains(t, lines[1], "Jun 15 2020", "should show mtime with year format")
		assert.Contains(t, lines[1], "readable.txt", "should show filename")
	})
	t.Run("long format shows dash for files without preserved mode or mtime", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		// Create and add a file without --preserve-mode or --preserve-mtime
		testFile := filepath.Join(node.Dir, "nopreserve.txt")
		require.NoError(t, os.WriteFile(testFile, []byte("test content"), 0644))
		addRes := node.IPFS("add", "-Q", testFile)
		fileCid := addRes.Stdout.Trimmed()
		// Create a wrapper directory to list
		node.IPFS("files", "mkdir", "/testdir")
		node.IPFS("files", "cp", "/ipfs/"+fileCid, "/testdir/file.txt")
		statRes := node.IPFS("files", "stat", "--hash", "/testdir")
		dirCid := statRes.Stdout.Trimmed()
		// Run ls with --long flag
		lsRes := node.IPFS("ls", "--long", dirCid)
		output := lsRes.Stdout.String()
		// Files without preserved mode or mtime should show "-" for both columns
		// Format: "-" (mode) <CID> <size> "-" (mtime) <name>
		assert.Regexp(t, `^-\s+\S+\s+\d+\s+-\s+`, output, "missing mode and mtime should both show dash")
	})
	t.Run("long format with headers shows correct column order", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		// Create a simple test file
		testDir := filepath.Join(node.Dir, "headertest")
		require.NoError(t, os.MkdirAll(testDir, 0755))
		testFile := filepath.Join(testDir, "file.txt")
		require.NoError(t, os.WriteFile(testFile, []byte("hello"), 0644))
		oldTime := time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC)
		require.NoError(t, os.Chtimes(testFile, oldTime, oldTime))
		addRes := node.IPFS("add", "-r", "--preserve-mode", "--preserve-mtime", "-Q", testDir)
		dirCid := addRes.Stdout.Trimmed()
		// Run ls with --long and --headers (--size defaults to true)
		lsRes := node.IPFS("ls", "--long", "--headers", dirCid)
		output := lsRes.Stdout.String()
		lines := strings.Split(strings.TrimSpace(output), "\n")
		// First line should be headers in correct order: Mode Hash Size ModTime Name
		require.GreaterOrEqual(t, len(lines), 2)
		headerFields := strings.Fields(lines[0])
		require.Len(t, headerFields, 5, "header should have 5 columns")
		assert.Equal(t, "Mode", headerFields[0])
		assert.Equal(t, "Hash", headerFields[1])
		assert.Equal(t, "Size", headerFields[2])
		assert.Equal(t, "ModTime", headerFields[3])
		assert.Equal(t, "Name", headerFields[4])
		// Data line should have matching columns
		dataFields := strings.Fields(lines[1])
		require.GreaterOrEqual(t, len(dataFields), 5)
		assert.Regexp(t, `^-[rwx-]{9}$`, dataFields[0], "first field should be mode")
		assert.Regexp(t, `^Qm`, dataFields[1], "second field should be CID")
		assert.Regexp(t, `^\d+$`, dataFields[2], "third field should be size")
	})
	t.Run("long format with headers and size=false", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		testDir := filepath.Join(node.Dir, "headertest2")
		require.NoError(t, os.MkdirAll(testDir, 0755))
		testFile := filepath.Join(testDir, "file.txt")
		require.NoError(t, os.WriteFile(testFile, []byte("hello"), 0644))
		oldTime := time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC)
		require.NoError(t, os.Chtimes(testFile, oldTime, oldTime))
		addRes := node.IPFS("add", "-r", "--preserve-mode", "--preserve-mtime", "-Q", testDir)
		dirCid := addRes.Stdout.Trimmed()
		// Run ls with --long --headers --size=false
		lsRes := node.IPFS("ls", "--long", "--headers", "--size=false", dirCid)
		output := lsRes.Stdout.String()
		lines := strings.Split(strings.TrimSpace(output), "\n")
		// Header should be: Mode Hash ModTime Name (no Size)
		require.GreaterOrEqual(t, len(lines), 2)
		headerFields := strings.Fields(lines[0])
		require.Len(t, headerFields, 4, "header should have 4 columns without size")
		assert.Equal(t, "Mode", headerFields[0])
		assert.Equal(t, "Hash", headerFields[1])
		assert.Equal(t, "ModTime", headerFields[2])
		assert.Equal(t, "Name", headerFields[3])
	})
	t.Run("long format for directories shows trailing slash", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		// Create nested directory structure
		testDir := filepath.Join(node.Dir, "dirtest")
		subDir := filepath.Join(testDir, "subdir")
		require.NoError(t, os.MkdirAll(subDir, 0755))
		require.NoError(t, os.WriteFile(filepath.Join(subDir, "file.txt"), []byte("hi"), 0644))
		addRes := node.IPFS("add", "-r", "--preserve-mode", "-Q", testDir)
		dirCid := addRes.Stdout.Trimmed()
		// Run ls with --long flag
		lsRes := node.IPFS("ls", "--long", dirCid)
		output := lsRes.Stdout.String()
		// Directory should end with /
		assert.Contains(t, output, "subdir/", "directory should have trailing slash")
		// Directory should show 'd' in mode
		assert.Contains(t, output, "drwxr-xr-x", "directory should show directory mode")
	})
	t.Run("long format without size flag", func(t *testing.T) {
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		testDir := filepath.Join(node.Dir, "nosizetest")
		require.NoError(t, os.MkdirAll(testDir, 0755))
		testFile := filepath.Join(testDir, "file.txt")
		require.NoError(t, os.WriteFile(testFile, []byte("hello world"), 0644))
		oldTime := time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC)
		require.NoError(t, os.Chtimes(testFile, oldTime, oldTime))
		addRes := node.IPFS("add", "-r", "--preserve-mode", "--preserve-mtime", "-Q", testDir)
		dirCid := addRes.Stdout.Trimmed()
		// Run ls with --long but --size=false
		lsRes := node.IPFS("ls", "--long", "--size=false", dirCid)
		output := lsRes.Stdout.String()
		// Should still have mode and mtime, but format differs (no size column)
		assert.Contains(t, output, "-rw-r--r--")
		assert.Contains(t, output, "Jan 01 2020")
		assert.Contains(t, output, "file.txt")
	})
	t.Run("long format output is stable", func(t *testing.T) {
		// This test ensures the output format doesn't change due to refactors
		t.Parallel()
		node := harness.NewT(t).NewNode().Init().StartDaemon()
		defer node.StopDaemon()
		testDir := filepath.Join(node.Dir, "stabletest")
		require.NoError(t, os.MkdirAll(testDir, 0755))
		testFile := filepath.Join(testDir, "test.txt")
		require.NoError(t, os.WriteFile(testFile, []byte("stable"), 0644))
		// Use a fixed time for reproducibility
		fixedTime := time.Date(2020, time.December, 25, 12, 0, 0, 0, time.UTC)
		require.NoError(t, os.Chtimes(testFile, fixedTime, fixedTime))
		addRes := node.IPFS("add", "-r", "--preserve-mode", "--preserve-mtime", "-Q", testDir)
		dirCid := addRes.Stdout.Trimmed()
		// The CID should be deterministic given same content, mode, and mtime
		// This is the expected CID for this specific test data
		lsRes := node.IPFS("ls", "--long", dirCid)
		output := strings.TrimSpace(lsRes.Stdout.String())
		// Verify the format: Mode<tab>Hash<tab>Size<tab>ModTime<tab>Name
		fields := strings.Fields(output)
		require.GreaterOrEqual(t, len(fields), 5, "output should have at least 5 fields")
		// Field 0: mode (10 chars, starts with - for regular file)
		assert.Regexp(t, `^-[rwx-]{9}$`, fields[0], "mode should be Unix permission format")
		// Field 1: CID (starts with Qm or bafy)
		assert.Regexp(t, `^(Qm|bafy)`, fields[1], "second field should be CID")
		// Field 2: size (numeric)
		assert.Regexp(t, `^\d+$`, fields[2], "third field should be numeric size")
		// Fields 3-4: date (e.g. "Dec 25 2020" or "Dec 25 12:00")
		// The date format is "Mon DD YYYY" for old files
		assert.Equal(t, "Dec", fields[3])
		assert.Equal(t, "25", fields[4])
		// Last field: filename
		assert.Equal(t, "test.txt", fields[len(fields)-1])
	})
}

403
test/cli/pubsub_test.go Normal file
View File

@ -0,0 +1,403 @@
package cli
import (
"context"
"encoding/json"
"slices"
"testing"
"time"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// waitForSubscription blocks until the node reports a subscription to the
// given topic via `ipfs pubsub ls`, failing the test after 5 seconds.
func waitForSubscription(t *testing.T, node *harness.Node, topic string) {
	t.Helper()
	subscribed := func() bool {
		res := node.RunIPFS("pubsub", "ls")
		// A failed command counts as "not yet subscribed"; keep polling.
		return res.Err == nil && slices.Contains(res.Stdout.Lines(), topic)
	}
	require.Eventually(t, subscribed, 5*time.Second, 100*time.Millisecond, "expected subscription to topic %s", topic)
}
// waitForMessagePropagation gives pubsub messages time to propagate through
// the network and the seqno state time to be persisted to the datastore.
// There is no observable event to poll for, so this is a fixed delay.
func waitForMessagePropagation(t *testing.T) {
	t.Helper()
	time.Sleep(time.Second)
}
// publishMessages publishes n messages from publisher to the given topic,
// pausing briefly after each one so they are delivered in order.
func publishMessages(t *testing.T, publisher *harness.Node, topic string, n int) {
	t.Helper()
	for remaining := n; remaining > 0; remaining-- {
		// The payload is the fixed string "msg"; only the count matters here.
		publisher.PipeStrToIPFS("msg", "pubsub", "pub", topic)
		time.Sleep(50 * time.Millisecond)
	}
}
// TestPubsub tests pubsub functionality and the persistent seqno validator.
//
// Pubsub has two deduplication layers:
//
// Layer 1: MessageID-based TimeCache (in-memory)
// - Controlled by Pubsub.SeenMessagesTTL config (default 120s)
// - Tested in go-libp2p-pubsub (see timecache in github.com/libp2p/go-libp2p-pubsub)
// - Only tested implicitly here via message delivery (timing-sensitive, not practical for CLI tests)
//
// Layer 2: Per-peer seqno validator (persistent in datastore)
// - Stores max seen seqno per peer at /pubsub/seqno/<peerid>
// - Tested directly below: persistence, updates, reset, survives restart
// - Validator: go-libp2p-pubsub BasicSeqnoValidator
func TestPubsub(t *testing.T) {
t.Parallel()
// enablePubsub configures a node with pubsub enabled
enablePubsub := func(n *harness.Node) {
n.SetIPFSConfig("Pubsub.Enabled", true)
n.SetIPFSConfig("Routing.Type", "none") // simplify test setup
}
t.Run("basic pub/sub message delivery", func(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
// Create two connected nodes with pubsub enabled
nodes := h.NewNodes(2).Init()
nodes.ForEachPar(enablePubsub)
nodes = nodes.StartDaemons().Connect()
defer nodes.StopDaemons()
subscriber := nodes[0]
publisher := nodes[1]
const topic = "test-topic"
const message = "hello pubsub"
// Start subscriber in background
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Use a channel to receive the message
msgChan := make(chan string, 1)
go func() {
// Subscribe and wait for one message
res := subscriber.RunIPFS("pubsub", "sub", "--enc=json", topic)
if res.Err == nil {
// Parse JSON output to get message data
lines := res.Stdout.Lines()
if len(lines) > 0 {
var msg struct {
Data []byte `json:"data"`
}
if json.Unmarshal([]byte(lines[0]), &msg) == nil {
msgChan <- string(msg.Data)
}
}
}
}()
// Wait for subscriber to be ready
waitForSubscription(t, subscriber, topic)
// Publish message
publisher.PipeStrToIPFS(message, "pubsub", "pub", topic)
// Wait for message or timeout
select {
case received := <-msgChan:
assert.Equal(t, message, received)
case <-ctx.Done():
// Subscriber may not receive in time due to test timing - that's OK
// The main goal is to test the seqno validator state persistence
t.Log("subscriber did not receive message in time (this is acceptable)")
}
})
t.Run("seqno validator state is persisted", func(t *testing.T) {
	t.Parallel()
	h := harness.NewT(t)

	// Spin up a pair of pubsub-enabled, connected daemons.
	nodes := h.NewNodes(2).Init()
	nodes.ForEachPar(enablePubsub)
	nodes = nodes.StartDaemons().Connect()
	subscriber := nodes[0]
	publisher := nodes[1]
	publisherID := publisher.PeerID().String()

	const topic = "seqno-test"

	// Background subscriber keeps the topic alive while we publish.
	go func() {
		subscriber.RunIPFS("pubsub", "sub", topic)
	}()
	waitForSubscription(t, subscriber, topic)

	// Publishing several messages gives the seqno validator state to record.
	publishMessages(t, publisher, topic, 3)
	waitForMessagePropagation(t)

	// `diag datastore` refuses to run against a live daemon, so stop first.
	nodes.StopDaemons()

	// At least one seqno entry (the publisher's) should have been written.
	seqnoEntries := subscriber.DatastoreCount("/pubsub/seqno/")
	t.Logf("seqno entries count: %d", seqnoEntries)
	assert.NotEqual(t, int64(0), seqnoEntries, "expected seqno state to be persisted")

	// Look up the publisher's own key and exercise the --hex output format.
	seqnoKey := "/pubsub/seqno/" + publisherID
	getRes := subscriber.RunIPFS("diag", "datastore", "get", "--hex", seqnoKey)
	if getRes.Err != nil {
		// The key can legitimately be missing if propagation lost the race.
		t.Logf("seqno key not found for peer %s (messages may not have propagated)", publisherID)
	} else {
		t.Logf("seqno for peer %s:\n%s", publisherID, getRes.Stdout.String())
		assert.Contains(t, getRes.Stdout.String(), "Hex Dump:")
	}
})
t.Run("seqno updates when receiving multiple messages", func(t *testing.T) {
	t.Parallel()
	h := harness.NewT(t)

	// Two connected pubsub daemons: one subscribes, the other publishes.
	nodes := h.NewNodes(2).Init()
	nodes.ForEachPar(enablePubsub)
	nodes = nodes.StartDaemons().Connect()
	receiver, sender := nodes[0], nodes[1]
	senderID := sender.PeerID().String()

	const topic = "seqno-update-test"
	seqnoKey := "/pubsub/seqno/" + senderID

	// Keep a subscription open on the receiving side.
	go func() {
		receiver.RunIPFS("pubsub", "sub", topic)
	}()
	waitForSubscription(t, receiver, topic)

	// First message, then a short pause for delivery.
	sender.PipeStrToIPFS("msg1", "pubsub", "pub", topic)
	time.Sleep(500 * time.Millisecond)

	// `diag datastore` only works while the daemons are stopped.
	nodes.StopDaemons()

	// Snapshot the seqno recorded for the sender after message one.
	var firstSeqno []byte
	if first := receiver.RunIPFS("diag", "datastore", "get", seqnoKey); first.Err == nil {
		firstSeqno = first.Stdout.Bytes()
		t.Logf("seqno after first message: %d bytes", len(firstSeqno))
	} else {
		t.Logf("seqno not found after first message (message may not have propagated)")
	}

	// Bring everything back up for the second round and resubscribe.
	nodes = nodes.StartDaemons().Connect()
	go func() {
		receiver.RunIPFS("pubsub", "sub", topic)
	}()
	waitForSubscription(t, receiver, topic)

	// Second message, same settling delay, then stop again to inspect.
	sender.PipeStrToIPFS("msg2", "pubsub", "pub", topic)
	time.Sleep(500 * time.Millisecond)
	nodes.StopDaemons()

	// Snapshot the seqno again after message two.
	var secondSeqno []byte
	if second := receiver.RunIPFS("diag", "datastore", "get", seqnoKey); second.Err == nil {
		secondSeqno = second.Stdout.Bytes()
		t.Logf("seqno after second message: %d bytes", len(secondSeqno))
	} else {
		t.Logf("seqno not found after second message")
	}

	// Only compare when both rounds actually produced state. The validator
	// keeps the max seqno seen; a raw byte comparison is unreliable due to
	// potential endianness, so just require a well-formed uint64 value.
	if len(firstSeqno) > 0 && len(secondSeqno) > 0 {
		t.Logf("seqno1: %x", firstSeqno)
		t.Logf("seqno2: %x", secondSeqno)
		assert.Equal(t, 8, len(secondSeqno), "seqno should be 8 bytes (uint64)")
	}
})
t.Run("pubsub reset clears seqno state", func(t *testing.T) {
	t.Parallel()
	h := harness.NewT(t)

	// Connected pair of pubsub daemons.
	nodes := h.NewNodes(2).Init()
	nodes.ForEachPar(enablePubsub)
	nodes = nodes.StartDaemons().Connect()
	subscriber, publisher := nodes[0], nodes[1]

	const topic = "reset-test"

	// Exchange a few messages so the seqno validator writes some state.
	go func() {
		subscriber.RunIPFS("pubsub", "sub", topic)
	}()
	waitForSubscription(t, subscriber, topic)
	publishMessages(t, publisher, topic, 3)
	waitForMessagePropagation(t)

	// Datastore inspection requires stopped daemons.
	nodes.StopDaemons()
	before := subscriber.DatastoreCount("/pubsub/seqno/")
	t.Logf("initial seqno count: %d", before)

	// `pubsub reset` is an online command, so restart just the subscriber.
	subscriber.StartDaemon()
	resetRes := subscriber.IPFS("pubsub", "reset")
	assert.NoError(t, resetRes.Err)
	t.Logf("reset output: %s", resetRes.Stdout.String())

	// Back offline to confirm the seqno prefix is now empty.
	subscriber.StopDaemon()
	after := subscriber.DatastoreCount("/pubsub/seqno/")
	t.Logf("final seqno count: %d", after)
	assert.Equal(t, int64(0), after, "seqno state should be cleared after reset")
})
t.Run("pubsub reset with peer flag", func(t *testing.T) {
	t.Parallel()
	h := harness.NewT(t)

	// Three connected pubsub daemons: one observer and two publishers.
	nodes := h.NewNodes(3).Init()
	nodes.ForEachPar(enablePubsub)
	nodes = nodes.StartDaemons().Connect()
	observer := nodes[0]
	target := nodes[1]
	bystander := nodes[2]
	targetID := target.PeerID().String()
	bystanderID := bystander.PeerID().String()

	const topic = "peer-reset-test"

	// Subscribe on the observer so incoming messages get validated.
	go func() {
		observer.RunIPFS("pubsub", "sub", topic)
	}()
	waitForSubscription(t, observer, topic)

	// Interleave publishes from both peers so each can leave seqno state.
	for range 3 {
		target.PipeStrToIPFS("msg2", "pubsub", "pub", topic)
		bystander.PipeStrToIPFS("msg3", "pubsub", "pub", topic)
		time.Sleep(50 * time.Millisecond)
	}
	waitForMessagePropagation(t)

	// The publishers are no longer needed.
	target.StopDaemon()
	bystander.StopDaemon()

	// Scoped reset: only the targeted peer's state should be dropped.
	resetRes := observer.IPFS("pubsub", "reset", "--peer", targetID)
	require.NoError(t, resetRes.Err)
	t.Logf("reset output: %s", resetRes.Stdout.String())

	// Stop the observer so the datastore can be read directly.
	observer.StopDaemon()

	// The targeted peer's entry must be gone.
	getRes := observer.RunIPFS("diag", "datastore", "get", "/pubsub/seqno/"+targetID)
	assert.Error(t, getRes.Err, "node2's seqno key should be deleted")

	// The bystander's entry may or may not exist (propagation is racy),
	// so only log the outcome rather than asserting on it.
	getRes = observer.RunIPFS("diag", "datastore", "get", "/pubsub/seqno/"+bystanderID)
	if getRes.Err == nil {
		t.Logf("node3's seqno key still exists (as expected)")
	} else {
		t.Logf("node3's seqno key not found (messages may not have propagated)")
	}
})
t.Run("seqno state survives daemon restart", func(t *testing.T) {
	t.Parallel()
	h := harness.NewT(t)

	// Primary node under test.
	subject := h.NewNode().Init()
	enablePubsub(subject)
	subject.StartDaemon()

	// Second node supplies the messages.
	peerNode := h.NewNode().Init()
	enablePubsub(peerNode)
	peerNode.StartDaemon()
	subject.Connect(peerNode)

	const topic = "restart-test"

	// Subscribe on the subject and feed it a few messages.
	go func() {
		subject.RunIPFS("pubsub", "sub", topic)
	}()
	waitForSubscription(t, subject, topic)
	publishMessages(t, peerNode, topic, 3)
	waitForMessagePropagation(t)

	// Daemons must be down to read the datastore.
	subject.StopDaemon()
	peerNode.StopDaemon()
	beforeCount := subject.DatastoreCount("/pubsub/seqno/")
	t.Logf("seqno count before restart: %d", beforeCount)

	// Bounce the subject daemon to simulate a restart, then stop it
	// again so the datastore can be re-inspected.
	subject.StartDaemon()
	time.Sleep(500 * time.Millisecond)
	subject.StopDaemon()

	afterCount := subject.DatastoreCount("/pubsub/seqno/")
	t.Logf("seqno count after restart: %d", afterCount)

	// Persistence check: the entry count must be unchanged across restart.
	assert.Equal(t, beforeCount, afterCount, "seqno state should survive daemon restart")
})
}

View File

@ -34,7 +34,7 @@ require (
github.com/Jorropo/jsync v1.0.1 // indirect
github.com/Masterminds/semver/v3 v3.3.0 // indirect
github.com/OpenPeeDeeP/depguard/v2 v2.2.1 // indirect
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 // indirect
github.com/RaduBerinde/axisds v0.1.0 // indirect
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 // indirect
github.com/alecthomas/go-check-sumtype v0.3.1 // indirect
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect
@ -65,7 +65,7 @@ require (
github.com/cockroachdb/crlib v0.0.0-20241112164430-1264a2edc35b // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble/v2 v2.1.3 // indirect
github.com/cockroachdb/pebble/v2 v2.1.4 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
@ -135,7 +135,7 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c // indirect
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-block-format v0.2.3 // indirect
github.com/ipfs/go-cid v0.6.0 // indirect
@ -183,7 +183,7 @@ require (
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p v0.46.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.36.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.37.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-record v0.3.1 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect

View File

@ -32,8 +32,8 @@ github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+
github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/OpenPeeDeeP/depguard/v2 v2.2.1 h1:vckeWVESWp6Qog7UZSARNqfu/cZqvki8zsuj3piCMx4=
github.com/OpenPeeDeeP/depguard/v2 v2.2.1/go.mod h1:q4DKzC4UcVaAvcfd41CZh0PWpGgzrVxUYBlgKNGquUo=
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657 h1:8XBWWQD+vFF+JqOsm16t0Kab1a7YWV8+GISVEP8AuZ8=
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y=
github.com/RaduBerinde/axisds v0.1.0 h1:YItk/RmU5nvlsv/awo2Fjx97Mfpt4JfgtEVAGPrLdz8=
github.com/RaduBerinde/axisds v0.1.0/go.mod h1:UHGJonU9z4YYGKJxSaC6/TNcLOBptpmM5m2Cksbnw0Y=
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 h1:bsU8Tzxr/PNz75ayvCnxKZWEYdLMPDkUgticP4a4Bvk=
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54/go.mod h1:0tr7FllbE9gJkHq7CVeeDDFAFKQVy5RnCSSNBOvdqbc=
github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo=
@ -104,8 +104,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA=
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA=
github.com/cockroachdb/pebble/v2 v2.1.3 h1:irU503OnjRoJBrkZQIJvwv9c4WvpUeOJxhRApojB8D8=
github.com/cockroachdb/pebble/v2 v2.1.3/go.mod h1:B1UgWsyR+L+UvZXNgpxw+WqsUKA8VQ/bb//FXOHghB8=
github.com/cockroachdb/pebble/v2 v2.1.4 h1:j9wPgMDbkErFdAKYFGhsoCcvzcjR+6zrJ4jhKtJ6bOk=
github.com/cockroachdb/pebble/v2 v2.1.4/go.mod h1:Reo1RTniv1UjVTAu/Fv74y5i3kJ5gmVrPhO9UtFiKn8=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/swiss v0.0.0-20251224182025-b0f6560f979b h1:VXvSNzmr8hMj8XTuY0PT9Ane9qZGul/p67vGYwl9BFI=
@ -294,8 +294,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c h1:mczpALnNzNhmggehO5Ehr9+Q8+NiJyKJfT4EPwi01d0=
github.com/ipfs/boxo v0.35.3-0.20251202220026-0842ad274a0c/go.mod h1:Abmp1if6bMQG87/0SQPIB9fkxJnZMLCt2nQw3yUZHH0=
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2 h1:pRQYSSGnGQa921d8v0uhXg2BGzoSf9ndTWTlR7ImVoo=
github.com/ipfs/boxo v0.35.3-0.20260109213916-89dc184784f2/go.mod h1:Abmp1if6bMQG87/0SQPIB9fkxJnZMLCt2nQw3yUZHH0=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk=
@ -419,8 +419,8 @@ github.com/libp2p/go-libp2p v0.46.0 h1:0T2yvIKpZ3DVYCuPOFxPD1layhRU486pj9rSlGWYn
github.com/libp2p/go-libp2p v0.46.0/go.mod h1:TbIDnpDjBLa7isdgYpbxozIVPBTmM/7qKOJP4SFySrQ=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-kad-dht v0.36.0 h1:7QuXhV36+Vyj+L6A7mrYkn2sYLrbRcbjvsYDu/gXhn8=
github.com/libp2p/go-libp2p-kad-dht v0.36.0/go.mod h1:O24LxTH9Rt3I5XU8nmiA9VynS4TrTwAyj+zBJKB05vQ=
github.com/libp2p/go-libp2p-kad-dht v0.37.0 h1:V1IkFzK9taNS1UNAx260foulcBPH+watAUFjNo2qMUY=
github.com/libp2p/go-libp2p-kad-dht v0.37.0/go.mod h1:o4FPa1ea++UVAMJ1c+kyjUmj3CKm9+ZCyzQb4uutCFM=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=

View File

@ -1,285 +0,0 @@
package integrationtest
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
"go.uber.org/fx"
"github.com/ipfs/boxo/bootstrap"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core"
"github.com/ipfs/kubo/core/coreapi"
libp2p2 "github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-pubsub/timecache"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/ipfs/kubo/core/mock"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)
// TestMessageSeenCacheTTL drives the seen-cache TTL scenario with a 10s
// window. Currently skipped because the scenario is timing-sensitive.
func TestMessageSeenCacheTTL(t *testing.T) {
	t.Skip("skipping PubSub seen cache TTL test due to flakiness")
	err := RunMessageSeenCacheTTLTest(t, "10s")
	if err != nil {
		t.Fatal(err)
	}
}
// mockNode builds an in-memory IpfsNode attached to the given mocknet,
// backed by a fresh mutex-wrapped MapDatastore and a 2048-bit identity.
// When pubsubEnabled is set, pubsub is turned on and seenMessagesCacheTTL
// (a JSON duration string, e.g. "10s"; empty means "leave unset") is
// applied as Pubsub.SeenMessagesTTL.
func mockNode(ctx context.Context, mn mocknet.Mocknet, pubsubEnabled bool, seenMessagesCacheTTL string) (*core.IpfsNode, error) {
	ds := syncds.MutexWrap(datastore.NewMapDatastore())
	cfg, err := config.Init(io.Discard, 2048)
	if err != nil {
		return nil, err
	}
	// Derive a unique swarm address from the number of peers already on
	// the mocknet. NOTE(review): count>>16 is always 0 for small counts
	// and count&0xFF wraps at 256 — presumably fine for test-sized nets,
	// but confirm if more than 255 nodes are ever created.
	count := len(mn.Peers())
	cfg.Addresses.Swarm = []string{
		fmt.Sprintf("/ip4/18.0.%d.%d/tcp/4001", count>>16, count&0xFF),
	}
	cfg.Datastore = config.Datastore{}
	if pubsubEnabled {
		cfg.Pubsub.Enabled = config.True
		var ttl *config.OptionalDuration
		if len(seenMessagesCacheTTL) > 0 {
			// OptionalDuration parses the JSON representation directly.
			ttl = &config.OptionalDuration{}
			if err = ttl.UnmarshalJSON([]byte(seenMessagesCacheTTL)); err != nil {
				return nil, err
			}
		}
		cfg.Pubsub.SeenMessagesTTL = ttl
	}
	// Online node with a mocknet-backed host; the "pubsub" ExtraOpt
	// toggles the pubsub subsystem to match the config above.
	return core.NewNode(ctx, &core.BuildCfg{
		Online:  true,
		Routing: libp2p2.DHTServerOption,
		Repo: &repo.Mock{
			C: *cfg,
			D: ds,
		},
		Host: mock.MockHostOption(mn),
		ExtraOpts: map[string]bool{
			"pubsub": pubsubEnabled,
		},
	})
}
// RunMessageSeenCacheTTLTest spins up bootstrap, consumer, and producer
// nodes on a shared mocknet and exercises pubsub's seen-messages cache
// with the given TTL (a time.ParseDuration string, e.g. "10s"). The
// producer's message IDs are overridden so duplicate IDs can be injected
// on demand; the test then checks which publishes the consumer actually
// receives as the last-seen TTL window slides and eventually expires.
// It calls t.Fatal on any setup or delivery failure and returns nil on
// success.
func RunMessageSeenCacheTTLTest(t *testing.T, seenMessagesCacheTTL string) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var bootstrapNode, consumerNode, producerNode *core.IpfsNode
	var bootstrapPeerID, consumerPeerID, producerPeerID peer.ID

	mn := mocknet.New()
	bootstrapNode, err := mockNode(ctx, mn, false, "") // no need for PubSub configuration
	if err != nil {
		t.Fatal(err)
	}
	bootstrapPeerID = bootstrapNode.PeerHost.ID()
	defer bootstrapNode.Close()

	consumerNode, err = mockNode(ctx, mn, true, seenMessagesCacheTTL) // use passed seen cache TTL
	if err != nil {
		t.Fatal(err)
	}
	consumerPeerID = consumerNode.PeerHost.ID()
	defer consumerNode.Close()

	ttl, err := time.ParseDuration(seenMessagesCacheTTL)
	if err != nil {
		t.Fatal(err)
	}

	// Used for logging the timeline
	startTime := time.Time{}

	// Used for overriding the message ID
	sendMsgID := ""

	// Set up the pubsub message ID generation override for the producer
	core.RegisterFXOptionFunc(func(info core.FXNodeInfo) ([]fx.Option, error) {
		var pubsubOptions []pubsub.Option
		pubsubOptions = append(
			pubsubOptions,
			pubsub.WithSeenMessagesTTL(ttl),
			pubsub.WithMessageIdFn(func(pmsg *pubsub_pb.Message) string {
				now := time.Now()
				if startTime.Second() == 0 {
					startTime = now
				}
				timeElapsed := now.Sub(startTime).Seconds()
				msg := string(pmsg.Data)
				from, _ := peer.IDFromBytes(pmsg.From)
				var msgID string
				if from == producerPeerID {
					// Producer messages take the test-controlled ID so
					// duplicates can be forced deliberately.
					msgID = sendMsgID
					t.Logf("sending [%s] with message ID [%s] at T%fs", msg, msgID, timeElapsed)
				} else {
					msgID = pubsub.DefaultMsgIdFn(pmsg)
				}
				return msgID
			}),
			pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen),
		)
		return append(
			info.FXOptions,
			fx.Provide(libp2p2.TopicDiscovery()),
			fx.Decorate(libp2p2.GossipSub(pubsubOptions...)),
		), nil
	})
	producerNode, err = mockNode(ctx, mn, false, "") // PubSub configuration comes from overrides above
	if err != nil {
		t.Fatal(err)
	}
	producerPeerID = producerNode.PeerHost.ID()
	defer producerNode.Close()

	t.Logf("bootstrap peer=%s, consumer peer=%s, producer peer=%s", bootstrapPeerID, consumerPeerID, producerPeerID)

	producerAPI, err := coreapi.NewCoreAPI(producerNode)
	if err != nil {
		t.Fatal(err)
	}
	consumerAPI, err := coreapi.NewCoreAPI(consumerNode)
	if err != nil {
		t.Fatal(err)
	}

	err = mn.LinkAll()
	if err != nil {
		t.Fatal(err)
	}

	// Bootstrap producer and consumer off the same node so they discover
	// each other.
	bis := bootstrapNode.Peerstore.PeerInfo(bootstrapNode.PeerHost.ID())
	bcfg := bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{bis})
	if err = producerNode.Bootstrap(bcfg); err != nil {
		t.Fatal(err)
	}
	if err = consumerNode.Bootstrap(bcfg); err != nil {
		t.Fatal(err)
	}

	// Set up the consumer subscription
	const TopicName = "topic"
	consumerSubscription, err := consumerAPI.PubSub().Subscribe(ctx, TopicName)
	if err != nil {
		t.Fatal(err)
	}

	// Utility functions defined inline to include context in closure
	now := func() float64 {
		return time.Since(startTime).Seconds()
	}
	ctr := 0
	msgGen := func() string {
		ctr++
		return fmt.Sprintf("msg_%d", ctr)
	}
	produceMessage := func() string {
		msgTxt := msgGen()
		err = producerAPI.PubSub().Publish(ctx, TopicName, []byte(msgTxt))
		if err != nil {
			t.Fatal(err)
		}
		return msgTxt
	}
	consumeMessage := func(msgTxt string, shouldFind bool) {
		// Set up a separate timed context for receiving messages
		rxCtx, rxCancel := context.WithTimeout(context.Background(), time.Second)
		defer rxCancel()
		msg, err := consumerSubscription.Next(rxCtx)
		if shouldFind {
			if err != nil {
				t.Logf("expected but did not receive [%s] at T%fs", msgTxt, now())
				t.Fatal(err)
			}
			t.Logf("received [%s] at T%fs", string(msg.Data()), now())
			if !bytes.Equal(msg.Data(), []byte(msgTxt)) {
				t.Fatalf("consumed data [%s] does not match published data [%s]", string(msg.Data()), msgTxt)
			}
		} else {
			if err == nil {
				t.Logf("not expected but received [%s] at T%fs", string(msg.Data()), now())
				t.Fail()
			}
			t.Logf("did not receive [%s] at T%fs", msgTxt, now())
		}
	}

	const MsgID1 = "MsgID1"
	const MsgID2 = "MsgID2"
	const MsgID3 = "MsgID3"

	// Send message 1 with the message ID we're going to duplicate
	sentMsg1 := time.Now()
	sendMsgID = MsgID1
	msgTxt := produceMessage()
	// Should find the message because it's new
	consumeMessage(msgTxt, true)

	// Send message 2 with a duplicate message ID
	sendMsgID = MsgID1
	msgTxt = produceMessage()
	// Should NOT find message because it got deduplicated (sent 2 times within the SeenMessagesTTL window).
	consumeMessage(msgTxt, false)

	// Send message 3 with a new message ID
	sendMsgID = MsgID2
	msgTxt = produceMessage()
	// Should find the message because it's new
	consumeMessage(msgTxt, true)

	// Wait till just before the SeenMessagesTTL window has passed since message 1 was sent
	time.Sleep(time.Until(sentMsg1.Add(ttl - 100*time.Millisecond)))

	// Send message 4 with a duplicate message ID
	sendMsgID = MsgID1
	msgTxt = produceMessage()
	// Should NOT find the message because it got deduplicated (sent 3 times within the SeenMessagesTTL window). This
	// time, however, the expiration for the message should also get pushed out for a whole SeenMessagesTTL window since
	// the default time cache now implements a sliding window algorithm.
	consumeMessage(msgTxt, false)

	// Send message 5 with a duplicate message ID. This will be a second after the last attempt above since NOT finding
	// a message takes a second to determine. That would put this attempt at ~1 second after the SeenMessagesTTL window
	// starting at message 1 has expired.
	sentMsg5 := time.Now()
	sendMsgID = MsgID1
	msgTxt = produceMessage()
	// Should NOT find the message, because it got deduplicated (sent 2 times since the updated SeenMessagesTTL window
	// started). This time again, the expiration should get pushed out for another SeenMessagesTTL window.
	consumeMessage(msgTxt, false)

	// Send message 6 with a message ID that hasn't been seen within a SeenMessagesTTL window
	sendMsgID = MsgID2
	msgTxt = produceMessage()
	// Should find the message since last read > SeenMessagesTTL, so it looks like a new message.
	consumeMessage(msgTxt, true)

	// Sleep for a full SeenMessagesTTL window to let cache entries time out
	time.Sleep(time.Until(sentMsg5.Add(ttl + 100*time.Millisecond)))

	// Send message 7 with a duplicate message ID
	sendMsgID = MsgID1
	msgTxt = produceMessage()
	// Should find the message this time since last read > SeenMessagesTTL, so it looks like a new message.
	consumeMessage(msgTxt, true)

	// Send message 8 with a brand new message ID
	//
	// This step is not strictly necessary, but has been added for good measure.
	sendMsgID = MsgID3
	msgTxt = produceMessage()
	// Should find the message because it's new
	consumeMessage(msgTxt, true)

	return nil
}