oceansweep commited on
Commit
c5f6dbc
·
verified ·
1 Parent(s): c8ebc55

Update App_Function_Libraries/DB/Character_Chat_DB.py

Browse files
App_Function_Libraries/DB/Character_Chat_DB.py CHANGED
@@ -1,701 +1,701 @@
1
- # character_chat_db.py
2
- # Database functions for managing character cards and chat histories.
3
- # #
4
- # Imports
5
- import configparser
6
- import sqlite3
7
- import json
8
- import os
9
- import sys
10
- from typing import List, Dict, Optional, Tuple, Any, Union
11
-
12
- from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
13
- from Tests.Chat_APIs.Chat_APIs_Integration_test import logging
14
-
15
- #
16
- #######################################################################################################################
17
- #
18
- #
19
-
20
- def ensure_database_directory():
21
- os.makedirs(get_database_dir(), exist_ok=True)
22
-
23
- ensure_database_directory()
24
-
25
-
26
- # Construct the path to the config file
27
- config_path = get_project_relative_path('Config_Files/config.txt')
28
-
29
- # Read the config file
30
- config = configparser.ConfigParser()
31
- config.read(config_path)
32
-
33
- # Get the chat db path from the config, or use the default if not specified
34
- chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
35
- print(f"Chat Database path: {chat_DB_PATH}")
36
-
37
- ########################################################################################################
38
- #
39
- # Functions
40
-
41
- # FIXME - Setup properly and test/add documentation for its existence...
42
- def initialize_database():
43
- """Initialize the SQLite database with required tables and FTS5 virtual tables."""
44
- conn = None
45
- try:
46
- conn = sqlite3.connect(chat_DB_PATH)
47
- cursor = conn.cursor()
48
-
49
- # Enable foreign key constraints
50
- cursor.execute("PRAGMA foreign_keys = ON;")
51
-
52
- # Create CharacterCards table with V2 fields
53
- cursor.execute("""
54
- CREATE TABLE IF NOT EXISTS CharacterCards (
55
- id INTEGER PRIMARY KEY AUTOINCREMENT,
56
- name TEXT UNIQUE NOT NULL,
57
- description TEXT,
58
- personality TEXT,
59
- scenario TEXT,
60
- image BLOB,
61
- post_history_instructions TEXT,
62
- first_mes TEXT,
63
- mes_example TEXT,
64
- creator_notes TEXT,
65
- system_prompt TEXT,
66
- alternate_greetings TEXT,
67
- tags TEXT,
68
- creator TEXT,
69
- character_version TEXT,
70
- extensions TEXT,
71
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP
72
- );
73
- """)
74
-
75
- # Create CharacterChats table
76
- cursor.execute("""
77
- CREATE TABLE IF NOT EXISTS CharacterChats (
78
- id INTEGER PRIMARY KEY AUTOINCREMENT,
79
- character_id INTEGER NOT NULL,
80
- conversation_name TEXT,
81
- chat_history TEXT,
82
- is_snapshot BOOLEAN DEFAULT FALSE,
83
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
84
- FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
85
- );
86
- """)
87
-
88
- # Create FTS5 virtual table for CharacterChats
89
- cursor.execute("""
90
- CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
91
- conversation_name,
92
- chat_history,
93
- content='CharacterChats',
94
- content_rowid='id'
95
- );
96
- """)
97
-
98
- # Create triggers to keep FTS5 table in sync with CharacterChats
99
- cursor.executescript("""
100
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
101
- INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
102
- VALUES (new.id, new.conversation_name, new.chat_history);
103
- END;
104
-
105
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
106
- DELETE FROM CharacterChats_fts WHERE rowid = old.id;
107
- END;
108
-
109
- CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
110
- UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
111
- WHERE rowid = new.id;
112
- END;
113
- """)
114
-
115
- # Create ChatKeywords table
116
- cursor.execute("""
117
- CREATE TABLE IF NOT EXISTS ChatKeywords (
118
- chat_id INTEGER NOT NULL,
119
- keyword TEXT NOT NULL,
120
- FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
121
- );
122
- """)
123
-
124
- # Create indexes for faster searches
125
- cursor.execute("""
126
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
127
- """)
128
- cursor.execute("""
129
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
130
- """)
131
-
132
- conn.commit()
133
- logging.info("Database initialized successfully.")
134
- except sqlite3.Error as e:
135
- logging.error(f"SQLite error occurred during database initialization: {e}")
136
- if conn:
137
- conn.rollback()
138
- raise
139
- except Exception as e:
140
- logging.error(f"Unexpected error occurred during database initialization: {e}")
141
- if conn:
142
- conn.rollback()
143
- raise
144
- finally:
145
- if conn:
146
- conn.close()
147
-
148
- # Call initialize_database() at the start of your application
149
- def setup_chat_database():
150
- try:
151
- initialize_database()
152
- except Exception as e:
153
- logging.critical(f"Failed to initialize database: {e}")
154
- sys.exit(1)
155
-
156
- setup_chat_database()
157
-
158
- ########################################################################################################
159
- #
160
- # Character Card handling
161
-
162
- def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
163
- """Parse and validate a character card according to V2 specification."""
164
- v2_data = {
165
- 'name': card_data.get('name', ''),
166
- 'description': card_data.get('description', ''),
167
- 'personality': card_data.get('personality', ''),
168
- 'scenario': card_data.get('scenario', ''),
169
- 'first_mes': card_data.get('first_mes', ''),
170
- 'mes_example': card_data.get('mes_example', ''),
171
- 'creator_notes': card_data.get('creator_notes', ''),
172
- 'system_prompt': card_data.get('system_prompt', ''),
173
- 'post_history_instructions': card_data.get('post_history_instructions', ''),
174
- 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
175
- 'tags': json.dumps(card_data.get('tags', [])),
176
- 'creator': card_data.get('creator', ''),
177
- 'character_version': card_data.get('character_version', ''),
178
- 'extensions': json.dumps(card_data.get('extensions', {}))
179
- }
180
-
181
- # Handle 'image' separately as it might be binary data
182
- if 'image' in card_data:
183
- v2_data['image'] = card_data['image']
184
-
185
- return v2_data
186
-
187
-
188
- def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
189
- """Add or update a character card in the database."""
190
- conn = sqlite3.connect(chat_DB_PATH)
191
- cursor = conn.cursor()
192
- try:
193
- parsed_card = parse_character_card(card_data)
194
-
195
- # Check if character already exists
196
- cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
197
- row = cursor.fetchone()
198
-
199
- if row:
200
- # Update existing character
201
- character_id = row[0]
202
- update_query = """
203
- UPDATE CharacterCards
204
- SET description = ?, personality = ?, scenario = ?, image = ?,
205
- post_history_instructions = ?, first_mes = ?, mes_example = ?,
206
- creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
207
- tags = ?, creator = ?, character_version = ?, extensions = ?
208
- WHERE id = ?
209
- """
210
- cursor.execute(update_query, (
211
- parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
212
- parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
213
- parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
214
- parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
215
- parsed_card['character_version'], parsed_card['extensions'], character_id
216
- ))
217
- else:
218
- # Insert new character
219
- insert_query = """
220
- INSERT INTO CharacterCards (name, description, personality, scenario, image,
221
- post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
222
- alternate_greetings, tags, creator, character_version, extensions)
223
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
224
- """
225
- cursor.execute(insert_query, (
226
- parsed_card['name'], parsed_card['description'], parsed_card['personality'],
227
- parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
228
- parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
229
- parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
230
- parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
231
- ))
232
- character_id = cursor.lastrowid
233
-
234
- conn.commit()
235
- return character_id
236
- except sqlite3.IntegrityError as e:
237
- logging.error(f"Error adding character card: {e}")
238
- return None
239
- except Exception as e:
240
- logging.error(f"Unexpected error adding character card: {e}")
241
- return None
242
- finally:
243
- conn.close()
244
-
245
- # def add_character_card(card_data: Dict) -> Optional[int]:
246
- # """Add or update a character card in the database.
247
- #
248
- # Returns the ID of the inserted character or None if failed.
249
- # """
250
- # conn = sqlite3.connect(chat_DB_PATH)
251
- # cursor = conn.cursor()
252
- # try:
253
- # # Ensure all required fields are present
254
- # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
255
- # for field in required_fields:
256
- # if field not in card_data:
257
- # card_data[field] = '' # Assign empty string if field is missing
258
- #
259
- # # Check if character already exists
260
- # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
261
- # row = cursor.fetchone()
262
- #
263
- # if row:
264
- # # Update existing character
265
- # character_id = row[0]
266
- # cursor.execute("""
267
- # UPDATE CharacterCards
268
- # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
269
- # WHERE id = ?
270
- # """, (
271
- # card_data['description'],
272
- # card_data['personality'],
273
- # card_data['scenario'],
274
- # card_data['image'],
275
- # card_data['post_history_instructions'],
276
- # card_data['first_message'],
277
- # character_id
278
- # ))
279
- # else:
280
- # # Insert new character
281
- # cursor.execute("""
282
- # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
283
- # VALUES (?, ?, ?, ?, ?, ?, ?)
284
- # """, (
285
- # card_data['name'],
286
- # card_data['description'],
287
- # card_data['personality'],
288
- # card_data['scenario'],
289
- # card_data['image'],
290
- # card_data['post_history_instructions'],
291
- # card_data['first_message']
292
- # ))
293
- # character_id = cursor.lastrowid
294
- #
295
- # conn.commit()
296
- # return cursor.lastrowid
297
- # except sqlite3.IntegrityError as e:
298
- # logging.error(f"Error adding character card: {e}")
299
- # return None
300
- # except Exception as e:
301
- # logging.error(f"Unexpected error adding character card: {e}")
302
- # return None
303
- # finally:
304
- # conn.close()
305
-
306
-
307
- def get_character_cards() -> List[Dict]:
308
- """Retrieve all character cards from the database."""
309
- logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
310
- conn = sqlite3.connect(chat_DB_PATH)
311
- cursor = conn.cursor()
312
- cursor.execute("SELECT * FROM CharacterCards")
313
- rows = cursor.fetchall()
314
- columns = [description[0] for description in cursor.description]
315
- conn.close()
316
- characters = [dict(zip(columns, row)) for row in rows]
317
- #logging.debug(f"Characters fetched from DB: {characters}")
318
- return characters
319
-
320
-
321
- def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
322
- """
323
- Retrieve a single character card by its ID.
324
-
325
- Args:
326
- character_id: Can be either an integer ID or a dictionary containing character data.
327
-
328
- Returns:
329
- A dictionary containing the character card data, or None if not found.
330
- """
331
- conn = sqlite3.connect(chat_DB_PATH)
332
- cursor = conn.cursor()
333
- try:
334
- if isinstance(character_id, dict):
335
- # If a dictionary is passed, assume it's already a character card
336
- return character_id
337
- elif isinstance(character_id, int):
338
- # If an integer is passed, fetch the character from the database
339
- cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
340
- row = cursor.fetchone()
341
- if row:
342
- columns = [description[0] for description in cursor.description]
343
- return dict(zip(columns, row))
344
- else:
345
- logging.warning(f"Invalid type for character_id: {type(character_id)}")
346
- return None
347
- except Exception as e:
348
- logging.error(f"Error in get_character_card_by_id: {e}")
349
- return None
350
- finally:
351
- conn.close()
352
-
353
-
354
- def update_character_card(character_id: int, card_data: Dict) -> bool:
355
- """Update an existing character card."""
356
- conn = sqlite3.connect(chat_DB_PATH)
357
- cursor = conn.cursor()
358
- try:
359
- cursor.execute("""
360
- UPDATE CharacterCards
361
- SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
362
- WHERE id = ?
363
- """, (
364
- card_data.get('name'),
365
- card_data.get('description'),
366
- card_data.get('personality'),
367
- card_data.get('scenario'),
368
- card_data.get('image'),
369
- card_data.get('post_history_instructions', ''),
370
- card_data.get('first_message', "Hello! I'm ready to chat."),
371
- character_id
372
- ))
373
- conn.commit()
374
- return cursor.rowcount > 0
375
- except sqlite3.IntegrityError as e:
376
- logging.error(f"Error updating character card: {e}")
377
- return False
378
- finally:
379
- conn.close()
380
-
381
-
382
- def delete_character_card(character_id: int) -> bool:
383
- """Delete a character card and its associated chats."""
384
- conn = sqlite3.connect(chat_DB_PATH)
385
- cursor = conn.cursor()
386
- try:
387
- # Delete associated chats first due to foreign key constraint
388
- cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
389
- cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
390
- conn.commit()
391
- return cursor.rowcount > 0
392
- except sqlite3.Error as e:
393
- logging.error(f"Error deleting character card: {e}")
394
- return False
395
- finally:
396
- conn.close()
397
-
398
-
399
- def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
400
- """
401
- Add a new chat history for a character, optionally associating keywords.
402
-
403
- Args:
404
- character_id (int): The ID of the character.
405
- conversation_name (str): Name of the conversation.
406
- chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
407
- keywords (Optional[List[str]]): List of keywords to associate with this chat.
408
- is_snapshot (bool, optional): Whether this chat is a snapshot.
409
-
410
- Returns:
411
- Optional[int]: The ID of the inserted chat or None if failed.
412
- """
413
- conn = sqlite3.connect(chat_DB_PATH)
414
- cursor = conn.cursor()
415
- try:
416
- chat_history_json = json.dumps(chat_history)
417
- cursor.execute("""
418
- INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
419
- VALUES (?, ?, ?, ?)
420
- """, (
421
- character_id,
422
- conversation_name,
423
- chat_history_json,
424
- is_snapshot
425
- ))
426
- chat_id = cursor.lastrowid
427
-
428
- if keywords:
429
- # Insert keywords into ChatKeywords table
430
- keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
431
- cursor.executemany("""
432
- INSERT INTO ChatKeywords (chat_id, keyword)
433
- VALUES (?, ?)
434
- """, keyword_records)
435
-
436
- conn.commit()
437
- return chat_id
438
- except sqlite3.Error as e:
439
- logging.error(f"Error adding character chat: {e}")
440
- return None
441
- finally:
442
- conn.close()
443
-
444
-
445
- def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
446
- """Retrieve all chats, or chats for a specific character if character_id is provided."""
447
- conn = sqlite3.connect(chat_DB_PATH)
448
- cursor = conn.cursor()
449
- if character_id is not None:
450
- cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
451
- else:
452
- cursor.execute("SELECT * FROM CharacterChats")
453
- rows = cursor.fetchall()
454
- columns = [description[0] for description in cursor.description]
455
- conn.close()
456
- return [dict(zip(columns, row)) for row in rows]
457
-
458
-
459
- def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
460
- """Retrieve a single chat by its ID."""
461
- conn = sqlite3.connect(chat_DB_PATH)
462
- cursor = conn.cursor()
463
- cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
464
- row = cursor.fetchone()
465
- conn.close()
466
- if row:
467
- columns = [description[0] for description in cursor.description]
468
- chat = dict(zip(columns, row))
469
- chat['chat_history'] = json.loads(chat['chat_history'])
470
- return chat
471
- return None
472
-
473
-
474
- def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
475
- """
476
- Search for character chats using FTS5, optionally filtered by character_id.
477
-
478
- Args:
479
- query (str): The search query.
480
- character_id (Optional[int]): The ID of the character to filter chats by.
481
-
482
- Returns:
483
- Tuple[List[Dict], str]: A list of matching chats and a status message.
484
- """
485
- if not query.strip():
486
- return [], "Please enter a search query."
487
-
488
- conn = sqlite3.connect(chat_DB_PATH)
489
- cursor = conn.cursor()
490
- try:
491
- if character_id is not None:
492
- # Search with character_id filter
493
- cursor.execute("""
494
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
495
- FROM CharacterChats_fts
496
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
497
- WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
498
- ORDER BY rank
499
- """, (query, character_id))
500
- else:
501
- # Search without character_id filter
502
- cursor.execute("""
503
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
504
- FROM CharacterChats_fts
505
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
506
- WHERE CharacterChats_fts MATCH ?
507
- ORDER BY rank
508
- """, (query,))
509
-
510
- rows = cursor.fetchall()
511
- columns = [description[0] for description in cursor.description]
512
- results = [dict(zip(columns, row)) for row in rows]
513
-
514
- if character_id is not None:
515
- status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character."
516
- else:
517
- status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters."
518
-
519
- return results, status_message
520
- except Exception as e:
521
- logging.error(f"Error searching chats with FTS5: {e}")
522
- return [], f"Error occurred during search: {e}"
523
- finally:
524
- conn.close()
525
-
526
- def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
527
- """Update an existing chat history."""
528
- conn = sqlite3.connect(chat_DB_PATH)
529
- cursor = conn.cursor()
530
- try:
531
- chat_history_json = json.dumps(chat_history)
532
- cursor.execute("""
533
- UPDATE CharacterChats
534
- SET chat_history = ?
535
- WHERE id = ?
536
- """, (
537
- chat_history_json,
538
- chat_id
539
- ))
540
- conn.commit()
541
- return cursor.rowcount > 0
542
- except sqlite3.Error as e:
543
- logging.error(f"Error updating character chat: {e}")
544
- return False
545
- finally:
546
- conn.close()
547
-
548
-
549
- def delete_character_chat(chat_id: int) -> bool:
550
- """Delete a specific chat."""
551
- conn = sqlite3.connect(chat_DB_PATH)
552
- cursor = conn.cursor()
553
- try:
554
- cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
555
- conn.commit()
556
- return cursor.rowcount > 0
557
- except sqlite3.Error as e:
558
- logging.error(f"Error deleting character chat: {e}")
559
- return False
560
- finally:
561
- conn.close()
562
-
563
- def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
564
- """
565
- Fetch chat IDs associated with any of the specified keywords.
566
-
567
- Args:
568
- keywords (List[str]): List of keywords to search for.
569
-
570
- Returns:
571
- List[int]: List of chat IDs associated with the keywords.
572
- """
573
- if not keywords:
574
- return []
575
-
576
- conn = sqlite3.connect(chat_DB_PATH)
577
- cursor = conn.cursor()
578
- try:
579
- # Construct the WHERE clause to search for each keyword
580
- keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
581
- sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
582
- cursor.execute(sql_query, keywords)
583
- rows = cursor.fetchall()
584
- chat_ids = [row[0] for row in rows]
585
- return chat_ids
586
- except Exception as e:
587
- logging.error(f"Error in fetch_keywords_for_chats: {e}")
588
- return []
589
- finally:
590
- conn.close()
591
-
592
- def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
593
- """Save chat history to the CharacterChats table.
594
-
595
- Returns the ID of the inserted chat or None if failed.
596
- """
597
- return add_character_chat(character_id, conversation_name, chat_history)
598
-
599
- def migrate_chat_to_media_db():
600
- pass
601
-
602
-
603
- def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
604
- """
605
- Perform a full-text search on specified fields with optional filtering and pagination.
606
-
607
- Args:
608
- query (str): The search query.
609
- fields (List[str]): List of fields to search in.
610
- where_clause (str, optional): Additional SQL WHERE clause to filter results.
611
- page (int, optional): Page number for pagination.
612
- results_per_page (int, optional): Number of results per page.
613
-
614
- Returns:
615
- List[Dict[str, Any]]: List of matching chat records with content and metadata.
616
- """
617
- if not query.strip():
618
- return []
619
-
620
- conn = sqlite3.connect(chat_DB_PATH)
621
- cursor = conn.cursor()
622
- try:
623
- # Construct the MATCH query for FTS5
624
- match_query = " AND ".join(fields) + f" MATCH ?"
625
- # Adjust the query with the fields
626
- fts_query = f"""
627
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
628
- FROM CharacterChats_fts
629
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
630
- WHERE {match_query}
631
- """
632
- if where_clause:
633
- fts_query += f" AND ({where_clause})"
634
- fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
635
- offset = (page - 1) * results_per_page
636
- cursor.execute(fts_query, (query, results_per_page, offset))
637
- rows = cursor.fetchall()
638
- columns = [description[0] for description in cursor.description]
639
- results = [dict(zip(columns, row)) for row in rows]
640
- return results
641
- except Exception as e:
642
- logging.error(f"Error in search_db: {e}")
643
- return []
644
- finally:
645
- conn.close()
646
-
647
-
648
- def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
649
- List[Dict[str, Any]]:
650
- """
651
- Perform a full-text search within the specified chat IDs using FTS5.
652
-
653
- Args:
654
- query (str): The user's query.
655
- relevant_chat_ids (List[int]): List of chat IDs to search within.
656
- page (int): Pagination page number.
657
- results_per_page (int): Number of results per page.
658
-
659
- Returns:
660
- List[Dict[str, Any]]: List of search results with content and metadata.
661
- """
662
- try:
663
- # Construct a WHERE clause to limit the search to relevant chat IDs
664
- where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
665
- if not where_clause:
666
- where_clause = "1" # No restriction if no chat IDs
667
-
668
- # Perform full-text search using FTS5
669
- fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
670
-
671
- filtered_fts_results = [
672
- {
673
- "content": result['content'],
674
- "metadata": {"media_id": result['id']}
675
- }
676
- for result in fts_results
677
- if result['id'] in relevant_chat_ids
678
- ]
679
- return filtered_fts_results
680
- except Exception as e:
681
- logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
682
- return []
683
-
684
-
685
- def fetch_all_chats() -> List[Dict[str, Any]]:
686
- """
687
- Fetch all chat messages from the database.
688
-
689
- Returns:
690
- List[Dict[str, Any]]: List of chat messages with relevant metadata.
691
- """
692
- try:
693
- chats = get_character_chats() # Modify this function to retrieve all chats
694
- return chats
695
- except Exception as e:
696
- logging.error(f"Error fetching all chats: {str(e)}")
697
- return []
698
-
699
- #
700
- # End of Character_Chat_DB.py
701
- #######################################################################################################################
 
1
+ # character_chat_db.py
2
+ # Database functions for managing character cards and chat histories.
3
+ # #
4
+ # Imports
5
+ import configparser
6
+ import sqlite3
7
+ import logging
8
+ import json
9
+ import os
10
+ import sys
11
+ from typing import List, Dict, Optional, Tuple, Any, Union
12
+
13
+ from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
14
+
15
+ #
16
+ #######################################################################################################################
17
+ #
18
+ #
19
+
20
+ def ensure_database_directory():
21
+ os.makedirs(get_database_dir(), exist_ok=True)
22
+
23
+ ensure_database_directory()
24
+
25
+
26
+ # Construct the path to the config file
27
+ config_path = get_project_relative_path('Config_Files/config.txt')
28
+
29
+ # Read the config file
30
+ config = configparser.ConfigParser()
31
+ config.read(config_path)
32
+
33
+ # Get the chat db path from the config, or use the default if not specified
34
+ chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
35
+ print(f"Chat Database path: {chat_DB_PATH}")
36
+
37
+ ########################################################################################################
38
+ #
39
+ # Functions
40
+
41
+ # FIXME - Setup properly and test/add documentation for its existence...
42
+ def initialize_database():
43
+ """Initialize the SQLite database with required tables and FTS5 virtual tables."""
44
+ conn = None
45
+ try:
46
+ conn = sqlite3.connect(chat_DB_PATH)
47
+ cursor = conn.cursor()
48
+
49
+ # Enable foreign key constraints
50
+ cursor.execute("PRAGMA foreign_keys = ON;")
51
+
52
+ # Create CharacterCards table with V2 fields
53
+ cursor.execute("""
54
+ CREATE TABLE IF NOT EXISTS CharacterCards (
55
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
56
+ name TEXT UNIQUE NOT NULL,
57
+ description TEXT,
58
+ personality TEXT,
59
+ scenario TEXT,
60
+ image BLOB,
61
+ post_history_instructions TEXT,
62
+ first_mes TEXT,
63
+ mes_example TEXT,
64
+ creator_notes TEXT,
65
+ system_prompt TEXT,
66
+ alternate_greetings TEXT,
67
+ tags TEXT,
68
+ creator TEXT,
69
+ character_version TEXT,
70
+ extensions TEXT,
71
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
72
+ );
73
+ """)
74
+
75
+ # Create CharacterChats table
76
+ cursor.execute("""
77
+ CREATE TABLE IF NOT EXISTS CharacterChats (
78
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
79
+ character_id INTEGER NOT NULL,
80
+ conversation_name TEXT,
81
+ chat_history TEXT,
82
+ is_snapshot BOOLEAN DEFAULT FALSE,
83
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
84
+ FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
85
+ );
86
+ """)
87
+
88
+ # Create FTS5 virtual table for CharacterChats
89
+ cursor.execute("""
90
+ CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
91
+ conversation_name,
92
+ chat_history,
93
+ content='CharacterChats',
94
+ content_rowid='id'
95
+ );
96
+ """)
97
+
98
+ # Create triggers to keep FTS5 table in sync with CharacterChats
99
+ cursor.executescript("""
100
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
101
+ INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
102
+ VALUES (new.id, new.conversation_name, new.chat_history);
103
+ END;
104
+
105
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
106
+ DELETE FROM CharacterChats_fts WHERE rowid = old.id;
107
+ END;
108
+
109
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
110
+ UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
111
+ WHERE rowid = new.id;
112
+ END;
113
+ """)
114
+
115
+ # Create ChatKeywords table
116
+ cursor.execute("""
117
+ CREATE TABLE IF NOT EXISTS ChatKeywords (
118
+ chat_id INTEGER NOT NULL,
119
+ keyword TEXT NOT NULL,
120
+ FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
121
+ );
122
+ """)
123
+
124
+ # Create indexes for faster searches
125
+ cursor.execute("""
126
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
127
+ """)
128
+ cursor.execute("""
129
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
130
+ """)
131
+
132
+ conn.commit()
133
+ logging.info("Database initialized successfully.")
134
+ except sqlite3.Error as e:
135
+ logging.error(f"SQLite error occurred during database initialization: {e}")
136
+ if conn:
137
+ conn.rollback()
138
+ raise
139
+ except Exception as e:
140
+ logging.error(f"Unexpected error occurred during database initialization: {e}")
141
+ if conn:
142
+ conn.rollback()
143
+ raise
144
+ finally:
145
+ if conn:
146
+ conn.close()
147
+
148
+ # Call initialize_database() at the start of your application
149
+ def setup_chat_database():
150
+ try:
151
+ initialize_database()
152
+ except Exception as e:
153
+ logging.critical(f"Failed to initialize database: {e}")
154
+ sys.exit(1)
155
+
156
+ setup_chat_database()
157
+
158
+ ########################################################################################################
159
+ #
160
+ # Character Card handling
161
+
162
+ def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
163
+ """Parse and validate a character card according to V2 specification."""
164
+ v2_data = {
165
+ 'name': card_data.get('name', ''),
166
+ 'description': card_data.get('description', ''),
167
+ 'personality': card_data.get('personality', ''),
168
+ 'scenario': card_data.get('scenario', ''),
169
+ 'first_mes': card_data.get('first_mes', ''),
170
+ 'mes_example': card_data.get('mes_example', ''),
171
+ 'creator_notes': card_data.get('creator_notes', ''),
172
+ 'system_prompt': card_data.get('system_prompt', ''),
173
+ 'post_history_instructions': card_data.get('post_history_instructions', ''),
174
+ 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
175
+ 'tags': json.dumps(card_data.get('tags', [])),
176
+ 'creator': card_data.get('creator', ''),
177
+ 'character_version': card_data.get('character_version', ''),
178
+ 'extensions': json.dumps(card_data.get('extensions', {}))
179
+ }
180
+
181
+ # Handle 'image' separately as it might be binary data
182
+ if 'image' in card_data:
183
+ v2_data['image'] = card_data['image']
184
+
185
+ return v2_data
186
+
187
+
188
+ def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
189
+ """Add or update a character card in the database."""
190
+ conn = sqlite3.connect(chat_DB_PATH)
191
+ cursor = conn.cursor()
192
+ try:
193
+ parsed_card = parse_character_card(card_data)
194
+
195
+ # Check if character already exists
196
+ cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
197
+ row = cursor.fetchone()
198
+
199
+ if row:
200
+ # Update existing character
201
+ character_id = row[0]
202
+ update_query = """
203
+ UPDATE CharacterCards
204
+ SET description = ?, personality = ?, scenario = ?, image = ?,
205
+ post_history_instructions = ?, first_mes = ?, mes_example = ?,
206
+ creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
207
+ tags = ?, creator = ?, character_version = ?, extensions = ?
208
+ WHERE id = ?
209
+ """
210
+ cursor.execute(update_query, (
211
+ parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
212
+ parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
213
+ parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
214
+ parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
215
+ parsed_card['character_version'], parsed_card['extensions'], character_id
216
+ ))
217
+ else:
218
+ # Insert new character
219
+ insert_query = """
220
+ INSERT INTO CharacterCards (name, description, personality, scenario, image,
221
+ post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
222
+ alternate_greetings, tags, creator, character_version, extensions)
223
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
224
+ """
225
+ cursor.execute(insert_query, (
226
+ parsed_card['name'], parsed_card['description'], parsed_card['personality'],
227
+ parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
228
+ parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
229
+ parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
230
+ parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
231
+ ))
232
+ character_id = cursor.lastrowid
233
+
234
+ conn.commit()
235
+ return character_id
236
+ except sqlite3.IntegrityError as e:
237
+ logging.error(f"Error adding character card: {e}")
238
+ return None
239
+ except Exception as e:
240
+ logging.error(f"Unexpected error adding character card: {e}")
241
+ return None
242
+ finally:
243
+ conn.close()
244
+
245
+ # def add_character_card(card_data: Dict) -> Optional[int]:
246
+ # """Add or update a character card in the database.
247
+ #
248
+ # Returns the ID of the inserted character or None if failed.
249
+ # """
250
+ # conn = sqlite3.connect(chat_DB_PATH)
251
+ # cursor = conn.cursor()
252
+ # try:
253
+ # # Ensure all required fields are present
254
+ # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
255
+ # for field in required_fields:
256
+ # if field not in card_data:
257
+ # card_data[field] = '' # Assign empty string if field is missing
258
+ #
259
+ # # Check if character already exists
260
+ # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
261
+ # row = cursor.fetchone()
262
+ #
263
+ # if row:
264
+ # # Update existing character
265
+ # character_id = row[0]
266
+ # cursor.execute("""
267
+ # UPDATE CharacterCards
268
+ # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
269
+ # WHERE id = ?
270
+ # """, (
271
+ # card_data['description'],
272
+ # card_data['personality'],
273
+ # card_data['scenario'],
274
+ # card_data['image'],
275
+ # card_data['post_history_instructions'],
276
+ # card_data['first_message'],
277
+ # character_id
278
+ # ))
279
+ # else:
280
+ # # Insert new character
281
+ # cursor.execute("""
282
+ # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
283
+ # VALUES (?, ?, ?, ?, ?, ?, ?)
284
+ # """, (
285
+ # card_data['name'],
286
+ # card_data['description'],
287
+ # card_data['personality'],
288
+ # card_data['scenario'],
289
+ # card_data['image'],
290
+ # card_data['post_history_instructions'],
291
+ # card_data['first_message']
292
+ # ))
293
+ # character_id = cursor.lastrowid
294
+ #
295
+ # conn.commit()
296
+ # return cursor.lastrowid
297
+ # except sqlite3.IntegrityError as e:
298
+ # logging.error(f"Error adding character card: {e}")
299
+ # return None
300
+ # except Exception as e:
301
+ # logging.error(f"Unexpected error adding character card: {e}")
302
+ # return None
303
+ # finally:
304
+ # conn.close()
305
+
306
+
307
+ def get_character_cards() -> List[Dict]:
308
+ """Retrieve all character cards from the database."""
309
+ logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
310
+ conn = sqlite3.connect(chat_DB_PATH)
311
+ cursor = conn.cursor()
312
+ cursor.execute("SELECT * FROM CharacterCards")
313
+ rows = cursor.fetchall()
314
+ columns = [description[0] for description in cursor.description]
315
+ conn.close()
316
+ characters = [dict(zip(columns, row)) for row in rows]
317
+ #logging.debug(f"Characters fetched from DB: {characters}")
318
+ return characters
319
+
320
+
321
+ def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
322
+ """
323
+ Retrieve a single character card by its ID.
324
+
325
+ Args:
326
+ character_id: Can be either an integer ID or a dictionary containing character data.
327
+
328
+ Returns:
329
+ A dictionary containing the character card data, or None if not found.
330
+ """
331
+ conn = sqlite3.connect(chat_DB_PATH)
332
+ cursor = conn.cursor()
333
+ try:
334
+ if isinstance(character_id, dict):
335
+ # If a dictionary is passed, assume it's already a character card
336
+ return character_id
337
+ elif isinstance(character_id, int):
338
+ # If an integer is passed, fetch the character from the database
339
+ cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
340
+ row = cursor.fetchone()
341
+ if row:
342
+ columns = [description[0] for description in cursor.description]
343
+ return dict(zip(columns, row))
344
+ else:
345
+ logging.warning(f"Invalid type for character_id: {type(character_id)}")
346
+ return None
347
+ except Exception as e:
348
+ logging.error(f"Error in get_character_card_by_id: {e}")
349
+ return None
350
+ finally:
351
+ conn.close()
352
+
353
+
354
+ def update_character_card(character_id: int, card_data: Dict) -> bool:
355
+ """Update an existing character card."""
356
+ conn = sqlite3.connect(chat_DB_PATH)
357
+ cursor = conn.cursor()
358
+ try:
359
+ cursor.execute("""
360
+ UPDATE CharacterCards
361
+ SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
362
+ WHERE id = ?
363
+ """, (
364
+ card_data.get('name'),
365
+ card_data.get('description'),
366
+ card_data.get('personality'),
367
+ card_data.get('scenario'),
368
+ card_data.get('image'),
369
+ card_data.get('post_history_instructions', ''),
370
+ card_data.get('first_message', "Hello! I'm ready to chat."),
371
+ character_id
372
+ ))
373
+ conn.commit()
374
+ return cursor.rowcount > 0
375
+ except sqlite3.IntegrityError as e:
376
+ logging.error(f"Error updating character card: {e}")
377
+ return False
378
+ finally:
379
+ conn.close()
380
+
381
+
382
+ def delete_character_card(character_id: int) -> bool:
383
+ """Delete a character card and its associated chats."""
384
+ conn = sqlite3.connect(chat_DB_PATH)
385
+ cursor = conn.cursor()
386
+ try:
387
+ # Delete associated chats first due to foreign key constraint
388
+ cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
389
+ cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
390
+ conn.commit()
391
+ return cursor.rowcount > 0
392
+ except sqlite3.Error as e:
393
+ logging.error(f"Error deleting character card: {e}")
394
+ return False
395
+ finally:
396
+ conn.close()
397
+
398
+
399
+ def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
400
+ """
401
+ Add a new chat history for a character, optionally associating keywords.
402
+
403
+ Args:
404
+ character_id (int): The ID of the character.
405
+ conversation_name (str): Name of the conversation.
406
+ chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
407
+ keywords (Optional[List[str]]): List of keywords to associate with this chat.
408
+ is_snapshot (bool, optional): Whether this chat is a snapshot.
409
+
410
+ Returns:
411
+ Optional[int]: The ID of the inserted chat or None if failed.
412
+ """
413
+ conn = sqlite3.connect(chat_DB_PATH)
414
+ cursor = conn.cursor()
415
+ try:
416
+ chat_history_json = json.dumps(chat_history)
417
+ cursor.execute("""
418
+ INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
419
+ VALUES (?, ?, ?, ?)
420
+ """, (
421
+ character_id,
422
+ conversation_name,
423
+ chat_history_json,
424
+ is_snapshot
425
+ ))
426
+ chat_id = cursor.lastrowid
427
+
428
+ if keywords:
429
+ # Insert keywords into ChatKeywords table
430
+ keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
431
+ cursor.executemany("""
432
+ INSERT INTO ChatKeywords (chat_id, keyword)
433
+ VALUES (?, ?)
434
+ """, keyword_records)
435
+
436
+ conn.commit()
437
+ return chat_id
438
+ except sqlite3.Error as e:
439
+ logging.error(f"Error adding character chat: {e}")
440
+ return None
441
+ finally:
442
+ conn.close()
443
+
444
+
445
+ def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
446
+ """Retrieve all chats, or chats for a specific character if character_id is provided."""
447
+ conn = sqlite3.connect(chat_DB_PATH)
448
+ cursor = conn.cursor()
449
+ if character_id is not None:
450
+ cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
451
+ else:
452
+ cursor.execute("SELECT * FROM CharacterChats")
453
+ rows = cursor.fetchall()
454
+ columns = [description[0] for description in cursor.description]
455
+ conn.close()
456
+ return [dict(zip(columns, row)) for row in rows]
457
+
458
+
459
+ def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
460
+ """Retrieve a single chat by its ID."""
461
+ conn = sqlite3.connect(chat_DB_PATH)
462
+ cursor = conn.cursor()
463
+ cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
464
+ row = cursor.fetchone()
465
+ conn.close()
466
+ if row:
467
+ columns = [description[0] for description in cursor.description]
468
+ chat = dict(zip(columns, row))
469
+ chat['chat_history'] = json.loads(chat['chat_history'])
470
+ return chat
471
+ return None
472
+
473
+
474
+ def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
475
+ """
476
+ Search for character chats using FTS5, optionally filtered by character_id.
477
+
478
+ Args:
479
+ query (str): The search query.
480
+ character_id (Optional[int]): The ID of the character to filter chats by.
481
+
482
+ Returns:
483
+ Tuple[List[Dict], str]: A list of matching chats and a status message.
484
+ """
485
+ if not query.strip():
486
+ return [], "Please enter a search query."
487
+
488
+ conn = sqlite3.connect(chat_DB_PATH)
489
+ cursor = conn.cursor()
490
+ try:
491
+ if character_id is not None:
492
+ # Search with character_id filter
493
+ cursor.execute("""
494
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
495
+ FROM CharacterChats_fts
496
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
497
+ WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
498
+ ORDER BY rank
499
+ """, (query, character_id))
500
+ else:
501
+ # Search without character_id filter
502
+ cursor.execute("""
503
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
504
+ FROM CharacterChats_fts
505
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
506
+ WHERE CharacterChats_fts MATCH ?
507
+ ORDER BY rank
508
+ """, (query,))
509
+
510
+ rows = cursor.fetchall()
511
+ columns = [description[0] for description in cursor.description]
512
+ results = [dict(zip(columns, row)) for row in rows]
513
+
514
+ if character_id is not None:
515
+ status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character."
516
+ else:
517
+ status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters."
518
+
519
+ return results, status_message
520
+ except Exception as e:
521
+ logging.error(f"Error searching chats with FTS5: {e}")
522
+ return [], f"Error occurred during search: {e}"
523
+ finally:
524
+ conn.close()
525
+
526
+ def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
527
+ """Update an existing chat history."""
528
+ conn = sqlite3.connect(chat_DB_PATH)
529
+ cursor = conn.cursor()
530
+ try:
531
+ chat_history_json = json.dumps(chat_history)
532
+ cursor.execute("""
533
+ UPDATE CharacterChats
534
+ SET chat_history = ?
535
+ WHERE id = ?
536
+ """, (
537
+ chat_history_json,
538
+ chat_id
539
+ ))
540
+ conn.commit()
541
+ return cursor.rowcount > 0
542
+ except sqlite3.Error as e:
543
+ logging.error(f"Error updating character chat: {e}")
544
+ return False
545
+ finally:
546
+ conn.close()
547
+
548
+
549
+ def delete_character_chat(chat_id: int) -> bool:
550
+ """Delete a specific chat."""
551
+ conn = sqlite3.connect(chat_DB_PATH)
552
+ cursor = conn.cursor()
553
+ try:
554
+ cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
555
+ conn.commit()
556
+ return cursor.rowcount > 0
557
+ except sqlite3.Error as e:
558
+ logging.error(f"Error deleting character chat: {e}")
559
+ return False
560
+ finally:
561
+ conn.close()
562
+
563
+ def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
564
+ """
565
+ Fetch chat IDs associated with any of the specified keywords.
566
+
567
+ Args:
568
+ keywords (List[str]): List of keywords to search for.
569
+
570
+ Returns:
571
+ List[int]: List of chat IDs associated with the keywords.
572
+ """
573
+ if not keywords:
574
+ return []
575
+
576
+ conn = sqlite3.connect(chat_DB_PATH)
577
+ cursor = conn.cursor()
578
+ try:
579
+ # Construct the WHERE clause to search for each keyword
580
+ keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
581
+ sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
582
+ cursor.execute(sql_query, keywords)
583
+ rows = cursor.fetchall()
584
+ chat_ids = [row[0] for row in rows]
585
+ return chat_ids
586
+ except Exception as e:
587
+ logging.error(f"Error in fetch_keywords_for_chats: {e}")
588
+ return []
589
+ finally:
590
+ conn.close()
591
+
592
+ def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
593
+ """Save chat history to the CharacterChats table.
594
+
595
+ Returns the ID of the inserted chat or None if failed.
596
+ """
597
+ return add_character_chat(character_id, conversation_name, chat_history)
598
+
599
+ def migrate_chat_to_media_db():
600
+ pass
601
+
602
+
603
+ def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
604
+ """
605
+ Perform a full-text search on specified fields with optional filtering and pagination.
606
+
607
+ Args:
608
+ query (str): The search query.
609
+ fields (List[str]): List of fields to search in.
610
+ where_clause (str, optional): Additional SQL WHERE clause to filter results.
611
+ page (int, optional): Page number for pagination.
612
+ results_per_page (int, optional): Number of results per page.
613
+
614
+ Returns:
615
+ List[Dict[str, Any]]: List of matching chat records with content and metadata.
616
+ """
617
+ if not query.strip():
618
+ return []
619
+
620
+ conn = sqlite3.connect(chat_DB_PATH)
621
+ cursor = conn.cursor()
622
+ try:
623
+ # Construct the MATCH query for FTS5
624
+ match_query = " AND ".join(fields) + f" MATCH ?"
625
+ # Adjust the query with the fields
626
+ fts_query = f"""
627
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
628
+ FROM CharacterChats_fts
629
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
630
+ WHERE {match_query}
631
+ """
632
+ if where_clause:
633
+ fts_query += f" AND ({where_clause})"
634
+ fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
635
+ offset = (page - 1) * results_per_page
636
+ cursor.execute(fts_query, (query, results_per_page, offset))
637
+ rows = cursor.fetchall()
638
+ columns = [description[0] for description in cursor.description]
639
+ results = [dict(zip(columns, row)) for row in rows]
640
+ return results
641
+ except Exception as e:
642
+ logging.error(f"Error in search_db: {e}")
643
+ return []
644
+ finally:
645
+ conn.close()
646
+
647
+
648
+ def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
649
+ List[Dict[str, Any]]:
650
+ """
651
+ Perform a full-text search within the specified chat IDs using FTS5.
652
+
653
+ Args:
654
+ query (str): The user's query.
655
+ relevant_chat_ids (List[int]): List of chat IDs to search within.
656
+ page (int): Pagination page number.
657
+ results_per_page (int): Number of results per page.
658
+
659
+ Returns:
660
+ List[Dict[str, Any]]: List of search results with content and metadata.
661
+ """
662
+ try:
663
+ # Construct a WHERE clause to limit the search to relevant chat IDs
664
+ where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
665
+ if not where_clause:
666
+ where_clause = "1" # No restriction if no chat IDs
667
+
668
+ # Perform full-text search using FTS5
669
+ fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
670
+
671
+ filtered_fts_results = [
672
+ {
673
+ "content": result['content'],
674
+ "metadata": {"media_id": result['id']}
675
+ }
676
+ for result in fts_results
677
+ if result['id'] in relevant_chat_ids
678
+ ]
679
+ return filtered_fts_results
680
+ except Exception as e:
681
+ logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
682
+ return []
683
+
684
+
685
+ def fetch_all_chats() -> List[Dict[str, Any]]:
686
+ """
687
+ Fetch all chat messages from the database.
688
+
689
+ Returns:
690
+ List[Dict[str, Any]]: List of chat messages with relevant metadata.
691
+ """
692
+ try:
693
+ chats = get_character_chats() # Modify this function to retrieve all chats
694
+ return chats
695
+ except Exception as e:
696
+ logging.error(f"Error fetching all chats: {str(e)}")
697
+ return []
698
+
699
+ #
700
+ # End of Character_Chat_DB.py
701
+ #######################################################################################################################