| |
| """ |
| interview_cuts.py — Gera cortes para entrevistas em PT (pergunta curta + resposta longa). |
| |
| Uso típico: |
| python interview_cuts.py video.mp4 --min 60 --max 150 --qmax 12 --gap 2.0 --lead-in-question yes --max-cuts 20 --preview |
| |
| Pré-requisitos: |
| - Ter o arquivo <base>_transcript.json na mesma pasta do vídeo (gerado pelo video_cuts_offline_mac_plus_subs.py). |
| |
| Saídas: |
| - <base>_interview_cuts.json |
| - <base>_interview_cuts.sh |
| - PREVIEW_<base>_interview.mp4 (opcional) |
| """ |
| import argparse, json, os, re, shlex, subprocess, math |
| from pathlib import Path |
| from typing import List, Dict, Any |
|
|
| try: |
| import numpy as np |
| except Exception: |
| np = None |
| try: |
| from resemblyzer import VoiceEncoder, preprocess_wav |
| _HAVE_RESEMBLYZER = True |
| except Exception: |
| VoiceEncoder = None |
| preprocess_wav = None |
| _HAVE_RESEMBLYZER = False |
|
|
|
|
def load_json(p: Path):
    """Read the UTF-8 encoded JSON file at ``p`` and return the decoded object."""
    with p.open("r", encoding="utf-8") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
|
|
def save_json(obj, p: Path):
    """Write ``obj`` to ``p`` as pretty-printed (indent=2) UTF-8 JSON.

    Non-ASCII characters are written as-is (``ensure_ascii=False``).

    Raises:
        TypeError / ValueError: if ``obj`` cannot be serialized. In that case
            ``p`` is left untouched.
    """
    # Serialize before opening the file: opening with "w" truncates it, so a
    # serialization error raised afterwards would destroy an existing output
    # and leave a half-written JSON document behind.
    text = json.dumps(obj, ensure_ascii=False, indent=2)
    with p.open("w", encoding="utf-8") as f:
        f.write(text)
|
|
def normspace(s: str) -> str:
    """Trim ``s`` and collapse each whitespace run to one space; falsy input yields ""."""
    tokens = (s or "").split()
    return " ".join(tokens)
|
|
def first_sentence(s: str, limit=120) -> str:
    """Return the first sentence of ``s``, cut to at most ``limit`` characters.

    The text is whitespace-normalized first. A sentence ends at the first
    ``.``, ``!`` or ``?`` that is followed by whitespace; when no such boundary
    exists the whole text is used. Trailing whitespace left by the cut is
    stripped.
    """
    cleaned = normspace(s)
    # Only the text before the first boundary is needed, so one split suffices.
    head = re.split(r"(?<=[.!?])\s+", cleaned, maxsplit=1)[0]
    if not head:
        head = cleaned
    return head[:limit].rstrip()
|
|
|
|
def ensure_wav_16k_mono(video_path: Path) -> Path:
    """Return a 16 kHz mono WAV sitting next to the video, creating it if needed.

    The WAV is named ``<video stem>.16k.wav`` and is kept on disk, so later
    runs reuse it instead of invoking ffmpeg again.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if the ``ffmpeg`` executable cannot be found.
    """
    wav_path = video_path.with_suffix(".16k.wav")
    # Only a non-empty file counts as a usable cache entry.
    if wav_path.exists() and wav_path.stat().st_size > 0:
        return wav_path

    # Convert into a side file and rename on success, so an interrupted or
    # failed ffmpeg run never leaves a truncated WAV that later runs would
    # silently reuse.
    tmp_path = wav_path.with_name(wav_path.name + ".part")
    cmd = [
        "ffmpeg", "-y",
        "-i", str(video_path),
        "-ac", "1", "-ar", "16000",
        "-f", "wav",  # the ".part" extension hides the format from ffmpeg
        str(tmp_path),
    ]
    try:
        subprocess.run(cmd, check=True)
        tmp_path.replace(wav_path)
    finally:
        # No-op after a successful rename; removes the partial file otherwise.
        tmp_path.unlink(missing_ok=True)
    return wav_path
|
|
|
|
def _merge_speaker_windows(labels, windows, tolerance: float = 0.1):
    """Fuse consecutive same-label windows into ``{"start", "end", "spk"}`` segments.

    ``labels`` and ``windows`` (``(start, end)`` pairs in seconds) must be
    non-empty and of equal length. A window extends the current segment when
    it carries the same label and starts no later than ``tolerance`` seconds
    after the segment's current end.
    """
    def _segment(spk, start, end):
        # Never emit a negative start or a segment shorter than 0.1 s.
        return {
            "start": round(max(0.0, start), 3),
            "end": round(max(end, start + 0.1), 3),
            "spk": spk,
        }

    segs = []
    cur_spk = int(labels[0])
    cur_start, cur_end = windows[0]
    for raw_spk, (st, en) in zip(labels[1:], windows[1:]):
        spk = int(raw_spk)
        if spk == cur_spk and st <= cur_end + tolerance:
            cur_end = max(cur_end, en)
            continue
        segs.append(_segment(cur_spk, cur_start, cur_end))
        cur_spk, cur_start, cur_end = spk, st, en
    segs.append(_segment(cur_spk, cur_start, cur_end))
    return segs


def diarize_with_resemblyzer(wav_path: Path, n_speakers: int = 2, debug: bool = False):
    """Lightweight speaker diarization using Resemblyzer partial embeddings.

    The waveform is embedded in overlapping partial windows, the partial
    embeddings are clustered with agglomerative clustering, and consecutive
    windows of the same cluster are merged.

    Args:
        wav_path: audio file handed to ``preprocess_wav``.
        n_speakers: requested number of clusters; at least 2 are always used.
        debug: when true, print a one-line summary of the result.

    Returns:
        A list of ``{"start": float, "end": float, "spk": int}`` dicts with
        times in seconds, or ``[]`` when the audio is empty or yields fewer
        than two partial windows.

    Raises:
        RuntimeError: if resemblyzer, numpy or scikit-learn is unavailable, or
            if the encoder returns inconsistent partial data.
    """
    if not _HAVE_RESEMBLYZER or np is None:
        raise RuntimeError("pip install resemblyzer numpy scikit-learn soundfile")
    try:
        from sklearn.cluster import AgglomerativeClustering
    except Exception as exc:
        # Chain the cause so the real import failure stays visible.
        raise RuntimeError("pip install scikit-learn") from exc

    # NOTE(review): preprocess_wav appears to normalize volume and shorten long
    # silences, so times measured on its output may drift from the video
    # timeline on recordings with long pauses — confirm against Resemblyzer.
    wav = preprocess_wav(str(wav_path))
    sr = 16000.0  # assumes preprocess_wav resamples to 16 kHz — TODO confirm
    duration = float(len(wav)) / sr
    if duration <= 0.0:
        return []

    enc = VoiceEncoder()
    _, partial_embeds, partial_slices = enc.embed_utterance(wav, return_partials=True)
    n_parts = len(partial_embeds)
    if n_parts < 2:
        return []
    if len(partial_slices) != n_parts:
        raise RuntimeError("Resemblyzer returned mismatched partial embeddings and slices")

    # Use the sample ranges actually embedded instead of guessing evenly spaced
    # centres; the final slice may reach into zero padding, hence the clamp.
    windows = [(s.start / sr, min(s.stop / sr, duration)) for s in partial_slices]

    X = np.vstack(partial_embeds)
    n_clusters = max(2, min(int(n_speakers), n_parts))
    labels = AgglomerativeClustering(n_clusters=n_clusters).fit_predict(X)

    segs = _merge_speaker_windows(labels, windows)
    if debug:
        print(f"[debug] diarization: {n_parts} partials, {n_clusters} clusters, {len(segs)} segments")
    return segs
|
|
|
|
def assign_speakers_to_transcript(transcript: List[Dict[str, Any]], diar_segs: List[Dict[str, Any]]):
    """Return one speaker label per transcript segment.

    Args:
        transcript: segments with ``start``/``end`` (seconds; missing keys
            count as 0).
        diar_segs: diarization segments with ``start``, ``end`` and ``spk``.

    Returns:
        A list as long as ``transcript``. Each entry is the ``spk`` whose
        diarization segments overlap the transcript segment the longest. When
        nothing overlaps, the segment midpoint is matched against the
        diarization segments (with 0.1 s slack), then against the nearest
        segment centre. Every entry is ``-1`` when ``diar_segs`` is empty.
    """
    if not diar_segs:
        return [-1] * len(transcript)

    def spk_at(t: float):
        for s in diar_segs:
            if s["start"] - 0.1 <= t <= s["end"] + 0.1:
                return s["spk"]
        nearest = min(diar_segs, key=lambda s: abs((s["start"] + s["end"]) / 2 - t))
        return nearest["spk"]

    def spk_for(st: float, en: float):
        # Diarization segments may overlap each other around speaker changes,
        # so a first-match lookup on the midpoint favours the earlier speaker.
        # Total overlap per speaker is the more reliable signal.
        overlap: Dict[Any, float] = {}
        for s in diar_segs:
            shared = min(en, s["end"]) - max(st, s["start"])
            if shared > 0:
                overlap[s["spk"]] = overlap.get(s["spk"], 0.0) + shared
        if overlap:
            return max(overlap, key=overlap.get)
        return spk_at((st + en) / 2.0)

    return [
        spk_for(float(seg.get("start", 0)), float(seg.get("end", 0)))
        for seg in transcript
    ]
|
|
|
|
def detect_questions(transcript: List[Dict[str, Any]], qmax: float, wc_max: int, qmark_required: bool, debug: bool=False) -> List[int]:
    """Return the indexes of transcript segments that look like questions.

    A segment qualifies when it has between 2 and ``wc_max`` words and either
    ends with a question mark or lasts at most ``qmax`` seconds. With
    ``qmark_required`` only the question-mark rule counts.

    Args:
        transcript: segments with ``start``, ``end`` and ``text``.
        qmax: maximum duration in seconds for the duration rule.
        wc_max: maximum word count of a question.
        qmark_required: demand a trailing ``?``.
        debug: when true, print how many segments were flagged.
    """
    # Closing quotes/brackets that may follow the question mark, e.g. `... por quê?"`.
    closers = "\"'\u201d\u2019\u00bb)] "
    idxs = []
    for i, seg in enumerate(transcript):
        st = float(seg.get("start", 0))
        en = float(seg.get("end", 0))
        duration = max(0.0, en - st)
        text = (seg.get("text") or "").strip()
        wc = len(text.split())
        has_qmark = text.rstrip(closers).endswith("?")
        wc_ok = 2 <= wc <= wc_max
        if qmark_required:
            is_q = has_qmark and wc_ok
        else:
            is_q = (has_qmark or duration <= qmax) and wc_ok
        if is_q:
            idxs.append(i)
    if debug:
        print(f"[debug] {len(idxs)} de {len(transcript)} segmentos marcados como pergunta")
    return idxs
|
|
|
|
def build_interview_cuts(
    transcript: List[Dict[str, Any]],
    min_len: float,
    max_len: float,
    qmax: float,
    gap: float,
    lead_in_question: bool,
    max_cuts: int,
    wc_max: int = 35,
    qmark_required: bool = False,
    spk_labels: List[int] | None = None,
    interviewer_id: int | None = None,
    debug: bool = False,
) -> List[Dict[str, Any]]:
    """Build "question + answer" and "continuous answer" cuts from a transcript.

    The transcript is walked in order. A segment flagged as a question opens a
    cut that collects the following non-question segments until the next
    question, a pause longer than ``gap`` seconds, or ``max_len`` seconds of
    answer. Any other segment opens a "continuous answer" cut that extends
    while consecutive segments start within ``gap`` seconds of the previous
    end, up to ``max_len`` seconds.

    Args:
        transcript: segments with ``start``, ``end`` (seconds) and ``text``.
        min_len: minimum duration of a continuous-answer cut; question cuts
            are accepted from ``0.6 * min_len``.
        max_len: duration at which a cut stops growing (the segment that
            crosses the limit is still included).
        qmax: maximum duration of a question, passed to ``detect_questions``.
        gap: maximum pause in seconds tolerated inside a cut.
        lead_in_question: include the question itself at the start of the cut
            and use its first sentence as ``hook``.
        max_cuts: maximum number of cuts returned; ``<= 0`` yields ``[]``.
        wc_max: maximum word count of a question.
        qmark_required: only treat segments ending in ``?`` as questions.
        spk_labels: optional speaker label per transcript segment.
        interviewer_id: label of the interviewer; when given together with
            ``spk_labels``, only the interviewer's segments can be questions.
        debug: forwarded to ``detect_questions``.

    Returns:
        A list of dicts with ``start``, ``end``, ``label``, ``hook``,
        ``reason`` and ``segments`` (the transcript time spans in the cut).
    """
    if max_cuts <= 0:
        return []

    # One definition of "question" for both modes, so qmark_required and the
    # word-count bounds also apply when diarization labels are available.
    qs = set(detect_questions(transcript, qmax, wc_max, qmark_required, debug))
    if spk_labels is not None and interviewer_id is not None:
        qs = {
            i for i in qs
            if i < len(spk_labels) and spk_labels[i] == interviewer_id
        }

    def _bounds(seg):
        return float(seg.get("start", 0)), float(seg.get("end", 0))

    cuts: List[Dict[str, Any]] = []
    n = len(transcript)
    i = 0
    while i < n and len(cuts) < max_cuts:
        st, en = _bounds(transcript[i])
        txt = normspace(transcript[i].get("text", ""))
        if not txt or en - st < 0.2:
            # Skip empty or near-instant segments as cut openers.
            i += 1
            continue

        j = i + 1
        end_time = en
        if i in qs:
            resp_start = None
            collected_text = []
            segments = []
            while j < n and j not in qs:
                st2, en2 = _bounds(transcript[j])
                if en2 - st2 < 0.25:
                    j += 1
                    continue
                # The gap limit only applies once the answer has started.
                if resp_start is not None and st2 - end_time > gap:
                    break
                txt2 = normspace(transcript[j].get("text", ""))
                if txt2:
                    if resp_start is None:
                        resp_start = st2
                    segments.append({"start": st2, "end": en2})
                    collected_text.append(txt2)
                    end_time = en2
                # Advance before the length check so the segment just consumed
                # is not reprocessed as the start of the next cut.
                j += 1
                if end_time - (resp_start if resp_start is not None else st) >= max_len:
                    break
            if resp_start is not None:
                start_cut = st if lead_in_question else resp_start
                if end_time - start_cut >= min_len * 0.6:
                    label = first_sentence(" ".join(collected_text), 70) or "Resposta marcante"
                    hook = first_sentence(txt, 90) if lead_in_question else ""
                    lead = [{"start": st, "end": en}] if lead_in_question else []
                    cuts.append({
                        "start": round(start_cut, 3),
                        "end": round(end_time, 3),
                        "label": label,
                        "hook": hook,
                        "reason": "Pergunta curta seguida de resposta longa",
                        "segments": lead + segments,
                    })
        else:
            collected = [txt]
            segments = [{"start": st, "end": en}]
            while j < n and float(transcript[j].get("start", 0)) - end_time <= gap:
                st2, en2 = _bounds(transcript[j])
                if en2 - st2 < 0.25:
                    j += 1
                    continue
                t2 = normspace(transcript[j].get("text", ""))
                if t2:
                    segments.append({"start": st2, "end": en2})
                    collected.append(t2)
                    end_time = en2
                # Same ordering as above: consume the segment, then test length.
                j += 1
                if end_time - st >= max_len:
                    break
            if end_time - st >= min_len:
                cuts.append({
                    "start": round(st, 3),
                    "end": round(end_time, 3),
                    "label": first_sentence(" ".join(collected), 70) or "Resposta destacada",
                    "hook": "",
                    "reason": "Resposta contínua em entrevista",
                    "segments": segments,
                })
        # j is always at least i + 1, so the walk makes progress.
        i = j
    return cuts
|
|
|
|
def write_shell_and_preview(video_path: Path, base: str, cuts: List[Dict[str, Any]], preview: bool):
    """Write the bash script that exports every cut with ffmpeg.

    Files created next to the video:
      - ``export_parts/`` (directory that will receive the exported cuts)
      - ``<base>_interview_cuts.sh`` (executable; one ffmpeg command per cut)
      - ``<base>_interview_preview_list.txt`` (only with ``preview`` and at
        least one cut): concat list used by the final command of the script
        to build ``PREVIEW_<base>_interview.mp4``.

    Args:
        video_path: source video; its parent directory receives all outputs.
        base: file-name prefix for the outputs.
        cuts: dicts with numeric ``start`` and ``end`` (seconds).
        preview: also emit the concat list and the preview command.
    """
    out_dir = video_path.parent
    sh_path = out_dir / f"{base}_interview_cuts.sh"
    parts_dir = out_dir / "export_parts"
    parts_dir.mkdir(exist_ok=True)

    part_files = [parts_dir / f"{base}_cut_{k:02}.mp4" for k in range(1, len(cuts) + 1)]

    lines = ["#!/usr/bin/env bash", "set -e"]
    for c, out_file in zip(cuts, part_files):
        ss = c["start"]
        dd = round(c["end"] - ss, 3)
        lines.append(
            f"ffmpeg -hide_banner -loglevel warning -y -ss {ss} -i {shlex.quote(str(video_path))} -t {dd} "
            f"-c:v libx264 -crf 22 -preset veryfast -vf scale=1080:-2:flags=bicubic -c:a aac -b:a 128k {shlex.quote(str(out_file))}"
        )

    if preview and cuts:
        plist = out_dir / f"{base}_interview_preview_list.txt"
        entries = []
        for p in part_files:
            # The concat demuxer resolves relative entries against the list
            # file's directory, so the export_parts/ prefix is required.
            # Single-quote the path (closing and reopening the quote around
            # any literal ') so spaces and special characters survive.
            rel = p.relative_to(out_dir).as_posix()
            escaped = rel.replace("'", "'\\''")
            entries.append(f"file '{escaped}'\n")
        plist.write_text("".join(entries), encoding="utf-8")
        preview_path = out_dir / f"PREVIEW_{base}_interview.mp4"
        lines.append(f"ffmpeg -hide_banner -loglevel warning -y -f concat -safe 0 -i {shlex.quote(str(plist))} -c copy {shlex.quote(str(preview_path))}")

    sh_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    os.chmod(sh_path, 0o755)
    print(f"✅ Script de export: {sh_path}")
|
|
|
|
def main():
    """Command-line entry point: load the transcript, build cuts, write outputs.

    Expects ``<base>_transcript.json`` next to the input video and exits with
    status 1 when it is missing. With ``--diarize`` the speaker with the least
    total talk time is taken as the interviewer; any diarization failure is
    reported as a warning and processing continues without speaker labels.
    Writes ``<base>_interview_cuts.json`` and the export shell script.
    """
    parser = argparse.ArgumentParser("Cortes para entrevistas (pergunta curta + resposta longa)")
    parser.add_argument("video", help="Arquivo de entrada (.mp4/.mov)")
    parser.add_argument("--min", type=float, default=60.0, help="Duração mínima do corte em segundos")
    parser.add_argument("--max", type=float, default=150.0, help="Duração máxima do corte em segundos")
    parser.add_argument("--qmax", type=float, default=12.0, help="Máximo de duração para marcar perguntas")
    parser.add_argument("--gap", type=float, default=2.0, help="Tolerância de gap entre segmentos")
    parser.add_argument("--lead-in-question", choices=["yes","no"], default="yes", help="Incluir pergunta antes da resposta")
    parser.add_argument("--max-cuts", type=int, default=20, help="Limite de cortes")
    parser.add_argument("--preview", action="store_true", help="Gera comando de prévia por concat")
    parser.add_argument("--q-wc-max", type=int, default=35, help="Máximo de palavras para considerar pergunta")
    parser.add_argument("--qmark-required", action="store_true", help="Exigir '?' para marcar pergunta")
    parser.add_argument("--diarize", action="store_true", help="Ativar diarização com Resemblyzer")
    parser.add_argument("--n-speakers", type=int, default=2, help="Número de falantes para clusterizar")
    parser.add_argument("--debug", action="store_true", help="Imprimir diagnóstico")
    opts = parser.parse_args()

    video_path = Path(opts.video).expanduser().resolve()
    base = video_path.stem
    transcript_path = video_path.with_name(f"{base}_transcript.json")
    if not transcript_path.exists():
        print(f"ERRO: não achei '{transcript_path.name}'. Gere a transcrição primeiro com video_cuts_offline_mac_plus_subs.py")
        raise SystemExit(1)

    transcript = load_json(transcript_path)

    spk_labels = None
    interviewer_id = None
    if opts.diarize:
        # Best effort: any failure here falls back to text-only heuristics.
        try:
            wav16k = ensure_wav_16k_mono(video_path)
            diar = diarize_with_resemblyzer(wav16k, n_speakers=opts.n_speakers, debug=opts.debug)
            if diar:
                spk_labels = assign_speakers_to_transcript(transcript, diar)
                talk_time = {}
                for idx, seg in enumerate(transcript):
                    seg_start = float(seg.get("start",0))
                    seg_end = float(seg.get("end",0))
                    seg_len = max(0.0, seg_end - seg_start)
                    who = spk_labels[idx] if spk_labels and idx < len(spk_labels) else -1
                    talk_time[who] = talk_time.get(who, 0.0) + seg_len
                if talk_time:
                    # The speaker who talks the least is taken as the interviewer.
                    interviewer_id = min(talk_time.items(), key=lambda kv: kv[1])[0]
        except Exception as e:
            print(f"[warn] Diarização falhou: {e}. Seguindo sem diarização.")

    include_question = opts.lead_in_question == "yes"
    cuts = build_interview_cuts(
        transcript=transcript,
        min_len=opts.min,
        max_len=opts.max,
        qmax=opts.qmax,
        gap=opts.gap,
        lead_in_question=include_question,
        max_cuts=opts.max_cuts,
        wc_max=opts.q_wc_max,
        qmark_required=opts.qmark_required,
        spk_labels=spk_labels,
        interviewer_id=interviewer_id,
        debug=opts.debug,
    )

    out_json = video_path.with_name(f"{base}_interview_cuts.json")
    save_json(cuts, out_json)
    print(f"✅ Gerado: {out_json}")

    write_shell_and_preview(video_path, base, cuts, preview=opts.preview)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|