import os
import pickle

import matplotlib.pyplot as plt
import numpy as np
from keras.layers import Dense, Dropout, LeakyReLU
from keras.models import Sequential
from keras.optimizers import Adam
from numpy.random import randn
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from tqdm import tqdm


class GG(object):
    def __init__(self, number_of_features, saved_models_path, learning_rate, dropout, alpha):
        """
        The constructor for the General Generator class.
        :param number_of_features: Number of features in the data. Used to determine the noise dimensions.
        :param saved_models_path: The folder where we save the models.
        :param learning_rate: Learning rate for the generator's Adam optimizer.
        :param dropout: Dropout rate for the generator's hidden layers.
        :param alpha: Negative-slope coefficient for the LeakyReLU activations.
        """
        self.saved_models_path = saved_models_path
        self.number_of_features = number_of_features

        self.generator_model = None
        self.discriminator_model = RandomForestClassifier()
        self.dropout = dropout
        self.alpha = alpha
        self.noise_dim = int(number_of_features / 2)
        self.learning_rate = learning_rate
        self.build_generator()
        self.losses = {'gen_loss': [], 'dis_loss_pred': [], 'dis_loss_proba': []}

    def build_generator(self):
        """
        This function creates the generator model for the GG.
        We used a fairly simple MLP architecture.
        :return:
        """
        self.generator_model = Sequential()
        # Input: a noise vector plus one target confidence score.
        self.generator_model.add(Dense(int(self.number_of_features * 2), input_shape=(self.noise_dim + 1,)))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))

        self.generator_model.add(Dense(int(self.number_of_features * 4)))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))
        self.generator_model.add(Dropout(self.dropout))

        self.generator_model.add(Dense(int(self.number_of_features * 2)))
        self.generator_model.add(LeakyReLU(alpha=self.alpha))
        self.generator_model.add(Dropout(self.dropout))

        # Output: the generated feature vector plus one probability column, matching
        # the (features, probability) targets built in train_generator_on_batch.
        self.generator_model.add(Dense(self.number_of_features + 1, activation='sigmoid'))
        optimizer = Adam(learning_rate=self.learning_rate)  # `learning_rate` replaces the deprecated `lr`
        # Element-wise binary cross-entropy fits the sigmoid outputs and [0, 1] targets;
        # categorical cross-entropy assumes targets that sum to one across outputs.
        self.generator_model.compile(loss='binary_crossentropy', optimizer=optimizer)
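
    # For example (illustrative sizes, not from the original module): with
    # number_of_features = 10, the generator maps a 6-dimensional input
    # (5 noise values + 1 confidence score) through Dense layers of width
    # 20 -> 40 -> 20 to an 11-dimensional output (10 features + 1 probability).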

    def train_gg(self, x_train, y_train, epochs, batch_size, model_name, data, output_path, to_plot=False):
        """
        This function runs the training stage manually.
        :param x_train: The training set features
        :param y_train: The training set classes
        :param epochs: Number of epochs
        :param batch_size: The batch size
        :param model_name: Name of the model to save (for the generator)
        :param data: Name of the dataset, used in plot titles
        :param output_path: Path to save the loss figure
        :param to_plot: Plots the losses if True
        :return: Trains the discriminator and generator.
        """
        losses_path = os.path.join(self.saved_models_path, f'{model_name}_losses')
        model_file = os.path.join(self.saved_models_path, f'{model_name}_part_2_gen_weights.h5')

        self.train_black_box_dis(x_train, y_train)
        self.train_generator(x_train, model_file, epochs, batch_size, losses_path)
        if to_plot:
            self.plot_losses(data, output_path)

    def train_black_box_dis(self, x_train, y_train):
        """
        Trains the black-box discriminator and saves it. If a saved model already
        exists, it is loaded instead of being retrained.
        :param x_train: The training set features
        :param y_train: The training set classes
        :return:
        """
        dis_output = os.path.join(self.saved_models_path, 'black_box_dis_model')

        if os.path.exists(dis_output):
            # Reuse the previously trained model rather than refitting it.
            with open(dis_output, 'rb') as rf_file:
                self.discriminator_model = pickle.load(rf_file)
            return

        self.discriminator_model.fit(x_train, y_train)
        with open(dis_output, 'wb') as rf_file:
            pickle.dump(self.discriminator_model, rf_file)

    def train_generator(self, data, model_path, epochs, start_batch_size, losses_path):
        """
        Function for training the general generator. If saved weights already exist,
        they are loaded instead of retraining.
        :param data: The normalized dataset
        :param model_path: The file path of the model to save; includes epochs, batch size etc.
        :param epochs: Number of epochs
        :param start_batch_size: Size of batch to use
        :param losses_path: The file path for the loss results
        :return: Trains the generator, saves it and the losses during training.
        """
        if os.path.exists(model_path):
            self.generator_model.load_weights(model_path)
            with open(losses_path, 'rb') as loss_file:
                self.losses = pickle.load(loss_file)
            return

        for epoch in range(epochs):
            np.random.shuffle(data)  # note: shuffles the caller's array in place
            batch_size = start_batch_size
            for i in tqdm(range(0, data.shape[0], batch_size), ascii=True):
                if data.shape[0] - i >= batch_size:
                    batch_input = data[i:i + batch_size]
                else:
                    # The last batch may be smaller than start_batch_size.
                    batch_input = data[i:]
                    batch_size = batch_input.shape[0]

                g_loss = self.train_generator_on_batch(batch_input)
                self.losses['gen_loss'].append(g_loss)

        self.save_generator_model(model_path, losses_path)

    def save_generator_model(self, generator_model_path, losses_path):
        """
        Saves the model weights and the loss data with pickle.
        :param generator_model_path: File path for the generator
        :param losses_path: File path for the losses
        :return:
        """
        self.generator_model.save_weights(generator_model_path)
        with open(losses_path, 'wb+') as loss_file:
            pickle.dump(self.losses, loss_file)

    def train_generator_on_batch(self, batch_input):
        """
        Trains the generator for a single batch. The input is composed of noise and
        the real probabilities obtained from the black box; the target output is
        composed of the real samples and the probabilities produced by the generator.
        :param batch_input: A batch of real samples
        :return: The generator loss on this batch
        """
        batch_size = batch_input.shape[0]
        # Probability of the positive class according to the black-box discriminator.
        discriminator_probabilities = self.discriminator_model.predict_proba(batch_input)[:, -1:]

        noise = randn(batch_size, self.noise_dim)
        gen_model_input = np.hstack([noise, discriminator_probabilities])
        # The last output column is the probability the generator assigns to its own sample.
        generated_probabilities = self.generator_model.predict(gen_model_input)[:, -1:]
        target_output = np.hstack([batch_input, generated_probabilities])
        g_loss = self.generator_model.train_on_batch(gen_model_input, target_output)

        return g_loss
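
    # Shape walkthrough (illustrative sizes, not from the original module): with
    # batch_size = 32 and number_of_features = 10 (so noise_dim = 5),
    # gen_model_input is (32, 6): noise (32, 5) stacked with the black-box
    # probability (32, 1); target_output is (32, 11): real features (32, 10)
    # stacked with the generated probability (32, 1). These match the generator's
    # noise_dim + 1 inputs and number_of_features + 1 outputs.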

    def plot_discriminator_results(self, x_test, y_test, data, path):
        """
        Plots the black-box discriminator's results on the test set.
        :param x_test: Test set
        :param y_test: Test classes
        :param data: Name of the dataset, used in plot titles
        :param path: Folder to save the plots in
        :return: Prints the required plots.
        """
        blackbox_probs = self.discriminator_model.predict_proba(x_test)
        discriminator_predictions = self.discriminator_model.predict(x_test)
        count_1 = int(np.sum(y_test))
        count_0 = int(y_test.shape[0] - count_1)
        class_data = (['Class 0', 'Class 1'], [count_0, count_1])
        self.plot_data(class_data, path, mode='bar', x_title='Class', title=f'Distribution of classes - {data} dataset')
        self.plot_data(blackbox_probs[:, 0], path, title=f'Probabilities for test set - class 0 - {data} dataset')
        self.plot_data(blackbox_probs[:, 1], path, title=f'Probabilities for test set - class 1 - {data} dataset')

        min_confidence = blackbox_probs[:, 0].min(), blackbox_probs[:, 1].min()
        max_confidence = blackbox_probs[:, 0].max(), blackbox_probs[:, 1].max()
        mean_confidence = blackbox_probs[:, 0].mean(), blackbox_probs[:, 1].mean()

        print("Accuracy:", metrics.accuracy_score(y_test, discriminator_predictions))
        for c in [0, 1]:
            print(f'Class {c} - Min confidence: {min_confidence[c]} - Max Confidence: {max_confidence[c]} - '
                  f'Mean confidence: {mean_confidence[c]}')

    def plot_generator_results(self, data, path, num_of_instances=1000):
        """
        Creates plots for the generator results on num_of_instances samples.
        :param data: Name of dataset used.
        :param path: Folder to save the plots in.
        :param num_of_instances: Number of samples to generate.
        :return:
        """
        sampled_proba, generated_instances = self.generate_n_samples(num_of_instances)

        # All columns but the last are generated features; feed them to the black box.
        proba_fake = self.discriminator_model.predict_proba(generated_instances[:, :-1])
        for c in [0, 1]:
            title = f'Confidence Score for Class {c} of Fake Samples - {data} dataset'
            self.plot_data(proba_fake[:, c], path, x_title='Confidence Score', title=title)

        black_box_confidence = proba_fake[:, 1:]
        proba_error = np.abs(sampled_proba - black_box_confidence)
        # Round the generator's last output column to hard class labels
        # (reshape by num_of_instances rather than a hard-coded 1000).
        generated_classes = np.round(generated_instances[:, -1]).astype(int).reshape(num_of_instances, 1)
        proba_stats = np.hstack([sampled_proba, generated_classes, proba_fake[:, :1], proba_fake[:, 1:], proba_error])

        for c in [0, 1]:
            class_data = proba_stats[proba_stats[:, 1] == c]
            class_data = class_data[class_data[:, 0].argsort()]
            title = f'Error rate for different probabilities, class {c} - {data} dataset'
            self.plot_data((class_data[:, 0], class_data[:, -1]), path, mode='plot', y_title='error rate', title=title)

    def generate_n_samples(self, n):
        """
        Generates n samples with uniformly distributed confidence levels.
        :param n: Number of samples
        :return: A tuple of the confidence scores used and the samples created.
        """
        noise = randn(n, self.noise_dim)
        # Draw a target confidence score for each sample uniformly from [0, 1).
        confidences = np.random.uniform(0, 1, (n, 1))

        generator_input = np.hstack([noise, confidences])
        generated_instances = self.generator_model.predict(generator_input)

        return confidences, generated_instances
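
    # For example (hypothetical sizes): with number_of_features = 10,
    # generate_n_samples(1000) would return confidences of shape (1000, 1) and
    # generated_instances of shape (1000, 11), where the last column is the
    # probability the generator attaches to each sample.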

    @staticmethod
    def plot_data(data, path, mode='hist', x_title='Probabilities', y_title='# of Instances', title='Distribution'):
        """
        :param data: Data to plot
        :param path: Folder to save the plot in
        :param mode: Plot mode to use: 'hist', 'bar' or 'plot'
        :param x_title: Title of x axis
        :param y_title: Title of y axis
        :param title: Title of plot
        :return: Prints a plot
        """
        plt.clf()

        if mode == 'hist':
            plt.hist(data)
        elif mode == 'bar':
            plt.bar(data[0], data[1])
        else:
            plt.plot(data[0], data[1])

        plt.title(title)
        plt.ylabel(y_title)
        plt.xlabel(x_title)

        # The plot title doubles as the file name; savefig defaults to PNG.
        path = os.path.join(path, title)
        plt.savefig(path)

    def plot_losses(self, data, path):
        """
        Plots the generator losses collected while training.
        :param data: Name of the dataset, used in the file name
        :param path: Folder to save the figure in
        :return:
        """
        plt.clf()
        plt.plot(self.losses['gen_loss'])
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Iteration')

        plt.savefig(os.path.join(path, f'{data} dataset - general_generator_loss.png'))

    def get_error(self, num_of_instances=1000):
        """
        Calculates the generator's error by measuring the difference between the
        probability given as input and the probability the discriminator assigns
        to the generated sample.
        :param num_of_instances: Number of samples to generate.
        :return: An array of errors.
        """
        sampled_proba, generated_instances = self.generate_n_samples(num_of_instances)
        proba_fake = self.discriminator_model.predict_proba(generated_instances[:, :-1])
        black_box_confidence = proba_fake[:, 1:]
        return np.abs(sampled_proba - black_box_confidence)
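

# Minimal usage sketch, not part of the original module: the dataset, paths and
# hyperparameter values below are illustrative assumptions chosen only to show
# how the class is wired together end to end.
if __name__ == '__main__':
    features = np.random.rand(500, 10)     # 500 normalized samples, 10 features
    labels = np.random.randint(0, 2, 500)  # binary class labels
    gg = GG(number_of_features=10, saved_models_path='.', learning_rate=0.001,
            dropout=0.2, alpha=0.2)
    gg.train_gg(features, labels, epochs=5, batch_size=32, model_name='demo',
                data='demo', output_path='.', to_plot=False)
    print('Mean |input proba - black-box proba|:', gg.get_error(num_of_instances=100).mean())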