seba3y committed
Commit 1fbfcdb
1 Parent(s): be1b9b7

Upload 2 files

Files changed (2):
  1. model.py +54 -200
  2. wav2vec_aligen.py +12 -12
model.py CHANGED
@@ -1,243 +1,97 @@
- from transformers import Wav2Vec2PreTrainedModel, Wav2Vec2Model
- from transformers.modeling_outputs import CausalLMOutput
  from typing import Optional, Tuple, Union
- import warnings
  import torch
  import torch.nn as nn
- import math
-
-
- _HIDDEN_STATES_START_POSITION = 2
-
- def _no_grad_trunc_normal_(tensor, mean, std, a, b):
-     # Cut & paste from PyTorch official master until it's in a few official releases - RW
-     # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
-     def norm_cdf(x):
-         # Computes standard normal cumulative distribution function
-         return (1. + math.erf(x / math.sqrt(2.))) / 2.
-
-     with torch.no_grad():
-         # Values are generated by using a truncated uniform distribution and
-         # then using the inverse CDF for the normal distribution.
-         # Get upper and lower cdf values
-         l = norm_cdf((a - mean) / std)
-         u = norm_cdf((b - mean) / std)
-
-         # Uniformly fill tensor with values from [l, u], then translate to
-         # [2l-1, 2u-1].
-         tensor.uniform_(2 * l - 1, 2 * u - 1)
-
-         # Use inverse cdf transform for normal distribution to get truncated
-         # standard normal
-         tensor.erfinv_()
-
-         # Transform to proper mean, std
-         tensor.mul_(std * math.sqrt(2.))
-         tensor.add_(mean)
-
-         # Clamp to ensure it's in the proper range
-         tensor.clamp_(min=a, max=b)
-         return tensor
-
- def trunc_normal_(tensor, mean=0., std=1., a=-2., b=2.):
-     return _no_grad_trunc_normal_(tensor, mean, std, a, b)
-
-
- class Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression(Wav2Vec2PreTrainedModel):
-     def __init__(self, config, target_lang: Optional[str] = None):
          super().__init__(config)

-         self.wav2vec2 = Wav2Vec2Model(config)
-         self.dropout = nn.Dropout(config.final_dropout)
-
-         self.target_lang = target_lang
-
-         if config.vocab_size is None:
              raise ValueError(
-                 f"You are trying to instantiate {self.__class__} with a configuration that "
-                 "does not define the vocabulary size of the language model head. Please "
-                 "instantiate the model as follows: `Wav2Vec2ForCTC.from_pretrained(..., vocab_size=vocab_size)`. "
-                 "or define `vocab_size` of your model's configuration."
              )
-         output_hidden_size = (
-             config.output_hidden_size if hasattr(config, "add_adapter") and config.add_adapter else config.hidden_size
-         )
-         self.lm_head = nn.Linear(output_hidden_size, config.vocab_size)
-
-         # utterance level, 1=accuracy, 2=fluency, 3=total score, 4=content
-         self.cls_token1 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
-         self.mlp_head_utt1 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
-
-         self.cls_token2 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
-         self.mlp_head_utt2 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
-
-         self.cls_token3 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
-         self.mlp_head_utt3 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
-
-         self.cls_token4 = nn.Parameter(torch.zeros(1, 1, config.hidden_size))
-         self.mlp_head_utt4 = nn.Sequential(nn.LayerNorm(config.hidden_size), nn.Linear(config.hidden_size, 1))
-         self.post_init()
-         # initialize the cls tokens
-         trunc_normal_(self.cls_token1, std=.092)
-         trunc_normal_(self.cls_token2, std=.01)
-         trunc_normal_(self.cls_token3, std=.052)
-         trunc_normal_(self.cls_token4, std=.02)
-         # Initialize weights and apply final processing
-
-
-     def tie_weights(self):
-         """
-         This method overwrites [`~PreTrainedModel.tie_weights`] so that adapter weights can be correctly loaded when
-         passing `target_lang=...` to `from_pretrained(...)`.
-
-         This method is **not** supposed to be called by the user and is prone to be changed in the future.
-         """
-
-         # Note that `tie_weights` is usually used to tie input and output embedding weights. The method is re-purposed to
-         # correctly load adapter layers for Wav2Vec2 so that we do not have to introduce a new API to
-         # [`PreTrainedModel`]. While slightly hacky, Wav2Vec2 never has to tie input and output embeddings, so that it is
-         # ok to repurpose this function here.
-         target_lang = self.target_lang
-
-         if target_lang is not None and getattr(self.config, "adapter_attn_dim", None) is None:
-             raise ValueError(f"Cannot pass `target_lang`: {target_lang} if `config.adapter_attn_dim` is not defined.")
-         elif target_lang is None and getattr(self.config, "adapter_attn_dim", None) is not None:
-             print("By default `target_lang` is set to 'eng'.")
-         elif target_lang is not None:
-             self.load_adapter(target_lang, force_load=True)
-
-     def freeze_feature_extractor(self):
-         """
-         Calling this function will disable the gradient computation for the feature encoder so that its parameters will
-         not be updated during training.
-         """
-         warnings.warn(
-             "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
-             "Please use the equivalent `freeze_feature_encoder` method instead.",
-             FutureWarning,
-         )
-         self.freeze_feature_encoder()

-     def freeze_feature_encoder(self):
-         """
-         Calling this function will disable the gradient computation for the feature encoder so that its parameter will
-         not be updated during training.
-         """
-         self.wav2vec2.feature_extractor._freeze_parameters()

      def freeze_base_model(self):
          """
          Calling this function will disable the gradient computation for the base model so that its parameters will not
          be updated during training. Only the classification head will be updated.
          """
-         for param in self.wav2vec2.parameters():
              param.requires_grad = False

-
      def forward(
          self,
-         input_values: Optional[torch.Tensor],
          attention_mask: Optional[torch.Tensor] = None,
          output_attentions: Optional[bool] = None,
          output_hidden_states: Optional[bool] = None,
          return_dict: Optional[bool] = None,
          labels: Optional[torch.Tensor] = None,
-     ) -> Union[Tuple, CausalLMOutput]:
          r"""
-         labels (`torch.LongTensor` of shape `(batch_size, target_length)`, *optional*):
-             Labels for connectionist temporal classification. Note that `target_length` has to be smaller or equal to
-             the sequence length of the output logits. Indices are selected in `[-100, 0, ..., config.vocab_size - 1]`.
-             All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ...,
-             config.vocab_size - 1]`.
          """

          return_dict = return_dict if return_dict is not None else self.config.use_return_dict
-         B, T = input_values.size()
-
-         extract_features = self.wav2vec2.feature_extractor(input_values)
-         extract_features = extract_features.transpose(1, 2)
-
-         if attention_mask is not None:
-             # compute reduced attention_mask corresponding to feature vectors
-             attention_mask = self.wav2vec2._get_feature_vector_attention_mask(
-                 extract_features.shape[1], attention_mask, add_adapter=False
-             )
-
-         hidden_states, extract_features = self.wav2vec2.feature_projection(extract_features)
-         hidden_states = self.wav2vec2._mask_hidden_states(
-             hidden_states, mask_time_indices=None, attention_mask=attention_mask
-         )

-         cls_token1 = self.cls_token1.expand(B, -1, -1)
-         cls_token2 = self.cls_token2.expand(B, -1, -1)
-         cls_token3 = self.cls_token3.expand(B, -1, -1)
-         cls_token4 = self.cls_token4.expand(B, -1, -1)
-         hidden_states = torch.cat((cls_token1, cls_token2, cls_token3, cls_token4, hidden_states), dim=1)  # cls_token4
-         # hidden_states = torch.cat((cls_token1, cls_token3, hidden_states), dim=1)  # cls_token4
-         outputs = self.wav2vec2.encoder(
-             hidden_states,
              attention_mask=attention_mask,
              output_attentions=output_attentions,
              output_hidden_states=output_hidden_states,
              return_dict=return_dict,
          )
-         hidden_states = outputs[0]
-         hidden_states = self.dropout(hidden_states)

-         # the first 4 tokens are utterance-level cls tokens, i.e., accuracy, fluency, total scores, content
-         u1 = self.mlp_head_utt1(hidden_states[:, 0])
-         u2 = self.mlp_head_utt2(hidden_states[:, 1])
-         u3 = self.mlp_head_utt3(hidden_states[:, 2])
-         u4 = self.mlp_head_utt4(hidden_states[:, 3])
-
-         logits = self.lm_head(hidden_states[:, 4:])

          loss = None
          if labels is not None:
-             labels, utt_label = labels['labels'], labels['utt_label'][:, :4]
-             if labels.max() >= self.config.vocab_size:
-                 raise ValueError(f"Label values must be <= vocab_size: {self.config.vocab_size}")
-
-             # retrieve loss input_lengths from attention_mask
-             attention_mask = (
-                 attention_mask if attention_mask is not None else torch.ones_like(input_values, dtype=torch.long)
-             )
-             input_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1)).to(torch.long)
-
-             # assuming that padded tokens are filled with -100
-             # when not being attended to
-             labels_mask = labels >= 0
-             target_lengths = labels_mask.sum(-1)
-             flattened_targets = labels.masked_select(labels_mask)
-
-             log_probs = nn.functional.log_softmax(logits, dim=-1, dtype=torch.float32).transpose(0, 1)
-
-             with torch.backends.cudnn.flags(enabled=False):
-                 # utterance level loss, also mse
-                 utt_preds = torch.cat((u1, u2, u3, u4), dim=1)
-                 # utt_preds = torch.cat((u1, u2), dim=1)
-
-                 loss_utt = nn.functional.mse_loss(utt_preds, utt_label)
-
-                 loss_ph = nn.functional.ctc_loss(
-                     log_probs,
-                     flattened_targets,
-                     input_lengths,
-                     target_lengths,
-                     blank=self.config.pad_token_id,
-                     reduction=self.config.ctc_loss_reduction,
-                     zero_infinity=self.config.ctc_zero_infinity,
-                 )
-             loss = loss_utt + loss_ph

          if not return_dict:
              output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:]
              return ((loss,) + output) if loss is not None else output
-         # utterance level, 1=accuracy, 2=fluency, 3=total score, 4=content
-         return CausalLMOutput(
-             loss=loss, logits={'logits': logits, 'accuracy': u2, 'fluency': u1, 'total score': u3, 'content': u4}, hidden_states=outputs.hidden_states, attentions=outputs.attentions
          )
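(Aside: the truncated-normal initializer removed above is the widely copied PyTorch/timm helper; recent PyTorch releases expose the same routine directly as `torch.nn.init.trunc_normal_`. A minimal equivalent sketch, assuming a reasonably recent torch and a hidden size of 768 purely for illustration:)

    import torch
    from torch.nn.init import trunc_normal_  # built-in counterpart of the removed helper

    # Initialize a prefix token the same way the old cls_token parameters were initialized.
    cls_token = torch.nn.Parameter(torch.zeros(1, 1, 768))  # 768 is an assumed hidden size
    trunc_normal_(cls_token, mean=0.0, std=0.02, a=-2.0, b=2.0)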
+ from transformers import Wav2Vec2BertPreTrainedModel, Wav2Vec2BertModel
+ from transformers.modeling_outputs import SequenceClassifierOutput
  from typing import Optional, Tuple, Union
+ from torch.nn import MSELoss
  import torch
  import torch.nn as nn

+ class Wav2Vec2BertForSequenceClassification(Wav2Vec2BertPreTrainedModel):
+     # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2ForSequenceClassification.__init__ with Wav2Vec2->Wav2Vec2Bert,wav2vec2->wav2vec2_bert
+     def __init__(self, config):
          super().__init__(config)

+         if hasattr(config, "add_adapter") and config.add_adapter:
              raise ValueError(
+                 "Sequence classification does not support the use of Wav2Vec2Bert adapters (config.add_adapter=True)"
              )
+         self.wav2vec2_bert = Wav2Vec2BertModel(config)
+         num_layers = config.num_hidden_layers + 1  # transformer layers + input embeddings
+         if config.use_weighted_layer_sum:
+             self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers)
+         self.projector = nn.Linear(config.hidden_size, config.classifier_proj_size)
+         self.classifier = nn.Linear(config.classifier_proj_size, config.num_labels)

+         # Initialize weights and apply final processing
+         self.post_init()

      def freeze_base_model(self):
          """
          Calling this function will disable the gradient computation for the base model so that its parameters will not
          be updated during training. Only the classification head will be updated.
          """
+         for param in self.wav2vec2_bert.parameters():
              param.requires_grad = False

+
+     # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2ForSequenceClassification.forward with Wav2Vec2->Wav2Vec2Bert,wav2vec2->wav2vec2_bert,WAV_2_VEC_2->WAV2VEC2_BERT, input_values->input_features
      def forward(
          self,
+         input_features: Optional[torch.Tensor],
          attention_mask: Optional[torch.Tensor] = None,
          output_attentions: Optional[bool] = None,
          output_hidden_states: Optional[bool] = None,
          return_dict: Optional[bool] = None,
          labels: Optional[torch.Tensor] = None,
+     ) -> Union[Tuple, SequenceClassifierOutput]:
          r"""
+         labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
+             Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
+             config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
+             `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
          """

          return_dict = return_dict if return_dict is not None else self.config.use_return_dict
+         output_hidden_states = True if self.config.use_weighted_layer_sum else output_hidden_states

+         outputs = self.wav2vec2_bert(
+             input_features,
              attention_mask=attention_mask,
              output_attentions=output_attentions,
              output_hidden_states=output_hidden_states,
              return_dict=return_dict,
          )

+         if self.config.use_weighted_layer_sum:
+             hidden_states = outputs[_HIDDEN_STATES_START_POSITION]
+             hidden_states = torch.stack(hidden_states, dim=1)
+             norm_weights = nn.functional.softmax(self.layer_weights, dim=-1)
+             hidden_states = (hidden_states * norm_weights.view(-1, 1, 1)).sum(dim=1)
+         else:
+             hidden_states = outputs[0]
+
+         hidden_states = self.projector(hidden_states)
+         if attention_mask is None:
+             pooled_output = hidden_states.mean(dim=1)
+         else:
+             padding_mask = self._get_feature_vector_attention_mask(hidden_states.shape[1], attention_mask)
+             hidden_states[~padding_mask] = 0.0
+             pooled_output = hidden_states.sum(dim=1) / padding_mask.sum(dim=1).view(-1, 1)
+
+         logits = self.classifier(pooled_output)
+         logits = nn.functional.relu(logits)

          loss = None
          if labels is not None:
+             loss_fct = MSELoss()
+             loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1, self.config.num_labels))

          if not return_dict:
              output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:]
              return ((loss,) + output) if loss is not None else output
+
+         return SequenceClassifierOutput(
+             loss=loss,
+             logits=logits,
+             hidden_states=outputs.hidden_states,
+             attentions=outputs.attentions,
          )
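For a quick sanity check of the new classification head, a forward pass on random features is enough (a sketch only: the tiny config, `num_labels=4`, and the dummy feature shape are illustrative assumptions, and a transformers release that ships Wav2Vec2-BERT, v4.37 or later, is assumed):

    import torch
    from transformers import Wav2Vec2BertConfig
    from model import Wav2Vec2BertForSequenceClassification

    # Small random-weight config just to exercise the forward pass; num_labels=4 mirrors
    # the four utterance-level scores read out in wav2vec_aligen.py below.
    config = Wav2Vec2BertConfig(num_hidden_layers=2, num_labels=4)
    model = Wav2Vec2BertForSequenceClassification(config).eval()

    # Wav2Vec2-BERT consumes pre-computed log-mel features of shape (batch, frames, feature_size).
    dummy_features = torch.randn(1, 200, config.feature_projection_input_dim)
    with torch.inference_mode():
        scores = model(dummy_features).logits  # shape (1, 4); the ReLU keeps scores non-negative
    print(scores)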
wav2vec_aligen.py CHANGED
@@ -1,8 +1,8 @@
  import torch
  import librosa
  import os
- from model import Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression
- from transformers import Wav2Vec2Processor
+ from model import Wav2Vec2BertForSequenceClassification
+ from transformers import AutoFeatureExtractor
  from optimum.bettertransformer import BetterTransformer

  device = 'cuda' if torch.cuda.is_available() else 'cpu'
@@ -12,21 +12,20 @@ os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
  torch.random.manual_seed(0);
  # protobuf==3.20.0

- model_name = "seba3y/wav2vec-base-en-pronunciation-assesment"
- processor = Wav2Vec2Processor.from_pretrained(model_name)
- model = Wav2Vec2ForWav2Vec2ForCTCAndUttranceRegression.from_pretrained(model_name).to(device)
+ model_name = "arslanarjumand/wav2vec-reptiles"
+ processor = AutoFeatureExtractor.from_pretrained(model_name)
+ model = Wav2Vec2BertForSequenceClassification.from_pretrained(model_name).to(device)
  model = BetterTransformer.transform(model)

  def load_audio(audio_path, processor):
      audio, sr = librosa.load(audio_path, sr=16000)

-     input_values = processor(audio, sampling_rate=16000, return_tensors="pt").input_values
+     input_values = processor(audio, sampling_rate=16000, return_tensors="pt").input_features
      return input_values

  @torch.inference_mode()
  def get_emissions(input_values, model):
      results = model(input_values,).logits
-     results.pop('logits')
      return results


@@ -34,10 +33,11 @@ def speaker_pronunciation_assesment(audio_path):
      input_values = load_audio(audio_path, processor)
      result_scores = get_emissions(input_values, model)

-     content_scores = round(result_scores['content'].cpu().item())
-     pronunciation_score = round(result_scores['accuracy'].cpu().item())
-     fluency_score = round(result_scores['fluency'].cpu().item())
-     total_score = round(result_scores['total score'].cpu().item())
+     pronunciation_score = round(result_scores[0].cpu().item())
+     fluency_score = round(result_scores[1].cpu().item())
+     total_score = round(result_scores[2].cpu().item())
+     content_scores = round(result_scores[3].cpu().item())
+


      result = {'pronunciation_accuracy': pronunciation_score,
@@ -47,5 +47,5 @@ def speaker_pronunciation_assesment(audio_path):
      return result

  if __name__ == '__main__':
-     print(__naem__)
+     pass

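As a usage note, one way the (currently empty) `__main__` block could exercise the pipeline end to end (a sketch; `sample.wav` is a placeholder path, the batch dimension is squeezed before indexing, and the score order follows the assignments inside `speaker_pronunciation_assesment`):

    # Hypothetical driver code for the updated script.
    if __name__ == '__main__':
        input_features = load_audio('sample.wav', processor).to(device)
        scores = get_emissions(input_features, model).squeeze(0).cpu()  # -> shape (num_labels,)
        # 0 = pronunciation accuracy, 1 = fluency, 2 = total score, 3 = content
        print({'pronunciation_accuracy': round(scores[0].item()),
               'fluency': round(scores[1].item()),
               'total_score': round(scores[2].item()),
               'content': round(scores[3].item())})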