mirror of
https://github.com/QuilibriumNetwork/ceremonyclient.git
synced 2026-02-21 10:27:26 +08:00
Revert "switch to db.ingest for hypergraph test to evaluate alternative path"
This reverts commit 65caed5988.
This commit is contained in:
parent
6591d82fff
commit
51eaf5a5ad
@ -16,7 +16,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/config"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/crypto"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/hypergraph/application"
|
||||
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
|
||||
@ -90,15 +89,9 @@ func TestHypergraphSyncServer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
serverKvdb := store.NewPebbleDB(&config.DBConfig{
|
||||
Path: ".testconfigserver/store",
|
||||
})
|
||||
clientKvdb := store.NewPebbleDB(&config.DBConfig{
|
||||
Path: ".testconfigclient/store",
|
||||
})
|
||||
controlKvdb := store.NewPebbleDB(&config.DBConfig{
|
||||
Path: ".testconfigcontrol/store",
|
||||
})
|
||||
serverKvdb := store.NewInMemKVDB()
|
||||
clientKvdb := store.NewInMemKVDB()
|
||||
controlKvdb := store.NewInMemKVDB()
|
||||
logger, _ := zap.NewProduction()
|
||||
serverHypergraphStore := store.NewPebbleHypergraphStore(serverKvdb, logger)
|
||||
clientHypergraphStore := store.NewPebbleHypergraphStore(clientKvdb, logger)
|
||||
@ -126,7 +119,7 @@ func TestHypergraphSyncServer(t *testing.T) {
|
||||
)
|
||||
})
|
||||
|
||||
txn, _ := serverHypergraphStore.NewOversizedBatch()
|
||||
txn, _ := serverHypergraphStore.NewTransaction(false)
|
||||
for _, op := range operations1[:5000] {
|
||||
switch op.Type {
|
||||
case "AddVertex":
|
||||
@ -155,7 +148,7 @@ func TestHypergraphSyncServer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
txn, _ = clientHypergraphStore.NewOversizedBatch()
|
||||
txn, _ = clientHypergraphStore.NewTransaction(false)
|
||||
for _, op := range operations1[5000:] {
|
||||
switch op.Type {
|
||||
case "AddVertex":
|
||||
@ -212,13 +205,13 @@ func TestHypergraphSyncServer(t *testing.T) {
|
||||
crdts[0].Commit()
|
||||
crdts[1].Commit()
|
||||
crdts[2].Commit()
|
||||
txn, _ = serverHypergraphStore.NewOversizedBatch()
|
||||
txn, _ = serverHypergraphStore.NewTransaction(false)
|
||||
serverHypergraphStore.SaveHypergraph(txn, crdts[0])
|
||||
txn.Commit()
|
||||
txn, _ = clientHypergraphStore.NewOversizedBatch()
|
||||
txn, _ = clientHypergraphStore.NewTransaction(false)
|
||||
clientHypergraphStore.SaveHypergraph(txn, crdts[1])
|
||||
txn.Commit()
|
||||
txn, _ = controlHypergraphStore.NewOversizedBatch()
|
||||
txn, _ = controlHypergraphStore.NewTransaction(false)
|
||||
controlHypergraphStore.SaveHypergraph(txn, crdts[2])
|
||||
txn.Commit()
|
||||
var err error
|
||||
|
||||
@ -158,10 +158,6 @@ func (p *PebbleHypergraphStore) NewTransaction(indexed bool) (
|
||||
return p.db.NewBatch(indexed), nil
|
||||
}
|
||||
|
||||
func (p *PebbleHypergraphStore) NewOversizedBatch() (Transaction, error) {
|
||||
return p.db.NewOversizedBatch(), nil
|
||||
}
|
||||
|
||||
func (p *PebbleHypergraphStore) LoadVertexTree(id []byte) (
|
||||
*crypto.RawVectorCommitmentTree,
|
||||
error,
|
||||
|
||||
@ -374,19 +374,6 @@ func (d *InMemKVDB) NewBatch(indexed bool) Transaction {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *InMemKVDB) NewOversizedBatch() Transaction {
|
||||
if !d.open {
|
||||
return nil
|
||||
}
|
||||
|
||||
id := rand.Int()
|
||||
return &InMemKVDBTransaction{
|
||||
id: id,
|
||||
db: d,
|
||||
changes: []InMemKVDBOperation{},
|
||||
}
|
||||
}
|
||||
|
||||
func (d *InMemKVDB) NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) {
|
||||
if !d.open {
|
||||
return nil, errors.New("inmem db closed")
|
||||
|
||||
@ -9,7 +9,6 @@ type KVDB interface {
|
||||
Set(key, value []byte) error
|
||||
Delete(key []byte) error
|
||||
NewBatch(indexed bool) Transaction
|
||||
NewOversizedBatch() Transaction
|
||||
NewIter(lowerBound []byte, upperBound []byte) (Iterator, error)
|
||||
Compact(start, end []byte, parallelize bool) error
|
||||
CompactAll() error
|
||||
|
||||
@ -1,13 +1,7 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/pkg/errors"
|
||||
@ -15,8 +9,7 @@ import (
|
||||
)
|
||||
|
||||
type PebbleDB struct {
|
||||
config *config.DBConfig
|
||||
db *pebble.DB
|
||||
db *pebble.DB
|
||||
}
|
||||
|
||||
func NewPebbleDB(config *config.DBConfig) *PebbleDB {
|
||||
@ -25,7 +18,7 @@ func NewPebbleDB(config *config.DBConfig) *PebbleDB {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &PebbleDB{config, db}
|
||||
return &PebbleDB{db}
|
||||
}
|
||||
|
||||
func (p *PebbleDB) Get(key []byte) ([]byte, io.Closer, error) {
|
||||
@ -52,24 +45,6 @@ func (p *PebbleDB) NewBatch(indexed bool) Transaction {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PebbleDB) NewOversizedBatch() Transaction {
|
||||
path := path.Join(
|
||||
p.config.Path,
|
||||
fmt.Sprintf("batch-%d", time.Now().UnixMilli()),
|
||||
)
|
||||
|
||||
db, err := pebble.Open(path, &pebble.Options{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &PebbleIngestTransaction{
|
||||
path: path,
|
||||
parent: p.db,
|
||||
b: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) (
|
||||
Iterator,
|
||||
error,
|
||||
@ -128,73 +103,6 @@ type Transaction interface {
|
||||
DeleteRange(lowerBound []byte, upperBound []byte) error
|
||||
}
|
||||
|
||||
type PebbleIngestTransaction struct {
|
||||
path string
|
||||
parent *pebble.DB
|
||||
b *pebble.DB
|
||||
}
|
||||
|
||||
func (t *PebbleIngestTransaction) Get(key []byte) ([]byte, io.Closer, error) {
|
||||
return t.b.Get(key)
|
||||
}
|
||||
|
||||
func (t *PebbleIngestTransaction) Set(key []byte, value []byte) error {
|
||||
return t.b.Set(key, value, &pebble.WriteOptions{Sync: true})
|
||||
}
|
||||
|
||||
func (t *PebbleIngestTransaction) Commit() error {
|
||||
t.b.Close()
|
||||
find := func(root, ext string) []string {
|
||||
var a []string
|
||||
filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
if filepath.Ext(d.Name()) == ext {
|
||||
a = append(a, s)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return a
|
||||
}
|
||||
|
||||
err := t.parent.Ingest(find(t.path, ".sst"))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "commit")
|
||||
}
|
||||
|
||||
wait, err := t.parent.AsyncFlush()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "commit")
|
||||
}
|
||||
|
||||
<-wait
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *PebbleIngestTransaction) Delete(key []byte) error {
|
||||
return t.b.Delete(key, &pebble.WriteOptions{Sync: true})
|
||||
}
|
||||
|
||||
func (t *PebbleIngestTransaction) Abort() error {
|
||||
t.b.Close()
|
||||
return errors.Wrap(os.RemoveAll(t.path), "abort")
|
||||
}
|
||||
|
||||
func (t *PebbleIngestTransaction) NewIter(lowerBound []byte, upperBound []byte) (
|
||||
Iterator,
|
||||
error,
|
||||
) {
|
||||
return nil, errors.New("unsupported")
|
||||
}
|
||||
|
||||
func (t *PebbleIngestTransaction) DeleteRange(
|
||||
lowerBound []byte,
|
||||
upperBound []byte,
|
||||
) error {
|
||||
return errors.New("unsupported")
|
||||
}
|
||||
|
||||
type PebbleTransaction struct {
|
||||
b *pebble.Batch
|
||||
}
|
||||
@ -241,7 +149,6 @@ func (t *PebbleTransaction) DeleteRange(
|
||||
}
|
||||
|
||||
var _ Transaction = (*PebbleTransaction)(nil)
|
||||
var _ Transaction = (*PebbleIngestTransaction)(nil)
|
||||
|
||||
func rightAlign(data []byte, size int) []byte {
|
||||
l := len(data)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user