# Policy Gradient

#### This version implements the REINFORCE policy-gradient algorithm with Keras to solve the CartPole environment


In [13]:
# %%capture
# !pip install gym==0.22
# !pip install pygame
# !apt install python-opengl
# !apt install ffmpeg
# !apt install xvfb
# !pip install pyvirtualdisplay
# !pip install pyglet==1.5.1

In [14]:
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K

import gym
from gym import spaces
from gym.utils import seeding
from gym import wrappers

from tqdm.notebook import tqdm
from collections import deque
import numpy as np
import random
from matplotlib import pyplot as plt
from sklearn.preprocessing import MinMaxScaler

import io
import base64
from IPython.display import HTML, Video
# Print the TensorFlow version in use (the recorded run below shows 2.9.2).
print(tf.__version__)

# Virtual display
from pyvirtualdisplay import Display

# Start a hidden (visible=0) 1400x900 virtual display.
# NOTE(review): presumably needed so gym can render/record video on a
# headless machine -- confirm against the runtime environment.
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

2.9.2




In [17]:
# custom model to be able to run a custom loss with parameters
class CustomModel(tf.keras.Model):
    """Keras model whose training step uses a return-weighted log-likelihood loss.

    The per-sample weights (discounted returns) cannot be passed through the
    regular Keras ``loss(y_true, y_pred)`` signature, so they are smuggled in
    through ``x``: when ``x`` is a tuple it is interpreted as
    ``(model_inputs, d_returns)``.
    """

    def custom_loss(self, y, y_pred, d_returns):
        """Return the summed policy-gradient loss ``sum(-y * log(y_pred) * d_returns)``.

        Args:
            y: one-hot encoding of the actions that were taken.
            y_pred: action probabilities produced by the model.
            d_returns: per-sample weights, broadcast against ``y``.
        """
        # A saturated softmax can output exactly 0; log(0) = -inf and
        # 0 * -inf = NaN would poison the loss and every gradient. Clipping to
        # [epsilon, 1] keeps the log finite (same guard Keras applies in its
        # own cross-entropy losses).
        safe_pred = K.clip(y_pred, K.epsilon(), 1.0)
        log_like = y * K.log(safe_pred)
        return K.sum(-log_like * d_returns)

    def train_step(self, data):
        """Run one optimization step and return the current metric values.

        Args:
            data: ``(x, y)`` or ``(x, y, sample_weight)`` as supplied by
                ``fit()`` / ``train_on_batch()``. ``x`` may be a tuple
                ``(inputs, d_returns)``; without it every sample gets a
                weight of 1.

        Returns:
            dict mapping each compiled metric name to its current value.
        """
        # Unpack the data. Its structure depends on the model and on what is
        # passed to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        # The returns are optional: previously a plain `x` left `d_return`
        # unbound and crashed with UnboundLocalError further down.
        d_return = None
        if isinstance(x, tuple):
            x, d_return = x

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            y = tf.cast(y, y_pred.dtype)
            if d_return is None:
                # Unit weights reduce the loss to a summed cross-entropy.
                d_return = tf.ones_like(y_pred[:, :1])
            else:
                d_return = tf.cast(d_return, y_pred.dtype)
            loss = self.custom_loss(y, y_pred, d_return)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value. The custom loss
        # bypasses `compiled_loss`, so it is NOT part of `self.metrics` here.
        return {m.name: m.result() for m in self.metrics}

In [18]:
class Policy:
    """REINFORCE (Monte-Carlo policy gradient) agent using a small Keras network.

    The network maps an observation to a softmax distribution over
    ``action_size`` discrete actions. After every episode the discounted,
    standardized returns are handed to the model together with the visited
    states, so that the model's custom training step can weight the
    log-likelihood of the actions that were actually taken.
    """

    def __init__(self, env=None, action_size=2):
        """Build the policy network.

        Args:
            env: gym-style environment used by ``learn``. When it exposes
                ``observation_space.shape`` the network input is sized from
                it; otherwise a 4-feature input is assumed.
            action_size: number of discrete actions.
        """
        # Hyperparameters
        self.gamma = 0.95  # Discount rate
        self.learning_rate = 1e-2

        self.env = env
        self.action_size = action_size
        self.action_space = list(range(action_size))

        # Size the input layer from the environment when possible instead of
        # hard-coding it; (4,) keeps the previous behavior as the fallback.
        obs_space = getattr(env, "observation_space", None)
        obs_shape = getattr(obs_space, "shape", None)
        self.state_shape = tuple(obs_shape) if obs_shape else (4,)

        print("action space", self.action_space)
        # Construct the policy network
        self.model = self._build_model()
        self.model.summary()

    def _build_model(self):
        """Create and compile the policy network (two 16-unit ReLU layers + softmax)."""
        # Background on custom Keras losses that take additional arguments:
        # https://medium.com/@Bloomore/how-to-write-a-custom-loss-function-with-additional-arguments-in-keras-5f193929f7a0
        x = Input(shape=self.state_shape, name='x_input')
        hidden = layers.Dense(16, activation='relu')(x)
        hidden = layers.Dense(16, activation='relu')(hidden)
        y_pred = layers.Dense(self.action_size, activation='softmax', name='y_pred')(hidden)

        optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)

        model_train = CustomModel(inputs=x, outputs=y_pred, name='train_only')
        # loss=None: the loss is computed inside the model's own train_step.
        model_train.compile(loss=None, optimizer=optimizer, metrics=['accuracy'])
        return model_train

    def _action_probs(self, state):
        """Return the action probabilities predicted for a single state."""
        batch = np.asarray([state], dtype=np.float32)
        # Calling the model directly avoids the per-call overhead of
        # `predict()`, which matters because this runs once per env step.
        return np.asarray(self.model(batch, training=False))[0]

    def act(self, state):
        """Sample an action from the current stochastic policy."""
        probs = self._action_probs(state)
        return np.random.choice(self.action_space, p=probs)

    @staticmethod
    def _discounted_returns(rewards, gamma):
        """Return G_t = r_t + gamma * G_(t+1) for every step, in chronological order.

        Computed back-to-front so each return re-uses the one after it, which
        makes the whole pass O(N) in the number of steps.
        """
        n_steps = len(rewards)
        returns = np.zeros(n_steps, dtype=np.float64)
        running = 0.0
        for t in range(n_steps - 1, -1, -1):
            running = gamma * running + rewards[t]
            returns[t] = running
        return returns

    @staticmethod
    def _standardize(values):
        """Shift/scale ``values`` to zero mean and unit variance for stabler training."""
        # eps is the float32 machine epsilon; adding it to the standard
        # deviation avoids a division by zero when all returns are equal.
        eps = np.finfo(np.float32).eps.item()
        return (values - values.mean()) / (values.std() + eps)

    def _run_episode(self, max_t):
        """Play one episode (at most ``max_t`` steps); return states, actions, rewards."""
        states, actions, rewards = [], [], []
        state = self.env.reset()
        for _ in range(max_t):
            states.append(state)
            action = self.act(state)
            actions.append(action)
            state, reward, done, _ = self.env.step(action)
            rewards.append(reward)
            if done:
                break
        return states, actions, rewards

    # this implements the reinforce
    def learn(self, n_training_episodes=None, max_t=None, print_every=100):
        """Train the policy with REINFORCE and return the score of every episode.

        Args:
            n_training_episodes: number of episodes to play and learn from.
            max_t: maximum number of steps per episode.
            print_every: print the 100-episode moving average every this many
                episodes; a falsy value disables the printing.

        Returns:
            list with the undiscounted total reward of each episode.

        Raises:
            ValueError: if ``n_training_episodes`` or ``max_t`` is not given.
        """
        if n_training_episodes is None or max_t is None:
            raise ValueError("learn() requires both n_training_episodes and max_t")

        # Moving window used only for the progress report.
        scores_deque = deque(maxlen=100)
        scores = []
        for i_episode in range(1, n_training_episodes + 1):
            # Generate an episode following the current policy.
            states, actions, rewards = self._run_episode(max_t)
            episode_score = sum(rewards)
            scores_deque.append(episode_score)
            scores.append(episode_score)

            # Discounted return for every step, then standardized.
            returns = self._standardize(self._discounted_returns(rewards, self.gamma))

            # Policy update. The returns travel inside `x` as a tuple so that
            # CustomModel.train_step can split them off and feed them to the
            # custom loss.
            batch_states = np.array(states)
            batch_actions = np.array(to_categorical(actions, num_classes=self.action_size))
            self.model.train_on_batch(x=(batch_states, returns.reshape(-1, 1)), y=batch_actions)

            if print_every and i_episode % print_every == 0:
                print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))

        return scores

    def load(self, name):
        """Load previously saved model weights from ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save the weights of the trained model to ``name``."""
        self.model.save_weights(name)

    def play(self, state):
        """Return the greedy (most probable) action for ``state``."""
        return np.argmax(self._action_probs(state))

In [None]:
# Train the policy on CartPole-v1.
env = gym.make('CartPole-v1')
try:
    model = Policy(env=env, action_size=2)
    # Keep the per-episode scores so the learning curve can be inspected later.
    scores = model.learn(n_training_episodes=1000, max_t=1000, print_every=100)
finally:
    # Release the environment even if training fails or is interrupted.
    env.close()



action space [0, 1]
Model: "train_only"
_________________________________________________________________
 Layer (type) Output Shape Param # 
 x_input (InputLayer) [(None, 4)] 0 
 
 dense_6 (Dense) (None, 16) 80 
 
 dense_7 (Dense) (None, 16) 272 
 
 y_pred (Dense) (None, 2) 34 
 
Total params: 386
Trainable params: 386
Non-trainable params: 0
_________________________________________________________________
Episode 100	Average Score: 66.31
Episode 200	Average Score: 161.58
Episode 300	Average Score: 282.58


In [11]:
import os

# Saving weights to an HDF5 file does not create missing parent directories,
# so make sure the target folder exists first.
os.makedirs("./alt", exist_ok=True)
model.save("./alt/policy_grad_cartpole.h5")

In [15]:
# Evaluate the saved policy greedily for one recorded episode.
eval_env = gym.make('CartPole-v1')
try:
    model = Policy(env=eval_env, action_size=2)
    model.load("./alt/policy_grad_cartpole.h5")
    # Monitor wraps the env to record the episode under ./alt/gym-results.
    eval_env = wrappers.Monitor(eval_env, "./alt/gym-results", force=True)
    state = eval_env.reset()
    total_reward = 0
    for _ in range(1000):
        action = model.play(state)
        observation, reward, done, info = eval_env.step(action)
        total_reward += reward
        state = observation
        if done:
            break
    # Report the score even if the step cap is reached before `done`.
    print(f"Total reward {total_reward}")
finally:
    # Closing the wrapper also finalizes the recording; do it on errors too.
    eval_env.close()

action space [0, 1]
Model: "train_only"
_________________________________________________________________
 Layer (type) Output Shape Param # 
 x_input (InputLayer) [(None, 4)] 0 
 
 dense_4 (Dense) (None, 16) 80 
 
 dense_5 (Dense) (None, 16) 272 
 
 y_pred (Dense) (None, 2) 34 
 
Total params: 386
Trainable params: 386
Non-trainable params: 0
_________________________________________________________________
Total reward 189.0
