Video_AdGenesis_App / utils /video_processor.py
sushilideaclan01's picture
first push
91d209c
"""
Video Processing Utilities
Handles video operations like frame extraction, trimming, and concatenation
"""
import subprocess
import os
import base64
from pathlib import Path
from typing import List, Tuple, Optional
import json
import tempfile
def get_video_info(video_path: str) -> dict:
    """
    Probe a media file with ffprobe and return the parsed report.

    Args:
        video_path: Location of the video file to inspect

    Returns:
        Dictionary decoded from ffprobe's JSON output; the format and
        stream sections are requested

    Raises:
        Exception: If ffprobe exits with a non-zero status or the ffprobe
            executable cannot be found
    """
    probe_args = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_format', '-show_streams',
        video_path,
    ]

    try:
        completed = subprocess.run(
            probe_args,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise Exception(f"Failed to get video info: {e}")
    except FileNotFoundError:
        raise Exception("ffprobe not found. Please install ffmpeg.")

    # stdout carries the JSON document requested via -print_format
    return json.loads(completed.stdout)
def extract_frame(
    video_path: str,
    timestamp: float,
    output_path: Optional[str] = None,
    return_base64: bool = False,
    compress: bool = True
) -> str:
    """
    Extract a single frame from video at specified timestamp

    Args:
        video_path: Path to video file
        timestamp: Time in seconds to extract frame
        output_path: Output path for frame (optional). When omitted, a
            temporary file is created for the frame.
        return_base64: Return base64-encoded image instead of path
        compress: If True, write a high-quality JPEG; if False, write a
            lossless PNG

    Returns:
        Path to extracted frame, or a base64 data URL when return_base64
        is True. A temporary file created by this function is deleted
        before a data URL is returned; when a path is returned the caller
        owns the file.

    Raises:
        Exception: If ffmpeg fails, is not installed, or produced no frame
    """
    if compress:
        # Compressed JPEG (default behavior); -q:v 2 = high quality
        suffix, mime_type = '.jpg', 'image/jpeg'
        encode_args = ['-q:v', '2']
    else:
        # Lossless PNG (for continuity frames), full RGBA colour information
        suffix, mime_type = '.png', 'image/png'
        encode_args = ['-f', 'image2', '-pix_fmt', 'rgba']

    # Remember whether the file is ours BEFORE output_path is overwritten,
    # otherwise the cleanup decision below can never be made correctly.
    created_temp = not output_path
    if created_temp:
        # mkstemp creates the file atomically (mktemp is deprecated and racy)
        fd, output_path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)

    cmd = [
        'ffmpeg',
        '-ss', str(timestamp),  # placed before -i: seek on the input
        '-i', video_path,
        '-vframes', '1',
        *encode_args,
        '-y',
        output_path
    ]

    keep_file = False
    try:
        # Only the subprocess call is guarded for FileNotFoundError so that a
        # missing output file is not misreported as a missing ffmpeg binary.
        try:
            subprocess.run(cmd, capture_output=True, check=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to extract frame: {e}") from e
        except FileNotFoundError as e:
            raise Exception("ffmpeg not found. Please install ffmpeg.") from e

        # ffmpeg can exit successfully without writing a frame (e.g. when the
        # timestamp is past the end of the video); detect that explicitly
        # whenever we are about to read the file or hand back our own temp file.
        if created_temp or return_base64:
            if not os.path.isfile(output_path) or os.path.getsize(output_path) == 0:
                raise Exception(
                    f"Failed to extract frame: no frame written for timestamp {timestamp}"
                )

        if not return_base64:
            keep_file = True
            return output_path

        with open(output_path, 'rb') as f:
            image_data = base64.b64encode(f.read()).decode('utf-8')
        return f"data:{mime_type};base64,{image_data}"
    finally:
        # Clean up the temp file if we created it and are not returning its path
        if created_temp and not keep_file:
            try:
                os.remove(output_path)
            except OSError:
                pass
def extract_transition_frames(
    video_path: str,
    transition_window: float = 1.5
) -> List[Tuple[float, str]]:
    """
    Extract transition frames from the end of a video

    Three frames are sampled: at the start of the transition window,
    0.5 seconds before the end, and 0.1 seconds before the end. Every
    timestamp is clamped to 0 so clips shorter than an offset do not
    produce negative seek positions.

    Args:
        video_path: Path to video file
        transition_window: Time window before end (in seconds)

    Returns:
        List of (timestamp, base64_data_url) tuples

    Raises:
        Exception: If the duration cannot be read or a frame cannot be
            extracted
    """
    try:
        # Get video duration
        info = get_video_info(video_path)
        duration = float(info['format']['duration'])

        # Offsets measured back from the end of the video
        offsets_from_end = (
            transition_window,  # Start of the transition window
            0.5,                # Near end
            0.1,                # Final frame
        )
        timestamps = [max(0.0, duration - offset) for offset in offsets_from_end]

        return [
            (timestamp, extract_frame(video_path, timestamp, return_base64=True))
            for timestamp in timestamps
        ]
    except Exception as e:
        raise Exception(f"Failed to extract transition frames: {e}") from e
def trim_video(
    input_path: str,
    output_path: str,
    start_time: float,
    end_time: float
) -> str:
    """
    Cut the [start_time, end_time] range out of a video with ffmpeg.

    The clip is re-encoded with libx264 video and AAC audio, and any
    existing file at output_path is overwritten.

    Args:
        input_path: Source video path
        output_path: Destination path for the trimmed clip
        start_time: Clip start in seconds
        end_time: Clip end in seconds

    Returns:
        The output_path that was written

    Raises:
        Exception: If ffmpeg exits with an error or is not installed
    """
    clip_length = end_time - start_time
    ffmpeg_args = [
        'ffmpeg',
        '-ss', str(start_time),
        '-i', input_path,
        '-t', str(clip_length),
        '-c:v', 'libx264', '-c:a', 'aac',
        '-y', output_path,
    ]

    try:
        subprocess.run(ffmpeg_args, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"Failed to trim video: {e}")
    except FileNotFoundError:
        raise Exception("ffmpeg not found. Please install ffmpeg.")

    return output_path
def concatenate_videos(
    video_paths: List[str],
    output_path: str
) -> str:
    """
    Concatenate multiple videos into one

    The inputs are listed in a temporary file that is fed to ffmpeg's
    concat demuxer; streams are copied without re-encoding. The list file
    is always removed afterwards.

    Args:
        video_paths: List of video file paths (must not be empty)
        output_path: Output path for concatenated video

    Returns:
        Path to concatenated video

    Raises:
        ValueError: If video_paths is empty
        Exception: If ffmpeg fails or is not installed
    """
    if not video_paths:
        raise ValueError("video_paths must not be empty")

    list_file = None
    try:
        # Create temporary file list. UTF-8 is forced so non-ASCII paths are
        # not written in a platform-specific default encoding.
        with tempfile.NamedTemporaryFile(
            mode='w', delete=False, suffix='.txt', encoding='utf-8'
        ) as f:
            list_file = f.name
            for path in video_paths:
                # Inside single quotes ffmpeg treats everything literally, so a
                # quote in the path must close the string, add an escaped
                # quote, and reopen it: '  ->  '\''
                quoted = os.path.abspath(path).replace("'", "'\\''")
                f.write(f"file '{quoted}'\n")

        cmd = [
            'ffmpeg',
            '-f', 'concat',
            '-safe', '0',  # allow the absolute paths written above
            '-i', list_file,
            '-c', 'copy',
            '-y',
            output_path
        ]
        # Only the subprocess call is guarded for FileNotFoundError so that
        # temp-file problems are not misreported as a missing ffmpeg binary.
        try:
            subprocess.run(cmd, capture_output=True, check=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to concatenate videos: {e}") from e
        except FileNotFoundError as e:
            raise Exception("ffmpeg not found. Please install ffmpeg.") from e
        return output_path
    finally:
        if list_file is not None:
            try:
                os.remove(list_file)
            except OSError:
                pass
def generate_thumbnail(
    video_path: str,
    output_path: str,
    width: int = 160,
    height: int = 90
) -> str:
    """
    Write a scaled single-frame thumbnail image for a video.

    ffmpeg reads the video from its start (no seek is requested), scales
    the output to width x height, and overwrites any existing file at
    output_path.

    Args:
        video_path: Source video path
        output_path: Destination path for the thumbnail image
        width: Thumbnail width passed to the scale filter
        height: Thumbnail height passed to the scale filter

    Returns:
        The output_path that was written

    Raises:
        Exception: If ffmpeg exits with an error or is not installed
    """
    scale_filter = f'scale={width}:{height}'
    ffmpeg_args = [
        'ffmpeg',
        '-i', video_path,
        '-vf', scale_filter,
        '-frames:v', '1',
        '-q:v', '2',
        '-y', output_path,
    ]

    try:
        subprocess.run(ffmpeg_args, capture_output=True, check=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"Failed to generate thumbnail: {e}")
    except FileNotFoundError:
        raise Exception("ffmpeg not found. Please install ffmpeg.")

    return output_path