mdbx: shard table from first key byte

This commit is contained in:
Victor Shyba 2025-05-14 23:15:58 -03:00
parent 31830d0863
commit 9ad7426920
4 changed files with 48 additions and 146 deletions

View File

@ -50,44 +50,16 @@ var keyManagerSet = wire.NewSet(
wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)),
)
func provideBaseDB(dbConfig *config.DBConfig) *store.MDBXDB {
return store.NewMDBXDB(dbConfig)
}
func providePebbleClockStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleClockStore {
return store.NewPebbleClockStore(db.OpenDB("clock_store"), logger)
}
func providePebbleCoinStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleCoinStore {
return store.NewPebbleCoinStore(db.OpenDB("coin_store"), logger)
}
func providePebbleKeyStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleKeyStore {
return store.NewPebbleKeyStore(db.OpenDB("key_store"), logger)
}
func providePebbleDataProofStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleDataProofStore {
return store.NewPebbleDataProofStore(db.OpenDB("data_proof_store"), logger)
}
func providePebbleHypergraphStore(db *store.MDBXDB, logger *zap.Logger) *store.PebbleHypergraphStore {
return store.NewPebbleHypergraphStore(db.OpenDB("hypergraph_store"), logger)
}
func providePeerstoreDatastore(db *store.MDBXDB, logger *zap.Logger) *store.PeerstoreDatastore {
return store.NewPeerstoreDatastore(db.OpenDB("peerstore"))
}
var storeSet = wire.NewSet(
wire.FieldsOf(new(*config.Config), "DB"),
provideBaseDB,
store.NewMDBXDB,
wire.Bind(new(store.KVDB), new(*store.MDBXDB)),
providePebbleClockStore,
providePebbleCoinStore,
providePebbleKeyStore,
providePebbleDataProofStore,
providePebbleHypergraphStore,
providePeerstoreDatastore,
store.NewPebbleClockStore,
store.NewPebbleCoinStore,
store.NewPebbleKeyStore,
store.NewPebbleDataProofStore,
store.NewPebbleHypergraphStore,
store.NewPeerstoreDatastore,
wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)),
wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)),
wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)),

View File

@ -37,21 +37,21 @@ func NewDHTNode(configConfig *config.Config) (*DHTNode, error) {
func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) {
zapLogger := debugLogger()
dbConfig := configConfig.DB
mdbxdb := provideBaseDB(dbConfig)
pebbleDataProofStore := providePebbleDataProofStore(mdbxdb, zapLogger)
pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger)
pebbleCoinStore := providePebbleCoinStore(mdbxdb, zapLogger)
mdbxdb := store.NewMDBXDB(dbConfig)
pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger)
pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger)
keyConfig := configConfig.Key
fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger)
p2PConfig := configConfig.P2P
blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger)
frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger)
kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger)
pebbleHypergraphStore := providePebbleHypergraphStore(mdbxdb, zapLogger)
pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger)
engineConfig := configConfig.Engine
masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver)
inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger)
pebbleKeyStore := providePebbleKeyStore(mdbxdb, zapLogger)
pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger)
tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport)
masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport)
node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb)
@ -64,21 +64,21 @@ func NewDebugNode(configConfig *config.Config, selfTestReport *protobufs.SelfTes
func NewNode(configConfig *config.Config, selfTestReport *protobufs.SelfTestReport) (*Node, error) {
zapLogger := logger()
dbConfig := configConfig.DB
mdbxdb := provideBaseDB(dbConfig)
pebbleDataProofStore := providePebbleDataProofStore(mdbxdb, zapLogger)
pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger)
pebbleCoinStore := providePebbleCoinStore(mdbxdb, zapLogger)
mdbxdb := store.NewMDBXDB(dbConfig)
pebbleDataProofStore := store.NewPebbleDataProofStore(mdbxdb, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger)
pebbleCoinStore := store.NewPebbleCoinStore(mdbxdb, zapLogger)
keyConfig := configConfig.Key
fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger)
p2PConfig := configConfig.P2P
blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger)
frameProver := crypto.NewCachedWesolowskiFrameProver(zapLogger)
kzgInclusionProver := crypto.NewKZGInclusionProver(zapLogger)
pebbleHypergraphStore := providePebbleHypergraphStore(mdbxdb, zapLogger)
pebbleHypergraphStore := store.NewPebbleHypergraphStore(mdbxdb, zapLogger)
engineConfig := configConfig.Engine
masterTimeReel := time.NewMasterTimeReel(zapLogger, pebbleClockStore, engineConfig, frameProver)
inMemoryPeerInfoManager := p2p.NewInMemoryPeerInfoManager(zapLogger)
pebbleKeyStore := providePebbleKeyStore(mdbxdb, zapLogger)
pebbleKeyStore := store.NewPebbleKeyStore(mdbxdb, zapLogger)
tokenExecutionEngine := token.NewTokenExecutionEngine(zapLogger, configConfig, fileKeyManager, blossomSub, frameProver, kzgInclusionProver, pebbleClockStore, pebbleDataProofStore, pebbleHypergraphStore, pebbleCoinStore, masterTimeReel, inMemoryPeerInfoManager, pebbleKeyStore, selfTestReport)
masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub, kzgInclusionProver, frameProver, masterTimeReel, inMemoryPeerInfoManager, selfTestReport)
node, err := newNode(zapLogger, pebbleDataProofStore, pebbleClockStore, pebbleCoinStore, fileKeyManager, blossomSub, tokenExecutionEngine, masterClockConsensusEngine, mdbxdb)
@ -98,9 +98,9 @@ func NewDBConsole(configConfig *config.Config) (*DBConsole, error) {
func NewClockStore(configConfig *config.Config) (store.ClockStore, error) {
dbConfig := configConfig.DB
mdbxdb := provideBaseDB(dbConfig)
mdbxdb := store.NewMDBXDB(dbConfig)
zapLogger := logger()
pebbleClockStore := providePebbleClockStore(mdbxdb, zapLogger)
pebbleClockStore := store.NewPebbleClockStore(mdbxdb, zapLogger)
return pebbleClockStore, nil
}
@ -134,41 +134,7 @@ var debugLoggerSet = wire.NewSet(
var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)))
func provideBaseDB(dbConfig *config.DBConfig) *store.MDBXDB {
return store.NewMDBXDB(dbConfig)
}
func providePebbleClockStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleClockStore {
return store.NewPebbleClockStore(db.OpenDB("clock_store"), logger2)
}
func providePebbleCoinStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleCoinStore {
return store.NewPebbleCoinStore(db.OpenDB("coin_store"), logger2)
}
func providePebbleKeyStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleKeyStore {
return store.NewPebbleKeyStore(db.OpenDB("key_store"), logger2)
}
func providePebbleDataProofStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleDataProofStore {
return store.NewPebbleDataProofStore(db.OpenDB("data_proof_store"), logger2)
}
func providePebbleHypergraphStore(db *store.MDBXDB, logger2 *zap.Logger) *store.PebbleHypergraphStore {
return store.NewPebbleHypergraphStore(db.OpenDB("hypergraph_store"), logger2)
}
func providePeerstoreDatastore(db *store.MDBXDB, logger2 *zap.Logger) *store.PeerstoreDatastore {
return store.NewPeerstoreDatastore(db.OpenDB("peerstore"))
}
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), provideBaseDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), providePebbleClockStore,
providePebbleCoinStore,
providePebbleKeyStore,
providePebbleDataProofStore,
providePebbleHypergraphStore,
providePeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore)),
)
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewMDBXDB, wire.Bind(new(store.KVDB), new(*store.MDBXDB)), store.NewPebbleClockStore, store.NewPebbleCoinStore, store.NewPebbleKeyStore, store.NewPebbleDataProofStore, store.NewPebbleHypergraphStore, store.NewPeerstoreDatastore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), wire.Bind(new(store.CoinStore), new(*store.PebbleCoinStore)), wire.Bind(new(store.KeyStore), new(*store.PebbleKeyStore)), wire.Bind(new(store.DataProofStore), new(*store.PebbleDataProofStore)), wire.Bind(new(store.HypergraphStore), new(*store.PebbleHypergraphStore)), wire.Bind(new(store.Peerstore), new(*store.PeerstoreDatastore)))
var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewInMemoryPeerInfoManager, p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)), wire.Bind(new(p2p.PeerInfoManager), new(*p2p.InMemoryPeerInfoManager)))

View File

@ -5,6 +5,7 @@ import (
"io"
"os"
"runtime"
"strconv"
"sync/atomic"
"github.com/cockroachdb/pebble"
@ -13,8 +14,8 @@ import (
)
type MDBXDB struct {
env *mdbx.Env
dbi mdbx.DBI
env *mdbx.Env
dbis map[byte]mdbx.DBI
}
// this is ugly but I did not find a better way
@ -45,7 +46,7 @@ func NewMDBXDB(config *config.DBConfig) *MDBXDB {
}
// Configs
if err = env.SetOption(mdbx.OptMaxDB, 200); err != nil {
if err = env.SetOption(mdbx.OptMaxDB, 300); err != nil {
panic(err)
}
if err = env.SetOption(mdbx.OptMaxReaders, READERS_LIMIT); err != nil {
@ -66,11 +67,15 @@ func NewMDBXDB(config *config.DBConfig) *MDBXDB {
}
// Open the default database. Other databases are opened using OpenDB)
db := &MDBXDB{env: env}
return db.OpenDB(DEFAULT_TABLE)
db := &MDBXDB{env: env, dbis: make(map[byte]mdbx.DBI)}
db.dbis[0] = db.OpenDB(DEFAULT_TABLE)
for i := byte(1); i < 255; i++ {
db.dbis[i] = db.OpenDB(strconv.Itoa(int(i)))
}
return db
}
func (m *MDBXDB) OpenDB(name string) *MDBXDB {
func (m *MDBXDB) OpenDB(name string) mdbx.DBI {
var dbi mdbx.DBI
var err error
err = m.env.Update(func(txn *mdbx.Txn) error {
@ -83,7 +88,7 @@ func (m *MDBXDB) OpenDB(name string) *MDBXDB {
if err != nil {
panic(err)
}
return &MDBXDB{env: m.env, dbi: dbi}
return dbi
}
// KVDB interface implementation
@ -91,7 +96,7 @@ func (m *MDBXDB) Get(key []byte) ([]byte, io.Closer, error) {
var result []byte
var err error
err = m.env.View(func(txn *mdbx.Txn) error {
val, err := txn.Get(m.dbi, key)
val, err := txn.Get(m.dbis[key[0]], key)
if err != nil {
if mdbx.IsNotFound(err) {
return pebble.ErrNotFound
@ -112,7 +117,7 @@ func (m *MDBXDB) Set(key, value []byte) error {
panic("another tx is writing")
}
return m.env.Update(func(txn *mdbx.Txn) error {
return txn.Put(m.dbi, key, value, mdbx.Upsert)
return txn.Put(m.dbis[key[0]], key, value, mdbx.Upsert)
})
}
@ -121,7 +126,7 @@ func (m *MDBXDB) Delete(key []byte) error {
panic("another tx is writing")
}
return m.env.Update(func(txn *mdbx.Txn) error {
return txn.Del(m.dbi, key, nil)
return txn.Del(m.dbis[key[0]], key, nil)
})
}
@ -148,7 +153,7 @@ func (m *MDBXDB) NewTransaction() Transaction {
return &MDBXTransaction{
txn: txn,
dbi: m.dbi,
db: m,
}
}
@ -162,7 +167,7 @@ func (m *MDBXDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error)
return nil, err
}
cursor, err := txn.OpenCursor(m.dbi)
cursor, err := txn.OpenCursor(m.dbis[lowerBound[0]])
if err != nil {
txn.Abort()
runtime.UnlockOSThread()
@ -215,11 +220,11 @@ var _ KVDB = (*MDBXDB)(nil)
// Transaction implementation
type MDBXTransaction struct {
txn *mdbx.Txn
dbi mdbx.DBI
db *MDBXDB
}
func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) {
val, err := t.txn.Get(t.dbi, key)
val, err := t.txn.Get(t.db.dbis[key[0]], key)
if err != nil {
if mdbx.IsNotFound(err) {
return nil, nil, pebble.ErrNotFound
@ -235,7 +240,7 @@ func (t *MDBXTransaction) Get(key []byte) ([]byte, io.Closer, error) {
}
func (t *MDBXTransaction) Set(key []byte, value []byte) error {
return t.txn.Put(t.dbi, key, value, 0)
return t.txn.Put(t.db.dbis[key[0]], key, value, 0)
}
func (t *MDBXTransaction) Commit() error {
@ -247,7 +252,7 @@ func (t *MDBXTransaction) Commit() error {
}
func (t *MDBXTransaction) Delete(key []byte) error {
return t.txn.Del(t.dbi, key, nil)
return t.txn.Del(t.db.dbis[key[0]], key, nil)
}
func (t *MDBXTransaction) Abort() error {
@ -262,7 +267,7 @@ func (t *MDBXTransaction) NewIter(lowerBound []byte, upperBound []byte) (Iterato
}
func (t *MDBXTransaction) NewIterCustom(lowerBound []byte, upperBound []byte) (*MDBXIterator, error) {
cursor, err := t.txn.OpenCursor(t.dbi)
cursor, err := t.txn.OpenCursor(t.db.dbis[lowerBound[0]])
if err != nil {
return nil, err
}
@ -296,7 +301,7 @@ func (t *MDBXTransaction) DeleteRange(lowerBound []byte, upperBound []byte) erro
return err
}
for iter.First(); iter.Valid(); iter.Next() {
err = iter.txn.Del(t.dbi, iter.key, nil)
err = iter.txn.Del(t.db.dbis[lowerBound[0]], iter.key, nil)
if err != nil {
return err
}
@ -539,11 +544,11 @@ func (m *MDBXBatch) Commit() error {
for _, op := range m.operations {
switch op.opcode {
case Set:
err = tx.Put(m.db.dbi, op.operand1, op.operand2, mdbx.Upsert)
err = tx.Put(m.db.dbis[op.operand1[0]], op.operand1, op.operand2, mdbx.Upsert)
case Delete:
err = tx.Del(m.db.dbi, op.operand1, nil)
err = tx.Del(m.db.dbis[op.operand1[0]], op.operand1, nil)
case DeleteRange:
cursor, err := tx.OpenCursor(m.db.dbi)
cursor, err := tx.OpenCursor(m.db.dbis[op.operand1[0]])
if err != nil {
return err
}

View File

@ -407,45 +407,4 @@ func TestMDBXDB(t *testing.T) {
}
})
t.Run("OpenDB", func(t *testing.T) {
names := []string{DEFAULT_TABLE, "table1", "table2"}
var dbs []MDBXDB
for _, name := range names {
newdb := db.OpenDB(name)
if err != nil {
t.Errorf("error opening db: %v", err)
t.Fail()
}
dbs = append(dbs, *newdb)
}
for j, currentDB := range dbs {
for i := 1; i <= 5; i++ {
name := names[j]
key := []byte("key-" + string(rune('0'+i)))
value := []byte(name + string(rune('0'+i)))
if err := currentDB.Set(key, value); err != nil {
t.Fatalf("Failed to set value: %v", err)
}
}
}
// Check that values under the same key on different dbs were not overwritten
for j, currentDB := range dbs {
for i := 1; i <= 5; i++ {
name := names[j]
key := []byte("key-" + string(rune('0'+i)))
expectedValue := []byte(name + string(rune('0'+i)))
value, _, err := currentDB.Get(key)
if !bytes.Equal(value, expectedValue) {
t.Errorf("%s was overwritten by %s on db %s", expectedValue, value, name)
t.Fail()
}
if err != nil {
t.Errorf("Error getting value: %v", err)
}
}
}
})
}