import json
import os
import random
import re
import shutil
import string
import time
import zipfile
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import pymupdf
from gradio import Progress
from pdf2image import convert_from_path, pdfinfo_from_path
from PIL import Image, ImageFile
from pymupdf import Document, Page
from scipy.spatial import cKDTree
from tqdm import tqdm

from tools.config import (
    COMPRESS_REDACTED_PDF,
    IMAGES_DPI,
    INPUT_FOLDER,
    LOAD_REDACTION_ANNOTATIONS_FROM_PDF,
    LOAD_TRUNCATED_IMAGES,
    MAX_IMAGE_PIXELS,
    MAX_SIMULTANEOUS_FILES,
    OUTPUT_FOLDER,
    SELECTABLE_TEXT_EXTRACT_OPTION,
    TESSERACT_TEXT_EXTRACT_OPTION,
    TEXTRACT_TEXT_EXTRACT_OPTION,
)
from tools.helper_functions import get_file_name_without_type, read_file
from tools.secure_path_utils import secure_file_read, secure_join
from tools.secure_regex_utils import safe_extract_page_number_from_path

IMAGE_NUM_REGEX = re.compile(r"_(\d+)\.png$")

pd.set_option("future.no_silent_downcasting", True)

image_dpi = float(IMAGES_DPI)
if not MAX_IMAGE_PIXELS:
    Image.MAX_IMAGE_PIXELS = None
else:
    Image.MAX_IMAGE_PIXELS = MAX_IMAGE_PIXELS

ImageFile.LOAD_TRUNCATED_IMAGES = LOAD_TRUNCATED_IMAGES


def is_pdf_or_image(filename):
    """
    Check if a file name is a PDF or an image file.

    Args:
        filename (str): The name of the file.

    Returns:
        bool: True if the file name ends with ".pdf", ".jpg", ".jpeg", or ".png", False otherwise.
    """
    return filename.lower().endswith((".pdf", ".jpg", ".jpeg", ".png"))


def is_pdf(filename):
    """
    Check if a file name is a PDF.

    Args:
        filename (str): The name of the file.

    Returns:
        bool: True if the file name ends with ".pdf", False otherwise.
    """
    return filename.lower().endswith(".pdf")


def check_image_size_and_reduce(out_path: str, image: Image):
    """
    Check whether a saved image is larger than about 4.5 MB and, if so, repeatedly halve
    its dimensions until it is small enough. 5 MB is the maximum image size that can be
    submitted to AWS Textract.

    Args:
        out_path (str): The file path where the image is currently saved and will be saved after resizing.
        image (Image): The PIL Image object to be checked and potentially resized.
    """

    all_img_details = list()
    page_num = 0

    max_size = 4.5 * 1024 * 1024  # 4.5 MB in bytes
    file_size = os.path.getsize(out_path)

    width = image.width
    height = image.height

    if file_size > max_size:

        print(f"Image size before {width}x{height}, original file_size: {file_size}")

        while file_size > max_size:

            # Halve the current dimensions on each pass until the file fits under the limit
            new_width = int(width * 0.5)
            new_height = int(height * 0.5)
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

            image.save(out_path, format="PNG", optimize=True)

            file_size = os.path.getsize(out_path)
            print(f"Resized to {new_width}x{new_height}, new file_size: {file_size}")

            width = new_width
            height = new_height
    else:
        new_width = width
        new_height = height

    all_img_details.append((page_num, image, new_width, new_height))

    return image, new_width, new_height, all_img_details, out_path

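
# Illustrative usage sketch (an assumption for demonstration, not part of the
# production flow): generate a deliberately oversized PNG in a temporary
# location and let check_image_size_and_reduce() shrink it under the ~4.5 MB
# Textract limit. The temporary file name is hypothetical.
def _example_check_image_size_and_reduce():
    import tempfile

    tmp_path = os.path.join(tempfile.gettempdir(), "example_size_check.png")

    # Random noise barely compresses, so a 4000x4000 PNG comfortably exceeds 4.5 MB
    noise = np.random.randint(0, 256, size=(4000, 4000, 3), dtype=np.uint8)
    Image.fromarray(noise).save(tmp_path, format="PNG")

    image = Image.open(tmp_path)
    image, width, height, details, out_path = check_image_size_and_reduce(
        tmp_path, image
    )
    print(f"Final size: {os.path.getsize(out_path)} bytes at {width}x{height}")
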
def process_single_page_for_image_conversion(
|
|
|
pdf_path: str,
|
|
|
page_num: int,
|
|
|
image_dpi: float = image_dpi,
|
|
|
create_images: bool = True,
|
|
|
input_folder: str = INPUT_FOLDER,
|
|
|
) -> tuple[int, str, float, float]:
|
|
|
"""
|
|
|
Processes a single page of a PDF or image file for image conversion,
|
|
|
saving it as a PNG and optionally resizing it if too large.
|
|
|
|
|
|
Args:
|
|
|
pdf_path (str): The path to the input PDF or image file.
|
|
|
page_num (int): The 0-indexed page number to process.
|
|
|
image_dpi (float, optional): The DPI to use for PDF to image conversion. Defaults to image_dpi from config.
|
|
|
create_images (bool, optional): Whether to create and save the image. Defaults to True.
|
|
|
input_folder (str, optional): The folder where the converted images will be saved. Defaults to INPUT_FOLDER from config.
|
|
|
|
|
|
Returns:
|
|
|
tuple[int, str, float, float]: A tuple containing:
|
|
|
- The processed page number.
|
|
|
- The path to the saved output image.
|
|
|
- The width of the processed image.
|
|
|
- The height of the processed image.
|
|
|
"""
|
|
|
|
|
|
out_path_placeholder = "placeholder_image_" + str(page_num) + ".png"
|
|
|
|
|
|
if create_images is True:
|
|
|
try:
|
|
|
|
|
|
image_output_dir = secure_join(os.getcwd(), input_folder)
|
|
|
out_path = secure_join(
|
|
|
image_output_dir, f"{os.path.basename(pdf_path)}_{page_num}.png"
|
|
|
)
|
|
|
os.makedirs(os.path.dirname(out_path), exist_ok=True)
|
|
|
|
|
|
if os.path.exists(out_path):
|
|
|
|
|
|
image = Image.open(out_path)
|
|
|
elif pdf_path.lower().endswith(".pdf"):
|
|
|
|
|
|
image_l = convert_from_path(
|
|
|
pdf_path,
|
|
|
first_page=page_num + 1,
|
|
|
last_page=page_num + 1,
|
|
|
dpi=image_dpi,
|
|
|
use_cropbox=False,
|
|
|
use_pdftocairo=False,
|
|
|
)
|
|
|
image = image_l[0]
|
|
|
image = image.convert("L")
|
|
|
|
|
|
image.save(out_path, format="PNG")
|
|
|
elif (
|
|
|
pdf_path.lower().endswith(".jpg")
|
|
|
or pdf_path.lower().endswith(".png")
|
|
|
or pdf_path.lower().endswith(".jpeg")
|
|
|
):
|
|
|
image = Image.open(pdf_path)
|
|
|
image.save(out_path, format="PNG")
|
|
|
else:
|
|
|
raise Warning("Could not create image.")
|
|
|
|
|
|
width, height = image.size
|
|
|
|
|
|
|
|
|
|
|
|
image, width, height, all_img_details, img_path = (
|
|
|
check_image_size_and_reduce(out_path, image)
|
|
|
)
|
|
|
|
|
|
return page_num, out_path, width, height
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"Error processing page {page_num + 1}: {e}")
|
|
|
return page_num, out_path_placeholder, pd.NA, pd.NA
|
|
|
else:
|
|
|
|
|
|
return page_num, out_path_placeholder, pd.NA, pd.NA
|
|
|
|
|
|
|
|
|
def convert_pdf_to_images(
|
|
|
pdf_path: str,
|
|
|
prepare_for_review: bool = False,
|
|
|
page_min: int = 0,
|
|
|
page_max: int = 0,
|
|
|
create_images: bool = True,
|
|
|
image_dpi: float = image_dpi,
|
|
|
num_threads: int = 8,
|
|
|
input_folder: str = INPUT_FOLDER,
|
|
|
):
|
|
|
"""
|
|
|
Converts a PDF document into a series of images, processing each page concurrently.
|
|
|
|
|
|
Args:
|
|
|
pdf_path (str): The path to the PDF file to convert.
|
|
|
        prepare_for_review (bool, optional): If True, the full document is converted regardless of page_min and page_max, so every page can be reviewed. Defaults to False.
|
|
|
page_min (int, optional): The starting page number (0-indexed) for conversion. If 0, uses the first page. Defaults to 0.
|
|
|
page_max (int, optional): The ending page number (exclusive, 0-indexed) for conversion. If 0, uses the last page of the document. Defaults to 0.
|
|
|
create_images (bool, optional): If True, images are created and saved to disk. Defaults to True.
|
|
|
image_dpi (float, optional): The DPI (dots per inch) to use for converting PDF pages to images. Defaults to the global `image_dpi`.
|
|
|
num_threads (int, optional): The number of threads to use for concurrent page processing. Defaults to 8.
|
|
|
input_folder (str, optional): The base input folder, used for determining output paths. Defaults to `INPUT_FOLDER`.
|
|
|
|
|
|
Returns:
|
|
|
list: A list of tuples, where each tuple contains (page_num, image_path, width, height) for successfully processed pages.
|
|
|
For failed pages, it returns (page_num, placeholder_path, pd.NA, pd.NA).
|
|
|
"""
|
|
|
|
|
|
|
|
|
if prepare_for_review is True:
|
|
|
page_count = pdfinfo_from_path(pdf_path)["Pages"]
|
|
|
page_min = 0
|
|
|
page_max = page_count
|
|
|
else:
|
|
|
page_count = pdfinfo_from_path(pdf_path)["Pages"]
|
|
|
|
|
|
print(f"Creating images. Number of pages in PDF: {page_count}")
|
|
|
|
|
|
|
|
|
|
|
|
if page_min == 0:
|
|
|
page_min = 0
|
|
|
else:
|
|
|
page_min = page_min - 1
|
|
|
|
|
|
|
|
|
if page_max == 0:
|
|
|
page_max = page_count
|
|
|
|
|
|
results = list()
|
|
|
with ThreadPoolExecutor(max_workers=num_threads) as executor:
|
|
|
futures = list()
|
|
|
for page_num in range(page_min, page_max):
|
|
|
futures.append(
|
|
|
executor.submit(
|
|
|
process_single_page_for_image_conversion,
|
|
|
pdf_path,
|
|
|
page_num,
|
|
|
image_dpi,
|
|
|
create_images=create_images,
|
|
|
input_folder=input_folder,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
for future in tqdm(
|
|
|
as_completed(futures),
|
|
|
total=len(futures),
|
|
|
unit="pages",
|
|
|
desc="Converting pages to image",
|
|
|
):
|
|
|
page_num, img_path, width, height = future.result()
|
|
|
if img_path:
|
|
|
results.append((page_num, img_path, width, height))
|
|
|
else:
|
|
|
print(f"Page {page_num + 1} failed to process.")
|
|
|
results.append(
|
|
|
(
|
|
|
page_num,
|
|
|
"placeholder_image_" + str(page_num) + ".png",
|
|
|
pd.NA,
|
|
|
pd.NA,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
|
|
|
results.sort(key=lambda x: x[0])
|
|
|
images = [result[1] for result in results]
|
|
|
widths = [result[2] for result in results]
|
|
|
heights = [result[3] for result in results]
|
|
|
|
|
|
|
|
|
return images, widths, heights, results
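
# Illustrative usage sketch: convert the first two pages of a PDF with a small
# thread pool. "example.pdf" is a hypothetical input path; substitute a real
# file before running.
def _example_convert_pdf_to_images():
    images, widths, heights, results = convert_pdf_to_images(
        "example.pdf", page_min=1, page_max=2, num_threads=2
    )
    for page_num, img_path, width, height in results:
        print(f"Page {page_num + 1}: {img_path} ({width}x{height})")
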
|
|
|
|
|
|
|
|
|
|
|
|
def process_file_for_image_creation(
|
|
|
file_path: str,
|
|
|
prepare_for_review: bool = False,
|
|
|
input_folder: str = INPUT_FOLDER,
|
|
|
create_images: bool = True,
|
|
|
page_min: int = 0,
|
|
|
page_max: int = 0,
|
|
|
):
|
|
|
"""
|
|
|
Processes a given file path, determining if it's an image or a PDF,
|
|
|
and then converts it into a list of image paths, along with their dimensions.
|
|
|
|
|
|
Args:
|
|
|
file_path (str): The path to the file (image or PDF) to be processed.
|
|
|
prepare_for_review (bool, optional): If True, prepares the PDF for review
|
|
|
(e.g., by converting pages to images). Defaults to False.
|
|
|
input_folder (str, optional): The folder where input files are located. Defaults to INPUT_FOLDER.
|
|
|
create_images (bool, optional): If True, images will be created from PDF pages.
|
|
|
If False, only metadata will be extracted. Defaults to True.
|
|
|
page_min (int, optional): The minimum page number to process (0-indexed). If 0, uses the first page. Defaults to 0.
|
|
|
        page_max (int, optional): The maximum page number to process (0-indexed). If 0, uses the last page of the document. Defaults to 0.

    Returns:
        tuple: (img_path, image_sizes_width, image_sizes_height, all_img_details) for the processed file.
    """
|
|
|
|
|
|
file_extension = os.path.splitext(file_path)[1].lower()
|
|
|
|
|
|
|
|
|
if file_extension in [".jpg", ".jpeg", ".png"]:
|
|
|
print(f"{file_path} is an image file.")
|
|
|
|
|
|
img_object = [file_path]
|
|
|
|
|
|
|
|
|
image = Image.open(file_path)
|
|
|
img_object, image_sizes_width, image_sizes_height, all_img_details, img_path = (
|
|
|
check_image_size_and_reduce(file_path, image)
|
|
|
)
|
|
|
|
|
|
if not isinstance(image_sizes_width, list):
|
|
|
img_path = [img_path]
|
|
|
image_sizes_width = [image_sizes_width]
|
|
|
image_sizes_height = [image_sizes_height]
|
|
|
all_img_details = [all_img_details]
|
|
|
|
|
|
|
|
|
elif file_extension == ".pdf":
|
|
|
|
|
|
|
|
|
img_path, image_sizes_width, image_sizes_height, all_img_details = (
|
|
|
convert_pdf_to_images(
|
|
|
file_path,
|
|
|
prepare_for_review,
|
|
|
page_min=page_min,
|
|
|
page_max=page_max,
|
|
|
input_folder=input_folder,
|
|
|
create_images=create_images,
|
|
|
)
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
print(f"{file_path} is not an image or PDF file.")
|
|
|
img_path = list()
|
|
|
image_sizes_width = list()
|
|
|
image_sizes_height = list()
|
|
|
all_img_details = list()
|
|
|
|
|
|
return img_path, image_sizes_width, image_sizes_height, all_img_details
|
|
|
|
|
|
|
|
|
def get_input_file_names(file_input: List[str]):
|
|
|
"""
|
|
|
Get list of input files to report to logs.
|
|
|
"""
|
|
|
|
|
|
all_relevant_files = list()
|
|
|
file_name_with_extension = ""
|
|
|
full_file_name = ""
|
|
|
total_pdf_page_count = 0
|
|
|
|
|
|
if isinstance(file_input, dict):
|
|
|
file_input = os.path.abspath(file_input["name"])
|
|
|
|
|
|
if isinstance(file_input, str):
|
|
|
file_input_list = [file_input]
|
|
|
else:
|
|
|
file_input_list = file_input
|
|
|
|
|
|
for file in file_input_list:
|
|
|
if isinstance(file, str):
|
|
|
file_path = file
|
|
|
else:
|
|
|
file_path = file.name
|
|
|
|
|
|
file_path_without_ext = get_file_name_without_type(file_path)
|
|
|
|
|
|
file_extension = os.path.splitext(file_path)[1].lower()
|
|
|
|
|
|
|
|
|
if (
|
|
|
(
|
|
|
file_extension
|
|
|
in [
|
|
|
".jpg",
|
|
|
".jpeg",
|
|
|
".png",
|
|
|
".pdf",
|
|
|
".xlsx",
|
|
|
".csv",
|
|
|
".parquet",
|
|
|
".docx",
|
|
|
]
|
|
|
)
|
|
|
& ("review_file" not in file_path_without_ext)
|
|
|
& ("ocr_output" not in file_path_without_ext)
|
|
|
& ("ocr_results_with_words" not in file_path_without_ext)
|
|
|
):
|
|
|
all_relevant_files.append(file_path_without_ext)
|
|
|
file_name_with_extension = file_path_without_ext + file_extension
|
|
|
full_file_name = file_path
|
|
|
|
|
|
|
|
|
if file_extension in [".pdf"]:
|
|
|
|
|
|
pdf_document = pymupdf.open(file_path)
|
|
|
|
|
|
page_count = pdf_document.page_count
|
|
|
|
|
|
|
|
|
pdf_document.close()
|
|
|
else:
|
|
|
page_count = 1
|
|
|
|
|
|
total_pdf_page_count += page_count
|
|
|
|
|
|
all_relevant_files_str = ", ".join(all_relevant_files)
|
|
|
|
|
|
return (
|
|
|
all_relevant_files_str,
|
|
|
file_name_with_extension,
|
|
|
full_file_name,
|
|
|
all_relevant_files,
|
|
|
total_pdf_page_count,
|
|
|
)
|
|
|
|
|
|
|
|
|
def convert_pymupdf_to_image_coords(
|
|
|
pymupdf_page: Page,
|
|
|
x1: float,
|
|
|
y1: float,
|
|
|
x2: float,
|
|
|
y2: float,
|
|
|
image: Image = None,
|
|
|
image_dimensions: dict = dict(),
|
|
|
):
|
|
|
"""
|
|
|
Converts bounding box coordinates from PyMuPDF page format to image coordinates.
|
|
|
|
|
|
This function takes coordinates (x1, y1, x2, y2) defined relative to a
|
|
|
PyMuPDF page's coordinate system and transforms them to correspond to
|
|
|
the coordinate system of a target image. It accounts for scaling differences
|
|
|
between the page's mediabox/rect and the image dimensions, as well as
|
|
|
any potential offsets.
|
|
|
|
|
|
Args:
|
|
|
pymupdf_page (Page): The PyMuPDF page object from which the coordinates originate.
|
|
|
x1 (float): The x-coordinate of the top-left corner in PyMuPDF page units.
|
|
|
y1 (float): The y-coordinate of the top-left corner in PyMuPDF page units.
|
|
|
x2 (float): The x-coordinate of the bottom-right corner in PyMuPDF page units.
|
|
|
y2 (float): The y-coordinate of the bottom-right corner in PyMuPDF page units.
|
|
|
image (Image, optional): A PIL Image object. If provided, its dimensions
|
|
|
are used as the target image dimensions. Defaults to None.
|
|
|
        image_dimensions (dict, optional): A dictionary containing 'image_width' and
                                           'image_height', used if 'image' is not provided.
                                           Defaults to an empty dictionary.

    Returns:
        tuple[float, float, float, float]: The (x1, y1, x2, y2) coordinates in image space.
    """
|
|
|
|
|
|
rect = pymupdf_page.rect
|
|
|
rect_width = rect.width
|
|
|
rect_height = rect.height
|
|
|
|
|
|
|
|
|
mediabox = pymupdf_page.mediabox
|
|
|
mediabox_width = mediabox.width
|
|
|
mediabox_height = mediabox.height
|
|
|
|
|
|
|
|
|
if image:
|
|
|
image_page_width, image_page_height = image.size
|
|
|
elif image_dimensions:
|
|
|
image_page_width, image_page_height = (
|
|
|
image_dimensions["image_width"],
|
|
|
image_dimensions["image_height"],
|
|
|
)
|
|
|
else:
|
|
|
image_page_width, image_page_height = mediabox_width, mediabox_height
|
|
|
|
|
|
|
|
|
image_to_mediabox_x_scale = image_page_width / mediabox_width
|
|
|
image_to_mediabox_y_scale = image_page_height / mediabox_height
|
|
|
|
|
|
|
|
|
|
|
|
x1_image = x1 * image_to_mediabox_x_scale
|
|
|
x2_image = x2 * image_to_mediabox_x_scale
|
|
|
y1_image = y1 * image_to_mediabox_y_scale
|
|
|
y2_image = y2 * image_to_mediabox_y_scale
|
|
|
|
|
|
|
|
|
    if mediabox_width != rect_width:

        mediabox_to_rect_x_scale = mediabox_width / rect_width
        mediabox_to_rect_y_scale = mediabox_height / rect_height

        mediabox_rect_x_diff = (mediabox_width - rect_width) * (
            image_to_mediabox_x_scale / 2
        )
|
|
|
mediabox_rect_y_diff = (mediabox_height - rect_height) * (
|
|
|
image_to_mediabox_y_scale / 2
|
|
|
)
|
|
|
|
|
|
x1_image -= mediabox_rect_x_diff
|
|
|
x2_image -= mediabox_rect_x_diff
|
|
|
y1_image += mediabox_rect_y_diff
|
|
|
y2_image += mediabox_rect_y_diff
|
|
|
|
|
|
|
|
|
x1_image *= mediabox_to_rect_x_scale
|
|
|
x2_image *= mediabox_to_rect_x_scale
|
|
|
y1_image *= mediabox_to_rect_y_scale
|
|
|
y2_image *= mediabox_to_rect_y_scale
|
|
|
|
|
|
return x1_image, y1_image, x2_image, y2_image
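
# Illustrative usage sketch: map a rectangle given in PDF points to pixel
# coordinates for a page rendered at a hypothetical 1240x1754 image size.
def _example_convert_pymupdf_to_image_coords():
    doc = pymupdf.open()
    page = doc.new_page(width=595, height=842)  # A4-sized page in PDF points

    x1, y1, x2, y2 = convert_pymupdf_to_image_coords(
        page,
        100,
        200,
        300,
        400,
        image_dimensions={"image_width": 1240, "image_height": 1754},
    )
    print(f"Image-space box: ({x1:.1f}, {y1:.1f}) -> ({x2:.1f}, {y2:.1f})")
    doc.close()
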
|
|
|
|
|
|
|
|
|
def create_page_size_objects(
|
|
|
pymupdf_doc: Document,
|
|
|
image_sizes_width: List[float],
|
|
|
image_sizes_height: List[float],
|
|
|
image_file_paths: List[str],
|
|
|
page_min: int = 0,
|
|
|
page_max: int = 0,
|
|
|
):
|
|
|
"""
|
|
|
Creates page size objects for a PyMuPDF document.
|
|
|
|
|
|
Creates entries for ALL pages in the document. Pages that were processed for image creation
|
|
|
will have actual image paths and dimensions. Pages that were not processed will have
|
|
|
placeholder image paths and no image dimensions.
|
|
|
|
|
|
Args:
|
|
|
pymupdf_doc (Document): The PyMuPDF document object.
|
|
|
image_sizes_width (List[float]): List of image widths for processed pages.
|
|
|
image_sizes_height (List[float]): List of image heights for processed pages.
|
|
|
image_file_paths (List[str]): List of image file paths for processed pages.
|
|
|
page_min (int, optional): The minimum page number that was processed (0-indexed). If 0, uses the first page. Defaults to 0.
|
|
|
page_max (int, optional): The maximum page number that was processed (0-indexed). If 0, uses the last page of the document. Defaults to 0.
|
|
|
"""
|
|
|
page_sizes = list()
|
|
|
original_cropboxes = list()
|
|
|
|
|
|
|
|
|
|
|
|
if page_min == 0:
|
|
|
page_min = 0
|
|
|
else:
|
|
|
page_min = page_min - 1
|
|
|
|
|
|
|
|
|
if page_max == 0:
|
|
|
page_max = len(pymupdf_doc)
|
|
|
|
|
|
|
|
|
for page_no in range(len(pymupdf_doc)):
|
|
|
reported_page_no = page_no + 1
|
|
|
pymupdf_page = pymupdf_doc.load_page(page_no)
|
|
|
original_cropboxes.append(pymupdf_page.cropbox)
|
|
|
|
|
|
|
|
|
is_page_in_range = page_min <= page_no < page_max
|
|
|
image_index = page_no - page_min if is_page_in_range else None
|
|
|
|
|
|
|
|
|
out_page_image_sizes = {
|
|
|
"page": reported_page_no,
|
|
|
"mediabox_width": pymupdf_page.mediabox.width,
|
|
|
"mediabox_height": pymupdf_page.mediabox.height,
|
|
|
"cropbox_width": pymupdf_page.cropbox.width,
|
|
|
"cropbox_height": pymupdf_page.cropbox.height,
|
|
|
"original_cropbox": original_cropboxes[-1],
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
out_page_image_sizes["cropbox_x_offset"] = (
|
|
|
pymupdf_page.cropbox.x0 - pymupdf_page.mediabox.x0
|
|
|
)
|
|
|
|
|
|
|
|
|
out_page_image_sizes["cropbox_y_offset_from_top"] = (
|
|
|
pymupdf_page.mediabox.y1 - pymupdf_page.cropbox.y1
|
|
|
)
|
|
|
|
|
|
|
|
|
if (
|
|
|
is_page_in_range
|
|
|
and image_index is not None
|
|
|
and image_index < len(image_file_paths)
|
|
|
):
|
|
|
|
|
|
out_page_image_sizes["image_path"] = image_file_paths[image_index]
|
|
|
|
|
|
|
|
|
if (
|
|
|
image_sizes_width
|
|
|
and image_sizes_height
|
|
|
and image_index < len(image_sizes_width)
|
|
|
and image_index < len(image_sizes_height)
|
|
|
):
|
|
|
out_page_image_sizes["image_width"] = image_sizes_width[image_index]
|
|
|
out_page_image_sizes["image_height"] = image_sizes_height[image_index]
|
|
|
else:
|
|
|
|
|
|
out_page_image_sizes["image_path"] = f"image_placeholder_{page_no}.png"
|
|
|
|
|
|
|
|
|
page_sizes.append(out_page_image_sizes)
|
|
|
|
|
|
return page_sizes, original_cropboxes
|
|
|
|
|
|
|
|
|
def word_level_ocr_output_to_dataframe(ocr_results: dict) -> pd.DataFrame:
    """
    Convert a JSON-style list of OCR results to a word-level dataframe.

    Args:
        ocr_results (dict): A dictionary containing OCR results.

    Returns:
        pd.DataFrame: A dataframe containing the OCR results.
    """
    rows = list()

    for ocr_result in ocr_results:

        page_number = int(ocr_result["page"])

        for line_key, line_data in ocr_result["results"].items():

            line_number = int(line_data["line"])
            if "conf" not in line_data:
                line_data["conf"] = 100.0
            for word in line_data["words"]:
                if "conf" not in word:
                    word["conf"] = 100.0
                rows.append(
                    {
                        "page": page_number,
                        "line": line_number,
                        "word_text": word["text"],
                        "word_x0": word["bounding_box"][0],
                        "word_y0": word["bounding_box"][1],
                        "word_x1": word["bounding_box"][2],
                        "word_y1": word["bounding_box"][3],
                        "word_conf": word["conf"],
                        "line_text": "",
                        "line_x0": line_data["bounding_box"][0],
                        "line_y0": line_data["bounding_box"][1],
                        "line_x1": line_data["bounding_box"][2],
                        "line_y1": line_data["bounding_box"][3],
                        "line_conf": line_data["conf"],
                    }
                )

    return pd.DataFrame(rows)

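
# Illustrative usage sketch: a minimal OCR-results-with-words structure (one
# line containing two words) flattened to a word-level dataframe. All values
# are made up for demonstration.
def _example_word_level_ocr_output_to_dataframe():
    ocr_results = [
        {
            "page": 1,
            "results": {
                "line_1": {
                    "line": 1,
                    "bounding_box": [0.1, 0.1, 0.5, 0.15],
                    "words": [
                        {
                            "text": "Hello",
                            "bounding_box": [0.1, 0.1, 0.25, 0.15],
                            "conf": 95.0,
                        },
                        {"text": "world", "bounding_box": [0.3, 0.1, 0.5, 0.15]},
                    ],
                }
            },
        }
    ]
    df = word_level_ocr_output_to_dataframe(ocr_results)
    print(df[["page", "line", "word_text", "word_conf"]])
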
def extract_redactions(
|
|
|
doc: Document, page_sizes: List[Dict[str, Any]] = None
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
Extracts all redaction annotations from a PDF document and converts them
|
|
|
to Gradio Annotation JSON format.
|
|
|
|
|
|
Note: This function identifies the *markings* for redaction. It does not
|
|
|
tell you if the redaction has been *applied* (i.e., the underlying
|
|
|
content is permanently removed).
|
|
|
|
|
|
Args:
|
|
|
doc: The PyMuPDF document object.
|
|
|
page_sizes: List of dictionaries containing page information with keys:
|
|
|
'page', 'image_path', 'image_width', 'image_height'.
|
|
|
If None, will create placeholder structure.
|
|
|
|
|
|
Returns:
|
|
|
List of dictionaries suitable for Gradio Annotation output, one dict per image/page.
|
|
|
Each dict has structure: {"image": image_path, "boxes": [list of annotation boxes]}
|
|
|
"""
|
|
|
|
|
|
|
|
|
def _generate_unique_ids(num_ids: int, existing_ids: set = None) -> List[str]:
|
|
|
if existing_ids is None:
|
|
|
existing_ids = set()
|
|
|
|
|
|
id_length = 12
|
|
|
character_set = string.ascii_letters + string.digits
|
|
|
unique_ids = list()
|
|
|
|
|
|
for _ in range(num_ids):
|
|
|
while True:
|
|
|
candidate_id = "".join(random.choices(character_set, k=id_length))
|
|
|
if candidate_id not in existing_ids:
|
|
|
existing_ids.add(candidate_id)
|
|
|
unique_ids.append(candidate_id)
|
|
|
break
|
|
|
|
|
|
return unique_ids
|
|
|
|
|
|
|
|
|
redactions_by_page = dict()
|
|
|
existing_ids = set()
|
|
|
|
|
|
for page_num, page in enumerate(doc):
|
|
|
page_redactions = list()
|
|
|
|
|
|
|
|
|
for annot in page.annots():
|
|
|
|
|
|
if annot.type[0] == pymupdf.PDF_ANNOT_REDACT:
|
|
|
|
|
|
|
|
|
annot_info = annot.info or {}
|
|
|
annot_colors = annot.colors or {}
|
|
|
|
|
|
|
|
|
rect = annot.rect
|
|
|
x0, y0, x1, y1 = rect.x0, rect.y0, rect.x1, rect.y1
|
|
|
|
|
|
|
|
|
if page_sizes:
|
|
|
|
|
|
page_size_info = None
|
|
|
for ps in page_sizes:
|
|
|
if ps.get("page") == page_num + 1:
|
|
|
page_size_info = ps
|
|
|
break
|
|
|
|
|
|
if page_size_info:
|
|
|
mediabox_width = page_size_info.get("mediabox_width", 1)
|
|
|
mediabox_height = page_size_info.get("mediabox_height", 1)
|
|
|
|
|
|
|
|
|
rel_x0 = x0 / mediabox_width
|
|
|
rel_y0 = y0 / mediabox_height
|
|
|
rel_x1 = x1 / mediabox_width
|
|
|
rel_y1 = y1 / mediabox_height
|
|
|
else:
|
|
|
|
|
|
rel_x0, rel_y0, rel_x1, rel_y1 = x0, y0, x1, y1
|
|
|
else:
|
|
|
|
|
|
rel_x0, rel_y0, rel_x1, rel_y1 = x0, y0, x1, y1
|
|
|
|
|
|
|
|
|
fill_color = annot_colors.get(
|
|
|
"fill", (0, 0, 0)
|
|
|
)
|
|
|
if isinstance(fill_color, (tuple, list)) and len(fill_color) >= 3:
|
|
|
|
|
|
color_255 = tuple(
|
|
|
int(component * 255) if component <= 1 else int(component)
|
|
|
for component in fill_color[:3]
|
|
|
)
|
|
|
else:
|
|
|
color_255 = (0, 0, 0)
|
|
|
|
|
|
|
|
|
redaction_box = {
|
|
|
"label": annot_info.get(
|
|
|
"title", f"Redaction {len(page_redactions) + 1}"
|
|
|
),
|
|
|
"color": str(color_255),
|
|
|
"xmin": rel_x0,
|
|
|
"ymin": rel_y0,
|
|
|
"xmax": rel_x1,
|
|
|
"ymax": rel_y1,
|
|
|
"text": annot_info.get("content", ""),
|
|
|
"id": None,
|
|
|
}
|
|
|
|
|
|
page_redactions.append(redaction_box)
|
|
|
|
|
|
if page_redactions:
|
|
|
redactions_by_page[page_num + 1] = page_redactions
|
|
|
|
|
|
|
|
|
all_boxes = list()
|
|
|
for page_redactions in redactions_by_page.values():
|
|
|
all_boxes.extend(page_redactions)
|
|
|
|
|
|
if all_boxes:
|
|
|
unique_ids = _generate_unique_ids(len(all_boxes), existing_ids)
|
|
|
|
|
|
|
|
|
box_idx = 0
|
|
|
for page_num, page_redactions in redactions_by_page.items():
|
|
|
for box in page_redactions:
|
|
|
box["id"] = unique_ids[box_idx]
|
|
|
box_idx += 1
|
|
|
|
|
|
|
|
|
json_data = list()
|
|
|
|
|
|
if page_sizes:
|
|
|
|
|
|
for page_info in page_sizes:
|
|
|
page_num = page_info.get("page", 1)
|
|
|
image_path = page_info.get(
|
|
|
"image_path", f"placeholder_image_{page_num}.png"
|
|
|
)
|
|
|
|
|
|
|
|
|
annotation_boxes = redactions_by_page.get(page_num, [])
|
|
|
|
|
|
json_data.append({"image": image_path, "boxes": annotation_boxes})
|
|
|
else:
|
|
|
|
|
|
for page_num in range(1, doc.page_count + 1):
|
|
|
image_path = f"placeholder_image_{page_num}.png"
|
|
|
annotation_boxes = redactions_by_page.get(page_num, [])
|
|
|
|
|
|
json_data.append({"image": image_path, "boxes": annotation_boxes})
|
|
|
|
|
|
total_redactions = sum(len(boxes) for boxes in redactions_by_page.values())
|
|
|
print(f"Found {total_redactions} redactions in the document")
|
|
|
|
|
|
return json_data
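
# Illustrative usage sketch: add a single redaction annotation to a new
# one-page document and read it back as annotation data. The page size entry
# uses hypothetical A4 dimensions in PDF points.
def _example_extract_redactions():
    doc = pymupdf.open()
    page = doc.new_page(width=595, height=842)
    page.add_redact_annot(pymupdf.Rect(50, 100, 250, 120))

    page_sizes = [
        {
            "page": 1,
            "image_path": "example_page_1.png",
            "mediabox_width": 595,
            "mediabox_height": 842,
        }
    ]
    annotations = extract_redactions(doc, page_sizes)
    print(annotations[0]["boxes"])
    doc.close()
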
|
|
|
|
|
|
|
|
|
def prepare_image_or_pdf(
|
|
|
file_paths: List[str],
|
|
|
text_extract_method: str,
|
|
|
all_line_level_ocr_results_df: pd.DataFrame = None,
|
|
|
all_page_line_level_ocr_results_with_words_df: pd.DataFrame = None,
|
|
|
latest_file_completed: int = 0,
|
|
|
out_message: List[str] = list(),
|
|
|
first_loop_state: bool = False,
|
|
|
number_of_pages: int = 0,
|
|
|
all_annotations_object: List = list(),
|
|
|
prepare_for_review: bool = False,
|
|
|
in_fully_redacted_list: List[int] = list(),
|
|
|
output_folder: str = OUTPUT_FOLDER,
|
|
|
input_folder: str = INPUT_FOLDER,
|
|
|
prepare_images: bool = True,
|
|
|
page_sizes: list[dict] = list(),
|
|
|
pymupdf_doc: Document = list(),
|
|
|
textract_output_found: bool = False,
|
|
|
relevant_ocr_output_with_words_found: bool = False,
|
|
|
page_min: int = 0,
|
|
|
page_max: int = 0,
|
|
|
progress: Progress = Progress(track_tqdm=True),
|
|
|
) -> tuple[List[str], List[str]]:
|
|
|
"""
|
|
|
Prepare and process image or text PDF files for redaction.
|
|
|
|
|
|
This function takes a list of file paths, processes each file based on the specified redaction method,
|
|
|
and returns the output messages and processed file paths.
|
|
|
|
|
|
Args:
|
|
|
file_paths (List[str]): List of file paths to process.
|
|
|
text_extract_method (str): The redaction method to use.
|
|
|
latest_file_completed (optional, int): Index of the last completed file.
|
|
|
out_message (optional, List[str]): List to store output messages.
|
|
|
first_loop_state (optional, bool): Flag indicating if this is the first iteration.
|
|
|
        number_of_pages (optional, int): The number of pages in the document.
        all_annotations_object (optional, List of annotation objects): All annotations for the current document.
        prepare_for_review (optional, bool): Whether this preparation step is preparing PDFs and JSON files to review existing redactions.
        in_fully_redacted_list (optional, List of int): A list of pages to fully redact.
        output_folder (optional, str): The output folder for file saves.
        prepare_images (optional, bool): A boolean indicating whether to create images for each PDF page. Defaults to True.
        page_sizes (optional, List[dict]): A list of dicts containing information about page sizes in various formats.
        pymupdf_doc (optional, Document): The existing PyMuPDF document object, if any.
        textract_output_found (optional, bool): A boolean indicating whether Textract analysis output has already been found. Defaults to False.
        relevant_ocr_output_with_words_found (optional, bool): A boolean indicating whether local OCR analysis output has already been found. Defaults to False.
        page_min (optional, int): The minimum page number to process (0-indexed). If 0, uses the first page. Defaults to 0.
        page_max (optional, int): The maximum page number to process (0-indexed). If 0, uses the last page of the document. Defaults to 0.
        progress (optional, Progress): Progress tracker for the operation.
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
tuple[List[str], List[str]]: A tuple containing the output messages and processed file paths.
|
|
|
"""
|
|
|
|
|
|
tic = time.perf_counter()
|
|
|
json_from_csv = False
|
|
|
original_cropboxes = list()
|
|
|
converted_file_paths = list()
|
|
|
image_file_paths = list()
|
|
|
all_img_details = list()
|
|
|
review_file_csv = pd.DataFrame()
|
|
|
out_textract_path = ""
|
|
|
combined_out_message = ""
|
|
|
final_out_message = ""
|
|
|
log_files_output_paths = list()
|
|
|
|
|
|
if isinstance(in_fully_redacted_list, pd.DataFrame):
|
|
|
if not in_fully_redacted_list.empty:
|
|
|
in_fully_redacted_list = in_fully_redacted_list.iloc[:, 0].tolist()
|
|
|
|
|
|
|
|
|
if first_loop_state is True:
|
|
|
latest_file_completed = 0
|
|
|
out_message = list()
|
|
|
all_annotations_object = list()
|
|
|
else:
|
|
|
print("Now redacting file", str(latest_file_completed))
|
|
|
|
|
|
|
|
|
if isinstance(out_message, str):
|
|
|
out_message = [out_message]
|
|
|
|
|
|
if not file_paths:
|
|
|
file_paths = list()
|
|
|
|
|
|
if isinstance(file_paths, dict):
|
|
|
file_paths = os.path.abspath(file_paths["name"])
|
|
|
|
|
|
if isinstance(file_paths, str):
|
|
|
file_path_number = 1
|
|
|
else:
|
|
|
file_path_number = len(file_paths)
|
|
|
|
|
|
if file_path_number > MAX_SIMULTANEOUS_FILES:
|
|
|
out_message = f"Number of files loaded is greater than {MAX_SIMULTANEOUS_FILES}. Please submit a smaller number of files."
|
|
|
print(out_message)
|
|
|
raise Exception(out_message)
|
|
|
|
|
|
latest_file_completed = int(latest_file_completed)
|
|
|
|
|
|
|
|
|
if latest_file_completed >= file_path_number:
|
|
|
print("Last file reached, returning files:", str(latest_file_completed))
|
|
|
if isinstance(out_message, list):
|
|
|
final_out_message = "\n".join(out_message)
|
|
|
else:
|
|
|
final_out_message = out_message
|
|
|
|
|
|
return (
|
|
|
final_out_message,
|
|
|
converted_file_paths,
|
|
|
image_file_paths,
|
|
|
number_of_pages,
|
|
|
number_of_pages,
|
|
|
pymupdf_doc,
|
|
|
all_annotations_object,
|
|
|
review_file_csv,
|
|
|
original_cropboxes,
|
|
|
page_sizes,
|
|
|
textract_output_found,
|
|
|
all_img_details,
|
|
|
all_line_level_ocr_results_df,
|
|
|
relevant_ocr_output_with_words_found,
|
|
|
all_page_line_level_ocr_results_with_words_df,
|
|
|
)
|
|
|
|
|
|
progress(0.1, desc="Preparing file")
|
|
|
|
|
|
if isinstance(file_paths, str):
|
|
|
file_paths_list = [file_paths]
|
|
|
file_paths_loop = file_paths_list
|
|
|
else:
|
|
|
file_paths_list = file_paths
|
|
|
file_paths_loop = sorted(
|
|
|
file_paths_list,
|
|
|
key=lambda x: (
|
|
|
os.path.splitext(x)[1] != ".pdf",
|
|
|
os.path.splitext(x)[1] != ".json",
|
|
|
),
|
|
|
)
|
|
|
|
|
|
|
|
|
for file in file_paths_loop:
|
|
|
converted_file_path = list()
|
|
|
image_file_path = list()
|
|
|
|
|
|
if isinstance(file, str):
|
|
|
file_path = file
|
|
|
else:
|
|
|
file_path = file.name
|
|
|
file_path_without_ext = get_file_name_without_type(file_path)
|
|
|
file_name_with_ext = os.path.basename(file_path)
|
|
|
|
|
|
print("Loading file:", file_name_with_ext)
|
|
|
|
|
|
if not file_path:
|
|
|
out_message = "Please select at least one file."
|
|
|
print(out_message)
|
|
|
raise Warning(out_message)
|
|
|
|
|
|
file_extension = os.path.splitext(file_path)[1].lower()
|
|
|
|
|
|
|
|
|
if is_pdf(file_path):
|
|
|
print(f"File {file_name_with_ext} is a PDF")
|
|
|
pymupdf_doc = pymupdf.open(file_path)
|
|
|
|
|
|
converted_file_path = file_path
|
|
|
|
|
|
if prepare_images is True:
|
|
|
(
|
|
|
image_file_paths,
|
|
|
image_sizes_width,
|
|
|
image_sizes_height,
|
|
|
all_img_details,
|
|
|
) = process_file_for_image_creation(
|
|
|
file_path,
|
|
|
prepare_for_review,
|
|
|
input_folder,
|
|
|
create_images=True,
|
|
|
page_min=page_min,
|
|
|
page_max=page_max,
|
|
|
)
|
|
|
else:
|
|
|
(
|
|
|
image_file_paths,
|
|
|
image_sizes_width,
|
|
|
image_sizes_height,
|
|
|
all_img_details,
|
|
|
) = process_file_for_image_creation(
|
|
|
file_path,
|
|
|
prepare_for_review,
|
|
|
input_folder,
|
|
|
create_images=False,
|
|
|
page_min=page_min,
|
|
|
page_max=page_max,
|
|
|
)
|
|
|
|
|
|
page_sizes, original_cropboxes = create_page_size_objects(
|
|
|
pymupdf_doc,
|
|
|
image_sizes_width,
|
|
|
image_sizes_height,
|
|
|
image_file_paths,
|
|
|
page_min,
|
|
|
page_max,
|
|
|
)
|
|
|
|
|
|
|
|
|
if (not all_annotations_object) & (prepare_for_review is True):
|
|
|
all_annotations_object = list()
|
|
|
|
|
|
for image_path in image_file_paths:
|
|
|
annotation = dict()
|
|
|
annotation["image"] = image_path
|
|
|
annotation["boxes"] = list()
|
|
|
|
|
|
all_annotations_object.append(annotation)
|
|
|
|
|
|
|
|
|
if (
|
|
|
LOAD_REDACTION_ANNOTATIONS_FROM_PDF is True
|
|
|
and prepare_for_review is True
|
|
|
):
|
|
|
|
|
|
redactions_list = extract_redactions(pymupdf_doc, page_sizes)
|
|
|
all_annotations_object = redactions_list
|
|
|
|
|
|
elif is_pdf_or_image(file_path):
|
|
|
print(f"File {file_name_with_ext} is an image")
|
|
|
|
|
|
if (
|
|
|
file_extension in [".jpg", ".jpeg", ".png"]
|
|
|
and text_extract_method == SELECTABLE_TEXT_EXTRACT_OPTION
|
|
|
):
|
|
|
text_extract_method = TESSERACT_TEXT_EXTRACT_OPTION
|
|
|
|
|
|
|
|
|
pymupdf_doc = pymupdf.open()
|
|
|
|
|
|
img = Image.open(file_path)
|
|
|
rect = pymupdf.Rect(
|
|
|
0, 0, img.width, img.height
|
|
|
)
|
|
|
pymupdf_page = pymupdf_doc.new_page(
|
|
|
width=img.width, height=img.height
|
|
|
)
|
|
|
pymupdf_page.insert_image(
|
|
|
rect, filename=file_path
|
|
|
)
|
|
|
pymupdf_page = pymupdf_doc.load_page(0)
|
|
|
|
|
|
file_path_str = str(file_path)
|
|
|
|
|
|
image_file_paths, image_sizes_width, image_sizes_height, all_img_details = (
|
|
|
process_file_for_image_creation(
|
|
|
file_path_str, prepare_for_review, input_folder, create_images=True
|
|
|
)
|
|
|
)
|
|
|
|
|
|
|
|
|
page_sizes, original_cropboxes = create_page_size_objects(
|
|
|
pymupdf_doc, image_sizes_width, image_sizes_height, image_file_paths
|
|
|
)
|
|
|
|
|
|
converted_file_path = output_folder + file_name_with_ext
|
|
|
|
|
|
pymupdf_doc.save(converted_file_path, garbage=4, deflate=True, clean=True)
|
|
|
|
|
|
|
|
|
elif file_extension in [".csv"]:
|
|
|
if "_review_file" in file_path_without_ext:
|
|
|
review_file_csv = read_file(file_path)
|
|
|
all_annotations_object = convert_review_df_to_annotation_json(
|
|
|
review_file_csv, image_file_paths, page_sizes
|
|
|
)
|
|
|
json_from_csv = True
|
|
|
elif "_ocr_output" in file_path_without_ext:
|
|
|
all_line_level_ocr_results_df = read_file(file_path)
|
|
|
|
|
|
if "line" not in all_line_level_ocr_results_df.columns:
|
|
|
all_line_level_ocr_results_df["line"] = ""
|
|
|
|
|
|
json_from_csv = False
|
|
|
elif "_ocr_results_with_words" in file_path_without_ext:
|
|
|
all_page_line_level_ocr_results_with_words_df = read_file(file_path)
|
|
|
json_from_csv = False
|
|
|
|
|
|
|
|
|
|
|
|
if (file_extension in [".json"]) | (json_from_csv is True):
|
|
|
|
|
|
if (file_extension in [".json"]) & (prepare_for_review is True):
|
|
|
if isinstance(file_path, str):
|
|
|
|
|
|
file_path_obj = Path(file_path)
|
|
|
base_dir = file_path_obj.parent
|
|
|
filename = file_path_obj.name
|
|
|
|
|
|
json_content = secure_file_read(base_dir, filename)
|
|
|
all_annotations_object = json.loads(json_content)
|
|
|
else:
|
|
|
|
|
|
all_annotations_object = json.loads(
|
|
|
file_path
|
|
|
)
|
|
|
|
|
|
|
|
|
elif (
|
|
|
file_extension in [".json"]
|
|
|
) and "_textract" in file_path_without_ext:
|
|
|
print("Saving Textract output")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
textract_pattern = re.compile(
|
|
|
r"_textract\.json$|_[a-z]+(?:_[a-z]+)*_textract\.json$"
|
|
|
)
|
|
|
if textract_pattern.search(file_path):
|
|
|
|
|
|
output_textract_json_file_name = file_path_without_ext + ".json"
|
|
|
elif file_path.endswith("_textract.json"):
|
|
|
output_textract_json_file_name = file_path_without_ext + ".json"
|
|
|
else:
|
|
|
|
|
|
output_textract_json_file_name = (
|
|
|
file_path_without_ext + "_textract.json"
|
|
|
)
|
|
|
|
|
|
out_textract_path = secure_join(
|
|
|
output_folder, output_textract_json_file_name
|
|
|
)
|
|
|
|
|
|
|
|
|
shutil.copy2(file_path, out_textract_path)
|
|
|
textract_output_found = True
|
|
|
continue
|
|
|
|
|
|
elif (
|
|
|
file_extension in [".json"]
|
|
|
) and "_ocr_results_with_words" in file_path_without_ext:
|
|
|
print("Saving local OCR output with words")
|
|
|
|
|
|
output_ocr_results_with_words_json_file_name = (
|
|
|
file_path_without_ext + ".json"
|
|
|
)
|
|
|
|
|
|
out_ocr_results_with_words_path = secure_join(
|
|
|
output_folder, output_ocr_results_with_words_json_file_name
|
|
|
)
|
|
|
|
|
|
|
|
|
shutil.copy2(
|
|
|
file_path, out_ocr_results_with_words_path
|
|
|
)
|
|
|
|
|
|
if prepare_for_review is True:
|
|
|
print("Converting local OCR output with words to csv")
|
|
|
page_sizes_df = pd.DataFrame(page_sizes)
|
|
|
(
|
|
|
all_page_line_level_ocr_results_with_words,
|
|
|
is_missing,
|
|
|
log_files_output_paths,
|
|
|
) = load_and_convert_ocr_results_with_words_json(
|
|
|
out_ocr_results_with_words_path,
|
|
|
log_files_output_paths,
|
|
|
page_sizes_df,
|
|
|
)
|
|
|
all_page_line_level_ocr_results_with_words_df = (
|
|
|
word_level_ocr_output_to_dataframe(
|
|
|
all_page_line_level_ocr_results_with_words
|
|
|
)
|
|
|
)
|
|
|
|
|
|
all_page_line_level_ocr_results_with_words_df = (
|
|
|
divide_coordinates_by_page_sizes(
|
|
|
all_page_line_level_ocr_results_with_words_df,
|
|
|
page_sizes_df,
|
|
|
xmin="word_x0",
|
|
|
xmax="word_x1",
|
|
|
ymin="word_y0",
|
|
|
ymax="word_y1",
|
|
|
)
|
|
|
)
|
|
|
all_page_line_level_ocr_results_with_words_df = (
|
|
|
divide_coordinates_by_page_sizes(
|
|
|
all_page_line_level_ocr_results_with_words_df,
|
|
|
page_sizes_df,
|
|
|
xmin="line_x0",
|
|
|
xmax="line_x1",
|
|
|
ymin="line_y0",
|
|
|
ymax="line_y1",
|
|
|
)
|
|
|
)
|
|
|
|
|
|
if (
|
|
|
text_extract_method == SELECTABLE_TEXT_EXTRACT_OPTION
|
|
|
and file_path.endswith("_ocr_results_with_words_local_text.json")
|
|
|
):
|
|
|
relevant_ocr_output_with_words_found = True
|
|
|
if (
|
|
|
text_extract_method == TESSERACT_TEXT_EXTRACT_OPTION
|
|
|
and file_path.endswith("_ocr_results_with_words_local_ocr.json")
|
|
|
):
|
|
|
relevant_ocr_output_with_words_found = True
|
|
|
if (
|
|
|
text_extract_method == TEXTRACT_TEXT_EXTRACT_OPTION
|
|
|
and file_path.endswith("_ocr_results_with_words_textract.json")
|
|
|
):
|
|
|
relevant_ocr_output_with_words_found = True
|
|
|
continue
|
|
|
|
|
|
|
|
|
if all_annotations_object:
|
|
|
|
|
|
image_file_paths_pages = [
|
|
|
safe_extract_page_number_from_path(s)
|
|
|
for s in image_file_paths
|
|
|
if safe_extract_page_number_from_path(s) is not None
|
|
|
]
|
|
|
image_file_paths_pages = [int(i) for i in image_file_paths_pages]
|
|
|
|
|
|
|
|
|
if image_file_paths:
|
|
|
for i, image_file_path in enumerate(image_file_paths):
|
|
|
|
|
|
if i < len(all_annotations_object):
|
|
|
annotation = all_annotations_object[i]
|
|
|
else:
|
|
|
annotation = dict()
|
|
|
all_annotations_object.append(annotation)
|
|
|
|
|
|
try:
|
|
|
if not annotation:
|
|
|
annotation = {"image": "", "boxes": []}
|
|
|
annotation_page_number = (
|
|
|
safe_extract_page_number_from_path(image_file_path)
|
|
|
)
|
|
|
if annotation_page_number is None:
|
|
|
continue
|
|
|
else:
|
|
|
annotation_page_number = (
|
|
|
safe_extract_page_number_from_path(
|
|
|
annotation["image"]
|
|
|
)
|
|
|
)
|
|
|
if annotation_page_number is None:
|
|
|
continue
|
|
|
except Exception as e:
|
|
|
print("Extracting page number from image failed due to:", e)
|
|
|
annotation_page_number = 0
|
|
|
|
|
|
|
|
|
if annotation_page_number in image_file_paths_pages:
|
|
|
|
|
|
|
|
|
correct_image_page = annotation_page_number
|
|
|
annotation["image"] = image_file_paths[correct_image_page]
|
|
|
else:
|
|
|
print(
|
|
|
"Page", annotation_page_number, "image file not found."
|
|
|
)
|
|
|
|
|
|
all_annotations_object[i] = annotation
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
out_folder = output_folder + file_path_without_ext + ".json"
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
if file_extension in [".zip"]:
|
|
|
|
|
|
|
|
|
out_folder = secure_join(
|
|
|
output_folder, file_path_without_ext + "_textract.json"
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(file_path, "r") as zip_ref:
|
|
|
json_files = [
|
|
|
f for f in zip_ref.namelist() if f.lower().endswith(".json")
|
|
|
]
|
|
|
|
|
|
if len(json_files) == 1:
|
|
|
json_filename = json_files[0]
|
|
|
|
|
|
|
|
|
extracted_path = secure_join(
|
|
|
os.path.dirname(file_path), json_filename
|
|
|
)
|
|
|
zip_ref.extract(json_filename, os.path.dirname(file_path))
|
|
|
|
|
|
|
|
|
shutil.move(extracted_path, out_folder)
|
|
|
|
|
|
textract_output_found = True
|
|
|
else:
|
|
|
print(
|
|
|
f"Skipping {file_path}: Expected 1 JSON file, found {len(json_files)}"
|
|
|
)
|
|
|
|
|
|
converted_file_paths.append(converted_file_path)
|
|
|
image_file_paths.extend(image_file_path)
|
|
|
|
|
|
toc = time.perf_counter()
|
|
|
out_time = f"File '{file_name_with_ext}' prepared in {toc - tic:0.1f} seconds."
|
|
|
|
|
|
print(out_time)
|
|
|
|
|
|
out_message.append(out_time)
|
|
|
combined_out_message = "\n".join(out_message)
|
|
|
|
|
|
if not page_sizes:
|
|
|
number_of_pages = 1
|
|
|
else:
|
|
|
number_of_pages = len(page_sizes)
|
|
|
|
|
|
print(f"Finished loading in {file_path_number} file(s)")
|
|
|
|
|
|
return (
|
|
|
combined_out_message,
|
|
|
converted_file_paths,
|
|
|
image_file_paths,
|
|
|
number_of_pages,
|
|
|
number_of_pages,
|
|
|
pymupdf_doc,
|
|
|
all_annotations_object,
|
|
|
review_file_csv,
|
|
|
original_cropboxes,
|
|
|
page_sizes,
|
|
|
textract_output_found,
|
|
|
all_img_details,
|
|
|
all_line_level_ocr_results_df,
|
|
|
relevant_ocr_output_with_words_found,
|
|
|
all_page_line_level_ocr_results_with_words_df,
|
|
|
)
|
|
|
|
|
|
|
|
|
def load_and_convert_ocr_results_with_words_json(
    ocr_results_with_words_json_file_path: str,
    log_files_output_paths: str,
    page_sizes_df: pd.DataFrame,
):
    """
    Load OCR-results-with-words JSON from a file, detect whether it is already in the
    format the app expects, and report whether conversion is needed.
    """

    if not os.path.exists(ocr_results_with_words_json_file_path):
        print("No existing OCR results file found.")
        return (
            [],
            True,
            log_files_output_paths,
        )

    print("Found existing OCR results json results file.")

    if ocr_results_with_words_json_file_path not in log_files_output_paths:
        log_files_output_paths.append(ocr_results_with_words_json_file_path)

    try:
        with open(
            ocr_results_with_words_json_file_path, "r", encoding="utf-8"
        ) as json_file:
            ocr_results_with_words_data = json.load(json_file)
    except json.JSONDecodeError:
        print("Error: Failed to parse OCR results JSON file. Returning empty data.")
        return [], True, log_files_output_paths

    # Both keys must be present for the data to be usable as-is
    if (
        "page" in ocr_results_with_words_data[0]
        and "results" in ocr_results_with_words_data[0]
    ):
        print("JSON already in the correct format for app. No changes needed.")
        return (
            ocr_results_with_words_data,
            False,
            log_files_output_paths,
        )

    else:
        print("Invalid OCR result JSON format: 'page' or 'results' key missing.")

        return (
            [],
            True,
            log_files_output_paths,
        )


def convert_text_pdf_to_img_pdf(
|
|
|
in_file_path: str,
|
|
|
out_text_file_path: List[str],
|
|
|
image_dpi: float = image_dpi,
|
|
|
output_folder: str = OUTPUT_FOLDER,
|
|
|
input_folder: str = INPUT_FOLDER,
|
|
|
):
|
|
|
file_path_without_ext = get_file_name_without_type(in_file_path)
|
|
|
|
|
|
print(
|
|
|
"In convert_text_pdf_to_img_pdf function, file_path_without_ext:",
|
|
|
file_path_without_ext,
|
|
|
)
|
|
|
|
|
|
out_file_paths = out_text_file_path
|
|
|
|
|
|
|
|
|
pdf_text_image_paths, image_sizes_width, image_sizes_height, all_img_details = (
|
|
|
process_file_for_image_creation(out_file_paths[0], input_folder=input_folder)
|
|
|
)
|
|
|
out_text_image_file_path = (
|
|
|
output_folder + file_path_without_ext + "_text_redacted_as_img.pdf"
|
|
|
)
|
|
|
pdf_text_image_paths[0].save(
|
|
|
out_text_image_file_path,
|
|
|
"PDF",
|
|
|
resolution=image_dpi,
|
|
|
save_all=True,
|
|
|
append_images=pdf_text_image_paths[1:],
|
|
|
)
|
|
|
|
|
|
out_file_paths = [out_text_image_file_path]
|
|
|
|
|
|
out_message = "PDF " + file_path_without_ext + " converted to image-based file."
|
|
|
print(out_message)
|
|
|
|
|
|
return out_message, out_file_paths
|
|
|
|
|
|
|
|
|
def save_pdf_with_or_without_compression(
    pymupdf_doc: object,
    out_redacted_pdf_file_path,
    COMPRESS_REDACTED_PDF: bool = COMPRESS_REDACTED_PDF,
):
    """
    Save a pymupdf document either with full compression options or with basic cleaning only.
    Minimal cleaning can be useful on low-memory systems to avoid crashing with large PDFs.
    """
    if COMPRESS_REDACTED_PDF is True:
        pymupdf_doc.save(
            out_redacted_pdf_file_path, garbage=4, deflate=True, clean=True
        )
    else:
        pymupdf_doc.save(out_redacted_pdf_file_path, garbage=1, clean=True)

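
# Illustrative usage sketch: save the same one-page document once with full
# compression and once with minimal cleaning. The output file names are
# hypothetical.
def _example_save_pdf_with_or_without_compression():
    doc = pymupdf.open()
    doc.new_page()

    save_pdf_with_or_without_compression(doc, "example_compressed.pdf", True)
    save_pdf_with_or_without_compression(doc, "example_low_memory.pdf", False)
    doc.close()
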
def join_values_within_threshold(df1: pd.DataFrame, df2: pd.DataFrame):
    """
    Join rows of df1 to rows of df2 whose bounding-box coordinates all lie within a
    small pixel threshold of each other.
    """
    threshold = 5

    # Cross join the two dataframes, then keep only pairs whose coordinates match
    df1["key"] = 1
    df2["key"] = 1
    merged = pd.merge(df1, df2, on="key").drop(columns=["key"])

    conditions = (
        (abs(merged["xmin_x"] - merged["xmin_y"]) <= threshold)
        & (abs(merged["xmax_x"] - merged["xmax_y"]) <= threshold)
        & (abs(merged["ymin_x"] - merged["ymin_y"]) <= threshold)
        & (abs(merged["ymax_x"] - merged["ymax_y"]) <= threshold)
    )

    filtered = merged[conditions]

    # Keep at most one match per df1 box
    result = filtered.drop_duplicates(subset=["xmin_x", "xmax_x", "ymin_x", "ymax_x"])

    # Attach the matched rows back onto df1
    final_df = pd.merge(
        df1,
        result,
        left_on=["xmin", "xmax", "ymin", "ymax"],
        right_on=["xmin_x", "xmax_x", "ymin_x", "ymax_x"],
        how="left",
    )

    final_df = final_df.drop(columns=["key"])

    return final_df


def remove_duplicate_images_with_blank_boxes(data: List[dict]) -> List[dict]:
    """
    Remove items from the annotator object where the same page exists twice,
    preferring the entry that actually contains boxes.
    """

    image_groups = defaultdict(list)
    for item in data:
        image_groups[item["image"]].append(item)

    result = list()
    for image, items in image_groups.items():

        non_empty_boxes = [item for item in items if item.get("boxes")]

        if non_empty_boxes:
            # Keep the first entry for this page that has boxes
            result.append(non_empty_boxes[0])
        else:
            # No entry for this page has boxes, so keep the first one
            result.append(items[0])

    return result

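
# Illustrative usage sketch: the same page appears twice in an annotator
# object, once with a box and once without; the de-duplicated result keeps
# the entry that has boxes. All values are made up for demonstration.
def _example_remove_duplicate_images_with_blank_boxes():
    data = [
        {"image": "page_1.png", "boxes": []},
        {
            "image": "page_1.png",
            "boxes": [
                {"label": "NAME", "xmin": 0.1, "ymin": 0.1, "xmax": 0.2, "ymax": 0.2}
            ],
        },
        {"image": "page_2.png", "boxes": []},
    ]
    deduplicated = remove_duplicate_images_with_blank_boxes(data)
    print([len(item["boxes"]) for item in deduplicated])  # -> [1, 0]
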
def divide_coordinates_by_page_sizes(
|
|
|
review_file_df: pd.DataFrame,
|
|
|
page_sizes_df: pd.DataFrame,
|
|
|
xmin="xmin",
|
|
|
xmax="xmax",
|
|
|
ymin="ymin",
|
|
|
ymax="ymax",
|
|
|
) -> pd.DataFrame:
|
|
|
"""
|
|
|
Optimized function to convert absolute image coordinates (>1) to relative coordinates (<=1).
|
|
|
|
|
|
Identifies rows with absolute coordinates, merges page size information,
|
|
|
divides coordinates by dimensions, and combines with already-relative rows.
|
|
|
|
|
|
Args:
|
|
|
review_file_df: Input DataFrame with potentially mixed coordinate systems.
|
|
|
page_sizes_df: DataFrame with page dimensions ('page', 'image_width',
|
|
|
'image_height', 'mediabox_width', 'mediabox_height').
|
|
|
xmin, xmax, ymin, ymax: Names of the coordinate columns.
|
|
|
|
|
|
Returns:
|
|
|
DataFrame with coordinates converted to relative system, sorted.
|
|
|
"""
|
|
|
if review_file_df.empty or xmin not in review_file_df.columns:
|
|
|
return review_file_df
|
|
|
|
|
|
|
|
|
coord_cols = [xmin, xmax, ymin, ymax]
|
|
|
cols_to_convert = coord_cols + ["page"]
|
|
|
temp_df = review_file_df.copy()
|
|
|
|
|
|
for col in cols_to_convert:
|
|
|
if col in temp_df.columns:
|
|
|
temp_df[col] = pd.to_numeric(temp_df[col], errors="coerce")
|
|
|
else:
|
|
|
|
|
|
if col == "page" or col in coord_cols:
|
|
|
print(
|
|
|
f"Warning: Required column '{col}' not found in review_file_df. Returning original DataFrame."
|
|
|
)
|
|
|
return review_file_df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
is_absolute_mask = (
|
|
|
(temp_df[xmin] > 1)
|
|
|
& (temp_df[xmin].notna())
|
|
|
& (temp_df[xmax] > 1)
|
|
|
& (temp_df[xmax].notna())
|
|
|
& (temp_df[ymin] > 1)
|
|
|
& (temp_df[ymin].notna())
|
|
|
& (temp_df[ymax] > 1)
|
|
|
& (temp_df[ymax].notna())
|
|
|
)
|
|
|
|
|
|
|
|
|
df_rel = temp_df[
|
|
|
~is_absolute_mask
|
|
|
]
|
|
|
df_abs = temp_df[
|
|
|
is_absolute_mask
|
|
|
].copy()
|
|
|
|
|
|
|
|
|
if not df_abs.empty:
|
|
|
|
|
|
if "image_width" not in df_abs.columns and not page_sizes_df.empty:
|
|
|
ps_df_copy = page_sizes_df.copy()
|
|
|
|
|
|
|
|
|
ps_df_copy["page"] = pd.to_numeric(ps_df_copy["page"], errors="coerce")
|
|
|
|
|
|
|
|
|
merge_cols = [
|
|
|
"page",
|
|
|
"image_width",
|
|
|
"image_height",
|
|
|
"mediabox_width",
|
|
|
"mediabox_height",
|
|
|
]
|
|
|
available_merge_cols = [
|
|
|
col for col in merge_cols if col in ps_df_copy.columns
|
|
|
]
|
|
|
|
|
|
|
|
|
for col in [
|
|
|
"image_width",
|
|
|
"image_height",
|
|
|
"mediabox_width",
|
|
|
"mediabox_height",
|
|
|
]:
|
|
|
if col in ps_df_copy.columns:
|
|
|
|
|
|
if ps_df_copy[col].dtype == "object":
|
|
|
ps_df_copy[col] = ps_df_copy[col].replace("<NA>", pd.NA)
|
|
|
|
|
|
ps_df_copy[col] = pd.to_numeric(ps_df_copy[col], errors="coerce")
|
|
|
|
|
|
|
|
|
if "page" in available_merge_cols:
|
|
|
df_abs = df_abs.merge(
|
|
|
ps_df_copy[available_merge_cols], on="page", how="left"
|
|
|
)
|
|
|
else:
|
|
|
print(
|
|
|
"Warning: 'page' column not found in page_sizes_df. Cannot merge dimensions."
|
|
|
)
|
|
|
|
|
|
|
|
|
if "image_width" in df_abs.columns and "mediabox_width" in df_abs.columns:
|
|
|
|
|
|
if df_abs["image_width"].isna().all():
|
|
|
|
|
|
df_abs["image_width"] = df_abs["image_width"].fillna(
|
|
|
df_abs["mediabox_width"]
|
|
|
)
|
|
|
df_abs["image_height"] = df_abs["image_height"].fillna(
|
|
|
df_abs["mediabox_height"]
|
|
|
)
|
|
|
else:
|
|
|
|
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
divisors_numeric = True
|
|
|
for col in ["image_width", "image_height"]:
|
|
|
if col in df_abs.columns:
|
|
|
df_abs[col] = pd.to_numeric(df_abs[col], errors="coerce")
|
|
|
else:
|
|
|
print(
|
|
|
f"Warning: Dimension column '{col}' missing. Cannot perform division."
|
|
|
)
|
|
|
divisors_numeric = False
|
|
|
|
|
|
|
|
|
if (
|
|
|
divisors_numeric
|
|
|
and "image_width" in df_abs.columns
|
|
|
and "image_height" in df_abs.columns
|
|
|
):
|
|
|
|
|
|
with np.errstate(divide="ignore", invalid="ignore"):
|
|
|
df_abs[xmin] = round(df_abs[xmin] / df_abs["image_width"], 6)
|
|
|
df_abs[xmax] = round(df_abs[xmax] / df_abs["image_width"], 6)
|
|
|
df_abs[ymin] = round(df_abs[ymin] / df_abs["image_height"], 6)
|
|
|
df_abs[ymax] = round(df_abs[ymax] / df_abs["image_height"], 6)
|
|
|
|
|
|
df_abs.replace([np.inf, -np.inf], np.nan, inplace=True)
|
|
|
else:
|
|
|
print(
|
|
|
"Skipping coordinate division due to missing or non-numeric dimension columns."
|
|
|
)
|
|
|
|
|
|
|
|
|
dfs_to_concat = [df for df in [df_rel, df_abs] if not df.empty]
|
|
|
|
|
|
if dfs_to_concat:
|
|
|
final_df = pd.concat(dfs_to_concat, ignore_index=True)
|
|
|
else:
|
|
|
|
|
|
print(
|
|
|
"Warning: Both relative and absolute splits resulted in empty DataFrames."
|
|
|
)
|
|
|
final_df = pd.DataFrame(columns=review_file_df.columns)
|
|
|
|
|
|
|
|
|
required_sort_columns = {"page", xmin, ymin}
|
|
|
if not final_df.empty and required_sort_columns.issubset(final_df.columns):
|
|
|
|
|
|
final_df["page"] = pd.to_numeric(final_df["page"], errors="coerce")
|
|
|
final_df[ymin] = pd.to_numeric(final_df[ymin], errors="coerce")
|
|
|
final_df[xmin] = pd.to_numeric(final_df[xmin], errors="coerce")
|
|
|
|
|
|
final_df.sort_values(["page", ymin, xmin], inplace=True, na_position="last")
|
|
|
|
|
|
|
|
|
|
|
|
cols_to_drop = ["image_width", "image_height", "mediabox_width", "mediabox_height"]
|
|
|
final_df = final_df.drop(columns=cols_to_drop, errors="ignore")
|
|
|
|
|
|
return final_df
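
# Illustrative usage sketch: one row holds absolute pixel coordinates (>1) and
# one is already relative; only the absolute row gets divided by its page's
# image dimensions. All numbers are made up for demonstration.
def _example_divide_coordinates_by_page_sizes():
    review_df = pd.DataFrame(
        {
            "page": [1, 2],
            "xmin": [100.0, 0.1],
            "xmax": [300.0, 0.3],
            "ymin": [50.0, 0.2],
            "ymax": [150.0, 0.4],
        }
    )
    page_sizes_df = pd.DataFrame(
        {
            "page": [1, 2],
            "image_width": [1000, 1000],
            "image_height": [500, 500],
            "mediabox_width": [595, 595],
            "mediabox_height": [842, 842],
        }
    )
    relative_df = divide_coordinates_by_page_sizes(review_df, page_sizes_df)
    print(relative_df[["page", "xmin", "ymin", "xmax", "ymax"]])
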
|
|
|
|
|
|
|
|
|
def multiply_coordinates_by_page_sizes(
|
|
|
review_file_df: pd.DataFrame,
|
|
|
page_sizes_df: pd.DataFrame,
|
|
|
xmin="xmin",
|
|
|
xmax="xmax",
|
|
|
ymin="ymin",
|
|
|
ymax="ymax",
|
|
|
):
|
|
|
"""
|
|
|
Optimized function to convert relative coordinates to absolute based on page sizes.
|
|
|
|
|
|
Separates relative (<=1) and absolute (>1) coordinates, merges page sizes
|
|
|
for relative coordinates, calculates absolute pixel values, and recombines.
|
|
|
"""
|
|
|
if review_file_df.empty or xmin not in review_file_df.columns:
|
|
|
return review_file_df
|
|
|
|
|
|
coord_cols = [xmin, xmax, ymin, ymax]
|
|
|
|
|
|
for col in coord_cols + ["page"]:
|
|
|
if col in review_file_df.columns:
|
|
|
|
|
|
|
|
|
review_file_df[col] = pd.to_numeric(review_file_df[col], errors="coerce")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
is_relative_mask = (
|
|
|
(review_file_df[xmin].le(1) & review_file_df[xmin].notna())
|
|
|
& (review_file_df[xmax].le(1) & review_file_df[xmax].notna())
|
|
|
& (review_file_df[ymin].le(1) & review_file_df[ymin].notna())
|
|
|
& (review_file_df[ymax].le(1) & review_file_df[ymax].notna())
|
|
|
)
|
|
|
|
|
|
|
|
|
df_abs = review_file_df[~is_relative_mask].copy()
|
|
|
df_rel = review_file_df[is_relative_mask].copy()
|
|
|
|
|
|
if df_rel.empty:
|
|
|
|
|
|
if not df_abs.empty and {"page", xmin, ymin}.issubset(df_abs.columns):
|
|
|
df_abs.sort_values(["page", xmin, ymin], inplace=True, na_position="last")
|
|
|
return df_abs
|
|
|
|
|
|
|
|
|
if "image_width" not in df_rel.columns and not page_sizes_df.empty:
|
|
|
|
|
|
page_sizes_df = page_sizes_df.copy()
|
|
|
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
|
|
|
|
|
|
page_sizes_df[["image_width", "image_height"]] = page_sizes_df[
|
|
|
["image_width", "image_height"]
|
|
|
].replace("<NA>", pd.NA)
|
|
|
page_sizes_df["image_width"] = pd.to_numeric(
|
|
|
page_sizes_df["image_width"], errors="coerce"
|
|
|
)
|
|
|
page_sizes_df["image_height"] = pd.to_numeric(
|
|
|
page_sizes_df["image_height"], errors="coerce"
|
|
|
)
|
|
|
|
|
|
|
|
|
df_rel = df_rel.merge(
|
|
|
page_sizes_df[["page", "image_width", "image_height"]],
|
|
|
on="page",
|
|
|
how="left",
|
|
|
)
|
|
|
|
|
|
|
|
|
if "image_width" in df_rel.columns:
|
|
|
|
|
|
has_size_mask = df_rel["image_width"].notna() & df_rel["image_height"].notna()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
df_rel.loc[has_size_mask, xmin] *= df_rel.loc[has_size_mask, "image_width"]
|
|
|
df_rel.loc[has_size_mask, xmax] *= df_rel.loc[has_size_mask, "image_width"]
|
|
|
df_rel.loc[has_size_mask, ymin] *= df_rel.loc[has_size_mask, "image_height"]
|
|
|
df_rel.loc[has_size_mask, ymax] *= df_rel.loc[has_size_mask, "image_height"]
|
|
|
|
|
|
|
|
|
|
|
|
dfs_to_concat = [df for df in [df_abs, df_rel] if not df.empty]
|
|
|
|
|
|
if not dfs_to_concat:
|
|
|
return pd.DataFrame()
|
|
|
|
|
|
final_df = pd.concat(
|
|
|
dfs_to_concat, ignore_index=True
|
|
|
)
|
|
|
|
|
|
|
|
|
required_sort_columns = {"page", xmin, ymin}
|
|
|
if not final_df.empty and required_sort_columns.issubset(final_df.columns):
|
|
|
|
|
|
final_df.sort_values(["page", xmin, ymin], inplace=True, na_position="last")
|
|
|
|
|
|
return final_df
|
|
|
|
|
|
|
|
|
def do_proximity_match_by_page_for_text(df1: pd.DataFrame, df2: pd.DataFrame):
|
|
|
"""
|
|
|
Match text from one dataframe to another based on proximity matching of coordinates page by page.
|
|
|
"""
|
|
|
|
|
|
if "text" not in df2.columns:
|
|
|
df2["text"] = ""
|
|
|
if "text" not in df1.columns:
|
|
|
df1["text"] = ""
|
|
|
|
|
|
|
|
|
merge_keys = ["xmin", "ymin", "xmax", "ymax", "label", "page"]
|
|
|
df1["key"] = df1[merge_keys].astype(str).agg("_".join, axis=1)
|
|
|
df2["key"] = df2[merge_keys].astype(str).agg("_".join, axis=1)
|
|
|
|
|
|
|
|
|
merged_df = df1.merge(
|
|
|
df2[["key", "text"]], on="key", how="left", suffixes=("", "_duplicate")
|
|
|
)
|
|
|
|
|
|
|
|
|
merged_df["text"] = np.where(
|
|
|
merged_df["text"].isna() | (merged_df["text"] == ""),
|
|
|
merged_df.pop("text_duplicate"),
|
|
|
merged_df["text"],
|
|
|
)
|
|
|
|
|
|
|
|
|
tolerance = 0.02
|
|
|
|
|
|
|
|
|
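# Build one KD-tree per page over (xmin, ymin, xmax, ymax); pages containing any non-finite coordinates are skipped.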
page_trees = dict()
|
|
|
for page in df2["page"].unique():
|
|
|
df2_page = df2[df2["page"] == page]
|
|
|
coords = df2_page[["xmin", "ymin", "xmax", "ymax"]].values
|
|
|
if np.all(np.isfinite(coords)) and len(coords) > 0:
|
|
|
page_trees[page] = (cKDTree(coords), df2_page)
|
|
|
|
|
|
|
|
|
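# NOTE: the row-by-row lookup below assumes df1 (and therefore merged_df) uses a default
# RangeIndex, so .at[i, "text"] addresses the same row that produced the query.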
for i, row in df1.iterrows():
|
|
|
page_number = row["page"]
|
|
|
|
|
|
if page_number in page_trees:
|
|
|
tree, df2_page = page_trees[page_number]
|
|
|
|
|
|
|
|
|
dist, idx = tree.query(
|
|
|
[row[["xmin", "ymin", "xmax", "ymax"]].values],
|
|
|
distance_upper_bound=tolerance,
|
|
|
)
|
|
|
|
|
|
if dist[0] < tolerance and idx[0] < len(df2_page):
|
|
|
merged_df.at[i, "text"] = df2_page.iloc[idx[0]]["text"]
|
|
|
|
|
|
|
|
|
merged_df.drop(columns=["key"], inplace=True)
|
|
|
|
|
|
return merged_df
|
|
|
|
|
|
|
|
|
def do_proximity_match_all_pages_for_text(
|
|
|
df1: pd.DataFrame, df2: pd.DataFrame, threshold: float = 0.03
|
|
|
):
|
|
|
"""
|
|
|
Match text from one dataframe to another based on proximity matching of coordinates across all pages.
|
|
|
"""
|
|
|
|
|
|
if "text" not in df2.columns:
|
|
|
df2["text"] = ""
|
|
|
if "text" not in df1.columns:
|
|
|
df1["text"] = ""
|
|
|
|
|
|
for col in ["xmin", "ymin", "xmax", "ymax"]:
|
|
|
df1[col] = pd.to_numeric(df1[col], errors="coerce")
|
|
|
|
|
|
for col in ["xmin", "ymin", "xmax", "ymax"]:
|
|
|
df2[col] = pd.to_numeric(df2[col], errors="coerce")
|
|
|
|
|
|
|
|
|
merge_keys = ["xmin", "ymin", "xmax", "ymax", "label", "page"]
|
|
|
df1["key"] = df1[merge_keys].astype(str).agg("_".join, axis=1)
|
|
|
df2["key"] = df2[merge_keys].astype(str).agg("_".join, axis=1)
|
|
|
|
|
|
|
|
|
merged_df = df1.merge(
|
|
|
df2[["key", "text"]], on="key", how="left", suffixes=("", "_duplicate")
|
|
|
)
|
|
|
|
|
|
|
|
|
merged_df["text"] = np.where(
|
|
|
merged_df["text"].isna() | (merged_df["text"] == ""),
|
|
|
merged_df.pop("text_duplicate"),
|
|
|
merged_df["text"],
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
query_coords = np.array(df1[["xmin", "ymin", "xmax", "ymax"]].values, dtype=float)
|
|
|
|
|
|
|
|
|
finite_mask = np.isfinite(query_coords).all(axis=1)
# Remember the original row positions so matches can be written back to the
# correct rows of merged_df after non-finite rows are dropped.
query_positions = np.flatnonzero(finite_mask)
query_coords = query_coords[finite_mask]
|
|
|
|
|
|
|
|
|
if query_coords.size > 0:
|
|
|
|
|
|
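# Build a single KD-tree across all pages of df2; rows with non-finite coordinates are dropped
# first because cKDTree cannot index NaN/inf values.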
finite_mask_df2 = np.isfinite(df2[["xmin", "ymin", "xmax", "ymax"]].values).all(
|
|
|
axis=1
|
|
|
)
|
|
|
df2_finite = df2[finite_mask_df2]
|
|
|
|
|
|
|
|
|
tree = cKDTree(df2_finite[["xmin", "ymin", "xmax", "ymax"]].values)
|
|
|
|
|
|
|
|
|
tolerance = threshold
|
|
|
distances, indices = tree.query(query_coords, distance_upper_bound=tolerance)
|
|
|
|
|
|
|
|
|
for i, (dist, idx) in enumerate(zip(distances, indices)):
if dist < tolerance and idx < len(df2_finite):
merged_df.at[query_positions[i], "text"] = df2_finite.iloc[idx]["text"]
|
|
|
|
|
|
|
|
|
merged_df.drop(columns=["key"], inplace=True)
|
|
|
|
|
|
return merged_df
|
|
|
|
|
|
|
|
|
def _extract_page_number(image_path: Any) -> int:
|
|
|
"""Helper function to safely extract page number."""
|
|
|
if not isinstance(image_path, str):
|
|
|
return 1
|
|
|
match = IMAGE_NUM_REGEX.search(image_path)
|
|
|
if match:
|
|
|
try:
|
|
|
return int(match.group(1)) + 1
|
|
|
except (ValueError, TypeError):
|
|
|
return 1
|
|
|
return 1
|
|
|
|
|
|
|
|
|
def convert_annotation_data_to_dataframe(all_annotations: List[Dict[str, Any]]):
|
|
|
"""
|
|
|
Convert annotation list to DataFrame using Pandas explode and json_normalize.
|
|
|
"""
|
|
|
if not all_annotations:
|
|
|
|
|
|
print("No annotations found, returning empty dataframe")
|
|
|
return pd.DataFrame(
|
|
|
columns=[
|
|
|
"image",
|
|
|
"page",
|
|
|
"label",
|
|
|
"color",
|
|
|
"xmin",
|
|
|
"xmax",
|
|
|
"ymin",
|
|
|
"ymax",
|
|
|
"text",
|
|
|
"id",
|
|
|
]
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
df = pd.DataFrame(
|
|
|
{
|
|
|
"image": [anno.get("image") for anno in all_annotations],
|
|
|
|
|
|
"boxes": [
|
|
|
(
|
|
|
anno.get("boxes")
|
|
|
if isinstance(anno.get("boxes"), list)
|
|
|
else (
|
|
|
[anno.get("boxes")]
|
|
|
if isinstance(anno.get("boxes"), dict)
|
|
|
else []
|
|
|
)
|
|
|
)
|
|
|
for anno in all_annotations
|
|
|
],
|
|
|
}
|
|
|
)
|
|
|
|
|
|
|
|
|
df["page"] = df["image"].apply(_extract_page_number)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
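# Annotations with no boxes still need one row after explode(), so give them a single placeholder box of <NA> values.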
placeholder_box = {
|
|
|
"xmin": pd.NA,
|
|
|
"xmax": pd.NA,
|
|
|
"ymin": pd.NA,
|
|
|
"ymax": pd.NA,
|
|
|
"text": pd.NA,
|
|
|
"id": pd.NA,
|
|
|
}
|
|
|
df["boxes"] = df["boxes"].apply(lambda x: x if x else [placeholder_box])
|
|
|
|
|
|
|
|
|
df_exploded = df.explode("boxes", ignore_index=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
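# Keep only dict-like box entries: json_normalize can only flatten dictionaries into columns.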
mask = df_exploded["boxes"].notna() & df_exploded["boxes"].apply(
|
|
|
isinstance, args=(dict,)
|
|
|
)
|
|
|
normalized_boxes = pd.json_normalize(df_exploded.loc[mask, "boxes"])
|
|
|
|
|
|
|
|
|
|
|
|
final_df = (
|
|
|
df_exploded.loc[mask, ["image", "page"]]
|
|
|
.reset_index(drop=True)
|
|
|
.join(normalized_boxes)
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
essential_box_cols = ["xmin", "xmax", "ymin", "ymax", "text", "id", "label"]
|
|
|
for col in essential_box_cols:
|
|
|
if col not in final_df.columns:
|
|
|
final_df[col] = pd.NA
|
|
|
final_df[col] = final_df[col].replace({None: pd.NA})
|
|
|
|
|
|
base_cols = ["image"]
|
|
|
extra_box_cols = [
|
|
|
col
|
|
|
for col in final_df.columns
|
|
|
if col not in base_cols and col not in essential_box_cols
|
|
|
]
|
|
|
final_col_order = base_cols + essential_box_cols + sorted(extra_box_cols)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final_df = final_df.reindex(columns=final_col_order, fill_value=pd.NA)
|
|
|
final_df = final_df.dropna(
|
|
|
subset=["xmin", "xmax", "ymin", "ymax", "text", "id", "label"], how="all"
|
|
|
)
|
|
|
final_df = final_df.replace({None: pd.NA})
|
|
|
|
|
|
return final_df
|
|
|
|
|
|
|
|
|
def create_annotation_dicts_from_annotation_df(
|
|
|
all_image_annotations_df: pd.DataFrame, page_sizes: List[Dict[str, Any]]
|
|
|
) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
Convert annotation DataFrame back to list of dicts using dictionary lookup.
|
|
|
Ensures all images from page_sizes are present without duplicates.
|
|
|
"""
|
|
|
|
|
|
|
|
|
image_dict: Dict[str, Dict[str, Any]] = dict()
|
|
|
for item in page_sizes:
|
|
|
image_path = item.get("image_path")
|
|
|
if image_path:
|
|
|
image_dict[image_path] = {"image": image_path, "boxes": []}
|
|
|
|
|
|
|
|
|
if (
|
|
|
all_image_annotations_df.empty
|
|
|
or "image" not in all_image_annotations_df.columns
|
|
|
):
|
|
|
|
|
|
return list(image_dict.values())
|
|
|
|
|
|
|
|
|
|
|
|
box_cols = ["xmin", "ymin", "xmax", "ymax", "color", "label", "text", "id"]
|
|
|
available_cols = [
|
|
|
col for col in box_cols if col in all_image_annotations_df.columns
|
|
|
]
|
|
|
|
|
|
if "text" in all_image_annotations_df.columns:
|
|
|
all_image_annotations_df["text"] = all_image_annotations_df["text"].fillna("")
|
|
|
|
|
|
|
|
|
if not available_cols:
|
|
|
print(
|
|
|
f"Warning: None of the expected box columns ({box_cols}) found in DataFrame."
|
|
|
)
|
|
|
return list(image_dict.values())
|
|
|
|
|
|
|
|
|
|
|
|
coord_cols = ["xmin", "ymin", "xmax", "ymax"]
|
|
|
valid_box_df = all_image_annotations_df.dropna(
|
|
|
subset=[col for col in coord_cols if col in available_cols]
|
|
|
).copy()
|
|
|
|
|
|
|
|
|
if valid_box_df.empty:
|
|
|
print(
|
|
|
"Warning: No valid annotation rows found in DataFrame after dropping NA coordinates."
|
|
|
)
|
|
|
return list(image_dict.values())
|
|
|
|
|
|
|
|
|
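# Group the remaining valid boxes by image path and attach each group to the matching entry built from page_sizes.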
try:
|
|
|
for image_path, group in valid_box_df.groupby(
|
|
|
"image", observed=True, sort=False
|
|
|
):
|
|
|
|
|
|
if image_path in image_dict:
|
|
|
|
|
|
|
|
|
boxes = group[available_cols].to_dict(orient="records")
|
|
|
|
|
|
image_dict[image_path]["boxes"] = boxes
|
|
|
|
|
|
except KeyError:
|
|
|
|
|
|
print("Error: Issue grouping DataFrame by 'image'.")
|
|
|
return list(image_dict.values())
|
|
|
|
|
|
|
|
|
result = list(image_dict.values())
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
def convert_annotation_json_to_review_df(
|
|
|
all_annotations: List[dict],
|
|
|
redaction_decision_output: pd.DataFrame = pd.DataFrame(),
|
|
|
page_sizes: List[dict] = list(),
|
|
|
do_proximity_match: bool = True,
|
|
|
) -> pd.DataFrame:
|
|
|
"""
|
|
|
Convert the annotation json data to a dataframe format.
|
|
|
Add on any text from the initial review_file dataframe by joining based on 'id' if available
|
|
|
in both sources, otherwise falling back to joining on pages/co-ordinates (if option selected).
|
|
|
|
|
|
Refactored for improved efficiency, prioritizing ID-based join and conditionally applying
|
|
|
coordinate division and proximity matching.
|
|
|
"""
|
|
|
|
|
|
|
|
|
review_file_df = convert_annotation_data_to_dataframe(all_annotations)
|
|
|
|
|
|
|
|
|
|
|
|
review_file_df.dropna(
|
|
|
subset=["xmin", "ymin", "xmax", "ymax"], how="any", inplace=True
|
|
|
)
|
|
|
|
|
|
|
|
|
if review_file_df.empty:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
standard_cols = [
|
|
|
"image",
|
|
|
"page",
|
|
|
"label",
|
|
|
"color",
|
|
|
"xmin",
|
|
|
"ymin",
|
|
|
"xmax",
|
|
|
"ymax",
|
|
|
"text",
|
|
|
]
|
|
|
if "id" in review_file_df.columns:
|
|
|
standard_cols.append("id")
|
|
|
return pd.DataFrame(columns=standard_cols)
|
|
|
|
|
|
|
|
|
if "id" not in review_file_df.columns:
|
|
|
review_file_df["id"] = ""
|
|
|
|
|
|
if (
|
|
|
not redaction_decision_output.empty
|
|
|
and "id" not in redaction_decision_output.columns
|
|
|
):
|
|
|
redaction_decision_output["id"] = ""
|
|
|
|
|
|
|
|
|
|
|
|
page_sizes_df = pd.DataFrame()
|
|
|
if page_sizes:
|
|
|
page_sizes_df = pd.DataFrame(page_sizes)
|
|
|
if not page_sizes_df.empty:
|
|
|
|
|
|
page_sizes_df["page"] = pd.to_numeric(
|
|
|
page_sizes_df["page"], errors="coerce"
|
|
|
)
|
|
|
page_sizes_df.dropna(subset=["page"], inplace=True)
|
|
|
if not page_sizes_df.empty:
|
|
|
page_sizes_df["page"] = page_sizes_df["page"].astype(int)
|
|
|
else:
|
|
|
print(
|
|
|
"Warning: Page sizes DataFrame became empty after processing, coordinate division will be skipped."
|
|
|
)
|
|
|
|
|
|
|
|
|
text_added_successfully = False
|
|
|
|
|
|
if not redaction_decision_output.empty:
|
|
|
|
|
|
|
|
|
|
|
|
id_col_exists_in_review = (
|
|
|
"id" in review_file_df.columns
|
|
|
and not review_file_df["id"].isnull().all()
|
|
|
and not (review_file_df["id"] == "").all()
|
|
|
)
|
|
|
id_col_exists_in_redaction = (
|
|
|
"id" in redaction_decision_output.columns
|
|
|
and not redaction_decision_output["id"].isnull().all()
|
|
|
and not (redaction_decision_output["id"] == "").all()
|
|
|
)
|
|
|
|
|
|
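# Prefer an exact join on 'id' when both frames carry usable IDs; otherwise fall back to the proximity match below.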
if id_col_exists_in_review and id_col_exists_in_redaction:
|
|
|
|
|
|
try:
|
|
|
|
|
|
review_file_df["id"] = review_file_df["id"].astype(str)
|
|
|
|
|
|
|
|
|
redaction_copy = redaction_decision_output.copy()
|
|
|
redaction_copy["id"] = redaction_copy["id"].astype(str)
|
|
|
|
|
|
|
|
|
cols_to_merge = ["id"]
|
|
|
if "text" in redaction_copy.columns:
|
|
|
cols_to_merge.append("text")
|
|
|
else:
|
|
|
print(
|
|
|
"Warning: 'text' column not found in redaction_decision_output. Cannot merge text using 'id'."
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
original_text_col_exists = "text" in review_file_df.columns
|
|
|
merge_suffix = "_redaction" if original_text_col_exists else ""
|
|
|
|
|
|
merged_df = pd.merge(
|
|
|
review_file_df,
|
|
|
redaction_copy[cols_to_merge],
|
|
|
on="id",
|
|
|
how="left",
|
|
|
suffixes=("", merge_suffix),
|
|
|
)
|
|
|
|
|
|
|
|
|
if "text" + merge_suffix in merged_df.columns:
|
|
|
redaction_text_col = "text" + merge_suffix
|
|
|
if original_text_col_exists:
|
|
|
|
|
|
merged_df["text"] = merged_df[redaction_text_col].combine_first(
|
|
|
merged_df["text"]
|
|
|
)
|
|
|
|
|
|
merged_df = merged_df.drop(columns=[redaction_text_col])
|
|
|
else:
|
|
|
|
|
|
merged_df = merged_df.rename(
|
|
|
columns={redaction_text_col: "text"}
|
|
|
)
|
|
|
|
|
|
text_added_successfully = True
|
|
|
|
|
|
review_file_df = merged_df
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
print(
|
|
|
f"Error during 'id'-based merge: {e}. Checking for proximity match fallback."
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not text_added_successfully and do_proximity_match:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if "page" in review_file_df.columns:
|
|
|
review_file_df["page"] = (
|
|
|
pd.to_numeric(review_file_df["page"], errors="coerce")
|
|
|
.fillna(-1)
|
|
|
.astype(int)
|
|
|
)
|
|
|
review_file_df = review_file_df[
|
|
|
review_file_df["page"] != -1
|
|
|
]
|
|
|
if (
|
|
|
not redaction_decision_output.empty
|
|
|
and "page" in redaction_decision_output.columns
|
|
|
):
|
|
|
redaction_decision_output["page"] = (
|
|
|
pd.to_numeric(redaction_decision_output["page"], errors="coerce")
|
|
|
.fillna(-1)
|
|
|
.astype(int)
|
|
|
)
|
|
|
redaction_decision_output = redaction_decision_output[
|
|
|
redaction_decision_output["page"] != -1
|
|
|
]
|
|
|
|
|
|
|
|
|
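# Proximity matching compares coordinates in relative (0-1) space, so convert both frames first when page sizes are available.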
if not page_sizes_df.empty:
|
|
|
|
|
|
review_file_df = divide_coordinates_by_page_sizes(
|
|
|
review_file_df, page_sizes_df
|
|
|
)
|
|
|
if not redaction_decision_output.empty:
|
|
|
redaction_decision_output = divide_coordinates_by_page_sizes(
|
|
|
redaction_decision_output, page_sizes_df
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if not redaction_decision_output.empty:
|
|
|
try:
|
|
|
review_file_df = do_proximity_match_all_pages_for_text(
|
|
|
df1=review_file_df,
|
|
|
df2=redaction_decision_output,
|
|
|
)
|
|
|
|
|
|
if "text" in review_file_df.columns:
|
|
|
text_added_successfully = True
|
|
|
|
|
|
except Exception as e:
|
|
|
print(
|
|
|
f"Error during proximity match: {e}. Text data may not be added."
|
|
|
)
|
|
|
|
|
|
elif not text_added_successfully and not do_proximity_match:
|
|
|
print(
|
|
|
"Skipping joining text data (ID join not possible/failed, proximity match disabled)."
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
required_columns_base = [
|
|
|
"image",
|
|
|
"page",
|
|
|
"label",
|
|
|
"color",
|
|
|
"xmin",
|
|
|
"ymin",
|
|
|
"xmax",
|
|
|
"ymax",
|
|
|
]
|
|
|
final_columns = required_columns_base[:]
|
|
|
|
|
|
|
|
|
if "id" in review_file_df.columns:
|
|
|
final_columns.append("id")
|
|
|
if "text" in review_file_df.columns:
|
|
|
final_columns.append("text")
|
|
|
|
|
|
|
|
|
for col in final_columns:
|
|
|
if col not in review_file_df.columns:
|
|
|
|
|
|
|
|
|
review_file_df[col] = ""
|
|
|
|
|
|
|
|
|
|
|
|
review_file_df = review_file_df[
|
|
|
[col for col in final_columns if col in review_file_df.columns]
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
if "color" in review_file_df.columns:
|
|
|
|
|
|
if review_file_df["color"].apply(lambda x: isinstance(x, list)).any():
|
|
|
review_file_df.loc[:, "color"] = review_file_df.loc[:, "color"].apply(
|
|
|
lambda x: tuple(x) if isinstance(x, list) else x
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
sort_columns = ["page", "ymin", "xmin", "label"]
|
|
|
valid_sort_columns = [col for col in sort_columns if col in review_file_df.columns]
|
|
|
if valid_sort_columns and not review_file_df.empty:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
review_file_df = review_file_df.sort_values(valid_sort_columns)
|
|
|
except TypeError as e:
|
|
|
print(
|
|
|
f"Warning: Could not sort DataFrame due to type error in sort columns: {e}"
|
|
|
)
|
|
|
|
|
|
|
|
|
base_cols = ["xmin", "xmax", "ymin", "ymax", "text", "id", "label"]
|
|
|
|
|
|
for col in base_cols:
|
|
|
if col not in review_file_df.columns:
|
|
|
review_file_df[col] = pd.NA
|
|
|
|
|
|
review_file_df = review_file_df.dropna(subset=base_cols, how="all")
|
|
|
|
|
|
return review_file_df
|
|
|
|
|
|
|
|
|
def fill_missing_ids_in_list(data_list: list) -> list:
|
|
|
"""
|
|
|
Generates unique alphanumeric IDs for dictionaries in a list where the 'id' is
|
|
|
missing, blank, or not a 12-character string.
|
|
|
|
|
|
Args:
|
|
|
data_list (list): A list of dictionaries, each potentially with an 'id' key.
|
|
|
|
|
|
Returns:
|
|
|
list: The input list with missing/invalid IDs filled.
|
|
|
Note: The function modifies the input list in place.
|
|
|
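
Example (illustrative only):
>>> items = [{"id": ""}, {"id": "AbCdEfGhIjKl"}]
>>> items = fill_missing_ids_in_list(items)  # doctest: +SKIP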
"""
|
|
|
|
|
|
|
|
|
if not isinstance(data_list, list):
|
|
|
raise TypeError("Input 'data_list' must be a list.")
|
|
|
|
|
|
if not data_list:
|
|
|
return data_list
|
|
|
|
|
|
id_length = 12
|
|
|
character_set = string.ascii_letters + string.digits
|
|
|
|
|
|
|
|
|
|
|
|
existing_ids = set()
|
|
|
for item in data_list:
|
|
|
if not isinstance(item, dict):
|
|
|
continue
|
|
|
item_id = item.get("id")
|
|
|
if isinstance(item_id, str) and len(item_id) == id_length:
|
|
|
existing_ids.add(item_id)
|
|
|
|
|
|
|
|
|
generated_ids_set = set()
|
|
|
num_filled = 0
|
|
|
|
|
|
for item in data_list:
|
|
|
if not isinstance(item, dict):
|
|
|
continue
|
|
|
|
|
|
item_id = item.get("id")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
needs_new_id = (
|
|
|
item_id is None
|
|
|
or not isinstance(item_id, str)
|
|
|
or item_id.strip() == ""
|
|
|
or len(item_id) != id_length
|
|
|
)
|
|
|
|
|
|
if needs_new_id:
|
|
|
|
|
|
attempts = 0
|
|
|
while True:
|
|
|
candidate_id = "".join(random.choices(character_set, k=id_length))
|
|
|
|
|
|
if (
|
|
|
candidate_id not in existing_ids
|
|
|
and candidate_id not in generated_ids_set
|
|
|
):
|
|
|
generated_ids_set.add(candidate_id)
|
|
|
item["id"] = (
|
|
|
candidate_id
|
|
|
)
|
|
|
num_filled += 1
|
|
|
break
|
|
|
attempts += 1
|
|
|
|
|
|
if attempts > len(data_list) * 100 + 1000:
|
|
|
raise RuntimeError(
|
|
|
f"Failed to generate a unique ID after {attempts} attempts. Check ID length or existing IDs."
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return data_list
|
|
|
|
|
|
|
|
|
def fill_missing_box_ids(data_input: dict) -> dict:
|
|
|
"""
|
|
|
Generates a unique alphanumeric ID for a single bounding box dictionary
where the 'id' is missing, blank, or not a 12-character string.

Args:
data_input (dict): A single bounding box dictionary, potentially with an 'id' key.

Returns:
dict: The input dictionary with a missing/invalid 'id' filled.
Note: The function modifies the input dictionary in place.
|
|
|
"""
|
|
|
|
|
|
|
|
|
if not isinstance(data_input, dict):
|
|
|
raise TypeError("Input 'data_input' must be a dictionary.")
|
|
|
|
|
|
|
|
|
|
|
|
boxes = data_input
|
|
|
id_length = 12
|
|
|
character_set = string.ascii_letters + string.digits
|
|
|
|
|
|
|
|
|
|
|
|
existing_ids = set()
|
|
|
|
|
|
|
|
|
box_id = boxes.get("id")
|
|
|
if isinstance(box_id, str) and len(box_id) == id_length:
|
|
|
existing_ids.add(box_id)
|
|
|
|
|
|
|
|
|
generated_ids_set = set()
|
|
|
num_filled = 0
|
|
|
|
|
|
|
|
|
box_id = boxes.get("id")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
needs_new_id = (
|
|
|
box_id is None
|
|
|
or not isinstance(box_id, str)
|
|
|
or box_id.strip() == ""
|
|
|
or len(box_id) != id_length
|
|
|
)
|
|
|
|
|
|
if needs_new_id:
|
|
|
|
|
|
attempts = 0
|
|
|
while True:
|
|
|
candidate_id = "".join(random.choices(character_set, k=id_length))
|
|
|
|
|
|
if (
|
|
|
candidate_id not in existing_ids
|
|
|
and candidate_id not in generated_ids_set
|
|
|
):
|
|
|
generated_ids_set.add(candidate_id)
|
|
|
boxes["id"] = candidate_id
|
|
|
num_filled += 1
|
|
|
break
|
|
|
attempts += 1
|
|
|
|
|
|
if attempts > len(boxes) * 100 + 1000:
|
|
|
raise RuntimeError(
|
|
|
f"Failed to generate a unique ID after {attempts} attempts. Check ID length or existing IDs."
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return data_input
|
|
|
|
|
|
|
|
|
def fill_missing_box_ids_each_box(data_input: Dict) -> Dict:
|
|
|
"""
|
|
|
Generates unique alphanumeric IDs for bounding boxes in a list
|
|
|
where the 'id' is missing, blank, or not a 12-character string.
|
|
|
|
|
|
Args:
|
|
|
data_input (Dict): The input dictionary containing 'image' and 'boxes' keys.
|
|
|
'boxes' should be a list of dictionaries, each potentially
|
|
|
with an 'id' key.
|
|
|
|
|
|
Returns:
|
|
|
Dict: The input dictionary with missing/invalid box IDs filled.
|
|
|
Note: The function modifies the input dictionary in place.
|
|
|
"""
|
|
|
|
|
|
if not isinstance(data_input, dict):
|
|
|
raise TypeError("Input 'data_input' must be a dictionary.")
|
|
|
if "boxes" not in data_input or not isinstance(data_input.get("boxes"), list):
|
|
|
|
|
|
return data_input
|
|
|
|
|
|
boxes_list = data_input["boxes"]
|
|
|
id_length = 12
|
|
|
character_set = string.ascii_letters + string.digits
|
|
|
|
|
|
|
|
|
|
|
|
existing_ids = set()
|
|
|
for box in boxes_list:
|
|
|
if isinstance(box, dict):
|
|
|
box_id = box.get("id")
|
|
|
if isinstance(box_id, str) and len(box_id) == id_length:
|
|
|
existing_ids.add(box_id)
|
|
|
|
|
|
|
|
|
generated_ids_this_run = set()
|
|
|
num_filled = 0
|
|
|
|
|
|
for box in boxes_list:
|
|
|
if not isinstance(box, dict):
|
|
|
continue
|
|
|
|
|
|
box_id = box.get("id")
|
|
|
|
|
|
|
|
|
needs_new_id = (
|
|
|
box_id is None
|
|
|
or not isinstance(box_id, str)
|
|
|
or box_id.strip() == ""
|
|
|
or len(box_id) != id_length
|
|
|
)
|
|
|
|
|
|
if needs_new_id:
|
|
|
|
|
|
while True:
|
|
|
candidate_id = "".join(random.choices(character_set, k=id_length))
|
|
|
|
|
|
if (
|
|
|
candidate_id not in existing_ids
|
|
|
and candidate_id not in generated_ids_this_run
|
|
|
):
|
|
|
generated_ids_this_run.add(candidate_id)
|
|
|
box["id"] = candidate_id
|
|
|
num_filled += 1
|
|
|
break
|
|
|
|
|
|
if num_filled > 0:
|
|
|
print(f"Successfully filled {num_filled} missing or invalid box IDs.")
|
|
|
|
|
|
|
|
|
return data_input
|
|
|
|
|
|
|
|
|
def fill_missing_ids(
|
|
|
df: pd.DataFrame, column_name: str = "id", length: int = 12
|
|
|
) -> pd.DataFrame:
|
|
|
"""
|
|
|
Optimized: Generates unique alphanumeric IDs for rows in a DataFrame column
|
|
|
where the value is missing (NaN, None) or an empty/whitespace string.
|
|
|
|
|
|
Args:
|
|
|
df (pd.DataFrame): The input Pandas DataFrame.
|
|
|
column_name (str): The name of the column to check and fill (defaults to 'id').
|
|
|
This column will be added if it doesn't exist.
|
|
|
length (int): The desired length of the generated IDs (defaults to 12).
|
|
|
|
|
|
Returns:
|
|
|
pd.DataFrame: The DataFrame with missing/empty IDs filled in the specified column.
|
|
|
Note: The function modifies the DataFrame directly (in-place).
|
|
|
"""
|
|
|
|
|
|
|
|
|
if not isinstance(df, pd.DataFrame):
|
|
|
raise TypeError("Input 'df' must be a Pandas DataFrame.")
|
|
|
if not isinstance(column_name, str) or not column_name:
|
|
|
raise ValueError("'column_name' must be a non-empty string.")
|
|
|
if not isinstance(length, int) or length <= 0:
|
|
|
raise ValueError("'length' must be a positive integer.")
|
|
|
|
|
|
|
|
|
original_dtype = None
|
|
|
if column_name not in df.columns:
|
|
|
|
|
|
|
|
|
df[column_name] = None
|
|
|
|
|
|
original_dtype = object
|
|
|
else:
|
|
|
original_dtype = df[column_name].dtype
|
|
|
|
|
|
|
|
|
|
|
|
is_null = df[column_name].isna()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
is_empty_str = pd.Series(False, index=df.index)
|
|
|
if not is_null.all():
|
|
|
temp_str_col = df.loc[~is_null, column_name].astype(str).str.strip()
|
|
|
is_empty_str.loc[~is_null] = temp_str_col == ""
|
|
|
|
|
|
|
|
|
is_missing_or_empty = is_null | is_empty_str
|
|
|
|
|
|
rows_to_fill_index = df.index[is_missing_or_empty]
|
|
|
num_needed = len(rows_to_fill_index)
|
|
|
|
|
|
if num_needed == 0:
|
|
|
|
|
|
# Nothing to fill; return the DataFrame unchanged.
|
|
|
|
|
|
return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
valid_rows = df.loc[~is_missing_or_empty, column_name]
|
|
|
|
|
|
valid_rows = valid_rows.dropna()
|
|
|
|
|
|
# Numeric and string-typed IDs are normalised the same way before comparison.
existing_ids = set(valid_rows.astype(str).str.strip())
|
|
|
|
|
|
|
|
|
existing_ids.discard("")
|
|
|
|
|
|
|
|
|
character_set = string.ascii_letters + string.digits
|
|
|
generated_ids_set = set()
|
|
|
new_ids_list = list()
|
|
|
|
|
|
max_possible_ids = len(character_set) ** length
|
|
|
if num_needed > max_possible_ids:
|
|
|
raise ValueError(
|
|
|
f"Cannot generate {num_needed} unique IDs with length {length}. Maximum possible is {max_possible_ids}."
|
|
|
)
|
|
|
|
|
|
|
|
|
max_attempts_per_id = max(1000, num_needed * 10)
|
|
|
|
|
|
|
|
|
for i in range(num_needed):
|
|
|
attempts = 0
|
|
|
while True:
|
|
|
candidate_id = "".join(random.choices(character_set, k=length))
|
|
|
|
|
|
if (
|
|
|
candidate_id not in existing_ids
|
|
|
and candidate_id not in generated_ids_set
|
|
|
):
|
|
|
generated_ids_set.add(candidate_id)
|
|
|
new_ids_list.append(candidate_id)
|
|
|
break
|
|
|
attempts += 1
|
|
|
if attempts > max_attempts_per_id:
|
|
|
raise RuntimeError(
|
|
|
f"Failed to generate a unique ID after {attempts} attempts. Check length, character set, or density of existing IDs."
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not pd.api.types.is_object_dtype(
|
|
|
original_dtype
|
|
|
) and not pd.api.types.is_string_dtype(original_dtype):
|
|
|
df["id"] = df["id"].astype(str, errors="ignore")
|
|
|
|
|
|
|
|
|
df.loc[rows_to_fill_index, column_name] = new_ids_list
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return df
|
|
|
|
|
|
|
|
|
def convert_review_df_to_annotation_json(
|
|
|
review_file_df: pd.DataFrame,
|
|
|
image_paths: List[str],
|
|
|
page_sizes: List[Dict],
|
|
|
xmin="xmin",
|
|
|
xmax="xmax",
|
|
|
ymin="ymin",
|
|
|
ymax="ymax",
|
|
|
) -> List[Dict]:
|
|
|
"""
|
|
|
Optimized function to convert review DataFrame to Gradio Annotation JSON format.
|
|
|
|
|
|
Ensures absolute coordinates, handles missing IDs, deduplicates based on key fields,
|
|
|
selects final columns, and structures data per image/page based on page_sizes.
|
|
|
|
|
|
Args:
|
|
|
review_file_df: Input DataFrame with annotation data.
|
|
|
image_paths: List of image file paths (Note: currently unused if page_sizes provides paths).
|
|
|
page_sizes: REQUIRED list of dictionaries, each containing 'page',
|
|
|
'image_path', 'image_width', and 'image_height'. Defines
|
|
|
output structure and dimensions for coordinate conversion.
|
|
|
xmin, xmax, ymin, ymax: Names of the coordinate columns.
|
|
|
|
|
|
Returns:
|
|
|
List of dictionaries suitable for Gradio Annotation output, one dict per image/page.
|
|
|
"""
|
|
|
base_cols = ["xmin", "xmax", "ymin", "ymax", "text", "id", "label"]
|
|
|
|
|
|
for col in base_cols:
|
|
|
if col not in review_file_df.columns:
|
|
|
review_file_df[col] = pd.NA
|
|
|
|
|
|
review_file_df = review_file_df.dropna(
|
|
|
subset=["xmin", "xmax", "ymin", "ymax", "text", "id", "label"], how="all"
|
|
|
)
|
|
|
|
|
|
if not page_sizes:
|
|
|
raise ValueError("page_sizes argument is required and cannot be empty.")
|
|
|
|
|
|
|
|
|
try:
|
|
|
page_sizes_df = pd.DataFrame(page_sizes)
|
|
|
required_ps_cols = {"page", "image_path", "image_width", "image_height"}
|
|
|
if not required_ps_cols.issubset(page_sizes_df.columns):
|
|
|
missing = required_ps_cols - set(page_sizes_df.columns)
|
|
|
raise ValueError(f"page_sizes is missing required keys: {missing}")
|
|
|
|
|
|
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
|
|
|
page_sizes_df["image_width"] = pd.to_numeric(
|
|
|
page_sizes_df["image_width"], errors="coerce"
|
|
|
)
|
|
|
page_sizes_df["image_height"] = pd.to_numeric(
|
|
|
page_sizes_df["image_height"], errors="coerce"
|
|
|
)
|
|
|
|
|
|
page_sizes_df["page"] = page_sizes_df["page"].astype("Int64")
|
|
|
|
|
|
except Exception as e:
|
|
|
raise ValueError(f"Error processing page_sizes: {e}") from e
|
|
|
|
|
|
|
|
|
if review_file_df.empty:
|
|
|
print(
|
|
|
"Input review_file_df is empty. Proceeding to generate JSON structure with empty boxes."
|
|
|
)
|
|
|
|
|
|
for col in [xmin, xmax, ymin, ymax, "page", "label", "color", "id", "text"]:
|
|
|
if col not in review_file_df.columns:
|
|
|
review_file_df[col] = pd.NA
|
|
|
else:
|
|
|
|
|
|
coord_cols_to_check = [
|
|
|
c for c in [xmin, xmax, ymin, ymax] if c in review_file_df.columns
|
|
|
]
|
|
|
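# Heuristic: if any coordinate value is <= 1, the frame is assumed to hold relative
# coordinates and is converted to absolute pixels before export.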
needs_multiplication = False
|
|
|
if coord_cols_to_check:
|
|
|
temp_df_numeric = review_file_df[coord_cols_to_check].apply(
|
|
|
pd.to_numeric, errors="coerce"
|
|
|
)
|
|
|
if temp_df_numeric.le(1).any().any():
|
|
|
needs_multiplication = True
|
|
|
|
|
|
if needs_multiplication:
|
|
|
|
|
|
review_file_df = multiply_coordinates_by_page_sizes(
|
|
|
review_file_df.copy(),
|
|
|
page_sizes_df,
|
|
|
xmin,
|
|
|
xmax,
|
|
|
ymin,
|
|
|
ymax,
|
|
|
)
|
|
|
else:
|
|
|
|
|
|
|
|
|
cols_to_convert = [
|
|
|
c
|
|
|
for c in [xmin, xmax, ymin, ymax, "page"]
|
|
|
if c in review_file_df.columns
|
|
|
]
|
|
|
for col in cols_to_convert:
|
|
|
review_file_df[col] = pd.to_numeric(
|
|
|
review_file_df[col], errors="coerce"
|
|
|
)
|
|
|
|
|
|
|
|
|
if review_file_df.empty:
|
|
|
print("DataFrame became empty after coordinate processing.")
|
|
|
|
|
|
for col in [xmin, xmax, ymin, ymax, "page", "label", "color", "id", "text"]:
|
|
|
if col not in review_file_df.columns:
|
|
|
review_file_df[col] = pd.NA
|
|
|
|
|
|
|
|
|
review_file_df = fill_missing_ids(review_file_df.copy())
|
|
|
|
|
|
|
|
|
base_dedupe_cols = ["page", xmin, ymin, xmax, ymax, "label", "id"]
|
|
|
|
|
|
cols_for_dedupe = [
|
|
|
col for col in base_dedupe_cols if col in review_file_df.columns
|
|
|
]
|
|
|
|
|
|
if "image" in review_file_df.columns:
|
|
|
cols_for_dedupe.append("image")
|
|
|
|
|
|
|
|
|
|
|
|
for col in ["label", "id"]:
|
|
|
if col in cols_for_dedupe and col not in review_file_df.columns:
|
|
|
|
|
|
print(
|
|
|
f"Warning: Column '{col}' needed for dedupe but not found. Adding NA."
|
|
|
)
|
|
|
review_file_df[col] = ""
|
|
|
|
|
|
if cols_for_dedupe:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
review_file_df = review_file_df.drop_duplicates(subset=cols_for_dedupe)
|
|
|
else:
|
|
|
print("Skipping deduplication: No valid columns found to deduplicate by.")
|
|
|
|
|
|
|
|
|
required_final_cols = [
|
|
|
"page",
|
|
|
"label",
|
|
|
"color",
|
|
|
xmin,
|
|
|
ymin,
|
|
|
xmax,
|
|
|
ymax,
|
|
|
"id",
|
|
|
"text",
|
|
|
]
|
|
|
|
|
|
available_final_cols = [
|
|
|
col for col in required_final_cols if col in review_file_df.columns
|
|
|
]
|
|
|
|
|
|
|
|
|
for col in required_final_cols:
|
|
|
if col not in review_file_df.columns:
|
|
|
print(f"Adding missing final column '{col}' with default value.")
|
|
|
if col in ["label", "id", "text"]:
|
|
|
review_file_df[col] = ""
|
|
|
elif col == "color":
|
|
|
review_file_df[col] = None
|
|
|
else:
|
|
|
review_file_df[col] = pd.NA
|
|
|
available_final_cols.append(col)
|
|
|
|
|
|
|
|
|
review_file_df = review_file_df[available_final_cols]
|
|
|
|
|
|
|
|
|
if not review_file_df.empty:
|
|
|
|
|
|
if "color" in review_file_df.columns:
|
|
|
review_file_df["color"] = review_file_df["color"].apply(
|
|
|
lambda x: tuple(x) if isinstance(x, list) else x
|
|
|
)
|
|
|
|
|
|
if "page" in review_file_df.columns:
|
|
|
review_file_df["page"] = review_file_df["page"].astype("Int64")
|
|
|
|
|
|
|
|
|
if "page" in review_file_df.columns:
|
|
|
grouped_annotations = review_file_df.groupby("page")
|
|
|
group_keys = set(grouped_annotations.groups.keys())
|
|
|
else:
|
|
|
|
|
|
print("Error: 'page' column missing, cannot group annotations.")
|
|
|
grouped_annotations = None
|
|
|
group_keys = set()
|
|
|
|
|
|
|
|
|
json_data = list()
|
|
|
output_cols_for_boxes = [
|
|
|
col
|
|
|
for col in ["label", "color", xmin, ymin, xmax, ymax, "id", "text"]
|
|
|
if col in review_file_df.columns
|
|
|
]
|
|
|
|
|
|
|
|
|
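# Emit one output entry per page listed in page_sizes, so pages without any boxes still appear with an empty box list.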
for _, row in page_sizes_df.iterrows():
|
|
|
page_num = row["page"]
|
|
|
pdf_image_path = row["image_path"]
|
|
|
annotation_boxes = list()
|
|
|
|
|
|
|
|
|
|
|
|
if pd.notna(page_num) and page_num in group_keys and grouped_annotations is not None:
|
|
|
try:
|
|
|
page_group_df = grouped_annotations.get_group(page_num)
|
|
|
|
|
|
|
|
|
annotation_boxes = (
|
|
|
page_group_df[output_cols_for_boxes]
|
|
|
.replace({np.nan: None})
|
|
|
.to_dict(orient="records")
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except KeyError:
|
|
|
print(
|
|
|
f"Warning: Group key {page_num} not found despite being in group_keys (should not happen)."
|
|
|
)
|
|
|
annotation_boxes = list()
|
|
|
|
|
|
|
|
|
json_data.append({"image": pdf_image_path, "boxes": annotation_boxes})
|
|
|
|
|
|
return json_data
|
|
|
|