mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 18:37:26 +08:00
389 lines
11 KiB
Go
389 lines
11 KiB
Go
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
|
|
// of this source code is governed by a BSD-style license that can be found in
|
|
// the LICENSE file.
|
|
|
|
package remoteobjcat
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"github.com/cockroachdb/pebble/internal/base"
|
|
"github.com/cockroachdb/pebble/objstorage"
|
|
"github.com/cockroachdb/pebble/objstorage/remote"
|
|
"github.com/cockroachdb/pebble/record"
|
|
"github.com/cockroachdb/pebble/shims/cmp"
|
|
"github.com/cockroachdb/pebble/shims/slices"
|
|
"github.com/cockroachdb/pebble/vfs"
|
|
"github.com/cockroachdb/pebble/vfs/atomicfs"
|
|
)
|
|
|
|
// Catalog is used to manage the on-disk remote object catalog.
|
|
//
|
|
// The catalog file is a log of records, where each record is an encoded
|
|
// VersionEdit.
|
|
type Catalog struct {
|
|
fs vfs.FS
|
|
dirname string
|
|
mu struct {
|
|
sync.Mutex
|
|
|
|
creatorID objstorage.CreatorID
|
|
objects map[base.DiskFileNum]RemoteObjectMetadata
|
|
|
|
marker *atomicfs.Marker
|
|
|
|
catalogFile vfs.File
|
|
catalogRecWriter *record.Writer
|
|
|
|
rotationHelper record.RotationHelper
|
|
|
|
// catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
|
|
// it is the filename of the last catalog file.
|
|
catalogFilename string
|
|
}
|
|
}
|
|
|
|
// RemoteObjectMetadata encapsulates the data stored in the catalog file for each object.
|
|
type RemoteObjectMetadata struct {
|
|
// FileNum is the identifier for the object within the context of a single DB
|
|
// instance.
|
|
FileNum base.DiskFileNum
|
|
// FileType is the type of the object. Only certain FileTypes are possible.
|
|
FileType base.FileType
|
|
// CreatorID identifies the DB instance that originally created the object.
|
|
CreatorID objstorage.CreatorID
|
|
// CreatorFileNum is the identifier for the object within the context of the
|
|
// DB instance that originally created the object.
|
|
CreatorFileNum base.DiskFileNum
|
|
// CleanupMethod indicates the method for cleaning up unused shared objects.
|
|
CleanupMethod objstorage.SharedCleanupMethod
|
|
// Locator identifies a remote.Storage implementation.
|
|
Locator remote.Locator
|
|
// CustomObjectName (if it is set) overrides the object name that is normally
|
|
// derived from the CreatorID and CreatorFileNum.
|
|
CustomObjectName string
|
|
}
|
|
|
|
const (
|
|
catalogFilenameBase = "REMOTE-OBJ-CATALOG"
|
|
catalogMarkerName = "remote-obj-catalog"
|
|
|
|
// We create a new file when the size exceeds 1MB (and some other conditions
|
|
// hold; see record.RotationHelper).
|
|
rotateFileSize = 1024 * 1024 // 1MB
|
|
)
|
|
|
|
// CatalogContents contains the remote objects in the catalog.
|
|
type CatalogContents struct {
|
|
// CreatorID, if it is set.
|
|
CreatorID objstorage.CreatorID
|
|
Objects []RemoteObjectMetadata
|
|
}
|
|
|
|
// Open creates a Catalog and loads any existing catalog file, returning the
|
|
// creator ID (if it is set) and the contents.
|
|
func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
|
|
c := &Catalog{
|
|
fs: fs,
|
|
dirname: dirname,
|
|
}
|
|
c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
|
|
|
|
var err error
|
|
c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
|
|
if err != nil {
|
|
return nil, CatalogContents{}, err
|
|
}
|
|
// If the filename is empty, there is no existing catalog.
|
|
if c.mu.catalogFilename != "" {
|
|
if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
|
|
return nil, CatalogContents{}, err
|
|
}
|
|
if err := c.mu.marker.RemoveObsolete(); err != nil {
|
|
return nil, CatalogContents{}, err
|
|
}
|
|
// TODO(radu): remove obsolete catalog files.
|
|
}
|
|
res := CatalogContents{
|
|
CreatorID: c.mu.creatorID,
|
|
Objects: make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
|
|
}
|
|
for _, meta := range c.mu.objects {
|
|
res.Objects = append(res.Objects, meta)
|
|
}
|
|
// Sort the objects so the function is deterministic.
|
|
slices.SortFunc(res.Objects, func(a, b RemoteObjectMetadata) int {
|
|
return cmp.Compare(a.FileNum, b.FileNum)
|
|
})
|
|
return c, res, nil
|
|
}
|
|
|
|
// SetCreatorID sets the creator ID. If it is already set, it must match.
|
|
func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
|
|
if !id.IsSet() {
|
|
return errors.AssertionFailedf("attempt to unset CreatorID")
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.mu.creatorID.IsSet() {
|
|
if c.mu.creatorID != id {
|
|
return errors.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
ve := VersionEdit{CreatorID: id}
|
|
if err := c.writeToCatalogFileLocked(&ve); err != nil {
|
|
return errors.Wrapf(err, "pebble: could not write to remote object catalog")
|
|
}
|
|
c.mu.creatorID = id
|
|
return nil
|
|
}
|
|
|
|
// Close any open files.
|
|
func (c *Catalog) Close() error {
|
|
return c.closeCatalogFile()
|
|
}
|
|
|
|
func (c *Catalog) closeCatalogFile() error {
|
|
if c.mu.catalogFile == nil {
|
|
return nil
|
|
}
|
|
err1 := c.mu.catalogRecWriter.Close()
|
|
err2 := c.mu.catalogFile.Close()
|
|
c.mu.catalogRecWriter = nil
|
|
c.mu.catalogFile = nil
|
|
if err1 != nil {
|
|
return err1
|
|
}
|
|
return err2
|
|
}
|
|
|
|
// Batch is used to perform multiple object additions/deletions at once.
|
|
type Batch struct {
|
|
ve VersionEdit
|
|
}
|
|
|
|
// AddObject adds a new object to the batch.
|
|
//
|
|
// The given FileNum must be new - it must not match that of any object that was
|
|
// ever in the catalog.
|
|
func (b *Batch) AddObject(meta RemoteObjectMetadata) {
|
|
b.ve.NewObjects = append(b.ve.NewObjects, meta)
|
|
}
|
|
|
|
// DeleteObject adds an object removal to the batch.
|
|
func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
|
|
b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
|
|
}
|
|
|
|
// Reset clears the batch.
|
|
func (b *Batch) Reset() {
|
|
b.ve.NewObjects = b.ve.NewObjects[:0]
|
|
b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
|
|
}
|
|
|
|
// IsEmpty returns true if the batch is empty.
|
|
func (b *Batch) IsEmpty() bool {
|
|
return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
|
|
}
|
|
|
|
// Copy returns a copy of the Batch.
|
|
func (b *Batch) Copy() Batch {
|
|
var res Batch
|
|
if len(b.ve.NewObjects) > 0 {
|
|
res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
|
|
copy(res.ve.NewObjects, b.ve.NewObjects)
|
|
}
|
|
if len(b.ve.DeletedObjects) > 0 {
|
|
res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
|
|
copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
|
|
}
|
|
return res
|
|
}
|
|
|
|
// Append merges two batches.
|
|
func (b *Batch) Append(other Batch) {
|
|
b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
|
|
b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
|
|
}
|
|
|
|
// ApplyBatch applies a batch of updates; returns after the change is stably
|
|
// recorded on storage.
|
|
func (c *Catalog) ApplyBatch(b Batch) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Sanity checks.
|
|
toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
|
|
exists := func(n base.DiskFileNum) bool {
|
|
_, ok := c.mu.objects[n]
|
|
if !ok {
|
|
_, ok = toAdd[n]
|
|
}
|
|
return ok
|
|
}
|
|
for _, meta := range b.ve.NewObjects {
|
|
if exists(meta.FileNum) {
|
|
return errors.AssertionFailedf("adding existing object %s", meta.FileNum)
|
|
}
|
|
toAdd[meta.FileNum] = struct{}{}
|
|
}
|
|
for _, n := range b.ve.DeletedObjects {
|
|
if !exists(n) {
|
|
return errors.AssertionFailedf("deleting non-existent object %s", n)
|
|
}
|
|
}
|
|
|
|
if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
|
|
return errors.Wrapf(err, "pebble: could not write to remote object catalog")
|
|
}
|
|
|
|
// Add new objects before deleting any objects. This allows for cases where
|
|
// the same batch adds and deletes an object.
|
|
for _, meta := range b.ve.NewObjects {
|
|
c.mu.objects[meta.FileNum] = meta
|
|
}
|
|
for _, n := range b.ve.DeletedObjects {
|
|
delete(c.mu.objects, n)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Catalog) loadFromCatalogFile(filename string) error {
|
|
catalogPath := c.fs.PathJoin(c.dirname, filename)
|
|
f, err := c.fs.Open(catalogPath)
|
|
if err != nil {
|
|
return errors.Wrapf(
|
|
err, "pebble: could not open remote object catalog file %q for DB %q",
|
|
errors.Safe(filename), c.dirname,
|
|
)
|
|
}
|
|
defer f.Close()
|
|
rr := record.NewReader(f, 0 /* logNum */)
|
|
for {
|
|
r, err := rr.Next()
|
|
if err == io.EOF || record.IsInvalidRecord(err) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
|
|
errors.Safe(filename))
|
|
}
|
|
var ve VersionEdit
|
|
if err := ve.Decode(r); err != nil {
|
|
return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
|
|
errors.Safe(filename))
|
|
}
|
|
// Apply the version edit to the current state.
|
|
if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
|
|
return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
|
|
errors.Safe(filename))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeToCatalogFileLocked writes a VersionEdit to the catalog file.
|
|
// Creates a new file if this is the first write.
|
|
func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
|
|
c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
|
|
snapshotSize := int64(len(c.mu.objects))
|
|
|
|
var shouldRotate bool
|
|
if c.mu.catalogFile == nil {
|
|
shouldRotate = true
|
|
} else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
|
|
shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
|
|
}
|
|
|
|
if shouldRotate {
|
|
if c.mu.catalogFile != nil {
|
|
if err := c.closeCatalogFile(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := c.createNewCatalogFileLocked(); err != nil {
|
|
return err
|
|
}
|
|
c.mu.rotationHelper.Rotate(snapshotSize)
|
|
}
|
|
return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter)
|
|
}
|
|
|
|
func makeCatalogFilename(iter uint64) string {
|
|
return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
|
|
}
|
|
|
|
// createNewCatalogFileLocked creates a new catalog file, populates it with the
|
|
// current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter.
|
|
func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
|
|
if c.mu.catalogFile != nil {
|
|
return errors.AssertionFailedf("catalogFile already open")
|
|
}
|
|
filename := makeCatalogFilename(c.mu.marker.NextIter())
|
|
filepath := c.fs.PathJoin(c.dirname, filename)
|
|
file, err := c.fs.Create(filepath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
recWriter := record.NewWriter(file)
|
|
err = func() error {
|
|
// Create a VersionEdit that gets us from an empty catalog to the current state.
|
|
var ve VersionEdit
|
|
ve.CreatorID = c.mu.creatorID
|
|
ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
|
|
for _, meta := range c.mu.objects {
|
|
ve.NewObjects = append(ve.NewObjects, meta)
|
|
}
|
|
if err := writeRecord(&ve, file, recWriter); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Move the marker to the new filename. Move handles syncing the data
|
|
// directory as well.
|
|
if err := c.mu.marker.Move(filename); err != nil {
|
|
return errors.Wrap(err, "moving marker")
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
|
|
if err != nil {
|
|
_ = recWriter.Close()
|
|
_ = file.Close()
|
|
_ = c.fs.Remove(filepath)
|
|
return err
|
|
}
|
|
|
|
// Remove any previous file (ignoring any error).
|
|
if c.mu.catalogFilename != "" {
|
|
_ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
|
|
}
|
|
|
|
c.mu.catalogFile = file
|
|
c.mu.catalogRecWriter = recWriter
|
|
c.mu.catalogFilename = filename
|
|
return nil
|
|
}
|
|
|
|
func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
|
|
w, err := recWriter.Next()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := ve.Encode(w); err != nil {
|
|
return err
|
|
}
|
|
if err := recWriter.Flush(); err != nil {
|
|
return err
|
|
}
|
|
return file.Sync()
|
|
}
|