graphRAG / pdf_processor.py
nvtitan's picture
Upload 24 files
e884643 verified
"""
PDF Ingestion & Preprocessing Module
Handles extraction of text, tables, code blocks, and images from PDFs
"""
import fitz # PyMuPDF
import pdfplumber
import pytesseract
from PIL import Image
import io
import re
from typing import List, Dict, Any, Optional, Tuple
from loguru import logger
from models import Chunk, ChunkType, PDFMetadata
from config import settings
import uuid
class PDFProcessor:
"""
Comprehensive PDF processor that extracts:
- Page-level text with character ranges
- Tables (structured)
- Code blocks (detected heuristically)
- Images (with OCR)
"""
def __init__(self):
self.code_patterns = [
re.compile(r'```[\s\S]*?```'), # Markdown code blocks
re.compile(r'def\s+\w+\s*\('), # Python functions
re.compile(r'class\s+\w+\s*[:\(]'), # Python/Java classes
re.compile(r'function\s+\w+\s*\('), # JavaScript functions
re.compile(r'public\s+class\s+\w+'), # Java classes
]
def process_pdf(self, filepath: str, pdf_id: str) -> Tuple[List[Chunk], PDFMetadata]:
"""
Main entry point: process entire PDF and return chunks + metadata
Args:
filepath: Path to PDF file
pdf_id: Unique identifier for this PDF
Returns:
Tuple of (chunks list, metadata object)
"""
logger.info(f"Processing PDF: {filepath}")
chunks: List[Chunk] = []
# Open with PyMuPDF for text and images
pdf_doc = fitz.open(filepath)
num_pages = len(pdf_doc)
# Open with pdfplumber for tables
with pdfplumber.open(filepath) as plumber_pdf:
for page_num in range(num_pages):
logger.debug(f"Processing page {page_num + 1}/{num_pages}")
# Extract from PyMuPDF
fitz_page = pdf_doc[page_num]
page_chunks = self._process_page(
fitz_page=fitz_page,
plumber_page=plumber_pdf.pages[page_num],
page_num=page_num + 1, # 1-indexed
pdf_id=pdf_id
)
chunks.extend(page_chunks)
pdf_doc.close()
# Create metadata
import os
file_size = os.path.getsize(filepath)
metadata = PDFMetadata(
pdf_id=pdf_id,
filename=os.path.basename(filepath),
filepath=filepath,
num_pages=num_pages,
file_size_bytes=file_size,
num_chunks=len(chunks),
processing_status="completed"
)
logger.info(f"Extracted {len(chunks)} chunks from {num_pages} pages")
return chunks, metadata
def _process_page(
self,
fitz_page,
plumber_page,
page_num: int,
pdf_id: str
) -> List[Chunk]:
"""Process a single page and return all chunks"""
chunks: List[Chunk] = []
# 1. Extract raw text with character positions
page_text = fitz_page.get_text("text")
# 2. Extract tables
table_chunks = self._extract_tables(plumber_page, page_num, pdf_id)
chunks.extend(table_chunks)
# 3. Extract code blocks
code_chunks = self._extract_code_blocks(page_text, page_num, pdf_id)
chunks.extend(code_chunks)
# 4. Extract images and run OCR
image_chunks = self._extract_images(fitz_page, page_num, pdf_id)
chunks.extend(image_chunks)
# 5. Extract remaining text as paragraphs
# Remove table and code regions from text before creating paragraph chunks
cleaned_text = self._remove_extracted_regions(
page_text,
[c.text for c in code_chunks]
)
if cleaned_text.strip():
para_chunk = Chunk(
chunk_id=str(uuid.uuid4()),
pdf_id=pdf_id,
page_number=page_num,
char_range=(0, len(cleaned_text)),
type=ChunkType.PARAGRAPH,
text=cleaned_text,
metadata={"source": "text_extraction"}
)
chunks.append(para_chunk)
return chunks
def _extract_tables(self, plumber_page, page_num: int, pdf_id: str) -> List[Chunk]:
"""Extract tables from page using pdfplumber"""
chunks = []
tables = plumber_page.extract_tables()
for idx, table in enumerate(tables):
if not table:
continue
# Convert table to structured JSON
table_json = self._table_to_json(table)
# Convert table to text representation
table_text = self._table_to_text(table)
chunk = Chunk(
chunk_id=str(uuid.uuid4()),
pdf_id=pdf_id,
page_number=page_num,
char_range=(0, len(table_text)),
type=ChunkType.TABLE,
text=table_text,
table_json=table_json,
metadata={"table_index": idx, "num_rows": len(table)}
)
chunks.append(chunk)
logger.debug(f"Extracted {len(chunks)} tables from page {page_num}")
return chunks
def _table_to_json(self, table: List[List[str]]) -> Dict[str, Any]:
"""Convert table to structured JSON"""
if not table or len(table) < 2:
return {"headers": [], "rows": []}
headers = table[0]
rows = table[1:]
return {
"headers": headers,
"rows": [
{headers[i]: cell for i, cell in enumerate(row) if i < len(headers)}
for row in rows
]
}
def _table_to_text(self, table: List[List[str]]) -> str:
"""Convert table to readable text"""
return "\n".join([" | ".join([str(cell) for cell in row]) for row in table])
def _extract_code_blocks(self, text: str, page_num: int, pdf_id: str) -> List[Chunk]:
"""Extract code blocks using heuristic patterns"""
chunks = []
# Look for code patterns
for pattern in self.code_patterns:
matches = pattern.finditer(text)
for match in matches:
code_text = match.group(0)
if len(code_text) < 20: # Skip very short matches
continue
chunk = Chunk(
chunk_id=str(uuid.uuid4()),
pdf_id=pdf_id,
page_number=page_num,
char_range=(match.start(), match.end()),
type=ChunkType.CODE,
text=code_text,
metadata={
"pattern": pattern.pattern,
"detected_language": self._detect_language(code_text)
}
)
chunks.append(chunk)
# Also detect monospace font regions (if PDF has font info)
# This is more advanced and would require font analysis
logger.debug(f"Extracted {len(chunks)} code blocks from page {page_num}")
return chunks
def _detect_language(self, code: str) -> str:
"""Heuristically detect programming language"""
if 'def ' in code and ':' in code:
return 'python'
elif 'function' in code or 'const' in code or 'let' in code:
return 'javascript'
elif 'public class' in code or 'private' in code:
return 'java'
elif '#include' in code:
return 'c++'
else:
return 'unknown'
def _extract_images(self, fitz_page, page_num: int, pdf_id: str) -> List[Chunk]:
"""Extract images and run OCR"""
chunks = []
image_list = fitz_page.get_images()
for img_index, img in enumerate(image_list):
try:
xref = img[0]
base_image = fitz_page.parent.extract_image(xref)
image_bytes = base_image["image"]
# Convert to PIL Image
image = Image.open(io.BytesIO(image_bytes))
# Run OCR
ocr_text = pytesseract.image_to_string(image)
if ocr_text.strip():
image_id = f"{pdf_id}_p{page_num}_img{img_index}"
chunk = Chunk(
chunk_id=str(uuid.uuid4()),
pdf_id=pdf_id,
page_number=page_num,
char_range=(0, len(ocr_text)),
type=ChunkType.IMAGE_TEXT,
text=ocr_text,
image_id=image_id,
metadata={
"image_format": base_image["ext"],
"image_index": img_index
}
)
chunks.append(chunk)
except Exception as e:
logger.warning(f"Failed to extract image {img_index} on page {page_num}: {e}")
logger.debug(f"Extracted {len(chunks)} images from page {page_num}")
return chunks
def _remove_extracted_regions(self, text: str, code_blocks: List[str]) -> str:
"""Remove already-extracted code blocks from text"""
for code in code_blocks:
text = text.replace(code, "")
return text
def chunk_text(self, chunks: List[Chunk]) -> List[Chunk]:
"""
Further chunk large text blocks into smaller overlapping chunks
Args:
chunks: Initial chunks from PDF extraction
Returns:
Refined chunks with proper overlap
"""
refined_chunks = []
for chunk in chunks:
# Skip non-text chunks (tables, images already chunked)
if chunk.type in [ChunkType.TABLE, ChunkType.CODE]:
refined_chunks.append(chunk)
continue
# Split long paragraphs into smaller chunks with overlap
text = chunk.text
chunk_size = settings.chunk_size
overlap = settings.chunk_overlap
if len(text) <= chunk_size:
refined_chunks.append(chunk)
continue
# Create overlapping windows
for i in range(0, len(text), chunk_size - overlap):
chunk_text = text[i:i + chunk_size]
if len(chunk_text) < settings.min_chunk_size:
continue
new_chunk = Chunk(
chunk_id=str(uuid.uuid4()),
pdf_id=chunk.pdf_id,
page_number=chunk.page_number,
char_range=(i, i + len(chunk_text)),
type=chunk.type,
text=chunk_text,
metadata={
**chunk.metadata,
"parent_chunk_id": chunk.chunk_id,
"window_index": i // (chunk_size - overlap)
}
)
refined_chunks.append(new_chunk)
logger.info(f"Refined {len(chunks)} chunks into {len(refined_chunks)} chunks")
return refined_chunks