Demosthene-OR's picture
Update main_dl.py
e5c5c99
raw
history blame
19.5 kB
from fastapi import FastAPI, HTTPException, Header, Depends, Request, Response
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.exceptions import RequestValidationError
import asyncio
from typing import Optional, List
from pydantic import BaseModel, ValidationError
import pandas as pd
import numpy as np
import os
from filesplit.merge import Merge
import tensorflow as tf
import string
import re
from tensorflow import keras
from keras_nlp.layers import TransformerEncoder
from tensorflow.keras import layers
from tensorflow.keras.utils import plot_model
api = FastAPI()
dataPath = "data"
imagePath = "images"
# ===== Keras ====
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
def custom_standardization(input_string):
lowercase = tf.strings.lower(input_string)
lowercase=tf.strings.regex_replace(lowercase, "[à]", "a")
return tf.strings.regex_replace(
lowercase, f"[{re.escape(strip_chars)}]", "")
def load_vocab(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return file.read().split('\n')[:-1]
def decode_sequence_rnn(input_sentence, src, tgt):
global translation_model
vocab_size = 15000
sequence_length = 50
source_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"),
)
target_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"),
)
tgt_vocab = target_vectorization.get_vocabulary()
tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
max_decoded_sentence_length = 50
tokenized_input_sentence = source_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = target_vectorization([decoded_sentence])
next_token_predictions = translation_model.predict(
[tokenized_input_sentence, tokenized_target_sentence], verbose=0)
sampled_token_index = np.argmax(next_token_predictions[0, i, :])
sampled_token = tgt_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence[8:-6]
# ===== Enf of Keras ====
# ===== Transformer section ====
class TransformerDecoder(layers.Layer):
def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.dense_dim = dense_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.attention_2 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.dense_proj = keras.Sequential(
[layers.Dense(dense_dim, activation="relu"),
layers.Dense(embed_dim),]
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.layernorm_3 = layers.LayerNormalization()
self.supports_masking = True
def get_config(self):
config = super().get_config()
config.update({
"embed_dim": self.embed_dim,
"num_heads": self.num_heads,
"dense_dim": self.dense_dim,
})
return config
def get_causal_attention_mask(self, inputs):
input_shape = tf.shape(inputs)
batch_size, sequence_length = input_shape[0], input_shape[1]
i = tf.range(sequence_length)[:, tf.newaxis]
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
mult = tf.concat(
[tf.expand_dims(batch_size, -1),
tf.constant([1, 1], dtype=tf.int32)], axis=0)
return tf.tile(mask, mult)
def call(self, inputs, encoder_outputs, mask=None):
causal_mask = self.get_causal_attention_mask(inputs)
if mask is not None:
padding_mask = tf.cast(
mask[:, tf.newaxis, :], dtype="int32")
padding_mask = tf.minimum(padding_mask, causal_mask)
else:
padding_mask = mask
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=causal_mask)
attention_output_1 = self.layernorm_1(inputs + attention_output_1)
attention_output_2 = self.attention_2(
query=attention_output_1,
value=encoder_outputs,
key=encoder_outputs,
attention_mask=padding_mask,
)
attention_output_2 = self.layernorm_2(
attention_output_1 + attention_output_2)
proj_output = self.dense_proj(attention_output_2)
return self.layernorm_3(attention_output_2 + proj_output)
class PositionalEmbedding(layers.Layer):
def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
super().__init__(**kwargs)
self.token_embeddings = layers.Embedding(
input_dim=input_dim, output_dim=output_dim)
self.position_embeddings = layers.Embedding(
input_dim=sequence_length, output_dim=output_dim)
self.sequence_length = sequence_length
self.input_dim = input_dim
self.output_dim = output_dim
def call(self, inputs):
length = tf.shape(inputs)[-1]
positions = tf.range(start=0, limit=length, delta=1)
embedded_tokens = self.token_embeddings(inputs)
embedded_positions = self.position_embeddings(positions)
return embedded_tokens + embedded_positions
def compute_mask(self, inputs, mask=None):
return tf.math.not_equal(inputs, 0)
def get_config(self):
config = super(PositionalEmbedding, self).get_config()
config.update({
"output_dim": self.output_dim,
"sequence_length": self.sequence_length,
"input_dim": self.input_dim,
})
return config
def decode_sequence_tranf(input_sentence, src, tgt):
global translation_model
vocab_size = 15000
sequence_length = 30
source_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"),
)
target_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"),
)
tgt_vocab = target_vectorization.get_vocabulary()
tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
max_decoded_sentence_length = 50
tokenized_input_sentence = source_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = target_vectorization(
[decoded_sentence])[:, :-1]
predictions = translation_model(
[tokenized_input_sentence, tokenized_target_sentence])
sampled_token_index = np.argmax(predictions[0, i, :])
sampled_token = tgt_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence[8:-6]
# ==== End Transforformer section ====
def load_all_data():
merge = Merge( dataPath+"/rnn_en-fr_split", dataPath, "seq2seq_rnn-model-en-fr.h5").merge(cleanup=False)
merge = Merge( dataPath+"/rnn_fr-en_split", dataPath, "seq2seq_rnn-model-fr-en.h5").merge(cleanup=False)
rnn_en_fr = keras.models.load_model(dataPath+"/seq2seq_rnn-model-en-fr.h5", compile=False)
rnn_fr_en = keras.models.load_model(dataPath+"/seq2seq_rnn-model-fr-en.h5", compile=False)
rnn_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
rnn_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
custom_objects = {"TransformerDecoder": TransformerDecoder, "PositionalEmbedding": PositionalEmbedding}
with keras.saving.custom_object_scope(custom_objects):
transformer_en_fr = keras.models.load_model( "data/transformer-model-en-fr.h5")
transformer_fr_en = keras.models.load_model( "data/transformer-model-fr-en.h5")
merge = Merge( "data/transf_en-fr_weight_split", "data", "transformer-model-en-fr.weights.h5").merge(cleanup=False)
merge = Merge( "data/transf_fr-en_weight_split", "data", "transformer-model-fr-en.weights.h5").merge(cleanup=False)
transformer_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
return rnn_en_fr, rnn_fr_en, transformer_en_fr, transformer_fr_en
rnn_en_fr, rnn_fr_en, transformer_en_fr, transformer_fr_en = load_all_data()
def find_lang_label(lang_sel):
global lang_tgt, label_lang
return label_lang[lang_tgt.index(lang_sel)]
@api.get('/', name="Vérification que l'API fonctionne")
def check_api():
load_all_data()
return {'message': "L'API fonctionne"}
@api.get('/small_vocab/rnn', name="Traduction par RNN")
def trad_rnn(lang_tgt:str,
texte: str):
global translation_model
if (lang_tgt=='en'):
translation_model = rnn_fr_en
return decode_sequence_rnn(texte, "fr", "en")
else:
translation_model = rnn_en_fr
return decode_sequence_rnn(texte, "en", "fr")
@api.get('/small_vocab/transformer', name="Traduction par Transformer")
def trad_transformer(lang_tgt:str,
texte: str):
global translation_model
if (lang_tgt=='en'):
translation_model = transformer_fr_en
return decode_sequence_tranf(texte, "fr", "en")
else:
translation_model = transformer_en_fr
return decode_sequence_tranf(texte, "en", "fr")
@api.get('/small_vocab/plot_model', name="Affiche le modèle")
def affiche_modele(lang_tgt:str,
model_type: str):
global translation_model
if (lang_tgt=='en'):
if model_type=="rnn":
translation_model = rnn_fr_en
else:
translation_model = transformer_fr_en
else:
if model_type=="rnn":
translation_model = rnn_en_fr
else:
translation_model = transformer_en_fr
plot_model(translation_model, show_shapes=True, show_layer_names=True, show_layer_activations=True,rankdir='TB',to_file=imagePath+'/model_plot.png')
with open(imagePath+'/model_plot.png', "rb") as image_file:
# Lire les données de l'image
image_data = image_file.read()
# Retourner l'image en tant que réponse HTTP avec le type de contenu approprié
return Response(content=image_data, media_type="image/png")
'''
def run():
global n1, df_data_src, df_data_tgt, translation_model, placeholder, model_speech
global df_data_en, df_data_fr, lang_classifier, translation_en_fr, translation_fr_en
global lang_tgt, label_lang
st.write("")
st.title(tr(title))
#
st.write("## **"+tr("Explications")+" :**\n")
st.markdown(tr(
"""
Enfin, nous avons réalisé une traduction :red[**Seq2Seq**] ("Sequence-to-Sequence") avec des :red[**réseaux neuronaux**].
""")
, unsafe_allow_html=True)
st.markdown(tr(
"""
La traduction Seq2Seq est une méthode d'apprentissage automatique qui permet de traduire des séquences de texte d'une langue à une autre en utilisant
un :red[**encodeur**] pour capturer le sens du texte source, un :red[**décodeur**] pour générer la traduction,
avec un ou plusieurs :red[**vecteurs d'intégration**] qui relient les deux, afin de transmettre le contexte, l'attention ou la position.
""")
, unsafe_allow_html=True)
st.image("assets/deepnlp_graph1.png",use_column_width=True)
st.markdown(tr(
"""
Nous avons mis en oeuvre ces techniques avec des Réseaux Neuronaux Récurrents (GRU en particulier) et des Transformers
Vous en trouverez :red[**5 illustrations**] ci-dessous.
""")
, unsafe_allow_html=True)
# Utilisation du module translate
lang_tgt = ['en','fr','af','ak','sq','de','am','en','ar','hy','as','az','ba','bm','eu','bn','be','my','bs','bg','ks','ca','ny','zh','si','ko','co','ht','hr','da','dz','gd','es','eo','et','ee','fo','fj','fi','fr','fy','gl','cy','lg','ka','el','gn','gu','ha','he','hi','hu','ig','id','iu','ga','is','it','ja','kn','kk','km','ki','rw','ky','rn','ku','lo','la','lv','li','ln','lt','lb','mk','ms','ml','dv','mg','mt','mi','mr','mn','nl','ne','no','nb','nn','oc','or','ug','ur','uz','ps','pa','fa','pl','pt','ro','ru','sm','sg','sa','sc','sr','sn','sd','sk','sl','so','st','su','sv','sw','ss','tg','tl','ty','ta','tt','cs','te','th','bo','ti','to','ts','tn','tr','tk','tw','uk','vi','wo','xh','yi']
label_lang = ['Anglais','Français','Afrikaans','Akan','Albanais','Allemand','Amharique','Anglais','Arabe','Arménien','Assamais','Azéri','Bachkir','Bambara','Basque','Bengali','Biélorusse','Birman','Bosnien','Bulgare','Cachemiri','Catalan','Chichewa','Chinois','Cingalais','Coréen','Corse','Créolehaïtien','Croate','Danois','Dzongkha','Écossais','Espagnol','Espéranto','Estonien','Ewe','Féroïen','Fidjien','Finnois','Français','Frisonoccidental','Galicien','Gallois','Ganda','Géorgien','Grecmoderne','Guarani','Gujarati','Haoussa','Hébreu','Hindi','Hongrois','Igbo','Indonésien','Inuktitut','Irlandais','Islandais','Italien','Japonais','Kannada','Kazakh','Khmer','Kikuyu','Kinyarwanda','Kirghiz','Kirundi','Kurde','Lao','Latin','Letton','Limbourgeois','Lingala','Lituanien','Luxembourgeois','Macédonien','Malais','Malayalam','Maldivien','Malgache','Maltais','MaorideNouvelle-Zélande','Marathi','Mongol','Néerlandais','Népalais','Norvégien','Norvégienbokmål','Norvégiennynorsk','Occitan','Oriya','Ouïghour','Ourdou','Ouzbek','Pachto','Pendjabi','Persan','Polonais','Portugais','Roumain','Russe','Samoan','Sango','Sanskrit','Sarde','Serbe','Shona','Sindhi','Slovaque','Slovène','Somali','SothoduSud','Soundanais','Suédois','Swahili','Swati','Tadjik','Tagalog','Tahitien','Tamoul','Tatar','Tchèque','Télougou','Thaï','Tibétain','Tigrigna','Tongien','Tsonga','Tswana','Turc','Turkmène','Twi','Ukrainien','Vietnamien','Wolof','Xhosa','Yiddish']
lang_src = {'ar': 'arabic', 'bg': 'bulgarian', 'de': 'german', 'el':'modern greek', 'en': 'english', 'es': 'spanish', 'fr': 'french', \
'hi': 'hindi', 'it': 'italian', 'ja': 'japanese', 'nl': 'dutch', 'pl': 'polish', 'pt': 'portuguese', 'ru': 'russian', 'sw': 'swahili', \
'th': 'thai', 'tr': 'turkish', 'ur': 'urdu', 'vi': 'vietnamese', 'zh': 'chinese'}
st.write("#### "+tr("Choisissez le type de traduction")+" :")
chosen_id = tab_bar(data=[
TabBarItemData(id="tab1", title="small vocab", description=tr("avec Keras et un RNN")),
TabBarItemData(id="tab2", title="small vocab", description=tr("avec Keras et un Transformer")),
TabBarItemData(id="tab3", title=tr("Phrase personnelle"), description=tr("à écrire")),
TabBarItemData(id="tab4", title=tr("Phrase personnelle"), description=tr("à dicter")),
TabBarItemData(id="tab5", title=tr("Funny translation !"), description=tr("avec le Fine Tuning"))],
default="tab1")
if (chosen_id == "tab1") or (chosen_id == "tab2") :
if (chosen_id == "tab1"):
st.write("<center><h5><b>"+tr("Schéma d'un Réseau de Neurones Récurrents")+"</b></h5></center>", unsafe_allow_html=True)
st.image("assets/deepnlp_graph3.png",use_column_width=True)
else:
st.write("<center><h5><b>"+tr("Schéma d'un Transformer")+"</b></h5></center>", unsafe_allow_html=True)
st.image("assets/deepnlp_graph12.png",use_column_width=True)
st.write("## **"+tr("Paramètres")+" :**\n")
TabContainerHolder = st.container()
Sens = TabContainerHolder.radio(tr('Sens')+':',('Anglais -> Français','Français -> Anglais'), horizontal=True)
Lang = ('en_fr' if Sens=='Anglais -> Français' else 'fr_en')
if (Lang=='en_fr'):
df_data_src = df_data_en
df_data_tgt = df_data_fr
if (chosen_id == "tab1"):
translation_model = rnn_en_fr
else:
translation_model = transformer_en_fr
else:
df_data_src = df_data_fr
df_data_tgt = df_data_en
if (chosen_id == "tab1"):
translation_model = rnn_fr_en
else:
translation_model = transformer_fr_en
sentence1 = st.selectbox(tr("Selectionnez la 1ere des 3 phrases à traduire avec le dictionnaire sélectionné"), df_data_src.iloc[:-4],index=int(n1) )
n1 = df_data_src[df_data_src[0]==sentence1].index.values[0]
st.write("## **"+tr("Résultats")+" :**\n")
if (chosen_id == "tab1"):
display_translation(n1, Lang,1)
else:
display_translation(n1, Lang,2)
st.write("## **"+tr("Details sur la méthode")+" :**\n")
if (chosen_id == "tab1"):
st.markdown(tr(
"""
Nous avons utilisé 2 Gated Recurrent Units.
Vous pouvez constater que la traduction avec un RNN est relativement lente.
Ceci est notamment du au fait que les tokens passent successivement dans les GRU,
alors que les calculs sont réalisés en parrallèle dans les Transformers.
Le score BLEU est bien meilleur que celui des traductions mot à mot.
<br>
""")
, unsafe_allow_html=True)
else:
st.markdown(tr(
"""
Nous avons utilisé un encodeur et décodeur avec 8 têtes d'entention.
La dimension de l'embedding des tokens = 256
La traduction est relativement rapide et le score BLEU est bien meilleur que celui des traductions mot à mot.
<br>
""")
, unsafe_allow_html=True)
st.write("<center><h5>"+tr("Architecture du modèle utilisé")+":</h5>", unsafe_allow_html=True)
plot_model(translation_model, show_shapes=True, show_layer_names=True, show_layer_activations=True,rankdir='TB',to_file=st.session_state.ImagePath+'/model_plot.png')
st.image(st.session_state.ImagePath+'/model_plot.png',use_column_width=True)
st.write("</center>", unsafe_allow_html=True)
'''