From d36b6dbd06b105b61ac8e5bf40c1eae6fcd474d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 23 Apr 2019 20:11:43 +0200 Subject: [PATCH] reprovider: use goprocess MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/node/provider.go | 26 ++++++++++----------- reprovide/providers.go | 8 +++---- reprovide/reprovide.go | 46 +++++++++++++++++++++++-------------- reprovide/reprovide_test.go | 9 ++++---- 4 files changed, 50 insertions(+), 39 deletions(-) diff --git a/core/node/provider.go b/core/node/provider.go index 6736b9358..37b9637a7 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -45,6 +45,16 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) { var keyProvider reprovide.KeyChanFunc + reproviderInterval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return nil, err + } + + reproviderInterval = dur + } + switch cfg.Reprovider.Strategy { case "all": fallthrough @@ -57,21 +67,11 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config default: return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) } - return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil + return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil } // Reprovider runs the reprovider service -func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error { - reproviderInterval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - reproviderInterval = dur - } - - go reprovider.Run(reproviderInterval) // TODO: refactor reprovider to have Start/Stop, use lifecycle +func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error { + lp.Append(reprovider.Run) return nil } diff --git a/reprovide/providers.go b/reprovide/providers.go index 7e7fbef19..77b19e2f8 100644 --- a/reprovide/providers.go +++ b/reprovide/providers.go @@ -3,13 +3,13 @@ package reprovide import ( "context" - "github.com/ipfs/go-ipfs/pin" + pin "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-cidutil" + cid "github.com/ipfs/go-cid" + cidutil "github.com/ipfs/go-cidutil" blocks "github.com/ipfs/go-ipfs-blockstore" ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" + merkledag "github.com/ipfs/go-merkledag" ) // NewBlockstoreProvider returns key provider using bstore.AllKeysChan diff --git a/reprovide/reprovide.go b/reprovide/reprovide.go index 9c4666512..1a6f5bad3 100644 --- a/reprovide/reprovide.go +++ b/reprovide/reprovide.go @@ -2,67 +2,77 @@ package reprovide import ( "context" + "errors" "fmt" "time" - backoff "github.com/cenkalti/backoff" - cid "github.com/ipfs/go-cid" + "github.com/cenkalti/backoff" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/ipfs/go-verifcid" + "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" routing "github.com/libp2p/go-libp2p-routing" ) var log = logging.Logger("reprovider") -//KeyChanFunc is function streaming CIDs to pass to content routing +// KeyChanFunc is function streaming CIDs to pass to content routing type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) type doneFunc func(error) type Reprovider struct { ctx context.Context trigger chan doneFunc + closing chan struct{} // The routing system to provide values through rsys routing.ContentRouting keyProvider KeyChanFunc + tick time.Duration } // NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { +func NewReprovider(ctx context.Context, tick time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ ctx: ctx, trigger: make(chan doneFunc), + closing: make(chan struct{}), rsys: rsys, keyProvider: keyProvider, + tick: tick, } } // Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run(tick time.Duration) { +func (rp *Reprovider) Run(proc goprocess.Process) { + ctx := goprocessctx.WithProcessClosing(rp.ctx, proc) + defer close(rp.closing) + // dont reprovide immediately. // may have just started the daemon and shutting it down immediately. // probability( up another minute | uptime ) increases with uptime. after := time.After(time.Minute) var done doneFunc for { - if tick == 0 { + if rp.tick == 0 { after = make(chan time.Time) } select { - case <-rp.ctx.Done(): + case <-ctx.Done(): return case done = <-rp.trigger: case <-after: } - //'mute' the trigger channel so when `ipfs bitswap reprovide` is called - //a 'reprovider is already running' error is returned + // 'mute' the trigger channel so when `ipfs bitswap reprovide` is called + // a 'reprovider is already running' error is returned unmute := rp.muteTrigger() - err := rp.Reprovide() + err := rp.reprovide(ctx) if err != nil { log.Debug(err) } @@ -73,13 +83,13 @@ func (rp *Reprovider) Run(tick time.Duration) { unmute() - after = time.After(tick) + after = time.After(rp.tick) } } -// Reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) Reprovide() error { - keychan, err := rp.keyProvider(rp.ctx) +// reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) reprovide(ctx context.Context) error { + keychan, err := rp.keyProvider(ctx) if err != nil { return fmt.Errorf("failed to get key chan: %s", err) } @@ -90,7 +100,7 @@ func (rp *Reprovider) Reprovide() error { continue } op := func() error { - err := rp.rsys.Provide(rp.ctx, c, true) + err := rp.rsys.Provide(ctx, c, true) if err != nil { log.Debugf("Failed to provide key: %s", err) } @@ -119,10 +129,12 @@ func (rp *Reprovider) Trigger(ctx context.Context) error { } select { + case <-rp.closing: + return errors.New("reprovider is closed") case <-rp.ctx.Done(): - return context.Canceled + return rp.ctx.Err() case <-ctx.Done(): - return context.Canceled + return ctx.Err() case rp.trigger <- df: <-progressCtx.Done() return err diff --git a/reprovide/reprovide_test.go b/reprovide/reprovide_test.go index 2f81066a1..b9e9738b4 100644 --- a/reprovide/reprovide_test.go +++ b/reprovide/reprovide_test.go @@ -1,8 +1,7 @@ -package reprovide_test +package reprovide import ( "context" - "github.com/ipfs/go-ipfs" "testing" blocks "github.com/ipfs/go-block-format" @@ -34,9 +33,9 @@ func TestReprovide(t *testing.T) { t.Fatal(err) } - keyProvider := ipfs.NewBlockstoreProvider(bstore) - reprov := ipfs.NewReprovider(ctx, clA, keyProvider) - err = reprov.Reprovide() + keyProvider := NewBlockstoreProvider(bstore) + reprov := NewReprovider(ctx, 0, clA, keyProvider) + err = reprov.reprovide(ctx) if err != nil { t.Fatal(err) }