#/export
import os
import openpyxl as opxl
import PyPDF2
from pathlib import Path
from pdf2image import convert_from_path
import numpy as np
from split import *
import fastai
from fastai.learner import load_learner
from fastai.vision.core import PILImage
import pandas as pd
from collections import OrderedDict
import re
from google.api_core.exceptions import InternalServerError
import shutil
from typing import Optional
from google.api_core.client_options import ClientOptions
from google.cloud import documentai  # type: ignore


# Make a mini report batch for testing
def make_mini_batch(infile, outfile, bs=15):
    reader = PyPDF2.PdfReader(infile)
    rand_pgs = list(np.random.choice(len(reader.pages), bs, replace=False))
    writer = PyPDF2.PdfWriter()
    for pg in rand_pgs:
        page = reader.pages[int(pg)]
        writer.add_page(page)
    writer.write(outfile)


# Split a batch report into a folder of individual .jpgs
def report_to_jpegs(filename, outfolder):
    reader = PyPDF2.PdfReader(filename)
    path = Path(outfolder)
    if not path.exists():
        path.mkdir()
    for i, page in enumerate(reader.pages):
        writer = PyPDF2.PdfWriter()
        dest = path / f'file{i}.pdf'
        writer.add_page(page)
        writer.write(dest)
    folder_to_img(outfolder)


# Classify every image in a folder and return the paths predicted as irrelevant ("other")
def define_others(folder, classifier):
    other_files = []  # A list of files to unlink
    for root, _, filelist in os.walk(folder):
        if '.ipynb_checkpoints' in root:
            continue
        for file in filelist:
            path = os.path.join(root, file)
            img = PILImage.create(path)
            _, idx, _ = classifier.predict(img)
            if idx.item() == 1:
                other_files.append(path)
    return other_files


# Boilerplate Document AI code to process a single file
# [START documentai_process_document]
# [START documentai_process_document_processor_version]

# TODO(developer): Uncomment these variables before running the sample.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_PROCESSOR_LOCATION"  # Format is "us" or "eu"
# processor_id = "YOUR_PROCESSOR_ID"  # Create processor before running sample
# file_path = "/path/to/local/pdf"
# mime_type = "application/pdf"  # Refer to https://cloud.google.com/document-ai/docs/file-types for supported file types
# field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID"  # Optional. Processor version to use


def process_document_sample(
    project_id: str,
    location: str,
    processor_id: str,
    file_path: str,
    mime_type: str,
    field_mask: Optional[str] = None,
    processor_version_id: Optional[str] = None,
) -> documentai.Document:
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if processor_version_id:
        # The full resource name of the processor version, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}`
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # The full resource name of the processor, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}`
        name = client.processor_path(project_id, location, processor_id)

    # Read the file into memory
    with open(file_path, "rb") as image:
        image_content = image.read()

    # Load binary data
    raw_document = documentai.RawDocument(content=image_content, mime_type=mime_type)

    # For more information: https://cloud.google.com/document-ai/docs/reference/rest/v1/ProcessOptions
    # Optional: Additional configurations for processing.
    process_options = documentai.ProcessOptions(
        # Process only specific pages
        individual_page_selector=documentai.ProcessOptions.IndividualPageSelector(
            pages=[1]
        )
    )

    # Configure the process request
    request = documentai.ProcessRequest(
        name=name,
        raw_document=raw_document,
        field_mask=field_mask,
        process_options=process_options,
    )

    result = client.process_document(request=request)

    # For a full list of `Document` object attributes, reference this page:
    # https://cloud.google.com/document-ai/docs/reference/rest/v1/Document
    document = result.document

    # Read the text recognition output from the processor
    # print("The document contains the following text:")
    # print(document.text)
    return document

# [END documentai_process_document_processor_version]
# [END documentai_process_document]


# Run each file in a list through Google OCR and return a pandas DataFrame of the extracted fields
def extract_fields(files, fields=[]):
    # Initialize an empty DataFrame with the specified fields as columns
    df = pd.DataFrame(columns=fields)
    for file in files:
        try:
            doc = process_document_sample(
                project_id="573919539759",
                location="us",
                processor_id="7b2493d94a089d26",
                processor_version_id="5e493494e810a1f3",
                file_path=file,
                mime_type="image/jpeg"
            )
            # Collect the entity mentions for the current document
            row_data = {f: None for f in fields}
            for entity in doc.entities:
                if entity.type in row_data:
                    row_data[entity.type] = entity.mention_text
            # Convert the row data to a DataFrame and concatenate it
            df = pd.concat([df, pd.DataFrame([row_data])], ignore_index=True)
        except InternalServerError:
            page_num = re.search(r'\d+', file).group()
            print(f'There was an internal error processing page {page_num}')
    return df


# Walk a folder of report images and extract the requested columns from each one
def dataframe_from_reports(folder, columns):
    files = []
    for root, _, filelist in os.walk(folder):
        if '.ipynb_checkpoints' in root:
            continue
        for file in filelist:
            path = os.path.join(root, file)
            files.append(path)
    return extract_fields(files, columns)


# A quick check for whether we are running in the Hugging Face Space or not
def in_space():
    return 'SPACE_ID' in os.environ or 'HUGGINGFACE_SPACE_REPOSITORY' in os.environ


# Main script: PDF report in, styled Excel workbook out
def script(report, jpeg_foldername='images'):
    # First transform the report into a folder of individual images
    report_to_jpegs(report, jpeg_foldername)
    # Load in our classifier and use it to find and delete irrelevant files
    classifier = load_learner('pr_classifier.pkl')
    others = define_others(jpeg_foldername, classifier)
    for o in others:
        Path(o).unlink()
    # Set credentials for using Document AI
    if in_space():
        # Get the secret and write it to a file so the client library can use it
        creds = os.getenv('API_JSON_STR')
        with open('temp.json', 'w') as file:
            file.write(creds)
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'temp.json'
    else:
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'quantum-spring-421822-9be9922d589f.json'
    # Read fields.txt to get the fields variable
    with open('fields.txt', 'r') as file:
        fields = file.read().strip().replace("'", "").split(',')
    fields = [f.replace(' ', '') for f in fields]
    df = dataframe_from_reports(jpeg_foldername, fields)
    processed = unpivot_df(df_transform(df))
    excel_file = 'out.xlsx'
    processed.to_excel(excel_file, sheet_name='Processed Data')
    with pd.ExcelWriter(excel_file, engine='openpyxl', mode='a') as writer:
        df.to_excel(writer, sheet_name='Raw Data')
    overwrite_styles(excel_file)
    shutil.rmtree(jpeg_foldername)
    return excel_file

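# For reference: `script` expects a local fields.txt holding a comma-separated list
# of the Document AI entity types to extract. Based on the columns handled in
# df_transform below, its contents would look roughly like the following
# (an illustration, not the actual file):
#   'box_26_name1', 'box_27_street_address1', 'box_28_city1', 'box_56_name1',
#   'box_57_street_address2', 'box_58_city2', 'box_118a', 'box_118b',
#   'box_119a', 'box_119b', 'Police_Department_City'
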

# Save an uploaded file to a temporary PDF, run the pipeline, and return the Excel path
def process_file(file):
    # Save the uploaded file to a temporary location
    temp_file_path = 'temp_report.pdf'
    with open(temp_file_path, 'wb') as temp_file:
        temp_file.write(file)
    # Run the script and get the path to the Excel file
    excel_file_path = script(temp_file_path)
    # Clean up the temporary file
    os.remove(temp_file_path)
    return excel_file_path


# Split a raw name string into (first, last), dropping any middle initial
def split_names(name):
    if name is not None:
        name = name.strip()
    if pd.isna(name) or name == '':
        return '', ''
    full_pattern = r'^([A-Za-z]+)\s([A-Za-z]\s)?([A-Za-z]+)'
    match = re.match(full_pattern, name)
    if match:
        return match.group(1), match.group(3)
    else:
        return name, ''


# Pull an apartment/unit designation (e.g. "Apt 4B") out of a street address, if present
def extract_apt(address):
    if pd.isna(address):
        return ''
    address = address.strip()
    pattern = r'\b(?:Apt|Apartment|#|Unit|Suite)\s+(\w+)\b'
    match = re.search(pattern, address, re.IGNORECASE)
    if match:
        return match.group(0)
    return ''


def liable(val):
    # Going to take both 118a or 118b
    return '25' in str(val)


def df_transform(df):
    # TODO: Split up first and last name for all
    # TODO: Extract out Apt # if possible
    # TODO: If there's 25 in A or B, set Liable=True, else False
    # df['box_26_name1'], df['box_56_name1'] = df['box_26_name1'].fillna('').fillnan(''), df['box_56_name1'].fillna('').fillnan('')
    df = df.fillna('')
    df = df.replace(to_replace='nan', value='')
    df['box_26_name1'], df['box_56_name1'] = df['box_26_name1'].astype(str), df['box_56_name1'].astype(str)
    names1_split = df['box_26_name1'].apply(lambda x: pd.Series(split_names(x)))
    names2_split = df['box_56_name1'].apply(lambda x: pd.Series(split_names(x)))
    names1_split.columns = ['First Name 1', 'Last Name 1']
    names2_split.columns = ['First Name 2', 'Last Name 2']
    names = pd.concat([names1_split, names2_split], axis=1)
    apts1 = df['box_27_street_address1'].apply(lambda x: extract_apt(x))
    apts2 = df['box_57_street_address2'].apply(lambda x: extract_apt(x))
    apts = pd.concat([apts1, apts2], axis=1)
    apts.columns = ['Apts 1', 'Apts 2']
    liable_a, liable_b = df['box_118a'].apply(lambda x: liable(x)), df['box_118b'].apply(lambda x: liable(x))
    df['Liable'] = liable_a | liable_b
    df = df.join([names, apts])
    df = df.drop(columns=['box_26_name1', 'box_56_name1', 'box_118a', 'box_118b', 'box_119a', 'box_119b'])
    df = df.rename(columns={'box_27_street_address1': 'Street Address 1', 'box_28_city1': 'City/Zip 1',
                            'box_57_street_address2': 'Street Address 2', 'box_58_city2': 'City/Zip 2',
                            'Police_Department_City': 'Incident City'})
    col_order = ['First Name 1', 'Last Name 1', 'Street Address 1', 'Apts 1', 'City/Zip 1',
                 'First Name 2', 'Last Name 2', 'Street Address 2', 'Apts 2', 'City/Zip 2',
                 'Incident City', 'Liable']
    remaining = [col for col in df.columns if col not in col_order]
    df = df.reindex(columns=col_order + remaining)
    return df

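# Quick illustration of the parsing helpers above, on made-up example strings:
#   split_names('JOHN Q PUBLIC')      -> ('JOHN', 'PUBLIC')   # middle initial dropped
#   extract_apt('123 MAIN ST Apt 4B') -> 'Apt 4B'
#   liable('25')  -> True
#   liable('118') -> False
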

# Expects the dataframe returned from df_transform
def unpivot_df(df):
    # Drop the stray index column if present
    if 'Unnamed: 0' in df.columns:
        df = df.drop(columns='Unnamed: 0')
    # First we separate the two people into their own dataframes
    df1 = df[['First Name 1', 'Last Name 1', 'Street Address 1', 'Apts 1', 'City/Zip 1', 'Incident City', 'Liable']].copy()
    df2 = df[['First Name 2', 'Last Name 2', 'Street Address 2', 'Apts 2', 'City/Zip 2', 'Incident City', 'Liable']].copy()
    # If the first person wasn't liable then this one is
    df2['Liable'] = ~df2['Liable']
    # Drop any rows where the person isn't captured; names are empty strings
    # (not NaN) after df_transform's fillna(''), so filter on '' rather than dropna
    df1 = df1[(df1['First Name 1'] != '') | (df1['Last Name 1'] != '')]
    df2 = df2[(df2['First Name 2'] != '') | (df2['Last Name 2'] != '')]
    df1.rename(columns={'First Name 1': 'First Name', 'Last Name 1': 'Last Name', 'Street Address 1': 'Street Address',
                        'Apts 1': 'Apts', 'City/Zip 1': 'City/Zip'}, inplace=True)
    df2.rename(columns={'First Name 2': 'First Name', 'Last Name 2': 'Last Name', 'Street Address 2': 'Street Address',
                        'Apts 2': 'Apts', 'City/Zip 2': 'City/Zip'}, inplace=True)
    return pd.concat([df1, df2])


# Auto-fit column widths to the longest value in each column
def fit_cols(filename):
    wb = opxl.load_workbook(filename)
    for ws in wb:
        for col_cells in ws.columns:
            newlen = max(len(str(cell.value)) for cell in col_cells)
            col_letter = opxl.utils.get_column_letter(col_cells[0].column)
            if newlen > 0:
                ws.column_dimensions[col_letter].width = newlen * 1.25 + 0.2
    wb.save(filename)


# Apply borders and centered alignment to every cell, then fit the column widths
def overwrite_styles(filename):
    wb = opxl.load_workbook(filename)
    center_align = opxl.styles.Alignment(horizontal='center', wrapText=True)
    border = opxl.styles.Border(
        left=opxl.styles.Side(style='thin'),
        right=opxl.styles.Side(style='thin'),
        top=opxl.styles.Side(style='thin'),
        bottom=opxl.styles.Side(style='thin')
    )
    for ws in wb:
        for col in ws.iter_cols():
            for cell in col:
                cell.border = border
                cell.alignment = center_align
    wb.save(filename)
    fit_cols(filename)
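
# Minimal end-to-end sketch, guarded so it only runs when this module is executed
# directly. 'sample_report.pdf' is a hypothetical filename; the 'pr_classifier.pkl',
# 'fields.txt', and Google credential files that `script` expects must also be
# present in the working directory.
if __name__ == '__main__':
    with open('sample_report.pdf', 'rb') as f:
        excel_path = process_file(f.read())
    print(f'Wrote {excel_path}')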