from fastapi import FastAPI, HTTPException, Header, Depends, Request, Response from fastapi.responses import JSONResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.exceptions import RequestValidationError import asyncio from typing import Optional, List from pydantic import BaseModel, ValidationError import pandas as pd import numpy as np import os from filesplit.merge import Merge import tensorflow as tf import string import re from tensorflow import keras from keras_nlp.layers import TransformerEncoder from tensorflow.keras import layers from tensorflow.keras.utils import plot_model api = FastAPI() dataPath = "data" imagePath = "images" # ===== Keras ==== strip_chars = string.punctuation + "¿" strip_chars = strip_chars.replace("[", "") strip_chars = strip_chars.replace("]", "") def custom_standardization(input_string): lowercase = tf.strings.lower(input_string) lowercase=tf.strings.regex_replace(lowercase, "[à]", "a") return tf.strings.regex_replace( lowercase, f"[{re.escape(strip_chars)}]", "") def load_vocab(file_path): with open(file_path, "r", encoding="utf-8") as file: return file.read().split('\n')[:-1] def decode_sequence_rnn(input_sentence, src, tgt): global translation_model vocab_size = 15000 sequence_length = 50 source_vectorization = layers.TextVectorization( max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length, standardize=custom_standardization, vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"), ) target_vectorization = layers.TextVectorization( max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length + 1, standardize=custom_standardization, vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"), ) tgt_vocab = target_vectorization.get_vocabulary() tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab)) max_decoded_sentence_length = 50 tokenized_input_sentence = source_vectorization([input_sentence]) decoded_sentence = "[start]" for i in range(max_decoded_sentence_length): tokenized_target_sentence = target_vectorization([decoded_sentence]) next_token_predictions = translation_model.predict( [tokenized_input_sentence, tokenized_target_sentence], verbose=0) sampled_token_index = np.argmax(next_token_predictions[0, i, :]) sampled_token = tgt_index_lookup[sampled_token_index] decoded_sentence += " " + sampled_token if sampled_token == "[end]": break return decoded_sentence[8:-6] # ===== Enf of Keras ==== # ===== Transformer section ==== class TransformerDecoder(layers.Layer): def __init__(self, embed_dim, dense_dim, num_heads, **kwargs): super().__init__(**kwargs) self.embed_dim = embed_dim self.dense_dim = dense_dim self.num_heads = num_heads self.attention_1 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim) self.attention_2 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim) self.dense_proj = keras.Sequential( [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),] ) self.layernorm_1 = layers.LayerNormalization() self.layernorm_2 = layers.LayerNormalization() self.layernorm_3 = layers.LayerNormalization() self.supports_masking = True def get_config(self): config = super().get_config() config.update({ "embed_dim": self.embed_dim, "num_heads": self.num_heads, "dense_dim": self.dense_dim, }) return config def get_causal_attention_mask(self, inputs): input_shape = tf.shape(inputs) batch_size, sequence_length = input_shape[0], input_shape[1] i = tf.range(sequence_length)[:, tf.newaxis] j = tf.range(sequence_length) mask = tf.cast(i >= j, dtype="int32") mask = tf.reshape(mask, (1, input_shape[1], input_shape[1])) mult = tf.concat( [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], axis=0) return tf.tile(mask, mult) def call(self, inputs, encoder_outputs, mask=None): causal_mask = self.get_causal_attention_mask(inputs) if mask is not None: padding_mask = tf.cast( mask[:, tf.newaxis, :], dtype="int32") padding_mask = tf.minimum(padding_mask, causal_mask) else: padding_mask = mask attention_output_1 = self.attention_1( query=inputs, value=inputs, key=inputs, attention_mask=causal_mask) attention_output_1 = self.layernorm_1(inputs + attention_output_1) attention_output_2 = self.attention_2( query=attention_output_1, value=encoder_outputs, key=encoder_outputs, attention_mask=padding_mask, ) attention_output_2 = self.layernorm_2( attention_output_1 + attention_output_2) proj_output = self.dense_proj(attention_output_2) return self.layernorm_3(attention_output_2 + proj_output) class PositionalEmbedding(layers.Layer): def __init__(self, sequence_length, input_dim, output_dim, **kwargs): super().__init__(**kwargs) self.token_embeddings = layers.Embedding( input_dim=input_dim, output_dim=output_dim) self.position_embeddings = layers.Embedding( input_dim=sequence_length, output_dim=output_dim) self.sequence_length = sequence_length self.input_dim = input_dim self.output_dim = output_dim def call(self, inputs): length = tf.shape(inputs)[-1] positions = tf.range(start=0, limit=length, delta=1) embedded_tokens = self.token_embeddings(inputs) embedded_positions = self.position_embeddings(positions) return embedded_tokens + embedded_positions def compute_mask(self, inputs, mask=None): return tf.math.not_equal(inputs, 0) def get_config(self): config = super(PositionalEmbedding, self).get_config() config.update({ "output_dim": self.output_dim, "sequence_length": self.sequence_length, "input_dim": self.input_dim, }) return config def decode_sequence_tranf(input_sentence, src, tgt): global translation_model vocab_size = 15000 sequence_length = 30 source_vectorization = layers.TextVectorization( max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length, standardize=custom_standardization, vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"), ) target_vectorization = layers.TextVectorization( max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length + 1, standardize=custom_standardization, vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"), ) tgt_vocab = target_vectorization.get_vocabulary() tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab)) max_decoded_sentence_length = 50 tokenized_input_sentence = source_vectorization([input_sentence]) decoded_sentence = "[start]" for i in range(max_decoded_sentence_length): tokenized_target_sentence = target_vectorization( [decoded_sentence])[:, :-1] predictions = translation_model( [tokenized_input_sentence, tokenized_target_sentence]) sampled_token_index = np.argmax(predictions[0, i, :]) sampled_token = tgt_index_lookup[sampled_token_index] decoded_sentence += " " + sampled_token if sampled_token == "[end]": break return decoded_sentence[8:-6] # ==== End Transforformer section ==== def load_all_data(): merge = Merge( dataPath+"/rnn_en-fr_split", dataPath, "seq2seq_rnn-model-en-fr.h5").merge(cleanup=False) merge = Merge( dataPath+"/rnn_fr-en_split", dataPath, "seq2seq_rnn-model-fr-en.h5").merge(cleanup=False) rnn_en_fr = keras.models.load_model(dataPath+"/seq2seq_rnn-model-en-fr.h5", compile=False) rnn_fr_en = keras.models.load_model(dataPath+"/seq2seq_rnn-model-fr-en.h5", compile=False) rnn_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) rnn_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) custom_objects = {"TransformerDecoder": TransformerDecoder, "PositionalEmbedding": PositionalEmbedding} with keras.saving.custom_object_scope(custom_objects): transformer_en_fr = keras.models.load_model( "data/transformer-model-en-fr.h5") transformer_fr_en = keras.models.load_model( "data/transformer-model-fr-en.h5") merge = Merge( "data/transf_en-fr_weight_split", "data", "transformer-model-en-fr.weights.h5").merge(cleanup=False) merge = Merge( "data/transf_fr-en_weight_split", "data", "transformer-model-fr-en.weights.h5").merge(cleanup=False) transformer_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) transformer_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) return rnn_en_fr, rnn_fr_en, transformer_en_fr, transformer_fr_en rnn_en_fr, rnn_fr_en, transformer_en_fr, transformer_fr_en = load_all_data() def find_lang_label(lang_sel): global lang_tgt, label_lang return label_lang[lang_tgt.index(lang_sel)] @api.get('/', name="Vérification que l'API fonctionne") def check_api(): load_all_data() return {'message': "L'API fonctionne"} @api.get('/small_vocab/rnn', name="Traduction par RNN") async def trad_rnn(lang_tgt:str, texte: str): global translation_model if (lang_tgt=='en'): translation_model = rnn_fr_en return decode_sequence_rnn(texte, "fr", "en") else: translation_model = rnn_en_fr return decode_sequence_rnn(texte, "en", "fr") @api.get('/small_vocab/transformer', name="Traduction par Transformer") async def trad_transformer(lang_tgt:str, texte: str): global translation_model if (lang_tgt=='en'): translation_model = transformer_fr_en return decode_sequence_tranf(texte, "fr", "en") else: translation_model = transformer_en_fr return decode_sequence_tranf(texte, "en", "fr") @api.get('/small_vocab/plot_model', name="Affiche le modèle") def affiche_modele(lang_tgt:str, model_type: str): global translation_model if (lang_tgt=='en'): if model_type=="rnn": translation_model = rnn_fr_en else: translation_model = transformer_fr_en else: if model_type=="rnn": translation_model = rnn_en_fr else: translation_model = transformer_en_fr plot_model(translation_model, show_shapes=True, show_layer_names=True, show_layer_activations=True,rankdir='TB',to_file=imagePath+'/model_plot.png') with open(imagePath+'/model_plot.png', "rb") as image_file: # Lire les données de l'image image_data = image_file.read() # Retourner l'image en tant que réponse HTTP avec le type de contenu approprié return Response(content=image_data, media_type="image/png") ''' def run(): global n1, df_data_src, df_data_tgt, translation_model, placeholder, model_speech global df_data_en, df_data_fr, lang_classifier, translation_en_fr, translation_fr_en global lang_tgt, label_lang st.write("") st.title(tr(title)) # st.write("## **"+tr("Explications")+" :**\n") st.markdown(tr( """ Enfin, nous avons réalisé une traduction :red[**Seq2Seq**] ("Sequence-to-Sequence") avec des :red[**réseaux neuronaux**]. """) , unsafe_allow_html=True) st.markdown(tr( """ La traduction Seq2Seq est une méthode d'apprentissage automatique qui permet de traduire des séquences de texte d'une langue à une autre en utilisant un :red[**encodeur**] pour capturer le sens du texte source, un :red[**décodeur**] pour générer la traduction, avec un ou plusieurs :red[**vecteurs d'intégration**] qui relient les deux, afin de transmettre le contexte, l'attention ou la position. """) , unsafe_allow_html=True) st.image("assets/deepnlp_graph1.png",use_column_width=True) st.markdown(tr( """ Nous avons mis en oeuvre ces techniques avec des Réseaux Neuronaux Récurrents (GRU en particulier) et des Transformers Vous en trouverez :red[**5 illustrations**] ci-dessous. """) , unsafe_allow_html=True) # Utilisation du module translate lang_tgt = ['en','fr','af','ak','sq','de','am','en','ar','hy','as','az','ba','bm','eu','bn','be','my','bs','bg','ks','ca','ny','zh','si','ko','co','ht','hr','da','dz','gd','es','eo','et','ee','fo','fj','fi','fr','fy','gl','cy','lg','ka','el','gn','gu','ha','he','hi','hu','ig','id','iu','ga','is','it','ja','kn','kk','km','ki','rw','ky','rn','ku','lo','la','lv','li','ln','lt','lb','mk','ms','ml','dv','mg','mt','mi','mr','mn','nl','ne','no','nb','nn','oc','or','ug','ur','uz','ps','pa','fa','pl','pt','ro','ru','sm','sg','sa','sc','sr','sn','sd','sk','sl','so','st','su','sv','sw','ss','tg','tl','ty','ta','tt','cs','te','th','bo','ti','to','ts','tn','tr','tk','tw','uk','vi','wo','xh','yi'] label_lang = ['Anglais','Français','Afrikaans','Akan','Albanais','Allemand','Amharique','Anglais','Arabe','Arménien','Assamais','Azéri','Bachkir','Bambara','Basque','Bengali','Biélorusse','Birman','Bosnien','Bulgare','Cachemiri','Catalan','Chichewa','Chinois','Cingalais','Coréen','Corse','Créolehaïtien','Croate','Danois','Dzongkha','Écossais','Espagnol','Espéranto','Estonien','Ewe','Féroïen','Fidjien','Finnois','Français','Frisonoccidental','Galicien','Gallois','Ganda','Géorgien','Grecmoderne','Guarani','Gujarati','Haoussa','Hébreu','Hindi','Hongrois','Igbo','Indonésien','Inuktitut','Irlandais','Islandais','Italien','Japonais','Kannada','Kazakh','Khmer','Kikuyu','Kinyarwanda','Kirghiz','Kirundi','Kurde','Lao','Latin','Letton','Limbourgeois','Lingala','Lituanien','Luxembourgeois','Macédonien','Malais','Malayalam','Maldivien','Malgache','Maltais','MaorideNouvelle-Zélande','Marathi','Mongol','Néerlandais','Népalais','Norvégien','Norvégienbokmål','Norvégiennynorsk','Occitan','Oriya','Ouïghour','Ourdou','Ouzbek','Pachto','Pendjabi','Persan','Polonais','Portugais','Roumain','Russe','Samoan','Sango','Sanskrit','Sarde','Serbe','Shona','Sindhi','Slovaque','Slovène','Somali','SothoduSud','Soundanais','Suédois','Swahili','Swati','Tadjik','Tagalog','Tahitien','Tamoul','Tatar','Tchèque','Télougou','Thaï','Tibétain','Tigrigna','Tongien','Tsonga','Tswana','Turc','Turkmène','Twi','Ukrainien','Vietnamien','Wolof','Xhosa','Yiddish'] lang_src = {'ar': 'arabic', 'bg': 'bulgarian', 'de': 'german', 'el':'modern greek', 'en': 'english', 'es': 'spanish', 'fr': 'french', \ 'hi': 'hindi', 'it': 'italian', 'ja': 'japanese', 'nl': 'dutch', 'pl': 'polish', 'pt': 'portuguese', 'ru': 'russian', 'sw': 'swahili', \ 'th': 'thai', 'tr': 'turkish', 'ur': 'urdu', 'vi': 'vietnamese', 'zh': 'chinese'} st.write("#### "+tr("Choisissez le type de traduction")+" :") chosen_id = tab_bar(data=[ TabBarItemData(id="tab1", title="small vocab", description=tr("avec Keras et un RNN")), TabBarItemData(id="tab2", title="small vocab", description=tr("avec Keras et un Transformer")), TabBarItemData(id="tab3", title=tr("Phrase personnelle"), description=tr("à écrire")), TabBarItemData(id="tab4", title=tr("Phrase personnelle"), description=tr("à dicter")), TabBarItemData(id="tab5", title=tr("Funny translation !"), description=tr("avec le Fine Tuning"))], default="tab1") if (chosen_id == "tab1") or (chosen_id == "tab2") : if (chosen_id == "tab1"): st.write("
"+tr("Schéma d'un Réseau de Neurones Récurrents")+"
", unsafe_allow_html=True) st.image("assets/deepnlp_graph3.png",use_column_width=True) else: st.write("
"+tr("Schéma d'un Transformer")+"
", unsafe_allow_html=True) st.image("assets/deepnlp_graph12.png",use_column_width=True) st.write("## **"+tr("Paramètres")+" :**\n") TabContainerHolder = st.container() Sens = TabContainerHolder.radio(tr('Sens')+':',('Anglais -> Français','Français -> Anglais'), horizontal=True) Lang = ('en_fr' if Sens=='Anglais -> Français' else 'fr_en') if (Lang=='en_fr'): df_data_src = df_data_en df_data_tgt = df_data_fr if (chosen_id == "tab1"): translation_model = rnn_en_fr else: translation_model = transformer_en_fr else: df_data_src = df_data_fr df_data_tgt = df_data_en if (chosen_id == "tab1"): translation_model = rnn_fr_en else: translation_model = transformer_fr_en sentence1 = st.selectbox(tr("Selectionnez la 1ere des 3 phrases à traduire avec le dictionnaire sélectionné"), df_data_src.iloc[:-4],index=int(n1) ) n1 = df_data_src[df_data_src[0]==sentence1].index.values[0] st.write("## **"+tr("Résultats")+" :**\n") if (chosen_id == "tab1"): display_translation(n1, Lang,1) else: display_translation(n1, Lang,2) st.write("## **"+tr("Details sur la méthode")+" :**\n") if (chosen_id == "tab1"): st.markdown(tr( """ Nous avons utilisé 2 Gated Recurrent Units. Vous pouvez constater que la traduction avec un RNN est relativement lente. Ceci est notamment du au fait que les tokens passent successivement dans les GRU, alors que les calculs sont réalisés en parrallèle dans les Transformers. Le score BLEU est bien meilleur que celui des traductions mot à mot.
""") , unsafe_allow_html=True) else: st.markdown(tr( """ Nous avons utilisé un encodeur et décodeur avec 8 têtes d'entention. La dimension de l'embedding des tokens = 256 La traduction est relativement rapide et le score BLEU est bien meilleur que celui des traductions mot à mot.
""") , unsafe_allow_html=True) st.write("
"+tr("Architecture du modèle utilisé")+":
", unsafe_allow_html=True) plot_model(translation_model, show_shapes=True, show_layer_names=True, show_layer_activations=True,rankdir='TB',to_file=st.session_state.ImagePath+'/model_plot.png') st.image(st.session_state.ImagePath+'/model_plot.png',use_column_width=True) st.write("
", unsafe_allow_html=True) '''