From 4183902e4620bd24675555718497975da6863420 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 2 Aug 2016 01:10:48 +0100 Subject: [PATCH 1/3] blockstore: extract ARC cache from Bloom cache it removes race condition that would happen during various calls License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/arc_cache.go | 127 ++++++++++++++++++++++++++ blocks/blockstore/arc_cache_test.go | 67 ++++++++++++++ blocks/blockstore/bloom_cache.go | 46 ++-------- blocks/blockstore/bloom_cache_test.go | 49 +--------- blocks/blockstore/caching.go | 6 +- 5 files changed, 206 insertions(+), 89 deletions(-) create mode 100644 blocks/blockstore/arc_cache.go create mode 100644 blocks/blockstore/arc_cache_test.go diff --git a/blocks/blockstore/arc_cache.go b/blocks/blockstore/arc_cache.go new file mode 100644 index 000000000..37a8f0d02 --- /dev/null +++ b/blocks/blockstore/arc_cache.go @@ -0,0 +1,127 @@ +package blockstore + +import ( + "github.com/ipfs/go-ipfs/blocks" + key "github.com/ipfs/go-ipfs/blocks/key" + ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" + lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" +) + +type arccache struct { + arc *lru.ARCCache + blockstore Blockstore +} + +func arcCached(bs Blockstore, lruSize int) (*arccache, error) { + arc, err := lru.NewARC(lruSize) + if err != nil { + return nil, err + } + + return &arccache{arc: arc, blockstore: bs}, nil +} + +func (b *arccache) DeleteBlock(k key.Key) error { + if has, ok := b.hasCached(k); ok && !has { + return ErrNotFound + } + + b.arc.Remove(k) // Invalidate cache before deleting. + err := b.blockstore.DeleteBlock(k) + switch err { + case nil: + b.arc.Add(k, false) + case ds.ErrNotFound, ErrNotFound: + b.arc.Add(k, false) + default: + return err + } + return nil +} + +// if ok == false has is inconclusive +// if ok == true then has respons to question: is it contained +func (b *arccache) hasCached(k key.Key) (has bool, ok bool) { + if k == "" { + // Return cache invalid so call to blockstore + // in case of invalid key is forwarded deeper + return false, false + } + h, ok := b.arc.Get(k) + if ok { + return h.(bool), ok + } else { + return false, false + } +} + +func (b *arccache) Has(k key.Key) (bool, error) { + if has, ok := b.hasCached(k); ok { + return has, nil + } + + res, err := b.blockstore.Has(k) + if err == nil { + b.arc.Add(k, res) + } + return res, err +} + +func (b *arccache) Get(k key.Key) (blocks.Block, error) { + if has, ok := b.hasCached(k); ok && !has { + return nil, ErrNotFound + } + + bl, err := b.blockstore.Get(k) + if bl == nil && err == ErrNotFound { + b.arc.Add(k, false) + } else if bl != nil { + b.arc.Add(k, true) + } + return bl, err +} + +func (b *arccache) Put(bl blocks.Block) error { + if has, ok := b.hasCached(bl.Key()); ok && has { + return nil + } + + err := b.blockstore.Put(bl) + if err == nil { + b.arc.Add(bl.Key(), true) + } + return err +} + +func (b *arccache) PutMany(bs []blocks.Block) error { + var good []blocks.Block + for _, block := range bs { + if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) { + good = append(good, block) + } + } + err := b.blockstore.PutMany(bs) + if err == nil { + for _, block := range bs { + b.arc.Add(block.Key(), true) + } + } + return err +} + +func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { + return b.blockstore.AllKeysChan(ctx) +} + +func (b *arccache) GCLock() Unlocker { + return b.blockstore.(GCBlockstore).GCLock() +} + +func (b *arccache) PinLock() Unlocker { + return b.blockstore.(GCBlockstore).PinLock() +} + +func (b *arccache) GCRequested() bool { + return b.blockstore.(GCBlockstore).GCRequested() +} diff --git a/blocks/blockstore/arc_cache_test.go b/blocks/blockstore/arc_cache_test.go new file mode 100644 index 000000000..505f7e1ea --- /dev/null +++ b/blocks/blockstore/arc_cache_test.go @@ -0,0 +1,67 @@ +package blockstore + +import ( + "github.com/ipfs/go-ipfs/blocks" + "testing" + + ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" + syncds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync" + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" +) + +func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) { + if ctx == nil { + ctx = context.TODO() + } + opts := DefaultCacheOpts() + opts.HasBloomFilterSize = 0 + opts.HasBloomFilterHashes = 0 + bbs, err := CachedBlockstore(bs, ctx, opts) + if err == nil { + return bbs.(*arccache), nil + } else { + return nil, err + } +} + +func TestRemoveCacheEntryOnDelete(t *testing.T) { + b := blocks.NewBlock([]byte("foo")) + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + cachedbs, err := testArcCached(bs, nil) + if err != nil { + t.Fatal(err) + } + cachedbs.Put(b) + + cd.Lock() + writeHitTheDatastore := false + cd.Unlock() + + cd.SetFunc(func() { + writeHitTheDatastore = true + }) + + cachedbs.DeleteBlock(b.Key()) + cachedbs.Put(b) + if !writeHitTheDatastore { + t.Fail() + } +} + +func TestElideDuplicateWrite(t *testing.T) { + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + cachedbs, err := testArcCached(bs, nil) + if err != nil { + t.Fatal(err) + } + + b1 := blocks.NewBlock([]byte("foo")) + + cachedbs.Put(b1) + cd.SetFunc(func() { + t.Fatal("write hit the datastore") + }) + cachedbs.Put(b1) +} diff --git a/blocks/blockstore/bloom_cache.go b/blocks/blockstore/bloom_cache.go index 35d6ce38f..e10dacfaf 100644 --- a/blocks/blockstore/bloom_cache.go +++ b/blocks/blockstore/bloom_cache.go @@ -3,8 +3,6 @@ package blockstore import ( "github.com/ipfs/go-ipfs/blocks" key "github.com/ipfs/go-ipfs/blocks/key" - ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" - lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" @@ -13,16 +11,12 @@ import ( // bloomCached returns Blockstore that caches Has requests using Bloom filter // Size is size of bloom filter in bytes -func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) { +func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (*bloomcache, error) { bl, err := bloom.New(float64(bloomSize), float64(hashCount)) if err != nil { return nil, err } - arc, err := lru.NewARC(lruSize) - if err != nil { - return nil, err - } - bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc} + bc := &bloomcache{blockstore: bs, bloom: bl} bc.Invalidate() go bc.Rebuild(ctx) @@ -33,7 +27,6 @@ type bloomcache struct { bloom *bloom.Bloom active int32 - arc *lru.ARCCache // This chan is only used for testing to wait for bloom to enable rebuildChan chan struct{} blockstore Blockstore @@ -84,17 +77,7 @@ func (b *bloomcache) DeleteBlock(k key.Key) error { return ErrNotFound } - b.arc.Remove(k) // Invalidate cache before deleting. - err := b.blockstore.DeleteBlock(k) - switch err { - case nil: - b.arc.Add(k, false) - case ds.ErrNotFound, ErrNotFound: - b.arc.Add(k, false) - default: - return err - } - return nil + return b.blockstore.DeleteBlock(k) } // if ok == false has is inconclusive @@ -111,12 +94,7 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) { return false, true } } - h, ok := b.arc.Get(k) - if ok { - return h.(bool), ok - } else { - return false, false - } + return false, false } func (b *bloomcache) Has(k key.Key) (bool, error) { @@ -124,11 +102,7 @@ func (b *bloomcache) Has(k key.Key) (bool, error) { return has, nil } - res, err := b.blockstore.Has(k) - if err == nil { - b.arc.Add(k, res) - } - return res, err + return b.blockstore.Has(k) } func (b *bloomcache) Get(k key.Key) (blocks.Block, error) { @@ -136,13 +110,7 @@ func (b *bloomcache) Get(k key.Key) (blocks.Block, error) { return nil, ErrNotFound } - bl, err := b.blockstore.Get(k) - if bl == nil && err == ErrNotFound { - b.arc.Add(k, false) - } else if bl != nil { - b.arc.Add(k, true) - } - return bl, err + return b.blockstore.Get(k) } func (b *bloomcache) Put(bl blocks.Block) error { @@ -153,7 +121,6 @@ func (b *bloomcache) Put(bl blocks.Block) error { err := b.blockstore.Put(bl) if err == nil { b.bloom.AddTS([]byte(bl.Key())) - b.arc.Add(bl.Key(), true) } return err } @@ -169,7 +136,6 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error { if err == nil { for _, block := range bs { b.bloom.AddTS([]byte(block.Key())) - b.arc.Add(block.Key(), true) } } return err diff --git a/blocks/blockstore/bloom_cache_test.go b/blocks/blockstore/bloom_cache_test.go index d9cc5c817..fbffd42f5 100644 --- a/blocks/blockstore/bloom_cache_test.go +++ b/blocks/blockstore/bloom_cache_test.go @@ -19,6 +19,7 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) ctx = context.TODO() } opts := DefaultCacheOpts() + opts.HasARCCacheSize = 0 bbs, err := CachedBlockstore(bs, ctx, opts) if err == nil { return bbs.(*bloomcache), nil @@ -29,56 +30,10 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) func TestReturnsErrorWhenSizeNegative(t *testing.T) { bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - _, err := bloomCached(bs, context.TODO(), 100, 1, -1) + _, err := bloomCached(bs, context.TODO(), -1, 1) if err == nil { t.Fail() } - _, err = bloomCached(bs, context.TODO(), -1, 1, 100) - if err == nil { - t.Fail() - } -} - -func TestRemoveCacheEntryOnDelete(t *testing.T) { - b := blocks.NewBlock([]byte("foo")) - cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} - bs := NewBlockstore(syncds.MutexWrap(cd)) - cachedbs, err := testBloomCached(bs, nil) - if err != nil { - t.Fatal(err) - } - cachedbs.Put(b) - - cd.Lock() - writeHitTheDatastore := false - cd.Unlock() - - cd.SetFunc(func() { - writeHitTheDatastore = true - }) - - cachedbs.DeleteBlock(b.Key()) - cachedbs.Put(b) - if !writeHitTheDatastore { - t.Fail() - } -} - -func TestElideDuplicateWrite(t *testing.T) { - cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} - bs := NewBlockstore(syncds.MutexWrap(cd)) - cachedbs, err := testBloomCached(bs, nil) - if err != nil { - t.Fatal(err) - } - - b1 := blocks.NewBlock([]byte("foo")) - - cachedbs.Put(b1) - cd.SetFunc(func() { - t.Fatal("write hit the datastore") - }) - cachedbs.Put(b1) } func TestHasIsBloomCached(t *testing.T) { cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} diff --git a/blocks/blockstore/caching.go b/blocks/blockstore/caching.go index 689a9b5fc..f691f89f8 100644 --- a/blocks/blockstore/caching.go +++ b/blocks/blockstore/caching.go @@ -34,8 +34,10 @@ func CachedBlockstore(bs GCBlockstore, return nil, errors.New("bloom filter hash count can't be 0 when there is size set") } if opts.HasBloomFilterSize != 0 { - cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes, - opts.HasARCCacheSize) + cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes) + } + if opts.HasARCCacheSize > 0 { + cbs, err = arcCached(cbs, opts.HasARCCacheSize) } return cbs, err From 9543ed6ca1c8ceea157eef9d24d3f536362d6cd9 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 2 Aug 2016 10:59:38 +0100 Subject: [PATCH 2/3] blockstore: cleanup style a bit License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/arc_cache.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/blocks/blockstore/arc_cache.go b/blocks/blockstore/arc_cache.go index 37a8f0d02..7ab07e1d5 100644 --- a/blocks/blockstore/arc_cache.go +++ b/blocks/blockstore/arc_cache.go @@ -102,12 +102,13 @@ func (b *arccache) PutMany(bs []blocks.Block) error { } } err := b.blockstore.PutMany(bs) - if err == nil { - for _, block := range bs { - b.arc.Add(block.Key(), true) - } + if err != nil { + return err } - return err + for _, block := range bs { + b.arc.Add(block.Key(), true) + } + return nil } func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { From 61b481628ad6b8779fa49ba13dd5e41bcca4982a Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Wed, 3 Aug 2016 13:54:37 +0200 Subject: [PATCH 3/3] blockstore: cleanup the style removing some mess from the refactor License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/arc_cache.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/blocks/blockstore/arc_cache.go b/blocks/blockstore/arc_cache.go index 7ab07e1d5..63253ef9c 100644 --- a/blocks/blockstore/arc_cache.go +++ b/blocks/blockstore/arc_cache.go @@ -30,30 +30,28 @@ func (b *arccache) DeleteBlock(k key.Key) error { b.arc.Remove(k) // Invalidate cache before deleting. err := b.blockstore.DeleteBlock(k) switch err { - case nil: - b.arc.Add(k, false) - case ds.ErrNotFound, ErrNotFound: + case nil, ds.ErrNotFound, ErrNotFound: b.arc.Add(k, false) + return nil default: return err } - return nil } // if ok == false has is inconclusive // if ok == true then has respons to question: is it contained func (b *arccache) hasCached(k key.Key) (has bool, ok bool) { if k == "" { - // Return cache invalid so call to blockstore - // in case of invalid key is forwarded deeper + // Return cache invalid so the call to blockstore happens + // in case of invalid key and correct error is created. return false, false } + h, ok := b.arc.Get(k) if ok { - return h.(bool), ok - } else { - return false, false + return h.(bool), true } + return false, false } func (b *arccache) Has(k key.Key) (bool, error) {