mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-21 18:37:45 +08:00
Query for provider head/tail
License: MIT Signed-off-by: Michael Avila <davidmichaelavila@gmail.com>
This commit is contained in:
parent
8c96e3be90
commit
fca85a2bdd
4
go.mod
4
go.mod
@ -20,10 +20,10 @@ require (
|
||||
github.com/ipfs/go-blockservice v0.0.3
|
||||
github.com/ipfs/go-cid v0.0.1
|
||||
github.com/ipfs/go-cidutil v0.0.1
|
||||
github.com/ipfs/go-datastore v0.0.1
|
||||
github.com/ipfs/go-datastore v0.0.2
|
||||
github.com/ipfs/go-detect-race v0.0.1
|
||||
github.com/ipfs/go-ds-badger v0.0.2
|
||||
github.com/ipfs/go-ds-flatfs v0.0.1
|
||||
github.com/ipfs/go-ds-flatfs v0.0.2
|
||||
github.com/ipfs/go-ds-leveldb v0.0.1
|
||||
github.com/ipfs/go-ds-measure v0.0.1
|
||||
github.com/ipfs/go-fs-lock v0.0.1
|
||||
|
||||
10
go.sum
10
go.sum
@ -121,12 +121,14 @@ github.com/ipfs/go-cidutil v0.0.1 h1:UpDQI2LrihqOGY2mHaMhjrhh1DJ14N/58BQb7lKXvlQ
|
||||
github.com/ipfs/go-cidutil v0.0.1/go.mod h1:/0H649ymJksNEZvBAkM18HIctk7tkONH9tspTeLok48=
|
||||
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
|
||||
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
|
||||
github.com/ipfs/go-datastore v0.0.2 h1:Blyjq95atbxmCHSaDt42phIhf9NGLflzwq/95FqsNs0=
|
||||
github.com/ipfs/go-datastore v0.0.2/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
|
||||
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
|
||||
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
|
||||
github.com/ipfs/go-ds-badger v0.0.2 h1:7ToQt7QByBhOTuZF2USMv+PGlMcBC7FW7FdgQ4FCsoo=
|
||||
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
|
||||
github.com/ipfs/go-ds-flatfs v0.0.1 h1:yqWwRYFOGNClUL7V2jvcx4KMMso1Jv+pgQzsv9/gWBs=
|
||||
github.com/ipfs/go-ds-flatfs v0.0.1/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM=
|
||||
github.com/ipfs/go-ds-flatfs v0.0.2 h1:1zujtU5bPBH6B8roE+TknKIbBCrpau865xUk0dH3x2A=
|
||||
github.com/ipfs/go-ds-flatfs v0.0.2/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM=
|
||||
github.com/ipfs/go-ds-leveldb v0.0.1 h1:Z0lsTFciec9qYsyngAw1f/czhRU35qBLR2vhavPFgqA=
|
||||
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
|
||||
github.com/ipfs/go-ds-measure v0.0.1 h1:PrCueug+yZLkDCOthZTXKinuoCal/GvlAT7cNxzr03g=
|
||||
@ -143,8 +145,6 @@ github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE
|
||||
github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw=
|
||||
github.com/ipfs/go-ipfs-cmdkit v0.0.1 h1:X6YXEAjUljTzevE6DPUKXSqcgf+4FXzcn5B957F5MXo=
|
||||
github.com/ipfs/go-ipfs-cmdkit v0.0.1/go.mod h1:9FtbMdUabcSqv/G4/8WCxSLxkZxn/aZEFrxxqnVcRbg=
|
||||
github.com/ipfs/go-ipfs-cmds v0.0.3 h1:QvNUE8lslNQghxXf6vzV1ZoMQCDDAtKG8f2oINiRew4=
|
||||
github.com/ipfs/go-ipfs-cmds v0.0.3/go.mod h1:1QVgxSgenZvOMGVC/XUTC7tJxRBGPLxYvpgPpCi3DUk=
|
||||
github.com/ipfs/go-ipfs-cmds v0.0.4 h1:Iq4I8irWw5TmHe/4pjSyYJLbYkkdMOgHVe8ofJmPa4k=
|
||||
github.com/ipfs/go-ipfs-cmds v0.0.4/go.mod h1:1QVgxSgenZvOMGVC/XUTC7tJxRBGPLxYvpgPpCi3DUk=
|
||||
github.com/ipfs/go-ipfs-config v0.0.1 h1:6ED08emzI1imdsAjixFi2pEyZxTVD5ECKtCOxLBx+Uc=
|
||||
@ -220,8 +220,6 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec h1:DQqZhhDvrTrEQ3Q
|
||||
github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs=
|
||||
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
|
||||
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
|
||||
github.com/jbenet/go-is-domain v0.0.0-20160119110217-ba9815c809e0 h1:qxMncUW0TzViA3REiN3/YgVOoekVtUtEY0O/j/Qlctg=
|
||||
github.com/jbenet/go-is-domain v0.0.0-20160119110217-ba9815c809e0/go.mod h1:I9DYFcJAixF5f9iOu/9oC451/bq+QDTaLGznkcJPWgg=
|
||||
github.com/jbenet/go-is-domain v1.0.2 h1:11r5MSptcNFZyBoqubBQnVMUKRWLuRjL1banaIk+iYo=
|
||||
github.com/jbenet/go-is-domain v1.0.2/go.mod h1:xbRLRb0S7FgzDBTJlguhDVwLYM/5yNtvktxj2Ttfy7Q=
|
||||
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c h1:uUx61FiAa1GI6ZmVd2wf2vULeQZIKG66eybjNXKYCz4=
|
||||
|
||||
@ -2,7 +2,7 @@ package provider
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
@ -32,7 +32,7 @@ type Queue struct {
|
||||
// NewQueue creates a queue for cids
|
||||
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
|
||||
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
|
||||
head, tail, err := getQueueHeadTail(ctx, name, namespaced)
|
||||
head, tail, err := getQueueHeadTail(ctx, namespaced)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -142,40 +142,52 @@ func (q *Queue) work() {
|
||||
}
|
||||
|
||||
func (q *Queue) queueKey(id uint64) datastore.Key {
|
||||
return datastore.NewKey(strconv.FormatUint(id, 10))
|
||||
s := fmt.Sprintf("%016X", id)
|
||||
return datastore.NewKey(s)
|
||||
}
|
||||
|
||||
// crawl over the queue entries to find the head and tail
|
||||
func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) {
|
||||
q := query.Query{}
|
||||
results, err := datastore.Query(q)
|
||||
func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) {
|
||||
head, err := getQueueHead(datastore)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
var tail uint64
|
||||
var head uint64 = math.MaxUint64
|
||||
for entry := range results.Next() {
|
||||
trimmed := strings.TrimPrefix(entry.Key, "/")
|
||||
id, err := strconv.ParseUint(trimmed, 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
if id < head {
|
||||
head = id
|
||||
}
|
||||
|
||||
if (id + 1) > tail {
|
||||
tail = (id + 1)
|
||||
}
|
||||
}
|
||||
if err := results.Close(); err != nil {
|
||||
tail, err := getQueueTail(datastore)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
if head == math.MaxUint64 {
|
||||
head = 0
|
||||
}
|
||||
|
||||
return head, tail, nil
|
||||
}
|
||||
|
||||
func getQueueHead(ds datastore.Datastore) (uint64, error) {
|
||||
return getFirstIDByOrder(ds, query.OrderByKey{})
|
||||
}
|
||||
|
||||
func getQueueTail(ds datastore.Datastore) (uint64, error) {
|
||||
tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if tail > 0 {
|
||||
tail++
|
||||
}
|
||||
return tail, nil
|
||||
}
|
||||
|
||||
func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) {
|
||||
q := query.Query{Orders: []query.Order{order}}
|
||||
results, err := ds.Query(q)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer results.Close()
|
||||
r, ok := results.NextSync()
|
||||
if !ok {
|
||||
return 0, nil
|
||||
}
|
||||
trimmed := strings.TrimPrefix(r.Key, "/")
|
||||
id, err := strconv.ParseUint(trimmed, 16, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
@ -12,7 +12,7 @@ import (
|
||||
|
||||
func makeCids(n int) []cid.Cid {
|
||||
cids := make([]cid.Cid, 0, n)
|
||||
for i := 0; i < 10; i++ {
|
||||
for i := 0; i < n; i++ {
|
||||
c := blockGenerator.Next().Cid()
|
||||
cids = append(cids, c)
|
||||
}
|
||||
@ -129,3 +129,27 @@ func TestInitialization(t *testing.T) {
|
||||
|
||||
assertOrdered(cids[5:], queue, t)
|
||||
}
|
||||
|
||||
func TestInitializationWithManyCids(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
defer ctx.Done()
|
||||
|
||||
ds := sync.MutexWrap(datastore.NewMapDatastore())
|
||||
queue, err := NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cids := makeCids(25)
|
||||
for _, c := range cids {
|
||||
queue.Enqueue(c)
|
||||
}
|
||||
|
||||
// make a new queue, same data
|
||||
queue, err = NewQueue(ctx, "test", ds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertOrdered(cids, queue, t)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user