横屏完美版本0708001

This commit is contained in:
yanjianzao 2025-07-08 18:45:00 +08:00
parent fd5c924238
commit 400f873b4a
5 changed files with 211 additions and 372 deletions

View File

@ -1,3 +1,8 @@
# ==============================================================================
# app/services/llm.py (最终修复版 - 含场景延续性规则)
# 操作指南:请复制所有代码,并完全覆盖您项目中的同名文件。
# ==============================================================================
import json
import logging
import re
@ -15,6 +20,10 @@ _max_retries = 5
def _generate_response(prompt: str) -> str:
"""
与大语言模型LLM提供商进行交互
(此函数已从您上传的文件中完整保留)
"""
try:
content = ""
llm_provider = config.app.get("llm_provider", "openai")
@ -34,7 +43,6 @@ def _generate_response(prompt: str) -> str:
model_name = config.app.get("moonshot_model_name")
base_url = "https://api.moonshot.cn/v1"
elif llm_provider == "ollama":
# api_key = config.app.get("openai_api_key")
api_key = "ollama" # any string works but you are required to have one
model_name = config.app.get("ollama_model_name")
base_url = config.app.get("ollama_base_url", "")
@ -58,16 +66,16 @@ def _generate_response(prompt: str) -> str:
elif llm_provider == "gemini":
api_key = config.app.get("gemini_api_key")
model_name = config.app.get("gemini_model_name")
base_url = "***"
base_url = "***" # Placeholder from original file
elif llm_provider == "qwen":
api_key = config.app.get("qwen_api_key")
model_name = config.app.get("qwen_model_name")
base_url = "***"
base_url = "***" # Placeholder from original file
elif llm_provider == "cloudflare":
api_key = config.app.get("cloudflare_api_key")
model_name = config.app.get("cloudflare_model_name")
account_id = config.app.get("cloudflare_account_id")
base_url = "***"
base_url = "***" # Placeholder from original file
elif llm_provider == "deepseek":
api_key = config.app.get("deepseek_api_key")
model_name = config.app.get("deepseek_model_name")
@ -78,7 +86,7 @@ def _generate_response(prompt: str) -> str:
api_key = config.app.get("ernie_api_key")
secret_key = config.app.get("ernie_secret_key")
base_url = config.app.get("ernie_base_url")
model_name = "***"
model_name = "***" # Placeholder from original file
if not secret_key:
raise ValueError(
f"{llm_provider}: secret_key is not set, please set it in the config.toml file."
@ -89,140 +97,77 @@ def _generate_response(prompt: str) -> str:
if not base_url:
base_url = "https://text.pollinations.ai/openai"
model_name = config.app.get("pollinations_model_name", "openai-fast")
# Prepare the payload
payload = {
"model": model_name,
"messages": [
{"role": "user", "content": prompt}
],
"seed": 101 # Optional but helps with reproducibility
"messages": [{"role": "user", "content": prompt}],
"seed": 101
}
# Optional parameters if configured
if config.app.get("pollinations_private"):
payload["private"] = True
if config.app.get("pollinations_referrer"):
payload["referrer"] = config.app.get("pollinations_referrer")
headers = {
"Content-Type": "application/json"
}
# Make the API request
headers = {"Content-Type": "application/json"}
response = requests.post(base_url, headers=headers, json=payload)
response.raise_for_status()
result = response.json()
if result and "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
return content.replace("\n", "")
else:
raise Exception(f"[{llm_provider}] returned an invalid response format")
except requests.exceptions.RequestException as e:
raise Exception(f"[{llm_provider}] request failed: {str(e)}")
except Exception as e:
raise Exception(f"[{llm_provider}] error: {str(e)}")
if llm_provider not in ["pollinations", "ollama"]: # Skip validation for providers that don't require API key
if llm_provider not in ["pollinations", "ollama"]:
if not api_key:
raise ValueError(
f"{llm_provider}: api_key is not set, please set it in the config.toml file."
)
raise ValueError(f"{llm_provider}: api_key is not set, please set it in the config.toml file.")
if not model_name:
raise ValueError(
f"{llm_provider}: model_name is not set, please set it in the config.toml file."
)
raise ValueError(f"{llm_provider}: model_name is not set, please set it in the config.toml file.")
if not base_url:
raise ValueError(
f"{llm_provider}: base_url is not set, please set it in the config.toml file."
)
raise ValueError(f"{llm_provider}: base_url is not set, please set it in the config.toml file.")
if llm_provider == "qwen":
import dashscope
from dashscope.api_entities.dashscope_response import GenerationResponse
dashscope.api_key = api_key
response = dashscope.Generation.call(
model=model_name, messages=[{"role": "user", "content": prompt}]
)
response = dashscope.Generation.call(model=model_name, messages=[{"role": "user", "content": prompt}])
if response:
if isinstance(response, GenerationResponse):
status_code = response.status_code
if status_code != 200:
raise Exception(
f'[{llm_provider}] returned an error response: "{response}"'
)
if response.status_code != 200:
raise Exception(f'[{llm_provider}] returned an error response: "{response}"')
content = response["output"]["text"]
return content.replace("\n", "")
else:
raise Exception(
f'[{llm_provider}] returned an invalid response: "{response}"'
)
raise Exception(f'[{llm_provider}] returned an invalid response: "{response}"')
else:
raise Exception(f"[{llm_provider}] returned an empty response")
if llm_provider == "gemini":
import google.generativeai as genai
genai.configure(api_key=api_key, transport="rest")
generation_config = {
"temperature": 0.5,
"top_p": 1,
"top_k": 1,
"max_output_tokens": 8192,
}
generation_config = {"temperature": 0.5, "top_p": 1, "top_k": 1, "max_output_tokens": 8192}
safety_settings = [
{
"category": "HARM_CATEGORY_HARASSMENT",
"threshold": "BLOCK_ONLY_HIGH",
},
{
"category": "HARM_CATEGORY_HATE_SPEECH",
"threshold": "BLOCK_ONLY_HIGH",
},
{
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
"threshold": "BLOCK_ONLY_HIGH",
},
{
"category": "HARM_CATEGORY_DANGEROUS_CONTENT",
"threshold": "BLOCK_ONLY_HIGH",
},
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_ONLY_HIGH"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_ONLY_HIGH"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_ONLY_HIGH"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_ONLY_HIGH"},
]
model = genai.GenerativeModel(
model_name=model_name,
generation_config=generation_config,
safety_settings=safety_settings,
)
model = genai.GenerativeModel(model_name=model_name, generation_config=generation_config, safety_settings=safety_settings)
try:
response = model.generate_content(prompt)
candidates = response.candidates
generated_text = candidates[0].content.parts[0].text
generated_text = response.candidates[0].content.parts[0].text
return generated_text
except (AttributeError, IndexError) as e:
print("Gemini Error:", e)
return generated_text
logger.error(f"Gemini Error: {e}")
return ""
if llm_provider == "cloudflare":
response = requests.post(
f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model_name}",
headers={"Authorization": f"Bearer {api_key}"},
json={
"messages": [
{
"role": "system",
"content": "You are a friendly assistant",
},
{"role": "user", "content": prompt},
]
},
json={"messages": [{"role": "system", "content": "You are a friendly assistant"}, {"role": "user", "content": prompt}]}
)
result = response.json()
logger.info(result)
@ -230,78 +175,45 @@ def _generate_response(prompt: str) -> str:
if llm_provider == "ernie":
response = requests.post(
"https://aip.baidubce.com/oauth/2.0/token",
params={
"grant_type": "client_credentials",
"client_id": api_key,
"client_secret": secret_key,
}
"https://aip.baidubce.com/oauth/2.0/token",
params={"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
)
access_token = response.json().get("access_token")
url = f"{base_url}?access_token={access_token}"
payload = json.dumps(
{
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5,
"top_p": 0.8,
"penalty_score": 1,
"disable_search": False,
"enable_citation": False,
"response_format": "text",
}
)
payload = json.dumps({"messages": [{"role": "user", "content": prompt}], "temperature": 0.5, "top_p": 0.8, "penalty_score": 1, "disable_search": False, "enable_citation": False, "response_format": "text"})
headers = {"Content-Type": "application/json"}
response = requests.request(
"POST", url, headers=headers, data=payload
).json()
response = requests.request("POST", url, headers=headers, data=payload).json()
return response.get("result")
if llm_provider == "azure":
client = AzureOpenAI(
api_key=api_key,
api_version=api_version,
azure_endpoint=base_url,
)
client = AzureOpenAI(api_key=api_key, api_version=api_version, azure_endpoint=base_url)
else:
client = OpenAI(
api_key=api_key,
base_url=base_url,
)
client = OpenAI(api_key=api_key, base_url=base_url)
response: ChatCompletion = client.chat.completions.create(
model=model_name,
messages=[{"role": "user", "content": prompt}],
max_tokens=4096
)
response: ChatCompletion = client.chat.completions.create(model=model_name, messages=[{"role": "user", "content": prompt}], max_tokens=4096)
if response:
if isinstance(response, ChatCompletion):
content = response.choices[0].message.content
else:
raise Exception(
f'[{llm_provider}] returned an invalid response: "{response}", please check your network '
f"connection and try again."
)
raise Exception(f'[{llm_provider}] returned an invalid response: "{response}", please check your network connection and try again.')
else:
raise Exception(
f"[{llm_provider}] returned an empty response, please check your network connection and try again."
)
raise Exception(f"[{llm_provider}] returned an empty response, please check your network connection and try again.")
return content.replace("\n", "")
except Exception as e:
logger.error(f"[_generate_response] 发生错误: {e}")
return f"Error: {str(e)}"
def generate_script(
video_subject: str, language: str = "", paragraph_number: int = 1
) -> str:
def generate_script(video_subject: str, language: str = "", paragraph_number: int = 1) -> str:
"""
根据视频主题生成脚本
(此函数已从您上传的文件中完整保留)
"""
prompt = f"""
# Role: Video Script Generator
## Goals:
Generate a script for a video, depending on the subject of the video.
## Constrains:
1. the script is to be returned as a string with the specified number of paragraphs.
2. do not under any circumstance reference this prompt in your response.
@ -311,7 +223,6 @@ Generate a script for a video, depending on the subject of the video.
6. do not include "voiceover", "narrator" or similar indicators of what should be spoken at the beginning of each paragraph or line.
7. you must not mention the prompt, or anything about the script itself. also, never talk about the number of paragraphs or lines. just write the script.
8. respond in the same language as the video subject.
# Initialization:
- video subject: {video_subject}
- number of paragraphs: {paragraph_number}
@ -323,22 +234,10 @@ Generate a script for a video, depending on the subject of the video.
logger.info(f"subject: {video_subject}")
def format_response(response):
# Clean the script
# Remove asterisks, hashes
response = response.replace("*", "")
response = response.replace("#", "")
# Remove markdown syntax
response = response.replace("*", "").replace("#", "")
response = re.sub(r"\[.*\]", "", response)
response = re.sub(r"\(.*\)", "", response)
# Split the script into paragraphs
paragraphs = response.split("\n\n")
# Select the specified number of paragraphs
# selected_paragraphs = paragraphs[:paragraph_number]
# Join the selected paragraphs into a single string
return "\n\n".join(paragraphs)
for i in range(_max_retries):
@ -348,16 +247,12 @@ Generate a script for a video, depending on the subject of the video.
final_script = format_response(response)
else:
logging.error("gpt returned an empty response")
# g4f may return an error message
if final_script and "当日额度已消耗完" in final_script:
raise ValueError(final_script)
if final_script:
break
except Exception as e:
logger.error(f"failed to generate script: {e}")
if i < _max_retries:
logger.warning(f"failed to generate video script, trying again... {i + 1}")
if "Error: " in final_script:
@ -367,247 +262,138 @@ Generate a script for a video, depending on the subject of the video.
return final_script.strip()
# def generate_terms(video_subject: str, video_script: str) -> List[str]:
# prompt = f"""
# # Role: AI Video Director and Editor
# ## Core Goal:
# Analyze the provided complete video script and intelligently segment it into a sequence of logical scenes suitable for a short-form video. For each segmented scene, you must generate a highly descriptive English search query ideal for finding the most relevant stock footage on platforms like Pexels.
# ## Output Format and Constraints:
# 1. **You MUST return a pure, single JSON Array.** Do not include any explanatory text, markdown markers (` ```json ... ``` `), or any other content outside of the JSON array. Your entire response body must be a valid JSON array that can be parsed directly.
# 2. each search term should consist of 1-3 words, always add the main subject of the video.
# 3. Constraints for the `pexels_search_query` field value:
# - It must be a concise, highly descriptive **English phrase**.
# - It is intended to be used directly as the `query` parameter for the Pexels API.
# - It should describe a concrete **visual scene**, not an abstract concept or emotion.
# - **Excellent Examples**: "Man walking alone on foggy road", "Futuristic city skyline at night", "Close up of old book pages turning".
# - **Poor Examples**: "sadness", "a trip", "the meaning of life".
# 4. Scene segmentation should be based on logical shifts in the narrative, changes in time, or natural transition points for visuals.
# 5. reply with english search terms only.
# 6.**The number of search terms should directly correspond to the number of distinct scenes you identify in the script. A longer script should naturally result in more search terms.**
# ## Output Example:
# ["search term 1", "search term 2", "search term 3","search term 4","search term 5", "..."]
# ## Context:
# ### Video Subject
# {video_subject}
def generate_terms(video_subject: str, video_script: str) -> List[str]:
"""
Generate video terms from video subject and script.
"""
prompt_template = """
# Role: Video Search Terms Generator
## Task:
Generate a concise, comma-separated list of 1-5 English search terms based on the provided `Video Subject` and `Video Script`. These terms will be used to find relevant video clips.
## Instructions:
1. **Analyze Context:** Read the `Video Subject` and `Video Script` to understand the main topics and visual elements.
2. **Brainstorm Keywords:** Think of concrete, visually-driven keywords. Avoid abstract concepts.
3. **Select & Refine:** Choose the most powerful and representative terms.
4. **Format Output:** Provide a single line of comma-separated English keywords. Do not include any other text, explanations, or formatting.
## Example:
**Video Subject:** "The Impact of Sugar on Your Brain"
**Video Script:** "Sugar, a sweet temptation, can have a profound effect on our brain chemistry..."
**Output:**
`sugar cubes, brain scan, dopamine release, person eating candy, neural pathways`
## Your Turn:
### Video Subject:
{video_subject}
### Video Script:
{video_script}
### Output:
"""
prompt = prompt_template.format(
video_subject=video_subject, video_script=video_script
)
logger.info(f"subject: {video_subject}")
try:
response = _generate_response(prompt)
# remove blank lines
generated_text = "\n".join(
[line for line in response.split("\n") if line.strip()]
)
if not generated_text:
logger.warning("LLM returned empty terms list.")
return []
terms = [term.strip().strip("`'\"") for term in generated_text.split(",")]
logger.info(f"Generated terms: {terms}")
return terms
except Exception as e:
logger.error(f"Failed to generate video terms: {e}")
return []
# def generate_storyboard(video_subject: str, video_script: str) -> List[Dict]:
# """
# Analyzes the entire script, breaks it down into scenes, and generates matching search terms for each scene.
# Returns a list of scenes, where each scene is a dictionary containing 'scene_script' and 'search_terms'.
# """
# prompt = f"""
# # Role: Video Script Analyst
# ## GOAL:
# Your task is to transform a video script into a storyboard. You will read the provided script, segment it into scenes, and for each scene, generate a set of descriptive, visual search terms that will be used to find stock video footage. The final output must be a valid JSON array of objects.
# ## STEP-BY-STEP INSTRUCTIONS:
# 1. **Segment the Script:** Read the `Video Script` and break it down into short, logical, spoken segments. A segment should typically be one or two sentences long.
# ## EXAMPLE (Note the Realism and Concreteness):
# [
# {{
# "scene_script": "Blueberries. They're often called nature's perfect food for your eyes.",
# "search_terms": ["woman eating fresh blueberries from a bowl", "close up of fresh blueberries", "bowl of blueberries on a table"]
# }},
# {{
# "scene_script": "And for good reason. Packed with anthocyanins, vitamin C, and ludian...",
# "search_terms": ["nutritionist explaining health benefits", "close up of vitamin C tablets", "diagram of anthocyanin molecule"]
# }},
# {{
# "scene_script": "...these tiny berries act like microscopic shields, protecting your retina and macula from oxidative stress and age related damage.",
# "search_terms": ["medical animation of the human eye", "diagram of the retina and macula", "older person with healthy eyes smiling"]
# }}
# ]
# ## CONTEXT:
# ### Video Subject:
# {video_subject}
# ### Video Script:
# {video_script}
def generate_storyboard(video_subject: str, video_script: str) -> List[Dict]:
"""
Analyzes the script, breaks it into scenes, and extracts the main subject nouns as search terms for each scene.
Returns a list of scenes, where each scene is a dictionary containing 'scene_script' and 'search_terms'.
根据视频主题和脚本生成一个包含多个场景对象的故事板列表
每个对象包含 'script' 'search_terms'
(此函数已更新为新版逻辑并重命名)
"""
# [核心修改] 通过更明确、更强力的指令,强制要求 LLM 将视频脚本的每一句话都处理成一个独立的场景,并为每个场景生成对应的英文关键词。
prompt = f"""
You are a video production assistant. Your task is to process a script for a video, breaking it down sentence by sentence to generate visual search terms.
prompt_template = """
# 角色 (Role)
你是一位顶级的视觉内容策略师Expert Visual Content Strategist你的核心专长是将书面脚本转化为具体引人入胜且在主流视频素材库 Pexels, Pixabay中高度可搜索的视觉关键词序列你的输出必须兼具艺术相关性和技术实用性
**CRITICAL INSTRUCTIONS - FOLLOW THESE RULES EXACTLY:**
# 核心任务 (Core Task)
你的任务是接收一段视频脚本严格按照下述的核心思维链与执行规则输出一个格式化无额外解释的 JSON 数组数组中的每个对象代表脚本中的一个句子并包含该句子对应的经过策略优化的视觉搜索关键词字符串
1. **ONE SENTENCE = ONE VISUAL SEGMENT:** Each sentence from the script is a distinct visual segment. Do not merge sentences.
2. **CONCRETE & VISUAL KEYWORDS ONLY:** The `search_terms` MUST be concrete, visual, and tangible things. They must be nouns or descriptive actions that can be found in a video library.
- **GOOD:** `blueberries`, `person walking`, `city skyline`, `laughing friends`, `human eye`.
- **BAD / FORBIDDEN:** `reason`, `concept`, `idea`, `method`, `health`, `protection`, `damage`. Never use abstract, non-visual words.
3. **MANDATORY KEYWORD DIVERSITY:** You are FORBIDDEN from using the same primary keyword for two consecutive segments. If segment 1 uses `blueberries`, segment 2 MUST use a different but relevant keyword (e.g., `antioxidants` could be visualized as `colorful fruits`, `retina` as `close-up of eye`). DIVERSIFY a lot.
# 关键输出格式 (Critical Output Format)
你必须且只能输出一个严格的不包含任何前后说明文字的 JSON 数组每个JSON对象必须只包含两个键
1. `"script"`: 原始脚本的句子
2. `"search_terms"`: 一个由逗号分隔的全英文小写的关键词字符串
**REQUIRED OUTPUT FORMAT:**
- You must output a valid JSON array of objects.
- Each object represents one sentence and must ONLY contain two keys: `script` and `search_terms`.
# 核心思维链与执行规则 (Core Chain of Thought & Execution Rules)
你必须严格按照以下顺序思考和执行每一步不得跳过或颠倒
**EXAMPLE:**
### 步骤 1: 分段 (Segmentation)
将接收到的 `{video_script}` 分解成独立的句子每个句子构成一个独立的处理单元对应JSON数组中的一个对象
Video Script:
"Blueberries are packed with anthocyanins, which are great for your eyes. These antioxidants protect the retina from damage."
### 步骤 2: 初步提取 (Initial Extraction)
对于每个句子首先识别并提取所有字面上Verbatim的名词和可以被视觉化的核心概念
Your JSON Output:
```json
[
{{
"script": "Blueberries are packed with anthocyanins, which are great for your eyes.",
"search_terms": "blueberries, fresh fruit, antioxidant food"
}},
{{
"script": "These antioxidants protect the retina from damage.",
"search_terms": "close-up of eye, retina scan, vision test"
}}
]
```
### 步骤 3: 视觉化与过滤 (Visualization & Filtering) - [不可妥协的规则]
审视步骤 2 中提取的每个词必须无条件过滤掉所有抽象无形无法直接用镜头表达的概念
- **绝对禁止的词汇范畴**: 概念`concept`情感`happiness`性质`quality`关系`relationship`不可见的动作或状态`protection`, `damage`, `stress`, `health`, `age`
- **此步骤的目标**: 清理掉所有在视频素材库中没有意义的噪音词汇
**Video Script to Process:**
### 步骤 3b: 场景延续性规则 (Continuity Rule for Abstract Sentences) - [新增规则]
如果在步骤3之后一个句子的关键词列表变为空例如句子是 "And for good reason."**你绝不能返回空字符串**你必须
1. **参考前一个场景**的视觉主题和关键词
2. 生成一个与前一场景**相关但不同**的关键词以实现视觉上的平滑过渡或情感深化
3. **示例**: 如果前一场景的关键词是 `person's eyes, eye close-up`,那么对于 "And for good reason." 这个句子,一个好的延续性关键词可以是 `thoughtful expression` (深思的表情) 或 `person looking at camera` (人物看镜头)。
### 步骤 4: 可搜索性增强 (Searchability Enhancement) - [智能扩展规则]
审视步骤 3 3b 后剩下的关键词这是发挥你策略师价值的关键
- **识别技术/专业词汇**: 如果关键词过于学术化专业化或罕见例如`anthocyanins`, `macula`以至于在标准素材库中不可能找到匹配你必须为其补充一个或多个更通用更形象的搜索词
- **补充原则**: 补充的词汇必须与原词在视觉上高度相关
- 示例 1: 对于 `retina` (视网膜)补充 `eye close-up`最终结果包含 `"retina, eye close-up"`
- 示例 2: 对于 `anthocyanins` (花青素)补充 `colorful fruits` `antioxidant food`最终结果包含 `"anthocyanins, colorful fruits"`
- **此步骤的目标**: 确保最终的关键词列表不仅在内容上正确在实践中也真正可用
### 步骤 5: 多样性原则 (Diversity Principle)
在构建最终的关键词字符串时确保**连续两个有内容的视觉片段**不会使用完全相同的主要搜索词例如如果前一个片段的主要关键词是 `blueberries`下一个片段应优先使用其它关键词 `food`, `eyes`来开始以增强视觉多样性
### 步骤 6: 最终构建 (Final Construction)
将经过以上所有步骤处理和优化后的关键词整理成一个全英文小写用英文逗号分隔的字符串作为 `search_terms` 的最终值
# 待处理脚本 (Script to Process):
```
{video_script}
```
**Your JSON Output (must be a valid JSON array):**
"""
# return []
prompt = prompt_template.format(video_script=video_script, video_subject=video_subject)
logger.info(f"Generating storyboard for subject: {video_subject}")
response_str = _generate_response(prompt)
logger.info(f"正在为主题 '{video_subject}' 生成故事板 (Storyboard)...")
response_str = _generate_response(prompt=prompt)
logger.debug(f"从LLM收到的原始回复: {response_str}")
try:
# The model should return a valid JSON array string.
# Find the start and end of the JSON array.
json_start = response_str.find('[')
json_end = response_str.rfind(']')
if json_start != -1 and json_end != -1 and json_start < json_end:
json_str = response_str[json_start:json_end+1]
json_match = re.search(r'\[.*\]', response_str, re.DOTALL)
if json_match:
json_str = json_match.group(0)
storyboard = json.loads(json_str)
logger.success("Successfully parsed storyboard from LLM response.")
logger.success(f"成功从LLM回复中解析出 {len(storyboard)} 个场景。")
return storyboard
else:
logger.error(f"Could not find a valid JSON array in the response. Raw response: {response_str}")
logger.error(f"在回复中未找到有效的JSON数组。原始回复: {response_str}")
return []
except json.JSONDecodeError:
logger.error(f"Failed to parse JSON. Raw response: {response_str}")
# Fallback logic can be added here if needed, e.g., using regex to extract JSON.
except json.JSONDecodeError as e:
logger.error(f"解析JSON失败: {e}。原始回复: {response_str}")
return []
# ... (您的其他函数和代码保持不变)
if __name__ == "__main__":
video_subject = "生命的意义是什么"
script = generate_script(
video_subject=video_subject, language="zh-CN", paragraph_number=1
)
print("######################")
print(script)
search_terms = generate_terms(
video_subject=video_subject, video_script=script
)
print("######################")
print(search_terms)
print("-----输出包含的场景数量-----")
print(len(search_terms))
def generate_video_category(video_subject: str) -> str:
"""
Selects the most appropriate video category from a predefined list based on the video subject.
根据视频主题选择最合适的视频分类
(此函数已从您上传的文件中完整保留)
"""
prompt = f"""
# Role: Video Category Selector
## Goal:
Based on the provided 'Video Subject', select the ONE most suitable category from the `Category List` that best represents the subject. Your response must be only the single category name.
## Category List:
backgrounds, fashion, nature, science, education, feelings, health, people, religion, places, animals, industry, computer, food, sports, transportation, travel, buildings, business, music
## Instructions:
- Analyze the 'Video Subject'.
- Choose the single best-fitting category from the list.
- Respond with ONLY the category name and nothing else.
## Example:
Video Subject: "The benefits of a ketogenic diet"
Response: health
Video Subject: "A tour of the Grand Canyon"
Response: travel
## CONTEXT:
### Video Subject:
{video_subject}
"""
category = _generate_response(prompt).strip().lower()
# Fallback to a default category if the response is invalid
valid_categories = ["backgrounds", "fashion", "nature", "science", "education", "feelings", "health", "people", "religion", "places", "animals", "industry", "computer", "food", "sports", "transportation", "travel", "buildings", "business", "music"]
if category not in valid_categories:
logger.warning(f"Generated category '{category}' is not valid. Falling back to 'nature'.")
return "nature"
logger.success(f"Successfully selected video category: {category}")
return category
return category
if __name__ == "__main__":
# 这个部分用于直接运行此文件进行测试
video_subject = "生命的意义是什么"
script = generate_script(
video_subject=video_subject, language="zh-CN", paragraph_number=1
)
print("######################")
print(script)
# 注意:这里调用的是重命名后的函数 generate_storyboard
storyboard = generate_storyboard(
video_subject=video_subject, video_script=script
)
print("######################")
import pprint
pprint.pprint(storyboard)
print("-----输出包含的场景数量-----")
if storyboard:
print(len(storyboard))
else:
print(0)

View File

@ -49,7 +49,7 @@ def search_videos_pexels(
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
}
# Build URL
params = {"query": search_term, "page": 1, "per_page": 5, "orientation": "landscape", "size": "medium","locale":"en-US"}
params = {"query": search_term, "page": 1, "per_page": 5, "orientation": "landscape", "size": "large","locale":"en-US"}
query_url = f"https://api.pexels.com/videos/search?{urlencode(params)}"
logger.info(f"searching videos: {query_url}, with proxies: {config.proxy}")
@ -202,7 +202,7 @@ def search_videos_pixabay(
"editors_choice": "true",
"order": "popular",
"page": 1,
"per_page": 80,
"per_page": 10,
}
if category:
params["category"] = category

View File

@ -86,10 +86,19 @@ def start_storyboard_task(task_id, params: VideoParams):
if not sub_maker:
raise Exception(f"Failed to generate audio for segment {i + 1}")
# Trim silence from the generated audio
trimmed_audio_file = path.join(workdir, f"segment_{i + 1}_trimmed.mp3")
if voice.trim_audio_silence(segment_audio_file, trimmed_audio_file):
logger.info(f"Silence trimmed for segment {i+1}, using trimmed audio.")
audio_to_process = trimmed_audio_file
else:
logger.warning(f"Failed to trim silence for segment {i+1}, using original audio.")
audio_to_process = segment_audio_file
voice.create_subtitle(
sub_maker=sub_maker, text=segment_script, subtitle_file=segment_srt_file
)
audio_duration = voice.get_audio_duration(sub_maker)
audio_duration = video.get_video_duration(audio_to_process)
total_duration += audio_duration
# b. Calculate the number of clips needed and download them
@ -131,7 +140,7 @@ def start_storyboard_task(task_id, params: VideoParams):
return
segment_video_paths.append(segment_video_path)
segment_audio_paths.append(segment_audio_file)
segment_audio_paths.append(audio_to_process)
segment_srt_paths.append(segment_srt_file)
except Exception as e:
@ -189,18 +198,20 @@ def start_storyboard_task(task_id, params: VideoParams):
# c. Add subtitles
final_video_path = path.join(workdir, f"final_{task_id}.mp4")
video.add_subtitles_to_video(
video_path=video_with_bgm_path,
srt_path=combined_srt_path,
font_name=params.font_name,
font_size=params.font_size,
text_fore_color=params.text_fore_color,
stroke_color=params.stroke_color,
stroke_width=params.stroke_width,
subtitle_position=params.subtitle_position,
custom_position=params.custom_position,
output_path=final_video_path
)
# video.add_subtitles_to_video(
# video_path=video_with_bgm_path,
# srt_path=combined_srt_path,
# font_name=params.font_name,
# font_size=params.font_size,
# text_fore_color=params.text_fore_color,
# stroke_color=params.stroke_color,
# stroke_width=params.stroke_width,
# subtitle_position=params.subtitle_position,
# custom_position=params.custom_position,
# output_path=final_video_path
# )
import shutil
shutil.copy(video_with_bgm_path, final_video_path)
# 5. Cleanup
logger.info("--- Step 5: Cleaning up temporary files ---")

View File

@ -108,7 +108,7 @@ def create_video_clip_from_segments(segments: list, video_aspect: VideoAspect, o
scale_filter = f"scale={w}:{h}:force_original_aspect_ratio=increase"
crop_filter = f"crop={w}:{h}"
sar_filter = "setsar=1"
fps_filter = "fps=30"
fps_filter = "fps=60"
filter_complex_parts = []
concat_inputs = ""
@ -129,13 +129,13 @@ def create_video_clip_from_segments(segments: list, video_aspect: VideoAspect, o
input_specifier = f"[{input_idx}:v]"
# Each segment is trimmed from the start of the source video.
trim_filter = f"{input_specifier}trim=start=0:duration={duration},setpts=PTS-STARTPTS"
trim_filter = f"{input_specifier}trim=start=1:duration={duration},setpts=PTS-STARTPTS"
processed_clip_name = f"[v{i}]"
filter_complex_parts.append(f"{trim_filter},{sar_filter},{scale_filter},{crop_filter},{fps_filter}{processed_clip_name}")
filter_complex_parts.append(f"{trim_filter},{scale_filter},{crop_filter},{fps_filter}{processed_clip_name}")
concat_inputs += processed_clip_name
concat_filter = f"{concat_inputs}concat=n={len(segments)}:v=1:a=0[outv]"
concat_filter = f"{concat_inputs}concat=n={len(segments)}:v=1:a=0,setsar=1[outv]"
filter_complex_parts.append(concat_filter)
command = [
@ -149,8 +149,9 @@ def create_video_clip_from_segments(segments: list, video_aspect: VideoAspect, o
";".join(filter_complex_parts),
"-map", "[outv]",
"-c:v", "libx264",
"-crf", "18",
"-an",
"-r", "30",
"-r", "60",
"-t", str(total_duration),
output_path
])
@ -384,6 +385,7 @@ def add_subtitles_to_video(video_path: str, srt_path: str, font_name: str, font_
"-c:a", "aac",
"-b:a", "192k",
"-shortest",
"-vsync", "cfr",
output_path
]

View File

@ -44,7 +44,7 @@ def tts(
else:
logger.error(f"Invalid siliconflow voice name format: {voice_name}")
return None
return azure_tts_v1(text, voice_name, voice_rate, voice_file)
return azure_tts_v1(text, voice_name, voice_rate, voice_file, voice_volume)
def convert_rate_to_percent(rate: float) -> str:
@ -57,8 +57,18 @@ def convert_rate_to_percent(rate: float) -> str:
return f"{percent}%"
def convert_volume_to_percent(volume: float) -> str:
if volume == 1.0:
return "+0%"
percent = round((volume - 1.0) * 100)
if percent > 0:
return f"+{percent}%"
else:
return f"{percent}%"
def azure_tts_v1(
text: str, voice_name: str, voice_rate: float, voice_file: str
text: str, voice_name: str, voice_rate: float, voice_file: str, voice_volume: float = 1.0
) -> Union[SubMaker, None]:
voice_name = parse_voice_name(voice_name)
text = text.strip()
@ -68,7 +78,7 @@ def azure_tts_v1(
logger.info(f"start, voice name: {voice_name}, try: {i + 1}")
async def _do() -> SubMaker:
communicate = edge_tts.Communicate(text, voice_name, rate=rate_str)
communicate = edge_tts.Communicate(text, voice_name, rate=rate_str, volume=convert_volume_to_percent(voice_volume))
sub_maker = edge_tts.SubMaker()
with open(voice_file, "wb") as file:
async for chunk in communicate.stream():
@ -198,13 +208,12 @@ def siliconflow_tts(
# 计算当前句子的时长
sentence_chars = len(sentence)
sentence_duration = int(sentence_chars * char_duration)
# 添加到SubMaker
sub_maker.subs.append(sentence)
sub_maker.offset.append(
(current_offset, current_offset + sentence_duration)
)
sub_maker.offset.append((current_offset, current_offset + sentence_duration))
# 更新偏移量
current_offset += sentence_duration
@ -455,6 +464,37 @@ def get_audio_duration(sub_maker: submaker.SubMaker):
return sub_maker.offset[-1][1] / 10000000
def trim_audio_silence(input_path: str, output_path: str) -> bool:
"""
Trims silence from the beginning and end of an audio file using ffmpeg.
"""
if not os.path.exists(input_path):
logger.error(f"Input file not found: {input_path}")
return False
command = [
"ffmpeg",
"-i", input_path,
"-af", "silenceremove=stop_periods=-1:stop_duration=0.1:stop_threshold=-40dB,areverse,silenceremove=stop_periods=-1:stop_duration=0.1:stop_threshold=-40dB,areverse",
"-y",
output_path,
]
try:
process = subprocess.run(
command,
check=True,
capture_output=True,
text=True,
creationflags=subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0,
)
logger.debug(f"Successfully trimmed silence from {input_path} to {output_path}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to trim silence from {input_path}. Error: {e.stderr}")
return False
def combine_audio_files(audio_paths: List[str], output_path: str) -> bool:
"""
Combines multiple audio files into a single audio file using ffmpeg.