ceremonyclient/node/store/pebble.go
Cassandra Heart 7eeb91a9a2
v2.1.0.19
2026-02-14 22:20:02 -06:00

1323 lines
37 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package store
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net"
"os"
"strings"
pebblev1 "github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/v2"
"github.com/cockroachdb/pebble/v2/vfs"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
"source.quilibrium.com/quilibrium/monorepo/bls48581"
"source.quilibrium.com/quilibrium/monorepo/config"
hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
"source.quilibrium.com/quilibrium/monorepo/protobufs"
"source.quilibrium.com/quilibrium/monorepo/types/store"
"source.quilibrium.com/quilibrium/monorepo/types/tries"
up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)
// PebbleDB wraps a Pebble v2 database handle together with the node
// configuration it was opened with. It implements store.KVDB.
type PebbleDB struct {
	db     *pebble.DB
	config *config.Config
}

// DB exposes the underlying *pebble.DB handle for callers that need
// direct access to the database.
func (p *PebbleDB) DB() *pebble.DB {
	return p.db
}
// pebbleMigrations contains ordered migration steps. New migrations append to
// the end.
//
// The stored migration version equals the number of applied entries, i.e. the
// length of this slice — entries must therefore never be removed or reordered,
// only appended, or existing stores would misinterpret their version.
//
// NOTE(review): migration_2_1_0_10 and migration_2_1_0_172 each appear twice.
// Removing a duplicate would shift the version numbering for every later
// entry, so they are presumably deliberate placeholders — confirm before
// touching this list.
var pebbleMigrations = []func(*pebble.Batch, *pebble.DB, *config.Config) error{
	migration_2_1_0_4,
	migration_2_1_0_5,
	migration_2_1_0_8,
	migration_2_1_0_81,
	migration_2_1_0_10,
	migration_2_1_0_10,
	migration_2_1_0_11,
	migration_2_1_0_14,
	migration_2_1_0_141,
	migration_2_1_0_142,
	migration_2_1_0_143,
	migration_2_1_0_144,
	migration_2_1_0_145,
	migration_2_1_0_146,
	migration_2_1_0_147,
	migration_2_1_0_148,
	migration_2_1_0_149,
	migration_2_1_0_1410,
	migration_2_1_0_1411,
	migration_2_1_0_15,
	migration_2_1_0_151,
	migration_2_1_0_152,
	migration_2_1_0_153,
	migration_2_1_0_154,
	migration_2_1_0_155,
	migration_2_1_0_156,
	migration_2_1_0_157,
	migration_2_1_0_158,
	migration_2_1_0_159,
	migration_2_1_0_17,
	migration_2_1_0_171,
	migration_2_1_0_172,
	migration_2_1_0_172,
	migration_2_1_0_173,
	migration_2_1_0_18,
	migration_2_1_0_181,
	migration_2_1_0_182,
	migration_2_1_0_183,
	migration_2_1_0_184,
	migration_2_1_0_185,
	migration_2_1_0_186,
	migration_2_1_0_187,
	migration_2_1_0_188,
	migration_2_1_0_189,
	migration_2_1_0_1810,
	migration_2_1_0_1811,
	migration_2_1_0_1812,
	migration_2_1_0_1813,
	migration_2_1_0_1814,
	migration_2_1_0_1815,
	migration_2_1_0_1816,
	migration_2_1_0_1817,
	migration_2_1_0_1818,
	migration_2_1_0_1819,
	migration_2_1_0_1820,
	migration_2_1_0_1821,
	migration_2_1_0_1822,
	migration_2_1_0_1823,
}
// NewPebbleDB opens (creating the directory if necessary) the Pebble-backed
// store described by cfg and applies any pending migrations before returning.
// coreId 0 selects the main store path; coreId > 0 selects the matching
// worker store path (an explicit WorkerPaths entry when present, otherwise
// WorkerPathPrefix formatted with the core id). If the store cannot be
// opened — even after a one-time legacy-format repair attempt — or migration
// fails, the process is terminated via os.Exit(1).
func NewPebbleDB(
	logger *zap.Logger,
	cfg *config.Config,
	coreId uint,
) *PebbleDB {
	opts := &pebble.Options{
		MemTableSize:          64 << 20,
		MaxOpenFiles:          1000,
		L0CompactionThreshold: 8,
		L0StopWritesThreshold: 32,
		LBaseMaxBytes:         64 << 20,
		FormatMajorVersion:    pebble.FormatNewest,
	}
	if cfg.DB.InMemoryDONOTUSE {
		opts.FS = vfs.NewMem()
	}
	// Resolve the on-disk path for this core: explicit per-worker path when
	// configured, otherwise the formatted worker prefix, otherwise the main
	// store path.
	path := cfg.DB.Path
	if coreId > 0 && len(cfg.DB.WorkerPaths) > int(coreId-1) {
		path = cfg.DB.WorkerPaths[coreId-1]
	} else if coreId > 0 {
		path = fmt.Sprintf(cfg.DB.WorkerPathPrefix, coreId)
	}
	storeType := "store"
	if coreId > 0 {
		storeType = "worker store"
	}
	if _, err := os.Stat(path); os.IsNotExist(err) && !cfg.DB.InMemoryDONOTUSE {
		logger.Warn(
			fmt.Sprintf("%s not found, creating", storeType),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		if err := os.MkdirAll(path, 0755); err != nil {
			logger.Error(
				fmt.Sprintf("%s could not be created, terminating", storeType),
				zap.Error(err),
				zap.String("path", path),
				zap.Uint("core_id", coreId),
			)
			os.Exit(1)
		}
	} else {
		// NOTE(review): this branch also runs for in-memory stores and for
		// Stat errors other than not-exist, logging "found" in both cases —
		// confirm that is intended.
		logger.Info(
			fmt.Sprintf("%s found", storeType),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
	}
	db, err := pebble.Open(path, opts)
	// If the open failed because the on-disk format predates Pebble v2,
	// open/close once with the legacy Pebble release to upgrade the store,
	// then retry with v2.
	if err != nil && shouldAttemptLegacyOpen(err, cfg.DB.InMemoryDONOTUSE) {
		logger.Warn(
			fmt.Sprintf(
				"failed to open %s with pebble v2, trying legacy open",
				storeType,
			),
			zap.Error(err),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		if compatErr := ensurePebbleLegacyCompatibility(
			path,
			storeType,
			coreId,
			logger,
		); compatErr == nil {
			logger.Info(
				fmt.Sprintf(
					"legacy pebble open succeeded, retrying %s with pebble v2",
					storeType,
				),
				zap.String("path", path),
				zap.Uint("core_id", coreId),
			)
			db, err = pebble.Open(path, opts)
		} else {
			logger.Error(
				fmt.Sprintf("legacy pebble open failed for %s", storeType),
				zap.Error(compatErr),
				zap.String("path", path),
				zap.Uint("core_id", coreId),
			)
		}
	}
	if err != nil {
		logger.Error(
			fmt.Sprintf("failed to open %s", storeType),
			zap.Error(err),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		os.Exit(1)
	}
	pebbleDB := &PebbleDB{db, cfg}
	if err := pebbleDB.migrate(logger); err != nil {
		logger.Error(
			fmt.Sprintf("failed to migrate %s", storeType),
			zap.Error(err),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		pebbleDB.Close()
		os.Exit(1)
	}
	return pebbleDB
}
// shouldAttemptLegacyOpen determines whether the error from pebble.Open is due
// to an outdated on-disk format. Only those cases benefit from temporarily
// opening with the legacy Pebble version.
func shouldAttemptLegacyOpen(err error, inMemory bool) bool {
if err == nil || inMemory {
return false
}
msg := err.Error()
return strings.Contains(msg, "format major version") &&
strings.Contains(msg, "no longer supported")
}
// ensurePebbleLegacyCompatibility attempts to open the database with the
// previous Pebble v1.1.5 release. Older stores that have not yet been opened
// by Pebble v2 will be updated during this open/close cycle, allowing the
// subsequent Pebble v2 open to succeed without manual intervention.
//
// The open/close is performed with options mirroring those used by
// NewPebbleDB, but pinned to the legacy package's newest format version so
// the store is ratcheted forward as far as v1 allows.
func ensurePebbleLegacyCompatibility(
	path string,
	storeType string,
	coreId uint,
	logger *zap.Logger,
) error {
	legacyOpts := &pebblev1.Options{
		MemTableSize:          64 << 20,
		MaxOpenFiles:          1000,
		L0CompactionThreshold: 8,
		L0StopWritesThreshold: 32,
		LBaseMaxBytes:         64 << 20,
		FormatMajorVersion:    pebblev1.FormatNewest,
	}
	legacyDB, err := pebblev1.Open(path, legacyOpts)
	if err != nil {
		return err
	}
	// Closing flushes and finalizes the format upgrade performed by the open.
	if err := legacyDB.Close(); err != nil {
		return err
	}
	logger.Info(
		fmt.Sprintf("legacy pebble open and close completed for %s", storeType),
		zap.String("path", path),
		zap.Uint("core_id", coreId),
	)
	return nil
}
// migrate applies any pending entries from pebbleMigrations and records the
// resulting version (the count of applied migrations) under the MIGRATION
// key. A store whose version is ahead of this binary is refused, since
// replaying older code against it could corrupt data irreparably. In-memory
// stores are always fresh and skip migration entirely.
//
// All pending steps plus the version stamp are committed in one indexed
// batch, so either every migration lands together or none do.
func (p *PebbleDB) migrate(logger *zap.Logger) error {
	if p.config.DB.InMemoryDONOTUSE {
		return nil
	}
	currentVersion := uint64(len(pebbleMigrations))
	var storedVersion uint64
	var foundVersion bool
	value, closer, err := p.db.Get([]byte{MIGRATION})
	switch {
	case err == pebble.ErrNotFound:
		// missing version implies zero
	case err != nil:
		return errors.Wrap(err, "load migration version")
	default:
		foundVersion = true
		if len(value) != 8 {
			if closer != nil {
				_ = closer.Close()
			}
			return errors.Errorf(
				"invalid migration version length: %d",
				len(value),
			)
		}
		storedVersion = binary.BigEndian.Uint64(value)
		if closer != nil {
			if err := closer.Close(); err != nil {
				logger.Warn("failed to close migration version reader", zap.Error(err))
			}
		}
	}
	if storedVersion > currentVersion {
		return errors.Errorf(
			"store migration version %d ahead of binary %d running a migrated db "+
				"with an earlier version can cause irreparable corruption, shutting down",
			storedVersion,
			currentVersion,
		)
	}
	needsUpdate := !foundVersion || storedVersion < currentVersion
	if !needsUpdate {
		logger.Info("no pebble store migrations required")
		return nil
	}
	batch := p.db.NewIndexedBatch()
	// Fix: release the batch on every exit path, including after a successful
	// Commit (Close after Commit is a harmless release of batch resources);
	// previously the batch was only closed on error paths.
	defer batch.Close()
	for i := int(storedVersion); i < len(pebbleMigrations); i++ {
		// Fix: log the version range of this specific step. Previously every
		// iteration logged storedVersion/storedVersion+1, so multi-step runs
		// reported the same range repeatedly.
		logger.Warn(
			"performing pebble store migration",
			zap.Int("from_version", i),
			zap.Int("to_version", i+1),
		)
		if err := pebbleMigrations[i](batch, p.db, p.config); err != nil {
			logger.Error("migration failed", zap.Error(err))
			return errors.Wrapf(err, "apply migration %d", i+1)
		}
		logger.Info(
			"migration step completed",
			zap.Int("from_version", i),
			zap.Int("to_version", i+1),
		)
	}
	// Stamp the new version inside the same batch as the migration writes.
	var versionBuf [8]byte
	binary.BigEndian.PutUint64(versionBuf[:], currentVersion)
	if err := batch.Set([]byte{MIGRATION}, versionBuf[:], nil); err != nil {
		return errors.Wrap(err, "set migration version")
	}
	if err := batch.Commit(&pebble.WriteOptions{Sync: true}); err != nil {
		return errors.Wrap(err, "commit migration batch")
	}
	if currentVersion != storedVersion {
		logger.Info(
			"applied pebble store migrations",
			zap.Uint64("from_version", storedVersion),
			zap.Uint64("to_version", currentVersion),
		)
	} else {
		logger.Info(
			"initialized pebble store migration version",
			zap.Uint64("version", currentVersion),
		)
	}
	return nil
}
// Get returns the value stored at key along with a closer the caller must
// close once done with the value. A missing key yields pebble.ErrNotFound.
func (p *PebbleDB) Get(key []byte) ([]byte, io.Closer, error) {
	return p.db.Get(key)
}

// Set durably writes value at key, syncing to disk before returning.
func (p *PebbleDB) Set(key, value []byte) error {
	return p.db.Set(key, value, &pebble.WriteOptions{Sync: true})
}

// Delete durably removes key, syncing to disk before returning.
func (p *PebbleDB) Delete(key []byte) error {
	return p.db.Delete(key, &pebble.WriteOptions{Sync: true})
}
// NewBatch returns a new write transaction over the store. When indexed is
// true the underlying batch additionally supports reads of its own
// uncommitted writes.
func (p *PebbleDB) NewBatch(indexed bool) store.Transaction {
	var batch *pebble.Batch
	if indexed {
		batch = p.db.NewIndexedBatch()
	} else {
		batch = p.db.NewBatch()
	}
	return &PebbleTransaction{b: batch}
}
// NewIter returns an iterator over the half-open key range
// [lowerBound, upperBound).
func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) (
	store.Iterator,
	error,
) {
	return p.db.NewIter(&pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	})
}

// Compact manually compacts the key range [start, end]. The pebble v2 API
// takes a context; none is threaded through here yet, hence context.TODO().
func (p *PebbleDB) Compact(start, end []byte, parallelize bool) error {
	return p.db.Compact(context.TODO(), start, end, parallelize)
}

// Close closes the underlying pebble database.
func (p *PebbleDB) Close() error {
	return p.db.Close()
}

// DeleteRange durably removes all keys in [start, end), syncing to disk
// before returning.
func (p *PebbleDB) DeleteRange(start, end []byte) error {
	return p.db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true})
}
// CompactAll compacts the entire keyspace by locating the first and last
// keys currently present and compacting that range. Stores with fewer than
// two distinct keys are left untouched, since there is no range to compact.
func (p *PebbleDB) CompactAll() error {
	iter, err := p.db.NewIter(nil)
	if err != nil {
		return errors.Wrap(err, "compact all")
	}
	var first, last []byte
	if iter.First() {
		first = append(first, iter.Key()...)
	}
	if iter.Last() {
		last = append(last, iter.Key()...)
	}
	if err := iter.Close(); err != nil {
		return errors.Wrap(err, "compact all")
	}
	// Fix: pebble's Compact requires start to sort strictly below end; an
	// empty store (nil bounds) or a single-key store (first == last) would
	// previously make this call fail. Skip compaction in those cases.
	if first == nil || last == nil || string(first) >= string(last) {
		return nil
	}
	if err := p.Compact(first, last, false); err != nil {
		return errors.Wrap(err, "compact all")
	}
	return nil
}
// Compile-time assertion that PebbleDB satisfies store.KVDB.
var _ store.KVDB = (*PebbleDB)(nil)

// PebbleTransaction wraps a *pebble.Batch as a store.Transaction. Writes are
// buffered in the batch until Commit; Abort discards them.
type PebbleTransaction struct {
	b *pebble.Batch
}

// Get reads key from the batch's view. The returned closer must be closed by
// the caller.
func (t *PebbleTransaction) Get(key []byte) ([]byte, io.Closer, error) {
	return t.b.Get(key)
}

// Set stages a write of value at key.
func (t *PebbleTransaction) Set(key []byte, value []byte) error {
	return t.b.Set(key, value, &pebble.WriteOptions{Sync: true})
}

// Commit applies all staged operations, syncing to disk before returning.
func (t *PebbleTransaction) Commit() error {
	return t.b.Commit(&pebble.WriteOptions{Sync: true})
}

// Delete stages a removal of key.
func (t *PebbleTransaction) Delete(key []byte) error {
	return t.b.Delete(key, &pebble.WriteOptions{Sync: true})
}

// Abort discards the batch without committing any staged operations.
func (t *PebbleTransaction) Abort() error {
	return t.b.Close()
}

// NewIter returns an iterator over [lowerBound, upperBound) as seen by the
// batch.
func (t *PebbleTransaction) NewIter(lowerBound []byte, upperBound []byte) (
	store.Iterator,
	error,
) {
	return t.b.NewIter(&pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	})
}

// DeleteRange stages removal of all keys in [lowerBound, upperBound).
func (t *PebbleTransaction) DeleteRange(
	lowerBound []byte,
	upperBound []byte,
) error {
	return t.b.DeleteRange(
		lowerBound,
		upperBound,
		&pebble.WriteOptions{Sync: true},
	)
}

// Compile-time assertion that PebbleTransaction satisfies store.Transaction.
var _ store.Transaction = (*PebbleTransaction)(nil)
// rightAlign fits data into exactly size bytes: the input is returned
// unchanged when it already matches, truncated to its trailing size bytes
// when too long, and left-padded with zero bytes when too short.
func rightAlign(data []byte, size int) []byte {
	switch n := len(data); {
	case n == size:
		return data
	case n > size:
		return data[n-size:]
	default:
		out := make([]byte, size)
		copy(out[size-n:], data)
		return out
	}
}
// Resolves all the variations of store issues from any series of upgrade steps
// in 2.1.0.1->2.1.0.3
//
// The migration deletes a fixed frame range, frame index markers, a set of
// shard commitment keys, and the vertex/hyperedge tree, path-index, change
// record, data, and root keyspaces, all addressed by hard-coded hex keys.
// The key bytes are load-bearing; do not reformat them.
func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	// batches don't use this but for backcompat the parameter is required
	wo := &pebble.WriteOptions{}
	// Frames 0x3b9e8 through 0x3b9ec (exclusive) and their index markers.
	frame_start, _ := hex.DecodeString("0000000000000003b9e8")
	frame_end, _ := hex.DecodeString("0000000000000003b9ec")
	err := b.DeleteRange(frame_start, frame_end, wo)
	if err != nil {
		return errors.Wrap(err, "frame removal")
	}
	frame_first_index, _ := hex.DecodeString("0010")
	frame_last_index, _ := hex.DecodeString("0020")
	err = b.Delete(frame_first_index, wo)
	if err != nil {
		return errors.Wrap(err, "frame first index removal")
	}
	err = b.Delete(frame_last_index, wo)
	if err != nil {
		return errors.Wrap(err, "frame last index removal")
	}
	// Shard commitment keys 0xe0-0xe3 for the global (all-0xff) shard.
	shard_commits_hex := []string{
		"090000000000000000e0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
		"090000000000000000e1ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
		"090000000000000000e2ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
		"090000000000000000e3ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
	}
	for _, shard_commit_hex := range shard_commits_hex {
		shard_commit, _ := hex.DecodeString(shard_commit_hex)
		err = b.Delete(shard_commit, wo)
		if err != nil {
			return errors.Wrap(err, "shard commit removal")
		}
	}
	// Vertex/hyperedge add tree node keyspaces (prefixes 0x02/0x03).
	vertex_adds_tree_start, _ := hex.DecodeString("0902000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	vertex_adds_tree_end, _ := hex.DecodeString("0902000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	err = b.DeleteRange(vertex_adds_tree_start, vertex_adds_tree_end, wo)
	if err != nil {
		return errors.Wrap(err, "vertex adds tree removal")
	}
	hyperedge_adds_tree_start, _ := hex.DecodeString("0903000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	hyperedge_adds_tree_end, _ := hex.DecodeString("0903000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	err = b.DeleteRange(hyperedge_adds_tree_start, hyperedge_adds_tree_end, wo)
	if err != nil {
		return errors.Wrap(err, "hyperedge adds tree removal")
	}
	// Path-index keyspaces (prefixes 0x22/0x23).
	vertex_adds_by_path_start, _ := hex.DecodeString("0922000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	vertex_adds_by_path_end, _ := hex.DecodeString("0922000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	err = b.DeleteRange(vertex_adds_by_path_start, vertex_adds_by_path_end, wo)
	if err != nil {
		return errors.Wrap(err, "vertex adds by path removal")
	}
	hyperedge_adds_by_path_start, _ := hex.DecodeString("0923000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	hyperedge_adds_by_path_end, _ := hex.DecodeString("0923000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	err = b.DeleteRange(hyperedge_adds_by_path_start, hyperedge_adds_by_path_end, wo)
	if err != nil {
		return errors.Wrap(err, "hyperedge adds by path removal")
	}
	// Change-record keyspaces (prefixes 0x42/0x43).
	vertex_adds_change_record_start, _ := hex.DecodeString("0942000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	vertex_adds_change_record_end, _ := hex.DecodeString("0942000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	hyperedge_adds_change_record_start, _ := hex.DecodeString("0943000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	hyperedge_adds_change_record_end, _ := hex.DecodeString("0943000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	err = b.DeleteRange(vertex_adds_change_record_start, vertex_adds_change_record_end, wo)
	if err != nil {
		return errors.Wrap(err, "vertex adds change record removal")
	}
	err = b.DeleteRange(hyperedge_adds_change_record_start, hyperedge_adds_change_record_end, wo)
	if err != nil {
		return errors.Wrap(err, "hyperedge adds change record removal")
	}
	// Vertex data keyspace (prefix 0xf0).
	vertex_data_start, _ := hex.DecodeString("09f0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	vertex_data_end, _ := hex.DecodeString("09f0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	err = b.DeleteRange(vertex_data_start, vertex_data_end, wo)
	if err != nil {
		return errors.Wrap(err, "vertex data removal")
	}
	// Tree roots (prefixes 0xfc/0xfe).
	vertex_add_root, _ := hex.DecodeString("09fc000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	hyperedge_add_root, _ := hex.DecodeString("09fe000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
	err = b.Delete(vertex_add_root, wo)
	if err != nil {
		return errors.Wrap(err, "vertex add root removal")
	}
	err = b.Delete(hyperedge_add_root, wo)
	if err != nil {
		return errors.Wrap(err, "hyperedge add root removal")
	}
	return nil
}
// migration_2_1_0_5 re-runs the 2.1.0.4 cleanup to catch stores that
// upgraded through intermediate broken states.
func migration_2_1_0_5(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	// We just re-run it again
	return migration_2_1_0_4(b, db, cfg)
}

func migration_2_1_0_8(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	// these migration entries exist solely to advance migration number so all
	// nodes are consistent
	return nil
}

func migration_2_1_0_81(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	// these migration entries exist solely to advance migration number so all
	// nodes are consistent
	return nil
}

func migration_2_1_0_10(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	// these migration entries exist solely to advance migration number so all
	// nodes are consistent
	return nil
}

// migration_2_1_0_11 through migration_2_1_0_1411 below are no-op (or
// delegating) version markers kept only so the stored migration count stays
// aligned across all nodes.
func migration_2_1_0_11(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_14(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_141(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_142(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_143(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_144(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_145(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_146(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_147(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_148(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_14(b, db, cfg)
}

func migration_2_1_0_149(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1410(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_149(b, db, cfg)
}

func migration_2_1_0_1411(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_149(b, db, cfg)
}
// migration_2_1_0_15 through migration_2_1_0_1818 below are no-op (or
// delegating) version markers that exist solely to advance the migration
// number so all nodes stay consistent.
func migration_2_1_0_15(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_151(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_152(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_153(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_154(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_155(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_156(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_157(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_158(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_159(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return migration_2_1_0_15(b, db, cfg)
}

func migration_2_1_0_17(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_171(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_172(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_173(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_181(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_182(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_183(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_184(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_185(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_186(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_187(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_188(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_189(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1810(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1811(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1812(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1813(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1814(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1815(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1817(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

// migration_2_1_0_1818 is intentionally a no-op; the repair it was written
// for runs via doMigration1818, invoked by migrations 2_1_0_1822/1823 below.
func migration_2_1_0_1818(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}
// doMigration1818 performs the actual migration work for migration_2_1_0_1818.
// It uses the sync protocol to repair corrupted tree data by syncing to an
// in-memory instance and back: the global prover shard is streamed into a
// fresh in-memory hypergraph over an in-process gRPC (bufconn) link, the
// on-disk tree keyspaces for that shard are wiped, and the clean copy is
// streamed back. Per-phase sync failures are logged and tolerated; only
// setup/teardown failures abort the migration.
func doMigration1818(db *pebble.DB, cfg *config.Config) error {
	logger := zap.L()
	// Global prover shard key: L1={0,0,0}, L2=0xff*32
	globalShardKey := tries.ShardKey{
		L1: [3]byte{},
		L2: [32]byte{
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		},
	}
	prover := bls48581.NewKZGInclusionProver(logger)
	// Create hypergraph from actual DB
	actualDBWrapper := &PebbleDB{db: db}
	actualStore := NewPebbleHypergraphStore(cfg.DB, actualDBWrapper, logger, nil, prover)
	actualHG, err := actualStore.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "load actual hypergraph")
	}
	actualHGCRDT := actualHG.(*hgcrdt.HypergraphCRDT)
	// Create in-memory pebble DB directly (bypassing NewPebbleDB to avoid cycle)
	memOpts := &pebble.Options{
		MemTableSize:       64 << 20,
		FormatMajorVersion: pebble.FormatNewest,
		FS:                 vfs.NewMem(),
	}
	memDB, err := pebble.Open("", memOpts)
	if err != nil {
		return errors.Wrap(err, "open in-memory pebble")
	}
	defer memDB.Close()
	memDBWrapper := &PebbleDB{db: memDB}
	memStore := NewPebbleHypergraphStore(cfg.DB, memDBWrapper, logger, nil, prover)
	memHG, err := memStore.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "load in-memory hypergraph")
	}
	memHGCRDT := memHG.(*hgcrdt.HypergraphCRDT)
	// Phase 1: Sync from actual DB to in-memory
	// Get the current root from actual DB
	actualRoot := actualHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, false)
	if actualRoot == nil {
		// Nothing stored for the shard; there is nothing to repair.
		logger.Info("migration 1818: no data in global prover shard, skipping")
		return nil
	}
	// Publish snapshot on actual hypergraph
	actualHGCRDT.PublishSnapshot(actualRoot)
	// Set up gRPC server backed by actual hypergraph
	const bufSize = 1 << 20
	actualLis := bufconn.Listen(bufSize)
	actualGRPCServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024),
		grpc.MaxSendMsgSize(100*1024*1024),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(actualGRPCServer, actualHGCRDT)
	go func() { _ = actualGRPCServer.Serve(actualLis) }()
	defer actualGRPCServer.Stop()
	// Create client connection to actual hypergraph server
	actualDialer := func(context.Context, string) (net.Conn, error) {
		return actualLis.Dial()
	}
	// NOTE(review): grpc.DialContext is deprecated in newer grpc-go releases
	// in favor of grpc.NewClient — confirm the pinned grpc version before
	// changing this.
	actualConn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(actualDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(100*1024*1024),
			grpc.MaxCallSendMsgSize(100*1024*1024),
		),
	)
	if err != nil {
		return errors.Wrap(err, "dial actual hypergraph")
	}
	defer actualConn.Close()
	actualClient := protobufs.NewHypergraphComparisonServiceClient(actualConn)
	// Sync from actual to in-memory for all phases
	phases := []protobufs.HypergraphPhaseSet{
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES,
	}
	for _, phase := range phases {
		stream, err := actualClient.PerformSync(context.Background())
		if err != nil {
			return errors.Wrapf(err, "create sync stream for phase %v", phase)
		}
		// A failed phase is logged but not fatal; the roots logged below let
		// operators compare the outcome.
		_, err = memHGCRDT.SyncFrom(stream, globalShardKey, phase, nil)
		if err != nil {
			logger.Warn("sync from actual to memory failed", zap.Error(err), zap.Any("phase", phase))
		}
		_ = stream.CloseSend()
	}
	// Commit in-memory to get root
	memRoot := memHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, false)
	logger.Info("migration 1818: synced to in-memory",
		zap.String("actual_root", hex.EncodeToString(actualRoot)),
		zap.String("mem_root", hex.EncodeToString(memRoot)),
	)
	// Stop the actual server before wiping data
	actualGRPCServer.Stop()
	// NOTE(review): actualConn is closed here and again by the earlier defer;
	// the second close's error is ignored.
	actualConn.Close()
	// Phase 2: Wipe tree data for global prover shard from actual DB
	treePrefixes := []byte{
		VERTEX_ADDS_TREE_NODE,
		VERTEX_REMOVES_TREE_NODE,
		HYPEREDGE_ADDS_TREE_NODE,
		HYPEREDGE_REMOVES_TREE_NODE,
		VERTEX_ADDS_TREE_NODE_BY_PATH,
		VERTEX_REMOVES_TREE_NODE_BY_PATH,
		HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
		HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
		VERTEX_ADDS_CHANGE_RECORD,
		VERTEX_REMOVES_CHANGE_RECORD,
		HYPEREDGE_ADDS_CHANGE_RECORD,
		HYPEREDGE_REMOVES_CHANGE_RECORD,
		VERTEX_ADDS_TREE_ROOT,
		VERTEX_REMOVES_TREE_ROOT,
		HYPEREDGE_ADDS_TREE_ROOT,
		HYPEREDGE_REMOVES_TREE_ROOT,
	}
	for _, prefix := range treePrefixes {
		start, end := shardRangeBounds(prefix, globalShardKey)
		if err := db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true}); err != nil {
			return errors.Wrapf(err, "delete range for prefix 0x%02x", prefix)
		}
	}
	logger.Info("migration 1818: wiped tree data from actual DB")
	// Reload actual hypergraph after wipe
	actualStore2 := NewPebbleHypergraphStore(cfg.DB, actualDBWrapper, logger, nil, prover)
	actualHG2, err := actualStore2.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "reload actual hypergraph after wipe")
	}
	actualHGCRDT2 := actualHG2.(*hgcrdt.HypergraphCRDT)
	// Phase 3: Sync from in-memory back to actual DB
	// Publish snapshot on in-memory hypergraph
	memHGCRDT.PublishSnapshot(memRoot)
	// Set up gRPC server backed by in-memory hypergraph
	memLis := bufconn.Listen(bufSize)
	memGRPCServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024),
		grpc.MaxSendMsgSize(100*1024*1024),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(memGRPCServer, memHGCRDT)
	go func() { _ = memGRPCServer.Serve(memLis) }()
	defer memGRPCServer.Stop()
	// Create client connection to in-memory hypergraph server
	memDialer := func(context.Context, string) (net.Conn, error) {
		return memLis.Dial()
	}
	memConn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(memDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(100*1024*1024),
			grpc.MaxCallSendMsgSize(100*1024*1024),
		),
	)
	if err != nil {
		return errors.Wrap(err, "dial in-memory hypergraph")
	}
	defer memConn.Close()
	memClient := protobufs.NewHypergraphComparisonServiceClient(memConn)
	// Sync from in-memory to actual for all phases
	for _, phase := range phases {
		stream, err := memClient.PerformSync(context.Background())
		if err != nil {
			return errors.Wrapf(err, "create sync stream for phase %v (reverse)", phase)
		}
		_, err = actualHGCRDT2.SyncFrom(stream, globalShardKey, phase, nil)
		if err != nil {
			logger.Warn("sync from memory to actual failed", zap.Error(err), zap.Any("phase", phase))
		}
		_ = stream.CloseSend()
	}
	// Final commit
	finalRoot := actualHGCRDT2.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, true)
	logger.Info("migration 1818: completed",
		zap.String("final_root", hex.EncodeToString(finalRoot)),
	)
	return nil
}
// migration_2_1_0_1819 through migration_2_1_0_1821 are no-op version
// markers kept to advance the migration number consistently across nodes.
func migration_2_1_0_1819(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1820(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

// migration_2_1_0_1821 is intentionally a no-op; the spent-marker cleanup it
// was written for lives in doMigration1821.
func migration_2_1_0_1821(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}
// doMigration1821 performs the actual work for migration_2_1_0_1821.
// It iterates the vertex data for the global intrinsic domain and deletes
// "spent merge marker" entries — trees with a nil root or zero size — from
// the vertex-add set, committing every deletion in a single transaction.
// Individual delete failures are logged and skipped.
func doMigration1821(db *pebble.DB, cfg *config.Config) error {
	logger := zap.L()
	// Global intrinsic address: 32 bytes of 0xff
	globalIntrinsicAddress := [32]byte{
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
	}
	prover := bls48581.NewKZGInclusionProver(logger)
	// Create hypergraph from actual DB
	dbWrapper := &PebbleDB{db: db}
	hgStore := NewPebbleHypergraphStore(cfg.DB, dbWrapper, logger, nil, prover)
	hg, err := hgStore.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "load hypergraph")
	}
	hgCRDT := hg.(*hgcrdt.HypergraphCRDT)
	// Get shard key for the global intrinsic domain
	// L1 is computed from bloom filter indices of the domain
	globalShardKey := tries.ShardKey{
		L1: [3]byte(up2p.GetBloomFilterIndices(globalIntrinsicAddress[:], 256, 3)),
		L2: globalIntrinsicAddress,
	}
	// Create a transaction for the deletions
	// NOTE(review): the transaction is never aborted on the early error paths
	// inside the loop handling below (only commit failure returns) — confirm
	// the store reclaims uncommitted transactions.
	txn, err := hgStore.NewTransaction(false)
	if err != nil {
		return errors.Wrap(err, "create transaction")
	}
	// Get the vertex data iterator for the global intrinsic domain
	iter := hgCRDT.GetVertexDataIterator(globalIntrinsicAddress)
	defer iter.Close()
	deletedCount := 0
	totalCount := 0
	for valid := iter.First(); valid; valid = iter.Next() {
		totalCount++
		tree := iter.Value()
		if tree == nil {
			continue
		}
		// Check if this is an empty tree (spent merge marker)
		// Spent markers have Root == nil or GetSize() == 0
		if tree.Root == nil || tree.GetSize().Sign() == 0 {
			// This is a spent marker - delete it
			// The Key() returns the full 64-byte vertex ID (domain + address)
			key := iter.Key()
			if len(key) < 64 {
				continue
			}
			var vertexID [64]byte
			copy(vertexID[:], key[:64])
			if err := hgCRDT.DeleteVertexAdd(txn, globalShardKey, vertexID); err != nil {
				logger.Warn("failed to delete spent marker",
					zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
					zap.Error(err),
				)
				continue
			}
			deletedCount++
			// Log progress every 1000 deletions
			if deletedCount%1000 == 0 {
				logger.Info("migration 1821: progress",
					zap.Int("deleted", deletedCount),
					zap.Int("examined", totalCount),
				)
			}
		}
	}
	// Commit the transaction
	if err := txn.Commit(); err != nil {
		return errors.Wrap(err, "commit transaction")
	}
	logger.Info("migration 1821: completed",
		zap.Int("deleted_spent_markers", deletedCount),
		zap.Int("total_examined", totalCount),
	)
	return nil
}
// migration_2_1_0_1822 rebuilds the global prover shard tree to fix potential
// corruption from transaction bypass bugs in SaveRoot and Commit. The heavy
// lifting happens in doMigration1818, which runs synchronously during store
// open.
func migration_2_1_0_1822(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return doMigration1818(db, cfg)
}

// migration_2_1_0_1823 rebuilds the global prover shard tree to fix potential
// corruption from transaction bypass bugs in SaveRoot and Commit. It repeats
// the 1822 repair for stores that migrated through a broken intermediate
// release.
func migration_2_1_0_1823(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return doMigration1818(db, cfg)
}
// pebbleBatchDB adapts a *pebble.Batch to the store.KVDB interface so that
// migration code written against store.KVDB can operate within a single
// migration batch. Compaction and Close are deliberately no-ops: the batch
// lifecycle is owned by the migration runner, not by this wrapper.
type pebbleBatchDB struct {
	b *pebble.Batch
}

// Get reads key directly from the wrapped batch.
func (d *pebbleBatchDB) Get(key []byte) ([]byte, io.Closer, error) {
	return d.b.Get(key)
}

// Set writes key/value into the wrapped batch.
func (d *pebbleBatchDB) Set(key, value []byte) error {
	return d.b.Set(key, value, &pebble.WriteOptions{})
}

// Delete removes key from the wrapped batch.
func (d *pebbleBatchDB) Delete(key []byte) error {
	return d.b.Delete(key, &pebble.WriteOptions{})
}

// DeleteRange removes all keys in [start, end) from the wrapped batch.
func (d *pebbleBatchDB) DeleteRange(start, end []byte) error {
	return d.b.DeleteRange(start, end, &pebble.WriteOptions{})
}

// NewBatch returns a transaction view over the same underlying batch.
// Migrations don't need nested transactions, so no new batch is created.
func (d *pebbleBatchDB) NewBatch(indexed bool) store.Transaction {
	return &pebbleBatchTransaction{b: d.b}
}

// NewIter opens an iterator over [lowerBound, upperBound) on the batch.
func (d *pebbleBatchDB) NewIter(lowerBound, upperBound []byte) (
	store.Iterator,
	error,
) {
	opts := &pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	}
	return d.b.NewIter(opts)
}

// Compact is a no-op for batches.
func (d *pebbleBatchDB) Compact(start, end []byte, parallelize bool) error {
	return nil
}

// CompactAll is a no-op for batches.
func (d *pebbleBatchDB) CompactAll() error {
	return nil
}

// Close is a no-op: the migration runner owns and finalizes the batch.
func (d *pebbleBatchDB) Close() error {
	return nil
}

var _ store.KVDB = (*pebbleBatchDB)(nil)
// pebbleBatchTransaction adapts a *pebble.Batch to the store.Transaction
// interface. Commit and Abort are no-ops because the enclosing migration
// batch is committed (or discarded) by its owner, not by this wrapper.
type pebbleBatchTransaction struct {
	b *pebble.Batch
}

// Get reads key directly from the wrapped batch.
func (bt *pebbleBatchTransaction) Get(key []byte) ([]byte, io.Closer, error) {
	return bt.b.Get(key)
}

// Set writes key/value into the wrapped batch.
func (bt *pebbleBatchTransaction) Set(key []byte, value []byte) error {
	return bt.b.Set(key, value, &pebble.WriteOptions{})
}

// Delete removes key from the wrapped batch.
func (bt *pebbleBatchTransaction) Delete(key []byte) error {
	return bt.b.Delete(key, &pebble.WriteOptions{})
}

// DeleteRange removes all keys in [lowerBound, upperBound) from the batch.
func (bt *pebbleBatchTransaction) DeleteRange(
	lowerBound []byte,
	upperBound []byte,
) error {
	return bt.b.DeleteRange(lowerBound, upperBound, &pebble.WriteOptions{})
}

// NewIter opens an iterator over [lowerBound, upperBound) on the batch.
func (bt *pebbleBatchTransaction) NewIter(lowerBound []byte, upperBound []byte) (
	store.Iterator,
	error,
) {
	opts := &pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	}
	return bt.b.NewIter(opts)
}

// Commit is a no-op; the migration batch is committed by its owner.
func (bt *pebbleBatchTransaction) Commit() error {
	return nil
}

// Abort is a no-op; part of a batch cannot be rolled back.
func (bt *pebbleBatchTransaction) Abort() error {
	return nil
}

var _ store.Transaction = (*pebbleBatchTransaction)(nil)
// pebbleSnapshotDB adapts a *pebble.Snapshot to the store.KVDB interface.
// Snapshots are point-in-time, read-only views: every mutating method
// returns an error, and NewBatch hands back a read-only transaction stub.
type pebbleSnapshotDB struct {
	snap *pebble.Snapshot
}

// Get reads key from the snapshot.
func (s *pebbleSnapshotDB) Get(key []byte) ([]byte, io.Closer, error) {
	return s.snap.Get(key)
}

// NewIter opens an iterator over [lowerBound, upperBound) on the snapshot.
func (s *pebbleSnapshotDB) NewIter(lowerBound, upperBound []byte) (
	store.Iterator,
	error,
) {
	opts := &pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	}
	return s.snap.NewIter(opts)
}

// NewBatch returns a stub transaction that rejects every operation.
func (s *pebbleSnapshotDB) NewBatch(indexed bool) store.Transaction {
	return &snapshotTransaction{}
}

// Close releases the snapshot.
func (s *pebbleSnapshotDB) Close() error {
	return s.snap.Close()
}

// Set rejects writes: snapshots are read-only.
func (s *pebbleSnapshotDB) Set(key, value []byte) error {
	return errors.New("pebble snapshot is read-only")
}

// Delete rejects writes: snapshots are read-only.
func (s *pebbleSnapshotDB) Delete(key []byte) error {
	return errors.New("pebble snapshot is read-only")
}

// DeleteRange rejects writes: snapshots are read-only.
func (s *pebbleSnapshotDB) DeleteRange(start, end []byte) error {
	return errors.New("pebble snapshot is read-only")
}

// Compact rejects writes: snapshots are read-only.
func (s *pebbleSnapshotDB) Compact(start, end []byte, parallelize bool) error {
	return errors.New("pebble snapshot is read-only")
}

// CompactAll rejects writes: snapshots are read-only.
func (s *pebbleSnapshotDB) CompactAll() error {
	return errors.New("pebble snapshot is read-only")
}

var _ store.KVDB = (*pebbleSnapshotDB)(nil)
// snapshotTransaction is the store.Transaction stub handed out by
// pebbleSnapshotDB. Every operation other than Abort fails, since a
// snapshot supports neither reads through a transaction nor writes.
type snapshotTransaction struct{}

// Get always fails: use the snapshot itself for reads.
func (st *snapshotTransaction) Get(_ []byte) ([]byte, io.Closer, error) {
	return nil, nil, errors.New("pebble snapshot transaction is read-only")
}

// Set always fails: snapshots cannot be written.
func (st *snapshotTransaction) Set(_ []byte, _ []byte) error {
	return errors.New("pebble snapshot transaction is read-only")
}

// Delete always fails: snapshots cannot be written.
func (st *snapshotTransaction) Delete(_ []byte) error {
	return errors.New("pebble snapshot transaction is read-only")
}

// DeleteRange always fails: snapshots cannot be written.
func (st *snapshotTransaction) DeleteRange(
	_ []byte,
	_ []byte,
) error {
	return errors.New("pebble snapshot transaction is read-only")
}

// NewIter always fails: iterate via the snapshot itself instead.
func (st *snapshotTransaction) NewIter(
	_ []byte,
	_ []byte,
) (store.Iterator, error) {
	return nil, errors.New("pebble snapshot transaction is read-only")
}

// Commit always fails: nothing can have been staged.
func (st *snapshotTransaction) Commit() error {
	return errors.New("pebble snapshot transaction is read-only")
}

// Abort succeeds trivially: no state was accumulated.
func (st *snapshotTransaction) Abort() error {
	return nil
}

var _ store.Transaction = (*snapshotTransaction)(nil)