|
from .utils import remove_files, run_command |
|
from .text_multiformat_processor import get_subtitle |
|
from .logging_setup import logger |
|
import unicodedata |
|
import shutil |
|
import copy |
|
import os |
|
import re |
|
|
|
OUTPUT_TYPE_OPTIONS = [ |
|
"video (mp4)", |
|
"video (mkv)", |
|
"audio (mp3)", |
|
"audio (ogg)", |
|
"audio (wav)", |
|
"subtitle", |
|
"subtitle [by speaker]", |
|
"video [subtitled] (mp4)", |
|
"video [subtitled] (mkv)", |
|
"audio [original vocal sound]", |
|
"audio [original background sound]", |
|
"audio [original vocal and background sound]", |
|
"audio [original vocal-dereverb sound]", |
|
"audio [original vocal-dereverb and background sound]", |
|
"raw media", |
|
] |
|
|
|
DOCS_OUTPUT_TYPE_OPTIONS = [ |
|
"videobook (mp4)", |
|
"videobook (mkv)", |
|
"audiobook (wav)", |
|
"audiobook (mp3)", |
|
"audiobook (ogg)", |
|
"book (txt)", |
|
] |
|
|
|
|
|
def get_no_ext_filename(file_path): |
|
file_name_with_extension = os.path.basename(rf"{file_path}") |
|
filename_without_extension, _ = os.path.splitext(file_name_with_extension) |
|
return filename_without_extension |
|
|
|
|
|
def get_video_info(link): |
|
aux_name = f"video_url_{link}" |
|
params_dlp = {"quiet": True, "no_warnings": True, "noplaylist": True} |
|
try: |
|
from yt_dlp import YoutubeDL |
|
|
|
with YoutubeDL(params_dlp) as ydl: |
|
if link.startswith(("www.youtube.com/", "m.youtube.com/")): |
|
link = "https://" + link |
|
info_dict = ydl.extract_info(link, download=False, process=False) |
|
video_id = info_dict.get("id", aux_name) |
|
video_title = info_dict.get("title", video_id) |
|
if "youtube.com" in link and "&list=" in link: |
|
video_title = ydl.extract_info( |
|
"https://m.youtube.com/watch?v="+video_id, |
|
download=False, |
|
process=False |
|
).get("title", video_title) |
|
except Exception as error: |
|
logger.error(str(error)) |
|
video_title, video_id = aux_name, "NO_ID" |
|
return video_title, video_id |
|
|
|
|
|
def sanitize_file_name(file_name): |
|
|
|
|
|
normalized_name = unicodedata.normalize("NFKD", file_name) |
|
|
|
sanitized_name = re.sub(r"[^\w\s.-]", "_", normalized_name) |
|
return sanitized_name |
|
|
|
|
|
def get_output_file( |
|
original_file, |
|
new_file_name, |
|
soft_subtitles, |
|
output_directory="", |
|
): |
|
directory_base = "." |
|
|
|
if output_directory and os.path.isdir(output_directory): |
|
new_file_path = os.path.join(output_directory, new_file_name) |
|
else: |
|
new_file_path = os.path.join(directory_base, "outputs", new_file_name) |
|
remove_files(new_file_path) |
|
|
|
cm = None |
|
if soft_subtitles and original_file.endswith(".mp4"): |
|
if new_file_path.endswith(".mp4"): |
|
cm = f'ffmpeg -y -i "{original_file}" -i sub_tra.srt -i sub_ori.srt -map 0:v -map 0:a -map 1 -map 2 -c:v copy -c:a copy -c:s mov_text "{new_file_path}"' |
|
else: |
|
cm = f'ffmpeg -y -i "{original_file}" -i sub_tra.srt -i sub_ori.srt -map 0:v -map 0:a -map 1 -map 2 -c:v copy -c:a copy -c:s srt -movflags use_metadata_tags -map_metadata 0 "{new_file_path}"' |
|
elif new_file_path.endswith(".mkv"): |
|
cm = f'ffmpeg -i "{original_file}" -c:v copy -c:a copy "{new_file_path}"' |
|
elif new_file_path.endswith(".wav") and not original_file.endswith(".wav"): |
|
cm = f'ffmpeg -y -i "{original_file}" -acodec pcm_s16le -ar 44100 -ac 2 "{new_file_path}"' |
|
elif new_file_path.endswith(".ogg"): |
|
cm = f'ffmpeg -i "{original_file}" -c:a libvorbis "{new_file_path}"' |
|
elif new_file_path.endswith(".mp3") and not original_file.endswith(".mp3"): |
|
cm = f'ffmpeg -y -i "{original_file}" -codec:a libmp3lame -qscale:a 2 "{new_file_path}"' |
|
|
|
if cm: |
|
try: |
|
run_command(cm) |
|
except Exception as error: |
|
logger.error(str(error)) |
|
remove_files(new_file_path) |
|
shutil.copy2(original_file, new_file_path) |
|
else: |
|
shutil.copy2(original_file, new_file_path) |
|
|
|
return os.path.abspath(new_file_path) |
|
|
|
|
|
def media_out( |
|
media_file, |
|
lang_code, |
|
media_out_name="", |
|
extension="mp4", |
|
file_obj="video_dub.mp4", |
|
soft_subtitles=False, |
|
subtitle_files="disable", |
|
): |
|
if media_out_name: |
|
base_name = media_out_name + "_origin" |
|
else: |
|
if os.path.exists(media_file): |
|
base_name = get_no_ext_filename(media_file) |
|
else: |
|
base_name, _ = get_video_info(media_file) |
|
|
|
media_out_name = f"{base_name}__{lang_code}" |
|
|
|
f_name = f"{sanitize_file_name(media_out_name)}.{extension}" |
|
|
|
if subtitle_files != "disable": |
|
final_media = [get_output_file(file_obj, f_name, soft_subtitles)] |
|
name_tra = f"{sanitize_file_name(media_out_name)}.{subtitle_files}" |
|
name_ori = f"{sanitize_file_name(base_name)}.{subtitle_files}" |
|
tgt_subs = f"sub_tra.{subtitle_files}" |
|
ori_subs = f"sub_ori.{subtitle_files}" |
|
final_subtitles = [ |
|
get_output_file(tgt_subs, name_tra, False), |
|
get_output_file(ori_subs, name_ori, False) |
|
] |
|
return final_media + final_subtitles |
|
else: |
|
return get_output_file(file_obj, f_name, soft_subtitles) |
|
|
|
|
|
def get_subtitle_speaker(media_file, result, language, extension, base_name): |
|
|
|
segments_base = copy.deepcopy(result) |
|
|
|
|
|
segments_by_speaker = {} |
|
for segment in segments_base["segments"]: |
|
if segment["speaker"] not in segments_by_speaker.keys(): |
|
segments_by_speaker[segment["speaker"]] = [segment] |
|
else: |
|
segments_by_speaker[segment["speaker"]].append(segment) |
|
|
|
if not base_name: |
|
if os.path.exists(media_file): |
|
base_name = get_no_ext_filename(media_file) |
|
else: |
|
base_name, _ = get_video_info(media_file) |
|
|
|
files_subs = [] |
|
for name_sk, segments in segments_by_speaker.items(): |
|
|
|
subtitle_speaker = get_subtitle( |
|
language, |
|
{"segments": segments}, |
|
extension, |
|
filename=name_sk, |
|
) |
|
|
|
media_out_name = f"{base_name}_{language}_{name_sk}" |
|
|
|
output = media_out( |
|
media_file, |
|
language, |
|
media_out_name, |
|
extension, |
|
file_obj=subtitle_speaker, |
|
) |
|
|
|
files_subs.append(output) |
|
|
|
return files_subs |
|
|
|
|
|
def sound_separate(media_file, task_uvr): |
|
from .mdx_net import process_uvr_task |
|
|
|
outputs = [] |
|
|
|
if "vocal" in task_uvr: |
|
try: |
|
_, _, _, _, vocal_audio = process_uvr_task( |
|
orig_song_path=media_file, |
|
main_vocals=False, |
|
dereverb=True if "dereverb" in task_uvr else False, |
|
remove_files_output_dir=True, |
|
) |
|
outputs.append(vocal_audio) |
|
except Exception as error: |
|
logger.error(str(error)) |
|
|
|
if "background" in task_uvr: |
|
try: |
|
background_audio, _ = process_uvr_task( |
|
orig_song_path=media_file, |
|
song_id="voiceless", |
|
only_voiceless=True, |
|
remove_files_output_dir=False if "vocal" in task_uvr else True, |
|
) |
|
|
|
outputs.append(background_audio) |
|
except Exception as error: |
|
logger.error(str(error)) |
|
|
|
if not outputs: |
|
raise Exception("Error in uvr process") |
|
|
|
return outputs |
|
|