import csv
import io
import os
import re
import tempfile

import fitz  # PyMuPDF
import google.generativeai as genai
import pandas as pd
from PIL import Image
from PyPDF2 import PdfReader

# NOTE: the original hard-coded an API key here; read it from the environment
# instead (the GOOGLE_API_KEY variable name is an assumption of this rewrite),
# or call configure_gemini() explicitly before using the helpers below.
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY", ""))
def configure_gemini(api_key: str):
    """Configure Gemini API with the provided key"""
    genai.configure(api_key=api_key)


# def pdf_to_images(pdf_bytes: bytes) -> list:
#     """Convert PDF bytes to list of PIL Images"""
#     return convert_from_bytes(pdf_bytes)
def pdf_to_images(pdf_bytes: bytes) -> list[Image.Image]:
    """Convert PDF to PIL Images using PyMuPDF (no poppler needed)."""
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    images = []
    for page in doc:
        pix = page.get_pixmap()
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        images.append(img)
    return images
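

# Optional sketch (not used above; an assumption of this rewrite, not part of the
# module's behavior): PyMuPDF renders at 72 DPI by default, and passing a scaling
# matrix often helps the vision model read small text, e.g.
#   pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))  # roughly 144 DPI
# at the cost of larger images and slower uploads.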
def analyze_single_document(images: list, prompt: str) -> str:
    """Analyze a single document (as a list of page images) and return the response text"""
    model = genai.GenerativeModel('gemini-2.5-pro-exp-03-25')
    response = model.generate_content([prompt] + images)
    return response.text
def analyze_pdf_directly(pdf_bytes: bytes, prompt: str, model_name: str = "gemini-1.5-pro"):
    """Analyze a PDF directly using Gemini's PDF support"""
    model = genai.GenerativeModel(model_name)

    # Create a temporary PDF file
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp_file:
        tmp_file.write(pdf_bytes)
        tmp_file_path = tmp_file.name

    try:
        # Use the file upload feature
        response = model.generate_content(
            [prompt, genai.upload_file(tmp_file_path)]
        )
        print(f"Response: {response}")
        return response.text
    finally:
        # Clean up temporary file
        if os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
def extract_response_text(response) -> str:
    """Extract text content from a Gemini response object"""
    try:
        if hasattr(response, 'text'):
            return response.text
        elif hasattr(response, 'result') and hasattr(response.result, 'candidates'):
            for candidate in response.result.candidates:
                if hasattr(candidate, 'content') and hasattr(candidate.content, 'parts'):
                    for part in candidate.content.parts:
                        if hasattr(part, 'text'):
                            return part.text
        return str(response)
    except Exception as e:
        print(f"Error extracting response text: {str(e)}")
        return str(response)
def extract_csv_from_response(response) -> str:
    """Extract CSV data from a Gemini response"""
    try:
        # Get the text content from the response
        response_text = extract_response_text(response)

        # Extract CSV content between ```csv markers
        csv_match = re.search(r'```csv(.*?)```', response_text, re.DOTALL)
        if csv_match:
            return csv_match.group(1).strip()

        # Fallback: try to find any CSV-like content
        lines = []
        in_csv = False
        for line in response_text.split('\n'):
            if ',' in line and ('Category,' in line or 'Location,' in line):
                in_csv = True
            if in_csv:
                lines.append(line)
        if lines:
            return '\n'.join(lines)

        return response_text  # Return full response if no CSV found
    except Exception as e:
        print(f"Error extracting CSV: {str(e)}")
        return response.text if hasattr(response, 'text') else str(response)
def csv_to_dataframe(csv_data: str) -> pd.DataFrame:
    """Convert a CSV string to a pandas DataFrame with error handling"""
    if not csv_data.strip():
        return pd.DataFrame()

    # Clean line breaks and extra spaces (done outside the try block so the
    # pandas fallback below can always reference cleaned_data)
    cleaned_data = "\n".join([line.strip() for line in csv_data.split('\n') if line.strip()])

    try:
        # Use a CSV reader to handle irregular fields
        rows = []
        reader = csv.reader(io.StringIO(cleaned_data),
                            delimiter=',',
                            quotechar='"',
                            skipinitialspace=True)
        header = next(reader)
        for row in reader:
            if len(row) > len(header):
                # Combine extra fields into the last column
                row = row[:len(header) - 1] + [','.join(row[len(header) - 1:])]
            rows.append(row)
        return pd.DataFrame(rows, columns=header)
    except Exception as e:
        print(f"CSV conversion error: {str(e)}")
        try:
            # Fall back to pandas with flexible parsing
            return pd.read_csv(io.StringIO(cleaned_data),
                               on_bad_lines='warn',
                               engine='python',
                               quotechar='"',
                               skipinitialspace=True)
        except Exception as fallback_error:
            print(f"Fallback conversion failed: {str(fallback_error)}")
            return pd.DataFrame()
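

# Example of the overflow handling above (illustrative values, not from the
# source): with header "Category,Location,Amount", an unquoted row such as
# "Food,Downtown,12.50,approx tip" parses into four fields and is folded back to
# ["Food", "Downtown", "12.50,approx tip"] so it still fits the three columns.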
def save_csv(csv_data: str, filename: str) -> str:
    """Save CSV data to a file"""
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        csvfile.write(csv_data.strip())
    return filename
def get_pdf_metadata(pdf_bytes: bytes) -> dict:
    """Extract basic PDF metadata"""
    reader = PdfReader(io.BytesIO(pdf_bytes))
    return {
        'page_count': len(reader.pages),
        'author': reader.metadata.author if reader.metadata else None,
        'title': reader.metadata.title if reader.metadata else None
    }
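

# End-to-end usage sketch, assuming GOOGLE_API_KEY is set in the environment and
# that "statement.pdf", the prompt, and "extracted.csv" are placeholder names
# introduced here for illustration only.
if __name__ == "__main__":
    configure_gemini(os.environ["GOOGLE_API_KEY"])

    with open("statement.pdf", "rb") as f:
        pdf_bytes = f.read()

    # Inspect the document before sending it to the model
    print(get_pdf_metadata(pdf_bytes))

    prompt = "Extract every line item as CSV with columns Category,Location,Amount."
    response_text = analyze_pdf_directly(pdf_bytes, prompt)

    # Pull the CSV block out of the model's reply, load it, and save it
    csv_data = extract_csv_from_response(response_text)
    df = csv_to_dataframe(csv_data)
    print(df.head())
    save_csv(csv_data, "extracted.csv")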