Merge pull request #6248 from ipfs/feat/reprovider-noctx

reprovider: Use goprocess
This commit is contained in:
Steven Allen 2019-04-23 17:32:56 -07:00 committed by GitHub
commit c1bc202f41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 51 additions and 42 deletions

View File

@ -19,7 +19,6 @@ import (
"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/node"
"github.com/ipfs/go-ipfs/core/node/libp2p"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
"github.com/ipfs/go-ipfs/filestore"
"github.com/ipfs/go-ipfs/fuse/mount"
"github.com/ipfs/go-ipfs/namesys"
@ -28,6 +27,7 @@ import (
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
rp "github.com/ipfs/go-ipfs/reprovide"
bserv "github.com/ipfs/go-blockservice"
bstore "github.com/ipfs/go-ipfs-blockstore"
@ -69,9 +69,9 @@ type IpfsNode struct {
Repo repo.Repo
// Local node
Pinning pin.Pinner // the pinning manager
Mounts Mounts `optional:"true"` // current mount state, if any.
PrivateKey ic.PrivKey // the local node's private Key
Pinning pin.Pinner // the pinning manager
Mounts Mounts `optional:"true"` // current mount state, if any.
PrivateKey ic.PrivKey // the local node's private Key
PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network
// Services

View File

@ -119,4 +119,3 @@ func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.
return root, err
}

View File

@ -11,10 +11,10 @@ import (
"go.uber.org/fx"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/exchange/reprovide"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/reprovide"
)
const kReprovideFrequency = time.Hour * 12
@ -45,6 +45,16 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu
func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) {
var keyProvider reprovide.KeyChanFunc
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return nil, err
}
reproviderInterval = dur
}
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
@ -57,21 +67,11 @@ func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config
default:
return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)
}
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), rt, keyProvider), nil
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
// Reprovider runs the reprovider service
func Reprovider(cfg *config.Config, reprovider *reprovide.Reprovider) error {
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}
reproviderInterval = dur
}
go reprovider.Run(reproviderInterval) // TODO: refactor reprovider to have Start/Stop, use lifecycle
func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error {
lp.Append(reprovider.Run)
return nil
}

View File

@ -2,67 +2,77 @@ package reprovide
import (
"context"
"errors"
"fmt"
"time"
backoff "github.com/cenkalti/backoff"
cid "github.com/ipfs/go-cid"
"github.com/cenkalti/backoff"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-verifcid"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
routing "github.com/libp2p/go-libp2p-routing"
)
var log = logging.Logger("reprovider")
//KeyChanFunc is function streaming CIDs to pass to content routing
// KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
type doneFunc func(error)
type Reprovider struct {
ctx context.Context
trigger chan doneFunc
closing chan struct{}
// The routing system to provide values through
rsys routing.ContentRouting
keyProvider KeyChanFunc
tick time.Duration
}
// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
func NewReprovider(ctx context.Context, tick time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
ctx: ctx,
trigger: make(chan doneFunc),
closing: make(chan struct{}),
rsys: rsys,
keyProvider: keyProvider,
tick: tick,
}
}
// Run re-provides keys with 'tick' interval or when triggered
func (rp *Reprovider) Run(tick time.Duration) {
func (rp *Reprovider) Run(proc goprocess.Process) {
ctx := goprocessctx.WithProcessClosing(rp.ctx, proc)
defer close(rp.closing)
// dont reprovide immediately.
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done doneFunc
for {
if tick == 0 {
if rp.tick == 0 {
after = make(chan time.Time)
}
select {
case <-rp.ctx.Done():
case <-ctx.Done():
return
case done = <-rp.trigger:
case <-after:
}
//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
// 'mute' the trigger channel so when `ipfs bitswap reprovide` is called
// a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()
err := rp.Reprovide()
err := rp.reprovide(ctx)
if err != nil {
log.Debug(err)
}
@ -73,13 +83,13 @@ func (rp *Reprovider) Run(tick time.Duration) {
unmute()
after = time.After(tick)
after = time.After(rp.tick)
}
}
// Reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) Reprovide() error {
keychan, err := rp.keyProvider(rp.ctx)
// reprovide registers all keys given by rp.keyProvider to libp2p content routing
func (rp *Reprovider) reprovide(ctx context.Context) error {
keychan, err := rp.keyProvider(ctx)
if err != nil {
return fmt.Errorf("failed to get key chan: %s", err)
}
@ -90,7 +100,7 @@ func (rp *Reprovider) Reprovide() error {
continue
}
op := func() error {
err := rp.rsys.Provide(rp.ctx, c, true)
err := rp.rsys.Provide(ctx, c, true)
if err != nil {
log.Debugf("Failed to provide key: %s", err)
}
@ -119,10 +129,12 @@ func (rp *Reprovider) Trigger(ctx context.Context) error {
}
select {
case <-rp.closing:
return errors.New("reprovider is closed")
case <-rp.ctx.Done():
return context.Canceled
return rp.ctx.Err()
case <-ctx.Done():
return context.Canceled
return ctx.Err()
case rp.trigger <- df:
<-progressCtx.Done()
return err

View File

@ -1,4 +1,4 @@
package reprovide_test
package reprovide
import (
"context"
@ -10,9 +10,7 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
mock "github.com/ipfs/go-ipfs-routing/mock"
pstore "github.com/libp2p/go-libp2p-peerstore"
testutil "github.com/libp2p/go-testutil"
. "github.com/ipfs/go-ipfs/exchange/reprovide"
"github.com/libp2p/go-testutil"
)
func TestReprovide(t *testing.T) {
@ -36,8 +34,8 @@ func TestReprovide(t *testing.T) {
}
keyProvider := NewBlockstoreProvider(bstore)
reprov := NewReprovider(ctx, clA, keyProvider)
err = reprov.Reprovide()
reprov := NewReprovider(ctx, 0, clA, keyProvider)
err = reprov.reprovide(ctx)
if err != nil {
t.Fatal(err)
}