""" | |
Modular Document Preprocessor | |
Main orchestrator class that uses all preprocessing modules to process documents. | |
""" | |
import os | |
import asyncio | |
from typing import List, Dict, Any, Union | |
from pathlib import Path | |
from config.config import OUTPUT_DIR | |
from .pdf_downloader import PDFDownloader | |
from .file_downloader import FileDownloader | |
from .text_extractor import TextExtractor | |
from .text_chunker import TextChunker | |
from .embedding_manager import EmbeddingManager | |
from .vector_storage import VectorStorage | |
from .metadata_manager import MetadataManager | |
# Import new extractors | |
from .docx_extractor import extract_docx | |
from .pptx_extractor import extract_pptx | |
from .xlsx_extractor import extract_xlsx | |
from .image_extractor import extract_image_content | |


class ModularDocumentPreprocessor:
    """
    Modular document preprocessor that orchestrates the entire preprocessing pipeline.

    This class combines all preprocessing modules to provide a clean interface
    for document processing while maintaining separation of concerns.
    """

    def __init__(self):
        """Initialize the modular document preprocessor."""
        # Set up base database path
        self.base_db_path = Path(OUTPUT_DIR).resolve()
        self._ensure_base_directory()

        # Initialize all modules
        self.pdf_downloader = PDFDownloader()    # Keep for backward compatibility
        self.file_downloader = FileDownloader()  # New enhanced downloader
        self.text_extractor = TextExtractor()
        self.text_chunker = TextChunker()
        self.embedding_manager = EmbeddingManager()
        self.vector_storage = VectorStorage(self.base_db_path)
        self.metadata_manager = MetadataManager(self.base_db_path)

        print("✅ Modular Document Preprocessor initialized successfully")

    def _ensure_base_directory(self):
        """Ensure the base directory exists."""
        if not self.base_db_path.exists():
            try:
                self.base_db_path.mkdir(parents=True, exist_ok=True)
                print(f"✅ Created directory: {self.base_db_path}")
            except PermissionError:
                print(f"⚠️ Cannot create {self.base_db_path}; it is expected to already exist in the production environment")
                if not self.base_db_path.exists():
                    raise RuntimeError(f"Required directory {self.base_db_path} does not exist and cannot be created")

    # Delegate metadata operations to metadata manager
    def generate_doc_id(self, document_url: str) -> str:
        """Generate a unique document ID from the URL."""
        return self.metadata_manager.generate_doc_id(document_url)

    def is_document_processed(self, document_url: str) -> bool:
        """Check if a document has already been processed."""
        return self.metadata_manager.is_document_processed(document_url)

    def get_document_info(self, document_url: str) -> Dict[str, Any]:
        """Get information about a processed document."""
        return self.metadata_manager.get_document_info(document_url)

    def list_processed_documents(self) -> Dict[str, Dict]:
        """List all processed documents."""
        return self.metadata_manager.list_processed_documents()

    def get_collection_stats(self) -> Dict[str, Any]:
        """Get statistics about all collections."""
        return self.metadata_manager.get_collection_stats()

    async def process_document(self, document_url: str, force_reprocess: bool = False, timeout: int = 300) -> Union[str, List]:
        """
        Process a single document: download, extract, chunk, embed, and store.

        Args:
            document_url: URL of the document (PDF, DOCX, PPTX, XLSX, images, etc.)
            force_reprocess: If True, reprocess even if already processed
            timeout: Download timeout in seconds (default: 300s/5min)

        Returns:
            str: Document ID for normal processing
            List: [content, type] for special handling (oneshot, tabular, image);
                  the image case appends a third element indicating no cleanup is needed
        """
        doc_id = self.generate_doc_id(document_url)

        # Check if already processed
        if not force_reprocess and self.is_document_processed(document_url):
            print(f"✅ Document {doc_id} already processed, skipping...")
            return doc_id

        print(f"🔄 Processing document: {doc_id}")
        print(f"🔗 URL: {document_url}")

        temp_file_path = None
        ext = None  # Initialized so the finally block can safely check the extension even if the download fails
        try:
            # Step 1: Download file (enhanced to handle multiple types)
            temp_file_path, ext = await self.file_downloader.download_file(document_url, timeout=timeout)
            if temp_file_path == 'not supported':
                return ['unsupported', ext]

            # Step 2: Extract text based on file type
            full_text = ""
            match ext:
                case 'pdf':
                    full_text = await self.text_extractor.extract_text_from_pdf(temp_file_path)
                case 'docx':
                    full_text = extract_docx(temp_file_path)
                case 'pptx':
                    full_text = extract_pptx(temp_file_path)
                    return [full_text, 'oneshot']
                case 'url':
                    new_context = "URL for Context: " + temp_file_path
                    return [new_context, 'oneshot']
                case 'txt':
                    with open(temp_file_path, 'r', encoding='utf-8') as f:
                        full_text = f.read()
                case 'xlsx':
                    full_text = extract_xlsx(temp_file_path)
                    # Print a short preview (10-15 chars) to verify extraction
                    try:
                        preview = ''.join(full_text.split())[:15]
                        if preview:
                            print(f"📊 XLSX extracted preview: {preview}")
                    except Exception:
                        pass
                    return [full_text, 'tabular']
                case 'csv':
                    with open(temp_file_path, 'r', encoding='utf-8') as f:
                        full_text = f.read()
                    return [full_text, 'tabular']
                case 'png' | 'jpeg' | 'jpg':
                    # Don't clean up image files - they'll be cleaned up by the caller
                    return [temp_file_path, 'image', True]  # Third element indicates no cleanup needed
                case _:
                    raise Exception(f"Unsupported file type: {ext}")

            # Validate extracted text
            if not self.text_extractor.validate_extracted_text(full_text):
                raise Exception("No meaningful text extracted from document")

            # Step 3: Create chunks
            chunks = self.text_chunker.chunk_text(full_text)
            if not chunks:
                raise Exception("No chunks created from text")

            # Document is too short to benefit from chunked retrieval; fall back to oneshot
            if len(chunks) < 16:
                print(f"Only {len(chunks)} chunks formed, going for oneshot.")
                return [full_text, 'oneshot']

            # Log chunk statistics
            chunk_stats = self.text_chunker.get_chunk_stats(chunks)
            print(f"📊 Chunk Statistics: {chunk_stats['total_chunks']} chunks, "
                  f"avg size: {chunk_stats['avg_chunk_size']:.0f} chars")

            # Step 4: Create embeddings
            embeddings = await self.embedding_manager.create_embeddings(chunks)

            # Validate embeddings
            if not self.embedding_manager.validate_embeddings(embeddings, len(chunks)):
                raise Exception("Invalid embeddings generated")

            # Step 5: Store in Qdrant
            await self.vector_storage.store_in_qdrant(chunks, embeddings, doc_id)

            # Step 6: Save metadata
            self.metadata_manager.save_document_metadata(chunks, doc_id, document_url)

            print(f"✅ Document {doc_id} processed successfully: {len(chunks)} chunks")
            return doc_id

        except Exception as e:
            print(f"❌ Error processing document {doc_id}: {str(e)}")
            raise
        finally:
            # Clean up temporary file - but NOT for images since they need the file path
            if temp_file_path and ext not in ['png', 'jpeg', 'jpg']:
                self.file_downloader.cleanup_temp_file(temp_file_path)

    async def process_multiple_documents(self, document_urls: List[str], force_reprocess: bool = False) -> Dict[str, str]:
        """
        Process multiple documents concurrently.

        Args:
            document_urls: List of document URLs
            force_reprocess: If True, reprocess even if already processed

        Returns:
            Dict[str, str]: Mapping of URLs to document IDs
        """
        print(f"🔄 Processing {len(document_urls)} documents...")

        results = {}

        # Process documents concurrently (with limited concurrency)
        semaphore = asyncio.Semaphore(3)  # Limit to 3 documents being processed at once

        async def process_single(url):
            async with semaphore:
                try:
                    doc_id = await self.process_document(url, force_reprocess)
                    return url, doc_id
                except Exception as e:
                    print(f"❌ Failed to process {url}: {str(e)}")
                    return url, None

        tasks = [process_single(url) for url in document_urls]
        completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)

        for result in completed_tasks:
            if isinstance(result, tuple):
                url, doc_id = result
                if doc_id:
                    results[url] = doc_id

        print(f"✅ Successfully processed {len(results)}/{len(document_urls)} documents")
        return results

    def get_system_info(self) -> Dict[str, Any]:
        """
        Get information about the preprocessing system.

        Returns:
            Dict[str, Any]: System information
        """
        return {
            "base_db_path": str(self.base_db_path),
            "embedding_model": self.embedding_manager.get_model_info(),
            "text_chunker_config": {
                "chunk_size": self.text_chunker.chunk_size,
                "chunk_overlap": self.text_chunker.chunk_overlap
            },
            "processed_documents_registry": self.metadata_manager.get_registry_path(),
            "collection_stats": self.get_collection_stats()
        }

    def cleanup_document(self, document_url: str) -> bool:
        """
        Remove all data for a specific document.

        Args:
            document_url: URL of the document to clean up

        Returns:
            bool: True if successfully cleaned up
        """
        doc_id = self.generate_doc_id(document_url)
        try:
            # Remove vector storage
            vector_removed = self.vector_storage.delete_collection(doc_id)

            # Remove metadata
            metadata_removed = self.metadata_manager.remove_document_metadata(doc_id)

            success = vector_removed and metadata_removed
            if success:
                print(f"✅ Successfully cleaned up document {doc_id}")
            else:
                print(f"⚠️ Partial cleanup for document {doc_id}")
            return success

        except Exception as e:
            print(f"❌ Error cleaning up document {doc_id}: {e}")
            return False
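

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the module's API):
# drives a single document through the pipeline from a script and inspects the
# result shape. The URL below is a hypothetical placeholder; substitute a real
# document location.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        preprocessor = ModularDocumentPreprocessor()
        result = await preprocessor.process_document("https://example.com/sample.pdf")
        if isinstance(result, list):
            # Special handling (unsupported, oneshot, tabular, or image)
            content, kind = result[0], result[1]
            print(f"Special handling ({kind}); content length: {len(str(content))}")
        else:
            # Normal path: chunks were embedded and stored under this document ID
            print(f"Stored document with ID: {result}")
            print(preprocessor.get_system_info())

    asyncio.run(_demo())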