2. Improve backend concurrency
   - Add copyright information
3. Optimize search result sorting rules
4. Add caching functionality and related configurations
5. Update documentation
281 lines · 10 KiB · Python
# -*- encoding: utf-8 -*-
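"""Spiders used by avhub.

AVSpider resolves an AV code to its video pages and collects the magnet links
listed there, running blocking HTTP requests and HTML parsing in a thread pool
so they can be awaited from async code. HacgSpider scrapes monthly collection
("月合集") posts, extracts BitTorrent info hashes, and maintains a local JSON
cache mapping title to magnet link.
"""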
import re
import json
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Set, Dict, Any

import aiohttp
from aiohttp import ClientTimeout
from bs4 import BeautifulSoup
from curl_cffi import requests
from omegaconf import DictConfig

from utils.logger import setup_logger

class AVSpider:
    def __init__(self, av_code, source_url, proxy_url, use_proxy, cfg: DictConfig):
        self.source_url = source_url
        self.av_code = av_code.lower()
        self.proxy_url = proxy_url if use_proxy else None
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
        }
        self.proxies = {
            "http": self.proxy_url,
            "https": self.proxy_url
        } if self.proxy_url else {}
        self.logger = setup_logger(cfg)
        self.executor = ThreadPoolExecutor(max_workers=10)

    def _fetch_url(self, url: str) -> str:
        """Fetch the URL content using curl_cffi."""
        try:
            response = requests.get(
                url,
                proxies=self.proxies,
                headers=self.headers,
                impersonate="chrome110",
                timeout=30
            )
            response.raise_for_status()
            return response.text
        except Exception as e:
            self.logger.error(f"Error fetching {url}: {str(e)}")
            return ""

    def _parse_video_page(self, html_content: str, code_str: str) -> Set[str]:
        """Parse the video page in the thread pool."""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            unique_links = set()
            for a_tag in soup.find_all('a'):
                alt_text = a_tag.get('alt')
                if alt_text and code_str in alt_text:
                    href = a_tag.get('href')
                    if href:
                        unique_links.add(href)
            return unique_links
        except Exception as e:
            self.logger.error(f"Error parsing video page: {str(e)}")
            return set()

    def _parse_magnet_page(self, html_content: str) -> List[List[str]]:
        """Parse the magnet-link page in the thread pool."""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            target_table = soup.find('table', class_='min-w-full')
            result = []

            if target_table is not None:
                rows = target_table.find_all('tr')
                for row in rows:
                    cols = row.find_all('td')
                    data = []
                    for col in cols:
                        links = col.find_all('a', rel='nofollow')
                        if links:
                            for l in links:
                                href = l['href']
                                if "keepshare.org" not in href:
                                    data.append(href)
                        # Keep the cell text unless it is just the "下载" (download)
                        # label or a keepshare.org reference.
                        text = col.get_text(strip=True)
                        if text != "下载" and "keepshare.org" not in text:
                            data.append(text)
                    if data:
                        result.append(data)
            return result
        except Exception as e:
            self.logger.error(f"Error parsing magnet page: {str(e)}")
            return []

    async def get_video_url(self) -> List[str]:
        """Fetch the video page links."""
        code_str = self.av_code.replace('-', '')
        match = re.match(r'([a-zA-Z]+)(\d+)', code_str)
        if not match:
            self.logger.error(f"Invalid AV code format: {self.av_code}")
            return []

        letters, digits = match.groups()
        code_str = f"{letters.lower()}-{digits}"
        url = f"{self.source_url}{code_str}"

        # Run the synchronous request in the thread pool
        loop = asyncio.get_event_loop()
        html_content = await loop.run_in_executor(self.executor, self._fetch_url, url)

        if not html_content:
            return []

        # Parse the HTML in the thread pool
        unique_links = await loop.run_in_executor(
            self.executor,
            self._parse_video_page,
            html_content,
            code_str
        )

        self.logger.info(f"Found {len(unique_links)} video URLs")
        return list(unique_links)

    async def get_magnet_links(self, links: List[str]) -> List[List[str]]:
        """Fetch all magnet links."""
        loop = asyncio.get_event_loop()
        tasks = []

        # Create a task to fetch the content of every page
        for link in links:
            task = loop.run_in_executor(self.executor, self._fetch_url, link)
            tasks.append(task)

        # Wait for all page content to be fetched
        html_contents = await asyncio.gather(*tasks)

        # Parse all pages in the thread pool
        parse_tasks = [
            loop.run_in_executor(self.executor, self._parse_magnet_page, content)
            for content in html_contents if content
        ]
        results = await asyncio.gather(*parse_tasks)

        # Merge all results
        all_results = []
        for result in results:
            all_results.extend(result)

        self.logger.info(f"Found {len(all_results)} magnet links")
        return all_results

    async def process_av_code(self) -> List[List[str]]:
        """Main entry point for processing an AV code."""
        try:
            video_links = await self.get_video_url()
            if not video_links:
                return []

            magnet_links = await self.get_magnet_links(video_links)
            return magnet_links
        except Exception as e:
            self.logger.error(f"Error processing AV code {self.av_code}: {str(e)}")
            return []

    def __del__(self):
        """Ensure the thread pool is shut down properly."""
        self.executor.shutdown(wait=False)


class HacgSpider:
    def __init__(self, url, filepath, cfg: DictConfig):
        self.url = url
        self.filepath = filepath
        self.logger = setup_logger(cfg)

    def get_pages(self):
        try:
            response = requests.get(self.url)
            response.raise_for_status()
        except requests.RequestException as e:
            self.logger.error(f"Request Error: {e}")
            return None

        html_content = response.text

        soup = BeautifulSoup(html_content, 'html.parser')
        div_ele = soup.find('div', class_='wp-pagenavi')
        page_text = div_ele.get_text() if div_ele else ''

        # The wp-pagenavi text contains "共N页" ("N pages in total"); extract N
        pages = None
        if "共" in page_text:
            pages = int(page_text.split('共')[1].split('页')[0])

        self.logger.info(f"Total pages found: {pages}")

        return pages

    def get_links(self, page):
        url = f'{self.url}/wp/page/{page}?s=%E5%90%88%E9%9B%86&submit=%E6%90%9C%E7%B4%A2'
        try:
            response = requests.get(url)
            response.raise_for_status()
        except requests.RequestException as e:
            self.logger.error(f"Request Error: {e}")
            return {}

        html_content = response.text

        soup = BeautifulSoup(html_content, 'html.parser')
        # Collect links whose anchor text mentions "月合集" (monthly collection)
        links = {}
        for a_tag in soup.find_all('a'):
            href = a_tag.get('href')
            text = a_tag.get_text(strip=True)
            if "月合集" in text:
                links[text] = href

        magnet_links = {}
        for title, link in links.items():
            try:
                response = requests.get(link)
                response.raise_for_status()
            except requests.RequestException as e:
                self.logger.error(f"Request Error: {e}")
                continue

            content = response.text
            # A 40-character hex string is treated as the BitTorrent info hash
            matches = re.findall(r'\b[a-f0-9]{40}\b', content)
            if matches:
                magnet_links[title] = f'magnet:?xt=urn:btih:{matches[0]}'

        self.logger.info(f"Magnet links extracted from page {page}")

        return magnet_links

    def update_json_file(self):
        if not os.path.exists(self.filepath) or os.path.getsize(self.filepath) == 0:
            results = {}
            total_pages = self.get_pages()
            if total_pages is None:
                self.logger.error("Unable to get total page count")
                return

            for i in range(1, total_pages + 1):
                new_data = self.get_links(i)
                results.update(new_data)
                self.logger.info(f'Page {i} processed (Full Update)')
        else:
            with open(self.filepath, 'r', encoding='utf-8') as file:
                results = json.load(file)

            total_pages = self.get_pages()
            if total_pages is None:
                self.logger.error("Unable to get total page count")
                return

            for i in range(1, total_pages + 1):
                new_data = self.get_links(i)
                all_exists = True

                for title, magnet_link in new_data.items():
                    if title not in results or results[title] != magnet_link:
                        all_exists = False
                        break

                if not all_exists:
                    # Merge so that existing entries take precedence over new ones
                    results = {**new_data, **results}
                    self.logger.info(f'Page {i} processed (Incremental Update)')

                if all_exists:
                    self.logger.info(f"Page {i} data already exists in the JSON file, stop updating")
                    break

        with open(self.filepath, 'w', encoding='utf-8') as file:
            json.dump(results, file, ensure_ascii=False, indent=4)

        self.logger.info("JSON file updated")
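

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). The config path,
# AV code, URLs, and proxy settings below are placeholders/assumptions; adjust
# them to match the project's actual configuration before running.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from omegaconf import OmegaConf

    # Assumed: a config file compatible with setup_logger(cfg)
    cfg = OmegaConf.load("config.yaml")

    async def demo():
        spider = AVSpider(
            av_code="abc-123",                         # placeholder code
            source_url="https://example.com/search/",  # placeholder source site
            proxy_url="http://127.0.0.1:7890",         # placeholder proxy
            use_proxy=False,
            cfg=cfg,
        )
        magnets = await spider.process_av_code()
        print(magnets)

    asyncio.run(demo())

    # HacgSpider is synchronous: refresh the local JSON cache of magnet links
    hacg = HacgSpider("https://example.com", "hacg_links.json", cfg)
    hacg.update_json_file()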