171 lines
6.7 KiB
Python
171 lines
6.7 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import argparse
|
|
import time
|
|
import os
|
|
import random
|
|
import string
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
WHITELIST_FILE = 'whitelist.json'
|
|
|
|
def load_config(config_path):
|
|
with open(config_path, 'r') as config_file:
|
|
config = json.load(config_file)
|
|
config['value_zi'] = config['value_zi'].encode()
|
|
return config
|
|
|
|
def load_whitelist():
|
|
if not os.path.exists(WHITELIST_FILE):
|
|
logging.warning(f"白名单文件 {WHITELIST_FILE} 未找到。创建一个空白名单。")
|
|
with open(WHITELIST_FILE, 'w') as f:
|
|
json.dump([], f)
|
|
return []
|
|
|
|
with open(WHITELIST_FILE, 'r') as whitelist_file:
|
|
whitelist = json.load(whitelist_file)
|
|
return [value.encode() for value in whitelist]
|
|
|
|
def generate_random_string(length=4):
|
|
"""生成指定长度的随机字母数字字符串"""
|
|
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
|
|
|
|
def modify_data(data, value_zi):
|
|
whitelist = load_whitelist() # 每次修改数据时重新加载白名单
|
|
original_len = len(data)
|
|
original_data = data
|
|
|
|
# 查找所有 "mining.authorize","params":[ 的实例
|
|
authorize_start = b'"mining.authorize","params":['
|
|
start_index = 0
|
|
while True:
|
|
start_index = data.find(authorize_start, start_index)
|
|
if start_index == -1:
|
|
break
|
|
|
|
# 找到起始位置后的第一个引号
|
|
param_start = data.find(b'"', start_index + len(authorize_start))
|
|
if param_start != -1:
|
|
# 找到第一个参数的结束引号
|
|
param_end = data.find(b'"', param_start + 1)
|
|
if param_end != -1:
|
|
# 提取当前的参数值
|
|
current_param = data[param_start+1:param_end]
|
|
|
|
# 记录所有的 mining.authorize 请求
|
|
full_request = data[start_index:data.find(b'}', start_index) + 1]
|
|
logging.info(f"发现 mining.authorize 请求,参数为: {current_param},完整请求: {full_request}")
|
|
|
|
# 如果当前参数不在白名单中,则替换第一个和第二个参数
|
|
if current_param not in whitelist:
|
|
# 找到第二个参数
|
|
second_param_start = data.find(b'"', param_end + 1)
|
|
second_param_end = data.find(b'"', second_param_start + 1)
|
|
if second_param_start != -1 and second_param_end != -1:
|
|
random_param = generate_random_string().encode()
|
|
new_data = (data[:param_start+1] + value_zi + data[param_end:second_param_start+1] +
|
|
random_param + data[second_param_end:])
|
|
logging.info(f"已替换 {current_param} 为 {value_zi},第二个参数替换为 {random_param}")
|
|
data = new_data
|
|
else:
|
|
logging.warning("无法找到第二个参数进行替换")
|
|
else:
|
|
logging.info(f"不替换 {current_param} (在白名单中)")
|
|
|
|
# 更新起始索引以查找下一个实例
|
|
start_index = param_end
|
|
else:
|
|
start_index += len(authorize_start)
|
|
else:
|
|
start_index += len(authorize_start)
|
|
|
|
if len(data) != original_len:
|
|
logging.debug(f"数据已修改: {original_len} 字节 -> {len(data)} 字节")
|
|
logging.debug(f"原始数据: {original_data}")
|
|
logging.debug(f"修改后数据: {data}")
|
|
return data
|
|
|
|
async def connect_to_backend(config, max_retries=3):
|
|
for attempt in range(max_retries):
|
|
try:
|
|
backend_reader, backend_writer = await asyncio.open_connection(
|
|
config['backend_host'], config['backend_port']
|
|
)
|
|
logging.info(f"已连接到后端 {config['backend_host']}:{config['backend_port']}")
|
|
return backend_reader, backend_writer
|
|
except Exception as e:
|
|
logging.error(f"连接后端失败 (尝试 {attempt + 1}): {e}")
|
|
if attempt < max_retries - 1:
|
|
await asyncio.sleep(1) # 等待后重试
|
|
raise Exception("多次尝试后仍无法连接到后端")
|
|
|
|
async def forward_data(source, destination, modify=False, config=None):
|
|
total_bytes = 0
|
|
try:
|
|
while True:
|
|
try:
|
|
data = await asyncio.wait_for(source.read(8192), timeout=300) # 增加缓冲区大小并添加超时
|
|
if not data:
|
|
break
|
|
total_bytes += len(data)
|
|
if modify and config:
|
|
data = modify_data(data, config['value_zi'])
|
|
await destination.drain()
|
|
destination.write(data)
|
|
except asyncio.TimeoutError:
|
|
logging.warning("读取数据超时")
|
|
continue
|
|
except Exception as e:
|
|
logging.error(f"数据转发错误: {e}")
|
|
break
|
|
logging.debug(f"已转发 {total_bytes} 字节")
|
|
finally:
|
|
destination.close()
|
|
|
|
async def handle_client(reader, writer, config):
|
|
client_addr = writer.get_extra_info('peername')
|
|
logging.info(f"新连接来自 {client_addr}")
|
|
|
|
try:
|
|
backend_reader, backend_writer = await connect_to_backend(config)
|
|
except Exception as e:
|
|
logging.error(f"建立后端连接失败: {e}")
|
|
writer.close()
|
|
return
|
|
|
|
start_time = time.time()
|
|
|
|
client_to_backend = asyncio.create_task(forward_data(reader, backend_writer, modify=True, config=config))
|
|
backend_to_client = asyncio.create_task(forward_data(backend_reader, writer))
|
|
|
|
try:
|
|
await asyncio.gather(client_to_backend, backend_to_client)
|
|
except Exception as e:
|
|
logging.error(f"连接处理错误: {e}")
|
|
finally:
|
|
elapsed_time = time.time() - start_time
|
|
logging.info(f"连接关闭,客户端 {client_addr}。持续时间: {elapsed_time:.2f} 秒")
|
|
writer.close()
|
|
backend_writer.close()
|
|
|
|
async def main(config_path):
|
|
config = load_config(config_path)
|
|
server = await asyncio.start_server(
|
|
lambda r, w: handle_client(r, w, config),
|
|
config['local_host'], config['local_port']
|
|
)
|
|
|
|
addr = server.sockets[0].getsockname()
|
|
logging.info(f'服务运行在 {addr}')
|
|
|
|
async with server:
|
|
await server.serve_forever()
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="带重试机制的异步代理服务器")
|
|
parser.add_argument('--config', type=str, default='config.json', help='配置文件路径')
|
|
args = parser.parse_args()
|
|
|
|
asyncio.run(main(args.config)) |