reprovider: Implement 'bitswap reprovide' command

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2017-08-01 17:45:15 +02:00
parent bb7aee5729
commit 17ae331be2
4 changed files with 70 additions and 15 deletions

View File

@ -21,10 +21,11 @@ var BitswapCmd = &cmds.Command{
ShortDescription: ``,
},
Subcommands: map[string]*cmds.Command{
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"wantlist": showWantlistCmd,
"stat": bitswapStatCmd,
"unwant": unwantCmd,
"ledger": ledgerCmd,
"reprovide": reprovideCmd,
},
}
@ -242,3 +243,30 @@ prints the ledger associated with a given peer.
},
},
}
var reprovideCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Trigger reprovider.",
ShortDescription: `
Trigger reprovider to announce our data to network.
`,
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !nd.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
err = nd.Reprovider.Trigger(req.Context())
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
},
}

View File

@ -277,7 +277,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
default:
return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy)
}
n.Reprovider = rp.NewReprovider(n.Routing, keyProvider)
n.Reprovider = rp.NewReprovider(ctx, n.Routing, keyProvider)
if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
@ -290,7 +290,7 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
interval = dur
}
go n.Reprovider.ProvideEvery(ctx, interval)
go n.Reprovider.ProvideEvery(interval)
}
return nil

View File

@ -26,6 +26,7 @@ func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots b
outCh := make(chan *cid.Cid)
go func() {
defer close(outCh)
set.ForEach(func(c *cid.Cid) error {
select {
case <-ctx.Done():

View File

@ -16,35 +16,48 @@ var log = logging.Logger("reprovider")
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type Reprovider struct {
ctx context.Context
trigger chan context.CancelFunc
// The routing system to provide values through
rsys routing.ContentRouting
keyProvider KeyChanFunc
}
func NewReprovider(rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
func NewReprovider(ctx context.Context, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
rsys: rsys,
ctx: ctx,
trigger: make(chan context.CancelFunc),
rsys: rsys,
keyProvider: keyProvider,
}
}
func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) {
func (rp *Reprovider) ProvideEvery(tick time.Duration) {
// dont reprovide immediately.
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done context.CancelFunc
for {
select {
case <-ctx.Done():
case <-rp.ctx.Done():
return
case done = <-rp.trigger:
case <-after:
err := rp.Reprovide(ctx)
if err != nil {
log.Debug(err)
}
after = time.After(tick)
}
err := rp.Reprovide(rp.ctx)
if err != nil {
log.Debug(err)
}
if done != nil {
done()
}
after = time.After(tick)
}
}
@ -72,3 +85,16 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error {
}
return nil
}
func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx)
select {
case <-rp.ctx.Done():
return context.Canceled
case <-ctx.Done():
return context.Canceled
case rp.trigger <- done:
<-progressCtx.Done()
return nil
}
}