ceremonyclient/pebble/objstorage/objstorageprovider/sharedcache/shared_cache_test.go

package sharedcache_test

import (
	"bytes"
	"context"
	"fmt"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/cockroachdb/datadriven"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/stretchr/testify/require"
	"golang.org/x/exp/rand"
)
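
// TestSharedCache runs the datadriven scripts under testdata/cache. Each
// script drives the commands handled below: init, write, read, and
// read-for-compaction. A minimal sketch of such a script, assuming the
// argument names parsed in this file (the concrete values and expected
// outputs are illustrative, not copied from the repository's testdata):
//
//	init block-size=32K sharding-block-size=1M num-shards=2 size=2M
//	----
//	initialized with block-size=32768 size=2097152 num-shards=2
//
//	write size=64K
//	----
//
//	read offset=0 size=32K
//	----
//	misses=1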
func TestSharedCache(t *testing.T) {
	ctx := context.Background()

	datadriven.Walk(t, "testdata/cache", func(t *testing.T, path string) {
		var log base.InMemLogger
		fs := vfs.WithLogging(vfs.NewMem(), func(fmt string, args ...interface{}) {
			log.Infof("<local fs> "+fmt, args...)
		})

		provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(fs, ""))
		require.NoError(t, err)

		var cache *sharedcache.Cache
		defer func() {
			if cache != nil {
				cache.Close()
			}
		}()

		var objData []byte
		datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
			log.Reset()
			switch d.Cmd {
			case "init":
				blockSize := parseBytesArg(t, d, "block-size", 32*1024)
				shardingBlockSize := parseBytesArg(t, d, "sharding-block-size", 1024*1024)
				numShards := parseBytesArg(t, d, "num-shards", 32)
				size := parseBytesArg(t, d, "size", numShards*shardingBlockSize)
				if size%(numShards*shardingBlockSize) != 0 {
					d.Fatalf(t, "size (%d) must be a multiple of numShards (%d) * shardingBlockSize (%d)",
						size, numShards, shardingBlockSize,
					)
				}
				cache, err = sharedcache.Open(
					fs, base.DefaultLogger, "", blockSize, int64(shardingBlockSize), int64(size), numShards,
				)
				require.NoError(t, err)
				return fmt.Sprintf("initialized with block-size=%d size=%d num-shards=%d", blockSize, size, numShards)

			case "write":
				size := mustParseBytesArg(t, d, "size")

				writable, _, err := provider.Create(ctx, base.FileTypeTable, base.FileNum(1).DiskFileNum(), objstorage.CreateOptions{})
				require.NoError(t, err)
				defer writable.Finish()

				// With invariants on, Write will modify its input buffer, so
				// keep a pristine copy in objData for later comparison.
				objData = make([]byte, size)
				wrote := make([]byte, size)
				for i := 0; i < size; i++ {
					objData[i] = byte(i)
					wrote[i] = byte(i)
				}

				err = writable.Write(wrote)
				// Writing a file is test setup, and it is always expected to
				// succeed, so we assert within the test rather than returning
				// n and/or err here.
				require.NoError(t, err)
				return ""

			case "read", "read-for-compaction":
				missesBefore := cache.Metrics().ReadsWithPartialHit + cache.Metrics().ReadsWithNoHit
				offset := mustParseBytesArg(t, d, "offset")
				size := mustParseBytesArg(t, d, "size")

				readable, err := provider.OpenForReading(ctx, base.FileTypeTable, base.FileNum(1).DiskFileNum(), objstorage.OpenOptions{})
				require.NoError(t, err)
				defer readable.Close()

				got := make([]byte, size)
				flags := sharedcache.ReadFlags{
					// Reads issued for a compaction are marked read-only so
					// that they do not populate the cache.
					ReadOnly: d.Cmd == "read-for-compaction",
				}
				err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, int64(offset), readable, readable.Size(), flags)
				// We always expect cache.ReadAt to succeed.
				require.NoError(t, err)
				// It is easier to assert this condition programmatically than
				// to return got, which may be very large.
				require.True(t, bytes.Equal(objData[int(offset):int(offset)+size], got), "incorrect data returned")

				// To ensure we get a hit on the next read, we must wait for
				// writing to the cache to complete.
				cache.WaitForWritesToComplete()

				// TODO(josh): Not tracing out filesystem activity here, since logging_fs.go
				// doesn't trace calls to ReadAt or WriteAt. We should consider changing this.
				missesAfter := cache.Metrics().ReadsWithPartialHit + cache.Metrics().ReadsWithNoHit
				return fmt.Sprintf("misses=%d", missesAfter-missesBefore)

			default:
				d.Fatalf(t, "unknown command %s", d.Cmd)
				return ""
			}
		})
	})
}
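
// TestSharedCacheRandomized exercises the cache with randomized shard counts,
// object sizes, and read offsets, issuing some reads concurrently. The seed is
// printed so that a failing run can be reproduced.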
func TestSharedCacheRandomized(t *testing.T) {
	ctx := context.Background()

	var log base.InMemLogger
	fs := vfs.WithLogging(vfs.NewMem(), func(fmt string, args ...interface{}) {
		log.Infof("<local fs> "+fmt, args...)
	})

	provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(fs, ""))
	require.NoError(t, err)

	seed := uint64(time.Now().UnixNano())
	fmt.Printf("seed: %v\n", seed)
	rand.Seed(seed)

	helper := func(blockSize int, shardingBlockSize int64) func(t *testing.T) {
		return func(t *testing.T) {
			for _, concurrentReads := range []bool{false, true} {
				t.Run(fmt.Sprintf("concurrentReads=%v", concurrentReads), func(t *testing.T) {
					maxShards := 32
					if invariants.RaceEnabled {
						maxShards = 8
					}
					numShards := rand.Intn(maxShards) + 1
					cacheSize := shardingBlockSize * int64(numShards) // minimum allowed cache size

					cache, err := sharedcache.Open(fs, base.DefaultLogger, "", blockSize, shardingBlockSize, cacheSize, numShards)
					require.NoError(t, err)
					defer cache.Close()

					writable, _, err := provider.Create(ctx, base.FileTypeTable, base.FileNum(1).DiskFileNum(), objstorage.CreateOptions{})
					require.NoError(t, err)

					// With invariants on, Write will modify its input buffer.
					// If size == 0, we can see panics below, so force a
					// nonzero size.
					size := rand.Int63n(cacheSize-1) + 1
					objData := make([]byte, size)
					wrote := make([]byte, size)
					for i := 0; i < int(size); i++ {
						objData[i] = byte(i)
						wrote[i] = byte(i)
					}

					require.NoError(t, writable.Write(wrote))
					require.NoError(t, writable.Finish())

					readable, err := provider.OpenForReading(ctx, base.FileTypeTable, base.FileNum(1).DiskFileNum(), objstorage.OpenOptions{})
					require.NoError(t, err)
					defer readable.Close()

					const numDistinctReads = 100
					wg := sync.WaitGroup{}
					for i := 0; i < numDistinctReads; i++ {
						wg.Add(1)
						go func() {
							defer wg.Done()

							offset := rand.Int63n(size)

							// Read the suffix of the object twice: the first
							// read may miss and populate the cache; the second
							// should then be served from it.
							got := make([]byte, size-offset)
							err := cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, readable.Size(), sharedcache.ReadFlags{})
							require.NoError(t, err)
							require.Equal(t, objData[int(offset):], got)

							got = make([]byte, size-offset)
							err = cache.ReadAt(ctx, base.FileNum(1).DiskFileNum(), got, offset, readable, readable.Size(), sharedcache.ReadFlags{})
							require.NoError(t, err)
							require.Equal(t, objData[int(offset):], got)
						}()
						// With concurrent reads enabled, wait on only ~50% of
						// loop iterations on average, so that some reads
						// overlap in flight.
						if concurrentReads && rand.Intn(2) == 0 {
							wg.Wait()
						}
						if !concurrentReads {
							wg.Wait()
						}
					}
					wg.Wait()
				})
			}
		}
	}

	t.Run("32 KB block size", helper(32*1024, 1024*1024))
	t.Run("1 MB block size", helper(1024*1024, 1024*1024))
	if !invariants.RaceEnabled {
		for i := 0; i < 5; i++ {
			exp := rand.Intn(11) + 10                                  // [10, 20]
			randomBlockSize := 1 << exp                                // [1 KB, 1 MB]
			factor := rand.Intn(4) + 1                                 // [1, 4]
			randomShardingBlockSize := int64(randomBlockSize * factor) // [1 KB, 4 MB]
			t.Run("random block and sharding block size", helper(randomBlockSize, randomShardingBlockSize))
		}
	}
}
// parseBytesArg parses an optional argument that specifies a byte size; if the
// argument is not specified, the default value is used. K/M/G suffixes are
// supported.
func parseBytesArg(t testing.TB, d *datadriven.TestData, argName string, defaultValue int) int {
	res, ok := tryParseBytesArg(t, d, argName)
	if !ok {
		return defaultValue
	}
	return res
}

// mustParseBytesArg parses a mandatory argument that specifies a byte size;
// K/M/G suffixes are supported.
func mustParseBytesArg(t testing.TB, d *datadriven.TestData, argName string) int {
	res, ok := tryParseBytesArg(t, d, argName)
	if !ok {
		t.Fatalf("argument '%s' missing", argName)
	}
	return res
}

// tryParseBytesArg parses the named argument as a byte size, e.g. "32K" parses
// to 32768. It reports ok=false if the argument is not present.
func tryParseBytesArg(t testing.TB, d *datadriven.TestData, argName string) (val int, ok bool) {
	arg, ok := d.Arg(argName)
	if !ok {
		return 0, false
	}
	if len(arg.Vals) != 1 {
		t.Fatalf("expected 1 value for '%s'", argName)
	}
	v := arg.Vals[0]
	factor := 1
	switch v[len(v)-1] {
	case 'k', 'K':
		factor = 1024
	case 'm', 'M':
		factor = 1024 * 1024
	case 'g', 'G':
		factor = 1024 * 1024 * 1024
	}
	if factor > 1 {
		v = v[:len(v)-1]
	}
	res, err := strconv.Atoi(v)
	if err != nil {
		t.Fatalf("could not parse value '%s' for '%s'", arg.Vals[0], argName)
	}
	return res * factor, true
}