From 3269986e4281f24fc5a23de5b5d89aa5b23be0ac Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 13 Jan 2015 22:00:33 +0000 Subject: [PATCH] basic reprovider implementation make vendor --- core/core.go | 6 ++++ exchange/reprovide/reprovide.go | 55 +++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 exchange/reprovide/reprovide.go diff --git a/core/core.go b/core/core.go index 2477354f6..163b286e7 100644 --- a/core/core.go +++ b/core/core.go @@ -16,6 +16,7 @@ import ( bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" offline "github.com/jbenet/go-ipfs/exchange/offline" + rp "github.com/jbenet/go-ipfs/exchange/reprovide" mount "github.com/jbenet/go-ipfs/fuse/mount" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" @@ -79,6 +80,7 @@ type IpfsNode struct { Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes Diagnostics *diag.Diagnostics // the diagnostics service + Reprovider *rp.Reprovider // the value reprovider system ctxgroup.ContextGroup @@ -183,6 +185,10 @@ func Standard(cfg *config.Config, online bool) ConfigOption { if err := n.StartOnlineServices(); err != nil { return nil, err // debugerror.Wraps. } + + // Start up reprovider system + n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) + go n.Reprovider.Run(ctx) } else { n.Exchange = offline.Exchange(n.Blockstore) } diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go new file mode 100644 index 000000000..e6c15f621 --- /dev/null +++ b/exchange/reprovide/reprovide.go @@ -0,0 +1,55 @@ +package reprovide + +import ( + "time" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + blocks "github.com/jbenet/go-ipfs/blocks/blockstore" + routing "github.com/jbenet/go-ipfs/routing" + "github.com/jbenet/go-ipfs/util/eventlog" +) + +var log = eventlog.Logger("reprovider") + +type Reprovider struct { + // The routing system to provide values through + rsys routing.IpfsRouting + + // The backing store for blocks to be provided + bstore blocks.Blockstore +} + +func NewReprovider(rsys routing.IpfsRouting, bstore blocks.Blockstore) *Reprovider { + return &Reprovider{ + rsys: rsys, + bstore: bstore, + } +} + +func (rp *Reprovider) Run(ctx context.Context) { + after := time.After(0) + for { + select { + case <-ctx.Done(): + return + case <-after: + rp.reprovide(ctx) + after = time.After(time.Hour * 12) + } + } +} + +func (rp *Reprovider) reprovide(ctx context.Context) { + keychan, err := rp.bstore.AllKeysChan(ctx, 0, 1<<16) + if err != nil { + log.Errorf("Failed to get key chan from blockstore: %s", err) + return + } + for k := range keychan { + err := rp.rsys.Provide(ctx, k) + if err != nil { + log.Errorf("Failed to provide key: %s, %s", k, err) + } + } +}