| | |
| |
|
| | import argparse |
| | import os |
| | import cv2 |
| | import numpy as np |
| | from ultralytics import YOLO |
| | from scenedetect import open_video, SceneManager, ContentDetector |
| | import torch |
| |
|
def parse_arguments():
    """Build the command-line parser and return the parsed namespace.

    Options:
        --input-dir / -I   (required) directory holding the input videos.
        --output-dir / -O  (required) directory that receives screenshots.
        --min-width / -w   (int, default 200) minimum bounding-box width.
        --min-height / -m  (int, default 200) minimum bounding-box height.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Detect full faces in videos and capture screenshots on scene changes.",
    )

    # The two directory options share the same shape: required, plain string.
    directory_options = (
        ("--input-dir", "-I", "Directory containing input video files."),
        ("--output-dir", "-O", "Directory to save screenshot outputs."),
    )
    for long_flag, short_flag, help_text in directory_options:
        cli.add_argument(long_flag, short_flag, required=True, help=help_text)

    # The two size thresholds share the same shape: integer, default 200.
    size_options = (
        ("--min-width", "-w", "Minimum width of face bounding box to trigger screenshot."),
        ("--min-height", "-m", "Minimum height of face bounding box to trigger screenshot."),
    )
    for long_flag, short_flag, help_text in size_options:
        cli.add_argument(long_flag, short_flag, type=int, default=200, help=help_text)

    return cli.parse_args()
| |
|
def ensure_directory(directory):
    """Create *directory* (and any missing parents) if it doesn't exist.

    Uses ``exist_ok=True`` instead of a separate existence check so that a
    directory created by another process between "check" and "create" does
    not cause a failure.

    Raises:
        FileExistsError: if the path exists but is not a directory.
        OSError: if the directory cannot be created.
    """
    os.makedirs(directory, exist_ok=True)
| |
|
def check_cuda():
    """Report whether CUDA is usable and return the matching torch device.

    Returns ``torch.device("cuda")`` when ``torch.cuda.is_available()`` is
    true (after printing the GPU name, CUDA version and GPU count), and
    ``torch.device("cpu")`` otherwise.
    """
    if not torch.cuda.is_available():
        cpu_device = torch.device("cpu")
        print("CUDA is not available. Falling back to CPU.")
        return cpu_device

    gpu_device = torch.device("cuda")
    print(f"CUDA is available! Using GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    return gpu_device
| |
|
def is_full_face(box, frame_shape, min_width, min_height, min_proportion=0.1):
    """Decide whether a bounding box lies fully inside the frame and is big enough.

    Args:
        box: four values ``(x1, y1, x2, y2)`` giving the box corners.
        frame_shape: shape of the frame; only the first two entries
            (height, width) are used.
        min_width: smallest accepted box width.
        min_height: smallest accepted box height.
        min_proportion: smallest accepted box size as a fraction of the
            corresponding frame dimension.

    Returns:
        ``False`` if the box touches or crosses any frame edge, is narrower
        or shorter than the absolute minimums, or is smaller than
        ``min_proportion`` of the frame in either dimension; ``True``
        otherwise.
    """
    left, top, right, bottom = box
    rows, cols = frame_shape[:2]

    # A box reaching the border is treated as cut off by the frame.
    touches_border = left <= 0 or top <= 0 or right >= cols or bottom >= rows
    if touches_border:
        return False

    box_w = right - left
    box_h = bottom - top

    # Rejections are expressed as "too small" tests and negated once, which
    # keeps the exact comparison semantics of separate guard checks.
    return not (
        box_w < min_width
        or box_h < min_height
        or box_w < cols * min_proportion
        or box_h < rows * min_proportion
    )
| |
|
def _has_full_face(results, frame_shape, min_width, min_height):
    """Return True if any class-0 detection in *results* passes is_full_face."""
    for result in results:
        boxes = result.boxes.xyxy.cpu().numpy()
        classes = result.boxes.cls.cpu().numpy()
        for box, cls in zip(boxes, classes):
            if cls == 0 and is_full_face(box, frame_shape, min_width, min_height):
                return True
    return False


def process_video(video_path, output_dir, min_width, min_height, model, device):
    """Save one screenshot per scene, taken at the first frame with a full face.

    Scene boundaries are computed up front with PySceneDetect's
    ContentDetector (threshold 30.0). The video is then read frame by frame
    with OpenCV; within each scene the detector runs on every frame until a
    class-0 box accepted by ``is_full_face`` is found, that frame is written
    as ``<video_name>_face_<NNNN>.png`` in *output_dir*, and detection is
    skipped for the rest of the scene.

    Args:
        video_path: path of the video file to process.
        output_dir: directory that receives the PNG screenshots.
        min_width: minimum box width forwarded to ``is_full_face``.
        min_height: minimum box height forwarded to ``is_full_face``.
        model: detector whose ``predict`` is called with
            ``classes=[0], conf=0.75``.
        device: device passed through to ``model.predict``.

    Returns:
        None. Errors are reported on stdout and the video is abandoned
        (setup errors) or the frame is skipped (per-frame detection errors).
    """
    try:
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=30.0))
    except Exception as e:
        print(f"Error initializing video for scene detection in {video_path}: {e}")
        return

    output_count = 0
    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released on every exit path,
    # including unexpected exceptions raised while reading frames.
    try:
        if not cap.isOpened():
            print(f"Error opening video file {video_path}")
            return

        try:
            scene_manager.detect_scenes(video=video)
            # The first element of each scene entry is its start position;
            # get_frames() converts it to a frame number.
            scene_starts = [
                scene[0].get_frames() for scene in scene_manager.get_scene_list()
            ]
        except Exception as e:
            print(f"Error detecting scenes in {video_path}: {e}")
            return

        scene_index = 0
        face_detected_in_scene = False
        frame_idx = 0
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Entering the next scene re-arms the search for a face.
            if scene_index < len(scene_starts) and frame_idx >= scene_starts[scene_index]:
                face_detected_in_scene = False
                scene_index += 1
                print(f"New scene detected at frame {frame_idx}")

            if not face_detected_in_scene:
                try:
                    results = model.predict(frame, classes=[0], conf=0.75, device=device)
                    if _has_full_face(results, frame.shape, min_width, min_height):
                        output_path = os.path.join(
                            output_dir, f"{video_name}_face_{output_count:04d}.png"
                        )
                        # cv2.imwrite signals failure through its return value
                        # rather than an exception, so only count real writes.
                        if cv2.imwrite(output_path, frame):
                            print(f"Saved screenshot: {output_path}")
                            output_count += 1
                        else:
                            print(f"Error writing screenshot: {output_path}")
                        face_detected_in_scene = True
                except Exception as e:
                    # Best effort: a failing frame is reported and skipped.
                    print(f"Error during face detection in {video_path}: {e}")

            frame_idx += 1
    finally:
        cap.release()

    print(f"Processed {video_path}: {output_count} screenshots saved.")
| |
|
def main():
    """Process every video file in the input directory.

    Parses the command line, validates the input directory, creates the
    output directory, selects the torch device, loads the YOLO model and
    calls ``process_video`` for each file whose name ends in .mp4, .avi,
    .mov or .mkv (case-insensitive). Errors are printed and end the run
    early by returning.
    """
    args = parse_arguments()

    if not os.path.isdir(args.input_dir):
        print(f"Error: Input directory '{args.input_dir}' does not exist.")
        return

    ensure_directory(args.output_dir)

    device = check_cuda()

    try:
        # NOTE(review): Ultralytics appears to publish its YOLO11 weights as
        # "yolo11l.pt" (no "v"), so this name presumably refers to a local
        # weights file - confirm. Also confirm that class 0 of these weights
        # is "face"; in the stock COCO-trained weights class 0 is "person".
        model = YOLO("yolov11l.pt")
        model.to(device)
        print(f"YOLO model loaded on device: {device}")
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        return

    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    # os.listdir returns entries in arbitrary order; sorting makes the
    # processing order (and therefore the log) reproducible.
    for filename in sorted(os.listdir(args.input_dir)):
        if not filename.lower().endswith(video_extensions):
            continue
        video_path = os.path.join(args.input_dir, filename)
        # Skip directories (or other non-files) that merely carry a video suffix.
        if not os.path.isfile(video_path):
            continue
        print(f"Processing video: {video_path}")
        process_video(video_path, args.output_dir, args.min_width, args.min_height, model, device)
| |
|
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
| |
|