File size: 8,198 Bytes
5f685fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import datetime
import os
import re
import shutil

from tqdm import tqdm

from shortGPT.audio.audio_duration import get_asset_duration
from shortGPT.audio.audio_utils import (audioToText, get_asset_duration,
                                        run_background_audio_split,
                                        speedUpAudio)
from shortGPT.audio.eleven_voice_module import VoiceModule
from shortGPT.config.languages import ACRONYM_LANGUAGE_MAPPING, Language
from shortGPT.editing_framework.editing_engine import (EditingEngine,
                                                       EditingStep)
from shortGPT.editing_utils.captions import (getCaptionsWithTime,
                                             getSpeechBlocks)
from shortGPT.editing_utils.handle_videos import get_aspect_ratio
from shortGPT.engine.abstract_content_engine import CONTENT_DB, AbstractContentEngine
from shortGPT.gpt.gpt_translate import translateContent

class MultiLanguageTranslationEngine(AbstractContentEngine):
    """Content engine that re-voices a source video in a target language.

    The work is split into five ordered steps registered in ``stepDict``:

    1. transcribe the source audio into timed speech blocks,
    2. translate every speech block,
    3. synthesize (and time-fit) the translated speech,
    4. assemble and render the final video (optionally with captions),
    5. move the rendered file to its final location.

    Attributes prefixed with ``_db_`` carry the state shared between steps.
    NOTE(review): they are presumably persisted by ``AbstractContentEngine``;
    this class relies on an unset ``_db_`` attribute reading as a falsy value
    (see the caption handling in ``_edit_and_render_video``) -- confirm
    against the base class.
    """

    def __init__(self, voiceModule: VoiceModule, src_url: str = "", target_language: Language = Language.ENGLISH, use_captions=False, id=""):
        """Create a new translation job, or attach to an existing one.

        Args:
            voiceModule: Voice backend used to synthesize the translated speech.
            src_url: Location of the source video; stored only when non-empty.
            target_language: Language the video is translated into.
            use_captions: Whether translated captions are added to the video.
            id: Identifier of an existing job. When empty, the job parameters
                above are written to the ``_db_`` state; when given, the
                stored state is left untouched.
        """
        super().__init__(id, "content_translation", target_language, voiceModule)
        if not id:
            self._db_should_translate = True
            if src_url:
                self._db_src_url = src_url
            self._db_use_captions = use_captions
            self._db_target_language = target_language.value

        # Step number -> bound method implementing that step.
        self.stepDict = {
            1: self._transcribe_audio,
            2: self._translate_content,
            3: self._generate_translated_audio,
            4: self._edit_and_render_video,
            5: self._add_metadata
        }

    def _transcribe_audio(self):
        """Step 1: obtain timed speech blocks and the source language.

        A finished translation of the same ``src_url`` is looked up first so
        its transcription can be reused; otherwise the source audio is
        transcribed. If the detected language already equals the target
        language, the speech blocks are used as the "translated" sentences and
        translation is disabled.

        Raises:
            Exception: If the voice module has fewer characters remaining
                than the transcript contains.
        """
        cached_translation = CONTENT_DB.content_collection.find_one({
            "content_type": 'content_translation',
            'src_url': self._db_src_url,
            'ready_to_upload': True
        })
        cache_hit = bool(
            cached_translation
            and 'speech_blocks' in cached_translation
            and 'original_language' in cached_translation
        )
        if cache_hit:
            # Reuse the earlier transcription. Previously the cached document
            # was detected but never read, leaving a fresh engine without any
            # speech blocks. Values already present on this engine win.
            # NOTE(review): assumes the cached document stores these values in
            # the same shape this engine writes them -- confirm.
            if not self._db_speech_blocks:
                self._db_speech_blocks = cached_translation['speech_blocks']
            if not self._db_original_language:
                self._db_original_language = cached_translation['original_language']
        else:
            video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
            self.verifyParameters(content_path=video_audio)
            self.logger("1/5 - Transcribing original audio to text...")
            whispered = audioToText(video_audio, model_size='base')
            self._db_speech_blocks = getSpeechBlocks(whispered, silence_time=0.8)
            self._db_original_language = whispered['language']

        # Source already in the target language: nothing to translate.
        if ACRONYM_LANGUAGE_MAPPING.get(self._db_original_language) == Language(self._db_target_language):
            self._db_translated_timed_sentences = self._db_speech_blocks
            self._db_should_translate = False

        # Fail early if the voice quota cannot cover the whole transcript.
        expected_chars = sum(len(text) for _, text in self._db_speech_blocks)
        chars_remaining = self.voiceModule.get_remaining_characters()
        if chars_remaining < expected_chars:
            raise Exception(
                f"Your VoiceModule's key doesn't have enough characters to totally translate this video | Remaining: {chars_remaining} | Number of characters to translate: {expected_chars}")

    def _translate_content(self):
        """Step 2: translate each speech block, keeping its original timing.

        Does nothing when translation was disabled in step 1.
        """
        if not self._db_should_translate:
            return
        self.verifyParameters(_db_speech_blocks=self._db_speech_blocks)

        speech_blocks = self._db_speech_blocks
        block_count = len(speech_blocks)
        translated_timed_sentences = []
        # `total` is passed explicitly because enumerate() has no length, so
        # tqdm could not otherwise display progress.
        for i, ((t1, t2), text) in tqdm(enumerate(speech_blocks), total=block_count, desc="Translating content"):
            self.logger(f"2/5 - Translating text content - {i+1} / {block_count}")
            translated_text = translateContent(text, self._db_target_language)
            translated_timed_sentences.append([[t1, t2], translated_text])
        self._db_translated_timed_sentences = translated_timed_sentences

    def _generate_translated_audio(self):
        """Step 3: synthesize speech for every translated sentence.

        Each clip is passed through ``speedUpAudio`` with the duration of its
        original speech block (minus a 0.05 s margin) as the expected duration.
        The result is stored as ``[[start, start + clip_duration], clip_path]``
        entries in ``_db_audio_bits``.

        Raises:
            Exception: If the voice module returns no audio for a sentence.
        """
        self.verifyParameters(translated_timed_sentences=self._db_translated_timed_sentences)

        sentences = self._db_translated_timed_sentences
        sentence_count = len(sentences)
        translated_audio_blocks = []
        for i, ((t1, t2), translated_text) in tqdm(enumerate(sentences), total=sentence_count, desc="Generating translated audio"):
            self.logger(f"3/5 - Generating translated audio - {i+1} / {sentence_count}")
            translated_voice = self.voiceModule.generate_voice(translated_text, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}.wav")
            if not translated_voice:
                raise Exception('An error happending during audio voice creation')
            final_audio_path = speedUpAudio(translated_voice, self.dynamicAssetDir+f"translated_{i}_{self._db_target_language}_spedup.wav", expected_duration=t2-t1 - 0.05)
            _, translated_duration = get_asset_duration(final_audio_path, isVideo=False)
            translated_audio_blocks.append([[t1, t1+translated_duration], final_audio_path])
        self._db_audio_bits = translated_audio_blocks

    def _edit_and_render_video(self):
        """Step 4: lay the translated audio over the source video and render it.

        The source video is used as background. Every translated clip is
        inserted at its start time; wherever more than 4 s pass without
        translated speech, the original audio is inserted for that span
        instead. When captions are enabled, they are generated (once) from the
        assembled translated voiceover and added before rendering.
        """
        self.verifyParameters(_db_audio_bits=self._db_audio_bits)
        self.logger("4.1 / 5 - Preparing automated editing")
        target_language = Language(self._db_target_language)
        input_video, video_length = get_asset_duration(self._db_src_url)
        video_audio, _ = get_asset_duration(self._db_src_url, isVideo=False)
        editing_engine = EditingEngine()
        editing_engine.addEditingStep(EditingStep.ADD_BACKGROUND_VIDEO, {'url': input_video, "set_time_start": 0, "set_time_end": video_length})

        last_t2 = 0  # end time of the previously inserted translated clip
        for (t1, t2), audio_path in self._db_audio_bits:
            t2 -= 0.05  # end each clip 0.05 s earlier than its measured end
            editing_engine.addEditingStep(EditingStep.INSERT_AUDIO, {'url': audio_path, 'set_time_start': t1, 'set_time_end': t2})
            if t1 - last_t2 > 4:
                # Long gap before this clip: keep the original audio there.
                editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": t1}, "set_time_start": last_t2, "set_time_end":  t1})
            last_t2 = t2

        # Same rule for the tail of the video after the last translated clip.
        if video_length - last_t2 > 4:
            editing_engine.addEditingStep(EditingStep.EXTRACT_AUDIO, {"url": video_audio, "subclip": {"t_start": last_t2, "t_end": video_length}, "set_time_start": last_t2, "set_time_end":  video_length})

        if self._db_use_captions:
            is_landscape = get_aspect_ratio(input_video) > 1
            if not self._db_timed_translated_captions:
                if not self._db_translated_voiceover_path:
                    self.logger(f"4.5 / 5 - Generating captions in {target_language.value}")
                    editing_engine.generateAudio(self.dynamicAssetDir+"translated_voiceover.wav")
                    self._db_translated_voiceover_path = self.dynamicAssetDir+"translated_voiceover.wav"
                whispered_translated = audioToText(self._db_translated_voiceover_path, model_size='base')
                timed_translated_captions = getCaptionsWithTime(whispered_translated, maxCaptionSize=50 if is_landscape else 15, considerPunctuation=True)
                # Captions lasting longer than 4 s are dropped.
                self._db_timed_translated_captions = [[[t1, t2], text] for (t1, t2), text in timed_translated_captions if t2 - t1 <= 4]

            captions = self._db_timed_translated_captions
            if captions:
                # The caption step depends only on orientation and language,
                # so it is resolved once rather than once per caption.
                caption_key = "LANDSCAPE" if is_landscape else "SHORT"
                caption_key += "_ARABIC" if target_language == Language.ARABIC else ""
                caption_type = getattr(EditingStep, f"ADD_CAPTION_{caption_key}")
                for (t1, t2), text in captions:
                    editing_engine.addEditingStep(caption_type, {'text': text, "set_time_start": t1, "set_time_end": t2})

        self._db_video_path = self.dynamicAssetDir+"translated_content.mp4"

        # Only forward a logger to the renderer when a custom one was set.
        editing_engine.renderVideo(self._db_video_path, logger=self.logger if self.logger is not self.default_logger else None)

    def _add_metadata(self):
        """Step 5: move the rendered video into ``videos/`` and mark it ready.

        The file is named ``<timestamp> - <sanitized label>.mp4``; the label
        keeps only letters, digits, spaces, apostrophes, newlines and dots, so
        underscores are removed from it.
        """
        self.logger("5 / 5 - Saving translated video")
        now = datetime.datetime.now()
        date_str = now.strftime("%Y-%m-%d_%H-%M-%S")
        newFileName = f"videos/{date_str} - " + \
            re.sub(r"[^a-zA-Z0-9 '\n\.]", '', f"translated_content_to_{self._db_target_language}")

        # shutil.move fails if the destination directory is missing.
        os.makedirs("videos", exist_ok=True)
        shutil.move(self._db_video_path, newFileName+".mp4")
        self._db_video_path = newFileName+".mp4"
        self._db_ready_to_upload = True