| | import tensorflow as tf |
| | import numpy as np |
| | from tensorflow.keras.layers import * |
| | import os |
| | from datetime import datetime |
| | import json |
| |
|
class PositionalEncoding(Layer):
    """Adds fixed sinusoidal position information to an embedded sequence.

    The full (1, position, d_model) encoding table is precomputed once at
    construction time and sliced to the input's sequence length on every
    call.
    """

    def __init__(self, position, d_model):
        super().__init__()
        # Precompute the table once; it is constant for the layer's lifetime.
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        """Return position * rate for each (position, dimension) pair.

        Dimensions are paired via i // 2 so that a sin/cos pair shares one
        frequency, with rates decaying geometrically from 1 to 1/10000.
        """
        rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        return position * rates

    def positional_encoding(self, position, d_model):
        """Build the sinusoidal table: all sin terms first, then all cos terms."""
        positions = np.arange(position)[:, np.newaxis]
        dims = np.arange(d_model)[np.newaxis, :]
        angle_rads = self.get_angles(position=positions, i=dims, d_model=d_model)

        # Sine over even dimensions, cosine over odd dimensions; the two
        # halves are concatenated along the feature axis (tutorial-style
        # layout rather than interleaved).
        table = np.concatenate(
            [np.sin(angle_rads[:, 0::2]), np.cos(angle_rads[:, 1::2])],
            axis=-1,
        )
        return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

    def call(self, inputs):
        """Add the encoding for the first seq_len positions to `inputs`."""
        seq_len = tf.shape(inputs)[1]
        return inputs + self.pos_encoding[:, :seq_len, :]
| |
|
class MultiHeadAttention(Layer):
    """Multi-head scaled dot-product attention.

    NOTE(review): this class shadows `keras.layers.MultiHeadAttention`
    brought in by the star import at the top of the file; the local
    definition wins because it is defined afterwards.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % self.num_heads == 0
        self.depth = d_model // self.num_heads  # per-head feature width
        # Independent projections for queries, keys and values, plus the
        # final output projection.
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) -> (batch, heads, seq, depth)."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask=None):
        """Attend over `v` using queries `q` and keys `k`.

        `mask` (optional) is added to the logits scaled by -1e9, so
        positions where mask == 1 receive ~zero attention weight.
        """
        batch_size = tf.shape(q)[0]

        # Project, then split each tensor into heads.
        query = self.split_heads(self.wq(q), batch_size)
        key = self.split_heads(self.wk(k), batch_size)
        value = self.split_heads(self.wv(v), batch_size)

        # Scaled dot-product attention: softmax(QK^T / sqrt(dk)) V.
        logits = tf.matmul(query, key, transpose_b=True)
        dk = tf.cast(tf.shape(key)[-1], tf.float32)
        logits = logits / tf.math.sqrt(dk)
        if mask is not None:
            logits += (mask * -1e9)
        weights = tf.nn.softmax(logits, axis=-1)
        context = tf.matmul(weights, value)

        # Merge heads back to (batch, seq, d_model) and apply the output
        # projection.
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        merged = tf.reshape(context, (batch_size, -1, self.d_model))
        return self.dense(merged)
| |
|
class TransformerBlock(Layer):
    """One post-LayerNorm Transformer encoder block.

    Self-attention followed by a position-wise feed-forward network, each
    wrapped with dropout, a residual connection and layer normalization.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        # Two-layer position-wise FFN: expand to dff, project back to d_model.
        self.ffn = tf.keras.Sequential(
            [Dense(dff, activation='relu'), Dense(d_model)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training=False, mask=None):
        # Self-attention sub-layer (q = k = v = x), residual + norm.
        attended = self.mha(x, x, x, mask)
        attended = self.dropout1(attended, training=training)
        normed = self.layernorm1(x + attended)

        # Feed-forward sub-layer, residual + norm.
        transformed = self.ffn(normed)
        transformed = self.dropout2(transformed, training=training)
        return self.layernorm2(normed + transformed)
| |
|
class TextToSpeechTransformer(tf.keras.Model):
    """Transformer encoder mapping token IDs to one mel frame per position.

    Args:
        num_layers: number of stacked TransformerBlocks.
        d_model: embedding / model width.
        num_heads: attention heads per block.
        dff: hidden width of each block's feed-forward network.
        input_vocab_size: size of the token embedding table.
        maximum_position_encoding: longest supported input sequence.
        rate: dropout rate.
        mel_bins: number of output mel channels. Defaults to 80, the value
            that was previously hard-coded, so existing callers are
            unaffected.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, rate=0.1, mel_bins=80):
        super().__init__()

        self.d_model = d_model
        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(maximum_position_encoding, d_model)
        self.dropout = Dropout(rate)

        self.transformer_blocks = [
            TransformerBlock(d_model, num_heads, dff, rate)
            for _ in range(num_layers)
        ]

        # Linear projection from d_model to the mel channels.
        self.final_layer = Dense(mel_bins)

    def call(self, x, training=False, mask=None):
        x = self.embedding(x)
        # FIX: scale embeddings by sqrt(d_model) before adding positional
        # encodings, as in the reference Transformer architecture; without
        # this the positional signal can dominate the (smaller-magnitude)
        # learned embeddings early in training.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = self.pos_encoding(x)
        x = self.dropout(x, training=training)

        for transformer_block in self.transformer_blocks:
            x = transformer_block(x, training=training, mask=mask)

        return self.final_layer(x)
| |
|
class TTSTrainer:
    """Builds, trains, saves and restores a TextToSpeechTransformer.

    Each trainer instance writes all artifacts (weights, SavedModel, config,
    TensorBoard logs) into a timestamped directory under checkpoints/.
    """

    def __init__(self, model_params, training_params):
        self.model_params = model_params
        self.training_params = training_params
        self.model = self._build_model()
        self.timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.checkpoint_dir = f"checkpoints/{self.timestamp}"
        os.makedirs(self.checkpoint_dir, exist_ok=True)

    def _build_model(self):
        """Construct and compile a model from self.model_params."""
        model = TextToSpeechTransformer(**self.model_params)

        optimizer = tf.keras.optimizers.Adam(
            learning_rate=self.training_params['learning_rate']
        )

        # Huber loss is robust to occasional outlier mel frames.
        model.compile(
            optimizer=optimizer,
            loss=tf.keras.losses.Huber(delta=1.0),
            metrics=['mae']
        )
        return model

    def _create_dataset(self, texts, mels, batch_size, shuffle=True):
        """Build a cached, batched, prefetched tf.data pipeline.

        Args:
            shuffle: pass False for validation data — shuffling it wastes
                time and makes per-batch metric traces order-dependent.
                Defaults to True, preserving the previous behavior.
        """
        dataset = tf.data.Dataset.from_tensor_slices((texts, mels))
        dataset = dataset.cache()
        if shuffle:
            dataset = dataset.shuffle(10000)
        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)
        return dataset

    def train(self, texts, mels):
        """Train on a sequential 90/10 split of (texts, mels).

        Returns the Keras History object from model.fit. Also persists the
        final weights, SavedModel and config via _save_model_and_config.
        """
        train_size = int(0.9 * len(texts))
        train_texts, val_texts = texts[:train_size], texts[train_size:]
        train_mels, val_mels = mels[:train_size], mels[train_size:]

        train_dataset = self._create_dataset(
            train_texts, train_mels, self.training_params['batch_size']
        )
        val_dataset = self._create_dataset(
            val_texts, val_mels, self.training_params['batch_size'],
            shuffle=False
        )

        # BUG FIX: ModelCheckpoint's filepath is a file *prefix*, but the
        # old code created a directory at that exact path, which breaks
        # TF-format weight writing. Use a prefix inside the directory and
        # only create the parent directory.
        checkpoint_prefix = os.path.join(self.checkpoint_dir, "model", "ckpt")
        os.makedirs(os.path.dirname(checkpoint_prefix), exist_ok=True)

        callbacks = [
            tf.keras.callbacks.ModelCheckpoint(
                filepath=checkpoint_prefix,
                save_weights_only=True,
                save_best_only=True,
                monitor='val_loss'
            ),
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=5,
                restore_best_weights=True
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=2
            ),
            tf.keras.callbacks.TensorBoard(
                log_dir=f"{self.checkpoint_dir}/logs"
            )
        ]

        history = self.model.fit(
            train_dataset,
            validation_data=val_dataset,
            epochs=self.training_params['epochs'],
            callbacks=callbacks
        )

        self._save_model_and_config()
        return history

    def _save_model_and_config(self):
        """Persist the hyperparameter config, weights and a SavedModel."""
        config = {
            'model_params': self.model_params,
            'training_params': self.training_params
        }

        config_path = f"{self.checkpoint_dir}/config.json"
        with open(config_path, 'w') as f:
            json.dump(config, f)

        weights_path = f"{self.checkpoint_dir}/model_weights"
        self.model.save_weights(weights_path)

        tf.saved_model.save(self.model, f"{self.checkpoint_dir}/saved_model")

    def load_model(self, checkpoint_dir):
        """Rebuild the model from a saved run directory and load its weights.

        BUG FIX: the saved config was previously read and then silently
        ignored, so weights could be loaded into a model built from
        *current* (possibly mismatched) params. The stored params are now
        applied before rebuilding.
        """
        config_path = f"{checkpoint_dir}/config.json"
        with open(config_path, 'r') as f:
            config = json.load(f)

        self.model_params = config['model_params']
        self.training_params = config['training_params']
        self.model = self._build_model()
        weights_path = f"{checkpoint_dir}/model_weights"
        self.model.load_weights(weights_path)
| |
|
| | if __name__ == "__main__": |
| | model_params = { |
| | 'num_layers': 6, |
| | 'd_model': 256, |
| | 'num_heads': 8, |
| | 'dff': 1024, |
| | 'input_vocab_size': 1000, |
| | 'maximum_position_encoding': 2048, |
| | 'rate': 0.1 |
| | } |
| | |
| | training_params = { |
| | 'batch_size': 32, |
| | 'epochs': 100, |
| | 'learning_rate': 0.001 |
| | } |
| | |
| | trainer = TTSTrainer(model_params, training_params) |
| | |
| | |
| | input_texts = np.random.randint(0, 1000, size=(1000, 100)) |
| | target_mels = np.random.uniform(size=(1000, 100, 80)) |
| | |
| | history = trainer.train(input_texts, target_mels) |