veeiiinnnnn's picture
new
592cb1d
"""
Database service for resolving adapters and executing queries.
The backend now uses one active database configuration at a time.
"""
from __future__ import annotations
import asyncio
import logging
import threading
from typing import Any, Union
from pathlib import Path
import sqlite3
import psycopg2
from .db_adapters import SQLAlchemyAdapter, SQLiteAdapter, SupabaseAdapter, build_adapter
from .db_registry import ProviderConfig, get_provider_registry, normalize_provider_type
from ..config import get_settings
logger = logging.getLogger(__name__)
DbAdapter = Union[SQLiteAdapter, SupabaseAdapter, SQLAlchemyAdapter]
_adapter_cache: dict[str, DbAdapter] = {}
_adapter_cache_lock = threading.Lock()
APP_TABLES: list[str] = [
"conversation_documents",
"space_agents",
"attachments",
"conversation_events",
"conversation_messages",
"document_chunks",
"document_sections",
"space_documents",
"conversations",
"agents",
"spaces",
"home_shortcuts",
"home_notes",
"user_settings",
"memory_summaries",
"memory_domains",
"user_tools",
"pending_form_runs",
"scrapbook",
]
def _resolve_provider(provider_id_or_type: str | None) -> ProviderConfig | None:
registry = get_provider_registry()
providers = registry.list()
if not providers:
return None
active = providers[0]
if not provider_id_or_type:
return active
raw = str(provider_id_or_type).strip()
if not raw:
return active
by_id = registry.get(raw)
if by_id:
return by_id
provider_type = normalize_provider_type(raw)
if provider_type == active.type:
return active
return None
def get_db_adapter(provider_id_or_type: str | None = None) -> DbAdapter | None:
"""
Get a database adapter for the specified provider (or default).
"""
provider = _resolve_provider(provider_id_or_type)
if not provider:
# Only warn if explicitly requested but not found, or if no providers at all
if provider_id_or_type or not get_provider_registry().list():
logger.warning("[DB] No database provider found for: %s", provider_id_or_type)
return None
with _adapter_cache_lock:
if provider.id in _adapter_cache:
return _adapter_cache[provider.id]
try:
adapter = build_adapter(provider)
_adapter_cache[provider.id] = adapter
logger.info("[DB] Built adapter for provider: %s (%s)", provider.id, provider.type)
return adapter
except Exception as e:
logger.error("[DB] Failed to build adapter for %s: %s", provider.id, e)
return None
def invalidate_db_adapter_cache(provider_id: str | None = None) -> None:
with _adapter_cache_lock:
if provider_id is None:
_adapter_cache.clear()
return
_adapter_cache.pop(provider_id, None)
async def execute_db_async(adapter: DbAdapter, request: Any) -> Any:
"""
Execute a synchronous adapter query in a worker thread to avoid blocking the event loop.
"""
return await asyncio.to_thread(adapter.execute, request)
def initialize_provider_schema(provider: ProviderConfig) -> dict[str, Any]:
"""
Initialize required database schema for provider.
- SQLite: schema is auto-created by adapter constructor.
- Supabase/Postgres: execute supabase/schema.sql when SUPABASE_DB_URL is configured.
"""
if provider.type == "sqlite":
if not provider.sqlite_path:
return {"success": False, "message": "SQLite path is missing."}
# Ensure we don't reuse a stale adapter/connection after reset.
invalidate_db_adapter_cache(provider.id)
conn = None
try:
conn = sqlite3.connect(provider.sqlite_path)
cursor = conn.cursor()
cursor.execute("PRAGMA foreign_keys=OFF;")
for table in APP_TABLES:
cursor.execute(f"DROP TABLE IF EXISTS {table};")
conn.commit()
except Exception as exc:
logger.exception("[DB] SQLite reset failed: %s", exc)
return {"success": False, "message": f"SQLite reset failed: {exc}"}
finally:
if conn:
conn.close()
invalidate_db_adapter_cache(provider.id)
adapter = get_db_adapter(provider.id)
if not adapter:
return {"success": False, "message": "Failed to initialize SQLite adapter."}
return {"success": True, "message": "SQLite schema reset and initialized."}
if provider.type in {"postgres", "mysql", "mariadb"}:
return {
"success": False,
"message": (
f"Automatic schema initialization is not implemented for provider type '{provider.type}'. "
"Create the schema externally and use /db/query test to validate connectivity."
),
}
if provider.type != "supabase":
return {"success": False, "message": f"Unsupported provider type: {provider.type}"}
settings = get_settings()
db_url = (settings.supabase_db_url or "").strip()
if not db_url:
return {
"success": False,
"message": "SUPABASE_DB_URL is required for automatic Supabase initialization.",
}
schema_path = Path(__file__).resolve().parents[3] / "supabase" / "schema.sql"
if not schema_path.exists():
return {"success": False, "message": f"Schema file not found: {schema_path}"}
schema_sql = schema_path.read_text(encoding="utf-8")
if not schema_sql.strip():
return {"success": False, "message": "Schema SQL file is empty."}
conn = None
try:
conn = psycopg2.connect(db_url)
conn.autocommit = True
with conn.cursor() as cursor:
for table in APP_TABLES:
cursor.execute(f"DROP TABLE IF EXISTS public.{table} CASCADE;")
cursor.execute(schema_sql)
return {"success": True, "message": "Supabase schema reset and initialized."}
except Exception as exc:
logger.exception("[DB] Supabase initialization failed: %s", exc)
return {"success": False, "message": f"Supabase initialization failed: {exc}"}
finally:
if conn:
conn.close()