Spaces:
Sleeping
Sleeping
| """ | |
| Video Processing Utilities | |
| Handles video operations like frame extraction, trimming, and concatenation | |
| """ | |
| import subprocess | |
| import os | |
| import base64 | |
| from pathlib import Path | |
| from typing import List, Tuple, Optional | |
| import json | |
| import tempfile | |
| def get_video_info(video_path: str) -> dict: | |
| """ | |
| Get video metadata using ffprobe | |
| Args: | |
| video_path: Path to video file | |
| Returns: | |
| Dictionary containing video metadata | |
| """ | |
| try: | |
| cmd = [ | |
| 'ffprobe', | |
| '-v', 'quiet', | |
| '-print_format', 'json', | |
| '-show_format', | |
| '-show_streams', | |
| video_path | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True, check=True) | |
| return json.loads(result.stdout) | |
| except subprocess.CalledProcessError as e: | |
| raise Exception(f"Failed to get video info: {e}") | |
| except FileNotFoundError: | |
| raise Exception("ffprobe not found. Please install ffmpeg.") | |
| def extract_frame( | |
| video_path: str, | |
| timestamp: float, | |
| output_path: Optional[str] = None, | |
| return_base64: bool = False, | |
| compress: bool = True | |
| ) -> str: | |
| """ | |
| Extract a single frame from video at specified timestamp | |
| Args: | |
| video_path: Path to video file | |
| timestamp: Time in seconds to extract frame | |
| output_path: Output path for frame (optional) | |
| return_base64: Return base64-encoded image instead of path | |
| compress: If False, use lossless PNG format (no compression) | |
| Returns: | |
| Path to extracted frame or base64 data URL | |
| """ | |
| try: | |
| if compress: | |
| # Compressed JPEG (default behavior) | |
| if not output_path: | |
| output_path = tempfile.mktemp(suffix='.jpg') | |
| cmd = [ | |
| 'ffmpeg', | |
| '-ss', str(timestamp), | |
| '-i', video_path, | |
| '-vframes', '1', | |
| '-q:v', '2', # JPEG quality (2 = high quality) | |
| '-y', | |
| output_path | |
| ] | |
| mime_type = 'image/jpeg' | |
| else: | |
| # Uncompressed PNG (lossless, for continuity frames) | |
| if not output_path: | |
| output_path = tempfile.mktemp(suffix='.png') | |
| cmd = [ | |
| 'ffmpeg', | |
| '-ss', str(timestamp), | |
| '-i', video_path, | |
| '-vframes', '1', | |
| '-f', 'image2', # Force image format | |
| '-pix_fmt', 'rgba', # Preserve full color information | |
| '-y', | |
| output_path | |
| ] | |
| mime_type = 'image/png' | |
| subprocess.run(cmd, capture_output=True, check=True) | |
| if return_base64: | |
| with open(output_path, 'rb') as f: | |
| image_data = base64.b64encode(f.read()).decode('utf-8') | |
| # Clean up temp file if we created it | |
| if not output_path: | |
| os.remove(output_path) | |
| return f"data:{mime_type};base64,{image_data}" | |
| return output_path | |
| except subprocess.CalledProcessError as e: | |
| raise Exception(f"Failed to extract frame: {e}") | |
| except FileNotFoundError: | |
| raise Exception("ffmpeg not found. Please install ffmpeg.") | |
| def extract_transition_frames( | |
| video_path: str, | |
| transition_window: float = 1.5 | |
| ) -> List[Tuple[float, str]]: | |
| """ | |
| Extract transition frames from the end of a video | |
| Args: | |
| video_path: Path to video file | |
| transition_window: Time window before end (in seconds) | |
| Returns: | |
| List of (timestamp, base64_data_url) tuples | |
| """ | |
| try: | |
| # Get video duration | |
| info = get_video_info(video_path) | |
| duration = float(info['format']['duration']) | |
| # Calculate transition zone | |
| transition_start = max(0, duration - transition_window) | |
| # Extract 3 frames | |
| timestamps = [ | |
| transition_start, # Mid-point | |
| duration - 0.5, # Near end | |
| duration - 0.1 # Final frame | |
| ] | |
| frames = [] | |
| for timestamp in timestamps: | |
| frame_data = extract_frame(video_path, timestamp, return_base64=True) | |
| frames.append((timestamp, frame_data)) | |
| return frames | |
| except Exception as e: | |
| raise Exception(f"Failed to extract transition frames: {e}") | |
| def trim_video( | |
| input_path: str, | |
| output_path: str, | |
| start_time: float, | |
| end_time: float | |
| ) -> str: | |
| """ | |
| Trim video to specified time range | |
| Args: | |
| input_path: Input video path | |
| output_path: Output video path | |
| start_time: Start time in seconds | |
| end_time: End time in seconds | |
| Returns: | |
| Path to trimmed video | |
| """ | |
| try: | |
| duration = end_time - start_time | |
| cmd = [ | |
| 'ffmpeg', | |
| '-ss', str(start_time), | |
| '-i', input_path, | |
| '-t', str(duration), | |
| '-c:v', 'libx264', | |
| '-c:a', 'aac', | |
| '-y', | |
| output_path | |
| ] | |
| subprocess.run(cmd, capture_output=True, check=True) | |
| return output_path | |
| except subprocess.CalledProcessError as e: | |
| raise Exception(f"Failed to trim video: {e}") | |
| except FileNotFoundError: | |
| raise Exception("ffmpeg not found. Please install ffmpeg.") | |
| def concatenate_videos( | |
| video_paths: List[str], | |
| output_path: str | |
| ) -> str: | |
| """ | |
| Concatenate multiple videos into one | |
| Args: | |
| video_paths: List of video file paths | |
| output_path: Output path for concatenated video | |
| Returns: | |
| Path to concatenated video | |
| """ | |
| try: | |
| # Create temporary file list | |
| with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f: | |
| for path in video_paths: | |
| f.write(f"file '{os.path.abspath(path)}'\n") | |
| list_file = f.name | |
| try: | |
| cmd = [ | |
| 'ffmpeg', | |
| '-f', 'concat', | |
| '-safe', '0', | |
| '-i', list_file, | |
| '-c', 'copy', | |
| '-y', | |
| output_path | |
| ] | |
| subprocess.run(cmd, capture_output=True, check=True) | |
| return output_path | |
| finally: | |
| os.remove(list_file) | |
| except subprocess.CalledProcessError as e: | |
| raise Exception(f"Failed to concatenate videos: {e}") | |
| except FileNotFoundError: | |
| raise Exception("ffmpeg not found. Please install ffmpeg.") | |
| def generate_thumbnail( | |
| video_path: str, | |
| output_path: str, | |
| width: int = 160, | |
| height: int = 90 | |
| ) -> str: | |
| """ | |
| Generate thumbnail from video | |
| Args: | |
| video_path: Path to video file | |
| output_path: Output path for thumbnail | |
| width: Thumbnail width | |
| height: Thumbnail height | |
| Returns: | |
| Path to thumbnail | |
| """ | |
| try: | |
| cmd = [ | |
| 'ffmpeg', | |
| '-i', video_path, | |
| '-vf', f'scale={width}:{height}', | |
| '-frames:v', '1', | |
| '-q:v', '2', | |
| '-y', | |
| output_path | |
| ] | |
| subprocess.run(cmd, capture_output=True, check=True) | |
| return output_path | |
| except subprocess.CalledProcessError as e: | |
| raise Exception(f"Failed to generate thumbnail: {e}") | |
| except FileNotFoundError: | |
| raise Exception("ffmpeg not found. Please install ffmpeg.") | |