PDFtoCSVForFD / app.py
danial0203's picture
Update app.py
562d36e verified
raw
history blame
4.39 kB
import os
from datasets.filesystems import S3FileSystem
import s3fs
import boto3
from pdf2image import convert_from_path
import csv
from PIL import Image
import gradio as gr
# AWS and S3 initialization from environment variables.
# The variables read here (AWS_ACCESS_KEY / AWS_SECRET_KEY) are this app's own
# names, not the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY pair that boto3
# picks up by itself, so the values are handed to each client explicitly.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY')
aws_secret_access_key = os.getenv('AWS_SECRET_KEY')
region_name = os.getenv('AWS_REGION')
s3_bucket = os.getenv('AWS_BUCKET')

# NOTE(review): this rebinds the name `s3fs`, shadowing the `s3fs` module
# imported at the top of the file. The name is kept because upload_file_to_s3
# refers to the filesystem object through it.
s3fs = S3FileSystem(key=aws_access_key_id, secret=aws_secret_access_key, region=region_name)

# Pass the same credentials to Textract; when a value is None boto3 falls back
# to its default credential chain, so existing deployments keep working.
textract_client = boto3.client(
    'textract',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
def upload_file_to_s3(file_path, bucket, object_name=None):
    """Copy a local file into an S3 bucket via the module-level filesystem.

    Args:
        file_path: Path of the local file to upload.
        bucket: Name of the destination bucket.
        object_name: Key to store the file under; defaults to the base name
            of ``file_path``.

    Returns:
        The object key that was used, or ``None`` when a FileNotFoundError
        was raised (a message is printed in that case).
    """
    key = os.path.basename(file_path) if object_name is None else object_name
    destination = f"s3://{bucket}/{key}"
    try:
        # The handle itself is not used; opening the file is what surfaces a
        # missing local path before the upload is attempted.
        with open(file_path, "rb"):
            s3fs.put(file_path, destination)
    except FileNotFoundError:
        print("The file was not found")
        return None
    return key
def process_image(file_path, s3_bucket, textract_client):
    """Upload an image to S3 and run Textract table analysis on it.

    Args:
        file_path: Local path of the image to analyse.
        s3_bucket: Bucket the image is uploaded to and read back from.
        textract_client: Client object exposing ``analyze_document``.

    Returns:
        The ``analyze_document`` response, or ``None`` when the upload did
        not produce an object key.
    """
    uploaded_key = upload_file_to_s3(file_path, s3_bucket)
    if uploaded_key:
        s3_location = {'Bucket': s3_bucket, 'Name': uploaded_key}
        return textract_client.analyze_document(
            Document={'S3Object': s3_location},
            FeatureTypes=["TABLES"],
        )
    return None
def generate_table_csv(tables, blocks_map, csv_output_path):
    """Write every table in *tables* to *csv_output_path* as CSV rows.

    The file is created (or truncated) even when *tables* is empty. Tables
    are written one after another with no separator row between them.

    Args:
        tables: Iterable of table blocks, each accepted by
            ``get_rows_columns_map``.
        blocks_map: Mapping of block id to block, used to resolve cell text.
        csv_output_path: Destination path of the CSV file.
    """
    # Explicit UTF-8 so extracted non-ASCII text does not depend on the
    # platform's default encoding.
    with open(csv_output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        for table in tables:
            rows = get_rows_columns_map(table, blocks_map)
            # Emit rows by ascending row index instead of relying on the
            # order in which cells happened to be listed in the table block.
            for row_index in sorted(rows):
                cols = rows[row_index]
                # Columns run from 1 to the highest index seen in this row;
                # gaps become empty fields so later cells stay aligned.
                writer.writerow(
                    [cols.get(col_index, "") for col_index in range(1, max(cols) + 1)]
                )
def get_rows_columns_map(table_result, blocks_map):
    """Group the text of a table's cells by row index and column index.

    Args:
        table_result: Table block; its 'CHILD' relationships list the ids of
            the blocks that belong to the table.
        blocks_map: Mapping of block id to block.

    Returns:
        ``{row_index: {column_index: text}}`` for every child block that
        carries both 'RowIndex' and 'ColumnIndex'. Empty when the table has
        no relationships.

    Raises:
        KeyError: If a referenced child id is missing from *blocks_map*.
    """
    rows = {}
    # Tolerate a table block without a 'Relationships' entry (as get_text
    # already does for cells) instead of failing with KeyError.
    for relationship in table_result.get('Relationships') or ():
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            cell = blocks_map[child_id]
            # Children without grid coordinates are not cells; skip them.
            if 'RowIndex' in cell and 'ColumnIndex' in cell:
                row = rows.setdefault(cell['RowIndex'], {})
                row[cell['ColumnIndex']] = get_text(cell, blocks_map)
    return rows
def get_text(result, blocks_map):
    """Assemble the visible text of a block from its child blocks.

    Each 'WORD' child contributes its 'Text'; each 'SELECTION_ELEMENT' child
    whose 'SelectionStatus' is 'SELECTED' contributes an ``X``. Pieces are
    separated by single spaces and the result is stripped of surrounding
    whitespace. A block without 'Relationships' yields an empty string.

    Args:
        result: Block whose 'CHILD' relationships list the child ids.
        blocks_map: Mapping of block id to block.
    """
    pieces = []
    for relationship in result.get('Relationships', []) if 'Relationships' in result else []:
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            child = blocks_map[child_id]
            kind = child['BlockType']
            if kind == 'WORD':
                pieces.append(child['Text'])
            elif kind == 'SELECTION_ELEMENT' and child['SelectionStatus'] == 'SELECTED':
                pieces.append('X')
    return ' '.join(pieces).strip()
def _load_page_images(file_path):
    """Return the pages of *file_path* as a list of RGB PIL images."""
    if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.tif', '.tiff')):
        pages = []
        with Image.open(file_path) as source:
            # Walk every frame so a multi-page TIFF is not cut down to its
            # first page; single-frame formats simply yield one image.
            for frame_index in range(getattr(source, 'n_frames', 1)):
                source.seek(frame_index)
                # convert() returns a separate image, so the page stays
                # usable after `source` is closed, and RGB is always
                # writable as JPEG (RGBA / palette PNGs are not).
                pages.append(source.convert('RGB'))
        return pages
    # Anything else is treated as a PDF and rasterised by pdf2image.
    return convert_from_path(file_path)


def process_file_and_generate_csv(file_path):
    """Extract the tables of an uploaded document into a single CSV file.

    Every page is saved as a JPEG under /tmp, sent through ``process_image``
    and its table blocks are collected; the CSV is written once, after all
    pages have been analysed, so tables from every page end up in the file.

    Args:
        file_path: Path of the uploaded file (PDF, PNG, JPEG or TIFF).

    Returns:
        A ``(csv_path, status_message)`` tuple for the Gradio outputs.
    """
    csv_output_path = "/tmp/output.csv"
    tables = []
    blocks_map = {}

    for i, image in enumerate(_load_page_images(file_path)):
        image_path = f"/tmp/image_{i}.jpg"
        if image.mode != 'RGB':
            image = image.convert('RGB')
        image.save(image_path, 'JPEG')

        response = process_image(image_path, s3_bucket, textract_client)
        if not response:
            continue
        blocks = response['Blocks']
        # Accumulate across pages instead of writing per page: the CSV is
        # opened in 'w' mode, so a per-page write would keep only the last
        # page. Assumes block ids do not collide between responses.
        blocks_map.update((block['Id'], block) for block in blocks)
        tables.extend(block for block in blocks if block['BlockType'] == "TABLE")

    # Always (re)write the file so the returned path never points at a
    # missing file or at output left over from an earlier request.
    generate_table_csv(tables, blocks_map, csv_output_path)
    return csv_output_path, "Processing completed successfully!"
# Gradio interface: one file upload in, the generated CSV plus a status
# message out.
upload_input = gr.File(label="Upload your file (PDF, PNG, JPG, TIFF)")
csv_download = gr.File(label="Download Generated CSV")

iface = gr.Interface(
    fn=process_file_and_generate_csv,
    inputs=upload_input,
    outputs=[csv_download, "text"],
    description="Upload a document to extract tables into a CSV file.",
)

# Start the web UI only when the file is executed as a script.
if __name__ == "__main__":
    iface.launch()