diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index aadf3b4..97c7ed9 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -565,7 +565,7 @@ func (e *TokenExecutionEngine) addBatchToHypergraph(batchKey [][]byte, batchValu batchKey[i]..., ) - err = e.hypergraphStore.SaveVertexTree(id, vertTree) + err = e.hypergraphStore.SaveVertexTree(txn, id, vertTree) if err != nil { txn.Abort() panic(err) @@ -718,6 +718,11 @@ func (e *TokenExecutionEngine) rebuildHypergraph() { e.addBatchToHypergraph(batchKey, batchValue) } + txn, err := e.clockStore.NewTransaction(false) + if err != nil { + panic(err) + } + e.logger.Info("committing hypergraph") roots := e.hypergraph.Commit() @@ -727,8 +732,14 @@ func (e *TokenExecutionEngine) rebuildHypergraph() { zap.String("root", fmt.Sprintf("%x", roots[0])), ) - err = e.hypergraphStore.SaveHypergraph(e.hypergraph) + err = e.hypergraphStore.SaveHypergraph(txn, e.hypergraph) if err != nil { + txn.Abort() + panic(err) + } + + if err = txn.Commit(); err != nil { + txn.Abort() panic(err) } } @@ -946,6 +957,7 @@ func (e *TokenExecutionEngine) ProcessFrame( } vertTree, commitment, err := e.hypergraphStore.CommitAndSaveVertexData( + txn, append(append([]byte{}, application.TOKEN_ADDRESS...), address...), compressed, ) @@ -1010,6 +1022,7 @@ func (e *TokenExecutionEngine) ProcessFrame( } vertTree, _, err = e.hypergraphStore.CommitAndSaveVertexData( + txn, vertId, compressed, ) @@ -1403,7 +1416,10 @@ func (e *TokenExecutionEngine) ProcessFrame( zap.String("root", fmt.Sprintf("%x", roots[0])), ) - err = e.hypergraphStore.SaveHypergraph(hg) + err = e.hypergraphStore.SaveHypergraph( + txn, + hg, + ) if err != nil { txn.Abort() return nil, errors.Wrap(err, "process frame") diff --git a/node/execution/intrinsics/token/token_genesis.go b/node/execution/intrinsics/token/token_genesis.go index 2da45f0..0e24cab 100644 --- a/node/execution/intrinsics/token/token_genesis.go +++ b/node/execution/intrinsics/token/token_genesis.go @@ -895,6 +895,7 @@ func CreateGenesisState( compressed = append(compressed, d) } vertTree, commitment, err := hypergraphStore.CommitAndSaveVertexData( + txn, append(append([]byte{}, application.TOKEN_ADDRESS...), address...), compressed, ) @@ -934,7 +935,7 @@ func CreateGenesisState( } intrinsicFilter := p2p.GetBloomFilter(application.TOKEN_ADDRESS, 256, 3) - err = hypergraphStore.SaveHypergraph(hg) + err = hypergraphStore.SaveHypergraph(txn, hg) if err != nil { txn.Abort() panic(err) @@ -1077,6 +1078,7 @@ func CreateGenesisState( compressed = append(compressed, d) } vertTree, commitment, err := hypergraphStore.CommitAndSaveVertexData( + txn, append(append([]byte{}, application.TOKEN_ADDRESS...), address...), compressed, ) @@ -1098,7 +1100,7 @@ func CreateGenesisState( } } intrinsicFilter := p2p.GetBloomFilter(application.TOKEN_ADDRESS, 256, 3) - err = hypergraphStore.SaveHypergraph(hg) + err = hypergraphStore.SaveHypergraph(txn, hg) if err != nil { txn.Abort() panic(err) diff --git a/node/rpc/hypergraph_sync_rpc_server.go b/node/rpc/hypergraph_sync_rpc_server.go index d8b7917..565dbe9 100644 --- a/node/rpc/hypergraph_sync_rpc_server.go +++ b/node/rpc/hypergraph_sync_rpc_server.go @@ -520,16 +520,27 @@ func syncTreeBidirectionallyServer( ), ) if len(remoteUpdate.UnderlyingData) != 0 { + txn, err := localHypergraphStore.NewTransaction(false) + if err != nil { + return err + } tree := &crypto.RawVectorCommitmentTree{} var b bytes.Buffer b.Write(remoteUpdate.UnderlyingData) dec := gob.NewDecoder(&b) if err := dec.Decode(tree); err != nil { + txn.Abort() return err } - err = localHypergraphStore.SaveVertexTree(remoteUpdate.Key, tree) + err = localHypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree) if err != nil { + txn.Abort() + return err + } + + if err = txn.Commit(); err != nil { + txn.Abort() return err } } @@ -792,15 +803,29 @@ func SyncTreeBidirectionally( remoteUpdate := payload.LeafData size := new(big.Int).SetBytes(remoteUpdate.Size) if len(remoteUpdate.UnderlyingData) != 0 { + txn, err := hypergraphStore.NewTransaction(false) + if err != nil { + return err + } tree := &crypto.RawVectorCommitmentTree{} var b bytes.Buffer b.Write(remoteUpdate.UnderlyingData) dec := gob.NewDecoder(&b) if err := dec.Decode(tree); err != nil { + txn.Abort() + return err + } + err = hypergraphStore.SaveVertexTree(txn, remoteUpdate.Key, tree) + if err != nil { + txn.Abort() + return err + } + + if err = txn.Commit(); err != nil { + txn.Abort() return err } - err = hypergraphStore.SaveVertexTree(remoteUpdate.Key, tree) } localTree.Insert( diff --git a/node/rpc/hypergraph_sync_rpc_server_test.go b/node/rpc/hypergraph_sync_rpc_server_test.go index 8c95dc5..caa5507 100644 --- a/node/rpc/hypergraph_sync_rpc_server_test.go +++ b/node/rpc/hypergraph_sync_rpc_server_test.go @@ -126,11 +126,12 @@ func TestHypergraphSyncServer(t *testing.T) { ) }) + txn, _ := serverHypergraphStore.NewOversizedBatch() for _, op := range operations1[:5000] { switch op.Type { case "AddVertex": id := op.Vertex.GetID() - serverHypergraphStore.SaveVertexTree(id[:], dataTree) + serverHypergraphStore.SaveVertexTree(txn, id[:], dataTree) crdts[0].AddVertex(op.Vertex) case "RemoveVertex": crdts[0].RemoveVertex(op.Vertex) @@ -140,6 +141,7 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[0].RemoveHyperedge(op.Hyperedge) } } + txn.Commit() for _, op := range operations2[:5000] { switch op.Type { case "AddVertex": @@ -153,11 +155,12 @@ func TestHypergraphSyncServer(t *testing.T) { } } + txn, _ = clientHypergraphStore.NewOversizedBatch() for _, op := range operations1[5000:] { switch op.Type { case "AddVertex": id := op.Vertex.GetID() - clientHypergraphStore.SaveVertexTree(id[:], dataTree) + clientHypergraphStore.SaveVertexTree(txn, id[:], dataTree) crdts[1].AddVertex(op.Vertex) case "RemoveVertex": crdts[1].RemoveVertex(op.Vertex) @@ -167,6 +170,7 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[1].RemoveHyperedge(op.Hyperedge) } } + txn.Commit() for _, op := range operations2[5000:] { switch op.Type { case "AddVertex": @@ -208,9 +212,16 @@ func TestHypergraphSyncServer(t *testing.T) { crdts[0].Commit() crdts[1].Commit() crdts[2].Commit() - serverHypergraphStore.SaveHypergraph(crdts[0]) - clientHypergraphStore.SaveHypergraph(crdts[1]) - controlHypergraphStore.SaveHypergraph(crdts[2]) + txn, _ = serverHypergraphStore.NewOversizedBatch() + serverHypergraphStore.SaveHypergraph(txn, crdts[0]) + txn.Commit() + txn, _ = clientHypergraphStore.NewOversizedBatch() + clientHypergraphStore.SaveHypergraph(txn, crdts[1]) + txn.Commit() + txn, _ = controlHypergraphStore.NewOversizedBatch() + controlHypergraphStore.SaveHypergraph(txn, crdts[2]) + txn.Commit() + var err error eval0, err := serverHypergraphStore.LoadHypergraph() assert.NoError(t, err) eval1, err := clientHypergraphStore.LoadHypergraph() diff --git a/node/store/hypergraph.go b/node/store/hypergraph.go index 407e687..98d0dfe 100644 --- a/node/store/hypergraph.go +++ b/node/store/hypergraph.go @@ -21,10 +21,12 @@ type HypergraphStore interface { ) LoadVertexData(id []byte) ([]application.Encrypted, error) SaveVertexTree( + txn Transaction, id []byte, vertTree *crypto.RawVectorCommitmentTree, ) error CommitAndSaveVertexData( + txn Transaction, id []byte, data []application.Encrypted, ) (*crypto.RawVectorCommitmentTree, []byte, error) @@ -33,11 +35,13 @@ type HypergraphStore interface { error, ) SaveHypergraph( + txn Transaction, hg *application.Hypergraph, ) error GetBranchNode(id NodeID) (*StoredBranchNode, error) GetLeafNode(id NodeID) (*StoredLeafNode, error) BatchWrite( + txn Transaction, branches map[NodeID]*StoredBranchNode, leaves map[NodeID]*StoredLeafNode, deletions map[NodeID]struct{}, @@ -154,6 +158,10 @@ func (p *PebbleHypergraphStore) NewTransaction(indexed bool) ( return p.db.NewBatch(indexed), nil } +func (p *PebbleHypergraphStore) NewOversizedBatch() (Transaction, error) { + return p.db.NewOversizedBatch(), nil +} + func (p *PebbleHypergraphStore) LoadVertexTree(id []byte) ( *crypto.RawVectorCommitmentTree, error, @@ -203,6 +211,7 @@ func (p *PebbleHypergraphStore) LoadVertexData(id []byte) ( } func (p *PebbleHypergraphStore) SaveVertexTree( + txn Transaction, id []byte, vertTree *crypto.RawVectorCommitmentTree, ) error { @@ -213,12 +222,13 @@ func (p *PebbleHypergraphStore) SaveVertexTree( } return errors.Wrap( - p.db.Set(hypergraphVertexDataKey(id), buf.Bytes()), + txn.Set(hypergraphVertexDataKey(id), buf.Bytes()), "save vertex tree", ) } func (p *PebbleHypergraphStore) CommitAndSaveVertexData( + txn Transaction, id []byte, data []application.Encrypted, ) (*crypto.RawVectorCommitmentTree, []byte, error) { @@ -232,7 +242,7 @@ func (p *PebbleHypergraphStore) CommitAndSaveVertexData( } return dataTree, commit, errors.Wrap( - p.db.Set(hypergraphVertexDataKey(id), buf.Bytes()), + txn.Set(hypergraphVertexDataKey(id), buf.Bytes()), "commit and save vertex data", ) } @@ -417,13 +427,14 @@ func (p *PebbleHypergraphStore) LoadHypergraph() ( } func (p *PebbleHypergraphStore) SaveHypergraph( + txn Transaction, hg *application.Hypergraph, ) error { hg.Commit() for _, vertexAdds := range hg.GetVertexAdds() { if vertexAdds.IsDirty() { - err := vertexAdds.GetTree().(*PersistentVectorTree).WriteBatch() + err := vertexAdds.GetTree().(*PersistentVectorTree).WriteBatch(txn) if err != nil { return errors.Wrap(err, "save hypergraph") } @@ -432,7 +443,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph( for _, vertexRemoves := range hg.GetVertexRemoves() { if vertexRemoves.IsDirty() { - err := vertexRemoves.GetTree().(*PersistentVectorTree).WriteBatch() + err := vertexRemoves.GetTree().(*PersistentVectorTree).WriteBatch(txn) if err != nil { return errors.Wrap(err, "save hypergraph") } @@ -441,7 +452,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph( for _, hyperedgeAdds := range hg.GetHyperedgeAdds() { if hyperedgeAdds.IsDirty() { - err := hyperedgeAdds.GetTree().(*PersistentVectorTree).WriteBatch() + err := hyperedgeAdds.GetTree().(*PersistentVectorTree).WriteBatch(txn) if err != nil { return errors.Wrap(err, "save hypergraph") } @@ -450,7 +461,7 @@ func (p *PebbleHypergraphStore) SaveHypergraph( for _, hyperedgeRemoves := range hg.GetHyperedgeRemoves() { if hyperedgeRemoves.IsDirty() { - err := hyperedgeRemoves.GetTree().(*PersistentVectorTree).WriteBatch() + err := hyperedgeRemoves.GetTree().(*PersistentVectorTree).WriteBatch(txn) if err != nil { return errors.Wrap(err, "save hypergraph") } @@ -505,12 +516,13 @@ func (p *PebbleHypergraphStore) GetLeafNode(id NodeID) ( } func (p *PebbleHypergraphStore) BatchWrite( + txn Transaction, branches map[NodeID]*StoredBranchNode, leaves map[NodeID]*StoredLeafNode, deletions map[NodeID]struct{}, ) error { for id := range deletions { - if err := p.db.Delete([]byte(id)); err != nil { + if err := txn.Delete([]byte(id)); err != nil { return errors.Wrap(err, "batch write") } } @@ -522,7 +534,7 @@ func (p *PebbleHypergraphStore) BatchWrite( return errors.Wrap(err, "batch write") } - if err := p.db.Set([]byte(id), buf.Bytes()); err != nil { + if err := txn.Set([]byte(id), buf.Bytes()); err != nil { return errors.Wrap(err, "batch write") } } @@ -534,7 +546,7 @@ func (p *PebbleHypergraphStore) BatchWrite( return errors.Wrap(err, "batch write") } - if err := p.db.Set([]byte(id), buf.Bytes()); err != nil { + if err := txn.Set([]byte(id), buf.Bytes()); err != nil { return errors.Wrap(err, "batch write") } } @@ -1205,8 +1217,8 @@ func (t *PersistentVectorTree) Commit(recalculate bool) []byte { return t.tree.Commit(recalculate) } -func (t *PersistentVectorTree) WriteBatch() error { - err := t.store.BatchWrite(t.addedBranches, t.addedLeaves, t.deletions) +func (t *PersistentVectorTree) WriteBatch(txn Transaction) error { + err := t.store.BatchWrite(txn, t.addedBranches, t.addedLeaves, t.deletions) if err != nil { return errors.Wrap(err, "write batch") } diff --git a/node/store/kvdb.go b/node/store/kvdb.go index 61422da..20142fa 100644 --- a/node/store/kvdb.go +++ b/node/store/kvdb.go @@ -9,6 +9,7 @@ type KVDB interface { Set(key, value []byte) error Delete(key []byte) error NewBatch(indexed bool) Transaction + NewOversizedBatch() Transaction NewIter(lowerBound []byte, upperBound []byte) (Iterator, error) Compact(start, end []byte, parallelize bool) error CompactAll() error diff --git a/node/store/pebble.go b/node/store/pebble.go index 6e497be..2e10225 100644 --- a/node/store/pebble.go +++ b/node/store/pebble.go @@ -1,7 +1,13 @@ package store import ( + "fmt" "io" + "io/fs" + "os" + "path" + "path/filepath" + "time" "github.com/cockroachdb/pebble" "github.com/pkg/errors" @@ -9,7 +15,8 @@ import ( ) type PebbleDB struct { - db *pebble.DB + config *config.DBConfig + db *pebble.DB } func NewPebbleDB(config *config.DBConfig) *PebbleDB { @@ -18,7 +25,7 @@ func NewPebbleDB(config *config.DBConfig) *PebbleDB { panic(err) } - return &PebbleDB{db} + return &PebbleDB{config, db} } func (p *PebbleDB) Get(key []byte) ([]byte, io.Closer, error) { @@ -45,6 +52,24 @@ func (p *PebbleDB) NewBatch(indexed bool) Transaction { } } +func (p *PebbleDB) NewOversizedBatch() Transaction { + path := path.Join( + p.config.Path, + fmt.Sprintf("batch-%d", time.Now().UnixMilli()), + ) + + db, err := pebble.Open(path, &pebble.Options{}) + if err != nil { + panic(err) + } + + return &PebbleIngestTransaction{ + path: path, + parent: p.db, + b: db, + } +} + func (p *PebbleDB) NewIter(lowerBound []byte, upperBound []byte) ( Iterator, error, @@ -103,6 +128,73 @@ type Transaction interface { DeleteRange(lowerBound []byte, upperBound []byte) error } +type PebbleIngestTransaction struct { + path string + parent *pebble.DB + b *pebble.DB +} + +func (t *PebbleIngestTransaction) Get(key []byte) ([]byte, io.Closer, error) { + return t.b.Get(key) +} + +func (t *PebbleIngestTransaction) Set(key []byte, value []byte) error { + return t.b.Set(key, value, &pebble.WriteOptions{Sync: true}) +} + +func (t *PebbleIngestTransaction) Commit() error { + t.b.Close() + find := func(root, ext string) []string { + var a []string + filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error { + if e != nil { + return e + } + if filepath.Ext(d.Name()) == ext { + a = append(a, s) + } + return nil + }) + return a + } + + err := t.parent.Ingest(find(t.path, ".sst")) + if err != nil { + return errors.Wrap(err, "commit") + } + + wait, err := t.parent.AsyncFlush() + if err != nil { + return errors.Wrap(err, "commit") + } + + <-wait + return nil +} + +func (t *PebbleIngestTransaction) Delete(key []byte) error { + return t.b.Delete(key, &pebble.WriteOptions{Sync: true}) +} + +func (t *PebbleIngestTransaction) Abort() error { + t.b.Close() + return errors.Wrap(os.RemoveAll(t.path), "abort") +} + +func (t *PebbleIngestTransaction) NewIter(lowerBound []byte, upperBound []byte) ( + Iterator, + error, +) { + return nil, errors.New("unsupported") +} + +func (t *PebbleIngestTransaction) DeleteRange( + lowerBound []byte, + upperBound []byte, +) error { + return errors.New("unsupported") +} + type PebbleTransaction struct { b *pebble.Batch } @@ -149,6 +241,7 @@ func (t *PebbleTransaction) DeleteRange( } var _ Transaction = (*PebbleTransaction)(nil) +var _ Transaction = (*PebbleIngestTransaction)(nil) func rightAlign(data []byte, size int) []byte { l := len(data)