from langchain_community.document_loaders import (
    UnstructuredWordDocumentLoader,
    TextLoader,
    CSVLoader,
    PyPDFLoader,
    UnstructuredMarkdownLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.core.chunks import Chunk
import nltk  # used to download tokenizer data (see update_nltk below)
from uuid import uuid4  # uuid4 generates ids from pseudo-random numbers, unlike uuid1 and others
import numpy as np
from app.settings import logging, settings
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any  # needed for the split_into_groups annotation
import os
import fitz

class PDFLoader:
    """Alternative PDF loader built on PyMuPDF (fitz); yields one Document per page."""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self) -> list[Document]:
        docs = []
        with fitz.open(self.file_path) as doc:
            for page in doc:
                text = page.get_text("text")
                metadata = {
                    "source": self.file_path,
                    "page": page.number,  # 0-indexed page number
                }
                docs.append(Document(page_content=text, metadata=metadata))
        return docs
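
# Usage sketch for the PyMuPDF-based loader above (hypothetical path; note that
# DocumentProcessor currently selects PyPDFLoader for .pdf files, not this class):
#
#     pages = PDFLoader("reports/annual.pdf").load()
#     print(pages[0].metadata)   # {'source': 'reports/annual.pdf', 'page': 0}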

class DocumentProcessor:
    """
    TODO: determine the most suitable chunk size
    chunks_unsaved -> the list of recently added chunks that have not been saved to db yet
    unprocessed -> the list of loaded documents that have not been split into chunks yet
    max_workers -> the number of worker processes used for parallel chunking
    text_splitter -> text splitting strategy
    """

    def __init__(self):
        self.chunks_unsaved: list[Chunk] = []
        self.unprocessed: list[Document] = []
        self.max_workers = min(4, os.cpu_count() or 1)
        self.text_splitter = RecursiveCharacterTextSplitter(
            **settings.text_splitter.model_dump()
        )
| """ | |
| Measures cosine between two vectors | |
| """ | |
| def cosine_similarity(self, vec1, vec2): | |
| return vec1 @ vec2 / (np.linalg.norm(vec1) * np.linalg.norm(vec2)) | |
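
    # Quick sanity check (hypothetical vectors; `dp` stands for a DocumentProcessor
    # instance; the result ranges from -1.0 to 1.0):
    #
    #     dp.cosine_similarity(np.array([1.0, 0.0]), np.array([1.0, 0.0]))   # -> 1.0
    #     dp.cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 1.0]))   # -> 0.0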
| """ | |
| Updates a list of the most relevant chunks without interacting with db | |
| """ | |
| def update_most_relevant_chunk( | |
| self, | |
| chunk: list[np.float64, Chunk], | |
| relevant_chunks: list[list[np.float64, Chunk]], | |
| mx_len=15, | |
| ): | |
| relevant_chunks.append(chunk) | |
| for i in range(len(relevant_chunks) - 1, 0, -1): | |
| if relevant_chunks[i][0] > relevant_chunks[i - 1][0]: | |
| relevant_chunks[i], relevant_chunks[i - 1] = ( | |
| relevant_chunks[i - 1], | |
| relevant_chunks[i], | |
| ) | |
| else: | |
| break | |
| if len(relevant_chunks) > mx_len: | |
| del relevant_chunks[-1] | |
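
    # Example of the top-k maintenance above (hypothetical scores and chunks; the
    # list stays sorted in descending order and is capped at mx_len entries):
    #
    #     relevant = [[0.9, chunk_a], [0.5, chunk_b]]
    #     dp.update_most_relevant_chunk([0.7, chunk_c], relevant)
    #     # relevant is now [[0.9, chunk_a], [0.7, chunk_c], [0.5, chunk_b]]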
| """ | |
| Loads one file - extracts text from file | |
| TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader | |
| TODO: Play with .pdf and text from img extraction | |
| TODO: Try chunking with llm | |
| add_to_unprocessed -> used to add loaded file to the list of unprocessed(unchunked) files if true | |
| """ | |
| def check_size(self, file_path: str = "") -> bool: | |
| try: | |
| size = os.path.getsize(filename=file_path) | |
| except Exception: | |
| size = 0 | |
| if size > 1000000: | |
| return True | |
| return False | |

    def document_multiplexer(
        self,
        filepath: str,
        get_loader: bool = False,
        get_chunking_strategy: bool = False,
    ):
        """Picks a loader or a chunking strategy based on the file extension."""
        loader = None
        if filepath.endswith(".pdf"):
            # PyPDFLoader loads each page as a separate Document
            loader = PyPDFLoader(file_path=filepath)
        elif filepath.endswith((".docx", ".doc")):
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".csv"):
            loader = CSVLoader(file_path=filepath)
        elif filepath.endswith(".json"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".md"):
            loader = UnstructuredMarkdownLoader(file_path=filepath)
        if filepath.endswith(".pdf"):
            # PDFs are already split per page, so parallel chunking brings little benefit
            parallelization = False
        else:
            parallelization = self.check_size(file_path=filepath)
        if get_loader:
            return loader
        elif get_chunking_strategy:
            return parallelization
        else:
            raise RuntimeError(
                "document_multiplexer requires get_loader or get_chunking_strategy to be set"
            )
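
    # Usage sketch for the multiplexer above (hypothetical paths):
    #
    #     loader = dp.document_multiplexer(filepath="notes.md", get_loader=True)
    #     parallel = dp.document_multiplexer(filepath="big.txt", get_chunking_strategy=True)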

    def load_document(
        self, filepath: str, add_to_unprocessed: bool = False
    ) -> list[Document]:
        """
        Loads one file - extracts its text.
        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and extracting text from images
        TODO: Try chunking with an LLM
        add_to_unprocessed -> if true, the loaded documents are queued as unprocessed
        (unchunked) and chunked immediately
        """
        loader = self.document_multiplexer(filepath=filepath, get_loader=True)
        if loader is None:
            raise RuntimeError("Unsupported type of file")
        # A single file may yield several Documents (e.g. one per PDF page)
        documents: list[Document] = []
        try:
            documents = loader.load()
        except Exception as e:
            raise RuntimeError("File is corrupted") from e
        if add_to_unprocessed:
            for doc in documents:
                self.unprocessed.append(doc)
            strategy = self.document_multiplexer(
                filepath=filepath, get_chunking_strategy=True
            )
            logging.debug("Chunking strategy (parallel=%s) for %s", strategy, filepath)
            self.generate_chunks(parallelization=strategy)
        return documents
| """ | |
| Similar to load_document, but for multiple files | |
| add_to_unprocessed -> used to add loaded files to the list of unprocessed(unchunked) files if true | |
| """ | |
| def load_documents( | |
| self, documents: list[str], add_to_unprocessed: bool = False | |
| ) -> list[Document]: | |
| extracted_documents: list[Document] = [] | |
| for doc in documents: | |
| temp_storage: list[Document] = [] | |
| try: | |
| temp_storage = self.load_document( | |
| filepath=doc, add_to_unprocessed=True | |
| ) | |
| except Exception as e: | |
| logging.error( | |
| "Error at load_documents while loading %s", doc, exc_info=e | |
| ) | |
| continue | |
| for extrc_doc in temp_storage: | |
| extracted_documents.append(extrc_doc) | |
| if add_to_unprocessed: | |
| self.unprocessed.append(extrc_doc) | |
| return extracted_documents | |

    def split_into_groups(
        self, original_list: list[Any], split_by: int = 15
    ) -> list[list[Any]]:
        """Splits a list into consecutive groups of at most split_by items."""
        output = []
        for i in range(0, len(original_list), split_by):
            output.append(original_list[i : i + split_by])
        return output
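
    # Example (hypothetical list); groups preserve the original order:
    #
    #     dp.split_into_groups([1, 2, 3, 4, 5], split_by=2)   # -> [[1, 2], [3, 4], [5]]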

    def _chunkinize(
        self, document: Document, text: list[Document], lines: list[dict]
    ) -> list[Chunk]:
        """Converts split fragments of `document` into Chunk objects with line spans."""
        output: list[Chunk] = []
        for chunk in text:
            start_l, end_l = self.get_start_end_lines(
                splitted_text=lines,
                start_char=chunk.metadata.get("start_index", 0),
                end_char=chunk.metadata.get("start_index", 0)
                + len(chunk.page_content),
            )
            new_chunk = Chunk(
                id=uuid4(),
                filename=document.metadata.get("source", ""),
                page_number=document.metadata.get("page", 0),
                start_index=chunk.metadata.get("start_index", 0),
                start_line=start_l,
                end_line=end_l,
                text=chunk.page_content,
            )
            output.append(new_chunk)
        return output

    def precompute_lines(self, splitted_document: list[str]) -> list[dict]:
        """Precomputes the character span of each line; the +1 accounts for the newline."""
        current_start = 0
        output: list[dict] = []
        for i, line in enumerate(splitted_document):
            output.append(
                {
                    "id": i + 1,
                    "start": current_start,
                    "end": current_start + len(line) + 1,
                    "text": line,
                }
            )
            current_start += len(line) + 1
        return output
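
    # Example of the precomputed spans (hypothetical text; `end` includes the
    # implicit newline, so the spans tile the character range without gaps):
    #
    #     dp.precompute_lines(["ab", "c"])
    #     # -> [{'id': 1, 'start': 0, 'end': 3, 'text': 'ab'},
    #     #     {'id': 2, 'start': 3, 'end': 5, 'text': 'c'}]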

    def generate_chunks(self, parallelization: bool = True):
        """Chunks every queued document, optionally fanning groups out to worker processes."""
        intermediate = []
        for document in self.unprocessed:
            text: list[Document] = self.text_splitter.split_documents(
                documents=[document]
            )
            lines: list[dict] = self.precompute_lines(
                splitted_document=document.page_content.splitlines()
            )
            groups = self.split_into_groups(original_list=text, split_by=50)
            if parallelization:
                logging.debug("Applying parallel chunk generation")
                with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
                    futures = [
                        executor.submit(self._chunkinize, document, group, lines)
                        for group in groups
                    ]
                    for future in as_completed(futures):
                        intermediate.append(future.result())
            else:
                intermediate.append(
                    self._chunkinize(document=document, text=text, lines=lines)
                )
        for group in intermediate:
            for chunk in group:
                self.chunks_unsaved.append(chunk)
        self.unprocessed = []

    def find_line(self, splitted_text: list[dict], char) -> int:
        """
        Binary-searches the precomputed line spans for the line containing `char`.
        Returns a 1-indexed line number.
        """
        l, r = 0, len(splitted_text) - 1
        while l <= r:
            m = (l + r) // 2
            line = splitted_text[m]
            if line["start"] <= char < line["end"]:
                return m + 1
            elif char < line["start"]:
                r = m - 1
            else:
                l = m + 1
        return r + 1  # clamp to the nearest line, keeping the result 1-indexed
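
    # Example of the binary search above, reusing the precompute_lines output
    # (hypothetical text; line numbers are 1-indexed):
    #
    #     lines = dp.precompute_lines(["ab", "c"])
    #     dp.find_line(lines, char=1)   # -> 1 (inside "ab")
    #     dp.find_line(lines, char=3)   # -> 2 (inside "c")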

    def get_start_end_lines(
        self,
        splitted_text: list[dict],
        start_char: int,
        end_char: int,
    ) -> tuple[int, int]:
        """Maps a character span to its (start_line, end_line) pair."""
        start = self.find_line(splitted_text=splitted_text, char=start_char)
        end = self.find_line(splitted_text=splitted_text, char=end_char)
        return (start, end)
| """ | |
| Note: it should be used only once to download tokenizers, futher usage is not recommended | |
| """ | |
| def update_nltk(self) -> None: | |
| nltk.download("punkt") | |
| nltk.download("averaged_perceptron_tagger") | |
| """ | |
| For now the system works as follows: we save recently loaded chunks in two arrays: | |
| chunks - for all chunks, even for that ones that havn't been saveed to db | |
| chunks_unsaved - for chunks that have been added recently | |
| I do not know weather we really need to store all chunks that were added in the | |
| current session, but chunks_unsaved are used to avoid dublications while saving to db. | |
| """ | |
| def get_and_save_unsaved_chunks(self) -> list[Chunk]: | |
| chunks_copy: list[Chunk] = self.chunks_unsaved.copy() | |
| self.clear_unsaved_chunks() | |
| return chunks_copy | |
| def clear_unsaved_chunks(self): | |
| self.chunks_unsaved = [] | |
| def get_all_chunks(self) -> list[Chunk]: | |
| return self.chunks_unsaved | |
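

if __name__ == "__main__":
    # Minimal end-to-end sketch, assuming app.settings is configured; the file
    # path below is hypothetical:
    processor = DocumentProcessor()
    processor.load_document(filepath="docs/example.txt", add_to_unprocessed=True)
    for chunk in processor.get_and_save_unsaved_chunks():
        print(chunk.filename, chunk.start_line, chunk.end_line)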