From b540fba1af9605acb04545aebb97715278649b10 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Mon, 1 Sep 2025 11:58:37 +0200 Subject: [PATCH] improved keystore gc process --- core/node/provider.go | 76 ++++++++++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 19 deletions(-) diff --git a/core/node/provider.go b/core/node/provider.go index 3164d736a..1da077718 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -296,6 +296,7 @@ type addrsFilter interface { } func SweepingProvider(cfg *config.Config) fx.Option { + reprovideInterval := cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval) type providerInput struct { fx.In DHT routing.Routing `name:"dhtc"` @@ -305,7 +306,6 @@ func SweepingProvider(cfg *config.Config) fx.Option { keyStore, err := rds.NewKeyStore(in.Repo.Datastore(), rds.WithPrefixBits(10), rds.WithDatastorePrefix("/reprovider/keystore"), - rds.WithGCInterval(cfg.Reprovider.Sweep.KeyStoreGCInterval.WithDefault(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval))), rds.WithGCBatchSize(int(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(cfg.Reprovider.Sweep.KeyStoreBatchSize.WithDefault(config.DefaultReproviderSweepKeyStoreBatchSize)))), ) if err != nil { @@ -322,7 +322,7 @@ func SweepingProvider(cfg *config.Config) fx.Option { prov, err := ddhtprovider.New(inDht, ddhtprovider.WithKeyStore(keyStore), - ddhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)), + ddhtprovider.WithReprovideInterval(reprovideInterval), ddhtprovider.WithMaxReprovideDelay(time.Hour), ddhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)), ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute), @@ -364,7 +364,7 @@ func SweepingProvider(cfg *config.Config) fx.Option { }), dhtprovider.WithReplicationFactor(amino.DefaultBucketSize), - 
dhtprovider.WithReprovideInterval(cfg.Reprovider.Interval.WithDefault(config.DefaultReproviderInterval)), + dhtprovider.WithReprovideInterval(reprovideInterval), dhtprovider.WithMaxReprovideDelay(time.Hour), dhtprovider.WithOfflineDelay(cfg.Reprovider.Sweep.OfflineDelay.WithDefault(config.DefaultReproviderSweepOfflineDelay)), dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute), @@ -386,28 +386,66 @@ func SweepingProvider(cfg *config.Config) fx.Option { KeyProvider provider.KeyChanFunc } initKeyStore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) { + var ( + cancel context.CancelFunc + done = make(chan struct{}) + ) + + syncKeyStore := func(ctx context.Context) error { + kcf, err := in.KeyProvider(ctx) + if err != nil { + return err + } + if err := in.KeyStore.ResetCids(ctx, kcf); err != nil { + return err + } + if err := in.Provider.RefreshSchedule(); err != nil { + logger.Infow("refreshing provider schedule", "err", err) + } + return nil + } + lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { // Set the KeyProvider as a garbage collection function for the - // keystore. The KeyStore will periodically purge its keys and replace - // them with the ones coming from the KeyChanFunc, to remove CIDs that - // should stop being reprovided from its state. - in.KeyStore.SetGCFunc(in.KeyProvider) + // keystore. Periodically purge the KeyStore of all its keys and + // replace them with the keys that need to be reprovided, coming from + // the KeyChanFunc. So far, this is the least bad way to remove CIDs + // that shouldn't be reprovided from the provider's state. + if err := syncKeyStore(ctx); err != nil { + return err + } - ch, err := in.KeyProvider(ctx) - if err != nil { - return err - } - // Initialize the KeyStore with the current keys from the KeyProvider. - err = in.KeyStore.ResetCids(ctx, ch) - if err != nil { - return err - } - // Add keys from the KeyStore to the schedule. 
- _ = in.Provider.RefreshSchedule() + gcCtx, c := context.WithCancel(context.Background()) + cancel = c + + go func() { // garbage collection loop for cids to reprovide + defer close(done) + ticker := time.NewTicker(reprovideInterval) + defer ticker.Stop() + + for { + select { + case <-gcCtx.Done(): + return + case <-ticker.C: + if err := syncKeyStore(gcCtx); err != nil { + logger.Errorw("provider keystore sync", "err", err) + } + } + } + }() return nil }, - OnStop: func(_ context.Context) error { + OnStop: func(ctx context.Context) error { + if cancel != nil { + cancel() + } + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + } return in.KeyStore.Close() }, })