Revert "remove batch operations for hypergraph, pebble is now prioritized for removal"

This reverts commit 352e3e8f24.
This commit is contained in:
Cassandra Heart 2025-02-18 19:07:40 -06:00
parent f904c1c633
commit 6591d82fff
No known key found for this signature in database
GPG Key ID: 6352152859385958
7 changed files with 185 additions and 25 deletions

View File

@ -565,7 +565,7 @@ func (e *TokenExecutionEngine) addBatchToHypergraph(batchKey [][]byte, batchValu
batchKey[i]...,
)
err = e.hypergraphStore.SaveVertexTree(id, vertTree)
err = e.hypergraphStore.SaveVertexTree(txn, id, vertTree)
if err != nil {
txn.Abort()
panic(err)
@ -718,6 +718,11 @@ func (e *TokenExecutionEngine) rebuildHypergraph() {
e.addBatchToHypergraph(batchKey, batchValue)
}
txn, err := e.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
e.logger.Info("committing hypergraph")
roots := e.hypergraph.Commit()
@ -727,8 +732,14 @@ func (e *TokenExecutionEngine) rebuildHypergraph() {
zap.String("root", fmt.Sprintf("%x", roots[0])),
)
err = e.hypergraphStore.SaveHypergraph(e.hypergraph)
err = e.hypergraphStore.SaveHypergraph(txn, e.hypergraph)
if err != nil {
txn.Abort()
panic(err)
}
if err = txn.Commit(); err != nil {
txn.Abort()
panic(err)
}
}
@ -946,6 +957,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
}
vertTree, commitment, err := e.hypergraphStore.CommitAndSaveVertexData(
txn,
append(append([]byte{}, application.TOKEN_ADDRESS...), address...),
compressed,
)
@ -1010,6 +1022,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
}
vertTree, _, err = e.hypergraphStore.CommitAndSaveVertexData(
txn,
vertId,
compressed,
)
@ -1403,7 +1416,10 @@ func (e *TokenExecutionEngine) ProcessFrame(
zap.String("root", fmt.Sprintf("%x", roots[0])),
)
err = e.hypergraphStore.SaveHypergraph(hg)
err = e.hypergraphStore.SaveHypergraph(
txn,
hg,
)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "process frame")

View File

@ -895,6 +895,7 @@ func CreateGenesisState(
compressed = append(compressed, d)
}
vertTree, commitment, err := hypergraphStore.CommitAndSaveVertexData(
txn,
append(append([]byte{}, application.TOKEN_ADDRESS...), address...),
compressed,
)
@ -934,7 +935,7 @@ func CreateGenesisState(
}
intrinsicFilter := p2p.GetBloomFilter(application.TOKEN_ADDRESS, 256, 3)
err = hypergraphStore.SaveHypergraph(hg)
err = hypergraphStore.SaveHypergraph(txn, hg)
if err != nil {
txn.Abort()
panic(err)
@ -1077,6 +1078,7 @@ func CreateGenesisState(
compressed = append(compressed, d)
}
vertTree, commitment, err := hypergraphStore.CommitAndSaveVertexData(
txn,
append(append([]byte{}, application.TOKEN_ADDRESS...), address...),
compressed,
)
@ -1098,7 +1100,7 @@ func CreateGenesisState(
}
}
intrinsicFilter := p2p.GetBloomFilter(application.TOKEN_ADDRESS, 256, 3)
err = hypergraphStore.SaveHypergraph(hg)
err = hypergraphStore.SaveHypergraph(txn, hg)
if err != nil {
txn.Abort()
panic(err)

View File

@ -520,16 +520,27 @@ func syncTreeBidirectionallyServer(
),
)
if len(remoteUpdate.UnderlyingData) != 0 {
txn, err := localHypergraphStore.NewTransaction(false)
if err != nil {
return err
}
tree := &crypto.RawVectorCommitmentTree{}
var b bytes.Buffer
b.Write(remoteUpdate.UnderlyingData)
dec := gob.NewDecoder(&b)
if err := dec.Decode(tree); err != nil {
txn.Abort()
return err
}
err = localHypergraphStore.SaveVertexTree(remoteUpdate.Key, tree)
err = localHypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree)
if err != nil {
txn.Abort()
return err
}
if err = txn.Commit(); err != nil {
txn.Abort()
return err
}
}
@ -792,15 +803,29 @@ func SyncTreeBidirectionally(
remoteUpdate := payload.LeafData
size := new(big.Int).SetBytes(remoteUpdate.Size)
if len(remoteUpdate.UnderlyingData) != 0 {
txn, err := hypergraphStore.NewTransaction(false)
if err != nil {
return err
}
tree := &crypto.RawVectorCommitmentTree{}
var b bytes.Buffer
b.Write(remoteUpdate.UnderlyingData)
dec := gob.NewDecoder(&b)
if err := dec.Decode(tree); err != nil {
txn.Abort()
return err
}
err = hypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree)
if err != nil {
txn.Abort()
return err
}
if err = txn.Commit(); err != nil {
txn.Abort()
return err
}
err = hypergraphStore.SaveVertexTree(remoteUpdate.Key, tree)
}
localTree.Insert(

View File

@ -126,11 +126,12 @@ func TestHypergraphSyncServer(t *testing.T) {
)
})
txn, _ := serverHypergraphStore.NewOversizedBatch()
for _, op := range operations1[:5000] {
switch op.Type {
case "AddVertex":
id := op.Vertex.GetID()
serverHypergraphStore.SaveVertexTree(id[:], dataTree)
serverHypergraphStore.SaveVertexTree(txn, id[:], dataTree)
crdts[0].AddVertex(op.Vertex)
case "RemoveVertex":
crdts[0].RemoveVertex(op.Vertex)
@ -140,6 +141,7 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[0].RemoveHyperedge(op.Hyperedge)
}
}
txn.Commit()
for _, op := range operations2[:5000] {
switch op.Type {
case "AddVertex":
@ -153,11 +155,12 @@ func TestHypergraphSyncServer(t *testing.T) {
}
}
txn, _ = clientHypergraphStore.NewOversizedBatch()
for _, op := range operations1[5000:] {
switch op.Type {
case "AddVertex":
id := op.Vertex.GetID()
clientHypergraphStore.SaveVertexTree(id[:], dataTree)
clientHypergraphStore.SaveVertexTree(txn, id[:], dataTree)
crdts[1].AddVertex(op.Vertex)
case "RemoveVertex":
crdts[1].RemoveVertex(op.Vertex)
@ -167,6 +170,7 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[1].RemoveHyperedge(op.Hyperedge)
}
}
txn.Commit()
for _, op := range operations2[5000:] {
switch op.Type {
case "AddVertex":
@ -208,9 +212,16 @@ func TestHypergraphSyncServer(t *testing.T) {
crdts[0].Commit()
crdts[1].Commit()
crdts[2].Commit()
serverHypergraphStore.SaveHypergraph(crdts[0])
clientHypergraphStore.SaveHypergraph(crdts[1])
controlHypergraphStore.SaveHypergraph(crdts[2])
txn, _ = serverHypergraphStore.NewOversizedBatch()
serverHypergraphStore.SaveHypergraph(txn, crdts[0])
txn.Commit()
txn, _ = clientHypergraphStore.NewOversizedBatch()
clientHypergraphStore.SaveHypergraph(txn, crdts[1])
txn.Commit()
txn, _ = controlHypergraphStore.NewOversizedBatch()
controlHypergraphStore.SaveHypergraph(txn, crdts[2])
txn.Commit()
var err error
eval0, err := serverHypergraphStore.LoadHypergraph()
assert.NoError(t, err)
eval1, err := clientHypergraphStore.LoadHypergraph()

View File

@ -21,10 +21,12 @@ type HypergraphStore interface {
)
LoadVertexData(id []byte) ([]application.Encrypted, error)
SaveVertexTree(
txn Transaction,
id []byte,
vertTree *crypto.RawVectorCommitmentTree,
) error
CommitAndSaveVertexData(
txn Transaction,
id []byte,
data []application.Encrypted,
) (*crypto.RawVectorCommitmentTree, []byte, error)
@ -33,11 +35,13 @@ type HypergraphStore interface {
error,
)
SaveHypergraph(
txn Transaction,
hg *application.Hypergraph,
) error
GetBranchNode(id NodeID) (*StoredBranchNode, error)
GetLeafNode(id NodeID) (*StoredLeafNode, error)
BatchWrite(
txn Transaction,
branches map[NodeID]*StoredBranchNode,
leaves map[NodeID]*StoredLeafNode,
deletions map[NodeID]struct{},
@ -154,6 +158,10 @@ func (p *PebbleHypergraphStore) NewTransaction(indexed bool) (
return p.db.NewBatch(indexed), nil
}
func (p *PebbleHypergraphStore) NewOversizedBatch() (Transaction, error) {
return p.db.NewOversizedBatch(), nil
}
func (p *PebbleHypergraphStore) LoadVertexTree(id []byte) (
*crypto.RawVectorCommitmentTree,
error,
@ -203,6 +211,7 @@ func (p *PebbleHypergraphStore) LoadVertexData(id []byte) (
}
func (p *PebbleHypergraphStore) SaveVertexTree(
txn Transaction,
id []byte,
vertTree *crypto.RawVectorCommitmentTree,
) error {
@ -213,12 +222,13 @@ func (p *PebbleHypergraphStore) SaveVertexTree(
}
return errors.Wrap(
p.db.Set(hypergraphVertexDataKey(id), buf.Bytes()),
txn.Set(hypergraphVertexDataKey(id), buf.Bytes()),
"save vertex tree",
)
}
func (p *PebbleHypergraphStore) CommitAndSaveVertexData(
txn Transaction,
id []byte,
data []application.Encrypted,
) (*crypto.RawVectorCommitmentTree, []byte, error) {
@ -232,7 +242,7 @@ func (p *PebbleHypergraphStore) CommitAndSaveVertexData(
}
return dataTree, commit, errors.Wrap(
p.db.Set(hypergraphVertexDataKey(id), buf.Bytes()),
txn.Set(hypergraphVertexDataKey(id), buf.Bytes()),
"commit and save vertex data",
)
}
@ -417,13 +427,14 @@ func (p *PebbleHypergraphStore) LoadHypergraph() (
}
func (p *PebbleHypergraphStore) SaveHypergraph(
txn Transaction,
hg *application.Hypergraph,
) error {
hg.Commit()
for _, vertexAdds := range hg.GetVertexAdds() {
if vertexAdds.IsDirty() {
err := vertexAdds.GetTree().(*PersistentVectorTree).WriteBatch()
err := vertexAdds.GetTree().(*PersistentVectorTree).WriteBatch(txn)
if err != nil {
return errors.Wrap(err, "save hypergraph")
}
@ -432,7 +443,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
for _, vertexRemoves := range hg.GetVertexRemoves() {
if vertexRemoves.IsDirty() {
err := vertexRemoves.GetTree().(*PersistentVectorTree).WriteBatch()
err := vertexRemoves.GetTree().(*PersistentVectorTree).WriteBatch(txn)
if err != nil {
return errors.Wrap(err, "save hypergraph")
}
@ -441,7 +452,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
for _, hyperedgeAdds := range hg.GetHyperedgeAdds() {
if hyperedgeAdds.IsDirty() {
err := hyperedgeAdds.GetTree().(*PersistentVectorTree).WriteBatch()
err := hyperedgeAdds.GetTree().(*PersistentVectorTree).WriteBatch(txn)
if err != nil {
return errors.Wrap(err, "save hypergraph")
}
@ -450,7 +461,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph(
for _, hyperedgeRemoves := range hg.GetHyperedgeRemoves() {
if hyperedgeRemoves.IsDirty() {
err := hyperedgeRemoves.GetTree().(*PersistentVectorTree).WriteBatch()
err := hyperedgeRemoves.GetTree().(*PersistentVectorTree).WriteBatch(txn)
if err != nil {
return errors.Wrap(err, "save hypergraph")
}
@ -505,12 +516,13 @@ func (p *PebbleHypergraphStore) GetLeafNode(id NodeID) (
}
func (p *PebbleHypergraphStore) BatchWrite(
txn Transaction,
branches map[NodeID]*StoredBranchNode,
leaves map[NodeID]*StoredLeafNode,
deletions map[NodeID]struct{},
) error {
for id := range deletions {
if err := p.db.Delete([]byte(id)); err != nil {
if err := txn.Delete([]byte(id)); err != nil {
return errors.Wrap(err, "batch write")
}
}
@ -522,7 +534,7 @@ func (p *PebbleHypergraphStore) BatchWrite(
return errors.Wrap(err, "batch write")
}
if err := p.db.Set([]byte(id), buf.Bytes()); err != nil {
if err := txn.Set([]byte(id), buf.Bytes()); err != nil {
return errors.Wrap(err, "batch write")
}
}
@ -534,7 +546,7 @@ func (p *PebbleHypergraphStore) BatchWrite(
return errors.Wrap(err, "batch write")
}
if err := p.db.Set([]byte(id), buf.Bytes()); err != nil {
if err := txn.Set([]byte(id), buf.Bytes()); err != nil {
return errors.Wrap(err, "batch write")
}
}
@ -1205,8 +1217,8 @@ func (t *PersistentVectorTree) Commit(recalculate bool) []byte {
return t.tree.Commit(recalculate)
}
func (t *PersistentVectorTree) WriteBatch() error {
err := t.store.BatchWrite(t.addedBranches, t.addedLeaves, t.deletions)
func (t *PersistentVectorTree) WriteBatch(txn Transaction) error {
err := t.store.BatchWrite(txn, t.addedBranches, t.addedLeaves, t.deletions)
if err != nil {
return errors.Wrap(err, "write batch")
}

View File

@ -9,6 +9,7 @@ type KVDB interface {
Set(key, value []byte) error
Delete(key []byte) error
NewBatch(indexed bool) Transaction
NewOversizedBatch() Transaction
NewIter(lowerBound []byte, upperBound []byte) (Iterator, error)
Compact(start, end []byte, parallelize bool) error
CompactAll() error

View File

@ -1,7 +1,13 @@
package store
import (
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"time"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
@ -9,7 +15,8 @@ import (
)
type PebbleDB struct {
db *pebble.DB
config *config.DBConfig
db *pebble.DB
}
func NewPebbleDB(config *config.DBConfig) *PebbleDB {
@ -18,7 +25,7 @@ func NewPebbleDB(config *config.DBConfig) *PebbleDB {
panic(err)
}
return &PebbleDB{db}
return &PebbleDB{config, db}
}
func (p *PebbleDB) Get(key []byte) ([]byte, io.Closer, error) {
@ -45,6 +52,24 @@ func (p *PebbleDB) NewBatch(indexed bool) Transaction {
}
}
func (p *PebbleDB) NewOversizedBatch() Transaction {
path := path.Join(
p.config.Path,
fmt.Sprintf("batch-%d", time.Now().UnixMilli()),
)
db, err := pebble.Open(path, &pebble.Options{})
if err != nil {
panic(err)
}
return &PebbleIngestTransaction{
path: path,
parent: p.db,
b: db,
}
}
func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) (
Iterator,
error,
@ -103,6 +128,73 @@ type Transaction interface {
DeleteRange(lowerBound []byte, upperBound []byte) error
}
type PebbleIngestTransaction struct {
path string
parent *pebble.DB
b *pebble.DB
}
func (t *PebbleIngestTransaction) Get(key []byte) ([]byte, io.Closer, error) {
return t.b.Get(key)
}
func (t *PebbleIngestTransaction) Set(key []byte, value []byte) error {
return t.b.Set(key, value, &pebble.WriteOptions{Sync: true})
}
func (t *PebbleIngestTransaction) Commit() error {
t.b.Close()
find := func(root, ext string) []string {
var a []string
filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
if e != nil {
return e
}
if filepath.Ext(d.Name()) == ext {
a = append(a, s)
}
return nil
})
return a
}
err := t.parent.Ingest(find(t.path, ".sst"))
if err != nil {
return errors.Wrap(err, "commit")
}
wait, err := t.parent.AsyncFlush()
if err != nil {
return errors.Wrap(err, "commit")
}
<-wait
return nil
}
func (t *PebbleIngestTransaction) Delete(key []byte) error {
return t.b.Delete(key, &pebble.WriteOptions{Sync: true})
}
func (t *PebbleIngestTransaction) Abort() error {
t.b.Close()
return errors.Wrap(os.RemoveAll(t.path), "abort")
}
func (t *PebbleIngestTransaction) NewIter(lowerBound []byte, upperBound []byte) (
Iterator,
error,
) {
return nil, errors.New("unsupported")
}
func (t *PebbleIngestTransaction) DeleteRange(
lowerBound []byte,
upperBound []byte,
) error {
return errors.New("unsupported")
}
type PebbleTransaction struct {
b *pebble.Batch
}
@ -149,6 +241,7 @@ func (t *PebbleTransaction) DeleteRange(
}
var _ Transaction = (*PebbleTransaction)(nil)
var _ Transaction = (*PebbleIngestTransaction)(nil)
func rightAlign(data []byte, size int) []byte {
l := len(data)