wav2text / app.py
asdrolf
ok
7f097eb
raw
history blame
3.03 kB
import streamlit as st
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
import pyannote.core
from collections import defaultdict
# Función para asignar hablantes a segmentos de transcripción
def assign_speakers_to_segments(diarization, transcription_segments):
speaker_segments = []
# Convertir diarización en un diccionario con los tiempos de inicio y fin de cada hablante
diarization_dict = defaultdict(list)
for segment, _, speaker in diarization.itertracks(yield_label=True):
diarization_dict[speaker].append((segment.start, segment.end))
for transcription_segment in transcription_segments:
start, end = transcription_segment.start, transcription_segment.end
speakers_count = defaultdict(float)
# Contar la duración de cada hablante dentro del segmento de transcripción
for speaker, times in diarization_dict.items():
for seg_start, seg_end in times:
# Calcular la intersección del tiempo de hablante con el segmento de transcripción
overlap_start = max(start, seg_start)
overlap_end = min(end, seg_end)
overlap_duration = max(0, overlap_end - overlap_start)
speakers_count[speaker] += overlap_duration
# Elegir el hablante con la mayor duración total en el segmento
if speakers_count:
speaker = max(speakers_count, key=speakers_count.get)
else:
speaker = "Unknown"
# Añadir el texto del segmento de transcripción y el hablante correspondiente
speaker_segments.append((speaker, transcription_segment.text))
return speaker_segments
# Función principal de la aplicación Streamlit
def main():
st.title("Identificacion de participantes y transcipcion de reuniones.")
# Opción para cargar archivo de audio
audio_file = st.file_uploader("Cargar archivo de audio", type=['wav'])
if audio_file is not None:
# Guardar el archivo de audio cargado
with open("audio_temp.wav", "wb") as f:
f.write(audio_file.getbuffer())
# Cargar y ejecutar los modelos
speaker_segments = process_audio("audio_temp.wav")
# Mostrar los resultados
for speaker, text in speaker_segments:
st.write(f"Speaker {speaker}: {text}")
# Función para procesar el audio
def process_audio(audio_path):
# Cargar el modelo de diarización
pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token=st.secrets["hfapi"]
)
# Cargar el modelo de transcripción
model = WhisperModel("large-v3", device="cpu", compute_type="int8")
# Ejecutar la diarización y la transcripción
diarization = pipeline(audio_path)
segments, info = model.transcribe(audio_path, language="es")
# Asignar hablantes a segmentos de transcripción
return assign_speakers_to_segments(diarization, segments)
if __name__ == "__main__":
main()