diff --git a/core/core.go b/core/core.go index c8ecbe19f..23656e722 100644 --- a/core/core.go +++ b/core/core.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/go-ipfs/core/bootstrap" "github.com/ipfs/go-ipfs/core/node" "github.com/ipfs/go-ipfs/core/node/libp2p" - rp "github.com/ipfs/go-ipfs/exchange/reprovide" "github.com/ipfs/go-ipfs/filestore" "github.com/ipfs/go-ipfs/fuse/mount" "github.com/ipfs/go-ipfs/namesys" @@ -28,6 +27,7 @@ import ( "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" + rp "github.com/ipfs/go-ipfs/reprovide" bserv "github.com/ipfs/go-blockservice" bstore "github.com/ipfs/go-ipfs-blockstore" @@ -69,9 +69,9 @@ type IpfsNode struct { Repo repo.Repo // Local node - Pinning pin.Pinner // the pinning manager - Mounts Mounts `optional:"true"` // current mount state, if any. - PrivateKey ic.PrivKey // the local node's private Key + Pinning pin.Pinner // the pinning manager + Mounts Mounts `optional:"true"` // current mount state, if any. + PrivateKey ic.PrivKey // the local node's private Key PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network // Services diff --git a/core/node/core.go b/core/node/core.go index 5da6d2752..d7f3ba97e 100644 --- a/core/node/core.go +++ b/core/node/core.go @@ -119,4 +119,3 @@ func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format. return root, err } - diff --git a/core/node/provider.go b/core/node/provider.go index 336a9fd36..37b9637a7 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -11,10 +11,10 @@ import ( "go.uber.org/fx" "github.com/ipfs/go-ipfs/core/node/helpers" - "github.com/ipfs/go-ipfs/exchange/reprovide" "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/reprovide" ) const kReprovideFrequency = time.Hour * 12 @@ -45,6 +45,16 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) { var keyProvider reprovide.KeyChanFunc + reproviderInterval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return nil, err + } + + reproviderInterval = dur + } + switch cfg.Reprovider.Strategy { case "all": fallthrough @@ -57,21 +67,11 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config default: return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) } - return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil + return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil } // Reprovider runs the reprovider service -func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error { - reproviderInterval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - reproviderInterval = dur - } - - go reprovider.Run(reproviderInterval) // TODO: refactor reprovider to have Start/Stop, use lifecycle +func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error { + lp.Append(reprovider.Run) return nil } diff --git a/exchange/reprovide/providers.go b/reprovide/providers.go similarity index 100% rename from exchange/reprovide/providers.go rename to reprovide/providers.go diff --git a/exchange/reprovide/reprovide.go b/reprovide/reprovide.go similarity index 69% rename from exchange/reprovide/reprovide.go rename to reprovide/reprovide.go index 9c4666512..1a6f5bad3 100644 --- a/exchange/reprovide/reprovide.go +++ b/reprovide/reprovide.go @@ -2,67 +2,77 @@ package reprovide import ( "context" + "errors" "fmt" "time" - backoff "github.com/cenkalti/backoff" - cid "github.com/ipfs/go-cid" + "github.com/cenkalti/backoff" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/ipfs/go-verifcid" + "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" routing "github.com/libp2p/go-libp2p-routing" ) var log = logging.Logger("reprovider") -//KeyChanFunc is function streaming CIDs to pass to content routing +// KeyChanFunc is function streaming CIDs to pass to content routing type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) type doneFunc func(error) type Reprovider struct { ctx context.Context trigger chan doneFunc + closing chan struct{} // The routing system to provide values through rsys routing.ContentRouting keyProvider KeyChanFunc + tick time.Duration } // NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { +func NewReprovider(ctx context.Context, tick time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ ctx: ctx, trigger: make(chan doneFunc), + closing: make(chan struct{}), rsys: rsys, keyProvider: keyProvider, + tick: tick, } } // Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run(tick time.Duration) { +func (rp *Reprovider) Run(proc goprocess.Process) { + ctx := goprocessctx.WithProcessClosing(rp.ctx, proc) + defer close(rp.closing) + // dont reprovide immediately. // may have just started the daemon and shutting it down immediately. // probability( up another minute | uptime ) increases with uptime. after := time.After(time.Minute) var done doneFunc for { - if tick == 0 { + if rp.tick == 0 { after = make(chan time.Time) } select { - case <-rp.ctx.Done(): + case <-ctx.Done(): return case done = <-rp.trigger: case <-after: } - //'mute' the trigger channel so when `ipfs bitswap reprovide` is called - //a 'reprovider is already running' error is returned + // 'mute' the trigger channel so when `ipfs bitswap reprovide` is called + // a 'reprovider is already running' error is returned unmute := rp.muteTrigger() - err := rp.Reprovide() + err := rp.reprovide(ctx) if err != nil { log.Debug(err) } @@ -73,13 +83,13 @@ func (rp *Reprovider) Run(tick time.Duration) { unmute() - after = time.After(tick) + after = time.After(rp.tick) } } -// Reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) Reprovide() error { - keychan, err := rp.keyProvider(rp.ctx) +// reprovide registers all keys given by rp.keyProvider to libp2p content routing +func (rp *Reprovider) reprovide(ctx context.Context) error { + keychan, err := rp.keyProvider(ctx) if err != nil { return fmt.Errorf("failed to get key chan: %s", err) } @@ -90,7 +100,7 @@ func (rp *Reprovider) Reprovide() error { continue } op := func() error { - err := rp.rsys.Provide(rp.ctx, c, true) + err := rp.rsys.Provide(ctx, c, true) if err != nil { log.Debugf("Failed to provide key: %s", err) } @@ -119,10 +129,12 @@ func (rp *Reprovider) Trigger(ctx context.Context) error { } select { + case <-rp.closing: + return errors.New("reprovider is closed") case <-rp.ctx.Done(): - return context.Canceled + return rp.ctx.Err() case <-ctx.Done(): - return context.Canceled + return ctx.Err() case rp.trigger <- df: <-progressCtx.Done() return err diff --git a/exchange/reprovide/reprovide_test.go b/reprovide/reprovide_test.go similarity index 86% rename from exchange/reprovide/reprovide_test.go rename to reprovide/reprovide_test.go index ce1d123c1..b9e9738b4 100644 --- a/exchange/reprovide/reprovide_test.go +++ b/reprovide/reprovide_test.go @@ -1,4 +1,4 @@ -package reprovide_test +package reprovide import ( "context" @@ -10,9 +10,7 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" mock "github.com/ipfs/go-ipfs-routing/mock" pstore "github.com/libp2p/go-libp2p-peerstore" - testutil "github.com/libp2p/go-testutil" - - . "github.com/ipfs/go-ipfs/exchange/reprovide" + "github.com/libp2p/go-testutil" ) func TestReprovide(t *testing.T) { @@ -36,8 +34,8 @@ func TestReprovide(t *testing.T) { } keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(ctx, clA, keyProvider) - err = reprov.Reprovide() + reprov := NewReprovider(ctx, 0, clA, keyProvider) + err = reprov.reprovide(ctx) if err != nil { t.Fatal(err) }