chatbot-carometro / mysqlchatstore.py
leandroaraujodev's picture
feat: bump version 0.4.0
e92e757
from typing import Optional, Any
from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from pydantic import Field
import pymysql
from llama_index.core.storage.chat_store import BaseChatStore
from llama_index.core.llms import ChatMessage
from llama_index.core.memory import ChatMemoryBuffer
class MySQLChatStore(BaseChatStore):
"""
Implementação de um ChatStore que armazena mensagens em uma tabela MySQL,
unindo a pergunta do usuário e a resposta do assistente na mesma linha.
"""
table_name: Optional[str] = Field(default="chatstore", description="Nome da tabela MySQL.")
_session: Optional[sessionmaker] = None
_async_session: Optional[sessionmaker] = None
def __init__(self, session: sessionmaker, async_session: sessionmaker, table_name: str):
super().__init__(table_name=table_name.lower())
self._session = session
self._async_session = async_session
self._initialize()
@classmethod
def from_params(cls, host: str, port: str, database: str, user: str, password: str, table_name: str = "chatstore") -> "MySQLChatStore":
"""
Cria o sessionmaker síncrono e assíncrono, retornando a instância da classe.
"""
conn_str = f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
async_conn_str = f"mysql+aiomysql://{user}:{password}@{host}:{port}/{database}"
session, async_session = cls._connect(conn_str, async_conn_str)
return cls(session=session, async_session=async_session, table_name=table_name)
@classmethod
def _connect(cls, connection_string: str, async_connection_string: str) -> tuple[sessionmaker, sessionmaker]:
"""
Cria e retorna um sessionmaker síncrono e um sessionmaker assíncrono.
"""
engine = create_engine(connection_string, echo=False)
session = sessionmaker(bind=engine)
async_engine = create_async_engine(async_connection_string)
async_session = sessionmaker(bind=async_engine, class_=AsyncSession)
return session, async_session
def _initialize(self):
"""
Garante que a tabela exista, com colunas para armazenar user_input e response.
"""
with self._session() as session:
session.execute(text(f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
chat_store_key VARCHAR(255) NOT NULL,
user_input TEXT,
response TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""))
session.commit()
def get_keys(self) -> list[str]:
"""
Retorna todas as chaves armazenadas.
"""
with self._session() as session:
result = session.execute(text(f"""
SELECT DISTINCT chat_store_key FROM {self.table_name}
"""))
return [row[0] for row in result.fetchall()]
def get_messages(self, key: str) -> list[ChatMessage]:
"""
Retorna a conversa inteira (perguntas e respostas), na ordem de inserção (id).
Cada linha pode conter o user_input, o response ou ambos (caso já respondido).
"""
with self._session() as session:
rows = session.execute(text(f"""
SELECT user_input, response
FROM {self.table_name}
WHERE chat_store_key = :key
ORDER BY id
"""), {"key": key}).fetchall()
messages = []
for user_in, resp in rows:
if user_in is not None:
messages.append(ChatMessage(role='user', content=user_in))
if resp is not None:
messages.append(ChatMessage(role='assistant', content=resp))
return messages
def set_messages(self, key: str, messages: list[ChatMessage]) -> None:
"""
Sobrescreve o histórico de mensagens de uma chave (apaga tudo e insere novamente).
Se quiser somente acrescentar, use add_message.
Aqui, cada pergunta do usuário gera uma nova linha.
Assim que encontrar uma mensagem de assistente, atualiza essa mesma linha.
Se houver assistentes sem usuários, insere normalmente.
"""
with self._session() as session:
# Limpa histórico anterior
session.execute(text(f"""
DELETE FROM {self.table_name} WHERE chat_store_key = :key
"""), {"key": key})
# Reinsere na ordem
current_id = None
for msg in messages:
if msg.role == 'user':
# Cria nova linha com user_input
result = session.execute(text(f"""
INSERT INTO {self.table_name} (chat_store_key, user_input)
VALUES (:key, :ui)
"""), {"key": key, "ui": msg.content})
# Pega o id do insert
current_id = result.lastrowid
else:
# Tenta atualizar a última linha se existir
if current_id is not None:
session.execute(text(f"""
UPDATE {self.table_name}
SET response = :resp
WHERE id = :id
"""), {"resp": msg.content, "id": current_id})
# Depois de atualizar a linha, zera o current_id
current_id = None
else:
# Se não houver pergunta pendente, insere como nova linha
session.execute(text(f"""
INSERT INTO {self.table_name} (chat_store_key, response)
VALUES (:key, :resp)
"""), {"key": key, "resp": msg.content})
session.commit()
def add_message(self, key: str, message: ChatMessage) -> None:
"""
Acrescenta uma nova mensagem no fluxo. Se for do usuário, insere nova linha;
se for do assistente, tenta preencher a linha pendente que não tenha resposta.
"""
with self._session() as session:
if message.role == 'user':
# Sempre cria uma nova linha para mensagens de usuário
insert_stmt = text(f"""
INSERT INTO {self.table_name} (chat_store_key, user_input)
VALUES (:key, :ui)
""")
session.execute(insert_stmt, {
"key": key,
"ui": message.content
})
else:
# Tenta encontrar a última linha sem resposta
row = session.execute(text(f"""
SELECT id
FROM {self.table_name}
WHERE chat_store_key = :key
AND user_input IS NOT NULL
AND response IS NULL
ORDER BY id DESC
LIMIT 1
"""), {"key": key}).fetchone()
if row:
# Atualiza com a resposta
msg_id = row[0]
update_stmt = text(f"""
UPDATE {self.table_name}
SET response = :resp
WHERE id = :id
""")
session.execute(update_stmt, {
"resp": message.content,
"id": msg_id
})
else:
# Se não achar linha pendente, insere como nova
insert_stmt = text(f"""
INSERT INTO {self.table_name} (chat_store_key, response)
VALUES (:key, :resp)
""")
session.execute(insert_stmt, {
"key": key,
"resp": message.content
})
session.commit()
def delete_messages(self, key: str) -> None:
"""
Remove todas as linhas associadas a 'key'.
"""
with self._session() as session:
session.execute(text(f"""
DELETE FROM {self.table_name} WHERE chat_store_key = :key
"""), {"key": key})
session.commit()
def delete_last_message(self, key: str) -> Optional[ChatMessage]:
"""
Apaga a última mensagem da conversa (considerando a ordem de inserção).
Se a última linha tiver pergunta e resposta, remove primeiro a resposta;
caso não exista resposta, remove a linha inteira.
"""
with self._session() as session:
# Localiza a última linha
row = session.execute(text(f"""
SELECT id, user_input, response
FROM {self.table_name}
WHERE chat_store_key = :key
ORDER BY id DESC
LIMIT 1
"""), {"key": key}).fetchone()
if not row:
return None
row_id, user_in, resp = row
# Se a linha tiver somente pergunta, apagamos a linha inteira.
# Se tiver também a resposta, apagamos só a parte do assistente.
if user_in and resp:
# Remove a resposta
session.execute(text(f"""
UPDATE {self.table_name}
SET response = NULL
WHERE id = :id
"""), {"id": row_id})
session.commit()
return ChatMessage(role='assistant', content=resp)
else:
# Deleta a linha inteira
session.execute(text(f"""
DELETE FROM {self.table_name}
WHERE id = :id
"""), {"id": row_id})
session.commit()
if user_in:
return ChatMessage(role='user', content=user_in)
elif resp:
return ChatMessage(role='assistant', content=resp)
else:
return None
def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
"""
Deleta a mensagem com base na ordem total do histórico. O índice 'idx' é
calculado após reconstruir a lista de ChatMessages (user e assistant).
"""
messages = self.get_messages(key)
if idx < 0 or idx >= len(messages):
return None
removed = messages[idx]
# Agora precisamos traduzir 'idx' para saber qual registro no banco será modificado.
# É mais simples recriar todos os dados com set_messages sem a mensagem em 'idx':
messages.pop(idx)
self.set_messages(key, messages)
return removed