| |
| import argparse |
| import json |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
| def _log(message: str) -> None: |
| print(f"[cleanup] {message}", flush=True) |
|
|
|
|
| def _load_json(path: Path) -> dict[str, Any]: |
| with path.open("r", encoding="utf-8") as f: |
| return json.load(f) |
|
|
|
|
| def _save_json(path: Path, payload: Any) -> None: |
| path.parent.mkdir(parents=True, exist_ok=True) |
| with path.open("w", encoding="utf-8") as f: |
| json.dump(payload, f, indent=2, ensure_ascii=False) |
|
|
|
|
| def _extract_json_object(text: str) -> dict[str, Any]: |
| text = text.strip() |
| if not text: |
| raise ValueError("Model returned empty text.") |
|
|
| try: |
| parsed = json.loads(text) |
| if isinstance(parsed, dict): |
| return parsed |
| except Exception: |
| pass |
|
|
| start = text.find("{") |
| while start >= 0: |
| depth = 0 |
| for idx in range(start, len(text)): |
| ch = text[idx] |
| if ch == "{": |
| depth += 1 |
| elif ch == "}": |
| depth -= 1 |
| if depth == 0: |
| candidate = text[start : idx + 1] |
| try: |
| parsed = json.loads(candidate) |
| if isinstance(parsed, dict): |
| return parsed |
| except Exception: |
| break |
| start = text.find("{", start + 1) |
|
|
| raise ValueError("Could not parse a JSON object from model output.") |
|
|
|
|
| def _response_to_dict(response: Any) -> dict[str, Any]: |
| if hasattr(response, "model_dump") and callable(response.model_dump): |
| return response.model_dump() |
| if hasattr(response, "to_dict") and callable(response.to_dict): |
| return response.to_dict() |
| return {"raw_response": str(response)} |
|
|
|
|
| def _response_text(response: Any) -> str: |
| output_text = getattr(response, "output_text", None) |
| if isinstance(output_text, str) and output_text.strip(): |
| return output_text |
|
|
| data = _response_to_dict(response) |
| if isinstance(data, dict): |
| for key in ("output_text", "text"): |
| val = data.get(key) |
| if isinstance(val, str) and val.strip(): |
| return val |
| return "" |
|
|
|
|
| def _usage_from_response_dict(payload: dict[str, Any]) -> dict[str, int | None]: |
| usage = payload.get("usage") |
| if not isinstance(usage, dict): |
| return { |
| "input_tokens": None, |
| "output_tokens": None, |
| "total_tokens": None, |
| "cached_input_tokens": None, |
| "reasoning_tokens": None, |
| } |
|
|
| input_details = usage.get("input_tokens_details", {}) |
| output_details = usage.get("output_tokens_details", {}) |
| return { |
| "input_tokens": usage.get("input_tokens"), |
| "output_tokens": usage.get("output_tokens"), |
| "total_tokens": usage.get("total_tokens"), |
| "cached_input_tokens": input_details.get("cached_tokens") if isinstance(input_details, dict) else None, |
| "reasoning_tokens": output_details.get("reasoning_tokens") if isinstance(output_details, dict) else None, |
| } |
|
|
|
|
| def _sum_usage( |
| first: dict[str, int | None], |
| second: dict[str, int | None], |
| ) -> dict[str, int | None]: |
| def _sum_key(key: str) -> int | None: |
| a = first.get(key) |
| b = second.get(key) |
| if isinstance(a, int) and isinstance(b, int): |
| return a + b |
| if isinstance(a, int): |
| return a |
| if isinstance(b, int): |
| return b |
| return None |
|
|
| total = _sum_key("total_tokens") |
| input_tokens = _sum_key("input_tokens") |
| output_tokens = _sum_key("output_tokens") |
| if total is None and isinstance(input_tokens, int) and isinstance(output_tokens, int): |
| total = input_tokens + output_tokens |
|
|
| return { |
| "input_tokens": input_tokens, |
| "output_tokens": output_tokens, |
| "total_tokens": total, |
| "cached_input_tokens": _sum_key("cached_input_tokens"), |
| "reasoning_tokens": _sum_key("reasoning_tokens"), |
| } |
|
|
|
|
| def _parse_executive_names( |
| *, |
| names_csv: str | None, |
| ) -> list[str]: |
| out: list[str] = [] |
|
|
| if names_csv: |
| for item in names_csv.split(","): |
| name = item.strip().strip('"').strip("'") |
| if name: |
| out.append(name) |
|
|
| |
| seen = set() |
| deduped: list[str] = [] |
| for name in out: |
| key = name.lower() |
| if key in seen: |
| continue |
| seen.add(key) |
| deduped.append(name) |
| return deduped |
|
|
|
|
| def _build_intro_payload(turns: list[dict[str, Any]], intro_turn_limit: int) -> list[dict[str, Any]]: |
| sampled = turns[: max(1, intro_turn_limit)] |
| payload: list[dict[str, Any]] = [] |
| for idx, turn in enumerate(sampled): |
| payload.append( |
| { |
| "turn_index": idx, |
| "speaker": turn.get("speaker"), |
| "start": turn.get("start"), |
| "end": turn.get("end"), |
| "text": turn.get("text"), |
| } |
| ) |
| return payload |
|
|
|
|
| def _extract_qna_announcements(turns: list[dict[str, Any]], max_items: int = 200) -> list[dict[str, Any]]: |
| announcements: list[dict[str, Any]] = [] |
| for idx, turn in enumerate(turns): |
| text = str(turn.get("text", "")).strip() |
| if not text: |
| continue |
| lowered = text.lower() |
| if "line of" in lowered and ("please go ahead" in lowered or "question" in lowered): |
| announcements.append( |
| { |
| "turn_index": idx, |
| "speaker": turn.get("speaker"), |
| "text": text, |
| } |
| ) |
| if len(announcements) >= max_items: |
| break |
| return announcements |
|
|
|
|
| def _extract_response_id(response: Any, response_dict: dict[str, Any]) -> str | None: |
| rid = getattr(response, "id", None) |
| if isinstance(rid, str) and rid: |
| return rid |
| candidate = response_dict.get("id") |
| if isinstance(candidate, str) and candidate: |
| return candidate |
| return None |
|
|
|
|
def run_cleanup_pipeline(
    *,
    input_file: Path,
    api_key: str,
    model: str,
    output_dir: Path,
    intro_turn_limit: int,
    executive_names_csv: str | None,
) -> dict[str, Any]:
    """Run the two-call OpenAI transcript-cleanup pipeline.

    Call 1 infers a speaker-label -> name mapping from the intro turns and
    operator Q&A announcements; call 2 cleans the transcript text and applies
    the refined speaker labels. All intermediate and final artifacts are
    written under a timestamped subdirectory of ``output_dir``.

    Args:
        input_file: Path to a merged transcript JSON with a non-empty `turns` list.
        api_key: OpenAI API key used to construct the client.
        model: OpenAI model ID used for both calls.
        output_dir: Base directory under which the per-run directory is created.
        intro_turn_limit: Number of initial turns sampled for intro inference.
        executive_names_csv: Optional comma-separated canonical executive names.

    Returns:
        The run-summary dict (also saved as ``run_summary.json``).

    Raises:
        RuntimeError: if the `openai` package is not installed.
        ValueError: if the input JSON lacks a non-empty `turns` list, or a
            model response contains no parseable JSON object.
    """
    # Import lazily so the rest of the module is usable without the SDK.
    try:
        from openai import OpenAI
    except ImportError as exc:
        raise RuntimeError(
            "Missing dependency: openai. Install with `pip install openai`."
        ) from exc

    _log("Loading transcript JSON...")
    transcript_json = _load_json(input_file)
    turns = transcript_json.get("turns")
    if not isinstance(turns, list) or not turns:
        raise ValueError("Input JSON must contain a non-empty `turns` list.")

    _log("Parsing executive names input...")
    executive_names = _parse_executive_names(
        names_csv=executive_names_csv,
    )
    intro_turns_payload = _build_intro_payload(turns, intro_turn_limit=intro_turn_limit)
    qna_announcements = _extract_qna_announcements(turns)

    # One timestamped directory per run keeps artifacts of separate runs apart.
    run_dir = output_dir / datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir.mkdir(parents=True, exist_ok=True)
    executive_names_out_path = run_dir / "executive_names.json"
    _save_json(executive_names_out_path, {"names": executive_names})
    _log(f"Run directory: {run_dir}")
    _log(f"Saved executive names file: {executive_names_out_path}")

    client = OpenAI(api_key=api_key)

    # --- Call 1: infer speaker-label -> name mapping ---------------------
    speaker_map_system = (
        "You are a transcript entity-resolution assistant. "
        "Return strict JSON only, no markdown. "
        "Infer speaker identities from transcript context."
    )
    # The user message is itself a JSON document: task, rules, the expected
    # output schema, and all the transcript context the model needs.
    speaker_map_user = json.dumps(
        {
            "task": "Infer speaker mapping from transcript context (intro + Q&A announcements).",
            "rules": [
                "Use explicit or near-explicit intro context ('I now hand over to ...', self-intros, operator intros).",
                "Label any conference host/queue-management voice as exactly 'Operator' when they do call control.",
                "Do not map Operator to an executive name.",
                "Do not guess beyond evidence.",
                "Prefer names from `executive_names` when they match context.",
                "In Q&A, infer non-executive participant names from operator announcements such as 'line of <name> from <firm>', even if absent in executive list.",
                "Keep unknown speakers as null names if evidence is weak.",
            ],
            "output_schema": {
                "speaker_mapping": [
                    {
                        "speaker_label": "SPEAKER_XX",
                        "inferred_name": "string or null",
                        "confidence": "number 0..1",
                        "evidence_turn_indexes": ["int"],
                        "reason": "short string",
                    }
                ],
                "notes": ["string"],
            },
            "executive_names": executive_names,
            "intro_turns": intro_turns_payload,
            "qna_announcements": qna_announcements,
            "transcript_turns": turns,
        },
        ensure_ascii=False,
    )

    _log("OpenAI call 1/2: inferring speaker mapping...")
    speaker_map_response = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": speaker_map_system},
            {"role": "user", "content": speaker_map_user},
        ],
    )
    speaker_map_raw = _response_to_dict(speaker_map_response)
    # The response id enables server-side context chaining for call 2.
    first_response_id = _extract_response_id(speaker_map_response, speaker_map_raw)
    speaker_map_usage = _usage_from_response_dict(speaker_map_raw)
    speaker_map_text = _response_text(speaker_map_response)
    speaker_map_json = _extract_json_object(speaker_map_text)

    # Persist both the parsed mapping and the raw response for auditing.
    speaker_map_path = run_dir / "speaker_mapping.json"
    speaker_map_raw_path = run_dir / "speaker_mapping_raw_response.json"
    _save_json(speaker_map_path, speaker_map_json)
    _save_json(speaker_map_raw_path, speaker_map_raw)

    # --- Call 2: clean transcript and refine speaker labels --------------
    cleanup_system = (
        "You are a transcript cleanup and diarization refinement assistant. "
        "Return strict JSON only, no markdown."
    )
    cleanup_payload_base = {
        "task": "Clean transcript and produce final speaker-attributed turns.",
        "rules": [
            "Correct likely misspellings and improve punctuation/casing.",
            "Remove false starts and repeated filler where safe, but keep meaning.",
            "Standardize executive names to the canonical forms in `executive_names` where applicable.",
            "Use `speaker_mapping` from call 1, but keep unknown labels if unsupported.",
            "Label the conference host/control speaker as exactly 'Operator' when they are handling queue/instructions.",
            "In Q&A, infer names not present in `executive_names` from context and operator announcements.",
            "If a very short mid-sentence speaker switch is likely diarization noise, merge/reassign using sentence continuity.",
            "Preserve turn order and timing progression.",
            "Output speaker labels as inferred names when confidence is sufficient; otherwise keep SPEAKER_XX.",
            "Do not invent facts not present in transcript context.",
        ],
        "output_schema": {
            "speaker_mapping_final": [
                {
                    "source_label": "SPEAKER_XX",
                    "final_label": "Name or SPEAKER_XX",
                    "confidence": "number 0..1",
                    "reason": "short string",
                }
            ],
            "turns": [
                {
                    "speaker": "Name or SPEAKER_XX",
                    "start": "float",
                    "end": "float",
                    "text": "cleaned text",
                }
            ],
            "summary": {
                "turn_count": "int",
                "speaker_count": "int",
                "notes": ["string"],
            },
        },
        "executive_names": executive_names,
        "speaker_mapping": speaker_map_json.get("speaker_mapping", []),
    }
    # Two payload variants: one re-sends the full transcript; the other
    # relies on server-side context chaining and omits it (smaller request).
    cleanup_payload_with_turns = dict(cleanup_payload_base)
    cleanup_payload_with_turns["transcript_turns"] = turns
    cleanup_payload_context_only = dict(cleanup_payload_base)
    cleanup_payload_context_only["context_hint"] = (
        "Use the transcript context from the previous response. "
        "Do not request retransmission."
    )

    _log("OpenAI call 2/2: cleaning transcript and refining speaker labels...")
    cleanup_response = None
    used_context_chaining = False
    if first_response_id:
        _log("Using previous_response_id context chaining for call 2.")
        try:
            cleanup_response = client.responses.create(
                model=model,
                previous_response_id=first_response_id,
                input=[
                    {"role": "system", "content": cleanup_system},
                    {"role": "user", "content": json.dumps(cleanup_payload_context_only, ensure_ascii=False)},
                ],
            )
            used_context_chaining = True
        except TypeError:
            # Older SDKs reject the previous_response_id keyword entirely.
            _log("Client does not support previous_response_id; falling back to explicit transcript payload.")
        except Exception as exc:
            # Deliberate best-effort: any API failure here falls back to the
            # self-contained payload below rather than aborting the run.
            _log(f"Context-chained call failed ({exc}); falling back to explicit transcript payload.")

    if cleanup_response is None:
        # Fallback path: send the full transcript explicitly.
        cleanup_response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": cleanup_system},
                {"role": "user", "content": json.dumps(cleanup_payload_with_turns, ensure_ascii=False)},
            ],
        )
    cleanup_raw = _response_to_dict(cleanup_response)
    cleanup_usage = _usage_from_response_dict(cleanup_raw)
    cleanup_text = _response_text(cleanup_response)
    cleaned_json = _extract_json_object(cleanup_text)
    token_usage = {
        "speaker_mapping_call": speaker_map_usage,
        "cleanup_call": cleanup_usage,
        "combined": _sum_usage(speaker_map_usage, cleanup_usage),
    }

    # Embed provenance and accounting in the cleaned output itself.
    cleaned_json["inputs"] = {
        "source_file": str(input_file),
        "speaker_mapping_file": str(speaker_map_path),
        "context_chaining_used_for_cleanup": used_context_chaining,
    }
    cleaned_json["openai_token_usage"] = token_usage

    cleaned_path = run_dir / "cleaned_transcript.json"
    cleaned_raw_path = run_dir / "cleanup_raw_response.json"
    cleaned_text_path = run_dir / "cleaned_transcript.txt"

    _save_json(cleaned_path, cleaned_json)
    _save_json(cleaned_raw_path, cleanup_raw)

    # Render a human-readable "Speaker: text" transcript alongside the JSON.
    # Defensive: the model-produced `turns` may be missing or malformed.
    output_turns = cleaned_json.get("turns", [])
    lines: list[str] = []
    if isinstance(output_turns, list):
        for turn in output_turns:
            if not isinstance(turn, dict):
                continue
            speaker = str(turn.get("speaker", "SPEAKER_XX"))
            text = str(turn.get("text", "")).strip()
            if text:
                lines.append(f"{speaker}: {text}")
    cleaned_text_path.write_text("\n".join(lines), encoding="utf-8")
    _log("Saved cleaned transcript outputs.")

    run_summary = {
        "run_dir": str(run_dir),
        "input_file": str(input_file),
        "model": model,
        "speaker_mapping_file": str(speaker_map_path),
        "speaker_mapping_raw_file": str(speaker_map_raw_path),
        "cleaned_transcript_file": str(cleaned_path),
        "cleaned_transcript_raw_file": str(cleaned_raw_path),
        "cleaned_text_file": str(cleaned_text_path),
        "intro_turn_limit": intro_turn_limit,
        "executive_names_file": str(executive_names_out_path),
        "context_chaining_used_for_cleanup": used_context_chaining,
        "openai_token_usage": token_usage,
    }
    _save_json(run_dir / "run_summary.json", run_summary)
    _log("Completed.")
    return run_summary
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser for the cleanup pipeline."""
    parser = argparse.ArgumentParser(
        description=(
            "Run two OpenAI calls over a merged transcript JSON: "
            "(1) speaker mapping inference, (2) cleaned/re-labeled transcript."
        )
    )
    parser.add_argument("--input-file", required=True, help="Path to merged transcript JSON.")
    parser.add_argument("--api-key", required=True, help="OpenAI API key.")
    parser.add_argument("--model", default="gpt-5", help="OpenAI model ID (default: gpt-5).")
    parser.add_argument(
        "--intro-turn-limit",
        type=int,
        default=80,
        help="Number of initial turns to use for speaker-introduction inference.",
    )
    parser.add_argument(
        "--executive-names-csv",
        default=None,
        help='Comma-separated executive names, e.g. "Name A,Name B,Name C".',
    )
    parser.add_argument(
        "--output-dir",
        default="benchmark_outputs/cleanup_openai",
        help="Directory to store outputs.",
    )
    return parser


def main() -> None:
    """CLI entry point: parse arguments, run the pipeline, print the summary."""
    args = _build_arg_parser().parse_args()
    summary = run_cleanup_pipeline(
        input_file=Path(args.input_file),
        api_key=args.api_key,
        model=args.model,
        output_dir=Path(args.output_dir),
        intro_turn_limit=args.intro_turn_limit,
        executive_names_csv=args.executive_names_csv,
    )
    print(json.dumps(summary, indent=2))
|
|
|
|
# Script entry point: delegate to the CLI handler.
if __name__ == "__main__":
    main()
|
|