mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-23 03:17:43 +08:00
182 lines
4.1 KiB
Go
182 lines
4.1 KiB
Go
package blockstore
|
|
|
|
import (
|
|
"github.com/ipfs/go-ipfs/blocks"
|
|
key "github.com/ipfs/go-ipfs/blocks/key"
|
|
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
|
|
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
|
|
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
|
|
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
|
|
|
|
"sync/atomic"
|
|
)
|
|
|
|
// BloomCached returns Blockstore that caches Has requests using Bloom filter
|
|
// Size is size of bloom filter in bytes
|
|
func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
|
|
bl, err := bloom.New(float64(bloomSize), float64(7))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
arc, err := lru.NewARC(lruSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
|
|
bc.Invalidate()
|
|
go bc.Rebuild()
|
|
|
|
return bc, nil
|
|
}
|
|
|
|
type bloomcache struct {
|
|
bloom *bloom.Bloom
|
|
active int32
|
|
|
|
arc *lru.ARCCache
|
|
// This chan is only used for testing to wait for bloom to enable
|
|
rebuildChan chan struct{}
|
|
blockstore Blockstore
|
|
|
|
// Statistics
|
|
hits uint64
|
|
misses uint64
|
|
}
|
|
|
|
func (b *bloomcache) Invalidate() {
|
|
b.rebuildChan = make(chan struct{})
|
|
atomic.StoreInt32(&b.active, 0)
|
|
}
|
|
|
|
func (b *bloomcache) BloomActive() bool {
|
|
return atomic.LoadInt32(&b.active) != 0
|
|
}
|
|
|
|
func (b *bloomcache) Rebuild() {
|
|
ctx := context.TODO()
|
|
evt := log.EventBegin(ctx, "bloomcache.Rebuild")
|
|
defer evt.Done()
|
|
|
|
ch, err := b.blockstore.AllKeysChan(ctx)
|
|
if err != nil {
|
|
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
|
|
return
|
|
}
|
|
for key := range ch {
|
|
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
|
|
}
|
|
close(b.rebuildChan)
|
|
atomic.StoreInt32(&b.active, 1)
|
|
}
|
|
|
|
func (b *bloomcache) DeleteBlock(k key.Key) error {
|
|
if has, ok := b.hasCached(k); ok && !has {
|
|
return ErrNotFound
|
|
}
|
|
|
|
b.arc.Remove(k) // Invalidate cache before deleting.
|
|
err := b.blockstore.DeleteBlock(k)
|
|
switch err {
|
|
case nil:
|
|
b.arc.Add(k, false)
|
|
case ds.ErrNotFound, ErrNotFound:
|
|
b.arc.Add(k, false)
|
|
default:
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// if ok == false has is inconclusive
|
|
// if ok == true then has respons to question: is it contained
|
|
func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
|
|
if k == "" {
|
|
// Return cache invalid so call to blockstore
|
|
// in case of invalid key is forwarded deeper
|
|
return false, false
|
|
}
|
|
if b.BloomActive() {
|
|
blr := b.bloom.HasTS([]byte(k))
|
|
if blr == false { // not contained in bloom is only conclusive answer bloom gives
|
|
return false, true
|
|
}
|
|
}
|
|
h, ok := b.arc.Get(k)
|
|
if ok {
|
|
return h.(bool), ok
|
|
} else {
|
|
return false, false
|
|
}
|
|
}
|
|
|
|
func (b *bloomcache) Has(k key.Key) (bool, error) {
|
|
if has, ok := b.hasCached(k); ok {
|
|
return has, nil
|
|
}
|
|
|
|
res, err := b.blockstore.Has(k)
|
|
if err == nil {
|
|
b.arc.Add(k, res)
|
|
}
|
|
return res, err
|
|
}
|
|
|
|
func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
|
|
if has, ok := b.hasCached(k); ok && !has {
|
|
return nil, ErrNotFound
|
|
}
|
|
|
|
bl, err := b.blockstore.Get(k)
|
|
if bl == nil && err == ErrNotFound {
|
|
b.arc.Add(k, false)
|
|
} else if bl != nil {
|
|
b.arc.Add(k, true)
|
|
}
|
|
return bl, err
|
|
}
|
|
|
|
func (b *bloomcache) Put(bl blocks.Block) error {
|
|
if has, ok := b.hasCached(bl.Key()); ok && has {
|
|
return nil
|
|
}
|
|
|
|
err := b.blockstore.Put(bl)
|
|
if err == nil {
|
|
b.bloom.AddTS([]byte(bl.Key()))
|
|
b.arc.Add(bl.Key(), true)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (b *bloomcache) PutMany(bs []blocks.Block) error {
|
|
var good []blocks.Block
|
|
for _, block := range bs {
|
|
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
|
|
good = append(good, block)
|
|
}
|
|
}
|
|
err := b.blockstore.PutMany(bs)
|
|
if err == nil {
|
|
for _, block := range bs {
|
|
b.bloom.AddTS([]byte(block.Key()))
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
|
|
return b.blockstore.AllKeysChan(ctx)
|
|
}
|
|
|
|
func (b *bloomcache) GCLock() Unlocker {
|
|
return b.blockstore.(GCBlockstore).GCLock()
|
|
}
|
|
|
|
func (b *bloomcache) PinLock() Unlocker {
|
|
return b.blockstore.(GCBlockstore).PinLock()
|
|
}
|
|
|
|
func (b *bloomcache) GCRequested() bool {
|
|
return b.blockstore.(GCBlockstore).GCRequested()
|
|
}
|