import streamlit as st
from pypdf import PdfReader
import os
from pathlib import Path
from dotenv import load_dotenv
import pickle
import timeit
import datetime
import shutil
import zipfile
from collections import defaultdict

from PIL import Image
import pandas as pd

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.document_loaders import (
    PyPDFLoader,
    DirectoryLoader,
    PyPDFDirectoryLoader,
    UnstructuredHTMLLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.prompts.prompt import PromptTemplate
from langchain.vectorstores import Chroma
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain.agents.agent_toolkits import create_conversational_retrieval_agent
from langchain.utilities import SerpAPIWrapper
from langchain.agents import Tool, load_tools

# Load environment variables (e.g. OPENAI_API_KEY, SERPAPI_API_KEY) from .env
load_dotenv()

current_timestamp = datetime.datetime.now()
timestamp_string = current_timestamp.strftime("%Y-%m-%d %H:%M:%S")
def build_llm():
    '''
    Load the OpenAI chat model used to answer questions.
    '''
    # llm = OpenAI(temperature=0.2)
    llm = ChatOpenAI(temperature=0, max_tokens=256)
    return llm
def build_embedding_model():
    '''
    Load the sentence-transformers model used for text embedding.
    '''
    embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2',
                                       model_kwargs={'device': 'cpu'})
    return embeddings
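
# A minimal usage sketch (illustrative only, not called elsewhere in this file):
# embedding a single query with the model above. all-MiniLM-L6-v2 produces
# 384-dimensional vectors.
#
#   embeddings = build_embedding_model()
#   query_vector = embeddings.embed_query("How do I apply for retirement benefits?")
#   print(len(query_vector))  # -> 384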
def unzip_opm():
    '''
    Unzip the OPM document archive next to the ZIP file and return the
    extraction folder path.
    '''
    # Path to the ZIP file containing the OPM documents
    zip_file_path = r'OPM_Files/OPM_Retirement_backup-20230902T130906Z-001.zip'
    # Extract into a folder named after the ZIP file (without the .zip
    # extension), created alongside the archive
    extract_path = os.path.dirname(zip_file_path)
    extract_folder = os.path.splitext(os.path.basename(zip_file_path))[0]
    extract_folder_path = os.path.join(extract_path, extract_folder)
    if not os.path.exists(extract_folder_path):
        os.makedirs(extract_folder_path)
    # Extract all the contents into the created folder
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_folder_path)
    print(f'Unzipped {zip_file_path} to {extract_folder_path}')
    return extract_folder_path
def count_files_by_type(folder_path):
    '''
    Count files by file type (extension) in the specified folder, recursively.
    '''
    file_count_by_type = defaultdict(int)
    for root, _, files in os.walk(folder_path):
        for file in files:
            _, extension = os.path.splitext(file)
            file_count_by_type[extension] += 1
    return file_count_by_type
def generate_file_count_table(file_count_by_type):
    '''
    Generate a table of file counts per file type.
    '''
    data = {"File Type": [], "Number of Files": []}
    for extension, count in file_count_by_type.items():
        data["File Type"].append(extension)
        data["Number of Files"].append(count)
    df = pd.DataFrame(data)
    df = df.sort_values(by="Number of Files", ascending=False)  # Sort by number of files
    return df
def move_files_to_folders(folder_path):
    '''
    Copy files into per-type folders: PDF documents into "PDFs" and HTML
    documents into "HTMLs" (created in the current working directory).
    Other file types are skipped.
    '''
    for root, _, files in os.walk(folder_path):
        for file in files:
            _, extension = os.path.splitext(file)
            source_path = os.path.join(root, file)
            if extension == '.pdf':
                dest_folder = "PDFs"
            elif extension == '.html':
                dest_folder = "HTMLs"
            else:
                continue
            # Copy (rather than move) so the original tree stays intact
            dest_path = os.path.join(dest_folder, file)
            os.makedirs(dest_folder, exist_ok=True)
            shutil.copy(source_path, dest_path)
def load_vectorstore(persist_directory, embeddings):
    '''
    Try to load the Chroma database from disk first. If it does not exist,
    do the following:
    1) Load the PDFs
    2) Create text chunks
    3) Index the chunks and store them in a Chroma DB
    4) Perform the same for the HTML files
    5) Persist the final Chroma DB to disk
    '''
    if os.path.exists(persist_directory):
        print("Using existing vector store for these documents.")
        vectorstore = Chroma(persist_directory=persist_directory, embedding_function=embeddings)
        print("Chroma DB loaded from the disk")
        return vectorstore
    else:
        folder_path = unzip_opm()
        print("Vector store is not available. Creating a new one.")
        file_count_by_type = count_files_by_type(folder_path)
        file_count_table = generate_file_count_table(file_count_by_type)
        print("File Count Table:")
        print(file_count_table)
        # Copy files into their respective folders
        move_files_to_folders(folder_path)
        print("PDF and HTML files copied to separate folders.")
        # Load the PDF files from the PDF folder in order to create the new Chroma DB
        pdf_folder_path = f"{folder_path}/PDFs"
        html_folder_path = f"{folder_path}/HTMLs"
        pdf_dir_loader = PyPDFDirectoryLoader(pdf_folder_path)
        pdf_pages = pdf_dir_loader.load()
        print("PDF files are loaded from the folder.")
        # Load the HTML files from the HTML folder in order to create the new Chroma DB
        HTML_docs_path_list = [os.path.join(html_folder_path, f) for f in os.listdir(html_folder_path) if os.path.isfile(os.path.join(html_folder_path, f))]
        html_loaders = [UnstructuredHTMLLoader(html_file) for html_file in HTML_docs_path_list]
        html_pages = []
        docs_cannot_load = []
        for loader in html_loaders:
            try:
                html_pages.extend(loader.load())
            except Exception:
                print("Cannot load the file:", loader)
                docs_cannot_load.append(loader)
        print("HTML files are loaded from the folder.")
        # Split the PDF docs into overlapping character chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            is_separator_regex=False,
        )
        pdf_texts = text_splitter.transform_documents(pdf_pages)
        # Split the HTML docs the same way
        html_texts = text_splitter.transform_documents(html_pages)
        # Merge all the text chunks (HTML + PDF)
        all_texts = pdf_texts + html_texts
        print("PDF and HTML docs are split into chunks; created a final list representing all the chunks.")
        # Embed all the text chunks and store them in a Chroma DB
        vectorstore = Chroma.from_documents(all_texts,
                                            embeddings,
                                            persist_directory=persist_directory)
        vectorstore.persist()
        print("Chroma DB created and loaded")
        return vectorstore
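
# Usage sketch (assumption: "chroma_db" is an illustrative persist directory,
# not one fixed elsewhere in this file). The first call builds and persists
# the index; later calls reload it from disk.
#
#   embeddings = build_embedding_model()
#   vectorstore = load_vectorstore("chroma_db", embeddings)
#   docs = vectorstore.similarity_search("FERS retirement eligibility", k=2)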
def load_text_chunks(text_chunks_pkl_dir):
    '''
    Load the pickle file that holds all the documents from disk.
    If it does not exist, create a new one.
    The text documents are required to create the BM25 retriever, but loading
    all the documents in every session is time consuming, so we store all the
    docs in a pickle file and load that file from disk instead.
    '''
    try:
        print("Text chunks are loading from the disk")
        with open(text_chunks_pkl_dir, 'rb') as file:
            cached_text_chunks = pickle.load(file)
        # `cached_text_chunks` now contains the cached documents
        print("Text chunks are loaded from the disk")
        return cached_text_chunks
    except Exception:
        print("Creating text chunks from the docs and caching them.")
        folder_path = unzip_opm()
        pdf_folder_path = f"{folder_path}/PDFs"
        html_folder_path = f"{folder_path}/HTMLs"
        pdf_dir_loader = PyPDFDirectoryLoader(pdf_folder_path)
        pdf_pages = pdf_dir_loader.load()
        HTML_docs_path_list = [os.path.join(html_folder_path, f) for f in os.listdir(html_folder_path) if os.path.isfile(os.path.join(html_folder_path, f))]
        html_loaders = [UnstructuredHTMLLoader(html_file) for html_file in HTML_docs_path_list]
        html_pages = []
        for loader in html_loaders:
            try:
                html_pages.extend(loader.load())
            except Exception:
                print("Cannot load the file:", loader)
        all_texts = pdf_pages + html_pages
        # Cache the list to a file
        with open('text_chunks.pkl', 'wb') as file:
            pickle.dump(all_texts, file)
        print("Text chunks are created and cached")
        return all_texts
def load_ensemble_retriver(text_chunks, embeddings, chroma_vectorstore):
    """Load an ensemble retriever with BM25 and Chroma as the individual retrievers."""
    bm25_retriever = BM25Retriever.from_documents(text_chunks)
    bm25_retriever.k = 1
    chroma_retriever = chroma_vectorstore.as_retriever(search_kwargs={"k": 1})
    ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, chroma_retriever], weights=[0.3, 0.7])
    return ensemble_retriever
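
# Sketch of how the ensemble behaves: BM25 contributes keyword matches and
# Chroma contributes semantic matches; EnsembleRetriever fuses the two ranked
# lists using the weights above (0.3 keyword / 0.7 semantic), so with k=1 on
# each retriever a query returns up to two candidate chunks.
#
#   retriever = load_ensemble_retriver(text_chunks, embeddings, vectorstore)
#   results = retriever.get_relevant_documents("What is a FERS annuity supplement?")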
def load_conversational_retrievel_agent(retriever, llm):
    '''
    Load a conversational retrieval agent with the following tools:
    1) OPM knowledge base query
    2) Internet search with SerpAPI
    This agent combines RAG, a chat interface, and agents.
    '''
    retriever_tool = create_retriever_tool(
        retriever,
        "Search_US_Office_of_Personnel_Management_Document",
        "Searches and returns documents regarding the U.S. Office of Personnel Management (OPM).")
    search_api = SerpAPIWrapper()
    search_api_tool = Tool(
        name="Current_Search",
        func=search_api.run,
        description="useful for when you need to answer questions about current events or the current state of the world"
    )
    tools = [retriever_tool, search_api_tool]
    agent_executor = create_conversational_retrieval_agent(llm, tools, verbose=True, max_token_limit=512)
    return agent_executor
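
# End-to-end sketch wiring the pieces above together (a demo helper, not called
# anywhere in this file). Assumptions: OPENAI_API_KEY (and SERPAPI_API_KEY for
# the search tool) are set in the .env file loaded above; "chroma_db" and
# "text_chunks.pkl" are illustrative paths, not fixed elsewhere in this file.
def demo_end_to_end(question="How is a FERS basic annuity calculated?"):
    llm = build_llm()
    embeddings = build_embedding_model()
    vectorstore = load_vectorstore("chroma_db", embeddings)
    text_chunks = load_text_chunks("text_chunks.pkl")
    retriever = load_ensemble_retriver(text_chunks, embeddings, vectorstore)
    agent = load_conversational_retrievel_agent(retriever, llm)
    # The agent executor is a Chain; calling it returns a dict with an "output" key
    result = agent({"input": question})
    return result["output"]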