| """ |
| transcript_parser.py |
| -------------------- |
| Parses Zoom transcripts (TXT or VTT format) into structured speaker turns. |
| Each turn is the atomic unit for downstream segmentation by Agent 1. |
| |
| Supported formats: |
| - Zoom TXT export: "Speaker Name\tHH:MM:SS\nUtterance text\n" |
| - Zoom VTT export: WebVTT with timestamp blocks and speaker labels |
| |
| Output: List of Turn dicts, ready to be saved as parsed JSON. |
| """ |
|
|
| import re |
| import json |
| from pathlib import Path |
| from dataclasses import dataclass, asdict |
| from typing import Optional, Union |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class Turn:
    """A single speaker turn extracted from a transcript.

    ``end_time`` and ``end_seconds`` are optional because a source format may
    not provide an explicit end for a turn.
    """

    # Sequential identifier assigned by the parser that produced the turn.
    turn_id: int
    # Speaker label as found in the transcript.
    speaker: str
    # Start of the turn as a clock string, and (optionally) its end.
    start_time: str
    end_time: Optional[str]
    # The same two instants expressed in seconds.
    start_seconds: float
    end_seconds: Optional[float]
    # Utterance text for the turn.
    text: str
|
|
|
|
| |
| |
| |
|
|
| def _hhmmss_to_seconds(ts: str) -> float: |
| """Convert HH:MM:SS or HH:MM:SS.mmm to total seconds.""" |
| ts = ts.strip() |
| |
| ts = ts.split(".")[0] |
| parts = ts.split(":") |
| try: |
| if len(parts) == 3: |
| h, m, s = parts |
| return int(h) * 3600 + int(m) * 60 + int(s) |
| elif len(parts) == 2: |
| m, s = parts |
| return int(m) * 60 + int(s) |
| except ValueError: |
| pass |
| return 0.0 |
|
|
|
|
| def _seconds_to_hhmmss(seconds: float) -> str: |
| seconds = int(seconds) |
| h = seconds // 3600 |
| m = (seconds % 3600) // 60 |
| s = seconds % 60 |
| return f"{h:02d}:{m:02d}:{s:02d}" |
|
|
|
|
| def _clean_text(text: str) -> str: |
| """Strip VTT positioning tags, extra whitespace, and common artefacts.""" |
| text = re.sub(r"<[^>]+>", "", text) |
| text = re.sub(r"\[.*?\]", "", text) |
| text = re.sub(r"\s+", " ", text) |
| return text.strip() |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| _TXT_HEADER_RE = re.compile( |
| r"^(?P<speaker>.+?)\t(?P<time>\d{1,2}:\d{2}:\d{2})\s*$" |
| ) |
|
|
| |
| _TXT_HEADER_ALT_RE = re.compile( |
| r"^(?P<time>\d{1,2}:\d{2}:\d{2})\s{2,}(?P<speaker>.+?)\s*$" |
| ) |
|
|
| |
| _TXT_HEADER_BRACKET_RE = re.compile( |
| r"^\[(?P<speaker>[^\]]+)\]\s+(?P<time>\d{1,2}:\d{2}:\d{2})\s*$" |
| ) |
|
|
|
|
def _parse_txt(lines: list[str]) -> list[Turn]:
    """Parse the lines of a TXT transcript into speaker turns.

    A line matching one of the TXT header patterns starts a new turn; every
    following non-blank line is appended to that turn's text until the next
    header.  Text before the first header has no speaker and is discarded,
    as is a header whose text is empty after cleaning.

    TXT headers carry only a start time, so each turn's end is taken from the
    start of the next emitted turn; the final turn keeps ``None``.

    Args:
        lines: Transcript lines, with or without trailing newlines.

    Returns:
        Turns in file order, with ``turn_id`` counting up from 0.
    """
    turns: list[Turn] = []
    speaker: Optional[str] = None
    start: Optional[str] = None
    buffer: list[str] = []

    def _flush() -> None:
        """Emit the buffered turn if it has both a speaker and text."""
        if not (speaker and buffer):
            return
        text = _clean_text(" ".join(buffer))
        if text:
            turns.append(Turn(
                # Ids are assigned only to emitted turns, so the current
                # length is always the next id.
                turn_id=len(turns),
                speaker=speaker,
                start_time=start,
                end_time=None,
                start_seconds=_hhmmss_to_seconds(start),
                end_seconds=None,
                text=text,
            ))

    for raw_line in lines:
        # Drop a stray byte-order mark so it can neither defeat the header
        # patterns nor leak into a speaker name.  It is written as an escape
        # because the character itself is invisible in source code.
        line = raw_line.strip().lstrip("\ufeff").strip()
        header = (
            _TXT_HEADER_RE.match(line)
            or _TXT_HEADER_ALT_RE.match(line)
            or _TXT_HEADER_BRACKET_RE.match(line)
        )
        if header:
            _flush()
            speaker = header.group("speaker").strip()
            start = header.group("time").strip()
            buffer = []
        elif line:
            buffer.append(line)

    _flush()

    # Back-fill each turn's end from the start of the turn that follows it.
    for current, following in zip(turns, turns[1:]):
        current.end_time = following.start_time
        current.end_seconds = following.start_seconds

    return turns
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| _VTT_TIMESTAMP_RE = re.compile( |
| r"^(?P<start>\d{2}:\d{2}:\d{2}[\.,]\d{3})\s+-->\s+(?P<end>\d{2}:\d{2}:\d{2}[\.,]\d{3})" |
| ) |
| _VTT_VOICE_TAG_RE = re.compile(r"^<v\s+(?P<speaker>[^>]+)>(?P<text>.*)$") |
|
|
|
|
# Zoom's VTT export labels cue text as "Speaker Name: utterance" instead of
# using a <v> voice tag.  The label must be short, contain no markup
# characters, and be followed by whitespace and some text, which keeps clock
# times ("10:30") and URLs ("https://...") from being read as labels.
_VTT_SPEAKER_PREFIX_RE = re.compile(
    r"^(?P<speaker>[^:<>\[\]]{1,64}):\s+(?P<text>\S.*)$"
)


def _parse_vtt(lines: list[str]) -> list[Turn]:
    """Parse the lines of a WebVTT transcript into speaker turns.

    Each cue (a timing line followed by its text lines) becomes one turn.
    The speaker comes from a ``<v Speaker>`` voice tag on any line of the
    cue or, failing that on the first line, from a ``Speaker: text`` prefix;
    cues with neither are attributed to ``"Unknown"``.  Lines outside a cue
    (the ``WEBVTT`` banner, cue identifiers, blank lines) are skipped, and
    cues whose text is empty after cleaning are dropped.

    Start and end times are truncated to whole seconds (``HH:MM:SS``).

    Args:
        lines: Transcript lines, with or without trailing newlines.

    Returns:
        Turns in file order, with ``turn_id`` counting up from 0.
    """
    turns: list[Turn] = []
    total = len(lines)
    i = 0

    while i < total:
        timing = _VTT_TIMESTAMP_RE.match(lines[i].strip())
        i += 1
        if timing is None:
            continue

        # The pattern guarantees "HH:MM:SS" in the first eight characters,
        # so slicing drops the millisecond part whatever its separator.
        start_time = timing.group("start")[:8]
        end_time = timing.group("end")[:8]

        speaker = "Unknown"
        text_lines: list[str] = []
        while i < total:
            tline = lines[i].strip()
            # A blank line ends the cue.  A timing line here means the blank
            # separator is missing; leave it for the outer loop so it starts
            # its own cue instead of being swallowed as text.
            if not tline or _VTT_TIMESTAMP_RE.match(tline):
                break
            voiced = _VTT_VOICE_TAG_RE.match(tline)
            if voiced:
                speaker = voiced.group("speaker").strip()
                tline = voiced.group("text")
            elif not text_lines:
                # Only the first line of a cue may carry a "Name: " label;
                # later lines are continuation text.
                labelled = _VTT_SPEAKER_PREFIX_RE.match(tline)
                if labelled:
                    speaker = labelled.group("speaker").strip()
                    tline = labelled.group("text")
            text_lines.append(tline)
            i += 1

        text = _clean_text(" ".join(text_lines))
        if text:
            turns.append(Turn(
                turn_id=len(turns),
                speaker=speaker,
                start_time=start_time,
                end_time=end_time,
                start_seconds=_hhmmss_to_seconds(start_time),
                end_seconds=_hhmmss_to_seconds(end_time),
                text=text,
            ))

    return turns
|
|
|
|
| |
| |
| |
|
|
def detect_format(content: str) -> str:
    """Detect whether a transcript is VTT or TXT format.

    The content is treated as VTT when one of its first five lines (after
    leading blank lines) is a ``WEBVTT`` header, compared case-insensitively.
    A leading byte-order mark is ignored, and the header may be followed by a
    space or tab and free text (e.g. ``WEBVTT - title``).  Everything else,
    including empty content, is reported as TXT.

    Args:
        content: Full text of the transcript file.

    Returns:
        ``"vtt"`` or ``"txt"``.
    """
    # The BOM is not whitespace, so str.strip() alone would leave it glued
    # to the header and defeat the comparison below.
    head = content.lstrip("\ufeff").strip().splitlines()[:5]
    for line in head:
        token = line.strip().lstrip("\ufeff").upper()
        if token == "WEBVTT" or token.startswith(("WEBVTT ", "WEBVTT\t")):
            return "vtt"
    # TXT is the fallback for anything without a WEBVTT header.
    return "txt"
|
|
|
|
def parse_transcript(filepath: Union[str, Path]) -> list[dict]:
    """
    Read a transcript file (TXT or VTT) and return its turns as dicts.

    The file is decoded as UTF-8 (a BOM is dropped), falling back to latin-1
    if that fails.  The format is auto-detected from the content and two
    diagnostic lines are printed before parsing.

    Args:
        filepath: Path to the .txt or .vtt transcript file.

    Returns:
        List of dicts with keys: turn_id, speaker, start_time, end_time,
        start_seconds, end_seconds, text.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
        ValueError: If no speaker turns could be extracted.
    """
    source = Path(filepath)
    if not source.exists():
        raise FileNotFoundError(f"Transcript not found: {filepath}")

    try:
        content = source.read_text(encoding="utf-8-sig")
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this cannot fail to decode.
        content = source.read_text(encoding="latin-1")

    fmt = detect_format(content)
    lines = content.splitlines()

    print(f"[Parser] {source.name}: {len(lines)} raw lines, detected format={fmt!r}")
    print(f"[Parser] First 3 lines: {lines[:3]}")

    parser = _parse_vtt if fmt == "vtt" else _parse_txt
    turns = parser(lines)

    if turns:
        return [asdict(turn) for turn in turns]

    # Nothing parsed: show the head of the file so the cause is visible.
    sample = "\n ".join(repr(entry) for entry in lines[:8])
    raise ValueError(
        f"No speaker turns extracted from {source.name}.\n"
        f"Format detected: {fmt!r}\n"
        f"File has {len(lines)} lines. First 8 lines:\n {sample}\n"
        "Expected Zoom TXT format: '[Speaker Name] HH:MM:SS' header lines, "
        "or Zoom VTT export. Check encoding and file format."
    )
|
|
|
|
def extract_window(
    turns: list[dict],
    start_seconds: float,
    duration_seconds: float = 600,
) -> list[dict]:
    """
    Select the turns whose start falls inside a time window.

    The window is half-open: a turn is kept when
    ``start_seconds <= turn["start_seconds"] < start_seconds + duration_seconds``.
    Input order is preserved and the original dict objects are returned.
    """
    window_end = start_seconds + duration_seconds

    def _starts_in_window(turn: dict) -> bool:
        return start_seconds <= turn["start_seconds"] < window_end

    return list(filter(_starts_in_window, turns))
|
|
|
|
def save_parsed(turns: list[dict], output_path: Union[str, Path]) -> None:
    """Write the turns to ``output_path`` as indented UTF-8 JSON and report it.

    Non-ASCII characters are written as-is rather than escaped.
    """
    payload = json.dumps(turns, indent=2, ensure_ascii=False)
    destination = Path(output_path)
    destination.write_text(payload, encoding="utf-8")
    print(f"Saved {len(turns)} turns to {output_path}")
|
|
|
|
| |
| |
| |
|
|
# Command-line entry point: parse a transcript, save it as JSON, and print a
# short summary of what was found.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python transcript_parser.py <transcript_file> [output.json]")
        sys.exit(1)

    input_file = sys.argv[1]
    output_file = sys.argv[2] if len(sys.argv) > 2 else "parsed_transcript.json"

    turns = parse_transcript(input_file)
    save_parsed(turns, output_file)

    # Decode the same way parse_transcript does, so the reported format can
    # neither differ from the one actually parsed nor fail on the platform's
    # default encoding.
    source = Path(input_file)
    try:
        raw_content = source.read_text(encoding="utf-8-sig")
    except UnicodeDecodeError:
        raw_content = source.read_text(encoding="latin-1")

    # Duration runs to the end of the last turn when the format provides one;
    # otherwise the last start time is the best available estimate.
    total_seconds = 0
    if turns:
        last_turn = turns[-1]
        total_seconds = last_turn["end_seconds"]
        if total_seconds is None:
            total_seconds = last_turn["start_seconds"]

    print("\nSummary:")
    print(f" Format detected : {detect_format(raw_content)}")
    print(f" Total turns : {len(turns)}")
    print(f" Speakers : {sorted(set(t['speaker'] for t in turns))}")
    print(f" Duration : {_seconds_to_hhmmss(total_seconds)}")
    print("\nFirst 3 turns:")
    for t in turns[:3]:
        # Only mark the preview as truncated when text was actually cut.
        preview = t["text"][:80] + ("..." if len(t["text"]) > 80 else "")
        print(f" [{t['start_time']}] {t['speaker']}: {preview}")
|
|