diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index 867cb49c2..a769e6b0a 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -21,10 +21,11 @@ var BitswapCmd = &cmds.Command{ ShortDescription: ``, }, Subcommands: map[string]*cmds.Command{ - "wantlist": showWantlistCmd, - "stat": bitswapStatCmd, - "unwant": unwantCmd, - "ledger": ledgerCmd, + "wantlist": showWantlistCmd, + "stat": bitswapStatCmd, + "unwant": unwantCmd, + "ledger": ledgerCmd, + "reprovide": reprovideCmd, }, } @@ -242,3 +243,30 @@ prints the ledger associated with a given peer. }, }, } + +var reprovideCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Trigger reprovider.", + ShortDescription: ` +Trigger reprovider to announce our data to network. +`, + }, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + if !nd.OnlineMode() { + res.SetError(errNotOnline, cmds.ErrClient) + return + } + + err = nd.Reprovider.Trigger(req.Context()) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + }, +} diff --git a/core/core.go b/core/core.go index 33a6b3b50..faf64ff5d 100644 --- a/core/core.go +++ b/core/core.go @@ -277,7 +277,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { default: return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy) } - n.Reprovider = rp.NewReprovider(n.Routing, keyProvider) + n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider) if cfg.Reprovider.Interval != "0" { interval := kReprovideFrequency @@ -290,7 +290,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { interval = dur } - go n.Reprovider.ProvideEvery(ctx, interval) + go n.Reprovider.ProvideEvery(interval) } return nil diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index b96807d37..07ddd99cc 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -26,6 +26,7 @@ func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots b outCh := make(chan *cid.Cid) go func() { + defer close(outCh) set.ForEach(func(c *cid.Cid) error { select { case <-ctx.Done(): diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index 3d42c8439..d84d753e0 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -16,35 +16,48 @@ var log = logging.Logger("reprovider") type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) type Reprovider struct { + ctx context.Context + trigger chan context.CancelFunc + // The routing system to provide values through rsys routing.ContentRouting keyProvider KeyChanFunc } -func NewReprovider(rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { +func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ - rsys: rsys, + ctx: ctx, + trigger: make(chan context.CancelFunc), + + rsys: rsys, keyProvider: keyProvider, } } -func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) { +func (rp *Reprovider) ProvideEvery(tick time.Duration) { // dont reprovide immediately. // may have just started the daemon and shutting it down immediately. // probability( up another minute | uptime ) increases with uptime. after := time.After(time.Minute) + var done context.CancelFunc for { select { - case <-ctx.Done(): + case <-rp.ctx.Done(): return + case done = <-rp.trigger: case <-after: - err := rp.Reprovide(ctx) - if err != nil { - log.Debug(err) - } - after = time.After(tick) } + + err := rp.Reprovide(rp.ctx) + if err != nil { + log.Debug(err) + } + + if done != nil { + done() + } + after = time.After(tick) } } @@ -72,3 +85,16 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error { } return nil } + +func (rp *Reprovider) Trigger(ctx context.Context) error { + progressCtx, done := context.WithCancel(ctx) + select { + case <-rp.ctx.Done(): + return context.Canceled + case <-ctx.Done(): + return context.Canceled + case rp.trigger <- done: + <-progressCtx.Done() + return nil + } +}