diff --git a/app/services/material.py b/app/services/material.py
index 177a0d3..6c6e6e6 100644
--- a/app/services/material.py
+++ b/app/services/material.py
@@ -1,14 +1,16 @@
+import json
 import os
 import random
-from typing import List
+import subprocess
+from typing import List, Optional
 from urllib.parse import urlencode
 
 import requests
 from loguru import logger
-from moviepy.video.io.VideoFileClip import VideoFileClip
 
 from app.config import config
 from app.models.schema import MaterialInfo, VideoAspect, VideoConcatMode
+from app.services import llm
 from app.utils import utils
 
 requested_count = 0
@@ -45,7 +47,7 @@ def search_videos_pexels(
         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
     }
     # Build URL
-    params = {"query": search_term, "page": 1, "per_page": 80, "orientation": "landscape", "size": "medium","locale":"en-US"}
+    params = {"query": search_term, "page": 1, "per_page": 5, "orientation": "landscape", "size": "medium","locale":"en-US"}
     query_url = f"https://api.pexels.com/videos/search?{urlencode(params)}"
     logger.info(f"searching videos: {query_url}, with proxies: {config.proxy}")
 
@@ -100,7 +102,7 @@ def search_videos_pexels(
                 item.url = best_landscape_file["link"]  # 使用最佳版本的链接
                 item.duration = duration
                 video_items.append(item)
-
+                logger.info("选取的Mp4链接地址为{}".format(item.url))
         return video_items
 
     except Exception as e:
@@ -108,60 +110,172 @@ def search_videos_pexels(
     return []
 
 
 def search_videos_pixabay(
     search_term: str,
     minimum_duration: int,
     video_aspect: VideoAspect = VideoAspect.portrait,
+    category: str = "",
 ) -> List[MaterialInfo]:
+    """
+    Search Pixabay for videos, relaxing the filters until a query succeeds.
+
+    Up to three queries are issued, from strictest to broadest: with
+    ``category`` and ``editors_choice``, then without ``category``, then
+    without ``editors_choice``. The first non-empty result list is returned;
+    an empty list means every attempt failed.
+    """
     aspect = VideoAspect(video_aspect)
-
     video_width, video_height = aspect.to_resolution()
-
     api_key = get_api_key("pixabay_api_keys")
-    # Build URL
-    params = {
-        "q": search_term,
-        "video_type": "film",  # Accepted values: "all", "film", "animation"
-        "per_page": 50,
-        "key": api_key,
-    }
-    query_url = f"https://pixabay.com/api/videos/?{urlencode(params)}"
-    logger.info(f"searching videos: {query_url}, with proxies: {config.proxy}")
 
-    try:
-        r = requests.get(
-            query_url, proxies=config.proxy, verify=False, timeout=(30, 60)
-        )
-        response = r.json()
-        video_items = []
-        if "hits" not in response:
-            logger.error(f"search videos failed: {response}")
-            return video_items
-        videos = response["hits"]
-        # loop through each video in the result
-        for v in videos:
-            duration = v["duration"]
-            # check if video has desired minimum duration
-            if duration < minimum_duration:
-                continue
-            video_files = v["videos"]
-            # loop through each url to determine the best quality
-            for video_type in video_files:
-                video = video_files[video_type]
-                w = int(video["width"])
-                # h = int(video["height"])
-                if w >= video_width:
+    def perform_search(params):
+        """Run one Pixabay query and return the matching MaterialInfo items."""
+        params["key"] = api_key
+        query_url = f"https://pixabay.com/api/videos/?{urlencode(params)}"
+        logger.info(f"Searching videos: {query_url}, with proxies: {config.proxy}")
+        try:
+            r = requests.get(
+                query_url,
+                proxies=config.proxy,
+                verify=False,
+                timeout=(30, 60),
+            )
+            r.raise_for_status()
+            response = r.json()
+            if "hits" not in response or not response["hits"]:
+                return []
+
+            video_items = []
+            for v in response["hits"]:
+                duration = v.get("duration")
+                if not duration or duration < minimum_duration:
+                    continue
+
+                video_files = v.get("videos", {})
+                best_video = None
+                # Simplified logic to find a suitable video rendition
+                for size in ["large", "medium", "small", "tiny"]:
+                    rendition = video_files.get(size)
+                    if not rendition or not rendition.get("url"):
+                        continue
+
+                    width = rendition.get("width", 0)
+                    height = rendition.get("height", 0)
+
+                    is_portrait = height > width
+                    is_landscape = width > height
+
+                    if aspect == VideoAspect.portrait and is_portrait:
+                        best_video = rendition
+                        break
+                    elif aspect != VideoAspect.portrait and is_landscape:
+                        best_video = rendition
+                        break
+
+                # Fallback to any available video if exact aspect not found
+                if not best_video:
+                    for size in ["large", "medium", "small", "tiny"]:
+                        if video_files.get(size) and video_files.get(size).get("url"):
+                            best_video = video_files.get(size)
+                            break
+
+                if best_video:
                     item = MaterialInfo()
                     item.provider = "pixabay"
-                    item.url = video["url"]
+                    item.url = best_video.get("url")
                     item.duration = duration
                     video_items.append(item)
-                    break
-        return video_items
-    except Exception as e:
-        logger.error(f"search videos failed: {str(e)}")
+
+            return video_items
 
-    return []
+        except requests.exceptions.RequestException as e:
+            logger.error(f"Search videos failed: {str(e)}")
+            return []
+        except Exception as e:
+            logger.error(f"An unexpected error occurred during video search: {str(e)}")
+            return []
+
+    # Attempt 1: Strict search with category and editors_choice
+    logger.info("Attempt 1: Strict search with category and editors_choice")
+    params = {
+        "q": search_term,
+        "video_type": "film",
+        "safesearch": "true",
+        "editors_choice": "true",
+        "order": "popular",
+        "page": 1,
+        "per_page": 80,
+    }
+    if category:
+        params["category"] = category
+    if video_width > 0:
+        params["min_width"] = video_width
+    if video_height > 0:
+        params["min_height"] = video_height
+
+    results = perform_search(params)
+    if results:
+        logger.success(f"Found {len(results)} videos on first attempt.")
+        return results
+
+    # Attempt 2: Search with editors_choice but without category
+    # Skipped when no category was sent: the query would just repeat attempt 1.
+    if category:
+        logger.warning("First attempt failed. Attempt 2: Retrying without category.")
+        params.pop("category", None)
+        results = perform_search(params)
+        if results:
+            logger.success(f"Found {len(results)} videos on second attempt.")
+            return results
+
+    # Attempt 3: Broadest search, without editors_choice
+    logger.warning("Second attempt failed. Attempt 3: Retrying with broadest settings.")
+    params.pop("editors_choice", None)
+    results = perform_search(params)
+    if results:
+        logger.success(f"Found {len(results)} videos on third attempt.")
+    else:
+        logger.error("All search attempts failed to find any videos.")
+
+    return results
+
+
+def _get_video_info_ffprobe(video_path: str) -> Optional[dict]:
+    """
+    Get video information using ffprobe.
+
+    Returns a dict with the ``duration`` and ``fps`` of the first video
+    stream, or None when ffprobe cannot be run, exits with an error, or
+    reports data that cannot be parsed.
+    """
+    command = [
+        "ffprobe",
+        "-v", "quiet",
+        "-print_format", "json",
+        "-show_format",
+        "-show_streams",
+        video_path
+    ]
+    try:
+        result = subprocess.run(command, capture_output=True, text=True, check=True)
+        info = json.loads(result.stdout)
+        video_stream = next((s for s in info['streams'] if s['codec_type'] == 'video'), None)
+        if not video_stream:
+            return None
+
+        fps_str = video_stream.get('avg_frame_rate', video_stream.get('r_frame_rate', '0/1'))
+        num, den = map(int, fps_str.split('/'))
+        fps = num / den if den != 0 else 0
+
+        return {
+            "duration": float(video_stream.get('duration', info['format'].get('duration', 0))),
+            "fps": fps
+        }
+    except (OSError, subprocess.CalledProcessError, ValueError, KeyError) as e:
+        # OSError covers a missing ffprobe binary; ValueError covers invalid JSON
+        # and frame-rate/duration strings that are not numeric.
+        logger.error(f"Failed to get video info for {video_path} using ffprobe: {e}")
+        return None
 
 
 def save_video(video_url: str, save_dir: str = "") -> str:
@@ -199,12 +313,12 @@ def save_video(video_url: str, save_dir: str = "") -> str:
 
     if os.path.exists(video_path) and os.path.getsize(video_path) > 0:
         try:
-            clip = VideoFileClip(video_path)
-            duration = clip.duration
-            fps = clip.fps
-            clip.close()
-            if duration > 0 and fps > 0:
+            info = _get_video_info_ffprobe(video_path)
+            if info and info.get("duration", 0) > 0 and info.get("fps", 0) > 0:
+                logger.info(f"video validated: {video_path}")
                 return video_path
+            else:
+                raise ValueError("Invalid video file, duration or fps is 0.")
         except Exception as e:
             try:
                 os.remove(video_path)
@@ -216,6 +330,7 @@ def save_video(video_url: str, save_dir: str = "") -> str:
 
 def download_videos(
     task_id: str,
+    video_subject: str,
     search_terms: List[str],
     source: str = "pexels",
     video_aspect: VideoAspect = VideoAspect.portrait,
@@ -227,14 +342,21 @@ def download_videos(
     valid_video_urls = []
     found_duration = 0.0
     search_videos = search_videos_pexels
+    search_kwargs = {}
     if source == "pixabay":
         search_videos = search_videos_pixabay
+        video_category = ""
+        if video_subject:
+            video_category = llm.generate_video_category(video_subject)
+        if video_category:
+            search_kwargs['category'] = video_category
 
     for search_term in search_terms:
         video_items = search_videos(
             search_term=search_term,
             minimum_duration=max_clip_duration,
             video_aspect=video_aspect,
+            **search_kwargs,
         )
         logger.info(f"found {len(video_items)} videos for '{search_term}'")
 
@@ -281,6 +403,7 @@ def download_videos(
     return video_paths
 
 
+# Debug entry point below, for development testing only.
 if __name__ == "__main__":
     download_videos(
-        "test123", ["Money Exchange Medium"], audio_duration=100, source="pixabay"
+        "test123", "", ["Money Exchange Medium"], audio_duration=100, source="pixabay"