| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
|
|
| import os |
| import ffmpeg |
| import logging |
| import subprocess |
| import whisper |
| from datetime import timedelta |
| import srt |
| import re |
| from services.file_management import download_file |
| from services.cloud_storage import upload_file |
| import requests |
| from urllib.parse import urlparse |
| from config import LOCAL_STORAGE_PATH |
|
|
| |
# Module-level logger for the captioning service.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Attach a stream handler only if none exists yet, so repeated imports of
# this module do not produce duplicated log lines.
if not logger.hasHandlers():
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
|
|
# Human-readable position names mapped to ASS "\an" alignment codes
# (numpad layout: 1-3 bottom row, 4-6 middle row, 7-9 top row).
# NOTE(review): not referenced elsewhere in the visible module — presumably
# kept for external callers; confirm before removing.
POSITION_ALIGNMENT_MAP = {
    "bottom_left": 1,
    "bottom_center": 2,
    "bottom_right": 3,
    "middle_left": 4,
    "middle_center": 5,
    "middle_right": 6,
    "top_left": 7,
    "top_center": 8,
    "top_right": 9
}
|
|
def rgb_to_ass_color(rgb_color):
    """Convert an RGB hex string (e.g. '#RRGGBB') to ASS colour form (&HAABBGGRR)."""
    if isinstance(rgb_color, str):
        hex_digits = rgb_color.lstrip('#')
        if len(hex_digits) == 6:
            red, green, blue = (int(hex_digits[i:i + 2], 16) for i in (0, 2, 4))
            # ASS stores colours as alpha, blue, green, red (alpha 00 = opaque).
            return f"&H00{blue:02X}{green:02X}{red:02X}"
    # Anything unparseable falls back to opaque white.
    return "&H00FFFFFF"
|
|
def generate_transcription(video_path, language='auto'):
    """Transcribe the media file with Whisper (base model).

    language='auto' lets Whisper detect the language; any other value is
    forwarded to the model. Returns the raw transcription result dict with
    word-level timestamps. Re-raises any model error after logging it.
    """
    try:
        model = whisper.load_model("base")
        options = {
            'word_timestamps': True,
            'verbose': True,
        }
        if language != 'auto':
            options['language'] = language
        result = model.transcribe(video_path, **options)
        logger.info(f"Transcription generated successfully for video: {video_path}")
        return result
    except Exception as exc:
        logger.error(f"Error in transcription: {str(exc)}")
        raise
|
|
def get_video_resolution(video_path):
    """Probe the file with ffmpeg and return (width, height).

    Falls back to (384, 288) when no video stream is found or probing fails.
    """
    fallback = (384, 288)
    try:
        probe = ffmpeg.probe(video_path)
        stream = next(
            (s for s in probe['streams'] if s['codec_type'] == 'video'),
            None,
        )
        if stream is None:
            logger.warning(f"No video streams found for {video_path}. Using default resolution 384x288.")
            return fallback
        width = int(stream['width'])
        height = int(stream['height'])
        logger.info(f"Video resolution determined: {width}x{height}")
        return width, height
    except Exception as e:
        logger.error(f"Error getting video resolution: {str(e)}. Using default resolution 384x288.")
        return fallback
|
|
def get_available_fonts():
    """Get the list of available fonts on the system."""
    try:
        import matplotlib.font_manager as fm
    except ImportError:
        logger.error("matplotlib not installed. Install via 'pip install matplotlib'.")
        return []
    names = set()
    for font_path in fm.findSystemFonts(fontpaths=None, fontext='ttf'):
        try:
            names.add(fm.FontProperties(fname=font_path).get_name())
        except Exception:
            # Skip fonts whose metadata cannot be read.
            continue
    logger.info(f"Available fonts retrieved: {names}")
    return list(names)
|
|
def format_ass_time(seconds):
    """Convert float seconds to ASS time format H:MM:SS.cc.

    Rounds to the nearest centisecond and carries any overflow into the
    seconds field. The previous implementation rounded the fractional part
    independently, so e.g. 1.999s produced the invalid string '0:00:01.100'
    instead of '0:00:02.00'.
    """
    total_cs = int(round(seconds * 100))
    hours, rem = divmod(total_cs, 360000)
    minutes, rem = divmod(rem, 6000)
    secs, centis = divmod(rem, 100)
    return f"{hours}:{minutes:02}:{secs:02}.{centis:02}"
|
|
def process_subtitle_text(text, replace_dict, all_caps, max_words_per_line):
    """Apply text transformations: replacements, all caps, and optional line splitting.

    Replacements are case-insensitive. The replacement string is inserted
    literally: a callable repl is used so that backslashes in the replacement
    are not interpreted as regex group references/escapes (re.sub would
    otherwise turn a literal '\\n' replacement into a newline).
    """
    for old_word, new_word in replace_dict.items():
        text = re.sub(
            re.escape(old_word),
            lambda m, repl=new_word: repl,  # bind now to avoid late-binding closure
            text,
            flags=re.IGNORECASE,
        )
    if all_caps:
        text = text.upper()
    if max_words_per_line > 0:
        words = text.split()
        text = '\\N'.join(
            ' '.join(words[i:i + max_words_per_line])
            for i in range(0, len(words), max_words_per_line)
        )
    return text
|
|
def srt_to_transcription_result(srt_content):
    """Convert SRT content into a transcription-like structure for uniform processing.

    Each SRT block becomes a segment dict with start/end in seconds and an
    empty 'words' list (SRT carries no word-level timing).
    """
    segments = [
        {
            'start': sub.start.total_seconds(),
            'end': sub.end.total_seconds(),
            'text': sub.content.strip(),
            'words': [],
        }
        for sub in srt.parse(srt_content)
    ]
    logger.info("Converted SRT content to transcription result.")
    return {'segments': segments}
|
|
def split_lines(text, max_words_per_line):
    """Split text into lines of at most max_words_per_line words.

    Returns [text] unchanged when max_words_per_line <= 0.
    """
    if max_words_per_line <= 0:
        return [text]
    tokens = text.split()
    return [
        ' '.join(tokens[start:start + max_words_per_line])
        for start in range(0, len(tokens), max_words_per_line)
    ]
|
|
def is_url(string):
    """Check if the given string is a valid HTTP/HTTPS URL.

    Requires both a http(s) scheme and a network location, so bare strings
    like 'http://' are rejected. Non-string or unparsable input returns
    False instead of raising (the previous bare `except:` also swallowed
    KeyboardInterrupt/SystemExit).
    """
    try:
        parsed = urlparse(string)
    except (ValueError, TypeError, AttributeError):
        return False
    return parsed.scheme in ('http', 'https') and bool(parsed.netloc)
|
|
def download_captions(captions_url):
    """Download captions from the given URL and return the body as text.

    Raises (after logging) on network errors or non-2xx responses.
    """
    try:
        logger.info(f"Downloading captions from URL: {captions_url}")
        # Without a timeout, requests.get can block the job forever on an
        # unresponsive host.
        response = requests.get(captions_url, timeout=30)
        response.raise_for_status()
        logger.info("Captions downloaded successfully.")
        return response.text
    except Exception as e:
        logger.error(f"Error downloading captions: {str(e)}")
        raise
|
|
def determine_alignment_code(position_str, alignment_str, x, y, video_width, video_height):
    r"""
    Resolve the ASS \an alignment code and the (x, y) anchor point.

    Explicit x/y win when both are given (only alignment_str then affects the
    \an code, anchored on the middle row). Otherwise the frame is divided into
    a 3x3 grid: position_str picks the cell and alignment_str picks the anchor
    within it.

    Returns (an_code, use_position, final_x, final_y); use_position is always True.
    """
    logger.info(f"[determine_alignment_code] Inputs: position_str={position_str}, alignment_str={alignment_str}, x={x}, y={y}, video_width={video_width}, video_height={video_height}")

    horizontal_map = {'left': 1, 'center': 2, 'right': 3}

    # Caller supplied explicit coordinates: keep them untouched.
    if x is not None and y is not None:
        logger.info("[determine_alignment_code] x and y provided, ignoring position and alignment for grid.")
        an_code = 4 + (horizontal_map.get(alignment_str, 2) - 1)
        logger.info(f"[determine_alignment_code] Using provided x,y. an_code={an_code}")
        return an_code, True, x, y

    pos_lower = position_str.lower()

    # Vertical band: base \an code plus the band's vertical centre line.
    if 'top' in pos_lower:
        vertical_base, final_y = 7, video_height / 6
    elif 'middle' in pos_lower:
        vertical_base, final_y = 4, video_height / 2
    else:
        vertical_base, final_y = 1, (5 * video_height) / 6

    # Horizontal band: left/right boundaries and centre of the grid column.
    if 'left' in pos_lower:
        left_edge, right_edge, centre = 0, video_width / 3, video_width / 6
    elif 'right' in pos_lower:
        left_edge, right_edge, centre = (2 * video_width) / 3, video_width, (5 * video_width) / 6
    else:
        left_edge, right_edge, centre = video_width / 3, (2 * video_width) / 3, video_width / 2

    # Anchor x within the band according to the requested alignment.
    if alignment_str == 'left':
        final_x, horiz_code = left_edge, 1
    elif alignment_str == 'right':
        final_x, horiz_code = right_edge, 3
    else:
        final_x, horiz_code = centre, 2

    an_code = vertical_base + (horiz_code - 1)

    logger.info(f"[determine_alignment_code] Computed final_x={final_x}, final_y={final_y}, an_code={an_code}")
    return an_code, True, int(final_x), int(final_y)
|
|
def create_style_line(style_options, video_resolution):
    """
    Build the 'Style: Default,...' line for the ASS [V4+ Styles] section.

    Returns the style line string, or an error dict (including the list of
    available fonts) when the requested font family is not installed.
    """
    font_family = style_options.get('font_family', 'Arial')
    available_fonts = get_available_fonts()
    if font_family not in available_fonts:
        logger.warning(f"Font '{font_family}' not found.")
        return {'error': f"Font '{font_family}' not available.", 'available_fonts': available_fonts}

    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    outline_color = rgb_to_ass_color(style_options.get('outline_color', '#000000'))
    box_color = rgb_to_ass_color(style_options.get('box_color', '#000000'))

    # Default font size scales with the video height (5%).
    font_size = style_options.get('font_size', int(video_resolution[1] * 0.05))

    # Boolean style flags rendered as '1'/'0'.
    flags = [
        '1' if style_options.get(option, False) else '0'
        for option in ('bold', 'italic', 'underline', 'strikeout')
    ]

    # Remaining numeric/string fields with their defaults, in format order.
    geometry = [
        style_options.get(key, default)
        for key, default in (
            ('scale_x', '100'), ('scale_y', '100'), ('spacing', '0'), ('angle', '0'),
            ('border_style', '1'), ('outline_width', '2'), ('shadow_offset', '0'),
        )
    ]
    margins = [style_options.get(key, '20') for key in ('margin_l', 'margin_r', 'margin_v')]

    # Style-level alignment is a placeholder; per-event \an tags override it.
    alignment = 5

    # SecondaryColour mirrors the primary colour; trailing 0 is the Encoding field.
    fields = ['Default', font_family, font_size, line_color, line_color,
              outline_color, box_color, *flags, *geometry, alignment, *margins, 0]
    style_line = "Style: " + ",".join(str(field) for field in fields)
    logger.info(f"Created ASS style line: {style_line}")
    return style_line
|
|
def generate_ass_header(style_options, video_resolution):
    """
    Generate the ASS file header with the Default style.

    Returns the header string (through the [Events] Format line), or the
    error dict propagated from create_style_line when the requested font
    is unavailable.
    """
    ass_header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {video_resolution[0]}
PlayResY: {video_resolution[1]}
ScaledBorderAndShadow: yes

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
"""
    style_line = create_style_line(style_options, video_resolution)
    # create_style_line returns an error dict when the font is missing;
    # pass that through unchanged so the caller can surface it.
    if isinstance(style_line, dict) and 'error' in style_line:
        return style_line

    ass_header += style_line + "\n\n[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"
    logger.info("Generated ASS header.")
    return ass_header
|
|
| |
|
|
def handle_classic(transcription_result, style_options, replace_dict, video_resolution):
    """
    Classic style handler: renders each segment as one Dialogue event,
    anchored according to the resolved position/alignment.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')

    an_code, _, final_x, final_y = determine_alignment_code(
        position_str, alignment_str,
        style_options.get('x'), style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1],
    )

    logger.info(f"[Classic] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    # The anchor tag is identical for every event; build it once.
    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"

    events = []
    for segment in transcription_result['segments']:
        flat_text = segment['text'].strip().replace('\n', ' ')
        rendered = '\\N'.join(
            process_subtitle_text(chunk, replace_dict, all_caps, 0)
            for chunk in split_lines(flat_text, max_words_per_line)
        )
        start_time = format_ass_time(segment['start'])
        end_time = format_ass_time(segment['end'])
        events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{rendered}")
    logger.info(f"Handled {len(events)} dialogues in classic style.")
    return "\n".join(events)
|
|
def handle_karaoke(transcription_result, style_options, replace_dict, video_resolution):
    """
    Karaoke style handler: Highlights words as they are spoken.

    Emits one Dialogue event per segment; each word carries a karaoke timing
    tag whose value is the word's spoken duration in centiseconds.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    x = style_options.get('x')
    y = style_options.get('y')

    an_code, use_pos, final_x, final_y = determine_alignment_code(
        position_str, alignment_str, x, y,
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )
    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))

    logger.info(f"[Karaoke] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    events = []
    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue

        # A line limit of 0 means "all words on one line" — treat that as a
        # single chunk of len(words). This unifies the two previously
        # duplicated word-tagging loops into one.
        chunk_size = max_words_per_line if max_words_per_line > 0 else len(words)
        lines_content = []
        for i in range(0, len(words), chunk_size):
            tagged = []
            for w_info in words[i:i + chunk_size]:
                w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
                duration_cs = int(round((w_info['end'] - w_info['start']) * 100))
                tagged.append(f"{{\\k{duration_cs}}}{w} ")
            lines_content.append(''.join(tagged).strip())

        dialogue_text = '\\N'.join(lines_content)
        start_time = format_ass_time(words[0]['start'])
        end_time = format_ass_time(words[-1]['end'])
        position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
        events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{word_color}}}{dialogue_text}")
    logger.info(f"Handled {len(events)} dialogues in karaoke style.")
    return "\n".join(events)
|
|
def handle_highlight(transcription_result, style_options, replace_dict, video_resolution):
    """
    Highlight style handler: Highlights words sequentially.

    For every line: one layer-0 event shows the plain text for the line's
    whole duration, and one layer-1 event per word re-renders the line with
    that word in the highlight colour while it is spoken.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')

    an_code, _, final_x, final_y = determine_alignment_code(
        position_str, alignment_str,
        style_options.get('x'), style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )

    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    events = []

    logger.info(f"[Highlight] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"

    for segment in transcription_result['segments']:
        raw_words = segment.get('words', [])
        if not raw_words:
            continue

        # Clean each word and keep only non-empty results with their timings.
        processed_words = []
        for w_info in raw_words:
            cleaned = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
            if cleaned:
                processed_words.append((cleaned, w_info['start'], w_info['end']))
        if not processed_words:
            continue

        if max_words_per_line > 0:
            line_sets = [
                processed_words[i:i + max_words_per_line]
                for i in range(0, len(processed_words), max_words_per_line)
            ]
        else:
            line_sets = [processed_words]

        for line_set in line_sets:
            texts = [word for word, _, _ in line_set]
            base_text = ' '.join(texts)
            line_start_time = format_ass_time(line_set[0][1])
            line_end_time = format_ass_time(line_set[-1][2])
            # Layer 0: the un-highlighted line for the full line span.
            events.append(f"Dialogue: 0,{line_start_time},{line_end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{base_text}")

            # Layer 1: one event per word with just that word recoloured.
            for idx, (_, w_start, w_end) in enumerate(line_set):
                pieces = [
                    f"{{\\c{word_color}}}{w}{{\\c{line_color}}}" if i == idx else w
                    for i, w in enumerate(texts)
                ]
                highlighted_text = ' '.join(pieces)
                word_start_time = format_ass_time(w_start)
                word_end_time = format_ass_time(w_end)
                events.append(f"Dialogue: 1,{word_start_time},{word_end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{highlighted_text}")

    logger.info(f"Handled {len(events)} dialogues in highlight style.")
    return "\n".join(events)
|
|
def handle_underline(transcription_result, style_options, replace_dict, video_resolution):
    """
    Underline style handler: Underlines the current word.

    Emits one Dialogue event per word; each event renders the whole line with
    only the active word wrapped in underline tags.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')

    an_code, _, final_x, final_y = determine_alignment_code(
        position_str, alignment_str,
        style_options.get('x'), style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )
    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    events = []

    logger.info(f"[Underline] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"

    for segment in transcription_result['segments']:
        raw_words = segment.get('words', [])
        if not raw_words:
            continue

        # Clean each word and keep only non-empty results with their timings.
        processed_words = []
        for w_info in raw_words:
            cleaned = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
            if cleaned:
                processed_words.append((cleaned, w_info['start'], w_info['end']))
        if not processed_words:
            continue

        if max_words_per_line > 0:
            line_sets = [
                processed_words[i:i + max_words_per_line]
                for i in range(0, len(processed_words), max_words_per_line)
            ]
        else:
            line_sets = [processed_words]

        for line_set in line_sets:
            texts = [word for word, _, _ in line_set]
            for idx, (_, w_start, w_end) in enumerate(line_set):
                full_text = ' '.join(
                    f"{{\\u1}}{w}{{\\u0}}" if i == idx else w
                    for i, w in enumerate(texts)
                )
                start_time = format_ass_time(w_start)
                end_time = format_ass_time(w_end)
                events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{full_text}")
    logger.info(f"Handled {len(events)} dialogues in underline style.")
    return "\n".join(events)
|
|
def handle_word_by_word(transcription_result, style_options, replace_dict, video_resolution):
    """
    Word-by-Word style handler: Displays each word individually.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')

    an_code, _, final_x, final_y = determine_alignment_code(
        position_str, alignment_str,
        style_options.get('x'), style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )
    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    events = []

    logger.info(f"[Word-by-Word] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"

    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue

        # Chunking does not change the output here (each word is its own
        # event) but mirrors the grouping used by the other handlers.
        if max_words_per_line > 0:
            grouped_words = [words[i:i + max_words_per_line] for i in range(0, len(words), max_words_per_line)]
        else:
            grouped_words = [words]

        for word_group in grouped_words:
            for w_info in word_group:
                rendered = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
                if not rendered:
                    continue
                start_time = format_ass_time(w_info['start'])
                end_time = format_ass_time(w_info['end'])
                events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{word_color}}}{rendered}")
    logger.info(f"Handled {len(events)} dialogues in word-by-word style.")
    return "\n".join(events)
|
|
# Dispatch table mapping the user-facing 'style' option to its handler.
# srt_to_ass falls back to 'classic' for unknown keys.
STYLE_HANDLERS = {
    'classic': handle_classic,
    'karaoke': handle_karaoke,
    'highlight': handle_highlight,
    'underline': handle_underline,
    'word_by_word': handle_word_by_word
}
|
|
def srt_to_ass(transcription_result, style_type, settings, replace_dict, video_resolution):
    """
    Convert transcription result to ASS based on the specified style.

    Returns the full ASS document as a string, or an error dict when the
    header could not be built (unavailable font).
    """
    defaults = {
        'line_color': '#FFFFFF',
        'word_color': '#FFFF00',
        'box_color': '#000000',
        'outline_color': '#000000',
        'all_caps': False,
        'max_words_per_line': 0,
        'font_size': None,
        'font_family': 'Arial',
        'bold': False,
        'italic': False,
        'underline': False,
        'strikeout': False,
        'outline_width': 2,
        'shadow_offset': 0,
        'border_style': 1,
        'x': None,
        'y': None,
        'position': 'middle_center',
        'alignment': 'center'
    }
    # Caller-supplied settings win over the defaults.
    style_options = {**defaults, **settings}

    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    ass_header = generate_ass_header(style_options, video_resolution)
    # Font-unavailable error dict: propagate it to the caller unchanged.
    if isinstance(ass_header, dict) and 'error' in ass_header:
        return ass_header

    handler = STYLE_HANDLERS.get(style_type.lower())
    if handler is None:
        logger.warning(f"Unknown style '{style_type}', defaulting to 'classic'.")
        handler = handle_classic

    dialogue_lines = handler(transcription_result, style_options, replace_dict, video_resolution)
    logger.info("Converted transcription result to ASS format.")
    return ass_header + dialogue_lines + "\n"
|
|
def process_subtitle_events(transcription_result, style_type, settings, replace_dict, video_resolution):
    """
    Process transcription results into ASS subtitle format.

    Thin pass-through wrapper around srt_to_ass, kept as the public entry
    point for subtitle-event generation.
    """
    return srt_to_ass(transcription_result, style_type, settings, replace_dict, video_resolution)
|
|
def parse_time_string(time_str):
    """Parse a time string in hh:mm:ss.ms, mm:ss.ms, or bare-seconds format.

    Returns the total number of seconds as a float. Strings that do not match
    the clock pattern are tried as a plain float (e.g. "12.5").

    Raises:
        ValueError: if the value is not a string or cannot be parsed.

    Note: the redundant function-local `import re` was removed; the module
    already imports re at the top level.
    """
    if not isinstance(time_str, str):
        raise ValueError("Time value must be a string in hh:mm:ss.ms format.")
    pattern = r"^(?:(\d+):)?(\d{1,2}):(\d{2}(?:\.\d{1,3})?)$"
    match = re.match(pattern, time_str)
    if not match:
        # Not a clock string — accept a bare seconds value such as "12.5".
        try:
            return float(time_str)
        except Exception:
            raise ValueError(f"Invalid time string: {time_str}")
    # Missing hours group defaults to "0" (mm:ss.ms form).
    h, m, s = match.groups(default="0")
    total_seconds = int(h) * 3600 + int(m) * 60 + float(s)
    return total_seconds
|
|
def filter_subtitle_lines(sub_content, exclude_time_ranges, subtitle_type):
    """
    Remove subtitle lines/blocks that overlap with exclude_time_ranges.
    Supports 'ass' and 'srt' subtitle_type; any other type is returned unchanged.
    """

    def parse_ass_time(ass_time):
        # ASS event times look like H:MM:SS.cc; malformed values become 0.
        try:
            h, m, rest = ass_time.split(":")
            s, cs = rest.split(".")
            return int(h) * 3600 + int(m) * 60 + int(s) + int(cs) / 100
        except Exception:
            return 0

    parsed_ranges = [
        {'start': parse_time_string(rng['start']), 'end': parse_time_string(rng['end'])}
        for rng in exclude_time_ranges
    ]
    if not exclude_time_ranges:
        return sub_content

    def overlaps(start, end):
        # Half-open interval overlap test.
        return any(start < rng['end'] and end > rng['start'] for rng in parsed_ranges)

    if subtitle_type == 'ass':
        kept_lines = []
        for line in sub_content.splitlines():
            if line.startswith("Dialogue:"):
                parts = line.split(",", 10)
                if len(parts) > 3 and overlaps(parse_ass_time(parts[1]), parse_ass_time(parts[2])):
                    # Drop Dialogue events that overlap an excluded range.
                    continue
            kept_lines.append(line)
        return "\n".join(kept_lines)

    if subtitle_type == 'srt':
        kept_subs = [
            sub for sub in srt.parse(sub_content)
            if not overlaps(sub.start.total_seconds(), sub.end.total_seconds())
        ]
        return srt.compose(kept_subs)

    return sub_content
|
|
def normalize_exclude_time_ranges(exclude_time_ranges):
    """
    Validate exclude_time_ranges entries and return them in normalized form.

    Each entry must carry string 'start'/'end' values in hh:mm:ss.ms format,
    both non-negative, with end strictly after start.

    Raises:
        ValueError: on any malformed entry.
    """
    normalized = []
    for rng in exclude_time_ranges:
        start, end = rng.get("start"), rng.get("end")
        if not (isinstance(start, str) and isinstance(end, str)):
            raise ValueError("exclude_time_ranges start/end must be strings in hh:mm:ss.ms format.")
        start_sec = parse_time_string(start)
        end_sec = parse_time_string(end)
        if min(start_sec, end_sec) < 0:
            raise ValueError("exclude_time_ranges start/end must be non-negative.")
        if end_sec <= start_sec:
            raise ValueError("exclude_time_ranges end must be strictly greater than start.")
        normalized.append({"start": start, "end": end})
    return normalized
|
|
def generate_ass_captions_v1(video_url, captions, settings, replace, exclude_time_ranges, job_id, language='auto', PlayResX=None, PlayResY=None):
    """
    Captioning process with transcription fallback and multiple styles.
    Integrates with the updated logic for positioning and alignment.
    If PlayResX and PlayResY are provided, use them for ASS generation; otherwise, get from video.

    Returns the local path of the generated subtitle file on success, or a
    dict with an 'error' key (and optionally 'available_fonts') on failure.
    """
    try:
        # Validate/normalize exclusion ranges up front so bad input fails early.
        if exclude_time_ranges:
            exclude_time_ranges = normalize_exclude_time_ranges(exclude_time_ranges)

        if not isinstance(settings, dict):
            logger.error(f"Job {job_id}: 'settings' should be a dictionary.")
            return {"error": "'settings' should be a dictionary."}

        # Accept kebab-case keys from the API by converting them to snake_case.
        style_options = {k.replace('-', '_'): v for k, v in settings.items()}

        if not isinstance(replace, list):
            logger.error(f"Job {job_id}: 'replace' should be a list of objects with 'find' and 'replace' keys.")
            return {"error": "'replace' should be a list of objects with 'find' and 'replace' keys."}

        # Build the find -> replace mapping, skipping malformed entries.
        replace_dict = {}
        for item in replace:
            if 'find' in item and 'replace' in item:
                replace_dict[item['find']] = item['replace']
            else:
                logger.warning(f"Job {job_id}: Invalid replace item {item}. Skipping.")

        # Back-compat: 'highlight_color' was renamed to 'word_color'.
        if 'highlight_color' in style_options:
            logger.warning(f"Job {job_id}: 'highlight_color' is deprecated; merging into 'word_color'.")
            style_options['word_color'] = style_options.pop('highlight_color')

        # Fail fast if the requested font is not installed on this host.
        font_family = style_options.get('font_family', 'Arial')
        available_fonts = get_available_fonts()
        if font_family not in available_fonts:
            logger.warning(f"Job {job_id}: Font '{font_family}' not found.")
            return {"error": f"Font '{font_family}' not available.", "available_fonts": available_fonts}

        logger.info(f"Job {job_id}: Font '{font_family}' is available.")

        # Captions may arrive as a URL, as raw subtitle text, or not at all.
        if captions and is_url(captions):
            logger.info(f"Job {job_id}: Captions provided as URL. Downloading captions.")
            try:
                captions_content = download_captions(captions)
            except Exception as e:
                logger.error(f"Job {job_id}: Failed to download captions: {str(e)}")
                return {"error": f"Failed to download captions: {str(e)}"}
        elif captions:
            logger.info(f"Job {job_id}: Captions provided as raw content.")
            captions_content = captions
        else:
            captions_content = None

        # The source video is always needed (for resolution and/or transcription).
        try:
            video_path = download_file(video_url, LOCAL_STORAGE_PATH)
            logger.info(f"Job {job_id}: Video downloaded to {video_path}")
        except Exception as e:
            logger.error(f"Job {job_id}: Video download error: {str(e)}")
            return {"error": str(e)}

        # Explicit PlayResX/PlayResY override probing the video.
        if PlayResX is not None and PlayResY is not None:
            video_resolution = (PlayResX, PlayResY)
            logger.info(f"Job {job_id}: Using provided PlayResX/PlayResY = {PlayResX}x{PlayResY}")
        else:
            video_resolution = get_video_resolution(video_path)
            logger.info(f"Job {job_id}: Video resolution detected = {video_resolution[0]}x{video_resolution[1]}")

        style_type = style_options.get('style', 'classic').lower()
        logger.info(f"Job {job_id}: Using style '{style_type}' for captioning.")

        if captions_content:
            # Crude format sniffing: an ASS file always has a [Script Info] section.
            if '[Script Info]' in captions_content:
                # Already ASS — use it verbatim.
                subtitle_content = captions_content
                subtitle_type = 'ass'
                logger.info(f"Job {job_id}: Detected ASS formatted captions.")
            else:
                # Treat anything else as SRT.
                logger.info(f"Job {job_id}: Detected SRT formatted captions.")
                # SRT has no word timings, so word-level styles cannot be rendered.
                if style_type != 'classic':
                    error_message = "Only 'classic' style is supported for SRT captions."
                    logger.error(f"Job {job_id}: {error_message}")
                    return {"error": error_message}
                transcription_result = srt_to_transcription_result(captions_content)
                # Re-render the SRT content as styled ASS events.
                subtitle_content = process_subtitle_events(transcription_result, style_type, style_options, replace_dict, video_resolution)
                subtitle_type = 'ass'
        else:
            # No captions supplied: fall back to Whisper transcription.
            logger.info(f"Job {job_id}: No captions provided, generating transcription.")
            transcription_result = generate_transcription(video_path, language=language)
            subtitle_content = process_subtitle_events(transcription_result, style_type, style_options, replace_dict, video_resolution)
            subtitle_type = 'ass'

        # process_subtitle_events can return an error dict (e.g. missing font).
        if isinstance(subtitle_content, dict) and 'error' in subtitle_content:
            logger.error(f"Job {job_id}: {subtitle_content['error']}")
            # Include the font list only when it was provided.
            if 'available_fonts' in subtitle_content:
                return {"error": subtitle_content['error'], "available_fonts": subtitle_content.get('available_fonts', [])}
            else:
                return {"error": subtitle_content['error']}

        # Strip subtitle events overlapping the requested exclusion windows.
        if exclude_time_ranges:
            subtitle_content = filter_subtitle_lines(subtitle_content, exclude_time_ranges, subtitle_type)
            if subtitle_type == 'ass':
                logger.info(f"Job {job_id}: Filtered ASS Dialogue lines due to exclude_time_ranges.")
            elif subtitle_type == 'srt':
                logger.info(f"Job {job_id}: Filtered SRT subtitle blocks due to exclude_time_ranges.")

        # Persist the subtitle file locally and return its path.
        subtitle_filename = f"{job_id}.{subtitle_type}"
        subtitle_path = os.path.join(LOCAL_STORAGE_PATH, subtitle_filename)
        try:
            with open(subtitle_path, 'w', encoding='utf-8') as f:
                f.write(subtitle_content)
            logger.info(f"Job {job_id}: Subtitle file saved to {subtitle_path}")
        except Exception as e:
            logger.error(f"Job {job_id}: Failed to save subtitle file: {str(e)}")
            return {"error": f"Failed to save subtitle file: {str(e)}"}

        return subtitle_path
    except Exception as e:
        # Catch-all boundary: convert unexpected failures into an error dict.
        logger.error(f"Job {job_id}: Error in generate_ass_captions_v1: {str(e)}", exc_info=True)
        return {"error": str(e)}
|
|