# GAN/timegan.py
import tensorflow as tf
from tensorflow import keras
from tensorflow.python.types.core import TensorLike
import numpy as np
import numpy.typing as npt
from tqdm import tqdm, trange
from collections import OrderedDict
import typing as T
import logging
from GAN.zoo import BasicRecurrentArchitecture
logger = logging.getLogger("models")
logger.setLevel(logging.DEBUG)
class LossTracker(OrderedDict):
"""
    Dictionary of lists; extends Python's OrderedDict so that setting a key appends
    the value to that key's list (creating the list on first use).
Example: Given {'loss_a': [1], 'loss_b': [2]}, adding key='loss_a' with value=0.7
gives {'loss_a': [1, 0.7], 'loss_b': [2]}, and adding key='loss_c' with value=1.2
gives {'loss_a': [1, 0.7], 'loss_b': [2], 'loss_c': [1.2]}
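    A doctest-style sketch of the same behavior (illustrative):
        >>> tracker = LossTracker()
        >>> tracker["loss_a"] = 1
        >>> tracker["loss_a"] = 0.7
        >>> tracker["loss_b"] = 2
        >>> dict(tracker)
        {'loss_a': [1, 0.7], 'loss_b': [2]}
        >>> tracker.labels()
        ['loss_a', 'loss_b']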
"""
def __setitem__(self, key: T.Any, value: T.Any) -> None:
try:
# Assumes the key already exists
# and the value is a list [oldest_value, another_old, ...]
# key -> [oldest_value, another_old, ..., new_value]
self[key].append(value)
# If there is no key, add key -> [new_value]
except KeyError:
# key -> [new_value]
super(LossTracker, self).__setitem__(key, [value])
def to_numpy(self) -> npt.NDArray:
"""
        :return: 2D array of losses, one row per tracked loss key
"""
_losses = np.array([np.array(v) for v in self.values() if isinstance(v, list)])
return _losses
def labels(self) -> T.List:
"""
        :return: list of tracked loss keys
"""
return list(self.keys())
class TimeGAN(keras.Model):
"""
Time-series Generative Adversarial Networks (TimeGAN)
Reference: Jinsung Yoon, Daniel Jarrett, Mihaela van der Schaar,
"Time-series Generative Adversarial Networks,"
Neural Information Processing Systems (NeurIPS), 2019.
Paper link: https://papers.nips.cc/paper/8789-time-series-generative-adversarial-networks
"""
def __init__(
self,
seq_len: int = 24,
module: str = "gru",
hidden_dim: int = 24,
n_features: int = 6,
n_layers: int = 3,
batch_size: int = 256,
gamma: float = 1.0,
) -> None:
super().__init__()
self.seq_len = seq_len
self.hidden_dim = hidden_dim
self.dim = n_features
        assert module in ["gru", "lstm", "lstmLN"], "module must be one of 'gru', 'lstm', 'lstmLN'"
self.module = module
self.n_layers = n_layers
self.batch_size = batch_size
self.gamma = gamma
# ----------------------------
# Basic Architectures
# ----------------------------
self.embedder = BasicRecurrentArchitecture(
hidden_dim=self.hidden_dim,
output_dim=self.hidden_dim,
n_layers=self.n_layers,
network_type=self.module,
name="Embedder",
).build()
self.recovery = BasicRecurrentArchitecture(
hidden_dim=self.hidden_dim,
output_dim=self.dim,
n_layers=self.n_layers,
network_type=self.module,
name="Recovery",
).build()
self.supervisor = BasicRecurrentArchitecture(
hidden_dim=self.hidden_dim,
output_dim=self.hidden_dim,
n_layers=self.n_layers,
network_type=self.module,
name="Supervisor",
).build()
self.discriminator = BasicRecurrentArchitecture(
hidden_dim=self.hidden_dim,
output_dim=1,
n_layers=self.n_layers,
network_type=self.module,
name="Discriminator",
).build()
self.generator_aux = BasicRecurrentArchitecture(
hidden_dim=self.hidden_dim,
output_dim=self.hidden_dim,
n_layers=self.n_layers,
network_type=self.module,
name="Generator",
).build()
# ----------------------------
        # Optimizers: defaults here; call .compile() to override them
# ----------------------------
self.autoencoder_opt = keras.optimizers.Adam()
self.adversarialsup_opt = keras.optimizers.Adam()
self.generator_opt = keras.optimizers.Adam()
self.embedder_opt = keras.optimizers.Adam()
self.discriminator_opt = keras.optimizers.Adam()
# ----------------------------
        # Loss functions: defaults here; call .compile() to override them
# ----------------------------
self._mse = keras.losses.MeanSquaredError()
self._bce = keras.losses.BinaryCrossentropy()
# --------------------------
# All losses: will be populated in .fit()
# --------------------------
self.training_losses_history = LossTracker()
# --------------------------
# Synthetic data generation during training: will be populated in .fit()
# --------------------------
self.synthetic_data_generated_in_training = dict()
def compile(
self,
d_optimizer: keras.optimizers.Optimizer = keras.optimizers.Adam(), # keras.optimizers.legacy.Adam()
g_optimizer: keras.optimizers.Optimizer = keras.optimizers.Adam(),
emb_optimizer: keras.optimizers.Optimizer = keras.optimizers.Adam(),
supgan_optimizer: keras.optimizers.Optimizer = keras.optimizers.Adam(),
ae_optimizer: keras.optimizers.Optimizer = keras.optimizers.Adam(),
emb_loss: keras.losses.Loss = keras.losses.MeanSquaredError(),
clf_loss: keras.losses.Loss = keras.losses.BinaryCrossentropy(),
) -> None:
"""
Assign optimizers and loss functions.
:param d_optimizer: An optimizer for the GAN's discriminator
:param g_optimizer: An optimizer for the GAN's generator
:param emb_optimizer: An optimizer for the GAN's embedder
:param supgan_optimizer: An optimizer for the adversarial supervised network
:param ae_optimizer: An optimizer for the autoencoder network
:param emb_loss: A loss function for the embedding recovery
:param clf_loss: A loss function for the discriminator task
:return: None
"""
# ----------------------------
# Optimizers
# ----------------------------
self.autoencoder_opt = ae_optimizer
self.adversarialsup_opt = supgan_optimizer
self.generator_opt = g_optimizer
self.embedder_opt = emb_optimizer
self.discriminator_opt = d_optimizer
# ----------------------------
# Loss functions
# ----------------------------
self._mse = emb_loss
self._bce = clf_loss
def _define_timegan(self) -> None:
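        """
        Build and wire the sub-models used in training: the autoencoder
        (embedder + recovery), the adversarial supervised and adversarial
        embedded paths, the end-to-end synthetic data generator, and the
        discriminator on real data.
        """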
# --------------------------------
# Data and Noise Inputs
# --------------------------------
X = keras.layers.Input(
shape=[self.seq_len, self.dim], batch_size=self.batch_size, name="RealData"
)
Z = keras.layers.Input(
shape=[self.seq_len, self.dim],
batch_size=self.batch_size,
name="RandomNoise",
)
# --------------------------------
# Autoencoder: Embedder + Recovery
# --------------------------------
H = self.embedder(X)
X_tilde = self.recovery(H)
self.autoencoder = keras.models.Model(
inputs=X, outputs=X_tilde, name="Autoencoder"
)
self.autoencoder.summary()
# ---------------------------------
# Adversarial Supervised
# ---------------------------------
E_Hat = self.generator_aux(Z)
H_hat = self.supervisor(E_Hat)
Y_fake = self.discriminator(H_hat)
self.adversarial_supervised = keras.models.Model(
inputs=Z, outputs=Y_fake, name="AdversarialSupervised"
)
self.adversarial_supervised.summary()
# ---------------------------------
# Adversarial embedded in latent space
# ---------------------------------
Y_fake_e = self.discriminator(E_Hat)
self.adversarial_embedded = keras.models.Model(
inputs=Z, outputs=Y_fake_e, name="AdversarialEmbedded"
)
self.adversarial_embedded.summary()
# ---------------------------------
# Synthetic data generator
# ---------------------------------
X_hat = self.recovery(H_hat)
self.generator = keras.models.Model(
inputs=Z, outputs=X_hat, name="FinalGenerator"
)
self.generator.summary()
# --------------------------------
# Discriminator
# --------------------------------
Y_real = self.discriminator(H)
self.discriminator_model = keras.models.Model(
inputs=X, outputs=Y_real, name="FinalDiscriminator"
)
self.discriminator_model.summary()
@tf.function
def _train_autoencoder(
self, X: TensorLike, optimizer: keras.optimizers.Optimizer
) -> float:
"""
1. Embedding network training: minimize E_loss0
"""
with tf.GradientTape() as tape:
X_tilde = self.autoencoder(X)
E_loss_T0 = self._mse(X, X_tilde)
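            # The 10 * sqrt(MSE) scaling of the reconstruction loss follows the reference TimeGAN implementation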
E_loss0 = 10.0 * tf.sqrt(E_loss_T0)
e_vars = self.embedder.trainable_variables
r_vars = self.recovery.trainable_variables
all_trainable = e_vars + r_vars
gradients = tape.gradient(E_loss0, all_trainable)
optimizer.apply_gradients(zip(gradients, all_trainable))
return E_loss0
@tf.function
def _train_supervisor(
self, X: TensorLike, optimizer: keras.optimizers.Optimizer
) -> float:
"""
2. Training with supervised loss only: minimize G_loss_S
"""
with tf.GradientTape() as tape:
H = self.embedder(X)
H_hat_supervised = self.supervisor(H)
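            # Supervised loss: the supervisor's output at step t should match the real embedding at step t + 1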
G_loss_S = self._mse(H[:, 1:, :], H_hat_supervised[:, :-1, :])
g_vars = self.generator.trainable_variables
s_vars = self.supervisor.trainable_variables
all_trainable = g_vars + s_vars
gradients = tape.gradient(G_loss_S, all_trainable)
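        # Keep only (gradient, variable) pairs with a gradient; variables outside the supervised path get None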
apply_grads = [
(grad, var)
for (grad, var) in zip(gradients, all_trainable)
if grad is not None
]
optimizer.apply_gradients(apply_grads)
return G_loss_S
@tf.function
def _train_generator(
self, X: TensorLike, Z: TensorLike, optimizer: keras.optimizers.Optimizer
) -> T.Tuple[float, float, float, float, float]:
"""
        3. Joint training (the generator is trained twice as often as the discriminator): minimize G_loss
"""
with tf.GradientTape() as tape:
# 1. Adversarial loss
Y_fake = self.adversarial_supervised(Z)
G_loss_U = self._bce(y_true=tf.ones_like(Y_fake), y_pred=Y_fake)
Y_fake_e = self.adversarial_embedded(Z)
G_loss_U_e = self._bce(y_true=tf.ones_like(Y_fake_e), y_pred=Y_fake_e)
# 2. Supervised loss
H = self.embedder(X)
H_hat_supervised = self.supervisor(H)
G_loss_S = self._mse(H[:, 1:, :], H_hat_supervised[:, :-1, :])
# 3. Two Moments
X_hat = self.generator(Z)
G_loss_V = self._compute_generator_moments_loss(X, X_hat)
# 4. Summation
G_loss = (
G_loss_U
+ self.gamma * G_loss_U_e
+ 100 * tf.sqrt(G_loss_S)
+ 100 * G_loss_V
)
g_vars = self.generator_aux.trainable_variables
s_vars = self.supervisor.trainable_variables
all_trainable = g_vars + s_vars
gradients = tape.gradient(G_loss, all_trainable)
apply_grads = [
(grad, var)
for (grad, var) in zip(gradients, all_trainable)
if grad is not None
]
optimizer.apply_gradients(apply_grads)
return G_loss_U, G_loss_U_e, G_loss_S, G_loss_V, G_loss
@tf.function
def _train_embedder(
self, X: TensorLike, optimizer: keras.optimizers.Optimizer
) -> T.Tuple[float, float]:
"""
Train embedder during joint training: minimize E_loss
"""
with tf.GradientTape() as tape:
# Supervised Loss
H = self.embedder(X)
H_hat_supervised = self.supervisor(H)
G_loss_S = self._mse(H[:, 1:, :], H_hat_supervised[:, :-1, :])
# Reconstruction Loss
X_tilde = self.autoencoder(X)
E_loss_T0 = self._mse(X, X_tilde)
E_loss0 = 10 * tf.sqrt(E_loss_T0)
E_loss = E_loss0 + 0.1 * G_loss_S
e_vars = self.embedder.trainable_variables
r_vars = self.recovery.trainable_variables
all_trainable = e_vars + r_vars
gradients = tape.gradient(E_loss, all_trainable)
optimizer.apply_gradients(zip(gradients, all_trainable))
return E_loss, E_loss_T0
@tf.function
def _train_discriminator(
self, X: TensorLike, Z: TensorLike, optimizer: keras.optimizers.Optimizer
) -> float:
"""
        Train the discriminator during joint training: minimize D_loss
"""
with tf.GradientTape() as tape:
D_loss = self._check_discriminator_loss(X, Z)
d_vars = self.discriminator.trainable_variables
gradients = tape.gradient(D_loss, d_vars)
optimizer.apply_gradients(zip(gradients, d_vars))
return D_loss
@staticmethod
def _compute_generator_moments_loss(
y_true: TensorLike, y_pred: TensorLike
) -> float:
"""
        :param y_true: batch of real sequences
        :param y_pred: batch of synthetic sequences
        :return G_loss_V: moment-matching loss (mean term + standard-deviation term)
"""
_eps = 1e-6
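        # Moments over axis 0: per-(time step, feature) mean and variance across the batch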
y_true_mean, y_true_var = tf.nn.moments(x=y_true, axes=[0])
y_pred_mean, y_pred_var = tf.nn.moments(x=y_pred, axes=[0])
        # G_loss_V2: mean-matching term
g_loss_mean = tf.reduce_mean(abs(y_true_mean - y_pred_mean))
        # G_loss_V1: standard-deviation-matching term
g_loss_var = tf.reduce_mean(
abs(tf.sqrt(y_true_var + _eps) - tf.sqrt(y_pred_var + _eps))
)
# G_loss_V = G_loss_V1 + G_loss_V2
return g_loss_mean + g_loss_var
def _check_discriminator_loss(self, X: TensorLike, Z: TensorLike) -> float:
"""
        :param X: batch of real sequences
        :param Z: batch of random noise vectors
        :return D_loss: total discriminator loss
"""
# Loss on false negatives
Y_real = self.discriminator_model(X)
D_loss_real = self._bce(y_true=tf.ones_like(Y_real), y_pred=Y_real)
# Loss on false positives
Y_fake = self.adversarial_supervised(Z)
D_loss_fake = self._bce(y_true=tf.zeros_like(Y_fake), y_pred=Y_fake)
Y_fake_e = self.adversarial_embedded(Z)
D_loss_fake_e = self._bce(y_true=tf.zeros_like(Y_fake_e), y_pred=Y_fake_e)
D_loss = D_loss_real + D_loss_fake + self.gamma * D_loss_fake_e
return D_loss
def _generate_noise(self) -> TensorLike:
"""
        Infinite generator of random noise vectors
        :yield Z: a (seq_len, dim) array drawn from U[0, 1)
"""
while True:
yield np.random.uniform(low=0, high=1, size=(self.seq_len, self.dim))
def get_noise_batch(self) -> T.Iterator:
"""
Return an iterator of random noise vectors
"""
return iter(
tf.data.Dataset.from_generator(
self._generate_noise, output_types=tf.float32
)
.batch(self.batch_size)
.repeat()
)
def _get_data_batch(self, data: TensorLike, n_windows: int) -> T.Iterator:
"""
Return an iterator of shuffled input data
"""
data = tf.convert_to_tensor(data, dtype=tf.float32)
return iter(
tf.data.Dataset.from_tensor_slices(data)
.shuffle(buffer_size=n_windows)
.batch(self.batch_size)
.repeat()
)
def fit(
self,
data: T.Union[TensorLike, tf.data.Dataset],
epochs: int,
checkpoints_interval: T.Optional[int] = None,
generate_synthetic: T.Tuple = (),
*args,
**kwargs,
):
"""
        :param data: TensorLike or tf.data.Dataset, the training data
            (an array of shape (n_windows, seq_len, n_features), or an already-batched dataset)
        :param epochs: int, the number of epochs for each of the three training loops
        :param checkpoints_interval: int, the interval for logging loss values
            (loss values are logged every 'checkpoints_interval' epochs)
            Default: None (no logging)
        :param generate_synthetic: tuple of int, epoch numbers at which synthetic data samples are generated
            Default: () (no generation)
        :return: None
"""
assert not (
self.autoencoder_opt is None
or self.adversarialsup_opt is None
or self.generator_opt is None
or self.embedder_opt is None
or self.discriminator_opt is None
), "One of the optimizers is not defined. Please call .compile() to set them"
assert not (
self._mse is None or self._bce is None
), "One of the loss functions is not defined. Please call .compile() to set them"
        # Accept either a tf.data.Dataset (already batched) or an array-like of windows
if isinstance(data, tf.data.Dataset):
batches = iter(data.repeat())
else:
batches = self._get_data_batch(data, n_windows=len(data))
# Define the model
self._define_timegan()
# 1. Embedding network training
logger.info("Start Embedding Network Training")
for epoch in tqdm(range(epochs), desc="Autoencoder - training"):
X_ = next(batches)
step_e_loss_0 = self._train_autoencoder(X_, self.autoencoder_opt)
# Checkpoint
if checkpoints_interval is not None and epoch % checkpoints_interval == 0:
logger.info(f"step: {epoch}/{epochs}, e_loss: {step_e_loss_0}")
self.training_losses_history["autoencoder"] = float(step_e_loss_0)
logger.info("Finished Embedding Network Training")
# 2. Training only with supervised loss
logger.info("Start Training with Supervised Loss Only")
# Adversarial Supervised network training
for epoch in tqdm(range(epochs), desc="Adversarial Supervised - training"):
X_ = next(batches)
step_g_loss_s = self._train_supervisor(X_, self.adversarialsup_opt)
# Checkpoint
if checkpoints_interval is not None and epoch % checkpoints_interval == 0:
logger.info(
f"step: {epoch}/{epochs}, s_loss: {np.round(np.sqrt(step_g_loss_s), 4)}"
)
self.training_losses_history["adversarial_supervised"] = float(
np.sqrt(step_g_loss_s)
)
logger.info("Finished Training with Supervised Loss Only")
# 3. Joint Training
logger.info("Start Joint Training")
# GAN with embedding network training
for epoch in tqdm(range(epochs), desc="GAN with embedding - training"):
            # Generator training (twice as often as the discriminator)
for kk in range(2):
X_ = next(batches)
Z_ = next(self.get_noise_batch())
# --------------------------
# Train the generator
# --------------------------
(
step_g_loss_u,
step_g_loss_u_e,
step_g_loss_s,
step_g_loss_v,
step_g_loss,
) = self._train_generator(X_, Z_, self.generator_opt)
# --------------------------
# Train the embedder
# --------------------------
_, step_e_loss_t0 = self._train_embedder(X_, self.embedder_opt)
X_ = next(batches)
Z_ = next(self.get_noise_batch())
step_d_loss = self._check_discriminator_loss(X_, Z_)
if step_d_loss > 0.15:
logger.info(
"Train Discriminator (discriminator does not work well yet)"
)
step_d_loss = self._train_discriminator(X_, Z_, self.discriminator_opt)
            # Checkpoint: log all joint-training losses
if checkpoints_interval is not None and epoch % checkpoints_interval == 0:
logger.info(
f"""step: {epoch}/{epochs},
d_loss: {np.round(step_d_loss, 4)},
g_loss_u: {np.round(step_g_loss_u, 4)},
g_loss_u_e: {np.round(step_g_loss_u_e, 4)},
g_loss_s: {np.round(np.sqrt(step_g_loss_s), 4)},
g_loss_v: {np.round(step_g_loss_v, 4)},
g_loss_v: {np.round(step_g_loss, 4)},
e_loss_t0: {np.round(np.sqrt(step_e_loss_t0), 4)}"""
)
self.training_losses_history["discriminator"] = float(step_d_loss)
self.training_losses_history["generator_u"] = float(step_g_loss_u)
self.training_losses_history["generator_u_e"] = float(step_g_loss_u_e)
self.training_losses_history["generator_v"] = float(step_g_loss_v)
self.training_losses_history["generator_s"] = float(np.sqrt(step_g_loss_s))
self.training_losses_history["generator"] = float(step_g_loss)
self.training_losses_history["embedder"] = float(np.sqrt(step_e_loss_t0))
# Synthetic data generation
if epoch in generate_synthetic:
_sample = self.generate(n_samples=len(data))
self.synthetic_data_generated_in_training[epoch] = _sample
logger.info("Finished Joint Training")
return
def generate(self, n_samples: int) -> TensorLike:
"""
Generate synthetic time series
"""
steps = n_samples // self.batch_size + 1
data = []
for _ in trange(steps, desc="Synthetic data generation"):
Z_ = next(self.get_noise_batch())
records = self.generator(Z_)
data.append(records)
return np.array(np.vstack(data))[:n_samples]
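

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only; the shapes and hyper-parameters below
    # are placeholders, not recommendations): train on random windows of shape
    # (n_windows, seq_len, n_features) and sample synthetic sequences.
    toy_data = np.random.uniform(size=(512, 24, 6)).astype(np.float32)
    model = TimeGAN(seq_len=24, module="gru", hidden_dim=24, n_features=6, batch_size=128)
    model.compile()  # default Adam optimizers, MSE / BCE losses
    model.fit(toy_data, epochs=10, checkpoints_interval=5)
    synthetic = model.generate(n_samples=256)
    print(synthetic.shape)  # expected: (256, 24, 6)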