From b14a57b25a59f9273bf308c673b5a24a6a43e73e Mon Sep 17 00:00:00 2001 From: Tyler Sturos <55340199+tjsturos@users.noreply.github.com> Date: Wed, 8 Jan 2025 23:31:13 -0900 Subject: [PATCH] speed up data worker connection speeds (#414) Co-authored-by: Tyler Sturos --- .../data/data_clock_consensus_engine.go | 337 +++++++----------- node/consensus/data/main_data_loop.go | 46 +-- 2 files changed, 137 insertions(+), 246 deletions(-) diff --git a/node/consensus/data/data_clock_consensus_engine.go b/node/consensus/data/data_clock_consensus_engine.go index 14e171e..0ca6f89 100644 --- a/node/consensus/data/data_clock_consensus_engine.go +++ b/node/consensus/data/data_clock_consensus_engine.go @@ -141,6 +141,7 @@ type DataClockConsensusEngine struct { infoMessageProcessorCh chan *pb.Message report *protobufs.SelfTestReport clients []protobufs.DataIPCServiceClient + clientsMx sync.Mutex grpcRateLimiter *RateLimiter previousFrameProven *protobufs.ClockFrame previousTree *mt.MerkleTree @@ -562,19 +563,7 @@ func (e *DataClockConsensusEngine) Start() <-chan error { e.wg.Add(1) go func() { defer e.wg.Done() - if len(e.config.Engine.DataWorkerMultiaddrs) != 0 { - e.clients, err = e.createParallelDataClientsFromList() - if err != nil { - panic(err) - } - } else { - e.clients, err = e.createParallelDataClientsFromBaseMultiaddr( - e.config.Engine.DataWorkerCount, - ) - if err != nil { - panic(err) - } - } + e.createParallelDataWorkerClients() }() return errChan @@ -831,211 +820,155 @@ func (e *DataClockConsensusEngine) createCommunicationKeys() error { return nil } -func (e *DataClockConsensusEngine) createParallelDataClientsFromListAndIndex( - index uint32, +func (e *DataClockConsensusEngine) connectToClient( + index int, + useList bool, ) ( protobufs.DataIPCServiceClient, error, ) { - ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index]) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - _, addr, err := mn.DialArgs(ma) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), - ) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - client := protobufs.NewDataIPCServiceClient(conn) - - e.logger.Info( - "connected to data worker process", - zap.Uint32("client", index), - ) - return client, nil -} - -func ( - e *DataClockConsensusEngine, -) createParallelDataClientsFromBaseMultiaddrAndIndex( - index uint32, -) ( - protobufs.DataIPCServiceClient, - error, -) { - e.logger.Info( - "re-connecting to data worker process", - zap.Uint32("client", index), - ) - - ma, err := multiaddr.NewMultiaddr( - fmt.Sprintf( - e.config.Engine.DataWorkerBaseListenMultiaddr, - int(e.config.Engine.DataWorkerBaseListenPort)+int(index), - ), - ) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - _, addr, err := mn.DialArgs(ma) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), - ) - if err != nil { - return nil, errors.Wrap(err, "create parallel data client") - } - - client := protobufs.NewDataIPCServiceClient(conn) - - e.logger.Info( - "connected to data worker process", - zap.Uint32("client", index), - ) - return client, nil -} - -func (e *DataClockConsensusEngine) createParallelDataClientsFromList() ( - []protobufs.DataIPCServiceClient, - error, -) { - parallelism := len(e.config.Engine.DataWorkerMultiaddrs) - - e.logger.Info( - "connecting to data worker processes", - zap.Int("parallelism", parallelism), - ) - - clients := make([]protobufs.DataIPCServiceClient, parallelism) - - for i := 0; i < parallelism; i++ { - ma, err := multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[i]) - if err != nil { - panic(err) - } - - _, addr, err := mn.DialArgs(ma) - if err != nil { - e.logger.Error("could not get dial args", zap.Error(err)) - continue - } - - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), - ) - if err != nil { - e.logger.Error("could not dial", zap.Error(err)) - continue - } - - clients[i] = protobufs.NewDataIPCServiceClient(conn) - } - - e.logger.Info( - "connected to data worker processes", - zap.Int("parallelism", parallelism), - ) - return clients, nil -} - -func (e *DataClockConsensusEngine) createParallelDataClientsFromBaseMultiaddr( - parallelism int, -) ([]protobufs.DataIPCServiceClient, error) { - e.logger.Info( - "connecting to data worker processes", - zap.Int("parallelism", parallelism), - ) - - clients := make([]protobufs.DataIPCServiceClient, parallelism) - - for i := 0; i < parallelism; i++ { - ma, err := multiaddr.NewMultiaddr( + var ma multiaddr.Multiaddr + var err error + if useList { + ma, err = multiaddr.NewMultiaddr(e.config.Engine.DataWorkerMultiaddrs[index]) + } else { + ma, err = multiaddr.NewMultiaddr( fmt.Sprintf( e.config.Engine.DataWorkerBaseListenMultiaddr, - int(e.config.Engine.DataWorkerBaseListenPort)+i, + int(e.config.Engine.DataWorkerBaseListenPort)+int(index), ), ) - if err != nil { - panic(err) - } + } + if err != nil { + e.logger.Error("failed to create multiaddr", zap.Error(err)) + return nil, err + } - _, addr, err := mn.DialArgs(ma) - if err != nil { - e.logger.Error("could not get dial args", zap.Error(err)) - continue - } - ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) - defer cancel() - conn, err := qgrpc.DialContext( - ctx, - addr, - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - grpc.WithDefaultCallOptions( - grpc.MaxCallSendMsgSize(10*1024*1024), - grpc.MaxCallRecvMsgSize(10*1024*1024), - ), - grpc.WithBlock(), + _, addr, err := mn.DialArgs(ma) + + if err != nil { + e.logger.Error("could not get dial args", + zap.Error(err), + zap.String("multiaddr", ma.String()), + zap.Int("index", index), ) - if err != nil { - e.logger.Error("could not dial", zap.Error(err)) - continue - } + return nil, err + } - clients[i] = protobufs.NewDataIPCServiceClient(conn) + ctx, cancel := context.WithTimeout(e.ctx, 1*time.Second) + defer cancel() + conn, err := qgrpc.DialContext( + ctx, + addr, + grpc.WithTransportCredentials( + insecure.NewCredentials(), + ), + grpc.WithDefaultCallOptions( + grpc.MaxCallSendMsgSize(10*1024*1024), + grpc.MaxCallRecvMsgSize(10*1024*1024), + ), + grpc.WithBlock(), + ) + if err != nil { + e.logger.Error("could not dial", + zap.Error(err), + zap.String("multiaddr", ma.String()), + zap.Int("index", index), + ) + return nil, err } e.logger.Info( - "connected to data worker processes", + "connected to data worker process", + zap.String("multiaddr", ma.String()), + ) + + return protobufs.NewDataIPCServiceClient(conn), nil + +} + +func (e *DataClockConsensusEngine) createParallelDataWorkerClients() { + parallelism := len(e.config.Engine.DataWorkerMultiaddrs) + useList := true + if parallelism == 0 { + parallelism = e.config.Engine.DataWorkerCount + useList = false + } + + e.clientsMx.Lock() + e.clients = make([]protobufs.DataIPCServiceClient, parallelism) + e.clientsMx.Unlock() + + e.logger.Info( + "connecting to data worker processes", zap.Int("parallelism", parallelism), ) - return clients, nil + + wg := sync.WaitGroup{} + wg.Add(parallelism) + for i := 0; i < parallelism; i++ { + index := i + go func() { + defer wg.Done() + client, err := e.connectToClient(index, useList) + if err != nil { + e.clientsMx.Lock() + e.clients[index] = nil + e.clientsMx.Unlock() + e.logger.Error("failed to connect to data worker", zap.Error(err)) + return + } + e.clientsMx.Lock() + e.clients[index] = client + e.clientsMx.Unlock() + }() + } + wg.Wait() +} + +func (e *DataClockConsensusEngine) tryReconnectDataWorkerClients() { + // could reload worker list config here + parallelism := len(e.config.Engine.DataWorkerMultiaddrs) + useList := true + if parallelism == 0 { + parallelism = e.config.Engine.DataWorkerCount + useList = false + } + + wg := sync.WaitGroup{} + wg.Add(parallelism) + for i := 0; i < parallelism; i++ { + index := i + + go func() { + defer wg.Done() + if e.clients[index] != nil { + return + } + for j := 3; j >= 0; j-- { + client, err := e.connectToClient(index, useList) + if err != nil { + e.clientsMx.Lock() + e.clients[index] = nil + e.clientsMx.Unlock() + e.logger.Error("failed to connect to data worker", + zap.Error(err), + zap.Int("index", index), + ) + time.Sleep(50 * time.Millisecond) + continue + } + e.clientsMx.Lock() + e.logger.Info("reconnected to data worker", + zap.Int("index", index), + ) + e.clients[index] = client + e.clientsMx.Unlock() + break + } + }() + } + wg.Wait() } func (e *DataClockConsensusEngine) GetWorkerCount() uint32 { diff --git a/node/consensus/data/main_data_loop.go b/node/consensus/data/main_data_loop.go index 36a4722..d5832e5 100644 --- a/node/consensus/data/main_data_loop.go +++ b/node/consensus/data/main_data_loop.go @@ -2,7 +2,6 @@ package data import ( "bytes" - "sync" "time" "github.com/iden3/go-iden3-crypto/poseidon" @@ -234,6 +233,7 @@ func (e *DataClockConsensusEngine) processFrame( latestFrame *protobufs.ClockFrame, dataFrame *protobufs.ClockFrame, ) *protobufs.ClockFrame { + e.logger.Info( "current frame head", zap.Uint64("frame_number", dataFrame.FrameNumber), @@ -311,49 +311,7 @@ func (e *DataClockConsensusEngine) processFrame( e.clientReconnectTest++ if e.clientReconnectTest >= 10 { - wg := sync.WaitGroup{} - wg.Add(len(e.clients)) - for i, client := range e.clients { - i := i - client := client - go func() { - for j := 3; j >= 0; j-- { - var err error - if client == nil { - if len(e.config.Engine.DataWorkerMultiaddrs) != 0 { - e.logger.Error( - "client failed, reconnecting after 50ms", - zap.Uint32("client", uint32(i)), - ) - time.Sleep(50 * time.Millisecond) - client, err = e.createParallelDataClientsFromListAndIndex(uint32(i)) - if err != nil { - e.logger.Error("failed to reconnect", zap.Error(err)) - } - } else if len(e.config.Engine.DataWorkerMultiaddrs) == 0 { - e.logger.Error( - "client failed, reconnecting after 50ms", - zap.Uint32("client", uint32(i)), - ) - time.Sleep(50 * time.Millisecond) - client, err = - e.createParallelDataClientsFromBaseMultiaddrAndIndex(uint32(i)) - if err != nil { - e.logger.Error( - "failed to reconnect", - zap.Uint32("client", uint32(i)), - zap.Error(err), - ) - } - } - e.clients[i] = client - continue - } - } - wg.Done() - }() - } - wg.Wait() + e.tryReconnectDataWorkerClients() e.clientReconnectTest = 0 }