|
import os |
|
from itertools import compress |
|
|
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import pandas as pd |
|
from keras.layers import Dense, Dropout, LeakyReLU |
|
from keras.models import Sequential |
|
from keras.optimizers import Adam |
|
from numpy.random import randn |
|
from sklearn.decomposition import PCA |
|
from tqdm import tqdm |
|
|
|
|
|
class GAN(object): |
|
def __init__(self, number_of_features, saved_models_path, learning_rate, alpha_relu, dropout, loss, activation): |
|
""" |
|
A constructor for the GAN class |
|
:param number_of_features: number of features |
|
:param saved_models_path: the output folder path |
|
""" |
|
self.saved_models_path = saved_models_path |
|
self.number_of_features = number_of_features |
|
|
|
self.generator_model = None |
|
self.noise_dim = None |
|
self.discriminator_model = None |
|
self.learning_rate = learning_rate |
|
self.gan_model = None |
|
self.activation = activation |
|
self.alpha_relu = alpha_relu |
|
self.loss = loss |
|
self.dropout = dropout |
|
self.number_of_features = number_of_features |
|
|
|
self.build_generator() |
|
self.build_discriminator() |
|
self.build_gan() |
|
|
|
def build_generator(self): |
|
""" |
|
This function creates the generator model |
|
:return: |
|
""" |
|
noise_size = int(self.number_of_features / 2) |
|
self.noise_dim = (noise_size,) |
|
|
|
self.generator_model = Sequential() |
|
self.generator_model.add(Dense(int(self.number_of_features * 2), input_shape=self.noise_dim)) |
|
self.generator_model.add(LeakyReLU(alpha=self.alpha_relu)) |
|
|
|
self.generator_model.add(Dense(int(self.number_of_features * 4))) |
|
self.generator_model.add(LeakyReLU(alpha=self.alpha_relu)) |
|
self.generator_model.add(Dropout(self.dropout)) |
|
|
|
self.generator_model.add(Dense(int(self.number_of_features * 2))) |
|
self.generator_model.add(LeakyReLU(alpha=self.alpha_relu)) |
|
self.generator_model.add(Dropout(self.dropout)) |
|
|
|
|
|
self.generator_model.add(Dense(self.number_of_features, activation=self.activation)) |
|
self.generator_model.summary() |
|
|
|
def build_discriminator(self): |
|
""" |
|
Create discriminator model |
|
:return: |
|
""" |
|
self.discriminator_model = Sequential() |
|
|
|
self.discriminator_model.add(Dense(self.number_of_features * 2, input_shape=(self.number_of_features,))) |
|
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu)) |
|
|
|
self.discriminator_model.add(Dense(self.number_of_features * 4)) |
|
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu)) |
|
self.discriminator_model.add(Dropout(self.dropout)) |
|
|
|
self.discriminator_model.add(Dense(self.number_of_features * 2)) |
|
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu)) |
|
self.discriminator_model.add(Dropout(self.dropout)) |
|
|
|
|
|
self.discriminator_model.add(Dense(1, activation=self.activation)) |
|
optimizer = Adam(lr=self.learning_rate) |
|
self.discriminator_model.compile(loss=self.loss, optimizer=optimizer) |
|
self.discriminator_model.summary() |
|
|
|
def build_gan(self): |
|
""" |
|
Create the GAN network |
|
:return: the GAN model object |
|
""" |
|
self.gan_model = Sequential() |
|
self.discriminator_model.trainable = False |
|
|
|
|
|
self.gan_model.add(self.generator_model) |
|
self.gan_model.add(self.discriminator_model) |
|
|
|
|
|
optimizer = Adam(lr=self.learning_rate) |
|
self.gan_model.compile(loss=self.loss, optimizer=optimizer) |
|
|
|
return self.gan_model |
|
|
|
def train(self, scaled_data, epochs, batch_size, to_plot_losses, model_name): |
|
""" |
|
This function trains the generator and discriminator outputs |
|
:param model_name: |
|
:param to_plot_losses: whether or not to plot history |
|
:param scaled_data: the data after min max scaling |
|
:param epochs: number of epochs |
|
:param batch_size: the batch size |
|
:return: losses_list: returns the losses dictionary the generator or discriminator outputs |
|
""" |
|
dis_output, gen_output, prev_output = self.check_for_existed_output(model_name) |
|
if prev_output: |
|
return -1, -1 |
|
|
|
losses_output = os.path.join(self.saved_models_path, f'{model_name}_losses.png') |
|
discriminator_loss = [] |
|
generator_loss = [] |
|
|
|
|
|
half_batch_size = int(batch_size / 2) |
|
iterations = int(len(scaled_data) / half_batch_size) |
|
iterations = iterations + 1 if len(scaled_data) % batch_size != 0 else iterations |
|
|
|
for epoch in range(1, epochs + 1): |
|
np.random.shuffle(scaled_data) |
|
p_bar = tqdm(range(iterations), ascii=True) |
|
for iteration in p_bar: |
|
dis_loss, gen_loss = self.train_models(batch_size=batch_size, half_batch_size=half_batch_size, |
|
index=iteration, scaled_data=scaled_data) |
|
discriminator_loss.append(dis_loss) |
|
generator_loss.append(gen_loss) |
|
p_bar.set_description( |
|
f"Epoch ({epoch}/{epochs}) | DISCRIMINATOR LOSS: {dis_loss:.2f} | GENERATOR LOSS: {gen_loss:.2f} |") |
|
|
|
|
|
self.discriminator_model.save_weights(dis_output) |
|
self.generator_model.save_weights(gen_output) |
|
|
|
|
|
if to_plot_losses: |
|
self.plot_losses(discriminator_loss=discriminator_loss, generator_loss=generator_loss, |
|
losses_output=losses_output) |
|
|
|
return generator_loss[-1], discriminator_loss[-1] |
|
|
|
def check_for_existed_output(self, model_name) -> (str, str, bool): |
|
""" |
|
This function checks for existed output |
|
:param model_name: model's name |
|
:return: |
|
""" |
|
prev_output = False |
|
dis_output = os.path.join(self.saved_models_path, f'{model_name}_dis_weights.h5') |
|
gen_output = os.path.join(self.saved_models_path, f'{model_name}_gen_weights.h5') |
|
if os.path.exists(dis_output) and os.path.exists(gen_output): |
|
print("The model was trained in the past") |
|
self.discriminator_model.load_weights(dis_output) |
|
self.generator_model.load_weights(gen_output) |
|
prev_output = True |
|
return dis_output, gen_output, prev_output |
|
|
|
def train_models(self, batch_size, half_batch_size, index, scaled_data): |
|
""" |
|
This function trains the discriminator and the generator |
|
:param batch_size: batch size |
|
:param half_batch_size: half of the batch size |
|
:param index: |
|
:param scaled_data: |
|
:return: |
|
""" |
|
self.discriminator_model.trainable = True |
|
|
|
|
|
x_real, y_real = self.get_real_samples(data=scaled_data, batch_size=half_batch_size, index=index) |
|
d_real_loss = self.discriminator_model.train_on_batch(x_real, y_real) |
|
|
|
|
|
x_fake, y_fake = self.create_fake_samples(batch_size=half_batch_size) |
|
d_fake_loss = self.discriminator_model.train_on_batch(x_fake, y_fake) |
|
|
|
avg_dis_loss = 0.5 * (d_real_loss + d_fake_loss) |
|
|
|
|
|
noise = randn(self.noise_dim[0] * batch_size).reshape((batch_size, self.noise_dim[0])) |
|
|
|
self.discriminator_model.trainable = False |
|
gen_loss = self.gan_model.train_on_batch(noise, np.ones((batch_size, 1))) |
|
|
|
return avg_dis_loss, gen_loss |
|
|
|
@staticmethod |
|
def get_real_samples(data, batch_size, index): |
|
""" |
|
Generate batch_size of real samples with class labels |
|
:param data: the original data |
|
:param batch_size: batch size |
|
:param index: the index of the batch |
|
:return: x: real samples, y: labels |
|
""" |
|
start_index = batch_size * index |
|
end_index = start_index + batch_size |
|
x = data[start_index: end_index] |
|
|
|
return x, np.ones((len(x), 1)) |
|
|
|
def create_fake_samples(self, batch_size): |
|
""" |
|
Use the generator to generate n fake examples, with class labels |
|
:param batch_size: batch size |
|
:return: |
|
""" |
|
noise = randn(self.noise_dim[0] * batch_size).reshape((batch_size, self.noise_dim[0])) |
|
x = self.generator_model.predict(noise) |
|
|
|
return x, np.zeros((len(x), 1)) |
|
|
|
@staticmethod |
|
def plot_losses(discriminator_loss, generator_loss, losses_output): |
|
""" |
|
Plot training loss values |
|
:param generator_loss: |
|
:param discriminator_loss: |
|
:param losses_output: |
|
:return: |
|
""" |
|
plt.plot(discriminator_loss) |
|
plt.plot(generator_loss) |
|
plt.xlabel('Iteration') |
|
plt.ylabel('Loss') |
|
plt.title('Discriminator and Generator Losses') |
|
plt.legend(['Discriminator Loss', 'Generator Loss']) |
|
plt.savefig(losses_output) |
|
|
|
@staticmethod |
|
def return_minimum_euclidean_distance(scaled_data, x): |
|
""" |
|
This function returns the |
|
:param scaled_data: the original data |
|
:param x: a record we want to compare with |
|
:return: the minimum distance and the index of the minimum value |
|
""" |
|
s = np.power(np.power((scaled_data - np.array(x)), 2).sum(1), 0.5) |
|
return pd.Series([s[s.argmin()], s.argmin()]) |
|
|
|
def test(self, scaled_data, sample_num, pca_output): |
|
""" |
|
This function tests the model |
|
:param scaled_data: the original scaled data |
|
:param sample_num: number of samples to generate |
|
:param pca_output: the output of PCA |
|
:return: |
|
""" |
|
x_fake, y_fake = self.create_fake_samples(batch_size=sample_num) |
|
fake_pred = self.discriminator_model.predict(x_fake) |
|
|
|
|
|
dis_fooled_scaled = np.asarray(list(compress(x_fake, fake_pred > 0.5))) |
|
dis_not_fooled_scaled = np.asarray(list(compress(x_fake, fake_pred <= 0.5))) |
|
|
|
|
|
mean_min_distance_fooled, mean_min_distance_not_fooled = (-1, -1) |
|
if len(dis_fooled_scaled) > 0 and len(dis_not_fooled_scaled) > 0: |
|
mean_min_distance_fooled = self.get_mean_distance_score(scaled_data, dis_fooled_scaled) |
|
print(f'The mean minimum distance for fooled samples is {mean_min_distance_fooled}') |
|
mean_min_distance_not_fooled = self.get_mean_distance_score(scaled_data, dis_not_fooled_scaled) |
|
print(f'The mean minimum distance for not fooled samples is {mean_min_distance_not_fooled}') |
|
else: |
|
print(f'The fooled xor the not Fooled data frames is empty') |
|
|
|
|
|
data_pca_df = self.get_pca_df(scaled_data, 'original') |
|
dis_fooled_pca_df = self.get_pca_df(dis_fooled_scaled, 'fooled') |
|
dis_not_fooled_pca_df = self.get_pca_df(dis_not_fooled_scaled, 'not fooled') |
|
pca_frames = [data_pca_df, dis_fooled_pca_df, dis_not_fooled_pca_df] |
|
pca_result = pd.concat(pca_frames) |
|
self.plot_pca(pca_result, pca_output) |
|
|
|
return dis_fooled_scaled, dis_not_fooled_scaled, mean_min_distance_fooled, mean_min_distance_not_fooled |
|
|
|
def get_mean_distance_score(self, scaled_data, dis_scaled): |
|
""" |
|
This function returns the mean distance score for the given dataframe |
|
:param scaled_data: the original data |
|
:param dis_scaled: a dataframe |
|
:return: |
|
""" |
|
dis_fooled_scaled_ecu = pd.DataFrame(dis_scaled) |
|
dis_fooled_scaled_ecu[['min_distance', 'similar_i']] = dis_fooled_scaled_ecu.apply( |
|
lambda x: self.return_minimum_euclidean_distance(scaled_data, x), axis=1) |
|
mean_min_distance_fooled = dis_fooled_scaled_ecu['min_distance'].mean() |
|
return mean_min_distance_fooled |
|
|
|
@staticmethod |
|
def plot_pca(pca_result, pca_output): |
|
""" |
|
This function plots the PCA figure |
|
:param pca_result: dataframe with all the results |
|
:param pca_output: output path |
|
:return: |
|
""" |
|
fig = plt.figure(figsize=(8, 8)) |
|
ax = fig.add_subplot(1, 1, 1) |
|
ax.set_xlabel('Principal Component 1', fontsize=15) |
|
ax.set_ylabel('Principal Component 2', fontsize=15) |
|
ax.set_title('PCA With Two Components', fontsize=20) |
|
targets = ['original', 'fooled', 'not fooled'] |
|
colors = ['r', 'g', 'b'] |
|
for target, color in zip(targets, colors): |
|
indices_to_keep = pca_result['name'] == target |
|
ax.scatter(pca_result.loc[indices_to_keep, 'comp1'], pca_result.loc[indices_to_keep, 'comp2'], |
|
c=color, s=50) |
|
ax.legend(targets) |
|
ax.grid() |
|
plt.savefig(pca_output) |
|
|
|
@staticmethod |
|
def get_pca_df(scaled_data, data_name): |
|
""" |
|
This function creates the PCA dataframe |
|
:param scaled_data: the original data |
|
:param data_name: the name of the column |
|
:return: |
|
""" |
|
pca = PCA(n_components=2) |
|
principal_components = pca.fit_transform(scaled_data) |
|
principal_df = pd.DataFrame(data=principal_components, columns=['comp1', 'comp2']) |
|
principal_df['name'] = data_name |
|
return principal_df |
|
|