# Streamlit front end for a tax-law chatbot: sidebar selectors for the LLM,
# embedding model and agent type, a chat history view, and an FAQ sidebar whose
# questions can be sent to the chat.
#
# NOTE(review): this file was re-indented from a whitespace-collapsed source;
# the nesting of the `with`/`if` blocks was reconstructed from syntax and
# should be confirmed against the running app.
import json
import uuid

import streamlit as st

from data_module import faq_data, model_options
from chat_handler import ChatHandler

chat = ChatHandler()


def add_custom_css():
    """Inject the app's custom CSS (currently an effectively empty payload)."""
    st.markdown(""" """, unsafe_allow_html=True)


def generate_user_id():
    """Return a new user id obtained from the chat handler."""
    return chat.generate_id()


def clear_history(user_id):
    """Ask the chat handler to clear the history stored for ``user_id``.

    Always returns the literal string ``'response'``.
    """
    chat.clear_history(user_id)
    # NOTE(review): this constant is always truthy, so the caller's error
    # branch can never run; consider returning the handler's real result.
    return 'response'


if 'user_id' not in st.session_state:
    st.session_state['user_id'] = generate_user_id()

# Explicit encoding so the file is decoded the same way on every platform.
with open("embeddings_db_model.json", "r", encoding="utf-8") as file:
    embedding_models = json.load(file)

embedding_model_names = [model["model"] for model in embedding_models]

agent_types = [
    'JSON_CHAT_MODEL',
    'REACT_TEXT',
]

selected_model = st.sidebar.selectbox("Escolha o Modelo LLM", model_options)
selected_embedding_model = st.sidebar.selectbox(
    "Escolha o Modelo de Embedding", embedding_model_names
)
# Look up the "dir" entry of the embedding model chosen in the select box.
selected_embedding_dir = next(
    item for item in embedding_models
    if item["model"] == selected_embedding_model
)["dir"]
selected_agent_type = st.sidebar.selectbox("Escolha o Tipo de Agent", agent_types)

add_custom_css()

with st.sidebar:
    st.write("## Opções de Controle")
    if st.button('Limpar Histórico'):
        # Ask the handler to clear the history, then reset the local transcript.
        response = clear_history(st.session_state['user_id'])
        if response:
            st.session_state.messages = [
                {
                    "role": "assistant",
                    "content": "Histórico limpo. Pode começar uma nova conversa.",
                }
            ]
            st.rerun()
        else:
            st.error("Erro ao limpar o histórico")

with st.container():
    col1, col2 = st.columns([1, 1])
    with col1:
        st.caption("LLM:")
        st.write(selected_model)
    with col2:
        st.caption("Embeddings:")
        st.write(selected_embedding_model)

st.title("⚖️ ChatBot Direito Tributário")
st.caption("Direito Tributário da Pessoa Jurídica")
st.caption("Projeto do Workshop de LLM UFG")

if "messages" not in st.session_state:
    st.session_state["messages"] = [
        {"role": "assistant", "content": "Olá como posso ajudar?"}
    ]

if "faq_question" not in st.session_state:
    st.session_state["faq_question"] = None

# Replay the stored conversation, choosing the avatar by message role.
for msg in st.session_state.messages:
    img = "server_icon.png" if msg['role'] == 'assistant' else 'user_icon.png'
    st.chat_message(msg["role"], avatar=img).write(msg["content"])


def _render_intermediary_steps(intermediary_steps, msg):
    """Render the agent's intermediate steps inside a collapsible expander.

    ``intermediary_steps`` is the second value returned by
    ``chat.post_message``; ``msg`` is the final answer shown as the last
    action. Three cases are handled: no steps, an error marker (first element
    equal to ``'erro'``), and a sequence of steps.
    """
    with st.expander("Ver Passos Intermediários"):
        if not intermediary_steps:
            # No steps reported: show only the final answer.
            st.markdown("#### > Entering new AgentExecutor chain...\n")
            st.markdown("**Ação:** Final Answer")
            st.markdown(f"**Entrada da Ação:** `{msg}`")
            st.markdown("#### > Finished chain...")
            return

        if intermediary_steps[0] == 'erro':
            st.markdown("## ERROR...\n")
            return

        st.markdown("## > Entering new AgentExecutor chain...\n")
        for index, step in enumerate(intermediary_steps, start=1):
            # The first element of each step is read by attribute; missing
            # attributes fall back to placeholder text.
            agent_action = step[0]
            action = getattr(agent_action, 'tool', 'Unknown')
            action_input = getattr(agent_action, 'tool_input', 'N/A')
            log = getattr(agent_action, 'log', 'No log available')

            st.markdown(f"**Passo {index}:**")
            st.markdown(f" **Ação:** `{action}`")
            st.markdown(f" **Entrada da Ação:** `{action_input}`")
            st.code(log, language='json')
            st.markdown("---")

        # Append the "Final Answer" action after the listed steps.
        st.markdown("**Ação:** Final Answer")
        st.markdown(f"**Entrada da Ação:** `{msg}`")
        st.markdown("## > Finished chain.")


def process_question(question):
    """Send ``question`` to the chat handler and render the exchange.

    Appends the user message and the assistant reply to
    ``st.session_state.messages``, writes both to the page, and shows the
    intermediate steps returned by ``chat.post_message`` in an expander.
    """
    st.session_state.messages.append({"role": "user", "content": question})
    st.chat_message("user", avatar="user_icon.png").write(question)

    with st.chat_message("assistant", avatar="server_icon.png"):
        with st.spinner("Thinking..."):
            data = {
                "user_id": st.session_state['user_id'],
                "text": question,
                "embedding_model": selected_embedding_model,
                "embedding_dir": selected_embedding_dir,
                "model": selected_model,
                "agent_type": selected_agent_type,
            }
            msg, intermediary_steps = chat.post_message(message=data)

        st.write(str(msg))
        st.session_state.messages.append({"role": "assistant", "content": msg})
        _render_intermediary_steps(intermediary_steps, msg)


def add_faq_question_to_chat(question):
    """Queue ``question`` in session state to be processed as a chat message."""
    st.session_state["faq_question"] = question


# Sidebar with frequently asked questions.
with st.sidebar:
    st.write("## Perguntas Frequentes")
    for index, item in enumerate(faq_data, start=1):
        # The backslash keeps Markdown from turning "1." into an ordered list;
        # it is written as "\\." because "\." is an invalid escape sequence.
        question_with_number = f"{index}\\. {item['question']}"
        expander = st.expander(question_with_number, expanded=False)
        with expander:
            st.write(item["answer"])
            button_key = f"button_{index}"
            if st.button("Enviar esta pergunta", key=button_key):
                st.session_state['selected_question'] = item["question"]

# Move a question picked in the sidebar into the pending-question slot.
if 'selected_question' in st.session_state and st.session_state['selected_question']:
    add_faq_question_to_chat(st.session_state['selected_question'])
    del st.session_state['selected_question']

if st.session_state["faq_question"]:
    process_question(st.session_state["faq_question"])
    st.session_state["faq_question"] = None

if prompt := st.chat_input():
    process_question(prompt)