File size: 8,993 Bytes
e53c2d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
from langchain_community.document_loaders import PyPDFLoader, UnstructuredWordDocumentLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.models import Embedder
from app.chunks import Chunk
import nltk # used for proper tokenizer workflow
from uuid import uuid4 # for generating unique id as hex (uuid4 is used as it generates ids form pseudo random numbers unlike uuid1 and others)
import numpy as np
from app.settings import logging, text_splitter_config, embedder_model


# TODO: replace PyPDFLoader, since it is completely unusable, or try to fix it


class DocumentProcessor:
    """Load files, split them into chunks and track which chunks still need saving.

    TODO: determine the most suitable chunk size

    Attributes:
        chunks: every chunk produced during the current session.
        chunks_unsaved: recently produced chunks that have not been handed
            over for saving to the db yet.
        processed: documents that were already split into chunks.
        unprocessed: loaded documents that were not split into chunks yet.
        embedder: Embedder built from ``embedder_model``; used to encode the
            query and chunk texts in ``generate_chunks``.
        text_splitter: RecursiveCharacterTextSplitter built from
            ``text_splitter_config``.
    """

    def __init__(self):
        self.chunks: list[Chunk] = []
        self.chunks_unsaved: list[Chunk] = []
        self.processed: list[Document] = []
        self.unprocessed: list[Document] = []
        self.embedder = Embedder(embedder_model)
        self.text_splitter = RecursiveCharacterTextSplitter(**text_splitter_config)

    def cosine_similarity(self, vec1, vec2):
        """Return the cosine of the angle between two vectors.

        Returns 0.0 when either vector has zero norm, because the cosine is
        undefined there and the plain division would produce nan/inf.
        """
        denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        if denominator == 0:
            return 0.0
        return vec1 @ vec2 / denominator

    def update_most_relevant_chunk(self, chunk: list, relevant_chunks: list[list], mx_len: int = 15):
        """Insert a scored chunk into a ranking without interacting with the db.

        Args:
            chunk: two-element list ``[similarity, chunk]``.
            relevant_chunks: ranking of such pairs, ordered by descending
                similarity; it is modified in place.
            mx_len: maximum number of pairs kept in the ranking.
        """
        relevant_chunks.append(chunk)

        # Bubble the new pair towards the front until its predecessor has a
        # similarity that is greater than or equal to its own.
        for i in range(len(relevant_chunks) - 1, 0, -1):
            if relevant_chunks[i][0] > relevant_chunks[i - 1][0]:
                relevant_chunks[i], relevant_chunks[i - 1] = relevant_chunks[i - 1], relevant_chunks[i]
            else:
                break

        # Only one pair is added per call, so dropping the last (lowest
        # scored) one is enough to respect the limit.
        if len(relevant_chunks) > mx_len:
            del relevant_chunks[-1]

    def load_document(self, filepath: str, add_to_unprocessed: bool = False) -> list[Document]:
        """Load one file and extract its text.

        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and text from img extraction
        TODO: Try chunking with llm

        Args:
            filepath: path to a .pdf, .docx, .doc or .txt file; the extension
                is matched case-insensitively.
            add_to_unprocessed: if true, the loaded documents are also added
                to the list of unprocessed (unchunked) documents.

        Returns:
            The documents produced by the loader. This is a list because a
            single .pdf is split into several documents.

        Raises:
            RuntimeError: if the extension is not supported, or if the loader
                fails (the original error is attached as the cause).
        """
        lowered_path = filepath.lower()

        if lowered_path.endswith(".pdf"):
            # The loader splits the file into several documents that are
            # processed as separate files.
            loader = PyPDFLoader(file_path=filepath)
        elif lowered_path.endswith((".docx", ".doc")):
            # loader = Docx2txtLoader(file_path=filepath)  # try it later, since UnstructuredWordDocumentLoader is extremely slow
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif lowered_path.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        else:
            raise RuntimeError("Unsupported type of file")

        try:
            documents: list[Document] = loader.load()
        except Exception as e:
            # Keep the message callers may rely on, but preserve the real cause.
            raise RuntimeError("File is corrupted") from e

        if add_to_unprocessed:
            self.unprocessed.extend(documents)

        return documents

    def load_documents(self, documents: list[str], add_to_unprocessed: bool = False) -> list[Document]:
        """Load several files; similar to load_document, but for multiple paths.

        Files that fail to load are logged and skipped instead of aborting
        the whole batch.

        Args:
            documents: paths of the files to load.
            add_to_unprocessed: if true, the loaded documents are also added
                to the list of unprocessed (unchunked) documents.

        Returns:
            The documents extracted from every file that loaded successfully.
        """
        extracted_documents: list[Document] = []

        for doc in documents:
            try:
                # add_to_unprocessed is handled below, so it is not forwarded.
                loaded = self.load_document(filepath=doc, add_to_unprocessed=False)
            except Exception as e:
                logging.error("Error at load_documents while loading %s", doc, exc_info=e)
                continue

            extracted_documents.extend(loaded)
            if add_to_unprocessed:
                self.unprocessed.extend(loaded)

        return extracted_documents

    def generate_chunks(self, query: str = "", embedding: bool = False):
        """Split every unprocessed document into chunks with the recursive splitter.

        Each document is moved to the list of processed documents, its chunks
        are appended to both ``chunks`` and ``chunks_unsaved``, and the list
        of unprocessed documents is cleared afterwards.

        TODO: try to split text with other llm (not really needed, but we should at least try it)

        Args:
            query: text the chunks are compared against; only used when
                ``embedding`` is true.
            embedding: if true, every new chunk is embedded and ranked by
                cosine similarity to ``query``.

        Returns:
            A list of ``[similarity, chunk]`` pairs ordered by descending
            similarity (limited by update_most_relevant_chunk), or an empty
            list when ``embedding`` is false.
        """
        most_relevant = []

        if embedding:
            query_embedded = self.embedder.encode(query)

        for document in self.unprocessed:
            self.processed.append(document)

            pieces: list[Document] = self.text_splitter.split_documents([document])
            lines: list[str] = document.page_content.split("\n")

            for piece in pieces:
                # NOTE(review): "start_index" is presumably filled in by the
                # splitter (add_start_index in text_splitter_config); when it
                # is missing every chunk is treated as starting at 0 - confirm.
                start_index = piece.metadata.get("start_index", 0)

                start_l, end_l = self.get_start_end_lines(
                    splitted_text=lines,
                    start_char=start_index,
                    end_char=start_index + len(piece.page_content)
                )

                new_chunk = Chunk(
                    id=uuid4(),
                    filename=document.metadata.get("source", ""),
                    page_number=document.metadata.get("page", 0),
                    start_index=start_index,
                    start_line=start_l,
                    end_line=end_l,
                    text=piece.page_content
                )

                if embedding:
                    chunk_embedded = self.embedder.encode(new_chunk.text)
                    similarity = self.cosine_similarity(query_embedded, chunk_embedded)
                    self.update_most_relevant_chunk([similarity, new_chunk], most_relevant)

                self.chunks.append(new_chunk)
                self.chunks_unsaved.append(new_chunk)

        # Reset only after the loop: the list must not be swapped while it is
        # being iterated.
        self.unprocessed = []
        logging.debug("Unsaved chunks after chunking: %d", len(self.chunks_unsaved))

        return most_relevant

    def get_start_end_lines(self, splitted_text: list[str], start_char: int, end_char: int,
                            debug_mode: bool = False) -> tuple[int, int]:
        """Determine the lines where a chunk starts and ends (1-based indexing).

        The lines are walked while counting the characters consumed so far;
        every line accounts for its own length plus one separator character.

        TODO: invent more efficient way

        Args:
            splitted_text: original text split into lines on the newline
                character.
            start_char: index of the character where the chunk starts.
            end_char: index of the character where the chunk ends.
            debug_mode: if true, logs useful info about the process.

        Returns:
            ``(start_line, end_line)``; a value of 0 means the corresponding
            position was not found in the text.
        """
        if debug_mode:
            logging.info(splitted_text)

        start, end, char_ct = 0, 0, 0
        iter_count = 1

        for i, line in enumerate(splitted_text):
            if debug_mode:
                logging.info(
                    f"start={start_char}, current={char_ct}, end_current={char_ct + len(line) + 1}, end={end_char}, len={len(line)}, iter={iter_count}\n")

            # The upper bound is inclusive, so a position on a line boundary
            # matches two consecutive lines. For the start the later match
            # wins (no break), for the end the first match wins (break).
            if char_ct <= start_char <= char_ct + len(line) + 1:
                start = i + 1
            if char_ct <= end_char <= char_ct + len(line) + 1:
                end = i + 1
                break

            iter_count += 1
            char_ct += len(line) + 1

        if debug_mode:
            logging.info(f"result => {start} {end}\n\n\n")

        return start, end

    def update_nltk(self) -> None:
        """Download the nltk tokenizer resources.

        Note: it should be used only once to download tokenizers, further
        usage is not recommended.
        """
        nltk.download('punkt')
        nltk.download('averaged_perceptron_tagger')

    def clear_unsaved_chunks(self):
        """Forget which chunks are still unsaved.

        Recently produced chunks are kept in two lists:
            chunks - all chunks, even the ones that have not been saved to db
            chunks_unsaved - chunks that have been added recently
        chunks_unsaved exists to avoid duplicates while saving to db.
        """
        self.chunks_unsaved = []

    def get_all_chunks(self) -> list[Chunk]:
        """Return the list of all chunks produced in the current session."""
        return self.chunks

    def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        """Return the chunks that are not saved yet and mark them as handed over.

        The temporary storage is cleared so that the same chunks are not
        returned (and saved to db) twice.
        """
        # Copy the unsaved chunks, not the full history, otherwise chunks
        # that were already saved would be returned again.
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        self.clear_unsaved_chunks()
        return chunks_copy