ceremonyclient/node/consensus/data/main_data_loop.go
package data

import (
	"bytes"
	"time"

	"go.uber.org/zap"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus"
	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
	"source.quilibrium.com/quilibrium/monorepo/node/internal/cas"
	"source.quilibrium.com/quilibrium/monorepo/node/internal/frametime"
	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
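
// GetFrameProverTries returns deep copies of the engine's current prover
// tries so callers can inspect them without racing updates to the originals.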
func (
	e *DataClockConsensusEngine,
) GetFrameProverTries() []*tries.RollingFrecencyCritbitTrie {
	e.frameProverTriesMx.RLock()
	frameProverTries := make(
		[]*tries.RollingFrecencyCritbitTrie,
		len(e.frameProverTries),
	)
	for i, trie := range e.frameProverTries {
		newTrie := &tries.RollingFrecencyCritbitTrie{}
		b, err := trie.Serialize()
		if err != nil {
			panic(err)
		}
		err = newTrie.Deserialize(b)
		if err != nil {
			panic(err)
		}
		frameProverTries[i] = newTrie
	}
	e.frameProverTriesMx.RUnlock()

	return frameProverTries
}
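
// GetFrameProverTrie returns a deep copy of the prover trie at index i, or an
// empty trie if i is out of range.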
func (e *DataClockConsensusEngine) GetFrameProverTrie(i int) *tries.RollingFrecencyCritbitTrie {
	e.frameProverTriesMx.RLock()
	defer e.frameProverTriesMx.RUnlock()

	newTrie := &tries.RollingFrecencyCritbitTrie{}
	if i < 0 || i >= len(e.frameProverTries) {
		return newTrie
	}

	b, err := e.frameProverTries[i].Serialize()
	if err != nil {
		panic(err)
	}

	if err := newTrie.Deserialize(b); err != nil {
		panic(err)
	}

	return newTrie
}
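
// FrameProverTriesContains reports whether any prover trie contains the given
// key.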
func (e *DataClockConsensusEngine) FrameProverTriesContains(
	key []byte,
) bool {
	e.frameProverTriesMx.RLock()
	defer e.frameProverTriesMx.RUnlock()

	for _, trie := range e.frameProverTries {
		if trie.Contains(key) {
			return true
		}
	}

	return false
}
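
// FrameProverTrieContains reports whether the prover trie at index i contains
// the given key. Out-of-range indices report false.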
func (e *DataClockConsensusEngine) FrameProverTrieContains(
	i int,
	key []byte,
) bool {
	e.frameProverTriesMx.RLock()
	defer e.frameProverTriesMx.RUnlock()

	if i < 0 || i >= len(e.frameProverTries) {
		return false
	}

	return e.frameProverTries[i].Contains(key)
}
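
// runFramePruning periodically deletes data clock frames that fall outside
// the configured MaxFrames retention window. Pruning is disabled for full
// provers, for provers in the primary trie, when MaxFrames is -1, or when
// MaxFrames is below 1000.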
func (e *DataClockConsensusEngine) runFramePruning() {
	defer e.wg.Done()

	// A full prover should _never_ do this
	if e.FrameProverTrieContains(0, e.provingKeyAddress) ||
		e.config.Engine.MaxFrames == -1 || e.config.Engine.FullProver {
		e.logger.Info("frame pruning not enabled")
		return
	}

	if e.config.Engine.MaxFrames < 1000 {
		e.logger.Warn(
			"max frames for pruning too low, pruning disabled",
			zap.Int64("max_frames", e.config.Engine.MaxFrames),
		)
		return
	}

	e.logger.Info("frame pruning enabled, waiting for delay timeout expiry")

	from := uint64(1)
	maxFrames := uint64(e.config.Engine.MaxFrames)
	batchSize := uint64(1000)

outer:
	for {
		select {
		case <-e.ctx.Done():
			return
		case <-time.After(1 * time.Minute):
			head, err := e.dataTimeReel.Head()
			if err != nil {
				panic(err)
			}

			// Skip until the head is far enough ahead to leave maxFrames of
			// history and is past the seniority repair boundary.
			if head.FrameNumber <= maxFrames ||
				head.FrameNumber <= application.PROOF_FRAME_SENIORITY_REPAIR+1 {
				continue
			}

			// Prune in fixed-size batches, resuming from the last
			// successfully deleted range on the next pass if a batch fails.
			to := head.FrameNumber - maxFrames
			for i := from; i < to; i += batchSize {
				start, stop := i, min(i+batchSize, to)
				if err := e.clockStore.DeleteDataClockFrameRange(e.filter, start, stop); err != nil {
					e.logger.Error(
						"failed to prune frames",
						zap.Error(err),
						zap.Uint64("from", start),
						zap.Uint64("to", stop),
					)
					continue outer
				}
				e.logger.Info(
					"pruned frames",
					zap.Uint64("from", start),
					zap.Uint64("to", stop),
				)
				select {
				case <-e.ctx.Done():
					return
				default:
				}
				from = stop
			}
		}
	}
}
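
// runSync services sync requests: each request re-bootstraps the pubsub layer
// and then synchronizes with the mesh. Provers in the primary trie never sync
// this way.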
func (e *DataClockConsensusEngine) runSync() {
	defer e.wg.Done()

	// small optimization, beacon should never collect for now:
	if e.FrameProverTrieContains(0, e.provingKeyAddress) {
		return
	}

	for {
		select {
		case <-e.ctx.Done():
			return
		case <-e.requestSyncCh:
			if err := e.pubSub.Bootstrap(e.ctx); err != nil {
				e.logger.Error("could not bootstrap", zap.Error(err))
			}

			if err := e.syncWithMesh(); err != nil {
				e.logger.Error("could not sync", zap.Error(err))
			}
		}
	}
}
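
// runLoop is the engine's main consensus loop: it waits for the minimum peer
// count, then processes each new frame from the data time reel, publishing
// proofs when this node is in the primary prover trie.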
func (e *DataClockConsensusEngine) runLoop() {
	defer e.wg.Done()

	dataFrameCh := e.dataTimeReel.NewFrameCh()
	runOnce := true

	for {
		peerCount := e.pubSub.GetNetworkPeersCount()
		if peerCount < e.minimumPeersRequired {
			e.logger.Info(
				"waiting for minimum peers",
				zap.Int("peer_count", peerCount),
			)
			select {
			case <-e.ctx.Done():
				return
			case <-time.After(1 * time.Second):
			}
		} else {
			latestFrame, err := e.dataTimeReel.Head()
			if err != nil {
				panic(err)
			}

			// On the first pass, a prover processes the current head
			// immediately rather than waiting for the next frame.
			if runOnce {
				if e.FrameProverTrieContains(0, e.provingKeyAddress) {
					dataFrame, err := e.dataTimeReel.Head()
					if err != nil {
						panic(err)
					}
					latestFrame = e.processFrame(latestFrame, dataFrame)
				}
				runOnce = false
			}

			select {
			case <-e.ctx.Done():
				return
			case dataFrame := <-dataFrameCh:
				// A new head frame invalidates the previous validation filter.
				e.validationFilterMx.Lock()
				e.validationFilter = make(map[string]struct{}, len(e.validationFilter))
				e.validationFilterMx.Unlock()

				if e.FrameProverTrieContains(0, e.provingKeyAddress) {
					if err := e.publishProof(dataFrame); err != nil {
						e.logger.Error("could not publish proof", zap.Error(err))
						e.stateMx.Lock()
						if e.state < consensus.EngineStateStopping {
							e.state = consensus.EngineStateCollecting
						}
						e.stateMx.Unlock()
					}
				}

				latestFrame = e.processFrame(latestFrame, dataFrame)
			}
		}
	}
}
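
// processFrame advances the engine's view of the head frame, refreshes the
// prover tries from the time reel, and, when this node is the nearest prover
// to the frame's selector, proves the next frame and inserts it into the
// time reel.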
func (e *DataClockConsensusEngine) processFrame(
	latestFrame *protobufs.ClockFrame,
	dataFrame *protobufs.ClockFrame,
) *protobufs.ClockFrame {
	e.logger.Info(
		"current frame head",
		zap.Uint64("frame_number", dataFrame.FrameNumber),
		zap.Duration("frame_age", frametime.Since(dataFrame)),
	)

	// Non-provers request a sync whenever a new head arrives.
	var err error
	if !e.FrameProverTrieContains(0, e.provingKeyAddress) {
		select {
		case e.requestSyncCh <- struct{}{}:
		default:
		}
	}

	// Adopt the data frame as the latest when it is newer, or when no latest
	// frame is known yet.
	if latestFrame == nil || dataFrame.FrameNumber > latestFrame.FrameNumber {
		latestFrame = dataFrame
	}

	cas.IfLessThanUint64(&e.latestFrameReceived, latestFrame.FrameNumber)

	e.frameProverTriesMx.Lock()
	e.frameProverTries = e.dataTimeReel.GetFrameProverTries()
	e.frameProverTriesMx.Unlock()

	// The prover nearest to the frame's selector is responsible for proving
	// the next frame.
	trie := e.GetFrameProverTrie(0)
	selBI, _ := dataFrame.GetSelector()
	sel := make([]byte, 32)
	sel = selBI.FillBytes(sel)

	if bytes.Equal(
		trie.FindNearest(sel).Key,
		e.provingKeyAddress,
	) {
		var nextFrame *protobufs.ClockFrame
		if nextFrame, err = e.prove(dataFrame); err != nil {
			e.logger.Error("could not prove", zap.Error(err))
			e.stateMx.Lock()
			if e.state < consensus.EngineStateStopping {
				e.state = consensus.EngineStateCollecting
			}
			e.stateMx.Unlock()
			return dataFrame
		}

		if _, err := e.dataTimeReel.Insert(e.ctx, nextFrame); err != nil {
			e.logger.Debug("could not insert frame", zap.Error(err))
		}

		return nextFrame
	} else {
		return latestFrame
	}
}