Demosthene-OR's picture
Update main_dl.py
5e30142
from fastapi import FastAPI, HTTPException, Header, Depends, Request, Response, Query
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.exceptions import RequestValidationError
import asyncio
from typing import Optional, List
from pydantic import BaseModel, ValidationError
import pandas as pd
import numpy as np
import os
from filesplit.merge import Merge
import tensorflow as tf
import string
import re
import json
import csv
import tiktoken
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras
from keras_nlp.layers import TransformerEncoder
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import plot_model
api = FastAPI()
dataPath = "data"
imagePath = "images"
# ===== Keras ====
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
def custom_standardization(input_string):
lowercase = tf.strings.lower(input_string)
lowercase=tf.strings.regex_replace(lowercase, "[à]", "a")
return tf.strings.regex_replace(
lowercase, f"[{re.escape(strip_chars)}]", "")
def load_vocab(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return file.read().split('\n')[:-1]
def decode_sequence_rnn(input_sentence, src, tgt):
global translation_model
vocab_size = 15000
sequence_length = 50
source_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"),
)
target_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"),
)
tgt_vocab = target_vectorization.get_vocabulary()
tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
max_decoded_sentence_length = 50
tokenized_input_sentence = source_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = target_vectorization([decoded_sentence])
next_token_predictions = translation_model.predict(
[tokenized_input_sentence, tokenized_target_sentence], verbose=0)
sampled_token_index = np.argmax(next_token_predictions[0, i, :])
sampled_token = tgt_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence[8:-6]
# ===== Enf of Keras ====
# ===== Transformer section ====
class TransformerDecoder(layers.Layer):
def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.dense_dim = dense_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.attention_2 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.dense_proj = keras.Sequential(
[layers.Dense(dense_dim, activation="relu"),
layers.Dense(embed_dim),]
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.layernorm_3 = layers.LayerNormalization()
self.supports_masking = True
def get_config(self):
config = super().get_config()
config.update({
"embed_dim": self.embed_dim,
"num_heads": self.num_heads,
"dense_dim": self.dense_dim,
})
return config
def get_causal_attention_mask(self, inputs):
input_shape = tf.shape(inputs)
batch_size, sequence_length = input_shape[0], input_shape[1]
i = tf.range(sequence_length)[:, tf.newaxis]
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
mult = tf.concat(
[tf.expand_dims(batch_size, -1),
tf.constant([1, 1], dtype=tf.int32)], axis=0)
return tf.tile(mask, mult)
def call(self, inputs, encoder_outputs, mask=None):
causal_mask = self.get_causal_attention_mask(inputs)
if mask is not None:
padding_mask = tf.cast(
mask[:, tf.newaxis, :], dtype="int32")
padding_mask = tf.minimum(padding_mask, causal_mask)
else:
padding_mask = mask
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=causal_mask)
attention_output_1 = self.layernorm_1(inputs + attention_output_1)
attention_output_2 = self.attention_2(
query=attention_output_1,
value=encoder_outputs,
key=encoder_outputs,
attention_mask=padding_mask,
)
attention_output_2 = self.layernorm_2(
attention_output_1 + attention_output_2)
proj_output = self.dense_proj(attention_output_2)
return self.layernorm_3(attention_output_2 + proj_output)
class PositionalEmbedding(layers.Layer):
def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
super().__init__(**kwargs)
self.token_embeddings = layers.Embedding(
input_dim=input_dim, output_dim=output_dim)
self.position_embeddings = layers.Embedding(
input_dim=sequence_length, output_dim=output_dim)
self.sequence_length = sequence_length
self.input_dim = input_dim
self.output_dim = output_dim
def call(self, inputs):
length = tf.shape(inputs)[-1]
positions = tf.range(start=0, limit=length, delta=1)
embedded_tokens = self.token_embeddings(inputs)
embedded_positions = self.position_embeddings(positions)
return embedded_tokens + embedded_positions
def compute_mask(self, inputs, mask=None):
return tf.math.not_equal(inputs, 0)
def get_config(self):
config = super(PositionalEmbedding, self).get_config()
config.update({
"output_dim": self.output_dim,
"sequence_length": self.sequence_length,
"input_dim": self.input_dim,
})
return config
def decode_sequence_transf(input_sentence, src, tgt):
global translation_model
vocab_size = 15000
sequence_length = 30
source_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"),
)
target_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"),
)
tgt_vocab = target_vectorization.get_vocabulary()
tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
max_decoded_sentence_length = 50
tokenized_input_sentence = source_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = target_vectorization(
[decoded_sentence])[:, :-1]
predictions = translation_model(
[tokenized_input_sentence, tokenized_target_sentence])
sampled_token_index = np.argmax(predictions[0, i, :])
sampled_token = tgt_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence[8:-6]
# ==== End Transforformer section ====
def load_rnn():
merge = Merge( dataPath+"/rnn_en-fr_split", dataPath, "seq2seq_rnn-model-en-fr.h5").merge(cleanup=False)
merge = Merge( dataPath+"/rnn_fr-en_split", dataPath, "seq2seq_rnn-model-fr-en.h5").merge(cleanup=False)
rnn_en_fr = keras.models.load_model(dataPath+"/seq2seq_rnn-model-en-fr.h5") # , compile=False)
rnn_fr_en = keras.models.load_model(dataPath+"/seq2seq_rnn-model-fr-en.h5") # , compile=False)
rnn_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
rnn_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
return rnn_en_fr, rnn_fr_en
def load_transformer():
custom_objects = {"TransformerDecoder": TransformerDecoder, "PositionalEmbedding": PositionalEmbedding}
with keras.saving.custom_object_scope(custom_objects):
transformer_en_fr = keras.models.load_model( "data/transformer-model-en-fr.h5")
transformer_fr_en = keras.models.load_model( "data/transformer-model-fr-en.h5")
merge = Merge( "data/transf_en-fr_weight_split", "data", "transformer-model-en-fr.weights.h5").merge(cleanup=False)
merge = Merge( "data/transf_fr-en_weight_split", "data", "transformer-model-fr-en.weights.h5").merge(cleanup=False)
transformer_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
return transformer_en_fr, transformer_fr_en
rnn_en_fr, rnn_fr_en = load_rnn()
transformer_en_fr, transformer_fr_en = load_transformer()
# ==== Language identifier ====
def encode_text(textes):
global tokenizer
max_length=250
sequences = tokenizer.encode_batch(textes)
return pad_sequences(sequences, maxlen=max_length, padding='post')
def read_list_lan():
with open(dataPath+'/multilingue/lan_code.csv', 'r') as fichier_csv:
reader = csv.reader(fichier_csv)
lan_code = next(reader)
return lan_code
def init_dl_identifier():
global tokenizer, dl_model, label_encoder, lan_to_language, lan_identified
tokenizer = tiktoken.get_encoding("cl100k_base")
# Lisez le contenu du fichier JSON
with open(dataPath+'/multilingue/lan_to_language.json', 'r') as fichier:
lan_to_language = json.load(fichier)
label_encoder = LabelEncoder()
list_lan = read_list_lan()
lan_identified = [lan_to_language[l] for l in list_lan]
label_encoder.fit(list_lan)
merge = Merge(dataPath+"/dl_id_lang_split", dataPath, "dl_tiktoken_id_language_model.h5").merge(cleanup=False)
dl_model = keras.models.load_model(dataPath+"/dl_tiktoken_id_language_model.h5") #, compile=False)
return
def lang_id_dl(sentences):
global dl_model, label_encoder, lan_to_language
if 'dl_model' not in globals():
init_dl_identifier()
predictions = dl_model.predict(encode_text(sentences))
# Décodage des prédictions en langues
predicted_labels_encoded = np.argmax(predictions, axis=1)
predicted_languages = label_encoder.classes_[predicted_labels_encoded]
if (len(sentences)==1): return lan_to_language[predicted_languages[0]]
else: return [l for l in predicted_languages]
# ==== Endpoints ====
@api.get('/', name="Vérification que l'API fonctionne")
def check_api():
load_rnn()
load_transformer()
init_dl_identifier()
return {'message': "L'API fonctionne"}
@api.get('/small_vocab/rnn', name="Traduction par RNN")
async def trad_rnn(lang_tgt:str,
texte: str):
global translation_model
if 'translation_model' not in globals():
load_rnn()
load_transformer()
if (lang_tgt=='en'):
translation_model = rnn_fr_en
return decode_sequence_rnn(texte, "fr", "en")
else:
translation_model = rnn_en_fr
return decode_sequence_rnn(texte, "en", "fr")
@api.get('/small_vocab/transformer', name="Traduction par Transformer")
async def trad_transformer(lang_tgt:str,
texte: str):
global translation_model
if 'translation_model' not in globals():
load_rnn()
load_transformer()
if (lang_tgt=='en'):
translation_model = transformer_fr_en
return decode_sequence_transf(texte, "fr", "en")
else:
translation_model = transformer_en_fr
return decode_sequence_transf(texte, "en", "fr")
@api.get('/small_vocab/plot_model', name="Affiche le modèle")
def affiche_modele(model_type: str,
lang_tgt:Optional[str]=None):
global translation_model, dl_model
if model_type=="lang_id":
model_to_display = dl_model
elif (model_type=="rnn"):
if (lang_tgt=='en'):
model_to_display = rnn_fr_en
else:
model_to_display = rnn_en_fr
else:
if (lang_tgt=='en'):
model_to_display = transformer_fr_en
else:
model_to_display = transformer_en_fr
plot_model(model_to_display, show_shapes=True, show_layer_names=True, show_layer_activations=True,rankdir='TB',to_file=imagePath+'/model_plot.png')
with open(imagePath+'/model_plot.png', "rb") as image_file:
# Lire les données de l'image
image_data = image_file.read()
# Retourner l'image en tant que réponse HTTP avec le type de contenu approprié
return Response(content=image_data, media_type="image/png")
@api.get('/lang_id_dl', name="Id de langue par DL")
async def language_id_dl(sentence:List[str] = Query(..., min_length=1)):
return lang_id_dl(sentence)
@api.get('/lan_identified', name="Langues identifiées par les modèles")
def languages_identified():
global lan_identified
if 'lan_identified' not in globals():
init_dl_identifier()
return lan_identified