|
---
tags:
- CartPole-v1
- reinforce
- reinforcement-learning
- custom-implementation
- deep-rl-class
model-index:
- name: CartPole
  results:
  - task:
      type: reinforcement-learning
      name: reinforcement-learning
    dataset:
      name: CartPole-v1
      type: CartPole-v1
    metrics:
    - type: mean_reward
      value: 500.00 +/- 0.00
      name: mean_reward
      verified: false
---
|
|
|
# **Reinforce** Agent playing **CartPole-v1** |
|
This is a trained model of a **Reinforce** agent playing **CartPole-v1**. The full training script used to produce it is reproduced below.
|
|
|
```python
# ----------- Libraries -----------
import numpy as np
from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gym
import gym_pygame  # only needed for the pygame-based environments of the course, not for CartPole

# Train on the GPU if one is available, otherwise fall back to the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
# ------------- Environment -------------
env_id = "CartPole-v1"
# Create the env
env = gym.make(env_id)

# Create the evaluation env
eval_env = gym.make(env_id)

# Get the state space and action space sizes
s_size = env.observation_space.shape[0]
a_size = env.action_space.n
|
# ------------ Policy --------------
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        # Create two fully connected layers
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        # Define the forward pass:
        # the state goes through fc1, followed by a ReLU activation
        x = F.relu(self.fc1(x))
        # the fc1 output goes through fc2
        x = self.fc2(x)
        # We output a probability distribution over actions with a softmax
        return F.softmax(x, dim=1)

    def act(self, state):
        """
        Given a state, sample an action from the policy and return it
        together with its log-probability.
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)
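
# (Illustrative note, not part of the original script) A quick sanity check of the policy:
#   policy = Policy(s_size, a_size, 16).to(device)
#   action, log_prob = policy.act(env.reset())
# `action` is a plain Python int in {0, ..., a_size - 1}, while `log_prob` is a
# one-element tensor that keeps the graph needed for the policy-gradient update.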
|
|
|
# --------------- Reinforce --------------
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes+1):
        saved_log_probs = []
        rewards = []
        state = env.reset()
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))
|
        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        # Compute the discounted return at each timestep, i.e. the reward at time t plus
        # the gamma-discounted rewards of all later steps:
        #   G_t = r_(t+1) + gamma*r_(t+2) + gamma^2*r_(t+3) + ...
        # (this definition of the discounted return G_t follows the one given on
        # page 44 of Sutton & Barto 2017, 2nd draft).

        # The returns can be computed in O(N) time, where N is the number of time steps,
        # by re-using the already-computed future return G_(t+1) to get the current one:
        #   G_t     = r_(t+1) + gamma*G_(t+1)
        #   G_(t-1) = r_t     + gamma*G_t
        # This is a dynamic-programming approach: we memorize partial solutions instead of
        # recomputing them. It is equivalent to the sum above (see also page 46 of
        # Sutton & Barto 2017, 2nd draft):
        #   G_(t-1) = r_t + gamma*r_(t+1) + gamma^2*r_(t+2) + ...

        # We therefore iterate from the last timestep to the first and compute
        #   return[t] = reward[t] + gamma * return[t+1]
        # Going backwards lets us reuse each return and avoids the redundant computations
        # a first-to-last pass would need.

        # The deque "returns" ends up holding the returns in chronological order, from t=0
        # to t=n_steps, thanks to appendleft(), which prepends in constant O(1) time
        # (a normal Python list would instead need O(N) to insert at position 0).
        for t in range(n_steps)[::-1]:
            disc_return_t = returns[0] if len(returns) > 0 else 0
            returns.appendleft(gamma * disc_return_t + rewards[t])
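
        # (Illustrative worked example, not part of the original code) With rewards
        # [1, 1, 1] and gamma = 0.9, iterating backwards gives
        #   G_2 = 1.0,  G_1 = 1 + 0.9*1.0 = 1.9,  G_0 = 1 + 0.9*1.9 = 2.71,
        # so "returns" ends up as deque([2.71, 1.9, 1.0]) in chronological order.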
|
|
|
        ## Standardization of the returns is employed to make training more stable.
        ## eps is the float32 machine epsilon; it is added to the standard deviation of
        ## the returns to avoid numerical instabilities (e.g. division by zero).
        eps = np.finfo(np.float32).eps.item()
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)
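
        # (Illustrative note, not in the original code) For CartPole with gamma = 1.0 and a
        # reward of +1 per step, G_t = n_steps - t, so after standardization early actions
        # receive positive weights and late actions negative ones.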
|
|
|
        # Line 7: build the loss
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        policy_loss = torch.cat(policy_loss).sum()

        # Line 8: PyTorch performs gradient descent, so we minimize the negative of the
        # objective, which is equivalent to gradient ascent on the expected return
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))

    return scores
|
# ---------- Training Hyperparameters ----------
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 100,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}
|
# ---------- Policy and optimizer ----------
# Create the policy and move it to the device
cartpole_policy = Policy(
    cartpole_hyperparameters["state_space"],
    cartpole_hyperparameters["action_space"],
    cartpole_hyperparameters["h_size"],
).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])
|
# --------- Training -----------
scores = reinforce(cartpole_policy,
                   cartpole_optimizer,
                   cartpole_hyperparameters["n_training_episodes"],
                   cartpole_hyperparameters["max_t"],
                   cartpole_hyperparameters["gamma"],
                   100)
```
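
The hyperparameters above reserve `n_evaluation_episodes`, but the script stops after training. A minimal evaluation loop in the same style is sketched below; it assumes the `eval_env`, `cartpole_policy`, and `cartpole_hyperparameters` defined above, and the `evaluate_agent` name is only illustrative.

```python
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
    """Run the policy for n_eval_episodes and return the mean and std of the episode rewards."""
    episode_rewards = []
    for episode in range(n_eval_episodes):
        state = env.reset()
        total_rewards_ep = 0
        for step in range(max_steps):
            # Sample an action from the trained policy and step the environment
            action, _ = policy.act(state)
            state, reward, done, _ = env.step(action)
            total_rewards_ep += reward
            if done:
                break
        episode_rewards.append(total_rewards_ep)
    return np.mean(episode_rewards), np.std(episode_rewards)

mean_reward, std_reward = evaluate_agent(eval_env,
                                         cartpole_hyperparameters["max_t"],
                                         cartpole_hyperparameters["n_evaluation_episodes"],
                                         cartpole_policy)
print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
```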
|
|