Add better logging for port-listening health checks

This commit is contained in:
Tyler Sturos 2025-12-19 07:58:25 +00:00
parent 3368f61b89
commit 10e22578c3
2 changed files with 35 additions and 12 deletions

View File

@ -81,10 +81,6 @@ func (n *DataWorkerNode) Start(
) )
go func() { go func() {
n.logger.Info(
"starting IPC server",
zap.Uint("core_id", n.coreId),
)
err := n.ipcServer.Start() err := n.ipcServer.Start()
if err != nil { if err != nil {
n.logger.Error( n.logger.Error(
@ -308,6 +304,7 @@ func WaitForWorkerPortsAvailable(
// monitorPortHealth checks if both the stream port and P2P listen port are listening after startup // monitorPortHealth checks if both the stream port and P2P listen port are listening after startup
// The stream port is calculated as: base_stream_port + core_index - 1 // The stream port is calculated as: base_stream_port + core_index - 1
// The P2P listen port is calculated as: base_p2p_port + core_index - 1 // The P2P listen port is calculated as: base_p2p_port + core_index - 1
// The stream port check waits for the IPC server to signal readiness before probing the port
func (n *DataWorkerNode) monitorPortHealth() { func (n *DataWorkerNode) monitorPortHealth() {
n.logger.Info( n.logger.Info(
"checking port health", "checking port health",
@ -330,12 +327,19 @@ func (n *DataWorkerNode) monitorPortHealth() {
) )
streamResult <- false // Mark as failed since we couldn't check streamResult <- false // Mark as failed since we couldn't check
} else { } else {
// Check stream port in parallel // Wait for IPC server to be ready before checking stream port
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
// Wait for IPC server to start listening
n.logger.Debug( n.logger.Debug(
"attempting to bind to the stream port to check if it's listening", "waiting for IPC server to be ready before checking stream port",
zap.String("port", streamPort),
zap.Uint("core_id", n.coreId),
)
<-n.ipcServer.Ready()
n.logger.Debug(
"IPC server is ready, checking stream port",
zap.String("port", streamPort), zap.String("port", streamPort),
zap.Uint("core_id", n.coreId), zap.Uint("core_id", n.coreId),
) )
@ -348,8 +352,8 @@ func (n *DataWorkerNode) monitorPortHealth() {
) )
if !isStreamListening { if !isStreamListening {
n.logger.Error( n.logger.Warn(
"stream port is not listening", "stream port is not yet listening, may not be ready yet",
zap.String("port", streamPort), zap.String("port", streamPort),
zap.Uint("core_id", n.coreId), zap.Uint("core_id", n.coreId),
) )
@ -395,7 +399,7 @@ func (n *DataWorkerNode) monitorPortHealth() {
if !isP2PListening { if !isP2PListening {
n.logger.Error( n.logger.Error(
"P2P listen port is not listening", "P2P listen port is not yet listening, may not be ready yet",
zap.String("port", p2pPortStr), zap.String("port", p2pPortStr),
zap.Uint("core_id", n.coreId), zap.Uint("core_id", n.coreId),
) )
@ -421,12 +425,12 @@ func (n *DataWorkerNode) monitorPortHealth() {
// Ports are listening successfully, reset attempt counter // Ports are listening successfully, reset attempt counter
if streamOk && p2pOk { if streamOk && p2pOk {
n.logger.Info( n.logger.Info(
"all ports are listening successfully, resetting restart attempts", "all ports are listening successfully",
zap.Uint("core_id", n.coreId), zap.Uint("core_id", n.coreId),
) )
} }
n.logger.Info( n.logger.Info(
"port health check completed successfully", "port health check completed",
zap.Uint("core_id", n.coreId), zap.Uint("core_id", n.coreId),
) )
} }

View File

@ -3,6 +3,7 @@ package datarpc
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"sync"
"time" "time"
pcrypto "github.com/libp2p/go-libp2p/core/crypto" pcrypto "github.com/libp2p/go-libp2p/core/crypto"
@ -48,6 +49,8 @@ type DataWorkerIPCServer struct {
quit chan struct{} quit chan struct{}
peerInfoCtx lifecycle.SignalerContext peerInfoCtx lifecycle.SignalerContext
peerInfoCancel context.CancelFunc peerInfoCancel context.CancelFunc
ready chan struct{}
readyOnce sync.Once
} }
func NewDataWorkerIPCServer( func NewDataWorkerIPCServer(
@ -102,10 +105,12 @@ func NewDataWorkerIPCServer(
frameProver: frameProver, frameProver: frameProver,
peerInfoManager: peerInfoManager, peerInfoManager: peerInfoManager,
quit: make(chan struct{}), quit: make(chan struct{}),
ready: make(chan struct{}),
}, nil }, nil
} }
func (r *DataWorkerIPCServer) Start() error { func (r *DataWorkerIPCServer) Start() error {
r.logger.Info("starting DataWorkerIPCServer", zap.Uint32("core_id", r.coreId))
peerInfoCtx, peerInfoCancel, _ := lifecycle.WithSignallerAndCancel( peerInfoCtx, peerInfoCancel, _ := lifecycle.WithSignallerAndCancel(
context.Background(), context.Background(),
) )
@ -118,15 +123,19 @@ func (r *DataWorkerIPCServer) Start() error {
) )
select { select {
case <-peerInfoReady: case <-peerInfoReady:
r.logger.Info("peer info manager started successfully", zap.Uint32("core_id", r.coreId))
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
r.logger.Warn("peer info manager did not start before timeout") r.logger.Warn("peer info manager did not start before timeout", zap.Uint32("core_id", r.coreId))
} }
r.peerInfoCtx = peerInfoCtx r.peerInfoCtx = peerInfoCtx
r.peerInfoCancel = peerInfoCancel r.peerInfoCancel = peerInfoCancel
r.logger.Info("respawning server", zap.Uint32("core_id", r.coreId))
r.RespawnServer(nil) r.RespawnServer(nil)
r.logger.Info("data worker ipc server started", zap.Uint32("core_id", r.coreId))
<-r.quit <-r.quit
r.logger.Info("data worker ipc server quit signal received, stopping", zap.Uint32("core_id", r.coreId))
return nil return nil
} }
@ -218,6 +227,7 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
return errors.Wrap(err, "respawn server") return errors.Wrap(err, "respawn server")
} }
r.logger.Info("attempting to listen on address", zap.String("address", r.listenAddrGRPC))
lis, err := mn.Listen(mg) lis, err := mn.Listen(mg)
if err != nil { if err != nil {
return errors.Wrap(err, "respawn server") return errors.Wrap(err, "respawn server")
@ -228,6 +238,10 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
zap.String("address", r.listenAddrGRPC), zap.String("address", r.listenAddrGRPC),
zap.String("resolved", lis.Addr().String()), zap.String("resolved", lis.Addr().String()),
) )
// Signal that the server is ready (listening)
r.readyOnce.Do(func() {
close(r.ready)
})
if len(filter) != 0 { if len(filter) != 0 {
globalTimeReel, err := r.appConsensusEngineFactory.CreateGlobalTimeReel() globalTimeReel, err := r.appConsensusEngineFactory.CreateGlobalTimeReel()
if err != nil { if err != nil {
@ -257,6 +271,11 @@ func (r *DataWorkerIPCServer) RespawnServer(filter []byte) error {
return nil return nil
} }
// Ready returns a channel that will be closed when the server has started listening
func (r *DataWorkerIPCServer) Ready() <-chan struct{} {
return r.ready
}
// CreateJoinProof implements protobufs.DataIPCServiceServer. // CreateJoinProof implements protobufs.DataIPCServiceServer.
func (r *DataWorkerIPCServer) CreateJoinProof( func (r *DataWorkerIPCServer) CreateJoinProof(
ctx context.Context, ctx context.Context,