# Uploaded by abhaskumarsinha ("Upload 2 files", commit d891407).
# Hosting-page metadata captured with the file: raw | history | blame |
# No virus | 7.92 kB
import numpy as np
import pandas as pd
import tensorflow as tf
import math
from tqdm import tqdm
def scaled_dot_product_attention(q, k, v):
    """Compute ``softmax(q @ k^T / sqrt(d_k)) @ v``.

    No mask is applied: every query position attends to every key position.

    Args:
        q: Query tensor. Its last two axes are matrix-multiplied with the
            transposed last two axes of ``k``.
        k: Key tensor. The size of its last axis is used as ``d_k``.
        v: Value tensor, combined using the attention weights.

    Returns:
        The attention-weighted sum of ``v``.
    """
    # Similarity score between every query row and every key row.
    scores = tf.matmul(q, k, transpose_b=True)
    # Scale by sqrt(d_k). The scale is cast to the dtype of the scores rather
    # than a hard-coded float32, so float16 / bfloat16 / float64 inputs do not
    # fail with a dtype mismatch in the division below.
    depth = tf.cast(tf.shape(k)[-1], dtype=scores.dtype)
    scaled_scores = scores / tf.math.sqrt(depth)
    # Normalise the scores over the key axis to obtain attention weights.
    weights = tf.nn.softmax(scaled_scores, axis=-1)
    return tf.matmul(weights, v)
class LinearLayer(tf.keras.layers.Layer):
    """Affine map ``x @ w1 + b1`` applied to the last axis of the input.

    ``call`` reads the first two axes of the input as (batch, positions) and
    reshapes its result to ``(batch, positions, ox)``.

    Args:
        ix: Size of the input's last axis.
        ox: Size of the output's last axis.
    """

    def __init__(self, ix, ox):
        super().__init__()
        self.ix = ix
        self.ox = ox

    def build(self, input_shapes):
        """Create the kernel ``w1`` (ix, ox) and the bias ``b1`` (1, ox)."""
        self.w1 = self.add_weight(shape=(self.ix, self.ox))
        self.b1 = self.add_weight(shape=(1, self.ox))

    def call(self, inputs):
        """Project ``inputs`` and return a ``(batch, positions, ox)`` tensor."""
        dynamic_shape = tf.shape(inputs)
        batch, positions = dynamic_shape[0], dynamic_shape[1]
        # Collapse the leading axes so one 2-D matmul handles every position.
        flattened = tf.reshape(inputs, (-1, self.ix))
        projected = tf.matmul(flattened, self.w1) + self.b1
        return tf.reshape(projected, (batch, positions, self.ox))
class split_heads(tf.keras.layers.Layer):
    """Split the last axis into ``num_heads`` chunks and move heads forward.

    An input whose first two axes are (batch, positions) is reshaped to
    ``(batch, positions, num_heads, -1)`` and then transposed to
    ``(batch, num_heads, positions, -1)``.

    Args:
        num_heads: Number of heads the last axis is divided into.
    """

    def __init__(self, num_heads = 10):
        super().__init__()
        self.num_heads = num_heads

    def call(self, inputs):
        """Return ``inputs`` rearranged to (batch, heads, positions, depth)."""
        dims = tf.shape(inputs)
        per_head = tf.reshape(inputs, (dims[0], dims[1], self.num_heads, -1))
        # Swap the positions and heads axes.
        return tf.transpose(per_head, (0, 2, 1, 3))
class merge_heads(tf.keras.layers.Layer):
    """Undo a head split: fold the head axis back into the last axis.

    A rank-4 input is transposed with permutation ``(0, 2, 1, 3)`` and then
    reshaped to ``(axis0, original axis2, -1)``.
    """

    def __init__(self):
        super().__init__()

    def call(self, inputs):
        """Return ``inputs`` with its head axis merged into the last axis."""
        dims = tf.shape(inputs)
        # Sizes are read before the transpose: axis 2 becomes output axis 1.
        batch, positions = dims[0], dims[2]
        heads_inner = tf.transpose(inputs, (0, 2, 1, 3))
        return tf.reshape(heads_inner, (batch, positions, -1))
class GPT_Attention(tf.keras.layers.Layer):
    """Multi-head self-attention built from one fused projection.

    The input is projected once to ``3 * ox`` features, split into three
    equal parts, divided into heads, passed through
    ``scaled_dot_product_attention`` (no mask is applied), merged back and
    projected from ``ox`` to ``ix`` features.

    Args:
        ix: Feature size of the input (and of the output).
        ox: Total feature size of each of the three projections.
        num_heads: Number of attention heads.

    Raises:
        ValueError: If ``ox`` is not divisible by ``num_heads``.
    """

    def __init__(self, ix, ox, num_heads):
        super().__init__()
        # Validate first so a bad configuration fails before any sub-layer
        # is constructed.
        if ox % num_heads != 0:
            raise ValueError('The value ox = '+ str(ox) +' SHOULD be divisible by number of heads provided')
        self.ix = ix
        self.ox = ox
        self.num_heads = num_heads
        self.linear1 = LinearLayer(self.ix, self.ox * 3)
        self.split = split_heads(num_heads = self.num_heads)
        self.merge = merge_heads()
        self.linear2 = LinearLayer(self.ox, self.ix)

    def call(self, inputs):
        """Apply self-attention.

        Args:
            inputs: Either a tensor, or a list/tuple whose first element is
                the tensor to attend over (remaining elements are ignored).

        Returns:
            Tensor produced by the final ``LinearLayer(ox, ix)``.
        """
        # Only unwrap real containers. The previous ``len(inputs) > 0`` test
        # was also true for a bare tensor and silently replaced the batch by
        # its first element.
        if isinstance(inputs, (list, tuple)):
            inputs = inputs[0]
        fused = self.linear1(inputs)
        # The first slice is used as the query and the second as the key;
        # this matches the argument order of the attention call below.
        query, key, value = tf.split(fused, 3, axis = -1)
        query = self.split(query)
        key = self.split(key)
        value = self.split(value)
        attended = scaled_dot_product_attention(query, key, value)
        merged = self.merge(attended)
        return self.linear2(merged)
class MultiHeadAttention(tf.keras.layers.Layer):
    """Attention with per-head element-wise weights.

    Each head multiplies query, key and value element-wise by its own
    ``(key_dim, key_embedding)`` weight slice (the einsum ``'bij, ij -> bij'``
    performs no contraction), then runs ``scaled_dot_product_attention``.
    The head outputs are concatenated along axis -2 and combined with ``W_o``.

    Args:
        num_heads: Number of heads.
        key_dim: Size of axis 1 of the weights.
        key_embedding: Size of axis 2 of the weights.
    """

    def __init__(self, num_heads = 8, key_dim = 64, key_embedding = 512):
        super().__init__()
        self.num_heads = num_heads
        self.key_dim = key_dim
        self.key_embedding = key_embedding
        # Holds the per-head outputs of the most recent call.
        self.head_vectors = []

    def build(self, input_shape):
        """Create the per-head weights and the output weight ``W_o``."""
        per_head_shape = (self.num_heads, self.key_dim, self.key_embedding)
        self.W_k = self.add_weight(shape=per_head_shape, name='key')
        self.W_q = self.add_weight(shape=per_head_shape, name='query')
        self.W_v = self.add_weight(shape=per_head_shape, name='value')
        self.W_o = self.add_weight(shape=(self.key_dim, self.key_embedding))

    def call(self, inputs):
        """Run all heads on ``inputs``, a ``(query, key, value)`` triple."""
        query, key, value = inputs
        heads = []
        for head in range(self.num_heads):
            head_q = tf.einsum('bij, ij -> bij', query, self.W_q[head])
            head_k = tf.einsum('bij, ij -> bij', key, self.W_k[head])
            head_v = tf.einsum('bij, ij -> bij', value, self.W_v[head])
            heads.append(scaled_dot_product_attention(head_q, head_k, head_v))
        # Keep the per-head results available on the layer.
        self.head_vectors = heads
        stacked = tf.concat(heads, -2)
        # Sums the stacked heads over axis 1 and scales the result by W_o.
        return tf.einsum('bij, kj -> bkj', stacked, self.W_o)
class Decoder(tf.keras.layers.Layer):
    """Decoder block: self-attention followed by an element-wise two-step net.

    Both sub-layers use a residual connection followed by
    ``LayerNormalization(axis=-2)``. The feed-forward part multiplies
    element-wise by ``x1`` / ``x2`` and adds ``y1`` / ``y2`` (the einsum
    ``'bij, ij -> bij'`` performs no contraction), with a ReLU in between.

    Args:
        num_heads: Number of attention heads.
        key_dim: Size of axis 1 of the block's weights.
        key_embedding: Size of axis 2 of the block's weights.
        GPT_attention: If true, use ``GPT_Attention(key_embedding,
            key_embedding, num_heads)``; otherwise use ``MultiHeadAttention``.
    """

    def __init__(self, num_heads = 8, key_dim = 64, key_embedding = 512, GPT_attention = False):
        super().__init__()
        self.num_heads = num_heads
        self.key_dim = key_dim
        self.key_embedding = key_embedding
        if GPT_attention:
            self.attention = GPT_Attention(key_embedding, key_embedding, num_heads)
        else:
            self.attention = MultiHeadAttention(num_heads = num_heads, key_dim = key_dim, key_embedding = key_embedding)
        self.normalize1 = tf.keras.layers.LayerNormalization(axis = -2)
        self.normalize2 = tf.keras.layers.LayerNormalization(axis = -2)

    def build(self, input_shape):
        """Create the element-wise scales (x1, x2) and offsets (y1, y2)."""
        weight_shape = (self.key_dim, self.key_embedding)
        self.x1 = self.add_weight(shape=weight_shape, name='vec1')
        self.x2 = self.add_weight(shape=weight_shape, name='vec2')
        self.y1 = self.add_weight(shape=weight_shape, name='bias1')
        self.y2 = self.add_weight(shape=weight_shape, name='bias2')

    def call(self, inputs):
        """Apply the block to ``inputs`` and return the normalised result."""
        # Self-attention: the same tensor serves as query, key and value.
        attended = self.attention((inputs, inputs, inputs))
        first_sublayer_output = self.normalize1(attended + inputs)

        first_nn = tf.einsum('bij, ij -> bij', first_sublayer_output, self.x1) + self.y1
        # Plain ReLU. tf.nn.relu replaces
        # tf.keras.activations.relu(..., alpha=0.0, max_value=None, threshold=0.0):
        # with those default values the two are equivalent, but the `alpha`
        # keyword is rejected by Keras 3, where it is named `negative_slope`.
        first_nn = tf.nn.relu(first_nn)
        second_nn = tf.einsum('bij, ij -> bij', first_nn, self.x2) + self.y2

        return self.normalize2(second_nn + first_sublayer_output)
def positional_function(words, embedding, model_dim=512):
    """Build a sinusoidal position table.

    Entry ``[i, j]`` is ``sin(i / 10000 ** (2 * j / model_dim))`` for even
    ``j`` and ``cos(i / 10000 ** (2 * j / model_dim))`` for odd ``j``.

    Args:
        words: Number of positions (rows).
        embedding: Number of columns.
        model_dim: Denominator of the exponent. It was previously hard-coded
            to 512; the default keeps existing results unchanged. Pass
            ``model_dim=embedding`` to tie the wavelengths to the table width.

    Returns:
        ``numpy.ndarray`` of shape ``(words, embedding)`` and dtype float64.
    """
    pos = np.zeros((words, embedding))
    for j in range(embedding):
        wave = math.sin if j % 2 == 0 else math.cos
        # The wavelength depends only on the column, so compute it once per
        # column instead of once per cell.
        wavelength = pow(10000, 2 * j / model_dim)
        for i in range(words):
            pos[i, j] = wave(i / wavelength)
    return pos
class PositionalEmbedding(tf.keras.layers.Layer):
    """Multiply inputs element-wise by a fixed positional table.

    The table is computed once in the constructor by calling
    ``positional_function(words, embedding_size)`` and is stored as a float32
    tensor in ``pos_mat``.

    Args:
        positional_function: Callable ``(words, embedding_size) -> array``
            that produces the table.
        embedding_size: Number of columns of the table.
        words: Number of rows of the table.
    """

    def __init__(self, positional_function = positional_function, embedding_size = 512, words = 64):
        super().__init__()
        self.embedding_size = embedding_size
        self.words = words
        table = positional_function(self.words, self.embedding_size)
        self.pos_mat = tf.cast(tf.convert_to_tensor(table), tf.float32)

    def build(self, input_sizes):
        """Print the input shape handed over by Keras; creates no weights."""
        print(input_sizes)

    def call(self, inputs):
        """Return ``inputs`` scaled element-wise by ``pos_mat``."""
        return tf.einsum("bij, ij -> bij", inputs, self.pos_mat)
def generate_output(model, vectorizer, text_size = 70, gpt_input = 64, input_sequence = None):
    """Generate text one token at a time with a sliding input window.

    Args:
        model: Callable that takes a ``(1, gpt_input)`` array and returns
            scores whose argmax over the last axis is a single token id.
        vectorizer: Object whose ``get_vocabulary()`` returns a sequence
            mapping token ids to strings.
        text_size: Total number of tokens in the result, prompt included.
        gpt_input: Number of tokens the model sees at each step.
        input_sequence: Prompt token ids; row 0 is used. ``None`` or an empty
            sequence means a prompt of ``gpt_input`` zeros.

    Returns:
        The ``text_size`` tokens mapped through the vocabulary and joined
        with single spaces.
    """
    # `None` replaces the mutable `[]` default, and the emptiness test no
    # longer relies on `input_sequence == []`, which does not behave as a
    # plain boolean when the prompt is a NumPy array.
    if input_sequence is None or len(input_sequence) == 0:
        input_sequence = tf.zeros((1, gpt_input)).numpy()

    text = tf.zeros((1, text_size)).numpy()
    text[0][: gpt_input] = input_sequence[0][: gpt_input]

    for i in tqdm(range(gpt_input, text_size)):
        prediction = tf.argmax(model(input_sequence), -1).numpy()
        # The window covers positions [i - gpt_input, i), so the predicted
        # token belongs at position i. Writing it to i - 1 (as before)
        # overwrote the last prompt token and left the final slot unfilled.
        # .item() requires exactly one predicted id.
        text[0][i] = prediction.item()
        # Slide the window so that it ends with the token just produced.
        input_sequence = text[0][i + 1 - gpt_input : i + 1].reshape(1, gpt_input)

    # Fetch the vocabulary once rather than once per token.
    vocabulary = vectorizer.get_vocabulary()
    return ' '.join(vocabulary[int(token)] for token in text[0])