| import subprocess |
| import json |
| import os |
| import uuid |
| from typing import Optional, Tuple |
| import imageio_ffmpeg |
| from schemas import ShortsStyle, AspectRatio |
|
|
def _parse_ffmpeg_probe_output(info_text: str) -> dict:
    """Parse the stream summary FFmpeg prints on stderr for an input file.

    Args:
        info_text: Text captured from FFmpeg's stderr.

    Returns:
        A dict with ``duration`` (seconds), ``width``, ``height``, ``fps``,
        ``bitrate`` (bits per second) and ``has_audio``. Any value that is
        not found in the text is left at ``0`` / ``False``.
    """
    import re

    duration = 0
    # Header line looks like "Duration: 00:01:23.45, start: ..."; a value of
    # "N/A" simply does not match and leaves the duration at 0.
    duration_match = re.search(
        r'Duration:\s*(\d+):(\d+):(\d+(?:\.\d+)?)', info_text
    )
    if duration_match:
        hours, minutes, seconds = duration_match.groups()
        duration = int(hours) * 3600 + int(minutes) * 60 + float(seconds)

    width = height = fps = bitrate = 0
    has_audio = False
    video_seen = False

    for line in info_text.splitlines():
        if 'Stream #' not in line:
            continue

        if 'Audio:' in line:
            has_audio = True
            continue

        # Only the first video stream describes the picture we care about.
        if 'Video:' not in line or video_seen:
            continue
        video_seen = True
        details = line.split('Video:', 1)[1]

        # Requiring at least two digits on each side of the "x" skips hex
        # codec tags such as "0x31637661", while still matching dimensions
        # that are followed by "[SAR 1:1 DAR 16:9]".
        dims = re.search(r'\b(\d{2,5})x(\d{2,5})\b', details)
        if dims:
            width, height = int(dims.group(1)), int(dims.group(2))

        # Prefer the reported fps; fall back to tbr when fps is absent.
        rate = (
            re.search(r'(\d+(?:\.\d+)?)\s*fps', details)
            or re.search(r'(\d+(?:\.\d+)?)\s*tbr', details)
        )
        if rate:
            fps = float(rate.group(1))

        kbps = re.search(r'(\d+)\s*kb/s', details)
        if kbps:
            bitrate = int(kbps.group(1)) * 1000

    return {
        'duration': duration,
        'width': width,
        'height': height,
        'fps': fps,
        'bitrate': bitrate,
        'has_audio': has_audio,
    }


def get_video_info_ffmpeg(video_path: str) -> Optional[dict]:
    """Get basic video information by reading FFmpeg's input summary.

    FFmpeg is invoked with only an input (no output), so it prints the
    container/stream description to stderr and exits without decoding the
    file. The non-zero exit status of that invocation is expected and is
    deliberately not checked.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        A dict with the keys ``duration``, ``width``, ``height``, ``fps``,
        ``bitrate``, ``has_audio`` and ``size`` (bytes on disk, 0 if the file
        does not exist), or ``None`` if anything goes wrong.
    """
    try:
        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

        cmd = [ffmpeg_exe, '-hide_banner', '-i', video_path]
        # errors='replace' keeps odd metadata bytes from aborting the probe.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='replace',
        )

        info = _parse_ffmpeg_probe_output(result.stderr or '')
        info['size'] = (
            os.path.getsize(video_path) if os.path.exists(video_path) else 0
        )
        return info

    except Exception as e:
        # Top-level boundary: callers treat None as "info unavailable".
        print(f"Error getting video info with FFmpeg: {e}")
        return None
|
|
def escape_path_for_ass(path: str) -> str:
    """Return *path* rewritten for use inside the FFmpeg ``ass`` filter.

    Backslashes become forward slashes and every colon is prefixed with a
    backslash, e.g. ``C:\\subs\\a.ass`` -> ``C\\:/subs/a.ass``.
    """
    # Both substitutions act on distinct single characters, so one
    # translation pass gives the same result as two chained replacements.
    substitutions = str.maketrans({'\\': '/', ':': '\\:'})
    return path.translate(substitutions)
|
|
def _build_video_filter(style, aspect_ratio, target_width, target_height) -> str:
    """Return the video filter graph for the requested layout ('' = no filter)."""
    if aspect_ratio == AspectRatio.ORIGINAL or style == ShortsStyle.ORIGINAL:
        # Keep the source picture untouched.
        return ""

    if style == ShortsStyle.CINEMATIC:
        # Blurred, cropped copy as background; width-fitted copy centred on top.
        return (
            f"[0:v]split=2[bg_raw][fg_raw];"
            f"[bg_raw]scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height},boxblur=20:2[bg];"
            f"[fg_raw]scale={target_width}:-2,setsar=1[fg];"
            f"[bg][fg]overlay=(W-w)/2:(H-h)/2,setsar=1[v]"
        )

    if style == ShortsStyle.SPLIT_SCREEN:
        # The same picture, cropped to half height, stacked twice.
        half_h = target_height // 2
        return (
            f"[0:v]split=2[top_raw][bottom_raw];"
            f"[top_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[top];"
            f"[bottom_raw]scale={target_width}:{half_h}:force_original_aspect_ratio=increase,crop={target_width}:{half_h}[bottom];"
            f"[top][bottom]vstack=inputs=2[v]"
        )

    if style == ShortsStyle.CROP_FILL:
        return f"scale={target_width}:{target_height}:force_original_aspect_ratio=increase,crop={target_width}:{target_height}"

    if style == ShortsStyle.FIT_BARS:
        return f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2"

    # No recognised style: fit to the target width and pad to the target size.
    return f"scale={target_width}:-2,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2,setsar=1[v]"


def extract_clip_ffmpeg(
    video_path: str,
    start_time: float,
    end_time: float,
    output_path: str,
    target_width: Optional[int] = None,
    target_height: Optional[int] = None,
    include_audio: bool = True,
    style: Optional[ShortsStyle] = None,
    aspect_ratio: Optional[AspectRatio] = None,
    bg_music_path: Optional[str] = None,
    video_volume: float = 1.0,
    music_volume: float = 0.2,
    loop_music: bool = True,
    subtitle_path: Optional[str] = None
) -> str:
    """Cut ``[start_time, end_time)`` out of a video and encode it with FFmpeg.

    Args:
        video_path: Source video file.
        start_time: Clip start, in seconds.
        end_time: Clip end, in seconds; must be greater than ``start_time``.
        output_path: Destination file (overwritten if present).
        target_width: Output width; with ``target_height`` defaults to
            1080x1920 when either one is missing.
        target_height: Output height (see ``target_width``).
        include_audio: When False the output is written without audio.
        style: Layout to apply (cinematic, split screen, crop, bars, ...).
        aspect_ratio: ``AspectRatio.ORIGINAL`` disables the layout filter.
        bg_music_path: Optional music file; ignored if it does not exist.
        video_volume: Gain applied to the source audio track.
        music_volume: Gain applied to the background music.
        loop_music: Loop the music input indefinitely (the clip length is
            still bounded by ``-t``).
        subtitle_path: Optional ASS subtitle file burned into the picture;
            ignored if it does not exist.

    Returns:
        ``output_path`` once FFmpeg has finished successfully.

    Raises:
        ValueError: If ``end_time`` is not greater than ``start_time``.
        subprocess.CalledProcessError: If FFmpeg exits with an error; its
            stderr is printed before the exception is re-raised.
    """
    try:
        duration = end_time - start_time
        if duration <= 0:
            raise ValueError(
                f"end_time ({end_time}) must be greater than start_time ({start_time})"
            )

        ffmpeg_exe = imageio_ffmpeg.get_ffmpeg_exe()

        # Needed to know whether "[0:a]" can be referenced in the audio graph.
        video_info = get_video_info_ffmpeg(video_path)
        has_orig_audio = video_info.get('has_audio', False) if video_info else False

        has_music = bool(bg_music_path and os.path.exists(bg_music_path))

        inputs = ['-i', video_path]
        if has_music:
            if loop_music:
                inputs.extend(['-stream_loop', '-1'])
            inputs.extend(['-i', bg_music_path])

        # -ss/-t follow the inputs, so they are applied as output options.
        cmd = [ffmpeg_exe] + inputs + ['-ss', str(start_time), '-t', str(duration)]

        if not target_width or not target_height:
            target_width, target_height = 1080, 1920

        filter_complex = _build_video_filter(
            style, aspect_ratio, target_width, target_height
        )

        # Burn subtitles in after the layout filter (or directly on the input).
        if subtitle_path and os.path.exists(subtitle_path):
            escaped_sub_path = escape_path_for_ass(subtitle_path)
            if filter_complex:
                if "[v]" not in filter_complex:
                    filter_complex += "[v]"
                filter_complex += f";[v]ass='{escaped_sub_path}'[v]"
            else:
                filter_complex = f"[0:v]ass='{escaped_sub_path}'[v]"

        audio_filter = ""
        if include_audio:
            if has_music:
                if has_orig_audio:
                    # Mix both tracks; the mix ends with the source audio.
                    audio_filter = f"[0:a]volume={video_volume}[a_orig];[1:a]volume={music_volume}[a_bg];[a_orig][a_bg]amix=inputs=2:duration=first[aout]"
                else:
                    # Silent source: the music becomes the only track.
                    audio_filter = f"[1:a]volume={music_volume}[aout]"
            elif has_orig_audio and video_volume != 1.0:
                audio_filter = f"[0:a]volume={video_volume}[aout]"

        if filter_complex:
            # -map needs a labelled video output.
            if "[v]" not in filter_complex:
                filter_complex += "[v]"

            if audio_filter:
                combined_filter = f"{filter_complex};{audio_filter}"
                cmd.extend(['-filter_complex', combined_filter, '-map', '[v]', '-map', '[aout]'])
            else:
                cmd.extend(['-filter_complex', filter_complex, '-map', '[v]'])
                if include_audio:
                    # "?" makes the audio mapping optional for silent sources.
                    cmd.extend(['-map', '0:a?'])
        elif audio_filter:
            cmd.extend(['-filter_complex', audio_filter, '-map', '0:v:0', '-map', '[aout]'])

        if not include_audio:
            cmd.extend(['-an'])

        cmd.extend([
            '-c:v', 'libx264',
            '-preset', 'superfast',
            '-crf', '23',
            '-pix_fmt', 'yuv420p',
            '-c:a', 'aac',
            '-b:a', '128k',
            '-ac', '2',
            '-y',
            '-loglevel', 'error',
            output_path
        ])

        subprocess.run(cmd, check=True, capture_output=True)
        return output_path

    except subprocess.CalledProcessError as e:
        print(f"❌ FFmpeg advanced extraction failed: {e}")
        # The exception text only carries the exit status; the actual reason
        # is in the captured stderr, so surface it before re-raising.
        if e.stderr:
            details = e.stderr.decode('utf-8', errors='replace').strip()
            if details:
                print(f"FFmpeg stderr: {details}")
        raise
    except Exception as e:
        print(f"❌ FFmpeg advanced extraction failed: {e}")
        raise
|
|
def should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music) -> bool:
    """Decide whether the fast FFmpeg path can handle this request.

    Args:
        timestamps: Sequence of clip ranges. Each item exposes either
            ``start_time``/``end_time`` or ``start``/``end``, the same two
            spellings that ``process_with_ffmpeg`` accepts.
        custom_dims: Optional settings object; a truthy ``audio_path``
            attribute on it rules FFmpeg out.
        export_audio: Accepted for interface compatibility; not consulted.
        bg_music: Any truthy value rules FFmpeg out.

    Returns:
        False when background music or a custom audio path is requested, or
        when any two clip ranges overlap; True otherwise.
    """
    def _start(ts):
        return ts.start_time if hasattr(ts, 'start_time') else ts.start

    def _end(ts):
        return ts.end_time if hasattr(ts, 'end_time') else ts.end

    if bg_music:
        return False

    if custom_dims and getattr(custom_dims, 'audio_path', None):
        return False

    if len(timestamps) > 1:
        # After sorting by start, any overlap shows up between neighbours.
        ordered = sorted(timestamps, key=_start)
        for previous, current in zip(ordered, ordered[1:]):
            if _start(current) < _end(previous):
                return False

    return True
|
|
def hybrid_process_clips(video_path, timestamps, output_format, custom_dims=None, export_audio=True, bg_music=None, subtitle_path=None):
    """Process clips with FFmpeg when possible, otherwise with MoviePy.

    ``should_use_ffmpeg`` picks the path. If choosing or running the FFmpeg
    path raises, the error is printed and the MoviePy processor is used
    instead. A failure of the MoviePy processor itself propagates to the
    caller; it is not retried.

    Args:
        video_path: Source video file.
        timestamps: Clip ranges to extract.
        output_format: Forwarded unchanged to the selected processor.
        custom_dims: Optional settings object forwarded to the processor.
        export_audio: Forwarded to the selected processor.
        bg_music: Only consulted by ``should_use_ffmpeg``.
        subtitle_path: Forwarded to the FFmpeg path only.

    Returns:
        Whatever the selected processor returns.
    """
    try:
        if should_use_ffmpeg(timestamps, custom_dims, export_audio, bg_music):
            print("Using FFmpeg for fast processing...")
            return process_with_ffmpeg(video_path, timestamps, output_format, custom_dims, export_audio, subtitle_path=subtitle_path)
    except Exception as e:
        print(f"Hybrid processing failed: {e}")
        print("Falling back to MoviePy...")
    else:
        # Reached only when FFmpeg was judged unsuitable (no exception and
        # no early return above).
        print("Using MoviePy for complex processing...")

    # Imported lazily, as in the rest of this module.
    from video_processor import process_video_clips
    return process_video_clips(video_path, timestamps, output_format, custom_dims, export_audio)
|
|
def process_with_ffmpeg(video_path, timestamps, output_format, custom_dims=None, export_audio=True, subtitle_path=None):
    """Cut every requested range out of ``video_path`` using FFmpeg.

    Each clip is written to ``PROCESSED_DIR`` under a name of the form
    ``<8 hex chars>_clip_<n>.<ext>``. The extension is ``avi`` or ``mov``
    when ``output_format.value`` says so, otherwise ``mp4``.

    Returns:
        A tuple ``(clip_paths, [])``: the written files in request order and
        an empty list.

    Raises:
        Exception: If the source video cannot be probed, or if extraction of
            any clip fails (the error is printed and re-raised).
    """
    # Refuse to start if the source cannot be probed at all.
    if not get_video_info_ffmpeg(video_path):
        raise Exception("Could not get video info")

    # Custom dimensions apply only when both values are present and truthy.
    target_width = target_height = None
    if custom_dims:
        requested_w = getattr(custom_dims, 'width', None)
        requested_h = getattr(custom_dims, 'height', None)
        if requested_w and requested_h:
            target_width, target_height = requested_w, requested_h

    # Container extension: mp4 unless the format value names avi or mov.
    ext = 'mp4'
    format_value = getattr(output_format, 'value', None) if output_format else None
    if format_value == 'avi':
        ext = 'avi'
    elif format_value == 'mov':
        ext = 'mov'

    # Only a ShortsStyle is meaningful as a layout for the extractor.
    clip_style = output_format if isinstance(output_format, ShortsStyle) else None

    clip_paths = []
    for index, ts in enumerate(timestamps):
        clip_number = index + 1
        clip_id = uuid.uuid4().hex[:8]
        output_filename = f"{clip_id}_clip_{clip_number}.{ext}"

        # Imported lazily, as in the rest of this module.
        from routers.video import PROCESSED_DIR
        output_path = os.path.join(PROCESSED_DIR, output_filename)

        try:
            clip_start = ts.start_time if hasattr(ts, 'start_time') else ts.start
            clip_end = ts.end_time if hasattr(ts, 'end_time') else ts.end
            clip_paths.append(
                extract_clip_ffmpeg(
                    video_path,
                    clip_start,
                    clip_end,
                    output_path,
                    target_width,
                    target_height,
                    include_audio=export_audio,
                    style=clip_style,
                    subtitle_path=subtitle_path,
                )
            )
        except Exception as e:
            print(f"FFmpeg extraction failed for clip {clip_number}: {e}")
            raise e

    return clip_paths, []