File size: 5,517 Bytes
67002a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from pyannote.audio import Pipeline
from pydub import AudioSegment
import whisperx
import torch
import json
import os
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydub.utils import mediainfo
import subprocess
from dotenv import load_dotenv

# cargar el archivo .env
load_dotenv()

# Inicializar la aplicaci贸n
app = FastAPI()

# Configurar CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Carga de token y verificaci贸n de disponibilidad de GPU
hf_token = os.getenv("HF_TOKEN")

device = "cuda" if torch.cuda.is_available() else "cpu"

# Tipo de c贸mputo #"float32" o "float16" si se desea utilizar la aceleraci贸n de la GPU o "float32" si se desea utilizar la CPU.
compute_type = "float32" if device == "cuda" else "float32"

# Transcripci贸n
print("Descargando el modelo...")
model = whisperx.load_model(
    "large-v2", device, language="es", compute_type=compute_type
)
print("隆Modelo descargado con 茅xito!")


def transcribe_wav(audio):
    """
    Transcribe un archivo de audio WAV y devuelve el texto transrito.
    """
    resultado = model.transcribe(audio, language="es", batch_size=28)
    return "\n".join([segment["text"] for segment in resultado["segments"]])


def segment_and_transcribe_audio(audio_entrada: str) -> dict:
    """
    Segmenta y transcribe el archivo de audio de entrada.

    Parameters:
    :param audio_entrada: str: Ruta al archivo de audio de entrada.

    Returns:
    dict: Diccionario que contiene las transcripciones para cada segmento de parlante.
    """
    transcriptions = {}

    if not os.path.exists(audio_entrada):
        raise FileNotFoundError("El archivo de audio no existe.")

    audio_info = mediainfo(audio_entrada)
    duration_s = audio_info.get("duration")
    if duration_s is None:
        raise ValueError("No se pudo obtener la duraci贸n del archivo de audio.")

    duration_s = float(duration_s)
    print(f"Duraci贸n del audio: {duration_s} s")
    if duration_s < 10:
        raise ValueError(
            f"La duraci贸n del audio es demasiado corta para realizar una transcripci贸n. Duraci贸n: {duration_s} s"
        )

    # Cargar el archivo de audio usando whisperx.load_audio
    audio = whisperx.load_audio(audio_entrada)

    # Realizar la transcripci贸n completa del audio
    transcripcion_completa = transcribe_wav(audio)
    transcriptions["full"] = transcripcion_completa

    # Inicializar el pipeline de diarizaci贸n de parlantes
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=hf_token
    )
    pipeline.to(torch.device(device))

    # Realizar la diarizaci贸n de los parlantes en el audio
    with open(audio_entrada, "rb") as f:
        diarization = pipeline(f)

    # Escribir los resultados de la diarizaci贸n en un archivo de texto
    with open("audio.txt", "w") as lab:
        diarization.write_lab(lab)

    # Leer los resultados de la diarizaci贸n desde el archivo de texto
    with open("audio.txt", "r") as file:
        lines = file.readlines()

    # Procesar los resultados de la diarizaci贸n
    data = []
    for line in lines:
        parts = line.strip().split()
        data.append(
            {"start": float(parts[0]), "end": float(parts[1]), "speaker": parts[2]}
        )

    # Guardar los resultados de la diarizaci贸n en un archivo JSON
    with open("audio.json", "w") as file:
        json.dump(data, file, indent=4)

    # Cargar el archivo de audio original usando AudioSegment
    audio_segment = AudioSegment.from_wav(audio_entrada)

    # Leer los resultados de la diarizaci贸n desde el archivo JSON
    with open("audio.json", "r") as file:
        diarization = json.load(file)

    # Segmentar el audio en base a los resultados de la diarizaci贸n
    segments = {}
    for segment in diarization:
        speaker = segment["speaker"]
        if speaker not in segments:
            segments[speaker] = []
        start = int(segment["start"] * 1000)
        end = int(segment["end"] * 1000)
        segments[speaker].append(audio_segment[start:end])

    # Transcribir cada segmento de parlante y guardar los resultados
    for speaker, segs in segments.items():
        result = sum(segs, AudioSegment.empty())
        result.export(f"{speaker}.wav", format="wav")
        transcriptions[speaker] = transcribe_wav(f"{speaker}.wav")

    # Eliminar los archivos temporales creados
    os.remove("audio.txt")
    os.remove(audio_entrada)
    os.remove("audio.json")
    for speaker in segments.keys():
        os.remove(f"{speaker}.wav")

    return transcriptions


@app.get("/")
def read_root():
    return {"message": "隆Bienvenido a la API de transcripci贸n de audio!"}


@app.post("/transcribe")
def transcribe_audio(audio: UploadFile = File(...)):
    """
    Transcribe un archivo de audio y devuelve el texto transcrito.

    Parameters:
    :param audio: UploadFile: Archivo de audio a transcribir.

    Returns:
    JSONResponse: Respuesta JSON que contiene el texto transcrito.
    """
    # Guardar el archivo de audio en wav con la libreria AudioSegment
    audio_segment = AudioSegment.from_file(audio.file)
    audio_segment.export("audio.wav", format="wav")
    return segment_and_transcribe_audio("audio.wav")


# if __name__ == "__main__":

#     subprocess.run(
#         ["uvicorn", "app:app", "--host", "localhost", "--port", "7860", "--reload"]
#     )