"""
Modular Document Preprocessor
Main orchestrator class that uses all preprocessing modules to process documents.
"""
import os
import asyncio
from typing import List, Dict, Any, Union
from pathlib import Path
from config.config import OUTPUT_DIR
from .pdf_downloader import PDFDownloader
from .file_downloader import FileDownloader
from .text_extractor import TextExtractor
from .text_chunker import TextChunker
from .embedding_manager import EmbeddingManager
from .vector_storage import VectorStorage
from .metadata_manager import MetadataManager
# Import new extractors
from .docx_extractor import extract_docx
from .pptx_extractor import extract_pptx
from .xlsx_extractor import extract_xlsx
from .image_extractor import extract_image_content
class ModularDocumentPreprocessor:
"""
Modular document preprocessor that orchestrates the entire preprocessing pipeline.
This class combines all preprocessing modules to provide a clean interface
for document processing while maintaining separation of concerns.
"""
def __init__(self):
"""Initialize the modular document preprocessor."""
# Set up base database path
self.base_db_path = Path(OUTPUT_DIR).resolve()
self._ensure_base_directory()
# Initialize all modules
self.pdf_downloader = PDFDownloader() # Keep for backward compatibility
self.file_downloader = FileDownloader() # New enhanced downloader
self.text_extractor = TextExtractor()
self.text_chunker = TextChunker()
self.embedding_manager = EmbeddingManager()
self.vector_storage = VectorStorage(self.base_db_path)
self.metadata_manager = MetadataManager(self.base_db_path)
print("βœ… Modular Document Preprocessor initialized successfully")
def _ensure_base_directory(self):
"""Ensure the base directory exists."""
if not self.base_db_path.exists():
try:
self.base_db_path.mkdir(parents=True, exist_ok=True)
print(f"βœ… Created directory: {self.base_db_path}")
except PermissionError:
print(f"⚠️ Directory {self.base_db_path} should exist in production environment")
if not self.base_db_path.exists():
raise RuntimeError(f"Required directory {self.base_db_path} does not exist and cannot be created")
# Delegate metadata operations to metadata manager
def generate_doc_id(self, document_url: str) -> str:
"""Generate a unique document ID from the URL."""
return self.metadata_manager.generate_doc_id(document_url)
def is_document_processed(self, document_url: str) -> bool:
"""Check if a document has already been processed."""
return self.metadata_manager.is_document_processed(document_url)
def get_document_info(self, document_url: str) -> Dict[str, Any]:
"""Get information about a processed document."""
return self.metadata_manager.get_document_info(document_url)
def list_processed_documents(self) -> Dict[str, Dict]:
"""List all processed documents."""
return self.metadata_manager.list_processed_documents()
def get_collection_stats(self) -> Dict[str, Any]:
"""Get statistics about all collections."""
return self.metadata_manager.get_collection_stats()
async def process_document(self, document_url: str, force_reprocess: bool = False, timeout: int = 300) -> Union[str, List]:
"""
Process a single document: download, extract, chunk, embed, and store.
Args:
document_url: URL of the document (PDF, DOCX, PPTX, XLSX, images, etc.)
force_reprocess: If True, reprocess even if already processed
timeout: Download timeout in seconds (default: 300s/5min)
Returns:
            str: Document ID when the document is chunked, embedded, and stored normally
            List: [content, type] for special handling ('oneshot' or 'tabular'), or [file_path, 'image', True] for images
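        Example (illustrative sketch; the URL below is a placeholder and
        `preprocessor` is assumed to be a ModularDocumentPreprocessor instance):
            result = await preprocessor.process_document("https://example.com/sample.pdf")
            if isinstance(result, str):
                doc_id = result  # normal path: chunks were embedded and stored
            else:
                content, kind = result[0], result[1]  # 'unsupported', 'oneshot', 'tabular', or 'image'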
"""
doc_id = self.generate_doc_id(document_url)
# Check if already processed
if not force_reprocess and self.is_document_processed(document_url):
print(f"βœ… Document {doc_id} already processed, skipping...")
return doc_id
print(f"πŸš€ Processing document: {doc_id}")
print(f"πŸ“„ URL: {document_url}")
temp_file_path = None
try:
# Step 1: Download file (enhanced to handle multiple types)
temp_file_path, ext = await self.file_downloader.download_file(document_url, timeout=timeout)
if temp_file_path == 'not supported':
return ['unsupported', ext]
# Step 2: Extract text based on file type
full_text = ""
match ext:
case 'pdf':
full_text = await self.text_extractor.extract_text_from_pdf(temp_file_path)
case 'docx':
full_text = extract_docx(temp_file_path)
case 'pptx':
full_text = extract_pptx(temp_file_path)
return [full_text, 'oneshot']
case 'url':
new_context = "URL for Context: " + temp_file_path
return [new_context, 'oneshot']
case 'txt':
with open(temp_file_path, 'r', encoding='utf-8') as f:
full_text = f.read()
case 'xlsx':
full_text = extract_xlsx(temp_file_path)
                    # Print the first 15 non-whitespace characters to verify extraction
                    try:
                        preview = ''.join(full_text.split())[:15]
                        if preview:
                            print(f"🔎 XLSX extracted preview: {preview}")
except Exception:
pass
return [full_text, 'tabular']
case 'csv':
with open(temp_file_path, 'r', encoding='utf-8') as f:
full_text = f.read()
return [full_text, 'tabular']
case 'png' | 'jpeg' | 'jpg':
                    # Keep the image on disk: the caller needs the file path and is responsible for cleanup
                    return [temp_file_path, 'image', True]  # Third element signals that cleanup is deferred to the caller
case _:
raise Exception(f"Unsupported file type: {ext}")
# Validate extracted text
if not self.text_extractor.validate_extracted_text(full_text):
raise Exception("No meaningful text extracted from document")
            # Step 3: Create chunks
            chunks = self.text_chunker.chunk_text(full_text)
            if not chunks:
                raise Exception("No chunks created from text")
            # Short documents bypass retrieval and are handled in one shot
            if len(chunks) < 16:
                print(f"Only {len(chunks)} chunks formed, going for oneshot.")
                return [full_text, 'oneshot']
# Log chunk statistics
chunk_stats = self.text_chunker.get_chunk_stats(chunks)
print(f"πŸ“Š Chunk Statistics: {chunk_stats['total_chunks']} chunks, "
f"avg size: {chunk_stats['avg_chunk_size']:.0f} chars")
# Step 4: Create embeddings
embeddings = await self.embedding_manager.create_embeddings(chunks)
# Validate embeddings
if not self.embedding_manager.validate_embeddings(embeddings, len(chunks)):
raise Exception("Invalid embeddings generated")
# Step 5: Store in Qdrant
await self.vector_storage.store_in_qdrant(chunks, embeddings, doc_id)
# Step 6: Save metadata
self.metadata_manager.save_document_metadata(chunks, doc_id, document_url)
print(f"βœ… Document {doc_id} processed successfully: {len(chunks)} chunks")
return doc_id
except Exception as e:
print(f"❌ Error processing document {doc_id}: {str(e)}")
raise
finally:
# Clean up temporary file - but NOT for images since they need the file path
if temp_file_path and ext not in ['png', 'jpeg', 'jpg']:
self.file_downloader.cleanup_temp_file(temp_file_path)
async def process_multiple_documents(self, document_urls: List[str], force_reprocess: bool = False) -> Dict[str, str]:
"""
Process multiple documents concurrently.
Args:
            document_urls: List of document URLs (any supported file type)
force_reprocess: If True, reprocess even if already processed
Returns:
Dict[str, str]: Mapping of URLs to document IDs
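        Example (illustrative sketch; the URLs below are placeholders):
            results = await preprocessor.process_multiple_documents([
                "https://example.com/report.pdf",
                "https://example.com/notes.docx",
            ])
            # results maps each successfully processed URL to its doc_id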
"""
print(f"πŸš€ Processing {len(document_urls)} documents...")
results = {}
# Process documents concurrently (with limited concurrency)
semaphore = asyncio.Semaphore(3) # Limit to 3 concurrent downloads
async def process_single(url):
async with semaphore:
try:
doc_id = await self.process_document(url, force_reprocess)
return url, doc_id
except Exception as e:
print(f"❌ Failed to process {url}: {str(e)}")
return url, None
tasks = [process_single(url) for url in document_urls]
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for result in completed_tasks:
if isinstance(result, tuple):
url, doc_id = result
if doc_id:
results[url] = doc_id
print(f"βœ… Successfully processed {len(results)}/{len(document_urls)} documents")
return results
def get_system_info(self) -> Dict[str, Any]:
"""
Get information about the preprocessing system.
Returns:
Dict[str, Any]: System information
"""
return {
"base_db_path": str(self.base_db_path),
"embedding_model": self.embedding_manager.get_model_info(),
"text_chunker_config": {
"chunk_size": self.text_chunker.chunk_size,
"chunk_overlap": self.text_chunker.chunk_overlap
},
"processed_documents_registry": self.metadata_manager.get_registry_path(),
"collection_stats": self.get_collection_stats()
}
def cleanup_document(self, document_url: str) -> bool:
"""
Remove all data for a specific document.
Args:
document_url: URL of the document to clean up
Returns:
bool: True if successfully cleaned up
"""
doc_id = self.generate_doc_id(document_url)
try:
# Remove vector storage
vector_removed = self.vector_storage.delete_collection(doc_id)
# Remove metadata
metadata_removed = self.metadata_manager.remove_document_metadata(doc_id)
success = vector_removed and metadata_removed
if success:
print(f"βœ… Successfully cleaned up document {doc_id}")
else:
print(f"⚠️ Partial cleanup for document {doc_id}")
return success
except Exception as e:
print(f"❌ Error cleaning up document {doc_id}: {e}")
return False
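

# Minimal usage sketch (assumptions: config.config and the preprocessing modules are
# importable in the current environment, and the URL below is a placeholder rather than
# a real endpoint). It illustrates the two return shapes of process_document().
if __name__ == "__main__":
    async def _demo():
        preprocessor = ModularDocumentPreprocessor()
        result = await preprocessor.process_document("https://example.com/sample.pdf")
        if isinstance(result, str):
            # Normal path: chunks were embedded and stored; result is the doc_id
            print(f"Indexed document: {result}")
        else:
            # Special handling path: [content, type] or [file_path, 'image', True]
            print(f"Special handling required: {result[1]}")
        print(preprocessor.get_system_info())

    asyncio.run(_demo())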