Update app.py
Browse files
app.py
CHANGED
@@ -10,11 +10,15 @@ from transformers import logging
|
|
10 |
import math
|
11 |
import json
|
12 |
from pyannote.audio import Pipeline
|
|
|
13 |
|
14 |
# Suppress warnings
|
15 |
warnings.filterwarnings("ignore")
|
16 |
logging.set_verbosity_error()
|
17 |
|
|
|
|
|
|
|
18 |
# Read the Hugging Face token from the environment variable
|
19 |
HUGGINGFACE_TOKEN = os.getenv("HF_TOKEN")
|
20 |
|
@@ -39,15 +43,19 @@ MODELS = {
|
|
39 |
|
40 |
def convert_audio_to_wav(audio_path):
|
41 |
try:
|
|
|
42 |
wav_path = "converted_audio.wav"
|
43 |
command = ["ffmpeg", "-i", audio_path, "-ac", "1", "-ar", "16000", wav_path]
|
44 |
subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
|
45 |
return wav_path
|
46 |
except Exception as e:
|
|
|
47 |
raise RuntimeError(f"Error converting audio to WAV: {e}")
|
48 |
|
49 |
def detect_language(audio_path):
|
50 |
try:
|
|
|
51 |
speech, _ = librosa.load(audio_path, sr=16000, duration=30)
|
52 |
|
53 |
processor = WhisperProcessor.from_pretrained("openai/whisper-base")
|
@@ -63,18 +71,25 @@ def detect_language(audio_path):
|
|
63 |
pt_confidence = next((lang.prob for lang in langs if lang.lang == 'pt'), 0)
|
64 |
|
65 |
if abs(es_confidence - pt_confidence) < 0.2:
|
|
|
66 |
return 'es'
|
67 |
|
68 |
-
|
|
|
|
|
69 |
except Exception as e:
|
|
|
70 |
raise RuntimeError(f"Error detecting language: {e}")
|
71 |
|
72 |
def diarize_audio(wav_audio):
|
73 |
try:
|
|
|
74 |
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=HUGGINGFACE_TOKEN)
|
75 |
diarization = pipeline(wav_audio)
|
|
|
76 |
return diarization
|
77 |
except Exception as e:
|
|
|
78 |
raise RuntimeError(f"Error in diarization: {e}")
|
79 |
|
80 |
def transcribe_audio_stream(audio, model_name):
|
@@ -118,10 +133,12 @@ def transcribe_audio_stream(audio, model_name):
|
|
118 |
transcriptions.append((timestamp, result["text"], progress))
|
119 |
yield transcriptions, progress
|
120 |
except Exception as e:
|
|
|
121 |
raise RuntimeError(f"Error in transcription: {e}")
|
122 |
|
123 |
def merge_diarization_with_transcription(transcriptions, diarization, rate):
|
124 |
try:
|
|
|
125 |
speaker_transcriptions = []
|
126 |
for segment in diarization.itertracks(yield_label=True):
|
127 |
start, end, speaker = segment
|
@@ -132,37 +149,47 @@ def merge_diarization_with_transcription(transcriptions, diarization, rate):
|
|
132 |
if start_time <= ts <= end_time:
|
133 |
text_segment += text + " "
|
134 |
speaker_transcriptions.append((start_time, end_time, speaker, text_segment.strip()))
|
|
|
135 |
return speaker_transcriptions
|
136 |
except Exception as e:
|
|
|
137 |
raise RuntimeError(f"Error merging diarization with transcription: {e}")
|
138 |
|
139 |
def detect_and_select_model(audio):
|
140 |
try:
|
|
|
141 |
wav_audio = convert_audio_to_wav(audio)
|
142 |
language = detect_language(wav_audio)
|
143 |
model_options = MODELS.get(language, MODELS["en"])
|
|
|
144 |
return language, model_options
|
145 |
except Exception as e:
|
|
|
146 |
raise RuntimeError(f"Error detecting and selecting model: {e}")
|
147 |
|
148 |
def save_transcription(transcriptions, file_format):
|
149 |
try:
|
|
|
150 |
if file_format == "txt":
|
151 |
file_path = "/tmp/transcription.txt"
|
152 |
with open(file_path, "w") as f:
|
153 |
for start, end, speaker, text in transcriptions:
|
154 |
f.write(f"[{start:.2f}-{end:.2f}] {speaker}: {text}\n")
|
|
|
155 |
return file_path
|
156 |
elif file_format == "json":
|
157 |
file_path = "/tmp/transcription.json"
|
158 |
with open(file_path, "w") as f:
|
159 |
json.dump(transcriptions, f)
|
|
|
160 |
return file_path
|
161 |
except Exception as e:
|
|
|
162 |
raise RuntimeError(f"Error saving transcription: {e}")
|
163 |
|
164 |
def combined_interface(audio):
|
165 |
try:
|
|
|
166 |
language, model_options = detect_and_select_model(audio)
|
167 |
selected_model = model_options[0]
|
168 |
|
@@ -189,8 +216,8 @@ def combined_interface(audio):
|
|
189 |
os.remove(wav_audio)
|
190 |
|
191 |
yield language, model_options, selected_model, transcriptions_text, 100, "Transcription complete!", txt_file_path, json_file_path
|
192 |
-
|
193 |
except Exception as e:
|
|
|
194 |
yield str(e), [], "", "An error occurred during processing.", 0, "Error", None, None
|
195 |
|
196 |
iface = gr.Interface(
|
|
|
10 |
import math
|
11 |
import json
|
12 |
from pyannote.audio import Pipeline
|
13 |
+
import numpy as np # Asegúrate de importar numpy
|
14 |
|
15 |
# Suppress warnings
|
16 |
warnings.filterwarnings("ignore")
|
17 |
logging.set_verbosity_error()
|
18 |
|
19 |
+
# Inicializar numpy correctamente
|
20 |
+
np._import_array()
|
21 |
+
|
22 |
# Read the Hugging Face token from the environment variable
|
23 |
HUGGINGFACE_TOKEN = os.getenv("HF_TOKEN")
|
24 |
|
|
|
43 |
|
44 |
def convert_audio_to_wav(audio_path):
|
45 |
try:
|
46 |
+
print("Converting audio to WAV format...")
|
47 |
wav_path = "converted_audio.wav"
|
48 |
command = ["ffmpeg", "-i", audio_path, "-ac", "1", "-ar", "16000", wav_path]
|
49 |
subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
50 |
+
print(f"Audio converted to {wav_path}")
|
51 |
return wav_path
|
52 |
except Exception as e:
|
53 |
+
print(f"Error converting audio to WAV: {e}")
|
54 |
raise RuntimeError(f"Error converting audio to WAV: {e}")
|
55 |
|
56 |
def detect_language(audio_path):
|
57 |
try:
|
58 |
+
print("Detecting language...")
|
59 |
speech, _ = librosa.load(audio_path, sr=16000, duration=30)
|
60 |
|
61 |
processor = WhisperProcessor.from_pretrained("openai/whisper-base")
|
|
|
71 |
pt_confidence = next((lang.prob for lang in langs if lang.lang == 'pt'), 0)
|
72 |
|
73 |
if abs(es_confidence - pt_confidence) < 0.2:
|
74 |
+
print("Detected language: Spanish")
|
75 |
return 'es'
|
76 |
|
77 |
+
detected_language = max(langs, key=lambda x: x.prob).lang
|
78 |
+
print(f"Detected language: {detected_language}")
|
79 |
+
return detected_language
|
80 |
except Exception as e:
|
81 |
+
print(f"Error detecting language: {e}")
|
82 |
raise RuntimeError(f"Error detecting language: {e}")
|
83 |
|
84 |
def diarize_audio(wav_audio):
|
85 |
try:
|
86 |
+
print("Performing diarization...")
|
87 |
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=HUGGINGFACE_TOKEN)
|
88 |
diarization = pipeline(wav_audio)
|
89 |
+
print("Diarization complete.")
|
90 |
return diarization
|
91 |
except Exception as e:
|
92 |
+
print(f"Error in diarization: {e}")
|
93 |
raise RuntimeError(f"Error in diarization: {e}")
|
94 |
|
95 |
def transcribe_audio_stream(audio, model_name):
|
|
|
133 |
transcriptions.append((timestamp, result["text"], progress))
|
134 |
yield transcriptions, progress
|
135 |
except Exception as e:
|
136 |
+
print(f"Error in transcription: {e}")
|
137 |
raise RuntimeError(f"Error in transcription: {e}")
|
138 |
|
139 |
def merge_diarization_with_transcription(transcriptions, diarization, rate):
|
140 |
try:
|
141 |
+
print("Merging diarization with transcription...")
|
142 |
speaker_transcriptions = []
|
143 |
for segment in diarization.itertracks(yield_label=True):
|
144 |
start, end, speaker = segment
|
|
|
149 |
if start_time <= ts <= end_time:
|
150 |
text_segment += text + " "
|
151 |
speaker_transcriptions.append((start_time, end_time, speaker, text_segment.strip()))
|
152 |
+
print("Merge complete.")
|
153 |
return speaker_transcriptions
|
154 |
except Exception as e:
|
155 |
+
print(f"Error merging diarization with transcription: {e}")
|
156 |
raise RuntimeError(f"Error merging diarization with transcription: {e}")
|
157 |
|
158 |
def detect_and_select_model(audio):
|
159 |
try:
|
160 |
+
print("Detecting and selecting model...")
|
161 |
wav_audio = convert_audio_to_wav(audio)
|
162 |
language = detect_language(wav_audio)
|
163 |
model_options = MODELS.get(language, MODELS["en"])
|
164 |
+
print(f"Selected model: {model_options[0]}")
|
165 |
return language, model_options
|
166 |
except Exception as e:
|
167 |
+
print(f"Error detecting and selecting model: {e}")
|
168 |
raise RuntimeError(f"Error detecting and selecting model: {e}")
|
169 |
|
170 |
def save_transcription(transcriptions, file_format):
|
171 |
try:
|
172 |
+
print(f"Saving transcription to {file_format} format...")
|
173 |
if file_format == "txt":
|
174 |
file_path = "/tmp/transcription.txt"
|
175 |
with open(file_path, "w") as f:
|
176 |
for start, end, speaker, text in transcriptions:
|
177 |
f.write(f"[{start:.2f}-{end:.2f}] {speaker}: {text}\n")
|
178 |
+
print(f"Transcription saved to {file_path}")
|
179 |
return file_path
|
180 |
elif file_format == "json":
|
181 |
file_path = "/tmp/transcription.json"
|
182 |
with open(file_path, "w") as f:
|
183 |
json.dump(transcriptions, f)
|
184 |
+
print(f"Transcription saved to {file_path}")
|
185 |
return file_path
|
186 |
except Exception as e:
|
187 |
+
print(f"Error saving transcription: {e}")
|
188 |
raise RuntimeError(f"Error saving transcription: {e}")
|
189 |
|
190 |
def combined_interface(audio):
|
191 |
try:
|
192 |
+
print("Starting combined interface...")
|
193 |
language, model_options = detect_and_select_model(audio)
|
194 |
selected_model = model_options[0]
|
195 |
|
|
|
216 |
os.remove(wav_audio)
|
217 |
|
218 |
yield language, model_options, selected_model, transcriptions_text, 100, "Transcription complete!", txt_file_path, json_file_path
|
|
|
219 |
except Exception as e:
|
220 |
+
print(f"Error in combined interface: {e}")
|
221 |
yield str(e), [], "", "An error occurred during processing.", 0, "Error", None, None
|
222 |
|
223 |
iface = gr.Interface(
|