| import gymnasium as gym |
| from gymnasium.vector import AsyncVectorEnv |
| import numpy as np |
| import tensorflow as tf |
| import json |
| import os |
| import time |
|
|
| from reward_shaping import LunarLanderRewardShaping |
| from config import * |
|
|
def _resume_json_default(value):
    """Convert NumPy scalars to builtin Python values so ``json`` can encode them."""
    if isinstance(value, np.generic):
        return value.item()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def save_resume_data(filepath, timesteps, episodes):
    """
    Saves the current training state to a JSON file located at the specified filepath.

    The file holds the keys "timesteps", "episode_count" and "date" (local time,
    formatted as YYYY-MM-DD HH:MM:SS).

    Saving is best-effort: any failure is printed and swallowed so the caller is
    never interrupted. The previous file is only replaced after the new content
    has been fully serialized and written, so a failed save leaves it intact.

    Args:
        filepath: Destination path of the JSON file.
        timesteps: Timestep counter to store (builtin numbers or NumPy scalars).
        episodes: Episode counter to store (builtin numbers or NumPy scalars).
    """
    data = {
        "timesteps": timesteps,
        "episode_count": episodes,
        "date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    }
    tmp_path = f"{filepath}.tmp"
    try:
        # Serialize before touching the disk: an encoding error must not
        # truncate the existing resume file.
        payload = json.dumps(data, default=_resume_json_default)
        with open(tmp_path, 'w') as f:
            f.write(payload)
        # Swap the finished file into place in a single step.
        os.replace(tmp_path, filepath)
    except Exception as e:
        print(f"Error saving resume data to {filepath}: {e}")
|
|
def load_resume_data(filepath):
    """
    Reads the training state previously stored as JSON at the given filepath.

    Returns a (timesteps, episode_count) tuple. Keys absent from the file
    default to 0. When the file does not exist, or reading/parsing it fails
    (the error is printed), the result is (0, 0).
    """
    fresh_start = (0, 0)

    # Nothing saved yet: start from zero without printing anything.
    if not os.path.exists(filepath):
        return fresh_start

    try:
        with open(filepath, 'r') as handle:
            saved = json.load(handle)
        step_count = saved.get("timesteps", 0)
        episode_count = saved.get("episode_count", 0)
        return (step_count, episode_count)
    except Exception as e:
        print(f"Error loading resume data from {filepath}: {e}. Starting from scratch.")

    return fresh_start
|
|
def make_env(env_id, seed, idx, **kwargs):
    """
    Returns a zero-argument factory that builds one wrapped environment.

    Nothing is constructed until the returned callable is invoked. When called,
    it creates the environment via gym.make(env_id, **kwargs), wraps it with
    LunarLanderRewardShaping and then RecordEpisodeStatistics, and seeds the
    action and observation spaces with seed + idx.
    """
    def _build():
        # Wrapper order matters: reward shaping is applied first, episode
        # statistics are recorded on top of it.
        env = gym.wrappers.RecordEpisodeStatistics(
            LunarLanderRewardShaping(gym.make(env_id, **kwargs))
        )

        # Offsetting by idx gives each environment index its own space seed.
        space_seed = seed + idx
        env.action_space.seed(space_seed)
        env.observation_space.seed(space_seed)

        return env

    return _build
|
|
def make_parallel_envs(env_id, num_envs, seed):
    """
    Builds num_envs environment factories and hands them to an AsyncVectorEnv.

    Factory number `rank` (0 .. num_envs - 1) is created with
    make_env(env_id, seed, rank); the resulting AsyncVectorEnv is returned.
    """
    return AsyncVectorEnv(
        [make_env(env_id, seed, rank) for rank in range(num_envs)]
    )
|
|
def calculate_gae(rewards, values, terminated, truncated, next_value, gamma=GAMMA, gae_lambda=GAE_LAMBDA):
    """
    Calculates Generalized Advantage Estimation (GAE) and Returns (R) from rollout data.

    The rollout length is taken from len(rewards), so the function works for
    any number of steps rather than only for the configured N_STEPS.

    Args:
        rewards: Rewards indexed by time step along the first axis.
        values: Value estimates indexed the same way as rewards.
        terminated: Per-step termination flags; each entry must support `|`
            and `.astype` (e.g. NumPy boolean arrays or scalars).
        truncated: Per-step truncation flags, same requirements as terminated.
        next_value: Value estimate used to bootstrap after the last step.
        gamma: Discount factor.
        gae_lambda: GAE smoothing factor.

    Returns:
        (advantages, returns): advantages is a float32 array shaped like
        rewards; returns is advantages + values.
    """
    num_steps = len(rewards)
    advantages = np.zeros_like(rewards, dtype=np.float32)
    last_gae_lambda = 0

    # Walk the rollout backwards so each step can reuse the advantage that was
    # already accumulated for the step after it.
    for t in reversed(range(num_steps)):
        # 0.0 where step t ended an episode (terminated OR truncated), 1.0
        # otherwise; this stops bootstrapping and accumulation across episodes.
        # NOTE(review): truncated steps are masked exactly like terminated
        # ones, i.e. no value bootstrap on time-limit cut-offs — confirm this
        # is intended.
        done_mask = 1.0 - (terminated[t] | truncated[t]).astype(np.float32)

        # The last step bootstraps from next_value, all others from the value
        # estimate of the following step.
        following_value = next_value if t == num_steps - 1 else values[t + 1]

        delta = rewards[t] + gamma * following_value * done_mask - values[t]

        advantages[t] = delta + gamma * gae_lambda * done_mask * last_gae_lambda
        last_gae_lambda = advantages[t]

    returns = advantages + values
    return advantages, returns