Aphasia_Classification / cha_json.py
Ellie5757575757's picture
Update cha_json.py
cda796a verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
cha_json.py — 將單一 CLAN .cha 轉成 JSON(強化 %mor/%wor/%gra 對齊)
用法:
# CLI
python3 cha_json.py --input /path/to/input.cha --output /path/to/output.json
程式化呼叫(供 pipeline 使用):
from cha_json import cha_to_json_file, cha_to_dict
out_path, data = cha_to_json_file("/path/in.cha", "/path/out.json")
data2 = cha_to_dict("/path/in.cha")
"""
from __future__ import annotations
import re
import json
import sys
import argparse
from pathlib import Path
from collections import defaultdict
from typing import List, Dict, Any, Tuple, Optional
# 可接受的跨行停止條件(用於 %mor/%wor/%gra 合併)
TAG_PREFIXES = ("*PAR", "*INV", "%mor:", "%gra:", "%wor:", "@")
WORD_RE = re.compile(r"[A-Za-z0-9]+")
# 病人角色:PAR / PAR0 / PAR1 / ...
ID_PAR_RE = re.compile(r"\|PAR\d*\|")
# 對話行:*INV: 或 *PAR0: / *PAR1: / ...
UTTER_RE = re.compile(r"^\*(INV|PAR\d+):")
# ────────── 同義集合(對齊時容忍形態變化) ──────────
SYN_SETS = [
{"be", "am", "is", "are", "was", "were", "been", "being"},
{"have", "has", "had"},
{"do", "does", "did", "done", "doing"},
{"go", "goes", "going", "went", "gone"},
{"run", "runs", "running", "ran"},
{"see", "sees", "seeing", "saw", "seen"},
{"get", "gets", "getting", "got", "gotten"},
{"drop", "drops", "dropping", "dropped"},
{"swim", "swims", "swimming", "swam", "swum"},
]
def same_syn(a: str, b: str) -> bool:
if not a or not b:
return False
for s in SYN_SETS:
if a in s and b in s:
return True
return False
def canonical(txt: str) -> str:
"""token/word → 比對用字串:去掉 & ~ - | 之後的非字母數字、轉小寫"""
head = re.split(r"[~\-\&|]", txt, 1)[0]
m = WORD_RE.search(head)
return m.group(0).lower() if m else ""
def merge_multiline(block_lines: List[str]) -> str:
"""
合併跨行的 %mor/%wor/%gra。
規則:以 '%' 開頭者作為起始,往下串,遇到新標籤或 @ 開頭就停。
"""
merged, buf = [], None
for raw in block_lines:
ln = raw.rstrip("\n").replace("\x15", "") # 去掉 CLAN 控制字
if ln.lstrip().startswith("%") and ":" in ln:
if buf:
merged.append(buf)
buf = ln
else:
if buf and ln.strip():
buf += " " + ln.strip()
else:
merged.append(ln)
if buf:
merged.append(buf)
return "\n".join(merged)
def cha_to_json(lines: List[str]) -> Dict[str, Any]:
"""
將 .cha 檔行列表轉 JSON 結構。
回傳格式:
{
"sentences": [...],
"pos_mapping": {...},
"grammar_mapping": {...},
"aphasia_types": {...},
"text_all": "..." # 方便下游模型使用的 PAR 合併文字
}
"""
# 對應表(pos / gra 從 1 起算;aphasia 類型 0 起)
pos_map: Dict[str, int] = defaultdict(lambda: len(pos_map) + 1)
gra_map: Dict[str, int] = defaultdict(lambda: len(gra_map) + 1)
aphasia_map: Dict[str, int] = defaultdict(lambda: len(aphasia_map))
data: List[Dict[str, Any]] = []
sent: Optional[Dict[str, Any]] = None
i = 0
while i < len(lines):
line = lines[i].rstrip("\n")
# 啟段
if line.startswith("@Begin"):
sent = {
"sentence_id": f"S{len(data)+1}",
"sentence_pid": None,
"aphasia_type": None, # 若最後仍沒有,就標 UNKNOWN
"dialogues": [] # [ { "INV": [...], "PAR": [...] }, ... ]
}
i += 1
continue
# 結束
if line.startswith("@End"):
if sent and sent["dialogues"]:
if not sent.get("aphasia_type"):
sent["aphasia_type"] = "UNKNOWN"
aphasia_map["UNKNOWN"]
data.append(sent)
sent = None
i += 1
continue
# 句子屬性
if sent and line.startswith("@PID:"):
parts = line.split("\t")
if len(parts) > 1:
sent["sentence_pid"] = parts[1].strip()
i += 1
continue
if sent and line.startswith("@ID:"):
# 是否為病人那位 PAR*
if ID_PAR_RE.search(line):
aph = "UNKNOWN"
# 如果 @ID 有標註失語類型,可在此使用 regex 抓出來並替換 aph
# m = re.search(r"WAB:([A-Za-z]+)", line)
# if m: aph = m.group(1)
aph = aph.upper()
aphasia_map[aph] # 建立 map(自動編號)
sent["aphasia_type"] = aph
i += 1
continue
# 對話行:*INV: 或 *PARx:
if sent and UTTER_RE.match(line):
role_tag = UTTER_RE.match(line).group(1)
role = "INV" if role_tag == "INV" else "PAR"
if not sent["dialogues"]:
sent["dialogues"].append({"INV": [], "PAR": []})
# 新輪對話:若來的是 INV 且上一輪已有 PAR,視為下一輪
if role == "INV" and sent["dialogues"][-1]["PAR"]:
sent["dialogues"].append({"INV": [], "PAR": []})
# 新增一個空 turn(之後 %mor/%wor/%gra 會補)
sent["dialogues"][-1][role].append(
{"tokens": [], "word_pos_ids": [], "word_grammar_ids": [], "word_durations": [], "utterance_text": ""}
)
i += 1
continue
# %mor
if sent and line.startswith("%mor:"):
blk = [line]; i += 1
while i < len(lines) and not lines[i].lstrip().startswith(TAG_PREFIXES):
blk.append(lines[i]); i += 1
units = merge_multiline(blk).replace("%mor:", "").strip().split()
toks, pos_ids = [], []
for u in units:
if "|" in u:
pos, rest = u.split("|", 1)
word = rest.split("|", 1)[0]
toks.append(word)
pos_ids.append(pos_map[pos])
dlg = sent["dialogues"][-1]
tgt = dlg["PAR"][-1] if dlg["PAR"] else dlg["INV"][-1]
tgt["tokens"], tgt["word_pos_ids"] = toks, pos_ids
# 也保存 plain text 供下游模型使用
tgt["utterance_text"] = " ".join(toks).strip()
continue
# %wor
if sent and line.startswith("%wor:"):
blk = [line]; i += 1
while i < len(lines) and not lines[i].lstrip().startswith(TAG_PREFIXES):
blk.append(lines[i]); i += 1
merged = merge_multiline(blk).replace("%wor:", "").strip()
# 抓 <word> <start>_<end>
raw_pairs = re.findall(r"(\S+)\s+(\d+)_(\d+)", merged)
wor = [(w, int(s), int(e)) for (w, s, e) in raw_pairs]
dlg = sent["dialogues"][-1]
tgt = dlg["PAR"][-1] if dlg["PAR"] else dlg["INV"][-1]
# 與 %mor tokens 對齊,duration = end - start
aligned: List[Tuple[str, int]] = []
j = 0
for tok in tgt.get("tokens", []):
c_tok = canonical(tok)
match = None
for k in range(j, len(wor)):
c_w = canonical(wor[k][0])
if (
c_tok == c_w
or c_w.startswith(c_tok)
or c_tok.startswith(c_w)
or same_syn(c_tok, c_w)
):
match = wor[k]
j = k + 1
break
dur = (match[2] - match[1]) if match else 0
aligned.append([tok, dur])
tgt["word_durations"] = aligned
continue
# %gra
if sent and line.startswith("%gra:"):
blk = [line]; i += 1
while i < len(lines) and not lines[i].lstrip().startswith(TAG_PREFIXES):
blk.append(lines[i]); i += 1
units = merge_multiline(blk).replace("%gra:", "").strip().split()
triples = []
for u in units:
# 例:1|2|DET
parts = u.split("|")
if len(parts) == 3:
a, b, r = parts
if a.isdigit() and b.isdigit():
triples.append([int(a), int(b), gra_map[r]])
dlg = sent["dialogues"][-1]
tgt = dlg["PAR"][-1] if dlg["PAR"] else dlg["INV"][-1]
tgt["word_grammar_ids"] = triples
continue
# 其他行
i += 1
# 收尾(檔案若意外沒 @End)
if sent and sent["dialogues"]:
if not sent.get("aphasia_type"):
sent["aphasia_type"] = "UNKNOWN"
aphasia_map["UNKNOWN"]
data.append(sent)
# 建立 text_all:把所有 PAR utterance_text 串起來
par_texts: List[str] = []
for s in data:
for turn in s.get("dialogues", []):
for par_ut in turn.get("PAR", []):
if par_ut.get("utterance_text"):
par_texts.append(par_ut["utterance_text"])
text_all = "\n".join(par_texts).strip()
return {
"sentences": data,
"pos_mapping": dict(pos_map),
"grammar_mapping": dict(gra_map),
"aphasia_types": dict(aphasia_map),
"text_all": text_all
}
# ────────── 封裝:檔案 → dict / 檔案 → 檔案 ──────────
def cha_to_dict(cha_path: str) -> Dict[str, Any]:
"""讀取 .cha 檔並回傳 dict(不寫檔)。"""
p = Path(cha_path)
if not p.exists():
raise FileNotFoundError(f"找不到檔案: {cha_path}")
with p.open("r", encoding="utf-8") as fh:
lines = fh.readlines()
return cha_to_json(lines)
def cha_to_json_file(cha_path: str, output_json: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
"""
將 .cha 轉成 JSON 並寫檔。
回傳:(output_json_path, data_dict)
"""
data = cha_to_dict(cha_path)
out_path = Path(output_json) if output_json else Path(cha_path).with_suffix(".json")
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", encoding="utf-8") as fh:
json.dump(data, fh, ensure_ascii=False, indent=4)
return str(out_path), data
# ────────── CLI ──────────
def parse_args():
p = argparse.ArgumentParser()
p.add_argument("--input", "-i", type=str, required=True, help="輸入 .cha 檔")
p.add_argument("--output", "-o", type=str, required=True, help="輸出 .json 檔")
return p.parse_args()
def cha_to_json_path(cha_path: str, output_json: str | None = None) -> str:
"""Backward-compatible alias for old code."""
out, _ = cha_to_json_file(cha_path, output_json=output_json)
return out
def main():
args = parse_args()
in_path = Path(args.input)
out_path = Path(args.output)
if not in_path.exists():
sys.exit(f"❌ 找不到檔案: {in_path}")
with in_path.open("r", encoding="utf-8") as fh:
lines = fh.readlines()
dataset = cha_to_json(lines)
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", encoding="utf-8") as fh:
json.dump(dataset, fh, ensure_ascii=False, indent=4)
print(
f"✅ 轉換完成 → {out_path}(句數 {len(dataset['sentences'])},"
f"pos={len(dataset['pos_mapping'])},gra={len(dataset['grammar_mapping'])},"
f"類型鍵={list(dataset['aphasia_types'].keys())})"
)
if __name__ == "__main__":
main()