File size: 13,459 Bytes
1215818
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
import copy
import logging
from typing import Optional, List

import torch
from presidio_analyzer import (
    RecognizerResult,
    EntityRecognizer,
    AnalysisExplanation,
)
from presidio_analyzer.nlp_engine import NlpArtifacts

from .configuration import BERT_DEID_CONFIGURATION


# Module logger, registered under the name "presidio-analyzer".
logger = logging.getLogger("presidio-analyzer")

# The transformers package is imported defensively: if it is missing, the
# failure is only logged, and the four names below remain undefined for the
# rest of this module.
try:
    from transformers import (
        AutoTokenizer,
        AutoModelForTokenClassification,
        pipeline,
        TokenClassificationPipeline,
    )

except ImportError:
    logger.error("transformers_rec is not installed")


class TransformersRecognizer(EntityRecognizer):
    """
    Wrapper for a transformers_rec model, if needed to be used within Presidio Analyzer.
    The class loads models hosted on HuggingFace - https://huggingface.co/
    and loads the model and tokenizer into a TokenClassification pipeline.
    Samples are split into short text chunks, ideally shorter than max_length input_ids of the individual model,
    to avoid truncation by the Tokenizer and loss of information

    A configuration object should be maintained for each dataset-model combination and translate
    entities names into a standardized view. A sample of a configuration file is attached in
    the example.
    :param supported_entities: List of entities to run inference on
    :type supported_entities: Optional[List[str]]
    :param pipeline: Instance of a TokenClassificationPipeline including a Tokenizer and a Model, defaults to None
    :type pipeline: Optional[TokenClassificationPipeline], optional
    :param model_path: string referencing a HuggingFace uploaded model to be used for Inference, defaults to None
    :type model_path: Optional[str], optional

    :example
    >from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
    >model_path = "obi/deid_roberta_i2b2"
    >transformers_recognizer = TransformersRecognizer(model_path=model_path,
    >supported_entities = model_configuration.get("PRESIDIO_SUPPORTED_ENTITIES"))
    >transformers_recognizer.load_transformer(**model_configuration)
    >registry = RecognizerRegistry()
    >registry.add_recognizer(transformers_recognizer)
    >analyzer = AnalyzerEngine(registry=registry)
    >sample = "My name is Christopher and I live in Irbid."
    >results = analyzer.analyze(sample, language="en",return_decision_process=True)

    >for result in results:
    >    print(result,'----', sample[result.start:result.end])
    """

    def load(self) -> None:
        pass

    def __init__(
        self,
        model_path: Optional[str] = None,
        pipeline: Optional["TokenClassificationPipeline"] = None,
        supported_entities: Optional[List[str]] = None,
    ):
        """Create the recognizer without loading any model yet.

        :param model_path: HuggingFace model identifier used to build the
            pipeline later on, defaults to None
        :param pipeline: Already constructed token-classification pipeline,
            defaults to None
        :param supported_entities: Entities this recognizer reports. When empty
            or None, the "PRESIDIO_SUPPORTED_ENTITIES" list of
            BERT_DEID_CONFIGURATION is used.
        """
        # NOTE: the pipeline annotation above is a forward reference (a string)
        # on purpose. The transformers import at the top of this module is
        # guarded by try/except, and an eagerly evaluated annotation would raise
        # NameError while the class is being defined if that import failed.
        if not supported_entities:
            supported_entities = BERT_DEID_CONFIGURATION[
                "PRESIDIO_SUPPORTED_ENTITIES"
            ]
        super().__init__(
            supported_entities=supported_entities,
            name=f"Transformers model {model_path}",
        )

        self.model_path = model_path
        self.pipeline = pipeline
        # Flipped to True by _load_pipeline once the pipeline has been built.
        self.is_loaded = False

        # Runtime configuration; all of these are filled in by load_transformer.
        self.aggregation_mechanism = None
        self.ignore_labels = None
        self.model_to_presidio_mapping = None
        self.entity_mapping = None
        self.default_explanation = None
        self.text_overlap_length = None
        self.chunk_length = None

    def load_transformer(self, **kwargs) -> None:
        """Load external configuration parameters and set default values.

        :param kwargs: define default values for class attributes and modify pipeline behavior
        **DATASET_TO_PRESIDIO_MAPPING (dict) - defines mapping entity strings from dataset format to Presidio format
        **MODEL_TO_PRESIDIO_MAPPING (dict) -  defines mapping entity strings from chosen model format to Presidio format
        **SUB_WORD_AGGREGATION(str) - define how to aggregate sub-word tokens into full words and spans as defined
        in HuggingFace https://huggingface.co/transformers/v4.8.0/main_classes/pipelines.html#transformers.TokenClassificationPipeline # noqa
        **CHUNK_OVERLAP_SIZE (int) - number of overlapping characters in each text chunk
        when splitting a single text into multiple inferences
        **CHUNK_SIZE (int) - number of characters in each chunk of text
        **LABELS_TO_IGNORE (List(str)) - List of entities to skip evaluation. Defaults to ["O"]
        **DEFAULT_EXPLANATION (str) - string format to use for prediction explanations
        """

        self.entity_mapping = kwargs.get("DATASET_TO_PRESIDIO_MAPPING", {})
        self.model_to_presidio_mapping = kwargs.get("MODEL_TO_PRESIDIO_MAPPING", {})
        self.ignore_labels = kwargs.get("LABELS_TO_IGNORE", ["O"])
        self.aggregation_mechanism = kwargs.get("SUB_WORD_AGGREGATION", "simple")
        self.default_explanation = kwargs.get("DEFAULT_EXPLANATION", None)
        self.text_overlap_length = kwargs.get("CHUNK_OVERLAP_SIZE", 40)
        self.chunk_length = kwargs.get("CHUNK_SIZE", 600)
        if not self.pipeline:
            if not self.model_path:
                self.model_path = "obi/deid_roberta_i2b2"
                logger.warning(
                    f"Both 'model' and 'model_path' arguments are None. Using default model_path={self.model_path}"
                )

        self._load_pipeline()

    def _load_pipeline(self) -> None:
        """Build the NER pipeline from ``self.model_path`` and mark the recognizer as loaded.

        The model and tokenizer are both loaded from ``self.model_path``. The
        sub-word aggregation strategy and the labels to ignore come from the
        attributes set in ``load_transformer``.
        """

        # Use the module logger (not the root logger) like the rest of this file.
        logger.debug(f"Initializing NER pipeline using {self.model_path} path")
        # transformers convention: 0 is the first CUDA device, -1 means CPU.
        device = 0 if torch.cuda.is_available() else -1
        self.pipeline = pipeline(
            "ner",
            model=AutoModelForTokenClassification.from_pretrained(self.model_path),
            tokenizer=AutoTokenizer.from_pretrained(self.model_path),
            # Will attempt to group sub-entities to word level
            aggregation_strategy=self.aggregation_mechanism,
            device=device,
            framework="pt",
            ignore_labels=self.ignore_labels,
        )

        self.is_loaded = True

    def get_supported_entities(self) -> List[str]:
        """
        Return supported entities by this model.
        :return: List of the supported entities.
        """
        return self.supported_entities

    # Class to use transformers_rec with Presidio as an external recognizer.
    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts = None
    ) -> List[RecognizerResult]:
        """
        Analyze text using transformers_rec model to produce NER tagging.
        :param text : The text for analysis.
        :param entities: Not working properly for this recognizer.
        :param nlp_artifacts: Not used by this recognizer.
        :return: The list of Presidio RecognizerResult constructed from the recognized
            transformers_rec detections.
        """

        results = list()
        # Run transformer model on the provided text
        ner_results = self._get_ner_results_for_text(text)

        for res in ner_results:
            entity = self.model_to_presidio_mapping.get(res["entity_group"], None)
            if not entity:
                continue

            res["entity_group"] = self.__check_label_transformer(res["entity_group"])
            textual_explanation = self.default_explanation.format(res["entity_group"])
            explanation = self.build_transformers_explanation(
                float(round(res["score"], 2)), textual_explanation, res["word"]
            )
            transformers_result = self._convert_to_recognizer_result(res, explanation)

            results.append(transformers_result)

        return results

    @staticmethod
    def split_text_to_word_chunks(
        input_length: int, chunk_length: int, overlap_length: int
    ) -> List[List]:
        """The function calculates chunks of text with size chunk_length. Each chunk has overlap_length number of
        words to create context and continuity for the model

        :param input_length: Length of input_ids for a given text
        :type input_length: int
        :param chunk_length: Length of each chunk of input_ids.
        Should match the max input length of the transformer model
        :type chunk_length: int
        :param overlap_length: Number of overlapping words in each chunk
        :type overlap_length: int
        :return: List of start and end positions for individual text chunks
        :rtype: List[List]
        """
        if input_length < chunk_length:
            return [[0, input_length]]
        if chunk_length <= overlap_length:
            logger.warning(
                "overlap_length should be shorter than chunk_length, setting overlap_length to by half of chunk_length"
            )
            overlap_length = chunk_length // 2
        return [
            [i, min([i + chunk_length, input_length])]
            for i in range(
                0, input_length - overlap_length, chunk_length - overlap_length
            )
        ]

    def _get_ner_results_for_text(self, text: str) -> List[dict]:
        """The function runs model inference on the provided text.
        The text is split into chunks with n overlapping characters.
        The results are then aggregated and duplications are removed.

        :param text: The text to run inference on
        :type text: str
        :return: List of entity predictions on the word level
        :rtype: List[dict]
        """
        model_max_length = self.pipeline.tokenizer.model_max_length
        # calculate inputs based on the text
        text_length = len(text)
        # split text into chunks
        logger.info(
            f"splitting the text into chunks, length {text_length} > {model_max_length*2}"
        )
        predictions = list()
        chunk_indexes = TransformersRecognizer.split_text_to_word_chunks(
            text_length, self.chunk_length, self.text_overlap_length
        )

        # iterate over text chunks and run inference
        for chunk_start, chunk_end in chunk_indexes:
            chunk_text = text[chunk_start:chunk_end]
            chunk_preds = self.pipeline(chunk_text)

            # align indexes to match the original text - add to each position the value of chunk_start
            aligned_predictions = list()
            for prediction in chunk_preds:
                prediction_tmp = copy.deepcopy(prediction)
                prediction_tmp["start"] += chunk_start
                prediction_tmp["end"] += chunk_start
                aligned_predictions.append(prediction_tmp)

            predictions.extend(aligned_predictions)

        # remove duplicates
        predictions = [dict(t) for t in {tuple(d.items()) for d in predictions}]
        return predictions

    @staticmethod
    def _convert_to_recognizer_result(
        prediction_result: dict, explanation: AnalysisExplanation
    ) -> RecognizerResult:
        """Wrap a single model prediction in a Presidio RecognizerResult.

        :param prediction_result: One entity prediction; its "entity_group",
            "start", "end" and "score" entries are read
        :type prediction_result: dict
        :param explanation: Explanation object attached to the result
        :type explanation: AnalysisExplanation
        :return: RecognizerResult carrying the prediction, with the score rounded to two decimals
        :rtype: RecognizerResult
        """
        entity_type = prediction_result["entity_group"]
        span_start = prediction_result["start"]
        span_end = prediction_result["end"]
        rounded_score = float(round(prediction_result["score"], 2))

        return RecognizerResult(
            entity_type=entity_type,
            start=span_start,
            end=span_end,
            score=rounded_score,
            analysis_explanation=explanation,
        )

    def build_transformers_explanation(
        self,
        original_score: float,
        explanation: str,
        pattern: str,
    ) -> AnalysisExplanation:
        """
        Package the details of one detection into an AnalysisExplanation.
        :param original_score: Score given by this recognizer
        :param explanation: Human readable explanation text
        :param pattern: Value stored in the explanation's ``pattern`` field
        :return Explanation object named after this recognizer's class
        :rtype: AnalysisExplanation
        """
        recognizer_name = self.__class__.__name__
        score_as_float = float(original_score)
        return AnalysisExplanation(
            recognizer=recognizer_name,
            original_score=score_as_float,
            textual_explanation=explanation,
            pattern=pattern,
        )

    def __check_label_transformer(self, label: str) -> str:
        """The function validates the predicted label is identified by Presidio
        and maps the string into a Presidio representation
        :param label: Predicted label by the model
        :type label: str
        :return: Returns the predicted entity if the label is found in model_to_presidio mapping dictionary
        and is supported by Presidio entities
        :rtype: str
        """

        if label == "O":
            return label

        # convert model label to presidio label
        entity = self.model_to_presidio_mapping.get(label, None)

        if entity is None:
            logger.warning(f"Found unrecognized label {label}, returning entity as 'O'")
            return "O"

        if entity not in self.supported_entities:
            logger.warning(f"Found entity {entity} which is not supported by Presidio")
            return "O"
        return entity