import os
import pickle
from typing import List, Dict, Any, Optional, Tuple

import faiss
import numpy as np
from tqdm import tqdm

from src.utils.config import DATA_DIR, EMBEDDINGS_DIR
from src.embeddings.embedder import TextEmbedder


class DocumentProcessor:
    """
    Embeds pre-chunked documents, builds a FAISS index over the embeddings,
    and saves/reloads the resulting artifacts.

    Chunking itself is not done here: callers pass in already-built chunk
    dictionaries, each of which must carry its text under the "chunk" key.
    """

    # File names of the persisted artifacts. Shared by the save and load
    # paths so the two cannot drift apart.
    _CHUNKS_FILE = "doc_chunks.pkl"
    _EMBEDDINGS_FILE = "embeddings.pkl"
    _INDEX_FILE = "faiss_index.index"

    def __init__(self, data_dir: str = DATA_DIR, embeddings_dir: str = EMBEDDINGS_DIR):
        """
        Initialize the document processor.

        Creates both directories if they do not exist yet and instantiates
        a TextEmbedder.

        Args:
            data_dir: Directory where the pickled document chunks are stored
            embeddings_dir: Directory for storing embeddings and indexes
        """
        self.data_dir = data_dir
        self.embeddings_dir = embeddings_dir
        self.embedder = TextEmbedder()

        # Create directories if they don't exist
        os.makedirs(data_dir, exist_ok=True)
        os.makedirs(embeddings_dir, exist_ok=True)

    def process_documents(self, doc_chunks: List[Dict[str, Any]], save: bool = True) -> Tuple[List[Dict[str, Any]], List[List[float]]]:
        """
        Generate embeddings for the given document chunks.

        This method does NOT build the FAISS index; pass the returned
        embeddings to create_faiss_index() for that.

        Args:
            doc_chunks: List of document chunks to process; each must hold
                its text under the "chunk" key
            save: Whether to save the chunks and embeddings to disk

        Returns:
            Tuple containing the document chunks (unchanged) and their embeddings

        Raises:
            KeyError: If a chunk has no "chunk" key
        """
        print(f"Processing {len(doc_chunks)} document chunks...")

        # Extract text chunks for embedding
        texts = [chunk["chunk"] for chunk in doc_chunks]

        # Generate embeddings
        print("Generating embeddings...")
        embeddings = self.embedder.get_embeddings_for_texts(texts)

        # Save the results if requested
        if save:
            self._save_processed_data(doc_chunks, embeddings)

        return doc_chunks, embeddings

    def create_faiss_index(self, embeddings: List[List[float]], save: bool = True) -> faiss.Index:
        """
        Create a flat L2 FAISS index from the document embeddings.

        Args:
            embeddings: List of embedding vectors, all of the same length
            save: Whether to save the index to disk

        Returns:
            FAISS index containing every supplied vector

        Raises:
            ValueError: If the embeddings do not form a 2-D array, e.g. an
                empty list or a single flat vector
        """
        print("Creating FAISS index...")

        # Convert embeddings to numpy array
        embedding_array = np.array(embeddings, dtype="float32")

        # An empty list or a flat vector yields a 1-D array, for which
        # shape[1] would fail with an opaque IndexError. Report it clearly.
        if embedding_array.ndim != 2:
            raise ValueError(
                "embeddings must form a 2-D array of shape "
                f"(n_vectors, dimension); got shape {embedding_array.shape}"
            )

        # Get dimensions
        vector_dimension = embedding_array.shape[1]

        # Create the index
        index = faiss.IndexFlatL2(vector_dimension)
        index.add(embedding_array)

        print(f"Created FAISS index with {index.ntotal} vectors of dimension {vector_dimension}")

        # Save the index if requested
        if save:
            index_path = os.path.join(self.embeddings_dir, self._INDEX_FILE)
            faiss.write_index(index, index_path)
            print(f"FAISS index saved to {index_path}")

        return index

    def _save_processed_data(self, doc_chunks: List[Dict[str, Any]], embeddings: List[List[float]]) -> None:
        """
        Save the processed document chunks and embeddings to disk as pickles.

        Existing files with the same names are overwritten.

        Args:
            doc_chunks: List of document chunks
            embeddings: List of embedding vectors
        """
        # Save document chunks
        chunks_path = os.path.join(self.data_dir, self._CHUNKS_FILE)
        with open(chunks_path, "wb") as f:
            pickle.dump(doc_chunks, f)
        print(f"Document chunks saved to {chunks_path}")

        # Save embeddings
        embeddings_path = os.path.join(self.embeddings_dir, self._EMBEDDINGS_FILE)
        with open(embeddings_path, "wb") as f:
            pickle.dump(embeddings, f)
        print(f"Embeddings saved to {embeddings_path}")

    def load_processed_data(self) -> Tuple[List[Dict[str, Any]], List[List[float]], faiss.Index]:
        """
        Load processed document chunks, embeddings, and FAISS index from disk.

        The index file is only written by create_faiss_index(save=True), so
        that must have been run in addition to process_documents(save=True).

        Returns:
            Tuple containing document chunks, embeddings, and FAISS index

        Raises:
            FileNotFoundError: If either pickle file is missing
        """
        # SECURITY: pickle.load can execute arbitrary code. Only load files
        # this application wrote itself; never point these directories at
        # untrusted data.

        # Load document chunks
        chunks_path = os.path.join(self.data_dir, self._CHUNKS_FILE)
        with open(chunks_path, "rb") as f:
            doc_chunks = pickle.load(f)
        print(f"Document chunks loaded from {chunks_path}")

        # Load embeddings
        embeddings_path = os.path.join(self.embeddings_dir, self._EMBEDDINGS_FILE)
        with open(embeddings_path, "rb") as f:
            embeddings = pickle.load(f)
        print(f"Embeddings loaded from {embeddings_path}")

        # Load FAISS index
        # NOTE(review): faiss.read_index presumably raises its own error when
        # the file is missing or corrupt; confirm the exception type.
        index_path = os.path.join(self.embeddings_dir, self._INDEX_FILE)
        index = faiss.read_index(index_path)
        print(f"FAISS index loaded from {index_path}")

        return doc_chunks, embeddings, index