File size: 3,906 Bytes
2db7738
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Video processing utilities for the DRS application.

This module provides helper functions to save uploaded videos to the
filesystem and to extract the last N seconds of a video.  Using
OpenCV's ``VideoCapture`` and ``VideoWriter`` avoids external
dependencies like ffmpeg or moviepy, which may not be installed in
all execution environments.
"""

from __future__ import annotations

import os
import shutil
from pathlib import Path
from typing import Union

import cv2


def save_uploaded_video(name: str, file_obj: Union[bytes, str, Path]) -> str:
    """Persist an uploaded video to a predictable location on disk.

    The video is copied into a ``user_videos`` directory below the
    current working directory, using the final component of ``name``
    as the filename.  An existing file is never overwritten; a numeric
    suffix (``_1``, ``_2``, ...) is appended until the name is free.

    Parameters
    ----------
    name: str
        The original filename from the upload widget.  Only its stem
        and suffix are used; any directory components are discarded.
        A missing suffix defaults to ``.mp4`` and an empty stem
        defaults to ``video``.
    file_obj: Union[bytes, str, Path]
        The uploaded content.  Accepted forms are, in order of
        precedence: a filesystem path (``str`` or ``os.PathLike``), raw
        bytes-like data, an object with a binary ``.read()`` method, or
        an object whose ``.name`` attribute holds a path to the file.

    Returns
    -------
    str
        The absolute path where the video has been saved.

    Raises
    ------
    TypeError
        If ``file_obj`` is none of the supported forms.
    """
    # Use the current working directory so the file can later be
    # accessed by a stable path.
    output_dir = Path(os.getcwd()) / "user_videos"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Path(...).stem / .suffix only look at the last path component, so
    # a name such as "../../x.mp4" cannot escape ``output_dir``.
    base_name = Path(name).stem or "video"
    ext = Path(name).suffix or ".mp4"

    # Avoid overwriting by appending an incrementing integer suffix.
    dest = output_dir / f"{base_name}{ext}"
    counter = 0
    while dest.exists():
        counter += 1
        dest = output_dir / f"{base_name}_{counter}{ext}"

    if isinstance(file_obj, (str, os.PathLike)):
        # Already on disk: a plain file copy is enough.
        shutil.copy(os.fspath(file_obj), dest)
    elif isinstance(file_obj, (bytes, bytearray, memoryview)):
        # Raw payload, as advertised by the type annotation.
        dest.write_bytes(file_obj)
    elif hasattr(file_obj, "read"):
        # File-like object: stream in chunks rather than loading the
        # whole video into memory at once.
        with open(dest, "wb") as f_out:
            shutil.copyfileobj(file_obj, f_out)
    elif hasattr(file_obj, "name"):
        # Wrapper object that only exposes the temporary file's path.
        shutil.copy(os.fspath(file_obj.name), dest)
    else:
        raise TypeError(
            f"Unsupported upload object of type {type(file_obj).__name__}"
        )
    return str(dest)


def trim_last_seconds(input_path: str, output_path: str, seconds: int) -> None:
    """Save the last ``seconds`` of a video to ``output_path``.

    The start frame is computed from the frame count and frame rate
    reported by OpenCV.  Frames before it are skipped, and every frame
    from there to the end of the stream is re-encoded with the
    ``mp4v`` codec at the source resolution and frame rate.  If the
    video is shorter than the requested duration, the whole video is
    written.

    Parameters
    ----------
    input_path: str
        Path to the source video file.
    output_path: str
        Path where the trimmed video will be saved.
    seconds: int
        The duration from the end of the video to retain.

    Raises
    ------
    RuntimeError
        If the input video cannot be opened, or if the output writer
        cannot be opened for ``output_path``.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Unable to open video: {input_path}")

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # ``not fps > 0`` also catches NaN, which ``fps <= 0`` would let
        # through and which would then crash ``int()`` below.
        if not fps > 0:
            fps = 30.0  # default fallback
        frames_to_keep = int(seconds * fps)
        # Clamp at zero so a short video (or a frame count reported as
        # zero) results in the whole stream being written.
        start_frame = max(total_frames - frames_to_keep, 0)

        # Prepare writer with the same properties as the input
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            # Without this check every ``write`` would be silently
            # dropped and no usable output file would be produced.
            raise RuntimeError(f"Unable to open video writer: {output_path}")

        # Skip frames until start_frame.  ``grab`` advances the stream
        # without decoding the image, which is all that is needed for
        # frames that are thrown away.
        for _ in range(start_frame):
            if not cap.grab():
                break

        # Write remaining frames
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
    finally:
        # Always release native handles, even if an error occurred.
        cap.release()
        if out is not None:
            out.release()