"""Streamlit chat UI for a study assistant backed by a llama-index OpenAI agent.

The script renders a sidebar (credentials, model settings, progress table,
image upload), keeps the conversation in ``st.session_state`` and streams
agent answers into the chat area. Streamlit re-executes the whole file on
every interaction, so top-level statement order below defines the UI layout.
"""
import base64
import os
from io import BytesIO

import chromadb
import nest_asyncio
import pandas as pd
import streamlit as st
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import (
    Document,
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
)
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools import QueryEngineTool
from llama_index.embeddings.huggingface.base import HuggingFaceEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.chroma.base import ChromaVectorStore
from streamlit_feedback import streamlit_feedback

from vision_api import get_transcribed_text
from qna_prompting import get_qna_question_tool, evaluate_qna_answer_tool

nest_asyncio.apply()

# App title
st.set_page_config(page_title="💬 Open AI Chatbot")

openai_api = os.getenv("OPENAI_API_KEY")

# Set to True for the run in which the prompt comes from an uploaded image.
image_prompt = False

# "./raw_documents/HI_Knowledge_Base.pdf"
input_files = [
    "./raw_documents/HI Chapter Summary Version 1.3.pdf",
    "./raw_documents/qna.txt",
]
embedding_model = "BAAI/bge-small-en-v1.5"
persisted_vector_db = "./models/chroma_db"
fine_tuned_path = "local:models/fine-tuned-embeddings"
system_content = (
    "You are a helpful study assistant. "
    "You do not respond as 'User' or pretend to be 'User'. "
    "You only respond once as 'Assistant'."
)

data_df = pd.DataFrame(
    {
        "Completion": [30, 40, 100, 10],
    }
)
data_df.index = ["Chapter 1", "Chapter 2", "Chapter 3", "Chapter 4"]


def _initial_messages():
    """Return a fresh chat history containing only the assistant greeting."""
    return [
        {
            "role": "assistant",
            "content": "How may I assist you today?",
            "type": "text",
        }
    ]


@st.cache_resource
def get_document_object(input_files):
    """Load ``input_files`` and merge their text into a single ``Document``.

    The texts of all loaded documents are joined with a blank line.
    """
    documents = SimpleDirectoryReader(input_files=input_files).load_data()
    merged_text = "\n\n".join(doc.text for doc in documents)
    return Document(text=merged_text)


@st.cache_resource
def get_llm_object(selected_model, temperature):
    """Return a cached llama-index ``OpenAI`` LLM for the given settings."""
    return OpenAI(model=selected_model, temperature=temperature)


@st.cache_resource
def get_embedding_model(model_name, fine_tuned_path=None):
    """Return the embedding model to install into ``Settings.embed_model``.

    Without ``fine_tuned_path`` a ``HuggingFaceEmbedding`` for ``model_name``
    is created. Otherwise ``fine_tuned_path`` itself is returned unchanged
    (a string such as ``"local:..."``).
    """
    if fine_tuned_path is None:
        print(f"loading from `{model_name}` from huggingface")
        return HuggingFaceEmbedding(model_name=model_name)

    print(f"loading from local `{fine_tuned_path}`")
    # NOTE(review): presumably llama-index resolves this "local:" string into
    # an embedding model when it is assigned to Settings.embed_model -- confirm.
    return fine_tuned_path


def _load_or_build_index(input_files, persisted_vector_db):
    """Return a ``VectorStoreIndex`` backed by the persisted Chroma collection.

    The collection is reused when it already holds entries; otherwise the
    input files are parsed into nodes and indexed. Deciding on the collection
    count (rather than on the directory merely existing) means a directory
    left behind by an interrupted build is rebuilt instead of being loaded as
    an empty index.
    """
    db = chromadb.PersistentClient(path=persisted_vector_db)
    chroma_collection = db.get_or_create_collection("quickstart")
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)

    if chroma_collection.count() > 0:
        print("loading from vector database - chroma")
        return VectorStoreIndex.from_vector_store(
            vector_store=vector_store,
            storage_context=storage_context,
        )

    print("create new chroma vector database..")
    documents = SimpleDirectoryReader(input_files=input_files).load_data()
    nodes = Settings.node_parser.get_nodes_from_documents(documents)
    storage_context.docstore.add_documents(nodes)
    return VectorStoreIndex(nodes, storage_context=storage_context)


@st.cache_resource
def get_query_engine(input_files, llm_model, temperature,
                     embedding_model, fine_tuned_path,
                     system_content, persisted_vector_db):
    """Build (once per distinct argument set) the chat agent.

    Configures the global llama-index ``Settings`` (LLM, chunk size 1024,
    embedding model), loads or builds the vector index, wraps it in a query
    engine tool and returns an ``OpenAIAgent`` that can also call the two
    Q&A tools imported from ``qna_prompting``.

    Because of ``st.cache_resource`` the returned agent object is reused by
    every call with the same arguments.
    """
    llm = get_llm_object(llm_model, temperature)
    embedded_model = get_embedding_model(
        model_name=embedding_model,
        fine_tuned_path=fine_tuned_path,
    )

    # Settings must be in place before the index is loaded or built.
    Settings.llm = llm
    Settings.chunk_size = 1024
    Settings.embed_model = embedded_model

    index = _load_or_build_index(input_files, persisted_vector_db)

    memory = ChatMemoryBuffer.from_defaults(token_limit=15000)
    # NOTE(review): confirm that as_query_engine actually honours `memory`
    # and `system_prompt`; they are passed through exactly as before.
    hi_content_engine = index.as_query_engine(
        memory=memory,
        system_prompt=system_content,
        similarity_top_k=3,
        streaming=True,
    )

    hi_textbook_query_description = (
        " Use this tool to extract content from Health Insurance textbook "
        "that has 15 chapters in total. When user wants to learn more about "
        "a particular chapter, this tool will help to assist user to get "
        "better understanding of the content of the textbook. \n"
    )

    hi_query_tool = QueryEngineTool.from_defaults(
        query_engine=hi_content_engine,
        name="vector_tool",
        description=hi_textbook_query_description,
    )

    agent = OpenAIAgent.from_tools(
        tools=[
            hi_query_tool,
            get_qna_question_tool,
            evaluate_qna_answer_tool,
        ],
        max_function_calls=1,
        llm=llm,
        verbose=True,
    )

    print("loaded AI agent, let's begin the chat!")
    print("=" * 50)
    print("")

    return agent


def _get_agent():
    """Return the cached agent for the current sidebar/model settings."""
    return get_query_engine(
        input_files=input_files,
        llm_model=selected_model,
        temperature=temperature,
        embedding_model=embedding_model,
        fine_tuned_path=fine_tuned_path,
        system_content=system_content,
        persisted_vector_db=persisted_vector_db,
    )


def clear_chat_history():
    """Reset the displayed conversation and the agent's chat state."""
    st.session_state.messages = _initial_messages()
    # Skip the agent while no API key is known, so that an agent is not
    # created (and cached) before credentials are available.
    if openai_api:
        _get_agent().reset()


def generate_llm_response(prompt_input, tool_choice="auto"):
    """Send ``prompt_input`` to the agent and return its streaming response.

    ``tool_choice`` is forwarded to ``stream_chat``; the caller iterates the
    returned object's ``response_gen`` to obtain the tokens.
    """
    chat_agent = _get_agent()
    return chat_agent.stream_chat(prompt_input, tool_choice=tool_choice)


def handle_feedback(user_response):
    """Acknowledge submitted feedback and hide the feedback widget."""
    st.toast("✔️ Feedback received!")
    st.session_state.feedback = False


def handle_image_upload():
    """Mark that the uploader changed, so the image is processed once."""
    st.session_state.release_file = "true"


# OpenAI credentials and model settings
with st.sidebar:
    st.title("💬 Open AI Chatbot")
    st.write("This chatbot is created using the GPT model from Open AI.")
    if openai_api:
        pass
    elif "OPENAI_API_KEY" in st.secrets:
        st.success("API key already provided!", icon="✅")
        openai_api = st.secrets["OPENAI_API_KEY"]
    else:
        openai_api = st.text_input("Enter OpenAI API token:", type="password")
        # Classic keys are 51 characters long; newer key formats are longer,
        # so only a minimum length is required.
        looks_valid = openai_api.startswith("sk-") and len(openai_api) >= 51
        if not looks_valid:
            st.warning("Please enter your credentials!", icon="⚠️")
        else:
            st.success("Proceed to entering your prompt message!", icon="👉")

    ### for streamlit purpose
    os.environ["OPENAI_API_KEY"] = openai_api

    st.subheader("Models and parameters")
    selected_model = st.sidebar.selectbox(
        "Choose an OpenAI model",
        ["gpt-3.5-turbo-0125", "gpt-4-0125-preview"],
        key="selected_model",
    )
    temperature = st.sidebar.slider(
        "temperature", min_value=0.0, max_value=2.0, value=0.0, step=0.01
    )
    st.data_editor(
        data_df,
        column_config={
            "Completion": st.column_config.ProgressColumn(
                "Completion %",
                help="Percentage of content covered",
                format="%.1f%%",
                min_value=0,
                max_value=100,
            ),
        },
        hide_index=False,
    )
    st.markdown("📖 Reach out to SakiMilo to learn how to create this app!")

# Session-state defaults (only applied the first time a key is missing).
if "init" not in st.session_state:
    st.session_state.init = {"warm_started": "No"}
    st.session_state.feedback = False

# Store LLM generated responses
if "messages" not in st.session_state:
    st.session_state.messages = _initial_messages()

if "feedback_key" not in st.session_state:
    st.session_state.feedback_key = 0

if "release_file" not in st.session_state:
    st.session_state.release_file = "false"

if "question_id" not in st.session_state:
    st.session_state.question_id = None

if "qna_answer" not in st.session_state:
    st.session_state.qna_answer = None

st.sidebar.button("Clear Chat History", on_click=clear_chat_history)
if st.sidebar.button("I want to submit a feedback!"):
    st.session_state.feedback = True
    st.session_state.feedback_key += 1  # overwrite feedback component

# Warm start: build the agent once per session, as soon as a key is known.
if st.session_state.init["warm_started"] == "No" and openai_api:
    clear_chat_history()
    st.session_state.init["warm_started"] = "Yes"

# Image upload option
with st.sidebar:
    image_file = st.file_uploader(
        "Upload your image here...",
        type=["png", "jpeg", "jpg"],
        on_change=handle_image_upload,
    )

if st.session_state.release_file == "true" and image_file:
    with st.spinner("Uploading..."):
        b64string = base64.b64encode(image_file.read()).decode("utf-8")
        st.session_state.messages.append(
            {"role": "user", "content": b64string, "type": "image"}
        )

        # The transcription is stored under the hidden "admin" role: it is
        # never displayed, but is picked up below as the prompt.
        transcribed_msg = get_transcribed_text(b64string)
        st.session_state.messages.append(
            {"role": "admin", "content": transcribed_msg, "type": "text"}
        )
        st.session_state.release_file = "false"

# Display or clear chat messages
for message in st.session_state.messages:
    if message["role"] == "admin":
        continue
    with st.chat_message(message["role"]):
        if message["type"] == "text":
            st.write(message["content"])
        elif message["type"] == "image":
            st.image(BytesIO(base64.b64decode(message["content"])))

# User-provided prompt
if prompt := st.chat_input(disabled=not openai_api):
    st.session_state.messages.append(
        {"role": "user", "content": prompt, "type": "text"}
    )
    with st.chat_message("user"):
        st.write(prompt)

# Retrieve text prompt from image submission
if prompt is None and st.session_state.messages[-1]["role"] == "admin":
    image_prompt = True
    prompt = st.session_state.messages[-1]["content"]

# Generate a new response if last message is not from assistant.
# Without a prompt there is nothing to send, so the agent is not called.
if prompt is not None and st.session_state.messages[-1]["role"] != "assistant":
    with st.chat_message("assistant"):
        with st.spinner("Thinking..."):
            if image_prompt:
                response = generate_llm_response(
                    prompt, tool_choice="vector_tool"
                )
                image_prompt = False
            else:
                response = generate_llm_response(prompt, tool_choice="auto")
            placeholder = st.empty()
            full_response = ""
            for token in response.response_gen:
                # Two trailing spaces turn a newline into a Markdown hard
                # line break; a single space would be rendered as a soft wrap.
                full_response += token.replace("\n", "  \n")
                placeholder.markdown(full_response)
            placeholder.markdown(full_response)

    st.session_state.messages.append(
        {"role": "assistant", "content": full_response, "type": "text"}
    )

# Trigger feedback
if st.session_state.feedback:
    streamlit_feedback(
        feedback_type="thumbs",
        optional_text_label="[Optional] Please provide an explanation",
        on_submit=handle_feedback,
        key=f"feedback_{st.session_state.feedback_key}",
    )