Merge pull request #6125 from ipfs/chores/query-for-provider-queue-head-and-tail

Query for provider head/tail
This commit is contained in:
Steven Allen 2019-03-27 14:19:00 +00:00 committed by GitHub
commit b9e45e8941
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 39 deletions

4
go.mod
View File

@ -20,10 +20,10 @@ require (
github.com/ipfs/go-blockservice v0.0.3
github.com/ipfs/go-cid v0.0.1
github.com/ipfs/go-cidutil v0.0.1
github.com/ipfs/go-datastore v0.0.1
github.com/ipfs/go-datastore v0.0.2
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ds-badger v0.0.2
github.com/ipfs/go-ds-flatfs v0.0.1
github.com/ipfs/go-ds-flatfs v0.0.2
github.com/ipfs/go-ds-leveldb v0.0.1
github.com/ipfs/go-ds-measure v0.0.1
github.com/ipfs/go-fs-lock v0.0.1

10
go.sum
View File

@ -121,12 +121,14 @@ github.com/ipfs/go-cidutil v0.0.1 h1:UpDQI2LrihqOGY2mHaMhjrhh1DJ14N/58BQb7lKXvlQ
github.com/ipfs/go-cidutil v0.0.1/go.mod h1:/0H649ymJksNEZvBAkM18HIctk7tkONH9tspTeLok48=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.0.2 h1:Blyjq95atbxmCHSaDt42phIhf9NGLflzwq/95FqsNs0=
github.com/ipfs/go-datastore v0.0.2/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.0.2 h1:7ToQt7QByBhOTuZF2USMv+PGlMcBC7FW7FdgQ4FCsoo=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-flatfs v0.0.1 h1:yqWwRYFOGNClUL7V2jvcx4KMMso1Jv+pgQzsv9/gWBs=
github.com/ipfs/go-ds-flatfs v0.0.1/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM=
github.com/ipfs/go-ds-flatfs v0.0.2 h1:1zujtU5bPBH6B8roE+TknKIbBCrpau865xUk0dH3x2A=
github.com/ipfs/go-ds-flatfs v0.0.2/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM=
github.com/ipfs/go-ds-leveldb v0.0.1 h1:Z0lsTFciec9qYsyngAw1f/czhRU35qBLR2vhavPFgqA=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-measure v0.0.1 h1:PrCueug+yZLkDCOthZTXKinuoCal/GvlAT7cNxzr03g=
@ -143,8 +145,6 @@ github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE
github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw=
github.com/ipfs/go-ipfs-cmdkit v0.0.1 h1:X6YXEAjUljTzevE6DPUKXSqcgf+4FXzcn5B957F5MXo=
github.com/ipfs/go-ipfs-cmdkit v0.0.1/go.mod h1:9FtbMdUabcSqv/G4/8WCxSLxkZxn/aZEFrxxqnVcRbg=
github.com/ipfs/go-ipfs-cmds v0.0.3 h1:QvNUE8lslNQghxXf6vzV1ZoMQCDDAtKG8f2oINiRew4=
github.com/ipfs/go-ipfs-cmds v0.0.3/go.mod h1:1QVgxSgenZvOMGVC/XUTC7tJxRBGPLxYvpgPpCi3DUk=
github.com/ipfs/go-ipfs-cmds v0.0.4 h1:Iq4I8irWw5TmHe/4pjSyYJLbYkkdMOgHVe8ofJmPa4k=
github.com/ipfs/go-ipfs-cmds v0.0.4/go.mod h1:1QVgxSgenZvOMGVC/XUTC7tJxRBGPLxYvpgPpCi3DUk=
github.com/ipfs/go-ipfs-config v0.0.1 h1:6ED08emzI1imdsAjixFi2pEyZxTVD5ECKtCOxLBx+Uc=
@ -226,8 +226,6 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec h1:DQqZhhDvrTrEQ3Q
github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jbenet/go-is-domain v0.0.0-20160119110217-ba9815c809e0 h1:qxMncUW0TzViA3REiN3/YgVOoekVtUtEY0O/j/Qlctg=
github.com/jbenet/go-is-domain v0.0.0-20160119110217-ba9815c809e0/go.mod h1:I9DYFcJAixF5f9iOu/9oC451/bq+QDTaLGznkcJPWgg=
github.com/jbenet/go-is-domain v1.0.2 h1:11r5MSptcNFZyBoqubBQnVMUKRWLuRjL1banaIk+iYo=
github.com/jbenet/go-is-domain v1.0.2/go.mod h1:xbRLRb0S7FgzDBTJlguhDVwLYM/5yNtvktxj2Ttfy7Q=
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c h1:uUx61FiAa1GI6ZmVd2wf2vULeQZIKG66eybjNXKYCz4=

View File

@ -2,7 +2,7 @@ package provider
import (
"context"
"math"
"fmt"
"strconv"
"strings"
@ -32,7 +32,7 @@ type Queue struct {
// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
head, tail, err := getQueueHeadTail(ctx, name, namespaced)
head, tail, err := getQueueHeadTail(ctx, namespaced)
if err != nil {
return nil, err
}
@ -142,40 +142,52 @@ func (q *Queue) work() {
}
func (q *Queue) queueKey(id uint64) datastore.Key {
return datastore.NewKey(strconv.FormatUint(id, 10))
s := fmt.Sprintf("%016X", id)
return datastore.NewKey(s)
}
// crawl over the queue entries to find the head and tail
func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) {
q := query.Query{}
results, err := datastore.Query(q)
func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) {
head, err := getQueueHead(datastore)
if err != nil {
return 0, 0, err
}
var tail uint64
var head uint64 = math.MaxUint64
for entry := range results.Next() {
trimmed := strings.TrimPrefix(entry.Key, "/")
id, err := strconv.ParseUint(trimmed, 10, 64)
if err != nil {
return 0, 0, err
}
if id < head {
head = id
}
if (id + 1) > tail {
tail = (id + 1)
}
}
if err := results.Close(); err != nil {
tail, err := getQueueTail(datastore)
if err != nil {
return 0, 0, err
}
if head == math.MaxUint64 {
head = 0
}
return head, tail, nil
}
func getQueueHead(ds datastore.Datastore) (uint64, error) {
return getFirstIDByOrder(ds, query.OrderByKey{})
}
func getQueueTail(ds datastore.Datastore) (uint64, error) {
tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{})
if err != nil {
return 0, err
}
if tail > 0 {
tail++
}
return tail, nil
}
func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) {
q := query.Query{Orders: []query.Order{order}}
results, err := ds.Query(q)
if err != nil {
return 0, err
}
defer results.Close()
r, ok := results.NextSync()
if !ok {
return 0, nil
}
trimmed := strings.TrimPrefix(r.Key, "/")
id, err := strconv.ParseUint(trimmed, 16, 64)
if err != nil {
return 0, err
}
return id, nil
}

View File

@ -12,7 +12,7 @@ import (
func makeCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, n)
for i := 0; i < 10; i++ {
for i := 0; i < n; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
}
@ -129,3 +129,27 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[5:], queue, t)
}
func TestInitializationWithManyCids(t *testing.T) {
ctx := context.Background()
defer ctx.Done()
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}
// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
assertOrdered(cids, queue, t)
}