# Spaces: Runtime error
# (Page-status text captured together with the source; not part of the program.)
import boto3 | |
import csv | |
import os | |
from botocore.exceptions import NoCredentialsError | |
from pdf2image import convert_from_path | |
from PIL import Image | |
import gradio as gr | |
from io import BytesIO | |
from datasets.filesystems import S3FileSystem | |
import s3fs | |
# AWS Setup
# Credentials, region and target bucket all come from environment variables;
# a variable that is not set yields None.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY')
aws_secret_access_key = os.environ.get('AWS_SECRET_KEY')
region_name = os.environ.get('AWS_REGION')
s3_bucket = os.environ.get('AWS_BUCKET')

# s3fs filesystem handle built from the credentials above.
s3_fs = s3fs.S3FileSystem(
    key=aws_access_key_id,
    secret=aws_secret_access_key,
    client_kwargs={'region_name': region_name},
)

# Textract client built from the same credentials and region.
textract_client = boto3.client(
    'textract',
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
def upload_file_to_s3(file_content, bucket, object_name=None):
    """Write the bytes of a file-like object to S3 through s3fs.

    Args:
        file_content: Object exposing ``read()`` that returns the bytes to store.
        bucket: Name of the destination bucket.
        object_name: Key to store the data under. Must be provided.

    Returns:
        ``object_name`` when the write succeeds, otherwise ``None`` (the
        failure is printed rather than raised).

    Raises:
        ValueError: If ``object_name`` is ``None``.
    """
    if object_name is None:
        raise ValueError("object_name cannot be None")

    try:
        with s3_fs.open(f's3://{bucket}/{object_name}', 'wb') as remote_file:
            payload = file_content.read()
            remote_file.write(payload)
    except FileNotFoundError:
        print("The file was not found")
        return None
    except Exception as err:
        # Best-effort: permission or AWS-side problems are reported, not raised.
        print(f"An error occurred: {err}")
        return None
    else:
        return object_name
def process_image(file_content, s3_bucket, textract_client, object_name):
    """Upload an image to S3 and run Textract table analysis on it.

    Args:
        file_content: File-like object holding the image bytes.
        s3_bucket: Bucket the image is uploaded to and read back from.
        textract_client: Client object providing ``analyze_document``.
        object_name: S3 key to store the image under.

    Returns:
        The ``analyze_document`` response, or ``None`` when the upload did
        not yield a key.
    """
    uploaded_key = upload_file_to_s3(file_content, s3_bucket, object_name)
    if uploaded_key:
        # Call Textract on the object that was just stored.
        document = {'S3Object': {'Bucket': s3_bucket, 'Name': uploaded_key}}
        return textract_client.analyze_document(
            Document=document,
            FeatureTypes=["TABLES"],
        )
    return None
def generate_table_csv(tables, blocks_map, writer):
    """Write every row of every table block to ``writer``.

    Args:
        tables: Iterable of table blocks.
        blocks_map: Mapping of block id to block, used to resolve cells.
        writer: Object with a ``writerow`` method (e.g. a ``csv.writer``).

    Each row is padded with empty strings for column indexes that have no
    cell, up to the highest column index present in that row.
    """
    for table_block in tables:
        grid = get_rows_columns_map(table_block, blocks_map)
        for cells_by_column in grid.values():
            last_column = max(cells_by_column.keys())
            writer.writerow(
                [cells_by_column.get(column, "") for column in range(1, last_column + 1)]
            )
def get_rows_columns_map(table_result, blocks_map):
    """Group the cells of a table block by row and column index.

    Args:
        table_result: Table block (a dict); its ``'Relationships'`` entries
            of type ``'CHILD'`` list the ids of the table's cells.
        blocks_map: Mapping of block id to block.

    Returns:
        ``{row_index: {column_index: cell_text}}``. Empty when the table has
        no relationships or no child carrying both ``'RowIndex'`` and
        ``'ColumnIndex'``.

    Raises:
        KeyError: If a child id is not present in ``blocks_map``.
    """
    rows = {}
    # A missing (or empty) 'Relationships' entry means "no cells"; get_text
    # guards the same key, so do not raise KeyError here either.
    for relationship in table_result.get('Relationships') or []:
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            cell = blocks_map[child_id]
            # Only blocks that carry grid coordinates are table cells.
            if 'RowIndex' not in cell or 'ColumnIndex' not in cell:
                continue
            row_cells = rows.setdefault(cell['RowIndex'], {})
            row_cells[cell['ColumnIndex']] = get_text(cell, blocks_map)
    return rows
def get_text(result, blocks_map):
    """Return the text of a block, assembled from its child blocks.

    Args:
        result: Block (a dict) whose ``'CHILD'`` relationships are followed.
        blocks_map: Mapping of block id to block.

    Returns:
        The ``'Text'`` of every ``WORD`` child joined by single spaces, with
        an ``'X'`` contributed by each ``SELECTION_ELEMENT`` child whose
        ``'SelectionStatus'`` is ``'SELECTED'``. Empty string when the block
        has no ``'Relationships'`` key.
    """
    if 'Relationships' not in result:
        return ''

    pieces = []
    for relationship in result['Relationships']:
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            child = blocks_map[child_id]
            kind = child['BlockType']
            if kind == 'WORD':
                pieces.append(child['Text'])
            elif kind == 'SELECTION_ELEMENT' and child['SelectionStatus'] == 'SELECTED':
                pieces.append('X')
    return ' '.join(pieces).strip()
def is_image_file(filename):
    """Return True when ``filename`` has a PNG or JPEG extension.

    The comparison is case-insensitive and requires the leading dot, so a
    name that merely ends in the letters ``png``/``jpg``/``jpeg`` without an
    extension separator is not treated as an image.

    Args:
        filename: File name or path to inspect.

    Returns:
        bool: Whether the name ends in ``.png``, ``.jpg`` or ``.jpeg``.
    """
    image_file_extensions = ('.png', '.jpg', '.jpeg')
    return filename.lower().endswith(image_file_extensions)
def _load_page_images(raw_bytes, object_name):
    """Decode an uploaded document into a list of PIL images, one per page."""
    if is_image_file(object_name):
        return [Image.open(BytesIO(raw_bytes))]

    if object_name.lower().endswith(('.tif', '.tiff')):
        # TIFF is decoded with PIL (the PDF converter does not read it);
        # every frame of a multi-page TIFF becomes its own image.
        tiff = Image.open(BytesIO(raw_bytes))
        pages = []
        for frame_index in range(getattr(tiff, 'n_frames', 1)):
            tiff.seek(frame_index)
            pages.append(tiff.copy())
        return pages

    # Anything else is treated as a PDF. The data is already in memory, so
    # use the bytes-based converter; convert_from_path needs a filesystem path.
    from pdf2image import convert_from_bytes
    return convert_from_bytes(raw_bytes)


def process_file_and_generate_csv(input_file):
    """Extract every table in an uploaded document into CSV data.

    Each page of the document is re-encoded as JPEG, uploaded to S3 and
    analysed through ``process_image``; the rows of all tables found are
    appended to a single CSV.

    Args:
        input_file: Uploaded file object exposing ``read()`` and ``name``.

    Returns:
        tuple: ``(csv_output, output_csv_path)`` where ``csv_output`` is a
        ``BytesIO`` positioned at 0 holding the UTF-8 encoded CSV, and
        ``output_csv_path`` is the string ``"output.csv"``.

    NOTE(review): the first value is wired to a ``gr.File`` output, which
    may expect a filesystem path rather than an in-memory buffer -- confirm
    against the Gradio version in use.
    """
    # Local import: the module-level imports only bring in BytesIO.
    from io import StringIO

    output_csv_path = "output.csv"  # Output CSV file name
    raw_bytes = input_file.read()  # Read file content into memory for processing
    object_name = os.path.basename(input_file.name)

    images = _load_page_images(raw_bytes, object_name)

    # csv.writer emits str, so it must write to a text buffer; the text is
    # encoded into the BytesIO that callers receive once all rows are written.
    csv_text = StringIO(newline='')
    writer = csv.writer(csv_text)

    for i, image in enumerate(images):
        # JPEG cannot store alpha or palette modes (e.g. RGBA/P from a PNG).
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Process each image and upload to S3 for Textract processing
        image_byte_array = BytesIO()
        image.save(image_byte_array, format='JPEG')
        image_byte_array.seek(0)

        response = process_image(
            image_byte_array, s3_bucket, textract_client, f"{object_name}_{i}.jpg"
        )
        if not response:
            # Upload failed for this page; skip it and keep going.
            continue

        blocks = response['Blocks']
        blocks_map = {block['Id']: block for block in blocks}
        tables = [block for block in blocks if block['BlockType'] == "TABLE"]
        generate_table_csv(tables, blocks_map, writer)

    csv_output = BytesIO(csv_text.getvalue().encode('utf-8'))
    return csv_output, output_csv_path
# Gradio Interface
# One file upload in; the handler's two return values feed the two output
# components in order.
iface = gr.Interface(
    description="Upload a document to extract tables into a CSV file.",
    inputs=gr.File(label="Upload your file (PDF, PNG, JPG, TIFF)"),
    outputs=[
        gr.File(label="Download Generated CSV"),
        "text",
    ],
    fn=process_file_and_generate_csv,
)

# Launch the interface
iface.launch()