# src/rag/indexer.py
"""
Document Indexer for RAG
Handles indexing processed documents into the vector store.
"""
from typing import List, Optional, Dict, Any, Union
from pathlib import Path
from pydantic import BaseModel, Field
from loguru import logger
from .store import VectorStore, get_vector_store
from .embeddings import EmbeddingAdapter, get_embedding_adapter
try:
from ..document.schemas.core import ProcessedDocument, DocumentChunk
from ..document.pipeline import process_document, PipelineConfig
DOCUMENT_MODULE_AVAILABLE = True
except ImportError:
DOCUMENT_MODULE_AVAILABLE = False
logger.warning("Document module not available for indexing")
class IndexerConfig(BaseModel):
"""Configuration for document indexer."""
# Batch settings
batch_size: int = Field(default=32, ge=1, description="Embedding batch size")
# Metadata to index
include_bbox: bool = Field(default=True, description="Include bounding boxes")
include_page: bool = Field(default=True, description="Include page numbers")
include_chunk_type: bool = Field(default=True, description="Include chunk types")
# Processing options
skip_empty_chunks: bool = Field(default=True, description="Skip empty text chunks")
min_chunk_length: int = Field(default=10, ge=1, description="Minimum chunk text length")
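# Illustrative sketch (comment only, not executed): overriding the defaults above.
# Pydantic enforces the Field constraints at construction time, e.g. batch_size
# and min_chunk_length must be >= 1:
#
#   cfg = IndexerConfig(batch_size=16, min_chunk_length=25, include_bbox=False)
#   IndexerConfig(batch_size=0)  # raises pydantic.ValidationError (ge=1)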
class IndexingResult(BaseModel):
"""Result of indexing operation."""
document_id: str
source_path: str
num_chunks_indexed: int
num_chunks_skipped: int
success: bool
error: Optional[str] = None
class DocumentIndexer:
"""
Indexes documents into the vector store for RAG.
Workflow:
1. Process document (if not already processed)
2. Extract chunks with metadata
3. Generate embeddings
4. Store in vector database
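Example (illustrative sketch; the path is hypothetical and assumes the default
vector store and embedding adapter are configured):
    indexer = DocumentIndexer(config=IndexerConfig(min_chunk_length=20))
    result = indexer.index_document("reports/annual.pdf")
    if result.success:
        print(f"{result.num_chunks_indexed} chunks indexed")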
"""
def __init__(
self,
config: Optional[IndexerConfig] = None,
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
):
"""
Initialize indexer.
Args:
config: Indexer configuration
vector_store: Vector store instance
embedding_adapter: Embedding adapter instance
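Example (sketch with injected dependencies; my_store and my_embedder are
placeholders for any VectorStore / EmbeddingAdapter instances built elsewhere):
    indexer = DocumentIndexer(
        config=IndexerConfig(batch_size=64),
        vector_store=my_store,
        embedding_adapter=my_embedder,
    )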
"""
self.config = config or IndexerConfig()
self._store = vector_store
self._embedder = embedding_adapter
@property
def store(self) -> VectorStore:
"""Get vector store (lazy initialization)."""
if self._store is None:
self._store = get_vector_store()
return self._store
@property
def embedder(self) -> EmbeddingAdapter:
"""Get embedding adapter (lazy initialization)."""
if self._embedder is None:
self._embedder = get_embedding_adapter()
return self._embedder
def index_document(
self,
source: Union[str, Path],
document_id: Optional[str] = None,
pipeline_config: Optional[Any] = None,
) -> IndexingResult:
"""
Index a document from a file path.
Args:
source: Path to document
document_id: Optional document ID
pipeline_config: Optional pipeline configuration
Returns:
IndexingResult
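Example (illustrative; the path and document ID are hypothetical):
    indexer = DocumentIndexer()
    result = indexer.index_document("data/handbook.pdf", document_id="handbook-v1")
    if not result.success:
        logger.error(result.error)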
"""
if not DOCUMENT_MODULE_AVAILABLE:
return IndexingResult(
document_id=document_id or str(source),
source_path=str(source),
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error="Document processing module not available",
)
try:
# Process document
logger.info(f"Processing document: {source}")
processed = process_document(source, document_id, pipeline_config)
# Index the processed document
return self.index_processed_document(processed)
except Exception as e:
logger.error(f"Failed to index document: {e}")
return IndexingResult(
document_id=document_id or str(source),
source_path=str(source),
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error=str(e),
)
def index_processed_document(
self,
document: "ProcessedDocument",
) -> IndexingResult:
"""
Index an already-processed document.
Args:
document: ProcessedDocument instance
Returns:
IndexingResult
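Example (sketch; requires the document processing module to be available,
and the path is hypothetical):
    processed = process_document("data/handbook.pdf")
    indexer = DocumentIndexer()
    result = indexer.index_processed_document(processed)
    print(result.num_chunks_indexed, result.num_chunks_skipped)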
"""
document_id = document.metadata.document_id
source_path = document.metadata.source_path
try:
# Prepare chunks for indexing
chunks_to_index = []
skipped = 0
for chunk in document.chunks:
# Skip empty or short chunks
if self.config.skip_empty_chunks:
if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length:
skipped += 1
continue
chunk_data = {
"chunk_id": chunk.chunk_id,
"document_id": document_id,
"source_path": source_path,
"text": chunk.text,
"sequence_index": chunk.sequence_index,
"confidence": chunk.confidence,
}
if self.config.include_page:
chunk_data["page"] = chunk.page
if self.config.include_chunk_type:
chunk_data["chunk_type"] = chunk.chunk_type.value
if self.config.include_bbox and chunk.bbox:
chunk_data["bbox"] = {
"x_min": chunk.bbox.x_min,
"y_min": chunk.bbox.y_min,
"x_max": chunk.bbox.x_max,
"y_max": chunk.bbox.y_max,
}
chunks_to_index.append(chunk_data)
if not chunks_to_index:
return IndexingResult(
document_id=document_id,
source_path=source_path,
num_chunks_indexed=0,
num_chunks_skipped=skipped,
success=True,
)
# Generate embeddings in batches
logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
texts = [c["text"] for c in chunks_to_index]
embeddings = self.embedder.embed_batch(texts)
# Store in vector database
logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
self.store.add_chunks(chunks_to_index, embeddings)
logger.info(
f"Indexed document {document_id}: "
f"{len(chunks_to_index)} chunks, {skipped} skipped"
)
return IndexingResult(
document_id=document_id,
source_path=source_path,
num_chunks_indexed=len(chunks_to_index),
num_chunks_skipped=skipped,
success=True,
)
except Exception as e:
logger.error(f"Failed to index processed document: {e}")
return IndexingResult(
document_id=document_id,
source_path=source_path,
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error=str(e),
)
def index_batch(
self,
sources: List[Union[str, Path]],
pipeline_config: Optional[Any] = None,
) -> List[IndexingResult]:
"""
Index multiple documents.
Args:
sources: List of document paths
pipeline_config: Optional pipeline configuration
Returns:
List of IndexingResult
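Example (illustrative paths):
    indexer = DocumentIndexer()
    results = indexer.index_batch(["a.pdf", "b.pdf"])
    failed = [r.source_path for r in results if not r.success]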
"""
results = []
for source in sources:
result = self.index_document(source, pipeline_config=pipeline_config)
results.append(result)
# Summary
successful = sum(1 for r in results if r.success)
total_chunks = sum(r.num_chunks_indexed for r in results)
logger.info(
f"Batch indexing complete: "
f"{successful}/{len(results)} documents, "
f"{total_chunks} total chunks"
)
return results
def delete_document(self, document_id: str) -> int:
"""
Remove a document from the index.
Args:
document_id: Document ID to remove
Returns:
Number of chunks deleted
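Example (the document ID is illustrative):
    indexer = DocumentIndexer()
    deleted = indexer.delete_document("handbook-v1")
    logger.info(f"Removed {deleted} chunks")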
"""
return self.store.delete_document(document_id)
def get_index_stats(self) -> Dict[str, Any]:
"""
Get indexing statistics.
Returns:
Dictionary with index stats
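Example (keys match the dictionary built below):
    indexer = DocumentIndexer()
    stats = indexer.get_index_stats()
    print(stats["total_chunks"], stats["embedding_model"])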
"""
total_chunks = self.store.count()
# Try to get document count
try:
if hasattr(self.store, 'list_documents'):
doc_ids = self.store.list_documents()
num_documents = len(doc_ids)
else:
num_documents = None
except Exception:
num_documents = None
return {
"total_chunks": total_chunks,
"num_documents": num_documents,
"embedding_model": self.embedder.model_name,
"embedding_dimension": self.embedder.embedding_dimension,
}
# Global instance and factory
_document_indexer: Optional[DocumentIndexer] = None
def get_document_indexer(
config: Optional[IndexerConfig] = None,
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentIndexer:
"""
Get or create the singleton document indexer.
Args:
config: Indexer configuration
vector_store: Optional vector store instance
embedding_adapter: Optional embedding adapter
Returns:
DocumentIndexer instance
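Example (arguments only apply on the first call; later calls return the
cached instance unchanged):
    indexer = get_document_indexer(config=IndexerConfig(batch_size=16))
    same = get_document_indexer()  # returns the same object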
"""
global _document_indexer
if _document_indexer is None:
_document_indexer = DocumentIndexer(
config=config,
vector_store=vector_store,
embedding_adapter=embedding_adapter,
)
return _document_indexer
def reset_document_indexer():
"""Reset the global indexer instance."""
global _document_indexer
_document_indexer = None
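
if __name__ == "__main__":
    # Minimal smoke-test sketch (the fallback path is illustrative; requires a
    # configured vector store, embedding adapter, and the document processing module).
    import sys

    indexer = get_document_indexer(config=IndexerConfig(batch_size=16))
    source = sys.argv[1] if len(sys.argv) > 1 else "sample.pdf"
    result = indexer.index_document(source)
    logger.info(
        f"success={result.success} "
        f"indexed={result.num_chunks_indexed} skipped={result.num_chunks_skipped}"
    )
    logger.info(f"stats={indexer.get_index_stats()}")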