From 14866308c7dbee722a38905acff7a65dbcc505fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 23 Apr 2019 14:41:00 +0200 Subject: [PATCH 1/2] move reprovider out of exchange directory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/core.go | 8 ++++---- core/node/core.go | 1 - core/node/provider.go | 2 +- {exchange/reprovide => reprovide}/providers.go | 8 ++++---- {exchange/reprovide => reprovide}/reprovide.go | 0 {exchange/reprovide => reprovide}/reprovide_test.go | 9 ++++----- 6 files changed, 13 insertions(+), 15 deletions(-) rename {exchange/reprovide => reprovide}/providers.go (91%) rename {exchange/reprovide => reprovide}/reprovide.go (100%) rename {exchange/reprovide => reprovide}/reprovide_test.go (86%) diff --git a/core/core.go b/core/core.go index c8ecbe19f..23656e722 100644 --- a/core/core.go +++ b/core/core.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/core/node/libp2p" - rp "github.com/ipfs/go-ipfs/exchange/reprovide" "github.com/ipfs/go-ipfs/filestore" "github.com/ipfs/go-ipfs/fuse/mount" "github.com/ipfs/go-ipfs/namesys" @@ -28,6 +27,7 @@ import ( "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" + rp "github.com/ipfs/go-ipfs/reprovide" bserv "github.com/ipfs/go-blockservice" bstore "github.com/ipfs/go-ipfs-blockstore" @@ -69,9 +69,9 @@ type IpfsNode struct { Repo repo.Repo // Local node - Pinning pin.Pinner // the pinning manager - Mounts Mounts `optional:"true"` // current mount state, if any. - PrivateKey ic.PrivKey // the local node's private Key + Pinning pin.Pinner // the pinning manager + Mounts Mounts `optional:"true"` // current mount state, if any. + PrivateKey ic.PrivKey // the local node's private Key PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network // Services diff --git a/core/node/core.go b/core/node/core.go index 5da6d2752..d7f3ba97e 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -119,4 +119,3 @@ func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format. return root, err } - diff --git a/core/node/provider.go b/core/node/provider.go index 336a9fd36..6736b9358 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -11,10 +11,10 @@ import ( "go.uber.org/fx" "github.com/ipfs/go-ipfs/core/node/helpers" - "github.com/ipfs/go-ipfs/exchange/reprovide" "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/reprovide" ) const kReprovideFrequency = time.Hour * 12 diff --git a/exchange/reprovide/providers.go b/reprovide/providers.go similarity index 91% rename from exchange/reprovide/providers.go rename to reprovide/providers.go index 77b19e2f8..7e7fbef19 100644 --- a/exchange/reprovide/providers.go +++ b/reprovide/providers.go @@ -3,13 +3,13 @@ package reprovide import ( "context" - pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/pin" - cid "github.com/ipfs/go-cid" - cidutil "github.com/ipfs/go-cidutil" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-cidutil" blocks "github.com/ipfs/go-ipfs-blockstore" ipld "github.com/ipfs/go-ipld-format" - merkledag "github.com/ipfs/go-merkledag" + "github.com/ipfs/go-merkledag" ) // NewBlockstoreProvider returns key provider using bstore.AllKeysChan diff --git a/exchange/reprovide/reprovide.go b/reprovide/reprovide.go similarity index 100% rename from exchange/reprovide/reprovide.go rename to reprovide/reprovide.go diff --git a/exchange/reprovide/reprovide_test.go b/reprovide/reprovide_test.go similarity index 86% rename from exchange/reprovide/reprovide_test.go rename to reprovide/reprovide_test.go index ce1d123c1..2f81066a1 100644 --- a/exchange/reprovide/reprovide_test.go +++ b/reprovide/reprovide_test.go @@ -2,6 +2,7 @@ package reprovide_test import ( "context" + "github.com/ipfs/go-ipfs" "testing" blocks "github.com/ipfs/go-block-format" @@ -10,9 +11,7 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" mock "github.com/ipfs/go-ipfs-routing/mock" pstore "github.com/libp2p/go-libp2p-peerstore" - testutil "github.com/libp2p/go-testutil" - - . "github.com/ipfs/go-ipfs/exchange/reprovide" + "github.com/libp2p/go-testutil" ) func TestReprovide(t *testing.T) { @@ -35,8 +34,8 @@ func TestReprovide(t *testing.T) { t.Fatal(err) } - keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(ctx, clA, keyProvider) + keyProvider := ipfs.NewBlockstoreProvider(bstore) + reprov := ipfs.NewReprovider(ctx, clA, keyProvider) err = reprov.Reprovide() if err != nil { t.Fatal(err) From d36b6dbd06b105b61ac8e5bf40c1eae6fcd474d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 23 Apr 2019 20:11:43 +0200 Subject: [PATCH 2/2] reprovider: use goprocess MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/node/provider.go | 26 ++++++++++----------- reprovide/providers.go | 8 +++---- reprovide/reprovide.go | 46 +++++++++++++++++++++++-------------- reprovide/reprovide_test.go | 9 ++++---- 4 files changed, 50 insertions(+), 39 deletions(-) diff --git a/core/node/provider.go b/core/node/provider.go index 6736b9358..37b9637a7 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -45,6 +45,16 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) { var keyProvider reprovide.KeyChanFunc + reproviderInterval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return nil, err + } + + reproviderInterval = dur + } + switch cfg.Reprovider.Strategy { case "all": fallthrough @@ -57,21 +67,11 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config default: return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) } - return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil + return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil } // Reprovider runs the reprovider service -func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error { - reproviderInterval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - reproviderInterval = dur - } - - go reprovider.Run(reproviderInterval) // TODO: refactor reprovider to have Start/Stop, use lifecycle +func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error { + lp.Append(reprovider.Run) return nil } diff --git a/reprovide/providers.go b/reprovide/providers.go index 7e7fbef19..77b19e2f8 100644 --- a/reprovide/providers.go +++ b/reprovide/providers.go @@ -3,13 +3,13 @@ package reprovide import ( "context" - "github.com/ipfs/go-ipfs/pin" + pin "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-cidutil" + cid "github.com/ipfs/go-cid" + cidutil "github.com/ipfs/go-cidutil" blocks "github.com/ipfs/go-ipfs-blockstore" ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" + merkledag "github.com/ipfs/go-merkledag" ) // NewBlockstoreProvider returns key provider using bstore.AllKeysChan diff --git a/reprovide/reprovide.go b/reprovide/reprovide.go index 9c4666512..1a6f5bad3 100644 --- a/reprovide/reprovide.go +++ b/reprovide/reprovide.go @@ -2,67 +2,77 @@ package reprovide import ( "context" + "errors" "fmt" "time" - backoff "github.com/cenkalti/backoff" - cid "github.com/ipfs/go-cid" + "github.com/cenkalti/backoff" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/ipfs/go-verifcid" + "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" routing "github.com/libp2p/go-libp2p-routing" ) var log = logging.Logger("reprovider") -//KeyChanFunc is function streaming CIDs to pass to content routing +// KeyChanFunc is function streaming CIDs to pass to content routing type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) type doneFunc func(error) type Reprovider struct { ctx context.Context trigger chan doneFunc + closing chan struct{} // The routing system to provide values through rsys routing.ContentRouting keyProvider KeyChanFunc + tick time.Duration } // NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { +func NewReprovider(ctx context.Context, tick time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ ctx: ctx, trigger: make(chan doneFunc), + closing: make(chan struct{}), rsys: rsys, keyProvider: keyProvider, + tick: tick, } } // Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run(tick time.Duration) { +func (rp *Reprovider) Run(proc goprocess.Process) { + ctx := goprocessctx.WithProcessClosing(rp.ctx, proc) + defer close(rp.closing) + // dont reprovide immediately. // may have just started the daemon and shutting it down immediately. // probability( up another minute | uptime ) increases with uptime. after := time.After(time.Minute) var done doneFunc for { - if tick == 0 { + if rp.tick == 0 { after = make(chan time.Time) } select { - case <-rp.ctx.Done(): + case <-ctx.Done(): return case done = <-rp.trigger: case <-after: } - //'mute' the trigger channel so when `ipfs bitswap reprovide` is called - //a 'reprovider is already running' error is returned + // 'mute' the trigger channel so when `ipfs bitswap reprovide` is called + // a 'reprovider is already running' error is returned unmute := rp.muteTrigger() - err := rp.Reprovide() + err := rp.reprovide(ctx) if err != nil { log.Debug(err) } @@ -73,13 +83,13 @@ func (rp *Reprovider) Run(tick time.Duration) { unmute() - after = time.After(tick) + after = time.After(rp.tick) } } -// Reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) Reprovide() error { - keychan, err := rp.keyProvider(rp.ctx) +// reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) reprovide(ctx context.Context) error { + keychan, err := rp.keyProvider(ctx) if err != nil { return fmt.Errorf("failed to get key chan: %s", err) } @@ -90,7 +100,7 @@ func (rp *Reprovider) Reprovide() error { continue } op := func() error { - err := rp.rsys.Provide(rp.ctx, c, true) + err := rp.rsys.Provide(ctx, c, true) if err != nil { log.Debugf("Failed to provide key: %s", err) } @@ -119,10 +129,12 @@ func (rp *Reprovider) Trigger(ctx context.Context) error { } select { + case <-rp.closing: + return errors.New("reprovider is closed") case <-rp.ctx.Done(): - return context.Canceled + return rp.ctx.Err() case <-ctx.Done(): - return context.Canceled + return ctx.Err() case rp.trigger <- df: <-progressCtx.Done() return err diff --git a/reprovide/reprovide_test.go b/reprovide/reprovide_test.go index 2f81066a1..b9e9738b4 100644 --- a/reprovide/reprovide_test.go +++ b/reprovide/reprovide_test.go @@ -1,8 +1,7 @@ -package reprovide_test +package reprovide import ( "context" - "github.com/ipfs/go-ipfs" "testing" blocks "github.com/ipfs/go-block-format" @@ -34,9 +33,9 @@ func TestReprovide(t *testing.T) { t.Fatal(err) } - keyProvider := ipfs.NewBlockstoreProvider(bstore) - reprov := ipfs.NewReprovider(ctx, clA, keyProvider) - err = reprov.Reprovide() + keyProvider := NewBlockstoreProvider(bstore) + reprov := NewReprovider(ctx, 0, clA, keyProvider) + err = reprov.reprovide(ctx) if err != nil { t.Fatal(err) }