# -*- coding: utf-8 -*-
"""Translation Model.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1njNMtQLmXo_zVtAKxA91dZJh_bxlLume
"""

import pathlib
import random
import string
import h5py
import re

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from os.path import exists

import gdown

# Download the English-Chinese sentence pairs (tab-separated file) if not cached.
if not exists("cmn.txt"):
    url = "https://drive.google.com/uc?id=1FOC2x5HlgcFTMgnGhPjvLWWlEqVTLQno"
    gdown.download(url, quiet=False)

with open("cmn.txt", encoding="utf-8") as f:
    lines = f.read().split("\n")[:-1]

# Each line is: English sentence \t Chinese sentence \t attribution (unused).
text_pairs = []
for line in lines:
    eng, cmn, _ = line.split("\t")
    text_pairs.append((eng, cmn))

# Shuffle and split into train / validation / test (70% / 15% / 15%).
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

# Punctuation set with "[" and "]" kept, so the [start]/[end] markers survive.
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

vocab_size = 15000
sequence_length = 20
batch_size = 64

# English: word-level tokenization. Chinese: character-level tokenization with
# one extra position, so the target can be shifted by one step for teacher forcing.
eng_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
cmn_vectorization = TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    split="character",
    output_sequence_length=sequence_length + 1,
    standardize="strip_punctuation",
)

train_eng_texts = [pair[0] for pair in train_pairs]
train_cmn_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)
cmn_vectorization.adapt(train_cmn_texts)


def format_dataset(eng, cmn):
    """Map a batch of raw strings to (inputs, targets) for the seq2seq model."""
    eng = eng_vectorization(eng)
    cmn = cmn_vectorization(cmn)
    return (
        {
            "encoder_inputs": eng,
            "decoder_inputs": cmn[:, :-1],  # target sequence shifted right
        },
        cmn[:, 1:],  # next-token targets
    )


def make_dataset(pairs):
    eng_texts, cmn_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    cmn_texts = list(cmn_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, cmn_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    return dataset.shuffle(2048).prefetch(16).cache()


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
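# Added sanity check (not part of the original notebook): peek at a single batch
# to confirm the pipeline's output shapes, i.e. (batch_size, sequence_length)
# for both encoder and decoder inputs and for the targets.
for inputs, targets in train_ds.take(1):
    print("encoder_inputs shape:", inputs["encoder_inputs"].shape)
    print("decoder_inputs shape:", inputs["decoder_inputs"].shape)
    print("targets shape:", targets.shape)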
class TransformerEncoder(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward projection."""

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        else:
            padding_mask = None  # avoid an unbound variable when no mask is passed
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


class PositionalEmbedding(layers.Layer):
    """Token embedding plus learned position embedding."""

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super(PositionalEmbedding, self).__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        # Treat token id 0 (padding) as masked.
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config


class TransformerDecoder(layers.Layer):
    """Transformer decoder block: causal self-attention, cross-attention, feed-forward."""

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super(TransformerDecoder, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(latent_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = None  # avoid an unbound variable when no mask is passed
        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)
        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        # Lower-triangular mask so position i can only attend to positions <= i.
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config


# Download the pre-trained translation model and load it with the custom layers
# registered, so Keras can rebuild them from the saved config.
url = "https://drive.google.com/uc?id=1a4eTAL4sLUi42P28Veihrv-fVPwFymTa"
if not exists("re-model.h5"):
    gdown.download(url, quiet=False)

custom_objects = {
    "TransformerEncoder": TransformerEncoder,
    "PositionalEmbedding": PositionalEmbedding,
    "TransformerDecoder": TransformerDecoder,
}
with keras.utils.custom_object_scope(custom_objects):
    transformer = tf.keras.models.load_model("re-model.h5")

cmn_vocab = cmn_vectorization.get_vocabulary()
cmn_index_lookup = dict(zip(range(len(cmn_vocab)), cmn_vocab))
max_decoded_sentence_length = 20
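# Minimal sketch (added here, not part of the original notebook) of how a
# transformer could be assembled from the layers defined above if one wanted to
# train from scratch rather than load the pre-trained re-model.h5. The values
# embed_dim=256, latent_dim=2048 and num_heads=8 are assumptions; the actual
# dimensions of the saved model are not visible in this script.
def build_transformer_from_scratch(embed_dim=256, latent_dim=2048, num_heads=8):
    encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
    x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
    encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)

    decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
    encoded_seq_inputs = keras.Input(
        shape=(None, embed_dim), name="decoder_state_inputs"
    )
    x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
    x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
    x = layers.Dropout(0.5)(x)
    decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
    decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

    # Chain encoder and decoder into a single end-to-end model.
    decoder_outputs = decoder([decoder_inputs, encoder_outputs])
    model = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)
    model.compile(
        "rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )
    return model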
def decode_sequence_chinese(input_sentence):
    """Greedy decoding: repeatedly feed the growing target sequence back into the model."""
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = cmn_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence]
        )
        # Pick the most likely token at the current position.
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = cmn_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence
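# Illustrative usage (added, not in the original notebook): translate a few
# random English sentences from the held-out test pairs with the loaded model.
for eng_sentence, _ in random.sample(test_pairs, 3):
    print(eng_sentence, "->", decode_sequence_chinese(eng_sentence))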