import datetime
import os
import re
import shutil
from tqdm import tqdm
from shortGPT.audio.audio_duration import get_asset_duration
from shortGPT.audio.audio_utils import audioToText, speedUpAudio
from shortGPT.audio.eleven_voice_module import VoiceModule
from shortGPT.config.languages import ACRONYM_LANGUAGE_MAPPING, Language
from shortGPT.editing_framework.editing_engine import (EditingEngine,
EditingStep)
from shortGPT.editing_utils.captions import (getCaptionsWithTime,
getSpeechBlocks)
from shortGPT.editing_utils.handle_videos import get_aspect_ratio
from shortGPT.engine.abstract_content_engine import CONTENT_DB, AbstractContentEngine
from shortGPT.gpt.gpt_translate import translateContent
class MultiLanguageTranslationEngine(AbstractContentEngine):
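    """Dubs a source video into a target language and re-renders it.

    The pipeline runs in five steps (see stepDict): transcribe the original
    audio, translate the transcript, synthesize translated speech, edit and
    render the video, then save the result with metadata.
    """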
def __init__(self, voiceModule: VoiceModule, src_url: str = "", target_language: Language = Language.ENGLISH, use_captions=False, id=""):
super().__init__(id, "content_translation", target_language, voiceModule)
if not id:
self._db_should_translate = True
if src_url:
self._db_src_url = src_url
self._db_use_captions = use_captions
self._db_target_language = target_language.value
self.stepDict = {
1: self._transcribe_audio,
2: self._translate_content,
3: self._generate_translated_audio,
4: self._edit_and_render_video,
5: self._add_metadata
}
def _transcribe_audio(self):
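        """Step 1: transcribe the source audio into timed speech blocks.

        Transcription is skipped when a ready-to-upload translation of the
        same source URL is already cached in CONTENT_DB; otherwise Whisper
        (model 'base') produces the transcript, which is split into speech
        blocks, and translation is marked unnecessary when the detected
        language already matches the target. In all cases the voice module
        must have enough remaining characters before proceeding.
        """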
cached_translation = CONTENT_DB.content_collection.find_one({
"content_type": 'content_translation',
'src_url': self._db_src_url,
'ready_to_upload': True
})
if not (cached_translation and 'speech_blocks' in cached_translation and 'original_language' in cached_translation):
video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
self.verifyParameters(content_path=video_audio)
self.logger(f"1/5 - Transcribing original audio to text...")
whispered = audioToText(video_audio, model_size='base')
self._db_speech_blocks = getSpeechBlocks(whispered, silence_time=0.8)
self._db_original_language = whispered['language']
if (ACRONYM_LANGUAGE_MAPPING.get(self._db_original_language) == Language(self._db_target_language)):
self._db_translated_timed_sentences = self._db_speech_blocks
self._db_should_translate = False
expected_chars = len("".join([text for _, text in self._db_speech_blocks]))
chars_remaining = self.voiceModule.get_remaining_characters()
if chars_remaining < expected_chars:
            raise Exception(
                f"Your VoiceModule's key doesn't have enough remaining characters to translate this video | Remaining: {chars_remaining} | Characters needed: {expected_chars}")
def _translate_content(self):
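        """Step 2: translate each timed speech block into the target language,
        preserving the original timestamps for later alignment."""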
if (self._db_should_translate):
self.verifyParameters(_db_speech_blocks=self._db_speech_blocks)
translated_timed_sentences = []
            for i, ((t1, t2), text) in tqdm(enumerate(self._db_speech_blocks), total=len(self._db_speech_blocks), desc="Translating content"):
self.logger(f"2/5 - Translating text content - {i+1} / {len(self._db_speech_blocks)}")
translated_text = translateContent(text, self._db_target_language)
translated_timed_sentences.append([[t1, t2], translated_text])
self._db_translated_timed_sentences = translated_timed_sentences
def _generate_translated_audio(self):
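        """Step 3: synthesize a translated voice clip for every sentence.

        Each clip is sped up with speedUpAudio to fit its original time slot
        (minus a 50 ms margin) so the dubbed audio stays in sync.
        """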
self.verifyParameters(translated_timed_sentences=self._db_translated_timed_sentences)
translated_audio_blocks = []
        for i, ((t1, t2), translated_text) in tqdm(enumerate(self._db_translated_timed_sentences), total=len(self._db_translated_timed_sentences), desc="Generating translated audio"):
self.logger(f"3/5 - Generating translated audio - {i+1} / {len(self._db_translated_timed_sentences)}")
translated_voice = self.voiceModule.generate_voice(translated_text, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}.wav")
if not translated_voice:
                raise Exception('An error happened during audio voice creation')
final_audio_path = speedUpAudio(translated_voice, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}_spedup.wav", expected_duration=t2-t1 - 0.05)
_, translated_duration = get_asset_duration(final_audio_path, isVideo=False)
translated_audio_blocks.append([[t1, t1+translated_duration], final_audio_path])
self._db_audio_bits = translated_audio_blocks
def _edit_and_render_video(self):
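        """Step 4: assemble the dubbed video and render it.

        Overlays the translated clips on the background video, keeps the
        original audio in gaps longer than 4 seconds, and optionally adds
        captions timed by running Whisper over the generated voiceover.
        """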
self.verifyParameters(_db_audio_bits=self._db_audio_bits)
self.logger(f"4.1 / 5 - Preparing automated editing")
target_language = Language(self._db_target_language)
input_video, video_length = get_asset_duration(self._db_src_url)
video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
editing_engine = EditingEngine()
editing_engine.addEditingStep(EditingStep.ADD_BACKGROUND_VIDEO, {'url': input_video, "set_time_start": 0, "set_time_end": video_length})
last_t2 = 0
for (t1, t2), audio_path in self._db_audio_bits:
            t2 -= 0.05
            editing_engine.addEditingStep(EditingStep.INSERT_AUDIO, {'url': audio_path, 'set_time_start': t1, 'set_time_end': t2})
            if t1 - last_t2 > 4:
editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": t1}, "set_time_start": last_t2, "set_time_end": t1})
last_t2 = t2
        if video_length - last_t2 > 4:
editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": video_length}, "set_time_start": last_t2, "set_time_end": video_length})
if self._db_use_captions:
is_landscape = get_aspect_ratio(input_video) > 1
if not self._db_timed_translated_captions:
if not self._db_translated_voiceover_path:
self.logger(f"4.5 / 5 - Generating captions in {target_language.value}")
editing_engine.generateAudio(self.dynamicAssetDir+"translated_voiceover.wav")
self._db_translated_voiceover_path = self.dynamicAssetDir+"translated_voiceover.wav"
whispered_translated = audioToText(self._db_translated_voiceover_path, model_size='base')
timed_translated_captions = getCaptionsWithTime(whispered_translated, maxCaptionSize=50 if is_landscape else 15, considerPunctuation=True)
self._db_timed_translated_captions = [[[t1,t2], text] for (t1, t2), text in timed_translated_captions if t2 - t1 <= 4]
for (t1, t2), text in self._db_timed_translated_captions:
caption_key = "LANDSCAPE" if is_landscape else "SHORT"
caption_key += "_ARABIC" if target_language == Language.ARABIC else ""
caption_type = getattr(EditingStep, f"ADD_CAPTION_{caption_key}")
editing_engine.addEditingStep(caption_type, {'text': text, "set_time_start": t1, "set_time_end": t2})
self._db_video_path = self.dynamicAssetDir+"translated_content.mp4"
        editing_engine.renderVideo(self._db_video_path, logger=self.logger if self.logger is not self.default_logger else None)
def _add_metadata(self):
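        """Step 5: move the rendered video into videos/ under a timestamped
        name and flag the content as ready to upload."""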
self.logger(f"5 / 5 - Saving translated video")
now = datetime.datetime.now()
date_str = now.strftime("%Y-%m-%d_%H-%M-%S")
newFileName = f"videos/{date_str} - " + \
re.sub(r"[^a-zA-Z0-9 '\n\.]", '', f"translated_content_to_{self._db_target_language}")
shutil.move(self._db_video_path, newFileName+".mp4")
self._db_video_path = newFileName+".mp4"
self._db_ready_to_upload = True
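
# Minimal usage sketch (illustrative, not part of the original module). It assumes
# the ElevenLabs-backed voice module shipped with shortGPT and the step-driver
# generator exposed by AbstractContentEngine; the API key, voice name, and URL
# below are placeholders.
#
#   from shortGPT.audio.eleven_voice_module import ElevenLabsVoiceModule
#   from shortGPT.config.languages import Language
#
#   voice = ElevenLabsVoiceModule(api_key="ELEVENLABS_API_KEY", voiceName="Antoni")
#   engine = MultiLanguageTranslationEngine(
#       voiceModule=voice,
#       src_url="https://www.youtube.com/watch?v=...",
#       target_language=Language.SPANISH,
#       use_captions=True,
#   )
#   for step_num, step_info in engine.makeContent():
#       print(f"step {step_num}: {step_info}")
#   print(engine.get_video_output_path())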