seba3y committed
Commit be1b9b7
1 Parent(s): 7ee2023

Upload 4 files

Files changed (4)
  1. app.py +166 -0
  2. model.py +243 -0
  3. requirements.txt +9 -0
  4. wav2vec_aligen.py +51 -0
app.py ADDED
@@ -0,0 +1,166 @@
+ import gradio as gr
+ # from logic import Speaker_speech_analysis
+ from scipy.io import wavfile
+ from wav2vec_aligen import speaker_pronunciation_assesment
+
+
+ def create_html_from_scores(word_levels):
+     html_output = ''
+     for word, level in word_levels:
+         if level == '/':
+             html_output += f'<span style="color: #0000ff;">{level}</span> '
+         elif level == 'Wrong':
+             html_output += f'<span style="color: #dc3545;">{word}</span> '
+         elif level == 'Understandable':
+             html_output += f'<span style="color: #ffc107;">{word}</span> '
+         else:
+             html_output += f'<span style="color: #28a745;">{word}</span> '
+     return html_output
+
+ def generate_progress_bar(score, label):
+     score = round(score, 2)
+     score_text = f"{score:.2f}" if score < 100 else "100"
+     if score < 30:
+         bar_color = "#dc3545"
+     elif score < 60:
+         bar_color = "#dc6545"
+     elif score < 80:
+         bar_color = "#ffc107"
+     else:
+         bar_color = "#28a745"
+     bar_length = f"{score}%"
+     return f"""
+     <div class="progress-label">{label}:</div>
+     <div class="progress-container">
+         <div class="progress-bar" style="width: {bar_length}; background-color: {bar_color};">
+             <div class="progress-score">{score_text}</div>
+         </div>
+     </div>
+     <div class="progress-max">Max: 100</div>
+     """
+ # CSS to be used in the Gradio Interface
+
+
+ def analyze_audio(text, audio):
+     if text is None or audio is None:
+         return 'The audio or the text is missing.'
+     # Write the recorded audio to a temporary WAV file
+     temp_filename = 'temp_audio.wav'
+     wavfile.write(temp_filename, audio[0], audio[1])
+
+     result = speaker_pronunciation_assesment(temp_filename)
+     accuracy_score = result['pronunciation_accuracy']
+     fluency_score = result['fluency_score']
+     total_score = result['total_score']
+     content_scores = result['content_scores']
+
+     pronunciation_progress_bar = generate_progress_bar(accuracy_score, "Pronunciation Accuracy")
+     fluency_progress_bar = generate_progress_bar(fluency_score, "Fluency Score")
+     content_progress_bar = generate_progress_bar(content_scores, "Content Score")
+     total_progress_bar = generate_progress_bar(total_score, "Total Score")
+
+     html_with_css = f"""
+     <style>
+     .legend {{
+         font-size: 22px;
+         display: flex;
+         align-items: center;
+         gap: 12px;
+     }}
+
+     .legend-dot {{
+         height: 15px;
+         width: 15px;
+         border-radius: 50%;
+         display: inline-block;
+     }}
+
+     .good {{ color: #28a745; }}
+     .average {{ color: #ffc107; }}
+     .bad {{ color: #dc3545; }}
+     .wrong {{ color: #dc3545; }}
+
+     .text {{
+         font-size: 20px;
+         margin-bottom: 20px;
+     }}
+
+     .progress-container {{
+         width: 100%;
+         background-color: #ddd;
+         border-radius: 13px;
+         overflow: hidden;
+     }}
+
+     .progress-bar {{
+         height: 30px;
+         line-height: 30px;
+         text-align: center;
+         font-size: 16px;
+         border-radius: 15px;
+         transition: width 1s ease;
+     }}
+
+     .progress-label {{
+         font-weight: bold;
+         font-size: 22px;
+         margin-bottom: 20px;
+         margin-top: 5px;
+         text-align: center;
+     }}
+
+     .progress-score {{
+         display: inline-block;
+         color: black;
+     }}
+
+     .progress-max {{
+         text-align: right;
+         margin: 10px;
+         font-size: 16px;
+     }}
+     </style>
+
+     <div class="legend">
+         <span class="legend-dot" style="background-color: #28a745;"></span><span>Good</span>
+         <span class="legend-dot" style="background-color: #ffc107;"></span><span>Understandable</span>
+         <span class="legend-dot" style="background-color: #dc3545;"></span><span>Bad</span>
+         <span class="legend-dot" style="background-color: #0000ff;"></span><span>No Speech</span>
+     </div>
+
+     {total_progress_bar}
+     {pronunciation_progress_bar}
+     {fluency_progress_bar}
+     {content_progress_bar}
+     """
+
+     return html_with_css
+
+ # Define the Gradio interface
+ iface = gr.Interface(fn=analyze_audio,
+                      inputs=[gr.Textbox(label='Training Text',
+                                         placeholder='Write the text for the pronunciation task',
+                                         interactive=True, visible=True, show_copy_button=True),
+                              gr.Audio(label="Recorded Audio", sources=['microphone', 'upload']),
+                              ],
+                      outputs=[gr.HTML(label="Analysis of pronunciation"),
+                               ],
+                      # css=additional_css,
+                      # title="Audio Analysis Tool",
+                      description="Write any text and record audio to predict pronunciation errors",
+                      )
+
+ # Run the Gradio app
+ if __name__ == "__main__":
+     iface.launch(share=True)
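
A minimal smoke test, not part of the upload, that calls analyze_audio directly with a synthetic tone instead of a microphone recording; the 440 Hz sine and the "hello world" text are illustrative placeholders:

import numpy as np
from app import analyze_audio  # importing app also loads the scoring model via wav2vec_aligen

sr = 16000
tone = (0.1 * np.sin(2 * np.pi * 440 * np.arange(sr) / sr)).astype(np.float32)  # one second of audio
html = analyze_audio("hello world", (sr, tone))  # gr.Audio passes a (sample_rate, samples) tuple
print(html[:200])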
model.py ADDED
@@ -0,0 +1,243 @@
+ from transformers import Wav2Vec2PreTrainedModel, Wav2Vec2Model
+ from transformers.modeling_outputs import CausalLMOutput
+ from typing import Optional, Tuple, Union
+ import warnings
+ import torch
+ import torch.nn as nn
+ import math
+
+
+ _HIDDEN_STATES_START_POSITION = 2
+
+ def _no_grad_trunc_normal_(tensor, mean, std, a, b):
+     # Cut & paste from PyTorch official master until it's in a few official releases - RW
+     # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
+     def norm_cdf(x):
+         # Computes standard normal cumulative distribution function
+         return (1. + math.erf(x / math.sqrt(2.))) / 2.
+
+     with torch.no_grad():
+         # Values are generated by using a truncated uniform distribution and
+         # then using the inverse CDF for the normal distribution.
+         # Get upper and lower cdf values
+         l = norm_cdf((a - mean) / std)
+         u = norm_cdf((b - mean) / std)
+
+         # Uniformly fill tensor with values from [l, u], then translate to
+         # [2l-1, 2u-1].
+         tensor.uniform_(2 * l - 1, 2 * u - 1)
+
+         # Use inverse cdf transform for normal distribution to get truncated
+         # standard normal
+         tensor.erfinv_()
+
+         # Transform to proper mean, std
+         tensor.mul_(std * math.sqrt(2.))
+         tensor.add_(mean)
+
+         # Clamp to ensure it's in the proper range
+         tensor.clamp_(min=a, max=b)
+         return tensor
+
+ def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.):
+     return _no_grad_trunc_normal_(tensor, mean, std, a, b)
+
+
+ class Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression(Wav2Vec2PreTrainedModel):
+     def __init__(self, config, target_lang: Optional[str] = None):
+         super().__init__(config)
+
+         self.wav2vec2 = Wav2Vec2Model(config)
+         self.dropout = nn.Dropout(config.final_dropout)
+
+         self.target_lang = target_lang
+
+         if config.vocab_size is None:
+             raise ValueError(
+                 f"You are trying to instantiate {self.__class__} with a configuration that "
+                 "does not define the vocabulary size of the language model head. Please "
+                 "instantiate the model as follows: `Wav2Vec2ForCTC.from_pretrained(..., vocab_size=vocab_size)`. "
+                 "or define `vocab_size` of your model's configuration."
+             )
+         output_hidden_size = (
+             config.output_hidden_size if hasattr(config, "add_adapter") and config.add_adapter else config.hidden_size
+         )
+         self.lm_head = nn.Linear(output_hidden_size, config.vocab_size)
+
+         # utterance level, 1=accuracy, 2=fluency, 3=total score, 4=content
+         self.cls_token1 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
+         self.mlp_head_utt1 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
+
+         self.cls_token2 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
+         self.mlp_head_utt2 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
+
+         self.cls_token3 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
+         self.mlp_head_utt3 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
+
+         self.cls_token4 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
+         self.mlp_head_utt4 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
+
+         # Initialize weights and apply final processing
+         self.post_init()
+
+         # initialize the cls tokens
+         trunc_normal_(self.cls_token1, std=.092)
+         trunc_normal_(self.cls_token2, std=.01)
+         trunc_normal_(self.cls_token3, std=.052)
+         trunc_normal_(self.cls_token4, std=.02)
+
+     def tie_weights(self):
+         """
+         This method overwrites [`~PreTrainedModel.tie_weights`] so that adapter weights can be correctly loaded when
+         passing `target_lang=...` to `from_pretrained(...)`.
+
+         This method is **not** supposed to be called by the user and is prone to be changed in the future.
+         """
+
+         # Note that `tie_weights` is usually used to tie input and output embedding weights. The method is re-purposed to
+         # correctly load adapter layers for Wav2Vec2 so that we do not have to introduce a new API to
+         # [`PreTrainedModel`]. While slightly hacky, Wav2Vec2 never has to tie input and output embeddings, so that it is
+         # ok to repurpose this function here.
+         target_lang = self.target_lang
+
+         if target_lang is not None and getattr(self.config, "adapter_attn_dim", None) is None:
+             raise ValueError(f"Cannot pass `target_lang`: {target_lang} if `config.adapter_attn_dim` is not defined.")
+         elif target_lang is None and getattr(self.config, "adapter_attn_dim", None) is not None:
+             print("By default `target_lang` is set to 'eng'.")
+         elif target_lang is not None:
+             self.load_adapter(target_lang, force_load=True)
+
+     def freeze_feature_extractor(self):
+         """
+         Calling this function will disable the gradient computation for the feature encoder so that its parameters will
+         not be updated during training.
+         """
+         warnings.warn(
+             "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
+             "Please use the equivalent `freeze_feature_encoder` method instead.",
+             FutureWarning,
+         )
+         self.freeze_feature_encoder()
+
+     def freeze_feature_encoder(self):
+         """
+         Calling this function will disable the gradient computation for the feature encoder so that its parameters will
+         not be updated during training.
+         """
+         self.wav2vec2.feature_extractor._freeze_parameters()
+
+     def freeze_base_model(self):
+         """
+         Calling this function will disable the gradient computation for the base model so that its parameters will not
+         be updated during training. Only the classification head will be updated.
+         """
+         for param in self.wav2vec2.parameters():
+             param.requires_grad = False
+
+     def forward(
+         self,
+         input_values: Optional[torch.Tensor],
+         attention_mask: Optional[torch.Tensor] = None,
+         output_attentions: Optional[bool] = None,
+         output_hidden_states: Optional[bool] = None,
+         return_dict: Optional[bool] = None,
+         labels: Optional[torch.Tensor] = None,
+     ) -> Union[Tuple, CausalLMOutput]:
+         r"""
+         labels (`torch.LongTensor` of shape `(batch_size, target_length)`, *optional*):
+             Labels for connectionist temporal classification. Note that `target_length` has to be smaller or equal to
+             the sequence length of the output logits. Indices are selected in `[-100, 0, ..., config.vocab_size - 1]`.
+             All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ...,
+             config.vocab_size - 1]`.
+         """
+
+         return_dict = return_dict if return_dict is not None else self.config.use_return_dict
+         B, T = input_values.size()
+
+         extract_features = self.wav2vec2.feature_extractor(input_values)
+         extract_features = extract_features.transpose(1, 2)
+
+         if attention_mask is not None:
+             # compute reduced attention_mask corresponding to feature vectors
+             attention_mask = self.wav2vec2._get_feature_vector_attention_mask(
+                 extract_features.shape[1], attention_mask, add_adapter=False
+             )
+
+         hidden_states, extract_features = self.wav2vec2.feature_projection(extract_features)
+         hidden_states = self.wav2vec2._mask_hidden_states(
+             hidden_states, mask_time_indices=None, attention_mask=attention_mask
+         )
+
+         # prepend the four utterance-level cls tokens to the frame sequence
+         cls_token1 = self.cls_token1.expand(B, -1, -1)
+         cls_token2 = self.cls_token2.expand(B, -1, -1)
+         cls_token3 = self.cls_token3.expand(B, -1, -1)
+         cls_token4 = self.cls_token4.expand(B, -1, -1)
+         hidden_states = torch.cat((cls_token1, cls_token2, cls_token3, cls_token4, hidden_states), dim=1)
+         # hidden_states = torch.cat((cls_token1, cls_token3, hidden_states), dim=1)
+         outputs = self.wav2vec2.encoder(
+             hidden_states,
+             attention_mask=attention_mask,
+             output_attentions=output_attentions,
+             output_hidden_states=output_hidden_states,
+             return_dict=return_dict,
+         )
+         hidden_states = outputs[0]
+         hidden_states = self.dropout(hidden_states)
+
+         # the first 4 tokens are utterance-level cls tokens, i.e., accuracy, fluency, total score, content
+         u1 = self.mlp_head_utt1(hidden_states[:, 0])
+         u2 = self.mlp_head_utt2(hidden_states[:, 1])
+         u3 = self.mlp_head_utt3(hidden_states[:, 2])
+         u4 = self.mlp_head_utt4(hidden_states[:, 3])
+
+         # the remaining tokens carry the frame-level CTC logits
+         logits = self.lm_head(hidden_states[:, 4:])
+
+         loss = None
+         if labels is not None:
+             labels, utt_label = labels['labels'], labels['utt_label'][:, :4]
+             if labels.max() >= self.config.vocab_size:
+                 raise ValueError(f"Label values must be <= vocab_size: {self.config.vocab_size}")
+
+             # retrieve loss input_lengths from attention_mask
+             attention_mask = (
+                 attention_mask if attention_mask is not None else torch.ones_like(input_values, dtype=torch.long)
+             )
+             input_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1)).to(torch.long)
+
+             # assuming that padded tokens are filled with -100
+             # when not being attended to
+             labels_mask = labels >= 0
+             target_lengths = labels_mask.sum(-1)
+             flattened_targets = labels.masked_select(labels_mask)
+
+             log_probs = nn.functional.log_softmax(logits, dim=-1, dtype=torch.float32).transpose(0, 1)
+
+             with torch.backends.cudnn.flags(enabled=False):
+                 # utterance-level loss: MSE over the four regression heads
+                 utt_preds = torch.cat((u1, u2, u3, u4), dim=1)
+                 # utt_preds = torch.cat((u1, u2), dim=1)
+                 loss_utt = nn.functional.mse_loss(utt_preds, utt_label)
+
+                 # frame-level CTC loss
+                 loss_ph = nn.functional.ctc_loss(
+                     log_probs,
+                     flattened_targets,
+                     input_lengths,
+                     target_lengths,
+                     blank=self.config.pad_token_id,
+                     reduction=self.config.ctc_loss_reduction,
+                     zero_infinity=self.config.ctc_zero_infinity,
+                 )
+                 loss = loss_utt + loss_ph
+
+         if not return_dict:
+             output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:]
+             return ((loss,) + output) if loss is not None else output
+
+         # utterance level: 1=accuracy, 2=fluency, 3=total score, 4=content
+         return CausalLMOutput(
+             loss=loss,
+             logits={'logits': logits, 'accuracy': u2, 'fluency': u1, 'total score': u3, 'content': u4},
+             hidden_states=outputs.hidden_states,
+             attentions=outputs.attentions,
+         )
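
A minimal sketch, not part of the upload, showing how the four utterance-level heads can be read directly from the model; the random one-second input is only a stand-in for real 16 kHz speech:

import torch
from model import Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression

model = Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression.from_pretrained(
    "seba3y/wav2vec-base-en-pronunciation-assesment"
)
with torch.inference_mode():
    out = model(torch.randn(1, 16000))  # batch of one, one second at 16 kHz
# out.logits is a dict; drop the CTC logits and keep the regression scores
scores = {k: v.item() for k, v in out.logits.items() if k != 'logits'}
print(scores)  # accuracy, fluency, total score, content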
requirements.txt ADDED
@@ -0,0 +1,9 @@
+ wave
+ torch
+ optimum
+ scipy
+ numpy
+ resampy
+ gradio
+ librosa
+ transformers
wav2vec_aligen.py ADDED
@@ -0,0 +1,51 @@
+ import torch
+ import librosa
+ import os
+ from model import Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression
+ from transformers import Wav2Vec2Processor
+ from optimum.bettertransformer import BetterTransformer
+
+ device = 'cuda' if torch.cuda.is_available() else 'cpu'
+ os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'
+ os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = '1'
+ os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
+ torch.random.manual_seed(0)
+ # protobuf==3.20.0
+
+ model_name = "seba3y/wav2vec-base-en-pronunciation-assesment"
+ processor = Wav2Vec2Processor.from_pretrained(model_name)
+ model = Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression.from_pretrained(model_name).to(device)
+ model = BetterTransformer.transform(model)
+
+ def load_audio(audio_path, processor):
+     # librosa resamples any input to 16 kHz, the rate the model expects
+     audio, sr = librosa.load(audio_path, sr=16000)
+     input_values = processor(audio, sampling_rate=16000, return_tensors="pt").input_values
+     return input_values.to(device)
+
+ @torch.inference_mode()
+ def get_emissions(input_values, model):
+     results = model(input_values).logits
+     results.pop('logits')  # keep only the utterance-level scores
+     return results
+
+
+ def speaker_pronunciation_assesment(audio_path):
+     input_values = load_audio(audio_path, processor)
+     result_scores = get_emissions(input_values, model)
+
+     content_scores = round(result_scores['content'].cpu().item())
+     pronunciation_score = round(result_scores['accuracy'].cpu().item())
+     fluency_score = round(result_scores['fluency'].cpu().item())
+     total_score = round(result_scores['total score'].cpu().item())
+
+     result = {'pronunciation_accuracy': pronunciation_score,
+               'content_scores': content_scores,
+               'total_score': total_score,
+               'fluency_score': fluency_score}
+     return result
+
+ if __name__ == '__main__':
+     print(__name__)
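
A minimal usage sketch, not part of the upload; 'sample.wav' is a placeholder path to any audio file librosa can read (it is resampled to 16 kHz):

from wav2vec_aligen import speaker_pronunciation_assesment

scores = speaker_pronunciation_assesment('sample.wav')
print(scores)  # {'pronunciation_accuracy': ..., 'content_scores': ..., 'total_score': ..., 'fluency_score': ...}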