Reprovider strategies

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera 2017-08-01 02:57:21 +02:00
parent 1d2fbbd850
commit f20683eb53
7 changed files with 120 additions and 25 deletions

View File

@ -231,5 +231,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}
n.Resolver = path.NewBasicResolver(n.DAG)
if cfg.Online {
if err := n.startLateOnlineServices(ctx); err != nil {
return err
}
}
return n.loadFilesRoot()
}

View File

@ -237,22 +237,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}
n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore)
if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}
interval = dur
}
go n.Reprovider.ProvideEvery(ctx, interval)
}
if pubsub {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
@ -273,6 +257,45 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return n.Bootstrap(DefaultBootstrapConfig)
}
func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
cfg, err := n.Repo.Config()
if err != nil {
return err
}
var keyProvider func(context.Context) (<-chan *cid.Cid, error)
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = rp.NewBlockstoreProvider(n.Blockstore)
case "roots":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true)
case "pinned":
keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false)
default:
return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy)
}
n.Reprovider = rp.NewReprovider(n.Routing, keyProvider)
if cfg.Reprovider.Interval != "0" {
interval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return err
}
interval = dur
}
go n.Reprovider.ProvideEvery(ctx, interval)
}
return nil
}
func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) {
var annAddrs []ma.Multiaddr
for _, addr := range cfg.Announce {

View File

@ -0,0 +1,62 @@
package reprovide
import (
"context"
"errors"
"fmt"
blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
merkledag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)
func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
return bstore.AllKeysChan(ctx)
}
}
func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc {
return func(ctx context.Context) (<-chan *cid.Cid, error) {
set, err := pinSet(ctx, pinning, dag, onlyRoots)
if err != nil {
return nil, err
}
outCh := make(chan *cid.Cid)
go func() {
set.ForEach(func(c *cid.Cid) error {
select {
case <-ctx.Done():
return errors.New("context cancelled")
case outCh <- c:
}
return nil
})
}()
return outCh, nil
}
}
func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*cid.Set, error) {
set := cid.NewSet()
for _, key := range pinning.DirectKeys() {
set.Add(key)
}
for _, key := range pinning.RecursiveKeys() {
set.Add(key)
if !onlyRoots {
err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.Visit)
if err != nil {
return nil, err
}
}
}
return set, nil
}

View File

@ -5,26 +5,27 @@ import (
"fmt"
"time"
blocks "github.com/ipfs/go-ipfs/blocks/blockstore"
backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff"
routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
)
var log = logging.Logger("reprovider")
type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error)
type Reprovider struct {
// The routing system to provide values through
rsys routing.ContentRouting
// The backing store for blocks to be provided
bstore blocks.Blockstore
keyProvider KeyChanFunc
}
func NewReprovider(rsys routing.ContentRouting, bstore blocks.Blockstore) *Reprovider {
func NewReprovider(rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
return &Reprovider{
rsys: rsys,
bstore: bstore,
rsys: rsys,
keyProvider: keyProvider,
}
}
@ -48,9 +49,9 @@ func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) {
}
func (rp *Reprovider) Reprovide(ctx context.Context) error {
keychan, err := rp.bstore.AllKeysChan(ctx)
keychan, err := rp.keyProvider(ctx)
if err != nil {
return fmt.Errorf("Failed to get key chan from blockstore: %s", err)
return fmt.Errorf("Failed to get key chan: %s", err)
}
for c := range keychan {
op := func() error {

View File

@ -32,7 +32,8 @@ func TestReprovide(t *testing.T) {
blk := blocks.NewBlock([]byte("this is a test"))
bstore.Put(blk)
reprov := NewReprovider(clA, bstore)
keyProvider := NewBlockstoreProvider(bstore)
reprov := NewReprovider(clA, keyProvider)
err := reprov.Reprovide(ctx)
if err != nil {
t.Fatal(err)

View File

@ -72,6 +72,7 @@ func Init(out io.Writer, nBitsForKeypair int) (*Config, error) {
},
Reprovider: Reprovider{
Interval: "12h",
Strategy: "all",
},
}

View File

@ -2,4 +2,5 @@ package config
type Reprovider struct {
Interval string // Time period to reprovide locally stored objects to the network
Strategy string // Which keys to announce
}