From ec37cb34d7b7dd4b35959087aedd402e99a16c0a Mon Sep 17 00:00:00 2001
From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com>
Date: Mon, 25 Nov 2024 00:07:15 +0100
Subject: [PATCH] Add structured stop procedure (#379)

---
 .../data/data_clock_consensus_engine.go       | 31 +++++++++++++++----
 node/consensus/data/main_data_loop.go         |  3 ++
 node/consensus/data/message_handler.go        |  3 ++
 node/consensus/time/data_time_reel.go         |  4 +++
 .../token/token_execution_engine.go           |  4 +++
 5 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index 426efea..41b2b5d 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"crypto"
 	"encoding/binary"
+	stderrors "errors"
+	"fmt"
 	"math/rand"
 	"sync"
 
@@ -65,6 +67,7 @@ type DataClockConsensusEngine struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
+	wg     sync.WaitGroup
 
 	lastProven uint64
 	difficulty uint32
@@ -311,6 +314,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 		panic(err)
 	}
 
+	e.wg.Add(3)
 	go e.runFrameMessageHandler()
 	go e.runTxMessageHandler()
 	go e.runInfoMessageHandler()
@@ -359,7 +363,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	e.state = consensus.EngineStateCollecting
 	e.stateMx.Unlock()
 
+	e.wg.Add(1)
 	go func() {
+		defer e.wg.Done()
 		const baseDuration = 2 * time.Minute
 		const maxBackoff = 3
 		var currentBackoff = 0
@@ -395,7 +401,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 		}
 	}()
 
+	e.wg.Add(1)
 	go func() {
+		defer e.wg.Done()
 		thresholdBeforeConfirming := 4
 		frame, err := e.dataTimeReel.Head()
 		if err != nil {
@@ -500,11 +508,14 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 		}
 	}()
 
+	e.wg.Add(3)
 	go e.runLoop()
 	go e.runSync()
 	go e.runFramePruning()
 
+	e.wg.Add(1)
 	go func() {
+		defer e.wg.Done()
 		select {
 		case <-e.ctx.Done():
 			return
@@ -524,7 +535,9 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 
 	go e.runPreMidnightProofWorker()
 
+	e.wg.Add(1)
 	go func() {
+		defer e.wg.Done()
 		if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
 			e.clients, err = e.createParallelDataClientsFromList()
 			if err != nil {
@@ -582,6 +595,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 		i := i
 		client := client
 		go func() {
+			defer wg.Done()
 			resp, err :=
 				client.client.CalculateChallengeProof(
 					e.ctx,
@@ -595,7 +609,6 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 				)
 			if err != nil {
 				if status.Code(err) == codes.NotFound {
-					wg.Done()
 					return
 				}
 			}
@@ -605,8 +618,6 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 			} else {
 				e.clients[client.index] = nil
 			}
-
-			wg.Done()
 		}()
 	}
 	wg.Wait()
@@ -623,6 +634,7 @@ func (e *DataClockConsensusEngine) PerformTimeProof(
 func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 	e.logger.Info("stopping ceremony consensus engine")
 	e.cancel()
+	e.wg.Wait()
 	e.stateMx.Lock()
 	e.state = consensus.EngineStateStopping
 	e.stateMx.Unlock()
@@ -654,9 +666,11 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 	wg := sync.WaitGroup{}
 	wg.Add(len(e.executionEngines))
+	executionErrors := make(chan error, len(e.executionEngines))
 	for name := range e.executionEngines {
 		name := name
 		go func(name string) {
+			defer wg.Done()
 			frame, err := e.dataTimeReel.Head()
 			if err != nil {
 				panic(err)
 			}
 			err = <-e.UnregisterExecutor(name, frame.FrameNumber, force)
 			if err != nil {
-				errChan <- err
+				executionErrors <- err
 			}
-			wg.Done()
 		}(name)
 	}
@@ -679,6 +692,7 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 
 	e.logger.Info("waiting for execution engines to stop")
 	wg.Wait()
+	close(executionErrors)
 	e.logger.Info("execution engines stopped")
 
 	e.dataTimeReel.Stop()
@@ -689,7 +703,12 @@ func (e *DataClockConsensusEngine) Stop(force bool) <-chan error {
 	e.engineMx.Lock()
 	defer e.engineMx.Unlock()
 	go func() {
-		errChan <- nil
+		var errs []error
+		for err := range executionErrors {
+			errs = append(errs, err)
+		}
+		err := stderrors.Join(errs...)
+		errChan <- err
 	}()
 	return errChan
 }
diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go
index b4473fc..16051b9 100644
--- a/node/consensus/data/main_data_loop.go
+++ b/node/consensus/data/main_data_loop.go
@@ -43,6 +43,7 @@ func (
 }
 
 func (e *DataClockConsensusEngine) runFramePruning() {
+	defer e.wg.Done()
 	// A full prover should _never_ do this
 	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) ||
 		e.config.Engine.MaxFrames == -1 || e.config.Engine.FullProver {
@@ -85,6 +86,7 @@ func (e *DataClockConsensusEngine) runFramePruning() {
 }
 
 func (e *DataClockConsensusEngine) runSync() {
+	defer e.wg.Done()
 	// small optimization, beacon should never collect for now:
 	if e.GetFrameProverTries()[0].Contains(e.provingKeyAddress) {
 		return
@@ -113,6 +115,7 @@ func (e *DataClockConsensusEngine) runSync() {
 }
 
 func (e *DataClockConsensusEngine) runLoop() {
+	defer e.wg.Done()
 	dataFrameCh := e.dataTimeReel.NewFrameCh()
 	runOnce := true
 	for {
diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go
index 36aa8da..09626be 100644
--- a/node/consensus/data/message_handler.go
+++ b/node/consensus/data/message_handler.go
@@ -19,6 +19,7 @@ import (
 )
 
 func (e *DataClockConsensusEngine) runFrameMessageHandler() {
+	defer e.wg.Done()
 	for {
 		select {
 		case <-e.ctx.Done():
@@ -67,6 +68,7 @@ func (e *DataClockConsensusEngine) runFrameMessageHandler() {
 }
 
 func (e *DataClockConsensusEngine) runTxMessageHandler() {
+	defer e.wg.Done()
 	for {
 		select {
 		case <-e.ctx.Done():
@@ -155,6 +157,7 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
 }
 
 func (e *DataClockConsensusEngine) runInfoMessageHandler() {
+	defer e.wg.Done()
 	for {
 		select {
 		case <-e.ctx.Done():
diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go
index f6002ad..25e08f7 100644
--- a/node/consensus/time/data_time_reel.go
+++ b/node/consensus/time/data_time_reel.go
@@ -36,6 +36,7 @@ type DataTimeReel struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
+	wg     sync.WaitGroup
 
 	filter       []byte
 	engineConfig *config.EngineConfig
@@ -175,6 +176,7 @@ func (d *DataTimeReel) Start() error {
 		d.headDistance, err = d.GetDistance(frame)
 	}
 
+	d.wg.Add(1)
 	go d.runLoop()
 
 	return nil
@@ -253,6 +255,7 @@ func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame {
 
 func (d *DataTimeReel) Stop() {
 	d.cancel()
+	d.wg.Wait()
 }
 
 func (d *DataTimeReel) createGenesisFrame() (
@@ -338,6 +341,7 @@ func (d *DataTimeReel) createGenesisFrame() (
 
 // Main data consensus loop
 func (d *DataTimeReel) runLoop() {
+	defer d.wg.Done()
 	for {
 		select {
 		case <-d.ctx.Done():
diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go
index fee6f6e..5deb1a3 100644
--- a/node/execution/intrinsics/token/token_execution_engine.go
+++ b/node/execution/intrinsics/token/token_execution_engine.go
@@ -83,6 +83,7 @@ func (p PeerSeniorityItem) Priority() uint64 {
 type TokenExecutionEngine struct {
 	ctx        context.Context
 	cancel     context.CancelFunc
+	wg         sync.WaitGroup
 	logger     *zap.Logger
 	clock      *data.DataClockConsensusEngine
 	clockStore store.ClockStore
@@ -341,7 +342,9 @@ func NewTokenExecutionEngine(
 	e.proverPublicKey = publicKeyBytes
 	e.provingKeyAddress = provingKeyAddress
 
+	e.wg.Add(1)
 	go func() {
+		defer e.wg.Done()
 		f, tries, err := e.clockStore.GetLatestDataClockFrame(e.intrinsicFilter)
 		if err != nil {
 			return
@@ -453,6 +456,7 @@ func (e *TokenExecutionEngine) Start() <-chan error {
 
 // Stop implements ExecutionEngine
 func (e *TokenExecutionEngine) Stop(force bool) <-chan error {
 	e.cancel()
+	e.wg.Wait()
 	errChan := make(chan error)
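
Note on the pattern: every file touched above applies the same structured-stop recipe. Each long-lived goroutine is registered with a sync.WaitGroup (wg.Add before launch, defer wg.Done() inside), and Stop now cancels the shared context, waits on the WaitGroup so no goroutine outlives the engine, and folds executor shutdown errors into a single error with errors.Join instead of dropping them. The standalone sketch below restates that recipe for reference only; the Engine, runWorker, and stopper names are illustrative placeholders, not identifiers from this repository.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Engine is a stand-in for the consensus engines changed in this patch.
type Engine struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func NewEngine() *Engine {
	ctx, cancel := context.WithCancel(context.Background())
	return &Engine{ctx: ctx, cancel: cancel}
}

// Start registers every long-lived goroutine with the WaitGroup before
// launching it, mirroring the wg.Add / defer wg.Done() pairs above.
func (e *Engine) Start() {
	e.wg.Add(2)
	go e.runWorker()
	go e.runWorker()
}

func (e *Engine) runWorker() {
	defer e.wg.Done()
	for {
		select {
		case <-e.ctx.Done():
			return
		case <-time.After(100 * time.Millisecond):
			// periodic work would happen here
		}
	}
}

// Stop cancels the shared context, waits for every tracked goroutine to
// exit, then runs the per-executor stop functions, collecting their
// errors in a buffered channel and joining them into one error.
func (e *Engine) Stop(stoppers ...func() error) error {
	e.cancel()
	e.wg.Wait()

	stopErrors := make(chan error, len(stoppers))
	var wg sync.WaitGroup
	wg.Add(len(stoppers))
	for _, stop := range stoppers {
		go func(stop func() error) {
			defer wg.Done()
			if err := stop(); err != nil {
				stopErrors <- err
			}
		}(stop)
	}
	wg.Wait()
	close(stopErrors)

	var errs []error
	for err := range stopErrors {
		errs = append(errs, err)
	}
	return errors.Join(errs...) // nil when every stopper succeeded
}

func main() {
	e := NewEngine()
	e.Start()
	err := e.Stop(
		func() error { return nil },
		func() error { return errors.New("executor failed to unregister") },
	)
	fmt.Println("stop result:", err)
}

Two details of the actual change worth keeping in mind: executionErrors is buffered to len(e.executionEngines), so a sender can never block even if collection stops early, and stderrors.Join returns nil when no errors were gathered, which preserves the old behaviour of sending nil on errChan after a clean shutdown.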