import os
import io
import cv2
import numpy as np
import tempfile
import time
import math
import json
from PIL import Image, ImageEnhance, ImageFilter
from pdf2image import convert_from_bytes
import streamlit as st
import logging
import concurrent.futures
from pathlib import Path

# Configure logging
logger = logging.getLogger("preprocessing")
logger.setLevel(logging.INFO)

# Ensure logs directory exists
def ensure_log_directory(config):
    """Create logs directory if it doesn't exist"""
    if config.get("logging", {}).get("enabled", False):
        log_path = config.get("logging", {}).get("output_path", "logs/preprocessing_metrics.json")
        log_dir = os.path.dirname(log_path)
        if log_dir:
            Path(log_dir).mkdir(parents=True, exist_ok=True)

def log_preprocessing_metrics(metrics, config):
    """Log preprocessing metrics to JSON file"""
    if not config.get("enabled", False):
        return

    log_path = config.get("output_path", "logs/preprocessing_metrics.json")
    ensure_log_directory({"logging": {"enabled": True, "output_path": log_path}})

    # Add timestamp
    metrics["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")

    # Append to log file
    try:
        existing_data = []
        if os.path.exists(log_path):
            with open(log_path, 'r') as f:
                existing_data = json.load(f)
            if not isinstance(existing_data, list):
                existing_data = [existing_data]
        existing_data.append(metrics)
        with open(log_path, 'w') as f:
            json.dump(existing_data, f, indent=2)
        logger.info(f"Logged preprocessing metrics to {log_path}")
    except Exception as e:
        logger.error(f"Error logging preprocessing metrics: {str(e)}")

def get_document_config(document_type, global_config):
    """
    Get document-specific preprocessing configuration by merging with global settings.

    Args:
        document_type: The type of document (e.g., 'standard', 'newspaper', 'handwritten')
        global_config: The global preprocessing configuration

    Returns:
        A merged configuration dictionary with document-specific overrides
    """
    # Start with a copy of the global config (copied section by section so that
    # document-specific overrides do not mutate the global settings)
    config = {
        "deskew": dict(global_config.get("deskew", {})),
        "thresholding": dict(global_config.get("thresholding", {})),
        "morphology": dict(global_config.get("morphology", {})),
        "performance": dict(global_config.get("performance", {})),
        "logging": dict(global_config.get("logging", {}))
    }

    # Apply document-specific overrides if they exist
    doc_types = global_config.get("document_types", {})
    if document_type in doc_types:
        doc_config = doc_types[document_type]
        # Merge document-specific settings into the config
        for section in doc_config:
            if section in config:
                config[section].update(doc_config[section])

    return config

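# Illustrative sketch, not part of the original module: a minimal global config
# with a per-document-type override, using only key names that the functions in
# this file read. The concrete values are assumptions for demonstration.
def _example_document_config():
    global_config = {
        "deskew": {"enabled": True, "max_angle": 45.0, "consensus_method": "average"},
        "thresholding": {"method": "adaptive", "adaptive_block_size": 11, "adaptive_constant": 2},
        "morphology": {"enabled": False},
        "performance": {},
        "logging": {"enabled": False},
        "document_types": {
            "newspaper": {"thresholding": {"adaptive_block_size": 25}},
        },
    }
    # The returned dict keeps the global sections but applies the newspaper override.
    return get_document_config("newspaper", global_config)
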
def deskew_image(img_array, config):
    """
    Detect and correct skew in document images.

    Uses a combination of methods (minAreaRect and/or Hough transform)
    to estimate the skew angle more robustly.

    Args:
        img_array: Input image as numpy array
        config: Deskew configuration dict

    Returns:
        Deskewed image as numpy array, estimated angle, success flag
    """
    if not config.get("enabled", False):
        return img_array, 0.0, True

    # Convert to grayscale if needed
    gray = img_array if len(img_array.shape) == 2 else cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

    # Start with a threshold to get a binary image for angle detection
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    angles = []
    angle_threshold = config.get("angle_threshold", 0.1)
    max_angle = config.get("max_angle", 45.0)

    # Method 1: minAreaRect approach
    try:
        # Find all contours
        contours, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

        # Filter contours by area to avoid noise
        min_area = binary.shape[0] * binary.shape[1] * 0.0001  # 0.01% of image area
        filtered_contours = [cnt for cnt in contours if cv2.contourArea(cnt) > min_area]

        # Get angles from rotated rectangles around contours
        for contour in filtered_contours:
            rect = cv2.minAreaRect(contour)
            width, height = rect[1]
            # Calculate the angle based on the longer side
            # (This is important for getting the orientation right)
            angle = rect[2]
            if width < height:
                angle += 90

            # Normalize angle to the -45 to 45 range
            if angle > 45:
                angle -= 90
            if angle < -45:
                angle += 90

            # Clamp angle to the configured limit
            angle = max(min(angle, max_angle), -max_angle)
            angles.append(angle)
    except Exception as e:
        logger.error(f"Error in minAreaRect skew detection: {str(e)}")

    # Method 2: Hough transform approach (if enabled)
    if config.get("use_hough", True):
        try:
            # Apply Canny edge detection
            edges = cv2.Canny(gray, 50, 150, apertureSize=3)

            # Apply probabilistic Hough line detection
            lines = cv2.HoughLinesP(edges, 1, np.pi / 180,
                                    threshold=100, minLineLength=100, maxLineGap=10)

            if lines is not None:
                for line in lines:
                    x1, y1, x2, y2 = line[0]
                    if x2 - x1 != 0:  # Avoid division by zero
                        # Calculate line angle in degrees
                        angle = math.atan2(y2 - y1, x2 - x1) * 180.0 / np.pi

                        # Normalize angle to the -45 to 45 range
                        if angle > 45:
                            angle -= 90
                        if angle < -45:
                            angle += 90

                        # Clamp angle to the configured limit
                        angle = max(min(angle, max_angle), -max_angle)
                        angles.append(angle)
        except Exception as e:
            logger.error(f"Error in Hough transform skew detection: {str(e)}")

    # If no angles were detected, return the original image
    if not angles:
        logger.warning("No skew angles detected, using original image")
        return img_array, 0.0, False

    # Combine angles using the specified consensus method
    consensus_method = config.get("consensus_method", "average")
    if consensus_method == "average":
        final_angle = sum(angles) / len(angles)
    elif consensus_method == "median":
        final_angle = sorted(angles)[len(angles) // 2]
    elif consensus_method == "min":
        final_angle = min(angles, key=abs)
    elif consensus_method == "max":
        final_angle = max(angles, key=abs)
    else:
        final_angle = sum(angles) / len(angles)  # Default to average

    # If the angle is below the threshold, don't rotate
    if abs(final_angle) < angle_threshold:
        logger.info(f"Detected angle ({final_angle:.2f}°) is below threshold, skipping deskew")
        return img_array, final_angle, True

    # Log the detected angle
    logger.info(f"Deskewing image with angle: {final_angle:.2f}°")

    # Get image dimensions
    h, w = img_array.shape[:2]
    center = (w // 2, h // 2)

    # Get rotation matrix
    rotation_matrix = cv2.getRotationMatrix2D(center, final_angle, 1.0)

    # Calculate new image dimensions
    abs_cos = abs(rotation_matrix[0, 0])
    abs_sin = abs(rotation_matrix[0, 1])
    new_w = int(h * abs_sin + w * abs_cos)
    new_h = int(h * abs_cos + w * abs_sin)

    # Adjust the rotation matrix to account for the new dimensions
    rotation_matrix[0, 2] += (new_w / 2) - center[0]
    rotation_matrix[1, 2] += (new_h / 2) - center[1]

    # Perform the rotation
    try:
        # Use a border value that matches the number of channels
        if len(img_array.shape) == 3:
            rotated = cv2.warpAffine(img_array, rotation_matrix, (new_w, new_h),
                                     flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT,
                                     borderValue=(255, 255, 255))
        else:
            rotated = cv2.warpAffine(img_array, rotation_matrix, (new_w, new_h),
                                     flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT,
                                     borderValue=255)
        return rotated, final_angle, True
    except Exception as e:
        logger.error(f"Error rotating image: {str(e)}")
        if config.get("fallback", {}).get("enabled", True):
            logger.info("Using original image as fallback after rotation failure")
            return img_array, final_angle, False
        return img_array, final_angle, False

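# Illustrative sketch, not part of the original module: deskewing a page with a
# config dict whose keys match the ones deskew_image reads. The values are
# example assumptions.
def _example_deskew_usage(page_array):
    deskew_config = {
        "enabled": True,
        "angle_threshold": 0.1,
        "max_angle": 15.0,
        "use_hough": True,
        "consensus_method": "median",
        "fallback": {"enabled": True},
    }
    deskewed, angle, ok = deskew_image(page_array, deskew_config)
    return deskewed if ok else page_array
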
def preblur(img_array, config):
    """
    Apply pre-filtering blur to stabilize thresholding results.

    Args:
        img_array: Input image as numpy array
        config: Pre-blur configuration dict

    Returns:
        Blurred image as numpy array
    """
    if not config.get("enabled", False):
        return img_array

    method = config.get("method", "gaussian")
    kernel_size = config.get("kernel_size", 3)

    # Ensure kernel size is odd
    if kernel_size % 2 == 0:
        kernel_size += 1

    try:
        if method == "gaussian":
            return cv2.GaussianBlur(img_array, (kernel_size, kernel_size), 0)
        elif method == "median":
            return cv2.medianBlur(img_array, kernel_size)
        else:
            logger.warning(f"Unknown blur method: {method}, using gaussian")
            return cv2.GaussianBlur(img_array, (kernel_size, kernel_size), 0)
    except Exception as e:
        logger.error(f"Error applying {method} blur: {str(e)}")
        return img_array

def apply_threshold(img_array, config):
    """
    Apply thresholding to create a binary image.

    Supports Otsu's method and adaptive thresholding.
    Includes pre-filtering and fallback mechanisms.

    Args:
        img_array: Input image as numpy array
        config: Thresholding configuration dict

    Returns:
        Binary image as numpy array, success flag
    """
    method = config.get("method", "adaptive")
    if method == "none":
        return img_array, True

    # Convert to grayscale if needed
    gray = img_array if len(img_array.shape) == 2 else cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

    # Apply pre-blur if configured
    preblur_config = config.get("preblur", {})
    if preblur_config.get("enabled", False):
        gray = preblur(gray, preblur_config)

    binary = None
    try:
        if method == "otsu":
            # Apply Otsu's thresholding
            _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        else:
            # Unknown methods fall back to adaptive thresholding
            if method != "adaptive":
                logger.warning(f"Unknown thresholding method: {method}, using adaptive")
            block_size = config.get("adaptive_block_size", 11)
            constant = config.get("adaptive_constant", 2)
            # Ensure block size is odd
            if block_size % 2 == 0:
                block_size += 1
            binary = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                           cv2.THRESH_BINARY, block_size, constant)
    except Exception as e:
        logger.error(f"Error applying {method} thresholding: {str(e)}")
        if config.get("fallback", {}).get("enabled", True):
            logger.info("Using original grayscale image as fallback after thresholding failure")
            return gray, False
        return gray, False

    # Calculate percentage of non-zero pixels for logging
    nonzero_pct = np.count_nonzero(binary) / binary.size * 100
    logger.info(f"Binary image has {nonzero_pct:.2f}% non-zero pixels")

    # Check if thresholding was successful (crude check)
    if nonzero_pct < 1 or nonzero_pct > 99:
        logger.warning(f"Thresholding produced extreme result ({nonzero_pct:.2f}% non-zero)")
        if config.get("fallback", {}).get("enabled", True):
            logger.info("Using original grayscale image as fallback after poor thresholding")
            return gray, False

    return binary, True

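# Illustrative sketch, not part of the original module: a thresholding config
# with the optional pre-blur step enabled. Key names mirror what apply_threshold
# and preblur read; the values themselves are example assumptions.
def _example_threshold_usage(gray_page):
    threshold_config = {
        "method": "adaptive",
        "adaptive_block_size": 15,
        "adaptive_constant": 2,
        "preblur": {"enabled": True, "method": "median", "kernel_size": 3},
        "fallback": {"enabled": True},
    }
    binary, ok = apply_threshold(gray_page, threshold_config)
    return binary, ok
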
def apply_morphology(binary_img, config):
    """
    Apply morphological operations to clean up a binary image.

    Supports opening, closing, or both operations.

    Args:
        binary_img: Binary image as numpy array
        config: Morphology configuration dict

    Returns:
        Processed binary image as numpy array
    """
    if not config.get("enabled", False):
        return binary_img

    operation = config.get("operation", "close")
    kernel_size = config.get("kernel_size", 1)
    kernel_shape = config.get("kernel_shape", "rect")

    # Create the appropriate kernel
    if kernel_shape == "rect":
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size * 2 + 1, kernel_size * 2 + 1))
    elif kernel_shape == "ellipse":
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size * 2 + 1, kernel_size * 2 + 1))
    elif kernel_shape == "cross":
        kernel = cv2.getStructuringElement(cv2.MORPH_CROSS, (kernel_size * 2 + 1, kernel_size * 2 + 1))
    else:
        logger.warning(f"Unknown kernel shape: {kernel_shape}, using rect")
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size * 2 + 1, kernel_size * 2 + 1))

    result = binary_img
    try:
        if operation == "open":
            # Opening: erosion followed by dilation - removes small noise
            result = cv2.morphologyEx(binary_img, cv2.MORPH_OPEN, kernel)
        elif operation == "close":
            # Closing: dilation followed by erosion - fills small holes
            result = cv2.morphologyEx(binary_img, cv2.MORPH_CLOSE, kernel)
        elif operation == "both":
            # Both operations in sequence
            result = cv2.morphologyEx(binary_img, cv2.MORPH_OPEN, kernel)
            result = cv2.morphologyEx(result, cv2.MORPH_CLOSE, kernel)
        else:
            logger.warning(f"Unknown morphological operation: {operation}, using close")
            result = cv2.morphologyEx(binary_img, cv2.MORPH_CLOSE, kernel)
    except Exception as e:
        logger.error(f"Error applying morphological operation: {str(e)}")
        return binary_img

    return result

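# Illustrative sketch, not part of the original module: morphological cleanup
# applied after thresholding, as a typical binarisation pipeline. The kernel
# settings are example assumptions.
def _example_binarisation_pipeline(gray_page):
    binary, ok = apply_threshold(gray_page, {"method": "otsu", "fallback": {"enabled": True}})
    if ok:
        binary = apply_morphology(binary, {"enabled": True, "operation": "open",
                                           "kernel_size": 1, "kernel_shape": "ellipse"})
    return binary
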
# Cache for 24 hours
# (a Streamlit caching decorator is assumed here to match the comment above and
#  the docstring; adjust or remove it if an older Streamlit version is used)
@st.cache_data(ttl=24 * 3600)
def convert_pdf_to_images(pdf_bytes, dpi=150, rotation=0):
    """Convert PDF bytes to a list of images with caching"""
    try:
        images = convert_from_bytes(pdf_bytes, dpi=dpi)
        # Apply rotation if specified
        if rotation != 0 and images:
            rotated_images = []
            for img in images:
                rotated_img = img.rotate(rotation, expand=True, resample=Image.BICUBIC)
                rotated_images.append(rotated_img)
            return rotated_images
        return images
    except Exception as e:
        st.error(f"Error converting PDF: {str(e)}")
        logger.error(f"PDF conversion error: {str(e)}")
        return []

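# Illustrative sketch, not part of the original module: converting a PDF on disk
# and serialising each page before further preprocessing. The file name is an
# example assumption; pdf2image requires poppler to be installed.
def _example_pdf_conversion():
    with open("sample.pdf", "rb") as f:  # hypothetical path
        pages = convert_pdf_to_images(f.read(), dpi=200, rotation=0)
    # Each element is a PIL Image; convert to bytes before preprocessing.
    page_bytes = []
    for page in pages:
        buf = io.BytesIO()
        page.save(buf, format="JPEG", quality=92)
        page_bytes.append(buf.getvalue())
    return page_bytes
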
def preprocess_image(image_bytes, preprocessing_options):
    """
    Conservative preprocessing function for handwritten documents with early exit for clean scans.

    Implements light processing: grayscale → denoise (gently) → contrast (conservative)

    Args:
        image_bytes: Image content as bytes
        preprocessing_options: Dictionary with document_type, grayscale, denoise, contrast options

    Returns:
        Processed image bytes or original image bytes if no processing needed
    """
    # Set up basic console logging
    logger = logging.getLogger("image_preprocessor")
    logger.setLevel(logging.INFO)

    # Log which preprocessing options are being applied
    logger.info(f"Document type: {preprocessing_options.get('document_type', 'standard')}")

    # Check if any preprocessing is actually requested
    has_preprocessing = (
        preprocessing_options.get("grayscale", False) or
        preprocessing_options.get("denoise", False) or
        preprocessing_options.get("contrast", 0) != 0
    )

    # Convert bytes to PIL Image
    image = Image.open(io.BytesIO(image_bytes))

    # Check for minimal skew and exit early if the document is already straight
    # This avoids unnecessary processing for clean scans
    try:
        from utils.image_utils import detect_skew
        skew_angle = detect_skew(image)
        if abs(skew_angle) < 0.5:
            logger.info(f"Document has minimal skew ({skew_angle:.2f}°), skipping preprocessing")
            # Return the original image bytes as-is for perfectly straight documents
            if not has_preprocessing:
                return image_bytes
    except Exception as e:
        logger.warning(f"Error in skew detection: {str(e)}, continuing with preprocessing")

    # If no preprocessing options are selected, return the original image
    if not has_preprocessing:
        logger.info("No preprocessing options selected, skipping preprocessing")
        return image_bytes

    # Initialize metrics for logging
    metrics = {
        "file": preprocessing_options.get("filename", "unknown"),
        "document_type": preprocessing_options.get("document_type", "standard"),
        "preprocessing_applied": []
    }
    start_time = time.time()

    # Handle RGBA images (transparency) by converting to RGB
    if image.mode == 'RGBA':
        # Convert RGBA to RGB by compositing onto a white background
        logger.info("Converting RGBA image to RGB")
        background = Image.new('RGB', image.size, (255, 255, 255))
        background.paste(image, mask=image.split()[3])  # 3 is the alpha channel
        image = background
        metrics["preprocessing_applied"].append("alpha_conversion")
    elif image.mode not in ('RGB', 'L'):
        # Convert other modes to RGB
        logger.info(f"Converting {image.mode} image to RGB")
        image = image.convert('RGB')
        metrics["preprocessing_applied"].append("format_conversion")

    # Convert to NumPy array for OpenCV processing
    img_array = np.array(image)

    # Apply grayscale if requested (useful for handwritten text)
    if preprocessing_options.get("grayscale", False):
        if len(img_array.shape) == 3:  # Only convert if it's not already grayscale
            # For handwritten documents, apply gentle CLAHE to enhance contrast locally
            if preprocessing_options.get("document_type") == "handwritten":
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
                clahe = cv2.createCLAHE(clipLimit=1.5, tileGridSize=(8, 8))  # Conservative clip limit
                img_array = clahe.apply(img_array)
            else:
                # Standard grayscale for printed documents
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        metrics["preprocessing_applied"].append("grayscale")

    # Apply light denoising if requested
    if preprocessing_options.get("denoise", False):
        try:
            # Apply very gentle denoising
            is_color = len(img_array.shape) == 3 and img_array.shape[2] == 3
            if is_color:
                # Very light color denoising with conservative parameters
                img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 2, 2, 3, 7)
            else:
                # Very light grayscale denoising
                img_array = cv2.fastNlMeansDenoising(img_array, None, 2, 3, 7)
            metrics["preprocessing_applied"].append("light_denoise")
        except Exception as e:
            logger.error(f"Denoising error: {str(e)}")

    # Apply contrast adjustment if requested (conservative range)
    contrast_value = preprocessing_options.get("contrast", 0)
    if contrast_value != 0:
        # Use a gentler contrast adjustment factor
        contrast_factor = 1 + (contrast_value / 200)  # Conservative scaling factor

        # Convert the NumPy array back to a PIL Image for contrast adjustment
        if len(img_array.shape) == 2:  # If grayscale, convert to RGB for PIL
            image = Image.fromarray(cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB))
        else:
            image = Image.fromarray(img_array)

        enhancer = ImageEnhance.Contrast(image)
        image = enhancer.enhance(contrast_factor)

        # Convert back to a NumPy array
        img_array = np.array(image)
        metrics["preprocessing_applied"].append(f"contrast_{contrast_value}")

    # Convert back to a PIL Image
    if len(img_array.shape) == 2:  # If grayscale, convert to RGB for saving
        processed_image = Image.fromarray(cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB))
    else:
        processed_image = Image.fromarray(img_array)

    # Record total processing time
    metrics["processing_time"] = (time.time() - start_time) * 1000  # ms

    # Higher quality for OCR processing
    byte_io = io.BytesIO()
    try:
        # Make sure the image is in RGB mode before saving as JPEG
        if processed_image.mode not in ('RGB', 'L'):
            processed_image = processed_image.convert('RGB')

        processed_image.save(byte_io, format='JPEG', quality=92, optimize=True)
        byte_io.seek(0)

        logger.info(f"Preprocessing complete. Original image mode: {image.mode}, processed mode: {processed_image.mode}")
        logger.info(f"Original size: {len(image_bytes)/1024:.1f}KB, processed size: {len(byte_io.getvalue())/1024:.1f}KB")
        logger.info(f"Applied preprocessing steps: {', '.join(metrics['preprocessing_applied'])}")

        return byte_io.getvalue()
    except Exception as e:
        logger.error(f"Error saving processed image: {str(e)}")
        # Fall back to the original image
        logger.info("Using original image as fallback")
        return image_bytes

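# Illustrative sketch, not part of the original module: calling preprocess_image
# directly on an image file. The path and option values are example assumptions;
# the option keys match the ones read above.
def _example_preprocess_image():
    with open("sample_page.jpg", "rb") as f:  # hypothetical path
        image_bytes = f.read()
    options = {
        "document_type": "handwritten",
        "grayscale": True,
        "denoise": True,
        "contrast": 10,
        "filename": "sample_page.jpg",
    }
    return preprocess_image(image_bytes, options)
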
def create_temp_file(content, suffix, temp_file_paths):
    """Create a temporary file and track it for cleanup"""
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(content)
        temp_path = tmp.name

    # Track the temporary file for cleanup
    temp_file_paths.append(temp_path)
    logger.info(f"Created temporary file: {temp_path}")
    return temp_path

def apply_preprocessing_to_file(file_bytes, file_ext, preprocessing_options, temp_file_paths):
    """
    Apply conservative preprocessing to a file and return the path to the temporary file.

    Handles format conversion and user-selected preprocessing options.

    Args:
        file_bytes: File content as bytes
        file_ext: File extension (e.g., '.jpg', '.pdf')
        preprocessing_options: Dictionary with document_type and preprocessing options
        temp_file_paths: List to track temporary files for cleanup

    Returns:
        Tuple of (temp_file_path, was_processed_flag)
    """
    document_type = preprocessing_options.get("document_type", "standard")

    # Check for user-selected preprocessing
    has_preprocessing = (
        preprocessing_options.get("grayscale", False) or
        preprocessing_options.get("denoise", False) or
        preprocessing_options.get("contrast", 0) != 0
    )

    # Check for RGBA/transparency that needs conversion
    format_needs_conversion = False
    # Only check formats that might have transparency
    if file_ext.lower() in ['.png', '.tif', '.tiff']:
        try:
            # Check if the image has transparency or an unsupported mode
            image = Image.open(io.BytesIO(file_bytes))
            if image.mode not in ('RGB', 'L'):
                format_needs_conversion = True
        except Exception as e:
            logger.warning(f"Error checking image format: {str(e)}")

    # Process if the user requested preprocessing OR the format needs conversion
    needs_processing = has_preprocessing or format_needs_conversion
    if needs_processing:
        # Apply preprocessing
        logger.info(f"Applying preprocessing with options: {preprocessing_options}")
        logger.info(f"Using document type '{document_type}' with advanced preprocessing options")

        # Add filename to preprocessing options for logging if available
        # (only applies when a file-like object with a .name attribute is passed;
        #  raw bytes carry no filename, so callers may set it in preprocessing_options)
        if hasattr(file_bytes, 'name'):
            preprocessing_options["filename"] = file_bytes.name

        processed_bytes = preprocess_image(file_bytes, preprocessing_options)

        # Save the processed image to a temp file
        temp_path = create_temp_file(processed_bytes, file_ext, temp_file_paths)
        return temp_path, True  # Preprocessing was applied
    else:
        # No preprocessing needed, just save the original file
        logger.info("No preprocessing applied - using original image")
        temp_path = create_temp_file(file_bytes, file_ext, temp_file_paths)
        return temp_path, False  # No preprocessing was applied

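# Illustrative sketch, not part of the original module: the end-to-end call as a
# caller might use it, tracking temp files for later cleanup. The file path and
# option values are example assumptions.
def _example_apply_preprocessing():
    temp_file_paths = []
    with open("sample_page.png", "rb") as f:  # hypothetical path
        file_bytes = f.read()
    options = {"document_type": "standard", "grayscale": True, "denoise": False, "contrast": 0}
    temp_path, was_processed = apply_preprocessing_to_file(file_bytes, ".png", options, temp_file_paths)
    # ... run OCR on temp_path here ...
    # Callers are expected to remove the tracked temp files when done:
    for path in temp_file_paths:
        if os.path.exists(path):
            os.remove(path)
    return was_processed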