import os
from itertools import compress
from typing import Tuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.layers import Dense, Dropout, LeakyReLU
from keras.models import Sequential
from keras.optimizers import Adam
from numpy.random import randn
from sklearn.decomposition import PCA
from tqdm import tqdm
class GAN(object):
def __init__(self, number_of_features, saved_models_path, learning_rate, alpha_relu, dropout, loss, activation):
"""
        A constructor for the GAN class
        :param number_of_features: number of features in the real data
        :param saved_models_path: the output folder path for saved weights and plots
        :param learning_rate: learning rate for the Adam optimizers
        :param alpha_relu: negative slope coefficient of the LeakyReLU layers
        :param dropout: dropout rate applied after the hidden layers
        :param loss: loss function used to compile the discriminator and the GAN
        :param activation: activation function of the output layers
"""
        self.saved_models_path = saved_models_path
        self.number_of_features = number_of_features
        self.generator_model = None
        self.noise_dim = None
        self.discriminator_model = None
        self.learning_rate = learning_rate
        self.gan_model = None
        self.activation = activation
        self.alpha_relu = alpha_relu
        self.loss = loss
        self.dropout = dropout
self.build_generator() # build the generator
self.build_discriminator() # build the discriminator
self.build_gan() # build the GAN
def build_generator(self):
"""
This function creates the generator model
:return:
"""
        noise_size = self.number_of_features // 2  # latent dimension: half the feature count
        self.noise_dim = (noise_size,)  # shape of the noise (latent) space
self.generator_model = Sequential()
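        # The generator widens the noise through layers of 2n and 4n units,
        # narrows back through 2n, and finally projects to the n real features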
self.generator_model.add(Dense(int(self.number_of_features * 2), input_shape=self.noise_dim))
self.generator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.generator_model.add(Dense(int(self.number_of_features * 4)))
self.generator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.generator_model.add(Dropout(self.dropout))
self.generator_model.add(Dense(int(self.number_of_features * 2)))
self.generator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.generator_model.add(Dropout(self.dropout))
        # Output layer; the generator is not compiled on its own because it is
        # trained only through the combined GAN model
        self.generator_model.add(Dense(self.number_of_features, activation=self.activation))
self.generator_model.summary()
def build_discriminator(self):
"""
Create discriminator model
:return:
"""
self.discriminator_model = Sequential()
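        # The discriminator mirrors the generator's hidden widths (2n, 4n, 2n)
        # and reduces to a single real/fake score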
self.discriminator_model.add(Dense(self.number_of_features * 2, input_shape=(self.number_of_features,)))
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.discriminator_model.add(Dense(self.number_of_features * 4))
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.discriminator_model.add(Dropout(self.dropout))
self.discriminator_model.add(Dense(self.number_of_features * 2))
self.discriminator_model.add(LeakyReLU(alpha=self.alpha_relu))
self.discriminator_model.add(Dropout(self.dropout))
        # Output layer followed by compilation; unlike the generator, the
        # discriminator is trained directly on labeled real and fake batches
        self.discriminator_model.add(Dense(1, activation=self.activation))
        optimizer = Adam(learning_rate=self.learning_rate)
        self.discriminator_model.compile(loss=self.loss, optimizer=optimizer)
self.discriminator_model.summary()
def build_gan(self):
"""
Create the GAN network
:return: the GAN model object
"""
        self.gan_model = Sequential()
        # Freeze the discriminator inside the combined model: Keras applies the
        # `trainable` flag at compile time, so the discriminator stays trainable
        # under its own earlier compile but is frozen within the GAN compiled below
        self.discriminator_model.trainable = False
        # Connect the generator and the discriminator to form the combined GAN
        self.gan_model.add(self.generator_model)
        self.gan_model.add(self.discriminator_model)
        # Compile the combined model; gradients through it update only the generator
        optimizer = Adam(learning_rate=self.learning_rate)
        self.gan_model.compile(loss=self.loss, optimizer=optimizer)
return self.gan_model
def train(self, scaled_data, epochs, batch_size, to_plot_losses, model_name):
"""
        This function trains the generator and the discriminator
        :param scaled_data: the data after min-max scaling
        :param epochs: number of epochs
        :param batch_size: the batch size
        :param to_plot_losses: whether to plot the loss history
        :param model_name: name used for the saved weight files and the loss plot
        :return: the last generator loss and the last discriminator loss,
                 or (-1, -1) if previously trained weights were found and loaded
"""
dis_output, gen_output, prev_output = self.check_for_existed_output(model_name)
if prev_output:
return -1, -1
losses_output = os.path.join(self.saved_models_path, f'{model_name}_losses.png')
discriminator_loss = []
generator_loss = []
# We need to use half of the batch size for the fake data and half for the real one
half_batch_size = int(batch_size / 2)
        iterations = len(scaled_data) // half_batch_size
        # The remainder check must use half_batch_size (not batch_size), since real
        # batches are drawn in strides of half_batch_size
        iterations = iterations + 1 if len(scaled_data) % half_batch_size != 0 else iterations
for epoch in range(1, epochs + 1): # iterates over the epochs
np.random.shuffle(scaled_data)
p_bar = tqdm(range(iterations), ascii=True)
for iteration in p_bar:
dis_loss, gen_loss = self.train_models(batch_size=batch_size, half_batch_size=half_batch_size,
index=iteration, scaled_data=scaled_data)
discriminator_loss.append(dis_loss)
generator_loss.append(gen_loss)
p_bar.set_description(
f"Epoch ({epoch}/{epochs}) | DISCRIMINATOR LOSS: {dis_loss:.2f} | GENERATOR LOSS: {gen_loss:.2f} |")
# Save weights for future use
self.discriminator_model.save_weights(dis_output)
self.generator_model.save_weights(gen_output)
# Plot losses
if to_plot_losses:
self.plot_losses(discriminator_loss=discriminator_loss, generator_loss=generator_loss,
losses_output=losses_output)
return generator_loss[-1], discriminator_loss[-1]
    def check_for_existed_output(self, model_name) -> Tuple[str, str, bool]:
        """
        This function checks whether weights from a previous training run exist, and loads them if so
        :param model_name: the model's name
        :return: the discriminator weights path, the generator weights path,
                 and whether previous weights were found and loaded
"""
prev_output = False
dis_output = os.path.join(self.saved_models_path, f'{model_name}_dis_weights.h5')
gen_output = os.path.join(self.saved_models_path, f'{model_name}_gen_weights.h5')
if os.path.exists(dis_output) and os.path.exists(gen_output):
            print("Found weights from a previous training run; loading them instead of retraining")
self.discriminator_model.load_weights(dis_output)
self.generator_model.load_weights(gen_output)
prev_output = True
return dis_output, gen_output, prev_output
def train_models(self, batch_size, half_batch_size, index, scaled_data):
"""
        This function trains the discriminator on one real and one fake half-batch,
        then trains the generator through the combined GAN model
        :param batch_size: batch size
        :param half_batch_size: half of the batch size
        :param index: the index of the current batch within the epoch
        :param scaled_data: the scaled training data
        :return: the average discriminator loss and the generator loss for this step
"""
self.discriminator_model.trainable = True
# Create a batch of real data and train the model
x_real, y_real = self.get_real_samples(data=scaled_data, batch_size=half_batch_size, index=index)
d_real_loss = self.discriminator_model.train_on_batch(x_real, y_real)
# Create a batch of fake data and train the model
x_fake, y_fake = self.create_fake_samples(batch_size=half_batch_size)
d_fake_loss = self.discriminator_model.train_on_batch(x_fake, y_fake)
avg_dis_loss = 0.5 * (d_real_loss + d_fake_loss)
# Create noise for the generator model
noise = randn(self.noise_dim[0] * batch_size).reshape((batch_size, self.noise_dim[0]))
self.discriminator_model.trainable = False
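        # Train the generator through the frozen discriminator: labeling the fake
        # samples as real (ones) rewards the generator for fooling the discriminator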
gen_loss = self.gan_model.train_on_batch(noise, np.ones((batch_size, 1)))
return avg_dis_loss, gen_loss
@staticmethod
def get_real_samples(data, batch_size, index):
"""
Generate batch_size of real samples with class labels
:param data: the original data
:param batch_size: batch size
:param index: the index of the batch
:return: x: real samples, y: labels
"""
start_index = batch_size * index
end_index = start_index + batch_size
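        # The last batch of an epoch may be shorter than batch_size, so the
        # labels below are sized to match the actual slice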
x = data[start_index: end_index]
return x, np.ones((len(x), 1))
def create_fake_samples(self, batch_size):
"""
Use the generator to generate n fake examples, with class labels
:param batch_size: batch size
        :return: x: generated fake samples, y: labels of zeros
"""
noise = randn(self.noise_dim[0] * batch_size).reshape((batch_size, self.noise_dim[0]))
x = self.generator_model.predict(noise) # create fake samples using the generator
return x, np.zeros((len(x), 1))
@staticmethod
def plot_losses(discriminator_loss, generator_loss, losses_output):
"""
Plot training loss values
        :param discriminator_loss: per-iteration discriminator losses
        :param generator_loss: per-iteration generator losses
        :param losses_output: the output path for the loss plot
:return:
"""
        plt.figure()
        plt.plot(discriminator_loss)
        plt.plot(generator_loss)
        plt.xlabel('Iteration')
        plt.ylabel('Loss')
        plt.title('Discriminator and Generator Losses')
        plt.legend(['Discriminator Loss', 'Generator Loss'])
        plt.savefig(losses_output)
        plt.close()  # release the figure so later plots start from a clean state
@staticmethod
def return_minimum_euclidean_distance(scaled_data, x):
"""
        This function returns the minimum Euclidean distance between x and the rows of scaled_data
:param scaled_data: the original data
:param x: a record we want to compare with
:return: the minimum distance and the index of the minimum value
"""
        distances = np.sqrt(np.power(scaled_data - np.array(x), 2).sum(axis=1))
        return pd.Series([distances.min(), distances.argmin()])
def test(self, scaled_data, sample_num, pca_output):
"""
        This function tests the model by generating samples and comparing them with the real data
        :param scaled_data: the original scaled data
        :param sample_num: number of samples to generate
        :param pca_output: the output path for the PCA plot
        :return: the fooled and not-fooled samples, and their mean minimum distances to the real data
"""
x_fake, y_fake = self.create_fake_samples(batch_size=sample_num)
        fake_pred = self.discriminator_model.predict(x_fake).flatten()
        # Split the generated samples by whether they fooled the discriminator (score > 0.5)
        dis_fooled_scaled = np.asarray(list(compress(x_fake, fake_pred > 0.5)))
        dis_not_fooled_scaled = np.asarray(list(compress(x_fake, fake_pred <= 0.5)))
# ------------- Euclidean -------------
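        # -1 acts as a sentinel value when either sample group is empty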
mean_min_distance_fooled, mean_min_distance_not_fooled = (-1, -1)
if len(dis_fooled_scaled) > 0 and len(dis_not_fooled_scaled) > 0:
mean_min_distance_fooled = self.get_mean_distance_score(scaled_data, dis_fooled_scaled)
print(f'The mean minimum distance for fooled samples is {mean_min_distance_fooled}')
mean_min_distance_not_fooled = self.get_mean_distance_score(scaled_data, dis_not_fooled_scaled)
print(f'The mean minimum distance for not fooled samples is {mean_min_distance_not_fooled}')
        else:
            print('Either the fooled or the not-fooled sample set is empty; skipping the distance computation')
# ------------- PCA --------------
data_pca_df = self.get_pca_df(scaled_data, 'original')
dis_fooled_pca_df = self.get_pca_df(dis_fooled_scaled, 'fooled')
dis_not_fooled_pca_df = self.get_pca_df(dis_not_fooled_scaled, 'not fooled')
pca_frames = [data_pca_df, dis_fooled_pca_df, dis_not_fooled_pca_df]
pca_result = pd.concat(pca_frames)
self.plot_pca(pca_result, pca_output)
return dis_fooled_scaled, dis_not_fooled_scaled, mean_min_distance_fooled, mean_min_distance_not_fooled
def get_mean_distance_score(self, scaled_data, dis_scaled):
"""
        This function returns the mean of the minimum Euclidean distances between
        each generated sample and the real data
        :param scaled_data: the original data
        :param dis_scaled: the generated (scaled) samples
        :return: the mean minimum distance
"""
        dis_scaled_df = pd.DataFrame(dis_scaled)
        dis_scaled_df[['min_distance', 'similar_i']] = dis_scaled_df.apply(
            lambda x: self.return_minimum_euclidean_distance(scaled_data, x), axis=1)
        return dis_scaled_df['min_distance'].mean()
@staticmethod
def plot_pca(pca_result, pca_output):
"""
This function plots the PCA figure
:param pca_result: dataframe with all the results
:param pca_output: output path
:return:
"""
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
ax.set_xlabel('Principal Component 1', fontsize=15)
ax.set_ylabel('Principal Component 2', fontsize=15)
ax.set_title('PCA With Two Components', fontsize=20)
targets = ['original', 'fooled', 'not fooled']
colors = ['r', 'g', 'b']
for target, color in zip(targets, colors):
indices_to_keep = pca_result['name'] == target
ax.scatter(pca_result.loc[indices_to_keep, 'comp1'], pca_result.loc[indices_to_keep, 'comp2'],
c=color, s=50)
ax.legend(targets)
ax.grid()
        plt.savefig(pca_output)
        plt.close(fig)  # release the figure after saving
@staticmethod
def get_pca_df(scaled_data, data_name):
"""
        This function projects the given data onto its first two principal components
        :param scaled_data: the (scaled) data to project
        :param data_name: a label stored in the 'name' column of the resulting dataframe
        :return: a dataframe with columns 'comp1', 'comp2', and 'name'
"""
pca = PCA(n_components=2)
principal_components = pca.fit_transform(scaled_data)
principal_df = pd.DataFrame(data=principal_components, columns=['comp1', 'comp2'])
principal_df['name'] = data_name
return principal_df
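

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module): shows
# how the class might be driven end to end. The dataset, hyperparameters, and
# the './saved_models' output folder below are assumptions for demonstration.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    from sklearn.preprocessing import MinMaxScaler

    # Hypothetical dataset: 1000 samples with 8 features, min-max scaled as
    # GAN.train() expects
    rng = np.random.default_rng(seed=42)
    raw_data = rng.normal(size=(1000, 8))
    scaled = MinMaxScaler().fit_transform(raw_data)

    output_dir = './saved_models'  # assumed output location
    os.makedirs(output_dir, exist_ok=True)

    gan = GAN(number_of_features=8, saved_models_path=output_dir,
              learning_rate=0.0002, alpha_relu=0.2, dropout=0.3,
              loss='binary_crossentropy', activation='sigmoid')
    gen_loss, dis_loss = gan.train(scaled_data=scaled, epochs=5, batch_size=64,
                                   to_plot_losses=True, model_name='demo')
    print(f'Final losses -> generator: {gen_loss}, discriminator: {dis_loss}')

    # Note: test() assumes the discriminator splits the generated samples into
    # two non-empty groups; with an under-trained model one group may be empty
    gan.test(scaled_data=scaled, sample_num=200,
             pca_output=os.path.join(output_dir, 'demo_pca.png'))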