# email_parser/email_parser/_models_signatures.py
import configparser
import logging
import os
import pickle

import numpy as np
import pandas as pd
import regex
from keras.preprocessing.sequence import pad_sequences
from scipy.spatial import distance
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow import keras

from . import nlp, utils
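
# Load the trained signature-detection model and the scalers that were fitted
# alongside it; paths are resolved from config.ini via utils.get_model_full_path.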
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), "config.ini"))
model_name = config["DEFAULT"]["name_model_signature"]

model = keras.models.load_model(filepath=utils.get_model_full_path(model_name))
with open(utils.get_model_full_path(model_name + "/minmax_scaler.p"), "rb") as f:
    minmax_scaler = pickle.load(f)
with open(utils.get_model_full_path(model_name + "/standard_scaler.p"), "rb") as f:
    standard_scaler = pickle.load(f)
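
# Feature-column bookkeeping: the full per-line feature frame, the subset the
# model actually consumes, and the scaling group for each column (min-max for
# most features, standard scaling for the two raw counts; see f_scale_parameters).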
list_name_columns_features = [
    "line_number",
    "text",
    "start",
    "end",
    "PER", "ORG", "LOC", "DATE", "TEL", "EMAIL", "WEB",
    "SIGNATURE",
    "word_count",
    "inv_distance_to_merci",
    "inv_distance_to_cordlt",
    "inv_distance_to_regards",
    "inv_distance_to_sincerely",
    "inv_distance_to_sent_from",
    "start_with_ps",
    "position_line",
    "special_characters_count",
    "empty_chars_with_prev_line",
]

list_columns_used_in_model = [
    "PER", "ORG", "LOC", "DATE", "TEL", "EMAIL",
    # "WEB",
    "word_count",
    "inv_distance_to_merci",
    "inv_distance_to_cordlt",
    # "inv_distance_to_regards",
    "inv_distance_to_sincerely",
    "inv_distance_to_sent_from",
    "start_with_ps",
    "position_line",
    "special_characters_count",
    "empty_chars_with_prev_line",
]

columns_to_scale_minmax = [
    "PER", "ORG", "LOC", "DATE", "TEL", "EMAIL", "WEB",
    "position_line",
    "empty_chars_with_prev_line",
    "inv_distance_to_merci",
    "inv_distance_to_cordlt",
    "inv_distance_to_regards",
    "inv_distance_to_sincerely",
    "inv_distance_to_sent_from",
    "start_with_ps",
]

columns_to_scale_standard = ["word_count", "special_characters_count"]
def f_retrieve_entities_for_line(df_ner, start=0, end=1e12):
    """Retrieve all entities from the previously computed dataframe for a specific line.

    Args:
        df_ner: dataframe of entities found in the full text
        start: start position of the line in the original text
        end: end position of the line in the original text

    Returns:
        The entities fully contained in, or fully spanning, the line,
        or None when df_ner is empty.
    """
    if len(df_ner) > 0:
        return df_ner.query(f"(start >= {start} and end <= {end}) or (start <= {start} and end >= {end})")
    return None
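
# Illustrative sketch (hypothetical data, not part of the original module):
# given entities at character offsets 0-10 and 50-60, only the first overlaps
# a line spanning offsets 0-20, so only the PER row is returned.
def _example_retrieve_entities():
    df = pd.DataFrame(
        [{"entity": "PER", "start": 0, "end": 10},
         {"entity": "TEL", "start": 50, "end": 60}]
    )
    # Returns only the PER row: it lies fully inside the line [0, 20].
    return f_retrieve_entities_for_line(df, start=0, end=20)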
# Multilingual sentence embedder used for the inverse-distance features below.
embedder_model = SentenceTransformer("distiluse-base-multilingual-cased-v1")


def f_create_embedding_inv_dist_feature(text1, text2):
    """Compute an inverse cosine distance between two texts, based on the
    embeddings provided by the SentenceTransformer above. The smaller the
    distance, the larger the feature (capped at 100 by the 0.01 offset)."""
    embedding_ref = embedder_model.encode(text1)
    embedding_line = embedder_model.encode(text2)
    dist = distance.cosine(embedding_ref, embedding_line)
    return 1 / (dist + 0.01)
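
# Illustrative sketch (hypothetical helper, not part of the original module):
# identical texts have cosine distance close to 0, so the feature saturates
# near 1 / 0.01 = 100; an unrelated text yields a much smaller value. Each
# call encodes both sentences, so this is slow; values are indicative only.
def _example_inv_dist():
    high = f_create_embedding_inv_dist_feature("merci", "merci beaucoup")
    low = f_create_embedding_inv_dist_feature("merci", "the quarterly report")
    return high, low  # expect high > low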
def f_create_email_lines_features(text, df_ner=None, position_offset=0):
    """Build one feature vector per line of the email and return them as a dataframe."""
    list_lines = nlp.f_split_text_by_lines(text, position_offset)
    list_features_vectors = []
    if df_ner is None:
        df_ner = nlp.f_ner(text)
    for line_number in range(len(list_lines)):
        list_features_vectors.append(f_create_line_features(list_lines, line_number, df_ner))
    df_features = pd.DataFrame(list_features_vectors, columns=list_name_columns_features)
    return df_features
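
# Illustrative sketch (hypothetical email, not part of the original module):
# builds the per-line feature frame for a short message; when df_ner is not
# supplied, nlp.f_ner is run on the full text first.
def _example_email_features():
    text = "Bonjour,\nvoici le document demandé.\nCordialement,\nJean Dupont"
    return f_create_email_lines_features(text)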
def f_create_line_features(list_lines, line_number, df_ner):
    # Each entry of list_lines is a (start, end, text) tuple.
    current_line = list_lines[line_number]
    total_lines = len(list_lines)
    features_vector = [line_number, current_line[2], current_line[0], current_line[1]]
    logging.debug(f"Creating line features for {current_line}")
    df_ner_line = f_retrieve_entities_for_line(df_ner=df_ner, start=current_line[0], end=current_line[1])
    # Adding entity counts to the feature vector
    for entity in ["PER", "ORG", "LOC", "DATE", "TEL", "EMAIL", "WEB", "SIGNATURE"]:
        value = len(df_ner_line.query(f"entity=='{entity}'")) if df_ner_line is not None else 0
        features_vector.append(value)
    # Adding word count
    features_vector.append(len(current_line[2].split()))
    # Distance to greeting word "merci"
    features_vector.append(f_create_embedding_inv_dist_feature("merci", current_line[2].lower()))
    # Distance to greeting word "cordialement"
    features_vector.append(f_create_embedding_inv_dist_feature("cordialement", current_line[2].lower()))
    # Distance to greeting word "regards"
    features_vector.append(f_create_embedding_inv_dist_feature("regards", current_line[2].lower()))
    # Distance to greeting word "sincerely"
    features_vector.append(f_create_embedding_inv_dist_feature("sincerely", current_line[2].lower()))
    # Distance to the phrase "sent from"
    features_vector.append(f_create_embedding_inv_dist_feature("sent from", current_line[2].lower()))
    # Line starts with "ps:"
    features_vector.append(regex.match(r"\s*ps *:", current_line[2], flags=regex.IGNORECASE) is not None)
    # Relative position of the line in the email
    position_in_email = (line_number + 1) / total_lines
    features_vector.append(position_in_email)
    # Special character count (anything that is not a letter, digit, space or basic punctuation)
    special_char_count = len(regex.findall(r"[^\p{L}0-9 .,\n]", current_line[2]))
    features_vector.append(special_char_count)
    # Number of empty chars between this line and the previous one
    empty_chars_with_prev_line = 0 if line_number == 0 else current_line[0] - list_lines[line_number - 1][1]
    features_vector.append(empty_chars_with_prev_line)
    return features_vector
def generate_x_y(df, minmax_scaler=None, standard_scaler=None, n_last_lines_to_keep=30,
                 list_columns=list_columns_used_in_model):
    """Scale the feature dataframe and turn its last lines into model-ready (x, y) arrays."""
    df, minmax_scaler, standard_scaler = f_scale_parameters(df, minmax_scaler, standard_scaler)
    x = df[list_columns].to_numpy()[-n_last_lines_to_keep:, :]
    x = np.expand_dims(x, axis=0)
    y = df["is_signature"].to_numpy()[-n_last_lines_to_keep:]
    y = np.expand_dims(y, axis=0)
    return x, y, minmax_scaler, standard_scaler
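
# Illustrative sketch (hypothetical helper, not part of the original module):
# x has shape (1, n_lines, n_features) and y (1, n_lines) for emails shorter
# than n_last_lines_to_keep, i.e. a batch of one line sequence. generate_x_y
# expects an "is_signature" column, so placeholder labels are added here.
def _example_generate_x_y(df_features):
    df_features = df_features.copy()
    df_features["is_signature"] = 0  # placeholder labels for illustration
    x, y, mm, std = generate_x_y(df_features, minmax_scaler, standard_scaler)
    return x.shape, y.shape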
def f_scale_parameters(df_tagged_data, minmax_scaler=None, standard_scaler=None):
    """Scale the feature columns in place, fitting new scalers when none are provided."""
    # df_tagged_data = df_tagged_data.copy(deep=True)  # left disabled: the caller's frame is scaled in place
    if minmax_scaler is None:
        logging.debug("fitting new min max scaler")
        minmax_scaler = MinMaxScaler()
        df_tagged_data.loc[:, columns_to_scale_minmax] = minmax_scaler.fit_transform(
            df_tagged_data[columns_to_scale_minmax])
    else:
        logging.debug("using already fitted minmax scaler")
        df_tagged_data.loc[:, columns_to_scale_minmax] = minmax_scaler.transform(
            df_tagged_data[columns_to_scale_minmax])
    if standard_scaler is None:
        logging.debug("fitting new standard scaler")
        standard_scaler = StandardScaler()
        df_tagged_data.loc[:, columns_to_scale_standard] = standard_scaler.fit_transform(
            df_tagged_data[columns_to_scale_standard])
    else:
        logging.debug("using already fitted standard scaler")
        df_tagged_data.loc[:, columns_to_scale_standard] = standard_scaler.transform(
            df_tagged_data[columns_to_scale_standard])
    return df_tagged_data, minmax_scaler, standard_scaler
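
# End-to-end sketch (hypothetical helper, not part of the original module).
# Assumptions: the loaded model accepts the (1, n_lines, n_features) tensor
# produced by generate_x_y and returns per-line signature scores in [0, 1];
# the 0.5 threshold is illustrative. The module's real prediction path may
# differ from this.
def _example_predict_signature_lines(text):
    df = f_create_email_lines_features(text)
    df["is_signature"] = 0  # dummy labels; only x is used at inference time
    x, _, _, _ = generate_x_y(df, minmax_scaler, standard_scaler)
    probas = model.predict(x)[0].ravel()  # assumed per-line scores
    last_lines = df["text"].iloc[-len(probas):].reset_index(drop=True)
    return last_lines[probas > 0.5]  # lines flagged as signature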