20250708001

yanjianzao 2025-07-08 13:29:54 +08:00
parent 6ca8f195e5
commit 6c549c1ce9
14 changed files with 1144 additions and 1829 deletions

.gitignore vendored (2 changes)
View File

@@ -9,7 +9,7 @@
/app/utils/__pycache__/
/*/__pycache__/*
.vscode
/**/.streamlit
__pycache__
logs/

app/models/schema.py
View File

@@ -3,7 +3,7 @@ from enum import Enum
from typing import Any, List, Optional, Union
import pydantic
from pydantic import BaseModel
from pydantic import BaseModel, Field
# Ignore specific Pydantic warnings
warnings.filterwarnings(
@@ -74,7 +74,7 @@ class VideoParams(BaseModel):
video_aspect: Optional[VideoAspect] = VideoAspect.portrait.value
video_concat_mode: Optional[VideoConcatMode] = VideoConcatMode.random.value
video_transition_mode: Optional[VideoTransitionMode] = None
video_clip_duration: Optional[int] = 5
max_clip_duration: Optional[int] = 5
video_count: Optional[int] = 1
video_source: Optional[str] = "pexels"
@@ -103,7 +103,7 @@ class VideoParams(BaseModel):
stroke_width: float = 1.5
n_threads: Optional[int] = 2
paragraph_number: Optional[int] = 1
storyboard_mode: bool = Field(False, description="Whether to enable storyboard mode for audio-visual synchronization")
class SubtitleRequest(BaseModel):
video_script: str

app/services/llm.py
View File

@@ -2,7 +2,7 @@ import json
import logging
import re
import requests
from typing import List
from typing import List, Dict
import g4f
from loguru import logger
@@ -173,7 +173,7 @@ def _generate_response(prompt: str) -> str:
"temperature": 0.5,
"top_p": 1,
"top_k": 1,
"max_output_tokens": 2048,
"max_output_tokens": 8192,
}
safety_settings = [
@@ -270,8 +270,10 @@ def _generate_response(prompt: str) -> str:
base_url=base_url,
)
response = client.chat.completions.create(
model=model_name, messages=[{"role": "user", "content": prompt}]
response: ChatCompletion = client.chat.completions.create(
model=model_name,
messages=[{"role": "user", "content": prompt}],
max_tokens=4096
)
if response:
if isinstance(response, ChatCompletion):
@@ -392,77 +394,168 @@ Generate a script for a video, depending on the subject of the video.
# ### Video Subject
# {video_subject}
def generate_terms(video_subject: str, video_script: str) -> List[str]:
prompt = f"""
# Role: AI Video Director and Editor
"""
Generate video terms from video subject and script.
"""
prompt_template = """
# Role: Video Search Terms Generator
## Core Goal:
Your mission is to meticulously analyze the provided video script, break it down into distinct visual scenes, and generate a diverse list of English search terms for stock footage.
## Task:
Generate a concise, comma-separated list of 1-5 English search terms based on the provided `Video Subject` and `Video Script`. These terms will be used to find relevant video clips.
## Step-by-Step Instructions:
1. Read the entire `{video_subject}` script to understand the main narrative and mood.
2. Go through the script paragraph by paragraph (or by logical scene breaks).
3. For each paragraph/scene, generate ONE primary search term that best captures its visual essence.
4. Compile all generated search terms into a single JSON array.
## Instructions:
1. **Analyze Context:** Read the `Video Subject` and `Video Script` to understand the main topics and visual elements.
2. **Brainstorm Keywords:** Think of concrete, visually-driven keywords. Avoid abstract concepts.
3. **Select & Refine:** Choose the most powerful and representative terms.
4. **Format Output:** Provide a single line of comma-separated English keywords. Do not include any other text, explanations, or formatting.
## Keyword Generation Principles:
- **DIVERSITY**: CRITICAL. Avoid repetitive or overly similar terms. Each keyword must represent a distinct visual concept from the script.
- **SPECIFICITY**: Be specific. Instead of "car driving," prefer "sports car on mountain road at sunset."
- **VISUAL & CONCRETE**: Each term must describe a tangible, visual scene. Do not use abstract concepts (e.g., "sadness", "freedom").
- **CONCISENESS**: Terms should ideally be 2-4 words long.
- **RELEVANCE**: Every term must be directly inspired by a part of the script and be relevant to the main video subject.
## Example:
**Video Subject:** "The Impact of Sugar on Your Brain"
**Video Script:** "Sugar, a sweet temptation, can have a profound effect on our brain chemistry..."
**Output:**
`sugar cubes, brain scan, dopamine release, person eating candy, neural pathways`
## Output Format Constraints:
- You MUST return a pure, single JSON Array. No introductory text, no markdown. Your entire response body must be a valid JSON array.
- All search terms must be in English.
## Example of a Good Output:
["dramatic mountain landscape", "hiker reaching summit", "close up of old compass", "time-lapse of starry night", "..."]
## Context:
## Your Turn:
### Video Subject:
{video_subject}
### Video Script
### Video Script:
{video_script}
Please note that you must use English for generating video search terms; Chinese is not accepted.
""".strip()
### Output:
"""
prompt = prompt_template.format(
video_subject=video_subject, video_script=video_script
)
logger.info(f"subject: {video_subject}")
search_terms = []
response = ""
for i in range(_max_retries):
try:
response = _generate_response(prompt)
if "Error: " in response:
logger.error(f"failed to generate video script: {response}")
return response
search_terms = json.loads(response)
if not isinstance(search_terms, list) or not all(
isinstance(term, str) for term in search_terms
):
logger.error("response is not a list of strings.")
continue
try:
response = _generate_response(prompt)
# remove blank lines
generated_text = "\n".join(
[line for line in response.split("\n") if line.strip()]
)
if not generated_text:
logger.warning("LLM returned empty terms list.")
return []
except Exception as e:
logger.warning(f"failed to generate video terms: {str(e)}")
if response:
match = re.search(r"\[.*]", response)
if match:
try:
search_terms = json.loads(match.group())
except Exception as e:
logger.warning(f"failed to generate video terms: {str(e)}")
pass
terms = [term.strip().strip("`'\"") for term in generated_text.split(",")]
logger.info(f"Generated terms: {terms}")
return terms
except Exception as e:
logger.error(f"Failed to generate video terms: {e}")
return []
if search_terms and len(search_terms) > 0:
break
if i < _max_retries:
logger.warning(f"failed to generate video terms, trying again... {i + 1}")
logger.success(f"completed: \n{search_terms}")
return search_terms
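The rewritten function expects a single comma-separated line of keywords rather than a JSON array. A minimal sketch of that parsing step, using a hypothetical LLM reply:

```python
# Hypothetical LLM reply; the new parsing strips whitespace and stray quotes/backticks.
response = "sugar cubes, brain scan, `dopamine release`, 'person eating candy'"
terms = [term.strip().strip("`'\"") for term in response.split(",")]
print(terms)  # ['sugar cubes', 'brain scan', 'dopamine release', 'person eating candy']
```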
# def generate_storyboard(video_subject: str, video_script: str) -> List[Dict]:
# """
# Analyzes the entire script, breaks it down into scenes, and generates matching search terms for each scene.
# Returns a list of scenes, where each scene is a dictionary containing 'scene_script' and 'search_terms'.
# """
# prompt = f"""
# # Role: Video Script Analyst
# ## GOAL:
# Your task is to transform a video script into a storyboard. You will read the provided script, segment it into scenes, and for each scene, generate a set of descriptive, visual search terms that will be used to find stock video footage. The final output must be a valid JSON array of objects.
# ## STEP-BY-STEP INSTRUCTIONS:
# 1. **Segment the Script:** Read the `Video Script` and break it down into short, logical, spoken segments. A segment should typically be one or two sentences long.
# ## EXAMPLE (Note the Realism and Concreteness):
# [
# {{
# "scene_script": "Blueberries. They're often called nature's perfect food for your eyes.",
# "search_terms": ["woman eating fresh blueberries from a bowl", "close up of fresh blueberries", "bowl of blueberries on a table"]
# }},
# {{
# "scene_script": "And for good reason. Packed with anthocyanins, vitamin C, and ludian...",
# "search_terms": ["nutritionist explaining health benefits", "close up of vitamin C tablets", "diagram of anthocyanin molecule"]
# }},
# {{
# "scene_script": "...these tiny berries act like microscopic shields, protecting your retina and macula from oxidative stress and age related damage.",
# "search_terms": ["medical animation of the human eye", "diagram of the retina and macula", "older person with healthy eyes smiling"]
# }}
# ]
# ## CONTEXT:
# ### Video Subject:
# {video_subject}
# ### Video Script:
# {video_script}
def generate_storyboard(video_subject: str, video_script: str) -> List[Dict]:
"""
Analyzes the script, breaks it into scenes, and extracts the main subject nouns as search terms for each scene.
Returns a list of scenes, where each scene is a dictionary containing 'scene_script' and 'search_terms'.
"""
# [Core change] Use more explicit, forceful instructions to make the LLM treat every sentence of the script as a separate scene, generating matching English keywords for each.
prompt = f"""
You are a video production assistant. Your task is to process a script for a video, breaking it down sentence by sentence to generate visual search terms.
**CRITICAL INSTRUCTIONS - FOLLOW THESE RULES EXACTLY:**
1. **ONE SENTENCE = ONE VISUAL SEGMENT:** Each sentence from the script is a distinct visual segment. Do not merge sentences.
2. **CONCRETE & VISUAL KEYWORDS ONLY:** The `search_terms` MUST be concrete, visual, and tangible things. They must be nouns or descriptive actions that can be found in a video library.
- **GOOD:** `blueberries`, `person walking`, `city skyline`, `laughing friends`, `human eye`.
- **BAD / FORBIDDEN:** `reason`, `concept`, `idea`, `method`, `health`, `protection`, `damage`. Never use abstract, non-visual words.
3. **MANDATORY KEYWORD DIVERSITY:** You are FORBIDDEN from using the same primary keyword for two consecutive segments. If segment 1 uses `blueberries`, segment 2 MUST use a different but relevant keyword (e.g., `antioxidants` could be visualized as `colorful fruits`, `retina` as `close-up of eye`). DIVERSIFY a lot.
**REQUIRED OUTPUT FORMAT:**
- You must output a valid JSON array of objects.
- Each object represents one sentence and must ONLY contain two keys: `script` and `search_terms`.
**EXAMPLE:**
Video Script:
"Blueberries are packed with anthocyanins, which are great for your eyes. These antioxidants protect the retina from damage."
Your JSON Output:
```json
[
{{
"script": "Blueberries are packed with anthocyanins, which are great for your eyes.",
"search_terms": "blueberries, fresh fruit, antioxidant food"
}},
{{
"script": "These antioxidants protect the retina from damage.",
"search_terms": "close-up of eye, retina scan, vision test"
}}
]
```
**Video Script to Process:**
```
{video_script}
```
**Your JSON Output (must be a valid JSON array):**
"""
# return []
logger.info(f"Generating storyboard for subject: {video_subject}")
response_str = _generate_response(prompt)
try:
# The model should return a valid JSON array string.
# Find the start and end of the JSON array.
json_start = response_str.find('[')
json_end = response_str.rfind(']')
if json_start != -1 and json_end != -1 and json_start < json_end:
json_str = response_str[json_start:json_end+1]
storyboard = json.loads(json_str)
logger.success("Successfully parsed storyboard from LLM response.")
return storyboard
else:
logger.error(f"Could not find a valid JSON array in the response. Raw response: {response_str}")
return []
except json.JSONDecodeError:
logger.error(f"Failed to parse JSON. Raw response: {response_str}")
# Fallback logic can be added here if needed, e.g., using regex to extract JSON.
return []
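For reference, a short sketch of how a caller might consume the storyboard contract defined above (each item carries `script` and a comma-separated `search_terms`); the inputs are hypothetical:

```python
storyboard = generate_storyboard(
    video_subject="The health benefits of blueberries",
    video_script="Blueberries are packed with anthocyanins. These antioxidants protect the retina.",
)
for i, scene in enumerate(storyboard, start=1):
    terms = [t.strip() for t in scene.get("search_terms", "").split(",") if t.strip()]
    print(f"scene {i}: {scene.get('script')!r} -> {terms}")
```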
# ... (your other functions and code remain unchanged)
if __name__ == "__main__":
@@ -479,4 +572,42 @@ if __name__ == "__main__":
print(search_terms)
print("-----输出包含的场景数量-----")
print(len(search_terms))
def generate_video_category(video_subject: str) -> str:
"""
Selects the most appropriate video category from a predefined list based on the video subject.
"""
prompt = f"""
# Role: Video Category Selector
## Goal:
Based on the provided 'Video Subject', select the ONE most suitable category from the `Category List` that best represents the subject. Your response must be only the single category name.
## Category List:
backgrounds, fashion, nature, science, education, feelings, health, people, religion, places, animals, industry, computer, food, sports, transportation, travel, buildings, business, music
## Instructions:
- Analyze the 'Video Subject'.
- Choose the single best-fitting category from the list.
- Respond with ONLY the category name and nothing else.
## Example:
Video Subject: "The benefits of a ketogenic diet"
Response: health
Video Subject: "A tour of the Grand Canyon"
Response: travel
## CONTEXT:
### Video Subject:
{video_subject}
"""
category = _generate_response(prompt).strip().lower()
# Fallback to a default category if the response is invalid
valid_categories = ["backgrounds", "fashion", "nature", "science", "education", "feelings", "health", "people", "religion", "places", "animals", "industry", "computer", "food", "sports", "transportation", "travel", "buildings", "business", "music"]
if category not in valid_categories:
logger.warning(f"Generated category '{category}' is not valid. Falling back to 'nature'.")
return "nature"
logger.success(f"Successfully selected video category: {category}")
return category
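A hedged usage sketch of the selector and its fallback behavior (subjects are hypothetical):

```python
print(generate_video_category("A tour of the Grand Canyon"))  # expected: "travel"
# If the LLM replies with anything outside the list (e.g. "landscapes"),
# the function logs a warning and falls back to "nature".
```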

app/services/material.py
View File

@@ -102,6 +102,8 @@ def search_videos_pexels(
item.provider = "pexels"
item.url = best_landscape_file["link"]  # use the link of the best rendition
item.duration = duration
item.path = ""
item.start_time = 0.0
video_items.append(item)
logging.info("选取的Mp4链接地址为{}".format(item.url))
return video_items
@@ -177,6 +179,8 @@ def search_videos_pixabay(
item.provider = "pixabay"
item.url = best_video.get("url")
item.duration = duration
item.path = ""
item.start_time = 0.0
video_items.append(item)
return video_items
@@ -319,73 +323,86 @@ def download_videos(
search_terms: List[str],
source: str = "pexels",
video_aspect: VideoAspect = VideoAspect.portrait,
video_contact_mode: VideoConcatMode = VideoConcatMode.random,
video_concat_mode: VideoConcatMode = VideoConcatMode.random,
audio_duration: float = 0.0,
max_clip_duration: int = 5,
) -> List[str]:
valid_video_items = []
valid_video_urls = []
found_duration = 0.0
search_videos = search_videos_pexels
search_kwargs = {}
if source == "pixabay":
search_videos = search_videos_pixabay
video_category = ""
if video_subject:
video_category = llm.generate_video_category(video_subject)
if video_category:
search_kwargs['category'] = video_category
) -> List[MaterialInfo]:
"""
Download videos from Pexels or Pixabay based on search terms.
"""
all_video_items: List[MaterialInfo] = []
for term in search_terms:
if source == "pexels":
video_items = search_videos_pexels(
search_term=term,
minimum_duration=max_clip_duration,
video_aspect=video_aspect,
)
elif source == "pixabay":
video_items = search_videos_pixabay(
search_term=term,
minimum_duration=max_clip_duration,
video_aspect=video_aspect,
)
else:
video_items = []
logger.info(f"found {len(video_items)} videos for '{term}'")
all_video_items.extend(video_items)
for search_term in search_terms:
video_items = search_videos(
search_term=search_term,
minimum_duration=max_clip_duration,
video_aspect=video_aspect,
**search_kwargs,
)
logger.info(f"found {len(video_items)} videos for '{search_term}'")
# Remove duplicates and calculate total duration
unique_video_items = []
seen_urls = set()
for item in all_video_items:
if item.url not in seen_urls:
unique_video_items.append(item)
seen_urls.add(item.url)
for item in video_items:
if item.url not in valid_video_urls:
valid_video_items.append(item)
valid_video_urls.append(item.url)
found_duration += item.duration
if video_concat_mode == VideoConcatMode.random:
random.shuffle(unique_video_items)
logger.info(
f"found total videos: {len(valid_video_items)}, required duration: {audio_duration} seconds, found duration: {found_duration} seconds"
)
video_paths = []
found_duration = sum(item.duration for item in unique_video_items)
logger.info(f"found total unique videos: {len(unique_video_items)}, required duration: {audio_duration:.4f} seconds, found duration: {found_duration:.2f} seconds")
logger.info(f"Video download list (first 5): {[item.url for item in unique_video_items[:5]]}")
material_directory = config.app.get("material_directory", "").strip()
if material_directory == "task":
material_directory = utils.task_dir(task_id)
elif material_directory and not os.path.isdir(material_directory):
material_directory = ""
if not unique_video_items:
logger.warning("No videos found for the given search terms.")
return []
if video_contact_mode.value == VideoConcatMode.random.value:
random.shuffle(valid_video_items)
if found_duration < audio_duration:
logger.warning(f"total duration of found videos ({found_duration:.2f}s) is less than audio duration ({audio_duration:.2f}s).")
total_duration = 0.0
for item in valid_video_items:
downloaded_materials: List[MaterialInfo] = []
downloaded_duration = 0.0
for item in unique_video_items:
if downloaded_duration >= audio_duration:
logger.info(f"total duration of downloaded videos: {downloaded_duration:.2f} seconds, skip downloading more")
break
try:
logger.info(f"downloading video: {item.url}")
saved_video_path = save_video(
video_url=item.url, save_dir=material_directory
)
if saved_video_path:
logger.info(f"video saved: {saved_video_path}")
video_paths.append(saved_video_path)
seconds = min(max_clip_duration, item.duration)
total_duration += seconds
if total_duration > audio_duration:
logger.info(
f"total duration of downloaded videos: {total_duration} seconds, skip downloading more"
)
break
file_path = save_video(video_url=item.url)
if file_path:
logger.info(f"video saved: {file_path}")
material_info = MaterialInfo()
material_info.path = file_path
material_info.start_time = 0.0
ffprobe_info = _get_video_info_ffprobe(file_path)
if ffprobe_info and ffprobe_info.get("duration"):
material_info.duration = float(ffprobe_info.get("duration"))
downloaded_duration += material_info.duration
else:
material_info.duration = item.duration # fallback
downloaded_duration += item.duration
downloaded_materials.append(material_info)
except Exception as e:
logger.error(f"failed to download video: {utils.to_json(item)} => {str(e)}")
logger.success(f"downloaded {len(video_paths)} videos")
return video_paths
logger.error(f"failed to download video: {item.url} => {e}")
logger.success(f"downloaded {len(downloaded_materials)} videos")
return downloaded_materials
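The loop above calls a `_get_video_info_ffprobe` helper that does not appear in this hunk. A minimal sketch of what such a helper might look like, assuming it shells out to ffprobe and returns the container duration:

```python
import json
import subprocess
from typing import Optional

def _get_video_info_ffprobe(file_path: str) -> Optional[dict]:
    """Sketch only: probe a video file and return {'duration': <seconds as str>}."""
    cmd = [
        "ffprobe", "-v", "quiet",
        "-print_format", "json", "-show_format",
        file_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        fmt = json.loads(result.stdout).get("format", {})
        return {"duration": fmt.get("duration")}  # ffprobe reports duration as a string
    except (subprocess.CalledProcessError, json.JSONDecodeError):
        return None
```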
# Debug entry point below, for development and testing only

app/services/subtitle.py
View File

@@ -278,6 +278,77 @@ def correct(subtitle_file, video_script):
logger.success("Subtitle is correct")
def combine_srt_files(srt_files: list, output_file: str):
"""
Combines multiple SRT files into a single file, adjusting timestamps sequentially.
"""
logger.info(f"Combining {len(srt_files)} SRT files into {output_file}")
combined_subtitles = []
last_end_time_seconds = 0.0
entry_index = 1
for srt_file in srt_files:
if not os.path.exists(srt_file):
logger.warning(f"SRT file not found, skipping: {srt_file}")
continue
try:
with open(srt_file, 'r', encoding='utf-8') as f:
content = f.read()
entries = re.split(r'\n\s*\n', content.strip())
for entry in entries:
if not entry.strip():
continue
lines = entry.split('\n')
if len(lines) < 3:
continue
# Parse timestamp
timestamp_line = lines[1]
start_time_str, end_time_str = timestamp_line.split(' --> ')
def srt_time_to_seconds(t_str):
h, m, s_ms = t_str.split(':')
s, ms = s_ms.split(',')
return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000.0
start_time = srt_time_to_seconds(start_time_str)
end_time = srt_time_to_seconds(end_time_str)
duration = end_time - start_time
# Adjust time
new_start_time = last_end_time_seconds
new_end_time = new_start_time + duration
def seconds_to_srt_time(seconds):
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
ms = int((seconds * 1000) % 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
new_start_str = seconds_to_srt_time(new_start_time)
new_end_str = seconds_to_srt_time(new_end_time)
# Append to combined list
text = '\n'.join(lines[2:])
combined_subtitles.append(f"{entry_index}\n{new_start_str} --> {new_end_str}\n{text}")
entry_index += 1
# Update last end time for the next file
last_end_time_seconds = new_end_time
except Exception as e:
logger.error(f"Error processing SRT file {srt_file}: {e}")
# Write combined SRT to output file
with open(output_file, 'w', encoding='utf-8') as f:
f.write('\n\n'.join(combined_subtitles) + '\n\n')
logger.success(f"Successfully combined SRT files into {output_file}")
if __name__ == "__main__":
task_id = "c12fd1e6-4b0a-4d65-a075-c87abe35a072"
task_dir = utils.task_dir(task_id)

app/services/task.py
View File

@@ -7,10 +7,209 @@ from loguru import logger
from app.config import config
from app.models import const
from app.models.schema import VideoConcatMode, VideoParams
from app.services import llm, material, subtitle, video, voice
from app.models.schema import (
VideoConcatMode,
VideoParams,
VideoAspect,
MaterialInfo,
)
from app.services import llm, material, subtitle, voice, video
from app.services import video as video_utils
from app.services import state as sm
from app.utils import utils
import time
# ... your existing start function ...
# ===================================================================
# New main task function that implements audio-visual synchronization
# ===================================================================
def start_storyboard_task(task_id, params: VideoParams):
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING)
workdir = utils.task_dir(task_id)
# 1. Generate Storyboard
logger.info("--- Step 1: Generating Storyboard ---")
video_script = params.video_script
if not video_script:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED, status_message="Video script is empty.")
return
storyboard = llm.generate_storyboard(params.video_subject, video_script)
if not storyboard:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED, status_message="Failed to generate storyboard.")
return
# 2. Process each segment
logger.info(f"--- Step 2: Processing {len(storyboard)} video segments ---")
segment_video_paths = []
segment_audio_paths = []
segment_srt_paths = []
total_duration = 0
last_used_keywords = set()
for i, segment in enumerate(storyboard):
try:
logger.info(f"--- Processing segment {i + 1} ---")
segment_script = segment.get("script")
if not segment_script:
logger.warning(f"Segment {i + 1} has no script, skipping")
continue
search_terms_str = segment.get("search_terms", "")
search_terms = [term.strip() for term in search_terms_str.split(',') if term.strip()]
if not search_terms:
logger.warning(f"Segment {i + 1} has no search terms, skipping")
continue
# Keyword Guard: Check for repetitive keywords
current_keywords = set(search_terms)
if i > 0 and current_keywords == last_used_keywords:
logger.warning(f"Segment {i + 1} uses the same keywords as the previous one ({search_terms_str}). Reusing last video clip to avoid visual repetition.")
if segment_video_paths:
segment_video_paths.append(segment_video_paths[-1]) # Reuse the last processed video clip
segment_audio_paths.append(segment_audio_paths[-1]) # Reuse the last audio clip
continue # Skip processing for this segment
last_used_keywords = current_keywords
# a. Generate audio and subtitles for the segment
segment_audio_file = path.join(workdir, f"segment_{i + 1}.mp3")
segment_srt_file = path.join(workdir, f"segment_{i + 1}.srt")
sub_maker = voice.tts(
text=segment_script,
voice_name=voice.parse_voice_name(params.voice_name),
voice_rate=params.voice_rate,
voice_file=segment_audio_file,
)
if not sub_maker:
raise Exception(f"Failed to generate audio for segment {i + 1}")
voice.create_subtitle(
sub_maker=sub_maker, text=segment_script, subtitle_file=segment_srt_file
)
audio_duration = voice.get_audio_duration(sub_maker)
total_duration += audio_duration
# b. Search and download video materials for each term
video_materials = []
downloaded_duration = 0
for term in search_terms:
if downloaded_duration >= audio_duration:
break
term_materials = material.download_videos(
task_id=task_id,
video_subject=params.video_subject,
search_terms=[term], # Pass one term at a time
source=params.video_source,
video_aspect=params.video_aspect,
video_concat_mode=params.video_concat_mode,
audio_duration=audio_duration - downloaded_duration,
max_clip_duration=params.max_clip_duration,
)
if term_materials:
video_materials.extend(term_materials)
downloaded_duration = sum(m.duration for m in video_materials)
if not video_materials:
raise Exception(f"Failed to find materials for segment {i + 1}")
# c. Create a video clip matching the audio duration
segment_video_path = path.join(workdir, f"segment_video_{i + 1}.mp4")
clip_created = video.create_video_clip_from_materials(
video_materials=video_materials,
audio_duration=audio_duration,
max_clip_duration=params.max_clip_duration,
video_aspect=params.video_aspect,
output_path=segment_video_path
)
if not clip_created:
raise Exception(f"Failed to create video clip for segment {i + 1}")
segment_video_paths.append(segment_video_path)
segment_audio_paths.append(segment_audio_file)
segment_srt_paths.append(segment_srt_file)
except Exception as e:
logger.error(f"Error processing segment {i + 1}: {e}")
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED, status_message=f"Error in segment {i + 1}: {e}")
return
# Check if any segments were processed
if not segment_video_paths:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED, status_message="Failed to process any segments.")
logger.error("Failed to process any segments. Aborting video generation.")
return
# 3. Combine all segments
logger.info("--- Step 3: Combining all video segments ---")
# a. Combine audios
combined_audio_path = path.join(workdir, "voice.mp3")
if not voice.combine_audio_files(segment_audio_paths, combined_audio_path):
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED, status_message="Failed to combine audio files.")
return
# b. Combine videos
video_transition_mode = params.video_transition_mode
concatenated_video_path = path.join(workdir, "concatenated_video.mp4")
if not video.concatenate_videos(segment_video_paths, concatenated_video_path, transition_mode=video_transition_mode):
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED, status_message="Failed to concatenate videos.")
return
# c. Combine subtitles
combined_srt_path = path.join(workdir, "subtitles.srt")
subtitle.combine_srt_files(segment_srt_paths, combined_srt_path)
# 4. Final video assembly
logger.info("--- Step 4: Final video assembly ---")
# a. Add audio to concatenated video
video_with_audio_path = path.join(workdir, "video_with_audio.mp4")
if not video.add_audio_to_video(concatenated_video_path, combined_audio_path, video_with_audio_path):
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED, status_message="Failed to add audio to video.")
return
# b. Add background music
video_with_bgm_path = path.join(workdir, "video_with_bgm.mp4")
bgm_file = video.get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file)
if bgm_file:
if not video.add_bgm_to_video(
input_video_path=video_with_audio_path,
bgm_path=bgm_file,
bgm_volume=params.bgm_volume,
output_video_path=video_with_bgm_path
):
logger.warning("Failed to mix BGM. Proceeding without it.")
video_with_bgm_path = video_with_audio_path # Fallback
else:
video_with_bgm_path = video_with_audio_path # No BGM requested
# c. Add subtitles
final_video_path = path.join(workdir, f"final_{task_id}.mp4")
video.add_subtitles_to_video(
video_path=video_with_bgm_path,
srt_path=combined_srt_path,
font_name=params.font_name,
font_size=params.font_size,
text_fore_color=params.text_fore_color,
stroke_color=params.stroke_color,
stroke_width=params.stroke_width,
subtitle_position=params.subtitle_position,
custom_position=params.custom_position,
output_path=final_video_path
)
# 5. Cleanup
logger.info("--- Step 5: Cleaning up temporary files ---")
cleanup_files = segment_video_paths + segment_audio_paths + segment_srt_paths + [combined_audio_path, concatenated_video_path, combined_srt_path, video_with_audio_path, video_with_bgm_path]
for item in cleanup_files:
if item and item != final_video_path and os.path.exists(item):
os.remove(item)
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, video_path=final_video_path)
logger.success(f"Task {task_id} completed successfully. Final video: {final_video_path}")
return {"videos": [final_video_path]}
def generate_script(task_id, params):
@@ -127,7 +326,7 @@ def get_video_materials(task_id, params, video_terms, audio_duration):
if params.video_source == "local":
logger.info("\n\n## preprocess local materials")
materials = video.preprocess_video(
materials=params.video_materials, clip_duration=params.video_clip_duration
materials=params.video_materials, clip_duration=params.max_clip_duration
)
if not materials:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
@@ -140,12 +339,13 @@ def get_video_materials(task_id, params, video_terms, audio_duration):
logger.info(f"\n\n## downloading videos from {params.video_source}")
downloaded_videos = material.download_videos(
task_id=task_id,
video_subject=params.video_subject,
search_terms=video_terms,
source=params.video_source,
video_aspect=params.video_aspect,
video_contact_mode=params.video_concat_mode,
audio_duration=audio_duration * params.video_count,
max_clip_duration=params.video_clip_duration,
max_clip_duration=params.max_clip_duration,
)
if not downloaded_videos:
sm.state.update_task(task_id, state=const.TASK_STATE_FAILED)
@@ -173,14 +373,14 @@ def generate_final_videos(
utils.task_dir(task_id), f"combined-{index}.mp4"
)
logger.info(f"\n\n## combining video: {index} => {combined_video_path}")
video.combine_videos_ffmpeg(
video_utils.combine_videos_ffmpeg(
combined_video_path=combined_video_path,
video_paths=downloaded_videos,
audio_file=audio_file,
video_aspect=params.video_aspect,
video_concat_mode=video_concat_mode,
video_transition_mode=video_transition_mode,
max_clip_duration=params.video_clip_duration,
max_clip_duration=params.max_clip_duration,
threads=params.n_threads,
)
@@ -190,7 +390,7 @@ def generate_final_videos(
final_video_path = path.join(utils.task_dir(task_id), f"final-{index}.mp4")
logger.info(f"\n\n## generating video: {index} => {final_video_path}")
video.generate_video(
video_utils.generate_video(
video_path=combined_video_path,
audio_path=audio_file,
subtitle_path=subtitle_path,

View File

@@ -1,21 +0,0 @@
from moviepy import Clip, vfx
# FadeIn
def fadein_transition(clip: Clip, t: float) -> Clip:
return clip.with_effects([vfx.FadeIn(t)])
# FadeOut
def fadeout_transition(clip: Clip, t: float) -> Clip:
return clip.with_effects([vfx.FadeOut(t)])
# SlideIn
def slidein_transition(clip: Clip, t: float, side: str) -> Clip:
return clip.with_effects([vfx.SlideIn(t, side)])
# SlideOut
def slideout_transition(clip: Clip, t: float, side: str) -> Clip:
return clip.with_effects([vfx.SlideOut(t, side)])

File diff suppressed because it is too large

File diff suppressed because it is too large

app/utils/__init__.py (new, empty file)
View File

app/utils/utils.py
View File

@@ -14,6 +14,23 @@ from app.models import const
urllib3.disable_warnings()
def parse_voice_name(name: str):
# zh-CN-XiaoyiNeural-Female -> zh-CN-XiaoyiNeural
# zh-CN-YunxiNeural-Male -> zh-CN-YunxiNeural
# zh-CN-XiaoxiaoMultilingualNeural-V2-Female -> zh-CN-XiaoxiaoMultilingualNeural-V2
return name.replace("-Female", "").replace("-Male", "").strip()
def is_azure_v2_voice(voice_name: str):
voice_name = parse_voice_name(voice_name)
if voice_name.endswith("-V2"):
return voice_name.replace("-V2", "").strip()
return ""
def is_siliconflow_voice(voice_name: str):
"""检查是否是硅基流动的声音"""
return voice_name.startswith("siliconflow:")
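The comments above encode the naming convention; a few illustrative assertions (the siliconflow id is hypothetical):

```python
assert parse_voice_name("zh-CN-XiaoyiNeural-Female") == "zh-CN-XiaoyiNeural"
assert is_azure_v2_voice("zh-CN-XiaoxiaoMultilingualNeural-V2-Female") == "zh-CN-XiaoxiaoMultilingualNeural"
assert is_siliconflow_voice("siliconflow:alloy")  # True for any "siliconflow:" prefix
```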
def get_response(status: int, data: Any = None, message: str = ""):
obj = {
"status": status,
@@ -64,6 +81,13 @@ def get_uuid(remove_hyphen: bool = False):
return u
def get_root_dir(sub_dir: str = ""):
d = root_dir()
if sub_dir:
d = os.path.join(d, sub_dir)
return d
def root_dir():
return os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
@@ -103,6 +127,10 @@ def font_dir(sub_dir: str = ""):
return d
def get_font_path(font_name: str):
return os.path.join(font_dir(), font_name)
def song_dir(sub_dir: str = ""):
d = resource_dir("songs")
if sub_dir:
@@ -227,4 +255,22 @@ def load_locales(i18n_dir):
def parse_extension(filename):
return Path(filename).suffix.lower().lstrip('.')
return os.path.splitext(filename)[1]
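Note that this replacement changes behavior, not just style; a quick comparison with illustrative values:

```python
from pathlib import Path
import os

Path("movie.MP4").suffix.lower().lstrip('.')  # old: 'mp4'  (no dot, lowercased)
os.path.splitext("movie.MP4")[1]              # new: '.MP4' (keeps dot and case)
```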
def rgb_to_bgr_hex(rgb_color):
"""Converts an RGB color string (e.g., '#RRGGBB') to a BGR hex string for FFmpeg.
Args:
rgb_color (str): The RGB color string, starting with '#'.
Returns:
str: The BGR hex string (e.g., 'BBGGRR').
"""
if not rgb_color.startswith('#') or len(rgb_color) != 7:
logger.warning(f"Invalid color format: {rgb_color}. Using default white.")
return "FFFFFF" # Default to white for invalid formats
r = rgb_color[1:3]
g = rgb_color[3:5]
b = rgb_color[5:7]
return f"{b}{g}{r}"

requirements.txt
View File

@@ -6,7 +6,7 @@ uvicorn==0.32.1
openai==1.56.1
faster-whisper==1.1.0
loguru==0.7.3
google.generativeai==0.8.3
google-generativeai==0.8.3
dashscope==1.20.14
g4f==0.5.2.2
azure-cognitiveservices-speech==1.41.1

.streamlit/config.toml
View File

@@ -1,2 +1,2 @@
[browser]
gatherUsageStats = false
[server]
fileWatcherType = "none"

webui/Main.py
View File

@@ -618,7 +618,7 @@ with middle_panel:
)
params.video_aspect = VideoAspect(video_aspect_ratios[selected_index][1])
params.video_clip_duration = st.selectbox(
params.max_clip_duration = st.selectbox(
tr("Clip Duration"), options=[2, 3, 4, 5, 6, 7, 8, 9, 10], index=1
)
params.video_count = st.selectbox(
@@ -659,7 +659,8 @@ with middle_panel:
if selected_tts_server == "siliconflow":
# Get the SiliconFlow voice list
filtered_voices = voice.get_siliconflow_voices()
# filtered_voices = voice.get_siliconflow_voices()
pass
else:
# Get the Azure voice list
all_voices = voice.get_all_azure_voices(filter_locals=None)
@@ -699,6 +700,7 @@ with middle_panel:
if saved_voice_name_index >= len(friendly_names) and friendly_names:
saved_voice_name_index = 0
voice_name = ""
# Make sure there are voices to choose from
if friendly_names:
selected_friendly_name = st.selectbox(
@@ -715,14 +717,16 @@ with middle_panel:
params.voice_name = voice_name
config.ui["voice_name"] = voice_name
else:
# If no voices are available, show a notice
# If no voices are available, fall back to a default voice and show a notice
st.warning(
tr(
"No voices available for the selected TTS server. Please select another server."
"No voices available for the selected TTS server. A default voice (en-US-JennyNeural) will be used."
)
)
params.voice_name = ""
config.ui["voice_name"] = ""
default_voice = "en-US-JennyNeural"
params.voice_name = default_voice
config.ui["voice_name"] = default_voice
voice_name = default_voice
# Only show the preview button when voices are available
if friendly_names and st.button(tr("Play Voice")):
@@ -961,7 +965,7 @@ if start_button:
logger.info(utils.to_json(params))
scroll_to_bottom()
result = tm.start(task_id=task_id, params=params)
result = tm.start_storyboard_task(task_id=task_id, params=params)
if not result or "videos" not in result:
st.error(tr("Video Generation Failed"))
logger.error(tr("Video Generation Failed"))