Revert "Use buffered channels in the engines (#357)"

This reverts commit d32d79f58d.
This commit is contained in:
Cassandra Heart 2024-11-16 23:11:25 -06:00
parent d97a093c18
commit 6e2c7f4028
No known key found for this signature in database
GPG Key ID: 6352152859385958
6 changed files with 35 additions and 40 deletions

View File

@ -234,7 +234,7 @@ func NewDataClockConsensusEngine(
keyStore: keyStore,
keyManager: keyManager,
pubSub: pubSub,
frameChan: make(chan *protobufs.ClockFrame, 8),
frameChan: make(chan *protobufs.ClockFrame),
executionEngines: map[string]execution.ExecutionEngine{},
dependencyMap: make(map[string]*anypb.Any),
parentSelector: []byte{
@ -256,9 +256,9 @@ func NewDataClockConsensusEngine(
masterTimeReel: masterTimeReel,
dataTimeReel: dataTimeReel,
peerInfoManager: peerInfoManager,
frameMessageProcessorCh: make(chan *pb.Message, 8),
txMessageProcessorCh: make(chan *pb.Message, 8),
infoMessageProcessorCh: make(chan *pb.Message, 8),
frameMessageProcessorCh: make(chan *pb.Message),
txMessageProcessorCh: make(chan *pb.Message),
infoMessageProcessorCh: make(chan *pb.Message),
config: cfg,
preMidnightMint: map[string]struct{}{},
grpcRateLimiter: NewRateLimiter(

View File

@ -113,7 +113,7 @@ func TestHandlePreMidnightMint(t *testing.T) {
keyStore: keystore,
keyManager: keys.NewInMemoryKeyManager(),
pubSub: nil,
frameChan: make(chan *protobufs.ClockFrame, 8),
frameChan: make(chan *protobufs.ClockFrame),
executionEngines: map[string]execution.ExecutionEngine{},
dependencyMap: make(map[string]*anypb.Any),
parentSelector: []byte{
@ -135,9 +135,9 @@ func TestHandlePreMidnightMint(t *testing.T) {
masterTimeReel: nil,
dataTimeReel: &qtime.DataTimeReel{},
peerInfoManager: nil,
frameMessageProcessorCh: make(chan *pb.Message, 8),
txMessageProcessorCh: make(chan *pb.Message, 8),
infoMessageProcessorCh: make(chan *pb.Message, 8),
frameMessageProcessorCh: make(chan *pb.Message),
txMessageProcessorCh: make(chan *pb.Message),
infoMessageProcessorCh: make(chan *pb.Message),
config: nil,
preMidnightMint: map[string]struct{}{},
}

View File

@ -123,7 +123,7 @@ func NewMasterClockConsensusEngine(
keyManager: keyManager,
pubSub: pubSub,
executionEngines: map[string]execution.ExecutionEngine{},
frameChan: make(chan *protobufs.ClockFrame, 8),
frameChan: make(chan *protobufs.ClockFrame),
input: seed,
lastFrameReceivedAt: time.Time{},
syncingStatus: SyncStatusNotSyncing,
@ -133,7 +133,7 @@ func NewMasterClockConsensusEngine(
masterTimeReel: masterTimeReel,
peerInfoManager: peerInfoManager,
report: report,
frameValidationCh: make(chan *protobufs.ClockFrame, 8),
frameValidationCh: make(chan *protobufs.ClockFrame),
collectedProverSlots: []*protobufs.InclusionAggregateProof{},
engineConfig: engineConfig,
}

View File

@ -62,7 +62,7 @@ type DataTimeReel struct {
newFrameCh chan *protobufs.ClockFrame
badFrameCh chan *protobufs.ClockFrame
done chan bool
syncSend bool
alwaysSend bool
restore func() []*tries.RollingFrecencyCritbitTrie
}
@ -83,7 +83,7 @@ func NewDataTimeReel(
origin []byte,
initialInclusionProof *crypto.InclusionAggregateProof,
initialProverKeys [][]byte,
syncSend bool,
alwaysSend bool,
restore func() []*tries.RollingFrecencyCritbitTrie,
) *DataTimeReel {
if filter == nil {
@ -129,11 +129,11 @@ func NewDataTimeReel(
lruFrames: cache,
// pending: make(map[uint64][]*pendingFrame),
incompleteForks: make(map[uint64][]*pendingFrame),
frames: make(chan *pendingFrame, 8),
newFrameCh: make(chan *protobufs.ClockFrame, 8),
badFrameCh: make(chan *protobufs.ClockFrame, 8),
frames: make(chan *pendingFrame),
newFrameCh: make(chan *protobufs.ClockFrame),
badFrameCh: make(chan *protobufs.ClockFrame),
done: make(chan bool),
syncSend: syncSend,
alwaysSend: alwaysSend,
restore: restore,
}
}
@ -685,16 +685,15 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e
d.head = frame
d.headDistance = distance
if d.syncSend {
if d.alwaysSend {
d.newFrameCh <- frame
} else {
go func() {
select {
case d.newFrameCh <- frame:
default:
}
}()
}
go func() {
select {
case d.newFrameCh <- frame:
default:
}
}()
return nil
}
@ -993,16 +992,12 @@ func (d *DataTimeReel) forkChoice(
d.totalDistance,
)
if d.syncSend {
d.newFrameCh <- frame
} else {
go func() {
select {
case d.newFrameCh <- frame:
default:
}
}()
}
go func() {
select {
case d.newFrameCh <- frame:
default:
}
}()
}
func (d *DataTimeReel) GetTotalDistance() *big.Int {

View File

@ -66,9 +66,9 @@ func NewMasterTimeReel(
clockStore: clockStore,
frameProver: frameProver,
pending: make(map[uint64][]*protobufs.ClockFrame),
frames: make(chan *protobufs.ClockFrame, 8),
newFrameCh: make(chan *protobufs.ClockFrame, 8),
badFrameCh: make(chan *protobufs.ClockFrame, 8),
frames: make(chan *protobufs.ClockFrame),
newFrameCh: make(chan *protobufs.ClockFrame),
badFrameCh: make(chan *protobufs.ClockFrame),
done: make(chan bool),
}
}

View File

@ -222,9 +222,9 @@ func NewTokenExecutionEngine(
peerSeniority: NewFromMap(peerSeniority),
}
syncSend := false
alwaysSend := false
if bytes.Equal(config.GetGenesis().Beacon, pubSub.GetPublicKey()) {
syncSend = true
alwaysSend = true
}
restore := func() []*tries.RollingFrecencyCritbitTrie {
@ -297,7 +297,7 @@ func NewTokenExecutionEngine(
origin,
inclusionProof,
proverKeys,
syncSend,
alwaysSend,
restore,
)