import streamlit as st from audio_recorder_streamlit import audio_recorder from faster_whisper import WhisperModel from pyannote.audio import Pipeline import pyannote.core from collections import defaultdict # Función para asignar hablantes a segmentos de transcripción def assign_speakers_to_segments(diarization, transcription_segments): speaker_segments = [] # Convertir diarización en un diccionario con los tiempos de inicio y fin de cada hablante diarization_dict = defaultdict(list) for segment, _, speaker in diarization.itertracks(yield_label=True): diarization_dict[speaker].append((segment.start, segment.end)) for transcription_segment in transcription_segments: start, end = transcription_segment.start, transcription_segment.end speakers_count = defaultdict(float) # Contar la duración de cada hablante dentro del segmento de transcripción for speaker, times in diarization_dict.items(): for seg_start, seg_end in times: # Calcular la intersección del tiempo de hablante con el segmento de transcripción overlap_start = max(start, seg_start) overlap_end = min(end, seg_end) overlap_duration = max(0, overlap_end - overlap_start) speakers_count[speaker] += overlap_duration # Elegir el hablante con la mayor duración total en el segmento if speakers_count: speaker = max(speakers_count, key=speakers_count.get) else: speaker = "Unknown" # Añadir el texto del segmento de transcripción y el hablante correspondiente speaker_segments.append((speaker, transcription_segment.text)) return speaker_segments # Función principal de la aplicación Streamlit def main(): logo_url = "https://wallnergroup.com/wp-content/uploads/2023/06/logo-wallner-2048x270.png" st.image(logo_url) st.markdown("""
Identificación de Participantes y Transcripción de Reuniones.
by Wallner Group """, unsafe_allow_html=True) audio_ready = False # Variable para indicar si el audio está listo para ser procesado # Widget de grabación de audio audio_bytes = audio_recorder(pause_threshold=10.0, sample_rate=16_000, text="Click para grabar") if audio_bytes: # Guardar el archivo de audio grabado with open("audio_temp.wav", "wb") as f: f.write(audio_bytes) audio_ready = True # Opción para cargar archivo de audio audio_file = st.file_uploader("Alternativamente, puedes cargar un archivo de audio", type=['wav']) if audio_file is not None: # Guardar el archivo de audio cargado with open("audio_temp.wav", "wb") as f: f.write(audio_file.getbuffer()) audio_ready = True # Procesar el audio si está listo if audio_ready: # Cargar y ejecutar los modelos speaker_segments = process_audio("audio_temp.wav") # Mostrar los resultados for speaker, text in speaker_segments: st.write(f"{speaker}: {text}") # Función para procesar el audio def process_audio(audio_path): # Cargar el modelo de diarización pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization-3.1", use_auth_token=st.secrets["hfapi"] ) # Cargar el modelo de transcripción model = WhisperModel("large-v3", device="cpu", compute_type="int8") # Ejecutar la diarización y la transcripción diarization = pipeline(audio_path) segments, info = model.transcribe(audio_path, language="es") # Asignar hablantes a segmentos de transcripción return assign_speakers_to_segments(diarization, segments) if __name__ == "__main__": main()