from fastapi import FastAPI, HTTPException, Header, Depends, Request, Response, Query
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.exceptions import RequestValidationError
import asyncio
from typing import Optional, List
from pydantic import BaseModel, ValidationError
import pandas as pd
import numpy as np
import os
from filesplit.merge import Merge
import tensorflow as tf
import string
import re
import json
import csv
import tiktoken
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras
from keras_nlp.layers import TransformerEncoder
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import plot_model

api = FastAPI()

dataPath = "data"
imagePath = "images"

# ===== Keras ====

strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")


def custom_standardization(input_string):
    # Lowercase the text, fold "à" to "a" and strip punctuation
    # (square brackets are kept for the [start]/[end] markers).
    lowercase = tf.strings.lower(input_string)
    lowercase = tf.strings.regex_replace(lowercase, "[à]", "a")
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")


def load_vocab(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read().split('\n')[:-1]


def decode_sequence_rnn(input_sentence, src, tgt):
    # Greedy decoding with the RNN seq2seq model: the growing target sentence
    # is re-vectorized at each step and the most probable next token is kept.
    global translation_model

    vocab_size = 15000
    sequence_length = 50

    source_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath + "/vocab_" + src + ".txt"),
    )
    target_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length + 1,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath + "/vocab_" + tgt + ".txt"),
    )

    tgt_vocab = target_vectorization.get_vocabulary()
    tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
    max_decoded_sentence_length = 50
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = translation_model.predict(
            [tokenized_input_sentence, tokenized_target_sentence], verbose=0)
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = tgt_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    # Strip the leading "[start] " and trailing " [end]" markers.
    return decoded_sentence[8:-6]

# ===== End of Keras ====
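# Illustrative use of the greedy RNN decoder above (a sketch, not executed at
# import time): it assumes the merged RNN models defined further down have been
# loaded and that the module-level `translation_model` points at one of them.
#
#   translation_model = rnn_en_fr
#   decode_sequence_rnn("new jersey is sometimes quiet during autumn", "en", "fr")
#
# At each of the 50 decoding steps the most probable next token is appended to
# the sentence, stopping early once "[end]" is produced.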
# ===== Transformer section ====

class TransformerDecoder(layers.Layer):
    # Decoder block: masked self-attention, encoder-decoder attention and a
    # feed-forward projection, each followed by a residual connection and
    # layer normalization.
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim), ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)


class PositionalEmbedding(layers.Layer):
    # Sum of learned token embeddings and learned position embeddings; also
    # exposes the padding mask (non-zero token ids).
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config


def decode_sequence_transf(input_sentence, src, tgt):
    # Greedy decoding with the Transformer model (source sequence length 30).
    global translation_model

    vocab_size = 15000
    sequence_length = 30

    source_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath + "/vocab_" + src + ".txt"),
    )
    target_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length + 1,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath + "/vocab_" + tgt + ".txt"),
    )

    tgt_vocab = target_vectorization.get_vocabulary()
    tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
    max_decoded_sentence_length = 50
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = translation_model(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = tgt_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    return decoded_sentence[8:-6]

# ==== End Transformer section ====
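# The causal mask built by TransformerDecoder.get_causal_attention_mask is a
# lower-triangular matrix tiled across the batch, so target position i can only
# attend to positions j <= i. For a sequence length of 4 it looks like:
#
#   [[1, 0, 0, 0],
#    [1, 1, 0, 0],
#    [1, 1, 1, 0],
#    [1, 1, 1, 1]]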
dataPath, "seq2seq_rnn-model-fr-en.h5").merge(cleanup=False) rnn_en_fr = keras.models.load_model(dataPath+"/seq2seq_rnn-model-en-fr.h5") # , compile=False) rnn_fr_en = keras.models.load_model(dataPath+"/seq2seq_rnn-model-fr-en.h5") # , compile=False) rnn_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) rnn_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) return rnn_en_fr, rnn_fr_en def load_transformer(): custom_objects = {"TransformerDecoder": TransformerDecoder, "PositionalEmbedding": PositionalEmbedding} with keras.saving.custom_object_scope(custom_objects): transformer_en_fr = keras.models.load_model( "data/transformer-model-en-fr.h5") transformer_fr_en = keras.models.load_model( "data/transformer-model-fr-en.h5") merge = Merge( "data/transf_en-fr_weight_split", "data", "transformer-model-en-fr.weights.h5").merge(cleanup=False) merge = Merge( "data/transf_fr-en_weight_split", "data", "transformer-model-fr-en.weights.h5").merge(cleanup=False) transformer_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) transformer_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) return transformer_en_fr, transformer_fr_en rnn_en_fr, rnn_fr_en = load_rnn() transformer_en_fr, transformer_fr_en = load_transformer() # ==== Language identifier ==== def encode_text(textes): global tokenizer max_length=250 sequences = tokenizer.encode_batch(textes) return pad_sequences(sequences, maxlen=max_length, padding='post') def read_list_lan(): with open(dataPath+'/multilingue/lan_code.csv', 'r') as fichier_csv: reader = csv.reader(fichier_csv) lan_code = next(reader) return lan_code def init_dl_identifier(): global tokenizer, dl_model, label_encoder, lan_to_language, lan_identified tokenizer = tiktoken.get_encoding("cl100k_base") # Lisez le contenu du fichier JSON with open(dataPath+'/multilingue/lan_to_language.json', 'r') as fichier: lan_to_language = json.load(fichier) label_encoder = LabelEncoder() list_lan = read_list_lan() lan_identified = [lan_to_language[l] for l in list_lan] label_encoder.fit(list_lan) merge = Merge(dataPath+"/dl_id_lang_split", dataPath, "dl_tiktoken_id_language_model.h5").merge(cleanup=False) dl_model = keras.models.load_model(dataPath+"/dl_tiktoken_id_language_model.h5") #, compile=False) return def lang_id_dl(sentences): global dl_model, label_encoder, lan_to_language if 'dl_model' not in globals(): init_dl_identifier() predictions = dl_model.predict(encode_text(sentences)) # Décodage des prédictions en langues predicted_labels_encoded = np.argmax(predictions, axis=1) predicted_languages = label_encoder.classes_[predicted_labels_encoded] if (len(sentences)==1): return lan_to_language[predicted_languages[0]] else: return [l for l in predicted_languages] # ==== Endpoints ==== @api.get('/', name="Vérification que l'API fonctionne") def check_api(): load_rnn() load_transformer() init_dl_identifier() return {'message': "L'API fonctionne"} @api.get('/small_vocab/rnn', name="Traduction par RNN") async def trad_rnn(lang_tgt:str, texte: str): global translation_model if 'translation_model' not in globals(): load_rnn() load_transformer() if (lang_tgt=='en'): translation_model = rnn_fr_en return decode_sequence_rnn(texte, "fr", "en") else: translation_model = rnn_en_fr return decode_sequence_rnn(texte, "en", "fr") @api.get('/small_vocab/transformer', name="Traduction par Transformer") async def 
@api.get('/small_vocab/transformer', name="Traduction par Transformer")
async def trad_transformer(lang_tgt: str, texte: str):
    global translation_model

    if 'translation_model' not in globals():
        load_rnn()
        load_transformer()
    if (lang_tgt == 'en'):
        translation_model = transformer_fr_en
        return decode_sequence_transf(texte, "fr", "en")
    else:
        translation_model = transformer_en_fr
        return decode_sequence_transf(texte, "en", "fr")


@api.get('/small_vocab/plot_model', name="Affiche le modèle")
def affiche_modele(model_type: str, lang_tgt: Optional[str] = None):
    global translation_model, dl_model

    if model_type == "lang_id":
        # Make sure the language-identification model is loaded before plotting it.
        if 'dl_model' not in globals():
            init_dl_identifier()
        model_to_display = dl_model
    elif (model_type == "rnn"):
        if (lang_tgt == 'en'):
            model_to_display = rnn_fr_en
        else:
            model_to_display = rnn_en_fr
    else:
        if (lang_tgt == 'en'):
            model_to_display = transformer_fr_en
        else:
            model_to_display = transformer_en_fr
    plot_model(model_to_display, show_shapes=True, show_layer_names=True,
               show_layer_activations=True, rankdir='TB',
               to_file=imagePath + '/model_plot.png')
    with open(imagePath + '/model_plot.png', "rb") as image_file:
        # Read the image data
        image_data = image_file.read()
    # Return the image as an HTTP response with the appropriate content type
    return Response(content=image_data, media_type="image/png")


@api.get('/lang_id_dl', name="Id de langue par DL")
async def language_id_dl(sentence: List[str] = Query(..., min_length=1)):
    return lang_id_dl(sentence)


@api.get('/lan_identified', name="Langues identifiées par les modèles")
def languages_identified():
    global lan_identified

    if 'lan_identified' not in globals():
        init_dl_identifier()
    return lan_identified
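# Minimal local entry point (a sketch, assuming uvicorn is installed and this
# module is run directly). Example requests once the server is up:
#
#   GET /small_vocab/transformer?lang_tgt=en&texte=il+fait+beau+en+automne
#   GET /lang_id_dl?sentence=Bonjour+tout+le+monde
#   GET /lan_identified
if __name__ == "__main__":
    import uvicorn
    # The heavy model loading already happened at import time above, so this
    # only starts the HTTP server.
    uvicorn.run(api, host="127.0.0.1", port=8000)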