import os
import tempfile
import time
import functools
from collections import defaultdict, OrderedDict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Tuple

import cv2
import numpy as np
import onnxruntime as ort

from utils.model_handler import ModelHandler
from utils.helper import (
    BASE_DIR,
    preprocess_image_tensor,
    postprocess_outputs,
    recortar_imagen,
    recortar_imagen_again,
    calculate_black_pixels_percentage,
    adaptive_edge_detection,
)
class Profiler:
    """Singleton that tracks the execution time of decorated functions."""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(Profiler, cls).__new__(cls)
            cls._instance.function_times = defaultdict(list)
            cls._instance.call_counts = defaultdict(int)
        return cls._instance

    def track_time(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            result = func(*args, **kwargs)
            elapsed = time.time() - start_time
            self.function_times[func.__name__].append(elapsed)
            self.call_counts[func.__name__] += 1
            return result
        return wrapper

    def print_stats(self):
        print("\n===== FUNCTION TIMING STATS =====")
        print(f"{'FUNCTION':<30} {'CALLS':<10} {'TOTAL (s)':<15} {'AVG (s)':<15} {'% TIME':<10}")
        total_time = sum(sum(times) for times in self.function_times.values())
        # Sort by total time (descending)
        sorted_funcs = sorted(
            self.function_times.items(),
            key=lambda x: sum(x[1]),
            reverse=True
        )
        for func_name, times in sorted_funcs:
            total = sum(times)
            avg = total / len(times) if times else 0
            calls = self.call_counts[func_name]
            percent = (total / total_time * 100) if total_time > 0 else 0
            print(f"{func_name:<30} {calls:<10} {total:<15.4f} {avg:<15.4f} {percent:<10.2f}%")
        print(f"\nTotal processing time: {total_time:.4f} seconds")
        print("================================")

    def get_stats_dict(self):
        """Return the statistics as a list of dicts for display in Streamlit."""
        stats = []
        total_time = sum(sum(times) for times in self.function_times.values())
        for func_name, times in self.function_times.items():
            total = sum(times)
            avg = total / len(times) if times else 0
            calls = self.call_counts[func_name]
            percent = (total / total_time * 100) if total_time > 0 else 0
            stats.append({
                'función': func_name,
                'llamadas': calls,
                'tiempo_total': total,
                'tiempo_promedio': avg,
                'porcentaje': percent
            })
        # Sort by share of total time
        stats.sort(key=lambda x: x['porcentaje'], reverse=True)
        return stats, total_time

    def reset(self):
        """Clear all collected statistics."""
        self.function_times.clear()
        self.call_counts.clear()


profiler = Profiler()
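# Example usage (a minimal sketch): decorate any function whose runtime you
# want tracked with the shared profiler, then dump the aggregated stats.
#
#   @profiler.track_time
#   def heavy_step(frame):
#       ...
#
#   heavy_step(frame)
#   profiler.print_stats()   # or: stats, total = profiler.get_stats_dict()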
class VideoProcessor:
    def __init__(self):
        self.cap = None
        self.total_frames = 0
        self.fps = 0
        self.target_fps = 10
        self.driver_crop_type = "Verstappen 2025"  # Default driver crop type
        self.load_crop_variables(self.driver_crop_type)
        #self.yolo_model = YOLO("models/best.pt")
        self.model = ort.InferenceSession(str(Path(BASE_DIR) / "models" / "best-224.onnx"))
        self.input_shape = (224, 224)  # Match imgsz=224 from the original code
        self.conf_thres = 0.5  # Confidence threshold
        self.iou_thres = 0.5  # IoU threshold for NMS
        self.frame_count = 0
        self.mode = "Default"  # Illumination mode; can be changed later (e.g. "Low ilumination")
        self.video_name = "no_name"
        self.frame_cache = OrderedDict()
        self.frame_cache_size = 50  # Reduced size to conserve memory
        self.last_position = -1
        self.frames_list_end = {}
        self.frames_list_start = {}
    def clear_cache(self):
        """Clear the frame cache to free memory."""
        self.frame_cache.clear()
    def load_crop_variables(self, driver_crop_type):
        """Load the crop parameters for the given driver/onboard preset."""
        driver_config = {
            "Albon 2024": {"starty": 0.55, "axes": 0.39, "y_start": 0.53, "x_center": 0.59},
            "Albon 2025": {"starty": 0.67, "axes": 0.42, "y_start": 0.53, "x_center": 0.59},
            "Alonso 2024": {"starty": 0.5, "axes": 0.29, "y_start": 0.53, "x_center": 0.56},
            "Alonso 2025": {"starty": 0.8, "axes": 0.5, "y_start": 0.53, "x_center": 0.572},
            "Bortoleto 2025": {"starty": 0.6, "axes": 0.4, "y_start": 0.53, "x_center": 0.572},
            "bottas": {"starty": 0.67, "axes": 0.43, "y_start": 0.53, "x_center": 0.574},
            "colapinto": {"starty": 0.52, "axes": 0.33, "y_start": 0.53, "x_center": 0.594},
            "Colapinto 2025": {"starty": 0.54, "axes": 0.4, "y_start": 0.53, "x_center": 0.58},
            "Gasly 2025": {"starty": 0.57, "axes": 0.35, "y_start": 0.53, "x_center": 0.58},
            "Hulk 2025": {"starty": 0.73, "axes": 0.3, "y_start": 0.53, "x_center": 0.548},
            "Lawson 2025": {"starty": 0.68, "axes": 0.42, "y_start": 0.53, "x_center": 0.555},
            "Ocon 2025": {"starty": 0.65, "axes": 0.42, "y_start": 0.53, "x_center": 0.585},
            "Sainz 2025": {"starty": 0.77, "axes": 0.42, "y_start": 0.53, "x_center": 0.57},
            "Stroll 2025": {"starty": 0.6, "axes": 0.45, "y_start": 0.53, "x_center": 0.565},
            "Bearman 2025": {"starty": 0.72, "axes": 0.45, "y_start": 0.53, "x_center": 0.58},
            "Hadjar 2025": {"starty": 0.7, "axes": 0.42, "y_start": 0.53, "x_center": 0.57},
            "hamilton-arabia": {"starty": 0.908, "axes": 0.4, "y_start": 0.53, "x_center": 0.554},
            "Hamilton 2025": {"starty": 0.59, "axes": 0.4, "y_start": 0.53, "x_center": 0.573},
            "hamilton-texas": {"starty": 0.7, "axes": 0.38, "y_start": 0.53, "x_center": 0.6},
            "leclerc-china": {"starty": 0.6, "axes": 0.36, "y_start": 0.53, "x_center": 0.58},
            "Leclerc 2025": {"starty": 0.65, "axes": 0.45, "y_start": 0.53, "x_center": 0.575},
            "magnussen": {"starty": 0.6, "axes": 0.34, "y_start": 0.53, "x_center": 0.58},
            "norris-arabia": {"starty": 0.7, "axes": 0.3, "y_start": 0.53, "x_center": 0.58},
            "norris-texas": {"starty": 0.7, "axes": 0.3, "y_start": 0.53, "x_center": 0.58},
            "Norris 2025": {"starty": 0.79, "axes": 0.6, "y_start": 0.53, "x_center": 0.571, "helmet_height_ratio": 0.5},
            "ocon": {"starty": 0.75, "axes": 0.35, "y_start": 0.53, "x_center": 0.555},
            "piastri-azerbaiya": {"starty": 0.65, "axes": 0.34, "y_start": 0.53, "x_center": 0.549},
            "piastri-singapure": {"starty": 0.65, "axes": 0.34, "y_start": 0.53, "x_center": 0.549},
            "Piastri 2025": {"starty": 0.93, "axes": 0.59, "y_start": 0.53, "x_center": 0.573, "helmet_height_ratio": 0.3},
            "russel-singapure": {"starty": 0.63, "axes": 0.44, "y_start": 0.53, "x_center": 0.56},
            "Russell 2025": {"starty": 0.95, "axes": 0.65, "y_start": 0.53, "x_center": 0.574, "helmet_height_ratio": 0.35},
            "sainz": {"starty": 0.57, "axes": 0.32, "y_start": 0.53, "x_center": 0.59},
            "Tsunoda 2025": {"starty": 0.92, "axes": 0.55, "y_start": 0.53, "x_center": 0.58, "helmet_height_ratio": 0.25},
            "verstappen_china": {"starty": 0.7, "axes": 0.42, "y_start": 0.53, "x_center": 0.57},
            "Verstappen 2025": {"starty": 0.7, "axes": 0.42, "y_start": 0.53, "x_center": 0.57, "helmet_height_ratio": 0.4},
            "vertappen": {"starty": 0.7, "axes": 0.42, "y_start": 0.53, "x_center": 0.57},
            "verstappen-arabia": {"starty": 0.95, "axes": 0.4, "y_start": 0.53, "x_center": 0.565},
            "yuki": {"starty": 0.64, "axes": 0.37, "y_start": 0.53, "x_center": 0.585},
            "Antonelli 2025": {"starty": 0.97, "axes": 0.65, "y_start": 0.53, "x_center": 0.595, "helmet_height_ratio": 0.5},
        }
        #print(f"Driver crop type: {self.driver_crop_type}")
        self.driver_crop_type = driver_crop_type
        config = driver_config[self.driver_crop_type]
        self.starty = config["starty"]
        self.axes = config["axes"]
        self.y_start = config["y_start"]
        self.x_center = config["x_center"]
        # Fall back to 0.5 for presets that do not define a helmet height ratio
        self.helmet_height_ratio = config.get("helmet_height_ratio", 0.5)
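    # Example (a sketch): switching presets at runtime re-derives every crop
    # parameter; an unknown name raises KeyError, so callers should pass one
    # of the keys defined in driver_config above.
    #
    #   processor = VideoProcessor()              # defaults to "Verstappen 2025"
    #   processor.load_crop_variables("Norris 2025")
    #   processor.starty, processor.helmet_height_ratio   # -> 0.79, 0.5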
    def clean_up(self):
        """Clear caches and reset per-video state."""
        self.clear_cache()
        self.frames_list_start = {}
        self.frames_list_end = {}
        self.video_path = None
        self.frame_count = 0
        print("VideoProcessor cleaned up.")
    def load_video(self, video_file) -> bool:
        """Load video file and get basic information."""
        # delete=False so the file persists after this method returns;
        # get_frame() may re-open it from self.video_path later.
        tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        tfile.write(video_file.read())
        tfile.close()
        # Keep the path for possible restarts
        self.video_path = tfile.name
        # Keep just the file name without the extension (optional)
        self.video_name = os.path.splitext(os.path.basename(self.video_path))[0]
        self.cap = cv2.VideoCapture(self.video_path)
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        #print(f"FPS: {self.fps}")
        #print(f"Total frames: {self.total_frames}")
        #self.frames_list_start = [None] * self.total_frames  # preallocate
        #self.frames_list_end = [None] * self.total_frames  # preallocate
        self.start_frame_min = 0
        self.start_frame_max = min(100, int(self.total_frames * 0.1))  # 10% of the total
        if self.total_frames > 500:
            self.end_frame_min = int(self.total_frames - 100)
        else:
            self.end_frame_min = int(self.total_frames * 0.9)  # 90% of the total
        self.end_frame_max = self.total_frames - 1
        i = 0
        #print(len(self.frames_list_start), len(self.frames_list_end))
        # Pre-cache small grayscale copies of the first and last frame ranges
        if self.frames_list_end == {}:
            current_frame_num = self.start_frame_min
            cap_thread = cv2.VideoCapture(self.video_path)
            cap_thread.set(cv2.CAP_PROP_POS_FRAMES, float(self.start_frame_min))
            while current_frame_num <= self.start_frame_max:
                ret, frame = cap_thread.read()
                if not ret:
                    # print(f"Warning: could not read frame {current_frame_num}.")
                    break
                processed_frame = cv2.cvtColor(cv2.resize(frame, (256, 144), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY)
                self.frames_list_start[current_frame_num] = processed_frame
                current_frame_num += 1
            cap_thread.release()
            current_frame_num = self.end_frame_min
            cap_thread = cv2.VideoCapture(self.video_path)
            cap_thread.set(cv2.CAP_PROP_POS_FRAMES, float(self.end_frame_min))
            while current_frame_num <= self.end_frame_max:
                ret, frame = cap_thread.read()
                if not ret:
                    # print(f"Warning: could not read frame {current_frame_num}.")
                    break
                processed_frame = cv2.cvtColor(cv2.resize(frame, (256, 144), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY)
                self.frames_list_end[current_frame_num] = processed_frame
                current_frame_num += 1
            cap_thread.release()
        '''while True:
            ret, frame = self.cap.read()
            if i >= start_frame_min and i <= start_frame_max:
                self.frames_list_start[i] = cv2.cvtColor(cv2.resize(frame, (426,240), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY)
            if i >= end_frame_min and i <= end_frame_max:
                self.frames_list_end[i] = cv2.cvtColor(cv2.resize(frame, (426,240), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY)
            if not ret or i >= self.total_frames:
                break
            i += 1'''
        self.cap = cv2.VideoCapture(self.video_path)
        return True
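    # Example (a sketch, assuming any file-like object with .read(), such as
    # a Streamlit UploadedFile):
    #
    #   with open("onboard.mp4", "rb") as f:
    #       if processor.load_video(f):
    #           print(processor.total_frames, processor.fps)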
    def load_video2(self, video_file, output_resolution=(854, 480)) -> bool:
        """
        Load video file, resize to 480p, and get basic information.
        Args:
            video_file: Input video file object
            output_resolution: Tuple of (width, height) for resizing (default: 854x480 for 480p)
        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Create temporary file to store the input video
            tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
            tfile.write(video_file.read())
            tfile.close()  # Close the file to allow VideoCapture to access it
            # Store the temporary file path
            self.video_path = tfile.name
            # Load the video
            self.cap = cv2.VideoCapture(tfile.name)
            if not self.cap.isOpened():
                print("Error: Could not open video file.")
                return False
            # Get original video properties
            self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
            self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
            #print(f"FPS: {self.fps}")
            #print(f"Total frames: {self.total_frames}")
            # Prepare for resizing and saving to a new temporary file
            output_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for MP4
            out = cv2.VideoWriter(output_path, fourcc, self.fps, output_resolution)
            # Process each frame
            while self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret:
                    break
                # Resize frame to 480p
                resized_frame = cv2.resize(frame, output_resolution, interpolation=cv2.INTER_AREA)
                out.write(resized_frame)
            # Release resources
            self.cap.release()
            out.release()
            # Update video path to the resized video
            self.video_path = output_path
            self.cap = cv2.VideoCapture(self.video_path)
            if not self.cap.isOpened():
                print("Error: Could not open resized video.")
                return False
            print(f"Video resized to {output_resolution} and saved to {output_path}")
            return True
        except Exception as e:
            print(f"Error processing video: {str(e)}")
            return False
    def load_video1(self, video_file) -> bool:
        """Load video file and get basic information (legacy variant)."""
        # tempfile.TemporaryFile has no usable filesystem name on POSIX, so
        # cv2.VideoCapture could not open it; a named temporary file is
        # required here.
        tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        tfile.write(video_file.read())
        tfile.close()
        self.video_path = tfile.name  # Store for reference
        self.cap = cv2.VideoCapture(self.video_path)
        if not self.cap.isOpened():
            return False
        self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        return True
    def get_frame1(self, frame_number: int) -> np.ndarray:
        """
        Get a specific frame from the video, with performance optimizations.
        Args:
            frame_number: Number of the frame to fetch
        Returns:
            Frame as a NumPy array (RGB format), or None if unavailable
        """
        if self.cap is None:
            return None
        # 1. Initialize tracking attributes if they do not exist yet
        if not hasattr(self, 'frame_cache'):
            # Use a bounded dict as a cache for frequently accessed frames
            self.frame_cache = {}
            self.frame_cache_size = 100  # Tune according to available memory
            self.last_position = -1  # For position tracking
        # 2. Check the cache first (huge win for repeatedly accessed frames)
        if frame_number in self.frame_cache:
            return self.frame_cache[frame_number]
        # 3. Optimization for sequential access (avoids unnecessary seeks)
        if hasattr(self, 'last_position') and frame_number == self.last_position + 1:
            # The requested frame is the one right after the last read
            ret, frame = self.cap.read()
            if ret:
                self.last_position = frame_number
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                #rgb_frame = frame
                # Add to the cache
                self.frame_cache[frame_number] = rgb_frame
                # Keep the cache within its size limit
                if len(self.frame_cache) > self.frame_cache_size:
                    # Evict the oldest frame (lowest number)
                    oldest = min(self.frame_cache.keys())
                    del self.frame_cache[oldest]
                return rgb_frame
            # If the read fails, fall through to direct access
        # 4. Direct access with a retry mechanism
        for attempt in range(3):  # Try up to 3 times on failure
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = self.cap.read()
            if ret:
                # Update last_position for future sequential optimizations
                self.last_position = frame_number
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Add to the cache
                self.frame_cache[frame_number] = rgb_frame
                # Keep the cache within its size limit
                if len(self.frame_cache) > self.frame_cache_size:
                    # Evict the oldest frame (lowest number)
                    oldest = min(self.frame_cache.keys())
                    del self.frame_cache[oldest]
                return rgb_frame
            if attempt < 2:  # Do not re-open on the last attempt
                # Re-create the capture object after an error;
                # this helps with problematic video formats
                if hasattr(self, 'video_path') and self.video_path:
                    self.cap.release()
                    self.cap = cv2.VideoCapture(self.video_path)
        # If we get here, every attempt failed
        return None
    def get_frame(self, frame_number: int) -> np.ndarray:
        if self.cap is None:
            return None
        '''if frame_number in self.frame_cache:
            return self.frame_cache[frame_number]'''
        # Fast path: sequential read, no seek needed
        if hasattr(self, 'last_position') and frame_number == self.last_position + 1:
            ret, frame = self.cap.read()
            if ret:
                self.last_position = frame_number
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                self.frame_cache[frame_number] = rgb_frame
                if len(self.frame_cache) > self.frame_cache_size:
                    self.frame_cache.popitem(last=False)  # Remove oldest item
                # Same output size as the seek path below
                return cv2.resize(rgb_frame, (854, 480), interpolation=cv2.INTER_LINEAR)
        # Slow path: seek directly, with up to 3 attempts
        for attempt in range(3):
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = self.cap.read()
            if ret:
                self.last_position = frame_number
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                self.frame_cache[frame_number] = rgb_frame
                if len(self.frame_cache) > self.frame_cache_size:
                    self.frame_cache.popitem(last=False)
                return cv2.resize(rgb_frame, (854, 480), interpolation=cv2.INTER_LINEAR)
            if attempt < 2 and hasattr(self, 'video_path') and self.video_path:
                self.cap.release()
                self.cap = cv2.VideoCapture(self.video_path)
                print(f"Error reading frame {frame_number}, retrying...")
        return None
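    # Example (a sketch): sequential requests hit the fast path above (a
    # plain read with no seek); any other frame number falls back to
    # CAP_PROP_POS_FRAMES seeking with up to three attempts.
    #
    #   for n in range(100, 110):
    #       frame = processor.get_frame(n)   # only the first call seeks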
    def get_frame_example(self, frame_number: int) -> np.ndarray:
        """
        Get a specific frame from the video, with performance optimizations.
        Args:
            frame_number: Number of the frame to fetch
        Returns:
            Frame as a NumPy array (RGB format), or None if unavailable
        """
        if self.cap is None:
            return None
        print(f"Frame number: {frame_number}")
        # 1. Initialize tracking attributes if they do not exist yet
        if not hasattr(self, 'frame_cache'):
            # Use a bounded dict as a cache for frequently accessed frames
            self.frame_cache = {}
            self.frame_cache_size = 30  # Tune according to available memory
            self.last_position = -1  # For position tracking
        # 2. Check the cache first (huge win for repeatedly accessed frames)
        if frame_number in self.frame_cache:
            return self.frame_cache[frame_number]
        # 3. Direct access with a retry mechanism
        for attempt in range(3):  # Try up to 3 times on failure
            try:
                self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                ret, frame = self.cap.read()
                if ret:
                    # Update last_position for future sequential optimizations
                    self.last_position = frame_number
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    # Add to the cache
                    self.frame_cache[frame_number] = rgb_frame
                    # Keep the cache within its size limit
                    if len(self.frame_cache) > self.frame_cache_size:
                        # Evict the oldest frame (lowest number)
                        oldest = min(self.frame_cache.keys())
                        del self.frame_cache[oldest]
                    return rgb_frame
            except Exception:
                pass
            if attempt < 2:  # Do not re-open on the last attempt
                # Re-create the capture object after an error;
                # this helps with problematic video formats
                if hasattr(self, 'video_path') and self.video_path:
                    self.cap.release()
                    self.cap = cv2.VideoCapture(self.video_path)
        # If we get here, every attempt failed
        return None
    def mask_helmet_yolo(self, color_image: np.ndarray, helmet_height_ratio: float = 0.3, prev_mask: np.ndarray = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        Use YOLOv8 to segment the helmet and paint it a flat gray.
        Reuses a previous mask when one is provided.
        Args:
            color_image: Color image (BGR).
            helmet_height_ratio: Fraction of the image (bottom part) treated as the helmet region.
            prev_mask: Previous mask to reuse (optional).
        Returns:
            Tuple: (image with the helmet region painted over, generated or reused mask).
        """
        # Work on a copy of the image
        result_1 = color_image.copy()
        height, width = color_image.shape[:2]
        # If a previous mask exists, reuse it
        if prev_mask is not None:
            mask_final = prev_mask
        else:
            # Convert the image to RGB (YOLOv8 expects RGB input)
            image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
            # Run the YOLOv8 prediction
            results = self.yolo_model(image_rgb, conf=0.2, iou=0.5, imgsz=224)  # Tune conf/iou as needed
            # Start from an empty mask
            mask_final = np.zeros((height, width), dtype=np.uint8)
            # Process the segmentation results
            if results[0].masks is not None:
                for result in results:
                    masks = result.masks.data.cpu().numpy()  # Segmentation masks
                    boxes = result.boxes.xyxy.cpu().numpy()  # Bounding boxes
                    classes = result.boxes.cls.cpu().numpy()  # Predicted classes
                    # Filter for the helmet class (assumed to be class 0 or 'helmet').
                    # A COCO-pretrained model has no 'helmet' class; use 'person' (class 0) plus an ROI instead.
                    for i, cls in enumerate(classes):
                        # Adjust to your model's class index. Example: class 0 for 'helmet' in a custom model
                        if int(cls) == 0:
                            '''mask = masks[i]
                            # Resize the mask to the image size
                            mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
                            mask = (mask > 0).astype(np.uint8) * 255  # Binarize (0 or 255)
                            # Optional: restrict to the bottom ROI to focus on the helmet
                            roi_height = int(height * helmet_height_ratio)
                            roi_mask = np.zeros((height, width), dtype=np.uint8)
                            roi_mask[height - roi_height:, :] = 255  # Bottom part
                            mask = cv2.bitwise_and(mask, roi_mask)
                            # Combine masks when there are multiple detections
                            mask_final = cv2.bitwise_or(mask_final, mask)'''
                            mask = masks[i]
                            mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
                            mask = (mask > 0).astype(np.uint8) * 255
                            mask_final = cv2.bitwise_or(mask_final, mask)
                # Refine the mask with morphological operations
                kernel = np.ones((5, 5), np.uint8)
                mask_final = cv2.erode(mask_final, kernel, iterations=1)  # Remove noise
                mask_final = cv2.dilate(mask_final, kernel, iterations=3)  # Expand to cover the helmet
            else:
                # If no helmet is detected, return the image unchanged and an empty mask
                print("No helmet detected in this frame.")
                return result_1, mask_final
        # Build a solid-color image the same size as the original
        green_color = np.zeros_like(color_image)
        green_color[:, :] = [125, 125, 125]  # Flat gray fill in BGR
        # Apply the mask so only the helmet region is painted
        masked_green = cv2.bitwise_and(green_color, green_color, mask=mask_final)
        # Inverted mask keeps the rest of the image
        mask_inv = cv2.bitwise_not(mask_final)
        # Combine the painted region with the rest of the original image
        result_original = cv2.bitwise_and(result_1, result_1, mask=mask_inv)
        result = cv2.add(masked_green, result_original)
        return result, mask_final
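    # Note (a sketch): this legacy path depends on the Ultralytics model that
    # is commented out in __init__ (self.yolo_model = YOLO("models/best.pt"));
    # re-enable that line before calling it.
    #
    #   masked, mask = processor.mask_helmet_yolo(bgr_frame)
    #   masked2, _ = processor.mask_helmet_yolo(next_frame, prev_mask=mask)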
    def mask_helmet(self, img):
        """Mask the helmet region using the ONNX segmentation model and paint it over."""
        print("Processing frame...")
        img = cv2.resize(img, (224, 224), interpolation=cv2.INTER_LINEAR)
        height, width = img.shape[:2]
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        outputs = self.model.run(None, {"images": preprocess_image_tensor(img)})
        flag, result = postprocess_outputs(outputs, height, width)
        # Process the segmentation results
        if flag is True:
            result_image = img.copy()
            overlay = np.zeros_like(img, dtype=np.uint8)
            color = (125, 125, 125, 255)  # RGBA color for the helmet
            # Extract RGB and alpha from color
            fill_color = color[:3]  # (R, G, B) = (125, 125, 125)
            alpha = color[3] / 255.0  # Normalize alpha to [0, 1]
            for obj in result:
                x1, y1, x2, y2, _, _, _, polygon = obj
                # Translate polygon coordinates relative to (x1, y1)
                polygon = [(round(x1 + point[0]), round(y1 + point[1])) for point in polygon]
                # Convert polygon to the format required by cv2.fillPoly
                pts = np.array(polygon, dtype=np.int32).reshape((-1, 1, 2))
                # Draw filled polygon on the overlay
                cv2.fillPoly(overlay, [pts], fill_color)
            # Create alpha mask for blending
            mask = np.any(overlay != 0, axis=2).astype(np.float32)
            alpha_mask = mask * alpha
            for c in range(3):  # For each color channel
                result_image[:, :, c] = (1 - alpha_mask) * result_image[:, :, c] + alpha_mask * overlay[:, :, c]
            return result_image
        else:
            # If no helmet is detected, return the image unchanged
            print("No helmet detected in this frame.")
            return img
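    # Example (a sketch): mask_helmet is called on the cropped frame inside
    # extract_frames; it resizes to 224x224, runs the ONNX session, and
    # alpha-blends the detected helmet polygons with the flat gray fill.
    #
    #   masked = processor.mask_helmet(processor.crop_frame(frame))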
    def extract_frames1(self, start_frame: int, end_frame: int, fps_target: int = 10) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Extract frames in batches and run each one through the crop/mask/CLAHE/threshold pipeline.
        """
        frames, crude_frames = [], []
        # Total number of frames in the selection
        total_frames_selection = end_frame - start_frame + 1
        # Duration of the selection in seconds
        selection_duration = total_frames_selection / self.fps
        # Total frames to extract based on the target fps
        frames_to_extract = int(selection_duration * fps_target)
        frames_to_extract = max(1, frames_to_extract)
        # Vectorized index computation
        if frames_to_extract < total_frames_selection:
            frame_indices = np.linspace(start_frame, end_frame, frames_to_extract, dtype=int)
        else:
            frame_indices = np.arange(start_frame, end_frame + 1)
        # Batch processing to reduce per-call overhead
        BATCH_SIZE = 150
        for i in range(0, len(frame_indices), BATCH_SIZE):
            batch_indices = frame_indices[i:i+BATCH_SIZE]
            batch_frames = []
            # Extract the frames in the current batch
            for frame_num in batch_indices:
                frame = self.get_frame(frame_num)
                if frame is not None:
                    batch_frames.append((frame_num, frame))
            # Process the batch of frames
            if batch_frames:
                for idx, (frame_num, frame) in enumerate(batch_frames):
                    cropped = self.crop_frame(frame)
                    result = self.mask_helmet(cropped)
                    clahe_image = self.apply_clahe(result)
                    threshold_image = self.apply_treshold(clahe_image)
                    frames.append(threshold_image)
        return frames, crude_frames
    def extract_frames(self, start_frame: int, end_frame: int, fps_target: int = 10) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        frames, crude_frames = [], []
        total_frames_selection = end_frame - start_frame + 1
        selection_duration = total_frames_selection / self.fps
        frames_to_extract = max(1, int(selection_duration * fps_target))
        if frames_to_extract < total_frames_selection:
            frame_indices = np.linspace(start_frame, end_frame, frames_to_extract, dtype=int)
        else:
            frame_indices = np.arange(start_frame, end_frame + 1)
        BATCH_SIZE = 64

        def process_frame(frame_data):
            frame_num, frame = frame_data
            if frame is None:
                return None
            cropped = self.crop_frame(frame)
            result = self.mask_helmet(cropped)
            clahe_image = self.apply_clahe(result)
            threshold_image = self.apply_treshold(clahe_image)
            return threshold_image

        for i in range(0, len(frame_indices), BATCH_SIZE):
            batch_indices = frame_indices[i:i+BATCH_SIZE]
            batch_frames = [(idx, self.get_frame(idx)) for idx in batch_indices]
            with ThreadPoolExecutor(max_workers=2) as executor:  # Adjust max_workers based on CPU cores
                batch_results = list(executor.map(process_frame, [f for f in batch_frames if f[1] is not None]))
            frames.extend([r for r in batch_results if r is not None])
        return frames, crude_frames
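    # Example (a sketch of the full pipeline for a 5-second selection):
    #
    #   start, end = 0, processor.fps * 5
    #   frames, _ = processor.extract_frames(start, end, fps_target=10)
    #   # each entry is a crop -> helmet mask -> CLAHE -> edge-detected image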
    def crop_frame(self, image):
        if image is None:
            print("Error loading frame: image is None")
            return None
        height, width, _ = image.shape
        # Use the bottom part of the image
        #y_start = int(height * 0.53)
        y_start = int(height * self.y_start)
        crop_height = height - y_start  # Height of the bottom strip
        square_size = crop_height  # Base crop height
        # Square crop: the width equals the crop height
        new_width = square_size
        # Center the crop horizontally on the configured position
        #x_center = int(width * 0.57)
        x_center = int(width * self.x_center)
        x_start = max(0, x_center - new_width // 2)
        x_end = x_start + new_width
        # Shift the crop left if x_end exceeds the image width
        if x_end > width:
            x_end = width
            x_start = max(0, width - new_width)
        # Crop the image: bottom strip in height, new_width horizontally
        cropped_image = image[y_start:y_start+crop_height, x_start:x_end]
        return cropped_image
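    # Worked example (assuming an 854x480 frame and the "Verstappen 2025"
    # preset, y_start=0.53, x_center=0.57):
    #   y_start     = int(480 * 0.53) = 254
    #   crop_height = 480 - 254       = 226   -> 226x226 square crop
    #   x_center    = int(854 * 0.57) = 486
    #   x_start     = 486 - 226 // 2  = 373,  x_end = 373 + 226 = 599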
    def crop_frame_example(self, image):
        """Same crop as crop_frame, followed by the recortar_imagen refinement crops."""
        cropped_image = self.crop_frame(image)
        if cropped_image is None:
            return None
        cropped_image = recortar_imagen(cropped_image, self.starty, self.axes)
        cropped_image = recortar_imagen_again(cropped_image, self.starty, self.axes)
        #print(self.starty, self.axes, self.y_start, self.x_center)
        return cropped_image
    def apply_clahe(self, image):
        image = recortar_imagen(image, self.starty, self.axes)
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        if self.mode == "Low ilumination":
            clahe_image = cv2.createCLAHE(clipLimit=7.0, tileGridSize=(3, 3)).apply(gray)
        else:  # "Default" and any unrecognized mode
            clahe_image = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(3, 3)).apply(gray)
        #clahe_image = cv2.equalizeHist(image)
        return clahe_image
    def apply_treshold(self, image):
        #try:
        # Process the image with adaptive edge detection (originally targeting ~6% edges)
        '''_, edges, _, config = adaptive_edge_detection(
            image,
            min_edge_percentage=3,
            max_edge_percentage=6,
            target_percentage=5,
            max_attempts=5
        )'''
        # Match the edge density to the frame's own black-pixel percentage
        percentage = calculate_black_pixels_percentage(image)
        _, edges, _, config = adaptive_edge_detection(
            image,
            min_edge_percentage=percentage,
            max_edge_percentage=percentage,
            target_percentage=percentage,
            max_attempts=1,
            mode=self.mode
        )
        # Crop the edge image again before returning it
        if edges is not None:
            edges = recortar_imagen_again(edges, self.starty, self.axes)
        return edges
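    # Example (a sketch): the threshold stage measures the frame's black-pixel
    # percentage and asks adaptive_edge_detection to match it, which keeps the
    # edge density stable from frame to frame.
    #
    #   edges = processor.apply_treshold(processor.apply_clahe(masked))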
    def __del__(self):
        if self.cap is not None:
            self.cap.release()
        self.clear_cache()  # Ensure cache is cleared on object deletion