From 6e2c7f402854e77188a3a7c44fa75c6d21119b04 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Sat, 16 Nov 2024 23:11:25 -0600 Subject: [PATCH] Revert "Use buffered channels in the engines (#357)" This reverts commit d32d79f58dede104416cc2d471dab604d62506f5. --- .../data/data_clock_consensus_engine.go | 8 ++-- node/consensus/data/token_handle_mint_test.go | 8 ++-- .../master/master_clock_consensus_engine.go | 4 +- node/consensus/time/data_time_reel.go | 43 ++++++++----------- node/consensus/time/master_time_reel.go | 6 +-- .../token/token_execution_engine.go | 6 +-- 6 files changed, 35 insertions(+), 40 deletions(-) diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 222aab1..fd4e3b3 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -234,7 +234,7 @@ func NewDataClockConsensusEngine( keyStore: keyStore, keyManager: keyManager, pubSub: pubSub, - frameChan: make(chan *protobufs.ClockFrame, 8), + frameChan: make(chan *protobufs.ClockFrame), executionEngines: map[string]execution.ExecutionEngine{}, dependencyMap: make(map[string]*anypb.Any), parentSelector: []byte{ @@ -256,9 +256,9 @@ func NewDataClockConsensusEngine( masterTimeReel: masterTimeReel, dataTimeReel: dataTimeReel, peerInfoManager: peerInfoManager, - frameMessageProcessorCh: make(chan *pb.Message, 8), - txMessageProcessorCh: make(chan *pb.Message, 8), - infoMessageProcessorCh: make(chan *pb.Message, 8), + frameMessageProcessorCh: make(chan *pb.Message), + txMessageProcessorCh: make(chan *pb.Message), + infoMessageProcessorCh: make(chan *pb.Message), config: cfg, preMidnightMint: map[string]struct{}{}, grpcRateLimiter: NewRateLimiter( diff --git a/node/consensus/data/token_handle_mint_test.go b/node/consensus/data/token_handle_mint_test.go index b02099d..26a073e 100644 --- a/node/consensus/data/token_handle_mint_test.go +++ b/node/consensus/data/token_handle_mint_test.go @@ -113,7 +113,7 @@ func TestHandlePreMidnightMint(t *testing.T) { keyStore: keystore, keyManager: keys.NewInMemoryKeyManager(), pubSub: nil, - frameChan: make(chan *protobufs.ClockFrame, 8), + frameChan: make(chan *protobufs.ClockFrame), executionEngines: map[string]execution.ExecutionEngine{}, dependencyMap: make(map[string]*anypb.Any), parentSelector: []byte{ @@ -135,9 +135,9 @@ func TestHandlePreMidnightMint(t *testing.T) { masterTimeReel: nil, dataTimeReel: &qtime.DataTimeReel{}, peerInfoManager: nil, - frameMessageProcessorCh: make(chan *pb.Message, 8), - txMessageProcessorCh: make(chan *pb.Message, 8), - infoMessageProcessorCh: make(chan *pb.Message, 8), + frameMessageProcessorCh: make(chan *pb.Message), + txMessageProcessorCh: make(chan *pb.Message), + infoMessageProcessorCh: make(chan *pb.Message), config: nil, preMidnightMint: map[string]struct{}{}, } diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go index db45b46..b17acdb 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -123,7 +123,7 @@ func NewMasterClockConsensusEngine( keyManager: keyManager, pubSub: pubSub, executionEngines: map[string]execution.ExecutionEngine{}, - frameChan: make(chan *protobufs.ClockFrame, 8), + frameChan: make(chan *protobufs.ClockFrame), input: seed, lastFrameReceivedAt: time.Time{}, syncingStatus: SyncStatusNotSyncing, @@ -133,7 +133,7 @@ func NewMasterClockConsensusEngine( masterTimeReel: masterTimeReel, peerInfoManager: peerInfoManager, report: report, - frameValidationCh: make(chan *protobufs.ClockFrame, 8), + frameValidationCh: make(chan *protobufs.ClockFrame), collectedProverSlots: []*protobufs.InclusionAggregateProof{}, engineConfig: engineConfig, } diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index 694ab4f..a46e499 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -62,7 +62,7 @@ type DataTimeReel struct { newFrameCh chan *protobufs.ClockFrame badFrameCh chan *protobufs.ClockFrame done chan bool - syncSend bool + alwaysSend bool restore func() []*tries.RollingFrecencyCritbitTrie } @@ -83,7 +83,7 @@ func NewDataTimeReel( origin []byte, initialInclusionProof *crypto.InclusionAggregateProof, initialProverKeys [][]byte, - syncSend bool, + alwaysSend bool, restore func() []*tries.RollingFrecencyCritbitTrie, ) *DataTimeReel { if filter == nil { @@ -129,11 +129,11 @@ func NewDataTimeReel( lruFrames: cache, // pending: make(map[uint64][]*pendingFrame), incompleteForks: make(map[uint64][]*pendingFrame), - frames: make(chan *pendingFrame, 8), - newFrameCh: make(chan *protobufs.ClockFrame, 8), - badFrameCh: make(chan *protobufs.ClockFrame, 8), + frames: make(chan *pendingFrame), + newFrameCh: make(chan *protobufs.ClockFrame), + badFrameCh: make(chan *protobufs.ClockFrame), done: make(chan bool), - syncSend: syncSend, + alwaysSend: alwaysSend, restore: restore, } } @@ -685,16 +685,15 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e d.head = frame d.headDistance = distance - if d.syncSend { + if d.alwaysSend { d.newFrameCh <- frame - } else { - go func() { - select { - case d.newFrameCh <- frame: - default: - } - }() } + go func() { + select { + case d.newFrameCh <- frame: + default: + } + }() return nil } @@ -993,16 +992,12 @@ func (d *DataTimeReel) forkChoice( d.totalDistance, ) - if d.syncSend { - d.newFrameCh <- frame - } else { - go func() { - select { - case d.newFrameCh <- frame: - default: - } - }() - } + go func() { + select { + case d.newFrameCh <- frame: + default: + } + }() } func (d *DataTimeReel) GetTotalDistance() *big.Int { diff --git a/node/consensus/time/master_time_reel.go b/node/consensus/time/master_time_reel.go index 801cc4e..aec5833 100644 --- a/node/consensus/time/master_time_reel.go +++ b/node/consensus/time/master_time_reel.go @@ -66,9 +66,9 @@ func NewMasterTimeReel( clockStore: clockStore, frameProver: frameProver, pending: make(map[uint64][]*protobufs.ClockFrame), - frames: make(chan *protobufs.ClockFrame, 8), - newFrameCh: make(chan *protobufs.ClockFrame, 8), - badFrameCh: make(chan *protobufs.ClockFrame, 8), + frames: make(chan *protobufs.ClockFrame), + newFrameCh: make(chan *protobufs.ClockFrame), + badFrameCh: make(chan *protobufs.ClockFrame), done: make(chan bool), } } diff --git a/node/execution/intrinsics/token/token_execution_engine.go b/node/execution/intrinsics/token/token_execution_engine.go index ab93c63..b8ce7ca 100644 --- a/node/execution/intrinsics/token/token_execution_engine.go +++ b/node/execution/intrinsics/token/token_execution_engine.go @@ -222,9 +222,9 @@ func NewTokenExecutionEngine( peerSeniority: NewFromMap(peerSeniority), } - syncSend := false + alwaysSend := false if bytes.Equal(config.GetGenesis().Beacon, pubSub.GetPublicKey()) { - syncSend = true + alwaysSend = true } restore := func() []*tries.RollingFrecencyCritbitTrie { @@ -297,7 +297,7 @@ func NewTokenExecutionEngine( origin, inclusionProof, proverKeys, - syncSend, + alwaysSend, restore, )