File size: 10,801 Bytes
5b5a20d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
014b03e
 
 
 
 
 
 
5b5a20d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
014b03e
 
 
 
 
 
 
 
5b5a20d
 
014b03e
 
 
5b5a20d
 
 
 
 
 
014b03e
 
5b5a20d
014b03e
 
 
 
 
5b5a20d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""
transcript_parser.py
--------------------
Parses Zoom transcripts (TXT or VTT format) into structured speaker turns.
Each turn is the atomic unit for downstream segmentation by Agent 1.

Supported formats:
  - Zoom TXT export:  "Speaker Name\tHH:MM:SS\nUtterance text\n"
  - Zoom VTT export:  WebVTT with timestamp blocks and speaker labels

Output: List of Turn dicts, ready to be saved as parsed JSON.
"""

import re
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional, Union


# ---------------------------------------------------------------------------
# Data structure
# ---------------------------------------------------------------------------

@dataclass
class Turn:
    """A single speaker turn: the atomic unit produced by the parsers.

    Fields:
        turn_id:       sequential index across the transcript.
        speaker:       normalised speaker name.
        start_time:    turn start as HH:MM:SS.
        end_time:      turn end as HH:MM:SS, or None when it is not known.
                       VTT cues carry their own end time; the TXT parser
                       fills it from the next turn's start, so the last TXT
                       turn stays None.
        start_seconds: start_time expressed in seconds, for windowing
                       arithmetic.
        end_seconds:   end_time expressed in seconds, or None alongside
                       end_time.
        text:          cleaned utterance.
    """

    turn_id: int
    speaker: str
    start_time: str
    end_time: Optional[str]
    start_seconds: float
    end_seconds: Optional[float]
    text: str


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _hhmmss_to_seconds(ts: str) -> float:
    """Convert a clock-style timestamp to whole seconds.

    Accepted shapes are ``HH:MM:SS`` and ``MM:SS``, each optionally followed
    by a fractional part introduced by ``.`` or ``,`` (for example
    ``00:01:02.500`` or ``00:01:02,500``). The fractional part is discarded,
    so the result is always a whole number of seconds.

    Args:
        ts: Timestamp string; surrounding whitespace is ignored.

    Returns:
        Total seconds as a float. Strings that do not have two or three
        integer fields yield 0.0 (best-effort: this never raises on bad
        input).
    """
    # Treat "," like "." so that either millisecond separator is cut off
    # instead of making int() fail and the whole timestamp collapse to 0.
    whole = ts.strip().replace(",", ".").split(".")[0]
    fields = whole.split(":")
    if len(fields) not in (2, 3):
        return 0.0
    try:
        numbers = [int(field) for field in fields]
    except ValueError:
        return 0.0

    # Fold base-60 fields left to right: (h * 60 + m) * 60 + s.
    total = 0
    for number in numbers:
        total = total * 60 + number
    # Always hand back a float, matching the annotation and the 0.0 fallback.
    return float(total)


def _seconds_to_hhmmss(seconds: float) -> str:
    """Format a second count as a zero-padded ``HH:MM:SS`` string.

    The value is converted with ``int()`` first, so any fractional part is
    dropped. Hours are not capped at two digits.
    """
    whole = int(seconds)
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def _clean_text(text: str) -> str:
    """Remove markup and bracketed artefacts, then normalise whitespace.

    Applied in order:
      1. anything in angle brackets (e.g. <00:00:00.000> or <v Name> tags),
      2. anything in square brackets (e.g. [Music], [Applause]),
      3. runs of whitespace, collapsed to a single space.
    The result is stripped of leading and trailing whitespace.
    """
    substitutions = (
        (r"<[^>]+>", ""),
        (r"\[.*?\]", ""),
        (r"\s+", " "),
    )
    cleaned = text
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()


# ---------------------------------------------------------------------------
# TXT parser
# ---------------------------------------------------------------------------
# Zoom TXT export format:
#
#   Speaker Name\tHH:MM:SS
#   Utterance text (may span multiple lines until next speaker block)
#
# Some exports use a tab separator, others a newline between name+time and text.
# We handle both.

# Primary header: "Speaker Name<TAB>HH:MM:SS" alone on a line.
# The hour field accepts one or two digits; trailing whitespace is tolerated.
_TXT_HEADER_RE = re.compile(
    r"^(?P<speaker>.+?)\t(?P<time>\d{1,2}:\d{2}:\d{2})\s*$"
)

# Alternative format: "HH:MM:SS  Speaker Name"
# (timestamp first, then at least two whitespace characters, then the name).
_TXT_HEADER_ALT_RE = re.compile(
    r"^(?P<time>\d{1,2}:\d{2}:\d{2})\s{2,}(?P<speaker>.+?)\s*$"
)

# Format: "[Speaker Name] HH:MM:SS"
# (name in square brackets, one or more whitespace characters, then the time).
_TXT_HEADER_BRACKET_RE = re.compile(
    r"^\[(?P<speaker>[^\]]+)\]\s+(?P<time>\d{1,2}:\d{2}:\d{2})\s*$"
)


def _parse_txt(lines: list[str]) -> list[Turn]:
    """Parse Zoom TXT transcript lines into Turn objects.

    A line matching one of the three header patterns (_TXT_HEADER_RE,
    _TXT_HEADER_ALT_RE, _TXT_HEADER_BRACKET_RE) starts a new turn; every
    following non-blank line is appended to that turn's text until the next
    header. Text that appears before the first header is discarded, as are
    turns whose text is empty after cleaning.

    Args:
        lines: Transcript lines, with or without trailing newlines.

    Returns:
        Turns in file order with sequential turn_id values starting at 0.
        end_time/end_seconds of each turn are copied from the next turn's
        start; the last turn keeps None for both.
    """
    turns: list[Turn] = []
    current_speaker: Optional[str] = None
    current_time: Optional[str] = None
    current_lines: list[str] = []

    def _flush() -> None:
        """Emit the turn accumulated so far, if it has a speaker and text."""
        if not (current_speaker and current_lines):
            return
        text = _clean_text(" ".join(current_lines))
        if not text:
            return
        turns.append(Turn(
            # Ids are assigned only to emitted turns, so the list length is
            # always the next sequential id.
            turn_id=len(turns),
            speaker=current_speaker,
            start_time=current_time,
            end_time=None,
            start_seconds=_hhmmss_to_seconds(current_time),
            end_seconds=None,
            text=text,
        ))

    for raw_line in lines:
        # Remove surrounding whitespace (including carriage returns) and a
        # byte-order mark, either of which would stop the anchored header
        # patterns from matching. U+FEFF is not whitespace for str.strip(),
        # so it is named explicitly with an escape rather than an invisible
        # literal character.
        line = raw_line.strip().lstrip("\ufeff").strip()
        header = (_TXT_HEADER_RE.match(line)
                  or _TXT_HEADER_ALT_RE.match(line)
                  or _TXT_HEADER_BRACKET_RE.match(line))
        if header:
            _flush()
            current_speaker = header.group("speaker").strip()
            current_time = header.group("time").strip()
            current_lines = []
        elif line:
            current_lines.append(line)

    _flush()

    # Back-fill end times from the next turn's start.
    for turn, following in zip(turns, turns[1:]):
        turn.end_time = following.start_time
        turn.end_seconds = following.start_seconds

    return turns


# ---------------------------------------------------------------------------
# VTT parser
# ---------------------------------------------------------------------------
# WebVTT format:
#
#   WEBVTT
#
#   00:00:01.000 --> 00:00:04.000
#   <v Speaker Name>Utterance text
#
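# Speaker may also appear as a plain line before the text; _parse_vtt does not
# detect that form. Only a <v ...> voice tag sets the speaker; cues without
# one are attributed to "Unknown".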

# Cue timing line: "HH:MM:SS.mmm --> HH:MM:SS.mmm". Either "." or "," is
# accepted before the milliseconds. Only the start of the line is anchored,
# so anything following the end timestamp is ignored.
_VTT_TIMESTAMP_RE = re.compile(
    r"^(?P<start>\d{2}:\d{2}:\d{2}[\.,]\d{3})\s+-->\s+(?P<end>\d{2}:\d{2}:\d{2}[\.,]\d{3})"
)
# Voice tag at the start of a cue text line: "<v Speaker Name>text".
# Captures the speaker name and the remainder of the line.
_VTT_VOICE_TAG_RE = re.compile(r"^<v\s+(?P<speaker>[^>]+)>(?P<text>.*)$")


def _parse_vtt(lines: list[str]) -> list[Turn]:
    """Parse WebVTT lines into Turn objects, one per cue with text.

    A cue begins at a line matching _VTT_TIMESTAMP_RE and runs until the
    next blank line (or end of input). Lines that are not part of a cue,
    such as the WEBVTT header or cue numbers, are skipped. Within a cue, a
    line starting with a ``<v Name>`` voice tag sets the speaker; without
    one the speaker is "Unknown". Cues whose text is empty after cleaning
    are dropped and do not consume a turn_id.

    Args:
        lines: Transcript lines, with or without trailing newlines.

    Returns:
        Turns in file order; times are truncated to whole seconds.
    """
    # NOTE(review): cue lines written as "Speaker Name: text" are not split
    # here, so their speaker is reported as "Unknown". Confirm whether the
    # VTT exports this tool receives use that form.
    parsed: list[Turn] = []
    total = len(lines)
    pos = 0

    while pos < total:
        timing = _VTT_TIMESTAMP_RE.match(lines[pos].strip())
        pos += 1
        if timing is None:
            continue

        # Normalise the separator, then keep only the HH:MM:SS prefix.
        begin = timing.group("start").replace(",", ".")[:8]
        finish = timing.group("end").replace(",", ".")[:8]

        # Gather the cue payload: every line up to the next blank one.
        voice = "Unknown"
        fragments = []
        while pos < total:
            payload = lines[pos].strip()
            if not payload:
                break
            tagged = _VTT_VOICE_TAG_RE.match(payload)
            if tagged:
                voice = tagged.group("speaker").strip()
                payload = tagged.group("text")
            fragments.append(payload)
            pos += 1

        utterance = _clean_text(" ".join(fragments))
        if not utterance:
            continue

        parsed.append(Turn(
            turn_id=len(parsed),   # ids count emitted turns only
            speaker=voice,
            start_time=begin,
            end_time=finish,
            start_seconds=_hhmmss_to_seconds(begin),
            end_seconds=_hhmmss_to_seconds(finish),
            text=utterance,
        ))

    return parsed


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def detect_format(content: str) -> str:
    """Detect whether a transcript is VTT or TXT format.

    The content is VTT when one of its first five lines (after leading and
    trailing blank space is removed) is the ``WEBVTT`` signature, compared
    case-insensitively. The signature may stand alone or be followed by a
    space or tab and free header text (for example ``WEBVTT - meeting``).
    A leading byte-order mark is ignored.

    Args:
        content: Full transcript text.

    Returns:
        "vtt" if the signature is found, otherwise "txt".
    """
    # U+FEFF is not whitespace for str.strip(), so remove it explicitly;
    # callers that decode without utf-8-sig would otherwise never match.
    head = content.lstrip("\ufeff").strip().splitlines()[:5]
    for line in head:
        marker = line.strip().upper()
        if marker == "WEBVTT" or marker.startswith(("WEBVTT ", "WEBVTT\t")):
            return "vtt"
    # Everything else is treated as TXT. No header scan is needed to decide
    # this: content without a recognisable TXT header is assumed to be TXT
    # as well.
    return "txt"


def parse_transcript(filepath: Union[str, Path]) -> list[dict]:
    """
    Read a Zoom transcript file (TXT or VTT) and return its turns as dicts.

    Args:
        filepath: Path to the .txt or .vtt transcript file.

    Returns:
        List of dicts with keys: turn_id, speaker, start_time, end_time,
        start_seconds, end_seconds, text.

    Raises:
        FileNotFoundError: if filepath does not exist.
        ValueError: if no turn could be extracted; the message includes the
            detected format and the first lines of the file.
    """
    source = Path(filepath)
    if not source.exists():
        raise FileNotFoundError(f"Transcript not found: {filepath}")

    # Decode as UTF-8 first; utf-8-sig also drops a leading BOM (U+FEFF).
    # latin-1 maps every byte to a character, so the fallback cannot fail.
    try:
        raw = source.read_text(encoding="utf-8-sig")
    except UnicodeDecodeError:
        raw = source.read_text(encoding="latin-1")

    fmt = detect_format(raw)
    lines = raw.splitlines()

    print(f"[Parser] {source.name}: {len(lines)} raw lines, detected format={fmt!r}")
    print(f"[Parser] First 3 lines: {lines[:3]}")

    parse = _parse_vtt if fmt == "vtt" else _parse_txt
    turns = parse(lines)
    if turns:
        return [asdict(turn) for turn in turns]

    # Nothing parsed: surface the head of the file so the log shows what the
    # parser was actually given.
    sample = "\n  ".join(repr(entry) for entry in lines[:8])
    raise ValueError(
        f"No speaker turns extracted from {source.name}.\n"
        f"Format detected: {fmt!r}\n"
        f"File has {len(lines)} lines. First 8 lines:\n  {sample}\n"
        "Expected Zoom TXT format: '[Speaker Name] HH:MM:SS' header lines, "
        "or Zoom VTT export. Check encoding and file format."
    )


def extract_window(
    turns: list[dict],
    start_seconds: float,
    duration_seconds: float = 600,   # 10 minutes default
) -> list[dict]:
    """
    Select the turns whose start falls inside a time window.

    The window is half-open: a turn is kept when
    start_seconds <= turn["start_seconds"] < start_seconds + duration_seconds.
    Input order is preserved and the same dict objects are returned.
    Used by Agent 1 to feed the sliding window.
    """
    window_end = start_seconds + duration_seconds

    def _starts_inside(turn: dict) -> bool:
        return start_seconds <= turn["start_seconds"] < window_end

    return list(filter(_starts_inside, turns))


def save_parsed(turns: list[dict], output_path: Union[str, Path]) -> None:
    """Write the turns to output_path as indented UTF-8 JSON and report it.

    Non-ASCII characters are written as-is rather than as \\u escapes. A
    one-line confirmation with the turn count is printed afterwards.
    """
    destination = Path(output_path)
    serialised = json.dumps(turns, indent=2, ensure_ascii=False)
    destination.write_text(serialised, encoding="utf-8")
    print(f"Saved {len(turns)} turns to {output_path}")


# ---------------------------------------------------------------------------
# CLI usage
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Command-line entry point: parse one transcript, save it as JSON, and
    # print a short summary.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python transcript_parser.py <transcript_file> [output.json]")
        sys.exit(1)

    input_file = sys.argv[1]
    output_file = sys.argv[2] if len(sys.argv) > 2 else "parsed_transcript.json"

    turns = parse_transcript(input_file)
    save_parsed(turns, output_file)

    # Decode the file the same way parse_transcript does (utf-8-sig, then
    # latin-1). Reading with the platform default encoding could raise on
    # non-ASCII input or keep a BOM, making the reported format differ from
    # the one that was actually used for parsing.
    source = Path(input_file)
    try:
        raw_content = source.read_text(encoding="utf-8-sig")
    except UnicodeDecodeError:
        raw_content = source.read_text(encoding="latin-1")

    # Prefer the last turn's end when it is known (VTT); TXT transcripts
    # have no end for the final turn, so fall back to its start.
    if turns:
        last_turn = turns[-1]
        total_seconds = last_turn["end_seconds"]
        if total_seconds is None:
            total_seconds = last_turn["start_seconds"]
    else:
        total_seconds = 0

    # Print summary
    print("\nSummary:")
    print(f"  Format detected : {detect_format(raw_content)}")
    print(f"  Total turns     : {len(turns)}")
    print(f"  Speakers        : {sorted(set(t['speaker'] for t in turns))}")
    print(f"  Duration        : {_seconds_to_hhmmss(total_seconds)}")
    print("\nFirst 3 turns:")
    for t in turns[:3]:
        print(f"  [{t['start_time']}] {t['speaker']}: {t['text'][:80]}...")