"""
PDF Ingestion & Preprocessing Module

Handles extraction of text, tables, code blocks, and images from PDFs.
"""

import io
import os
import re
import uuid
from typing import Any, Dict, List, Tuple

import fitz  # PyMuPDF
import pdfplumber
import pytesseract
from loguru import logger
from PIL import Image

from config import settings
from models import Chunk, ChunkType, PDFMetadata

class PDFProcessor:
    """
    Comprehensive PDF processor that extracts:
    - Page-level text with character ranges
    - Tables (structured)
    - Code blocks (detected heuristically)
    - Images (with OCR)
    """

    def __init__(self):
        # Heuristic signatures of code regions. Only the fenced-block
        # pattern captures a full block; the others anchor the start of a
        # likely definition.
        self.code_patterns = [
            re.compile(r'```[\s\S]*?```'),       # Markdown-style fenced blocks
            re.compile(r'def\s+\w+\s*\('),       # Python function definitions
            re.compile(r'class\s+\w+\s*[:\(]'),  # Python class definitions
            re.compile(r'function\s+\w+\s*\('),  # JavaScript functions
            re.compile(r'public\s+class\s+\w+'), # Java class declarations
        ]

    def process_pdf(self, filepath: str, pdf_id: str) -> Tuple[List[Chunk], PDFMetadata]:
        """
        Main entry point: process an entire PDF and return its chunks and metadata.

        Args:
            filepath: Path to the PDF file
            pdf_id: Unique identifier for this PDF

        Returns:
            Tuple of (chunks list, metadata object)
        """
        logger.info(f"Processing PDF: {filepath}")

        chunks: List[Chunk] = []

        # PyMuPDF handles text and images; pdfplumber handles tables.
        pdf_doc = fitz.open(filepath)
        num_pages = len(pdf_doc)

        with pdfplumber.open(filepath) as plumber_pdf:
            for page_num in range(num_pages):
                logger.debug(f"Processing page {page_num + 1}/{num_pages}")

                fitz_page = pdf_doc[page_num]
                page_chunks = self._process_page(
                    fitz_page=fitz_page,
                    plumber_page=plumber_pdf.pages[page_num],
                    page_num=page_num + 1,
                    pdf_id=pdf_id
                )
                chunks.extend(page_chunks)

        pdf_doc.close()

        # Assemble document-level metadata for the completed run.
        file_size = os.path.getsize(filepath)
        metadata = PDFMetadata(
            pdf_id=pdf_id,
            filename=os.path.basename(filepath),
            filepath=filepath,
            num_pages=num_pages,
            file_size_bytes=file_size,
            num_chunks=len(chunks),
            processing_status="completed"
        )

        logger.info(f"Extracted {len(chunks)} chunks from {num_pages} pages")
        return chunks, metadata
    def _process_page(
        self,
        fitz_page,
        plumber_page,
        page_num: int,
        pdf_id: str
    ) -> List[Chunk]:
        """Process a single page and return all of its chunks."""
        chunks: List[Chunk] = []

        # Raw page text; also used for code-block detection below.
        page_text = fitz_page.get_text("text")

        # Structured tables via pdfplumber.
        table_chunks = self._extract_tables(plumber_page, page_num, pdf_id)
        chunks.extend(table_chunks)

        # Heuristically detected code blocks.
        code_chunks = self._extract_code_blocks(page_text, page_num, pdf_id)
        chunks.extend(code_chunks)

        # Embedded images, OCR'd for any recognizable text.
        image_chunks = self._extract_images(fitz_page, page_num, pdf_id)
        chunks.extend(image_chunks)

        # Strip already-extracted code from the page text so the paragraph
        # chunk does not duplicate it.
        cleaned_text = self._remove_extracted_regions(
            page_text,
            [c.text for c in code_chunks]
        )

        if cleaned_text.strip():
            para_chunk = Chunk(
                chunk_id=str(uuid.uuid4()),
                pdf_id=pdf_id,
                page_number=page_num,
                char_range=(0, len(cleaned_text)),
                type=ChunkType.PARAGRAPH,
                text=cleaned_text,
                metadata={"source": "text_extraction"}
            )
            chunks.append(para_chunk)

        return chunks
    def _extract_tables(self, plumber_page, page_num: int, pdf_id: str) -> List[Chunk]:
        """Extract tables from a page using pdfplumber."""
        chunks = []
        tables = plumber_page.extract_tables()

        for idx, table in enumerate(tables):
            if not table:
                continue

            # Keep both representations: the JSON preserves structure for
            # downstream consumers, the plain text serves retrieval.
            table_json = self._table_to_json(table)
            table_text = self._table_to_text(table)

            chunk = Chunk(
                chunk_id=str(uuid.uuid4()),
                pdf_id=pdf_id,
                page_number=page_num,
                char_range=(0, len(table_text)),
                type=ChunkType.TABLE,
                text=table_text,
                table_json=table_json,
                metadata={"table_index": idx, "num_rows": len(table)}
            )
            chunks.append(chunk)

        logger.debug(f"Extracted {len(chunks)} tables from page {page_num}")
        return chunks
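
    # Note: pdfplumber's extract_tables() returns each table as a list of
    # rows, where each cell is a string or None (e.g. for empty cells), so
    # the helpers below must tolerate None values.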
    def _table_to_json(self, table: List[List[str]]) -> Dict[str, Any]:
        """Convert a table to structured JSON, treating row 0 as the header."""
        if not table or len(table) < 2:
            # A header row alone (or an empty table) carries no data rows.
            return {"headers": [], "rows": []}

        headers = table[0]
        rows = table[1:]

        return {
            "headers": headers,
            "rows": [
                # Cells beyond the header width are dropped.
                {headers[i]: cell for i, cell in enumerate(row) if i < len(headers)}
                for row in rows
            ]
        }
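
    # Illustrative example (not executed):
    #   [["name", "qty"], ["bolt", "4"]]
    # maps to
    #   {"headers": ["name", "qty"], "rows": [{"name": "bolt", "qty": "4"}]}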
    def _table_to_text(self, table: List[List[str]]) -> str:
        """Convert a table to readable pipe-delimited text."""
        return "\n".join(
            " | ".join("" if cell is None else str(cell) for cell in row)
            for row in table
        )

    def _extract_code_blocks(self, text: str, page_num: int, pdf_id: str) -> List[Chunk]:
        """Extract code blocks using heuristic patterns."""
        chunks = []

        for pattern in self.code_patterns:
            matches = pattern.finditer(text)
            for match in matches:
                code_text = match.group(0)
                # Skip trivially short matches (stray signatures, false hits).
                if len(code_text) < 20:
                    continue

                chunk = Chunk(
                    chunk_id=str(uuid.uuid4()),
                    pdf_id=pdf_id,
                    page_number=page_num,
                    char_range=(match.start(), match.end()),
                    type=ChunkType.CODE,
                    text=code_text,
                    metadata={
                        "pattern": pattern.pattern,
                        "detected_language": self._detect_language(code_text)
                    }
                )
                chunks.append(chunk)

        logger.debug(f"Extracted {len(chunks)} code blocks from page {page_num}")
        return chunks
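
    # Illustrative example (not executed): given page text containing
    #   ```python
    #   def greet(name):
    #       return f"hello {name}"
    #   ```
    # the fenced-block pattern captures the whole block, while a bare
    # `def greet(` match falls under the 20-character minimum and is skipped.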
    def _detect_language(self, code: str) -> str:
        """Heuristically detect the programming language."""
        if 'def ' in code and ':' in code:
            return 'python'
        elif 'function' in code or 'const' in code or 'let' in code:
            return 'javascript'
        elif 'public class' in code or 'private' in code:
            return 'java'
        elif '#include' in code:
            return 'c++'
        else:
            return 'unknown'
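
    # Illustrative examples (not executed):
    #   _detect_language('def foo(): pass')      -> 'python'
    #   _detect_language('public class Foo {}')  -> 'java'
    #   _detect_language('SELECT * FROM users')  -> 'unknown'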
    def _extract_images(self, fitz_page, page_num: int, pdf_id: str) -> List[Chunk]:
        """Extract embedded images and run OCR on them."""
        chunks = []
        image_list = fitz_page.get_images()

        for img_index, img in enumerate(image_list):
            try:
                # Resolve the image's xref and pull the raw bytes from the
                # parent document.
                xref = img[0]
                base_image = fitz_page.parent.extract_image(xref)
                image_bytes = base_image["image"]

                # Decode the bytes with Pillow, then OCR with Tesseract.
                image = Image.open(io.BytesIO(image_bytes))
                ocr_text = pytesseract.image_to_string(image)

                # Only keep images whose OCR produced some text.
                if ocr_text.strip():
                    image_id = f"{pdf_id}_p{page_num}_img{img_index}"

                    chunk = Chunk(
                        chunk_id=str(uuid.uuid4()),
                        pdf_id=pdf_id,
                        page_number=page_num,
                        char_range=(0, len(ocr_text)),
                        type=ChunkType.IMAGE_TEXT,
                        text=ocr_text,
                        image_id=image_id,
                        metadata={
                            "image_format": base_image["ext"],
                            "image_index": img_index
                        }
                    )
                    chunks.append(chunk)
            except Exception as e:
                logger.warning(f"Failed to extract image {img_index} on page {page_num}: {e}")

        logger.debug(f"Extracted {len(chunks)} images from page {page_num}")
        return chunks

    def _remove_extracted_regions(self, text: str, code_blocks: List[str]) -> str:
        """Remove already-extracted code blocks from the page text.

        Uses exact string replacement, so a block is only removed when it
        appears in the text verbatim.
        """
        for code in code_blocks:
            text = text.replace(code, "")
        return text
    def chunk_text(self, chunks: List[Chunk]) -> List[Chunk]:
        """
        Further split large text blocks into smaller overlapping chunks.

        Args:
            chunks: Initial chunks from PDF extraction

        Returns:
            Refined chunks with proper overlap
        """
        refined_chunks = []

        for chunk in chunks:
            # Keep tables and code intact; splitting them would destroy
            # their structure.
            if chunk.type in [ChunkType.TABLE, ChunkType.CODE]:
                refined_chunks.append(chunk)
                continue

            text = chunk.text
            chunk_size = settings.chunk_size
            overlap = settings.chunk_overlap
            # Guard against a misconfigured overlap >= chunk_size, which
            # would otherwise make the window step zero or negative.
            step = max(1, chunk_size - overlap)

            if len(text) <= chunk_size:
                refined_chunks.append(chunk)
                continue

            # Slide a window of chunk_size characters, advancing by `step`
            # so consecutive windows overlap.
            for i in range(0, len(text), step):
                window_text = text[i:i + chunk_size]

                # Drop windows below the minimum useful size (typically the
                # tail of the text).
                if len(window_text) < settings.min_chunk_size:
                    continue

                new_chunk = Chunk(
                    chunk_id=str(uuid.uuid4()),
                    pdf_id=chunk.pdf_id,
                    page_number=chunk.page_number,
                    char_range=(i, i + len(window_text)),
                    type=chunk.type,
                    text=window_text,
                    metadata={
                        **chunk.metadata,
                        "parent_chunk_id": chunk.chunk_id,
                        "window_index": i // step
                    }
                )
                refined_chunks.append(new_chunk)

        logger.info(f"Refined {len(chunks)} chunks into {len(refined_chunks)} chunks")
        return refined_chunks
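
# Minimal usage sketch (assumes a local "sample.pdf"; the `models` and
# `config` modules imported above must be on the path). Not part of the
# pipeline itself.
if __name__ == "__main__":
    processor = PDFProcessor()
    raw_chunks, meta = processor.process_pdf("sample.pdf", pdf_id=str(uuid.uuid4()))
    final_chunks = processor.chunk_text(raw_chunks)
    logger.info(f"{meta.filename}: {len(final_chunks)} final chunks")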