# NOTE: file-viewer residue (status banner and line-number gutter) removed.
# Recorded metadata: file size 9,309 bytes, revision 459923e.
import io
import os
import json
import tempfile
from google.cloud import vision
from google.oauth2 import service_account
from PIL import Image
import base64
import re
import logging
from pdf2image import convert_from_path
from OCRAccuracyAnalyzer import OCRAccuracyAnalyzer
import cv2
import numpy as np
import shutil
from typing import Tuple, Dict, Any
from datetime import datetime
import platform
# Configure logging: basicConfig() sets up the root logger at INFO level
# (it only has an effect when the root logger has no handlers yet), then a
# module-level logger named after this module is created for all messages below.
# NOTE(review): this runs at import time, so importing this module can change
# the host application's logging configuration - confirm that is intended.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Trace message emitted once, when the module is first imported.
logger.info("Importing OCR.py...")
# Handle Google Cloud credentials - support both environment variables and file
def get_google_credentials():
    """Get Google Cloud credentials from an environment variable or a file.

    Lookup order:

    1. The ``GOOGLE_CLOUD_CREDENTIALS`` environment variable, expected to
       hold the service-account key as a JSON document (for Heroku).
    2. The key file ``css-edge-e347b0ed2b9e.json`` located next to this
       module (for local development).

    If the environment variable is set but cannot be parsed or turned into
    credentials, a warning is logged and the file-based lookup is tried.

    :return: Credentials built by ``service_account.Credentials``.
    :raises FileNotFoundError: If neither source yields credentials.
    """
    # First, try to get credentials from the environment variable.
    credentials_json = os.environ.get('GOOGLE_CLOUD_CREDENTIALS')
    if credentials_json:
        try:
            credentials_info = json.loads(credentials_json)
            return service_account.Credentials.from_service_account_info(credentials_info)
        except Exception as e:
            # Deliberately broad: any problem with the environment value
            # must not prevent the file-based fallback below.
            logger.warning("Failed to parse credentials from environment: %s", e)

    # Fall back to file-based credentials stored alongside this module.
    credentials_path = os.path.join(os.path.dirname(__file__), "css-edge-e347b0ed2b9e.json")
    if os.path.exists(credentials_path):
        return service_account.Credentials.from_service_account_file(credentials_path)

    # Neither source is available.
    raise FileNotFoundError(
        "Google Cloud credentials not found. "
        "Please set GOOGLE_CLOUD_CREDENTIALS environment variable or "
        f"place credentials file at: {credentials_path}"
    )
class OCR:
    """Text extraction for images and PDFs via Google Cloud Vision.

    Images are preprocessed with OpenCV (grayscale, optional downscale,
    Otsu binarization) and sent to ``document_text_detection``. PDFs are
    rasterized page by page with ``pdf2image`` and each page is processed
    as an image. The ``process_*`` methods never raise: on failure they
    return ``("", {"overall_accuracy": 0.0})``.
    """

    # Images whose width or height exceeds this many pixels are downscaled.
    MAX_IMAGE_DIMENSION = 2000
    # Rasterization settings passed to pdf2image.convert_from_path.
    PDF_DPI = 200  # Reduced from 300 for better performance
    PDF_THREAD_COUNT = 1  # Reduced for container environments
    PDF_PAGE_SIZE = (1654, 2340)  # A4 size at 200 DPI

    def __init__(self):
        """Create the Vision client and the accuracy analyzer.

        :raises Exception: Re-raises whatever credential loading or client
            construction raised, after logging it.
        """
        logger.info("Initializing OCR...")
        try:
            # Get credentials using the module-level helper function.
            credentials = get_google_credentials()
            self.client = vision.ImageAnnotatorClient(credentials=credentials)
            self.accuracy_analyzer = OCRAccuracyAnalyzer()
            logger.info("Successfully initialized Google Cloud Vision client")
        except Exception as e:
            logger.error("Failed to initialize Google Cloud Vision client: %s", e)
            raise

    @staticmethod
    def _failure_result():
        """Return a fresh ``(text, metrics)`` pair describing a failed OCR run."""
        return "", {"overall_accuracy": 0.0}

    @staticmethod
    def _remove_quietly(path):
        """Delete ``path`` if it exists; log (never raise) when deletion fails."""
        try:
            if path and os.path.exists(path):
                os.remove(path)
        except OSError as e:
            logger.warning("Could not remove temporary file %s: %s", path, e)

    def preprocess_image(self, image_path):
        """Write a binarized copy of an image that is easier to OCR.

        :param image_path: Path of the image to preprocess.
        :return: Path of a newly created temporary PNG, or ``image_path``
            itself when the image cannot be read, written or processed.
            When the returned path differs from ``image_path`` the caller
            is responsible for deleting it.
        """
        logger.info("Preprocessing image: %s", image_path)
        processed_path = None
        try:
            img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                return image_path  # fallback if image can't be read

            # Resize image if too large (optimize for performance).
            height, width = img.shape
            limit = self.MAX_IMAGE_DIMENSION
            if width > limit or height > limit:
                scale = min(limit / width, limit / height)
                # Clamp to 1 so extreme aspect ratios never round down to 0.
                new_width = max(1, int(width * scale))
                new_height = max(1, int(height * scale))
                img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_AREA)

            # Apply OTSU binarization for better OCR.
            _, img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

            # NOTE(review): a 1x1 kernel presumably leaves the image
            # unchanged - confirm whether a larger kernel was intended.
            img = cv2.GaussianBlur(img, (1, 1), 0)

            # Use a unique temporary file with an explicit .png suffix so
            # concurrent runs cannot overwrite each other and the PNG
            # compression flag always matches the output format.
            stem = os.path.splitext(os.path.basename(image_path))[0]
            fd, processed_path = tempfile.mkstemp(prefix=f"preprocessed_{stem}_", suffix=".png")
            os.close(fd)

            if not cv2.imwrite(processed_path, img, [cv2.IMWRITE_PNG_COMPRESSION, 9]):
                logger.warning("Image preprocessing failed: could not write %s", processed_path)
                self._remove_quietly(processed_path)
                return image_path
            return processed_path
        except Exception as e:
            # Best effort: OCR can still run on the untouched original.
            logger.warning("Image preprocessing failed: %s", e)
            self._remove_quietly(processed_path)
            return image_path

    def process_image_with_vision(self, image_path):
        """Process an image file using Google Cloud Vision API.

        :param image_path: Path of the image to run OCR on.
        :return: Tuple ``(text, accuracy_metrics)``; on any failure
            ``("", {"overall_accuracy": 0.0})``.
        """
        processed_path = image_path
        try:
            # Preprocess image for better OCR.
            processed_path = self.preprocess_image(image_path)
            with open(processed_path, 'rb') as image_file:
                content = image_file.read()

            image = vision.Image(content=content)
            # Use document text detection for better accuracy.
            response = self.client.document_text_detection(image=image)
            if response.error.message:
                raise RuntimeError(f"Error during Vision API call: {response.error.message}")

            annotation = response.full_text_annotation
            text = annotation.text
            # Calculate accuracy metrics.
            accuracy_metrics = self.accuracy_analyzer.analyze_ocr_quality(annotation, text)

            # Extracted document content is logged at DEBUG only, so it
            # does not end up in regular INFO-level logs.
            logger.debug("Extracted text (first 500 chars): %s", text[:500])
            return text, accuracy_metrics
        except Exception as e:
            logger.error("Error processing image: %s", e)
            return self._failure_result()
        finally:
            # Always clean up the preprocessed copy, including on errors.
            if processed_path != image_path:
                self._remove_quietly(processed_path)

    def process_pdf_file_with_vision(self, pdf_path):
        """Process a PDF by converting its pages to images and running OCR on each.

        :param pdf_path: Path of the PDF file.
        :return: Tuple ``(text, {"overall_accuracy": average})`` where the
            text holds every page prefixed by a ``--- Page N ---`` marker
            and the accuracy is the mean of the per-page values; on any
            failure ``("", {"overall_accuracy": 0.0})``.
        """
        try:
            # Convert PDF to images using the system-installed Poppler.
            # NOTE(review): a full (width, height) size presumably forces
            # every page to A4 proportions regardless of its real aspect
            # ratio - confirm against the pdf2image documentation.
            images = convert_from_path(
                pdf_path,
                dpi=self.PDF_DPI,
                thread_count=self.PDF_THREAD_COUNT,
                grayscale=True,  # Smaller file size
                size=self.PDF_PAGE_SIZE,
            )

            page_texts = []
            page_accuracies = []
            # A private temporary directory keeps page files from colliding
            # across concurrent runs and is removed even if a page fails.
            with tempfile.TemporaryDirectory(prefix="ocr_pdf_") as temp_dir:
                for i, image in enumerate(images):
                    temp_path = os.path.join(temp_dir, f"temp_page_{i}.png")
                    # PNG is lossless, so no `quality` argument is passed.
                    image.save(temp_path, 'PNG', optimize=True)

                    logger.info("Processing page %d of PDF...", i + 1)
                    page_text, page_metrics = self.process_image_with_vision(temp_path)
                    page_texts.append(f"\n--- Page {i + 1} ---\n{page_text}")
                    page_accuracies.append(page_metrics.get("overall_accuracy", 0.0))

            # Average accuracy across all pages (0.0 for a PDF with no pages).
            avg_accuracy = sum(page_accuracies) / len(page_accuracies) if page_accuracies else 0.0
            return "".join(page_texts), {"overall_accuracy": avg_accuracy}
        except Exception as e:
            logger.error("Error processing PDF: %s", e)
            return self._failure_result()

    def process_file(self, file_path):
        """Process either a PDF or an image file.

        :param file_path: Path (``str`` or ``os.PathLike``) of the file.
            Names ending in ``.pdf`` (case-insensitive) are handled as
            PDFs, everything else as an image.
        :return: Tuple ``(text, accuracy_metrics)``.
        """
        file_path = os.fspath(file_path)
        if file_path.lower().endswith('.pdf'):
            return self.process_pdf_file_with_vision(file_path)
        return self.process_image_with_vision(file_path)

    def save_text_to_file(self, text, output_path):
        """Save the text to a UTF-8 encoded .txt file.

        Errors are logged rather than raised.

        :param text: Text to write.
        :param output_path: Destination file path.
        :return: ``True`` if the file was written, ``False`` otherwise.
        """
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(text)
        except Exception as e:
            logger.error("Error saving text to file: %s", e)
            return False
        logger.info("Saved recognized text to %s", output_path)
        return True

    def run_ocr_pipeline(self):
        """Run the OCR pipeline with file and directory selection.

        Interactive selection is not implemented; a placeholder status
        dictionary is returned instead.

        :return: ``{"status": "OCR pipeline not implemented for headless mode"}``.
        """
        logger.info("DEBUG: Entered run_ocr_pipeline")
        logger.info("Select files for OCR processing...")
        # This would need to be implemented based on the UI framework in use.
        return {"status": "OCR pipeline not implemented for headless mode"}

    def run_ocr(self, uploaded_file, output_directory):
        """Run OCR on a file and store the recognized text.

        :param uploaded_file: Path (``str`` or ``os.PathLike``) of the file
            to process. File-like upload objects are not supported and
            must be saved to disk first.
        :param output_directory: Directory to save the processed result;
            created if it does not exist.
        :return: Path to the saved text file,
            ``<output_directory>/<base name>_ocr_result.txt``.
        :raises OSError: If the result file could not be written.
        :raises Exception: Any other error is logged and re-raised.
        """
        try:
            # Ensure output directory exists.
            os.makedirs(output_directory, exist_ok=True)

            # Process the file (PDF or image) through the shared dispatcher.
            source_path = os.fspath(uploaded_file)
            extracted_text, _accuracy_metrics = self.process_file(source_path)

            # Generate output filename.
            base_name = os.path.splitext(os.path.basename(source_path))[0]
            output_path = os.path.join(output_directory, f"{base_name}_ocr_result.txt")

            # Save the extracted text. Only an explicit False counts as a
            # failure, so overrides that return None keep working.
            if self.save_text_to_file(extracted_text, output_path) is False:
                raise OSError(f"Could not write OCR result to {output_path}")
            return output_path
        except Exception as e:
            logger.error("Error in run_ocr: %s", e)
            raise