ceremonyclient/node/consensus/data/main_data_loop.go
2024-11-06 14:46:04 -06:00

132 lines
3.1 KiB
Go

package data
import (
"bytes"
"time"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/node/consensus"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
func (
e *DataClockConsensusEngine,
) GetFrameProverTries() []*tries.RollingFrecencyCritbitTrie {
e.frameProverTriesMx.RLock()
frameProverTries := make(
[]*tries.RollingFrecencyCritbitTrie,
len(e.frameProverTries),
)
for i, trie := range e.frameProverTries {
newTrie := &tries.RollingFrecencyCritbitTrie{}
b, err := trie.Serialize()
if err != nil {
panic(err)
}
err = newTrie.Deserialize(b)
if err != nil {
panic(err)
}
frameProverTries[i] = newTrie
}
e.frameProverTriesMx.RUnlock()
return frameProverTries
}
func (e *DataClockConsensusEngine) runLoop() {
dataFrameCh := e.dataTimeReel.NewFrameCh()
for e.state < consensus.EngineStateStopping {
peerCount := e.pubSub.GetNetworkPeersCount()
if peerCount < e.minimumPeersRequired {
e.logger.Info(
"waiting for minimum peers",
zap.Int("peer_count", peerCount),
)
time.Sleep(1 * time.Second)
} else {
latestFrame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
select {
case dataFrame := <-dataFrameCh:
latestFrame = e.processFrame(latestFrame, dataFrame)
case <-time.After(20 * time.Second):
dataFrame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
latestFrame = e.processFrame(latestFrame, dataFrame)
}
}
}
}
func (e *DataClockConsensusEngine) processFrame(
latestFrame *protobufs.ClockFrame,
dataFrame *protobufs.ClockFrame,
) *protobufs.ClockFrame {
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
)
var err error
if !e.GetFrameProverTries()[0].Contains(e.provingKeyBytes) {
if latestFrame, err = e.collect(dataFrame); err != nil {
e.logger.Error("could not collect", zap.Error(err))
}
}
if latestFrame != nil &&
dataFrame.FrameNumber > latestFrame.FrameNumber {
latestFrame = dataFrame
}
if e.latestFrameReceived < latestFrame.FrameNumber {
e.latestFrameReceived = latestFrame.FrameNumber
}
e.frameProverTriesMx.Lock()
e.frameProverTries = e.dataTimeReel.GetFrameProverTries()
e.frameProverTriesMx.Unlock()
trie := e.GetFrameProverTries()[0]
selBI, _ := dataFrame.GetSelector()
sel := make([]byte, 32)
sel = selBI.FillBytes(sel)
if bytes.Equal(
trie.FindNearest(sel).External.Key,
e.provingKeyAddress,
) {
var nextFrame *protobufs.ClockFrame
if nextFrame, err = e.prove(latestFrame); err != nil {
e.logger.Error("could not prove", zap.Error(err))
e.state = consensus.EngineStateCollecting
return latestFrame
}
e.dataTimeReel.Insert(nextFrame, true)
if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting
}
return nextFrame
} else {
if !e.IsInProverTrie(e.pubSub.GetPeerID()) &&
dataFrame.Timestamp > time.Now().UnixMilli()-30000 {
e.logger.Info("announcing prover join")
e.announceProverJoin()
}
return latestFrame
}
}