"""Socket.IO remote-management client.

Connects to a control server, registers this machine, periodically reports
heartbeat/system status, executes commands pushed by the server (start/stop/
restart a managed program, run a manual shell command, self-update) and
checks an update server for new versions of this script.
"""

import asyncio
import json
import logging
import os
import platform
import random
import string
import subprocess
import sys
import time
from pathlib import Path

import aiohttp
import psutil
import socketio

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

CURRENT_VERSION = "1.0.3"  # Current client version
UPDATE_URL_ZJ = "https://update.uqdm.com"  # Update server address
CHECK_INTERVAL = 3600  # Check for updates once per hour (seconds)

sio = socketio.AsyncClient()
# NOTE(review): plain HTTP and a hard-coded token; the server can run arbitrary
# shell commands on this host, so consider TLS and loading the token from
# the environment or a config file.
SERVER_URL = 'http://ore.uqdm.com:5003'
TOKEN = "131417"

# Client info is persisted in the user's home directory
HOME_DIR = str(Path.home())
CLIENT_INFO_FILE = os.path.join(HOME_DIR, '.client_info.json')


def get_client_id():
    """Return the identifier this client registers under.

    Uses the ``name`` environment variable when set, otherwise the host name.
    When running inside Docker (``/.dockerenv`` exists) a ``-D`` marker plus
    two random uppercase letters/digits is appended.
    """
    client_id = os.environ.get('name')
    if not client_id:
        client_id = platform.node()

    # Check whether we are running inside a Docker container.
    # NOTE(review): the original indentation was lost; this assumes the Docker
    # check is a separate step and not nested under the hostname fallback.
    if os.path.exists('/.dockerenv'):
        # Random two-character alphanumeric suffix
        random_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=2))
        client_id += f'-D{random_suffix}'

    return client_id


CLIENT_ID = get_client_id()
UNIQUE_ID = None
GROUP = 'default'

heartbeat_time = 30
report_time = 10

program_status = "未运行"
program_process = None

PROGRAM_NAME = None
PROGRAM_PATH = None
COMMAND_PARAMS = None
AUTO_RESTART = False
DAEMON_ENABLED = False
DAEMON_PROCESS = None


def load_client_info():
    """Load ``UNIQUE_ID`` and ``GROUP`` from ``CLIENT_INFO_FILE``.

    A missing or unreadable file leaves ``GROUP`` at ``'default'``; errors are
    logged, never raised.
    """
    global UNIQUE_ID, GROUP
    try:
        if os.path.exists(CLIENT_INFO_FILE):
            with open(CLIENT_INFO_FILE, 'r') as f:
                client_info = json.load(f)
            UNIQUE_ID = client_info.get('unique_id')
            GROUP = client_info.get('group', 'default')
        else:
            GROUP = 'default'
    except Exception as e:
        logger.error(f"Error loading client info: {e}")
        GROUP = 'default'


def save_client_info():
    """Persist ``UNIQUE_ID`` and ``GROUP`` to ``CLIENT_INFO_FILE`` as JSON.

    Errors are logged, never raised.
    """
    try:
        client_info = {"unique_id": UNIQUE_ID, "group": GROUP}
        with open(CLIENT_INFO_FILE, 'w') as f:
            json.dump(client_info, f)
        logger.info(f"Client info saved - Unique ID: {UNIQUE_ID}, Group: {GROUP}")
    except Exception as e:
        logger.error(f"Error saving client info: {e}")


def _refresh_program_status():
    """Mark the managed program as not running once its process has exited."""
    global program_status, program_process
    if program_process is not None and program_process.returncode is not None:
        logger.info(f"Managed program exited with code {program_process.returncode}")
        program_process = None
        program_status = "未运行"


async def get_system_info():
    """Return a dict describing the host and the managed program status.

    Returns an empty dict if any of the probes fails.
    """
    try:
        _refresh_program_status()
        return {
            'hostname': platform.node(),
            'os': platform.platform(),
            'cpu': psutil.cpu_percent(),
            'memory': psutil.virtual_memory().percent,
            'disk': psutil.disk_usage('/').percent,
            'ip_address': await get_ip_address(),
            'program_status': program_status,
        }
    except Exception as e:
        logger.error(f"Error getting system info: {e}")
        return {}


async def get_ip_address():
    """Return the first address printed by ``hostname -I``, or ``'N/A'``.

    Runs the command through asyncio so the event loop is not blocked.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            'hostname', '-I',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        stdout, _ = await process.communicate()
        if process.returncode != 0:
            raise subprocess.CalledProcessError(process.returncode, ['hostname', '-I'])
        return stdout.decode().split()[0]
    except Exception as e:
        logger.error(f"Error getting IP address: {e}")
        return 'N/A'


@sio.event
async def connect():
    """Socket.IO connect handler: register this client with the server."""
    logger.info(f'Connected to server. Client ID: {CLIENT_ID}')
    await register_client()


@sio.on('update_client_info')
async def on_update_client_info(data):
    """Store the unique id and group assigned by the server and persist them."""
    global UNIQUE_ID, GROUP
    UNIQUE_ID = data.get('unique_id')
    GROUP = data.get('group', 'default')
    save_client_info()
    logger.info(f"Updated client info - Unique ID: {UNIQUE_ID}, Group: {GROUP}")


async def register_client():
    """Emit the ``register`` event with the client id and system info."""
    try:
        await sio.emit('register', {
            'client_id': CLIENT_ID,
            'system_info': await get_system_info(),
            'unique_id': UNIQUE_ID,
        })
        logger.info(f"Client registered: {CLIENT_ID}")
    except Exception as e:
        logger.error(f"Error registering client: {e}")


@sio.event
async def disconnect():
    """Socket.IO disconnect handler."""
    logger.info('Disconnected from server')


async def _launch_program(params):
    """Start the managed program described by *params* and return its process.

    Falls back to ``PROGRAM_PATH`` / ``COMMAND_PARAMS`` for missing keys.
    Raises ``ValueError`` when no program path is available.
    """
    program_path = params.get('programPath', PROGRAM_PATH)
    command_params = params.get('commandParams', COMMAND_PARAMS)
    if not program_path:
        raise ValueError("No program path provided")
    # Leave out empty params instead of appending the literal text "None".
    full_command = f"{program_path} {command_params}" if command_params else str(program_path)
    logger.info(f"Starting program: {full_command}")
    # The output is never read, so it goes to DEVNULL: an unread PIPE fills up,
    # blocks the child and can deadlock a later wait().
    return await asyncio.create_subprocess_shell(
        full_command,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )


async def _terminate_program(process):
    """Terminate *process* (if still alive) and wait for it to exit."""
    if process.returncode is None:
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the check and the signal.
            pass
    await process.wait()


async def _run_manual_command(manual_command, result):
    """Run *manual_command* in a shell and record its output in *result*."""
    # SECURITY: executes an arbitrary shell command sent by the server.
    logger.info(f"Executing manual command: {manual_command}")
    process = await asyncio.create_subprocess_shell(
        manual_command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    # errors='replace' keeps non-UTF-8 output from failing the whole command.
    result['output'] = stdout.decode(errors='replace')
    result['error'] = stderr.decode(errors='replace')
    result['status'] = 'success' if process.returncode == 0 else 'error'


@sio.on('execute_command')
async def on_command(data):
    """Execute a command pushed by the server and emit ``command_result``.

    Supported commands: ``manual``, ``start``, ``stop``, ``update``,
    ``restart`` and ``force_update``. Any exception is reported back in the
    result instead of being raised.
    """
    global program_status, program_process
    command = data.get('command')
    params = data.get('params') or {}
    result = {'status': 'executed', 'output': '', 'error': ''}

    logger.info(f"Received command: {command} with params: {params}")

    try:
        # Pick up a program that exited on its own since the last command.
        _refresh_program_status()

        if command == 'manual':
            manual_command = params.get('manualCommand')
            if manual_command:
                await _run_manual_command(manual_command, result)
            else:
                result['status'] = 'error'
                result['error'] = "No manual command provided"

        elif command == 'start':
            if program_status == "未运行":
                program_process = await _launch_program(params)
                program_status = "运行中"
                result['status'] = 'started'
                result['output'] = "Program started successfully"
            else:
                result['status'] = 'already running'
                result['output'] = "Program is already running"

        elif command == 'stop':
            if program_process:
                logger.info("Stopping program")
                await _terminate_program(program_process)
                program_process = None
                program_status = "未运行"
                result['status'] = 'stopped'
                result['output'] = "Program stopped successfully"
            else:
                result['status'] = 'not running'
                result['output'] = "Program is not running"

        elif command == 'update':
            logger.info("Initiating update process")
            update_success = await self_update()
            if update_success:
                result['status'] = 'updated'
                result['output'] = "Program updated successfully"
            else:
                result['status'] = 'update failed'
                result['error'] = "Failed to update program"

        elif command == 'restart':
            logger.info("Restarting program")
            if program_process:
                await _terminate_program(program_process)
                program_process = None
                program_status = "未运行"
            program_process = await _launch_program(params)
            program_status = "运行中"
            result['status'] = 'restarted'
            result['output'] = "Program restarted successfully"

        elif command == 'force_update':
            logger.info("Initiating forced update process")
            update_success = await force_update(params.get('update_url'))
            if update_success:
                result['status'] = 'updated'
                result['output'] = "Program forcefully updated successfully"
            else:
                result['status'] = 'update failed'
                result['error'] = "Failed to forcefully update program"

        else:
            result['status'] = 'unknown command'
            result['error'] = f"Unknown command: {command}"

    except Exception as e:
        logger.error(f"Error executing command {command}: {e}")
        result['status'] = 'error'
        result['error'] = str(e)

    result['program_status'] = program_status

    logger.info(f"Sending command result: {result}")
    await sio.emit('command_result', {
        'unique_id': UNIQUE_ID,
        'client_id': CLIENT_ID,
        'command': command,
        'result': result,
    })


def _coerce_interval(value, fallback):
    """Return *value* if it is a usable positive interval, else *fallback*.

    Numeric strings are converted to float. Anything non-numeric, non-finite
    or <= 0 is rejected: it would crash or busy-loop the reporting tasks.
    """
    try:
        seconds = float(value)
    except (TypeError, ValueError):
        return fallback
    if not 0 < seconds < float('inf'):
        return fallback
    return value if isinstance(value, (int, float)) else seconds


@sio.on('update_settings')
async def on_update_settings(data):
    """Apply heartbeat/report intervals (seconds) sent by the server."""
    global heartbeat_time, report_time
    heartbeat_time = _coerce_interval(data.get('heartbeatTime', heartbeat_time), heartbeat_time)
    report_time = _coerce_interval(data.get('reportTime', report_time), report_time)
    logger.info(f"Updated settings: heartbeat_time={heartbeat_time}, report_time={report_time}")


async def send_heartbeat():
    """Emit a ``heartbeat`` event every ``heartbeat_time`` seconds, forever."""
    while True:
        try:
            await sio.emit('heartbeat', {
                'unique_id': UNIQUE_ID,
                'client_id': CLIENT_ID,
                'system_info': await get_system_info(),
            })
            logger.debug("Heartbeat sent")
        except Exception as e:
            logger.error(f"Error sending heartbeat: {e}")
        await asyncio.sleep(heartbeat_time)


async def send_status():
    """Emit a ``status_update`` event every ``report_time`` seconds, forever."""
    while True:
        try:
            status_data = await get_system_info()
            await sio.emit('status_update', {
                'unique_id': UNIQUE_ID,
                'client_id': CLIENT_ID,
                'system_info': status_data,
            })
            logger.debug("Status update sent")
        except Exception as e:
            logger.error(f"Error sending status update: {e}")
        await asyncio.sleep(report_time)


async def check_for_updates():
    """Ask the update server whether a newer version exists.

    Returns ``(update_available, latest_version)``; ``(False, None)`` on any
    failure.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{UPDATE_URL_ZJ}/check_update/{CURRENT_VERSION}") as response:
                if response.status == 200:
                    data = await response.json()
                    return data['update_available'], data.get('latest_version')
                else:
                    logger.error(f"检查更新失败,状态码: {response.status}")
                    return False, None
    except Exception as e:
        logger.error(f"检查更新失败: {e}")
        return False, None


async def _download_to_file(url, destination):
    """GET *url*; on HTTP 200 write the body to *destination*.

    Returns the HTTP status code. Network and file errors propagate.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                content = await response.read()
                with open(destination, 'wb') as f:
                    f.write(content)
            return response.status


async def download_update(version):
    """Download *version* from the update server to ``<script>.new``.

    Returns ``True`` on success, ``False`` otherwise.
    """
    try:
        status = await _download_to_file(f"{UPDATE_URL_ZJ}/download/{version}", f"{sys.argv[0]}.new")
        if status == 200:
            return True
        logger.error(f"下载更新失败,状态码: {status}")
        return False
    except Exception as e:
        logger.error(f"下载更新失败: {e}")
        return False


async def apply_update():
    """Swap ``<script>.new`` in for the running script and re-exec.

    The current script is kept as ``<script>.old``. On failure the backup is
    restored (only if it was made by this call) and the exception re-raised.
    Does not return on success because the process image is replaced.
    """
    current = sys.argv[0]
    backup = f"{current}.old"
    staged = f"{current}.new"
    backed_up = False
    try:
        if os.path.exists(backup):
            os.remove(backup)
        os.rename(current, backup)
        backed_up = True
        os.rename(staged, current)
        logger.info("Update applied, restarting program...")
        os.execl(sys.executable, sys.executable, *sys.argv)
    except Exception as e:
        logger.error(f"Failed to apply update: {e}")
        # Roll back only a backup made above: an older leftover .old file must
        # not overwrite the running version. os.replace also overwrites an
        # existing target on Windows.
        if backed_up and os.path.exists(backup):
            os.replace(backup, current)
        raise


async def self_update():
    """Check for, download and apply an update.

    Returns ``False`` when no update is available or the download failed.
    On success the process is re-executed by ``apply_update``.
    """
    update_available, latest_version = await check_for_updates()
    if update_available:
        logger.info(f"发现新版本 {latest_version},开始下载...")
        if await download_update(latest_version):
            logger.info("下载完成,准备应用更新...")
            await apply_update()
            return True
        else:
            logger.error("更新下载失败")
            return False
    else:
        logger.info("当前已是最新版本")
        return False


async def force_update(update_url):
    """Download the script from *update_url* and apply it unconditionally.

    Returns ``False`` on any failure; on success the process is re-executed.
    """
    if not update_url:
        logger.error("Forced update failed: no update URL provided")
        return False
    try:
        logger.info(f"Downloading forced update from: {update_url}")
        status = await _download_to_file(update_url, f"{sys.argv[0]}.new")
        if status == 200:
            logger.info("Forced update downloaded successfully")
            await apply_update()
            return True
        logger.error(f"Forced update download failed, status code: {status}")
        return False
    except Exception as e:
        logger.error(f"Forced update failed: {e}")
        return False


async def check_update_periodically():
    """Run ``self_update`` every ``CHECK_INTERVAL`` seconds, forever."""
    while True:
        await asyncio.sleep(CHECK_INTERVAL)
        logger.info("Checking for updates...")
        try:
            await self_update()
        except Exception as e:
            # A failed update must not stop future update checks.
            logger.error(f"Periodic update failed: {e}")


async def connect_with_retry():
    """Connect to the server, retrying with a growing delay until it works."""
    retry_count = 0
    while True:
        if sio.connected:
            # connect() raises on an already-connected client, which would
            # otherwise keep this loop retrying forever.
            return
        try:
            await sio.connect(SERVER_URL, auth={'token': TOKEN})
            logger.info("Successfully connected to server")
            return
        except Exception as e:
            retry_count += 1
            logger.error(f"Connection attempt {retry_count} failed: {e}")
            wait_time = min(60, 5 * retry_count)  # Back off gradually, capped at 60 seconds
            logger.info(f"Retrying in {wait_time} seconds...")
            await asyncio.sleep(wait_time)


async def _cancel_tasks(tasks):
    """Cancel *tasks* and wait until they have all finished."""
    for task in tasks:
        task.cancel()
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    """Load persisted client info, connect, and run the background loops."""
    load_client_info()  # Load client info

    while True:
        tasks = []
        try:
            await connect_with_retry()
            tasks = [
                asyncio.create_task(send_heartbeat()),
                asyncio.create_task(send_status()),
                asyncio.create_task(check_update_periodically()),
            ]
            await asyncio.gather(*tasks)
        except Exception as e:
            logger.error(f"An error occurred in main loop: {e}")
            # Stop the surviving loops so the next iteration does not run
            # duplicate heartbeat/status tasks.
            await _cancel_tasks(tasks)
            logger.info("Reconnecting...")
            await asyncio.sleep(5)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Program terminated by user")
    except Exception as e:
        logger.critical(f"Unhandled exception: {e}")
        raise