import streamlit as st
import datetime
import numpy as np
import gspread
from google.oauth2 import service_account

# Build an authorized gspread client from a service-account JSON credentials file
def create_google_sheets_service(json_credentials_path, scopes):
    creds = service_account.Credentials.from_service_account_file(json_credentials_path).with_scopes(scopes)
    return gspread.authorize(creds)
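# Hedged alternative (not used below): on a hosted deployment it is common to keep the
# service-account JSON in Streamlit secrets rather than on disk. This sketch assumes a
# hypothetical `gcp_service_account` table in st.secrets; adjust the key to your setup.
def create_google_sheets_service_from_secrets(scopes):
    info = dict(st.secrets["gcp_service_account"])  # assumed secrets key, not part of the original app
    creds = service_account.Credentials.from_service_account_info(info).with_scopes(scopes)
    return gspread.authorize(creds)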
from datetime import datetime
import pytz
import requests

# Best-effort lookup of the visitor's public IP address via ipify
def get_user_ip():
    try:
        response = requests.get("https://api.ipify.org?format=json")
        ip = response.json()['ip']
    except Exception:
        ip = "Unknown"
    return ip
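# Hedged variant (not wired in): the ipify call can hang or be rate-limited, so a request
# timeout plus a tiny cache keeps it from stalling every Streamlit rerun. Sketch only.
from functools import lru_cache

@lru_cache(maxsize=1)
def get_user_ip_cached():
    try:
        response = requests.get("https://api.ipify.org?format=json", timeout=5)
        ip = response.json()['ip']
    except Exception:
        ip = "Unknown"
    return ip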
from google.api_core.retry import Retry
from google.api_core import retry

# Append Q&A records (plus timestamp and user IP) to the given Google Sheet
def write_data_to_google_sheet(service, spreadsheet_url, sheet_name, data):
    sheet = service.open_by_url(spreadsheet_url).worksheet(sheet_name)
    # Add header row
    header_row = ["Questions", "Answers", "Timestamp", "User IP"]
    for i, header in enumerate(header_row, start=1):
        sheet.update_cell(1, i, header)
    # Set timezone to Saudi Arabia time
    saudi_timezone = pytz.timezone("Asia/Riyadh")
    # Get user's IP address
    user_ip = get_user_ip()
    # Find the next empty row
    next_row = len(sheet.get_all_values()) + 1
    # Write data to the Google Sheet
    for i, item in enumerate(data, start=next_row):
        sheet.update_cell(i, 1, item['query'])
        sheet.update_cell(i, 2, item['response'])
        saudi_time = datetime.now(saudi_timezone).strftime("%Y-%m-%d %H:%M:%S")
        sheet.update_cell(i, 3, saudi_time)
        sheet.update_cell(i, 4, user_ip)
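# Hedged alternative (not used by the app): update_cell issues one API request per cell,
# which quickly runs into Google Sheets quota limits. gspread's append_row writes a whole
# row in a single request; a sketch of the same logging with batched rows:
def append_data_to_google_sheet(service, spreadsheet_url, sheet_name, data):
    sheet = service.open_by_url(spreadsheet_url).worksheet(sheet_name)
    saudi_timezone = pytz.timezone("Asia/Riyadh")
    user_ip = get_user_ip()
    for item in data:
        saudi_time = datetime.now(saudi_timezone).strftime("%Y-%m-%d %H:%M:%S")
        sheet.append_row([item['query'], item['response'], saudi_time, user_ip])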
# Google Sheets logging configuration (module level, shared by the app)
json_credentials_path = 'credentials.json'  # Replace with the path to your JSON credentials file
scopes = ['https://www.googleapis.com/auth/spreadsheets']
service = create_google_sheets_service(json_credentials_path, scopes)
spreadsheet_url = 'https://docs.google.com/spreadsheets/d/1R1AUf0Bzk5fLTpV6vk023DW7FV19kBT3e1lPWysDW2Q/edit#gid=1555077198'
sheet_name = 'Sheet1'  # Replace with the name of the sheet where you want to store the data
#@title State of Union Text
#state_of_the_union = """ txt_file"""
# Environment Vars
import os
# The OpenAI key comes from Streamlit secrets; assigning an undefined local
# `openai_api_key` variable here would raise a NameError, so only the
# secrets-based assignment is kept.
os.environ['OPENAI_API_KEY'] = st.secrets['OPENAI_API_KEY']
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.document_loaders.csv_loader import CSVLoader
from langchain.vectorstores.faiss import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.text_splitter import CharacterTextSplitter
from langchain import OpenAI, VectorDBQA
from langdetect import detect
from googletrans import Translator
from langchain.vectorstores import Chroma
from langchain.document_loaders import PyPDFLoader
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.document_loaders import TextLoader
#from langchain.translator import OpenAITranslator
import openai
import pandas as pd
from hashlib import sha256

# Hash a plaintext password with SHA-256 for comparison at login time
def create_hashed_password(password):
    return sha256(password.encode('utf-8')).hexdigest()
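# Hedged sketch (demo-hardening only, not wired in): comparing hex digests with `==` works
# for this demo gate, but hmac.compare_digest avoids timing differences. For real credential
# storage a salted KDF such as hashlib.pbkdf2_hmac would be the usual choice.
import hmac

def passwords_match(stored_hash, entered_password):
    return hmac.compare_digest(stored_hash, create_hashed_password(entered_password))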
# Simple credential gate: compares the entered username/password hash against the
# hard-coded demo accounts and stores the result in st.session_state.
def login():
    st.title('Please Login')
    entered_username = st.text_input('Username')
    entered_password = st.text_input('Password', type='password')
    if st.button('Login'):
        names = ['User', 'Customer']
        usernames = ['warba', 'Warba']
        passwords = ['warba123', 'warba123']
        hashed_passwords = [create_hashed_password(password) for password in passwords]
        for name, username, hashed_password in zip(names, usernames, hashed_passwords):
            if username == entered_username and hashed_password == create_hashed_password(entered_password):
                st.session_state["authentication_status"] = True
                st.session_state["name"] = name
                break
        else:
            # for-else: no account matched the entered credentials
            st.session_state["authentication_status"] = False
    # Use .get() so the first render (before any login attempt) does not raise a KeyError
    auth_status = st.session_state.get("authentication_status", None)
    if auth_status:
        return True
    elif auth_status == False:
        st.error('Sorry, wrong login credentials')
        return False
    elif auth_status is None:
        st.warning('Please enter your username and password')
        return False
    else:
        return False
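# Hedged companion to login() (not wired into the UI): clearing the session flags and
# rerunning is enough to send the user back to the login form.
def logout():
    st.session_state.pop("authentication_status", None)
    st.session_state.pop("name", None)
    st.experimental_rerun()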
#text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
#texts = text_splitter.split_text(state_of_the_union)
#loader = PyPDFLoader("warba_5_6.pdf")
#documents = loader.load()
#texts = text_splitter.split_documents(documents)
#################
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.prompts.prompt import PromptTemplate
from langchain.callbacks import get_openai_callback
qa_template = """
You are a helpful AI assistant named Q&A bot, developed and created by Warba Bank Developers. The user gives you a file whose content is represented by the following pieces of context; use them to answer the question at the end.
If you don't know the answer, just say you don't know. Do NOT try to make up an answer.
If the question is not related to the context, politely respond that you are tuned to only answer questions that are related to the context.
Use as much detail as possible when responding.
context: {context}
=========
question: {question}
======
"""
QA_PROMPT = PromptTemplate(template=qa_template, input_variables=["context", "question"])
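# Hedged illustration (not called anywhere): PromptTemplate.format simply substitutes the
# two declared variables, which is handy for eyeballing what the LLM actually receives.
def preview_prompt(context, question):
    return QA_PROMPT.format(context=context, question=question)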
#loader = CSVLoader("Warba_QA_bot_full_dataset_June_14_csv.csv", csv_args={"delimiter": ','})
#documents = loader.load()
loader = CSVLoader(file_path="Warba_QA_bot_full_dataset_June_14_csv_updated.csv", encoding="utf-8", csv_args={'delimiter': ','})
data = loader.load()
#text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0, length_function=len)
embeddings = OpenAIEmbeddings()
vectors = FAISS.from_documents(data, embeddings)
chain = ConversationalRetrievalChain.from_llm(
    llm=ChatOpenAI(temperature=0.0, model_name='gpt-3.5-turbo', openai_api_key=st.secrets['OPENAI_API_KEY']),
    retriever=vectors.as_retriever(),
    max_tokens_limit=4097,
    combine_docs_chain_kwargs={"prompt": QA_PROMPT},
)
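# Hedged usage sketch (not part of the app flow): ConversationalRetrievalChain takes a
# question plus the running chat history (a list of (question, answer) tuples) and
# returns a dict whose "answer" key holds the model's reply.
def ask_once(question, chat_history=None):
    result = chain({"question": question, "chat_history": chat_history or []})
    return result["answer"]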
#faissIndex = FAISS.from_documents(docs, OpenAIEmbeddings())
#faissIndex.save_local("faiss_warba_docs")
#from langchain.chains import RetrievalQA
#from langchain.chat_models import ChatOpenAI
#chatbot = RetrievalQA.from_chain_type(llm=ChatOpenAI(openai_api_key=st.secrets['OPENAI_API_KEY'], temperature=0, model_name="gpt-3.5-turbo", max_tokens=256), chain_type="stuff", retriever=FAISS.load_local("faiss_warba_docs", OpenAIEmbeddings()).as_retriever(search_type="similarity", search_kwargs={"k": 1}))
###embeddings = OpenAIEmbeddings()
###text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
###texts = text_splitter.split_text(state_of_the_union)
###vectorstore = FAISS.from_texts(texts, embeddings)
#import numpy as np
#text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
#texts = text_splitter.split_text(state_of_the_union)
#embeddings = OpenAIEmbeddings()
#vectorstore = FAISS.from_texts(texts, embeddings)
#db = Chroma.from_documents(texts, embeddings)
#retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": 2})
#llm = OpenAI(model_name='gpt-3.5-turbo', temperature=0, max_tokens=256)
#qa = VectorDBQA.from_chain_type(llm, chain_type="stuff", vectorstore=vectorstore)
#qa = VectorDBQA.from_chain_type(llm=OpenAI(model_name='gpt-3.5-turbo', temperature=0.2, max_tokens=256), chain_type="stuff", vectorstore=vectorstore)
#qa = VectorDBQA.from_chain_type(llm=ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0.2, max_tokens=256), chain_type="stuff", vectorstore=vectorstore)
#qa = RetrievalQA.from_chain_type(llm=OpenAI(model_name='gpt-3.5-turbo'), chain_type="stuff", retriever=retriever, return_source_documents=True)
from langchain.chains import load_chain
#translator = OpenAITranslator()
#chain = load_chain("lc://chains/vector-db-qa/stuff/chain.json", vectorstore=vectorstore)
#from langchain.chains.question_answering import load_qa_chain
#chain = load_qa_chain(llm=OpenAI(model_name='gpt-3.5-turbo'), chain_type="stuff")
# Translate arbitrary text to Arabic using googletrans
def translate_to_arabic(text):
    translator = Translator()
    result = translator.translate(text, dest='ar')
    return result.text
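# Hedged sketch (not wired in): langdetect is imported above but never used; pairing it
# with translate_to_arabic would localise answers only when they are not already Arabic.
def ensure_arabic(text):
    try:
        return text if detect(text) == 'ar' else translate_to_arabic(text)
    except Exception:
        return text  # langdetect raises on very short or ambiguous input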
translator = Translator()
import time

#def run_chain(query):
#    return chain.run(query)

# Invoke the conversational retrieval chain with the running chat history and a new question
def run_chain(chat_history, question):
    return chain.run({'chat_history': chat_history, 'question': question})
# Reset the stored conversation (also initialises it on the first run)
def clear_conversation():
    if (
        st.button("🧹 Clear conversation", use_container_width=True)
        or "history" not in st.session_state
    ):
        st.session_state.history = []

# Offer the conversation history as a downloadable CSV file
def download_conversation():
    conversation_df = pd.DataFrame(
        st.session_state.history, columns=["timestamp", "query", "response"]
    )
    csv = conversation_df.to_csv(index=False)
    st.download_button(
        label="💾 Download conversation",
        data=csv,
        file_name=f"conversation_{datetime.now().strftime('%Y%m%d%H%M')}.csv",
        mime="text/csv",
        use_container_width=True,
    )
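# Hedged convenience helper (not exposed in the sidebar): the same history that feeds the
# CSV download could be flushed to the Google Sheet in one call, reusing the writer above.
def upload_conversation_to_sheet():
    if st.session_state.get("history"):
        write_data_to_google_sheet(service, spreadsheet_url, sheet_name, st.session_state.history)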
def app():
    st.set_page_config(page_title="Q&A Bot", page_icon=":guardsman:")
    st.markdown("""
    <style>
    body {
        background-color: #f0f2f6;
    }
    .title {
        font-size: 25px;
        font-weight: bold;
        color: #151f6d;
        text-align: center;
    }
    .response-block {
        background-color: #151f6d;
        padding: 10px;
        color: white;
        border-radius: 5px;
        margin-top: 10px;
        text-align: center;
        font-size: 16px; /* Increase font size by one degree */
    }
    .stTextInput>div>div>input {
        background-color: white;
    }
    .stButton>button {
        width: 100%;
        color: white;
        background-color: #151f6d;
    }
    </style>
    """, unsafe_allow_html=True)
    st.markdown('<div class="title">Questions and Answers Bot for Warba Bank.</div>', unsafe_allow_html=True)
    st.write("")  # Empty line for spacing
    st.write("")  # Empty line for spacing
    sidebar = st.sidebar
    show_history = sidebar.checkbox("Show conversation history", value=False)
    # Sidebar checkbox toggling multi-line input
    multiline = sidebar.checkbox('Use multi-line input')
    with sidebar.expander("More options"):
        clear_conversation()
        download_conversation()
    col1, col2 = st.columns([3, 1])
    with col1:
        # Depending on the state of the checkbox, display a single-line or a multi-line input
        if multiline:
            query = st.text_area("Enter a question and get an answer from Q&A Bot:")
        else:
            query = st.text_input("Enter a question and get an answer from Q&A Bot:")
    thinking_message_text = col1.empty()  # Placeholder for the 'Thinking...' text
    thinking_message_bar = col1.empty()   # Placeholder for the progress bar
    response_block = col1.empty()         # Placeholder for the response block
    with col2:
        st.write("")  # Empty line for spacing
        st.write("")  # Empty line for spacing
        if st.button("Ask"):
            if query:
                # Start progress bar
                progress_bar = thinking_message_bar.progress(0)
                for i in range(100):
                    # Update the progress bar with each iteration
                    time.sleep(0.01)  # small delay for demonstration
                    progress_bar.progress(i + 1)
                    thinking_message_text.markdown(f'Thinking... {i+1}%', unsafe_allow_html=True)
                sa_time = datetime.now(pytz.timezone('Asia/Riyadh'))
                timestamp = sa_time.strftime('%Y-%m-%d %H:%M:%S')
                #response = run_chain(query)
                response = run_chain([], query)  # no prior chat history is passed
                # Clear the progress bar and the 'Thinking...' text
                thinking_message_bar.empty()
                thinking_message_text.empty()
                # Display the response
                response_block.markdown(f'<div class="response-block"> Answer: {response}</div>', unsafe_allow_html=True)
                conversation_item = {
                    'timestamp': timestamp,
                    'query': query,
                    'response': response
                }
                st.session_state.history.append(conversation_item)
                # Write data to Google Sheet
                write_data_to_google_sheet(service, spreadsheet_url, sheet_name, [conversation_item])
    # Only show conversation history if the checkbox is checked
    if show_history:
        st.write('\n\n## Conversation history')
        for item in reversed(st.session_state.history):
            st.write(f'### Question: {item["query"]}')
            st.write(f'### Answer: {item["response"]}')
            st.write('---')
if __name__ == "__main__":
    #st.set_page_config(page_title="My Streamlit App")
    if 'authentication_status' not in st.session_state or st.session_state["authentication_status"] == False:
        login_successful = login()
        if login_successful:
            st.experimental_rerun()
    else:
        app()
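# To run locally (assuming the CSV dataset, credentials.json and an OPENAI_API_KEY entry
# in .streamlit/secrets.toml are in place):
#   streamlit run app.py   # replace app.py with this file's name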