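"""Data loading pipeline: readers for PDF, txt, docx, srt, HTML, YouTube
transcripts, and .tex URLs, plus a chunk processor that splits the loaded
documents into chunks for vector-store ingestion."""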
import json
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin

import bs4
import html2text
import PyPDF2
import pysrt
import requests
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    Docx2txtLoader,
    TextLoader,
    YoutubeLoader,
)
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai.embeddings import OpenAIEmbeddings

from modules.config.constants import TIMEOUT
from modules.dataloader.helpers import get_metadata
from modules.dataloader.pdf_readers.base import PDFReader
from modules.dataloader.pdf_readers.gpt import GPTParser
from modules.dataloader.pdf_readers.llama import LlamaParser

logger = logging.getLogger(__name__)

BASE_DIR = os.getcwd()
class HTMLReader:
    """Downloads a page, rewrites its links to absolute URLs (flagging broken
    ones), and converts the HTML to Markdown."""

    def read_url(self, url):
        response = requests.get(url, timeout=TIMEOUT)
        if response.status_code == 200:
            return response.text
        logger.warning(f"Failed to download HTML from URL: {url}")
        return None

    def check_links(self, base_url, html_content):
        soup = bs4.BeautifulSoup(html_content, "html.parser")
        for link in soup.find_all("a"):
            href = link.get("href")
            if not href or href.startswith("#"):
                # Skip empty hrefs and in-page anchors.
                continue
            if href.startswith("http://"):
                # Upgrade plain-http links to https. (A blanket
                # str.replace("http", "https") would also corrupt hrefs that
                # merely contain "http" elsewhere, e.g. in a query string.)
                href = "https://" + href[len("http://"):]

            absolute_url = urljoin(base_url, href)
            link["href"] = absolute_url

            resp = requests.head(absolute_url, timeout=TIMEOUT)
            if resp.status_code != 200:
                logger.warning(
                    f"Link {absolute_url} is broken. Status code: {resp.status_code}"
                )
        return str(soup)

    def html_to_md(self, url, html_content):
        html_processed = self.check_links(url, html_content)
        return html2text.html2text(html_processed)

    def read_html(self, url):
        html_content = self.read_url(url)
        if html_content:
            return self.html_to_md(url, html_content)
        return None
class FileReader:
    """Reads documents of various types, delegating PDFs to the configured
    backend ("llama", "gpt", or the default PDFReader)."""

    def __init__(self, logger, kind):
        self.logger = logger
        self.kind = kind
        if kind == "llama":
            self.pdf_reader = LlamaParser()
        elif kind == "gpt":
            self.pdf_reader = GPTParser()
        else:
            self.pdf_reader = PDFReader()
        self.web_reader = HTMLReader()
        self.logger.info(
            f"Initialized FileReader with {kind} PDF reader and HTML reader"
        )

    def extract_text_from_pdf(self, pdf_path):
        text = ""
        with open(pdf_path, "rb") as file:
            reader = PyPDF2.PdfReader(file)
            for page in reader.pages:
                text += page.extract_text()
        return text

    def read_pdf(self, temp_file_path: str):
        return self.pdf_reader.parse(temp_file_path)

    def read_txt(self, temp_file_path: str):
        loader = TextLoader(temp_file_path, autodetect_encoding=True)
        return loader.load()

    def read_docx(self, temp_file_path: str):
        loader = Docx2txtLoader(temp_file_path)
        return loader.load()
    def read_srt(self, temp_file_path: str):
        subs = pysrt.open(temp_file_path)
        # Join cues with spaces so consecutive subtitles don't run together.
        text = " ".join(sub.text for sub in subs)
        return [Document(page_content=text)]
    def read_youtube_transcript(self, url: str):
        loader = YoutubeLoader.from_youtube_url(
            url, add_video_info=True, language=["en"], translation="en"
        )
        return loader.load()

    def read_html(self, url: str):
        return [Document(page_content=self.web_reader.read_html(url))]

    def read_tex_from_url(self, tex_url):
        response = requests.get(tex_url, timeout=TIMEOUT)
        if response.status_code == 200:
            return [Document(page_content=response.text)]
        self.logger.error(f"Failed to fetch .tex file from URL: {tex_url}")
        return None
class ChunkProcessor:
    """Splits loaded documents into chunks and logs the extracted content and
    metadata to JSON files under the configured log_chunk_dir."""

    def __init__(self, config, logger):
        self.config = config
        self.logger = logger

        self.document_data = {}
        self.document_metadata = {}
        self.document_chunks_full = []

        # TODO: Fix when reparse_files is False
        if not config["vectorstore"]["reparse_files"]:
            self.load_document_data()

        if config["splitter_options"]["use_splitter"]:
            if config["splitter_options"]["chunking_mode"] == "fixed":
                if config["splitter_options"]["split_by_token"]:
                    self.splitter = (
                        RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                            chunk_size=config["splitter_options"]["chunk_size"],
                            chunk_overlap=config["splitter_options"]["chunk_overlap"],
                            separators=config["splitter_options"]["chunk_separators"],
                            disallowed_special=(),
                        )
                    )
                else:
                    # disallowed_special is only accepted by the tiktoken-based
                    # factory above, so it is not passed to the plain constructor.
                    self.splitter = RecursiveCharacterTextSplitter(
                        chunk_size=config["splitter_options"]["chunk_size"],
                        chunk_overlap=config["splitter_options"]["chunk_overlap"],
                        separators=config["splitter_options"]["chunk_separators"],
                    )
            else:
                self.splitter = SemanticChunker(
                    OpenAIEmbeddings(), breakpoint_threshold_type="percentile"
                )
        else:
            self.splitter = None
        self.logger.info("ChunkProcessor instance created")
    def remove_delimiters(self, document_chunks: list):
        for chunk in document_chunks:
            for delimiter in self.config["splitter_options"]["delimiters_to_remove"]:
                chunk.page_content = re.sub(delimiter, " ", chunk.page_content)
        return document_chunks

    def remove_chunks(self, document_chunks: list):
        front = self.config["splitter_options"]["front_chunk_to_remove"]
        end = self.config["splitter_options"]["last_chunks_to_remove"]
        for _ in range(front):
            del document_chunks[0]
        for _ in range(end):
            document_chunks.pop()
        return document_chunks
    def process_chunks(
        self, documents, file_type="txt", source="", page=0, metadata=None
    ):
        # TODO: Clear up this pipeline of re-adding metadata
        # Use None instead of a mutable default argument for metadata.
        metadata = metadata or {}
        documents = [
            Document(page_content=documents, metadata={"source": source, "page": page})
        ]
        if (
            file_type == "pdf"
            and self.config["splitter_options"]["chunking_mode"] == "fixed"
        ):
            # In fixed mode, PDF pages are kept whole rather than re-split.
            document_chunks = documents
        else:
            document_chunks = self.splitter.split_documents(documents)

        for chunk in document_chunks:
            # Add the source and page number back to the metadata, along with
            # the metadata extracted from the document.
            chunk.metadata["source"] = source
            chunk.metadata["page"] = page
            for key, value in metadata.items():
                chunk.metadata[key] = value

        if self.config["splitter_options"]["remove_leftover_delimiters"]:
            document_chunks = self.remove_delimiters(document_chunks)
        if self.config["splitter_options"]["remove_chunks"]:
            document_chunks = self.remove_chunks(document_chunks)
        return document_chunks
    def chunk_docs(self, file_reader, uploaded_files, weblinks):
        addl_metadata = get_metadata(
            *self.config["metadata"]["metadata_links"], self.config
        )  # For any additional metadata

        # Skip already-processed documents when reparse_files is False.
        if not self.config["vectorstore"]["reparse_files"]:
            total_documents = len(uploaded_files) + len(weblinks)
            uploaded_files = [
                file_path
                for file_path in uploaded_files
                if file_path not in self.document_data
            ]
            weblinks = [link for link in weblinks if link not in self.document_data]
            self.logger.info(
                f"Total documents to process: {total_documents}, "
                f"documents already processed: "
                f"{total_documents - len(uploaded_files) - len(weblinks)}"
            )

        with ThreadPoolExecutor() as executor:
            executor.map(
                self.process_file,
                uploaded_files,
                range(len(uploaded_files)),
                [file_reader] * len(uploaded_files),
                [addl_metadata] * len(uploaded_files),
            )
            executor.map(
                self.process_weblink,
                weblinks,
                range(len(weblinks)),
                [file_reader] * len(weblinks),
                [addl_metadata] * len(weblinks),
            )

        document_names = [
            f"{file_name}_{page_num}"
            for file_name, pages in self.document_data.items()
            for page_num in pages.keys()
        ]
        documents = [
            page for doc in self.document_data.values() for page in doc.values()
        ]
        document_metadata = [
            page for doc in self.document_metadata.values() for page in doc.values()
        ]

        self.save_document_data()
        self.logger.info(
            f"Total document chunks extracted: {len(self.document_chunks_full)}"
        )
        return self.document_chunks_full, document_names, documents, document_metadata
    def process_documents(
        self, documents, file_path, file_type, metadata_source, addl_metadata
    ):
        file_data = {}
        file_metadata = {}

        for doc in documents:
            # if len(doc.page_content) <= 400:  # better approach to filter out non-informative documents
            #     continue
            page_num = doc.metadata.get("page", 0)
            file_data[page_num] = doc.page_content

            # Create a new dictionary for metadata in each iteration.
            metadata = addl_metadata.get(file_path, {}).copy()
            metadata["page"] = page_num
            metadata["source"] = file_path
            file_metadata[page_num] = metadata

            # Chunking is skipped when the vector store is RAGatouille.
            if self.config["vectorstore"]["db_option"] not in ["RAGatouille"]:
                document_chunks = self.process_chunks(
                    doc.page_content,
                    file_type,
                    source=file_path,
                    page=page_num,
                    metadata=metadata,
                )
                self.document_chunks_full.extend(document_chunks)

        self.document_data[file_path] = file_data
        self.document_metadata[file_path] = file_metadata
    def process_file(self, file_path, file_index, file_reader, addl_metadata):
        self.logger.info(f"Processing file {file_index + 1}: {file_path}")
        file_name = os.path.basename(file_path)
        file_type = file_name.split(".")[-1].lower()

        read_methods = {
            "pdf": file_reader.read_pdf,
            "txt": file_reader.read_txt,
            "docx": file_reader.read_docx,
            "srt": file_reader.read_srt,
            "tex": file_reader.read_tex_from_url,
        }
        if file_type not in read_methods:
            self.logger.warning(f"Unsupported file type: {file_type}")
            return

        try:
            if file_path in self.document_data:
                # Reuse previously extracted content instead of re-reading.
                self.logger.warning(f"File {file_name} already processed")
                documents = [
                    Document(page_content=content)
                    for content in self.document_data[file_path].values()
                ]
            else:
                documents = read_methods[file_type](file_path)

            self.process_documents(
                documents, file_path, file_type, "file", addl_metadata
            )
        except Exception as e:
            self.logger.error(f"Error processing file {file_name}: {str(e)}")

    def process_weblink(self, link, link_index, file_reader, addl_metadata):
        if link in self.document_data:
            return

        self.logger.info(f"Reading link {link_index + 1}: {link}")
        try:
            if "youtube" in link:
                documents = file_reader.read_youtube_transcript(link)
            else:
                documents = file_reader.read_html(link)
            self.process_documents(documents, link, "txt", "link", addl_metadata)
        except Exception as e:
            self.logger.error(
                f"Error reading link {link_index + 1} ({link}): {str(e)}"
            )
    def save_document_data(self):
        docs_dir = f"{self.config['log_chunk_dir']}/docs"
        metadata_dir = f"{self.config['log_chunk_dir']}/metadata"

        if not os.path.exists(docs_dir):
            os.makedirs(docs_dir)
            self.logger.info(f"Creating directory {docs_dir} for document data")
        if not os.path.exists(metadata_dir):
            os.makedirs(metadata_dir)
            self.logger.info(
                f"Creating directory {metadata_dir} for document metadata"
            )

        self.logger.info(f"Saving document content to {docs_dir}/doc_content.json")
        with open(f"{docs_dir}/doc_content.json", "w") as json_file:
            json.dump(self.document_data, json_file, indent=4)

        self.logger.info(
            f"Saving document metadata to {metadata_dir}/doc_metadata.json"
        )
        with open(f"{metadata_dir}/doc_metadata.json", "w") as json_file:
            json.dump(self.document_metadata, json_file, indent=4)
    def load_document_data(self):
        try:
            with open(
                f"{self.config['log_chunk_dir']}/docs/doc_content.json", "r"
            ) as json_file:
                self.document_data = json.load(json_file)
            with open(
                f"{self.config['log_chunk_dir']}/metadata/doc_metadata.json", "r"
            ) as json_file:
                self.document_metadata = json.load(json_file)
            self.logger.info(
                f"Loaded document content from "
                f"{self.config['log_chunk_dir']}/docs/doc_content.json. "
                f"Total documents: {len(self.document_data)}"
            )
        except FileNotFoundError:
            self.logger.warning(
                f"Document content not found in "
                f"{self.config['log_chunk_dir']}/docs/doc_content.json"
            )
            self.document_data = {}
            self.document_metadata = {}
class DataLoader:
    """Facade that wires a FileReader and a ChunkProcessor together."""

    def __init__(self, config, logger=None):
        self.file_reader = FileReader(
            logger=logger, kind=config["llm_params"]["pdf_reader"]
        )
        self.chunk_processor = ChunkProcessor(config, logger=logger)

    def get_chunks(self, uploaded_files, weblinks):
        return self.chunk_processor.chunk_docs(
            self.file_reader, uploaded_files, weblinks
        )
if __name__ == "__main__":
    import argparse

    import yaml

    parser = argparse.ArgumentParser(description="Process some links.")
    parser.add_argument(
        "--links", nargs="+", required=True, help="List of links to process."
    )
    parser.add_argument(
        "--config_file", type=str, help="Path to the main config file", required=True
    )
    parser.add_argument(
        "--project_config_file",
        type=str,
        help="Path to the project config file",
        required=True,
    )
    args = parser.parse_args()

    links_to_process = args.links

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    with open(args.config_file, "r") as f:
        config = yaml.safe_load(f)
    with open(args.project_config_file, "r") as f:
        project_config = yaml.safe_load(f)

    # Merge the project config into the main config.
    config.update(project_config)

    STORAGE_DIR = os.path.join(BASE_DIR, config["vectorstore"]["data_path"])
    uploaded_files = [
        os.path.join(STORAGE_DIR, file)
        for file in os.listdir(STORAGE_DIR)
        if file != "urls.txt"
    ]

    data_loader = DataLoader(config, logger=logger)

    # Just for testing: the CLI links are passed through the uploaded-files
    # path (direct .tex URLs, for example, are read by read_tex_from_url).
    (
        document_chunks,
        document_names,
        documents,
        document_metadata,
    ) = data_loader.get_chunks(
        links_to_process,
        [],
    )

    print(document_names[:5])
    print(len(document_chunks))
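# Example invocation (the file names here are hypothetical):
#   python data_loader.py \
#       --links https://example.com/notes.tex \
#       --config_file config.yml \
#       --project_config_file project_config.yml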