diff --git a/node/app/wire.go b/node/app/wire.go index c7a8d4f..e6c5957 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -52,8 +52,8 @@ var keyManagerSet = wire.NewSet( var storeSet = wire.NewSet( wire.FieldsOf(new(*config.Config), "DB"), - store.NewPebbleDB, - wire.Bind(new(store.KVDB), new(*store.PebbleDB)), + store.NewMDBXDB, + wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index d357b7f..0d9f545 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -37,24 +37,24 @@ func NewDHTNode(configConfig *config.Config) (*DHTNode, error) { func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := debugLogger() dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) - node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB) + node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) if err != nil { return nil, err } @@ -64,24 +64,24 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) { zapLogger := logger() dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) - pebbleDataProofStore := store.NewPebbleDataProofStore(pebbleDB, zapLogger) - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) - pebbleCoinStore := store.NewPebbleCoinStore(pebbleDB, zapLogger) + mdbxdb := store.NewMDBXDB(dbConfig) + pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) + pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger) kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger) - pebbleHypergraphStore := store.NewPebbleHypergraphStore(pebbleDB, zapLogger) + pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger) engineConfig := configConfig.Engine masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver) inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger) - pebbleKeyStore := store.NewPebbleKeyStore(pebbleDB, zapLogger) + pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger) tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport) - node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, pebbleDB) + node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb) if err != nil { return nil, err } @@ -98,9 +98,9 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) { func NewClockStore(configConfig *config.Config) (store.ClockStore, error) { dbConfig := configConfig.DB - pebbleDB := store.NewPebbleDB(dbConfig) + mdbxdb := store.NewMDBXDB(dbConfig) zapLogger := logger() - pebbleClockStore := store.NewPebbleClockStore(pebbleDB, zapLogger) + pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger) return pebbleClockStore, nil } @@ -134,7 +134,7 @@ var debugLoggerSet = wire.NewSet( var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager))) -var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, wire.Bind(new(store.KVDB), new(*store.PebbleDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) +var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore))) var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager))) diff --git a/node/consensus/data/frame_importer.go b/node/consensus/data/frame_importer.go index 7f7b1bb..a3a7435 100644 --- a/node/consensus/data/frame_importer.go +++ b/node/consensus/data/frame_importer.go @@ -213,7 +213,7 @@ func (e *DataClockConsensusEngine) applySnapshot( return nil } - temporaryStore := store.NewPebbleDB(&config.DBConfig{ + temporaryStore := store.NewMDBXDB(&config.DBConfig{ Path: snapshotDBPath, }) temporaryClockStore := store.NewPebbleClockStore(temporaryStore, e.logger) diff --git a/node/main.go b/node/main.go index 56512f5..072a7fb 100644 --- a/node/main.go +++ b/node/main.go @@ -97,6 +97,11 @@ var ( false, "print node related information", ) + migrate = flag.String( + "migrate", + "", + "migrate from Pebble to MDBX from specified path (e.g. /home/user/backup/.config/store)", + ) debug = flag.Bool( "debug", false, @@ -329,7 +334,7 @@ func main() { } if *compactDB && *core == 0 { - db := store.NewPebbleDB(nodeConfig.DB) + db := store.NewMDBXDB(nodeConfig.DB) if err := db.CompactAll(); err != nil { panic(err) } @@ -375,6 +380,60 @@ func main() { console.Run() return } + if *migrate != "" { + fmt.Printf("Migrating from Pebble to MDBX from %s\n", *migrate) + d, err := os.Stat(filepath.Join(*migrate, "LOCK")) + if err != nil || d == nil { + fmt.Printf("%s does not look like a pebble db! Double check your path", *migrate) + return + } + dbConfig := &config.DBConfig{ + Path: *migrate, + } + pebbleInput := store.NewPebbleDB(dbConfig) + mdbxOutput := store.NewMDBXDB(nodeConfig.DB) + + allIter, err := pebbleInput.NewIter(nil, nil) + if err != nil { + panic(err) + } + batch := mdbxOutput.NewBatch(false) + total := 0 + for allIter.First(); allIter.Valid(); allIter.Next() { + err := batch.Set(allIter.Key(), allIter.Value()) + if err != nil { + panic(err) + } + total++ + if total%10_000 == 0 { + err := batch.Commit() + if err != nil { + panic(err) + } + fmt.Printf("Commit. Total: %d\n", total) + } + } + err = batch.Commit() + if err != nil { + panic(err) + } + fmt.Printf("Commit. Total: %d\n", total) + err = allIter.Close() + if err != nil { + panic(err) + } + err = pebbleInput.Close() + if err != nil { + panic(err) + } + err = mdbxOutput.Close() + if err != nil { + panic(err) + } + fmt.Println("Done.") + + return + } if *dhtOnly { done := make(chan os.Signal, 1) diff --git a/node/store/mdbx.go b/node/store/mdbx.go new file mode 100644 index 0000000..c46d924 --- /dev/null +++ b/node/store/mdbx.go @@ -0,0 +1,705 @@ +package store + +import ( + "bytes" + "io" + "os" + "runtime" + "strconv" + "sync/atomic" + + "github.com/cockroachdb/pebble" + "github.com/erigontech/mdbx-go/mdbx" + "github.com/golang/snappy" + "source.quilibrium.com/quilibrium/monorepo/node/config" +) + +// compressValue compresses a byte slice using zlib compression +// It adds a header to identify the value as compressed +func compressValue(value []byte) ([]byte, error) { + if value == nil { + return value, nil + } + return snappy.Encode(nil, value), nil +} + +// decompressValue decompresses a byte slice if it was compressed +// It checks for the compression header to determine if decompression is needed +func decompressValue(value []byte) ([]byte, error) { + // Handle nil or empty values + if value == nil { + return value, nil + } + return snappy.Decode(nil, value) +} + +type MDBXDB struct { + env *mdbx.Env + dbis map[byte]mdbx.DBI +} + +// this is ugly but I did not find a better way +var created = false +var readTxs atomic.Int32 +var writeTxs atomic.Int32 + +const GB = 1 << 30 +const MB = 1 << 20 + +// erigon defaults, should be tuned to quil +const READERS_LIMIT = 32_000 +const RP_AUGMENT_LIMIT = 1_000_000 +const MAP_SIZE = 2000 * GB +const GROWTH_STEP = 2 * GB +const PAGE_SIZE = 4096 + +const DEFAULT_TABLE = "default" // we use only one for now + +func NewMDBXDB(config *config.DBConfig) *MDBXDB { + if created { + panic("do not create two instances") + } + created = true + env, err := mdbx.NewEnv(mdbx.Default) + if err != nil { + panic(err) + } + + // Configs + if err = env.SetOption(mdbx.OptMaxDB, 300); err != nil { + panic(err) + } + if err = env.SetOption(mdbx.OptMaxReaders, READERS_LIMIT); err != nil { + panic(err) + } + if err = env.SetOption(mdbx.OptRpAugmentLimit, RP_AUGMENT_LIMIT); err != nil { + panic(err) + } + os.MkdirAll(config.Path, os.ModePerm) + if err = env.SetGeometry(-1, -1, int(MAP_SIZE), int(GROWTH_STEP), -1, int(PAGE_SIZE)); err != nil { + panic(err) + } + + // Open the environment + flags := uint(mdbx.NoReadahead) | mdbx.Durable + if err := env.Open(config.Path, flags, 0664); err != nil { + panic(err) + } + + // Open the default database. Other databases are opened using OpenDB) + db := &MDBXDB{env: env, dbis: make(map[byte]mdbx.DBI)} + db.dbis[0] = db.OpenDB(DEFAULT_TABLE) + for i := byte(1); i < 255; i++ { + db.dbis[i] = db.OpenDB(strconv.Itoa(int(i))) + } + return db +} + +func (m *MDBXDB) OpenDB(name string) mdbx.DBI { + var dbi mdbx.DBI + var err error + err = m.env.Update(func(txn *mdbx.Txn) error { + dbi, err = txn.OpenDBI(name, mdbx.Create, nil, nil) + if err != nil { + return err + } + return nil + }) + if err != nil { + panic(err) + } + return dbi +} + +// KVDB interface implementation +func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) { + var result []byte + var err error + err = m.env.View(func(txn *mdbx.Txn) error { + val, err := txn.Get(m.dbis[key[0]], key) + if err != nil { + if mdbx.IsNotFound(err) { + return pebble.ErrNotFound + } + return err + } + result = make([]byte, len(val)) + copy(result, val) + return err + }) + + // Decompress the value if it was compressed + if result != nil { + result, err = decompressValue(result) + if err != nil { + return nil, nil, err + } + } + + // no closer, the transaction is already closed + return result, noopClose{}, err +} + +func (m *MDBXDB) Set(key, value []byte) error { + if writeTxs.Load() > 1 { + panic("another tx is writing") + } + + // Compress the value before storing + compressedValue, err := compressValue(value) + if err != nil { + return err + } + + return m.env.Update(func(txn *mdbx.Txn) error { + return txn.Put(m.dbis[key[0]], key, compressedValue, mdbx.Upsert) + }) +} + +func (m *MDBXDB) Delete(key []byte) error { + if writeTxs.Load() > 1 { + panic("another tx is writing") + } + return m.env.Update(func(txn *mdbx.Txn) error { + return txn.Del(m.dbis[key[0]], key, nil) + }) +} + +func (m *MDBXDB) NewBatch(indexed bool) Transaction { + // MDBX doesn't have a direct equivalent to Pebble's indexed batch + // We'll use a regular transaction for both cases + return &MDBXBatch{ + db: m, + } +} + +func (m *MDBXDB) NewTransaction() Transaction { + txn, err := m.env.BeginTxn(nil, 0) + if writeTxs.Load() > 1 { + panic("another tx is writing") + } + writeTxs.Add(1) + runtime.LockOSThread() + if err != nil { + writeTxs.Add(-1) + runtime.UnlockOSThread() + panic(err) + } + + return &MDBXTransaction{ + txn: txn, + db: m, + } +} + +func (m *MDBXDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + txn, err := m.env.BeginTxn(nil, mdbx.Readonly) + runtime.LockOSThread() + readTxs.Add(1) + if err != nil { + runtime.UnlockOSThread() + readTxs.Add(-1) + return nil, err + } + + cursor, err := txn.OpenCursor(m.dbis[lowerBound[0]]) + if err != nil { + txn.Abort() + runtime.UnlockOSThread() + readTxs.Add(-1) + return nil, err + } + + return &MDBXIterator{ + txn: txn, + cursor: cursor, + lowerBound: lowerBound, + upperBound: upperBound, + valid: false, + txOwner: true, + }, nil +} + +func (m *MDBXDB) Compact(start, end []byte, parallelize bool) error { + // MDBX handles compaction differently than Pebble + // We can use env.Copy2fd to create a compacted copy, but for now + // we'll just return nil as MDBX handles this internally + return nil +} + +func (m *MDBXDB) CompactAll() error { + // MDBX handles compaction differently than Pebble + // For now, we'll just return nil as MDBX handles this internally + return nil +} + +func (m *MDBXDB) Close() error { + m.env.Close() + return nil +} + +func (m *MDBXDB) DeleteRange(start, end []byte) error { + tx := m.NewBatch(false) + err := tx.DeleteRange(start, end) + if err != nil { + tx.Abort() + return err + } + tx.Commit() + return nil +} + +// Ensure MDBXDB implements KVDB interface +var _ KVDB = (*MDBXDB)(nil) + +// Transaction implementation +type MDBXTransaction struct { + txn *mdbx.Txn + db *MDBXDB +} + +func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) { + val, err := t.txn.Get(t.db.dbis[key[0]], key) + if err != nil { + if mdbx.IsNotFound(err) { + return nil, nil, pebble.ErrNotFound + } + return nil, nil, err + } + + // Copy the value since it's only valid during the transaction + result := make([]byte, len(val)) + copy(result, val) + + // Decompress the value if it was compressed + result, err = decompressValue(result) + if err != nil { + return nil, nil, err + } + + return result, io.NopCloser(nil), nil +} + +func (t *MDBXTransaction) Set(key []byte, value []byte) error { + // Compress the value before storing + compressedValue, err := compressValue(value) + if err != nil { + return err + } + + return t.txn.Put(t.db.dbis[key[0]], key, compressedValue, 0) +} + +func (t *MDBXTransaction) Commit() error { + // we drop latency here, but it should be saved as metric + _, err := t.txn.Commit() + runtime.UnlockOSThread() + writeTxs.Add(-1) + return err +} + +func (t *MDBXTransaction) Delete(key []byte) error { + return t.txn.Del(t.db.dbis[key[0]], key, nil) +} + +func (t *MDBXTransaction) Abort() error { + t.txn.Abort() + runtime.UnlockOSThread() + writeTxs.Add(-1) + return nil +} + +func (t *MDBXTransaction) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + return t.NewIterCustom(lowerBound, upperBound) +} + +func (t *MDBXTransaction) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) { + cursor, err := t.txn.OpenCursor(t.db.dbis[lowerBound[0]]) + if err != nil { + return nil, err + } + + var key, value []byte + if lowerBound == nil { + key, value, err = cursor.Get(nil, nil, mdbx.First) + } else { + key, value, err = cursor.Get(lowerBound, nil, mdbx.SetRange) + } + if err != nil { + return nil, err + } + + return &MDBXIterator{ + txn: t.txn, + cursor: cursor, + lowerBound: lowerBound, + upperBound: upperBound, + valid: upperBound == nil || bytes.Compare(key, upperBound) < 0, + key: key, + value: value, + txOwner: false, + }, nil +} + +func (t *MDBXTransaction) DeleteRange(lowerBound []byte, upperBound []byte) error { + iter, err := t.NewIterCustom(lowerBound, upperBound) + defer iter.Close() + if err != nil { + return err + } + for iter.First(); iter.Valid(); iter.Next() { + err = iter.txn.Del(t.db.dbis[lowerBound[0]], iter.key, nil) + if err != nil { + return err + } + } + return nil +} + +// Ensure MDBXTransaction implements Transaction interface +var _ Transaction = (*MDBXTransaction)(nil) + +// Iterator implementation +type MDBXIterator struct { + txn *mdbx.Txn + cursor *mdbx.Cursor + lowerBound []byte + upperBound []byte + key []byte + value []byte + valid bool + txOwner bool +} + +func (i *MDBXIterator) Key() []byte { + if !i.valid { + return nil + } + return i.key +} + +func (i *MDBXIterator) First() bool { + var err error + var k, v []byte + + if i.lowerBound == nil { + k, v, err = i.cursor.Get(nil, nil, mdbx.First) + } else { + k, v, err = i.cursor.Get(i.lowerBound, nil, mdbx.SetRange) + } + + if err != nil { + i.valid = false + return false + } + + // Check if key is within upper bound + if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 { + i.valid = false + return false + } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +func (i *MDBXIterator) Next() bool { + if !i.valid { + return false + } + + k, v, err := i.cursor.Get(nil, nil, mdbx.Next) + if err != nil { + i.valid = false + return false + } + + // Check if key is within upper bound + if i.upperBound != nil && bytes.Compare(k, i.upperBound) >= 0 { + i.valid = false + return false + } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } + + i.key = k + i.value = v + return true +} + +func (i *MDBXIterator) Prev() bool { + if !i.valid { + return false + } + + k, v, err := i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } + + i.key = k + i.value = v + return true +} + +func (i *MDBXIterator) Valid() bool { + return i.valid +} + +func (i *MDBXIterator) Value() []byte { + if !i.valid { + return nil + } + return i.value +} + +func (i *MDBXIterator) Close() error { + i.valid = false + i.cursor.Close() + if i.txOwner { + i.txn.Abort() + readTxs.Add(-1) + runtime.UnlockOSThread() + } + return nil +} + +func (i *MDBXIterator) SeekLT(target []byte) bool { + if target == nil { + return i.Last() + } + + // First try to find the exact key + k, v, err := i.cursor.Get(target, nil, mdbx.SetRange) + if err != nil { + // If not found, try to find the last key + return i.Last() + } + + // If we found the exact key, move to the previous one + if bytes.Compare(k, target) == 0 { + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } else { + // If we found a key greater than target, move to the previous one + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +func (i *MDBXIterator) Last() bool { + var err error + var k, v []byte + + if i.upperBound == nil { + k, v, err = i.cursor.Get(nil, nil, mdbx.Last) + } else { + // Position at or before upper bound + k, v, err = i.cursor.Get(i.upperBound, nil, mdbx.SetRange) + if err != nil { + // If upper bound is beyond last key, get the last key + k, v, err = i.cursor.Get(nil, nil, mdbx.Last) + if err != nil { + i.valid = false + return false + } + } else if bytes.Compare(k, i.upperBound) >= 0 { + // If we found the upper bound or beyond, move to previous + k, v, err = i.cursor.Get(nil, nil, mdbx.Prev) + if err != nil { + i.valid = false + return false + } + } + } + v, err = decompressValue(v) + if err != nil { + i.valid = false + return false + } + + if err != nil { + i.valid = false + return false + } + + // Check if key is within lower bound + if i.lowerBound != nil && bytes.Compare(k, i.lowerBound) < 0 { + i.valid = false + return false + } + + i.key = k + i.value = v + i.valid = true + return true +} + +// Ensure MDBXIterator implements Iterator interface +var _ Iterator = (*MDBXIterator)(nil) + +// Helper for closing transactions +type txnCloser struct { + txn *mdbx.Txn +} + +func (c txnCloser) Close() error { + c.txn.Abort() + return nil +} + +type noopClose struct{} + +func (n noopClose) Close() error { + return nil +} + +type MDBXBatch struct { + db *MDBXDB + operations []BatchOperation +} + +// Abort implements Transaction. +func (m *MDBXBatch) Abort() error { + m.operations = nil + return nil +} + +// Commit implements Transaction. +func (m *MDBXBatch) Commit() error { + err := m.db.env.Update(func(tx *mdbx.Txn) error { + var err error + for _, op := range m.operations { + switch op.opcode { + case Set: + err = tx.Put(m.db.dbis[op.operand1[0]], op.operand1, op.operand2, mdbx.Upsert) + case Delete: + err = tx.Del(m.db.dbis[op.operand1[0]], op.operand1, nil) + case DeleteRange: + cursor, err := tx.OpenCursor(m.db.dbis[op.operand1[0]]) + if err != nil { + return err + } + key, _, err := cursor.Get(op.operand1, nil, mdbx.SetRange) + if err != nil { + return err + } + for bytes.Compare(key, op.operand1) >= 0 && bytes.Compare(key, op.operand2) < 0 { + err = cursor.Del(mdbx.Current) + if err != nil { + return err + } + key, _, err = cursor.Get(nil, nil, mdbx.Next) + if err != nil { + return err + } + } + cursor.Close() + } + if err != nil { + return err + } + } + return err + }) + m.operations = nil + return err +} + +// Delete implements Transaction. +func (m *MDBXBatch) Delete(key []byte) error { + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + m.operations = append(m.operations, BatchOperation{opcode: Delete, operand1: keyCopy}) + return nil +} + +// DeleteRange implements Transaction. +func (m *MDBXBatch) DeleteRange(lowerBound []byte, upperBound []byte) error { + lowCopy := make([]byte, len(lowerBound)) + copy(lowCopy, lowerBound) + upCopy := make([]byte, len(upperBound)) + copy(upCopy, upperBound) + m.operations = append(m.operations, BatchOperation{opcode: DeleteRange, operand1: lowCopy, operand2: upCopy}) + return nil +} + +// Get implements Transaction. +func (m *MDBXBatch) Get(key []byte) ([]byte, io.Closer, error) { + panic("unimplemented") +} + +// NewIter implements Transaction. +func (m *MDBXBatch) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { + return m.db.NewIter(lowerBound, upperBound) +} + +// Set implements Transaction. +func (m *MDBXBatch) Set(key []byte, value []byte) error { + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + valueCopy, err := compressValue(value) + if err != nil { + return err + } + m.operations = append(m.operations, BatchOperation{opcode: Set, operand1: keyCopy, operand2: valueCopy}) + return nil +} + +var _ Transaction = (*MDBXBatch)(nil) + +type BatchOperation struct { + opcode uint8 + operand1 []byte + operand2 []byte +} + +const ( + Set = iota + Delete + DeleteRange +) diff --git a/node/store/mdbx_test.go b/node/store/mdbx_test.go new file mode 100644 index 0000000..f1bac20 --- /dev/null +++ b/node/store/mdbx_test.go @@ -0,0 +1,410 @@ +package store + +import ( + "bytes" + "errors" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/cockroachdb/pebble" + "source.quilibrium.com/quilibrium/monorepo/node/config" +) + +func TestMDBXDB(t *testing.T) { + // Create temporary directory + tempDir, err := os.MkdirTemp("", "mdbx-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create DB config + dbConfig := &config.DBConfig{ + Path: tempDir, + } + + // Create DB instance + db := NewMDBXDB(dbConfig) + defer db.Close() + + // Test basic operations + t.Run("BasicOperations", func(t *testing.T) { + // Test Set and Get + key := []byte("test-key") + value := []byte("test-value") + + if err := db.Set(key, value); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + + result, closer, err := db.Get(key) + if err != nil { + t.Fatalf("Failed to get value: %v", err) + } + defer closer.Close() + + if !bytes.Equal(result, value) { + t.Errorf("Expected %v, got %v", value, result) + } + + // Test Delete + if err := db.Delete(key); err != nil { + t.Fatalf("Failed to delete key: %v", err) + } + + _, _, err = db.Get(key) + if err == nil { + t.Errorf("Expected error for deleted key") + } + }) + + // Test transactions + t.Run("Transactions", func(t *testing.T) { + // Create a non-indexed transaction + tx := db.NewTransaction() + + // Set some values in the transaction + key1 := []byte("tx-key-1") + value1 := []byte("tx-value-1") + key2 := []byte("tx-key-2") + value2 := []byte("tx-value-2") + + if err := tx.Set(key1, value1); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + if err := tx.Set(key2, value2); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + + // Values should not be visible outside the transaction yet + _, _, err := db.Get(key1) + if err == nil { + t.Errorf("Key should not be visible outside transaction before commit") + } + + // Values should be visible inside the transaction + result, closer, err := tx.Get(key1) + if err != nil { + t.Fatalf("Failed to get value from transaction: %v", err) + } + defer closer.Close() + if !bytes.Equal(result, value1) { + t.Errorf("Expected %v, got %v", value1, result) + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + t.Fatalf("Failed to commit transaction: %v", err) + } + + // Values should now be visible outside the transaction + result, closer, err = db.Get(key1) + if err != nil { + t.Fatalf("Failed to get value after commit: %v", err) + } + defer closer.Close() + if !bytes.Equal(result, value1) { + t.Errorf("Expected %v, got %v", value1, result) + } + + // Test transaction abort + tx = db.NewBatch(false) + abortKey := []byte("abort-key") + abortValue := []byte("abort-value") + + if err := tx.Set(abortKey, abortValue); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + + if err := tx.Abort(); err != nil { + t.Fatalf("Failed to abort transaction: %v", err) + } + + // Value should not be visible after abort + _, _, err = db.Get(abortKey) + if err == nil { + t.Errorf("Key should not be visible after transaction abort") + } + + // Test transaction DeleteRange + tx = db.NewTransaction() + + // Set up some keys for the range delete + for i := 1; i <= 5; i++ { + key := []byte(filepath.Join("tx-range", "key", "prefix", "key-"+string(rune('0'+i)))) + value := []byte("tx-value-" + string(rune('0'+i))) + if err := tx.Set(key, value); err != nil { + t.Fatalf("Failed to set value in transaction: %v", err) + } + } + + // Delete a range within the transaction + startKey := []byte(filepath.Join("tx-range", "key", "prefix", "key-2")) + endKey := []byte(filepath.Join("tx-range", "key", "prefix", "key-4")) + fmt.Println("---") + if err := tx.DeleteRange(startKey, endKey); err != nil { + t.Fatalf("Failed to delete range in transaction: %v", err) + } + fmt.Println("---") + + // Check that keys in the range are deleted within the transaction + for i := 2; i < 4; i++ { + key := []byte(filepath.Join("tx-range", "key", "prefix", "key-"+string(rune('0'+i)))) + _, _, err := tx.Get(key) + if err == nil { + t.Errorf("Key %s should be deleted in transaction", key) + } + } + + // Check that keys outside the range still exist within the transaction + txKey1 := []byte(filepath.Join("tx-range", "key", "prefix", "key-1")) + txResult, txCloser, txErr := tx.Get(txKey1) + if txErr != nil { + t.Errorf("Key %s should still exist in transaction: %v", txKey1, txErr) + } else { + defer txCloser.Close() + if !bytes.Equal(txResult, []byte("tx-value-1")) { + t.Errorf("Expected %v, got %v", []byte("tx-value-1"), txResult) + } + } + + // Commit the transaction + if err := tx.Commit(); err != nil { + t.Fatalf("Failed to commit transaction: %v", err) + } + + // Verify the changes are visible outside the transaction + for i := 2; i < 4; i++ { + key := []byte(filepath.Join("tx-range", "key", "prefix", "key-"+string(rune('0'+i)))) + _, _, err := db.Get(key) + if err == nil { + t.Errorf("Key %s should be deleted after commit", key) + } + } + + txKey5 := []byte(filepath.Join("tx-range", "key", "prefix", "key-5")) + txResult2, txCloser2, txErr2 := db.Get(txKey5) + if txErr2 != nil { + t.Errorf("Key %s should still exist after commit: %v", txKey5, txErr2) + } else { + defer txCloser2.Close() + if !bytes.Equal(txResult2, []byte("tx-value-5")) { + t.Errorf("Expected %v, got %v", []byte("tx-value-5"), txResult2) + } + } + }) + + // Test iterators + t.Run("Iterators", func(t *testing.T) { + // Insert some test data + testData := map[string]string{ + "iter-key-1": "iter-value-1", + "iter-key-2": "iter-value-2", + "iter-key-3": "iter-value-3", + "iter-key-4": "iter-value-4", + "iter-key-5": "iter-value-5", + } + + for k, v := range testData { + if err := db.Set([]byte(k), []byte(v)); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + } + + // Test full range iterator + iter, err := db.NewIter([]byte("iter-key-1"), []byte("iter-key-6")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + count := 0 + for iter.First(); iter.Valid(); iter.Next() { + key := string(iter.Key()) + value := string(iter.Value()) + expectedValue, ok := testData[key] + if !ok { + t.Errorf("Unexpected key: %s", key) + } else if expectedValue != value { + t.Errorf("Expected value %s for key %s, got %s", expectedValue, key, value) + } + count++ + } + + if count != len(testData) { + t.Errorf("Expected %d items, got %d", len(testData), count) + } + + // Test partial range iterator + iter, err = db.NewIter([]byte("iter-key-2"), []byte("iter-key-4")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + expectedKeys := []string{"iter-key-2", "iter-key-3"} + count = 0 + for iter.First(); iter.Valid(); iter.Next() { + key := string(iter.Key()) + if count >= len(expectedKeys) || key != expectedKeys[count] { + t.Errorf("Expected key %s, got %s", expectedKeys[count], key) + } + count++ + } + + if count != len(expectedKeys) { + t.Errorf("Expected %d items in range, got %d", len(expectedKeys), count) + } + + // Test Last and Prev + iter, err = db.NewIter([]byte("iter-key-1"), []byte("iter-key-6")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + if !iter.Last() { + t.Errorf("Last() should return true") + } + + if string(iter.Key()) != "iter-key-5" { + t.Errorf("Expected last key to be iter-key-5, got %s", string(iter.Key())) + } + + count = 0 + expectedReverseKeys := []string{"iter-key-5", "iter-key-4", "iter-key-3", "iter-key-2", "iter-key-1"} + for ; iter.Valid(); iter.Prev() { + key := string(iter.Key()) + if count >= len(expectedReverseKeys) || key != expectedReverseKeys[count] { + t.Errorf("Expected key %s, got %s", expectedReverseKeys[count], key) + } + count++ + } + + if count != len(expectedReverseKeys) { + t.Errorf("Expected %d items in reverse range, got %d", len(expectedReverseKeys), count) + } + + // Test SeekLT + iter, err = db.NewIter([]byte("iter-key-1"), []byte("iter-key-6")) + if err != nil { + t.Fatalf("Failed to create iterator: %v", err) + } + defer iter.Close() + + if !iter.SeekLT([]byte("iter-key-4")) { + t.Errorf("SeekLT() should return true") + } + + if string(iter.Key()) != "iter-key-3" { + t.Errorf("Expected key after SeekLT to be iter-key-3, got %s", string(iter.Key())) + } + + // Test SeekLT with a key that doesn't exist but is in range + if !iter.SeekLT([]byte("iter-key-3.5")) { + t.Errorf("SeekLT() should return true") + } + + if string(iter.Key()) != "iter-key-3" { + t.Errorf("Expected key after SeekLT to be iter-key-3, got %s", string(iter.Key())) + } + }) + + // Test DeleteRange + t.Run("DeleteRange", func(t *testing.T) { + // Insert some test data + for i := 1; i <= 5; i++ { + key := []byte(filepath.Join("range", "key", "prefix", "key-"+string(rune('0'+i)))) + value := []byte("value-" + string(rune('0'+i))) + if err := db.Set(key, value); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + } + + // Delete a range of keys + startKey := []byte(filepath.Join("range", "key", "prefix", "key-2")) + endKey := []byte(filepath.Join("range", "key", "prefix", "key-4")) + if err := db.DeleteRange(startKey, endKey); err != nil { + t.Fatalf("Failed to delete range: %v", err) + } + + // Check that keys in the range are deleted + for i := 2; i < 4; i++ { + key := []byte(filepath.Join("range", "key", "prefix", "key-"+string(rune('0'+i)))) + _, _, err := db.Get(key) + if err == nil { + t.Errorf("Key %s should be deleted", key) + } + } + + // Check that keys outside the range still exist + key1 := []byte(filepath.Join("range", "key", "prefix", "key-1")) + result, closer, err := db.Get(key1) + if err != nil { + t.Errorf("Key %s should still exist: %v", key1, err) + } else { + defer closer.Close() + if !bytes.Equal(result, []byte("value-1")) { + t.Errorf("Expected %v, got %v", []byte("value-1"), result) + } + } + + key5 := []byte(filepath.Join("range", "key", "prefix", "key-5")) + result, closer, err = db.Get(key5) + if err != nil { + t.Errorf("Key %s should still exist: %v", key5, err) + } else { + defer closer.Close() + if !bytes.Equal(result, []byte("value-5")) { + t.Errorf("Expected %v, got %v", []byte("value-5"), result) + } + } + }) + + t.Run("Batch", func(t *testing.T) { + // Insert some test data + for i := 1; i <= 5; i++ { + key := []byte("key-" + string(rune('0'+i))) + value := []byte("value-" + string(rune('0'+i))) + if err := db.Set(key, value); err != nil { + t.Fatalf("Failed to set value: %v", err) + } + } + + batch := db.NewBatch(false) + batch.Set([]byte("key-1"), []byte("potato")) + batch.Set([]byte("key-5"), []byte("tomato")) + batch.Delete([]byte("key-2")) + batch.DeleteRange([]byte("key-3"), []byte("key-5")) + if batch.Commit() != nil { + t.Errorf("commit failed") + t.Fail() + } + + for i := 1; i <= 5; i++ { + key := []byte("key-" + string(rune('0'+i))) + val, _, err := db.Get(key) + if err != nil && !errors.Is(err, pebble.ErrNotFound) { + t.Fatalf("Failed to get value: %v", err) + t.Fail() + } + switch i { + case 1: + if !bytes.Equal(val, []byte("potato")) { + t.Errorf("wrong key-1") + } + case 5: + if !bytes.Equal(val, []byte("tomato")) { + t.Errorf("wrong key-5") + } + default: + if err != nil && errors.Is(err, ErrNotFound) { + t.Errorf("key-%d should be deleted, but we got (%s)", i, val) + } + } + } + }) + +} diff --git a/node/store/peerstore.go b/node/store/peerstore.go index 85f0c3c..734fdd1 100644 --- a/node/store/peerstore.go +++ b/node/store/peerstore.go @@ -41,11 +41,10 @@ var _ ds.Batching = (*PeerstoreDatastore)(nil) var _ ds.Batch = (*batch)(nil) var _ Peerstore = (*PeerstoreDatastore)(nil) -func NewPeerstoreDatastore(db KVDB) (*PeerstoreDatastore, error) { - ds := PeerstoreDatastore{ +func NewPeerstoreDatastore(db KVDB) *PeerstoreDatastore { + return &PeerstoreDatastore{ db: db, } - return &ds, nil } func (d *PeerstoreDatastore) Put(