video-dubbing / app.py
artificialguybr's picture
Update app.py
4fe6158 verified
raw
history blame
9.13 kB
import os
import uuid
import asyncio
import subprocess
import json
from zipfile import ZipFile
import stat
import gradio as gr
import ffmpeg
import cv2
import edge_tts
from googletrans import Translator
from huggingface_hub import HfApi
import moviepy.editor as mp
import spaces
# Constants and initialization
HF_TOKEN = os.environ.get("HF_TOKEN")  # None when the variable is not set
REPO_ID = "artificialguybr/video-dubbing"
MAX_VIDEO_DURATION = 60  # seconds
api = HfApi(token=HF_TOKEN)

# Extract and set permissions for ffmpeg.
# The archive is opened in a `with` block so its file handle is closed as
# soon as extraction finishes instead of staying open until garbage
# collection.
with ZipFile("ffmpeg.zip") as ffmpeg_archive:
    ffmpeg_archive.extractall()
st = os.stat('ffmpeg')
# Add the owner-execute bit on top of whatever mode the extracted file has.
os.chmod('ffmpeg', st.st_mode | stat.S_IEXEC)
# Maps each selectable language name to a pair of
# (translation language code, edge-tts voice name).
# Insertion order is the order in which the names are offered to the user.
language_mapping = {
    "English": ("en", "en-US-EricNeural"),
    "Spanish": ("es", "es-ES-AlvaroNeural"),
    "French": ("fr", "fr-FR-HenriNeural"),
    "German": ("de", "de-DE-ConradNeural"),
    "Italian": ("it", "it-IT-DiegoNeural"),
    "Portuguese": ("pt", "pt-PT-DuarteNeural"),
    "Polish": ("pl", "pl-PL-MarekNeural"),
    "Turkish": ("tr", "tr-TR-AhmetNeural"),
    "Russian": ("ru", "ru-RU-DmitryNeural"),
    "Dutch": ("nl", "nl-NL-MaartenNeural"),
    "Czech": ("cs", "cs-CZ-AntoninNeural"),
    "Arabic": ("ar", "ar-SA-HamedNeural"),
    "Chinese (Simplified)": ("zh-CN", "zh-CN-YunxiNeural"),
    "Japanese": ("ja", "ja-JP-KeitaNeural"),
    "Korean": ("ko", "ko-KR-InJoonNeural"),
    "Hindi": ("hi", "hi-IN-MadhurNeural"),
    "Swedish": ("sv", "sv-SE-MattiasNeural"),
    "Danish": ("da", "da-DK-JeppeNeural"),
    "Finnish": ("fi", "fi-FI-HarriNeural"),
    "Greek": ("el", "el-GR-NestorasNeural"),
}

print("Starting the program...")
def generate_unique_filename(extension):
    """Return a random UUID4-based file name ending in *extension*.

    The extension is appended verbatim, so include the leading dot
    (e.g. ``".wav"``).
    """
    return "{}{}".format(uuid.uuid4(), extension)
def cleanup_files(*files):
    """Delete every given path that exists, logging each removal.

    Falsy entries (``None``, empty string) and paths that do not exist are
    skipped without any message.
    """
    for path in files:
        if not path or not os.path.exists(path):
            continue
        os.remove(path)
        print(f"Removed file: {path}")
def check_for_faces(video_path):
    """Return True if a frontal face is detected in any frame of the video.

    Frames are read sequentially and scanning stops at the first frame in
    which the Haar cascade reports at least one detection.

    Args:
        video_path: Path of the video file to scan.

    Returns:
        True as soon as a face is found; False once no further frame can be
        read without any detection (including when the first read fails).
    """
    face_cascade = cv2.CascadeClassifier(
        cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    )
    cap = cv2.VideoCapture(video_path)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                return False
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)
            if len(faces) > 0:
                return True
    finally:
        # Release the capture on every exit path (early return, end of
        # stream, or an exception) so the underlying handle is not leaked.
        cap.release()
@spaces.GPU(duration=90)
def transcribe_audio(file_path):
    """Transcribe a media file by running the insanely-fast-whisper CLI.

    If *file_path* has a video extension, its audio track is first written
    to a temporary WAV file, which is then transcribed instead.

    Args:
        file_path: Path to an audio file, or to a video file ending in
            .mp4, .avi, .mov or .flv (matched case-insensitively).

    Returns:
        The transcript's "text" field, or, when that key is absent, the
        chunk texts joined with single spaces.

    Raises:
        ValueError: The video file has no audio track.
        subprocess.CalledProcessError: The CLI exited with a non-zero code.
        json.JSONDecodeError: The transcript file is not valid JSON.
    """
    print(f"Starting transcription of file: {file_path}")
    temp_audio = None
    output_file = generate_unique_filename(".json")
    try:
        if file_path.lower().endswith(('.mp4', '.avi', '.mov', '.flv')):
            print("Video file detected. Extracting audio...")
            temp_audio = generate_unique_filename(".wav")
            try:
                video = mp.VideoFileClip(file_path)
                try:
                    if video.audio is None:
                        raise ValueError("The video has no audio track to transcribe.")
                    video.audio.write_audiofile(temp_audio)
                finally:
                    # Close the clip so its reader resources are released
                    # even when writing the audio fails.
                    video.close()
            except Exception as e:
                print(f"Error extracting audio from video: {e}")
                raise
            file_path = temp_audio

        command = [
            "insanely-fast-whisper",
            "--file-name", file_path,
            "--device-id", "0",
            "--model-name", "openai/whisper-large-v3",
            "--task", "transcribe",
            "--timestamp", "chunk",
            "--transcript-path", output_file,
        ]
        try:
            completed = subprocess.run(command, check=True, capture_output=True, text=True)
            print(f"Transcription output: {completed.stdout}")
        except subprocess.CalledProcessError as e:
            print(f"Error running insanely-fast-whisper: {e}")
            if e.stderr:
                # stderr is captured, so surface it for diagnosis.
                print(e.stderr)
            raise

        try:
            # Explicit encoding: the transcript may contain non-ASCII text and
            # must not depend on the platform's default locale encoding.
            with open(output_file, "r", encoding="utf-8") as f:
                transcription = json.load(f)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            raise

        text = transcription.get("text")
        if text is None:
            text = " ".join(chunk["text"] for chunk in transcription.get("chunks", []))
        return text
    finally:
        # Runs on success and on every failure path, so the temporary
        # transcript and extracted audio never accumulate on disk.
        cleanup_files(output_file, temp_audio)
async def text_to_speech(text, voice, output_file):
    """Synthesize *text* with the given edge-tts *voice* and save it to *output_file*."""
    await edge_tts.Communicate(text, voice).save(output_file)
def _mux_audio(video_path, audio_path, output_path):
    """Copy the video stream of *video_path* and pair it with *audio_path* encoded as AAC."""
    subprocess.run(
        [
            "ffmpeg",
            "-y",  # overwrite a partial output left behind by a failed earlier step
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-strict", "experimental",
            "-map", "0:v:0",
            "-map", "1:a:0",
            output_path,
        ],
        check=True,
    )


@spaces.GPU
def process_video(radio, video, target_language, has_closeup_face):
    """Dub a video into the target language and return the result.

    Steps: rescale to 720p height, enforce the duration limit, extract and
    band-pass the audio, transcribe it, translate the transcript, synthesize
    speech, then either lip-sync with Wav2Lip or simply replace the audio.

    Args:
        radio: Selected input source; not used here, accepted because it is
            one of the interface inputs.
        video: Path of the input video file.
        target_language: A key of ``language_mapping``.
        has_closeup_face: When true, Wav2Lip is used without running face
            detection first.

    Returns:
        ``(output_video_path, "")`` on success, or ``(None, "Error: ...")``
        on failure. Exceptions are never propagated to the caller.
    """
    run_uuid = uuid.uuid4().hex[:6]
    resized_video = f"{run_uuid}_resized_video.mp4"
    raw_audio = f"{run_uuid}_output_audio.wav"
    filtered_audio = f"{run_uuid}_output_audio_final.wav"
    synth_audio = f"{run_uuid}_output_synth.wav"
    output_video_path = f"{run_uuid}_output_video.mp4"
    succeeded = False
    try:
        if target_language is None:
            raise ValueError("Please select a Target Language for Dubbing.")
        if target_language not in language_mapping:
            # Fail before any expensive work instead of with a bare KeyError later.
            raise ValueError(f"Unsupported target language: {target_language}")
        if not video:
            raise ValueError("Please upload or record a video.")
        target_language_code, voice = language_mapping[target_language]

        ffmpeg.input(video).output(resized_video, vf='scale=-2:720').run()
        video_path = resized_video
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Error: {video_path} does not exist.")

        video_info = ffmpeg.probe(video_path)
        # Prefer the first stream's duration (original behaviour) but fall
        # back to the container-level value when the stream does not carry one.
        streams = video_info.get('streams') or [{}]
        raw_duration = streams[0].get('duration') or video_info.get('format', {}).get('duration')
        if raw_duration is None:
            raise ValueError("Could not determine the video duration.")
        video_duration = float(raw_duration)
        if video_duration > MAX_VIDEO_DURATION:
            raise ValueError(f"Video duration exceeds {MAX_VIDEO_DURATION} seconds. Please upload a shorter video.")

        ffmpeg.input(video_path).output(raw_audio, acodec='pcm_s24le', ar=48000, map='a').run()
        # Argument lists (no shell) avoid quoting problems in the commands below.
        subprocess.run(
            ["ffmpeg", "-y", "-i", raw_audio, "-af", "lowpass=3000,highpass=100", filtered_audio],
            check=True,
        )

        whisper_text = transcribe_audio(filtered_audio)
        print(f"Transcription successful: {whisper_text}")
        if not whisper_text or not whisper_text.strip():
            # Nothing to translate or synthesize; report it clearly.
            raise ValueError("No speech was detected in the video.")

        translator = Translator()
        translated_text = translator.translate(whisper_text, dest=target_language_code).text
        print(f"Translated text: {translated_text}")

        asyncio.run(text_to_speech(translated_text, voice, synth_audio))

        if has_closeup_face or check_for_faces(video_path):
            try:
                subprocess.run(
                    [
                        "python", "Wav2Lip/inference.py",
                        "--checkpoint_path", "Wav2Lip/checkpoints/wav2lip_gan.pth",
                        "--face", video_path,
                        "--audio", synth_audio,
                        "--pads", "0", "15", "0", "0",
                        "--resize_factor", "1",
                        "--nosmooth",
                        "--outfile", output_video_path,
                    ],
                    check=True,
                )
            except subprocess.CalledProcessError as e:
                print(f"Wav2Lip error: {str(e)}")
                gr.Warning("Wav2lip didn't detect a face or encountered an error. Falling back to simple audio replacement.")
                _mux_audio(video_path, synth_audio, output_video_path)
        else:
            _mux_audio(video_path, synth_audio, output_video_path)

        if not os.path.exists(output_video_path):
            raise FileNotFoundError(f"Error: {output_video_path} was not generated.")

        succeeded = True
        return output_video_path, ""
    except Exception as e:
        print(f"Error in process_video: {str(e)}")
        return None, f"Error: {str(e)}"
    finally:
        # Intermediate files are removed on success and on every failure path.
        cleanup_files(resized_video, raw_audio, filtered_audio, synth_audio)
        if not succeeded:
            # Do not leave a partial result behind when the run failed.
            cleanup_files(output_video_path)
def swap(radio):
    """Return a component update that selects the video source for the radio choice.

    "Upload" selects the "upload" source; any other value selects "webcam".
    """
    if radio == "Upload":
        chosen_source = "upload"
    else:
        chosen_source = "webcam"
    return gr.update(source=chosen_source)
# Gradio interface setup
# `video` and `radio` are kept as module-level names because the source
# switch is wired to them after the interface is rendered.
video = gr.Video()
radio = gr.Radio(["Upload", "Record"], value="Upload", show_label=False)
iface = gr.Interface(
    fn=process_video,
    inputs=[
        radio,
        video,
        gr.Dropdown(choices=list(language_mapping.keys()), label="Target Language for Dubbing", value="Spanish"),
        gr.Checkbox(label="Video has a close-up face. Use Wav2lip.", value=False, info="Say if video have close-up face. For Wav2lip. Will not work if checked wrongly."),
    ],
    outputs=[
        gr.Video(label="Processed Video"),
        gr.Textbox(label="Error Message"),
    ],
    live=False,
    title="AI Video Dubbing",
    description="""This tool was developed by [@artificialguybr](https://twitter.com/artificialguybr) using entirely open-source tools. Special thanks to Hugging Face for the GPU support. Thanks [@yeswondwer](https://twitter.com/@yeswondwerr) for original code. Test the [Video Transcription and Translate](https://huggingface.co/spaces/artificialguybr/VIDEO-TRANSLATION-TRANSCRIPTION) space!""",
    # The documented string value; the boolean form is deprecated.
    allow_flagging="never",
)
# Usage notes shown as Markdown below the interface.
usage_notes = """
**Note:**
- Video limit is 1 minute. It will dubbing all people using just one voice.
- Generation may take up to 5 minutes.
- The tool uses open-source models for all models. It's an alpha version.
- Quality can be improved but would require more processing time per video. For scalability and hardware limitations, speed was chosen, not just quality.
- If you need more than 1 minute, duplicate the Space and change the limit on app.py.
- If you incorrectly mark the 'Video has a close-up face' checkbox, the dubbing may not work as expected.
"""

with gr.Blocks() as demo:
    iface.render()
    # Switch the video component's source whenever the radio selection changes.
    radio.change(swap, inputs=[radio], outputs=video)
    gr.Markdown(usage_notes)

print("Launching Gradio interface...")
demo.queue()
demo.launch()