diff --git a/node/consensus/data/broadcast_messaging.go b/node/consensus/data/broadcast_messaging.go
index 78b1b0a..b48d314 100644
--- a/node/consensus/data/broadcast_messaging.go
+++ b/node/consensus/data/broadcast_messaging.go
@@ -17,30 +17,39 @@ import (
 func (e *DataClockConsensusEngine) handleFrameMessage(
 	message *pb.Message,
 ) error {
-	go func() {
-		e.frameMessageProcessorCh <- message
-	}()
-
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.frameMessageProcessorCh <- message:
+	default:
+		e.logger.Warn("dropping frame message")
+	}
 	return nil
 }
 
 func (e *DataClockConsensusEngine) handleTxMessage(
 	message *pb.Message,
 ) error {
-	go func() {
-		e.txMessageProcessorCh <- message
-	}()
-
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.txMessageProcessorCh <- message:
+	default:
+		e.logger.Warn("dropping tx message")
+	}
 	return nil
 }
 
 func (e *DataClockConsensusEngine) handleInfoMessage(
 	message *pb.Message,
 ) error {
-	go func() {
-		e.infoMessageProcessorCh <- message
-	}()
-
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.infoMessageProcessorCh <- message:
+	default:
+		e.logger.Warn("dropping info message")
+	}
 	return nil
 }
 
@@ -130,9 +139,13 @@ func (e *DataClockConsensusEngine) insertTxMessage(
 		Seqno: nil,
 	}
 
-	go func() {
-		e.txMessageProcessorCh <- m
-	}()
+	select {
+	case <-e.ctx.Done():
+		return e.ctx.Err()
+	case e.txMessageProcessorCh <- m:
+	default:
+		e.logger.Warn("dropping tx message")
+	}
 
 	return nil
 }
diff --git a/node/consensus/data/consensus_frames.go b/node/consensus/data/consensus_frames.go
index 1ec2a6b..71e6869 100644
--- a/node/consensus/data/consensus_frames.go
+++ b/node/consensus/data/consensus_frames.go
@@ -7,7 +7,6 @@ import (
 	"golang.org/x/crypto/sha3"
 
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
-	"source.quilibrium.com/quilibrium/monorepo/node/consensus"
 	"source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
 	"source.quilibrium.com/quilibrium/monorepo/node/internal/frametime"
 
@@ -318,7 +317,7 @@ func (e *DataClockConsensusEngine) sync(
 		syncTimeout = defaultSyncTimeout
 	}
 
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		ctx, cancel := context.WithTimeout(e.ctx, syncTimeout)
 		response, err := client.GetDataFrame(
 			ctx,
@@ -364,11 +363,10 @@ func (e *DataClockConsensusEngine) sync(
 		); err != nil {
 			return nil, errors.Wrap(err, "sync")
 		}
-		e.dataTimeReel.Insert(response.ClockFrame, true)
+		e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true)
 		latest = response.ClockFrame
 		if latest.FrameNumber >= maxFrame {
 			return latest, nil
 		}
 	}
-	return latest, nil
 }
diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go
index 3c2a5fd..4ffa4c7 100644
--- a/node/consensus/data/data_clock_consensus_engine.go
+++ b/node/consensus/data/data_clock_consensus_engine.go
@@ -257,9 +257,9 @@ func NewDataClockConsensusEngine(
 		masterTimeReel:          masterTimeReel,
 		dataTimeReel:            dataTimeReel,
 		peerInfoManager:         peerInfoManager,
-		frameMessageProcessorCh: make(chan *pb.Message),
-		txMessageProcessorCh:    make(chan *pb.Message),
-		infoMessageProcessorCh:  make(chan *pb.Message),
+		frameMessageProcessorCh: make(chan *pb.Message, 65536),
+		txMessageProcessorCh:    make(chan *pb.Message, 65536),
+		infoMessageProcessorCh:  make(chan *pb.Message, 65536),
 		config:                  cfg,
 		preMidnightMint:         map[string]struct{}{},
 		grpcRateLimiter: NewRateLimiter(
@@ -368,16 +368,19 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 			panic(err)
 		}
 		source := rand.New(rand.NewSource(rand.Int63()))
-		for e.GetState() < consensus.EngineStateStopping {
+		for {
 			// Use exponential backoff with jitter in order to avoid hammering the bootstrappers.
-			time.Sleep(
-				backoff.FullJitter(
-					baseDuration
 		<= nextFrame.FrameNumber || nextFrame.FrameNumber == 0 {
-			time.Sleep(120 * time.Second)
+			select {
+			case <-e.ctx.Done():
+				return
+			case <-time.After(2 * time.Minute):
+			}
 			continue
 		}
 
@@ -485,7 +492,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 				thresholdBeforeConfirming--
 			}
 
-			time.Sleep(120 * time.Second)
+			select {
+			case <-e.ctx.Done():
+				return
+			case <-time.After(2 * time.Minute):
+			}
 		}
 	}()
 
@@ -494,7 +505,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 	go e.runFramePruning()
 
 	go func() {
-		time.Sleep(30 * time.Second)
+		select {
+		case <-e.ctx.Done():
+			return
+		case <-time.After(30 * time.Second):
+		}
 		e.logger.Info("checking for snapshots to play forward")
 		if err := e.downloadSnapshot(e.config.DB.Path, e.config.P2P.Network); err != nil {
 			e.logger.Debug("error downloading snapshot", zap.Error(err))
diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go
index 2c15b18..195c9fa 100644
--- a/node/consensus/data/main_data_loop.go
+++ b/node/consensus/data/main_data_loop.go
@@ -113,14 +113,18 @@ func (e *DataClockConsensusEngine) runSync() {
 func (e *DataClockConsensusEngine) runLoop() {
 	dataFrameCh := e.dataTimeReel.NewFrameCh()
 	runOnce := true
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		peerCount := e.pubSub.GetNetworkPeersCount()
 		if peerCount < e.minimumPeersRequired {
 			e.logger.Info(
 				"waiting for minimum peers",
 				zap.Int("peer_count", peerCount),
 			)
-			time.Sleep(1 * time.Second)
+			select {
+			case <-e.ctx.Done():
+				return
+			case <-time.After(1 * time.Second):
+			}
 		} else {
 			latestFrame, err := e.dataTimeReel.Head()
 			if err != nil {
@@ -205,7 +209,7 @@ func (e *DataClockConsensusEngine) processFrame(
 			return dataFrame
 		}
 
-		e.dataTimeReel.Insert(nextFrame, true)
+		e.dataTimeReel.Insert(e.ctx, nextFrame, true)
 
 		return nextFrame
 	} else {
diff --git a/node/consensus/data/message_handler.go b/node/consensus/data/message_handler.go
index f77e9da..f3fe4fc 100644
--- a/node/consensus/data/message_handler.go
+++ b/node/consensus/data/message_handler.go
@@ -3,6 +3,7 @@ package data
 import (
 	"bytes"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/iden3/go-iden3-crypto/poseidon"
@@ -13,14 +14,15 @@ import (
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
 	"source.quilibrium.com/quilibrium/monorepo/node/config"
-	"source.quilibrium.com/quilibrium/monorepo/node/consensus"
 	"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )
 
 func (e *DataClockConsensusEngine) runFrameMessageHandler() {
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		select {
+		case <-e.ctx.Done():
+			return
 		case message := <-e.frameMessageProcessorCh:
 			e.logger.Debug("handling frame message")
 			msg := &protobufs.Message{}
@@ -49,26 +51,26 @@ func (e *DataClockConsensusEngine) runFrameMessageHandler() {
 				continue
 			}
 
-			go func() {
-				switch any.TypeUrl {
-				case protobufs.ClockFrameType:
-					if err := e.handleClockFrameData(
-						message.From,
-						msg.Address,
-						any,
-						false,
-					); err != nil {
-						return
-					}
+			switch any.TypeUrl {
+			case protobufs.ClockFrameType:
+				if err := e.handleClockFrameData(
+					message.From,
+					msg.Address,
+					any,
+					false,
+				); err != nil {
+					e.logger.Debug("could not handle clock frame data", zap.Error(err))
 				}
-			}()
+			}
 		}
 	}
 }
 
 func (e *DataClockConsensusEngine) runTxMessageHandler() {
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		select {
+		case <-e.ctx.Done():
+			return
 		case message := <-e.txMessageProcessorCh:
 			e.logger.Debug("handling tx message")
 			msg := &protobufs.Message{}
@@ -97,9 +99,12 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
 			}
 
 			if e.frameProverTries[0].Contains(e.provingKeyAddress) {
+				wg := &sync.WaitGroup{}
 				for name := range e.executionEngines {
 					name := name
+					wg.Add(1)
 					go func() error {
+						defer wg.Done()
 						messages, err := e.executionEngines[name].ProcessMessage(
 							application.TOKEN_ADDRESS,
 							msg,
@@ -125,18 +130,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
 							continue
 						}
 
-						e.logger.Debug(appMsg.TypeUrl)
-
 						switch appMsg.TypeUrl {
 						case protobufs.TokenRequestType:
 							t := &protobufs.TokenRequest{}
 							err := proto.Unmarshal(appMsg.Value, t)
 							if err != nil {
+								e.logger.Debug("could not unmarshal token request", zap.Error(err))
 								continue
 							}
 
 							if err := e.handleTokenRequest(t); err != nil {
-								continue
+								e.logger.Debug("could not handle token request", zap.Error(err))
 							}
 						}
 					}
@@ -144,14 +148,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
 						return nil
 					}()
 				}
+				wg.Wait()
 			}
 		}
 	}
 }
 
 func (e *DataClockConsensusEngine) runInfoMessageHandler() {
-	for e.GetState() < consensus.EngineStateStopping {
+	for {
 		select {
+		case <-e.ctx.Done():
+			return
 		case message := <-e.infoMessageProcessorCh:
 			e.logger.Debug("handling info message")
 			msg := &protobufs.Message{}
@@ -180,18 +187,16 @@ func (e *DataClockConsensusEngine) runInfoMessageHandler() {
 				continue
 			}
 
-			go func() {
-				switch any.TypeUrl {
-				case protobufs.DataPeerListAnnounceType:
-					if err := e.handleDataPeerListAnnounce(
-						message.From,
-						msg.Address,
-						any,
-					); err != nil {
-						return
-					}
+			switch any.TypeUrl {
+			case protobufs.DataPeerListAnnounceType:
+				if err := e.handleDataPeerListAnnounce(
+					message.From,
+					msg.Address,
+					any,
+				); err != nil {
+					e.logger.Debug("could not handle data peer list announce", zap.Error(err))
 				}
-			}()
+			}
 		}
 	}
 }
@@ -249,7 +254,7 @@ func (e *DataClockConsensusEngine) handleClockFrame(
 	}
 
 	if frame.FrameNumber > head.FrameNumber {
-		e.dataTimeReel.Insert(frame, false)
+		e.dataTimeReel.Insert(e.ctx, frame, false)
 	}
 
 	return nil
diff --git a/node/consensus/master/broadcast_messaging.go b/node/consensus/master/broadcast_messaging.go
index e2fe211..cde9b3f 100644
--- a/node/consensus/master/broadcast_messaging.go
+++ b/node/consensus/master/broadcast_messaging.go
@@ -2,6 +2,7 @@ package master
 
 import (
 	"bytes"
+	"context"
 	"encoding/binary"
 	"strings"
 	"time"
@@ -154,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof(
 			zap.Uint64("frame_number", frame.FrameNumber),
 		)
 
-		e.masterTimeReel.Insert(frame, false)
+		e.masterTimeReel.Insert(context.TODO(), frame, false)
 	}
 
 	e.state = consensus.EngineStateCollecting
diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go
index b17acdb..54fa8f7 100644
--- a/node/consensus/master/master_clock_consensus_engine.go
+++ b/node/consensus/master/master_clock_consensus_engine.go
@@ -2,6 +2,7 @@ package master
 
 import (
 	"bytes"
+	"context"
 	gcrypto "crypto"
 	"encoding/hex"
 	"math/big"
@@ -207,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
 				continue
 			}
 
-			e.masterTimeReel.Insert(newFrame, false)
+			e.masterTimeReel.Insert(context.TODO(), newFrame, false)
 		}
 	}()
diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go
index d0ee3f2..f6002ad 100644
--- a/node/consensus/time/data_time_reel.go
+++ b/node/consensus/time/data_time_reel.go
@@ -2,6 +2,7 @@ package time
 
 import (
 	"bytes"
+	"context"
 	"encoding/hex"
 	"math/big"
 	"os"
@@ -32,7 +33,9 @@ type pendingFrame struct {
 
 type DataTimeReel struct {
 	rwMutex sync.RWMutex
-	running bool
+
+	ctx    context.Context
+	cancel context.CancelFunc
 
 	filter       []byte
 	engineConfig *config.EngineConfig
@@ -61,7 +64,6 @@ type DataTimeReel struct {
 	frames     chan *pendingFrame
 	newFrameCh chan *protobufs.ClockFrame
 	badFrameCh chan *protobufs.ClockFrame
-	done       chan bool
 	alwaysSend bool
 	restore    func() []*tries.RollingFrecencyCritbitTrie
 }
@@ -115,8 +117,10 @@ func NewDataTimeReel(
 		panic(err)
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
 	return &DataTimeReel{
-		running:      false,
+		ctx:          ctx,
+		cancel:       cancel,
 		logger:       logger,
 		filter:       filter,
 		engineConfig: engineConfig,
@@ -129,10 +133,9 @@ func NewDataTimeReel(
 		lruFrames:       cache,
 		// pending:      make(map[uint64][]*pendingFrame),
 		incompleteForks: make(map[uint64][]*pendingFrame),
-		frames:          make(chan *pendingFrame),
+		frames:          make(chan *pendingFrame, 65536),
 		newFrameCh:      make(chan *protobufs.ClockFrame),
 		badFrameCh:      make(chan *protobufs.ClockFrame),
-		done:            make(chan bool),
 		alwaysSend:      alwaysSend,
 		restore:         restore,
 	}
@@ -172,17 +175,12 @@ func (d *DataTimeReel) Start() error {
 		d.headDistance, err = d.GetDistance(frame)
 	}
 
-	d.running = true
 	go d.runLoop()
 
 	return nil
 }
 
 func (d *DataTimeReel) SetHead(frame *protobufs.ClockFrame) {
-	if d.running == true {
-		panic("internal test function should never be called outside of tests")
-	}
-
 	d.head = frame
 }
 
@@ -193,9 +191,9 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
 // Insert enqueues a structurally valid frame into the time reel. If the frame
 // is the next one in sequence, it advances the reel head forward and emits a
 // new frame on the new frame channel.
-func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error {
-	if !d.running {
-		return nil
+func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error {
+	if err := d.ctx.Err(); err != nil {
+		return err
 	}
 
 	d.logger.Debug(
@@ -222,13 +220,17 @@ func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error {
 		d.storePending(selector, parent, distance, frame)
 
 		if d.head.FrameNumber+1 == frame.FrameNumber {
-			go func() {
-				d.frames <- &pendingFrame{
-					selector:       selector,
-					parentSelector: parent,
-					frameNumber:    frame.FrameNumber,
-				}
-			}()
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-d.ctx.Done():
+				return d.ctx.Err()
+			case d.frames <- &pendingFrame{
+				selector:       selector,
+				parentSelector: parent,
+				frameNumber:    frame.FrameNumber,
+			}:
+			}
 		}
 	}
 
@@ -250,7 +252,7 @@ func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame {
 }
 
 func (d *DataTimeReel) Stop() {
-	d.done <- true
+	d.cancel()
 }
 
 func (d *DataTimeReel) createGenesisFrame() (
@@ -336,8 +338,10 @@ func (d *DataTimeReel) createGenesisFrame() (
 
 // Main data consensus loop
 func (d *DataTimeReel) runLoop() {
-	for d.running {
+	for {
 		select {
+		case <-d.ctx.Done():
+			return
 		case frame := <-d.frames:
 			rawFrame, err := d.clockStore.GetStagedDataClockFrame(
 				d.filter,
@@ -459,9 +463,6 @@ func (d *DataTimeReel) runLoop() {
 			// 	}
 			// }
 			}
-		case <-d.done:
-			d.running = false
-			return
 		}
 	}
 }
@@ -563,8 +564,7 @@ func (d *DataTimeReel) processPending(
 
 	for {
 		select {
-		case <-d.done:
-			d.running = false
+		case <-d.ctx.Done():
 			return
 		default:
 		}
@@ -686,14 +686,19 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e
 	d.headDistance = distance
 
 	if d.alwaysSend {
-		d.newFrameCh <- frame
-	}
-	go func() {
 		select {
+		case <-d.ctx.Done():
+			return d.ctx.Err()
+		case d.newFrameCh <- frame:
+		}
+	} else {
+		select {
+		case <-d.ctx.Done():
+			return d.ctx.Err()
 		case d.newFrameCh <- frame:
 		default:
 		}
-	}()
+	}
 
 	return nil
 }
@@ -992,12 +997,11 @@ func (d *DataTimeReel) forkChoice(
 		d.totalDistance,
 	)
 
-	go func() {
-		select {
-		case d.newFrameCh <- frame:
-		default:
-		}
-	}()
+	select {
+	case <-d.ctx.Done():
+	case d.newFrameCh <- frame:
+	default:
+	}
 }
 
 func (d *DataTimeReel) GetTotalDistance() *big.Int {
diff --git a/node/consensus/time/data_time_reel_test.go b/node/consensus/time/data_time_reel_test.go
index 263e9dc..43cc07d 100644
--- a/node/consensus/time/data_time_reel_test.go
+++ b/node/consensus/time/data_time_reel_test.go
@@ -2,6 +2,7 @@ package time_test
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"math/rand"
 	"strings"
@@ -108,6 +109,7 @@ func generateTestProvers() (
 }
 
 func TestDataTimeReel(t *testing.T) {
+	ctx := context.Background()
 	logger, _ := zap.NewDevelopment()
 	db := store.NewInMemKVDB()
 	clockStore := store.NewPebbleClockStore(db, logger)
@@ -231,7 +233,7 @@ func TestDataTimeReel(t *testing.T) {
 			i+1,
 			10,
 		)
-		d.Insert(frame, false)
+		d.Insert(ctx, frame, false)
 		prevBI, _ := frame.GetSelector()
 		prev = prevBI.FillBytes(make([]byte, 32))
 	}
@@ -262,7 +264,7 @@ func TestDataTimeReel(t *testing.T) {
 	}
 
 	for i := 99; i >= 0; i-- {
-		err := d.Insert(insertFrames[i], false)
+		err := d.Insert(ctx, insertFrames[i], false)
 		assert.NoError(t, err)
 	}
 
@@ -284,7 +286,7 @@ func TestDataTimeReel(t *testing.T) {
 			i+1,
 			10,
 		)
-		d.Insert(frame, false)
+		d.Insert(ctx, frame, false)
 		prevBI, _ := frame.GetSelector()
 		prev = prevBI.FillBytes(make([]byte, 32))
 
@@ -332,7 +334,7 @@ func TestDataTimeReel(t *testing.T) {
 	}
 
 	for i := 99; i >= 0; i-- {
-		err := d.Insert(insertFrames[i], false)
+		err := d.Insert(ctx, insertFrames[i], false)
 		assert.NoError(t, err)
 	}
 
@@ -395,7 +397,7 @@ func TestDataTimeReel(t *testing.T) {
 
 	// Someone is honest, but running backwards:
 	for i := 99; i >= 0; i-- {
-		err := d.Insert(insertFrames[i], false)
+		err := d.Insert(ctx, insertFrames[i], false)
 		gotime.Sleep(1 * gotime.Second)
 		assert.NoError(t, err)
 	}
diff --git a/node/consensus/time/master_time_reel.go b/node/consensus/time/master_time_reel.go
index 4b67cad..70ef4d3 100644
--- a/node/consensus/time/master_time_reel.go
+++ b/node/consensus/time/master_time_reel.go
@@ -1,6 +1,7 @@
 package time
 
 import (
+	"context"
 	"encoding/hex"
 	"errors"
 	"math/big"
@@ -120,6 +121,7 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) {
 // is the next one in sequence, it advances the reel head forward and emits a
 // new frame on the new frame channel.
 func (m *MasterTimeReel) Insert(
+	ctx context.Context,
 	frame *protobufs.ClockFrame,
 	isSync bool,
 ) error {
diff --git a/node/consensus/time/master_time_reel_test.go b/node/consensus/time/master_time_reel_test.go
index 62e2d44..6332d3f 100644
--- a/node/consensus/time/master_time_reel_test.go
+++ b/node/consensus/time/master_time_reel_test.go
@@ -1,6 +1,7 @@
 package time_test
 
 import (
+	"context"
 	"strings"
 	"sync"
 	"testing"
@@ -15,6 +16,7 @@ import (
 )
 
 func TestMasterTimeReel(t *testing.T) {
+	ctx := context.Background()
 	logger, _ := zap.NewProduction()
 	db := store.NewInMemKVDB()
 	clockStore := store.NewPebbleClockStore(db, logger)
@@ -59,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) {
 		)
 		assert.NoError(t, err)
 
-		err := m.Insert(frame, false)
+		err := m.Insert(ctx, frame, false)
 		assert.NoError(t, err)
 	}
 
@@ -79,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) {
 	}
 
 	for i := 99; i >= 0; i-- {
-		err := m.Insert(insertFrames[i], false)
+		err := m.Insert(ctx, insertFrames[i], false)
 		assert.NoError(t, err)
 	}
 
diff --git a/node/consensus/time/time_reel.go b/node/consensus/time/time_reel.go
index 3f60338..489b892 100644
--- a/node/consensus/time/time_reel.go
+++ b/node/consensus/time/time_reel.go
@@ -1,13 +1,15 @@
 package time
 
 import (
+	"context"
+
 	"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )
 
 type TimeReel interface {
 	Start() error
 	Stop()
-	Insert(frame *protobufs.ClockFrame, isSync bool) error
+	Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error
 	Head() (*protobufs.ClockFrame, error)
 	NewFrameCh() <-chan *protobufs.ClockFrame
 	BadFrameCh() <-chan *protobufs.ClockFrame
diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go
index 2ae742c..fee6f6e 100644
--- a/node/execution/intrinsics/token/token_execution_engine.go
+++ b/node/execution/intrinsics/token/token_execution_engine.go
@@ -2,6 +2,7 @@ package token
 
 import (
 	"bytes"
+	"context"
 	"crypto"
 	"encoding/binary"
 	"encoding/hex"
@@ -80,6 +81,8 @@ func (p PeerSeniorityItem) Priority() uint64 {
 }
 
 type TokenExecutionEngine struct {
+	ctx        context.Context
+	cancel     context.CancelFunc
 	logger     *zap.Logger
 	clock      *data.DataClockConsensusEngine
 	clockStore store.ClockStore
@@ -205,7 +208,10 @@ func NewTokenExecutionEngine(
 		LoadAggregatedSeniorityMap(uint(cfg.P2P.Network))
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
 	e := &TokenExecutionEngine{
+		ctx:          ctx,
+		cancel:       cancel,
 		logger:       logger,
 		engineConfig: cfg.Engine,
 		keyManager:   keyManager,
@@ -364,14 +370,19 @@ func NewTokenExecutionEngine(
 	}
 
 	// need to wait for peering
+waitPeers:
 	for {
-		gotime.Sleep(30 * gotime.Second)
-		peerMap := e.pubSub.GetBitmaskPeers()
-		if peers, ok := peerMap[string(
-			append([]byte{0x00}, e.intrinsicFilter...),
-		)]; ok {
-			if len(peers) >= 3 {
-				break
+		select {
+		case <-e.ctx.Done():
+			return
+		case <-gotime.After(30 * gotime.Second):
+			peerMap := e.pubSub.GetBitmaskPeers()
+			if peers, ok := peerMap[string(
+				append([]byte{0x00}, e.intrinsicFilter...),
+			)]; ok {
+				if len(peers) >= 3 {
+					break waitPeers
+				}
 			}
 		}
 	}
@@ -441,6 +452,8 @@ func (e *TokenExecutionEngine) Start() <-chan error {
 
 // Stop implements ExecutionEngine
 func (e *TokenExecutionEngine) Stop(force bool) <-chan error {
+	e.cancel()
+
 	errChan := make(chan error)
 	go func() {