Use buffered channels when applicable (#373)

* Use buffered channels when applicable

* Do not start additional goroutines for processing

* Use context to stop ongoing loops
petricadaipegsp 2024-11-22 02:32:04 +01:00 committed by GitHub
parent af0eded231
commit 44ccd14871
13 changed files with 183 additions and 121 deletions
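Read together, the three bullet points describe a single pattern: handlers enqueue onto a bounded channel with a non-blocking, context-aware send, and a long-lived worker drains that channel until shutdown. A minimal sketch of the pattern follows; all names here are hypothetical, not the engine's actual API.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    type engine struct {
        ctx context.Context
        ch  chan string // buffered: producers never block on a slow consumer
    }

    // enqueue sends without spawning a goroutine: it returns immediately,
    // either queueing the message, observing shutdown, or dropping on overflow.
    func (e *engine) enqueue(msg string) error {
        select {
        case <-e.ctx.Done():
            return e.ctx.Err()
        case e.ch <- msg:
            return nil
        default:
            fmt.Println("dropping message") // queue full: shed load instead of blocking
            return nil
        }
    }

    // run drains the queue until the context is cancelled.
    func (e *engine) run() {
        for {
            select {
            case <-e.ctx.Done():
                return
            case msg := <-e.ch:
                fmt.Println("handling", msg)
            }
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        e := &engine{ctx: ctx, ch: make(chan string, 1024)}
        go e.run()
        e.enqueue("hello")
        time.Sleep(10 * time.Millisecond)
        cancel() // stops run() without a separate done channel
    }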

View File

@@ -17,30 +17,39 @@ import (
 func (e *DataClockConsensusEngine) handleFrameMessage(
     message *pb.Message,
 ) error {
-    go func() {
-        e.frameMessageProcessorCh <- message
-    }()
+    select {
+    case <-e.ctx.Done():
+        return e.ctx.Err()
+    case e.frameMessageProcessorCh <- message:
+    default:
+        e.logger.Warn("dropping frame message")
+    }
     return nil
 }

 func (e *DataClockConsensusEngine) handleTxMessage(
     message *pb.Message,
 ) error {
-    go func() {
-        e.txMessageProcessorCh <- message
-    }()
+    select {
+    case <-e.ctx.Done():
+        return e.ctx.Err()
+    case e.txMessageProcessorCh <- message:
+    default:
+        e.logger.Warn("dropping tx message")
+    }
     return nil
 }

 func (e *DataClockConsensusEngine) handleInfoMessage(
     message *pb.Message,
 ) error {
-    go func() {
-        e.infoMessageProcessorCh <- message
-    }()
+    select {
+    case <-e.ctx.Done():
+        return e.ctx.Err()
+    case e.infoMessageProcessorCh <- message:
+    default:
+        e.logger.Warn("dropping info message")
+    }
     return nil
 }
@@ -130,9 +139,13 @@ func (e *DataClockConsensusEngine) insertTxMessage(
         Seqno: nil,
     }

-    go func() {
-        e.txMessageProcessorCh <- m
-    }()
+    select {
+    case <-e.ctx.Done():
+        return e.ctx.Err()
+    case e.txMessageProcessorCh <- m:
+    default:
+        e.logger.Warn("dropping tx message")
+    }

     return nil
 }

View File

@@ -7,7 +7,6 @@ import (
     "golang.org/x/crypto/sha3"

     "source.quilibrium.com/quilibrium/monorepo/node/config"
-    "source.quilibrium.com/quilibrium/monorepo/node/consensus"
     "source.quilibrium.com/quilibrium/monorepo/node/consensus/data/internal"
     "source.quilibrium.com/quilibrium/monorepo/node/internal/frametime"
@@ -318,7 +317,7 @@ func (e *DataClockConsensusEngine) sync(
         syncTimeout = defaultSyncTimeout
     }

-    for e.GetState() < consensus.EngineStateStopping {
+    for {
         ctx, cancel := context.WithTimeout(e.ctx, syncTimeout)
         response, err := client.GetDataFrame(
             ctx,
@@ -364,11 +363,10 @@ func (e *DataClockConsensusEngine) sync(
         ); err != nil {
             return nil, errors.Wrap(err, "sync")
         }

-        e.dataTimeReel.Insert(response.ClockFrame, true)
+        e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true)
         latest = response.ClockFrame
         if latest.FrameNumber >= maxFrame {
             return latest, nil
         }
     }
-
-    return latest, nil
 }

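The sync loop above derives each attempt's timeout from the engine's own context (context.WithTimeout(e.ctx, syncTimeout)), so cancelling the engine also aborts an in-flight GetDataFrame call. A sketch of the per-attempt timeout idiom, with a hypothetical fetch callback:

    package sketch

    import (
        "context"
        "time"
    )

    // fetchLoop sketches the per-attempt timeout idiom: each iteration derives
    // a fresh deadline from the parent context, so cancelling the parent both
    // ends the loop and aborts whatever call is currently blocked in fetch.
    func fetchLoop(parent context.Context, fetch func(context.Context) error) error {
        for {
            ctx, cancel := context.WithTimeout(parent, 5*time.Second)
            err := fetch(ctx)
            cancel() // release the timer promptly; don't defer inside a loop
            if parent.Err() != nil {
                return parent.Err() // parent cancelled: stop retrying
            }
            if err == nil {
                return nil
            }
            // transient failure or per-attempt timeout: retry
        }
    }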
View File

@@ -257,9 +257,9 @@ func NewDataClockConsensusEngine(
     masterTimeReel:  masterTimeReel,
     dataTimeReel:    dataTimeReel,
     peerInfoManager: peerInfoManager,
-    frameMessageProcessorCh: make(chan *pb.Message),
-    txMessageProcessorCh:    make(chan *pb.Message),
-    infoMessageProcessorCh:  make(chan *pb.Message),
+    frameMessageProcessorCh: make(chan *pb.Message, 65536),
+    txMessageProcessorCh:    make(chan *pb.Message, 65536),
+    infoMessageProcessorCh:  make(chan *pb.Message, 65536),
     config:          cfg,
     preMidnightMint: map[string]struct{}{},
     grpcRateLimiter: NewRateLimiter(
@@ -368,16 +368,19 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
         panic(err)
     }

     source := rand.New(rand.NewSource(rand.Int63()))
-    for e.GetState() < consensus.EngineStateStopping {
+    for {
         // Use exponential backoff with jitter in order to avoid hammering the bootstrappers.
-        time.Sleep(
-            backoff.FullJitter(
-                baseDuration<<currentBackoff,
-                baseDuration,
-                baseDuration<<maxBackoff,
-                source,
-            ),
-        )
+        duration := backoff.FullJitter(
+            baseDuration<<currentBackoff,
+            baseDuration,
+            baseDuration<<maxBackoff,
+            source,
+        )
+        select {
+        case <-e.ctx.Done():
+            return
+        case <-time.After(duration):
+        }
         currentHead, err := e.dataTimeReel.Head()
         if err != nil {
             panic(err)
@@ -406,7 +409,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
         if frame.FrameNumber-100 >= nextFrame.FrameNumber ||
             nextFrame.FrameNumber == 0 {
-            time.Sleep(120 * time.Second)
+            select {
+            case <-e.ctx.Done():
+                return
+            case <-time.After(2 * time.Minute):
+            }
             continue
         }
@@ -485,7 +492,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
             thresholdBeforeConfirming--
         }

-        time.Sleep(120 * time.Second)
+        select {
+        case <-e.ctx.Done():
+            return
+        case <-time.After(2 * time.Minute):
+        }
     }
 }()
@@ -494,7 +505,11 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
 go e.runFramePruning()
 go func() {
-    time.Sleep(30 * time.Second)
+    select {
+    case <-e.ctx.Done():
+        return
+    case <-time.After(30 * time.Second):
+    }

     e.logger.Info("checking for snapshots to play forward")
     if err := e.downloadSnapshot(e.config.DB.Path, e.config.P2P.Network); err != nil {
         e.logger.Debug("error downloading snapshot", zap.Error(err))

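Each bare time.Sleep in Start() becomes a select over e.ctx.Done() and time.After, so shutdown no longer has to wait out a two-minute nap. The idiom can be captured in a helper like the hypothetical one below; time.NewTimer is used instead of time.After so the timer is reclaimed when cancellation wins the race.

    package sketch

    import (
        "context"
        "time"
    )

    // sleepCtx waits for d or until ctx is cancelled, whichever comes first.
    // It reports whether the full duration elapsed.
    func sleepCtx(ctx context.Context, d time.Duration) bool {
        timer := time.NewTimer(d)
        defer timer.Stop() // unlike time.After, the timer is released on early exit
        select {
        case <-ctx.Done():
            return false
        case <-timer.C:
            return true
        }
    }

Callers would then write, for example, if !sleepCtx(e.ctx, 2*time.Minute) { return } in place of the inline select.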
View File

@@ -113,14 +113,18 @@ func (e *DataClockConsensusEngine) runSync() {
 func (e *DataClockConsensusEngine) runLoop() {
     dataFrameCh := e.dataTimeReel.NewFrameCh()
     runOnce := true

-    for e.GetState() < consensus.EngineStateStopping {
+    for {
         peerCount := e.pubSub.GetNetworkPeersCount()
         if peerCount < e.minimumPeersRequired {
             e.logger.Info(
                 "waiting for minimum peers",
                 zap.Int("peer_count", peerCount),
             )
-            time.Sleep(1 * time.Second)
+            select {
+            case <-e.ctx.Done():
+                return
+            case <-time.After(1 * time.Second):
+            }
         } else {
             latestFrame, err := e.dataTimeReel.Head()
             if err != nil {
@@ -205,7 +209,7 @@ func (e *DataClockConsensusEngine) processFrame(
             return dataFrame
         }

-        e.dataTimeReel.Insert(nextFrame, true)
+        e.dataTimeReel.Insert(e.ctx, nextFrame, true)

         return nextFrame
     } else {

View File

@@ -3,6 +3,7 @@ package data
 import (
     "bytes"
     "fmt"
+    "sync"
     "time"

     "github.com/iden3/go-iden3-crypto/poseidon"
@@ -13,14 +14,15 @@ import (
     "google.golang.org/protobuf/proto"
     "google.golang.org/protobuf/types/known/anypb"
     "source.quilibrium.com/quilibrium/monorepo/node/config"
-    "source.quilibrium.com/quilibrium/monorepo/node/consensus"
     "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token/application"
     "source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )

 func (e *DataClockConsensusEngine) runFrameMessageHandler() {
-    for e.GetState() < consensus.EngineStateStopping {
+    for {
         select {
+        case <-e.ctx.Done():
+            return
         case message := <-e.frameMessageProcessorCh:
             e.logger.Debug("handling frame message")
             msg := &protobufs.Message{}
@@ -49,26 +51,26 @@ func (e *DataClockConsensusEngine) runFrameMessageHandler() {
                 continue
             }

-            go func() {
-                switch any.TypeUrl {
-                case protobufs.ClockFrameType:
-                    if err := e.handleClockFrameData(
-                        message.From,
-                        msg.Address,
-                        any,
-                        false,
-                    ); err != nil {
-                        return
-                    }
-                }
-            }()
+            switch any.TypeUrl {
+            case protobufs.ClockFrameType:
+                if err := e.handleClockFrameData(
+                    message.From,
+                    msg.Address,
+                    any,
+                    false,
+                ); err != nil {
+                    e.logger.Debug("could not handle clock frame data", zap.Error(err))
+                }
+            }
         }
     }
 }

 func (e *DataClockConsensusEngine) runTxMessageHandler() {
-    for e.GetState() < consensus.EngineStateStopping {
+    for {
         select {
+        case <-e.ctx.Done():
+            return
         case message := <-e.txMessageProcessorCh:
             e.logger.Debug("handling tx message")
             msg := &protobufs.Message{}
@@ -97,9 +99,12 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
             }

             if e.frameProverTries[0].Contains(e.provingKeyAddress) {
+                wg := &sync.WaitGroup{}
                 for name := range e.executionEngines {
                     name := name
+                    wg.Add(1)
                     go func() error {
+                        defer wg.Done()
                         messages, err := e.executionEngines[name].ProcessMessage(
                             application.TOKEN_ADDRESS,
                             msg,
@@ -125,18 +130,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
                             continue
                         }

                         e.logger.Debug(appMsg.TypeUrl)
                         switch appMsg.TypeUrl {
                         case protobufs.TokenRequestType:
                             t := &protobufs.TokenRequest{}
                             err := proto.Unmarshal(appMsg.Value, t)
                             if err != nil {
+                                e.logger.Debug("could not unmarshal token request", zap.Error(err))
                                 continue
                             }

                             if err := e.handleTokenRequest(t); err != nil {
-                                continue
+                                e.logger.Debug("could not handle token request", zap.Error(err))
                             }
                         }
@@ -144,14 +148,17 @@ func (e *DataClockConsensusEngine) runTxMessageHandler() {
                         return nil
                     }()
                 }
+                wg.Wait()
             }
         }
     }
 }

 func (e *DataClockConsensusEngine) runInfoMessageHandler() {
-    for e.GetState() < consensus.EngineStateStopping {
+    for {
         select {
+        case <-e.ctx.Done():
+            return
         case message := <-e.infoMessageProcessorCh:
             e.logger.Debug("handling info message")
             msg := &protobufs.Message{}
@@ -180,18 +187,16 @@ func (e *DataClockConsensusEngine) runInfoMessageHandler() {
                 continue
             }

-            go func() {
-                switch any.TypeUrl {
-                case protobufs.DataPeerListAnnounceType:
-                    if err := e.handleDataPeerListAnnounce(
-                        message.From,
-                        msg.Address,
-                        any,
-                    ); err != nil {
-                        return
-                    }
-                }
-            }()
+            switch any.TypeUrl {
+            case protobufs.DataPeerListAnnounceType:
+                if err := e.handleDataPeerListAnnounce(
+                    message.From,
+                    msg.Address,
+                    any,
+                ); err != nil {
+                    e.logger.Debug("could not handle data peer list announce", zap.Error(err))
+                }
+            }
         }
     }
 }
@@ -249,7 +254,7 @@ func (e *DataClockConsensusEngine) handleClockFrame(
     }

     if frame.FrameNumber > head.FrameNumber {
-        e.dataTimeReel.Insert(frame, false)
+        e.dataTimeReel.Insert(e.ctx, frame, false)
     }

     return nil

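The tx handler now fans work out to the execution engines and blocks on a sync.WaitGroup, so the goroutines no longer outlive the message that spawned them. A minimal sketch of that fan-out; note the per-iteration copy of the loop variable, which Go versions before 1.22 require:

    package sketch

    import (
        "fmt"
        "sync"
    )

    func processAll(engines map[string]func(string) error, msg string) {
        wg := &sync.WaitGroup{}
        for name := range engines {
            name := name // capture per iteration (needed before Go 1.22)
            wg.Add(1)
            go func() {
                defer wg.Done() // runs even if the engine returns an error
                if err := engines[name](msg); err != nil {
                    fmt.Println("engine failed:", name, err)
                }
            }()
        }
        wg.Wait() // the message is fully processed before the loop moves on
    }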
View File

@@ -2,6 +2,7 @@ package master
 import (
     "bytes"
+    "context"
     "encoding/binary"
     "strings"
     "time"
@@ -154,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof(
         zap.Uint64("frame_number", frame.FrameNumber),
     )

-    e.masterTimeReel.Insert(frame, false)
+    e.masterTimeReel.Insert(context.TODO(), frame, false)
 }

 e.state = consensus.EngineStateCollecting

View File

@@ -2,6 +2,7 @@ package master
 import (
     "bytes"
+    "context"
     gcrypto "crypto"
     "encoding/hex"
     "math/big"
@@ -207,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
             continue
         }

-        e.masterTimeReel.Insert(newFrame, false)
+        e.masterTimeReel.Insert(context.TODO(), newFrame, false)
     }
 }
 }()

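Both master-engine call sites pass context.TODO() rather than a wired-up context. That is the conventional placeholder when a function has grown a ctx parameter before its callers have one to hand down, as in this hypothetical sketch:

    package sketch

    import "context"

    // insert stands in for the widened TimeReel.Insert signature.
    func insert(ctx context.Context, frame int) error { return ctx.Err() }

    func publish() {
        // context.TODO() documents that a real context should eventually flow
        // here; it behaves like context.Background() but flags unfinished plumbing.
        _ = insert(context.TODO(), 42)
    }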
View File

@@ -2,6 +2,7 @@ package time
 import (
     "bytes"
+    "context"
     "encoding/hex"
     "math/big"
     "os"
@@ -32,7 +33,9 @@ type pendingFrame struct {
 type DataTimeReel struct {
     rwMutex sync.RWMutex
     running bool

+    ctx    context.Context
+    cancel context.CancelFunc
     filter       []byte
     engineConfig *config.EngineConfig
@@ -61,7 +64,6 @@ type DataTimeReel struct {
     frames     chan *pendingFrame
     newFrameCh chan *protobufs.ClockFrame
     badFrameCh chan *protobufs.ClockFrame
-    done       chan bool

     alwaysSend bool
     restore    func() []*tries.RollingFrecencyCritbitTrie
 }
@@ -115,8 +117,10 @@ func NewDataTimeReel(
         panic(err)
     }

+    ctx, cancel := context.WithCancel(context.Background())
     return &DataTimeReel{
         running: false,
+        ctx:     ctx,
+        cancel:  cancel,
         logger:       logger,
         filter:       filter,
         engineConfig: engineConfig,
@@ -129,10 +133,9 @@ func NewDataTimeReel(
         lruFrames: cache,
         // pending: make(map[uint64][]*pendingFrame),
         incompleteForks: make(map[uint64][]*pendingFrame),
-        frames:          make(chan *pendingFrame),
+        frames:          make(chan *pendingFrame, 65536),
         newFrameCh:      make(chan *protobufs.ClockFrame),
         badFrameCh:      make(chan *protobufs.ClockFrame),
-        done:            make(chan bool),
         alwaysSend:      alwaysSend,
         restore:         restore,
     }
@@ -172,17 +175,12 @@ func (d *DataTimeReel) Start() error {
         d.headDistance, err = d.GetDistance(frame)
     }

     d.running = true
     go d.runLoop()

     return nil
 }

 func (d *DataTimeReel) SetHead(frame *protobufs.ClockFrame) {
-    if d.running == true {
-        panic("internal test function should never be called outside of tests")
-    }
     d.head = frame
 }
@@ -193,9 +191,9 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
 // Insert enqueues a structurally valid frame into the time reel. If the frame
 // is the next one in sequence, it advances the reel head forward and emits a
 // new frame on the new frame channel.
-func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error {
-    if !d.running {
-        return nil
+func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error {
+    if err := d.ctx.Err(); err != nil {
+        return err
     }

     d.logger.Debug(
@@ -222,13 +220,17 @@ func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame, isSync bool) error {
     d.storePending(selector, parent, distance, frame)

     if d.head.FrameNumber+1 == frame.FrameNumber {
-        go func() {
-            d.frames <- &pendingFrame{
-                selector:       selector,
-                parentSelector: parent,
-                frameNumber:    frame.FrameNumber,
-            }
-        }()
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case <-d.ctx.Done():
+            return d.ctx.Err()
+        case d.frames <- &pendingFrame{
+            selector:       selector,
+            parentSelector: parent,
+            frameNumber:    frame.FrameNumber,
+        }:
+        }
     }
 }
@@ -250,7 +252,7 @@ func (d *DataTimeReel) BadFrameCh() <-chan *protobufs.ClockFrame {
 }

 func (d *DataTimeReel) Stop() {
-    d.done <- true
+    d.cancel()
 }

 func (d *DataTimeReel) createGenesisFrame() (
@@ -336,8 +338,10 @@
 // Main data consensus loop
 func (d *DataTimeReel) runLoop() {
-    for d.running {
+    for {
         select {
+        case <-d.ctx.Done():
+            return
         case frame := <-d.frames:
             rawFrame, err := d.clockStore.GetStagedDataClockFrame(
                 d.filter,
@@ -459,9 +463,6 @@ func (d *DataTimeReel) runLoop() {
             // 	}
             // }
             }
-        case <-d.done:
-            d.running = false
-            return
         }
     }
 }
@@ -563,8 +564,7 @@ func (d *DataTimeReel) processPending(
     for {
         select {
-        case <-d.done:
-            d.running = false
+        case <-d.ctx.Done():
             return
         default:
         }
@@ -686,14 +686,19 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) e
     d.headDistance = distance

     if d.alwaysSend {
-        d.newFrameCh <- frame
-    }
-    go func() {
+        select {
+        case <-d.ctx.Done():
+            return d.ctx.Err()
+        case d.newFrameCh <- frame:
+        }
+    } else {
+        select {
+        case <-d.ctx.Done():
+            return d.ctx.Err()
+        case d.newFrameCh <- frame:
+        default:
+        }
-    }()
+    }

     return nil
 }
@@ -992,12 +997,11 @@ func (d *DataTimeReel) forkChoice(
         d.totalDistance,
     )

-    go func() {
-        select {
-        case d.newFrameCh <- frame:
-        default:
-        }
-    }()
+    select {
+    case <-d.ctx.Done():
+    case d.newFrameCh <- frame:
+    default:
+    }
 }

 func (d *DataTimeReel) GetTotalDistance() *big.Int {

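The DataTimeReel changes replace the done chan bool plus running flag with a context the reel itself owns: Stop() is just cancel(), every blocking send selects on d.ctx.Done(), and Insert rejects new work once the reel is stopped while also honoring the caller's context. A compressed sketch of that lifecycle, using a hypothetical type rather than the reel's full API:

    package sketch

    import "context"

    type reel struct {
        ctx    context.Context
        cancel context.CancelFunc
        frames chan int
    }

    func newReel() *reel {
        ctx, cancel := context.WithCancel(context.Background())
        return &reel{ctx: ctx, cancel: cancel, frames: make(chan int, 1024)}
    }

    // Insert honours both the caller's context and the reel's own lifetime,
    // mirroring the two Done cases in DataTimeReel.Insert.
    func (r *reel) Insert(ctx context.Context, frame int) error {
        if err := r.ctx.Err(); err != nil {
            return err // reel already stopped
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-r.ctx.Done():
            return r.ctx.Err()
        case r.frames <- frame:
            return nil
        }
    }

    // Stop is safe to call more than once; cancel is idempotent.
    func (r *reel) Stop() { r.cancel() }

Unlike a send on a done channel, cancel() never blocks, wakes every waiter at once, and is harmless to call twice.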
View File

@@ -2,6 +2,7 @@ package time_test
 import (
     "bytes"
+    "context"
     "fmt"
     "math/rand"
     "strings"
@@ -108,6 +109,7 @@ func generateTestProvers() (
 }

 func TestDataTimeReel(t *testing.T) {
+    ctx := context.Background()
     logger, _ := zap.NewDevelopment()
     db := store.NewInMemKVDB()
     clockStore := store.NewPebbleClockStore(db, logger)
@@ -231,7 +233,7 @@ func TestDataTimeReel(t *testing.T) {
             i+1,
             10,
         )
-        d.Insert(frame, false)
+        d.Insert(ctx, frame, false)
         prevBI, _ := frame.GetSelector()
         prev = prevBI.FillBytes(make([]byte, 32))
     }
@@ -262,7 +264,7 @@ func TestDataTimeReel(t *testing.T) {
     }

     for i := 99; i >= 0; i-- {
-        err := d.Insert(insertFrames[i], false)
+        err := d.Insert(ctx, insertFrames[i], false)
         assert.NoError(t, err)
     }
@@ -284,7 +286,7 @@ func TestDataTimeReel(t *testing.T) {
             i+1,
             10,
         )
-        d.Insert(frame, false)
+        d.Insert(ctx, frame, false)
         prevBI, _ := frame.GetSelector()
         prev = prevBI.FillBytes(make([]byte, 32))
@@ -332,7 +334,7 @@ func TestDataTimeReel(t *testing.T) {
     }

     for i := 99; i >= 0; i-- {
-        err := d.Insert(insertFrames[i], false)
+        err := d.Insert(ctx, insertFrames[i], false)
         assert.NoError(t, err)
     }
@@ -395,7 +397,7 @@ func TestDataTimeReel(t *testing.T) {
     // Someone is honest, but running backwards:
     for i := 99; i >= 0; i-- {
-        err := d.Insert(insertFrames[i], false)
+        err := d.Insert(ctx, insertFrames[i], false)
         gotime.Sleep(1 * gotime.Second)
         assert.NoError(t, err)
     }

View File

@@ -1,6 +1,7 @@
 package time

 import (
+    "context"
     "encoding/hex"
     "errors"
     "math/big"
@@ -120,6 +121,7 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) {
 // is the next one in sequence, it advances the reel head forward and emits a
 // new frame on the new frame channel.
 func (m *MasterTimeReel) Insert(
+    ctx context.Context,
     frame *protobufs.ClockFrame,
     isSync bool,
 ) error {

View File

@@ -1,6 +1,7 @@
 package time_test

 import (
+    "context"
     "strings"
     "sync"
     "testing"
@@ -15,6 +16,7 @@ import (
 )

 func TestMasterTimeReel(t *testing.T) {
+    ctx := context.Background()
     logger, _ := zap.NewProduction()
     db := store.NewInMemKVDB()
     clockStore := store.NewPebbleClockStore(db, logger)
@@ -59,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) {
         )
         assert.NoError(t, err)

-        err := m.Insert(frame, false)
+        err := m.Insert(ctx, frame, false)
         assert.NoError(t, err)
     }
@@ -79,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) {
     }

     for i := 99; i >= 0; i-- {
-        err := m.Insert(insertFrames[i], false)
+        err := m.Insert(ctx, insertFrames[i], false)
         assert.NoError(t, err)
     }

View File

@@ -1,13 +1,15 @@
 package time

 import (
+    "context"
+
     "source.quilibrium.com/quilibrium/monorepo/node/protobufs"
 )

 type TimeReel interface {
     Start() error
     Stop()
-    Insert(frame *protobufs.ClockFrame, isSync bool) error
+    Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error
     Head() (*protobufs.ClockFrame, error)
     NewFrameCh() <-chan *protobufs.ClockFrame
     BadFrameCh() <-chan *protobufs.ClockFrame

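Widening Insert with a context.Context parameter on the TimeReel interface forces every implementation and call site to thread a context through. A compile-time assertion is a cheap way to catch implementations that drift from the new signature; a sketch with simplified stand-in types:

    package sketch

    import "context"

    type Frame struct{ Number uint64 }

    type TimeReel interface {
        Insert(ctx context.Context, frame *Frame, isSync bool) error
        Stop()
    }

    type memReel struct{ frames []*Frame }

    func (m *memReel) Insert(ctx context.Context, frame *Frame, isSync bool) error {
        if err := ctx.Err(); err != nil {
            return err
        }
        m.frames = append(m.frames, frame)
        return nil
    }

    func (m *memReel) Stop() {}

    // Fails to compile if memReel drifts from the interface, e.g. after a
    // signature change like the one in this commit.
    var _ TimeReel = (*memReel)(nil)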
View File

@@ -2,6 +2,7 @@ package token
 import (
     "bytes"
+    "context"
     "crypto"
     "encoding/binary"
     "encoding/hex"
@@ -80,6 +81,8 @@ func (p PeerSeniorityItem) Priority() uint64 {
 }

 type TokenExecutionEngine struct {
+    ctx        context.Context
+    cancel     context.CancelFunc
     logger     *zap.Logger
     clock      *data.DataClockConsensusEngine
     clockStore store.ClockStore
@@ -205,7 +208,10 @@ func NewTokenExecutionEngine(
         LoadAggregatedSeniorityMap(uint(cfg.P2P.Network))
     }

+    ctx, cancel := context.WithCancel(context.Background())
     e := &TokenExecutionEngine{
+        ctx:          ctx,
+        cancel:       cancel,
         logger:       logger,
         engineConfig: cfg.Engine,
         keyManager:   keyManager,
@@ -364,14 +370,19 @@ func NewTokenExecutionEngine(
     }

     // need to wait for peering
+waitPeers:
     for {
-        gotime.Sleep(30 * gotime.Second)
-        peerMap := e.pubSub.GetBitmaskPeers()
-        if peers, ok := peerMap[string(
-            append([]byte{0x00}, e.intrinsicFilter...),
-        )]; ok {
-            if len(peers) >= 3 {
-                break
+        select {
+        case <-e.ctx.Done():
+            return
+        case <-gotime.After(30 * gotime.Second):
+            peerMap := e.pubSub.GetBitmaskPeers()
+            if peers, ok := peerMap[string(
+                append([]byte{0x00}, e.intrinsicFilter...),
+            )]; ok {
+                if len(peers) >= 3 {
+                    break waitPeers
+                }
+            }
         }
     }
@@ -441,6 +452,8 @@ func (e *TokenExecutionEngine) Start() <-chan error {
 // Stop implements ExecutionEngine
 func (e *TokenExecutionEngine) Stop(force bool) <-chan error {
+    e.cancel()
+
     errChan := make(chan error)

     go func() {