This commit is contained in:
Victor Shyba 2025-05-11 02:27:04 -03:00
parent c3ebffc519
commit f1ceb5d189
5 changed files with 496 additions and 21 deletions

View File

@ -52,8 +52,8 @@ var keyManagerSet = wire.NewSet(
var storeSet = wire.NewSet(
wire.FieldsOf(new(*config.Config), "DB"),
store.NewPebbleDB,
wire.Bind(new(store.KVDB), new(*store.PebbleDB)),
store.NewMDBXDB,
wire.Bind(new(store.KVDB), new(*store.MDBXDB)),
store.NewPebbleClockStore,
store.NewPebbleCoinStore,
store.NewPebbleKeyStore,

View File

@ -37,24 +37,24 @@ func NewDHTNode(configConfig *config.Config) (*DHTNode, error) {
func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) {
zapLogger := debugLogger()
dbConfig := configConfig.DB
pebbleDB := store.NewPebbleDB(dbConfig)
pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger)
pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger)
mdbxdb := store.NewMDBXDB(dbConfig)
pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger)
pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger)
keyConfig := configConfig.Key
fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger)
p2PConfig := configConfig.P2P
blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger)
frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger)
kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger)
pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger)
pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger)
engineConfig := configConfig.Engine
masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver)
inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger)
pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger)
pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger)
tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport)
masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport)
node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB)
node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb)
if err != nil {
return nil, err
}
@ -64,24 +64,24 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes
func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) {
zapLogger := logger()
dbConfig := configConfig.DB
pebbleDB := store.NewPebbleDB(dbConfig)
pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger)
pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger)
mdbxdb := store.NewMDBXDB(dbConfig)
pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger)
pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger)
keyConfig := configConfig.Key
fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger)
p2PConfig := configConfig.P2P
blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger)
frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger)
kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger)
pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger)
pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger)
engineConfig := configConfig.Engine
masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver)
inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger)
pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger)
pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger)
tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport)
masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport)
node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB)
node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb)
if err != nil {
return nil, err
}
@ -98,9 +98,9 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) {
func NewClockStore(configConfig *config.Config) (store.ClockStore, error) {
dbConfig := configConfig.DB
pebbleDB := store.NewPebbleDB(dbConfig)
mdbxdb := store.NewMDBXDB(dbConfig)
zapLogger := logger()
pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger)
return pebbleClockStore, nil
}
@ -134,7 +134,7 @@ var debugLoggerSet = wire.NewSet(
var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)))
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, wire.Bind(new(store.KVDB), new(*store.PebbleDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore)))
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore)))
var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)))

View File

@ -213,7 +213,7 @@ func (e *DataClockConsensusEngine) applySnapshot(
return nil
}
temporaryStore := store.NewPebbleDB(&config.DBConfig{
temporaryStore := store.NewMDBXDB(&config.DBConfig{
Path: snapshotDBPath,
})
temporaryClockStore := store.NewPebbleClockStore(temporaryStore, e.logger)

View File

@ -329,7 +329,7 @@ func main() {
}
if *compactDB && *core == 0 {
db := store.NewPebbleDB(nodeConfig.DB)
db := store.NewMDBXDB(nodeConfig.DB)
if err := db.CompactAll(); err != nil {
panic(err)
}

475
node/store/mdbx.go Normal file
View File

@ -0,0 +1,475 @@
package store
import (
"bytes"
"io"
"os"
"github.com/cockroachdb/pebble"
"github.com/erigontech/mdbx-go/mdbx"
"source.quilibrium.com/quilibrium/monorepo/node/config"
)
type MDBXDB struct {
env *mdbx.Env
dbi mdbx.DBI
}
const GB = 1 << 30
const MB = 1 << 20
// erigon defaults, should be tuned to quil
const READERS_LIMIT = 32_000
const RP_AUGMENT_LIMIT = 1_000_000
const MAP_SIZE = 16 * GB
const GROWTH_STEP = 2 * MB
const PAGE_SIZE = 4096
const DEFAULT_TABLE = "default" // we use only one for now
func NewMDBXDB(config *config.DBConfig) *MDBXDB {
env, err := mdbx.NewEnv(mdbx.Default)
if err != nil {
panic(err)
}
// Configs
if err = env.SetOption(mdbx.OptMaxDB, 200); err != nil {
panic(err)
}
if err = env.SetOption(mdbx.OptMaxReaders, READERS_LIMIT); err != nil {
panic(err)
}
if err = env.SetOption(mdbx.OptRpAugmentLimit, RP_AUGMENT_LIMIT); err != nil {
panic(err)
}
os.MkdirAll(config.Path, os.ModePerm)
if err = env.SetGeometry(-1, -1, int(MAP_SIZE), int(GROWTH_STEP), -1, int(PAGE_SIZE)); err != nil {
panic(err)
}
// Open the environment
flags := uint(mdbx.NoReadahead) | mdbx.Durable
if err := env.Open(config.Path, flags, 0664); err != nil {
panic(err)
}
// Open the database (we use a single one called default, but this should change later)
var dbi mdbx.DBI
env.Update(func(txn *mdbx.Txn) error {
dbi, err = txn.OpenDBI(DEFAULT_TABLE, mdbx.Create, nil, nil)
if err != nil {
return err
}
return nil
})
return &MDBXDB{env: env, dbi: dbi}
}
// KVDB interface implementation
func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) {
var result []byte
var err error
err = m.env.View(func(txn *mdbx.Txn) error {
val, err := txn.Get(m.dbi, key)
if err != nil {
if mdbx.IsNotFound(err) {
return pebble.ErrNotFound
}
return err
}
result = make([]byte, len(val))
copy(result, val)
return err
})
// no closer, the transaction is already closed
return result, noopClose{}, err
}
func (m *MDBXDB) Set(key, value []byte) error {
return m.env.Update(func(txn *mdbx.Txn) error {
return txn.Put(m.dbi, key, value, mdbx.Upsert)
})
}
func (m *MDBXDB) Delete(key []byte) error {
return m.env.Update(func(txn *mdbx.Txn) error {
return txn.Del(m.dbi, key, nil)
})
}
func (m *MDBXDB) NewBatch(indexed bool) Transaction {
// MDBX doesn't have a direct equivalent to Pebble's indexed batch
// We'll use a regular transaction for both cases
txn, err := m.env.BeginTxn(nil, 0)
if err != nil {
panic(err)
}
return &MDBXTransaction{
txn: txn,
dbi: m.dbi,
}
}
func (m *MDBXDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) {
return m.NewIterCustom(lowerBound, upperBound)
}
func (m *MDBXDB) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) {
txn, err := m.env.BeginTxn(nil, mdbx.Readonly)
if err != nil {
return nil, err
}
cursor, err := txn.OpenCursor(m.dbi)
if err != nil {
txn.Abort()
return nil, err
}
return &MDBXIterator{
txn: txn,
cursor: cursor,
lowerBound: lowerBound,
upperBound: upperBound,
valid: false,
}, nil
}
func (m *MDBXDB) Compact(start, end []byte, parallelize bool) error {
// MDBX handles compaction differently than Pebble
// We can use env.Copy2fd to create a compacted copy, but for now
// we'll just return nil as MDBX handles this internally
return nil
}
func (m *MDBXDB) CompactAll() error {
// MDBX handles compaction differently than Pebble
// For now, we'll just return nil as MDBX handles this internally
return nil
}
func (m *MDBXDB) Close() error {
m.env.Close()
return nil
}
func (m *MDBXDB) DeleteRange(start, end []byte) error {
iter, err := m.NewIterCustom(start, end)
defer iter.Close()
if err != nil {
return err
}
for iter.First(); iter.Valid(); iter.Next() {
err = iter.cursor.Del(mdbx.Current)
if err != nil {
return err
}
}
return nil
}
// Ensure MDBXDB implements KVDB interface
var _ KVDB = (*MDBXDB)(nil)
// Transaction implementation
type MDBXTransaction struct {
txn *mdbx.Txn
dbi mdbx.DBI
}
func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) {
val, err := t.txn.Get(t.dbi, key)
if err != nil {
if mdbx.IsNotFound(err) {
return nil, nil, pebble.ErrNotFound
}
return nil, nil, err
}
// Copy the value since it's only valid during the transaction
result := make([]byte, len(val))
copy(result, val)
return result, io.NopCloser(nil), nil
}
func (t *MDBXTransaction) Set(key []byte, value []byte) error {
return t.txn.Put(t.dbi, key, value, 0)
}
func (t *MDBXTransaction) Commit() error {
// we drop latency here, but it should be saved as metric
_, err := t.txn.Commit()
return err
}
func (t *MDBXTransaction) Delete(key []byte) error {
return t.txn.Del(t.dbi, key, nil)
}
func (t *MDBXTransaction) Abort() error {
t.txn.Abort()
return nil
}
func (t *MDBXTransaction) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) {
return t.NewIterCustom(lowerBound, upperBound)
}
func (t *MDBXTransaction) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) {
cursor, err := t.txn.OpenCursor(t.dbi)
if err != nil {
return nil, err
}
var key, value []byte
if lowerBound == nil {
key, value, err = cursor.Get(nil, nil, mdbx.First)
} else {
key, value, err = cursor.Get(lowerBound, nil, mdbx.SetRange)
}
if err != nil {
return nil, err
}
return &MDBXIterator{
txn: t.txn,
cursor: cursor,
lowerBound: lowerBound,
upperBound: upperBound,
valid: upperBound == nil || bytes.Compare(key, upperBound) < 0,
key: key,
value: value,
}, nil
}
func (t *MDBXTransaction) DeleteRange(lowerBound []byte, upperBound []byte) error {
iter, err := t.NewIterCustom(lowerBound, upperBound)
defer iter.Close()
if err != nil {
return err
}
for iter.First(); iter.Valid(); iter.Next() {
err = iter.cursor.Del(mdbx.Current)
if err != nil {
return err
}
}
return nil
}
// Ensure MDBXTransaction implements Transaction interface
var _ Transaction = (*MDBXTransaction)(nil)
// Iterator implementation
type MDBXIterator struct {
txn *mdbx.Txn
cursor *mdbx.Cursor
lowerBound []byte
upperBound []byte
key []byte
value []byte
valid bool
}
func (i *MDBXIterator) Key() []byte {
if !i.valid {
return nil
}
return i.key
}
func (i *MDBXIterator) First() bool {
var err error
var k, v []byte
if i.lowerBound == nil {
k, v, err = i.cursor.Get(nil, nil, mdbx.First)
} else {
k, v, err = i.cursor.Get(i.lowerBound, nil, mdbx.SetRange)
}
if err != nil {
i.valid = false
return false
}
// Check if key is within upper bound
if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 {
i.valid = false
return false
}
i.key = k
i.value = v
i.valid = true
return true
}
func (i *MDBXIterator) Next() bool {
if !i.valid {
return false
}
k, v, err := i.cursor.Get(nil, nil, mdbx.Next)
if err != nil {
i.valid = false
return false
}
// Check if key is within upper bound
if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 {
i.valid = false
return false
}
i.key = k
i.value = v
return true
}
func (i *MDBXIterator) Prev() bool {
if !i.valid {
return false
}
k, v, err := i.cursor.Get(nil, nil, mdbx.Prev)
if err != nil {
i.valid = false
return false
}
// Check if key is within lower bound
if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 {
i.valid = false
return false
}
i.key = k
i.value = v
return true
}
func (i *MDBXIterator) Valid() bool {
return i.valid
}
func (i *MDBXIterator) Value() []byte {
if !i.valid {
return nil
}
return i.value
}
func (i *MDBXIterator) Close() error {
i.valid = false
i.cursor.Close()
i.txn.Abort()
return nil
}
func (i *MDBXIterator) SeekLT(target []byte) bool {
if target == nil {
return i.Last()
}
// First try to find the exact key
k, v, err := i.cursor.Get(target, nil, mdbx.SetRange)
if err != nil {
// If not found, try to find the last key
return i.Last()
}
// If we found the exact key, move to the previous one
if bytes.Compare(k, target) == 0 {
k, v, err = i.cursor.Get(nil, nil, mdbx.Prev)
if err != nil {
i.valid = false
return false
}
} else {
// If we found a key greater than target, move to the previous one
k, v, err = i.cursor.Get(nil, nil, mdbx.Prev)
if err != nil {
i.valid = false
return false
}
}
// Check if key is within lower bound
if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 {
i.valid = false
return false
}
i.key = k
i.value = v
i.valid = true
return true
}
func (i *MDBXIterator) Last() bool {
var err error
var k, v []byte
if i.upperBound == nil {
k, v, err = i.cursor.Get(nil, nil, mdbx.Last)
} else {
// Position at or before upper bound
k, v, err = i.cursor.Get(i.upperBound, nil, mdbx.SetRange)
if err != nil {
// If upper bound is beyond last key, get the last key
k, v, err = i.cursor.Get(nil, nil, mdbx.Last)
if err != nil {
i.valid = false
return false
}
} else if bytes.Compare(k, i.upperBound) >= 0 {
// If we found the upper bound or beyond, move to previous
k, v, err = i.cursor.Get(nil, nil, mdbx.Prev)
if err != nil {
i.valid = false
return false
}
}
}
if err != nil {
i.valid = false
return false
}
// Check if key is within lower bound
if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 {
i.valid = false
return false
}
i.key = k
i.value = v
i.valid = true
return true
}
// Ensure MDBXIterator implements Iterator interface
var _ Iterator = (*MDBXIterator)(nil)
// Helper for closing transactions
type txnCloser struct {
txn *mdbx.Txn
}
func (c txnCloser) Close() error {
c.txn.Abort()
return nil
}
type noopClose struct{}
func (n noopClose) Close() error {
return nil
}