Spaces:
Running
Running
import os | |
import sys | |
import time | |
import random | |
from enum import Enum | |
from pathlib import Path | |
import json | |
import base64 | |
import logging | |
from functools import lru_cache | |
from typing import Optional, Dict, Any, List, Union, Tuple | |
# Configure logging before anything can log: a logging.warning() issued while the
# root logger has no handlers installs a default handler, after which
# logging.basicConfig() silently does nothing and this format/level is lost.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Try to import pycountry, provide fallback if not available
try:
    import pycountry
    PYCOUNTRY_AVAILABLE = True
except ImportError:
    PYCOUNTRY_AVAILABLE = False
    logging.warning("pycountry module not available - using language code fallback")

from pydantic import BaseModel

# Try to import Mistral AI, provide fallback if not available
try:
    from mistralai import Mistral
    from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk
    from mistralai.models import OCRImageObject
    MISTRAL_AVAILABLE = True
except ImportError:
    MISTRAL_AVAILABLE = False
    logging.warning("mistralai module not available - OCR functionality will be limited")
# Import utilities for OCR processing
try:
    from ocr_utils import replace_images_in_markdown, get_combined_markdown
except ImportError:
    # Define fallback functions if module not found
    def replace_images_in_markdown(markdown_str, images_dict):
        """Inline base64 image data into OCR markdown.

        Args:
            markdown_str: Markdown text containing image references of the
                form ``![<id>](<id>)``.
            images_dict: Mapping of image id to the base64 string that should
                replace the link target.

        Returns:
            The markdown with every matching ``![<id>](<id>)`` rewritten to
            ``![<id>](<base64>)``. Text without matching references is
            returned unchanged.
        """
        for img_name, base64_str in images_dict.items():
            markdown_str = markdown_str.replace(
                f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
            )
        return markdown_str

    def get_combined_markdown(ocr_response):
        """Join the markdown of all pages, with each page's images inlined.

        Args:
            ocr_response: Object exposing ``pages``; each page must expose
                ``markdown`` and ``images``, and each image ``id`` and
                ``image_base64``.

        Returns:
            The per-page markdown joined with a blank line between pages.
        """
        markdowns = []
        for page in ocr_response.pages:
            image_data = {img.id: img.image_base64 for img in page.images}
            markdowns.append(replace_images_in_markdown(page.markdown, image_data))
        return "\n\n".join(markdowns)
# Load settings from the local config module (now local to historical-ocr);
# when it is missing, fall back to the environment and built-in defaults.
try:
    from config import (
        MISTRAL_API_KEY,
        OCR_MODEL,
        TEXT_MODEL,
        VISION_MODEL,
        TEST_MODE,
    )
except ImportError:
    logging.warning("Config module not found. Using environment variables and defaults.")
    # The API key comes from the environment; an empty string means "not set"
    MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY", "")
    OCR_MODEL = "mistral-ocr-latest"
    # Text and vision analysis share the same default model
    TEXT_MODEL = VISION_MODEL = "mistral-large-latest"
    # Without a config module the processor defaults to test mode
    TEST_MODE = True
# Helper function to make OCR objects JSON serializable
# (not cached: the inputs are unhashable objects)
def serialize_ocr_response(obj):
    """
    Convert an OCR response object into plain JSON-friendly Python data.

    Args:
        obj: A primitive, list/tuple/set, dict, or any object exposing
            ``__dict__`` (for example an OCR response).

    Returns:
        - ``None``, ``str``, ``int``, ``float`` and ``bool`` are returned as is.
        - Lists, tuples and sets become lists with every item converted.
        - Dicts keep their keys and have every value converted.
        - Objects with ``__dict__`` become dicts of their public attributes
          (names starting with ``_`` are skipped), converted recursively.
          An attribute holding an ``OCRImageObject`` is reduced to its
          ``id`` and ``image_base64`` only.
        - Anything else is returned unchanged.
    """
    # Fast path: primitives need no conversion
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj

    if isinstance(obj, (list, tuple, set, frozenset)):
        return [serialize_ocr_response(item) for item in obj]

    if isinstance(obj, dict):
        return {k: serialize_ocr_response(v) for k, v in obj.items()}

    if not hasattr(obj, '__dict__'):
        return obj

    # OCRImageObject only exists when the optional mistralai import succeeded;
    # look it up lazily so serialization still works without that package.
    image_cls = globals().get("OCRImageObject")

    result = {}
    for key, value in obj.__dict__.items():
        if key.startswith('_'):
            continue  # Skip private attributes
        if image_cls is not None and isinstance(value, image_cls):
            result[key] = {
                'id': getattr(value, 'id', None),
                'image_base64': getattr(value, 'image_base64', None),
            }
        else:
            # Recurse for every other value so nested dicts, collections and
            # objects are converted too; primitives come back unchanged.
            result[key] = serialize_ocr_response(value)
    return result
# Language lookup used to build the Language enum for structured output
def get_language_dict():
    """Return a mapping of two-letter language code to language name.

    When pycountry is importable the mapping is built from every
    ``pycountry.languages`` entry that has an ``alpha_2`` attribute;
    otherwise a small built-in table of common languages is returned.
    """
    if not PYCOUNTRY_AVAILABLE:
        # Fallback with basic languages when pycountry is not available
        return {
            "en": "English",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "it": "Italian",
            "pt": "Portuguese",
            "ru": "Russian",
            "zh": "Chinese",
            "ja": "Japanese",
            "ar": "Arabic",
            "hi": "Hindi",
            "la": "Latin"
        }

    code_to_name = {}
    for language in pycountry.languages:
        # Only entries that define a two-letter code are included
        if hasattr(language, 'alpha_2'):
            code_to_name[language.alpha_2] = language.name
    return code_to_name
class LanguageMeta(type(Enum)):
    """Enum metaclass that fills the class body with one member per language.

    Each language name from get_language_dict() becomes a member whose name
    is the upper-cased language name with spaces replaced by underscores and
    whose value is the language name itself.
    """

    def __new__(metacls, cls, bases, classdict):
        for language_name in get_language_dict().values():
            member_name = language_name.upper().replace(' ', '_')
            classdict[member_name] = language_name
        return super().__new__(metacls, cls, bases, classdict)
class Language(Enum, metaclass=LanguageMeta):
    """Enum of languages used in structured OCR output.

    The body is intentionally empty: LanguageMeta injects one member per
    entry of get_language_dict() when this class is created.
    """
    pass
class StructuredOCRModel(BaseModel):
    """Pydantic schema describing a structured OCR result."""
    # Name of the processed file
    file_name: str
    # Topic labels for the document
    topics: list[str]
    # Languages of the document, as Language enum members
    languages: list[Language]
    # Extracted content; a free-form dict whose keys are not fixed by this schema
    ocr_contents: dict
class StructuredOCR: | |
def __init__(self, api_key=None):
    """Initialize the OCR processor with API key

    Args:
        api_key: Optional Mistral API key. When omitted, MISTRAL_API_KEY is
            used, or a placeholder key when running in test mode.

    Raises:
        ValueError: If no usable (non-blank) API key is available outside
            test mode, or if creating the client fails with an
            authorization error.

    Sets ``self.test_mode``, ``self.api_key`` and ``self.client``. The client
    is ``None`` when the mistralai package is missing or the client could not
    be created; in both cases the instance runs in test mode.
    """
    logger = logging.getLogger("api_validator")

    # Check if we're running in test mode or if Mistral is not available
    self.test_mode = TEST_MODE or not MISTRAL_AVAILABLE
    if not MISTRAL_AVAILABLE:
        logger.warning("Mistral AI package not available - running in test mode")
        self.api_key = "placeholder_key"
        self.client = None
        return

    # Strip BEFORE validating so a whitespace-only key counts as missing
    # instead of reaching the client as an empty string.
    provided_key = (api_key or "").strip()
    if self.test_mode and not provided_key:
        self.api_key = "placeholder_key"
    else:
        self.api_key = provided_key or (MISTRAL_API_KEY or "").strip()

    # Ensure we have a valid API key when not in test mode
    if not self.api_key and not self.test_mode:
        raise ValueError("No Mistral API key provided. Please set the MISTRAL_API_KEY environment variable or enable TEST_MODE.")

    # Initialize client with the API key (no validation call is made here,
    # to avoid unnecessary API requests)
    try:
        self.client = Mistral(api_key=self.api_key)
    except Exception as e:
        error_msg = str(e).lower()
        if "unauthorized" in error_msg or "401" in error_msg:
            raise ValueError(f"API key authentication failed. Please check your Mistral API key: {str(e)}") from e
        # Any other construction failure degrades to test mode instead of crashing
        logger.warning(f"Failed to initialize Mistral client: {str(e)}")
        self.test_mode = True
        self.client = None
def process_file(self, file_path, file_type=None, use_vision=True, max_pages=None, file_size_mb=None, custom_pages=None, custom_prompt=None):
    """Process a file and return structured OCR results

    Args:
        file_path: Path to the file to process
        file_type: 'pdf' or 'image' (will be auto-detected if None)
        use_vision: Whether to use vision model for improved analysis
        max_pages: Optional limit on number of pages to process
        file_size_mb: Optional file size in MB (used for automatic page limiting)
        custom_pages: Optional list of specific page numbers to process
        custom_prompt: Optional instructions for the AI to handle unusual document formatting or specific extraction needs

    Returns:
        Dictionary with structured OCR results. It always carries
        'processing_time' and 'confidence_score'. Files larger than 50 MB are
        not processed; an error dictionary is returned for them instead.
    """
    # Convert file_path to Path object if it's a string
    file_path = Path(file_path)

    # Auto-detect file type if not provided: anything that is not a PDF is
    # treated as an image
    if file_type is None:
        file_type = "pdf" if file_path.suffix.lower() == ".pdf" else "image"

    # Get file size if not provided
    if file_size_mb is None and file_path.exists():
        file_size_mb = file_path.stat().st_size / (1024 * 1024)  # Convert bytes to MB

    # Check if file exceeds API limits (50 MB)
    if file_size_mb and file_size_mb > 50:
        logging.warning(f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB")
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "confidence_score": 0.0,
            "error": f"File size {file_size_mb:.2f} MB exceeds API limit of 50 MB",
            "ocr_contents": {
                "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                "partial_text": "Document could not be processed due to size limitations."
            }
        }

    # For PDF files without an explicit page selection, cap medium-sized
    # files (20-50 MB) at 10 pages. Larger files were rejected above, and
    # smaller ones are processed in full (max_pages stays None).
    if (file_type == "pdf" and file_size_mb and file_size_mb > 20
            and max_pages is None and custom_pages is None):
        max_pages = 10

    # Start processing timer
    start_time = time.time()

    # Read and process the file
    if file_type == "pdf":
        result = self._process_pdf(file_path, use_vision, max_pages, custom_pages, custom_prompt)
    else:
        result = self._process_image(file_path, use_vision, custom_prompt)

    # Add processing time information (overrides any value set by the handler)
    result['processing_time'] = time.time() - start_time

    # Add a default confidence score if not present
    if 'confidence_score' not in result:
        result['confidence_score'] = 0.85  # Default confidence

    # Ensure the entire result is fully JSON serializable
    try:
        # First convert to a standard dict if it's not already
        if not isinstance(result, dict):
            result = serialize_ocr_response(result)
        # Test JSON serialization to catch any remaining non-serializable objects
        json.dumps(result)
    except TypeError as e:
        # If there's a serialization error, run the whole result through our serializer
        logger = logging.getLogger("serializer")
        logger.warning(f"JSON serialization error in result: {str(e)}. Applying full serialization.")
        result = serialize_ocr_response(result)
    return result
def _process_pdf(self, file_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None):
    """
    Process a PDF file with OCR.

    Local rendering with pdf2image is tried first. If it fails or yields no
    text, the PDF is uploaded to the Mistral OCR API instead. Failures are
    not raised to the caller: they are reported in the returned dictionary.

    Args:
        file_path: Path to the PDF file
        use_vision: Whether to use vision model for enhanced analysis
        max_pages: Optional limit on the number of pages to process
        custom_pages: Optional list of specific (1-based) page numbers to process
        custom_prompt: Optional custom prompt for specialized extraction

    Returns:
        Dictionary with the structured OCR result. In test mode a fixed
        placeholder is returned. On error the dictionary carries an 'error'
        key and a confidence score of 0.0.
    """
    logger = logging.getLogger("pdf_processor")
    logger.info(f"Processing PDF: {file_path}")

    # Track processing time
    start_time = time.time()

    # Fast path: Return placeholder if in test mode
    if self.test_mode:
        logger.info("Test mode active, returning placeholder response")
        return {
            "file_name": file_path.name,
            "topics": ["Historical Document", "Literature", "American History"],
            "languages": ["English"],
            "ocr_contents": {
                "title": "Harper's New Monthly Magazine",
                "publication_date": "1855",
                "publisher": "Harper & Brothers, New York",
                "raw_text": "This is a test mode placeholder for Harper's New Monthly Magazine from 1855. The actual document contains articles on literature, politics, science, and culture from mid-19th century America.",
                "content": "The magazine includes various literary pieces, poetry, political commentary, and illustrations typical of 19th century periodicals. Known for publishing works by prominent American authors including Herman Melville and Charles Dickens.",
                "key_figures": ["Herman Melville", "Charles Dickens", "Henry Wadsworth Longfellow"],
                "noted_articles": ["Continued serialization of popular novels", "Commentary on contemporary political events", "Scientific discoveries and technological advancements"]
            },
            "pdf_processing_method": "enhanced_test_mode",
            "total_pages": 12,
            "processed_pages": 3,
            "processing_time": 0.5,
            "confidence_score": 0.9
        }

    try:
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        logger.info(f"PDF size: {file_size_mb:.2f} MB")

        # First try local PDF processing for better performance and control
        try:
            return self._process_pdf_locally(
                file_path, use_vision, max_pages, custom_pages, custom_prompt, start_time
            )
        except Exception as pdf2image_error:
            logger.warning(f"Local PDF processing failed, falling back to API: {str(pdf2image_error)}")

        # Fall back to API processing
        try:
            return self._process_pdf_via_api(
                file_path, file_size_mb, use_vision, max_pages, custom_pages, custom_prompt, start_time
            )
        except Exception as api_e:
            logger.error(f"Error in API-based PDF processing: {str(api_e)}")
            # Re-raise to be caught by outer exception handler
            raise
    except Exception as e:
        # Log the error and return a helpful error result
        logger.error(f"Error processing PDF: {str(e)}")
        return {
            "file_name": file_path.name,
            "topics": ["Document"],
            "languages": ["English"],
            "confidence_score": 0.0,
            "error": str(e),
            "ocr_contents": {
                "error": f"Failed to process PDF: {str(e)}",
                "partial_text": "Document could not be fully processed."
            },
            "processing_time": time.time() - start_time
        }

def _process_pdf_locally(self, file_path, use_vision, max_pages, custom_pages, custom_prompt, start_time):
    """Render selected PDF pages with pdf2image and OCR them page by page.

    Each selected page is rendered on its own, so only the pages that are
    actually processed are converted. The first selected page is additionally
    analysed as the document image; the text of all processed pages is stored
    as 'raw_text'.

    Raises:
        Exception: If pdf2image is unavailable, conversion fails, or no page
            yields text. The caller treats any exception as "use the API".
    """
    import tempfile
    from pdf2image import convert_from_path, pdfinfo_from_path

    logger = logging.getLogger("pdf_processor")
    logger.info("Processing PDF using pdf2image for better multi-page handling")

    # Use consistent DPI for all files to ensure reliable text recognition
    dpi = 200

    # Read the page count from the PDF metadata. convert_from_path() returns a
    # list of images and carries no page count, so it cannot be used for this.
    logger.info("Determining PDF page count...")
    count_start = time.time()
    try:
        total_pages = max(1, int(pdfinfo_from_path(str(file_path))["Pages"]))
        logger.info(f"Page count determined in {time.time() - count_start:.2f}s")
    except Exception as count_error:
        logger.warning(f"Error determining page count: {str(count_error)}. Using default of 1")
        total_pages = 1
    logger.info(f"PDF has {total_pages} total pages")

    # Determine which (1-based) pages to process
    valid_custom = [p for p in dict.fromkeys(custom_pages or []) if 0 < p <= total_pages]
    if valid_custom:
        pages_to_process = valid_custom
        logger.info(f"Processing {len(pages_to_process)} custom-selected pages: {pages_to_process}")
    elif max_pages and max_pages < total_pages:
        pages_to_process = list(range(1, max_pages + 1))
        logger.info(f"Processing first {max_pages} pages of {total_pages} total")
    elif total_pages <= 10:
        pages_to_process = list(range(1, total_pages + 1))
        logger.info(f"Processing all {total_pages} pages")
    else:
        # Large document without a usable limit: sample the first page, the
        # last page and three pages spread around the middle.
        middle = total_pages // 2
        pages_to_process = sorted({1, middle // 2, middle, middle + middle // 2, total_pages})
        logger.info(f"Processing {len(pages_to_process)} sampled pages out of {total_pages} total: {pages_to_process}")

    first_image = None
    combined_text = []
    for page_num in pages_to_process:
        logger.info(f"Converting page {page_num}")
        convert_start = time.time()
        page_images = convert_from_path(
            file_path,
            dpi=dpi,
            first_page=page_num,
            last_page=page_num,
            fmt="jpeg"  # Use JPEG format for better compatibility
        )
        logger.info(f"Page conversion completed in {time.time() - convert_start:.2f}s")
        if not page_images:
            logger.warning(f"Page {page_num} produced no image, skipping")
            continue

        page_image = page_images[0]
        if page_num == pages_to_process[0]:
            first_image = page_image

        # Close the handle before saving so the path can be reopened on every
        # platform; the file is always removed afterwards.
        tmp = tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False)
        tmp.close()
        try:
            page_image.save(tmp.name, format='JPEG')
            # Simple OCR to extract text (no vision model, no custom prompt)
            page_result = self._process_image(Path(tmp.name), False, None)
            if 'ocr_contents' in page_result and 'raw_text' in page_result['ocr_contents']:
                page_text = page_result['ocr_contents']['raw_text']
                combined_text.append(f"--- PAGE {page_num} ---\n{page_text}")
        except Exception as page_e:
            logger.warning(f"Error processing page {page_num}: {str(page_e)}")
        finally:
            os.unlink(tmp.name)

    if first_image is None or not combined_text:
        logger.warning("No pages successfully processed with pdf2image, falling back to API")
        raise RuntimeError("Failed to process PDF pages locally")

    # Combine all extracted text
    all_text = "\n\n".join(combined_text)

    # Save first image to temp file for the document-level analysis
    tmp = tempfile.NamedTemporaryFile(suffix='.jpeg', delete=False)
    tmp.close()
    first_image_path = tmp.name
    try:
        first_image.save(first_image_path, format='JPEG', quality=95)
        if custom_prompt:
            try:
                # Process image with vision model
                result = self._process_image(Path(first_image_path), use_vision, None)
                # Enhance with text analysis using combined text from all pages
                enhanced_result = self._extract_structured_data_text_only(all_text, file_path.name, custom_prompt)
                # Merge results, keeping images from original result
                for key, value in enhanced_result.items():
                    if key not in ('raw_response_data', 'pages_data', 'has_images'):
                        result[key] = value
            except Exception as e:
                logger.warning(f"Custom prompt processing failed: {str(e)}. Using standard processing.")
                # Fall back to standard processing
                result = self._process_image(Path(first_image_path), use_vision, None)
        else:
            # Standard processing
            result = self._process_image(Path(first_image_path), use_vision, None)
    finally:
        os.unlink(first_image_path)

    # Update raw text with full document text
    if 'ocr_contents' in result:
        result['ocr_contents']['raw_text'] = all_text

    # Add PDF metadata
    result['file_name'] = file_path.name
    result['pdf_processing_method'] = 'pdf2image_optimized'
    result['total_pages'] = total_pages
    result['processed_pages'] = len(pages_to_process)
    result['pages_processed'] = pages_to_process
    result['processing_info'] = {
        'method': 'local_pdf_processing',
        'dpi': dpi,
        'pages_sampled': pages_to_process,
        'processing_time': time.time() - start_time
    }
    return result

def _process_pdf_via_api(self, file_path, file_size_mb, use_vision, max_pages, custom_pages, custom_prompt, start_time):
    """Upload the PDF, OCR it through the Mistral API and build the result.

    Exceptions raised by the upload, OCR or extraction calls are not handled
    here; they propagate to the caller.
    """
    logger = logging.getLogger("pdf_processor")
    logger.info("Processing PDF via Mistral API")

    logger.info("Uploading PDF file to Mistral API")
    upload_start = time.time()
    uploaded_file = self.client.files.upload(
        file={
            "file_name": file_path.stem,
            "content": file_path.read_bytes(),
        },
        purpose="ocr"
    )
    logger.info(f"PDF uploaded in {time.time() - upload_start:.2f}s")

    # Get a signed URL for the uploaded file
    signed_url = self.client.files.get_signed_url(file_id=uploaded_file.id, expiry=1)

    logger.info(f"Processing PDF with OCR using {OCR_MODEL}")
    pdf_response = self._request_pdf_ocr_with_retries(signed_url.url, file_size_mb)

    total_pages = len(pdf_response.pages)
    logger.info(f"API returned {total_pages} total PDF pages")

    # Select pages as 0-based indices into pdf_response.pages. Keeping the
    # indices (not just the page objects) lets page labels and pages_data
    # refer to the real page positions.
    selected_indices = list(range(total_pages))
    limited_pages = False
    if custom_pages:
        # Convert to 0-based indexing and filter valid page numbers
        valid_indices = [i - 1 for i in custom_pages if 0 < i <= total_pages]
        if valid_indices:
            selected_indices = valid_indices
            limited_pages = True
            logger.info(f"Processing {len(valid_indices)} custom-selected pages")
    elif max_pages and total_pages > max_pages:
        if max_pages == 1:
            selected_indices = [0]
        elif max_pages < 5 and total_pages > 10:
            # Small limit on a large document: first and last page, then the
            # middle page (limit >= 3) and the quarter-point page (limit 4).
            sampled = {0, total_pages - 1}
            if max_pages > 2:
                sampled.add(total_pages // 2)
            if max_pages > 3:
                sampled.add(total_pages // 4)
            selected_indices = sorted(sampled)
        else:
            # Default: first max_pages
            selected_indices = list(range(max_pages))
        limited_pages = True
        logger.info(f"Processing {len(selected_indices)} pages out of {total_pages} total")

    pages_to_process = [pdf_response.pages[i] for i in selected_indices]

    # Calculate confidence score if available, else use a default
    try:
        confidence_values = [page.confidence for page in pages_to_process if hasattr(page, 'confidence')]
        confidence_score = sum(confidence_values) / len(confidence_values) if confidence_values else 0.89
    except Exception:
        confidence_score = 0.89

    # Merge page content, labelling each page with its real page number
    all_markdown = []
    for page_index, page in zip(selected_indices, pages_to_process):
        page_markdown = page.markdown if hasattr(page, 'markdown') else ""
        if page_markdown.strip():
            all_markdown.append(f"--- PAGE {page_index + 1} ---\n{page_markdown}")
    combined_markdown = "\n\n".join(all_markdown)

    # Extract structured data with the appropriate model
    if use_vision:
        # Use the first image found on the selected pages for the vision model
        vision_image = None
        for page in pages_to_process:
            if hasattr(page, 'images') and page.images:
                vision_image = page.images[0].image_base64
                break
        if vision_image:
            logger.info(f"Using vision model: {VISION_MODEL}")
            result = self._extract_structured_data_with_vision(
                vision_image, combined_markdown, file_path.name, custom_prompt
            )
        else:
            # Fall back to text-only if no images available
            logger.info(f"No images in PDF, falling back to text model: {TEXT_MODEL}")
            result = self._extract_structured_data_text_only(
                combined_markdown, file_path.name, custom_prompt
            )
    else:
        logger.info(f"Using text-only model as specified: {TEXT_MODEL}")
        result = self._extract_structured_data_text_only(
            combined_markdown, file_path.name, custom_prompt
        )

    # Add metadata about pages
    if limited_pages:
        result['limited_pages'] = {
            'processed': len(pages_to_process),
            'total': total_pages
        }
    result['confidence_score'] = confidence_score
    result['pdf_processing_method'] = 'api'
    result['total_pages'] = total_pages
    result['processed_pages'] = len(pages_to_process)

    # Store serialized OCR response for rendering
    serialized_response = serialize_ocr_response(pdf_response)
    result['raw_response_data'] = serialized_response

    # Check if there are images to include
    has_images = hasattr(pdf_response, 'pages') and any(
        hasattr(page, 'images') and page.images for page in pdf_response.pages
    )
    result['has_images'] = has_images
    if has_images:
        result['pages_data'] = self._build_pdf_pages_data(
            serialized_response, selected_indices if limited_pages else None
        )

    # Record final processing time
    total_time = time.time() - start_time
    result['processing_time'] = total_time
    logger.info(f"PDF API processing completed in {total_time:.2f}s")
    return result

def _request_pdf_ocr_with_retries(self, document_url, file_size_mb):
    """Call the OCR endpoint, retrying errors that look temporary.

    Retry count, base delay and request timeout depend on the file size.
    Errors are classified by substrings of their message.

    Raises:
        ValueError: For authentication failures, exhausted quota, and
            connection or rate-limit errors that persist on the last attempt.
        Exception: Any other API error is re-raised unchanged, unless its
            message marks it as transient/temporary and attempts remain.
    """
    logger = logging.getLogger("pdf_processor")
    max_retries = 3 if file_size_mb < 20 else 2  # Fewer retries for large files
    base_retry_delay = 1 if file_size_mb < 10 else 2  # Longer delays for large files
    ocr_timeout_ms = min(180000, max(60000, int(file_size_mb * 3000)))  # 60s to 180s

    for retry in range(max_retries):
        attempts_left = retry < max_retries - 1
        try:
            ocr_start = time.time()
            pdf_response = self.client.ocr.process(
                document=DocumentURLChunk(document_url=document_url),
                model=OCR_MODEL,
                include_image_base64=True,
                timeout_ms=ocr_timeout_ms
            )
            logger.info(f"PDF OCR processing completed in {time.time() - ocr_start:.2f}s")
            return pdf_response
        except Exception as e:
            error_msg = str(e)
            logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}")
            error_lower = error_msg.lower()

            # Authentication errors - no point in retrying
            if any(term in error_lower for term in ["unauthorized", "401", "403", "authentication"]):
                logger.error("API authentication failed. Check your API key.")
                raise ValueError(f"Authentication failed. Please verify your Mistral API key: {error_msg}") from e

            # Connection or server errors - worth retrying
            if any(term in error_lower for term in ["connection", "timeout", "520", "server error", "502", "503", "504"]):
                if not attempts_left:
                    logger.error("Maximum retries reached, API connection error persists.")
                    raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") from e
                # Exponential backoff with jitter
                wait_time = base_retry_delay * (2 ** retry) * (0.8 + 0.4 * random.random())
                logger.info(f"Connection issue detected. Waiting {wait_time:.1f}s before retry...")
            # Rate limit errors - much longer wait
            elif any(term in error_lower for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]):
                # Exhausted quota is permanent; a plain rate limit is not
                if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower:
                    logger.error("API quota or credit limit reached. No retry will help.")
                    raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") from e
                if not attempts_left:
                    logger.error("Maximum retries reached, rate limit error persists.")
                    raise ValueError(f"API rate limit exceeded. Please try again later: {error_msg}") from e
                wait_time = base_retry_delay * (2 ** retry) * 6.0
                logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...")
            # Misc errors - only retried when explicitly marked as transient
            elif attempts_left and any(term in error_lower for term in ["transient", "temporary"]):
                wait_time = base_retry_delay * (2 ** retry)
                logger.info(f"Transient error detected. Waiting {wait_time:.1f}s before retry...")
            else:
                logger.error(f"Unrecoverable API error: {error_msg}")
                raise

            time.sleep(wait_time)
    # Not reached: the last attempt either returns or raises.

def _build_pdf_pages_data(self, serialized_response, selected_indices=None):
    """Collect per-page markdown and base64 images for rendering.

    Args:
        serialized_response: The serialized OCR response; pages are read from
            its 'pages' key (dict) or ``pages`` attribute.
        selected_indices: Optional 0-based page indices to keep; ``None``
            keeps every page.

    Returns:
        List of dicts with 'page_number' (1-based), 'markdown' and 'images'
        (each image a dict with 'id' and 'image_base64'). Pages with neither
        markdown nor images are omitted; broken pages or images are skipped
        with a warning.
    """
    logger = logging.getLogger("pdf_processor")

    # Get serialized pages - handle different formats
    try:
        if hasattr(serialized_response, 'pages'):
            serialized_pages = serialized_response.pages
        elif isinstance(serialized_response, dict) and 'pages' in serialized_response:
            serialized_pages = serialized_response.get('pages', [])
        else:
            logger.warning("No pages found in OCR response")
            serialized_pages = []
    except Exception as pages_err:
        logger.warning(f"Error extracting pages from OCR response: {str(pages_err)}")
        serialized_pages = []

    wanted = None if selected_indices is None else set(selected_indices)
    pages_data = []
    for page_idx, page in enumerate(serialized_pages):
        # Skip pages that are not part of the selection
        if wanted is not None and page_idx not in wanted:
            continue
        try:
            if isinstance(page, dict):
                markdown = page.get('markdown', '')
                images = page.get('images', [])
            else:
                markdown = getattr(page, 'markdown', "")
                images = getattr(page, 'images', [])

            page_data = {
                'page_number': page_idx + 1,
                'markdown': markdown,
                'images': []
            }
            for img_idx, img in enumerate(images):
                try:
                    if isinstance(img, dict):
                        img_id = img.get('id')
                        img_base64 = img.get('image_base64')
                    else:
                        img_id = getattr(img, 'id', None)
                        img_base64 = getattr(img, 'image_base64', None)
                    # Only add if we have valid image data
                    if img_base64 and isinstance(img_base64, str):
                        page_data['images'].append({
                            'id': img_id if img_id else f"img_{page_idx}_{img_idx}",
                            'image_base64': img_base64
                        })
                except Exception as img_err:
                    logger.warning(f"Error processing image {img_idx} on page {page_idx+1}: {str(img_err)}")
                    continue  # Skip this image

            # Add page data if it has content
            if page_data['markdown'] or page_data['images']:
                pages_data.append(page_data)
        except Exception as page_err:
            logger.warning(f"Error processing page {page_idx+1}: {str(page_err)}")
            continue  # Skip this page
    return pages_data
def _process_image(self, file_path, use_vision=True, custom_prompt=None): | |
"""Process an image file with OCR""" | |
logger = logging.getLogger("image_processor") | |
logger.info(f"Processing image: {file_path}") | |
# Check if we're in test mode | |
if self.test_mode: | |
# Return a placeholder document response | |
return { | |
"file_name": file_path.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"title": "Document", | |
"content": "Please set up API key to process documents." | |
}, | |
"processing_time": 0.5, | |
"confidence_score": 0.0 | |
} | |
try: | |
# Check file size | |
file_size_mb = file_path.stat().st_size / (1024 * 1024) | |
logger.info(f"Original image size: {file_size_mb:.2f} MB") | |
# Use enhanced preprocessing functions from ocr_utils | |
try: | |
from ocr_utils import preprocess_image_for_ocr, IMAGE_PREPROCESSING | |
logger.info(f"Applying advanced image preprocessing for OCR") | |
# Get preprocessing settings from config | |
max_size_mb = IMAGE_PREPROCESSING.get("max_size_mb", 8.0) | |
if file_size_mb > max_size_mb: | |
logger.info(f"Image is large ({file_size_mb:.2f} MB), optimizing for API submission") | |
# Preprocess image with document-type detection and appropriate enhancements | |
_, base64_data_url = preprocess_image_for_ocr(file_path) | |
logger.info(f"Image preprocessing completed successfully") | |
except (ImportError, AttributeError) as e: | |
# Fallback to basic processing if advanced functions not available | |
logger.warning(f"Advanced preprocessing not available: {str(e)}. Using basic image processing.") | |
# If image is larger than 8MB, resize it to reduce API payload size | |
if file_size_mb > 8: | |
logger.info("Image is large, resizing before API submission") | |
try: | |
from PIL import Image | |
import io | |
# Open and process the image | |
with Image.open(file_path) as img: | |
# Convert to RGB if not already (prevents mode errors) | |
if img.mode != 'RGB': | |
img = img.convert('RGB') | |
# Calculate new dimensions (maintain aspect ratio) | |
# Target around 2000-2500 pixels on longest side for better OCR quality | |
width, height = img.size | |
max_dimension = max(width, height) | |
target_dimension = 2000 # Restored to 2000 for better image quality | |
if max_dimension > target_dimension: | |
scale_factor = target_dimension / max_dimension | |
resized_width = int(width * scale_factor) | |
resized_height = int(height * scale_factor) | |
# Use LANCZOS instead of BILINEAR for better quality | |
img = img.resize((resized_width, resized_height), Image.LANCZOS) | |
# Enhance contrast for better text recognition | |
from PIL import ImageEnhance | |
enhancer = ImageEnhance.Contrast(img) | |
img = enhancer.enhance(1.3) | |
# Save to bytes with compression | |
buffer = io.BytesIO() | |
img.save(buffer, format="JPEG", quality=92, optimize=True) # Higher quality for better OCR | |
buffer.seek(0) | |
# Get the base64 | |
encoded_image = base64.b64encode(buffer.getvalue()).decode() | |
base64_data_url = f"data:image/jpeg;base64,{encoded_image}" | |
# Log the new size | |
new_size_mb = len(buffer.getvalue()) / (1024 * 1024) | |
logger.info(f"Resized image to {new_size_mb:.2f} MB") | |
except ImportError: | |
logger.warning("PIL not available for resizing. Using original image.") | |
encoded_image = base64.b64encode(file_path.read_bytes()).decode() | |
base64_data_url = f"data:image/jpeg;base64,{encoded_image}" | |
except Exception as e: | |
logger.warning(f"Image resize failed: {str(e)}. Using original image.") | |
encoded_image = base64.b64encode(file_path.read_bytes()).decode() | |
base64_data_url = f"data:image/jpeg;base64,{encoded_image}" | |
else: | |
# For smaller images, use as-is | |
encoded_image = base64.b64encode(file_path.read_bytes()).decode() | |
base64_data_url = f"data:image/jpeg;base64,{encoded_image}" | |
except Exception as e: | |
# Fallback to original image if any preprocessing fails | |
logger.warning(f"Image preprocessing failed: {str(e)}. Using original image.") | |
encoded_image = base64.b64encode(file_path.read_bytes()).decode() | |
base64_data_url = f"data:image/jpeg;base64,{encoded_image}" | |
# Process the image with OCR | |
logger.info(f"Processing image with OCR using {OCR_MODEL}") | |
# Add retry logic with more retries and longer backoff periods for rate limit issues | |
max_retries = 4 # Increased from 2 to give more chances to succeed | |
retry_delay = 2 # Increased from 1 to allow for longer backoff periods | |
for retry in range(max_retries): | |
try: | |
image_response = self.client.ocr.process( | |
document=ImageURLChunk(image_url=base64_data_url), | |
model=OCR_MODEL, | |
include_image_base64=True, | |
timeout_ms=90000 # 90 second timeout for better success rate | |
) | |
break # Success, exit retry loop | |
except Exception as e: | |
error_msg = str(e) | |
logger.warning(f"API error on attempt {retry+1}/{max_retries}: {error_msg}") | |
# Check specific error types to handle them appropriately | |
error_lower = error_msg.lower() | |
# Authentication errors - no point in retrying | |
if "unauthorized" in error_lower or "401" in error_lower: | |
logger.error("API authentication failed. Check your API key.") | |
raise ValueError(f"Authentication failed with API key. Please verify your Mistral API key is correct and active: {error_msg}") | |
# Connection errors - worth retrying | |
elif "connection" in error_lower or "timeout" in error_lower or "520" in error_msg or "server error" in error_lower: | |
if retry < max_retries - 1: | |
# Wait with shorter delay before retrying | |
wait_time = retry_delay * (2 ** retry) | |
logger.info(f"Connection issue detected. Waiting {wait_time}s before retry...") | |
time.sleep(wait_time) | |
else: | |
# Last retry failed | |
logger.error("Maximum retries reached, API connection error persists.") | |
raise ValueError(f"Could not connect to Mistral API after {max_retries} attempts: {error_msg}") | |
# Rate limit errors | |
elif "rate limit" in error_lower or "429" in error_lower or "requests rate limit exceeded" in error_lower: | |
# Check specifically for token exhaustion vs temporary rate limit | |
if "quota" in error_lower or "credit" in error_lower or "subscription" in error_lower: | |
logger.error("API quota or credit limit reached. No retry will help.") | |
raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") | |
elif retry < max_retries - 1: | |
# More aggressive backoff for rate limits | |
wait_time = retry_delay * (2 ** retry) * 5 # 5x longer wait for rate limits | |
logger.info(f"Rate limit exceeded. Waiting {wait_time}s before retry...") | |
time.sleep(wait_time) | |
else: | |
# Last retry failed, try local OCR as fallback | |
logger.error("Maximum retries reached, rate limit error persists.") | |
try: | |
# Try to import the local OCR fallback function | |
from ocr_utils import try_local_ocr_fallback | |
# Attempt local OCR fallback | |
ocr_text = try_local_ocr_fallback(file_path, base64_data_url) | |
if ocr_text: | |
logger.info("Successfully used local OCR fallback") | |
# Return a basic result with the local OCR text | |
return { | |
"file_name": file_path.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"title": "Document (Local OCR)", | |
"content": "This document was processed with local OCR due to API rate limiting.", | |
"raw_text": ocr_text | |
}, | |
"processing_method": "local_fallback", | |
"processing_note": "Used local OCR due to API rate limit" | |
} | |
except (ImportError, Exception) as local_err: | |
logger.warning(f"Local OCR fallback failed: {str(local_err)}") | |
# If we get here, both API and local OCR failed | |
raise ValueError(f"Mistral API rate limit exceeded. Please try again later: {error_msg}") | |
# Other errors - no retry | |
else: | |
logger.error(f"Unrecoverable API error: {error_msg}") | |
raise | |
# Get the OCR markdown from the first page | |
image_ocr_markdown = image_response.pages[0].markdown if image_response.pages else "" | |
# Optimize: Skip vision model step if ocr_markdown is very small or empty | |
if not image_ocr_markdown or len(image_ocr_markdown) < 50: | |
logger.warning("OCR produced minimal or no text. Returning basic result.") | |
return { | |
"file_name": file_path.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"raw_text": image_ocr_markdown if image_ocr_markdown else "No text could be extracted from the image." | |
}, | |
"processing_note": "OCR produced minimal text content" | |
} | |
# Extract structured data using the appropriate model, with a single API call | |
if use_vision: | |
logger.info(f"Using vision model: {VISION_MODEL}") | |
result = self._extract_structured_data_with_vision(base64_data_url, image_ocr_markdown, file_path.name, custom_prompt) | |
else: | |
logger.info(f"Using text-only model: {TEXT_MODEL}") | |
result = self._extract_structured_data_text_only(image_ocr_markdown, file_path.name, custom_prompt) | |
# Store the serialized OCR response for image rendering (for compatibility with original version) | |
# Don't store raw_response directly as it's not JSON serializable | |
serialized_response = serialize_ocr_response(image_response) | |
result['raw_response_data'] = serialized_response | |
# Store key parts of the OCR response for image rendering | |
# With serialized format that can be stored in JSON | |
has_images = hasattr(image_response, 'pages') and image_response.pages and hasattr(image_response.pages[0], 'images') and image_response.pages[0].images | |
result['has_images'] = has_images | |
if has_images: | |
# Serialize the entire response to ensure it's JSON serializable | |
serialized_response = serialize_ocr_response(image_response) | |
# Create a structured representation of images that can be serialized | |
result['pages_data'] = [] | |
if hasattr(serialized_response, 'pages'): | |
serialized_pages = serialized_response.pages | |
else: | |
# Handle case where serialization returns a dict instead of an object | |
serialized_pages = serialized_response.get('pages', []) | |
for page_idx, page in enumerate(serialized_pages): | |
# Handle both object and dict forms | |
if isinstance(page, dict): | |
markdown = page.get('markdown', '') | |
images = page.get('images', []) | |
else: | |
markdown = page.markdown if hasattr(page, 'markdown') else '' | |
images = page.images if hasattr(page, 'images') else [] | |
page_data = { | |
'page_number': page_idx + 1, | |
'markdown': markdown, | |
'images': [] | |
} | |
# Extract images if present | |
for img_idx, img in enumerate(images): | |
img_id = None | |
img_base64 = None | |
if isinstance(img, dict): | |
img_id = img.get('id') | |
img_base64 = img.get('image_base64') | |
else: | |
img_id = img.id if hasattr(img, 'id') else None | |
img_base64 = img.image_base64 if hasattr(img, 'image_base64') else None | |
if img_base64: | |
page_data['images'].append({ | |
'id': img_id if img_id else f"img_{page_idx}_{img_idx}", | |
'image_base64': img_base64 | |
}) | |
result['pages_data'].append(page_data) | |
logger.info("Image processing completed successfully") | |
return result | |
except Exception as e: | |
logger.error(f"Error processing image: {str(e)}") | |
# Return basic result on error | |
return { | |
"file_name": file_path.name, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"error": str(e), | |
"ocr_contents": { | |
"error": f"Failed to process image: {str(e)}", | |
"partial_text": "Image could not be processed." | |
} | |
} | |
def _extract_structured_data_with_vision(self, image_base64, ocr_markdown, filename, custom_prompt=None): | |
""" | |
Extract structured data using vision model with detailed historical context prompting | |
Optimized for speed, accuracy, and resilience | |
""" | |
logger = logging.getLogger("vision_processor") | |
try: | |
# Fast path: Skip vision API for minimal OCR text (saves an API call) | |
if not ocr_markdown or len(ocr_markdown.strip()) < 100: # Increased threshold for better detection | |
logger.info("Minimal OCR text detected, skipping vision model processing") | |
return { | |
"file_name": filename, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"raw_text": ocr_markdown if ocr_markdown else "No text could be extracted" | |
} | |
} | |
# Fast path: Skip if in test mode or no API key | |
if self.test_mode or not self.api_key: | |
logger.info("Test mode or no API key, using text-only processing") | |
return self._extract_structured_data_text_only(ocr_markdown, filename) | |
# Detect document type with optimized cached implementation | |
doc_type = self._detect_document_type(custom_prompt, ocr_markdown) | |
logger.info(f"Detected document type: {doc_type}") | |
# Optimize OCR text for processing - focus on the first part which usually contains | |
# the most important information (title, metadata, etc.) | |
if len(ocr_markdown) > 8000: | |
# Start with first 5000 chars | |
first_part = ocr_markdown[:5000] | |
# Then add representative samples from different parts of the document | |
# This captures headings and key information throughout | |
middle_start = len(ocr_markdown) // 2 - 1000 | |
middle_part = ocr_markdown[middle_start:middle_start+2000] if middle_start > 0 else "" | |
# Get ending section if large enough | |
if len(ocr_markdown) > 15000: | |
end_part = ocr_markdown[-1000:] | |
truncated_ocr = f"{first_part}\n...\n{middle_part}\n...\n{end_part}" | |
else: | |
truncated_ocr = f"{first_part}\n...\n{middle_part}" | |
logger.info(f"Truncated OCR text from {len(ocr_markdown)} to {len(truncated_ocr)} chars") | |
else: | |
truncated_ocr = ocr_markdown | |
# Build an optimized prompt based on document type | |
enhanced_prompt = self._build_enhanced_prompt(doc_type, truncated_ocr, custom_prompt) | |
# Measure API call time for optimization feedback | |
start_time = time.time() | |
try: | |
# Try with enhanced timing parameters based on document complexity | |
# Use shorter timeout for smaller documents | |
timeout_ms = min(120000, max(60000, len(truncated_ocr) * 10)) # 60-120 seconds based on text length | |
logger.info(f"Calling vision model with {timeout_ms}ms timeout and document type {doc_type}") | |
chat_response = self.client.chat.parse( | |
model=VISION_MODEL, | |
messages=[ | |
{ | |
"role": "user", | |
"content": [ | |
ImageURLChunk(image_url=image_base64), | |
TextChunk(text=enhanced_prompt) | |
], | |
}, | |
], | |
response_format=StructuredOCRModel, | |
temperature=0, | |
timeout_ms=timeout_ms | |
) | |
api_time = time.time() - start_time | |
logger.info(f"Vision model completed in {api_time:.2f}s with document type: {doc_type}") | |
except Exception as e: | |
# If there's an error with the enhanced prompt, try progressively simpler approaches | |
logger.warning(f"Enhanced prompt failed after {time.time() - start_time:.2f}s: {str(e)}") | |
# Try a simplified approach with less context | |
try: | |
# Shorter prompt with less contextual information | |
simplified_prompt = ( | |
f"You are an expert in historical document analysis. " | |
f"Analyze this document image and the OCR text below. " | |
f"<BEGIN_OCR>\n{truncated_ocr[:4000]}\n<END_OCR>\n" | |
f"Identify the document type, main topics, languages used, and extract key information " | |
f"including names, dates, places, and events. Return a structured JSON response." | |
) | |
# Add custom prompt if provided | |
if custom_prompt: | |
simplified_prompt += f"\n\nAdditional instructions: {custom_prompt}" | |
logger.info(f"Trying simplified prompt approach") | |
chat_response = self.client.chat.parse( | |
model=VISION_MODEL, | |
messages=[ | |
{ | |
"role": "user", | |
"content": [ | |
ImageURLChunk(image_url=image_base64), | |
TextChunk(text=simplified_prompt) | |
], | |
}, | |
], | |
response_format=StructuredOCRModel, | |
temperature=0, | |
timeout_ms=60000 # Shorter timeout for simplified approach | |
) | |
logger.info(f"Simplified prompt approach succeeded") | |
except Exception as second_e: | |
# If that fails, try with minimal prompt and just image analysis | |
logger.warning(f"Simplified prompt failed: {str(second_e)}. Trying minimal prompt.") | |
try: | |
# Minimal prompt focusing on just the image | |
minimal_prompt = ( | |
f"Analyze this historical document image. " | |
f"Extract the document type, main topics, languages, and key information. " | |
f"Provide your analysis in a structured JSON format." | |
) | |
logger.info(f"Trying minimal prompt with image-only focus") | |
chat_response = self.client.chat.parse( | |
model=VISION_MODEL, | |
messages=[ | |
{ | |
"role": "user", | |
"content": [ | |
ImageURLChunk(image_url=image_base64), | |
TextChunk(text=minimal_prompt) | |
], | |
}, | |
], | |
response_format=StructuredOCRModel, | |
temperature=0, | |
timeout_ms=45000 # Even shorter timeout for minimal approach | |
) | |
logger.info(f"Minimal prompt approach succeeded") | |
except Exception as third_e: | |
# If all vision attempts fail, fall back to text-only model | |
logger.warning(f"All vision model attempts failed, falling back to text-only model: {str(third_e)}") | |
return self._extract_structured_data_text_only(ocr_markdown, filename) | |
# Convert the response to a dictionary | |
result = json.loads(chat_response.choices[0].message.parsed.json()) | |
# Ensure languages is a list of strings, not Language enum objects | |
if 'languages' in result: | |
result['languages'] = [str(lang) for lang in result.get('languages', [])] | |
# Add metadata about processing | |
result['processing_info'] = { | |
'method': 'vision_model', | |
'document_type': doc_type, | |
'ocr_text_length': len(ocr_markdown), | |
'api_response_time': time.time() - start_time | |
} | |
# Add confidence score if not present | |
if 'confidence_score' not in result: | |
result['confidence_score'] = 0.92 # Vision model typically has higher confidence | |
except Exception as e: | |
# Fall back to text-only model if vision model fails | |
logger.warning(f"Vision model processing failed, falling back to text-only model: {str(e)}") | |
result = self._extract_structured_data_text_only(ocr_markdown, filename) | |
return result | |
# Document type detection cache, stored on the class and shared by all instances.
# NOTE(review): a plain dict with no lock; concurrent access is best-effort only.
_doc_type_cache = {}
_doc_type_cache_size = 256

@staticmethod
def _detect_document_type_cached(custom_prompt: Optional[str], ocr_text_sample: str) -> str:
    """
    Detect the document type by keyword matching, memoizing results on the class.

    The custom prompt is checked first; the OCR sample is only scanned when the
    prompt yields no match. Matching is a case-insensitive substring test, and
    the first type (in declaration order) with a matching keyword wins.

    Declared as a static method because it takes no instance argument but is
    invoked through an instance (self._detect_document_type_cached(...)).

    Args:
        custom_prompt: User-provided prompt, or None
        ocr_text_sample: OCR text to scan
    Returns:
        One of "handwritten", "letter", "legal", "recipe", "travel",
        "scientific", "newspaper", or "general" when nothing matches
    """
    cache = StructuredOCR._doc_type_cache

    # Key on the complete inputs: detection reads all of both strings, so a
    # truncated key would let different inputs share one cached answer
    prompt_key = str(custom_prompt) if custom_prompt else ""
    text_key = ocr_text_sample if ocr_text_sample else ""
    cache_key = (prompt_key, text_key)

    # Check cache first (fast path)
    try:
        return cache[cache_key]
    except KeyError:
        pass

    doc_type_patterns = {
        "handwritten": ["handwritten", "handwriting", "cursive", "manuscript"],
        "letter": ["letter", "correspondence", "message", "dear sir", "dear madam", "sincerely", "yours truly"],
        "legal": ["form", "contract", "agreement", "legal", "certificate", "court", "attorney", "plaintiff", "defendant"],
        "recipe": ["recipe", "food", "ingredients", "directions", "tbsp", "tsp", "cup", "mix", "bake", "cooking"],
        "travel": ["travel", "expedition", "journey", "exploration", "voyage", "destination", "map"],
        "scientific": ["scientific", "experiment", "hypothesis", "research", "study", "analysis", "results", "procedure"],
        "newspaper": ["news", "newspaper", "article", "press", "headline", "column", "editor"]
    }
    # NOTE(review): substring matching means short keywords also match inside
    # longer words (e.g. "form" in "information"); left unchanged here.

    def first_match(lowered_text):
        # Return the first type whose keyword list hits, or None
        for detected_type, patterns in doc_type_patterns.items():
            if any(term in lowered_text for term in patterns):
                return detected_type
        return None

    doc_type = "general"
    if custom_prompt:
        doc_type = first_match(custom_prompt.lower()) or "general"
    # Only consult the OCR text when the prompt gave no specific type
    if doc_type == "general" and ocr_text_sample:
        doc_type = first_match(ocr_text_sample.lower()) or "general"

    # When full, drop up to 20 of the oldest entries (dicts keep insertion order)
    if len(cache) >= StructuredOCR._doc_type_cache_size:
        try:
            for _ in range(20):
                if cache:
                    cache.pop(next(iter(cache)))
        except (KeyError, RuntimeError, StopIteration):
            # Another caller modified the dict during eviction; just proceed
            pass

    cache[cache_key] = doc_type
    return doc_type
def _detect_document_type(self, custom_prompt: Optional[str], ocr_text: str) -> str: | |
""" | |
Detect document type based on content and custom prompt. | |
Args: | |
custom_prompt: User-provided custom prompt | |
ocr_text: OCR-extracted text | |
Returns: | |
Document type identifier ("handwritten", "printed", "letter", etc.) | |
""" | |
# Only sample first 1000 characters of OCR text for faster processing while maintaining accuracy | |
ocr_sample = ocr_text[:1000] if ocr_text else "" | |
# Use the cached version for better performance | |
return self._detect_document_type_cached(custom_prompt, ocr_sample) | |
def _build_enhanced_prompt(self, doc_type: str, ocr_text: str, custom_prompt: Optional[str]) -> str: | |
""" | |
Build an enhanced prompt based on document type. | |
Args: | |
doc_type: Detected document type | |
ocr_text: OCR-extracted text | |
custom_prompt: User-provided custom prompt | |
Returns: | |
Enhanced prompt optimized for the document type | |
""" | |
# Generic document section (included in all prompts) | |
generic_section = ( | |
f"This is a historical document's OCR text:\n" | |
f"<BEGIN_OCR>\n{ocr_text}\n<END_OCR>\n\n" | |
) | |
# Document-specific prompting | |
if doc_type == "handwritten": | |
specific_section = ( | |
f"You are an expert historian specializing in handwritten document transcription and analysis. " | |
f"The OCR system has attempted to capture the handwriting, but may have made errors with cursive script " | |
f"or unusual letter formations.\n\n" | |
f"Pay careful attention to:\n" | |
f"- Correcting OCR errors common in handwriting recognition\n" | |
f"- Preserving the original document structure\n" | |
f"- Identifying topics, language(s), and document type accurately\n" | |
f"- Detecting any names, dates, places, or events mentioned\n" | |
) | |
elif doc_type == "letter": | |
specific_section = ( | |
f"You are an expert in historical correspondence analysis. " | |
f"Analyze this letter as a historian would, identifying:\n" | |
f"- Sender and recipient (if mentioned)\n" | |
f"- Date and location of writing (if present)\n" | |
f"- Key topics discussed\n" | |
f"- Historical context and significance\n" | |
f"- Sentiment and tone of the communication\n" | |
f"- Closing formulations and signature\n" | |
) | |
elif doc_type == "recipe": | |
specific_section = ( | |
f"You are a culinary historian specializing in historical recipes. " | |
f"Analyze this recipe document to extract:\n" | |
f"- Recipe name/title\n" | |
f"- Complete list of ingredients with measurements\n" | |
f"- Preparation instructions in correct order\n" | |
f"- Cooking time and temperature if mentioned\n" | |
f"- Serving suggestions or yield information\n" | |
f"- Any cultural or historical context provided\n" | |
) | |
elif doc_type == "travel": | |
specific_section = ( | |
f"You are a historian specializing in historical travel and exploration accounts. " | |
f"Analyze this document to extract:\n" | |
f"- Geographical locations mentioned\n" | |
f"- Names of explorers, ships, or expeditions\n" | |
f"- Dates and timelines\n" | |
f"- Descriptions of indigenous peoples, cultures, or local conditions\n" | |
f"- Natural features, weather, or navigational details\n" | |
f"- Historical significance of the journey described\n" | |
) | |
elif doc_type == "scientific": | |
specific_section = ( | |
f"You are a historian of science specializing in historical scientific documents. " | |
f"Analyze this document to extract:\n" | |
f"- Scientific methodology described\n" | |
f"- Observations, measurements, or data presented\n" | |
f"- Scientific terminology of the period\n" | |
f"- Experimental apparatus or tools mentioned\n" | |
f"- Conclusions or hypotheses presented\n" | |
f"- Historical significance within scientific development\n" | |
) | |
elif doc_type == "newspaper": | |
specific_section = ( | |
f"You are a media historian specializing in historical newspapers and publications. " | |
f"Analyze this document to extract:\n" | |
f"- Publication name and date if present\n" | |
f"- Headlines and article titles\n" | |
f"- Main news content with focus on events, people, and places\n" | |
f"- Advertisement content if present\n" | |
f"- Historical context and significance\n" | |
f"- Editorial perspective or bias if detectable\n" | |
) | |
elif doc_type == "legal": | |
specific_section = ( | |
f"You are a legal historian specializing in historical legal documents. " | |
f"Analyze this document to extract:\n" | |
f"- Document type (contract, certificate, will, deed, etc.)\n" | |
f"- Parties involved and their roles\n" | |
f"- Key terms, conditions, or declarations\n" | |
f"- Dates, locations, and jurisdictions mentioned\n" | |
f"- Legal terminology of the period\n" | |
f"- Signatures, witnesses, or official markings\n" | |
) | |
else: | |
# General historical document | |
specific_section = ( | |
f"You are a historian specializing in historical document analysis. " | |
f"Analyze this document to extract:\n" | |
f"- Document type and purpose\n" | |
f"- Time period and historical context\n" | |
f"- Key topics, themes, and subjects\n" | |
f"- People, places, and events mentioned\n" | |
f"- Languages used and writing style\n" | |
f"- Historical significance and connections\n" | |
) | |
# Output instructions | |
output_section = ( | |
f"Create a structured JSON response with the following fields:\n" | |
f"- file_name: The document's name\n" | |
f"- topics: An array of topics covered in the document\n" | |
f"- languages: An array of languages used in the document\n" | |
f"- ocr_contents: A dictionary with the document's contents, organized logically\n" | |
) | |
# Add custom prompt if provided | |
custom_section = "" | |
if custom_prompt: | |
custom_section = f"\n\nADDITIONAL CONTEXT AND INSTRUCTIONS:\n{custom_prompt}\n" | |
# Combine all sections into complete prompt | |
return generic_section + specific_section + output_section + custom_section | |
def _extract_structured_data_text_only(self, ocr_markdown, filename, custom_prompt=None): | |
""" | |
Extract structured data using text-only model with detailed historical context prompting | |
and improved error handling | |
""" | |
logger = logging.getLogger("text_processor") | |
start_time = time.time() | |
try: | |
# Fast path: Skip for minimal OCR text | |
if not ocr_markdown or len(ocr_markdown.strip()) < 50: | |
logger.info("Minimal OCR text - returning basic result") | |
return { | |
"file_name": filename, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"raw_text": ocr_markdown if ocr_markdown else "No text could be extracted" | |
}, | |
"processing_method": "minimal_text" | |
} | |
# Check for API key to avoid unnecessary processing | |
if self.test_mode or not self.api_key: | |
logger.info("Test mode or no API key - returning basic result") | |
return { | |
"file_name": filename, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"raw_text": ocr_markdown[:10000] if ocr_markdown else "No text could be extracted", | |
"note": "API key not provided - showing raw OCR text only" | |
}, | |
"processing_method": "test_mode" | |
} | |
# Detect document type and build enhanced prompt | |
doc_type = self._detect_document_type(custom_prompt, ocr_markdown) | |
logger.info(f"Detected document type: {doc_type}") | |
# If OCR text is very large, truncate it to avoid API limits | |
truncated_text = ocr_markdown | |
if len(ocr_markdown) > 25000: | |
# Keep first 15000 chars and last 5000 chars | |
truncated_text = ocr_markdown[:15000] + "\n...[content truncated]...\n" + ocr_markdown[-5000:] | |
logger.info(f"OCR text truncated from {len(ocr_markdown)} to {len(truncated_text)} chars") | |
# Build the prompt with truncated text if needed | |
enhanced_prompt = self._build_enhanced_prompt(doc_type, truncated_text, custom_prompt) | |
# Use enhanced prompt with text-only model - with retry logic | |
max_retries = 2 | |
retry_delay = 1 | |
for retry in range(max_retries): | |
try: | |
logger.info(f"Calling text model ({TEXT_MODEL})") | |
api_start = time.time() | |
# Set appropriate timeout based on text length | |
timeout_ms = min(120000, max(30000, len(truncated_text) * 5)) # 30-120s based on length | |
# Make API call with appropriate timeout | |
chat_response = self.client.chat.parse( | |
model=TEXT_MODEL, | |
messages=[ | |
{ | |
"role": "user", | |
"content": enhanced_prompt | |
}, | |
], | |
response_format=StructuredOCRModel, | |
temperature=0, | |
timeout_ms=timeout_ms | |
) | |
api_time = time.time() - api_start | |
logger.info(f"Text model API call completed in {api_time:.2f}s") | |
# Convert the response to a dictionary | |
result = json.loads(chat_response.choices[0].message.parsed.json()) | |
# Ensure languages is a list of strings, not Language enum objects | |
if 'languages' in result: | |
result['languages'] = [str(lang) for lang in result.get('languages', [])] | |
# Add processing metadata | |
result['processing_method'] = 'text_model' | |
result['document_type'] = doc_type | |
result['model_used'] = TEXT_MODEL | |
result['processing_time'] = time.time() - start_time | |
# Add raw text for reference if not already present | |
if 'ocr_contents' in result and 'raw_text' not in result['ocr_contents']: | |
# Add truncated raw text if very large | |
if len(ocr_markdown) > 50000: | |
result['ocr_contents']['raw_text'] = ocr_markdown[:50000] + "\n...[content truncated]..." | |
else: | |
result['ocr_contents']['raw_text'] = ocr_markdown | |
return result | |
except Exception as api_error: | |
error_msg = str(api_error).lower() | |
logger.warning(f"API error on attempt {retry+1}/{max_retries}: {str(api_error)}") | |
# Check if retry would help | |
if retry < max_retries - 1: | |
# Rate limit errors - special handling with longer wait | |
if any(term in error_msg for term in ["rate limit", "429", "too many requests", "requests rate limit exceeded"]): | |
# Check specifically for token exhaustion vs temporary rate limit | |
if any(term in error_msg for term in ["quota", "credit", "subscription"]): | |
logger.error("API quota or credit limit reached. No retry will help.") | |
raise ValueError(f"Mistral API quota or credit limit reached. Please check your subscription: {error_msg}") | |
# Longer backoff for rate limit errors | |
wait_time = retry_delay * (2 ** retry) * 6.0 # 6x longer wait for rate limits | |
logger.info(f"Rate limit exceeded. Waiting {wait_time:.1f}s before retry...") | |
time.sleep(wait_time) | |
# Other transient errors | |
elif any(term in error_msg for term in ["timeout", "connection", "500", "503", "504"]): | |
# Wait before retrying | |
wait_time = retry_delay * (2 ** retry) | |
logger.info(f"Transient error, retrying in {wait_time}s") | |
time.sleep(wait_time) | |
else: | |
# Non-retryable error | |
raise | |
else: | |
# Last retry failed | |
raise | |
# This shouldn't be reached due to raise in the loop, but just in case | |
raise Exception("All retries failed for text model") | |
except Exception as e: | |
logger.error(f"Text model failed: {str(e)}. Creating basic result.") | |
# Create a basic result with available OCR text | |
try: | |
# Create a more informative fallback result | |
result = { | |
"file_name": filename, | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"raw_text": ocr_markdown[:50000] if ocr_markdown else "No text could be extracted", | |
"error": "AI processing failed: " + str(e).replace('"', '\\"') | |
}, | |
"processing_method": "fallback", | |
"processing_error": str(e), | |
"processing_time": time.time() - start_time | |
} | |
# Try to extract some basic metadata even without AI | |
if ocr_markdown: | |
# Simple content analysis | |
text_sample = ocr_markdown[:5000].lower() | |
# Try to detect language | |
if "dear" in text_sample and any(word in text_sample for word in ["sincerely", "regards", "truly"]): | |
result["topics"].append("Letter") | |
elif any(word in text_sample for word in ["recipe", "ingredients", "instructions", "cook", "bake"]): | |
result["topics"].append("Recipe") | |
elif any(word in text_sample for word in ["article", "report", "study", "analysis"]): | |
result["topics"].append("Article") | |
except Exception as inner_e: | |
logger.error(f"Error creating basic result: {str(inner_e)}") | |
result = { | |
"file_name": str(filename) if filename else "unknown", | |
"topics": ["Document"], | |
"languages": ["English"], | |
"ocr_contents": { | |
"error": "Processing failed completely", | |
"partial_text": ocr_markdown[:1000] if ocr_markdown else "Document could not be processed." | |
} | |
} | |
return result | |
# Command-line entry point: process the file named by the first argument
# and print the result as indented JSON
if __name__ == "__main__":
    import sys
    if len(sys.argv) >= 2:
        file_path = sys.argv[1]
        processor = StructuredOCR()
        result = processor.process_file(file_path)
        print(json.dumps(result, indent=2))
    else:
        # No file argument given: show usage and exit with a failure status
        print("Usage: python structured_ocr.py <file_path>")
        sys.exit(1)