mimishanmi committed
Commit 2d87036 · verified · 1 Parent(s): b67ab96

Update app.py

Files changed (1)
  1. app.py +148 -162
app.py CHANGED
@@ -1,51 +1,64 @@
-
 import os
 import asyncio
 import whisper
 import gradio as gr
 import torch
-import shutil
 import logging
 from pathlib import Path
-import concurrent.futures
 import ffmpeg
 import re
-import threading
-from tqdm.notebook import tqdm
-import numpy as np
 
 # --- File Handling ---
-# Define paths and constants
-TEMP_FOLDER = '/content/temp/'
-SUPPORTED_AUDIO_FORMATS = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.amr', '.wma']
-SUPPORTED_VIDEO_FORMATS = ['.mp4', '.avi', '.mov', '.wmv', '.mkv', '.webm', '.3gp']
-SUPPORTED_FORMATS = SUPPORTED_AUDIO_FORMATS + SUPPORTED_VIDEO_FORMATS
 
 def create_folders():
-    """Creates the necessary temporary folder if it doesn't exist."""
-    Path(TEMP_FOLDER).mkdir(parents=True, exist_ok=True)
 
 def is_supported_format(file):
     """Checks if a file has a supported audio/video format."""
-    if file is not None:
-        return any(file.lower().endswith(ext) for ext in SUPPORTED_FORMATS)
-    else:
-        return False
 
 def convert_to_wav(original_file_path):
-    """Converts input file to WAV format."""
     output_path = os.path.join(TEMP_FOLDER, os.path.splitext(os.path.basename(original_file_path))[0] + '.wav')
     try:
         (
             ffmpeg
-            .input(original_file_path)
             .output(output_path, acodec='pcm_s16le', ac=1, ar='16k')
             .overwrite_output()
             .run(capture_stdout=True, capture_stderr=True)
         )
         return output_path
     except ffmpeg.Error as e:
-        print(f'Error converting {original_file_path}: {e.stderr.decode()}')
         return None
 
 def delete_temp_file(file_path):
@@ -56,12 +69,10 @@ def delete_temp_file(file_path):
 # --- Transcription ---
 class WhisperModelCache:
     """Singleton class to load and cache the Whisper model."""
-
     _instance = None
 
     @staticmethod
     def get_instance():
-        """Get the singleton instance."""
         if WhisperModelCache._instance is None:
             WhisperModelCache._instance = WhisperModelCache()
         return WhisperModelCache._instance
@@ -71,27 +82,14 @@
         self.device = None
 
     def load_model(self):
-        """Loads the Whisper model, prioritizing GPU and handling memory."""
         if self.model is None:
             self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-            logging.info(f"Using device: {self.device}")
-
-            model_size = "large-v2" if torch.cuda.is_available() else "medium"
-            logging.info(f"Loading Whisper model: {model_size}")
-
-            try:
-                self.model = whisper.load_model(model_size, device=self.device)
-            except RuntimeError as e:
-                if "out of memory" in str(e):
-                    logging.error(f"Error: {e}")
-                    logging.warning("Falling back to 'medium' model size due to memory constraints.")
-                    self.model = whisper.load_model("medium", device=self.device)
-                else:
-                    raise e
-
         return self.model
     def unload_model(self):
-        """Unloads the model and clears CUDA cache."""
         if self.model is not None:
             del self.model
             self.model = None
@@ -99,58 +97,25 @@
             torch.cuda.empty_cache()
             logging.info("Model unloaded and CUDA cache cleared.")
 
-async def transcribe_audio(audio_path, language, progress_bar,
-                           task='transcribe', initial_prompt=None,
-                           temperature=0.5, chunk_duration=30):
-    """Transcribes audio using Whisper, handling chunking and errors."""
-
     try:
         model = WhisperModelCache.get_instance().load_model()
-        device = WhisperModelCache.get_instance().device
-
-        probe = ffmpeg.probe(audio_path)
-        total_duration = float(probe['format']['duration'])
-        num_chunks = int(total_duration // chunk_duration) + (total_duration % chunk_duration > 0)
-        progress_per_chunk = 20 / num_chunks
-
-        full_transcription = ""
-
-        for chunk_idx in range(num_chunks):
-            start_time = chunk_idx * chunk_duration
-            end_time = min((chunk_idx + 1) * chunk_duration, total_duration)
-            temp_chunk_path = f"{TEMP_FOLDER}/temp_chunk_{chunk_idx}.wav"
-
-            try:
-                (
-                    ffmpeg
-                    .input(audio_path)
-                    .filter('atrim', start=start_time, end=end_time)
-                    .output(temp_chunk_path, acodec='pcm_s16le', ac=1, ar='16k')
-                    .overwrite_output()
-                    .run(capture_stdout=True, capture_stderr=True)
-                )
-            except ffmpeg.Error as e:
-                logging.error(f"Error extracting audio chunk: {e.stderr.decode()}")
-                return "Error: Could not extract audio chunk for transcription"
-
-            result = await asyncio.to_thread(model.transcribe, temp_chunk_path,
-                                             language=language,
-                                             task=task,
-                                             initial_prompt=initial_prompt,
-                                             temperature=temperature)
-
-            full_transcription += result['text']
-
-            progress_bar.update(progress_per_chunk)
-            delete_temp_file(temp_chunk_path)
-
-        return full_transcription
-
     except Exception as e:
         logging.error(f"Error transcribing {audio_path}: {str(e)}")
         return f"Error during transcription: {str(e)}"
 
-# --- Anonymization ---
 def anonymize_text(text):
     """Anonymizes personal information in text."""
     text = re.sub(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b|\S+@\S+|\d{3}[-.]?\d{3}[-.]?\d{4}',
@@ -160,105 +125,126 @@ def anonymize_text(text):
     return text
 
 # --- Gradio UI ---
-async def process_audio(file, language, anonymize):
     """Processes audio: validation, conversion, transcription, anonymization, cleanup."""
     try:
-        if file is None:
             return "Error: Please upload an audio or video file."
 
-        if not is_supported_format(file):
-            raise ValueError(f"Unsupported file format: {file}")
-
-        progress_bar = tqdm(total=100, desc="Overall Process", unit="%", position=0, leave=True)
 
-        progress_bar.update(10)
 
-        temp_audio_path = convert_to_wav(file)
         if not temp_audio_path:
-            raise ValueError(f"Failed to convert {file} to WAV format.")
-
-        progress_bar.update(30)
-
-        transcription = await transcribe_audio(temp_audio_path, language, progress_bar)
-
-        progress_bar.update(20)
 
-        delete_temp_file(temp_audio_path)
 
         if anonymize:
             transcription = anonymize_text(transcription)
-            progress_bar.update(10)
 
-        progress_bar.update(30)
-
-        progress_bar.close()
 
         return transcription
 
     except Exception as e:
-        print(f"Error processing audio: {e}")
-        return f"Error: {str(e)}"
 
 def create_ui():
-    """Create the Gradio UI."""
-    language_choices = ["en", "es", "fr", "de", "it", "pt", "nl", "ru", "zh", "ja", "ko", "ar", "he", "iw", "ar", "auto"]
-    output_format_choices = ["txt", "srt", "vtt", "tsv", "json"]
-
-    with gr.Blocks() as interface:
-        with gr.Row():
-            with gr.Column():
-                audio_input = gr.Audio(label="Upload Audio/Video", type="filepath")
-
-                task_dropdown = gr.Dropdown(
-                    choices=["Transcribe", "Translate"],
-                    label="Task",
-                    value="Transcribe"
-                )
-
-                language_dropdown = gr.Dropdown(
-                    choices=language_choices,
-                    label="Language",
-                    value="en",  # Default to English
-                    info="Select 'auto' for automatic language detection."
-                )
-
-                output_format_checkbox_group = gr.CheckboxGroup(
-                    choices=output_format_choices,
-                    label="Output Formats",
-                    value=["txt"]
                 )
-
-                anonymize_checkbox = gr.Checkbox(label="Anonymize Transcription")
-
-                prompt_input = gr.Textbox(
-                    label="Initial Prompt",
-                    lines=2,
-                    placeholder="Optional prompt to guide transcription"
-                )
-
-                temperature_slider = gr.Slider(
-                    minimum=0.0,
-                    maximum=1.0,
-                    value=0.5,
-                    label="Temperature"
-                )
-
-                timestamps_checkbox = gr.Checkbox(label="Include Word Timestamps")
-
-                transcribe_button = gr.Button(value="Transcribe")
-            with gr.Column():
-                transcription_output = gr.Textbox(label="Transcription", lines=10)
-
-        transcribe_button.click(
-            fn=process_audio,
-            inputs=[audio_input, language_dropdown, anonymize_checkbox],
-            outputs=transcription_output
-        )
-    return interface
 
 # --- Main Execution ---
 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
     create_folders()
     iface = create_ui()
-    iface.launch(debug=True, share=True)
 import os
 import asyncio
 import whisper
 import gradio as gr
 import torch
 import logging
 from pathlib import Path
 import ffmpeg
 import re
+from tqdm import tqdm
+from cryptography.fernet import Fernet
+
+# --- Encryption ---
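+# Fernet provides symmetric, authenticated encryption; keys are URL-safe
+# base64-encoded 32-byte values, like those returned by generate_key() below.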
+def generate_key():
+    """Generates a new Fernet encryption key."""
+    return Fernet.generate_key()
+
+def encrypt_file(key, filename):
+    """Encrypts a file using the provided key."""
+    f = Fernet(key)
+    with open(filename, "rb") as file:
+        original_data = file.read()
+    encrypted_data = f.encrypt(original_data)
+    with open(filename, "wb") as file:
+        file.write(encrypted_data)
+
+def decrypt_file(key, filename):
+    """Decrypts a file using the provided key."""
+    f = Fernet(key)
+    with open(filename, "rb") as file:
+        encrypted_data = file.read()
+    decrypted_data = f.decrypt(encrypted_data)
+    with open(filename, "wb") as file:
+        file.write(decrypted_data)
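+
+# Note: both helpers rewrite the target file in place, so its contents are
+# only recoverable with the same key.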
 
 # --- File Handling ---
+TEMP_FOLDER = 'temp/'
+SUPPORTED_FORMATS = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.mp4', '.avi', '.mov', '.mkv', '.webm']
 
 def create_folders():
+    """Creates the 'temp/' folder if it doesn't exist."""
+    Path(TEMP_FOLDER).mkdir(exist_ok=True)
 
 def is_supported_format(file):
     """Checks if a file has a supported audio/video format."""
+    return file is not None and any(file.name.lower().endswith(ext) for ext in SUPPORTED_FORMATS)
 
 def convert_to_wav(original_file_path):
+    """Converts input file to WAV format using ffmpeg."""
     output_path = os.path.join(TEMP_FOLDER, os.path.splitext(os.path.basename(original_file_path))[0] + '.wav')
     try:
         (
             ffmpeg
+            .input(original_file_path)
             .output(output_path, acodec='pcm_s16le', ac=1, ar='16k')
             .overwrite_output()
             .run(capture_stdout=True, capture_stderr=True)
         )
         return output_path
     except ffmpeg.Error as e:
+        logging.error(f'Error converting {original_file_path}: {e.stderr.decode()}')
         return None
 
 def delete_temp_file(file_path):
 
 # --- Transcription ---
 class WhisperModelCache:
     """Singleton class to load and cache the Whisper model."""
     _instance = None
 
     @staticmethod
     def get_instance():
         if WhisperModelCache._instance is None:
             WhisperModelCache._instance = WhisperModelCache()
         return WhisperModelCache._instance
 
         self.device = None
 
     def load_model(self):
         if self.model is None:
             self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+            model_size = "medium"
+            logging.info(f"Loading Whisper model: {model_size} on {self.device}")
+            self.model = whisper.load_model(model_size, device=self.device)
         return self.model
+
     def unload_model(self):
         if self.model is not None:
             del self.model
             self.model = None
 
             torch.cuda.empty_cache()
             logging.info("Model unloaded and CUDA cache cleared.")
 
+async def transcribe_audio(audio_path, language, progress=gr.Progress(), task='transcribe', initial_prompt=None, temperature=0.5):
+    """Transcribes audio using Whisper."""
     try:
         model = WhisperModelCache.get_instance().load_model()
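+        # asyncio.to_thread runs the blocking transcription in a worker thread,
+        # keeping the event loop responsive.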
+        result = await asyncio.to_thread(
+            model.transcribe,
+            audio_path,
+            # Whisper expects language=None for automatic detection
+            language=None if language == "auto" else language,
+            task=task,
+            initial_prompt=initial_prompt,
+            temperature=temperature,
+            # model.transcribe() has no progress_bar argument; verbose is the
+            # supported way to control console output
+            verbose=False
+        )
+        return result['text']
     except Exception as e:
         logging.error(f"Error transcribing {audio_path}: {str(e)}")
         return f"Error during transcription: {str(e)}"
 
+# --- Anonymization ---
 def anonymize_text(text):
     """Anonymizes personal information in text."""
     text = re.sub(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b|\S+@\S+|\d{3}[-.]?\d{3}[-.]?\d{4}',
 
     return text
 
 # --- Gradio UI ---
+async def process_audio(file, language, task, anonymize, initial_prompt, temperature, encryption_key):
     """Processes audio: validation, conversion, transcription, anonymization, cleanup."""
     try:
+        if not file:
             return "Error: Please upload an audio or video file."
 
+        if not is_supported_format(file):
+            return f"Error: Unsupported file format: {file.name}"
 
+        temp_audio_path = convert_to_wav(file.name)
         if not temp_audio_path:
+            return f"Error: Failed to convert {file.name} to WAV format."
+
+        # Encryption: the upload is encrypted at rest only after ffmpeg has
+        # read it, and decrypted again before returning
+        if encryption_key:
+            try:
+                encrypt_file(encryption_key.encode(), file.name)
+                logging.info("File encrypted successfully.")
+            except Exception as e:
+                logging.error(f"Encryption failed: {str(e)}")
+                return f"Error: Encryption failed: {str(e)}"
+
+        transcription = await transcribe_audio(
+            temp_audio_path,
+            language,
+            task=task,
+            initial_prompt=initial_prompt,
+            temperature=temperature
+        )
 
+        delete_temp_file(temp_audio_path)
 
         if anonymize:
             transcription = anonymize_text(transcription)
 
+        # Decryption
+        if encryption_key:
+            try:
+                decrypt_file(encryption_key.encode(), file.name)
+                logging.info("File decrypted successfully.")
+            except Exception as e:
+                logging.error(f"Decryption failed: {str(e)}")
+                return f"Error: Decryption failed: {str(e)}"
 
         return transcription
 
     except Exception as e:
+        logging.error(f"Error processing audio: {e}")
+        return f"Error: {str(e)}"
 
 def create_ui():
+    """Create the Gradio UI."""
+    with gr.Blocks(title="Whisper Transcription App") as interface:
+        gr.Markdown("# 🎙️ Whisper Transcription App")
+        gr.Markdown("Upload an audio or video file to transcribe or translate it using OpenAI's Whisper model.")
+
+        with gr.Row():
+            with gr.Column(scale=2):
+                audio_input = gr.File(label="Upload Audio/Video")
+
+                task_dropdown = gr.Dropdown(
+                    choices=["transcribe", "translate"],
+                    label="Task",
+                    value="transcribe"
+                )
+
+                language_dropdown = gr.Dropdown(
+                    choices=["en", "es", "fr", "de", "it", "pt", "nl", "ru", "zh", "ja", "ko", "ar", "auto"],
+                    label="Language",
+                    value="en",
+                    info="Select 'auto' for automatic language detection."
+                )
+
+                anonymize_checkbox = gr.Checkbox(label="Anonymize Transcription")
+
+                prompt_input = gr.Textbox(
+                    label="Initial Prompt",
+                    lines=2,
+                    placeholder="Optional prompt to guide transcription"
+                )
+
+                temperature_slider = gr.Slider(
+                    minimum=0.0,
+                    maximum=1.0,
+                    value=0.5,
+                    label="Temperature"
+                )
+
+                encryption_key = gr.Textbox(label="Encryption Key (Optional)", type="password")
+
+                transcribe_button = gr.Button("Transcribe", variant="primary")
+
+            with gr.Column(scale=3):
+                transcription_output = gr.Textbox(label="Transcription", lines=20)
+
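+        # Gradio supports async event handlers, so the process_audio coroutine
+        # is wired in directly.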
+        transcribe_button.click(
+            fn=process_audio,
+            inputs=[audio_input, language_dropdown, task_dropdown, anonymize_checkbox, prompt_input, temperature_slider, encryption_key],
+            outputs=transcription_output
         )
+
+        gr.Markdown("## How to use")
+        gr.Markdown("""
+        1. Upload an audio or video file.
+        2. Choose between transcription or translation.
+        3. Select the language of the audio (or 'auto' for automatic detection).
+        4. Optionally, check 'Anonymize Transcription' to remove personal information.
+        5. You can provide an initial prompt to guide the transcription.
+        6. Adjust the temperature for more or less random results.
+        7. Optionally, provide an encryption key for added security.
+        8. Click 'Transcribe' and wait for the results!
+        """)
+
+        gr.Markdown("Created using OpenAI Whisper and Gradio")
+
+    return interface
 
 # --- Main Execution ---
 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
     create_folders()
     iface = create_ui()
+    iface.launch()
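
A quick way to sanity-check the Fernet round-trip the new encrypt_file/decrypt_file helpers rely on (a standalone sketch; the sample bytes are illustrative):

    from cryptography.fernet import Fernet

    key = Fernet.generate_key()               # URL-safe base64-encoded 32-byte key
    f = Fernet(key)
    token = f.encrypt(b"sample audio bytes")  # authenticated ciphertext (Fernet token)
    assert f.decrypt(token) == b"sample audio bytes"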