mirror of
https://github.com/ipfs/kubo.git
synced 2026-02-24 20:07:45 +08:00
Merge pull request #3348 from ipfs/kevina/gclocker
Separate out the G.C. Locking from the Blockstore interface.
This commit is contained in:
commit
1ca2d42889
@ -13,7 +13,7 @@ import (
|
||||
|
||||
var exampleBlock = blocks.NewBlock([]byte("foo"))
|
||||
|
||||
func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
|
||||
func testArcCached(bs Blockstore, ctx context.Context) (*arccache, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
@ -39,9 +39,7 @@ type Blockstore interface {
|
||||
AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
|
||||
}
|
||||
|
||||
type GCBlockstore interface {
|
||||
Blockstore
|
||||
|
||||
type GCLocker interface {
|
||||
// GCLock locks the blockstore for garbage collection. No operations
|
||||
// that expect to finish with a pin should ocurr simultaneously.
|
||||
// Reading during GC is safe, and requires no lock.
|
||||
@ -58,6 +56,20 @@ type GCBlockstore interface {
|
||||
GCRequested() bool
|
||||
}
|
||||
|
||||
type GCBlockstore interface {
|
||||
Blockstore
|
||||
GCLocker
|
||||
}
|
||||
|
||||
func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
|
||||
return gcBlockstore{bs, gcl}
|
||||
}
|
||||
|
||||
type gcBlockstore struct {
|
||||
Blockstore
|
||||
GCLocker
|
||||
}
|
||||
|
||||
func NewBlockstore(d ds.Batching) *blockstore {
|
||||
var dsb ds.Batching
|
||||
dd := dsns.Wrap(d, BlockPrefix)
|
||||
@ -223,6 +235,16 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
|
||||
return output, nil
|
||||
}
|
||||
|
||||
func NewGCLocker() *gclocker {
|
||||
return &gclocker{}
|
||||
}
|
||||
|
||||
type gclocker struct {
|
||||
lk sync.RWMutex
|
||||
gcreq int32
|
||||
gcreqlk sync.Mutex
|
||||
}
|
||||
|
||||
type Unlocker interface {
|
||||
Unlock()
|
||||
}
|
||||
@ -236,18 +258,18 @@ func (u *unlocker) Unlock() {
|
||||
u.unlock = nil // ensure its not called twice
|
||||
}
|
||||
|
||||
func (bs *blockstore) GCLock() Unlocker {
|
||||
func (bs *gclocker) GCLock() Unlocker {
|
||||
atomic.AddInt32(&bs.gcreq, 1)
|
||||
bs.lk.Lock()
|
||||
atomic.AddInt32(&bs.gcreq, -1)
|
||||
return &unlocker{bs.lk.Unlock}
|
||||
}
|
||||
|
||||
func (bs *blockstore) PinLock() Unlocker {
|
||||
func (bs *gclocker) PinLock() Unlocker {
|
||||
bs.lk.RLock()
|
||||
return &unlocker{bs.lk.RUnlock}
|
||||
}
|
||||
|
||||
func (bs *blockstore) GCRequested() bool {
|
||||
func (bs *gclocker) GCRequested() bool {
|
||||
return atomic.LoadInt32(&bs.gcreq) > 0
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@ import (
|
||||
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
|
||||
)
|
||||
|
||||
func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
|
||||
func testBloomCached(bs Blockstore, ctx context.Context) (*bloomcache, error) {
|
||||
if ctx == nil {
|
||||
ctx = context.TODO()
|
||||
}
|
||||
|
||||
@ -22,8 +22,8 @@ func DefaultCacheOpts() CacheOpts {
|
||||
}
|
||||
}
|
||||
|
||||
func CachedBlockstore(bs GCBlockstore,
|
||||
ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
|
||||
func CachedBlockstore(bs Blockstore,
|
||||
ctx context.Context, opts CacheOpts) (cbs Blockstore, err error) {
|
||||
cbs = bs
|
||||
|
||||
if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
|
||||
|
||||
@ -36,14 +36,14 @@ func TestWriteThroughWorks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)
|
||||
var _ blockstore.Blockstore = (*PutCountingBlockstore)(nil)
|
||||
|
||||
type PutCountingBlockstore struct {
|
||||
blockstore.GCBlockstore
|
||||
blockstore.Blockstore
|
||||
PutCounter int
|
||||
}
|
||||
|
||||
func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
|
||||
bs.PutCounter++
|
||||
return bs.GCBlockstore.Put(block)
|
||||
return bs.Blockstore.Put(block)
|
||||
}
|
||||
|
||||
@ -179,11 +179,13 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
|
||||
opts.HasBloomFilterSize = 0
|
||||
}
|
||||
|
||||
n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts)
|
||||
cbs, err := bstore.CachedBlockstore(bs, ctx, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())
|
||||
|
||||
rcfg, err := n.Repo.Config()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -22,7 +22,7 @@ import (
|
||||
"gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
|
||||
)
|
||||
|
||||
func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore) {
|
||||
func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blockstore) {
|
||||
dstore := ds.NewMapDatastore()
|
||||
tsds := sync.MutexWrap(dstore)
|
||||
bstore := blockstore.NewBlockstore(tsds)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user