# app.py - "Update app.py" by danial0203 (commit 8f49668, verified), 5.21 kB
import boto3
import csv
import os
from botocore.exceptions import NoCredentialsError
from pdf2image import convert_from_path
from PIL import Image
import gradio as gr
from io import BytesIO
from datasets.filesystems import S3FileSystem
import s3fs
# AWS Setup: credentials, region and target bucket are all read from the
# environment; any variable that is unset yields None.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY')
aws_secret_access_key = os.environ.get('AWS_SECRET_KEY')
region_name = os.environ.get('AWS_REGION')
s3_bucket = os.environ.get('AWS_BUCKET')

# s3fs filesystem used for uploads, built with the credentials above.
s3_fs = s3fs.S3FileSystem(
    key=aws_access_key_id,
    secret=aws_secret_access_key,
    client_kwargs={'region_name': region_name},
)

# Textract client used for table analysis, built with the same credentials.
textract_client = boto3.client(
    'textract',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
)
def upload_file_to_s3(file_content, bucket, object_name=None):
    """Write the bytes of a readable object to ``s3://<bucket>/<object_name>``.

    Args:
        file_content: Object whose ``read()`` result is written to S3.
        bucket: Name of the destination bucket.
        object_name: Destination key. Required, despite the ``None`` default.

    Returns:
        ``object_name`` when the write completed, ``None`` when any exception
        was raised while opening or writing (the error is printed).

    Raises:
        ValueError: If ``object_name`` is ``None``.
    """
    if object_name is None:
        raise ValueError("object_name cannot be None")

    try:
        target = f's3://{bucket}/{object_name}'
        with s3_fs.open(target, 'wb') as remote:
            remote.write(file_content.read())
    except FileNotFoundError:
        print("The file was not found")
        return None
    except Exception as e:  # Permissions, networking or other AWS-side failures
        print(f"An error occurred: {e}")
        return None
    else:
        return object_name
def process_image(file_content, s3_bucket, textract_client, object_name):
    """Upload one image to S3 and run Textract table analysis on it.

    Args:
        file_content: Readable object holding the image bytes.
        s3_bucket: Bucket the image is uploaded to and analysed from.
        textract_client: Client exposing ``analyze_document``.
        object_name: S3 key to store the image under.

    Returns:
        The ``analyze_document`` response, or ``None`` when the upload did
        not return a key.
    """
    uploaded_key = upload_file_to_s3(file_content, s3_bucket, object_name)
    if uploaded_key:
        # Textract reads the document straight from the uploaded S3 object.
        document = {'S3Object': {'Bucket': s3_bucket, 'Name': uploaded_key}}
        return textract_client.analyze_document(
            Document=document,
            FeatureTypes=["TABLES"],
        )
    return None
def generate_table_csv(tables, blocks_map, writer):
    """Write every row of every table to ``writer``.

    Args:
        tables: Iterable of TABLE blocks.
        blocks_map: Mapping of block id to block, used to resolve cells.
        writer: Object with a ``writerow`` method (e.g. a ``csv.writer``).

    Each row is written from column 1 up to the highest column index present
    in that row; columns with no cell are written as empty strings.
    """
    for table in tables:
        row_map = get_rows_columns_map(table, blocks_map)
        for columns in row_map.values():
            last_column = max(columns)
            writer.writerow(
                [columns.get(position, "") for position in range(1, last_column + 1)]
            )
def get_rows_columns_map(table_result, blocks_map):
    """Group the cells of a TABLE block by row and column.

    Args:
        table_result: TABLE block; its CHILD relationships list the cell ids.
        blocks_map: Mapping of block id to block.

    Returns:
        ``{row_index: {column_index: cell_text}}``. Empty when the table has
        no relationships or none of its children carry cell indices.

    Raises:
        KeyError: If a CHILD id is missing from ``blocks_map``.
    """
    rows = {}
    # A block with no related blocks may come without a 'Relationships' entry
    # (get_text guards for the same case), so treat a missing or empty value
    # as "no cells" instead of raising KeyError.
    for relationship in table_result.get('Relationships') or []:
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            cell = blocks_map[child_id]
            # Only children that carry both indices are table cells.
            if 'RowIndex' not in cell or 'ColumnIndex' not in cell:
                continue
            row = rows.setdefault(cell['RowIndex'], {})
            row[cell['ColumnIndex']] = get_text(cell, blocks_map)
    return rows
def get_text(result, blocks_map):
    """Return the space-separated text of a block's CHILD blocks.

    WORD children contribute their ``Text``; SELECTION_ELEMENT children
    contribute ``X`` when their ``SelectionStatus`` is ``SELECTED``. Other
    children, and relationships that are not of type CHILD, are ignored.

    Args:
        result: Block whose children are read; may lack ``Relationships``.
        blocks_map: Mapping of block id to block.

    Returns:
        The collected text with surrounding whitespace stripped, or an empty
        string when there is nothing to collect.
    """
    pieces = []
    for relationship in result.get('Relationships', ()) if 'Relationships' in result else ():
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            child = blocks_map[child_id]
            kind = child['BlockType']
            if kind == 'WORD':
                pieces.append(child['Text'])
            elif kind == 'SELECTION_ELEMENT' and child['SelectionStatus'] == 'SELECTED':
                pieces.append('X')
    return ' '.join(pieces).strip()
def is_image_file(filename):
    """Return True when ``filename`` ends in a PNG/JPEG extension.

    The comparison is case-insensitive and includes the leading dot, so a
    name that merely ends in the letters (e.g. ``"notes_png"``) is not
    mistaken for an image.

    Args:
        filename: File name or path to classify.

    Returns:
        ``True`` for ``.png``, ``.jpg`` and ``.jpeg`` names, else ``False``.
    """
    image_file_extensions = ('.png', '.jpg', '.jpeg')
    # str.endswith accepts a tuple, so one call covers every extension.
    return filename.lower().endswith(image_file_extensions)
def process_file_and_generate_csv(input_file):
    """Extract the tables of an uploaded document into ``output.csv``.

    PNG/JPEG uploads are processed as a single image; every other upload is
    handed to pdf2image and processed page by page. Each image is re-encoded
    as JPEG, sent through ``process_image`` and the tables found in the
    response are appended to the CSV.

    Args:
        input_file: Uploaded file object providing ``read()`` (bytes) and
            ``name`` (used to derive the S3 object names).

    Returns:
        A ``(csv_path, csv_path)`` tuple: the first item feeds the file
        download output, the second the text output. The path is returned
        instead of an in-memory buffer because ``output.csv`` is now actually
        written to disk.
        NOTE(review): gr.File outputs are documented to take a file path;
        confirm against the Gradio version in use.
    """
    # Local import: the module only imports ``convert_from_path``, which needs
    # a filesystem path, whereas the upload is handled here as bytes.
    from pdf2image import convert_from_bytes

    output_csv_path = "output.csv"  # Output CSV file name
    raw_bytes = input_file.read()
    object_name = os.path.basename(input_file.name)

    # Check if the uploaded file is an image or needs conversion
    if is_image_file(object_name):
        images = [Image.open(BytesIO(raw_bytes))]
    else:
        # NOTE(review): pdf2image handles PDF input; TIFF uploads advertised
        # by the UI are not known to be supported on this path - confirm.
        images = list(convert_from_bytes(raw_bytes))

    # csv.writer emits str, so it needs a text-mode target; newline="" lets
    # the csv module control line endings itself.
    with open(output_csv_path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        for i, image in enumerate(images):
            # JPEG cannot store alpha or palette data (e.g. RGBA/P PNGs).
            if image.mode not in ("RGB", "L"):
                image = image.convert("RGB")
            image_byte_array = BytesIO()
            image.save(image_byte_array, format='JPEG')
            image_byte_array.seek(0)

            response = process_image(
                image_byte_array, s3_bucket, textract_client, f"{object_name}_{i}.jpg"
            )
            if not response:
                # Upload failed for this page; keep going with the rest.
                continue
            blocks = response['Blocks']
            blocks_map = {block['Id']: block for block in blocks}
            tables = [block for block in blocks if block['BlockType'] == "TABLE"]
            generate_table_csv(tables, blocks_map, writer)

    return output_csv_path, output_csv_path
# Gradio Interface: one file upload in, a downloadable CSV plus a text field out.
_upload_component = gr.File(label="Upload your file (PDF, PNG, JPG, TIFF)")
_download_component = gr.File(label="Download Generated CSV")

iface = gr.Interface(
    fn=process_file_and_generate_csv,
    inputs=_upload_component,
    outputs=[_download_component, "text"],
    description="Upload a document to extract tables into a CSV file.",
)

# Start serving the interface as soon as the module is executed.
iface.launch()