#################################################################################################
# Taking code from https://huggingface.co/spaces/vumichien/Whisper_speaker_diarization/blob/main/app.py
from faster_whisper import WhisperModel
#import datetime
#import subprocess
import gradio as gr
from pathlib import Path
import pandas as pd
#import re
import time
import os
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
#from pytube import YouTube
#import yt_dlp
import torch
#import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio
from pyannote.core import Segment
from gpuinfo import GPUInfo
import wave
import contextlib
from transformers import pipeline
import psutil
import whisperx
import gc
def doWhisperX(audio_file, whisper_model="large-v2", language="es"):
    # "Cualquiera" ("Any" in the UI dropdown) means: let Whisper detect the language.
    if language == "Cualquiera":
        language = None
    device = "cuda" if torch.cuda.is_available() else "cpu"
    #audio_file = "audio.mp3"
    batch_size = 16  # reduce if low on GPU mem
    # "float16" only makes sense on GPU; fall back to "int8" on CPU (may reduce accuracy)
    compute_type = "float16" if device == "cuda" else "int8"

    # 1. Transcribe with original whisper (batched)
    model = whisperx.load_model(whisper_model, device, compute_type=compute_type)
    audio = whisperx.load_audio(audio_file)
    result_whisper = model.transcribe(audio, language=language, batch_size=batch_size)
    #print(result_whisper["segments"])  # before alignment
    # delete model if low on GPU resources
    # import gc; gc.collect(); torch.cuda.empty_cache(); del model

    # 2. Align whisper output
    model_a, metadata = whisperx.load_align_model(language_code=result_whisper["language"], device=device)
    result_aligned = whisperx.align(result_whisper["segments"], model_a, metadata, audio, device, return_char_alignments=False)
    #print(result_aligned)  # after alignment
    # delete model if low on GPU resources
    # import gc; gc.collect(); torch.cuda.empty_cache(); del model_a

    # 3. Assign speaker labels
    diarize_model = whisperx.DiarizationPipeline(use_auth_token=os.environ['HF_TOKEN'], device=device)
    # add min/max number of speakers if known
    diarize_segments = diarize_model(audio)
    # diarize_model(audio, min_speakers=min_speakers, max_speakers=max_speakers)
    result_speakers = whisperx.assign_word_speakers(diarize_segments, result_aligned)
    #print(diarize_segments)
    #print(result_speakers["segments"])  # segments are now assigned speaker IDs
    return result_whisper, result_aligned, result_speakers, diarize_segments
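
# Example call (a minimal sketch, not part of the app flow; assumes a local file
# "sample.wav" and an HF_TOKEN env var so the pyannote diarization models can load):
# raw, aligned, with_speakers, diarization = doWhisperX("sample.wav", whisper_model="large-v2", language="es")
# for seg in with_speakers["segments"]:
#     print(seg.get("speaker", "UNKNOWN"), f'{seg["start"]:.2f}-{seg["end"]:.2f}', seg["text"])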

embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))
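
# Quick shape check (a hedged sketch, not executed by the app): the SpeechBrain ECAPA model
# returns one 192-dimensional vector per input clip, which is why the embeddings buffer in
# speech_to_text() below is allocated with width 192.
# dummy = torch.randn(1, 1, 16000)     # (batch, channel, samples) ~ 1 s of 16 kHz audio
# emb = embedding_model(dummy)         # -> numpy array of shape (1, 192)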

def fast_transcription(audio_file, whisper_model, language):
    """
    Transcribe a local audio file with faster-whisper (no diarization).

    Returns a dict in the original OpenAI Whisper format (``segments``, ``text``,
    ``language``) plus the extra fields provided by faster-whisper
    (word timestamps, ``language_probability``, ``duration``).

    Speech recognition is based on models from OpenAI Whisper, https://github.com/openai/whisper,
    served through faster-whisper.
    """
    # model = whisper.load_model(whisper_model)
    # model = WhisperModel(whisper_model, device="cuda", compute_type="int8_float16")
    model = WhisperModel(whisper_model, compute_type="int8")
    time_start = time.time()
    try:
        # Get duration (assumes a PCM WAV file)
        with contextlib.closing(wave.open(audio_file, 'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            duration = frames / float(rate)
        print(f"duration of audio file: {duration:.2f}s")

        # Transcribe audio
        options = dict(language=language, beam_size=5, best_of=5, word_timestamps=True)
        transcribe_options = dict(task="transcribe", **options)
        segments_generator, info = model.transcribe(audio_file, **transcribe_options)

        # faster-whisper returns a lazy generator; materialize it before post-processing
        segments = []
        for segment in segments_generator:
            segments.append(segment)
            # if progress_listener is not None:
            #     progress_listener.on_progress(segment.end, info.duration)

        text = " ".join([segment.text for segment in segments])

        # Convert the segments to a format that is easier to serialize
        whisper_segments = [{
            "text": segment.text,
            "start": segment.start,
            "end": segment.end,
            # Extra fields added by faster-whisper
            "words": [{
                "start": word.start,
                "end": word.end,
                "word": word.word,
                "probability": word.probability
            } for word in (segment.words if segment.words is not None else [])]
        } for segment in segments]

        result = {
            "segments": whisper_segments,
            "text": text,
            "language": info.language if info else None,
            # Extra fields added by faster-whisper
            "language_probability": info.language_probability if info else None,
            "duration": info.duration if info else None
        }
    except Exception as e:
        raise RuntimeError("Error transcribing audio file") from e

    return result
    #return [str(s["start"]) + " " + s["text"] for s in segments]  #pd.DataFrame(segments)
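
# Example call (a sketch, not wired into the app here; assumes a PCM WAV file, since the
# duration probe above uses the wave module):
# res = fast_transcription("sample.wav", whisper_model="base", language="es")
# print(res["language"], res["language_probability"], res["duration"])
# for seg in res["segments"]:
#     print(f'{seg["start"]:.2f}-{seg["end"]:.2f}', seg["text"])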

import datetime

def convert_time(secs):
    return datetime.timedelta(seconds=round(secs))
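
# e.g. convert_time(3661.4) -> datetime.timedelta(seconds=3661), which formats as "1:01:01"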

def speech_to_text(audio_file, selected_source_lang, whisper_model, num_speakers):
    """
    Transcribe a local audio file and attach speaker labels.

    1. Use faster-whisper to split the audio into segments and generate transcripts.
    2. Generate a speaker embedding for each segment.
    3. Apply agglomerative clustering on the embeddings to identify the speaker of each segment
       (if num_speakers == 0, the number of speakers is chosen via the silhouette score).

    Speech recognition is based on models from OpenAI Whisper, https://github.com/openai/whisper.
    Speaker embeddings come from pyannote.audio / SpeechBrain, https://github.com/pyannote/pyannote-audio.
    """
    # model = whisper.load_model(whisper_model)
    # model = WhisperModel(whisper_model, device="cuda", compute_type="int8_float16")
    model = WhisperModel(whisper_model, compute_type="int8")
    time_start = time.time()
    try:
        # # Read and convert youtube video
        # _, file_ending = os.path.splitext(f'{video_file_path}')
        # print(f'file ending is {file_ending}')
        # audio_file = video_file_path.replace(file_ending, ".wav")
        # print("starting conversion to wav")
        # os.system(f'ffmpeg -i "{video_file_path}" -ar 16000 -ac 1 -c:a pcm_s16le "{audio_file}"')

        # Get duration (assumes a PCM WAV file)
        with contextlib.closing(wave.open(audio_file, 'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            duration = frames / float(rate)
        print(f"duration of audio file: {duration:.2f}s")

        # Transcribe audio
        options = dict(language=selected_source_lang, beam_size=5, best_of=5)
        transcribe_options = dict(task="transcribe", **options)
        segments_raw, info = model.transcribe(audio_file, **transcribe_options)

        # Convert back to the original openai-whisper segment format
        segments = []
        for segment_chunk in segments_raw:
            chunk = {}
            chunk["start"] = segment_chunk.start
            chunk["end"] = segment_chunk.end
            chunk["text"] = segment_chunk.text
            segments.append(chunk)
        print("transcribe audio done with fast whisper")
    except Exception as e:
        raise RuntimeError("Error transcribing audio file") from e
    try:
        # Create embedding
        def segment_embedding(segment):
            audio = Audio()
            start = segment["start"]
            # Whisper overshoots the end timestamp in the last segment
            end = min(duration, segment["end"])
            clip = Segment(start, end)
            waveform, sample_rate = audio.crop(audio_file, clip)
            return embedding_model(waveform[None])

        embeddings = np.zeros(shape=(len(segments), 192))
        for i, segment in enumerate(segments):
            embeddings[i] = segment_embedding(segment)
        embeddings = np.nan_to_num(embeddings)
        print(f'Embedding shape: {embeddings.shape}')

        if num_speakers == 0:
            # Find the best number of speakers via the silhouette score
            score_num_speakers = {}
            # silhouette_score needs at least 2 clusters and at most len(segments) - 1
            for n_speakers in range(2, min(10, len(segments) - 1) + 1):
                clustering = AgglomerativeClustering(n_speakers).fit(embeddings)
                score = silhouette_score(embeddings, clustering.labels_, metric='euclidean')
                score_num_speakers[n_speakers] = score
            best_num_speaker = max(score_num_speakers, key=lambda x: score_num_speakers[x])
            print(f"The best number of speakers: {best_num_speaker} with a score of {score_num_speakers[best_num_speaker]}")
        else:
            best_num_speaker = num_speakers

        # Assign speaker labels
        clustering = AgglomerativeClustering(best_num_speaker).fit(embeddings)
        labels = clustering.labels_
        for i in range(len(segments)):
            segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

        # Make output: merge consecutive segments of the same speaker
        objects = {
            'Start': [],
            'End': [],
            'Speaker': [],
            'Text': []
        }
        text = ''
        for (i, segment) in enumerate(segments):
            if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
                objects['Start'].append(str(convert_time(segment["start"])))
                objects['Speaker'].append(segment["speaker"])
                if i != 0:
                    objects['End'].append(str(convert_time(segments[i - 1]["end"])))
                    objects['Text'].append(text)
                    text = ''
            text += segment["text"] + ' '
        objects['End'].append(str(convert_time(segments[-1]["end"])))
        objects['Text'].append(text)

        time_end = time.time()
        time_diff = time_end - time_start
        memory = psutil.virtual_memory()
        gpu_utilization, gpu_memory = GPUInfo.gpu_usage()
        gpu_utilization = gpu_utilization[0] if len(gpu_utilization) > 0 else 0
        gpu_memory = gpu_memory[0] if len(gpu_memory) > 0 else 0
        system_info = f"""
        *Memory: {memory.total / (1024 * 1024 * 1024):.2f}GB, used: {memory.percent}%, available: {memory.available / (1024 * 1024 * 1024):.2f}GB.*
        *Processing time: {time_diff:.5} seconds.*
        *GPU Utilization: {gpu_utilization}%, GPU Memory: {gpu_memory}MiB.*
        """

        save_path = "transcript_result.csv"
        df_results = pd.DataFrame(objects)
        df_results.to_csv(save_path)
        return df_results, system_info, save_path

    except Exception as e:
        raise RuntimeError("Error running inference with local model") from e

# #######################################################################
# def fast_whisper(audio_file, whisper_model="large_v2", language="es"):
#     return out
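
# Minimal Gradio wiring sketch (hypothetical; the original Space's UI code is not included in
# this excerpt, so the component choices and labels here are assumptions):
# demo = gr.Interface(
#     fn=speech_to_text,
#     inputs=[
#         gr.Audio(type="filepath", label="Audio (WAV)"),
#         gr.Dropdown(["es", "en"], value="es", label="Language"),
#         gr.Dropdown(["base", "small", "medium", "large-v2"], value="base", label="Whisper model"),
#         gr.Number(value=0, precision=0, label="Number of speakers (0 = auto)"),
#     ],
#     outputs=[gr.Dataframe(label="Transcript"), gr.Markdown(label="System info"), gr.File(label="CSV")],
# )
# demo.launch()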