From 6c33b748b7ec1c31c9c17fa0e434bf03c5a0abff Mon Sep 17 00:00:00 2001 From: coolsd Date: Sun, 22 Sep 2024 11:20:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20khd04.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 04版本 --- khd04.py | 359 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 359 insertions(+) create mode 100644 khd04.py diff --git a/khd04.py b/khd04.py new file mode 100644 index 0000000..3eccf28 --- /dev/null +++ b/khd04.py @@ -0,0 +1,359 @@ +import socketio +import asyncio +import psutil +import platform +import os +import subprocess +import json +import aiohttp +import logging +import sys +import time +import random +import string + +# 设置日志 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +CURRENT_VERSION = "1.0.4" # 当前客户端版本 +UPDATE_URL_ZJ = "https://update.uqdm.com" # 更新服务器地址 +CHECK_INTERVAL = 3600 # 每小时检查一次更新 + +sio = socketio.AsyncClient() +SERVER_URL = 'http://ore.uqdm.com:5003' +TOKEN = "131417" + +def get_client_id(): + client_id = os.environ.get('name') + if not client_id: + client_id = platform.node() + + # 检查是否在Docker环境中运行 + if os.path.exists('/.dockerenv'): + # 生成随机的2位数字字母组合 + random_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=2)) + client_id += f'-D{random_suffix}' + + return client_id + +CLIENT_ID = get_client_id() + +heartbeat_time = 30 +report_time = 10 +program_status = "未运行" +program_process = None + +PROGRAM_NAME = None +PROGRAM_PATH = None +COMMAND_PARAMS = None +AUTO_RESTART = False +DAEMON_ENABLED = False +DAEMON_PROCESS = None + +async def get_system_info(): + try: + return { + 'hostname': platform.node(), + 'os': platform.platform(), + 'cpu': psutil.cpu_percent(), + 'memory': psutil.virtual_memory().percent, + 'disk': psutil.disk_usage('/').percent, + 'ip_address': await get_ip_address(), + 'program_status': program_status + } + except Exception as e: + logger.error(f"Error getting system info: {e}") + return {} + +async def get_ip_address(): + try: + return subprocess.check_output(['hostname', '-I']).decode().split()[0] + except Exception as e: + logger.error(f"Error getting IP address: {e}") + return 'N/A' + +@sio.event +async def connect(): + logger.info(f'Connected to server. Client ID: {CLIENT_ID}') + await register_client() + +async def register_client(): + try: + await sio.emit('register', { + 'client_id': CLIENT_ID, + 'system_info': await get_system_info() + }) + logger.info(f"Client registered: {CLIENT_ID}") + except Exception as e: + logger.error(f"Error registering client: {e}") + +@sio.event +async def disconnect(): + logger.info('Disconnected from server') + +@sio.on('update_client_id') +async def on_update_client_id(data): + global CLIENT_ID + new_id = data['new_id'] + logger.info(f"Received new client ID from server: {new_id}") + CLIENT_ID = new_id + # 可能需要更新本地存储的 ID 或进行其他相关操作 + +@sio.on('execute_command') +async def on_command(data): + global program_status, program_process + command = data['command'] + params = data.get('params', {}) + result = {'status': 'executed', 'output': '', 'error': ''} + + logger.info(f"Received command: {command} with params: {params}") + + try: + if command == 'manual': + manual_command = params.get('manualCommand') + if manual_command: + logger.info(f"Executing manual command: {manual_command}") + process = await asyncio.create_subprocess_shell( + manual_command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + result['output'] = stdout.decode() + result['error'] = stderr.decode() + result['status'] = 'success' if process.returncode == 0 else 'error' + else: + result['status'] = 'error' + result['error'] = "No manual command provided" + elif command == 'start': + if program_status == "未运行": + program_path = params.get('programPath', PROGRAM_PATH) + command_params = params.get('commandParams', COMMAND_PARAMS) + full_command = f"{program_path} {command_params}" + logger.info(f"Starting program: {full_command}") + program_process = await asyncio.create_subprocess_shell( + full_command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + program_status = "运行中" + result['status'] = 'started' + result['output'] = "Program started successfully" + else: + result['status'] = 'already running' + result['output'] = "Program is already running" + elif command == 'stop': + if program_process: + logger.info("Stopping program") + program_process.terminate() + await program_process.wait() + program_process = None + program_status = "未运行" + result['status'] = 'stopped' + result['output'] = "Program stopped successfully" + else: + result['status'] = 'not running' + result['output'] = "Program is not running" + elif command == 'update': + logger.info("Initiating update process") + update_success = await self_update() + if update_success: + result['status'] = 'updated' + result['output'] = "Program updated successfully" + else: + result['status'] = 'update failed' + result['error'] = "Failed to update program" + elif command == 'restart': + logger.info("Restarting program") + if program_process: + program_process.terminate() + await program_process.wait() + program_path = params.get('programPath', PROGRAM_PATH) + command_params = params.get('commandParams', COMMAND_PARAMS) + full_command = f"{program_path} {command_params}" + program_process = await asyncio.create_subprocess_shell( + full_command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + program_status = "运行中" + result['status'] = 'restarted' + result['output'] = "Program restarted successfully" + elif command == 'force_update': + logger.info("Initiating forced update process") + update_success = await force_update(params.get('update_url')) + if update_success: + result['status'] = 'updated' + result['output'] = "Program forcefully updated successfully" + else: + result['status'] = 'update failed' + result['error'] = "Failed to forcefully update program" + else: + result['status'] = 'unknown command' + result['error'] = f"Unknown command: {command}" + except Exception as e: + logger.error(f"Error executing command {command}: {e}") + result['status'] = 'error' + result['error'] = str(e) + + result['program_status'] = program_status + logger.info(f"Sending command result: {result}") + await sio.emit('command_result', { + 'client_id': CLIENT_ID, + 'command': command, + 'result': result + }) + +@sio.on('update_settings') +async def on_update_settings(data): + global heartbeat_time, report_time + heartbeat_time = data.get('heartbeatTime', heartbeat_time) + report_time = data.get('reportTime', report_time) + logger.info(f"Updated settings: heartbeat_time={heartbeat_time}, report_time={report_time}") + +async def send_heartbeat(): + while True: + try: + await sio.emit('heartbeat', { + 'client_id': CLIENT_ID, + 'system_info': await get_system_info() + }) + logger.debug("Heartbeat sent") + except Exception as e: + logger.error(f"Error sending heartbeat: {e}") + await asyncio.sleep(heartbeat_time) + +async def send_status(): + while True: + try: + status_data = await get_system_info() + await sio.emit('status_update', { + 'client_id': CLIENT_ID, + 'system_info': status_data + }) + logger.debug("Status update sent") + except Exception as e: + logger.error(f"Error sending status update: {e}") + await asyncio.sleep(report_time) + +async def check_for_updates(): + try: + async with aiohttp.ClientSession() as session: + async with session.get(f"{UPDATE_URL_ZJ}/check_update/{CURRENT_VERSION}") as response: + if response.status == 200: + data = await response.json() + return data['update_available'], data.get('latest_version') + else: + logger.error(f"检查更新失败,状态码: {response.status}") + return False, None + except Exception as e: + logger.error(f"检查更新失败: {e}") + return False, None + +async def download_update(version): + try: + async with aiohttp.ClientSession() as session: + async with session.get(f"{UPDATE_URL_ZJ}/download/{version}") as response: + if response.status == 200: + content = await response.read() + with open(f"{sys.argv[0]}.new", 'wb') as f: + f.write(content) + return True + else: + logger.error(f"下载更新失败,状态码: {response.status}") + return False + except Exception as e: + logger.error(f"下载更新失败: {e}") + return False + +async def apply_update(): + try: + if os.path.exists(f"{sys.argv[0]}.old"): + os.remove(f"{sys.argv[0]}.old") + os.rename(sys.argv[0], f"{sys.argv[0]}.old") + os.rename(f"{sys.argv[0]}.new", sys.argv[0]) + logger.info("Update applied, restarting program...") + os.execl(sys.executable, sys.executable, *sys.argv) + except Exception as e: + logger.error(f"Failed to apply update: {e}") + if os.path.exists(f"{sys.argv[0]}.old"): + os.rename(f"{sys.argv[0]}.old", sys.argv[0]) + raise + +async def self_update(): + update_available, latest_version = await check_for_updates() + if update_available: + logger.info(f"发现新版本 {latest_version},开始下载...") + if await download_update(latest_version): + logger.info("下载完成,准备应用更新...") + await apply_update() + return True + else: + logger.error("更新下载失败") + return False + else: + logger.info("当前已是最新版本") + return False + +async def force_update(update_url): + try: + logger.info(f"Downloading forced update from: {update_url}") + async with aiohttp.ClientSession() as session: + async with session.get(update_url) as response: + if response.status == 200: + content = await response.read() + with open(f"{sys.argv[0]}.new", 'wb') as f: + f.write(content) + logger.info("Forced update downloaded successfully") + await apply_update() + return True + else: + logger.error(f"Forced update download failed, status code: {response.status}") + return False + except Exception as e: + logger.error(f"Forced update failed: {e}") + return False + +async def check_update_periodically(): + while True: + await asyncio.sleep(CHECK_INTERVAL) + logger.info("Checking for updates...") + await self_update() + +async def connect_with_retry(): + retry_count = 0 + while True: + try: + await sio.connect(SERVER_URL, auth={'token': TOKEN}) + logger.info("Successfully connected to server") + return + except Exception as e: + retry_count += 1 + logger.error(f"Connection attempt {retry_count} failed: {e}") + wait_time = min(60, 5 * retry_count) # 逐步增加等待时间,最多等待60秒 + logger.info(f"Retrying in {wait_time} seconds...") + await asyncio.sleep(wait_time) + +async def main(): + while True: + try: + await connect_with_retry() + await asyncio.gather( + send_heartbeat(), + send_status(), + check_update_periodically() + ) + except Exception as e: + logger.error(f"An error occurred in main loop: {e}") + logger.info("Reconnecting...") + await asyncio.sleep(5) + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("Program terminated by user") + except Exception as e: + logger.critical(f"Unhandled exception: {e}") + raise