# f2pool-aleo-mine/so.py
# coolsd 79716ef049 更新 so.py (update so.py)
# pm2 start python3 --name bg1810 -- so6.py --config bg.json
# 2024-09-22 13:46:30 +08:00
#
# 171 lines
# 6.7 KiB
# Python

import asyncio
import json
import logging
import argparse
import time
import os
import random
import string
# Configure the root logger once at import time: DEBUG level, with a
# timestamp and the level name in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Relative path of the JSON file holding the whitelist entries.
WHITELIST_FILE = 'whitelist.json'
def load_config(config_path):
    """Load the proxy configuration from a JSON file.

    The ``value_zi`` entry is converted from ``str`` to UTF-8 ``bytes`` so it
    can be spliced directly into the raw byte stream by the caller.

    Args:
        config_path: Path of the JSON configuration file.

    Returns:
        The parsed configuration dict, with ``config['value_zi']`` as bytes.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the ``value_zi`` key is missing.
    """
    # JSON text is UTF-8; do not depend on the platform's locale encoding,
    # which differs between systems and would garble non-ASCII values.
    with open(config_path, 'r', encoding='utf-8') as config_file:
        config = json.load(config_file)
    config['value_zi'] = config['value_zi'].encode('utf-8')
    return config
def load_whitelist():
    """Load the whitelist entries from ``WHITELIST_FILE``.

    When the file does not exist, a warning is logged, a file containing an
    empty JSON list is created in its place, and an empty list is returned.

    Returns:
        A list of ``bytes``: every string entry of the JSON document encoded
        as UTF-8. Entries that are not strings are skipped with a warning.

    Raises:
        OSError: If the file exists but cannot be read, or cannot be created.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    try:
        # Open directly instead of checking for existence first: this avoids
        # the window between the check and the open.
        with open(WHITELIST_FILE, 'r', encoding='utf-8') as whitelist_file:
            whitelist = json.load(whitelist_file)
    except FileNotFoundError:
        logging.warning(f"白名单文件 {WHITELIST_FILE} 未找到。创建一个空白名单。")
        with open(WHITELIST_FILE, 'w', encoding='utf-8') as f:
            json.dump([], f)
        return []

    entries = []
    for value in whitelist:
        if isinstance(value, str):
            entries.append(value.encode('utf-8'))
        else:
            # A stray number/null in the file must not abort the caller with
            # an AttributeError on .encode().
            logging.warning(f"白名单条目 {value!r} 不是字符串,已忽略")
    return entries
def generate_random_string(length=4):
    """Return a random string of ``length`` ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)
def modify_data(data, value_zi):
    """Rewrite the parameters of ``mining.authorize`` requests found in ``data``.

    Every occurrence of the byte pattern ``"mining.authorize","params":[`` is
    located. The first quoted parameter is compared with the list returned by
    ``load_whitelist()``; when it is not in that list, the first parameter is
    replaced with ``value_zi`` and the second quoted parameter with the output
    of ``generate_random_string()``. Whitelisted requests and all other bytes
    are returned unchanged.

    The match is purely textual (no JSON parsing), so it has limits:
    a request written with different spacing or key order is not recognised,
    a request split across two calls is not recognised, and a ``]`` inside a
    parameter string ends the parameter search early.

    Args:
        data: Raw bytes of one read from the stream.
        value_zi: Replacement bytes for the first parameter.

    Returns:
        The (possibly rewritten) bytes.
    """
    authorize_start = b'"mining.authorize","params":['

    # Most chunks carry no authorize request; skip the whitelist file read
    # (disk I/O on the event loop) for them.
    if authorize_start not in data:
        return data

    # Reloaded on every call that needs it, so edits to the whitelist file
    # take effect without a restart.
    whitelist = load_whitelist()
    original_data = data
    start_index = 0

    while True:
        start_index = data.find(authorize_start, start_index)
        if start_index == -1:
            break

        params_open = start_index + len(authorize_start)
        # Keep every quote search inside this request's params array so a
        # request with fewer than two parameters cannot cause quoted text of
        # a following message to be overwritten.
        params_close = data.find(b']', params_open)
        if params_close == -1:
            params_close = len(data)

        # Opening and closing quote of the first parameter.
        param_start = data.find(b'"', params_open, params_close)
        param_end = -1
        if param_start != -1:
            param_end = data.find(b'"', param_start + 1, params_close)
        if param_end == -1:
            start_index = params_open
            continue

        current_param = data[param_start + 1:param_end]

        # Log every authorize request; fall back to the rest of the buffer
        # when the closing brace is not in this chunk.
        request_end = data.find(b'}', start_index)
        if request_end == -1:
            full_request = data[start_index:]
        else:
            full_request = data[start_index:request_end + 1]
        logging.info(f"发现 mining.authorize 请求,参数为: {current_param},完整请求: {full_request}")

        if current_param in whitelist:
            logging.info(f"不替换 {current_param} (在白名单中)")
            start_index = param_end
            continue

        # Opening and closing quote of the second parameter.
        second_param_start = data.find(b'"', param_end + 1, params_close)
        second_param_end = -1
        if second_param_start != -1:
            second_param_end = data.find(b'"', second_param_start + 1, params_close)
        if second_param_end == -1:
            logging.warning("无法找到第二个参数进行替换")
            start_index = param_end
            continue

        random_param = generate_random_string().encode()
        rewritten_head = (data[:param_start + 1] + value_zi +
                          data[param_end:second_param_start + 1] + random_param)
        data = rewritten_head + data[second_param_end:]
        logging.info(f"已替换 {current_param} -> {value_zi},第二个参数替换为 {random_param}")

        # Resume right after the rewritten part, measured in the NEW buffer;
        # an offset taken from the old buffer could skip a later request when
        # the replacement is shorter than the original value.
        start_index = len(rewritten_head)

    # Compare content, not length: a same-length replacement is still a change.
    if data != original_data:
        logging.debug(f"数据已修改: {len(original_data)} 字节 -> {len(data)} 字节")
        logging.debug(f"原始数据: {original_data}")
        logging.debug(f"修改后数据: {data}")
    return data
async def connect_to_backend(config, max_retries=3):
    """Open a TCP connection to the configured backend, retrying on failure.

    Args:
        config: Mapping with ``backend_host`` and ``backend_port`` entries.
        max_retries: Maximum number of connection attempts. One second is
            slept between consecutive attempts.

    Returns:
        The ``(reader, writer)`` pair from ``asyncio.open_connection``.

    Raises:
        ConnectionError: If every attempt failed. The last underlying
            ``OSError`` is attached as ``__cause__`` so the root cause is
            not lost.
        KeyError: If ``backend_host`` or ``backend_port`` is missing.
    """
    host = config['backend_host']
    port = config['backend_port']
    last_error = None

    for attempt in range(max_retries):
        try:
            backend_reader, backend_writer = await asyncio.open_connection(host, port)
        except OSError as e:
            # Refused connections, unreachable hosts and name-resolution
            # failures are all OSError subclasses and worth retrying.
            last_error = e
            logging.error(f"连接后端失败 (尝试 {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(1)  # wait before the next attempt
        else:
            logging.info(f"已连接到后端 {host}:{port}")
            return backend_reader, backend_writer

    raise ConnectionError("多次尝试后仍无法连接到后端") from last_error
async def forward_data(source, destination, modify=False, config=None):
    """Copy bytes from ``source`` to ``destination`` until EOF or an error.

    Data is read in chunks of up to 8192 bytes. A read that produces nothing
    for 300 seconds only logs a warning and the loop keeps waiting. Any other
    exception is logged and ends the loop. ``destination`` is always closed
    when the function exits.

    Args:
        source: Stream reader to read from.
        destination: Stream writer to write to.
        modify: When true (and ``config`` is truthy), every chunk is passed
            through ``modify_data`` before being written.
        config: Configuration mapping; ``config['value_zi']`` is handed to
            ``modify_data``.
    """
    total_bytes = 0
    try:
        while True:
            try:
                data = await asyncio.wait_for(source.read(8192), timeout=300)
                if not data:
                    break  # EOF
                total_bytes += len(data)
                if modify and config:
                    data = modify_data(data, config['value_zi'])
                # Write first, then drain: drain() applies back-pressure for
                # the chunk that was just queued, so a slow peer cannot make
                # the write buffer grow without bound.
                destination.write(data)
                await destination.drain()
            except asyncio.TimeoutError:
                logging.warning("读取数据超时")
                continue
            except Exception as e:
                logging.error(f"数据转发错误: {e}")
                break
        logging.debug(f"已转发 {total_bytes} 字节")
    finally:
        destination.close()
async def handle_client(reader, writer, config):
    """Serve one accepted client connection.

    Connects to the backend via ``connect_to_backend``, then runs two
    ``forward_data`` tasks, one per direction. The client-to-backend task is
    started with ``modify=True``; the backend-to-client task forwards
    unchanged. Both writers are closed when the tasks finish.

    Args:
        reader: Stream reader of the client connection.
        writer: Stream writer of the client connection.
        config: Configuration mapping passed on to ``connect_to_backend`` and
            ``forward_data``.
    """
    client_addr = writer.get_extra_info('peername')
    logging.info(f"新连接来自 {client_addr}")

    try:
        backend_reader, backend_writer = await connect_to_backend(config)
    except Exception as e:
        logging.error(f"建立后端连接失败: {e}")
        writer.close()
        return

    # Monotonic clock: the reported duration must not be distorted by
    # wall-clock adjustments (NTP steps, manual changes) during the session.
    start_time = time.monotonic()
    client_to_backend = asyncio.create_task(
        forward_data(reader, backend_writer, modify=True, config=config)
    )
    backend_to_client = asyncio.create_task(forward_data(backend_reader, writer))

    try:
        await asyncio.gather(client_to_backend, backend_to_client)
    except Exception as e:
        logging.error(f"连接处理错误: {e}")
    finally:
        elapsed_time = time.monotonic() - start_time
        logging.info(f"连接关闭,客户端 {client_addr}。持续时间: {elapsed_time:.2f}")
        writer.close()
        backend_writer.close()
async def main(config_path):
    """Load the configuration and run the listening server.

    Args:
        config_path: Path of the JSON configuration file handed to
            ``load_config``.
    """
    config = load_config(config_path)

    async def on_connect(client_reader, client_writer):
        # Hand every accepted connection to handle_client together with the
        # configuration loaded above.
        await handle_client(client_reader, client_writer, config)

    listen_host = config['local_host']
    listen_port = config['local_port']
    server = await asyncio.start_server(on_connect, listen_host, listen_port)

    first_socket = server.sockets[0]
    addr = first_socket.getsockname()
    logging.info(f'服务运行在 {addr}')

    async with server:
        await server.serve_forever()
if __name__ == "__main__":
    # Script entry point: read the --config option and run main() with it.
    cli = argparse.ArgumentParser(description="带重试机制的异步代理服务器")
    cli.add_argument(
        '--config',
        default='config.json',
        type=str,
        help='配置文件路径',
    )
    options = cli.parse_args()
    asyncio.run(main(options.config))