# khd/khd.py
# 2024-09-21 22:46:43 +08:00
#
# 394 lines
# 14 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
import platform
import random
import string
import subprocess
import sys
import time
from pathlib import Path

import aiohttp
import psutil
import socketio
# Configure logging: INFO level, timestamped "time - level - message" lines.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
CURRENT_VERSION = "1.0.3" # current client version, sent to the update server
UPDATE_URL_ZJ = "https://update.uqdm.com" # update server address
CHECK_INTERVAL = 3600 # seconds between update checks (once per hour)
# Socket.IO client shared by every event handler and sender loop below.
sio = socketio.AsyncClient()
# NOTE(review): the control server is reached over plain http:// — confirm
# that this is intentional.
SERVER_URL = 'http://ore.uqdm.com:5003'
# NOTE(review): the auth token is hard-coded in source; it is passed as
# auth={'token': TOKEN} when connecting.
TOKEN = "131417"
# Use the user's home directory to persist the client info file.
HOME_DIR = str(Path.home())
CLIENT_INFO_FILE = os.path.join(HOME_DIR, '.client_info.json')
def get_client_id():
    """Return the identifier this client reports to the server.

    The ``name`` environment variable is used when it is set and non-empty.
    Otherwise the identifier is the machine's network name
    (``platform.node()``); when ``/.dockerenv`` exists, ``-D`` plus two
    random characters (uppercase letters or digits) is appended.

    Returns:
        str: The client identifier.
    """
    client_id = os.environ.get('name')
    if not client_id:
        client_id = platform.node()
        # NOTE(review): the original indentation was lost, so it is unclear
        # whether the Docker suffix also applies to an explicit ``name``.
        # It is applied to the hostname fallback only here — confirm.
        # Check whether we are running inside a Docker container.
        if os.path.exists('/.dockerenv'):
            # Random two-character uppercase/digit suffix.
            random_suffix = ''.join(
                random.choices(string.ascii_uppercase + string.digits, k=2)
            )
            client_id += f'-D{random_suffix}'
    return client_id
# Identifier computed once at import time and sent with every message.
CLIENT_ID = get_client_id()
# Server-assigned identity; set by load_client_info() and the
# 'update_client_info' event handler.
UNIQUE_ID = None
GROUP = 'default'
# Seconds between heartbeat / status messages; the 'update_settings' event
# handler can change both at runtime.
heartbeat_time = 30
report_time = 10
# State of the managed program. "未运行" means "not running"; the
# 'execute_command' handler switches it to "运行中" ("running").
program_status = "未运行"
# Process handle of the managed program, assigned by the 'execute_command' handler.
program_process = None
# PROGRAM_PATH and COMMAND_PARAMS are the defaults the 'execute_command'
# handler uses when a command carries no 'programPath' / 'commandParams'.
PROGRAM_NAME = None
PROGRAM_PATH = None
COMMAND_PARAMS = None
# NOTE(review): PROGRAM_NAME above and the three settings below are not
# referenced anywhere else in the visible part of this file.
AUTO_RESTART = False
DAEMON_ENABLED = False
DAEMON_PROCESS = None
def load_client_info():
    """Restore ``UNIQUE_ID`` and ``GROUP`` from ``CLIENT_INFO_FILE``.

    When the file does not exist, or reading/parsing it fails, ``GROUP``
    is reset to ``'default'``; a failure is also logged.
    """
    global UNIQUE_ID, GROUP
    try:
        if not os.path.exists(CLIENT_INFO_FILE):
            GROUP = 'default'
            return
        with open(CLIENT_INFO_FILE, 'r') as info_file:
            stored = json.load(info_file)
        UNIQUE_ID = stored.get('unique_id')
        GROUP = stored.get('group', 'default')
    except Exception as e:
        logger.error(f"Error loading client info: {e}")
        GROUP = 'default'
def save_client_info():
    """Write the current ``UNIQUE_ID`` and ``GROUP`` to ``CLIENT_INFO_FILE`` as JSON.

    Any failure is logged and swallowed.
    """
    try:
        with open(CLIENT_INFO_FILE, 'w') as info_file:
            json.dump({"unique_id": UNIQUE_ID, "group": GROUP}, info_file)
        logger.info(f"Client info saved - Unique ID: {UNIQUE_ID}, Group: {GROUP}")
    except Exception as e:
        logger.error(f"Error saving client info: {e}")
async def get_system_info():
    """Collect a snapshot of host metrics for reporting to the server.

    Returns:
        dict: hostname, OS description, CPU / memory / root-disk usage
        percentages, IP address and the managed program's status, or an
        empty dict when collecting any of them fails (the error is logged).
    """
    try:
        snapshot = {}
        snapshot['hostname'] = platform.node()
        snapshot['os'] = platform.platform()
        snapshot['cpu'] = psutil.cpu_percent()
        snapshot['memory'] = psutil.virtual_memory().percent
        snapshot['disk'] = psutil.disk_usage('/').percent
        snapshot['ip_address'] = await get_ip_address()
        snapshot['program_status'] = program_status
        return snapshot
    except Exception as e:
        logger.error(f"Error getting system info: {e}")
        return {}
async def get_ip_address():
    """Return the first address printed by ``hostname -I``.

    The command is run through asyncio's subprocess support so the event
    loop is not blocked while it executes.

    Returns:
        str: The first whitespace-separated address, or ``'N/A'`` when the
        command cannot be run, exits with a non-zero status, or prints no
        address (the reason is logged).
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            'hostname', '-I',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        stdout, _ = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(f"'hostname -I' exited with status {proc.returncode}")
        addresses = stdout.decode().split()
        if not addresses:
            raise RuntimeError("'hostname -I' reported no addresses")
        return addresses[0]
    except Exception as e:
        logger.error(f"Error getting IP address: {e}")
        return 'N/A'
@sio.event
async def connect():
    """Socket.IO ``connect`` handler: log the connection, then register."""
    connected_message = f'Connected to server. Client ID: {CLIENT_ID}'
    logger.info(connected_message)
    await register_client()
@sio.on('update_client_info')
async def on_update_client_info(data):
    """Adopt the identity sent by the server and persist it.

    Args:
        data: Mapping read with ``.get``; ``'unique_id'`` and ``'group'``
            (default ``'default'``) are taken from it.
    """
    global UNIQUE_ID, GROUP
    UNIQUE_ID, GROUP = data.get('unique_id'), data.get('group', 'default')
    save_client_info()
    logger.info(f"Updated client info - Unique ID: {UNIQUE_ID}, Group: {GROUP}")
async def register_client():
    """Emit a ``register`` event carrying the client id, system info and unique id.

    Failures are logged and swallowed.
    """
    try:
        system_info = await get_system_info()
        registration = {
            'client_id': CLIENT_ID,
            'system_info': system_info,
            'unique_id': UNIQUE_ID,
        }
        await sio.emit('register', registration)
        logger.info(f"Client registered: {CLIENT_ID}")
    except Exception as e:
        logger.error(f"Error registering client: {e}")
@sio.event
async def disconnect():
    """Socket.IO ``disconnect`` handler: only records the event in the log."""
    logger.info('Disconnected from server')
async def _launch_program(params):
    """Start the managed program through the shell and return its process handle."""
    program_path = params.get('programPath', PROGRAM_PATH)
    command_params = params.get('commandParams', COMMAND_PARAMS)
    if not program_path:
        # Without this check the shell would be asked to run the literal "None".
        raise ValueError("No program path provided")
    full_command = f"{program_path} {command_params}" if command_params else f"{program_path}"
    logger.info(f"Starting program: {full_command}")
    # Nothing ever reads the child's output, so it is discarded. Leaving it on
    # a pipe would let the child block once the pipe buffer fills, and would
    # make a later wait() hang with it.
    return await asyncio.create_subprocess_shell(
        full_command,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )


async def _terminate_program(process):
    """Terminate *process* if it is still alive, then wait until it has exited."""
    if process.returncode is None:
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the check and the signal.
            pass
    await process.wait()


@sio.on('execute_command')
async def on_command(data):
    """Execute a command sent by the server and report the outcome.

    Supported commands: ``manual`` (run ``params['manualCommand']`` in a
    shell and capture its output), ``start``, ``stop`` and ``restart`` (manage
    the configured program), ``update`` and ``force_update`` (self-update).
    Anything else is reported as an unknown command.

    NOTE(review): ``manual``, ``start`` and ``restart`` hand server-supplied
    strings to a shell, and ``force_update`` downloads from a server-supplied
    URL. This relies entirely on the server being trusted.

    Args:
        data: Mapping with ``'command'`` and an optional ``'params'`` mapping.

    The result dict (``status``, ``output``, ``error``, ``program_status``)
    is emitted as a ``command_result`` event; errors raised while executing
    the command are reported in it rather than propagated.
    """
    global program_status, program_process
    command = data.get('command')
    params = data.get('params') or {}
    result = {'status': 'executed', 'output': '', 'error': ''}
    logger.info(f"Received command: {command} with params: {params}")
    try:
        # If the program exited on its own, drop the stale handle so that
        # 'start' works again and the reported status is accurate.
        if program_process is not None and program_process.returncode is not None:
            program_process = None
            program_status = "未运行"

        if command == 'manual':
            manual_command = params.get('manualCommand')
            if manual_command:
                logger.info(f"Executing manual command: {manual_command}")
                process = await asyncio.create_subprocess_shell(
                    manual_command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE
                )
                stdout, stderr = await process.communicate()
                # Undecodable bytes must not turn a finished command into an error.
                result['output'] = stdout.decode(errors='replace')
                result['error'] = stderr.decode(errors='replace')
                result['status'] = 'success' if process.returncode == 0 else 'error'
            else:
                result['status'] = 'error'
                result['error'] = "No manual command provided"
        elif command == 'start':
            if program_status == "未运行":
                program_process = await _launch_program(params)
                program_status = "运行中"
                result['status'] = 'started'
                result['output'] = "Program started successfully"
            else:
                result['status'] = 'already running'
                result['output'] = "Program is already running"
        elif command == 'stop':
            if program_process:
                logger.info("Stopping program")
                await _terminate_program(program_process)
                program_process = None
                program_status = "未运行"
                result['status'] = 'stopped'
                result['output'] = "Program stopped successfully"
            else:
                result['status'] = 'not running'
                result['output'] = "Program is not running"
        elif command == 'update':
            logger.info("Initiating update process")
            update_success = await self_update()
            if update_success:
                result['status'] = 'updated'
                result['output'] = "Program updated successfully"
            else:
                result['status'] = 'update failed'
                result['error'] = "Failed to update program"
        elif command == 'restart':
            logger.info("Restarting program")
            if program_process:
                await _terminate_program(program_process)
                # Record the stop first so a failed relaunch is not reported as running.
                program_process = None
                program_status = "未运行"
            program_process = await _launch_program(params)
            program_status = "运行中"
            result['status'] = 'restarted'
            result['output'] = "Program restarted successfully"
        elif command == 'force_update':
            logger.info("Initiating forced update process")
            update_success = await force_update(params.get('update_url'))
            if update_success:
                result['status'] = 'updated'
                result['output'] = "Program forcefully updated successfully"
            else:
                result['status'] = 'update failed'
                result['error'] = "Failed to forcefully update program"
        else:
            result['status'] = 'unknown command'
            result['error'] = f"Unknown command: {command}"
    except Exception as e:
        logger.error(f"Error executing command {command}: {e}")
        result['status'] = 'error'
        result['error'] = str(e)
    result['program_status'] = program_status
    logger.info(f"Sending command result: {result}")
    await sio.emit('command_result', {
        'unique_id': UNIQUE_ID,
        'client_id': CLIENT_ID,
        'command': command,
        'result': result
    })
def _coerce_interval(name, value, current):
    """Return *value* as a positive, finite number of seconds, else *current*."""
    candidate = value
    if isinstance(candidate, str):
        try:
            candidate = float(candidate)
        except ValueError:
            candidate = None
    is_number = isinstance(candidate, (int, float)) and not isinstance(candidate, bool)
    # The range test also rejects NaN, because every comparison with NaN is false.
    if is_number and 0 < candidate < float('inf'):
        return candidate
    logger.warning(f"Ignoring invalid {name}: {value!r}; keeping {current}")
    return current


@sio.on('update_settings')
async def on_update_settings(data):
    """Apply new heartbeat / status-report intervals sent by the server.

    Args:
        data: Mapping that may contain ``'heartbeatTime'`` and
            ``'reportTime'`` (seconds). A missing key keeps the current
            value. A value that is not a positive finite number (or a string
            holding one) is ignored with a warning, because the sender loops
            pass these values straight to ``asyncio.sleep``.
    """
    global heartbeat_time, report_time
    if 'heartbeatTime' in data:
        heartbeat_time = _coerce_interval('heartbeatTime', data['heartbeatTime'], heartbeat_time)
    if 'reportTime' in data:
        report_time = _coerce_interval('reportTime', data['reportTime'], report_time)
    logger.info(f"Updated settings: heartbeat_time={heartbeat_time}, report_time={report_time}")
async def send_heartbeat():
    """Emit a ``heartbeat`` event forever, pausing ``heartbeat_time`` seconds between sends.

    A failed send is logged and the loop carries on.
    """
    while True:
        try:
            # Read the id before awaiting, exactly as the payload literal would.
            unique_id = UNIQUE_ID
            system_info = await get_system_info()
            beat = {
                'unique_id': unique_id,
                'client_id': CLIENT_ID,
                'system_info': system_info,
            }
            await sio.emit('heartbeat', beat)
            logger.debug("Heartbeat sent")
        except Exception as e:
            logger.error(f"Error sending heartbeat: {e}")
        await asyncio.sleep(heartbeat_time)
async def send_status():
    """Emit a ``status_update`` event forever, pausing ``report_time`` seconds between sends.

    A failed send is logged and the loop carries on.
    """
    while True:
        try:
            system_info = await get_system_info()
            report = {
                'unique_id': UNIQUE_ID,
                'client_id': CLIENT_ID,
                'system_info': system_info,
            }
            await sio.emit('status_update', report)
            logger.debug("Status update sent")
        except Exception as e:
            logger.error(f"Error sending status update: {e}")
        await asyncio.sleep(report_time)
async def check_for_updates():
    """Ask the update server whether a newer client version exists.

    Returns:
        tuple: ``(update_available, latest_version)`` taken from the JSON
        response, or ``(False, None)`` on a non-200 status or any error
        (both are logged).
    """
    check_url = f"{UPDATE_URL_ZJ}/check_update/{CURRENT_VERSION}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(check_url) as response:
                if response.status != 200:
                    logger.error(f"检查更新失败,状态码: {response.status}")
                    return False, None
                payload = await response.json()
                return payload['update_available'], payload.get('latest_version')
    except Exception as e:
        logger.error(f"检查更新失败: {e}")
        return False, None
async def download_update(version):
    """Download *version* from the update server and stage it as ``<script>.new``.

    Args:
        version: Version identifier appended to the ``/download/`` URL.

    Returns:
        bool: ``True`` when a non-empty payload was written next to the
        running script (``sys.argv[0]``); ``False`` on a non-200 status, an
        empty payload, or any error (all are logged).
    """
    staged_path = f"{sys.argv[0]}.new"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{UPDATE_URL_ZJ}/download/{version}") as response:
                if response.status != 200:
                    logger.error(f"下载更新失败,状态码: {response.status}")
                    return False
                content = await response.read()
        # An empty payload must never be staged: applying it would replace
        # the running script with an empty file.
        if not content:
            logger.error("下载更新失败: 服务器返回空内容")
            return False
        with open(staged_path, 'wb') as f:
            f.write(content)
        return True
    except Exception as e:
        logger.error(f"下载更新失败: {e}")
        return False
async def apply_update():
    """Swap the staged ``<script>.new`` into place and re-execute the program.

    The running script (``sys.argv[0]``) is moved to ``<script>.old``, the
    staged file takes its place, and the process is replaced via
    ``os.execl``. On success this call does not return.

    Raises:
        Exception: Whatever made the swap or the re-exec fail. It is logged
            and re-raised. The previous script is restored first, but only
            if this call had actually moved it aside.
    """
    script_path = sys.argv[0]
    backup_path = f"{script_path}.old"
    staged_path = f"{script_path}.new"
    backed_up = False
    try:
        # Check the staged file before touching the running script.
        if not os.path.exists(staged_path):
            raise FileNotFoundError(f"Staged update not found: {staged_path}")
        # os.replace overwrites a backup left behind by an earlier update.
        os.replace(script_path, backup_path)
        backed_up = True
        os.replace(staged_path, script_path)
        logger.info("Update applied, restarting program...")
        os.execl(sys.executable, sys.executable, *sys.argv)
    except Exception as e:
        logger.error(f"Failed to apply update: {e}")
        # Roll back only a backup made by this call. A stale .old from a
        # previous update must never overwrite the current script.
        if backed_up and os.path.exists(backup_path):
            os.replace(backup_path, script_path)
        raise
async def self_update():
    """Check for, download and apply a newer client version.

    Returns:
        bool: ``False`` when no update is available, the server reports no
        version, the download fails, or applying the update fails. When the
        update is applied the process is normally replaced by
        ``apply_update`` and this call does not return; ``True`` is returned
        only if ``apply_update`` itself returns.
    """
    update_available, latest_version = await check_for_updates()
    if not update_available:
        logger.info("当前已是最新版本")
        return False
    if not latest_version:
        # Without a version there is nothing meaningful to request.
        logger.error("Update reported as available but no version was provided")
        return False
    logger.info(f"发现新版本 {latest_version},开始下载...")
    if not await download_update(latest_version):
        logger.error("更新下载失败")
        return False
    logger.info("下载完成,准备应用更新...")
    try:
        await apply_update()
    except Exception as e:
        # Report failure instead of raising: the periodic update checker
        # awaits this coroutine without a handler of its own.
        logger.error(f"Update could not be applied, continuing with the current version: {e}")
        return False
    return True
async def force_update(update_url):
    """Download a replacement script from *update_url* and apply it.

    NOTE(review): the payload is applied without any integrity or
    authenticity check; this relies on the caller supplying a trusted URL.

    Args:
        update_url: URL of the replacement script.

    Returns:
        bool: ``False`` when no URL is given, the server answers with a
        non-200 status or an empty payload, or anything fails (all are
        logged). When the update is applied the process is normally replaced
        by ``apply_update``; ``True`` is returned only if that call returns.
    """
    if not update_url:
        logger.error("Forced update failed: no update URL provided")
        return False
    try:
        logger.info(f"Downloading forced update from: {update_url}")
        async with aiohttp.ClientSession() as session:
            async with session.get(update_url) as response:
                if response.status != 200:
                    logger.error(f"Forced update download failed, status code: {response.status}")
                    return False
                content = await response.read()
        # Never stage an empty payload: applying it would replace the
        # running script with an empty file.
        if not content:
            logger.error("Forced update download failed: empty response body")
            return False
        with open(f"{sys.argv[0]}.new", 'wb') as f:
            f.write(content)
        logger.info("Forced update downloaded successfully")
        await apply_update()
        return True
    except Exception as e:
        logger.error(f"Forced update failed: {e}")
        return False
async def check_update_periodically():
    """Run ``self_update`` every ``CHECK_INTERVAL`` seconds, forever.

    A failing check is logged and the loop continues, matching the
    heartbeat and status loops. One failed update attempt must not end
    periodic checking.
    """
    while True:
        await asyncio.sleep(CHECK_INTERVAL)
        logger.info("Checking for updates...")
        try:
            await self_update()
        except Exception as e:
            logger.error(f"Periodic update check failed: {e}")
async def connect_with_retry():
    """Connect to ``SERVER_URL``, retrying until a connection is established.

    Returns immediately when the client is already connected; calling
    ``connect`` in that state would fail on every attempt and the retry loop
    would never end. The wait between attempts grows by 5 seconds per
    failure, capped at 60 seconds.
    """
    if getattr(sio, 'connected', False):
        logger.info("Already connected to server")
        return
    retry_count = 0
    while True:
        try:
            await sio.connect(SERVER_URL, auth={'token': TOKEN})
            logger.info("Successfully connected to server")
            return
        except Exception as e:
            retry_count += 1
            logger.error(f"Connection attempt {retry_count} failed: {e}")
            # Back off linearly, waiting at most 60 seconds.
            wait_time = min(60, 5 * retry_count)
            logger.info(f"Retrying in {wait_time} seconds...")
            await asyncio.sleep(wait_time)
async def main():
    """Load the persisted identity, then keep the client connected and reporting.

    Each pass connects to the server and runs the heartbeat, status and
    update-check loops together. When any of them fails, the remaining loops
    are cancelled so they are not duplicated on the next pass. The connection
    is then dropped and re-established after a 5-second pause.
    """
    load_client_info()  # load the persisted client identity
    while True:
        workers = []
        try:
            await connect_with_retry()
            workers = [
                asyncio.create_task(send_heartbeat()),
                asyncio.create_task(send_status()),
                asyncio.create_task(check_update_periodically()),
            ]
            await asyncio.gather(*workers)
        except Exception as e:
            logger.error(f"An error occurred in main loop: {e}")
        finally:
            # gather() does not cancel its other awaitables when one fails;
            # stop them explicitly before the next pass starts new ones.
            for worker in workers:
                worker.cancel()
            if workers:
                await asyncio.gather(*workers, return_exceptions=True)
        logger.info("Reconnecting...")
        # Start the next pass from a clean, disconnected state.
        if getattr(sio, 'connected', False):
            try:
                await sio.disconnect()
            except Exception as e:
                logger.error(f"Error while disconnecting: {e}")
        await asyncio.sleep(5)
if __name__ == '__main__':
    # Script entry point: run the client until it is interrupted. Ctrl-C is
    # logged as a normal shutdown; anything else is logged and re-raised.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Program terminated by user")
    except Exception as fatal:
        logger.critical(f"Unhandled exception: {fatal}")
        raise