"""Gradio front-end that streams a PPO-driven car simulation rendered with pygame.

The simulation runs at a fixed 120 FPS in a background thread; rendered frames
are pushed into a small thread-safe buffer and streamed to the browser at 24 FPS.
"""

import gradio as gr
import numpy as np
import pygame
import threading
import time
from PIL import Image  # NOTE(review): unused here (frames go out as numpy); kept for compatibility
from car_game_classes_gradio import Car, RayCasting, MapGenerator
from ppo_agent_gradio import PPOAgent
from collections import deque
import gc

# Initialize Pygame. Rendering targets an off-screen Surface; no real window
# is created at module level.
pygame.init()
SCREEN_WIDTH, SCREEN_HEIGHT = 1600, 900
screen = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
clock = pygame.time.Clock()

# Simulation / model constants.
ACTION_SPACE = 8
state_size = 22
LOADED_MODEL = "model/18.6_car_model_epoch_270_lvl3_final.pth"
SIMULATION_FPS = 120  # Fixed simulation FPS, matching training
STREAM_FPS = 24
DT = 1.0 / SIMULATION_FPS  # Fixed time step
DIFFICULTY = 2

# Ray-caster geometry, shared by construction, curve detection and drawing so
# the three sites can never drift apart.
RAY_ANGLES = [-60, -30, -15, 0, 15, 30, 60]   # Degrees relative to car heading.
RAY_OFFSETS = [14, 22, 27, 26, 27, 22, 14]    # Ray origin offsets from car centre.

# Display scaling option.
ENABLE_DISPLAY_SCALING = True  # Set to False to disable downscaling
DISPLAY_WIDTH, DISPLAY_HEIGHT = 800, 450  # Target display resolution

# Optimized frame buffering with deque.
frame_buffer = deque(maxlen=2)  # Only keep the 2 latest frames
buffer_lock = threading.Lock()
stream_counter = 0
stream_interval = SIMULATION_FPS // STREAM_FPS  # Stream every ~5 simulation frames

# Control variables.
simulation_running = False
simulation_thread = None

# Performance monitoring.
performance_stats = {
    'frame_count': 0,
    'last_gc': time.time(),
    'gc_interval': 30.0,  # Run garbage collection every 30 seconds
}


def point_to_line_distance(point, line_p0, line_p1):
    """Return the shortest distance from *point* to the segment (line_p0, line_p1).

    Used to detect goal-line crossings. Falls back to plain point distance when
    the segment is degenerate (both endpoints coincide).
    """
    p = pygame.math.Vector2(point)
    a = pygame.math.Vector2(line_p0)
    b = pygame.math.Vector2(line_p1)
    ap = p - a
    ab = b - a
    magnitude_ab = ab.length_squared()
    if magnitude_ab == 0:
        # Degenerate segment: distance to the single point.
        return ap.length()
    projection = ap.dot(ab) / magnitude_ab
    projection = max(0, min(1, projection))  # Clamp onto the segment.
    closest = a + projection * ab
    return p.distance_to(closest)


class TrainingEnvironment:
    """Car-on-track environment: map, car, ray sensors, state vector and reward."""

    def __init__(self, action_space=8):
        self.action_space = action_space
        self.map_generator = MapGenerator(SCREEN_WIDTH, SCREEN_HEIGHT, DIFFICULTY, 18)
        car_surf = pygame.image.load("assets/car_transparent.png").convert_alpha()
        w, h = car_surf.get_size()
        self.car_image = pygame.transform.scale(car_surf, (w // 18, h // 18))
        self.start_pos = None
        self.start_dir = None
        self.previous_speed = 0.0
        self.speed_history = deque(maxlen=5)      # Recent per-step speed deltas.
        self.steering_history = deque(maxlen=10)
        self.steering_persistence = {'direction': 0, 'count': 0}
        self.last_steering_reward = 0.0
        self.prev_pos = None
        self.passed_goals = []
        self.last_action = 0
        # Cached map surface: the static track is rendered once per map so the
        # hot loop only re-blits it instead of redrawing every frame.
        self.map_surface = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
        self.reset()

    def generate_new_map(self):
        """Generate a fresh track and pre-render it onto the cached map surface."""
        start_pos, start_dir = self.map_generator.update()
        self.start_pos = pygame.math.Vector2(start_pos)
        self.start_dir = pygame.math.Vector2(start_dir).normalize()
        self.map_surface.fill((255, 255, 255))  # Clear previous map.
        self.map_generator.draw(self.map_surface)  # Render map to cached surface.

    def reset_car(self):
        """Place a new car at the map's start position and rebuild its ray sensors."""
        if self.start_pos is None or self.start_dir is None:
            raise ValueError("Map not generated yet")
        self.car = Car(self.car_image, self.start_pos)
        self.car.direction = self.start_dir
        # NOTE(review): rays sample map_generator.surface (collision mask), not
        # the cached display surface — presumed intentional; confirm they match.
        self.rays = RayCasting(
            self.car.pos, self.car.direction, self.map_generator.surface,
            RAY_ANGLES, RAY_OFFSETS, draw_rays=False)
        self.current_goal_index = 0
        self.passed_goals = []
        self.prev_pos = self.car.pos.copy()
        self.previous_speed = 0.0
        self.speed_history.clear()
        self.steering_history.clear()
        self.steering_persistence = {'direction': 0, 'count': 0}
        self.last_steering_reward = 0.0
        self.last_action = 0

    def reset(self):
        """Full episode reset: new map (at the current DIFFICULTY) and new car."""
        self.map_generator.difficulty = DIFFICULTY
        self.generate_new_map()
        self.reset_car()

    def detect_steering_need(self, ray_distances):
        """Decide whether steering is needed from the 7 normalized ray distances.

        Returns a dict with the preferred direction (-1 left, +1 right, 0 none),
        an urgency in [0, 1] and the raw left/right/center clearances.
        """
        norm_distances = [min(d / self.rays.max_distance, 1.0) for d in ray_distances]
        left_rays = norm_distances[0:3]
        center_ray = norm_distances[3]
        right_rays = norm_distances[4:7]
        left_avg = sum(left_rays) / len(left_rays)
        right_avg = sum(right_rays) / len(right_rays)
        space_difference = right_avg - left_avg

        steering_needed = False
        preferred_direction = 0
        urgency = 0.0
        min_side_distance = min(min(left_rays), min(right_rays))
        if min_side_distance < 0.3:
            # A side ray is dangerously short: steer towards the roomier side.
            steering_needed = True
            urgency = 1.0 - min_side_distance
            preferred_direction = 1 if left_avg < right_avg else -1
        elif abs(space_difference) > 0.15:
            # Clear asymmetry between sides: follow the open space.
            steering_needed = True
            urgency = min(abs(space_difference), 1.0)
            preferred_direction = 1 if space_difference > 0 else -1
        elif center_ray < 0.4:
            # Wall ahead: pick whichever side has more room.
            steering_needed = True
            urgency = 1.0 - center_ray
            preferred_direction = 1 if left_avg < right_avg else -1
        return {
            'needed': steering_needed,
            'direction': preferred_direction,
            'urgency': urgency,
            'space_diff': space_difference,
            'left_space': left_avg,
            'right_space': right_avg,
            'center_distance': center_ray,
        }

    def get_state(self):
        """Build the 22-dim observation vector consumed by the PPO agent."""
        self.rays.update(self.car.pos, self.car.direction)
        ray_distances = self.rays.get_ray_distances()
        normalized_ray_distances = [min(distance / self.rays.max_distance, 1.0)
                                    for distance in ray_distances]
        steering_info = self.detect_steering_need(ray_distances)
        # 4 car features + 7 rays + 7 steering features + progress + last action
        # + 2 speed-change features = 22 (matches state_size).
        state = [
            self.car.velocity / self.car.max_velocity,
            self.car.direction.x,
            self.car.direction.y,
            int(self.car.is_braking),
        ]
        state.extend(normalized_ray_distances)
        state.extend([
            float(steering_info['needed']),
            steering_info['direction'],
            steering_info['urgency'],
            steering_info['space_diff'],
            steering_info['left_space'],
            steering_info['right_space'],
            steering_info['center_distance'],
        ])
        total_goals = len(self.map_generator.goals)
        progress = self.current_goal_index / max(total_goals, 1)
        state.append(progress)
        state.append(self.last_action / 4.0)
        speed_change = self.car.velocity - self.previous_speed
        self.speed_history.append(speed_change)
        avg_speed_change = sum(self.speed_history) / max(len(self.speed_history), 1)
        state.extend([speed_change / 100.0, avg_speed_change / 100.0])
        self.previous_speed = self.car.velocity
        return np.array(state, dtype=np.float32)

    def execute_action_dispatch(self, action, dt):
        """Translate a discrete action (0-7) into key presses and advance the car.

        Actions: 0 throttle, 1/2 steer L/R, 3/4 throttle+steer, 5 soft brake,
        6/7 brake+steer with amplified steering.
        """
        action_int = int(action)
        self.last_action = action_int
        keys = {'w': False, 'a': False, 'd': False, 's': False}
        steer_factor = 1.0
        if action_int == 0:
            keys['w'] = True
        elif action_int == 1:
            keys['a'] = True
            steer_factor = 1.0
        elif action_int == 2:
            keys['d'] = True
            steer_factor = 1.0
        elif action_int == 3:
            keys['w'] = True
            keys['a'] = True
            steer_factor = 0.8  # Throttle dampens steering.
        elif action_int == 4:
            keys['w'] = True
            keys['d'] = True
            steer_factor = 0.8
        elif action_int == 5:
            # Soft brake: temporarily reduce brake strength for this step only.
            keys['s'] = True
            original_brake = self.car.brake_deceleration
            self.car.brake_deceleration *= 0.6
            try:
                self.car.update(dt, keys, steer_factor)
            finally:
                # FIX: restore even if update raises, so the car is never left
                # with a weakened brake.
                self.car.brake_deceleration = original_brake
            return
        elif action_int == 6:
            keys['s'] = True
            keys['a'] = True
            steer_factor = 1.2  # Braking amplifies steering.
        elif action_int == 7:
            keys['s'] = True
            keys['d'] = True
            steer_factor = 1.2
        self.car.update(dt, keys, steer_factor)

    def detect_curve_severity(self, ray_distances, steering_info):
        """Estimate curve severity in [0, 1] from where the longest ray points.

        Interpolates an angle between the longest ray and its longer neighbour;
        the further that angle sits from straight ahead, the sharper the curve.
        """
        normalized_ray_distances = [min(distance / self.rays.max_distance, 1.0)
                                    for distance in ray_distances]
        longest_distance = max(normalized_ray_distances)
        longest_index = normalized_ray_distances.index(longest_distance)
        main_ray_index = 3  # The straight-ahead ray.
        if longest_index == main_ray_index:
            return 0.0  # Most open space is straight ahead: no curve.
        left_adjacent_index = max(0, longest_index - 1)
        right_adjacent_index = min(len(normalized_ray_distances) - 1, longest_index + 1)
        left_adjacent_distance = normalized_ray_distances[left_adjacent_index]
        right_adjacent_distance = normalized_ray_distances[right_adjacent_index]
        if left_adjacent_distance > right_adjacent_distance:
            longer_adjacent_index = left_adjacent_index
            longer_adjacent_distance = left_adjacent_distance
        else:
            longer_adjacent_index = right_adjacent_index
            longer_adjacent_distance = right_adjacent_distance
        # Weighted angle between the longest ray and its longer neighbour.
        weight = longer_adjacent_distance / longest_distance
        longest_ray_angle = RAY_ANGLES[longest_index]
        longer_adjacent_angle = RAY_ANGLES[longer_adjacent_index]
        longest_pos_angle = longest_ray_angle + weight * (longer_adjacent_angle - longest_ray_angle)
        front_ray_angle = RAY_ANGLES[main_ray_index]
        angle_difference = abs(longest_pos_angle - front_ray_angle)
        # FIX: the original returned angle/40 for differences below 50°, which
        # exceeded 1.0 in the 40°-50° band; clamp so severity is always <= 1.0.
        return min(angle_difference / 40.0, 1.0)

    def calculate_reward(self):
        """Compute (reward, collision, all_goals_completed) for the current step.

        Reward is shaped from speed appropriateness on curves, steering
        correctness/persistence, wall proximity, and goal crossings; it is
        clipped to [-15, 12] except for the -100 collision signal.
        """
        self.rays.update(self.car.pos, self.car.direction)
        ray_distances = self.rays.get_ray_distances()
        collision = self.rays.has_zero_length_ray()
        if collision:
            return -100.0, collision, False

        steering_info = self.detect_steering_need(ray_distances)
        reward = 0.0
        curve_severity = self.detect_curve_severity(ray_distances, steering_info)

        # Progress incentive when the path ahead is reasonably clear.
        center_distance = ray_distances[3] / self.rays.max_distance
        if center_distance > 0.25:
            reward += 0.25 * self.car.velocity - 6

        # Speed-vs-curve shaping. NOTE(review): the curve branches compare
        # velocity against sub-1.0 bands while the straight branch uses 25-40;
        # presumably different velocity units from training — confirm.
        if curve_severity > 0.1:
            if curve_severity > 0.5:
                if 0.125 < self.car.velocity < 0.25:
                    reward += self.car.velocity / 10
                else:
                    reward += -2
            elif curve_severity > 0.25:
                if 0.15 < self.car.velocity < 0.35:
                    reward += self.car.velocity / 12
                else:
                    reward += -2
            else:
                if 0.35 < self.car.velocity < 0.7:
                    reward += self.car.velocity / 10
                else:
                    reward += -2
        else:
            if self.car.velocity > 40:
                reward += self.car.velocity / 20
            elif self.car.velocity < 25:
                reward += -0.5

        # FIX: original used `if hasattr(...)`, which could never initialize the
        # tracker (dead code); initialize lazily instead. The tracker is
        # currently write-only, so behaviour is unchanged.
        if not hasattr(self, 'speed_consistency_tracker'):
            self.speed_consistency_tracker = {'appropriate': False, 'count': 0}

        # Steering correctness: reward matching the needed direction, with a
        # persistence bonus for holding the turn and a sharp-curve brake bonus.
        if steering_info['needed']:
            steering_urgency = steering_info['urgency']
            base_steering_reward = 5.0 + (steering_urgency * 4.0)
            correct_action = False
            if steering_info['direction'] == -1:
                if self.last_action in [1, 3, 6]:
                    correct_action = True
                    if self.steering_persistence['direction'] == -1:
                        self.steering_persistence['count'] += 2
                        persistence_bonus = min(3.0, self.steering_persistence['count'] * 0.5)
                        base_steering_reward += persistence_bonus
                    else:
                        self.steering_persistence = {'direction': -1, 'count': 1}
                    if curve_severity > 0.4 and self.last_action == 6:
                        base_steering_reward += 5.0  # Brake+steer on sharp curves.
            elif steering_info['direction'] == 1:
                if self.last_action in [2, 4, 7]:
                    correct_action = True
                    if self.steering_persistence['direction'] == 1:
                        self.steering_persistence['count'] += 2
                        persistence_bonus = min(3.0, self.steering_persistence['count'] * 0.5)
                        base_steering_reward += persistence_bonus
                    else:
                        self.steering_persistence = {'direction': 1, 'count': 1}
                    if curve_severity > 0.4 and self.last_action == 7:
                        base_steering_reward += 5.0
            if correct_action:
                reward += base_steering_reward
            else:
                self.steering_persistence = {'direction': 0, 'count': 0}
                if steering_urgency > 0.6:
                    penalty = -4.0 * steering_urgency
                    reward += penalty
        else:
            self.steering_persistence = {'direction': 0, 'count': 0}

        # Wall-proximity shaping: penalize hugging walls (more so at speed),
        # small bonus for staying clear at an efficient speed.
        speed_factor = self.car.velocity / self.car.max_velocity
        min_side_distance = min([ray_distances[i] for i in [0, 1, 2, 4, 5, 6]]) / self.rays.max_distance
        center_distance = ray_distances[3] / self.rays.max_distance
        if min_side_distance < 0.01:
            danger_penalty = -5.0 * (1.0 - min_side_distance * 12.5) * (0.5 + speed_factor * 0.5)
            reward += danger_penalty
        elif min_side_distance < 0.0166:
            if speed_factor > 0.7:
                minor_penalty = -2 * (1.0 - min_side_distance * 6.67)
                reward += minor_penalty
        if 0.01666 <= min_side_distance and center_distance > 0.2:
            safety_efficiency_bonus = 0.3
            if 0.3 <= speed_factor <= 0.8:
                safety_efficiency_bonus += 0.2
            reward += safety_efficiency_bonus

        # Goal crossing: 7.5 per gate, +20 on completing the track.
        total_goals = len(self.map_generator.goals)
        if self.current_goal_index < total_goals:
            goal = self.map_generator.goals[self.current_goal_index]
            dist = point_to_line_distance(self.car.pos, goal[0], goal[1])
            if dist < 8:
                reward += 7.5
                self.passed_goals.append(self.current_goal_index)
                self.current_goal_index += 1
        all_goals_completed = self.current_goal_index >= total_goals
        if all_goals_completed:
            reward += 20.0

        reward = max(-15.0, min(reward, 12.0))  # Clip shaped reward.
        self.prev_pos = pygame.math.Vector2(self.car.pos)
        self.last_steering_reward = reward
        return reward, collision, all_goals_completed


def scale_frame_for_display(frame_array):
    """Downscale *frame_array* for display by integer-stride subsampling.

    Returns the array unchanged when scaling is disabled or the integer scale
    factors would be <= 1.
    """
    if not ENABLE_DISPLAY_SCALING:
        return frame_array
    scale_x = SCREEN_WIDTH // DISPLAY_WIDTH
    scale_y = SCREEN_HEIGHT // DISPLAY_HEIGHT
    if scale_x > 1 and scale_y > 1:
        # Strided view: take every scale_y-th row and scale_x-th column.
        return frame_array[::scale_y, ::scale_x]
    return frame_array


def periodic_cleanup():
    """Force a garbage collection if more than gc_interval seconds have passed."""
    current_time = time.time()
    if current_time - performance_stats['last_gc'] > performance_stats['gc_interval']:
        gc.collect()
        performance_stats['last_gc'] = current_time
        print(f"Performed garbage collection at frame {performance_stats['frame_count']}")


def optimized_game_loop():
    """Background-thread simulation loop.

    Steps the environment with the PPO agent at SIMULATION_FPS; every
    stream_interval steps it renders a frame (cached map + car + rays) into the
    shared frame buffer. Runs until simulation_running is cleared.
    """
    global simulation_running, stream_counter
    try:
        pygame.init()
        pygame.display.set_mode((1, 1), pygame.NOFRAME)  # Minimal display for convert_alpha().
        screen = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
        env = TrainingEnvironment(ACTION_SPACE)
        env.reset()
        agent = PPOAgent(state_size, ACTION_SPACE)
        agent.load(LOADED_MODEL)

        while simulation_running:
            pygame.event.pump()  # Minimal event processing; cheaper than get().

            # Agent step.
            state = env.get_state()
            action, _ = agent.select_action(state)
            env.execute_action_dispatch(int(action), DT)
            reward, collision, all_goals_completed = env.calculate_reward()
            env.rays.update(env.car.pos, env.car.direction)
            if collision or all_goals_completed:
                env.reset()

            stream_counter += 1
            performance_stats['frame_count'] += 1

            # Render only at streaming intervals to keep the sim loop fast.
            if stream_counter >= stream_interval:
                stream_counter = 0
                screen.blit(env.map_surface, (0, 0))  # Static track from cache.
                env.car.draw(screen)
                # Draw the sensor rays from their offset origins to the last hits.
                for i, (angle, offset) in enumerate(zip(RAY_ANGLES, RAY_OFFSETS)):
                    if i < len(env.rays.last_collisions):
                        ray_dir = env.car.direction.rotate(angle).normalize()
                        origin = env.car.pos + ray_dir * offset
                        end_point = env.rays.last_collisions[i]
                        pygame.draw.line(screen, (255, 255, 0),
                                         (int(origin.x), int(origin.y)),
                                         (int(end_point.x), int(end_point.y)),
                                         width=2)

                # Surface -> numpy; transpose to (H, W, 3) row-major.
                frame_array = pygame.surfarray.array3d(screen)
                frame_array = np.transpose(frame_array, (1, 0, 2))
                display_frame = scale_frame_for_display(frame_array)
                with buffer_lock:
                    frame_buffer.append(display_frame.copy())

                if performance_stats['frame_count'] % 1000 == 0:  # Every 1000 steps.
                    periodic_cleanup()

            clock.tick(SIMULATION_FPS)  # Maintain fixed simulation rate.
    except Exception as e:
        print(f"Error in game loop: {e}")
        simulation_running = False


def _black_frame():
    """Return an all-black fallback frame matching the current display mode."""
    if ENABLE_DISPLAY_SCALING:
        return np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
    return np.zeros((SCREEN_HEIGHT, SCREEN_WIDTH, 3), dtype=np.uint8)


def get_frame():
    """Return the latest buffered frame, or a black frame if none is available."""
    try:
        with buffer_lock:
            if frame_buffer:
                return frame_buffer[-1]
            return _black_frame()
    except Exception:
        return _black_frame()


def start_simulation():
    """Start the background simulation thread if it is not already running."""
    global simulation_running, simulation_thread
    if not simulation_running:
        simulation_running = True
        performance_stats['frame_count'] = 0
        performance_stats['last_gc'] = time.time()
        simulation_thread = threading.Thread(target=optimized_game_loop, daemon=True)
        simulation_thread.start()
        scale_info = (f" (Display: {DISPLAY_WIDTH}x{DISPLAY_HEIGHT})"
                      if ENABLE_DISPLAY_SCALING else " (Display: Full Resolution)")
        return f"Simulation Started{scale_info}"
    return "Simulation Already Running"


def stop_simulation():
    """Signal the simulation thread to stop and drop any buffered frames."""
    global simulation_running
    simulation_running = False
    with buffer_lock:
        frame_buffer.clear()
    return "Simulation Stopped"


def update_difficulty(input_diff_lvl):
    """Set the global DIFFICULTY; takes effect on the next environment reset."""
    global DIFFICULTY
    DIFFICULTY = input_diff_lvl
    return f"Difficulty set to {input_diff_lvl}"


def toggle_display_scaling(enabled):
    """Enable/disable downscaling of streamed frames."""
    global ENABLE_DISPLAY_SCALING
    ENABLE_DISPLAY_SCALING = enabled
    scale_info = (f"enabled ({DISPLAY_WIDTH}x{DISPLAY_HEIGHT})"
                  if enabled else "disabled (full resolution)")
    return f"Display scaling {scale_info}"


css = """ #game-image { height: auto !important; object-fit: contain; margin: 0 auto; display: block; } .gr-button { margin: 5px; } """


def create_optimized_interface():
    """Build the Gradio Blocks UI: stream image, controls, settings and status."""
    with gr.Blocks(css=css, title="AI Car Steering Simulation") as demo:
        gr.Markdown("# AI Car Steering Simulation")
        gr.Markdown("*Simulation runs at 1600x900 internally. Display scaling can be enabled for better performance.*")
        with gr.Row():
            with gr.Column(scale=4):
                img = gr.Image(
                    type="numpy",
                    label="Game Stream",
                    elem_id="game-image",
                    show_download_button=False,
                    show_share_button=False,
                    interactive=False,
                    streaming=True
                )
            with gr.Column(scale=1):
                gr.Markdown("### Controls")
                start_btn = gr.Button("Start Simulation", variant="primary")
                stop_btn = gr.Button("Stop Simulation", variant="secondary")
                gr.Markdown("### Settings")
                difficulty_slider = gr.Slider(
                    minimum=1,
                    maximum=2,
                    step=1,
                    value=2,
                    label="Difficulty Level"
                )
                scaling_checkbox = gr.Checkbox(
                    value=ENABLE_DISPLAY_SCALING,
                    label=f"Enable Display Scaling ({DISPLAY_WIDTH}x{DISPLAY_HEIGHT})",
                    info="Reduces display resolution for better performance. Simulation stays at 1600x900."
                )
                gr.Markdown("### Status")
                status_text = gr.Textbox(
                    value="Ready to start",
                    label="Status",
                    interactive=False
                )

        # Event handlers.
        start_btn.click(fn=start_simulation, outputs=status_text)
        stop_btn.click(fn=stop_simulation, outputs=status_text)
        difficulty_slider.change(fn=update_difficulty, inputs=difficulty_slider, outputs=status_text)
        scaling_checkbox.change(fn=toggle_display_scaling, inputs=scaling_checkbox, outputs=status_text)

        # Timer-driven frame pull at the stream rate.
        timer = gr.Timer(value=1.0 / STREAM_FPS)  # 24 FPS streaming.
        timer.tick(fn=get_frame, outputs=img)

        # Configure for HF Spaces.
        demo.queue(
            max_size=10,
            api_open=False
        )
    return demo


if __name__ == "__main__":
    demo = create_optimized_interface()
    demo.launch(
        share=False,
        show_error=True,
        quiet=False
    )