Merlintxu committed
Commit
694f93a
1 Parent(s): 3c889a2

Update app.py

Files changed (1)
  1. app.py +99 -165
app.py CHANGED
@@ -1,24 +1,19 @@
- import os
- import warnings
- import subprocess
  import gradio as gr
  import torch
- import numpy as np
  import librosa
- import math
- import json
- from transformers import pipeline, WhisperProcessor, WhisperForConditionalGeneration
  from langdetect import detect_langs
- from pyannote.audio import Pipeline
  from transformers import logging

  # Suppress warnings
  warnings.filterwarnings("ignore")
  logging.set_verbosity_error()

- # Read the Hugging Face token from the environment variable
- HUGGINGFACE_TOKEN = os.getenv("HF_TOKEN")
-
  # Updated models by language
  MODELS = {
      "es": [
@@ -39,187 +34,127 @@ MODELS = {
  }

  def convert_audio_to_wav(audio_path):
-     try:
-         print("Converting audio to WAV format...")
-         wav_path = "converted_audio.wav"
-         command = ["ffmpeg", "-i", audio_path, "-ac", "1", "-ar", "16000", wav_path]
-         subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-         print(f"Audio converted to {wav_path}")
-         return wav_path
-     except Exception as e:
-         print(f"Error converting audio to WAV: {e}")
-         raise RuntimeError(f"Error converting audio to WAV: {e}")

  def detect_language(audio_path):
-     try:
-         print("Detecting language...")
-         speech, _ = librosa.load(audio_path, sr=16000, duration=30)
-
-         processor = WhisperProcessor.from_pretrained("openai/whisper-base")
-         model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")
-
-         input_features = processor(speech, sampling_rate=16000, return_tensors="pt").input_features
-         predicted_ids = model.generate(input_features)
-         transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
-
-         langs = detect_langs(transcription)
-
-         es_confidence = next((lang.prob for lang in langs if lang.lang == 'es'), 0)
-         pt_confidence = next((lang.prob for lang in langs if lang.lang == 'pt'), 0)
-
-         if abs(es_confidence - pt_confidence) < 0.2:
-             print("Detected language: Spanish")
-             return 'es'
-
-         detected_language = max(langs, key=lambda x: x.prob).lang
-         print(f"Detected language: {detected_language}")
-         return detected_language
-     except Exception as e:
-         print(f"Error detecting language: {e}")
-         raise RuntimeError(f"Error detecting language: {e}")
-
- def diarize_audio(wav_audio):
-     try:
-         print("Performing diarization...")
-         pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=HUGGINGFACE_TOKEN)
-         diarization = pipeline(wav_audio)
-         print("Diarization complete.")
-         return diarization
-     except Exception as e:
-         print(f"Error in diarization: {e}")
-         raise RuntimeError(f"Error in diarization: {e}")

  def transcribe_audio_stream(audio, model_name):
-     try:
-         wav_audio = convert_audio_to_wav(audio)
-         speech, rate = librosa.load(wav_audio, sr=16000)
-         duration = len(speech) / rate

-         transcriptions = []

-         if "whisper" in model_name:
-             processor = WhisperProcessor.from_pretrained(model_name)
-             model = WhisperForConditionalGeneration.from_pretrained(model_name)
-
-             chunk_duration = 30 # seconds

-             for i in range(0, int(duration), chunk_duration):
-                 end = min(i + chunk_duration, duration)
-                 chunk = speech[int(i * rate):int(end * rate)]
-
-                 input_features = processor(chunk, sampling_rate=16000, return_tensors="pt").input_features
-                 predicted_ids = model.generate(input_features)
-                 transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
-
-                 progress = min(100, (end / duration) * 100)
-                 timestamp = i
-                 transcriptions.append((timestamp, transcription, progress))
-                 yield transcriptions, progress
-         else:
-             transcriber = pipeline("automatic-speech-recognition", model=model_name)

-             chunk_duration = 10 # seconds

-             for i in range(0, int(duration), chunk_duration):
-                 end = min(i + chunk_duration, duration)
-                 chunk = speech[int(i * rate):int(end * rate)]
-                 result = transcriber(chunk)
-
-                 progress = min(100, (end / duration) * 100)
-                 timestamp = i
-                 transcriptions.append((timestamp, result["text"], progress))
-                 yield transcriptions, progress
-     except Exception as e:
-         print(f"Error in transcription: {e}")
-         raise RuntimeError(f"Error in transcription: {e}")
-
- def merge_diarization_with_transcription(transcriptions, diarization, rate):
-     try:
-         print("Merging diarization with transcription...")
-         speaker_transcriptions = []
-         for segment in diarization.itertracks(yield_label=True):
-             start, end, speaker = segment
-             start_time = start / rate
-             end_time = end / rate
-             text_segment = ""
-             for ts, text, _ in transcriptions:
-                 if start_time <= ts <= end_time:
-                     text_segment += text + " "
-             speaker_transcriptions.append((start_time, end_time, speaker, text_segment.strip()))
-         print("Merge complete.")
-         return speaker_transcriptions
-     except Exception as e:
-         print(f"Error merging diarization with transcription: {e}")
-         raise RuntimeError(f"Error merging diarization with transcription: {e}")

  def detect_and_select_model(audio):
-     try:
-         print("Detecting and selecting model...")
-         wav_audio = convert_audio_to_wav(audio)
-         language = detect_language(wav_audio)
-         model_options = MODELS.get(language, MODELS["en"])
-         print(f"Selected model: {model_options[0]}")
-         return language, model_options
-     except Exception as e:
-         print(f"Error detecting and selecting model: {e}")
-         raise RuntimeError(f"Error detecting and selecting model: {e}")

  def save_transcription(transcriptions, file_format):
-     try:
-         print(f"Saving transcription to {file_format} format...")
-         if file_format == "txt":
-             file_path = "/tmp/transcription.txt"
-             with open(file_path, "w") as f:
-                 for start, end, speaker, text in transcriptions:
-                     f.write(f"[{start:.2f}-{end:.2f}] {speaker}: {text}\n")
-             print(f"Transcription saved to {file_path}")
-             return file_path
-         elif file_format == "json":
-             file_path = "/tmp/transcription.json"
-             with open(file_path, "w") as f:
-                 json.dump(transcriptions, f)
-             print(f"Transcription saved to {file_path}")
-             return file_path
-     except Exception as e:
-         print(f"Error saving transcription: {e}")
-         raise RuntimeError(f"Error saving transcription: {e}")

- def combined_interface(audio):
      try:
-         print("Starting combined interface...")
          language, model_options = detect_and_select_model(audio)
          selected_model = model_options[0]

-         yield language, model_options, selected_model, "", 0, "Initializing...", None, None

-         wav_audio = convert_audio_to_wav(audio)
-         diarization = diarize_audio(wav_audio)
          transcriptions = []
-
          for partial_transcriptions, progress in transcribe_audio_stream(audio, selected_model):
              transcriptions = partial_transcriptions
-             transcriptions_text = "\n".join([f"[{start}-{end}] {text}" for start, end, text in transcriptions])
              progress_int = math.floor(progress)
              status = f"Transcribing... {progress_int}% complete"
-             yield language, model_options, selected_model, transcriptions_text, progress_int, status, None, None

-         rate = librosa.get_samplerate(wav_audio)
-         speaker_transcriptions = merge_diarization_with_transcription(transcriptions, diarization, rate)
-         transcriptions_text = "\n".join([f"[{start:.2f}-{end:.2f}] {speaker}: {text}" for start, end, speaker, text in speaker_transcriptions])

-         txt_file_path = save_transcription(speaker_transcriptions, "txt")
-         json_file_path = save_transcription(speaker_transcriptions, "json")
-
-         os.remove(wav_audio)

-         yield language, model_options, selected_model, transcriptions_text, 100, "Transcription complete!", txt_file_path, json_file_path
      except Exception as e:
-         print(f"Error in combined interface: {e}")
-         yield str(e), [], "", "An error occurred during processing.", 0, "Error", None, None

  iface = gr.Interface(
      fn=combined_interface,
-     inputs=gr.Audio(type="filepath"),
      outputs=[
          gr.Textbox(label="Detected Language"),
          gr.Dropdown(label="Available Models", choices=[]),
@@ -227,11 +162,10 @@ iface = gr.Interface(
          gr.Textbox(label="Transcription", lines=10),
          gr.Slider(minimum=0, maximum=100, label="Progress", interactive=False),
          gr.Textbox(label="Status"),
-         gr.File(label="Download Transcription (TXT)", type="filepath"),
-         gr.File(label="Download Transcription (JSON)", type="filepath")
      ],
-     title="Multilingual Audio Transcriber with Real-time Display, Timestamps, and Speaker Diarization",
-     description="Upload an audio file to detect the language, select the transcription model, and get the transcription with timestamps and speaker labels in real-time. Download the transcription as TXT or JSON. Optimized for Spanish, English, and Portuguese.",
      live=True
  )

app.py (new version)

  import gradio as gr
+ from transformers import pipeline, WhisperProcessor, WhisperForConditionalGeneration
  import torch
  import librosa
+ import subprocess
  from langdetect import detect_langs
+ import os
+ import warnings
  from transformers import logging
+ import math
+ import json

  # Suppress warnings
  warnings.filterwarnings("ignore")
  logging.set_verbosity_error()

  # Updated models by language
  MODELS = {
      "es": [
  }

  def convert_audio_to_wav(audio_path):
+     wav_path = "converted_audio.wav"
+     command = ["ffmpeg", "-i", audio_path, "-ac", "1", "-ar", "16000", wav_path]
+     subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+     return wav_path

  def detect_language(audio_path):
+     speech, _ = librosa.load(audio_path, sr=16000, duration=30)
+
+     processor = WhisperProcessor.from_pretrained("openai/whisper-base")
+     model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")
+
+     input_features = processor(speech, sampling_rate=16000, return_tensors="pt").input_features
+     predicted_ids = model.generate(input_features)
+     transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
+
+     langs = detect_langs(transcription)
+
+     es_confidence = next((lang.prob for lang in langs if lang.lang == 'es'), 0)
+     pt_confidence = next((lang.prob for lang in langs if lang.lang == 'pt'), 0)
+
+     if abs(es_confidence - pt_confidence) < 0.2:
+         return 'es'
+
+     return max(langs, key=lambda x: x.prob).lang

  def transcribe_audio_stream(audio, model_name):
+     wav_audio = convert_audio_to_wav(audio)
+     speech, rate = librosa.load(wav_audio, sr=16000)
+     duration = len(speech) / rate
+
+     transcriptions = []
+
+     if "whisper" in model_name:
+         processor = WhisperProcessor.from_pretrained(model_name)
+         model = WhisperForConditionalGeneration.from_pretrained(model_name)

+         chunk_duration = 30 # seconds

+         for i in range(0, int(duration), chunk_duration):
+             end = min(i + chunk_duration, duration)
+             chunk = speech[int(i * rate):int(end * rate)]

+             input_features = processor(chunk, sampling_rate=16000, return_tensors="pt").input_features
+             predicted_ids = model.generate(input_features)
+             transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

+             progress = min(100, (end / duration) * 100)
+             transcriptions.append({
+                 "start_time": i,
+                 "end_time": end,
+                 "text": transcription
+             })
+             yield transcriptions, progress
+     else:
+         transcriber = pipeline("automatic-speech-recognition", model=model_name)
+
+         chunk_duration = 10 # seconds
+
+         for i in range(0, int(duration), chunk_duration):
+             end = min(i + chunk_duration, duration)
+             chunk = speech[int(i * rate):int(end * rate)]
+             result = transcriber(chunk)

+             progress = min(100, (end / duration) * 100)
+             transcriptions.append({
+                 "start_time": i,
+                 "end_time": end,
+                 "text": result["text"]
+             })
+             yield transcriptions, progress

  def detect_and_select_model(audio):
+     wav_audio = convert_audio_to_wav(audio)
+     language = detect_language(wav_audio)
+     model_options = MODELS.get(language, MODELS["en"])
+     return language, model_options

  def save_transcription(transcriptions, file_format):
+     if file_format == "JSON":
+         file_path = "transcription.json"
+         with open(file_path, 'w') as f:
+             json.dump(transcriptions, f, ensure_ascii=False, indent=4)
+     elif file_format == "TXT":
+         file_path = "transcription.txt"
+         with open(file_path, 'w') as f:
+             for entry in transcriptions:
+                 f.write(f"{entry['start_time']},{entry['end_time']},{entry['text']}\n")
+     return file_path

+ def combined_interface(audio, file_format):
      try:
          language, model_options = detect_and_select_model(audio)
          selected_model = model_options[0]

+         yield language, model_options, selected_model, "", 0, "Initializing..."

          transcriptions = []
          for partial_transcriptions, progress in transcribe_audio_stream(audio, selected_model):
              transcriptions = partial_transcriptions
+             full_transcription = " ".join([t["text"] for t in transcriptions])
              progress_int = math.floor(progress)
              status = f"Transcribing... {progress_int}% complete"
+             yield language, model_options, selected_model, full_transcription.strip(), progress_int, status

+         # Save transcription file
+         file_path = save_transcription(transcriptions, file_format)

+         # Clean up temporary files
+         os.remove("converted_audio.wav")
+
+         yield language, model_options, selected_model, full_transcription.strip(), 100, f"Transcription complete! Download {file_path}", file_path

      except Exception as e:
+         yield str(e), [], "", "An error occurred during processing.", 0, "Error", ""

  iface = gr.Interface(
      fn=combined_interface,
+     inputs=[
+         gr.Audio(type="filepath"),
+         gr.Radio(choices=["JSON", "TXT"], label="Choose output format")
+     ],
      outputs=[
          gr.Textbox(label="Detected Language"),
          gr.Dropdown(label="Available Models", choices=[]),
          gr.Textbox(label="Transcription", lines=10),
          gr.Slider(minimum=0, maximum=100, label="Progress", interactive=False),
          gr.Textbox(label="Status"),
+         gr.File(label="Download Transcription")
      ],
+     title="Multilingual Audio Transcriber with Real-time Display and Progress Indicator",
+     description="Upload an audio file to detect the language, select the transcription model, and get the transcription in real-time. Optimized for Spanish, English, and Portuguese.",
      live=True
  )
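
Note on the new output format (not part of the commit itself): the updated transcribe_audio_stream() yields segments as dictionaries with "start_time", "end_time", and "text" keys, and save_transcription() writes them out as indented JSON or as comma-separated TXT lines. Below is a minimal, self-contained sketch of that schema and of the save step, using sample data only; it mirrors the save_transcription() shown above and does not load the Whisper models or launch the Gradio app.

import json

# Sample segments in the schema produced by the updated transcribe_audio_stream():
# one dict per audio chunk, with chunk start/end in seconds and the decoded text.
segments = [
    {"start_time": 0, "end_time": 30, "text": "Hola, bienvenidos al programa."},
    {"start_time": 30, "end_time": 60, "text": "Hoy hablamos de transcripcion automatica."},
]

def save_transcription(transcriptions, file_format):
    # Mirrors the committed save_transcription(): JSON dump or CSV-style TXT lines.
    if file_format == "JSON":
        file_path = "transcription.json"
        with open(file_path, "w") as f:
            json.dump(transcriptions, f, ensure_ascii=False, indent=4)
    elif file_format == "TXT":
        file_path = "transcription.txt"
        with open(file_path, "w") as f:
            for entry in transcriptions:
                f.write(f"{entry['start_time']},{entry['end_time']},{entry['text']}\n")
    return file_path

print(save_transcription(segments, "TXT"))   # -> transcription.txt
print(save_transcription(segments, "JSON"))  # -> transcription.json

Running the sketch produces transcription.txt with one start,end,text line per segment and transcription.json with the same entries as a JSON array, matching the two download formats offered by the new gr.Radio input.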
171