import numpy as np
import pandas as pd
import tensorflow as tf
import math
from tqdm import tqdm


def scaled_dot_product_attention(q, k, v):
    # Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V
    dot_product = tf.matmul(q, k, transpose_b=True)
    scaled_dot_product = dot_product / tf.math.sqrt(tf.cast(tf.shape(k)[-1], dtype=tf.float32))
    attention_weights = tf.nn.softmax(scaled_dot_product, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output
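
# Usage sketch (illustrative only; the shapes are an assumption, e.g. (batch, heads, seq_len, depth)):
# q = k = v = tf.random.normal((2, 8, 16, 64))
# out = scaled_dot_product_attention(q, k, v)   # -> shape (2, 8, 16, 64)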


class LinearLayer(tf.keras.layers.Layer):
    """Position-wise dense layer: maps (batch, seq, ix) to (batch, seq, ox)."""

    def __init__(self, ix, ox):
        super().__init__()
        self.ix = ix
        self.ox = ox

    def build(self, input_shapes):
        self.w1 = self.add_weight(shape=(self.ix, self.ox))
        self.b1 = self.add_weight(shape=(1, self.ox))

    def call(self, inputs):
        bz, key = tf.shape(inputs)[0], tf.shape(inputs)[1]
        inputs = tf.reshape(inputs, (-1, self.ix))
        inputs = tf.matmul(inputs, self.w1) + self.b1
        inputs = tf.reshape(inputs, (bz, key, self.ox))
        return inputs
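
# Usage sketch (illustrative): project a (batch, seq, 512) tensor down to 64 features.
# layer = LinearLayer(512, 64)
# y = layer(tf.random.normal((2, 10, 512)))   # -> shape (2, 10, 64)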


class split_heads(tf.keras.layers.Layer):
    """Reshapes (batch, seq, features) into (batch, num_heads, seq, features // num_heads)."""

    def __init__(self, num_heads=10):
        super().__init__()
        self.num_heads = num_heads

    def call(self, inputs):
        bz, key = tf.shape(inputs)[0], tf.shape(inputs)[1]
        inputs = tf.reshape(inputs, (bz, key, self.num_heads, -1))
        inputs = tf.transpose(inputs, (0, 2, 1, 3))
        return inputs
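
# Usage sketch (illustrative): 512 features split across 8 heads of depth 64.
# x = tf.random.normal((2, 10, 512))
# split_heads(num_heads=8)(x)   # -> shape (2, 8, 10, 64)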


class merge_heads(tf.keras.layers.Layer):
    """Inverse of split_heads: (batch, num_heads, seq, depth) -> (batch, seq, num_heads * depth)."""

    def __init__(self):
        super().__init__()

    def call(self, inputs):
        bz, key = tf.shape(inputs)[0], tf.shape(inputs)[2]
        inputs = tf.transpose(inputs, (0, 2, 1, 3))
        inputs = tf.reshape(inputs, (bz, key, -1))
        return inputs
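
# Usage sketch (illustrative): merge_heads undoes the split above.
# x = tf.random.normal((2, 8, 10, 64))
# merge_heads()(x)   # -> shape (2, 10, 512)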


class GPT_Attention(tf.keras.layers.Layer):
    """GPT-style self-attention: a single projection produces q, k and v, which are split
    across heads, attended over, merged back, and projected to the input width."""

    def __init__(self, ix, ox, num_heads):
        super().__init__()
        if ox % num_heads != 0:
            raise ValueError('The value ox = ' + str(ox) + ' should be divisible by the number of heads provided')
        self.ix = ix
        self.ox = ox
        self.num_heads = num_heads
        self.linear1 = LinearLayer(self.ix, self.ox * 3)   # joint q/k/v projection
        self.split = split_heads(num_heads=self.num_heads)
        self.merge = merge_heads()
        self.linear2 = LinearLayer(self.ox, self.ix)       # output projection

    def call(self, inputs):
        # The Decoder passes a (query, key, value) tuple; self-attention only needs one tensor.
        if isinstance(inputs, (tuple, list)):
            inputs = inputs[0]
        inputs = self.linear1(inputs)
        q, k, v = tf.split(inputs, 3, axis=-1)
        q = self.split(q)
        k = self.split(k)
        v = self.split(v)

        inputs = scaled_dot_product_attention(q, k, v)
        inputs = self.merge(inputs)
        inputs = self.linear2(inputs)
        return inputs
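
# Usage sketch (illustrative): self-attention over a (batch, seq, 512) tensor with 8 heads.
# attn = GPT_Attention(ix=512, ox=512, num_heads=8)
# y = attn(tf.random.normal((2, 10, 512)))   # -> shape (2, 10, 512)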


class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention where each head applies its own elementwise weighting to the
    query, key and value tensors before scaled dot-product attention."""

    def __init__(self, num_heads=8, key_dim=64, key_embedding=512):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.key_dim = key_dim
        self.key_embedding = key_embedding
        self.head_vectors = []

    def build(self, input_shape):
        self.W_k = self.add_weight(shape=(self.num_heads, self.key_dim, self.key_embedding), name='key')
        self.W_q = self.add_weight(shape=(self.num_heads, self.key_dim, self.key_embedding), name='query')
        self.W_v = self.add_weight(shape=(self.num_heads, self.key_dim, self.key_embedding), name='value')
        self.W_o = self.add_weight(shape=(self.key_dim, self.key_embedding))

    def call(self, inputs):
        query, key, value = inputs

        self.head_vectors = []
        head_concat = None

        for i in range(self.num_heads):
            # Elementwise per-head weighting of query, key and value.
            q = tf.einsum('bij, ij -> bij', query, self.W_q[i])
            k = tf.einsum('bij, ij -> bij', key, self.W_k[i])
            v = tf.einsum('bij, ij -> bij', value, self.W_v[i])

            self.head_vectors += [scaled_dot_product_attention(q, k, v)]

        # Concatenate the heads along the sequence axis and project back with W_o.
        head_concat = tf.concat(self.head_vectors, -2)
        output = tf.einsum('bij, kj -> bkj', head_concat, self.W_o)

        return output
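
# Usage sketch (illustrative): query/key/value each of shape (batch, key_dim, key_embedding).
# mha = MultiHeadAttention(num_heads=8, key_dim=64, key_embedding=512)
# x = tf.random.normal((2, 64, 512))
# y = mha((x, x, x))   # -> shape (2, 64, 512)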


class Decoder(tf.keras.layers.Layer):
    """One decoder block: self-attention + residual + layer norm, followed by a
    position-wise feed-forward sublayer + residual + layer norm."""

    def __init__(self, num_heads=8, key_dim=64, key_embedding=512, GPT_attention=False):
        super(Decoder, self).__init__()
        self.num_heads = num_heads
        self.key_dim = key_dim
        self.key_embedding = key_embedding
        if GPT_attention:
            self.attention = GPT_Attention(key_embedding, key_embedding, num_heads)
        else:
            self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim, key_embedding=key_embedding)
        self.normalize1 = tf.keras.layers.LayerNormalization(axis=-2)
        self.normalize2 = tf.keras.layers.LayerNormalization(axis=-2)

    def build(self, input_shape):
        # Feed-forward sublayer weights and biases (applied elementwise via einsum below).
        self.x1 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='vec1')
        self.x2 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='vec2')
        self.y1 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='bias1')
        self.y2 = self.add_weight(shape=(self.key_dim, self.key_embedding), name='bias2')

    def call(self, inputs):
        # Attention sublayer with residual connection and normalization.
        first_sublayer_output = self.attention((inputs, inputs, inputs))
        first_sublayer_output = self.normalize1(first_sublayer_output + inputs)

        # Feed-forward sublayer with ReLU, residual connection and normalization.
        first_nn = tf.einsum('bij, ij -> bij', first_sublayer_output, self.x1) + self.y1
        first_nn = tf.keras.activations.relu(first_nn)
        second_nn = tf.einsum('bij, ij -> bij', first_nn, self.x2) + self.y2

        second_sublayer_output = self.normalize2(second_nn + first_sublayer_output)
        return second_sublayer_output
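
# Usage sketch (illustrative): one decoder block over a (batch, 64, 512) tensor.
# block = Decoder(num_heads=8, key_dim=64, key_embedding=512, GPT_attention=True)
# y = block(tf.random.normal((2, 64, 512)))   # -> shape (2, 64, 512)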


def positional_function(words, embedding):
    """Sinusoidal positional encodings of shape (words, embedding)."""
    pos = np.zeros((words, embedding))

    for i in range(words):
        for j in range(embedding):
            # sin on even dimensions, cos on odd dimensions, frequency scaled by the embedding size.
            if j % 2 == 0:
                pos[i, j] = math.sin(i / pow(10000, 2 * j / embedding))
            else:
                pos[i, j] = math.cos(i / pow(10000, 2 * j / embedding))

    return pos
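
# Usage sketch (illustrative): a positional matrix for 64 positions of 512 features.
# pe = positional_function(64, 512)   # numpy array, shape (64, 512)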


class PositionalEmbedding(tf.keras.layers.Layer):
    """Applies the precomputed positional matrix to the token embeddings (elementwise)."""

    def __init__(self, positional_function=positional_function, embedding_size=512, words=64):
        super(PositionalEmbedding, self).__init__()
        self.embedding_size = embedding_size
        self.words = words
        self.pos_mat = tf.cast(tf.convert_to_tensor(positional_function(self.words, self.embedding_size)), tf.float32)

    def build(self, input_sizes):
        # Debug: log the incoming shape when the layer is first built.
        print(input_sizes)

    def call(self, inputs):
        embed = tf.einsum("bij, ij -> bij", inputs, self.pos_mat)
        return embed
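
# Usage sketch (illustrative): weight a (batch, 64, 512) embedding by its positional matrix.
# pos = PositionalEmbedding(embedding_size=512, words=64)
# y = pos(tf.random.normal((2, 64, 512)))   # -> shape (2, 64, 512)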


def generate_output(model, vectorizer, text_size=70, gpt_input=64, input_sequence=None):
    """Greedy autoregressive decoding: repeatedly feed the last `gpt_input` tokens to the
    model, take the argmax of its final position, and append it to the generated text."""
    if input_sequence is None or len(input_sequence) == 0:
        input_sequence = tf.zeros((1, gpt_input)).numpy()

    text = tf.zeros((1, text_size)).numpy()
    text[0][:gpt_input] = input_sequence[0][:gpt_input]

    GPT = model

    for i in tqdm(range(gpt_input, text_size)):
        # The model is assumed to return one prediction per input position;
        # the argmax at the last position is the next token.
        output = tf.argmax(GPT(input_sequence), -1).numpy()
        text[0][i] = output[0][-1]
        input_sequence = text[0][i + 1 - gpt_input : i + 1].reshape(1, gpt_input)

    op = [vectorizer.get_vocabulary()[int(text[0][i])] for i in range(len(text[0]))]
    return ' '.join(op)
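
# Usage sketch (illustrative; `gpt_model` and `vectorizer` are assumptions -- a trained Keras
# model and a tf.keras.layers.TextVectorization built on the training corpus):
# generated = generate_output(gpt_model, vectorizer, text_size=70, gpt_input=64)
# print(generated)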