from pprint import pprint
import json
import logging
from typing import Optional, List, Tuple, Set

from presidio_analyzer import (
    AnalyzerEngine,
    AnalysisExplanation,
    EntityRecognizer,
    Pattern,
    PatternRecognizer,
    RecognizerRegistry,
    RecognizerResult,
)
from presidio_analyzer.nlp_engine import NlpEngineProvider, NlpArtifacts
from presidio_analyzer.predefined_recognizers import (
    DateRecognizer,
    EmailRecognizer,
    IbanRecognizer,
    IpRecognizer,
    PhoneRecognizer,
    SpacyRecognizer,
    UrlRecognizer,
)
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

from flair.data import Sentence
from flair.models import SequenceTagger
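# A minimal sketch of the raw Flair API that the FlairRecognizer below wraps:
# load a tagger, predict on a Sentence, and read the typed spans. The sample
# sentence and helper name are illustrative assumptions; the function is
# defined but never called automatically, since the model download is large.
def _flair_ner_example(sample_text: str = "Angela Merkel besuchte Berlin.") -> None:
    tagger = SequenceTagger.load("flair/ner-german-large")
    sentence = Sentence(sample_text)
    tagger.predict(sentence)
    for span in sentence.get_spans("ner"):
        # Each span exposes its text, offsets, label value, and confidence score.
        print(span.text, span.labels[0].value, round(span.score, 2))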
### Creating FlairRecognizer class for NER (names, locations)
class FlairRecognizer(EntityRecognizer):

    ENTITIES = [
        "LOCATION",
        "PERSON",
        "ORGANIZATION",
        # "MISCELLANEOUS"  # - no direct correlation with Presidio entities.
    ]

    DEFAULT_EXPLANATION = "Identified as {} by Flair's Named Entity Recognition"

    CHECK_LABEL_GROUPS = [
        ({"LOCATION"}, {"LOC", "LOCATION"}),
        ({"PERSON"}, {"PER", "PERSON"}),
        ({"ORGANIZATION"}, {"ORG"}),
        # ({"MISCELLANEOUS"}, {"MISC"}),  # Probably not PII
    ]

    MODEL_LANGUAGES = {
        # "en": "flair/ner-english-large",
        # "es": "flair/ner-spanish-large",
        "de": "flair/ner-german-large",
        # "nl": "flair/ner-dutch-large",
    }

    PRESIDIO_EQUIVALENCES = {
        "PER": "PERSON",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
        # "MISC": "MISCELLANEOUS"  # - Probably not PII
    }

    def __init__(
        self,
        supported_language: str = "de",  # only "de" is enabled in MODEL_LANGUAGES
        supported_entities: Optional[List[str]] = None,
        check_label_groups: Optional[List[Tuple[Set, Set]]] = None,
        model: SequenceTagger = None,
    ):
        self.check_label_groups = (
            check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
        )

        supported_entities = supported_entities if supported_entities else self.ENTITIES
        self.model = (
            model
            if model
            else SequenceTagger.load(self.MODEL_LANGUAGES.get(supported_language))
        )

        super().__init__(
            supported_entities=supported_entities,
            supported_language=supported_language,
            name="Flair Analytics",
        )
        print("Flair class initialized")

    def load(self) -> None:
        """No-op: the model is loaded during initialization."""
        pass

    def get_supported_entities(self) -> List[str]:
        """
        Return the entities supported by this recognizer.

        :return: List of the supported entities.
        """
        return self.supported_entities

    # Uses Flair with Presidio as an external recognizer.
    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts = None
    ) -> List[RecognizerResult]:
        """
        Analyze text using the Flair NER model.

        :param text: The text for analysis.
        :param entities: The entity types to look for; defaults to all supported entities.
        :param nlp_artifacts: Not used by this recognizer.
        :return: The list of Presidio RecognizerResult constructed from the
            recognized Flair detections.
        """
        results = []

        sentence = Sentence(text)
        self.model.predict(sentence)

        # If no specific list of entities was requested, look for all supported ones.
        if not entities:
            entities = self.supported_entities

        for entity in entities:
            if entity not in self.supported_entities:
                continue

            for ent in sentence.get_spans("ner"):
                if not self.__check_label(
                    entity, ent.labels[0].value, self.check_label_groups
                ):
                    continue
                textual_explanation = self.DEFAULT_EXPLANATION.format(
                    ent.labels[0].value
                )
                explanation = self.build_flair_explanation(
                    round(ent.score, 2), textual_explanation
                )
                flair_result = self._convert_to_recognizer_result(ent, explanation)

                results.append(flair_result)

        return results

    def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult:
        entity_type = self.PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag)
        flair_score = round(entity.score, 2)

        flair_results = RecognizerResult(
            entity_type=entity_type,
            start=entity.start_position,
            end=entity.end_position,
            score=flair_score,
            analysis_explanation=explanation,
        )
        return flair_results

    def build_flair_explanation(
        self, original_score: float, explanation: str
    ) -> AnalysisExplanation:
        """
        Create an explanation for why this result was detected.

        :param original_score: Score given by this recognizer
        :param explanation: Explanation string
        :return: An AnalysisExplanation carrying the score and text.
        """
        explanation = AnalysisExplanation(
            recognizer=self.__class__.__name__,
            original_score=original_score,
            textual_explanation=explanation,
        )
        return explanation

    @staticmethod
    def __check_label(
        entity: str, label: str, check_label_groups: List[Tuple[Set, Set]]
    ) -> bool:
        return any(
            entity in egrp and label in lgrp for egrp, lgrp in check_label_groups
        )
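# A minimal sketch showing FlairRecognizer used standalone, outside a registry
# (the sample text and helper name are assumptions for illustration); PIIService
# below wires the same recognizer into a full Presidio pipeline.
def _flair_recognizer_demo() -> None:
    recognizer = FlairRecognizer(supported_language="de")
    results = recognizer.analyze(
        text="Max Mustermann arbeitet bei Siemens in München.",
        entities=["PERSON", "LOCATION", "ORGANIZATION"],
    )
    for res in results:
        print(res.entity_type, res.start, res.end, res.score)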
class PIIService:
    def __init__(self):
        configuration = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "de", "model_name": "de_core_news_sm"}],
        }

        # Create NLP engine based on configuration
        provider = NlpEngineProvider(nlp_configuration=configuration)
        nlp_engine = provider.create_engine()

        ## Creating regexes for PatternRecognizers - SWIFT, vehicle number, zipcode, SSN
        swift_regex = r"\b[A-Z]{4}DE[A-Z0-9]{2}(?:[A-Z0-9]{3})?"
        vehicle_number_with_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}-[A-ZÄÖÜ]{1,2}-[0-9]{1,4}"
        vehicle_number_without_hyphen_regex = r"\b[A-ZÄÖÜ]{1,3}[A-ZÄÖÜ]{1,2}[0-9]{1,4}"
        german_zipcode_regex = r"\b((?:0[1-46-9]\d{3})|(?:[1-357-9]\d{4})|(?:[4][0-24-9]\d{3})|(?:[6][013-9]\d{3}))\b(?![\d/])"
        german_ssn_regex = r"\b\d{2}\s?\d{6}\s?[A-Z]\s?\d{3}\b"

        # Creating Presidio pattern objects
        vehicle_numbers_pattern1 = Pattern(
            name="vehicle_pattern_no_hyphen",
            regex=vehicle_number_without_hyphen_regex,
            score=1,
        )
        vehicle_numbers_pattern2 = Pattern(
            name="vehicle_pattern_hyphen",
            regex=vehicle_number_with_hyphen_regex,
            score=1,
        )
        swift_pattern = Pattern(name="bank_swift_pattern", regex=swift_regex, score=1)
        germanzipcode_pattern = Pattern(
            name="german_zip_pattern", regex=german_zipcode_regex, score=1
        )
        german_ssn_pattern = Pattern(
            name="german_ssn_pattern", regex=german_ssn_regex, score=1
        )

        # Define the pattern recognizers
        swift_recognizer = PatternRecognizer(
            supported_entity="SWIFT",
            supported_language="de",
            patterns=[swift_pattern],
        )
        vehicle_number_recognizer = PatternRecognizer(
            supported_entity="VEHICLE_NUMBER",
            supported_language="de",
            patterns=[vehicle_numbers_pattern1, vehicle_numbers_pattern2],
        )
        germanzip_recognizer = PatternRecognizer(
            supported_entity="GERMAN_ZIP",
            supported_language="de",
            patterns=[germanzipcode_pattern],
        )
        germanssn_recognizer = PatternRecognizer(
            supported_entity="GERMAN_SSN",
            supported_language="de",
            patterns=[german_ssn_pattern],
        )

        ## Loading flair entity model for person/location identification
        print("Loading flair")
        flair_recognizer = FlairRecognizer(supported_language="de")
        print("Flair loaded")

        registry = RecognizerRegistry()
        # registry.load_predefined_recognizers()
        # registry.add_recognizer(SpacyRecognizer(supported_language="de"))
        # registry.add_recognizer(SpacyRecognizer(supported_language="en"))
        # The registry starts empty, so there is no SpacyRecognizer to remove;
        # Flair replaces spaCy for NER here.
        registry.add_recognizer(flair_recognizer)
        registry.add_recognizer(swift_recognizer)
        registry.add_recognizer(vehicle_number_recognizer)
        registry.add_recognizer(germanzip_recognizer)
        registry.add_recognizer(germanssn_recognizer)

        ## Adding predefined recognizers
        registry.add_recognizer(IbanRecognizer(supported_language="de"))
        registry.add_recognizer(DateRecognizer(supported_language="de"))
        registry.add_recognizer(EmailRecognizer(supported_language="de"))
        registry.add_recognizer(IpRecognizer(supported_language="de"))
        registry.add_recognizer(PhoneRecognizer(supported_language="de"))
        registry.add_recognizer(UrlRecognizer(supported_language="de"))
        print("Recognizer registry loaded")

        self.analyzer = AnalyzerEngine(
            registry=registry, nlp_engine=nlp_engine, supported_languages=["de"]
        )
        # print(f"Type of recognizers ::\n {self.analyzer.registry.recognizers}")
        print("PII initialized")
        self.anonymizer = AnonymizerEngine()

    def identify(self, text):
        results_de = self.analyzer.analyze(text, language="de")
        # anonymized_results = self.anonymize(results_de, text)

        entities = []
        for result in results_de:
            result_dict = result.to_dict()
            temp_entity = {
                "start": result_dict["start"],
                "end": result_dict["end"],
                "entity_type": result_dict["entity_type"],
                "score": result_dict["score"],
                "word": text[result_dict["start"]:result_dict["end"]],
            }
            entities.append(temp_entity)

        return {"entities": entities, "text": text}
        # , "anonymized_text": anonymized_results["text"]}

    # Legacy variant kept for reference: anonymization via Presidio's
    # AnonymizerEngine, operating directly on analyzer results.
    # def anonymize(self, entities, text):
    #     anonymized_results = self.anonymizer.anonymize(
    #         text=text,
    #         analyzer_results=entities,
    #         # operators={"DEFAULT": OperatorConfig("replace", {"new_value": ""})},
    #     )
    #     return ""  # json.loads(anonymized_results.to_json())
""#json.loads(anonymized_results.to_json())""" def add_mask(self, data): masked_data = [] entity_count = {} for item_idx,item in enumerate(data['entities']): entity_type = item['entity_type'] word = item['word'] suffix = entity_count.get(entity_type, 0) + 1 entity_count[entity_type] = suffix masked_word = f"{entity_type}_{suffix}" item['mask'] = masked_word #data['entities'][item_idx]['mask'] = masked_word masked_data.append(item) return masked_data def anonymize(self, entities, text): print("anonymyzing") updated_text = text for ent_idx, ent in enumerate(entities): #text[ent['start']:ent['end']] = ent['mask'] updated_text = updated_text[:ent['start']] + " " + ent['mask'] + " " + updated_text[ent['end']:] return updated_text def remove_overlapping_entities(entities): return