From a9b65346871e845a67bbd19c668b699c55095e41 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 8 Mar 2019 14:19:29 -0800 Subject: [PATCH 01/13] Provide root node immediately when add and pin add License: MIT Signed-off-by: Michael Avila --- core/builder.go | 8 ++ core/core.go | 8 ++ core/coreapi/coreapi.go | 5 + core/coreapi/pin.go | 4 + core/coreapi/provider.go | 11 ++ core/coreapi/unixfs.go | 5 + provider/provider.go | 88 +++++++++++++++ provider/queue.go | 235 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 364 insertions(+) create mode 100644 core/coreapi/provider.go create mode 100644 provider/provider.go create mode 100644 provider/queue.go diff --git a/core/builder.go b/core/builder.go index 3ddc58f3c..2d6d8b664 100644 --- a/core/builder.go +++ b/core/builder.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "encoding/base64" "errors" + "github.com/ipfs/go-ipfs/provider" "os" "syscall" "time" @@ -275,6 +276,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } n.Resolver = resolver.NewBasicResolver(n.DAG) + // Provider + queue, err := provider.NewQueue("provider-v1", ctx, n.Repo.Datastore()) + if err != nil { + return err + } + n.Provider = provider.NewProvider(ctx, queue, n.Routing) + if cfg.Online { if err := n.startLateOnlineServices(ctx); err != nil { return err diff --git a/core/core.go b/core/core.go index 0817118b3..28cf6047a 100644 --- a/core/core.go +++ b/core/core.go @@ -14,6 +14,7 @@ import ( "context" "errors" "fmt" + "github.com/ipfs/go-ipfs/provider" "io" "io/ioutil" "os" @@ -124,6 +125,7 @@ type IpfsNode struct { Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes + Provider *provider.Provider // the value provider system Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher @@ -324,6 +326,12 @@ func (n *IpfsNode) startLateOnlineServices(ctx context.Context) error { return err } + // Provider + + n.Provider.Run() + + // Reprovider + var keyProvider rp.KeyChanFunc switch cfg.Reprovider.Strategy { diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index c5ba1b566..a10af96e9 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/namesys" + "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/repo" @@ -66,6 +67,8 @@ type CoreAPI struct { namesys namesys.NameSystem routing routing.IpfsRouting + provider *provider.Provider + pubSub *pubsub.PubSub checkPublishAllowed func() error @@ -174,6 +177,8 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e exchange: n.Exchange, routing: n.Routing, + provider: n.Provider, + pubSub: n.PubSub, nd: n, diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 1976517dd..df478732c 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -32,6 +32,10 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin return fmt.Errorf("pin: %s", err) } + if err := api.provider.Provide(dagNode.Cid()); err != nil { + return err + } + return api.pinning.Flush() } diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go new file mode 100644 index 000000000..b22a3811c --- /dev/null +++ b/core/coreapi/provider.go @@ -0,0 +1,11 @@ +package coreapi + +import ( + cid "github.com/ipfs/go-cid" +) + +type ProviderAPI CoreAPI + +func (api *ProviderAPI) Provide(root cid.Cid) error { + return api.provider.Provide(root) +} diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index e26c755b9..c840280bb 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -129,6 +129,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options if err != nil { return nil, err } + + if err := api.provider.Provide(nd.Cid()); err != nil { + return nil, err + } + return coreiface.IpfsPath(nd.Cid()), nil } diff --git a/provider/provider.go b/provider/provider.go new file mode 100644 index 000000000..ee0481a0f --- /dev/null +++ b/provider/provider.go @@ -0,0 +1,88 @@ +// Package provider implements structures and methods to provide blocks, +// keep track of which blocks are provided, and to allow those blocks to +// be reprovided. +package provider + +import ( + "context" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-routing" + "time" +) + +var ( + log = logging.Logger("provider") +) + +const ( + provideOutgoingWorkerLimit = 8 + provideOutgoingTimeout = 15 * time.Second +) + +// Provider announces blocks to the network, tracks which blocks are +// being provided, and untracks blocks when they're no longer in the blockstore. +type Provider struct { + ctx context.Context + // the CIDs for which provide announcements should be made + queue *Queue + // used to announce providing to the network + contentRouting routing.ContentRouting +} + +func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) *Provider { + return &Provider{ + ctx: ctx, + queue: queue, + contentRouting: contentRouting, + } +} + +// Start workers to handle provide requests. +func (p *Provider) Run() { + p.queue.Run() + p.handleAnnouncements() +} + +// Provide the given cid using specified strategy. +func (p *Provider) Provide(root cid.Cid) error { + return p.queue.Enqueue(root) +} + +// Handle all outgoing cids by providing (announcing) them +func (p *Provider) handleAnnouncements() { + for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { + go func() { + for { + select { + case <-p.ctx.Done(): + return + case entry := <-p.queue.Dequeue(): + if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil { + log.Warningf("Unable to provide entry: %s, %s", entry.cid, err) + } + + if err := entry.Complete(); err != nil { + log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err) + } + } + } + }() + } +} + +// TODO: better document this provide logic +func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error { + // announce + log.Info("announce - start - ", key) + ctx, cancel := context.WithTimeout(ctx, provideOutgoingTimeout) + if err := contentRouting.Provide(ctx, key, true); err != nil { + log.Warningf("Failed to provide cid: %s", err) + // TODO: Maybe put these failures onto a failures queue? + cancel() + return err + } + cancel() + log.Info("announce - end - ", key) + return nil +} diff --git a/provider/queue.go b/provider/queue.go new file mode 100644 index 000000000..65656450a --- /dev/null +++ b/provider/queue.go @@ -0,0 +1,235 @@ +package provider + +import ( + "context" + "errors" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" + "math" + "strconv" + "strings" + "sync" +) + +// Entry allows for the durability in the queue. When a cid is dequeued it is +// not removed from the datastore until you call Complete() on the entry you +// receive. +type Entry struct { + cid cid.Cid + key ds.Key + queue *Queue +} + +func (e *Entry) Complete() error { + return e.queue.remove(e.key) +} + +// Queue provides a durable, FIFO interface to the datastore for storing cids +// +// Durability just means that cids in the process of being provided when a +// crash or shutdown occurs will still be in the queue when the node is +// brought back online. +type Queue struct { + // used to differentiate queues in datastore + // e.g. provider vs reprovider + name string + + ctx context.Context + + tail uint64 + head uint64 + + lock sync.Mutex + datastore ds.Datastore + + dequeue chan *Entry + notEmpty chan struct{} + + isRunning bool +} + +func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/")) + head, tail, err := getQueueHeadTail(name, ctx, namespaced) + if err != nil { + return nil, err + } + q := &Queue{ + name: name, + ctx: ctx, + head: head, + tail: tail, + lock: sync.Mutex{}, + datastore: namespaced, + dequeue: make(chan *Entry), + notEmpty: make(chan struct{}), + isRunning: false, + } + return q, nil +} + +// Put a cid in the queue +func (q *Queue) Enqueue(cid cid.Cid) error { + q.lock.Lock() + defer q.lock.Unlock() + + wasEmpty := q.IsEmpty() + + nextKey := q.queueKey(q.tail) + + if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil { + return err + } + + q.tail++ + + if q.isRunning && wasEmpty { + select { + case q.notEmpty <- struct{}{}: + case <-q.ctx.Done(): + } + } + + return nil +} + +// Remove an entry from the queue. +func (q *Queue) Dequeue() <-chan *Entry { + return q.dequeue +} + +func (q *Queue) IsEmpty() bool { + return (q.tail - q.head) == 0 +} + +func (q *Queue) remove(key ds.Key) error { + return q.datastore.Delete(key) +} + +// dequeue items when the dequeue channel is available to +// be written to +func (q *Queue) Run() { + q.isRunning = true + go func() { + for { + select { + case <-q.ctx.Done(): + return + default: + } + if q.IsEmpty() { + select { + case <-q.ctx.Done(): + return + // wait for a notEmpty message + case <-q.notEmpty: + } + } + + entry, err := q.next() + if err != nil { + log.Warningf("Error Dequeue()-ing: %s, %s", entry, err) + continue + } + + select { + case <-q.ctx.Done(): + return + case q.dequeue <- entry: + } + } + }() +} + +// Find the next item in the queue, crawl forward if an entry is not +// found in the next spot. +func (q *Queue) next() (*Entry, error) { + q.lock.Lock() + defer q.lock.Unlock() + + var nextKey ds.Key + var value []byte + var err error + for { + if q.head >= q.tail { + return nil, errors.New("no more entries in queue") + } + select { + case <-q.ctx.Done(): + return nil, nil + default: + } + nextKey = q.queueKey(q.head) + value, err = q.datastore.Get(nextKey) + if err == ds.ErrNotFound { + q.head++ + continue + } else if err != nil { + return nil, err + } else { + break + } + } + + id, err := cid.Parse(value) + if err != nil { + return nil, err + } + + entry := &Entry { + cid: id, + key: nextKey, + queue: q, + } + + q.head++ + + return entry, nil +} + +func (q *Queue) queueKey(id uint64) ds.Key { + return ds.NewKey(strconv.FormatUint(id, 10)) +} + +// crawl over the queue entries to find the head and tail +func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) { + query := query.Query{} + results, err := datastore.Query(query) + if err != nil { + return 0, 0, err + } + + var tail uint64 = 0 + var head uint64 = math.MaxUint64 + for entry := range results.Next() { + select { + case <-ctx.Done(): + return 0, 0, nil + default: + } + trimmed := strings.TrimPrefix(entry.Key, "/") + id, err := strconv.ParseUint(trimmed, 10, 64) + if err != nil { + return 0, 0, err + } + + if id < head { + head = id + } + + if (id+1) > tail { + tail = (id+1) + } + } + if err := results.Close(); err != nil { + return 0, 0, err + } + if head == math.MaxUint64 { + head = 0 + } + + return head, tail, nil +} + From d786a69e72c6423808c4f6c675194f62a98d91de Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 8 Mar 2019 15:49:25 -0800 Subject: [PATCH 02/13] Remove timeout from provide context This is being removed because it appears that the provide announcements go out regardless of the timeout. License: MIT Signed-off-by: Michael Avila --- provider/provider.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/provider/provider.go b/provider/provider.go index ee0481a0f..e43058b39 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-routing" - "time" ) var ( @@ -17,7 +16,6 @@ var ( const ( provideOutgoingWorkerLimit = 8 - provideOutgoingTimeout = 15 * time.Second ) // Provider announces blocks to the network, tracks which blocks are @@ -75,14 +73,11 @@ func (p *Provider) handleAnnouncements() { func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error { // announce log.Info("announce - start - ", key) - ctx, cancel := context.WithTimeout(ctx, provideOutgoingTimeout) if err := contentRouting.Provide(ctx, key, true); err != nil { log.Warningf("Failed to provide cid: %s", err) // TODO: Maybe put these failures onto a failures queue? - cancel() return err } - cancel() log.Info("announce - end - ", key) return nil } From bfcea27d396c994406439bf74c77b309b7310485 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 8 Mar 2019 16:31:51 -0800 Subject: [PATCH 03/13] Use offlineProvider when --offline License: MIT Signed-off-by: Michael Avila --- core/core.go | 2 +- core/coreapi/coreapi.go | 3 ++- provider/offline.go | 15 +++++++++++++++ provider/provider.go | 17 +++++++++++------ 4 files changed, 29 insertions(+), 8 deletions(-) create mode 100644 provider/offline.go diff --git a/core/core.go b/core/core.go index 28cf6047a..db75e3df0 100644 --- a/core/core.go +++ b/core/core.go @@ -125,7 +125,7 @@ type IpfsNode struct { Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes - Provider *provider.Provider // the value provider system + Provider provider.Provider // the value provider system Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index a10af96e9..5045c88eb 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -67,7 +67,7 @@ type CoreAPI struct { namesys namesys.NameSystem routing routing.IpfsRouting - provider *provider.Provider + provider provider.Provider pubSub *pubsub.PubSub @@ -215,6 +215,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e subApi.routing = offlineroute.NewOfflineRouter(subApi.repo.Datastore(), subApi.recordValidator) subApi.namesys = namesys.NewNameSystem(subApi.routing, subApi.repo.Datastore(), cs) + subApi.provider = provider.NewOfflineProvider() subApi.peerstore = nil subApi.peerHost = nil diff --git a/provider/offline.go b/provider/offline.go new file mode 100644 index 000000000..f7b9603b9 --- /dev/null +++ b/provider/offline.go @@ -0,0 +1,15 @@ +package provider + +import "github.com/ipfs/go-cid" + +type offlineProvider struct {} + +func NewOfflineProvider() Provider { + return &offlineProvider{} +} + +func (op *offlineProvider) Run() {} + +func (op *offlineProvider) Provide(cid cid.Cid) error { + return nil +} diff --git a/provider/provider.go b/provider/provider.go index e43058b39..e4ee6d9ff 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -18,9 +18,14 @@ const ( provideOutgoingWorkerLimit = 8 ) +type Provider interface { + Run() + Provide(cid.Cid) error +} + // Provider announces blocks to the network, tracks which blocks are // being provided, and untracks blocks when they're no longer in the blockstore. -type Provider struct { +type provider struct { ctx context.Context // the CIDs for which provide announcements should be made queue *Queue @@ -28,8 +33,8 @@ type Provider struct { contentRouting routing.ContentRouting } -func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) *Provider { - return &Provider{ +func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider { + return &provider{ ctx: ctx, queue: queue, contentRouting: contentRouting, @@ -37,18 +42,18 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte } // Start workers to handle provide requests. -func (p *Provider) Run() { +func (p *provider) Run() { p.queue.Run() p.handleAnnouncements() } // Provide the given cid using specified strategy. -func (p *Provider) Provide(root cid.Cid) error { +func (p *provider) Provide(root cid.Cid) error { return p.queue.Enqueue(root) } // Handle all outgoing cids by providing (announcing) them -func (p *Provider) handleAnnouncements() { +func (p *provider) handleAnnouncements() { for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { go func() { for { From dde397ebc1259e7f86ea6a4428314ebb51f58bd7 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Fri, 8 Mar 2019 16:58:17 -0800 Subject: [PATCH 04/13] Refactor per code climate rules License: MIT Signed-off-by: Michael Avila --- core/builder.go | 2 +- core/core.go | 2 +- core/coreapi/coreapi.go | 2 +- core/coreapi/provider.go | 6 ++-- provider/offline.go | 3 +- provider/provider.go | 4 +-- provider/queue.go | 60 +++++++++++++++++++++------------------- 7 files changed, 42 insertions(+), 37 deletions(-) diff --git a/core/builder.go b/core/builder.go index 2d6d8b664..13dce637d 100644 --- a/core/builder.go +++ b/core/builder.go @@ -277,7 +277,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { n.Resolver = resolver.NewBasicResolver(n.DAG) // Provider - queue, err := provider.NewQueue("provider-v1", ctx, n.Repo.Datastore()) + queue, err := provider.NewQueue(ctx, "provider-v1", n.Repo.Datastore()) if err != nil { return err } diff --git a/core/core.go b/core/core.go index db75e3df0..d13901147 100644 --- a/core/core.go +++ b/core/core.go @@ -125,7 +125,7 @@ type IpfsNode struct { Routing routing.IpfsRouting // the routing system. recommend ipfs-dht Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes - Provider provider.Provider // the value provider system + Provider provider.Provider // the value provider system Reprovider *rp.Reprovider // the value reprovider system IpnsRepub *ipnsrp.Republisher diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 5045c88eb..f22803f92 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -20,8 +20,8 @@ import ( "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/namesys" - "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" bserv "github.com/ipfs/go-blockservice" diff --git a/core/coreapi/provider.go b/core/coreapi/provider.go index b22a3811c..8148c8789 100644 --- a/core/coreapi/provider.go +++ b/core/coreapi/provider.go @@ -4,8 +4,10 @@ import ( cid "github.com/ipfs/go-cid" ) +// ProviderAPI brings Provider behavior to CoreAPI type ProviderAPI CoreAPI -func (api *ProviderAPI) Provide(root cid.Cid) error { - return api.provider.Provide(root) +// Provide the given cid using the current provider +func (api *ProviderAPI) Provide(cid cid.Cid) error { + return api.provider.Provide(cid) } diff --git a/provider/offline.go b/provider/offline.go index f7b9603b9..029ddfa98 100644 --- a/provider/offline.go +++ b/provider/offline.go @@ -2,8 +2,9 @@ package provider import "github.com/ipfs/go-cid" -type offlineProvider struct {} +type offlineProvider struct{} +// NewOfflineProvider creates a Provider that does nothing func NewOfflineProvider() Provider { return &offlineProvider{} } diff --git a/provider/provider.go b/provider/provider.go index e4ee6d9ff..76004f51a 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -18,13 +18,12 @@ const ( provideOutgoingWorkerLimit = 8 ) +// Provider announces blocks to the network type Provider interface { Run() Provide(cid.Cid) error } -// Provider announces blocks to the network, tracks which blocks are -// being provided, and untracks blocks when they're no longer in the blockstore. type provider struct { ctx context.Context // the CIDs for which provide announcements should be made @@ -33,6 +32,7 @@ type provider struct { contentRouting routing.ContentRouting } +// NewProvider creates a provider that announces blocks to the network using a content router func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.ContentRouting) Provider { return &provider{ ctx: ctx, diff --git a/provider/queue.go b/provider/queue.go index 65656450a..cc756366d 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -17,11 +17,12 @@ import ( // not removed from the datastore until you call Complete() on the entry you // receive. type Entry struct { - cid cid.Cid - key ds.Key + cid cid.Cid + key ds.Key queue *Queue } +// Complete the entry by removing it from the queue func (e *Entry) Complete() error { return e.queue.remove(e.key) } @@ -41,36 +42,37 @@ type Queue struct { tail uint64 head uint64 - lock sync.Mutex + lock sync.Mutex datastore ds.Datastore - dequeue chan *Entry + dequeue chan *Entry notEmpty chan struct{} isRunning bool } -func NewQueue(name string, ctx context.Context, datastore ds.Datastore) (*Queue, error) { - namespaced := namespace.Wrap(datastore, ds.NewKey("/" + name + "/queue/")) - head, tail, err := getQueueHeadTail(name, ctx, namespaced) +// NewQueue creates a queue for cids +func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(datastore, ds.NewKey("/"+name+"/queue/")) + head, tail, err := getQueueHeadTail(ctx, name, namespaced) if err != nil { return nil, err } q := &Queue{ - name: name, - ctx: ctx, - head: head, - tail: tail, - lock: sync.Mutex{}, + name: name, + ctx: ctx, + head: head, + tail: tail, + lock: sync.Mutex{}, datastore: namespaced, - dequeue: make(chan *Entry), - notEmpty: make(chan struct{}), + dequeue: make(chan *Entry), + notEmpty: make(chan struct{}), isRunning: false, } return q, nil } -// Put a cid in the queue +// Enqueue puts a cid in the queue func (q *Queue) Enqueue(cid cid.Cid) error { q.lock.Lock() defer q.lock.Unlock() @@ -95,21 +97,18 @@ func (q *Queue) Enqueue(cid cid.Cid) error { return nil } -// Remove an entry from the queue. +// Dequeue returns a channel that if listened to will remove entries from the queue func (q *Queue) Dequeue() <-chan *Entry { return q.dequeue } +// IsEmpty returns whether or not the queue has any items func (q *Queue) IsEmpty() bool { return (q.tail - q.head) == 0 } -func (q *Queue) remove(key ds.Key) error { - return q.datastore.Delete(key) -} - -// dequeue items when the dequeue channel is available to -// be written to +// Run dequeues items when the dequeue channel is available to +// be written to. func (q *Queue) Run() { q.isRunning = true go func() { @@ -178,9 +177,9 @@ func (q *Queue) next() (*Entry, error) { return nil, err } - entry := &Entry { - cid: id, - key: nextKey, + entry := &Entry{ + cid: id, + key: nextKey, queue: q, } @@ -194,14 +193,14 @@ func (q *Queue) queueKey(id uint64) ds.Key { } // crawl over the queue entries to find the head and tail -func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) (uint64, uint64, error) { +func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) { query := query.Query{} results, err := datastore.Query(query) if err != nil { return 0, 0, err } - var tail uint64 = 0 + var tail uint64 var head uint64 = math.MaxUint64 for entry := range results.Next() { select { @@ -219,8 +218,8 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) head = id } - if (id+1) > tail { - tail = (id+1) + if (id + 1) > tail { + tail = (id + 1) } } if err := results.Close(); err != nil { @@ -233,3 +232,6 @@ func getQueueHeadTail(name string, ctx context.Context, datastore ds.Datastore) return head, tail, nil } +func (q *Queue) remove(key ds.Key) error { + return q.datastore.Delete(key) +} From 8337035466942d787ffce694c8f7d18368bb95e1 Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Thu, 14 Mar 2019 16:48:17 -0700 Subject: [PATCH 05/13] Provider queue updates to address deadlocks License: MIT Signed-off-by: Erik Ingenito --- provider/provider.go | 5 +++-- provider/queue.go | 38 +++++++++++++++----------------------- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/provider/provider.go b/provider/provider.go index 76004f51a..28fed7649 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -5,9 +5,10 @@ package provider import ( "context" - "github.com/ipfs/go-cid" + + cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-routing" + routing "github.com/libp2p/go-libp2p-routing" ) var ( diff --git a/provider/queue.go b/provider/queue.go index cc756366d..4219cc80d 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -24,6 +24,8 @@ type Entry struct { // Complete the entry by removing it from the queue func (e *Entry) Complete() error { + e.queue.lock.Lock() + defer e.queue.lock.Unlock() return e.queue.remove(e.key) } @@ -46,9 +48,7 @@ type Queue struct { datastore ds.Datastore dequeue chan *Entry - notEmpty chan struct{} - - isRunning bool + added chan struct{} } // NewQueue creates a queue for cids @@ -66,8 +66,7 @@ func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue, lock: sync.Mutex{}, datastore: namespaced, dequeue: make(chan *Entry), - notEmpty: make(chan struct{}), - isRunning: false, + added: make(chan struct{}), } return q, nil } @@ -77,8 +76,6 @@ func (q *Queue) Enqueue(cid cid.Cid) error { q.lock.Lock() defer q.lock.Unlock() - wasEmpty := q.IsEmpty() - nextKey := q.queueKey(q.tail) if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil { @@ -87,11 +84,10 @@ func (q *Queue) Enqueue(cid cid.Cid) error { q.tail++ - if q.isRunning && wasEmpty { - select { - case q.notEmpty <- struct{}{}: + select { + case q.added <- struct{}{}: case <-q.ctx.Done(): - } + default: } return nil @@ -110,20 +106,13 @@ func (q *Queue) IsEmpty() bool { // Run dequeues items when the dequeue channel is available to // be written to. func (q *Queue) Run() { - q.isRunning = true go func() { for { - select { - case <-q.ctx.Done(): - return - default: - } if q.IsEmpty() { select { case <-q.ctx.Done(): return - // wait for a notEmpty message - case <-q.notEmpty: + case <-q.added: } } @@ -138,6 +127,7 @@ func (q *Queue) Run() { return case q.dequeue <- entry: } + } }() } @@ -146,14 +136,16 @@ func (q *Queue) Run() { // found in the next spot. func (q *Queue) next() (*Entry, error) { q.lock.Lock() - defer q.lock.Unlock() + defer func() { + q.lock.Unlock() + }() var nextKey ds.Key var value []byte var err error for { if q.head >= q.tail { - return nil, errors.New("no more entries in queue") + return nil, errors.New("next: no more entries in queue returning") } select { case <-q.ctx.Done(): @@ -194,8 +186,8 @@ func (q *Queue) queueKey(id uint64) ds.Key { // crawl over the queue entries to find the head and tail func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) { - query := query.Query{} - results, err := datastore.Query(query) + q := query.Query{} + results, err := datastore.Query(q) if err != nil { return 0, 0, err } From 7d07347ed718ff1759b6210e3113946b3dcb2d6e Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Thu, 14 Mar 2019 16:49:15 -0700 Subject: [PATCH 06/13] Provider tests License: MIT Signed-off-by: Erik Ingenito --- go.mod | 1 + provider/provider_test.go | 79 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 provider/provider_test.go diff --git a/go.mod b/go.mod index 06f13475b..28671188a 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/ipfs/go-fs-lock v0.0.1 github.com/ipfs/go-ipfs-addr v0.0.1 github.com/ipfs/go-ipfs-blockstore v0.0.1 + github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.1 github.com/ipfs/go-ipfs-cmdkit v0.0.1 github.com/ipfs/go-ipfs-cmds v0.0.1 diff --git a/provider/provider_test.go b/provider/provider_test.go new file mode 100644 index 000000000..bacb73944 --- /dev/null +++ b/provider/provider_test.go @@ -0,0 +1,79 @@ +package provider + +import ( + "context" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-ipfs-blocksutil" + pstore "github.com/libp2p/go-libp2p-peerstore" + "math/rand" + "testing" + "time" +) + +var blockGenerator = blocksutil.NewBlockGenerator() + +type mockRouting struct { + provided chan cid.Cid +} + +func mockContentRouting() *mockRouting { + r := mockRouting{} + r.provided = make(chan cid.Cid) + return &r +} + +func TestAnnouncement(t *testing.T) { + ctx := context.Background() + defer func() { + ctx.Done() + }() + + queue, err := NewQueue(ctx, "test", datastore.NewMapDatastore()) + if err != nil { + t.Fatal(err) + } + + r := mockContentRouting() + + provider := NewProvider(ctx, queue, r) + provider.Run() + + cids := cid.NewSet() + + for i := 0; i < 100; i++ { + c := blockGenerator.Next().Cid() + cids.Add(c) + } + + go func() { + for _, c := range cids.Keys() { + err = provider.Provide(c) + // A little goroutine stirring to exercise some different states + r := rand.Intn(10) + time.Sleep(time.Microsecond * time.Duration(r)) + } + }() + + for cids.Len() > 0 { + select { + case cp := <-r.provided: + if !cids.Has(cp) { + t.Fatal("Wrong CID provided") + } + cids.Remove(cp) + case <-time.After(time.Second * 1): + t.Fatal("Timeout waiting for cids to be provided.") + } + } +} + +func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { + r.provided <- cid + return nil +} + +// Search for peers who are able to provide a given key +func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo { + return nil +} \ No newline at end of file From 5d4f3fbdec94b0c8a408bb686e2e46a20201f4b1 Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Fri, 15 Mar 2019 11:04:31 -0700 Subject: [PATCH 07/13] Cleanup, fix broken restart, and more tests. License: MIT Signed-off-by: Erik Ingenito --- provider/provider.go | 21 ++------- provider/provider_test.go | 17 ++++---- provider/queue.go | 83 ++++++++++++++++-------------------- provider/queue_test.go | 89 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 72 deletions(-) create mode 100644 provider/queue_test.go diff --git a/provider/provider.go b/provider/provider.go index 28fed7649..7a30f6d27 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -62,28 +62,13 @@ func (p *provider) handleAnnouncements() { case <-p.ctx.Done(): return case entry := <-p.queue.Dequeue(): - if err := doProvide(p.ctx, p.contentRouting, entry.cid); err != nil { + log.Info("announce - start - ", entry.cid) + if err := p.contentRouting.Provide(p.ctx, entry.cid, true); err != nil { log.Warningf("Unable to provide entry: %s, %s", entry.cid, err) } - - if err := entry.Complete(); err != nil { - log.Warningf("Unable to complete queue entry when providing: %s, %s", entry.cid, err) - } + log.Info("announce - end - ", entry.cid) } } }() } } - -// TODO: better document this provide logic -func doProvide(ctx context.Context, contentRouting routing.ContentRouting, key cid.Cid) error { - // announce - log.Info("announce - start - ", key) - if err := contentRouting.Provide(ctx, key, true); err != nil { - log.Warningf("Failed to provide cid: %s", err) - // TODO: Maybe put these failures onto a failures queue? - return err - } - log.Info("announce - end - ", key) - return nil -} diff --git a/provider/provider_test.go b/provider/provider_test.go index bacb73944..464d73d9a 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -2,13 +2,15 @@ package provider import ( "context" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-ipfs-blocksutil" - pstore "github.com/libp2p/go-libp2p-peerstore" "math/rand" "testing" "time" + + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + cid "github.com/ipfs/go-cid" + datastore "github.com/ipfs/go-datastore" + pstore "github.com/libp2p/go-libp2p-peerstore" + sync "github.com/ipfs/go-datastore/sync" ) var blockGenerator = blocksutil.NewBlockGenerator() @@ -25,11 +27,10 @@ func mockContentRouting() *mockRouting { func TestAnnouncement(t *testing.T) { ctx := context.Background() - defer func() { - ctx.Done() - }() + defer ctx.Done() - queue, err := NewQueue(ctx, "test", datastore.NewMapDatastore()) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) if err != nil { t.Fatal(err) } diff --git a/provider/queue.go b/provider/queue.go index 4219cc80d..f1c9945cd 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -3,14 +3,15 @@ package provider import ( "context" "errors" - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/query" "math" "strconv" "strings" "sync" + + cid "github.com/ipfs/go-cid" + datastore "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" + query "github.com/ipfs/go-datastore/query" ) // Entry allows for the durability in the queue. When a cid is dequeued it is @@ -18,17 +19,10 @@ import ( // receive. type Entry struct { cid cid.Cid - key ds.Key + key datastore.Key queue *Queue } -// Complete the entry by removing it from the queue -func (e *Entry) Complete() error { - e.queue.lock.Lock() - defer e.queue.lock.Unlock() - return e.queue.remove(e.key) -} - // Queue provides a durable, FIFO interface to the datastore for storing cids // // Durability just means that cids in the process of being provided when a @@ -44,41 +38,41 @@ type Queue struct { tail uint64 head uint64 - lock sync.Mutex - datastore ds.Datastore + enqueueLock sync.Mutex + ds datastore.Datastore // Must be threadsafe dequeue chan *Entry added chan struct{} } // NewQueue creates a queue for cids -func NewQueue(ctx context.Context, name string, datastore ds.Datastore) (*Queue, error) { - namespaced := namespace.Wrap(datastore, ds.NewKey("/"+name+"/queue/")) +func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { + namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) head, tail, err := getQueueHeadTail(ctx, name, namespaced) if err != nil { return nil, err } q := &Queue{ - name: name, - ctx: ctx, - head: head, - tail: tail, - lock: sync.Mutex{}, - datastore: namespaced, - dequeue: make(chan *Entry), - added: make(chan struct{}), + name: name, + ctx: ctx, + head: head, + tail: tail, + enqueueLock: sync.Mutex{}, + ds: namespaced, + dequeue: make(chan *Entry), + added: make(chan struct{}), } return q, nil } // Enqueue puts a cid in the queue func (q *Queue) Enqueue(cid cid.Cid) error { - q.lock.Lock() - defer q.lock.Unlock() + q.enqueueLock.Lock() + defer q.enqueueLock.Unlock() nextKey := q.queueKey(q.tail) - if err := q.datastore.Put(nextKey, cid.Bytes()); err != nil { + if err := q.ds.Put(nextKey, cid.Bytes()); err != nil { return err } @@ -126,8 +120,9 @@ func (q *Queue) Run() { case <-q.ctx.Done(): return case q.dequeue <- entry: + q.head++ + err = q.ds.Delete(entry.key) } - } }() } @@ -135,12 +130,7 @@ func (q *Queue) Run() { // Find the next item in the queue, crawl forward if an entry is not // found in the next spot. func (q *Queue) next() (*Entry, error) { - q.lock.Lock() - defer func() { - q.lock.Unlock() - }() - - var nextKey ds.Key + var key datastore.Key var value []byte var err error for { @@ -152,9 +142,12 @@ func (q *Queue) next() (*Entry, error) { return nil, nil default: } - nextKey = q.queueKey(q.head) - value, err = q.datastore.Get(nextKey) - if err == ds.ErrNotFound { + key = q.queueKey(q.head) + + value, err = q.ds.Get(key) + + value, err = q.ds.Get(key) + if err == datastore.ErrNotFound { q.head++ continue } else if err != nil { @@ -171,21 +164,23 @@ func (q *Queue) next() (*Entry, error) { entry := &Entry{ cid: id, - key: nextKey, + key: key, queue: q, } - q.head++ + if err != nil { + return nil, err + } return entry, nil } -func (q *Queue) queueKey(id uint64) ds.Key { - return ds.NewKey(strconv.FormatUint(id, 10)) +func (q *Queue) queueKey(id uint64) datastore.Key { + return datastore.NewKey(strconv.FormatUint(id, 10)) } // crawl over the queue entries to find the head and tail -func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) (uint64, uint64, error) { +func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) { q := query.Query{} results, err := datastore.Query(q) if err != nil { @@ -223,7 +218,3 @@ func getQueueHeadTail(ctx context.Context, name string, datastore ds.Datastore) return head, tail, nil } - -func (q *Queue) remove(key ds.Key) error { - return q.datastore.Delete(key) -} diff --git a/provider/queue_test.go b/provider/queue_test.go new file mode 100644 index 000000000..724eca4ee --- /dev/null +++ b/provider/queue_test.go @@ -0,0 +1,89 @@ +package provider + +import ( + "context" + "testing" + "time" + + cid "github.com/ipfs/go-cid" + datastore "github.com/ipfs/go-datastore" + sync "github.com/ipfs/go-datastore/sync" +) + +func makeCids(n int) []cid.Cid { + cids := make([]cid.Cid, 0, 10) + for i := 0; i < 10; i++ { + c := blockGenerator.Next().Cid() + cids = append(cids, c) + } + return cids +} + +func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { + for _, c := range cids { + select { + case dequeued := <- q.dequeue: + if c != dequeued.cid { + t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued.cid) + } + + case <-time.After(time.Second * 1): + t.Fatal("Timeout waiting for cids to be provided.") + } + } +} + +func TestBasicOperation(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + queue.Run() + + cids := makeCids(10) + + for _, c := range cids { + err = queue.Enqueue(c) + if err != nil { + t.Fatal("Failed to enqueue CID") + } + } + + assertOrdered(cids, queue, t) +} + +func TestInitialization(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + queue.Run() + + cids := makeCids(10) + + for _, c := range cids { + err = queue.Enqueue(c) + if err != nil { + t.Fatal("Failed to enqueue CID") + } + } + + assertOrdered(cids[:5], queue, t) + + // make a new queue, same data + queue, err = NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + queue.Run() + + assertOrdered(cids[5:], queue, t) +} From 1595253b7b6f73b2b1f121677f3f32e7b2a6cb2d Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Fri, 15 Mar 2019 14:19:19 -0700 Subject: [PATCH 08/13] Remove locking entirely License: MIT Signed-off-by: Erik Ingenito --- provider/provider.go | 22 ++--- provider/provider_test.go | 4 +- provider/queue.go | 168 +++++++++++++------------------------- provider/queue_test.go | 19 ++--- 4 files changed, 71 insertions(+), 142 deletions(-) diff --git a/provider/provider.go b/provider/provider.go index 7a30f6d27..7e149f777 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -11,13 +11,9 @@ import ( routing "github.com/libp2p/go-libp2p-routing" ) -var ( - log = logging.Logger("provider") -) +var log = logging.Logger("provider") -const ( - provideOutgoingWorkerLimit = 8 -) +const provideOutgoingWorkerLimit = 8 // Provider announces blocks to the network type Provider interface { @@ -44,13 +40,13 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte // Start workers to handle provide requests. func (p *provider) Run() { - p.queue.Run() p.handleAnnouncements() } // Provide the given cid using specified strategy. func (p *provider) Provide(root cid.Cid) error { - return p.queue.Enqueue(root) + p.queue.Enqueue(root) + return nil } // Handle all outgoing cids by providing (announcing) them @@ -61,12 +57,12 @@ func (p *provider) handleAnnouncements() { select { case <-p.ctx.Done(): return - case entry := <-p.queue.Dequeue(): - log.Info("announce - start - ", entry.cid) - if err := p.contentRouting.Provide(p.ctx, entry.cid, true); err != nil { - log.Warningf("Unable to provide entry: %s, %s", entry.cid, err) + case c := <-p.queue.Dequeue(): + log.Info("announce - start - ", c) + if err := p.contentRouting.Provide(p.ctx, c, true); err != nil { + log.Warningf("Unable to provide entry: %s, %s", c, err) } - log.Info("announce - end - ", entry.cid) + log.Info("announce - end - ", c) } } }() diff --git a/provider/provider_test.go b/provider/provider_test.go index 464d73d9a..95282e38a 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -42,7 +42,7 @@ func TestAnnouncement(t *testing.T) { cids := cid.NewSet() - for i := 0; i < 100; i++ { + for i := 0; i < 1000; i++ { c := blockGenerator.Next().Cid() cids.Add(c) } @@ -63,7 +63,7 @@ func TestAnnouncement(t *testing.T) { t.Fatal("Wrong CID provided") } cids.Remove(cp) - case <-time.After(time.Second * 1): + case <-time.After(time.Second * 5): t.Fatal("Timeout waiting for cids to be provided.") } } diff --git a/provider/queue.go b/provider/queue.go index f1c9945cd..3f7115f68 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -2,27 +2,15 @@ package provider import ( "context" - "errors" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" "math" "strconv" "strings" - "sync" - - cid "github.com/ipfs/go-cid" - datastore "github.com/ipfs/go-datastore" - namespace "github.com/ipfs/go-datastore/namespace" - query "github.com/ipfs/go-datastore/query" ) -// Entry allows for the durability in the queue. When a cid is dequeued it is -// not removed from the datastore until you call Complete() on the entry you -// receive. -type Entry struct { - cid cid.Cid - key datastore.Key - queue *Queue -} - // Queue provides a durable, FIFO interface to the datastore for storing cids // // Durability just means that cids in the process of being provided when a @@ -32,17 +20,15 @@ type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider name string - ctx context.Context tail uint64 head uint64 - enqueueLock sync.Mutex ds datastore.Datastore // Must be threadsafe - dequeue chan *Entry - added chan struct{} + dequeue chan cid.Cid + enqueue chan cid.Cid } // NewQueue creates a queue for cids @@ -57,124 +43,85 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, ctx: ctx, head: head, tail: tail, - enqueueLock: sync.Mutex{}, ds: namespaced, - dequeue: make(chan *Entry), - added: make(chan struct{}), + dequeue: make(chan cid.Cid), + enqueue: make(chan cid.Cid), } + q.work() return q, nil } // Enqueue puts a cid in the queue -func (q *Queue) Enqueue(cid cid.Cid) error { - q.enqueueLock.Lock() - defer q.enqueueLock.Unlock() - - nextKey := q.queueKey(q.tail) - - if err := q.ds.Put(nextKey, cid.Bytes()); err != nil { - return err - } - - q.tail++ - +func (q *Queue) Enqueue(cid cid.Cid) { select { - case q.added <- struct{}{}: + case q.enqueue <- cid: case <-q.ctx.Done(): - default: } - - return nil } // Dequeue returns a channel that if listened to will remove entries from the queue -func (q *Queue) Dequeue() <-chan *Entry { +func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } -// IsEmpty returns whether or not the queue has any items -func (q *Queue) IsEmpty() bool { - return (q.tail - q.head) == 0 -} - -// Run dequeues items when the dequeue channel is available to -// be written to. -func (q *Queue) Run() { +// Run dequeues and enqueues when available. +func (q *Queue) work() { go func() { for { - if q.IsEmpty() { - select { - case <-q.ctx.Done(): - return - case <-q.added: + var c cid.Cid = cid.Undef + var key datastore.Key + var dequeue chan cid.Cid + + // If we're not empty dequeue a cid and ship it + if q.head < q.tail { + key = q.queueKey(q.head) + value, err := q.ds.Get(key) + + if err == datastore.ErrNotFound { + log.Warningf("Missing entry in queue: %s", err) + q.head++ + continue + } else if err != nil { + log.Warningf("Error fetching from queue: %s", err) + continue + } + + c, err = cid.Parse(value) + if err != nil { + log.Warningf("Error marshalling Cid from queue: ", err) + q.head++ + err = q.ds.Delete(key) + continue } } - entry, err := q.next() - if err != nil { - log.Warningf("Error Dequeue()-ing: %s, %s", entry, err) - continue + if c != cid.Undef { + dequeue = q.dequeue } select { + case toQueue := <-q.enqueue: + nextKey := q.queueKey(q.tail) + + if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil { + log.Errorf("Failed to enqueue cid: %s", err) + } + + q.tail++ + case dequeue <- c: + q.head++ + err := q.ds.Delete(key) + + if err != nil { + log.Errorf("Failed to delete queued cid: %s", err) + } case <-q.ctx.Done(): return - case q.dequeue <- entry: - q.head++ - err = q.ds.Delete(entry.key) } } }() } -// Find the next item in the queue, crawl forward if an entry is not -// found in the next spot. -func (q *Queue) next() (*Entry, error) { - var key datastore.Key - var value []byte - var err error - for { - if q.head >= q.tail { - return nil, errors.New("next: no more entries in queue returning") - } - select { - case <-q.ctx.Done(): - return nil, nil - default: - } - key = q.queueKey(q.head) - - value, err = q.ds.Get(key) - - value, err = q.ds.Get(key) - if err == datastore.ErrNotFound { - q.head++ - continue - } else if err != nil { - return nil, err - } else { - break - } - } - - id, err := cid.Parse(value) - if err != nil { - return nil, err - } - - entry := &Entry{ - cid: id, - key: key, - queue: q, - } - - if err != nil { - return nil, err - } - - return entry, nil -} - func (q *Queue) queueKey(id uint64) datastore.Key { return datastore.NewKey(strconv.FormatUint(id, 10)) } @@ -190,11 +137,6 @@ func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Data var tail uint64 var head uint64 = math.MaxUint64 for entry := range results.Next() { - select { - case <-ctx.Done(): - return 0, 0, nil - default: - } trimmed := strings.TrimPrefix(entry.Key, "/") id, err := strconv.ParseUint(trimmed, 10, 64) if err != nil { diff --git a/provider/queue_test.go b/provider/queue_test.go index 724eca4ee..2ac2de288 100644 --- a/provider/queue_test.go +++ b/provider/queue_test.go @@ -11,7 +11,7 @@ import ( ) func makeCids(n int) []cid.Cid { - cids := make([]cid.Cid, 0, 10) + cids := make([]cid.Cid, 0, n) for i := 0; i < 10; i++ { c := blockGenerator.Next().Cid() cids = append(cids, c) @@ -23,8 +23,8 @@ func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { for _, c := range cids { select { case dequeued := <- q.dequeue: - if c != dequeued.cid { - t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued.cid) + if c != dequeued { + t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued) } case <-time.After(time.Second * 1): @@ -42,15 +42,11 @@ func TestBasicOperation(t *testing.T) { if err != nil { t.Fatal(err) } - queue.Run() cids := makeCids(10) for _, c := range cids { - err = queue.Enqueue(c) - if err != nil { - t.Fatal("Failed to enqueue CID") - } + queue.Enqueue(c) } assertOrdered(cids, queue, t) @@ -65,15 +61,11 @@ func TestInitialization(t *testing.T) { if err != nil { t.Fatal(err) } - queue.Run() cids := makeCids(10) for _, c := range cids { - err = queue.Enqueue(c) - if err != nil { - t.Fatal("Failed to enqueue CID") - } + queue.Enqueue(c) } assertOrdered(cids[:5], queue, t) @@ -83,7 +75,6 @@ func TestInitialization(t *testing.T) { if err != nil { t.Fatal(err) } - queue.Run() assertOrdered(cids[5:], queue, t) } From 7fcafb66152f5ac0f49e82899932432d0e445a55 Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Fri, 15 Mar 2019 16:36:43 -0700 Subject: [PATCH 09/13] Gofmt License: MIT Signed-off-by: Erik Ingenito --- provider/provider.go | 2 +- provider/provider_test.go | 20 ++++++++++---------- provider/queue.go | 31 ++++++++++++++----------------- provider/queue_test.go | 2 +- 4 files changed, 26 insertions(+), 29 deletions(-) diff --git a/provider/provider.go b/provider/provider.go index 7e149f777..a5093d65b 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -11,7 +11,7 @@ import ( routing "github.com/libp2p/go-libp2p-routing" ) -var log = logging.Logger("provider") +var log = logging.Logger("provider") const provideOutgoingWorkerLimit = 8 diff --git a/provider/provider_test.go b/provider/provider_test.go index 95282e38a..7836f04ce 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -6,11 +6,11 @@ import ( "testing" "time" - blocksutil "github.com/ipfs/go-ipfs-blocksutil" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" - pstore "github.com/libp2p/go-libp2p-peerstore" sync "github.com/ipfs/go-datastore/sync" + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + pstore "github.com/libp2p/go-libp2p-peerstore" ) var blockGenerator = blocksutil.NewBlockGenerator() @@ -58,13 +58,13 @@ func TestAnnouncement(t *testing.T) { for cids.Len() > 0 { select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") + case cp := <-r.provided: + if !cids.Has(cp) { + t.Fatal("Wrong CID provided") + } + cids.Remove(cp) + case <-time.After(time.Second * 5): + t.Fatal("Timeout waiting for cids to be provided.") } } } @@ -77,4 +77,4 @@ func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) // Search for peers who are able to provide a given key func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo { return nil -} \ No newline at end of file +} diff --git a/provider/queue.go b/provider/queue.go index 3f7115f68..b5b7ba709 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -19,14 +19,11 @@ import ( type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider - name string - ctx context.Context - - tail uint64 - head uint64 - - ds datastore.Datastore // Must be threadsafe - + name string + ctx context.Context + tail uint64 + head uint64 + ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid enqueue chan cid.Cid } @@ -39,13 +36,13 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, return nil, err } q := &Queue{ - name: name, - ctx: ctx, - head: head, - tail: tail, - ds: namespaced, - dequeue: make(chan cid.Cid), - enqueue: make(chan cid.Cid), + name: name, + ctx: ctx, + head: head, + tail: tail, + ds: namespaced, + dequeue: make(chan cid.Cid), + enqueue: make(chan cid.Cid), } q.work() return q, nil @@ -54,8 +51,8 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, // Enqueue puts a cid in the queue func (q *Queue) Enqueue(cid cid.Cid) { select { - case q.enqueue <- cid: - case <-q.ctx.Done(): + case q.enqueue <- cid: + case <-q.ctx.Done(): } } diff --git a/provider/queue_test.go b/provider/queue_test.go index 2ac2de288..0dd3bab09 100644 --- a/provider/queue_test.go +++ b/provider/queue_test.go @@ -22,7 +22,7 @@ func makeCids(n int) []cid.Cid { func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { for _, c := range cids { select { - case dequeued := <- q.dequeue: + case dequeued := <-q.dequeue: if c != dequeued { t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued) } From 6c1eca959eeda71868bb7bcc0523e093b248a75c Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Fri, 15 Mar 2019 20:45:59 -0700 Subject: [PATCH 10/13] Make queue operation more clear License: MIT Signed-off-by: Erik Ingenito --- provider/provider_test.go | 2 +- provider/queue.go | 72 ++++++++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/provider/provider_test.go b/provider/provider_test.go index 7836f04ce..14cd68521 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -42,7 +42,7 @@ func TestAnnouncement(t *testing.T) { cids := cid.NewSet() - for i := 0; i < 1000; i++ { + for i := 0; i < 100; i++ { c := blockGenerator.Next().Cid() cids.Add(c) } diff --git a/provider/queue.go b/provider/queue.go index b5b7ba709..122077954 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -61,37 +61,50 @@ func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } +type entry struct { + cid cid.Cid + key datastore.Key +} + +// Look for next Cid in the queue and return it. Skip over gaps and mangled data +func (q *Queue) nextEntry() (datastore.Key, cid.Cid) { + for { + if q.head >= q.tail { + return datastore.Key{}, cid.Undef + } + + key := q.queueKey(q.head) + value, err := q.ds.Get(key) + + if err == datastore.ErrNotFound { + log.Warningf("Error missing entry in queue: %s", key) + q.head++ // move on + continue + } else if err != nil { + log.Warningf("Error fetching from queue: %s", err) + continue + } + + c, err := cid.Parse(value) + if err != nil { + log.Warningf("Error marshalling Cid from queue: ", err) + q.head++ + err = q.ds.Delete(key) + continue + } + + return key, c + } +} + // Run dequeues and enqueues when available. func (q *Queue) work() { go func() { + for { - var c cid.Cid = cid.Undef - var key datastore.Key + k, c := q.nextEntry() var dequeue chan cid.Cid - // If we're not empty dequeue a cid and ship it - if q.head < q.tail { - key = q.queueKey(q.head) - value, err := q.ds.Get(key) - - if err == datastore.ErrNotFound { - log.Warningf("Missing entry in queue: %s", err) - q.head++ - continue - } else if err != nil { - log.Warningf("Error fetching from queue: %s", err) - continue - } - - c, err = cid.Parse(value) - if err != nil { - log.Warningf("Error marshalling Cid from queue: ", err) - q.head++ - err = q.ds.Delete(key) - continue - } - } - if c != cid.Undef { dequeue = q.dequeue } @@ -102,16 +115,19 @@ func (q *Queue) work() { if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) + continue } q.tail++ case dequeue <- c: - q.head++ - err := q.ds.Delete(key) + err := q.ds.Delete(k) if err != nil { - log.Errorf("Failed to delete queued cid: %s", err) + log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) + continue } + + q.head++ case <-q.ctx.Done(): return } From 906d2bd093fc8f6776cf80f81f2a363ad5006ec8 Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Sat, 16 Mar 2019 09:52:24 -0700 Subject: [PATCH 11/13] Don't do extra work in provider queue loop License: MIT Signed-off-by: Erik Ingenito --- provider/queue.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/provider/queue.go b/provider/queue.go index 122077954..a3268e109 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -2,13 +2,14 @@ package provider import ( "context" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/query" "math" "strconv" "strings" + + cid "github.com/ipfs/go-cid" + datastore "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" + query "github.com/ipfs/go-datastore/query" ) // Queue provides a durable, FIFO interface to the datastore for storing cids @@ -100,11 +101,16 @@ func (q *Queue) nextEntry() (datastore.Key, cid.Cid) { // Run dequeues and enqueues when available. func (q *Queue) work() { go func() { + var k datastore.Key = datastore.Key{} + var c cid.Cid = cid.Undef for { - k, c := q.nextEntry() - var dequeue chan cid.Cid + if c == cid.Undef { + k, c = q.nextEntry() + } + // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue + var dequeue chan cid.Cid if c != cid.Undef { dequeue = q.dequeue } @@ -126,7 +132,7 @@ func (q *Queue) work() { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue } - + c = cid.Undef q.head++ case <-q.ctx.Done(): return From 09cc62ebdfe9b1c6277e4a8f142cd10819cd93b9 Mon Sep 17 00:00:00 2001 From: Erik Ingenito Date: Sat, 16 Mar 2019 09:52:57 -0700 Subject: [PATCH 12/13] Additional provider tests License: MIT Signed-off-by: Erik Ingenito --- provider/provider_test.go | 19 +++++++------- provider/queue_test.go | 53 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/provider/provider_test.go b/provider/provider_test.go index 14cd68521..7ef007b03 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -19,6 +19,15 @@ type mockRouting struct { provided chan cid.Cid } +func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { + r.provided <- cid + return nil +} + +func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo { + return nil +} + func mockContentRouting() *mockRouting { r := mockRouting{} r.provided = make(chan cid.Cid) @@ -68,13 +77,3 @@ func TestAnnouncement(t *testing.T) { } } } - -func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { - r.provided <- cid - return nil -} - -// Search for peers who are able to provide a given key -func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan pstore.PeerInfo { - return nil -} diff --git a/provider/queue_test.go b/provider/queue_test.go index 0dd3bab09..e1b74878e 100644 --- a/provider/queue_test.go +++ b/provider/queue_test.go @@ -52,6 +52,58 @@ func TestBasicOperation(t *testing.T) { assertOrdered(cids, queue, t) } +func TestSparseDatastore(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + cids := makeCids(10) + for _, c := range cids { + queue.Enqueue(c) + } + + // remove entries in the middle + err = queue.ds.Delete(queue.queueKey(5)) + if err != nil { + t.Fatal(err) + } + + err = queue.ds.Delete(queue.queueKey(6)) + if err != nil { + t.Fatal(err) + } + + expected := append(cids[:5], cids[7:]...) + assertOrdered(expected, queue, t) +} + +func TestMangledData(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + cids := makeCids(10) + for _, c := range cids { + queue.Enqueue(c) + } + + // remove entries in the middle + err = queue.ds.Put(queue.queueKey(5), []byte("borked")) + + expected := append(cids[:5], cids[6:]...) + assertOrdered(expected, queue, t) +} + func TestInitialization(t *testing.T) { ctx := context.Background() defer ctx.Done() @@ -63,7 +115,6 @@ func TestInitialization(t *testing.T) { } cids := makeCids(10) - for _, c := range cids { queue.Enqueue(c) } From 019af7507582cb16074ecbed8fa501a14714643e Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Wed, 20 Mar 2019 14:13:30 -0700 Subject: [PATCH 13/13] Add comments; Check ctx.Err(); Move import License: MIT Signed-off-by: Michael Avila --- core/core.go | 2 +- provider/provider.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/core.go b/core/core.go index d13901147..0d41b7378 100644 --- a/core/core.go +++ b/core/core.go @@ -14,7 +14,6 @@ import ( "context" "errors" "fmt" - "github.com/ipfs/go-ipfs/provider" "io" "io/ioutil" "os" @@ -29,6 +28,7 @@ import ( ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" p2p "github.com/ipfs/go-ipfs/p2p" pin "github.com/ipfs/go-ipfs/pin" + provider "github.com/ipfs/go-ipfs/provider" repo "github.com/ipfs/go-ipfs/repo" bitswap "github.com/ipfs/go-bitswap" diff --git a/provider/provider.go b/provider/provider.go index a5093d65b..f9aa4ed78 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -5,10 +5,9 @@ package provider import ( "context" - - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" - routing "github.com/libp2p/go-libp2p-routing" + "github.com/libp2p/go-libp2p-routing" ) var log = logging.Logger("provider") @@ -17,7 +16,9 @@ const provideOutgoingWorkerLimit = 8 // Provider announces blocks to the network type Provider interface { + // Run is used to begin processing the provider work Run() + // Provide takes a cid and makes an attempt to announce it to the network Provide(cid.Cid) error } @@ -53,7 +54,7 @@ func (p *provider) Provide(root cid.Cid) error { func (p *provider) handleAnnouncements() { for workers := 0; workers < provideOutgoingWorkerLimit; workers++ { go func() { - for { + for p.ctx.Err() == nil { select { case <-p.ctx.Done(): return