wav2text / app.py
asdrolf
ok
3f3ba6a
import streamlit as st
from audio_recorder_streamlit import audio_recorder
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
import pyannote.core
from collections import defaultdict
# Función para asignar hablantes a segmentos de transcripción
def assign_speakers_to_segments(diarization, transcription_segments):
speaker_segments = []
# Convertir diarización en un diccionario con los tiempos de inicio y fin de cada hablante
diarization_dict = defaultdict(list)
for segment, _, speaker in diarization.itertracks(yield_label=True):
diarization_dict[speaker].append((segment.start, segment.end))
for transcription_segment in transcription_segments:
start, end = transcription_segment.start, transcription_segment.end
speakers_count = defaultdict(float)
# Contar la duración de cada hablante dentro del segmento de transcripción
for speaker, times in diarization_dict.items():
for seg_start, seg_end in times:
# Calcular la intersección del tiempo de hablante con el segmento de transcripción
overlap_start = max(start, seg_start)
overlap_end = min(end, seg_end)
overlap_duration = max(0, overlap_end - overlap_start)
speakers_count[speaker] += overlap_duration
# Elegir el hablante con la mayor duración total en el segmento
if speakers_count:
speaker = max(speakers_count, key=speakers_count.get)
else:
speaker = "Unknown"
# Añadir el texto del segmento de transcripción y el hablante correspondiente
speaker_segments.append((speaker, transcription_segment.text))
return speaker_segments
# Función principal de la aplicación Streamlit
def main():
logo_url = "https://wallnergroup.com/wp-content/uploads/2023/06/logo-wallner-2048x270.png"
st.image(logo_url)
st.markdown("""
<style>
.titulo {
font-size:24px !important;
font-weight: bold;
text-align: center;
margin: auto;
}
.bywall {
font-size:8px !important;
font-weight: bold;
text-align: center;
margin: auto; }
</style>
<div class='titulo'>
Identificación de Participantes y Transcripción de Reuniones.
</div>
<div class='bywall'>
by Wallner Group
""", unsafe_allow_html=True)
audio_ready = False # Variable para indicar si el audio está listo para ser procesado
# Widget de grabación de audio
audio_bytes = audio_recorder(pause_threshold=10.0, sample_rate=16_000, text="Click para grabar")
if audio_bytes:
# Guardar el archivo de audio grabado
with open("audio_temp.wav", "wb") as f:
f.write(audio_bytes)
audio_ready = True
# Opción para cargar archivo de audio
audio_file = st.file_uploader("Alternativamente, puedes cargar un archivo de audio", type=['wav'])
if audio_file is not None:
# Guardar el archivo de audio cargado
with open("audio_temp.wav", "wb") as f:
f.write(audio_file.getbuffer())
audio_ready = True
# Procesar el audio si está listo
if audio_ready:
# Cargar y ejecutar los modelos
speaker_segments = process_audio("audio_temp.wav")
# Mostrar los resultados
for speaker, text in speaker_segments:
st.write(f"{speaker}: {text}")
# Función para procesar el audio
def process_audio(audio_path):
# Cargar el modelo de diarización
pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token=st.secrets["hfapi"]
)
# Cargar el modelo de transcripción
model = WhisperModel("large-v3", device="cpu", compute_type="int8")
# Ejecutar la diarización y la transcripción
diarization = pipeline(audio_path)
segments, info = model.transcribe(audio_path, language="es")
# Asignar hablantes a segmentos de transcripción
return assign_speakers_to_segments(diarization, segments)
if __name__ == "__main__":
main()