from typing import List

import pandas as pd
from PIL import Image
from gradio import Progress
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer, LTChar, LTTextLine
from pikepdf import Pdf, Dictionary, Name
from presidio_image_redactor import ImageRedactorEngine, ImageAnalyzerEngine

from tools.file_conversion import process_file
from tools.load_spacy_model_custom_recognisers import nlp_analyser, score_threshold


def redact_image_pdf(file_path:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, progress=Progress(track_tqdm=True)):
    '''
    Take a path to a document, convert it to one image per page, then run each
    image through the Presidio image redactor to get redacted pages back.
    '''
    progress(0, desc="Converting pages to images")
    image_paths = process_file(file_path)

    images = []
    number_of_pages = len(image_paths)

    progress(0.1, desc="Redacting pages")

    # The analyser and redactor hold no per-page state, so build them once
    # rather than once per page inside the loop.
    image_analyser = ImageAnalyzerEngine(nlp_analyser)
    engine = ImageRedactorEngine(image_analyser)

    for i in progress.tqdm(range(0, number_of_pages), total=number_of_pages, unit="pages", desc="Redacting pages"):

        image = image_paths[i]
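
        # ImageRedactorEngine.redact expects a PIL Image, while image_paths is
        # assumed (from its name) to hold file paths, so open a path if that
        # is what we were given.
        if isinstance(image, str):
            image = Image.open(image)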

        # Map the two-letter analyser language code to the ISO 639-2 code
        # expected by the underlying Tesseract OCR (e.g. 'en' -> 'eng').
        if language == 'en':
            ocr_lang = 'eng'
        else:
            ocr_lang = language

        redacted_image = engine.redact(
            image,
            fill=(0, 0, 0),
            ocr_kwargs={"lang": ocr_lang},
            allow_list=allow_list,
            ad_hoc_recognizers=None,
            language=language,
            entities=chosen_redact_entities,
            score_threshold=score_threshold,
        )

        images.append(redacted_image)

    return images


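# Illustrative usage only - this file name and these entity labels are
# assumptions, not part of this module:
#
#   pages = redact_image_pdf("scanned_letter.pdf", "en", ["PERSON", "EMAIL_ADDRESS"])
#   for i, page_image in enumerate(pages):
#       page_image.save(f"output/redacted_page_{i}.png")
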
def redact_text_pdf(filename:str, language:str, chosen_redact_entities:List[str], allow_list:List[str]=None, progress=Progress()):
    '''
    Redact chosen entities from a text-based (non-image) pdf by covering each
    match with an opaque black highlight annotation on its page.
    '''
    combined_analyzer_results = []
    annotations_all_pages = []
    analyzed_bounding_boxes_df = pd.DataFrame()

    pdf = Pdf.open(filename)
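
    # For each page: extract the text layout with pdfminer.six, run the
    # Presidio analyser over every text block, then add a black highlight
    # annotation (built with pikepdf) over each match.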
    for page_num, page in progress.tqdm(enumerate(pdf.pages), total=len(pdf.pages), unit="pages", desc="Redacting pages"):

        print(f"Page number is: {page_num}")

        annotations_on_page = []
        analyzed_bounding_boxes = []

        # Only the current page is requested, so extract_pages yields a
        # single LTPage layout.
        for page_layout in extract_pages(filename, page_numbers=[page_num], maxpages=1):
            analyzer_results = []

            for text_container in page_layout:
                if isinstance(text_container, LTTextContainer):
                    text_to_analyze = text_container.get_text()

                    analyzer_results = nlp_analyser.analyze(text=text_to_analyze,
                                                            language=language,
                                                            entities=chosen_redact_entities,
                                                            score_threshold=score_threshold,
                                                            return_decision_process=False,
                                                            allow_list=allow_list)
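
                    # Each result is a Presidio RecognizerResult carrying an
                    # entity_type, start/end character offsets into
                    # text_to_analyze, and a confidence score.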

                    # Flatten the container into individual characters. LTAnno
                    # entries (virtual spaces/newlines) are kept so that list
                    # indices line up with the offsets in get_text().
                    characters = [char
                                  for line in text_container
                                  if isinstance(line, LTTextLine)
                                  for char in line]

                    print(analyzer_results)

                    if len(analyzer_results) > 0 and len(characters) > 0:
                        # Record one bounding box per matched character; the
                        # annotation step below draws a highlight over each.
                        analyzed_bounding_boxes.extend(
                            {"boundingBox": char.bbox, "result": result}
                            for result in analyzer_results
                            for char in characters[result.start:result.end]
                            if isinstance(char, LTChar)
                        )
                        combined_analyzer_results.extend(analyzer_results)

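            # Collate this page's matches into the running dataframe that is
            # written out as a csv report at the end.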
            # Guard on the page-level box list, not just the last text
            # container's results.
            if len(analyzed_bounding_boxes) > 0:
                analyzed_bounding_boxes_df_new = pd.DataFrame(analyzed_bounding_boxes)

                # Read the fields straight off the RecognizerResult objects
                # rather than parsing their string representation, which is
                # fragile.
                analyzed_bounding_boxes_df_text = pd.DataFrame({
                    "type": [box["result"].entity_type for box in analyzed_bounding_boxes],
                    "start": [box["result"].start for box in analyzed_bounding_boxes],
                    "end": [box["result"].end for box in analyzed_bounding_boxes],
                    "score": [box["result"].score for box in analyzed_bounding_boxes],
                })

                analyzed_bounding_boxes_df_new = pd.concat([analyzed_bounding_boxes_df_new, analyzed_bounding_boxes_df_text], axis=1)
                analyzed_bounding_boxes_df_new['page'] = page_num + 1
                analyzed_bounding_boxes_df = pd.concat([analyzed_bounding_boxes_df, analyzed_bounding_boxes_df_new], axis=0)

            for analyzed_bounding_box in analyzed_bounding_boxes:
                bounding_box = analyzed_bounding_box["boundingBox"]

                # pdfminer bounding boxes are (x0, y0, x1, y1) in PDF user
                # space with the origin at the bottom left, so they can be
                # used directly for the annotation geometry.
                annotation = Dictionary(
                    Type=Name.Annot,
                    Subtype=Name.Highlight,
                    QuadPoints=[bounding_box[0], bounding_box[3],
                                bounding_box[2], bounding_box[3],
                                bounding_box[0], bounding_box[1],
                                bounding_box[2], bounding_box[1]],
                    Rect=[bounding_box[0], bounding_box[1], bounding_box[2], bounding_box[3]],
                    C=[0, 0, 0],  # black fill to obscure the matched text
                    CA=1,         # fully opaque
                    T=analyzed_bounding_box["result"].entity_type
                )
                annotations_on_page.append(annotation)

            annotations_all_pages.append(annotations_on_page)

            print(f"For page number {page_num} there are {len(annotations_on_page)} annotations")

            page.Annots = pdf.make_indirect(annotations_on_page)

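    # Note: to_csv does not create missing directories, so 'output' is
    # assumed to exist already.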
    analyzed_bounding_boxes_df.to_csv("output/annotations_made.csv")

    return pdf
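

# Illustrative usage only - this file name and these entity labels are
# assumptions, not part of this module:
#
#   redacted = redact_text_pdf("report.pdf", "en", ["PERSON", "PHONE_NUMBER"])
#   redacted.save("output/report_redacted.pdf")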