RITES2 / app.py
akshansh36's picture
Upload 3 files
d059469 verified
import streamlit as st
import streamlit_chat
import os
from config import app_name
from config import website_name
from config import DATABASE
from config import PINECONE_INDEX
from config import CHAT_COLLECTION
from pymongo import MongoClient
from bson import ObjectId
from dotenv import load_dotenv
import pinecone
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
import re
st.set_page_config(layout="wide", page_title=app_name, page_icon="πŸ“„")
load_dotenv()
import logging
from pytz import timezone, utc
from datetime import datetime
logging.basicConfig(
level=logging.DEBUG, # This is for your application logs
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Suppress pymongo debug logs by setting the pymongo logger to a higher level
pymongo_logger = logging.getLogger('pymongo')
pymongo_logger.setLevel(logging.WARNING)
FLASH_API = os.getenv("FLASH_API")
PINECONE_API = os.getenv("PINECONE_API_KEY")
MONGO_URI = os.getenv("MONGO_URI")
pc = pinecone.Pinecone(
api_key=PINECONE_API
)
index = pc.Index(PINECONE_INDEX)
# MongoDB connection setup
client = MongoClient(MONGO_URI)
db = client[DATABASE]
chat_sessions = db[CHAT_COLLECTION]
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=FLASH_API)
llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash", temperature=0, max_tokens=None, google_api_key=FLASH_API)
# Load the extracted JSON data
# Initialize session state for current chat session
if 'current_chat_id' not in st.session_state:
st.session_state['current_chat_id'] = None
if 'chat_history' not in st.session_state:
st.session_state['chat_history'] = []
if 'regenerate' not in st.session_state:
st.session_state['regenerate'] = False # Track regenerate button state
# Function to create a new chat session in MongoDB
def create_new_chat_session():
# Get the current time in IST
ind_time = datetime.now(timezone("Asia/Kolkata"))
# Convert IST time to UTC for storing in MongoDB
utc_time = ind_time.astimezone(utc)
new_session = {
"created_at": utc_time, # Store in UTC
"messages": [] # Empty at first
}
session_id = chat_sessions.insert_one(new_session).inserted_id
return str(session_id)
# Function to load a chat session by MongoDB ID
# Function to load the chat session by MongoDB ID (load full history for display)
def load_chat_session(session_id):
session = chat_sessions.find_one({"_id": ObjectId(session_id)})
if session:
# Load the full chat history (no slicing here)
st.session_state['chat_history'] = session['messages']
# Function to update chat session in MongoDB (store last 15 question-answer pairs)
# Function to update chat session in MongoDB (store entire chat history)
def update_chat_session(session_id, question, answer, improved_question):
# Append the new question-answer pair to the full messages array
chat_sessions.update_one(
{"_id": ObjectId(session_id)},
{"$push": {
"messages": {"$each": [{"question": question, 'improved_question': improved_question, "answer": answer}]}}}
)
# Function to replace the last response in MongoDB
def replace_last_response_in_mongo(session_id, new_answer):
last_message_index = len(st.session_state['chat_history']) - 1
if last_message_index >= 0:
# Replace the last response in MongoDB
chat_sessions.update_one(
{"_id": ObjectId(session_id)},
{"$set": {f"messages.{last_message_index}.answer": new_answer}}
)
# Function to regenerate the response
def regenerate_response():
try:
if st.session_state['chat_history']:
last_question = st.session_state['chat_history'][-1]["question"] # Get the last question
# Exclude the last response from the history when sending the question to LLM
previous_history = st.session_state['chat_history'][:-1] # Exclude the last Q&A pair
with st.spinner("Please wait, regenerating the response!"):
# Generate a new response for the last question using only the previous history
query = get_context_from_messages(last_question, previous_history)
if query:
logging.info(f"Extracted query is :{query}\n")
extracted_query = get_query_from_llm_answer(query)
if extracted_query:
query = extracted_query
else:
query = last_question
query_embedding = embeddings.embed_query(query)
search_results = index.query(vector=query_embedding, top_k=10, include_metadata=True)
matches = search_results['matches']
content = ""
for i, match in enumerate(matches):
chunk = match['metadata']['chunk']
url = match['metadata']['url']
content += f"chunk{i}: {chunk}\n" + f"url{i}: {url}\n"
new_reply = generate_summary(content, query, previous_history)
st.session_state['chat_history'][-1]["answer"] = new_reply
# Update MongoDB with the new response
if st.session_state['current_chat_id']:
replace_last_response_in_mongo(st.session_state['current_chat_id'], new_reply)
st.session_state['regenerate'] = False # Reset regenerate flag
st.rerun()
except Exception as e:
st.error("Error occured in Regenerating response, please try again later.")
def generate_summary(chunks, query, chat_history):
try:
# Limit the history sent to the LLM to the latest 3 question-answer pairs
limited_history = chat_history[-3:] if len(chat_history) > 3 else chat_history
# Create conversation history for the LLM, only using the last 15 entries
history_text = "\n".join([f"User: {q['improved_question']}\nLLM: {q['answer']}" for q in limited_history])
# Define the system and user prompts including the limited history
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are a website-specific chatbot specializing in answering user queries about {website_name}. You will be provided with data chunks sourced from {website_name}, and each chunk has an associated URL. When formulating your responses:
1. Clarity and Completeness
- Always strive to deliver thorough, concise, and direct answers.
- If the user’s query is ambiguous or there are multiple possible answers, ask for clarification with a clear rationale.
2. No Chunk Names
- Do not reference chunk filenames or mention the term β€œchunk” in your replies.
- Instead, present the information in a natural, conversational style.
3. Use of Conversation History
- Refer back to conversation history for consistency and to get context for a follow up question.
- If there are previous statements like β€œThe answer is not available,” ignore them unless still relevant to the current query.
4. Handling Off-Topic Queries
- If the user sends greetings, introductions, or queries unrelated to {website_name}, respond politely and conversationally without forcing a website-related answer.
5. Source URLs
- Always provide the URLs you used to answer the query under a β€œSources” heading at the end of your reply.
- If the same URL appears in multiple relevant chunks, list that URL only once in the sources section.
- Only include the URLs that genuinely informed or supported your answer.
- if the answer itself contains url, then quote it properly
6. No Direct Chunk Quotes
- Summarize or paraphrase the original content instead of quoting chunk names or raw text verbatim (unless absolutely necessary for clarity).
7. Relevance Check
- Thoroughly check the provided data chunks before replying.
- If you can’t find an answer in the chunks, or if the query is irrelevant politely ask for clarification or explain that you cannot answer.
8. Formatting
- Present your answers in a well-structured formatβ€”either bullet points or clear paragraphsβ€”to ensure maximum readability.
"""),
("human", f'''
"Query":\n {query}\n
Below are the pinecone chunks that should be used to answer the user query:
"Extracted Data": \n{chunks}\n
Below is the previous conversation history:
"Previous Conversation History": \n{history_text}\n
'''
)
])
# Chain the prompt with LLM for response generation
chain = prompt | llm
result = chain.invoke({"Query": query, "Extracted Data": chunks, "Previous Conversation History": history_text})
# Return the generated response
logging.info(f"LLM answer is :{result}")
return result.content
except Exception as e:
st.error(f"Error answering your question: {e}")
return None
def get_context_from_messages(query, chat_history):
try:
logging.info(f"Getting context from original query: {query}")
# Limit the history sent to the LLM to the latest 3 question-answer pairs
limited_history = chat_history[-3:] if len(chat_history) > 3 else chat_history
# Create conversation history for the LLM, only using the last 15 entries
history_text = "\n".join([f"User: {q['question']}\nLLM: {q['answer']}" for q in limited_history])
# Define the system and user prompts including the limited history
prompt = ChatPromptTemplate.from_messages([
("system", f""""I will provide you with a user query and up to the last 3 messages from the chat history which includes both questions and answers.Your task is to understand the user query nicely and restructure it if required such that it makes complete sense and is completely self contained.
The provided queries are related to {website_name}.
1. If the query is a follow-up, use the provided chat history to reconstruct a well-defined, contextually complete query that can stand alone."
2. if the query is self contained, if applicable try to improve it to make is coherent.
3. if the user query is salutations, greetings or not relevant in that case give the query back as it is.
I have provided an output format below, stricly follow it. Do not give anything else other than just the output.
expected_output_format: "query: String or None"
"""),
("human", f'''
"Query":\n {query}\n
"Previous Conversation History": \n{history_text}\n
'''
)
])
# Chain the prompt with LLM for response generation
chain = prompt | llm
result = chain.invoke({"Query": query, "Previous Conversation History": history_text})
logging.info(f"llm answer for query extraction is :{result}")
# Return the generated response
return result.content
except Exception as e:
logging.error(f"exception occured in getting query from original query :{e}")
return None
def get_query_from_llm_answer(llm_output):
match = re.search(r'query:\s*(.*)', llm_output)
if match:
query = match.group(1).strip().strip('"') # Remove leading/trailing spaces and quotes
return None if query.lower() == "none" else query
return None
# Sidebar for showing chat sessions and creating new sessions
st.sidebar.header("Chat Sessions")
# Button for creating a new chat
if st.sidebar.button("New Chat"):
new_chat_id = create_new_chat_session()
st.session_state['current_chat_id'] = new_chat_id
st.session_state['chat_history'] = []
# List existing chat sessions with delete button (dustbin icon)
existing_sessions = chat_sessions.find().sort("created_at", -1)
for session in existing_sessions:
session_id = str(session['_id'])
# Retrieve stored UTC time and convert it to IST for display
utc_time = session['created_at']
ist_time = utc_time.replace(tzinfo=utc).astimezone(timezone("Asia/Kolkata"))
session_date = ist_time.strftime("%Y-%m-%d %H:%M:%S") # Format for display
col1, col2 = st.sidebar.columns([8, 1])
with col1:
if st.button(f"Session {session_date}", key=session_id):
st.session_state['current_chat_id'] = session_id
load_chat_session(session_id)
# Display delete icon (dustbin)
with col2:
if st.button("πŸ—‘οΈ", key=f"delete_{session_id}"):
chat_sessions.delete_one({"_id": ObjectId(session_id)})
st.rerun() # Refresh the app to remove the deleted session from the sidebar
# Main chat interface
st.markdown('<div class="fixed-header"><h1>Welcome To RITES Chatbot</h1></div>', unsafe_allow_html=True)
st.markdown("<hr>", unsafe_allow_html=True)
# Input box for the question
user_question = st.chat_input(f"Ask a Question related to {website_name}")
if user_question:
# Automatically create a new session if none exists
if not st.session_state['current_chat_id']:
new_chat_id = create_new_chat_session()
st.session_state['current_chat_id'] = new_chat_id
with st.spinner("Please wait, I am thinking!!"):
# Store the user's question and get the assistant's response
query = get_context_from_messages(user_question, st.session_state['chat_history'])
if query:
logging.info(f"Extracted query is :{query}\n")
extracted_query = get_query_from_llm_answer(query)
if extracted_query:
query = extracted_query
else:
query = user_question
query_embedding = embeddings.embed_query(query)
search_results = index.query(vector=query_embedding, top_k=10, include_metadata=True)
matches = search_results['matches']
content = ""
for i, match in enumerate(matches):
chunk = match['metadata']['chunk']
url = match['metadata']['url']
content += f"chunk{i}: {chunk}\n" + f"url{i}: {url}\n"
print(f"content being passed is {content}")
reply = generate_summary(content, query, st.session_state['chat_history'])
if reply:
# Append the new question-answer pair to chat history
st.session_state['chat_history'].append(
{"question": user_question, "answer": reply, "improved_question": query})
# Update the current chat session in MongoDB
if st.session_state['current_chat_id']:
update_chat_session(st.session_state['current_chat_id'], user_question, reply, query)
else:
st.error("Error processing your request, Please try again later.")
else:
st.error("Error processing your request, Please try again later.")
# Display the updated chat history (show last 15 question-answer pairs)
for i, pair in enumerate(st.session_state['chat_history']):
question = pair["question"]
answer = pair["answer"]
streamlit_chat.message(question, is_user=True, key=f"chat_message_user_{i}")
streamlit_chat.message(answer, is_user=False, key=f"chat_message_assistant_{i}")
# Display regenerate button under the last response
if st.session_state['chat_history'] and not st.session_state['regenerate']:
if st.button("πŸ”„ Regenerate", key="regenerate_button"):
st.session_state['regenerate'] = True
regenerate_response()