"""Asynchronous TCP proxy that rewrites ``mining.authorize`` requests.

Traffic from each client is forwarded to a configured backend.  On the
client -> backend direction, every ``mining.authorize`` request whose first
parameter is not listed in the whitelist file has that parameter replaced by
the configured ``value_zi`` and its second parameter replaced by a random
alphanumeric string.  Backend -> client traffic is forwarded untouched.
"""
import argparse
import asyncio
import json
import logging
import os
import random
import string
import time

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

WHITELIST_FILE = 'whitelist.json'

# Byte sequence that introduces the parameter list of an authorize request.
_AUTHORIZE_MARKER = b'"mining.authorize","params":['


def load_config(config_path):
    """Load the JSON configuration file at *config_path*.

    The ``value_zi`` entry is converted from ``str`` to ``bytes`` so it can be
    spliced directly into the raw byte stream.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if ``value_zi`` is missing.
    """
    with open(config_path, 'r', encoding='utf-8') as config_file:
        config = json.load(config_file)
    config['value_zi'] = config['value_zi'].encode()
    return config


def load_whitelist():
    """Return the whitelist entries from ``WHITELIST_FILE`` as a list of bytes.

    If the file does not exist, a warning is logged, an empty whitelist file
    is created, and an empty list is returned.
    """
    if not os.path.exists(WHITELIST_FILE):
        logging.warning(f"白名单文件 {WHITELIST_FILE} 未找到。创建一个空白名单。")
        with open(WHITELIST_FILE, 'w', encoding='utf-8') as f:
            json.dump([], f)
        return []

    with open(WHITELIST_FILE, 'r', encoding='utf-8') as whitelist_file:
        whitelist = json.load(whitelist_file)
    return [value.encode() for value in whitelist]


def generate_random_string(length=4):
    """Return a random alphanumeric string of the given length."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


def modify_data(data: bytes, value_zi: bytes) -> bytes:
    """Rewrite non-whitelisted ``mining.authorize`` requests found in *data*.

    For every occurrence of the authorize marker, the first quoted parameter
    is compared against the whitelist.  If it is not whitelisted, it is
    replaced by *value_zi* and the second quoted parameter is replaced by a
    fresh random string.  Both parameters must appear before the ``]`` that
    closes the parameter list; otherwise the request is left untouched so
    that bytes belonging to a following message are never overwritten.

    NOTE: only the bytes in *data* are inspected; a request that is split
    across two separately forwarded chunks is not recognised here.

    Args:
        data: Raw bytes read from the client.
        value_zi: Replacement for the first parameter.

    Returns:
        The (possibly rewritten) bytes.
    """
    # Fast path: skip the whitelist disk read when there is nothing to rewrite.
    if _AUTHORIZE_MARKER not in data:
        return data

    # The whitelist is reloaded on each rewrite so edits take effect live.
    whitelist = set(load_whitelist())
    original_data = data
    search_from = 0

    while True:
        marker_at = data.find(_AUTHORIZE_MARKER, search_from)
        if marker_at == -1:
            break
        after_marker = marker_at + len(_AUTHORIZE_MARKER)

        # Locate the first quoted parameter.
        param_start = data.find(b'"', after_marker)
        param_end = data.find(b'"', param_start + 1) if param_start != -1 else -1
        if param_end == -1 or b']' in data[after_marker:param_start]:
            # No string parameter inside this request's parameter list.
            search_from = after_marker
            continue

        current_param = data[param_start + 1:param_end]

        # Log every authorize request; fall back to the rest of the buffer
        # when the closing brace has not arrived in this chunk.
        request_end = data.find(b'}', marker_at)
        if request_end == -1:
            full_request = data[marker_at:]
        else:
            full_request = data[marker_at:request_end + 1]
        logging.info(f"发现 mining.authorize 请求,参数为: {current_param},完整请求: {full_request}")

        if current_param in whitelist:
            logging.info(f"不替换 {current_param} (在白名单中)")
            search_from = param_end
            continue

        # Locate the second quoted parameter, which must come before the
        # closing bracket of the parameter list.
        second_param_start = data.find(b'"', param_end + 1)
        second_param_end = (
            data.find(b'"', second_param_start + 1) if second_param_start != -1 else -1
        )
        if second_param_end == -1 or b']' in data[param_end + 1:second_param_start]:
            logging.warning("无法找到第二个参数进行替换")
            search_from = param_end
            continue

        random_param = generate_random_string().encode()
        rewritten_head = b''.join((
            data[:param_start + 1],
            value_zi,
            data[param_end:second_param_start + 1],
            random_param,
        ))
        data = rewritten_head + data[second_param_end:]
        logging.info(f"已替换 {current_param} 为 {value_zi},第二个参数替换为 {random_param}")

        # Resume right after the rewritten parameters, using an offset that is
        # valid in the *new* buffer (the replacement may change its length).
        search_from = len(rewritten_head)

    if data != original_data:
        logging.debug(f"数据已修改: {len(original_data)} 字节 -> {len(data)} 字节")
        logging.debug(f"原始数据: {original_data}")
        logging.debug(f"修改后数据: {data}")

    return data


async def connect_to_backend(config, max_retries=3):
    """Open a connection to the configured backend, retrying on failure.

    Args:
        config: Mapping providing ``backend_host`` and ``backend_port``.
        max_retries: Maximum number of connection attempts.

    Returns:
        A ``(reader, writer)`` stream pair for the backend connection.

    Raises:
        Exception: if every attempt fails; the last underlying error is
            attached as the cause.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            backend_reader, backend_writer = await asyncio.open_connection(
                config['backend_host'], config['backend_port']
            )
            logging.info(f"已连接到后端 {config['backend_host']}:{config['backend_port']}")
            return backend_reader, backend_writer
        except Exception as e:
            last_error = e
            logging.error(f"连接后端失败 (尝试 {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(1)  # wait before the next attempt
    raise Exception("多次尝试后仍无法连接到后端") from last_error


async def forward_data(source, destination, modify=False, config=None):
    """Copy bytes from *source* to *destination* until EOF or an error.

    Args:
        source: Stream reader to read from.
        destination: Stream writer to write to; it is closed on exit.
        modify: When true (and *config* is given), each chunk is passed
            through ``modify_data`` before being written.
        config: Configuration mapping providing ``value_zi``.
    """
    total_bytes = 0
    try:
        while True:
            try:
                # A read timeout is only logged; the loop keeps waiting.
                data = await asyncio.wait_for(source.read(8192), timeout=300)
                if not data:
                    break
                total_bytes += len(data)
                if modify and config:
                    data = modify_data(data, config['value_zi'])
                # Write first, then drain, so back-pressure applies to the
                # chunk that was just queued.
                destination.write(data)
                await destination.drain()
            except asyncio.TimeoutError:
                logging.warning("读取数据超时")
                continue
            except Exception as e:
                logging.error(f"数据转发错误: {e}")
                break
        logging.debug(f"已转发 {total_bytes} 字节")
    finally:
        destination.close()


async def handle_client(reader, writer, config):
    """Serve one client connection by relaying it to the backend.

    Client -> backend traffic is rewritten via ``modify_data``; backend ->
    client traffic is forwarded unchanged.  Both writers are closed when the
    relay ends.
    """
    client_addr = writer.get_extra_info('peername')
    logging.info(f"新连接来自 {client_addr}")

    try:
        backend_reader, backend_writer = await connect_to_backend(config)
    except Exception as e:
        logging.error(f"建立后端连接失败: {e}")
        writer.close()
        return

    # Monotonic clock: the measured duration is immune to wall-clock changes.
    start_time = time.monotonic()

    client_to_backend = asyncio.create_task(
        forward_data(reader, backend_writer, modify=True, config=config)
    )
    backend_to_client = asyncio.create_task(forward_data(backend_reader, writer))

    try:
        await asyncio.gather(client_to_backend, backend_to_client)
    except Exception as e:
        logging.error(f"连接处理错误: {e}")
    finally:
        elapsed_time = time.monotonic() - start_time
        logging.info(f"连接关闭,客户端 {client_addr}。持续时间: {elapsed_time:.2f} 秒")
        writer.close()
        backend_writer.close()


async def main(config_path):
    """Load the configuration and run the proxy server until cancelled."""
    config = load_config(config_path)

    async def on_client(reader, writer):
        await handle_client(reader, writer, config)

    server = await asyncio.start_server(
        on_client,
        config['local_host'],
        config['local_port']
    )

    addr = server.sockets[0].getsockname()
    logging.info(f'服务运行在 {addr}')

    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="带重试机制的异步代理服务器")
    parser.add_argument('--config', type=str, default='config.json', help='配置文件路径')
    args = parser.parse_args()

    asyncio.run(main(args.config))