import numpy as np import pandas as pd import tensorflow as tf import math from tqdm import tqdm def scaled_dot_product_attention(q, k, v): # calculate the dot product of query and key dot_product = tf.matmul(q, k, transpose_b=True) # scale the dot product scaled_dot_product = dot_product / tf.math.sqrt(tf.cast(tf.shape(k)[-1], dtype=tf.float32)) # apply softmax activation to obtain attention weights attention_weights = tf.nn.softmax(scaled_dot_product, axis=-1) # compute the weighted sum of the value vectors with attention weights output = tf.matmul(attention_weights, v) return output class LinearLayer(tf.keras.layers.Layer): def __init__(self, ix, ox): super().__init__() self.ix = ix self.ox = ox def build(self, input_shapes): self.w1 = self.add_weight(shape=(self.ix, self.ox)) self.b1 = self.add_weight(shape=(1, self.ox)) def call(self, inputs): bz, key = tf.shape(inputs)[0], tf.shape(inputs)[1] inputs = tf.reshape(inputs, (-1, self.ix)) inputs = tf.matmul(inputs, self.w1) + self.b1 inputs = tf.reshape(inputs, (bz, key, self.ox)) return inputs class split_heads(tf.keras.layers.Layer): def __init__(self, num_heads = 10): super().__init__() self.num_heads = num_heads def call(self, inputs): bz, key = tf.shape(inputs)[0], tf.shape(inputs)[1] inputs = tf.reshape(inputs, (bz, key, self.num_heads, -1)) inputs = tf.transpose(inputs, (0, 2, 1, 3)) return inputs class merge_heads(tf.keras.layers.Layer): def __init__(self): super().__init__() def call(self, inputs): bz, key = tf.shape(inputs)[0], tf.shape(inputs)[2] inputs = tf.transpose(inputs, (0, 2, 1, 3)) inputs = tf.reshape(inputs, (bz, key, -1)) return inputs class GPT_Attention(tf.keras.layers.Layer): def __init__(self, ix, ox, num_heads): super().__init__() self.ix = ix self.ox = ox self.num_heads = num_heads self.linear1 = LinearLayer(self.ix, self.ox * 3) self.split = split_heads(num_heads = self.num_heads) self.merge = merge_heads() self.linear2 = LinearLayer(self.ox, self.ix) if self.ox % self.num_heads != 0: raise ValueError('The value ox = '+ str(self.ox) +' SHOULD be divisible by number of heads provided') def call(self, inputs): if len(inputs) > 0: inputs = inputs[0] inputs = self.linear1(inputs) k, q, v = tf.split(inputs, 3, axis = -1) k = self.split(k) q = self.split(q) v = self.split(v) #k, q, v = tf.split(inputs, 3, axis = -1) inputs = scaled_dot_product_attention(k, q, v) inputs = self.merge(inputs) inputs = self.linear2(inputs) return inputs class MultiHeadAttention(tf.keras.layers.Layer): def __init__(self, num_heads = 8, key_dim = 64, key_embedding = 512): super(MultiHeadAttention, self).__init__() self.num_heads = num_heads self.key_dim = key_dim self.key_embedding = key_embedding self.head_vectors = [] def build(self, input_shape): #print(input_shape) self.W_k = self.add_weight(shape=(self.num_heads, self.key_dim, self.key_embedding), name='key') self.W_q = self.add_weight(shape=(self.num_heads, self.key_dim, self.key_embedding), name='query') self.W_v = self.add_weight(shape=(self.num_heads, self.key_dim, self.key_embedding), name='value') self.W_o = self.add_weight(shape=(self.key_dim, self.key_embedding)) def call(self, inputs): query, key, value = inputs self.head_vectors = [] head_concat = None for i in range(self.num_heads): q = tf.einsum('bij, ij -> bij', query, self.W_q[i]) k = tf.einsum('bij, ij -> bij', key, self.W_k[i]) v = tf.einsum('bij, ij -> bij', value, self.W_v[i]) self.head_vectors += [scaled_dot_product_attention(q, k, v)] head_concat = tf.concat(self.head_vectors, -2) #print(tf.shape(head_concat)) output =tf.einsum('bij, kj -> bkj', head_concat, self.W_o) return output class Decoder(tf.keras.layers.Layer): def __init__(self, num_heads = 8, key_dim = 64, key_embedding = 512, GPT_attention = False): super(Decoder, self).__init__() self.num_heads = num_heads self.key_dim = key_dim self.key_embedding = key_embedding if GPT_attention: self.attention = GPT_Attention(key_embedding, key_embedding, num_heads) else: self.attention = MultiHeadAttention(num_heads = num_heads, key_dim = key_dim, key_embedding = key_embedding) self.normalize1 = tf.keras.layers.LayerNormalization(axis = -2) self.normalize2 = tf.keras.layers.LayerNormalization(axis = -2) def build(self, input_shape): #print(input_shape) self.x1 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='vec1') self.x2 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='vec2') self.y1 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='bias1') self.y2 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='bias2') def call(self, inputs): first_sublayer_output = self.attention((inputs, inputs, inputs)) first_sublayer_output = self.normalize1(first_sublayer_output + inputs) first_nn = tf.einsum('bij, ij -> bij', first_sublayer_output, self.x1) + self.y1 first_nn = tf.keras.activations.relu(first_nn, alpha=0.0, max_value=None, threshold=0.0) second_nn = tf.einsum('bij, ij -> bij', first_nn, self.x2) + self.y2 second_sublayer_output = self.normalize2(second_nn + first_sublayer_output) return second_sublayer_output def positional_function(words, embedding): pos = np.zeros((words, embedding)) for i in range(words): for j in range(embedding): if j%2 == 0: pos[i, j] = math.sin(i/pow(10000, 2*j/(512))) else: pos[i, j] = math.cos(i/pow(10000, 2*j/(512))) return pos class PositionalEmbedding(tf.keras.layers.Layer): def __init__(self, positional_function = positional_function, embedding_size = 512, words = 64): super(PositionalEmbedding, self).__init__() self.embedding_size = embedding_size self.words = words self.pos_mat = tf.cast(tf.convert_to_tensor(positional_function(self.words, self.embedding_size)), tf.float32) def build(self, input_sizes): print(input_sizes) def call(self, inputs): embed = tf.einsum("bij, ij -> bij", inputs, self.pos_mat) return embed def generate_output(model, vectorizer, text_size = 70, gpt_input = 64, input_sequence = []): if input_sequence == []: input_sequence = tf.zeros((1, gpt_input)).numpy() text = tf.zeros((1, text_size)).numpy() text[0][: gpt_input] = input_sequence[0][: gpt_input] GPT = model for i in tqdm(range(gpt_input, text_size)): #print("Iteration number:" + str(i)) output = tf.argmax(GPT(input_sequence), -1).numpy() text[0][i - 1] = output input_sequence = text[0][i - gpt_input : i].reshape(1, gpt_input) op = [vectorizer.get_vocabulary()[int(text[0][i])] for i in range(len(text[0]))] return ' '.join(op)