import gradio as gr
import google.generativeai as genai
import json
import pandas as pd
from datetime import datetime
import os
from pathlib import Path
from PIL import Image
import io
import base64
import logging
import sys
import tempfile
from openpyxl.utils import get_column_letter  # column letters for Excel width adjustment
# Import and register HEIF support
try:
    from pillow_heif import register_heif_opener
    register_heif_opener()
    HEIF_SUPPORTED = True
except ImportError:
    HEIF_SUPPORTED = False
    logging.warning("pillow-heif not installed. HEIF/HEIC support disabled.")
# Import Google Drive functionality
from google_funcs import (
    get_drive_service,
    upload_excel_to_exports_folder,
    upload_image_to_images_folder,
    list_files_in_folder,
    download_file_from_drive,
    get_existing_cumulative_file,
    cleanup_duplicate_cumulative_files,
    delete_file_from_drive
)
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
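
# Environment variables expected at startup (set in the Space settings):
#   Gemini_API           - API key for the Gemini models used for extraction
#   GOOGLE_CLIENT_ID     - credential used by google_funcs.get_drive_service()
#   GOOGLE_CLIENT_SECRET - credential used by google_funcs.get_drive_service()
#   SPACE_PASSWORD       - optional; enables basic auth on the Gradio app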
# Configure AI API
logger.info("Configuring AI API")
gemini_api_key = os.getenv("Gemini_API")
if not gemini_api_key:
    logger.error("Gemini_API environment variable not found!")
    logger.error("Please set the Gemini_API environment variable with your AI API key")
    raise ValueError("❌ Gemini_API environment variable is required. Please set it in your environment.")
genai.configure(api_key=gemini_api_key)
logger.info("AI API configured successfully")
# Initialize Google Drive service
logger.info("Initializing Google Drive service")
try:
    drive_service = get_drive_service()
    logger.info("Google Drive service initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize Google Drive service: {e}")
    logger.error("Please ensure GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables are set")
    raise ValueError("❌ Google Drive credentials are required. Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables.")
# Log startup
logger.info("Business Card Data Extractor starting up with Google Drive storage")
def upload_to_google_drive(file_path, is_excel=False, filename=None):
    """Upload a file to Google Drive"""
    try:
        if is_excel:
            logger.info(f"Uploading Excel file to Google Drive: {filename or file_path}")
            result = upload_excel_to_exports_folder(drive_service, file_path=file_path, filename=filename)
        else:
            logger.info(f"Uploading image file to Google Drive: {filename or file_path}")
            result = upload_image_to_images_folder(drive_service, file_path=file_path, filename=filename)
        if result:
            logger.info(f"Successfully uploaded to Google Drive: {result['webViewLink']}")
            return result
        else:
            logger.error("Failed to upload to Google Drive")
            return None
    except Exception as e:
        logger.error(f"Failed to upload to Google Drive: {e}")
        return None
def upload_bytes_to_google_drive(file_data, filename, is_excel=False):
    """Upload file data (bytes) to Google Drive"""
    try:
        if is_excel:
            logger.info(f"Uploading Excel data to Google Drive: {filename}")
            result = upload_excel_to_exports_folder(drive_service, file_data=file_data, filename=filename)
        else:
            logger.info(f"Uploading image data to Google Drive: {filename}")
            result = upload_image_to_images_folder(drive_service, file_data=file_data, filename=filename)
        if result:
            logger.info(f"Successfully uploaded to Google Drive: {result['webViewLink']}")
            return result
        else:
            logger.error("Failed to upload to Google Drive")
            return None
    except Exception as e:
        logger.error(f"Failed to upload to Google Drive: {e}")
        return None
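
# Both upload helpers return whatever file-metadata dict google_funcs produces for the
# created file; the rest of this app only relies on its 'webViewLink' key being present.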
def extract_business_card_data_batch(images, filenames, model_name="gemini-2.5-flash"):
    """Extract data from multiple business card images in a single API call"""
    logger.info(f"Starting batch extraction for {len(images)} images using model: {model_name}")
    logger.debug(f"Filenames in batch: {filenames}")
    # Load prompts
    logger.debug("Loading prompt templates")
    try:
        with open("prompts/prompt.txt", "r", encoding="utf-8") as f:
            prompt_template = f.read()
        logger.debug(f"Loaded prompt template ({len(prompt_template)} characters)")
        with open("prompts/system_prompt.txt", "r", encoding="utf-8") as f:
            system_prompt = f.read()
        logger.debug(f"Loaded system prompt ({len(system_prompt)} characters)")
    except FileNotFoundError as e:
        logger.error(f"Failed to load prompt files: {e}")
        raise
    # Configure model
    logger.debug(f"Configuring AI model: {model_name}")
    generation_config = {
        "temperature": 0.1,
        "response_mime_type": "application/json"
    }
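    # response_mime_type="application/json" asks Gemini to emit raw JSON, and the low
    # temperature keeps extraction close to deterministic; the cleanup fallback further
    # down still handles responses that arrive wrapped in markdown code fences.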
    try:
        model = genai.GenerativeModel(
            model_name=model_name,
            generation_config=generation_config,
            system_instruction=system_prompt
        )
        logger.debug("AI model configured successfully")
    except Exception as e:
        logger.error(f"Failed to configure AI model: {e}")
        raise
    # Prepare multiple images for the model
    logger.debug("Preparing content parts for API request")
    content_parts = []
    # Add the prompt first
    batch_prompt = f"""
{prompt_template}
I'm sending you {len(images)} business card images. Please extract the data from each card and return a JSON array with {len(images)} objects. Each object should contain the extracted data for one business card in the same order as the images.
Return format: [card1_data, card2_data, card3_data, ...]
"""
    content_parts.append(batch_prompt)
    logger.debug(f"Added batch prompt ({len(batch_prompt)} characters)")
    # Add each image
    logger.debug("Converting and adding images to request")
    for i, image in enumerate(images):
        try:
            buffered = io.BytesIO()
            image.save(buffered, format="PNG")
            img_base64 = base64.b64encode(buffered.getvalue()).decode()
            image_part = {
                "mime_type": "image/png",
                "data": img_base64
            }
            content_parts.append(f"Business Card {i+1}:")
            content_parts.append(image_part)
            logger.debug(f"Added image {i+1} ({len(img_base64)} base64 characters)")
        except Exception as e:
            logger.error(f"Failed to process image {i+1} ({filenames[i] if i < len(filenames) else 'unknown'}): {e}")
            raise
    # Generate content
    logger.info(f"Making API call to {model_name} with {len(content_parts)} content parts")
    try:
        response = model.generate_content(content_parts)
        logger.info(f"API call successful. Response length: {len(response.text) if response.text else 0} characters")
        if response.text and len(response.text) > 500:
            logger.debug(f"Raw response: {response.text[:500]}...")
        else:
            logger.debug(f"Raw response: {response.text}")
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise
    # Parse response
    logger.debug("Parsing JSON response")
    try:
        # Parse JSON response
        response_data = json.loads(response.text)
        logger.info("Successfully parsed JSON response")
        # Ensure we got an array
        if not isinstance(response_data, list):
            logger.debug("Response is not an array, converting to array")
            response_data = [response_data]
        logger.info(f"Response contains {len(response_data)} extracted card data objects")
        # Add metadata to each card's data
        logger.debug("Adding metadata to extracted data")
        for i, data in enumerate(response_data):
            # Use user-friendly model name for Excel
            data['method'] = "Speed-Optimized model" if "flash" in model_name else "Accuracy-Optimized model"
            if i < len(filenames):
                data['filename'] = filenames[i]
                logger.debug(f"Added metadata to card {i+1}: {filenames[i]}")
        logger.info(f"Batch extraction completed successfully for {len(response_data)} cards")
        return response_data
    except json.JSONDecodeError as e:
        logger.warning(f"Initial JSON parsing failed: {e}. Attempting to clean response.")
        # Try to clean the response
        text = response.text.strip()
        if text.startswith("```json"):
            text = text[7:]
            logger.debug("Removed ```json prefix")
        if text.endswith("```"):
            text = text[:-3]
            logger.debug("Removed ``` suffix")
        try:
            response_data = json.loads(text.strip())
            logger.info("Successfully parsed cleaned JSON response")
            # Ensure we got an array
            if not isinstance(response_data, list):
                logger.debug("Cleaned response is not an array, converting to array")
                response_data = [response_data]
            logger.info(f"Cleaned response contains {len(response_data)} extracted card data objects")
            # Add metadata to each card's data
            logger.debug("Adding metadata to cleaned extracted data")
            for i, data in enumerate(response_data):
                # Use user-friendly model name for Excel
                data['method'] = "Speed-Optimized model" if "flash" in model_name else "Accuracy-Optimized model"
                if i < len(filenames):
                    data['filename'] = filenames[i]
                    logger.debug(f"Added metadata to cleaned card {i+1}: {filenames[i]}")
            logger.info(f"Batch extraction completed successfully after cleaning for {len(response_data)} cards")
            return response_data
        except json.JSONDecodeError as e2:
            logger.error(f"Failed to parse even cleaned JSON response: {e2}")
            logger.error(f"Cleaned text: {text[:1000]}...")
            raise
def extract_business_card_data(image, model_name="gemini-2.5-flash"):
    """Extract data from single business card image - legacy function"""
    logger.debug(f"Single card extraction called with model: {model_name}")
    result = extract_business_card_data_batch([image], ["single_card"], model_name)
    if result:
        logger.debug("Single card extraction successful")
        return result[0]
    else:
        logger.warning("Single card extraction returned no results")
        return None
def convert_image_for_processing(image, filename):
    """Convert image to RGB JPEG format for better compatibility"""
    try:
        # Convert to RGB if necessary (HEIF images might be in different modes)
        if image.mode != 'RGB':
            image = image.convert('RGB')
        # Create a buffer for the converted image
        buffer = io.BytesIO()
        image.save(buffer, format='JPEG', quality=95)
        buffer.seek(0)
        # Return the converted image
        return Image.open(buffer)
    except Exception as e:
        logger.warning(f"Could not convert {filename}: {str(e)}. Using original image.")
        return image
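
# Note: once pillow_heif.register_heif_opener() has run (see the import block above),
# PIL.Image.open can read .heif/.heic uploads directly, so the conversion above mainly
# normalises colour modes before the cards are re-encoded for the API and for Drive.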
def process_business_cards(images, model_name="gemini-2.5-flash", save_images=True):
    """Process multiple business card images and create both current run and cumulative Excel files"""
    logger.info("Starting business card processing session")
    logger.info(f"Number of images received: {len(images) if images else 0}")
    logger.info(f"Model selected: {model_name}")
    logger.info(f"Save images option: {save_images}")
    if not images:
        logger.warning("No images provided for processing")
        return None, None, "Please upload at least one business card image.", None
    all_data = []
    errors = []
    # Prepare images for batch processing
    logger.info("Preparing images for batch processing")
    image_batches = []
    filename_batches = []
    batch_size = 5
    logger.debug(f"Using batch size: {batch_size}")
    # Load and group images into batches of 5
    loaded_images = []
    filenames = []
    uploaded_image_links = []
logger.info(f"Loading {len(images)} images") | |
for idx, image_path in enumerate(images): | |
try: | |
# Load image | |
if isinstance(image_path, str): | |
logger.debug(f"Loading image {idx+1}: {image_path}") | |
image = Image.open(image_path) | |
filename = os.path.basename(image_path) | |
else: | |
logger.debug(f"Using direct image object {idx+1}") | |
image = image_path | |
filename = f"image_{idx+1}.png" | |
# Convert image for better compatibility (especially for HEIF/HEIC) | |
converted_image = convert_image_for_processing(image, filename) | |
loaded_images.append(converted_image) | |
filenames.append(filename) | |
logger.debug(f"Successfully loaded image {idx+1}: {filename} (size: {converted_image.size})") | |
except Exception as e: | |
error_msg = f"Error loading {image_path}: {str(e)}" | |
logger.error(error_msg) | |
errors.append(error_msg) | |
logger.info(f"Successfully loaded {len(loaded_images)} out of {len(images)} images") | |
    # Save images to Google Drive if requested
    if save_images and loaded_images:
        logger.info(f"Saving {len(loaded_images)} images to Google Drive")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        for i, (image, filename) in enumerate(zip(loaded_images, filenames)):
            try:
                # Create unique filename with timestamp
                name, ext = os.path.splitext(filename)
                if not ext:
                    ext = '.png'
                unique_filename = f"{timestamp}_{i+1:03d}_{name}{ext}"
                # Convert image to bytes
                img_buffer = io.BytesIO()
                image.save(img_buffer, format='PNG')
                img_bytes = img_buffer.getvalue()
                # Upload to Google Drive
                result = upload_bytes_to_google_drive(img_bytes, unique_filename, is_excel=False)
                if result:
                    uploaded_image_links.append(result['webViewLink'])
                    logger.debug(f"Saved image {i+1}: {unique_filename}")
                else:
                    uploaded_image_links.append(None)
                    logger.error(f"Failed to upload image {unique_filename}")
            except Exception as e:
                logger.error(f"Failed to save image {filename}: {e}")
                uploaded_image_links.append(None)
        logger.info(f"Successfully uploaded {sum(1 for link in uploaded_image_links if link)} images to Google Drive")
    # Group into batches
    logger.info(f"Grouping {len(loaded_images)} images into batches of {batch_size}")
    for i in range(0, len(loaded_images), batch_size):
        batch_images = loaded_images[i:i + batch_size]
        batch_filenames = filenames[i:i + batch_size]
        image_batches.append(batch_images)
        filename_batches.append(batch_filenames)
        logger.debug(f"Created batch {len(image_batches)} with {len(batch_images)} images: {batch_filenames}")
    logger.info(f"Created {len(image_batches)} batches for processing")
    # Process each batch
    logger.info(f"Starting processing of {len(image_batches)} batches")
    for batch_idx, (batch_images, batch_filenames) in enumerate(zip(image_batches, filename_batches)):
        try:
            logger.info(f"Processing batch {batch_idx + 1}/{len(image_batches)} ({len(batch_images)} cards)")
            print(f"Processing batch {batch_idx + 1}/{len(image_batches)} ({len(batch_images)} cards)")
            # Extract data for the entire batch
            logger.debug(f"Calling batch extraction for batch {batch_idx + 1}")
            batch_data = extract_business_card_data_batch(batch_images, batch_filenames, model_name)
            logger.info(f"Batch {batch_idx + 1} extraction completed, got {len(batch_data)} results")
            # Process each card's data in the batch
            logger.debug(f"Processing individual card data for batch {batch_idx + 1}")
            for i, data in enumerate(batch_data):
                card_filename = batch_filenames[i] if i < len(batch_filenames) else f"card_{i+1}"
                logger.debug(f"Processing card data for: {card_filename}")
                # Add timestamp to data
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                data['processed_date'] = timestamp
                logger.debug(f"Added timestamp {timestamp} to {card_filename}")
                # Add Google Drive image link if images were saved
                global_index = batch_idx * batch_size + i
                if save_images and global_index < len(uploaded_image_links) and uploaded_image_links[global_index]:
                    data['google_drive_image_link'] = uploaded_image_links[global_index]
                    logger.debug(f"Added Google Drive image link for {card_filename}: {uploaded_image_links[global_index]}")
                else:
                    data['google_drive_image_link'] = None
                # Handle multiple values (emails, phones) by joining with commas
                list_fields_processed = []
                for key, value in data.items():
                    if isinstance(value, list):
                        original_count = len(value)
                        data[key] = ', '.join(str(v) for v in value)
                        list_fields_processed.append(f"{key}({original_count})")
                        logger.debug(f"Combined {original_count} {key} values for {card_filename}")
                if list_fields_processed:
                    logger.debug(f"List fields processed for {card_filename}: {list_fields_processed}")
                # Combine phone fields if they exist separately
                if 'mobile_phones' in data and data['mobile_phones']:
                    logger.debug(f"Combining phone fields for {card_filename}")
                    if data.get('phones'):
                        # Combine mobile and regular phones
                        existing_phones = str(data['phones']) if data['phones'] else ""
                        mobile_phones = str(data['mobile_phones']) if data['mobile_phones'] else ""
                        combined = [p for p in [existing_phones, mobile_phones] if p and p != 'null']
                        data['phones'] = ', '.join(combined)
                        logger.debug(f"Combined phones for {card_filename}: {data['phones']}")
                    else:
                        data['phones'] = data['mobile_phones']
                        logger.debug(f"Used mobile phones as phones for {card_filename}: {data['phones']}")
                    del data['mobile_phones']  # Remove separate mobile field
                # Combine address fields if they exist separately
                if 'street' in data and data['street']:
                    logger.debug(f"Combining address fields for {card_filename}")
                    if data.get('address'):
                        # If both exist, combine them
                        if str(data['street']) != str(data['address']) and data['street'] != 'null':
                            original_address = data['address']
                            data['address'] = f"{data['street']}, {data['address']}"
                            logger.debug(f"Combined address for {card_filename}: '{data['street']}' + '{original_address}' = '{data['address']}'")
                    else:
                        data['address'] = data['street']
                        logger.debug(f"Used street as address for {card_filename}: {data['address']}")
                    del data['street']  # Remove separate street field
                all_data.append(data)
                logger.debug(f"Added processed data for {card_filename} to results (total: {len(all_data)})")
            logger.info(f"Completed processing batch {batch_idx + 1}, total cards processed so far: {len(all_data)}")
        except Exception as e:
            batch_filenames_str = ', '.join(batch_filenames)
            error_msg = f"Error processing batch {batch_idx + 1} ({batch_filenames_str}): {str(e)}"
            logger.error(error_msg)
            errors.append(error_msg)
    if not all_data:
        logger.warning("No data could be extracted from any images")
        error_summary = "No data could be extracted from the images.\n" + "\n".join(errors)
        return None, None, error_summary, None
    logger.info(f"Successfully extracted data from {len(all_data)} business cards")
    # Create DataFrame for current run
    logger.info("Creating DataFrame for current run")
    current_df = pd.DataFrame(all_data)
    logger.debug(f"Current run DataFrame created with {len(current_df)} rows and {len(current_df.columns)} columns")
    logger.debug(f"Columns: {list(current_df.columns)}")
    # Generate timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    logger.debug(f"Generated timestamp: {timestamp}")
    # Create temporary files for Excel generation
    with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as current_temp:
        current_temp_path = current_temp.name
    with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as cumulative_temp:
        cumulative_temp_path = cumulative_temp.name
    current_filename = f"current_run_{timestamp}.xlsx"
    cumulative_filename = "all_business_cards_total.xlsx"
    # Download and merge existing cumulative data from Google Drive
    logger.info("Checking for existing cumulative file in Google Drive")
    cumulative_df = current_df  # Default to current data
    try:
        # Clean up any duplicate cumulative files first
        duplicates_removed = cleanup_duplicate_cumulative_files(drive_service)
        if duplicates_removed > 0:
            logger.info(f"Cleaned up {duplicates_removed} duplicate cumulative files")
        # Get the existing cumulative file
        existing_file = get_existing_cumulative_file(drive_service)
        if existing_file:
            logger.info(f"Existing cumulative file found: {existing_file['name']} (ID: {existing_file['id']})")
            # Create temporary file for download
            with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as existing_temp:
                existing_temp_path = existing_temp.name
            # Download existing file
            if download_file_from_drive(drive_service, existing_file['id'], existing_temp_path):
                logger.info("Successfully downloaded existing cumulative file")
                try:
                    # Read existing data
                    existing_df = pd.read_excel(existing_temp_path)
                    logger.info(f"Loaded existing data: {len(existing_df)} rows")
                    # Merge with current data
                    cumulative_df = pd.concat([existing_df, current_df], ignore_index=True)
                    logger.info(f"Merged data: {len(existing_df)} existing + {len(current_df)} new = {len(cumulative_df)} total rows")
                    # Delete the old file from Google Drive since we'll upload a new one
                    delete_file_from_drive(drive_service, existing_file['id'])
                    logger.info("Deleted old cumulative file from Google Drive")
                except Exception as e:
                    logger.error(f"Failed to read existing Excel file: {e}")
                    logger.info("Using current data only")
                    cumulative_df = current_df
                finally:
                    # Clean up temporary file
                    try:
                        os.unlink(existing_temp_path)
                    except:
                        pass
            else:
                logger.warning("Failed to download existing cumulative file, using current data only")
                cumulative_df = current_df
        else:
            logger.info("No existing cumulative file found, using current data only")
            cumulative_df = current_df
    except Exception as e:
        logger.warning(f"Error handling existing cumulative data: {e}")
        logger.info("Using current data only")
        cumulative_df = current_df
    # Write current run Excel file
    logger.info(f"Creating current run Excel file: {current_filename}")
    try:
        with pd.ExcelWriter(current_temp_path, engine='openpyxl') as writer:
            current_df.to_excel(writer, index=False, sheet_name='Current Run')
            logger.debug(f"Written {len(current_df)} rows to 'Current Run' sheet")
            # Auto-adjust column widths
            logger.debug("Auto-adjusting column widths for current run file")
            worksheet = writer.sheets['Current Run']
            for column in current_df:
                column_length = max(current_df[column].astype(str).map(len).max(), len(column))
                col_idx = current_df.columns.get_loc(column)
                final_width = min(column_length + 2, 50)
                # get_column_letter handles sheets with more than 26 columns (AA, AB, ...)
                worksheet.column_dimensions[get_column_letter(col_idx + 1)].width = final_width
        logger.info("Current run Excel file created locally")
        # Upload current run file to Google Drive
        current_result = upload_to_google_drive(current_temp_path, is_excel=True, filename=current_filename)
        if current_result:
            logger.info(f"Current run file uploaded to Google Drive: {current_result['webViewLink']}")
    except Exception as e:
        logger.error(f"Failed to create current run Excel file: {e}")
        raise
    # Write cumulative Excel file
    logger.info(f"Creating cumulative Excel file: {cumulative_filename}")
    try:
        with pd.ExcelWriter(cumulative_temp_path, engine='openpyxl') as writer:
            cumulative_df.to_excel(writer, index=False, sheet_name='All Business Cards')
            logger.debug(f"Written {len(cumulative_df)} rows to 'All Business Cards' sheet")
            # Auto-adjust column widths
            logger.debug("Auto-adjusting column widths for cumulative file")
            worksheet = writer.sheets['All Business Cards']
            for column in cumulative_df:
                column_length = max(cumulative_df[column].astype(str).map(len).max(), len(column))
                col_idx = cumulative_df.columns.get_loc(column)
                final_width = min(column_length + 2, 50)
                # get_column_letter handles sheets with more than 26 columns (AA, AB, ...)
                worksheet.column_dimensions[get_column_letter(col_idx + 1)].width = final_width
        logger.info("Cumulative Excel file created locally")
        # Upload cumulative file to Google Drive
        cumulative_result = upload_to_google_drive(cumulative_temp_path, is_excel=True, filename=cumulative_filename)
        if cumulative_result:
            logger.info(f"Cumulative file uploaded to Google Drive: {cumulative_result['webViewLink']}")
    except Exception as e:
        logger.error(f"Failed to create cumulative Excel file: {e}")
        raise
    # Note: Don't delete temp files here - Gradio needs them for download
    # Gradio will handle cleanup automatically
    # Create summary message
    logger.info("Creating summary message")
    num_batches = len(image_batches) if 'image_batches' in locals() else 1
    summary = f"Successfully processed {len(all_data)} business card(s) in {num_batches} batch(es) of up to 5 cards.\n"
    model_display = "Speed-Optimized model" if "flash" in model_name else "Accuracy-Optimized model"
    summary += f"🤖 AI Model used: {model_display}\n"
    summary += f"⚡ API calls made: {num_batches} (instead of {len(all_data)})\n"
    if save_images:
        num_uploaded = sum(1 for link in uploaded_image_links if link) if 'uploaded_image_links' in locals() else 0
        summary += f"💾 Images uploaded to Google Drive: {num_uploaded} cards\n\n"
    else:
        summary += "💾 Images uploaded to Google Drive: No (save option was disabled)\n\n"
    summary += f"📄 Current run file: {current_filename} (uploaded to Google Drive)\n"
    summary += f"📊 Total cumulative file: {cumulative_filename} (uploaded to Google Drive)\n"
    summary += f"📈 Total cards in database: {len(cumulative_df)}\n"
    # Add cleanup information
    if 'duplicates_removed' in locals() and duplicates_removed > 0:
        summary += f"🧹 Cleaned up {duplicates_removed} duplicate cumulative files\n"
    if 'old_runs_removed' in locals() and old_runs_removed > 0:
        summary += f"🧹 Cleaned up {old_runs_removed} old current run files\n"
    summary += "\n"
    # Add Google Drive links
    summary += "🔗 Google Drive Links:\n"
    if 'current_result' in locals() and current_result:
        summary += f"  📄 Current Run: {current_result['webViewLink']}\n"
    if 'cumulative_result' in locals() and cumulative_result:
        summary += f"  📊 Total Database: {cumulative_result['webViewLink']}\n"
    summary += "  📁 Exports Folder: https://drive.google.com/drive/folders/1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO\n"
    summary += "  🖼️ Images Folder: https://drive.google.com/drive/folders/1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c\n\n"
    if errors:
        logger.warning(f"Encountered {len(errors)} errors during processing")
        summary += "Errors encountered:\n" + "\n".join(errors)
        for error in errors:
            logger.warning(f"Processing error: {error}")
    else:
        logger.info("No errors encountered during processing")
    # Display preview of current run
    logger.debug("Creating preview DataFrame")
    preview_df = current_df.head(10)
    logger.debug(f"Preview contains {len(preview_df)} rows")
    logger.info("Business card processing session completed successfully")
    logger.info(f"Session summary - Cards: {len(all_data)}, Batches: {num_batches}, API calls: {num_batches}, Total DB size: {len(cumulative_df)}")
    # Return the temporary file paths for download (Gradio will handle the download)
    return current_temp_path, cumulative_temp_path, summary, preview_df
# Create Gradio interface
logger.info("Creating Gradio interface")
with gr.Blocks(title="Business Card Data Extractor") as demo:
    gr.Markdown(
        """
# Business Card Data Extractor
Upload business card images to extract contact information and export to Excel.
Cards are processed in batches of 5 for efficiency (fewer API calls, lower cost).

**Two files are generated:**
- 📄 **Current Run**: Contains only the cards you just processed
- 📊 **Total Database**: Contains ALL cards ever processed (cumulative)

**☁️ Google Drive Storage:**
- 📄 Excel files: Automatically uploaded to Google Drive exports folder
- 🖼️ Images: Uploaded to Google Drive images folder (if save option enabled)
- 🔗 **Direct Links**: Access files directly through provided Google Drive links
- 📁 **Organized Folders**: Separate folders for exports and images

**📂 File Access:**
- ⬇️ Download directly from interface buttons (temporary copies)
- 🔗 Access permanent files via Google Drive links in results
- 📁 **Exports Folder**: https://drive.google.com/drive/folders/1k5iP4egzLrGJwnHkMhxt9bAkaCiieojO
- 🖼️ **Images Folder**: https://drive.google.com/drive/folders/1gd280IqcAzpAFTPeYsZjoBUOU9S7Zx3c

**☁️ Google Drive Integration:**
- Requires `GOOGLE_CLIENT_ID` and `GOOGLE_CLIENT_SECRET` environment variables
- Files are automatically uploaded and organized in predefined folders
        """
    )
    with gr.Row():
        with gr.Column():
            # Define supported file types including phone formats
            supported_types = [".jpg", ".jpeg", ".png", ".webp", ".bmp"]
            if HEIF_SUPPORTED:
                supported_types.extend([".heif", ".heic"])
            image_input = gr.File(
                label="Upload Business Cards",
                file_count="multiple",
                file_types=supported_types
            )
            model_selector = gr.Dropdown(
                choices=[("Accuracy-Optimized model", "gemini-2.5-pro"), ("Speed-Optimized model", "gemini-2.5-flash")],
                value="gemini-2.5-pro",
                label="AI Model Selection"
            )
            save_images_checkbox = gr.Checkbox(
                value=True,
                label="Save Business Card Images"
            )
            process_btn = gr.Button("Process Business Cards", variant="primary")
        with gr.Column():
            current_file = gr.File(label="📄 Download Current Run")
            total_file = gr.File(label="📊 Download Total Database")
            status_output = gr.Textbox(label="Processing Status", lines=5)
            preview_output = gr.Dataframe(label="Data Preview (Current Run)")
    # Wrapper function for better error handling and logging
    def process_with_logging(images, model_name, save_images):
        """Wrapper function to add error handling and logging to the main process"""
        try:
            logger.info("Gradio interface initiated processing request")
            logger.debug(f"Request parameters - Images: {len(images) if images else 0}, Model: {model_name}, Save Images: {save_images}")
            return process_business_cards(images, model_name, save_images)
        except Exception as e:
            logger.error(f"Unexpected error in Gradio processing: {e}")
            error_msg = f"An unexpected error occurred: {str(e)}\nPlease check the logs for more details."
            return None, None, error_msg, None
    # Handle processing
    process_btn.click(
        fn=process_with_logging,
        inputs=[image_input, model_selector, save_images_checkbox],
        outputs=[current_file, total_file, status_output, preview_output]
    )
    gr.Markdown(
        """
## Features:
- 🤖 **Model Selection**: Choose between the Speed-Optimized model (fast) and the Accuracy-Optimized model (accurate)
- ⚡ **Batch Processing**: Processes 5 cards per API call for efficiency
- 📋 **Data Extraction**: Names, emails, phone numbers, addresses, and more
- 🔗 **Smart Combination**: Multiple emails/phones combined with commas
- 📍 **Address Merging**: All phone types and address fields combined
- ☁️ **Google Drive Storage**: Automatic upload to organized Drive folders
- 🔗 **Direct Links**: Instant access to files via Google Drive URLs
- 📊 **Dual Output**: Current run + cumulative database files
- 📅 **Full Tracking**: Processing date, filename, Google Drive links, and AI model used
- 🎯 **One Row Per Card**: Each business card becomes one spreadsheet row
        """
    )
# Launch for Hugging Face Spaces deployment
logger.info("Starting Gradio demo")
# Get password from environment variable for authentication
hf_space_password = os.getenv("SPACE_PASSWORD")
if hf_space_password:
    # Launch with password protection
    logger.info("Launching with password protection enabled")
    demo.launch(
        auth=("user", hf_space_password),
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False  # Disable SSR to avoid svelte-i18n errors
    )
else:
    # Launch without password protection
    logger.warning("SPACE_PASSWORD not set - launching without password protection")
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False  # Disable SSR to avoid svelte-i18n errors
    )