mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-26 12:57:44 +08:00
Merge pull request #6068 from ipfs/features/provide-roots-on-add-and-pin
Provide root node immediately on add and pin add
This commit is contained in:
commit
2ec7befff4
@ -5,6 +5,7 @@ import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"github.com/ipfs/go-ipfs/provider"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
@ -275,6 +276,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
|
||||
}
|
||||
n.Resolver = resolver.NewBasicResolver(n.DAG)
|
||||
|
||||
// Provider
|
||||
queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.Provider = provider.NewProvider(ctx, queue, n.Routing)
|
||||
|
||||
if cfg.Online {
|
||||
if err := n.startLateOnlineServices(ctx); err != nil {
|
||||
return err
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
|
||||
p2p "github.com/ipfs/go-ipfs/p2p"
|
||||
pin "github.com/ipfs/go-ipfs/pin"
|
||||
provider "github.com/ipfs/go-ipfs/provider"
|
||||
repo "github.com/ipfs/go-ipfs/repo"
|
||||
|
||||
bitswap "github.com/ipfs/go-bitswap"
|
||||
@ -124,6 +125,7 @@ type IpfsNode struct {
|
||||
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
|
||||
Exchange exchange.Interface // the block exchange + strategy (bitswap)
|
||||
Namesys namesys.NameSystem // the name system, resolves paths to hashes
|
||||
Provider provider.Provider // the value provider system
|
||||
Reprovider *rp.Reprovider // the value reprovider system
|
||||
IpnsRepub *ipnsrp.Republisher
|
||||
|
||||
@ -324,6 +326,12 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Provider
|
||||
|
||||
n.Provider.Run()
|
||||
|
||||
// Reprovider
|
||||
|
||||
var keyProvider rp.KeyChanFunc
|
||||
|
||||
switch cfg.Reprovider.Strategy {
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
"github.com/ipfs/go-ipfs/namesys"
|
||||
"github.com/ipfs/go-ipfs/pin"
|
||||
"github.com/ipfs/go-ipfs/provider"
|
||||
"github.com/ipfs/go-ipfs/repo"
|
||||
|
||||
bserv "github.com/ipfs/go-blockservice"
|
||||
@ -66,6 +67,8 @@ type CoreAPI struct {
|
||||
namesys namesys.NameSystem
|
||||
routing routing.IpfsRouting
|
||||
|
||||
provider provider.Provider
|
||||
|
||||
pubSub *pubsub.PubSub
|
||||
|
||||
checkPublishAllowed func() error
|
||||
@ -174,6 +177,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
|
||||
exchange: n.Exchange,
|
||||
routing: n.Routing,
|
||||
|
||||
provider: n.Provider,
|
||||
|
||||
pubSub: n.PubSub,
|
||||
|
||||
nd: n,
|
||||
@ -210,6 +215,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
|
||||
|
||||
subApi.routing = offlineroute.NewOfflineRouter(subApi.repo.Datastore(), subApi.recordValidator)
|
||||
subApi.namesys = namesys.NewNameSystem(subApi.routing, subApi.repo.Datastore(), cs)
|
||||
subApi.provider = provider.NewOfflineProvider()
|
||||
|
||||
subApi.peerstore = nil
|
||||
subApi.peerHost = nil
|
||||
|
||||
@ -32,6 +32,10 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin
|
||||
return fmt.Errorf("pin: %s", err)
|
||||
}
|
||||
|
||||
if err := api.provider.Provide(dagNode.Cid()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return api.pinning.Flush()
|
||||
}
|
||||
|
||||
|
||||
13
core/coreapi/provider.go
Normal file
13
core/coreapi/provider.go
Normal file
@ -0,0 +1,13 @@
|
||||
package coreapi
|
||||
|
||||
import (
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// ProviderAPI brings Provider behavior to CoreAPI
|
||||
type ProviderAPI CoreAPI
|
||||
|
||||
// Provide the given cid using the current provider
|
||||
func (api *ProviderAPI) Provide(cid cid.Cid) error {
|
||||
return api.provider.Provide(cid)
|
||||
}
|
||||
@ -129,6 +129,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := api.provider.Provide(nd.Cid()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return coreiface.IpfsPath(nd.Cid()), nil
|
||||
}
|
||||
|
||||
|
||||
1
go.mod
1
go.mod
@ -30,6 +30,7 @@ require (
|
||||
github.com/ipfs/go-fs-lock v0.0.1
|
||||
github.com/ipfs/go-ipfs-addr v0.0.1
|
||||
github.com/ipfs/go-ipfs-blockstore v0.0.1
|
||||
github.com/ipfs/go-ipfs-blocksutil v0.0.1
|
||||
github.com/ipfs/go-ipfs-chunker v0.0.1
|
||||
github.com/ipfs/go-ipfs-cmdkit v0.0.1
|
||||
github.com/ipfs/go-ipfs-cmds v0.0.2
|
||||
|
||||
16
provider/offline.go
Normal file
16
provider/offline.go
Normal file
@ -0,0 +1,16 @@
|
||||
package provider
|
||||
|
||||
import "github.com/ipfs/go-cid"
|
||||
|
||||
type offlineProvider struct{}
|
||||
|
||||
// NewOfflineProvider creates a Provider that does nothing
|
||||
func NewOfflineProvider() Provider {
|
||||
return &offlineProvider{}
|
||||
}
|
||||
|
||||
func (op *offlineProvider) Run() {}
|
||||
|
||||
func (op *offlineProvider) Provide(cid cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
71
provider/provider.go
Normal file
71
provider/provider.go
Normal file
@ -0,0 +1,71 @@
|
||||
// Package provider implements structures and methods to provide blocks,
|
||||
// keep track of which blocks are provided, and to allow those blocks to
|
||||
// be reprovided.
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-routing"
|
||||
)
|
||||
|
||||
var log = logging.Logger("provider")
|
||||
|
||||
const provideOutgoingWorkerLimit = 8
|
||||
|
||||
// Provider announces blocks to the network
|
||||
type Provider interface {
|
||||
// Run is used to begin processing the provider work
|
||||
Run()
|
||||
// Provide takes a cid and makes an attempt to announce it to the network
|
||||
Provide(cid.Cid) error
|
||||
}
|
||||
|
||||
type provider struct {
|
||||
ctx context.Context
|
||||
// the CIDs for which provide announcements should be made
|
||||
queue *Queue
|
||||
// used to announce providing to the network
|
||||
contentRouting routing.ContentRouting
|
||||
}
|
||||
|
||||
// NewProvider creates a provider that announces blocks to the network using a content router
|
||||
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider {
|
||||
return &provider{
|
||||
ctx: ctx,
|
||||
queue: queue,
|
||||
contentRouting: contentRouting,
|
||||
}
|
||||
}
|
||||
|
||||
// Start workers to handle provide requests.
|
||||
func (p *provider) Run() {
|
||||
p.handleAnnouncements()
|
||||
}
|
||||
|
||||
// Provide the given cid using specified strategy.
|
||||
func (p *provider) Provide(root cid.Cid) error {
|
||||
p.queue.Enqueue(root)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle all outgoing cids by providing (announcing) them
|
||||
func (p *provider) handleAnnouncements() {
|
||||
for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
|
||||
go func() {
|
||||
for p.ctx.Err() == nil {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
return
|
||||
case c := <-p.queue.Dequeue():
|
||||
log.Info("announce - start - ", c)
|
||||
if err := p.contentRouting.Provide(p.ctx, c, true); err != nil {
|
||||
log.Warningf("Unable to provide entry: %s, %s", c, err)
|
||||
}
|
||||
log.Info("announce - end - ", c)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
79
provider/provider_test.go
Normal file
79
provider/provider_test.go
Normal file
@ -0,0 +1,79 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
datastore "github.com/ipfs/go-datastore"
|
||||
sync "github.com/ipfs/go-datastore/sync"
|
||||
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
|
||||
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
)
|
||||
|
||||
var blockGenerator = blocksutil.NewBlockGenerator()
|
||||
|
||||
type mockRouting struct {
|
||||
provided chan cid.Cid
|
||||
}
|
||||
|
||||
func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error {
|
||||
r.provided <- cid
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo {
|
||||
return nil
|
||||
}
|
||||
|
||||
func mockContentRouting() *mockRouting {
|
||||
r := mockRouting{}
|
||||
r.provided = make(chan cid.Cid)
|
||||
return &r
|
||||
}
|
||||
|
||||
func TestAnnouncement(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
ds := sync.MutexWrap(datastore.NewMapDatastore())
|
||||
queue, err := NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := mockContentRouting()
|
||||
|
||||
provider := NewProvider(ctx, queue, r)
|
||||
provider.Run()
|
||||
|
||||
cids := cid.NewSet()
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
c := blockGenerator.Next().Cid()
|
||||
cids.Add(c)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for _, c := range cids.Keys() {
|
||||
err = provider.Provide(c)
|
||||
// A little goroutine stirring to exercise some different states
|
||||
r := rand.Intn(10)
|
||||
time.Sleep(time.Microsecond * time.Duration(r))
|
||||
}
|
||||
}()
|
||||
|
||||
for cids.Len() > 0 {
|
||||
select {
|
||||
case cp := <-r.provided:
|
||||
if !cids.Has(cp) {
|
||||
t.Fatal("Wrong CID provided")
|
||||
}
|
||||
cids.Remove(cp)
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatal("Timeout waiting for cids to be provided.")
|
||||
}
|
||||
}
|
||||
}
|
||||
181
provider/queue.go
Normal file
181
provider/queue.go
Normal file
@ -0,0 +1,181 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
datastore "github.com/ipfs/go-datastore"
|
||||
namespace "github.com/ipfs/go-datastore/namespace"
|
||||
query "github.com/ipfs/go-datastore/query"
|
||||
)
|
||||
|
||||
// Queue provides a durable, FIFO interface to the datastore for storing cids
|
||||
//
|
||||
// Durability just means that cids in the process of being provided when a
|
||||
// crash or shutdown occurs will still be in the queue when the node is
|
||||
// brought back online.
|
||||
type Queue struct {
|
||||
// used to differentiate queues in datastore
|
||||
// e.g. provider vs reprovider
|
||||
name string
|
||||
ctx context.Context
|
||||
tail uint64
|
||||
head uint64
|
||||
ds datastore.Datastore // Must be threadsafe
|
||||
dequeue chan cid.Cid
|
||||
enqueue chan cid.Cid
|
||||
}
|
||||
|
||||
// NewQueue creates a queue for cids
|
||||
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
|
||||
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
|
||||
head, tail, err := getQueueHeadTail(ctx, name, namespaced)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q := &Queue{
|
||||
name: name,
|
||||
ctx: ctx,
|
||||
head: head,
|
||||
tail: tail,
|
||||
ds: namespaced,
|
||||
dequeue: make(chan cid.Cid),
|
||||
enqueue: make(chan cid.Cid),
|
||||
}
|
||||
q.work()
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// Enqueue puts a cid in the queue
|
||||
func (q *Queue) Enqueue(cid cid.Cid) {
|
||||
select {
|
||||
case q.enqueue <- cid:
|
||||
case <-q.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
// Dequeue returns a channel that if listened to will remove entries from the queue
|
||||
func (q *Queue) Dequeue() <-chan cid.Cid {
|
||||
return q.dequeue
|
||||
}
|
||||
|
||||
type entry struct {
|
||||
cid cid.Cid
|
||||
key datastore.Key
|
||||
}
|
||||
|
||||
// Look for next Cid in the queue and return it. Skip over gaps and mangled data
|
||||
func (q *Queue) nextEntry() (datastore.Key, cid.Cid) {
|
||||
for {
|
||||
if q.head >= q.tail {
|
||||
return datastore.Key{}, cid.Undef
|
||||
}
|
||||
|
||||
key := q.queueKey(q.head)
|
||||
value, err := q.ds.Get(key)
|
||||
|
||||
if err == datastore.ErrNotFound {
|
||||
log.Warningf("Error missing entry in queue: %s", key)
|
||||
q.head++ // move on
|
||||
continue
|
||||
} else if err != nil {
|
||||
log.Warningf("Error fetching from queue: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
c, err := cid.Parse(value)
|
||||
if err != nil {
|
||||
log.Warningf("Error marshalling Cid from queue: ", err)
|
||||
q.head++
|
||||
err = q.ds.Delete(key)
|
||||
continue
|
||||
}
|
||||
|
||||
return key, c
|
||||
}
|
||||
}
|
||||
|
||||
// Run dequeues and enqueues when available.
|
||||
func (q *Queue) work() {
|
||||
go func() {
|
||||
var k datastore.Key = datastore.Key{}
|
||||
var c cid.Cid = cid.Undef
|
||||
|
||||
for {
|
||||
if c == cid.Undef {
|
||||
k, c = q.nextEntry()
|
||||
}
|
||||
|
||||
// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
|
||||
var dequeue chan cid.Cid
|
||||
if c != cid.Undef {
|
||||
dequeue = q.dequeue
|
||||
}
|
||||
|
||||
select {
|
||||
case toQueue := <-q.enqueue:
|
||||
nextKey := q.queueKey(q.tail)
|
||||
|
||||
if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
|
||||
log.Errorf("Failed to enqueue cid: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
q.tail++
|
||||
case dequeue <- c:
|
||||
err := q.ds.Delete(k)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
|
||||
continue
|
||||
}
|
||||
c = cid.Undef
|
||||
q.head++
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (q *Queue) queueKey(id uint64) datastore.Key {
|
||||
return datastore.NewKey(strconv.FormatUint(id, 10))
|
||||
}
|
||||
|
||||
// crawl over the queue entries to find the head and tail
|
||||
func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) {
|
||||
q := query.Query{}
|
||||
results, err := datastore.Query(q)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
var tail uint64
|
||||
var head uint64 = math.MaxUint64
|
||||
for entry := range results.Next() {
|
||||
trimmed := strings.TrimPrefix(entry.Key, "/")
|
||||
id, err := strconv.ParseUint(trimmed, 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
if id < head {
|
||||
head = id
|
||||
}
|
||||
|
||||
if (id + 1) > tail {
|
||||
tail = (id + 1)
|
||||
}
|
||||
}
|
||||
if err := results.Close(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
if head == math.MaxUint64 {
|
||||
head = 0
|
||||
}
|
||||
|
||||
return head, tail, nil
|
||||
}
|
||||
131
provider/queue_test.go
Normal file
131
provider/queue_test.go
Normal file
@ -0,0 +1,131 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
datastore "github.com/ipfs/go-datastore"
|
||||
sync "github.com/ipfs/go-datastore/sync"
|
||||
)
|
||||
|
||||
func makeCids(n int) []cid.Cid {
|
||||
cids := make([]cid.Cid, 0, n)
|
||||
for i := 0; i < 10; i++ {
|
||||
c := blockGenerator.Next().Cid()
|
||||
cids = append(cids, c)
|
||||
}
|
||||
return cids
|
||||
}
|
||||
|
||||
func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) {
|
||||
for _, c := range cids {
|
||||
select {
|
||||
case dequeued := <-q.dequeue:
|
||||
if c != dequeued {
|
||||
t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued)
|
||||
}
|
||||
|
||||
case <-time.After(time.Second * 1):
|
||||
t.Fatal("Timeout waiting for cids to be provided.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBasicOperation(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
ds := sync.MutexWrap(datastore.NewMapDatastore())
|
||||
queue, err := NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cids := makeCids(10)
|
||||
|
||||
for _, c := range cids {
|
||||
queue.Enqueue(c)
|
||||
}
|
||||
|
||||
assertOrdered(cids, queue, t)
|
||||
}
|
||||
|
||||
func TestSparseDatastore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
ds := sync.MutexWrap(datastore.NewMapDatastore())
|
||||
queue, err := NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cids := makeCids(10)
|
||||
for _, c := range cids {
|
||||
queue.Enqueue(c)
|
||||
}
|
||||
|
||||
// remove entries in the middle
|
||||
err = queue.ds.Delete(queue.queueKey(5))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = queue.ds.Delete(queue.queueKey(6))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected := append(cids[:5], cids[7:]...)
|
||||
assertOrdered(expected, queue, t)
|
||||
}
|
||||
|
||||
func TestMangledData(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
ds := sync.MutexWrap(datastore.NewMapDatastore())
|
||||
queue, err := NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cids := makeCids(10)
|
||||
for _, c := range cids {
|
||||
queue.Enqueue(c)
|
||||
}
|
||||
|
||||
// remove entries in the middle
|
||||
err = queue.ds.Put(queue.queueKey(5), []byte("borked"))
|
||||
|
||||
expected := append(cids[:5], cids[6:]...)
|
||||
assertOrdered(expected, queue, t)
|
||||
}
|
||||
|
||||
func TestInitialization(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
ds := sync.MutexWrap(datastore.NewMapDatastore())
|
||||
queue, err := NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cids := makeCids(10)
|
||||
for _, c := range cids {
|
||||
queue.Enqueue(c)
|
||||
}
|
||||
|
||||
assertOrdered(cids[:5], queue, t)
|
||||
|
||||
// make a new queue, same data
|
||||
queue, err = NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertOrdered(cids[5:], queue, t)
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user