# notebook-backend/utils/document_processor.py
import PyPDF2
import pdfplumber
from docx import Document
from pathlib import Path
from typing import Any, Dict, List
import re
import warnings
import logging
# Suppress PyPDF2 warnings about font descriptors
warnings.filterwarnings('ignore', category=UserWarning, module='PyPDF2')
logging.getLogger('PyPDF2').setLevel(logging.ERROR)


class DocumentProcessor:
    """Process various document types and extract text content."""

    def __init__(self):
        self.supported_formats = ['.pdf', '.txt', '.docx']

    def process_file(self, file_path: Path) -> Dict[str, Any]:
        """
        Process a single file and extract its content.

        Args:
            file_path: Path to the file

        Returns:
            Dictionary containing file metadata and content
        """
        suffix = file_path.suffix.lower()

        if suffix == '.pdf':
            content = self._extract_pdf(file_path)
        elif suffix == '.txt':
            content = self._extract_txt(file_path)
        elif suffix == '.docx':
            content = self._extract_docx(file_path)
        else:
            raise ValueError(f"Unsupported file format: {suffix}")

        return {
            'filename': file_path.name,
            'path': str(file_path),
            'content': content,
            'format': suffix
        }

    def _extract_pdf(self, file_path: Path) -> str:
        """Extract text from PDF using pdfplumber with PyPDF2 fallback."""
        text = ""
        try:
            # Primary: use pdfplumber (better for complex PDFs)
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
        except Exception:
            # Fallback: use PyPDF2 with warnings suppressed
            try:
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    with open(file_path, 'rb') as file:
                        pdf_reader = PyPDF2.PdfReader(file)
                        for page in pdf_reader.pages:
                            try:
                                page_text = page.extract_text()
                                if page_text:
                                    text += page_text + "\n"
                            except Exception:
                                continue  # Skip problematic pages
            except Exception as e2:
                raise ValueError(
                    f"Could not extract text from PDF: {file_path.name}"
                ) from e2
        return self._clean_text(text)

    def _extract_txt(self, file_path: Path) -> str:
        """Extract text from TXT file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
        except UnicodeDecodeError:
            with open(file_path, 'r', encoding='latin-1') as file:
                text = file.read()
        return self._clean_text(text)

    def _extract_docx(self, file_path: Path) -> str:
        """Extract text from DOCX file."""
        doc = Document(file_path)
        text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
        return self._clean_text(text)

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text while preserving paragraph breaks."""
        # Collapse runs of spaces/tabs, but keep newlines: the semantic
        # chunker splits on '\n\n', so collapsing all whitespace to single
        # spaces would erase every paragraph boundary it depends on
        text = re.sub(r'[ \t]+', ' ', text)
        # Trim stray spaces around newlines, then cap blank lines at one
        text = re.sub(r' ?\n ?', '\n', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s.,!?;:()\-\'\"]+', '', text)
        return text.strip()
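
    # For example, PDF-extracted text with ragged spacing and extra blank
    # lines normalizes to clean paragraphs:
    #   _clean_text("Intro\n\n\n1.  Scope\t of work") -> "Intro\n\n1. Scope of work"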

    def chunk_text(self, text: str, chunk_size: int = 512, overlap: int = 50, semantic: bool = True) -> List[str]:
        """
        Split text into chunks using semantic or simple chunking.

        Args:
            text: The text to chunk
            chunk_size: Target size of each chunk in characters
            overlap: Number of overlapping characters between chunks
            semantic: Use semantic chunking (by headers/concepts) if True

        Returns:
            List of text chunks
        """
        if semantic:
            return self._semantic_chunk(text, chunk_size, overlap)
        else:
            return self._simple_chunk(text, chunk_size, overlap)
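
    # For example, chunk_text(text, semantic=False) always uses the fixed-size
    # splitter, while the default semantic=True first tries header-based
    # sections and falls back to fixed-size splitting when too few are found.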

    def _semantic_chunk(self, text: str, target_size: int = 512, overlap: int = 50) -> List[str]:
        """
        Chunk text by detecting headers and logical sections.
        Works well for lecture slides and structured documents.
        """
        chunks = []

        # Split on double newlines (paragraph boundaries); each section is
        # then tested against the header patterns in _is_likely_header
        sections = text.split('\n\n')

        current_chunk = ""

        for section in sections:
            section = section.strip()
            if not section:
                continue

            # A header starts a new chunk once the current one has substance
            if self._is_likely_header(section) and len(current_chunk) > 100:
                chunks.append(current_chunk.strip())
                current_chunk = section + "\n\n"
            else:
                # Otherwise append to the current chunk
                potential_chunk = current_chunk + section + "\n\n"

                # If the chunk is getting too large, split it
                if len(potential_chunk) > target_size * 1.5:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                    current_chunk = section + "\n\n"
                else:
                    current_chunk = potential_chunk

        # Add the final chunk
        if current_chunk:
            chunks.append(current_chunk.strip())

        # Fall back to simple chunking if this produced far fewer chunks than
        # the text length suggests (fewer than one per 2 * target_size chars)
        if len(chunks) < len(text) / (target_size * 2):
            return self._simple_chunk(text, target_size, overlap)

        return chunks

    def _is_likely_header(self, text: str) -> bool:
        """Detect whether a section of text is likely a header/title."""
        # Too long to be a header
        if len(text) > 200:
            return False

        # Only single-line sections qualify
        if '\n' not in text:
            # ALL CAPS
            if text.isupper() and len(text.split()) <= 10:
                return True
            # Title Case
            if text.istitle() and len(text.split()) <= 10:
                return True
            # Numbered sections like "1.", "1.1", "Chapter 1"
            if re.match(r'^(\d+\.)+\d*\s+', text) or re.match(r'^(Chapter|Section|Part)\s+\d+', text, re.IGNORECASE):
                return True

        return False
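
    # A few illustrative inputs for this heuristic:
    #   _is_likely_header("COURSE OVERVIEW")       -> True   (short, ALL CAPS)
    #   _is_likely_header("1.2 Memory Hierarchy")  -> True   (numbered section)
    #   _is_likely_header("the cache stores recently used data") -> False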

    def _simple_chunk(self, text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
        """
        Split text into overlapping fixed-size chunks (original method).
        """
        chunks = []
        start = 0
        text_length = len(text)

        while start < text_length:
            end = start + chunk_size
            chunk = text[start:end]

            # Try to break at a sentence boundary
            if end < text_length:
                last_period = chunk.rfind('.')
                last_newline = chunk.rfind('\n')
                break_point = max(last_period, last_newline)

                if break_point > chunk_size * 0.5:  # At least 50% through the chunk
                    chunk = chunk[:break_point + 1]
                    end = start + break_point + 1

            chunk = chunk.strip()
            if chunk:  # Skip whitespace-only chunks
                chunks.append(chunk)
            start = end - overlap

        return chunks
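

# Minimal usage sketch; "lecture_notes.pdf" is a placeholder path, not a
# file that ships with this repo.
if __name__ == "__main__":
    processor = DocumentProcessor()
    result = processor.process_file(Path("lecture_notes.pdf"))
    chunks = processor.chunk_text(result['content'], chunk_size=512, overlap=50)
    print(f"{result['filename']} ({result['format']}): {len(chunks)} chunks")
    for chunk in chunks[:3]:
        print(chunk[:80] + "...")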