allow choice of indexed or non-indexed batches

This commit is contained in:
Cassandra Heart 2024-11-17 07:58:07 -06:00
parent ceba9ff200
commit 29a129d543
No known key found for this signature in database
GPG Key ID: 6352152859385958
17 changed files with 53 additions and 46 deletions

View File

@ -428,7 +428,7 @@ func (e *DataClockConsensusEngine) handleMint(
highestIncrement = previousIncrement
}
txn, err := e.coinStore.NewTransaction()
txn, err := e.coinStore.NewTransaction(false)
if err != nil {
return nil, errors.Wrap(err, "handle mint")
}

View File

@ -661,7 +661,7 @@ func TestHandlePreMidnightMint(t *testing.T) {
assert.Len(t, success.Requests, 1)
assert.Len(t, fail.Requests, 1)
txn, _ := app.CoinStore.NewTransaction()
txn, _ := app.CoinStore.NewTransaction(false)
for i, o := range app.TokenOutputs.Outputs {
switch e := o.Output.(type) {
case *protobufs.TokenOutput_Coin:
@ -670,7 +670,7 @@ func TestHandlePreMidnightMint(t *testing.T) {
err = app.CoinStore.PutCoin(txn, 1, a, e.Coin)
assert.NoError(t, err)
case *protobufs.TokenOutput_DeletedCoin:
c, err := app.CoinStore.GetCoinByAddress(txn, e.DeletedCoin.Address)
c, err := app.CoinStore.GetCoinByAddress(nil, e.DeletedCoin.Address)
assert.NoError(t, err)
err = app.CoinStore.DeleteCoin(txn, e.DeletedCoin.Address, c)
assert.NoError(t, err)

View File

@ -290,7 +290,7 @@ func (d *DataTimeReel) createGenesisFrame() (
panic(err)
}
txn, err := d.clockStore.NewTransaction()
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -311,7 +311,7 @@ func (d *DataTimeReel) createGenesisFrame() (
panic(err)
}
txn, err = d.clockStore.NewTransaction()
txn, err = d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -531,7 +531,7 @@ func (d *DataTimeReel) storePending(
zap.String("distance", distance.Text(16)),
)
txn, err := d.clockStore.NewTransaction()
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -638,7 +638,7 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e
zap.Uint64("head_number", d.head.FrameNumber),
zap.String("head_output_tag", hex.EncodeToString(d.head.Output[:64])),
)
txn, err := d.clockStore.NewTransaction()
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -933,7 +933,7 @@ func (d *DataTimeReel) forkChoice(
rightReplaySelectors =
rightReplaySelectors[1:]
txn, err := d.clockStore.NewTransaction()
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -956,7 +956,7 @@ func (d *DataTimeReel) forkChoice(
frameNumber++
}
txn, err := d.clockStore.NewTransaction()
txn, err := d.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}

View File

@ -187,6 +187,7 @@ func TestDataTimeReel(t *testing.T) {
},
pubKeys,
true,
func() []*tries.RollingFrecencyCritbitTrie { return []*tries.RollingFrecencyCritbitTrie{} },
)
err = d.Start()

View File

@ -164,7 +164,7 @@ func (m *MasterTimeReel) createGenesisFrame() *protobufs.ClockFrame {
panic(err)
}
txn, err := m.clockStore.NewTransaction()
txn, err := m.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -212,7 +212,7 @@ func (m *MasterTimeReel) runLoop() {
continue
}
txn, err := m.clockStore.NewTransaction()
txn, err := m.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -281,7 +281,7 @@ func (m *MasterTimeReel) processPending() {
continue
}
txn, err := m.clockStore.NewTransaction()
txn, err := m.clockStore.NewTransaction(false)
if err != nil {
panic(err)
}

View File

@ -83,7 +83,7 @@ func TestHandleProverJoin(t *testing.T) {
[][]byte{bpub},
)
selbi, _ := gen.GetSelector()
txn, _ := app.ClockStore.NewTransaction()
txn, _ := app.ClockStore.NewTransaction(false)
app.ClockStore.StageDataClockFrame(selbi.FillBytes(make([]byte, 32)), gen, txn)
app.ClockStore.CommitDataClockFrame(gen.Filter, 0, selbi.FillBytes(make([]byte, 32)), app.Tries, txn, false)
txn.Commit()
@ -115,7 +115,7 @@ func TestHandleProverJoin(t *testing.T) {
assert.Len(t, fail.Requests, 0)
app.Tries = append(app.Tries, &tries.RollingFrecencyCritbitTrie{})
app.Tries[1].Add(addr, 0)
txn, _ = app.ClockStore.NewTransaction()
txn, _ = app.ClockStore.NewTransaction(false)
frame1, _ := wprover.ProveDataClockFrame(gen, [][]byte{}, []*protobufs.InclusionAggregateProof{}, bprivKey, time.Now().UnixMilli(), 10000)
selbi, _ = frame1.GetSelector()
app.ClockStore.StageDataClockFrame(selbi.FillBytes(make([]byte, 32)), frame1, txn)
@ -144,7 +144,7 @@ func TestHandleProverJoin(t *testing.T) {
false,
)
assert.Error(t, err)
txn, _ = app.ClockStore.NewTransaction()
txn, _ = app.ClockStore.NewTransaction(false)
frame2, _ := wprover.ProveDataClockFrame(frame1, [][]byte{}, []*protobufs.InclusionAggregateProof{}, bprivKey, time.Now().UnixMilli(), 10000)
selbi, _ = frame2.GetSelector()
app.ClockStore.StageDataClockFrame(selbi.FillBytes(make([]byte, 32)), frame2, txn)
@ -205,7 +205,7 @@ func TestHandleProverJoin(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, success.Requests, 1)
assert.Len(t, app.TokenOutputs.Outputs, 1)
txn, _ = app.CoinStore.NewTransaction()
txn, _ = app.CoinStore.NewTransaction(false)
for i, o := range app.TokenOutputs.Outputs {
switch e := o.Output.(type) {
case *protobufs.TokenOutput_Coin:
@ -233,7 +233,7 @@ func TestHandleProverJoin(t *testing.T) {
}
}
err = txn.Commit()
txn, _ = app.ClockStore.NewTransaction()
txn, _ = app.ClockStore.NewTransaction(false)
frame3, _ := wprover.ProveDataClockFrame(frame2, [][]byte{}, []*protobufs.InclusionAggregateProof{}, bprivKey, time.Now().UnixMilli(), 10000)
selbi, _ = frame3.GetSelector()
app.ClockStore.StageDataClockFrame(selbi.FillBytes(make([]byte, 32)), frame3, txn)

View File

@ -185,7 +185,7 @@ func NewTokenExecutionEngine(
panic(err)
}
txn, err := clockStore.NewTransaction()
txn, err := clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -249,7 +249,7 @@ func NewTokenExecutionEngine(
panic(err)
}
txn, err := clockStore.NewTransaction()
txn, err := clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -559,7 +559,7 @@ func (e *TokenExecutionEngine) ProcessFrame(
return nil, errors.Wrap(err, "process frame")
}
case *protobufs.TokenOutput_DeletedCoin:
coin, err := e.coinStore.GetCoinByAddress(txn, o.DeletedCoin.Address)
coin, err := e.coinStore.GetCoinByAddress(nil, o.DeletedCoin.Address)
if err != nil {
txn.Abort()
return nil, errors.Wrap(err, "process frame")

View File

@ -848,7 +848,7 @@ func CreateGenesisState(
totalExecutions,
),
)
txn, err := coinStore.NewTransaction()
txn, err := coinStore.NewTransaction(false)
for _, output := range genesisState.Outputs {
if err != nil {
panic(err)
@ -872,7 +872,7 @@ func CreateGenesisState(
panic(err)
}
txn, err = clockStore.NewTransaction()
txn, err = clockStore.NewTransaction(false)
if err != nil {
panic(err)
}
@ -989,7 +989,7 @@ func CreateGenesisState(
})
logger.Info("serializing execution state to store")
txn, err := coinStore.NewTransaction()
txn, err := coinStore.NewTransaction(false)
for _, output := range genesisState.Outputs {
if err != nil {
panic(err)

View File

@ -598,14 +598,14 @@ func RunForkRepairIfNeeded(
if bytes.Equal(badFrameSelector, compareSel.FillBytes(make([]byte, 32))) {
logger.Info("performing fork repair")
txn, _ := coinStore.NewTransaction()
txn, _ := coinStore.NewTransaction(false)
_, outs, _ := application.GetOutputsFromClockFrame(frame)
logger.Info("removing invalid frame at position 48995")
for i, output := range outs.Outputs {
switch o := output.Output.(type) {
case *protobufs.TokenOutput_Coin:
address, _ := token.GetAddressOfCoin(o.Coin, frame.FrameNumber, uint64(i))
coin, err := coinStore.GetCoinByAddress(txn, address)
coin, err := coinStore.GetCoinByAddress(nil, address)
if err != nil {
fmt.Println(err)
return
@ -656,7 +656,7 @@ func RunForkRepairIfNeeded(
return
}
txn, _ := clockStore.NewTransaction()
txn, _ := clockStore.NewTransaction(false)
if err := overrideHead(
txn,
clockStore,

View File

@ -18,7 +18,7 @@ import (
)
type ClockStore interface {
NewTransaction() (Transaction, error)
NewTransaction(indexed bool) (Transaction, error)
GetLatestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetEarliestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetMasterClockFrame(
@ -453,8 +453,8 @@ func clockDataSeniorityKey(
return key
}
func (p *PebbleClockStore) NewTransaction() (Transaction, error) {
return p.db.NewBatch(), nil
func (p *PebbleClockStore) NewTransaction(indexed bool) (Transaction, error) {
return p.db.NewBatch(indexed), nil
}
// GetEarliestMasterClockFrame implements ClockStore.
@ -746,7 +746,7 @@ func (p *PebbleClockStore) saveAggregateProofs(
shouldClose := false
if txn == nil {
var err error
txn, err = p.NewTransaction()
txn, err = p.NewTransaction(false)
if err != nil {
return err
}
@ -1050,7 +1050,7 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange(
fromFrameNumber uint64,
toFrameNumber uint64,
) error {
txn, err := p.NewTransaction()
txn, err := p.NewTransaction(false)
if err != nil {
return errors.Wrap(err, "delete data clock frame range")
}

View File

@ -14,7 +14,7 @@ import (
)
type CoinStore interface {
NewTransaction() (Transaction, error)
NewTransaction(indexed bool) (Transaction, error)
GetCoinsForOwner(owner []byte) ([]uint64, [][]byte, []*protobufs.Coin, error)
GetPreCoinProofsForOwner(owner []byte) (
[]uint64,
@ -117,8 +117,8 @@ func genesisSeedKey() []byte {
return []byte{COIN, GENESIS}
}
func (p *PebbleCoinStore) NewTransaction() (Transaction, error) {
return p.db.NewBatch(), nil
func (p *PebbleCoinStore) NewTransaction(indexed bool) (Transaction, error) {
return p.db.NewBatch(indexed), nil
}
func (p *PebbleCoinStore) GetCoinsForOwner(
@ -415,7 +415,7 @@ func (p *PebbleCoinStore) SetMigrationVersion(
return errors.Wrap(err, "migrate")
}
txn, err := p.NewTransaction()
txn, err := p.NewTransaction(false)
if err != nil {
return nil
}
@ -493,7 +493,7 @@ func (p *PebbleCoinStore) internalMigrate(
panic(err)
}
txn, err := p.NewTransaction()
txn, err := p.NewTransaction(false)
if err != nil {
return nil
}
@ -537,7 +537,7 @@ func (p *PebbleCoinStore) Migrate(filter []byte, genesisSeedHex string) error {
return errors.Wrap(err, "migrate")
}
txn, err := p.NewTransaction()
txn, err := p.NewTransaction(false)
if err != nil {
return nil
}

View File

@ -122,7 +122,7 @@ func dataTimeProofLatestKey(peerId []byte) []byte {
}
func (p *PebbleDataProofStore) NewTransaction() (Transaction, error) {
return p.db.NewBatch(), nil
return p.db.NewBatch(false), nil
}
func internalGetAggregateProof(

View File

@ -363,7 +363,7 @@ func (d *InMemKVDB) Delete(key []byte) error {
return nil
}
func (d *InMemKVDB) NewBatch() Transaction {
func (d *InMemKVDB) NewBatch(indexed bool) Transaction {
if !d.open {
return nil
}

View File

@ -217,7 +217,7 @@ func keyBundleEarliestKey(provingKey []byte) []byte {
}
func (p *PebbleKeyStore) NewTransaction() (Transaction, error) {
return p.db.NewBatch(), nil
return p.db.NewBatch(false), nil
}
// Stages a proving key for later inclusion on proof of meaningful work.

View File

@ -8,7 +8,7 @@ type KVDB interface {
Get(key []byte) ([]byte, io.Closer, error)
Set(key, value []byte) error
Delete(key []byte) error
NewBatch() Transaction
NewBatch(indexed bool) Transaction
NewIter(lowerBound []byte, upperBound []byte) (Iterator, error)
Compact(start, end []byte, parallelize bool) error
CompactAll() error

View File

@ -33,9 +33,15 @@ func (p *PebbleDB) Delete(key []byte) error {
return p.db.Delete(key, &pebble.WriteOptions{Sync: true})
}
func (p *PebbleDB) NewBatch() Transaction {
return &PebbleTransaction{
b: p.db.NewIndexedBatch(),
func (p *PebbleDB) NewBatch(indexed bool) Transaction {
if indexed {
return &PebbleTransaction{
b: p.db.NewIndexedBatch(),
}
} else {
return &PebbleTransaction{
b: p.db.NewBatch(),
}
}
}

View File

@ -180,7 +180,7 @@ func (d *PeerstoreDatastore) Close() (err error) {
func (d *PeerstoreDatastore) Batch(ctx context.Context) (ds.Batch, error) {
return &batch{
b: &transaction{tx: d.db.NewBatch()},
b: &transaction{tx: d.db.NewBatch(false)},
db: d.db,
}, nil
}
@ -189,7 +189,7 @@ func (d *PeerstoreDatastore) NewTransaction(
ctx context.Context,
readOnly bool,
) (ds.Txn, error) {
tx := d.db.NewBatch()
tx := d.db.NewBatch(false)
return &transaction{tx}, nil
}