# Spaces: Sleeping (Hugging Face Spaces status lines captured by the notebook export — not code)
#/export | |
import os | |
import openpyxl as opxl | |
import PyPDF2 | |
from pathlib import Path | |
from pdf2image import convert_from_path | |
import numpy as np | |
from split import * | |
import fastai | |
from fastai.learner import load_learner | |
from fastai.vision.core import PILImage | |
import pandas as pd | |
from collections import OrderedDict | |
import re | |
from google.api_core.exceptions import InternalServerError | |
import shutil | |
from typing import Optional | |
from google.api_core.client_options import ClientOptions | |
from google.cloud import documentai # type: ignore | |
# Build a small random-sample PDF ("mini batch") for testing.
def make_mini_batch(infile, outfile, bs=15):
    """Write a PDF to `outfile` containing `bs` randomly chosen pages of `infile`.

    Pages are sampled without replacement, so `bs` must not exceed the
    page count of `infile`.
    """
    src = PyPDF2.PdfReader(infile)
    out = PyPDF2.PdfWriter()
    chosen = np.random.choice(len(src.pages), bs, replace=False)
    for idx in chosen:
        out.add_page(src.pages[int(idx)])
    out.write(outfile)
# Explode a multi-page PDF report into per-page PDFs, then convert each to an image.
def report_to_jpegs(filename, outfolder):
    """Split `filename` into one single-page PDF per page under `outfolder`,
    then convert the folder's contents to images via `folder_to_img` (from split).

    Fix: create the output directory with parents=True/exist_ok=True so a
    nested path works and a pre-existing directory no longer raises.
    """
    reader = PyPDF2.PdfReader(filename)
    path = Path(outfolder)
    path.mkdir(parents=True, exist_ok=True)
    for i, page in enumerate(reader.pages):
        writer = PyPDF2.PdfWriter()
        writer.add_page(page)
        writer.write(path / f'file{i}.pdf')
    folder_to_img(outfolder)
# Collect pages the classifier labels as class index 1 ("other") for later deletion.
def define_others(folder, classifier):
    """Return the paths under `folder` whose image the classifier predicts as index 1."""
    flagged = []
    for root, _, names in os.walk(folder):
        # Skip Jupyter checkpoint duplicates.
        if '.ipynb_checkpoints' in root:
            continue
        for name in names:
            full = os.path.join(root, name)
            _, pred_idx, _ = classifier.predict(PILImage.create(full))
            if pred_idx.item() == 1:
                flagged.append(full)
    return flagged
#Importing Boilerplate Documentai code to process a file | |
# [START documentai_process_document] | |
# [START documentai_process_document_processor_version] | |
# TODO(developer): Uncomment these variables before running the sample. | |
# project_id = "YOUR_PROJECT_ID" | |
# location = "YOUR_PROCESSOR_LOCATION" # Format is "us" or "eu" | |
# processor_id = "YOUR_PROCESSOR_ID" # Create processor before running sample | |
# file_path = "/path/to/local/pdf" | |
# mime_type = "application/pdf" # Refer to https://cloud.google.com/document-ai/docs/file-types for supported file types | |
# field_mask = "text,entities,pages.pageNumber" # Optional. The fields to return in the Document object. | |
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID" # Optional. Processor version to use | |
def process_document_sample(
    project_id: str,
    location: str,
    processor_id: str,
    file_path: str,
    mime_type: str,
    field_mask: Optional[str] = None,
    processor_version_id: Optional[str] = None,
) -> "documentai.Document":
    """Process a single local file with a Google Document AI processor.

    Args:
        project_id: GCP project that owns the processor.
        location: Processor location, e.g. "us" or "eu".
        processor_id: ID of an existing Document AI processor.
        file_path: Local path of the file to process.
        mime_type: MIME type of the file, e.g. "application/pdf" or "image/jpeg".
        field_mask: Optional comma-separated Document fields to return.
        processor_version_id: Optional processor version; defaults to the
            processor's default version when omitted.

    Returns:
        The parsed Document AI `Document`. (Fix: the annotation previously
        said `-> None` even though the function returns the document.)
    """
    # The api_endpoint must be set when using a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if processor_version_id:
        # projects/{project}/locations/{loc}/processors/{id}/processorVersions/{ver}
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # projects/{project}/locations/{loc}/processors/{id}
        name = client.processor_path(project_id, location, processor_id)

    # Read the file into memory.
    with open(file_path, "rb") as image:
        image_content = image.read()

    # Load binary data.
    raw_document = documentai.RawDocument(content=image_content, mime_type=mime_type)

    # NOTE(review): only page 1 is processed — fine for the single-page JPEGs
    # produced upstream; confirm before reusing this on multi-page PDFs.
    process_options = documentai.ProcessOptions(
        individual_page_selector=documentai.ProcessOptions.IndividualPageSelector(
            pages=[1]
        )
    )

    # Configure and send the process request.
    request = documentai.ProcessRequest(
        name=name,
        raw_document=raw_document,
        field_mask=field_mask,
        process_options=process_options,
    )
    result = client.process_document(request=request)

    # Full `Document` attribute list:
    # https://cloud.google.com/document-ai/docs/reference/rest/v1/Document
    return result.document
# [END documentai_process_document_processor_version]
# [END documentai_process_document]
# Run each file through Google OCR and assemble the requested fields into a DataFrame.
def extract_fields(files, fields=None):
    """OCR every path in `files` with Document AI and collect entity mentions.

    Args:
        files: Iterable of image file paths.
        fields: Entity type names to keep; becomes the DataFrame's columns.
            (Fix: was a mutable default argument `fields=[]`.)

    Returns:
        A DataFrame with one row per successfully processed file. Files that
        fail with an InternalServerError are reported and skipped.
    """
    fields = list(fields) if fields is not None else []
    rows = []  # Collected once, then built into a DataFrame — avoids O(n^2) concat.
    for file in files:
        try:
            # NOTE(review): project/processor IDs are hard-coded for this deployment.
            doc = process_document_sample(
                project_id="573919539759",
                location="us",
                processor_id="7b2493d94a089d26",
                processor_version_id="5e493494e810a1f3",
                file_path=file,
                mime_type="image/jpeg"
            )
            # One row per document: entity type -> mention text.
            row_data = {f: None for f in fields}
            for entity in doc.entities:
                if entity.type in row_data:
                    row_data[entity.type] = entity.mention_text
            rows.append(row_data)
        except InternalServerError:
            # Fix: guard against filenames with no digits (re.search -> None).
            m = re.search(r'\d+', file)
            page_num = m.group() if m else file
            print(f'There was an internal error processing page {page_num}')
    return pd.DataFrame(rows, columns=fields)
# Walk a folder of report images and OCR all of them into one DataFrame.
def dataframe_from_reports(folder, columns):
    """Return extract_fields() over every file under `folder`, skipping checkpoints."""
    paths = [
        os.path.join(root, name)
        for root, _, names in os.walk(folder)
        if '.ipynb_checkpoints' not in root
        for name in names
    ]
    return extract_fields(paths, columns)
# End-to-end pipeline: PDF report -> filtered JPEGs -> OCR -> styled Excel workbook.
def script(report, jpeg_foldername = 'images'):
    """Process `report` and return the path of the generated Excel file."""
    # Render every report page as an image under `jpeg_foldername`.
    report_to_jpegs(report, jpeg_foldername)
    # Classify pages and delete the irrelevant ones.
    learner = load_learner('pr_classifier.pkl')
    for irrelevant in define_others(jpeg_foldername, learner):
        Path(irrelevant).unlink()
    # Credentials for Document AI.
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'quantum-spring-421822-5b13d9d18bde.json'
    # Field names live in a comma-separated, quoted text file.
    with open('fields.txt', 'r') as fh:
        field_names = fh.read().strip().replace("'", "").split(',')
    field_names = [fn.replace(' ', '') for fn in field_names]
    raw_df = dataframe_from_reports(jpeg_foldername, field_names)
    processed = df_transform(raw_df)
    excel_file = 'out.xlsx'
    processed.to_excel(excel_file, sheet_name='Processed Data')
    # Append the unprocessed extraction as a second sheet.
    with pd.ExcelWriter(excel_file, engine='openpyxl', mode='a') as writer:
        raw_df.to_excel(writer, sheet_name='Raw Data')
    overwrite_styles(excel_file)
    # The intermediate images are no longer needed.
    shutil.rmtree(jpeg_foldername)
    return excel_file
def process_file(file):
    """Write uploaded bytes to a temporary PDF, run the pipeline, return the Excel path."""
    temp_file_path = 'temp_report.pdf'
    # Persist the upload so downstream readers can open it by path.
    with open(temp_file_path, 'wb') as tmp:
        tmp.write(file)
    excel_file_path = script(temp_file_path)
    # The temporary PDF is no longer needed.
    os.remove(temp_file_path)
    return excel_file_path
def split_names(name):
    """Split a full name into (first, last), dropping an optional middle initial.

    Returns ('', '') for None/NaN/empty input. If the two-word pattern does
    not match, the whole input is returned as the first name with '' last.

    Fix: the NaN check now runs *before* .strip(), so a float NaN no longer
    raises AttributeError.
    """
    if name is None or pd.isna(name):
        return '', ''
    name = name.strip()
    if name == '':
        return '', ''
    # First word, optional single-letter middle initial, last word.
    match = re.match(r'^([A-Za-z]+)\s([A-Za-z]\s)?([A-Za-z]+)', name)
    if match:
        return match.group(1), match.group(3)
    return name, ''
def extract_apt(address):
    """Return the apartment/unit designator from `address` (e.g. 'Apt 4B', '#7'),
    or '' when missing or the input is NaN.

    Fixes to the pattern:
    - '#' is matched without a preceding \\b (there is no word boundary between
      a space and '#', so the old '#' branch could never fire after a space);
    - '\\s*' instead of '\\s+' so forms like '#7' are recognized;
    - 'Apartment' is listed before 'Apt' so the longer keyword wins.
    """
    if pd.isna(address): return ''
    address = address.strip()
    pattern = r'(?:\b(?:Apartment|Apt|Unit|Suite)|#)\s*(\w+)\b'
    match = re.search(pattern, address, re.IGNORECASE)
    if match: return match.group(0)
    return ''
def liable(val):
    """True when the string form of `val` contains '25' (covers box 118a/118b values)."""
    return '25' in str(val)
# Shape the raw OCR DataFrame into the layout expected in the Excel output.
def df_transform(df):
    """Clean names/addresses, derive the Liable flag, then rename/reorder columns.

    Expects the box_* columns produced by extract_fields; relies on the
    module-level helpers split_names, extract_apt and liable.
    """
    # Blank out missing values and literal 'nan' strings left by the OCR step.
    df = df.fillna('').replace(to_replace='nan', value='')
    df['box_26_name1'] = df['box_26_name1'].astype(str)
    df['box_56_name1'] = df['box_56_name1'].astype(str)
    # First/last name columns for both parties.
    party1_names = df['box_26_name1'].apply(lambda n: pd.Series(split_names(n)))
    party2_names = df['box_56_name1'].apply(lambda n: pd.Series(split_names(n)))
    party1_names.columns = ['First Name 1', 'Last Name 1']
    party2_names.columns = ['First Name 2', 'Last Name 2']
    names = pd.concat([party1_names, party2_names], axis=1)
    # Apartment designators pulled out of each street address.
    apts = pd.concat(
        [df['box_27_street_address1'].apply(extract_apt),
         df['box_57_street_address2'].apply(extract_apt)],
        axis=1,
    )
    apts.columns = ['Apts 1', 'Apts 2']
    # Liable when either box 118a or 118b mentions '25'.
    df['Liable'] = df['box_118a'].apply(liable) | df['box_118b'].apply(liable)
    df = df.join([names, apts])
    df = df.drop(columns=['box_26_name1', 'box_56_name1', 'box_118a', 'box_118b',
                          'box_119a', 'box_119b'])
    df = df.rename(columns={'box_27_street_address1': 'Street Address 1',
                            'box_28_city1': 'City/Zip 1',
                            'box_57_street_address2': 'Street Address 2',
                            'box_58_city2': 'City/Zip 2',
                            'Police_Department_City': 'Incident City'})
    lead_cols = ['First Name 1', 'Last Name 1', 'Street Address 1', 'Apts 1', 'City/Zip 1',
                 'First Name 2', 'Last Name 2', 'Street Address 2', 'Apts 2', 'City/Zip 2',
                 'Incident City', 'Liable']
    trailing = [c for c in df.columns if c not in lead_cols]
    return df.reindex(columns=lead_cols + trailing)
# Auto-size every column in the workbook to roughly fit its longest cell value.
def fit_cols(filename):
    """Widen each column of `filename` based on its longest stringified cell, then save."""
    workbook = opxl.load_workbook(filename)
    for sheet in workbook:
        for column in sheet.columns:
            widest = max(len(str(cell.value)) for cell in column)
            letter = opxl.utils.get_column_letter(column[0].column)
            if widest > 0:
                # Heuristic padding so wrapped headers stay readable.
                sheet.column_dimensions[letter].width = widest * 1.25 + 0.2
    workbook.save(filename)
# Apply uniform borders and centered wrapping to every cell, then autofit columns.
def overwrite_styles(filename):
    """Give every cell of `filename` a thin border plus centered, wrapped text."""
    workbook = opxl.load_workbook(filename)
    centered = opxl.styles.Alignment(horizontal='center', wrapText=True)
    thin = opxl.styles.Side(style='thin')
    boxed = opxl.styles.Border(left=thin, right=thin, top=thin, bottom=thin)
    for sheet in workbook:
        for column in sheet.iter_cols():
            for cell in column:
                cell.border = boxed
                cell.alignment = centered
    workbook.save(filename)
    # Column widths are fit after styling so they account for final content.
    fit_cols(filename)