Spaces:
Sleeping
Sleeping
"""
PDF Processing Module - Layer 1: PDF Understanding.

Handles multimodal extraction: text, images, tables.
"""
| import PyPDF2 | |
| import fitz # PyMuPDF | |
| from pdf2image import convert_from_path | |
| from PIL import Image | |
| import pytesseract | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| import io | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
class PDFProcessor:
    """
    Comprehensive PDF processing for medical documents.

    Implements hybrid extraction: native text first, with an OCR fallback
    for pages whose native text is empty.
    """

    # Section headers recognised by _extract_sections, ordered longest first
    # so that a specific header such as "PAST MEDICAL HISTORY" is matched
    # before a shorter header it contains ("HISTORY"). sorted() is stable,
    # so headers of equal length keep their listed order.
    _SECTION_HEADERS = tuple(sorted(
        (
            "HISTORY", "PHYSICAL EXAMINATION", "ASSESSMENT", "PLAN",
            "CHIEF COMPLAINT", "DIAGNOSIS", "FINDINGS", "IMPRESSION",
            "RECOMMENDATIONS", "LAB RESULTS", "MEDICATIONS", "ALLERGIES",
            "VITAL SIGNS", "PAST MEDICAL HISTORY", "FAMILY HISTORY",
            "SOCIAL HISTORY", "REVIEW OF SYSTEMS",
        ),
        key=len,
        reverse=True,
    ))

    def __init__(self):
        """Record the supported file extensions and log initialization."""
        self.supported_formats = ['.pdf']
        logger.info("PDF Processor initialized")

    async def extract_content(self, file_path: str) -> Dict[str, Any]:
        """
        Extract multimodal content from a PDF.

        Args:
            file_path: Path of the PDF to open.

        Returns:
            Dict with:
                - text: text of all pages, joined with blank lines
                - images: list of image descriptors (see
                  _extract_images_from_page)
                - tables: detected tabular content (see _detect_tables)
                - metadata: document metadata
                - page_count: number of pages
                - extraction_method: "hybrid", or "hybrid_with_ocr" when at
                  least one page fell back to OCR
                - sections: report sections keyed by header (see
                  _extract_sections)

        Raises:
            Exception: any error raised while opening or reading the
                document is logged and re-raised unchanged.
        """
        try:
            logger.info(f"Starting PDF extraction: {file_path}")

            # Initialize result structure
            result = {
                "text": "",
                "images": [],
                "tables": [],
                "metadata": {},
                "page_count": 0,
                "extraction_method": "hybrid"
            }
            all_text = []
            all_images = []

            # Open PDF with PyMuPDF for robust extraction
            doc = fitz.open(file_path)
            try:
                result["page_count"] = len(doc)
                result["metadata"] = self._extract_metadata(doc)

                # Process each page
                for page_num, page in enumerate(doc):
                    page_text = page.get_text()

                    # If native text extraction yields nothing, use OCR
                    if not page_text.strip():
                        logger.info(f"Page {page_num + 1}: Using OCR (no native text)")
                        page_text = await self._ocr_page(file_path, page_num)
                        result["extraction_method"] = "hybrid_with_ocr"
                    all_text.append(page_text)

                    # Extract images from page
                    all_images.extend(
                        self._extract_images_from_page(page, page_num)
                    )

                    # Detect tables (simplified detection)
                    result["tables"].extend(self._detect_tables(page_text))
            finally:
                # Close the document even when a page fails mid-extraction,
                # so the underlying file is not leaked on the error path.
                doc.close()

            result["text"] = "\n\n".join(all_text)
            result["images"] = all_images

            # Extract structured sections
            result["sections"] = self._extract_sections(result["text"])

            logger.info(f"PDF extraction complete: {result['page_count']} pages, "
                        f"{len(result['images'])} images, {len(result['tables'])} tables")
            return result
        except Exception as e:
            logger.error(f"PDF extraction failed: {str(e)}")
            raise

    def _extract_metadata(self, doc: "fitz.Document") -> Dict[str, Any]:
        """
        Extract PDF metadata.

        Args:
            doc: An open document.

        Returns:
            Dict with title, author, subject, creator, producer,
            creation_date and modification_date (missing keys become "").
            An empty dict is returned, and a warning logged, if reading the
            metadata raises.
        """
        metadata = {}
        try:
            pdf_metadata = doc.metadata
            metadata = {
                "title": pdf_metadata.get("title", ""),
                "author": pdf_metadata.get("author", ""),
                "subject": pdf_metadata.get("subject", ""),
                "creator": pdf_metadata.get("creator", ""),
                "producer": pdf_metadata.get("producer", ""),
                "creation_date": pdf_metadata.get("creationDate", ""),
                "modification_date": pdf_metadata.get("modDate", "")
            }
        except Exception as e:
            logger.warning(f"Metadata extraction failed: {str(e)}")
        return metadata

    async def _ocr_page(self, file_path: str, page_num: int) -> str:
        """
        Perform OCR on a single page.

        Args:
            file_path: Path of the PDF.
            page_num: Zero-based page index (converted to the one-based
                page number passed to convert_from_path).

        Returns:
            The recognised text, or "" when no image was produced or OCR
            raised (a warning is logged in the latter case).
        """
        # NOTE(review): the rendering and OCR calls below are not awaited,
        # so they run synchronously inside this coroutine; consider
        # offloading them to an executor if event-loop latency matters.
        try:
            # Convert PDF page to image
            images = convert_from_path(
                file_path,
                first_page=page_num + 1,
                last_page=page_num + 1,
                dpi=300
            )
            if not images:
                return ""
            # Perform OCR
            return pytesseract.image_to_string(images[0])
        except Exception as e:
            logger.warning(f"OCR failed for page {page_num + 1}: {str(e)}")
            return ""

    def _extract_images_from_page(self, page: "fitz.Page", page_num: int) -> List[Dict[str, Any]]:
        """
        Describe the images referenced by a PDF page.

        Args:
            page: The page to inspect.
            page_num: Zero-based page index; reported one-based in the output.

        Returns:
            One dict per image with keys page, index, xref, width and
            height. Image pixel data is not extracted. Entries collected
            before a failure are still returned (a warning is logged).
        """
        images = []
        try:
            for img_index, img_info in enumerate(page.get_images(full=True)):
                images.append({
                    "page": page_num + 1,
                    "index": img_index,
                    "xref": img_info[0],
                    "width": img_info[2],
                    "height": img_info[3]
                })
        except Exception as e:
            logger.warning(f"Image extraction failed for page {page_num + 1}: {str(e)}")
        return images

    def _detect_tables(self, text: str) -> List[Dict[str, Any]]:
        """
        Detect tabular content in text.

        Simplified heuristic: a line counts as a table row when it contains
        a tab, a pipe, or more than three spaces. A run of at least two
        consecutive rows (header + one row) is reported as a table.

        Args:
            text: Text of a single page.

        Returns:
            List of dicts with keys rows (the raw lines) and row_count.
        """
        # NOTE(review): counting single spaces also matches most prose
        # lines of five or more words; the threshold may have been meant
        # for runs of several spaces - confirm against the original source.
        tables = []
        rows = []
        # The trailing empty sentinel line is never a table row, so a table
        # that runs to the end of the text is flushed as well.
        for line in text.split('\n') + ['']:
            if '\t' in line or '|' in line or line.count(' ') > 3:
                rows.append(line)
                continue
            # End of a candidate table
            if len(rows) >= 2:  # At least header + 1 row
                tables.append({
                    "rows": rows,
                    "row_count": len(rows)
                })
            rows = []
        return tables

    @staticmethod
    def _store_section(sections: Dict[str, str], name: str, lines: List[str]) -> None:
        """Add lines to sections[name], appending when the section already exists."""
        if not lines:
            return
        body = '\n'.join(lines)
        if name in sections:
            # A header seen again must not discard what was collected earlier.
            sections[name] = sections[name] + '\n' + body
        else:
            sections[name] = body

    def _extract_sections(self, text: str) -> Dict[str, str]:
        """
        Extract common medical report sections.

        A line is treated as a section header when its stripped form is
        shorter than 50 characters and contains one of the known headers
        (case-insensitive); the longest matching header wins. Header lines
        themselves are not included in any section. Lines before the first
        header go to "GENERAL". Content under a repeated header is appended
        to the existing section.

        Args:
            text: Full document text.

        Returns:
            Dict mapping section name to its newline-joined content.
        """
        sections = {}
        current_section = "GENERAL"
        current_content = []

        for line in text.split('\n'):
            stripped = line.strip()
            matched = None
            if len(stripped) < 50:
                upper = stripped.upper()
                matched = next(
                    (header for header in self._SECTION_HEADERS if header in upper),
                    None,
                )

            if matched is None:
                current_content.append(line)
                continue

            # Header found: save previous section and start a new one
            self._store_section(sections, current_section, current_content)
            current_section = matched
            current_content = []

        # Save last section
        self._store_section(sections, current_section, current_content)
        return sections