speed up data worker connection speeds (#414)

Co-authored-by: Tyler Sturos <tyler.john@qcommander.sh>
This commit is contained in:
Tyler Sturos 2025-01-08 23:31:13 -09:00 committed by GitHub
parent 819bb26dd6
commit b14a57b25a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 137 additions and 246 deletions

View File

@ -141,6 +141,7 @@ type DataClockConsensusEngine struct {
infoMessageProcessorCh chan *pb.Message
report *protobufs.SelfTestReport
clients []protobufs.DataIPCServiceClient
clientsMx sync.Mutex
grpcRateLimiter *RateLimiter
previousFrameProven *protobufs.ClockFrame
previousTree *mt.MerkleTree
@ -562,19 +563,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error {
e.wg.Add(1)
go func() {
defer e.wg.Done()
if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
e.clients, err = e.createParallelDataClientsFromList()
if err != nil {
panic(err)
}
} else {
e.clients, err = e.createParallelDataClientsFromBaseMultiaddr(
e.config.Engine.DataWorkerCount,
)
if err != nil {
panic(err)
}
}
e.createParallelDataWorkerClients()
}()
return errChan
@ -831,211 +820,155 @@ func (e *DataClockConsensusEngine) createCommunicationKeys() error {
return nil
}
func (e *DataClockConsensusEngine) createParallelDataClientsFromListAndIndex(
index uint32,
func (e *DataClockConsensusEngine) connectToClient(
index int,
useList bool,
) (
protobufs.DataIPCServiceClient,
error,
) {
ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index])
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
client := protobufs.NewDataIPCServiceClient(conn)
e.logger.Info(
"connected to data worker process",
zap.Uint32("client", index),
)
return client, nil
}
func (
e *DataClockConsensusEngine,
) createParallelDataClientsFromBaseMultiaddrAndIndex(
index uint32,
) (
protobufs.DataIPCServiceClient,
error,
) {
e.logger.Info(
"re-connecting to data worker process",
zap.Uint32("client", index),
)
ma, err := multiaddr.NewMultiaddr(
fmt.Sprintf(
e.config.Engine.DataWorkerBaseListenMultiaddr,
int(e.config.Engine.DataWorkerBaseListenPort)+int(index),
),
)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
return nil, errors.Wrap(err, "create parallel data client")
}
client := protobufs.NewDataIPCServiceClient(conn)
e.logger.Info(
"connected to data worker process",
zap.Uint32("client", index),
)
return client, nil
}
func (e *DataClockConsensusEngine) createParallelDataClientsFromList() (
[]protobufs.DataIPCServiceClient,
error,
) {
parallelism := len(e.config.Engine.DataWorkerMultiaddrs)
e.logger.Info(
"connecting to data worker processes",
zap.Int("parallelism", parallelism),
)
clients := make([]protobufs.DataIPCServiceClient, parallelism)
for i := 0; i < parallelism; i++ {
ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[i])
if err != nil {
panic(err)
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
e.logger.Error("could not get dial args", zap.Error(err))
continue
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
e.logger.Error("could not dial", zap.Error(err))
continue
}
clients[i] = protobufs.NewDataIPCServiceClient(conn)
}
e.logger.Info(
"connected to data worker processes",
zap.Int("parallelism", parallelism),
)
return clients, nil
}
func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr(
parallelism int,
) ([]protobufs.DataIPCServiceClient, error) {
e.logger.Info(
"connecting to data worker processes",
zap.Int("parallelism", parallelism),
)
clients := make([]protobufs.DataIPCServiceClient, parallelism)
for i := 0; i < parallelism; i++ {
ma, err := multiaddr.NewMultiaddr(
var ma multiaddr.Multiaddr
var err error
if useList {
ma, err = multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index])
} else {
ma, err = multiaddr.NewMultiaddr(
fmt.Sprintf(
e.config.Engine.DataWorkerBaseListenMultiaddr,
int(e.config.Engine.DataWorkerBaseListenPort)+i,
int(e.config.Engine.DataWorkerBaseListenPort)+int(index),
),
)
if err != nil {
panic(err)
}
}
if err != nil {
e.logger.Error("failed to create multiaddr", zap.Error(err))
return nil, err
}
_, addr, err := mn.DialArgs(ma)
if err != nil {
e.logger.Error("could not get dial args", zap.Error(err))
continue
}
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
_, addr, err := mn.DialArgs(ma)
if err != nil {
e.logger.Error("could not get dial args",
zap.Error(err),
zap.String("multiaddr", ma.String()),
zap.Int("index", index),
)
if err != nil {
e.logger.Error("could not dial", zap.Error(err))
continue
}
return nil, err
}
clients[i] = protobufs.NewDataIPCServiceClient(conn)
ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second)
defer cancel()
conn, err := qgrpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(
insecure.NewCredentials(),
),
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(10*1024*1024),
grpc.MaxCallRecvMsgSize(10*1024*1024),
),
grpc.WithBlock(),
)
if err != nil {
e.logger.Error("could not dial",
zap.Error(err),
zap.String("multiaddr", ma.String()),
zap.Int("index", index),
)
return nil, err
}
e.logger.Info(
"connected to data worker processes",
"connected to data worker process",
zap.String("multiaddr", ma.String()),
)
return protobufs.NewDataIPCServiceClient(conn), nil
}
// createParallelDataWorkerClients (re)allocates e.clients and dials every
// configured data worker concurrently, blocking until all dial attempts have
// finished.
//
// The worker set comes from Engine.DataWorkerMultiaddrs when that list is
// non-empty; otherwise Engine.DataWorkerCount workers are addressed via
// connectToClient with useList=false. A worker that cannot be reached leaves
// its slot in e.clients nil.
func (e *DataClockConsensusEngine) createParallelDataWorkerClients() {
	parallelism := len(e.config.Engine.DataWorkerMultiaddrs)
	useList := true
	if parallelism == 0 {
		parallelism = e.config.Engine.DataWorkerCount
		useList = false
	}

	e.clientsMx.Lock()
	e.clients = make([]protobufs.DataIPCServiceClient, parallelism)
	e.clientsMx.Unlock()

	e.logger.Info(
		"connecting to data worker processes",
		zap.Int("parallelism", parallelism),
	)

	wg := sync.WaitGroup{}
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		index := i
		go func() {
			defer wg.Done()

			client, err := e.connectToClient(index, useList)
			if err != nil {
				// The slot is already nil in the freshly allocated slice, so
				// there is nothing to reset; just record which worker failed.
				e.logger.Error(
					"failed to connect to data worker",
					zap.Error(err),
					zap.Int("index", index),
				)
				return
			}

			e.clientsMx.Lock()
			e.clients[index] = client
			e.clientsMx.Unlock()
		}()
	}
	wg.Wait()
}
// tryReconnectDataWorkerClients re-dials every data worker whose slot in
// e.clients is currently nil, leaving already-connected workers untouched.
//
// Each missing worker is retried in its own goroutine, up to four attempts
// with a 50ms pause between attempts; the call blocks until every goroutine
// has finished. All reads and writes of e.clients are performed while holding
// e.clientsMx.
func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() {
	// could reload worker list config here
	parallelism := len(e.config.Engine.DataWorkerMultiaddrs)
	useList := true
	if parallelism == 0 {
		parallelism = e.config.Engine.DataWorkerCount
		useList = false
	}

	const (
		maxAttempts = 4
		retryDelay  = 50 * time.Millisecond
	)

	wg := sync.WaitGroup{}
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		index := i
		go func() {
			defer wg.Done()

			// Inspect the slot under the lock: sibling goroutines write to
			// e.clients concurrently, and the slice may be shorter than the
			// configured parallelism if the initial connect has not run yet.
			e.clientsMx.Lock()
			needsReconnect := index < len(e.clients) && e.clients[index] == nil
			e.clientsMx.Unlock()
			if !needsReconnect {
				return
			}

			for attempt := 1; attempt <= maxAttempts; attempt++ {
				client, err := e.connectToClient(index, useList)
				if err != nil {
					e.logger.Error("failed to connect to data worker",
						zap.Error(err),
						zap.Int("index", index),
					)
					// No point in sleeping once the retry budget is spent.
					if attempt < maxAttempts {
						time.Sleep(retryDelay)
					}
					continue
				}

				e.logger.Info("reconnected to data worker",
					zap.Int("index", index),
				)

				e.clientsMx.Lock()
				// Re-check the bound in case e.clients was replaced while the
				// dial was in flight.
				if index < len(e.clients) {
					e.clients[index] = client
				}
				e.clientsMx.Unlock()
				return
			}
		}()
	}
	wg.Wait()
}
func (e *DataClockConsensusEngine) GetWorkerCount() uint32 {

View File

@ -2,7 +2,6 @@ package data
import (
"bytes"
"sync"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
@ -234,6 +233,7 @@ func (e *DataClockConsensusEngine) processFrame(
latestFrame *protobufs.ClockFrame,
dataFrame *protobufs.ClockFrame,
) *protobufs.ClockFrame {
e.logger.Info(
"current frame head",
zap.Uint64("frame_number", dataFrame.FrameNumber),
@ -311,49 +311,7 @@ func (e *DataClockConsensusEngine) processFrame(
e.clientReconnectTest++
if e.clientReconnectTest >= 10 {
wg := sync.WaitGroup{}
wg.Add(len(e.clients))
for i, client := range e.clients {
i := i
client := client
go func() {
for j := 3; j >= 0; j-- {
var err error
if client == nil {
if len(e.config.Engine.DataWorkerMultiaddrs) != 0 {
e.logger.Error(
"client failed, reconnecting after 50ms",
zap.Uint32("client", uint32(i)),
)
time.Sleep(50 * time.Millisecond)
client, err = e.createParallelDataClientsFromListAndIndex(uint32(i))
if err != nil {
e.logger.Error("failed to reconnect", zap.Error(err))
}
} else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 {
e.logger.Error(
"client failed, reconnecting after 50ms",
zap.Uint32("client", uint32(i)),
)
time.Sleep(50 * time.Millisecond)
client, err =
e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i))
if err != nil {
e.logger.Error(
"failed to reconnect",
zap.Uint32("client", uint32(i)),
zap.Error(err),
)
}
}
e.clients[i] = client
continue
}
}
wg.Done()
}()
}
wg.Wait()
e.tryReconnectDataWorkerClients()
e.clientReconnectTest = 0
}