diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index caa5507..cf7a2af 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -16,7 +16,6 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "source.quilibrium.com/quilibrium/monorepo/node/config" "source.quilibrium.com/quilibrium/monorepo/node/crypto" "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" @@ -90,15 +89,9 @@ func TestHypergraphSyncServer(t *testing.T) { } } - serverKvdb := store.NewPebbleDB(&config.DBConfig{ - Path: ".testconfigserver/store", - }) - clientKvdb := store.NewPebbleDB(&config.DBConfig{ - Path: ".testconfigclient/store", - }) - controlKvdb := store.NewPebbleDB(&config.DBConfig{ - Path: ".testconfigcontrol/store", - }) + serverKvdb := store.NewInMemKVDB() + clientKvdb := store.NewInMemKVDB() + controlKvdb := store.NewInMemKVDB() logger, _ := zap.NewProduction() serverHypergraphStore := store.NewPebbleHypergraphStore(serverKvdb, logger) clientHypergraphStore := store.NewPebbleHypergraphStore(clientKvdb, logger) @@ -126,7 +119,7 @@ func TestHypergraphSyncServer(t *testing.T) { ) }) - txn, _ := serverHypergraphStore.NewOversizedBatch() + txn, _ := serverHypergraphStore.NewTransaction(false) for _, op := range operations1[:5000] { switch op.Type { case "AddVertex": @@ -155,7 +148,7 @@ func TestHypergraphSyncServer(t *testing.T) { } } - txn, _ = clientHypergraphStore.NewOversizedBatch() + txn, _ = clientHypergraphStore.NewTransaction(false) for _, op := range operations1[5000:] { switch op.Type { case "AddVertex": @@ -212,13 +205,13 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[0].Commit() crdts[1].Commit() crdts[2].Commit() - txn, _ = serverHypergraphStore.NewOversizedBatch() + txn, _ = serverHypergraphStore.NewTransaction(false) serverHypergraphStore.SaveHypergraph(txn, crdts[0]) txn.Commit() - txn, _ = clientHypergraphStore.NewOversizedBatch() + txn, _ = clientHypergraphStore.NewTransaction(false) clientHypergraphStore.SaveHypergraph(txn, crdts[1]) txn.Commit() - txn, _ = controlHypergraphStore.NewOversizedBatch() + txn, _ = controlHypergraphStore.NewTransaction(false) controlHypergraphStore.SaveHypergraph(txn, crdts[2]) txn.Commit() var err error diff --git a/node/store/hypergraph.go b/node/store/hypergraph.go index 98d0dfe..3156859 100644 --- a/node/store/hypergraph.go +++ b/node/store/hypergraph.go @@ -158,10 +158,6 @@ func (p *PebbleHypergraphStore) NewTransaction(indexed bool) ( return p.db.NewBatch(indexed), nil } -func (p *PebbleHypergraphStore) NewOversizedBatch() (Transaction, error) { - return p.db.NewOversizedBatch(), nil -} - func (p *PebbleHypergraphStore) LoadVertexTree(id []byte) ( *crypto.RawVectorCommitmentTree, error, diff --git a/node/store/inmem.go b/node/store/inmem.go index 1b45e46..54110a9 100644 --- a/node/store/inmem.go +++ b/node/store/inmem.go @@ -374,19 +374,6 @@ func (d *InMemKVDB) NewBatch(indexed bool) Transaction { } } -func (d *InMemKVDB) NewOversizedBatch() Transaction { - if !d.open { - return nil - } - - id := rand.Int() - return &InMemKVDBTransaction{ - id: id, - db: d, - changes: []InMemKVDBOperation{}, - } -} - func (d *InMemKVDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) { if !d.open { return nil, errors.New("inmem db closed") diff --git a/node/store/kvdb.go b/node/store/kvdb.go index 20142fa..61422da 100644 --- a/node/store/kvdb.go +++ b/node/store/kvdb.go @@ -9,7 +9,6 @@ type KVDB interface { Set(key, value []byte) error Delete(key []byte) error NewBatch(indexed bool) Transaction - NewOversizedBatch() Transaction NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) Compact(start, end []byte, parallelize bool) error CompactAll() error diff --git a/node/store/pebble.go b/node/store/pebble.go index 2e10225..6e497be 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -1,13 +1,7 @@ package store import ( - "fmt" "io" - "io/fs" - "os" - "path" - "path/filepath" - "time" "github.com/cockroachdb/pebble" "github.com/pkg/errors" @@ -15,8 +9,7 @@ import ( ) type PebbleDB struct { - config *config.DBConfig - db *pebble.DB + db *pebble.DB } func NewPebbleDB(config *config.DBConfig) *PebbleDB { @@ -25,7 +18,7 @@ func NewPebbleDB(config *config.DBConfig) *PebbleDB { panic(err) } - return &PebbleDB{config, db} + return &PebbleDB{db} } func (p *PebbleDB) Get(key []byte) ([]byte, io.Closer, error) { @@ -52,24 +45,6 @@ func (p *PebbleDB) NewBatch(indexed bool) Transaction { } } -func (p *PebbleDB) NewOversizedBatch() Transaction { - path := path.Join( - p.config.Path, - fmt.Sprintf("batch-%d", time.Now().UnixMilli()), - ) - - db, err := pebble.Open(path, &pebble.Options{}) - if err != nil { - panic(err) - } - - return &PebbleIngestTransaction{ - path: path, - parent: p.db, - b: db, - } -} - func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) ( Iterator, error, @@ -128,73 +103,6 @@ type Transaction interface { DeleteRange(lowerBound []byte, upperBound []byte) error } -type PebbleIngestTransaction struct { - path string - parent *pebble.DB - b *pebble.DB -} - -func (t *PebbleIngestTransaction) Get(key []byte) ([]byte, io.Closer, error) { - return t.b.Get(key) -} - -func (t *PebbleIngestTransaction) Set(key []byte, value []byte) error { - return t.b.Set(key, value, &pebble.WriteOptions{Sync: true}) -} - -func (t *PebbleIngestTransaction) Commit() error { - t.b.Close() - find := func(root, ext string) []string { - var a []string - filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error { - if e != nil { - return e - } - if filepath.Ext(d.Name()) == ext { - a = append(a, s) - } - return nil - }) - return a - } - - err := t.parent.Ingest(find(t.path, ".sst")) - if err != nil { - return errors.Wrap(err, "commit") - } - - wait, err := t.parent.AsyncFlush() - if err != nil { - return errors.Wrap(err, "commit") - } - - <-wait - return nil -} - -func (t *PebbleIngestTransaction) Delete(key []byte) error { - return t.b.Delete(key, &pebble.WriteOptions{Sync: true}) -} - -func (t *PebbleIngestTransaction) Abort() error { - t.b.Close() - return errors.Wrap(os.RemoveAll(t.path), "abort") -} - -func (t *PebbleIngestTransaction) NewIter(lowerBound []byte, upperBound []byte) ( - Iterator, - error, -) { - return nil, errors.New("unsupported") -} - -func (t *PebbleIngestTransaction) DeleteRange( - lowerBound []byte, - upperBound []byte, -) error { - return errors.New("unsupported") -} - type PebbleTransaction struct { b *pebble.Batch } @@ -241,7 +149,6 @@ func (t *PebbleTransaction) DeleteRange( } var _ Transaction = (*PebbleTransaction)(nil) -var _ Transaction = (*PebbleIngestTransaction)(nil) func rightAlign(data []byte, size int) []byte { l := len(data)