diff --git a/node/app/data_worker_node.go b/node/app/data_worker_node.go
index 1bf89e4..609ae43 100644
--- a/node/app/data_worker_node.go
+++ b/node/app/data_worker_node.go
@@ -81,10 +81,6 @@ func (n *DataWorkerNode) Start(
 	)
 
 	go func() {
-		n.logger.Info(
-			"starting IPC server",
-			zap.Uint("core_id", n.coreId),
-		)
 		err := n.ipcServer.Start()
 		if err != nil {
 			n.logger.Error(
@@ -308,6 +304,7 @@ func WaitForWorkerPortsAvailable(
 // monitorPortHealth checks if both the stream port and P2P listen port are listening after startup
 // The stream port is calculated as: base_stream_port + core_index - 1
 // The P2P listen port is calculated as: base_p2p_port + core_index - 1
+// The stream port check waits for the IPC server to signal readiness before probing
 func (n *DataWorkerNode) monitorPortHealth() {
 	n.logger.Info(
 		"checking port health",
@@ -330,12 +327,19 @@ func (n *DataWorkerNode) monitorPortHealth() {
 		)
 		streamResult <- false // Mark as failed since we couldn't check
 	} else {
-		// Check stream port in parallel
+		// Wait for IPC server to be ready before checking stream port
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
+			// Wait for IPC server to start listening
 			n.logger.Debug(
-				"attempting to bind to the stream port to check if it's listening",
+				"waiting for IPC server to be ready before checking stream port",
+				zap.String("port", streamPort),
+				zap.Uint("core_id", n.coreId),
+			)
+			<-n.ipcServer.Ready()
+			n.logger.Debug(
+				"IPC server is ready, checking stream port",
 				zap.String("port", streamPort),
 				zap.Uint("core_id", n.coreId),
 			)
@@ -348,8 +352,8 @@ func (n *DataWorkerNode) monitorPortHealth() {
 			)
 
 			if !isStreamListening {
-				n.logger.Error(
-					"stream port is not listening",
+				n.logger.Warn(
+					"stream port is not listening yet, server may still be starting",
 					zap.String("port", streamPort),
 					zap.Uint("core_id", n.coreId),
 				)
@@ -395,7 +399,7 @@ func (n *DataWorkerNode) monitorPortHealth() {
 
 		if !isP2PListening {
 			n.logger.Error(
-				"P2P listen port is not listening",
+				"P2P listen port is not listening yet, server may still be starting",
 				zap.String("port", p2pPortStr),
 				zap.Uint("core_id", n.coreId),
 			)
@@ -421,12 +425,12 @@ func (n *DataWorkerNode) monitorPortHealth() {
 	// Ports are listening successfully, reset attempt counter
 	if streamOk && p2pOk {
 		n.logger.Info(
-			"all ports are listening successfully, resetting restart attempts",
+			"all ports are listening successfully",
 			zap.Uint("core_id", n.coreId),
 		)
 	}
 	n.logger.Info(
-		"port health check completed successfully",
+		"port health check completed",
 		zap.Uint("core_id", n.coreId),
 	)
 }
diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go
index edf365d..bc0c020 100644
--- a/node/datarpc/data_worker_ipc_server.go
+++ b/node/datarpc/data_worker_ipc_server.go
@@ -3,6 +3,7 @@ package datarpc
 import (
 	"context"
 	"encoding/hex"
+	"sync"
 	"time"
 
 	pcrypto "github.com/libp2p/go-libp2p/core/crypto"
@@ -48,6 +49,8 @@ type DataWorkerIPCServer struct {
 	quit            chan struct{}
 	peerInfoCtx     lifecycle.SignalerContext
 	peerInfoCancel  context.CancelFunc
+	ready           chan struct{}
+	readyOnce       sync.Once
 }
 
 func NewDataWorkerIPCServer(
@@ -102,10 +105,12 @@ func NewDataWorkerIPCServer(
 		frameProver:     frameProver,
 		peerInfoManager: peerInfoManager,
 		quit:            make(chan struct{}),
+		ready:           make(chan struct{}),
 	}, nil
 }
 
 func (r *DataWorkerIPCServer) Start() error {
+	r.logger.Info("starting DataWorkerIPCServer", zap.Uint32("core_id", r.coreId))
 	peerInfoCtx, peerInfoCancel, _ := lifecycle.WithSignallerAndCancel(
 		context.Background(),
 	)
@@ -118,15 +123,19 @@ func (r *DataWorkerIPCServer) Start() error {
 	)
 	select {
 	case <-peerInfoReady:
+		r.logger.Info("peer info manager started successfully", zap.Uint32("core_id", r.coreId))
 	case <-time.After(5 * time.Second):
-		r.logger.Warn("peer info manager did not start before timeout")
+		r.logger.Warn("peer info manager did not start before timeout", zap.Uint32("core_id", r.coreId))
 	}
 	r.peerInfoCtx = peerInfoCtx
 	r.peerInfoCancel = peerInfoCancel
 
+	r.logger.Info("respawning server", zap.Uint32("core_id", r.coreId))
 	r.RespawnServer(nil)
+	r.logger.Info("data worker ipc server started", zap.Uint32("core_id", r.coreId))
 
 	<-r.quit
+	r.logger.Info("data worker ipc server quit signal received, stopping", zap.Uint32("core_id", r.coreId))
 	return nil
 }
 
@@ -218,6 +227,7 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
 		return errors.Wrap(err, "respawn server")
 	}
+	r.logger.Info("attempting to listen on address", zap.String("address", r.listenAddrGRPC))
 	lis, err := mn.Listen(mg)
 	if err != nil {
 		return errors.Wrap(err, "respawn server")
 	}
@@ -228,6 +238,10 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
 		zap.String("address", r.listenAddrGRPC),
 		zap.String("resolved", lis.Addr().String()),
 	)
+	// Signal that the server is ready (listening)
+	r.readyOnce.Do(func() {
+		close(r.ready)
+	})
 	if len(filter) != 0 {
 		globalTimeReel, err := r.appConsensusEngineFactory.CreateGlobalTimeReel()
 		if err != nil {
@@ -257,6 +271,11 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
 	return nil
 }
 
+// Ready returns a channel that will be closed when the server has started listening
+func (r *DataWorkerIPCServer) Ready() <-chan struct{} {
+	return r.ready
+}
+
 // CreateJoinProof implements protobufs.DataIPCServiceServer.
 func (r *DataWorkerIPCServer) CreateJoinProof(
 	ctx context.Context,
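Reviewer note on the readiness handoff: the `ready`/`readyOnce` pair added above is the standard Go idiom of closing a channel exactly once to broadcast a one-time event. The `sync.Once` makes repeated `RespawnServer` calls safe, since closing an already-closed channel would panic. One caveat on the consumer side: `<-n.ipcServer.Ready()` in `monitorPortHealth` blocks indefinitely if the server never starts listening. A minimal, self-contained sketch of the pattern (all names here are illustrative, not from this repo), including the timeout guard a caller could add:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// server mirrors the ready/readyOnce fields added to DataWorkerIPCServer.
type server struct {
	ready     chan struct{}
	readyOnce sync.Once
}

func newServer() *server {
	return &server{ready: make(chan struct{})}
}

// markReady is safe to call on every respawn: sync.Once guarantees the
// channel is closed exactly once, so a second call is a no-op rather
// than a "close of closed channel" panic.
func (s *server) markReady() {
	s.readyOnce.Do(func() { close(s.ready) })
}

// Ready returns a channel that is closed once the server is listening.
func (s *server) Ready() <-chan struct{} {
	return s.ready
}

func main() {
	s := newServer()

	go func() {
		time.Sleep(100 * time.Millisecond) // simulate startup work
		s.markReady()
	}()

	// Hypothetical caller-side guard: unlike a bare <-s.Ready(), this
	// cannot block forever if the server never becomes ready.
	select {
	case <-s.Ready():
		fmt.Println("server is ready, safe to probe the stream port")
	case <-time.After(5 * time.Second):
		fmt.Println("server did not become ready before timeout")
	}
}
```

Because readiness is signalled by closing the channel rather than sending on it, any number of waiters unblock at once, and a goroutine that reads from `Ready()` after startup has completed returns immediately.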