YT-AI-Automation / backend /src /video_engine.py
github-actions
Sync Docker Space
5f3e9f5
"""Dual-engine video builder.
Routes call :class:`VideoStudio` instead of touching the platform-specific
engines directly. The studio inspects the host OS (and the
``USE_POWERPOINT`` config flag) and dispatches to one of two engines:
- **Windows** → ``core.powerpoint.controller.PowerPointController`` driving
  PowerPoint via COM automation. Same behaviour the project has shipped
  on Windows since day one.
- **Linux / macOS** → MoviePy + ffmpeg. Stitches the same screenshot
  list into a 4K H.264 MP4 with the same intro / outro thumbnail
  layering plus optional intro / outro video clips, watermark, and an
  audio bed (per-slide voiceovers + 10 %-volume background music).
Both engines accept the same ``config_data`` dict (see
:meth:`VideoStudio.build_video`) and return the same shape so the calling
route doesn't care which one ran.
"""
from __future__ import annotations
import logging
import os
import platform
import re
import shutil
import subprocess
from pathlib import Path
from typing import Any, Callable, List, Optional
logger = logging.getLogger(__name__)
def _resolve_use_powerpoint() -> bool:
    """Decide whether the PowerPoint (COM) engine should be used.

    Resolution order:

    1. The ``USE_POWERPOINT`` environment variable, when set to a non-blank
       value. Only ``1`` / ``true`` / ``yes`` / ``on`` (case-insensitive,
       surrounding whitespace ignored) enable PowerPoint; any other
       non-blank value disables it.
    2. The ``USE_POWERPOINT`` attribute of the importable ``config`` module,
       when present and not ``None``. String values are parsed with the same
       truthy set as the environment variable, so ``"false"`` or ``"0"`` are
       not mistaken for "enabled" just because they are non-empty strings.
       Any other type is coerced with ``bool``.
    3. Host OS detection: ``True`` only when ``platform.system()`` reports
       ``"Windows"``.

    Nothing is cached: every call re-reads the environment, so changing the
    variable takes effect on the next call.

    Returns:
        ``True`` to use the PowerPoint engine, ``False`` otherwise.
    """
    truthy = {"1", "true", "yes", "on"}

    override = os.environ.get("USE_POWERPOINT")
    if override is not None and override.strip():
        return override.strip().lower() in truthy

    try:
        import config  # type: ignore

        flag = getattr(config, "USE_POWERPOINT", None)
    except Exception:  # pragma: no cover - config import is best-effort
        # A missing or broken config module must never stop engine selection.
        flag = None

    if isinstance(flag, str):
        # bool("false") is True, so strings need explicit parsing.
        return flag.strip().lower() in truthy
    if flag is not None:
        return bool(flag)

    return platform.system() == "Windows"
class VideoEngineError(Exception):
    """Root of the exception hierarchy raised by the video engines.

    Catching this single type covers every engine-level failure that is
    reported back to the caller.
    """
class MovieEngineUnavailableError(VideoEngineError):
    """Raised when MoviePy / ffmpeg cannot be used on this host.

    The caller is expected to turn this into a clean, user-facing error
    (e.g. over SSE) instead of letting it propagate as a crash.
    """
# Default video knobs — match the spec (4K, 30 fps, libx264 ultrafast).
# Each one is only a fallback: the builders prefer the matching ``cfg`` key.
_DEFAULT_RESOLUTION = (3840, 2160)  # (width, height) in pixels
_DEFAULT_FPS = 30
_DEFAULT_PRESET = "ultrafast"  # encoder preset
_DEFAULT_CODEC = "libx264"
_DEFAULT_CRF = 23  # used when ``quality`` is missing or not an integer
_BG_MUSIC_VOLUME = 0.10  # volume scale applied to the background music
_WATERMARK_OPACITY = 0.50  # opacity applied to the logo overlay
class VideoStudio:
    """OS-aware façade around the PowerPoint and MoviePy engines."""
def __init__(self, use_powerpoint: Optional[bool] = None):
self.use_powerpoint = (
bool(use_powerpoint) if use_powerpoint is not None else _resolve_use_powerpoint()
)
# ──────────────────────────────────────────────────────────────────
# Public API
# ──────────────────────────────────────────────────────────────────
def build_video(self, config_data: dict) -> dict:
"""Build an MP4 from ``config_data`` using the selected engine.
Required keys:
``image_files`` – ordered list of slide screenshot paths
``output_video_path`` – where the MP4 lands
Optional keys (all engines):
``output_pptx_path`` – companion PPTX path (Windows only)
``template_path`` – PowerPoint template (Windows only)
``slide_duration`` – seconds per slide (default 5.0)
``resolution`` – ``(w, h)`` tuple (default 4K)
``fps`` – default 30
``quality`` – 1–5 (Windows) / 0–100 (MoviePy)
``intro_thumbnail_path`` / ``intro_thumbnail_duration``
``outro_thumbnail_path`` / ``outro_thumbnail_duration``
``progress_callback`` – ``fn(payload: dict)``
``cancel_event`` – threading.Event for cooperative cancel
Optional ffmpeg / MoviePy keys:
``intro_video_path`` – intro MP4 before the screenshot slides
``outro_video_path`` – outro MP4 after the screenshot slides
MoviePy-only optional keys:
``voiceover_files`` – ``list[Optional[str]]`` (one per slide)
``narration_audio`` – single full-length narration track
``background_music`` – music file mixed in at 10 % volume
``logo_path`` – watermark, 50 % opacity, bottom-right
Returns:
``{'presentation_path': str | None, 'video_path': str, 'warning': str | None}``
"""
if self.use_powerpoint:
return self._build_with_powerpoint(config_data)
if self._can_build_simple_with_ffmpeg(config_data):
return self._build_simple_with_ffmpeg(config_data)
return self._build_with_moviepy(config_data)
# ──────────────────────────────────────────────────────────────────
# Windows — PowerPoint COM
# ──────────────────────────────────────────────────────────────────
def _build_with_powerpoint(self, cfg: dict) -> dict:
    """Export the video through the PowerPoint controller.

    Forwards the relevant ``cfg`` entries to
    ``PowerPointController.create_and_export_video`` and reshapes its
    result so it matches what the other engines return.

    ``template_path``, ``image_files``, ``output_pptx_path`` and
    ``output_video_path`` are read with ``cfg[...]``, so a missing key
    raises ``KeyError``. Every other setting falls back to a default.

    Returns:
        Dict with ``presentation_path``, ``video_path`` and ``warning``
        copied from the controller result, plus ``engine`` set to
        ``"powerpoint"``.
    """
    # Imported lazily so this module loads without the PowerPoint package.
    from core.powerpoint.controller import PowerPointController

    controller = PowerPointController()
    export_kwargs = {
        "template_path": cfg["template_path"],
        "image_files": list(cfg["image_files"]),
        "output_pptx_path": cfg["output_pptx_path"],
        "output_video_path": cfg["output_video_path"],
        "slide_duration": float(cfg.get("slide_duration", 5.0)),
        "transition_type": cfg.get("transition_type", "fade"),
        "resolution": tuple(cfg.get("resolution") or _DEFAULT_RESOLUTION),
        "fps": int(cfg.get("fps") or _DEFAULT_FPS),
        "quality": int(cfg.get("quality") or 5),
        "intro_thumbnail_path": cfg.get("intro_thumbnail_path"),
        "intro_thumbnail_duration": float(cfg.get("intro_thumbnail_duration", 5.0)),
        "outro_thumbnail_path": cfg.get("outro_thumbnail_path"),
        "outro_thumbnail_duration": float(cfg.get("outro_thumbnail_duration", 5.0)),
        "progress_callback": cfg.get("progress_callback"),
        "cancel_event": cfg.get("cancel_event"),
    }
    result = controller.create_and_export_video(**export_kwargs)

    # Same return shape as the MoviePy / ffmpeg branches, so callers do not
    # need to know which engine ran.
    payload = {
        key: result.get(key)
        for key in ("presentation_path", "video_path", "warning")
    }
    payload["engine"] = "powerpoint"
    return payload
# ──────────────────────────────────────────────────────────────────
# Linux / macOS — MoviePy
# ──────────────────────────────────────────────────────────────────
def _sort_image_files_like_powerpoint(self, image_files: List[str]) -> List[str]:
"""Match PowerPointExporter.create_from_template screenshot ordering."""
def sort_key(filepath: str) -> int:
basename = os.path.basename(filepath)
match = re.search(r"\((\d+)\)", basename)
if match:
return int(match.group(1))
nums = re.findall(r"\d+", basename)
return int(nums[-1]) if nums else 0
return sorted(image_files, key=sort_key)
def _quality_to_crf(self, cfg: dict) -> int:
if cfg.get("encode_crf") is not None:
return int(cfg["encode_crf"])
raw_quality = cfg.get("quality")
try:
quality = int(raw_quality)
except (TypeError, ValueError):
return _DEFAULT_CRF
# Some callers pass PowerPoint's legacy 1-5 quality, while queued
# Linux runs pass the UI's 1-100 value. Convert both to a sane x264 CRF.
if quality <= 5:
quality = quality * 20
quality = max(1, min(100, quality))
return round(31 - (quality / 100) * 13)
def _can_build_simple_with_ffmpeg(self, cfg: dict) -> bool:
"""Use ffmpeg directly for still-image timelines.
MoviePy is flexible, but 4K still slides force Python to generate
every frame. ffmpeg can hold each image for N seconds natively, which
is much faster for the common screenshot-only export path.
"""
if not shutil.which("ffmpeg"):
return False
image_files = list(cfg.get("image_files") or [])
if not image_files or any(not Path(p).is_file() for p in image_files):
return False
unsupported_keys = ("narration_audio", "background_music", "logo_path")
if any(cfg.get(key) for key in unsupported_keys):
return False
for key in ("intro_video_path", "outro_video_path"):
path = cfg.get(key)
if path and not Path(path).is_file():
return False
voiceover_files = [p for p in list(cfg.get("voiceover_files") or []) if p]
if voiceover_files:
return False
return True
def _build_simple_with_ffmpeg(self, cfg: dict) -> dict:
    """Render a stills-only timeline by driving the ``ffmpeg`` CLI directly.

    Steps:

    1. Optional ``intro_video_path`` is re-encoded to the target
       resolution / fps / codec (a silent stereo track is added when
       ``ffprobe`` reports no audio stream).
    2. Intro thumbnail, slides and outro thumbnail are written to an
       ffconcat list and encoded into one segment with a silent audio
       track.
    3. Optional ``outro_video_path`` is re-encoded like the intro.
    4. The segments are joined with stream copy into
       ``output_video_path``.

    Temporary files live next to the output (``<stem>.slides.ffconcat``,
    ``<stem>.segments.ffconcat``, ``<stem>.intro.mp4``,
    ``<stem>.slides.mp4``, ``<stem>.outro.mp4``) and are removed on every
    exit path, including failure and cancellation.

    Args:
        cfg: Same dict as :meth:`build_video`. Also honours
            ``encode_preset``, ``encode_codec``, ``encode_crf`` and
            ``threads``. ``slide_duration`` and the thumbnail durations
            fall back to 5.0 seconds when missing, non-numeric,
            non-finite or not positive.

    Returns:
        ``{'presentation_path': None, 'video_path': str, 'warning': None,
        'engine': 'ffmpeg'}``.

    Raises:
        VideoEngineError: no image files were supplied, ffmpeg exited
            with a non-zero status, or ``cancel_event`` was set.
        KeyError: ``output_video_path`` is missing from ``cfg``.
    """
    image_files: List[str] = self._sort_image_files_like_powerpoint(
        list(cfg.get("image_files") or [])
    )
    if not image_files:
        raise VideoEngineError("ffmpeg engine requires at least one image_file")

    output_video_path: str = cfg["output_video_path"]
    output_path = Path(output_video_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    resolution = tuple(cfg.get("resolution") or _DEFAULT_RESOLUTION)
    width, height = int(resolution[0]), int(resolution[1])
    fps = int(cfg.get("fps") or _DEFAULT_FPS)
    encode_preset = str(cfg.get("encode_preset") or _DEFAULT_PRESET)
    encode_codec = str(cfg.get("encode_codec") or _DEFAULT_CODEC)
    encode_crf = self._quality_to_crf(cfg)
    thread_count = int(
        cfg.get("threads")
        or max(1, min((os.cpu_count() or 4), 16))
    )
    progress_callback: Optional[Callable[[dict], None]] = cfg.get("progress_callback")
    cancel_event = cfg.get("cancel_event")

    # Argument fragments shared by every ffmpeg invocation below.
    ffmpeg_base = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error"]
    silent_audio = "anullsrc=channel_layout=stereo:sample_rate=48000"
    encode_args = [
        "-c:v", encode_codec,
        "-preset", encode_preset,
        "-crf", str(encode_crf),
        "-threads", str(thread_count),
        "-c:a", "aac",
        "-ar", "48000",
        "-ac", "2",
        "-shortest",
    ]

    def _emit(progress: int, message: str) -> None:
        if progress_callback:
            try:
                progress_callback(
                    {"stage": "ffmpeg", "progress": progress, "message": message}
                )
            except Exception:  # never let a buggy callback crash the build
                logger.exception("progress_callback raised")

    def _duration(value: Any, default: float) -> float:
        """Return ``value`` as a positive, finite float, else ``default``."""
        try:
            parsed = float(value)
        except (TypeError, ValueError, OverflowError):
            return default
        # The chained comparison also rejects NaN and +inf.
        return parsed if 0 < parsed < float("inf") else default

    # Validated like the thumbnail durations: a zero or negative value
    # would otherwise be written into the ffconcat list verbatim.
    slide_duration = _duration(cfg.get("slide_duration"), 5.0)

    timeline: List[tuple[str, float]] = []
    intro_thumb = cfg.get("intro_thumbnail_path")
    if intro_thumb and Path(intro_thumb).is_file():
        timeline.append(
            (str(intro_thumb), _duration(cfg.get("intro_thumbnail_duration"), 5.0))
        )
    timeline.extend((str(path), slide_duration) for path in image_files)
    outro_thumb = cfg.get("outro_thumbnail_path")
    if outro_thumb and Path(outro_thumb).is_file():
        timeline.append(
            (str(outro_thumb), _duration(cfg.get("outro_thumbnail_duration"), 5.0))
        )

    # Keep still-slide exports variable-frame-rate: one encoded frame can
    # be held for the slide duration instead of duplicating it to match fps.
    vf = f"scale={width}:{height},setsar=1,format=yuv420p"

    def _concat_path(path: str) -> str:
        """Format one ``file`` directive, escaping single quotes."""
        escaped = Path(path).resolve().as_posix().replace("'", "'\\''")
        return f"file '{escaped}'"

    def _run_ffmpeg(cmd: List[str]) -> None:
        """Run one ffmpeg command, honouring ``cancel_event``."""
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",  # odd bytes in ffmpeg output must not raise
        )
        try:
            while True:
                if cancel_event is not None and cancel_event.is_set():
                    proc.terminate()
                    raise VideoEngineError("Cancelled by user")
                try:
                    # communicate() drains both pipes while waiting, so a
                    # chatty ffmpeg cannot block on a full pipe buffer.
                    # Retrying after a timeout loses no output.
                    stdout, stderr = proc.communicate(timeout=0.5)
                except subprocess.TimeoutExpired:
                    continue
                break
        except BaseException:
            # Cancelled or interrupted: make sure the child is gone, reaped
            # and its pipes are closed before propagating.
            proc.kill()
            proc.communicate()
            raise
        if proc.returncode:
            detail = (stderr or stdout or "ffmpeg failed").strip()
            raise VideoEngineError(f"ffmpeg export failed: {detail}")

    def _has_audio(path: str) -> bool:
        """Report whether ``path`` has an audio stream.

        Assumes it does when ``ffprobe`` is not on ``PATH``.
        """
        if not shutil.which("ffprobe"):
            return True
        probe = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-select_streams",
                "a:0",
                "-show_entries",
                "stream=index",
                "-of",
                "csv=p=0",
                path,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
            check=False,
        )
        return bool(probe.stdout.strip())

    def _video_segment(input_path: str, segment_path: Path) -> None:
        """Re-encode an intro/outro clip so it can be stream-copied later."""
        cmd = ffmpeg_base + ["-i", input_path]
        audio_index = "0:a:0"
        if not _has_audio(input_path):
            # Every segment needs an audio stream for the final concat.
            cmd += ["-f", "lavfi", "-i", silent_audio]
            audio_index = "1:a:0"
        cmd += [
            "-map",
            "0:v:0",
            "-map",
            audio_index,
            "-vf",
            f"scale={width}:{height},setsar=1,fps={fps},format=yuv420p",
        ]
        cmd += encode_args
        cmd.append(str(segment_path))
        _run_ffmpeg(cmd)

    _emit(
        90,
        f"Encoding MP4 with ffmpeg ({encode_codec} {encode_preset}, crf={encode_crf}, threads={thread_count})...",
    )

    slide_concat_path = output_path.with_suffix(".slides.ffconcat")
    segment_concat_path = output_path.with_suffix(".segments.ffconcat")
    # Every temporary file is registered here BEFORE ffmpeg writes to it, so
    # a failed or cancelled run does not leave partial segments behind.
    scratch_paths: List[Path] = [slide_concat_path, segment_concat_path]
    segment_paths: List[Path] = []

    def _scratch(suffix: str) -> Path:
        path = output_path.with_suffix(suffix)
        scratch_paths.append(path)
        return path

    try:
        intro_video = cfg.get("intro_video_path")
        if intro_video:
            intro_segment_path = _scratch(".intro.mp4")
            _video_segment(str(intro_video), intro_segment_path)
            segment_paths.append(intro_segment_path)

        lines = ["ffconcat version 1.0"]
        for path, duration in timeline:
            lines.append(_concat_path(path))
            lines.append(f"duration {duration:.6f}")
        # The concat demuxer needs the final file repeated to honor its
        # duration instead of treating it as a single-frame tail.
        lines.append(_concat_path(timeline[-1][0]))
        slide_concat_path.write_text("\n".join(lines) + "\n", encoding="utf-8")

        total_slide_duration = sum(duration for _path, duration in timeline)
        slide_segment_path = _scratch(".slides.mp4")
        _run_ffmpeg(
            ffmpeg_base
            + [
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                str(slide_concat_path),
                "-f",
                "lavfi",
                "-t",
                f"{total_slide_duration:.6f}",
                "-i",
                silent_audio,
                "-map",
                "0:v:0",
                "-map",
                "1:a:0",
                "-vf",
                vf,
                "-fps_mode",
                "vfr",
            ]
            + encode_args
            + [str(slide_segment_path)]
        )
        segment_paths.append(slide_segment_path)

        outro_video = cfg.get("outro_video_path")
        if outro_video:
            outro_segment_path = _scratch(".outro.mp4")
            _video_segment(str(outro_video), outro_segment_path)
            segment_paths.append(outro_segment_path)

        segment_lines = ["ffconcat version 1.0"]
        segment_lines.extend(_concat_path(str(path)) for path in segment_paths)
        segment_concat_path.write_text(
            "\n".join(segment_lines) + "\n",
            encoding="utf-8",
        )
        _run_ffmpeg(
            ffmpeg_base
            + [
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                str(segment_concat_path),
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                output_video_path,
            ]
        )
    finally:
        for path in scratch_paths:
            try:
                path.unlink()
            except OSError:
                # Never created, or already gone: nothing to clean up.
                pass

    _emit(99, "MP4 written to disk.")
    return {
        "presentation_path": None,
        "video_path": output_video_path,
        "warning": None,
        "engine": "ffmpeg",
    }
def _build_with_moviepy(self, cfg: dict) -> dict:
    """Compose and encode the video with MoviePy.

    The timeline is assembled in this order, skipping any optional part
    whose path is unset or not an existing file: intro video, intro
    thumbnail, slides, outro thumbnail, outro video. A slide with an
    existing voiceover file lasts as long as that audio (at least 0.1 s);
    other slides last ``slide_duration``. An optional logo is overlaid
    bottom-right for the whole timeline, then the timeline audio, the
    optional narration and the optional background music are mixed into
    one track before ``write_videofile`` encodes the MP4.

    Progress is reported through ``cfg['progress_callback']`` with
    ``stage == 'moviepy'``; ``cfg['cancel_event']`` is checked between
    composition steps. Every clip opened here is closed in ``finally``.

    Returns:
        ``{'presentation_path': None, 'video_path': str, 'warning': None,
        'engine': 'moviepy'}``.

    Raises:
        MovieEngineUnavailableError: ``moviepy`` cannot be imported.
        VideoEngineError: no image files, an empty timeline, or the
            cancel event was set.
        KeyError: ``output_video_path`` is missing from ``cfg``.
    """
    try:
        from moviepy import (  # type: ignore
            AudioFileClip,
            CompositeAudioClip,
            CompositeVideoClip,
            ImageClip,
            VideoFileClip,
            concatenate_videoclips,
        )
    except ImportError as exc:  # pragma: no cover - exercised on hosts w/o moviepy
        raise MovieEngineUnavailableError(
            "MoviePy is not installed on this host. The Linux/macOS "
            "video engine requires `moviepy>=2.0` and `ffmpeg`. "
            "Install with: pip install 'moviepy>=2.0' && apt-get install ffmpeg"
        ) from exc

    image_files: List[str] = self._sort_image_files_like_powerpoint(
        list(cfg.get("image_files") or [])
    )
    if not image_files:
        raise VideoEngineError("MoviePy engine requires at least one image_file")

    output_video_path: str = cfg["output_video_path"]
    Path(output_video_path).parent.mkdir(parents=True, exist_ok=True)

    resolution = tuple(cfg.get("resolution") or _DEFAULT_RESOLUTION)
    fps = int(cfg.get("fps") or _DEFAULT_FPS)
    # NOTE(review): an explicit ``slide_duration`` of None raises TypeError
    # here; only a missing key falls back to 5.0.
    slide_duration = float(cfg.get("slide_duration", 5.0))
    encode_preset = str(cfg.get("encode_preset") or _DEFAULT_PRESET)
    encode_codec = str(cfg.get("encode_codec") or _DEFAULT_CODEC)
    encode_crf = self._quality_to_crf(cfg)
    # Default thread count: CPU count (4 if unknown), capped at 16.
    thread_count = int(
        cfg.get("threads")
        or max(1, min((os.cpu_count() or 4), 16))
    )
    progress_callback: Optional[Callable[[dict], None]] = cfg.get("progress_callback")
    cancel_event = cfg.get("cancel_event")

    def _emit(stage: str, progress: int, message: str) -> None:
        if progress_callback:
            try:
                progress_callback(
                    {"stage": stage, "progress": progress, "message": message}
                )
            except Exception:  # never let a buggy callback crash the build
                logger.exception("progress_callback raised")

    def _check_cancel() -> None:
        if cancel_event is not None and cancel_event.is_set():
            raise VideoEngineError("Cancelled by user")

    # Every clip that gets opened is recorded so ``finally`` can close it.
    opened_clips: List[Any] = []

    def _track(c):
        opened_clips.append(c)
        return c

    try:
        voiceover_files: List[Optional[str]] = list(
            cfg.get("voiceover_files") or [None] * len(image_files)
        )
        # Pad / trim so we always have one entry per slide.
        if len(voiceover_files) < len(image_files):
            voiceover_files += [None] * (len(image_files) - len(voiceover_files))
        voiceover_files = voiceover_files[: len(image_files)]

        sequence: List[Any] = []

        # Layer 1 — Intro video
        intro_video = cfg.get("intro_video_path")
        if intro_video and Path(intro_video).is_file():
            _emit("moviepy", 5, "Loading intro video...")
            clip = _track(VideoFileClip(intro_video)).resized(resolution)
            sequence.append(clip)
        _check_cancel()

        # Layer 2 — Intro thumbnail (still image)
        intro_thumb = cfg.get("intro_thumbnail_path")
        if intro_thumb and Path(intro_thumb).is_file():
            _emit("moviepy", 8, "Adding intro thumbnail...")
            clip = (
                _track(ImageClip(intro_thumb))
                .with_duration(float(cfg.get("intro_thumbnail_duration", 5.0)))
                .resized(resolution)
            )
            sequence.append(clip)
        _check_cancel()

        # Layer 3 — Slides + per-slide voiceovers
        _emit("moviepy", 12, f"Composing {len(image_files)} slides...")
        for idx, (img_path, voice_path) in enumerate(zip(image_files, voiceover_files)):
            _check_cancel()
            voice_clip = None
            if voice_path and Path(voice_path).is_file():
                voice_clip = _track(AudioFileClip(voice_path))
                # The slide lasts as long as its voiceover, never under 0.1 s.
                duration = max(voice_clip.duration, 0.1)
            else:
                duration = slide_duration
            slide = (
                _track(ImageClip(img_path)).with_duration(duration).resized(resolution)
            )
            if voice_clip is not None:
                slide = slide.with_audio(voice_clip)
            sequence.append(slide)
            # 12 → 70 %: linear over slides.
            pct = 12 + int(58 * (idx + 1) / max(len(image_files), 1))
            _emit("moviepy", pct, f"Slide {idx + 1}/{len(image_files)} ready")

        # Layer 4 — Outro thumbnail (the "Thanks" image)
        outro_thumb = cfg.get("outro_thumbnail_path")
        if outro_thumb and Path(outro_thumb).is_file():
            _emit("moviepy", 72, "Adding outro thumbnail...")
            clip = (
                _track(ImageClip(outro_thumb))
                .with_duration(float(cfg.get("outro_thumbnail_duration", 5.0)))
                .resized(resolution)
            )
            sequence.append(clip)
        _check_cancel()

        # Layer 5 — Outro video
        outro_video = cfg.get("outro_video_path")
        if outro_video and Path(outro_video).is_file():
            _emit("moviepy", 75, "Loading outro video...")
            clip = _track(VideoFileClip(outro_video)).resized(resolution)
            sequence.append(clip)
        _check_cancel()

        if not sequence:
            raise VideoEngineError(
                "MoviePy engine produced an empty timeline (no slides or intros)"
            )

        _emit("moviepy", 78, "Concatenating layers...")
        timeline = concatenate_videoclips(sequence, method="compose")
        opened_clips.append(timeline)

        # Watermark — logo @ 50 % opacity, bottom-right, full duration.
        logo_path = cfg.get("logo_path")
        if logo_path and Path(logo_path).is_file():
            _emit("moviepy", 82, "Compositing watermark...")
            # NOTE(review): no scaling is applied here — the logo is
            # composited at its native size, so supply a pre-sized PNG.
            logo = (
                _track(ImageClip(logo_path))
                .with_opacity(_WATERMARK_OPACITY)
                .with_duration(timeline.duration)
                .with_position(("right", "bottom"))
            )
            timeline = CompositeVideoClip([timeline, logo], size=resolution)
            opened_clips.append(timeline)
        _check_cancel()

        # Audio bed — narration over background music @ 10 % volume.
        audio_layers: List[Any] = []
        if timeline.audio is not None:
            audio_layers.append(timeline.audio)
        # NOTE(review): the narration track is added untrimmed; only the
        # background music below is cut to the timeline length.
        narration = cfg.get("narration_audio")
        if narration and Path(narration).is_file():
            audio_layers.append(_track(AudioFileClip(narration)))
        bg_music = cfg.get("background_music")
        if bg_music and Path(bg_music).is_file():
            bg = _track(AudioFileClip(bg_music)).with_volume_scaled(_BG_MUSIC_VOLUME)
            # Trim background music to the final timeline length so it
            # doesn't extend past the last slide.
            if bg.duration > timeline.duration:
                bg = bg.subclipped(0, timeline.duration)
            audio_layers.append(bg)
        if audio_layers:
            _emit("moviepy", 86, "Mixing audio bed...")
            timeline = timeline.with_audio(CompositeAudioClip(audio_layers))

        # Export — default H.264 ultrafast, with configurable knobs.
        _emit(
            "moviepy",
            90,
            f"Encoding MP4 ({encode_codec} {encode_preset}, crf={encode_crf}, threads={thread_count})...",
        )
        # Forwards MoviePy's own encode progress to ``progress_callback``.
        ffmpeg_logger = _ProgressBarLogger(progress_callback)
        ffmpeg_params = [
            "-movflags", "+faststart",
            "-pix_fmt", "yuv420p",
            "-crf", str(encode_crf),
        ]
        timeline.write_videofile(
            output_video_path,
            codec=encode_codec,
            preset=encode_preset,
            fps=fps,
            audio_codec="aac" if timeline.audio is not None else None,
            threads=thread_count,
            ffmpeg_params=ffmpeg_params,
            logger=ffmpeg_logger,
        )
        _emit("moviepy", 99, "MP4 written to disk.")
    finally:
        # Best-effort cleanup: close whatever was opened, ignoring errors so
        # a failing close() cannot mask the original exception.
        for clip in opened_clips:
            close = getattr(clip, "close", None)
            if callable(close):
                try:
                    close()
                except Exception:
                    pass

    return {
        "presentation_path": None,
        "video_path": output_video_path,
        "warning": None,
        "engine": "moviepy",
    }
# ──────────────────────────────────────────────────────────────────────
# proglog adapter — bridges MoviePy's progress bar to our SSE callback.
# ──────────────────────────────────────────────────────────────────────
def _make_progress_logger_base():
    """Return the base class for the progress adapter.

    That is ``proglog.ProgressBarLogger`` when ``proglog`` can be imported
    and plain ``object`` otherwise. The import happens inside the function
    so importing this module does not itself require ``proglog``.
    """
    try:
        from proglog import ProgressBarLogger as base  # type: ignore
    except ImportError:  # pragma: no cover
        base = object
    return base
_BaseLogger = _make_progress_logger_base()


class _ProgressBarLogger(_BaseLogger):  # type: ignore[misc]
    """Forward MoviePy's ``bar`` events to ``progress_callback``.

    MoviePy uses ``proglog`` to emit progress as it writes frames. Each
    ``index`` update of a bar that has a known ``total`` is mapped into
    the 90 to 99 % band of the overall job, so the SSE stream keeps
    moving during the encode step. Updates that would repeat the last
    reported percentage are dropped.
    """

    def __init__(self, progress_callback: Optional[Callable[[dict], None]]):
        """Store the callback; ``None`` makes the adapter a silent no-op."""
        try:
            super().__init__()  # type: ignore[misc]
        except Exception:
            # Best-effort: a base-class constructor problem must not prevent
            # the encode from running, but leave a trace for debugging.
            logger.debug("progress logger base init failed", exc_info=True)
        self._cb = progress_callback
        self._last_pct: int = -1

    def bars_callback(self, bar, attr, value, old_value=None):  # type: ignore[override]
        """Translate one bar update into a progress payload."""
        if not self._cb or attr != "index":
            return
        bars = getattr(self, "bars", {}) or {}
        info = bars.get(bar) or {}
        total = info.get("total") or 0
        if not total:
            return
        # Map encode progress into 90 to 99 so it slots after the composition
        # phase. The ratio is clamped so an out-of-range index can never push
        # the reported value outside that band.
        fraction = min(max(value / total, 0.0), 1.0)
        pct = 90 + int(9 * fraction)
        if pct == self._last_pct:
            return
        self._last_pct = pct
        try:
            self._cb(
                {
                    "stage": "moviepy",
                    "progress": pct,
                    "message": f"Encoding... {value}/{total}",
                }
            )
        except Exception:  # never let a buggy callback crash the encode
            logger.exception("progress_callback raised")
# Names exported by ``from video_engine import *``; helpers and defaults
# prefixed with an underscore are intentionally left out.
__all__ = [
    "VideoStudio",
    "VideoEngineError",
    "MovieEngineUnavailableError",
]