frame pruning

This commit is contained in:
Cassandra Heart 2024-11-17 04:37:45 -06:00
parent 14f7e3c40d
commit f511149b36
No known key found for this signature in database
GPG Key ID: 6352152859385958
7 changed files with 215 additions and 10 deletions

View File

@ -490,6 +490,8 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
go e.runLoop()
go e.runSync()
go e.runFramePruning()
go func() {
time.Sleep(30 * time.Second)
e.logger.Info("checking for snapshots to play forward")

View File

@ -0,0 +1,13 @@
package data
import "go.uber.org/zap"
func (e *DataClockConsensusEngine) pruneFrames(maxFrame uint64) error {
e.logger.Info("pruning frames", zap.Uint64("max_frame_to_prune", maxFrame))
err := e.clockStore.DeleteDataClockFrameRange(e.filter, 1, maxFrame)
if err != nil {
e.logger.Error("failed to prune frames", zap.Error(err))
return err
}
return nil
}

View File

@ -41,7 +41,51 @@ func (
return frameProverTries
}
func (e *DataClockConsensusEngine) runFramePruning() {
// A full prover should _never_ do this
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) ||
e.config.Engine.MaxFrames == -1 || e.config.Engine.FullProver {
e.logger.Info("frame pruning not enabled")
return
}
if e.config.Engine.MaxFrames < 1000 {
e.logger.Warn(
"max frames for pruning too low, pruning disabled",
zap.Int64("max_frames", e.config.Engine.MaxFrames),
)
return
}
for {
select {
case <-e.ctx.Done():
return
case <-time.After(1 * time.Hour):
head, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
if head.FrameNumber < uint64(e.config.Engine.MaxFrames)+1 {
continue
}
if err := e.pruneFrames(
head.FrameNumber - uint64(e.config.Engine.MaxFrames),
); err != nil {
e.logger.Error("could not prune", zap.Error(err))
}
}
}
}
func (e *DataClockConsensusEngine) runSync() {
// small optimization, beacon should never collect for now:
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
return
}
for {
select {
case <-e.ctx.Done():

View File

@ -98,6 +98,11 @@ type ClockStore interface {
frame *protobufs.ClockFrame,
tries []*tries.RollingFrecencyCritbitTrie,
) error
DeleteDataClockFrameRange(
filter []byte,
minFrameNumber uint64,
maxFrameNumber uint64,
) error
}
type PebbleClockStore struct {
@ -714,6 +719,26 @@ func (p *PebbleClockStore) fillAggregateProofs(
return nil
}
func (p *PebbleClockStore) deleteAggregateProofs(
txn Transaction,
frame *protobufs.ClockFrame,
) error {
for i := 0; i < len(frame.Input[516:])/74; i++ {
commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
err := internalDeleteAggregateProof(
p.db,
txn,
frame.AggregateProofs[i],
commit,
)
if err != nil {
return errors.Wrap(err, "delete aggregate proofs")
}
}
return nil
}
func (p *PebbleClockStore) saveAggregateProofs(
txn Transaction,
frame *protobufs.ClockFrame,
@ -1025,16 +1050,46 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange(
fromFrameNumber uint64,
toFrameNumber uint64,
) error {
err := p.db.DeleteRange(
clockDataFrameKey(
filter,
fromFrameNumber,
),
clockDataFrameKey(
filter,
toFrameNumber,
),
)
txn, err := p.NewTransaction()
if err != nil {
return errors.Wrap(err, "delete data clock frame range")
}
for i := fromFrameNumber; i < toFrameNumber; i++ {
frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i)
if err != nil {
return errors.Wrap(err, "delete data clock frame range")
}
for _, frame := range frames {
err = p.deleteAggregateProofs(txn, frame)
if err != nil {
txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
}
err = txn.DeleteRange(
clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0x00}, 32)),
clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0xff}, 32)),
)
if err != nil {
txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
err = txn.Delete(clockDataFrameKey(filter, i))
if err != nil {
txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
}
if err = txn.Commit(); err != nil {
txn.Abort()
return errors.Wrap(err, "delete data clock frame range")
}
return errors.Wrap(err, "delete data clock frame range")
}

View File

@ -325,6 +325,58 @@ func (p *PebbleDataProofStore) GetAggregateProof(
)
}
func internalDeleteAggregateProof(
db KVDB,
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
) error {
buf := binary.BigEndian.AppendUint64(
nil,
uint64(len(aggregateProof.InclusionCommitments)),
)
buf = append(buf, aggregateProof.Proof...)
for i, inc := range aggregateProof.InclusionCommitments {
var segments [][]byte
if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType {
o := &protobufs.IntrinsicExecutionOutput{}
if err := proto.Unmarshal(inc.Data, o); err != nil {
return errors.Wrap(err, "delete aggregate proof")
}
leftBits := append([]byte{}, o.Address...)
leftBits = append(leftBits, o.Output...)
rightBits := o.Proof
segments = [][]byte{leftBits, rightBits}
} else {
segments = [][]byte{inc.Data}
}
for _, segment := range segments {
hash := sha3.Sum256(segment)
if err := txn.Delete(
dataProofSegmentKey(aggregateProof.Filter, hash[:]),
); err != nil {
return errors.Wrap(err, "delete aggregate proof")
}
}
if err := txn.Delete(
dataProofInclusionKey(aggregateProof.Filter, commitment, uint64(i)),
); err != nil {
return errors.Wrap(err, "delete aggregate proof")
}
}
if err := txn.Delete(
dataProofMetadataKey(aggregateProof.Filter, commitment),
); err != nil {
return errors.Wrap(err, "delete aggregate proof")
}
return nil
}
func internalPutAggregateProof(
db KVDB,
txn Transaction,

View File

@ -243,6 +243,7 @@ func (t *InMemKVDBTransaction) Delete(key []byte) error {
if !t.db.open {
return errors.New("inmem db closed")
}
t.changes = append(t.changes, InMemKVDBOperation{
op: DeleteOperation,
key: key,
@ -268,6 +269,32 @@ func (t *InMemKVDBTransaction) NewIter(lowerBound []byte, upperBound []byte) (
}, nil
}
func (t *InMemKVDBTransaction) DeleteRange(
lowerBound []byte,
upperBound []byte,
) error {
if !t.db.open {
return errors.New("inmem db closed")
}
iter, err := t.NewIter(lowerBound, upperBound)
if err != nil {
return err
}
for iter.First(); iter.Valid(); iter.Next() {
t.changes = append(t.changes, InMemKVDBOperation{
op: DeleteOperation,
key: iter.Key(),
})
if err != nil {
return err
}
}
return nil
}
func (t *InMemKVDBTransaction) Abort() error {
return nil
}

View File

@ -94,6 +94,7 @@ type Transaction interface {
Delete(key []byte) error
Abort() error
NewIter(lowerBound []byte, upperBound []byte) (Iterator, error)
DeleteRange(lowerBound []byte, upperBound []byte) error
}
type PebbleTransaction struct {
@ -130,6 +131,17 @@ func (t *PebbleTransaction) NewIter(lowerBound []byte, upperBound []byte) (
})
}
func (t *PebbleTransaction) DeleteRange(
lowerBound []byte,
upperBound []byte,
) error {
return t.b.DeleteRange(
lowerBound,
upperBound,
&pebble.WriteOptions{Sync: true},
)
}
var _ Transaction = (*PebbleTransaction)(nil)
func rightAlign(data []byte, size int) []byte {