import os
import gradio as gr
import whisperx
import numpy as np
import moviepy.editor as mp
from moviepy.audio.AudioClip import AudioArrayClip
from pytube import YouTube
import deepl
import torch
import pyrubberband as pyrb
import soundfile as sf
import librosa
from TTS.api import TTS

HF_TOKEN = os.environ["HF_TOKEN"]
DEEPL_TOKEN = os.environ["DEEPL_TOKEN"]

# Agree to the terms of the coqui-tts model
os.environ["COQUI_TOS_AGREED"] = "1"


# Extract the audio track from a video file
def extract_audio(video_path):
    clip = mp.VideoFileClip(video_path)
    audio_path = os.path.splitext(video_path)[0] + ".wav"
    clip.audio.write_audiofile(audio_path)
    return audio_path


# Perform speech transcription and speaker diarization
def speech_diarization(audio_path, hf_token):
    device = "cuda"
    batch_size = 16
    compute_type = "float16"
    model = whisperx.load_model("large-v2", device, compute_type=compute_type)

    # 1. Transcribe audio
    audio = whisperx.load_audio(audio_path)
    result = model.transcribe(audio, batch_size=batch_size)

    # Delete the model if low on GPU resources
    import gc; del model; gc.collect(); torch.cuda.empty_cache()

    # 2. Align whisper output
    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
    result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)

    # Delete the alignment model if low on GPU resources
    import gc; del model_a; gc.collect(); torch.cuda.empty_cache()

    # 3. Assign speaker labels
    diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_token, device=device)

    # Add min/max number of speakers if known:
    # diarize_model(audio, min_speakers=min_speakers, max_speakers=max_speakers)
    diarize_segments = diarize_model(audio)

    result = whisperx.assign_word_speakers(diarize_segments, result)
    print(f'\n[Original transcript]:\n{result["segments"]}\n')
    return result["segments"]


# Create per-speaker voice clips for TTS voice cloning
def speaker_voice_clips(transcription, audio_path):
    # Collect up to three uninterrupted snippet timecodes per speaker
    snippets_timecodes = {}
    for segment in transcription:
        speaker = segment['speaker']
        if speaker not in snippets_timecodes:
            snippets_timecodes[speaker] = []
        if len(snippets_timecodes[speaker]) < 3:
            snippet = {
                'start': segment['start'],
                'end': segment['end']
            }
            snippets_timecodes[speaker].append(snippet)

    # Cut voice clips and stitch them together
    original_audio = mp.AudioFileClip(audio_path)
    audio_file_directory = os.path.dirname(audio_path)

    voice_clips = {}
    for speaker, speaker_snippets in snippets_timecodes.items():
        subclips = []
        for snippet in speaker_snippets:
            start, end = snippet['start'], snippet['end']
            subclip = original_audio.subclip(start, end)
            subclips.append(subclip)

        concatenated_clip = mp.concatenate_audioclips(subclips)

        output_filename = os.path.join(audio_file_directory, f"{speaker}_voice_clips.wav")
        concatenated_clip.write_audiofile(output_filename)

        voice_clips[speaker] = output_filename

    return voice_clips


# Translate the transcript with DeepL
def translate_transcript(transcript, target_language, deepl_token):
    language_map = {'en': 'en-us', 'ru': 'ru', 'uk': 'uk', 'pl': 'pl'}

    translator = deepl.Translator(deepl_token)

    translated_transcript = []
    for segment in transcript:
        text_to_translate = segment['text']
        translated_text = translator.translate_text(text_to_translate, target_lang=language_map[target_language])

        translated_segment = {
            'start': segment['start'],
            'end': segment['end'],
            'text': translated_text.text,
            'speaker': segment['speaker']
        }
        translated_transcript.append(translated_segment)

    print(f'\n[Translated transcript]:\n{translated_transcript}\n')
    return translated_transcript


# Adjust voice pace so the synthesized speech fits the original time frame
def adjust_voice_pace(sound_array, sample_rate, target_duration):
    duration = len(sound_array) / sample_rate
    tempo_change = duration / target_duration
    sound_array_stretched = pyrb.time_stretch(np.array(sound_array), sample_rate, tempo_change)
    return sound_array_stretched


# Perform voice cloning and speech generation for the translated transcript
def voice_cloning_translation(translated_transcription, speakers_voice_clips, target_language, speaker_model, audio_path):
    device = "cuda"
    vits_language_map = {'en': 'eng', 'ru': 'rus', 'uk': 'ukr', 'pl': 'pol'}

    # Select the TTS model (Ukrainian is not supported by XTTS v2, so VITS is used for it)
    if 'vits' in speaker_model.lower() or target_language == 'uk':
        selected_model = f'tts_models/{vits_language_map[target_language]}/fairseq/vits'
    else:
        selected_model = 'tts_models/multilingual/multi-dataset/xtts_v2'
    print(selected_model)

    tts = None
    final_audio_track = None
    try:
        tts = TTS(selected_model).to(device)

        last_end_time = 0
        clips = []

        # Generate the speech clips sentence by sentence
        for speech_item in translated_transcription:
            speech_item_duration = speech_item['end'] - speech_item['start']

            # Insert silence for the gap before the current phrase
            gap_duration = speech_item['start'] - last_end_time
            if gap_duration > 0:
                silent_audio = np.zeros((int(44100 * gap_duration), 2))
                silent_clip = AudioArrayClip(silent_audio, fps=44100)
                clips.append(silent_clip)
                print(f"\nAdded silence: Start={last_end_time}, Duration={gap_duration}")

            # Generate speech
            print(f"[{speech_item['speaker']}]")
            if 'vits' in selected_model:
                audio = tts.tts(text=speech_item['text'],
                                speaker_wav=speakers_voice_clips[speech_item['speaker']])
            else:
                audio = tts.tts(text=speech_item['text'],
                                speaker_wav=speakers_voice_clips[speech_item['speaker']],
                                language=target_language)
            sample_rate = tts.synthesizer.output_sample_rate

            # Adjust pace to fit the speech time frame if the translated audio is longer than the original phrase
            audio_duration = len(audio) / sample_rate
            if speech_item_duration < audio_duration:
                audio = adjust_voice_pace(audio, sample_rate, speech_item_duration)

            # Resample to a higher rate
            new_sample_rate = 44100
            audio = librosa.resample(np.array(audio), orig_sr=sample_rate, target_sr=new_sample_rate)

            # Transform to an AudioArrayClip object (mono -> stereo)
            audio = np.expand_dims(audio, axis=1)
            audio_stereo = np.repeat(audio, 2, axis=1)
            audio_clip = AudioArrayClip(audio_stereo, fps=44100)

            # Cut out a possible glitch at the end of the AudioArrayClip
            audio_clip = audio_clip.subclip(0, audio_clip.duration - 0.2)

            clips.append(audio_clip)
            print(f"Added speech: Start={speech_item['start']}, Final duration={audio_clip.duration}, Original duration={speech_item_duration}")

            last_end_time = speech_item['start'] + audio_clip.duration

        # Merge all sentences into a single audio track
        final_audio_track = mp.concatenate_audioclips(clips)

        audio_files_directory = os.path.dirname(audio_path)
        final_audio_track.write_audiofile(os.path.join(audio_files_directory, "translated_voice_track.wav"), fps=44100)
    except Exception as e:
        if tts is not None:
            import gc; del tts; gc.collect(); torch.cuda.empty_cache()
        raise e

    return final_audio_track


# Mix the translated voice track over the quieted original audio and write the dubbed video
def dub_video(video_path, translated_audio_track, target_language):
    video = mp.VideoFileClip(video_path)
    video = video.subclip(0, translated_audio_track.duration)
    original_audio = video.audio.volumex(0.15)

    dubbed_audio = mp.CompositeAudioClip([original_audio, translated_audio_track.set_start(0)])

    video_with_dubbing = video.set_audio(dubbed_audio)

    video_with_dubbing_path = os.path.splitext(video_path)[0] + "_" + target_language + ".mp4"
    video_with_dubbing.write_videofile(video_with_dubbing_path)
    return video_with_dubbing_path


# Full video translation pipeline
def video_translation(video_path, target_language, speaker_model, hf_token, deepl_token):
    original_audio_path = extract_audio(video_path)
    transcription = speech_diarization(original_audio_path, hf_token)
    translated_transcription = translate_transcript(transcription, target_language, deepl_token)
    speakers_voice_clips = speaker_voice_clips(transcription, original_audio_path)
    translated_audio_track = voice_cloning_translation(translated_transcription, speakers_voice_clips, target_language, speaker_model, original_audio_path)
    video_with_dubbing = dub_video(video_path, translated_audio_track, target_language)
    return video_with_dubbing


def download_youtube_video(url):
    yt = YouTube(url)

    if yt.age_restricted:
        gr.Warning("The YouTube video you are trying to translate is age restricted. Manually download it using the following link (https://en.savefrom.net/) and use the file upload instead, as the pytube library doesn't support downloading restricted videos.")
        return None

    stream = yt.streams.filter(file_extension='mp4').first()
    output_path = stream.download()
    return output_path


# Report how much of the DeepL character quota has been used, expressed in minutes of video
def translation_limit():
    translator = deepl.Translator(DEEPL_TOKEN)
    usage = translator.get_usage()

    if usage.character.valid:
        # Roughly 750 translated characters are assumed to correspond to one minute of video
        characters_used = usage.character.count
        minutes_used = characters_used / 750
        max_minutes = usage.character.limit / 750
        percent_used = (minutes_used / max_minutes) * 100

        # Format the output as hours and minutes
        used_time_str = f"{int(minutes_used)} min used"
        max_time_str = f"{int(max_minutes)} min total"

        if minutes_used >= 60:
            hours_used = int(minutes_used // 60)
            minutes_used = int(minutes_used % 60)
            used_time_str = f"{hours_used} hrs, {minutes_used} min used"

        if max_minutes >= 60:
            hours_max = int(max_minutes // 60)
            remaining_minutes_max = int(max_minutes % 60)
            max_time_str = f"{hours_max} hrs, {remaining_minutes_max} min total"

        # Simple HTML progress bar showing the used share of the translation quota
        progress_bar_html = (
            "<div style='width: 100%; background-color: #e0e0e0; border-radius: 8px; padding: 3px;'>"
            "<div style='text-align: center; margin-bottom: 4px;'>"
            f"{used_time_str} / {max_time_str}"
            "</div>"
            f"<div style='width: {percent_used}%; height: 12px; background-color: #76b900; "
            "border-radius: 5px;'>"
            "</div>"
            "</div>"
        )
        return progress_bar_html
    else:
        return "<div style='text-align: center;'>Translation limit is reached</div>"


def clear_inputs():
    return None, "", None, None


def translate_video(video_path, youtube_link, target_language, speaker_model):
    try:
        if not video_path and not youtube_link:
            gr.Warning("You should either upload a video or input a YouTube link")
            return translation_limit(), None

        if youtube_link:
            video_path = download_youtube_video(youtube_link)
            if video_path is None:
                gr.Warning("Video input did not process well, try again")
                return translation_limit(), None

        dubbed_video_path = video_translation(video_path, target_language, speaker_model, HF_TOKEN, DEEPL_TOKEN)
        limit_info = translation_limit()
        return limit_info, dubbed_video_path
    except Exception as e:
        print(f"An error occurred: {e}")
        raise e


initial_usage_info = translation_limit()

with gr.Blocks(theme=gr.themes.Soft(), css=".column-frame {border: 2px solid #AAA; border-radius: 10px; padding: 10px; margin: 10px;}") as demo:
    gr.Markdown("<h1 style='text-align: center;'>🌐AI Video Translation</h1>")
    gr.Markdown("<h3 style='text-align: center;'>Currently supported languages are: English, Polish, Ukrainian, and Russian</h3>")

    with gr.Row():
        with gr.Column(elem_classes=["column-frame"]):
            gr.Markdown("<h2 style='text-align: center;'>Inputs</h2>")
            translation_limit_info = gr.HTML(value=initial_usage_info)
            video = gr.Video(label="Upload a video file")
            gr.Markdown("<h3 style='text-align: center;'>OR</h3>")
            youtube_link = gr.Textbox(label="Paste YouTube link")
            gr.Markdown("⚠️ If you get a warning that the video is age restricted, manually download it using the following [link](https://en.savefrom.net/) and use the file upload instead, as the pytube library doesn't support downloading restricted videos.")
            gr.Markdown("---")
            target_language = gr.Dropdown(["en", "pl", "uk", "ru"], value="pl", label="Select translation target language")
            speaker_model = gr.Dropdown(["(Recommended) XTTS_V2", "VITs (will be default for Ukrainian)"], value="(Recommended) XTTS_V2", label="Select text-to-speech generation model")
            with gr.Row():
                clear_btn = gr.Button("Clear inputs")
                translate_btn = gr.Button("Translate")

        with gr.Column():
            with gr.Row(elem_classes=["column-frame"]):
                with gr.Column():
                    gr.Markdown("<h2 style='text-align: center;'>Translated Video</h2>")
                    output_video = gr.Video(label="Translated video")

    translate_btn.click(
        fn=translate_video,
        inputs=[video, youtube_link, target_language, speaker_model],
        outputs=[translation_limit_info, output_video]
    )

    clear_btn.click(
        fn=clear_inputs,
        inputs=[],
        outputs=[video, youtube_link, target_language, speaker_model]
    )

demo.launch(show_error=True, debug=True, share=True)