# NOTE(review): The lines below were Hugging Face Spaces page chrome
# ("Spaces", "Sleeping" status badges) captured by the scrape — they are
# not part of the program.
import gradio as gr
import tensorflow as tf
import tensorflow_datasets as tfds

# Fix the Python/NumPy/TF random seeds so any stochastic ops are reproducible.
tf.keras.utils.set_random_seed(1234)
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds fixed sinusoidal positional encodings to token embeddings.

    Follows the "Attention Is All You Need" formulation, except that the
    sine and cosine halves are *concatenated* along the feature axis
    (all sines, then all cosines) rather than interleaved. This matches
    the layout the saved model was trained with and must not be changed.
    """

    def __init__(self, position, d_model, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.position = position  # maximum sequence length supported
        self.d_model = d_model    # embedding dimensionality
        # Precompute the full (1, position, d_model) table once at build time.
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_config(self):
        # Serialize constructor args so the layer survives save/load.
        config = super(PositionalEncoding, self).get_config()
        config.update(
            {
                "position": self.position,
                "d_model": self.d_model,
            }
        )
        return config

    def get_angles(self, position, i, d_model):
        """Angle rates 1 / 10000^(2*(i//2)/d_model) for feature index i."""
        angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angles

    def positional_encoding(self, position, d_model):
        """Build the (1, position, d_model) encoding table."""
        angle_rads = self.get_angles(
            position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model,
        )
        # sin over even feature indices, cos over odd ones...
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        # ...then concatenated (NOT interleaved) along the feature axis.
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]  # add batch axis
        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        # Slice the table to the actual sequence length and add it.
        return inputs + self.pos_encoding[:, : tf.shape(inputs)[1], :]
def scaled_dot_product_attention(query, key, value, mask):
    """Calculate the attention weights and apply them to ``value``.

    Args:
        query, key, value: tensors whose trailing axes are
            (seq_len, depth); leading axes are batch/head dimensions.
            TODO(review): confirm exact ranks against callers.
        mask: tensor broadcastable to the logits, with 1.0 at positions
            to block (padding / future tokens), or None for no masking.

    Returns:
        The attention-weighted combination of ``value``.
    """
    matmul_qk = tf.matmul(query, key, transpose_b=True)
    # Scale by sqrt(depth) to keep the logits in a stable softmax range.
    depth = tf.cast(tf.shape(key)[-1], tf.float32)
    logits = matmul_qk / tf.math.sqrt(depth)
    # Large negative bias drives masked positions to ~0 after softmax.
    if mask is not None:
        logits += mask * -1e9
    # Softmax normalizes over the last axis (seq_len_k).
    attention_weights = tf.nn.softmax(logits, axis=-1)
    output = tf.matmul(attention_weights, value)
    return output
def create_padding_mask(x):
    """Mark padding tokens (id 0) with 1.0 so attention can ignore them.

    Returns a (batch_size, 1, 1, sequence_length) tensor, broadcastable
    over the attention logits' head and query axes.
    """
    mask = tf.cast(tf.math.equal(x, 0), tf.float32)
    return mask[:, tf.newaxis, tf.newaxis, :]
def create_look_ahead_mask(x):
    """Combine the causal (upper-triangular) mask with the padding mask.

    A position is blocked (1.0) if it is in the future relative to the
    query position OR is a padding token.
    """
    seq_len = tf.shape(x)[1]
    # 1s strictly above the diagonal block attention to future tokens.
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    padding_mask = create_padding_mask(x)
    # Element-wise max: blocked if either mask blocks.
    return tf.maximum(look_ahead_mask, padding_mask)
class MultiHeadAttentionLayer(tf.keras.layers.Layer):
    """Multi-head attention: project q/k/v, attend per head, recombine.

    ``d_model`` must be divisible by ``num_heads``; each head operates on
    a depth = d_model // num_heads slice of the projected features.

    NOTE(review): the Lambda-layer wrappers around reshape/transpose are
    kept exactly as written because the saved model graph was built with
    them — do not simplify without re-verifying model loading.
    """

    def __init__(self, d_model, num_heads, **kwargs):
        assert d_model % num_heads == 0
        super(MultiHeadAttentionLayer, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // self.num_heads
        # Per-stream linear projections plus the final output projection.
        self.query_dense = tf.keras.layers.Dense(units=d_model)
        self.key_dense = tf.keras.layers.Dense(units=d_model)
        self.value_dense = tf.keras.layers.Dense(units=d_model)
        self.dense = tf.keras.layers.Dense(units=d_model)

    def get_config(self):
        # Serialize constructor args so the layer survives save/load.
        config = super(MultiHeadAttentionLayer, self).get_config()
        config.update(
            {
                "num_heads": self.num_heads,
                "d_model": self.d_model,
            }
        )
        return config

    def split_heads(self, inputs, batch_size):
        """(batch, seq, d_model) -> (batch, num_heads, seq, depth)."""
        inputs = tf.keras.layers.Lambda(
            lambda inputs: tf.reshape(
                inputs, shape=(batch_size, -1, self.num_heads, self.depth)
            )
        )(inputs)
        return tf.keras.layers.Lambda(
            lambda inputs: tf.transpose(inputs, perm=[0, 2, 1, 3])
        )(inputs)

    def call(self, inputs):
        """``inputs`` is a dict with keys 'query', 'key', 'value', 'mask'."""
        query, key, value, mask = (
            inputs["query"],
            inputs["key"],
            inputs["value"],
            inputs["mask"],
        )
        batch_size = tf.shape(query)[0]
        # Linear projections.
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)
        # Split into heads.
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)
        # Scaled dot-product attention per head.
        scaled_attention = scaled_dot_product_attention(query, key, value, mask)
        # Undo the head split: back to (batch, seq, num_heads, depth)...
        scaled_attention = tf.keras.layers.Lambda(
            lambda scaled_attention: tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        )(scaled_attention)
        # ...then flatten the heads into (batch, seq, d_model).
        concat_attention = tf.keras.layers.Lambda(
            lambda scaled_attention: tf.reshape(
                scaled_attention, (batch_size, -1, self.d_model)
            )
        )(scaled_attention)
        # Final linear layer.
        outputs = self.dense(concat_attention)
        return outputs
# Load the trained transformer; compile=False because we only run inference,
# and the custom layers must be registered for deserialization.
model = tf.keras.models.load_model(
    "model/model1.h5",
    custom_objects={
        "PositionalEncoding": PositionalEncoding,
        "MultiHeadAttentionLayer": MultiHeadAttentionLayer,
    },
    compile=False,
)
tokenizer = tfds.deprecated.text.SubwordTextEncoder.load_from_file('model/model1')

# Maximum number of decoding steps per reply.
MAX_LENGTH = 40
# Start/end tokens take the two ids just past the subword vocabulary.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
# Vocabulary size plus the two special tokens.
VOCAB_SIZE = tokenizer.vocab_size + 2
def evaluate(sentence):
    """Greedy-decode a reply for ``sentence``; returns a 1-D id tensor.

    The returned sequence starts with START_TOKEN and excludes END_TOKEN.
    """
    sentence = tf.expand_dims(
        START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0
    )
    output = tf.expand_dims(START_TOKEN, 0)
    for _ in range(MAX_LENGTH):
        predictions = model(inputs=[sentence, output], training=False)
        # Only the last position's logits matter for the next token.
        predictions = predictions[:, -1:, :]
        # Greedy choice: take the arg-max token.
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
        # Stop as soon as the model emits the end token.
        if tf.equal(predicted_id, END_TOKEN[0]):
            break
        # Append the prediction and feed the grown sequence back to the
        # decoder as its input.
        output = tf.concat([output, predicted_id], axis=-1)
    return tf.squeeze(output, axis=0)
def predict(sentence):
    """Decode a reply for ``sentence`` and detokenize it to a string."""
    prediction = evaluate(sentence)
    # Ids >= vocab_size are the START/END specials with no subword
    # mapping — drop them before decoding.
    predicted_sentence = tokenizer.decode(
        [i for i in prediction if i < tokenizer.vocab_size]
    )
    return predicted_sentence
def process_input(sentence, history):
    """Gradio callback: record the user turn and generate the bot reply.

    Args:
        sentence: the user's new message.
        history: flat list alternating user/bot turns (mutated in place).

    Returns:
        A (chatbot_pairs, history) tuple: (user, bot) tuples for the
        Chatbot component plus the updated state.
    """
    history.append(sentence)
    # Feed the last two turns joined by " < " — presumably the turn
    # separator used during training; verify against the training data.
    inp = " < ".join(history[-2:])
    print(inp)  # server-side log of the exact model input
    history.append(predict(inp))
    # history now has even length, so pair consecutive (user, bot) turns.
    pairs = [(history[i], history[i + 1]) for i in range(0, len(history), 2)]
    return pairs, history
with gr.Blocks() as demo:
    # Header / about card; HTML and Arabic text reproduced verbatim.
    markdown = gr.Markdown(
        "# <center> Tmislit-97\n"
        "##### <center> By عبدالله ف. الغرابي \n"
        """
\n<div id="header" align="center" style="margin-bottom:20px">
<div id="header"><img src="https://media.discordapp.net/attachments/913886328863739944/1077559683688177684/photo_5436021961043066278_c.jpg" width="100" style="border-radius:50%"/></div>
<div id="socials" style="display: flex; flow: right; justify-content: center; margin-top: 10px">
<a href="https://www.instagram.com/abdullah.ghrabat/" style="margin-right: 20px">
<img src="https://img.shields.io/badge/Instagram-gray?logo=Instagram&logoColor=Orange&style=for-the-badge&backgroundColor=white" alt="Instagram"/>
</a>
<a href="https://t.me/abdullah_ghrabat">
<img src="https://img.shields.io/badge/Telegram-blue?logo=Telegram&logoColor=Orange&style=for-the-badge&backgroundColor=white" alt="Instagram"/>
</a>
</div>
</div>\n
"""
        "\n<center> ذكاء اسطناعي مدرب من محادثات الكروب العام\n"
        "<center> مكون من"
        '<center>40% ذكاء'
        '<center>60% تمسلت\n'
    )
    chatbot = gr.Chatbot()
    # Flat user/bot turn list carried between submits.
    state = gr.State([])
    with gr.Row():
        # NOTE(review): .style() is the legacy gradio styling API — kept
        # to match the gradio version this Space was built against.
        txt = gr.Textbox(show_label=False, placeholder="اكتب رسالتك هنا !").style(container=False)
    # Generate the reply, then clear the textbox.
    txt.submit(process_input, [txt, state], [chatbot, state])
    txt.submit(lambda: "", None, txt)

if __name__ == "__main__":
    demo.launch()