import streamlit as st import sys import os # Configuração do caminho para incluir as pastas necessárias sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from pipelines.message import send_message from agent import pipeline from interface.auxiliary_functions import ( extract_content, extract_links, fetch_webpage_content, url_to_suggestion, connect_to_services ) class Chatbot: def __init__(self): # Configuração inicial do histórico de chat if "chat_history" not in st.session_state: st.session_state.chat_history = [{ "type": "text", "role": "assistant", "content": "Como eu posso ajudar?", "links": [] }] if "chat" not in st.session_state and st.session_state.get('username'): chat = st.session_state.services["edgedb_client"].query(''' Select Chat { id } filter .user.username = $username and .prototipo.id = $prototipo_id ''', username=st.session_state.username, prototipo_id=st.session_state.prototipo_id ) if not chat: chat = self._create_default_chat() else: chat = chat[0] self.update_messages(chat) st.session_state.chat = chat # Conteúdo dos botões do sidebar if "topics" not in st.session_state: st.session_state["topics"] = [ "Níveis da conta govbr.", "Dúvidas no reconhecimento facial.", "Como recuperar minha conta gov.br", "Dúvidas para aumentar o nível com a CIN." ] if "is_feedback_active" not in st.session_state: st.session_state.is_feedback_active = False # Pergunta do usuário no chatbot self.response = "" self.links = [] def _create_default_chat(self): chat = st.session_state.services["edgedb_client"].query(''' INSERT Chat { user := (SELECT User FILTER .username = $username), prototipo := (SELECT Prototipo FILTER .id = $prototipo_id) } ''', username=st.session_state.username, prototipo_id=st.session_state.prototipo_id ) message = st.session_state.services["edgedb_client"].query(''' SELECT ( INSERT Message { content := "Como eu posso ajudar?", role := "assistant", chat := (SELECT Chat FILTER .id = $chat_id) } ) { content, role } ''', chat_id=chat[0].id) # st.session_state.chat_history = message return chat[0] def update_messages(self, chat): messages = st.session_state.services["edgedb_client"].query(''' SELECT Message { id, content, role } FILTER .chat.id = $chat_id ORDER BY .created_at ASC; ''', chat_id=chat.id) # st.session_state.chat_history = messages def create_sidebar(self): """Cria o sidebar com tópicos e opções.""" st.image('https://www.gov.br/++theme++padrao_govbr/img/govbr-logo-large.png', width=200) st.header("Tópicos frequentes") for topic in st.session_state["topics"]: if st.button(topic, key=topic): self.response = topic # Espaços em branco para organização for _ in range(5): st.write("") # Botão centralizado para limpar histórico col1, col2, col3 = st.columns([1, 2, 1]) with col2: if st.button("LIMPAR HISTÓRICO"): st.session_state.chat_history = [{ "type": "text", "role": "assistant", "content": "Como eu posso ajudar?", "links": [] }] st.session_state.is_feedback_active = False def display_suggestions(self, links): """Exibe as sugestões de links no chat.""" if links: st.subheader("Você pode acessar a página na web em:") st.write(links[0]) # O primeiro link será o da página principal st.subheader("Próximos Passos:") suggestions = [(link, url_to_suggestion(link)) for link in links[1:] if url_to_suggestion(link)] if suggestions: st.write("Você também pode se interessar em:") for link, suggestion in suggestions: st.write(f"- [{suggestion}]({link})") # Exibe os links clicáveis no formato Markdown else: st.write("Não há sugestões de navegação adicionais.") else: st.write("Não há links disponíveis para exibição.") def mount_chatbot(self): """Configura o chatbot com título, subtítulo e espaço de entrada.""" st.title("Bem-vindo à ajuda do gov.br") st.caption("💬 Qual a sua dificuldade hoje? Estou aqui para ajudar!") # Exibição do espaço para mandar mensagem if user_query := st.chat_input(placeholder="Digite sua mensagem"): st.session_state.chat_history.append({"type": "text", "role": "user", "content": user_query}) self.generate_answer(user_query) st.session_state.is_feedback_active = True # Ativando o feedback def add_to_history(self, message, role="user", type="text"): """Adiciona uma mensagem ao histórico de chat.""" st.session_state.chat_history.append({ "role": role, "type": type, "content": message }) def generate_answer(self, input_message): """Gera uma resposta para a consulta do usuário.""" # Chama o pipeline para obter a resposta response_text = send_message(input_message, pipeline) # Armazena a resposta do agente com a tag self.add_to_history(response_text, role="assistant") # Extração do caminho da resposta do bot path = extract_content(response_text, "path") if path: base_url = "https://www.gov.br/governodigital/pt-br/" full_url = base_url + path # Armazena o link principal self.links = [full_url] with st.spinner("Obtendo conteúdo da página..."): content, original_links = fetch_webpage_content(full_url) # Agora retornando também os links desativados # Adiciona o conteúdo da página ao histórico do chatbot self.add_to_history(content, role="assistant", type="page") # Extrai links ativos do conteúdo da página e armazena para sugestões extracted_links = extract_links(content) # Combinar links ativos e links inibidos all_links = extracted_links + original_links self.links.extend(all_links) def _save_msg_to_db(self, msg, role): message = st.session_state.services["edgedb_client"].query(''' SELECT ( INSERT Message { content := $content, chat := (SELECT Chat FILTER .id = $chat_id), role := $role } ) { id } ''', content=msg, chat_id=st.session_state.chat.id, role=role) return message def display_feedback(self): user_input = st.session_state.chat_history[-2]["content"] bot_output = st.session_state.chat_history[-1]["content"] with st.expander("Avaliação do atendimento"): # st.write(f'O que achou da resposta para a pergunta "{user_input}"?') st.write(f'O que achou da resposta para a pergunta?') rate = st.feedback("stars") # rate = st.feedback("faces") text_feedback = st.text_input("Comentários extras:") # Botão para confirmar a avaliação if rate is not None: if st.button("Enviar Avaliação"): try: msg = self._save_msg_to_db(bot_output, "assistant") feedback_rate = rate + 1 # TODO Colocar nessa parte a estrutura para adicionar o feedback_data ao banco de dados st.session_state.services["edgedb_client"].query(''' INSERT Feedback { rating := $rating, content := $content, message := (SELECT Message FILTER .id = $message_id), prototipo := (SELECT Prototipo FILTER .id = $prototipo_id) } ''', message_id=msg[-1].id, rating=feedback_rate, content=text_feedback, prototipo_id=st.session_state.prototipo_id ) # st.session_state.chat_history st.success(f"Avaliação enviada!") except Exception as e: print(e) st.error("Erro ao enviar avaliação!") st.session_state.is_feedback_active = False # Desativando o feedback