Mirror of https://github.com/ipfs/kubo.git (synced 2026-02-21 10:27:46 +08:00)

add provider/keystore/0 and /1 to ipfs diag command

Mount the keystore datastores at /provider/keystore/0 and /provider/keystore/1 so that they are included in the output of the ipfs diag datastore command.

parent 86d814bfd3
commit d8e7d78366
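After this change, the provider keystore of a node that has run with Provide.DHT.SweepEnabled=true can be inspected through the existing offline diag commands, for example (commands as documented in the help text and tests below; the daemon must not be running, and the reported counts depend on the node's data):

$ ipfs diag datastore count /provider/keystore/0/
$ ipfs diag datastore count /provider/keystore/1/
$ ipfs diag datastore count /provider/keystore/0/ --enc=json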
@@ -7,9 +7,11 @@ import (
 	"io"

 	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/mount"
 	"github.com/ipfs/go-datastore/query"
 	cmds "github.com/ipfs/go-ipfs-cmds"
 	oldcmds "github.com/ipfs/kubo/commands"
+	node "github.com/ipfs/kubo/core/node"
 	fsrepo "github.com/ipfs/kubo/repo/fsrepo"
 )

@@ -41,7 +43,11 @@ in production workflows. The datastore format may change between versions.

 The daemon must not be running when calling these commands.

-EXAMPLE
+When the provider keystore datastores exist on disk (nodes with
+Provide.DHT.SweepEnabled=true), they are automatically mounted into the
+datastore view under /provider/keystore/0/ and /provider/keystore/1/.
+
+EXAMPLES

 Inspecting pubsub seqno validator state:

@@ -51,6 +57,11 @@ Inspecting pubsub seqno validator state:
 Key: /pubsub/seqno/12D3KooW...
 Hex Dump:
 00000000 18 81 81 c8 91 c0 ea f6 |........|
+
+Inspecting provider keystore (requires SweepEnabled):
+
+$ ipfs diag datastore count /provider/keystore/0/
+$ ipfs diag datastore count /provider/keystore/1/
 `,
 	},
 	Subcommands: map[string]*cmds.Command{
@@ -67,6 +78,36 @@ type diagDatastoreGetResult struct {
 	HexDump string `json:"hex_dump,omitempty"`
 }

+// openDiagDatastore opens the repo datastore and conditionally mounts any
+// provider keystore datastores that exist on disk. It returns the composite
+// datastore and a cleanup function that must be called when done.
+func openDiagDatastore(env cmds.Environment) (datastore.Datastore, func(), error) {
+	cctx := env.(*oldcmds.Context)
+	repo, err := fsrepo.Open(cctx.ConfigRoot)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to open repo: %w", err)
+	}
+
+	extraMounts, extraCloser, err := node.MountKeystoreDatastores(repo)
+	if err != nil {
+		repo.Close()
+		return nil, nil, err
+	}
+
+	closer := func() {
+		extraCloser()
+		repo.Close()
+	}
+
+	if len(extraMounts) == 0 {
+		return repo.Datastore(), closer, nil
+	}
+
+	mounts := []mount.Mount{{Prefix: datastore.NewKey("/"), Datastore: repo.Datastore()}}
+	mounts = append(mounts, extraMounts...)
+	return mount.New(mounts), closer, nil
+}
+
 var diagDatastoreGetCmd = &cmds.Command{
 	Status: cmds.Experimental,
 	Helptext: cmds.HelpText{
@@ -89,16 +130,14 @@ WARNING: FOR DEBUGGING/TESTING ONLY
 	NoRemote: true,
 	PreRun: DaemonNotRunning,
 	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
-		cctx := env.(*oldcmds.Context)
-		repo, err := fsrepo.Open(cctx.ConfigRoot)
+		ds, closer, err := openDiagDatastore(env)
 		if err != nil {
-			return fmt.Errorf("failed to open repo: %w", err)
+			return err
 		}
-		defer repo.Close()
+		defer closer()

 		keyStr := req.Arguments[0]
 		key := datastore.NewKey(keyStr)
-		ds := repo.Datastore()

 		val, err := ds.Get(req.Context, key)
 		if err != nil {
@@ -156,15 +195,13 @@ WARNING: FOR DEBUGGING/TESTING ONLY
 	NoRemote: true,
 	PreRun: DaemonNotRunning,
 	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
-		cctx := env.(*oldcmds.Context)
-		repo, err := fsrepo.Open(cctx.ConfigRoot)
+		ds, closer, err := openDiagDatastore(env)
 		if err != nil {
-			return fmt.Errorf("failed to open repo: %w", err)
+			return err
 		}
-		defer repo.Close()
+		defer closer()

 		prefix := req.Arguments[0]
-		ds := repo.Datastore()

 		q := query.Query{
 			Prefix: prefix,
@@ -16,6 +16,7 @@ import (
 	"github.com/ipfs/boxo/provider"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/mount"
 	"github.com/ipfs/go-datastore/namespace"
 	"github.com/ipfs/go-datastore/query"
 	log "github.com/ipfs/go-log/v2"
@@ -51,10 +52,15 @@ const (
 	// Datastore key used to store previous reprovide strategy.
 	reprovideStrategyKey = "/reprovideStrategy"

-	// Datastore namespace prefix for provider data.
-	providerDatastorePrefix = "provider"
-	// Base directory for the provider keystore datastores.
-	keystoreDatastorePath = "provider-keystore"
+	// KeystoreDatastorePath is the base directory for the provider keystore datastores.
+	KeystoreDatastorePath = "provider-keystore"
 )

+var (
+	// Datastore namespace key for provider data.
+	providerDatastoreKey = datastore.NewKey("provider")
+	// Datastore namespace key for provider keystore data.
+	keystoreDatastoreKey = datastore.NewKey("keystore")
+)
+
 var errAcceleratedDHTNotReady = errors.New("AcceleratedDHTClient: routing table not ready")
@@ -407,6 +413,59 @@ func findRootDatastoreSpec(spec map[string]any) map[string]any {
 	}
 }

+// MountKeystoreDatastores opens any provider keystore datastores that exist on
+// disk and returns them as mount.Mount entries ready to be combined with the
+// main repo datastore. The caller must call the returned cleanup function when
+// done. Returns nil mounts and a no-op closer if no keystores exist.
+func MountKeystoreDatastores(repo repo.Repo) ([]mount.Mount, func(), error) {
+	cfg, err := repo.Config()
+	if err != nil {
+		return nil, nil, fmt.Errorf("reading repo config: %w", err)
+	}
+
+	rootSpec := findRootDatastoreSpec(cfg.Datastore.Spec)
+	if rootSpec == nil {
+		return nil, func() {}, nil
+	}
+
+	keystoreBasePath := filepath.Join(repo.Path(), KeystoreDatastorePath)
+	var mounts []mount.Mount
+	var closers []func()
+
+	for _, suffix := range []string{"0", "1"} {
+		dir := filepath.Join(keystoreBasePath, suffix)
+		if _, err := os.Stat(dir); err != nil {
+			continue
+		}
+		ds, err := openDatastoreAt(rootSpec, dir)
+		if err != nil {
+			return nil, nil, err
+		}
+		prefix := providerDatastoreKey.Child(keystoreDatastoreKey).ChildString(suffix)
+		mounts = append(mounts, mount.Mount{Prefix: prefix, Datastore: ds})
+		closers = append(closers, func() { ds.Close() })
+	}
+
+	closer := func() {
+		for _, c := range closers {
+			c()
+		}
+	}
+	return mounts, closer, nil
+}
+
+// openDatastoreAt opens a datastore using the given spec at the specified path.
+// It deep-copies the spec to avoid mutating the original.
+func openDatastoreAt(rootSpec map[string]any, path string) (datastore.Batching, error) {
+	spec := copySpec(rootSpec)
+	spec["path"] = path
+	dsc, err := fsrepo.AnyDatastoreConfig(spec)
+	if err != nil {
+		return nil, fmt.Errorf("creating datastore config for %s: %w", path, err)
+	}
+	return dsc.Create("")
+}
+
 // copySpec deep-copies a datastore spec map so modifications (e.g., changing
 // the path) don't affect the original.
 func copySpec(spec map[string]any) map[string]any {
@@ -427,9 +486,9 @@ func copySpec(spec map[string]any) map[string]any {
 // purgeOrphanedKeystoreData deletes all keys under /provider/keystore/ from the
 // shared repo datastore. These were written by older Kubo versions that stored
 // provider keystore data inline in the shared datastore. The new code uses
-// separate filesystem datastores under <repo>/{keystoreDatastorePath}/ instead.
+// separate filesystem datastores under <repo>/{KeystoreDatastorePath}/ instead.
 func purgeOrphanedKeystoreData(ctx context.Context, ds datastore.Batching) error {
-	orphanedPrefix := datastore.NewKey(providerDatastorePrefix).ChildString("keystore").String()
+	orphanedPrefix := providerDatastoreKey.Child(keystoreDatastoreKey).String()

 	results, err := ds.Query(ctx, query.Query{
 		Prefix: orphanedPrefix,
@@ -475,7 +534,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 		Repo repo.Repo
 	}
 	sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *keystore.ResettableKeystore, error) {
-		ds := namespace.Wrap(in.Repo.Datastore(), datastore.NewKey(providerDatastorePrefix))
+		ds := namespace.Wrap(in.Repo.Datastore(), providerDatastoreKey)

 		// Get repo path and config to determine datastore type
 		repoPath := in.Repo.Path()
@@ -487,8 +546,8 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 		// Find the root datastore type (levelds, pebbleds, etc.)
 		rootSpec := findRootDatastoreSpec(repoCfg.Datastore.Spec)

-		// Keystore datastores live at <repo>/{keystoreDatastorePath}/<suffix>
-		keystoreBasePath := filepath.Join(repoPath, keystoreDatastorePath)
+		// Keystore datastores live at <repo>/{KeystoreDatastorePath}/<suffix>
+		keystoreBasePath := filepath.Join(repoPath, KeystoreDatastorePath)

 		createDs := func(suffix string) (datastore.Batching, error) {
 			// When no datastore spec is configured (e.g., test/mock repos),
@@ -499,13 +558,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 			if err := os.MkdirAll(keystoreBasePath, 0o755); err != nil {
 				return nil, fmt.Errorf("creating keystore base directory: %w", err)
 			}
-			spec := copySpec(rootSpec)
-			spec["path"] = filepath.Join(keystoreBasePath, suffix)
-			dsc, err := fsrepo.AnyDatastoreConfig(spec)
-			if err != nil {
-				return nil, fmt.Errorf("creating keystore datastore config: %w", err)
-			}
-			return dsc.Create("")
+			return openDatastoreAt(rootSpec, filepath.Join(keystoreBasePath, suffix))
 		}

 		destroyDs := func(suffix string) error {
@@ -515,7 +568,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 		// One-time migration: purge orphaned keystore data from the shared repo
 		// datastore. Old Kubo versions stored keystore data at /provider/keystore/
 		// inside the shared monolithic datastore. New code uses separate
-		// filesystem datastores under <repo>/{keystoreDatastorePath}/. On first
+		// filesystem datastores under <repo>/{KeystoreDatastorePath}/. On first
 		// start with the new code, detect the upgrade (dir doesn't exist yet) and
 		// delete the stale keys.
 		if _, statErr := os.Stat(keystoreBasePath); os.IsNotExist(statErr) {
@@ -524,7 +577,7 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
 			}
 		}

-		keystoreDs := namespace.Wrap(ds, datastore.NewKey("keystore"))
+		keystoreDs := namespace.Wrap(ds, keystoreDatastoreKey)
 		ks, err := keystore.NewResettableKeystore(keystoreDs,
 			keystore.WithDatastoreFactory(createDs, destroyDs),
 			keystore.KeystoreOption(
@@ -2,6 +2,8 @@ package cli

 import (
 	"encoding/json"
+	"os"
+	"path/filepath"
 	"testing"

 	"github.com/ipfs/kubo/test/cli/harness"
@@ -144,4 +146,69 @@ func TestDiagDatastore(t *testing.T) {
 		assert.Error(t, res.Err, "count should fail when daemon is running")
 		assert.Contains(t, res.Stderr.String(), "ipfs daemon is running")
 	})
+
+	t.Run("provider keystore datastores are visible in unified view", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init()
+		node.SetIPFSConfig("Provide.DHT.SweepEnabled", true)
+		node.SetIPFSConfig("Provide.Enabled", true)
+
+		// Start daemon to create the provider-keystore datastores, then add data
+		node.StartDaemon()
+		cid := node.IPFSAddStr("data for provider keystore test")
+		node.IPFS("pin", "add", cid)
+		node.StopDaemon()
+
+		// Verify the provider-keystore directory was created
+		keystorePath := filepath.Join(node.Dir, "provider-keystore")
+		_, err := os.Stat(keystorePath)
+		require.NoError(t, err, "provider-keystore directory should exist after sweep-enabled daemon ran")
+
+		// Count entries in each keystore namespace via the unified view
+		for _, prefix := range []string{"/provider/keystore/0/", "/provider/keystore/1/"} {
+			res := node.IPFS("diag", "datastore", "count", prefix)
+			assert.NoError(t, res.Err)
+			t.Logf("count %s: %s", prefix, res.Stdout.String())
+		}
+
+		// The total count under /provider/keystore/ should include entries
+		// from both keystore instances (0 and 1)
+		count := node.DatastoreCount("/provider/keystore/")
+		t.Logf("total /provider/keystore/ entries: %d", count)
+		assert.Greater(t, count, int64(0), "should have provider keystore entries")
+	})
+
+	t.Run("provider keystore count JSON output", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init()
+		node.SetIPFSConfig("Provide.DHT.SweepEnabled", true)
+		node.SetIPFSConfig("Provide.Enabled", true)
+
+		node.StartDaemon()
+		node.StopDaemon()
+
+		res := node.IPFS("diag", "datastore", "count", "/provider/keystore/0/", "--enc=json")
+		assert.NoError(t, res.Err)
+
+		var result struct {
+			Prefix string `json:"prefix"`
+			Count int64 `json:"count"`
+		}
+		err := json.Unmarshal(res.Stdout.Bytes(), &result)
+		require.NoError(t, err)
+		assert.Equal(t, "/provider/keystore/0/", result.Prefix)
+		assert.GreaterOrEqual(t, result.Count, int64(0), "count should be non-negative")
+	})
+
+	t.Run("works without provider keystore", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init()
+
+		// No sweep enabled, no provider-keystore dirs — should still work fine
+		count := node.DatastoreCount("/provider/keystore/0/")
+		assert.Zero(t, count)
+
+		count = node.DatastoreCount("/")
+		assert.Greater(t, count, int64(0))
+	})
 }