import numpy as np

from collections import deque

import matplotlib.pyplot as plt

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gym
import gym_pygame

# Hugging Face Hub
from huggingface_hub import notebook_login  # To log in to our Hugging Face account so we can upload models to the Hub.

import imageio

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

env_id = "CartPole-v1"
# Create the env
env = gym.make(env_id, render_mode="rgb_array")

# Create the evaluation env
eval_env = gym.make(env_id, render_mode="rgb_array")

# Get the state space and action space
s_size = env.observation_space.shape[0]
a_size = env.action_space.n

print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)  # cart position, cart velocity, pole angle, pole angular velocity
print("Sample observation", env.observation_space.sample())  # Get a random observation

print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample())  # Take a random action


# Policy Gradient Network
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()[0]
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)
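# The sketch below is not part of the original notebook: it is a quick, optional
# sanity check of an (untrained) Policy network, assuming the `env`, `s_size`,
# `a_size` and `device` defined above. The `debug_*` names are illustrative only.
debug_policy = Policy(s_size, a_size, h_size=16).to(device)
debug_state = env.observation_space.sample()  # a random CartPole observation, shape (4,)
debug_action, debug_log_prob = debug_policy.act(debug_state)
print("Sampled action:", debug_action)  # an int in {0, 1}
print("Log-probability of that action:", debug_log_prob)  # a 0-dim tensor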
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Helps us to calculate the score during training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()[0]
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            # gym >= 0.26 API: step returns (obs, reward, terminated, truncated, info)
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        # Compute the discounted return at each timestep in O(N) time, where N is the
        # number of timesteps. The discounted return follows the definition on page 44
        # of Sutton & Barto (2017, 2nd draft):
        #   G_t = r_(t+1) + gamma * r_(t+2) + gamma^2 * r_(t+3) + ...
        # Given this formulation, the return at timestep t can be computed by reusing
        # the already-computed future return G_(t+1):
        #   G_t = r_(t+1) + gamma * G_(t+1)
        # This is a dynamic-programming approach: we memorize solutions to avoid
        # computing them multiple times, and it is equivalent to the full sum above
        # (see also page 46 of Sutton & Barto, 2017, 2nd draft).
        # We therefore iterate from the last timestep to the first, so that each return
        # is built from the one computed just before it, avoiding redundant work.
        # The deque "returns" ends up holding the returns in chronological order,
        # from t=0 to t=n_steps, thanks to appendleft(), which prepends in O(1) time
        # (a plain Python list would need O(N) to insert at position 0).
        for t in reversed(range(n_steps)):
            disc_return_t = returns[0] if len(returns) > 0 else 0
            returns.appendleft(gamma * disc_return_t + rewards[t])

        # Standardization of the returns is employed to make training more stable.
        # eps is the smallest representable float; it is added to the standard
        # deviation of the returns to avoid numerical instabilities.
        eps = np.finfo(np.float32).eps.item()
        returns = torch.tensor(returns)
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + eps)

        # Line 7:
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        if len(policy_loss) > 1:
            # The saved log-probabilities are 0-dim tensors, so stack (not cat) them
            policy_loss = torch.stack(policy_loss).sum()
        else:
            policy_loss = policy_loss[0]

        # Line 8: PyTorch optimizers perform gradient descent, so we minimize the
        # negative of the objective (equivalent to gradient ascent on the expected return)
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print("Episode {}\tAverage Score: {:.2f}".format(i_episode, np.mean(scores_deque)))

    return scores


cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

# Create the policy and move it to the device
cartpole_policy = Policy(
    cartpole_hyperparameters["state_space"],
    cartpole_hyperparameters["action_space"],
    cartpole_hyperparameters["h_size"],
).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])

scores = reinforce(
    cartpole_policy,
    cartpole_optimizer,
    cartpole_hyperparameters["n_training_episodes"],
    cartpole_hyperparameters["max_t"],
    cartpole_hyperparameters["gamma"],
    100,
)
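# Optional visualization (not part of the original notebook): matplotlib.pyplot is
# imported at the top but otherwise unused, so this sketch plots the per-episode
# score returned by reinforce() together with a 100-episode moving average.
window = 100
moving_avg = [np.mean(scores[max(0, i - window + 1): i + 1]) for i in range(len(scores))]
plt.figure(figsize=(10, 5))
plt.plot(scores, label="Episode score")
plt.plot(moving_avg, label=f"{window}-episode moving average")
plt.xlabel("Episode")
plt.ylabel("Score")
plt.title(f"REINFORCE training on {env_id}")
plt.legend()
plt.show()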
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
    """
    Evaluate the agent for ``n_eval_episodes`` episodes and return the average reward
    and the standard deviation of the reward.
    :param env: The evaluation environment
    :param max_steps: Maximum number of steps per evaluation episode
    :param n_eval_episodes: Number of episodes to evaluate the agent
    :param policy: The Reinforce agent
    """
    episode_rewards = []
    for episode in range(n_eval_episodes):
        state = env.reset()[0]
        step = 0
        done = False
        total_rewards_ep = 0

        for step in range(max_steps):
            action, _ = policy.act(state)
            # gym >= 0.26 API: step returns (obs, reward, terminated, truncated, info)
            new_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_rewards_ep += reward
            if done:
                break
            state = new_state
        episode_rewards.append(total_rewards_ep)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward


evaluate_agent(
    eval_env, cartpole_hyperparameters["max_t"], cartpole_hyperparameters["n_evaluation_episodes"], cartpole_policy
)

from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.repocard import metadata_eval_result, metadata_save

from pathlib import Path
import datetime
import json
import imageio
import tempfile
import os


def record_video(env, policy, out_directory, fps=30):
    """
    Generate a replay video of the agent
    :param env: the environment used for the rollout
    :param policy: the trained Reinforce policy
    :param out_directory: path of the output video file
    :param fps: how many frames per second (with taxi-v3 and frozenlake-v1 we use 1)
    """
    images = []
    done = False
    state = env.reset()[0]
    img = env.render()
    images.append(img)
    while not done:
        # Sample the next action from the policy given the current state
        action, _ = policy.act(state)
        # gym >= 0.26 API: step returns (obs, reward, terminated, truncated, info)
        state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        img = env.render()
        images.append(img)
    imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)
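# Pushing to the Hub requires an authenticated Hugging Face account. notebook_login
# is imported at the top of the notebook but never called; this optional step (not
# part of the original flow) opens the token prompt. Skip it if a token is already
# configured, e.g. via `huggingface-cli login`.
notebook_login()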
def push_to_hub(repo_id, model, hyperparameters, eval_env, video_fps=30):
    """
    Evaluate, generate a video and upload a model to the Hugging Face Hub.
    This method does the complete pipeline:
    - It evaluates the model
    - It generates the model card
    - It generates a replay video of the agent
    - It pushes everything to the Hub

    :param repo_id: id of the model repository on the Hugging Face Hub
    :param model: the pytorch model we want to save
    :param hyperparameters: training hyperparameters
    :param eval_env: evaluation environment
    :param video_fps: how many frames per second to record in our video replay
    """

    _, repo_name = repo_id.split("/")
    api = HfApi()

    # Step 1: Create the repo
    repo_url = api.create_repo(
        repo_id=repo_id,
        exist_ok=True,
    )

    # Local directory that will hold all the artifacts to upload
    local_dir = "./cartpole-v1"
    os.makedirs(local_dir, exist_ok=True)

    # Step 2: Save the model
    torch.save(model, os.path.join(local_dir, "model.pt"))

    # Step 3: Save the hyperparameters to JSON
    hyper_path = os.path.join(local_dir, "hyperparameters.json")
    with open(hyper_path, "w") as outfile:
        json.dump(hyperparameters, outfile)

    # Step 4: Evaluate the model and build the evaluation JSON
    mean_reward, std_reward = evaluate_agent(
        eval_env, hyperparameters["max_t"], hyperparameters["n_evaluation_episodes"], model
    )
    # Get datetime
    eval_datetime = datetime.datetime.now()
    eval_form_datetime = eval_datetime.isoformat()

    evaluate_data = {
        "env_id": hyperparameters["env_id"],
        "mean_reward": mean_reward,
        "n_evaluation_episodes": hyperparameters["n_evaluation_episodes"],
        "eval_datetime": eval_form_datetime,
    }

    # Write a JSON file
    result_path = os.path.join(local_dir, "results.json")
    with open(result_path, "w") as outfile:
        json.dump(evaluate_data, outfile)

    # Step 5: Create the model card
    env_name = hyperparameters["env_id"]

    metadata = {}
    metadata["tags"] = [
        env_name,
        "reinforce",
        "reinforcement-learning",
        "custom-implementation",
        "deep-rl-class",
    ]

    # Add metrics
    eval = metadata_eval_result(
        model_pretty_name=repo_name,
        task_pretty_name="reinforcement-learning",
        task_id="reinforcement-learning",
        metrics_pretty_name="mean_reward",
        metrics_id="mean_reward",
        metrics_value=f"{mean_reward:.2f} +/- {std_reward:.2f}",
        dataset_pretty_name=env_name,
        dataset_id=env_name,
    )

    # Merge both dictionaries
    metadata = {**metadata, **eval}

    model_card = f"""
# **Reinforce** Agent playing **{env_name}**
This is a trained model of a **Reinforce** agent playing **{env_name}**.
To learn to use this model and train yours check Unit 4 of the Deep Reinforcement Learning Course: https://huggingface.co/deep-rl-course/unit4/introduction
"""

    readme_path = Path(local_dir) / "README.md"
    readme = ""
    if readme_path.exists():
        with readme_path.open("r", encoding="utf8") as f:
            readme = f.read()
    else:
        readme = model_card

    with readme_path.open("w", encoding="utf-8") as f:
        f.write(readme)

    # Save our metrics to the README metadata
    metadata_save(readme_path, metadata)

    # Step 6: Record a video
    video_path = os.path.join(local_dir, "replay.mp4")
    record_video(eval_env, model, video_path, video_fps)

    # Step 7: Push everything to the Hub
    api.upload_folder(
        repo_id=repo_id,
        folder_path=local_dir,
        path_in_repo=".",
    )

    print(f"Your model is pushed to the Hub. You can view your model here: {repo_url}")


repo_id = "dlwlgus53/Reinforce_cartpol-v1"
push_to_hub(
    repo_id,
    cartpole_policy,  # The model we want to save
    cartpole_hyperparameters,  # Hyperparameters
    eval_env,  # Evaluation environment
    video_fps=30,
)