|
|
|
|
|
|
|
|
|
import configparser |
|
import sqlite3 |
|
import logging |
|
import json |
|
import os |
|
import sys |
|
from typing import List, Dict, Optional, Tuple, Any, Union |
|
|
|
from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path |
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_database_directory(): |
|
os.makedirs(get_database_dir(), exist_ok=True) |
|
|
|
ensure_database_directory() |
|
|
|
|
|
|
|
config_path = get_project_relative_path('Config_Files/config.txt') |
|
|
|
|
|
config = configparser.ConfigParser() |
|
config.read(config_path) |
|
|
|
|
|
chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db')) |
|
print(f"Chat Database path: {chat_DB_PATH}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
def initialize_database(): |
|
"""Initialize the SQLite database with required tables and FTS5 virtual tables.""" |
|
conn = None |
|
try: |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
|
|
|
|
cursor.execute("PRAGMA foreign_keys = ON;") |
|
|
|
|
|
cursor.execute(""" |
|
CREATE TABLE IF NOT EXISTS CharacterCards ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
name TEXT UNIQUE NOT NULL, |
|
description TEXT, |
|
personality TEXT, |
|
scenario TEXT, |
|
image BLOB, |
|
post_history_instructions TEXT, |
|
first_mes TEXT, |
|
mes_example TEXT, |
|
creator_notes TEXT, |
|
system_prompt TEXT, |
|
alternate_greetings TEXT, |
|
tags TEXT, |
|
creator TEXT, |
|
character_version TEXT, |
|
extensions TEXT, |
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP |
|
); |
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
CREATE TABLE IF NOT EXISTS CharacterChats ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
character_id INTEGER NOT NULL, |
|
conversation_name TEXT, |
|
chat_history TEXT, |
|
is_snapshot BOOLEAN DEFAULT FALSE, |
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, |
|
FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE |
|
); |
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5( |
|
conversation_name, |
|
chat_history, |
|
content='CharacterChats', |
|
content_rowid='id' |
|
); |
|
""") |
|
|
|
|
|
cursor.executescript(""" |
|
CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN |
|
INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history) |
|
VALUES (new.id, new.conversation_name, new.chat_history); |
|
END; |
|
|
|
CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN |
|
DELETE FROM CharacterChats_fts WHERE rowid = old.id; |
|
END; |
|
|
|
CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN |
|
UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history |
|
WHERE rowid = new.id; |
|
END; |
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
CREATE TABLE IF NOT EXISTS ChatKeywords ( |
|
chat_id INTEGER NOT NULL, |
|
keyword TEXT NOT NULL, |
|
FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE |
|
); |
|
""") |
|
|
|
|
|
cursor.execute(""" |
|
CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword); |
|
""") |
|
cursor.execute(""" |
|
CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id); |
|
""") |
|
|
|
conn.commit() |
|
logging.info("Database initialized successfully.") |
|
except sqlite3.Error as e: |
|
logging.error(f"SQLite error occurred during database initialization: {e}") |
|
if conn: |
|
conn.rollback() |
|
raise |
|
except Exception as e: |
|
logging.error(f"Unexpected error occurred during database initialization: {e}") |
|
if conn: |
|
conn.rollback() |
|
raise |
|
finally: |
|
if conn: |
|
conn.close() |
|
|
|
|
|
def setup_chat_database(): |
|
try: |
|
initialize_database() |
|
except Exception as e: |
|
logging.critical(f"Failed to initialize database: {e}") |
|
sys.exit(1) |
|
|
|
setup_chat_database() |
|
|
|
|
|
|
|
|
|
|
|
def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]: |
|
"""Parse and validate a character card according to V2 specification.""" |
|
v2_data = { |
|
'name': card_data.get('name', ''), |
|
'description': card_data.get('description', ''), |
|
'personality': card_data.get('personality', ''), |
|
'scenario': card_data.get('scenario', ''), |
|
'first_mes': card_data.get('first_mes', ''), |
|
'mes_example': card_data.get('mes_example', ''), |
|
'creator_notes': card_data.get('creator_notes', ''), |
|
'system_prompt': card_data.get('system_prompt', ''), |
|
'post_history_instructions': card_data.get('post_history_instructions', ''), |
|
'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])), |
|
'tags': json.dumps(card_data.get('tags', [])), |
|
'creator': card_data.get('creator', ''), |
|
'character_version': card_data.get('character_version', ''), |
|
'extensions': json.dumps(card_data.get('extensions', {})) |
|
} |
|
|
|
|
|
if 'image' in card_data: |
|
v2_data['image'] = card_data['image'] |
|
|
|
return v2_data |
|
|
|
|
|
def add_character_card(card_data: Dict[str, Any]) -> Optional[int]: |
|
"""Add or update a character card in the database.""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
parsed_card = parse_character_card(card_data) |
|
|
|
|
|
cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],)) |
|
row = cursor.fetchone() |
|
|
|
if row: |
|
|
|
character_id = row[0] |
|
update_query = """ |
|
UPDATE CharacterCards |
|
SET description = ?, personality = ?, scenario = ?, image = ?, |
|
post_history_instructions = ?, first_mes = ?, mes_example = ?, |
|
creator_notes = ?, system_prompt = ?, alternate_greetings = ?, |
|
tags = ?, creator = ?, character_version = ?, extensions = ? |
|
WHERE id = ? |
|
""" |
|
cursor.execute(update_query, ( |
|
parsed_card['description'], parsed_card['personality'], parsed_card['scenario'], |
|
parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'], |
|
parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'], |
|
parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'], |
|
parsed_card['character_version'], parsed_card['extensions'], character_id |
|
)) |
|
else: |
|
|
|
insert_query = """ |
|
INSERT INTO CharacterCards (name, description, personality, scenario, image, |
|
post_history_instructions, first_mes, mes_example, creator_notes, system_prompt, |
|
alternate_greetings, tags, creator, character_version, extensions) |
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
|
""" |
|
cursor.execute(insert_query, ( |
|
parsed_card['name'], parsed_card['description'], parsed_card['personality'], |
|
parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'], |
|
parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'], |
|
parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'], |
|
parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions'] |
|
)) |
|
character_id = cursor.lastrowid |
|
|
|
conn.commit() |
|
return character_id |
|
except sqlite3.IntegrityError as e: |
|
logging.error(f"Error adding character card: {e}") |
|
return None |
|
except Exception as e: |
|
logging.error(f"Unexpected error adding character card: {e}") |
|
return None |
|
finally: |
|
conn.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_character_cards() -> List[Dict]: |
|
"""Retrieve all character cards from the database.""" |
|
logging.debug(f"Fetching characters from DB: {chat_DB_PATH}") |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
cursor.execute("SELECT * FROM CharacterCards") |
|
rows = cursor.fetchall() |
|
columns = [description[0] for description in cursor.description] |
|
conn.close() |
|
characters = [dict(zip(columns, row)) for row in rows] |
|
|
|
return characters |
|
|
|
|
|
def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]: |
|
""" |
|
Retrieve a single character card by its ID. |
|
|
|
Args: |
|
character_id: Can be either an integer ID or a dictionary containing character data. |
|
|
|
Returns: |
|
A dictionary containing the character card data, or None if not found. |
|
""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
if isinstance(character_id, dict): |
|
|
|
return character_id |
|
elif isinstance(character_id, int): |
|
|
|
cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,)) |
|
row = cursor.fetchone() |
|
if row: |
|
columns = [description[0] for description in cursor.description] |
|
return dict(zip(columns, row)) |
|
else: |
|
logging.warning(f"Invalid type for character_id: {type(character_id)}") |
|
return None |
|
except Exception as e: |
|
logging.error(f"Error in get_character_card_by_id: {e}") |
|
return None |
|
finally: |
|
conn.close() |
|
|
|
|
|
def update_character_card(character_id: int, card_data: Dict) -> bool: |
|
"""Update an existing character card.""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
cursor.execute(""" |
|
UPDATE CharacterCards |
|
SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? |
|
WHERE id = ? |
|
""", ( |
|
card_data.get('name'), |
|
card_data.get('description'), |
|
card_data.get('personality'), |
|
card_data.get('scenario'), |
|
card_data.get('image'), |
|
card_data.get('post_history_instructions', ''), |
|
card_data.get('first_message', "Hello! I'm ready to chat."), |
|
character_id |
|
)) |
|
conn.commit() |
|
return cursor.rowcount > 0 |
|
except sqlite3.IntegrityError as e: |
|
logging.error(f"Error updating character card: {e}") |
|
return False |
|
finally: |
|
conn.close() |
|
|
|
|
|
def delete_character_card(character_id: int) -> bool: |
|
"""Delete a character card and its associated chats.""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
|
|
cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,)) |
|
cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,)) |
|
conn.commit() |
|
return cursor.rowcount > 0 |
|
except sqlite3.Error as e: |
|
logging.error(f"Error deleting character card: {e}") |
|
return False |
|
finally: |
|
conn.close() |
|
|
|
|
|
def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]: |
|
""" |
|
Add a new chat history for a character, optionally associating keywords. |
|
|
|
Args: |
|
character_id (int): The ID of the character. |
|
conversation_name (str): Name of the conversation. |
|
chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples. |
|
keywords (Optional[List[str]]): List of keywords to associate with this chat. |
|
is_snapshot (bool, optional): Whether this chat is a snapshot. |
|
|
|
Returns: |
|
Optional[int]: The ID of the inserted chat or None if failed. |
|
""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
chat_history_json = json.dumps(chat_history) |
|
cursor.execute(""" |
|
INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot) |
|
VALUES (?, ?, ?, ?) |
|
""", ( |
|
character_id, |
|
conversation_name, |
|
chat_history_json, |
|
is_snapshot |
|
)) |
|
chat_id = cursor.lastrowid |
|
|
|
if keywords: |
|
|
|
keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords] |
|
cursor.executemany(""" |
|
INSERT INTO ChatKeywords (chat_id, keyword) |
|
VALUES (?, ?) |
|
""", keyword_records) |
|
|
|
conn.commit() |
|
return chat_id |
|
except sqlite3.Error as e: |
|
logging.error(f"Error adding character chat: {e}") |
|
return None |
|
finally: |
|
conn.close() |
|
|
|
|
|
def get_character_chats(character_id: Optional[int] = None) -> List[Dict]: |
|
"""Retrieve all chats, or chats for a specific character if character_id is provided.""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
if character_id is not None: |
|
cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,)) |
|
else: |
|
cursor.execute("SELECT * FROM CharacterChats") |
|
rows = cursor.fetchall() |
|
columns = [description[0] for description in cursor.description] |
|
conn.close() |
|
return [dict(zip(columns, row)) for row in rows] |
|
|
|
|
|
def get_character_chat_by_id(chat_id: int) -> Optional[Dict]: |
|
"""Retrieve a single chat by its ID.""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,)) |
|
row = cursor.fetchone() |
|
conn.close() |
|
if row: |
|
columns = [description[0] for description in cursor.description] |
|
chat = dict(zip(columns, row)) |
|
chat['chat_history'] = json.loads(chat['chat_history']) |
|
return chat |
|
return None |
|
|
|
|
|
def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]: |
|
""" |
|
Search for character chats using FTS5, optionally filtered by character_id. |
|
|
|
Args: |
|
query (str): The search query. |
|
character_id (Optional[int]): The ID of the character to filter chats by. |
|
|
|
Returns: |
|
Tuple[List[Dict], str]: A list of matching chats and a status message. |
|
""" |
|
if not query.strip(): |
|
return [], "Please enter a search query." |
|
|
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
if character_id is not None: |
|
|
|
cursor.execute(""" |
|
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history |
|
FROM CharacterChats_fts |
|
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id |
|
WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ? |
|
ORDER BY rank |
|
""", (query, character_id)) |
|
else: |
|
|
|
cursor.execute(""" |
|
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history |
|
FROM CharacterChats_fts |
|
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id |
|
WHERE CharacterChats_fts MATCH ? |
|
ORDER BY rank |
|
""", (query,)) |
|
|
|
rows = cursor.fetchall() |
|
columns = [description[0] for description in cursor.description] |
|
results = [dict(zip(columns, row)) for row in rows] |
|
|
|
if character_id is not None: |
|
status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character." |
|
else: |
|
status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters." |
|
|
|
return results, status_message |
|
except Exception as e: |
|
logging.error(f"Error searching chats with FTS5: {e}") |
|
return [], f"Error occurred during search: {e}" |
|
finally: |
|
conn.close() |
|
|
|
def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool: |
|
"""Update an existing chat history.""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
chat_history_json = json.dumps(chat_history) |
|
cursor.execute(""" |
|
UPDATE CharacterChats |
|
SET chat_history = ? |
|
WHERE id = ? |
|
""", ( |
|
chat_history_json, |
|
chat_id |
|
)) |
|
conn.commit() |
|
return cursor.rowcount > 0 |
|
except sqlite3.Error as e: |
|
logging.error(f"Error updating character chat: {e}") |
|
return False |
|
finally: |
|
conn.close() |
|
|
|
|
|
def delete_character_chat(chat_id: int) -> bool: |
|
"""Delete a specific chat.""" |
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,)) |
|
conn.commit() |
|
return cursor.rowcount > 0 |
|
except sqlite3.Error as e: |
|
logging.error(f"Error deleting character chat: {e}") |
|
return False |
|
finally: |
|
conn.close() |
|
|
|
def fetch_keywords_for_chats(keywords: List[str]) -> List[int]: |
|
""" |
|
Fetch chat IDs associated with any of the specified keywords. |
|
|
|
Args: |
|
keywords (List[str]): List of keywords to search for. |
|
|
|
Returns: |
|
List[int]: List of chat IDs associated with the keywords. |
|
""" |
|
if not keywords: |
|
return [] |
|
|
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
|
|
keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords)) |
|
sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}" |
|
cursor.execute(sql_query, keywords) |
|
rows = cursor.fetchall() |
|
chat_ids = [row[0] for row in rows] |
|
return chat_ids |
|
except Exception as e: |
|
logging.error(f"Error in fetch_keywords_for_chats: {e}") |
|
return [] |
|
finally: |
|
conn.close() |
|
|
|
def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]: |
|
"""Save chat history to the CharacterChats table. |
|
|
|
Returns the ID of the inserted chat or None if failed. |
|
""" |
|
return add_character_chat(character_id, conversation_name, chat_history) |
|
|
|
def migrate_chat_to_media_db(): |
|
pass |
|
|
|
|
|
def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]: |
|
""" |
|
Perform a full-text search on specified fields with optional filtering and pagination. |
|
|
|
Args: |
|
query (str): The search query. |
|
fields (List[str]): List of fields to search in. |
|
where_clause (str, optional): Additional SQL WHERE clause to filter results. |
|
page (int, optional): Page number for pagination. |
|
results_per_page (int, optional): Number of results per page. |
|
|
|
Returns: |
|
List[Dict[str, Any]]: List of matching chat records with content and metadata. |
|
""" |
|
if not query.strip(): |
|
return [] |
|
|
|
conn = sqlite3.connect(chat_DB_PATH) |
|
cursor = conn.cursor() |
|
try: |
|
|
|
match_query = " AND ".join(fields) + f" MATCH ?" |
|
|
|
fts_query = f""" |
|
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history |
|
FROM CharacterChats_fts |
|
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id |
|
WHERE {match_query} |
|
""" |
|
if where_clause: |
|
fts_query += f" AND ({where_clause})" |
|
fts_query += " ORDER BY rank LIMIT ? OFFSET ?" |
|
offset = (page - 1) * results_per_page |
|
cursor.execute(fts_query, (query, results_per_page, offset)) |
|
rows = cursor.fetchall() |
|
columns = [description[0] for description in cursor.description] |
|
results = [dict(zip(columns, row)) for row in rows] |
|
return results |
|
except Exception as e: |
|
logging.error(f"Error in search_db: {e}") |
|
return [] |
|
finally: |
|
conn.close() |
|
|
|
|
|
def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \ |
|
List[Dict[str, Any]]: |
|
""" |
|
Perform a full-text search within the specified chat IDs using FTS5. |
|
|
|
Args: |
|
query (str): The user's query. |
|
relevant_chat_ids (List[int]): List of chat IDs to search within. |
|
page (int): Pagination page number. |
|
results_per_page (int): Number of results per page. |
|
|
|
Returns: |
|
List[Dict[str, Any]]: List of search results with content and metadata. |
|
""" |
|
try: |
|
|
|
where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids]) |
|
if not where_clause: |
|
where_clause = "1" |
|
|
|
|
|
fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page) |
|
|
|
filtered_fts_results = [ |
|
{ |
|
"content": result['content'], |
|
"metadata": {"media_id": result['id']} |
|
} |
|
for result in fts_results |
|
if result['id'] in relevant_chat_ids |
|
] |
|
return filtered_fts_results |
|
except Exception as e: |
|
logging.error(f"Error in perform_full_text_search_chat: {str(e)}") |
|
return [] |
|
|
|
|
|
def fetch_all_chats() -> List[Dict[str, Any]]: |
|
""" |
|
Fetch all chat messages from the database. |
|
|
|
Returns: |
|
List[Dict[str, Any]]: List of chat messages with relevant metadata. |
|
""" |
|
try: |
|
chats = get_character_chats() |
|
return chats |
|
except Exception as e: |
|
logging.error(f"Error fetching all chats: {str(e)}") |
|
return [] |
|
|
|
|
|
|
|
|
|
|