import re
import logging
from tempfile import NamedTemporaryFile

import pysrt
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import (
    PyMuPDFLoader,
    Docx2txtLoader,
    YoutubeLoader,
    WebBaseLoader,
    TextLoader,
)
from langchain.schema import Document

logger = logging.getLogger(__name__)


class DataLoader:
    def __init__(self, config):
        """
        Class for handling all data extraction and chunking.

        Inputs:
            config - dictionary from the YAML file, containing all important parameters
        """
        self.config = config
        self.remove_leftover_delimiters = config["splitter_options"][
            "remove_leftover_delimiters"
        ]

        # Main list of all documents
        self.document_chunks_full = []
        self.document_names = []

        if config["splitter_options"]["use_splitter"]:
            if config["splitter_options"]["split_by_token"]:
                self.splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                    chunk_size=config["splitter_options"]["chunk_size"],
                    chunk_overlap=config["splitter_options"]["chunk_overlap"],
                    separators=config["splitter_options"]["chunk_separators"],
                )
            else:
                self.splitter = RecursiveCharacterTextSplitter(
                    chunk_size=config["splitter_options"]["chunk_size"],
                    chunk_overlap=config["splitter_options"]["chunk_overlap"],
                    separators=config["splitter_options"]["chunk_separators"],
                )
        else:
            self.splitter = None
        logger.info("DataLoader instance created")

    def get_chunks(self, uploaded_files, weblinks):
        # Main list of all documents
        self.document_chunks_full = []
        self.document_names = []

        def remove_delimiters(document_chunks: list):
            """Helper function to remove remaining delimiters in document chunks."""
            for chunk in document_chunks:
                for delimiter in self.config["splitter_options"][
                    "delimiters_to_remove"
                ]:
                    chunk.page_content = re.sub(delimiter, " ", chunk.page_content)
            return document_chunks

        def remove_chunks(document_chunks: list):
            """Helper function to remove any unwanted document chunks after splitting."""
            front = self.config["splitter_options"]["front_chunk_to_remove"]
            end = self.config["splitter_options"]["last_chunks_to_remove"]
            # Remove pages from the front and back of the document
            for _ in range(front):
                del document_chunks[0]
            for _ in range(end):
                document_chunks.pop()
            logger.info(f"\tNumber of pages after skipping: {len(document_chunks)}")
            return document_chunks

        def get_pdf(temp_file_path: str, title: str):
            """Process PDF files."""
            loader = PyMuPDFLoader(temp_file_path)  # This loader preserves more metadata
            if self.splitter:
                document_chunks = self.splitter.split_documents(loader.load())
            else:
                document_chunks = loader.load()
            # Prefer the embedded PDF title over the file name when available
            if "title" in document_chunks[0].metadata.keys():
                title = document_chunks[0].metadata["title"]
            logger.info(
                f"\t\tOriginal no. of pages: {document_chunks[0].metadata['total_pages']}"
            )
            return title, document_chunks

        def get_txt(temp_file_path: str, title: str):
            """Process TXT files."""
            loader = TextLoader(temp_file_path, autodetect_encoding=True)
            if self.splitter:
                document_chunks = self.splitter.split_documents(loader.load())
            else:
                document_chunks = loader.load()
            # Update the metadata
            for chunk in document_chunks:
                chunk.metadata["source"] = title
                chunk.metadata["page"] = "N/A"
            return title, document_chunks

        def get_srt(temp_file_path: str, title: str):
            """Process SRT subtitle files."""
            subs = pysrt.open(temp_file_path)
            text = ""
            for sub in subs:
                text += sub.text
            document_chunks = [Document(page_content=text)]
            if self.splitter:
                document_chunks = self.splitter.split_documents(document_chunks)
            # Update the metadata
            for chunk in document_chunks:
                chunk.metadata["source"] = title
                chunk.metadata["page"] = "N/A"
            return title, document_chunks

        def get_docx(temp_file_path: str, title: str):
            """Process DOCX files."""
            loader = Docx2txtLoader(temp_file_path)
            if self.splitter:
                document_chunks = self.splitter.split_documents(loader.load())
            else:
                document_chunks = loader.load()
            # Update the metadata
            for chunk in document_chunks:
                chunk.metadata["source"] = title
                chunk.metadata["page"] = "N/A"
            return title, document_chunks

        def get_youtube_transcript(url: str):
            """Retrieve a YouTube transcript and process the text."""
            loader = YoutubeLoader.from_youtube_url(
                url, add_video_info=True, language=["en"], translation="en"
            )
            if self.splitter:
                document_chunks = self.splitter.split_documents(loader.load())
            else:
                document_chunks = loader.load_and_split()
            # Use the video title as the document title; without this, `title`
            # would be an undefined free variable in this function
            title = document_chunks[0].metadata["title"]
            # Replace the source with the title (for display in the st UI later)
            for chunk in document_chunks:
                chunk.metadata["source"] = chunk.metadata["title"]
                logger.info(chunk.metadata["title"])
            return title, document_chunks

        def get_html(url: str):
            """Process websites via their HTML content."""
            loader = WebBaseLoader(url)
            if self.splitter:
                document_chunks = self.splitter.split_documents(loader.load())
            else:
                document_chunks = loader.load_and_split()
            title = document_chunks[0].metadata["title"]
            logger.info(document_chunks[0].metadata)
            return title, document_chunks

        # Handle file by file
        for file_index, file_path in enumerate(uploaded_files):
            file_name = file_path.split("/")[-1]
            file_type = file_name.split(".")[-1]

            # Handle different file types
            if file_type == "pdf":
                title, document_chunks = get_pdf(file_path, file_name)
            elif file_type == "txt":
                title, document_chunks = get_txt(file_path, file_name)
            elif file_type == "docx":
                title, document_chunks = get_docx(file_path, file_name)
            elif file_type == "srt":
                title, document_chunks = get_srt(file_path, file_name)
            else:
                # Skip unsupported file types instead of silently reusing
                # the previous iteration's title and chunks
                logger.info(f"\tUnsupported file type, skipping: {file_name}")
                continue

            # Additional wrangling - remove leftover delimiters and any specified chunks
            if self.remove_leftover_delimiters:
                document_chunks = remove_delimiters(document_chunks)
            if self.config["splitter_options"]["remove_chunks"]:
                document_chunks = remove_chunks(document_chunks)

            logger.info(f"\t\tExtracted no. of chunks: {len(document_chunks)}")
            self.document_names.append(title)
            self.document_chunks_full.extend(document_chunks)

        # Handle weblinks (YouTube links and regular websites)
        if weblinks[0] != "":
            logger.info(f"Splitting weblinks: total of {len(weblinks)}")

            # Handle link by link
            for link_index, link in enumerate(weblinks):
                try:
                    logger.info(f"\tSplitting link {link_index + 1} : {link}")
                    if "youtube" in link:
                        title, document_chunks = get_youtube_transcript(link)
                    else:
                        title, document_chunks = get_html(link)

                    # Additional wrangling - remove leftover delimiters and any specified chunks
                    if self.remove_leftover_delimiters:
                        document_chunks = remove_delimiters(document_chunks)
                    if self.config["splitter_options"]["remove_chunks"]:
                        document_chunks = remove_chunks(document_chunks)

                    logger.info(f"\t\tExtracted no. of chunks: {len(document_chunks)}")
                    self.document_names.append(title)
                    self.document_chunks_full.extend(document_chunks)
                except Exception as e:
                    logger.info(
                        f"\t\tError splitting link {link_index + 1} : {link} ({e})"
                    )

        logger.info(
            f"\tNumber of document chunks extracted in total: {len(self.document_chunks_full)}\n\n"
        )
        return self.document_chunks_full, self.document_names
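

# A minimal usage sketch, not part of the original module: it shows how DataLoader
# might be driven end to end. The config dictionary below is a hypothetical example
# assembled from the "splitter_options" keys the class reads above; in practice the
# values come from the project's YAML config file, and the file path is a placeholder.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    example_config = {
        "splitter_options": {
            "use_splitter": True,
            "split_by_token": True,
            "chunk_size": 1000,
            "chunk_overlap": 100,
            "chunk_separators": ["\n\n", "\n", " ", ""],
            "remove_leftover_delimiters": True,
            "delimiters_to_remove": [r"\n", r"\t"],
            "remove_chunks": False,
            "front_chunk_to_remove": 0,
            "last_chunks_to_remove": 0,
        }
    }

    data_loader = DataLoader(example_config)
    # "docs/sample.pdf" is a placeholder path; [""] is the "no weblinks" sentinel
    # that matches the weblinks[0] != "" check in get_chunks above.
    chunks, names = data_loader.get_chunks(["docs/sample.pdf"], [""])
    logger.info(f"Loaded {len(chunks)} chunks from documents: {names}")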