mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-26 21:07:45 +08:00
Merge pull request #3026 from ipfs/feature/blockstore-separation
blockstore: extract ARC cache from Bloom cache
This commit is contained in:
commit
8405b56df0
126
blocks/blockstore/arc_cache.go
Normal file
126
blocks/blockstore/arc_cache.go
Normal file
@ -0,0 +1,126 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-ipfs/blocks"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
|
||||
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||
)
|
||||
|
||||
type arccache struct {
|
||||
arc *lru.ARCCache
|
||||
blockstore Blockstore
|
||||
}
|
||||
|
||||
func arcCached(bs Blockstore, lruSize int) (*arccache, error) {
|
||||
arc, err := lru.NewARC(lruSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &arccache{arc: arc, blockstore: bs}, nil
|
||||
}
|
||||
|
||||
func (b *arccache) DeleteBlock(k key.Key) error {
|
||||
if has, ok := b.hasCached(k); ok && !has {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
b.arc.Remove(k) // Invalidate cache before deleting.
|
||||
err := b.blockstore.DeleteBlock(k)
|
||||
switch err {
|
||||
case nil, ds.ErrNotFound, ErrNotFound:
|
||||
b.arc.Add(k, false)
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// if ok == false has is inconclusive
|
||||
// if ok == true then has respons to question: is it contained
|
||||
func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
|
||||
if k == "" {
|
||||
// Return cache invalid so the call to blockstore happens
|
||||
// in case of invalid key and correct error is created.
|
||||
return false, false
|
||||
}
|
||||
|
||||
h, ok := b.arc.Get(k)
|
||||
if ok {
|
||||
return h.(bool), true
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
|
||||
func (b *arccache) Has(k key.Key) (bool, error) {
|
||||
if has, ok := b.hasCached(k); ok {
|
||||
return has, nil
|
||||
}
|
||||
|
||||
res, err := b.blockstore.Has(k)
|
||||
if err == nil {
|
||||
b.arc.Add(k, res)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (b *arccache) Get(k key.Key) (blocks.Block, error) {
|
||||
if has, ok := b.hasCached(k); ok && !has {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
bl, err := b.blockstore.Get(k)
|
||||
if bl == nil && err == ErrNotFound {
|
||||
b.arc.Add(k, false)
|
||||
} else if bl != nil {
|
||||
b.arc.Add(k, true)
|
||||
}
|
||||
return bl, err
|
||||
}
|
||||
|
||||
func (b *arccache) Put(bl blocks.Block) error {
|
||||
if has, ok := b.hasCached(bl.Key()); ok && has {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := b.blockstore.Put(bl)
|
||||
if err == nil {
|
||||
b.arc.Add(bl.Key(), true)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *arccache) PutMany(bs []blocks.Block) error {
|
||||
var good []blocks.Block
|
||||
for _, block := range bs {
|
||||
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
|
||||
good = append(good, block)
|
||||
}
|
||||
}
|
||||
err := b.blockstore.PutMany(bs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, block := range bs {
|
||||
b.arc.Add(block.Key(), true)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AllKeysChan streams every key from the underlying blockstore; the
// ARC cache plays no part in enumeration.
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
	return b.blockstore.AllKeysChan(ctx)
}
|
||||
|
||||
// GCLock delegates to the wrapped store's GC lock.
// NOTE(review): the type assertion panics if the wrapped Blockstore is
// not a GCBlockstore — callers presumably only wrap GC-capable stores.
func (b *arccache) GCLock() Unlocker {
	return b.blockstore.(GCBlockstore).GCLock()
}
|
||||
|
||||
// PinLock delegates to the wrapped store's pin lock.
// NOTE(review): panics if the wrapped Blockstore is not a GCBlockstore.
func (b *arccache) PinLock() Unlocker {
	return b.blockstore.(GCBlockstore).PinLock()
}
|
||||
|
||||
// GCRequested delegates to the wrapped store's GC-requested flag.
// NOTE(review): panics if the wrapped Blockstore is not a GCBlockstore.
func (b *arccache) GCRequested() bool {
	return b.blockstore.(GCBlockstore).GCRequested()
}
|
||||
67
blocks/blockstore/arc_cache_test.go
Normal file
67
blocks/blockstore/arc_cache_test.go
Normal file
@ -0,0 +1,67 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-ipfs/blocks"
|
||||
"testing"
|
||||
|
||||
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
|
||||
syncds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
|
||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||
)
|
||||
|
||||
func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.TODO()
|
||||
}
|
||||
opts := DefaultCacheOpts()
|
||||
opts.HasBloomFilterSize = 0
|
||||
opts.HasBloomFilterHashes = 0
|
||||
bbs, err := CachedBlockstore(bs, ctx, opts)
|
||||
if err == nil {
|
||||
return bbs.(*arccache), nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveCacheEntryOnDelete(t *testing.T) {
|
||||
b := blocks.NewBlock([]byte("foo"))
|
||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||
bs := NewBlockstore(syncds.MutexWrap(cd))
|
||||
cachedbs, err := testArcCached(bs, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cachedbs.Put(b)
|
||||
|
||||
cd.Lock()
|
||||
writeHitTheDatastore := false
|
||||
cd.Unlock()
|
||||
|
||||
cd.SetFunc(func() {
|
||||
writeHitTheDatastore = true
|
||||
})
|
||||
|
||||
cachedbs.DeleteBlock(b.Key())
|
||||
cachedbs.Put(b)
|
||||
if !writeHitTheDatastore {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestElideDuplicateWrite(t *testing.T) {
|
||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||
bs := NewBlockstore(syncds.MutexWrap(cd))
|
||||
cachedbs, err := testArcCached(bs, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
b1 := blocks.NewBlock([]byte("foo"))
|
||||
|
||||
cachedbs.Put(b1)
|
||||
cd.SetFunc(func() {
|
||||
t.Fatal("write hit the datastore")
|
||||
})
|
||||
cachedbs.Put(b1)
|
||||
}
|
||||
@ -3,8 +3,6 @@ package blockstore
|
||||
import (
|
||||
"github.com/ipfs/go-ipfs/blocks"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
|
||||
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
||||
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
|
||||
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
||||
|
||||
@ -13,16 +11,12 @@ import (
|
||||
|
||||
// bloomCached returns Blockstore that caches Has requests using Bloom filter
|
||||
// Size is size of bloom filter in bytes
|
||||
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) {
|
||||
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (*bloomcache, error) {
|
||||
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
arc, err := lru.NewARC(lruSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
|
||||
bc := &bloomcache{blockstore: bs, bloom: bl}
|
||||
bc.Invalidate()
|
||||
go bc.Rebuild(ctx)
|
||||
|
||||
@ -33,7 +27,6 @@ type bloomcache struct {
|
||||
bloom *bloom.Bloom
|
||||
active int32
|
||||
|
||||
arc *lru.ARCCache
|
||||
// This chan is only used for testing to wait for bloom to enable
|
||||
rebuildChan chan struct{}
|
||||
blockstore Blockstore
|
||||
@ -84,17 +77,7 @@ func (b *bloomcache) DeleteBlock(k key.Key) error {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
b.arc.Remove(k) // Invalidate cache before deleting.
|
||||
err := b.blockstore.DeleteBlock(k)
|
||||
switch err {
|
||||
case nil:
|
||||
b.arc.Add(k, false)
|
||||
case ds.ErrNotFound, ErrNotFound:
|
||||
b.arc.Add(k, false)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return b.blockstore.DeleteBlock(k)
|
||||
}
|
||||
|
||||
// if ok == false has is inconclusive
|
||||
@ -111,12 +94,7 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
|
||||
return false, true
|
||||
}
|
||||
}
|
||||
h, ok := b.arc.Get(k)
|
||||
if ok {
|
||||
return h.(bool), ok
|
||||
} else {
|
||||
return false, false
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
|
||||
func (b *bloomcache) Has(k key.Key) (bool, error) {
|
||||
@ -124,11 +102,7 @@ func (b *bloomcache) Has(k key.Key) (bool, error) {
|
||||
return has, nil
|
||||
}
|
||||
|
||||
res, err := b.blockstore.Has(k)
|
||||
if err == nil {
|
||||
b.arc.Add(k, res)
|
||||
}
|
||||
return res, err
|
||||
return b.blockstore.Has(k)
|
||||
}
|
||||
|
||||
func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
|
||||
@ -136,13 +110,7 @@ func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
bl, err := b.blockstore.Get(k)
|
||||
if bl == nil && err == ErrNotFound {
|
||||
b.arc.Add(k, false)
|
||||
} else if bl != nil {
|
||||
b.arc.Add(k, true)
|
||||
}
|
||||
return bl, err
|
||||
return b.blockstore.Get(k)
|
||||
}
|
||||
|
||||
func (b *bloomcache) Put(bl blocks.Block) error {
|
||||
@ -153,7 +121,6 @@ func (b *bloomcache) Put(bl blocks.Block) error {
|
||||
err := b.blockstore.Put(bl)
|
||||
if err == nil {
|
||||
b.bloom.AddTS([]byte(bl.Key()))
|
||||
b.arc.Add(bl.Key(), true)
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -169,7 +136,6 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
|
||||
if err == nil {
|
||||
for _, block := range bs {
|
||||
b.bloom.AddTS([]byte(block.Key()))
|
||||
b.arc.Add(block.Key(), true)
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
||||
@ -19,6 +19,7 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error)
|
||||
ctx = context.TODO()
|
||||
}
|
||||
opts := DefaultCacheOpts()
|
||||
opts.HasARCCacheSize = 0
|
||||
bbs, err := CachedBlockstore(bs, ctx, opts)
|
||||
if err == nil {
|
||||
return bbs.(*bloomcache), nil
|
||||
@ -29,56 +30,10 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error)
|
||||
|
||||
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
|
||||
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
|
||||
_, err := bloomCached(bs, context.TODO(), 100, 1, -1)
|
||||
_, err := bloomCached(bs, context.TODO(), -1, 1)
|
||||
if err == nil {
|
||||
t.Fail()
|
||||
}
|
||||
_, err = bloomCached(bs, context.TODO(), -1, 1, 100)
|
||||
if err == nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveCacheEntryOnDelete(t *testing.T) {
|
||||
b := blocks.NewBlock([]byte("foo"))
|
||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||
bs := NewBlockstore(syncds.MutexWrap(cd))
|
||||
cachedbs, err := testBloomCached(bs, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cachedbs.Put(b)
|
||||
|
||||
cd.Lock()
|
||||
writeHitTheDatastore := false
|
||||
cd.Unlock()
|
||||
|
||||
cd.SetFunc(func() {
|
||||
writeHitTheDatastore = true
|
||||
})
|
||||
|
||||
cachedbs.DeleteBlock(b.Key())
|
||||
cachedbs.Put(b)
|
||||
if !writeHitTheDatastore {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestElideDuplicateWrite(t *testing.T) {
|
||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||
bs := NewBlockstore(syncds.MutexWrap(cd))
|
||||
cachedbs, err := testBloomCached(bs, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
b1 := blocks.NewBlock([]byte("foo"))
|
||||
|
||||
cachedbs.Put(b1)
|
||||
cd.SetFunc(func() {
|
||||
t.Fatal("write hit the datastore")
|
||||
})
|
||||
cachedbs.Put(b1)
|
||||
}
|
||||
func TestHasIsBloomCached(t *testing.T) {
|
||||
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
|
||||
|
||||
@ -34,8 +34,10 @@ func CachedBlockstore(bs GCBlockstore,
|
||||
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
|
||||
}
|
||||
if opts.HasBloomFilterSize != 0 {
|
||||
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes,
|
||||
opts.HasARCCacheSize)
|
||||
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes)
|
||||
}
|
||||
if opts.HasARCCacheSize > 0 {
|
||||
cbs, err = arcCached(cbs, opts.HasARCCacheSize)
|
||||
}
|
||||
|
||||
return cbs, err
|
||||
|
||||
Loading…
Reference in New Issue
Block a user