import gradio as gr
import tensorflow as tf
import tensorflow_datasets as tfds

tf.keras.utils.set_random_seed(1234)


class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.position = position
        self.d_model = d_model
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_config(self):
        config = super(PositionalEncoding, self).get_config()
        config.update(
            {
                "position": self.position,
                "d_model": self.d_model,
            }
        )
        return config

    def get_angles(self, position, i, d_model):
        angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angles

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model,
        )
        # apply sin to even indices in the array
        sines = tf.math.sin(angle_rads[:, 0::2])
        # apply cos to odd indices in the array
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, : tf.shape(inputs)[1], :]


def scaled_dot_product_attention(query, key, value, mask):
    """Calculate the attention weights."""
    matmul_qk = tf.matmul(query, key, transpose_b=True)

    # scale matmul_qk
    depth = tf.cast(tf.shape(key)[-1], tf.float32)
    logits = matmul_qk / tf.math.sqrt(depth)

    # add the mask to zero out padding tokens
    if mask is not None:
        logits += mask * -1e9

    # softmax is normalized on the last axis (seq_len_k)
    attention_weights = tf.nn.softmax(logits, axis=-1)
    output = tf.matmul(attention_weights, value)
    return output


def create_padding_mask(x):
    mask = tf.cast(tf.math.equal(x, 0), tf.float32)
    # (batch_size, 1, 1, sequence_length)
    return mask[:, tf.newaxis, tf.newaxis, :]


def create_look_ahead_mask(x):
    seq_len = tf.shape(x)[1]
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    padding_mask = create_padding_mask(x)
    return tf.maximum(look_ahead_mask, padding_mask)


class MultiHeadAttentionLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, **kwargs):
        assert d_model % num_heads == 0
        super(MultiHeadAttentionLayer, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // self.num_heads
        self.query_dense = tf.keras.layers.Dense(units=d_model)
        self.key_dense = tf.keras.layers.Dense(units=d_model)
        self.value_dense = tf.keras.layers.Dense(units=d_model)
        self.dense = tf.keras.layers.Dense(units=d_model)

    def get_config(self):
        config = super(MultiHeadAttentionLayer, self).get_config()
        config.update(
            {
                "num_heads": self.num_heads,
                "d_model": self.d_model,
            }
        )
        return config

    def split_heads(self, inputs, batch_size):
        inputs = tf.keras.layers.Lambda(
            lambda inputs: tf.reshape(
                inputs, shape=(batch_size, -1, self.num_heads, self.depth)
            )
        )(inputs)
        return tf.keras.layers.Lambda(
            lambda inputs: tf.transpose(inputs, perm=[0, 2, 1, 3])
        )(inputs)

    def call(self, inputs):
        query, key, value, mask = (
            inputs["query"],
            inputs["key"],
            inputs["value"],
            inputs["mask"],
        )
        batch_size = tf.shape(query)[0]

        # linear layers
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)

        # split heads
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        # scaled dot-product attention
        scaled_attention = scaled_dot_product_attention(query, key, value, mask)
        scaled_attention = tf.keras.layers.Lambda(
            lambda scaled_attention: tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        )(scaled_attention)

        # concatenation of heads
        concat_attention = tf.keras.layers.Lambda(
            lambda scaled_attention: tf.reshape(
                scaled_attention, (batch_size, -1, self.d_model)
            )
        )(scaled_attention)

        # final linear layer
        outputs = self.dense(concat_attention)
        return outputs


model = tf.keras.models.load_model(
    "model/model1.h5",
    custom_objects={
        "PositionalEncoding": PositionalEncoding,
        "MultiHeadAttentionLayer": MultiHeadAttentionLayer,
    },
    compile=False,
)

tokenizer = tfds.deprecated.text.SubwordTextEncoder.load_from_file("model/model1")

MAX_LENGTH = 40

# Define start and end tokens to indicate the start and end of a sentence
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]

# Vocabulary size plus start and end tokens
VOCAB_SIZE = tokenizer.vocab_size + 2


def evaluate(sentence):
    sentence = tf.expand_dims(
        START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0
    )
    output = tf.expand_dims(START_TOKEN, 0)

    for i in range(MAX_LENGTH):
        predictions = model(inputs=[sentence, output], training=False)

        # select the last token from the seq_len dimension
        predictions = predictions[:, -1:, :]
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

        # return the result if the predicted_id is equal to the end token
        if tf.equal(predicted_id, END_TOKEN[0]):
            break

        # concatenate the predicted_id to the output, which is given to the
        # decoder as its input
        output = tf.concat([output, predicted_id], axis=-1)

    return tf.squeeze(output, axis=0)


def predict(sentence):
    prediction = evaluate(sentence)
    predicted_sentence = tokenizer.decode(
        [i for i in prediction if i < tokenizer.vocab_size]
    )
    return predicted_sentence


def process_input(sentence, history):
    history.append(sentence)
    # feed the last two turns, joined with " < ", back to the model as context
    inp = " < ".join(history[-2:])
    print(inp)
    resp = predict(inp)
    history.append(resp)
    # pair up the flat history list into (user, bot) tuples for gr.Chatbot
    return (
        [(history[i], history[i + 1]) for (i, x) in list(enumerate(history))[::2]],
        history,
    )


with gr.Blocks() as demo:
    markdown = gr.Markdown(
        # Header text (Arabic): "Tmislit-97", "By Abdullah F. Al-Gharabi",
        # "An artificial intelligence trained on the public group's chats",
        # "made of 40% intelligence, 60% Tmislit".
        "# Tmislit-97\n"
        "##### By عبدالله ف. الغرابي \n"
        "\n\n"
        "ذكاء اصطناعي مدرب من محادثات الكروب العام\n"
        "مكون من\n"
        "40% ذكاء\n"
        "60% تمسلت\n"
    )
    chatbot = gr.Chatbot()
    state = gr.State([])

    with gr.Row():
        txt = gr.Textbox(
            show_label=False,
            placeholder="اكتب رسالتك هنا !",  # "Type your message here!"
        ).style(container=False)

    txt.submit(process_input, [txt, state], [chatbot, state])
    txt.submit(lambda: "", None, txt)


if __name__ == "__main__":
    demo.launch()
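

# ---------------------------------------------------------------------------
# Optional helper (an illustrative sketch, not part of the original app): a
# minimal terminal chat loop that reuses predict()/process_input() exactly as
# the Gradio UI does, for quick local testing. It is never called
# automatically; import this module (so demo.launch() above is skipped) and
# call cli_chat() yourself. The function name is hypothetical.
def cli_chat():
    history = []
    print("Type a message; press Enter on an empty line to quit.")
    while True:
        user_message = input("you> ").strip()
        if not user_message:
            break
        # process_input appends both the user turn and the model reply to
        # `history`, so the latest reply is always the last element.
        _, history = process_input(user_message, history)
        print("bot>", history[-1])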