clipforge / src /humeo /transcript_align.py
moonlantern1's picture
Deploy ClipForge Docker Space
eda316b verified
"""Map source-timeline ASR words to per-clip subtitle timings (t=0 at clip in-point)."""
from __future__ import annotations
from humeo_core.schemas import Clip, ClipSubtitleWords, RenderTheme, TranscriptWord
# Whisper / WhisperX / OpenAI-normalized segment shapes
# Default upper bounds for one subtitle cue: word count and duration (seconds).
_MAX_WORDS_PER_CUE = 8
_MAX_CUE_SEC = 4.0

# A word ending in one of these characters may close the current cue when
# punctuation-aware breaking is enabled.
_PUNCTUATION_BREAK_CHARS = (".", "?", "!", ";", ":")

# Capitalised words that are treated as the opening of a new sentence; the
# match is exact and case-sensitive.
_SENTENCE_RESTART_WORDS = frozenset(
    (
        "And",
        "But",
        "Did",
        "Now",
        "So",
        "That",
        "Then",
        "This",
        "Those",
        "What",
        "When",
        "Where",
        "Why",
    )
)
def _iter_words_from_segments(transcript: dict) -> list[TranscriptWord]:
    """Flatten ``transcript["segments"]`` into one list of timed words.

    A segment with a non-empty ``words`` list contributes one entry per
    non-blank word. A segment without one contributes its whole ``text`` as
    a single token spanning the segment's ``start``/``end`` (each defaulting
    to 0.0 when absent).

    Words that carry no ``start`` or ``end`` value no longer raise
    ``KeyError``/``TypeError``; their timing is interpolated from the
    neighbouring words (see ``_timed_words_from_segment``).
    """
    out: list[TranscriptWord] = []
    for seg in transcript.get("segments", []) or []:
        words = seg.get("words") or []
        if words:
            out.extend(_timed_words_from_segment(seg, words))
            continue
        # Segment-level only (no word list): treat whole segment as one token
        text = str(seg.get("text", "")).strip()
        if text:
            out.append(
                TranscriptWord(
                    word=text,
                    start_time=float(seg.get("start", 0.0)),
                    end_time=float(seg.get("end", 0.0)),
                )
            )
    return out


def _timed_words_from_segment(seg: dict, raw_words: list) -> list[TranscriptWord]:
    """Build one segment's words, interpolating any that lack timestamps.

    Some aligners omit ``start``/``end`` for tokens they could not align
    (presumably e.g. numerals in WhisperX output -- confirm against the
    producer). A run of such words is spread evenly over the gap between the
    previous timed word (or the segment start) and the next timed word (or
    the segment end). If neither bound is known the run is dropped, since no
    honest timing can be assigned.
    """
    # First pass: normalise to (text, start, end); an incomplete pair is
    # treated as fully untimed.
    entries: list[tuple[str, float | None, float | None]] = []
    for raw in raw_words:
        text = str(raw.get("word", "")).strip()
        if not text:
            continue
        start, end = raw.get("start"), raw.get("end")
        if start is None or end is None:
            entries.append((text, None, None))
        else:
            entries.append((text, float(start), float(end)))

    # Second pass: emit timed words as-is and fill each untimed run.
    timed: list[TranscriptWord] = []
    idx, total = 0, len(entries)
    while idx < total:
        text, start, end = entries[idx]
        if start is not None:
            timed.append(TranscriptWord(word=text, start_time=start, end_time=end))
            idx += 1
            continue

        stop = idx
        while stop < total and entries[stop][1] is None:
            stop += 1

        left = timed[-1].end_time if timed else seg.get("start")
        right = entries[stop][1] if stop < total else seg.get("end")
        if left is not None and right is not None:
            lo = float(left)
            hi = max(float(right), lo)  # never emit a negative-length gap
            run_len = stop - idx
            step = (hi - lo) / run_len
            for offset in range(run_len):
                timed.append(
                    TranscriptWord(
                        word=entries[idx + offset][0],
                        start_time=lo + offset * step,
                        end_time=lo + (offset + 1) * step,
                    )
                )
        idx = stop
    return timed
def clip_subtitle_words(transcript: dict, clip: Clip) -> ClipSubtitleWords:
    """Return the transcript words that overlap ``clip``, in clip-local time.

    Each overlapping word is trimmed to the clip window and shifted so the
    clip's in-point is t=0. When no word survives, the result falls back to
    an even split of ``clip.transcript`` over the clip duration.
    """
    window_start = clip.start_time_sec
    window_end = clip.end_time_sec

    shifted: list[TranscriptWord] = []
    for word in _iter_words_from_segments(transcript):
        # Entirely before or entirely after the clip window.
        if word.end_time <= window_start or word.start_time >= window_end:
            continue
        local_start = max(word.start_time, window_start) - window_start
        local_end = min(word.end_time, window_end) - window_start
        # Trimming left nothing of the word.
        if local_end <= local_start:
            continue
        shifted.append(
            TranscriptWord(
                word=word.word,
                start_time=local_start,
                end_time=local_end,
            )
        )

    if not shifted:
        shifted = _fallback_even_words(clip)
    return ClipSubtitleWords(words=shifted)
def _fallback_even_words(clip: Clip) -> list[TranscriptWord]:
    """Spread the whitespace-separated words of ``clip.transcript`` evenly.

    Used when no word timestamps exist. Returns an empty list when the
    transcript is missing or blank. The last word ends exactly at
    ``clip.duration_sec`` rather than at an accumulated multiple of the step.
    """
    tokens = (clip.transcript or "").strip().split()
    if not tokens:
        return []

    total = clip.duration_sec
    count = len(tokens)
    slot = total / count
    final = count - 1
    return [
        TranscriptWord(
            word=token,
            start_time=pos * slot,
            end_time=total if pos == final else (pos + 1) * slot,
        )
        for pos, token in enumerate(tokens)
    ]
def _looks_like_sentence_restart(prev_word: str, next_word: str) -> bool:
    """Heuristic: does ``next_word`` begin a new sentence after ``prev_word``?

    Closing quotes/brackets are stripped from the end of ``prev_word`` and
    opening ones from the start of ``next_word``. The answer is True when the
    stripped next word is a known restart word, or when the previous word
    contains a digit and the next word starts with an uppercase letter.
    """
    tail = prev_word.rstrip("\"')]}")
    head = next_word.lstrip("\"'([{")
    if not (tail and head):
        return False
    if head in _SENTENCE_RESTART_WORDS:
        return True
    if not head[0].isupper():
        return False
    return any(ch.isdigit() for ch in tail)
def clip_words_to_srt_lines(
    words: list[TranscriptWord],
    *,
    max_words_per_cue: int = _MAX_WORDS_PER_CUE,
    max_cue_sec: float = _MAX_CUE_SEC,
    prefer_break_on_punctuation: bool = False,
    min_words_before_break: int = 1,
) -> list[tuple[float, float, str]]:
    """Collapse words into ``(start, end, text)`` cue tuples.

    Grouping is delegated to ``group_words_to_cue_chunks`` with the same
    keyword options. Each cue runs from its first word's start to its last
    word's end, and its text is the words joined by single spaces.
    """
    cues: list[tuple[float, float, str]] = []
    for group in group_words_to_cue_chunks(
        words,
        max_words_per_cue=max_words_per_cue,
        max_cue_sec=max_cue_sec,
        prefer_break_on_punctuation=prefer_break_on_punctuation,
        min_words_before_break=min_words_before_break,
    ):
        caption = " ".join(item.word for item in group)
        cues.append((group[0].start_time, group[-1].end_time, caption))
    return cues
def group_words_to_cue_chunks(
    words: list[TranscriptWord],
    *,
    max_words_per_cue: int = _MAX_WORDS_PER_CUE,
    max_cue_sec: float = _MAX_CUE_SEC,
    prefer_break_on_punctuation: bool = False,
    min_words_before_break: int = 1,
) -> list[list[TranscriptWord]]:
    """Partition ``words`` into consecutive cue chunks, keeping word timings.

    A new cue is started *before* a word when the current cue already holds
    ``max_words_per_cue`` words, when the word starts more than
    ``max_cue_sec`` after the cue's first word, or (punctuation mode only)
    when the word looks like a sentence restart and the cue has at least two
    words or has lasted 0.45 s.

    In punctuation mode a cue is also closed *after* a word that ends in a
    break character, provided the cue holds at least
    ``min_words_before_break`` words. The opening word of a cue is never
    subject to this check.

    The limits are clamped: at least 1 word, at least 0.2 s, at least 1 word
    before a punctuation break.
    """
    if not words:
        return []

    word_cap = max(1, int(max_words_per_cue))
    span_cap = max(0.2, float(max_cue_sec))
    punct_min_len = max(1, int(min_words_before_break))
    closers = "\"')]}"

    cues: list[list[TranscriptWord]] = []
    current: list[TranscriptWord] = []
    for word in words:
        if current:
            cue_start = current[0].start_time
            if len(current) >= word_cap:
                starts_new_cue = True
            elif word.start_time - cue_start > span_cap:
                starts_new_cue = True
            elif not prefer_break_on_punctuation:
                starts_new_cue = False
            else:
                # Only split on a restart once the cue has some substance.
                warmed_up = (
                    len(current) >= 2
                    or current[-1].end_time - cue_start >= 0.45
                )
                starts_new_cue = warmed_up and _looks_like_sentence_restart(
                    current[-1].word, word.word
                )
            if starts_new_cue:
                cues.append(current)
                # The word opens the next cue; no trailing-punctuation check
                # is applied to a cue's first word.
                current = [word]
                continue

        current.append(word)
        if (
            len(current) > 1
            and prefer_break_on_punctuation
            and len(current) >= punct_min_len
            and current[-1].word.rstrip(closers).endswith(_PUNCTUATION_BREAK_CHARS)
        ):
            cues.append(current)
            current = []

    if current:
        cues.append(current)
    return cues
def format_srt(lines: list[tuple[float, float, str]]) -> str:
    """Serialise ``(start, end, text)`` cues as SRT text.

    Cues are numbered from 1. Every block ends with a newline and blocks are
    joined by one more newline, which yields the blank separator line.
    """
    return "\n".join(
        f"{number}\n{_fmt_time(begin)} --> {_fmt_time(finish)}\n{caption}\n"
        for number, (begin, finish, caption) in enumerate(lines, start=1)
    )
def _fmt_time(seconds: float) -> str:
    """Format ``seconds`` as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds first and then split into
    fields, so a fraction that rounds up to a full second carries into the
    seconds/minutes/hours fields (1.9996 -> ``00:00:02,000``) instead of being
    clamped to ``,999``. Negative input is clamped to zero, matching
    ``_fmt_ass_time``.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    rest, millis = divmod(total_ms, 1000)
    rest, secs = divmod(rest, 60)
    hours, minutes = divmod(rest, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
# ---------------------------------------------------------------------------
# ASS / SubStation Alpha output (the format libass natively renders)
# ---------------------------------------------------------------------------
def _fmt_ass_time(seconds: float) -> str:
    """ASS time format: ``H:MM:SS.cs`` (centiseconds).

    Negative input is clamped to zero. The value is rounded to whole
    centiseconds before being split into fields, so a fraction that rounds
    up to a full second carries over (1.996 -> ``0:00:02.00``) instead of
    being clamped to ``.99``.
    """
    total_cs = max(0, int(round(seconds * 100)))
    rest, cs = divmod(total_cs, 100)
    rest, whole = divmod(rest, 60)
    hours, minutes = divmod(rest, 60)
    return f"{hours:d}:{minutes:02d}:{whole:02d}.{cs:02d}"
def _escape_ass_text(text: str) -> str:
    """Escape characters that are significant to the ASS dialogue parser.

    Backslashes are doubled, braces get a leading backslash, and a newline
    becomes the two-character sequence ``\\N``.
    """
    # A single translate pass is equivalent to the chained replacements: each
    # source character is mapped exactly once, so inserted backslashes are
    # never re-escaped.
    escapes = str.maketrans(
        {
            "\\": r"\\",
            "{": r"\{",
            "}": r"\}",
            "\n": r"\N",
        }
    )
    return text.translate(escapes)
def format_ass(
    lines: list[tuple[float, float, str]],
    *,
    play_res_x: int,
    play_res_y: int,
    font_size: int,
    margin_v: int,
    margin_h: int = 60,
    font_name: str = "Arial",
    render_theme: RenderTheme = RenderTheme.LEGACY,
) -> str:
    """Build an ASS script for ``lines`` with PlayRes set to the output size.

    Rationale: libass scales every pixel-like value by
    ``video_height / PlayResY``. With the default ``PlayResY=288`` a
    ``FontSize=48`` ballooned to roughly 320 output pixels and ``MarginV``
    ended up mid-frame. Setting ``PlayResY`` to the real output height makes
    the scale factor exactly 1.0, so ``font_size`` and ``margin_v`` are true
    output pixel values.

    A single ``Default`` style is written. The two themes differ only in
    BackColour, Spacing, BorderStyle and Outline; ``margin_h`` is used for
    both MarginL and MarginR. Each cue becomes one ``Dialogue`` line with its
    text escaped for the ASS parser.
    """
    # Theme-specific run of style fields, OutlineColour through Alignment.
    if render_theme == RenderTheme.REFERENCE_LOWER_THIRD:
        look = "&H00000000,&H00000000,-1,0,0,0,100,100,-1,0,1,3,0,2,"
    else:
        look = "&H00000000,&H70000000,-1,0,0,0,100,100,0,0,4,0,0,2,"
    style_row = (
        f"Style: Default,{font_name},{font_size},&H00FFFFFF,&H000000FF,"
        f"{look}"
        f"{margin_h},{margin_h},{margin_v},0\n"
    )

    script_info = "".join(
        [
            "[Script Info]\n",
            "ScriptType: v4.00+\n",
            f"PlayResX: {play_res_x}\n",
            f"PlayResY: {play_res_y}\n",
            "WrapStyle: 0\n",
            "ScaledBorderAndShadow: yes\n",
            "YCbCr Matrix: None\n",
        ]
    )
    styles = (
        "[V4+ Styles]\n"
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, "
        "OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, "
        "ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, "
        "Alignment, MarginL, MarginR, MarginV, Encoding\n"
    ) + style_row
    events_head = (
        "[Events]\n"
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"
    )

    # Sections are separated by one blank line.
    header = script_info + "\n" + styles + "\n" + events_head

    dialogue = "".join(
        f"Dialogue: 0,{_fmt_ass_time(begin)},{_fmt_ass_time(finish)},Default,,"
        f"0,0,0,,{_escape_ass_text(caption)}" + "\n"
        for begin, finish, caption in lines
    )
    return header + dialogue