mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 18:37:45 +08:00
Provide root node immediately when add and pin add
License: MIT Signed-off-by: Michael Avila <davidmichaelavila@gmail.com>
This commit is contained in:
parent
245c40b8fd
commit
a9b6534687
@ -5,6 +5,7 @@ import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"github.com/ipfs/go-ipfs/provider"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
@ -275,6 +276,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
|
||||
}
|
||||
n.Resolver = resolver.NewBasicResolver(n.DAG)
|
||||
|
||||
// Provider
|
||||
queue, err := provider.NewQueue("provider-v1", ctx, n.Repo.Datastore())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.Provider = provider.NewProvider(ctx, queue, n.Routing)
|
||||
|
||||
if cfg.Online {
|
||||
if err := n.startLateOnlineServices(ctx); err != nil {
|
||||
return err
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/ipfs/go-ipfs/provider"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@ -124,6 +125,7 @@ type IpfsNode struct {
|
||||
Routing routing.IpfsRouting // the routing system. recommend ipfs-dht
|
||||
Exchange exchange.Interface // the block exchange + strategy (bitswap)
|
||||
Namesys namesys.NameSystem // the name system, resolves paths to hashes
|
||||
Provider *provider.Provider // the value provider system
|
||||
Reprovider *rp.Reprovider // the value reprovider system
|
||||
IpnsRepub *ipnsrp.Republisher
|
||||
|
||||
@ -324,6 +326,12 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Provider
|
||||
|
||||
n.Provider.Run()
|
||||
|
||||
// Reprovider
|
||||
|
||||
var keyProvider rp.KeyChanFunc
|
||||
|
||||
switch cfg.Reprovider.Strategy {
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
"github.com/ipfs/go-ipfs/namesys"
|
||||
"github.com/ipfs/go-ipfs/provider"
|
||||
"github.com/ipfs/go-ipfs/pin"
|
||||
"github.com/ipfs/go-ipfs/repo"
|
||||
|
||||
@ -66,6 +67,8 @@ type CoreAPI struct {
|
||||
namesys namesys.NameSystem
|
||||
routing routing.IpfsRouting
|
||||
|
||||
provider *provider.Provider
|
||||
|
||||
pubSub *pubsub.PubSub
|
||||
|
||||
checkPublishAllowed func() error
|
||||
@ -174,6 +177,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
|
||||
exchange: n.Exchange,
|
||||
routing: n.Routing,
|
||||
|
||||
provider: n.Provider,
|
||||
|
||||
pubSub: n.PubSub,
|
||||
|
||||
nd: n,
|
||||
|
||||
@ -32,6 +32,10 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin
|
||||
return fmt.Errorf("pin: %s", err)
|
||||
}
|
||||
|
||||
if err := api.provider.Provide(dagNode.Cid()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return api.pinning.Flush()
|
||||
}
|
||||
|
||||
|
||||
11
core/coreapi/provider.go
Normal file
11
core/coreapi/provider.go
Normal file
@ -0,0 +1,11 @@
|
||||
package coreapi
|
||||
|
||||
import (
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
type ProviderAPI CoreAPI
|
||||
|
||||
func (api *ProviderAPI) Provide(root cid.Cid) error {
|
||||
return api.provider.Provide(root)
|
||||
}
|
||||
@ -129,6 +129,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := api.provider.Provide(nd.Cid()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return coreiface.IpfsPath(nd.Cid()), nil
|
||||
}
|
||||
|
||||
|
||||
88
provider/provider.go
Normal file
88
provider/provider.go
Normal file
@ -0,0 +1,88 @@
|
||||
// Package provider implements structures and methods to provide blocks,
|
||||
// keep track of which blocks are provided, and to allow those blocks to
|
||||
// be reprovided.
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-routing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
log = logging.Logger("provider")
|
||||
)
|
||||
|
||||
const (
|
||||
provideOutgoingWorkerLimit = 8
|
||||
provideOutgoingTimeout = 15 * time.Second
|
||||
)
|
||||
|
||||
// Provider announces blocks to the network, tracks which blocks are
|
||||
// being provided, and untracks blocks when they're no longer in the blockstore.
|
||||
type Provider struct {
|
||||
ctx context.Context
|
||||
// the CIDs for which provide announcements should be made
|
||||
queue *Queue
|
||||
// used to announce providing to the network
|
||||
contentRouting routing.ContentRouting
|
||||
}
|
||||
|
||||
func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) *Provider {
|
||||
return &Provider{
|
||||
ctx: ctx,
|
||||
queue: queue,
|
||||
contentRouting: contentRouting,
|
||||
}
|
||||
}
|
||||
|
||||
// Start workers to handle provide requests.
|
||||
func (p *Provider) Run() {
|
||||
p.queue.Run()
|
||||
p.handleAnnouncements()
|
||||
}
|
||||
|
||||
// Provide the given cid using specified strategy.
|
||||
func (p *Provider) Provide(root cid.Cid) error {
|
||||
return p.queue.Enqueue(root)
|
||||
}
|
||||
|
||||
// Handle all outgoing cids by providing (announcing) them
|
||||
func (p *Provider) handleAnnouncements() {
|
||||
for workers := 0; workers < provideOutgoingWorkerLimit; workers++ {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
return
|
||||
case entry := <-p.queue.Dequeue():
|
||||
if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil {
|
||||
log.Warningf("Unable to provide entry: %s, %s", entry.cid, err)
|
||||
}
|
||||
|
||||
if err := entry.Complete(); err != nil {
|
||||
log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: better document this provide logic
|
||||
func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error {
|
||||
// announce
|
||||
log.Info("announce - start - ", key)
|
||||
ctx, cancel := context.WithTimeout(ctx, provideOutgoingTimeout)
|
||||
if err := contentRouting.Provide(ctx, key, true); err != nil {
|
||||
log.Warningf("Failed to provide cid: %s", err)
|
||||
// TODO: Maybe put these failures onto a failures queue?
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
cancel()
|
||||
log.Info("announce - end - ", key)
|
||||
return nil
|
||||
}
|
||||
235
provider/queue.go
Normal file
235
provider/queue.go
Normal file
@ -0,0 +1,235 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/ipfs/go-cid"
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
"github.com/ipfs/go-datastore/query"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Entry allows for the durability in the queue. When a cid is dequeued it is
|
||||
// not removed from the datastore until you call Complete() on the entry you
|
||||
// receive.
|
||||
type Entry struct {
|
||||
cid cid.Cid
|
||||
key ds.Key
|
||||
queue *Queue
|
||||
}
|
||||
|
||||
func (e *Entry) Complete() error {
|
||||
return e.queue.remove(e.key)
|
||||
}
|
||||
|
||||
// Queue provides a durable, FIFO interface to the datastore for storing cids
|
||||
//
|
||||
// Durability just means that cids in the process of being provided when a
|
||||
// crash or shutdown occurs will still be in the queue when the node is
|
||||
// brought back online.
|
||||
type Queue struct {
|
||||
// used to differentiate queues in datastore
|
||||
// e.g. provider vs reprovider
|
||||
name string
|
||||
|
||||
ctx context.Context
|
||||
|
||||
tail uint64
|
||||
head uint64
|
||||
|
||||
lock sync.Mutex
|
||||
datastore ds.Datastore
|
||||
|
||||
dequeue chan *Entry
|
||||
notEmpty chan struct{}
|
||||
|
||||
isRunning bool
|
||||
}
|
||||
|
||||
func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) {
|
||||
namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/"))
|
||||
head, tail, err := getQueueHeadTail(name, ctx, namespaced)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q := &Queue{
|
||||
name: name,
|
||||
ctx: ctx,
|
||||
head: head,
|
||||
tail: tail,
|
||||
lock: sync.Mutex{},
|
||||
datastore: namespaced,
|
||||
dequeue: make(chan *Entry),
|
||||
notEmpty: make(chan struct{}),
|
||||
isRunning: false,
|
||||
}
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// Put a cid in the queue
|
||||
func (q *Queue) Enqueue(cid cid.Cid) error {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
wasEmpty := q.IsEmpty()
|
||||
|
||||
nextKey := q.queueKey(q.tail)
|
||||
|
||||
if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
q.tail++
|
||||
|
||||
if q.isRunning && wasEmpty {
|
||||
select {
|
||||
case q.notEmpty <- struct{}{}:
|
||||
case <-q.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove an entry from the queue.
|
||||
func (q *Queue) Dequeue() <-chan *Entry {
|
||||
return q.dequeue
|
||||
}
|
||||
|
||||
func (q *Queue) IsEmpty() bool {
|
||||
return (q.tail - q.head) == 0
|
||||
}
|
||||
|
||||
func (q *Queue) remove(key ds.Key) error {
|
||||
return q.datastore.Delete(key)
|
||||
}
|
||||
|
||||
// dequeue items when the dequeue channel is available to
|
||||
// be written to
|
||||
func (q *Queue) Run() {
|
||||
q.isRunning = true
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
if q.IsEmpty() {
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
// wait for a notEmpty message
|
||||
case <-q.notEmpty:
|
||||
}
|
||||
}
|
||||
|
||||
entry, err := q.next()
|
||||
if err != nil {
|
||||
log.Warningf("Error Dequeue()-ing: %s, %s", entry, err)
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
case q.dequeue <- entry:
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Find the next item in the queue, crawl forward if an entry is not
|
||||
// found in the next spot.
|
||||
func (q *Queue) next() (*Entry, error) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
var nextKey ds.Key
|
||||
var value []byte
|
||||
var err error
|
||||
for {
|
||||
if q.head >= q.tail {
|
||||
return nil, errors.New("no more entries in queue")
|
||||
}
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
return nil, nil
|
||||
default:
|
||||
}
|
||||
nextKey = q.queueKey(q.head)
|
||||
value, err = q.datastore.Get(nextKey)
|
||||
if err == ds.ErrNotFound {
|
||||
q.head++
|
||||
continue
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
id, err := cid.Parse(value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entry := &Entry {
|
||||
cid: id,
|
||||
key: nextKey,
|
||||
queue: q,
|
||||
}
|
||||
|
||||
q.head++
|
||||
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
func (q *Queue) queueKey(id uint64) ds.Key {
|
||||
return ds.NewKey(strconv.FormatUint(id, 10))
|
||||
}
|
||||
|
||||
// crawl over the queue entries to find the head and tail
|
||||
func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) {
|
||||
query := query.Query{}
|
||||
results, err := datastore.Query(query)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
var tail uint64 = 0
|
||||
var head uint64 = math.MaxUint64
|
||||
for entry := range results.Next() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, 0, nil
|
||||
default:
|
||||
}
|
||||
trimmed := strings.TrimPrefix(entry.Key, "/")
|
||||
id, err := strconv.ParseUint(trimmed, 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
if id < head {
|
||||
head = id
|
||||
}
|
||||
|
||||
if (id+1) > tail {
|
||||
tail = (id+1)
|
||||
}
|
||||
}
|
||||
if err := results.Close(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
if head == math.MaxUint64 {
|
||||
head = 0
|
||||
}
|
||||
|
||||
return head, tail, nil
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user