添加 khd04.py

04版本
This commit is contained in:
coolsd 2024-09-22 11:20:07 +08:00
parent cd8e030b66
commit 6c33b748b7

359
khd04.py Normal file
View File

@ -0,0 +1,359 @@
import socketio
import asyncio
import psutil
import platform
import os
import subprocess
import json
import aiohttp
import logging
import sys
import time
import random
import string
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
CURRENT_VERSION = "1.0.4" # 当前客户端版本
UPDATE_URL_ZJ = "https://update.uqdm.com" # 更新服务器地址
CHECK_INTERVAL = 3600 # 每小时检查一次更新
sio = socketio.AsyncClient()
SERVER_URL = 'http://ore.uqdm.com:5003'
TOKEN = "131417"
def get_client_id():
client_id = os.environ.get('name')
if not client_id:
client_id = platform.node()
# 检查是否在Docker环境中运行
if os.path.exists('/.dockerenv'):
# 生成随机的2位数字字母组合
random_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=2))
client_id += f'-D{random_suffix}'
return client_id
CLIENT_ID = get_client_id()
heartbeat_time = 30
report_time = 10
program_status = "未运行"
program_process = None
PROGRAM_NAME = None
PROGRAM_PATH = None
COMMAND_PARAMS = None
AUTO_RESTART = False
DAEMON_ENABLED = False
DAEMON_PROCESS = None
async def get_system_info():
try:
return {
'hostname': platform.node(),
'os': platform.platform(),
'cpu': psutil.cpu_percent(),
'memory': psutil.virtual_memory().percent,
'disk': psutil.disk_usage('/').percent,
'ip_address': await get_ip_address(),
'program_status': program_status
}
except Exception as e:
logger.error(f"Error getting system info: {e}")
return {}
async def get_ip_address():
try:
return subprocess.check_output(['hostname', '-I']).decode().split()[0]
except Exception as e:
logger.error(f"Error getting IP address: {e}")
return 'N/A'
@sio.event
async def connect():
logger.info(f'Connected to server. Client ID: {CLIENT_ID}')
await register_client()
async def register_client():
try:
await sio.emit('register', {
'client_id': CLIENT_ID,
'system_info': await get_system_info()
})
logger.info(f"Client registered: {CLIENT_ID}")
except Exception as e:
logger.error(f"Error registering client: {e}")
@sio.event
async def disconnect():
logger.info('Disconnected from server')
@sio.on('update_client_id')
async def on_update_client_id(data):
global CLIENT_ID
new_id = data['new_id']
logger.info(f"Received new client ID from server: {new_id}")
CLIENT_ID = new_id
# 可能需要更新本地存储的 ID 或进行其他相关操作
@sio.on('execute_command')
async def on_command(data):
global program_status, program_process
command = data['command']
params = data.get('params', {})
result = {'status': 'executed', 'output': '', 'error': ''}
logger.info(f"Received command: {command} with params: {params}")
try:
if command == 'manual':
manual_command = params.get('manualCommand')
if manual_command:
logger.info(f"Executing manual command: {manual_command}")
process = await asyncio.create_subprocess_shell(
manual_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result['output'] = stdout.decode()
result['error'] = stderr.decode()
result['status'] = 'success' if process.returncode == 0 else 'error'
else:
result['status'] = 'error'
result['error'] = "No manual command provided"
elif command == 'start':
if program_status == "未运行":
program_path = params.get('programPath', PROGRAM_PATH)
command_params = params.get('commandParams', COMMAND_PARAMS)
full_command = f"{program_path} {command_params}"
logger.info(f"Starting program: {full_command}")
program_process = await asyncio.create_subprocess_shell(
full_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
program_status = "运行中"
result['status'] = 'started'
result['output'] = "Program started successfully"
else:
result['status'] = 'already running'
result['output'] = "Program is already running"
elif command == 'stop':
if program_process:
logger.info("Stopping program")
program_process.terminate()
await program_process.wait()
program_process = None
program_status = "未运行"
result['status'] = 'stopped'
result['output'] = "Program stopped successfully"
else:
result['status'] = 'not running'
result['output'] = "Program is not running"
elif command == 'update':
logger.info("Initiating update process")
update_success = await self_update()
if update_success:
result['status'] = 'updated'
result['output'] = "Program updated successfully"
else:
result['status'] = 'update failed'
result['error'] = "Failed to update program"
elif command == 'restart':
logger.info("Restarting program")
if program_process:
program_process.terminate()
await program_process.wait()
program_path = params.get('programPath', PROGRAM_PATH)
command_params = params.get('commandParams', COMMAND_PARAMS)
full_command = f"{program_path} {command_params}"
program_process = await asyncio.create_subprocess_shell(
full_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
program_status = "运行中"
result['status'] = 'restarted'
result['output'] = "Program restarted successfully"
elif command == 'force_update':
logger.info("Initiating forced update process")
update_success = await force_update(params.get('update_url'))
if update_success:
result['status'] = 'updated'
result['output'] = "Program forcefully updated successfully"
else:
result['status'] = 'update failed'
result['error'] = "Failed to forcefully update program"
else:
result['status'] = 'unknown command'
result['error'] = f"Unknown command: {command}"
except Exception as e:
logger.error(f"Error executing command {command}: {e}")
result['status'] = 'error'
result['error'] = str(e)
result['program_status'] = program_status
logger.info(f"Sending command result: {result}")
await sio.emit('command_result', {
'client_id': CLIENT_ID,
'command': command,
'result': result
})
@sio.on('update_settings')
async def on_update_settings(data):
global heartbeat_time, report_time
heartbeat_time = data.get('heartbeatTime', heartbeat_time)
report_time = data.get('reportTime', report_time)
logger.info(f"Updated settings: heartbeat_time={heartbeat_time}, report_time={report_time}")
async def send_heartbeat():
while True:
try:
await sio.emit('heartbeat', {
'client_id': CLIENT_ID,
'system_info': await get_system_info()
})
logger.debug("Heartbeat sent")
except Exception as e:
logger.error(f"Error sending heartbeat: {e}")
await asyncio.sleep(heartbeat_time)
async def send_status():
while True:
try:
status_data = await get_system_info()
await sio.emit('status_update', {
'client_id': CLIENT_ID,
'system_info': status_data
})
logger.debug("Status update sent")
except Exception as e:
logger.error(f"Error sending status update: {e}")
await asyncio.sleep(report_time)
async def check_for_updates():
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{UPDATE_URL_ZJ}/check_update/{CURRENT_VERSION}") as response:
if response.status == 200:
data = await response.json()
return data['update_available'], data.get('latest_version')
else:
logger.error(f"检查更新失败,状态码: {response.status}")
return False, None
except Exception as e:
logger.error(f"检查更新失败: {e}")
return False, None
async def download_update(version):
try:
async with aiohttp.ClientSession() as session:
async with session.get(f"{UPDATE_URL_ZJ}/download/{version}") as response:
if response.status == 200:
content = await response.read()
with open(f"{sys.argv[0]}.new", 'wb') as f:
f.write(content)
return True
else:
logger.error(f"下载更新失败,状态码: {response.status}")
return False
except Exception as e:
logger.error(f"下载更新失败: {e}")
return False
async def apply_update():
try:
if os.path.exists(f"{sys.argv[0]}.old"):
os.remove(f"{sys.argv[0]}.old")
os.rename(sys.argv[0], f"{sys.argv[0]}.old")
os.rename(f"{sys.argv[0]}.new", sys.argv[0])
logger.info("Update applied, restarting program...")
os.execl(sys.executable, sys.executable, *sys.argv)
except Exception as e:
logger.error(f"Failed to apply update: {e}")
if os.path.exists(f"{sys.argv[0]}.old"):
os.rename(f"{sys.argv[0]}.old", sys.argv[0])
raise
async def self_update():
update_available, latest_version = await check_for_updates()
if update_available:
logger.info(f"发现新版本 {latest_version},开始下载...")
if await download_update(latest_version):
logger.info("下载完成,准备应用更新...")
await apply_update()
return True
else:
logger.error("更新下载失败")
return False
else:
logger.info("当前已是最新版本")
return False
async def force_update(update_url):
try:
logger.info(f"Downloading forced update from: {update_url}")
async with aiohttp.ClientSession() as session:
async with session.get(update_url) as response:
if response.status == 200:
content = await response.read()
with open(f"{sys.argv[0]}.new", 'wb') as f:
f.write(content)
logger.info("Forced update downloaded successfully")
await apply_update()
return True
else:
logger.error(f"Forced update download failed, status code: {response.status}")
return False
except Exception as e:
logger.error(f"Forced update failed: {e}")
return False
async def check_update_periodically():
while True:
await asyncio.sleep(CHECK_INTERVAL)
logger.info("Checking for updates...")
await self_update()
async def connect_with_retry():
retry_count = 0
while True:
try:
await sio.connect(SERVER_URL, auth={'token': TOKEN})
logger.info("Successfully connected to server")
return
except Exception as e:
retry_count += 1
logger.error(f"Connection attempt {retry_count} failed: {e}")
wait_time = min(60, 5 * retry_count) # 逐步增加等待时间最多等待60秒
logger.info(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
async def main():
while True:
try:
await connect_with_retry()
await asyncio.gather(
send_heartbeat(),
send_status(),
check_update_periodically()
)
except Exception as e:
logger.error(f"An error occurred in main loop: {e}")
logger.info("Reconnecting...")
await asyncio.sleep(5)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Program terminated by user")
except Exception as e:
logger.critical(f"Unhandled exception: {e}")
raise