Spaces:
Sleeping
Sleeping
| import base64 | |
| import streamlit as st | |
| from openai import OpenAI | |
| import os | |
| from dotenv import load_dotenv | |
| import fitz | |
| from PIL import Image | |
| import io | |
| import tempfile | |
# Populate the process environment via python-dotenv before any key lookup.
load_dotenv()

# Module-wide OpenAI client; the key is read from the OPENAI_API_KEY variable
# (None is passed when the variable is not set).
client = OpenAI(api_key=os.environ.get('OPENAI_API_KEY'))
def convert_pdf_to_images(pdf_file):
    """Render every page of a PDF to a PIL image using PyMuPDF.

    Args:
        pdf_file: Object whose ``getvalue()`` returns the raw PDF bytes
            (``main`` passes the Streamlit upload here).

    Returns:
        A list with one PIL image per page, in page order. The list is
        empty when the document has no pages.

    Raises:
        Any exception PyMuPDF raises when the bytes cannot be opened or a
        page cannot be rendered; the document is still closed in that case.
    """
    images = []
    # Open directly from memory: there is no temporary file that could be
    # left behind on disk if opening or rendering fails part-way through.
    pdf_document = fitz.open(stream=pdf_file.getvalue(), filetype="pdf")
    try:
        for page_number in range(pdf_document.page_count):
            page = pdf_document[page_number]
            pix = page.get_pixmap()
            # Round-trip through PNG bytes to obtain a regular PIL image.
            img_data = pix.tobytes("png")
            images.append(Image.open(io.BytesIO(img_data)))
    finally:
        pdf_document.close()
    return images
def format_response(text):
    """Turn a raw 'Label: Value' analysis into a Markdown bullet list.

    Lines that consist solely of a page header (``Page <n>``, optionally
    followed by ``Information`` and a colon) become ``### Page <n>``
    headings. Every other non-empty line containing a colon becomes
    ``- *label*: value``. Text without any page header (for example the
    analysis of a single image) is formatted as one un-headed list.

    Args:
        text: Analysis text, optionally with ``Page <n> Information:``
            header lines separating pages. ``None`` or an empty string is
            accepted.

    Returns:
        The Markdown string; empty when nothing could be extracted.
    """
    import re  # local import so this function does not depend on file-level imports

    if not text:
        return ""

    # Only a line that is *entirely* a page header starts a new section, so
    # the word "Page" inside a value no longer splits the text.
    page_header = re.compile(r'Page\s+(\d+)(?:\s+Information)?\s*:?')

    parts = []
    for raw_line in text.split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        header = page_header.fullmatch(line)
        if header:
            parts.append(f'\n### Page {header.group(1)}\n')
            continue

        # Lines starting with an asterisk (Markdown headings/bullets) and
        # introductory "Here ..." sentences are dropped, as before.
        if line.startswith(('*', 'Here')):
            continue

        # Remove bold markers, then only a *leading* dash bullet; a "- "
        # inside a value (e.g. a date range) must be left intact.
        line = line.replace('**', '')
        if line.startswith('- '):
            line = line[2:]

        if ':' not in line:
            continue
        label, value = line.split(':', 1)
        parts.append(f'- *{label.strip()}*: {value.strip()}\n')

    return ''.join(parts)
def analyze_image(image):
    """Send an image to the OpenAI chat completions API and return its analysis.

    The image is PNG-encoded, embedded as a base64 data URL and sent
    together with a prompt asking for 'Label: Value' pairs.

    Args:
        image: PIL image to analyze.

    Returns:
        The text of the first completion choice (an empty string if the API
        returned no text), or ``"An error occurred: <message>"`` when
        encoding or the API call fails. No exception is propagated.
    """
    prompt = (
        "Please analyze the image and extract the following information:\n"
        "- Sender information\n"
        "- Recipient information\n"
        "- Container details\n"
        "- Weights and measurements\n"
        "- Dates and reference numbers\n"
        "- Cargo details\n"
        "Format the response as 'Label: Value' pairs."
    )
    try:
        # PNG cannot store every PIL mode (e.g. CMYK from some JPEGs);
        # convert those to RGB instead of failing on save.
        if image.mode not in ("1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA"):
            image = image.convert("RGB")

        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8")

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                # The payload is PNG, so the declared media
                                # type must be image/png (it was image/jpeg).
                                "url": f"data:image/png;base64,{base64_image}"
                            },
                        },
                    ],
                }
            ],
            max_tokens=1000,
        )
        # content may be None; callers expect a string.
        return response.choices[0].message.content or ""
    except Exception as e:
        # Deliberately broad: the caller renders the returned text, so any
        # failure is reported as a message rather than raised.
        return f"An error occurred: {str(e)}"
def main():
    """Render the Streamlit UI: upload a document, analyze it, show the result.

    A PDF is rendered to one image per page; any other accepted upload is
    opened as a single image. When the "Extract Information" button is
    pressed, every image is analyzed and the results are labelled
    "Page N Information:", combined, and displayed as Markdown.
    """
    st.set_page_config(page_title="Document Analysis App", layout="wide")
    st.title("Document Analysis App")
    uploaded_file = st.file_uploader("Upload document (PDF/Image)", type=['pdf', 'png', 'jpg', 'jpeg'])
    if uploaded_file is None:
        return

    if uploaded_file.type == "application/pdf":
        with st.spinner("Processing PDF..."):
            images = convert_pdf_to_images(uploaded_file)
    else:
        # Treat a plain image as a one-page document so it goes through the
        # same labelled path as PDFs. Without a "Page" header the formatter
        # produced an empty string and nothing was displayed.
        images = [Image.open(uploaded_file)]

    if not images:
        st.warning("The uploaded PDF contains no pages.")
        return

    if st.button("Extract Information"):
        with st.spinner("Analyzing document..."):
            all_results = [
                f"Page {i} Information:\n{analyze_image(image)}"
                for i, image in enumerate(images, 1)
            ]
            combined_results = "\n\n".join(all_results)
            st.markdown(format_response(combined_results))
# Streamlit executes this script as the `__main__` module, so the guard still
# starts the app (including on Hugging Face Spaces) while keeping a plain
# `import` of this file free of UI side effects.
if __name__ == "__main__":
    main()