fix(provider): purge keystore datastore after reset

This commit is contained in:
guillaumemichel 2026-02-13 15:34:10 +01:00
parent 8eab2fcf5d
commit 442a44d20d
No known key found for this signature in database
GPG Key ID: 612745DB2E6D0E15
8 changed files with 173 additions and 15 deletions

View File

@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/ipfs/boxo/blockstore"
@ -19,6 +21,7 @@ import (
log "github.com/ipfs/go-log/v2"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/repo/fsrepo"
irouting "github.com/ipfs/kubo/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/amino"
@ -50,8 +53,8 @@ const (
// Datastore namespace prefix for provider data.
providerDatastorePrefix = "provider"
// Datastore path for the provider keystore.
keystoreDatastorePath = "keystore"
// Base directory for the provider keystore datastores.
keystoreDatastorePath = "provider-keystore"
)
var errAcceleratedDHTNotReady = errors.New("AcceleratedDHTClient: routing table not ready")
@ -369,6 +372,58 @@ type addrsFilter interface {
FilteredAddrs() []ma.Multiaddr
}
// findRootDatastoreSpec extracts the leaf datastore spec for the root ("/")
// mount from the repo's Datastore.Spec config. It unwraps mount (picks the "/"
// mountpoint), measure, and log wrappers to find the actual backend spec
// (e.g., levelds, pebbleds).
func findRootDatastoreSpec(spec map[string]any) map[string]any {
if spec == nil {
return nil
}
switch spec["type"] {
case "mount":
mounts, ok := spec["mounts"].([]any)
if !ok {
return spec
}
for _, m := range mounts {
mount, ok := m.(map[string]any)
if !ok {
continue
}
if mount["mountpoint"] == "/" {
return findRootDatastoreSpec(mount)
}
}
// No root mount found, return as-is
return spec
case "measure", "log":
if child, ok := spec["child"].(map[string]any); ok {
return findRootDatastoreSpec(child)
}
return spec
default:
return spec
}
}
// copySpec deep-copies a datastore spec map so modifications (e.g., changing
// the path) don't affect the original.
func copySpec(spec map[string]any) map[string]any {
if spec == nil {
return nil
}
cp := make(map[string]any, len(spec))
for k, v := range spec {
if m, ok := v.(map[string]any); ok {
cp[k] = copySpec(m)
} else {
cp[k] = v
}
}
return cp
}
func SweepingProviderOpt(cfg *config.Config) fx.Option {
reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)
type providerInput struct {
@ -378,10 +433,40 @@ func SweepingProviderOpt(cfg *config.Config) fx.Option {
}
sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *keystore.ResettableKeystore, error) {
ds := namespace.Wrap(in.Repo.Datastore(), datastore.NewKey(providerDatastorePrefix))
ks, err := keystore.NewResettableKeystore(ds,
keystore.WithPrefixBits(16),
keystore.WithDatastorePath(keystoreDatastorePath),
keystore.WithBatchSize(int(cfg.Provide.DHT.KeystoreBatchSize.WithDefault(config.DefaultProvideDHTKeystoreBatchSize))),
// Get repo path and config to determine datastore type
repoPath := in.Repo.Path()
repoCfg, err := in.Repo.Config()
if err != nil {
return nil, nil, fmt.Errorf("getting repo config: %w", err)
}
// Find the root datastore type (levelds, pebbleds, etc.)
rootSpec := findRootDatastoreSpec(repoCfg.Datastore.Spec)
// Keystore datastores live at <repo>/provider-keystore/<suffix>
keystoreBasePath := filepath.Join(repoPath, keystoreDatastorePath)
createDs := func(suffix string) (datastore.Batching, error) {
spec := copySpec(rootSpec)
spec["path"] = filepath.Join(keystoreBasePath, suffix)
dsc, err := fsrepo.AnyDatastoreConfig(spec)
if err != nil {
return nil, fmt.Errorf("creating keystore datastore config: %w", err)
}
return dsc.Create("")
}
destroyDs := func(suffix string) error {
return os.RemoveAll(filepath.Join(keystoreBasePath, suffix))
}
ks, err := keystore.NewResettableKeystore(
keystore.WithDatastoreFactory(createDs, destroyDs),
keystore.KeystoreOption(
keystore.WithPrefixBits(16),
keystore.WithBatchSize(int(cfg.Provide.DHT.KeystoreBatchSize.WithDefault(config.DefaultProvideDHTKeystoreBatchSize))),
),
)
if err != nil {
return nil, nil, err

View File

@ -116,7 +116,7 @@ require (
github.com/libp2p/go-doh-resolver v0.5.0 // indirect
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.15.0 // indirect
github.com/libp2p/go-libp2p-pubsub-router v0.6.0 // indirect

View File

@ -405,8 +405,8 @@ github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl9
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883 h1:cEbqqo3yrRk/K0sfro5FIo5udSwNH4Y1N+8MFfp7bz0=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883/go.mod h1:aZuF2qipKhprKkt8Xw3lGJlysEjpZXDIUyGdm1/KGA8=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4 h1:CSdNKDR+OIl+vh1KMzNKcgKV/yxbElIcrbFAcnfDhG0=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4/go.mod h1:aZuF2qipKhprKkt8Xw3lGJlysEjpZXDIUyGdm1/KGA8=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=

2
go.mod
View File

@ -52,7 +52,7 @@ require (
github.com/libp2p/go-doh-resolver v0.5.0
github.com/libp2p/go-libp2p v0.47.0
github.com/libp2p/go-libp2p-http v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4
github.com/libp2p/go-libp2p-kbucket v0.8.0
github.com/libp2p/go-libp2p-pubsub v0.15.0
github.com/libp2p/go-libp2p-pubsub-router v0.6.0

4
go.sum
View File

@ -493,8 +493,8 @@ github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qk
github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA=
github.com/libp2p/go-libp2p-http v0.5.0 h1:+x0AbLaUuLBArHubbbNRTsgWz0RjNTy6DJLOxQ3/QBc=
github.com/libp2p/go-libp2p-http v0.5.0/go.mod h1:glh87nZ35XCQyFsdzZps6+F4HYI6DctVFY5u1fehwSg=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883 h1:cEbqqo3yrRk/K0sfro5FIo5udSwNH4Y1N+8MFfp7bz0=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883/go.mod h1:aZuF2qipKhprKkt8Xw3lGJlysEjpZXDIUyGdm1/KGA8=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4 h1:CSdNKDR+OIl+vh1KMzNKcgKV/yxbElIcrbFAcnfDhG0=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4/go.mod h1:aZuF2qipKhprKkt8Xw3lGJlysEjpZXDIUyGdm1/KGA8=
github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=

View File

@ -6,6 +6,8 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
@ -842,3 +844,74 @@ func TestHTTPOnlyProviderWithSweepEnabled(t *testing.T) {
assert.Contains(t, statRes.Stdout.String(), "TotalReprovides:",
"should show legacy provider stats")
}
// TestProviderKeystoreDatastoreCompaction verifies that the SweepingProvider's
// keystore uses a datastore factory that creates separate physical datastores
// and reclaims disk space by deleting old datastores after each reset cycle.
//
// The keystore uses two alternating namespaces ("0" and "1") plus a "meta"
// namespace. The lifecycle is:
// 1. First start: namespace "0" is created as the initial active datastore
// 2. First reset (keystore sync at startup): "1" is created, data is written,
// namespaces swap, "0" is destroyed from disk via os.RemoveAll
// 3. Restart: "1" and "meta" survive on disk
// 4. Second reset: "0" is recreated, namespaces swap, "1" is destroyed
func TestProviderKeystoreDatastorePurge(t *testing.T) {
t.Parallel()
h := harness.NewT(t)
node := h.NewNode().Init()
node.SetIPFSConfig("Provide.DHT.SweepEnabled", true)
node.SetIPFSConfig("Provide.Enabled", true)
node.SetIPFSConfig("Bootstrap", []string{})
// Add content offline so the keystore has something to sync on startup.
for i := range 5 {
node.IPFSAddStr(fmt.Sprintf("keystore-compaction-test-%d", i))
}
keystoreBase := filepath.Join(node.Dir, "provider-keystore")
ns0 := filepath.Join(keystoreBase, "0")
ns1 := filepath.Join(keystoreBase, "1")
meta := filepath.Join(keystoreBase, "meta")
// Directory should not exist before starting the daemon.
_, err := os.Stat(keystoreBase)
require.True(t, os.IsNotExist(err), "provider-keystore should not exist before daemon start")
// --- First start: triggers keystore sync (ResetCids) ---
// Init creates "0", then reset swaps to "1" and destroys "0".
node.StartDaemon()
require.Eventually(t, func() bool {
return dirExists(ns1) && !dirExists(ns0)
}, 30*time.Second, 200*time.Millisecond,
"after first reset: ns1 should exist, ns0 should be destroyed")
assert.True(t, dirExists(meta), "meta should exist after first reset")
// --- Restart: triggers a second keystore sync (ResetCids) ---
// Reset swaps back to "0" and destroys "1".
node.StopDaemon()
// Between restarts: ns1 and meta survive on disk, ns0 does not.
assert.True(t, dirExists(ns1), "ns1 should survive shutdown")
assert.True(t, dirExists(meta), "meta should survive shutdown")
assert.False(t, dirExists(ns0), "ns0 should not reappear between restarts")
node.StartDaemon()
require.Eventually(t, func() bool {
return dirExists(ns0) && !dirExists(ns1)
}, 30*time.Second, 200*time.Millisecond,
"after second reset: ns0 should exist, ns1 should be destroyed")
assert.True(t, dirExists(meta), "meta should still exist after second reset")
node.StopDaemon()
}
func dirExists(path string) bool {
info, err := os.Stat(path)
return err == nil && info.IsDir()
}

View File

@ -183,7 +183,7 @@ require (
github.com/libp2p/go-flow-metrics v0.3.0 // indirect
github.com/libp2p/go-libp2p v0.47.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4 // indirect
github.com/libp2p/go-libp2p-kbucket v0.8.0 // indirect
github.com/libp2p/go-libp2p-record v0.3.1 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.5 // indirect

View File

@ -421,8 +421,8 @@ github.com/libp2p/go-libp2p v0.47.0 h1:qQpBjSCWNQFF0hjBbKirMXE9RHLtSuzTDkTfr1rw0
github.com/libp2p/go-libp2p v0.47.0/go.mod h1:s8HPh7mMV933OtXzONaGFseCg/BE//m1V34p3x4EUOY=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883 h1:cEbqqo3yrRk/K0sfro5FIo5udSwNH4Y1N+8MFfp7bz0=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260211161343-a9412b283883/go.mod h1:aZuF2qipKhprKkt8Xw3lGJlysEjpZXDIUyGdm1/KGA8=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4 h1:CSdNKDR+OIl+vh1KMzNKcgKV/yxbElIcrbFAcnfDhG0=
github.com/libp2p/go-libp2p-kad-dht v0.37.2-0.20260212142733-97ce04b37df4/go.mod h1:aZuF2qipKhprKkt8Xw3lGJlysEjpZXDIUyGdm1/KGA8=
github.com/libp2p/go-libp2p-kbucket v0.8.0 h1:QAK7RzKJpYe+EuSEATAaaHYMYLkPDGC18m9jxPLnU8s=
github.com/libp2p/go-libp2p-kbucket v0.8.0/go.mod h1:JMlxqcEyKwO6ox716eyC0hmiduSWZZl6JY93mGaaqc4=
github.com/libp2p/go-libp2p-record v0.3.1 h1:cly48Xi5GjNw5Wq+7gmjfBiG9HCzQVkiZOUZ8kUl+Fg=