import os
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from ultralytics import YOLO
def detect_and_blur(input_source, model=None):
    """Detect and blur sensitive elements in an image or video frame.

    Args:
        input_source: Image path, or a BGR frame as a numpy array.
        model: YOLO model instance.

    Returns:
        result_rgb: Processed RGB image with blurred regions.
        detections: Dict with counts of detected objects.
        boxes: Dict with bounding boxes of detected objects.
    """
    if isinstance(input_source, str):  # Image path
        frame = cv2.imread(input_source)
        if frame is None:
            raise ValueError(f"Could not read image from {input_source}")
    else:  # Video frame or numpy array, assumed BGR as produced by OpenCV
        frame = input_source.copy()

    if frame.ndim != 3 or frame.shape[2] != 3:
        raise ValueError("Input must be a color image with 3 channels")

    result_img = frame.copy()
    detections = {'faces': 0, 'plates': 0}
    boxes = {'faces': [], 'plates': []}

    if model:
        try:
            # Run YOLOv8 inference (Ultralytics treats numpy array inputs as BGR)
            results = model.predict(frame, conf=0.5)
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    cls_id = int(box.cls[0])

                    # Clamp coordinates to the image bounds
                    x1, y1 = max(0, x1), max(0, y1)
                    x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

                    # Skip degenerate boxes
                    if x2 <= x1 or y2 <= y1:
                        continue

                    # Apply Gaussian blur to the detected region. The kernel is
                    # sized per detection type, capped at the region size, and
                    # forced odd as OpenCV requires.
                    region = result_img[y1:y2, x1:x2]
                    kernel_size = 55 if cls_id == 1 else 71  # lighter blur for faces, heavier for plates/text
                    kernel_size = max(25, min(kernel_size, (x2 - x1) // 2 * 2 + 1, (y2 - y1) // 2 * 2 + 1))
                    if kernel_size % 2 == 0:
                        kernel_size += 1

                    if kernel_size >= 3:
                        blurred = cv2.GaussianBlur(region, (kernel_size, kernel_size), 15)
                        result_img[y1:y2, x1:x2] = blurred

                    # Update detection counts and boxes
                    if cls_id == 0:  # Class 0 is plate/text
                        detections['plates'] += 1
                        boxes['plates'].append((x1, y1, x2, y2))
                    else:  # Class 1 is face
                        detections['faces'] += 1
                        boxes['faces'].append((x1, y1, x2, y2))
        except Exception as e:
            print(f"Detection error: {e}")

    # Return RGB for a consistent interface
    result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
    return result_rgb, detections, boxes
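
# A minimal usage sketch. The weights path "models/best.pt" and the image path
# are illustrative assumptions; the model must use the two-class scheme the
# cls_id mapping above expects (0 = plate/text, 1 = face):
#
#   model = YOLO("models/best.pt")
#   result_rgb, counts, regions = detect_and_blur("photos/street.jpg", model)
#   print(counts)  # e.g. {'faces': 2, 'plates': 1}
#   Image.fromarray(result_rgb).save("street_blurred.jpg")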
def process_video(input_path, output_path=None, model=None, frame_skip=3, output_fps=30):
    """Process a video with frame skipping to reduce inference cost.

    Args:
        input_path: Path to the input video.
        output_path: Path to save the processed video (auto-generated if None).
        model: YOLO model instance.
        frame_skip: Process 1 in every N frames.
        output_fps: Fallback frame rate if the source FPS cannot be read.

    Returns:
        output_path: Path to the processed video file, or None on failure.
    """
    try:
        # Open the video file
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file {input_path}")

        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Set output parameters
        fps = original_fps if original_fps > 0 else output_fps
        if output_path is None:
            # Create the results directory if it doesn't exist
            results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "results")
            os.makedirs(results_dir, exist_ok=True)
            output_path = os.path.join(results_dir, f"processed_{os.path.basename(input_path)}")

        # Create the video writer. Only 1 in frame_skip frames is written, so the
        # output frame rate is scaled down to preserve real-time playback speed.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps / frame_skip, (frame_width, frame_height))

        # Display processing information
        print(f"Processing video: {os.path.basename(input_path)}")
        print(f"Original: {frame_width}x{frame_height} @ {original_fps:.1f}fps")
        print(f"Processing: 1 in every {frame_skip} frames")
        print(f"Output: {fps / frame_skip:.1f}fps | Frames to process: ~{total_frames // frame_skip}")

        # Process video frames
        frame_count = 0
        processed_frames = 0
        start_time = time.time()
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Skip frames according to frame_skip
            if frame_count % frame_skip != 0:
                frame_count += 1
                continue

            try:
                # Process the frame and write it back in BGR for OpenCV
                result_rgb, _, _ = detect_and_blur(frame, model)
                result_bgr = cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR)
                out.write(result_bgr)
                processed_frames += 1

                # Print progress roughly every 5 seconds
                if time.time() - start_time >= 5:
                    elapsed = time.time() - start_time
                    current_fps = processed_frames / elapsed
                    print(f"Progress: {frame_count}/{total_frames} | "
                          f"Processed: {processed_frames} | "
                          f"Current FPS: {current_fps:.1f}")
                    start_time = time.time()
                    processed_frames = 0
            except Exception as e:
                print(f"Error processing frame {frame_count}: {e}")

            frame_count += 1

        # Clean up
        cap.release()
        out.release()
        print(f"\nVideo processing complete! Saved to {output_path}")
        return output_path
    except Exception as e:
        print(f"Video processing failed: {e}")
        return None
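
# A minimal usage sketch; "clips/dashcam.mp4" and the weights path are
# illustrative assumptions, not paths from this project:
#
#   model = YOLO("models/best.pt")
#   out_path = process_video("clips/dashcam.mp4", model=model, frame_skip=3)
#   if out_path:
#       print(f"Anonymized video written to {out_path}")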
def process_image(image_path, output_path=None, model=None, visualize=False):
    """Process a single image with optional visualization.

    Args:
        image_path: Path to the input image, or a BGR numpy array.
        output_path: Path to save the processed image (auto-generated if None).
        model: YOLO model instance.
        visualize: Whether to plot the original, detections, and result side by side.

    Returns:
        Path to the processed image file for path inputs, or the processed RGB
        numpy array for array inputs; None on failure.
    """
    try:
        # Process the image
        result_rgb, detections, boxes = detect_and_blur(image_path, model)

        # Handle numpy array input
        if not isinstance(image_path, str):
            if output_path is None:
                return result_rgb
            image_filename = "processed_image.jpg"
        else:
            image_filename = os.path.basename(image_path)

        # Create visualization if requested
        if visualize:
            # Load the original image as RGB for matplotlib
            if isinstance(image_path, str):
                original = cv2.cvtColor(cv2.imread(image_path), cv2.COLOR_BGR2RGB)
            else:
                original = cv2.cvtColor(image_path, cv2.COLOR_BGR2RGB)

            # Draw detection boxes on a copy of the original
            detection_img = original.copy()
            for box in boxes['faces']:
                x1, y1, x2, y2 = box
                cv2.rectangle(detection_img, (x1, y1), (x2, y2), (255, 0, 0), 2)
                cv2.putText(detection_img, "Face", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
            for box in boxes['plates']:
                x1, y1, x2, y2 = box
                cv2.rectangle(detection_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(detection_img, "Plate/Text", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

            # Side-by-side comparison (intended for local debugging)
            fig, axes = plt.subplots(1, 3, figsize=(20, 7))
            titles = ["Original", "Detections", "Blurred Result"]
            images = [original, detection_img, result_rgb]
            for ax, title, img in zip(axes, titles, images):
                ax.imshow(img)
                ax.set_title(title)
                ax.axis("off")
            plt.tight_layout()
            plt.show()

        # Save the result (auto-generate an output path if none was provided)
        if output_path is None:
            results_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "results")
            os.makedirs(results_dir, exist_ok=True)
            output_path = os.path.join(results_dir, f"processed_{image_filename}")
        cv2.imwrite(output_path, cv2.cvtColor(result_rgb, cv2.COLOR_RGB2BGR))
        print(f"Saved result to {output_path}")

        print(f"Detections: {detections['faces']} faces, {detections['plates']} plates/text regions")
        return output_path if isinstance(image_path, str) else result_rgb
    except Exception as e:
        print(f"Image processing error: {e}")
        return None
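
# A minimal usage sketch with the side-by-side debug plot enabled (paths are
# illustrative assumptions):
#
#   model = YOLO("models/best.pt")
#   saved_path = process_image("photos/street.jpg", model=model, visualize=True)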
def process_pil_image(pil_image, model=None):
    """Process a PIL Image for the Gradio interface.

    Args:
        pil_image: PIL Image.
        model: YOLO model instance.

    Returns:
        result_pil: Processed PIL Image.
        detections: Dict with counts of detected objects.
    """
    try:
        # PIL images are RGB (RGBA and grayscale are normalized first); convert
        # to BGR since detect_and_blur expects OpenCV-style input
        img_array = cv2.cvtColor(np.array(pil_image.convert("RGB")), cv2.COLOR_RGB2BGR)

        # Process the image
        result_rgb, detections, _ = detect_and_blur(img_array, model)

        # Convert back to PIL
        result_pil = Image.fromarray(result_rgb)
        return result_pil, detections
    except Exception as e:
        print(f"PIL image processing error: {e}")
        return pil_image, {'faces': 0, 'plates': 0}
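
# A minimal Gradio wiring sketch for this function. This is an assumption about
# how the Space's app file might use it (the actual app may use gr.Blocks or
# also display the detection counts); gr.Interface and gr.Image are standard
# Gradio APIs, and "models/best.pt" is an illustrative weights path:
#
#   import gradio as gr
#
#   model = YOLO("models/best.pt")
#   demo = gr.Interface(
#       fn=lambda img: process_pil_image(img, model)[0],
#       inputs=gr.Image(type="pil"),
#       outputs=gr.Image(type="pil"),
#   )
#   demo.launch()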