# Source: lbw_drs_app_new/drs_modules/video_processing.py
# Uploaded by dschandra ("Upload 6 files", commit 2db7738, verified)
"""Video processing utilities for the DRS application.
This module provides helper functions to save uploaded videos to the
filesystem and to trim the last N seconds from a video. Using
OpenCV's ``VideoCapture`` and ``VideoWriter`` avoids external
dependencies like ffmpeg or moviepy, which may not be installed in
all execution environments.
"""
from __future__ import annotations
import os
import shutil
from pathlib import Path
from typing import Union
import cv2
def save_uploaded_video(name: str, file_obj: Union[bytes, str, Path]) -> str:
    """Persist an uploaded video to a predictable location on disk.

    The video is stored in a ``user_videos`` directory under the current
    working directory, named after the stem and extension of ``name``.
    If a file with that name already exists, ``_1``, ``_2``, ... is
    appended to the stem until an unused name is found.

    Parameters
    ----------
    name: str
        The original filename from the upload widget. Only its final
        path component is used. The extension defaults to ``.mp4`` when
        absent and the stem to ``video`` when empty.
    file_obj: Union[bytes, str, Path]
        The uploaded content. Accepted forms are raw bytes, a path to an
        existing file, an open binary file handle (any object with a
        ``.read()`` method), or an object whose ``.name`` attribute
        holds the path of a file on disk.

    Returns
    -------
    str
        The absolute path where the video has been saved.

    Raises
    ------
    TypeError
        If ``file_obj`` is none of the supported forms.
    """
    # Store under the current working directory so the file can later be
    # accessed by path.
    output_dir = Path(os.getcwd()) / "user_videos"
    output_dir.mkdir(exist_ok=True)

    # Path(...).stem/.suffix only look at the final component, so any
    # directory part of ``name`` is discarded.
    base_name = Path(name).stem or "video"
    ext = Path(name).suffix or ".mp4"

    # Avoid overwriting an existing file by suffixing an incrementing
    # integer.
    counter = 0
    dest = output_dir / f"{base_name}{ext}"
    while dest.exists():
        counter += 1
        dest = output_dir / f"{base_name}_{counter}{ext}"

    if isinstance(file_obj, (bytes, bytearray, memoryview)):
        # Raw content: write it out directly.
        dest.write_bytes(file_obj)
    elif isinstance(file_obj, (str, os.PathLike)):
        # A path to an existing file: copy it.
        shutil.copy(os.fspath(file_obj), dest)
    elif hasattr(file_obj, "read"):
        # An open file handle: stream it so large videos are not held
        # in memory all at once.
        with open(dest, "wb") as f_out:
            shutil.copyfileobj(file_obj, f_out)
    elif hasattr(file_obj, "name"):
        # A wrapper object that only exposes the temporary file's path.
        shutil.copy(os.fspath(file_obj.name), dest)
    else:
        # Fail before creating an empty destination file.
        raise TypeError(
            f"Unsupported upload object of type {type(file_obj).__name__}"
        )
    return str(dest)
def trim_last_seconds(input_path: str, output_path: str, seconds: int) -> None:
    """Save the last ``seconds`` of a video to ``output_path``.

    The starting frame is computed from the frame count and frame rate
    reported by OpenCV. Frames before it are skipped, and every frame
    from there to the end is re-encoded with the ``mp4v`` codec at the
    input's frame rate and size. If the video is shorter than the
    requested duration, or no frame count is reported, the whole video
    is written.

    Parameters
    ----------
    input_path: str
        Path to the source video file.
    output_path: str
        Path where the trimmed video will be saved.
    seconds: int
        The duration from the end of the video to retain. Must be
        non-negative.

    Raises
    ------
    ValueError
        If ``seconds`` is negative.
    RuntimeError
        If the input video cannot be opened or the output writer cannot
        be created.
    """
    if seconds < 0:
        raise ValueError(f"seconds must be non-negative, got {seconds}")

    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Unable to open video: {input_path}")

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # ``not fps > 0`` also catches NaN, which ``fps <= 0`` would miss.
        if not fps > 0:
            fps = 30.0  # default fallback

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frames_to_keep = int(seconds * fps)
        start_frame = max(total_frames - frames_to_keep, 0)

        # Prepare the writer with the same frame rate and size as the input.
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise RuntimeError(f"Unable to open video writer: {output_path}")

        # Skip frames before start_frame. grab() advances the stream
        # without returning the image, which is all that is needed here.
        for _ in range(start_frame):
            if not cap.grab():
                break

        # Write the remaining frames.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
    finally:
        # Always release native handles, even when an error occurred.
        cap.release()
        if out is not None:
            out.release()