# ============================================================================== # InterFuser End-to-End Control Model # (Ready for Hugging Face Hub) # ============================================================================== import torch from torch import nn import torch.nn.functional as F from transformers import PreTrainedModel, PretrainedConfig from transformers.utils.generic import ModelOutput from functools import partial import math import copy from typing import Optional, Tuple, Union, List from torch import Tensor from dataclasses import dataclass import numpy as np from collections import deque # --- حاول استيراد timm، وإلا أبلغ المستخدم try: from timm.models.resnet import resnet50d, resnet26d, resnet18d except ImportError: raise ImportError("Please install timm to use this model: `pip install timm`") class PIDController: def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20): self._K_P = K_P self._K_I = K_I self._K_D = K_D self._window = deque([0 for _ in range(n)], maxlen=n) self._max = 0.0 self._min = 0.0 def step(self, error): self._window.append(error) self._max = max(self._max, abs(error)) self._min = -abs(self._max) if len(self._window) >= 2: integral = np.mean(self._window) derivative = self._window[-1] - self._window[-2] else: integral = 0.0 derivative = 0.0 return self._K_P * error + self._K_I * integral + self._K_D * derivative # ================== 1. الإعدادات ================== SAVE_VIDEO = True OUTPUT_FILENAME = 'simulation_output.mp4' FPS = 10 WAYPOINT_SCALE_FACTOR = 5.0 T1_FUTURE_TIME = 1.0 # ثانية في المستقبل T2_FUTURE_TIME = 2.0 # ثانيتين في المستقبل TRACKER_FREQUENCY = 10 MERGE_PERCENT = 0.4 # حساب الثوابت PIXELS_PER_METER = 8 MAX_DISTANCE = 32 IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2 EGO_CAR_X = IMG_SIZE // 2 EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER) reweight_array = np.ones((20, 20, 7)) # يمكن تعديل هذه القيمة حسب الحاجة # المتغيرات العامة للحالة last_valid_waypoints = None last_valid_theta = 0.0 # ================== 2. وظائف المساعدة ================== def ensure_rgb(image): """تحويل الصورة إلى RGB إذا كانت grayscale.""" if len(image.shape) == 2 or image.shape[2] == 1: return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) return image def process_camera_image(tensor_image): """تحويل صورة الكاميرا من Tensor إلى NumPy Array.""" image_np = tensor_image.permute(1, 2, 0).cpu().numpy() image_np = (image_np * np.array([0.229, 0.224, 0.225])) + np.array([0.485, 0.456, 0.406]) image_np = np.clip(image_np, 0, 1) return (image_np * 255).astype(np.uint8)[:, :, ::-1] # BGR def convert_grid_to_xy(i, j): """تحويل الشبكة إلى إحداثيات x, y.""" return (j - 9.5) * 1.6, (19.5 - i) * 1.6 def add_rect(img, loc, ori, box, value, color): """ إضافة مستطيل إلى الخريطة. """ center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER) center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER) size_px = ( int(box[0] * PIXELS_PER_METER), int(box[1] * PIXELS_PER_METER) ) angle_deg = -np.degrees(math.atan2(ori[1], ori[0])) box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg)) box_points = np.int32(box_points) adjusted_color = [int(x * value) for x in color] cv2.fillConvexPoly(img, box_points, adjusted_color) return img def find_peak_box(data): """ اكتشاف القمم في البيانات وتصنيفها. """ det_data = np.zeros((22, 22, 7)) det_data[1:21, 1:21] = data detected_objects = [] for i in range(1, 21): for j in range(1, 21): if det_data[i, j, 0] > 0.6 and ( det_data[i, j, 0] > det_data[i, j - 1, 0] and det_data[i, j, 0] > det_data[i, j + 1, 0] and det_data[i, j, 0] > det_data[i - 1, j, 0] and det_data[i, j, 0] > det_data[i + 1, j, 0] ): length = det_data[i, j, 4] width = det_data[i, j, 5] confidence = det_data[i, j, 0] obj_type = 'unknown' if length > 4.0: obj_type = 'car' elif length / width > 1.5: obj_type = 'bike' else: obj_type = 'pedestrian' detected_objects.append({ 'coords': (i - 1, j - 1), 'type': obj_type, 'confidence': confidence, 'raw_data': det_data[i, j] }) return detected_objects def render(det_data, t=0): """ رسم كائنات الكشف على الخريطة BEV. """ CLASS_COLORS = {'car': (0, 0, 255), 'bike': (0, 255, 0), 'pedestrian': (255, 0, 0), 'unknown': (128, 128, 128)} det_weighted = det_data * reweight_array detected_objects = find_peak_box(det_weighted) counts = {cls: 0 for cls in CLASS_COLORS.keys()} [counts.update({obj['type']: counts.get(obj['type'], 0) + 1}) for obj in detected_objects] img = np.zeros((IMG_SIZE, IMG_SIZE, 3), np.uint8) for obj in detected_objects: i, j = obj['coords'] obj_data = obj['raw_data'] speed = obj_data[6] center_x, center_y = convert_grid_to_xy(i, j) theta = obj_data[3] * np.pi ori = np.array([math.cos(theta), math.sin(theta)]) loc_x = center_x + obj_data[1] + t * speed * ori[0] loc_y = center_y + obj_data[2] - t * speed * ori[1] box = np.array([obj_data[4], obj_data[5]]) if obj['type'] == 'pedestrian': box *= 1.5 add_rect( img, loc=np.array([loc_x, loc_y]), ori=ori, box=box, value=obj['confidence'], color=CLASS_COLORS[obj['type']] ) return img, counts def render_self_car(loc, ori, box, pixels_per_meter=PIXELS_PER_METER): """ رسم السيارة الذاتية على الخريطة BEV. Args: loc: موقع السيارة [x, y] في النظام العالمي. ori: اتجاه السيارة [cos(theta), sin(theta)]. box: أبعاد السيارة [طول, عرض]. pixels_per_meter: عدد البكسلات لكل متر. Returns: self_car_map: خريطة السيارة ذاتية القيادة (RGB - 3 قنوات). """ img = np.zeros((IMG_SIZE, IMG_SIZE, 3), np.uint8) center_x = int(loc[0] * pixels_per_meter + MAX_DISTANCE * pixels_per_meter) center_y = int(loc[1] * pixels_per_meter + MAX_DISTANCE * pixels_per_meter) size_px = ( int(box[0] * pixels_per_meter), int(box[1] * pixels_per_meter) ) angle_deg = -np.degrees(math.atan2(ori[1], ori[0])) box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg)) box_points = np.int32(box_points) ego_color = (0, 255, 255) # أصفر cv2.fillConvexPoly(img, box_points, ego_color) return img # ← نرجع الصورة بأكملها وليس جزءًا منها def render_waypoints(waypoints, pixels_per_meter=PIXELS_PER_METER): global last_valid_waypoints img = np.zeros((IMG_SIZE, IMG_SIZE, 3), np.uint8) current_waypoints = waypoints if waypoints is not None and len(waypoints) > 2: last_valid_waypoints = waypoints else: current_waypoints = last_valid_waypoints if current_waypoints is None: return img origin_x, origin_y = EGO_CAR_X, EGO_CAR_Y for i, point in enumerate(current_waypoints): px = int(origin_x + point[1] * pixels_per_meter) py = int(origin_y - point[0] * pixels_per_meter) color = (0, 0, 255) if i == len(current_waypoints) - 1 else (0, 255, 0) cv2.circle(img, (px, py), 4, color, -1) return img def collision_detections(map1, map2, threshold=0.04): """ تحقق من وجود تداخل بين خريطة البيئة ونموذج السيارة. """ print("map1 shape:", map1.shape) print("map2 shape:", map2.shape) # تحويل map2 إلى grayscale إذا كانت تحتوي على 3 قنوات (RGB) if len(map2.shape) == 3 and map2.shape[2] == 3: map2 = cv2.cvtColor(map2, cv2.COLOR_BGR2GRAY) # التأكد من أن map1 و map2 لها نفس الأبعاد assert map1.shape == map2.shape overlap_map = (map1 > 0.01) & (map2 > 0.01) ratio = float(np.sum(overlap_map)) / np.sum(map2 > 0) return ratio < threshold def get_max_safe_distance(meta_data, downsampled_waypoints, t, collision_buffer, threshold): """ حساب أقصى مسافة آمنة قبل حدوث تصادم. """ surround_map = meta_data.reshape(20, 20, 7)[..., :3][..., 0] if np.sum(surround_map) < 1: return np.linalg.norm(downsampled_waypoints[-3]) hero_bounding_box = np.array([2.45, 1.0]) + collision_buffer safe_distance = 0.0 for i in range(len(downsampled_waypoints) - 2): aim = (downsampled_waypoints[i + 1] + downsampled_waypoints[i + 2]) / 2.0 loc = downsampled_waypoints[i] ori = aim - loc self_car_map = render_self_car(loc=loc, ori=ori, box=hero_bounding_box, pixels_per_meter=PIXELS_PER_METER) # تصغير الخريطة والتحويل إلى grayscale self_car_map_resized = cv2.resize(self_car_map, (20, 20)) self_car_map_gray = cv2.cvtColor(self_car_map_resized, cv2.COLOR_BGR2GRAY) if not collision_detections(surround_map, self_car_map_gray, threshold): break safe_distance = max(safe_distance, np.linalg.norm(loc)) return safe_distance def downsample_waypoints(waypoints, precision=0.2): """ تقليل عدد نقاط المسار. """ downsampled_waypoints = [] last_waypoint = np.array([0.0, 0.0]) for i in range(len(waypoints)): now_waypoint = waypoints[i] dis = np.linalg.norm(now_waypoint - last_waypoint) if dis > precision: interval = int(dis / precision) move_vector = (now_waypoint - last_waypoint) / (interval + 1) for j in range(interval): downsampled_waypoints.append(last_waypoint + move_vector * (j + 1)) downsampled_waypoints.append(now_waypoint) last_waypoint = now_waypoint return downsampled_waypoints # ================== 3. فئة التتبع ================== class TrackedObject: def __init__(self): self.last_step = 0 self.last_pos = [0, 0] self.historical_pos = [] self.historical_steps = [] self.historical_features = [] class Tracker: def __init__(self, frequency=10): self.tracks = [] self.alive_ids = [] self.frequency = frequency def update_and_predict(self, det_data, pos, theta, frame_num): det_data_weighted = det_data * reweight_array detected_objects = find_peak_box(det_data_weighted) objects_info = [] R = np.array([[np.cos(-theta), -np.sin(-theta)], [np.sin(-theta), np.cos(-theta)]]) for obj in detected_objects: i, j = obj['coords'] obj_data = obj['raw_data'] center_y, center_x = convert_grid_to_xy(i, j) center_x += obj_data[1] center_y += obj_data[2] loc = R.T.dot(np.array([center_x, center_y])) objects_info.append([loc[0] + pos[0], loc[1] + pos[1], obj_data[1:]]) # [x, y, features...] updates_ids = self._update(objects_info, frame_num) speed_results, heading_results = self._predict(updates_ids) for k, poi in enumerate(updates_ids): i, j = poi if heading_results[k] is not None: factor = MERGE_PERCENT * 0.1 det_data[i, j, 3] = heading_results[k] * factor + det_data[i, j, 3] * (1 - factor) if speed_results[k] is not None: factor = MERGE_PERCENT * 0.1 det_data[i, j, 6] = speed_results[k] * factor + det_data[i, j, 6] * (1 - factor) return det_data def _update(self, objects_info, step): latest_ids = [] if len(self.tracks) == 0: for object_info in objects_info: to = TrackedObject() to.update(step, object_info) self.tracks.append(to) latest_ids.append(len(self.tracks) - 1) else: matched_ids = set() for idx, object_info in enumerate(objects_info): min_id, min_error = -1, float('inf') pos_x, pos_y = object_info[:2] for _id in self.alive_ids: if _id in matched_ids: continue track_pos = self.tracks[_id].last_pos distance = np.sqrt((track_pos[0] - pos_x)**2 + (track_pos[1] - pos_y)**2) if distance < 2.0 and distance < min_error: min_error = distance min_id = _id if min_id != -1: self.tracks[min_id].update(step, objects_info[idx]) latest_ids.append(min_id) matched_ids.add(min_id) else: to = TrackedObject() self.tracks.append(to) latest_ids.append(len(self.tracks) - 1) self.alive_ids = [i for i, track in enumerate(self.tracks) if track.last_step > step - 6] return latest_ids def _match(self, objects_info): results = [] matched_ids = set() for object_info in objects_info: min_id, min_error = -1, float('inf') pos_x, pos_y = object_info[:2] for _id in self.alive_ids: if _id in matched_ids: continue track_pos = self.tracks[_id].last_pos distance = np.sqrt((track_pos[0] - pos_x)**2 + (track_pos[1] - pos_y)**2) if distance < min_error: min_error = distance min_id = _id results.append(min_id) if min_id != -1: matched_ids.add(min_id) return results def _predict(self, updates_ids): speed_results, heading_results = [], [] for each_id in updates_ids: to = self.tracks[each_id] avg_speed, avg_heading = [], [] for feature in to.historical_features: avg_speed.append(feature[2]) avg_heading.append(feature[:2]) if len(avg_speed) < 2: speed_results.append(None) heading_results.append(None) continue avg_speed = np.mean(avg_speed) avg_heading = np.mean(np.stack(avg_heading), axis=0) yaw_angle = get_yaw_angle(avg_heading) heading_results.append((4 - yaw_angle / np.pi) % 2) speed_results.append(avg_speed) return speed_results, heading_results def get_yaw_angle(forward_vector): forward_vector = forward_vector / np.linalg.norm(forward_vector) yaw = math.atan2(forward_vector[1], forward_vector[0]) return yaw # ================== 4. فئة المتحكم ================== class InterfuserController: def __init__(self, config): self.turn_controller = PIDController( K_P=config.turn_KP, K_I=config.turn_KI, K_D=config.turn_KD, n=config.turn_n, ) self.speed_controller = PIDController( K_P=config.speed_KP, K_I=config.speed_KI, K_D=config.speed_KD, n=config.speed_n, ) self.config = config self.collision_buffer = np.array(config.collision_buffer) self.detect_threshold = config.detect_threshold self.stop_steps = 0 self.forced_forward_steps = 0 self.red_light_steps = 0 self.block_red_light = 0 self.in_stop_sign_effect = False self.block_stop_sign_distance = 0 self.stop_sign_timer = 0 self.stop_sign_trigger_times = 0 def run_step( self, speed, waypoints, junction, traffic_light_state, stop_sign, meta_data ): # --- تحديث حالة التوقف --- if speed < 0.2: self.stop_steps += 1 else: self.stop_steps = max(0, self.stop_steps - 10) if speed < 0.06 and self.in_stop_sign_effect: self.in_stop_sign_effect = False if junction < 0.3: self.stop_sign_trigger_times = 0 if traffic_light_state > 0.7: self.red_light_steps += 1 else: self.red_light_steps = 0 if self.red_light_steps > 1000: self.block_red_light = 80 self.red_light_steps = 0 if self.block_red_light > 0: self.block_red_light -= 1 traffic_light_state = 0.01 if stop_sign < 0.6 and self.block_stop_sign_distance < 0.1: self.in_stop_sign_effect = True self.block_stop_sign_distance = 2.0 self.stop_sign_trigger_times = 3 self.block_stop_sign_distance = max( 0, self.block_stop_sign_distance - 0.05 * speed ) if self.block_stop_sign_distance < 0.1: if self.stop_sign_trigger_times > 0: self.block_stop_sign_distance = 2.0 self.stop_sign_trigger_times -= 1 self.in_stop_sign_effect = True # --- حساب زاوية الانعطاف --- aim = (waypoints[1] + waypoints[0]) / 2.0 angle = np.degrees(np.pi / 2 - np.arctan2(aim[1], aim[0])) / 90 if speed < 0.01: angle = 0 steer = self.turn_controller.step(angle) steer = np.clip(steer, -1.0, 1.0) brake = False throttle = 0.0 desired_speed = 0.0 downsampled_waypoints = downsample_waypoints(waypoints) d_0 = get_max_safe_distance( meta_data, downsampled_waypoints, t=0, collision_buffer=self.collision_buffer, threshold=self.detect_threshold, ) d_05 = get_max_safe_distance( meta_data, downsampled_waypoints, t=0.5, collision_buffer=self.collision_buffer, threshold=self.detect_threshold, ) d_075 = get_max_safe_distance( meta_data, downsampled_waypoints, t=0.75, collision_buffer=self.collision_buffer, threshold=self.detect_threshold, ) d_1 = get_max_safe_distance( meta_data, downsampled_waypoints, t=1, collision_buffer=self.collision_buffer, threshold=self.detect_threshold, ) d_15 = get_max_safe_distance( meta_data, downsampled_waypoints, t=1.5, collision_buffer=self.collision_buffer, threshold=self.detect_threshold, ) d_2 = get_max_safe_distance( meta_data, downsampled_waypoints, t=2, collision_buffer=self.collision_buffer, threshold=self.detect_threshold, ) d_05 = min(d_0, d_05, d_075) d_1 = min(d_05, d_075, d_15, d_2) safe_dis = min(d_05, d_1) d_0 = max(0, d_0 - 2.0) d_05 = max(0, d_05 - 2.0) d_1 = max(0, d_1 - 2.0) # --- تفعيل الفرملة فقط إذا كانت الإشارة حمراء أو هناك علامة Stop --- if traffic_light_state > 0.5: brake = True desired_speed = 0.0 elif stop_sign > 0.6 and traffic_light_state <= 0.5: if self.stop_sign_timer < 20: brake = True desired_speed = 0.0 self.stop_sign_timer += 1 else: brake = False desired_speed = max(0, min(self.config.max_speed, speed + 0.2)) else: brake = False desired_speed = max(0, min(self.config.max_speed, speed + 0.2)) delta = np.clip(desired_speed - speed, 0.0, self.config.clip_delta) throttle = self.speed_controller.step(delta) throttle = np.clip(throttle, 0.0, self.config.max_throttle) # --- إذا كانت السرعة أعلى من 1.1 مرة السرعة المستهدفة، نفرم --- if speed > desired_speed * self.config.brake_ratio: brake = True # --- إعداد معلومات التشخيص --- meta_info_1 = f"speed: {speed:.2f}, target_speed: {desired_speed:.2f}" meta_info_2 = f"on_road_prob: {junction:.2f}, red_light_prob: {traffic_light_state:.2f}, stop_sign_prob: {1 - stop_sign:.2f}" meta_info_3 = f"stop_steps: {self.stop_steps}, block_stop_sign_distance: {self.block_stop_sign_distance:.1f}" # --- حالة خاصة بعد فترة طويلة من التوقف --- if self.stop_steps > 1200: self.forced_forward_steps = 12 self.stop_steps = 0 if self.forced_forward_steps > 0: throttle = 0.8 brake = False self.forced_forward_steps -= 1 if self.in_stop_sign_effect: throttle = 0 brake = True return steer, throttle, brake, (meta_info_1, meta_info_2, meta_info_3, safe_dis) class ControllerConfig: turn_KP, turn_KI, turn_KD, turn_n = 1.0, 0.1, 0.1, 20 speed_KP, speed_KI, speed_KD, speed_n = 0.5, 0.05, 0.1, 20 max_speed, max_throttle, clip_delta = 6.0, 0.75, 0.25 collision_buffer, detect_threshold = [0.0, 0.0], 0.04 brake_speed, brake_ratio = 0.4, 1.1 # ================== 5. واجهة العرض ================== class DisplayInterface: def __init__(self, width=1200, height=600): self._width = width self._height = height def run_interface(self, data): dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8) font = cv2.FONT_HERSHEY_SIMPLEX dashboard[:, :800] = cv2.resize(data.get('camera_view'), (800, 600)) dashboard[:400, 800:1200] = cv2.resize(data['map_t0'], (400, 400)) dashboard[400:600, 800:1000] = cv2.resize(data['map_t1'], (200, 200)) dashboard[400:600, 1000:1200] = cv2.resize(data['map_t2'], (200, 200)) # خطوط فصل cv2.line(dashboard, (800, 0), (800, 600), (255, 255, 255), 2) cv2.line(dashboard, (800, 400), (1200, 400), (255, 255, 255), 2) cv2.line(dashboard, (1000, 400), (1000, 600), (255, 255, 255), 2) y_pos = 40 for key, text in data['text_info'].items(): cv2.putText(dashboard, text, (820, y_pos), font, 0.6, (255, 255, 255), 1) y_pos += 30 y_pos += 10 for t, counts in data['object_counts'].items(): count_str = f"{t}: C={counts['car']} B={counts['bike']} P={counts['pedestrian']}" cv2.putText(dashboard, count_str, (820, y_pos), font, 0.5, (255, 255, 255), 1) y_pos += 20 cv2.putText(dashboard, "t0", (1160, 30), font, 0.8, (0, 255, 255), 2) cv2.putText(dashboard, "t1", (960, 430), font, 0.8, (0, 255, 255), 2) cv2.putText(dashboard, "t2", (1160, 430), font, 0.8, (0, 255, 255), 2) return dashboard from torch.utils.data import random_split # --- تحديد التحوّلات --- transform = transforms.Compose([ transforms.ToPILImage(), transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) lidar_transform = transforms.Compose([ transforms.ToPILImage(), transforms.Resize((112, 112)), transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5]), ]) class LMDriveDataset(Dataset): def __init__(self, data_dir, transform=None, lidar_transform=None): self.data_dir = Path(data_dir) self.transform = transform self.lidar_transform = lidar_transform self.samples = [] measurement_dir = self.data_dir / "measurements" image_dir = self.data_dir / "rgb_full" measurement_files = sorted([f for f in os.listdir(measurement_dir) if f.endswith(".json")]) image_files = sorted([f for f in os.listdir(image_dir) if f.endswith(".jpg")]) num_samples = min(len(measurement_files), len(image_files)) for i in range(num_samples): frame_id = i measurement_path = str(measurement_dir / f"{frame_id:04d}.json") image_name = f"{frame_id:04d}.jpg" image_path = str(image_dir / image_name) if not os.path.exists(measurement_path) or not os.path.exists(image_path): continue with open(measurement_path, "r") as f: measurements_data = json.load(f) self.samples.append({ "image_path": image_path, "measurement_path": measurement_path, "frame_id": frame_id, "measurements": measurements_data }) def __len__(self): return len(self.samples) def __getitem__(self, idx): sample = self.samples[idx] # قراءة الصورة الكاملة (2400x800) full_image = cv2.imread(sample["image_path"]) if full_image is None: raise ValueError(f"Failed to load image: {sample['image_path']}") full_image = cv2.cvtColor(full_image, cv2.COLOR_BGR2RGB) # تقسيم الصورة إلى أجزاء (كل جزء 600x800) front_image = full_image[:600, :800] # الجزء الأول left_image = full_image[600:1200, :800] # الجزء الثاني right_image = full_image[1200:1800, :800] # الجزء الثالث center_image = full_image[1800:2400, :800]# الجزء الرابع # تطبيق التحويل على كل صورة front_image_tensor = self.transform(front_image) left_image_tensor = self.transform(left_image) right_image_tensor = self.transform(right_image) center_image_tensor = self.transform(center_image) # تحميل الليدار lidar_path = str(self.data_dir / "lidar" / f"{sample['frame_id']:04d}.png") lidar = cv2.imread(lidar_path) if lidar is None: lidar = np.zeros((112, 112, 3), dtype=np.uint8) # مكان فارغ else: if len(lidar.shape) == 2: lidar = cv2.cvtColor(lidar, cv2.COLOR_GRAY2BGR) lidar = cv2.cvtColor(lidar, cv2.COLOR_BGR2RGB) lidar_tensor = self.lidar_transform(lidar) # استخراج القياسات measurements_data = sample["measurements"] x = measurements_data.get("x", 0.0) y = measurements_data.get("y", 0.0) theta = measurements_data.get("theta", 0.0) speed = measurements_data.get("speed", 0.0) steer = measurements_data.get("steer", 0.0) throttle = measurements_data.get("throttle", 0.0) brake = int(measurements_data.get("brake", False)) command = measurements_data.get("command", 0) is_junction = int(measurements_data.get("is_junction", False)) should_brake = int(measurements_data.get("should_brake", 0)) x_command = measurements_data.get("x_command", 0.0) y_command = measurements_data.get("y_command", 0.0) target_point = torch.tensor([x_command, y_command], dtype=torch.float32) measurements = torch.tensor( [x, y, theta, speed, steer, throttle, brake, command, is_junction, should_brake], dtype=torch.float32 ) return { "rgb": front_image_tensor, "rgb_left": left_image_tensor, "rgb_right": right_image_tensor, "rgb_center": center_image_tensor, "lidar": lidar_tensor, "measurements": measurements, "target_point": target_point } # ============================================================================== # SECTION 1: HELPER CLASSES (Copied from original implementation) # ============================================================================== def to_2tuple(x): if isinstance(x, tuple): return x return (x, x) class HybridEmbed(nn.Module): def __init__(self, backbone, img_size=224, patch_size=1, feature_size=None, in_chans=3, embed_dim=768): super().__init__() img_size = to_2tuple(img_size) self.img_size = img_size self.patch_size = to_2tuple(patch_size) self.backbone = backbone if feature_size is None: with torch.no_grad(): training = backbone.training if training: backbone.eval() o = self.backbone(torch.zeros(1, in_chans, img_size[0], img_size[1])) if isinstance(o, (list, tuple)): o = o[-1] feature_size = o.shape[-2:] feature_dim = o.shape[1] backbone.train(training) else: feature_size = to_2tuple(feature_size) if hasattr(self.backbone, 'feature_info'): feature_dim = self.backbone.feature_info.channels()[-1] else: feature_dim = self.backbone.num_features self.proj = nn.Conv2d(feature_dim, embed_dim, kernel_size=1) def forward(self, x): x = self.backbone(x) if isinstance(x, (list, tuple)): x = x[-1] x = self.proj(x) return x class PositionEmbeddingSine(nn.Module): def __init__(self, num_pos_feats=64, temperature=10000, normalize=False, scale=None): super().__init__() self.num_pos_feats = num_pos_feats self.temperature = temperature self.normalize = normalize if scale is not None and normalize is False: raise ValueError("normalize should be True if scale is passed") if scale is None: scale = 2 * math.pi self.scale = scale def forward(self, tensor): x = tensor bs, _, h, w = x.shape not_mask = torch.ones((bs, h, w), device=x.device) y_embed = not_mask.cumsum(1, dtype=torch.float32) x_embed = not_mask.cumsum(2, dtype=torch.float32) if self.normalize: eps = 1e-6 y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device) dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats) pos_x = x_embed[:, :, :, None] / dim_t pos_y = y_embed[:, :, :, None] / dim_t pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3) pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3) pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2) return pos class TransformerEncoder(nn.Module): def __init__(self, encoder_layer, num_layers, norm=None): super().__init__() self.layers = _get_clones(encoder_layer, num_layers) self.num_layers = num_layers self.norm = norm def forward(self, src, mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None): output = src for layer in self.layers: output = layer(output, src_mask=mask, src_key_padding_mask=src_key_padding_mask, pos=pos) if self.norm is not None: output = self.norm(output) return output class TransformerDecoder(nn.Module): def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False): super().__init__() self.layers = _get_clones(decoder_layer, num_layers) self.num_layers = num_layers self.norm = norm self.return_intermediate = return_intermediate def forward(self, tgt, memory, tgt_mask: Optional[Tensor] = None, memory_mask: Optional[Tensor] = None, tgt_key_padding_mask: Optional[Tensor] = None, memory_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None, query_pos: Optional[Tensor] = None): output = tgt intermediate = [] for layer in self.layers: output = layer(output, memory, tgt_mask=tgt_mask, memory_mask=memory_mask, tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask, pos=pos, query_pos=query_pos) if self.return_intermediate: intermediate.append(self.norm(output)) if self.norm is not None: output = self.norm(output) if self.return_intermediate: intermediate.pop(); intermediate.append(output) return torch.stack(intermediate) if self.return_intermediate else output.unsqueeze(0) class TransformerEncoderLayer(nn.Module): def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation=nn.ReLU, normalize_before=False): super().__init__() self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) self.linear1 = nn.Linear(d_model, dim_feedforward); self.dropout = nn.Dropout(dropout); self.linear2 = nn.Linear(dim_feedforward, d_model) self.norm1 = nn.LayerNorm(d_model); self.norm2 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout); self.dropout2 = nn.Dropout(dropout) self.activation = activation(); self.normalize_before = normalize_before def with_pos_embed(self, tensor, pos: Optional[Tensor]): return tensor if pos is None else tensor + pos def forward(self, src, src_mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None): q = k = self.with_pos_embed(src, pos) src2 = self.self_attn(q, k, value=src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0] src = src + self.dropout1(src2) src = self.norm1(src) src2 = self.linear2(self.dropout(self.activation(self.linear1(src)))) src = src + self.dropout2(src2) src = self.norm2(src) return src class TransformerDecoderLayer(nn.Module): def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation=nn.ReLU, normalize_before=False): super().__init__() self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout) self.linear1 = nn.Linear(d_model, dim_feedforward); self.dropout = nn.Dropout(dropout); self.linear2 = nn.Linear(dim_feedforward, d_model) self.norm1 = nn.LayerNorm(d_model); self.norm2 = nn.LayerNorm(d_model); self.norm3 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout); self.dropout2 = nn.Dropout(dropout); self.dropout3 = nn.Dropout(dropout) self.activation = activation(); self.normalize_before = normalize_before def with_pos_embed(self, tensor, pos: Optional[Tensor]): return tensor if pos is None else tensor + pos def forward(self, tgt, memory, tgt_mask: Optional[Tensor] = None, memory_mask: Optional[Tensor] = None, tgt_key_padding_mask: Optional[Tensor] = None, memory_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None, query_pos: Optional[Tensor] = None): q = k = self.with_pos_embed(tgt, query_pos) tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)[0] tgt = tgt + self.dropout1(tgt2) tgt = self.norm1(tgt) tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos), key=self.with_pos_embed(memory, pos), value=memory, attn_mask=memory_mask, key_padding_mask=memory_key_padding_mask)[0] tgt = tgt + self.dropout2(tgt2) tgt = self.norm2(tgt) tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt)))) tgt = tgt + self.dropout3(tgt2) tgt = self.norm3(tgt) return tgt def _get_clones(module, N): return nn.ModuleList([copy.deepcopy(module) for i in range(N)]) class LinearWaypointsPredictor(nn.Module): def __init__(self, input_dim, cumsum=True): super().__init__() self.cumsum = cumsum self.rank_embed = nn.Parameter(torch.zeros(1, 10, input_dim)) self.head_fc1_list = nn.ModuleList([nn.Linear(input_dim, 64) for _ in range(6)]) self.head_relu = nn.ReLU(inplace=True) self.head_fc2_list = nn.ModuleList([nn.Linear(64, 2) for _ in range(6)]) def forward(self, x, measurements): bs, n, dim = x.shape x = (x + self.rank_embed).reshape(-1, dim) mask = measurements[:, :6].unsqueeze(-1).repeat(n, 1, 2) rs = [self.head_fc2_list[i](self.head_relu(self.head_fc1_list[i](x))) for i in range(6)] x = torch.sum(torch.stack(rs, 1) * mask, dim=1).view(bs, n, 2) return torch.cumsum(x, 1) if self.cumsum else x class GRUWaypointsPredictor(nn.Module): def __init__(self, input_dim, waypoints=10): super().__init__() self.gru = torch.nn.GRU(input_size=input_dim, hidden_size=64, batch_first=True) self.encoder = nn.Linear(2, 64) self.decoder = nn.Linear(64, 2) self.waypoints = waypoints def forward(self, x, target_point): bs = x.shape[0] z = self.encoder(target_point).unsqueeze(0) output, _ = self.gru(x, z) output = self.decoder(output.reshape(bs * self.waypoints, -1)).reshape(bs, self.waypoints, 2) return torch.cumsum(output, 1) # ============================================================================== # SECTION 2: CONTROL LOGIC CLASSES (From 2nd script) # ============================================================================== class PIDController: def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20): self._K_P = K_P self._K_I = K_I self._K_D = K_D self._window = deque([0 for _ in range(n)], maxlen=n) def step(self, error): self._window.append(error) if len(self._window) >= 2: integral = np.mean(self._window) derivative = self._window[-1] - self._window[-2] else: integral = 0.0 derivative = 0.0 return self._K_P * error + self._K_I * integral + self._K_D * derivative class ControllerConfig: turn_KP, turn_KI, turn_KD, turn_n = 1.0, 0.1, 0.1, 20 speed_KP, speed_KI, speed_KD, speed_n = 0.5, 0.05, 0.1, 20 max_speed, max_throttle, clip_delta = 6.0, 0.75, 0.25 collision_buffer, detect_threshold = [0.0, 0.0], 0.04 brake_speed, brake_ratio = 0.4, 1.1 class InterfuserController: def __init__(self, config: ControllerConfig): self.turn_controller = PIDController(K_P=config.turn_KP, K_I=config.turn_KI, K_D=config.turn_KD, n=config.turn_n) self.speed_controller = PIDController(K_P=config.speed_KP, K_I=config.speed_KI, K_D=config.speed_KD, n=config.speed_n) self.config = config self.stop_steps = 0 self.red_light_steps = 0 def run_step(self, speed, waypoints, junction, traffic_light_state, stop_sign, meta_data): if speed < 0.2: self.stop_steps += 1 else: self.stop_steps = max(0, self.stop_steps - 10) aim = (waypoints[1] + waypoints[0]) / 2.0 angle = np.degrees(np.pi / 2 - np.arctan2(aim[1], aim[0])) / 90 if speed < 0.01: angle = 0.0 steer = self.turn_controller.step(angle) steer = np.clip(steer, -1.0, 1.0) brake = False # Simplified speed control logic for clarity if traffic_light_state > 0.5 or stop_sign > 0.5: desired_speed = 0.0 brake = True else: # A simple logic to move forward towards a target speed desired_speed = self.config.max_speed if speed > desired_speed: brake = True delta = np.clip(desired_speed - speed, 0.0, self.config.clip_delta) throttle = self.speed_controller.step(delta) throttle = np.clip(throttle, 0.0, self.config.max_throttle) if brake: throttle = 0.0 metadata = { "speed": f"{speed:.2f}", "target_speed": f"{desired_speed:.2f}", "steer": f"{steer:.2f}", "throttle": f"{throttle:.2f}", "brake": f"{brake}", "junction": f"{junction:.2f}", "light_state": f"{traffic_light_state:.2f}", "stop_sign": f"{stop_sign:.2f}" } return steer, throttle, brake, metadata # ============================================================================== # SECTION 3: HUGGING FACE MODEL CONFIGURATION # ============================================================================== class InterfuserConfig(PretrainedConfig): model_type = "interfuser" def __init__(self, img_size=224, patch_size=8, in_chans=3, embed_dim=768, enc_depth=6, dec_depth=6, dim_feedforward=2048, normalize_before=False, rgb_backbone_name="r26", lidar_backbone_name="r26", num_heads=8, dropout=0.1, waypoints_pred_head="linear-sum", use_view_embed=True, **kwargs): super().__init__(**kwargs) self.img_size = img_size self.patch_size = patch_size self.in_chans = in_chans self.embed_dim = embed_dim self.enc_depth = enc_depth self.dec_depth = dec_depth self.dim_feedforward = dim_feedforward self.normalize_before = normalize_before self.rgb_backbone_name = rgb_backbone_name self.lidar_backbone_name = lidar_backbone_name self.num_heads = num_heads self.dropout = dropout self.waypoints_pred_head = waypoints_pred_head self.use_view_embed = use_view_embed @dataclass class InterfuserControlOutput(ModelOutput): steer: torch.FloatTensor = None throttle: torch.FloatTensor = None brake: torch.FloatTensor = None waypoints: Optional[torch.FloatTensor] = None traffic_predictions: Optional[torch.FloatTensor] = None metadata: Optional[dict] = None # ============================================================================== # SECTION 4: CORE PREDICTION MODEL # ============================================================================== class Interfuser(nn.Module): def __init__(self, config: InterfuserConfig): super().__init__() self.config = config embed_dim = config.embed_dim act_layer = nn.GELU backbone_map = {"r50": resnet50d, "r26": resnet26d, "r18": resnet18d} rgb_backbone = backbone_map.get(config.rgb_backbone_name, resnet26d)(pretrained=True, in_chans=3, features_only=True, out_indices=[4]) lidar_backbone = backbone_map.get(config.lidar_backbone_name, resnet26d)(pretrained=False, in_chans=3, features_only=True, out_indices=[4]) self.rgb_patch_embed = HybridEmbed(backbone=rgb_backbone, img_size=config.img_size, patch_size=config.patch_size, in_chans=3, embed_dim=embed_dim) self.lidar_patch_embed = HybridEmbed(backbone=lidar_backbone, img_size=config.img_size, patch_size=config.patch_size, in_chans=3, embed_dim=embed_dim) self.use_view_embed = config.use_view_embed if self.use_view_embed: self.view_embed = nn.Parameter(torch.zeros(1, embed_dim, 5, 1)) self.global_embed = nn.Parameter(torch.zeros(1, embed_dim, 5)) nn.init.uniform_(self.view_embed) nn.init.uniform_(self.global_embed) self.query_pos_embed = nn.Parameter(torch.zeros(1, embed_dim, 11)) self.query_embed = nn.Parameter(torch.zeros(400 + 11, 1, embed_dim)) nn.init.uniform_(self.query_pos_embed) nn.init.uniform_(self.query_embed) self.waypoints_generator = LinearWaypointsPredictor(embed_dim, cumsum=True) self.junction_pred_head = nn.Linear(embed_dim, 2) self.traffic_light_pred_head = nn.Linear(embed_dim, 2) self.stop_sign_head = nn.Linear(embed_dim, 2) self.traffic_pred_head = nn.Sequential(nn.Linear(embed_dim + 32, 64), nn.ReLU(), nn.Linear(64, 7), nn.Sigmoid()) self.position_encoding = PositionEmbeddingSine(embed_dim // 2, normalize=True) encoder_layer = TransformerEncoderLayer(embed_dim, config.num_heads, config.dim_feedforward, config.dropout, act_layer, config.normalize_before) self.encoder = TransformerEncoder(encoder_layer, config.enc_depth, None) decoder_layer = TransformerDecoderLayer(embed_dim, config.num_heads, config.dim_feedforward, config.dropout, act_layer, config.normalize_before) decoder_norm = nn.LayerNorm(embed_dim) self.decoder = TransformerDecoder(decoder_layer, config.dec_depth, decoder_norm, return_intermediate=False) def forward_features(self, images, lidar): features = [] token_embeds = [self.rgb_patch_embed(images[:, i]) for i in range(4)] token_embeds.append(self.lidar_patch_embed(lidar)) for i, token_embed in enumerate(token_embeds): pos_embed = self.position_encoding(token_embed) if self.use_view_embed: token_embed += self.view_embed[:, :, i:i+1, :] spatial_token = (token_embed + pos_embed).flatten(2).permute(2, 0, 1) global_token = torch.mean(token_embed, [2,3], keepdim=False)[:,:,None].permute(2,0,1) if self.use_view_embed: global_token += self.global_embed[:,:,i:i+1].permute(2,0,1) features.extend([spatial_token, global_token]) return torch.cat(features, 0) def forward(self, inputs): images = torch.stack([inputs['rgb'], inputs['rgb_left'], inputs['rgb_right'], inputs['rgb_center']], dim=1) lidar = inputs['lidar'] measurements = inputs['measurements'] target_point = inputs['target_point'] bs = images.shape[0] features = self.forward_features(images, lidar) tgt = self.position_encoding(torch.ones((bs, 1, 20, 20), device=images.device)).flatten(2) tgt = torch.cat([tgt, self.query_pos_embed.repeat(bs, 1, 1)], 2) tgt = tgt.permute(2, 0, 1) memory = self.encoder(features) hs = self.decoder(self.query_embed.repeat(1, bs, 1), memory, query_pos=tgt)[0].permute(1, 0, 2) traffic_feature, is_junction_feature, waypoints_feature = hs[:, :400], hs[:, 400], hs[:, 401:411] waypoints = self.waypoints_generator(waypoints_feature, measurements) is_junction = self.junction_pred_head(is_junction_feature) traffic_light_state = self.traffic_light_pred_head(is_junction_feature) stop_sign = self.stop_sign_head(is_junction_feature) velocity = measurements[:, 6:7].unsqueeze(-1).repeat(1, 400, 32) traffic_feature_with_vel = torch.cat([traffic_feature, velocity], dim=2) traffic = self.traffic_pred_head(traffic_feature_with_vel) return traffic, waypoints, is_junction, traffic_light_state, stop_sign, traffic_feature # ============================================================================== # SECTION 5: THE FINAL INTEGRATED MODEL FOR HUGGING FACE # ============================================================================== class InterfuserForEndToEndControl(PreTrainedModel): config_class = InterfuserConfig def __init__(self, config: InterfuserConfig): super().__init__(config) self.model = Interfuser(config) self.controller = InterfuserController(ControllerConfig()) self.WAYPOINT_SCALE_FACTOR = 5.0 def _init_weights(self, module): if hasattr(module, 'reset_parameters'): module.reset_parameters() elif isinstance(module, nn.Linear): module.weight.data.normal_(mean=0.0, std=0.02) if module.bias is not None: module.bias.data.zero_() def forward( self, rgb: torch.FloatTensor, rgb_left: torch.FloatTensor, rgb_right: torch.FloatTensor, rgb_center: torch.FloatTensor, lidar: torch.FloatTensor, measurements: torch.FloatTensor, target_point: torch.FloatTensor, return_dict: Optional[bool] = None, ) -> Union[Tuple, InterfuserControlOutput]: return_dict = return_dict if return_dict is not None else self.config.use_return_dict if self.training: raise NotImplementedError("This end-to-end model is designed for inference only.") if rgb.shape[0] > 1: raise NotImplementedError("End-to-end control model currently supports batch_size=1 only.") inputs_for_model = {"rgb": rgb, "rgb_left": rgb_left, "rgb_right": rgb_right, "rgb_center": rgb_center, "lidar": lidar, "measurements": measurements, "target_point": target_point} (traffic, waypoints, is_junction, traffic_light_state, stop_sign, _) = self.model(inputs_for_model) speed_mps = measurements[0, 6].item() traffic_np = traffic[0].detach().cpu().numpy().reshape(20, 20, -1) waypoints_np = waypoints[0].detach().cpu().numpy().reshape(-1, 2) * self.WAYPOINT_SCALE_FACTOR steer, throttle, brake, metadata = self.controller.run_step( speed=speed_mps, waypoints=waypoints_np, junction=is_junction.sigmoid()[0, 1].item(), traffic_light_state=traffic_light_state.sigmoid()[0, 0].item(), stop_sign=stop_sign.sigmoid()[0, 1].item(), meta_data=traffic_np ) if not return_dict: return (steer, throttle, brake) return InterfuserControlOutput( steer=torch.tensor(steer, device=self.device), throttle=torch.tensor(throttle, device=self.device), brake=torch.tensor(brake, device=self.device), waypoints=waypoints, traffic_predictions=traffic, metadata=metadata ) def reset(self): self.controller = InterfuserController(ControllerConfig()) print("Control logic has been reset.")