From f1ceb5d18993433aae3bc724ccae286a717b88e0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 11 May 2025 02:27:04 -0300 Subject: [PATCH 01/21] mdbx --- node/app/wire.go | 4 +- node/app/wire_gen.go | 34 +- node/consensus/data/frame_importer.go | 2 +- node/main.go | 2 +- node/store/mdbx.go | 475 ++++++++++++++++++++++++++ 5 files changed, 496 insertions(+), 21 deletions(-) create mode 100644 node/store/mdbx.go diff --git a/node/app/wire.go b/node/app/wire.go index c7a8d4f..e6c5957 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -52,8 +52,8 @@ var keyManagerSet = wire.NewSet( var storeSet = wire.NewSet( wire.FieldsOf(new(*config.Config), "DB"), - store.NewPebbleDB, - wire.Bind(new(store.KVDB), new(*store.PebbleDB)), + store.NewMDBXDB, + wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index d357b7f..0d9f545 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -37,24 +37,24 @@ func NewDHTNode(configConfig *config.Config) (*DHTNode, error) { func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := debugLogger() dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) - node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB) + node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) if err != nil { return nil, err } @@ -64,24 +64,24 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := logger() dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) - node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB) + node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) if err != nil { return nil, err } @@ -98,9 +98,9 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) { func NewClockStore(configConfig *config.Config) (store.ClockStore, error) { dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) + mdbxdb := store.NewMDBXDB(dbConfig) zapLogger := logger() - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) return pebbleClockStore, nil } @@ -134,7 +134,7 @@ var debugLoggerSet = wire.NewSet( var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager))) -var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, wire.Bind(new(store.KVDB), new(*store.PebbleDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) +var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager))) diff --git a/node/consensus/data/frame_importer.go b/node/consensus/data/frame_importer.go index 7f7b1bb..a3a7435 100644 --- a/node/consensus/data/frame_importer.go +++ b/node/consensus/data/frame_importer.go @@ -213,7 +213,7 @@ func (e *DataClockConsensusEngine) applySnapshot( return nil } - temporaryStore := store.NewPebbleDB(&config.DBConfig{ + temporaryStore := store.NewMDBXDB(&config.DBConfig{ Path: snapshotDBPath, }) temporaryClockStore := store.NewPebbleClockStore(temporaryStore, e.logger) diff --git a/node/main.go b/node/main.go index 56512f5..0bc5a87 100644 --- a/node/main.go +++ b/node/main.go @@ -329,7 +329,7 @@ func main() { } if *compactDB && *core == 0 { - db := store.NewPebbleDB(nodeConfig.DB) + db := store.NewMDBXDB(nodeConfig.DB) if err := db.CompactAll(); err != nil { panic(err) } diff --git a/node/store/mdbx.go b/node/store/mdbx.go new file mode 100644 index 0000000..179b1a1 --- /dev/null +++ b/node/store/mdbx.go @@ -0,0 +1,475 @@ +package store + +import ( + "bytes" + "io" + "os" + + "github.com/cockroachdb/pebble" + "github.com/erigontech/mdbx-go/mdbx" + "source.quilibrium.com/quilibrium/monorepo/node/config" +) + +type MDBXDB struct { + env *mdbx.Env + dbi mdbx.DBI +} + +const GB = 1 << 30 +const MB = 1 << 20 + +// erigon defaults, should be tuned to quil +const READERS_LIMIT = 32_000 +const RP_AUGMENT_LIMIT = 1_000_000 +const MAP_SIZE = 16 * GB +const GROWTH_STEP = 2 * MB +const PAGE_SIZE = 4096 + +const DEFAULT_TABLE = "default" // we use only one for now + +func NewMDBXDB(config *config.DBConfig) *MDBXDB { + env, err := mdbx.NewEnv(mdbx.Default) + if err != nil { + panic(err) + } + + // Configs + if err = env.SetOption(mdbx.OptMaxDB, 200); err != nil { + panic(err) + } + if err = env.SetOption(mdbx.OptMaxReaders, READERS_LIMIT); err != nil { + panic(err) + } + if err = env.SetOption(mdbx.OptRpAugmentLimit, RP_AUGMENT_LIMIT); err != nil { + panic(err) + } + os.MkdirAll(config.Path, os.ModePerm) + if err = env.SetGeometry(-1, -1, int(MAP_SIZE), int(GROWTH_STEP), -1, int(PAGE_SIZE)); err != nil { + panic(err) + } + + // Open the environment + flags := uint(mdbx.NoReadahead) | mdbx.Durable + if err := env.Open(config.Path, flags, 0664); err != nil { + panic(err) + } + + // Open the database (we use a single one called default, but this should change later) + var dbi mdbx.DBI + env.Update(func(txn *mdbx.Txn) error { + dbi, err = txn.OpenDBI(DEFAULT_TABLE, mdbx.Create, nil, nil) + if err != nil { + return err + } + return nil + }) + + return &MDBXDB{env: env, dbi: dbi} +} + +// KVDB interface implementation +func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) { + var result []byte + var err error + err = m.env.View(func(txn *mdbx.Txn) error { + val, err := txn.Get(m.dbi, key) + if err != nil { + if mdbx.IsNotFound(err) { + return pebble.ErrNotFound + } + return err + } + result = make([]byte, len(val)) + copy(result, val) + return err + }) + + // no closer, the transaction is already closed + return result, noopClose{}, err +} + +func (m *MDBXDB) Set(key, value []byte) error { + return m.env.Update(func(txn *mdbx.Txn) error { + return txn.Put(m.dbi, key, value, mdbx.Upsert) + }) +} + +func (m *MDBXDB) Delete(key []byte) error { + return m.env.Update(func(txn *mdbx.Txn) error { + return txn.Del(m.dbi, key, nil) + }) +} + +func (m *MDBXDB) NewBatch(indexed bool) Transaction { + // MDBX doesn't have a direct equivalent to Pebble's indexed batch + // We'll use a regular transaction for both cases + txn, err := m.env.BeginTxn(nil, 0) + if err != nil { + panic(err) + } + + return &MDBXTransaction{ + txn: txn, + dbi: m.dbi, + } +} + +func (m *MDBXDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + return m.NewIterCustom(lowerBound, upperBound) +} + +func (m *MDBXDB) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) { + txn, err := m.env.BeginTxn(nil, mdbx.Readonly) + if err != nil { + return nil, err + } + + cursor, err := txn.OpenCursor(m.dbi) + if err != nil { + txn.Abort() + return nil, err + } + + return &MDBXIterator{ + txn: txn, + cursor: cursor, + lowerBound: lowerBound, + upperBound: upperBound, + valid: false, + }, nil +} + +func (m *MDBXDB) Compact(start, end []byte, parallelize bool) error { + // MDBX handles compaction differently than Pebble + // We can use env.Copy2fd to create a compacted copy, but for now + // we'll just return nil as MDBX handles this internally + return nil +} + +func (m *MDBXDB) CompactAll() error { + // MDBX handles compaction differently than Pebble + // For now, we'll just return nil as MDBX handles this internally + return nil +} + +func (m *MDBXDB) Close() error { + m.env.Close() + return nil +} + +func (m *MDBXDB) DeleteRange(start, end []byte) error { + iter, err := m.NewIterCustom(start, end) + defer iter.Close() + if err != nil { + return err + } + for iter.First(); iter.Valid(); iter.Next() { + err = iter.cursor.Del(mdbx.Current) + if err != nil { + return err + } + } + return nil +} + +// Ensure MDBXDB implements KVDB interface +var _ KVDB = (*MDBXDB)(nil) + +// Transaction implementation +type MDBXTransaction struct { + txn *mdbx.Txn + dbi mdbx.DBI +} + +func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) { + val, err := t.txn.Get(t.dbi, key) + if err != nil { + if mdbx.IsNotFound(err) { + return nil, nil, pebble.ErrNotFound + } + return nil, nil, err + } + + // Copy the value since it's only valid during the transaction + result := make([]byte, len(val)) + copy(result, val) + + return result, io.NopCloser(nil), nil +} + +func (t *MDBXTransaction) Set(key []byte, value []byte) error { + return t.txn.Put(t.dbi, key, value, 0) +} + +func (t *MDBXTransaction) Commit() error { + // we drop latency here, but it should be saved as metric + _, err := t.txn.Commit() + return err +} + +func (t *MDBXTransaction) Delete(key []byte) error { + return t.txn.Del(t.dbi, key, nil) +} + +func (t *MDBXTransaction) Abort() error { + t.txn.Abort() + return nil +} + +func (t *MDBXTransaction) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + return t.NewIterCustom(lowerBound, upperBound) +} + +func (t *MDBXTransaction) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) { + cursor, err := t.txn.OpenCursor(t.dbi) + if err != nil { + return nil, err + } + + var key, value []byte + if lowerBound == nil { + key, value, err = cursor.Get(nil, nil, mdbx.First) + } else { + key, value, err = cursor.Get(lowerBound, nil, mdbx.SetRange) + } + if err != nil { + return nil, err + } + + return &MDBXIterator{ + txn: t.txn, + cursor: cursor, + lowerBound: lowerBound, + upperBound: upperBound, + valid: upperBound == nil || bytes.Compare(key, upperBound) < 0, + key: key, + value: value, + }, nil +} + +func (t *MDBXTransaction) DeleteRange(lowerBound []byte, upperBound []byte) error { + iter, err := t.NewIterCustom(lowerBound, upperBound) + defer iter.Close() + if err != nil { + return err + } + for iter.First(); iter.Valid(); iter.Next() { + err = iter.cursor.Del(mdbx.Current) + if err != nil { + return err + } + } + return nil +} + +// Ensure MDBXTransaction implements Transaction interface +var _ Transaction = (*MDBXTransaction)(nil) + +// Iterator implementation +type MDBXIterator struct { + txn *mdbx.Txn + cursor *mdbx.Cursor + lowerBound []byte + upperBound []byte + key []byte + value []byte + valid bool +} + +func (i *MDBXIterator) Key() []byte { + if !i.valid { + return nil + } + return i.key +} + +func (i *MDBXIterator) First() bool { + var err error + var k, v []byte + + if i.lowerBound == nil { + k, v, err = i.cursor.Get(nil, nil, mdbx.First) + } else { + k, v, err = i.cursor.Get(i.lowerBound, nil, mdbx.SetRange) + } + + if err != nil { + i.valid = false + return false + } + + // Check if key is within upper bound + if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +func (i *MDBXIterator) Next() bool { + if !i.valid { + return false + } + + k, v, err := i.cursor.Get(nil, nil, mdbx.Next) + if err != nil { + i.valid = false + return false + } + + // Check if key is within upper bound + if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + return true +} + +func (i *MDBXIterator) Prev() bool { + if !i.valid { + return false + } + + k, v, err := i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + return true +} + +func (i *MDBXIterator) Valid() bool { + return i.valid +} + +func (i *MDBXIterator) Value() []byte { + if !i.valid { + return nil + } + return i.value +} + +func (i *MDBXIterator) Close() error { + i.valid = false + i.cursor.Close() + i.txn.Abort() + return nil +} + +func (i *MDBXIterator) SeekLT(target []byte) bool { + if target == nil { + return i.Last() + } + + // First try to find the exact key + k, v, err := i.cursor.Get(target, nil, mdbx.SetRange) + if err != nil { + // If not found, try to find the last key + return i.Last() + } + + // If we found the exact key, move to the previous one + if bytes.Compare(k, target) == 0 { + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } else { + // If we found a key greater than target, move to the previous one + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +func (i *MDBXIterator) Last() bool { + var err error + var k, v []byte + + if i.upperBound == nil { + k, v, err = i.cursor.Get(nil, nil, mdbx.Last) + } else { + // Position at or before upper bound + k, v, err = i.cursor.Get(i.upperBound, nil, mdbx.SetRange) + if err != nil { + // If upper bound is beyond last key, get the last key + k, v, err = i.cursor.Get(nil, nil, mdbx.Last) + if err != nil { + i.valid = false + return false + } + } else if bytes.Compare(k, i.upperBound) >= 0 { + // If we found the upper bound or beyond, move to previous + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } + } + + if err != nil { + i.valid = false + return false + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +// Ensure MDBXIterator implements Iterator interface +var _ Iterator = (*MDBXIterator)(nil) + +// Helper for closing transactions +type txnCloser struct { + txn *mdbx.Txn +} + +func (c txnCloser) Close() error { + c.txn.Abort() + return nil +} + +type noopClose struct{} + +func (n noopClose) Close() error { + return nil +} From 1954c13bc691554bf1d799c668ddeb85d7bba956 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 12 May 2025 21:56:56 -0300 Subject: [PATCH 02/21] track writes, implement batcher --- node/store/mdbx.go | 128 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 114 insertions(+), 14 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 179b1a1..d9b0d1c 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -4,6 +4,8 @@ import ( "bytes" "io" "os" + "runtime" + "sync/atomic" "github.com/cockroachdb/pebble" "github.com/erigontech/mdbx-go/mdbx" @@ -15,6 +17,11 @@ type MDBXDB struct { dbi mdbx.DBI } +// this is ugly but I did not find a better way +var created = false +var readTxs atomic.Int32 +var writeTxs atomic.Int32 + const GB = 1 << 30 const MB = 1 << 20 @@ -28,6 +35,10 @@ const PAGE_SIZE = 4096 const DEFAULT_TABLE = "default" // we use only one for now func NewMDBXDB(config *config.DBConfig) *MDBXDB { + if created { + panic("do not create two instances") + } + created = true env, err := mdbx.NewEnv(mdbx.Default) if err != nil { panic(err) @@ -89,12 +100,18 @@ func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) { } func (m *MDBXDB) Set(key, value []byte) error { + if writeTxs.Load() > 1 { + panic("another tx is writing") + } return m.env.Update(func(txn *mdbx.Txn) error { return txn.Put(m.dbi, key, value, mdbx.Upsert) }) } func (m *MDBXDB) Delete(key []byte) error { + if writeTxs.Load() > 1 { + panic("another tx is writing") + } return m.env.Update(func(txn *mdbx.Txn) error { return txn.Del(m.dbi, key, nil) }) @@ -103,8 +120,21 @@ func (m *MDBXDB) Delete(key []byte) error { func (m *MDBXDB) NewBatch(indexed bool) Transaction { // MDBX doesn't have a direct equivalent to Pebble's indexed batch // We'll use a regular transaction for both cases + return &MDBXBatch{ + db: m, + } +} + +func (m *MDBXDB) NewTransaction() Transaction { txn, err := m.env.BeginTxn(nil, 0) + if writeTxs.Load() > 1 { + panic("another tx is writing") + } + writeTxs.Add(1) + runtime.LockOSThread() if err != nil { + writeTxs.Add(-1) + runtime.UnlockOSThread() panic(err) } @@ -115,18 +145,20 @@ func (m *MDBXDB) NewBatch(indexed bool) Transaction { } func (m *MDBXDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { - return m.NewIterCustom(lowerBound, upperBound) -} - -func (m *MDBXDB) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) { txn, err := m.env.BeginTxn(nil, mdbx.Readonly) + runtime.LockOSThread() + readTxs.Add(1) if err != nil { + runtime.UnlockOSThread() + readTxs.Add(-1) return nil, err } cursor, err := txn.OpenCursor(m.dbi) if err != nil { txn.Abort() + runtime.UnlockOSThread() + readTxs.Add(-1) return nil, err } @@ -136,6 +168,7 @@ func (m *MDBXDB) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXItera lowerBound: lowerBound, upperBound: upperBound, valid: false, + txOwner: true, }, nil } @@ -158,17 +191,13 @@ func (m *MDBXDB) Close() error { } func (m *MDBXDB) DeleteRange(start, end []byte) error { - iter, err := m.NewIterCustom(start, end) - defer iter.Close() + tx := m.NewBatch(false) + err := tx.DeleteRange(start, end) if err != nil { + tx.Abort() return err } - for iter.First(); iter.Valid(); iter.Next() { - err = iter.cursor.Del(mdbx.Current) - if err != nil { - return err - } - } + tx.Commit() return nil } @@ -204,6 +233,8 @@ func (t *MDBXTransaction) Set(key []byte, value []byte) error { func (t *MDBXTransaction) Commit() error { // we drop latency here, but it should be saved as metric _, err := t.txn.Commit() + runtime.UnlockOSThread() + writeTxs.Add(-1) return err } @@ -213,6 +244,8 @@ func (t *MDBXTransaction) Delete(key []byte) error { func (t *MDBXTransaction) Abort() error { t.txn.Abort() + runtime.UnlockOSThread() + writeTxs.Add(-1) return nil } @@ -244,6 +277,7 @@ func (t *MDBXTransaction) NewIterCustom(lowerBound []byte, upperBound []byte) (* valid: upperBound == nil || bytes.Compare(key, upperBound) < 0, key: key, value: value, + txOwner: false, }, nil } @@ -254,7 +288,7 @@ func (t *MDBXTransaction) DeleteRange(lowerBound []byte, upperBound []byte) erro return err } for iter.First(); iter.Valid(); iter.Next() { - err = iter.cursor.Del(mdbx.Current) + err = iter.txn.Del(t.dbi, iter.key, nil) if err != nil { return err } @@ -274,6 +308,7 @@ type MDBXIterator struct { key []byte value []byte valid bool + txOwner bool } func (i *MDBXIterator) Key() []byte { @@ -368,7 +403,11 @@ func (i *MDBXIterator) Value() []byte { func (i *MDBXIterator) Close() error { i.valid = false i.cursor.Close() - i.txn.Abort() + if i.txOwner { + i.txn.Abort() + readTxs.Add(-1) + runtime.UnlockOSThread() + } return nil } @@ -473,3 +512,64 @@ type noopClose struct{} func (n noopClose) Close() error { return nil } + +type MDBXBatch struct { + db *MDBXDB + calls []func(tx Transaction) error +} + +// Abort implements Transaction. +func (m *MDBXBatch) Abort() error { + m.calls = nil + return nil +} + +// Commit implements Transaction. +func (m *MDBXBatch) Commit() error { + tx := m.db.NewTransaction() + for _, call := range m.calls { + err := call(tx) + if err != nil { + tx.Abort() + return err + } + } + m.calls = nil + return tx.Commit() +} + +// Delete implements Transaction. +func (m *MDBXBatch) Delete(key []byte) error { + m.calls = append(m.calls, func(tx Transaction) error { + return tx.Delete(key) + }) + return nil +} + +// DeleteRange implements Transaction. +func (m *MDBXBatch) DeleteRange(lowerBound []byte, upperBound []byte) error { + m.calls = append(m.calls, func(tx Transaction) error { + return tx.DeleteRange(lowerBound, upperBound) + }) + return nil +} + +// Get implements Transaction. +func (m *MDBXBatch) Get(key []byte) ([]byte, io.Closer, error) { + panic("unimplemented") +} + +// NewIter implements Transaction. +func (m *MDBXBatch) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + return m.db.NewIter(lowerBound, upperBound) +} + +// Set implements Transaction. +func (m *MDBXBatch) Set(key []byte, value []byte) error { + m.calls = append(m.calls, func(tx Transaction) error { + return tx.Set(key, value) + }) + return nil +} + +var _ Transaction = (*MDBXBatch)(nil) From 82646e8be0bedc111d3744a226b242831983d144 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 13 May 2025 16:16:39 -0300 Subject: [PATCH 03/21] create a new type for batch ops --- node/store/mdbx.go | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index d9b0d1c..8fc2d9e 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -514,43 +514,47 @@ func (n noopClose) Close() error { } type MDBXBatch struct { - db *MDBXDB - calls []func(tx Transaction) error + db *MDBXDB + operations []BatchOperation } // Abort implements Transaction. func (m *MDBXBatch) Abort() error { - m.calls = nil + m.operations = nil return nil } // Commit implements Transaction. func (m *MDBXBatch) Commit() error { tx := m.db.NewTransaction() - for _, call := range m.calls { - err := call(tx) + var err error + for _, op := range m.operations { + switch op.opcode { + case Set: + err = tx.Set(op.operand1, op.operand2) + case Delete: + err = tx.Delete(op.operand1) + case DeleteRange: + err = tx.DeleteRange(op.operand1, op.operand2) + } if err != nil { tx.Abort() return err } } - m.calls = nil + m.operations = nil return tx.Commit() } // Delete implements Transaction. func (m *MDBXBatch) Delete(key []byte) error { - m.calls = append(m.calls, func(tx Transaction) error { - return tx.Delete(key) - }) + m.operations = append(m.operations, BatchOperation{opcode: Delete, operand1: key}) return nil } // DeleteRange implements Transaction. func (m *MDBXBatch) DeleteRange(lowerBound []byte, upperBound []byte) error { - m.calls = append(m.calls, func(tx Transaction) error { - return tx.DeleteRange(lowerBound, upperBound) - }) + m.operations = append(m.operations, BatchOperation{opcode: DeleteRange, operand1: lowerBound, operand2: upperBound}) return nil } @@ -566,10 +570,20 @@ func (m *MDBXBatch) NewIter(lowerBound []byte, upperBound []byte) (Iterator, err // Set implements Transaction. func (m *MDBXBatch) Set(key []byte, value []byte) error { - m.calls = append(m.calls, func(tx Transaction) error { - return tx.Set(key, value) - }) + m.operations = append(m.operations, BatchOperation{opcode: Set, operand1: key, operand2: value}) return nil } var _ Transaction = (*MDBXBatch)(nil) + +type BatchOperation struct { + opcode uint8 + operand1 []byte + operand2 []byte +} + +const ( + Set = iota + Delete + DeleteRange +) From f864c209ca23985f137c5008d9f1c1ffec0bd533 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 13 May 2025 16:16:49 -0300 Subject: [PATCH 04/21] tests --- node/store/mdbx_test.go | 410 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 410 insertions(+) create mode 100644 node/store/mdbx_test.go diff --git a/node/store/mdbx_test.go b/node/store/mdbx_test.go new file mode 100644 index 0000000..f1bac20 --- /dev/null +++ b/node/store/mdbx_test.go @@ -0,0 +1,410 @@ +package store + +import ( + "bytes" + "errors" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/cockroachdb/pebble" + "source.quilibrium.com/quilibrium/monorepo/node/config" +) + +func TestMDBXDB(t *testing.T) { + // Create temporary directory + tempDir, err := os.MkdirTemp("", "mdbx-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create DB config + dbConfig := &config.DBConfig{ + Path: tempDir, + } + + // Create DB instance + db := NewMDBXDB(dbConfig) + defer db.Close() + + // Test basic operations + t.Run("BasicOperations", func(t *testing.T) { + // Test Set and Get + key := []byte("test-key") + value := []byte("test-value") + + if err := db.Set(key, value); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + + result, closer, err := db.Get(key) + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + defer closer.Close() + + if !bytes.Equal(result, value) { + t.Errorf("Expected %v, got %v", value, result) + } + + // Test Delete + if err := db.Delete(key); err != nil { + t.Fatalf("Failed to delete key: %v", err) + } + + _, _, err = db.Get(key) + if err == nil { + t.Errorf("Expected error for deleted key") + } + }) + + // Test transactions + t.Run("Transactions", func(t *testing.T) { + // Create a non-indexed transaction + tx := db.NewTransaction() + + // Set some values in the transaction + key1 := []byte("tx-key-1") + value1 := []byte("tx-value-1") + key2 := []byte("tx-key-2") + value2 := []byte("tx-value-2") + + if err := tx.Set(key1, value1); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + if err := tx.Set(key2, value2); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + + // Values should not be visible outside the transaction yet + _, _, err := db.Get(key1) + if err == nil { + t.Errorf("Key should not be visible outside transaction before commit") + } + + // Values should be visible inside the transaction + result, closer, err := tx.Get(key1) + if err != nil { + t.Fatalf("Failed to get value from transaction: %v", err) + } + defer closer.Close() + if !bytes.Equal(result, value1) { + t.Errorf("Expected %v, got %v", value1, result) + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + t.Fatalf("Failed to commit transaction: %v", err) + } + + // Values should now be visible outside the transaction + result, closer, err = db.Get(key1) + if err != nil { + t.Fatalf("Failed to get value after commit: %v", err) + } + defer closer.Close() + if !bytes.Equal(result, value1) { + t.Errorf("Expected %v, got %v", value1, result) + } + + // Test transaction abort + tx = db.NewBatch(false) + abortKey := []byte("abort-key") + abortValue := []byte("abort-value") + + if err := tx.Set(abortKey, abortValue); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + + if err := tx.Abort(); err != nil { + t.Fatalf("Failed to abort transaction: %v", err) + } + + // Value should not be visible after abort + _, _, err = db.Get(abortKey) + if err == nil { + t.Errorf("Key should not be visible after transaction abort") + } + + // Test transaction DeleteRange + tx = db.NewTransaction() + + // Set up some keys for the range delete + for i := 1; i <= 5; i++ { + key := []byte(filepath.Join("tx-range", "key", "prefix", "key-"+string(rune('0'+i)))) + value := []byte("tx-value-" + string(rune('0'+i))) + if err := tx.Set(key, value); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + } + + // Delete a range within the transaction + startKey := []byte(filepath.Join("tx-range", "key", "prefix", "key-2")) + endKey := []byte(filepath.Join("tx-range", "key", "prefix", "key-4")) + fmt.Println("---") + if err := tx.DeleteRange(startKey, endKey); err != nil { + t.Fatalf("Failed to delete range in transaction: %v", err) + } + fmt.Println("---") + + // Check that keys in the range are deleted within the transaction + for i := 2; i < 4; i++ { + key := []byte(filepath.Join("tx-range", "key", "prefix", "key-"+string(rune('0'+i)))) + _, _, err := tx.Get(key) + if err == nil { + t.Errorf("Key %s should be deleted in transaction", key) + } + } + + // Check that keys outside the range still exist within the transaction + txKey1 := []byte(filepath.Join("tx-range", "key", "prefix", "key-1")) + txResult, txCloser, txErr := tx.Get(txKey1) + if txErr != nil { + t.Errorf("Key %s should still exist in transaction: %v", txKey1, txErr) + } else { + defer txCloser.Close() + if !bytes.Equal(txResult, []byte("tx-value-1")) { + t.Errorf("Expected %v, got %v", []byte("tx-value-1"), txResult) + } + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + t.Fatalf("Failed to commit transaction: %v", err) + } + + // Verify the changes are visible outside the transaction + for i := 2; i < 4; i++ { + key := []byte(filepath.Join("tx-range", "key", "prefix", "key-"+string(rune('0'+i)))) + _, _, err := db.Get(key) + if err == nil { + t.Errorf("Key %s should be deleted after commit", key) + } + } + + txKey5 := []byte(filepath.Join("tx-range", "key", "prefix", "key-5")) + txResult2, txCloser2, txErr2 := db.Get(txKey5) + if txErr2 != nil { + t.Errorf("Key %s should still exist after commit: %v", txKey5, txErr2) + } else { + defer txCloser2.Close() + if !bytes.Equal(txResult2, []byte("tx-value-5")) { + t.Errorf("Expected %v, got %v", []byte("tx-value-5"), txResult2) + } + } + }) + + // Test iterators + t.Run("Iterators", func(t *testing.T) { + // Insert some test data + testData := map[string]string{ + "iter-key-1": "iter-value-1", + "iter-key-2": "iter-value-2", + "iter-key-3": "iter-value-3", + "iter-key-4": "iter-value-4", + "iter-key-5": "iter-value-5", + } + + for k, v := range testData { + if err := db.Set([]byte(k), []byte(v)); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + } + + // Test full range iterator + iter, err := db.NewIter([]byte("iter-key-1"), []byte("iter-key-6")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + count := 0 + for iter.First(); iter.Valid(); iter.Next() { + key := string(iter.Key()) + value := string(iter.Value()) + expectedValue, ok := testData[key] + if !ok { + t.Errorf("Unexpected key: %s", key) + } else if expectedValue != value { + t.Errorf("Expected value %s for key %s, got %s", expectedValue, key, value) + } + count++ + } + + if count != len(testData) { + t.Errorf("Expected %d items, got %d", len(testData), count) + } + + // Test partial range iterator + iter, err = db.NewIter([]byte("iter-key-2"), []byte("iter-key-4")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + expectedKeys := []string{"iter-key-2", "iter-key-3"} + count = 0 + for iter.First(); iter.Valid(); iter.Next() { + key := string(iter.Key()) + if count >= len(expectedKeys) || key != expectedKeys[count] { + t.Errorf("Expected key %s, got %s", expectedKeys[count], key) + } + count++ + } + + if count != len(expectedKeys) { + t.Errorf("Expected %d items in range, got %d", len(expectedKeys), count) + } + + // Test Last and Prev + iter, err = db.NewIter([]byte("iter-key-1"), []byte("iter-key-6")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + if !iter.Last() { + t.Errorf("Last() should return true") + } + + if string(iter.Key()) != "iter-key-5" { + t.Errorf("Expected last key to be iter-key-5, got %s", string(iter.Key())) + } + + count = 0 + expectedReverseKeys := []string{"iter-key-5", "iter-key-4", "iter-key-3", "iter-key-2", "iter-key-1"} + for ; iter.Valid(); iter.Prev() { + key := string(iter.Key()) + if count >= len(expectedReverseKeys) || key != expectedReverseKeys[count] { + t.Errorf("Expected key %s, got %s", expectedReverseKeys[count], key) + } + count++ + } + + if count != len(expectedReverseKeys) { + t.Errorf("Expected %d items in reverse range, got %d", len(expectedReverseKeys), count) + } + + // Test SeekLT + iter, err = db.NewIter([]byte("iter-key-1"), []byte("iter-key-6")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + if !iter.SeekLT([]byte("iter-key-4")) { + t.Errorf("SeekLT() should return true") + } + + if string(iter.Key()) != "iter-key-3" { + t.Errorf("Expected key after SeekLT to be iter-key-3, got %s", string(iter.Key())) + } + + // Test SeekLT with a key that doesn't exist but is in range + if !iter.SeekLT([]byte("iter-key-3.5")) { + t.Errorf("SeekLT() should return true") + } + + if string(iter.Key()) != "iter-key-3" { + t.Errorf("Expected key after SeekLT to be iter-key-3, got %s", string(iter.Key())) + } + }) + + // Test DeleteRange + t.Run("DeleteRange", func(t *testing.T) { + // Insert some test data + for i := 1; i <= 5; i++ { + key := []byte(filepath.Join("range", "key", "prefix", "key-"+string(rune('0'+i)))) + value := []byte("value-" + string(rune('0'+i))) + if err := db.Set(key, value); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + } + + // Delete a range of keys + startKey := []byte(filepath.Join("range", "key", "prefix", "key-2")) + endKey := []byte(filepath.Join("range", "key", "prefix", "key-4")) + if err := db.DeleteRange(startKey, endKey); err != nil { + t.Fatalf("Failed to delete range: %v", err) + } + + // Check that keys in the range are deleted + for i := 2; i < 4; i++ { + key := []byte(filepath.Join("range", "key", "prefix", "key-"+string(rune('0'+i)))) + _, _, err := db.Get(key) + if err == nil { + t.Errorf("Key %s should be deleted", key) + } + } + + // Check that keys outside the range still exist + key1 := []byte(filepath.Join("range", "key", "prefix", "key-1")) + result, closer, err := db.Get(key1) + if err != nil { + t.Errorf("Key %s should still exist: %v", key1, err) + } else { + defer closer.Close() + if !bytes.Equal(result, []byte("value-1")) { + t.Errorf("Expected %v, got %v", []byte("value-1"), result) + } + } + + key5 := []byte(filepath.Join("range", "key", "prefix", "key-5")) + result, closer, err = db.Get(key5) + if err != nil { + t.Errorf("Key %s should still exist: %v", key5, err) + } else { + defer closer.Close() + if !bytes.Equal(result, []byte("value-5")) { + t.Errorf("Expected %v, got %v", []byte("value-5"), result) + } + } + }) + + t.Run("Batch", func(t *testing.T) { + // Insert some test data + for i := 1; i <= 5; i++ { + key := []byte("key-" + string(rune('0'+i))) + value := []byte("value-" + string(rune('0'+i))) + if err := db.Set(key, value); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + } + + batch := db.NewBatch(false) + batch.Set([]byte("key-1"), []byte("potato")) + batch.Set([]byte("key-5"), []byte("tomato")) + batch.Delete([]byte("key-2")) + batch.DeleteRange([]byte("key-3"), []byte("key-5")) + if batch.Commit() != nil { + t.Errorf("commit failed") + t.Fail() + } + + for i := 1; i <= 5; i++ { + key := []byte("key-" + string(rune('0'+i))) + val, _, err := db.Get(key) + if err != nil && !errors.Is(err, pebble.ErrNotFound) { + t.Fatalf("Failed to get value: %v", err) + t.Fail() + } + switch i { + case 1: + if !bytes.Equal(val, []byte("potato")) { + t.Errorf("wrong key-1") + } + case 5: + if !bytes.Equal(val, []byte("tomato")) { + t.Errorf("wrong key-5") + } + default: + if err != nil && errors.Is(err, ErrNotFound) { + t.Errorf("key-%d should be deleted, but we got (%s)", i, val) + } + } + } + }) + +} From 60dd1c104b4de15c121c74610c6a8aec0d69e8b7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 13 May 2025 18:11:25 -0300 Subject: [PATCH 05/21] mdbx: refactor batch commit --- node/store/mdbx.go | 48 ++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 8fc2d9e..759b1c6 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -526,24 +526,40 @@ func (m *MDBXBatch) Abort() error { // Commit implements Transaction. func (m *MDBXBatch) Commit() error { - tx := m.db.NewTransaction() - var err error - for _, op := range m.operations { - switch op.opcode { - case Set: - err = tx.Set(op.operand1, op.operand2) - case Delete: - err = tx.Delete(op.operand1) - case DeleteRange: - err = tx.DeleteRange(op.operand1, op.operand2) + err := m.db.env.Update(func(tx *mdbx.Txn) error { + var err error + for _, op := range m.operations { + switch op.opcode { + case Set: + err = tx.Put(m.db.dbi, op.operand1, op.operand2, mdbx.Upsert) + case Delete: + err = tx.Del(m.db.dbi, op.operand1, nil) + case DeleteRange: + cursor, err := tx.OpenCursor(m.db.dbi) + if err != nil { + return err + } + key, _, err := cursor.Get(op.operand1, nil, mdbx.SetRange) + for bytes.Compare(key, op.operand1) >= 0 && bytes.Compare(key, op.operand2) < 0 { + err = cursor.Del(mdbx.Current) + if err != nil { + return err + } + key, _, err = cursor.Get(nil, nil, mdbx.Next) + if err != nil { + return err + } + } + cursor.Close() + } + if err != nil { + return err + } } - if err != nil { - tx.Abort() - return err - } - } + return err + }) m.operations = nil - return tx.Commit() + return err } // Delete implements Transaction. From d0a18c6b9988be3865684313a575616477eaa851 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 14 May 2025 19:44:14 -0300 Subject: [PATCH 06/21] mdbx: add and test OpenDB --- node/store/mdbx.go | 23 ++++++++++++++++++----- node/store/mdbx_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 759b1c6..766eef7 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -65,17 +65,30 @@ func NewMDBXDB(config *config.DBConfig) *MDBXDB { panic(err) } - // Open the database (we use a single one called default, but this should change later) + // Open the default database. Other databases are opened using OpenDB) + db := &MDBXDB{env: env} + db, err = db.OpenDB(DEFAULT_TABLE) + if err != nil { + panic(err) + } + + return db +} + +func (m *MDBXDB) OpenDB(name string) (*MDBXDB, error) { var dbi mdbx.DBI - env.Update(func(txn *mdbx.Txn) error { - dbi, err = txn.OpenDBI(DEFAULT_TABLE, mdbx.Create, nil, nil) + var err error + err = m.env.Update(func(txn *mdbx.Txn) error { + dbi, err = txn.OpenDBI(name, mdbx.Create, nil, nil) if err != nil { return err } return nil }) - - return &MDBXDB{env: env, dbi: dbi} + if err != nil { + return nil, err + } + return &MDBXDB{env: m.env, dbi: dbi}, nil } // KVDB interface implementation diff --git a/node/store/mdbx_test.go b/node/store/mdbx_test.go index f1bac20..91b0d0d 100644 --- a/node/store/mdbx_test.go +++ b/node/store/mdbx_test.go @@ -407,4 +407,45 @@ func TestMDBXDB(t *testing.T) { } }) + t.Run("OpenDB", func(t *testing.T) { + names := []string{DEFAULT_TABLE, "table1", "table2"} + var dbs []MDBXDB + for _, name := range names { + newdb, err := db.OpenDB(name) + if err != nil { + t.Errorf("error opening db: %v", err) + t.Fail() + } + dbs = append(dbs, *newdb) + } + for j, currentDB := range dbs { + for i := 1; i <= 5; i++ { + name := names[j] + key := []byte("key-" + string(rune('0'+i))) + value := []byte(name + string(rune('0'+i))) + if err := currentDB.Set(key, value); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + } + } + + // Check that values under the same key on different dbs were not overwritten + for j, currentDB := range dbs { + for i := 1; i <= 5; i++ { + name := names[j] + key := []byte("key-" + string(rune('0'+i))) + expectedValue := []byte(name + string(rune('0'+i))) + value, _, err := currentDB.Get(key) + if !bytes.Equal(value, expectedValue) { + t.Errorf("%s was overwritten by %s on db %s", expectedValue, value, name) + t.Fail() + } + if err != nil { + t.Errorf("Error getting value: %v", err) + } + } + } + + }) + } From 31830d0863a748743998d0258cf6260b1cb31991 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 14 May 2025 20:27:29 -0300 Subject: [PATCH 07/21] app: use multiple logical databases --- node/app/wire.go | 42 ++++++++++++++++++++++----- node/app/wire_gen.go | 64 +++++++++++++++++++++++++++++++---------- node/store/mdbx.go | 13 +++------ node/store/mdbx_test.go | 2 +- node/store/peerstore.go | 5 ++-- 5 files changed, 91 insertions(+), 35 deletions(-) diff --git a/node/app/wire.go b/node/app/wire.go index e6c5957..c5ae43d 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -50,16 +50,44 @@ var keyManagerSet = wire.NewSet( wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)), ) +func provideBaseDB(dbConfig *config.DBConfig) *store.MDBXDB { + return store.NewMDBXDB(dbConfig) +} + +func providePebbleClockStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleClockStore { + return store.NewPebbleClockStore(db.OpenDB("clock_store"), logger) +} + +func providePebbleCoinStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleCoinStore { + return store.NewPebbleCoinStore(db.OpenDB("coin_store"), logger) +} + +func providePebbleKeyStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleKeyStore { + return store.NewPebbleKeyStore(db.OpenDB("key_store"), logger) +} + +func providePebbleDataProofStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleDataProofStore { + return store.NewPebbleDataProofStore(db.OpenDB("data_proof_store"), logger) +} + +func providePebbleHypergraphStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleHypergraphStore { + return store.NewPebbleHypergraphStore(db.OpenDB("hypergraph_store"), logger) +} + +func providePeerstoreDatastore(db *store.MDBXDB, logger *zap.Logger) *store.PeerstoreDatastore { + return store.NewPeerstoreDatastore(db.OpenDB("peerstore")) +} + var storeSet = wire.NewSet( wire.FieldsOf(new(*config.Config), "DB"), - store.NewMDBXDB, + provideBaseDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), - store.NewPebbleClockStore, - store.NewPebbleCoinStore, - store.NewPebbleKeyStore, - store.NewPebbleDataProofStore, - store.NewPebbleHypergraphStore, - store.NewPeerstoreDatastore, + providePebbleClockStore, + providePebbleCoinStore, + providePebbleKeyStore, + providePebbleDataProofStore, + providePebbleHypergraphStore, + providePeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index 0d9f545..a31d73c 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -37,21 +37,21 @@ func NewDHTNode(configConfig *config.Config) (*DHTNode, error) { func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := debugLogger() dbConfig := configConfig.DB - mdbxdb := store.NewMDBXDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) + mdbxdb := provideBaseDB(dbConfig) + pebbleDataProofStore := providePebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := providePebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) + pebbleHypergraphStore := providePebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) + pebbleKeyStore := providePebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) @@ -64,21 +64,21 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := logger() dbConfig := configConfig.DB - mdbxdb := store.NewMDBXDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) + mdbxdb := provideBaseDB(dbConfig) + pebbleDataProofStore := providePebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := providePebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) + pebbleHypergraphStore := providePebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) + pebbleKeyStore := providePebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) @@ -98,9 +98,9 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) { func NewClockStore(configConfig *config.Config) (store.ClockStore, error) { dbConfig := configConfig.DB - mdbxdb := store.NewMDBXDB(dbConfig) + mdbxdb := provideBaseDB(dbConfig) zapLogger := logger() - pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger) return pebbleClockStore, nil } @@ -134,7 +134,41 @@ var debugLoggerSet = wire.NewSet( var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager))) -var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) +func provideBaseDB(dbConfig *config.DBConfig) *store.MDBXDB { + return store.NewMDBXDB(dbConfig) +} + +func providePebbleClockStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleClockStore { + return store.NewPebbleClockStore(db.OpenDB("clock_store"), logger2) +} + +func providePebbleCoinStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleCoinStore { + return store.NewPebbleCoinStore(db.OpenDB("coin_store"), logger2) +} + +func providePebbleKeyStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleKeyStore { + return store.NewPebbleKeyStore(db.OpenDB("key_store"), logger2) +} + +func providePebbleDataProofStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleDataProofStore { + return store.NewPebbleDataProofStore(db.OpenDB("data_proof_store"), logger2) +} + +func providePebbleHypergraphStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleHypergraphStore { + return store.NewPebbleHypergraphStore(db.OpenDB("hypergraph_store"), logger2) +} + +func providePeerstoreDatastore(db *store.MDBXDB, logger2 *zap.Logger) *store.PeerstoreDatastore { + return store.NewPeerstoreDatastore(db.OpenDB("peerstore")) +} + +var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), provideBaseDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), providePebbleClockStore, + providePebbleCoinStore, + providePebbleKeyStore, + providePebbleDataProofStore, + providePebbleHypergraphStore, + providePeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore)), +) var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager))) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 766eef7..b9fdc4e 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -67,15 +67,10 @@ func NewMDBXDB(config *config.DBConfig) *MDBXDB { // Open the default database. Other databases are opened using OpenDB) db := &MDBXDB{env: env} - db, err = db.OpenDB(DEFAULT_TABLE) - if err != nil { - panic(err) - } - - return db + return db.OpenDB(DEFAULT_TABLE) } -func (m *MDBXDB) OpenDB(name string) (*MDBXDB, error) { +func (m *MDBXDB) OpenDB(name string) *MDBXDB { var dbi mdbx.DBI var err error err = m.env.Update(func(txn *mdbx.Txn) error { @@ -86,9 +81,9 @@ func (m *MDBXDB) OpenDB(name string) (*MDBXDB, error) { return nil }) if err != nil { - return nil, err + panic(err) } - return &MDBXDB{env: m.env, dbi: dbi}, nil + return &MDBXDB{env: m.env, dbi: dbi} } // KVDB interface implementation diff --git a/node/store/mdbx_test.go b/node/store/mdbx_test.go index 91b0d0d..5ac7bb5 100644 --- a/node/store/mdbx_test.go +++ b/node/store/mdbx_test.go @@ -411,7 +411,7 @@ func TestMDBXDB(t *testing.T) { names := []string{DEFAULT_TABLE, "table1", "table2"} var dbs []MDBXDB for _, name := range names { - newdb, err := db.OpenDB(name) + newdb := db.OpenDB(name) if err != nil { t.Errorf("error opening db: %v", err) t.Fail() diff --git a/node/store/peerstore.go b/node/store/peerstore.go index 85f0c3c..734fdd1 100644 --- a/node/store/peerstore.go +++ b/node/store/peerstore.go @@ -41,11 +41,10 @@ var _ ds.Batching = (*PeerstoreDatastore)(nil) var _ ds.Batch = (*batch)(nil) var _ Peerstore = (*PeerstoreDatastore)(nil) -func NewPeerstoreDatastore(db KVDB) (*PeerstoreDatastore, error) { - ds := PeerstoreDatastore{ +func NewPeerstoreDatastore(db KVDB) *PeerstoreDatastore { + return &PeerstoreDatastore{ db: db, } - return &ds, nil } func (d *PeerstoreDatastore) Put( From 9ad7426920fea91847bef9c56da91ac3ac1c57ef Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 14 May 2025 23:15:58 -0300 Subject: [PATCH 08/21] mdbx: shard table from first key byte --- node/app/wire.go | 42 +++++---------------------- node/app/wire_gen.go | 64 ++++++++++------------------------------- node/store/mdbx.go | 47 ++++++++++++++++-------------- node/store/mdbx_test.go | 41 -------------------------- 4 files changed, 48 insertions(+), 146 deletions(-) diff --git a/node/app/wire.go b/node/app/wire.go index c5ae43d..e6c5957 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -50,44 +50,16 @@ var keyManagerSet = wire.NewSet( wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)), ) -func provideBaseDB(dbConfig *config.DBConfig) *store.MDBXDB { - return store.NewMDBXDB(dbConfig) -} - -func providePebbleClockStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleClockStore { - return store.NewPebbleClockStore(db.OpenDB("clock_store"), logger) -} - -func providePebbleCoinStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleCoinStore { - return store.NewPebbleCoinStore(db.OpenDB("coin_store"), logger) -} - -func providePebbleKeyStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleKeyStore { - return store.NewPebbleKeyStore(db.OpenDB("key_store"), logger) -} - -func providePebbleDataProofStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleDataProofStore { - return store.NewPebbleDataProofStore(db.OpenDB("data_proof_store"), logger) -} - -func providePebbleHypergraphStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleHypergraphStore { - return store.NewPebbleHypergraphStore(db.OpenDB("hypergraph_store"), logger) -} - -func providePeerstoreDatastore(db *store.MDBXDB, logger *zap.Logger) *store.PeerstoreDatastore { - return store.NewPeerstoreDatastore(db.OpenDB("peerstore")) -} - var storeSet = wire.NewSet( wire.FieldsOf(new(*config.Config), "DB"), - provideBaseDB, + store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), - providePebbleClockStore, - providePebbleCoinStore, - providePebbleKeyStore, - providePebbleDataProofStore, - providePebbleHypergraphStore, - providePeerstoreDatastore, + store.NewPebbleClockStore, + store.NewPebbleCoinStore, + store.NewPebbleKeyStore, + store.NewPebbleDataProofStore, + store.NewPebbleHypergraphStore, + store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index a31d73c..0d9f545 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -37,21 +37,21 @@ func NewDHTNode(configConfig *config.Config) (*DHTNode, error) { func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := debugLogger() dbConfig := configConfig.DB - mdbxdb := provideBaseDB(dbConfig) - pebbleDataProofStore := providePebbleDataProofStore(mdbxdb, zapLogger) - pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger) - pebbleCoinStore := providePebbleCoinStore(mdbxdb, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := providePebbleHypergraphStore(mdbxdb, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := providePebbleKeyStore(mdbxdb, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) @@ -64,21 +64,21 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := logger() dbConfig := configConfig.DB - mdbxdb := provideBaseDB(dbConfig) - pebbleDataProofStore := providePebbleDataProofStore(mdbxdb, zapLogger) - pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger) - pebbleCoinStore := providePebbleCoinStore(mdbxdb, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := providePebbleHypergraphStore(mdbxdb, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := providePebbleKeyStore(mdbxdb, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) @@ -98,9 +98,9 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) { func NewClockStore(configConfig *config.Config) (store.ClockStore, error) { dbConfig := configConfig.DB - mdbxdb := provideBaseDB(dbConfig) + mdbxdb := store.NewMDBXDB(dbConfig) zapLogger := logger() - pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) return pebbleClockStore, nil } @@ -134,41 +134,7 @@ var debugLoggerSet = wire.NewSet( var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager))) -func provideBaseDB(dbConfig *config.DBConfig) *store.MDBXDB { - return store.NewMDBXDB(dbConfig) -} - -func providePebbleClockStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleClockStore { - return store.NewPebbleClockStore(db.OpenDB("clock_store"), logger2) -} - -func providePebbleCoinStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleCoinStore { - return store.NewPebbleCoinStore(db.OpenDB("coin_store"), logger2) -} - -func providePebbleKeyStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleKeyStore { - return store.NewPebbleKeyStore(db.OpenDB("key_store"), logger2) -} - -func providePebbleDataProofStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleDataProofStore { - return store.NewPebbleDataProofStore(db.OpenDB("data_proof_store"), logger2) -} - -func providePebbleHypergraphStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleHypergraphStore { - return store.NewPebbleHypergraphStore(db.OpenDB("hypergraph_store"), logger2) -} - -func providePeerstoreDatastore(db *store.MDBXDB, logger2 *zap.Logger) *store.PeerstoreDatastore { - return store.NewPeerstoreDatastore(db.OpenDB("peerstore")) -} - -var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), provideBaseDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), providePebbleClockStore, - providePebbleCoinStore, - providePebbleKeyStore, - providePebbleDataProofStore, - providePebbleHypergraphStore, - providePeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore)), -) +var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager))) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index b9fdc4e..5e71eda 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -5,6 +5,7 @@ import ( "io" "os" "runtime" + "strconv" "sync/atomic" "github.com/cockroachdb/pebble" @@ -13,8 +14,8 @@ import ( ) type MDBXDB struct { - env *mdbx.Env - dbi mdbx.DBI + env *mdbx.Env + dbis map[byte]mdbx.DBI } // this is ugly but I did not find a better way @@ -45,7 +46,7 @@ func NewMDBXDB(config *config.DBConfig) *MDBXDB { } // Configs - if err = env.SetOption(mdbx.OptMaxDB, 200); err != nil { + if err = env.SetOption(mdbx.OptMaxDB, 300); err != nil { panic(err) } if err = env.SetOption(mdbx.OptMaxReaders, READERS_LIMIT); err != nil { @@ -66,11 +67,15 @@ func NewMDBXDB(config *config.DBConfig) *MDBXDB { } // Open the default database. Other databases are opened using OpenDB) - db := &MDBXDB{env: env} - return db.OpenDB(DEFAULT_TABLE) + db := &MDBXDB{env: env, dbis: make(map[byte]mdbx.DBI)} + db.dbis[0] = db.OpenDB(DEFAULT_TABLE) + for i := byte(1); i < 255; i++ { + db.dbis[i] = db.OpenDB(strconv.Itoa(int(i))) + } + return db } -func (m *MDBXDB) OpenDB(name string) *MDBXDB { +func (m *MDBXDB) OpenDB(name string) mdbx.DBI { var dbi mdbx.DBI var err error err = m.env.Update(func(txn *mdbx.Txn) error { @@ -83,7 +88,7 @@ func (m *MDBXDB) OpenDB(name string) *MDBXDB { if err != nil { panic(err) } - return &MDBXDB{env: m.env, dbi: dbi} + return dbi } // KVDB interface implementation @@ -91,7 +96,7 @@ func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) { var result []byte var err error err = m.env.View(func(txn *mdbx.Txn) error { - val, err := txn.Get(m.dbi, key) + val, err := txn.Get(m.dbis[key[0]], key) if err != nil { if mdbx.IsNotFound(err) { return pebble.ErrNotFound @@ -112,7 +117,7 @@ func (m *MDBXDB) Set(key, value []byte) error { panic("another tx is writing") } return m.env.Update(func(txn *mdbx.Txn) error { - return txn.Put(m.dbi, key, value, mdbx.Upsert) + return txn.Put(m.dbis[key[0]], key, value, mdbx.Upsert) }) } @@ -121,7 +126,7 @@ func (m *MDBXDB) Delete(key []byte) error { panic("another tx is writing") } return m.env.Update(func(txn *mdbx.Txn) error { - return txn.Del(m.dbi, key, nil) + return txn.Del(m.dbis[key[0]], key, nil) }) } @@ -148,7 +153,7 @@ func (m *MDBXDB) NewTransaction() Transaction { return &MDBXTransaction{ txn: txn, - dbi: m.dbi, + db: m, } } @@ -162,7 +167,7 @@ func (m *MDBXDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) return nil, err } - cursor, err := txn.OpenCursor(m.dbi) + cursor, err := txn.OpenCursor(m.dbis[lowerBound[0]]) if err != nil { txn.Abort() runtime.UnlockOSThread() @@ -215,11 +220,11 @@ var _ KVDB = (*MDBXDB)(nil) // Transaction implementation type MDBXTransaction struct { txn *mdbx.Txn - dbi mdbx.DBI + db *MDBXDB } func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) { - val, err := t.txn.Get(t.dbi, key) + val, err := t.txn.Get(t.db.dbis[key[0]], key) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, pebble.ErrNotFound @@ -235,7 +240,7 @@ func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) { } func (t *MDBXTransaction) Set(key []byte, value []byte) error { - return t.txn.Put(t.dbi, key, value, 0) + return t.txn.Put(t.db.dbis[key[0]], key, value, 0) } func (t *MDBXTransaction) Commit() error { @@ -247,7 +252,7 @@ func (t *MDBXTransaction) Commit() error { } func (t *MDBXTransaction) Delete(key []byte) error { - return t.txn.Del(t.dbi, key, nil) + return t.txn.Del(t.db.dbis[key[0]], key, nil) } func (t *MDBXTransaction) Abort() error { @@ -262,7 +267,7 @@ func (t *MDBXTransaction) NewIter(lowerBound []byte, upperBound []byte) (Iterato } func (t *MDBXTransaction) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) { - cursor, err := t.txn.OpenCursor(t.dbi) + cursor, err := t.txn.OpenCursor(t.db.dbis[lowerBound[0]]) if err != nil { return nil, err } @@ -296,7 +301,7 @@ func (t *MDBXTransaction) DeleteRange(lowerBound []byte, upperBound []byte) erro return err } for iter.First(); iter.Valid(); iter.Next() { - err = iter.txn.Del(t.dbi, iter.key, nil) + err = iter.txn.Del(t.db.dbis[lowerBound[0]], iter.key, nil) if err != nil { return err } @@ -539,11 +544,11 @@ func (m *MDBXBatch) Commit() error { for _, op := range m.operations { switch op.opcode { case Set: - err = tx.Put(m.db.dbi, op.operand1, op.operand2, mdbx.Upsert) + err = tx.Put(m.db.dbis[op.operand1[0]], op.operand1, op.operand2, mdbx.Upsert) case Delete: - err = tx.Del(m.db.dbi, op.operand1, nil) + err = tx.Del(m.db.dbis[op.operand1[0]], op.operand1, nil) case DeleteRange: - cursor, err := tx.OpenCursor(m.db.dbi) + cursor, err := tx.OpenCursor(m.db.dbis[op.operand1[0]]) if err != nil { return err } diff --git a/node/store/mdbx_test.go b/node/store/mdbx_test.go index 5ac7bb5..f1bac20 100644 --- a/node/store/mdbx_test.go +++ b/node/store/mdbx_test.go @@ -407,45 +407,4 @@ func TestMDBXDB(t *testing.T) { } }) - t.Run("OpenDB", func(t *testing.T) { - names := []string{DEFAULT_TABLE, "table1", "table2"} - var dbs []MDBXDB - for _, name := range names { - newdb := db.OpenDB(name) - if err != nil { - t.Errorf("error opening db: %v", err) - t.Fail() - } - dbs = append(dbs, *newdb) - } - for j, currentDB := range dbs { - for i := 1; i <= 5; i++ { - name := names[j] - key := []byte("key-" + string(rune('0'+i))) - value := []byte(name + string(rune('0'+i))) - if err := currentDB.Set(key, value); err != nil { - t.Fatalf("Failed to set value: %v", err) - } - } - } - - // Check that values under the same key on different dbs were not overwritten - for j, currentDB := range dbs { - for i := 1; i <= 5; i++ { - name := names[j] - key := []byte("key-" + string(rune('0'+i))) - expectedValue := []byte(name + string(rune('0'+i))) - value, _, err := currentDB.Get(key) - if !bytes.Equal(value, expectedValue) { - t.Errorf("%s was overwritten by %s on db %s", expectedValue, value, name) - t.Fail() - } - if err != nil { - t.Errorf("Error getting value: %v", err) - } - } - } - - }) - } From 128252a7b8d35a7c440d8522fc236e7079c1e7b0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 May 2025 11:29:22 -0300 Subject: [PATCH 09/21] mdbx: add migration from pebble option --- node/main.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++ node/store/mdbx.go | 3 +++ 2 files changed, 62 insertions(+) diff --git a/node/main.go b/node/main.go index 0bc5a87..a2bafa9 100644 --- a/node/main.go +++ b/node/main.go @@ -97,6 +97,11 @@ var ( false, "print node related information", ) + migrate = flag.String( + "migrate", + "", + "migrate from Pebble to MDBX from specified path (e.g. /home/user/backup/.config/store)", + ) debug = flag.Bool( "debug", false, @@ -375,6 +380,60 @@ func main() { console.Run() return } + if *migrate != "" { + fmt.Printf("Migrating from Pebble to MDBX from %s\n", *migrate) + d, err := os.Stat(filepath.Join(*migrate, "LOCK")) + if err != nil || d == nil { + fmt.Printf("%s does not look like a pebble db! Double check your path", *migrate) + return + } + dbConfig := &config.DBConfig{ + Path: *migrate, + } + pebbleInput := store.NewPebbleDB(dbConfig) + mdbxOutput := store.NewMDBXDB(nodeConfig.DB) + + allIter, err := pebbleInput.NewIter(nil, nil) + if err != nil { + panic(err) + } + batch := mdbxOutput.NewBatch(false) + total := 0 + for allIter.First(); allIter.Valid(); allIter.Next() { + err := batch.Set(allIter.Key(), allIter.Value()) + if err != nil { + panic(err) + } + total++ + if total%10_000 == 0 { + err := batch.Commit() + if err != nil { + panic(err) + } + fmt.Printf("Commit. Total: %d", total) + } + } + err = batch.Commit() + if err != nil { + panic(err) + } + fmt.Printf("Commit. Total: %d", total) + err = allIter.Close() + if err != nil { + panic(err) + } + err = pebbleInput.Close() + if err != nil { + panic(err) + } + err = mdbxOutput.Close() + if err != nil { + panic(err) + } + fmt.Println("Done.") + + return + } if *dhtOnly { done := make(chan os.Signal, 1) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 5e71eda..b7b99c6 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -553,6 +553,9 @@ func (m *MDBXBatch) Commit() error { return err } key, _, err := cursor.Get(op.operand1, nil, mdbx.SetRange) + if err != nil { + return err + } for bytes.Compare(key, op.operand1) >= 0 && bytes.Compare(key, op.operand2) < 0 { err = cursor.Del(mdbx.Current) if err != nil { From 53b0c9821b3d212cb7cbc36c13a35ee660dacf4a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 02:06:14 -0300 Subject: [PATCH 10/21] mdbx: compress values --- node/main.go | 4 +- node/store/mdbx.go | 98 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/node/main.go b/node/main.go index a2bafa9..072a7fb 100644 --- a/node/main.go +++ b/node/main.go @@ -410,14 +410,14 @@ func main() { if err != nil { panic(err) } - fmt.Printf("Commit. Total: %d", total) + fmt.Printf("Commit. Total: %d\n", total) } } err = batch.Commit() if err != nil { panic(err) } - fmt.Printf("Commit. Total: %d", total) + fmt.Printf("Commit. Total: %d\n", total) err = allIter.Close() if err != nil { panic(err) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index b7b99c6..959ebd1 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -2,6 +2,7 @@ package store import ( "bytes" + "compress/zlib" "io" "os" "runtime" @@ -13,6 +14,72 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/config" ) +// Compression header to identify compressed values +var compressionHeader = []byte{0x1F, 0x8B} + +// compressValue compresses a byte slice using zlib compression +// It adds a header to identify the value as compressed +func compressValue(value []byte) ([]byte, error) { + // Don't compress small values or nil values + if len(value) < 64 { + return value, nil + } + + var b bytes.Buffer + w := zlib.NewWriter(&b) + + if _, err := w.Write(value); err != nil { + return nil, err + } + + if err := w.Close(); err != nil { + return nil, err + } + + // Add compression header to the compressed data + compressed := append(compressionHeader, b.Bytes()...) + + // Only use compression if it actually reduces the size + if len(compressed) < len(value) { + return compressed, nil + } + + // If compression doesn't help, return the original value + return value, nil +} + +// decompressValue decompresses a byte slice if it was compressed +// It checks for the compression header to determine if decompression is needed +func decompressValue(value []byte) ([]byte, error) { + // Handle nil or empty values + if value == nil || len(value) < len(compressionHeader) { + return value, nil + } + + // Check if the value has our compression header + if !bytes.Equal(value[:len(compressionHeader)], compressionHeader) { + return value, nil // Not compressed, return as is + } + + // Remove the header before decompression + compressedData := value[len(compressionHeader):] + + // Create a zlib reader + r, err := zlib.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return nil, err + } + defer r.Close() + + // Read the decompressed data + var b bytes.Buffer + if _, err := io.Copy(&b, r); err != nil { + return nil, err + } + + return b.Bytes(), nil +} + type MDBXDB struct { env *mdbx.Env dbis map[byte]mdbx.DBI @@ -108,6 +175,14 @@ func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) { return err }) + // Decompress the value if it was compressed + if result != nil { + result, err = decompressValue(result) + if err != nil { + return nil, nil, err + } + } + // no closer, the transaction is already closed return result, noopClose{}, err } @@ -116,8 +191,15 @@ func (m *MDBXDB) Set(key, value []byte) error { if writeTxs.Load() > 1 { panic("another tx is writing") } + + // Compress the value before storing + compressedValue, err := compressValue(value) + if err != nil { + return err + } + return m.env.Update(func(txn *mdbx.Txn) error { - return txn.Put(m.dbis[key[0]], key, value, mdbx.Upsert) + return txn.Put(m.dbis[key[0]], key, compressedValue, mdbx.Upsert) }) } @@ -236,11 +318,23 @@ func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) { result := make([]byte, len(val)) copy(result, val) + // Decompress the value if it was compressed + result, err = decompressValue(result) + if err != nil { + return nil, nil, err + } + return result, io.NopCloser(nil), nil } func (t *MDBXTransaction) Set(key []byte, value []byte) error { - return t.txn.Put(t.db.dbis[key[0]], key, value, 0) + // Compress the value before storing + compressedValue, err := compressValue(value) + if err != nil { + return err + } + + return t.txn.Put(t.db.dbis[key[0]], key, compressedValue, 0) } func (t *MDBXTransaction) Commit() error { From a682134adc8d61a1feefc0d0481ff7a46440e809 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 02:12:15 -0300 Subject: [PATCH 11/21] mdbx: copy values on batch --- node/store/mdbx.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 959ebd1..ab63ff2 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -674,13 +674,19 @@ func (m *MDBXBatch) Commit() error { // Delete implements Transaction. func (m *MDBXBatch) Delete(key []byte) error { - m.operations = append(m.operations, BatchOperation{opcode: Delete, operand1: key}) + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + m.operations = append(m.operations, BatchOperation{opcode: Delete, operand1: keyCopy}) return nil } // DeleteRange implements Transaction. func (m *MDBXBatch) DeleteRange(lowerBound []byte, upperBound []byte) error { - m.operations = append(m.operations, BatchOperation{opcode: DeleteRange, operand1: lowerBound, operand2: upperBound}) + lowCopy := make([]byte, len(lowerBound)) + copy(lowCopy, lowerBound) + upCopy := make([]byte, len(upperBound)) + copy(upCopy, upperBound) + m.operations = append(m.operations, BatchOperation{opcode: DeleteRange, operand1: lowCopy, operand2: upCopy}) return nil } @@ -696,7 +702,11 @@ func (m *MDBXBatch) NewIter(lowerBound []byte, upperBound []byte) (Iterator, err // Set implements Transaction. func (m *MDBXBatch) Set(key []byte, value []byte) error { - m.operations = append(m.operations, BatchOperation{opcode: Set, operand1: key, operand2: value}) + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + valueCopy := make([]byte, len(value)) + copy(valueCopy, value) + m.operations = append(m.operations, BatchOperation{opcode: Set, operand1: keyCopy, operand2: valueCopy}) return nil } From 14c02bcef89624788c5d0eea460c752b0186276a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 02:14:28 -0300 Subject: [PATCH 12/21] mdbx: fix settings --- node/store/mdbx.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index ab63ff2..721c1aa 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -96,8 +96,8 @@ const MB = 1 << 20 // erigon defaults, should be tuned to quil const READERS_LIMIT = 32_000 const RP_AUGMENT_LIMIT = 1_000_000 -const MAP_SIZE = 16 * GB -const GROWTH_STEP = 2 * MB +const MAP_SIZE = 2000 * GB +const GROWTH_STEP = 2 * GB const PAGE_SIZE = 4096 const DEFAULT_TABLE = "default" // we use only one for now From f3251c0504dce49e99ec56c5086cf1dc4c7e3ff8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 02:21:04 -0300 Subject: [PATCH 13/21] mdbx: try compression level --- node/store/mdbx.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 721c1aa..caae266 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -21,12 +21,16 @@ var compressionHeader = []byte{0x1F, 0x8B} // It adds a header to identify the value as compressed func compressValue(value []byte) ([]byte, error) { // Don't compress small values or nil values - if len(value) < 64 { + if len(value) < 20 { return value, nil } var b bytes.Buffer - w := zlib.NewWriter(&b) + w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) + + if err != nil { + return nil, err + } if _, err := w.Write(value); err != nil { return nil, err From 13bbfd9fd73714e02d41708cd05c200de4dfc30d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 02:31:18 -0300 Subject: [PATCH 14/21] mdbx: fix compression --- node/store/mdbx.go | 62 ++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index caae266..51633b3 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -14,17 +14,12 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/config" ) -// Compression header to identify compressed values -var compressionHeader = []byte{0x1F, 0x8B} - // compressValue compresses a byte slice using zlib compression // It adds a header to identify the value as compressed func compressValue(value []byte) ([]byte, error) { - // Don't compress small values or nil values - if len(value) < 20 { + if value == nil { return value, nil } - var b bytes.Buffer w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) @@ -39,37 +34,19 @@ func compressValue(value []byte) ([]byte, error) { if err := w.Close(); err != nil { return nil, err } - - // Add compression header to the compressed data - compressed := append(compressionHeader, b.Bytes()...) - - // Only use compression if it actually reduces the size - if len(compressed) < len(value) { - return compressed, nil - } - - // If compression doesn't help, return the original value - return value, nil + return b.Bytes(), nil } // decompressValue decompresses a byte slice if it was compressed // It checks for the compression header to determine if decompression is needed func decompressValue(value []byte) ([]byte, error) { // Handle nil or empty values - if value == nil || len(value) < len(compressionHeader) { + if value == nil { return value, nil } - // Check if the value has our compression header - if !bytes.Equal(value[:len(compressionHeader)], compressionHeader) { - return value, nil // Not compressed, return as is - } - - // Remove the header before decompression - compressedData := value[len(compressionHeader):] - // Create a zlib reader - r, err := zlib.NewReader(bytes.NewReader(compressedData)) + r, err := zlib.NewReader(bytes.NewReader(value)) if err != nil { return nil, err } @@ -449,6 +426,11 @@ func (i *MDBXIterator) First() bool { i.valid = false return false } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } i.key = k i.value = v @@ -472,6 +454,11 @@ func (i *MDBXIterator) Next() bool { i.valid = false return false } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } i.key = k i.value = v @@ -494,6 +481,11 @@ func (i *MDBXIterator) Prev() bool { i.valid = false return false } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } i.key = k i.value = v @@ -555,6 +547,11 @@ func (i *MDBXIterator) SeekLT(target []byte) bool { i.valid = false return false } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } i.key = k i.value = v @@ -587,6 +584,11 @@ func (i *MDBXIterator) Last() bool { } } } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } if err != nil { i.valid = false @@ -708,8 +710,10 @@ func (m *MDBXBatch) NewIter(lowerBound []byte, upperBound []byte) (Iterator, err func (m *MDBXBatch) Set(key []byte, value []byte) error { keyCopy := make([]byte, len(key)) copy(keyCopy, key) - valueCopy := make([]byte, len(value)) - copy(valueCopy, value) + valueCopy, err := compressValue(value) + if err != nil { + return err + } m.operations = append(m.operations, BatchOperation{opcode: Set, operand1: keyCopy, operand2: valueCopy}) return nil } From 03ea2efa8c670b031583c05a9464112ea810accc Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 02:35:33 -0300 Subject: [PATCH 15/21] mdbx: use gzip --- node/store/mdbx.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 51633b3..31045a6 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -2,7 +2,7 @@ package store import ( "bytes" - "compress/zlib" + "compress/gzip" "io" "os" "runtime" @@ -21,7 +21,7 @@ func compressValue(value []byte) ([]byte, error) { return value, nil } var b bytes.Buffer - w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) + w, err := gzip.NewWriterLevel(&b, gzip.BestSpeed) if err != nil { return nil, err @@ -46,7 +46,7 @@ func decompressValue(value []byte) ([]byte, error) { } // Create a zlib reader - r, err := zlib.NewReader(bytes.NewReader(value)) + r, err := gzip.NewReader(bytes.NewReader(value)) if err != nil { return nil, err } From 450d96ec9f9f21565966d667e76d4f5efaae6128 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 02:42:20 -0300 Subject: [PATCH 16/21] mdbx: try lz4 --- node/store/mdbx.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 31045a6..672adf2 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -2,7 +2,6 @@ package store import ( "bytes" - "compress/gzip" "io" "os" "runtime" @@ -11,6 +10,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/erigontech/mdbx-go/mdbx" + lz4 "github.com/pierrec/lz4/v4" "source.quilibrium.com/quilibrium/monorepo/node/config" ) @@ -21,11 +21,7 @@ func compressValue(value []byte) ([]byte, error) { return value, nil } var b bytes.Buffer - w, err := gzip.NewWriterLevel(&b, gzip.BestSpeed) - - if err != nil { - return nil, err - } + w := lz4.NewWriter(&b) if _, err := w.Write(value); err != nil { return nil, err @@ -46,11 +42,7 @@ func decompressValue(value []byte) ([]byte, error) { } // Create a zlib reader - r, err := gzip.NewReader(bytes.NewReader(value)) - if err != nil { - return nil, err - } - defer r.Close() + r := lz4.NewReader(bytes.NewReader(value)) // Read the decompressed data var b bytes.Buffer From 4db2a4a85a41ebca82623fc52bfe7969098f911d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 03:02:02 -0300 Subject: [PATCH 17/21] mdbx: lz4 set fast level --- node/store/mdbx.go | 1 + 1 file changed, 1 insertion(+) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 672adf2..4eadd37 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -22,6 +22,7 @@ func compressValue(value []byte) ([]byte, error) { } var b bytes.Buffer w := lz4.NewWriter(&b) + w.Apply(lz4.CompressionLevelOption(lz4.Fast)) if _, err := w.Write(value); err != nil { return nil, err From 7674199ad86e789da636404dcc67fe6fb1fbdde8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 03:13:08 -0300 Subject: [PATCH 18/21] mdbx: use zstd instead --- node/store/mdbx.go | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 4eadd37..6100962 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -8,9 +8,9 @@ import ( "strconv" "sync/atomic" + "github.com/DataDog/zstd" "github.com/cockroachdb/pebble" "github.com/erigontech/mdbx-go/mdbx" - lz4 "github.com/pierrec/lz4/v4" "source.quilibrium.com/quilibrium/monorepo/node/config" ) @@ -20,18 +20,11 @@ func compressValue(value []byte) ([]byte, error) { if value == nil { return value, nil } - var b bytes.Buffer - w := lz4.NewWriter(&b) - w.Apply(lz4.CompressionLevelOption(lz4.Fast)) - - if _, err := w.Write(value); err != nil { + value, err := zstd.CompressLevel(nil, value, zstd.BestSpeed) + if err != nil { return nil, err } - - if err := w.Close(); err != nil { - return nil, err - } - return b.Bytes(), nil + return value, nil } // decompressValue decompresses a byte slice if it was compressed @@ -41,17 +34,11 @@ func decompressValue(value []byte) ([]byte, error) { if value == nil { return value, nil } - - // Create a zlib reader - r := lz4.NewReader(bytes.NewReader(value)) - - // Read the decompressed data - var b bytes.Buffer - if _, err := io.Copy(&b, r); err != nil { + value, err := zstd.Decompress(nil, value) + if err != nil { return nil, err } - - return b.Bytes(), nil + return value, err } type MDBXDB struct { From fd23adc794b7f3ad7efc3ef63d1c8e7364b9246d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 03:33:33 -0300 Subject: [PATCH 19/21] mdbx: lz4 fast --- node/store/mdbx.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 6100962..4eadd37 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -8,9 +8,9 @@ import ( "strconv" "sync/atomic" - "github.com/DataDog/zstd" "github.com/cockroachdb/pebble" "github.com/erigontech/mdbx-go/mdbx" + lz4 "github.com/pierrec/lz4/v4" "source.quilibrium.com/quilibrium/monorepo/node/config" ) @@ -20,11 +20,18 @@ func compressValue(value []byte) ([]byte, error) { if value == nil { return value, nil } - value, err := zstd.CompressLevel(nil, value, zstd.BestSpeed) - if err != nil { + var b bytes.Buffer + w := lz4.NewWriter(&b) + w.Apply(lz4.CompressionLevelOption(lz4.Fast)) + + if _, err := w.Write(value); err != nil { return nil, err } - return value, nil + + if err := w.Close(); err != nil { + return nil, err + } + return b.Bytes(), nil } // decompressValue decompresses a byte slice if it was compressed @@ -34,11 +41,17 @@ func decompressValue(value []byte) ([]byte, error) { if value == nil { return value, nil } - value, err := zstd.Decompress(nil, value) - if err != nil { + + // Create a zlib reader + r := lz4.NewReader(bytes.NewReader(value)) + + // Read the decompressed data + var b bytes.Buffer + if _, err := io.Copy(&b, r); err != nil { return nil, err } - return value, err + + return b.Bytes(), nil } type MDBXDB struct { From 7a49c73232694badecdd9fb3de1c2e2adb058a2d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 03:39:37 -0300 Subject: [PATCH 20/21] mdbx: snappy --- node/store/mdbx.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 4eadd37..2b15ad6 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -10,7 +10,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/erigontech/mdbx-go/mdbx" - lz4 "github.com/pierrec/lz4/v4" + "github.com/golang/snappy" "source.quilibrium.com/quilibrium/monorepo/node/config" ) @@ -21,8 +21,7 @@ func compressValue(value []byte) ([]byte, error) { return value, nil } var b bytes.Buffer - w := lz4.NewWriter(&b) - w.Apply(lz4.CompressionLevelOption(lz4.Fast)) + w := snappy.NewBufferedWriter(&b) if _, err := w.Write(value); err != nil { return nil, err @@ -43,7 +42,7 @@ func decompressValue(value []byte) ([]byte, error) { } // Create a zlib reader - r := lz4.NewReader(bytes.NewReader(value)) + r := snappy.NewReader(bytes.NewReader(value)) // Read the decompressed data var b bytes.Buffer From 65efc51d2d4e432cf87535db5aa9af7939fc9882 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 24 May 2025 03:43:10 -0300 Subject: [PATCH 21/21] mdbx: simplify snappy --- node/store/mdbx.go | 24 ++---------------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/node/store/mdbx.go b/node/store/mdbx.go index 2b15ad6..c46d924 100644 --- a/node/store/mdbx.go +++ b/node/store/mdbx.go @@ -20,17 +20,7 @@ func compressValue(value []byte) ([]byte, error) { if value == nil { return value, nil } - var b bytes.Buffer - w := snappy.NewBufferedWriter(&b) - - if _, err := w.Write(value); err != nil { - return nil, err - } - - if err := w.Close(); err != nil { - return nil, err - } - return b.Bytes(), nil + return snappy.Encode(nil, value), nil } // decompressValue decompresses a byte slice if it was compressed @@ -40,17 +30,7 @@ func decompressValue(value []byte) ([]byte, error) { if value == nil { return value, nil } - - // Create a zlib reader - r := snappy.NewReader(bytes.NewReader(value)) - - // Read the decompressed data - var b bytes.Buffer - if _, err := io.Copy(&b, r); err != nil { - return nil, err - } - - return b.Bytes(), nil + return snappy.Decode(nil, value) } type MDBXDB struct {