From d32d79f58dede104416cc2d471dab604d62506f5 Mon Sep 17 00:00:00 2001 From: petricadaipegsp <155911522+petricadaipegsp@users.noreply.github.com> Date: Sun, 17 Nov 2024 00:58:32 +0100 Subject: [PATCH] Use buffered channels in the engines (#357) * Make time data reel sending consistent * Use buffered channels to avoid drops --- .../data/data_clock_consensus_engine.go | 8 ++-- node/consensus/data/token_handle_mint_test.go | 8 ++-- .../master/master_clock_consensus_engine.go | 4 +- node/consensus/time/data_time_reel.go | 43 +++++++++++-------- node/consensus/time/master_time_reel.go | 6 +-- .../token/token_execution_engine.go | 6 +-- 6 files changed, 40 insertions(+), 35 deletions(-) diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 31c7874..346610c 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -234,7 +234,7 @@ func NewDataClockConsensusEngine( keyStore: keyStore, keyManager: keyManager, pubSub: pubSub, - frameChan: make(chan *protobufs.ClockFrame), + frameChan: make(chan *protobufs.ClockFrame, 8), executionEngines: map[string]execution.ExecutionEngine{}, dependencyMap: make(map[string]*anypb.Any), parentSelector: []byte{ @@ -256,9 +256,9 @@ func NewDataClockConsensusEngine( masterTimeReel: masterTimeReel, dataTimeReel: dataTimeReel, peerInfoManager: peerInfoManager, - frameMessageProcessorCh: make(chan *pb.Message), - txMessageProcessorCh: make(chan *pb.Message), - infoMessageProcessorCh: make(chan *pb.Message), + frameMessageProcessorCh: make(chan *pb.Message, 8), + txMessageProcessorCh: make(chan *pb.Message, 8), + infoMessageProcessorCh: make(chan *pb.Message, 8), config: cfg, preMidnightMint: map[string]struct{}{}, grpcRateLimiter: NewRateLimiter( diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index 26a073e..b02099d 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -113,7 +113,7 @@ func TestHandlePreMidnightMint(t *testing.T) { keyStore: keystore, keyManager: keys.NewInMemoryKeyManager(), pubSub: nil, - frameChan: make(chan *protobufs.ClockFrame), + frameChan: make(chan *protobufs.ClockFrame, 8), executionEngines: map[string]execution.ExecutionEngine{}, dependencyMap: make(map[string]*anypb.Any), parentSelector: []byte{ @@ -135,9 +135,9 @@ func TestHandlePreMidnightMint(t *testing.T) { masterTimeReel: nil, dataTimeReel: &qtime.DataTimeReel{}, peerInfoManager: nil, - frameMessageProcessorCh: make(chan *pb.Message), - txMessageProcessorCh: make(chan *pb.Message), - infoMessageProcessorCh: make(chan *pb.Message), + frameMessageProcessorCh: make(chan *pb.Message, 8), + txMessageProcessorCh: make(chan *pb.Message, 8), + infoMessageProcessorCh: make(chan *pb.Message, 8), config: nil, preMidnightMint: map[string]struct{}{}, } diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go index b17acdb..db45b46 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -123,7 +123,7 @@ func NewMasterClockConsensusEngine( keyManager: keyManager, pubSub: pubSub, executionEngines: map[string]execution.ExecutionEngine{}, - frameChan: make(chan *protobufs.ClockFrame), + frameChan: make(chan *protobufs.ClockFrame, 8), input: seed, lastFrameReceivedAt: time.Time{}, syncingStatus: SyncStatusNotSyncing, @@ -133,7 +133,7 @@ func NewMasterClockConsensusEngine( masterTimeReel: masterTimeReel, peerInfoManager: peerInfoManager, report: report, - frameValidationCh: make(chan *protobufs.ClockFrame), + frameValidationCh: make(chan *protobufs.ClockFrame, 8), collectedProverSlots: []*protobufs.InclusionAggregateProof{}, engineConfig: engineConfig, } diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index a46e499..694ab4f 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -62,7 +62,7 @@ type DataTimeReel struct { newFrameCh chan *protobufs.ClockFrame badFrameCh chan *protobufs.ClockFrame done chan bool - alwaysSend bool + syncSend bool restore func() []*tries.RollingFrecencyCritbitTrie } @@ -83,7 +83,7 @@ func NewDataTimeReel( origin []byte, initialInclusionProof *crypto.InclusionAggregateProof, initialProverKeys [][]byte, - alwaysSend bool, + syncSend bool, restore func() []*tries.RollingFrecencyCritbitTrie, ) *DataTimeReel { if filter == nil { @@ -129,11 +129,11 @@ func NewDataTimeReel( lruFrames: cache, // pending: make(map[uint64][]*pendingFrame), incompleteForks: make(map[uint64][]*pendingFrame), - frames: make(chan *pendingFrame), - newFrameCh: make(chan *protobufs.ClockFrame), - badFrameCh: make(chan *protobufs.ClockFrame), + frames: make(chan *pendingFrame, 8), + newFrameCh: make(chan *protobufs.ClockFrame, 8), + badFrameCh: make(chan *protobufs.ClockFrame, 8), done: make(chan bool), - alwaysSend: alwaysSend, + syncSend: syncSend, restore: restore, } } @@ -685,15 +685,16 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e d.head = frame d.headDistance = distance - if d.alwaysSend { + if d.syncSend { d.newFrameCh <- frame + } else { + go func() { + select { + case d.newFrameCh <- frame: + default: + } + }() } - go func() { - select { - case d.newFrameCh <- frame: - default: - } - }() return nil } @@ -992,12 +993,16 @@ func (d *DataTimeReel) forkChoice( d.totalDistance, ) - go func() { - select { - case d.newFrameCh <- frame: - default: - } - }() + if d.syncSend { + d.newFrameCh <- frame + } else { + go func() { + select { + case d.newFrameCh <- frame: + default: + } + }() + } } func (d *DataTimeReel) GetTotalDistance() *big.Int { diff --git a/node/consensus/time/master_time_reel.go b/node/consensus/time/master_time_reel.go index aec5833..801cc4e 100644 --- a/node/consensus/time/master_time_reel.go +++ b/node/consensus/time/master_time_reel.go @@ -66,9 +66,9 @@ func NewMasterTimeReel( clockStore: clockStore, frameProver: frameProver, pending: make(map[uint64][]*protobufs.ClockFrame), - frames: make(chan *protobufs.ClockFrame), - newFrameCh: make(chan *protobufs.ClockFrame), - badFrameCh: make(chan *protobufs.ClockFrame), + frames: make(chan *protobufs.ClockFrame, 8), + newFrameCh: make(chan *protobufs.ClockFrame, 8), + badFrameCh: make(chan *protobufs.ClockFrame, 8), done: make(chan bool), } } diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index 645564a..b3bd426 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -222,9 +222,9 @@ func NewTokenExecutionEngine( peerSeniority: NewFromMap(peerSeniority), } - alwaysSend := false + syncSend := false if bytes.Equal(config.GetGenesis().Beacon, pubSub.GetPublicKey()) { - alwaysSend = true + syncSend = true } restore := func() []*tries.RollingFrecencyCritbitTrie { @@ -297,7 +297,7 @@ func NewTokenExecutionEngine( origin, inclusionProof, proverKeys, - alwaysSend, + syncSend, restore, )