#| export
import os
import PyPDF2
from pathlib import Path
from pdf2image import convert_from_path
import numpy as np
from split import *
import fastai
from fastai.learner import load_learner
from fastai.vision.core import PILImage
import pandas as pd
from collections import OrderedDict
import re
from google.api_core.exceptions import InternalServerError
import shutil
from typing import Optional
from google.api_core.client_options import ClientOptions
from google.cloud import documentai  # type: ignore
# Make a mini report batch for testing
def make_mini_batch(infile, outfile, bs=15):
    reader = PyPDF2.PdfReader(infile)
    # Sample bs distinct pages (requires bs <= total page count)
    rand_pgs = list(np.random.choice(len(reader.pages), bs, replace=False))
    writer = PyPDF2.PdfWriter()
    for pg in rand_pgs:
        page = reader.pages[int(pg)]
        writer.add_page(page)
    writer.write(outfile)
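# Usage sketch (filenames are illustrative):
# make_mini_batch('full_report.pdf', 'mini_report.pdf', bs=15)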
# Now define a function that outputs a folder of individual .jpgs for a batch report
def report_to_jpegs(filename, outfolder):
    reader = PyPDF2.PdfReader(filename)
    path = Path(outfolder)
    if not path.exists():
        path.mkdir()
    for i, page in enumerate(reader.pages):
        writer = PyPDF2.PdfWriter()
        dest = (path/f'file{i}.pdf')
        writer.add_page(page)
        writer.write(dest)
    folder_to_img(outfolder)
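# `folder_to_img` comes from the local `split` module (star-imported above), whose
# source isn't shown here. A minimal sketch of the behavior this pipeline relies on,
# assuming it uses the `convert_from_path` import (pdf2image requires poppler):
#
# def folder_to_img(folder):
#     for pdf in Path(folder).glob('*.pdf'):
#         img = convert_from_path(pdf)[0]           # each file holds one page
#         img.save(pdf.with_suffix('.jpg'), 'JPEG')
#         pdf.unlink()                              # drop the intermediate PDF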
def define_others(folder, classifier):
    other_files = []  # A list of files to unlink
    for root, _, filelist in os.walk(folder):
        if '.ipynb_checkpoints' in root:
            continue
        for file in filelist:
            path = os.path.join(root, file)
            img = PILImage.create(path)
            _, idx, _ = classifier.predict(img)
            if idx.item() == 1:  # class index 1 = irrelevant ('other') page
                other_files.append(path)
    return other_files
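# Usage sketch (assumes the exported fastai model 'pr_classifier.pkl' is present):
# learn = load_learner('pr_classifier.pkl')
# to_drop = define_others('images', learn)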
# Boilerplate Document AI code to process a file
# [START documentai_process_document]
# [START documentai_process_document_processor_version]
# TODO(developer): Uncomment these variables before running the sample.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_PROCESSOR_LOCATION"  # Format is "us" or "eu"
# processor_id = "YOUR_PROCESSOR_ID"  # Create processor before running sample
# file_path = "/path/to/local/pdf"
# mime_type = "application/pdf"  # Refer to https://cloud.google.com/document-ai/docs/file-types for supported file types
# field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.
# processor_version_id = "YOUR_PROCESSOR_VERSION_ID"  # Optional. Processor version to use
def process_document_sample(
    project_id: str,
    location: str,
    processor_id: str,
    file_path: str,
    mime_type: str,
    field_mask: Optional[str] = None,
    processor_version_id: Optional[str] = None,
) -> documentai.Document:
    # You must set the `api_endpoint` if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)
    if processor_version_id:
        # The full resource name of the processor version, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}`
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # The full resource name of the processor, e.g.:
        # `projects/{project_id}/locations/{location}/processors/{processor_id}`
        name = client.processor_path(project_id, location, processor_id)
    # Read the file into memory
    with open(file_path, "rb") as image:
        image_content = image.read()
    # Load binary data
    raw_document = documentai.RawDocument(content=image_content, mime_type=mime_type)
    # For more information: https://cloud.google.com/document-ai/docs/reference/rest/v1/ProcessOptions
    # Optional: Additional configurations for processing.
    process_options = documentai.ProcessOptions(
        # Process only the first page (each input here is a single-page JPEG)
        individual_page_selector=documentai.ProcessOptions.IndividualPageSelector(
            pages=[1]
        )
    )
    # Configure the process request
    request = documentai.ProcessRequest(
        name=name,
        raw_document=raw_document,
        field_mask=field_mask,
        process_options=process_options,
    )
    result = client.process_document(request=request)
    # For a full list of `Document` object attributes, reference this page:
    # https://cloud.google.com/document-ai/docs/reference/rest/v1/Document
    document = result.document
    # Read the text recognition output from the processor
    # print("The document contains the following text:")
    # print(document.text)
    return document
# [END documentai_process_document_processor_version]
# [END documentai_process_document]
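# Example call with placeholder IDs (substitute a real project and processor;
# the file path is illustrative):
# doc = process_document_sample(
#     project_id="YOUR_PROJECT_ID",
#     location="us",
#     processor_id="YOUR_PROCESSOR_ID",
#     file_path="images/file0.jpg",
#     mime_type="image/jpeg",
# )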
# Function that takes in a list of filenames, runs each through Google OCR, and returns a pandas DataFrame of the data
def extract_fields(files, fields=None):
    fields = fields or []  # avoid a mutable default argument
    # Initialize an empty DataFrame with the specified fields as columns
    df = pd.DataFrame(columns=fields)
    for file in files:
        try:
            doc = process_document_sample(
                project_id="573919539759",
                location="us",
                processor_id="7b2493d94a089d26",
                file_path=file,
                mime_type="image/jpeg"
            )
            # Initialize a dictionary to hold the entity mentions for the current document
            row_data = {f: None for f in fields}
            for entity in doc.entities:
                # The Python client exposes the entity type as `type_`
                if entity.type_ in row_data:
                    row_data[entity.type_] = entity.mention_text
            # Convert the row data to a DataFrame and concatenate it
            df = pd.concat([df, pd.DataFrame([row_data])], ignore_index=True)
        except InternalServerError:
            # Files are named like 'file12.jpg', so the first digit run is the page number
            page_num = re.search(r'\d+', file).group()
            print(f'There was an internal error processing page {page_num}')
    return df
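# The field names must match the entity types defined in the Document AI
# processor's schema; illustrative example:
# df = extract_fields(['images/file0.jpg'], fields=['owner_name', 'sale_date'])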
def dataframe_from_reports(folder, columns):
    files = []
    for root, _, filelist in os.walk(folder):
        if '.ipynb_checkpoints' in root:
            continue
        for file in filelist:
            path = os.path.join(root, file)
            files.append(path)
    return extract_fields(files, columns)
# Script
def script(report, jpeg_foldername='images'):
    # First transform the report into a folder of individual images
    report_to_jpegs(report, jpeg_foldername)
    # Load our classifier and use it to find and delete irrelevant files
    classifier = load_learner('pr_classifier.pkl')
    others = define_others(jpeg_foldername, classifier)
    for o in others:
        Path(o).unlink()
    # Set credentials for using Document AI
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'quantum-spring-421822-5b13d9d18bde.json'
    # Read fields.txt to get the list of fields to extract
    with open('fields.txt', 'r') as file:
        fields = file.read().strip().replace("'", "").split(',')
    fields = [f.replace(' ', '') for f in fields]
    df = dataframe_from_reports(jpeg_foldername, fields)
    # Writing .xlsx requires an Excel backend such as openpyxl
    excel_file = 'out.xlsx'
    df.to_excel(excel_file, index=False)
    shutil.rmtree(jpeg_foldername)
    return excel_file
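# The parsing above expects fields.txt to hold one comma-separated line, with
# quotes and spaces stripped out; illustrative contents:
# 'owner_name', 'sale_date', 'sale_price'
# End-to-end usage sketch: excel_path = script('mini_report.pdf')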
def process_file(file):
    # Save the uploaded file (raw bytes) to a temporary location
    temp_file_path = 'temp_report.pdf'
    with open(temp_file_path, 'wb') as temp_file:
        temp_file.write(file)
    # Run the script and get the path to the Excel file
    excel_file_path = script(temp_file_path)
    # Clean up the temporary file
    os.remove(temp_file_path)
    return excel_file_path
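# This module appears to back a Hugging Face Space. Gradio is an assumption
# (it isn't imported above); a minimal sketch of wiring process_file to a UI:
#
# import gradio as gr
# demo = gr.Interface(
#     fn=process_file,
#     inputs=gr.File(type="binary", label="PDF report"),
#     outputs=gr.File(label="Extracted fields (.xlsx)"),
# )
# demo.launch()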