|
|
|
|
|
import logging |
|
from typing import Optional, List, Tuple, Set |
|
|
|
from presidio_analyzer import ( |
|
RecognizerResult, |
|
EntityRecognizer, |
|
AnalysisExplanation, |
|
) |
|
from presidio_analyzer.nlp_engine import NlpArtifacts |
|
|
|
from flair.data import Sentence |
|
from flair.models import SequenceTagger |
|
|
|
|
|
logger = logging.getLogger("presidio-analyzer") |
|
|
|
|
|
class FlairRecognizer(EntityRecognizer): |
|
""" |
|
Wrapper for a flair model, if needed to be used within Presidio Analyzer. |
|
|
|
:example: |
|
>from presidio_analyzer import AnalyzerEngine, RecognizerRegistry |
|
|
|
>flair_recognizer = FlairRecognizer() |
|
|
|
>registry = RecognizerRegistry() |
|
>registry.add_recognizer(flair_recognizer) |
|
|
|
>analyzer = AnalyzerEngine(registry=registry) |
|
|
|
>results = analyzer.analyze( |
|
> "My name is Christopher and I live in Irbid.", |
|
> language="en", |
|
> return_decision_process=True, |
|
>) |
|
>for result in results: |
|
> print(result) |
|
> print(result.analysis_explanation) |
|
|
|
|
|
""" |
|
|
|
ENTITIES = [ |
|
"LOCATION", |
|
"PERSON", |
|
"ORGANIZATION", |
|
|
|
] |
|
|
|
DEFAULT_EXPLANATION = "Identified as {} by Flair's Named Entity Recognition" |
|
|
|
CHECK_LABEL_GROUPS = [ |
|
({"LOCATION"}, {"LOC", "LOCATION"}), |
|
({"PERSON"}, {"PER", "PERSON"}), |
|
({"ORGANIZATION"}, {"ORG"}), |
|
|
|
] |
|
|
|
MODEL_LANGUAGES = {"en": "flair/ner-english-large"} |
|
|
|
PRESIDIO_EQUIVALENCES = { |
|
"PER": "PERSON", |
|
"LOC": "LOCATION", |
|
"ORG": "ORGANIZATION", |
|
|
|
} |
|
|
|
def __init__( |
|
self, |
|
supported_language: str = "en", |
|
supported_entities: Optional[List[str]] = None, |
|
check_label_groups: Optional[Tuple[Set, Set]] = None, |
|
model: SequenceTagger = None, |
|
model_path: Optional[str] = None, |
|
): |
|
self.check_label_groups = ( |
|
check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS |
|
) |
|
|
|
supported_entities = supported_entities if supported_entities else self.ENTITIES |
|
|
|
if model and model_path: |
|
raise ValueError("Only one of model or model_path should be provided.") |
|
elif model and not model_path: |
|
self.model = model |
|
elif not model and model_path: |
|
print(f"Loading model from {model_path}") |
|
self.model = SequenceTagger.load(model_path) |
|
else: |
|
print(f"Loading model for language {supported_language}") |
|
self.model = SequenceTagger.load( |
|
self.MODEL_LANGUAGES.get(supported_language) |
|
) |
|
|
|
super().__init__( |
|
supported_entities=supported_entities, |
|
supported_language=supported_language, |
|
name="Flair Analytics", |
|
) |
|
|
|
def load(self) -> None: |
|
"""Load the model, not used. Model is loaded during initialization.""" |
|
pass |
|
|
|
def get_supported_entities(self) -> List[str]: |
|
""" |
|
Return supported entities by this model. |
|
|
|
:return: List of the supported entities. |
|
""" |
|
return self.supported_entities |
|
|
|
|
|
def analyze( |
|
self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts = None |
|
) -> List[RecognizerResult]: |
|
""" |
|
Analyze text using Text Analytics. |
|
|
|
:param text: The text for analysis. |
|
:param entities: Not working properly for this recognizer. |
|
:param nlp_artifacts: Not used by this recognizer. |
|
:param language: Text language. Supported languages in MODEL_LANGUAGES |
|
:return: The list of Presidio RecognizerResult constructed from the recognized |
|
Flair detections. |
|
""" |
|
|
|
results = [] |
|
|
|
sentences = Sentence(text) |
|
self.model.predict(sentences) |
|
|
|
|
|
if not entities: |
|
entities = self.supported_entities |
|
|
|
for entity in entities: |
|
if entity not in self.supported_entities: |
|
continue |
|
|
|
for ent in sentences.get_spans("ner"): |
|
if not self.__check_label( |
|
entity, ent.labels[0].value, self.check_label_groups |
|
): |
|
continue |
|
textual_explanation = self.DEFAULT_EXPLANATION.format( |
|
ent.labels[0].value |
|
) |
|
explanation = self.build_flair_explanation( |
|
round(ent.score, 2), textual_explanation |
|
) |
|
flair_result = self._convert_to_recognizer_result(ent, explanation) |
|
|
|
results.append(flair_result) |
|
|
|
return results |
|
|
|
def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult: |
|
entity_type = self.PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag) |
|
flair_score = round(entity.score, 2) |
|
|
|
flair_results = RecognizerResult( |
|
entity_type=entity_type, |
|
start=entity.start_position, |
|
end=entity.end_position, |
|
score=flair_score, |
|
analysis_explanation=explanation, |
|
) |
|
|
|
return flair_results |
|
|
|
def build_flair_explanation( |
|
self, original_score: float, explanation: str |
|
) -> AnalysisExplanation: |
|
""" |
|
Create explanation for why this result was detected. |
|
|
|
:param original_score: Score given by this recognizer |
|
:param explanation: Explanation string |
|
:return: |
|
""" |
|
explanation = AnalysisExplanation( |
|
recognizer=self.__class__.__name__, |
|
original_score=original_score, |
|
textual_explanation=explanation, |
|
) |
|
return explanation |
|
|
|
@staticmethod |
|
def __check_label( |
|
entity: str, label: str, check_label_groups: Tuple[Set, Set] |
|
) -> bool: |
|
return any( |
|
[entity in egrp and label in lgrp for egrp, lgrp in check_label_groups] |
|
) |
|
|