// Package store provides the Pebble-backed key/value store used by the node,
// including one-time on-disk migrations and legacy-format compatibility.
package store

import (
	"context"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"io"
	"net"
	"os"
	"strings"

	pebblev1 "github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/v2"
	"github.com/cockroachdb/pebble/v2/vfs"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/test/bufconn"
	"source.quilibrium.com/quilibrium/monorepo/bls48581"
	"source.quilibrium.com/quilibrium/monorepo/config"
	hgcrdt "source.quilibrium.com/quilibrium/monorepo/hypergraph"
	"source.quilibrium.com/quilibrium/monorepo/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/types/store"
	"source.quilibrium.com/quilibrium/monorepo/types/tries"
	up2p "source.quilibrium.com/quilibrium/monorepo/utils/p2p"
)

// PebbleDB wraps a Pebble v2 database handle together with the node
// configuration it was opened with.
type PebbleDB struct {
	db     *pebble.DB     // underlying Pebble v2 handle
	config *config.Config // retained for migration logic (e.g. in-memory check)
}

// DB exposes the raw *pebble.DB handle for callers that need direct access.
func (p *PebbleDB) DB() *pebble.DB {
	return p.db
}

// pebbleMigrations contains ordered migration steps. New migrations append to
// the end.
//
// The stored migration version is the index into this slice, so the slice
// must NEVER be reordered or shortened once released — doing so would shift
// the version numbers of already-migrated stores.
//
// NOTE(review): migration_2_1_0_10 and migration_2_1_0_172 each appear twice
// below. This looks like an accidental duplication, but because entries are
// version-counted it cannot be removed without desynchronizing deployed
// stores — confirm intent before ever touching it.
var pebbleMigrations = []func(*pebble.Batch, *pebble.DB, *config.Config) error{
	migration_2_1_0_4,
	migration_2_1_0_5,
	migration_2_1_0_8,
	migration_2_1_0_81,
	migration_2_1_0_10,
	migration_2_1_0_10,
	migration_2_1_0_11,
	migration_2_1_0_14,
	migration_2_1_0_141,
	migration_2_1_0_142,
	migration_2_1_0_143,
	migration_2_1_0_144,
	migration_2_1_0_145,
	migration_2_1_0_146,
	migration_2_1_0_147,
	migration_2_1_0_148,
	migration_2_1_0_149,
	migration_2_1_0_1410,
	migration_2_1_0_1411,
	migration_2_1_0_15,
	migration_2_1_0_151,
	migration_2_1_0_152,
	migration_2_1_0_153,
	migration_2_1_0_154,
	migration_2_1_0_155,
	migration_2_1_0_156,
	migration_2_1_0_157,
	migration_2_1_0_158,
	migration_2_1_0_159,
	migration_2_1_0_17,
	migration_2_1_0_171,
	migration_2_1_0_172,
	migration_2_1_0_172,
	migration_2_1_0_173,
	migration_2_1_0_18,
	migration_2_1_0_181,
	migration_2_1_0_182,
	migration_2_1_0_183,
	migration_2_1_0_184,
	migration_2_1_0_185,
	migration_2_1_0_186,
	migration_2_1_0_187,
	migration_2_1_0_188,
	migration_2_1_0_189,
	migration_2_1_0_1810,
	migration_2_1_0_1811,
	migration_2_1_0_1812,
	migration_2_1_0_1813,
	migration_2_1_0_1814,
	migration_2_1_0_1815,
	migration_2_1_0_1816,
	migration_2_1_0_1817,
	migration_2_1_0_1818,
	migration_2_1_0_1819,
	migration_2_1_0_1820,
	migration_2_1_0_1821,
	migration_2_1_0_1822,
}

// NewPebbleDB opens (creating if necessary) the Pebble store for the given
// core, runs any pending migrations, and returns the wrapped handle.
//
// coreId 0 selects the main store path; coreId > 0 selects a worker path
// from cfg.DB.WorkerPaths or, failing that, cfg.DB.WorkerPathPrefix.
//
// This function terminates the process (os.Exit(1)) on unrecoverable open or
// migration failure rather than returning an error.
func NewPebbleDB(
	logger *zap.Logger,
	cfg *config.Config,
	coreId uint,
) *PebbleDB {
	opts := &pebble.Options{
		MemTableSize:          64 << 20,
		MaxOpenFiles:          1000,
		L0CompactionThreshold: 8,
		L0StopWritesThreshold: 32,
		LBaseMaxBytes:         64 << 20,
		FormatMajorVersion:    pebble.FormatNewest,
	}
	if cfg.DB.InMemoryDONOTUSE {
		// Test-only mode: back the store with an in-memory filesystem.
		opts.FS = vfs.NewMem()
	}
	// Resolve the on-disk path: explicit worker path, then prefix template,
	// then the main store path.
	path := cfg.DB.Path
	if coreId > 0 && len(cfg.DB.WorkerPaths) > int(coreId-1) {
		path = cfg.DB.WorkerPaths[coreId-1]
	} else if coreId > 0 {
		path = fmt.Sprintf(cfg.DB.WorkerPathPrefix, coreId)
	}
	storeType := "store"
	if coreId > 0 {
		storeType = "worker store"
	}
	if _, err := os.Stat(path); os.IsNotExist(err) && !cfg.DB.InMemoryDONOTUSE {
		logger.Warn(
			fmt.Sprintf("%s not found, creating", storeType),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		if err := os.MkdirAll(path, 0755); err != nil {
			logger.Error(
				fmt.Sprintf("%s could not be created, terminating", storeType),
				zap.Error(err),
				zap.String("path", path),
				zap.Uint("core_id", coreId),
			)
			os.Exit(1)
		}
	} else {
		// NOTE(review): this branch also fires for stat errors other than
		// "not exist" and for the in-memory mode; the "found" message may be
		// misleading in those cases — confirm before relying on it.
		logger.Info(
			fmt.Sprintf("%s found", storeType),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
	}
	db, err := pebble.Open(path, opts)
	if err != nil && shouldAttemptLegacyOpen(err, cfg.DB.InMemoryDONOTUSE) {
		// The store was last written by an older Pebble whose format v2 can
		// no longer read directly; an open/close with the legacy release
		// ratchets the format forward, after which v2 can open it.
		logger.Warn(
			fmt.Sprintf(
				"failed to open %s with pebble v2, trying legacy open",
				storeType,
			),
			zap.Error(err),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		if compatErr := ensurePebbleLegacyCompatibility(
			path,
			storeType,
			coreId,
			logger,
		); compatErr == nil {
			logger.Info(
				fmt.Sprintf(
					"legacy pebble open succeeded, retrying %s with pebble v2",
					storeType,
				),
				zap.String("path", path),
				zap.Uint("core_id", coreId),
			)
			db, err = pebble.Open(path, opts)
		} else {
			logger.Error(
				fmt.Sprintf("legacy pebble open failed for %s", storeType),
				zap.Error(compatErr),
				zap.String("path", path),
				zap.Uint("core_id", coreId),
			)
		}
	}
	if err != nil {
		logger.Error(
			fmt.Sprintf("failed to open %s", storeType),
			zap.Error(err),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		os.Exit(1)
	}
	pebbleDB := &PebbleDB{db, cfg}
	if err := pebbleDB.migrate(logger); err != nil {
		logger.Error(
			fmt.Sprintf("failed to migrate %s", storeType),
			zap.Error(err),
			zap.String("path", path),
			zap.Uint("core_id", coreId),
		)
		pebbleDB.Close()
		os.Exit(1)
	}
	return pebbleDB
}

// shouldAttemptLegacyOpen determines whether the error from pebble.Open is due
// to an outdated on-disk format. Only those cases benefit from temporarily
// opening with the legacy Pebble version.
func shouldAttemptLegacyOpen(err error, inMemory bool) bool {
	if err == nil || inMemory {
		return false
	}
	// Pebble reports unsupported format versions in the error text; there is
	// no exported sentinel to match against, so substring matching it is.
	msg := err.Error()
	return strings.Contains(msg, "format major version") &&
		strings.Contains(msg, "no longer supported")
}

// ensurePebbleLegacyCompatibility attempts to open the database with the
// previous Pebble v1.1.5 release. Older stores that have not yet been opened
// by Pebble v2 will be updated during this open/close cycle, allowing the
// subsequent Pebble v2 open to succeed without manual intervention.
func ensurePebbleLegacyCompatibility( path string, storeType string, coreId uint, logger *zap.Logger, ) error { legacyOpts := &pebblev1.Options{ MemTableSize: 64 << 20, MaxOpenFiles: 1000, L0CompactionThreshold: 8, L0StopWritesThreshold: 32, LBaseMaxBytes: 64 << 20, FormatMajorVersion: pebblev1.FormatNewest, } legacyDB, err := pebblev1.Open(path, legacyOpts) if err != nil { return err } if err := legacyDB.Close(); err != nil { return err } logger.Info( fmt.Sprintf("legacy pebble open and close completed for %s", storeType), zap.String("path", path), zap.Uint("core_id", coreId), ) return nil } func (p *PebbleDB) migrate(logger *zap.Logger) error { if p.config.DB.InMemoryDONOTUSE { return nil } currentVersion := uint64(len(pebbleMigrations)) var storedVersion uint64 var foundVersion bool value, closer, err := p.db.Get([]byte{MIGRATION}) switch { case err == pebble.ErrNotFound: // missing version implies zero case err != nil: return errors.Wrap(err, "load migration version") default: foundVersion = true if len(value) != 8 { if closer != nil { _ = closer.Close() } return errors.Errorf( "invalid migration version length: %d", len(value), ) } storedVersion = binary.BigEndian.Uint64(value) if closer != nil { if err := closer.Close(); err != nil { logger.Warn("failed to close migration version reader", zap.Error(err)) } } } if storedVersion > currentVersion { return errors.Errorf( "store migration version %d ahead of binary %d – running a migrated db "+ "with an earlier version can cause irreparable corruption, shutting down", storedVersion, currentVersion, ) } needsUpdate := !foundVersion || storedVersion < currentVersion if !needsUpdate { logger.Info("no pebble store migrations required") return nil } batch := p.db.NewIndexedBatch() for i := int(storedVersion); i < len(pebbleMigrations); i++ { logger.Warn( "performing pebble store migration", zap.Int("from_version", int(storedVersion)), zap.Int("to_version", int(storedVersion+1)), ) if err := pebbleMigrations[i](batch, 
p.db, p.config); err != nil { batch.Close() logger.Error("migration failed", zap.Error(err)) return errors.Wrapf(err, "apply migration %d", i+1) } logger.Info( "migration step completed", zap.Int("from_version", int(storedVersion)), zap.Int("to_version", int(storedVersion+1)), ) } var versionBuf [8]byte binary.BigEndian.PutUint64(versionBuf[:], currentVersion) if err := batch.Set([]byte{MIGRATION}, versionBuf[:], nil); err != nil { batch.Close() return errors.Wrap(err, "set migration version") } if err := batch.Commit(&pebble.WriteOptions{Sync: true}); err != nil { batch.Close() return errors.Wrap(err, "commit migration batch") } if currentVersion != storedVersion { logger.Info( "applied pebble store migrations", zap.Uint64("from_version", storedVersion), zap.Uint64("to_version", currentVersion), ) } else { logger.Info( "initialized pebble store migration version", zap.Uint64("version", currentVersion), ) } return nil } func (p *PebbleDB) Get(key []byte) ([]byte, io.Closer, error) { return p.db.Get(key) } func (p *PebbleDB) Set(key, value []byte) error { return p.db.Set(key, value, &pebble.WriteOptions{Sync: true}) } func (p *PebbleDB) Delete(key []byte) error { return p.db.Delete(key, &pebble.WriteOptions{Sync: true}) } func (p *PebbleDB) NewBatch(indexed bool) store.Transaction { if indexed { return &PebbleTransaction{ b: p.db.NewIndexedBatch(), } } else { return &PebbleTransaction{ b: p.db.NewBatch(), } } } func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) ( store.Iterator, error, ) { return p.db.NewIter(&pebble.IterOptions{ LowerBound: lowerBound, UpperBound: upperBound, }) } func (p *PebbleDB) Compact(start, end []byte, parallelize bool) error { return p.db.Compact(context.TODO(), start, end, parallelize) // return p.db.Compact(start, end, parallelize) } func (p *PebbleDB) Close() error { return p.db.Close() } func (p *PebbleDB) DeleteRange(start, end []byte) error { return p.db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true}) } func (p 
*PebbleDB) CompactAll() error { iter, err := p.db.NewIter(nil) if err != nil { return errors.Wrap(err, "compact all") } var first, last []byte if iter.First() { first = append(first, iter.Key()...) } if iter.Last() { last = append(last, iter.Key()...) } if err := iter.Close(); err != nil { return errors.Wrap(err, "compact all") } if err := p.Compact(first, last, false); err != nil { return errors.Wrap(err, "compact all") } return nil } var _ store.KVDB = (*PebbleDB)(nil) type PebbleTransaction struct { b *pebble.Batch } func (t *PebbleTransaction) Get(key []byte) ([]byte, io.Closer, error) { return t.b.Get(key) } func (t *PebbleTransaction) Set(key []byte, value []byte) error { return t.b.Set(key, value, &pebble.WriteOptions{Sync: true}) } func (t *PebbleTransaction) Commit() error { return t.b.Commit(&pebble.WriteOptions{Sync: true}) } func (t *PebbleTransaction) Delete(key []byte) error { return t.b.Delete(key, &pebble.WriteOptions{Sync: true}) } func (t *PebbleTransaction) Abort() error { return t.b.Close() } func (t *PebbleTransaction) NewIter(lowerBound []byte, upperBound []byte) ( store.Iterator, error, ) { return t.b.NewIter(&pebble.IterOptions{ LowerBound: lowerBound, UpperBound: upperBound, }) } func (t *PebbleTransaction) DeleteRange( lowerBound []byte, upperBound []byte, ) error { return t.b.DeleteRange( lowerBound, upperBound, &pebble.WriteOptions{Sync: true}, ) } var _ store.Transaction = (*PebbleTransaction)(nil) func rightAlign(data []byte, size int) []byte { l := len(data) if l == size { return data } if l > size { return data[l-size:] } pad := make([]byte, size) copy(pad[size-l:], data) return pad } // Resolves all the variations of store issues from any series of upgrade steps // in 2.1.0.1->2.1.0.3 func migration_2_1_0_4(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { // batches don't use this but for backcompat the parameter is required wo := &pebble.WriteOptions{} frame_start, _ := hex.DecodeString("0000000000000003b9e8") frame_end, 
_ := hex.DecodeString("0000000000000003b9ec") err := b.DeleteRange(frame_start, frame_end, wo) if err != nil { return errors.Wrap(err, "frame removal") } frame_first_index, _ := hex.DecodeString("0010") frame_last_index, _ := hex.DecodeString("0020") err = b.Delete(frame_first_index, wo) if err != nil { return errors.Wrap(err, "frame first index removal") } err = b.Delete(frame_last_index, wo) if err != nil { return errors.Wrap(err, "frame last index removal") } shard_commits_hex := []string{ "090000000000000000e0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "090000000000000000e1ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "090000000000000000e2ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "090000000000000000e3ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", } for _, shard_commit_hex := range shard_commits_hex { shard_commit, _ := hex.DecodeString(shard_commit_hex) err = b.Delete(shard_commit, wo) if err != nil { return errors.Wrap(err, "shard commit removal") } } vertex_adds_tree_start, _ := hex.DecodeString("0902000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") vertex_adds_tree_end, _ := hex.DecodeString("0902000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") err = b.DeleteRange(vertex_adds_tree_start, vertex_adds_tree_end, wo) if err != nil { return errors.Wrap(err, "vertex adds tree removal") } hyperedge_adds_tree_start, _ := hex.DecodeString("0903000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") hyperedge_adds_tree_end, _ := hex.DecodeString("0903000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") err = b.DeleteRange(hyperedge_adds_tree_start, hyperedge_adds_tree_end, wo) if err != nil { return errors.Wrap(err, "hyperedge adds tree removal") } 
vertex_adds_by_path_start, _ := hex.DecodeString("0922000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") vertex_adds_by_path_end, _ := hex.DecodeString("0922000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") err = b.DeleteRange(vertex_adds_by_path_start, vertex_adds_by_path_end, wo) if err != nil { return errors.Wrap(err, "vertex adds by path removal") } hyperedge_adds_by_path_start, _ := hex.DecodeString("0923000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") hyperedge_adds_by_path_end, _ := hex.DecodeString("0923000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") err = b.DeleteRange(hyperedge_adds_by_path_start, hyperedge_adds_by_path_end, wo) if err != nil { return errors.Wrap(err, "hyperedge adds by path removal") } vertex_adds_change_record_start, _ := hex.DecodeString("0942000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") vertex_adds_change_record_end, _ := hex.DecodeString("0942000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") hyperedge_adds_change_record_start, _ := hex.DecodeString("0943000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") hyperedge_adds_change_record_end, _ := hex.DecodeString("0943000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") err = b.DeleteRange(vertex_adds_change_record_start, vertex_adds_change_record_end, wo) if err != nil { return errors.Wrap(err, "vertex adds change record removal") } err = b.DeleteRange(hyperedge_adds_change_record_start, hyperedge_adds_change_record_end, wo) if err != nil { return errors.Wrap(err, "hyperedge adds change record removal") } vertex_data_start, _ := hex.DecodeString("09f0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") vertex_data_end, _ := hex.DecodeString("09f0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") err = b.DeleteRange(vertex_data_start, vertex_data_end, wo) if err 
!= nil { return errors.Wrap(err, "vertex data removal") } vertex_add_root, _ := hex.DecodeString("09fc000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") hyperedge_add_root, _ := hex.DecodeString("09fe000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") err = b.Delete(vertex_add_root, wo) if err != nil { return errors.Wrap(err, "vertex add root removal") } err = b.Delete(hyperedge_add_root, wo) if err != nil { return errors.Wrap(err, "hyperedge add root removal") } return nil } func migration_2_1_0_5(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { // We just re-run it again return migration_2_1_0_4(b, db, cfg) } func migration_2_1_0_8(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { // these migration entries exist solely to advance migration number so all // nodes are consistent return nil } func migration_2_1_0_81(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { // these migration entries exist solely to advance migration number so all // nodes are consistent return nil } func migration_2_1_0_10(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { // these migration entries exist solely to advance migration number so all // nodes are consistent return nil } func migration_2_1_0_11(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_14(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_141(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_142(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_143(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_144(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_145(b *pebble.Batch, db 
*pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_146(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_147(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_148(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_14(b, db, cfg) } func migration_2_1_0_149(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_1410(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_149(b, db, cfg) } func migration_2_1_0_1411(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_149(b, db, cfg) } func migration_2_1_0_15(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_151(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_152(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_153(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_154(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_155(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_156(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_157(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_158(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_159(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return 
migration_2_1_0_15(b, db, cfg) } func migration_2_1_0_17(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_171(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_172(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_173(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_18(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_181(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_182(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_183(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_184(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_185(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_186(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_187(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_188(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_189(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_1810(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_1811(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_1812(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_1813(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_1814(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } func migration_2_1_0_1815(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error { return nil } 
func migration_2_1_0_1816(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1817(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

// NOTE(review): this step is a no-op; the heavy lifting lives in
// doMigration1818 below, which is invoked from migration_2_1_0_1822 instead.
func migration_2_1_0_1818(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

// doMigration1818 performs the actual migration work for migration_2_1_0_1818.
// It uses the sync protocol to repair corrupted tree data by syncing to an
// in-memory instance and back.
//
// The flow is: (1) sync the global prover shard from the on-disk hypergraph
// into a fresh in-memory one over an in-process gRPC (bufconn) channel,
// (2) wipe all tree-related key ranges for that shard from the on-disk DB,
// then (3) sync the data back from memory into a freshly reloaded on-disk
// hypergraph and commit. Per-phase sync failures are logged and skipped
// rather than aborting the whole migration.
func doMigration1818(db *pebble.DB, cfg *config.Config) error {
	logger := zap.L()

	// Global prover shard key: L1={0,0,0}, L2=0xff*32
	globalShardKey := tries.ShardKey{
		L1: [3]byte{},
		L2: [32]byte{
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
			0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		},
	}

	prover := bls48581.NewKZGInclusionProver(logger)

	// Create hypergraph from actual DB
	actualDBWrapper := &PebbleDB{db: db}
	actualStore := NewPebbleHypergraphStore(cfg.DB, actualDBWrapper, logger, nil, prover)
	actualHG, err := actualStore.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "load actual hypergraph")
	}
	actualHGCRDT := actualHG.(*hgcrdt.HypergraphCRDT)

	// Create in-memory pebble DB directly (bypassing NewPebbleDB to avoid cycle)
	memOpts := &pebble.Options{
		MemTableSize:       64 << 20,
		FormatMajorVersion: pebble.FormatNewest,
		FS:                 vfs.NewMem(),
	}
	memDB, err := pebble.Open("", memOpts)
	if err != nil {
		return errors.Wrap(err, "open in-memory pebble")
	}
	defer memDB.Close()

	memDBWrapper := &PebbleDB{db: memDB}
	memStore := NewPebbleHypergraphStore(cfg.DB, memDBWrapper, logger, nil, prover)
	memHG, err := memStore.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "load in-memory hypergraph")
	}
	memHGCRDT := memHG.(*hgcrdt.HypergraphCRDT)

	// Phase 1: Sync from actual DB to in-memory
	// Get the current root from actual DB
	actualRoot := actualHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, false)
	if actualRoot == nil {
		logger.Info("migration 1818: no data in global prover shard, skipping")
		return nil
	}

	// Publish snapshot on actual hypergraph
	actualHGCRDT.PublishSnapshot(actualRoot)

	// Set up gRPC server backed by actual hypergraph
	const bufSize = 1 << 20
	actualLis := bufconn.Listen(bufSize)
	actualGRPCServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024),
		grpc.MaxSendMsgSize(100*1024*1024),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(actualGRPCServer, actualHGCRDT)
	go func() { _ = actualGRPCServer.Serve(actualLis) }()
	defer actualGRPCServer.Stop()

	// Create client connection to actual hypergraph server
	actualDialer := func(context.Context, string) (net.Conn, error) {
		return actualLis.Dial()
	}
	actualConn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(actualDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(100*1024*1024),
			grpc.MaxCallSendMsgSize(100*1024*1024),
		),
	)
	if err != nil {
		return errors.Wrap(err, "dial actual hypergraph")
	}
	defer actualConn.Close()
	actualClient := protobufs.NewHypergraphComparisonServiceClient(actualConn)

	// Sync from actual to in-memory for all phases
	phases := []protobufs.HypergraphPhaseSet{
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_ADDS,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_VERTEX_REMOVES,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_ADDS,
		protobufs.HypergraphPhaseSet_HYPERGRAPH_PHASE_SET_HYPEREDGE_REMOVES,
	}
	for _, phase := range phases {
		stream, err := actualClient.PerformSync(context.Background())
		if err != nil {
			return errors.Wrapf(err, "create sync stream for phase %v", phase)
		}
		_, err = memHGCRDT.SyncFrom(stream, globalShardKey, phase, nil)
		if err != nil {
			// Best-effort: a failed phase is logged and skipped, not fatal.
			logger.Warn("sync from actual to memory failed", zap.Error(err), zap.Any("phase", phase))
		}
		_ = stream.CloseSend()
	}

	// Commit in-memory to get root
	memRoot := memHGCRDT.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, false)
	logger.Info("migration 1818: synced to in-memory",
		zap.String("actual_root", hex.EncodeToString(actualRoot)),
		zap.String("mem_root", hex.EncodeToString(memRoot)),
	)

	// Stop the actual server before wiping data
	actualGRPCServer.Stop()
	actualConn.Close()

	// Phase 2: Wipe tree data for global prover shard from actual DB
	treePrefixes := []byte{
		VERTEX_ADDS_TREE_NODE,
		VERTEX_REMOVES_TREE_NODE,
		HYPEREDGE_ADDS_TREE_NODE,
		HYPEREDGE_REMOVES_TREE_NODE,
		VERTEX_ADDS_TREE_NODE_BY_PATH,
		VERTEX_REMOVES_TREE_NODE_BY_PATH,
		HYPEREDGE_ADDS_TREE_NODE_BY_PATH,
		HYPEREDGE_REMOVES_TREE_NODE_BY_PATH,
		VERTEX_ADDS_CHANGE_RECORD,
		VERTEX_REMOVES_CHANGE_RECORD,
		HYPEREDGE_ADDS_CHANGE_RECORD,
		HYPEREDGE_REMOVES_CHANGE_RECORD,
		VERTEX_ADDS_TREE_ROOT,
		VERTEX_REMOVES_TREE_ROOT,
		HYPEREDGE_ADDS_TREE_ROOT,
		HYPEREDGE_REMOVES_TREE_ROOT,
	}
	for _, prefix := range treePrefixes {
		start, end := shardRangeBounds(prefix, globalShardKey)
		// Writes go straight to the DB (not the migration batch) because the
		// subsequent reload must observe the wiped state immediately.
		if err := db.DeleteRange(start, end, &pebble.WriteOptions{Sync: true}); err != nil {
			return errors.Wrapf(err, "delete range for prefix 0x%02x", prefix)
		}
	}
	logger.Info("migration 1818: wiped tree data from actual DB")

	// Reload actual hypergraph after wipe
	actualStore2 := NewPebbleHypergraphStore(cfg.DB, actualDBWrapper, logger, nil, prover)
	actualHG2, err := actualStore2.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "reload actual hypergraph after wipe")
	}
	actualHGCRDT2 := actualHG2.(*hgcrdt.HypergraphCRDT)

	// Phase 3: Sync from in-memory back to actual DB
	// Publish snapshot on in-memory hypergraph
	memHGCRDT.PublishSnapshot(memRoot)

	// Set up gRPC server backed by in-memory hypergraph
	memLis := bufconn.Listen(bufSize)
	memGRPCServer := grpc.NewServer(
		grpc.MaxRecvMsgSize(100*1024*1024),
		grpc.MaxSendMsgSize(100*1024*1024),
	)
	protobufs.RegisterHypergraphComparisonServiceServer(memGRPCServer, memHGCRDT)
	go func() { _ = memGRPCServer.Serve(memLis) }()
	defer memGRPCServer.Stop()

	// Create client connection to in-memory hypergraph server
	memDialer := func(context.Context, string) (net.Conn, error) {
		return memLis.Dial()
	}
	memConn, err := grpc.DialContext(
		context.Background(),
		"bufnet",
		grpc.WithContextDialer(memDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(100*1024*1024),
			grpc.MaxCallSendMsgSize(100*1024*1024),
		),
	)
	if err != nil {
		return errors.Wrap(err, "dial in-memory hypergraph")
	}
	defer memConn.Close()
	memClient := protobufs.NewHypergraphComparisonServiceClient(memConn)

	// Sync from in-memory to actual for all phases
	for _, phase := range phases {
		stream, err := memClient.PerformSync(context.Background())
		if err != nil {
			return errors.Wrapf(err, "create sync stream for phase %v (reverse)", phase)
		}
		_, err = actualHGCRDT2.SyncFrom(stream, globalShardKey, phase, nil)
		if err != nil {
			logger.Warn("sync from memory to actual failed", zap.Error(err), zap.Any("phase", phase))
		}
		_ = stream.CloseSend()
	}

	// Final commit
	finalRoot := actualHGCRDT2.GetVertexAddsSet(globalShardKey).GetTree().Commit(nil, true)
	logger.Info("migration 1818: completed",
		zap.String("final_root", hex.EncodeToString(finalRoot)),
	)

	return nil
}

func migration_2_1_0_1819(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

func migration_2_1_0_1820(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

// NOTE(review): no-op step; doMigration1821 below appears to be its intended
// body but is not invoked from here — confirm whether that is deliberate.
func migration_2_1_0_1821(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return nil
}

// doMigration1821 performs the actual work for migration_2_1_0_1821.
// doMigration1821 scans the global intrinsic domain for "spent merge marker"
// vertices (empty trees) and deletes them from the vertex-adds set in a
// single transaction. Per-vertex delete failures are logged and skipped.
func doMigration1821(db *pebble.DB, cfg *config.Config) error {
	logger := zap.L()

	// Global intrinsic address: 32 bytes of 0xff
	globalIntrinsicAddress := [32]byte{
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
	}

	prover := bls48581.NewKZGInclusionProver(logger)

	// Create hypergraph from actual DB
	dbWrapper := &PebbleDB{db: db}
	hgStore := NewPebbleHypergraphStore(cfg.DB, dbWrapper, logger, nil, prover)
	hg, err := hgStore.LoadHypergraph(nil, 0)
	if err != nil {
		return errors.Wrap(err, "load hypergraph")
	}
	hgCRDT := hg.(*hgcrdt.HypergraphCRDT)

	// Get shard key for the global intrinsic domain
	// L1 is computed from bloom filter indices of the domain
	globalShardKey := tries.ShardKey{
		L1: [3]byte(up2p.GetBloomFilterIndices(globalIntrinsicAddress[:], 256, 3)),
		L2: globalIntrinsicAddress,
	}

	// Create a transaction for the deletions
	txn, err := hgStore.NewTransaction(false)
	if err != nil {
		return errors.Wrap(err, "create transaction")
	}

	// Get the vertex data iterator for the global intrinsic domain
	iter := hgCRDT.GetVertexDataIterator(globalIntrinsicAddress)
	defer iter.Close()

	deletedCount := 0
	totalCount := 0

	for valid := iter.First(); valid; valid = iter.Next() {
		totalCount++
		tree := iter.Value()
		if tree == nil {
			continue
		}

		// Check if this is an empty tree (spent merge marker)
		// Spent markers have Root == nil or GetSize() == 0
		if tree.Root == nil || tree.GetSize().Sign() == 0 {
			// This is a spent marker - delete it
			// The Key() returns the full 64-byte vertex ID (domain + address)
			key := iter.Key()
			if len(key) < 64 {
				continue
			}
			var vertexID [64]byte
			copy(vertexID[:], key[:64])

			if err := hgCRDT.DeleteVertexAdd(txn, globalShardKey, vertexID); err != nil {
				logger.Warn("failed to delete spent marker",
					zap.String("vertex_id", hex.EncodeToString(vertexID[:])),
					zap.Error(err),
				)
				continue
			}
			deletedCount++

			// Log progress every 1000 deletions
			if deletedCount%1000 == 0 {
				logger.Info("migration 1821: progress",
					zap.Int("deleted", deletedCount),
					zap.Int("examined", totalCount),
				)
			}
		}
	}

	// Commit the transaction
	if err := txn.Commit(); err != nil {
		return errors.Wrap(err, "commit transaction")
	}

	logger.Info("migration 1821: completed",
		zap.Int("deleted_spent_markers", deletedCount),
		zap.Int("total_examined", totalCount),
	)

	return nil
}

// migration_2_1_0_1822 rebuilds the global prover shard tree to fix potential
// corruption from transaction bypass bugs in SaveRoot and Commit.
func migration_2_1_0_1822(b *pebble.Batch, db *pebble.DB, cfg *config.Config) error {
	return doMigration1818(db, cfg)
}

// pebbleBatchDB wraps a *pebble.Batch to implement store.KVDB for use in migrations
type pebbleBatchDB struct {
	b *pebble.Batch
}

// Get reads key from the batch's indexed view.
func (p *pebbleBatchDB) Get(key []byte) ([]byte, io.Closer, error) {
	return p.b.Get(key)
}

// Set stages key=value; durability is decided by the outer batch's Commit.
func (p *pebbleBatchDB) Set(key, value []byte) error {
	return p.b.Set(key, value, &pebble.WriteOptions{})
}

// Delete stages removal of key.
func (p *pebbleBatchDB) Delete(key []byte) error {
	return p.b.Delete(key, &pebble.WriteOptions{})
}

func (p *pebbleBatchDB) NewBatch(indexed bool) store.Transaction {
	// Migrations don't need nested transactions; return a wrapper around the same
	// batch
	return &pebbleBatchTransaction{b: p.b}
}

// NewIter iterates [lowerBound, upperBound) over the batch's view.
func (p *pebbleBatchDB) NewIter(lowerBound []byte, upperBound []byte) (
	store.Iterator,
	error,
) {
	return p.b.NewIter(&pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	})
}

func (p *pebbleBatchDB) Compact(start, end []byte, parallelize bool) error {
	return nil // No-op for batch
}

func (p *pebbleBatchDB) Close() error {
	return nil // Don't close the batch here
}

// DeleteRange stages removal of all keys in [start, end).
func (p *pebbleBatchDB) DeleteRange(start, end []byte) error {
	return p.b.DeleteRange(start, end, &pebble.WriteOptions{})
}

func (p *pebbleBatchDB) CompactAll() error {
	return nil // No-op for batch
}

var _ store.KVDB = (*pebbleBatchDB)(nil)

// pebbleBatchTransaction wraps a *pebble.Batch to implement store.Transaction.
// Commit and Abort are deliberate no-ops: the enclosing migration batch owns
// the actual commit/close lifecycle.
type pebbleBatchTransaction struct {
	b *pebble.Batch
}

func (t *pebbleBatchTransaction) Get(key []byte) ([]byte, io.Closer, error) {
	return t.b.Get(key)
}

func (t *pebbleBatchTransaction) Set(key []byte, value []byte) error {
	return t.b.Set(key, value, &pebble.WriteOptions{})
}

func (t *pebbleBatchTransaction) Commit() error {
	return nil // Don't commit; the migration batch handles this
}

func (t *pebbleBatchTransaction) Delete(key []byte) error {
	return t.b.Delete(key, &pebble.WriteOptions{})
}

func (t *pebbleBatchTransaction) Abort() error {
	return nil // Can't abort part of a batch
}

func (t *pebbleBatchTransaction) NewIter(lowerBound []byte, upperBound []byte) (
	store.Iterator,
	error,
) {
	return t.b.NewIter(&pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	})
}

func (t *pebbleBatchTransaction) DeleteRange(
	lowerBound []byte,
	upperBound []byte,
) error {
	return t.b.DeleteRange(lowerBound, upperBound, &pebble.WriteOptions{})
}

var _ store.Transaction = (*pebbleBatchTransaction)(nil)

// pebbleSnapshotDB adapts a read-only *pebble.Snapshot to store.KVDB; every
// mutating method returns an error.
type pebbleSnapshotDB struct {
	snap *pebble.Snapshot
}

func (p *pebbleSnapshotDB) Get(key []byte) ([]byte, io.Closer, error) {
	return p.snap.Get(key)
}

func (p *pebbleSnapshotDB) Set(key, value []byte) error {
	return errors.New("pebble snapshot is read-only")
}

func (p *pebbleSnapshotDB) Delete(key []byte) error {
	return errors.New("pebble snapshot is read-only")
}

func (p *pebbleSnapshotDB) NewBatch(indexed bool) store.Transaction {
	return &snapshotTransaction{}
}

func (p *pebbleSnapshotDB) NewIter(lowerBound []byte, upperBound []byte) (
	store.Iterator,
	error,
) {
	return p.snap.NewIter(&pebble.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	})
}

func (p *pebbleSnapshotDB) Compact(start, end []byte, parallelize bool) error {
	return errors.New("pebble snapshot is read-only")
}

// Close releases the snapshot.
func (p *pebbleSnapshotDB) Close() error {
	return p.snap.Close()
}

func (p *pebbleSnapshotDB) DeleteRange(start, end []byte) error {
	return errors.New("pebble snapshot is read-only")
}

func (p *pebbleSnapshotDB) CompactAll() error {
	return errors.New("pebble snapshot is read-only")
}

var _ store.KVDB = (*pebbleSnapshotDB)(nil)

// snapshotTransaction is the always-failing transaction handed out by
// pebbleSnapshotDB.NewBatch; only Abort succeeds.
type snapshotTransaction struct{}

func (s *snapshotTransaction) Get(key []byte) ([]byte, io.Closer, error) {
	return nil, nil, errors.New("pebble snapshot transaction is read-only")
}

func (s *snapshotTransaction) Set(key []byte, value []byte) error {
	return errors.New("pebble snapshot transaction is read-only")
}

func (s *snapshotTransaction) Commit() error {
	return errors.New("pebble snapshot transaction is read-only")
}

func (s *snapshotTransaction) Delete(key []byte) error {
	return errors.New("pebble snapshot transaction is read-only")
}

func (s *snapshotTransaction) Abort() error {
	return nil
}

func (s *snapshotTransaction) NewIter(
	lowerBound []byte,
	upperBound []byte,
) (store.Iterator, error) {
	return nil, errors.New("pebble snapshot transaction is read-only")
}

func (s *snapshotTransaction) DeleteRange(
	lowerBound []byte,
	upperBound []byte,
) error {
	return errors.New("pebble snapshot transaction is read-only")
}

var _ store.Transaction = (*snapshotTransaction)(nil)