# script / transcript_parser.py
# Lara Monteagudo Tubau
# Update app and transcript parser
# 014b03e
"""
transcript_parser.py
--------------------
Parses Zoom transcripts (TXT or VTT format) into structured speaker turns.
Each turn is the atomic unit for downstream segmentation by Agent 1.
Supported formats:
- Zoom TXT export: "Speaker Name\tHH:MM:SS\nUtterance text\n"
- Zoom VTT export: WebVTT with timestamp blocks and speaker labels
Output: List of Turn dicts, ready to be saved as parsed JSON.
"""
import re
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional, Union
# ---------------------------------------------------------------------------
# Data structure
# ---------------------------------------------------------------------------
@dataclass
class Turn:
    """One speaker turn extracted from a transcript.

    Times are stored twice: as ``HH:MM:SS`` text and as a second count, so
    callers can either display them or do window arithmetic directly.
    """

    # Sequential index of the turn across the transcript.
    turn_id: int
    # Normalised speaker name.
    speaker: str
    # Start of the turn, HH:MM:SS.
    start_time: str
    # End of the turn, HH:MM:SS; None when no end is known for the turn.
    end_time: Optional[str]
    # Start of the turn in seconds, for windowing arithmetic.
    start_seconds: float
    # End of the turn in seconds; None whenever end_time is None.
    end_seconds: Optional[float]
    # Cleaned utterance text.
    text: str
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _hhmmss_to_seconds(ts: str) -> float:
"""Convert HH:MM:SS or HH:MM:SS.mmm to total seconds."""
ts = ts.strip()
# strip milliseconds if present (VTT uses HH:MM:SS.mmm)
ts = ts.split(".")[0]
parts = ts.split(":")
try:
if len(parts) == 3:
h, m, s = parts
return int(h) * 3600 + int(m) * 60 + int(s)
elif len(parts) == 2:
m, s = parts
return int(m) * 60 + int(s)
except ValueError:
pass
return 0.0
def _seconds_to_hhmmss(seconds: float) -> str:
seconds = int(seconds)
h = seconds // 3600
m = (seconds % 3600) // 60
s = seconds % 60
return f"{h:02d}:{m:02d}:{s:02d}"
def _clean_text(text: str) -> str:
"""Strip VTT positioning tags, extra whitespace, and common artefacts."""
text = re.sub(r"<[^>]+>", "", text) # remove <00:00:00.000> tags
text = re.sub(r"\[.*?\]", "", text) # remove [Music] [Applause] etc
text = re.sub(r"\s+", " ", text) # collapse whitespace
return text.strip()
# ---------------------------------------------------------------------------
# TXT parser
# ---------------------------------------------------------------------------
# Zoom TXT export format:
#
# Speaker Name\tHH:MM:SS
# Utterance text (may span multiple lines until next speaker block)
#
# Some exports use a tab separator, others a newline between name+time and text.
# We handle both.
_TXT_HEADER_RE = re.compile(
r"^(?P<speaker>.+?)\t(?P<time>\d{1,2}:\d{2}:\d{2})\s*$"
)
# Alternative format: "HH:MM:SS Speaker Name"
_TXT_HEADER_ALT_RE = re.compile(
r"^(?P<time>\d{1,2}:\d{2}:\d{2})\s{2,}(?P<speaker>.+?)\s*$"
)
# Format: "[Speaker Name] HH:MM:SS"
_TXT_HEADER_BRACKET_RE = re.compile(
r"^\[(?P<speaker>[^\]]+)\]\s+(?P<time>\d{1,2}:\d{2}:\d{2})\s*$"
)
def _parse_txt(lines: list[str]) -> list[Turn]:
    """Parse the lines of a Zoom TXT export into speaker turns.

    A line matching any of the three TXT header patterns opens a new turn.
    Every non-blank line after it, up to the next header, is joined with
    spaces into that turn's text. Lines seen before the first header are
    discarded, and a turn whose text is empty after cleaning is not emitted.

    Args:
        lines: Transcript lines, with or without line terminators.

    Returns:
        Turns in file order with sequential ``turn_id`` values. Each turn's
        ``end_time``/``end_seconds`` is copied from the start of the turn
        that follows it, so the final turn keeps ``None`` for both.
    """
    turns: list[Turn] = []
    current_speaker = None
    current_time = None
    current_lines: list[str] = []

    def _flush() -> None:
        """Emit the buffered utterance as a Turn when it has content."""
        if not (current_speaker and current_lines):
            return
        text = _clean_text(" ".join(current_lines))
        if not text:
            return
        turns.append(Turn(
            # Ids are sequential, so the next id equals the count so far.
            turn_id=len(turns),
            speaker=current_speaker,
            start_time=current_time,
            end_time=None,
            start_seconds=_hhmmss_to_seconds(current_time),
            end_seconds=None,
            text=text,
        ))

    for raw_line in lines:
        # str.strip() does not treat the BOM (U+FEFF) as whitespace, so it is
        # removed explicitly; otherwise it would prefix the first speaker name
        # or stop the time-first/bracketed headers from matching at "^".
        # The escape is spelled out because a literal BOM is invisible.
        line = raw_line.strip().lstrip("\ufeff").strip()
        header = (
            _TXT_HEADER_RE.match(line)
            or _TXT_HEADER_ALT_RE.match(line)
            or _TXT_HEADER_BRACKET_RE.match(line)
        )
        if header:
            _flush()
            current_speaker = header.group("speaker").strip()
            current_time = header.group("time").strip()
            current_lines = []
        elif line:
            current_lines.append(line)
    _flush()

    # TXT exports carry no end times; use the next turn's start instead.
    for turn, following in zip(turns, turns[1:]):
        turn.end_time = following.start_time
        turn.end_seconds = following.start_seconds
    return turns
# ---------------------------------------------------------------------------
# VTT parser
# ---------------------------------------------------------------------------
# WebVTT format:
#
# WEBVTT
#
# 00:00:01.000 --> 00:00:04.000
# <v Speaker Name>Utterance text
#
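# Only the <v Speaker Name> voice-tag form is recognised; a speaker written as
# a plain line before the text is not detected and stays part of the utterance.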
_VTT_TIMESTAMP_RE = re.compile(
r"^(?P<start>\d{2}:\d{2}:\d{2}[\.,]\d{3})\s+-->\s+(?P<end>\d{2}:\d{2}:\d{2}[\.,]\d{3})"
)
_VTT_VOICE_TAG_RE = re.compile(r"^<v\s+(?P<speaker>[^>]+)>(?P<text>.*)$")
def _parse_vtt(lines: list[str]) -> list[Turn]:
    """Parse WebVTT lines into one turn per cue.

    Each timing line opens a cue, and the lines after it, up to the next blank
    line or the end of input, form its text. A ``<v Name>`` voice tag at the
    start of a cue line sets the speaker; without one the speaker is
    ``"Unknown"``. Cues whose text is empty after cleaning are skipped, and
    lines outside any cue (the ``WEBVTT`` header, cue numbers) are ignored.

    Args:
        lines: Transcript lines, with or without line terminators.

    Returns:
        Turns in file order with sequential ``turn_id`` values. Times keep
        only the ``HH:MM:SS`` part of each cue timestamp.
    """
    # NOTE(review): cue text of the form "Name: utterance" is not split into
    # speaker and text here; confirm whether the exports in use need that.
    turns: list[Turn] = []
    total = len(lines)
    cursor = 0
    while cursor < total:
        timing = _VTT_TIMESTAMP_RE.match(lines[cursor].strip())
        cursor += 1
        if timing is None:
            continue

        # The first eight characters are the HH:MM:SS part of the timestamp.
        start_time = timing.group("start").replace(",", ".")[:8]
        end_time = timing.group("end").replace(",", ".")[:8]

        speaker = "Unknown"
        fragments = []
        while cursor < total:
            cue_line = lines[cursor].strip()
            if not cue_line:
                break
            voice = _VTT_VOICE_TAG_RE.match(cue_line)
            if voice is not None:
                speaker = voice.group("speaker").strip()
                cue_line = voice.group("text")
            fragments.append(cue_line)
            cursor += 1

        text = _clean_text(" ".join(fragments))
        if not text:
            continue
        turns.append(Turn(
            # Ids are sequential, so the next id equals the count so far.
            turn_id=len(turns),
            speaker=speaker,
            start_time=start_time,
            end_time=end_time,
            start_seconds=_hhmmss_to_seconds(start_time),
            end_seconds=_hhmmss_to_seconds(end_time),
            text=text,
        ))
    return turns
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def detect_format(content: str) -> str:
    """Detect whether a transcript is in VTT or TXT format.

    The content is reported as VTT when one of its first five lines is a
    WebVTT signature: ``WEBVTT`` alone, or ``WEBVTT`` followed by a space or
    tab and free-form header text (for example ``WEBVTT - meeting``). A
    leading byte-order mark is ignored and the comparison is case-insensitive.
    Everything else is treated as TXT.

    Args:
        content: Full text of the transcript.

    Returns:
        ``"vtt"`` or ``"txt"``.
    """
    for line in content.strip().splitlines()[:5]:
        # The BOM is stripped here too, so callers that decoded the file
        # without "utf-8-sig" still get the right answer.
        signature = line.strip().lstrip("\ufeff").upper()
        if signature == "WEBVTT" or signature.startswith(("WEBVTT ", "WEBVTT\t")):
            return "vtt"
    # TXT is the default: probing for TXT headers would not change the result.
    return "txt"
def parse_transcript(filepath: Union[str, Path]) -> list[dict]:
    """
    Read a Zoom transcript file (TXT or VTT) and return its turns as dicts.

    The file is decoded as UTF-8 with any BOM removed; if that fails it is
    re-read as latin-1. Two diagnostic lines are printed before parsing.

    Args:
        filepath: Path to the .txt or .vtt transcript file.

    Returns:
        One dict per turn with keys: turn_id, speaker, start_time, end_time,
        start_seconds, end_seconds, text.

    Raises:
        FileNotFoundError: If *filepath* does not exist.
        ValueError: If no turns could be extracted; the message includes the
            detected format and the first lines of the file.
    """
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError(f"Transcript not found: {filepath}")

    # "utf-8-sig" drops a leading BOM; latin-1 accepts any byte sequence, so
    # the fallback itself cannot raise a decoding error.
    try:
        content = path.read_text(encoding="utf-8-sig")
    except UnicodeDecodeError:
        content = path.read_text(encoding="latin-1")

    lines = content.splitlines()
    fmt = detect_format(content)
    print(f"[Parser] {path.name}: {len(lines)} raw lines, detected format={fmt!r}")
    print(f"[Parser] First 3 lines: {lines[:3]}")

    parser = _parse_vtt if fmt == "vtt" else _parse_txt
    turns = parser(lines)
    if turns:
        return [asdict(turn) for turn in turns]

    # Nothing parsed: show the raw head of the file so the cause is visible.
    sample = "\n ".join(repr(raw) for raw in lines[:8])
    raise ValueError(
        f"No speaker turns extracted from {path.name}.\n"
        f"Format detected: {fmt!r}\n"
        f"File has {len(lines)} lines. First 8 lines:\n {sample}\n"
        "Expected Zoom TXT format: '[Speaker Name] HH:MM:SS' header lines, "
        "or Zoom VTT export. Check encoding and file format."
    )
def extract_window(
    turns: list[dict],
    start_seconds: float,
    duration_seconds: float = 600,  # 10 minutes default
) -> list[dict]:
    """
    Select the turns that begin inside a time window.

    The window is half-open: a turn is kept when its ``start_seconds`` is at
    or after ``start_seconds`` and strictly before
    ``start_seconds + duration_seconds``. Input order is preserved.
    Used by Agent 1 to feed the sliding window.
    """
    window_end = start_seconds + duration_seconds
    selected = []
    for turn in turns:
        begins_at = turn["start_seconds"]
        if begins_at < start_seconds or begins_at >= window_end:
            continue
        selected.append(turn)
    return selected
def save_parsed(turns: list[dict], output_path: Union[str, Path]) -> None:
    """Write *turns* to *output_path* as UTF-8 JSON and print a confirmation.

    The JSON uses a two-space indent and keeps non-ASCII characters as they
    are instead of escaping them.
    """
    payload = json.dumps(turns, indent=2, ensure_ascii=False)
    destination = Path(output_path)
    destination.write_text(payload, encoding="utf-8")
    print(f"Saved {len(turns)} turns to {output_path}")
# ---------------------------------------------------------------------------
# CLI usage
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: parse a transcript, save it as JSON, and
    # print a short summary of what was extracted.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python transcript_parser.py <transcript_file> [output.json]")
        sys.exit(1)

    input_file = sys.argv[1]
    output_file = sys.argv[2] if len(sys.argv) > 2 else "parsed_transcript.json"

    turns = parse_transcript(input_file)
    save_parsed(turns, output_file)

    # Decode the file the same way parse_transcript does (UTF-8 with BOM
    # removal, then latin-1). The platform default encoding could raise on
    # non-ASCII input or leave a BOM in place, making the format reported
    # here differ from the one actually used for parsing.
    try:
        raw_content = Path(input_file).read_text(encoding="utf-8-sig")
    except UnicodeDecodeError:
        raw_content = Path(input_file).read_text(encoding="latin-1")

    # Print summary. Duration is approximated by the start of the last turn.
    total_seconds = turns[-1]["start_seconds"] if turns else 0
    print("\nSummary:")
    print(f" Format detected : {detect_format(raw_content)}")
    print(f" Total turns : {len(turns)}")
    print(f" Speakers : {sorted(set(t['speaker'] for t in turns))}")
    print(f" Duration : {_seconds_to_hhmmss(total_seconds)}")
    print("\nFirst 3 turns:")
    for t in turns[:3]:
        print(f" [{t['start_time']}] {t['speaker']}: {t['text'][:80]}...")