| | """ |
| | Story-based Keyframe Extraction |
| | Generates keyframes based on meaningful story moments |
| | """ |
| |
|
| | import os |
| | import srt |
| | import cv2 |
| | import json |
| | import numpy as np |
| | from typing import List, Dict |
| | from backend.keyframes.extract_frames import extract_frames |
| | from backend.utils import copy_and_rename_file |
| |
|
| | def generate_keyframes_story(video_path: str, filtered_subtitles: List = None, max_frames: int = 12): |
| | """Generate keyframes based on story moments |
| | |
| | Args: |
| | video_path: Path to video file |
| | filtered_subtitles: List of filtered subtitle objects (if provided) |
| | max_frames: Maximum number of frames to generate |
| | """ |
| | print("๐ Generating story-based keyframes...") |
| | |
| | |
| | if filtered_subtitles: |
| | subs = filtered_subtitles |
| | print(f"Using {len(subs)} pre-filtered story moments") |
| | else: |
| | |
| | try: |
| | with open("test1.srt") as f: |
| | data = f.read() |
| | all_subs = list(srt.parse(data)) |
| | |
| | |
| | if len(all_subs) > max_frames: |
| | |
| | step = len(all_subs) // max_frames |
| | subs = all_subs[::step][:max_frames] |
| | print(f"Sampled {len(subs)} from {len(all_subs)} subtitles") |
| | else: |
| | subs = all_subs |
| | |
| | except Exception as e: |
| | print(f"โ Error reading subtitles: {e}") |
| | return False |
| | |
| | |
| | final_dir = os.path.join("frames", "final") |
| | if not os.path.exists(final_dir): |
| | os.makedirs(final_dir) |
| | |
| | |
| | for f in os.listdir(final_dir): |
| | if f.endswith('.png'): |
| | os.remove(os.path.join(final_dir, f)) |
| | |
| | frame_counter = 0 |
| | total_subs = len(subs) |
| | |
| | print(f"๐ฏ Processing {total_subs} story segments...") |
| | |
| | |
| | for i, sub in enumerate(subs): |
| | print(f"๐ Segment {i+1}/{total_subs}: {sub.content[:50]}...") |
| | |
| | |
| | sub_dir = f"frames/sub{sub.index}" |
| | if not os.path.exists(sub_dir): |
| | os.makedirs(sub_dir) |
| | |
| | try: |
| | |
| | frames_per_segment = 1 if total_subs > 10 else 2 |
| | |
| | frames = extract_frames( |
| | video_path, |
| | sub_dir, |
| | sub.start.total_seconds(), |
| | sub.end.total_seconds(), |
| | frames_per_segment |
| | ) |
| | |
| | if frames: |
| | |
| | best_frame_idx = len(frames) // 2 |
| | best_frame = frames[best_frame_idx] |
| | |
| | |
| | src = os.path.join(sub_dir, best_frame) |
| | dst_filename = f"frame{frame_counter:03d}.png" |
| | |
| | copy_and_rename_file(src, final_dir, dst_filename) |
| | frame_counter += 1 |
| | |
| | print(f"โ
Selected frame for segment {i+1}") |
| | else: |
| | print(f"โ ๏ธ No frames extracted for segment {i+1}") |
| | |
| | except Exception as e: |
| | print(f"โ Error processing segment {i+1}: {e}") |
| | continue |
| | |
| | |
| | if frame_counter < 5: |
| | print(f"โ ๏ธ Only {frame_counter} frames generated, trying to extract more...") |
| | |
| | _extract_backup_frames(video_path, final_dir, frame_counter, min(10, max_frames)) |
| | |
| | print(f"โ
Generated {frame_counter} keyframes in {final_dir}") |
| | |
| | |
| | generated_files = [f for f in os.listdir(final_dir) if f.endswith('.png')] |
| | print(f"๐ Frame files: {len(generated_files)} files in frames/final/") |
| | |
| | |
| | _save_frame_metadata(final_dir, subs[:frame_counter]) |
| | |
| | return True |
| |
|
| | def _extract_backup_frames(video_path: str, output_dir: str, start_idx: int, target_count: int): |
| | """Extract backup frames if not enough story frames""" |
| | try: |
| | cap = cv2.VideoCapture(video_path) |
| | total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| | fps = cap.get(cv2.CAP_PROP_FPS) |
| | duration = total_frames / fps |
| | |
| | |
| | interval = duration / (target_count - start_idx) |
| | |
| | for i in range(start_idx, target_count): |
| | timestamp = i * interval |
| | frame_num = int(timestamp * fps) |
| | |
| | cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) |
| | ret, frame = cap.read() |
| | |
| | if ret: |
| | output_path = os.path.join(output_dir, f"frame{i:03d}.png") |
| | cv2.imwrite(output_path, frame) |
| | print(f"โ
Extracted backup frame {i}") |
| | |
| | cap.release() |
| | |
| | except Exception as e: |
| | print(f"โ Backup frame extraction failed: {e}") |
| |
|
| | def _save_frame_metadata(output_dir: str, subtitles: List): |
| | """Save metadata about which frames correspond to which subtitles""" |
| | metadata = [] |
| | |
| | for i, sub in enumerate(subtitles): |
| | metadata.append({ |
| | 'frame': f'frame{i:03d}.png', |
| | 'subtitle': sub.content, |
| | 'start': str(sub.start), |
| | 'end': str(sub.end), |
| | 'index': sub.index |
| | }) |
| | |
| | metadata_path = os.path.join(output_dir, 'frame_metadata.json') |
| | with open(metadata_path, 'w') as f: |
| | json.dump(metadata, f, indent=2) |
| | |
| | print(f"๐พ Saved frame metadata to {metadata_path}") |