mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-25 20:37:53 +08:00
reprovider: Make codeclimate happier
License: MIT Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
parent
279a560660
commit
3917d4dd18
@ -11,13 +11,15 @@ import (
|
||||
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
|
||||
)
|
||||
|
||||
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
|
||||
// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
|
||||
func NewBlockstoreProvider(bstore blocks.Blockstore) keyChanFunc {
|
||||
return func(ctx context.Context) (<-chan *cid.Cid, error) {
|
||||
return bstore.AllKeysChan(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc {
|
||||
// NewPinnedProvider returns provider supplying pinned keys
|
||||
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) keyChanFunc {
|
||||
return func(ctx context.Context) (<-chan *cid.Cid, error) {
|
||||
set, err := pinSet(ctx, pinning, dag, onlyRoots)
|
||||
if err != nil {
|
||||
|
||||
@ -13,7 +13,7 @@ import (
|
||||
|
||||
var log = logging.Logger("reprovider")
|
||||
|
||||
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
|
||||
type keyChanFunc func(context.Context) (<-chan *cid.Cid, error)
|
||||
|
||||
type Reprovider struct {
|
||||
ctx context.Context
|
||||
@ -22,19 +22,21 @@ type Reprovider struct {
|
||||
// The routing system to provide values through
|
||||
rsys routing.ContentRouting
|
||||
|
||||
keyProvider KeyChanFunc
|
||||
keyProvider keyChanFunc
|
||||
}
|
||||
|
||||
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
|
||||
// NewReprovider creates new Reprovider instance.
|
||||
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider keyChanFunc) *Reprovider {
|
||||
return &Reprovider{
|
||||
ctx: ctx,
|
||||
trigger: make(chan context.CancelFunc),
|
||||
|
||||
rsys: rsys,
|
||||
rsys: rsys,
|
||||
keyProvider: keyProvider,
|
||||
}
|
||||
}
|
||||
|
||||
// ProvideEvery re-provides keys with 'tick' interval
|
||||
func (rp *Reprovider) ProvideEvery(tick time.Duration) {
|
||||
// dont reprovide immediately.
|
||||
// may have just started the daemon and shutting it down immediately.
|
||||
@ -49,7 +51,7 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
|
||||
case <-after:
|
||||
}
|
||||
|
||||
err := rp.Reprovide(rp.ctx)
|
||||
err := rp.Reprovide()
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
}
|
||||
@ -61,14 +63,15 @@ func (rp *Reprovider) ProvideEvery(tick time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rp *Reprovider) Reprovide(ctx context.Context) error {
|
||||
keychan, err := rp.keyProvider(ctx)
|
||||
// Reprovide registers all keys given by rp.keyProvider to libp2p content routing
|
||||
func (rp *Reprovider) Reprovide() error {
|
||||
keychan, err := rp.keyProvider(rp.ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to get key chan: %s", err)
|
||||
}
|
||||
for c := range keychan {
|
||||
op := func() error {
|
||||
err := rp.rsys.Provide(ctx, c, true)
|
||||
err := rp.rsys.Provide(rp.ctx, c, true)
|
||||
if err != nil {
|
||||
log.Debugf("Failed to provide key: %s", err)
|
||||
}
|
||||
@ -86,8 +89,10 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Trigger starts reprovision process in rp.ProvideEvery and waits for it
|
||||
func (rp *Reprovider) Trigger(ctx context.Context) error {
|
||||
progressCtx, done := context.WithCancel(ctx)
|
||||
|
||||
select {
|
||||
case <-rp.ctx.Done():
|
||||
return context.Canceled
|
||||
|
||||
Loading…
Reference in New Issue
Block a user