# Report-Parser / fns.py
#| export
import os
import openpyxl as opxl
import PyPDF2
from pathlib import Path
from pdf2image import convert_from_path
import numpy as np
from split import *  # local module; provides folder_to_img used below
import fastai
from fastai.learner import load_learner
from fastai.vision.core import PILImage
import pandas as pd
from collections import OrderedDict
import re
from google.api_core.exceptions import InternalServerError
import shutil
from typing import Optional
from google.api_core.client_options import ClientOptions
from google.cloud import documentai # type: ignore
# Make a mini report batch for testing
def make_mini_batch(infile, outfile, bs=15):
    reader = PyPDF2.PdfReader(infile)
    rand_pgs = list(np.random.choice(len(reader.pages), bs, replace=False))
    writer = PyPDF2.PdfWriter()
    for pg in rand_pgs:
        page = reader.pages[int(pg)]
        writer.add_page(page)
    writer.write(outfile)
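# Illustrative usage (a sketch; both filenames are placeholders, not files in this repo):
#   make_mini_batch('full_report.pdf', 'mini_report.pdf', bs=15)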
# Now define a function that outputs a folder of individual .jpgs for a batch report
def report_to_jpegs(filename, outfolder):
    reader = PyPDF2.PdfReader(filename)
    path = Path(outfolder)
    if not path.exists():
        path.mkdir()
    # Write each page out as a single-page PDF, then convert the whole folder to images
    for i, page in enumerate(reader.pages):
        writer = PyPDF2.PdfWriter()
        dest = path/f'file{i}.pdf'
        writer.add_page(page)
        writer.write(dest)
    folder_to_img(outfolder)
def define_others(folder, classifier):
    other_files = []  # A list of files to unlink
    for root, _, filelist in os.walk(folder):
        if '.ipynb_checkpoints' in root:
            continue
        for file in filelist:
            path = os.path.join(root, file)
            img = PILImage.create(path)
            _, idx, _ = classifier.predict(img)
            # Class index 1 marks pages the classifier considers irrelevant ('other')
            if idx.item() == 1:
                other_files.append(path)
    return other_files
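# Illustrative usage (a sketch; 'images' is a placeholder folder produced by
# report_to_jpegs, and pr_classifier.pkl is the model loaded in script() below):
#   learn = load_learner('pr_classifier.pkl')
#   for p in define_others('images', learn):
#       Path(p).unlink()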
# Boilerplate Document AI code to process a file
# [START documentai_process_document]
# [START documentai_process_document_processor_version]
# TODO(developer): Uncomment these variables before running the sample.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_PROCESSOR_LOCATION" # Format is "us" or "eu"
# processor_id = "YOUR_PROCESSOR_ID" # Create processor before running sample
# file_path = "/path/to/local/pdf"
# mime_type = "application/pdf" # Refer to https://cloud.google.com/document-ai/docs/file-types for supported file types
# field_mask = "text,entities,pages.pageNumber" # Optional. The fields to return in the Document object.
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID" # Optional. Processor version to use
def process_document_sample(
    project_id: str,
    location: str,
    processor_id: str,
    file_path: str,
    mime_type: str,
    field_mask: Optional[str] = None,
    processor_version_id: Optional[str] = None,
) -> documentai.Document:
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)
    if processor_version_id:
        # The full resource name of the processor version, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}`
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # The full resource name of the processor, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}`
        name = client.processor_path(project_id, location, processor_id)
    # Read the file into memory
    with open(file_path, "rb") as image:
        image_content = image.read()
    # Load binary data
    raw_document = documentai.RawDocument(content=image_content, mime_type=mime_type)
    # For more information: https://cloud.google.com/document-ai/docs/reference/rest/v1/ProcessOptions
    # Optional: Additional configurations for processing.
    process_options = documentai.ProcessOptions(
        # Process only the first page (this pipeline feeds single-page JPEGs)
        individual_page_selector=documentai.ProcessOptions.IndividualPageSelector(
            pages=[1]
        )
    )
    # Configure the process request
    request = documentai.ProcessRequest(
        name=name,
        raw_document=raw_document,
        field_mask=field_mask,
        process_options=process_options,
    )
    result = client.process_document(request=request)
    # For a full list of `Document` object attributes, reference this page:
    # https://cloud.google.com/document-ai/docs/reference/rest/v1/Document
    document = result.document
    # Read the text recognition output from the processor
    # print("The document contains the following text:")
    # print(document.text)
    return document
# [END documentai_process_document_processor_version]
# [END documentai_process_document]
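# Illustrative call (a sketch; the resource IDs are the placeholders from the
# TODO block above and must be replaced with real Document AI values):
#   doc = process_document_sample(
#       project_id="YOUR_PROJECT_ID",
#       location="us",
#       processor_id="YOUR_PROCESSOR_ID",
#       file_path="page.jpg",
#       mime_type="image/jpeg",
#   )
#   print(doc.text)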
# Take a list of image filenames, run each through the Document AI OCR processor,
# and return a pandas DataFrame of the extracted fields
def extract_fields(files, fields=[]):
    # Initialize an empty DataFrame with the specified fields as columns
    df = pd.DataFrame(columns=fields)
    for file in files:
        try:
            doc = process_document_sample(
                project_id="573919539759",
                location="us",
                processor_id="7b2493d94a089d26",
                processor_version_id="5e493494e810a1f3",
                file_path=file,
                mime_type="image/jpeg"
            )
            # Initialize a dictionary to hold the entity mentions for the current document
            row_data = {f: None for f in fields}
            # The Python client exposes the entity's `type` proto field as `type_`
            for entity in doc.entities:
                if entity.type_ in row_data:
                    row_data[entity.type_] = entity.mention_text
            # Convert the row data to a DataFrame and concatenate it
            df = pd.concat([df, pd.DataFrame([row_data])], ignore_index=True)
        except InternalServerError:
            # Recover the page number from the filename (e.g. 'file12.jpg' -> '12')
            match = re.search(r'\d+', file)
            page_num = match.group() if match else file
            print(f'There was an internal error processing page {page_num}')
    return df
def dataframe_from_reports(folder, columns):
    files = []
    for root, _, filelist in os.walk(folder):
        if '.ipynb_checkpoints' in root:
            continue
        for file in filelist:
            path = os.path.join(root, file)
            files.append(path)
    return extract_fields(files, columns)
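# Illustrative usage (a sketch; the folder and field names are placeholders --
# the real field list is read from fields.txt in script() below):
#   df = dataframe_from_reports('images', ['box_26_name1', 'box_27_street_address1'])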
# End-to-end script: PDF report -> classified page images -> OCR -> styled Excel workbook
def script(report, jpeg_foldername='images'):
    # First transform report to a folder of individual images
    report_to_jpegs(report, jpeg_foldername)
    # Load in our classifier and use it to define and delete irrelevant files
    classifier = load_learner('pr_classifier.pkl')
    others = define_others(jpeg_foldername, classifier)
    for o in others:
        Path(o).unlink()
    # Set credentials for using documentai
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'quantum-spring-421822-5b13d9d18bde.json'
    # Read fields.txt to get the list of fields to extract
    with open('fields.txt', 'r') as file:
        fields = file.read().strip().replace("'", "").split(',')
    fields = [f.replace(' ', '') for f in fields]
    df = dataframe_from_reports(jpeg_foldername, fields)
    processed = df_transform(df)
    excel_file = 'out.xlsx'
    processed.to_excel(excel_file, sheet_name='Processed Data')
    # Append the raw extraction as a second sheet
    with pd.ExcelWriter(excel_file, engine='openpyxl', mode='a') as writer:
        df.to_excel(writer, sheet_name='Raw Data')
    overwrite_styles(excel_file)
    shutil.rmtree(jpeg_foldername)
    return excel_file
def process_file(file):
    # Save the uploaded file to a temporary location
    temp_file_path = 'temp_report.pdf'
    with open(temp_file_path, 'wb') as temp_file:
        temp_file.write(file)
    # Run the script and get the path to the Excel file
    excel_file_path = script(temp_file_path)
    # Clean up the temporary file
    os.remove(temp_file_path)
    return excel_file_path
def split_names(name):
    if name is not None:
        name = name.strip()
    if pd.isna(name) or name == '':
        return '', ''
    # First word, optional middle initial (discarded), last word
    full_pattern = r'^([A-Za-z]+)\s([A-Za-z]\s)?([A-Za-z]+)'
    match = re.match(full_pattern, name)
    if match:
        return match.group(1), match.group(3)
    else:
        return name, ''
def extract_apt(address):
    if pd.isna(address):
        return ''
    address = address.strip()
    pattern = r'\b(?:Apt|Apartment|#|Unit|Suite)\s+(\w+)\b'
    match = re.search(pattern, address, re.IGNORECASE)
    if match:
        return match.group(0)
    return ''
def liable(val):
    # True if the box value contains '25'; applied to both box_118a and box_118b
    return '25' in str(val)
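# Expected behavior of the three helpers above (illustrative values):
#   split_names('John Q Public')      -> ('John', 'Public')  # middle initial dropped
#   split_names('Cher')               -> ('Cher', '')        # no match: name passes through
#   extract_apt('12 Main St Apt 4B')  -> 'Apt 4B'
#   liable('25')  -> True
#   liable('118') -> False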
def df_transform(df):
    # Split first and last name, extract the apartment number where possible,
    # and set Liable=True when '25' appears in box_118a or box_118b
    df = df.fillna('')
    df = df.replace(to_replace='nan', value='')
    df['box_26_name1'], df['box_56_name1'] = df['box_26_name1'].astype(str), df['box_56_name1'].astype(str)
    names1_split = df['box_26_name1'].apply(lambda x: pd.Series(split_names(x)))
    names2_split = df['box_56_name1'].apply(lambda x: pd.Series(split_names(x)))
    names1_split.columns = ['First Name 1', 'Last Name 1']
    names2_split.columns = ['First Name 2', 'Last Name 2']
    names = pd.concat([names1_split, names2_split], axis=1)
    apts1 = df['box_27_street_address1'].apply(extract_apt)
    apts2 = df['box_57_street_address2'].apply(extract_apt)
    apts = pd.concat([apts1, apts2], axis=1)
    apts.columns = ['Apts 1', 'Apts 2']
    liable_a, liable_b = df['box_118a'].apply(liable), df['box_118b'].apply(liable)
    df['Liable'] = liable_a | liable_b
    df = df.join([names, apts])
    df = df.drop(columns=['box_26_name1', 'box_56_name1', 'box_118a', 'box_118b', 'box_119a', 'box_119b'])
    df = df.rename(columns={'box_27_street_address1': 'Street Address 1', 'box_28_city1': 'City/Zip 1',
                            'box_57_street_address2': 'Street Address 2', 'box_58_city2': 'City/Zip 2',
                            'Police_Department_City': 'Incident City'})
    # Put the renamed columns first, then whatever other fields were extracted
    col_order = ['First Name 1', 'Last Name 1', 'Street Address 1', 'Apts 1', 'City/Zip 1',
                 'First Name 2', 'Last Name 2', 'Street Address 2', 'Apts 2', 'City/Zip 2',
                 'Incident City', 'Liable']
    remaining = [col for col in df.columns if col not in col_order]
    df = df.reindex(columns=col_order + remaining)
    return df
def fit_cols(filename):
    wb = opxl.load_workbook(filename)
    for ws in wb:
        for col_cells in ws.columns:
            newlen = max(len(str(cell.value)) for cell in col_cells)
            col_letter = opxl.utils.get_column_letter(col_cells[0].column)
            if newlen > 0:
                ws.column_dimensions[col_letter].width = newlen*1.25 + 0.2
    wb.save(filename)
def overwrite_styles(filename):
    wb = opxl.load_workbook(filename)
    center_align = opxl.styles.Alignment(
        horizontal='center', wrapText=True
    )
    border = opxl.styles.Border(
        left=opxl.styles.Side(style='thin'),
        right=opxl.styles.Side(style='thin'),
        top=opxl.styles.Side(style='thin'),
        bottom=opxl.styles.Side(style='thin')
    )
    for ws in wb:
        for col in ws.iter_cols():
            for cell in col:
                cell.border = border
                cell.alignment = center_align
    wb.save(filename)
    fit_cols(filename)
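# Minimal end-to-end sketch: run the pipeline on a local PDF. Assumes
# pr_classifier.pkl, fields.txt, and the credentials JSON referenced in
# script() sit next to this file; 'report.pdf' is a placeholder filename.
if __name__ == '__main__':
    with open('report.pdf', 'rb') as f:
        print(f'Wrote {process_file(f.read())}')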