---
tags:
- CartPole-v1
- reinforce
- reinforcement-learning
- custom-implementation
- deep-rl-class
model-index:
- name: CartPole
  results:
  - task:
      type: reinforcement-learning
      name: reinforcement-learning
    dataset:
      name: CartPole-v1
      type: CartPole-v1
    metrics:
    - type: mean_reward
      value: 500.00 +/- 0.00
      name: mean_reward
      verified: false
---

# **Reinforce** Agent playing **CartPole-v1**

This is a trained model of a **Reinforce** agent playing **CartPole-v1**.

```python
# ----------- Libraries -----------
import numpy as np
from collections import deque

import matplotlib.pyplot as plt
# Notebook magic; remove this line if running as a plain Python script
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gym
# import gym_pygame  # only needed for the pygame-based environments of the course, not for CartPole

# Run on GPU when available, otherwise fall back to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ----------- Environment -----------
env_id = "CartPole-v1"
# Create the env
env = gym.make(env_id)

# Create the evaluation env
eval_env = gym.make(env_id)

# Get the state space and action space
s_size = env.observation_space.shape[0]
a_size = env.action_space.n

# ----------- Policy -----------
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        # Create two fully connected layers
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        # Define the forward pass:
        # the state goes through fc1, then we apply the ReLU activation function
        x = F.relu(self.fc1(x))
        # the fc1 output goes through fc2
        x = self.fc2(x)
        # We output a softmax over the actions
        return F.softmax(x, dim=1)

    def act(self, state):
        """
        Given a state, sample an action from the policy.
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

# ----------- Reinforce -----------
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Helps us calculate the score during training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        # Compute the discounted return at each timestep t,
        # as the reward at time t plus the gamma-discounted return at time t+1 (G_(t+1)),
        # in O(N) time, where N is the number of time steps
        # (this definition of the discounted return G_t follows the definition of this quantity
        # shown on page 44 of Sutton & Barto 2017, 2nd draft):
        # G_t = r_(t+1) + gamma*r_(t+2) + gamma^2*r_(t+3) + ...
        # Given this formulation, the return at each timestep t can be computed
        # by re-using the already computed future return G_(t+1):
        # G_t = r_(t+1) + gamma*G_(t+1)
        # G_(t-1) = r_t + gamma*G_t
        # (this follows a dynamic programming approach, in which we memorize solutions
        # to avoid computing them multiple times).
        # This is correct since the above is equivalent to (see also page 46 of Sutton & Barto 2017, 2nd draft)
        # G_(t-1) = r_t + gamma*r_(t+1) + gamma^2*r_(t+2) + ...
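        # Concrete illustration with hypothetical numbers: for rewards = [1, 1, 1] and gamma = 0.9,
        # working backwards gives G_2 = 1.0, G_1 = 1 + 0.9*1.0 = 1.9, G_0 = 1 + 0.9*1.9 = 2.71.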
        ## Given the above, we calculate the return at timestep t as:
        ##   G_t = reward[t] + gamma * G_(t+1)
        ## We compute this starting from the last timestep to the first, in order
        ## to employ the formula presented above and avoid redundant computations that would be needed
        ## if we were to do it from first to last.
        ## Hence, the deque "returns" will hold the returns in chronological order, from t=0 to t=n_steps-1,
        ## thanks to the appendleft() function, which appends to position 0 in constant time O(1);
        ## a normal Python list would instead require O(N) to do this.
        for t in range(n_steps)[::-1]:
            disc_return_t = returns[0] if len(returns) > 0 else 0
            returns.appendleft(gamma * disc_return_t + rewards[t])

        ## Standardization of the returns is employed to make training more stable.
        ## eps is the smallest representable float; it is added to the standard deviation
        ## of the returns to avoid numerical instabilities (division by zero).
        eps = np.finfo(np.float32).eps.item()
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

        # Line 7 of pseudocode: compute the policy loss
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        policy_loss = torch.cat(policy_loss).sum()

        # Line 8 of pseudocode: gradient ascent on the objective, implemented as
        # gradient descent on the negated loss, since PyTorch optimizers minimize
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print("Episode {}\tAverage Score: {:.2f}".format(i_episode, np.mean(scores_deque)))

    return scores

# ----------- Training Hyperparameters -----------
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 100,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

# ----------- Policy and optimizer -----------
# Create the policy and place it on the device
cartpole_policy = Policy(
    cartpole_hyperparameters["state_space"],
    cartpole_hyperparameters["action_space"],
    cartpole_hyperparameters["h_size"],
).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])

# ----------- Training -----------
scores = reinforce(
    cartpole_policy,
    cartpole_optimizer,
    cartpole_hyperparameters["n_training_episodes"],
    cartpole_hyperparameters["max_t"],
    cartpole_hyperparameters["gamma"],
    100,
)
```
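
The `mean_reward` metric above comes from rolling out the trained policy on the evaluation environment. A minimal evaluation sketch, reusing `eval_env`, `cartpole_policy`, and `cartpole_hyperparameters` from the script above (the helper name `evaluate_agent` is illustrative, not part of the training script), might look like this:

```python
# Minimal evaluation sketch; assumes the variables defined in the training script are in scope.
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
    """Run the policy for n_eval_episodes episodes and return the mean and std of the episode rewards."""
    episode_rewards = []
    for _ in range(n_eval_episodes):
        state = env.reset()
        total_reward = 0.0
        for _ in range(max_steps):
            # Sample an action from the stochastic policy
            action, _ = policy.act(state)
            state, reward, done, _ = env.step(action)
            total_reward += reward
            if done:
                break
        episode_rewards.append(total_reward)
    return np.mean(episode_rewards), np.std(episode_rewards)

mean_reward, std_reward = evaluate_agent(
    eval_env,
    cartpole_hyperparameters["max_t"],
    cartpole_hyperparameters["n_evaluation_episodes"],
    cartpole_policy,
)
print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
```

Because `act` samples actions from the policy distribution, the reported mean and standard deviation can vary slightly between evaluation runs.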