diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index fd4e3b3..910c327 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -490,6 +490,8 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	go e.runLoop()
 	go e.runSync()
 
+	go e.runFramePruning()
+
 	go func() {
 		time.Sleep(30 * time.Second)
 		e.logger.Info("checking for snapshots to play forward")
diff --git a/node/consensus/data/frame_pruner.go b/node/consensus/data/frame_pruner.go
new file mode 100644
index 0000000..dc68f58
--- /dev/null
+++ b/node/consensus/data/frame_pruner.go
@@ -0,0 +1,15 @@
+package data
+
+import "go.uber.org/zap"
+
+// pruneFrames deletes every stored data clock frame from frame 1 up to,
+// but not including, maxFrame, along with its associated proof data.
+func (e *DataClockConsensusEngine) pruneFrames(maxFrame uint64) error {
+	e.logger.Info("pruning frames", zap.Uint64("max_frame_to_prune", maxFrame))
+	err := e.clockStore.DeleteDataClockFrameRange(e.filter, 1, maxFrame)
+	if err != nil {
+		e.logger.Error("failed to prune frames", zap.Error(err))
+		return err
+	}
+	return nil
+}
diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go
index 7706ee1..4da1fcc 100644
--- a/node/consensus/data/main_data_loop.go
+++ b/node/consensus/data/main_data_loop.go
@@ -41,7 +41,53 @@ func (
 	return frameProverTries
 }
 
+func (e *DataClockConsensusEngine) runFramePruning() {
+	// A full prover should _never_ do this
+	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) ||
+		e.config.Engine.MaxFrames == -1 || e.config.Engine.FullProver {
+		e.logger.Info("frame pruning not enabled")
+		return
+	}
+
+	if e.config.Engine.MaxFrames < 1000 {
+		e.logger.Warn(
+			"max frames for pruning too low, pruning disabled",
+			zap.Int64("max_frames", e.config.Engine.MaxFrames),
+		)
+		return
+	}
+
+	for {
+		select {
+		case <-e.ctx.Done():
+			return
+		case <-time.After(1 * time.Hour):
+			head, err := e.dataTimeReel.Head()
+			if err != nil {
+				panic(err)
+			}
+
+			if head.FrameNumber < uint64(e.config.Engine.MaxFrames)+1 {
+				continue
+			}
+
+			// Keep the most recent MaxFrames frames; everything below
+			// head - MaxFrames is eligible for deletion.
+			if err := e.pruneFrames(
+				head.FrameNumber - uint64(e.config.Engine.MaxFrames),
+			); err != nil {
+				e.logger.Error("could not prune", zap.Error(err))
+			}
+		}
+	}
+}
+
 func (e *DataClockConsensusEngine) runSync() {
+	// small optimization, beacon should never collect for now:
+	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
+		return
+	}
+
 	for {
 		select {
 		case <-e.ctx.Done():
diff --git a/node/store/clock.go b/node/store/clock.go
index 22591e4..8b2f4cf 100644
--- a/node/store/clock.go
+++ b/node/store/clock.go
@@ -98,6 +98,13 @@ type ClockStore interface {
 		frame *protobufs.ClockFrame,
 		tries []*tries.RollingFrecencyCritbitTrie,
 	) error
+	// DeleteDataClockFrameRange removes the frames in the half-open range
+	// [minFrameNumber, maxFrameNumber) for the given filter.
+	DeleteDataClockFrameRange(
+		filter []byte,
+		minFrameNumber uint64,
+		maxFrameNumber uint64,
+	) error
 }
 
 type PebbleClockStore struct {
@@ -714,6 +721,29 @@ func (p *PebbleClockStore) fillAggregateProofs(
 	return nil
 }
 
+func (p *PebbleClockStore) deleteAggregateProofs(
+	txn Transaction,
+	frame *protobufs.ClockFrame,
+) error {
+	// The frame input is a 516-byte prefix followed by one 74-byte
+	// commitment per aggregate proof; walk it the same way the save
+	// path does so the deletes hit the same keys.
+	for i := 0; i < len(frame.Input[516:])/74; i++ {
+		commit := frame.Input[516+(i*74) : 516+((i+1)*74)]
+		err := internalDeleteAggregateProof(
+			p.db,
+			txn,
+			frame.AggregateProofs[i],
+			commit,
+		)
+		if err != nil {
+			return errors.Wrap(err, "delete aggregate proofs")
+		}
+	}
+
+	return nil
+}
+
 func (p *PebbleClockStore) saveAggregateProofs(
 	txn Transaction,
 	frame *protobufs.ClockFrame,
@@ -1025,16 +1055,53 @@ func (p *PebbleClockStore) DeleteDataClockFrameRange(
 	filter []byte,
 	fromFrameNumber uint64,
 	toFrameNumber uint64,
 ) error {
-	err := p.db.DeleteRange(
-		clockDataFrameKey(
-			filter,
-			fromFrameNumber,
-		),
-		clockDataFrameKey(
-			filter,
-			toFrameNumber,
-		),
-	)
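+	// Delete frames in [fromFrameNumber, toFrameNumber): for each frame,
+	// drop its aggregate proof data, its parent index entries, and the
+	// frame record itself, all in one transaction so a failure mid-prune
+	// cannot leave partially deleted frames behind.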
+	txn, err := p.NewTransaction()
+	if err != nil {
+		return errors.Wrap(err, "delete data clock frame range")
+	}
+
+	for i := fromFrameNumber; i < toFrameNumber; i++ {
+		frames, err := p.GetStagedDataClockFramesForFrameNumber(filter, i)
+		if err != nil {
+			txn.Abort()
+			return errors.Wrap(err, "delete data clock frame range")
+		}
+
+		for _, frame := range frames {
+			err = p.deleteAggregateProofs(txn, frame)
+			if err != nil {
+				txn.Abort()
+				return errors.Wrap(err, "delete data clock frame range")
+			}
+		}
+
+		// Parent index keys embed a 32-byte selector; spanning from all
+		// 0x00 bytes to all 0xff bytes covers the entries for frame i.
+		err = txn.DeleteRange(
+			clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0x00}, 32)),
+			clockDataParentIndexKey(filter, i, bytes.Repeat([]byte{0xff}, 32)),
+		)
+		if err != nil {
+			txn.Abort()
+			return errors.Wrap(err, "delete data clock frame range")
+		}
+
+		err = txn.Delete(clockDataFrameKey(filter, i))
+		if err != nil {
+			txn.Abort()
+			return errors.Wrap(err, "delete data clock frame range")
+		}
+	}
+
+	if err = txn.Commit(); err != nil {
+		txn.Abort()
+		return errors.Wrap(err, "delete data clock frame range")
+	}
+
 	return errors.Wrap(err, "delete data clock frame range")
 }
diff --git a/node/store/data_proof.go b/node/store/data_proof.go
index b41713e..94644a9 100644
--- a/node/store/data_proof.go
+++ b/node/store/data_proof.go
@@ -325,6 +325,55 @@ func (p *PebbleDataProofStore) GetAggregateProof(
 	)
 }
 
+func internalDeleteAggregateProof(
+	db KVDB,
+	txn Transaction,
+	aggregateProof *protobufs.InclusionAggregateProof,
+	commitment []byte,
+) error {
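+	// Inverse of internalPutAggregateProof below: recompute the segment
+	// hashes for each inclusion commitment and delete the segment,
+	// inclusion, and metadata keys the put path wrote.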
+	for i, inc := range aggregateProof.InclusionCommitments {
+		var segments [][]byte
+		if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType {
+			o := &protobufs.IntrinsicExecutionOutput{}
+			if err := proto.Unmarshal(inc.Data, o); err != nil {
+				return errors.Wrap(err, "delete aggregate proof")
+			}
+			leftBits := append([]byte{}, o.Address...)
+			leftBits = append(leftBits, o.Output...)
+			rightBits := o.Proof
+			segments = [][]byte{leftBits, rightBits}
+		} else {
+			segments = [][]byte{inc.Data}
+		}
+
+		for _, segment := range segments {
+			hash := sha3.Sum256(segment)
+			if err := txn.Delete(
+				dataProofSegmentKey(aggregateProof.Filter, hash[:]),
+			); err != nil {
+				return errors.Wrap(err, "delete aggregate proof")
+			}
+		}
+
+		if err := txn.Delete(
+			dataProofInclusionKey(aggregateProof.Filter, commitment, uint64(i)),
+		); err != nil {
+			return errors.Wrap(err, "delete aggregate proof")
+		}
+	}
+
+	if err := txn.Delete(
+		dataProofMetadataKey(aggregateProof.Filter, commitment),
+	); err != nil {
+		return errors.Wrap(err, "delete aggregate proof")
+	}
+
+	return nil
+}
+
 func internalPutAggregateProof(
 	db KVDB,
 	txn Transaction,
diff --git a/node/store/inmem.go b/node/store/inmem.go
index 702626f..81c2a16 100644
--- a/node/store/inmem.go
+++ b/node/store/inmem.go
@@ -243,6 +243,7 @@ func (t *InMemKVDBTransaction) Delete(key []byte) error {
 	if !t.db.open {
 		return errors.New("inmem db closed")
 	}
+
 	t.changes = append(t.changes, InMemKVDBOperation{
 		op:  DeleteOperation,
 		key: key,
@@ -268,6 +269,31 @@ func (t *InMemKVDBTransaction) NewIter(lowerBound []byte, upperBound []byte) (
 	}, nil
 }
 
+func (t *InMemKVDBTransaction) DeleteRange(
+	lowerBound []byte,
+	upperBound []byte,
+) error {
+	if !t.db.open {
+		return errors.New("inmem db closed")
+	}
+
+	iter, err := t.NewIter(lowerBound, upperBound)
+	if err != nil {
+		return err
+	}
+
+	for iter.First(); iter.Valid(); iter.Next() {
+		// Defensively copy the key in case the iterator reuses its buffer.
+		key := append([]byte{}, iter.Key()...)
+		t.changes = append(t.changes, InMemKVDBOperation{
+			op:  DeleteOperation,
+			key: key,
+		})
+	}
+
+	return nil
+}
+
 func (t *InMemKVDBTransaction) Abort() error {
 	return nil
 }
diff --git a/node/store/pebble.go b/node/store/pebble.go
index 9e2cbb9..cd793d3 100644
--- a/node/store/pebble.go
+++ b/node/store/pebble.go
@@ -94,6 +94,7 @@ type Transaction interface {
 	Delete(key []byte) error
 	Abort() error
 	NewIter(lowerBound []byte, upperBound []byte) (Iterator, error)
+	DeleteRange(lowerBound []byte, upperBound []byte) error
 }
 
 type PebbleTransaction struct {
@@ -130,6 +131,17 @@ func (t *PebbleTransaction) NewIter(lowerBound []byte, upperBound []byte) (
 	})
 }
 
+func (t *PebbleTransaction) DeleteRange(
+	lowerBound []byte,
+	upperBound []byte,
+) error {
+	return t.b.DeleteRange(
+		lowerBound,
+		upperBound,
+		&pebble.WriteOptions{Sync: true},
+	)
+}
+
 var _ Transaction = (*PebbleTransaction)(nil)
 
 func rightAlign(data []byte, size int) []byte {