Merge pull request #368 from harry0703/dev

enhanced exception handling for llm and optimized video concatenation
This commit is contained in:
Harry 2024-05-16 16:35:56 +08:00 committed by GitHub
commit dc460c25d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 57 additions and 42 deletions

View File

@ -9,6 +9,8 @@ from openai.types.chat import ChatCompletion
from app.config import config
_max_retries = 5
def _generate_response(prompt: str) -> str:
content = ""
@ -219,11 +221,8 @@ Generate a script for a video, depending on the subject of the video.
final_script = ""
logger.info(f"subject: {video_subject}")
# logger.debug(f"prompt: \n{prompt}")
response = _generate_response(prompt=prompt)
# Return the generated script
if response:
def format_response(response):
# Clean the script
# Remove asterisks, hashes
response = response.replace("*", "")
@ -240,19 +239,30 @@ Generate a script for a video, depending on the subject of the video.
selected_paragraphs = paragraphs[:paragraph_number]
# Join the selected paragraphs into a single string
final_script = "\n\n".join(selected_paragraphs)
return "\n\n".join(selected_paragraphs)
# Print to console the number of paragraphs used
# logger.info(f"number of paragraphs used: {len(selected_paragraphs)}")
else:
logging.error("gpt returned an empty response")
for i in range(_max_retries):
try:
response = _generate_response(prompt=prompt)
if response:
final_script = format_response(response)
else:
logging.error("gpt returned an empty response")
# g4f may return an error message
if final_script and "当日额度已消耗完" in final_script:
raise ValueError(final_script)
# g4f may return an error message
if final_script and "当日额度已消耗完" in final_script:
raise ValueError(final_script)
if final_script:
break
except Exception as e:
logger.error(f"failed to generate script: {e}")
if i < _max_retries:
logger.warning(f"failed to generate video script, trying again... {i + 1}")
logger.success(f"completed: \n{final_script}")
return final_script
return final_script.strip()
def generate_terms(video_subject: str, video_script: str, amount: int = 5) -> List[str]:
@ -283,25 +293,28 @@ Please note that you must use English for generating video search terms; Chinese
""".strip()
logger.info(f"subject: {video_subject}")
# logger.debug(f"prompt: \n{prompt}")
response = _generate_response(prompt)
search_terms = []
for i in range(_max_retries):
try:
response = _generate_response(prompt)
search_terms = json.loads(response)
if not isinstance(search_terms, list) or not all(isinstance(term, str) for term in search_terms):
logger.error("response is not a list of strings.")
continue
try:
search_terms = json.loads(response)
if not isinstance(search_terms, list) or not all(isinstance(term, str) for term in search_terms):
raise ValueError("response is not a list of strings.")
except Exception as e:
match = re.search(r'\[.*]', response)
if match:
try:
search_terms = json.loads(match.group())
except json.JSONDecodeError:
pass
except (json.JSONDecodeError, ValueError):
# logger.warning(f"gpt returned an unformatted response. attempting to clean...")
# Attempt to extract list-like string and convert to list
match = re.search(r'\["(?:[^"\\]|\\.)*"(?:,\s*"[^"\\]*")*\]', response)
if match:
try:
search_terms = json.loads(match.group())
except json.JSONDecodeError:
logger.error(f"could not parse response: {response}")
return []
if search_terms and len(search_terms) > 0:
break
if i < _max_retries:
logger.warning(f"failed to generate video terms, trying again... {i + 1}")
logger.success(f"completed: \n{search_terms}")
return search_terms
@ -310,8 +323,8 @@ Please note that you must use English for generating video search terms; Chinese
if __name__ == "__main__":
video_subject = "生命的意义是什么"
script = generate_script(video_subject=video_subject, language="zh-CN", paragraph_number=1)
# print("######################")
# print(script)
# search_terms = generate_terms(video_subject=video_subject, video_script=script, amount=5)
# print("######################")
# print(search_terms)
print("######################")
print(script)
search_terms = generate_terms(video_subject=video_subject, video_script=script, amount=5)
print("######################")
print(search_terms)

View File

@ -49,26 +49,28 @@ def combine_videos(combined_video_path: str,
clips = []
video_duration = 0
raw_clips = []
for video_path in video_paths:
clip = VideoFileClip(video_path).without_audio()
clip_duration = clip.duration
start_time = 0
while start_time < clip_duration:
end_time = min(start_time + max_clip_duration, clip_duration)
split_clip = clip.subclip(start_time, end_time)
raw_clips.append(split_clip)
logger.info(f"splitting from {start_time:.2f} to {end_time:.2f}, clip duration {clip_duration:.2f}, split_clip duration {split_clip.duration:.2f}")
# logger.info(f"splitting from {start_time:.2f} to {end_time:.2f}, clip duration {clip_duration:.2f}, split_clip duration {split_clip.duration:.2f}")
start_time = end_time
if video_concat_mode.value == VideoConcatMode.sequential.value:
break
# random video_paths order
if video_concat_mode.value == VideoConcatMode.random.value:
random.shuffle(raw_clips)
# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
while video_duration < audio_duration:
# random video_paths order
if video_concat_mode.value == VideoConcatMode.random.value:
random.shuffle(raw_clips)
for clip in raw_clips:
# Check if clip is longer than the remaining audio
if (audio_duration - video_duration) < clip.duration: