#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
from collections import deque
from tensorflow import keras
from tensorflow.keras.models import load_model
import random
import streamlit


class DoubleDeepQNetwork:
    def __init__(self, s_size, a_size, alpha, gamma, epsilon, epsilon_min, epsilon_decay):
        self.nS = s_size
        self.nA = a_size
        self.memory = deque([], maxlen=2500)
        self.alpha = alpha
        self.gamma = gamma
        # Explore/Exploit
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.model = self.build_model()
        self.model_target = self.build_model()  # Second (target) neural network
        self.update_target_from_model()  # Initialise the target weights from the online model
        self.loss = []

    def build_model(self):
        model = keras.Sequential()  # Linear stack of layers: https://keras.io/models/sequential/
        model.add(keras.layers.Dense(256, input_dim=self.nS, activation='relu'))  # [Input] -> Layer 1
        model.add(keras.layers.Dense(256, activation='relu'))  # Layer 1 -> Layer 2
        model.add(keras.layers.Dense(self.nA, activation='linear'))  # Layer 2 -> [Output]
        model.compile(loss='mean_squared_error',  # Loss function: Mean Squared Error
                      optimizer=keras.optimizers.Adam(
                          learning_rate=self.alpha))  # Optimizer: Adam (feel free to try other options)
        return model

    def update_target_from_model(self):
        # Update the target model from the base model
        self.model_target.set_weights(self.model.get_weights())

    def action(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.nA)  # Explore
        action_vals = self.model.predict(state)  # Exploit: use the NN to predict the best action from this state
        return np.argmax(action_vals[0])

    def test_action(self, state):  # Exploit only (no exploration)
        action_vals = self.model.predict(state)
        return np.argmax(action_vals[0])

    def store(self, state, action, reward, next_state, done):
        # Store the experience in the replay memory
        self.memory.append((state, action, reward, next_state, done))

    def save_model(self, agentName):
        # Save the agent model weights to a file
        self.model.save(agentName)

    def load_saved_model(self, agent_name):
        return load_model(agent_name)

    def experience_replay(self, batch_size):
        # Execute the experience replay
        minibatch = random.sample(self.memory, batch_size)  # Randomly sample from memory
        # streamlit.write(f"{minibatch}")

        # Convert to numpy arrays so the whole batch can be predicted in one call
        x = []
        y = []
        np_array = list(minibatch)
        st = np.zeros((0, self.nS))   # States
        nst = np.zeros((0, self.nS))  # Next states
        for i in range(len(np_array)):
            st = np.append(st, np_array[i][0], axis=0)
            nst = np.append(nst, np_array[i][3], axis=0)
        st_predict = self.model.predict(st)  # Here is the speedup: predict on the ENTIRE batch
        nst_predict = self.model.predict(nst)
        nst_predict_target = self.model_target.predict(nst)  # Predict from the TARGET network

        index = 0
        for state, action, reward, next_state, done in minibatch:
            x.append(state)
            # Q-values predicted for the next state
            nst_action_predict_target = nst_predict_target[index]
            nst_action_predict_model = nst_predict[index]
            if done:  # Terminal state: the target is just the reward
                target = reward
            else:  # Non-terminal state
                # Double DQN target: the online network selects the next action,
                # the target network evaluates it:
                #   target = r + gamma * Q_target(s', argmax_a Q_online(s', a))
                target = reward + self.gamma * nst_action_predict_target[
                    np.argmax(nst_action_predict_model)]
            target_f = st_predict[index]
            target_f[action] = target
            y.append(target_f)
            index += 1

        # Reshape for Keras fit
        x_reshape = np.array(x).reshape(batch_size, self.nS)
        y_reshape = np.array(y)
        epoch_count = 1
        hist = self.model.fit(x_reshape, y_reshape, epochs=epoch_count, verbose=0)

        # Track the loss for plotting
        for i in range(epoch_count):
            self.loss.append(hist.history['loss'][i])

        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
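

# Minimal usage sketch (not part of the original class). It assumes a
# Gymnasium CartPole environment and illustrative hyperparameters
# (alpha, gamma, epsilon schedule, batch size, episode count); adjust these
# to your setup. States are reshaped to (1, nS) because action(), store(),
# and experience_replay() expect row-vector (batched) states.
if __name__ == "__main__":
    import gymnasium as gym

    env = gym.make("CartPole-v1")
    nS = env.observation_space.shape[0]
    nA = int(env.action_space.n)
    agent = DoubleDeepQNetwork(nS, nA, alpha=0.001, gamma=0.95,
                               epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995)

    batch_size = 32
    for episode in range(10):
        state, _ = env.reset()
        state = np.reshape(state, (1, nS))
        done = False
        total_reward = 0.0
        while not done:
            a = agent.action(state)
            next_state, reward, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            next_state = np.reshape(next_state, (1, nS))
            agent.store(state, a, reward, next_state, done)
            state = next_state
            total_reward += reward
            if len(agent.memory) > batch_size:
                agent.experience_replay(batch_size)
        agent.update_target_from_model()  # Sync the target network after each episode
        print(f"Episode {episode}: total reward = {total_reward}")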