| |
| |
| """ |
| cha_json.py — 將單一 CLAN .cha 轉成 JSON(強化 %mor/%wor/%gra 對齊) |
| 用法: |
| # CLI |
| python3 cha_json.py --input /path/to/input.cha --output /path/to/output.json |
| |
| 程式化呼叫(供 pipeline 使用): |
| from cha_json import cha_to_json_file, cha_to_dict |
| out_path, data = cha_to_json_file("/path/in.cha", "/path/out.json") |
| data2 = cha_to_dict("/path/in.cha") |
| """ |
|
|
| from __future__ import annotations |
| import re |
| import json |
| import sys |
| import argparse |
| from pathlib import Path |
| from collections import defaultdict |
| from typing import List, Dict, Any, Tuple, Optional |
|
|
| |
# Line prefixes that end a multi-line dependent-tier block: a new speaker
# line, one of the handled dependent tiers, or any "@" header.
TAG_PREFIXES = ("*PAR", "*INV", "%mor:", "%gra:", "%wor:", "@")

# First run of ASCII letters/digits inside a token.
WORD_RE = re.compile(r"[A-Za-z0-9]+")

# Participant field on an "@ID:" header line: |PAR|, |PAR0|, |PAR1|, ...
ID_PAR_RE = re.compile(r"\|PAR\d*\|")

# Main-tier speaker line. The digit suffix is optional so that the plain
# "*PAR:" form is accepted as well as "*PAR0:", "*PAR1:", ... — this keeps
# the pattern consistent with ID_PAR_RE and the "*PAR" entry of TAG_PREFIXES.
UTTER_RE = re.compile(r"^\*(INV|PAR\d*):")
|
|
| |
# Groups of inflected forms that are treated as the same word when
# comparing two canonical word forms.
SYN_SETS = [
    {"be", "am", "is", "are", "was", "were", "been", "being"},
    {"have", "has", "had"},
    {"do", "does", "did", "done", "doing"},
    {"go", "goes", "going", "went", "gone"},
    {"run", "runs", "running", "ran"},
    {"see", "sees", "seeing", "saw", "seen"},
    {"get", "gets", "getting", "got", "gotten"},
    {"drop", "drops", "dropping", "dropped"},
    {"swim", "swims", "swimming", "swam", "swum"},
]


def same_syn(a: str, b: str) -> bool:
    """Return True when both words are non-empty and share one SYN_SETS group."""
    if a and b:
        return any(a in group and b in group for group in SYN_SETS)
    return False
|
|
def canonical(txt: str) -> str:
    """Reduce a token/word to the string used for comparison.

    Only the text before the first ``~``, ``-``, ``&`` or ``|`` is kept; from
    that part the first run of ASCII letters/digits is returned in lower case.

    Returns:
        The lower-cased run, or ``""`` when the kept part contains no ASCII
        letter or digit (for example when *txt* starts with ``&``).
    """
    # maxsplit is passed by keyword: the positional form is deprecated
    # since Python 3.13.
    head = re.split(r"[~\-\&|]", txt, maxsplit=1)[0]
    found = WORD_RE.search(head)
    return found.group(0).lower() if found else ""
|
|
def merge_multiline(block_lines: List[str]) -> str:
    """Join continuation lines onto the dependent-tier line they belong to.

    A line whose first non-blank character is ``%`` and which contains a ``:``
    opens a new entry. Every following non-blank line is appended to that
    entry, separated by one space. Lines seen while no entry is open, and
    blank lines, are emitted unchanged at the point they occur. The ``\\x15``
    control character and trailing newlines are removed from every line.

    Returns:
        The resulting entries joined with ``"\\n"``.
    """
    pieces = []
    pending = None
    for raw_line in block_lines:
        text = raw_line.rstrip("\n").replace("\x15", "")
        stripped = text.strip()
        opens_tier = text.lstrip().startswith("%") and ":" in text
        if opens_tier:
            # Flush the entry collected so far before starting the next one.
            if pending:
                pieces.append(pending)
            pending = text
        elif pending and stripped:
            pending = f"{pending} {stripped}"
        else:
            pieces.append(text)
    if pending:
        pieces.append(pending)
    return "\n".join(pieces)
|
|
# Dependent tiers handled by cha_to_json.
_TIER_TAGS = ("%mor:", "%wor:", "%gra:")

# One "%wor" entry: a word followed by its "<start>_<end>" timing.
_WOR_PAIR_RE = re.compile(r"(\S+)\s+(\d+)_(\d+)")


def _new_utterance() -> Dict[str, Any]:
    """Return an empty utterance record with every output field present."""
    return {
        "tokens": [],
        "word_pos_ids": [],
        "word_grammar_ids": [],
        "word_durations": [],
        "utterance_text": "",
    }


def _collect_tier(lines: List[str], start: int) -> Tuple[List[str], int]:
    """Return the tier at ``lines[start]`` with its continuation lines, plus the next unread index."""
    end = start + 1
    while end < len(lines) and not lines[end].lstrip().startswith(TAG_PREFIXES):
        end += 1
    return lines[start:end], end


def _current_utterance(sent: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the utterance a dependent tier attaches to, or None when there is none yet."""
    if not sent["dialogues"]:
        return None
    turn = sent["dialogues"][-1]
    # Inside one turn all INV utterances precede the PAR ones, so the latest
    # utterance is the last PAR entry when any exists.
    if turn["PAR"]:
        return turn["PAR"][-1]
    if turn["INV"]:
        return turn["INV"][-1]
    return None


def _parse_mor(body: str, pos_map: Dict[str, int]) -> Tuple[List[str], List[int]]:
    """Split a %mor body into word tokens and their POS ids (units without '|' are skipped)."""
    tokens: List[str] = []
    pos_ids: List[int] = []
    for unit in body.split():
        if "|" not in unit:
            continue
        pos, rest = unit.split("|", 1)
        tokens.append(rest.split("|", 1)[0])
        pos_ids.append(pos_map[pos])
    return tokens, pos_ids


def _parse_gra(body: str, gra_map: Dict[str, int]) -> List[List[int]]:
    """Turn ``a|b|REL`` units of a %gra body into ``[a, b, relation_id]`` triples."""
    triples: List[List[int]] = []
    for unit in body.split():
        parts = unit.split("|")
        if len(parts) != 3:
            continue
        first, second, relation = parts
        if first.isdigit() and second.isdigit():
            triples.append([int(first), int(second), gra_map[relation]])
    return triples


def _align_durations(tokens: List[str], wor: List[Tuple[str, int, int]]) -> List[List[Any]]:
    """Pair each token with the duration of the next matching %wor word (0 when none matches)."""
    aligned: List[List[Any]] = []
    cursor = 0  # %wor entries before this index are already consumed
    for tok in tokens:
        c_tok = canonical(tok)
        match = None
        for k in range(cursor, len(wor)):
            c_w = canonical(wor[k][0])
            # str.startswith("") is always True, so the prefix test is only
            # meaningful when both canonical forms are non-empty. Without
            # this guard a %wor entry such as "&-um" (canonical form "")
            # would match any token and take over its duration.
            prefix_related = bool(c_tok and c_w) and (
                c_w.startswith(c_tok) or c_tok.startswith(c_w)
            )
            if c_tok == c_w or prefix_related or same_syn(c_tok, c_w):
                match = wor[k]
                cursor = k + 1
                break
        duration = (match[2] - match[1]) if match else 0
        aligned.append([tok, duration])
    return aligned


def _finalize_session(
    sent: Optional[Dict[str, Any]],
    aphasia_map: Dict[str, int],
    data: List[Dict[str, Any]],
) -> None:
    """Append *sent* to *data* when it holds at least one utterance, defaulting its aphasia type."""
    if sent and sent["dialogues"]:
        if not sent.get("aphasia_type"):
            sent["aphasia_type"] = "UNKNOWN"
            _ = aphasia_map["UNKNOWN"]  # lookup registers the key in the defaultdict
        data.append(sent)


def cha_to_json(lines: List[str]) -> Dict[str, Any]:
    """Convert the lines of a .cha file into a JSON-serialisable structure.

    Each ``@Begin`` ... ``@End`` span becomes one entry of ``"sentences"``.
    Speaker lines (``*INV:`` / ``*PAR...:``) open utterances, which are grouped
    into turns: a new turn starts when an INV line follows a PAR utterance.
    ``%mor``, ``%wor`` and ``%gra`` tiers fill the most recent utterance; a
    tier that appears before any utterance is skipped.

    Args:
        lines: Lines of the .cha file (a trailing newline on each is allowed).

    Returns:
        A dict of the form::

            {
                "sentences": [...],
                "pos_mapping": {...},      # POS tag -> id, ids start at 1
                "grammar_mapping": {...},  # %gra relation -> id, ids start at 1
                "aphasia_types": {...},    # aphasia label -> id, ids start at 0
                "text_all": "..."          # PAR utterance texts joined by newlines
            }
    """
    # Each mapping hands out the next free id the first time a key is looked up.
    pos_map: Dict[str, int] = defaultdict(lambda: len(pos_map) + 1)
    gra_map: Dict[str, int] = defaultdict(lambda: len(gra_map) + 1)
    aphasia_map: Dict[str, int] = defaultdict(lambda: len(aphasia_map))

    data: List[Dict[str, Any]] = []
    sent: Optional[Dict[str, Any]] = None
    # "@PID:" seen while no session is open; applied to the next "@Begin".
    # NOTE(review): CHAT headers appear to place @PID before @Begin — confirm
    # against the corpus files this pipeline reads.
    pending_pid: Optional[str] = None

    i = 0
    total = len(lines)
    while i < total:
        line = lines[i].rstrip("\n")

        # Session start.
        if line.startswith("@Begin"):
            sent = {
                "sentence_id": f"S{len(data)+1}",
                "sentence_pid": pending_pid,
                "aphasia_type": None,
                "dialogues": [],
            }
            pending_pid = None
            i += 1
            continue

        # Session end.
        if line.startswith("@End"):
            _finalize_session(sent, aphasia_map, data)
            sent = None
            i += 1
            continue

        # Session identifier: second tab-separated field of the line.
        if line.startswith("@PID:"):
            parts = line.split("\t")
            if len(parts) > 1:
                pid = parts[1].strip()
                if sent is not None:
                    sent["sentence_pid"] = pid
                else:
                    pending_pid = pid
            i += 1
            continue

        # Participant header. No aphasia label is extracted from the @ID
        # fields here; a participant line records the placeholder "UNKNOWN".
        if sent is not None and line.startswith("@ID:"):
            if ID_PAR_RE.search(line):
                aph = "UNKNOWN"
                _ = aphasia_map[aph]  # lookup registers the key
                sent["aphasia_type"] = aph
            i += 1
            continue

        # Speaker line: open a new (still empty) utterance.
        speaker = UTTER_RE.match(line) if sent is not None else None
        if speaker:
            role = "INV" if speaker.group(1) == "INV" else "PAR"
            turns = sent["dialogues"]
            if not turns or (role == "INV" and turns[-1]["PAR"]):
                turns.append({"INV": [], "PAR": []})
            turns[-1][role].append(_new_utterance())
            i += 1
            continue

        # Dependent tiers, possibly spanning several lines.
        tag = None
        if sent is not None:
            tag = next((t for t in _TIER_TAGS if line.startswith(t)), None)
        if tag is not None:
            block, i = _collect_tier(lines, i)
            target = _current_utterance(sent)
            if target is None:
                # Tier without a preceding speaker line: nothing to attach to.
                continue
            body = merge_multiline(block).replace(tag, "").strip()
            if tag == "%mor:":
                tokens, pos_ids = _parse_mor(body, pos_map)
                target["tokens"], target["word_pos_ids"] = tokens, pos_ids
                # The utterance text is rebuilt from the %mor word forms.
                target["utterance_text"] = " ".join(tokens).strip()
            elif tag == "%wor:":
                wor = [
                    (word, int(start), int(end))
                    for word, start, end in _WOR_PAIR_RE.findall(body)
                ]
                # Uses the tokens stored by an earlier %mor tier, if any.
                target["word_durations"] = _align_durations(target.get("tokens", []), wor)
            else:
                target["word_grammar_ids"] = _parse_gra(body, gra_map)
            continue

        # Any other line is ignored.
        i += 1

    # Input that ends without "@End" still yields its last session.
    _finalize_session(sent, aphasia_map, data)

    # Merged PAR text for downstream consumers.
    par_texts: List[str] = []
    for session in data:
        for turn in session.get("dialogues", []):
            for par_ut in turn.get("PAR", []):
                if par_ut.get("utterance_text"):
                    par_texts.append(par_ut["utterance_text"])
    text_all = "\n".join(par_texts).strip()

    return {
        "sentences": data,
        "pos_mapping": dict(pos_map),
        "grammar_mapping": dict(gra_map),
        "aphasia_types": dict(aphasia_map),
        "text_all": text_all,
    }
|
|
| |
def cha_to_dict(cha_path: str) -> Dict[str, Any]:
    """Read a .cha file and return the parsed dict without writing anything.

    Raises:
        FileNotFoundError: If *cha_path* does not exist.
    """
    source = Path(cha_path)
    if source.exists():
        with source.open("r", encoding="utf-8") as handle:
            raw_lines = list(handle)
        return cha_to_json(raw_lines)
    raise FileNotFoundError(f"找不到檔案: {cha_path}")
|
|
def cha_to_json_file(cha_path: str, output_json: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
    """Convert a .cha file to JSON and write the result to disk.

    Args:
        cha_path: Path of the input .cha file.
        output_json: Destination path. When empty or None, the input path
            with its suffix replaced by ``.json`` is used.

    Returns:
        ``(output_json_path, data_dict)``.
    """
    data = cha_to_dict(cha_path)

    if output_json:
        destination = Path(output_json)
    else:
        destination = Path(cha_path).with_suffix(".json")

    # Create missing parent directories before writing.
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, ensure_ascii=False, indent=4)
    destination.write_text(serialized, encoding="utf-8")
    return str(destination), data
|
|
| |
def parse_args():
    """Parse the command line: ``--input/-i`` and ``--output/-o``, both required."""
    parser = argparse.ArgumentParser()
    options = (
        ("--input", "-i", "輸入 .cha 檔"),
        ("--output", "-o", "輸出 .json 檔"),
    )
    for long_flag, short_flag, help_text in options:
        parser.add_argument(long_flag, short_flag, type=str, required=True, help=help_text)
    return parser.parse_args()
|
|
def cha_to_json_path(cha_path: str, output_json: str | None = None) -> str:
    """Backward-compatible alias: convert and write, returning only the output path."""
    return cha_to_json_file(cha_path, output_json=output_json)[0]
|
|
def main():
    """CLI entry point: convert the ``--input`` .cha file and write JSON to ``--output``.

    Exits with an error message when the input file does not exist.
    """
    args = parse_args()
    src, dst = Path(args.input), Path(args.output)

    if not src.exists():
        sys.exit(f"❌ 找不到檔案: {src}")

    with src.open("r", encoding="utf-8") as reader:
        raw_lines = list(reader)
    dataset = cha_to_json(raw_lines)

    # Create missing parent directories before writing.
    dst.parent.mkdir(parents=True, exist_ok=True)
    with dst.open("w", encoding="utf-8") as writer:
        json.dump(dataset, writer, ensure_ascii=False, indent=4)

    sentence_count = len(dataset["sentences"])
    pos_count = len(dataset["pos_mapping"])
    gra_count = len(dataset["grammar_mapping"])
    type_keys = list(dataset["aphasia_types"])
    print(
        f"✅ 轉換完成 → {dst}(句數 {sentence_count},"
        f"pos={pos_count},gra={gra_count},"
        f"類型鍵={type_keys})"
    )


if __name__ == "__main__":
    main()
|
|