#!/usr/bin/env python3
"""
PDFOCR - Module for processing PDF files with OCR and extracting structured data.
Provides robust PDF to image conversion before OCR processing.
"""
import json | |
import os | |
import tempfile | |
import logging | |
from pathlib import Path | |
from typing import Optional, Dict, List, Union, Tuple, Any | |
# Set up root logging once at import time; this module logs through the
# "pdf_ocr" logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("pdf_ocr")
# Import StructuredOCR for OCR processing | |
from structured_ocr import StructuredOCR | |
class PDFConversionResult:
    """Hold the outcome of a PDF to image conversion.

    An instance is truthy when the conversion succeeded, so callers can write
    ``if result:`` instead of inspecting ``result.success``.
    """

    def __init__(self,
                 success: bool,
                 images: Optional[List[Path]] = None,
                 error: Optional[str] = None,
                 page_count: int = 0,
                 temp_files: Optional[List[str]] = None):
        """Initialize the conversion result.

        Args:
            success: Whether the conversion was successful
            images: List of paths to the converted images
            error: Error message if conversion failed
            page_count: Total number of pages in the PDF
            temp_files: List of temporary files that should be cleaned up
        """
        self.success = success
        self.images = images or []
        self.error = error
        self.page_count = page_count
        self.temp_files = temp_files or []

    def __bool__(self) -> bool:
        """Enable boolean evaluation of the result."""
        # __bool__ must return a real bool; coercing keeps truthy non-bool
        # values of ``success`` (e.g. 1) from raising TypeError.
        return bool(self.success)

    def cleanup(self) -> None:
        """Clean up any temporary files created during conversion.

        Removal is best effort: a file that is already gone is skipped, any
        other failure is logged as a warning, and the remaining files are
        still attempted. The tracking list is emptied afterwards.
        """
        for temp_file in self.temp_files:
            try:
                # Unlink directly instead of checking existence first, so a
                # file removed in between cannot raise unexpectedly.
                os.unlink(temp_file)
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.warning(f"Failed to remove temporary file {temp_file}: {e}")
            else:
                logger.debug(f"Removed temporary file: {temp_file}")
        self.temp_files = []
class PDFOCR:
    """Class for processing PDF files with OCR and extracting structured data.

    Pages are first rendered to JPEG images with ``pdf2image`` and sent to
    ``StructuredOCR`` one image at a time. If rendering or image processing
    fails, the PDF itself is handed to ``StructuredOCR`` instead.
    """

    def __init__(self, api_key=None):
        """Initialize the PDF OCR processor."""
        # Set temp_files first so cleanup()/__del__ work even when the
        # StructuredOCR constructor raises.
        self.temp_files = []
        self.processor = StructuredOCR(api_key=api_key)

    def __del__(self):
        """Clean up resources when object is destroyed."""
        # A finalizer must never raise (it can run during interpreter
        # shutdown, when module globals may already be gone).
        try:
            self.cleanup()
        except Exception:
            pass

    def cleanup(self):
        """Clean up any temporary files.

        Removal is best effort: missing files are skipped and other failures
        are logged as warnings. The tracking list is emptied afterwards.
        """
        # getattr: the instance may be only partially initialised.
        for temp_file in getattr(self, "temp_files", []):
            try:
                os.unlink(temp_file)
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.warning(f"Failed to remove temporary file {temp_file}: {e}")
            else:
                logger.debug(f"Removed temporary file: {temp_file}")
        self.temp_files = []

    @staticmethod
    def _count_pages(pdf2image, pdf_path):
        """Return the page count of ``pdf_path``, or 1 if it cannot be determined."""
        try:
            # Ask poppler's pdfinfo for the count; no page has to be rendered.
            return int(pdf2image.pdfinfo_from_path(str(pdf_path))["Pages"])
        except Exception as e:
            logger.warning(f"Failed to determine page count: {e}")
        try:
            from pypdf import PdfReader
            return len(PdfReader(pdf_path).pages)
        except Exception:
            logger.warning("Could not determine total page count, assuming 1 page")
            return 1

    @staticmethod
    def _select_pages(total_pages, max_pages, page_numbers):
        """Return the sorted, de-duplicated 1-based page numbers to convert."""
        if page_numbers:
            valid = sorted({p for p in page_numbers if 1 <= p <= total_pages})
            if valid:
                logger.info(f"Converting {len(valid)} specified pages: {valid}")
                return valid
            logger.warning(f"None of the requested pages {list(page_numbers)} exist in the PDF; ignoring page selection")
        if max_pages and max_pages < total_pages:
            logger.info(f"Converting first {max_pages} pages of {total_pages} total")
            return list(range(1, max_pages + 1))
        logger.info(f"Converting all {total_pages} pages")
        return list(range(1, total_pages + 1))

    @staticmethod
    def _contiguous_batches(pages, batch_size=5):
        """Split sorted page numbers into runs of consecutive pages of at most ``batch_size``."""
        batches = []
        current = []
        for page in pages:
            if current and (page != current[-1] + 1 or len(current) >= batch_size):
                batches.append(current)
                current = []
            current.append(page)
        if current:
            batches.append(current)
        return batches

    @staticmethod
    def _parse_page_numbers(custom_pages):
        """Normalise ``custom_pages`` (list, tuple or comma-separated string) or return None."""
        if not custom_pages:
            return None
        if isinstance(custom_pages, (list, tuple)):
            return custom_pages
        try:
            # Try to parse as comma-separated string
            return [int(p.strip()) for p in str(custom_pages).split(',')]
        except ValueError:
            logger.warning(f"Invalid custom_pages format: {custom_pages}. Should be list or comma-separated string.")
            return None

    @staticmethod
    def _collect_page_output(page_result, texts, languages):
        """Append a page's raw text to ``texts`` and add its languages to ``languages``."""
        if 'ocr_contents' in page_result and 'raw_text' in page_result['ocr_contents']:
            texts.append(page_result['ocr_contents']['raw_text'])
        if 'languages' in page_result:
            for lang in page_result['languages']:
                languages.add(str(lang))

    def convert_pdf_to_images(self,
                              pdf_path: Union[str, Path],
                              dpi: int = 200,
                              max_pages: Optional[int] = None,
                              page_numbers: Optional[List[int]] = None) -> "PDFConversionResult":
        """
        Convert a PDF file to images.

        Each converted page is written to its own temporary JPEG file. The
        files are tracked both on the returned result and on this instance.

        Args:
            pdf_path: Path to the PDF file
            dpi: DPI for the output images
            max_pages: Maximum number of pages to convert (None for all)
            page_numbers: Specific page numbers to convert (1-based indexing)

        Returns:
            PDFConversionResult object with conversion results; images are in
            ascending page order. Failures are reported through the result
            (``success=False`` and ``error``), not raised.
        """
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            return PDFConversionResult(
                success=False,
                error=f"PDF file not found: {pdf_path}"
            )

        # Check file size
        file_size_mb = pdf_path.stat().st_size / (1024 * 1024)
        logger.info(f"PDF size: {file_size_mb:.2f} MB")

        # Defined before the try block so every failure path can clean up.
        temp_files = []
        try:
            # Import pdf2image for conversion
            import pdf2image

            thread_count = min(4, os.cpu_count() or 2)

            logger.info("Determining PDF page count...")
            total_pages = self._count_pages(pdf2image, pdf_path)
            logger.info(f"PDF has {total_pages} total pages")

            pages_to_process = self._select_pages(total_pages, max_pages, page_numbers)
            if not pages_to_process:
                return PDFConversionResult(
                    success=False,
                    error="Failed to convert PDF to images: no pages to convert",
                    page_count=total_pages
                )

            image_paths = []
            # Only consecutive pages share a batch, so sparse selections do
            # not render the pages in between.
            for batch_pages in self._contiguous_batches(pages_to_process, batch_size=5):
                logger.info(f"Converting batch of pages {batch_pages}")
                try:
                    batch_images = pdf2image.convert_from_path(
                        pdf_path,
                        dpi=dpi,
                        first_page=batch_pages[0],
                        last_page=batch_pages[-1],
                        thread_count=thread_count,
                        fmt="jpeg"
                    )
                    for page_num, image in zip(batch_pages, batch_images):
                        handle = tempfile.NamedTemporaryFile(suffix=f'_page{page_num}.jpg', delete=False)
                        handle.close()
                        # Track before saving so a failed save is still cleaned up.
                        temp_files.append(handle.name)
                        image.save(handle.name, format='JPEG', quality=95)
                        image_paths.append(Path(handle.name))
                except Exception as e:
                    logger.error(f"Failed to convert batch {batch_pages}: {e}")
                    # Continue with other batches

            if not image_paths:
                # No images were successfully converted
                failed = PDFConversionResult(
                    success=False,
                    error="Failed to convert PDF to images",
                    page_count=total_pages,
                    temp_files=temp_files
                )
                failed.cleanup()
                return failed

            # Store temp files for later cleanup
            self.temp_files.extend(temp_files)
            return PDFConversionResult(
                success=True,
                images=image_paths,
                page_count=total_pages,
                temp_files=temp_files
            )
        except ImportError:
            return PDFConversionResult(
                success=False,
                error="pdf2image module not available. Please install with: pip install pdf2image"
            )
        except Exception as e:
            logger.error(f"PDF conversion error: {str(e)}")
            failed = PDFConversionResult(
                success=False,
                error=f"Failed to convert PDF to images: {str(e)}",
                temp_files=temp_files
            )
            failed.cleanup()
            return failed

    def process_pdf(self, pdf_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None):
        """
        Process a PDF file with OCR and extract structured data.

        The first converted page is processed with the (possibly augmented)
        prompt and ``use_vision``; further pages are processed without vision
        and only contribute raw text and languages. If conversion or image
        processing fails, the PDF is passed to StructuredOCR directly.

        Args:
            pdf_path: Path to the PDF file
            use_vision: Whether to use vision model for improved analysis
            max_pages: Maximum number of pages to process
            custom_pages: Specific page numbers to process (1-based indexing),
                as a list/tuple or a comma-separated string
            custom_prompt: Custom instructions for processing

        Returns:
            Dictionary with structured OCR results

        Raises:
            FileNotFoundError: If pdf_path does not exist
        """
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        page_numbers = self._parse_page_numbers(custom_pages)

        # First try our optimized PDF to image conversion
        conversion_result = self.convert_pdf_to_images(
            pdf_path=pdf_path,
            max_pages=max_pages,
            page_numbers=page_numbers
        )

        if conversion_result.success and conversion_result.images:
            images = conversion_result.images
            logger.info(f"Successfully converted PDF to {len(images)} images")

            # Add PDF-specific context unless the prompt already mentions it
            pdf_context = f"This is a multi-page PDF document with {conversion_result.page_count} total pages, of which {len(images)} were processed."
            if not custom_prompt:
                modified_prompt = pdf_context
            elif "pdf" not in custom_prompt.lower() and "multi-page" not in custom_prompt.lower():
                modified_prompt = f"{custom_prompt} {pdf_context}"
            else:
                modified_prompt = custom_prompt

            try:
                first_page_result = self.processor.process_file(
                    file_path=images[0],
                    file_type="image",
                    use_vision=use_vision,
                    custom_prompt=modified_prompt
                )

                all_pages_text = []
                all_languages = set()
                self._collect_page_output(first_page_result, all_pages_text, all_languages)

                # NOTE: i is the position within the processed images, which
                # differs from the PDF page number when specific pages were
                # requested.
                for i, img_path in enumerate(images[1:], 1):
                    try:
                        page_result = self.processor.process_file(
                            file_path=img_path,
                            file_type="image",
                            use_vision=False,  # Use simpler processing for additional pages
                            custom_prompt=f"This is page {i+1} of a {conversion_result.page_count}-page document."
                        )
                        self._collect_page_output(page_result, all_pages_text, all_languages)
                    except Exception as e:
                        # A failed extra page is skipped rather than failing the document
                        logger.warning(f"Error processing page {i+1}: {e}")

                # Combine all text into a single document
                if 'ocr_contents' in first_page_result:
                    first_page_result['ocr_contents']['raw_text'] = "\n\n".join(all_pages_text)

                if all_languages:
                    # Sorted so the output does not depend on set ordering
                    first_page_result['languages'] = sorted(all_languages)

                # Add PDF metadata
                first_page_result['file_name'] = pdf_path.name
                first_page_result['file_type'] = "pdf"
                first_page_result['total_pages'] = conversion_result.page_count
                first_page_result['processed_pages'] = len(images)

                # Add conversion info
                first_page_result['pdf_conversion'] = {
                    "method": "pdf2image",
                    "pages_converted": len(images),
                    "pages_requested": len(page_numbers) if page_numbers else (max_pages or conversion_result.page_count)
                }
                return first_page_result
            except Exception as e:
                logger.error(f"Error processing converted images: {e}")
                # Fall back to direct processing via StructuredOCR
            finally:
                # Clean up temporary files and drop removed paths from the
                # instance-level list so it does not grow across calls
                conversion_result.cleanup()
                self.temp_files = [p for p in self.temp_files if os.path.exists(p)]

        # If conversion failed or processing the images failed, fall back to direct processing
        logger.info(f"Using direct StructuredOCR processing for PDF")
        return self.processor.process_file(
            file_path=pdf_path,
            file_type="pdf",
            use_vision=use_vision,
            max_pages=max_pages,
            custom_pages=custom_pages,
            custom_prompt=custom_prompt
        )

    def save_json_output(self, pdf_path, output_path, use_vision=True, max_pages=None, custom_pages=None, custom_prompt=None):
        """
        Process a PDF file and save the structured output as JSON.

        Parent directories of output_path are created if missing.

        Args:
            pdf_path: Path to the PDF file
            output_path: Path where to save the JSON output
            use_vision: Whether to use vision model for improved analysis
            max_pages: Maximum number of pages to process
            custom_pages: Specific page numbers to process (1-based indexing)
            custom_prompt: Custom instructions for processing

        Returns:
            Path to the saved JSON file
        """
        result = self.process_pdf(
            pdf_path,
            use_vision=use_vision,
            max_pages=max_pages,
            custom_pages=custom_pages,
            custom_prompt=custom_prompt
        )

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the file does not depend on the platform default
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2)
        return output_path
# For testing directly: command-line entry point that prints the OCR result
# as JSON, or writes it to a file when --output is given.
if __name__ == "__main__":
    import sys
    import argparse

    parser = argparse.ArgumentParser(description="Process PDF files with OCR.")
    parser.add_argument("pdf_path", help="Path to the PDF file to process")
    parser.add_argument("--output", "-o", help="Path to save the output JSON")
    parser.add_argument("--no-vision", dest="use_vision", action="store_false",
                        help="Disable vision model for processing")
    parser.add_argument("--max-pages", type=int, help="Maximum number of pages to process")
    parser.add_argument("--pages", help="Specific pages to process (comma-separated)")
    parser.add_argument("--prompt", help="Custom prompt for processing")
    args = parser.parse_args()

    processor = PDFOCR()

    # Parse custom pages if provided
    custom_pages = None
    if args.pages:
        try:
            custom_pages = [int(p.strip()) for p in args.pages.split(',')]
        except ValueError:
            # Only a malformed number is expected here; anything else
            # (e.g. KeyboardInterrupt) should propagate.
            print(f"Error parsing pages: {args.pages}. Should be comma-separated list of numbers.")
            sys.exit(1)

    # Options shared by both output modes
    options = dict(
        use_vision=args.use_vision,
        max_pages=args.max_pages,
        custom_pages=custom_pages,
        custom_prompt=args.prompt,
    )

    if args.output:
        result_path = processor.save_json_output(args.pdf_path, args.output, **options)
        print(f"Results saved to: {result_path}")
    else:
        result = processor.process_pdf(args.pdf_path, **options)
        print(json.dumps(result, indent=2))