# Hugging Face Space application (page status when captured: Sleeping)
import gc
import threading
import time
import traceback
from collections import deque

import gradio as gr
import numpy as np
import pygame
from PIL import Image

from car_game_classes_gradio import Car, RayCasting, MapGenerator
from ppo_agent_gradio import PPOAgent
# Initialize Pygame
pygame.init()
SCREEN_WIDTH, SCREEN_HEIGHT = 1600, 900
# Off-screen surface at simulation resolution. optimized_game_loop() creates
# its own local surface, so this module-level one is not what gets streamed.
screen = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
# Shared clock used by the game loop to pace itself to SIMULATION_FPS.
clock = pygame.time.Clock()
# Define constants
ACTION_SPACE = 8
# Length of the observation vector built by TrainingEnvironment.get_state():
# 4 car values + 7 rays + 7 steering hints + progress + last action + 2 speed deltas.
state_size = 22
LOADED_MODEL = "model/18.6_car_model_epoch_270_lvl3_final.pth"
SIMULATION_FPS = 120 # Fixed simulation FPS, matching training
STREAM_FPS = 24
DT = 1.0 / SIMULATION_FPS # Fixed time step
# Track difficulty; copied into the map generator on every environment reset
# and overwritten at runtime by update_difficulty().
DIFFICULTY = 2
# NEW: Display scaling option
ENABLE_DISPLAY_SCALING = True # Set to False to disable downscaling
DISPLAY_WIDTH, DISPLAY_HEIGHT = 800, 450 # Target display resolution
# Optimized frame buffering with deque
frame_buffer = deque(maxlen=2) # Only keep 2 latest frames
# Guards frame_buffer, which is written by the simulation thread and read by get_frame().
buffer_lock = threading.Lock()
# Simulation steps since the last streamed frame.
stream_counter = 0
stream_interval = SIMULATION_FPS // STREAM_FPS # Stream every 5th simulation frame (120 // 24)
# Control variables
# Flag polled by the game loop; set by start_simulation()/stop_simulation().
simulation_running = False
# Thread object running optimized_game_loop(), or None before the first start.
simulation_thread = None
# NEW: Performance monitoring
performance_stats = {
    'frame_count': 0,
    'last_gc': time.time(),
    'gc_interval': 30.0 # Run garbage collection every 30 seconds
}
def point_to_line_distance(point, line_p0, line_p1):
    """Return the shortest distance from ``point`` to the segment ``line_p0``-``line_p1``.

    All three arguments are converted with ``pygame.math.Vector2``, so anything
    that constructor accepts may be passed.  The closest point is
    found by projecting ``point`` onto the segment and clamping the projection
    parameter to the segment's end points; a zero-length segment is treated as
    a single point.
    """
    Vec2 = pygame.math.Vector2
    target = Vec2(point)
    seg_start = Vec2(line_p0)
    seg_end = Vec2(line_p1)

    to_target = target - seg_start
    segment = seg_end - seg_start
    segment_len_sq = segment.length_squared()

    # Degenerate segment: both end points coincide.
    if segment_len_sq == 0:
        return to_target.length()

    # Parameter of the projection along the segment, clamped to [0, 1] so the
    # closest point never leaves the segment.
    t = to_target.dot(segment) / segment_len_sq
    t = min(1, t)
    t = max(0, t)

    nearest = seg_start + t * segment
    return target.distance_to(nearest)
class TrainingEnvironment:
    """Driving environment coupling a generated track, a car and its ray sensors.

    The environment builds the 22-value observation consumed by the agent
    (:meth:`get_state`), applies discrete actions to the car
    (:meth:`execute_action_dispatch`) and scores every step with a shaped
    reward (:meth:`calculate_reward`).

    Args:
        action_space: Number of discrete actions; stored on the instance.
    """

    # Sensor layout: ray headings in degrees relative to the car direction and
    # the matching per-ray offsets handed to RayCasting.  Index 3 (0 degrees)
    # is the forward-facing ray.
    _RAY_ANGLES = (-60, -30, -15, 0, 15, 30, 60)
    _RAY_OFFSETS = (14, 22, 27, 26, 27, 22, 14)
    _CENTER_RAY_INDEX = 3

    # action id -> (keys held down, steering factor passed to Car.update).
    _ACTION_TABLE = {
        0: (('w',), 1.0),
        1: (('a',), 1.0),
        2: (('d',), 1.0),
        3: (('w', 'a'), 0.8),
        4: (('w', 'd'), 0.8),
        5: (('s',), 1.0),
        6: (('s', 'a'), 1.2),
        7: (('s', 'd'), 1.2),
    }
    # Straight braking is applied with a softened brake (see below).
    _SOFT_BRAKE_ACTION = 5

    def __init__(self, action_space=8):
        self.action_space = action_space
        # NOTE(review): 18 is used both as MapGenerator's 4th argument and as
        # the sprite down-scale divisor below; presumably one shared scale
        # factor - confirm against MapGenerator.
        self.map_generator = MapGenerator(SCREEN_WIDTH, SCREEN_HEIGHT, DIFFICULTY, 18)
        car_surf = pygame.image.load("assets/car_transparent.png").convert_alpha()
        w, h = car_surf.get_size()
        self.car_image = pygame.transform.scale(car_surf, (w // 18, h // 18))
        self.start_pos = None
        self.start_dir = None
        self.previous_speed = 0.0
        self.speed_history = deque(maxlen=5)
        self.steering_history = deque(maxlen=10)
        self.steering_persistence = {'direction': 0, 'count': 0}
        self.last_steering_reward = 0.0
        self.prev_pos = None
        self.passed_goals = []
        self.last_action = 0
        # Cached map surface: the track is drawn once per generated map and
        # blitted from here when rendering.
        self.map_surface = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
        self.reset()

    def generate_new_map(self):
        """Generate a new track and redraw the cached map surface."""
        start_pos, start_dir = self.map_generator.update()
        self.start_pos = pygame.math.Vector2(start_pos)
        self.start_dir = pygame.math.Vector2(start_dir).normalize()
        self.map_surface.fill((255, 255, 255))  # Clear previous map
        self.map_generator.draw(self.map_surface)

    def reset_car(self):
        """Place a fresh car and ray sensor at the start and clear episode state.

        Raises:
            ValueError: If no map has been generated yet.
        """
        if self.start_pos is None or self.start_dir is None:
            raise ValueError("Map not generated yet")
        self.car = Car(self.car_image, self.start_pos)
        self.car.direction = self.start_dir
        self.rays = RayCasting(
            self.car.pos, self.car.direction, self.map_generator.surface,
            list(self._RAY_ANGLES),
            list(self._RAY_OFFSETS),
            draw_rays=False)
        self.current_goal_index = 0
        self.passed_goals = []
        self.prev_pos = self.car.pos.copy()
        self.previous_speed = 0.0
        self.speed_history.clear()
        self.steering_history.clear()
        self.steering_persistence = {'direction': 0, 'count': 0}
        self.last_steering_reward = 0.0
        self.last_action = 0

    def reset(self):
        """Start a new episode: apply the current DIFFICULTY, new map, new car."""
        self.map_generator.difficulty = DIFFICULTY
        self.generate_new_map()
        self.reset_car()

    def detect_steering_need(self, ray_distances):
        """Summarise from the ray distances whether and where to steer.

        Distances are normalised by ``self.rays.max_distance`` and capped at 1.
        Rays 0-2 form one side ("left" in the returned keys), rays 4-6 the
        other, ray 3 points forward.

        Returns:
            dict with keys ``needed`` (bool), ``direction`` (-1, 0 or 1; 1 when
            the rays 4-6 side has more free space), ``urgency`` (float),
            ``space_diff`` (right average minus left average), ``left_space``,
            ``right_space`` and ``center_distance``.
        """
        norm_distances = [min(d / self.rays.max_distance, 1.0) for d in ray_distances]
        left_rays = norm_distances[0:3]
        center_ray = norm_distances[3]
        right_rays = norm_distances[4:7]
        left_avg = sum(left_rays) / len(left_rays)
        right_avg = sum(right_rays) / len(right_rays)
        space_difference = right_avg - left_avg

        steering_needed = False
        preferred_direction = 0
        urgency = 0.0
        min_side_distance = min(min(left_rays), min(right_rays))

        if min_side_distance < 0.3:
            # A side ray is close to a wall: steer towards the roomier side.
            steering_needed = True
            urgency = 1.0 - min_side_distance
            preferred_direction = 1 if left_avg < right_avg else -1
        elif abs(space_difference) > 0.15:
            # Noticeably more room on one side.
            steering_needed = True
            urgency = min(abs(space_difference), 1.0)
            preferred_direction = 1 if space_difference > 0 else -1
        elif center_ray < 0.4:
            # Obstacle straight ahead.
            steering_needed = True
            urgency = 1.0 - center_ray
            preferred_direction = 1 if left_avg < right_avg else -1

        return {
            'needed': steering_needed,
            'direction': preferred_direction,
            'urgency': urgency,
            'space_diff': space_difference,
            'left_space': left_avg,
            'right_space': right_avg,
            'center_distance': center_ray
        }

    def get_state(self):
        """Build the 22-value observation as a float32 array.

        Layout: normalised velocity, direction x/y, braking flag (4), the seven
        normalised ray distances (7), the steering summary (7), goal progress
        (1), ``last_action / 4.0`` (1), and the latest and averaged speed
        change divided by 100 (2).

        Side effects: updates the ray sensor, appends to ``speed_history`` and
        overwrites ``previous_speed``.
        """
        self.rays.update(self.car.pos, self.car.direction)
        ray_distances = self.rays.get_ray_distances()
        max_distance = self.rays.max_distance
        normalized_ray_distances = [min(d / max_distance, 1.0) for d in ray_distances]
        steering_info = self.detect_steering_need(ray_distances)

        state = [
            self.car.velocity / self.car.max_velocity,
            self.car.direction.x,
            self.car.direction.y,
            int(self.car.is_braking),
        ]
        state.extend(normalized_ray_distances)
        state.extend([
            float(steering_info['needed']),
            steering_info['direction'],
            steering_info['urgency'],
            steering_info['space_diff'],
            steering_info['left_space'],
            steering_info['right_space'],
            steering_info['center_distance']
        ])

        total_goals = len(self.map_generator.goals)
        progress = self.current_goal_index / max(total_goals, 1)
        state.append(progress)
        # NOTE(review): with action ids 0-7 this reaches 1.75; left untouched
        # because the loaded model presumably saw this scaling - confirm.
        state.append(self.last_action / 4.0)

        speed_change = self.car.velocity - self.previous_speed
        self.speed_history.append(speed_change)
        avg_speed_change = sum(self.speed_history) / max(len(self.speed_history), 1)
        state.extend([speed_change / 100.0, avg_speed_change / 100.0])
        self.previous_speed = self.car.velocity
        return np.array(state, dtype=np.float32)

    def execute_action_dispatch(self, action, dt):
        """Translate a discrete action into key presses and advance the car.

        Args:
            action: Action id (converted with ``int``).  Ids outside 0-7 press
                no key and use a steering factor of 1.0.
            dt: Time step forwarded to ``Car.update``.
        """
        action_int = int(action)
        self.last_action = action_int
        pressed, steer_factor = self._ACTION_TABLE.get(action_int, ((), 1.0))
        keys = {'w': False, 'a': False, 'd': False, 's': False}
        for key in pressed:
            keys[key] = True

        if action_int != self._SOFT_BRAKE_ACTION:
            self.car.update(dt, keys, steer_factor)
            return

        # Straight braking uses 60% of the normal brake force for this one
        # update.  The original value is restored even if the update raises,
        # so a failure cannot leave the car permanently weakened.
        original_brake = self.car.brake_deceleration
        self.car.brake_deceleration *= 0.6
        try:
            self.car.update(dt, keys, steer_factor)
        finally:
            self.car.brake_deceleration = original_brake

    def detect_curve_severity(self, ray_distances, steering_info):
        """Estimate how sharply the track bends, as a value in [0, 1].

        The heading of the most open direction is interpolated between the
        longest ray and its longer neighbour; its angular offset from the
        forward ray is divided by 40 degrees and capped at 1.0.

        Args:
            ray_distances: Raw distances of the seven rays.
            steering_info: Unused; kept for interface compatibility.

        Returns:
            0.0 when the forward ray is the longest (or no ray has any
            length), otherwise the capped severity.
        """
        normalized = [min(d / self.rays.max_distance, 1.0) for d in ray_distances]
        longest_distance = max(normalized)
        longest_index = normalized.index(longest_distance)
        if longest_index == self._CENTER_RAY_INDEX:
            return 0.0
        # Every ray is zero: there is no open direction to measure, and the
        # interpolation weight below would divide by zero.
        if longest_distance == 0:
            return 0.0

        left_index = max(0, longest_index - 1)
        right_index = min(len(normalized) - 1, longest_index + 1)
        if normalized[left_index] > normalized[right_index]:
            neighbour_index = left_index
        else:
            neighbour_index = right_index

        weight = normalized[neighbour_index] / longest_distance
        longest_angle = self._RAY_ANGLES[longest_index]
        neighbour_angle = self._RAY_ANGLES[neighbour_index]
        open_angle = longest_angle + weight * (neighbour_angle - longest_angle)
        angle_difference = abs(open_angle - self._RAY_ANGLES[self._CENTER_RAY_INDEX])

        # Cap at 1.0 so the result is monotonic; previously offsets between
        # 40 and 50 degrees produced values above 1.0 while 50+ gave exactly 1.0.
        return min(angle_difference / 40.0, 1.0)

    def _speed_reward(self, ray_distances, curve_severity):
        """Reward for the current speed given free road ahead and curve severity."""
        velocity = self.car.velocity
        reward = 0.0
        center_distance = ray_distances[self._CENTER_RAY_INDEX] / self.rays.max_distance
        if center_distance > 0.25:
            reward += 0.25 * velocity - 6

        if curve_severity > 0.1:
            # NOTE(review): these bands (0.125-0.7) look like normalised speeds
            # while the straight branch below uses raw values (25/40); the raw
            # velocity is compared in both - confirm the intended units.
            if curve_severity > 0.5:
                low, high, divisor = 0.125, 0.25, 10
            elif curve_severity > 0.25:
                low, high, divisor = 0.15, 0.35, 12
            else:
                low, high, divisor = 0.35, 0.7, 10
            if low < velocity < high:
                reward += velocity / divisor
            else:
                reward += -2
        else:
            if velocity > 40:
                reward += velocity / 20
            elif velocity < 25:
                reward += -0.5
            # Never assigned anywhere in this class, so this only fires when
            # outside code has attached the attribute.
            if hasattr(self, 'speed_consistency_tracker'):
                self.speed_consistency_tracker = {'appropriate': False, 'count': 0}
        return reward

    def _steering_reward(self, steering_info, curve_severity):
        """Reward steering the suggested way; track how long it is sustained."""
        if not steering_info['needed']:
            self.steering_persistence = {'direction': 0, 'count': 0}
            return 0.0

        urgency = steering_info['urgency']
        direction = steering_info['direction']
        if direction == -1:
            matching_actions, brake_turn_action = (1, 3, 6), 6
        elif direction == 1:
            matching_actions, brake_turn_action = (2, 4, 7), 7
        else:
            matching_actions, brake_turn_action = (), None

        if self.last_action in matching_actions:
            bonus = 5.0 + (urgency * 4.0)
            if self.steering_persistence['direction'] == direction:
                # Same direction as on the previous step: growing bonus, capped at 3.
                self.steering_persistence['count'] += 2
                bonus += min(3.0, self.steering_persistence['count'] * 0.5)
            else:
                self.steering_persistence = {'direction': direction, 'count': 1}
            # Extra credit for braking while turning in a pronounced curve.
            if curve_severity > 0.4 and self.last_action == brake_turn_action:
                bonus += 5.0
            return bonus

        self.steering_persistence = {'direction': 0, 'count': 0}
        if urgency > 0.6:
            return -4.0 * urgency
        return 0.0

    def _proximity_reward(self, ray_distances):
        """Penalise hugging a wall; reward keeping clearance at moderate speed."""
        max_distance = self.rays.max_distance
        speed_factor = self.car.velocity / self.car.max_velocity
        min_side_distance = min(ray_distances[i] for i in (0, 1, 2, 4, 5, 6)) / max_distance
        center_distance = ray_distances[self._CENTER_RAY_INDEX] / max_distance

        reward = 0.0
        if min_side_distance < 0.01:
            reward += -5.0 * (1.0 - min_side_distance * 12.5) * (0.5 + speed_factor * 0.5)
        elif min_side_distance < 0.0166:
            if speed_factor > 0.7:
                reward += -2 * (1.0 - min_side_distance * 6.67)

        if 0.01666 <= min_side_distance and center_distance > 0.2:
            safety_efficiency_bonus = 0.3
            if 0.3 <= speed_factor <= 0.8:
                safety_efficiency_bonus += 0.2
            reward += safety_efficiency_bonus
        return reward

    def calculate_reward(self):
        """Score the current step.

        Returns:
            ``(reward, collision, all_goals_completed)``.  On collision the
            reward is -100.0 and nothing else is updated; otherwise the shaped
            reward is clamped to [-15, 12].

        Side effects: updates the ray sensor, ``steering_persistence``,
        ``passed_goals``, ``current_goal_index``, ``prev_pos`` and
        ``last_steering_reward``.
        """
        self.rays.update(self.car.pos, self.car.direction)
        ray_distances = self.rays.get_ray_distances()
        collision = self.rays.has_zero_length_ray()
        if collision:
            return -100.0, collision, False

        steering_info = self.detect_steering_need(ray_distances)
        curve_severity = self.detect_curve_severity(ray_distances, steering_info)

        reward = 0.0
        reward += self._speed_reward(ray_distances, curve_severity)
        reward += self._steering_reward(steering_info, curve_severity)
        reward += self._proximity_reward(ray_distances)

        # Goal lines: passing within 8 units of the next one counts as reached.
        total_goals = len(self.map_generator.goals)
        if self.current_goal_index < total_goals:
            goal = self.map_generator.goals[self.current_goal_index]
            dist = point_to_line_distance(self.car.pos, goal[0], goal[1])
            if dist < 8:
                reward += 7.5
                self.passed_goals.append(self.current_goal_index)
                self.current_goal_index += 1

        all_goals_completed = self.current_goal_index >= total_goals
        if all_goals_completed:
            reward += 20.0

        reward = max(-15.0, min(reward, 12.0))
        self.prev_pos = pygame.math.Vector2(self.car.pos)
        self.last_steering_reward = reward
        return reward, collision, all_goals_completed
def scale_frame_for_display(frame_array):
    """Return ``frame_array`` downsampled for streaming when scaling is enabled.

    The stride on each axis is the integer ratio between the simulation and
    display resolution.  Downsampling only happens when both ratios exceed 1;
    in every other case the input is returned unchanged.
    """
    if ENABLE_DISPLAY_SCALING:
        stride_x = SCREEN_WIDTH // DISPLAY_WIDTH
        stride_y = SCREEN_HEIGHT // DISPLAY_HEIGHT
        if stride_x > 1 and stride_y > 1:
            # Keep every stride_y-th row and every stride_x-th column.
            return frame_array[::stride_y, ::stride_x]
    return frame_array
def periodic_cleanup():
    """Run a garbage collection if ``gc_interval`` seconds passed since the last one."""
    now = time.time()
    elapsed = now - performance_stats['last_gc']
    if not (elapsed > performance_stats['gc_interval']):
        return
    gc.collect()  # Force garbage collection
    performance_stats['last_gc'] = now
    print(f"Performed garbage collection at frame {performance_stats['frame_count']}")
def _render_stream_frame(screen, env):
    """Draw map, car and sensor rays on ``screen`` and return it as a row-major array."""
    screen.blit(env.map_surface, (0, 0))  # Blit cached map
    env.car.draw(screen)  # Draw dynamic car

    # One yellow line per ray, from the ray origin (car position shifted along
    # the ray heading by its offset) to the last recorded collision point.
    ray_layout = zip([-60, -30, -15, 0, 15, 30, 60], [14, 22, 27, 26, 27, 22, 14])
    for i, (angle, offset) in enumerate(ray_layout):
        if i < len(env.rays.last_collisions):
            ray_dir = env.car.direction.rotate(angle).normalize()
            origin = env.car.pos + ray_dir * offset
            end_point = env.rays.last_collisions[i]
            pygame.draw.line(screen, (255, 255, 0),
                             (int(origin.x), int(origin.y)),
                             (int(end_point.x), int(end_point.y)),
                             width=2)

    # array3d is indexed [x][y]; swap the first two axes to get rows first.
    frame_array = pygame.surfarray.array3d(screen)
    return np.transpose(frame_array, (1, 0, 2))


def optimized_game_loop():
    """Run the agent-driven simulation until ``simulation_running`` is cleared.

    Each iteration lets the agent pick an action, advances the car, resets the
    environment after a collision or when all goals are passed, and pushes
    every ``stream_interval``-th frame into ``frame_buffer``.  Any exception
    is reported with its traceback and clears ``simulation_running``.
    """
    global simulation_running, stream_counter
    try:
        pygame.init()
        # A display mode must exist before images can be converted; a 1x1
        # borderless window is the smallest one.
        pygame.display.set_mode((1, 1), pygame.NOFRAME)
        screen = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
        env = TrainingEnvironment(ACTION_SPACE)
        env.reset()
        agent = PPOAgent(state_size, ACTION_SPACE)
        agent.load(LOADED_MODEL)

        while simulation_running:
            # Handle pygame events (minimal processing)
            pygame.event.pump()

            # AI logic
            state = env.get_state()
            action, _ = agent.select_action(state)
            env.execute_action_dispatch(int(action), DT)
            # The reward itself is not needed here, only the episode flags.
            _reward, collision, all_goals_completed = env.calculate_reward()
            env.rays.update(env.car.pos, env.car.direction)
            if collision or all_goals_completed:
                env.reset()

            stream_counter += 1
            performance_stats['frame_count'] += 1

            # Only generate frames at streaming intervals
            if stream_counter >= stream_interval:
                stream_counter = 0
                display_frame = scale_frame_for_display(_render_stream_frame(screen, env))
                # Thread-safe frame buffering
                with buffer_lock:
                    frame_buffer.append(display_frame.copy())

            # Periodic cleanup check, every 1000 frames
            if performance_stats['frame_count'] % 1000 == 0:
                periodic_cleanup()

            # Maintain simulation FPS
            clock.tick(SIMULATION_FPS)
    except Exception as e:
        print(f"Error in game loop: {e}")
        # The message alone rarely identifies the failing call; keep the stack.
        traceback.print_exc()
        simulation_running = False
def get_frame():
    """Return the newest buffered frame, or a black frame when none is available.

    The black fallback has the display resolution when display scaling is
    enabled and the simulation resolution otherwise.  It is also returned if
    reading the buffer raises.
    """
    def _blank_frame():
        if ENABLE_DISPLAY_SCALING:
            shape = (DISPLAY_HEIGHT, DISPLAY_WIDTH, 3)
        else:
            shape = (SCREEN_HEIGHT, SCREEN_WIDTH, 3)
        return np.zeros(shape, dtype=np.uint8)

    try:
        with buffer_lock:
            # The newest frame sits at the right end of the deque.
            return frame_buffer[-1] if frame_buffer else _blank_frame()
    except Exception:
        return _blank_frame()
def start_simulation():
    """Start the simulation thread and return a status message.

    Returns "Simulation Already Running" when the run flag is set.  If a
    previous loop thread is still alive (for example it was stopped while
    still starting up), this waits up to two seconds for it to finish; a new
    loop is never started next to an old one, because setting the run flag
    again would keep the old loop alive as well.
    """
    global simulation_running, simulation_thread
    if simulation_running:
        return "Simulation Already Running"

    previous = simulation_thread
    if previous is not None and previous.is_alive():
        previous.join(timeout=2.0)
        if previous.is_alive():
            return "Simulation is still stopping, please try again"

    simulation_running = True
    # Reset performance stats
    performance_stats['frame_count'] = 0
    performance_stats['last_gc'] = time.time()
    simulation_thread = threading.Thread(target=optimized_game_loop, daemon=True)
    simulation_thread.start()
    scale_info = f" (Display: {DISPLAY_WIDTH}x{DISPLAY_HEIGHT})" if ENABLE_DISPLAY_SCALING else " (Display: Full Resolution)"
    return f"Simulation Started{scale_info}"
def stop_simulation():
    """Stop the simulation thread, drop buffered frames and return a status message.

    After clearing the run flag this waits up to two seconds for the loop
    thread to finish before clearing the buffer, so a frame rendered during
    the final iteration cannot reappear after the stop.
    """
    global simulation_running
    simulation_running = False

    worker = simulation_thread
    if (worker is not None
            and worker is not threading.current_thread()
            and worker.is_alive()):
        worker.join(timeout=2.0)

    # Clear frame buffer
    with buffer_lock:
        frame_buffer.clear()
    return "Simulation Stopped"
def update_difficulty(input_diff_lvl):
    """Store the difficulty chosen in the UI and return a status message.

    The value is coerced to ``int`` because the slider may deliver it as a
    float, while the module initialises DIFFICULTY as an int and hands it to
    the map generator on every reset.
    """
    global DIFFICULTY
    level = int(input_diff_lvl)
    DIFFICULTY = level
    return f"Difficulty set to {level}"
def toggle_display_scaling(enabled):
    """Switch display downscaling on or off and return a status message."""
    global ENABLE_DISPLAY_SCALING
    ENABLE_DISPLAY_SCALING = enabled
    if enabled:
        scale_info = f"enabled ({DISPLAY_WIDTH}x{DISPLAY_HEIGHT})"
    else:
        scale_info = "disabled (full resolution)"
    return f"Display scaling {scale_info}"
# Custom CSS handed to gr.Blocks: styles the element with id "game-image"
# (the stream image) and adds a margin around buttons.
css = """
#game-image {
height: auto !important;
object-fit: contain;
margin: 0 auto;
display: block;
}
.gr-button {
margin: 5px;
}
"""
def create_optimized_interface():
    """Build and return the Gradio Blocks app.

    Layout: the game stream in a wide left column, and controls, settings and
    a status box in a narrow right column.  A timer polls ``get_frame`` at
    ``STREAM_FPS`` to refresh the image.
    """
    with gr.Blocks(css=css, title="AI Car Steering Simulation") as demo:
        gr.Markdown("# AI Car Steering Simulation")
        gr.Markdown("*Simulation runs at 1600x900 internally. Display scaling can be enabled for better performance.*")

        with gr.Row():
            with gr.Column(scale=4):
                stream_image = gr.Image(
                    type="numpy",
                    label="Game Stream",
                    elem_id="game-image",
                    show_download_button=False,
                    show_share_button=False,
                    interactive=False,
                    streaming=True,
                )

            with gr.Column(scale=1):
                gr.Markdown("### Controls")
                start_button = gr.Button("Start Simulation", variant="primary")
                stop_button = gr.Button("Stop Simulation", variant="secondary")

                gr.Markdown("### Settings")
                difficulty_input = gr.Slider(
                    minimum=1, maximum=2, step=1, value=2,
                    label="Difficulty Level",
                )
                # Display scaling toggle
                scaling_input = gr.Checkbox(
                    value=ENABLE_DISPLAY_SCALING,
                    label=f"Enable Display Scaling ({DISPLAY_WIDTH}x{DISPLAY_HEIGHT})",
                    info="Reduces display resolution for better performance. Simulation stays at 1600x900.",
                )

                gr.Markdown("### Status")
                status_box = gr.Textbox(
                    value="Ready to start",
                    label="Status",
                    interactive=False,
                )

        # Event handlers: every control reports its result in the status box.
        start_button.click(fn=start_simulation, outputs=status_box)
        stop_button.click(fn=stop_simulation, outputs=status_box)
        difficulty_input.change(fn=update_difficulty, inputs=difficulty_input, outputs=status_box)
        scaling_input.change(fn=toggle_display_scaling, inputs=scaling_input, outputs=status_box)

        # Timer driving the frame updates at the streaming rate.
        frame_timer = gr.Timer(value=1.0/STREAM_FPS)
        frame_timer.tick(fn=get_frame, outputs=stream_image)

    # Configure for HF Spaces
    demo.queue(max_size=10, api_open=False)
    return demo
if __name__ == "__main__":
    # Build the interface and serve it when the file is run as a script.
    # The app is kept in the module-level name ``demo``.
    demo = create_optimized_interface()
    demo.launch(
        share=False,
        show_error=True,
        quiet=False
    )