""" | |
Utility functions for OCR processing with Mistral AI. | |
Contains helper functions for working with OCR responses and image handling. | |
""" | |
import json | |
import base64 | |
import io | |
import zipfile | |
import logging | |
import numpy as np | |
import time | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Dict, List, Optional, Union, Any, Tuple | |
from functools import lru_cache | |
# Configure logging | |
logger = logging.getLogger("ocr_utils") | |

# Check which image libraries are available.
# Import each library in its own try block so that one failing import
# cannot leave the other availability flag undefined.
try:
    from PIL import Image, ImageEnhance, ImageFilter, ImageOps
    PILLOW_AVAILABLE = True
except ImportError:
    PILLOW_AVAILABLE = False

try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

from mistralai import DocumentURLChunk, ImageURLChunk, TextChunk

# Import configuration
try:
    from config import IMAGE_PREPROCESSING
except ImportError:
    # Fallback defaults if config is not available
    IMAGE_PREPROCESSING = {
        "enhance_contrast": 1.5,
        "sharpen": True,
        "denoise": True,
        "max_size_mb": 8.0,
        "target_dpi": 300,
        "compression_quality": 92
    }

def replace_images_in_markdown(markdown_str: str, images_dict: dict) -> str:
    """
    Replace image placeholders in markdown with base64-encoded images.

    Args:
        markdown_str: Markdown text containing image placeholders
        images_dict: Dictionary mapping image IDs to base64 strings

    Returns:
        Markdown text with images replaced by base64 data
    """
    for img_name, base64_str in images_dict.items():
        markdown_str = markdown_str.replace(
            f"![{img_name}]({img_name})", f"![{img_name}]({base64_str})"
        )
    return markdown_str
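
# Hedged usage sketch for replace_images_in_markdown. The placeholder syntax
# assumed here (![img-0.jpeg](img-0.jpeg)) is the form Mistral OCR uses for
# page images; the sample values are illustrative only.
#
#     md = "Intro text\n![img-0.jpeg](img-0.jpeg)"
#     images = {"img-0.jpeg": "data:image/jpeg;base64,/9j/EXAMPLE"}
#     replace_images_in_markdown(md, images)
#     # -> 'Intro text\n![img-0.jpeg](data:image/jpeg;base64,/9j/EXAMPLE)'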

def get_combined_markdown(ocr_response) -> str:
    """
    Combine OCR text and images into a single markdown document.

    Args:
        ocr_response: OCR response object from Mistral AI

    Returns:
        Combined markdown string with embedded images
    """
    markdowns = []
    # Process each page of the OCR response
    for page in ocr_response.pages:
        # Extract image data if available
        image_data = {}
        if hasattr(page, "images"):
            for img in page.images:
                if hasattr(img, "id") and hasattr(img, "image_base64"):
                    image_data[img.id] = img.image_base64
        # Replace image placeholders with base64 data
        page_markdown = page.markdown if hasattr(page, "markdown") else ""
        processed_markdown = replace_images_in_markdown(page_markdown, image_data)
        markdowns.append(processed_markdown)
    # Join all pages' markdown with double newlines
    return "\n\n".join(markdowns)

def encode_image_for_api(image_path: Union[str, Path]) -> str:
    """
    Encode an image as a base64 data URL for API submission.

    Args:
        image_path: Path to the image file

    Returns:
        Base64 data URL for the image
    """
    # Convert to a Path object if given a string
    image_file = Path(image_path) if isinstance(image_path, str) else image_path
    # Verify the image exists
    if not image_file.is_file():
        raise FileNotFoundError(f"Image file not found: {image_file}")
    # Encode the image as base64.
    # NOTE: the MIME type is hardcoded as image/jpeg, matching how the
    # preprocessing functions in this module emit JPEG-encoded bytes.
    encoded = base64.b64encode(image_file.read_bytes()).decode()
    return f"data:image/jpeg;base64,{encoded}"

def process_image_with_ocr(client, image_path: Union[str, Path], model: str = "mistral-ocr-latest"):
    """
    Process an image with OCR and return the response.

    Args:
        client: Mistral AI client
        image_path: Path to the image file
        model: OCR model to use

    Returns:
        OCR response object
    """
    # Encode the image as base64
    base64_data_url = encode_image_for_api(image_path)
    # Process the image with OCR
    image_response = client.ocr.process(
        document=ImageURLChunk(image_url=base64_data_url),
        model=model
    )
    return image_response
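
# Hedged usage sketch: wiring a Mistral client to this helper. The key
# sourcing and file name below are illustrative assumptions, not part of
# this module.
#
#     from mistralai import Mistral
#     client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
#     response = process_image_with_ocr(client, "letter.jpg")
#     markdown = get_combined_markdown(response)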

def ocr_response_to_json(ocr_response, indent: int = 4) -> str:
    """
    Convert an OCR response to a formatted JSON string.

    Args:
        ocr_response: OCR response object
        indent: Indentation level for JSON formatting

    Returns:
        Formatted JSON string
    """
    # Convert the OCR response to a dictionary
    response_dict = {
        "text": ocr_response.text if hasattr(ocr_response, "text") else "",
        "pages": []
    }
    # Process pages if available
    if hasattr(ocr_response, "pages"):
        for page in ocr_response.pages:
            page_dict = {
                "text": page.text if hasattr(page, "text") else "",
                "markdown": page.markdown if hasattr(page, "markdown") else "",
                "images": []
            }
            # Process images if available
            if hasattr(page, "images"):
                for img in page.images:
                    img_dict = {
                        "id": img.id if hasattr(img, "id") else "",
                        "base64": img.image_base64 if hasattr(img, "image_base64") else ""
                    }
                    page_dict["images"].append(img_dict)
            response_dict["pages"].append(page_dict)
    # Convert the dictionary to JSON
    return json.dumps(response_dict, indent=indent)
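
# For reference, the JSON produced above has this shape (values illustrative):
#
#     {
#         "text": "...",
#         "pages": [
#             {
#                 "text": "...",
#                 "markdown": "...",
#                 "images": [{"id": "img-0.jpeg", "base64": "data:image/jpeg;base64,..."}]
#             }
#         ]
#     }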

def create_results_zip_in_memory(results):
    """
    Create a zip file containing OCR results in memory.

    Args:
        results: Dictionary or list of OCR results

    Returns:
        Binary zip file data
    """
    # Create a BytesIO object
    zip_buffer = io.BytesIO()
    # Check whether results is a list or a dictionary
    is_list = isinstance(results, list)
    # Create the zip file in memory
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
        if is_list:
            # Handle a list of results
            for i, result in enumerate(results):
                try:
                    # Add JSON results for each file
                    result_json = json.dumps(result, indent=2)
                    zipf.writestr(f"results_{i+1}.json", result_json)
                    # Add HTML content generated from the result
                    html_content = create_html_with_images(result)
                    filename = Path(result.get('file_name', f'document_{i+1}')).stem
                    zipf.writestr(f"{filename}_with_images.html", html_content)
                    # Add raw OCR text if available
                    if "ocr_contents" in result and "raw_text" in result["ocr_contents"]:
                        zipf.writestr(f"ocr_text_{i+1}.txt", result["ocr_contents"]["raw_text"])
                    # Add the HTML visualization if available
                    if "html_visualization" in result:
                        zipf.writestr(f"visualization_{i+1}.html", result["html_visualization"])
                    # Add images if available (limited to conserve memory)
                    if "pages_data" in result:
                        for page_idx, page in enumerate(result["pages_data"]):
                            # Limit to the first 3 images per page
                            for img_idx, img in enumerate(page.get("images", [])[:3]):
                                img_base64 = img.get("image_base64", "")
                                if img_base64:
                                    # Strip the data URL prefix if present
                                    if img_base64.startswith("data:image"):
                                        img_base64 = img_base64.split(",", 1)[1]
                                    # Decode base64 and add to the zip
                                    try:
                                        img_data = base64.b64decode(img_base64)
                                        zipf.writestr(
                                            f"images/result_{i+1}_page_{page_idx+1}_img_{img_idx+1}.jpg",
                                            img_data
                                        )
                                    except Exception:
                                        logger.debug("Skipping image that failed base64 decoding")
                except Exception:
                    # If any result fails, skip it and continue
                    continue
        else:
            # Handle a single result
            try:
                # Add JSON results
                results_json = json.dumps(results, indent=2)
                zipf.writestr("results.json", results_json)
                # Add HTML content
                html_content = create_html_with_images(results)
                filename = Path(results.get('file_name', 'document')).stem
                zipf.writestr(f"{filename}_with_images.html", html_content)
                # Add raw OCR text if available
                if "ocr_contents" in results and "raw_text" in results["ocr_contents"]:
                    zipf.writestr("ocr_text.txt", results["ocr_contents"]["raw_text"])
                # Add the HTML visualization if available
                if "html_visualization" in results:
                    zipf.writestr("visualization.html", results["html_visualization"])
                # Add images if available
                if "pages_data" in results:
                    for page_idx, page in enumerate(results["pages_data"]):
                        for img_idx, img in enumerate(page.get("images", [])):
                            img_base64 = img.get("image_base64", "")
                            if img_base64:
                                # Strip the data URL prefix if present
                                if img_base64.startswith("data:image"):
                                    img_base64 = img_base64.split(",", 1)[1]
                                # Decode base64 and add to the zip
                                try:
                                    img_data = base64.b64decode(img_base64)
                                    zipf.writestr(f"images/page_{page_idx+1}_img_{img_idx+1}.jpg", img_data)
                                except Exception:
                                    logger.debug("Skipping image that failed base64 decoding")
            except Exception:
                # If processing fails, return an empty zip
                pass
    # Seek to the beginning of the BytesIO object
    zip_buffer.seek(0)
    # Return the zip file bytes
    return zip_buffer.getvalue()
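
# Hedged usage sketch: the in-memory zip pairs naturally with a download
# widget. The Streamlit context below is an assumption about the calling
# app, not a requirement of this module.
#
#     zip_bytes = create_results_zip_in_memory(result)
#     st.download_button("Download results", zip_bytes, "ocr-results.zip")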

def create_results_zip(results, output_dir=None, zip_name=None):
    """
    Create a zip file containing OCR results.

    Args:
        results: Dictionary or list of OCR results
        output_dir: Optional output directory
        zip_name: Optional zip file name

    Returns:
        Path to the created zip file
    """
    # Create a default output directory if not provided
    if output_dir is None:
        output_dir = Path.cwd() / "output"
    else:
        output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Check whether results is a list or a dictionary
    is_list = isinstance(results, list)
    # Generate a zip name if not provided
    if zip_name is None:
        if is_list:
            # For a list of results, use a timestamp and a generic name
            timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            zip_name = f"ocr-results_{timestamp}.zip"
        else:
            # For a single result, use the original file's info.
            # Use processed_at if it exists, otherwise the current timestamp.
            if "processed_at" in results:
                timestamp = results.get("processed_at", "").replace(":", "-").replace(".", "-")
            else:
                timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            file_name = results.get("file_name", "ocr-results")
            zip_name = f"{file_name}_{timestamp}.zip"
    try:
        # Build the zip data in memory first
        zip_data = create_results_zip_in_memory(results)
        # Save to file
        zip_path = output_dir / zip_name
        with open(zip_path, 'wb') as f:
            f.write(zip_data)
        return zip_path
    except Exception as e:
        # Create an empty zip file as a fallback
        logger.warning(f"Failed to create results archive: {str(e)}")
        zip_path = output_dir / zip_name
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.writestr("info.txt", "Could not create complete archive")
        return zip_path

# Advanced image preprocessing functions
def preprocess_image_for_ocr(image_path: Union[str, Path]) -> Tuple[Image.Image, str]:
    """
    Preprocess an image for optimal OCR performance with enhanced speed
    and memory optimization.

    Args:
        image_path: Path to the image file

    Returns:
        Tuple of (processed PIL Image, base64 string)
    """
    # Fast path: skip all processing if PIL is not available
    if not PILLOW_AVAILABLE:
        logger.info("PIL not available, skipping image preprocessing")
        return None, encode_image_for_api(image_path)
    # Convert to a Path object if given a string
    image_file = Path(image_path) if isinstance(image_path, str) else image_path
    # Cache lookup with an early exit for already-processed images
    try:
        # Consolidate stat calls for file metadata to reduce I/O
        file_stat = image_file.stat()
        file_size = file_stat.st_size
        file_size_mb = file_size / (1024 * 1024)
        mod_time = file_stat.st_mtime
        # Create a cache key based on essential file properties
        cache_key = f"{image_file.name}_{file_size}_{mod_time}"
        # Fast path: return the cached result if available
        if hasattr(preprocess_image_for_ocr, "_cache") and cache_key in preprocess_image_for_ocr._cache:
            logger.debug(f"Using cached preprocessing result for {image_file.name}")
            return preprocess_image_for_ocr._cache[cache_key]
        # Optimization: skip heavy processing for very small files.
        # Small images (under 100KB) likely don't need preprocessing.
        if file_size < 100000:  # 100KB
            logger.info(f"Image {image_file.name} is small ({file_size/1024:.1f}KB), using minimal processing")
            with Image.open(image_file) as img:
                # Normalize mode only
                if img.mode not in ('RGB', 'L'):
                    img = img.convert('RGB')
                # Save with light optimization
                buffer = io.BytesIO()
                img.save(buffer, format="JPEG", quality=95, optimize=True)
                buffer.seek(0)
                # Get base64
                encoded_image = base64.b64encode(buffer.getvalue()).decode()
                base64_data_url = f"data:image/jpeg;base64,{encoded_image}"
                # Cache and return
                result = (img, base64_data_url)
                if not hasattr(preprocess_image_for_ocr, "_cache"):
                    preprocess_image_for_ocr._cache = {}
                # Trim the cache if needed; the larger cache size helps batch runs
                if len(preprocess_image_for_ocr._cache) > 20:
                    # Remove the 5 oldest entries
                    for _ in range(5):
                        if preprocess_image_for_ocr._cache:
                            preprocess_image_for_ocr._cache.pop(next(iter(preprocess_image_for_ocr._cache)))
                preprocess_image_for_ocr._cache[cache_key] = result
                return result
    except Exception as e:
        # If stat or cache handling fails, log and continue with processing
        logger.debug(f"Cache handling failed for {image_path}: {str(e)}")
        # Ensure we have a valid file_size_mb for later decisions
        try:
            file_size_mb = image_file.stat().st_size / (1024 * 1024)
        except Exception:
            file_size_mb = 0  # Default if we can't determine the size
    try:
        # Record the start time for performance logging
        start_time = time.time()
        # Open and process the image with a minimal memory footprint
        with Image.open(image_file) as img:
            # Normalize the image mode
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')
            # Fast path: quick check of image properties to pick the processing path
            width, height = img.size
            image_area = width * height
            # Detect document type only for medium-to-large images to save time
            is_document = False
            if image_area > 500000:  # Approx. 700x700 or larger
                # Store the image for document detection
                _detect_document_type_impl._current_img = img
                is_document = _detect_document_type_impl(None)
                logger.debug(f"Document type detection for {image_file.name}: {'document' if is_document else 'photo'}")
            # Resize large images for API efficiency
            if file_size_mb > IMAGE_PREPROCESSING["max_size_mb"] or max(width, height) > 3000:
                # Calculate target dimensions directly instead of using the heavier resize function
                max_dimension = max(width, height)
                # Use a sliding scale for reduction based on image size
                if max_dimension > 5000:
                    scale_factor = 0.25  # Aggressive reduction for very large images
                elif max_dimension > 3000:
                    scale_factor = 0.4   # Significant reduction for large images
                else:
                    scale_factor = 0.6   # Moderate reduction for medium images
                # Calculate the new dimensions
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)
                # Pick the resampling filter based on image size
                if image_area > 3000000:  # Very large: faster but lower quality
                    processed_img = img.resize((new_width, new_height), Image.BILINEAR)
                else:  # Medium size: better quality
                    processed_img = img.resize((new_width, new_height), Image.LANCZOS)
                logger.debug(f"Resized image from {width}x{height} to {new_width}x{new_height}")
            else:
                # Skip resizing for smaller images
                processed_img = img
            # Apply the appropriate processing based on document type and size
            if is_document:
                # Process as a document, with the path chosen by size
                if image_area > 1000000:  # Full processing for larger documents
                    preprocess_document_image._current_img = processed_img
                    processed = _preprocess_document_image_impl()
                else:  # Lightweight processing for smaller documents
                    # Just enhance contrast for small documents to save time
                    enhancer = ImageEnhance.Contrast(processed_img)
                    processed = enhancer.enhance(1.3)
            else:
                # Process as a photo, with the path chosen by size
                if image_area > 1000000:  # Full processing for larger photos
                    preprocess_general_image._current_img = processed_img
                    processed = _preprocess_general_image_impl()
                else:  # Skip processing for smaller photos
                    processed = processed_img
            # Encode with minimal memory overhead
            buffer = io.BytesIO()
            # Adjust quality based on file size to keep the API payload small
            if file_size_mb > 5:
                quality = 85  # Lower quality for large files
            else:
                quality = IMAGE_PREPROCESSING["compression_quality"]
            # Save with optimized parameters
            processed.save(buffer, format="JPEG", quality=quality, optimize=True)
            buffer.seek(0)
            # Get base64 with a minimal memory footprint
            encoded_image = base64.b64encode(buffer.getvalue()).decode()
            base64_data_url = f"data:image/jpeg;base64,{encoded_image}"
            # Update the cache
            result = (processed, base64_data_url)
            if not hasattr(preprocess_image_for_ocr, "_cache"):
                preprocess_image_for_ocr._cache = {}
            # LRU-like cache management: evict in batches to avoid frequent clearing
            if len(preprocess_image_for_ocr._cache) > 20:
                try:
                    # Remove several entries at once
                    for _ in range(5):
                        if preprocess_image_for_ocr._cache:
                            preprocess_image_for_ocr._cache.pop(next(iter(preprocess_image_for_ocr._cache)))
                except Exception:
                    # If removal fails, just continue
                    pass
            # Add to the cache (cache_key may be undefined if the stat above failed)
            try:
                preprocess_image_for_ocr._cache[cache_key] = result
            except Exception:
                # If caching fails, just proceed
                pass
            # Log performance metrics
            processing_time = time.time() - start_time
            logger.debug(f"Image preprocessing completed in {processing_time:.3f}s for {image_file.name}")
            # Return both the processed image and the base64 string
            return result
    except Exception as e:
        # If preprocessing fails, log the error and fall back to the original image
        logger.warning(f"Image preprocessing failed: {str(e)}. Using original image.")
        return None, encode_image_for_api(image_path)
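
# Hedged usage sketch: the preprocessed base64 data URL slots directly into
# the OCR call; the client construction and file name are assumptions.
#
#     _, data_url = preprocess_image_for_ocr("scan.png")
#     response = client.ocr.process(
#         document=ImageURLChunk(image_url=data_url),
#         model="mistral-ocr-latest",
#     )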

# Caching decorator removed to fix an unhashable-type error
def detect_document_type(img: Image.Image) -> bool:
    """
    Detect whether an image is likely a document (text-heavy) vs. a photo.

    Args:
        img: PIL Image object

    Returns:
        True if likely a document, False otherwise
    """
    # Store the image for the implementation function, then call it
    # directly without caching
    _detect_document_type_impl._current_img = img
    return _detect_document_type_impl(None)

def _detect_document_type_impl(img_hash=None) -> bool:
    """
    Optimized implementation of document type detection for faster processing.
    The img_hash parameter is unused but kept for backward compatibility.
    Enhanced to better detect handwritten documents.
    """
    # Fast path: get the image from function-attribute storage
    if not hasattr(_detect_document_type_impl, "_current_img"):
        return False  # Fail safe in case the image is not set
    img = _detect_document_type_impl._current_img
    # Skip processing for tiny images - just classify them as non-documents
    width, height = img.size
    if width * height < 100000:  # Approx. 300x300 or smaller
        return False
    # Convert to grayscale for analysis
    gray_img = img.convert('L')
    # PIL-only path for systems without OpenCV
    if not CV2_AVAILABLE:
        # Faster method: sample a subset of the image for edge detection.
        # Downscale the image for faster processing.
        sample_size = min(width, height, 1000)
        scale_factor = sample_size / max(width, height)
        if scale_factor < 0.9:  # Only resize if the reduction is significant
            sample_img = gray_img.resize(
                (int(width * scale_factor), int(height * scale_factor)),
                Image.NEAREST  # Fastest resampling method
            )
        else:
            sample_img = gray_img
        # Fast edge detection on the sample
        edges = sample_img.filter(ImageFilter.FIND_EDGES)
        # Count edge pixels against a threshold (faster than summing individual pixels)
        edge_data = edges.getdata()
        edge_threshold = 40  # Lowered threshold to better detect handwritten text
        # Use a generator expression for better performance
        edge_count = sum(1 for p in edge_data if p > edge_threshold)
        total_pixels = len(edge_data)
        edge_ratio = edge_count / total_pixels
        # Check for bright areas - a simple approximation of text/background contrast
        bright_count = sum(1 for p in gray_img.getdata() if p > 200)
        bright_ratio = bright_count / (width * height)
        # Documents typically have more edges (text boundaries) and bright areas
        # (background); the edge threshold is lowered for handwritten documents.
        return edge_ratio > 0.035 or bright_ratio > 0.4
    # OpenCV path - optimized for speed and enhanced for handwritten documents
    img_np = np.array(gray_img)
    # 1. Fast check: variance of pixel values.
    # Documents typically have high variance (text on background);
    # handwritten documents may have less contrast than printed text.
    std_dev = np.std(img_np)
    if std_dev > 45:  # Lowered threshold to better detect handwritten documents
        return True
    # 2. Quick edge check on a downsampled image.
    # Downscale for faster processing on large images.
    if max(img_np.shape) > 1000:
        scale = 1000 / max(img_np.shape)
        small_img = cv2.resize(img_np, None, fx=scale, fy=scale, interpolation=cv2.INTER_NEAREST)
    else:
        small_img = img_np
    # Use adaptive edge detection parameters for handwritten documents;
    # the lower threshold better detects fainter handwritten text.
    edges = cv2.Canny(small_img, 30, 130, L2gradient=False)
    edge_ratio = np.count_nonzero(edges) / edges.size
    # 3. Fast histogram approximation using bins.
    # Instead of calculating a full histogram, use bins for dark and light
    # regions, adjusted for handwritten documents which may have more gray values.
    dark_mask = img_np < 60     # Raised threshold to capture lighter handwritten text
    light_mask = img_np > 180   # Lowered threshold to account for aged paper
    dark_ratio = np.count_nonzero(dark_mask) / img_np.size
    light_ratio = np.count_nonzero(light_mask) / img_np.size
    # Special analysis for handwritten documents:
    # check for line-like structures typical of handwritten text.
    if edge_ratio > 0.02:  # Low threshold to capture handwritten documents
        # Try to find line segments that could indicate text lines
        lines = cv2.HoughLinesP(edges, 1, np.pi/180,
                                threshold=50,      # Lower threshold for detection
                                minLineLength=30,  # Shorter lines for handwriting
                                maxLineGap=20)     # Larger gap for discontinuous handwriting
        # If we find enough line segments, it's likely a document with text
        if lines is not None and len(lines) > 10:
            return True
    # Combine the heuristics for the final decision.
    # Documents typically have both dark (text) and light (background) regions,
    # and/or well-defined edges; the thresholds are lowered for handwritten documents.
    return (dark_ratio > 0.03 and light_ratio > 0.25) or edge_ratio > 0.03
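
# Hedged usage sketch for the detector via its public wrapper (the sample
# file name is illustrative):
#
#     with Image.open("page.jpg") as img:
#         if detect_document_type(img):
#             processed = preprocess_document_image(img)
#         else:
#             processed = preprocess_general_image(img)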

# Caching removed to fix an unhashable-type error
def preprocess_document_image(img: Image.Image) -> Image.Image:
    """
    Preprocess a document image for optimal OCR.

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    # Store the image for the implementation function
    preprocess_document_image._current_img = img
    # The actual implementation is separated for cleaner code organization
    return _preprocess_document_image_impl()

def _preprocess_document_image_impl() -> Image.Image:
    """
    Optimized implementation of document preprocessing with adaptive
    processing based on image size. Enhanced for better handwritten
    document processing.
    """
    # Fast path: get the image from function-attribute storage
    if not hasattr(preprocess_document_image, "_current_img"):
        raise ValueError("No image set for document preprocessing")
    img = preprocess_document_image._current_img
    # Analyze the image size to determine the processing strategy
    width, height = img.size
    img_size = width * height
    # Check whether the image might be a handwritten document - use special processing
    is_handwritten = False
    try:
        # Simple check for handwritten document characteristics:
        # handwritten documents often have more varied strokes and less stark contrast
        if CV2_AVAILABLE:
            # Convert to grayscale and calculate local variance
            gray_np = np.array(img.convert('L'))
            # Higher variance in edge strengths can indicate handwriting
            edges = cv2.Canny(gray_np, 30, 100)
            if np.count_nonzero(edges) / edges.size > 0.02:  # Low edge threshold for handwriting
                # Additional check with gradient magnitudes
                sobelx = cv2.Sobel(gray_np, cv2.CV_64F, 1, 0, ksize=3)
                sobely = cv2.Sobel(gray_np, cv2.CV_64F, 0, 1, ksize=3)
                magnitude = np.sqrt(sobelx**2 + sobely**2)
                # Handwriting typically has more variation in gradient magnitudes
                if np.std(magnitude) > 20:
                    is_handwritten = True
    except Exception:
        # If detection fails, assume it's not handwritten
        pass
    # Ultra-fast path for tiny images - just convert to grayscale with contrast enhancement
    if img_size < 300000:  # ~500x600 or smaller
        gray = img.convert('L')
        # Use gentler contrast enhancement for handwritten documents
        contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
        enhancer = ImageEnhance.Contrast(gray)
        return enhancer.enhance(contrast_level)
    # Fast path for small images - minimal processing
    if img_size < 1000000:  # ~1000x1000 or smaller
        gray = img.convert('L')
        # Use gentler contrast enhancement for handwritten documents
        contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
        enhancer = ImageEnhance.Contrast(gray)
        enhanced = enhancer.enhance(contrast_level)
        # Light sharpening, only if sharpening is enabled. Use milder sharpening
        # for handwritten documents to preserve stroke detail.
        if IMAGE_PREPROCESSING["sharpen"]:
            if is_handwritten:
                # EDGE_ENHANCE is gentler than SHARPEN for handwriting
                enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
            else:
                enhanced = enhanced.filter(ImageFilter.SHARPEN)
        return enhanced
    # Standard path for medium images.
    # Convert to grayscale (faster processing).
    gray = img.convert('L')
    # Adaptive contrast enhancement based on document type
    contrast_level = 1.4 if is_handwritten else IMAGE_PREPROCESSING["enhance_contrast"]
    enhancer = ImageEnhance.Contrast(gray)
    enhanced = enhancer.enhance(contrast_level)
    # Apply light sharpening for text clarity, adapted to the document type
    if IMAGE_PREPROCESSING["sharpen"]:
        if is_handwritten:
            # EDGE_ENHANCE is gentler than SHARPEN for handwriting
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
        else:
            enhanced = enhanced.filter(ImageFilter.SHARPEN)
    # Advanced processing with OpenCV if available
    if CV2_AVAILABLE and IMAGE_PREPROCESSING["denoise"]:
        try:
            # Convert to a numpy array for OpenCV processing
            img_np = np.array(enhanced)
            if is_handwritten:
                # Special treatment for handwritten documents:
                # a bilateral filter preserves edges (and thus stroke details)
                # better than NLMeans denoising.
                if img_size > 3000000:  # Large images - downsample first
                    scale_factor = 0.5
                    small_img = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor,
                                           interpolation=cv2.INTER_AREA)
                    # Apply a bilateral filter, which preserves edges while smoothing
                    filtered = cv2.bilateralFilter(small_img, 9, 75, 75)
                    # Resize back
                    filtered = cv2.resize(filtered, (width, height), interpolation=cv2.INTER_LINEAR)
                else:
                    # Use the bilateral filter directly for smaller images
                    filtered = cv2.bilateralFilter(img_np, 7, 50, 50)
                # Convert back to a PIL Image
                enhanced = Image.fromarray(filtered)
                # For handwritten docs, avoid binary thresholding, which can destroy subtle strokes
                return enhanced
            else:
                # Standard document processing, optimized for printed text.
                # Tune the denoising parameters based on image size.
                if img_size > 4000000:  # Very large images
                    # More aggressive downsampling for very large images
                    scale_factor = 0.5
                    downsample = cv2.resize(img_np, None, fx=scale_factor, fy=scale_factor,
                                            interpolation=cv2.INTER_AREA)
                    # Lighter denoising for the downsampled image
                    h_value = 7  # Strength parameter
                    template_window = 5
                    search_window = 13
                    # Apply denoising on the smaller image
                    denoised_np = cv2.fastNlMeansDenoising(downsample, None, h_value, template_window, search_window)
                    # Resize back to the original size
                    denoised_np = cv2.resize(denoised_np, (width, height), interpolation=cv2.INTER_LINEAR)
                else:
                    # Direct denoising for medium-large images
                    h_value = 8  # Balanced for speed and quality
                    template_window = 5
                    search_window = 15
                    # Apply denoising
                    denoised_np = cv2.fastNlMeansDenoising(img_np, None, h_value, template_window, search_window)
                # Convert back to a PIL Image
                enhanced = Image.fromarray(denoised_np)
                # Apply adaptive thresholding only if it improves text visibility.
                # Create a binarized version of the image.
                if img_size < 8000000:  # Skip for extremely large images to save processing time
                    binary = cv2.adaptiveThreshold(denoised_np, 255,
                                                   cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                                   cv2.THRESH_BINARY, 11, 2)
                    # Quick verification that binarization preserves text information,
                    # using a simplified check that works well for document images
                    white_pixels_binary = np.count_nonzero(binary > 200)
                    white_pixels_orig = np.count_nonzero(denoised_np > 200)
                    # Check that binarization preserves a reasonable share of white (background) pixels
                    if white_pixels_binary > white_pixels_orig * 0.8:
                        # Binarization looks good, use it
                        return Image.fromarray(binary)
                return enhanced
        except Exception as e:
            # If OpenCV processing fails, continue with the PIL-enhanced image
            logger.debug(f"OpenCV document processing failed: {str(e)}")
    elif IMAGE_PREPROCESSING["denoise"]:
        # Fallback PIL denoising for systems without OpenCV
        if is_handwritten:
            # Lighter filtering (a smaller median filter) for handwritten
            # documents to preserve detail
            enhanced = enhanced.filter(ImageFilter.MedianFilter(1))
        else:
            # Standard filtering for printed documents
            enhanced = enhanced.filter(ImageFilter.MedianFilter(3))
    # Return the enhanced grayscale image
    return enhanced

# Caching removed to fix an unhashable-type error
def preprocess_general_image(img: Image.Image) -> Image.Image:
    """
    Preprocess a general image for OCR.

    Args:
        img: PIL Image object

    Returns:
        Processed PIL Image
    """
    # Store the image for the implementation function
    preprocess_general_image._current_img = img
    return _preprocess_general_image_impl()

def _preprocess_general_image_impl() -> Image.Image:
    """
    Optimized implementation of general image preprocessing with
    size-based processing paths.
    """
    # Fast path: get the image from function-attribute storage
    if not hasattr(preprocess_general_image, "_current_img"):
        raise ValueError("No image set for general preprocessing")
    img = preprocess_general_image._current_img
    # Ultra-fast path: skip processing completely for small images
    width, height = img.size
    img_size = width * height
    if img_size < 300000:  # Tiny images under ~0.3 megapixels
        # Just ensure the correct color mode
        if img.mode != 'RGB':
            return img.convert('RGB')
        return img
    # Fast path: minimal processing for smaller images
    if img_size < 600000:  # ~800x750 or smaller
        # Ensure RGB mode
        if img.mode != 'RGB':
            img = img.convert('RGB')
        # Very light contrast enhancement only
        enhancer = ImageEnhance.Contrast(img)
        return enhancer.enhance(1.15)  # Lighter enhancement for small images
    # Standard path: apply moderate enhancements for medium images.
    # Convert to RGB to ensure compatibility.
    if img.mode != 'RGB':
        img = img.convert('RGB')
    # Moderate enhancement only
    enhancer = ImageEnhance.Contrast(img)
    enhanced = enhancer.enhance(1.2)  # Less aggressive than document enhancement
    # Skip additional processing for medium-sized images
    if img_size < 1000000:  # Images under ~1 megapixel
        return enhanced
    # Enhanced path: additional processing for larger images
    try:
        # Apply the optimized enhancement pipeline for large non-document images.
        # 1. Improve color saturation slightly for better feature extraction.
        saturation = ImageEnhance.Color(enhanced)
        enhanced = saturation.enhance(1.1)
        # 2. Apply adaptive sharpening based on image size
        if img_size > 2500000:  # Very large images (~1600x1600 or larger)
            # Use EDGE_ENHANCE instead of SHARPEN for a more subtle effect on large images
            enhanced = enhanced.filter(ImageFilter.EDGE_ENHANCE)
        else:
            # Standard sharpening for regular large images
            enhanced = enhanced.filter(ImageFilter.SHARPEN)
        # 3. Apply additional processing with OpenCV, if available, for the largest images
        if CV2_AVAILABLE and img_size > 3000000:
            # Convert to a numpy array
            img_np = np.array(enhanced)
            # Apply a subtle enhancement of details (CLAHE)
            try:
                # Convert to LAB color space for better processing
                lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
                # Only enhance the L (luminance) channel
                l, a, b = cv2.split(lab)
                # Create a CLAHE object with parameters suited to photos
                clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
                # Apply CLAHE to the L channel
                l = clahe.apply(l)
                # Merge the channels back and convert to RGB
                lab = cv2.merge((l, a, b))
                enhanced_np = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
                # Convert back to PIL
                enhanced = Image.fromarray(enhanced_np)
            except Exception:
                # If CLAHE fails, continue with the PIL-enhanced image
                pass
    except Exception:
        # If any enhancement fails, fall back to basic contrast enhancement
        if img.mode != 'RGB':
            img = img.convert('RGB')
        enhancer = ImageEnhance.Contrast(img)
        enhanced = enhancer.enhance(1.2)
    return enhanced

# Caching decorator removed to fix an unhashable-type error
def resize_image(img: Image.Image, target_dpi: int = 300) -> Image.Image:
    """
    Resize an image to an optimal size for OCR while preserving quality.

    Args:
        img: PIL Image object
        target_dpi: Target DPI (dots per inch)

    Returns:
        Resized PIL Image
    """
    # Store the image for the implementation function
    resize_image._current_img = img
    return resize_image_impl(target_dpi)

def resize_image_impl(target_dpi: int = 300) -> Image.Image:
    """
    Implementation of the resize function that uses function-attribute storage.

    Args:
        target_dpi: Target DPI (dots per inch)

    Returns:
        Resized PIL Image
    """
    # Get the image from function-attribute storage (set by the caller)
    if not hasattr(resize_image, "_current_img"):
        raise ValueError("No image set for resizing")
    img = resize_image._current_img
    # Current dimensions
    width, height = img.size
    # Fixed target dimensions based on DPI,
    # using 8.5x11 inches (US Letter paper size) as the reference
    max_width = int(8.5 * target_dpi)
    max_height = int(11 * target_dpi)
    # Quick early return if no resizing is needed
    if width <= max_width and height <= max_height:
        return img
    # Calculate the scaling factor once
    scale_factor = min(max_width / width, max_height / height)
    # Calculate the new dimensions
    new_width = int(width * scale_factor)
    new_height = int(height * scale_factor)
    # Use BICUBIC for a good balance of speed and quality
    return img.resize((new_width, new_height), Image.BICUBIC)
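
# Worked example of the bound above: at the default 300 DPI the limits are
# 8.5 * 300 = 2550 px wide and 11 * 300 = 3300 px tall, so a 5100x3300 scan
# gets scale_factor = min(2550/5100, 3300/3300) = 0.5 and comes out 2550x1650.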

def calculate_image_entropy(img: Image.Image) -> float:
    """
    Calculate the entropy (information content) of an image.

    Args:
        img: PIL Image object

    Returns:
        Entropy value
    """
    # Convert to grayscale
    if img.mode != 'L':
        img = img.convert('L')
    # Calculate the histogram
    histogram = img.histogram()
    total_pixels = img.width * img.height
    # Calculate the entropy
    entropy = 0
    for h in histogram:
        if h > 0:
            probability = h / total_pixels
            entropy -= probability * np.log2(probability)
    return entropy
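
# This is the Shannon entropy H = -sum(p_i * log2(p_i)) over the 256-bin
# grayscale histogram, so the result ranges from 0.0 for a single-tone image
# to 8.0 bits. For example, an image split evenly between two gray levels has
# p = 0.5 twice and an entropy of exactly 1.0.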

def create_html_with_images(result):
    """
    Create an HTML document with embedded images from OCR results.

    Args:
        result: OCR result dictionary containing pages_data

    Returns:
        HTML content as a string
    """
    # Create the HTML document structure
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>OCR Document with Images</title>
        <style>
            body {
                font-family: Georgia, serif;
                line-height: 1.7;
                margin: 0 auto;
                max-width: 800px;
                padding: 20px;
            }
            img {
                max-width: 90%;
                max-height: 500px;
                object-fit: contain;
                margin: 20px auto;
                display: block;
                border: 1px solid #ddd;
                border-radius: 4px;
            }
            .image-container {
                margin: 20px 0;
                text-align: center;
            }
            .page-break {
                border-top: 1px solid #ddd;
                margin: 40px 0;
                padding-top: 40px;
            }
            h3 {
                color: #333;
                border-bottom: 1px solid #eee;
                padding-bottom: 10px;
            }
            p {
                margin: 12px 0;
            }
            .page-text-content {
                margin-bottom: 20px;
            }
            .text-block {
                background-color: #f9f9f9;
                padding: 15px;
                border-radius: 4px;
                border-left: 3px solid #546e7a;
                margin-bottom: 15px;
                color: #333;
            }
            .text-block p {
                margin: 8px 0;
                color: #333;
            }
            .metadata {
                background-color: #f5f5f5;
                padding: 10px 15px;
                border-radius: 4px;
                margin-bottom: 20px;
                font-size: 14px;
            }
            .metadata p {
                margin: 5px 0;
            }
        </style>
    </head>
    <body>
    """
    # Add document metadata
    html_content += f"""
    <div class="metadata">
        <h2>{result.get('file_name', 'Document')}</h2>
        <p><strong>Processed at:</strong> {result.get('timestamp', '')}</p>
        <p><strong>Languages:</strong> {', '.join(result.get('languages', ['Unknown']))}</p>
        <p><strong>Topics:</strong> {', '.join(result.get('topics', ['Unknown']))}</p>
    </div>
    """
    # Check whether we have pages_data
    if 'pages_data' in result and result['pages_data']:
        pages_data = result['pages_data']
        # Process each page
        for i, page in enumerate(pages_data):
            page_markdown = page.get('markdown', '')
            images = page.get('images', [])
            # Add a page header if multi-page
            if len(pages_data) > 1:
                html_content += f"<h3>Page {i+1}</h3>"
            # Create an image dictionary
            image_dict = {}
            for img in images:
                if 'id' in img and 'image_base64' in img:
                    image_dict[img['id']] = img['image_base64']
            # Process the markdown content
            if page_markdown:
                # Separate plain text lines from markdown image lines
                text_content = []
                image_lines = []
                for line in page_markdown.split('\n'):
                    if '![' in line:
                        image_lines.append(line)
                    elif line.strip():
                        text_content.append(line)
                # Add the text content
                if text_content:
                    html_content += '<div class="text-block">'
                    for line in text_content:
                        html_content += f"<p>{line}</p>"
                    html_content += '</div>'
                # Add the images
                for line in image_lines:
                    # Extract the image ID and alt text using simple parsing
                    try:
                        alt_start = line.find('![') + 2
                        alt_end = line.find(']', alt_start)
                        alt_text = line[alt_start:alt_end]
                        img_start = line.find('(', alt_end) + 1
                        img_end = line.find(')', img_start)
                        img_id = line[img_start:img_end]
                        if img_id in image_dict:
                            html_content += '<div class="image-container">'
                            html_content += f'<img src="{image_dict[img_id]}" alt="{alt_text}">'
                            html_content += '</div>'
                    except Exception:
                        # If parsing fails, just skip this image
                        continue
            # Add a page separator if this is not the last page
            if i < len(pages_data) - 1:
                html_content += '<div class="page-break"></div>'
    # Add structured content if available
    if 'ocr_contents' in result and isinstance(result['ocr_contents'], dict):
        html_content += '<h3>Structured Content</h3>'
        for section, content in result['ocr_contents'].items():
            if content and section not in ['error', 'raw_text', 'partial_text']:
                html_content += f'<h4>{section.replace("_", " ").title()}</h4>'
                if isinstance(content, str):
                    html_content += f'<p>{content}</p>'
                elif isinstance(content, list):
                    html_content += '<ul>'
                    for item in content:
                        html_content += f'<li>{str(item)}</li>'
                    html_content += '</ul>'
                elif isinstance(content, dict):
                    html_content += '<dl>'
                    for k, v in content.items():
                        html_content += f'<dt>{k}</dt><dd>{v}</dd>'
                    html_content += '</dl>'
    # Close the HTML document
    html_content += """
    </body>
    </html>
    """
    return html_content

def generate_document_thumbnail(image_path: Union[str, Path], max_size: int = 300) -> str:
    """
    Generate a thumbnail for document preview.

    Args:
        image_path: Path to the image file
        max_size: Maximum dimension for the thumbnail

    Returns:
        Base64-encoded thumbnail, or None if generation fails
    """
    if not PILLOW_AVAILABLE:
        return None
    try:
        # Open the image
        with Image.open(image_path) as img:
            # Calculate the thumbnail size, preserving the aspect ratio
            width, height = img.size
            if width > height:
                new_width = max_size
                new_height = int(height * (max_size / width))
            else:
                new_height = max_size
                new_width = int(width * (max_size / height))
            # Create the thumbnail
            thumbnail = img.resize((new_width, new_height), Image.LANCZOS)
            # Ensure a JPEG-compatible mode (e.g. RGBA PNGs cannot be saved as JPEG)
            if thumbnail.mode != 'RGB':
                thumbnail = thumbnail.convert('RGB')
            # Save to a buffer
            buffer = io.BytesIO()
            thumbnail.save(buffer, format="JPEG", quality=85)
            buffer.seek(0)
            # Encode as base64
            encoded = base64.b64encode(buffer.getvalue()).decode()
            return f"data:image/jpeg;base64,{encoded}"
    except Exception:
        # Return None if thumbnail generation fails
        return None

def try_local_ocr_fallback(image_path: Union[str, Path], base64_data_url: str = None) -> str:
    """
    Attempt local OCR with pytesseract as a fallback when the API fails.

    Args:
        image_path: Path to the image file
        base64_data_url: Optional base64 data URL, if already available

    Returns:
        OCR text string if successful, None if it failed
    """
    logger.info("Attempting local OCR fallback using pytesseract...")
    try:
        import pytesseract
        from PIL import Image, ImageEnhance
        # Load the image - either from the base64 data or from the path
        if base64_data_url and base64_data_url.startswith('data:image'):
            # Extract the image from base64
            image_data = base64_data_url.split(',', 1)[1]
            image_bytes = base64.b64decode(image_data)
            image = Image.open(io.BytesIO(image_bytes))
        else:
            # Load from the file path
            image_path = Path(image_path) if isinstance(image_path, str) else image_path
            image = Image.open(image_path)
        # Convert to RGB if not already (pytesseract works best with RGB)
        if image.mode != 'RGB':
            image = image.convert('RGB')
        # Apply image enhancements for better OCR:
        # convert to grayscale for better text recognition
        image = image.convert('L')
        # Enhance contrast
        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(2.0)  # Higher contrast for better OCR
        # Run OCR
        ocr_text = pytesseract.image_to_string(image, lang='eng')
        if ocr_text and len(ocr_text.strip()) > 50:
            logger.info(f"Local OCR successful: extracted {len(ocr_text)} characters")
            return ocr_text
        else:
            logger.warning("Local OCR produced minimal or no text")
            return None
    except ImportError:
        logger.warning("Pytesseract not installed - local OCR not available")
        return None
    except Exception as e:
        logger.error(f"Local OCR fallback failed: {str(e)}")
        return None
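

# Minimal smoke-test sketch for this module. Everything here is an assumption
# for illustration: the sample file name, the MISTRAL_API_KEY environment
# variable, and the choice to fall back to local OCR when no key is set.
if __name__ == "__main__":
    import os
    from mistralai import Mistral

    sample = "sample_document.jpg"  # hypothetical input file
    api_key = os.environ.get("MISTRAL_API_KEY")
    if api_key and Path(sample).is_file():
        client = Mistral(api_key=api_key)
        # Preprocess, run OCR, and emit both the JSON and markdown views
        _, data_url = preprocess_image_for_ocr(sample)
        response = client.ocr.process(
            document=ImageURLChunk(image_url=data_url),
            model="mistral-ocr-latest",
        )
        print(ocr_response_to_json(response))
        print(get_combined_markdown(response))
    else:
        # Without an API key, try the pytesseract fallback instead
        print(try_local_ocr_fallback(sample))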