Spaces:
Runtime error
Runtime error
import streamlit as st | |
import pandas as pd | |
import os | |
import random | |
import time | |
from module.__custom__ import * | |
from streamlit_extras.switch_page_button import switch_page | |
df = pd.read_csv('./data/cosine.csv') | |
# Openai API Key | |
import openai | |
import json | |
def read_api_key_from_secrets(file_path='secrets.json'): | |
try: | |
with open(file_path, 'r') as secrets_file: | |
secrets_data = json.load(secrets_file) | |
openai_api_key = secrets_data.get('openai_api_key') | |
if openai_api_key is not None: | |
return openai_api_key | |
else: | |
raise KeyError("'openai_api_key' not found in secrets.json") | |
except FileNotFoundError: | |
raise FileNotFoundError(f"The file {file_path} was not found.") | |
except json.JSONDecodeError: | |
raise ValueError(f"Error decoding JSON in {file_path}. Please check the file format.") | |
# Example usage | |
try: | |
openai.api_key = os.environ['key'] | |
os.environ['OPENAI_API_KEY'] = os.environ['key'] | |
print(f"OpenAI API Key Found") | |
except (FileNotFoundError, ValueError, KeyError) as e: | |
print(f"Error: {e}") | |
from langchain.vectorstores import Chroma | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.chains.query_constructor.base import AttributeInfo | |
from langchain.retrievers.self_query.base import SelfQueryRetriever | |
embedding = OpenAIEmbeddings() | |
# from langchain.embeddings.sentence_transformer import SentenceTransformerEmbeddings | |
# embedding = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") | |
# LLM | |
from langchain.chat_models import ChatOpenAI | |
llm_name = "gpt-3.5-turbo" | |
llm = ChatOpenAI(model_name=llm_name, temperature=0) | |
# load from disk | |
db_cos = Chroma( | |
persist_directory="./data/docs/chroma_cos", | |
embedding_function=embedding | |
) | |
db_plot = Chroma( | |
persist_directory="./data/docs/chroma_plot", | |
embedding_function=embedding | |
) | |
with st.sidebar: is_plot = st.toggle('Enable Plot') | |
db_selected = db_cos | |
if is_plot: db_selected = db_plot | |
##### Conversational Retrieval ##### | |
from langchain.agents.agent_toolkits.conversational_retrieval.tool import ( | |
create_retriever_tool, | |
) | |
retriever = db_selected.as_retriever() | |
retriever_tool = create_retriever_tool( | |
retriever, | |
"document-retriever", | |
"Query a retriever to get information about the video game dataset.", | |
) | |
################################## | |
##### Retriever - Self Query ##### | |
metadata_field_info = [ | |
AttributeInfo( | |
name="name", | |
description="The name of the video game on steam", | |
type="string", | |
) | |
] | |
document_content_description = "Brief summary of a video game on Steam" | |
retriever_plot = SelfQueryRetriever.from_llm( | |
llm, | |
db_selected, | |
document_content_description, | |
metadata_field_info, | |
enable_limit=True, | |
) | |
################################## | |
from typing import List | |
from langchain.utils.openai_functions import convert_pydantic_to_openai_function | |
from pydantic import BaseModel, Field | |
class Response(BaseModel): | |
"""Final response to the question being asked. | |
If you do not have an answer, say you do not have an answer, and ask the user to ask another recommendation. | |
If you do have an answer, be verbose and explain why you think the game answers the user's query. | |
Don't give information not mentioned in the documents CONTEXT. | |
You should always refuse to answer questions that are not related to this specific domain, of video game recommendation. | |
If no document passes the minimum threshold of similarity .75, default to apologizing for no answer. | |
""" | |
answer: str = Field(description="The final answer to the user, including the names in the answer.") | |
name: List[str] = Field( | |
description="A list of the names of the games found for the user. Only include the game name if it was given as a result to the user's query." | |
) | |
import json | |
from langchain.schema.agent import AgentActionMessageLog, AgentFinish | |
def parse(output): | |
# If no function was invoked, return to user | |
if "function_call" not in output.additional_kwargs: | |
return AgentFinish(return_values={"output": output.content}, log=output.content) | |
# Parse out the function call | |
function_call = output.additional_kwargs["function_call"] | |
name = function_call["name"] | |
inputs = json.loads(function_call["arguments"]) | |
# If the Response function was invoked, return to the user with the function inputs | |
if name == "Response": | |
return AgentFinish(return_values=inputs, log=str(function_call)) | |
# Otherwise, return an agent action | |
else: | |
return AgentActionMessageLog( | |
tool=name, tool_input=inputs, log="", message_log=[output] | |
) | |
from langchain.agents import AgentExecutor | |
from langchain.agents.format_scratchpad import format_to_openai_function_messages | |
from langchain.chat_models import ChatOpenAI | |
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain.tools.render import format_tool_to_openai_function | |
prompt = ChatPromptTemplate.from_messages( | |
[ | |
("system", "You are a recommendation assistant, based off documents."), | |
("user", "{input}"), | |
MessagesPlaceholder(variable_name="agent_scratchpad"), | |
] | |
) | |
llm_with_tools = llm.bind( | |
functions=[ | |
# The retriever tool | |
format_tool_to_openai_function(retriever_tool), | |
# Response schema | |
convert_pydantic_to_openai_function(Response), | |
] | |
) | |
agent = ( | |
{ | |
"input": lambda x: x["input"], | |
# Format agent scratchpad from intermediate steps | |
"agent_scratchpad": lambda x: format_to_openai_function_messages( | |
x["intermediate_steps"] | |
), | |
} | |
| prompt | |
| llm_with_tools | |
| parse | |
) | |
agent_executor = AgentExecutor(tools=[retriever_tool], agent=agent, verbose=True) | |
post_prompt = """ | |
1. Respond with a respectable and friendy tone. | |
2. You should give the best possible answer based on user's query. | |
3. Do not give me any information that is not included in the document. | |
4. If you are able to, provide the links to the steam site for the games answer. | |
5. If you need more context from the user, ask them to provide more context in the next query. Do not include games that contain the queried game in the title. | |
6. If a user asks for a type of game, use that type to find a game that mentions the type. | |
""" | |
# If you do not have an answer, your response should be kind and apologetic, as to why you do not have an answer. | |
# If a user asks for a specific number of games, and you cannot provide that, answer with what games you found and explain why you could not find others. | |
st.header("🕹️ GameInsightify") | |
st.title("Your Personal :green[Game Recommender]") | |
st.image('./data/img/demoGIF.gif') | |
# Description for users | |
st.markdown(""" | |
Welcome to GameInsightify! This chatbot will help you find the perfect game based on your preferences. | |
Just type in what you're looking for in a game, and let our AI assistant provide recommendations. | |
""") | |
# Initialize chat history | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
if 'gamenames' not in st.session_state: | |
st.session_state.gamenames = [] | |
# Slider on range and button to clear chat history | |
col1, col2= st.columns([8,2]) | |
with col1: | |
pass | |
with col2: | |
if st.button("Clear chat"): | |
st.session_state.messages = [] | |
st.session_state.gamenames = [] | |
# Display chat messages from history on app rerun | |
tab1, tab2= st.tabs(['Chatbot', ' ']) | |
with tab1: # this tab exist becasue i have to limit the height of chatbot | |
for message in st.session_state.messages: | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"]) | |
with tab2: pass # this tab exist becasue i have to limit the height of chatbot | |
# Accept user input | |
if prompt := st.chat_input("Need a game recommendation?"): | |
st.session_state.messages.append({"role": "user", "content": prompt}) # Add user message to chat history | |
with st.chat_message("user"): # Display user message in chat message container | |
st.markdown(prompt) | |
with st.chat_message("assistant"): # Display assistant response in chat message container | |
message_placeholder = st.empty() | |
assistant_response = "" | |
full_response = "" | |
# docs = db.max_marginal_relevance_search(prompt,k=query_num, fetch_k=10) # Sending query to db | |
if is_plot: | |
docs = retriever_plot.invoke(prompt) | |
full_response = random.choice( # 1st sentence of response | |
["I recommend the following games:\n", | |
f"Hi, human! These are the {len(docs)} best games:\n", | |
f"I bet you will love these {len(docs)} games:\n",] | |
) | |
# formatting response from db | |
top_games = [] | |
for idx, doc in enumerate(docs): | |
gamename = doc.metadata['name'] | |
top_games.append(gamename) | |
assistant_response += f"{idx+1}. {gamename}\n" | |
else: | |
docs = agent_executor.invoke( | |
{"input": f"{prompt} {post_prompt}"}, | |
return_only_outputs=True, | |
) # retrieve response from chatgpt | |
try: | |
assistant_response += docs["answer"] | |
except: | |
assistant_response += docs["output"] | |
top_games = docs['name'] | |
print(docs) | |
# separating response into chunk of words | |
chunks = [] | |
for line in assistant_response.splitlines(): | |
for word in line.split() : chunks.append(word) | |
chunks.append('\n') | |
chunks = chunks[0:-1] | |
# Simulate stream of response with milliseconds delay | |
for chunk in chunks: | |
full_response += chunk + " " | |
time.sleep(0.05) | |
message_placeholder.markdown(full_response + "▌") # Add a blinking cursor to simulate typing | |
message_placeholder.markdown(full_response) | |
# Add assistant response to chat history | |
st.session_state.messages.append({"role": "assistant", "content": full_response}) | |
if is_plot: st.session_state.gamenames.append(top_games) | |
col1, col2, col3= st.columns([4,3,4]) | |
with col2: | |
if is_plot and db_selected==db_plot: | |
if st.button("Plot Games"): # button in center column | |
switch_page('Overall') | |
else: | |
try: | |
appid = df[df['Name']==top_games[0]]['AppID'].iloc[0] | |
url = f'https://store.steampowered.com/app/{appid}' | |
st.link_button("Check on Steam", url) | |
except: pass | |
with st.sidebar: | |
try: home_dfbox(top_games) | |
except: pass | |
# Styling on Tabs | |
css = ''' | |
div.stTabs { | |
min-height: 20vh; # Minimum height set for the chat area | |
max-height: 60vh; # Maximum height, after which scrolling starts | |
overflow-y: auto; # Allows scrolling when content exceeds max height | |
overflow-x: hidden; | |
} | |
''' | |
st.markdown(f'<style>{css}</style>', unsafe_allow_html=True) |