mirror of
https://github.com/harry0703/MoneyPrinterTurbo.git
synced 2026-02-21 16:37:21 +08:00
230 lines
5.8 KiB
Python
230 lines
5.8 KiB
Python
import json
|
|
import locale
|
|
import os
|
|
import threading
|
|
from typing import Any
|
|
from uuid import uuid4
|
|
|
|
import urllib3
|
|
from loguru import logger
|
|
|
|
from app.models import const
|
|
|
|
# Module-level side effect: runs once when this module is imported and turns
# off urllib3's warnings.
urllib3.disable_warnings()
|
|
|
|
|
|
def get_response(status: int, data: Any = None, message: str = ""):
    """Build a response dictionary.

    Args:
        status: Value stored under the ``"status"`` key (always present).
        data: Optional payload; stored under ``"data"`` only when truthy.
        message: Optional text; stored under ``"message"`` only when truthy.

    Returns:
        A dict with ``"status"`` plus whichever optional keys were truthy.
        Falsy payloads (``None``, ``0``, ``[]``, ``""`` ...) are omitted.
    """
    response = {"status": status}
    optional_fields = (("data", data), ("message", message))
    for key, value in optional_fields:
        if value:
            response[key] = value
    return response
|
|
|
|
|
|
def to_json(obj):
    """Render *obj* as a pretty-printed JSON string.

    The value is first reduced to plain JSON-compatible data:

    * ``int``, ``float``, ``bool``, ``str`` and ``None`` pass through as-is;
    * ``bytes`` are replaced by the placeholder ``"*** binary data ***"``
      (the content itself is not encoded);
    * dict values and list/tuple items are converted recursively (dict keys
      are left untouched, tuples come out as lists);
    * any other object exposing ``__dict__`` is converted via that mapping;
    * everything else becomes ``None``.

    Returns:
        The JSON text (non-ASCII characters kept, 4-space indent), or
        ``None`` if reducing or encoding the value raises an ``Exception``.
    """

    def simplify(value):
        # Scalars that json.dumps handles natively.
        if value is None or isinstance(value, (int, float, bool, str)):
            return value
        # Raw bytes are not dumped; a fixed marker stands in for them.
        if isinstance(value, bytes):
            return "*** binary data ***"
        if isinstance(value, dict):
            return {key: simplify(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [simplify(item) for item in value]
        # Custom objects: fall back to their attribute dictionary.
        if hasattr(value, "__dict__"):
            return simplify(value.__dict__)
        # Anything else has no representation here.
        return None

    try:
        return json.dumps(simplify(obj), ensure_ascii=False, indent=4)
    except Exception:
        return None
|
|
|
|
|
|
def get_uuid(remove_hyphen: bool = False):
    """Return a freshly generated random UUID (version 4) as a string.

    Args:
        remove_hyphen: When true, strip the ``-`` separators from the result.

    Returns:
        The UUID text, with or without hyphens.
    """
    text = str(uuid4())
    return text.replace("-", "") if remove_hyphen else text
|
|
|
|
|
|
def root_dir():
    """Return the directory three levels above this file's resolved path."""
    path = os.path.realpath(__file__)
    # Climb from the file itself up three directory levels.
    for _ in range(3):
        path = os.path.dirname(path)
    return path
|
|
|
|
|
|
def storage_dir(sub_dir: str = "", create: bool = False):
    """Return the path of the ``storage`` directory below ``root_dir()``.

    Args:
        sub_dir: Optional relative path appended below ``storage``.
        create: When true, make sure the directory exists before returning.

    Returns:
        The resulting path. Unless ``create`` is true the path is not
        checked and may not exist.

    Raises:
        OSError: If ``create`` is true and the directory cannot be created
            (including when the path exists but is not a directory).
    """
    path = os.path.join(root_dir(), "storage")
    if sub_dir:
        path = os.path.join(path, sub_dir)
    if create:
        # exist_ok=True replaces the former exists()-then-makedirs() pair,
        # which could raise FileExistsError when two threads raced to create
        # the same directory.
        os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
|
def resource_dir(sub_dir: str = ""):
    """Return the path of the ``resource`` directory below ``root_dir()``.

    Args:
        sub_dir: Optional relative path appended below ``resource``.

    Returns:
        The resulting path; nothing is created or checked on disk.
    """
    base = os.path.join(root_dir(), "resource")
    return os.path.join(base, sub_dir) if sub_dir else base
|
|
|
|
|
|
def task_dir(sub_dir: str = ""):
    """Return the ``tasks`` directory below ``storage_dir()``, creating it.

    Args:
        sub_dir: Optional relative path appended below ``tasks``.

    Returns:
        The resulting path; the directory exists when this returns.

    Raises:
        OSError: If the directory cannot be created (including when the
            path exists but is not a directory).
    """
    path = os.path.join(storage_dir(), "tasks")
    if sub_dir:
        path = os.path.join(path, sub_dir)
    # exist_ok=True replaces the former exists()-then-makedirs() pair, which
    # could raise FileExistsError when two threads raced to create the same
    # directory.
    os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
|
def font_dir(sub_dir: str = ""):
    """Return the ``fonts`` directory below ``resource_dir()``, creating it.

    Args:
        sub_dir: Optional relative path appended below ``fonts``.

    Returns:
        The resulting path; the directory exists when this returns.

    Raises:
        OSError: If the directory cannot be created (including when the
            path exists but is not a directory).
    """
    path = resource_dir("fonts")
    if sub_dir:
        path = os.path.join(path, sub_dir)
    # exist_ok=True replaces the former exists()-then-makedirs() pair, which
    # could raise FileExistsError when two threads raced to create the same
    # directory.
    os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
|
def song_dir(sub_dir: str = ""):
    """Return the ``songs`` directory below ``resource_dir()``, creating it.

    Args:
        sub_dir: Optional relative path appended below ``songs``.

    Returns:
        The resulting path; the directory exists when this returns.

    Raises:
        OSError: If the directory cannot be created (including when the
            path exists but is not a directory).
    """
    path = resource_dir("songs")
    if sub_dir:
        path = os.path.join(path, sub_dir)
    # exist_ok=True replaces the former exists()-then-makedirs() pair, which
    # could raise FileExistsError when two threads raced to create the same
    # directory.
    os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
|
def public_dir(sub_dir: str = ""):
    """Return the ``public`` directory below ``resource_dir()``, creating it.

    Args:
        sub_dir: Optional relative path appended below ``public``.

    Returns:
        The resulting path; the directory exists when this returns.

    Raises:
        OSError: If the directory cannot be created (including when the
            path exists but is not a directory).
    """
    path = resource_dir("public")
    if sub_dir:
        path = os.path.join(path, sub_dir)
    # exist_ok=True replaces the former exists()-then-makedirs() pair, which
    # could raise FileExistsError when two threads raced to create the same
    # directory.
    os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
|
def run_in_background(func, *args, **kwargs):
    """Start ``func(*args, **kwargs)`` on a new thread and return the thread.

    Any ``Exception`` raised by ``func`` is caught inside the worker and
    reported through ``logger.error``; it is not re-raised, and the return
    value of ``func`` is discarded.

    Returns:
        The already-started ``threading.Thread``; call ``join()`` on it to
        wait for completion.
    """

    def _guarded_call():
        try:
            func(*args, **kwargs)
        except Exception as e:
            logger.error(f"run_in_background error: {e}")

    worker = threading.Thread(target=_guarded_call)
    worker.start()
    return worker
|
|
|
|
|
|
def time_convert_seconds_to_hmsm(seconds) -> str:
    """Format a duration in seconds as ``HH:MM:SS,mmm``.

    The value is rounded once to the nearest whole millisecond and then
    split into fields. Truncating ``seconds * 1000`` instead loses a
    millisecond to floating-point error (``1.001 * 1000`` evaluates to
    ``1000.9999999999999``), which made 1.001 s render as ``00:00:01,000``.
    Rounding before the split also lets a value such as 59.9996 carry
    cleanly into ``00:01:00,000``.

    Args:
        seconds: Duration in seconds (``int`` or ``float``).

    Returns:
        The formatted timestamp; hours are zero-padded to at least two
        digits and are not capped.
    """
    total_ms = int(round(seconds * 1000))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
|
|
|
|
|
|
def text_to_srt(idx: int, msg: str, start_time: float, end_time: float) -> str:
    """Build one subtitle entry: index line, time-range line, then the text.

    Args:
        idx: Number written on the first line (formatted with ``%d``).
        msg: Subtitle text written on the third line.
        start_time: Start of the entry, passed to
            ``time_convert_seconds_to_hmsm``.
        end_time: End of the entry, passed to
            ``time_convert_seconds_to_hmsm``.

    Returns:
        The three-line entry followed by a newline.
    """
    begin = time_convert_seconds_to_hmsm(start_time)
    finish = time_convert_seconds_to_hmsm(end_time)
    # NOTE(review): the template is reproduced as visible in this copy of the
    # file, ending in a single newline; confirm no trailing whitespace before
    # the closing quotes was lost.
    template = "%d\n%s --> %s\n%s\n"
    return template % (idx, begin, finish, msg)
|
|
|
|
|
|
def str_contains_punctuation(word):
    """Return ``True`` if any entry of ``const.PUNCTUATIONS`` occurs in *word*."""
    return any(mark in word for mark in const.PUNCTUATIONS)
|
|
|
|
|
|
def split_string_by_punctuations(s):
    """Split *s* into trimmed, non-empty pieces at newlines and punctuation.

    A piece ends at every ``"\\n"`` and at every character found in
    ``const.PUNCTUATIONS``; the delimiter itself is dropped. A ``"."`` that
    sits directly between two digits (as in ``2.5``) is kept as part of the
    text instead of ending the piece.

    Returns:
        The pieces in order, each stripped of surrounding whitespace, with
        empty pieces removed.
    """
    pieces = []
    buffer = []
    last_index = len(s) - 1

    def flush():
        # Close the piece collected so far and start a new one.
        pieces.append("".join(buffer).strip())
        buffer.clear()

    for pos, ch in enumerate(s):
        if ch == "\n":
            flush()
            continue

        before = s[pos - 1] if pos > 0 else ""
        after = s[pos + 1] if pos < last_index else ""

        # In "withdraw 10,000, charged at 2.5% fee" the dot in "2.5" must not
        # be treated as a break marker.
        is_decimal_point = ch == "." and before.isdigit() and after.isdigit()

        if is_decimal_point or ch not in const.PUNCTUATIONS:
            buffer.append(ch)
        else:
            flush()

    flush()
    # Drop empty pieces.
    return [piece for piece in pieces if piece]
|
|
|
|
|
|
def md5(text):
    """Return the hexadecimal MD5 digest of *text* encoded as UTF-8."""
    import hashlib

    digest = hashlib.md5()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def get_system_locale():
    """Return the language part of the default locale, e.g. ``"zh"`` or ``"en"``.

    The first element reported by ``locale.getdefaultlocale()`` is cut at
    its first underscore (``zh_CN`` and ``zh_TW`` give ``zh``; ``en_US`` and
    ``en_GB`` give ``en``).

    Returns:
        The language code, or ``"en"`` if the lookup raises any
        ``Exception`` (for example when no locale name is reported).
    """
    # NOTE(review): locale.getdefaultlocale() is deprecated in recent Python
    # releases; once it is removed this will always fall back to "en".
    try:
        locale_name = locale.getdefaultlocale()[0]
        return locale_name.split("_")[0]
    except Exception:
        return "en"
|
|
|
|
|
|
def load_locales(i18n_dir):
    """Load every ``*.json`` file found under *i18n_dir* (recursively).

    Args:
        i18n_dir: Directory to walk.

    Returns:
        A dict mapping the part of each file name before its first ``.`` to
        the parsed JSON content. When two files share that prefix, the one
        visited later wins. A directory with no JSON files gives ``{}``.
    """
    translations = {}
    for folder, _subdirs, filenames in os.walk(i18n_dir):
        json_names = [name for name in filenames if name.endswith(".json")]
        for name in json_names:
            language = name.split(".")[0]
            path = os.path.join(folder, name)
            with open(path, "r", encoding="utf-8") as handle:
                translations[language] = json.loads(handle.read())
    return translations
|
|
|
|
|
|
def parse_extension(filename):
    """Return the extension of *filename* in lower case, without the dot.

    Surrounding whitespace on the extension is removed. A name without an
    extension gives ``""``.
    """
    _stem, suffix = os.path.splitext(filename)
    normalized = suffix.strip().lower()
    return normalized.replace(".", "")
|