From f1ceb5d18993433aae3bc724ccae286a717b88e0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 11 May 2025 02:27:04 -0300 Subject: [PATCH] mdbx --- node/app/wire.go | 4 +- node/app/wire_gen.go | 34 +- node/consensus/data/frame_importer.go | 2 +- node/main.go | 2 +- node/store/mdbx.go | 475 ++++++++++++++++++++++++++ 5 files changed, 496 insertions(+), 21 deletions(-) create mode 100644 node/store/mdbx.go diff --git a/node/app/wire.go b/node/app/wire.go index c7a8d4f..e6c5957 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -52,8 +52,8 @@ var keyManagerSet = wire.NewSet( var storeSet = wire.NewSet( wire.FieldsOf(new(*config.Config), "DB"), - store.NewPebbleDB, - wire.Bind(new(store.KVDB), new(*store.PebbleDB)), + store.NewMDBXDB, + wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index d357b7f..0d9f545 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -37,24 +37,24 @@ func NewDHTNode(configConfig *config.Config) (*DHTNode, error) { func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := debugLogger() dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) - node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB) + node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) if err != nil { return nil, err } @@ -64,24 +64,24 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := logger() dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) - node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB) + node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) if err != nil { return nil, err } @@ -98,9 +98,9 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) { func NewClockStore(configConfig *config.Config) (store.ClockStore, error) { dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) + mdbxdb := store.NewMDBXDB(dbConfig) zapLogger := logger() - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) return pebbleClockStore, nil } @@ -134,7 +134,7 @@ var debugLoggerSet = wire.NewSet( var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager))) -var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, wire.Bind(new(store.KVDB), new(*store.PebbleDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) +var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager))) diff --git a/node/consensus/data/frame_importer.go b/node/consensus/data/frame_importer.go index 7f7b1bb..a3a7435 100644 --- a/node/consensus/data/frame_importer.go +++ b/node/consensus/data/frame_importer.go @@ -213,7 +213,7 @@ func (e *DataClockConsensusEngine) applySnapshot( return nil } - temporaryStore := store.NewPebbleDB(&config.DBConfig{ + temporaryStore := store.NewMDBXDB(&config.DBConfig{ Path: snapshotDBPath, }) temporaryClockStore := store.NewPebbleClockStore(temporaryStore, e.logger) diff --git a/node/main.go b/node/main.go index 56512f5..0bc5a87 100644 --- a/node/main.go +++ b/node/main.go @@ -329,7 +329,7 @@ func main() { } if *compactDB && *core == 0 { - db := store.NewPebbleDB(nodeConfig.DB) + db := store.NewMDBXDB(nodeConfig.DB) if err := db.CompactAll(); err != nil { panic(err) } diff --git a/node/store/mdbx.go b/node/store/mdbx.go new file mode 100644 index 0000000..179b1a1 --- /dev/null +++ b/node/store/mdbx.go @@ -0,0 +1,475 @@ +package store + +import ( + "bytes" + "io" + "os" + + "github.com/cockroachdb/pebble" + "github.com/erigontech/mdbx-go/mdbx" + "source.quilibrium.com/quilibrium/monorepo/node/config" +) + +type MDBXDB struct { + env *mdbx.Env + dbi mdbx.DBI +} + +const GB = 1 << 30 +const MB = 1 << 20 + +// erigon defaults, should be tuned to quil +const READERS_LIMIT = 32_000 +const RP_AUGMENT_LIMIT = 1_000_000 +const MAP_SIZE = 16 * GB +const GROWTH_STEP = 2 * MB +const PAGE_SIZE = 4096 + +const DEFAULT_TABLE = "default" // we use only one for now + +func NewMDBXDB(config *config.DBConfig) *MDBXDB { + env, err := mdbx.NewEnv(mdbx.Default) + if err != nil { + panic(err) + } + + // Configs + if err = env.SetOption(mdbx.OptMaxDB, 200); err != nil { + panic(err) + } + if err = env.SetOption(mdbx.OptMaxReaders, READERS_LIMIT); err != nil { + panic(err) + } + if err = env.SetOption(mdbx.OptRpAugmentLimit, RP_AUGMENT_LIMIT); err != nil { + panic(err) + } + os.MkdirAll(config.Path, os.ModePerm) + if err = env.SetGeometry(-1, -1, int(MAP_SIZE), int(GROWTH_STEP), -1, int(PAGE_SIZE)); err != nil { + panic(err) + } + + // Open the environment + flags := uint(mdbx.NoReadahead) | mdbx.Durable + if err := env.Open(config.Path, flags, 0664); err != nil { + panic(err) + } + + // Open the database (we use a single one called default, but this should change later) + var dbi mdbx.DBI + env.Update(func(txn *mdbx.Txn) error { + dbi, err = txn.OpenDBI(DEFAULT_TABLE, mdbx.Create, nil, nil) + if err != nil { + return err + } + return nil + }) + + return &MDBXDB{env: env, dbi: dbi} +} + +// KVDB interface implementation +func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) { + var result []byte + var err error + err = m.env.View(func(txn *mdbx.Txn) error { + val, err := txn.Get(m.dbi, key) + if err != nil { + if mdbx.IsNotFound(err) { + return pebble.ErrNotFound + } + return err + } + result = make([]byte, len(val)) + copy(result, val) + return err + }) + + // no closer, the transaction is already closed + return result, noopClose{}, err +} + +func (m *MDBXDB) Set(key, value []byte) error { + return m.env.Update(func(txn *mdbx.Txn) error { + return txn.Put(m.dbi, key, value, mdbx.Upsert) + }) +} + +func (m *MDBXDB) Delete(key []byte) error { + return m.env.Update(func(txn *mdbx.Txn) error { + return txn.Del(m.dbi, key, nil) + }) +} + +func (m *MDBXDB) NewBatch(indexed bool) Transaction { + // MDBX doesn't have a direct equivalent to Pebble's indexed batch + // We'll use a regular transaction for both cases + txn, err := m.env.BeginTxn(nil, 0) + if err != nil { + panic(err) + } + + return &MDBXTransaction{ + txn: txn, + dbi: m.dbi, + } +} + +func (m *MDBXDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + return m.NewIterCustom(lowerBound, upperBound) +} + +func (m *MDBXDB) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) { + txn, err := m.env.BeginTxn(nil, mdbx.Readonly) + if err != nil { + return nil, err + } + + cursor, err := txn.OpenCursor(m.dbi) + if err != nil { + txn.Abort() + return nil, err + } + + return &MDBXIterator{ + txn: txn, + cursor: cursor, + lowerBound: lowerBound, + upperBound: upperBound, + valid: false, + }, nil +} + +func (m *MDBXDB) Compact(start, end []byte, parallelize bool) error { + // MDBX handles compaction differently than Pebble + // We can use env.Copy2fd to create a compacted copy, but for now + // we'll just return nil as MDBX handles this internally + return nil +} + +func (m *MDBXDB) CompactAll() error { + // MDBX handles compaction differently than Pebble + // For now, we'll just return nil as MDBX handles this internally + return nil +} + +func (m *MDBXDB) Close() error { + m.env.Close() + return nil +} + +func (m *MDBXDB) DeleteRange(start, end []byte) error { + iter, err := m.NewIterCustom(start, end) + defer iter.Close() + if err != nil { + return err + } + for iter.First(); iter.Valid(); iter.Next() { + err = iter.cursor.Del(mdbx.Current) + if err != nil { + return err + } + } + return nil +} + +// Ensure MDBXDB implements KVDB interface +var _ KVDB = (*MDBXDB)(nil) + +// Transaction implementation +type MDBXTransaction struct { + txn *mdbx.Txn + dbi mdbx.DBI +} + +func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) { + val, err := t.txn.Get(t.dbi, key) + if err != nil { + if mdbx.IsNotFound(err) { + return nil, nil, pebble.ErrNotFound + } + return nil, nil, err + } + + // Copy the value since it's only valid during the transaction + result := make([]byte, len(val)) + copy(result, val) + + return result, io.NopCloser(nil), nil +} + +func (t *MDBXTransaction) Set(key []byte, value []byte) error { + return t.txn.Put(t.dbi, key, value, 0) +} + +func (t *MDBXTransaction) Commit() error { + // we drop latency here, but it should be saved as metric + _, err := t.txn.Commit() + return err +} + +func (t *MDBXTransaction) Delete(key []byte) error { + return t.txn.Del(t.dbi, key, nil) +} + +func (t *MDBXTransaction) Abort() error { + t.txn.Abort() + return nil +} + +func (t *MDBXTransaction) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + return t.NewIterCustom(lowerBound, upperBound) +} + +func (t *MDBXTransaction) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) { + cursor, err := t.txn.OpenCursor(t.dbi) + if err != nil { + return nil, err + } + + var key, value []byte + if lowerBound == nil { + key, value, err = cursor.Get(nil, nil, mdbx.First) + } else { + key, value, err = cursor.Get(lowerBound, nil, mdbx.SetRange) + } + if err != nil { + return nil, err + } + + return &MDBXIterator{ + txn: t.txn, + cursor: cursor, + lowerBound: lowerBound, + upperBound: upperBound, + valid: upperBound == nil || bytes.Compare(key, upperBound) < 0, + key: key, + value: value, + }, nil +} + +func (t *MDBXTransaction) DeleteRange(lowerBound []byte, upperBound []byte) error { + iter, err := t.NewIterCustom(lowerBound, upperBound) + defer iter.Close() + if err != nil { + return err + } + for iter.First(); iter.Valid(); iter.Next() { + err = iter.cursor.Del(mdbx.Current) + if err != nil { + return err + } + } + return nil +} + +// Ensure MDBXTransaction implements Transaction interface +var _ Transaction = (*MDBXTransaction)(nil) + +// Iterator implementation +type MDBXIterator struct { + txn *mdbx.Txn + cursor *mdbx.Cursor + lowerBound []byte + upperBound []byte + key []byte + value []byte + valid bool +} + +func (i *MDBXIterator) Key() []byte { + if !i.valid { + return nil + } + return i.key +} + +func (i *MDBXIterator) First() bool { + var err error + var k, v []byte + + if i.lowerBound == nil { + k, v, err = i.cursor.Get(nil, nil, mdbx.First) + } else { + k, v, err = i.cursor.Get(i.lowerBound, nil, mdbx.SetRange) + } + + if err != nil { + i.valid = false + return false + } + + // Check if key is within upper bound + if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +func (i *MDBXIterator) Next() bool { + if !i.valid { + return false + } + + k, v, err := i.cursor.Get(nil, nil, mdbx.Next) + if err != nil { + i.valid = false + return false + } + + // Check if key is within upper bound + if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + return true +} + +func (i *MDBXIterator) Prev() bool { + if !i.valid { + return false + } + + k, v, err := i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + return true +} + +func (i *MDBXIterator) Valid() bool { + return i.valid +} + +func (i *MDBXIterator) Value() []byte { + if !i.valid { + return nil + } + return i.value +} + +func (i *MDBXIterator) Close() error { + i.valid = false + i.cursor.Close() + i.txn.Abort() + return nil +} + +func (i *MDBXIterator) SeekLT(target []byte) bool { + if target == nil { + return i.Last() + } + + // First try to find the exact key + k, v, err := i.cursor.Get(target, nil, mdbx.SetRange) + if err != nil { + // If not found, try to find the last key + return i.Last() + } + + // If we found the exact key, move to the previous one + if bytes.Compare(k, target) == 0 { + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } else { + // If we found a key greater than target, move to the previous one + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +func (i *MDBXIterator) Last() bool { + var err error + var k, v []byte + + if i.upperBound == nil { + k, v, err = i.cursor.Get(nil, nil, mdbx.Last) + } else { + // Position at or before upper bound + k, v, err = i.cursor.Get(i.upperBound, nil, mdbx.SetRange) + if err != nil { + // If upper bound is beyond last key, get the last key + k, v, err = i.cursor.Get(nil, nil, mdbx.Last) + if err != nil { + i.valid = false + return false + } + } else if bytes.Compare(k, i.upperBound) >= 0 { + // If we found the upper bound or beyond, move to previous + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } + } + + if err != nil { + i.valid = false + return false + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +// Ensure MDBXIterator implements Iterator interface +var _ Iterator = (*MDBXIterator)(nil) + +// Helper for closing transactions +type txnCloser struct { + txn *mdbx.Txn +} + +func (c txnCloser) Close() error { + c.txn.Abort() + return nil +} + +type noopClose struct{} + +func (n noopClose) Close() error { + return nil +}