# "Spaces: Sleeping" — Hugging Face Spaces status banner captured when this
# source was scraped from the web page; commented out so it is not parsed as code.
import os
import shutil
import subprocess

import gradio as gr
from langchain import HuggingFaceHub
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import ChatPromptTemplate
# Absolute directory containing this module (handy for path-relative assets).
module_directory = os.path.dirname(os.path.abspath(__file__))
class DocumentProcessor:
    """Load PDF documents, split them into chunks, embed them into a Chroma
    vector store, and answer questions over them with a HuggingFace-hosted LLM.

    Typical flow: load_documents -> split_documents -> delete_embeddings ->
    create_embeddings, then document_chain/reterival_chain/get_response per query.
    """

    def __init__(self, document_paths, token):
        """
        Args:
            document_paths: initial document path list (real paths are supplied
                later through load_documents()).
            token: Hugging Face API token; falls back to the 'hf_token'
                environment variable when falsy.
        """
        self.document_paths = document_paths
        # Fix: `token` was accepted but ignored in favour of the env var, and
        # os.getenv() can return None, which makes the os.environ assignment
        # raise TypeError. Prefer the explicit token, then the env var, then "".
        os.environ["HUGGINGFACEHUB_API_TOKEN"] = token or os.getenv('hf_token') or ""
        # Where the Chroma vector store is persisted on disk.
        self.persist_directory = './docs/chroma/'
        # Falcon-7B-Instruct served via the HuggingFace Hub inference API.
        self.llm = HuggingFaceHub(
            repo_id="tiiuae/falcon-7b-instruct",
            model_kwargs={"max_length": 300, "max_new_tokens": 300},
        )

    def load_documents(self, file_paths):
        """Load every PDF in `file_paths` into self.docs.

        Fix: the original built a loader only for file_paths[0], silently
        dropping any additional uploaded files.
        """
        self.document_paths = file_paths
        self.docs = []
        for path in self.document_paths:
            self.docs.extend(PyPDFLoader(path).load())

    def split_documents(self):
        """Split self.docs into small overlapping chunks for embedding."""
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=15)
        self.splits = text_splitter.split_documents(self.docs)

    def change_permissions(self, directory):
        """Recursively chmod `directory` so it can be deleted and rebuilt.

        NOTE(review): mode 777 is overly permissive; consider tightening.
        """
        try:
            # List form (shell=False) — no shell-injection risk.
            subprocess.run(["chmod", "777", "-R", directory], check=True)
            # Fix: message previously claimed 664 while the command sets 777.
            print(f"Permissions for {directory} changed to 777 successfully.")
        except subprocess.CalledProcessError as e:
            print(f"An error occurred while changing permissions: {e}")

    def delete_embeddings(self):
        """Remove any existing Chroma persistence directory before a rebuild."""
        if os.path.isdir(self.persist_directory):
            self.change_permissions(self.persist_directory)
            print('directory exist')
            shutil.rmtree(self.persist_directory, ignore_errors=True)

    def create_embeddings(self):
        """Embed the current splits into a persisted Chroma store.

        Stores the resulting vector store on self.vectordb_doc; returns None.
        """
        embeddings = HuggingFaceEmbeddings()
        self.vectordb_doc = Chroma.from_documents(
            documents=self.splits,
            embedding=embeddings,
            persist_directory=self.persist_directory,
        )
        # Debug: number of embedded chunks (private Chroma attribute).
        print(self.vectordb_doc._collection.count())

    def get_embeddings(self):
        """Return the Chroma vector store built by create_embeddings()."""
        return self.vectordb_doc

    def parse_output(self, response):
        """Return the text from "Question:" onward, or a fallback apology.

        The raw LLM output echoes the whole prompt; everything before the
        "Question:" marker is stuffed context and is discarded.
        """
        question_index = response.find("Question:")
        if question_index != -1:
            return response[question_index:].strip()
        # Fix: corrected the grammar of the user-facing fallback message.
        return "I apologize, I don't know the answer"

    def document_chain(self):
        """Build the stuff-documents chain that feeds retrieved context to the LLM."""
        prompt = ChatPromptTemplate.from_template("""
Answer the following question based only on the provided context: {context}
Question: {input}
""")
        return create_stuff_documents_chain(self.llm, prompt)

    def reterival_chain(self, document_chain, document_embeddings):
        """Wire the vector-store retriever to the document chain.

        (Method name — a misspelling of "retrieval" — kept for caller compatibility.)
        """
        retriever = document_embeddings.as_retriever()
        return create_retrieval_chain(retriever, document_chain)

    def get_response(self, retrieval_chain, message):
        """Invoke the retrieval chain on `message` and return the answer text."""
        response = retrieval_chain.invoke({"input": message})
        return response["answer"]
# Stray blank-line print executed at import time — likely leftover debug output.
print('')
def upload_file(files, processor):
    """Gradio upload handler: rebuild the vector store from the uploaded files.

    Args:
        files: list of Gradio file objects (each exposes a .name path).
        processor: shared DocumentProcessor instance.

    Side effects only; shows a Gradio Info on success, Warning on failure.
    """
    try:
        file_paths = [file.name for file in files]
        processor.load_documents(file_paths)
        processor.split_documents()
        # Clear any stale embeddings before embedding the new documents.
        processor.delete_embeddings()
        # Fix: dropped the dead `doc_embeddings =` assignment —
        # create_embeddings() stores the result on the processor and returns None.
        processor.create_embeddings()
        gr.Info("Document Uploaded,Enjoy Chat Now!")
    except Exception as e:
        # UI boundary: log and tell the user to retry rather than crash.
        print(f"An error occurred: {e}")
        gr.Warning("Upload File(s) Again!")
def echo(message, history, processor):
    """Gradio chat callback: answer `message` over the processor's documents.

    Args:
        message: the user's question.
        history: chat history (unused; required by gr.ChatInterface's signature).
        processor: DocumentProcessor whose embeddings were already created.

    Returns:
        The parsed model answer, or None when any step fails (a Gradio
        warning is shown instead of raising).
    """
    try:
        document_chain = processor.document_chain()
        document_embeddings = processor.get_embeddings()
        retrieval_chain = processor.reterival_chain(document_chain, document_embeddings)
        chain_result = processor.get_response(retrieval_chain, message)
        return processor.parse_output(chain_result)
    except Exception as e:
        # UI boundary: log and surface the failure in the interface.
        print(f"An error occurred: {e}")
        gr.Warning("An Error Occurred, Refresh Website!")
def upload_warning():
    """Shown on page load: remind the user to upload PDF file(s) before chatting."""
    gr.Warning("Upload PDF File(s) First!")
def main():
    """Build and launch the Gradio UI: a PDF uploader plus a chat interface,
    both backed by one shared DocumentProcessor."""
    css = """
    .container {
        height: 90vh;
    }
    .container_1 {
        height: 80vh;
    }
    .container_2 {
        height: 20vh;
    }
    """
    # One processor shared by the upload handler and the chat handler.
    processor = DocumentProcessor(document_paths='', token='')
    with gr.Blocks(css=css) as demo:
        # On page load, remind the user to upload PDFs first.
        demo.load(upload_warning, inputs=None, outputs=None)
        with gr.Column(elem_classes=["container"]):
            gr.Markdown("## Chat with your Data")
            with gr.Column(elem_classes=["container_2"]):
                file_output = gr.File()
                upload_button = gr.UploadButton(
                    "Click to Upload File(s)",
                    file_types=["pdf", "doc"],
                    file_count="multiple",
                )

                # Closure binding the shared processor into the upload handler.
                def process_upload(files):
                    upload_file(files, processor)

                upload_button.upload(process_upload, upload_button, file_output)
            with gr.Column(elem_classes=["container_1"]):
                # Closure binding the shared processor into the chat handler.
                def process_echo(message, history):
                    return echo(message, history, processor)

                gr.ChatInterface(fn=process_echo, examples=["what is title", "what is summary", "create notes"])
            gr.Markdown("* Note: The answers can be incorrect, However they can be enhanced")
    demo.launch()
# Script entry point: only launch the UI when run directly, not on import.
if __name__ == "__main__":
    main()