from dataclasses import dataclass

import torch
import librosa
import numpy as np
import os
import scipy.stats as stats

# Environment flags must be set before transformers is imported below.
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'
os.environ['MODEL_IS_LOADED'] = '0'
# os.environ['PHONEMIZER_ESPEAK_LIBRARY'] = r"C:\Program Files\eSpeak NG\libespeak-ng.dll"
os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = '1'
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'

from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
from optimum.bettertransformer import BetterTransformer

torch.random.manual_seed(0)

def load_model(device='cpu'):
    """Load the phoneme-level CTC model and its processor for forced alignment."""
    model_name = "facebook/wav2vec2-lv-60-espeak-cv-ft"
    processor = Wav2Vec2Processor.from_pretrained(model_name, phone_delimiter_token=' ', word_delimiter_token=' ')
    model = Wav2Vec2ForCTC.from_pretrained(model_name).to(device).eval()
    model = BetterTransformer.transform(model)
    return processor, model

# Load the shared processor/model once at import time.
processor, model = load_model(device='cpu')

@dataclass
class Point:
    """A single (token, frame) step on the alignment path."""
    token_index: int
    time_index: int
    score: float

@dataclass
class Segment:
    """A run of identical labels on the path, merged into one span."""
    label: str
    start: int
    end: int
    score: float

    def __repr__(self):
        return f"{self.label}\t({self.score:4.2f}): [{self.start:5d}, {self.end:5d}]"

    def __len__(self):
        return self.end - self.start

def get_trellis(emission, tokens, blank_id=0):
    """Build the CTC trellis: trellis[t, j] is the best cumulative
    log-probability of emitting the first j tokens within the first t frames."""
    num_frame = emission.size(0)
    num_tokens = len(tokens)

    trellis = torch.zeros((num_frame, num_tokens))
    trellis[1:, 0] = torch.cumsum(emission[1:, blank_id], 0)
    trellis[0, 1:] = -float("inf")
    trellis[-num_tokens + 1:, 0] = float("inf")

    for t in range(num_frame - 1):
        trellis[t + 1, 1:] = torch.maximum(
            # Score for staying at the same token
            trellis[t, 1:] + emission[t, blank_id],
            # Score for changing to the next token
            trellis[t, :-1] + emission[t, tokens[1:]],
        )
    return trellis
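
# A quick shape sanity-check for get_trellis (toy random emissions and made-up
# token ids, not real model output); runs only when this file is executed directly.
if __name__ == '__main__':
    _emission = torch.log_softmax(torch.randn(50, 32), dim=-1)  # (frames, vocab)
    _trellis = get_trellis(_emission, tokens=[5, 9, 9, 12])
    assert _trellis.shape == (50, 4)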

def backtrack(trellis, emission, tokens, blank_id=0):
    """Walk the trellis backwards from the last frame/token to recover the path."""
    t, j = trellis.size(0) - 1, trellis.size(1) - 1

    alignment_path = [Point(j, t, emission[t, blank_id].exp().item())]
    while j > 0:
        # Should not happen, but just in case.
        assert t > 0

        # 1. Figure out if the current position was stay or change.
        # Frame-wise score of stay vs change
        p_stay = emission[t - 1, blank_id]
        p_change = emission[t - 1, tokens[j]]

        # Context-aware score for stay vs change
        stayed = trellis[t - 1, j] + p_stay
        changed = trellis[t - 1, j - 1] + p_change

        # Update position
        t -= 1
        if changed > stayed:
            j -= 1

        # Store the path point with its frame-wise probability.
        prob = (p_change if changed > stayed else p_stay).exp().item()
        alignment_path.append(Point(j, t, prob))

    # Now j == 0, which means it reached the SoS.
    # Fill up the rest for the sake of visualization.
    while t > 0:
        prob = emission[t - 1, blank_id].exp().item()
        alignment_path.append(Point(j, t - 1, prob))
        t -= 1
    return alignment_path[::-1]
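
# End-to-end toy walk-through of get_trellis + backtrack (random data,
# hypothetical token ids): the recovered path covers every frame, starts at
# token 0, and ends at the final token. Runs only under direct execution.
if __name__ == '__main__':
    _emission = torch.log_softmax(torch.randn(50, 32), dim=-1)
    _tokens = [5, 9, 9, 12]
    _path = backtrack(get_trellis(_emission, _tokens), _emission, _tokens)
    assert len(_path) == 50
    assert _path[0].token_index == 0 and _path[-1].token_index == len(_tokens) - 1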

def merge_repeats(alignment_path, ph):
    """Collapse consecutive path points with the same token into Segment spans."""
    i1, i2 = 0, 0
    segments = []
    while i1 < len(alignment_path):
        while i2 < len(alignment_path) and alignment_path[i1].token_index == alignment_path[i2].token_index:
            i2 += 1
        score = sum(alignment_path[k].score for k in range(i1, i2)) / (i2 - i1)
        segments.append(
            Segment(
                ph[alignment_path[i1].token_index],
                alignment_path[i1].time_index,
                alignment_path[i2 - 1].time_index + 1,
                score,
            )
        )
        i1 = i2
    return segments
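
# Tiny merge example (hand-made path points, illustrative labels only): two
# frames on token 0 collapse into one 2-frame segment, one frame stays alone.
if __name__ == '__main__':
    _pts = [Point(0, 0, 1.0), Point(0, 1, 0.5), Point(1, 2, 0.8)]
    _segs = merge_repeats(_pts, ['a', 'b'])
    assert [s.label for s in _segs] == ['a', 'b'] and len(_segs[0]) == 2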

def load_audio(audio_path, processor):
    """Load an audio file, resample to 16 kHz, and wrap it as model input."""
    audio, sr = librosa.load(audio_path, sr=16000)
    input_values = processor(audio, sampling_rate=16000, return_tensors="pt").input_values
    return input_values

def get_emissions(input_values, model):
    """Run the model and return per-frame log-probabilities over the phoneme vocab."""
    with torch.inference_mode():
        emissions = model(input_values).logits
    emissions = torch.log_softmax(emissions, dim=-1)
    emission = emissions[0].cpu().detach()
    return emission
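
# Note on frame geometry (standard wav2vec2 behaviour): the encoder emits one
# frame per 320 input samples, i.e. ~20 ms at 16 kHz, so one second of audio
# yields roughly 49 emission frames / trellis rows.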

def get_canonical_phonemes(transcript, processor):
    """Phonemize the transcript; return the flat phoneme list, their token ids,
    and a word -> phoneme-string mapping."""
    # espeak runs "from the" together as a single phonemization group; the '|'
    # forces a word break so the word/phoneme zip below stays one-to-one.
    transcript = transcript.replace('from the', 'from | the')
    phoneme_ids = processor.tokenizer(transcript).input_ids
    ph = processor.tokenizer.phonemize(transcript)
    # With word_delimiter_token=' ', words in the phonemized string are
    # separated by three spaces and phones within a word by one space.
    phoneme_list = ph.split()
    transcript = transcript.replace('from | the', 'from the')
    words = transcript.split()
    words_phonemes = ph.split('   ')
    words_phoneme_mapping = [(w, p) for w, p in zip(words, words_phonemes)]
    return phoneme_list, phoneme_ids, words_phoneme_mapping
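
# A minimal sketch of the separator convention assumed above (hypothetical
# phoneme strings, not real espeak output): three spaces between words, one
# space between phones within a word.
if __name__ == '__main__':
    _ph = 'h ə l oʊ   w ɜː l d'  # "hello world", illustrative only
    assert _ph.split('   ') == ['h ə l oʊ', 'w ɜː l d']
    assert _ph.split() == ['h', 'ə', 'l', 'oʊ', 'w', 'ɜː', 'l', 'd']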

def word_level_scoring(words_phoneme_mapping, segments):
    """Average per-phoneme segment scores over each word, and track the
    fraction of its phonemes scoring below 0.50."""
    word_scores = []
    start = 0
    for word, ph_seq in words_phoneme_mapping:
        n_ph = len(ph_seq.split())
        cum_score = 0
        wrong = 0
        for i in range(start, start + n_ph):
            s = segments[i]
            cum_score += s.score
            if s.score < 0.50:
                wrong += 1
        start += n_ph
        word_scores.append((word, np.round(cum_score / n_ph, 5), np.round(wrong / n_ph, 5)))
    return word_scores

def map_word2_class(word_scores):
    """Map (score, wrong-phoneme ratio) pairs onto coarse feedback classes."""
    word_levels = []
    for w, sc, wrong_ratio in word_scores:
        if wrong_ratio > 0.5 or sc < 0.60:
            word_levels.append((w, '/'))
        elif sc < 0.70:
            word_levels.append((w, 'Wrong'))
        elif sc < 0.85:
            word_levels.append((w, 'Understandable'))
        else:
            word_levels.append((w, 'Correct'))
    return word_levels

def calculate_content_scores(word_levels):
    """Score content coverage: '/' words count 0, 'Wrong' words 0.5, others 1."""
    content_scores = len(word_levels)
    for w, c in word_levels:
        if c == '/':
            content_scores -= 1
        elif c == 'Wrong':
            content_scores -= 0.5
    content_scores = (content_scores / len(word_levels)) * 100
    return content_scores
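
# Worked example (hypothetical word levels): four words classified as
# Correct / Understandable / Wrong / '/' score (4 - 0.5 - 1) / 4 * 100 = 62.5.
if __name__ == '__main__':
    _levels = [('a', 'Correct'), ('b', 'Understandable'), ('c', 'Wrong'), ('d', '/')]
    assert calculate_content_scores(_levels) == 62.5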

def calculate_sentence_pronunciation_accuracy(word_scores):
    """Rescale word scores piecewise (60 -> 0, 70 -> 20, 88 -> 70, 100 -> 100),
    average them, and subtract a penalty for low-scoring phonemes."""
    w_scores = 0
    error_scores = 0
    for w, sc, wrong_ratio in word_scores:
        sc = sc * 100
        if sc <= 60:
            sc = 0  # below the lowest band: no credit
        elif sc < 70:
            sc = (sc - 60) / 10 * 20
        elif sc < 88:
            sc = (sc - 70) / 18 * 50 + 20
        else:
            sc = (sc - 88) / 12 * 30 + 70
        w_scores += sc
        error_scores += wrong_ratio
    w_scores = w_scores / len(word_scores)
    error_scores = (error_scores / len(word_scores)) * 40
    # error_scores >= 0, so the penalty only ever lowers the average.
    pronunciation_accuracy = w_scores - error_scores
    return pronunciation_accuracy
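
# Worked example of the piecewise mapping (hypothetical scores): a word score
# of 0.80 falls in the 70-88 band, so it maps to (80-70)/18*50 + 20 ≈ 47.78;
# with no low-scoring phonemes the sentence score equals that mapped value.
if __name__ == '__main__':
    _acc = calculate_sentence_pronunciation_accuracy([('word', 0.80, 0.0)])
    assert abs(_acc - (20 + (80 - 70) / 18 * 50)) < 1e-6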

def get_hard_alignment_with_scores(input_values, transcript):
    """Force-align the transcript phonemes to the audio and score each segment."""
    emission = get_emissions(input_values, model)
    phoneme_list, phoneme_ids, words_phoneme_mapping = get_canonical_phonemes(transcript, processor)
    trellis = get_trellis(emission, phoneme_ids)
    alignment_path = backtrack(trellis, emission, phoneme_ids)
    segments = merge_repeats(alignment_path, phoneme_list)
    return segments, words_phoneme_mapping

def normalize_aspect(value, mean, std):
    """Normalize an aspect of speech via the normal CDF at (value; mean, std)."""
    return stats.norm(mean, std).cdf(value)

def calculate_fluency_scores(audio, total_words, content_score, pron_score):
    """Combine speech rate, phonation-time ratio, and pronunciation into one fluency score."""
    content_score, pron_score = content_score / 100, pron_score / 100
    audio = np.asarray(audio)  # librosa expects a NumPy array, not a torch tensor
    sample_rate = 16000  # audio is loaded at 16 kHz

    # Means and standard deviations assumed for fluent speech
    speech_rate_mean, speech_rate_std = 170, 50
    phonation_time_mean, phonation_time_std = 50, 4

    # Speaking time (non-silent intervals) vs total duration
    non_silent_intervals = librosa.effects.split(audio, top_db=20)
    speaking_time = sum(intv[1] - intv[0] for intv in non_silent_intervals) / sample_rate
    total_duration = len(audio) / sample_rate

    # Phonation-time ratio, folded so values near the mean score highest
    phonation_time_ratio = speaking_time / total_duration * 60
    phonation_time_ratio = normalize_aspect(phonation_time_ratio, phonation_time_mean, phonation_time_std)
    if phonation_time_ratio > 0.5:
        phonation_time_ratio = 1 - phonation_time_ratio
    phonation_time_ratio = phonation_time_ratio / 0.5

    # Speech rate (words per minute), discounted by content score and folded likewise
    speech_rate = total_words / (total_duration / 60)
    speech_rate = speech_rate * content_score
    speech_rate_score = normalize_aspect(speech_rate, speech_rate_mean, speech_rate_std)
    if speech_rate_score > 0.5:
        speech_rate_score = 1 - speech_rate_score
    speech_rate_score = speech_rate_score / 0.5

    # Weighted combination of the three components
    w_rate_score = 0.4
    w_pho_ratio = 0.35
    w_pro = 0.25
    scaled_fluency_score = speech_rate_score * w_rate_score + phonation_time_ratio * w_pho_ratio + pron_score * w_pro
    scaled_fluency_score = scaled_fluency_score * 100
    return scaled_fluency_score, speech_rate
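
# The folding above turns each normal CDF into a peaked score: a value at the
# distribution mean gives cdf = 0.5 -> score 1.0, while values far to either
# side approach 0. E.g. (illustrative numbers) a rate of 270 wpm against
# mean 170 / std 50 gives cdf ≈ 0.977, folded to 0.023, scaled to ≈ 0.046.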

def speaker_pronunciation_assessment(audio_path, transcript):
    """Full pipeline: align, score words, and derive content/pronunciation/fluency."""
    input_values = load_audio(audio_path, processor)
    segments, words_phoneme_mapping = get_hard_alignment_with_scores(input_values, transcript)
    word_scores = word_level_scoring(words_phoneme_mapping, segments)
    word_levels = map_word2_class(word_scores)
    content_scores = calculate_content_scores(word_levels)
    pronunciation_accuracy = calculate_sentence_pronunciation_accuracy(word_scores)
    fluency_accuracy, wpm = calculate_fluency_scores(input_values[0], len(word_scores), content_scores, pronunciation_accuracy)

    result = {'pronunciation_accuracy': pronunciation_accuracy,
              'word_levels': word_levels,
              'content_scores': content_scores,
              'wpm': wpm,
              'stress': None,
              'fluency_score': fluency_accuracy}
    return result

# The flag starts unset whether the module is imported or run directly.
MODEL_IS_LOADED = False
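
if __name__ == '__main__':
    # Minimal usage sketch: 'sample.wav' is a hypothetical file name, not an
    # asset shipped with this module; point it at a real 16 kHz mono recording
    # and its reference transcript to run the full pipeline.
    demo_path = 'sample.wav'
    if os.path.exists(demo_path):
        print(speaker_pronunciation_assessment(demo_path, 'hello world how are you'))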