# RL_Car_Agent — app.py
# (Hugging Face Space upload "Upload 3 files", commit 9c0402d, by IncreasingLoss)
import gc
import math
import threading
import time
from collections import deque

import gradio as gr
import numpy as np
import pygame
from PIL import Image

from car_game_classes_gradio import Car, RayCasting, MapGenerator
from ppo_agent_gradio import PPOAgent
# Initialize Pygame
pygame.init()
SCREEN_WIDTH, SCREEN_HEIGHT = 1600, 900
screen = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
clock = pygame.time.Clock()
# Define constants
ACTION_SPACE = 8
state_size = 22
LOADED_MODEL = "model/18.6_car_model_epoch_270_lvl3_final.pth"
SIMULATION_FPS = 120 # Fixed simulation FPS, matching training
STREAM_FPS = 24
DT = 1.0 / SIMULATION_FPS # Fixed time step
DIFFICULTY = 2
# NEW: Display scaling option
ENABLE_DISPLAY_SCALING = True # Set to False to disable downscaling
DISPLAY_WIDTH, DISPLAY_HEIGHT = 800, 450 # Target display resolution
# Optimized frame buffering with deque
frame_buffer = deque(maxlen=2) # Only keep 2 latest frames
buffer_lock = threading.Lock()
stream_counter = 0
stream_interval = SIMULATION_FPS // STREAM_FPS # Stream every ~5 simulation frames
# Control variables
simulation_running = False
simulation_thread = None
# NEW: Performance monitoring
performance_stats = {
'frame_count': 0,
'last_gc': time.time(),
'gc_interval': 30.0 # Run garbage collection every 30 seconds
}
def point_to_line_distance(point, line_p0, line_p1):
p = pygame.math.Vector2(point)
a = pygame.math.Vector2(line_p0)
b = pygame.math.Vector2(line_p1)
ap = p - a
ab = b - a
magnitude_ab = ab.length_squared()
if magnitude_ab == 0:
return ap.length()
projection = ap.dot(ab) / magnitude_ab
projection = max(0, min(1, projection))
closest = a + projection * ab
return p.distance_to(closest)
class TrainingEnvironment:
def __init__(self, action_space=8):
self.action_space = action_space
self.map_generator = MapGenerator(SCREEN_WIDTH, SCREEN_HEIGHT, DIFFICULTY, 18)
car_surf = pygame.image.load("assets/car_transparent.png").convert_alpha()
w, h = car_surf.get_size()
self.car_image = pygame.transform.scale(car_surf, (w//18, h//18))
self.start_pos = None
self.start_dir = None
self.previous_speed = 0.0
self.speed_history = deque(maxlen=5)
self.steering_history = deque(maxlen=10)
self.steering_persistence = {'direction': 0, 'count': 0}
self.last_steering_reward = 0.0
self.prev_pos = None
self.passed_goals = []
self.last_action = 0
# NEW: Cached map surface
self.map_surface = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
self.reset()
def generate_new_map(self):
start_pos, start_dir = self.map_generator.update()
self.start_pos = pygame.math.Vector2(start_pos)
self.start_dir = pygame.math.Vector2(start_dir).normalize()
# NEW: Draw map onto cached surface once after generation
self.map_surface.fill((255, 255, 255)) # Clear previous map
self.map_generator.draw(self.map_surface) # Render map to cached surface
def reset_car(self):
if self.start_pos is None or self.start_dir is None:
raise ValueError("Map not generated yet")
self.car = Car(self.car_image, self.start_pos)
self.car.direction = self.start_dir
self.rays = RayCasting(
self.car.pos, self.car.direction, self.map_generator.surface,
[-60, -30, -15, 0, 15, 30, 60],
[14, 22, 27, 26, 27, 22, 14],
draw_rays=False)
self.current_goal_index = 0
self.passed_goals = []
self.prev_pos = self.car.pos.copy()
self.previous_speed = 0.0
self.speed_history.clear()
self.steering_history.clear()
self.steering_persistence = {'direction': 0, 'count': 0}
self.last_steering_reward = 0.0
self.last_action = 0
def reset(self):
self.map_generator.difficulty = DIFFICULTY
self.generate_new_map()
self.reset_car()
def detect_steering_need(self, ray_distances):
norm_distances = [min(d / self.rays.max_distance, 1.0) for d in ray_distances]
left_rays = norm_distances[0:3]
center_ray = norm_distances[3]
right_rays = norm_distances[4:7]
left_avg = sum(left_rays) / len(left_rays)
right_avg = sum(right_rays) / len(right_rays)
space_difference = right_avg - left_avg
steering_needed = False
preferred_direction = 0
urgency = 0.0
min_side_distance = min(min(left_rays), min(right_rays))
if min_side_distance < 0.3:
steering_needed = True
urgency = 1.0 - min_side_distance
preferred_direction = 1 if left_avg < right_avg else -1
elif abs(space_difference) > 0.15:
steering_needed = True
urgency = min(abs(space_difference), 1.0)
preferred_direction = 1 if space_difference > 0 else -1
elif center_ray < 0.4:
steering_needed = True
urgency = 1.0 - center_ray
preferred_direction = 1 if left_avg < right_avg else -1
return {
'needed': steering_needed,
'direction': preferred_direction,
'urgency': urgency,
'space_diff': space_difference,
'left_space': left_avg,
'right_space': right_avg,
'center_distance': center_ray
}
def get_state(self):
self.rays.update(self.car.pos, self.car.direction)
ray_distances = self.rays.get_ray_distances()
normalized_ray_distances = [min(distance / self.rays.max_distance, 1.0) for distance in ray_distances]
steering_info = self.detect_steering_need(ray_distances)
state = [
self.car.velocity / self.car.max_velocity,
self.car.direction.x,
self.car.direction.y,
int(self.car.is_braking),
]
state.extend(normalized_ray_distances)
state.extend([
float(steering_info['needed']),
steering_info['direction'],
steering_info['urgency'],
steering_info['space_diff'],
steering_info['left_space'],
steering_info['right_space'],
steering_info['center_distance']
])
total_goals = len(self.map_generator.goals)
progress = self.current_goal_index / max(total_goals, 1)
state.append(progress)
state.append(self.last_action / 4.0)
speed_change = self.car.velocity - self.previous_speed
self.speed_history.append(speed_change)
avg_speed_change = sum(self.speed_history) / max(len(self.speed_history), 1)
state.extend([speed_change / 100.0, avg_speed_change / 100.0])
self.previous_speed = self.car.velocity
return np.array(state, dtype=np.float32)
def execute_action_dispatch(self, action, dt):
action_int = int(action)
self.last_action = action_int
keys = {'w': False, 'a': False, 'd': False, 's': False}
steer_factor = 1.0
if action_int == 0:
keys['w'] = True
elif action_int == 1:
keys['a'] = True
steer_factor = 1.0
elif action_int == 2:
keys['d'] = True
steer_factor = 1.0
elif action_int == 3:
keys['w'] = True
keys['a'] = True
steer_factor = 0.8
elif action_int == 4:
keys['w'] = True
keys['d'] = True
steer_factor = 0.8
elif action_int == 5:
keys['s'] = True
original_brake = self.car.brake_deceleration
self.car.brake_deceleration *= 0.6
self.car.update(dt, keys, steer_factor)
self.car.brake_deceleration = original_brake
return
elif action_int == 6:
keys['s'] = True
keys['a'] = True
steer_factor = 1.2
elif action_int == 7:
keys['s'] = True
keys['d'] = True
steer_factor = 1.2
self.car.update(dt, keys, steer_factor)
def detect_curve_severity(self, ray_distances, steering_info):
normalized_ray_distances = [min(distance / self.rays.max_distance, 1.0) for distance in ray_distances]
longest_distance = max(normalized_ray_distances)
longest_index = normalized_ray_distances.index(longest_distance)
main_ray_index = 3
if longest_index == main_ray_index:
return 0.0
left_adjacent_index = max(0, longest_index - 1)
right_adjacent_index = min(len(normalized_ray_distances) - 1, longest_index + 1)
left_adjacent_distance = normalized_ray_distances[left_adjacent_index]
right_adjacent_distance = normalized_ray_distances[right_adjacent_index]
if left_adjacent_distance > right_adjacent_distance:
longer_adjacent_index = left_adjacent_index
longer_adjacent_distance = left_adjacent_distance
else:
longer_adjacent_index = right_adjacent_index
longer_adjacent_distance = right_adjacent_distance
weight = longer_adjacent_distance / longest_distance
ray_angles = [-60, -30, -15, 0, 15, 30, 60]
longest_ray_angle = ray_angles[longest_index]
longer_adjacent_angle = ray_angles[longer_adjacent_index]
longest_pos_angle = longest_ray_angle + weight * (longer_adjacent_angle - longest_ray_angle)
front_ray_angle = ray_angles[main_ray_index]
angle_difference = abs(longest_pos_angle - front_ray_angle)
if angle_difference >= 50:
curve_severity = 1.0
else:
curve_severity = angle_difference / 40.0
return curve_severity
def calculate_reward(self):
self.rays.update(self.car.pos, self.car.direction)
ray_distances = self.rays.get_ray_distances()
collision = self.rays.has_zero_length_ray()
if collision:
return -100.0, collision, False
steering_info = self.detect_steering_need(ray_distances)
reward = 0.0
curve_severity = self.detect_curve_severity(ray_distances, steering_info)
center_distance = ray_distances[3] / self.rays.max_distance
if center_distance > 0.25:
reward += 0.25 * self.car.velocity - 6
if curve_severity > 0.1:
if curve_severity > 0.5:
if 0.125 < self.car.velocity < 0.25:
reward += self.car.velocity / 10
else:
reward += -2
elif curve_severity > 0.25:
if 0.15 < self.car.velocity < 0.35:
reward += self.car.velocity / 12
else:
reward += -2
else:
if 0.35 < self.car.velocity < 0.7:
reward += self.car.velocity / 10
else:
reward += -2
else:
if self.car.velocity > 40:
reward += self.car.velocity / 20
elif self.car.velocity < 25:
reward += -0.5
if hasattr(self, 'speed_consistency_tracker'):
self.speed_consistency_tracker = {'appropriate': False, 'count': 0}
if steering_info['needed']:
steering_urgency = steering_info['urgency']
base_steering_reward = 5.0 + (steering_urgency * 4.0)
correct_action = False
if steering_info['direction'] == -1:
if self.last_action in [1, 3, 6]:
correct_action = True
if self.steering_persistence['direction'] == -1:
self.steering_persistence['count'] += 2
persistence_bonus = min(3.0, self.steering_persistence['count'] * 0.5)
base_steering_reward += persistence_bonus
else:
self.steering_persistence = {'direction': -1, 'count': 1}
if curve_severity > 0.4 and self.last_action == 6:
base_steering_reward += 5.0
elif steering_info['direction'] == 1:
if self.last_action in [2, 4, 7]:
correct_action = True
if self.steering_persistence['direction'] == 1:
self.steering_persistence['count'] += 2
persistence_bonus = min(3.0, self.steering_persistence['count'] * 0.5)
base_steering_reward += persistence_bonus
else:
self.steering_persistence = {'direction': 1, 'count': 1}
if curve_severity > 0.4 and self.last_action == 7:
base_steering_reward += 5.0
if correct_action:
reward += base_steering_reward
else:
self.steering_persistence = {'direction': 0, 'count': 0}
if steering_urgency > 0.6:
penalty = -4.0 * steering_urgency
reward += penalty
else:
self.steering_persistence = {'direction': 0, 'count': 0}
speed_factor = self.car.velocity / self.car.max_velocity
min_side_distance = min([ray_distances[i] for i in [0, 1, 2, 4, 5, 6]]) / self.rays.max_distance
center_distance = ray_distances[3] / self.rays.max_distance
if min_side_distance < 0.01:
danger_penalty = -5.0 * (1.0 - min_side_distance * 12.5) * (0.5 + speed_factor * 0.5)
reward += danger_penalty
elif min_side_distance < 0.0166:
if speed_factor > 0.7:
minor_penalty = -2 * (1.0 - min_side_distance * 6.67)
reward += minor_penalty
if 0.01666 <= min_side_distance and center_distance > 0.2:
safety_efficiency_bonus = 0.3
if 0.3 <= speed_factor <= 0.8:
safety_efficiency_bonus += 0.2
reward += safety_efficiency_bonus
total_goals = len(self.map_generator.goals)
if self.current_goal_index < total_goals:
goal = self.map_generator.goals[self.current_goal_index]
dist = point_to_line_distance(self.car.pos, goal[0], goal[1])
if dist < 8:
reward += 7.5
self.passed_goals.append(self.current_goal_index)
self.current_goal_index += 1
all_goals_completed = self.current_goal_index >= total_goals
if all_goals_completed:
reward += 20.0
reward = max(-15.0, min(reward, 12.0))
self.prev_pos = pygame.math.Vector2(self.car.pos)
self.last_steering_reward = reward
return reward, collision, all_goals_completed
def scale_frame_for_display(frame_array):
    """Downsample a rendered frame for streaming, if scaling is enabled.

    Uses simple stride slicing (every Nth pixel) — cheap, no interpolation.
    Returns the array untouched when scaling is disabled or the target
    resolution is not at least 2x smaller on both axes.
    """
    if not ENABLE_DISPLAY_SCALING:
        return frame_array
    step_x = SCREEN_WIDTH // DISPLAY_WIDTH
    step_y = SCREEN_HEIGHT // DISPLAY_HEIGHT
    if step_x <= 1 or step_y <= 1:
        # Target is not meaningfully smaller; keep full resolution.
        return frame_array
    # Keep every step-th row/column (zero-copy numpy view of the input).
    return frame_array[::step_y, ::step_x]
def periodic_cleanup():
    """Force a garbage-collection pass if the configured interval elapsed.

    Reads and updates the module-level performance_stats dict; intended to
    keep long-running simulation sessions from accumulating garbage.
    """
    now = time.time()
    elapsed = now - performance_stats['last_gc']
    if elapsed > performance_stats['gc_interval']:
        gc.collect()
        performance_stats['last_gc'] = now
        print(f"Performed garbage collection at frame {performance_stats['frame_count']}")
def optimized_game_loop():
"""Optimized game loop with smart frame buffering and cleanup"""
global simulation_running, stream_counter
try:
pygame.init()
pygame.display.set_mode((1, 1), pygame.NOFRAME)
screen = pygame.Surface((SCREEN_WIDTH, SCREEN_HEIGHT))
env = TrainingEnvironment(ACTION_SPACE)
env.reset()
agent = PPOAgent(state_size, ACTION_SPACE)
agent.load(LOADED_MODEL)
steps = 0
last_time = time.time()
while simulation_running:
current_time = time.time()
# Handle pygame events (minimal processing)
pygame.event.pump() # More efficient than get()
# AI logic
state = env.get_state()
action, _ = agent.select_action(state)
action_int = int(action)
env.execute_action_dispatch(action_int, DT)
reward, collision, all_goals_completed = env.calculate_reward()
env.rays.update(env.car.pos, env.car.direction)
if collision or all_goals_completed:
env.reset()
steps += 1
stream_counter += 1
performance_stats['frame_count'] += 1
# Only generate frames at streaming intervals
if stream_counter >= stream_interval:
stream_counter = 0
# Render frame using cached map surface
screen.blit(env.map_surface, (0, 0)) # Blit cached map
env.car.draw(screen) # Draw dynamic car
# Draw rays
for i, (angle, offset) in enumerate(zip([-60, -30, -15, 0, 15, 30, 60], [14, 22, 27, 26, 27, 22, 14])):
if i < len(env.rays.last_collisions):
ray_dir = env.car.direction.rotate(angle).normalize()
origin = env.car.pos + ray_dir * offset
end_point = env.rays.last_collisions[i]
pygame.draw.line(screen, (255, 255, 0),
(int(origin.x), int(origin.y)),
(int(end_point.x), int(end_point.y)),
width=2)
# Convert to numpy array directly (faster than PIL)
frame_array = pygame.surfarray.array3d(screen)
frame_array = np.transpose(frame_array, (1, 0, 2)) # Correct orientation
# NEW: Scale frame for display if enabled
display_frame = scale_frame_for_display(frame_array)
# Thread-safe frame buffering
with buffer_lock:
frame_buffer.append(display_frame.copy())
# NEW: Periodic cleanup to prevent slowdown
if performance_stats['frame_count'] % 1000 == 0: # Every 1000 frames
periodic_cleanup()
# Maintain simulation FPS
clock.tick(SIMULATION_FPS)
except Exception as e:
print(f"Error in game loop: {e}")
simulation_running = False
def get_frame():
    """Return the newest buffered frame, or a black placeholder.

    Called by the Gradio timer; never raises — any failure degrades to a
    black image sized to match the current display mode.
    """
    def _black_frame():
        # Placeholder sized to whichever resolution is being streamed.
        if ENABLE_DISPLAY_SCALING:
            return np.zeros((DISPLAY_HEIGHT, DISPLAY_WIDTH, 3), dtype=np.uint8)
        return np.zeros((SCREEN_HEIGHT, SCREEN_WIDTH, 3), dtype=np.uint8)

    try:
        with buffer_lock:
            if frame_buffer:
                return frame_buffer[-1]
            return _black_frame()
    except Exception:
        return _black_frame()
def start_simulation():
    """Launch the background simulation thread; no-op if already running.

    Returns a human-readable status string for the UI status box.
    """
    global simulation_running, simulation_thread
    if simulation_running:
        return "Simulation Already Running"
    simulation_running = True
    # Reset performance counters so GC cadence starts fresh.
    performance_stats['frame_count'] = 0
    performance_stats['last_gc'] = time.time()
    simulation_thread = threading.Thread(target=optimized_game_loop, daemon=True)
    simulation_thread.start()
    if ENABLE_DISPLAY_SCALING:
        scale_info = f" (Display: {DISPLAY_WIDTH}x{DISPLAY_HEIGHT})"
    else:
        scale_info = " (Display: Full Resolution)"
    return f"Simulation Started{scale_info}"
def stop_simulation():
    """Signal the simulation thread to exit and drop any buffered frames.

    Returns a status string for the UI.
    """
    global simulation_running
    simulation_running = False
    # Discard stale frames so a restart begins with a clean stream.
    with buffer_lock:
        frame_buffer.clear()
    return "Simulation Stopped"
def update_difficulty(input_diff_lvl):
    """Set the global difficulty level; takes effect on the next env reset."""
    global DIFFICULTY
    DIFFICULTY = input_diff_lvl
    status_message = f"Difficulty set to {input_diff_lvl}"
    return status_message
def toggle_display_scaling(enabled):
    """Enable or disable downscaling of streamed frames.

    Returns a status string describing the new mode.
    """
    global ENABLE_DISPLAY_SCALING
    ENABLE_DISPLAY_SCALING = enabled
    if enabled:
        scale_info = f"enabled ({DISPLAY_WIDTH}x{DISPLAY_HEIGHT})"
    else:
        scale_info = "disabled (full resolution)"
    return f"Display scaling {scale_info}"
css = """
#game-image {
height: auto !important;
object-fit: contain;
margin: 0 auto;
display: block;
}
.gr-button {
margin: 5px;
}
"""
def create_optimized_interface():
    """Assemble and return the Gradio Blocks UI for the simulation stream."""
    with gr.Blocks(css=css, title="AI Car Steering Simulation") as demo:
        gr.Markdown("# AI Car Steering Simulation")
        gr.Markdown("*Simulation runs at 1600x900 internally. Display scaling can be enabled for better performance.*")
        with gr.Row():
            # Left column: the live game stream.
            with gr.Column(scale=4):
                stream_image = gr.Image(
                    type="numpy",
                    label="Game Stream",
                    elem_id="game-image",
                    show_download_button=False,
                    show_share_button=False,
                    interactive=False,
                    streaming=True
                )
            # Right column: controls, settings and status readout.
            with gr.Column(scale=1):
                gr.Markdown("### Controls")
                btn_start = gr.Button("Start Simulation", variant="primary")
                btn_stop = gr.Button("Stop Simulation", variant="secondary")
                gr.Markdown("### Settings")
                slider_difficulty = gr.Slider(
                    minimum=1,
                    maximum=2,
                    step=1,
                    value=2,
                    label="Difficulty Level"
                )
                checkbox_scaling = gr.Checkbox(
                    value=ENABLE_DISPLAY_SCALING,
                    label=f"Enable Display Scaling ({DISPLAY_WIDTH}x{DISPLAY_HEIGHT})",
                    info="Reduces display resolution for better performance. Simulation stays at 1600x900."
                )
                gr.Markdown("### Status")
                status_box = gr.Textbox(
                    value="Ready to start",
                    label="Status",
                    interactive=False
                )
        # Wire UI events to the module-level handlers.
        btn_start.click(fn=start_simulation, outputs=status_box)
        btn_stop.click(fn=stop_simulation, outputs=status_box)
        slider_difficulty.change(fn=update_difficulty, inputs=slider_difficulty, outputs=status_box)
        checkbox_scaling.change(fn=toggle_display_scaling, inputs=checkbox_scaling, outputs=status_box)
        # Push a fresh frame to the image at the streaming rate.
        timer = gr.Timer(value=1.0/STREAM_FPS)
        timer.tick(fn=get_frame, outputs=stream_image)
    # Queue settings tuned for Hugging Face Spaces.
    demo.queue(
        max_size=10,
        api_open=False
    )
    return demo
if __name__ == "__main__":
demo = create_optimized_interface()
demo.launch(
share=False,
show_error=True,
quiet=False
)