"""Gradio app that transcribes audio files with Whisper and labels speakers.

Each uploaded file is converted to WAV (when needed), transcribed with
Whisper, and every transcript segment is assigned a speaker label by
clustering per-segment speaker embeddings.
"""

import contextlib
import datetime
import os
import subprocess
import wave

import whisper
import gradio as gr
import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio
from pyannote.core import Segment
from sklearn.cluster import AgglomerativeClustering
import numpy as np

# Name of the text file written by bulk_transcribe and handed back to Gradio.
OUTPUT_FILENAME = 'Transcripción.txt'

# Audio longer than this many seconds is rejected by transcribe (4 hours).
MAX_DURATION_SECONDS = 4 * 60 * 60

# Width of one row of the embedding matrix built in make_embeddings.
EMBEDDING_DIM = 192

model = whisper.load_model("large-v2")

embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)


def bulk_transcribe(files):
    """Transcribe every uploaded file and save the combined text.

    Args:
        files: Iterable of uploaded files as provided by the Gradio ``File``
            input. Each item is either an object exposing the path as
            ``.name`` or a plain path string. ``None`` (nothing uploaded)
            is treated as an empty list.

    Returns:
        A ``(filename, text)`` tuple: the path of the written transcript
        file and the transcript text itself.
    """
    sections = []
    for number, item in enumerate(files or [], start=1):
        # Accept both file wrappers (path in ``.name``) and bare path strings.
        path = getattr(item, 'name', item)
        sections.append('Archivo ' + str(number) + '\n' + transcribe(path) + '\n')
    output = ''.join(sections)

    # Explicit UTF-8: the transcript is Spanish text with accented characters.
    with open(OUTPUT_FILENAME, 'w', encoding='utf-8') as file:
        file.write(output)

    # Return the same name that was written so the download component
    # receives a file that actually exists.
    return OUTPUT_FILENAME, output


def transcribe(audio, num_speakers=3):
    """Transcribe one audio file and label each segment with a speaker.

    Args:
        audio: Path of the audio file.
        num_speakers: Desired number of speakers; clamped to the range
            ``1..len(segments)``.

    Returns:
        The formatted transcript, or an error message string when the file
        cannot be converted to WAV or is longer than the allowed duration.
        An empty string is returned when Whisper yields no segments.
    """
    path, error = convert_to_wav(audio)
    if error is not None:
        return error

    duration = get_duration(path)
    if duration > MAX_DURATION_SECONDS:
        return "La duración del audio es muy larga"

    result = model.transcribe(path)
    segments = result["segments"]

    # Nothing was transcribed: there is nothing to cluster or to print.
    if not segments:
        return ''

    num_speakers = min(max(round(num_speakers), 1), len(segments))
    if len(segments) == 1:
        # Clustering needs at least two samples; a single segment has one speaker.
        segments[0]['speaker'] = 'HABLANTE 1'
    else:
        embeddings = make_embeddings(path, segments, duration)
        add_speaker_labels(segments, embeddings, num_speakers)

    return get_output(segments)


def convert_to_wav(path):
    """Convert ``path`` to a ``.wav`` file with ffmpeg unless it already is one.

    Args:
        path: Path of the input audio file.

    Returns:
        A ``(path, error)`` tuple. On success ``path`` is the WAV file and
        ``error`` is ``None``; on failure ``path`` is the original path and
        ``error`` is a message string.
    """
    root, extension = os.path.splitext(path)
    if extension.lower() == '.wav':
        return path, None

    new_path = root + '.wav'
    try:
        # check=True turns a non-zero ffmpeg exit status into an exception;
        # OSError covers ffmpeg not being installed / not executable.
        subprocess.run(['ffmpeg', '-y', '-i', path, new_path], check=True)
    except (OSError, subprocess.CalledProcessError):
        return path, 'Error: No se pudo convertir archivo a .wav'
    return new_path, None


def get_duration(path):
    """Return the duration in seconds of the WAV file at ``path``."""
    with contextlib.closing(wave.open(path, 'r')) as f:
        frames = f.getnframes()
        rate = f.getframerate()
    return frames / float(rate)


def make_embeddings(path, segments, duration):
    """Build the matrix of speaker embeddings, one row per segment.

    Args:
        path: Path of the WAV file.
        segments: Transcript segments with ``"start"`` and ``"end"`` keys.
        duration: Total duration of the audio in seconds.

    Returns:
        Array of shape ``(len(segments), EMBEDDING_DIM)`` with NaNs replaced
        by numbers via ``numpy.nan_to_num``.
    """
    embeddings = np.zeros(shape=(len(segments), EMBEDDING_DIM))
    for i, segment in enumerate(segments):
        embeddings[i] = segment_embedding(path, segment, duration)
    return np.nan_to_num(embeddings)


# Shared audio reader used to crop segments out of the file. Note that the
# ``audio`` parameter of ``transcribe`` is unrelated (it is a file path).
audio = Audio()


def segment_embedding(path, segment, duration):
    """Return the speaker embedding of a single transcript segment.

    Args:
        path: Path of the WAV file.
        segment: Mapping with ``"start"`` and ``"end"`` times in seconds.
        duration: Total duration of the audio in seconds.
    """
    start = segment["start"]
    # Whisper overshoots the end timestamp in the last segment
    end = min(duration, segment["end"])
    clip = Segment(start, end)
    waveform, sample_rate = audio.crop(path, clip)
    # waveform[None] adds a leading axis before calling the model.
    return embedding_model(waveform[None])


def add_speaker_labels(segments, embeddings, num_speakers):
    """Cluster ``embeddings`` and store a speaker label on every segment.

    Each segment gets a ``"speaker"`` key of the form ``'HABLANTE <n>'``,
    where ``n`` is the 1-based cluster index. ``segments`` is modified in
    place.
    """
    clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
    for segment, label in zip(segments, clustering.labels_):
        segment["speaker"] = 'HABLANTE ' + str(label + 1)


def time(secs):
    """Return ``secs`` rounded to whole seconds as a ``datetime.timedelta``."""
    return datetime.timedelta(seconds=round(secs))


def get_output(segments):
    """Format labelled segments as a readable transcript.

    A header line with the speaker label and the segment start time is
    emitted whenever the speaker changes; consecutive segments of the same
    speaker are joined with spaces.
    """
    parts = []
    for i, segment in enumerate(segments):
        if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
            if i != 0:
                parts.append('\n\n')
            parts.append(
                segment["speaker"] + ' ' + str(time(segment["start"])) + '\n\n'
            )
        text = segment["text"]
        # Drop only a leading space; never eat a real first character when
        # the segment text does not start with one.
        if text.startswith(' '):
            text = text[1:]
        parts.append(text + ' ')
    return ''.join(parts)


gr.Interface(
    title='Reconocimiento de hablantes con Whisper en Español',
    fn=bulk_transcribe,
    inputs=gr.File(file_count="multiple", file_types=["audio"]),
    outputs=[gr.File(), gr.Textbox(label='Transcripción')]
).launch()