|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from typing import List, Dict |
|
|
from PIL import Image |
|
|
import cv2 |
|
|
import onnxruntime as ort |
|
|
from utils.helper import BASE_DIR |
|
|
from pathlib import Path |
|
|
|
|
|
def denormalize_angles(normalized_angles, min_angle=-180.0, max_angle=180.0):
    """Map normalized angles from [-1, 1] onto degrees in [min_angle, max_angle].

    Args:
        normalized_angles: Scalar or NumPy array of normalized values. Values
            outside [-1, 1] are extrapolated linearly, not clipped.
        min_angle: Angle in degrees that -1 maps to. Defaults to -180.
        max_angle: Angle in degrees that +1 maps to. Defaults to 180.

    Returns:
        The angle(s) in degrees, with the same shape as the input.
    """
    # Shift [-1, 1] to [0, 1], then stretch onto the requested range.
    return (normalized_angles + 1) / 2 * (max_angle - min_angle) + min_angle
|
|
|
|
|
def preprocess_image_exactly_like_pytorch(image_input):
    """Convert an OpenCV image into the tensor layout fed to the ONNX model.

    The image is converted to 8-bit, contrast-stretched when it is almost
    black, reduced to a single channel, resized to 224x224 with bilinear
    interpolation, scaled to [0, 1] and finally normalized with mean 0.5 and
    std 0.5, which yields values in [-1, 1].

    Args:
        image_input: NumPy array with at least two dimensions (for example an
            edge map or a binary mask). The input array is never modified.

    Returns:
        A float32 NumPy array of shape (1, 1, 224, 224).

    Raises:
        ValueError: If the input is None, has fewer than two dimensions or
            contains no pixels.
        TypeError: If the input is not a NumPy array.
    """
    if image_input is None:
        raise ValueError("Received None as image input")

    if not isinstance(image_input, np.ndarray):
        raise TypeError(f"Expected NumPy array, got {type(image_input)}")

    if image_input.ndim < 2:
        raise ValueError(f"Invalid image shape: {image_input.shape}")

    # Reductions such as np.max fail on zero-size arrays; reject them up front
    # with a message that names the actual problem.
    if image_input.size == 0:
        raise ValueError(f"Empty image with shape: {image_input.shape}")

    # Work on a copy so the caller's array is left untouched.
    img = image_input.copy()

    if img.dtype != np.uint8:
        # A maximum of at most 1 is taken to mean a [0, 1]-scaled image.
        if np.max(img) <= 1.0:
            img = img * 255
        # Clip before casting: converting out-of-range values (negative
        # floats, 16-bit images) directly to uint8 would wrap around.
        img = np.clip(img, 0, 255).astype(np.uint8)

    # Stretch the contrast of very dark (but not completely black) images.
    if np.mean(img) < 10 and np.max(img) > 0:
        img = cv2.normalize(img, None, 0, 255, cv2.NORM_MINMAX)

    # Reduce to a single channel.
    if img.ndim == 3:
        if img.shape[2] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            # Any other channel count: keep only the first channel.
            img = img[:, :, 0]

    try:
        img_pil = Image.fromarray(img)
        img_resized = img_pil.resize((224, 224), Image.BILINEAR)

        img_np = np.array(img_resized, dtype=np.float32)
        img_np = img_np / 255.0          # scale to [0, 1]
        img_np = (img_np - 0.5) / 0.5    # normalize with mean 0.5, std 0.5

        # Add the channel axis, then the batch axis: (H, W) -> (1, 1, H, W).
        img_np = np.expand_dims(img_np, axis=0)
        img_np = np.expand_dims(img_np, axis=0)

        return img_np
    except Exception as e:
        # Report diagnostics about the original input, then let the caller
        # decide how to handle the failure.
        print(f"Error processing image: {e}")
        print(f"Image shape: {image_input.shape}, dtype: {image_input.dtype}")
        print(f"Min value: {np.min(image_input)}, Max value: {np.max(image_input)}")
        raise
|
|
|
|
|
def correct_outlier_angles(df, window_size=5, std_threshold=3.0, max_diff_threshold=80.0):
    """Replace outlier steering angles with the mean of their window neighbors.

    For every row that has a full window around it, the angle is compared with
    the arithmetic mean of the window (the row itself included). The row is an
    outlier when its angular distance to that mean exceeds either
    ``std_threshold`` times the window's standard deviation or
    ``max_diff_threshold`` degrees. An outlier is replaced by the arithmetic
    mean of the other values in its window. Windows are always taken from the
    original values, so one correction never influences another within a call.
    The first and last ``window_size // 2`` rows are left unchanged.

    Args:
        df: DataFrame with a numeric ``steering_angle`` column (degrees).
        window_size: Number of rows in the sliding window.
        std_threshold: Multiplier of the local standard deviation above which
            a value counts as an outlier.
        max_diff_threshold: Absolute distance in degrees above which a value
            counts as an outlier.

    Returns:
        A copy of ``df`` whose ``steering_angle`` column holds the corrected
        values. The input frame is not modified.

    Raises:
        KeyError: If ``df`` has no ``steering_angle`` column.
    """
    def angular_distance(a, b):
        # Shortest distance between two angles on a 360-degree circle.
        diff = abs(a - b) % 360
        return min(diff, 360 - diff)

    angles = df['steering_angle'].to_numpy()
    # Replacement values are means; in an integer array they would be
    # silently truncated on assignment, so work in floating point.
    if not np.issubdtype(angles.dtype, np.floating):
        angles = angles.astype(np.float64)
    corrected_angles = angles.copy()

    # Clamp so a non-positive window_size simply results in no corrections.
    half_window = max(0, window_size // 2)

    # Only positions with a complete window on both sides are examined.
    for i in range(half_window, len(angles) - half_window):
        window = angles[i - half_window:i + half_window + 1]

        curr_angle = angles[i]
        local_mean = np.mean(window)
        local_std = np.std(window) if len(window) > 1 else 0

        diff_from_mean = angular_distance(curr_angle, local_mean)
        is_outlier = (
            diff_from_mean > std_threshold * local_std
            or diff_from_mean > max_diff_threshold
        )
        if not is_outlier:
            continue

        print(i, curr_angle, local_mean, local_std, diff_from_mean)

        # The current value sits in the middle of the window; drop it and
        # average what is left.
        neighbors = np.delete(window, half_window)
        if len(neighbors) > 0:
            corrected_angles[i] = np.mean(neighbors)

    corrected_df = df.copy()
    corrected_df['steering_angle'] = corrected_angles
    return corrected_df
|
|
|
|
|
class ModelHandler:
    """Load ONNX models on demand and run steering-angle inference on frames."""

    # Number of frames sent to the model in a single inference call.
    BATCH_SIZE = 16
    # How many times the outlier-correction pass is applied on export.
    OUTLIER_PASSES = 4

    def __init__(self):
        # Active onnxruntime session, or None while nothing is loaded.
        self.current_model = None
        # Key of ``available_models`` the active session was created from.
        self.current_model_name = None
        # Frames per second used to derive timestamps in ``export_results``;
        # it is not set anywhere in this class, so the caller must assign it.
        self.fps = None
        # Display name -> model location. Only the first entry is a path to
        # an .onnx file.
        # NOTE(review): the other two values look like placeholders — confirm.
        self.available_models = {
            "F1 Steering Angle Detection": Path(BASE_DIR) / "models" / "f1-steering-angle-model.onnx",
            "Track Position Analysis": "position_model",
            "Driver Behavior Analysis": "behavior_model",
        }

    def _load_model_if_needed(self, model_name: str):
        """Create the session for ``model_name`` unless it is already active.

        Raises:
            KeyError: If ``model_name`` is not a key of ``available_models``.
        """
        if self.current_model is not None and self.current_model_name == model_name:
            return

        if model_name not in self.available_models:
            raise KeyError(f"Unknown model: {model_name!r}")

        print(f"Loading model: {model_name}")
        # Pass a plain string: some onnxruntime releases only accept str or
        # bytes and reject pathlib.Path objects.
        model_path = str(self.available_models[model_name])
        self.current_model = ort.InferenceSession(model_path)
        self.current_model_name = model_name

    def process_frames(self, frames: List[np.ndarray], model_name: str) -> List[Dict]:
        """Run the selected model over ``frames`` in batches.

        Each batch is sent to the model in one call. If that call (or reading
        its output) fails, the frames of that batch are processed one by one
        instead, and a frame that still fails gets a steering angle of 0.0.

        Args:
            frames: Images to run inference on, in order.
            model_name: Key of ``available_models`` selecting the model.

        Returns:
            One dict per frame with the keys ``frame_number`` (1-based
            position in ``frames``) and ``steering_angle`` (degrees). An
            empty list when ``frames`` is empty.

        Raises:
            KeyError: If ``model_name`` is unknown.
        """
        if not frames:
            return []

        self._load_model_if_needed(model_name)
        input_name = self.current_model.get_inputs()[0].name

        results = []
        for batch_start in range(0, len(frames), self.BATCH_SIZE):
            batch = frames[batch_start:batch_start + self.BATCH_SIZE]

            # Angles are gathered per batch and appended only once the batch
            # is complete, so a failure halfway through cannot leave partial
            # entries that the fallback would then duplicate.
            try:
                angles = self._predict_batch(batch, input_name)
            except Exception as e:
                print(f"Error in batch processing: {e}")
                angles = self._predict_individually(batch, batch_start, input_name)

            results.extend(
                {'frame_number': batch_start + offset + 1, 'steering_angle': angle}
                for offset, angle in enumerate(angles)
            )

        return results

    def _predict_batch(self, batch, input_name):
        """Return one angle per frame of ``batch`` using a single model call."""
        batch_inputs = []
        for frame in batch:
            try:
                batch_inputs.append(preprocess_image_exactly_like_pytorch(frame))
            except Exception as e:
                print(f"Error preprocessing frame: {e}")
                # An all-zero tensor keeps the batch aligned with the frames.
                batch_inputs.append(np.zeros((1, 1, 224, 224), dtype=np.float32))

        # Stack the (1, 1, 224, 224) tensors along the batch axis.
        batched_input = np.vstack(batch_inputs)
        ort_outputs = self.current_model.run(None, {input_name: batched_input})

        return [denormalize_angles(ort_outputs[0][i][0]) for i in range(len(batch))]

    def _predict_individually(self, batch, batch_start, input_name):
        """Return one angle per frame of ``batch``, running frames one at a time."""
        angles = []
        for offset, frame in enumerate(batch):
            frame_idx = batch_start + offset + 1
            try:
                input_data = preprocess_image_exactly_like_pytorch(frame)
                ort_outputs = self.current_model.run(None, {input_name: input_data})
                angles.append(denormalize_angles(ort_outputs[0][0][0]))
            except Exception as sub_e:
                print(f"Error processing individual frame {frame_idx}: {sub_e}")
                # Neutral fallback so every frame still gets an entry.
                angles.append(0.0)
        return angles

    def export_results(self, results: List[Dict]) -> pd.DataFrame:
        """Build the export DataFrame from the output of ``process_frames``.

        Adds a ``time`` column (``frame_number / fps`` rounded to three
        decimals) and applies the outlier correction ``OUTLIER_PASSES`` times.

        Args:
            results: Dicts with ``frame_number`` and ``steering_angle`` keys.

        Returns:
            DataFrame with the columns ``frame_number``, ``steering_angle``
            and ``time``. It has no rows when ``results`` is empty.

        Raises:
            ValueError: If ``results`` is non-empty and ``self.fps`` is unset
                or not positive.
        """
        if not results:
            return pd.DataFrame(columns=['frame_number', 'steering_angle', 'time'])

        if self.fps is None or self.fps <= 0:
            raise ValueError("fps must be set to a positive value before exporting results")

        df = pd.DataFrame(results)
        df['time'] = (df['frame_number'] / self.fps).round(3)

        # Each pass only looks at the values produced by the previous one, so
        # repeating it lets corrections settle.
        for _ in range(self.OUTLIER_PASSES):
            df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0)

        return df
|
|
|