import streamlit as st from faster_whisper import WhisperModel from pyannote.audio import Pipeline import pyannote.core from collections import defaultdict # Función para asignar hablantes a segmentos de transcripción def assign_speakers_to_segments(diarization, transcription_segments): speaker_segments = [] # Convertir diarización en un diccionario con los tiempos de inicio y fin de cada hablante diarization_dict = defaultdict(list) for segment, _, speaker in diarization.itertracks(yield_label=True): diarization_dict[speaker].append((segment.start, segment.end)) for transcription_segment in transcription_segments: start, end = transcription_segment.start, transcription_segment.end speakers_count = defaultdict(float) # Contar la duración de cada hablante dentro del segmento de transcripción for speaker, times in diarization_dict.items(): for seg_start, seg_end in times: # Calcular la intersección del tiempo de hablante con el segmento de transcripción overlap_start = max(start, seg_start) overlap_end = min(end, seg_end) overlap_duration = max(0, overlap_end - overlap_start) speakers_count[speaker] += overlap_duration # Elegir el hablante con la mayor duración total en el segmento if speakers_count: speaker = max(speakers_count, key=speakers_count.get) else: speaker = "Unknown" # Añadir el texto del segmento de transcripción y el hablante correspondiente speaker_segments.append((speaker, transcription_segment.text)) return speaker_segments # Función principal de la aplicación Streamlit def main(): st.title("Aplicación de Diarización y Transcripción de Audio") # Opción para cargar archivo de audio audio_file = st.file_uploader("Cargar archivo de audio", type=['wav']) if audio_file is not None: # Guardar el archivo de audio cargado with open("audio_temp.wav", "wb") as f: f.write(audio_file.getbuffer()) # Cargar y ejecutar los modelos speaker_segments = process_audio("audio_temp.wav") # Mostrar los resultados for speaker, text in speaker_segments: st.write(f"Speaker {speaker}: {text}") # Función para procesar el audio def process_audio(audio_path): # Cargar el modelo de diarización pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization-3.1", use_auth_token=st.secrets["hfapi"] ) # Cargar el modelo de transcripción model = WhisperModel("large-v3", device="cpu", compute_type="int8") # Ejecutar la diarización y la transcripción diarization = pipeline(audio_path) segments, info = model.transcribe(audio_path) # Asignar hablantes a segmentos de transcripción return assign_speakers_to_segments(diarization, segments) if __name__ == "__main__": main()