ceremonyclient/node/consensus/data/main_data_loop.go
Cassandra Heart b0cf294c99
V2.0.2.3 (#321)
* roll up v2.0.1-b2 to develop

* b2-fixed

* adjust return data of fast sync so it doesn't return the earliest frame

* -b3

* fix: announce peer based on leading frame, not initial frame; fix: looping bug

* fix: last batch fails due to underflow; qol: make logging chattier

* -b4

* resolve frame cache issue

* fix: mint loop + re-migrate

* fix: register execution panic

* fix: mint loop, other side

* fix: handle unexpected return of nil status

* final -b4

* handle subtle change to migration

* qol: add heuristic to handle corruption scenario

* bump genesis

* qol: use separate channel for worker

* final parameterization, parallelize streams

* Add direct peers to blossomsub (#309)

Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>

* chore(docker): add ca-certificates to fix x509 error. (#307)

* Update qcommander.sh bootstrap (#304)

* chore(docker): add ca-certificates to fix x509 error.

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>

* deprecate signers 10, 11, 14, 17

* adjust signatory check size to match rotated out signers

* qol: sync by rebroadcast

* upgrade version

* more small adjustments

* wait a little longer

* fix: don't use iterator for frame directly until iterator is fixed

* change iterator, genesis for testnet

* adjust to previous sync handling

* adjust: don't grab the very latest while it's already being broadcasted

* ok, ready for testnet

* handle rebroadcast quirks

* more adjustments from testing

* faster

* temporarily bulk process on frame candidates

* resolve separate frames

* don't loop

* make worker reset resume to check where it should continue

* move window

* reduce signature count now that supermajority signed last

* resolve bottlenecks

* remove GOMAXPROCS limit for now

* revisions for v2.0.2.1

* bump version

* bulk import

* reintroduce sync

* small adjustments to make life better

* check bitmask for peers and keep alive

* adjust reconnect

* ensure peer doesn't fall off address list

* adjust blossomsub to background discovery

* bump version

* remove dev check

* remove debug log line

* further adjustments

* a little more logic around connection management

* v2.0.2.3

* Fix peer discovery (#319)

* Fix peer discovery

* Make peer discovery connections parallel

* Monitor peers via pings (#317)

* Support QUILIBRIUM_SIGNATURE_CHECK in client (#314)

* Ensure direct peers are not pruned by resource limits (#315)

* Support pprof profiling via HTTP (#313)

* Fix CPU profiling

* Add pprof server support

* Additional peering connection improvements (#320)

* Lookup peers if not enough external peers are available

* Make bootstrap peer discovery sensitive to a lack of bootstrappers

---------

Co-authored-by: Tyler Sturos <55340199+tjsturos@users.noreply.github.com>
Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
Co-authored-by: linquanisaac <33619994+linquanisaac@users.noreply.github.com>
Co-authored-by: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
2024-10-31 16:43:49 -05:00

package data

import (
	"bytes"
	"crypto/rand"
	"time"

	"go.uber.org/zap"
	"source.quilibrium.com/quilibrium/monorepo/node/consensus"
	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
	"source.quilibrium.com/quilibrium/monorepo/node/tries"
)

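// GetFrameProverTries returns a deep copy of the engine's prover tries,
// cloned (serialize/deserialize) under a read lock so callers can inspect
// them without racing the consensus loop.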
func (
	e *DataClockConsensusEngine,
) GetFrameProverTries() []*tries.RollingFrecencyCritbitTrie {
	e.frameProverTriesMx.RLock()
	frameProverTries := make(
		[]*tries.RollingFrecencyCritbitTrie,
		len(e.frameProverTries),
	)
	for i, trie := range e.frameProverTries {
		newTrie := &tries.RollingFrecencyCritbitTrie{}
		b, err := trie.Serialize()
		if err != nil {
			panic(err)
		}
		err = newTrie.Deserialize(b)
		if err != nil {
			panic(err)
		}
		frameProverTries[i] = newTrie
	}
	e.frameProverTriesMx.RUnlock()

	return frameProverTries
}

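// runLoop is the engine's main consensus loop. It waits for the minimum peer
// count, then processes frames from the data time reel as they arrive,
// falling back to the stored head frame if none arrives within 20 seconds.
// Nodes outside the prover trie collect; when this node is the selected
// prover (nearest to the frame selector, or present in a prover trie on the
// timeout path), it proves the next frame, inserts it into the time reel,
// and publishes the proof.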
func (e *DataClockConsensusEngine) runLoop() {
	dataFrameCh := e.dataTimeReel.NewFrameCh()

	for e.state < consensus.EngineStateStopping {
		peerCount := e.pubSub.GetNetworkPeersCount()
		if peerCount < e.minimumPeersRequired {
			e.logger.Info(
				"waiting for minimum peers",
				zap.Int("peer_count", peerCount),
			)
			time.Sleep(1 * time.Second)
		} else {
			latestFrame, err := e.dataTimeReel.Head()
			if err != nil {
				panic(err)
			}

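			// Handle a freshly delivered frame, or fall back to the stored
			// head if none arrives within 20 seconds.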
			select {
			case dataFrame := <-dataFrameCh:
				e.logger.Info(
					"current frame head",
					zap.Uint64("frame_number", dataFrame.FrameNumber),
				)
				if !e.IsInProverTrie(e.provingKeyBytes) {
					if latestFrame, err = e.collect(dataFrame); err != nil {
						e.logger.Error("could not collect", zap.Error(err))
					}
				}
				if latestFrame != nil &&
					dataFrame.FrameNumber > latestFrame.FrameNumber {
					latestFrame = dataFrame
				}
				if e.latestFrameReceived < latestFrame.FrameNumber {
					e.latestFrameReceived = latestFrame.FrameNumber
				}

				trie := e.GetFrameProverTries()[0]
				selBI, _ := dataFrame.GetSelector()
				sel := make([]byte, 32)
				sel = selBI.FillBytes(sel)

				if bytes.Equal(
					trie.FindNearest(sel).External.Key,
					e.provingKeyAddress,
				) {
					var nextFrame *protobufs.ClockFrame
					if nextFrame, err = e.prove(latestFrame); err != nil {
						e.logger.Error("could not prove", zap.Error(err))
						e.state = consensus.EngineStateCollecting
						continue
					}
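					// Prover ring join/leave bookkeeping, currently disabled: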
					// e.proverTrieRequestsMx.Lock()
					// joinAddrs := tries.NewMinHeap[peerSeniorityItem]()
					// leaveAddrs := tries.NewMinHeap[peerSeniorityItem]()
					// for _, addr := range e.proverTrieJoinRequests {
					// if _, ok := (*e.peerSeniority)[addr]; !ok {
					// joinAddrs.Push(peerSeniorityItem{
					// addr: addr,
					// seniority: 0,
					// })
					// } else {
					// joinAddrs.Push((*e.peerSeniority)[addr])
					// }
					// }
					// for _, addr := range e.proverTrieLeaveRequests {
					// if _, ok := (*e.peerSeniority)[addr]; !ok {
					// leaveAddrs.Push(peerSeniorityItem{
					// addr: addr,
					// seniority: 0,
					// })
					// } else {
					// leaveAddrs.Push((*e.peerSeniority)[addr])
					// }
					// }
					// for _, addr := range e.proverTrieResumeRequests {
					// if _, ok := e.proverTriePauseRequests[addr]; ok {
					// delete(e.proverTriePauseRequests, addr)
					// }
					// }
					// joinReqs := make([]peerSeniorityItem, len(joinAddrs.All()))
					// copy(joinReqs, joinAddrs.All())
					// slices.Reverse(joinReqs)
					// leaveReqs := make([]peerSeniorityItem, len(leaveAddrs.All()))
					// copy(leaveReqs, leaveAddrs.All())
					// slices.Reverse(leaveReqs)
					// e.proverTrieJoinRequests = make(map[string]string)
					// e.proverTrieLeaveRequests = make(map[string]string)
					// e.proverTrieRequestsMx.Unlock()
					// e.frameProverTriesMx.Lock()
					// for _, addr := range joinReqs {
					// rings := len(e.frameProverTries)
					// last := e.frameProverTries[rings-1]
					// set := last.FindNearestAndApproximateNeighbors(make([]byte, 32))
					// if len(set) == 1024 {
					// e.frameProverTries = append(
					// e.frameProverTries,
					// &tries.RollingFrecencyCritbitTrie{},
					// )
					// last = e.frameProverTries[rings]
					// }
					// last.Add([]byte(addr.addr), nextFrame.FrameNumber)
					// }
					// for _, addr := range leaveReqs {
					// for _, t := range e.frameProverTries {
					// if bytes.Equal(
					// t.FindNearest([]byte(addr.addr)).External.Key,
					// []byte(addr.addr),
					// ) {
					// t.Remove([]byte(addr.addr))
					// break
					// }
					// }
					// }
					// e.frameProverTriesMx.Unlock()
					e.dataTimeReel.Insert(nextFrame, true)
					if err = e.publishProof(nextFrame); err != nil {
						e.logger.Error("could not publish", zap.Error(err))
						e.state = consensus.EngineStateCollecting
					}
					break
				}
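			// No frame arrived within 20 seconds: re-read the head and retry
			// the prover check against every trie.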
			case <-time.After(20 * time.Second):
				dataFrame, err := e.dataTimeReel.Head()
				if err != nil {
					panic(err)
				}

				e.logger.Info(
					"current frame head",
					zap.Uint64("frame_number", dataFrame.FrameNumber),
				)
				if !e.IsInProverTrie(e.provingKeyBytes) {
					if latestFrame, err = e.collect(dataFrame); err != nil {
						e.logger.Error("could not collect", zap.Error(err))
					}
				}
				if latestFrame == nil ||
					latestFrame.FrameNumber < dataFrame.FrameNumber {
					latestFrame, err = e.dataTimeReel.Head()
					if err != nil {
						panic(err)
					}
				}
				if e.latestFrameReceived < latestFrame.FrameNumber {
					e.latestFrameReceived = latestFrame.FrameNumber
				}

				for _, trie := range e.GetFrameProverTries() {
					if bytes.Equal(
						trie.FindNearest(e.provingKeyAddress).External.Key,
						e.provingKeyAddress,
					) {
						var nextFrame *protobufs.ClockFrame
						if nextFrame, err = e.prove(latestFrame); err != nil {
							e.logger.Error("could not prove", zap.Error(err))
							e.state = consensus.EngineStateCollecting
							continue
						}
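						// Prover ring join/leave bookkeeping, currently disabled: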
						// e.proverTrieRequestsMx.Lock()
						// joinAddrs := tries.NewMinHeap[peerSeniorityItem]()
						// leaveAddrs := tries.NewMinHeap[peerSeniorityItem]()
						// for _, addr := range e.proverTrieJoinRequests {
						// if _, ok := (*e.peerSeniority)[addr]; !ok {
						// joinAddrs.Push(peerSeniorityItem{
						// addr: addr,
						// seniority: 0,
						// })
						// } else {
						// joinAddrs.Push((*e.peerSeniority)[addr])
						// }
						// }
						// for _, addr := range e.proverTrieLeaveRequests {
						// if _, ok := (*e.peerSeniority)[addr]; !ok {
						// leaveAddrs.Push(peerSeniorityItem{
						// addr: addr,
						// seniority: 0,
						// })
						// } else {
						// leaveAddrs.Push((*e.peerSeniority)[addr])
						// }
						// }
						// for _, addr := range e.proverTrieResumeRequests {
						// if _, ok := e.proverTriePauseRequests[addr]; ok {
						// delete(e.proverTriePauseRequests, addr)
						// }
						// }
						// joinReqs := make([]peerSeniorityItem, len(joinAddrs.All()))
						// copy(joinReqs, joinAddrs.All())
						// slices.Reverse(joinReqs)
						// leaveReqs := make([]peerSeniorityItem, len(leaveAddrs.All()))
						// copy(leaveReqs, leaveAddrs.All())
						// slices.Reverse(leaveReqs)
						// e.proverTrieJoinRequests = make(map[string]string)
						// e.proverTrieLeaveRequests = make(map[string]string)
						// e.proverTrieRequestsMx.Unlock()
						// e.frameProverTriesMx.Lock()
						// for _, addr := range joinReqs {
						// rings := len(e.frameProverTries)
						// last := e.frameProverTries[rings-1]
						// set := last.FindNearestAndApproximateNeighbors(make([]byte, 32))
						// if len(set) == 8 {
						// e.frameProverTries = append(
						// e.frameProverTries,
						// &tries.RollingFrecencyCritbitTrie{},
						// )
						// last = e.frameProverTries[rings]
						// }
						// last.Add([]byte(addr.addr), nextFrame.FrameNumber)
						// }
						// for _, addr := range leaveReqs {
						// for _, t := range e.frameProverTries {
						// if bytes.Equal(
						// t.FindNearest([]byte(addr.addr)).External.Key,
						// []byte(addr.addr),
						// ) {
						// t.Remove([]byte(addr.addr))
						// break
						// }
						// }
						// }
						// e.frameProverTriesMx.Unlock()
						e.dataTimeReel.Insert(nextFrame, true)
						if err = e.publishProof(nextFrame); err != nil {
							e.logger.Error("could not publish", zap.Error(err))
							e.state = consensus.EngineStateCollecting
						}
						break
					}
				}
			}
		}
	}
}

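// rebroadcastLoop periodically replays stored data clock frames to the
// network. Only nodes whose proving key address is in the primary prover
// trie rebroadcast; frames are sent in batches (roughly 50 at a time) with
// a 60-second pause between publishes.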
func (e *DataClockConsensusEngine) rebroadcastLoop() {
	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
		time.Sleep(120 * time.Second)

		for {
			_, err := e.dataTimeReel.Head()
			if err != nil {
				e.logger.Info("no frames to rebroadcast yet, waiting...")
				time.Sleep(10 * time.Second)
				continue
			}

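			// Walk the stored chain from frame 1 up to the latest stored
			// head, batching frames for rebroadcast.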
			max, _, err := e.clockStore.GetLatestDataClockFrame(e.filter)
			frames := []*protobufs.ClockFrame{}
			sent := false
			for i := uint64(1); i < max.FrameNumber; i++ {
				if e.state == consensus.EngineStateStopped ||
					e.state == consensus.EngineStateStopping {
					e.logger.Info("shutting down rebroadcaster")
					return
				}

				frame, _, err := e.clockStore.GetDataClockFrame(e.filter, i, false)
				if err != nil {
					frames = []*protobufs.ClockFrame{}
					e.logger.Error("error while iterating", zap.Error(err))
					break
				}
				if frame == nil {
					frames = []*protobufs.ClockFrame{}
					e.logger.Error("too far ahead", zap.Error(err))
					break
				}

				frames = append(frames, frame)
				if i%50 == 0 {
					e.logger.Info(
						"rebroadcasting frames",
						zap.Uint64("from", frames[0].FrameNumber),
						zap.Uint64("to", frames[len(frames)-1].FrameNumber),
					)
					e.publishMessage(e.filter, &protobufs.FrameRebroadcast{
						From: frames[0].FrameNumber,
						To: frames[len(frames)-1].FrameNumber,
						ClockFrames: frames,
					})
					time.Sleep(60 * time.Second)
					sent = true
					frames = []*protobufs.ClockFrame{}
				}
			}

			if !sent && len(frames) != 0 {
				e.logger.Info(
					"rebroadcasting frames",
					zap.Uint64("from", frames[0].FrameNumber),
					zap.Uint64("to", frames[len(frames)-1].FrameNumber),
				)
				b := make([]byte, 24)
				rand.Read(b)
				e.publishMessage(e.filter, &protobufs.FrameRebroadcast{
					From: frames[0].FrameNumber,
					To: frames[len(frames)-1].FrameNumber,
					ClockFrames: frames,
					Random: b,
				})
				time.Sleep(60 * time.Second)
			}
		}
	}
}