HK0712 committed
Commit 8b00763 · 1 Parent(s): f254355

TEST: added timestamps (may not be correct)

Files changed (1)
  1. analyzer/ASR_en_us.py +224 -126
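
For orientation before the diff: the commit threads per-phoneme and per-word times through the existing result structure. Below is a minimal sketch of the returned shape with illustrative values only; the field names are taken from the code in this commit, while the top-level summary block is left out because the diff does not show its keys:

result = {
    "words": [
        {
            "word": "hello",           # from the target sentence
            "isCorrect": True,
            "startTime": 0.12,         # seconds; None when no offset could be assigned
            "endTime": 0.48,
            "phonemes": [
                {"target": "h", "user": "h", "isMatch": True, "startTime": 0.12, "endTime": 0.16},
                {"target": "ə", "user": "-", "isMatch": False, "startTime": None, "endTime": None},
            ],
        },
    ],
    "analysisTimestampUTC": "2025-01-01 00:00:00 (UTC)",
}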
analyzer/ASR_en_us.py CHANGED
@@ -1,60 +1,73 @@
- # ASR_en_us.py
-
  import torch
  import soundfile as sf
  import librosa
- from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
  import os
- from phonemizer import phonemize
  import numpy as np
  from datetime import datetime, timezone

- # ===== Added code #1: auto-detect the available device =====
  DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
- print(f"INFO: ASR_fr_fr.py is configured to use device: {DEVICE}")

- # --- 1. Global settings and model-loading function (unchanged) ---
  MODEL_NAME = "MultiBridge/wav2vec-LnNor-IPA-ft"
-
  processor = None
  model = None

  def load_model():
      """
-     (Option A) Let transformers handle the model download, caching, and loading automatically.
-     It automatically uses the HF_HOME environment variable set in the Dockerfile.
      """
-     global processor, model
      if processor and model:
          print(f"Model '{MODEL_NAME}' is already loaded; skipping.")
          return True

      print(f"Preparing ASR model '{MODEL_NAME}'...")
-     print("Transformers will automatically look in (or download to) the cache specified by HF_HOME.")
      try:
-         # Call from_pretrained with the model's hub name directly
-         # This is where the magic happens!
          processor = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
          model = Wav2Vec2ForCTC.from_pretrained(MODEL_NAME)
-
          model.to(DEVICE)
          print(f"Model '{MODEL_NAME}' and processor loaded successfully!")
          return True
      except Exception as e:
-         print(f"Error while loading model '{MODEL_NAME}': {e}")
          raise RuntimeError(f"Failed to load model '{MODEL_NAME}': {e}")

- # --- 2. Smart IPA tokenization function (updated) ---
- # Combinations containing 'ː' were removed, since we now strip it at the source
  MULTI_CHAR_PHONEMES = {
-     'tʃ', 'dʒ',                    # Affricates
-     'eɪ', 'aɪ', 'oʊ', 'aʊ', 'ɔɪ',  # Diphthongs
-     'ɪə', 'eə', 'ʊə', 'ər'         # R-controlled and other combinations
  }

  def _tokenize_ipa(ipa_string: str) -> list:
      """
-     Smartly tokenize an IPA string into a list of phonemes, correctly handling multi-character phonemes.
      """
      phonemes = []
      i = 0
@@ -68,132 +81,115 @@ def _tokenize_ipa(ipa_string: str) -> list:
          i += 1
      return phonemes

- # --- 3. Core analysis function (main entry point) (modified) ---
  def analyze(audio_file_path: str, target_sentence: str) -> dict:
      """
      Takes an audio file path and a target sentence; returns a detailed pronunciation-analysis dict.
-     This is the module's main entry point.
      """
      if not processor or not model:
-         raise RuntimeError("Model not loaded. Make sure load_model() has run successfully before calling analyze().")
-
-     # =========================================================================
-     # Step 1: run the existing, unmodified analysis pipeline
-     # =========================================================================
-     target_ipa_by_word_str = phonemize(target_sentence, language='en-us', backend='espeak', with_stress=True, strip=True).split()
      target_ipa_by_word = [
          _tokenize_ipa(word.replace('ˌ', '').replace('ˈ', '').replace('ː', ''))
          for word in target_ipa_by_word_str
      ]
      target_words_original = target_sentence.split()

      try:
          speech, sample_rate = sf.read(audio_file_path)
          if sample_rate != 16000:
              speech = librosa.resample(y=speech, orig_sr=sample_rate, target_sr=16000)
      except Exception as e:
          raise IOError(f"Error while reading or processing the audio: {e}")
-
-     input_values = processor(speech, sampling_rate=16000, return_tensors="pt").input_values
-     input_values = input_values.to(DEVICE)
      with torch.no_grad():
          logits = model(input_values).logits
-     predicted_ids = torch.argmax(logits, dim=-1)
-
-     # This is the original flow: first get the recognition result without timestamps
-     user_ipa_full = processor.decode(predicted_ids[0])

-     # Use the original alignment function
      word_alignments = _get_phoneme_alignments_by_word(user_ipa_full, target_ipa_by_word)
-
-     # Use the original formatting function to produce the initial, timestamp-free result
-     initial_result = _format_to_json_structure(word_alignments, target_sentence, target_words_original)
-
-
-     # =========================================================================
-     # Step 2: get the recognition result with timestamps, for the later "injection"
-     # =========================================================================
-     transcription_with_offsets = processor.batch_decode(
-         predicted_ids,
-         output_char_offsets=True
-     )
-
-     char_offsets = transcription_with_offsets.char_offsets[0]
-
-     # Build a dict mapping each recognized phoneme to its timestamps.
-     # Keys are phonemes; values are lists of timestamp objects (the same phoneme may occur several times).
-     phoneme_to_ts_map = {}
-     for offset in char_offsets:
-         phoneme = offset['char']
-         if phoneme not in phoneme_to_ts_map:
-             phoneme_to_ts_map[phoneme] = []
-
-         phoneme_to_ts_map[phoneme].append({
-             "start_time": round(offset['start_offset'] * model.config.inputs_to_logits_ratio / 16000.0, 2),
-             "end_time": round(offset['end_offset'] * model.config.inputs_to_logits_ratio / 16000.0, 2)
          })

-     # =========================================================================
-     # Step 3: "inject" the timestamps into the initial result to produce the final output
-     # =========================================================================
-     # Copy the mapping so used timestamps can safely be removed while iterating
-     ts_map_copy = {k: v[:] for k, v in phoneme_to_ts_map.items()}
-
-     for word_data in initial_result["words"]:
-         word_start_time = None
-         word_end_time = None
-
-         for phoneme_data in word_data["phonemes"]:
-             user_phoneme = phoneme_data["user"]
-
-             # Default the timestamps to null
-             phoneme_data["startTime"] = None
-             phoneme_data["endTime"] = None
-
-             # If the user phoneme is not '-' and can be found in the timestamp map
-             if user_phoneme != '-' and user_phoneme in ts_map_copy and ts_map_copy[user_phoneme]:
-                 # Take and remove the first available timestamp (FIFO)
-                 ts = ts_map_copy[user_phoneme].pop(0)
-
-                 # Inject the timestamp into the phoneme
-                 phoneme_data["startTime"] = ts["start_time"]
-                 phoneme_data["endTime"] = ts["end_time"]
-
-                 # Update the word-level timestamps
-                 if word_start_time is None:
-                     word_start_time = ts["start_time"]
-                 word_end_time = ts["end_time"]  # keep updating to the last phoneme's end time
-
-         # Inject the word-level timestamps
-         word_data["startTime"] = word_start_time
-         word_data["endTime"] = word_end_time
-
-     return initial_result
-
-
-
- # --- 4. Alignment function (same as the previous version) ---
  def _get_phoneme_alignments_by_word(user_phoneme_str, target_words_ipa_tokenized):
      """
-     (Modified) Performs phoneme alignment using the new tokenization logic.
      """
      user_phonemes = _tokenize_ipa(user_phoneme_str)
-
      target_phonemes_flat = []
-     word_boundaries_indices = []
      current_idx = 0
      for word_ipa_tokens in target_words_ipa_tokenized:
          target_phonemes_flat.extend(word_ipa_tokens)
          current_idx += len(word_ipa_tokens)
          word_boundaries_indices.append(current_idx - 1)

      dp = np.zeros((len(user_phonemes) + 1, len(target_phonemes_flat) + 1))
      for i in range(1, len(user_phonemes) + 1): dp[i][0] = i
      for j in range(1, len(target_phonemes_flat) + 1): dp[0][j] = j
      for i in range(1, len(user_phonemes) + 1):
          for j in range(1, len(target_phonemes_flat) + 1):
              cost = 0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1
-             dp[i][j] = min(dp[i-1][j] + 1, dp[i][j-1] + 1, dp[i-1][j-1] + cost)

      i, j = len(user_phonemes), len(target_phonemes_flat)
      user_path, target_path = [], []
@@ -205,73 +201,60 @@ def _get_phoneme_alignments_by_word(user_phoneme_str, target_words_ipa_tokenized):
              user_path.insert(0, user_phonemes[i-1]); target_path.insert(0, '-'); i -= 1
          else:
              user_path.insert(0, '-'); target_path.insert(0, target_phonemes_flat[j-1]); j -= 1
-
      alignments_by_word = []
      word_start_idx_in_path = 0
      target_phoneme_counter_in_path = 0
-
      for path_idx, p in enumerate(target_path):
          if p != '-':
              if target_phoneme_counter_in_path in word_boundaries_indices:
                  target_alignment = target_path[word_start_idx_in_path : path_idx + 1]
-                 user_alignment = user_path[word_start_idx_in_path : path_idx + 1]
-
                  alignments_by_word.append({
                      "target": target_alignment,
                      "user": user_alignment
                  })
-
                  word_start_idx_in_path = path_idx + 1
-
          target_phoneme_counter_in_path += 1
-
      return alignments_by_word

- # --- 5. Formatting function (same as the previous version) ---
  def _format_to_json_structure(alignments, sentence, original_words) -> dict:
      total_phonemes = 0
      total_errors = 0
      correct_words_count = 0
      words_data = []
-
      num_words_to_process = min(len(alignments), len(original_words))

      for i in range(num_words_to_process):
          alignment = alignments[i]
          word_is_correct = True
          phonemes_data = []
-
          for j in range(len(alignment['target'])):
              target_phoneme = alignment['target'][j]
              user_phoneme = alignment['user'][j]
              is_match = (user_phoneme == target_phoneme)
-
              phonemes_data.append({
                  "target": target_phoneme,
                  "user": user_phoneme,
                  "isMatch": is_match
              })
-
              if not is_match:
                  word_is_correct = False
                  if not (user_phoneme == '-' and target_phoneme == '-'):
                      total_errors += 1
-
          if word_is_correct:
              correct_words_count += 1
-
          words_data.append({
              "word": original_words[i],
              "isCorrect": word_is_correct,
              "phonemes": phonemes_data
          })
-
          total_phonemes += sum(1 for p in alignment['target'] if p != '-')

      total_words = len(original_words)
      if len(alignments) < total_words:
          for i in range(len(alignments), total_words):
-             # Make sure 'ː' is removed here as well
              missed_word_ipa_str = phonemize(original_words[i], language='en-us', backend='espeak', strip=True).replace('ː', '')
              missed_word_ipa = _tokenize_ipa(missed_word_ipa_str)
              phonemes_data = []
@@ -279,7 +262,6 @@ def _format_to_json_structure(alignments, sentence, original_words) -> dict:
              phonemes_data.append({"target": p_ipa, "user": "-", "isMatch": False})
              total_errors += 1
              total_phonemes += 1
-
          words_data.append({
              "word": original_words[i],
              "isCorrect": False,
@@ -302,5 +284,121 @@ def _format_to_json_structure(alignments, sentence, original_words) -> dict:
          },
          "words": words_data
      }
-
      return final_result

+ # ASR_en_us.py (fixed, drop-in replacement)
  import torch
  import soundfile as sf
  import librosa
  import os
  import numpy as np
  from datetime import datetime, timezone

+ from phonemizer import phonemize
+ from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
+
+ # Optional: LM-assisted decoder (preferred for robust offsets)
+ try:
+     from transformers import Wav2Vec2ProcessorWithLM
+     HAS_WITH_LM = True
+ except Exception:
+     HAS_WITH_LM = False
+
+ # ---------- Device ----------
  DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
+ print(f"INFO: ASR_en_us.py is configured to use device: {DEVICE}")

+ # ---------- Globals & model ----------
  MODEL_NAME = "MultiBridge/wav2vec-LnNor-IPA-ft"
  processor = None
+ processor_lm = None
  model = None

  def load_model():
      """
+     Load the model and processors:
+     - load the standard Processor + model first
+     - if available, also load the LM processor to get more stable offsets
      """
+     global processor, processor_lm, model
+
      if processor and model:
          print(f"Model '{MODEL_NAME}' is already loaded; skipping.")
          return True

      print(f"Preparing ASR model '{MODEL_NAME}'...")
      try:
          processor = Wav2Vec2Processor.from_pretrained(MODEL_NAME)
          model = Wav2Vec2ForCTC.from_pretrained(MODEL_NAME)
          model.to(DEVICE)
+
+         if HAS_WITH_LM:
+             try:
+                 processor_lm = Wav2Vec2ProcessorWithLM.from_pretrained(MODEL_NAME)
+                 print("LM decoder loaded: logits + LM will be preferred for offsets.")
+             except Exception as e:
+                 processor_lm = None
+                 print(f"LM decoder unavailable ({e}); falling back to standard decoding.")
+
          print(f"Model '{MODEL_NAME}' and processor loaded successfully!")
          return True
      except Exception as e:
+         print(f"Error while loading the model/processor: {e}")
          raise RuntimeError(f"Failed to load model '{MODEL_NAME}': {e}")

+ # ---------- IPA multi-char handling ----------
  MULTI_CHAR_PHONEMES = {
+     'tʃ', 'dʒ',                    # Affricates
+     'eɪ', 'aɪ', 'oʊ', 'aʊ', 'ɔɪ',  # Diphthongs
+     'ɪə', 'eə', 'ʊə', 'ər'         # R-controlled & others
  }

  def _tokenize_ipa(ipa_string: str) -> list:
      """
+     Smartly tokenize an IPA string into a list of phonemes, handling multi-character phonemes.
      """
      phonemes = []
      i = 0
          i += 1
      return phonemes

+ # ---------- Core analyze ----------
  def analyze(audio_file_path: str, target_sentence: str) -> dict:
      """
      Takes an audio file path and a target sentence; returns a detailed pronunciation-analysis dict.
+     Fixes: derive offsets from the logits to preserve CTC timing; inject in order;
+     aggregate multi-character phonemes; write word-level times back.
      """
      if not processor or not model:
+         raise RuntimeError("Model not loaded. Call load_model() first.")
+
+     # 1) Parse the target IPA
+     target_ipa_by_word_str = phonemize(
+         target_sentence,
+         language='en-us',
+         backend='espeak',
+         with_stress=True,
+         strip=True
+     ).split()
+
+     # Strip stress and length marks
      target_ipa_by_word = [
          _tokenize_ipa(word.replace('ˌ', '').replace('ˈ', '').replace('ː', ''))
          for word in target_ipa_by_word_str
      ]
      target_words_original = target_sentence.split()

+     # 2) Read and resample
      try:
          speech, sample_rate = sf.read(audio_file_path)
          if sample_rate != 16000:
              speech = librosa.resample(y=speech, orig_sr=sample_rate, target_sr=16000)
+             sample_rate = 16000
      except Exception as e:
          raise IOError(f"Error while reading or processing the audio: {e}")
+
+     # 3) Preprocessing & model inference
+     inputs = processor(speech, sampling_rate=sample_rate, return_tensors="pt")
+     input_values = inputs.input_values.to(DEVICE)
+
      with torch.no_grad():
          logits = model(input_values).logits
+     pred_ids = torch.argmax(logits, dim=-1)

+     # User IPA (without timestamps) + alignment
+     user_ipa_full = processor.decode(pred_ids[0])
      word_alignments = _get_phoneme_alignments_by_word(user_ipa_full, target_ipa_by_word)
+     result = _format_to_json_structure(word_alignments, target_sentence, target_words_original)
+
+     # 4) Get offsets (prefer logits + LM, otherwise fall back)
+     char_offsets = None
+     if processor_lm is not None:
+         try:
+             lm_out = processor_lm.batch_decode(logits.cpu().numpy())
+             if hasattr(lm_out, "char_offsets") and lm_out.char_offsets:
+                 char_offsets = lm_out.char_offsets[0]
+         except Exception as e:
+             print(f"LM offset decoding failed; falling back to standard decoding. Reason: {e}")
+
+     if char_offsets is None:
+         transcription_with_offsets = processor.batch_decode(
+             pred_ids,
+             output_char_offsets=True
+         )
+         char_offsets = transcription_with_offsets.char_offsets[0] if hasattr(transcription_with_offsets, "char_offsets") else []
+
+     # 5) Convert offsets to seconds and inject them in order
+     step_sec = model.config.inputs_to_logits_ratio / float(sample_rate)  # e.g. 320/16000 = 0.02 s
+     ts_seq = []
+     for off in char_offsets:
+         s = round(off['start_offset'] * step_sec, 3) if off.get('start_offset') is not None else None
+         e = round(off['end_offset'] * step_sec, 3) if off.get('end_offset') is not None else None
+         ts_seq.append({
+             "char": off.get('char', ''),
+             "start": s,
+             "end": e
          })

+     _inject_timestamps_in_order(result, ts_seq)
+
+     # 6) Add the analysis timestamp
+     result["analysisTimestampUTC"] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S (UTC)')
+     return result
+
+ # ---------- Alignment ----------
  def _get_phoneme_alignments_by_word(user_phoneme_str, target_words_ipa_tokenized):
      """
+     Performs phoneme alignment using the new tokenization logic: outputs per-word user/target alignment paths.
      """
      user_phonemes = _tokenize_ipa(user_phoneme_str)
      target_phonemes_flat = []
+     word_boundaries_indices = []
      current_idx = 0
      for word_ipa_tokens in target_words_ipa_tokenized:
          target_phonemes_flat.extend(word_ipa_tokens)
          current_idx += len(word_ipa_tokens)
          word_boundaries_indices.append(current_idx - 1)

+     # DP table for edit distance
      dp = np.zeros((len(user_phonemes) + 1, len(target_phonemes_flat) + 1))
      for i in range(1, len(user_phonemes) + 1): dp[i][0] = i
      for j in range(1, len(target_phonemes_flat) + 1): dp[0][j] = j
+
      for i in range(1, len(user_phonemes) + 1):
          for j in range(1, len(target_phonemes_flat) + 1):
              cost = 0 if user_phonemes[i-1] == target_phonemes_flat[j-1] else 1
+             dp[i][j] = min(
+                 dp[i-1][j] + 1,
+                 dp[i][j-1] + 1,
+                 dp[i-1][j-1] + cost
+             )

      i, j = len(user_phonemes), len(target_phonemes_flat)
      user_path, target_path = [], []

              user_path.insert(0, user_phonemes[i-1]); target_path.insert(0, '-'); i -= 1
          else:
              user_path.insert(0, '-'); target_path.insert(0, target_phonemes_flat[j-1]); j -= 1
+
      alignments_by_word = []
      word_start_idx_in_path = 0
      target_phoneme_counter_in_path = 0
      for path_idx, p in enumerate(target_path):
          if p != '-':
              if target_phoneme_counter_in_path in word_boundaries_indices:
                  target_alignment = target_path[word_start_idx_in_path : path_idx + 1]
+                 user_alignment = user_path[word_start_idx_in_path : path_idx + 1]
                  alignments_by_word.append({
                      "target": target_alignment,
                      "user": user_alignment
                  })
                  word_start_idx_in_path = path_idx + 1
          target_phoneme_counter_in_path += 1
      return alignments_by_word

+ # ---------- Formatting ----------
  def _format_to_json_structure(alignments, sentence, original_words) -> dict:
      total_phonemes = 0
      total_errors = 0
      correct_words_count = 0
      words_data = []
      num_words_to_process = min(len(alignments), len(original_words))

      for i in range(num_words_to_process):
          alignment = alignments[i]
          word_is_correct = True
          phonemes_data = []
          for j in range(len(alignment['target'])):
              target_phoneme = alignment['target'][j]
              user_phoneme = alignment['user'][j]
              is_match = (user_phoneme == target_phoneme)
              phonemes_data.append({
                  "target": target_phoneme,
                  "user": user_phoneme,
                  "isMatch": is_match
              })
              if not is_match:
                  word_is_correct = False
                  if not (user_phoneme == '-' and target_phoneme == '-'):
                      total_errors += 1
          if word_is_correct:
              correct_words_count += 1
          words_data.append({
              "word": original_words[i],
              "isCorrect": word_is_correct,
              "phonemes": phonemes_data
          })
          total_phonemes += sum(1 for p in alignment['target'] if p != '-')

      total_words = len(original_words)
      if len(alignments) < total_words:
          for i in range(len(alignments), total_words):
              missed_word_ipa_str = phonemize(original_words[i], language='en-us', backend='espeak', strip=True).replace('ː', '')
              missed_word_ipa = _tokenize_ipa(missed_word_ipa_str)
              phonemes_data = []

              phonemes_data.append({"target": p_ipa, "user": "-", "isMatch": False})
              total_errors += 1
              total_phonemes += 1
          words_data.append({
              "word": original_words[i],
              "isCorrect": False,

          },
          "words": words_data
      }
      return final_result
+
+ # ---------- Timestamp injection (new core) ----------
+ def _inject_timestamps_in_order(result_dict: dict, ts_seq: list):
+     """
+     Inject timestamps into each phoneme and word strictly in order:
+     - no string-keyed mapping, so a symbol that occurs several times cannot be misassigned
+     - multi-character IPA phonemes aggregate the time bounds of their adjacent chars
+     - word-level start/end are written back, with basic consistency checks
+     """
+     # Consume the char offsets sequentially
+     k = 0  # index into ts_seq
+     total_ts = len(ts_seq)
+
+     for word in result_dict["words"]:
+         word_start = None
+         word_end = None
+
+         for p in word["phonemes"]:
+             p_user = p.get("user", "-")
+             # Defaults
+             p["startTime"] = None
+             p["endTime"] = None
+
+             if p_user == "-" or k >= total_ts:
+                 continue
+
+             # There may be blanks, delimiters, etc.: skip invalid chars
+             while k < total_ts and (ts_seq[k]["char"] is None or ts_seq[k]["char"] == ""):
+                 k += 1
+             if k >= total_ts:
+                 break
+
+             # Exact match: the next char equals the whole phoneme
+             if ts_seq[k]["char"] == p_user:
+                 s = ts_seq[k]["start"]; e = ts_seq[k]["end"]
+                 if _valid_ts_pair(s, e):
+                     p["startTime"] = s; p["endTime"] = e
+                     word_start = s if word_start is None else word_start
+                     word_end = e
+                 k += 1
+                 continue
+
+             # Multi-character phoneme: try to aggregate adjacent chars
+             if len(p_user) > 1:
+                 agg_start = None
+                 agg_end = None
+                 consumed = 0
+                 buffer = ""
+
+                 while (k + consumed) < total_ts and len(buffer) < len(p_user):
+                     cur_char = ts_seq[k + consumed]["char"] or ""
+                     buffer += cur_char
+                     ts_s = ts_seq[k + consumed]["start"]
+                     ts_e = ts_seq[k + consumed]["end"]
+                     if ts_s is not None:
+                         agg_start = ts_s if agg_start is None else min(agg_start, ts_s)
+                     if ts_e is not None:
+                         agg_end = ts_e if agg_end is None else max(agg_end, ts_e)
+                     consumed += 1
+                     if buffer == p_user:
+                         if _valid_ts_pair(agg_start, agg_end):
+                             p["startTime"] = agg_start
+                             p["endTime"] = agg_end
+                             word_start = agg_start if word_start is None else word_start
+                             word_end = agg_end
+                         k += consumed
+                         break
+                 # If aggregation fails, do not consume ts_seq; leave None
+
+             # Single char that does not match: avoid misalignment by not consuming ts_seq; leave None
+
+         # Write the word-level times back (bounded by the word's first/last timestamped phoneme)
+         word["startTime"] = word_start
+         word["endTime"] = word_end
+
+     # Post-check: global time monotonicity & non-overlapping phonemes
+     _sanitize_monotonic_and_nonoverlap(result_dict)
+
+ def _valid_ts_pair(s, e):
+     return (s is not None) and (e is not None) and (s <= e)
+
+ def _sanitize_monotonic_and_nonoverlap(result_dict: dict):
+     """
+     Ensure phoneme times never move backwards or overlap (touching boundaries
+     are allowed), clamp them to non-negative values, and round to three
+     decimal places.
+     """
+     last_end = None
+     for w in result_dict.get("words", []):
+         w_start = None
+         w_end = None
+         for p in w.get("phonemes", []):
+             s = p.get("startTime", None)
+             e = p.get("endTime", None)
+             if s is None or e is None:
+                 continue
+             # No overlap: if s < last_end, clamp s up to last_end
+             if last_end is not None and s < last_end:
+                 s = last_end
+             # Non-negative and monotonic
+             if s < 0:
+                 s = 0.0
+             if e < s:
+                 e = s
+             # Round to 3 decimal places
+             p["startTime"] = round(float(s), 3)
+             p["endTime"] = round(float(e), 3)
+             last_end = p["endTime"]
+
+             # Update the word-level bounds
+             w_start = p["startTime"] if w_start is None else w_start
+             w_end = p["endTime"]
+
+         # Write the word-level times back
+         w["startTime"] = w_start if w_start is not None else w.get("startTime", None)
+         w["endTime"] = w_end if w_end is not None else w.get("endTime", None)
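
A minimal usage sketch of the module after this commit, assuming it is importable as analyzer.ASR_en_us; the wav path and sentence are placeholders:

# Hypothetical driver; audio is resampled to 16 kHz internally if needed.
from analyzer import ASR_en_us

ASR_en_us.load_model()  # must succeed before analyze() is called
result = ASR_en_us.analyze("sample.wav", "hello world")

for word in result["words"]:
    print(word["word"], word["isCorrect"], word["startTime"], word["endTime"])

On the time base: char offsets are counted in CTC frames, and one frame spans model.config.inputs_to_logits_ratio / 16000 seconds. With the common wav2vec2 ratio of 320 that is 0.02 s per frame, so an offset of 25 frames lands at 0.5 s.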