File size: 4,935 Bytes
6a4bd6f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import pickle
import faiss
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from tqdm import tqdm

from src.utils.config import DATA_DIR, EMBEDDINGS_DIR
from src.embeddings.embedder import TextEmbedder

class DocumentProcessor:
    """
    Embeds pre-chunked documents, builds a FAISS index over the embeddings,
    and saves / reloads those artifacts on disk.

    Chunking is not performed here: callers pass in a list of chunk
    dictionaries, each carrying its text under the ``"chunk"`` key.
    """

    # Artifact file names, shared by the save and load paths so the two
    # cannot drift apart.
    CHUNKS_FILENAME = "doc_chunks.pkl"
    EMBEDDINGS_FILENAME = "embeddings.pkl"
    INDEX_FILENAME = "faiss_index.index"

    def __init__(self, data_dir: str = DATA_DIR, embeddings_dir: str = EMBEDDINGS_DIR):
        """
        Initialize the document processor.

        Args:
            data_dir: Directory where the pickled document chunks are stored
            embeddings_dir: Directory for storing embeddings and the FAISS index
        """
        self.data_dir = data_dir
        self.embeddings_dir = embeddings_dir
        self.embedder = TextEmbedder()

        # Create directories if they don't exist
        os.makedirs(data_dir, exist_ok=True)
        os.makedirs(embeddings_dir, exist_ok=True)

    def process_documents(self, doc_chunks: List[Dict[str, Any]], save: bool = True) -> Tuple[List[Dict[str, Any]], List[List[float]]]:
        """
        Generate embeddings for the given document chunks.

        This method does not build the FAISS index; pass the returned
        embeddings to ``create_faiss_index`` for that.

        Args:
            doc_chunks: List of document chunks to process; the text of each
                chunk is read from its ``"chunk"`` key
            save: Whether to save the chunks and embeddings to disk

        Returns:
            Tuple containing the (unmodified) document chunks and whatever the
            embedder returned for their texts

        Raises:
            KeyError: If a chunk dictionary has no ``"chunk"`` key
        """
        print(f"Processing {len(doc_chunks)} document chunks...")

        # Extract text chunks for embedding
        texts = [chunk["chunk"] for chunk in doc_chunks]

        # Generate embeddings
        print("Generating embeddings...")
        embeddings = self.embedder.get_embeddings_for_texts(texts)

        # Save the results if requested
        if save:
            self._save_processed_data(doc_chunks, embeddings)

        return doc_chunks, embeddings

    def create_faiss_index(self, embeddings: List[List[float]], save: bool = True) -> faiss.Index:
        """
        Create a flat L2 FAISS index from the document embeddings.

        Args:
            embeddings: 2-D collection of embedding vectors, one row per
                vector, all of the same length
            save: Whether to save the index to disk

        Returns:
            FAISS index containing every supplied vector

        Raises:
            ValueError: If ``embeddings`` is not 2-D (for example an empty
                list or a single flat vector), because the vector dimension
                cannot be determined
        """
        print("Creating FAISS index...")

        # FAISS expects a C-contiguous float32 matrix; ascontiguousarray also
        # normalises inputs that are already (possibly strided) numpy arrays.
        embedding_array = np.ascontiguousarray(embeddings, dtype=np.float32)

        # Fail with a clear message instead of an IndexError on shape[1].
        if embedding_array.ndim != 2:
            raise ValueError(
                "embeddings must be a 2-D collection of shape "
                "(n_vectors, dimension); got array with shape "
                f"{embedding_array.shape}"
            )

        # Get dimensions
        vector_dimension = embedding_array.shape[1]

        # Create the index
        index = faiss.IndexFlatL2(vector_dimension)
        index.add(embedding_array)

        print(f"Created FAISS index with {index.ntotal} vectors of dimension {vector_dimension}")

        # Save the index if requested
        if save:
            index_path = os.path.join(self.embeddings_dir, self.INDEX_FILENAME)
            faiss.write_index(index, index_path)
            print(f"FAISS index saved to {index_path}")

        return index

    def _save_processed_data(self, doc_chunks: List[Dict[str, Any]], embeddings: List[List[float]]) -> None:
        """
        Save the processed document chunks and embeddings to disk as pickles.

        Chunks are written under ``data_dir`` and embeddings under
        ``embeddings_dir``; existing files are overwritten.

        Args:
            doc_chunks: List of document chunks
            embeddings: List of embedding vectors
        """
        # Save document chunks
        chunks_path = os.path.join(self.data_dir, self.CHUNKS_FILENAME)
        with open(chunks_path, "wb") as f:
            pickle.dump(doc_chunks, f)
        print(f"Document chunks saved to {chunks_path}")

        # Save embeddings
        embeddings_path = os.path.join(self.embeddings_dir, self.EMBEDDINGS_FILENAME)
        with open(embeddings_path, "wb") as f:
            pickle.dump(embeddings, f)
        print(f"Embeddings saved to {embeddings_path}")

    def load_processed_data(self) -> Tuple[List[Dict[str, Any]], List[List[float]], faiss.Index]:
        """
        Load processed document chunks, embeddings, and FAISS index from disk.

        Returns:
            Tuple containing document chunks, embeddings, and FAISS index

        Raises:
            FileNotFoundError: If the chunk or embedding pickle file is missing
        """
        # SECURITY NOTE: pickle.load can execute arbitrary code embedded in the
        # file. Only point data_dir / embeddings_dir at locations whose
        # contents are trusted.

        # Load document chunks
        chunks_path = os.path.join(self.data_dir, self.CHUNKS_FILENAME)
        with open(chunks_path, "rb") as f:
            doc_chunks = pickle.load(f)
        print(f"Document chunks loaded from {chunks_path}")

        # Load embeddings
        embeddings_path = os.path.join(self.embeddings_dir, self.EMBEDDINGS_FILENAME)
        with open(embeddings_path, "rb") as f:
            embeddings = pickle.load(f)
        print(f"Embeddings loaded from {embeddings_path}")

        # Load FAISS index
        index_path = os.path.join(self.embeddings_dir, self.INDEX_FILENAME)
        index = faiss.read_index(index_path)
        print(f"FAISS index loaded from {index_path}")

        return doc_chunks, embeddings, index