# agentic-defensor/src/data/document_processor.py
# Last commit: "fix" (6a4bd6f) by vichudo; file size 4.94 kB.
import os
import pickle
import faiss
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from tqdm import tqdm
from src.utils.config import DATA_DIR, EMBEDDINGS_DIR
from src.embeddings.embedder import TextEmbedder
class DocumentProcessor:
    """
    Embeds document chunks and persists the chunks, embeddings, and FAISS index.

    Document chunks are pickled under ``data_dir``; embeddings and the FAISS
    index are written under ``embeddings_dir``.
    """

    # File names used by both the save and the load paths. Keeping them in one
    # place guarantees that what is written is exactly what is read back.
    CHUNKS_FILENAME = "doc_chunks.pkl"
    EMBEDDINGS_FILENAME = "embeddings.pkl"
    INDEX_FILENAME = "faiss_index.index"

    def __init__(self, data_dir: str = DATA_DIR, embeddings_dir: str = EMBEDDINGS_DIR):
        """
        Initialize the document processor.

        Both directories are created if they do not already exist.

        Args:
            data_dir: Directory containing the document files
            embeddings_dir: Directory for storing embeddings and indexes
        """
        self.data_dir = data_dir
        self.embeddings_dir = embeddings_dir
        self.embedder = TextEmbedder()

        # Create directories if they don't exist
        os.makedirs(data_dir, exist_ok=True)
        os.makedirs(embeddings_dir, exist_ok=True)

    def _chunks_path(self) -> str:
        """Return the path of the pickled document chunks."""
        return os.path.join(self.data_dir, self.CHUNKS_FILENAME)

    def _embeddings_path(self) -> str:
        """Return the path of the pickled embeddings."""
        return os.path.join(self.embeddings_dir, self.EMBEDDINGS_FILENAME)

    def _index_path(self) -> str:
        """Return the path of the serialized FAISS index."""
        return os.path.join(self.embeddings_dir, self.INDEX_FILENAME)

    def process_documents(self, doc_chunks: List[Dict[str, Any]], save: bool = True) -> Tuple[List[Dict[str, Any]], List[List[float]]]:
        """
        Generate embeddings for the given document chunks.

        This method does not build a FAISS index; call ``create_faiss_index``
        with the returned embeddings for that.

        Args:
            doc_chunks: List of document chunks to process. Each chunk must
                have a "chunk" key holding the text to embed.
            save: Whether to save the chunks and embeddings to disk

        Returns:
            Tuple containing the document chunks (unchanged) and the
            embeddings returned by the embedder

        Raises:
            KeyError: If a chunk has no "chunk" key.
        """
        print(f"Processing {len(doc_chunks)} document chunks...")

        # Extract text chunks for embedding
        texts = [chunk["chunk"] for chunk in doc_chunks]

        # Generate embeddings
        print("Generating embeddings...")
        embeddings = self.embedder.get_embeddings_for_texts(texts)

        # Save the results if requested
        if save:
            self._save_processed_data(doc_chunks, embeddings)

        return doc_chunks, embeddings

    def create_faiss_index(self, embeddings: List[List[float]], save: bool = True) -> faiss.Index:
        """
        Create a flat L2 FAISS index from the document embeddings.

        Args:
            embeddings: List of embedding vectors, all of the same length
            save: Whether to save the index to disk

        Returns:
            FAISS index containing every embedding vector

        Raises:
            ValueError: If the embeddings do not form a 2-D array (for example
                an empty list, or a single flat vector).
        """
        print("Creating FAISS index...")

        # Convert embeddings to the float32 array layout FAISS expects
        embedding_array = np.array(embeddings, dtype="float32")

        # Without this check an empty or flat input fails below with an opaque
        # "tuple index out of range" when reading shape[1].
        if embedding_array.ndim != 2:
            raise ValueError(
                "embeddings must be a non-empty 2-D sequence of vectors; "
                f"got an array of shape {embedding_array.shape}"
            )

        # Get dimensions
        vector_dimension = embedding_array.shape[1]

        # Create the index
        index = faiss.IndexFlatL2(vector_dimension)
        index.add(embedding_array)
        print(f"Created FAISS index with {index.ntotal} vectors of dimension {vector_dimension}")

        # Save the index if requested
        if save:
            index_path = self._index_path()
            faiss.write_index(index, index_path)
            print(f"FAISS index saved to {index_path}")

        return index

    def _save_processed_data(self, doc_chunks: List[Dict[str, Any]], embeddings: List[List[float]]) -> None:
        """
        Save the processed document chunks and embeddings to disk.

        Args:
            doc_chunks: List of document chunks
            embeddings: List of embedding vectors
        """
        # Save document chunks
        chunks_path = self._chunks_path()
        with open(chunks_path, "wb") as f:
            pickle.dump(doc_chunks, f)
        print(f"Document chunks saved to {chunks_path}")

        # Save embeddings
        embeddings_path = self._embeddings_path()
        with open(embeddings_path, "wb") as f:
            pickle.dump(embeddings, f)
        print(f"Embeddings saved to {embeddings_path}")

    def load_processed_data(self) -> Tuple[List[Dict[str, Any]], List[List[float]], faiss.Index]:
        """
        Load processed document chunks, embeddings, and FAISS index from disk.

        Returns:
            Tuple containing document chunks, embeddings, and FAISS index

        Raises:
            FileNotFoundError: If the chunks or embeddings pickle is missing.
        """
        # SECURITY NOTE: pickle.load can execute arbitrary code embedded in the
        # file. Only point data_dir / embeddings_dir at files this application
        # wrote itself; never load pickles from an untrusted source.

        # Load document chunks
        chunks_path = self._chunks_path()
        with open(chunks_path, "rb") as f:
            doc_chunks = pickle.load(f)
        print(f"Document chunks loaded from {chunks_path}")

        # Load embeddings
        embeddings_path = self._embeddings_path()
        with open(embeddings_path, "rb") as f:
            embeddings = pickle.load(f)
        print(f"Embeddings loaded from {embeddings_path}")

        # Load FAISS index
        index_path = self._index_path()
        index = faiss.read_index(index_path)
        print(f"FAISS index loaded from {index_path}")

        return doc_chunks, embeddings, index