import whisper import datetime import subprocess import wave import contextlib import torch import pyannote.audio from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding from pyannote.audio import Audio from pyannote.core import Segment from sklearn.cluster import AgglomerativeClustering import numpy as np model = whisper.load_model("large-v2") embedding_model = PretrainedSpeakerEmbedding( "speechbrain/spkrec-ecapa-voxceleb", device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') ) def transcribe(audio, num_speakers): print(type(audio)) path, error = convert_to_wav(audio) if error is not None: return error duration = get_duration(path) if duration > 4 * 60 * 60: return "Audio duration too long" result = model.transcribe(path) segments = result["segments"] num_speakers = min(max(round(num_speakers), 1), len(segments)) if len(segments) == 1: segments[0]['speaker'] = 'SPEAKER 1' else: embeddings = make_embeddings(path, segments, duration) add_speaker_labels(segments, embeddings, num_speakers) output = get_output(segments) return output def convert_to_wav(path): if path[-3:] != 'wav': new_path = '.'.join(path.split('.')[:-1]) + '.wav' try: subprocess.call(['ffmpeg', '-i', path, new_path, '-y']) except: return path, 'Error: Could not convert file to .wav' path = new_path return path, None def get_duration(path): with contextlib.closing(wave.open(path,'r')) as f: frames = f.getnframes() rate = f.getframerate() return frames / float(rate) def make_embeddings(path, segments, duration): embeddings = np.zeros(shape=(len(segments), 192)) for i, segment in enumerate(segments): embeddings[i] = segment_embedding(path, segment, duration) return np.nan_to_num(embeddings) audio = Audio() def segment_embedding(path, segment, duration): start = segment["start"] # Whisper overshoots the end timestamp in the last segment end = min(duration, segment["end"]) clip = Segment(start, end) waveform, sample_rate = audio.crop(path, clip) return embedding_model(waveform[None]) def add_speaker_labels(segments, embeddings, num_speakers): """Add speaker labels""" clustering = AgglomerativeClustering(num_speakers).fit(embeddings) labels = clustering.labels_ for i in range(len(segments)): segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1) def time(secs): """Function to return time delta""" return datetime.timedelta(seconds=round(secs)) def get_output(segments): """Format and generate the output string""" output = '' for (i, segment) in enumerate(segments): if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]: if i != 0: output += '\n\n' output += segment["speaker"] + ' ' + str(time(segment["start"])) + '\n' output += segment["text"][1:] + ' ' return output