import configparser
import logging
import os
import pickle

import numpy as np
import pandas as pd
import regex
from keras.preprocessing.sequence import pad_sequences
from scipy.spatial import distance
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow import keras

from . import nlp, utils

config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), "config.ini"))

model_name = config["DEFAULT"]["name_model_signature"]
model = keras.models.load_model(filepath=utils.get_model_full_path(model_name))
with open(utils.get_model_full_path(model_name + "/minmax_scaler.p"), "rb") as f:
    minmax_scaler = pickle.load(f)
with open(utils.get_model_full_path(model_name + "/standard_scaler.p"), "rb") as f:
    standard_scaler = pickle.load(f)

# All columns produced by f_create_email_lines_features, in order.
list_name_columns_features = ["line_number", "text", "start", "end",
                              "PER", "ORG", "LOC", "DATE", "TEL", "EMAIL", "WEB", "SIGNATURE",
                              "word_count",
                              "inv_distance_to_merci", "inv_distance_to_cordlt",
                              "inv_distance_to_regards", "inv_distance_to_sincerely",
                              "inv_distance_to_sent_from",
                              "start_with_ps", "position_line",
                              "special_characters_count", "empty_chars_with_prev_line"]

# Subset of the features actually fed to the model.
list_columns_used_in_model = ["PER", "ORG", "LOC", "DATE", "TEL", "EMAIL",
                              # "WEB",
                              "word_count",
                              "inv_distance_to_merci", "inv_distance_to_cordlt",
                              # "inv_distance_to_regards",
                              "inv_distance_to_sincerely", "inv_distance_to_sent_from",
                              "start_with_ps", "position_line",
                              "special_characters_count", "empty_chars_with_prev_line"]

columns_to_scale_minmax = ["PER", "ORG", "LOC", "DATE", "TEL", "EMAIL", "WEB",
                           "position_line", "empty_chars_with_prev_line",
                           "inv_distance_to_merci", "inv_distance_to_cordlt",
                           "inv_distance_to_regards", "inv_distance_to_sincerely",
                           "inv_distance_to_sent_from", "start_with_ps"]

columns_to_scale_standard = ["word_count", "special_characters_count"]


def f_retrieve_entities_for_line(df_ner, start=0, end=1e12):
    """Retrieve all entities of the previously computed dataframe that overlap a specific line.

    Args:
        df_ner: dataframe of the entities found in the whole text
        start: start position of the line in the original text
        end: end position of the line in the original text

    Returns:
        The matching subset of df_ner, or None when df_ner is empty.
    """
    if len(df_ner) > 0:
        return df_ner.query(f"(start >= {start} and end <= {end}) or (start <= {start} and end >= {end})")
    return None


embedder_model = SentenceTransformer("distiluse-base-multilingual-cased-v1")


def f_create_embedding_inv_dist_feature(text1, text2):
    """Compute the inverse cosine distance between two texts, based on their
    embeddings from the SentenceTransformer above. The 0.01 offset avoids a
    division by zero when the two embeddings are identical."""
    embedding_text1 = embedder_model.encode(text1)
    embedding_text2 = embedder_model.encode(text2)
    dist = distance.cosine(embedding_text1, embedding_text2)
    return 1 / (dist + 0.01)


def f_create_email_lines_features(text, df_ner=None, position_offset=0):
    """Split an email into lines and build one feature vector per line,
    running NER on the full text first when no df_ner is supplied."""
    list_lines = nlp.f_split_text_by_lines(text, position_offset)
    list_features_vectors = []
    if df_ner is None:
        df_ner = nlp.f_ner(text)
    for line_number in range(len(list_lines)):
        list_features_vectors.append(f_create_line_features(list_lines, line_number, df_ner))
    df_features = pd.DataFrame(list_features_vectors, columns=list_name_columns_features)
    return df_features
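# Usage sketch for the feature builder above (illustrative only; the email
# text and the selected columns are assumptions, not fixtures from this
# project):
#
#     text = "Bonjour,\nVoici le document demandé.\nCordialement,\nJean Dupont"
#     df_features = f_create_email_lines_features(text)
#     print(df_features[["line_number", "text", "inv_distance_to_cordlt"]])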
def f_create_line_features(list_lines, line_number, df_ner):
    """Build the feature vector for one line; each element of list_lines is a
    (start, end, text) tuple."""
    current_line = list_lines[line_number]
    total_lines = len(list_lines)
    features_vector = [line_number, current_line[2], current_line[0], current_line[1]]
    logging.debug(f"Creating line features for {current_line}")
    df_ner_line = f_retrieve_entities_for_line(df_ner=df_ner,
                                               start=current_line[0],
                                               end=current_line[1])
    # Adding entity counts to the feature vector
    for entity in ["PER", "ORG", "LOC", "DATE", "TEL", "EMAIL", "WEB", "SIGNATURE"]:
        value = len(df_ner_line.query(f"entity=='{entity}'")) if df_ner_line is not None else 0
        features_vector.append(value)
    # Adding word count
    features_vector.append(len(current_line[2].split()))
    # Inverse distance to the greeting word "merci"
    features_vector.append(f_create_embedding_inv_dist_feature("merci", current_line[2].lower()))
    # Inverse distance to the greeting word "cordialement"
    features_vector.append(f_create_embedding_inv_dist_feature("cordialement", current_line[2].lower()))
    # Inverse distance to the greeting word "regards"
    features_vector.append(f_create_embedding_inv_dist_feature("regards", current_line[2].lower()))
    # Inverse distance to the greeting word "sincerely"
    features_vector.append(f_create_embedding_inv_dist_feature("sincerely", current_line[2].lower()))
    # Inverse distance to "sent from"
    features_vector.append(f_create_embedding_inv_dist_feature("sent from", current_line[2].lower()))
    # Whether the line starts with "ps:"
    features_vector.append(regex.match(r"\s*ps *:", current_line[2], flags=regex.IGNORECASE) is not None)
    # Adding the relative position of the line in the email
    position_in_email = (line_number + 1) / total_lines
    features_vector.append(position_in_email)
    # Adding the special character count
    special_char_count = len(regex.findall(r"[^\p{L}0-9 .,\n]", current_line[2]))
    features_vector.append(special_char_count)
    # Number of empty characters between this line and the previous one
    empty_chars_with_prev_line = 0 if line_number == 0 else current_line[0] - list_lines[line_number - 1][1]
    features_vector.append(empty_chars_with_prev_line)
    return features_vector


def generate_x_y(df, minmax_scaler=None, standard_scaler=None, n_last_lines_to_keep=30,
                 list_columns=list_columns_used_in_model):
    """Scale the features and shape the last n lines of the email into model
    inputs (x) and labels (y), each with a leading batch dimension."""
    df, minmax_scaler, standard_scaler = f_scale_parameters(df, minmax_scaler, standard_scaler)
    x = df[list_columns].to_numpy()[-n_last_lines_to_keep:, :]
    x = np.expand_dims(x, axis=0)
    y = df["is_signature"].to_numpy()[-n_last_lines_to_keep:]
    y = np.expand_dims(y, axis=0)
    return x, y, minmax_scaler, standard_scaler


def f_scale_parameters(df_tagged_data, minmax_scaler=None, standard_scaler=None):
    """Apply min-max and standard scaling to the relevant feature columns,
    fitting new scalers when none are provided."""
    # df_tagged_data = df_tagged_data.copy(deep=True)
    if minmax_scaler is None:
        logging.debug("fitting new min-max scaler")
        minmax_scaler = MinMaxScaler()
        df_tagged_data.loc[:, columns_to_scale_minmax] = minmax_scaler.fit_transform(
            df_tagged_data[columns_to_scale_minmax])
    else:
        logging.debug("using already fitted min-max scaler")
        df_tagged_data.loc[:, columns_to_scale_minmax] = minmax_scaler.transform(
            df_tagged_data[columns_to_scale_minmax])
    if standard_scaler is None:
        logging.debug("fitting new standard scaler")
        standard_scaler = StandardScaler()
        df_tagged_data.loc[:, columns_to_scale_standard] = standard_scaler.fit_transform(
            df_tagged_data[columns_to_scale_standard])
    else:
        logging.debug("using already fitted standard scaler")
        df_tagged_data.loc[:, columns_to_scale_standard] = standard_scaler.transform(
            df_tagged_data[columns_to_scale_standard])
    return df_tagged_data, minmax_scaler, standard_scaler
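# End-to-end sketch (illustrative only; the email text and the is_signature
# labels are made-up examples, and it assumes the loaded model accepts a
# sequence of up to n_last_lines_to_keep feature vectors):
#
#     text = "Bonjour,\nCi-joint le rapport.\nCordialement,\nJean Dupont\n06 12 34 56 78"
#     df_features = f_create_email_lines_features(text)
#     df_features["is_signature"] = [0, 0, 1, 1, 1]
#     x, y, mm, std = generate_x_y(df_features, minmax_scaler, standard_scaler)
#     # x has shape (1, min(len(df_features), 30), len(list_columns_used_in_model))
#     predictions = model.predict(x)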