import streamlit as st

from faster_whisper import WhisperModel

import datetime
import subprocess
from pathlib import Path
import pandas as pd
import re
import time
import os
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score

import torch
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio
from pyannote.core import Segment

import wave
import contextlib
from transformers import pipeline, AutoTokenizer
from huggingface_hub import hf_hub_download
import onnxruntime
import librosa

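# Streamlit app: upload an audio file, transcribe it with faster-whisper,
# attach speaker labels by clustering ECAPA speaker embeddings, and answer
# yes/no questions about the conversation with a BoolQ model served via ONNX.
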
whisper_models = ["tiny", "base", "small", "medium", "large-v1", "large-v2"]
source_languages = {"en": "English"}

MODEL_NAME = "vumichien/whisper-medium-jp"
lang = "en"

# Use the GPU when one is available, otherwise fall back to CPU so the app
# also runs on CPU-only machines.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=device)

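# --- Transcription and speaker-diarization helpers --------------------------
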
def segment_embedding(segment, duration, audio_file):
    # Crop the audio to this segment and return its ECAPA speaker embedding.
    audio = Audio()
    start = segment["start"]
    # The last segment's end can overshoot the actual file duration, so clamp it.
    end = min(duration, segment["end"])
    clip = Segment(start, end)
    waveform, sample_rate = audio.crop(audio_file, clip)
    return embedding_model(waveform[None])


def fast_whisper(audio_file, model):
    # Transcribe with faster-whisper and collect the segments as plain dicts.
    options = dict(language=lang, beam_size=5, best_of=5)
    transcribe_options = dict(task="transcribe", **options)
    segments_raw, info = model.transcribe(audio_file, **transcribe_options)

    segments = []
    for segment_chunk in segments_raw:
        segments.append({
            "start": segment_chunk.start,
            "end": segment_chunk.end,
            "text": segment_chunk.text,
        })
    print("transcribe audio done with fast whisper")

    return segments


def get_embeddings(segments, duration, audio_file):
    # One 192-dimensional ECAPA embedding per transcript segment.
    embeddings = np.zeros(shape=(len(segments), 192))
    for i, segment in enumerate(segments):
        embeddings[i] = segment_embedding(segment, duration, audio_file)
    embeddings = np.nan_to_num(embeddings)

    print("Got embeddings for segments")
    return embeddings


def get_n_speakers(embeddings, num_speakers):
    # If num_speakers == 0, pick the speaker count (2..10) whose agglomerative
    # clustering of the embeddings maximises the silhouette score.
    if num_speakers == 0:
        score_num_speakers = {}

        for n in range(2, 10 + 1):
            clustering = AgglomerativeClustering(n).fit(embeddings)
            score = silhouette_score(embeddings, clustering.labels_, metric='euclidean')
            score_num_speakers[n] = score
        best_num_speaker = max(score_num_speakers, key=score_num_speakers.get)
        print(f"The best number of speakers: {best_num_speaker} with {score_num_speakers[best_num_speaker]} score")
    else:
        best_num_speaker = num_speakers

    print(f"best num speakers is {best_num_speaker}")

    return best_num_speaker


def assign_speaker(best_num_speaker, embeddings, segments):
    # Cluster the segment embeddings and label each segment with its speaker.
    clustering = AgglomerativeClustering(best_num_speaker).fit(embeddings)
    labels = clustering.labels_
    for i in range(len(segments)):
        segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

    print("I know who said what now")
    return segments


def convert_time(secs):
    return datetime.timedelta(seconds=round(secs))


def segments2df(segments):
    # Merge consecutive segments from the same speaker into one
    # (Start, End, Speaker, Text) row.
    objects = {
        'Start': [],
        'End': [],
        'Speaker': [],
        'Text': []
    }
    text = ''
    for (i, segment) in enumerate(segments):
        if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
            objects['Start'].append(str(convert_time(segment["start"])))
            objects['Speaker'].append(segment["speaker"])
            if i != 0:
                objects['End'].append(str(convert_time(segments[i - 1]["end"])))
                objects['Text'].append(text)
                text = ''
        text += segment["text"] + ' '
    # Close the final block with the end time of the last segment.
    objects['End'].append(str(convert_time(segments[-1]["end"])))
    objects['Text'].append(text)

    df_results = pd.DataFrame(objects)

    return df_results


def speech_to_text(audio_file, whisper_model, num_speakers=0):
    # Full pipeline: transcribe, embed, cluster speakers, build the diary.
    if audio_file is None:
        raise ValueError("Error no audio_file")

    model = WhisperModel(whisper_model, compute_type="int8")
    y, sr = librosa.load(audio_file)
    duration = len(y) / sr
    segments = fast_whisper(audio_file, model)
    embeddings = get_embeddings(segments, duration, audio_file)
    best_num_speaker = get_n_speakers(embeddings, num_speakers)
    segments = assign_speaker(best_num_speaker, embeddings, segments)
    diary = segments2df(segments)

    return diary

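# --- Yes/no question answering over the transcript --------------------------
# A RoBERTa BoolQ classifier exported to ONNX, run on CPU with onnxruntime.
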
onnx_path = hf_hub_download(repo_id='UKP-SQuARE/roberta-base-pf-boolq-onnx', filename='model.onnx')
onnx_model = onnxruntime.InferenceSession(onnx_path, providers=['CPUExecutionProvider'])

tokenizer = AutoTokenizer.from_pretrained('UKP-SQuARE/roberta-base-pf-boolq-onnx')

def answer(context, question):
    # Run the BoolQ classifier through onnxruntime. This assumes the exported
    # graph consumes the tokenizer's input_ids / attention_mask and that its
    # first output is the (batch, 2) classification logits.
    encoded = tokenizer(question, context, truncation=True, return_tensors="np")
    # Feed only the tensors the ONNX graph actually declares as inputs.
    onnx_inputs = {
        inp.name: np.asarray(encoded[inp.name], dtype=np.int64)
        for inp in onnx_model.get_inputs()
        if inp.name in encoded
    }
    return onnx_model.run(None, onnx_inputs)

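# --- Streamlit sidebar UI ----------------------------------------------------
# Flow: upload audio -> optionally fix the speaker count -> "Get conversation"
# transcribes and diarizes -> ask a yes/no question about the transcript.
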
uploaded_file = st.sidebar.file_uploader("Choose a file")
num_speakers = st.sidebar.slider("num speakers (0 means auto detect)", 0, 10, 0)
diary = None
question = None
if uploaded_file is not None:
    filename = uploaded_file.name
    with open(filename, "wb") as f:
        f.write(uploaded_file.getbuffer())

    if st.sidebar.checkbox('Get conversation'):
        torch.cuda.empty_cache()
        whisper_model = "base"
        diary = speech_to_text(filename, whisper_model, num_speakers=num_speakers)

        st.dataframe(diary)

        question = st.sidebar.text_input('Question', 'Can she answer')
        if st.sidebar.button('Answer'):
            diary["text_all"] = diary["Speaker"] + ": " + diary["Text"]
            context = " \n ".join(diary["text_all"].to_list())
            outputs = answer(context, question)

            # First output, first batch item: the two BoolQ logits. The yes/no
            # mapping below keeps the original ordering assumption.
            outputs = outputs[0][0]
            if outputs[0] > outputs[1]: st.sidebar.write("Answer is Yes")
            if outputs[0] < outputs[1]: st.sidebar.write("Answer is No")
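# To try the app locally (assuming this script is saved as app.py):
#   streamlit run app.py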
|