import ast
import datetime
import json
import logging
import os
import time
from typing import List

import boto3
import gradio as gr
import pandas as pd
import pymupdf
from botocore.exceptions import (
    ClientError,
    NoCredentialsError,
    PartialCredentialsError,
    TokenRetrievalError,
)
from gradio import FileData

from tools.aws_functions import download_file_from_s3
from tools.config import (
    AWS_REGION,
    DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS,
    DOCUMENT_REDACTION_BUCKET,
    INPUT_FOLDER,
    LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    OUTPUT_FOLDER,
    RUN_AWS_FUNCTIONS,
    TEXTRACT_JOBS_LOCAL_LOC,
    TEXTRACT_JOBS_S3_LOC,
    TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
)
from tools.file_conversion import get_input_file_names
from tools.helper_functions import get_file_name_without_type, get_textract_file_suffix
from tools.secure_path_utils import (
    secure_basename,
    secure_file_write,
    secure_join,
)

DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS = int(DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS)


def analyse_document_with_textract_api(
    local_pdf_path: str,
    s3_input_prefix: str,
    s3_output_prefix: str,
    job_df: pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,
    handwrite_signature_checkbox: List[str] = list(),
    successful_job_number: int = 0,
    total_document_page_count: int = 1,
    general_s3_bucket_name: str = DOCUMENT_REDACTION_BUCKET,
    aws_region: str = AWS_REGION,
):
    """
    Uploads a local PDF to S3 and starts an asynchronous Textract analysis job (text detection,
    plus signatures, forms, layout, or tables if selected), recording the job details locally
    and in S3 so that the output JSON can be downloaded once the job completes.

    Args:
        local_pdf_path (str): Path to the local PDF file.
        s3_input_prefix (str): S3 prefix (folder) to upload the input PDF.
        s3_output_prefix (str): S3 prefix (folder) where Textract should write output.
        job_df (pd.DataFrame): Dataframe containing information from previous Textract API calls.
        s3_bucket_name (str, optional): Name of the S3 bucket in which to save API call outputs.
        local_output_dir (str, optional): Local directory to save the job log and job ID files.
        handwrite_signature_checkbox (List[str], optional): List of feature types to extract from the document.
        successful_job_number (int, optional): The number of successful jobs that have been submitted in this session.
        total_document_page_count (int, optional): The number of pages in the document.
        general_s3_bucket_name (str, optional): S3 bucket in which to save the job log CSV.
        aws_region (str, optional): AWS region name. Defaults to the configured AWS_REGION.

    Returns:
        tuple: A status message, the Textract job ID, the job type, the updated count of
        successful jobs, a flag indicating this was a Textract API call, the number of
        Textract page calls made, and the task name.

    Raises:
        FileNotFoundError: If the local_pdf_path does not exist.
        botocore.exceptions.NoCredentialsError: If AWS credentials are not found.
        Exception: For other AWS errors or job failures.
    """

    is_a_textract_api_call = True
    task_textbox = "textract"

    # If a list of file paths is passed in, use the most recent one
    if isinstance(local_pdf_path, list):
        local_pdf_path = local_pdf_path[-1]

    if not os.path.exists(local_pdf_path):
        raise FileNotFoundError(f"Input document not found: {local_pdf_path}")

    file_extension = os.path.splitext(local_pdf_path)[1].lower()

    # If no page count was provided, try to read it from the PDF; otherwise default to 1
    if not total_document_page_count:
        if file_extension == ".pdf":
            print("Page count not provided. Loading PDF to get page count")
            try:
                pymupdf_doc = pymupdf.open(local_pdf_path)
                total_document_page_count = pymupdf_doc.page_count
                pymupdf_doc.close()
                print("Page count:", total_document_page_count)
            except Exception as e:
                print(
                    "Failed to load PDF to get page count:", e, "setting page count to 1"
                )
                total_document_page_count = 1
        else:
            total_document_page_count = 1

    if not os.path.exists(local_output_dir):
        os.makedirs(local_output_dir)
        log_message = f"Created local output directory: {local_output_dir}"
        print(log_message)

    session = boto3.Session(region_name=aws_region)
    s3_client = session.client("s3")
    textract_client = session.client("textract")

    # Build the S3 key for the input document, normalising Windows path separators
    pdf_filename = secure_basename(local_pdf_path)
    s3_input_key = secure_join(s3_input_prefix, pdf_filename).replace("\\", "/")

    log_message = (
        f"Uploading '{local_pdf_path}' to 's3://{s3_bucket_name}/{s3_input_key}'..."
    )
    print(log_message)

    try:
        s3_client.upload_file(local_pdf_path, s3_bucket_name, s3_input_key)
        log_message = "Upload successful."
        print(log_message)
    except Exception as e:
        log_message = f"Failed to upload PDF to S3: {e}"
        print(log_message)
        raise

    if not job_df.empty:
        job_df = job_df.loc[
            job_df["job_date_time"]
            > (
                datetime.datetime.now()
                - datetime.timedelta(days=DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS)
            ),
            :,
        ]

    # Check whether this file, with the same extraction options, has already been analysed
    if not job_df.empty:
        if "file_name" in job_df.columns:
            matching_mask = (job_df["file_name"] == pdf_filename) & (
                job_df["signature_extraction"].astype(str)
                == str(handwrite_signature_checkbox)
            )
            matching_job_id_file_names = job_df.loc[matching_mask, "file_name"]
            matching_job_id_file_names_dates = job_df.loc[
                matching_mask, "job_date_time"
            ]
            matching_job_id = job_df.loc[matching_mask, "job_id"]
            matching_handwrite_signature = job_df.loc[
                matching_mask, "signature_extraction"
            ]

            if len(matching_job_id) == 0:
                matching_job_id = "unknown_job_id"

            if (
                len(matching_job_id_file_names) > 0
                and len(matching_handwrite_signature) > 0
            ):
                out_message = f"Existing Textract outputs found for file {pdf_filename} from date {matching_job_id_file_names_dates.iloc[0]}. No need to re-analyse. Please download the existing results from the list with job ID {matching_job_id.iloc[0]}."
                gr.Warning(out_message)
                raise Exception(out_message)

    message = "Starting Textract document analysis job..."
    print(message)

    try:
        # Map the selected checkbox options to Textract feature types
        feature_types = list()
        if "Extract signatures" in handwrite_signature_checkbox:
            feature_types.append("SIGNATURES")
        if "Extract forms" in handwrite_signature_checkbox:
            feature_types.append("FORMS")
        if "Extract layout" in handwrite_signature_checkbox:
            feature_types.append("LAYOUT")
        if "Extract tables" in handwrite_signature_checkbox:
            feature_types.append("TABLES")

        if feature_types:
            response = textract_client.start_document_analysis(
                DocumentLocation={
                    "S3Object": {"Bucket": s3_bucket_name, "Name": s3_input_key}
                },
                FeatureTypes=feature_types,
                OutputConfig={"S3Bucket": s3_bucket_name, "S3Prefix": s3_output_prefix},
            )
            job_type = "document_analysis"
        else:
            # No additional features requested, so plain text detection is sufficient
            response = textract_client.start_document_text_detection(
                DocumentLocation={
                    "S3Object": {"Bucket": s3_bucket_name, "Name": s3_input_key}
                },
                OutputConfig={"S3Bucket": s3_bucket_name, "S3Prefix": s3_output_prefix},
            )
            job_type = "document_text_detection"

        job_id = response["JobId"]
        print(f"Textract job started with JobId: {job_id}")

        # Record the job details in a local CSV log, then upload the log to S3
        log_csv_key_location = f"{s3_output_prefix}/textract_document_jobs.csv"

        log_df = pd.DataFrame(
            [
                {
                    "job_id": job_id,
                    "file_name": pdf_filename,
                    "job_type": job_type,
                    "signature_extraction": handwrite_signature_checkbox,
                    "job_date_time": datetime.datetime.now().strftime(
                        "%Y-%m-%d %H:%M:%S"
                    ),
                }
            ]
        )

        log_file_path = secure_join(local_output_dir, "textract_document_jobs.csv")

        secure_file_write(
            local_output_dir,
            pdf_filename + "_textract_document_jobs_job_id.txt",
            job_id,
        )

        # Append to the local log, writing the header only if the file is new
        file_exists = os.path.exists(log_file_path)
        log_df.to_csv(log_file_path, mode="a", index=False, header=not file_exists)

        s3_client.upload_file(
            log_file_path, general_s3_bucket_name, log_csv_key_location
        )

        print(f"Job ID written to {log_csv_key_location}")

    except Exception as e:
        error = f"Failed to start Textract job: {e}"
        print(error)
        raise

    successful_job_number += 1
    total_number_of_textract_page_calls = total_document_page_count

    return (
        f"Textract analysis job submitted, job ID: {job_id}",
        job_id,
        job_type,
        successful_job_number,
        is_a_textract_api_call,
        total_number_of_textract_page_calls,
        task_textbox,
    )


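# Usage sketch (illustrative only -- assumes valid AWS credentials and that the
# configured buckets and prefixes exist; the file path and prefixes below are
# hypothetical):
#
#     out_message, job_id, job_type, n_jobs, is_api_call, n_page_calls, task = (
#         analyse_document_with_textract_api(
#             local_pdf_path="example_inputs/report.pdf",
#             s3_input_prefix="whole_document/input",
#             s3_output_prefix="whole_document/output",
#             job_df=pd.DataFrame(),
#             handwrite_signature_checkbox=["Extract signatures"],
#         )
#     )
#     # The job then runs asynchronously; check on it later with
#     # poll_whole_document_textract_analysis_progress_and_download().

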
def return_job_status(
    job_id: str,
    response: dict,
    attempts: int,
    poll_interval_seconds: int = 0,
    max_polling_attempts: int = 1,
):
    """
    Checks the current status of an asynchronous Textract document analysis job from the
    provided response and logs relevant information or errors.

    Args:
        job_id (str): The unique identifier of the Textract job.
        response (dict): The response dictionary received from Textract's `get_document_analysis` or `get_document_text_detection` call.
        attempts (int): The current polling attempt number.
        poll_interval_seconds (int, optional): The time in seconds to wait before the next poll (currently unused in this function, but kept for context). Defaults to 0.
        max_polling_attempts (int, optional): The maximum number of polling attempts allowed (currently unused in this function, but kept for context). Defaults to 1.

    Returns:
        str: The current status of the Textract job (e.g., 'IN_PROGRESS', 'SUCCEEDED').

    Raises:
        Exception: If the Textract job status is 'FAILED' or 'PARTIAL_SUCCESS', or if an unexpected status is encountered.
    """

    job_status = response["JobStatus"]
    logging.info(
        f"Polling attempt {attempts}/{max_polling_attempts}. Job status: {job_status}"
    )

    if job_status == "IN_PROGRESS":
        pass
    elif job_status == "SUCCEEDED":
        logging.info("Textract job succeeded.")
    elif job_status in ["FAILED", "PARTIAL_SUCCESS"]:
        status_message = response.get("StatusMessage", "No status message provided.")
        warnings = response.get("Warnings", [])
        logging.error(
            f"Textract job ended with status: {job_status}. Message: {status_message}"
        )
        if warnings:
            logging.warning(f"Warnings: {warnings}")

        raise Exception(
            f"Textract job {job_id} failed or partially failed. Status: {job_status}. Message: {status_message}"
        )
    else:
        raise Exception(f"Unexpected Textract job status: {job_status}")

    return job_status


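# Polling sketch (illustrative only): return_job_status() inspects a single
# get_document_analysis()/get_document_text_detection() response, so the caller
# supplies the loop and any sleep between attempts, e.g.:
#
#     attempts, job_status = 0, "IN_PROGRESS"
#     while job_status == "IN_PROGRESS" and attempts < max_polling_attempts:
#         attempts += 1
#         response = textract_client.get_document_analysis(JobId=job_id)
#         job_status = return_job_status(
#             job_id, response, attempts, poll_interval_seconds, max_polling_attempts
#         )
#         if job_status == "IN_PROGRESS":
#             time.sleep(poll_interval_seconds)

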
def download_textract_job_files(
    s3_client,
    s3_bucket_name: str,
    s3_output_key_prefix: str,
    pdf_filename: str,
    job_id: str,
    local_output_dir: str,
    handwrite_signature_checkbox: List[str] = list(),
):
    """
    Download the paginated output files for a Textract job from S3, combine their
    'Blocks' into a single JSON output, and write it to the local output directory.
    """

    list_response = s3_client.list_objects_v2(
        Bucket=s3_bucket_name, Prefix=s3_output_key_prefix
    )

    output_files = list_response.get("Contents", [])
    if not output_files:
        # Retry the listing once in case the output objects are not yet visible
        list_response = s3_client.list_objects_v2(
            Bucket=s3_bucket_name, Prefix=s3_output_key_prefix
        )
        output_files = list_response.get("Contents", [])

    if not output_files:
        logging.error(
            f"No output files found in s3://{s3_bucket_name}/{s3_output_key_prefix}"
        )
        raise FileNotFoundError(
            f"Textract output files not found in S3 path: s3://{s3_bucket_name}/{s3_output_key_prefix}"
        )

    # Ignore the prefix marker itself, folder keys, and the access check object
    json_files_to_download = [
        f
        for f in output_files
        if f["Key"] != s3_output_key_prefix
        and not f["Key"].endswith("/")
        and "access_check" not in f["Key"]
    ]

    if not json_files_to_download:
        error = f"No JSON files found (only prefix marker?) in s3://{s3_bucket_name}/{s3_output_key_prefix}"
        print(error)
        raise FileNotFoundError(error)

    # Combine the blocks from each paginated output file, in key order
    combined_blocks = []
    for f in sorted(json_files_to_download, key=lambda x: x["Key"]):
        obj = s3_client.get_object(Bucket=s3_bucket_name, Key=f["Key"])
        data = json.loads(obj["Body"].read())

        if "Blocks" in data:
            combined_blocks.extend(data["Blocks"])
        else:
            logging.warning(f"No 'Blocks' key in file: {f['Key']}")

    combined_output = {
        "DocumentMetadata": {
            "Pages": len(set(block.get("Page", 1) for block in combined_blocks))
        },
        "Blocks": combined_blocks,
        "JobStatus": "SUCCEEDED",
    }

    output_filename_base = os.path.basename(pdf_filename)
    output_filename_base_no_ext = os.path.splitext(output_filename_base)[0]

    textract_suffix = get_textract_file_suffix(handwrite_signature_checkbox)
    local_output_filename = (
        f"{output_filename_base_no_ext}{textract_suffix}_textract.json"
    )
    local_output_path = secure_join(local_output_dir, local_output_filename)

    secure_file_write(
        local_output_dir, local_output_filename, json.dumps(combined_output)
    )

    print(f"Combined Textract output written to {local_output_path}")

    return local_output_path


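# Shape of the combined output written above (abridged; 'Blocks' follows the
# standard Textract block schema):
#
#     {
#         "DocumentMetadata": {"Pages": 3},
#         "Blocks": [{"BlockType": "PAGE", "Page": 1, ...}, ...],
#         "JobStatus": "SUCCEEDED"
#     }

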
def check_for_provided_job_id(job_id: str):
    if not job_id:
        raise Exception("Please provide a job ID.")
    return


def load_pdf_job_file_from_s3(
    load_s3_jobs_input_loc,
    pdf_filename,
    local_output_dir,
    s3_bucket_name,
    RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS,
):
    """
    Download the input PDF for a previous Textract job from S3 into the local output
    directory, returning the local file location and the file name without extension.
    """
    try:
        pdf_file_location = ""
        doc_file_name_no_extension_textbox = ""

        # The job log stores file names without extension, so append '.pdf' to build
        # both the S3 key and the local download path
        s3_input_key_prefix = secure_join(load_s3_jobs_input_loc, pdf_filename).replace(
            "\\", "/"
        )
        s3_input_key_prefix = s3_input_key_prefix + ".pdf"

        local_input_file_path = secure_join(local_output_dir, pdf_filename)
        local_input_file_path = local_input_file_path + ".pdf"

        download_file_from_s3(
            s3_bucket_name,
            s3_input_key_prefix,
            local_input_file_path,
            RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS,
        )

        pdf_file_location = [local_input_file_path]
        doc_file_name_no_extension_textbox = get_file_name_without_type(pdf_filename)
    except Exception as e:
        print("Could not download PDF job file from S3 due to:", e)

    return pdf_file_location, doc_file_name_no_extension_textbox


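# Example of the paths built above (illustrative values): with
# load_s3_jobs_input_loc="textract_jobs/input", pdf_filename="report" and
# local_output_dir="output/", the function downloads
# s3://<bucket>/textract_jobs/input/report.pdf to output/report.pdf.

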
def replace_existing_pdf_input_for_whole_document_outputs(
    load_s3_jobs_input_loc: str,
    pdf_filename: str,
    local_output_dir: str,
    s3_bucket_name: str,
    in_doc_files: FileData = [],
    input_folder: str = INPUT_FOLDER,
    RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Ensure the input PDF for a whole-document Textract job is loaded in the app, reusing an
    already-uploaded file when its name matches, and otherwise downloading the PDF from S3.
    """

    progress(0.1, "Loading PDF from s3")

    needs_s3_download = True

    if in_doc_files:
        (
            doc_file_name_no_extension_textbox,
            doc_file_name_with_extension_textbox,
            doc_full_file_name_textbox,
            doc_file_name_textbox_list,
            total_pdf_page_count,
        ) = get_input_file_names(in_doc_files)

        if pdf_filename == doc_file_name_no_extension_textbox:
            print("Existing loaded PDF file has same name as file from S3")
            downloaded_pdf_file_location = in_doc_files
            needs_s3_download = False

    if needs_s3_download:
        downloaded_pdf_file_location, doc_file_name_no_extension_textbox = (
            load_pdf_job_file_from_s3(
                load_s3_jobs_input_loc,
                pdf_filename,
                local_output_dir,
                s3_bucket_name,
                RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS,
            )
        )

        (
            doc_file_name_no_extension_textbox,
            doc_file_name_with_extension_textbox,
            doc_full_file_name_textbox,
            doc_file_name_textbox_list,
            total_pdf_page_count,
        ) = get_input_file_names(downloaded_pdf_file_location)

    return (
        downloaded_pdf_file_location,
        doc_file_name_no_extension_textbox,
        doc_file_name_with_extension_textbox,
        doc_full_file_name_textbox,
        doc_file_name_textbox_list,
        total_pdf_page_count,
    )


def poll_whole_document_textract_analysis_progress_and_download(
    job_id: str,
    job_type_dropdown: str,
    s3_output_prefix: str,
    pdf_filename: str,
    job_df: pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,
    load_s3_jobs_loc: str = TEXTRACT_JOBS_S3_LOC,
    load_local_jobs_loc: str = TEXTRACT_JOBS_LOCAL_LOC,
    aws_region: str = AWS_REGION,
    load_jobs_from_s3: str = LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    poll_interval_seconds: int = 1,
    max_polling_attempts: int = 1,
    DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS: int = DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS,
    progress=gr.Progress(track_tqdm=True),
):
    """
    Poll AWS for the status of a Textract API job. Return the status and, if the job has
    finished, combine and download the results into a locally-stored JSON file for further
    processing by the app.
    """

    progress(0.1, "Querying AWS Textract for status of document analysis job")

    if job_id:
        session = boto3.Session(region_name=aws_region)
        s3_client = session.client("s3")
        textract_client = session.client("textract")

        job_status = "IN_PROGRESS"
        attempts = 0

        message = "Polling Textract for job completion status..."
        print(message)

        # Refresh the job details dataframe so the file name and extraction options
        # for this job ID can be looked up below
        try:
            job_df = load_in_textract_job_details(
                load_s3_jobs=load_jobs_from_s3,
                load_s3_jobs_loc=load_s3_jobs_loc,
                load_local_jobs_loc=load_local_jobs_loc,
            )
        except Exception as e:
            print(f"Failed to update job details dataframe: {e}")

        while job_status == "IN_PROGRESS" and attempts <= max_polling_attempts:
            attempts += 1
            try:
                if job_type_dropdown == "document_analysis":
                    response = textract_client.get_document_analysis(JobId=job_id)
                elif job_type_dropdown == "document_text_detection":
                    response = textract_client.get_document_text_detection(
                        JobId=job_id
                    )
                else:
                    error = "Unknown job type, cannot poll job"
                    print(error)
                    logging.error(error)
                    raise Exception(error)

                job_status = return_job_status(
                    job_id,
                    response,
                    attempts,
                    poll_interval_seconds,
                    max_polling_attempts,
                )

            except textract_client.exceptions.InvalidJobIdException:
                error_message = f"Invalid JobId: {job_id}. This might happen if the job expired (older than {DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS} days) or never existed."
                print(error_message)
                logging.error(error_message)
                raise Exception(error_message)
            except Exception as e:
                error_message = (
                    f"Error while polling Textract status for job {job_id}: {e}"
                )
                print(error_message)
                logging.error(error_message)
                raise Exception(error_message)

        downloaded_file_path = None
        if job_status == "SUCCEEDED":
            progress(0.5, "Document analysis task outputs found. Downloading from S3")

            # Look up the signature extraction options the job was submitted with, so
            # the downloaded output file can be named consistently
            handwrite_signature_checkbox = list()
            if not job_df.empty:
                if "signature_extraction" in job_df.columns:
                    matching_signature_extraction = job_df.loc[
                        job_df["job_id"] == job_id, "signature_extraction"
                    ]
                    if not matching_signature_extraction.empty:
                        signature_extraction_str = matching_signature_extraction.iloc[0]

                        # The job log stores the options list as its string
                        # representation, so parse it back into a list
                        if isinstance(signature_extraction_str, str):
                            try:
                                handwrite_signature_checkbox = ast.literal_eval(
                                    signature_extraction_str
                                )
                            except (ValueError, SyntaxError):
                                handwrite_signature_checkbox = [
                                    signature_extraction_str
                                ]
                        elif isinstance(signature_extraction_str, list):
                            handwrite_signature_checkbox = signature_extraction_str

                if "file_name" in job_df.columns:
                    matching_job_id_file_names = job_df.loc[
                        job_df["job_id"] == job_id, "file_name"
                    ]

                    if pdf_filename and not matching_job_id_file_names.empty:
                        if pdf_filename == matching_job_id_file_names.iloc[0]:
                            out_message = f"Existing Textract outputs found for file {pdf_filename}. No need to re-download."
                            gr.Warning(out_message)
                            raise Exception(out_message)

                    if not matching_job_id_file_names.empty:
                        pdf_filename = matching_job_id_file_names.iloc[0]
                    else:
                        pdf_filename = "unknown_file"

            # Textract writes the paginated job output under <prefix>/<job_id>/
            s3_output_key_prefix = (
                secure_join(s3_output_prefix, job_id).replace("\\", "/") + "/"
            )
            logging.info(
                f"Searching for output files in s3://{s3_bucket_name}/{s3_output_key_prefix}"
            )

            try:
                downloaded_file_path = download_textract_job_files(
                    s3_client,
                    s3_bucket_name,
                    s3_output_key_prefix,
                    pdf_filename,
                    job_id,
                    local_output_dir,
                    handwrite_signature_checkbox,
                )
            except Exception as e:
                print(f"Failed to download or process Textract output from S3: {e}")
                raise

    else:
        raise Exception("No Job ID provided.")

    output_pdf_filename = get_file_name_without_type(pdf_filename)

    return downloaded_file_path, job_status, job_df, output_pdf_filename


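# Usage sketch (illustrative only -- the job ID and job type come from a previously
# submitted analyse_document_with_textract_api() call; the values below are hypothetical):
#
#     downloaded_path, status, job_df, out_name = (
#         poll_whole_document_textract_analysis_progress_and_download(
#             job_id="0123456789abcdef",
#             job_type_dropdown="document_analysis",
#             s3_output_prefix="whole_document/output",
#             pdf_filename="report",
#             job_df=pd.DataFrame(),
#         )
#     )

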
def load_in_textract_job_details(
    load_s3_jobs: str = LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    load_s3_jobs_loc: str = TEXTRACT_JOBS_S3_LOC,
    load_local_jobs_loc: str = TEXTRACT_JOBS_LOCAL_LOC,
    document_redaction_bucket: str = DOCUMENT_REDACTION_BUCKET,
    aws_region: str = AWS_REGION,
    DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS: int = DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS,
):
    """
    Load a dataframe of jobs previously submitted to the Textract API service, optionally
    refreshing the local job log from S3 first, and keep only jobs within the display window.
    """
    job_df = pd.DataFrame(
        columns=[
            "job_id",
            "file_name",
            "job_type",
            "signature_extraction",
            "job_date_time",
        ]
    )

    session = boto3.Session(region_name=aws_region)
    s3_client = session.client("s3")

    local_output_path = f"{load_local_jobs_loc}/textract_document_jobs.csv"

    if load_s3_jobs == "True":
        s3_output_key = f"{load_s3_jobs_loc}/textract_document_jobs.csv"

        try:
            s3_client.head_object(Bucket=document_redaction_bucket, Key=s3_output_key)
            s3_client.download_file(
                document_redaction_bucket, s3_output_key, local_output_path
            )
        except ClientError as e:
            if e.response["Error"]["Code"] == "404":
                print("Log file does not exist in S3.")
            else:
                print(f"Unexpected error occurred: {e}")
        except (NoCredentialsError, PartialCredentialsError, TokenRetrievalError) as e:
            print(f"AWS credential issue encountered: {e}")
            print("Skipping S3 log file download.")

    if os.path.exists(local_output_path):
        print("Found log file in local path")
        job_df = pd.read_csv(local_output_path)

        if "job_date_time" in job_df.columns:
            job_df["job_date_time"] = pd.to_datetime(
                job_df["job_date_time"], errors="coerce"
            )

            # Keep only jobs submitted within the display window
            cutoff_time = pd.Timestamp.now() - pd.Timedelta(
                days=DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS
            )
            job_df = job_df.loc[job_df["job_date_time"] > cutoff_time, :]

    try:
        job_df = job_df[
            [
                "job_id",
                "file_name",
                "job_type",
                "signature_extraction",
                "job_date_time",
            ]
        ]
    except Exception as e:
        print(
            "Could not find one or more columns in Textract whole document list dataframe:",
            e,
        )

    return job_df


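# The job log CSV read above holds one row per submitted job, e.g. (illustrative row):
#
#     job_id,file_name,job_type,signature_extraction,job_date_time
#     0123456789abcdef,report,document_analysis,"['Extract signatures']",2024-01-01 12:00:00

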
def download_textract_output(
    job_id: str, output_bucket: str, output_prefix: str, local_folder: str
):
    """
    Checks the status of a Textract job and downloads the output ZIP file if the job is complete.

    :param job_id: The Textract job ID.
    :param output_bucket: The S3 bucket where the output is stored.
    :param output_prefix: The prefix (folder path) in S3 where the output file is stored.
    :param local_folder: The local directory where the ZIP file should be saved.
    """
    textract_client = boto3.client("textract")
    s3_client = boto3.client("s3")

    # Poll the job status, waiting between attempts so the loop does not hammer the API
    while True:
        response = textract_client.get_document_analysis(JobId=job_id)
        status = response["JobStatus"]

        if status == "SUCCEEDED":
            print("Job completed successfully.")
            break
        elif status == "FAILED":
            print(
                "Job failed:",
                response.get("StatusMessage", "No error message provided."),
            )
            return
        else:
            print(f"Job is still {status}.")
            # A fixed 10-second wait is assumed here as a reasonable polling interval
            time.sleep(10)

    # Download the output file from S3
    output_file_key = f"{output_prefix}/{job_id}.zip"
    local_file_path = secure_join(local_folder, f"{job_id}.zip")

    try:
        s3_client.download_file(output_bucket, output_file_key, local_file_path)
        print(f"Output file downloaded to: {local_file_path}")
    except Exception as e:
        print(f"Error downloading file: {e}")


def check_textract_outputs_exist(textract_output_found_checkbox):
    if textract_output_found_checkbox is True:
        print("Textract outputs found")
        return
    else:
        raise Exception(
            "Relevant Textract outputs not found. Please ensure you have selected the correct results output and that you have uploaded the relevant document file in 'Choose document or image file...' above"
        )