import logging
import os

import regex
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline
import pandas as pd
import numpy as np

from . import utils, _models_signatures
from .utils import timing
from langid.langid import LanguageIdentifier
from langid.langid import model as model_langid

# Creating language_identifier object for usage in function f_detect_language
language_identifier = LanguageIdentifier.from_modelstring(model_langid, norm_probs=True)
language_identifier.set_languages(['en', 'fr'])

logging.info(f"Reading config file from folder: {os.path.join(os.path.dirname(__file__))}")
config = utils.f_read_config(os.path.join(os.path.dirname(__file__), 'config.ini'))

device = int(config["DEFAULT"]["device"])
default_lang = config["DEFAULT"]["default_lang"]

tokenizer_dict = {}
models_dict = {}
nlp_dict = {}

# NOTE: the TEL and POSTAL_CODE patterns below are illustrative placeholders; the original
# expressions could not be recovered from the garbled source.
dict_regex_pattern = dict(
    EMAIL=r'[\p{L}\p{M}\-\d._]{1,}@[\p{L}\p{M}\d\-_]{1,}(\.[\p{L}\p{M}]{1,}){1,}',
    TEL=r'(?<!\d)(\+?\d[\d .\-]{7,14}\d)(?!\d)',
    POSTAL_CODE=r'(?<!\d)\d{5}(?!\d)',
)

# Column names of the result dataframes. "start" and "end" are required by the queries in
# f_concat_results; the remaining names are assumptions.
list_columns_names = ["type", "value", "start", "end", "score"]


def f_concat_results(df_result, list_result_new):
    """
    Merge newly detected entities with previously detected ones.

    Args:
        df_result: dataframe of previously detected entities (may be None or empty)
        list_result_new: list of new results, one row per entity ([type, value, start, end, score])

    Returns:
        Dataframe combining df_result with the rows of list_result_new that do not overlap
        with an already existing entity, sorted by start position
    """
    if len(list_result_new) > 0:
        if df_result is None or len(df_result) == 0:
            return pd.DataFrame(list_result_new, columns=list_columns_names)
        list_row = []
        for row in list_result_new:
            df_intersect = df_result.query("({1}>=start and {0}<=end)".format(row[2], row[3]))
            if len(df_intersect) == 0:
                list_row.append(row)
        df_final = pd.concat([df_result, pd.DataFrame(list_row, columns=list_columns_names)], ignore_index=True) \
            .sort_values(by="start")
        return df_final
    else:
        # If list_result_new was empty we just return df_result
        return df_result


@timing
def f_detect_language(text, default=default_lang):
    """
    Detect language

    Args:
        text: text on which language should be detected
        default: default value if there is an error or the score of the predicted value is too low
            (default nlp.default_lang)

    Returns:
        "fr" or "en"
    """
    lang = default
    try:
        if text.strip() != "":
            lang, score = language_identifier.classify(text.strip().replace("\n", " ").lower())
            # If the score is not high enough we take the default value instead
            if score < 0.8:
                lang = default
    except Exception as e:
        logging.error("following error occurred when trying to detect language: {}".format(e))
    finally:
        return lang


@timing
def f_find_regex_pattern(text, type_, pattern):
    """
    Find all occurrences of a pattern in a text and return a list of results

    Args:
        text: the text to be analyzed
        type_: the entity type (value is added in result)
        pattern: regex pattern to be found

    Returns:
        A list containing type, matched value, position start and end of each result
    """
    list_result = []
    results = regex.finditer(pattern, text, flags=regex.IGNORECASE)
    for match in results:
        value = match.string[match.start(): match.end()].replace("\n", " ").strip()
        list_result.append([type_, value, match.start(), match.end(), 1])
    return list_result
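
# Example (illustrative only, not part of the original module): each match is returned as
# [type, value, start, end, score], e.g.
#     f_find_regex_pattern("Contact: john.doe@example.com", "EMAIL", dict_regex_pattern["EMAIL"])
#     -> [['EMAIL', 'john.doe@example.com', 9, 29, 1]]
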
@timing
def f_ner_regex(text, dict_pattern=dict_regex_pattern, df_result=pd.DataFrame()):
    """Run a series of regex expressions to detect email, tel and postal codes in a full text.

    Args:
        text: the text to be analyzed
        dict_pattern: dictionary of regex expressions to be run successively (default nlp.dict_regex_pattern)
        df_result: results of this function will be merged with the values provided here.
            If a value is already found at an overlapping position in df_result, the existing value will be kept

    Returns:
        Dataframe containing results merged with the provided argument df_result (if any)
    """
    logging.debug("Starting regex")
    list_result = []
    # We run f_find_regex_pattern for each pattern in dict_pattern
    for type_, pattern in dict_pattern.items():
        result = f_find_regex_pattern(text, type_, pattern)
        if len(result) != 0:
            list_result += result
    df_result = f_concat_results(df_result, list_result)
    return df_result


@timing
def f_split_text_by_lines(text, position_offset=0):
    """
    :param text: text that should be split
    :param position_offset: offset added to the start and end positions of each line
    :return: list containing for each line: [position start, position end, sentence]
    """
    results = []
    # iter_lines = regex.finditer(".*(?=\n|$)", text)
    iter_lines = regex.finditer("[^>\n]((.*?([!?.>] ){1,})|.*(?=\n|$))", text)
    for line_match in iter_lines:
        start_line = line_match.start()
        end_line = line_match.end()
        line = line_match.group()
        if len(line.strip()) > 1:
            results.append([start_line + position_offset, end_line + position_offset, line])
    return results


def f_detect_email_signature(text, df_ner=None, cut_off_score=0.6, lang=default_lang):
    """
    Detect the signature part of an email.

    Args:
        text: the email text to be analyzed
        df_ner: named-entity results for the text; if None, f_ner is called to compute them
        cut_off_score: score above which a line is classified as part of the signature
        lang: language of the text (default nlp.default_lang)

    Returns:
        Dataframe containing the detected SIGNATURE entity, or None if no signature was found
    """
    # with tf.device("/cpu:0"):
    if text.strip() == "":
        return None
    if df_ner is None:
        df_ner = f_ner(text, lang=lang)
    df_features = _models_signatures.f_create_email_lines_features(text, df_ner=df_ner)
    if len(df_features) == 0:
        return None
    # We add a dummy value for the signature in order to use the same function as for the training of the model
    df_features["is_signature"] = -2
    x, y_out, _, _ = _models_signatures.generate_x_y(df_features,
                                                     _models_signatures.minmax_scaler,
                                                     _models_signatures.standard_scaler)
    y_predict = _models_signatures.model.predict(x)
    y_predict_value = (y_predict > cut_off_score).reshape([-1])
    y_predict_value = np.pad(y_predict_value, (len(df_features) - len(y_predict_value), 0),
                             constant_values=0)[-len(df_features):]
    y_predict_score = y_predict.reshape([-1])
    y_predict_score = np.pad(y_predict_score, (len(df_features) - len(y_predict_score), 0),
                             constant_values=1)[-len(df_features):]
    # return (y_predict, y_mask)
    df_features["prediction"] = y_predict_value
    df_features["score"] = y_predict_score
    # return df_features
    series_position_body = df_features.query("prediction==0")['end']
    if len(series_position_body) > 0:
        body_end_pos = max(series_position_body)
    else:
        # In this case everything was detected as a signature
        body_end_pos = 0
    score = df_features.query("prediction==1")["score"].mean()
    signature_text = text[body_end_pos:].strip().replace("\n", " ")
    if signature_text != "":
        list_result = [
            # ["body", text[:body_end_pos], 0 + pos_start_email, body_end_pos + pos_start_email, 1, ""],
            ["SIGNATURE", signature_text, body_end_pos, len(text), score]]
        df_result = f_concat_results(pd.DataFrame(), list_result)
    else:
        df_result = None
    return df_result
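

# Minimal usage sketch (illustrative only, not part of the original module). It assumes the
# package is importable (e.g. run via `python -m <package>.<module>`) with its config.ini
# present, and only exercises the language detection and regex-based NER defined above;
# the signature model is not loaded here.
if __name__ == "__main__":
    sample_email = (
        "Bonjour,\n"
        "Merci de me rappeler au 01 23 45 67 89 ou de m'écrire à jean.dupont@example.fr.\n"
        "Cordialement,\n"
        "Jean Dupont"
    )
    detected_lang = f_detect_language(sample_email)  # typically "fr" for this text
    df_entities = f_ner_regex(sample_email)          # one row per EMAIL / TEL / POSTAL_CODE match
    print(detected_lang)
    print(df_entities)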