Add structured stop procedure (#379)

This commit is contained in:
petricadaipegsp 2024-11-25 00:07:15 +01:00 committed by GitHub
parent d8321bf812
commit ec37cb34d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 39 additions and 6 deletions

View File

@ -4,6 +4,8 @@ import (
"context"
"crypto"
"encoding/binary"
stderrors "errors"
"fmt"
"math/rand"
"sync"
@ -65,6 +67,7 @@ type DataClockConsensusEngine struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
lastProven uint64
difficulty uint32
@ -311,6 +314,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
panic(err)
}
e.wg.Add(3)
go e.runFrameMessageHandler()
go e.runTxMessageHandler()
go e.runInfoMessageHandler()
@ -359,7 +363,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
e.state = consensus.EngineStateCollecting
e.stateMx.Unlock()
e.wg.Add(1)
go func() {
defer e.wg.Done()
const baseDuration = 2 * time.Minute
const maxBackoff = 3
var currentBackoff = 0
@ -395,7 +401,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
}
}()
e.wg.Add(1)
go func() {
defer e.wg.Done()
thresholdBeforeConfirming := 4
frame, err := e.dataTimeReel.Head()
if err != nil {
@ -500,11 +508,14 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
}
}()
e.wg.Add(3)
go e.runLoop()
go e.runSync()
go e.runFramePruning()
e.wg.Add(1)
go func() {
defer e.wg.Done()
select {
case <-e.ctx.Done():
return
@ -524,7 +535,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
go e.runPreMidnightProofWorker()
e.wg.Add(1)
go func() {
defer e.wg.Done()
if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
e.clients, err = e.createParallelDataClientsFromList()
if err != nil {
@ -582,6 +595,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
i := i
client := client
go func() {
defer wg.Done()
resp, err :=
client.client.CalculateChallengeProof(
e.ctx,
@ -595,7 +609,6 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
)
if err != nil {
if status.Code(err) == codes.NotFound {
wg.Done()
return
}
}
@ -605,8 +618,6 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
} else {
e.clients[client.index] = nil
}
wg.Done()
}()
}
wg.Wait()
@ -623,6 +634,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
e.logger.Info("stopping ceremony consensus engine")
e.cancel()
e.wg.Wait()
e.stateMx.Lock()
e.state = consensus.EngineStateStopping
e.stateMx.Unlock()
@ -654,9 +666,11 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
wg := sync.WaitGroup{}
wg.Add(len(e.executionEngines))
executionErrors := make(chan error, len(e.executionEngines))
for name := range e.executionEngines {
name := name
go func(name string) {
defer wg.Done()
frame, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
@ -664,9 +678,8 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
err = <-e.UnregisterExecutor(name, frame.FrameNumber, force)
if err != nil {
errChan <- err
executionErrors <- err
}
wg.Done()
}(name)
}
@ -679,6 +692,7 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
e.logger.Info("waiting for execution engines to stop")
wg.Wait()
close(executionErrors)
e.logger.Info("execution engines stopped")
e.dataTimeReel.Stop()
@ -689,7 +703,12 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
e.engineMx.Lock()
defer e.engineMx.Unlock()
go func() {
errChan <- nil
var errs []error
for err := range executionErrors {
errs = append(errs, err)
}
err := stderrors.Join(errs...)
errChan <- err
}()
return errChan
}

View File

@ -43,6 +43,7 @@ func (
}
func (e *DataClockConsensusEngine) runFramePruning() {
defer e.wg.Done()
// A full prover should _never_ do this
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) ||
e.config.Engine.MaxFrames == -1 || e.config.Engine.FullProver {
@ -85,6 +86,7 @@ func (e *DataClockConsensusEngine) runFramePruning() {
}
func (e *DataClockConsensusEngine) runSync() {
defer e.wg.Done()
// small optimization, beacon should never collect for now:
if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
return
@ -113,6 +115,7 @@ func (e *DataClockConsensusEngine) runSync() {
}
func (e *DataClockConsensusEngine) runLoop() {
defer e.wg.Done()
dataFrameCh := e.dataTimeReel.NewFrameCh()
runOnce := true
for {

View File

@ -19,6 +19,7 @@ import (
)
func (e *DataClockConsensusEngine) runFrameMessageHandler() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
@ -67,6 +68,7 @@ func (e *DataClockConsensusEngine) runFrameMessageHandler() {
}
func (e *DataClockConsensusEngine) runTxMessageHandler() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
@ -155,6 +157,7 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
}
func (e *DataClockConsensusEngine) runInfoMessageHandler() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():

View File

@ -36,6 +36,7 @@ type DataTimeReel struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
filter []byte
engineConfig *config.EngineConfig
@ -175,6 +176,7 @@ func (d *DataTimeReel) Start() error {
d.headDistance, err = d.GetDistance(frame)
}
d.wg.Add(1)
go d.runLoop()
return nil
@ -253,6 +255,7 @@ func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame {
func (d *DataTimeReel) Stop() {
d.cancel()
d.wg.Wait()
}
func (d *DataTimeReel) createGenesisFrame() (
@ -338,6 +341,7 @@ func (d *DataTimeReel) createGenesisFrame() (
// Main data consensus loop
func (d *DataTimeReel) runLoop() {
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():

View File

@ -83,6 +83,7 @@ func (p PeerSeniorityItem) Priority() uint64 {
type TokenExecutionEngine struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
logger *zap.Logger
clock *data.DataClockConsensusEngine
clockStore store.ClockStore
@ -341,7 +342,9 @@ func NewTokenExecutionEngine(
e.proverPublicKey = publicKeyBytes
e.provingKeyAddress = provingKeyAddress
e.wg.Add(1)
go func() {
defer e.wg.Done()
f, tries, err := e.clockStore.GetLatestDataClockFrame(e.intrinsicFilter)
if err != nil {
return
@ -453,6 +456,7 @@ func (e *TokenExecutionEngine) Start() <-chan error {
// Stop implements ExecutionEngine
func (e *TokenExecutionEngine) Stop(force bool) <-chan error {
e.cancel()
e.wg.Wait()
errChan := make(chan error)