add port checks to workers

This commit is contained in:
Tyler Sturos 2025-12-19 06:55:43 +00:00
parent a05d6b38b1
commit 3368f61b89
3 changed files with 297 additions and 2 deletions

View File

@ -2,10 +2,15 @@ package app
import (
"fmt"
"net"
"os"
"sync"
"time"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/config"
consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time"
"source.quilibrium.com/quilibrium/monorepo/node/datarpc"
"source.quilibrium.com/quilibrium/monorepo/types/crypto"
@ -15,6 +20,7 @@ import (
type DataWorkerNode struct {
logger *zap.Logger
config *config.Config
dataProofStore store.DataProofStore
clockStore store.ClockStore
coinStore store.TokenStore
@ -25,12 +31,14 @@ type DataWorkerNode struct {
frameProver crypto.FrameProver
globalTimeReel *consensustime.GlobalTimeReel
parentProcess int
rpcMultiaddr string
quit chan struct{}
stopOnce sync.Once
}
func newDataWorkerNode(
logger *zap.Logger,
config *config.Config,
dataProofStore store.DataProofStore,
clockStore store.ClockStore,
coinStore store.TokenStore,
@ -41,10 +49,12 @@ func newDataWorkerNode(
globalTimeReel *consensustime.GlobalTimeReel,
coreId uint,
parentProcess int,
rpcMultiaddr string,
) (*DataWorkerNode, error) {
logger = logger.With(zap.String("process", fmt.Sprintf("worker %d", coreId)))
return &DataWorkerNode{
logger: logger,
config: config,
dataProofStore: dataProofStore,
clockStore: clockStore,
coinStore: coinStore,
@ -55,6 +65,7 @@ func newDataWorkerNode(
frameProver: frameProver,
globalTimeReel: globalTimeReel,
parentProcess: parentProcess,
rpcMultiaddr: rpcMultiaddr,
quit: make(chan struct{}),
}, nil
}
@ -63,17 +74,40 @@ func (n *DataWorkerNode) Start(
done chan os.Signal,
quitCh chan struct{},
) error {
n.logger.Info(
"starting data worker node",
zap.Uint("core_id", n.coreId),
zap.String("rpc_multiaddr", n.rpcMultiaddr),
)
go func() {
n.logger.Info(
"starting IPC server",
zap.Uint("core_id", n.coreId),
)
err := n.ipcServer.Start()
if err != nil {
n.logger.Error(
"error while starting ipc server for core",
zap.Uint64("core", uint64(n.coreId)),
zap.Error(err),
)
n.Stop()
} else {
n.logger.Info(
"IPC server started successfully",
zap.Uint("core_id", n.coreId),
)
}
}()
// Start port health check in background
n.logger.Info(
"starting port health check monitor",
zap.Uint("core_id", n.coreId),
)
go n.monitorPortHealth()
n.logger.Info("data worker node started", zap.Uint("core_id", n.coreId))
select {
@ -149,3 +183,250 @@ func (n *DataWorkerNode) GetFrameProver() crypto.FrameProver {
// GetIPCServer returns the worker's IPC server instance.
func (n *DataWorkerNode) GetIPCServer() *datarpc.DataWorkerIPCServer {
	return n.ipcServer
}
// extractPortFromMultiaddr extracts the TCP port from a multiaddr string
// such as "/ip4/127.0.0.1/tcp/8340", falling back to the UDP component when
// no TCP component is present.
func extractPortFromMultiaddr(multiaddrStr string) (string, error) {
	addr, err := multiaddr.NewMultiaddr(multiaddrStr)
	if err != nil {
		return "", errors.Wrap(err, "failed to parse multiaddr")
	}
	// Prefer the TCP component; only consult UDP when TCP is absent.
	if port, err := addr.ValueForProtocol(multiaddr.P_TCP); err == nil {
		return port, nil
	}
	port, err := addr.ValueForProtocol(multiaddr.P_UDP)
	if err != nil {
		return "", errors.Wrap(err, "failed to extract port from multiaddr")
	}
	return port, nil
}
// isPortListening checks if a port is currently listening (in use)
func isPortListening(port string) bool {
ln, err := net.Listen("tcp", ":"+port)
if err != nil {
// Port is in use (listening) or in TIME_WAIT state
return true
}
if err := ln.Close(); err != nil {
// Log but don't fail - port was available
}
// Port is available (not listening)
return false
}
// waitForPortAvailable waits for a port to become available, checking periodically
// Returns true if port becomes available, false if timeout is reached
func waitForPortAvailable(port string, timeout time.Duration, logger *zap.Logger) bool {
	const pollInterval = 500 * time.Millisecond

	logger.Info(
		"waiting for port to become available",
		zap.String("port", port),
		zap.Duration("timeout", timeout),
	)

	// Poll until the deadline passes: check first, then sleep between checks.
	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(pollInterval) {
		if isPortListening(port) {
			continue
		}
		logger.Info(
			"port is now available",
			zap.String("port", port),
		)
		return true
	}

	logger.Warn(
		"port did not become available within timeout",
		zap.String("port", port),
		zap.Duration("timeout", timeout),
	)
	return false
}
// WaitForWorkerPortsAvailable waits for both P2P and stream ports to become available
// This helps avoid race conditions when processes restart quickly
// Returns true if all ports are available, false otherwise
func WaitForWorkerPortsAvailable(
	logger *zap.Logger,
	config *config.Config,
	coreId uint,
	rpcMultiaddr string,
	timeout time.Duration,
) bool {
	var wg sync.WaitGroup
	streamCh := make(chan bool, 1)
	p2pCh := make(chan bool, 1)

	// Shared worker: poll one port and deliver the outcome on its channel.
	checkPort := func(port string, result chan<- bool) {
		defer wg.Done()
		result <- waitForPortAvailable(port, timeout, logger)
	}

	// Stream port: derived from the worker's RPC multiaddr.
	if streamPort, err := extractPortFromMultiaddr(rpcMultiaddr); err != nil {
		logger.Warn(
			"failed to extract stream port, skipping stream port availability check",
			zap.String("multiaddr", rpcMultiaddr),
			zap.Uint("core_id", coreId),
			zap.Error(err),
		)
		streamCh <- true // Skip check, assume available
	} else {
		wg.Add(1)
		go checkPort(streamPort, streamCh)
	}

	// P2P port: base port offset by the (1-based) core index.
	if base := config.Engine.DataWorkerBaseP2PPort; base > 0 {
		wg.Add(1)
		go checkPort(fmt.Sprintf("%d", int(base)+int(coreId)-1), p2pCh)
	} else {
		p2pCh <- true // Skip check, assume available
	}

	// Both channels are buffered, so the sends above never block; wait for the
	// goroutines to finish, then collect the outcomes.
	wg.Wait()
	streamOk := <-streamCh
	p2pOk := <-p2pCh
	return streamOk && p2pOk
}
// monitorPortHealth performs a one-shot health check after startup, verifying
// that both the stream port and the P2P listen port are actually being
// listened on, and logs the outcome. The stream port is extracted from the
// worker's RPC multiaddr; the P2P listen port is calculated as:
// base_p2p_port + core_index - 1.
// (The previous doc comment incorrectly said the stream port was also
// computed from a base port.)
func (n *DataWorkerNode) monitorPortHealth() {
	n.logger.Info(
		"checking port health",
		zap.Uint("core_id", n.coreId),
		zap.String("rpc_multiaddr", n.rpcMultiaddr),
	)

	var wg sync.WaitGroup
	streamResult := make(chan bool, 1)
	p2pResult := make(chan bool, 1)

	// Extract stream port from multiaddr
	streamPort, err := extractPortFromMultiaddr(n.rpcMultiaddr)
	if err != nil {
		n.logger.Error(
			"failed to extract stream port from multiaddr, skipping stream port health check",
			zap.String("multiaddr", n.rpcMultiaddr),
			zap.Uint("core_id", n.coreId),
			zap.Error(err),
		)
		streamResult <- false // Mark as failed since we couldn't check
	} else {
		// Check stream port in parallel
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.logger.Debug(
				"attempting to bind to the stream port to check if it's listening",
				zap.String("port", streamPort),
				zap.Uint("core_id", n.coreId),
			)
			isStreamListening := isPortListening(streamPort)
			n.logger.Debug(
				"stream port check completed",
				zap.String("port", streamPort),
				zap.Bool("is_listening", isStreamListening),
				zap.Uint("core_id", n.coreId),
			)
			if !isStreamListening {
				n.logger.Error(
					"stream port is not listening",
					zap.String("port", streamPort),
					zap.Uint("core_id", n.coreId),
				)
				streamResult <- false
			} else {
				n.logger.Info(
					"stream port is listening successfully",
					zap.String("port", streamPort),
					zap.Uint("core_id", n.coreId),
				)
				streamResult <- true
			}
		}()
	}

	// Check P2P listen port in parallel
	// Calculate P2P port: base_p2p_port + core_index - 1
	if n.config.Engine.DataWorkerBaseP2PPort == 0 {
		n.logger.Warn(
			"DataWorkerBaseP2PPort is not set, skipping P2P port health check",
			zap.Uint("core_id", n.coreId),
		)
		p2pResult <- true // Skip check, assume OK
	} else {
		p2pPort := int(n.config.Engine.DataWorkerBaseP2PPort) + int(n.coreId) - 1
		p2pPortStr := fmt.Sprintf("%d", p2pPort)
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.logger.Debug(
				"attempting to bind to P2P port to check if it's listening",
				zap.String("port", p2pPortStr),
				zap.Uint("core_id", n.coreId),
			)
			isP2PListening := isPortListening(p2pPortStr)
			n.logger.Debug(
				"P2P port check completed",
				zap.String("port", p2pPortStr),
				zap.Bool("is_listening", isP2PListening),
				zap.Uint("core_id", n.coreId),
			)
			if !isP2PListening {
				n.logger.Error(
					"P2P listen port is not listening",
					zap.String("port", p2pPortStr),
					zap.Uint("core_id", n.coreId),
				)
				p2pResult <- false
			} else {
				n.logger.Info(
					"P2P listen port is listening successfully",
					zap.String("port", p2pPortStr),
					zap.Uint("core_id", n.coreId),
				)
				p2pResult <- true
			}
		}()
	}

	// Wait for both checks to complete (channels are buffered, so the sends
	// above never block), then collect the results.
	wg.Wait()
	streamOk := <-streamResult
	p2pOk := <-p2pResult

	// Report the actual outcome. Previously a "completed successfully" line
	// was logged unconditionally, even when a port check had failed.
	if streamOk && p2pOk {
		// NOTE(review): this log line claims restart attempts are reset, but
		// no counter is touched here — presumably a supervising process acts
		// on the log; confirm against the parent.
		n.logger.Info(
			"all ports are listening successfully, resetting restart attempts",
			zap.Uint("core_id", n.coreId),
		)
		n.logger.Info(
			"port health check completed successfully",
			zap.Uint("core_id", n.coreId),
		)
	} else {
		n.logger.Error(
			"port health check failed",
			zap.Uint("core_id", n.coreId),
			zap.Bool("stream_port_ok", streamOk),
			zap.Bool("p2p_port_ok", p2pOk),
		)
	}
}

View File

@ -125,7 +125,7 @@ func NewDataWorkerNodeWithProxyPubsub(logger *zap.Logger, config2 *config.Config
if err != nil {
return nil, err
}
dataWorkerNode, err := newDataWorkerNode(logger, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess)
dataWorkerNode, err := newDataWorkerNode(logger, config2, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess, rpcMultiaddr)
if err != nil {
return nil, err
}
@ -182,7 +182,7 @@ func NewDataWorkerNodeWithoutProxyPubsub(logger *zap.Logger, config2 *config.Con
if err != nil {
return nil, err
}
dataWorkerNode, err := newDataWorkerNode(logger, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess)
dataWorkerNode, err := newDataWorkerNode(logger, config2, pebbleDataProofStore, pebbleClockStore, pebbleTokenStore, fileKeyManager, pebbleDB, frameProver, dataWorkerIPCServer, globalTimeReel, coreId, parentProcess, rpcMultiaddr)
if err != nil {
return nil, err
}

View File

@ -528,6 +528,20 @@ func main() {
rpcMultiaddr = strings.Replace(rpcMultiaddr, "/quic-v1", "", 1)
rpcMultiaddr = strings.Replace(rpcMultiaddr, "udp", "tcp", 1)
if !app.WaitForWorkerPortsAvailable(
logger,
nodeConfig,
uint(*core),
rpcMultiaddr,
30*time.Second, // Wait up to 30 seconds for ports to become available
) {
logger.Error(
"ports not available, exiting - could not reserve necessary ports",
zap.Uint("core_id", uint(*core)),
)
os.Exit(1)
}
dataWorkerNode, err := app.NewDataWorkerNode(
logger,
nodeConfig,