artificialguybr commited on
Commit
8d0320b
·
verified ·
1 Parent(s): 12ce031

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +46 -106
app.py CHANGED
@@ -1,35 +1,28 @@
1
- import spaces
2
- import tempfile
3
- import gradio as gr
4
- import subprocess
5
- import os, stat
6
  import uuid
7
- from googletrans import Translator
8
- import edge_tts
9
  import asyncio
10
- import ffmpeg
11
  import json
12
- from scipy.signal import wiener
13
- import soundfile as sf
14
- from pydub import AudioSegment
15
- import numpy as np
16
- import librosa
17
  from zipfile import ZipFile
18
- import shlex
 
19
  import cv2
20
- import torch
21
- import torchvision
22
- from tqdm import tqdm
23
- from numba import jit
24
  from huggingface_hub import HfApi
25
  import moviepy.editor as mp
 
26
 
 
27
  HF_TOKEN = os.environ.get("HF_TOKEN")
 
 
 
28
  api = HfApi(token=HF_TOKEN)
29
- repo_id = "artificialguybr/video-dubbing"
 
30
  ZipFile("ffmpeg.zip").extractall()
31
- st = os.stat('ffmpeg')
32
- os.chmod('ffmpeg', st.st_mode | stat.S_IEXEC)
33
 
34
  print("Starting the program...")
35
 
@@ -50,19 +43,16 @@ def check_for_faces(video_path):
50
  ret, frame = cap.read()
51
  if not ret:
52
  break
53
-
54
  gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
55
- faces = face_cascade.detectMultiScale(gray, 1.1, 4)
56
-
57
- if len(faces) > 0:
58
  return True
59
-
60
  return False
61
 
62
  @spaces.GPU(duration=90)
63
  def transcribe_audio(file_path):
64
  print(f"Starting transcription of file: {file_path}")
65
  temp_audio = None
 
66
  if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
67
  print("Video file detected. Extracting audio...")
68
  try:
@@ -73,10 +63,7 @@ def transcribe_audio(file_path):
73
  except Exception as e:
74
  print(f"Error extracting audio from video: {e}")
75
  raise
76
-
77
- print(f"Does the file exist? {os.path.exists(file_path)}")
78
- print(f"File size: {os.path.getsize(file_path) if os.path.exists(file_path) else 'N/A'} bytes")
79
-
80
  output_file = generate_unique_filename(".json")
81
  command = [
82
  "insanely-fast-whisper",
@@ -87,37 +74,24 @@ def transcribe_audio(file_path):
87
  "--timestamp", "chunk",
88
  "--transcript-path", output_file
89
  ]
90
- print(f"Executing command: {' '.join(command)}")
91
  try:
92
  result = subprocess.run(command, check=True, capture_output=True, text=True)
93
- print(f"Standard output: {result.stdout}")
94
- print(f"Error output: {result.stderr}")
95
  except subprocess.CalledProcessError as e:
96
  print(f"Error running insanely-fast-whisper: {e}")
97
- print(f"Standard output: {e.stdout}")
98
- print(f"Error output: {e.stderr}")
99
  raise
100
-
101
- print(f"Reading transcription file: {output_file}")
102
  try:
103
  with open(output_file, "r") as f:
104
  transcription = json.load(f)
105
  except json.JSONDecodeError as e:
106
  print(f"Error decoding JSON: {e}")
107
- print(f"File content: {open(output_file, 'r').read()}")
108
  raise
 
 
109
 
110
- if "text" in transcription:
111
- result = transcription["text"]
112
- else:
113
- result = " ".join([chunk["text"] for chunk in transcription.get("chunks", [])])
114
-
115
- print("Transcription completed.")
116
-
117
- # Cleanup
118
- cleanup_files(output_file)
119
- if temp_audio:
120
- cleanup_files(temp_audio)
121
 
122
  return result
123
 
@@ -143,22 +117,16 @@ def process_video(radio, video, target_language, has_closeup_face):
143
  video_info = ffmpeg.probe(video_path)
144
  video_duration = float(video_info['streams'][0]['duration'])
145
 
146
- if video_duration > 60:
147
- os.remove(video_path)
148
- raise ValueError("Video duration exceeds 1 minute. Please upload a shorter video.")
149
 
150
  ffmpeg.input(video_path).output(f"{run_uuid}_output_audio.wav", acodec='pcm_s24le', ar=48000, map='a').run()
151
 
152
- shell_command = f"ffmpeg -y -i {run_uuid}_output_audio.wav -af lowpass=3000,highpass=100 {run_uuid}_output_audio_final.wav".split(" ")
153
- subprocess.run([item for item in shell_command], capture_output=False, text=True, check=True)
154
 
155
- print("Attempting to transcribe with Whisper...")
156
- try:
157
- whisper_text = transcribe_audio(f"{run_uuid}_output_audio_final.wav")
158
- print(f"Transcription successful: {whisper_text}")
159
- except Exception as e:
160
- print(f"Error encountered during transcription: {str(e)}")
161
- raise
162
 
163
  language_mapping = {
164
  'English': ('en', 'en-US-EricNeural'),
@@ -189,61 +157,36 @@ def process_video(radio, video, target_language, has_closeup_face):
189
 
190
  asyncio.run(text_to_speech(translated_text, voice, f"{run_uuid}_output_synth.wav"))
191
 
192
- pad_top = 0
193
- pad_bottom = 15
194
- pad_left = 0
195
- pad_right = 0
196
- rescaleFactor = 1
197
-
198
- video_path_fix = video_path
199
-
200
- if has_closeup_face:
201
- has_face = True
202
- else:
203
- has_face = check_for_faces(video_path)
204
-
205
- if has_closeup_face:
206
  try:
207
- cmd = f"python Wav2Lip/inference.py --checkpoint_path 'Wav2Lip/checkpoints/wav2lip_gan.pth' --face {shlex.quote(video_path)} --audio '{run_uuid}_output_synth.wav' --pads {pad_top} {pad_bottom} {pad_left} {pad_right} --resize_factor {rescaleFactor} --nosmooth --outfile '{run_uuid}_output_video.mp4'"
208
- subprocess.run(cmd, shell=True, check=True)
209
- except subprocess.CalledProcessError as e:
210
- if "Face not detected! Ensure the video contains a face in all the frames." in str(e.stderr):
211
- gr.Warning("Wav2lip didn't detect a face. Please try again with the option disabled.")
212
- cmd = f"ffmpeg -i {video_path} -i {run_uuid}_output_synth.wav -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {run_uuid}_output_video.mp4"
213
- subprocess.run(cmd, shell=True)
214
  else:
215
- cmd = f"ffmpeg -i {video_path} -i {run_uuid}_output_synth.wav -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {run_uuid}_output_video.mp4"
216
- subprocess.run(cmd, shell=True)
217
-
218
- if not os.path.exists(f"{run_uuid}_output_video.mp4"):
219
- raise FileNotFoundError(f"Error: {run_uuid}_output_video.mp4 was not generated.")
220
 
221
  output_video_path = f"{run_uuid}_output_video.mp4"
 
 
222
 
223
- files_to_delete = [
224
  f"{run_uuid}_resized_video.mp4",
225
  f"{run_uuid}_output_audio.wav",
226
  f"{run_uuid}_output_audio_final.wav",
227
  f"{run_uuid}_output_synth.wav"
228
- ]
229
- for file in files_to_delete:
230
- try:
231
- os.remove(file)
232
- except FileNotFoundError:
233
- print(f"File {file} not found for deletion.")
234
 
235
- return output_video_path, "" # Retorna o caminho do vídeo e uma string vazia para a mensagem de erro
236
 
237
  except Exception as e:
238
  print(f"Error in process_video: {str(e)}")
239
- return None, f"Error: {str(e)}" # Retorna None para o vídeo e a mensagem de erro
240
 
241
  def swap(radio):
242
- if(radio == "Upload"):
243
- return gr.update(source="upload")
244
- else:
245
- return gr.update(source="webcam")
246
-
247
  video = gr.Video()
248
  radio = gr.Radio(["Upload", "Record"], value="Upload", show_label=False)
249
  iface = gr.Interface(
@@ -251,11 +194,8 @@ iface = gr.Interface(
251
  inputs=[
252
  radio,
253
  video,
254
- gr.Dropdown(choices=["English", "Spanish", "French", "German", "Italian", "Portuguese", "Polish", "Turkish", "Russian", "Dutch", "Czech", "Arabic", "Chinese (Simplified)", "Japanese", "Korean", "Hindi", "Swedish", "Danish", "Finnish", "Greek"], label="Target Language for Dubbing", value="Spanish"),
255
- gr.Checkbox(
256
- label="Video has a close-up face. Use Wav2lip.",
257
- value=False,
258
- info="Say if video have close-up face. For Wav2lip. Will not work if checked wrongly.")
259
  ],
260
  outputs=[
261
  gr.Video(label="Processed Video"),
 
1
+ import os
 
 
 
 
2
  import uuid
 
 
3
  import asyncio
4
+ import subprocess
5
  import json
 
 
 
 
 
6
  from zipfile import ZipFile
7
+ import gradio as gr
8
+ import ffmpeg
9
  import cv2
10
+ import edge_tts
11
+ from googletrans import Translator
 
 
12
  from huggingface_hub import HfApi
13
  import moviepy.editor as mp
14
+ import spaces
15
 
16
+ # Constants and initialization
17
  HF_TOKEN = os.environ.get("HF_TOKEN")
18
+ REPO_ID = "artificialguybr/video-dubbing"
19
+ MAX_VIDEO_DURATION = 60 # seconds
20
+
21
  api = HfApi(token=HF_TOKEN)
22
+
23
+ # Extract and set permissions for ffmpeg
24
  ZipFile("ffmpeg.zip").extractall()
25
+ os.chmod('ffmpeg', os.stat('ffmpeg').st_mode | os.stat.S_IEXEC)
 
26
 
27
  print("Starting the program...")
28
 
 
43
  ret, frame = cap.read()
44
  if not ret:
45
  break
 
46
  gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
47
+ if face_cascade.detectMultiScale(gray, 1.1, 4):
 
 
48
  return True
 
49
  return False
50
 
51
  @spaces.GPU(duration=90)
52
  def transcribe_audio(file_path):
53
  print(f"Starting transcription of file: {file_path}")
54
  temp_audio = None
55
+
56
  if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
57
  print("Video file detected. Extracting audio...")
58
  try:
 
63
  except Exception as e:
64
  print(f"Error extracting audio from video: {e}")
65
  raise
66
+
 
 
 
67
  output_file = generate_unique_filename(".json")
68
  command = [
69
  "insanely-fast-whisper",
 
74
  "--timestamp", "chunk",
75
  "--transcript-path", output_file
76
  ]
77
+
78
  try:
79
  result = subprocess.run(command, check=True, capture_output=True, text=True)
80
+ print(f"Transcription output: {result.stdout}")
 
81
  except subprocess.CalledProcessError as e:
82
  print(f"Error running insanely-fast-whisper: {e}")
 
 
83
  raise
84
+
 
85
  try:
86
  with open(output_file, "r") as f:
87
  transcription = json.load(f)
88
  except json.JSONDecodeError as e:
89
  print(f"Error decoding JSON: {e}")
 
90
  raise
91
+
92
+ result = transcription.get("text", " ".join([chunk["text"] for chunk in transcription.get("chunks", [])]))
93
 
94
+ cleanup_files(output_file, temp_audio)
 
 
 
 
 
 
 
 
 
 
95
 
96
  return result
97
 
 
117
  video_info = ffmpeg.probe(video_path)
118
  video_duration = float(video_info['streams'][0]['duration'])
119
 
120
+ if video_duration > MAX_VIDEO_DURATION:
121
+ cleanup_files(video_path)
122
+ raise ValueError(f"Video duration exceeds {MAX_VIDEO_DURATION} seconds. Please upload a shorter video.")
123
 
124
  ffmpeg.input(video_path).output(f"{run_uuid}_output_audio.wav", acodec='pcm_s24le', ar=48000, map='a').run()
125
 
126
+ subprocess.run(f"ffmpeg -y -i {run_uuid}_output_audio.wav -af lowpass=3000,highpass=100 {run_uuid}_output_audio_final.wav", shell=True, check=True)
 
127
 
128
+ whisper_text = transcribe_audio(f"{run_uuid}_output_audio_final.wav")
129
+ print(f"Transcription successful: {whisper_text}")
 
 
 
 
 
130
 
131
  language_mapping = {
132
  'English': ('en', 'en-US-EricNeural'),
 
157
 
158
  asyncio.run(text_to_speech(translated_text, voice, f"{run_uuid}_output_synth.wav"))
159
 
160
+ if has_closeup_face or check_for_faces(video_path):
 
 
 
 
 
 
 
 
 
 
 
 
 
161
  try:
162
+ subprocess.run(f"python Wav2Lip/inference.py --checkpoint_path 'Wav2Lip/checkpoints/wav2lip_gan.pth' --face '{video_path}' --audio '{run_uuid}_output_synth.wav' --pads 0 15 0 0 --resize_factor 1 --nosmooth --outfile '{run_uuid}_output_video.mp4'", shell=True, check=True)
163
+ except subprocess.CalledProcessError:
164
+ gr.Warning("Wav2lip didn't detect a face. Please try again with the option disabled.")
165
+ subprocess.run(f"ffmpeg -i {video_path} -i {run_uuid}_output_synth.wav -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {run_uuid}_output_video.mp4", shell=True)
 
 
 
166
  else:
167
+ subprocess.run(f"ffmpeg -i {video_path} -i {run_uuid}_output_synth.wav -c:v copy -c:a aac -strict experimental -map 0:v:0 -map 1:a:0 {run_uuid}_output_video.mp4", shell=True)
 
 
 
 
168
 
169
  output_video_path = f"{run_uuid}_output_video.mp4"
170
+ if not os.path.exists(output_video_path):
171
+ raise FileNotFoundError(f"Error: {output_video_path} was not generated.")
172
 
173
+ cleanup_files(
174
  f"{run_uuid}_resized_video.mp4",
175
  f"{run_uuid}_output_audio.wav",
176
  f"{run_uuid}_output_audio_final.wav",
177
  f"{run_uuid}_output_synth.wav"
178
+ )
 
 
 
 
 
179
 
180
+ return output_video_path, ""
181
 
182
  except Exception as e:
183
  print(f"Error in process_video: {str(e)}")
184
+ return None, f"Error: {str(e)}"
185
 
186
  def swap(radio):
187
+ return gr.update(source="upload" if radio == "Upload" else "webcam")
188
+
189
+ # Gradio interface setup
 
 
190
  video = gr.Video()
191
  radio = gr.Radio(["Upload", "Record"], value="Upload", show_label=False)
192
  iface = gr.Interface(
 
194
  inputs=[
195
  radio,
196
  video,
197
+ gr.Dropdown(choices=list(language_mapping.keys()), label="Target Language for Dubbing", value="Spanish"),
198
+ gr.Checkbox(label="Video has a close-up face. Use Wav2lip.", value=False, info="Say if video have close-up face. For Wav2lip. Will not work if checked wrongly.")
 
 
 
199
  ],
200
  outputs=[
201
  gr.Video(label="Processed Video"),