# oceansweep's picture
# Update App_Function_Libraries/DB/DB_Manager.py
# cce77c2 verified
# raw
# history blame
# 22 kB
import configparser
import logging
import os
from contextlib import contextmanager
from time import sleep
from typing import Tuple
import sqlite3
# 3rd-Party Libraries
from elasticsearch import Elasticsearch
############################################################################################################
#
# This file contains the Database class (a small SQLite connection pool) and module-level wrapper
# functions that dispatch each database operation to the configured backend, i.e. either SQLite or Elasticsearch.
####
# The wrapper functions are grouped as follows:
# - DB-Searching functions: view_database, search_and_display_items, search_and_display
# - Transcript-related functions: get_transcripts
# - DB-Ingestion functions: add_media_to_database, update_media_content, ingest_article_to_db, ...
# - Prompt-, Keywords-, Chat-, Trash-, Backup- and Document-Versioning-related functions
# - close_connection: Close the pooled SQLite connections
####
# Import your existing SQLite functions
from App_Function_Libraries.DB.SQLite_DB import (
update_media_content as sqlite_update_media_content,
list_prompts as sqlite_list_prompts,
search_and_display as sqlite_search_and_display,
fetch_prompt_details as sqlite_fetch_prompt_details,
keywords_browser_interface as sqlite_keywords_browser_interface,
add_keyword as sqlite_add_keyword,
delete_keyword as sqlite_delete_keyword,
export_keywords_to_csv as sqlite_export_keywords_to_csv,
ingest_article_to_db as sqlite_ingest_article_to_db,
add_media_to_database as sqlite_add_media_to_database,
import_obsidian_note_to_db as sqlite_import_obsidian_note_to_db,
add_prompt as sqlite_add_prompt,
delete_chat_message as sqlite_delete_chat_message,
update_chat_message as sqlite_update_chat_message,
add_chat_message as sqlite_add_chat_message,
get_chat_messages as sqlite_get_chat_messages,
search_chat_conversations as sqlite_search_chat_conversations,
create_chat_conversation as sqlite_create_chat_conversation,
save_chat_history_to_database as sqlite_save_chat_history_to_database,
view_database as sqlite_view_database,
get_transcripts as sqlite_get_transcripts,
get_trashed_items as sqlite_get_trashed_items,
user_delete_item as sqlite_user_delete_item,
empty_trash as sqlite_empty_trash,
create_automated_backup as sqlite_create_automated_backup,
add_or_update_prompt as sqlite_add_or_update_prompt,
load_prompt_details as sqlite_load_prompt_details,
load_preset_prompts as sqlite_load_preset_prompts,
insert_prompt_to_db as sqlite_insert_prompt_to_db,
delete_prompt as sqlite_delete_prompt,
search_and_display_items as sqlite_search_and_display_items,
get_conversation_name as sqlite_get_conversation_name,
add_media_with_keywords as sqlite_add_media_with_keywords,
check_media_and_whisper_model as sqlite_check_media_and_whisper_model,
DatabaseError, create_document_version as sqlite_create_document_version,
get_document_version as sqlite_get_document_version
)
class Database:
    """Minimal SQLite connection pool.

    Connections are created on demand with ``check_same_thread=False`` and
    kept in ``self.pool`` for reuse. At most ``self.pool_size`` idle
    connections are retained; surplus connections are closed on release.
    """

    def __init__(self, db_path=None):
        """Remember the database path and start with an empty pool.

        Args:
            db_path: Path of the SQLite file. Falls back to the ``DB_NAME``
                environment variable, then to ``'media_summary.db'``.
        """
        self.db_path = db_path or os.getenv('DB_NAME', 'media_summary.db')
        self.pool = []
        self.pool_size = 10

    def _acquire_connection(self):
        """Return a pooled connection, or open a new one with retries.

        Only the acquisition is retried: a ``@contextmanager`` generator may
        yield exactly once, so the retry loop must not surround the ``yield``.

        Raises:
            DatabaseError: If the database stays locked after all retries, or
                if opening the connection fails for any other reason.
        """
        retries_left = 5
        retry_delay = 1
        while True:
            try:
                if self.pool:
                    return self.pool.pop()
                return sqlite3.connect(self.db_path, check_same_thread=False)
            except sqlite3.OperationalError as e:
                if 'database is locked' not in str(e):
                    raise DatabaseError(f"Database error: {e}") from e
                retries_left -= 1
                if retries_left <= 0:
                    raise DatabaseError("Database is locked and retries have been exhausted") from e
                logging.warning(f"Database is locked, retrying in {retry_delay} seconds...")
                sleep(retry_delay)
            except Exception as e:
                raise DatabaseError(f"Unexpected error: {e}") from e

    def _release_connection(self, conn):
        """Return ``conn`` to the pool, or close it when the pool is full."""
        if conn in self.pool:
            return
        if len(self.pool) < self.pool_size:
            self.pool.append(conn)
        else:
            conn.close()

    @staticmethod
    def _rollback_quietly(conn):
        """Discard uncommitted work so a reused connection starts clean."""
        try:
            conn.rollback()
        except sqlite3.Error as e:
            # The caller is already propagating the original failure; a failed
            # rollback must not mask it.
            logging.warning(f"Rollback failed: {e}")

    @contextmanager
    def get_connection(self):
        """Yield a SQLite connection and hand it back to the pool afterwards.

        Exceptions raised inside the ``with`` body are re-raised as
        ``DatabaseError`` (a ``DatabaseError`` passes through unchanged) after
        the pending transaction has been rolled back.

        Raises:
            DatabaseError: On acquisition failure or on any exception raised
                inside the ``with`` body.
        """
        conn = self._acquire_connection()
        try:
            yield conn
        except DatabaseError:
            self._rollback_quietly(conn)
            raise
        except sqlite3.OperationalError as e:
            self._rollback_quietly(conn)
            raise DatabaseError(f"Database error: {e}") from e
        except Exception as e:
            self._rollback_quietly(conn)
            raise DatabaseError(f"Unexpected error: {e}") from e
        finally:
            # Ensure the connection is returned to the pool even on failure
            self._release_connection(conn)

    def execute_query(self, query: str, params: Tuple = ()) -> None:
        """Execute a single statement with bound parameters and commit it.

        Args:
            query: SQL statement to execute.
            params: Values bound to the statement's placeholders.

        Raises:
            DatabaseError: If SQLite reports an error for the statement.
        """
        with self.get_connection() as conn:
            try:
                cursor = conn.cursor()
                cursor.execute(query, params)
                conn.commit()
            except sqlite3.Error as e:
                raise DatabaseError(f"Database error: {e}, Query: {query}") from e

    def close_all_connections(self):
        """Close every idle pooled connection and empty the pool."""
        for conn in self.pool:
            conn.close()
        self.pool.clear()
def get_db_config():
    """Read the database settings from ``config.txt`` in the working directory.

    Every key has a fallback, so a missing file, section or option yields the
    defaults instead of a bare ``KeyError``.

    Returns:
        dict with the keys ``'type'`` (default ``'sqlite'``), ``'sqlite_path'``
        (default ``'media_summary.db'``), ``'elasticsearch_host'`` (default
        ``'localhost'``) and ``'elasticsearch_port'`` (int, default ``9200``).
    """
    config = configparser.ConfigParser()
    # ConfigParser.read silently skips files that do not exist.
    config.read('config.txt')
    return {
        'type': config.get('Database', 'type', fallback='sqlite'),
        'sqlite_path': config.get('Database', 'sqlite_path', fallback='media_summary.db'),
        'elasticsearch_host': config.get('Database', 'elasticsearch_host', fallback='localhost'),
        'elasticsearch_port': config.getint('Database', 'elasticsearch_port', fallback=9200),
    }
# Load the backend configuration once, at import time.
db_config = get_db_config()
db_type = db_config['type']

# Select the backend in a single place so each client is built exactly once.
if db_type == 'sqlite':
    # Use the config path if provided, otherwise fall back to default
    db = Database(db_config.get('sqlite_path'))
    # Module-level connection and cursor, separate from the pool in `db`.
    # NOTE(review): nothing in this file uses them; they are kept in case
    # other modules import them — confirm before removing.
    conn = sqlite3.connect(db_config['sqlite_path'])
    cursor = conn.cursor()
elif db_type == 'elasticsearch':
    es = Elasticsearch([{
        'host': db_config['elasticsearch_host'],
        'port': db_config['elasticsearch_port']
    }])
else:
    raise ValueError(f"Unsupported database type: {db_type}")

db_path = db_config['sqlite_path']

# Update this path to the directory where you want to store the database backups
backup_dir = os.environ.get('DB_BACKUP_DIR', 'path/to/backup/directory')
############################################################################################################
#
# DB-Searching functions
def view_database(*args, **kwargs):
    """Dispatch ``view_database`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_view_database`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_view_database(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of view_database not yet implemented")
def search_and_display_items(*args, **kwargs):
    """Dispatch ``search_and_display_items`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_search_and_display_items``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_search_and_display_items(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of search_and_display_items not yet implemented")
def search_and_display(*args, **kwargs):
    """Dispatch ``search_and_display`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_search_and_display``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_search_and_display(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of search_and_display not yet implemented")
#
# End of DB-Searching functions
############################################################################################################
############################################################################################################
#
# Transcript-related Functions
def get_transcripts(*args, **kwargs):
    """Dispatch ``get_transcripts`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_get_transcripts`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_get_transcripts(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of get_transcripts not yet implemented")
#
# End of Transcript-related Functions
############################################################################################################
############################################################################################################
#
# DB-Ingestion functions
def add_media_to_database(*args, **kwargs):
    """Add media through the SQLite backend and record an initial document version.

    All arguments are forwarded unchanged to ``sqlite_add_media_to_database``.
    Afterwards the text is extracted from the segments argument and, when the
    backend's result string contains ``"with ID: <number>"``, stored via
    ``sqlite_create_document_version``.

    Returns:
        Whatever ``sqlite_add_media_to_database`` returned.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        import re

        result = sqlite_add_media_to_database(*args, **kwargs)

        # The segments are read from the third positional argument.
        # NOTE(review): the keyword name 'segments' is an assumption about the
        # SQLite function's signature — confirm.
        if len(args) > 2:
            segments = args[2]
        elif 'segments' in kwargs:
            segments = kwargs['segments']
        else:
            # The media row is already stored; do not fail the whole call just
            # because the content for versioning cannot be located.
            logging.warning("add_media_to_database: segments argument not found; skipping document version")
            return result

        # Extract content
        if isinstance(segments, list):
            content = ' '.join(segment.get('Text', '') for segment in segments if 'Text' in segment)
        elif isinstance(segments, dict):
            content = segments.get('text', '') or segments.get('content', '')
        else:
            content = str(segments)

        # Extract media_id from the result
        # Assuming the result is in the format "Media 'Title' added/updated successfully with ID: {media_id}"
        match = re.search(r"with ID: (\d+)", result) if isinstance(result, str) else None
        if match:
            media_id = int(match.group(1))
            # Create initial document version
            sqlite_create_document_version(media_id, content)

        return result
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of add_media_to_database not yet implemented")
def import_obsidian_note_to_db(*args, **kwargs):
    """Dispatch ``import_obsidian_note_to_db`` to the configured database backend.

    All arguments are forwarded unchanged to
    ``sqlite_import_obsidian_note_to_db`` when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_import_obsidian_note_to_db(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of import_obsidian_note_to_db not yet implemented")
def update_media_content(*args, **kwargs):
    """Update media content through the SQLite backend and record a new document version.

    All arguments are forwarded unchanged to ``sqlite_update_media_content``.
    Afterwards, when the selected item is present in the item mapping, the new
    content is stored via ``sqlite_create_document_version``.

    Returns:
        Whatever ``sqlite_update_media_content`` returned.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        result = sqlite_update_media_content(*args, **kwargs)

        # Extract media_id and content from the first three arguments.
        # NOTE(review): the keyword names below are assumptions about the
        # SQLite function's signature — confirm.
        names = ('selected_item', 'item_mapping', 'content_input')
        try:
            selected_item, item_mapping, content_input = [
                args[i] if i < len(args) else kwargs[name]
                for i, name in enumerate(names)
            ]
        except KeyError:
            # The update itself already ran; do not fail the whole call just
            # because the inputs for versioning cannot be located.
            logging.warning("update_media_content: arguments for versioning not found; skipping document version")
            return result

        if selected_item and item_mapping and selected_item in item_mapping:
            media_id = item_mapping[selected_item]
            # Create new document version
            sqlite_create_document_version(media_id, content_input)

        return result
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of update_media_content not yet implemented")
def add_media_with_keywords(*args, **kwargs):
    """Route ``add_media_with_keywords`` to the active backend.

    With ``db_type == 'sqlite'`` the call is passed straight through to
    ``sqlite_add_media_with_keywords``; with ``'elasticsearch'`` a
    ``NotImplementedError`` is raised.
    """
    if db_type == 'elasticsearch':
        raise NotImplementedError("Elasticsearch version of add_media_with_keywords not yet implemented")
    if db_type == 'sqlite':
        return sqlite_add_media_with_keywords(*args, **kwargs)
def check_media_and_whisper_model(*args, **kwargs):
    """Route ``check_media_and_whisper_model`` to the active backend.

    With ``db_type == 'sqlite'`` the call is passed straight through to
    ``sqlite_check_media_and_whisper_model``; with ``'elasticsearch'`` a
    ``NotImplementedError`` is raised.
    """
    if db_type == 'elasticsearch':
        raise NotImplementedError("Elasticsearch version of check_media_and_whisper_model not yet implemented")
    if db_type == 'sqlite':
        return sqlite_check_media_and_whisper_model(*args, **kwargs)
def ingest_article_to_db(url, title, author, content, keywords, summary, ingestion_date, custom_prompt):
    """Route an article ingestion request to the active backend.

    The eight arguments are handed to ``sqlite_ingest_article_to_db`` in the
    same order when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
        ValueError: If ``db_type`` is neither ``'sqlite'`` nor ``'elasticsearch'``.
    """
    if db_type not in ('sqlite', 'elasticsearch'):
        raise ValueError(f"Unsupported database type: {db_type}")
    if db_type == 'elasticsearch':
        # Elasticsearch support is still to be written
        raise NotImplementedError("Elasticsearch version of ingest_article_to_db not yet implemented")
    return sqlite_ingest_article_to_db(
        url, title, author, content, keywords, summary, ingestion_date, custom_prompt
    )
#
# End of DB-Ingestion functions
############################################################################################################
############################################################################################################
#
# Prompt-related functions
def list_prompts(*args, **kwargs):
    """Dispatch ``list_prompts`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_list_prompts`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_list_prompts(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of list_prompts not yet implemented")
def fetch_prompt_details(*args, **kwargs):
    """Dispatch ``fetch_prompt_details`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_fetch_prompt_details``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_fetch_prompt_details(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of fetch_prompt_details not yet implemented")
def add_prompt(*args, **kwargs):
    """Dispatch ``add_prompt`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_add_prompt`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_add_prompt(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of add_prompt not yet implemented")
def add_or_update_prompt(*args, **kwargs):
    """Dispatch ``add_or_update_prompt`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_add_or_update_prompt``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_add_or_update_prompt(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of add_or_update_prompt not yet implemented")
def load_prompt_details(*args, **kwargs):
    """Dispatch ``load_prompt_details`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_load_prompt_details``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_load_prompt_details(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of load_prompt_details not yet implemented")
def load_preset_prompts(*args, **kwargs):
    """Dispatch ``load_preset_prompts`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_load_preset_prompts``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_load_preset_prompts(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of load_preset_prompts not yet implemented")
def insert_prompt_to_db(*args, **kwargs):
    """Dispatch ``insert_prompt_to_db`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_insert_prompt_to_db``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_insert_prompt_to_db(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of insert_prompt_to_db not yet implemented")
def delete_prompt(*args, **kwargs):
    """Dispatch ``delete_prompt`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_delete_prompt`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_delete_prompt(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of delete_prompt not yet implemented")
#
# End of Prompt-related functions
############################################################################################################
############################################################################################################
#
# Keywords-related Functions
def keywords_browser_interface(*args, **kwargs):
    """Dispatch ``keywords_browser_interface`` to the configured database backend.

    All arguments are forwarded unchanged to
    ``sqlite_keywords_browser_interface`` when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_keywords_browser_interface(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of keywords_browser_interface not yet implemented")
def add_keyword(*args, **kwargs):
    """Dispatch ``add_keyword`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_add_keyword`` when
    ``db_type`` is ``'sqlite'``; the call runs while a pooled connection is
    checked out from ``db``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        # NOTE(review): the pooled connection is not handed to
        # sqlite_add_keyword. The context manager is kept because
        # Database.get_connection re-raises exceptions from its body as
        # DatabaseError — confirm whether callers rely on that before
        # removing it.
        with db.get_connection():
            return sqlite_add_keyword(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of add_keyword not yet implemented")
def delete_keyword(*args, **kwargs):
    """Dispatch ``delete_keyword`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_delete_keyword`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_delete_keyword(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of delete_keyword not yet implemented")
def export_keywords_to_csv(*args, **kwargs):
    """Dispatch ``export_keywords_to_csv`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_export_keywords_to_csv``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_export_keywords_to_csv(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of export_keywords_to_csv not yet implemented")
#
# End of Keywords-related Functions
############################################################################################################
############################################################################################################
#
# Chat-related Functions
def delete_chat_message(*args, **kwargs):
    """Dispatch ``delete_chat_message`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_delete_chat_message``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_delete_chat_message(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of delete_chat_message not yet implemented")
def update_chat_message(*args, **kwargs):
    """Dispatch ``update_chat_message`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_update_chat_message``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_update_chat_message(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of update_chat_message not yet implemented")
def add_chat_message(*args, **kwargs):
    """Dispatch ``add_chat_message`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_add_chat_message`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_add_chat_message(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of add_chat_message not yet implemented")
def get_chat_messages(*args, **kwargs):
    """Dispatch ``get_chat_messages`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_get_chat_messages`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_get_chat_messages(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of get_chat_messages not yet implemented")
def search_chat_conversations(*args, **kwargs):
    """Dispatch ``search_chat_conversations`` to the configured database backend.

    All arguments are forwarded unchanged to
    ``sqlite_search_chat_conversations`` when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_search_chat_conversations(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of search_chat_conversations not yet implemented")
def create_chat_conversation(*args, **kwargs):
    """Dispatch ``create_chat_conversation`` to the configured database backend.

    All arguments are forwarded unchanged to
    ``sqlite_create_chat_conversation`` when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_create_chat_conversation(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of create_chat_conversation not yet implemented")
def save_chat_history_to_database(*args, **kwargs):
    """Dispatch ``save_chat_history_to_database`` to the configured database backend.

    All arguments are forwarded unchanged to
    ``sqlite_save_chat_history_to_database`` when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_save_chat_history_to_database(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of save_chat_history_to_database not yet implemented")
def get_conversation_name(*args, **kwargs):
    """Dispatch ``get_conversation_name`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_get_conversation_name``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_get_conversation_name(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of get_conversation_name not yet implemented")
#
# End of Chat-related Functions
############################################################################################################
############################################################################################################
#
# Trash-related Functions
def get_trashed_items(*args, **kwargs):
    """Dispatch ``get_trashed_items`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_get_trashed_items`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_get_trashed_items(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of get_trashed_items not yet implemented")
def user_delete_item(*args, **kwargs):
    """Dispatch ``user_delete_item`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_user_delete_item`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_user_delete_item(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of user_delete_item not yet implemented")
def empty_trash(*args, **kwargs):
    """Dispatch ``empty_trash`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_empty_trash`` when
    ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_empty_trash(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of empty_trash not yet implemented")
#
# End of Trash-related Functions
############################################################################################################
############################################################################################################
#
# DB-Backup Functions
def create_automated_backup(*args, **kwargs):
    """Dispatch ``create_automated_backup`` to the configured database backend.

    All arguments are forwarded unchanged to ``sqlite_create_automated_backup``
    when ``db_type`` is ``'sqlite'``.

    Raises:
        NotImplementedError: If ``db_type`` is ``'elasticsearch'``.
    """
    if db_type == 'sqlite':
        return sqlite_create_automated_backup(*args, **kwargs)
    elif db_type == 'elasticsearch':
        # TODO: implement Elasticsearch version
        raise NotImplementedError("Elasticsearch version of create_automated_backup not yet implemented")
#
# End of DB-Backup Functions
############################################################################################################
############################################################################################################
#
# Document Versioning Functions
def create_document_version(*args, **kwargs):
    """Route ``create_document_version`` to the active backend.

    With ``db_type == 'sqlite'`` the call is passed straight through to
    ``sqlite_create_document_version``; with ``'elasticsearch'`` a
    ``NotImplementedError`` is raised.
    """
    if db_type == 'elasticsearch':
        # Elasticsearch support is still to be written
        raise NotImplementedError("Elasticsearch version of create_document_version not yet implemented")
    if db_type == 'sqlite':
        return sqlite_create_document_version(*args, **kwargs)
def get_document_version(*args, **kwargs):
    """Route ``get_document_version`` to the active backend.

    With ``db_type == 'sqlite'`` the call is passed straight through to
    ``sqlite_get_document_version``; with ``'elasticsearch'`` a
    ``NotImplementedError`` is raised.
    """
    if db_type == 'elasticsearch':
        # Elasticsearch support is still to be written
        raise NotImplementedError("Elasticsearch version of get_document_version not yet implemented")
    if db_type == 'sqlite':
        return sqlite_get_document_version(*args, **kwargs)
#
# End of Document Versioning Functions
############################################################################################################
############################################################################################################
#
# Function to close the database connection for SQLite
def close_connection():
    """Close the pooled SQLite connections held by ``db``; do nothing for other backends."""
    if db_type != 'sqlite':
        # Elasticsearch doesn't need explicit closing
        return
    db.close_all_connections()
#
# End of file
############################################################################################################