# importing necessary libraries
import os
import time
import streamlit as st
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from docx import Document
from docx.text.paragraph import Paragraph
from docx.table import Table
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.memory import ConversationBufferWindowMemory
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS

# load the environment variables into the python script
load_dotenv()

# fetching the openai_api_key environment variable
openai_api_key = os.getenv("OPENAI_API_KEY")
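# A minimal .env file for this script only needs the key fetched above; the
# langchain OpenAI classes also pick OPENAI_API_KEY up from the environment
# automatically (the placeholder value is illustrative):
#
#   OPENAI_API_KEY=sk-...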
# Initialize session states
if "vectorDB" not in st.session_state:
    st.session_state.vectorDB = None
if "messages" not in st.session_state:
    st.session_state.messages = []
if "bot_name" not in st.session_state:
    st.session_state.bot_name = ""
if "chain" not in st.session_state:
    st.session_state.chain = None
def process_paragraph(paragraph):
    """This function returns the text of a paragraph from the DOCX file"""
    return paragraph.text
def process_table(table):
    """This function extracts the text from a table inside the DOCX file"""
    text = ""
    for row in table.rows:
        for cell in row.cells:
            # separate cell texts so adjacent cells do not run together
            text += cell.text + " "
    return text
def read_docx(file_path):
    """This function extracts the text from the DOCX file"""
    doc = Document(file_path)
    text = []
    # iterate over paragraphs and tables in document order
    for element in doc.iter_inner_content():
        if isinstance(element, Paragraph):
            text.append(process_paragraph(element))
        elif isinstance(element, Table):
            text.append(process_table(element))
    return " ".join(text)
def read_text_file(text_file):
    """This function extracts the text from the TXT file"""
    try:
        text = text_file.read().decode("utf-8")
        return text
    except Exception as e:
        st.error(f"Error while reading {text_file.name} file: **{e}**")
        return None
def get_pdf_text(pdf):
    """This function extracts the text from the PDF file"""
    try:
        text = []
        pdf_reader = PdfReader(pdf)
        for page in pdf_reader.pages:
            # extract_text() can return None for pages without extractable text
            text.append(page.extract_text() or "")
        return " ".join(text)
    except Exception as e:
        st.error(f"Error while reading {pdf.name} file: **{e}**")
        return None
def get_vectorstore(text_chunks):
    """This function creates a FAISS vector database and stores the embeddings of the text chunks in it"""
    embeddings = OpenAIEmbeddings()
    vectorstore = FAISS.from_texts(texts=text_chunks, embedding=embeddings)
    return vectorstore
def get_text_chunks(text: str):
    """This function splits the text into smaller chunks"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=50,
        length_function=len,
        is_separator_regex=False,
    )
    chunks = text_splitter.split_text(text)
    return chunks
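# As a rough sketch of the behaviour (an illustration, not from the original
# code): a 2,500-character string with no separators would come back as about
# three chunks of at most 1,000 characters each, with consecutive chunks
# sharing up to a 50-character overlap:
#
#   chunks = get_text_chunks("a" * 2500)
#   # len(chunks) == 3; all(len(c) <= 1000 for c in chunks)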
def processing(files):
    """This function reads the uploaded files, splits their text into chunks, and stores the chunk embeddings in a vector database"""
    data = []
    for file in files:
        if file.name.endswith(".docx"):
            text = read_docx(file)
        elif file.name.endswith(".pdf"):
            text = get_pdf_text(file)
        else:
            text = read_text_file(file)
        # skip files that failed to read (their reader returned None)
        if text:
            data.append(text)
    raw_text = " ".join(data)
    # dividing the raw text into smaller chunks
    text_chunks = get_text_chunks(raw_text)
    # Creating and storing the chunks in the vector database
    vectorDB = get_vectorstore(text_chunks)
    return vectorDB
def get_response(query: str):
    """This generator streams the answer to the user query word by word"""
    # getting the context from the database that is similar to the user query
    query_context = st.session_state.vectorDB.similarity_search(query=query)
    # calling the chain to get the output from the LLM
    response = st.session_state.chain.invoke(
        {
            "human_input": query,
            "context": query_context[0].page_content if query_context else "",
            "name": st.session_state.bot_name,
        }
    )["text"]
    # yield the response word by word, pausing 50 ms between words to create
    # a typing effect when consumed by st.write_stream
    for word in response.split():
        yield word + " "
        time.sleep(0.05)
def get_conversation_chain(vectorDB):
    """This function creates and returns an LLM chain"""
    # using the OpenAI chat model
    llm = ChatOpenAI(temperature=0.1, model="gpt-3.5-turbo-16k")
    # creating a template to pass into the LLM
    template = """You are a friendly customer support chatbot named {name} for the company, aiming to enhance the customer experience by providing tailored assistance and information.
Answer the question as detailed as possible and to the point from the context: {context}\n\n
If the answer is not in the provided context, just say, "answer is not available in the context"; do not provide a wrong answer.\n\n
{chat_history}
Human: {human_input}
AI: """
    # creating a prompt that is used to format the input of the user
    prompt = PromptTemplate(
        template=template,
        input_variables=["chat_history", "human_input", "name", "context"],
    )
    # creating a memory that stores the chat history (last 5 turns) between the chatbot and the user
    memory = ConversationBufferWindowMemory(
        memory_key="chat_history", input_key="human_input", k=5
    )
    chain = LLMChain(llm=llm, prompt=prompt, memory=memory, verbose=True)
    return chain
if __name__ == "__main__":
    # setting the config of the WebPage
    st.set_page_config(page_title="Personalized ChatBot", page_icon="🤖")
    st.header("Personalized Customer Support Chatbot 🤖", divider="rainbow")
    # taking the inputs (bot name and files) from the user
    with st.sidebar:
        st.caption("Please enter the **Bot Name** and upload **PDF/TXT/DOCX** files!")
        bot_name = st.text_input(
            label="Bot Name", placeholder="Enter the bot name here...", key="bot_name"
        )
        files = st.file_uploader(
            label="Upload Files!",
            type=["pdf", "txt", "docx"],
            accept_multiple_files=True,
        )
        # moving forward only when both inputs are given by the user
        if files and bot_name:
            # the Process Files button processes the uploaded files and saves the chunks into the vector database
            if st.button("Process Files"):
                # if there is existing chat history, delete it
                if st.session_state.messages:
                    st.session_state.messages = []
                with st.spinner("Processing....."):
                    st.session_state["vectorDB"] = processing(files)
                    st.session_state["chain"] = get_conversation_chain(
                        st.session_state["vectorDB"]
                    )
                    st.success("File Processed", icon="✅")
    # show the chatbot interface only when the vector database is ready to use
    if st.session_state.vectorDB:
        # Display chat messages from history on app rerun
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.write(message["content"])
        # taking the input, i.e. the query from the user (walrus operator)
        if prompt := st.chat_input(f"Message {st.session_state.bot_name}"):
            # Add user message to chat history
            st.session_state.messages.append({"role": "user", "content": prompt})
            # Display user message in chat message container
            with st.chat_message("user"):
                st.write(prompt)
            # Display assistant response in chat message container
            with st.chat_message("assistant"):
                response = st.write_stream(get_response(prompt))
            # Add assistant response to chat history
            st.session_state.messages.append({"role": "assistant", "content": response})