|
import streamlit as st |
|
import json |
|
from data_module import faq_data, model_options |
|
import uuid |
|
from chat_handler import ChatHandler |
|
|
|
chat = ChatHandler() |
|
def add_custom_css(): |
|
st.markdown(""" |
|
<style> |
|
.css-1d391kg { width: 35%; } |
|
</style> |
|
""", unsafe_allow_html=True) |
|
def generate_user_id(): |
|
new_id = chat.generate_id() |
|
return new_id |
|
|
|
def clear_history(user_id): |
|
chat.clear_history(user_id) |
|
return 'response' |
|
|
|
if 'user_id' not in st.session_state: |
|
st.session_state['user_id'] = generate_user_id() |
|
|
|
with open("embeddings_db_model.json", "r") as file: |
|
embedding_models = json.load(file) |
|
embedding_model_names = [model["model"] for model in embedding_models] |
|
|
|
agent_types = [ |
|
'JSON_CHAT_MODEL', |
|
'REACT_TEXT' |
|
] |
|
selected_model = st.sidebar.selectbox("Escolha o Modelo LLM", model_options) |
|
selected_embedding_model = st.sidebar.selectbox("Escolha o Modelo de Embedding", embedding_model_names) |
|
selected_embedding_dir = next(item for item in embedding_models if item["model"] == selected_embedding_model)["dir"] |
|
selected_agent_type = st.sidebar.selectbox("Escolha o Tipo de Agent", agent_types) |
|
|
|
|
|
add_custom_css() |
|
with st.sidebar: |
|
st.write("## Opções de Controle") |
|
if st.button('Limpar Histórico'): |
|
|
|
response = clear_history(st.session_state['user_id']) |
|
if response: |
|
st.session_state.messages = [{"role": "assistant", "content": "Histórico limpo. Pode começar uma nova conversa."}] |
|
st.rerun() |
|
else: |
|
st.error("Erro ao limpar o histórico") |
|
with st.container(): |
|
col1, col2 = st.columns([1, 1]) |
|
with col1: |
|
st.caption("LLM:") |
|
st.write(selected_model) |
|
with col2: |
|
st.caption("Embeddings:") |
|
st.write(selected_embedding_model) |
|
|
|
st.title("⚖️ ChatBot Direito Tributário") |
|
st.caption("Direito Tributário da Pessoa Jurídica") |
|
st.caption("Projeto do Workshop de LLM UFG") |
|
|
|
|
|
|
|
if "messages" not in st.session_state: |
|
st.session_state["messages"] = [{"role": "assistant", "content": "Olá como posso ajudar?"}] |
|
if "faq_question" not in st.session_state: |
|
st.session_state["faq_question"] = None |
|
|
|
|
|
for msg in st.session_state.messages: |
|
if msg['role'] == 'assistant': |
|
img = "server_icon.png" |
|
else: |
|
img = 'user_icon.png' |
|
st.chat_message(msg["role"],avatar=img).write(msg["content"]) |
|
|
|
def process_question(question): |
|
st.session_state.messages.append({"role": "user", "content": question}) |
|
st.chat_message("user", avatar="user_icon.png").write(question) |
|
|
|
with st.chat_message("assistant", avatar="server_icon.png"): |
|
with st.spinner("Thinking..."): |
|
data = dict( |
|
user_id=st.session_state['user_id'], |
|
text= question, |
|
embedding_model= selected_embedding_model, |
|
embedding_dir= selected_embedding_dir, |
|
model= selected_model, |
|
agent_type=selected_agent_type |
|
) |
|
|
|
msg,intermediary_steps = chat.post_message(message=data) |
|
st.write(str(msg)) |
|
st.session_state.messages.append({"role": "assistant", "content": msg}) |
|
|
|
|
|
|
|
|
|
if intermediary_steps: |
|
with st.expander("Ver Passos Intermediários"): |
|
if intermediary_steps[0] == 'erro': |
|
st.markdown("## ERROR...\n") |
|
else: |
|
st.markdown("## > Entering new AgentExecutor chain...\n") |
|
for index, step in enumerate(intermediary_steps, start=1): |
|
|
|
action = step[0].tool if hasattr(step[0], 'tool') else 'Unknown' |
|
|
|
|
|
action_input = step[0].tool_input if hasattr(step[0], 'tool_input') else 'N/A' |
|
log = step[0].log if hasattr(step[0], 'log') else 'No log available' |
|
st.markdown(f"**Passo {index}:**") |
|
st.markdown(f" **Ação:** `{action}`") |
|
st.markdown(f" **Entrada da Ação:** `{action_input}`") |
|
st.code(log, language='json') |
|
st.markdown("---") |
|
|
|
st.markdown("**Ação:** Final Answer") |
|
st.markdown(f"**Entrada da Ação:** `{msg}`") |
|
st.markdown("## > Finished chain.") |
|
else: |
|
with st.expander("Ver Passos Intermediários"): |
|
st.markdown("#### > Entering new AgentExecutor chain...\n") |
|
st.markdown("**Ação:** Final Answer") |
|
st.markdown(f"**Entrada da Ação:** `{msg}`") |
|
st.markdown("#### > Finished chain...") |
|
|
|
def add_faq_question_to_chat(question): |
|
st.session_state["faq_question"] = question |
|
|
|
|
|
with st.sidebar: |
|
st.write("## Perguntas Frequentes") |
|
for index, item in enumerate(faq_data, start=1): |
|
question_with_number = f"{index}\. {item['question']}" |
|
expander = st.expander(question_with_number, expanded=False) |
|
|
|
with expander: |
|
st.write(item["answer"]) |
|
button_key = f"button_{index}" |
|
if st.button("Enviar esta pergunta", key=button_key): |
|
st.session_state['selected_question'] = item["question"] |
|
|
|
if 'selected_question' in st.session_state and st.session_state['selected_question']: |
|
add_faq_question_to_chat(st.session_state['selected_question']) |
|
del st.session_state['selected_question'] |
|
|
|
|
|
|
|
if st.session_state["faq_question"]: |
|
process_question(st.session_state["faq_question"]) |
|
st.session_state["faq_question"] = None |
|
|
|
|
|
if prompt := st.chat_input(): |
|
process_question(prompt) |
|
|