From f20683eb531e8e798b8f5f6d09067567a6e58fe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 1 Aug 2017 02:57:21 +0200 Subject: [PATCH] Reprovider strategies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/builder.go | 6 +++ core/core.go | 55 +++++++++++++++++------- exchange/reprovide/providers.go | 62 ++++++++++++++++++++++++++++ exchange/reprovide/reprovide.go | 17 ++++---- exchange/reprovide/reprovide_test.go | 3 +- repo/config/init.go | 1 + repo/config/reprovider.go | 1 + 7 files changed, 120 insertions(+), 25 deletions(-) create mode 100644 exchange/reprovide/providers.go diff --git a/core/builder.go b/core/builder.go index 28a5a283b..065c9cbd8 100644 --- a/core/builder.go +++ b/core/builder.go @@ -231,5 +231,11 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } n.Resolver = path.NewBasicResolver(n.DAG) + if cfg.Online { + if err := n.startLateOnlineServices(ctx); err != nil { + return err + } + } + return n.loadFilesRoot() } diff --git a/core/core.go b/core/core.go index 8afd615db..33a6b3b50 100644 --- a/core/core.go +++ b/core/core.go @@ -237,22 +237,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return err } - n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) - - if cfg.Reprovider.Interval != "0" { - interval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return err - } - - interval = dur - } - - go n.Reprovider.ProvideEvery(ctx, interval) - } - if pubsub { n.Floodsub = floodsub.NewFloodSub(ctx, peerhost) } @@ -273,6 +257,45 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin return n.Bootstrap(DefaultBootstrapConfig) } +func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { + cfg, err := n.Repo.Config() + if err != nil { + return err + } + + var keyProvider func(context.Context) (<-chan *cid.Cid, error) + + switch cfg.Reprovider.Strategy { + case "all": + fallthrough + case "": + keyProvider = rp.NewBlockstoreProvider(n.Blockstore) + case "roots": + keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, true) + case "pinned": + keyProvider = rp.NewPinnedProvider(n.Pinning, n.DAG, false) + default: + return fmt.Errorf("unknown reprovider strtaegy '%s'", cfg.Reprovider.Strategy) + } + n.Reprovider = rp.NewReprovider(n.Routing, keyProvider) + + if cfg.Reprovider.Interval != "0" { + interval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return err + } + + interval = dur + } + + go n.Reprovider.ProvideEvery(ctx, interval) + } + + return nil +} + func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { var annAddrs []ma.Multiaddr for _, addr := range cfg.Announce { diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go new file mode 100644 index 000000000..27be34855 --- /dev/null +++ b/exchange/reprovide/providers.go @@ -0,0 +1,62 @@ +package reprovide + +import ( + "context" + "errors" + "fmt" + + blocks "github.com/ipfs/go-ipfs/blocks/blockstore" + merkledag "github.com/ipfs/go-ipfs/merkledag" + pin "github.com/ipfs/go-ipfs/pin" + + cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" +) + +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan *cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +func NewPinnedProvider(pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) KeyChanFunc { + return func(ctx context.Context) (<-chan *cid.Cid, error) { + set, err := pinSet(ctx, pinning, dag, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan *cid.Cid) + go func() { + set.ForEach(func(c *cid.Cid) error { + select { + case <-ctx.Done(): + return errors.New("context cancelled") + case outCh <- c: + } + return nil + }) + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning pin.Pinner, dag merkledag.DAGService, onlyRoots bool) (*cid.Set, error) { + set := cid.NewSet() + for _, key := range pinning.DirectKeys() { + set.Add(key) + } + + for _, key := range pinning.RecursiveKeys() { + set.Add(key) + + if !onlyRoots { + err := merkledag.EnumerateChildren(ctx, dag.GetLinks, key, set.Visit) + if err != nil { + return nil, err + } + } + } + + return set, nil +} diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index afa66016f..3d42c8439 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -5,26 +5,27 @@ import ( "fmt" "time" - blocks "github.com/ipfs/go-ipfs/blocks/blockstore" backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff" routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid" ) var log = logging.Logger("reprovider") +type KeyChanFunc func(context.Context) (<-chan *cid.Cid, error) + type Reprovider struct { // The routing system to provide values through rsys routing.ContentRouting - // The backing store for blocks to be provided - bstore blocks.Blockstore + keyProvider KeyChanFunc } -func NewReprovider(rsys routing.ContentRouting, bstore blocks.Blockstore) *Reprovider { +func NewReprovider(rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { return &Reprovider{ - rsys: rsys, - bstore: bstore, + rsys: rsys, + keyProvider: keyProvider, } } @@ -48,9 +49,9 @@ func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) { } func (rp *Reprovider) Reprovide(ctx context.Context) error { - keychan, err := rp.bstore.AllKeysChan(ctx) + keychan, err := rp.keyProvider(ctx) if err != nil { - return fmt.Errorf("Failed to get key chan from blockstore: %s", err) + return fmt.Errorf("Failed to get key chan: %s", err) } for c := range keychan { op := func() error { diff --git a/exchange/reprovide/reprovide_test.go b/exchange/reprovide/reprovide_test.go index 2d755e526..8a613700f 100644 --- a/exchange/reprovide/reprovide_test.go +++ b/exchange/reprovide/reprovide_test.go @@ -32,7 +32,8 @@ func TestReprovide(t *testing.T) { blk := blocks.NewBlock([]byte("this is a test")) bstore.Put(blk) - reprov := NewReprovider(clA, bstore) + keyProvider := NewBlockstoreProvider(bstore) + reprov := NewReprovider(clA, keyProvider) err := reprov.Reprovide(ctx) if err != nil { t.Fatal(err) diff --git a/repo/config/init.go b/repo/config/init.go index aa129d97e..f31edd42b 100644 --- a/repo/config/init.go +++ b/repo/config/init.go @@ -72,6 +72,7 @@ func Init(out io.Writer, nBitsForKeypair int) (*Config, error) { }, Reprovider: Reprovider{ Interval: "12h", + Strategy: "all", }, } diff --git a/repo/config/reprovider.go b/repo/config/reprovider.go index 53cf293ab..fa029c2fc 100644 --- a/repo/config/reprovider.go +++ b/repo/config/reprovider.go @@ -2,4 +2,5 @@ package config type Reprovider struct { Interval string // Time period to reprovide locally stored objects to the network + Strategy string // Which keys to announce }