Spaces:
Sleeping
Sleeping
File size: 5,517 Bytes
67002a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
from pyannote.audio import Pipeline
from pydub import AudioSegment
import whisperx
import torch
import json
import os
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydub.utils import mediainfo
import subprocess
from dotenv import load_dotenv
# cargar el archivo .env
load_dotenv()
# Inicializar la aplicaci贸n
app = FastAPI()
# Configurar CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Carga de token y verificaci贸n de disponibilidad de GPU
hf_token = os.getenv("HF_TOKEN")
device = "cuda" if torch.cuda.is_available() else "cpu"
# Tipo de c贸mputo #"float32" o "float16" si se desea utilizar la aceleraci贸n de la GPU o "float32" si se desea utilizar la CPU.
compute_type = "float32" if device == "cuda" else "float32"
# Transcripci贸n
print("Descargando el modelo...")
model = whisperx.load_model(
"large-v2", device, language="es", compute_type=compute_type
)
print("隆Modelo descargado con 茅xito!")
def transcribe_wav(audio):
"""
Transcribe un archivo de audio WAV y devuelve el texto transrito.
"""
resultado = model.transcribe(audio, language="es", batch_size=28)
return "\n".join([segment["text"] for segment in resultado["segments"]])
def segment_and_transcribe_audio(audio_entrada: str) -> dict:
"""
Segmenta y transcribe el archivo de audio de entrada.
Parameters:
:param audio_entrada: str: Ruta al archivo de audio de entrada.
Returns:
dict: Diccionario que contiene las transcripciones para cada segmento de parlante.
"""
transcriptions = {}
if not os.path.exists(audio_entrada):
raise FileNotFoundError("El archivo de audio no existe.")
audio_info = mediainfo(audio_entrada)
duration_s = audio_info.get("duration")
if duration_s is None:
raise ValueError("No se pudo obtener la duraci贸n del archivo de audio.")
duration_s = float(duration_s)
print(f"Duraci贸n del audio: {duration_s} s")
if duration_s < 10:
raise ValueError(
f"La duraci贸n del audio es demasiado corta para realizar una transcripci贸n. Duraci贸n: {duration_s} s"
)
# Cargar el archivo de audio usando whisperx.load_audio
audio = whisperx.load_audio(audio_entrada)
# Realizar la transcripci贸n completa del audio
transcripcion_completa = transcribe_wav(audio)
transcriptions["full"] = transcripcion_completa
# Inicializar el pipeline de diarizaci贸n de parlantes
pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1", use_auth_token=hf_token
)
pipeline.to(torch.device(device))
# Realizar la diarizaci贸n de los parlantes en el audio
with open(audio_entrada, "rb") as f:
diarization = pipeline(f)
# Escribir los resultados de la diarizaci贸n en un archivo de texto
with open("audio.txt", "w") as lab:
diarization.write_lab(lab)
# Leer los resultados de la diarizaci贸n desde el archivo de texto
with open("audio.txt", "r") as file:
lines = file.readlines()
# Procesar los resultados de la diarizaci贸n
data = []
for line in lines:
parts = line.strip().split()
data.append(
{"start": float(parts[0]), "end": float(parts[1]), "speaker": parts[2]}
)
# Guardar los resultados de la diarizaci贸n en un archivo JSON
with open("audio.json", "w") as file:
json.dump(data, file, indent=4)
# Cargar el archivo de audio original usando AudioSegment
audio_segment = AudioSegment.from_wav(audio_entrada)
# Leer los resultados de la diarizaci贸n desde el archivo JSON
with open("audio.json", "r") as file:
diarization = json.load(file)
# Segmentar el audio en base a los resultados de la diarizaci贸n
segments = {}
for segment in diarization:
speaker = segment["speaker"]
if speaker not in segments:
segments[speaker] = []
start = int(segment["start"] * 1000)
end = int(segment["end"] * 1000)
segments[speaker].append(audio_segment[start:end])
# Transcribir cada segmento de parlante y guardar los resultados
for speaker, segs in segments.items():
result = sum(segs, AudioSegment.empty())
result.export(f"{speaker}.wav", format="wav")
transcriptions[speaker] = transcribe_wav(f"{speaker}.wav")
# Eliminar los archivos temporales creados
os.remove("audio.txt")
os.remove(audio_entrada)
os.remove("audio.json")
for speaker in segments.keys():
os.remove(f"{speaker}.wav")
return transcriptions
@app.get("/")
def read_root():
return {"message": "隆Bienvenido a la API de transcripci贸n de audio!"}
@app.post("/transcribe")
def transcribe_audio(audio: UploadFile = File(...)):
"""
Transcribe un archivo de audio y devuelve el texto transcrito.
Parameters:
:param audio: UploadFile: Archivo de audio a transcribir.
Returns:
JSONResponse: Respuesta JSON que contiene el texto transcrito.
"""
# Guardar el archivo de audio en wav con la libreria AudioSegment
audio_segment = AudioSegment.from_file(audio.file)
audio_segment.export("audio.wav", format="wav")
return segment_and_transcribe_audio("audio.wav")
# if __name__ == "__main__":
# subprocess.run(
# ["uvicorn", "app:app", "--host", "localhost", "--port", "7860", "--reload"]
# )
|