"""Streamlit chatbot for the YSA / Larkin question-answering demo.

Flow of one script run:

1. Render the chat history and the sidebar controls.
2. Load the QA dataset for the selected website and add its questions to a
   freshly created Chroma collection.
3. When the user submits a question, retrieve the 5 nearest stored questions,
   keep those whose distance is below the sidebar threshold (falling back to
   all 5 when none qualify), optionally append the fine-tuned LLM's answer,
   optionally score every candidate answer with the "AI judge", and finally
   ask ChatGPT to answer using the retrieved answers as context.
"""

import os
import string
import time
from typing import Any, Dict, List, Tuple, Union

import chromadb
import numpy as np
import openai
import pandas as pd
import requests
import streamlit as st
from datasets import load_dataset
from langchain.document_loaders import TextLoader
from langchain.embeddings.sentence_transformer import SentenceTransformerEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import Chroma
from scipy.spatial.distance import cosine

# Provides the helpers used below: llama2_7b_ysa, openai_text_embedding,
# quantized_influence and call_chatgpt.
from utils.helper_functions import *

openai.api_key = os.environ["OPENAI_API_KEY"]

# Front-end Design
st.set_page_config(layout="wide")
st.title("YSA|Larkin Chatbot")

# Initialize chat history
if "messages" not in st.session_state:
    st.session_state.messages = []

# Display chat messages from history on app rerun
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

st.sidebar.markdown(
    """
    ### Instructions:

    This app guides you through [YSA](https://youthspiritartworks.org/)/[Larkin](https://larkinstreetyouth.org/) website , utilizing a RAG-ready Q&A dataset [here](https://huggingface.co/datasets/eagle0504/youthless-homeless-shelter-web-scrape-dataset-qa-formatted) for chatbot assistance. The Larkin domain is processed into QA data [here](https://huggingface.co/datasets/eagle0504/larkin-web-scrape-dataset-qa-formatted). 🤖 Enter a question, and it finds similar ones in the database, offering answers with a distance score to gauge relevance—the lower the score, the closer the match. 🎯 For better accuracy and to reduce errors, user feedback helps refine the database. ✨
    """
)

st.sidebar.success("Select a shelter first!")
option = st.sidebar.selectbox("Which website do you want to ask?", ("YSA", "Larkin"))

st.sidebar.warning(
    "Runnning AI Judge takes a bit longer so we default this option as 'No'."
)
run_ai_judge = st.sidebar.selectbox(
    "Shall we run AI Judge to provide additional scores?", ("No", "Yes")
)

special_threshold = st.sidebar.number_input(
    "Insert a threshold for distances score to filter data (default 0.2):",
    value=0.2,
    placeholder="Type a number...",
)

user_timer = st.sidebar.selectbox("Shall we time each step?", ("No", "Yes"))

st.sidebar.success(
    "The 'distances' score indicates the proximity of your question to our database questions (lower is better). The 'ai_judge' ranks the similarity between user's question and database answers independently (higher is better)."
)

clear_button = st.sidebar.button("Clear Conversation", key="clear")
if clear_button:
    st.session_state.messages = []

# Pick the dataset and the chat-box placeholder for the selected website.
if option == "YSA":
    dataset_name = "eagle0504/ysa-web-scrape-dataset-qa-formatted-small-version"
    initial_input = "Tell me about YSA"
else:
    dataset_name = "eagle0504/larkin-web-scrape-dataset-qa-formatted"
    initial_input = "Tell me about Larkin"

# Load the dataset from a provided source.
begin_t = time.time()
dataset = load_dataset(dataset_name)
end_t = time.time()
if user_timer == "Yes":
    st.success(f"{option} Database loaded. | Time: {end_t - begin_t} sec")

# Read each column once; indexing dataset["train"][...] repeatedly would
# rebuild the whole column for every lookup.
db_questions = dataset["train"]["questions"]
db_answers = dataset["train"]["answers"]

# Initialize a new client for ChromaDB.
client = chromadb.Client()

# Generate a random number between 1 billion and 10 billion.
# Integer bounds with an explicit 64-bit dtype: the upper bound does not fit
# in int32, which is NumPy's default integer on some platforms.
random_number: int = int(
    np.random.randint(low=10**9, high=10**10, dtype=np.int64)
)

# Generate a random string consisting of 10 uppercase letters and digits.
random_string: str = "".join(
    np.random.choice(list(string.ascii_uppercase + string.digits), size=10)
)

# Combine the random number and random string into one identifier so that
# every script run creates a collection under a new name.
combined_string: str = f"{random_number}{random_string}"

# Create a new collection in ChromaDB with the combined string as its name.
collection = client.create_collection(combined_string)

# Embed and store the dataset questions; the string IDs are the row positions,
# which lets query results be mapped back to dataset rows below.
with st.spinner("Loading, please be patient with us ... 🙏"):
    num_questions = len(db_questions)
    begin_t = time.time()
    collection.add(
        ids=[str(i) for i in range(num_questions)],  # IDs are just strings
        documents=db_questions,  # Enter questions here
        metadatas=[{"type": "support"} for _ in range(num_questions)],
    )
    end_t = time.time()
    if user_timer == "Yes":
        st.success(f"Add to VectorDB. | Time: {end_t - begin_t} sec")

# React to user input
if prompt := st.chat_input(initial_input):
    with st.spinner("Loading, please be patient with us ... 🙏"):
        # Display user message in chat message container
        st.chat_message("user").markdown(prompt)
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt})

        question = prompt

        # Retrieve the 5 stored questions closest to the user's question.
        begin_t = time.time()
        results = collection.query(query_texts=question, n_results=5)
        end_t = time.time()
        if user_timer == "Yes":
            st.success(f"Query answser. | Time: {end_t - begin_t} sec")

        # IDs were stored as stringified row positions; convert them back.
        idx = [int(i) for i in results["ids"][0]]
        ref = pd.DataFrame(
            {
                "idx": idx,
                "questions": [db_questions[i] for i in idx],
                "answers": [db_answers[i] for i in idx],
                "distances": results["distances"][0],
            }
        )

        # Explicit copy: rows may be appended to this frame further down, and
        # writing into a boolean-mask slice of `ref` is a chained assignment.
        filtered_ref = ref[ref["distances"] < special_threshold].copy()
        if filtered_ref.shape[0] > 0:
            if user_timer == "Yes":
                st.success("There are highly relevant information in our database.")
            ref_from_db_search = filtered_ref["answers"].str.cat(sep=" ")
            final_ref = filtered_ref
        else:
            st.warning(
                "The database may not have relevant information to help your question so please be aware of hallucinations."
            )
            ref_from_db_search = ref["answers"].str.cat(sep=" ")
            final_ref = ref

        if option == "YSA":
            try:
                begin_t = time.time()
                llm_response = llama2_7b_ysa(question)
                end_t = time.time()
                if user_timer == "Yes":
                    st.success(f"Running LLM. | Time: {end_t - begin_t} sec")
                did_this_llm_run = "yes"
            except Exception:
                # Best effort: any endpoint failure just skips the fine-tuned
                # answer. `Exception` (not a bare except) so that
                # KeyboardInterrupt / SystemExit still propagate.
                st.warning("Sorry, the inference endpoint is temporarily down. 😔")
                llm_response = "NA."
                did_this_llm_run = "no"
        else:
            st.warning(
                "Apologies! We are in the progress of fine-tune the model, so it's currently unavailable. ⚙️"
            )
            llm_response = "NA"
            # Previously unset on this path, which raised NameError below.
            did_this_llm_run = "no"

        if did_this_llm_run == "no":
            st.warning("Fine-tuned LLM not used in this call.")
        else:
            # Row layout follows the `ref` columns:
            # idx, questions, answers, distances.
            finetuned_llm_guess = ["from_llm", question, llm_response, 0]
            final_ref.loc[-1] = finetuned_llm_guess
            final_ref = final_ref.reset_index()

        # add ai judge as additional rating
        if run_ai_judge == "Yes":
            independent_ai_judge_score = []
            begin_t = time.time()
            # The question embedding is the same for every row: compute it at
            # most once, and only if some answer is long enough to be scored.
            question_embedding = None
            # Iterate over the values rather than indexing by 0..n-1: the
            # frame keeps a non-contiguous index when rows were filtered out
            # and the LLM row was not appended.
            for this_content in final_ref["answers"]:
                if len(this_content) > 3:
                    if question_embedding is None:
                        question_embedding = openai_text_embedding(question)
                    answer_embedding = openai_text_embedding(this_content)
                    this_score = quantized_influence(
                        question_embedding, answer_embedding, k=3
                    )[0]
                else:
                    # Answers of 3 characters or fewer (e.g. the "NA"
                    # placeholder) are not scored.
                    this_score = 0
                independent_ai_judge_score.append(this_score)

            final_ref["ai_judge"] = independent_ai_judge_score

            end_t = time.time()
            if user_timer == "Yes":
                st.success(f"Using AI Judge. | Time: {end_t - begin_t} sec")

        engineered_prompt = f"""
            Based on the context: {ref_from_db_search}

            answer the user question: {question}

            Answer the question directly (don't say "based on the context, ...")
        """

        begin_t = time.time()
        answer = call_chatgpt(engineered_prompt)
        end_t = time.time()
        if user_timer == "Yes":
            st.success(f"Final API Call. | Time: {end_t - begin_t} sec")
        response = answer

        # Display assistant response in chat message container
        with st.chat_message("assistant"):
            with st.spinner("Wait for it..."):
                st.markdown(response)
                with st.expander("See reference:"):
                    st.table(final_ref)

        # Add assistant response to chat history
        st.session_state.messages.append({"role": "assistant", "content": response})