File size: 9,309 Bytes
459923e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import io
import os
import json
import tempfile
from google.cloud import vision
from google.oauth2 import service_account
from PIL import Image
import base64
import re
import logging
from pdf2image import convert_from_path
from OCRAccuracyAnalyzer import OCRAccuracyAnalyzer
import cv2
import numpy as np
import shutil
from typing import Tuple, Dict, Any
from datetime import datetime
import platform

# Configure logging.
# NOTE: this runs at import time. It sets up the root logger at INFO level,
# creates this module's logger, and logs that the module is being imported.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Importing OCR.py...")

# Handle Google Cloud credentials - support both environment variables and file
def get_google_credentials():
    """Build Google Cloud service-account credentials.

    Two sources are tried, in order:

    1. The ``GOOGLE_CLOUD_CREDENTIALS`` environment variable, which is parsed
       as a JSON document and handed to
       ``service_account.Credentials.from_service_account_info``
       (intended for Heroku deployments).
    2. The key file ``css-edge-e347b0ed2b9e.json`` located in the same
       directory as this module (intended for local development).

    If the environment variable is set but cannot be turned into credentials,
    the failure is logged as a warning and the file-based fallback is tried.

    Returns:
        The credentials object returned by ``from_service_account_info`` or
        ``from_service_account_file``.

    Raises:
        FileNotFoundError: If the environment variable is unset (or unusable)
            and the key file does not exist.
    """
    # First, try to get credentials from environment variable (for Heroku)
    credentials_json = os.environ.get('GOOGLE_CLOUD_CREDENTIALS')
    if credentials_json:
        try:
            # ``json`` is already imported at module level.
            credentials_info = json.loads(credentials_json)
            return service_account.Credentials.from_service_account_info(credentials_info)
        except Exception as e:
            # Deliberately broad: whatever goes wrong with the environment
            # value, the file-based credentials below are still worth trying.
            logger.warning(f"Failed to parse credentials from environment: {e}")

    # Fall back to file-based credentials (for local development)
    credentials_path = os.path.join(os.path.dirname(__file__), "css-edge-e347b0ed2b9e.json")
    if os.path.exists(credentials_path):
        return service_account.Credentials.from_service_account_file(credentials_path)

    # If neither is available, raise an error
    raise FileNotFoundError(
        "Google Cloud credentials not found. "
        "Please set GOOGLE_CLOUD_CREDENTIALS environment variable or "
        f"place credentials file at: {credentials_path}"
    )

class OCR:
    """Text extraction from images and PDFs via the Google Cloud Vision API.

    Images are preprocessed with OpenCV before being sent to Vision's
    document text detection. PDFs are rasterised page by page with
    ``pdf2image`` and each page goes through the same image path. Every
    result is paired with a metrics dictionary produced by
    ``OCRAccuracyAnalyzer``.
    """

    # Images wider or taller than this many pixels are scaled down before OCR.
    MAX_IMAGE_DIMENSION = 2000

    def __init__(self):
        """Create the Vision client and the accuracy analyzer.

        Raises:
            Exception: Whatever credential lookup or client construction
                raises; the error is logged and re-raised unchanged.
        """
        logger.info("Initializing OCR...")
        try:
            # Get credentials using the helper function
            credentials = get_google_credentials()
            self.client = vision.ImageAnnotatorClient(credentials=credentials)
            self.accuracy_analyzer = OCRAccuracyAnalyzer()
            logger.info("Successfully initialized Google Cloud Vision client")
        except Exception as e:
            logger.error(f"Failed to initialize Google Cloud Vision client: {str(e)}")
            raise

    @staticmethod
    def _remove_temp_file(path):
        """Delete a temporary file without letting cleanup errors propagate."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone; nothing to clean up.
            pass
        except OSError as e:
            logger.warning(f"Could not remove temporary file {path}: {e}")

    def preprocess_image(self, image_path: str) -> str:
        """Write a grayscale, downscaled, binarised copy of an image.

        The copy is created in the system temporary directory under a unique
        name, so concurrent calls and same-named inputs cannot overwrite each
        other. It keeps the input's file extension, so OpenCV picks the same
        encoder as for the original name. The caller is responsible for
        deleting the returned file when it differs from ``image_path``.

        Args:
            image_path: Path of the image to preprocess.

        Returns:
            Path of the preprocessed copy, or ``image_path`` itself when the
            image cannot be read or any preprocessing step fails.
        """
        logger.info(f"Preprocessing image: {image_path}")
        processed_path = None
        try:
            img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                return image_path  # fallback if image can't be read

            # Resize image if too large (optimize for performance)
            height, width = img.shape
            limit = self.MAX_IMAGE_DIMENSION
            if width > limit or height > limit:
                scale = min(limit / width, limit / height)
                new_size = (int(width * scale), int(height * scale))
                img = cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)

            # Apply OTSU binarization for better OCR
            _, img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

            # NOTE(review): a 1x1 kernel presumably leaves the image unchanged,
            # so this step is likely a no-op. Kept as-is to avoid altering OCR
            # input; confirm before enlarging the kernel.
            img = cv2.GaussianBlur(img, (1, 1), 0)

            suffix = os.path.splitext(image_path)[1]
            fd, processed_path = tempfile.mkstemp(prefix="preprocessed_", suffix=suffix)
            os.close(fd)  # OpenCV reopens the file by name.
            if not cv2.imwrite(processed_path, img, [cv2.IMWRITE_PNG_COMPRESSION, 9]):
                raise OSError(f"could not write {processed_path}")
            return processed_path
        except Exception as e:
            logger.warning(f"Image preprocessing failed: {e}")
            if processed_path is not None:
                self._remove_temp_file(processed_path)
            return image_path

    def process_image_with_vision(self, image_path: str) -> Tuple[str, Dict[str, Any]]:
        """Run Vision document text detection on a single image file.

        The image is preprocessed first. The preprocessed copy is deleted as
        soon as its bytes have been read, so it is cleaned up whether or not
        the API call succeeds.

        Args:
            image_path: Path of the image to process.

        Returns:
            A ``(text, metrics)`` tuple, where ``metrics`` is whatever
            ``analyze_ocr_quality`` returns. On any failure the error is
            logged and ``("", {"overall_accuracy": 0.0})`` is returned
            instead of raising.
        """
        try:
            # Preprocess image for better OCR
            processed_path = self.preprocess_image(image_path)
            try:
                with open(processed_path, 'rb') as image_file:
                    content = image_file.read()
            finally:
                # Never delete the caller's own file, only our temporary copy.
                if processed_path != image_path:
                    self._remove_temp_file(processed_path)

            image = vision.Image(content=content)

            # Use document text detection for better accuracy
            response = self.client.document_text_detection(image=image)

            if response.error.message:
                raise RuntimeError(f"Error during Vision API call: {response.error.message}")

            annotation = response.full_text_annotation

            # Calculate accuracy metrics
            accuracy_metrics = self.accuracy_analyzer.analyze_ocr_quality(
                annotation,
                annotation.text
            )

            # Debug: print/log the full extracted text
            logger.info(f"Extracted text (first 500 chars): {annotation.text[:500]}")

            # Return both the text content and accuracy metrics
            return annotation.text, accuracy_metrics

        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return "", {"overall_accuracy": 0.0}

    def process_pdf_file_with_vision(self, pdf_path: str) -> Tuple[str, Dict[str, Any]]:
        """OCR every page of a PDF and concatenate the results.

        Pages are rendered to PNG files inside a private temporary directory,
        which is removed when processing ends, including on errors.

        Args:
            pdf_path: Path of the PDF to process.

        Returns:
            A ``(text, metrics)`` tuple. ``text`` holds each page's text
            preceded by a ``--- Page N ---`` marker. ``metrics`` is
            ``{"overall_accuracy": <mean of the per-page values>}``, or 0.0
            when the PDF yields no pages. On any failure the error is logged
            and ``("", {"overall_accuracy": 0.0})`` is returned.
        """
        try:
            # Use system-installed Poppler (much faster and smaller)
            # Convert PDF to images with optimized settings
            images = convert_from_path(
                pdf_path,
                dpi=200,  # Reduced from 300 for better performance
                thread_count=1,  # Reduced for container environments
                grayscale=True,  # Smaller file size
                size=(1654, 2340)  # A4 size at 200 DPI
            )

            page_sections = []
            page_accuracies = []

            with tempfile.TemporaryDirectory(prefix="ocr_pdf_") as temp_dir:
                for i, image in enumerate(images):
                    # PNG is lossless, so no ``quality`` option is passed.
                    temp_path = os.path.join(temp_dir, f"temp_page_{i}.png")
                    image.save(temp_path, 'PNG', optimize=True)

                    logger.info(f"Processing page {i + 1} of PDF...")
                    page_text, page_metrics = self.process_image_with_vision(temp_path)
                    page_sections.append(f"\n--- Page {i + 1} ---\n" + page_text)
                    page_accuracies.append(page_metrics.get("overall_accuracy", 0.0))

            all_text = "".join(page_sections)

            # Average accuracy across all pages
            avg_accuracy = sum(page_accuracies) / len(page_accuracies) if page_accuracies else 0.0
            return all_text, {"overall_accuracy": avg_accuracy}

        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            return "", {"overall_accuracy": 0.0}

    def process_file(self, file_path: str) -> Tuple[str, Dict[str, Any]]:
        """Process either a PDF or an image file.

        Args:
            file_path: Path string. A name ending in ``.pdf``
                (case-insensitive) takes the PDF path; anything else is
                treated as an image.

        Returns:
            The ``(text, metrics)`` tuple from the selected processing method.
        """
        if file_path.lower().endswith('.pdf'):
            return self.process_pdf_file_with_vision(file_path)
        return self.process_image_with_vision(file_path)

    def save_text_to_file(self, text: str, output_path: str) -> None:
        """Write ``text`` to ``output_path`` as UTF-8.

        Errors are logged rather than raised, so a failed write does not
        abort the caller.
        """
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(text)
            logger.info(f"Saved recognized text to {output_path}")
        except Exception as e:
            logger.error(f"Error saving text to file: {e}")

    def run_ocr_pipeline(self):
        """
        Placeholder for an interactive OCR pipeline with file and directory selection.

        :return: A status dictionary saying the pipeline is not implemented
            for headless mode.
        """
        logger.info("DEBUG: Entered run_ocr_pipeline")
        logger.info("Select files for OCR processing...")

        # This would need to be implemented based on your UI framework
        # For now, return a placeholder
        return {"status": "OCR pipeline not implemented for headless mode"}

    def run_ocr(self, uploaded_file, output_directory):
        """
        Run OCR on a file and save the extracted text next to other results.

        :param uploaded_file: Path string of the file to process. The code
            calls ``.lower()`` and ``os.path.basename`` on it, so an uploaded
            file object must be saved to disk first.
        :param output_directory: Directory to save the processed result;
            created if it does not exist.
        :return: Path to the text file, ``<base name>_ocr_result.txt`` inside
            ``output_directory``.
        :raises Exception: Any error from directory creation or processing is
            logged and re-raised.
        """
        try:
            # Ensure output directory exists
            os.makedirs(output_directory, exist_ok=True)

            # Same PDF/image dispatch as process_file, without duplicating it.
            extracted_text, _accuracy_metrics = self.process_file(uploaded_file)

            # Generate output filename
            base_name = os.path.splitext(os.path.basename(uploaded_file))[0]
            output_path = os.path.join(output_directory, f"{base_name}_ocr_result.txt")

            # Save the extracted text
            self.save_text_to_file(extracted_text, output_path)

            return output_path

        except Exception as e:
            logger.error(f"Error in run_ocr: {e}")
            raise