From d8e7d7836673f583707a49c073c87d96480eb9b9 Mon Sep 17 00:00:00 2001
From: guillaumemichel
Date: Tue, 17 Feb 2026 16:18:46 +0100
Subject: [PATCH] add provider/keystore/0 and /1 to ipfs diag command

Mount the keystore datastores at /provider/keystore/0 and /1 so that
they are included in the ipfs diag datastore command.
---
 core/commands/diag.go           | 59 ++++++++++++++++++----
 core/node/provider.go           | 89 ++++++++++++++++++++++++++-------
 test/cli/diag_datastore_test.go | 67 +++++++++++++++++++++++++
 3 files changed, 186 insertions(+), 29 deletions(-)

diff --git a/core/commands/diag.go b/core/commands/diag.go
index 777e9445f..cc8663c32 100644
--- a/core/commands/diag.go
+++ b/core/commands/diag.go
@@ -7,9 +7,11 @@ import (
 	"io"
 
 	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/mount"
 	"github.com/ipfs/go-datastore/query"
 	cmds "github.com/ipfs/go-ipfs-cmds"
 	oldcmds "github.com/ipfs/kubo/commands"
+	node "github.com/ipfs/kubo/core/node"
 	fsrepo "github.com/ipfs/kubo/repo/fsrepo"
 )
 
@@ -41,7 +43,11 @@ in production workflows. The datastore format may change between versions.
 
 The daemon must not be running when calling these commands.
 
-EXAMPLE
+When the provider keystore datastores exist on disk (nodes with
+Provide.DHT.SweepEnabled=true), they are automatically mounted into the
+datastore view under /provider/keystore/0/ and /provider/keystore/1/.
+
+EXAMPLES
 
 Inspecting pubsub seqno validator state:
 
   $ ipfs diag datastore get /pubsub/seqno/12D3KooW...
 
   Key: /pubsub/seqno/12D3KooW...
   Hex Dump:
   00000000  18 81 81 c8 91 c0 ea f6  |........|
+
+Inspecting provider keystore (requires SweepEnabled):
+
+  $ ipfs diag datastore count /provider/keystore/0/
+  $ ipfs diag datastore count /provider/keystore/1/
 `,
 	},
 	Subcommands: map[string]*cmds.Command{
@@ -67,6 +78,36 @@ type diagDatastoreGetResult struct {
 	HexDump string `json:"hex_dump,omitempty"`
 }
 
+// openDiagDatastore opens the repo datastore and conditionally mounts any
+// provider keystore datastores that exist on disk. It returns the composite
+// datastore and a cleanup function that must be called when done.
+func openDiagDatastore(env cmds.Environment) (datastore.Datastore, func(), error) {
+	cctx := env.(*oldcmds.Context)
+	repo, err := fsrepo.Open(cctx.ConfigRoot)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to open repo: %w", err)
+	}
+
+	extraMounts, extraCloser, err := node.MountKeystoreDatastores(repo)
+	if err != nil {
+		repo.Close()
+		return nil, nil, err
+	}
+
+	closer := func() {
+		extraCloser()
+		repo.Close()
+	}
+
+	if len(extraMounts) == 0 {
+		return repo.Datastore(), closer, nil
+	}
+
+	mounts := []mount.Mount{{Prefix: datastore.NewKey("/"), Datastore: repo.Datastore()}}
+	mounts = append(mounts, extraMounts...)
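+	// mount.New resolves each key to the mount with the longest matching
+	// prefix, so keys under /provider/keystore/... are served by the keystore
+	// datastores while everything else falls through to the root datastore
+	// mounted at "/".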
+	return mount.New(mounts), closer, nil
+}
+
 var diagDatastoreGetCmd = &cmds.Command{
 	Status:   cmds.Experimental,
 	Helptext: cmds.HelpText{
@@ -89,16 +130,14 @@ WARNING: FOR DEBUGGING/TESTING ONLY
 	NoRemote: true,
 	PreRun:   DaemonNotRunning,
 	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
-		cctx := env.(*oldcmds.Context)
-		repo, err := fsrepo.Open(cctx.ConfigRoot)
+		ds, closer, err := openDiagDatastore(env)
 		if err != nil {
-			return fmt.Errorf("failed to open repo: %w", err)
+			return err
 		}
-		defer repo.Close()
+		defer closer()
 
 		keyStr := req.Arguments[0]
 		key := datastore.NewKey(keyStr)
-		ds := repo.Datastore()
 
 		val, err := ds.Get(req.Context, key)
 		if err != nil {
@@ -156,15 +195,13 @@ WARNING: FOR DEBUGGING/TESTING ONLY
 	NoRemote: true,
 	PreRun:   DaemonNotRunning,
 	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
-		cctx := env.(*oldcmds.Context)
-		repo, err := fsrepo.Open(cctx.ConfigRoot)
+		ds, closer, err := openDiagDatastore(env)
 		if err != nil {
-			return fmt.Errorf("failed to open repo: %w", err)
+			return err
 		}
-		defer repo.Close()
+		defer closer()
 
 		prefix := req.Arguments[0]
-		ds := repo.Datastore()
 
 		q := query.Query{
 			Prefix: prefix,
diff --git a/core/node/provider.go b/core/node/provider.go
index ac2cac0e2..11b5c5130 100644
--- a/core/node/provider.go
+++ b/core/node/provider.go
@@ -16,6 +16,7 @@ import (
 	"github.com/ipfs/boxo/provider"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/mount"
 	"github.com/ipfs/go-datastore/namespace"
 	"github.com/ipfs/go-datastore/query"
 	log "github.com/ipfs/go-log/v2"
@@ -51,10 +52,15 @@ const (
 	// Datastore key used to store previous reprovide strategy.
 	reprovideStrategyKey = "/reprovideStrategy"
 
-	// Datastore namespace prefix for provider data.
-	providerDatastorePrefix = "provider"
-	// Base directory for the provider keystore datastores.
-	keystoreDatastorePath = "provider-keystore"
+	// KeystoreDatastorePath is the base directory for the provider keystore datastores.
+	KeystoreDatastorePath = "provider-keystore"
+)
+
+var (
+	// Datastore namespace key for provider data.
+	providerDatastoreKey = datastore.NewKey("provider")
+	// Datastore namespace key for provider keystore data.
+	keystoreDatastoreKey = datastore.NewKey("keystore")
 )
 
 var errAcceleratedDHTNotReady = errors.New("AcceleratedDHTClient: routing table not ready")
@@ -407,6 +413,59 @@ func findRootDatastoreSpec(spec map[string]any) map[string]any {
 	}
 }
 
+// MountKeystoreDatastores opens any provider keystore datastores that exist on
+// disk and returns them as mount.Mount entries ready to be combined with the
+// main repo datastore. The caller must call the returned cleanup function when
+// done. Returns nil mounts and a no-op closer if no keystores exist.
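+// Keystores found under <repo>/provider-keystore/{0,1} are mounted at
+// /provider/keystore/0 and /provider/keystore/1 respectively.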
+func MountKeystoreDatastores(repo repo.Repo) ([]mount.Mount, func(), error) {
+	cfg, err := repo.Config()
+	if err != nil {
+		return nil, nil, fmt.Errorf("reading repo config: %w", err)
+	}
+
+	rootSpec := findRootDatastoreSpec(cfg.Datastore.Spec)
+	if rootSpec == nil {
+		return nil, func() {}, nil
+	}
+
+	keystoreBasePath := filepath.Join(repo.Path(), KeystoreDatastorePath)
+	var mounts []mount.Mount
+	var closers []func()
+
+	// closer closes every keystore datastore opened so far; it is also used
+	// to clean up if opening a later keystore fails below.
+	closer := func() {
+		for _, c := range closers {
+			c()
+		}
+	}
+
+	for _, suffix := range []string{"0", "1"} {
+		dir := filepath.Join(keystoreBasePath, suffix)
+		if _, err := os.Stat(dir); err != nil {
+			continue
+		}
+		ds, err := openDatastoreAt(rootSpec, dir)
+		if err != nil {
+			closer()
+			return nil, nil, err
+		}
+		prefix := providerDatastoreKey.Child(keystoreDatastoreKey).ChildString(suffix)
+		mounts = append(mounts, mount.Mount{Prefix: prefix, Datastore: ds})
+		closers = append(closers, func() { ds.Close() })
+	}
+
+	return mounts, closer, nil
+}
+
+// openDatastoreAt opens a datastore using the given spec at the specified path.
+// It deep-copies the spec to avoid mutating the original.
+func openDatastoreAt(rootSpec map[string]any, path string) (datastore.Batching, error) {
+	spec := copySpec(rootSpec)
+	spec["path"] = path
+	dsc, err := fsrepo.AnyDatastoreConfig(spec)
+	if err != nil {
+		return nil, fmt.Errorf("creating datastore config for %s: %w", path, err)
+	}
+	return dsc.Create("")
+}
+
 // copySpec deep-copies a datastore spec map so modifications (e.g., changing
 // the path) don't affect the original.
 func copySpec(spec map[string]any) map[string]any {
@@ -427,9 +486,9 @@ func copySpec(spec map[string]any) map[string]any {
 // purgeOrphanedKeystoreData deletes all keys under /provider/keystore/ from the
 // shared repo datastore. These were written by older Kubo versions that stored
 // provider keystore data inline in the shared datastore. The new code uses
-// separate filesystem datastores under /{keystoreDatastorePath}/ instead.
+// separate filesystem datastores under /{KeystoreDatastorePath}/ instead.
 func purgeOrphanedKeystoreData(ctx context.Context, ds datastore.Batching) error {
-	orphanedPrefix := datastore.NewKey(providerDatastorePrefix).ChildString("keystore").String()
+	orphanedPrefix := providerDatastoreKey.Child(keystoreDatastoreKey).String()
 
 	results, err := ds.Query(ctx, query.Query{
 		Prefix: orphanedPrefix,
@@ -475,7 +534,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 		Repo    repo.Repo
 	}
 	sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *keystore.ResettableKeystore, error) {
-		ds := namespace.Wrap(in.Repo.Datastore(), datastore.NewKey(providerDatastorePrefix))
+		ds := namespace.Wrap(in.Repo.Datastore(), providerDatastoreKey)
 
 		// Get repo path and config to determine datastore type
 		repoPath := in.Repo.Path()
@@ -487,8 +546,8 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 
 		// Find the root datastore type (levelds, pebbleds, etc.)
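+		// NOTE: rootSpec is nil when the repo has no recognized on-disk
+		// datastore spec (e.g. test/mock repos); createDs falls back accordingly.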
 		rootSpec := findRootDatastoreSpec(repoCfg.Datastore.Spec)
 
-		// Keystore datastores live at /{keystoreDatastorePath}/
-		keystoreBasePath := filepath.Join(repoPath, keystoreDatastorePath)
+		// Keystore datastores live at /{KeystoreDatastorePath}/
+		keystoreBasePath := filepath.Join(repoPath, KeystoreDatastorePath)
 
 		createDs := func(suffix string) (datastore.Batching, error) {
 			// When no datastore spec is configured (e.g., test/mock repos),
@@ -499,13 +558,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 			if err := os.MkdirAll(keystoreBasePath, 0o755); err != nil {
 				return nil, fmt.Errorf("creating keystore base directory: %w", err)
 			}
-			spec := copySpec(rootSpec)
-			spec["path"] = filepath.Join(keystoreBasePath, suffix)
-			dsc, err := fsrepo.AnyDatastoreConfig(spec)
-			if err != nil {
-				return nil, fmt.Errorf("creating keystore datastore config: %w", err)
-			}
-			return dsc.Create("")
+			return openDatastoreAt(rootSpec, filepath.Join(keystoreBasePath, suffix))
 		}
 
 		destroyDs := func(suffix string) error {
@@ -515,7 +568,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 		// One-time migration: purge orphaned keystore data from the shared repo
 		// datastore. Old Kubo versions stored keystore data at /provider/keystore/
 		// inside the shared monolithic datastore. New code uses separate
-		// filesystem datastores under /{keystoreDatastorePath}/. On first
+		// filesystem datastores under /{KeystoreDatastorePath}/. On first
 		// start with the new code, detect the upgrade (dir doesn't exist yet) and
 		// delete the stale keys.
 		if _, statErr := os.Stat(keystoreBasePath); os.IsNotExist(statErr) {
@@ -524,7 +577,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 			}
 		}
 
-		keystoreDs := namespace.Wrap(ds, datastore.NewKey("keystore"))
+		keystoreDs := namespace.Wrap(ds, keystoreDatastoreKey)
 		ks, err := keystore.NewResettableKeystore(keystoreDs,
 			keystore.WithDatastoreFactory(createDs, destroyDs),
 			keystore.KeystoreOption(
diff --git a/test/cli/diag_datastore_test.go b/test/cli/diag_datastore_test.go
index 2a69f60cc..a8e950da4 100644
--- a/test/cli/diag_datastore_test.go
+++ b/test/cli/diag_datastore_test.go
@@ -2,6 +2,8 @@ package cli
 
 import (
 	"encoding/json"
+	"os"
+	"path/filepath"
 	"testing"
 
 	"github.com/ipfs/kubo/test/cli/harness"
@@ -144,4 +146,69 @@ func TestDiagDatastore(t *testing.T) {
 		assert.Error(t, res.Err, "count should fail when daemon is running")
 		assert.Contains(t, res.Stderr.String(), "ipfs daemon is running")
 	})
+
+	t.Run("provider keystore datastores are visible in unified view", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init()
+		node.SetIPFSConfig("Provide.DHT.SweepEnabled", true)
+		node.SetIPFSConfig("Provide.Enabled", true)
+
+		// Start the daemon to create the provider-keystore datastores, then add data
+		node.StartDaemon()
+		cid := node.IPFSAddStr("data for provider keystore test")
+		node.IPFS("pin", "add", cid)
+		node.StopDaemon()
+
+		// Verify the provider-keystore directory was created
+		keystorePath := filepath.Join(node.Dir, "provider-keystore")
+		_, err := os.Stat(keystorePath)
+		require.NoError(t, err, "provider-keystore directory should exist after sweep-enabled daemon ran")
+
+		// Count entries in each keystore namespace via the unified view
+		for _, prefix := range []string{"/provider/keystore/0/", "/provider/keystore/1/"} {
+			res := node.IPFS("diag", "datastore", "count", prefix)
+			assert.NoError(t, res.Err)
+			t.Logf("count %s: %s", prefix, res.Stdout.String())
+		}
+
+		// The total count under /provider/keystore/ should include entries
+		// from both keystore instances (0 and 1).
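+		// Which instance currently holds the records depends on the keystore's
+		// internal state, so assert only on the combined total.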
+		count := node.DatastoreCount("/provider/keystore/")
+		t.Logf("total /provider/keystore/ entries: %d", count)
+		assert.Greater(t, count, int64(0), "should have provider keystore entries")
+	})
+
+	t.Run("provider keystore count JSON output", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init()
+		node.SetIPFSConfig("Provide.DHT.SweepEnabled", true)
+		node.SetIPFSConfig("Provide.Enabled", true)
+
+		// Start and stop the daemon so the provider-keystore datastores exist on disk
+		node.StartDaemon()
+		node.StopDaemon()
+
+		res := node.IPFS("diag", "datastore", "count", "/provider/keystore/0/", "--enc=json")
+		assert.NoError(t, res.Err)
+
+		var result struct {
+			Prefix string `json:"prefix"`
+			Count  int64  `json:"count"`
+		}
+		err := json.Unmarshal(res.Stdout.Bytes(), &result)
+		require.NoError(t, err)
+		assert.Equal(t, "/provider/keystore/0/", result.Prefix)
+		assert.GreaterOrEqual(t, result.Count, int64(0), "count should be non-negative")
+	})
+
+	t.Run("works without provider keystore", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init()
+
+		// No sweep enabled, so no provider-keystore dirs exist; the command
+		// should still work and report zero keystore entries.
+		count := node.DatastoreCount("/provider/keystore/0/")
+		assert.Zero(t, count)
+
+		count = node.DatastoreCount("/")
+		assert.Greater(t, count, int64(0))
+	})
 }