Spaces:
Sleeping
Sleeping
from pyannote.audio import Pipeline | |
from pydub import AudioSegment | |
import whisperx | |
import torch | |
import json | |
import os | |
from fastapi import FastAPI, File, UploadFile | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydub.utils import mediainfo | |
import subprocess | |
from dotenv import load_dotenv | |
# cargar el archivo .env | |
load_dotenv() | |
# Inicializar la aplicaci贸n | |
app = FastAPI() | |
# Configurar CORS | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Carga de token y verificaci贸n de disponibilidad de GPU | |
hf_token = os.getenv("HF_TOKEN") | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
# Tipo de c贸mputo #"float32" o "float16" si se desea utilizar la aceleraci贸n de la GPU o "float32" si se desea utilizar la CPU. | |
compute_type = "float16" if device == "cuda" else "float32" | |
# Transcripci贸n | |
print("Descargando el modelo...") | |
model = whisperx.load_model( | |
"large-v2", device, language="es", compute_type=compute_type | |
) | |
print("隆Modelo descargado con 茅xito!") | |
def transcribe_wav(audio): | |
""" | |
Transcribe un archivo de audio WAV y devuelve el texto transrito. | |
""" | |
resultado = model.transcribe(audio, language="es", batch_size=28) | |
return "\n".join([segment["text"] for segment in resultado["segments"]]) | |
def segment_and_transcribe_audio(audio_entrada: str) -> dict: | |
""" | |
Segmenta y transcribe el archivo de audio de entrada. | |
Parameters: | |
:param audio_entrada: str: Ruta al archivo de audio de entrada. | |
Returns: | |
dict: Diccionario que contiene las transcripciones para cada segmento de parlante. | |
""" | |
transcriptions = {} | |
if not os.path.exists(audio_entrada): | |
raise FileNotFoundError("El archivo de audio no existe.") | |
audio_info = mediainfo(audio_entrada) | |
duration_s = audio_info.get("duration") | |
if duration_s is None: | |
raise ValueError("No se pudo obtener la duraci贸n del archivo de audio.") | |
duration_s = float(duration_s) | |
print(f"Duraci贸n del audio: {duration_s} s") | |
if duration_s < 10: | |
raise ValueError( | |
f"La duraci贸n del audio es demasiado corta para realizar una transcripci贸n. Duraci贸n: {duration_s} s" | |
) | |
# Cargar el archivo de audio usando whisperx.load_audio | |
audio = whisperx.load_audio(audio_entrada) | |
# Realizar la transcripci贸n completa del audio | |
transcripcion_completa = transcribe_wav(audio) | |
transcriptions["full"] = transcripcion_completa | |
# Inicializar el pipeline de diarizaci贸n de parlantes | |
pipeline = Pipeline.from_pretrained( | |
"pyannote/speaker-diarization-3.1", use_auth_token=hf_token | |
) | |
pipeline.to(torch.device(device)) | |
# Realizar la diarizaci贸n de los parlantes en el audio | |
with open(audio_entrada, "rb") as f: | |
diarization = pipeline(f) | |
# Escribir los resultados de la diarizaci贸n en un archivo de texto | |
with open("audio.txt", "w") as lab: | |
diarization.write_lab(lab) | |
# Leer los resultados de la diarizaci贸n desde el archivo de texto | |
with open("audio.txt", "r") as file: | |
lines = file.readlines() | |
# Procesar los resultados de la diarizaci贸n | |
data = [] | |
for line in lines: | |
parts = line.strip().split() | |
data.append( | |
{"start": float(parts[0]), "end": float(parts[1]), "speaker": parts[2]} | |
) | |
# Guardar los resultados de la diarizaci贸n en un archivo JSON | |
with open("audio.json", "w") as file: | |
json.dump(data, file, indent=4) | |
# Cargar el archivo de audio original usando AudioSegment | |
audio_segment = AudioSegment.from_wav(audio_entrada) | |
# Leer los resultados de la diarizaci贸n desde el archivo JSON | |
with open("audio.json", "r") as file: | |
diarization = json.load(file) | |
# Segmentar el audio en base a los resultados de la diarizaci贸n | |
segments = {} | |
for segment in diarization: | |
speaker = segment["speaker"] | |
if speaker not in segments: | |
segments[speaker] = [] | |
start = int(segment["start"] * 1000) | |
end = int(segment["end"] * 1000) | |
segments[speaker].append(audio_segment[start:end]) | |
# Transcribir cada segmento de parlante y guardar los resultados | |
for speaker, segs in segments.items(): | |
result = sum(segs, AudioSegment.empty()) | |
result.export(f"{speaker}.wav", format="wav") | |
transcriptions[speaker] = transcribe_wav(f"{speaker}.wav") | |
# Eliminar los archivos temporales creados | |
os.remove("audio.txt") | |
os.remove(audio_entrada) | |
os.remove("audio.json") | |
for speaker in segments.keys(): | |
os.remove(f"{speaker}.wav") | |
return transcriptions | |
def read_root(): | |
return {"message": "隆Bienvenido a la API de transcripci贸n de audio!"} | |
def transcribe_audio(audio: UploadFile = File(...)): | |
""" | |
Transcribe un archivo de audio y devuelve el texto transcrito. | |
Parameters: | |
:param audio: UploadFile: Archivo de audio a transcribir. | |
Returns: | |
JSONResponse: Respuesta JSON que contiene el texto transcrito. | |
""" | |
# Guardar el archivo de audio en wav con la libreria AudioSegment | |
audio_segment = AudioSegment.from_file(audio.file) | |
audio_segment.export("audio.wav", format="wav") | |
return segment_and_transcribe_audio("audio.wav") | |
# if __name__ == "__main__": | |
# subprocess.run( | |
# ["uvicorn", "app:app", "--host", "localhost", "--port", "7860", "--reload"] | |
# ) | |