import datetime
import logging
import os
import re  # For parsing ASS/SRT
import shlex
import subprocess
import tempfile
from pathlib import Path

import ffmpeg
import gradio as gr
import numpy as np
import torch
import whisper as openai_whisper  # Renamed to avoid potential conflicts

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Define fonts directory - adapt for Hugging Face environment if needed
FONTS_DIR = '/usr/share/fonts/truetype'  # Common Linux font location
# Fall back to the usual macOS / Windows locations when the Linux one is absent.
# Raw strings are required for the Windows path: '\W' and '\F' are invalid
# escape sequences in a normal string literal.
if not os.path.exists(FONTS_DIR) and os.path.exists('/System/Library/Fonts'):  # macOS
    FONTS_DIR = '/System/Library/Fonts'
elif not os.path.exists(FONTS_DIR) and os.path.exists(r'C:\Windows\Fonts'):  # Windows
    FONTS_DIR = r'C:\Windows\Fonts'

# Maps a cleaned-up font family name to the first font file found for it.
FONT_PATHS = {}
# Font names offered to the user; starts with common fallbacks and is replaced
# by the sorted union of fallbacks and discovered fonts when discovery succeeds.
ACCEPTABLE_FONTS = ['Arial', 'Helvetica', 'Times New Roman']

try:
    if FONTS_DIR and os.path.exists(FONTS_DIR):
        logger.info(f"Searching for fonts in: {FONTS_DIR}")
        found_fonts = []
        for root, dirs, files in os.walk(FONTS_DIR):
            for file in files:
                if file.lower().endswith(('.ttf', '.otf', '.ttc')):
                    font_path = os.path.join(root, file)
                    font_name = os.path.splitext(file)[0]
                    # Basic name cleanup: drop one trailing weight/style suffix
                    base_font_name = re.sub(
                        r'[-_ ]?(bold|italic|regular|medium|light|condensed)?$',
                        '',
                        font_name,
                        flags=re.IGNORECASE,
                    )
                    # First file wins for a given family name
                    if base_font_name not in FONT_PATHS:
                        FONT_PATHS[base_font_name] = font_path
                        found_fonts.append(base_font_name)
        if found_fonts:
            ACCEPTABLE_FONTS = sorted(set(found_fonts + ACCEPTABLE_FONTS))
            logger.info(f"Found system fonts: {ACCEPTABLE_FONTS}")
        else:
            logger.warning(f"No font files found in {FONTS_DIR}. Using defaults.")
    else:
        logger.warning(f"Font directory {FONTS_DIR} not found. Using defaults: {ACCEPTABLE_FONTS}")
except Exception as e:
    # Font discovery is best-effort: never let it stop the module from loading.
    logger.warning(f"Could not load system fonts from {FONTS_DIR}: {e}. Using defaults: {ACCEPTABLE_FONTS}")

# Global variable for Whisper model to avoid reloading
whisper_model = None

# Colour used when the supplied colour string cannot be parsed (opaque white).
_DEFAULT_ASS_COLOUR = '&H00FFFFFF'


def _hex_to_ass_bgr(hex_color):
    """Convert a ``#RRGGBB`` (or ``#RGB``) colour string to ASS ``&HBBGGRR``.

    Anything that is not a valid hex colour (wrong type, wrong length,
    non-hex characters) yields the default opaque white instead of raising.
    """
    if not isinstance(hex_color, str):
        return _DEFAULT_ASS_COLOUR
    digits = hex_color.strip().lstrip('#')
    if len(digits) == 3:
        # Expand CSS shorthand: "abc" -> "aabbcc"
        digits = ''.join(ch * 2 for ch in digits)
    if not re.fullmatch(r'[0-9a-fA-F]{6}', digits):
        return _DEFAULT_ASS_COLOUR
    r, g, b = (int(digits[i:i + 2], 16) for i in (0, 2, 4))
    # ASS stores colour channels in blue-green-red order.
    return f"&H{b:02X}{g:02X}{r:02X}"


def generate_style_line(options):
    """Generate the ASS ``Style:`` line from user options.

    ASS colours are written ``&HAABBGGRR`` (alpha first, ``00`` = opaque); the
    alpha byte may be omitted, which is the ``&HBBGGRR`` form produced here
    for ``PrimaryColour``.

    Args:
        options: Mapping that may contain ``font_name``, ``font_size``,
            ``primary_color`` (``#RRGGBB``) and ``alignment``. Missing keys
            fall back to Arial / 24 / white / 2 (bottom centre). ``None`` is
            treated as an empty mapping.

    Returns:
        A single ``Style: ...`` line whose values follow the field order of
        the ``[V4+ Styles]`` ``Format:`` line used by
        ``generate_ass_from_transcript``.
    """
    options = options or {}
    primary_color_ass = _hex_to_ass_bgr(options.get('primary_color', '#FFFFFF'))

    # Insertion order matters: values are emitted in this exact order.
    style_options = {
        'Name': 'Default',
        'Fontname': options.get('font_name', 'Arial'),  # Ensure this font is accessible to FFmpeg
        'Fontsize': options.get('font_size', 24),
        'PrimaryColour': primary_color_ass,
        'SecondaryColour': '&H000000FF',  # Often unused, but good to define
        'OutlineColour': '&H00000000',  # Black outline
        'BackColour': '&H80000000',  # Semi-transparent black background/shadow
        'Bold': 0,  # Use -1 for True, 0 for False in ASS
        'Italic': 0,
        'Underline': 0,
        'StrikeOut': 0,
        'ScaleX': 100,
        'ScaleY': 100,
        'Spacing': 0,
        'Angle': 0,
        'BorderStyle': 1,  # 1 = Outline + Shadow
        'Outline': 2,  # Outline thickness
        'Shadow': 1,  # Shadow distance
        'Alignment': options.get('alignment', 2),  # 2 = Bottom Center
        'MarginL': 10,
        'MarginR': 10,
        'MarginV': 10,  # Bottom margin
        'Encoding': 1,  # Default ANSI encoding
    }
    logger.info(f"Generated ASS Style Options: {style_options}")
    return f"Style: {','.join(map(str, style_options.values()))}"


def transcribe_audio(audio_path, progress=None):
    """Transcribe audio using the Whisper ASR model.

    The model is loaded once and cached in the module-level ``whisper_model``.
    On CUDA the ``base`` model is used; on CPU the smaller ``tiny.en`` model.

    Args:
        audio_path: Path of the audio file handed to Whisper.
        progress: Optional progress callback, invoked through
            ``safe_progress_update``.

    Returns:
        The object returned by ``whisper_model.transcribe``.

    Raises:
        Exception: Any error from model loading or transcription is logged
            with its traceback and re-raised unchanged.
    """
    global whisper_model
    logger.info(f"Starting transcription for: {audio_path}")
    try:
        if whisper_model is None:
            safe_progress_update(progress, 0.1, "Loading Whisper model...")
            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info(f"Using device: {device} for Whisper")
            # Use a smaller model if only CPU is available to potentially speed things up
            model_size = "base" if device == "cuda" else "tiny.en"  # or "tiny"
            logger.info(f"Loading Whisper model size: {model_size}")
            whisper_model = openai_whisper.load_model(model_size, device=device)

        safe_progress_update(progress, 0.3, "Model loaded, processing audio...")
        result = whisper_model.transcribe(audio_path, fp16=torch.cuda.is_available())
        logger.info(f"Transcription result (first 100 chars): {str(result)[:100]}")
        safe_progress_update(progress, 0.7, "Transcription complete, formatting captions...")
        return result
    except Exception:
        # logger.exception includes the traceback
        logger.exception(f"Error transcribing audio: {audio_path}")
        raise


def format_time(seconds):
    """Format a time in seconds as an ASS timestamp (``H:MM:SS.cc``).

    The value is rounded to the nearest hundredth on an integer tick count,
    so binary float error cannot drop a hundredth (``0.29`` -> ``.29``, not
    ``.28``) and a rounded-up fraction carries into the seconds field.
    Negative inputs are clamped to zero.
    """
    total_cs = max(0, int(round(float(seconds) * 100)))
    total_seconds, hundredths = divmod(total_cs, 100)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h}:{m:02d}:{s:02d}.{hundredths:02d}"


def format_time_srt(seconds):
    """Format a time in seconds as an SRT timestamp (``HH:MM:SS,mmm``).

    Rounds to the nearest millisecond on an integer tick count (see
    ``format_time`` for why); negative inputs are clamped to zero.
    """
    total_ms = max(0, int(round(float(seconds) * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def generate_srt_from_transcript(segments):
    """Convert Whisper segments to SRT text.

    Args:
        segments: Iterable of mappings with ``start``, ``end`` (seconds) and
            ``text`` keys.

    Returns:
        The SRT document with cues numbered from 1, without leading or
        trailing whitespace. An empty iterable yields an empty string.
    """
    blocks = [
        f"{index}\n"
        f"{format_time_srt(segment['start'])} --> {format_time_srt(segment['end'])}\n"
        f"{segment['text'].strip()}\n\n"
        for index, segment in enumerate(segments, start=1)
    ]
    srt_content = "".join(blocks)
    logger.info(f"Generated SRT (first 200 chars): {srt_content[:200]}")
    return srt_content.strip()


def generate_ass_dialogue_line(segment, style_name='Default'):
    """Generate a single ASS ``Dialogue:`` line from a segment.

    Args:
        segment: Mapping with ``start``, ``end`` (seconds) and ``text`` keys.
        style_name: Name of the ASS style the line refers to.

    Returns:
        The dialogue line; newlines inside the text become the ASS hard line
        break ``\\N``.
    """
    start_time = format_time(segment["start"])
    end_time = format_time(segment["end"])
    text = segment["text"].strip().replace('\n', '\\N')  # Replace newline with ASS newline
    # Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
    return f"Dialogue: 0,{start_time},{end_time},{style_name},,0,0,0,,{text}"


def generate_ass_from_transcript(segments, style_options):
    """Convert Whisper segments to a complete ASS document.

    Args:
        segments: Iterable of mappings with ``start``, ``end`` and ``text``.
        style_options: Options forwarded to ``generate_style_line``.

    Returns:
        The ASS text: script info, one style definition, and one dialogue
        line per segment.
    """
    style_line = generate_style_line(style_options)
    # Built from explicit lines so "[Script Info]" is guaranteed to be the
    # very first line of the file and the header ends with exactly one newline.
    header_lines = [
        "[Script Info]",
        "Title: Generated Captions",
        "ScriptType: v4.00+",
        "WrapStyle: 0",
        "PlayResX: 384",
        "PlayResY: 288",
        "ScaledBorderAndShadow: yes",
        "",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, "
        "BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, "
        "BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
        style_line,
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    ]
    ass_header = "\n".join(header_lines) + "\n"
    dialogue_lines = [generate_ass_dialogue_line(seg) for seg in segments]
    full_ass_content = ass_header + "\n".join(dialogue_lines)
    logger.info(f"Generated ASS (first 300 chars): {full_ass_content[:300]}")
    return full_ass_content


def extract_audio(video_path, output_path):
    """Extract audio from a video file using an ffmpeg subprocess.

    The audio is written as mono 16 kHz 16-bit PCM WAV.

    Args:
        video_path: Path of the input video.
        output_path: Path of the WAV file to create (overwritten if present).

    Returns:
        tuple: ``(success, error_message)``; ``error_message`` is ``""`` on
        success. Exceptions are caught, logged and reported through the tuple.
    """
    logger.info(f"Attempting to extract audio from {video_path} to {output_path}")
    try:
        command = [
            "ffmpeg",
            "-i", video_path,
            "-vn",  # No video
            "-acodec", "pcm_s16le",  # Standard WAV format
            "-ac", "1",  # Mono
            "-ar", "16000",  # 16kHz sample rate (common for ASR)
            "-y",  # Overwrite output
            output_path,
        ]
        logger.info(f"Running audio extraction command: {' '.join(map(shlex.quote, command))}")
        process = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',  # Explicitly set encoding
            # FFmpeg may echo metadata that is not valid UTF-8; never let
            # decoding of diagnostics turn a successful run into a failure.
            errors='replace',
            check=False,
        )
        if process.returncode != 0:
            logger.error(f"FFmpeg audio extraction error (Code {process.returncode}):\nSTDOUT:\n{process.stdout}\nSTDERR:\n{process.stderr}")
            return False, f"FFmpeg failed (Code {process.returncode}): {process.stderr[:500]}..."

        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            logger.error(f"Audio extraction failed: Output file not created or empty. FFmpeg stderr: {process.stderr}")
            return False, f"Output audio file not created or empty. FFmpeg stderr: {process.stderr[:500]}..."

        logger.info(f"Audio extracted successfully to {output_path}, size: {os.path.getsize(output_path)} bytes")
        return True, ""
    except Exception as e:
        logger.exception(f"Exception during audio extraction from {video_path}")
        return False, str(e)


def _escape_ffmpeg_filter_path(path):
    """Escape a file path for use inside a single-quoted FFmpeg filter option.

    FFmpeg parses filter arguments in two passes. The quoted string protects
    the value from the filtergraph parser; the option parser then treats
    ``\\`` as an escape character and ``:`` as the option separator, so both
    must be backslash-escaped. A literal ``'`` has to close the quoted
    section, be emitted escaped for both passes, and reopen the quote.
    """
    escaped = str(path).replace('\\', '\\\\').replace(':', '\\:')
    return escaped.replace("'", "'\\\\\\''")


def run_ffmpeg_with_subtitles(video_path, subtitle_path, output_path, style_options=None):
    """Burn subtitles into video using an ffmpeg subprocess.

    The video is validated with ffprobe first, then re-encoded to
    H.264/AAC with the ASS file rendered on top.

    Args:
        video_path: Path to input video
        subtitle_path: Path to ASS subtitle file
        output_path: Path to save output video
        style_options: Optional style parameters (not directly used, but kept for consistency)

    Returns:
        tuple: (success, error_message)
    """
    logger.info(f"Attempting to burn subtitles from {subtitle_path} into {video_path}")

    # Check if the subtitle file exists and is not empty
    if not os.path.exists(subtitle_path) or os.path.getsize(subtitle_path) == 0:
        return False, f"Subtitle file {subtitle_path} does not exist or is empty"

    # Check if the video file exists
    if not os.path.exists(video_path):
        return False, f"Video file {video_path} does not exist"

    # Validate the video file using ffprobe
    try:
        probe_cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=codec_name,width,height",
            "-of", "json",
            video_path,
        ]
        probe_result = subprocess.run(
            probe_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='replace',
        )
        if probe_result.returncode != 0:
            logger.error(f"FFprobe validation failed: {probe_result.stderr}")
            return False, f"FFprobe validation failed: {probe_result.stderr[:200]}..."
    except Exception as e:
        logger.exception(f"Exception during video validation: {video_path}")
        return False, f"Video validation failed: {str(e)}"

    try:
        # Backslashes, colons (e.g. a Windows drive letter) and single quotes
        # are all significant inside a filter argument and must be escaped.
        subtitle_path_esc = _escape_ffmpeg_filter_path(subtitle_path)

        command = [
            "ffmpeg",
            "-i", video_path,
            "-vf", f"ass='{subtitle_path_esc}'",
            "-c:v", "libx264",  # Use H.264 codec for broad compatibility
            "-preset", "medium",  # Balance between speed and quality
            "-crf", "23",  # Reasonable quality setting (lower is better)
            "-c:a", "aac",  # Use AAC for audio
            "-b:a", "128k",  # Decent audio bitrate
            "-movflags", "+faststart",  # Optimize for web playback
            "-y",  # Overwrite output if exists
            output_path,
        ]
        logger.info(f"Running subtitle burn command: {' '.join(map(shlex.quote, command))}")
        process = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='replace',
            check=False,
        )
        if process.returncode != 0:
            logger.error(f"FFmpeg subtitle burn error (Code {process.returncode}):\nSTDOUT:\n{process.stdout}\nSTDERR:\n{process.stderr}")
            return False, f"FFmpeg failed (Code {process.returncode}): {process.stderr[:500]}..."

        # Verify output file was created and is not empty
        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            logger.error(f"Subtitle burning failed: Output file not created or empty. FFmpeg stderr: {process.stderr}")
            return False, f"Output video file not created or empty. FFmpeg stderr: {process.stderr[:500]}..."

        logger.info(f"Subtitles burned successfully, output: {output_path}, size: {os.path.getsize(output_path)} bytes")
        return True, ""
    except Exception as e:
        logger.exception(f"Exception during subtitle burning: {video_path}")
        return False, str(e)


def safe_progress_update(progress_callback, value, desc=""):
    """Update progress without crashing if the callback is None or fails.

    Progress reporting is cosmetic, so a failing callback must never abort
    the actual work; failures are recorded at debug level only to avoid
    flooding the log.
    """
    if progress_callback is None:
        return
    try:
        progress_callback(value, desc)
    except Exception as exc:
        logger.debug("Progress update failed: %s", exc)


# One SRT cue: index line, "start --> end" line, then the text up to the next
# blank line followed by an index line (or the end of the input).
# Allows comma or period as the milliseconds separator.
_SRT_BLOCK_PATTERN = re.compile(
    r'^\s*(\d+)\s*$\n?'  # Index line
    r'(\d{1,2}):(\d{2}):(\d{2})[,.](\d{3})\s*-->\s*'  # Start time
    r'(\d{1,2}):(\d{2}):(\d{2})[,.](\d{3})\s*$\n'  # End time
    r'(.*?)(?=\n\s*\n\d+\s*$|\Z)',  # Text block (non-greedy)
    re.DOTALL | re.MULTILINE
)


def parse_srt_to_dialogue(srt_content):
    """Parse SRT text into a list of dialogue events for ASS conversion.

    Args:
        srt_content: SRT text. CRLF / CR line endings and a leading
            byte-order mark are tolerated.

    Returns:
        A list of ``{'start': seconds, 'end': seconds, 'text': str}`` dicts in
        input order. Line breaks inside a cue become ``\\N``. Cues whose end
        precedes their start, or whose text is empty, are skipped with a
        warning.
    """
    # Normalise line endings so a stray '\r' never ends up inside cue text,
    # and drop a BOM that would otherwise stop the first index line matching.
    srt_content = srt_content.replace('\r\n', '\n').replace('\r', '\n').lstrip('\ufeff')

    dialogue = []
    logger.info("Attempting to parse SRT/VTT content...")
    matches_found = 0
    last_index = 0

    for match in _SRT_BLOCK_PATTERN.finditer(srt_content):
        matches_found += 1
        try:
            index = int(match.group(1))
            sh, sm, ss, sms = map(int, match.group(2, 3, 4, 5))
            eh, em, es, ems = map(int, match.group(6, 7, 8, 9))
            start_sec = sh * 3600 + sm * 60 + ss + sms / 1000.0
            end_sec = eh * 3600 + em * 60 + es + ems / 1000.0
            text = match.group(10).strip().replace('\n', '\\N')  # Replace newline with ASS \N

            # Basic validation
            if end_sec < start_sec:
                logger.warning(f"SRT parse warning: End time {end_sec} before start time {start_sec} at index {index}. Skipping.")
                continue
            if not text:
                logger.warning(f"SRT parse warning: Empty text content at index {index}. Skipping.")
                continue

            dialogue.append({'start': start_sec, 'end': end_sec, 'text': text})
            last_index = match.end()
        except (ValueError, TypeError) as e:
            logger.warning(f"Could not parse SRT block starting near index {match.group(1)}: {e}")

    # Check if parsing consumed a reasonable amount of the input
    if matches_found > 0 and last_index < len(srt_content) * 0.8:
        logger.warning(f"SRT parsing finished early. Found {matches_found} blocks, but stopped near character {last_index} of {len(srt_content)}. Input format might be inconsistent.")
    elif matches_found == 0 and len(srt_content) > 10:
        logger.error(f"SRT parsing failed. No dialogue blocks found in content starting with: {srt_content[:100]}...")

    logger.info(f"Parsed {len(dialogue)} dialogue events from SRT/VTT content.")
    return dialogue


# NOTE(review): the definition of parse_ass_to_dialogue is cut off in the
# reviewed source, part-way through its re.compile(...) call, and the named
# groups appear to have lost their names ("(?P\d+)" is not a valid pattern).
# It has NOT been rewritten or completed here; the fragment is preserved
# verbatim below and must be restored from the complete upstream file.
#
# def parse_ass_to_dialogue(ass_content):
#     """Basic ASS parser to extract dialogue events."""
#     dialogue = []
#     # Regex for ASS Dialogue line - make capturing groups non-optional where possible
#     # Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
#     pattern = re.compile(
#         r'^Dialogue:\s*'
#         r'(?P\d+),\s*'
#         r'(?P\d+:\d{2}:\d{2}\.\d{2}),\s*'
#         r'(?P\d+:\d{2}:\d{2}\.\d{2}),\s*'
#         r'(?P