File size: 4,197 Bytes
af8d508 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import regularizers
import numpy as np
import tensorflow_probability as tfp
#Affine Coupling Layer
## Creating a custom layer with keras API.
output_dim = 256
reg = 0.01
def Coupling(input_shape):
input = keras.layers.Input(shape=input_shape)
t_layer_1 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(input)
t_layer_2 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(t_layer_1)
t_layer_3 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(t_layer_2)
t_layer_4 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(t_layer_3)
t_layer_5 = keras.layers.Dense(
input_shape, activation="linear", kernel_regularizer=regularizers.l2(reg)
)(t_layer_4)
s_layer_1 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(input)
s_layer_2 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(s_layer_1)
s_layer_3 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(s_layer_2)
s_layer_4 = keras.layers.Dense(
output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
)(s_layer_3)
s_layer_5 = keras.layers.Dense(
input_shape, activation="tanh", kernel_regularizer=regularizers.l2(reg)
)(s_layer_4)
return keras.Model(inputs=input, outputs=[s_layer_5, t_layer_5])
#Real NVP
class RealNVP(keras.Model):
def __init__(self, num_coupling_layers):
super(RealNVP, self).__init__()
self.num_coupling_layers = num_coupling_layers
# Distribution of the latent space.
self.distribution = tfp.distributions.MultivariateNormalDiag(
loc=[0.0, 0.0], scale_diag=[1.0, 1.0]
)
self.masks = np.array(
[[0, 1], [1, 0]] * (num_coupling_layers // 2), dtype="float32"
)
self.loss_tracker = keras.metrics.Mean(name="loss")
self.layers_list = [Coupling(2) for i in range(num_coupling_layers)]
@property
def metrics(self):
"""List of the model's metrics.
We make sure the loss tracker is listed as part of `model.metrics`
so that `fit()` and `evaluate()` are able to `reset()` the loss tracker
at the start of each epoch and at the start of an `evaluate()` call.
"""
return [self.loss_tracker]
def call(self, x, training=True):
log_det_inv = 0
direction = 1
if training:
direction = -1
for i in range(self.num_coupling_layers)[::direction]:
x_masked = x * self.masks[i]
reversed_mask = 1 - self.masks[i]
s, t = self.layers_list[i](x_masked)
s *= reversed_mask
t *= reversed_mask
gate = (direction - 1) / 2
x = (
reversed_mask
* (x * tf.exp(direction * s) + direction * t * tf.exp(gate * s))
+ x_masked
)
log_det_inv += gate * tf.reduce_sum(s, [1])
return x, log_det_inv
# Log likelihood of the normal distribution plus the log determinant of the jacobian.
def log_loss(self, x):
y, logdet = self(x)
log_likelihood = self.distribution.log_prob(y) + logdet
return -tf.reduce_mean(log_likelihood)
def train_step(self, data):
with tf.GradientTape() as tape:
loss = self.log_loss(data)
g = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(g, self.trainable_variables))
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def test_step(self, data):
loss = self.log_loss(data)
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def load_model():
return RealNVP(num_coupling_layers=6)
|